Skip to content

Commit e56e0de

Browse files
authored
fix: revert fix(sqllab): Force trino client async execution (#24859) (#25541)
1 parent ef1807c commit e56e0de

File tree

5 files changed

+18
-114
lines changed

5 files changed

+18
-114
lines changed

superset/db_engine_specs/base.py

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,24 +1066,6 @@ def handle_cursor(cls, cursor: Any, query: Query, session: Session) -> None:
10661066
query object"""
10671067
# TODO: Fix circular import error caused by importing sql_lab.Query
10681068

1069-
@classmethod
1070-
def execute_with_cursor(
1071-
cls, cursor: Any, sql: str, query: Query, session: Session
1072-
) -> None:
1073-
"""
1074-
Trigger execution of a query and handle the resulting cursor.
1075-
1076-
For most implementations this just makes calls to `execute` and
1077-
`handle_cursor` consecutively, but in some engines (e.g. Trino) we may
1078-
need to handle client limitations such as lack of async support and
1079-
perform a more complicated operation to get information from the cursor
1080-
in a timely manner and facilitate operations such as query stop
1081-
"""
1082-
logger.debug("Query %d: Running query: %s", query.id, sql)
1083-
cls.execute(cursor, sql, async_=True)
1084-
logger.debug("Query %d: Handling cursor", query.id)
1085-
cls.handle_cursor(cursor, query, session)
1086-
10871069
@classmethod
10881070
def extract_error_message(cls, ex: Exception) -> str:
10891071
return f"{cls.engine} error: {cls._extract_error_message(ex)}"

superset/db_engine_specs/trino.py

Lines changed: 6 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
import contextlib
2020
import logging
21-
import threading
22-
import time
2321
from typing import Any, TYPE_CHECKING
2422

2523
import simplejson as json
@@ -153,22 +151,15 @@ def get_tracking_url(cls, cursor: Cursor) -> str | None:
153151

154152
@classmethod
155153
def handle_cursor(cls, cursor: Cursor, query: Query, session: Session) -> None:
156-
"""
157-
Handle a trino client cursor.
158-
159-
WARNING: if you execute a query, it will block until complete and you
160-
will not be able to handle the cursor until complete. Use
161-
`execute_with_cursor` instead, to handle this asynchronously.
162-
"""
163-
164-
# Adds the executed query id to the extra payload so the query can be cancelled
165-
cancel_query_id = cursor.query_id
166-
logger.debug("Query %d: queryId %s found in cursor", query.id, cancel_query_id)
167-
query.set_extra_json_key(key=QUERY_CANCEL_KEY, value=cancel_query_id)
168-
169154
if tracking_url := cls.get_tracking_url(cursor):
170155
query.tracking_url = tracking_url
171156

157+
# Adds the executed query id to the extra payload so the query can be cancelled
158+
query.set_extra_json_key(
159+
key=QUERY_CANCEL_KEY,
160+
value=(cancel_query_id := cursor.stats["queryId"]),
161+
)
162+
172163
session.commit()
173164

174165
# if query cancelation was requested prior to the handle_cursor call, but
@@ -182,51 +173,6 @@ def handle_cursor(cls, cursor: Cursor, query: Query, session: Session) -> None:
182173

183174
super().handle_cursor(cursor=cursor, query=query, session=session)
184175

185-
@classmethod
186-
def execute_with_cursor(
187-
cls, cursor: Any, sql: str, query: Query, session: Session
188-
) -> None:
189-
"""
190-
Trigger execution of a query and handle the resulting cursor.
191-
192-
Trino's client blocks until the query is complete, so we need to run it
193-
in another thread and invoke `handle_cursor` to poll for the query ID
194-
to appear on the cursor in parallel.
195-
"""
196-
execute_result: dict[str, Any] = {}
197-
198-
def _execute(results: dict[str, Any]) -> None:
199-
logger.debug("Query %d: Running query: %s", query.id, sql)
200-
201-
# Pass result / exception information back to the parent thread
202-
try:
203-
cls.execute(cursor, sql)
204-
results["complete"] = True
205-
except Exception as ex: # pylint: disable=broad-except
206-
results["complete"] = True
207-
results["error"] = ex
208-
209-
execute_thread = threading.Thread(target=_execute, args=(execute_result,))
210-
execute_thread.start()
211-
212-
# Wait for a query ID to be available before handling the cursor, as
213-
# it's required by that method; it may never become available on error.
214-
while not cursor.query_id and not execute_result.get("complete"):
215-
time.sleep(0.1)
216-
217-
logger.debug("Query %d: Handling cursor", query.id)
218-
cls.handle_cursor(cursor, query, session)
219-
220-
# Block until the query completes; same behaviour as the client itself
221-
logger.debug("Query %d: Waiting for query to complete", query.id)
222-
while not execute_result.get("complete"):
223-
time.sleep(0.5)
224-
225-
# Unfortunately we'll mangle the stack trace due to the thread, but
226-
# throwing the original exception allows mapping database errors as normal
227-
if err := execute_result.get("error"):
228-
raise err
229-
230176
@classmethod
231177
def prepare_cancel_query(cls, query: Query, session: Session) -> None:
232178
if QUERY_CANCEL_KEY not in query.extra:

superset/sql_lab.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ def get_sql_results( # pylint: disable=too-many-arguments
191191
return handle_query_error(ex, query, session)
192192

193193

194-
def execute_sql_statement( # pylint: disable=too-many-arguments
194+
def execute_sql_statement( # pylint: disable=too-many-arguments,too-many-statements
195195
sql_statement: str,
196196
query: Query,
197197
session: Session,
@@ -271,7 +271,10 @@ def execute_sql_statement( # pylint: disable=too-many-arguments
271271
)
272272
session.commit()
273273
with stats_timing("sqllab.query.time_executing_query", stats_logger):
274-
db_engine_spec.execute_with_cursor(cursor, sql, query, session)
274+
logger.debug("Query %d: Running query: %s", query.id, sql)
275+
db_engine_spec.execute(cursor, sql, async_=True)
276+
logger.debug("Query %d: Handling cursor", query.id)
277+
db_engine_spec.handle_cursor(cursor, query, session)
275278

276279
with stats_timing("sqllab.query.time_fetching_results", stats_logger):
277280
logger.debug(

tests/unit_tests/db_engine_specs/test_trino.py

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ def test_handle_cursor_early_cancel(
352352
query_id = "myQueryId"
353353

354354
cursor_mock = engine_mock.return_value.__enter__.return_value
355-
cursor_mock.query_id = query_id
355+
cursor_mock.stats = {"queryId": query_id}
356356
session_mock = mocker.MagicMock()
357357

358358
query = Query()
@@ -366,32 +366,3 @@ def test_handle_cursor_early_cancel(
366366
assert cancel_query_mock.call_args[1]["cancel_query_id"] == query_id
367367
else:
368368
assert cancel_query_mock.call_args is None
369-
370-
371-
def test_execute_with_cursor_in_parallel(mocker: MockerFixture):
372-
"""Test that `execute_with_cursor` fetches query ID from the cursor"""
373-
from superset.db_engine_specs.trino import TrinoEngineSpec
374-
375-
query_id = "myQueryId"
376-
377-
mock_cursor = mocker.MagicMock()
378-
mock_cursor.query_id = None
379-
380-
mock_query = mocker.MagicMock()
381-
mock_session = mocker.MagicMock()
382-
383-
def _mock_execute(*args, **kwargs):
384-
mock_cursor.query_id = query_id
385-
386-
mock_cursor.execute.side_effect = _mock_execute
387-
388-
TrinoEngineSpec.execute_with_cursor(
389-
cursor=mock_cursor,
390-
sql="SELECT 1 FROM foo",
391-
query=mock_query,
392-
session=mock_session,
393-
)
394-
395-
mock_query.set_extra_json_key.assert_called_once_with(
396-
key=QUERY_CANCEL_KEY, value=query_id
397-
)

tests/unit_tests/sql_lab_test.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ def test_execute_sql_statement(mocker: MockerFixture, app: None) -> None:
5555
)
5656

5757
database.apply_limit_to_sql.assert_called_with("SELECT 42 AS answer", 2, force=True)
58-
db_engine_spec.execute_with_cursor.assert_called_with(
59-
cursor, "SELECT 42 AS answer LIMIT 2", query, session
58+
db_engine_spec.execute.assert_called_with(
59+
cursor, "SELECT 42 AS answer LIMIT 2", async_=True
6060
)
6161
SupersetResultSet.assert_called_with([(42,)], cursor.description, db_engine_spec)
6262

@@ -106,8 +106,10 @@ def test_execute_sql_statement_with_rls(
106106
101,
107107
force=True,
108108
)
109-
db_engine_spec.execute_with_cursor.assert_called_with(
110-
cursor, "SELECT * FROM sales WHERE organization_id=42 LIMIT 101", query, session
109+
db_engine_spec.execute.assert_called_with(
110+
cursor,
111+
"SELECT * FROM sales WHERE organization_id=42 LIMIT 101",
112+
async_=True,
111113
)
112114
SupersetResultSet.assert_called_with([(42,)], cursor.description, db_engine_spec)
113115

0 commit comments

Comments
 (0)