From 6954969e6e44103ca11a06c82354c699f059d117 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 31 Mar 2025 14:17:29 +0200 Subject: [PATCH] first real commit --- index.js | 93 ++----------------------------------------------- lib/ws.js | 34 ++++++++++++++++-- test/db.test.js | 21 +++++++++++ 3 files changed, 55 insertions(+), 93 deletions(-) diff --git a/index.js b/index.js index 05f72f4..fe783b2 100644 --- a/index.js +++ b/index.js @@ -1,94 +1,7 @@ import * as ws from './lib/ws.js'; import * as db from './lib/db.js'; -let myWs = ws.init({ port: process.env.WEB_SOCKET_PORT }); -let myDb = db.init(process.env.MONGO_CONN_STR); - - -/* -const mongo = new MongoClient(process.env.MONGO_CONN_STR); -const db = mongo.db(); -const collections = {}; - -const wss = new WebSocketServer({ port: 7130 }); -const connections = []; -const subscriptions = []; - -async function executeSet(message, ws) { - let subject = collections[message.subject]; - if (!subject) { - subject = collections[message.subject] = db.collection(message.subject); - } - let newValue = { $set: {} }; - newValue.$set[message.field + ".modifiedAt"] = Date.now(); - newValue.$set[message.field + ".modifiedBy"] = "anonymous"; - newValue.$set[message.field + ".transientValue"] = message.value; - - await subject.updateMany(message.filter, newValue, { upsert: true }); -} - -wss.on("connection", function connection(ws) { - connections.push(ws); - console.log("accepted new connection"); - - ws.on("error", console.error); - ws.on("close", (e) => { - console.log("closed a connection"); - connections.splice(connections.indexOf(ws), 1); +let myDb = db.init(process.env.MONGO_CONN_STR) + .then(dbFuncs => { + ws.init({ port: process.env.WEB_SOCKET_PORT }, dbFuncs); }); - - ws.on("message", function message(data, isBinary) { - console.log("received a new message"); - console.log("test"); - - try { - let message = JSON.parse(data); - console.log(message); - if ( - message.filter && - message.filter._id && - typeof message.filter._id === "string" - ) { - try { - message.filter._id = new ObjectId(message.filter._id); - console.log(message.filter._id); - } catch (e) { - console.error("Invalid ObjectId in filter:", e); - } - } - console.log("message action is " + message.action); - - switch (message.action) { - case "set": - executeSet(message, ws).catch(console.error); - break; - case "subscribe": - let subject = collections[message.subject]; - if (!subject) { - subject = collections[message.subject] = db.collection( - message.subject - ); - } - subject - .find(message.filter, {}) - .toArray() - .then((r) => { - ws.send( - JSON.stringify({ - subject: message.subject, - field: message.field, - filter: message.filter, - value: r.map((v) => v[message.field]), - }) - ); - }); - - break; - } - } catch (e) { - console.error("error while processing message", e); - ws.send(JSON.stringify(e)); - } - }); -}); -*/ \ No newline at end of file diff --git a/lib/ws.js b/lib/ws.js index ef3e493..55e0d14 100644 --- a/lib/ws.js +++ b/lib/ws.js @@ -1,7 +1,35 @@ -import { WebSocketServer } from "ws"; +import { WebSocketServer } from 'ws'; let wss; +let connections = []; -export function init(cfg) { +export function init(cfg, db) { wss = new WebSocketServer(cfg); -} \ No newline at end of file + console.log(cfg); + wss.on('connection', ws => { + console.log('new connection') + connections.push(ws); + + ws.on('error', console.error); + ws.on('close', (e) => { + connections.splice(connections.indexOf(ws), 1); + }); + ws.on('message', (data, isBinary) => { + try { + let message = JSON.parse(data); + + switch (message.action) { + case "set": + break; + case "subscribe": + break; + default: + throw 'invalid action "' + message.action + '"'; + } + } catch (e) { + console.error(e); + } + }); + }); + return wss; +} diff --git a/test/db.test.js b/test/db.test.js index 27bd3c4..25776a2 100644 --- a/test/db.test.js +++ b/test/db.test.js @@ -37,12 +37,33 @@ describe('lib/db', function () { it('should be able to write to mongodb', function () { return dbFuncs.persist('readWriteTest', {}, 'rng', rng); }); + it('should be able to write a second entry into the same collection based on a filter', function () { + return dbFuncs.persist('readWriteTest', { 'rng.transientValue': 2*rng }, 'rng', 2*rng); + }); }); describe('#subscribe()', function () { it('should be able to read from mongodb', function () { return dbFuncs .subscribe('readWriteTest', {}) + .then(v => { + assert.equal(v.length, 2); + }); + }); + it('should be able to query for specific values from mongodb', function () { + return dbFuncs + .subscribe('readWriteTest', { 'rng.transientValue': rng }) + .then(v => { + assert.equal(v.length, 1); + assert.equal(v[0].rng.transientValue, rng); + }); + }); + it('should be able to query for specific values from mongodb based on _id', function () { + return dbFuncs + .subscribe('readWriteTest', { 'rng.transientValue': rng }) + .then(v => { + return dbFuncs.subscribe('readWriteTest', { '_id': v[0]._id }); + }) .then(v => { assert.equal(v.length, 1); assert.equal(v[0].rng.transientValue, rng);