Merged
2 changes: 1 addition & 1 deletion benchmarks/benchmarks.py
@@ -123,7 +123,7 @@ def time_save_big_catalog():
"catalog_type": "object",
"lowest_order": 6,
"highest_order": 10,
"threshold": 500,
"partition_rows": 500,
}

catalog = lsdb.from_dataframe(mock_partition_df, margin_threshold=None, **kwargs)
2 changes: 1 addition & 1 deletion docs/tutorials/import_catalogs.ipynb
@@ -115,7 +115,7 @@
" catalog_type=\"object\",\n",
" lowest_order=2,\n",
" highest_order=5,\n",
" threshold=100,\n",
" partition_rows=100,\n",
")\n",
"\n",
"# Save it to disk in HATS format\n",
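For orientation, here is a minimal sketch of the renamed keyword in use, mirroring the tutorial cell above; the toy dataframe and its column names are illustrative assumptions, not taken from the notebook:

```python
import pandas as pd
import lsdb

# Toy input; the loader auto-detects "ra"/"dec" columns by name.
df = pd.DataFrame({"ra": [10.0, 20.0, 30.0], "dec": [-5.0, 0.0, 5.0]})

# `partition_rows` replaces the old `threshold` keyword: it caps the
# number of rows per partition when the catalog is built.
catalog = lsdb.from_dataframe(
    df,
    catalog_name="my_catalog",
    catalog_type="object",
    lowest_order=2,
    highest_order=5,
    partition_rows=100,
)
```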
130 changes: 98 additions & 32 deletions src/lsdb/loaders/dataframe/dataframe_catalog_loader.py
@@ -1,18 +1,20 @@
from __future__ import annotations

import math
import re
import warnings

import astropy.units as u
import hats as hc
import hats.pixel_math.healpix_shim as hp
import nested_pandas as npd
import numpy as np
import pandas as pd
import pyarrow as pa
from hats.catalog import CatalogType, TableProperties
from hats.io.size_estimates import get_mem_size_per_row
from hats.pixel_math import HealpixPixel, generate_histogram
from hats.pixel_math.healpix_pixel_function import get_pixel_argsort
from hats.pixel_math.sparse_histogram import supplemental_count_histogram
from hats.pixel_math.spatial_index import (
SPATIAL_INDEX_COLUMN,
SPATIAL_INDEX_ORDER,
@@ -44,8 +46,8 @@ def __init__(
lowest_order: int = 0,
highest_order: int = 7,
drop_empty_siblings: bool = False,
partition_size: int | None = None,
threshold: int | None = None,
partition_rows: int | None = None,
partition_bytes: int | None = None,
should_generate_moc: bool = True,
moc_max_order: int = 10,
use_pyarrow_types: bool = True,
@@ -71,10 +73,12 @@ def __init__(
drop_empty_siblings : bool, default False
When determining the final partitioning, if 3 of 4 pixels are empty,
keep only the non-empty pixel
partition_size : int or None, default None
The desired partition size, in number of rows.
threshold : int or None, default None
The maximum number of data points per pixel.
partition_rows : int or None, default None
The maximum partition size, in number of rows. If specified,
'partition_bytes' must be None.
partition_bytes : int or None, default None
The maximum partition size, in bytes. If specified,
'partition_rows' must be None.
should_generate_moc : bool, default True
Should we generate a MOC (multi-order coverage map) of the data.
It can improve performance when joining/crossmatching to other hats-sharded datasets.
@@ -92,7 +96,7 @@ def __init__(
self.lowest_order = lowest_order
self.highest_order = highest_order
self.drop_empty_siblings = drop_empty_siblings
self.threshold = self._calculate_threshold(partition_size, threshold)
self.partition_rows, self.partition_bytes = self._calculate_threshold(partition_rows, partition_bytes)

if ra_column is None:
ra_column = self._find_column("ra")
@@ -103,11 +107,31 @@
if dataframe[ra_column].isna().any() or dataframe[dec_column].isna().any():
raise ValueError(f"NaN values found in {ra_column}/{dec_column} columns")

# Don't allow deprecated arguments.
for deprecated_arg in ["threshold", "partition_size"]:
if deprecated_arg in kwargs:
raise ValueError(
f"'{deprecated_arg}' is deprecated; use 'partition_rows' or 'partition_bytes' instead."
)

# Don't allow users to specify hats_max_rows or hats_max_bytes directly.
for invalid_arg in ["hats_max_rows", "hats_max_bytes"]:
if invalid_arg in kwargs:
raise ValueError(
f"{invalid_arg} should not be provided in kwargs; "
"use 'partition_rows' or 'partition_bytes' instead"
)

# Set partitioning kwarg to pass to catalog info creation.
if self.partition_rows is not None:
kwargs = dict(kwargs, hats_max_rows=self.partition_rows)
elif self.partition_bytes is not None:
kwargs = dict(kwargs, hats_max_bytes=self.partition_bytes)

self.catalog_info = self._create_catalog_info(
ra_column=ra_column,
dec_column=dec_column,
total_rows=len(self.dataframe),
hats_max_rows=self.threshold,
**kwargs,
)
self.should_generate_moc = should_generate_moc
Expand All @@ -130,28 +154,50 @@ def _find_column(self, search_term: str) -> str:
raise ValueError(f"Found {n_matches} possible columns for {search_term}")
return matches[0]

def _calculate_threshold(self, partition_size: int | None = None, threshold: int | None = None) -> int:
"""Calculates the number of points per HEALPix pixel (threshold) for the desired partition size."""
def _calculate_threshold(
self,
partition_rows: int | None = None,
partition_bytes: int | None = None,
) -> tuple[int | None, int | None]:
"""Verifies partition thresholds and sets default if necessary.

If partition_rows is provided, no partition exceeds that number of rows;
if partition_bytes is provided, no partition exceeds that memory size.

If neither partition_rows nor partition_bytes is provided, a default
partition size of approximately 1 GiB is used.

Parameters
----------
partition_rows : int or None, default None
The maximum partition size, in number of rows.
partition_bytes : int or None, default None
The maximum partition size, in bytes.

Returns
-------
tuple[int or None, int or None]
The validated partition_rows and partition_bytes.

Raises
------
ValueError
If both partition_rows and partition_bytes are specified.
"""
self.df_total_memory = self.dataframe.memory_usage(deep=True).sum()
if self.df_total_memory > (1 << 30) or len(self.dataframe) > 1_000_000:
warnings.warn(
"from_dataframe is not intended for large datasets. "
"Consider using hats-import: https://hats-import.readthedocs.io/",
RuntimeWarning,
)
if threshold is not None and partition_size is not None:
raise ValueError("Specify only one: threshold or partition_size")
if threshold is None:
if partition_size is not None:
# Round the number of partitions to the next integer, otherwise the
# number of pixels per partition may exceed the threshold
num_partitions = math.ceil(len(self.dataframe) / partition_size)
threshold = len(self.dataframe) // num_partitions
else:
# Each partition in memory will be of roughly 1Gib
partition_memory = self.df_total_memory / len(self.dataframe)
threshold = math.ceil((1 << 30) / partition_memory)
return threshold

if partition_rows is not None and partition_bytes is not None:
raise ValueError("Specify only one: partition_rows or partition_bytes")
if partition_rows is None and partition_bytes is None:
# Default to 1 GiB partitions
partition_bytes = 1 << 30
return partition_rows, partition_bytes
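As a quick sketch of this validation behavior at the public API level — the two-row dataframe is an illustrative assumption, and the error message matches the code above:

```python
import pandas as pd
import lsdb

df = pd.DataFrame({"ra": [45.0, 45.2], "dec": [0.0, 0.1]})

# Supplying both caps is rejected up front.
try:
    lsdb.from_dataframe(df, partition_rows=10, partition_bytes=1 << 20)
except ValueError as err:
    print(err)  # Specify only one: partition_rows or partition_bytes

# Supplying neither falls back to the ~1 GiB default (partition_bytes = 1 << 30).
catalog = lsdb.from_dataframe(df)
```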

def _create_catalog_info(
self,
@@ -238,18 +284,38 @@ def _compute_pixel_list(self) -> list[HealpixPixel]:
list[HealpixPixel]
HEALPix pixels for the final partitioning.
"""
raw_histogram = generate_histogram(
self.dataframe,
highest_order=self.highest_order,
ra_column=self.catalog_info.ra_column,
dec_column=self.catalog_info.dec_column,
)
# Generate histograms.
if self.partition_rows is not None:
row_count_histogram = generate_histogram(
self.dataframe,
highest_order=self.highest_order,
ra_column=self.catalog_info.ra_column,
dec_column=self.catalog_info.dec_column,
)
mem_size_histogram = None
else:
row_mem_sizes = get_mem_size_per_row(self.dataframe)
mapped_pixels = hp.radec2pix(
self.highest_order,
self.dataframe[self.catalog_info.ra_column].values,
self.dataframe[self.catalog_info.dec_column].values,
)
(row_count_sparse_histo, mem_size_sparse_histo) = supplemental_count_histogram(
mapped_pixels,
row_mem_sizes,
highest_order=self.highest_order,
)
row_count_histogram = row_count_sparse_histo.to_array()
mem_size_histogram = mem_size_sparse_histo.to_array()

# Generate alignment.
alignment = hc.pixel_math.generate_alignment(
raw_histogram,
row_count_histogram,
highest_order=self.highest_order,
lowest_order=self.lowest_order,
threshold=self.threshold,
threshold=(self.partition_rows if self.partition_rows is not None else self.partition_bytes),
drop_empty_siblings=self.drop_empty_siblings,
mem_size_histogram=mem_size_histogram,
)
pixel_list = list({HealpixPixel(tup[0], tup[1]) for tup in alignment if not tup is None})
return list(np.array(pixel_list)[get_pixel_argsort(pixel_list)])
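To exercise the new bytes-based path end to end, a hedged sketch; the synthetic data and the 1 MiB cap are illustrative, the memory histogram is built internally by the branch above, and the usual lsdb `Catalog.get_healpix_pixels` accessor is assumed:

```python
import numpy as np
import pandas as pd
import lsdb

rng = np.random.default_rng(42)
n = 10_000
df = pd.DataFrame(
    {
        "ra": rng.uniform(0.0, 360.0, n),
        "dec": rng.uniform(-90.0, 90.0, n),
        "mag": rng.normal(20.0, 1.0, n),
    }
)

# With `partition_bytes` set, the loader estimates per-row memory use and
# builds a per-pixel memory histogram to decide where to split.
catalog = lsdb.from_dataframe(df, partition_bytes=1 << 20)  # ~1 MiB cap
print(len(catalog.get_healpix_pixels()))
```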
18 changes: 10 additions & 8 deletions src/lsdb/loaders/dataframe/from_dataframe.py
@@ -17,8 +17,8 @@ def from_dataframe(
lowest_order: int = 0,
highest_order: int = 7,
drop_empty_siblings: bool = True,
partition_size: int | None = None,
threshold: int | None = None,
partition_rows: int | None = None,
partition_bytes: int | None = None,
margin_order: int = -1,
margin_threshold: float | None = 5.0,
should_generate_moc: bool = True,
@@ -50,10 +50,12 @@
drop_empty_siblings : bool, default True
When determining the final partitioning, if 3 of 4 pixels are empty,
keep only the non-empty pixel
partition_size : int or None, default None
The desired partition size, in number of rows.
threshold : int or None, default None
The maximum number of data points per pixel.
partition_rows : int or None, default None
The maximum partition size, in number of rows. Only one of
`partition_rows` or `partition_bytes` should be specified.
partition_bytes : int or None, default None
The maximum partition size, in bytes. Only one of
`partition_rows` or `partition_bytes` should be specified.
margin_order : int, default -1
The order at which to generate the margin cache.
margin_threshold : float or None, default 5.0
@@ -91,8 +93,8 @@
lowest_order=lowest_order,
highest_order=highest_order,
drop_empty_siblings=drop_empty_siblings,
partition_size=partition_size,
threshold=threshold,
partition_rows=partition_rows,
partition_bytes=partition_bytes,
should_generate_moc=should_generate_moc,
moc_max_order=moc_max_order,
use_pyarrow_types=use_pyarrow_types,
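For downstream callers, a migration sketch; the old keywords come from the removed lines, while the dataframe and cap values are illustrative:

```python
import pandas as pd
import lsdb

df = pd.DataFrame({"ra": [0.5, 1.5, 2.5], "dec": [0.0, 0.0, 0.0]})

# Before this change:
#   lsdb.from_dataframe(df, partition_size=5_000)  # target rows per partition
#   lsdb.from_dataframe(df, threshold=5_000)       # max rows per pixel
# Both keywords now raise a ValueError pointing at the replacements.

# Cap by row count...
catalog = lsdb.from_dataframe(df, partition_rows=5_000)
# ...or by memory footprint (mutually exclusive with partition_rows).
catalog = lsdb.from_dataframe(df, partition_bytes=256 << 20)  # ~256 MiB
```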
2 changes: 1 addition & 1 deletion tests/lsdb/catalog/test_catalog.py
@@ -921,7 +921,7 @@ def divme(df, _=None):
return 1 / df["a"]

# Force every row into a separate partition
nfc = lsdb.from_dataframe(nf, ra_column="a", dec_column="b", partition_size=1)
nfc = lsdb.from_dataframe(nf, ra_column="a", dec_column="b", partition_rows=1)

with pytest.raises(RuntimeError, match=r"function divme to partition 3: Not so fast"):
nfc.map_partitions(divme, include_pixel=False).compute()
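The test above relies on `partition_rows=1` forcing at most one row per partition; here is a standalone sketch of the same trick, where the dataframe is a hypothetical stand-in for the test's `nf`, whose definition sits above this hunk:

```python
import pandas as pd
import lsdb

# Hypothetical stand-in for the test's `nf`; columns "a"/"b" double as ra/dec.
nf = pd.DataFrame({"a": [1.0, 2.0, 3.0, 4.0], "b": [10.0, 20.0, 30.0, 40.0]})

# With a one-row cap, each row should land in its own partition,
# provided the rows fall in distinct HEALPix pixels.
nfc = lsdb.from_dataframe(nf, ra_column="a", dec_column="b", partition_rows=1)
print(len(nfc.get_healpix_pixels()))  # expect 4
```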
2 changes: 1 addition & 1 deletion tests/lsdb/io/test_to_hats.py
@@ -163,7 +163,7 @@ def test_save_catalog_point_map(small_sky_order1_df, tmp_path):
catalog_name="small_sky_order1",
lowest_order=6,
highest_order=8,
threshold=500,
partition_rows=500,
)

small_sky_order1_catalog.write_catalog(