Skip to content

Commit 3c4e09f

Browse files
authored
Merge pull request #5225 from sysown/v3.0_refactor_prepared_statement_cache_design_5211
Refactored Prepared-Statement Cache Design (Lock-Free Hot Path) - Part 2
2 parents 5d2d26d + 9c0e14a commit 3c4e09f

23 files changed

+1231
-949
lines changed

deps/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ postgresql/postgresql/src/interfaces/libpq/libpq.a:
306306
cd postgresql/postgresql && patch -p0 < ../handle_row_data.patch
307307
cd postgresql/postgresql && patch -p0 < ../fmt_err_msg.patch
308308
cd postgresql/postgresql && patch -p0 < ../bind_fmt_text.patch
309+
cd postgresql/postgresql && patch -p0 < ../pqsendpipelinesync.patch
309310
#cd postgresql/postgresql && LD_LIBRARY_PATH="$(shell pwd)/libssl/openssl" ./configure --with-ssl=openssl --with-includes="$(shell pwd)/libssl/openssl/include/" --with-libraries="$(shell pwd)/libssl/openssl/" --without-readline --enable-debug CFLAGS="-ggdb -O0 -fno-omit-frame-pointer" CPPFLAGS="-g -O0"
310311
cd postgresql/postgresql && LD_LIBRARY_PATH="$(SSL_LDIR)" ./configure --with-ssl=openssl --with-includes="$(SSL_IDIR)" --with-libraries="$(SSL_LDIR)" --without-readline
311312
cd postgresql/postgresql/src/interfaces/libpq && CC=${CC} CXX=${CXX} ${MAKE} MAKELEVEL=0
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
diff --git src/interfaces/libpq/fe-exec.c src/interfaces/libpq/fe-exec.c
2+
index b833e76..51ad8d8 100644
3+
--- src/interfaces/libpq/fe-exec.c
4+
+++ src/interfaces/libpq/fe-exec.c
5+
@@ -4558,3 +4558,65 @@ int PShandleRowData(PGconn *conn, bool is_first_packet, PSresult* result) {
6+
return psHandleRowData(conn, is_first_packet, result);
7+
}
8+
9+
+int
10+
+PQsendPipelineSync(PGconn *conn)
11+
+{
12+
+ PGcmdQueueEntry *entry;
13+
+
14+
+ if (!conn)
15+
+ return 0;
16+
+
17+
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
18+
+ {
19+
+ libpq_append_conn_error(conn, "cannot send pipeline when not in pipeline mode");
20+
+ return 0;
21+
+ }
22+
+
23+
+ switch (conn->asyncStatus)
24+
+ {
25+
+ case PGASYNC_COPY_IN:
26+
+ case PGASYNC_COPY_OUT:
27+
+ case PGASYNC_COPY_BOTH:
28+
+ /* should be unreachable */
29+
+ appendPQExpBufferStr(&conn->errorMessage,
30+
+ "internal error: cannot send pipeline while in COPY\n");
31+
+ return 0;
32+
+ case PGASYNC_READY:
33+
+ case PGASYNC_READY_MORE:
34+
+ case PGASYNC_BUSY:
35+
+ case PGASYNC_IDLE:
36+
+ case PGASYNC_PIPELINE_IDLE:
37+
+ /* OK to send sync */
38+
+ break;
39+
+ }
40+
+
41+
+ entry = pqAllocCmdQueueEntry(conn);
42+
+ if (entry == NULL)
43+
+ return 0; /* error msg already set */
44+
+
45+
+ entry->queryclass = PGQUERY_SYNC;
46+
+ entry->query = NULL;
47+
+
48+
+ /* construct the Sync message */
49+
+ if (pqPutMsgStart('S', conn) < 0 ||
50+
+ pqPutMsgEnd(conn) < 0)
51+
+ goto sendFailed;
52+
+
53+
+ /*
54+
+ * Give the data a push (in pipeline mode, only if we're past the size
55+
+ * threshold). In nonblock mode, don't complain if we're unable to send
56+
+ * it all; PQgetResult() will do any additional flushing needed.
57+
+ */
58+
+ if (pqPipelineFlush(conn) < 0)
59+
+ goto sendFailed;
60+
+
61+
+ /* OK, it's launched! */
62+
+ pqAppendCmdQueueEntry(conn, entry);
63+
+
64+
+ return 1;
65+
+
66+
+sendFailed:
67+
+ pqRecycleCmdQueueEntry(conn, entry);
68+
+ /* error message should be set up already */
69+
+ return 0;
70+
+}
71+
diff --git src/interfaces/libpq/libpq-fe.h src/interfaces/libpq/libpq-fe.h
72+
index 47f25e0..b769b64 100644
73+
--- src/interfaces/libpq/libpq-fe.h
74+
+++ src/interfaces/libpq/libpq-fe.h
75+
@@ -688,6 +688,9 @@ extern const PGresult *PQgetResultFromPGconn(PGconn *conn);
76+
/* ProxySQL special handler function */
77+
extern int PShandleRowData(PGconn *conn, bool is_first_packet, PSresult* result);
78+
79+
+/* Send a pipeline sync message without flushing the send buffer */
80+
+extern int PQsendPipelineSync(PGconn *conn);
81+
+
82+
#ifdef __cplusplus
83+
}
84+
#endif

