Skip to content

Commit 08e3b4c

Browse files
authored
feat(storage): add pyiceberg wrapper (#1326)
1 parent 78ebf2c commit 08e3b4c

File tree

4 files changed

+438
-0
lines changed

4 files changed

+438
-0
lines changed

src/storage/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ dependencies = [
2626
"deprecation >=2.1.0",
2727
"pydantic >=2.11.7",
2828
"yarl>=1.20.1",
29+
"pyiceberg>=0.10.0",
2930
]
3031

3132
[project.urls]

src/storage/src/storage3/_async/analytics.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import List, Optional
22

33
from httpx import QueryParams
4+
from pyiceberg.catalog.rest import RestCatalog
45

56
from ..types import (
67
AnalyticsBucket,
@@ -49,3 +50,24 @@ async def delete(self, bucket_name: str) -> AnalyticsBucketDeleteResponse:
4950
http_method="DELETE", path=["bucket", bucket_name]
5051
)
5152
return AnalyticsBucketDeleteResponse.model_validate_json(data.content)
53+
54+
def catalog(
    self, catalog_name: str, access_key_id: str, secret_access_key: str
) -> RestCatalog:
    """Build a pyiceberg ``RestCatalog`` configured from this client.

    The catalog URI is the client's base URL; the S3 endpoint is derived
    from it as ``base_url.parent.joinpath("s3")``; the value of the
    ``apiKey`` request header is forwarded as the catalog token.

    Args:
        catalog_name: Used both as the catalog name and as the warehouse.
        access_key_id: Value for the ``s3.access-key-id`` property.
        secret_access_key: Value for the ``s3.secret-access-key`` property.

    Returns:
        The configured ``RestCatalog``.

    Raises:
        AssertionError: If the request headers carry no ``apiKey`` value.
    """
    # NOTE(review): this is a plain ``def`` on the async client; constructing
    # the catalog may do blocking I/O -- confirm against pyiceberg.
    base_url = self._request._base_url
    s3_endpoint = base_url.parent.joinpath("s3")

    service_key = self._request.headers.get("apiKey")
    # Raised explicitly instead of using an `assert` statement: asserts are
    # stripped under `python -O`, which would let a missing key through.
    # AssertionError is kept so existing callers see the same exception.
    if not service_key:
        raise AssertionError("apiKey must be passed in the headers.")

    # These property names contain dots/dashes, so they cannot be written as
    # keyword arguments and are unpacked from a dict instead.
    io_properties = {
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.endpoint": str(s3_endpoint),
        "s3.access-key-id": access_key_id,
        "s3.secret-access-key": secret_access_key,
        "s3.force-virtual-addressing": "False",
    }
    return RestCatalog(
        catalog_name,
        warehouse=catalog_name,
        uri=str(base_url),
        token=service_key,
        **io_properties,
    )

src/storage/src/storage3/_sync/analytics.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import List, Optional
22

33
from httpx import QueryParams
4+
from pyiceberg.catalog.rest import RestCatalog
45

56
from ..types import (
67
AnalyticsBucket,
@@ -47,3 +48,24 @@ def list(
4748
def delete(self, bucket_name: str) -> AnalyticsBucketDeleteResponse:
    """Delete the analytics bucket named *bucket_name*.

    Sends a ``DELETE`` request with the path segments
    ``["bucket", bucket_name]`` and validates the JSON response body into
    an ``AnalyticsBucketDeleteResponse``.
    """
    response = self._request.send(
        http_method="DELETE",
        path=["bucket", bucket_name],
    )
    payload = response.content
    return AnalyticsBucketDeleteResponse.model_validate_json(payload)
51+
52+
def catalog(
    self, catalog_name: str, access_key_id: str, secret_access_key: str
) -> RestCatalog:
    """Build a pyiceberg ``RestCatalog`` configured from this client.

    The catalog URI is the client's base URL; the S3 endpoint is derived
    from it as ``base_url.parent.joinpath("s3")``; the value of the
    ``apiKey`` request header is forwarded as the catalog token.

    Args:
        catalog_name: Used both as the catalog name and as the warehouse.
        access_key_id: Value for the ``s3.access-key-id`` property.
        secret_access_key: Value for the ``s3.secret-access-key`` property.

    Returns:
        The configured ``RestCatalog``.

    Raises:
        AssertionError: If the request headers carry no ``apiKey`` value.
    """
    base_url = self._request._base_url
    s3_endpoint = base_url.parent.joinpath("s3")

    service_key = self._request.headers.get("apiKey")
    # Raised explicitly instead of using an `assert` statement: asserts are
    # stripped under `python -O`, which would let a missing key through.
    # AssertionError is kept so existing callers see the same exception.
    if not service_key:
        raise AssertionError("apiKey must be passed in the headers.")

    # These property names contain dots/dashes, so they cannot be written as
    # keyword arguments and are unpacked from a dict instead.
    io_properties = {
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.endpoint": str(s3_endpoint),
        "s3.access-key-id": access_key_id,
        "s3.secret-access-key": secret_access_key,
        "s3.force-virtual-addressing": "False",
    }
    return RestCatalog(
        catalog_name,
        warehouse=catalog_name,
        uri=str(base_url),
        token=service_key,
        **io_properties,
    )

0 commit comments

Comments
 (0)