index.js 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. const express = require('express');
  2. const http = require( 'http');
  3. const WebSocket = require('ws');
  4. const app = express();
  5. //initialize a simple http server
  6. const server = http.createServer(app);
  7. //initialize the WebSocket server instance
  8. const wss = new WebSocket.Server({ server });
  9. const messages = [];
  10. var sockets = [];
  11. /**
  12. * openPromise returns promise with resolve and reject callbacks available in object
  13. *
  14. * @access public
  15. * @return promise
  16. */
  17. function openPromise(){
  18. let resolve, reject;
  19. let np = new Promise((ok, fail) => {resolve = ok; reject = fail});
  20. np.resolve = resolve;
  21. np.reject = reject;
  22. return np
  23. }
  24. /**
  25. * asynchronize returns generator of promises of chunks or messages
  26. *
  27. * @param s - Stream or Socket to generate promises
  28. * @param chunkEventName="message" stream event for data chunk or message
  29. * Use 'message' for ws or `data` for node Streams
  30. * @param endEventName="close" stream event for end/close of stream/socket
  31. * @access public
  32. * @return function*
  33. */
  34. function asynchronize({s, chunkEventName="message", endEventName="close"}){
  35. return function*(){
  36. //buffer of chunks incoming from stream
  37. const chunks = {};
  38. //buffer of promises yielding outside
  39. const promises = {};
  40. const clear = i => (delete chunks[i], delete promises[i])
  41. let chunkCount = 0; //autoincrement of chunks
  42. let promiseCount = 0; //autoincrement of promises
  43. let end = false; //flag
  44. //check availability of chunk and promise. If any, resolve promise, and clear both from queue
  45. const chunkAndPromise = i => (i in chunks) &&
  46. (i in promises) && (
  47. promises[i].resolve(chunks[i]),
  48. clear(i))
  49. s.on(chunkEventName, data => {
  50. chunks[chunkCount] = data
  51. chunkAndPromise(chunkCount)
  52. chunkCount++
  53. })
  54. //when end of stream becomes
  55. s.on(endEventName, () => {
  56. end = true;
  57. //all pending promises should be rejected
  58. for (let i in promises){
  59. promises[i].reject(new Error('End Of S'))
  60. }
  61. })
  62. while (!end){
  63. let p;
  64. promises[promiseCount] = p = openPromise();
  65. chunkAndPromise(promiseCount)
  66. promiseCount++;
  67. yield p; //yield promise outside
  68. }
  69. }
  70. }
  71. /**
  72. * broadcast sends same message to socket list
  73. *
  74. * @param data message to send
  75. * @access public
  76. * @return undefined
  77. */
  78. function broadcast(data){
  79. for (s of sockets){
  80. console.log(data)
  81. s.send(JSON.stringify(data))
  82. }
  83. }
  84. /**
  85. * Collection of remote available functions
  86. */
  87. let RPC = {
  88. /**
  89. * addMessage - saves message in history, broadcast it then returns length of history
  90. */
  91. addMessage({nick, message}){
  92. messages.push({nick, message, timestamp: (new Date).getTime()})
  93. broadcast({func: 'addMessage', nick, message})
  94. return messages.length;
  95. },
  96. /**
  97. * getMessages - returns messages younger than offset
  98. */
  99. getMessages({offset=0}){
  100. return messages.slice(offset)
  101. },
  102. /**
  103. * getUserCount - returns length of socket array (current online users)
  104. */
  105. getUserCount(){
  106. return sockets.length;
  107. }
  108. }
  109. wss.on('connection', async ws => {
  110. let gena = asynchronize({s:ws})
  111. sockets.push(ws)
  112. //broadcast new usercount after connect
  113. broadcast({func:'getUserCount', value: sockets.length})
  114. //main loop, iterating incoming messages
  115. for (let p of gena()){
  116. try {
  117. let message = await p;
  118. let data = JSON.parse(message)
  119. if (data.func in RPC){
  120. //run function from RPC if any, and send to socket result of it
  121. ws.send(JSON.stringify(RPC[data.func](data)))
  122. }
  123. }
  124. catch(e){
  125. //catching known rejection
  126. if (e.message === 'End Of S'){
  127. console.log('client OTPAL')
  128. //break loop when connection closed
  129. break;
  130. }
  131. // otherwise print error, but it should be throw e there
  132. console.log(e)
  133. }
  134. }
  135. //clear closed socket from list of sockets
  136. sockets = sockets.filter(s => s !== ws)
  137. //broadcast new usercount after disconnect
  138. broadcast({func:'getUserCount', value: sockets.length})
  139. });
  140. app.use(express.static('public'));
  141. //start our server
  142. server.listen(process.env.PORT || 8999, () => {
  143. console.log(`Server started on port ${server.address().port} :)`);
  144. });