Skip to content

Commit 7c665b9

Browse files
committed
Checking the data stream on both ends doesn’t apply to frontend connections, since response data is buffered during extended queries.
Fixed TAP test
1 parent 7a3a5c7 commit 7c665b9

File tree

2 files changed

+20
-29
lines changed

2 files changed

+20
-29
lines changed

lib/PgSQL_Data_Stream.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,8 @@ void PgSQL_Data_Stream::shut_hard() {
438438
}
439439

440440
void PgSQL_Data_Stream::check_data_flow() {
441-
if ((PSarrayIN->len || queue_data(queueIN)) && (PSarrayOUT->len || queue_data(queueOUT))) {
441+
// This does not apply to frontend data streams because response data is buffered during extended queries.
442+
if ((myds_type != MYDS_FRONTEND) && (PSarrayIN->len || queue_data(queueIN)) && (PSarrayOUT->len || queue_data(queueOUT))) {
442443
// there is data at both sides of the data stream: this is considered a fatal error
443444
proxy_error("Session=%p, DataStream=%p -- Data at both ends of a PgSQL data stream: IN <%d bytes %d packets> , OUT <%d bytes %d packets>\n", sess, this, PSarrayIN->len, queue_data(queueIN), PSarrayOUT->len, queue_data(queueOUT));
444445
shut_soft();

test/tap/tests/pgsql-extended_query_protocol_query_rules_test-t.cpp

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -152,65 +152,55 @@ void consume_results(PGconn* conn) {
152152
PGresult* res = nullptr;
153153
bool saw_error = false;
154154
std::string errmsg;
155+
156+
// Keep looping until PQgetResult() returns NULL and
157+
// connection is not busy anymore.
155158
for (;;) {
156-
// Drain all immediately available results
157159
while ((res = PQgetResult(conn)) != nullptr) {
158160
ExecStatusType status = PQresultStatus(res);
159161
if (status == PGRES_FATAL_ERROR) {
160162
saw_error = true;
161163
errmsg = PQresultErrorMessage(res);
162164
}
163-
if (status == PGRES_PIPELINE_ABORTED) {
164-
// If pipeline was aborted, we need to clear the error
165+
else if (status == PGRES_PIPELINE_ABORTED) {
165166
saw_error = true;
166-
errmsg = std::string("Pipeline aborted : ") + PQresultErrorMessage(res);
167+
errmsg = std::string("Pipeline aborted: ") + PQresultErrorMessage(res);
167168
}
168169
PQclear(res);
169170
}
170171

171172
if (!PQisBusy(conn)) {
172-
while ((res = PQgetResult(conn)) != nullptr) {
173+
// Ensure all results are drained
174+
while ((res = PQgetResult(conn)) != nullptr)
173175
PQclear(res);
174-
}
175-
break; // ReadyForQuery reached
176+
break;
176177
}
177178

178-
// ---- handle flushing + reading ----
179-
int f = PQflush(conn);
180-
if (f == -1) {
179+
if (PQflush(conn) == -1) {
181180
throw std::runtime_error(std::string("PQflush failed: ") + PQerrorMessage(conn));
182181
}
183182

184-
short events = POLLIN;
185-
if (f == 1) {
186-
// still data to send also watch POLLOUT
187-
events |= POLLOUT;
188-
}
189-
190183
struct pollfd pfd;
191184
pfd.fd = PQsocket(conn);
192-
pfd.events = events;
193-
if (pfd.fd < 0) {
185+
pfd.events = POLLIN | POLLOUT;
186+
if (pfd.fd < 0)
194187
throw std::runtime_error("Invalid PostgreSQL socket");
195-
}
196188

197-
if (poll(&pfd, 1, -1) < 0) {
189+
if (poll(&pfd, 1, -1) < 0)
198190
throw std::runtime_error("poll() failed");
199-
}
200191

201-
// If socket readable consume input
202-
if (pfd.revents & POLLIN) {
203-
if (PQconsumeInput(conn) == 0) {
204-
throw std::runtime_error(
205-
std::string("PQconsumeInput failed: ") + PQerrorMessage(conn));
206-
}
192+
if ((pfd.revents & POLLIN) && PQconsumeInput(conn) == 0) {
193+
throw std::runtime_error(std::string("PQconsumeInput failed: ") + PQerrorMessage(conn));
207194
}
208-
// If socket writable and f==1 PQflush() will be retried on next iteration
209195
}
210196

211197
if (saw_error) {
212198
throw std::runtime_error("PostgreSQL error: " + errmsg);
213199
}
200+
201+
// call PQgetResult() one final time to clear any leftover
202+
while ((res = PQgetResult(conn)) != nullptr)
203+
PQclear(res);
214204
}
215205

216206
void test_query_processor(PGconn* admin_conn) {

0 commit comments

Comments
 (0)