Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions python/lance_namespace/src/lance_namespace/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import pyarrow as pa

from lance_namespace.namespace import LanceNamespace
from lance_namespace.schema import convert_pyarrow_schema_to_json_arrow
from lance_namespace_urllib3_client.models import (
ListNamespacesRequest,
ListNamespacesResponse,
Expand Down Expand Up @@ -440,7 +441,9 @@ def describe_table(self, request: DescribeTableRequest) -> DescribeTableResponse
location = table.sd.location if table.sd else None
if not location:
raise ValueError(f"Table {request.id} has no location")

dataset = lance.dataset(location, storage_options=self.storage_properties)
schema = convert_pyarrow_schema_to_json_arrow(dataset.schema)

# Build properties from Hive metadata
properties = {}
if table.parameters:
Expand All @@ -459,7 +462,7 @@ def describe_table(self, request: DescribeTableRequest) -> DescribeTableResponse
# Note: The Lance dataset is opened above so its schema can be returned
# together with the Hive metadata
return DescribeTableResponse(
var_schema=None,
schema=schema,
location=location,
version=version,
properties=properties
Expand All @@ -483,7 +486,7 @@ def register_table(self, request: RegisterTableRequest) -> RegisterTableResponse
managed_by = request.properties.get(MANAGED_BY_KEY, "storage") if request.properties else "storage"

# We always need to open the dataset to get schema for Hive columns
dataset = lance.dataset(request.location)
dataset = lance.dataset(request.location, storage_options=self.storage_properties)
schema = dataset.schema

# Only track version if managed_by is "impl"
Expand Down Expand Up @@ -659,9 +662,9 @@ def create_table(self, request: CreateTableRequest, request_data: bytes) -> Crea
# Check if dataset already exists
if os.path.exists(location):
raise ValueError(f"Table {request.id} already exists at {location}")
dataset = lance.write_dataset(table, location)
dataset = lance.write_dataset(table, location, storage_options=self.storage_properties)
elif request.mode == "create_or_replace":
dataset = lance.write_dataset(table, location, mode="overwrite")
dataset = lance.write_dataset(table, location, mode="overwrite", storage_options=self.storage_properties)
else:
raise ValueError(f"Unsupported create mode: {request.mode}")

Expand Down Expand Up @@ -734,7 +737,7 @@ def create_empty_table(self, request: CreateEmptyTableRequest) -> CreateEmptyTab
)

# Create table in Hive
with self.client_pool.get_client() as client:
with self.client as client:
client.create_table(hive_table)

return CreateEmptyTableResponse(location=location)
Expand Down
56 changes: 34 additions & 22 deletions python/lance_namespace/src/lance_namespace/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,22 @@ def convert_pyarrow_schema_to_json_arrow(schema: "pa.Schema") -> JsonArrowSchema
name=field.name,
nullable=field.nullable,
type=convert_pyarrow_type_to_json_arrow(field.type),
metadata=field.metadata
metadata=validate_metadata(field.metadata)
)
fields.append(json_field)

return JsonArrowSchema(fields=fields)

def validate_metadata(metadata):
    """Return a plain dict copy of *metadata* with bytes decoded to str.

    Any key or value that is a ``bytes`` instance is decoded as UTF-8;
    every other object is passed through unchanged. A falsy *metadata*
    (``None`` or an empty mapping) yields an empty dict.

    Args:
        metadata: A mapping exposing ``.items()``, or a falsy value.

    Returns:
        A new ``dict`` holding the decoded keys and values.

    Raises:
        UnicodeDecodeError: If a bytes key or value is not valid UTF-8.
    """
    def _as_text(item):
        # Only bytes need decoding; anything else is returned untouched.
        if isinstance(item, bytes):
            return item.decode('utf-8')
        return item

    if not metadata:
        return {}

    normalized = {}
    for raw_key, raw_value in metadata.items():
        # Decode the key before the value, matching the evaluation order of
        # a dict comprehension.
        text_key = _as_text(raw_key)
        normalized[text_key] = _as_text(raw_value)
    return normalized


