Compare commits
4 Commits
4667563094
...
6954969e6e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6954969e6e | ||
|
|
765ca6011e | ||
|
|
10ceebd1e6 | ||
|
|
9e9d3f35aa |
2
.env.example
Normal file
2
.env.example
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
# connection string for mongodb in the form of "mongodb://user:password@host:port/db"
|
||||||
|
MONGO_CONN_STR=mongodb://timo:bert@127.0.0.1:27017/testSuite
|
||||||
6
.mocharc.js
Normal file
6
.mocharc.js
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
import { readFileSync } from "fs";
|
||||||
|
|
||||||
|
try {
|
||||||
|
const envFile = readFileSync('./.env', { encoding: 'utf8' });
|
||||||
|
process.env.MONGO_TEST_CONN_STR = envFile.split('MONGO_TEST_CONN_STR=')[1].split('\n')[0];
|
||||||
|
} catch (e) {}
|
||||||
91
index.js
91
index.js
@ -1,88 +1,7 @@
|
|||||||
import { WebSocketServer } from "ws";
|
import * as ws from './lib/ws.js';
|
||||||
import { MongoClient, ObjectId } from "mongodb";
|
import * as db from './lib/db.js';
|
||||||
|
|
||||||
const mongo = new MongoClient(process.env.MONGO_CONN_STR);
|
let myDb = db.init(process.env.MONGO_CONN_STR)
|
||||||
const db = mongo.db();
|
.then(dbFuncs => {
|
||||||
const collections = {};
|
ws.init({ port: process.env.WEB_SOCKET_PORT }, dbFuncs);
|
||||||
|
|
||||||
const wss = new WebSocketServer({ port: 7130 });
|
|
||||||
const connections = [];
|
|
||||||
const subscriptions = [];
|
|
||||||
|
|
||||||
async function executeSet(message, ws) {
|
|
||||||
let subject = collections[message.subject];
|
|
||||||
if (!subject) {
|
|
||||||
subject = collections[message.subject] = db.collection(message.subject);
|
|
||||||
}
|
|
||||||
let newValue = { $set: {} };
|
|
||||||
newValue.$set[message.field + ".modifiedAt"] = Date.now();
|
|
||||||
newValue.$set[message.field + ".modifiedBy"] = "anonymous";
|
|
||||||
newValue.$set[message.field + ".transientValue"] = message.value;
|
|
||||||
|
|
||||||
await subject.updateMany(message.filter, newValue, { upsert: true });
|
|
||||||
}
|
|
||||||
|
|
||||||
wss.on("connection", function connection(ws) {
|
|
||||||
connections.push(ws);
|
|
||||||
console.log("accepted new connection");
|
|
||||||
|
|
||||||
ws.on("error", console.error);
|
|
||||||
ws.on("close", (e) => {
|
|
||||||
console.log("closed a connection");
|
|
||||||
connections.splice(connections.indexOf(ws), 1);
|
|
||||||
});
|
|
||||||
|
|
||||||
ws.on("message", function message(data, isBinary) {
|
|
||||||
console.log("received a new message");
|
|
||||||
console.log("test");
|
|
||||||
|
|
||||||
try {
|
|
||||||
let message = JSON.parse(data);
|
|
||||||
console.log(message);
|
|
||||||
if (
|
|
||||||
message.filter &&
|
|
||||||
message.filter._id &&
|
|
||||||
typeof message.filter._id === "string"
|
|
||||||
) {
|
|
||||||
try {
|
|
||||||
message.filter._id = new ObjectId(message.filter._id);
|
|
||||||
console.log(message.filter._id);
|
|
||||||
} catch (e) {
|
|
||||||
console.error("Invalid ObjectId in filter:", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
console.log("message action is " + message.action);
|
|
||||||
|
|
||||||
switch (message.action) {
|
|
||||||
case "set":
|
|
||||||
executeSet(message, ws).catch(console.error);
|
|
||||||
break;
|
|
||||||
case "subscribe":
|
|
||||||
let subject = collections[message.subject];
|
|
||||||
if (!subject) {
|
|
||||||
subject = collections[message.subject] = db.collection(
|
|
||||||
message.subject
|
|
||||||
);
|
|
||||||
}
|
|
||||||
subject
|
|
||||||
.find(message.filter, {})
|
|
||||||
.toArray()
|
|
||||||
.then((r) => {
|
|
||||||
ws.send(
|
|
||||||
JSON.stringify({
|
|
||||||
subject: message.subject,
|
|
||||||
field: message.field,
|
|
||||||
filter: message.filter,
|
|
||||||
value: r.map((v) => v[message.field]),
|
|
||||||
})
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
console.error("error while processing message", e);
|
|
||||||
ws.send(JSON.stringify(e));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
|
|||||||
50
lib/db.js
Normal file
50
lib/db.js
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
import { MongoClient } from 'mongodb';
|
||||||
|
|
||||||
|
let mongo;
|
||||||
|
let db;
|
||||||
|
let functionCollection = {};
|
||||||
|
|
||||||
|
export function init(connectionString) {
|
||||||
|
try {
|
||||||
|
mongo = new MongoClient(connectionString);
|
||||||
|
db = mongo.db();
|
||||||
|
return mongo.connect().then(() => Promise.resolve(functionCollection));
|
||||||
|
} catch (e) {
|
||||||
|
return Promise.reject(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
functionCollection.subscribe = function subscribe (collectionName, filter) {
|
||||||
|
let collection = db.collection(collectionName);
|
||||||
|
return collection
|
||||||
|
.find(filter, {})
|
||||||
|
.toArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
functionCollection.persist = function persist (collectionName, filter, fieldName, value) {
|
||||||
|
let collection = db.collection(collectionName);
|
||||||
|
let newValue = { $set: {} };
|
||||||
|
newValue.$set[fieldName + ".modifiedAt"] = Date.now();
|
||||||
|
newValue.$set[fieldName + ".modifiedBy"] = "anonymous";
|
||||||
|
newValue.$set[fieldName + ".transientValue"] = value;
|
||||||
|
|
||||||
|
return collection.updateMany(filter, newValue, { upsert: true });
|
||||||
|
}
|
||||||
|
|
||||||
|
functionCollection.publish = function publish (b) {
|
||||||
|
console.log('publish')
|
||||||
|
}
|
||||||
|
|
||||||
|
functionCollection.close = function close () {
|
||||||
|
return mongo.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
functionCollection.purge = function purge (collectionNames) {
|
||||||
|
let promises = [];
|
||||||
|
|
||||||
|
for (let i = 0; i < collectionNames.length; i++) {
|
||||||
|
const collection = db.collection(collectionNames[i]);
|
||||||
|
promises.push(collection.drop());
|
||||||
|
}
|
||||||
|
return Promise.all(promises);
|
||||||
|
}
|
||||||
35
lib/ws.js
Normal file
35
lib/ws.js
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
import { WebSocketServer } from 'ws';
|
||||||
|
|
||||||
|
let wss;
|
||||||
|
let connections = [];
|
||||||
|
|
||||||
|
export function init(cfg, db) {
|
||||||
|
wss = new WebSocketServer(cfg);
|
||||||
|
console.log(cfg);
|
||||||
|
wss.on('connection', ws => {
|
||||||
|
console.log('new connection')
|
||||||
|
connections.push(ws);
|
||||||
|
|
||||||
|
ws.on('error', console.error);
|
||||||
|
ws.on('close', (e) => {
|
||||||
|
connections.splice(connections.indexOf(ws), 1);
|
||||||
|
});
|
||||||
|
ws.on('message', (data, isBinary) => {
|
||||||
|
try {
|
||||||
|
let message = JSON.parse(data);
|
||||||
|
|
||||||
|
switch (message.action) {
|
||||||
|
case "set":
|
||||||
|
break;
|
||||||
|
case "subscribe":
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw 'invalid action "' + message.action + '"';
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
console.error(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return wss;
|
||||||
|
}
|
||||||
@ -4,8 +4,10 @@
|
|||||||
"main": "index.js",
|
"main": "index.js",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"test": "echo \"Error: no test specified\" && exit 1",
|
"test": "mocha",
|
||||||
"start": "node --env-file=.env index.js"
|
"testWatch": "mocha -w",
|
||||||
|
"start": "node --env-file=.env index.js",
|
||||||
|
"watch": "node --watch --env-file=.env index.js"
|
||||||
},
|
},
|
||||||
"author": "",
|
"author": "",
|
||||||
"license": "ISC",
|
"license": "ISC",
|
||||||
@ -13,5 +15,8 @@
|
|||||||
"dependencies": {
|
"dependencies": {
|
||||||
"mongodb": "^6.13.0",
|
"mongodb": "^6.13.0",
|
||||||
"ws": "^8.18.0"
|
"ws": "^8.18.0"
|
||||||
|
},
|
||||||
|
"devDependencies": {
|
||||||
|
"mocha": "^11.1.0"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
74
test/db.test.js
Normal file
74
test/db.test.js
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
import { before } from 'mocha';
|
||||||
|
import * as db from '../lib/db.js';
|
||||||
|
import * as assert from 'assert/strict';
|
||||||
|
|
||||||
|
const rng = Math.random();
|
||||||
|
|
||||||
|
describe('lib/db', function () {
|
||||||
|
let myDb;
|
||||||
|
let dbFuncs;
|
||||||
|
|
||||||
|
describe('#init()', function () {
|
||||||
|
it('should fail to connect given invalid connection string', function () {
|
||||||
|
return db.init('not a connection string').then(() => Promise.reject()).catch(() => Promise.resolve());
|
||||||
|
});
|
||||||
|
it('should successfully connect to mongodb', function () {
|
||||||
|
return myDb = db.init(process.env.MONGO_TEST_CONN_STR);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
describe('#close()', function () {
|
||||||
|
it('should close the connection', function () {
|
||||||
|
return myDb.then(db => db.close());
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('with a established connection ->', function () {
|
||||||
|
before(function () {
|
||||||
|
myDb = db.init(process.env.MONGO_TEST_CONN_STR)
|
||||||
|
.then(funcs => dbFuncs = funcs);
|
||||||
|
});
|
||||||
|
|
||||||
|
after(function () {
|
||||||
|
dbFuncs.purge(['readWriteTest'])
|
||||||
|
.then(() => myDb.then(db => db.close()));
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('#persist()', function () {
|
||||||
|
it('should be able to write to mongodb', function () {
|
||||||
|
return dbFuncs.persist('readWriteTest', {}, 'rng', rng);
|
||||||
|
});
|
||||||
|
it('should be able to write a second entry into the same collection based on a filter', function () {
|
||||||
|
return dbFuncs.persist('readWriteTest', { 'rng.transientValue': 2*rng }, 'rng', 2*rng);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('#subscribe()', function () {
|
||||||
|
it('should be able to read from mongodb', function () {
|
||||||
|
return dbFuncs
|
||||||
|
.subscribe('readWriteTest', {})
|
||||||
|
.then(v => {
|
||||||
|
assert.equal(v.length, 2);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
it('should be able to query for specific values from mongodb', function () {
|
||||||
|
return dbFuncs
|
||||||
|
.subscribe('readWriteTest', { 'rng.transientValue': rng })
|
||||||
|
.then(v => {
|
||||||
|
assert.equal(v.length, 1);
|
||||||
|
assert.equal(v[0].rng.transientValue, rng);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
it('should be able to query for specific values from mongodb based on _id', function () {
|
||||||
|
return dbFuncs
|
||||||
|
.subscribe('readWriteTest', { 'rng.transientValue': rng })
|
||||||
|
.then(v => {
|
||||||
|
return dbFuncs.subscribe('readWriteTest', { '_id': v[0]._id });
|
||||||
|
})
|
||||||
|
.then(v => {
|
||||||
|
assert.equal(v.length, 1);
|
||||||
|
assert.equal(v[0].rng.transientValue, rng);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
Loading…
x
Reference in New Issue
Block a user