diff --git a/packages/common/eventbus.js b/packages/common/eventbus.js new file mode 100644 index 0000000000000000000000000000000000000000..468820fe364028b6d78747e9e7a54190f4f5dced --- /dev/null +++ b/packages/common/eventbus.js @@ -0,0 +1,155 @@ + +const OPERATION_FETCH_RETAINED = 'fetch-retained'; + +export function checkIndentifier(name, allowEmpty=false) { + if (name.length === 0 && allowEmpty) + return; + // we are strict here, so we can used special characters to extend the format later on + if (!/^[a-z]+[a-z0-9-]*$/.test(name)) { + throw new Error('Only a-z0-9 and - allowed: ' + JSON.stringify(name)); + } +} + +export function createEventName(busName, eventName, operation) { + checkIndentifier(busName, true); + checkIndentifier(eventName); + let result = 'vpu' + ':' + busName + ':' + eventName; + if (operation !== undefined) { + checkIndentifier(operation); + result += ':' + operation; + } + return result; +} + +/** + * An event bus implementation which doesn't depend on a global bus instance and supports retained messages + * (similar to MQTT retained messages) + */ +export class EventBus +{ + /** + * @param {object} options + * @param {string} options.name The bus name, events will only be visible on the same bus + */ + constructor(options={}) { + const {name = ''} = options; + this._busName = name; + this._retainedData = {}; + this._retainedListeners = {}; + this._listeners = {}; + } + + _name(name, operation) { + return createEventName(this._busName, name, operation); + } + + /** + * Subscribe to an event. Note that this will immediately trigger the callback in case there exists a + * retained event on the bus. + * + * @param {string} name The event name + * @param {Function} callback The callback to call in case the event is received + */ + subscribe(name, callback) { + const listeners = this._listeners[name] || new Map(); + + if (listeners.has(callback)) { + throw new Error('already subscribed to: ' + JSON.stringify(name)); + } + + const eventHandler = (e) => { + const meta = {}; + const detail = e.detail; + if (detail.retain !== undefined) + meta.retain = detail.retain; + callback(detail.data, meta); + e.preventDefault(); + }; + + window.addEventListener(this._name(name), eventHandler); + + this._listeners[name] = listeners.set(callback, eventHandler); + + const fetchEvent = new CustomEvent(this._name(name, OPERATION_FETCH_RETAINED), { + detail: {callback: eventHandler} + }); + window.dispatchEvent(fetchEvent); + } + + /** + * Unsubscribe from an event given the name and callback used for subscribing. + * + * @param {string} name The event name + * @param {Function} callback The callback used when calling subscribe() + */ + unsubscribe(name, callback) { + const listeners = this._listeners[name] || new Map(); + const eventHandler = listeners.get(callback); + if (eventHandler === undefined) { + throw new Error("Not subscribed to: " + name); + } + window.removeEventListener(this._name(name), eventHandler); + listeners.delete(callback); + } + + /** + * Publish a value for an event name. Set the retained flag to send the event also to future subscribers. + * + * @param {string} name + * @param {any} data + * @param {object} options + * @param {boolean} options.retain If the event should be retained i.e. send to all future subscribers as well + * @returns {boolean} If the event was handled by at least one bus member. + */ + publish(name, data, options={}) { + const {retain = false} = options; + const eventName = this._name(name); + + if (retain && this._retainedListeners[name] === undefined) { + const retainedEventHandler = (e) => { + const data = this._retainedData[name]; + if (data !== undefined) { + const callback = e.detail['callback']; + e.stopImmediatePropagation(); + const event = new CustomEvent(eventName, {detail: {data: data, retain: retain}}); + callback(event); + } + }; + window.addEventListener(this._name(name, OPERATION_FETCH_RETAINED), retainedEventHandler); + + const eventHandler = (e) => { + const detail = e.detail; + if (detail.retain) { + this._retainedData[name] = detail.data; + } + }; + window.addEventListener(eventName, eventHandler); + + this._retainedListeners[name] = [retainedEventHandler, eventHandler]; + } + + const event = new CustomEvent(eventName, {detail: {data: data, retain: retain}, cancelable: true}); + return !window.dispatchEvent(event); + } + + /** + * Cleans up all subscriptions, retained messages and other event handlers. + * + * This _needs_ to be called for every bus instance at some point. + * Otherwise the callbacks will stay alive forever and will leak. + */ + close() { + for (const [name, funcs] of Object.entries(this._retainedListeners)) { + const [retainedHandler, handler] = funcs; + window.removeEventListener(this._name(name, OPERATION_FETCH_RETAINED), retainedHandler); + window.removeEventListener(this._name(name), handler); + delete this._retainedListeners[name]; + delete this._retainedData[name]; + } + for (const [name, callbacks] of Object.entries(this._listeners)) { + for (const callback of callbacks.keys()) { + this.unsubscribe(name, callback); + } + } + } +} diff --git a/packages/common/index.js b/packages/common/index.js index d561d965b310859e8ac720ea7e2ac7fe3f70bed0..cb63e2ac3aeb88e0edbca2d85336e3c9389710d5 100644 --- a/packages/common/index.js +++ b/packages/common/index.js @@ -1 +1,3 @@ -// dummy entry point +import {EventBus} from './eventbus.js'; + +export {EventBus}; \ No newline at end of file diff --git a/packages/common/test/eventbus.js b/packages/common/test/eventbus.js new file mode 100644 index 0000000000000000000000000000000000000000..cf6d94030ba693c511a77bb58b6bd573e5221bc2 --- /dev/null +++ b/packages/common/test/eventbus.js @@ -0,0 +1,149 @@ +import {assert} from 'chai'; +import {EventBus, createEventName, checkIndentifier} from '../eventbus.js'; + +suite('helpers', () => { + test('createEventName', () => { + assert.equal(createEventName('foo', 'bar'), 'vpu:foo:bar'); + assert.equal(createEventName('', 'bar'), 'vpu::bar'); + assert.equal(createEventName('foo', 'bar', 'baz'), 'vpu:foo:bar:baz'); + }); + + test('checkIndentifier', () => { + const ok = ['foo', 'bar', 'a123']; + const notOk = ['', 'foo bar', '123', 'a_', 'b:', ':']; + + checkIndentifier('', true); + + for(let key of ok) { + checkIndentifier(key); + } + + for(let key of notOk) { + assert.throws(() => { + checkIndentifier(key); + }); + } + }); +}); + +suite('events', () => { + test('basics', () => { + const bus = new EventBus(); + bus.close(); + }); + + test('pub sub', () => { + const bus = new EventBus(); + let called = false; + bus.subscribe("foo", (data) => { + called = true; + assert.deepEqual(data, 42); + }); + + const res = bus.publish("foo", 42); + assert.isTrue(called); + assert.isTrue(res); + bus.close(); + }); + + test('no handler', () => { + const bus = new EventBus(); + const res = bus.publish("foo", 42); + assert.isFalse(res); + bus.close(); + }); + + test('no event after unsubscribe', () => { + const bus = new EventBus(); + let called = false; + + const func = () => { + called = true; + }; + bus.subscribe("foo", func); + bus.unsubscribe("foo", func); + + const res = bus.publish("foo", 42); + assert.isFalse(res); + assert.isFalse(called); + bus.close(); + }); + + test('retained', () => { + const bus = new EventBus(); + let calledData = null; + + const func = (data, meta) => { + calledData = {data: data, meta: meta}; + }; + + bus.subscribe("foo", func); + + let res = bus.publish("foo", 42, {retain: true}); + assert.isTrue(res); + + assert.equal(calledData.data, 42); + assert.isTrue(calledData.meta.retain); + + calledData = null; + + res = bus.publish("foo", 24); + assert.isTrue(res); + + assert.equal(calledData.data, 24); + assert.isFalse(calledData.meta.retain); + bus.unsubscribe("foo", func); + + calledData = null; + bus.subscribe("foo", func); + assert.equal(calledData.data, 42); + assert.isTrue(calledData.meta.retain); + + bus.close(); + }); + + test('multiple busses', () => { + const bus = new EventBus(); + const bus2 = new EventBus(); + + let called = false; + + const func = () => { + called = true; + }; + bus.subscribe("foo", func); + + const res = bus2.publish("foo", 42); + assert.isTrue(res); + assert.isTrue(called); + + bus2.close(); + bus.close(); + }); + + test('multiple retain conflict', () => { + const bus = new EventBus(); + bus.publish("foo", 42, {retain: true}); + + const bus2 = new EventBus(); + bus.publish("foo", 24, {retain: true}); + + const bus3 = new EventBus(); + + let calledData = null; + let callCount = 0; + const func = (data, meta) => { + callCount++; + calledData = {data: data, meta: meta}; + }; + bus3.subscribe("foo", func); + + assert.equal(callCount, 1); + assert.deepEqual(calledData.data, 24); + assert.isTrue(calledData.meta.retain); + + bus3.close(); + bus2.close(); + bus.close(); + }); +}); \ No newline at end of file