include/PgSQL_Connection.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
class PgSQL_SrvC;
1414
class PgSQL_Query_Result;
15-
class PgSQL_STMTs_local_v14;
15+
class PgSQL_STMT_Local;
1616
//class PgSQL_Describe_Prepared_Info;
1717
class PgSQL_Bind_Info;
1818
//#define STATUS_PGSQL_CONNECTION_SEQUENCE 0x00000001
@@ -639,7 +639,7 @@ class PgSQL_Connection {
639639
bool exit_pipeline_mode; // true if it is safe to exit pipeline mode
640640
bool resync_failed; // true if the last resync attempt failed
641641

642-
PgSQL_STMTs_local_v14* local_stmts;
642+
PgSQL_STMT_Local* local_stmts;
643643
PgSQL_SrvC *parent;
644644
PgSQL_Connection_userinfo* userinfo;
645645
PgSQL_Data_Stream* myds;

include/PgSQL_ExplicitTxnStateMgr.h

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ struct TxnCmd {
5656
*/
5757
class PgSQL_TxnCmdParser {
5858
public:
59+
PgSQL_TxnCmdParser() noexcept { tokens.reserve(16); }
60+
~PgSQL_TxnCmdParser() noexcept = default;
61+
5962
TxnCmd parse(std::string_view input, bool in_transaction_mode) noexcept;
6063

6164
private:
@@ -67,14 +70,20 @@ class PgSQL_TxnCmdParser {
6770
TxnCmd parse_start(size_t& pos) noexcept;
6871

6972
// Helpers
70-
static std::string to_lower(std::string_view s) noexcept {
71-
std::string s_copy(s);
72-
std::transform(s_copy.begin(), s_copy.end(), s_copy.begin(), ::tolower);
73-
return s_copy;
73+
inline static bool iequals(std::string_view a, std::string_view b) noexcept {
74+
if (a.size() != b.size()) return false;
75+
for (size_t i = 0; i < a.size(); ++i) {
76+
char ca = a[i];
77+
char cb = b[i];
78+
if (ca >= 'A' && ca <= 'Z') ca += 32;
79+
if (cb >= 'A' && cb <= 'Z') cb += 32;
80+
if (ca != cb) return false;
81+
}
82+
return true;
7483
}
7584

7685
inline static bool contains(std::vector<std::string_view>&& list, std::string_view value) noexcept {
77-
for (const auto& item : list) if (item == value) return true;
86+
for (const auto& item : list) if (iequals(item, value)) return true;
7887
return false;
7988
}
8089
};

0 commit comments

Comments
 (0)