Server.js

// @flow
'use strict'

const Client = require('./Client')
const EventEmitter = require('eventemitter3')
const WebSocket = require('ws')
const WebSocketServer = require('ws').Server
const uid = require('uid-safe')
const { CLOSE_FORBIDDEN } = require('./constants')
const { assign, attempt, fromCallback } = require('./utils')

const defaults = {
  WebSocketServer,
  connectionHook: null,
  authTimeout: 20000
}

/**
 * Connection hook is run when a client connects to a server. The
 * result is used as an auth reply data. May also return promises for
 * an asynchronous execution. If the promise is rejected or an error
 * is thrown, then auth has failed and the socket will be closed.
 *
 * @callback Server.ConnectionHook
 *
 * @param {Client} client Client.
 * @param {Object} auth Auth data.
 * @return {Promise<Object|undefined>|Object|undefined} Auth reply
 * data.
 */
/* :: type ConnectionHook = (client: Client, data?: any) => Promise<any> */

/**
 * @typedef {Object} Server.ServerOptions
 *
 * @property {Server.ConnectionHook} [connectionHook] Connection
 * hook.
 * @property {Object} [WebSocketServer] Alternative constructor for ws
 * server.
 * @property {number} [authTimeout=20000] Auth message wait timeout in
 * ms.
 */
/* ::
type ServerOptions = { WebSocketServer?: constructor,
                       connectionHook?: ConnectionHook,
                       authTimeout?: number
                     }
*/

/* :: type SocketOptions = Object */

/**
 * @extends EventEmitter
 *
 * @emits Server#ready
 * @emits Server#error
 *
 * @borrows Client#encodeMessage as #encodeMessage
 */
class Server extends EventEmitter {
  /* ::
  authTimeout: number
  WebSocketServer: constructor
  clients: Map<string, Object>
  connectionHook: ConnectionHook
  socketOptions: Object
  wss: Object
   */

  /**
   * Starts a server.
   *
   * @param {Object} wssOptions Options that are passed to ws server.
   * @param {Server.ServerOptions} [serverOptions] Server options.
   * @param {Client.SocketOptions} [socketOptions] Socket options.
   */
  constructor (wssOptions /* : Object */,
    serverOptions /* : ServerOptions */ = {},
    socketOptions /* : SocketOptions  */ = {}) {
    super()
    this.socketOptions = { WebSocket }
    assign(this.socketOptions, socketOptions)
    assign(this, defaults, serverOptions)
    this.clients = new Map()
    this._setEvents(wssOptions)
  }

  _setEvents (wssOptions /* : Object */) /* : void */ {
    /**
     * Emits a ready event.
     * @private
     * @event Server#ready
     */
    this.wss = new this.WebSocketServer(wssOptions, (error) => {
      error ? this.emit('error', error) : this.emit('ready')
    })
    /**
     * Emits wss error events. Does not throw if there are no
     * listeners.
     * @private
     * @event Server#error
     * @param {Error} error Error.
     */
    this.wss.on('error', this.emit.bind(this))
    this.wss.on('connection', socket => this._onConnection(socket))
  }

  _onConnection (socket /* : Object & EventEmitter */) /* : void */ {
    const timeout = setTimeout(
      socket.close.bind(socket, CLOSE_FORBIDDEN), this.authTimeout)
    socket.once('message', data => this._addClient(socket, data, timeout))
  }

  _addClient (socket /* : EventEmitter */,
    data /* : any */,
    timeout /* : number */) /* : void */ {
    let client
    clearTimeout(timeout)
    uid(18).then(id => {
      client = new Client(null, assign({ socket, id }, this.socketOptions))
      client.autoReconnect = false
      this.clients.set(client.id, client)
      client.on('close', () => this._removeClient(client.id))
      if (this.connectionHook) {
        return attempt(() => client.decoder(data))
          .then(authData => this.connectionHook(client, authData))
      }
    }).then(authReplyData => {
      if (client._isOpen()) {
        client._setEvents()
        client.connected = true
        client.send('connect', authReplyData).catch(() => {})
        client._ping()
      }
    }).catch(error => {
      /* istanbul ignore else */
      if (client) {
        const str = error.toString()
        client.close(CLOSE_FORBIDDEN, str)
      }
    })
  }

  _removeClient (id /* : string */) /* : void */ {
    this.clients.delete(id)
  }

  /**
   * Git a client by id.
   * @param {string} id Client id.
   * @returns {Client|undefined} Client if found.
   */
  getClient (id /* : string */) /* : Client */ {
    return this.clients.get(id)
  }

  encodeMessage (event /* : string */, ...args /* : Array<any> */) /* : any */ {
    const encoder = this.socketOptions.encoder || JSON.stringify
    return attempt(() => encoder({ name: event, args }))
  }

  /**
   * Closes a server.
   * @param {code} [code=1000] Code as per WebSocket spec.
   * @returns {Promise<undefined>} Promise.
   */
  close (code /* : number */ = 1000) /* : Promise<void> */ {
    for (const [, client] of this.clients) {
      client.close(code)
    }
    this.clients.clear()
    return fromCallback(cb => this.wss.close(cb))
  }
}

module.exports = Server