diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py
index 0340fd6a9..54d5bbf09 100644
--- a/benchmarks/benchmarks.py
+++ b/benchmarks/benchmarks.py
@@ -123,7 +123,7 @@ def time_save_big_catalog():
         "catalog_type": "object",
         "lowest_order": 6,
         "highest_order": 10,
-        "threshold": 500,
+        "partition_rows": 500,
     }
     catalog = lsdb.from_dataframe(mock_partition_df, margin_threshold=None, **kwargs)

diff --git a/docs/tutorials/import_catalogs.ipynb b/docs/tutorials/import_catalogs.ipynb
index 0f913e5f5..d507433e1 100644
--- a/docs/tutorials/import_catalogs.ipynb
+++ b/docs/tutorials/import_catalogs.ipynb
@@ -115,7 +115,7 @@
     "    catalog_type=\"object\",\n",
     "    lowest_order=2,\n",
     "    highest_order=5,\n",
-    "    threshold=100,\n",
+    "    partition_rows=100,\n",
     ")\n",
     "\n",
     "# Save it to disk in HATS format\n",
diff --git a/src/lsdb/loaders/dataframe/dataframe_catalog_loader.py b/src/lsdb/loaders/dataframe/dataframe_catalog_loader.py
index 2e87583d0..b5abf1e98 100644
--- a/src/lsdb/loaders/dataframe/dataframe_catalog_loader.py
+++ b/src/lsdb/loaders/dataframe/dataframe_catalog_loader.py
@@ -1,18 +1,20 @@
 from __future__ import annotations

-import math
 import re
 import warnings

 import astropy.units as u
 import hats as hc
+import hats.pixel_math.healpix_shim as hp
 import nested_pandas as npd
 import numpy as np
 import pandas as pd
 import pyarrow as pa
 from hats.catalog import CatalogType, TableProperties
+from hats.io.size_estimates import get_mem_size_per_row
 from hats.pixel_math import HealpixPixel, generate_histogram
 from hats.pixel_math.healpix_pixel_function import get_pixel_argsort
+from hats.pixel_math.sparse_histogram import supplemental_count_histogram
 from hats.pixel_math.spatial_index import (
     SPATIAL_INDEX_COLUMN,
     SPATIAL_INDEX_ORDER,
@@ -44,8 +46,8 @@ def __init__(
         lowest_order: int = 0,
         highest_order: int = 7,
         drop_empty_siblings: bool = False,
-        partition_size: int | None = None,
-        threshold: int | None = None,
+        partition_rows: int | None = None,
+        partition_bytes: int | None = None,
         should_generate_moc: bool = True,
         moc_max_order: int = 10,
         use_pyarrow_types: bool = True,
@@ -71,10 +73,12 @@
         drop_empty_siblings : bool, default False
             When determining final partitionining, if 3 of 4 pixels are empty, keep
             only the non-empty pixel
-        partition_size : int or None, default None
-            The desired partition size, in number of rows.
-        threshold : int or None, default None
-            The maximum number of data points per pixel.
+        partition_rows : int or None, default None
+            The maximum partition size, in number of rows. If specified,
+            'partition_bytes' must be None.
+        partition_bytes : int or None, default None
+            The maximum partition size, in bytes. If specified,
+            'partition_rows' must be None.
         should_generate_moc : bool, default True
             Should we generate a MOC (multi-order coverage map) of the data. It can improve
             performance when joining/crossmatching to other hats-sharded datasets.
@@ -92,7 +96,7 @@ def __init__(
         self.lowest_order = lowest_order
         self.highest_order = highest_order
         self.drop_empty_siblings = drop_empty_siblings
-        self.threshold = self._calculate_threshold(partition_size, threshold)
+        self.partition_rows, self.partition_bytes = self._calculate_threshold(partition_rows, partition_bytes)

         if ra_column is None:
             ra_column = self._find_column("ra")
@@ -103,11 +107,31 @@
         if dataframe[ra_column].isna().any() or dataframe[dec_column].isna().any():
             raise ValueError(f"NaN values found in {ra_column}/{dec_column} columns")

+        # Don't allow deprecated arguments.
+        for deprecated_arg in ["threshold", "partition_size"]:
+            if deprecated_arg in kwargs:
+                raise ValueError(
+                    f"'{deprecated_arg}' is deprecated; use 'partition_rows' or 'partition_bytes' instead."
+                )
+
+        # Don't allow users to specify hats_max_rows or hats_max_bytes directly.
+        for invalid_arg in ["hats_max_rows", "hats_max_bytes"]:
+            if invalid_arg in kwargs:
+                raise ValueError(
+                    f"{invalid_arg} should not be provided in kwargs; "
+                    "use 'partition_rows' or 'partition_bytes' instead"
+                )
+
+        # Set partitioning kwarg to pass to catalog info creation.
+        if self.partition_rows is not None:
+            kwargs = dict(kwargs, hats_max_rows=self.partition_rows)
+        elif self.partition_bytes is not None:
+            kwargs = dict(kwargs, hats_max_bytes=self.partition_bytes)
+
         self.catalog_info = self._create_catalog_info(
             ra_column=ra_column,
             dec_column=dec_column,
             total_rows=len(self.dataframe),
-            hats_max_rows=self.threshold,
             **kwargs,
         )
         self.should_generate_moc = should_generate_moc
@@ -130,8 +154,36 @@ def _find_column(self, search_term: str) -> str:
             raise ValueError(f"Found {n_matches} possible columns for {search_term}")
         return matches[0]

-    def _calculate_threshold(self, partition_size: int | None = None, threshold: int | None = None) -> int:
-        """Calculates the number of points per HEALPix pixel (threshold) for the desired partition size."""
+    def _calculate_threshold(
+        self,
+        partition_rows: int | None = None,
+        partition_bytes: int | None = None,
+    ) -> tuple[int | None, int | None]:
+        """Verifies partition thresholds and sets a default if necessary.
+
+        If partition_rows is provided, no partition exceeds the maximum number of rows;
+        if partition_bytes is provided, no partition exceeds the maximum memory size.
+
+        If neither partition_rows nor partition_bytes is provided, a default
+        partition size of approximately 1 GiB is used.
+
+        Parameters
+        ----------
+        partition_rows : int or None, default None
+            The desired partition size maximum, in number of rows.
+        partition_bytes : int or None, default None
+            The desired partition size maximum, in bytes.
+
+        Returns
+        -------
+        tuple[int or None, int or None]
+            The validated partition_rows and partition_bytes.
+
+        Raises
+        ------
+        ValueError
+            If both partition_rows and partition_bytes are specified.
+        """
         self.df_total_memory = self.dataframe.memory_usage(deep=True).sum()
         if self.df_total_memory > (1 << 30) or len(self.dataframe) > 1_000_000:
             warnings.warn(
@@ -139,19 +191,13 @@ def _calculate_threshold(self, partition_size: int | None = None, threshold: int
                 "Consider using hats-import: https://hats-import.readthedocs.io/",
                 RuntimeWarning,
             )
-        if threshold is not None and partition_size is not None:
-            raise ValueError("Specify only one: threshold or partition_size")
-        if threshold is None:
-            if partition_size is not None:
-                # Round the number of partitions to the next integer, otherwise the
-                # number of pixels per partition may exceed the threshold
-                num_partitions = math.ceil(len(self.dataframe) / partition_size)
-                threshold = len(self.dataframe) // num_partitions
-            else:
-                # Each partition in memory will be of roughly 1Gib
-                partition_memory = self.df_total_memory / len(self.dataframe)
-                threshold = math.ceil((1 << 30) / partition_memory)
-        return threshold
+
+        if partition_rows is not None and partition_bytes is not None:
+            raise ValueError("Specify only one: partition_rows or partition_bytes")
+        if partition_rows is None and partition_bytes is None:
+            # Default to 1 GiB partitions
+            partition_bytes = 1 << 30
+        return partition_rows, partition_bytes

     def _create_catalog_info(
         self,
@@ -238,18 +284,38 @@ def _compute_pixel_list(self) -> list[HealpixPixel]:
         list[HealpixPixel]
             HEALPix pixels for the final partitioning.
         """
-        raw_histogram = generate_histogram(
-            self.dataframe,
-            highest_order=self.highest_order,
-            ra_column=self.catalog_info.ra_column,
-            dec_column=self.catalog_info.dec_column,
-        )
+        # Generate histograms.
+        if self.partition_rows is not None:
+            row_count_histogram = generate_histogram(
+                self.dataframe,
+                highest_order=self.highest_order,
+                ra_column=self.catalog_info.ra_column,
+                dec_column=self.catalog_info.dec_column,
+            )
+            mem_size_histogram = None
+        else:
+            row_mem_sizes = get_mem_size_per_row(self.dataframe)
+            mapped_pixels = hp.radec2pix(
+                self.highest_order,
+                self.dataframe[self.catalog_info.ra_column].values,
+                self.dataframe[self.catalog_info.dec_column].values,
+            )
+            (row_count_sparse_histo, mem_size_sparse_histo) = supplemental_count_histogram(
+                mapped_pixels,
+                row_mem_sizes,
+                highest_order=self.highest_order,
+            )
+            row_count_histogram = row_count_sparse_histo.to_array()
+            mem_size_histogram = mem_size_sparse_histo.to_array()
+
+        # Generate alignment.
         alignment = hc.pixel_math.generate_alignment(
-            raw_histogram,
+            row_count_histogram,
             highest_order=self.highest_order,
             lowest_order=self.lowest_order,
-            threshold=self.threshold,
+            threshold=(self.partition_rows if self.partition_rows is not None else self.partition_bytes),
             drop_empty_siblings=self.drop_empty_siblings,
+            mem_size_histogram=mem_size_histogram,
         )
         pixel_list = list({HealpixPixel(tup[0], tup[1]) for tup in alignment if not tup is None})
         return list(np.array(pixel_list)[get_pixel_argsort(pixel_list)])
diff --git a/src/lsdb/loaders/dataframe/from_dataframe.py b/src/lsdb/loaders/dataframe/from_dataframe.py
index a9fa5134b..876e94d86 100644
--- a/src/lsdb/loaders/dataframe/from_dataframe.py
+++ b/src/lsdb/loaders/dataframe/from_dataframe.py
@@ -17,8 +17,8 @@ def from_dataframe(
     lowest_order: int = 0,
     highest_order: int = 7,
     drop_empty_siblings: bool = True,
-    partition_size: int | None = None,
-    threshold: int | None = None,
+    partition_rows: int | None = None,
+    partition_bytes: int | None = None,
     margin_order: int = -1,
     margin_threshold: float | None = 5.0,
     should_generate_moc: bool = True,
@@ -50,10 +50,12 @@ def from_dataframe(
     drop_empty_siblings : bool, default True
         When determining final partitionining, if 3 of 4 pixels are empty, keep
         only the non-empty pixel
-    partition_size : int or None, default None
-        The desired partition size, in number of rows.
-    threshold : int or None, default None
-        The maximum number of data points per pixel.
+    partition_rows : int or None, default None
+        The desired partition size, in number of rows. Only one of
+        `partition_rows` or `partition_bytes` should be specified.
+    partition_bytes : int or None, default None
+        The desired partition size, in bytes. Only one of
+        `partition_rows` or `partition_bytes` should be specified.
     margin_order : int, default -1
         The order at which to generate the margin cache.
     margin_threshold : float or None, default 5
@@ -91,8 +93,8 @@ def from_dataframe(
         lowest_order=lowest_order,
         highest_order=highest_order,
         drop_empty_siblings=drop_empty_siblings,
-        partition_size=partition_size,
-        threshold=threshold,
+        partition_rows=partition_rows,
+        partition_bytes=partition_bytes,
         should_generate_moc=should_generate_moc,
         moc_max_order=moc_max_order,
         use_pyarrow_types=use_pyarrow_types,
diff --git a/tests/lsdb/catalog/test_catalog.py b/tests/lsdb/catalog/test_catalog.py
index 70328010c..1adb9a743 100644
--- a/tests/lsdb/catalog/test_catalog.py
+++ b/tests/lsdb/catalog/test_catalog.py
@@ -921,7 +921,7 @@ def divme(df, _=None):
         return 1 / df["a"]

     # Force every row into a separate partition
-    nfc = lsdb.from_dataframe(nf, ra_column="a", dec_column="b", partition_size=1)
+    nfc = lsdb.from_dataframe(nf, ra_column="a", dec_column="b", partition_rows=1)

     with pytest.raises(RuntimeError, match=r"function divme to partition 3: Not so fast"):
         nfc.map_partitions(divme, include_pixel=False).compute()
diff --git a/tests/lsdb/io/test_to_hats.py b/tests/lsdb/io/test_to_hats.py
index 0770103c2..b12aadf3e 100644
--- a/tests/lsdb/io/test_to_hats.py
+++ b/tests/lsdb/io/test_to_hats.py
@@ -163,7 +163,7 @@ def test_save_catalog_point_map(small_sky_order1_df, tmp_path):
         catalog_name="small_sky_order1",
         lowest_order=6,
         highest_order=8,
-        threshold=500,
+        partition_rows=500,
     )

     small_sky_order1_catalog.write_catalog(
diff --git a/tests/lsdb/loaders/dataframe/test_from_dataframe.py b/tests/lsdb/loaders/dataframe/test_from_dataframe.py
index 0f88fec70..183c1453e 100644
--- a/tests/lsdb/loaders/dataframe/test_from_dataframe.py
+++ b/tests/lsdb/loaders/dataframe/test_from_dataframe.py
@@ -28,7 +28,7 @@ def get_catalog_kwargs(catalog, **kwargs):
         "catalog_type": catalog_info.catalog_type,
         "lowest_order": 0,
         "highest_order": 5,
-        "threshold": 50,
+        "partition_rows": 50,
         **kwargs,
     }
     return kwargs
@@ -51,8 +51,8 @@ def test_from_dataframe(small_sky_order1_df, small_sky_order1_catalog, helpers):
     assert (
         catalog.hc_structure.catalog_info.hats_builder == f"lsdb v{version('lsdb')}, hats v{version('hats')}"
     )
-    # The pixel threshold was specified properly
-    assert catalog.hc_structure.catalog_info.hats_max_rows == 50
+    # The partition_rows threshold was specified properly
+    assert catalog.hc_structure.catalog_info.hats_max_rows <= 50
     # Index is set to spatial index
     assert catalog._ddf.index.name == SPATIAL_INDEX_COLUMN
     # Dataframes have the same data (column data types may differ)
@@ -81,12 +81,26 @@
     small_sky_order1_df.reset_index(drop=True, inplace=True)


-def test_from_dataframe_when_threshold_and_partition_size_specified(
-    small_sky_order1_df, small_sky_order1_catalog
-):
-    """Tests that specifying simultaneously threshold and partition_size is invalid"""
-    kwargs = get_catalog_kwargs(small_sky_order1_catalog, partition_size=10, threshold=10_000)
-    with pytest.raises(ValueError, match="Specify only one: threshold or partition_size"):
+def test_from_dataframe_invalid_partitioning_parameters(small_sky_order1_df, small_sky_order1_catalog):
+    """Tests that deprecated and conflicting partitioning parameters raise exceptions"""
+    # Fail when specifying both partition_rows and partition_bytes
+    kwargs = get_catalog_kwargs(small_sky_order1_catalog, partition_rows=10, partition_bytes=10_000)
+    with pytest.raises(ValueError, match="Specify only one:"):
+        lsdb.from_dataframe(small_sky_order1_df, margin_threshold=None, **kwargs)
+
+    # Fail when using deprecated threshold parameter
+    kwargs = get_catalog_kwargs(small_sky_order1_catalog, threshold=10_000)
+    with pytest.raises(ValueError, match="is deprecated"):
+        lsdb.from_dataframe(small_sky_order1_df, margin_threshold=None, **kwargs)
+
+    # Fail when user specifies hats_max_rows or hats_max_bytes in kwargs
+    kwargs = get_catalog_kwargs(small_sky_order1_catalog, partition_rows=10, hats_max_rows=10)
+    with pytest.raises(ValueError, match="hats_max_rows should not be provided in kwargs"):
+        lsdb.from_dataframe(small_sky_order1_df, margin_threshold=None, **kwargs)
+    kwargs = get_catalog_kwargs(
+        small_sky_order1_catalog, partition_rows=None, partition_bytes=10_000, hats_max_bytes=10_000
+    )
+    with pytest.raises(ValueError, match="hats_max_bytes should not be provided in kwargs"):
         lsdb.from_dataframe(small_sky_order1_df, margin_threshold=None, **kwargs)


@@ -132,27 +146,37 @@ def test_from_dataframe_with_non_default_ra_dec_columns(small_sky_order1_df, sma
     lsdb.from_dataframe(small_sky_order1_df, margin_threshold=None, **kwargs)


-def test_partitions_obey_partition_size(small_sky_order1_df, small_sky_order1_catalog):
+def test_partitions_obey_partition_rows(small_sky_order1_df, small_sky_order1_catalog):
     """Tests that partitions are limited by the specified size"""
     # Use partitions with 10 rows
-    partition_size = 10
+    partition_rows = 10
     # Read CSV file for the small sky order 1 catalog
-    kwargs = get_catalog_kwargs(small_sky_order1_catalog, partition_size=partition_size, threshold=None)
+    kwargs = get_catalog_kwargs(
+        small_sky_order1_catalog,
+        partition_rows=partition_rows,
+        partition_bytes=None,
+    )
     catalog = lsdb.from_dataframe(small_sky_order1_df, margin_threshold=None, **kwargs)
     # Calculate size of dataframe per partition
-    partition_sizes = [len(partition_df) for partition_df in catalog._ddf.partitions]
-    assert all(size <= partition_size for size in partition_sizes)
+    partition_row_counts = [len(partition_df) for partition_df in catalog._ddf.partitions]
+    assert all(size <= partition_rows for size in partition_row_counts)


-def test_partitions_obey_threshold(small_sky_order1_df, small_sky_order1_catalog):
-    """Tests that partitions are limited by the specified threshold"""
-    threshold = 50
+def test_partitions_obey_partition_bytes(small_sky_order1_df, small_sky_order1_catalog):
+    """Tests that partitions are limited by the specified size in bytes"""
+    # Use partitions of at most approximately 1 KiB
+    partition_bytes = 1 << 10  # 1 KiB
     # Read CSV file for the small sky order 1 catalog
-    kwargs = get_catalog_kwargs(small_sky_order1_catalog, partition_size=None, threshold=threshold)
+    kwargs = get_catalog_kwargs(
+        small_sky_order1_catalog,
+        partition_rows=None,
+        partition_bytes=partition_bytes,
+    )
     catalog = lsdb.from_dataframe(small_sky_order1_df, margin_threshold=None, **kwargs)
-    # Calculate number of pixels per partition
-    num_partition_pixels = [len(partition_df.compute().index) for partition_df in catalog._ddf.partitions]
-    assert all(num_pixels <= threshold for num_pixels in num_partition_pixels)
+    # Calculate size of dataframe per partition
+    for partition_df in catalog._ddf.partitions:
+        partition_memory = partition_df.memory_usage(deep=True).sum().compute()
+        assert partition_memory <= partition_bytes


 def test_from_dataframe_large_input(small_sky_order1_catalog, helpers):
@@ -186,16 +210,16 @@ def test_from_dataframe_large_input(small_sky_order1_catalog, helpers):
     helpers.assert_divisions_are_correct(catalog)


-def test_partitions_obey_default_threshold_when_no_arguments_specified(
+def test_partitions_obey_default_partition_rows_when_no_arguments_specified(
     small_sky_order1_df, small_sky_order1_catalog
 ):
-    """Tests that partitions are limited by the default threshold
-    when no partition size or threshold is specified"""
+    """Tests that partitions are limited by the default partition size
+    when no partition_rows or partition_bytes is specified"""
     df_total_memory = small_sky_order1_df.memory_usage(deep=True).sum()
     partition_memory = df_total_memory / len(small_sky_order1_df)
     default_threshold = math.ceil((1 << 30) / partition_memory)
     # Read CSV file for the small sky order 1 catalog
-    kwargs = get_catalog_kwargs(small_sky_order1_catalog, threshold=None, partition_size=None)
+    kwargs = get_catalog_kwargs(small_sky_order1_catalog, partition_rows=None, partition_bytes=None)
     catalog = lsdb.from_dataframe(small_sky_order1_df, margin_threshold=None, **kwargs)
     # Calculate number of pixels per partition
     num_partition_pixels = [len(partition_df.compute().index) for partition_df in catalog._ddf.partitions]
@@ -211,7 +235,7 @@ def test_catalog_pixels_nested_ordering(small_sky_source_df):
         catalog_type="source",
         lowest_order=0,
         highest_order=2,
-        threshold=3_000,
+        partition_rows=3_000,
         margin_threshold=None,
         ra_column="source_ra",
         dec_column="source_dec",
@@ -237,7 +261,7 @@ def test_from_dataframe_small_sky_source_with_margins(
         dec_column="source_dec",
         lowest_order=0,
         highest_order=2,
-        threshold=3000,
+        partition_rows=3000,
         margin_threshold=180,
         margin_order=8,
         **kwargs,
@@ -275,7 +299,7 @@ def test_from_dataframe_small_sky_source_with_margins(


 def test_from_dataframe_margin_threshold_from_order(small_sky_source_df, helpers):
-    # By default, the threshold is set to 5 arcsec, triggering a warning
+    # By default, the margin threshold is set to 5 arcsec, triggering a warning
     with pytest.warns(RuntimeWarning, match="Ignoring margin_threshold"):
         catalog = lsdb.from_dataframe(
             small_sky_source_df,
@@ -283,7 +307,7 @@ def test_from_dataframe_margin_threshold_from_order(small_sky_source_df, helpers
             dec_column="source_dec",
             lowest_order=0,
             highest_order=2,
-            threshold=3000,
+            partition_rows=3000,
             margin_order=3,
         )
     assert len(catalog.margin.get_healpix_pixels()) == 19
@@ -322,7 +346,7 @@ def test_from_dataframe_margin_is_empty(small_sky_order1_df, helpers):
         catalog_name="small_sky_order1",
         catalog_type="object",
         highest_order=5,
-        threshold=100,
+        partition_rows=100,
     )
     assert len(catalog.margin.get_healpix_pixels()) == 0
     assert catalog.margin._ddf_pixel_map == {}
@@ -339,7 +363,7 @@ def test_from_dataframe_margin_threshold_zero(small_sky_order1_df, helpers):
         catalog_name="small_sky_order1",
         catalog_type="object",
         highest_order=5,
-        threshold=100,
+        partition_rows=100,
         margin_threshold=0,
     )
     assert len(catalog.margin.get_healpix_pixels()) == 0
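
Usage note (editor's sketch, not part of the diff): the hunks above make partition_rows and partition_bytes mutually exclusive and reject the old threshold/partition_size keywords, so a caller now picks exactly one limit. A minimal sketch of the new call pattern follows; the DataFrame, its "ra"/"dec" column names, and the coordinate values are invented for illustration.

import pandas as pd
import lsdb

# Toy input; real catalogs would be far larger (hats-import is recommended past ~1 GiB).
df = pd.DataFrame({"ra": [10.0, 10.2, 150.5], "dec": [-45.0, -44.8, 2.1]})

# Cap each partition at a maximum number of rows...
by_rows = lsdb.from_dataframe(df, ra_column="ra", dec_column="dec", partition_rows=2, margin_threshold=None)

# ...or at a maximum in-memory size in bytes (roughly 1 GiB is used when neither limit is given).
by_bytes = lsdb.from_dataframe(df, ra_column="ra", dec_column="dec", partition_bytes=1 << 20, margin_threshold=None)

# Passing both limits, the removed 'threshold'/'partition_size' keywords, or
# 'hats_max_rows'/'hats_max_bytes' directly, raises ValueError.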