async-queue.js 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. 'use strict';
  2. const BaseError = require('../../errors/base-error');
  3. const ConnectionError = require('../../errors/connection-error');
  /**
   * Error describing a database connection that was closed while an
   * operation was still in progress.
   */
  class AsyncQueueError extends BaseError {
    /**
     * @param {*} message - Description of the failure; forwarded unchanged to the BaseError constructor.
     */
    constructor(message) {
      super(message);

      // Assigned after super() so this value wins over any name the base class set.
      this.name = 'SequelizeAsyncQueueError';
    }
  }
  13. exports.AsyncQueueError = AsyncQueueError;
  /**
   * Runs asynchronous tasks strictly one after another: a task is started only
   * once every previously enqueued task has settled.
   */
  class AsyncQueue {
    constructor() {
      // Tail of the task chain. It must never become a rejected promise,
      // otherwise tasks chained after it would never be started.
      this.previous = Promise.resolve();
      // Set by close(); tasks that have not started yet are rejected once this is true.
      this.closed = false;
      // Rejects the promise of the task that started most recently (no-op until a task starts).
      this.rejectCurrent = () => {};
    }

    /**
     * Marks the queue as closed and rejects the promise of the most recently
     * started task. Rejecting a promise that has already settled has no effect.
     */
    close() {
      this.closed = true;
      this.rejectCurrent(new ConnectionError(new AsyncQueueError('the connection was closed before this query could finish executing')));
    }

    /**
     * Schedules a task to run after all previously enqueued tasks have settled.
     *
     * @param {Function} asyncFunction - Task to run; called with no arguments. It may
     *   return a promise, a thenable or a plain value, and it may throw synchronously.
     * @returns {Promise} Settles with the outcome of `asyncFunction`, or rejects with a
     *   ConnectionError when the queue is closed before the task starts or finishes.
     */
    enqueue(asyncFunction) {
      // This outer promise might seem superfluous since the task's own promise is chained
      // below. However, it ensures that this.previous will never be a rejected promise, so
      // the queue always keeps going while rejections are still communicated to the caller.
      return new Promise((resolve, reject) => {
        this.previous = this.previous.then(() => {
          this.rejectCurrent = reject;

          if (this.closed) {
            return reject(new ConnectionError(new AsyncQueueError('the connection was closed before this query could be executed')));
          }

          let outcome;
          try {
            outcome = asyncFunction();
          } catch (error) {
            // A synchronous throw must not escape this callback: it would turn
            // this.previous into a rejected promise and stall every later task,
            // and the caller's promise would never settle.
            return reject(error);
          }

          // Promise.resolve also accepts plain values and foreign thenables,
          // so a task that does not return a native promise cannot break the chain.
          return Promise.resolve(outcome).then(resolve, reject);
        });
      });
    }
  }
  41. exports.default = AsyncQueue;