server/index.js
2025-03-01 11:02:23 +01:00

70 lines
2.0 KiB
JavaScript

import { WebSocketServer } from 'ws';
import { MongoClient } from 'mongodb';
const mongo = new MongoClient(process.env.MONGO_CONN_STR);
const db = mongo.db();
const collections = {};
const wss = new WebSocketServer({ port: 7130 });
const connections = [];
const subscriptions = [];
async function executeSet(message, ws) {
let subject = collections[message.subject];
if (!subject) {
subject = collections[message.subject] = db.collection(message.subject);
}
let newValue = {$set: {}};
newValue.$set[message.field + '.modifiedAt'] = Date.now();
newValue.$set[message.field + '.modifiedBy'] = 'anonymous';
newValue.$set[message.field + '.transientValue'] = message.value;
await subject.updateMany(message.filter, newValue, { upsert: true })
}
wss.on('connection', function connection(ws) {
connections.push(ws);
console.log('accepted new connection');
ws.on('error', console.error);
ws.on('close', e => {
console.log('closed a connection');
connections.splice(connections.indexOf(ws), 1);
});
ws.on('message', function message(data, isBinary) {
console.log('received a new message');
try {
let message = JSON.parse(data);
console.log('message action is ' + message.action);
switch (message.action) {
case 'set':
executeSet(message, ws).catch(console.error);
break;
case 'subscribe':
let subject = collections[message.subject];
if (!subject) {
subject = collections[message.subject] = db.collection(message.subject);
}
subject.find(message.filter, {}).toArray().then(r => {
ws.send(JSON.stringify({
subject: message.subject,
field: message.field,
filter: message.filter,
value: r.map(v => v[message.field])
}));
});
break;
}
} catch (e) {
console.error('error while processing message', e);
ws.send(JSON.stringify(e));
}
});
});