import { WebSocketServer } from 'ws'; import { ObjectId } from 'mongodb'; let wss; let connections = []; function replaceIdInFilter (filter) { if (filter) { for (const key in filter) { if (key == '_id' && typeof filter[key] == 'string') { filter[key] = new ObjectId(filter[key]); } else if (typeof filter[key] == 'object') { filter[key] = replaceIdInFilter(filter[key]); } } } return filter; } export function init(cfg, db) { wss = new WebSocketServer(cfg); wss.on('connection', ws => { connections.push(ws); ws.on('error', console.error); ws.on('close', (e) => { connections.splice(connections.indexOf(ws), 1); }); ws.on('message', (data, isBinary) => { try { let message = JSON.parse(data); switch (message.action) { case "set": db.persist( message.subject, replaceIdInFilter(message.filter), message.field, message.value ); break; case "subscribe": db.subscribe( message.subject, replaceIdInFilter(message.filter) ).then(r => ws.send( JSON.stringify({ subject: message.subject, field: message.field, filter: message.filter, value: r.map(v => v[message.field]), }) )); break; default: throw 'invalid action "' + message.action + '"'; } } catch (e) { console.error(e); } }); }); return wss; }