JavaScript的模型与浏览器类似,是运行在单个进程的单个线程上的,它的好处是:程序状态是单一的,没有在多线程的情况下没有锁,线程同步问题。但是单进程单线程并非完美的结构,如今CPU基本都是多核的,一个Node进程只能利用一个核,因此Node实际应用中有一个问题:需要充分利用多核CPU服务器。另外Node执行在单线程上,一旦单线程上抛出了异常没有被捕获,将会引起整个进程的崩溃,因此Node实际应用中的另一个问题是:保证进程的健壮性和稳定性。
多进程架构 Node提供了child_process
模块,并且提供了fork
函数供我们实现进程的复制,我们在worker.js中创建一个http服务器:
1 2 3 4 5 var http = require ('http' )http.createServer ((req, res ) => { res.writeHead (200 , { 'Content-Type' : 'text/plain' }) res.end ("Hello World" ) }).listen (Math .round ((1 + Math .random ()) * 10000 ), '127.0.0.1' )
通过node worker.js
启动它,将会监听一个10000到20000之间的一个随机端口。
新建master.js,写入以下代码,通过node master.js
启动它:
1 2 3 4 5 var fork = require ('child_process' ).fork var cpus = require ('os' ).cpus ()for (let i = 0 ; i < cpus.length ; i++) { fork ('./worker.js' ) }
这段代码会根据当前机器的CPU数量复制出对应的Node进程数,通过ps -aux|grep node
可以查看到当前进程的数量:
1 2 3 4 5 6 7 8 9 10 # ps -aux|grep node root 59 0.1 1.0 330372 32260 pts/0 Sl+ 10:40 0:00 node master.js root 66 0.1 1.1 330628 34524 pts/0 Sl+ 10:40 0:00 /usr/bin/node ./worker.js root 67 0.1 1.1 330628 34536 pts/0 Sl+ 10:40 0:00 /usr/bin/node ./worker.js root 69 0.1 1.1 330628 34680 pts/0 Sl+ 10:40 0:00 /usr/bin/node ./worker.js root 74 0.1 1.0 330628 32468 pts/0 Sl+ 10:40 0:00 /usr/bin/node ./worker.js root 78 0.1 1.0 330628 32536 pts/0 Sl+ 10:40 0:00 /usr/bin/node ./worker.js root 87 0.1 1.0 330628 32648 pts/0 Sl+ 10:40 0:00 /usr/bin/node ./worker.js root 92 0.1 1.0 330628 32484 pts/0 Sl+ 10:40 0:00 /usr/bin/node ./worker.js root 98 0.1 1.0 330628 32564 pts/0 Sl+ 10:40 0:00 /usr/bin/node ./worker.js
可以看到进程分两种:主进程和工作进程,主进程不负责具体的业务处理,而是负责调度或管理工作进程,它是趋向稳定的;工作进程负责具体的业务处理。通过fork复制的进程都是独立的进程,这个进程中有着独立而全新的V8实例,它需要至少30ms的启动时间和10MB内存。这里启动多个进程只是为了充分将CPU资源利用起来,而不是为了解决并发问题。Node通过事件驱动的方式在单线程上解决了大并发的问题。
创建子进程 child_process
模块给予Node可以随意创建子进程的能力,它提供了四个方法创建子进程:
spawn
:启动一个子进程来执行命令。
exec
:启动一个子进程来执行命令,它有一个回调函数获知子进程的状况。
execFile
:启动一个子进程来执行可执行文件。
fork
:通过指定要执行的JavaScript文件模块创建子进程。
四种方法的使用方法:
1 2 3 4 5 6 7 8 9 const cp = require ('child_process' );cp.spawn ('node' , ['worker.js' ]) cp.exec ('node worker.js' , (err, stdout, stderr ) => { }) cp.execFile ('worker.js' , (err, stdout, stderr ) => { }) cp.fork ('./worker.js' )
类型
回调/异常
进程类型
执行类型
可设置超时
spawn
×
任意
命令
×
exec
√
任意
命令
√
execFile
√
任意
可执行文件
√
fork
×
Node
JavaScript文件
×
进程间通信 对于child_process
模块,创建好了子进程,然后父子进程间通信是十分容易的。主进程与工作进程之间通过onmessage
和postMessage
进行通信,子进程对象由send方法实现主进程向子进程发送数据,message
事件实现收听子进程发来的数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 const cp = require ('child_process' );var child = cp.fork ("./worker.js" )child.on ('message' , (m ) => { console .log (`PARENT got message:${m} ` ); }) child.send ("hello world" ) process.on ('message' , (m ) => { console .log (`CHILD got message ${m} ` ); }) process.send ("bar" )
创建子进程后,为了实现父子进程之间的通信,父进程与子进程之间会创建IPC通道,通过IPC通道,父子进程才能通过message
和send
传递信息。
进程间通信原理 IPC的全称是Inter-Process Communication,即进程间通信,进程间通信的目的是为了让不同的进程能够互相访问资源并进行协调工作。实现IPC的技术有很多,如命名管道、匿名管道、socket、信号量、共享内存、消息队列、Domain Socket等。Node中实现IPC通道使用管道技术。
父进程在实际创建子进程前,会创建IPC通道并监听它,然后才真正创建子进程,并通过环境变量(NODE_CHANNEL_FD)告诉子进程这个IPC通道的文件描述符。子进程在启动的过程中,根据文件描述符去连接这个已存在的IPC通道,从而完成父子进程之间的连接。
建立连接后的父子进程就可以自由通信了,IPC通道与socket类似,属于双向通信。不同的是他们在系统内核中就完成了进程间的通信,不用经过实际的网络层。只有启动的子进程是Node进程时,子进程才会根据环境变量去连接IPC通道,对于其他类型的子进程无法实现进程间通信,除非其他进程也按约定去连接这个已创建好的IPC通道。
句柄传递 前面的示例中,启动的服务器分别监听各自的端口,如果让服务器都监听到相同的端口,会得到以下结果:
1 2 3 4 5 6 7 8 9 10 11 12 var http = require ('http' )http.createServer ((req, res ) => { res.writeHead (200 , { 'Content-Type' : 'text/plain' }) res.end ("Hello World" ) }).listen (8888 , '127.0.0.1' ) node :events :368 throw er; ^ Error : listen EADDRINUSE : address already in use 127.0 .0 .1 :8888 ......
可以看到只有一个进程能够监听到该端口上,其余的进程都抛出了EADDRINUSE
错误,这个问题破坏了我们将多个进程监听同一个端口的想法,要解决这个问题,通常的做法是让每个进程监听不同的端口,其中主进程监听主端口,主进程将接收到的请求分发到子进程上。
通过代理,可以避免端口不能重复监听的问题,甚至可以在代理进程上做适当的负载均衡。由于进程每收到一个连接,将会用掉一个文件描述符,因此代理方案中客户端连接到代理进程,代理进程连接到工作进程就要用掉两个文件描述符,操作系统的文件描述符是有限的,代理方案浪费掉一倍数量的文件描述符的做法影响了系统扩展能力。
Node引入了进程间发送句柄的功能以解决这个问题。send
方法的第二个参数就是句柄:
1 child.send (message, [sendHandle])
句柄是一种可以用来标识资源的引用,它的内部包含了指向对象的文件描述符。句柄可以用来标识一个服务器端socket对象、一个客户端socket对象、一个UDP套接字、一个管道等。
发送句柄意味着我们可以去掉前面所说的代理方案,使主进程接收到请求后,将请求直接发送给工作进程,而不是重新与工作进程之间建立新的socket连接来转发数据。下面是示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 process.on ('message' , (m, server ) => { if (m === 'server' ) { server.on ('connection' , (socket ) => { socket.end ("handle by child" ) }) } }) const cp = require ('child_process' );var child = cp.fork ("./worker.js" )var server=require ("net" ).createServer ()server.on ('connection' ,(socket )=> { socket.end ("handle by parent" ) }) server.listen (1337 ,()=> { child.send ('server' ,server) })
示例中直接将一个TCP服务器发送给了子进程,启动代码,然后使用curl工具:
1 2 3 4 5 6 # curl http://127.0.0.1:1337 handle by child # curl http://127.0.0.1:1337 handle by parent # curl http://127.0.0.1:1337 handle by child
可以看到子进程和父进程都有可能处理客户端发起的请求。
我们可以将服务发送给多个子进程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 const cp = require ('child_process' );var child1 = cp.fork ("./worker.js" )var child2 = cp.fork ("./worker.js" )var server = require ("net" ).createServer ()server.on ('connection' , (socket ) => { socket.end ("handle by parent\n" ) }) server.listen (1337 , () => { child1.send ('server' , server) child2.send ('server' , server) }) process.on ('message' , (m, server ) => { if (m === 'server' ) { server.on ('connection' , (socket ) => { socket.end (`handle by child,pid is ${process.pid} \n` ) }) } })
启动代码,使用curl工具测试:
1 2 3 4 5 6 7 8 # curl http://127.0.0.1:1337 handle by child,pid is 436 # curl http://127.0.0.1:1337 handle by child,pid is 435 # curl http://127.0.0.1:1337 handle by child,pid is 435 # curl http://127.0.0.1:1337 handle by parent
可以看到测试的结果每次都可能不同,结果可能父进程处理,也可能被子进程处理。这是在TCP层面上完成的事情,我们尝试将其转化到HTTP层面来试试,对于主进程,我们可以在服务器发送句柄给子进程后,就关掉服务器的监听,让子进程来处理请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 const cp = require ('child_process' );var child1 = cp.fork ("./worker.js" )var child2 = cp.fork ("./worker.js" )var server = require ("net" ).createServer ()server.listen (1337 , () => { child1.send ('server' , server) child2.send ('server' , server) server.close () }) var http = require ("http" )var server = http.createServer (function (req, res ) { res.writeHead (200 , { "Content-Type" : "text/plain" }) res.end (`handle by child,pid is ${process.pid} \n` ) }) process.on ('message' , (m, tcp ) => { if (m === 'server' ) { tcp.on ("connection" , (socket ) => { server.emit ('connection' , socket) }) } })
重新启动再次测试,如下所示:
1 2 3 4 5 6 # curl http://127.0.0.1:1337 handle by child,pid is 704 # curl http://127.0.0.1:1337 handle by child,pid is 703 # curl http://127.0.0.1:1337 handle by child,pid is 703
这样一来,所有的请求都是由子进程处理了,整个过程中,服务的过程发生了改变:
主进程发送完句柄并关闭监听后,变成了下面的结构:
现在多个子进程可以同时监听同一个端口,并且不会抛出EADDRINUSE错误了。
句柄发送与还原 目前子进程对象send方法可以发送的句柄类型包括以下几种:
net.Socket
:TCP套接字
net.Server
:TCP服务器
net.Native
:C++层面的TCP套接字或IPC管道
dgram.Socket
:UDP套接字
dgram.Native
:C++层面的UDP套接字
send
方法在将消息发送到IPC管道之前,将消息组装成两个对象,一个参数是handle
,另一个是message
,message
参数如下所示:
1 2 3 4 5 { cmd :'NODE_HANDLE' , type :'net.Server' , msg :message }
发送到IPC管道中的实际上是我们要发送的句柄文件描述符,文件描述符实际上是一个整数值。这个message
对象在写入到IPC管道时会通过JSON.stringfy
进行序列化,所以最终发送到IPC通道中的都是字符串,send方法能发送消息和句柄并不意味他能发送任意对象。
连接了IPC通道的子进程可以读取父进程发来的消息,将字符串通过JSON.parse
解析还原成对象后,才触发message
事件将消息体传递给应用层使用。消息对象还要被进行过滤处理,message.cmd
的值如果以NODE_
为前缀,它将相应一个内部internalMessage
事件。如果message.cmd
的值是NODE_HANDLE
,它将取出message.type
的值和得到的文件描述符一起还原出一个对应的对象。
目前Node只支持上述几种句柄,并非任意类型的句柄都能在进程间传递,除非它由完整的发送和还原过程。
端口共同监听 我们独立启动的进程中,TCP服务器socket套接字的文件描述符并不相同,导致监听到相同的端口时会抛出异常。
Node对每一个监听的端口都设置了SO_REUSEADDR
选项,这个选项的含义是不同进程可以使用相同的网卡和端口进行监听,这个服务器套接字可以被不同程度的服用。
由于独立启动的进程互相之间不知道文件描述符,所以监听相同端口时就会失败。但对于send
发送的句柄还原出来的服务而言,他们的文件描述符是相同的,所以监听相同的端口不会引起异常。
多个引用监听相同端口时,文件描述符同一时间只能被某一个进程利用,所以只有一个进程能抢到连接,也就是说只有它能对请求进行响应。
集群 搭建集群后,虽然已经充分利用了CPU资源,但是依然还有一些问题需要考虑:
性能问题
多个工作进程的存活状态管理
工作进程的平滑重启
配置或者静态数据的动态重新载入
进程事件 子进程对象上除了message
事件,还有如下事件:
error
:当子进程无法被复制、创建、无法被杀死、无法发送消息时被触发。
exit
:子进程退出时触发的事件。
close
:在子进程的标准输入输出流中止时触发该事件。
disconnect
:父或子进程中调用disconnect
方法时触发该事件。
上面的事件时父进程能监听到的子进程事件。除了send
之外,还能通过kill方法给子进程发送消息,kill方法并不能真正将子进程杀死,它只是给子进程发送一个系统信号:
1 2 3 4 child.kill ([signal]) process.kill (pid,[signal])
Node提供了这些信号对应的信号事件,每个进程都可以监听这些信号事件。
自动重启 有了父子进程之间的事件后,就可以在这些关系之间创建出需要的机制了。我们能够监听子进程的exit
事件来获取其退出的信息,如果子进程退出了,我们可以重新启动给一个工作进程继续服务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 const fork = require ('child_process' ).fork ;var cpus = require ("os" ).cpus ()var server = require ("net" ).createServer ()var workers = {}var createWorker = function ( ) { var worker = fork (__dirname + '/worker.js' ) worker.on ('exit' , function ( ) { console .log (`Worker ${worker.pid} exited` ) delete workers[worker.pid ] createWorker () }) workers[worker.pid ] = worker console .log (`Worker ${worker.pid} created` ); } for (let index = 0 ; index < cpus.length ; index++) { createWorker () } process.on ("exit" , function ( ) { for (var pid in workers) { workers[pid].kill () } }) server.listen (1337 , () => { for (var pid in workers) { workers[pid].send ("server" , server) } server.close () })
运行上面的代码,并通过kill命令杀死一个进程:
1 2 3 4 5 6 7 8 9 # node master.js Worker 10352 created Worker 10358 created Worker 10359 created Worker 10363 created Worker 10367 created Worker 10373 created Worker 10383 created Worker 10389 created
1 2 Worker 10352 exited Worker 10410 created
在测试中我们主动杀死了一个进程,在实际业务中,可能有未处理的异常导致进程退出,我们需要处理这种异常:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 var http = require ("http" )var server = http.createServer (function (req, res ) { res.writeHead (200 , { "Content-Type" : "text/plain" }) res.end (`handle by child,pid is ${process.pid} \n` ) }) var worker;process.on ('message' , (m, tcp ) => { if (m === 'server' ) { worker = tcp worker.on ("connection" , (socket ) => { server.emit ('connection' , socket) }) } }) process.on ('uncaughtException' , function ( ) { worker.close (function ( ) { process.exit (1 ) }) })
一旦子进程有未处理的异常出现,工作进程就立即停止接受新的连接,当所有连接断开后,退出进程。主进程监听到工作进程退出后,将会立即启动新的工作进程,以此保证整个集群中总是有进程在为用户服务。
自杀信号 上述代码存在的问题是:要等到已有的所有连接断开后进程才退出,在极端情况下,所有工作进程都停止接受新的连接,全处在等待退出的状态,这样会导致请求丢失。因此我们在退出的流程增加一个自杀信号,工作进程得知要退出时,向主进程发送一个自杀信号,然后才停止连接。主进程在接收到自杀信号后,立即创建新的工作进程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 var http = require ("http" )var server = http.createServer (function (req, res ) { res.writeHead (200 , { "Content-Type" : "text/plain" }) res.end (`handle by child,pid is ${process.pid} \n` ) throw new Error ("throw Exception" ) }) var worker;process.on ('message' , (m, tcp ) => { if (m === 'server' ) { worker = tcp worker.on ("connection" , (socket ) => { server.emit ('connection' , socket) }) } }) process.on ('uncaughtException' , function (err ) { console .error (err); process.send ({ act : "suicide" }) worker.close (function ( ) { process.exit (1 ) }) }) worker.on ('exit' , function ( ) { console .log (`Worker ${worker.pid} exited` ) delete workers[worker.pid ] }) worker.on ('message' , (function (message ) { if (message.act == 'suicide' ) { createWorker () } }))
为了模拟未捕获的异常,我们将工作进程的代码加上异常抛出,一旦有用户请求,就会抛出异常。运行代码:
1 2 3 4 5 6 7 8 9 # node master.js Worker 10967 created Worker 10973 created Worker 10974 created Worker 10976 created Worker 10982 created Worker 10990 created Worker 10994 created Worker 10999 created
用curl测试:
1 2 # curl http://127.0.0.1:1337 handle by child,pid is 10999
回头看看信息:
1 2 3 4 5 6 7 Error: throw Exception at Server.<anonymous> (/mnt/c/Users/DIAOAN/Desktop/process/worker.js:5:11) at Server.emit (node:events:390:28) at parserOnIncoming (node:_http_server:951:12) at HTTPParser.parserOnHeadersComplete (node:_http_common:128:17) Worker 11025 created Worker 10999 exited
现在我们完成了进程的平滑重启,一旦有异常出现,主进程会创建新的工作进程来服务,示意图如下:
限量重启 工作进程不能无限重启,如果进程在启动过程中就发生了错误,会导致工作进程不断地重启,这种不属于我们捕捉未知异常地情况,因为这种短时间内频繁重启极有可能是程序编写错误。
为了消除这种无意义的重启,应当设定重启规则,比如单位之间内只能重启多少次,超过限制则触发giveup
事件,告知放弃重启工作进程这个重要事件。
为了完成限量重启的统计,我们引入一个队列来做标记,在重启工作进程之间进行打点并判断重启是否太过频繁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 var limit = 10 var during = 60000 var restart = []var isTooFrequently = ( ) => { var time = Date .now () var length = restart.push (time) if (length > limit) { restart = restart.slice (limit * -1 ) } return restart.length >= limit && restart[restart.length - 1 ] - restart[0 ] < during } var workers = {}var createWorker = function ( ) { if (isTooFrequently ()) { process.emit ("giveup" , length, during) return } var worker = fork (__dirname + '/worker.js' ) worker.on ('exit' , function ( ) { console .log (`Worker ${worker.pid} exited` ) delete workers[worker.pid ] }) worker.on ('message' , (function (message ) { if (message.act == 'suicide' ) { createWorker () } })) workers[worker.pid ] = worker console .log (`Worker ${worker.pid} created` ); }
负载均衡 Node默认提供的机制是采用操作系统的抢占式策略,所谓抢占式就是在一堆工作进程中,闲着的进程对到来的请求进行争抢,谁抢到谁服务。对Node而言,分清进程的繁忙是由CPU、I/O两部分构成的,影响抢占的是CPU繁忙度,对不同的业务,可能存在I/O繁忙,CPU空闲的情况,这可能造成某个进程抢到较多的请求,形成负载不均衡的情况。
Node在v0.11中提供了新的策略使得负载均衡更合理,这种新的策略叫Round-Robin
,又叫轮叫调度。轮叫调度的工作方式是主进程接受连接,将其一次分发给工作进程。分发策略是在N个工作进程中,每次选择第i=(i+1) mod n
个进程来发送连接。在cluster
模块中启用它的方式:
1 2 3 4 cluster.schedulingPolicy =cluster.SCHED_RR export NODE_CLUSTER_SCHED_POLICY =none
状态共享 Node进程中不宜存放太多数据,同时Node进程之间也不支持共享数据。在实际业务中,往往需要共享一些数据,譬如配置数据。我们需要一种方案来实现数据在多个进程之间的共享。
第三方数据存储 数据共享最简单的方式就是通过第三方来进行数据存储,比如将数据存放到数据库、磁盘文件、Redis中。
主动通知 另一种方式是在数据发生变更时,主动通知子进程。即使是主动通知,也需要一种机制来及时获取数据的改变。
Cluster Node在v0.8版本就新增了cluster
模块,用以解决多核CPU利用率问题。
开头体到的创建Node进程集群,用cluster
也可以轻松实现:
1 2 3 4 5 6 7 8 9 10 11 const cluster = require ('cluster' )cluster.setupMaster ({ exec : "worker.js" }) var cpus = require ("os" ).cpus ()for (let index = 0 ; index < cpus.length ; index++) { cluster.fork () }
官方文档上实现前面子进程监听同一个端口的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 var cluster = require ("cluster" )var http = require ("http" )var { cpus } = require ("os" )var process = require ("process" )const numCPUs = cpus ().length ;if (cluster.isPrimary ) { console .log (`Primary ${process.pid} is running` ); for (let i = 0 ; i < numCPUs; i++) { cluster.fork (); } cluster.on ('exit' , (worker, code, signal ) => { console .log (`worker ${worker.process.pid} died` ); }); } else { http.createServer ((req, res ) => { res.writeHead (200 ); res.end ('hello world\n' ); }).listen (8000 ); console .log (`Worker ${process.pid} started` ); }
官方实例中通过判断cluster.isPrimary
来确定进程是主进程还是工作进程,对于代码的可读性十分差。我们尽量使用cluster.setupPrimary
这个API,将主进程和工作进程从代码分离:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 var cluster = require ("cluster" )var { cpus } = require ("os" )var process = require ("process" )const numCPUs = cpus ().length ;cluster.setupPrimary ({ exec : "worker.js" }) console .log (`Primary ${process.pid} is running` );for (let i = 0 ; i < numCPUs; i++) { cluster.fork (); } cluster.on ('exit' , (worker, code, signal ) => { console .log (`worker ${worker.process.pid} died` ); }); var http = require ("http" )var process = require ("process" )http.createServer ((req, res ) => { res.writeHead (200 ); res.end (`handle by process ${process.pid} \n` ); }).listen (8000 );
Cluster工作原理 事实上cluster
模块就是child_process
模块和net
模块的结合体,cluster
启动时,它会在内部启动TCP服务器,在cluster.fork
子进程时,将这个TCP服务端的socket的文件描述符发送给工作进程,如果进程是通过cluster.fork
复制出来的,那么它的环境变量里就存在NODE_UNIQUE_ID
,如果工作进程中存在端口的监听,它将拿到文件描述符,通过SO_REUSEADDR
端口重用,进而实现多个子进程共享端口。在cluster
模块中,一个主进程只能管理一组工作进程。
总结 尽管Node从单线程来说是非常脆弱的,不仅不能充分利用CPU资源,而且稳定性也有所欠缺。但是群体的力量是强大的,通过主从模式,可以将一个Node应用的质量提升一个档次,每个进程只做一件事,并做好这件事,将复杂分解为简单,将简单组合为强大。