前言 犹豫公司的流式计算,并没有用类似于 Hadoop 的 mapreduce 机制或者 storm 或者 flink,是我们自研基于 erlang 的单节点服务,其优点就是:部署和迁移都十分简单,并且犹豫 erlang 的天然的良好的利用了多核 CPU 的优势,可以实现效率较高的大数据流式计算。但是由于其单机性,导致对单台机器的要求过于苛刻,并且不能进行扩展机器提高计算能力是其致命的缺点,所以目前我规划利用 golang,写一个支持分布式并行计算的服务,在此之前,了解了各大流式计算的基本思想,并且结合 golang 语言的特性,找到了一个叫glow
的服务,想要写好一个分布式流式计算的服务,我们先来看看 glow
有什么好的借鉴的思想和思路。
源码分析 我们要记得这 5 个内容,这是构成整个 flow 的核心名词
上下文 Context
步进 Step
任务 Task
数据集 Dataset
数据分片 DatasetShard
上下文(Context) 上下文有 4 个属性,其中 2 个问数组
Id int
Steps []flow.Step
Datasets []flow.Dataset
ChannelBufferSize int
步进(Step) & 任务(Task) 步进(Step) Step 有 6 个属性
Id int
Name string
Inputs []flow.Dataset (每一步的来源结果集)
Output flow.Dataset (每一个需要输出的结果集)
Function function (每一步操作接口提供的用户自定义业务逻辑)
Tasks []flow.Task (任务数基于上一步中的 Output 中 Task 中 Outputs 的数据分区数量)
任务(Task) Task 有 5 个属性
Id int
Inputs []flow.DatasetShard
Outputs []flow.DatasetShard (输出结果集的分区,各个分区处于平等关系)
Step flow.Step (所属哪一步的任务)
InputChans []reflect.Value
一个任务 Inputs 等于一个上一步中的 Output 中 Task 中的 Outputs 的数量
数据集(Dataset) & 数据粉分片(DatasetShard) 数据集(Dataset) Dataset 有 10 个属性
Id int (Step 输出的数据结集)
context flow.FlowContext
Type reflect.Type | *reflect.rtype
Shards []flow.DatasetShard (对应 Step 中 Tasks 中 Outputs 的数据分区)
Step flow.Step (属于哪一个结果集)
ReadingSteps []flow.Step (对应下一步的 Step)
ExternalInputChans []reflect.Value
ExternalOutputChans []reflect.Value
IsKeyPartitioned bool
isKeyLocalSorted bool
数据集分片(DatasetShard) DatasetShard 有 9 个属性
Id int
Parent flow.Dataset (所属的结果集)
WriteChan reflect.Value
ReadingTasks []flow.Task (Step 上有几个 Tasks 就有几个)
Counter int
ReadyTime time.Time
CloseTime time.Time
lock sync.RWMutex
readingChans []reflect.Value
word_count 和其他流式计算一样,提供了一个单词统计的例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 flow.New().TextFile( "/etc/passwd" , 2 , ).Filter(func (line string ) bool { return !strings.HasPrefix(line, "#" ) }).Map(func (line string , ch chan string ) { for _, token := range strings.Split(line, ":" ) { ch <- token } }).Map(func (key string ) int { println ("map:" , key) return 1 }).Reduce(func (x int , y int ) int { println ("x:" , x) println ("y:" , y) println ("reduce:" , x+y) return x + y }).Map(func (x int ) { println ("count:" , x) }).Run()
我们看一下这个执行流程。
flow.New() 生成 flow.FlowContext
TextFile(“/etc/passwd”, 2) 打开/etc/passwd
文件,并且数据分片数量为:2
Filter(func) 将返回true
的数据筛选出来
Map(func(line string, ch chan string)) 需要运执行 map 运算,第一个参数为上一个 Step 的结果值,第二个参数说明需要通过一个可读可写的 chan 来写入传输数据,相当于二次拆分数据
Map(func(key string)) 从上一步的 Step 中的 chan 中读取出来的数据,每次来一个 key,都返回一个整型:1
Reduce(func(x int, y int))进行Reduce
的操作,将数据合并汇总,x 代表上一次 step 的总数,y 代表最近一次得到的值。但是这里比较特殊,在前面所有 step 都处理完毕之后,如果你是进行了数据分片的话,会把数据分片再合并一次。
Map(func(x int)) 由于我们进行了 Reduce 了,所以在 Reduce 之后的 map 只会进行一次运行,这个时候 x 就代表我们 Reduce 的 API 的最终结果
Run() 运行流式计算
按照看源码的套路
运行 demo,理解 demo
找到 demo 的运行入口
根据入口来查看运行方式
再回过头来看 demo 的细节
运行逻辑 1 2 3 func (d *Dataset) Run () { d.context.Run() }
1 2 3 4 5 6 7 8 9 func (fc *FlowContext) Run () { if taskRunner != nil && taskRunner.IsTaskMode() { taskRunner.Run(fc) } else if contextRunner != nil && contextRunner.IsDriverMode() { contextRunner.Run(fc) } else { fc.runFlowContextInStandAloneMode() } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 func (fc *FlowContext) runFlowContextInStandAloneMode () { var wg sync.WaitGroup isDatasetStarted := make (map [int ]bool ) OnInterrupt(fc.OnInterrupt, nil ) for _, step := range fc.Steps { for _, input := range step.Inputs { if _, ok := isDatasetStarted[input.Id]; !ok { wg.Add(1 ) go func (input *Dataset) { defer wg.Done() input.RunDatasetInStandAloneMode() }(input) isDatasetStarted[input.Id] = true } } wg.Add(1 ) go func (step *Step) { defer wg.Done() step.RunStep() }(step) if step.Output != nil { if _, ok := isDatasetStarted[step.Output.Id]; !ok { wg.Add(1 ) go func (step *Step) { defer wg.Done() step.Output.RunDatasetInStandAloneMode() }(step) isDatasetStarted[step.Output.Id] = true } } } wg.Wait() }
看到这里的逻辑比较核心的有func (fc *FlowContext) runFlowContextInStandAloneMode()
,func (s *Step) RunStep()
我们就看到了最终的入口了,解释我写在代码中。
接下来,我们看一下 input/Output 运行的主逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 func (d *Dataset) RunDatasetInStandAloneMode () { var wg sync.WaitGroup if len (d.ExternalInputChans) > 0 { d.connectExternalInputChansToRead(&wg) for _, shard := range d.Shards { shard.SetupReadingChans() } } else { for _, shard := range d.Shards { wg.Add(1 ) go func (shard *DatasetShard) { defer wg.Done() shard.SetupReadingChans() var t reflect.Value for ok := true ; ok; { if t, ok = shard.WriteChan.Recv(); ok { shard.SendForRead(t) d.sendToExternalOutputChans(t) } } shard.CloseRead() }(shard) } } wg.Wait() d.closeExternalOutputChans() return }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func (shard *DatasetShard) SetupReadingChans () { var uniqTasks []*Task seenTasks := make (map [*Task]bool ) for _, task := range shard.ReadingTasks { if ok := seenTasks[task]; ok { continue } seenTasks[task] = true uniqTasks = append (uniqTasks, task) } shard.lock.Lock() defer shard.lock.Unlock() for _, task := range uniqTasks { for i, s := range task.Inputs { if s == shard { shard.readingChans = append (shard.readingChans, task.InputChans[i]) } } } shard.ReadyTime = time.Now() }
1 2 3 4 5 6 7 8 9 10 func (s *DatasetShard) SendForRead (t reflect.Value) { s.lock.RLock() defer s.lock.RUnlock() s.Counter++ for _, c := range s.readingChans { c <- t } }
1 2 3 4 5 6 7 8 9 func (s *DatasetShard) CloseRead () { s.lock.RLock() defer s.lock.RUnlock() for _, c := range s.readingChans { close (c) } s.CloseTime = time.Now() }
接下来,我们看一下 RunStep 的主逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (s *Step) RunStep () { var wg sync.WaitGroup for i, t := range s.Tasks { wg.Add(1 ) go func (i int , t *Task) { defer wg.Done() t.RunTask() }(i, t) } wg.Wait() return }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (t *Task) RunTask () { t.Step.Function(t) for _, out := range t.Outputs { out.WriteChan.Close() } }
回到我们的程序:
=================
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func (fc *FlowContext) TextFile (fname string , shard int ) (ret *Dataset) { fn := func (out chan string ) { file, err := os.Open(fname) if err != nil { log.Panicf("Can not open file %s: %v" , fname, err) return } defer file.Close() scanner := bufio.NewScanner(file) for scanner.Scan() { out <- scanner.Text() } if err := scanner.Err(); err != nil { log.Printf("Scan file %s: %v" , fname, err) } } return fc.Source(fn, shard) }
这里,我们看到func (fc *FlowContext) TextFile(fname string, shard int) (ret *Dataset)
,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 func (fc *FlowContext) Source (f interface {}, shard int ) (ret *Dataset) { ret = fc.newNextDataset(shard, guessFunctionOutputType(f)) step := fc.AddOneToAllStep(nil , ret) step.Name = "Source" step.Function = func (task *Task) { ctype := reflect.ChanOf(reflect.BothDir, ret.Type) outChan := reflect.MakeChan(ctype, 0 ) fn := reflect.ValueOf(f) var wg sync.WaitGroup wg.Add(1 ) go func () { defer wg.Done() defer outChan.Close() fn.Call([]reflect.Value{outChan}) }() wg.Add(1 ) go func () { defer wg.Done() var t reflect.Value i := 0 for ok := true ; ok; { if t, ok = outChan.Recv(); ok { task.Outputs[i].WriteChan.Send(t) i++ if i == shard { i = 0 } } } }() wg.Wait() } return }