'use strict'
/* global WebSocket */
// Shared code for node and clients/browsers
const EventEmitter = require('eventemitter3')
const { assign, attempt, fromCallback, hasOwnProperty } = require('./utils')
const blacklist = ['close', 'open', 'error', 'pong', 'retry']
// utils
const concat = [].concat.bind([])
const emit = EventEmitter.prototype.emit
function isBlacklistedEvent (name) {
return blacklist.indexOf(name) >= 0
}
function getOwnProp (obj, name) {
return hasOwnProperty(obj, name) ? obj[name] : undefined
}
// errors
/**
* {@link Client#send}/{@link Client#invoke} is rejected with this
* error when connection is closed before a message is send or reply
* received (for `invoke` only).
*
* @class
* @augments Error
* @memberof Client
* @param {number} [id] Message id.
*/
function ConnectionError (id) {
this.name = 'ConnectionError'
this.id = id
}
ConnectionError.prototype = Object.create(Error.prototype)
/**
* {@link Client#invoke} is rejected with this error when no reply is
* received before {@link Client.SocketOptions} `ackTimeout`.
*
* @class
* @augments Error
* @memberof Client
* @param {number} id Message id.
*/
function TimeoutError (id) {
this.name = 'TimeoutError'
this.id = id
}
TimeoutError.prototype = Object.create(Error.prototype)
/**
* {@link Client#invoke} is rejected with this error by the other side
* when no handler is found for a procedure.
*
* @class
* @augments Error
* @memberof Client
* @param {string} procedure Name.
*/
function NoProcedureError (procedure) {
this.name = 'NoProcedureError'
this.procedure = procedure
}
NoProcedureError.prototype = Object.create(Error.prototype)
// validation
function validateId (id) {
return typeof id === 'number' && (id % 1) === 0 && id > 0
}
function validate (message) {
let passed = false
if (message instanceof Object) {
if (message.name) { // message
if (typeof message.name === 'string' && message.args instanceof Array) {
const nprops = Object.keys(message).length
if (nprops === 2 || (nprops === 3 && validateId(message.id))) {
passed = true
}
}
} else { // ack message
const nprops = Object.keys(message).length
if (nprops === 2 && validateId(message.id)) {
if (hasOwnProperty(message, 'error') || hasOwnProperty(message, 'result')) {
passed = true
}
}
}
}
if (!passed) {
throw new Error('Validation error')
}
}
// ack
class Ack {
constructor (id, timeout, cb) {
this.id = id
this.promise = new Promise((resolve, reject) => {
this.resolve = resolve
this.reject = reject
})
this.cb = cb
this.timeout = setTimeout(() => this.forceNack(new TimeoutError(id)), timeout)
}
forceNack (error) {
clearTimeout(this.timeout)
this.cb()
this.reject(error)
}
settle (message) {
clearTimeout(this.timeout)
this.cb()
if (hasOwnProperty(message, 'error')) {
this.reject(message.error)
} else {
this.resolve(message.result)
}
}
}
// client
/**
* General format for all data that is sent or received over a
* websocket.
*
* @typedef {Object} Client.Message
*
* @property {number} [id]
* @property {string} [name]
* @property {Array} [args]
* @property {Object} [result]
* @property {Object} [error]
*/
/**
* Messages encoder. May also return promises for an asynchronous
* execution.
*
* @callback Client.Encoder
* @param {Client.Message} message Message.
* @return {Promise<Object>|Object} Data to send.
*/
/**
* Messages decoder. May also return promises for an asynchronous
* execution.
*
* @callback Client.Decoder
* @param {Object} data Received data.
* @return {Promise<Client.Message>|Client.Message} Message.
*/
/**
* Receive hook is run when a client receives a valid message via a
* websocket. May also return promises for an asynchronous execution.
*
* @callback Client.ReceiveHook
* @param {Client.Message} message Message.
* @return {Promise<undefined>|undefined} Promise, if it is rejected no
* handlers will be called.
*/
/**
* Send hook is run when a client sends any message via a
* websocket. May also return promises for an asynchronous execution.
*
* @callback Client.SendHook
* @param {Client.Message|Object} message Message or object if
* `isEncoded` is `true`.
* @param {boolean} isEncoded If a message has been already encoded
* via {@link Client#encodeMessage} or {@link Server#encodeMessage}.
* @return {Promise<undefined>|undefined} Promise, if it is rejected no
* handlers will be called.
*/
/**
* @typedef {Object} Client.RetryConfig
*
* @property {number} [factor=2]
* @property {number} [maxTimeout=Infinity]
* @property {number} [minTimeout=1000]
* @property {boolean} [randomize=true]
* @property {number} [retries=10]
*/
/**
* @typedef {Object} Client.SocketOptions
*
* @property {number} [ackTimeout=20000] Result wait timeout for
* {@link Client#invoke} in ms.
* @property {Object} [auth={}] Auth data.
* @property {boolean} [autoReconnect=true] Enable auto reconnect.
* @property {Client.RetryConfig} [autoReconnectOptions] Auto
* reconnect config.
* @property {string} [binaryType='arraybuffer'] W3C WebSocket
* binary data type.
* @property {Client.Decoder} [decoder=JSON.parse] Messages decoder.
* @property {Client.Encoder} [encoder=JSON.stringify] Messages
* encoder.
* @property {function} [errorFormatter=String] Converter for JS
* errors to some network format.
* @property {number} [pingInterval=20000] Ping interval in ms.
* @property {number} [pingTimeout=20000] Ping timeout in ms.
* @property {string|Array<string>} [protocols='ws-messaging']
* WebSocket protocols.
* @property {Client.ReceiveHook} [receiveHook] Receive hook.
* @property {Client.SendHook} [rendHook] Send hook.
* @property {boolean} [skipValidation=false] Skips build-in
* messages validation.
* @property {Object} [WebSocket=undefined] Alternative websocket
* constructor, if it is undefined then a global WebSocket is used.
* @property {boolean} [w3c=undefined] If WebSocket is using a w3c
* send API, or a ws one (from Node.js server implementation with a
* callback). By default if a global value is used, then it is `true`
* and `false` otherwise.
* @property {Object} [wsOptions] Additional options to pass to ws
* socket constructor.
*/
const retryConfig = {
factor: 2,
maxTimeout: Infinity,
minTimeout: 1000,
randomize: true,
retries: 10
}
const defaults = {
ackTimeout: 20000,
auth: {},
autoReconnect: true,
autoReconnectOptions: retryConfig,
binaryType: 'arraybuffer',
decoder: JSON.parse,
encoder: JSON.stringify,
errorFormatter: String,
pingInterval: 20000,
pingTimeout: 20000,
protocols: 'ws-messaging',
receiveHook: null,
sendHook: null,
skipValidation: false,
WebSocket: undefined,
w3c: undefined,
wsOptions: undefined
}
/**
* @extends EventEmitter
*
* @emits Client#close
* @emits Client#open
* @emits Client#error
* @emits Client#connect
*/
class Client extends EventEmitter {
/**
* Creates a client.
*
* @param {string} url WebSocket connection url.
* @param {Client.SocketOptions} [options] Socket options.
*/
constructor (url, options = {}) {
super()
this.url = url
/**
* Client id. Server-side only.
* @member {number} id
* @memberof Client
* @instance
* @readonly
*/
assign(this, defaults, options)
this.retryConfig = {}
assign(this.retryConfig, retryConfig, options.autoReconnectOptions)
if (!this.WebSocket) {
this.WebSocket = WebSocket
this.w3c = this.w3c === undefined ? true : this.w3c
}
/**
* If true, then a client is connected.
* @member {boolean}
* @readonly
*/
this.connected = false
this.counter = 1
this.data = {}
this.handlers = {}
this.pendingAcks = {}
this.attempt = 0
/**
* If true, then a client was closed via a close method or an auth
* error occurred.
* @member {boolean}
* @readonly
*/
this.terminated = false
this.register('ping', () => Promise.resolve())
this.reconnect()
}
_onMessage (data) {
let message
attempt(() => this.decoder(data.data))
.then(msg => { message = msg })
.then(() => { if (!this.skipValidation) { validate(message) } })
.then(() => { if (this.receiveHook) { return this.receiveHook(message) } })
.then(() => this._dispatch(message))
/**
* Emitted when the other side failed to decode or validate a
* websocket message, namely an error is occurred inside either
* `decoder` or `receiveHook`.
* @event Client#preprocessingError
* @param {Object} error Converted error.
*/
.catch(error => this.send('preprocessingError', this.errorFormatter(error)))
}
_setEvents () {
/**
* Emits w3c onopen WebSocket events.
* @event Client#open
*/
this.socket.onopen = emit.bind(this, 'open')
/**
* Emits w3c onerror WebSocket events. Does not throw if there are
* no listeners.
* @event Client#error
* @param {Error} error Error.
*/
this.socket.onerror = emit.bind(this, 'error')
this.socket.onclose = this._onClose.bind(this)
this.socket.onmessage = this._onMessage.bind(this)
}
_ping () {
this.pingTimeoutId = setTimeout(() => {
emit.call(this, 'ping')
const timeout = this.pingTimeout
const { message, promise } = this._makeMessage('ping', [], true, timeout)
this._send(message).then(() => promise)
.then(() => emit.call(this, 'pong'))
.then(() => this._ping())
.catch(() => this.close(4008, 'Ping timeout', false))
}, this.pingInterval)
}
_isOpen () {
return this.socket &&
(this.socket.readyState === 0 || this.socket.readyState === 1)
}
_reconnect () {
const { factor, maxTimeout, minTimeout, randomize, retries } = this.retryConfig
if (this.attempt >= retries || this.terminated) { return }
const rand = 1 + (randomize ? Math.random() : 0)
const timeout = Math.min(rand * minTimeout * Math.pow(factor, this.attempt), maxTimeout)
this.reconnectTimeoutId = setTimeout(this.reconnect.bind(this), timeout)
this.attempt++
}
_open () {
clearTimeout(this.reconnectTimeoutId)
/**
* Underlying websocket.
* @member {WebSocket}
* @readonly
*/
this.socket = new this.WebSocket(this.url, this.protocols, this.wsOptions)
if (this.w3c) { this.socket.binaryType = this.binaryType }
this.connectHandler = () => {
this.connected = true
this.attempt = 0
clearTimeout(this.authTimeoutId)
this._ping()
}
this.openHandler = () => {
this._send(this.auth, { isAuth: true })
this.authTimeoutId = setTimeout(
this.close.bind(this, 4008, 'Auth timeout', false),
this.ackTimeout)
this.once('connect', this.connectHandler)
}
this.once('open', this.openHandler)
this._setEvents()
if (this.attempt > 0) {
/**
* Emits retry events when auto reconnecting.
* @event Client#retry
* @param {number} attempt Attempt number starting from `1`.
*/
emit.call(this, 'retry', this.attempt)
}
}
_onClose (ev) {
this.connected = false
clearTimeout(this.pingTimeoutId)
clearTimeout(this.authTimeoutId)
clearTimeout(this.reconnectTimeoutId)
this.off('connect', this.connectHandler)
this.off('open', this.openHandler)
if (ev.code === 4003 || !this.url) { this.terminated = true }
for (const id in this.pendingAcks) {
/* istanbul ignore else */
if (hasOwnProperty(this.pendingAcks, id)) {
const ack = this.pendingAcks[id]
ack.forceNack(new ConnectionError(id))
}
}
this.pendingAcks = {}
if (!this.terminated && this.autoReconnect) { this._reconnect() }
/**
* Emits w3c onclose WebSocket events.
* @event Client#close
* @param {CloseEvent} data Close event data.
*/
emit.call(this, 'close', ev)
}
_makeMessage (name, args, needsAck, ackTimeout = this.ackTimeout) {
let promise
const message = { name, args }
if (needsAck) {
const id = this.counter++
const ack = new Ack(id, ackTimeout, () => delete this.pendingAcks[id])
this.pendingAcks[id] = ack
promise = ack.promise
message.id = id
}
return { message, promise }
}
/**
* Socket connection is open and client has passed an auth
* check. Client-side only.
* @event Client#connect
* @param {Object|undefined} data Auth reply data.
*/
_dispatch (message) {
if (message.name) {
if (isBlacklistedEvent(message.name)) { return }
if (message.name === 'connect' && this.connected) { return }
if (message.id) {
const id = message.id
const fn = getOwnProp(this.handlers, message.name)
if (fn) {
attempt(() => fn.apply(null, message.args))
.then((result = null) => this._send({ id, result }))
.catch(error => this._send({ id, error: this.errorFormatter(error) }))
} else {
const error = this.errorFormatter(new NoProcedureError(message.name))
this._send({ id, error })
}
} else {
emit.apply(this, concat(message.name, message.args))
}
} else {
const ack = this.pendingAcks[message.id]
if (ack) { ack.settle(message) }
}
}
_send (message, { skipEncoder = false, isAuth = false } = {}) {
return attempt(() => this.sendHook ? this.sendHook(message, skipEncoder) : null)
.then(() => skipEncoder ? message : this.encoder(message))
.then(data => {
if (!this.connected && !isAuth) {
const id = skipEncoder ? undefined : message.id
throw new ConnectionError(id)
}
if (this.w3c) {
return this.socket.send(data)
} else {
const binary = typeof data !== 'string'
return fromCallback(cb => this.socket.send(data, { binary }, cb))
}
})
}
/**
* Send an event, no reply. Use {@link on} or {@link once} methods
* to listen events on a recipient side. Reserved event names
* (__MUST NOT__ be used): `connect`, `close`, `open`, `error`,
* `ping`, `pong`, `retry`.
* @param {string} event Event name.
* @param {*} [args] Arguments.
* @returns {Promise<undefined>} Resolves when a data has been sent.
*/
send (event, ...args) {
const { message } = this._makeMessage(event, args, false)
return this._send(message)
}
/**
* Send a message encoded by {@link Client#encodeMessage} or {@link
* Server#encodeMessage}, useful for identical messages
* broadcasting.
* @param {Object} data Result of {@link Client#encodeMessage}.
* @returns {Promise<undefined>} Resolves when a data has been sent.
*/
sendEncoded (data) {
return this._send(data, { skipEncoder: true })
}
/**
* Encode a message for a later use with {@link Client#sendEncoded}.
* Reserved event names (__MUST NOT__ be used): `connect`, `close`,
* `open`, `error`, `ping`, `pong`, `retry`.
* @param {string} event Event name.
* @param {*} [args] Arguments.
* @returns {Object} Encoded message.
*/
encodeMessage (event, ...args) {
const { message } = this._makeMessage(event, args, false)
return attempt(() => this.encoder(message))
}
/**
* Invoke an RPC procedure. Use {@link Client#register} method to
* assign an RPC method handler. Reserved procedure names (__MUST
* NOT__ be used): `connect`, `close`, `open`, `error`, `ping`,
* `pong`, `retry`.
* @param {string} name Procedure name.
* @param {*} [args] Arguments.
* @returns {Promise<Object>} Resolves or rejects when a reply is
* received.
*/
invoke (name, ...args) {
const { message, promise } = this._makeMessage(name, args, true)
return this._send(message).then(() => promise)
}
/**
* Register an RPC handler. Each name must have no more than a one
* handler, so it throws an error on a duplicate handler
* registration attempt. Use {@link Client#invoke} to call a method.
* @param {string} name Procedure name.
* @param {function} handler A function that returns a Promise.
*/
register (name, handler) {
if (getOwnProp(this.handlers, name)) {
throw new Error(`Can't register a duplicate RPC handler for ${name}`)
}
this.handlers[name] = handler
}
/**
* Reconnect. Client-side only.
*/
reconnect () {
this.terminated = false
if (this._isOpen()) {
// eslint-disable-next-line no-useless-return
return
} else if (this.WebSocket && this.url) {
this._open()
} else {
throw new Error('Malformed configuration options')
}
}
/**
* Closes a client connection.
* @param {number} [code=1000] Code as per WebSocket spec.
* @param {string} [str] Optional string.
* @param {boolean} [terminate=true] Disable reconnect.
*/
close (code = 1000, str, terminate = true) {
if (!this.terminated) {
this.terminated = terminate
if (this._isOpen()) { this.socket.close(code, str) }
}
}
}
/**
* Alias for {@link Client#send}.
* @method
* @name Client#emit
*/
Client.prototype.emit = Client.prototype.send
/**
* Alias for {@link Client#sendEncoded}.
* @method
* @name Client#emitEncoded
*/
Client.prototype.emitEncoded = Client.prototype.sendEncoded
Client.ConnectionError = ConnectionError
Client.NoProcedureError = NoProcedureError
Client.TimeoutError = TimeoutError
module.exports = Client