const express = require('express');
const http = require('http');
const WebSocket = require('ws');
const cors = require('cors');

const app = express();

// Initialize a simple HTTP server.
const server = http.createServer(app);

// Initialize the WebSocket server instance on top of the HTTP server.
const wss = new WebSocket.Server({ server });

// In-memory chat history; it lives only as long as the process does.
const messages = [];

// Currently connected sockets; reassigned (filtered) when a client disconnects.
let sockets = [];

// Message of the Error used to reject pending promises once a stream has ended.
// The connection loop compares against this exact text to detect a disconnect.
const END_OF_STREAM_MESSAGE = 'End Of S';

/**
 * openPromise returns a promise that exposes its own settle callbacks.
 *
 * The returned promise carries `resolve` and `reject` as properties, so it can
 * be settled from outside of the executor.
 *
 * @access public
 * @return {Promise} promise with extra `resolve` and `reject` properties
 */
function openPromise() {
  let resolve;
  let reject;
  const np = new Promise((ok, fail) => {
    resolve = ok;
    reject = fail;
  });
  np.resolve = resolve;
  np.reject = reject;
  return np;
}

/**
 * asynchronize returns a generator function yielding promises of chunks or messages.
 *
 * Event listeners are attached to `s` when the generator starts running (on
 * the first `next()` call), not when asynchronize itself is called.
 * The n-th yielded promise resolves with the n-th chunk emitted by `s`.
 * When the end event fires, every promise still pending is rejected with
 * `new Error('End Of S')` and the generator finishes on its next step.
 *
 * @param s - Stream or Socket to generate promises for (needs an `on` method)
 * @param chunkEventName="message" event carrying a data chunk or message.
 *        Use 'message' for ws or 'data' for node Streams
 * @param endEventName="close" event signalling end/close of the stream/socket
 * @access public
 * @return function*
 */
function asynchronize({ s, chunkEventName = 'message', endEventName = 'close' }) {
  return function* () {
    // Buffer of chunks incoming from the stream, keyed by arrival index.
    const chunks = {};
    // Buffer of promises already yielded outside, keyed by yield index.
    const promises = {};
    let chunkCount = 0; // autoincrement of chunks
    let promiseCount = 0; // autoincrement of promises
    let end = false; // set once the end event has been seen

    // When both the chunk and the promise with index i exist, resolve the
    // promise with the chunk and drop both from their buffers.
    const settleIfPaired = (i) => {
      if (i in chunks && i in promises) {
        promises[i].resolve(chunks[i]);
        delete chunks[i];
        delete promises[i];
      }
    };

    s.on(chunkEventName, (data) => {
      chunks[chunkCount] = data;
      settleIfPaired(chunkCount);
      chunkCount++;
    });

    // When the stream ends, all pending promises must be rejected so that
    // consumers awaiting them are released.
    s.on(endEventName, () => {
      end = true;
      for (const i in promises) {
        promises[i].reject(new Error(END_OF_STREAM_MESSAGE));
      }
    });

    while (!end) {
      const p = openPromise();
      promises[promiseCount] = p;
      // The chunk may already have arrived before this promise was requested.
      settleIfPaired(promiseCount);
      promiseCount++;
      yield p; // hand the promise to the consumer
    }
  };
}

/**
 * broadcast sends the same message to every socket in the socket list.
 *
 * The message is logged and serialized once. Sockets that are not in the
 * OPEN state are skipped, so one closing client cannot abort delivery to the
 * remaining ones.
 *
 * @param data message to send (serialized with JSON.stringify)
 * @access public
 * @return undefined
 */
function broadcast(data) {
  console.log(data);
  const payload = JSON.stringify(data);
  for (const socket of sockets) {
    if (socket.readyState === WebSocket.OPEN) {
      socket.send(payload);
    }
  }
}

/**
 * Collection of remotely available functions.
 * Each function receives the whole parsed client message as its argument.
 */
const RPC = {
  /**
   * addMessage - saves message in history, broadcasts it, then returns length of history
   */
  addMessage({ nick, message }) {
    messages.push({ nick, message, timestamp: (new Date()).getTime() });
    broadcast({ func: 'addMessage', nick, message });
    return messages.length;
  },

  /**
   * getMessages - returns the messages stored at index `offset` and later
   */
  getMessages({ offset = 0 }) {
    return messages.slice(offset);
  },

  /**
   * getUserCount - returns length of socket array (current online users)
   */
  getUserCount() {
    return sockets.length;
  },
};

wss.on('connection', async (ws) => {
  const gena = asynchronize({ s: ws });

  // Without a listener, an 'error' event on the socket would be thrown as an
  // uncaught exception and take the whole server down.
  ws.on('error', (err) => console.log(err));

  sockets.push(ws);
  // Broadcast new user count after connect.
  broadcast({ func: 'getUserCount', value: sockets.length });

  // Main loop, iterating over incoming messages.
  for (const p of gena()) {
    try {
      const message = await p;
      const data = JSON.parse(message);
      // Only own function properties of RPC are callable remotely; a plain
      // `in` check would also accept inherited names such as 'toString'.
      const isRemoteFunction =
        Object.prototype.hasOwnProperty.call(RPC, data.func) &&
        typeof RPC[data.func] === 'function';
      if (isRemoteFunction) {
        // Run the requested function and send its result back to this socket.
        ws.send(JSON.stringify(RPC[data.func](data)));
      }
    } catch (e) {
      // Known rejection: the socket was closed.
      if (e.message === END_OF_STREAM_MESSAGE) {
        console.log('client OTPAL');
        // Leave the loop when the connection is closed.
        break;
      }
      // Any other error (e.g. malformed JSON) is logged and the loop keeps
      // serving this client instead of tearing the connection down.
      console.log(e);
    }
  }

  // Remove the closed socket from the list of sockets.
  sockets = sockets.filter((s) => s !== ws);
  // Broadcast new user count after disconnect.
  broadcast({ func: 'getUserCount', value: sockets.length });
});

// CORS must be registered before the static handler: express.static ends the
// response for files it finds, so middleware added after it never runs for them.
app.use(cors());
app.use(express.static('public'));

// Start our server.
server.listen(process.env.PORT || 8999, () => {
  console.log(`Server started on port ${server.address().port} :)`);
});