From 9fa3d75fb365d48db4836f9e4d893d35a96a3525 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Wed, 29 Oct 2025 16:40:06 +0500 Subject: [PATCH 1/7] Backport PQsendPipelineSync from PostgreSQL 17 and update code to use it - Backport PQsendPipelineSync to PostgreSQL 16.3, enabling pipeline synchronization without flushing the send buffer. - Replace calls to PQPipelineSync in code with PQsendPipelineSync to use the new functionality. --- deps/Makefile | 1 + deps/postgresql/pqsendpipelinesync.patch | 84 ++++++++++++++++++++++++ lib/PgSQL_Connection.cpp | 15 ++--- 3 files changed, 90 insertions(+), 10 deletions(-) create mode 100644 deps/postgresql/pqsendpipelinesync.patch diff --git a/deps/Makefile b/deps/Makefile index 59440ec1f2..25627777c4 100644 --- a/deps/Makefile +++ b/deps/Makefile @@ -306,6 +306,7 @@ postgresql/postgresql/src/interfaces/libpq/libpq.a: cd postgresql/postgresql && patch -p0 < ../handle_row_data.patch cd postgresql/postgresql && patch -p0 < ../fmt_err_msg.patch cd postgresql/postgresql && patch -p0 < ../bind_fmt_text.patch + cd postgresql/postgresql && patch -p0 < ../pqsendpipelinesync.patch #cd postgresql/postgresql && LD_LIBRARY_PATH="$(shell pwd)/libssl/openssl" ./configure --with-ssl=openssl --with-includes="$(shell pwd)/libssl/openssl/include/" --with-libraries="$(shell pwd)/libssl/openssl/" --without-readline --enable-debug CFLAGS="-ggdb -O0 -fno-omit-frame-pointer" CPPFLAGS="-g -O0" cd postgresql/postgresql && LD_LIBRARY_PATH="$(SSL_LDIR)" ./configure --with-ssl=openssl --with-includes="$(SSL_IDIR)" --with-libraries="$(SSL_LDIR)" --without-readline cd postgresql/postgresql/src/interfaces/libpq && CC=${CC} CXX=${CXX} ${MAKE} MAKELEVEL=0 diff --git a/deps/postgresql/pqsendpipelinesync.patch b/deps/postgresql/pqsendpipelinesync.patch new file mode 100644 index 0000000000..f5eff66245 --- /dev/null +++ b/deps/postgresql/pqsendpipelinesync.patch @@ -0,0 +1,84 @@ +diff --git src/interfaces/libpq/fe-exec.c src/interfaces/libpq/fe-exec.c +index b833e76..51ad8d8 100644 +--- src/interfaces/libpq/fe-exec.c ++++ src/interfaces/libpq/fe-exec.c +@@ -4558,3 +4558,65 @@ int PShandleRowData(PGconn *conn, bool is_first_packet, PSresult* result) { + return psHandleRowData(conn, is_first_packet, result); + } + ++int ++PQsendPipelineSync(PGconn *conn) ++{ ++ PGcmdQueueEntry *entry; ++ ++ if (!conn) ++ return 0; ++ ++ if (conn->pipelineStatus == PQ_PIPELINE_OFF) ++ { ++ libpq_append_conn_error(conn, "cannot send pipeline when not in pipeline mode"); ++ return 0; ++ } ++ ++ switch (conn->asyncStatus) ++ { ++ case PGASYNC_COPY_IN: ++ case PGASYNC_COPY_OUT: ++ case PGASYNC_COPY_BOTH: ++ /* should be unreachable */ ++ appendPQExpBufferStr(&conn->errorMessage, ++ "internal error: cannot send pipeline while in COPY\n"); ++ return 0; ++ case PGASYNC_READY: ++ case PGASYNC_READY_MORE: ++ case PGASYNC_BUSY: ++ case PGASYNC_IDLE: ++ case PGASYNC_PIPELINE_IDLE: ++ /* OK to send sync */ ++ break; ++ } ++ ++ entry = pqAllocCmdQueueEntry(conn); ++ if (entry == NULL) ++ return 0; /* error msg already set */ ++ ++ entry->queryclass = PGQUERY_SYNC; ++ entry->query = NULL; ++ ++ /* construct the Sync message */ ++ if (pqPutMsgStart('S', conn) < 0 || ++ pqPutMsgEnd(conn) < 0) ++ goto sendFailed; ++ ++ /* ++ * Give the data a push (in pipeline mode, only if we're past the size ++ * threshold). In nonblock mode, don't complain if we're unable to send ++ * it all; PQgetResult() will do any additional flushing needed. ++ */ ++ if (pqPipelineFlush(conn) < 0) ++ goto sendFailed; ++ ++ /* OK, it's launched! */ ++ pqAppendCmdQueueEntry(conn, entry); ++ ++ return 1; ++ ++sendFailed: ++ pqRecycleCmdQueueEntry(conn, entry); ++ /* error message should be set up already */ ++ return 0; ++} +diff --git src/interfaces/libpq/libpq-fe.h src/interfaces/libpq/libpq-fe.h +index 47f25e0..b769b64 100644 +--- src/interfaces/libpq/libpq-fe.h ++++ src/interfaces/libpq/libpq-fe.h +@@ -688,6 +688,9 @@ extern const PGresult *PQgetResultFromPGconn(PGconn *conn); + /* ProxySQL special handler function */ + extern int PShandleRowData(PGconn *conn, bool is_first_packet, PSresult* result); + ++/* Send a pipeline sync message without flushing the send buffer */ ++extern int PQsendPipelineSync(PGconn *conn); ++ + #ifdef __cplusplus + } + #endif diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index 2f7b9953b5..a77da0f74a 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -1663,8 +1663,7 @@ void PgSQL_Connection::stmt_prepare_start() { return; } } else { - // FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher - if (PQpipelineSync(pgsql_conn) == 0) { + if (PQsendPipelineSync(pgsql_conn) == 0) { set_error_from_PQerrorMessage(); proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str()); return; @@ -1730,8 +1729,7 @@ void PgSQL_Connection::stmt_describe_start() { return; } } else { - // FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher - if (PQpipelineSync(pgsql_conn) == 0) { + if (PQsendPipelineSync(pgsql_conn) == 0) { set_error_from_PQerrorMessage(); proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str()); return; @@ -1755,8 +1753,7 @@ void PgSQL_Connection::resync_start() { PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::notice_handler_cb, this); - // FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher - if (PQpipelineSync(pgsql_conn) == 0) { + if (PQsendPipelineSync(pgsql_conn) == 0) { proxy_error("Failed to send pipeline sync.\n"); resync_failed = true; return; @@ -1878,8 +1875,7 @@ void PgSQL_Connection::stmt_execute_start() { return; } } else { - // FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher - if (PQpipelineSync(pgsql_conn) == 0) { + if (PQsendPipelineSync(pgsql_conn) == 0) { set_error_from_PQerrorMessage(); proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str()); return; @@ -1905,8 +1901,7 @@ void PgSQL_Connection::reset_session_start() { reset_session_in_pipeline = is_pipeline_active(); if (reset_session_in_pipeline) { - // FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher - if (PQpipelineSync(pgsql_conn) == 0) { + if (PQsendPipelineSync(pgsql_conn) == 0) { set_error_from_PQerrorMessage(); proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str()); return; From 9eb934e7f0442c77d417eb6fd81328ceed3743d4 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Thu, 30 Oct 2025 00:43:09 +0500 Subject: [PATCH 2/7] Buffer response until Extended Query frame completes; send early only if resultset threshold is reached. --- lib/PgSQL_Session.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index 773358cb02..f679b12181 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -2651,6 +2651,7 @@ int PgSQL_Session::handler() { #endif // ENABLE_TIMER int handler_ret = 0; bool wrong_pass = false; + bool in_pending_state = false; if (to_process == 0) return 0; // this should be redundant if the called does the same check proxy_debug(PROXY_DEBUG_NET, 1, "Thread=%p, Session=%p -- Processing session %p\n", this->thread, this, this); //unsigned int j; @@ -3125,6 +3126,8 @@ int PgSQL_Session::handler() { case 1: if (myconn->query_result && myconn->query_result->get_resultset_size() > (unsigned int)pgsql_thread___threshold_resultset_size) { myconn->query_result->get_resultset(client_myds->PSarrayOUT); + } else { + in_pending_state = true; } break; // rc==2 : a multi-resultset (or multi statement) was detected, and the current statement is completed @@ -3217,7 +3220,8 @@ int PgSQL_Session::handler() { } } - writeout(); + if (!in_pending_state) + writeout(); if (wrong_pass == true) { client_myds->array2buffer_full(); From 61ba18246522047488a7ce69d1cfff4294c50ee2 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Thu, 30 Oct 2025 01:14:20 +0500 Subject: [PATCH 3/7] Introduce inline functions for efficient ASCII whitespace detection and uint32-to-string conversion --- include/gen_utils.h | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/include/gen_utils.h b/include/gen_utils.h index 12e498a73a..904a4a2764 100644 --- a/include/gen_utils.h +++ b/include/gen_utils.h @@ -394,4 +394,33 @@ std::unique_ptr get_SQLite3_resulset(MYSQL_RES* resultset); std::vector split_string(const std::string& str, char delimiter); +inline constexpr bool fast_isspace(unsigned char c) noexcept +{ + // Matches: '\t' (0x09) through '\r' (0x0D), and ' ' (0x20) + // That is: '\t', '\n', '\v', '\f', '\r', ' ' + // + // (c - '\t') < 5 -> true for 0x09-0x0D inclusive + // (c == ' ') -> true for space + // + // Use bitwise OR `|` (not logical `||`) to keep it branchless. + return (c == ' ') | (static_cast(c - '\t') < 5); +} + +inline constexpr char* fast_uint32toa(uint32_t value, char* out) { + char* p = out; + do { + *p++ = '0' + (value % 10); + value /= 10; + } while (value); + *p = '\0'; + char* start = out; + char* end = p - 1; + while (start < end) { + char t = *start; + *start++ = *end; + *end-- = t; + } + return p; +} + #endif /* __GEN_FUNCTIONS */ \ No newline at end of file From 7a3a5c71dfdec71ca172bd13071550d007d86cff Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Thu, 30 Oct 2025 01:23:41 +0500 Subject: [PATCH 4/7] Optimize hot path: replace std::string with char[] to avoid heap --- include/gen_utils.h | 4 ++-- lib/PgSQL_Session.cpp | 19 ++++++++++++++----- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/include/gen_utils.h b/include/gen_utils.h index 904a4a2764..b5c440c7f1 100644 --- a/include/gen_utils.h +++ b/include/gen_utils.h @@ -406,7 +406,7 @@ inline constexpr bool fast_isspace(unsigned char c) noexcept return (c == ' ') | (static_cast(c - '\t') < 5); } -inline constexpr char* fast_uint32toa(uint32_t value, char* out) { +inline constexpr char* fast_uint32toa(uint32_t value, char* out) noexcept { char* p = out; do { *p++ = '0' + (value % 10); @@ -423,4 +423,4 @@ inline constexpr char* fast_uint32toa(uint32_t value, char* out) { return p; } -#endif /* __GEN_FUNCTIONS */ \ No newline at end of file +#endif /* __GEN_FUNCTIONS */ diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index f679b12181..73dfda60ee 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -2582,6 +2582,13 @@ void PgSQL_Session::handler_minus1_HandleBackendConnection(PgSQL_Data_Stream* my } } +inline void build_backend_stmt_name(char* buf, unsigned int stmt_backend_id) { + char* p = buf; + const char* prefix = PROXYSQL_PS_PREFIX; + while (*prefix) *p++ = *prefix++; + p = fast_uint32toa(stmt_backend_id, p); +} + // this function was inline int PgSQL_Session::RunQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn) { PROXY_TRACE2(); @@ -2599,9 +2606,10 @@ int PgSQL_Session::RunQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn) { this, myconn, myconn->pgsql_conn, backend_stmt_id); } // this is used to generate the name of the prepared statement in the backend - const std::string& backend_stmt_name = std::string(PROXYSQL_PS_PREFIX) + std::to_string(CurrentQuery.extended_query_info.stmt_backend_id); + char backend_stmt_name[32]; + build_backend_stmt_name(backend_stmt_name, CurrentQuery.extended_query_info.stmt_backend_id); rc = myconn->async_query(myds->revents, (char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength, - backend_stmt_name.c_str(), PGSQL_EXTENDED_QUERY_TYPE_PARSE, &CurrentQuery.extended_query_info); + backend_stmt_name, PGSQL_EXTENDED_QUERY_TYPE_PARSE, &CurrentQuery.extended_query_info); } break; case PROCESSING_STMT_DESCRIBE: @@ -2610,9 +2618,10 @@ int PgSQL_Session::RunQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn) { { PgSQL_Extended_Query_Type type = (status == PROCESSING_STMT_DESCRIBE) ? PGSQL_EXTENDED_QUERY_TYPE_DESCRIBE : PGSQL_EXTENDED_QUERY_TYPE_EXECUTE; - const std::string& backend_stmt_name = - std::string(PROXYSQL_PS_PREFIX) + std::to_string(CurrentQuery.extended_query_info.stmt_backend_id); - rc = myconn->async_query(myds->revents, nullptr, 0, backend_stmt_name.c_str(), type, &CurrentQuery.extended_query_info); + + char backend_stmt_name[32]; + build_backend_stmt_name(backend_stmt_name, CurrentQuery.extended_query_info.stmt_backend_id); + rc = myconn->async_query(myds->revents, nullptr, 0, backend_stmt_name, type, &CurrentQuery.extended_query_info); } break; /* case PROCESSING_STMT_EXECUTE: From 7c665b9f786075d3c354c6241f60c68ceab176a8 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Mon, 10 Nov 2025 12:15:22 +0500 Subject: [PATCH 5/7] =?UTF-8?q?Checking=20the=20data=20stream=20on=20both?= =?UTF-8?q?=20ends=20doesn=E2=80=99t=20apply=20to=20frontend=20connections?= =?UTF-8?q?,=20since=20response=20data=20is=20buffered=20during=20extended?= =?UTF-8?q?=20queries.=20Fixed=20TAP=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/PgSQL_Data_Stream.cpp | 3 +- ...nded_query_protocol_query_rules_test-t.cpp | 46 ++++++++----------- 2 files changed, 20 insertions(+), 29 deletions(-) diff --git a/lib/PgSQL_Data_Stream.cpp b/lib/PgSQL_Data_Stream.cpp index 0e081a425c..9a7a0c0a08 100644 --- a/lib/PgSQL_Data_Stream.cpp +++ b/lib/PgSQL_Data_Stream.cpp @@ -438,7 +438,8 @@ void PgSQL_Data_Stream::shut_hard() { } void PgSQL_Data_Stream::check_data_flow() { - if ((PSarrayIN->len || queue_data(queueIN)) && (PSarrayOUT->len || queue_data(queueOUT))) { + // This does not apply to frontend data streams because response data is buffered during extended queries. + if ((myds_type != MYDS_FRONTEND) && (PSarrayIN->len || queue_data(queueIN)) && (PSarrayOUT->len || queue_data(queueOUT))) { // there is data at both sides of the data stream: this is considered a fatal error proxy_error("Session=%p, DataStream=%p -- Data at both ends of a PgSQL data stream: IN <%d bytes %d packets> , OUT <%d bytes %d packets>\n", sess, this, PSarrayIN->len, queue_data(queueIN), PSarrayOUT->len, queue_data(queueOUT)); shut_soft(); diff --git a/test/tap/tests/pgsql-extended_query_protocol_query_rules_test-t.cpp b/test/tap/tests/pgsql-extended_query_protocol_query_rules_test-t.cpp index 75ea069027..44b7da81d9 100644 --- a/test/tap/tests/pgsql-extended_query_protocol_query_rules_test-t.cpp +++ b/test/tap/tests/pgsql-extended_query_protocol_query_rules_test-t.cpp @@ -152,65 +152,55 @@ void consume_results(PGconn* conn) { PGresult* res = nullptr; bool saw_error = false; std::string errmsg; + + // Keep looping until PQgetResult() returns NULL and + // connection is not busy anymore. for (;;) { - // Drain all immediately available results while ((res = PQgetResult(conn)) != nullptr) { ExecStatusType status = PQresultStatus(res); if (status == PGRES_FATAL_ERROR) { saw_error = true; errmsg = PQresultErrorMessage(res); } - if (status == PGRES_PIPELINE_ABORTED) { - // If pipeline was aborted, we need to clear the error + else if (status == PGRES_PIPELINE_ABORTED) { saw_error = true; - errmsg = std::string("Pipeline aborted : ") + PQresultErrorMessage(res); + errmsg = std::string("Pipeline aborted: ") + PQresultErrorMessage(res); } PQclear(res); } if (!PQisBusy(conn)) { - while ((res = PQgetResult(conn)) != nullptr) { + // Ensure all results are drained + while ((res = PQgetResult(conn)) != nullptr) PQclear(res); - } - break; // ReadyForQuery reached + break; } - // ---- handle flushing + reading ---- - int f = PQflush(conn); - if (f == -1) { + if (PQflush(conn) == -1) { throw std::runtime_error(std::string("PQflush failed: ") + PQerrorMessage(conn)); } - short events = POLLIN; - if (f == 1) { - // still data to send also watch POLLOUT - events |= POLLOUT; - } - struct pollfd pfd; pfd.fd = PQsocket(conn); - pfd.events = events; - if (pfd.fd < 0) { + pfd.events = POLLIN | POLLOUT; + if (pfd.fd < 0) throw std::runtime_error("Invalid PostgreSQL socket"); - } - if (poll(&pfd, 1, -1) < 0) { + if (poll(&pfd, 1, -1) < 0) throw std::runtime_error("poll() failed"); - } - // If socket readable consume input - if (pfd.revents & POLLIN) { - if (PQconsumeInput(conn) == 0) { - throw std::runtime_error( - std::string("PQconsumeInput failed: ") + PQerrorMessage(conn)); - } + if ((pfd.revents & POLLIN) && PQconsumeInput(conn) == 0) { + throw std::runtime_error(std::string("PQconsumeInput failed: ") + PQerrorMessage(conn)); } - // If socket writable and f==1 PQflush() will be retried on next iteration } if (saw_error) { throw std::runtime_error("PostgreSQL error: " + errmsg); } + + // call PQgetResult() one final time to clear any leftover + while ((res = PQgetResult(conn)) != nullptr) + PQclear(res); } void test_query_processor(PGconn* admin_conn) { From e744c2bbb70c59630f37f2deceed4e34070e5d68 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Mon, 10 Nov 2025 18:24:27 +0500 Subject: [PATCH 6/7] Optimize transaction command parsing to avoid unnecessary tokenization Previously, the parser always tokenized the full command, even when we only needed to check whether it was a transaction command. Now, it first extracts the first word to determine relevance and performs full tokenization only when necessary. --- include/PgSQL_ExplicitTxnStateMgr.h | 19 +++- lib/PgSQL_ExplicitTxnStateMgr.cpp | 143 +++++++++++++++++++++++----- lib/PgSQL_Protocol.cpp | 6 +- lib/PgSQL_Session.cpp | 2 +- lib/PgSQL_Variables_Validator.cpp | 26 ++--- lib/ProxySQL_Config.cpp | 2 +- 6 files changed, 149 insertions(+), 49 deletions(-) diff --git a/include/PgSQL_ExplicitTxnStateMgr.h b/include/PgSQL_ExplicitTxnStateMgr.h index 3fbf66da5d..ce2ccd8dfa 100644 --- a/include/PgSQL_ExplicitTxnStateMgr.h +++ b/include/PgSQL_ExplicitTxnStateMgr.h @@ -56,6 +56,9 @@ struct TxnCmd { */ class PgSQL_TxnCmdParser { public: + PgSQL_TxnCmdParser() noexcept { tokens.reserve(16); } + ~PgSQL_TxnCmdParser() noexcept = default; + TxnCmd parse(std::string_view input, bool in_transaction_mode) noexcept; private: @@ -67,14 +70,20 @@ class PgSQL_TxnCmdParser { TxnCmd parse_start(size_t& pos) noexcept; // Helpers - static std::string to_lower(std::string_view s) noexcept { - std::string s_copy(s); - std::transform(s_copy.begin(), s_copy.end(), s_copy.begin(), ::tolower); - return s_copy; + inline static bool iequals(std::string_view a, std::string_view b) noexcept { + if (a.size() != b.size()) return false; + for (size_t i = 0; i < a.size(); ++i) { + char ca = a[i]; + char cb = b[i]; + if (ca >= 'A' && ca <= 'Z') ca += 32; + if (cb >= 'A' && cb <= 'Z') cb += 32; + if (ca != cb) return false; + } + return true; } inline static bool contains(std::vector&& list, std::string_view value) noexcept { - for (const auto& item : list) if (item == value) return true; + for (const auto& item : list) if (iequals(item, value)) return true; return false; } }; diff --git a/lib/PgSQL_ExplicitTxnStateMgr.cpp b/lib/PgSQL_ExplicitTxnStateMgr.cpp index 4389fae156..f8d0234851 100644 --- a/lib/PgSQL_ExplicitTxnStateMgr.cpp +++ b/lib/PgSQL_ExplicitTxnStateMgr.cpp @@ -327,16 +327,97 @@ bool PgSQL_ExplicitTxnStateMgr::handle_transaction(std::string_view input) { return true; } - TxnCmd PgSQL_TxnCmdParser::parse(std::string_view input, bool in_transaction_mode) noexcept { - tokens.clear(); TxnCmd cmd; - bool in_quote = false; + + if (input.empty()) return cmd; + + // Extract first word without full tokenization size_t start = 0; + size_t end = 0; + + while (start < input.size() && fast_isspace(input[start])) { + start++; + } + + if (start >= input.size()) return cmd; + + // Find end of first word + end = start; + bool in_quote = false; char quote_char = 0; - // Tokenize with quote handling - for (size_t i = 0; i <= input.size(); ++i) { + while (end < input.size()) { + char c = input[end]; + + if (!in_quote && (c == '"' || c == '\'')) { + // If we hit a quote at the start, this isn't a transaction command + return cmd; + } + + if (fast_isspace(c) || c == ';') { + break; + } + + end++; + } + + std::string_view first_word = input.substr(start, end - start); + + // Check if this is a transaction command we care about + TxnCmd::Type cmd_type = TxnCmd::UNKNOWN; + + if (in_transaction_mode) { + if (iequals(first_word, "begin")) { + cmd.type = TxnCmd::BEGIN; + return cmd; + } + + if (iequals(first_word, "start")) { + cmd_type = TxnCmd::BEGIN; + } else if (iequals(first_word, "savepoint")) { + cmd_type = TxnCmd::SAVEPOINT; + } else if (iequals(first_word, "release")) { + cmd_type = TxnCmd::RELEASE; + } else if (iequals(first_word, "rollback")) { + cmd_type = TxnCmd::ROLLBACK; + } + } else { + + if (iequals(first_word, "commit") || iequals(first_word, "end")) { + cmd.type = TxnCmd::COMMIT; + return cmd; + } + + if (iequals(first_word, "abort")) { + cmd.type = TxnCmd::ROLLBACK; + return cmd; + } + + if (iequals(first_word, "rollback")) { + cmd_type = TxnCmd::ROLLBACK; + } + } + + // If not a transaction command, return early + if (cmd_type == TxnCmd::UNKNOWN) { + return cmd; + } + + // Continue tokenization from where we left off + tokens.clear(); + + // Continue tokenizing the rest of the input + in_quote = false; + quote_char = 0; + start = end; // Continue from after the first word + + while (start < input.size() && fast_isspace(input[start])) { + start++; + } + + // Tokenize the remaining input + for (size_t i = start; i <= input.size(); ++i) { const bool at_end = i == input.size(); const char c = at_end ? 0 : input[i]; @@ -344,6 +425,7 @@ TxnCmd PgSQL_TxnCmdParser::parse(std::string_view input, bool in_transaction_mod if (c == quote_char || at_end) { tokens.emplace_back(input.substr(start + 1, i - start - 1)); in_quote = false; + start = i + 1; } continue; } @@ -353,41 +435,50 @@ TxnCmd PgSQL_TxnCmdParser::parse(std::string_view input, bool in_transaction_mod quote_char = c; start = i; } - else if (isspace(c) || c == ';' || at_end) { + else if (fast_isspace(c) || c == ';' || at_end) { if (start < i) tokens.emplace_back(input.substr(start, i - start)); start = i + 1; } } - if (tokens.empty()) return cmd; - size_t pos = 0; - const std::string first = to_lower(tokens[pos++]); - - if (in_transaction_mode == true) { - if (first == "begin") cmd.type = TxnCmd::BEGIN; - else if (first == "start") cmd = parse_start(pos); - else if (first == "savepoint") cmd = parse_savepoint(pos); - else if (first == "release") cmd = parse_release(pos); - else if (first == "rollback") cmd = parse_rollback(pos); + + if (in_transaction_mode) { + + switch (cmd_type) { + case TxnCmd::BEGIN: + cmd = parse_start(pos); + break; + case TxnCmd::SAVEPOINT: + cmd = parse_savepoint(pos); + break; + case TxnCmd::RELEASE: + cmd = parse_release(pos); + break; + case TxnCmd::ROLLBACK: + cmd = parse_rollback(pos); + break; + default: + break; + } } else { - if (first == "commit" || first == "end") cmd.type = TxnCmd::COMMIT; - else if (first == "abort") cmd.type = TxnCmd::ROLLBACK; - else if (first == "rollback") cmd = parse_rollback(pos); + if (cmd_type == TxnCmd::ROLLBACK) + cmd = parse_rollback(pos); } + return cmd; } TxnCmd PgSQL_TxnCmdParser::parse_rollback(size_t& pos) noexcept { TxnCmd cmd{ TxnCmd::ROLLBACK }; - while (pos < tokens.size() && contains({ "work", "transaction" }, to_lower(tokens[pos]))) pos++; + while (pos < tokens.size() && contains({ "work", "transaction" }, tokens[pos])) pos++; - if (pos < tokens.size() && to_lower(tokens[pos]) == "to") { + if (pos < tokens.size() && iequals(tokens[pos], "to")) { cmd.type = TxnCmd::ROLLBACK_TO; - if (++pos < tokens.size() && to_lower(tokens[pos]) == "savepoint") pos++; + if (++pos < tokens.size() && iequals(tokens[pos], "savepoint")) pos++; if (pos < tokens.size()) cmd.savepoint = tokens[pos++]; - } else if (pos < tokens.size() && to_lower(tokens[pos]) == "and") { - if (++pos < tokens.size() && to_lower(tokens[pos]) == "chain") { + } else if (pos < tokens.size() && iequals(tokens[pos], "and")) { + if (++pos < tokens.size() && iequals(tokens[pos], "chain")) { cmd.type = TxnCmd::ROLLBACK_AND_CHAIN; pos++; } @@ -403,14 +494,14 @@ TxnCmd PgSQL_TxnCmdParser::parse_savepoint(size_t& pos) noexcept { TxnCmd PgSQL_TxnCmdParser::parse_release(size_t& pos) noexcept { TxnCmd cmd{ TxnCmd::RELEASE }; - if (pos < tokens.size() && to_lower(tokens[pos]) == "savepoint") pos++; + if (pos < tokens.size() && iequals(tokens[pos], "savepoint")) pos++; if (pos < tokens.size()) cmd.savepoint = tokens[pos++]; return cmd; } TxnCmd PgSQL_TxnCmdParser::parse_start(size_t& pos) noexcept { TxnCmd cmd{ TxnCmd::UNKNOWN }; - if (pos < tokens.size() && to_lower(tokens[pos]) == "transaction") { + if (pos < tokens.size() && iequals(tokens[pos], "transaction")) { cmd.type = TxnCmd::BEGIN; pos++; } diff --git a/lib/PgSQL_Protocol.cpp b/lib/PgSQL_Protocol.cpp index c005c6c5cd..0355fed845 100644 --- a/lib/PgSQL_Protocol.cpp +++ b/lib/PgSQL_Protocol.cpp @@ -741,7 +741,7 @@ std::vector> PgSQL_Protocol::parse_options(c while (pos < input.size()) { // Skip leading spaces - while (pos < input.size() && std::isspace(input[pos])) { + while (pos < input.size() && fast_isspace(input[pos])) { ++pos; } @@ -751,7 +751,7 @@ std::vector> PgSQL_Protocol::parse_options(c pos += 2; // Skip "-c", "--" } - while (pos < input.size() && std::isspace(input[pos])) { + while (pos < input.size() && fast_isspace(input[pos])) { ++pos; } @@ -772,7 +772,7 @@ std::vector> PgSQL_Protocol::parse_options(c bool last_was_escape = false; while (pos < input.size()) { char c = input[pos]; - if (std::isspace(c) && !last_was_escape) { + if (fast_isspace(c) && !last_was_escape) { break; } if (c == '\\' && !last_was_escape) { diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index 73dfda60ee..87400028b8 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -6557,7 +6557,7 @@ std::vector PgSQL_DateStyle_Util::split_datestyle(std::string_view int* lastNonSpace = (currentToken == 1) ? &lastNonSpace1 : &lastNonSpace2; // Cache is-space check. - bool is_space = std::isspace(static_cast(c)); + bool is_space = fast_isspace(static_cast(c)); // Skip leading whitespace for a new token. if (currentStr->empty() && is_space) { continue; diff --git a/lib/PgSQL_Variables_Validator.cpp b/lib/PgSQL_Variables_Validator.cpp index a0a3752797..ed63cda107 100644 --- a/lib/PgSQL_Variables_Validator.cpp +++ b/lib/PgSQL_Variables_Validator.cpp @@ -183,7 +183,7 @@ bool pgsql_variable_validate_maintenance_work_mem(const char* value, const param if (transformed_value) *transformed_value = nullptr; // Skip leading whitespace - while (isspace((unsigned char)*p)) p++; + while (fast_isspace((unsigned char)*p)) p++; // Parse numeric part num = strtoll(p, &endptr, 10); @@ -196,11 +196,11 @@ bool pgsql_variable_validate_maintenance_work_mem(const char* value, const param p = endptr; // Skip whitespace after number - while (isspace((unsigned char)*p)) p++; + while (fast_isspace((unsigned char)*p)) p++; // Parse unit if (*p != '\0') { - char tmp_unit = tolower(*p); + char tmp_unit = ::tolower(*p); switch (tmp_unit) { case 'k': case 'm': @@ -210,7 +210,7 @@ bool pgsql_variable_validate_maintenance_work_mem(const char* value, const param unit = toupper(*p++); has_unit = true; // Check optional 'b'/'B' - if (tolower(*p) == 'b') p++; + if (::tolower(*p) == 'b') p++; break; default: return false; @@ -218,7 +218,7 @@ bool pgsql_variable_validate_maintenance_work_mem(const char* value, const param } // Skip trailing whitespace - while (isspace((unsigned char)*p)) p++; + while (fast_isspace((unsigned char)*p)) p++; // Validate entire string consumed if (*p != '\0') return false; @@ -241,7 +241,7 @@ bool pgsql_variable_validate_maintenance_work_mem_v2(const char* value, const pa const char* input = value; /* Trim leading whitespace */ - while (isspace((unsigned char)*input)) input++; + while (fast_isspace((unsigned char)*input)) input++; /* Parse numeric part */ uint64_t number; @@ -256,7 +256,7 @@ bool pgsql_variable_validate_maintenance_work_mem_v2(const char* value, const pa //num_len = endptr - input; // Skip whitespace after number - while (isspace((unsigned char)*endptr)) endptr++; + while (fast_isspace((unsigned char)*endptr)) endptr++; /* Parse unit part */ const char* unit_ptr = endptr; @@ -273,7 +273,7 @@ bool pgsql_variable_validate_maintenance_work_mem_v2(const char* value, const pa /* Convert unit to lowercase for validation */ char u[3] = { 0 }; for (int i = 0; i < 2 && unit_ptr[i]; i++) - u[i] = tolower((unsigned char)unit_ptr[i]); + u[i] = ::tolower((unsigned char)unit_ptr[i]); /* Validate unit and set multiplier */ if (unit_len == 1 && u[0] == 'b') { @@ -332,7 +332,7 @@ bool pgsql_variable_validate_maintenance_work_mem_v3(const char* value, const pa (void)session; // Trim leading whitespace - while (isspace((unsigned char)*value)) value++; + while (fast_isspace((unsigned char)*value)) value++; char* endptr; const char* num_start = value; @@ -371,7 +371,7 @@ bool pgsql_variable_validate_maintenance_work_mem_v3(const char* value, const pa // Convert unit to lowercase for validation char u[3] = { 0 }; for (int i = 0; i < 2 && unit_ptr[i]; i++) - u[i] = tolower((unsigned char)unit_ptr[i]); + u[i] = ::tolower((unsigned char)unit_ptr[i]); // Validate units and set multipliers if (unit_len == 1 && u[0] == 'b') { @@ -471,7 +471,7 @@ bool pgsql_variable_validate_search_path(const char* value, const params_t* para while (*token && result) { /* skip leading whitespace */ - while (*token && isspace((unsigned char)*token)) token++; + while (*token && fast_isspace((unsigned char)*token)) token++; if (*token == '\0') break; const char* part_start = token; @@ -508,7 +508,7 @@ bool pgsql_variable_validate_search_path(const char* value, const params_t* para } } else { // unquoted identifier or $user - while (*token && *token != ',' && !isspace(*token)) token++; + while (*token && *token != ',' && !fast_isspace(*token)) token++; part_len = (size_t)(token - part_start); if (part_len == 0 || part_len > 63) { result = false; @@ -543,7 +543,7 @@ bool pgsql_variable_validate_search_path(const char* value, const params_t* para normalized[norm_pos] = '\0'; // skip whitespace after part - while (*token && isspace(*token)) token++; + while (*token && fast_isspace(*token)) token++; // expect comma or end if (*token == ',') { diff --git a/lib/ProxySQL_Config.cpp b/lib/ProxySQL_Config.cpp index 758f60c66c..56facae970 100644 --- a/lib/ProxySQL_Config.cpp +++ b/lib/ProxySQL_Config.cpp @@ -92,7 +92,7 @@ int ProxySQL_Config::Read_Global_Variables_from_configfile(const char *prefix) { char *query=(char *)malloc(strlen(q)+strlen(prefix)+strlen(n)+strlen(value_string.c_str())); sprintf(query,q, prefix, n, value_string.c_str()); //fprintf(stderr, "%s\n", query); - admindb->execute(query); + admindb->execute(query); free(query); } admindb->execute("PRAGMA foreign_keys = ON"); From 0162e8ff3c0b57859ba127b74a20fc0775c23ed4 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Mon, 10 Nov 2025 18:43:16 +0500 Subject: [PATCH 7/7] Replaced use of the generic write_generic() helper with direct packet construction for selected PostgreSQL protocol messages to reduce overhead and improve performance. --- lib/PgSQL_Protocol.cpp | 55 ++++++++++++++++++------------------------ 1 file changed, 23 insertions(+), 32 deletions(-) diff --git a/lib/PgSQL_Protocol.cpp b/lib/PgSQL_Protocol.cpp index 0355fed845..0aed8033d1 100644 --- a/lib/PgSQL_Protocol.cpp +++ b/lib/PgSQL_Protocol.cpp @@ -1612,8 +1612,10 @@ bool PgSQL_Protocol::generate_ready_for_query_packet(bool send, char trx_state, // to avoid memory leak assert(send == true || _ptr); - PG_pkt pgpkt{}; - pgpkt.write_ReadyForQuery(trx_state); + PG_pkt pgpkt(8); + pgpkt.put_char('Z'); + pgpkt.put_uint32(5); + pgpkt.put_char(trx_state); // transaction state auto buff = pgpkt.detach(); if (send == true) { (*myds)->PSarrayOUT->add((void*)buff.first, buff.second); @@ -1696,15 +1698,13 @@ bool PgSQL_Protocol::generate_describe_completion_packet(bool send, bool ready, bool PgSQL_Protocol::generate_close_completion_packet(bool send, bool ready, char trx_state, PtrSize_t* _ptr) { // to avoid memory leak assert(send == true || _ptr); - PG_pkt pgpkt{}; + PG_pkt pgpkt(16); + pgpkt.put_char('3'); + pgpkt.put_uint32(4); if (ready == true) { - pgpkt.set_multi_pkt_mode(true); - } - // Close completion message - pgpkt.write_CloseCompletion(); - if (ready == true) { - pgpkt.write_ReadyForQuery(trx_state); - pgpkt.set_multi_pkt_mode(false); + pgpkt.put_char('Z'); + pgpkt.put_uint32(5); // size of the ReadyForQuery packet + pgpkt.put_char(trx_state); // transaction state } auto buff = pgpkt.detach(); if (send == true) { @@ -1720,15 +1720,13 @@ bool PgSQL_Protocol::generate_close_completion_packet(bool send, bool ready, cha bool PgSQL_Protocol::generate_bind_completion_packet(bool send, bool ready, char trx_state, PtrSize_t* _ptr) { // to avoid memory leak assert(send == true || _ptr); - PG_pkt pgpkt{}; + PG_pkt pgpkt(16); + pgpkt.put_char('2'); + pgpkt.put_uint32(4); if (ready == true) { - pgpkt.set_multi_pkt_mode(true); - } - // Bind completion message - pgpkt.write_BindCompletion(); - if (ready == true) { - pgpkt.write_ReadyForQuery(trx_state); - pgpkt.set_multi_pkt_mode(false); + pgpkt.put_char('Z'); + pgpkt.put_uint32(5); // size of the ReadyForQuery packet + pgpkt.put_char(trx_state); // transaction state } auto buff = pgpkt.detach(); if (send == true) { @@ -1743,7 +1741,7 @@ bool PgSQL_Protocol::generate_bind_completion_packet(bool send, bool ready, char bool PgSQL_Protocol::generate_no_data_packet(bool send, PtrSize_t* _ptr) { // to avoid memory leak assert(send == true || _ptr); - PG_pkt pgpkt(5); + PG_pkt pgpkt(8); pgpkt.put_char('n'); pgpkt.put_uint32(4); // size of the NoData packet (Fixed 4 bytes) auto buff = pgpkt.detach(); @@ -1759,21 +1757,14 @@ bool PgSQL_Protocol::generate_no_data_packet(bool send, PtrSize_t* _ptr) { bool PgSQL_Protocol::generate_parse_completion_packet(bool send, bool ready, char trx_state, PtrSize_t* _ptr) { // to avoid memory leak assert(send == true || _ptr); - - PG_pkt pgpkt{}; - - if (ready == true) { - pgpkt.set_multi_pkt_mode(true); - } - - // Parse completion message - pgpkt.write_ParseCompletion(); - + PG_pkt pgpkt(16); + pgpkt.put_char('1'); + pgpkt.put_uint32(4); if (ready == true) { - pgpkt.write_ReadyForQuery(trx_state); - pgpkt.set_multi_pkt_mode(false); + pgpkt.put_char('Z'); + pgpkt.put_uint32(5); // size of the ReadyForQuery packet + pgpkt.put_char(trx_state); // transaction state } - auto buff = pgpkt.detach(); if (send == true) { (*myds)->PSarrayOUT->add((void*)buff.first, buff.second);