def convert_pyarrow_type_to_json_arrow(dtype: "pa.DataType") -> JsonArrowDataType:
"""Convert PyArrow data type to JSON Arrow data type.
Expand All @@ -89,55 +99,57 @@ def convert_pyarrow_type_to_json_arrow(dtype: "pa.DataType") -> JsonArrowDataTyp
raise ImportError("PyArrow is required for schema conversion")

if pa.types.is_boolean(dtype):
return JsonArrowDataType(name="bool")
return JsonArrowDataType(type="bool")
elif pa.types.is_int8(dtype):
return JsonArrowDataType(name="int", bitWidth=8, isSigned=True)
return JsonArrowDataType(type="int", bitWidth=8, isSigned=True)
elif pa.types.is_int16(dtype):
return JsonArrowDataType(name="int", bitWidth=16, isSigned=True)
return JsonArrowDataType(type="int", bitWidth=16, isSigned=True)
elif pa.types.is_int32(dtype):
return JsonArrowDataType(name="int", bitWidth=32, isSigned=True)
return JsonArrowDataType(type="int", bitWidth=32, isSigned=True)
elif pa.types.is_int64(dtype):
return JsonArrowDataType(name="int", bitWidth=64, isSigned=True)
return JsonArrowDataType(type="int", bitWidth=64, isSigned=True)
elif pa.types.is_uint8(dtype):
return JsonArrowDataType(name="int", bitWidth=8, isSigned=False)
return JsonArrowDataType(type="int", bitWidth=8, isSigned=False)
elif pa.types.is_uint16(dtype):
return JsonArrowDataType(name="int", bitWidth=16, isSigned=False)
return JsonArrowDataType(type="int", bitWidth=16, isSigned=False)
elif pa.types.is_uint32(dtype):
return JsonArrowDataType(name="int", bitWidth=32, isSigned=False)
return JsonArrowDataType(type="int", bitWidth=32, isSigned=False)
elif pa.types.is_uint64(dtype):
return JsonArrowDataType(name="int", bitWidth=64, isSigned=False)
return JsonArrowDataType(type="int", bitWidth=64, isSigned=False)
elif pa.types.is_float32(dtype):
return JsonArrowDataType(name="floatingpoint", precision="SINGLE")
return JsonArrowDataType(type="floatingpoint", precision="SINGLE")
elif pa.types.is_float64(dtype):
return JsonArrowDataType(name="floatingpoint", precision="DOUBLE")
return JsonArrowDataType(type="floatingpoint", precision="DOUBLE")
elif pa.types.is_string(dtype):
return JsonArrowDataType(name="utf8")
return JsonArrowDataType(type="utf8")
elif pa.types.is_binary(dtype):
return JsonArrowDataType(name="binary")
return JsonArrowDataType(type="binary")
elif pa.types.is_timestamp(dtype):
return JsonArrowDataType(
name="timestamp",
type="timestamp",
unit=dtype.unit,
timezone=dtype.tz
)
elif pa.types.is_date32(dtype):
return JsonArrowDataType(name="date", unit="DAY")
return JsonArrowDataType(type="date", unit="DAY")
elif pa.types.is_date64(dtype):
return JsonArrowDataType(name="date", unit="MILLISECOND")
return JsonArrowDataType(type="date", unit="MILLISECOND")
elif pa.types.is_decimal(dtype):
return JsonArrowDataType(
name="decimal",
type="decimal",
precision=dtype.precision,
scale=dtype.scale
)
elif pa.types.is_fixed_size_list(dtype):
return JsonArrowDataType(type="fixed_size_list")
elif pa.types.is_list(dtype):
return JsonArrowDataType(name="list")
return JsonArrowDataType(type="list")
elif pa.types.is_struct(dtype):
return JsonArrowDataType(name="struct")
return JsonArrowDataType(type="struct")
elif pa.types.is_map(dtype):
return JsonArrowDataType(name="map")
return JsonArrowDataType(type="map")
else:
return JsonArrowDataType(name="unknown")
return JsonArrowDataType(type="unknown")


def convert_json_arrow_type_to_pyarrow(json_type: JsonArrowDataType) -> "pa.DataType":
Expand Down
Loading