If-Abfrage für ObjectId-Filterung eingebaut

This commit is contained in:
Timo Reichl 2025-03-30 13:39:12 +02:00
parent 25485dde69
commit 35cfe20062

View File

@ -1,5 +1,5 @@
import { WebSocketServer } from 'ws'; import { WebSocketServer } from "ws";
import { MongoClient } from 'mongodb'; import { MongoClient, ObjectId } from "mongodb";
const mongo = new MongoClient(process.env.MONGO_CONN_STR); const mongo = new MongoClient(process.env.MONGO_CONN_STR);
const db = mongo.db(); const db = mongo.db();
@ -14,56 +14,72 @@ async function executeSet(message, ws) {
if (!subject) { if (!subject) {
subject = collections[message.subject] = db.collection(message.subject); subject = collections[message.subject] = db.collection(message.subject);
} }
let newValue = {$set: {}}; let newValue = { $set: {} };
newValue.$set[message.field + '.modifiedAt'] = Date.now(); newValue.$set[message.field + ".modifiedAt"] = Date.now();
newValue.$set[message.field + '.modifiedBy'] = 'anonymous'; newValue.$set[message.field + ".modifiedBy"] = "anonymous";
newValue.$set[message.field + '.transientValue'] = message.value; newValue.$set[message.field + ".transientValue"] = message.value;
await subject.updateMany(message.filter, newValue, { upsert: true }) await subject.updateMany(message.filter, newValue, { upsert: true });
} }
wss.on('connection', function connection(ws) { wss.on("connection", function connection(ws) {
connections.push(ws); connections.push(ws);
console.log('accepted new connection'); console.log("accepted new connection");
ws.on('error', console.error); ws.on("error", console.error);
ws.on('close', e => { ws.on("close", (e) => {
console.log('closed a connection'); console.log("closed a connection");
connections.splice(connections.indexOf(ws), 1); connections.splice(connections.indexOf(ws), 1);
}); });
ws.on('message', function message(data, isBinary) { ws.on("message", function message(data, isBinary) {
console.log('received a new message'); console.log("received a new message");
if (
message.filter &&
message.filter._id &&
typeof message.filter._id === "string"
) {
try {
message.filter._id = new ObjectId(message.filter._id);
} catch (e) {
console.error("Invalid ObjectId in filter:", e);
}
}
try { try {
let message = JSON.parse(data); let message = JSON.parse(data);
console.log('message action is ' + message.action); console.log("message action is " + message.action);
switch (message.action) { switch (message.action) {
case 'set': case "set":
executeSet(message, ws).catch(console.error); executeSet(message, ws).catch(console.error);
break; break;
case 'subscribe': case "subscribe":
let subject = collections[message.subject]; let subject = collections[message.subject];
if (!subject) { if (!subject) {
subject = collections[message.subject] = db.collection(message.subject); subject = collections[message.subject] = db.collection(
message.subject
);
} }
subject.find(message.filter, {}).toArray().then(r => { subject
ws.send(JSON.stringify({ .find(message.filter, {})
.toArray()
.then((r) => {
ws.send(
JSON.stringify({
subject: message.subject, subject: message.subject,
field: message.field, field: message.field,
filter: message.filter, filter: message.filter,
value: r.map(v => v[message.field]) value: r.map((v) => v[message.field]),
})); })
);
}); });
break; break;
} }
} catch (e) { } catch (e) {
console.error('error while processing message', e); console.error("error while processing message", e);
ws.send(JSON.stringify(e)); ws.send(JSON.stringify(e));
} }
}); });
}); });