1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
/**
 * Create a promise that can be settled from outside its executor.
 *
 * The executor's own `resolve` and `reject` functions are captured and
 * attached to the promise object itself, so whoever holds the promise can
 * settle it later.
 *
 * @returns {Promise} A pending promise carrying two extra properties:
 *   `resolve(value)` fulfils it and `reject(reason)` rejects it.
 */
function openPromise(){
    // The Promise executor runs synchronously, so both bindings are
    // assigned before the constructor returns.
    let resolve;
    let reject;
    const np = new Promise((ok, fail) => {
        resolve = ok;
        reject = fail;
    });
    np.resolve = resolve;
    np.reject = reject;
    return np;
}
/**
 * Adapt an event-emitting source into a generator of promises.
 *
 * The returned generator function, once called and iterated, yields one
 * promise (created with `openPromise`) per `next()` call. The n-th yielded
 * promise resolves with the n-th chunk delivered by the chunk event. When the
 * source ends, errors, or reports a count of zero or less, every promise that
 * has no chunk is rejected with `new Error('End Of S')`; iteration finishes
 * once the end has been seen and no unconsumed chunk remains.
 *
 * Listeners are attached when the generator body first runs (on the first
 * `next()` call), not when `asynchronize` itself is called.
 *
 * @param {Object} options
 * @param {Object} options.s - Event source. If it has no `on` member, one is
 *   installed on it that stores the callback as `s['on' + eventName]`.
 * @param {string} options.chunkEventName - Event whose argument is a chunk.
 * @param {string} options.endEventName - Event that marks the end.
 * @param {string} [options.errEventName] - Event that marks a failure. Its
 *   argument, when present, is exposed as `cause` on the rejection errors.
 * @param {string} [options.countMethodName] - Name of a method on `s` that is
 *   called with `true` and returns a count or a thenable for one. A count
 *   `<= 0` ends the stream immediately; a `null`/`undefined` count is ignored.
 * @returns {GeneratorFunction} Generator function yielding promises of chunks.
 */
function asynchronize({s, chunkEventName, endEventName, errEventName, countMethodName}){
    return function* (){
        // Chunks and promises are paired by their arrival index.
        const chunks = new Map();
        const promises = new Map();
        let chunkCount = 0;
        let promiseCount = 0;
        let end = false;
        let failure; // argument of the error event, if one was received

        if (!('on' in s)){ // no on method in browser
            s.on = function(eventName, callback){ // polyfill
                this['on' + eventName] = callback;
            };
        }

        // If both a chunk and a promise exist for index i, resolve the
        // promise with the chunk and drop both from their queues.
        const chunkAndPromise = i => {
            if (!chunks.has(i) || !promises.has(i)){
                return;
            }
            promises.get(i).resolve(chunks.get(i));
            chunks.delete(i);
            promises.delete(i);
        };

        // Reject every promise still waiting for a chunk. Declared before any
        // listener is registered so a handler can never run ahead of it.
        // Entries stay queued: rejecting an already-settled promise is a
        // no-op, and a chunk arriving later still pairs off with its index.
        const closeAllEmptyPromises = () => {
            for (const waiting of promises.values()){
                const endError = new Error('End Of S');
                if (failure !== undefined){
                    endError.cause = failure;
                }
                waiting.reject(endError);
            }
        };

        s.on(chunkEventName, data => {
            chunks.set(chunkCount, data);
            chunkAndPromise(chunkCount);
            chunkCount++;
        });

        s.on(endEventName, () => {
            end = true;
            closeAllEmptyPromises();
        });

        if (errEventName){
            s.on(errEventName, err => {
                failure = err;
                end = true;
                closeAllEmptyPromises();
            });
        }

        if (countMethodName){
            const count = s[countMethodName](true);
            // Only a known count of zero or less ends the stream early.
            const checker = total => {
                if (total != null && total <= 0){
                    end = true;
                    closeAllEmptyPromises();
                }
            };
            if (count != null && typeof count.then === 'function'){
                // NOTE(review): a rejected count is not handled here, as before.
                count.then(checker);
            } else {
                checker(count);
            }
        }

        // Keep handing out promises until the end was seen and every
        // buffered chunk has been consumed.
        while (!end || chunks.size > 0){
            const p = openPromise();
            promises.set(promiseCount, p);
            chunkAndPromise(promiseCount);
            promiseCount++;
            yield p; // yield promise outside
        }
    };
}
- module.exports = {openPromise, asynchronize}
|