Skip to content
Closed
26 changes: 21 additions & 5 deletions packages/pg-pool/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class Pool extends EventEmitter {
return this._clients.length >= this.options.max
}

_pulseQueue() {
_pulseQueue(name) {
this.log('pulse queue')
if (this.ended) {
this.log('pulse queue ended')
Expand Down Expand Up @@ -142,7 +142,17 @@ class Pool extends EventEmitter {
}
const pendingItem = this._pendingQueue.shift()
if (this._idle.length) {
const idleItem = this._idle.pop()
let idleItem = null
if (name) {
// find if there is a free client that already has cached prepared statements
const index = this._idle.findIndex((item) => item.client.connection.parsedStatements[name])
if (index > -1) {
idleItem = this._idle.splice(index, 1)[0]
}
}
if (!idleItem) {
idleItem = this._idle.pop()
}
clearTimeout(idleItem.timeoutId)
const client = idleItem.client
client.ref && client.ref()
Expand All @@ -168,7 +178,13 @@ class Pool extends EventEmitter {
this.emit('remove', client)
}

connect(cb) {
connect(cb, name) {
// guard clause against passing a name as the first parameter
if (typeof cb === 'string') {
name = cb
cb = undefined
}

if (this.ending) {
const err = new Error('Cannot use a pool after calling end on the pool')
return cb ? cb(err) : this.Promise.reject(err)
Expand All @@ -181,7 +197,7 @@ class Pool extends EventEmitter {
if (this._isFull() || this._idle.length) {
// if we have idle clients schedule a pulse immediately
if (this._idle.length) {
process.nextTick(() => this._pulseQueue())
process.nextTick(() => this._pulseQueue(name))
}

if (!this.options.connectionTimeoutMillis) {
Expand Down Expand Up @@ -431,7 +447,7 @@ class Pool extends EventEmitter {
client.release(err)
return cb(err)
}
})
}, text?.name)
return response.result
}

Expand Down
56 changes: 56 additions & 0 deletions packages/pg-pool/test/prioritizing-prepared-client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
const expect = require('expect.js')
const co = require('co')

const describe = require('mocha').describe
const it = require('mocha').it

const Pool = require('../')

describe('prioritizing prepared client', () => {
it(
'can create a single client with prepared statment and reuse it',
co.wrap(function* () {
const pool = new Pool({ max: 2 })
expect(pool.waitingCount).to.equal(0)
expect(pool._idle.length).to.equal(0)

let res, firstClient, secondClient, firstPid, secondPid

// force the creation of two client and release.
// In this way we have two idle client
firstClient = yield pool.connect()
expect(pool._clients.length).to.equal(1)
secondClient = yield pool.connect()
expect(pool._clients.length).to.equal(2)
firstClient.release()
secondClient.release()
expect(pool._idle.length).to.equal(2)

// check the same client with prepared query

res = yield pool.query({ text: 'SELECT $1::text as name, pg_backend_pid() as pid', values: ['hi'], name: 'foo' })
expect(res.rows[0].name).to.equal('hi')
expect(pool._idle.length).to.equal(2)
firstPid = res.rows[0].pid

res = yield pool.query({ text: 'SELECT $1::text as name, pg_backend_pid() as pid', values: ['ho'], name: 'foo' })
expect(res.rows[0].name).to.equal('ho')
expect(pool._idle.length).to.equal(2)
secondPid = res.rows[0].pid

expect(firstPid).to.equal(secondPid)

// check also connect with name, return same client

firstClient = yield pool.connect('foo')
expect(pool._idle.length).to.equal(1)
res = yield firstClient.query({ text: 'SELECT $1::text as name, pg_backend_pid() as pid', values: ['hi'] })
expect(res.rows[0].name).to.equal('hi')
expect(firstPid).to.equal(res.rows[0].pid)
firstClient.release()
expect(pool._idle.length).to.equal(2)

pool.end()
})
)
})