pool-cluster.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. 'use strict';
  2. const PoolClusterOptions = require('./config/pool-cluster-options');
  3. const PoolOptions = require('./config/pool-options');
  4. const Pool = require('./pool-promise');
  5. const PoolCallback = require('./pool-callback');
  6. const FilteredPoolCluster = require('./filtered-pool-cluster');
  7. const EventEmitter = require('events');
  8. const util = require('util');
  /**
   * Create a new Cluster.
   * A cluster handles named pools ("nodes"). Connections are requested with a pattern that is
   * matched against the node identifiers, and the node actually used is picked by a selector
   * (round robin / random / ordered), with failover to another node when a connection fails.
   *
   * @param args cluster arguments. see pool-cluster-options.
   * @constructor
   */
  function PoolCluster(args) {
    const opts = new PoolClusterOptions(args);
    const nodes = {};
    // String(pattern) -> array of matching node identifiers. A Map is used (rather than a plain
    // object) so that patterns such as 'constructor' cannot collide with Object.prototype members.
    // The cached arrays also carry the round-robin cursor (`lastRrIdx`).
    const cachedPatterns = new Map();
    let nodeCounter = 0;

    EventEmitter.call(this);

    /**
     * Add a new pool node to cluster.
     *
     * @param id      identifier (optional: when not a string, it is taken as the configuration
     *                and an identifier 'PoolNode-<n>' is generated)
     * @param config  pool configuration
     * @throws Error when a node with the same identifier already exists
     */
    this.add = (id, config) => {
      let identifier;
      if (typeof id === 'string' || id instanceof String) {
        identifier = id;
        if (nodes[identifier])
          throw new Error("Node identifier '" + identifier + "' already exist !");
      } else {
        identifier = 'PoolNode-' + nodeCounter++;
        config = id;
      }

      const options = new PoolOptions(config);
      const pool = _createPool(options);
      pool.initialize();
      nodes[identifier] = pool;

      // Node set changed: previously resolved patterns (including cached empty results)
      // must be recomputed, otherwise the new node would never be selected.
      cachedPatterns.clear();
    };

    /**
     * End cluster (and underlying pools).
     *
     * @return {Promise<any[]>} resolved once every pool `end()` result has settled successfully
     */
    this.end = () => {
      cachedPatterns.clear();
      const poolEndPromise = Object.keys(nodes).map((key) => {
        const ending = nodes[key].end();
        delete nodes[key];
        return ending;
      });
      return Promise.all(poolEndPromise);
    };

    /**
     * Create a filtered view of this cluster bound to a pattern and a selector.
     *
     * @param pattern   pattern filter
     * @param selector  node selector ('RR','RANDOM' or 'ORDER')
     * @return {FilteredPoolCluster}
     */
    this.of = (pattern, selector) => {
      return new FilteredPoolCluster(this, pattern, selector);
    };

    /**
     * Remove nodes according to pattern. Matching pools are ended and forgotten.
     *
     * @param pattern  pattern
     * @throws Error when pattern is missing
     */
    this.remove = (pattern) => {
      if (!pattern) throw new Error('pattern parameter in Cluster.remove(pattern) is mandatory');

      const regex = RegExp(pattern);
      Object.keys(nodes).forEach((key) => {
        if (!regex.test(key)) return;

        const ending = nodes[key].end();
        // Best effort: when end() hands back a promise, dismiss its failure so that it cannot
        // surface as an unhandled rejection (same policy as automatic node removal).
        if (ending && typeof ending.catch === 'function') {
          ending.catch(() => {
            // dismiss error
          });
        }
        delete nodes[key];
        cachedPatterns.clear();
      });
    };

    /**
     * Get connection from available pools matching pattern, according to selector
     *
     * @param pattern   pattern filter (not mandatory)
     * @param selector  node selector ('RR','RANDOM' or 'ORDER')
     * @return {Promise}
     */
    this.getConnection = (pattern, selector) => {
      return _getConnection(this, pattern, selector);
    };

    /**
     * Force using callback methods: `getConnection` then takes (pattern, selector, callback)
     * and nodes added afterwards are created as callback pools.
     */
    this.setCallback = () => {
      this.getConnection = _getConnectionCallback.bind(this, this);
      _createPool = _createPoolCallback;
    };

    /**
     * Get connection from available pools matching pattern, according to selector
     * with additional parameter to avoid reusing failing node
     *
     * @param cluster       current cluster
     * @param pattern       pattern filter (not mandatory)
     * @param selector      node selector ('RR','RANDOM' or 'ORDER')
     * @param avoidNodeKey  failing node
     * @param lastError     last error
     * @return {Promise}
     * @private
     */
    const _getConnection = (cluster, pattern, selector, avoidNodeKey, lastError) => {
      const matchingNodeList = _matchingNodes(pattern || /^/);

      if (matchingNodeList.length === 0) {
        if (Object.keys(nodes).length === 0 && !lastError) {
          return Promise.reject(
            new Error(
              'No node have been added to cluster ' +
                'or nodes have been removed due to too much connection error'
            )
          );
        }
        if (avoidNodeKey === undefined)
          return Promise.reject(new Error("No node found for pattern '" + pattern + "'"));
        const errMsg =
          "No Connection available for '" +
          pattern +
          "'" +
          (lastError ? '. Last connection error was: ' + lastError.message : '');
        return Promise.reject(new Error(errMsg));
      }

      const retry = (failedNodeKey, failure) =>
        _getConnection(cluster, pattern, selector, failedNodeKey, failure);
      try {
        const nodeKey = _selectPool(matchingNodeList, selector, avoidNodeKey);
        return _handleConnectionError(cluster, matchingNodeList, nodeKey, retry);
      } catch (e) {
        // _selectPool throws on an unknown selector
        return Promise.reject(e);
      }
    };

    // Pool factory used by add(); replaced by _createPoolCallback once setCallback() is called.
    let _createPool = (options) => {
      return new Pool(options, false);
    };

    const _createPoolCallback = (options) => {
      return new PoolCallback(options, false);
    };

    /**
     * Get connection from available pools matching pattern, according to selector
     * with additional parameter to avoid reusing failing node.
     * The callback is invoked exactly once, either with an error or with a connection.
     *
     * @param cluster       current cluster
     * @param pattern       pattern filter (not mandatory)
     * @param selector      node selector ('RR','RANDOM' or 'ORDER')
     * @param callback      callback function
     * @param avoidNodeKey  failing node
     * @param lastError     last error
     * @private
     */
    const _getConnectionCallback = (
      cluster,
      pattern,
      selector,
      callback,
      avoidNodeKey,
      lastError
    ) => {
      const matchingNodeList = _matchingNodes(pattern || /^/);

      if (matchingNodeList.length === 0) {
        if (Object.keys(nodes).length === 0 && !lastError) {
          callback(
            new Error(
              'No node have been added to cluster ' +
                'or nodes have been removed due to too much connection error'
            )
          );
          return;
        }
        if (avoidNodeKey === undefined) {
          callback(new Error("No node found for pattern '" + pattern + "'"));
          // stop here: falling through would invoke the callback a second time
          return;
        }
        const errMsg =
          "No Connection available for '" +
          pattern +
          "'" +
          (lastError ? '. Last connection error was: ' + lastError.message : '');
        callback(new Error(errMsg));
        return;
      }

      const retry = (failedNodeKey, failure) =>
        _getConnectionCallback(cluster, pattern, selector, callback, failedNodeKey, failure);
      try {
        const nodeKey = _selectPool(matchingNodeList, selector, avoidNodeKey);
        _handleConnectionCallbackError(cluster, matchingNodeList, nodeKey, retry, callback);
      } catch (e) {
        callback(e);
      }
    };

    /**
     * Selecting nodes according to pattern. Results are cached per pattern until the
     * node set changes.
     *
     * @param pattern  pattern
     * @return {String[]} identifiers of the nodes matching the pattern
     * @private
     */
    const _matchingNodes = (pattern) => {
      const cacheKey = String(pattern);
      const cached = cachedPatterns.get(cacheKey);
      if (cached) return cached;

      const regex = RegExp(pattern);
      const matchingNodeList = Object.keys(nodes).filter((key) => regex.test(key));
      cachedPatterns.set(cacheKey, matchingNodeList);
      return matchingNodeList;
    };

    /**
     * Select next node to be chosen in nodeList according to selector and failed nodes.
     *
     * @param nodeList       current node list
     * @param selectorParam  selector (defaults to the cluster option `defaultSelector`)
     * @param avoidNodeKey   last failing node to avoid selecting this one.
     * @return {String} identifier of the selected node
     * @throws Error when the selector is not 'RR', 'RANDOM' or 'ORDER'
     * @private
     */
    const _selectPool = (nodeList, selectorParam, avoidNodeKey) => {
      const selector = selectorParam || opts.defaultSelector;
      let selectorFct;
      switch (selector) {
        case 'RR':
          selectorFct = roundRobinSelector;
          break;

        case 'RANDOM':
          selectorFct = randomSelector;
          break;

        case 'ORDER':
          selectorFct = orderedSelector;
          break;

        default:
          throw new Error(
            "Wrong selector value '" + selector + "'. Possible values are 'RR','RANDOM' or 'ORDER'"
          );
      }

      // At most nodeList.length picks are made; when every pick is the avoided node or a
      // blacklisted one, the last pick is returned anyway.
      let retry = 0;
      let nodeKey = selectorFct(nodeList, retry);
      while (
        (avoidNodeKey === nodeKey || nodes[nodeKey].blacklistedUntil > Date.now()) &&
        retry < nodeList.length - 1
      ) {
        retry++;
        nodeKey = selectorFct(nodeList, retry);
      }
      return nodeKey;
    };

    /**
     * Round robin selector: using nodes one after the other.
     * The cursor is stored on the node list itself (`lastRrIdx`).
     *
     * @param nodeList  node list
     * @return {String}
     */
    const roundRobinSelector = (nodeList) => {
      let lastRoundRobin = nodeList.lastRrIdx;
      if (lastRoundRobin === undefined) lastRoundRobin = -1;
      if (++lastRoundRobin >= nodeList.length) lastRoundRobin = 0;
      nodeList.lastRrIdx = lastRoundRobin;
      return nodeList[lastRoundRobin];
    };

    /**
     * Random selector: use a random node.
     *
     * @param nodeList  node list
     * @return {String}
     */
    const randomSelector = (nodeList) => {
      const randomIdx = Math.floor(Math.random() * nodeList.length);
      return nodeList[randomIdx];
    };

    /**
     * Ordered selector: always use the nodes in sequence, unless failing.
     *
     * @param nodeList  node list
     * @param retry     sequence number if last node is tagged as failing
     * @return {String}
     */
    const orderedSelector = (nodeList, retry) => {
      return nodeList[retry];
    };

    /**
     * Record a connection failure on a node: increment its error counter, blacklist it for
     * `restoreNodeTimeout` ms and, when `removeNodeErrorCount` is reached, drop it from the
     * cluster and emit a 'remove' event on next tick. Ending the pool is left to the caller.
     *
     * @param cluster   current cluster
     * @param nodeList  current node list
     * @param nodeKey   node name
     * @param node      pool of that node
     * @return {boolean} true when the node has been removed from the cluster
     * @private
     */
    const _registerNodeFailure = (cluster, nodeList, nodeKey, node) => {
      node.errorCount = node.errorCount ? node.errorCount + 1 : 1;
      node.blacklistedUntil = Date.now() + opts.restoreNodeTimeout;

      const mustRemove =
        opts.removeNodeErrorCount &&
        node.errorCount >= opts.removeNodeErrorCount &&
        nodes[nodeKey];
      if (!mustRemove) return false;

      delete nodes[nodeKey];
      cachedPatterns.clear();
      delete nodeList.lastRrIdx;
      process.nextTick(() => cluster.emit('remove', nodeKey));
      return true;
    };

    /**
     * Connect, or if fail handle retry / set timeout error
     *
     * @param cluster   current cluster
     * @param nodeList  current node list
     * @param nodeKey   node name to connect
     * @param retryFct  retry function
     * @return {Promise}
     * @private
     */
    const _handleConnectionError = (cluster, nodeList, nodeKey, retryFct) => {
      const node = nodes[nodeKey];
      return node
        .getConnection()
        .then((conn) => {
          node.errorCount = 0;
          return conn;
        })
        .catch((err) => {
          if (_registerNodeFailure(cluster, nodeList, nodeKey, node)) {
            //remove node from configuration if not already removed
            node.end().catch(() => {
              // dismiss error
            });
          }

          if (nodeList.length !== 0 && opts.canRetry) {
            return retryFct(nodeKey, err);
          }
          return Promise.reject(err);
        });
    };

    /**
     * Connect, or if fail handle retry / set timeout error
     *
     * @param cluster   current cluster
     * @param nodeList  current node list
     * @param nodeKey   node name to connect
     * @param retryFct  retry function
     * @param callback  callback function
     * @private
     */
    const _handleConnectionCallbackError = (cluster, nodeList, nodeKey, retryFct, callback) => {
      const node = nodes[nodeKey];
      node.getConnection((err, conn) => {
        if (!err) {
          node.errorCount = 0;
          callback(null, conn);
          return;
        }

        if (_registerNodeFailure(cluster, nodeList, nodeKey, node)) {
          //remove node from configuration if not already removed
          node.end(() => {
            //dismiss error
          });
          if (nodeList.length === 0) {
            // callback API: report through the callback, never through a rejected promise
            callback(err);
            return;
          }
        }

        if (opts.canRetry) {
          retryFct(nodeKey, err);
          return;
        }
        callback(err);
      });
    };

    //*****************************************************************
    // internal public testing methods
    //*****************************************************************

    function TestMethods() {}

    TestMethods.prototype.getNodes = () => {
      return nodes;
    };

    this.__tests = new TestMethods();
  }
  // PoolCluster inherits from EventEmitter (its constructor calls EventEmitter.call(this) and
  // emits a 'remove' event through cluster.emit when a node is dropped); the constructor is
  // the module's single export.
  367. util.inherits(PoolCluster, EventEmitter);
  368. module.exports = PoolCluster;