index.js 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. const express = require('express');
  2. const http = require( 'http');
  3. const WebSocket = require('ws');
  // Express application; in this file it is only used to serve static files.
  const app = express();
  // Plain HTTP server wrapping the Express app.
  const server = http.createServer(app);
  // WebSocket server instance attached to the HTTP server above.
  const wss = new WebSocket.Server({ server });
  // In-memory chat history; entries are appended by RPC.addMessage and
  // are lost when the process exits.
  const messages = [];
  // Sockets of the currently connected clients. The connection handler
  // replaces this array with a filtered copy on disconnect, so the binding
  // must stay reassignable (`let`, not `const`).
  let sockets = [];
  /**
   * openPromise creates a promise that can be settled from outside its
   * executor: the executor's callbacks are attached to the returned promise
   * as its `resolve` and `reject` properties.
   *
   * @access public
   * @return {Promise} promise carrying its own `resolve` and `reject` functions
   */
  function openPromise(){
    // Collect the executor callbacks first, then copy them onto the promise.
    const settlers = {};
    const deferred = new Promise((fulfil, fail) => {
      settlers.resolve = fulfil;
      settlers.reject = fail;
    });
    return Object.assign(deferred, settlers);
  }
  /**
   * asynchronize adapts an event-emitting stream or socket into a generator
   * of promises, one promise per chunk/message, in arrival order.
   *
   * The returned generator function subscribes to `s` only when its body
   * starts running (i.e. on the first `next()` call), so events emitted
   * before that are not seen. Each `next()` yields a promise that resolves
   * with the next chunk. When the end event fires, every promise that is
   * still waiting for a chunk is rejected with `Error('End Of S')`; chunks
   * that were already received but not yet requested are still yielded
   * before the generator finishes.
   *
   * @param s - Stream or Socket to generate promises (must provide `on`)
   * @param chunkEventName="message" stream event for data chunk or message
   * Use 'message' for ws or `data` for node Streams
   * @param endEventName="close" stream event for end/close of stream/socket
   * @access public
   * @return function*
   */
  function asynchronize({s, chunkEventName="message", endEventName="close"}){
    return function*(){
      //chunks received from the stream that nobody has asked for yet, by index
      const chunks = new Map();
      //promises handed out that are still waiting for their chunk, by index
      const promises = new Map();
      let chunkCount = 0; //index of the next incoming chunk
      let promiseCount = 0; //index of the next promise to hand out
      let end = false; //set once the end event has fired

      s.on(chunkEventName, data => {
        //a chunk arriving after the end event could never be delivered in
        //order (its promise was already rejected), so it is ignored
        if (end){
          return;
        }
        const waiting = promises.get(chunkCount);
        if (waiting){
          //a promise was already given out for this chunk: settle it now
          promises.delete(chunkCount);
          waiting.resolve(data);
        }
        else {
          //nobody is waiting yet: keep the chunk until it is requested
          chunks.set(chunkCount, data);
        }
        chunkCount++;
      });

      s.on(endEventName, () => {
        end = true;
        //no more chunks will come, so every waiting promise is rejected
        for (const pending of promises.values()){
          pending.reject(new Error('End Of S'));
        }
        promises.clear();
      });

      //keep going after the end event while buffered chunks remain, so data
      //received before the end is not silently dropped
      while (!end || chunks.has(promiseCount)){
        const p = openPromise();
        if (chunks.has(promiseCount)){
          //chunk for this promise already arrived: resolve immediately
          p.resolve(chunks.get(promiseCount));
          chunks.delete(promiseCount);
        }
        else {
          promises.set(promiseCount, p);
        }
        promiseCount++;
        yield p; //yield promise outside
      }
    };
  }
  /**
   * broadcast sends the same message, JSON-encoded, to every socket in the
   * module-level `sockets` list.
   *
   * The message is logged once per recipient. A socket whose `send` throws
   * is logged and skipped so the remaining sockets still get the message.
   *
   * @param data message to send (serialised with JSON.stringify)
   * @access public
   * @return undefined
   */
  function broadcast(data){
    //serialise once instead of once per recipient
    const payload = JSON.stringify(data);
    //`const` keeps the loop variable local; a bare `s` would be an implicit
    //global and a ReferenceError in strict mode
    for (const s of sockets){
      console.log(data);
      try {
        s.send(payload);
      }
      catch (e){
        //one failing socket must not stop delivery to the others
        console.log(e);
      }
    }
  }
  /**
   * Collection of remotely callable functions, looked up by the `func`
   * field of an incoming message.
   *
   * The table has no prototype, so names inherited from Object.prototype
   * (`constructor`, `toString`, `__proto__`, ...) are not members and cannot
   * be selected by a client-supplied name, even with an `in` check.
   */
  const RPC = Object.assign(Object.create(null), {
    /**
     * addMessage - saves message in history (stamped with the current time in
     * milliseconds), broadcasts it, then returns the new length of the history
     */
    addMessage({nick, message}){
      messages.push({nick, message, timestamp: Date.now()});
      broadcast({func: 'addMessage', nick, message});
      return messages.length;
    },
    /**
     * getMessages - returns the history entries from index `offset` onward
     * (the whole history when `offset` is omitted)
     */
    getMessages({offset=0}){
      return messages.slice(offset);
    },
    /**
     * getUserCount - returns length of socket array (current online users)
     */
    getUserCount(){
      return sockets.length;
    },
  });
  /**
   * Per-connection handler: registers the socket, announces the new user
   * count, then processes incoming messages until the connection closes.
   *
   * Each message is parsed as JSON; when its `func` names one of RPC's own
   * functions, that function is called with the parsed message and its
   * JSON-encoded result is sent back on the same socket. On exit the socket
   * is removed from `sockets` and the new user count is broadcast.
   */
  wss.on('connection', async ws => {
    const gena = asynchronize({s:ws});
    sockets.push(ws);
    try {
      //broadcast new usercount after connect
      broadcast({func:'getUserCount', value: sockets.length});
      //main loop, iterating incoming messages
      for (const p of gena()){
        try {
          const message = await p;
          const data = JSON.parse(message);
          //only RPC's own functions may be called: an `in` check would also
          //accept names inherited from Object.prototype (e.g. "constructor")
          const isRemoteFunction =
            Object.prototype.hasOwnProperty.call(RPC, data.func) &&
            typeof RPC[data.func] === 'function';
          if (isRemoteFunction){
            //run function from RPC and send its result to the socket
            ws.send(JSON.stringify(RPC[data.func](data)));
          }
        }
        catch(e){
          //catching known rejection
          if (e.message === 'End Of S'){
            console.log('client OTPAL');
            //break loop when connection closed
            break;
          }
          //any other error (bad JSON, failing RPC call, failed send) is
          //logged and the loop goes on with the next message
          console.log(e);
        }
      }
    }
    finally {
      //always drop the socket from the list, even if something above threw,
      //so a dead socket is never left behind in `sockets`
      sockets = sockets.filter(s => s !== ws);
      //broadcast new usercount after disconnect
      broadcast({func:'getUserCount', value: sockets.length});
    }
  });
  // Serve static files from the `public` directory.
  app.use(express.static('public'));
  // Start the server: PORT from the environment is used when set,
  // otherwise 8999.
  const listenPort = process.env.PORT || 8999;
  server.listen(listenPort, function onListening(){
    const { port: boundPort } = server.address();
    console.log(`Server started on port ${boundPort} :)`);
  });
  149. });