-
Notifications
You must be signed in to change notification settings - Fork 197
Description
Expected behavior
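The Trino client should include the connection's HTTP headers (the `http_headers` passed to `connect`) when sending spooling requests. The relevant code in `trino.client` is: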
def acknowledge(self) -> None:
    """Acknowledge this spooled segment in the background.

    The request to ``self.ack_uri`` is submitted to ``executor`` and never
    raises to the caller. A non-OK response or any exception is logged with
    ``logger.error``.

    Unlike segment downloads, the ack request also carries the connection's
    own HTTP headers (``include_client_headers=True``). Headers supplied via
    ``connect(http_headers=...)``, such as auth headers required by a proxy in
    front of Trino, are therefore sent to the ack endpoint too.
    """
    def acknowledge_request():
        try:
            http_response = self._send_spooling_request(
                self.ack_uri, timeout=2, include_client_headers=True
            )
            if not http_response.ok:
                self._request.raise_response_error(http_response)
        except Exception as e:
            # Deliberately best-effort: a failed ack must not break result iteration.
            logger.error("Failed to acknowledge spooling request for segment %s: %s", self, e)

    # Start the acknowledgment in the executor thread
    executor.submit(acknowledge_request)


def _send_spooling_request(
    self, uri: str, *, include_client_headers: bool = False, **kwargs
) -> requests.Response:
    """GET *uri* with the segment's headers and return the response.

    Args:
        uri: Segment data URI or ack URI.
        include_client_headers: When true, the connection's HTTP headers are
            sent as well, with segment headers taking precedence on name
            clashes. Off by default for downloads, because ``uri`` may point
            outside the Trino endpoint (e.g. object storage), where forwarding
            headers such as ``Authorization`` is undesirable.
            NOTE(review): confirm against the spooling retrieval mode in use.
        **kwargs: Forwarded unchanged to ``self._request._get``.

    Raises:
        ValueError: If a segment header carries more than one value.
    """
    # Case-insensitive so that e.g. "authorization" and "Authorization" from
    # the two sources collapse into one header instead of being sent twice.
    headers = requests.structures.CaseInsensitiveDict()
    if include_client_headers:
        # NOTE(review): assumes TrinoRequest exposes its per-request headers
        # (including connect(http_headers=...)) as ``http_headers``; confirm.
        headers.update(self._request.http_headers)
    for key, values in self.headers.items():
        if not values:
            # Nothing to send; previously this crashed with IndexError.
            continue
        if len(values) > 1:
            raise ValueError(f"Header '{key}' contains multiple values: {values}")
        # Segment headers are issued by the server for this specific URI,
        # so they override client headers with the same name.
        headers[key] = values[0]
    return self._request._get(uri, headers=headers, **kwargs)
Actual behavior
When sending spooling requests, the client does not include the HTTP headers that were passed to the connection through the `http_headers` argument, for example:
# Connection setup used when the problem occurs (host redacted).
# The headers in ``http_headers`` are the ones missing from spooling requests.
connection = connect(
    host="https://<<host_name>>",
    port=443,
    user=user,
    source="",
    client_tags=[""],
    legacy_primitive_types=True,
    http_headers=http_headers,
    request_timeout=120,
    encoding=["json+zstd", "json+lz4", "json"],
)
Steps To Reproduce
Sample code:
from typing import Dict, Tuple, Any
import time
import logging
import pprint
import requests
from trino.dbapi import connect # type: ignore
from trino.auth import Authentication
from boto3 import Session
from botocore.awsrequest import AWSRequest
from botocore.compat import HTTPHeaders
from botocore.hooks import HierarchicalEmitter
from botocore.model import ServiceId
from botocore.signers import RequestSigner
from botocore.auth import SigV4Auth
# Logging
# Configure DEBUG output only when run as a script, so importing this
# module does not reconfigure the root logger of the importing program.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
# Debug helper
def debug_connection_object(obj):
    """Print the type of *obj*, then each public attribute as ``name: value``.

    Names starting with an underscore are skipped. If reading an attribute
    raises, its error text is printed in place of the value so the rest of
    the dump still appears.
    """
    print(f"\nObject type: {type(obj)}")
    print("============== ATTRIBUTE VALUES ==============")
    public_names = (name for name in dir(obj) if not name.startswith("_"))
    for name in public_names:
        try:
            shown = getattr(obj, name)
        except Exception as exc:  # one bad attribute must not abort the dump
            shown = f"<error reading attribute: {exc}>"
        print(f"{name}: {shown}")
# Get AWS SigV4 headers for Bouncer
def get_bouncer_auth_headers() -> Dict[str, str]:
    """Return SigV4 auth headers taken from a signed STS ``GetCallerIdentity`` request.

    Credentials come from a default boto3 ``Session``. Only ``Authorization``,
    ``X-Amz-Security-Token`` and ``X-Amz-Date`` are kept from the signed
    request. ``Cache-Control: no-cache`` and ``Accept: application/json`` are
    added on top.
    """
    wanted = ("Authorization", "X-Amz-Security-Token", "X-Amz-Date")

    creds = Session().get_credentials()
    sts_signer = RequestSigner(
        ServiceId("STS"), 'us-east-1', "sts", "v4", creds, HierarchicalEmitter()
    )

    body_headers = HTTPHeaders()
    body_headers.add_header("Content-Type", "application/x-www-form-urlencoded; charset=utf-8")
    sts_request = AWSRequest(
        method="POST",
        url="https://sts.amazonaws.com/",
        headers=body_headers,
        data="Action=GetCallerIdentity&Version=2011-06-15",
        params={},
    )
    sts_signer.sign("GetCallerIdentity", sts_request)

    result = {name: value for name, value in sts_request.headers.items() if name in wanted}
    # Extra, non-signing headers sent alongside the signature (optional).
    result.update({"Cache-Control": "no-cache", "Accept": "application/json"})
    return result
# Custom AWS SigV4 Trino Authentication
class AwsSigV4Authentication(Authentication):
    """Trino ``Authentication`` that adds AWS SigV4 headers to the HTTP session.

    The headers are produced by signing an STS ``GetCallerIdentity`` request
    with credentials from a default boto3 ``Session``. They are installed on
    the ``requests.Session`` passed to ``set_http_session``, so requests made
    through that session carry them.
    """

    def __init__(self, region: str = "us-east-1", service: str = "sts"):
        """Store the SigV4 signing *region* and *service* name."""
        # Must be ``__init__``: a method named ``init`` is never invoked by the
        # constructor, leaving ``region``/``service`` unset so that
        # ``_get_sigv4_headers`` and ``__eq__`` raise AttributeError.
        self.region = region
        self.service = service

    def _get_sigv4_headers(self) -> Dict[str, str]:
        """Sign an STS ``GetCallerIdentity`` request and return its auth headers.

        Returns whichever of ``Authorization``, ``X-Amz-Security-Token`` and
        ``X-Amz-Date`` are on the signed request, plus ``Cache-Control`` and
        ``Accept``.
        """
        boto_session = Session()
        credentials = boto_session.get_credentials()
        request = AWSRequest(
            method="POST",
            url="https://sts.amazonaws.com/",
            data="Action=GetCallerIdentity&Version=2011-06-15",
            headers={"Content-Type": "application/x-www-form-urlencoded; charset=utf-8"},
        )
        SigV4Auth(credentials, self.service, self.region).add_auth(request)
        allowed = ["Authorization", "X-Amz-Security-Token", "X-Amz-Date"]
        signed_headers = {k: v for k, v in request.headers.items() if k in allowed}
        signed_headers["Cache-Control"] = "no-cache"
        signed_headers["Accept"] = "application/json"
        return signed_headers

    def set_http_session(self, http_session: requests.Session) -> requests.Session:
        """Install freshly signed SigV4 headers on *http_session* and return it."""
        # NOTE(review): headers are signed once here (X-Amz-Date is fixed), so a
        # very long-lived session may need re-signing; confirm with the bouncer.
        http_session.headers.update(self._get_sigv4_headers())
        return http_session

    def get_exceptions(self) -> Tuple[Any, ...]:
        """Return an empty tuple: no authentication-specific exception types."""
        return ()

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, AwsSigV4Authentication)
            and self.region == other.region
            and self.service == other.service
        )

    def __hash__(self) -> int:
        # Defining __eq__ alone sets __hash__ to None; hash the same fields
        # that __eq__ compares so instances stay usable in sets/dict keys.
        return hash((self.region, self.service))
# Get Trino connection
def get_trino_connection(user: str, host: str = "https://<<host_name>>"):
    """Open a Trino DB-API connection for *user* with bouncer SigV4 headers.

    The headers from ``get_bouncer_auth_headers()`` are passed through
    ``http_headers``. *host* defaults to a redacted placeholder and must be
    set to the real Trino endpoint; it is a parameter so the endpoint is not
    hard-coded into shared reproduction code.
    """
    http_headers = get_bouncer_auth_headers()
    connection = connect(
        host=host,
        port=443,
        user=user,
        source='<REPLACE_ME>',
        client_tags=['<REPLACE_ME>'],
        legacy_primitive_types=True,
        http_headers=http_headers,
        request_timeout=120,
        encoding=["json+zstd", "json+lz4", "json"],
    )
    return connection
# Execute query
def execute_query(query: str, user: str = 'rdata'):
    """Run *query* as *user*, fetch all rows, print the timing and return the rows.

    The connection is always closed afterwards, even if the query fails.
    """
    trino_connection = get_trino_connection(user)
    try:
        cursor = trino_connection.cursor()
        # perf_counter is monotonic, unlike time.time(), so it suits durations.
        start = time.perf_counter()
        cursor.execute(query)
        rows = cursor.fetchall()
        elapsed = time.perf_counter() - start
    finally:
        # NOTE(review): segment acks run asynchronously; confirm closing the
        # connection here does not interfere with still-pending acks.
        trino_connection.close()
    print(f"Trino query executed in {round(elapsed, 2)} seconds, rows returned: {len(rows)}.")
    return rows
# Sample query
# Guarded so importing this module does not fire a query; running it as a
# script (``python check_spooling_protocol.py``) behaves as before.
if __name__ == "__main__":
    sample_query = 'SELECT * FROM invitro.in_vitro_results_har.result LIMIT 60000'
    execute_query(sample_query)
`
Log output
% python check_spooling_protocol.py
DEBUG:botocore.hooks:Changing event name from creating-client-class.iot-data to creating-client-class.iot-data-plane
DEBUG:botocore.hooks:Changing event name from before-call.apigateway to before-call.api-gateway
DEBUG:botocore.hooks:Changing event name from request-created.machinelearning.Predict to request-created.machine-learning.Predict
DEBUG:botocore.hooks:Changing event name from before-parameter-build.autoscaling.CreateLaunchConfiguration to before-parameter-build.auto-scaling.CreateLaunchConfiguration
DEBUG:botocore.utils:IMDS ENDPOINT: http://169.254.169.254/
DEBUG:botocore.credentials:Looking for credentials via: env
INFO:botocore.credentials:Found credentials in environment variables.
DEBUG:botocore.auth:Calculating signature using v4 auth.
DEBUG:botocore.auth:CanonicalRequest:
POST
/
host:sts.amazonaws.com
x-amz-date:20251118T170624Z
DEBUG:botocore.auth:StringToSign:
AWS4-HMAC-SHA256
20251118T170624Z
20251118/us-east-1/sts/aws4_request
DEBUG:botocore.auth:Signature:
DEBUG:tzlocal:/etc/localtime found
DEBUG:tzlocal:1 found: {'/etc/localtime is a symlink to': 'Asia/Kolkata'}
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1):
DEBUG:urllib3.connectionpool:HTTPS request returned 200
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1):
DEBUG:urllib3.connectionpool:HTTP request redirected 301
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1):
DEBUG:urllib3.connectionpool:HTTPS request returned 200
ERROR:trino.client:Failed to acknowledge spooling request for segment SpooledSegment(metadata={'segmentSize': 282244, 'uncompressedSize': 8627781, 'rowsCount': 6820, 'expiresAt': '2025-11-19T10:36:27.333', 'rowOffset': 1682}): error 500: b'invalid token'
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1):
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1):
ERROR:trino.client:Failed to acknowledge spooling request for segment SpooledSegment(metadata={'segmentSize': 552030, 'uncompressedSize': 17252082, 'rowsCount': 13625, 'expiresAt': '2025-11-19T10:36:27.788', 'rowOffset': 8502}): error 500: b'invalid token'
ERROR:trino.client:Failed to acknowledge spooling request for segment SpooledSegment(metadata={'segmentSize': 840759, 'uncompressedSize': 17222665, 'rowsCount': 13117, 'expiresAt': '2025-11-19T10:36:28.128', 'rowOffset': 22127}): error 500: b'invalid token'
ERROR:trino.client:Failed to acknowledge spooling request for segment SpooledSegment(metadata={'segmentSize': 1002490, 'uncompressedSize': 17207451, 'rowsCount': 13109, 'expiresAt': '2025-11-19T10:36:28.603', 'rowOffset': 35244}): error 500: b'invalid token'
Replaced all AWS credentials and signatures with placeholders.
Replaced internal hostnames with placeholders.
Removed user-specific file paths.
Operating System
macOS 26.1 Build 25B78
Trino Python client version
0.336.0
Trino Server version
476
Python version
3.13.1
Are you willing to submit a PR?
- Yes I am willing to submit a PR!