farm.js 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. 'use strict'
  2. const DEFAULT_OPTIONS = {
  3. workerOptions : {}
  4. , maxCallsPerWorker : Infinity
  5. , maxConcurrentWorkers : (require('os').cpus() || { length: 1 }).length
  6. , maxConcurrentCallsPerWorker : 10
  7. , maxConcurrentCalls : Infinity
  8. , maxCallTime : Infinity // exceed this and the whole worker is terminated
  9. , maxRetries : Infinity
  10. , forcedKillTime : 100
  11. , autoStart : false
  12. , onChild : function() {}
  13. }
  14. const fork = require('./fork')
  15. , TimeoutError = require('errno').create('TimeoutError')
  16. , ProcessTerminatedError = require('errno').create('ProcessTerminatedError')
  17. , MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError')
  18. function Farm (options, path) {
  19. this.options = Object.assign({}, DEFAULT_OPTIONS, options)
  20. this.path = path
  21. this.activeCalls = 0
  22. }
  23. // make a handle to pass back in the form of an external API
  24. Farm.prototype.mkhandle = function (method) {
  25. return function () {
  26. let args = Array.prototype.slice.call(arguments)
  27. if (this.activeCalls + this.callQueue.length >= this.options.maxConcurrentCalls) {
  28. let err = new MaxConcurrentCallsError('Too many concurrent calls (active: ' + this.activeCalls + ', queued: ' + this.callQueue.length + ')')
  29. if (typeof args[args.length - 1] == 'function')
  30. return process.nextTick(args[args.length - 1].bind(null, err))
  31. throw err
  32. }
  33. this.addCall({
  34. method : method
  35. , callback : args.pop()
  36. , args : args
  37. , retries : 0
  38. })
  39. }.bind(this)
  40. }
  41. // a constructor of sorts
  42. Farm.prototype.setup = function (methods) {
  43. let iface
  44. if (!methods) { // single-function export
  45. iface = this.mkhandle()
  46. } else { // multiple functions on the export
  47. iface = {}
  48. methods.forEach(function (m) {
  49. iface[m] = this.mkhandle(m)
  50. }.bind(this))
  51. }
  52. this.searchStart = -1
  53. this.childId = -1
  54. this.children = {}
  55. this.activeChildren = 0
  56. this.callQueue = []
  57. if (this.options.autoStart) {
  58. while (this.activeChildren < this.options.maxConcurrentWorkers)
  59. this.startChild()
  60. }
  61. return iface
  62. }
  63. // when a child exits, check if there are any outstanding jobs and requeue them
  64. Farm.prototype.onExit = function (childId) {
  65. // delay this to give any sends a chance to finish
  66. setTimeout(function () {
  67. let doQueue = false
  68. if (this.children[childId] && this.children[childId].activeCalls) {
  69. this.children[childId].calls.forEach(function (call, i) {
  70. if (!call) return
  71. else if (call.retries >= this.options.maxRetries) {
  72. this.receive({
  73. idx : i
  74. , child : childId
  75. , args : [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ]
  76. })
  77. } else {
  78. call.retries++
  79. this.callQueue.unshift(call)
  80. doQueue = true
  81. }
  82. }.bind(this))
  83. }
  84. this.stopChild(childId)
  85. doQueue && this.processQueue()
  86. }.bind(this), 10)
  87. }
  88. // start a new worker
  89. Farm.prototype.startChild = function () {
  90. this.childId++
  91. let forked = fork(this.path, this.options.workerOptions)
  92. , id = this.childId
  93. , c = {
  94. send : forked.send
  95. , child : forked.child
  96. , calls : []
  97. , activeCalls : 0
  98. , exitCode : null
  99. }
  100. this.options.onChild(forked.child);
  101. forked.child.on('message', function(data) {
  102. if (data.owner !== 'farm') {
  103. return;
  104. }
  105. this.receive(data);
  106. }.bind(this))
  107. forked.child.once('exit', function (code) {
  108. c.exitCode = code
  109. this.onExit(id)
  110. }.bind(this))
  111. this.activeChildren++
  112. this.children[id] = c
  113. }
  114. // stop a worker, identified by id
  115. Farm.prototype.stopChild = function (childId) {
  116. let child = this.children[childId]
  117. if (child) {
  118. child.send({owner: 'farm', event: 'die'})
  119. setTimeout(function () {
  120. if (child.exitCode === null)
  121. child.child.kill('SIGKILL')
  122. }, this.options.forcedKillTime).unref()
  123. ;delete this.children[childId]
  124. this.activeChildren--
  125. }
  126. }
  127. // called from a child process, the data contains information needed to
  128. // look up the child and the original call so we can invoke the callback
  129. Farm.prototype.receive = function (data) {
  130. let idx = data.idx
  131. , childId = data.child
  132. , args = data.args
  133. , child = this.children[childId]
  134. , call
  135. if (!child) {
  136. return console.error(
  137. 'Worker Farm: Received message for unknown child. '
  138. + 'This is likely as a result of premature child death, '
  139. + 'the operation will have been re-queued.'
  140. )
  141. }
  142. call = child.calls[idx]
  143. if (!call) {
  144. return console.error(
  145. 'Worker Farm: Received message for unknown index for existing child. '
  146. + 'This should not happen!'
  147. )
  148. }
  149. if (this.options.maxCallTime !== Infinity)
  150. clearTimeout(call.timer)
  151. if (args[0] && args[0].$error == '$error') {
  152. let e = args[0]
  153. switch (e.type) {
  154. case 'TypeError': args[0] = new TypeError(e.message); break
  155. case 'RangeError': args[0] = new RangeError(e.message); break
  156. case 'EvalError': args[0] = new EvalError(e.message); break
  157. case 'ReferenceError': args[0] = new ReferenceError(e.message); break
  158. case 'SyntaxError': args[0] = new SyntaxError(e.message); break
  159. case 'URIError': args[0] = new URIError(e.message); break
  160. default: args[0] = new Error(e.message)
  161. }
  162. args[0].type = e.type
  163. args[0].stack = e.stack
  164. // Copy any custom properties to pass it on.
  165. Object.keys(e).forEach(function(key) {
  166. args[0][key] = e[key];
  167. });
  168. }
  169. process.nextTick(function () {
  170. call.callback.apply(null, args)
  171. })
  172. ;delete child.calls[idx]
  173. child.activeCalls--
  174. this.activeCalls--
  175. if (child.calls.length >= this.options.maxCallsPerWorker
  176. && !Object.keys(child.calls).length) {
  177. // this child has finished its run, kill it
  178. this.stopChild(childId)
  179. }
  180. // allow any outstanding calls to be processed
  181. this.processQueue()
  182. }
  183. Farm.prototype.childTimeout = function (childId) {
  184. let child = this.children[childId]
  185. , i
  186. if (!child)
  187. return
  188. for (i in child.calls) {
  189. this.receive({
  190. idx : i
  191. , child : childId
  192. , args : [ new TimeoutError('worker call timed out!') ]
  193. })
  194. }
  195. this.stopChild(childId)
  196. }
  197. // send a call to a worker, identified by id
  198. Farm.prototype.send = function (childId, call) {
  199. let child = this.children[childId]
  200. , idx = child.calls.length
  201. child.calls.push(call)
  202. child.activeCalls++
  203. this.activeCalls++
  204. child.send({
  205. owner : 'farm'
  206. , idx : idx
  207. , child : childId
  208. , method : call.method
  209. , args : call.args
  210. })
  211. if (this.options.maxCallTime !== Infinity) {
  212. call.timer =
  213. setTimeout(this.childTimeout.bind(this, childId), this.options.maxCallTime)
  214. }
  215. }
  216. // a list of active worker ids, in order, but the starting offset is
  217. // shifted each time this method is called, so we work our way through
  218. // all workers when handing out jobs
  219. Farm.prototype.childKeys = function () {
  220. let cka = Object.keys(this.children)
  221. , cks
  222. if (this.searchStart >= cka.length - 1)
  223. this.searchStart = 0
  224. else
  225. this.searchStart++
  226. cks = cka.splice(0, this.searchStart)
  227. return cka.concat(cks)
  228. }
  229. // Calls are added to a queue, this processes the queue and is called
  230. // whenever there might be a chance to send more calls to the workers.
  231. // The various options all impact on when we're able to send calls,
  232. // they may need to be kept in a queue until a worker is ready.
  233. Farm.prototype.processQueue = function () {
  234. let cka, i = 0, childId
  235. if (!this.callQueue.length)
  236. return this.ending && this.end()
  237. if (this.activeChildren < this.options.maxConcurrentWorkers)
  238. this.startChild()
  239. for (cka = this.childKeys(); i < cka.length; i++) {
  240. childId = +cka[i]
  241. if (this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker
  242. && this.children[childId].calls.length < this.options.maxCallsPerWorker) {
  243. this.send(childId, this.callQueue.shift())
  244. if (!this.callQueue.length)
  245. return this.ending && this.end()
  246. } /*else {
  247. console.log(
  248. , this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker
  249. , this.children[childId].calls.length < this.options.maxCallsPerWorker
  250. , this.children[childId].calls.length , this.options.maxCallsPerWorker)
  251. }*/
  252. }
  253. if (this.ending)
  254. this.end()
  255. }
  256. // add a new call to the call queue, then trigger a process of the queue
  257. Farm.prototype.addCall = function (call) {
  258. if (this.ending)
  259. return this.end() // don't add anything new to the queue
  260. this.callQueue.push(call)
  261. this.processQueue()
  262. }
  263. // kills child workers when they're all done
  264. Farm.prototype.end = function (callback) {
  265. let complete = true
  266. if (this.ending === false)
  267. return
  268. if (callback)
  269. this.ending = callback
  270. else if (this.ending == null)
  271. this.ending = true
  272. Object.keys(this.children).forEach(function (child) {
  273. if (!this.children[child])
  274. return
  275. if (!this.children[child].activeCalls)
  276. this.stopChild(child)
  277. else
  278. complete = false
  279. }.bind(this))
  280. if (complete && typeof this.ending == 'function') {
  281. process.nextTick(function () {
  282. this.ending()
  283. this.ending = false
  284. }.bind(this))
  285. }
  286. }
  287. module.exports = Farm
  288. module.exports.TimeoutError = TimeoutError