123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- const express = require('express');
- const http = require( 'http');
- const WebSocket = require('ws');
- const cors = require('cors');
- const app = express();
- //initialize a simple http server
- const server = http.createServer(app);
- //initialize the WebSocket server instance
- const wss = new WebSocket.Server({ server });
- const messages = [];
- var sockets = [];
- /**
- * openPromise returns promise with resolve and reject callbacks available in object
- *
- * @access public
- * @return promise
- */
- function openPromise(){
- let resolve, reject;
- let np = new Promise((ok, fail) => {resolve = ok; reject = fail});
- np.resolve = resolve;
- np.reject = reject;
- return np
- }
- /**
- * asynchronize returns generator of promises of chunks or messages
- *
- * @param s - Stream or Socket to generate promises
- * @param chunkEventName="message" stream event for data chunk or message
- * Use 'message' for ws or `data` for node Streams
- * @param endEventName="close" stream event for end/close of stream/socket
- * @access public
- * @return function*
- */
- function asynchronize({s, chunkEventName="message", endEventName="close"}){
- return function*(){
- //buffer of chunks incoming from stream
- const chunks = {};
- //buffer of promises yielding outside
- const promises = {};
- const clear = i => (delete chunks[i], delete promises[i])
- let chunkCount = 0; //autoincrement of chunks
- let promiseCount = 0; //autoincrement of promises
- let end = false; //flag
- //check availability of chunk and promise. If any, resolve promise, and clear both from queue
- const chunkAndPromise = i => (i in chunks) &&
- (i in promises) && (
- promises[i].resolve(chunks[i]),
- clear(i))
- s.on(chunkEventName, data => {
- chunks[chunkCount] = data
- chunkAndPromise(chunkCount)
- chunkCount++
- })
- //when end of stream becomes
- s.on(endEventName, () => {
- end = true;
- //all pending promises should be rejected
- for (let i in promises){
- promises[i].reject(new Error('End Of S'))
- }
- })
- while (!end){
- let p;
- promises[promiseCount] = p = openPromise();
- chunkAndPromise(promiseCount)
- promiseCount++;
- yield p; //yield promise outside
- }
- }
- }
- /**
- * broadcast sends same message to socket list
- *
- * @param data message to send
- * @access public
- * @return undefined
- */
- function broadcast(data){
- for (s of sockets){
- console.log(data)
- s.send(JSON.stringify(data))
- }
- }
- /**
- * Collection of remote available functions
- */
- let RPC = {
- /**
- * addMessage - saves message in history, broadcast it then returns length of history
- */
- addMessage({nick, message}){
- messages.push({nick, message, timestamp: (new Date).getTime()})
- broadcast({func: 'addMessage', nick, message})
- return messages.length;
- },
- /**
- * getMessages - returns messages younger than offset
- */
- getMessages({offset=0}){
- return messages.slice(offset)
- },
- /**
- * getUserCount - returns length of socket array (current online users)
- */
- getUserCount(){
- return sockets.length;
- }
- }
- wss.on('connection', async ws => {
- let gena = asynchronize({s:ws})
- sockets.push(ws)
- //broadcast new usercount after connect
- broadcast({func:'getUserCount', value: sockets.length})
- //main loop, iterating incoming messages
- for (let p of gena()){
- try {
- let message = await p;
- let data = JSON.parse(message)
- if (data.func in RPC){
- //run function from RPC if any, and send to socket result of it
- ws.send(JSON.stringify(RPC[data.func](data)))
- }
- }
- catch(e){
- //catching known rejection
- if (e.message === 'End Of S'){
- console.log('client OTPAL')
- //break loop when connection closed
- break;
- }
- // otherwise print error, but it should be throw e there
- console.log(e)
- }
- }
- //clear closed socket from list of sockets
- sockets = sockets.filter(s => s !== ws)
- //broadcast new usercount after disconnect
- broadcast({func:'getUserCount', value: sockets.length})
- });
- app.use(express.static('public'));
- app.use(cors());
- //start our server
- server.listen(process.env.PORT || 8999, () => {
- console.log(`Server started on port ${server.address().port} :)`);
- });
|