66 lines
1.6 KiB
JavaScript
66 lines
1.6 KiB
JavaScript
import { WebSocketServer } from 'ws';
|
|
import { ObjectId } from 'mongodb';
|
|
|
|
// WebSocketServer created by the most recent init() call; undefined until then.
let wss;
// Sockets currently tracked by this module: pushed on 'connection' and
// removed on 'close'. Only ever mutated in place, hence `const`.
const connections = [];
|
|
|
|
/**
 * Recursively walks a filter and replaces every string value stored under an
 * `_id` key with `new ObjectId(value)`.
 *
 * The filter is modified in place and the same reference is returned. Nested
 * objects and arrays are descended into; `_id` values that are not strings
 * are left as they are unless they are objects, in which case they are
 * walked like any other nested value.
 *
 * @param {*} filter - Filter to normalise. Non-object values (including
 *   `null` and `undefined`) are returned untouched.
 * @returns {*} The same `filter` value that was passed in.
 * @throws Whatever the `ObjectId` constructor throws for a string it rejects.
 */
function replaceIdInFilter(filter) {
  // Nothing to convert inside primitives, null or undefined.
  if (filter === null || typeof filter !== 'object') {
    return filter;
  }

  // Object.keys() visits own enumerable keys only, so values inherited from a
  // prototype are never copied onto the filter as converted own properties.
  for (const key of Object.keys(filter)) {
    const value = filter[key];

    if (key === '_id' && typeof value === 'string') {
      filter[key] = new ObjectId(value);
    } else if (typeof value === 'object') {
      // typeof null is 'object' too; the guard above hands null straight back.
      filter[key] = replaceIdInFilter(value);
    }
  }

  return filter;
}
|
|
|
|
/**
 * Creates the module's WebSocket server and installs the per-connection
 * message protocol.
 *
 * Every incoming message is parsed as JSON and dispatched on `message.action`:
 *  - "set":       calls `db.persist(subject, filter, field, value)`.
 *  - "subscribe": calls `db.subscribe(subject, filter)` and sends back a JSON
 *                 string containing `subject`, `field`, `filter` and `value`,
 *                 where `value` is `message.field` picked from each entry of
 *                 the subscribe result.
 * Any other action, malformed JSON, or a failure in `db` is reported through
 * `console.error`; nothing is sent back to the client in those cases.
 * The filter is passed through `replaceIdInFilter` before it reaches `db`.
 *
 * @param {object} cfg - Options passed unchanged to the `WebSocketServer` constructor.
 * @param {object} db - Object exposing `persist(subject, filter, field, value)`
 *   and `subscribe(subject, filter)`. The value `subscribe` yields must have a
 *   `map` method (presumably an array of records — confirm against the db module).
 * @returns {WebSocketServer} The server that was just created (also kept in `wss`).
 */
export function init(cfg, db) {
  wss = new WebSocketServer(cfg);

  wss.on('connection', (ws) => {
    connections.push(ws);

    ws.on('error', console.error);

    ws.on('close', () => {
      // splice(-1, 1) would remove the LAST entry, i.e. an unrelated
      // connection, so only splice when this socket is actually tracked.
      const index = connections.indexOf(ws);
      if (index !== -1) {
        connections.splice(index, 1);
      }
    });

    // The handler is async so that failures of the db calls end up in the
    // catch below instead of surfacing as unhandled promise rejections.
    ws.on('message', async (data) => {
      try {
        const message = JSON.parse(data);

        switch (message.action) {
          case 'set':
            // Awaiting is harmless if persist() is synchronous, and logs the
            // failure if it returns a promise that rejects.
            await db.persist(
              message.subject,
              replaceIdInFilter(message.filter),
              message.field,
              message.value,
            );
            break;

          case 'subscribe': {
            const entries = await db.subscribe(
              message.subject,
              replaceIdInFilter(message.filter),
            );
            ws.send(
              JSON.stringify({
                subject: message.subject,
                field: message.field,
                filter: message.filter,
                value: entries.map((entry) => entry[message.field]),
              }),
            );
            break;
          }

          default:
            throw new Error(`invalid action "${message.action}"`);
        }
      } catch (e) {
        console.error(e);
      }
    });
  });

  return wss;
}
|