1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- 'use strict';
- const Observable = require('any-observable');
/**
 * Picks the event name to use for one of the configurable stream events.
 *
 * An explicit `false` disables the event (and is returned as-is); any other
 * falsy value falls back to `alternate`.
 *
 * @param {string|boolean|undefined} option - Caller-supplied event name, or `false` to disable.
 * @param {string} alternate - Default event name; also used as the prefix of the error message.
 * @param {boolean} [required] - When truthy, a disabled/empty result is rejected.
 * @returns {string|boolean} The chosen event name, or `false` when the event is disabled.
 * @throws {TypeError} If the event is required but resolves to a falsy value, or if the
 *   resolved value is truthy but not a string.
 */
function or(option, alternate, required) {
  const result = option === false ? false : option || alternate;

  if ((required && !result) || (result && typeof result !== 'string')) {
    throw new TypeError(alternate + 'Event must be a string.');
  }

  return result;
}
- module.exports = (stream, opts) => {
- opts = opts || {};
- let complete = false;
- let dataListeners = [];
- const awaited = opts.await;
- const dataEvent = or(opts.dataEvent, 'data', true);
- const errorEvent = or(opts.errorEvent, 'error');
- const endEvent = or(opts.endEvent, 'end');
- function cleanup() {
- complete = true;
- dataListeners.forEach(listener => {
- stream.removeListener(dataEvent, listener);
- });
- dataListeners = null;
- }
- const completion = new Promise((resolve, reject) => {
- function onEnd(result) {
- if (awaited) {
- awaited.then(resolve);
- } else {
- resolve(result);
- }
- }
- if (endEvent) {
- stream.once(endEvent, onEnd);
- } else if (awaited) {
- onEnd();
- }
- if (errorEvent) {
- stream.once(errorEvent, reject);
- }
- if (awaited) {
- awaited.catch(reject);
- }
- }).catch(err => {
- cleanup();
- throw err;
- }).then(result => {
- cleanup();
- return result;
- });
- return new Observable(observer => {
- completion
- .then(observer.complete.bind(observer))
- .catch(observer.error.bind(observer));
- if (complete) {
- return null;
- }
- const onData = data => {
- observer.next(data);
- };
- stream.on(dataEvent, onData);
- dataListeners.push(onData);
- return () => {
- stream.removeListener(dataEvent, onData);
- if (complete) {
- return;
- }
- const idx = dataListeners.indexOf(onData);
- if (idx !== -1) {
- dataListeners.splice(idx, 1);
- }
- };
- });
- };
|