并发并行与Go并发编程.docx
并发并行与Go并发编程并发与并行并发concurrency并发的关注点在于任务切分。举例来讲你是一个创业公司的CEO开场只有你一个人你一人分饰多角一会做产品规划一会写代码一会见客户固然你不能见客户的同时写代码但由于你切分了任务分配了时间片表现出来好似是多个任务一起在执行。并行parallelism并行的关注点在于同时执行。还是上面的例子你发现你自己太忙了时间分配不过来于是请了工程师产品经理市场总监各司一职这时候多个任务可以同时执行了。GreenThread用户空间首先是在用户空间防止内核态以及用户态的切换导致的本钱。由语言或框架层调度更小的栈空间允许创立大量实例百万级别几个概念Continuation这个概念不熟悉FP编程的人可能不太熟悉不过这里可以简单的顾名思义可以理解为让我们的程序可以暂停然后下次调用继续contine从上次暂停的地方开场的一种机制。相当于程序调用多了一种入口。Coroutine是Continuation的一种实现一般表现为语言层面的组件或类库。主要提供yieldresume机制。Fiber以及Coroutine其实是一体两面的主要是从系统层面描绘可以理解成Coroutine运行之后的东西就是Fiber。GoroutineGoroutine其实就是前面GreenThread系列解决方案的一种演进以及实现。首先它内置了Coroutine机制。因为要用户态的调度必须有可以让代码片段可以暂停/继续的机制。其次它内置了一个调度器实现了Coroutine的多线程并行调度同时通过对网络等库的封装对用户屏蔽了调度细节。最后提供了Channel机制用于Goroutine之间通信实现CSP并发模型CommunicatingSequentialProcesses。因为Go的Channel是通过语法关键词提供的对用户屏蔽了许多细节。其实Go的Channel以及Java中的SynchronousQueue是一样的机制假如有buffer其实就是ArrayBlockQueue。Goroutine调度器这个图一般讲Goroutine调度器的地方都会引用想要仔细解析的可以看看原boke小编点击浏览原文获取。这里只讲明几点M代表系统线程P代表处理器核G代表Goroutine。Go实现了M:N的调度也就是讲线程以及Goroutine之间是多对多的关系。这点在许多GreenThread/Coroutine的调度器并没有实现。比方Java1.1版本之前的线程其实是GreenThread这个词就来源于Java但由于没实现多对多的调度也就是没有真正实现并行发挥不了多核的优势所以后来改成基于系统内核的Thread实现了。某个系统线程假如被阻塞排列在该线程上的Goroutine会被迁移。当然还有其他机制比方M空闲了假如全局队列没有任务可能会从其他M偷任务执行相当于一种rebalance机制。这里不再细讲有需要看专门的分析文章。详细的实现策略以及我们前面分析的机制类似。系统启动时会启动一个独立的后台线程不在Goroutine的调度线程池里启动netpoll的轮询。当有Goroutine提议网络恳求时网络库会将fd文件描绘符以及pollDesc用于描绘netpoll的构造体包含因为读/写这个fd而阻塞的Goroutine关联起来然后调用runtime.gopark方法挂起当前的Goroutine。当后台的netpoll轮询获取到epollLinux环境下的event会将event中的pollDesc取出来找到关联的阻塞Goroutine并进展恢复。Goroutine是银弹么Goroutine很大程度上降低了并发的开发本钱是不是我们所有需要并发的地方直接gofunc就搞定了呢Go通过Goroutine的调度解决了CPU利用率的问题。但遇到其他的瓶颈资源怎样处理比方带锁的分享资源比方数据库连接等。互联网在线应用场景下假如每个恳求都扔到一个Goroutine里当资源出现瓶颈的时候会导致大量的Goroutine阻塞最后用户恳求超时。这时候就需要用Goroutine池来进展控流同时问题又来了池子里设置多少个Goroutine适宜所以这个问题还是没有从更本上解决。go没有严格的内置的logicalprocessor数量限制,但是go的runtime默认限制了每个program最多使用10,000个线程,可以通过SetMaxThreads修改.下列图展示了Concurrency以及Parallelism的区别goroutine使用go块go的用法很简单,如下.假如没有最外面的括号(),会显示go块必须是一个函数调用.没有()只是一个函数的声明,有了()是一个调用(没有参数的)gofunc()for_,n:rangenumsout-nclose(out)()channelchannel默认上是阻塞的也就是讲假如Channel满了就阻塞写假如Channel空了就阻塞读。于是我们就可以使用这种特性来同步我们的发送以及接收端。channel-,发送一个新的值到通道中-channel,从通道中接收一个值,这个更像有两层含义,一个是会返回一个结果,当做赋值来用:msg:-channel;另外一个含义是等待这个channel发送消息,所以还有一个等的含义在.所以假如你直接写fmt.Print(-channel)本意只是想输出下这个chan传来的值,但是其实他还会阻塞住等着channel来发.默认发送以及接收操作是阻塞的直到发送方以及接收方都准备完毕。funcmain()messages:make(chanstring)gofunc()messages-ping()msg:-messagesfmt.Println(msg)所以你要是这么写:是一辈子都不会执行到print的(会死锁)funcmain()messages:make(chanstring)messages-pingmsg:-messagesfmt.Println(msg)所以在一个go程中,发送messages-msgchannel的时候,要格外小心,不然一不注意就死锁了.(解决方法:1.用带缓存的chan;2.使用带有default的select发送)selectcasemessages-msg:fmt.Println(sentmessage)default:fmt.Println(nomessagesent)range用于channel的range是阻塞的.下面程序会显示deadloc,去掉注释就好了.queue:make(chanstring,2)/queue-one/queue-two/close(queue)forelem:rangequeuefmt.Println(elem)通道缓冲加了缓存之后,就像你向channel发送消息的时候(message-ping),ping就已经发送出去了(到缓存).就像一个异步的队列?到时候,-message直接从缓存中取值就好了(异步.)但是你要这么写,利用通道缓冲,就可以.无缓冲的意味着只有在对应的接收(-chan)通道准备好接收时,才允许发送(chan-),可缓存通道允许在没有对应接收方的情况下缓存限定数量的值。funcmain()message:make(chanstring,1)message-pingmsg:-messagefmt.Print(msg)要是多发一个messages-channel,fatalerror:allgoroutinesareasleep-deadlock!,要是多承受一个fmt.Println(-messages),会打印出bufferedchannel,然后报同样的errorfuncmain()messages:make(chanstring,2)messages-bufferedmessages-channelfmt.Println(-messages)fmt.Println(-messages)通道同步使用通道同步,假如你把-done这行代码从程序中移除程序甚至会在worker还没开场运行时就完毕了。funcworker(donechanbool)fmt.Print(working.)time.Sleep(time.Second)/workingfmt.Println(done)done-truefuncmain()done:make(chanbool,1)goworker(done)-done/blocking阻塞在这里,知道worker执行完毕发送方向可以指定这个通道是不是只用来发送或接收值。这个特性提升了程序的类型平安性。pong函数允许通道pings来接收数据另一通道pongs来发送数据。funcping(pingschan-string,msgstring)pings-msgfuncpong(pings-chanstring,pongschan-string)msg:-pingspongs-msgfuncmain()pings:make(chanstring,1)pongs:make(chanstring,1)ping(pings,passedmessage)pong(pings,pongs)fmt.Println(-pongs)selectGo的select让你可以同时等待多个通道操作。(poll/epoll?)注意select要么写个死循环用超时,要不就定好次数.或加上default让select变成非阻塞的gofunc()time.Sleep(time.Second*1)c1-one()gofunc()time.Sleep(time.Second*2)c2-two()fori:0;i2;iselectcasemsg1:-c1:fmt.Println(received,msg1)casemsg2:-c2:fmt.Println(received,msg2)超时处理其中time.After返回-chanTime,直接向select发送消息selectcaseres:-c1:fmt.Println(res)case-time.After(time.Second*1):fmt.Println(timeout1)非阻塞通道操作default,当监听的channel都没有准备好的时候默认执行的.selectcasemsg:-messages:fmt.Println(receivedmessage,msg)default:fmt.Println(nomessagereceived)可以使用select语句来检测chan是否已经满了ch:make(chanint,1)ch-1selectcasech-2:default:fmt.Println(channelisfull!)通道关闭一个非空的通道也是可以关闭的但是通道中剩下的值仍然可以被接收到queue:make(chanstring,2)queue-onequeue-twoclose(queue)forelem:rangequeuefmt.Println(elem)定时器在将来某一刻执行一次时使用的定时器表示在将来某一时刻的独立事件。你告诉定时器需要等待的时间然后它将提供一个用于通知的通道。可以显示的关闭timer1:time.NewTimer(time.Second*2)-timer1.C-timer1.C直到这个定时器的通道C明确的发送了定时器失效的值(2s)之前将一直阻塞。假如你只是要单纯的等待用time.Sleep,定时器是可以在它失效之前把它给取消的stop2:timer2.Stop()打点器当你想要在固定的时间间隔重复执行,定时的执行直到我们将它停顿funcmain()/打点器以及定时器的机制有点相似一个通道用来发送数据。这里我们在这个通道上使用内置的range来迭代值每隔500ms发送一次的值。ticker:time.NewTicker(time.Millisecond*500)gofunc()fort:rangeticker.Cfmt.Println(Tickat,t)()/打点器可以以及定时器一样被停顿。一旦一个打点停顿了将不能再从它的通道中接收到值。我们将在运行后1600ms停顿这个打点器。time.Sleep(time.Millisecond*1600)ticker.Stop()fmt.Println(Tickerstopped)生成器类似于提供了一个效劳,不过只是适用于调用不是很频繁funcrand_generator_2()chanintout:make(chanint)gofunc()forout-rand.Int()()returnoutfuncmain()/生成随机数作为一个效劳rand_service_handler:rand_generator_2()fmt.Printf(%dn,-rand_service_handler)多路复用Apache使用途理每个连接都需要一个进程所以其并发性能不是很好。而Nighx使用多路复用的技术让一个进程处理多个连接所以并发性能比拟好。多路复用技术可以用来整合多个通道。提升性能以及操作的便捷。其实就是整合了多个上面的生成器funcrand_generator_3()chanintrand_generator_1:rand_generator_2()rand_generator_2:rand_generator_2()out:make(chanint)gofunc()for/读取生成器1中的数据整合out-rand_generator_1()gofunc()for/读取生成器2中的数据整合out-rand_generator_2()returnoutFurture技术可以在不准备好参数的情况下调用函数。函数调用以及函数参数准备这两个经过可以完全解耦。可以在调用的时候不关心数据是否准备好返回值是否计算好的问题。让程序中的组件在准备好数据的时候自动跑起来。这个最后获得-q.result也是可以放到execQuery上面的把Furture技术可以以及各个其他技术组合起来用。可以通太多路复用技术监听多个结果Channel当有结果后自动返回。可以以以及生成器组合使用生成器不断消费数据Furture技术逐个处理数据。Furture技术自身还可以首尾相连形成一个并发的pipefilter。这个pipefilter可以用于读写数据流操作数据流。typequerystructsqlchanstringresultchanstringfuncexecQuery(qquery)gofunc()sql:-q.sqlq.result-getsql()funcmain()q:querymake(chanstring,1),make(chanstring,1)execQuery(q)/准备参数q.sql-select*fromtablefmt.Println(-q.result)ChainFilter技术程序创立了10个Filter每个分别过滤一个素数所以可以输出前10个素数。funcGenerate(chchan-int)fori:2;ich-ifuncFilter(in-chanint,outchan-int,primeint)fori:-in/Receivevaluefromin.ifi%prime!0out-i/Senditoout./Theprimesieve:Daisy-chainFilterprocesses.funcmain()ch:make(chanint)/Createanewchannel.goGenerate(ch)/LaunchGenerategoroutine.fori:0;i10;iprime:-chprint(prime,n)ch1:make(chanint)goFilter(ch,ch1,prime)chch1分享变量有些时候使用分享变量可以让代码更加简洁typesharded_varstructreaderchanintwriterchanintfuncsharded_var_whachdog(vsharded_var)/分享变量维护协程gofunc()varvalueint0for/监听读写通道完成效劳selectcasevalue-v.writer:casev.reader-value:()funcmain()v:sharded_varmake(chanint),make(chanint)/初始化并开场维护协程sharded_var_whachdog(v)fmt.Println(-v.reader)v.writer-1fmt.Println(-v.reader)Concurrencypatterns下面介绍了一些常用的并发形式.Runner当你的程序会运行在后台,可以是cronjob或是Iron.io这样的worker-based云环境.这个程序就可以监控以及中断你的程序,假如你的程序运行的太久了.定义了三个channel来通知任务状态.interrupt:接收系统的终止信号(比方ctrl-c),接收到之后系统就优雅的退出complete:指示任务完成状态或返回错误timeout:当超时了之后,系统就优雅的退出tasks是一个函数类型的slice,你可以往里面存放签名为funcfuncName(idint)的函数,作为你的任务.task(id)就是在执行任务了(当然只是用来模拟任务,可以定义一个任务接口来存放任务,此处是为了简便).注意tasks里面的任务是串行执行的,这些任务的执行发生在一个单独的goroutine中.New方法里的interruptchannelbuffer设置为1,也就是讲当用户重复ctrlc的时候,程序也只会收到一个信号,其他的信号会被丢弃.在run()方法中,在开场执行任务前(task(id),会前检查执行流程有没有被中断(ifr.gotInterrupt(),这里用了一个带default语句的select.一旦收到中断的事件,程序就不再承受任何其他事件了(signal.Stop(r.interrupt).在Start()方法中,在go块中执行run()方法,任何当前的goroutine会阻塞在select这边,直到收到run()返回的completechannel或超时返回./Runnerrunsasetoftaskswithinagiventimeoutandcanbeshutdownonanoperatingsysteminterrupt.typeRunnerstruct/interruptchannelreportsasignalfromtheoperatingsystem.interruptchanos.Signal/pletechanerror/timeoutreportsthattimehasrunout.timeout-chantime.Time/tasksholdsasetoffunctionsthatareexecuted/synchronouslyinindexorder.tasksfunc(int)/ErrTimeoutisreturnedwhenavalueisreceivedonthetimeoutchannel.varErrTimeouterrors.New(receivedtimeout)/ErrInterruptisreturnedwhenaneventfromtheOSisreceived.varErrInterrupterrors.New(receivedinterrupt)/Newreturnsanewready-to-useRunner.funcNew(dtime.Duration)*RunnerreturnRunnerinterrupt:make(chanos.Signal,1),complete:make(chanerror),timeout:time.After(d),/AddattachestaskstotheRunner.AtaskisafunctionthattakesanintID.表示可以传入多个参数func(r*Runner)Add(tasks.func(int)r.tasksappend(r.tasks,tasks.)/Startrunsalltasksandmonitorschannelevents.func(r*Runner)Start()error/Wewanttoreceiveallinterruptbasedsignals.signal.Notify(r.interrupt,os.Interrupt)/Runthedifferenttasksonadifferentgoroutine.gofunc()rplete-r.run()()select/Signaledwhenprocessingisdone.caseerr:-rplete:returnerr/Signaledwhenwerunoutoftime.case-r.timeout:returnErrTimeout/runexecuteseachregisteredtask.func(r*Runner)run()errorforid,task:ranger.tasks/CheckforaninterruptsignalfromtheOS.ifr.gotInterrupt()returnErrInterrupt/Executetheregisteredtask.task(id)returnnil/gotInterruptverifiesiftheinterruptsignalhasbeenissued.func(r*Runner)gotInterrupt()boolselect/Signaledwhenaninterrupteventissent.case-r.interrupt:/Stopreceivinganyfurthersignals.signal.Stop(r.interrupt)returntrue/Continuerunningasnormal.default:returnfalsemain方法consttimeout3*time.Second/mainistheentrypointfortheprogram.funcmain()log.Println(Startingwork.)/Createanewtimervalueforthisrun.r:runner.New(timeout)/Addthetaskstoberun.r.Add(createTask(),createTask(),createTask()/Runthetasksandhandletheresult.iferr:r.Start();err!nilswitcherrcaserunner.ErrTimeout:log.Println(Terminatingduetotimeout.)os.Exit(1)caserunner.ErrInterrupt:log.Println(Terminatingduetointerrupt.)os.Exit(2)log.Println(Processended.)/createTaskreturnsanexampletaskthatsleepsforthespecified/numberofsecondsbasedontheid.funccreateTask()func(int)returnfunc(idint)log.Printf(Processor-Task#%d.,id)time.Sleep(time.Duration(id)*time.Second)Pooling当你有一些特定的资源要分享,比方数据库连接或内存buffers,这个形式就非常有用goroutine要用一个资源,就去pool中去拿,用完了就还回去.例子中的资源是只要实现了io.Closer接口即可.m用来保证多goroutine下对Poll的操作都是value-safe的.resources将会是一个bufferedchannel,会包含将要共享的资源.factory的作用是创立一个新的资源,当poll有需要的时候.closed用来指示pool有无被关闭New函数承受一个用来创立新资源的函数对象(fnfunc()(io.Closer,error),返回一个资源)还有一个size参数.Acquire函数先从pool中取资源,要是取不到用factory新建一个func(p*Pool)Acquire()(io.Closer,error)select/Checkforafreeresource.caser,_:-p.resources:returnr,nil/Provideanewresourcesincetherearenoneavailable.default:returnp.factory()Release函数:假如pool已经关闭,就直接return.否那么就向resource这个bufferedchannel里发送要释放的资源.default语句是假如resource已经满了,就关闭这个pool.Close函数:当程序运行完关闭pool的时候,应该调用Close函数,这个函数首先关闭resource这个bufferedchannel,然后再把bufferedchannel中的任务关闭(io.Closer).注意这个加锁./Poolmanagesasetofresourcesthatcanbesharedsafelybymultiplegoroutines./Theresourcebeingmanagedmustimplementtheio.Closerinterface.typePoolstructmsync.Mutexresourceschanio.Closerfactoryfunc()(io.Closer,error)closedbool/ErrPoolClosedisreturnedwhenanAcquirereturnsonaclosedpool.varErrPoolClosederrors.New(Poolhasbeenclosed.)/Newcreatesapoolthatmanagesresources.Apoolrequiresa/functionthatcanallocateanewresourceandthesizeofthepool.funcNew(fnfunc()(io.Closer,error),sizeuint)(*Pool,error)ifsize0returnnil,errors.New(Sizevaluetoosmall.)returnPoolfactory:fn,resources:make(chanio.Closer,size),nil/Acquireretrievesaresourcefromthepool.func(p*Pool)Acquire()(io.Closer,error)select/Checkforafreeresource.caser,ok:-p.resources:log.Println(Acquire:,SharedResource)if!okreturnnil,ErrPoolClosedreturnr,nil/Provideanewresourcesincetherearenoneavailable.default:log.Println(Acquire:,NewResource)returnp.factory()/Releaseplacesanewresourceontothepool.func(p*Pool)Release(rio.Closer)/SecurethisoperationwiththeCloseoperation.p.m.Lock()deferp.m.Unlock()/Ifthepoolisclosed,discardtheresource.ifp.closedr.Close()returnselect/Attempttoplacethenewresourceonthequeue.casep.resources-r:log.Println(Release:,InQueue)/Ifthequeueisalreadyatcapweclosetheresource.default:log.Println(Release:,Closing)r.Close()/Closewillshutdownthepoolandcloseallexistingresources.func(p*Pool)Close()/SecurethisoperationwiththeReleaseoperation.p.m.Lock()deferp.m.Unlock()/Ifthepoolisalreadyclose,dontdoanything.ifp.closedreturn/Setthepoolasclosed.p.closedtrue/Closethechannelbeforewedrainthechannelofits/resources.Ifwedontdothis,wewillhaveadeadlock.close(p.resources)/Closetheresourcesforr:rangep.resourcesr.Close()mainconst(maxGoroutines25/thenumberofroutinestouse.pooledResources2/numberofresourcesinthepool)/dbConnectionsimulatesaresourcetoshare.typedbConnectionstructIDint32/Closeimplementstheio.CloserinterfacesodbConnectioncanbemanagedbythepool.Closeperformsanyresourcereleasemanagement.func(dbConn*dbConnection)Close()errorlog.Println(Close:Connection,dbConn.ID)returnnil/idCounterprovidessupportforgivingeachconnectionauniqueid.varidCounterint32/createConnectionisafactorymethodthatwillbecalledbythepoolwhenanewconnectionisneeded.funccreateConnection()(io.Closer,error)id:atomic.AddInt32(idCounter,1)log.Println(Create:NewConnection,id)returndbConnectionid,nil/mainistheentrypointforallGoprograms.funcmain()varwgsync.WaitGroupwg.Add(maxGoroutines)/Createthepooltomanageourconnections.p,err:pool.New(createConnection,pooledResources)iferr!nillog.Println(err)/Performqueriesusingconnectionsfromthepool.forquery:0;querymaxGoroutines;query/Eachgoroutineneedsitsowncopyofthequeryvalueelsetheywillallbesharingthesamequeryvariable.gofunc(qint)performQueries(q,p)wg.Done()(query)/Waitforthegoroutinestofinish.wg.Wait()/Closethepool.log.Println(ShutdownProgram.)p.Close()/performQueriesteststheresourcepoolofconnections.funcperformQueries(queryint,p*pool.Pool)/Acquireaconnectionfromthepool.conn,err:p.Acquire()iferr!nillog.Println(err)return/Releasetheconnectionbacktothepool.deferp.Release(conn)/Waittosimulateaqueryresponse.time.Sleep(time.Duration(rand.Intn(1000)*time.Millisecond)log.Printf(Query:QID%dCID%dn,query,conn.(*dbConnection).ID)WorkNew函数开启了固定个数(maxGoroutines)个goroutine,注意这边work是一个unbufferedchannel.这个forrange会阻塞直到channel中有值可以取.要是work这个channel被关闭了,这个forrange就完毕,然后调用wg.DoneRun函数提交任务到pool中去w.work-w.注意这个work是一个unbufferedchannel,所以得等一个goroutine把它取走,否那么会阻塞住.这是我们需要保证的,因为我们想要调用者保证这个任务被提交之后立即开场运行typeWorkerinterfaceTask()/PoolprovidesapoolofgoroutinesthatcanexecuteanyWorker/tasksthataresubmitted.typePoolstructworkchanWorkerwgsync.WaitGroup/Newcreatesanewworkpool.funcNew(maxGoroutinesint)*Poolp:Poolwork:make(chanWorker),p.wg.Add(maxGoroutines)fori:0;imaxGoroutines;igofunc()forw:rangep.workw.Task()p.wg.Done()()returnp/Runsubmitsworktothepool.func(p*Pool)Run(wWorker