index.js 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  const express = require('express');
  const http = require('http');
  const WebSocket = require('ws');

  const app = express();
  // Plain HTTP server that hands incoming requests to the Express app.
  const server = http.createServer(app);
  // WebSocket server instance attached to that same HTTP server.
  const wss = new WebSocket.Server({ server });
  // Messages accumulated in memory; entries are appended by RPC.addMessage.
  const messages = [];
  // Sockets of the currently connected clients. Declared with `let` (not `var`)
  // because the connection handler replaces the array when a client leaves.
  let sockets = [];
  /**
   * Creates a pending promise that can be settled from the outside.
   *
   * The executor's two callbacks are copied onto the promise object itself,
   * so a holder of the promise can call `p.resolve(value)` or `p.reject(error)`.
   *
   * @returns {Promise} a pending promise with extra `resolve` and `reject` function properties
   */
  function noPromise() {
    const settlers = {};
    const deferred = new Promise((fulfil, refuse) => {
      settlers.resolve = fulfil;
      settlers.reject = refuse;
    });
    return Object.assign(deferred, settlers);
  }
  /**
   * Wraps an event emitter in a generator function that yields one promise per
   * expected chunk event, so a consumer can write `for (const p of gen()) await p`.
   *
   * Listeners are attached when the returned generator is first advanced, not
   * when `asynchronize` itself is called. Chunks that arrive before they are
   * requested are buffered; promises requested before their chunk arrives stay
   * pending until it does.
   *
   * @param {object} options
   * @param {object} options.s - emitter exposing `on(eventName, listener)`
   * @param {string} [options.chunkEventName="message"] - event carrying one chunk of data
   * @param {string} [options.endEventName="close"] - event marking the end of the stream
   * @returns {Function} generator function; each yielded promise resolves with the
   *   next chunk in arrival order, or rejects with `Error('End Of S')` if the end
   *   event fires while it is still pending. The generator finishes once the end
   *   event has been seen.
   */
  function asynchronize({s, chunkEventName = "message", endEventName = "close"}) {
    return function* () {
      // Both maps are keyed by the sequence number of the chunk.
      const chunks = new Map();   // arrived, not yet requested
      const promises = new Map(); // requested, not yet arrived
      let chunkCount = 0;
      let promiseCount = 0;
      let end = false;

      s.on(chunkEventName, data => {
        const waiting = promises.get(chunkCount);
        if (waiting) {
          promises.delete(chunkCount);
          waiting.resolve(data);
        } else {
          chunks.set(chunkCount, data);
        }
        chunkCount++;
      });

      s.on(endEventName, () => {
        end = true;
        for (const waiting of promises.values()) {
          waiting.reject(new Error('End Of S'));
        }
        promises.clear();
      });

      while (!end) {
        const p = noPromise();
        if (chunks.has(promiseCount)) {
          p.resolve(chunks.get(promiseCount));
          chunks.delete(promiseCount);
        } else {
          promises.set(promiseCount, p);
        }
        // Advance on BOTH paths. The original only advanced when no chunk was
        // buffered, so after serving a buffered chunk the next promise was filed
        // under an already-consumed index and could never be resolved.
        promiseCount++;
        yield p;
      }
    };
  }
  /**
   * Sends `data`, serialized as JSON, to every socket in the module-level
   * `sockets` list. The payload object is logged once per recipient.
   *
   * @param {*} data - JSON-serializable payload
   */
  function broadcast(data) {
    // The text is the same for every recipient, so serialize it once.
    const payload = JSON.stringify(data);
    // Block-scoped loop variable: the original assigned to an undeclared `s`,
    // which leaks a global in sloppy mode and throws in strict mode.
    for (const socket of sockets) {
      console.log(data);
      socket.send(payload);
    }
  }
  // Procedures the connection handler dispatches to by the `func` field of an
  // incoming message; each one receives the parsed message object.
  let RPC = {
    /**
     * Appends a message stamped with the current time in epoch milliseconds,
     * broadcasts it (without the timestamp) to all sockets, and returns the
     * new number of stored messages.
     */
    addMessage({nick, message}) {
      const timestamp = Date.now();
      messages.push({nick, message, timestamp});
      broadcast({func: 'addMessage', nick, message});
      return messages.length;
    },

    /** Returns the stored messages from index `offset` (default 0) onward. */
    getMessages({offset = 0}) {
      return messages.slice(offset);
    },

    /** Returns the number of sockets currently in the `sockets` list. */
    getUserCount() {
      return sockets.length;
    },
  };
  // Per-connection lifecycle: register the socket, announce the new user count,
  // answer RPC requests until the socket closes, then unregister and announce again.
  wss.on('connection', async ws => {
    // An 'error' event with no listener is thrown by the EventEmitter, so log it here.
    ws.on('error', err => console.log(err));

    // Announcing the count must never abort the handler: a send failure on some
    // other socket is logged instead of becoming an unhandled rejection.
    const announceUserCount = () => {
      try {
        broadcast({func: 'getUserCount', value: sockets.length});
      } catch (err) {
        console.log(err);
      }
    };

    const gena = asynchronize({s: ws});
    sockets.push(ws);
    announceUserCount();

    try {
      for (const p of gena()) {
        try {
          const message = await p;
          const data = JSON.parse(message);
          // Dispatch only to RPC's own function properties. The `in` operator
          // also matches inherited names such as `constructor` or `valueOf`,
          // and `null` / non-object payloads have no usable `func` field.
          const isCallable = data !== null
            && typeof data === 'object'
            && Object.prototype.hasOwnProperty.call(RPC, data.func)
            && typeof RPC[data.func] === 'function';
          if (isCallable) {
            ws.send(JSON.stringify(RPC[data.func](data)));
          }
        } catch (e) {
          // asynchronize rejects the pending promise with this message on close.
          if (e.message === 'End Of S') {
            console.log('client OTPAL');
            break;
          }
          console.log(e);
        }
      }
    } finally {
      // Always drop the socket from the list, however the loop ended.
      sockets = sockets.filter(s => s !== ws);
      announceUserCount();
    }
  });
  // Serve files from the `public` directory through Express's static middleware.
  app.use(express.static('public'));

  // Start the server on PORT from the environment, falling back to 8999,
  // and log the port reported by the server once it is listening.
  server.listen(process.env.PORT || 8999, function onListening() {
    const { port } = server.address();
    console.log(`Server started on port ${port} :)`);
  });