|
@@ -14,11 +14,13 @@ const messages = [];
|
|
|
var sockets = [];
|
|
|
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-function noPromise(){
|
|
|
+/**
|
|
|
+ * openPromise returns promise with resolve and reject callbacks available in object
|
|
|
+ *
|
|
|
+ * @access public
|
|
|
+ * @return promise
|
|
|
+ */
|
|
|
+function openPromise(){
|
|
|
let resolve, reject;
|
|
|
|
|
|
let np = new Promise((ok, fail) => {resolve = ok; reject = fail});
|
|
@@ -29,49 +31,70 @@ function noPromise(){
|
|
|
return np
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+/**
|
|
|
+ * asynchronize returns generator of promises of chunks or messages
|
|
|
+ *
|
|
|
+ * @param s - Stream or Socket to generate promises
|
|
|
+ * @param chunkEventName="message" stream event for data chunk or message
|
|
|
+ * Use 'message' for ws or `data` for node Streams
|
|
|
+ * @param endEventName="close" stream event for end/close of stream/socket
|
|
|
+ * @access public
|
|
|
+ * @return function*
|
|
|
+ */
|
|
|
function asynchronize({s, chunkEventName="message", endEventName="close"}){
|
|
|
return function*(){
|
|
|
+ //buffer of chunks incoming from stream
|
|
|
const chunks = {};
|
|
|
+ //buffer of promises yielding outside
|
|
|
const promises = {};
|
|
|
|
|
|
const clear = i => (delete chunks[i], delete promises[i])
|
|
|
|
|
|
|
|
|
- let chunkCount = 0;
|
|
|
- let promiseCount = 0;
|
|
|
- let end = false;
|
|
|
+ let chunkCount = 0; //autoincrement of chunks
|
|
|
+ let promiseCount = 0; //autoincrement of promises
|
|
|
+ let end = false; //flag
|
|
|
|
|
|
s.on(chunkEventName, data => {
|
|
|
chunks[chunkCount] = data
|
|
|
|
|
|
- if (chunkCount in promises){
|
|
|
+ //if we've gived promise for current chunk
|
|
|
+ if (chunkCount in promises){
|
|
|
+ //resolving it
|
|
|
promises[chunkCount].resolve(chunks[chunkCount])
|
|
|
+ //and remove from both queues
|
|
|
clear(chunkCount)
|
|
|
}
|
|
|
chunkCount++
|
|
|
})
|
|
|
|
|
|
+ //when end of stream becomes
|
|
|
s.on(endEventName, () => {
|
|
|
end = true;
|
|
|
|
|
|
+ //all pending promises should be rejected
|
|
|
for (let i in promises){
|
|
|
promises[i].reject(new Error('End Of S'))
|
|
|
}
|
|
|
})
|
|
|
|
|
|
while (!end){
|
|
|
- let p = noPromise()
|
|
|
+ let p = openPromise()
|
|
|
|
|
|
- if (promiseCount in chunks){
|
|
|
- p.resolve(chunks[promiseCount])
|
|
|
+ if (promiseCount in chunks){ //if chunk for this promise already exists
|
|
|
+ p.resolve(chunks[promiseCount]) //resolve it
|
|
|
+ //and clear this chunk and promise from queues
|
|
|
clear(promiseCount)
|
|
|
}
|
|
|
else {
|
|
|
+ //otherwise save it in buffer
|
|
|
+ //for later chunk or reject if end of stream
|
|
|
promises[promiseCount] = p;
|
|
|
promiseCount++
|
|
|
}
|
|
|
|
|
|
- yield p;
|
|
|
+ yield p; //yield promise outside
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -81,6 +104,13 @@ function asynchronize({s, chunkEventName="message", endEventName="close"}){
|
|
|
|
|
|
|
|
|
|
|
|
+/**
|
|
|
+ * broadcast sends same message to socket list
|
|
|
+ *
|
|
|
+ * @param data message to send
|
|
|
+ * @access public
|
|
|
+ * @return undefined
|
|
|
+ */
|
|
|
function broadcast(data){
|
|
|
for (s of sockets){
|
|
|
console.log(data)
|
|
@@ -89,17 +119,29 @@ function broadcast(data){
|
|
|
}
|
|
|
|
|
|
|
|
|
+/**
|
|
|
+ * Collection of remote available functions
|
|
|
+ */
|
|
|
let RPC = {
|
|
|
+ /**
|
|
|
+ * addMessage - saves message in history, broadcast it then returns length of history
|
|
|
+ */
|
|
|
addMessage({nick, message}){
|
|
|
messages.push({nick, message, timestamp: (new Date).getTime()})
|
|
|
broadcast({func: 'addMessage', nick, message})
|
|
|
return messages.length;
|
|
|
},
|
|
|
|
|
|
+ /**
|
|
|
+ * getMessages - returns messages younger than offset
|
|
|
+ */
|
|
|
getMessages({offset=0}){
|
|
|
return messages.slice(offset)
|
|
|
},
|
|
|
|
|
|
+ /**
|
|
|
+ * getUserCount - returns length of socket array (current online users)
|
|
|
+ */
|
|
|
getUserCount(){
|
|
|
return sockets.length;
|
|
|
}
|
|
@@ -115,28 +157,36 @@ wss.on('connection', async ws => {
|
|
|
|
|
|
sockets.push(ws)
|
|
|
|
|
|
+ //broadcast new usercount after connect
|
|
|
broadcast({func:'getUserCount', value: sockets.length})
|
|
|
|
|
|
+ //main loop, iterating incoming messages
|
|
|
for (let p of gena()){
|
|
|
try {
|
|
|
let message = await p;
|
|
|
let data = JSON.parse(message)
|
|
|
|
|
|
if (data.func in RPC){
|
|
|
+ //run function from RPC if any, and send to socket result of it
|
|
|
ws.send(JSON.stringify(RPC[data.func](data)))
|
|
|
}
|
|
|
}
|
|
|
catch(e){
|
|
|
+ //catching known rejection
|
|
|
if (e.message === 'End Of S'){
|
|
|
console.log('client OTPAL')
|
|
|
+ //break loop when connection closed
|
|
|
break;
|
|
|
}
|
|
|
+ // otherwise print error, but it should be throw e there
|
|
|
console.log(e)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ //clear closed socket from list of sockets
|
|
|
sockets = sockets.filter(s => s !== ws)
|
|
|
|
|
|
+ //broadcast new usercount after disconnect
|
|
|
broadcast({func:'getUserCount', value: sockets.length})
|
|
|
});
|
|
|
|