123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- var async = require('./async.js');
- module.exports = {
- iterator: wrapIterator,
- callback: wrapCallback
- };
- function wrapIterator(iterator)
- {
- var stream = this;
- return function(item, key, cb)
- {
- var aborter
- , wrappedCb = async(wrapIteratorCallback.call(stream, cb, key))
- ;
- stream.jobs[key] = wrappedCb;
-
- if (iterator.length == 2)
- {
- aborter = iterator(item, wrappedCb);
- }
-
- else
- {
- aborter = iterator(item, key, wrappedCb);
- }
- return aborter;
- };
- }
- function wrapCallback(callback)
- {
- var stream = this;
- var wrapped = function(error, result)
- {
- return finisher.call(stream, error, result, callback);
- };
- return wrapped;
- }
- function wrapIteratorCallback(callback, key)
- {
- var stream = this;
- return function(error, output)
- {
-
- if (!(key in stream.jobs))
- {
- callback(error, output);
- return;
- }
-
- delete stream.jobs[key];
- return streamer.call(stream, error, {key: key, value: output}, callback);
- };
- }
- function streamer(error, output, callback)
- {
- if (error && !this.error)
- {
- this.error = error;
- this.pause();
- this.emit('error', error);
-
- callback(error, output && output.value);
- return;
- }
-
- this.push(output);
-
-
- callback(error, output && output.value);
- }
- function finisher(error, output, callback)
- {
-
-
- if (!error)
- {
- this.push(null);
- }
-
- callback(error, output);
- }
|