Skip to content

Commit 23c12e2

Browse files
authored
Use Connection Pool in PG Queries (#248)
1 parent e5e9562 commit 23c12e2

File tree

6 files changed

+453
-74
lines changed

6 files changed

+453
-74
lines changed

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,6 @@ public static void init() throws IOException {
162162

163163
Datastore mongoDatastore = DatastoreProvider.getDatastore("Mongo", config);
164164
System.out.println(mongoDatastore.listCollections());
165-
166165
postgres =
167166
new GenericContainer<>(DockerImageName.parse("postgres:13.1"))
168167
.withEnv("POSTGRES_PASSWORD", "postgres")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
package org.hypertrace.core.documentstore.postgres;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertFalse;
5+
import static org.junit.jupiter.api.Assertions.assertNotNull;
6+
import static org.junit.jupiter.api.Assertions.assertThrows;
7+
import static org.junit.jupiter.api.Assertions.assertTrue;
8+
9+
import java.sql.Connection;
10+
import java.sql.PreparedStatement;
11+
import java.sql.ResultSet;
12+
import java.sql.SQLException;
13+
import java.time.Duration;
14+
import org.hypertrace.core.documentstore.model.config.ConnectionConfig;
15+
import org.hypertrace.core.documentstore.model.config.ConnectionCredentials;
16+
import org.hypertrace.core.documentstore.model.config.ConnectionPoolConfig;
17+
import org.hypertrace.core.documentstore.model.config.DatabaseType;
18+
import org.hypertrace.core.documentstore.model.config.Endpoint;
19+
import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig;
20+
import org.junit.jupiter.api.AfterAll;
21+
import org.junit.jupiter.api.BeforeAll;
22+
import org.junit.jupiter.api.Test;
23+
import org.testcontainers.containers.GenericContainer;
24+
import org.testcontainers.containers.wait.strategy.Wait;
25+
import org.testcontainers.junit.jupiter.Testcontainers;
26+
import org.testcontainers.utility.DockerImageName;
27+
28+
@Testcontainers
29+
public class PostgresConnectionPoolIntegrationTest {
30+
31+
private static GenericContainer<?> postgres;
32+
private static String host;
33+
private static int port;
34+
35+
@BeforeAll
36+
public static void init() {
37+
postgres =
38+
new GenericContainer<>(DockerImageName.parse("postgres:13.1"))
39+
.withEnv("POSTGRES_PASSWORD", "postgres")
40+
.withEnv("POSTGRES_USER", "postgres")
41+
.withExposedPorts(5432)
42+
.waitingFor(Wait.forListeningPort());
43+
postgres.start();
44+
45+
host = postgres.getHost();
46+
port = postgres.getMappedPort(5432);
47+
}
48+
49+
@AfterAll
50+
public static void shutdown() {
51+
postgres.stop();
52+
}
53+
54+
@Test
55+
public void testGetConnection() throws SQLException {
56+
final PostgresConnectionConfig config = createTestConfig();
57+
final PostgresConnectionPool pool = new PostgresConnectionPool(config);
58+
59+
try (final Connection connection = pool.getConnection()) {
60+
assertNotNull(connection);
61+
assertTrue(connection.getAutoCommit(), "Regular connection should have autoCommit=true");
62+
assertFalse(connection.isClosed());
63+
64+
// Verify the connection works by executing a simple query
65+
try (final PreparedStatement stmt = connection.prepareStatement("SELECT 1");
66+
final ResultSet rs = stmt.executeQuery()) {
67+
assertTrue(rs.next());
68+
assertEquals(1, rs.getInt(1));
69+
}
70+
}
71+
72+
pool.close();
73+
}
74+
75+
@Test
76+
public void testGetTransactionalConnection() throws SQLException {
77+
final PostgresConnectionConfig config = createTestConfig();
78+
final PostgresConnectionPool pool = new PostgresConnectionPool(config);
79+
80+
try (final Connection connection = pool.getTransactionalConnection()) {
81+
assertNotNull(connection);
82+
assertFalse(
83+
connection.getAutoCommit(), "Transactional connection should have autoCommit=false");
84+
assertFalse(connection.isClosed());
85+
86+
// Verify the connection works by executing a simple query
87+
try (final PreparedStatement stmt = connection.prepareStatement("SELECT 2");
88+
final ResultSet rs = stmt.executeQuery()) {
89+
assertTrue(rs.next());
90+
assertEquals(2, rs.getInt(1));
91+
}
92+
93+
// Verify we can commit manually
94+
connection.commit();
95+
}
96+
97+
pool.close();
98+
}
99+
100+
@Test
101+
public void testBothPoolsIndependent() throws SQLException {
102+
final PostgresConnectionConfig config = createTestConfig();
103+
final PostgresConnectionPool pool = new PostgresConnectionPool(config);
104+
105+
// Get connections from both pools simultaneously
106+
try (final Connection regularConn = pool.getConnection();
107+
final Connection transactionalConn = pool.getTransactionalConnection()) {
108+
109+
assertNotNull(regularConn);
110+
assertNotNull(transactionalConn);
111+
112+
// Verify they have different autoCommit settings
113+
assertTrue(regularConn.getAutoCommit());
114+
assertFalse(transactionalConn.getAutoCommit());
115+
116+
// Both should work independently
117+
try (final PreparedStatement stmt1 = regularConn.prepareStatement("SELECT 'regular'");
118+
final ResultSet rs1 = stmt1.executeQuery()) {
119+
assertTrue(rs1.next());
120+
assertEquals("regular", rs1.getString(1));
121+
}
122+
123+
try (final PreparedStatement stmt2 =
124+
transactionalConn.prepareStatement("SELECT 'transactional'");
125+
final ResultSet rs2 = stmt2.executeQuery()) {
126+
assertTrue(rs2.next());
127+
assertEquals("transactional", rs2.getString(1));
128+
}
129+
130+
transactionalConn.commit();
131+
}
132+
133+
pool.close();
134+
}
135+
136+
@Test
137+
public void testConnectionPooling() throws SQLException {
138+
final PostgresConnectionConfig config = createTestConfig();
139+
final PostgresConnectionPool pool = new PostgresConnectionPool(config);
140+
141+
// Get and release connections multiple times
142+
Connection conn1 = pool.getConnection();
143+
assertNotNull(conn1);
144+
conn1.close();
145+
146+
Connection conn2 = pool.getConnection();
147+
assertNotNull(conn2);
148+
conn2.close();
149+
150+
// Verify pooling is working by getting multiple connections from transactional pool
151+
Connection tConn1 = pool.getTransactionalConnection();
152+
assertNotNull(tConn1);
153+
tConn1.close();
154+
155+
Connection tConn2 = pool.getTransactionalConnection();
156+
assertNotNull(tConn2);
157+
tConn2.close();
158+
159+
pool.close();
160+
}
161+
162+
@Test
163+
public void testTransactionalCommitAndRollback() throws SQLException {
164+
final PostgresConnectionConfig config = createTestConfig();
165+
final PostgresConnectionPool pool = new PostgresConnectionPool(config);
166+
167+
// Create a test table
168+
try (final Connection setupConn = pool.getTransactionalConnection()) {
169+
try (final PreparedStatement stmt =
170+
setupConn.prepareStatement(
171+
"CREATE TABLE IF NOT EXISTS test_table (id INT PRIMARY KEY, value TEXT)")) {
172+
stmt.execute();
173+
}
174+
setupConn.commit();
175+
}
176+
177+
// Test commit
178+
try (final Connection conn = pool.getTransactionalConnection()) {
179+
try (final PreparedStatement stmt =
180+
conn.prepareStatement("INSERT INTO test_table (id, value) VALUES (1, 'test')")) {
181+
stmt.execute();
182+
}
183+
conn.commit();
184+
185+
// Verify data was committed
186+
try (final PreparedStatement stmt =
187+
conn.prepareStatement("SELECT value FROM test_table WHERE id = 1");
188+
final ResultSet rs = stmt.executeQuery()) {
189+
assertTrue(rs.next());
190+
assertEquals("test", rs.getString(1));
191+
}
192+
}
193+
194+
// Test rollback
195+
try (final Connection conn = pool.getTransactionalConnection()) {
196+
try (final PreparedStatement stmt =
197+
conn.prepareStatement("INSERT INTO test_table (id, value) VALUES (2, 'rollback_me')")) {
198+
stmt.execute();
199+
}
200+
conn.rollback();
201+
202+
// Verify data was not committed
203+
try (final PreparedStatement stmt =
204+
conn.prepareStatement("SELECT value FROM test_table WHERE id = 2");
205+
final ResultSet rs = stmt.executeQuery()) {
206+
assertFalse(rs.next(), "Data should have been rolled back");
207+
}
208+
}
209+
210+
// Cleanup
211+
try (final Connection cleanupConn = pool.getTransactionalConnection()) {
212+
try (final PreparedStatement stmt =
213+
cleanupConn.prepareStatement("DROP TABLE IF EXISTS test_table")) {
214+
stmt.execute();
215+
}
216+
cleanupConn.commit();
217+
}
218+
219+
pool.close();
220+
}
221+
222+
@Test
223+
public void testClose() throws SQLException {
224+
final PostgresConnectionConfig config = createTestConfig();
225+
final PostgresConnectionPool pool = new PostgresConnectionPool(config);
226+
227+
// Get connections to ensure pools are active
228+
final Connection regularConnection = pool.getConnection();
229+
final Connection transactionalConnection = pool.getTransactionalConnection();
230+
231+
assertNotNull(regularConnection);
232+
assertNotNull(transactionalConnection);
233+
234+
// Close the connections back to the pool
235+
regularConnection.close();
236+
transactionalConnection.close();
237+
238+
// Close the pool - should not throw
239+
pool.close();
240+
241+
// After closing the pool, trying to get connections should fail with IllegalStateException
242+
assertThrows(IllegalStateException.class, pool::getConnection);
243+
assertThrows(IllegalStateException.class, pool::getTransactionalConnection);
244+
}
245+
246+
@Test
247+
public void testCloseIdempotent() throws SQLException {
248+
final PostgresConnectionConfig config = createTestConfig();
249+
final PostgresConnectionPool pool = new PostgresConnectionPool(config);
250+
251+
// First close
252+
pool.close();
253+
254+
// Second close should not throw
255+
pool.close();
256+
}
257+
258+
private static PostgresConnectionConfig createTestConfig() {
259+
return (PostgresConnectionConfig)
260+
ConnectionConfig.builder()
261+
.type(DatabaseType.POSTGRES)
262+
.addEndpoint(Endpoint.builder().host(host).port(port).build())
263+
.database("postgres")
264+
.credentials(
265+
ConnectionCredentials.builder().username("postgres").password("postgres").build())
266+
.connectionPoolConfig(
267+
ConnectionPoolConfig.builder()
268+
.maxConnections(5)
269+
.connectionAccessTimeout(Duration.ofSeconds(10))
270+
.connectionSurrenderTimeout(Duration.ofSeconds(30))
271+
.build())
272+
.build();
273+
}
274+
}

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public PostgresClient(final PostgresConnectionConfig config) {
3333
this.connectionPool = new PostgresConnectionPool(connectionConfig);
3434
}
3535

36+
// todo: Deprecate this method. All connections should be obtained from the connection pool.
3637
public synchronized Connection getConnection() {
3738
try {
3839
if (connection == null) {
@@ -48,10 +49,22 @@ public synchronized Connection getConnection() {
4849
return connection;
4950
}
5051

52+
/**
53+
* Get a pooled connection with autoCommit=true. Use for read queries that don't need manual
54+
* transaction management.
55+
*/
5156
public Connection getPooledConnection() throws SQLException {
5257
return connectionPool.getConnection();
5358
}
5459

60+
/**
61+
* Get a pooled connection with autoCommit=false. Use for operations that require manual
62+
* transaction management (commit/rollback).
63+
*/
64+
public Connection getTransactionalConnection() throws SQLException {
65+
return connectionPool.getTransactionalConnection();
66+
}
67+
5568
public Map<String, String> getCustomParameters() {
5669
return connectionConfig.customParameters();
5770
}

0 commit comments

Comments
 (0)