index.js 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. 'use strict';
  2. const Observable = require('any-observable');
  3. function or(option, alternate, required) {
  4. const result = option === false ? false : option || alternate;
  5. if ((required && !result) || (result && typeof result !== 'string')) {
  6. throw new TypeError(alternate + 'Event must be a string.');
  7. }
  8. return result;
  9. }
  10. module.exports = (stream, opts) => {
  11. opts = opts || {};
  12. let complete = false;
  13. let dataListeners = [];
  14. const awaited = opts.await;
  15. const dataEvent = or(opts.dataEvent, 'data', true);
  16. const errorEvent = or(opts.errorEvent, 'error');
  17. const endEvent = or(opts.endEvent, 'end');
  18. function cleanup() {
  19. complete = true;
  20. dataListeners.forEach(listener => {
  21. stream.removeListener(dataEvent, listener);
  22. });
  23. dataListeners = null;
  24. }
  25. const completion = new Promise((resolve, reject) => {
  26. function onEnd(result) {
  27. if (awaited) {
  28. awaited.then(resolve);
  29. } else {
  30. resolve(result);
  31. }
  32. }
  33. if (endEvent) {
  34. stream.once(endEvent, onEnd);
  35. } else if (awaited) {
  36. onEnd();
  37. }
  38. if (errorEvent) {
  39. stream.once(errorEvent, reject);
  40. }
  41. if (awaited) {
  42. awaited.catch(reject);
  43. }
  44. }).catch(err => {
  45. cleanup();
  46. throw err;
  47. }).then(result => {
  48. cleanup();
  49. return result;
  50. });
  51. return new Observable(observer => {
  52. completion
  53. .then(observer.complete.bind(observer))
  54. .catch(observer.error.bind(observer));
  55. if (complete) {
  56. return null;
  57. }
  58. const onData = data => {
  59. observer.next(data);
  60. };
  61. stream.on(dataEvent, onData);
  62. dataListeners.push(onData);
  63. return () => {
  64. stream.removeListener(dataEvent, onData);
  65. if (complete) {
  66. return;
  67. }
  68. const idx = dataListeners.indexOf(onData);
  69. if (idx !== -1) {
  70. dataListeners.splice(idx, 1);
  71. }
  72. };
  73. });
  74. };