index.js 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. const express = require('express');
  2. const http = require( 'http');
  3. const WebSocket = require('ws');
  const app = express();
  // Plain HTTP server that hands every request to the Express app.
  const server = http.createServer(app);
  // WebSocket server sharing the same HTTP server (and therefore the same port).
  const wss = new WebSocket.Server({ server });
  // In-memory chat history; each entry is {nick, message, timestamp}.
  const messages = [];
  // Currently connected sockets. Declared with `let` (not `const`) because the
  // connection handler replaces the array when a client disconnects.
  let sockets = [];
  /**
   * openPromise creates a pending promise that can be settled from the outside:
   * the executor's callbacks are exposed on the promise itself as `resolve` and
   * `reject` properties.
   *
   * @access public
   * @return promise with extra `resolve(value)` and `reject(reason)` members
   */
  function openPromise() {
    const settlers = {};
    const deferred = new Promise((ok, fail) => {
      settlers.resolve = ok;
      settlers.reject = fail;
    });
    // Copy both callbacks onto the promise object so callers can settle it later.
    return Object.assign(deferred, settlers);
  }
  /**
   * asynchronize returns a generator function whose generator yields one promise
   * per incoming chunk/message of an event emitter, in arrival order.
   *
   * The n-th yielded promise is always paired with the n-th chunk: if the chunk
   * has already arrived it is resolved immediately, otherwise it is resolved when
   * the chunk arrives. When the end event fires, every still-pending promise is
   * rejected with Error('End Of S') and the generator finishes on its next step.
   *
   * Listeners are attached lazily, on the first `next()` call of the generator,
   * so events emitted before iteration starts are not observed. Chunks that are
   * buffered but not yet requested when the end event fires are not yielded.
   *
   * @param s - Stream or Socket (anything with `.on(event, listener)`)
   * @param chunkEventName="message" stream event for data chunk or message
   * Use 'message' for ws or `data` for node Streams
   * @param endEventName="close" stream event for end/close of stream/socket
   * @access public
   * @return function*
   */
  function asynchronize({s, chunkEventName = "message", endEventName = "close"}) {
    return function* () {
      // chunks that arrived before anybody asked for them, keyed by sequence number
      const chunks = new Map();
      // promises handed out before their chunk arrived, keyed by sequence number
      const promises = new Map();
      let chunkCount = 0; // sequence number of the next incoming chunk
      let promiseCount = 0; // sequence number of the next yielded promise
      let end = false; // set once the end event has fired

      s.on(chunkEventName, data => {
        const index = chunkCount++;
        if (promises.has(index)) {
          // a consumer is already waiting for exactly this chunk
          const waiting = promises.get(index);
          promises.delete(index);
          waiting.resolve(data);
        } else {
          // nobody asked yet: keep the chunk until its promise is created
          chunks.set(index, data);
        }
      });

      s.on(endEventName, () => {
        end = true;
        // no more chunks will come, so every waiting promise must fail
        for (const waiting of promises.values()) {
          waiting.reject(new Error('End Of S'));
        }
        promises.clear();
      });

      while (!end) {
        // Every yielded promise consumes one sequence number, whether or not its
        // chunk was already buffered; otherwise promises and chunks fall out of
        // step and later promises never resolve.
        const index = promiseCount++;
        const p = openPromise();
        if (chunks.has(index)) {
          p.resolve(chunks.get(index));
          chunks.delete(index);
        } else {
          promises.set(index, p);
        }
        yield p;
      }
    };
  }
  /**
   * broadcast sends the same message, JSON-encoded, to every socket in the
   * module-level `sockets` list.
   *
   * The message is logged and serialized once per call. A socket whose `send`
   * throws is logged and skipped, so one bad socket cannot prevent delivery to
   * the others.
   *
   * @param data message to send (must be JSON-serializable)
   * @access public
   * @return undefined
   */
  function broadcast(data) {
    console.log(data);
    const payload = JSON.stringify(data);
    // `const` keeps the loop variable local; the bare `s` used before leaked a global
    for (const socket of sockets) {
      try {
        socket.send(payload);
      } catch (e) {
        console.log(e);
      }
    }
  }
  /**
   * Collection of remotely callable functions, keyed by the `func` field of an
   * incoming message. Each function receives the whole parsed message object.
   *
   * The table has no prototype, so a lookup such as `name in RPC` or `RPC[name]`
   * can only ever hit the functions listed here and never inherited members like
   * `toString` or `constructor`.
   */
  const RPC = Object.assign(Object.create(null), {
    /**
     * addMessage - saves message in history (with the current time in
     * milliseconds as `timestamp`), broadcasts it, then returns the new length
     * of the history
     */
    addMessage({nick, message}) {
      messages.push({nick, message, timestamp: Date.now()});
      broadcast({func: 'addMessage', nick, message});
      return messages.length;
    },
    /**
     * getMessages - returns the history starting at index `offset`
     * (default 0, i.e. the whole history)
     */
    getMessages({offset = 0}) {
      return messages.slice(offset);
    },
    /**
     * getUserCount - returns length of socket array (current online users)
     */
    getUserCount() {
      return sockets.length;
    },
  });
  /**
   * Connection handler: registers the socket, announces the new user count,
   * then processes incoming messages one by one until the socket closes.
   *
   * Each message is expected to be JSON with a `func` field naming a function
   * in RPC; the function is called with the parsed message and its result is
   * sent back JSON-encoded. Messages that do not name an RPC function are
   * ignored. On close the socket is removed from the list and the new user
   * count is broadcast.
   */
  wss.on('connection', async ws => {
    const incoming = asynchronize({s: ws});
    // An 'error' event with no listener is thrown by the emitter as an uncaught
    // exception; log it here instead so one faulty socket cannot stop the server.
    ws.on('error', e => console.log(e));
    sockets.push(ws);
    //broadcast new usercount after connect
    broadcast({func: 'getUserCount', value: sockets.length});
    try {
      //main loop, iterating incoming messages
      for (const p of incoming()) {
        try {
          const message = await p;
          const data = JSON.parse(message);
          // Only own function entries of RPC are callable: an `in` check would
          // also accept inherited names such as "toString" or "constructor".
          const callable = data !== null && typeof data === 'object' &&
            typeof data.func === 'string' &&
            Object.prototype.hasOwnProperty.call(RPC, data.func) &&
            typeof RPC[data.func] === 'function';
          if (callable) {
            //run function from RPC and send its result back to this socket
            ws.send(JSON.stringify(RPC[data.func](data)));
          }
        } catch (e) {
          //known rejection: the connection was closed, leave the loop
          if (e instanceof Error && e.message === 'End Of S') {
            console.log('client OTPAL');
            break;
          }
          //anything else (bad JSON, failing RPC call, failed send) is logged
          //and the loop goes on with the next message
          console.log(e);
        }
      }
    } finally {
      //clear closed socket from list of sockets, whatever ended the loop
      sockets = sockets.filter(s => s !== ws);
      //broadcast new usercount after disconnect
      broadcast({func: 'getUserCount', value: sockets.length});
    }
  });
  // Serve the client files from the `public` directory.
  app.use(express.static('public'));
  // Start listening: the PORT environment variable wins, 8999 is the fallback.
  server.listen(process.env.PORT || 8999, function onListening() {
    const { port } = server.address();
    console.log(`Server started on port ${port} :)`);
  });