'use strict' var EventEmitter = require('events').EventEmitter var utils = require('./utils') var sasl = require('./crypto/sasl') var TypeOverrides = require('./type-overrides') var ConnectionParameters = require('./connection-parameters') var Query = require('./query') var defaults = require('./defaults') var Connection = require('./connection') const crypto = require('./crypto/utils') class Client extends EventEmitter { constructor(config) { super() this.connectionParameters = new ConnectionParameters(config) this.user = this.connectionParameters.user this.database = this.connectionParameters.database this.port = this.connectionParameters.port this.host = this.connectionParameters.host // "hiding" the password so it doesn't show up in stack traces // or if the client is console.logged Object.defineProperty(this, 'password', { configurable: true, enumerable: false, writable: true, value: this.connectionParameters.password, }) this.replication = this.connectionParameters.replication var c = config || {} this._Promise = c.Promise || global.Promise this._types = new TypeOverrides(c.types) this._ending = false this._ended = false this._connecting = false this._connected = false this._connectionError = false this._queryable = true this.connection = c.connection || new Connection({ stream: c.stream, ssl: this.connectionParameters.ssl, keepAlive: c.keepAlive || false, keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0, encoding: this.connectionParameters.client_encoding || 'utf8', }) this.queryQueue = [] this.binary = c.binary || defaults.binary this.processID = null this.secretKey = null this.ssl = this.connectionParameters.ssl || false // As with Password, make SSL->Key (the private key) non-enumerable. // It won't show up in stack traces // or if the client is console.logged if (this.ssl && this.ssl.key) { Object.defineProperty(this.ssl, 'key', { enumerable: false, }) } this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0 } _errorAllQueries(err) { const enqueueError = (query) => { process.nextTick(() => { query.handleError(err, this.connection) }) } if (this.activeQuery) { enqueueError(this.activeQuery) this.activeQuery = null } this.queryQueue.forEach(enqueueError) this.queryQueue.length = 0 } _connect(callback) { var self = this var con = this.connection this._connectionCallback = callback if (this._connecting || this._connected) { const err = new Error('Client has already been connected. You cannot reuse a client.') process.nextTick(() => { callback(err) }) return } this._connecting = true this.connectionTimeoutHandle if (this._connectionTimeoutMillis > 0) { this.connectionTimeoutHandle = setTimeout(() => { con._ending = true con.stream.destroy(new Error('timeout expired')) }, this._connectionTimeoutMillis) } if (this.host && this.host.indexOf('/') === 0) { con.connect(this.host + '/.s.PGSQL.' + this.port) } else { con.connect(this.port, this.host) } // once connection is established send startup message con.on('connect', function () { if (self.ssl) { con.requestSsl() } else { con.startup(self.getStartupConf()) } }) con.on('sslconnect', function () { con.startup(self.getStartupConf()) }) this._attachListeners(con) con.once('end', () => { const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly') clearTimeout(this.connectionTimeoutHandle) this._errorAllQueries(error) this._ended = true if (!this._ending) { // if the connection is ended without us calling .end() // on this client then we have an unexpected disconnection // treat this as an error unless we've already emitted an error // during connection. if (this._connecting && !this._connectionError) { if (this._connectionCallback) { this._connectionCallback(error) } else { this._handleErrorEvent(error) } } else if (!this._connectionError) { this._handleErrorEvent(error) } } process.nextTick(() => { this.emit('end') }) }) } connect(callback) { if (callback) { this._connect(callback) return } return new this._Promise((resolve, reject) => { this._connect((error) => { if (error) { reject(error) } else { resolve() } }) }) } _attachListeners(con) { // password request handling con.on('authenticationCleartextPassword', this._handleAuthCleartextPassword.bind(this)) // password request handling con.on('authenticationMD5Password', this._handleAuthMD5Password.bind(this)) // password request handling (SASL) con.on('authenticationSASL', this._handleAuthSASL.bind(this)) con.on('authenticationSASLContinue', this._handleAuthSASLContinue.bind(this)) con.on('authenticationSASLFinal', this._handleAuthSASLFinal.bind(this)) con.on('backendKeyData', this._handleBackendKeyData.bind(this)) con.on('error', this._handleErrorEvent.bind(this)) con.on('errorMessage', this._handleErrorMessage.bind(this)) con.on('readyForQuery', this._handleReadyForQuery.bind(this)) con.on('notice', this._handleNotice.bind(this)) con.on('rowDescription', this._handleRowDescription.bind(this)) con.on('dataRow', this._handleDataRow.bind(this)) con.on('portalSuspended', this._handlePortalSuspended.bind(this)) con.on('emptyQuery', this._handleEmptyQuery.bind(this)) con.on('commandComplete', this._handleCommandComplete.bind(this)) con.on('parseComplete', this._handleParseComplete.bind(this)) con.on('copyInResponse', this._handleCopyInResponse.bind(this)) con.on('copyData', this._handleCopyData.bind(this)) con.on('notification', this._handleNotification.bind(this)) } // TODO(bmc): deprecate pgpass "built in" integration since this.password can be a function // it can be supplied by the user if required - this is a breaking change! _checkPgPass(cb) { const con = this.connection if (typeof this.password === 'function') { this._Promise .resolve() .then(() => this.password()) .then((pass) => { if (pass !== undefined) { if (typeof pass !== 'string') { con.emit('error', new TypeError('Password must be a string')) return } this.connectionParameters.password = this.password = pass } else { this.connectionParameters.password = this.password = null } cb() }) .catch((err) => { con.emit('error', err) }) } else if (this.password !== null) { cb() } else { try { const pgPass = require('pgpass') pgPass(this.connectionParameters, (pass) => { if (undefined !== pass) { this.connectionParameters.password = this.password = pass } cb() }) } catch (e) { this.emit('error', e) } } } _handleAuthCleartextPassword(msg) { this._checkPgPass(() => { this.connection.password(this.password) }) } _handleAuthMD5Password(msg) { this._checkPgPass(async () => { try { const hashedPassword = await crypto.postgresMd5PasswordHash(this.user, this.password, msg.salt) this.connection.password(hashedPassword) } catch (e) { this.emit('error', e) } }) } _handleAuthSASL(msg) { this._checkPgPass(() => { try { this.saslSession = sasl.startSession(msg.mechanisms) this.connection.sendSASLInitialResponseMessage(this.saslSession.mechanism, this.saslSession.response) } catch (err) { this.connection.emit('error', err) } }) } async _handleAuthSASLContinue(msg) { try { await sasl.continueSession(this.saslSession, this.password, msg.data) this.connection.sendSCRAMClientFinalMessage(this.saslSession.response) } catch (err) { this.connection.emit('error', err) } } _handleAuthSASLFinal(msg) { try { sasl.finalizeSession(this.saslSession, msg.data) this.saslSession = null } catch (err) { this.connection.emit('error', err) } } _handleBackendKeyData(msg) { this.processID = msg.processID this.secretKey = msg.secretKey } _handleReadyForQuery(msg) { if (this._connecting) { this._connecting = false this._connected = true clearTimeout(this.connectionTimeoutHandle) // process possible callback argument to Client#connect if (this._connectionCallback) { this._connectionCallback(null, this) // remove callback for proper error handling // after the connect event this._connectionCallback = null } this.emit('connect') } const { activeQuery } = this this.activeQuery = null this.readyForQuery = true if (activeQuery) { activeQuery.handleReadyForQuery(this.connection) } this._pulseQueryQueue() } // if we receieve an error event or error message // during the connection process we handle it here _handleErrorWhileConnecting(err) { if (this._connectionError) { // TODO(bmc): this is swallowing errors - we shouldn't do this return } this._connectionError = true clearTimeout(this.connectionTimeoutHandle) if (this._connectionCallback) { return this._connectionCallback(err) } this.emit('error', err) } // if we're connected and we receive an error event from the connection // this means the socket is dead - do a hard abort of all queries and emit // the socket error on the client as well _handleErrorEvent(err) { if (this._connecting) { return this._handleErrorWhileConnecting(err) } this._queryable = false this._errorAllQueries(err) this.emit('error', err) } // handle error messages from the postgres backend _handleErrorMessage(msg) { if (this._connecting) { return this._handleErrorWhileConnecting(msg) } const activeQuery = this.activeQuery if (!activeQuery) { this._handleErrorEvent(msg) return } this.activeQuery = null activeQuery.handleError(msg, this.connection) } _handleRowDescription(msg) { // delegate rowDescription to active query this.activeQuery.handleRowDescription(msg) } _handleDataRow(msg) { // delegate dataRow to active query this.activeQuery.handleDataRow(msg) } _handlePortalSuspended(msg) { // delegate portalSuspended to active query this.activeQuery.handlePortalSuspended(this.connection) } _handleEmptyQuery(msg) { // delegate emptyQuery to active query this.activeQuery.handleEmptyQuery(this.connection) } _handleCommandComplete(msg) { // delegate commandComplete to active query this.activeQuery.handleCommandComplete(msg, this.connection) } _handleParseComplete(msg) { // if a prepared statement has a name and properly parses // we track that its already been executed so we don't parse // it again on the same client if (this.activeQuery.name) { this.connection.parsedStatements[this.activeQuery.name] = this.activeQuery.text } } _handleCopyInResponse(msg) { this.activeQuery.handleCopyInResponse(this.connection) } _handleCopyData(msg) { this.activeQuery.handleCopyData(msg, this.connection) } _handleNotification(msg) { this.emit('notification', msg) } _handleNotice(msg) { this.emit('notice', msg) } getStartupConf() { var params = this.connectionParameters var data = { user: params.user, database: params.database, } var appName = params.application_name || params.fallback_application_name if (appName) { data.application_name = appName } if (params.replication) { data.replication = '' + params.replication } if (params.statement_timeout) { data.statement_timeout = String(parseInt(params.statement_timeout, 10)) } if (params.lock_timeout) { data.lock_timeout = String(parseInt(params.lock_timeout, 10)) } if (params.idle_in_transaction_session_timeout) { data.idle_in_transaction_session_timeout = String(parseInt(params.idle_in_transaction_session_timeout, 10)) } if (params.options) { data.options = params.options } return data } cancel(client, query) { if (client.activeQuery === query) { var con = this.connection if (this.host && this.host.indexOf('/') === 0) { con.connect(this.host + '/.s.PGSQL.' + this.port) } else { con.connect(this.port, this.host) } // once connection is established send cancel message con.on('connect', function () { con.cancel(client.processID, client.secretKey) }) } else if (client.queryQueue.indexOf(query) !== -1) { client.queryQueue.splice(client.queryQueue.indexOf(query), 1) } } setTypeParser(oid, format, parseFn) { return this._types.setTypeParser(oid, format, parseFn) } getTypeParser(oid, format) { return this._types.getTypeParser(oid, format) } // escapeIdentifier and escapeLiteral moved to utility functions & exported // on PG // re-exported here for backwards compatibility escapeIdentifier(str) { return utils.escapeIdentifier(str) } escapeLiteral(str) { return utils.escapeLiteral(str) } _pulseQueryQueue() { if (this.readyForQuery === true) { this.activeQuery = this.queryQueue.shift() if (this.activeQuery) { this.readyForQuery = false this.hasExecuted = true const queryError = this.activeQuery.submit(this.connection) if (queryError) { process.nextTick(() => { this.activeQuery.handleError(queryError, this.connection) this.readyForQuery = true this._pulseQueryQueue() }) } } else if (this.hasExecuted) { this.activeQuery = null this.emit('drain') } } } query(config, values, callback) { // can take in strings, config object or query object var query var result var readTimeout var readTimeoutTimer var queryCallback if (config === null || config === undefined) { throw new TypeError('Client was passed a null or undefined query') } else if (typeof config.submit === 'function') { readTimeout = config.query_timeout || this.connectionParameters.query_timeout result = query = config if (typeof values === 'function') { query.callback = query.callback || values } } else { readTimeout = this.connectionParameters.query_timeout query = new Query(config, values, callback) if (!query.callback) { result = new this._Promise((resolve, reject) => { query.callback = (err, res) => (err ? reject(err) : resolve(res)) }).catch(err => { // replace the stack trace that leads to `TCP.onStreamRead` with one that leads back to the // application that created the query Error.captureStackTrace(err); throw err; }) } } if (readTimeout) { queryCallback = query.callback readTimeoutTimer = setTimeout(() => { var error = new Error('Query read timeout') process.nextTick(() => { query.handleError(error, this.connection) }) queryCallback(error) // we already returned an error, // just do nothing if query completes query.callback = () => {} // Remove from queue var index = this.queryQueue.indexOf(query) if (index > -1) { this.queryQueue.splice(index, 1) } this._pulseQueryQueue() }, readTimeout) query.callback = (err, res) => { clearTimeout(readTimeoutTimer) queryCallback(err, res) } } if (this.binary && !query.binary) { query.binary = true } if (query._result && !query._result._types) { query._result._types = this._types } if (!this._queryable) { process.nextTick(() => { query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection) }) return result } if (this._ending) { process.nextTick(() => { query.handleError(new Error('Client was closed and is not queryable'), this.connection) }) return result } this.queryQueue.push(query) this._pulseQueryQueue() return result } ref() { this.connection.ref() } unref() { this.connection.unref() } end(cb) { this._ending = true // if we have never connected, then end is a noop, callback immediately if (!this.connection._connecting || this._ended) { if (cb) { cb() } else { return this._Promise.resolve() } } if (this.activeQuery || !this._queryable) { // if we have an active query we need to force a disconnect // on the socket - otherwise a hung query could block end forever this.connection.stream.destroy() } else { this.connection.end() } if (cb) { this.connection.once('end', cb) } else { return new this._Promise((resolve) => { this.connection.once('end', resolve) }) } } } // expose a Query constructor Client.Query = Query module.exports = Client