@@ -10,6 +10,8 @@ import PostgresNIO
1010public actor PostgresConnectionPool {
1111
1212 private static let postgresMaxNameLength : Int = 32 // PostgreSQL allows 64 but we add some extra info
13+ private static let healthCheckInterval : TimeInterval = 5.0
14+ private static let idleConnectionsCheckInterval : TimeInterval = 60.0
1315
1416 private let logger : Logger
1517 private let eventLoopGroup : EventLoopGroup
@@ -18,7 +20,8 @@ public actor PostgresConnectionPool {
1820 private let connectionName : String
1921 private let poolName : String
2022 private let poolSize : Int
21- private let queryTimeout : TimeInterval
23+ private let maxIdleConnections : Int ?
24+ private let queryTimeout : TimeInterval ?
2225
2326 private let onOpenConnection : ( ( PostgresConnection , Logger ) async throws -> Void ) ?
2427 private let onReturnConnection : ( ( PostgresConnection , Logger ) async throws -> Void ) ?
@@ -27,6 +30,7 @@ public actor PostgresConnectionPool {
2730 private var connections : [ PoolConnection ] = [ ]
2831 private var available : Deque < PoolConnection > = [ ]
2932 private var continuations : Deque < PoolContinuation > = [ ]
33+ private var inUseConnectionCounts : Deque < Int > = [ ]
3034
3135 private var didStartWatcherTask = false
3236 private var didShutdown = false
@@ -46,6 +50,7 @@ public actor PostgresConnectionPool {
4650 self . connectionName = String ( configuration. applicationName. replacingPattern ( " [^- \\ w \\ d \\ s()] " , with: " " ) . prefix ( PostgresConnectionPool . postgresMaxNameLength) )
4751 self . poolName = " \( configuration. connection. username) @ \( configuration. connection. host) : \( configuration. connection. port) / \( configuration. connection. database) "
4852 self . poolSize = configuration. poolSize
53+ self . maxIdleConnections = configuration. maxIdleConnections
4954 self . queryTimeout = configuration. queryTimeout
5055
5156 self . onOpenConnection = configuration. onOpenConnection
@@ -63,9 +68,9 @@ public actor PostgresConnectionPool {
6368 self . postgresConfiguration = postgresConfiguration
6469 }
6570
66- deinit {
67- assert ( didShutdown, " Must call destroy () before releasing a PostgresConnectionPool " )
68- }
71+ // deinit {
72+ // assert(didShutdown, "Must call shutdown () before releasing a PostgresConnectionPool")
73+ // }
6974
7075 /// Takes one connection from the pool and dishes it out to the caller.
7176 @discardableResult
@@ -127,6 +132,9 @@ public actor PostgresConnectionPool {
127132 connection. state = . available
128133 available. append ( connection)
129134 }
135+ else {
136+ assert ( available. contains ( connection) )
137+ }
130138
131139 Task . detached { [ weak self] in
132140 await self ? . handleNextContinuation ( )
@@ -138,47 +146,57 @@ public actor PostgresConnectionPool {
138146 ///
139147 /// Must be done here since Swift doesn't yet allow async deinit.
140148 public func shutdown( ) async {
141- logger. debug ( " [ \( poolName) ] destroy () " )
149+ logger. debug ( " [ \( poolName) ] shutdown () " )
142150
143151 didShutdown = true
144152
145153 // Cancel all waiting continuations
146154 for poolContinuation in continuations {
147155 poolContinuation. continuation. resume ( throwing: PoolError . cancelled)
148156 }
157+ continuations. removeAll ( )
158+
159+ available. removeAll ( )
149160
150161 // Close all open connections
151162 connections. forEach ( { $0. state = . closed } )
152163 for poolConnection in connections {
153- if let connection = poolConnection. connection {
154- if let onCloseConnection = onCloseConnection {
155- do {
156- try await onCloseConnection ( connection, logger)
157- }
158- catch {
159- logger. warning ( " [ \( poolName) ] onCloseConnection error: \( error) " )
160- }
161- }
162-
163- do {
164- try await connection. close ( )
165- }
166- catch {
167- logger. warning ( " [ \( poolName) ] connection.close() error: \( error) " )
168- }
169- }
164+ await closeConnection ( poolConnection)
170165 }
166+ connections. removeAll ( )
171167
172168 // Shut down the event loop.
173169 try ? await eventLoopGroup. shutdownGracefully ( )
174170 }
175171
176172 // MARK: - Private
177173
174+ private func closeConnection( _ poolConnection: PoolConnection ) async {
175+ poolConnection. state = . closed
176+
177+ guard let connection = poolConnection. connection else { return }
178+
179+ if let onCloseConnection = onCloseConnection {
180+ do {
181+ try await onCloseConnection ( connection, logger)
182+ }
183+ catch {
184+ logger. warning ( " [ \( poolName) ] onCloseConnection error: \( error) " )
185+ }
186+ }
187+
188+ do {
189+ try await connection. close ( )
190+ }
191+ catch {
192+ logger. warning ( " [ \( poolName) ] connection.close() error: \( error) " )
193+ }
194+ }
195+
178196 private func checkConnections( ) async {
179197 defer {
180198 Task . after (
181- seconds: 5.0 ,
199+ seconds: PostgresConnectionPool . healthCheckInterval ,
182200 priority: . low,
183201 operation: { [ weak self] in
184202 await self ? . checkConnections ( )
@@ -192,6 +210,8 @@ public actor PostgresConnectionPool {
192210
193211 // TODO: Kill self if too many stuck connections
194212
213+ await closeIdleConnections ( )
214+
195215 let usageCounter = connections. reduce ( 0 ) { $0 + $1. usageCounter }
196216 logger. debug ( " [ \( poolName) ] \( connections. count) connections ( \( available. count) available, \( usageCounter) queries), \( continuations. count) continuations left " )
197217
@@ -205,6 +225,35 @@ public actor PostgresConnectionPool {
205225 }
206226 }
207227
228+ // TODO: This doesn't work well with short bursts of activity that fall between the 5 seconds check interval
229+ private func closeIdleConnections( ) async {
230+ guard let maxIdleConnections else { return }
231+
232+ // 60 seconds
233+ let minArrayLength = Int ( PostgresConnectionPool . idleConnectionsCheckInterval / PostgresConnectionPool. healthCheckInterval)
234+ assert ( minArrayLength >= 1 , " idleConnectionsCheckInterval must be higher than healthCheckInterval " )
235+ if inUseConnectionCounts. count > minArrayLength {
236+ inUseConnectionCounts. removeFirst ( )
237+ }
238+ inUseConnectionCounts. append ( connections. count - available. count)
239+
240+ guard continuations. isEmpty,
241+ inUseConnectionCounts. count >= minArrayLength,
242+ let maxInUse = inUseConnectionCounts. max ( )
243+ else { return }
244+
245+ let toClose = ( available. count - maxIdleConnections) - maxInUse
246+ guard toClose > 0 else { return }
247+
248+ logger. debug ( " [ \( poolName) ] Closing \( toClose) idle connections " )
249+
250+ for _ in 1 ... toClose {
251+ guard let poolConnection = available. popFirst ( ) else { break }
252+
253+ await closeConnection ( poolConnection)
254+ }
255+ }
256+
208257 private func handleNextContinuation( ) async {
209258 guard continuations. isNotEmpty else {
210259 logger. debug ( " [ \( poolName) ] No more continuations left, \( connections. count) connections, \( available. count) available " )
@@ -237,39 +286,25 @@ public actor PostgresConnectionPool {
237286 catch {
238287 logger. warning ( " [ \( poolName) ] Health check for connection \( poolConnection. id) failed " )
239288
240- poolConnection. state = . closed
241-
242- if let connection = poolConnection. connection {
243- if let onCloseConnection = onCloseConnection {
244- do {
245- try await onCloseConnection ( connection, logger)
246- }
247- catch {
248- logger. warning ( " [ \( poolName) ] onCloseConnection error: \( error) " )
249- }
250- }
251-
252- do {
253- try await connection. close ( )
254- }
255- catch {
256- logger. warning ( " [ \( poolName) ] connection.close() error: \( error) " )
257- }
258- }
289+ await closeConnection ( poolConnection)
259290 }
260291 }
261292 else {
262- poolConnection . state = . closed
293+ await closeConnection ( poolConnection )
263294 }
264295 }
265296 }
266297
298+ private func nameForConnection( id: Int ) -> String {
299+ " \( connectionName) - CONN: \( id) "
300+ }
301+
267302 private func openConnection( ) async {
268303 if !didStartWatcherTask {
269304 didStartWatcherTask = true
270305
271306 Task . after (
272- seconds: 5.0 ,
307+ seconds: PostgresConnectionPool . healthCheckInterval ,
273308 priority: . low,
274309 operation: { [ weak self] in
275310 await self ? . checkConnections ( )
@@ -305,8 +340,11 @@ public actor PostgresConnectionPool {
305340 logger. debug ( " [ \( poolName) ] Connection \( poolConnection. id) established in \( connectionRuntime. rounded ( toPlaces: 2 ) ) s " )
306341
307342 do {
308- try await connection. query ( PostgresQuery ( stringLiteral: " SET application_name=' \( connectionName) - CONN: \( poolConnection. id) ' " ) , logger: logger)
309- try await connection. query ( PostgresQuery ( stringLiteral: " SET statement_timeout= \( Int ( queryTimeout * 1000 ) ) " ) , logger: logger)
343+ try await connection. query ( PostgresQuery ( stringLiteral: " SET application_name=' \( nameForConnection ( id: poolConnection. id) ) ' " ) , logger: logger)
344+
345+ if let queryTimeout {
346+ try await connection. query ( PostgresQuery ( stringLiteral: " SET statement_timeout= \( Int ( queryTimeout * 1000 ) ) " ) , logger: logger)
347+ }
310348
311349 if let onOpenConnection = onOpenConnection {
312350 try await onOpenConnection ( connection, logger)
0 commit comments