Skip to content
Snippets Groups Projects
Commit 33b7f02b authored by Reiter, Christoph's avatar Reiter, Christoph :snake:
Browse files

Add an event bus implementation

This generalizes the features of the events module and should replace it in the long run.

It allows simple pub/sub and retained events without a centralized instance.
parent 257c7c61
No related branches found
No related tags found
No related merge requests found
const OPERATION_FETCH_RETAINED = 'fetch-retained';
export function checkIndentifier(name, allowEmpty=false) {
if (name.length === 0 && allowEmpty)
return;
// we are strict here, so we can used special characters to extend the format later on
if (!/^[a-z]+[a-z0-9-]*$/.test(name)) {
throw new Error('Only a-z0-9 and - allowed: ' + JSON.stringify(name));
}
}
export function createEventName(busName, eventName, operation) {
checkIndentifier(busName, true);
checkIndentifier(eventName);
let result = 'vpu' + ':' + busName + ':' + eventName;
if (operation !== undefined) {
checkIndentifier(operation);
result += ':' + operation;
}
return result;
}
/**
* An event bus implementation which doesn't depend on a global bus instance and supports retained messages
* (similar to MQTT retained messages)
*/
export class EventBus
{
/**
* @param {object} options
* @param {string} options.name The bus name, events will only be visible on the same bus
*/
constructor(options={}) {
const {name = ''} = options;
this._busName = name;
this._retainedData = {};
this._retainedListeners = {};
this._listeners = {};
}
_name(name, operation) {
return createEventName(this._busName, name, operation);
}
/**
* Subscribe to an event. Note that this will immediately trigger the callback in case there exists a
* retained event on the bus.
*
* @param {string} name The event name
* @param {Function} callback The callback to call in case the event is received
*/
subscribe(name, callback) {
const listeners = this._listeners[name] || new Map();
if (listeners.has(callback)) {
throw new Error('already subscribed to: ' + JSON.stringify(name));
}
const eventHandler = (e) => {
const meta = {};
const detail = e.detail;
if (detail.retain !== undefined)
meta.retain = detail.retain;
callback(detail.data, meta);
e.preventDefault();
};
window.addEventListener(this._name(name), eventHandler);
this._listeners[name] = listeners.set(callback, eventHandler);
const fetchEvent = new CustomEvent(this._name(name, OPERATION_FETCH_RETAINED), {
detail: {callback: eventHandler}
});
window.dispatchEvent(fetchEvent);
}
/**
* Unsubscribe from an event given the name and callback used for subscribing.
*
* @param {string} name The event name
* @param {Function} callback The callback used when calling subscribe()
*/
unsubscribe(name, callback) {
const listeners = this._listeners[name] || new Map();
const eventHandler = listeners.get(callback);
if (eventHandler === undefined) {
throw new Error("Not subscribed to: " + name);
}
window.removeEventListener(this._name(name), eventHandler);
listeners.delete(callback);
}
/**
* Publish a value for an event name. Set the retained flag to send the event also to future subscribers.
*
* @param {string} name
* @param {any} data
* @param {object} options
* @param {boolean} options.retain If the event should be retained i.e. send to all future subscribers as well
* @returns {boolean} If the event was handled by at least one bus member.
*/
publish(name, data, options={}) {
const {retain = false} = options;
const eventName = this._name(name);
if (retain && this._retainedListeners[name] === undefined) {
const retainedEventHandler = (e) => {
const data = this._retainedData[name];
if (data !== undefined) {
const callback = e.detail['callback'];
e.stopImmediatePropagation();
const event = new CustomEvent(eventName, {detail: {data: data, retain: retain}});
callback(event);
}
};
window.addEventListener(this._name(name, OPERATION_FETCH_RETAINED), retainedEventHandler);
const eventHandler = (e) => {
const detail = e.detail;
if (detail.retain) {
this._retainedData[name] = detail.data;
}
};
window.addEventListener(eventName, eventHandler);
this._retainedListeners[name] = [retainedEventHandler, eventHandler];
}
const event = new CustomEvent(eventName, {detail: {data: data, retain: retain}, cancelable: true});
return !window.dispatchEvent(event);
}
/**
* Cleans up all subscriptions, retained messages and other event handlers.
*
* This _needs_ to be called for every bus instance at some point.
* Otherwise the callbacks will stay alive forever and will leak.
*/
close() {
for (const [name, funcs] of Object.entries(this._retainedListeners)) {
const [retainedHandler, handler] = funcs;
window.removeEventListener(this._name(name, OPERATION_FETCH_RETAINED), retainedHandler);
window.removeEventListener(this._name(name), handler);
delete this._retainedListeners[name];
delete this._retainedData[name];
}
for (const [name, callbacks] of Object.entries(this._listeners)) {
for (const callback of callbacks.keys()) {
this.unsubscribe(name, callback);
}
}
}
}
// dummy entry point import {EventBus} from './eventbus.js';
export {EventBus};
\ No newline at end of file
import {assert} from 'chai';
import {EventBus, createEventName, checkIndentifier} from '../eventbus.js';
suite('helpers', () => {
test('createEventName', () => {
assert.equal(createEventName('foo', 'bar'), 'vpu:foo:bar');
assert.equal(createEventName('', 'bar'), 'vpu::bar');
assert.equal(createEventName('foo', 'bar', 'baz'), 'vpu:foo:bar:baz');
});
test('checkIndentifier', () => {
const ok = ['foo', 'bar', 'a123'];
const notOk = ['', 'foo bar', '123', 'a_', 'b:', ':'];
checkIndentifier('', true);
for(let key of ok) {
checkIndentifier(key);
}
for(let key of notOk) {
assert.throws(() => {
checkIndentifier(key);
});
}
});
});
suite('events', () => {
test('basics', () => {
const bus = new EventBus();
bus.close();
});
test('pub sub', () => {
const bus = new EventBus();
let called = false;
bus.subscribe("foo", (data) => {
called = true;
assert.deepEqual(data, 42);
});
const res = bus.publish("foo", 42);
assert.isTrue(called);
assert.isTrue(res);
bus.close();
});
test('no handler', () => {
const bus = new EventBus();
const res = bus.publish("foo", 42);
assert.isFalse(res);
bus.close();
});
test('no event after unsubscribe', () => {
const bus = new EventBus();
let called = false;
const func = () => {
called = true;
};
bus.subscribe("foo", func);
bus.unsubscribe("foo", func);
const res = bus.publish("foo", 42);
assert.isFalse(res);
assert.isFalse(called);
bus.close();
});
test('retained', () => {
const bus = new EventBus();
let calledData = null;
const func = (data, meta) => {
calledData = {data: data, meta: meta};
};
bus.subscribe("foo", func);
let res = bus.publish("foo", 42, {retain: true});
assert.isTrue(res);
assert.equal(calledData.data, 42);
assert.isTrue(calledData.meta.retain);
calledData = null;
res = bus.publish("foo", 24);
assert.isTrue(res);
assert.equal(calledData.data, 24);
assert.isFalse(calledData.meta.retain);
bus.unsubscribe("foo", func);
calledData = null;
bus.subscribe("foo", func);
assert.equal(calledData.data, 42);
assert.isTrue(calledData.meta.retain);
bus.close();
});
test('multiple busses', () => {
const bus = new EventBus();
const bus2 = new EventBus();
let called = false;
const func = () => {
called = true;
};
bus.subscribe("foo", func);
const res = bus2.publish("foo", 42);
assert.isTrue(res);
assert.isTrue(called);
bus2.close();
bus.close();
});
test('multiple retain conflict', () => {
const bus = new EventBus();
bus.publish("foo", 42, {retain: true});
const bus2 = new EventBus();
bus.publish("foo", 24, {retain: true});
const bus3 = new EventBus();
let calledData = null;
let callCount = 0;
const func = (data, meta) => {
callCount++;
calledData = {data: data, meta: meta};
};
bus3.subscribe("foo", func);
assert.equal(callCount, 1);
assert.deepEqual(calledData.data, 24);
assert.isTrue(calledData.meta.retain);
bus3.close();
bus2.close();
bus.close();
});
});
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment