import EventEmitter from 'node:events'
import { EurekaCrypto } from './crypto.mjs'
import { EurekaServer } from './transport.mjs'
/**
* @typedef {Object} EurekaServerProps
* @property {'udp4' | 'udp6' | undefined} type
* @property {string[] | undefined} multicastGroups
* @property {string[] | undefined} interfaces
* @property {number | undefined} port
* @property {number | undefined} chunkSize
* @property {number | undefined} chunkSpacing
*/
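/**
* @typedef {Object} EurekaCryptoProps
* @property {string} password shared password, required unless `instance` is provided
* @property {string} salt shared salt, required unless `instance` is provided
* @property {'chacha20-poly1305' | 'aes-256-gcm' | undefined} algorithm
* @property {number | undefined} keyId
* @property {'scrypt' | 'pbkdf2' | undefined} kdfFunction
* @property {Object | undefined} instance pre-built crypto object used instead of creating a default EurekaCrypto
*/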
/**
* @typedef {Object} Logger
* @property {function} log
* @property {function} info
* @property {function} error
* @property {function} warn
* @property {function} debug
* @property {function} trace
*/
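/**
* @typedef {Object} EurekaProps
* @property {any} messageData payload that is JSON-serialized and broadcast to other instances (required)
* @property {number | undefined} broadcastInterval milliseconds between broadcasts, defaults to 60000
* @property {EurekaServerProps | undefined} server
* @property {EurekaCryptoProps} crypto
* @property {Logger | undefined} logger
*/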
export class Eureka extends EventEmitter {
  /**
   * Create a Eureka instance. This is intended to facilitate discovery of
   * other instances and share information between instances in a secure way.
   * Authenticated encryption is used to ensure integrity of the messages.
   *
   * Additional authentication data is used to bind the encrypted payloads
   * to the IP sending the data. Meaning if the data is tampered with and/or relayed
   * through another box, the authenticated encryption will fail.
   *
   * This is intended to be a core library that can be re-used by another piece
   * that applies more opinions on the structure of the messages and actions
   * on receipt of messages.
   *
   * This is intended to be used on devices in the same network segment and visible to each other at L2.
   *
   * You must provide a shared salt and password; how this is fetched/seeded is left up to the upper layer.
   * Additionally, rotation of the shared key material is also left to the upper layer.
   * Currently rotation must happen all at once. In the future a keyset with keys that can be aged out for
   * seamless rotation may be supported.
   *
   * You must provide the message payload (props.messageData) that will be broadcast
   * (by default every minute, override with props.broadcastInterval in milliseconds).
   *
   * You may override the crypto object used to protect and authenticate the payloads; to do so provide
   * props.crypto.instance. If you do not, a default crypto instance will be created which uses
   * Scrypt and ChaCha20.
   *
   * Events emitted: 'ready', 'message', 'unauthMessage' and 'error'.
   *
   * @param {EurekaProps} props
   * @throws {Error} if props.messageData is missing, or if no crypto instance is supplied
   * and props.crypto.salt / props.crypto.password is missing
   */
  constructor (props) {
    super()
    this._ready = false
    this.validatePropsForServer(props)
    this.validatePropsForEureka(props)
    this.messageBroadcastInterval = props.broadcastInterval ?? 60000
    this.messageBroadcastData = Buffer.from(JSON.stringify(props.messageData))
    this.logger = props.logger
    if (props.crypto?.instance) {
      // caller supplied its own crypto object, salt/password are not needed
      this.crypto = props.crypto.instance
    } else {
      this.validatePropsForCrypto(props)
      this.crypto = new EurekaCrypto(this.buildCryptoProps(props))
    }
    this.server = new EurekaServer(this.buildServerProps(props))
      .on('message', this.onMessage.bind(this))
      .on('error', this.onError.bind(this))
      .on('ready', this.onReady.bind(this))
    // periodic broadcast; cleared again in close()
    this.broadcastTimer = setInterval(this.broadcast.bind(this), this.messageBroadcastInterval)
  }

  /**
   * Broadcast the current broadcast data through the server.
   * Failures are routed to onError and are not re-thrown.
   * @returns {Promise<void>}
   */
  async broadcast () {
    try {
      this.log('info', 'Sending Broadcast')
      await this.server.sendMessage(this.messageBroadcastData)
    } catch (err) {
      this.onError(err)
    }
  }

  /**
   * Handler for when server is ready, this triggers an immediate
   * broadcast when server is ready instead of waiting for next interval.
   * @returns {void}
   */
  onReady () {
    this._ready = true
    this.log('info', 'Server is ready to receive messages')
    // trigger immediate startup broadcast; broadcast() handles its own errors
    this.broadcast()
    // bubble up to consumers
    this.emit('ready')
  }

  /**
   * Check if the server is ready
   * @returns {boolean} true once the server reported ready and close() has not been called
   */
  isReady () {
    return this._ready
  }

  /**
   * Handler for all error events, logs and bubbles it up.
   * Errors carrying the authentication-failure message are emitted as
   * 'unauthMessage' instead of 'error'.
   * @param {any} err
   */
  onError (err) {
    // filter crypto errors and fire unauthenticated message recv event instead
    if (err && err.message === 'Unsupported state or unable to authenticate data') {
      this.log('warn', 'Message arrived that failed authentication', err)
      this.emit('unauthMessage', err)
    } else {
      this.log('error', err)
      this.emit('error', err)
    }
  }

  /**
   * Handler for messages, at this layer the message has been authenticated
   * and this is plaintext, message is logged and bubbled up as parsed JSON.
   * Parse failures are routed to onError.
   * @param {Buffer} msg expected to be a buffer of JSON UTF8 data
   */
  onMessage (msg) {
    this.log('trace', msg)
    try {
      this.emit('message', JSON.parse(msg.toString('utf8')))
    } catch (err) {
      this.onError(err)
    }
  }

  /**
   * Build the props handed to the default EurekaCrypto: props.crypto plus the logger.
   * @param {EurekaProps} props
   * @returns {Object}
   */
  buildCryptoProps (props) {
    return { ...props.crypto, logger: this.logger }
  }

  /**
   * Build the props handed to EurekaServer: props.server (or an empty object)
   * plus the crypto object and logger of this instance.
   * @param {EurekaProps} props
   * @returns {Object}
   */
  buildServerProps (props) {
    return {
      ...(props.server ?? {}),
      crypto: this.crypto,
      logger: this.logger
    }
  }

  /**
   * Ensure the broadcast payload was provided.
   * @param {EurekaProps} props
   * @throws {Error} if props or props.messageData is missing
   */
  validatePropsForEureka (props) {
    // optional chaining so a missing props object yields the descriptive error, not a TypeError
    if (!props?.messageData) throw new Error('props.messageData must be provided! This is the broadcast data sent to other clients.')
  }

  /**
   * Ensure the shared key material for the default crypto instance was provided.
   * @param {EurekaProps} props
   * @throws {Error} if props.crypto.salt or props.crypto.password is missing
   */
  validatePropsForCrypto (props) {
    // optional chaining so a missing props.crypto yields the descriptive error, not a TypeError
    if (!props?.crypto?.salt) throw new Error('props.crypto.salt must be provided!')
    if (!props?.crypto?.password) throw new Error('props.crypto.password must be provided!')
  }

  /**
   * Placeholder, the server can default everything.
   * @param {EurekaProps} _props
   */
  validatePropsForServer (_props) {
    // server can default everything, just a placeholder
  }

  /**
   * Call into the logger object if provided, follows console interface behavior.
   * All arguments after the level are forwarded to the logger method. The call is
   * skipped when no logger is set or the logger has no function for that level.
   * @param {('log'|'info'|'error'|'warn'|'debug'|'trace')} level
   * @param {...any} args
   */
  log (level, ...args) {
    if (this.logger && typeof this.logger[level] === 'function') {
      this.logger[level](...args)
    }
  }

  /**
   * Stop sending messages and clean up the resources created,
   * This is a destructive call, you will need to remake a Eureka instance
   * after calling this. It is intended to be called on exit.
   * @throws {any} re-throws whatever closing the server throws, after routing it to onError
   */
  close () {
    // a closed instance must no longer report itself as ready
    this._ready = false
    try {
      clearInterval(this.broadcastTimer)
      this.server.closeServer()
    } catch (err) {
      this.onError(err)
      throw err
    }
  }

  /**
   * Set the broadcast data, it's expected that this is a model obj/instance
   * as it is serialized and converted to a buffer. This will be sent on next interval.
   * Serialization failures are routed to onError and the previous data is kept.
   * @param {any} data
   */
  setBroadcastData (data) {
    try {
      this.messageBroadcastData = Buffer.from(JSON.stringify(data))
    } catch (err) {
      this.onError(err)
    }
  }

  /**
   * Send a one time custom message
   * @param {any} msg JSON-serializable value
   * @returns {Promise<void>}
   * @throws {any} rejects with the serialization or send error after routing it to onError
   */
  async sendMessage (msg) {
    try {
      // serialize inside the try so serialization failures also reach the 'error' event
      const data = Buffer.from(JSON.stringify(msg))
      await this.server.sendMessage(data)
    } catch (err) {
      this.onError(err)
      // since this is also invoked, re-throw
      // we want the 'error' event to capture everything but also support
      // throwing to caller in cases where method is expected to be direct called
      // by consumers
      throw err
    }
  }
}