// index.js (4.7 KB)

  const express = require('express');
  const http = require('http');
  const WebSocket = require('ws');
  const cors = require('cors');

  const app = express();

  // Plain HTTP server wrapping the Express app.
  const server = http.createServer(app);

  // WebSocket server instance attached to that same HTTP server.
  const wss = new WebSocket.Server({ server });

  // Chat history; entries are appended by RPC.addMessage.
  const messages = [];

  // Sockets of currently connected clients. The binding is reassigned
  // (filtered) when a client disconnects, so it cannot be `const`.
  let sockets = [];
  /**
   * openPromise creates a pending promise whose settle callbacks are exposed
   * as own properties of the promise object, so code outside the executor can
   * resolve or reject it later.
   *
   * @access public
   * @return {Promise} promise with extra `resolve(value)` and `reject(reason)` methods
   */
  function openPromise() {
    let resolve;
    let reject;
    // The executor runs synchronously, so both callbacks are captured before
    // they are attached below.
    const np = new Promise((ok, fail) => {
      resolve = ok;
      reject = fail;
    });
    np.resolve = resolve;
    np.reject = reject;
    return np;
  }
  /**
   * asynchronize turns an event source into a generator of promises, one
   * promise per incoming chunk/message, delivered in arrival order.
   *
   * Listeners are attached when the returned generator is first advanced.
   * Chunks that arrive before they are requested are buffered; promises that
   * are requested before their chunk arrives stay pending until it does.
   * When the end event fires, every still-pending promise is rejected with
   * `Error('End Of S')`. Chunks that were buffered before the end event are
   * still handed out before the generator finishes.
   *
   * @param s - Stream or Socket to generate promises from (anything with `.on(event, listener)`)
   * @param chunkEventName="message" stream event for data chunk or message.
   *   Use 'message' for ws or `data` for node Streams
   * @param endEventName="close" stream event for end/close of stream/socket
   * @access public
   * @return function* generator function yielding one promise per chunk
   */
  function asynchronize({s, chunkEventName = "message", endEventName = "close"}) {
    return function* () {
      // Chunks received but not yet handed to a promise, keyed by sequence number.
      const chunks = new Map();
      // Promises yielded but not yet settled, keyed by sequence number.
      const promises = new Map();
      let chunkCount = 0; // sequence number of the next incoming chunk
      let promiseCount = 0; // sequence number of the next yielded promise
      let end = false; // set once the end event has fired

      // When both the chunk and the promise with sequence number i exist,
      // resolve the promise with the chunk and drop both from the queues.
      const chunkAndPromise = i => {
        if (!chunks.has(i) || !promises.has(i)) {
          return;
        }
        promises.get(i).resolve(chunks.get(i));
        chunks.delete(i);
        promises.delete(i);
      };

      s.on(chunkEventName, data => {
        chunks.set(chunkCount, data);
        chunkAndPromise(chunkCount);
        chunkCount++;
      });

      s.on(endEventName, () => {
        end = true;
        // Nothing more will arrive: reject everything that is still waiting.
        for (const pending of promises.values()) {
          pending.reject(new Error('End Of S'));
        }
        promises.clear();
      });

      // Keep going after the end event while buffered chunks remain, so that
      // data received just before the close is not lost.
      while (!end || promiseCount < chunkCount) {
        const p = openPromise();
        promises.set(promiseCount, p);
        chunkAndPromise(promiseCount);
        promiseCount++;
        yield p; // hand the promise to the consumer
      }
    };
  }
  /**
   * broadcast sends the same message, JSON-encoded, to every socket in the
   * module-level `sockets` list, logging the message once per recipient.
   *
   * @param data message to send (must be JSON-serialisable)
   * @access public
   * @return undefined
   */
  function broadcast(data) {
    // Serialise once; every recipient gets the identical payload.
    const payload = JSON.stringify(data);
    // Loop variable is declared locally so no global is created.
    for (const socket of sockets) {
      console.log(data);
      socket.send(payload);
    }
  }
  /**
   * Collection of remotely callable functions, looked up by the `func` field
   * of an incoming message.
   *
   * The table has no prototype, so names inherited from Object.prototype
   * (`constructor`, `toString`, `__proto__`, ...) are not members of it and
   * cannot be reached through an `in` check or a property lookup.
   */
  const RPC = Object.assign(Object.create(null), {
    /**
     * addMessage - saves message in history together with the current
     * timestamp (ms since epoch), broadcasts it, then returns the new length
     * of the history
     */
    addMessage({nick, message}) {
      messages.push({nick, message, timestamp: Date.now()});
      broadcast({func: 'addMessage', nick, message});
      return messages.length;
    },

    /**
     * getMessages - returns the history entries starting at index `offset`
     * (the whole history when offset is omitted)
     */
    getMessages({offset = 0} = {}) {
      return messages.slice(offset);
    },

    /**
     * getUserCount - returns length of socket array (current online users)
     */
    getUserCount() {
      return sockets.length;
    },
  });
  /**
   * Per-connection handler: registers the socket, announces the new user
   * count, then serves RPC requests from this socket until it closes.
   *
   * Each incoming message is parsed as JSON; when its `func` field names one
   * of RPC's own functions, that function is called with the parsed message
   * and its JSON-encoded result is sent back on the same socket.
   */
  wss.on('connection', async ws => {
    // An 'error' event with no listener is thrown by EventEmitter as an
    // uncaught exception, so always attach one.
    ws.on('error', err => console.log(err));

    const gena = asynchronize({s: ws});
    sockets.push(ws);

    try {
      // broadcast new usercount after connect
      broadcast({func: 'getUserCount', value: sockets.length});

      // main loop, iterating incoming messages
      for (const p of gena()) {
        try {
          const message = await p;
          const data = JSON.parse(message);
          // Only RPC's own function members are callable remotely; inherited
          // names and non-function values are ignored.
          const isRemoteFunction =
            data !== null &&
            typeof data === 'object' &&
            Object.prototype.hasOwnProperty.call(RPC, data.func) &&
            typeof RPC[data.func] === 'function';
          if (isRemoteFunction) {
            // run the function and send its result back to this socket
            ws.send(JSON.stringify(RPC[data.func](data)));
          }
        } catch (e) {
          // known rejection: the connection was closed
          if (e.message === 'End Of S') {
            console.log('client OTPAL');
            break;
          }
          // anything else (bad JSON, failing RPC call, failed send) is logged
          // and the loop keeps serving the client
          console.log(e);
        }
      }
    } finally {
      // Runs however the loop ends, so a closed socket never stays listed.
      sockets = sockets.filter(s => s !== ws);
      // broadcast new usercount after disconnect
      broadcast({func: 'getUserCount', value: sockets.length});
    }
  });
  // CORS must be registered before the static middleware: express.static
  // answers requests for existing files itself, so middleware registered
  // after it never runs for them and they would be served without CORS headers.
  app.use(cors());
  app.use(express.static('public'));

  // Start the server on $PORT, falling back to 8999.
  server.listen(process.env.PORT || 8999, () => {
    console.log(`Server started on port ${server.address().port} :)`);
  });