// index.js
  const express = require('express');
  const http = require('http');
  const WebSocket = require('ws');
  const cors = require('cors');

  const app = express();
  // Plain HTTP server wrapping the Express app.
  const server = http.createServer(app);
  // WebSocket server instance attached to that HTTP server.
  const wss = new WebSocket.Server({ server });
  // In-memory message history, appended to by RPC.addMessage.
  const messages = [];
  // Connected sockets. Declared with `let` (not `var`) because the list is
  // replaced with a filtered copy when a client disconnects.
  let sockets = [];
  /**
   * openPromise creates a pending promise that can be settled from the outside.
   *
   * The promise's own resolve and reject callbacks are attached to it as the
   * `resolve` and `reject` properties (in that order).
   *
   * @access public
   * @return {Promise} pending promise carrying `resolve(value)` and `reject(error)`
   */
  function openPromise() {
    // Collect the executor callbacks first, then copy them onto the promise.
    const settlers = {};
    const deferred = new Promise((fulfil, decline) => {
      settlers.resolve = fulfil;
      settlers.reject = decline;
    });
    return Object.assign(deferred, settlers);
  }
  /**
   * asynchronize turns an event emitter (stream or socket) into a generator of
   * promises, one promise per incoming chunk/message, in arrival order.
   *
   * The i-th yielded promise resolves with the i-th chunk. Promises still
   * pending when the end event fires are rejected with Error('End Of S').
   * Chunks that were already received before the end event are still yielded
   * before the generator finishes. Listeners are attached when the generator
   * is first advanced and removed when it finishes or is returned early
   * (e.g. `break` inside `for...of`).
   *
   * @param s - Stream or Socket to generate promises from (needs `.on`)
   * @param chunkEventName="message" stream event for a data chunk or message.
   * Use 'message' for ws or 'data' for node Streams
   * @param endEventName="close" stream event for end/close of stream/socket
   * @access public
   * @return function* generator function yielding promises of chunks
   */
  function asynchronize({s, chunkEventName="message", endEventName="close"}){
    return function*(){
      // Chunks received but not yet requested, keyed by arrival index.
      const chunks = new Map();
      // Promises handed out but not yet settled, keyed by request index.
      const promises = new Map();
      let chunkCount = 0;   // autoincrement of chunks
      let promiseCount = 0; // autoincrement of promises
      let end = false;      // set once the end event has fired

      // When both the i-th chunk and the i-th promise exist, resolve the
      // promise with the chunk and drop both from their queues.
      const chunkAndPromise = i => {
        if (chunks.has(i) && promises.has(i)){
          promises.get(i).resolve(chunks.get(i));
          chunks.delete(i);
          promises.delete(i);
        }
      };

      const onChunk = data => {
        chunks.set(chunkCount, data);
        chunkAndPromise(chunkCount);
        chunkCount++;
      };

      const onEnd = () => {
        end = true;
        // Nothing more will arrive: every promise still waiting is rejected.
        for (const pending of promises.values()){
          pending.reject(new Error('End Of S'));
        }
        promises.clear();
      };

      s.on(chunkEventName, onChunk);
      s.on(endEventName, onEnd);

      try {
        // Keep going after the end event while buffered chunks remain, so
        // data that was already received is not silently dropped.
        while (!end || promiseCount < chunkCount){
          const p = openPromise();
          promises.set(promiseCount, p);
          chunkAndPromise(promiseCount);
          promiseCount++;
          yield p; // hand the promise to the consumer
        }
      } finally {
        // Detach so an abandoned generator stops buffering chunks. Guarded
        // because only `.on` was required of `s` originally.
        if (typeof s.removeListener === 'function'){
          s.removeListener(chunkEventName, onChunk);
          s.removeListener(endEventName, onEnd);
        }
      }
    };
  }
  /**
   * broadcast sends the same JSON-serialised message to every open socket
   * in the module-level `sockets` list.
   *
   * @param data message to send (serialised with JSON.stringify)
   * @access public
   * @return undefined
   */
  function broadcast(data){
    // Serialise once; every recipient gets the same payload.
    const payload = JSON.stringify(data);
    // `const` here: the loop variable used to be an undeclared (global) `s`.
    for (const socket of sockets){
      // A socket that is not open cannot receive; skipping it keeps one
      // stale connection from aborting delivery to the rest.
      if (socket.readyState !== WebSocket.OPEN) continue;
      socket.send(payload);
    }
  }
  /**
   * Collection of remotely callable functions, looked up by name.
   *
   * The object is created without a prototype so that `name in RPC` is true
   * only for the functions listed here; inherited names such as
   * `constructor` or `toString` are not reachable by a remote caller.
   */
  const RPC = Object.assign(Object.create(null), {
    /**
     * addMessage - saves the message in history with a server timestamp,
     * broadcasts it, then returns the new length of the history
     */
    addMessage({nick, message, id}){
      const messageObject = {nick, message, timestamp: Date.now(), id, sent: true};
      console.log(messageObject);
      messages.push(messageObject);
      broadcast({func: 'addMessage', ...messageObject});
      return messages.length;
    },
    /**
     * getMessages - returns the messages from history index `offset` onward
     * (the whole history when offset is omitted)
     */
    getMessages({offset=0} = {}){
      return messages.slice(offset);
    },
    /**
     * getUserCount - returns length of socket array (current online users)
     */
    getUserCount(){
      return sockets.length;
    },
  });
  /**
   * Per-connection handler: registers the socket, announces the new user
   * count, then serves RPC requests until the socket closes.
   *
   * Each incoming message is parsed as JSON; if its `func` names a function
   * in RPC, that function is called with the parsed message and its result
   * is sent back to the same socket as JSON.
   */
  wss.on('connection', async ws => {
    // An 'error' event with no listener is thrown by EventEmitter, which
    // would crash the whole server on a single bad connection.
    ws.on('error', e => console.log(e));

    const gena = asynchronize({s: ws});
    sockets.push(ws);
    try {
      // broadcast new usercount after connect
      broadcast({func: 'getUserCount', value: sockets.length});
      // main loop, iterating incoming messages
      for (const p of gena()){
        try {
          const message = await p;
          const data = JSON.parse(message);
          // Own-property check: `in` would also match names inherited from
          // Object.prototype and let a client call them.
          if (Object.prototype.hasOwnProperty.call(RPC, data.func) &&
              typeof RPC[data.func] === 'function'){
            // run the RPC function and send its result back to this socket
            ws.send(JSON.stringify(RPC[data.func](data)));
          }
        }
        catch(e){
          // known rejection produced by asynchronize when the socket closes
          if (e.message === 'End Of S'){
            console.log('client OTPAL');
            // break loop when connection closed
            break;
          }
          // any other error (bad JSON, failing RPC call, failed send) is
          // logged and the loop keeps serving this client
          console.log(e);
        }
      }
    } finally {
      // Always drop the socket from the list, even if something above threw,
      // so a stale socket never lingers in `sockets`.
      sockets = sockets.filter(s => s !== ws);
      // broadcast new usercount after disconnect
      broadcast({func: 'getUserCount', value: sockets.length});
    }
  });
  // CORS is registered before the static handler: express.static ends the
  // response for every file it serves, so middleware added after it never
  // runs for those requests and their responses would carry no CORS headers.
  app.use(cors());
  app.use(express.static('public'));
  // Start the server on PORT from the environment, falling back to 8999.
  server.listen(process.env.PORT || 8999, () => {
    console.log(`Server started on port ${server.address().port} :)`);
  });