asynchronize.js 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. function openPromise(){
  2. let resolve, reject;
  3. let np = new Promise((ok, fail) => {resolve = ok; reject = fail});
  4. np.resolve = resolve;
  5. np.reject = reject;
  6. return np
  7. }
  8. function asynchronize({s, chunkEventName, endEventName, errEventName, countMethodName}){
  9. return function* (){
  10. const chunks = {};
  11. const promises = {};
  12. const clear = i => (delete chunks[i], delete promises[i])
  13. let chunkCount = 0;
  14. let promiseCount = 0;
  15. let end = false;
  16. if (!('on' in s)){ //no on method in browser
  17. s.on = function(eventName, callback){ //polyfill
  18. this['on' + eventName] = callback;
  19. }
  20. }
  21. //check availability of chunk and promise. If any, resolve promise, and clear both from queue
  22. const chunkAndPromise = i => (i in chunks) &&
  23. (i in promises) && (
  24. promises[i].resolve(chunks[i]),
  25. clear(i))
  26. s.on(chunkEventName, data => {
  27. chunks[chunkCount] = data
  28. chunkAndPromise(chunkCount)
  29. chunkCount++
  30. })
  31. s.on(endEventName, () => {
  32. end = true;
  33. closeAllEmptyPromises()
  34. //console.log('END OF STREAM')
  35. })
  36. if (errEventName)
  37. s.on(errEventName, () => {
  38. end = true;
  39. closeAllEmptyPromises()
  40. //console.log('ERR OF STREAM')
  41. })
  42. const closeAllEmptyPromises = () => {
  43. for (let i in promises){ //when end and chunks are exhausted
  44. promises[i].reject(new Error('End Of S')) //reject all left promises
  45. }
  46. }
  47. if (countMethodName){
  48. let count = s[countMethodName](true)
  49. const checker = count =>
  50. count <= 0 &&
  51. (end = true ,
  52. closeAllEmptyPromises()
  53. /*, console.log(`COUNT ${count}`)*/ )
  54. if (count.then && typeof count.then === 'function')
  55. count.then(checker)
  56. else
  57. checker(count)
  58. }
  59. while (!end || Object.keys(chunks).length){
  60. let p;
  61. promises[promiseCount] = p = openPromise();
  62. chunkAndPromise(promiseCount)
  63. promiseCount++;
  64. yield p; //yield promise outside
  65. }
  66. //closeAllEmptyPromises()
  67. }
  68. }
  69. module.exports = {openPromise, asynchronize}