Skip to content

Commit 1bcd090

Browse files
committed
Refactor Connection Reset Handling
* Introduced startup parameters in PgSQL_Connection (removed default session parameters from PgSQL_Session) * Startup parameters are populated during both frontend and backend connection creation * Parameters provided via connection options are set as startup parameters * Backend connection parameter handling updated: only critical variables are now set via connection options to prevent interference with DISCARD ALL during connection reset; remaining parameters will be applied using individual SET commands
1 parent 431fda0 commit 1bcd090

File tree

7 files changed

+168
-125
lines changed

7 files changed

+168
-125
lines changed

include/PgSQL_Connection.h

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ class PgSQL_Connection_userinfo {
244244

245245
class PgSQL_Connection {
246246
public:
247-
PgSQL_Connection();
247+
PgSQL_Connection(bool is_client_conn);
248248
~PgSQL_Connection();
249249

250250
PG_ASYNC_ST handler(short event);
@@ -312,7 +312,7 @@ class PgSQL_Connection {
312312
if (error_info.severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_FATAL ||
313313
error_info.severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR ||
314314
error_info.severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_PANIC) {
315-
return true;
315+
return true;
316316
}
317317
return false;
318318
}
@@ -442,6 +442,42 @@ class PgSQL_Connection {
442442

443443
bool IsKeepMultiplexEnabledVariables(char* query_digest_text);
444444

445+
/**
446+
* @brief Retrieves startup parameter and it's hash
447+
*
448+
* This function tries to retrieve value and hash of startup paramters if present (provided in connection parameters).
449+
* If value is not found, it falls back to the thread-specific default variables.
450+
*
451+
* @param idx The index of startup parameter to retrieve.
452+
* @return The value and hash of startup parameter.
453+
*
454+
*/
455+
std::pair<const char*, uint32_t> get_startup_parameter_and_hash(enum pgsql_variable_name idx);
456+
457+
/**
458+
* @brief Copies tracked PgSQL session variables to startup parameters
459+
*
460+
* This function synchronizes the current tracked session variables (in `variables` and `var_hash`)
461+
* to the startup parameters arrays (`startup_parameters` and `startup_parameters_hash`). If `copy_only_critical_param`
462+
* is true, only the critical parameters (indices 0 to PGSQL_NAME_LAST_LOW_WM-1) are copied.
463+
* Otherwise, all tracked variables up to PGSQL_NAME_LAST_HIGH_WM are copied.
464+
*
465+
* @param copy_only_critical_param If true, only critical parameters are copied; otherwise, all tracked variables.
466+
*/
467+
void copy_pgsql_variables_to_startup_parameters(bool copy_only_critical_param);
468+
469+
/**
470+
* @brief Copies startup parameters to tracked PgSQL session variables.
471+
*
472+
* This function synchronizes the startup parameters arrays (`startup_parameters` and `startup_parameters_hash`)
473+
* to the tracked session variables (`variables` and `var_hash`). If `copy_only_critical_param` is true,
474+
* only the critical parameters (indices 0 to PGSQL_NAME_LAST_LOW_WM-1) are copied. Otherwise, all tracked
475+
* variables up to PGSQL_NAME_LAST_HIGH_WM are copied.
476+
*
477+
* @param copy_only_critical_param If true, only critical parameters are copied; otherwise, all tracked variables.
478+
*/
479+
void copy_startup_parameters_to_pgsql_variables(bool copy_only_critical_param);
480+
445481
struct {
446482
unsigned long length;
447483
char* ptr;
@@ -471,6 +507,9 @@ class PgSQL_Connection {
471507
bool var_absent[PGSQL_NAME_LAST_HIGH_WM] = { false };
472508
std::vector<uint32_t> dynamic_variables_idx;
473509

510+
uint32_t startup_parameters_hash[PGSQL_NAME_LAST_HIGH_WM] = {};
511+
char* startup_parameters[PGSQL_NAME_LAST_HIGH_WM] = {};
512+
474513
/**
475514
* @brief Keeps tracks of the 'server_status'. Do not confuse with the 'server_status' from the
476515
* 'MYSQL' connection itself. This flag keeps track of the configured server status from the
@@ -499,7 +538,7 @@ class PgSQL_Connection {
499538
bool reusable;
500539
bool processing_multi_statement;
501540
bool multiplex_delayed;
502-
541+
bool is_client_connection; // true if this is a client connection, false if it is a server connection
503542

504543
PgSQL_SrvC *parent;
505544
PgSQL_Connection_userinfo* userinfo;

include/PgSQL_Session.h

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,6 @@ class PgSQL_Session : public Base_Session<PgSQL_Session, PgSQL_Data_Stream, PgSQ
369369
PtrSize_t pkt;
370370
std::string untracked_option_parameters;
371371
PgSQL_DateStyle_t current_datestyle = {};
372-
char* default_session_variables[PGSQL_NAME_LAST_HIGH_WM] = {};
373372

374373
#if 0
375374
// uint64_t
@@ -526,22 +525,6 @@ class PgSQL_Session : public Base_Session<PgSQL_Session, PgSQL_Data_Stream, PgSQ
526525
void detected_broken_connection(const char* file, unsigned int line, const char* func, const char* action, PgSQL_Connection* myconn, bool verbose = false);
527526
void generate_status_one_hostgroup(int hid, std::string& s);
528527
void set_previous_status_mode3(bool allow_execute = true);
529-
530-
void set_default_session_variable(enum pgsql_variable_name idx, const char* value);
531-
532-
/**
533-
* @brief Retrieves default session variable
534-
*
535-
* This function tries to retrieve value of default session variable if present (provided in connection parameters).
536-
* If value is not found, it falls back to the thread-specific default variables.
537-
*
538-
* @param idx The index of the session variable to retrieve.
539-
* @return The value of the session variable
540-
*
541-
*/
542-
const char* get_default_session_variable(enum pgsql_variable_name idx);
543-
544-
void reset_default_session_variable(enum pgsql_variable_name idx);
545528
};
546529

547530
#define PgSQL_KILL_QUERY 1

lib/Base_Thread.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ S Base_Thread::create_new_session_and_client_data_stream(int _fd) {
132132
sess->client_myds->myprot.dump_pkt = true;
133133
#endif
134134
if constexpr (std::is_same_v<T, PgSQL_Thread>) {
135-
PgSQL_Connection* myconn = new PgSQL_Connection();
135+
PgSQL_Connection* myconn = new PgSQL_Connection(true);
136136
sess->client_myds->attach_connection(myconn);
137137
} else if constexpr (std::is_same_v<T, MySQL_Thread>) {
138138
MySQL_Connection* myconn = new MySQL_Connection();

lib/PgSQL_Connection.cpp

Lines changed: 104 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,9 @@ void print_backtrace(void);
147147

148148
#define NEXT_IMMEDIATE(new_st) do { async_state_machine = new_st; goto handler_again; } while (0)
149149

150-
PgSQL_Connection::PgSQL_Connection() {
150+
PgSQL_Connection::PgSQL_Connection(bool is_client_conn) {
151151
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "Creating new PgSQL_Connection %p\n", this);
152+
is_client_connection = is_client_conn;
152153
pgsql_conn = NULL;
153154
result_type = 0;
154155
pgsql_result = NULL;
@@ -215,13 +216,6 @@ PgSQL_Connection::~PgSQL_Connection() {
215216
delete query_result_reuse;
216217
query_result_reuse = NULL;
217218
}
218-
for (int i = 0; i < PGSQL_NAME_LAST_HIGH_WM; i++) {
219-
if (variables[i].value) {
220-
free(variables[i].value);
221-
variables[i].value = NULL;
222-
var_hash[i] = 0;
223-
}
224-
}
225219

226220
if (connected_host_details.hostname) {
227221
free(connected_host_details.hostname);
@@ -233,6 +227,21 @@ PgSQL_Connection::~PgSQL_Connection() {
233227
}
234228

235229
if (options.init_connect) free(options.init_connect);
230+
231+
for (int i = 0; i < PGSQL_NAME_LAST_HIGH_WM; ++i) {
232+
if (variables[i].value) {
233+
free(variables[i].value);
234+
variables[i].value = NULL;
235+
var_hash[i] = 0;
236+
}
237+
}
238+
239+
for (int i = 0; i < PGSQL_NAME_LAST_HIGH_WM; ++i) {
240+
if (startup_parameters[i]) {
241+
free(startup_parameters[i]);
242+
startup_parameters_hash[i] = 0;
243+
}
244+
}
236245
}
237246

238247
void PgSQL_Connection::next_event(PG_ASYNC_ST new_st) {
@@ -756,44 +765,28 @@ void PgSQL_Connection::connect_start() {
756765
// charset validation is already done
757766
pgsql_variables.server_set_hash_and_value(myds->sess, PGSQL_CLIENT_ENCODING, client_charset, client_charset_hash);
758767

759-
std::vector<unsigned int> client_options;
760-
client_options.reserve(PGSQL_NAME_LAST_LOW_WM + myds->sess->client_myds->myconn->dynamic_variables_idx.size());
768+
// optimized way to set client parameters on backend connection when creating a new connection
769+
conninfo << "options='";
770+
// excluding client_encoding, which is already set above
771+
for (int idx = 1; idx < PGSQL_NAME_LAST_LOW_WM; idx++) {
772+
const char* value = pgsql_variables.client_get_value(myds->sess, idx);
773+
const char* escaped_str = escape_string_backslash_spaces(value);
774+
conninfo << "-c " << pgsql_tracked_variables[idx].set_variable_name << "=" << escaped_str << " ";
775+
if (escaped_str != value)
776+
free((char*)escaped_str);
761777

762-
// excluding PGSQL_CLIENT_ENCODING
763-
for (unsigned int idx = 1; idx < PGSQL_NAME_LAST_LOW_WM; idx++) {
764-
if (pgsql_variables.client_get_hash(myds->sess, idx) == 0) continue;
765-
client_options.push_back(idx);
778+
const uint32_t hash = pgsql_variables.client_get_hash(myds->sess, idx);
779+
pgsql_variables.server_set_hash_and_value(myds->sess, idx, value, hash);
766780
}
767781

768-
for (uint32_t idx : myds->sess->client_myds->myconn->dynamic_variables_idx) {
769-
assert(pgsql_variables.client_get_hash(myds->sess, idx));
770-
client_options.push_back(idx);
771-
}
782+
myds->sess->mybe->server_myds->myconn->copy_pgsql_variables_to_startup_parameters(true);
772783

773-
if (client_options.empty() == false ||
774-
myds->sess->untracked_option_parameters.empty() == false) {
775-
776-
// optimized way to set client parameters on backend connection when creating a new connection
777-
conninfo << "options='";
778-
for (int idx : client_options) {
779-
const char* value = pgsql_variables.client_get_value(myds->sess, idx);
780-
const char* escaped_str = escape_string_backslash_spaces(value);
781-
conninfo << "-c " << pgsql_tracked_variables[idx].set_variable_name << "=" << escaped_str << " ";
782-
if (escaped_str != value)
783-
free((char*)escaped_str);
784-
785-
const uint32_t hash = pgsql_variables.client_get_hash(myds->sess, idx);
786-
pgsql_variables.server_set_hash_and_value(myds->sess, idx, value, hash);
787-
}
788-
789-
myds->sess->mybe->server_myds->myconn->reorder_dynamic_variables_idx();
790-
791-
// if there are untracked parameters, the session should lock on the host group
792-
if (myds->sess->untracked_option_parameters.empty() == false) {
793-
conninfo << myds->sess->untracked_option_parameters;
794-
}
795-
conninfo << "'";
784+
// if there are untracked parameters, the session should lock on the host group
785+
if (myds->sess->untracked_option_parameters.empty() == false) {
786+
conninfo << myds->sess->untracked_option_parameters;
796787
}
788+
conninfo << "'";
789+
797790
}
798791

799792
/*conninfo << "postgres://";
@@ -1881,16 +1874,19 @@ void PgSQL_Connection::reset() {
18811874
reusable = true;
18821875
creation_time = monotonic_time();
18831876

1884-
for (int i = 0; i < PGSQL_NAME_LAST_HIGH_WM; i++) {
1877+
for (int i = (is_client_connection ? 0 : PGSQL_NAME_LAST_LOW_WM + 1);
1878+
i < PGSQL_NAME_LAST_HIGH_WM;
1879+
i++) {
18851880
var_hash[i] = 0;
18861881
if (variables[i].value) {
18871882
free(variables[i].value);
18881883
variables[i].value = NULL;
1889-
var_hash[i] = 0;
18901884
}
18911885
}
18921886
dynamic_variables_idx.clear();
18931887

1888+
if (!is_client_connection) copy_startup_parameters_to_pgsql_variables(/*copy_only_critical_param=*/true);
1889+
18941890
if (options.init_connect) {
18951891
free(options.init_connect);
18961892
options.init_connect = NULL;
@@ -2202,3 +2198,68 @@ void PgSQL_Connection::set_error_from_PQerrorMessage() {
22022198
const std::string_view& full_msg = !primary_msg.empty() ? primary_msg : lib_errmsg;
22032199
PgSQL_Error_Helper::fill_error_info(error_info, sqlstate.data(), full_msg.data(), severity.data());
22042200
}
2201+
2202+
std::pair<const char*, uint32_t> PgSQL_Connection::get_startup_parameter_and_hash(enum pgsql_variable_name idx) {
2203+
// within valid range?
2204+
assert(idx >= 0 && idx < PGSQL_NAME_LAST_HIGH_WM);
2205+
2206+
// Attempt to retrieve value from default startup parameters
2207+
if (startup_parameters_hash[idx] != 0) {
2208+
assert(startup_parameters[idx]);
2209+
return { startup_parameters[idx], startup_parameters_hash[idx] };
2210+
}
2211+
// fall back to thread-specific default
2212+
return { pgsql_thread___default_variables[idx], 0 };
2213+
}
2214+
2215+
void PgSQL_Connection::copy_pgsql_variables_to_startup_parameters(bool copy_only_critical_param) {
2216+
2217+
//memcpy(startup_parameters_hash, var_hash, sizeof(uint32_t) * PGSQL_NAME_LAST_LOW_WM);
2218+
for (int i = 0; i < PGSQL_NAME_LAST_LOW_WM; ++i) {
2219+
assert(var_hash[i]);
2220+
assert(variables[i].value);
2221+
startup_parameters_hash[i] = var_hash[i];
2222+
free(startup_parameters[i]);
2223+
startup_parameters[i] = strdup(variables[i].value);
2224+
}
2225+
2226+
if (copy_only_critical_param) return;
2227+
2228+
for (int i = PGSQL_NAME_LAST_LOW_WM + 1; i < PGSQL_NAME_LAST_HIGH_WM; i++) {
2229+
if (var_hash[i] != 0) {
2230+
startup_parameters_hash[i] = var_hash[i];
2231+
free(startup_parameters[i]);
2232+
startup_parameters[i] = strdup(variables[i].value);
2233+
} else {
2234+
startup_parameters_hash[i] = 0;
2235+
free(startup_parameters[i]);
2236+
startup_parameters[i] = nullptr;
2237+
}
2238+
}
2239+
}
2240+
2241+
void PgSQL_Connection::copy_startup_parameters_to_pgsql_variables(bool copy_only_critical_param) {
2242+
2243+
//memcpy(var_hash, startup_parameters_hash, sizeof(uint32_t) * PGSQL_NAME_LAST_LOW_WM);
2244+
for (int i = 0; i < PGSQL_NAME_LAST_LOW_WM; i++) {
2245+
assert(startup_parameters_hash[i]);
2246+
assert(startup_parameters[i]);
2247+
var_hash[i] = startup_parameters_hash[i];
2248+
free(variables[i].value);
2249+
variables[i].value = strdup(startup_parameters[i]);
2250+
}
2251+
2252+
if (copy_only_critical_param) return;
2253+
2254+
for (int i = PGSQL_NAME_LAST_LOW_WM + 1; i < PGSQL_NAME_LAST_HIGH_WM; i++) {
2255+
if (startup_parameters_hash[i]) {
2256+
var_hash[i] = startup_parameters_hash[i];
2257+
free(variables[i].value);
2258+
variables[i].value = strdup(startup_parameters[i]);
2259+
} else {
2260+
var_hash[i] = 0;
2261+
free(variables[i].value);
2262+
variables[i].value = nullptr;
2263+
}
2264+
}
2265+
}

lib/PgSQL_HostGroups_Manager.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2298,7 +2298,7 @@ PgSQL_Connection * PgSQL_SrvConnList::get_random_MyConn(PgSQL_Session *sess, boo
22982298
}
22992299

23002300
// we must create a new connection
2301-
conn = new PgSQL_Connection();
2301+
conn = new PgSQL_Connection(false);
23022302
conn->parent=mysrvc;
23032303
// if attributes.multiplex == true , STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG is set to false. And vice-versa
23042304
conn->set_status(!conn->parent->myhgc->attributes.multiplex, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG);
@@ -2312,7 +2312,7 @@ PgSQL_Connection * PgSQL_SrvConnList::get_random_MyConn(PgSQL_Session *sess, boo
23122312
unsigned int conns_free = mysrvc->ConnectionsFree->conns_length();
23132313
unsigned int conns_used = mysrvc->ConnectionsUsed->conns_length();
23142314
if ((conns_used > conns_free) && (mysrvc->max_connections > (conns_free/2 + conns_used/2)) ) {
2315-
conn = new PgSQL_Connection();
2315+
conn = new PgSQL_Connection(false);
23162316
conn->parent=mysrvc;
23172317
// if attributes.multiplex == true , STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG is set to false. And vice-versa
23182318
conn->set_status(!conn->parent->myhgc->attributes.multiplex, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG);
@@ -2357,7 +2357,7 @@ PgSQL_Connection * PgSQL_SrvConnList::get_random_MyConn(PgSQL_Session *sess, boo
23572357
__sync_fetch_and_add(&PgHGM->status.server_connections_delayed, 1);
23582358
return NULL;
23592359
} else {
2360-
conn = new PgSQL_Connection();
2360+
conn = new PgSQL_Connection(false);
23612361
conn->parent=mysrvc;
23622362
// if attributes.multiplex == true , STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG is set to false. And vice-versa
23632363
conn->set_status(!conn->parent->myhgc->attributes.multiplex, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG);

lib/PgSQL_Protocol.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,11 +1211,9 @@ EXECUTION_STATE PgSQL_Protocol::process_handshake_response_packet(unsigned char*
12111211
if (pgsql_variables.client_set_value(sess, PGSQL_DATESTYLE, datestyle.c_str(), false)) {
12121212
// change current datestyle
12131213
sess->current_datestyle = PgSQL_DateStyle_Util::parse_datestyle(datestyle);
1214-
sess->set_default_session_variable(PGSQL_DATESTYLE, datestyle.c_str());
12151214
}
12161215
} else {
12171216
pgsql_variables.client_set_value(sess, idx, value_copy.c_str(), false);
1218-
sess->set_default_session_variable((enum pgsql_variable_name)idx, value_copy.c_str());
12191217
}
12201218
} else {
12211219
// parameter provided is not part of the tracked variables. Will lock on hostgroup on next query.
@@ -1234,10 +1232,10 @@ EXECUTION_STATE PgSQL_Protocol::process_handshake_response_packet(unsigned char*
12341232
continue;
12351233
const char* val = pgsql_thread___default_variables[i];
12361234
pgsql_variables.client_set_value(sess, i, val, false);
1237-
sess->set_default_session_variable((pgsql_variable_name)i, val);
12381235
}
12391236

12401237
sess->client_myds->myconn->reorder_dynamic_variables_idx();
1238+
sess->client_myds->myconn->copy_pgsql_variables_to_startup_parameters(false);
12411239
}
12421240
else {
12431241
// we always duplicate username and password, or crashes happen

0 commit comments

Comments
 (0)