import { WebSocketServer } from "ws"; import { MongoClient, ObjectId } from "mongodb"; const mongo = new MongoClient(process.env.MONGO_CONN_STR); const db = mongo.db(); const collections = {}; const wss = new WebSocketServer({ port: 7130 }); const connections = []; const subscriptions = []; async function executeSet(message, ws) { let subject = collections[message.subject]; if (!subject) { subject = collections[message.subject] = db.collection(message.subject); } let newValue = { $set: {} }; newValue.$set[message.field + ".modifiedAt"] = Date.now(); newValue.$set[message.field + ".modifiedBy"] = "anonymous"; newValue.$set[message.field + ".transientValue"] = message.value; await subject.updateMany(message.filter, newValue, { upsert: true }); } wss.on("connection", function connection(ws) { connections.push(ws); console.log("accepted new connection"); ws.on("error", console.error); ws.on("close", (e) => { console.log("closed a connection"); connections.splice(connections.indexOf(ws), 1); }); ws.on("message", function message(data, isBinary) { console.log("received a new message"); try { let message = JSON.parse(data); console.log(message); if ( message.filter && message.filter._id && typeof message.filter._id === "string" ) { try { message.filter._id = new ObjectId(message.filter._id); console.log(message.filter._id); } catch (e) { console.error("Invalid ObjectId in filter:", e); } } console.log("message action is " + message.action); switch (message.action) { case "set": executeSet(message, ws).catch(console.error); break; case "subscribe": let subject = collections[message.subject]; if (!subject) { subject = collections[message.subject] = db.collection( message.subject ); } subject .find(message.filter, {}) .toArray() .then((r) => { ws.send( JSON.stringify({ subject: message.subject, field: message.field, filter: message.filter, value: r.map((v) => v[message.field]), }) ); }); break; } } catch (e) { console.error("error while processing message", e); ws.send(JSON.stringify(e)); } }); });