468 lines
13 KiB
JavaScript
468 lines
13 KiB
JavaScript
'use strict'
|
||
const EventEmitter = require('events').EventEmitter
|
||
|
||
const NOOP = function () {}
|
||
|
||
const removeWhere = (list, predicate) => {
|
||
const i = list.findIndex(predicate)
|
||
|
||
return i === -1 ? undefined : list.splice(i, 1)[0]
|
||
}
|
||
|
||
class IdleItem {
|
||
constructor(client, idleListener, timeoutId) {
|
||
this.client = client
|
||
this.idleListener = idleListener
|
||
this.timeoutId = timeoutId
|
||
}
|
||
}
|
||
|
||
class PendingItem {
|
||
constructor(callback) {
|
||
this.callback = callback
|
||
}
|
||
}
|
||
|
||
function throwOnDoubleRelease() {
|
||
throw new Error('Release called on client which has already been released to the pool.')
|
||
}
|
||
|
||
function promisify(Promise, callback) {
|
||
if (callback) {
|
||
return { callback: callback, result: undefined }
|
||
}
|
||
let rej
|
||
let res
|
||
const cb = function (err, client) {
|
||
err ? rej(err) : res(client)
|
||
}
|
||
const result = new Promise(function (resolve, reject) {
|
||
res = resolve
|
||
rej = reject
|
||
}).catch((err) => {
|
||
// replace the stack trace that leads to `TCP.onStreamRead` with one that leads back to the
|
||
// application that created the query
|
||
Error.captureStackTrace(err)
|
||
throw err
|
||
})
|
||
return { callback: cb, result: result }
|
||
}
|
||
|
||
function makeIdleListener(pool, client) {
|
||
return function idleListener(err) {
|
||
err.client = client
|
||
|
||
client.removeListener('error', idleListener)
|
||
client.on('error', () => {
|
||
pool.log('additional client error after disconnection due to error', err)
|
||
})
|
||
pool._remove(client)
|
||
// TODO - document that once the pool emits an error
|
||
// the client has already been closed & purged and is unusable
|
||
pool.emit('error', err, client)
|
||
}
|
||
}
|
||
|
||
class Pool extends EventEmitter {
|
||
constructor(options, Client) {
|
||
super()
|
||
this.options = Object.assign({}, options)
|
||
|
||
if (options != null && 'password' in options) {
|
||
// "hiding" the password so it doesn't show up in stack traces
|
||
// or if the client is console.logged
|
||
Object.defineProperty(this.options, 'password', {
|
||
configurable: true,
|
||
enumerable: false,
|
||
writable: true,
|
||
value: options.password,
|
||
})
|
||
}
|
||
if (options != null && options.ssl && options.ssl.key) {
|
||
// "hiding" the ssl->key so it doesn't show up in stack traces
|
||
// or if the client is console.logged
|
||
Object.defineProperty(this.options.ssl, 'key', {
|
||
enumerable: false,
|
||
})
|
||
}
|
||
|
||
this.options.max = this.options.max || this.options.poolSize || 10
|
||
this.options.maxUses = this.options.maxUses || Infinity
|
||
this.options.allowExitOnIdle = this.options.allowExitOnIdle || false
|
||
this.options.maxLifetimeSeconds = this.options.maxLifetimeSeconds || 0
|
||
this.log = this.options.log || function () {}
|
||
this.Client = this.options.Client || Client || require('pg').Client
|
||
this.Promise = this.options.Promise || global.Promise
|
||
|
||
if (typeof this.options.idleTimeoutMillis === 'undefined') {
|
||
this.options.idleTimeoutMillis = 10000
|
||
}
|
||
|
||
this._clients = []
|
||
this._idle = []
|
||
this._expired = new WeakSet()
|
||
this._pendingQueue = []
|
||
this._endCallback = undefined
|
||
this.ending = false
|
||
this.ended = false
|
||
}
|
||
|
||
_isFull() {
|
||
return this._clients.length >= this.options.max
|
||
}
|
||
|
||
_pulseQueue() {
|
||
this.log('pulse queue')
|
||
if (this.ended) {
|
||
this.log('pulse queue ended')
|
||
return
|
||
}
|
||
if (this.ending) {
|
||
this.log('pulse queue on ending')
|
||
if (this._idle.length) {
|
||
this._idle.slice().map((item) => {
|
||
this._remove(item.client)
|
||
})
|
||
}
|
||
if (!this._clients.length) {
|
||
this.ended = true
|
||
this._endCallback()
|
||
}
|
||
return
|
||
}
|
||
|
||
// if we don't have any waiting, do nothing
|
||
if (!this._pendingQueue.length) {
|
||
this.log('no queued requests')
|
||
return
|
||
}
|
||
// if we don't have any idle clients and we have no more room do nothing
|
||
if (!this._idle.length && this._isFull()) {
|
||
return
|
||
}
|
||
const pendingItem = this._pendingQueue.shift()
|
||
if (this._idle.length) {
|
||
const idleItem = this._idle.pop()
|
||
clearTimeout(idleItem.timeoutId)
|
||
const client = idleItem.client
|
||
client.ref && client.ref()
|
||
const idleListener = idleItem.idleListener
|
||
|
||
return this._acquireClient(client, pendingItem, idleListener, false)
|
||
}
|
||
if (!this._isFull()) {
|
||
return this.newClient(pendingItem)
|
||
}
|
||
throw new Error('unexpected condition')
|
||
}
|
||
|
||
_remove(client) {
|
||
const removed = removeWhere(this._idle, (item) => item.client === client)
|
||
|
||
if (removed !== undefined) {
|
||
clearTimeout(removed.timeoutId)
|
||
}
|
||
|
||
this._clients = this._clients.filter((c) => c !== client)
|
||
client.end()
|
||
this.emit('remove', client)
|
||
}
|
||
|
||
connect(cb) {
|
||
if (this.ending) {
|
||
const err = new Error('Cannot use a pool after calling end on the pool')
|
||
return cb ? cb(err) : this.Promise.reject(err)
|
||
}
|
||
|
||
const response = promisify(this.Promise, cb)
|
||
const result = response.result
|
||
|
||
// if we don't have to connect a new client, don't do so
|
||
if (this._isFull() || this._idle.length) {
|
||
// if we have idle clients schedule a pulse immediately
|
||
if (this._idle.length) {
|
||
process.nextTick(() => this._pulseQueue())
|
||
}
|
||
|
||
if (!this.options.connectionTimeoutMillis) {
|
||
this._pendingQueue.push(new PendingItem(response.callback))
|
||
return result
|
||
}
|
||
|
||
const queueCallback = (err, res, done) => {
|
||
clearTimeout(tid)
|
||
response.callback(err, res, done)
|
||
}
|
||
|
||
const pendingItem = new PendingItem(queueCallback)
|
||
|
||
// set connection timeout on checking out an existing client
|
||
const tid = setTimeout(() => {
|
||
// remove the callback from pending waiters because
|
||
// we're going to call it with a timeout error
|
||
removeWhere(this._pendingQueue, (i) => i.callback === queueCallback)
|
||
pendingItem.timedOut = true
|
||
response.callback(new Error('timeout exceeded when trying to connect'))
|
||
}, this.options.connectionTimeoutMillis)
|
||
|
||
this._pendingQueue.push(pendingItem)
|
||
return result
|
||
}
|
||
|
||
this.newClient(new PendingItem(response.callback))
|
||
|
||
return result
|
||
}
|
||
|
||
newClient(pendingItem) {
|
||
const client = new this.Client(this.options)
|
||
this._clients.push(client)
|
||
const idleListener = makeIdleListener(this, client)
|
||
|
||
this.log('checking client timeout')
|
||
|
||
// connection timeout logic
|
||
let tid
|
||
let timeoutHit = false
|
||
if (this.options.connectionTimeoutMillis) {
|
||
tid = setTimeout(() => {
|
||
this.log('ending client due to timeout')
|
||
timeoutHit = true
|
||
// force kill the node driver, and let libpq do its teardown
|
||
client.connection ? client.connection.stream.destroy() : client.end()
|
||
}, this.options.connectionTimeoutMillis)
|
||
}
|
||
|
||
this.log('connecting new client')
|
||
client.connect((err) => {
|
||
if (tid) {
|
||
clearTimeout(tid)
|
||
}
|
||
client.on('error', idleListener)
|
||
if (err) {
|
||
this.log('client failed to connect', err)
|
||
// remove the dead client from our list of clients
|
||
this._clients = this._clients.filter((c) => c !== client)
|
||
if (timeoutHit) {
|
||
err.message = 'Connection terminated due to connection timeout'
|
||
}
|
||
|
||
// this client won’t be released, so move on immediately
|
||
this._pulseQueue()
|
||
|
||
if (!pendingItem.timedOut) {
|
||
pendingItem.callback(err, undefined, NOOP)
|
||
}
|
||
} else {
|
||
this.log('new client connected')
|
||
|
||
if (this.options.maxLifetimeSeconds !== 0) {
|
||
const maxLifetimeTimeout = setTimeout(() => {
|
||
this.log('ending client due to expired lifetime')
|
||
this._expired.add(client)
|
||
const idleIndex = this._idle.findIndex((idleItem) => idleItem.client === client)
|
||
if (idleIndex !== -1) {
|
||
this._acquireClient(
|
||
client,
|
||
new PendingItem((err, client, clientRelease) => clientRelease()),
|
||
idleListener,
|
||
false
|
||
)
|
||
}
|
||
}, this.options.maxLifetimeSeconds * 1000)
|
||
|
||
maxLifetimeTimeout.unref()
|
||
client.once('end', () => clearTimeout(maxLifetimeTimeout))
|
||
}
|
||
|
||
return this._acquireClient(client, pendingItem, idleListener, true)
|
||
}
|
||
})
|
||
}
|
||
|
||
// acquire a client for a pending work item
|
||
_acquireClient(client, pendingItem, idleListener, isNew) {
|
||
if (isNew) {
|
||
this.emit('connect', client)
|
||
}
|
||
|
||
this.emit('acquire', client)
|
||
|
||
client.release = this._releaseOnce(client, idleListener)
|
||
|
||
client.removeListener('error', idleListener)
|
||
|
||
if (!pendingItem.timedOut) {
|
||
if (isNew && this.options.verify) {
|
||
this.options.verify(client, (err) => {
|
||
if (err) {
|
||
client.release(err)
|
||
return pendingItem.callback(err, undefined, NOOP)
|
||
}
|
||
|
||
pendingItem.callback(undefined, client, client.release)
|
||
})
|
||
} else {
|
||
pendingItem.callback(undefined, client, client.release)
|
||
}
|
||
} else {
|
||
if (isNew && this.options.verify) {
|
||
this.options.verify(client, client.release)
|
||
} else {
|
||
client.release()
|
||
}
|
||
}
|
||
}
|
||
|
||
// returns a function that wraps _release and throws if called more than once
|
||
_releaseOnce(client, idleListener) {
|
||
let released = false
|
||
|
||
return (err) => {
|
||
if (released) {
|
||
throwOnDoubleRelease()
|
||
}
|
||
|
||
released = true
|
||
this._release(client, idleListener, err)
|
||
}
|
||
}
|
||
|
||
// release a client back to the poll, include an error
|
||
// to remove it from the pool
|
||
_release(client, idleListener, err) {
|
||
client.on('error', idleListener)
|
||
|
||
client._poolUseCount = (client._poolUseCount || 0) + 1
|
||
|
||
this.emit('release', err, client)
|
||
|
||
// TODO(bmc): expose a proper, public interface _queryable and _ending
|
||
if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) {
|
||
if (client._poolUseCount >= this.options.maxUses) {
|
||
this.log('remove expended client')
|
||
}
|
||
this._remove(client)
|
||
this._pulseQueue()
|
||
return
|
||
}
|
||
|
||
const isExpired = this._expired.has(client)
|
||
if (isExpired) {
|
||
this.log('remove expired client')
|
||
this._expired.delete(client)
|
||
this._remove(client)
|
||
this._pulseQueue()
|
||
return
|
||
}
|
||
|
||
// idle timeout
|
||
let tid
|
||
if (this.options.idleTimeoutMillis) {
|
||
tid = setTimeout(() => {
|
||
this.log('remove idle client')
|
||
this._remove(client)
|
||
}, this.options.idleTimeoutMillis)
|
||
|
||
if (this.options.allowExitOnIdle) {
|
||
// allow Node to exit if this is all that's left
|
||
tid.unref()
|
||
}
|
||
}
|
||
|
||
if (this.options.allowExitOnIdle) {
|
||
client.unref()
|
||
}
|
||
|
||
this._idle.push(new IdleItem(client, idleListener, tid))
|
||
this._pulseQueue()
|
||
}
|
||
|
||
query(text, values, cb) {
|
||
// guard clause against passing a function as the first parameter
|
||
if (typeof text === 'function') {
|
||
const response = promisify(this.Promise, text)
|
||
setImmediate(function () {
|
||
return response.callback(new Error('Passing a function as the first parameter to pool.query is not supported'))
|
||
})
|
||
return response.result
|
||
}
|
||
|
||
// allow plain text query without values
|
||
if (typeof values === 'function') {
|
||
cb = values
|
||
values = undefined
|
||
}
|
||
const response = promisify(this.Promise, cb)
|
||
cb = response.callback
|
||
|
||
this.connect((err, client) => {
|
||
if (err) {
|
||
return cb(err)
|
||
}
|
||
|
||
let clientReleased = false
|
||
const onError = (err) => {
|
||
if (clientReleased) {
|
||
return
|
||
}
|
||
clientReleased = true
|
||
client.release(err)
|
||
cb(err)
|
||
}
|
||
|
||
client.once('error', onError)
|
||
this.log('dispatching query')
|
||
try {
|
||
client.query(text, values, (err, res) => {
|
||
this.log('query dispatched')
|
||
client.removeListener('error', onError)
|
||
if (clientReleased) {
|
||
return
|
||
}
|
||
clientReleased = true
|
||
client.release(err)
|
||
if (err) {
|
||
return cb(err)
|
||
}
|
||
return cb(undefined, res)
|
||
})
|
||
} catch (err) {
|
||
client.release(err)
|
||
return cb(err)
|
||
}
|
||
})
|
||
return response.result
|
||
}
|
||
|
||
end(cb) {
|
||
this.log('ending')
|
||
if (this.ending) {
|
||
const err = new Error('Called end on pool more than once')
|
||
return cb ? cb(err) : this.Promise.reject(err)
|
||
}
|
||
this.ending = true
|
||
const promised = promisify(this.Promise, cb)
|
||
this._endCallback = promised.callback
|
||
this._pulseQueue()
|
||
return promised.result
|
||
}
|
||
|
||
get waitingCount() {
|
||
return this._pendingQueue.length
|
||
}
|
||
|
||
get idleCount() {
|
||
return this._idle.length
|
||
}
|
||
|
||
get expiredCount() {
|
||
return this._clients.reduce((acc, client) => acc + (this._expired.has(client) ? 1 : 0), 0)
|
||
}
|
||
|
||
get totalCount() {
|
||
return this._clients.length
|
||
}
|
||
}
|
||
module.exports = Pool
|