Commit 3714254

Add memory-size partitioning to from_dataframe; change partition size params to partition_rows and partition_bytes (#1181)

Change from_dataframe arguments:
* Rename threshold to partition_rows (a simple row-count maximum)
* Add partition_bytes parameter (allows bytewise, memory-size partitioning)
* Remove partition_size (no more recalculating the requested size into an estimated per-pixel row count)
1 parent c745c93 commit 3714254
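
For orientation, a minimal sketch of the new call signature after this change (the toy DataFrame and its values are hypothetical; only one of the two caps may be given per call):

    import lsdb
    import pandas as pd

    # Hypothetical toy data; any DataFrame with ra/dec columns works,
    # since from_dataframe auto-detects those column names.
    df = pd.DataFrame({"ra": [10.0, 10.1, 10.2], "dec": [-30.0, -30.1, -30.2]})

    # Cap each partition at 1,000 rows...
    by_rows = lsdb.from_dataframe(df, partition_rows=1_000)

    # ...or cap each partition at 100 MiB of estimated in-memory size.
    by_bytes = lsdb.from_dataframe(df, partition_bytes=100 * 1024 * 1024)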

7 files changed (+167 -75 lines)

benchmarks/benchmarks.py

Lines changed: 1 addition & 1 deletion
@@ -123,7 +123,7 @@ def time_save_big_catalog():
         "catalog_type": "object",
         "lowest_order": 6,
         "highest_order": 10,
-        "threshold": 500,
+        "partition_rows": 500,
     }
 
     catalog = lsdb.from_dataframe(mock_partition_df, margin_threshold=None, **kwargs)

docs/tutorials/import_catalogs.ipynb

Lines changed: 1 addition & 1 deletion
@@ -115,7 +115,7 @@
     " catalog_type=\"object\",\n",
     " lowest_order=2,\n",
     " highest_order=5,\n",
-    " threshold=100,\n",
+    " partition_rows=100,\n",
     ")\n",
     "\n",
     "# Save it to disk in HATS format\n",

src/lsdb/loaders/dataframe/dataframe_catalog_loader.py

Lines changed: 98 additions & 32 deletions
@@ -1,18 +1,20 @@
 from __future__ import annotations
 
-import math
 import re
 import warnings
 
 import astropy.units as u
 import hats as hc
+import hats.pixel_math.healpix_shim as hp
 import nested_pandas as npd
 import numpy as np
 import pandas as pd
 import pyarrow as pa
 from hats.catalog import CatalogType, TableProperties
+from hats.io.size_estimates import get_mem_size_per_row
 from hats.pixel_math import HealpixPixel, generate_histogram
 from hats.pixel_math.healpix_pixel_function import get_pixel_argsort
+from hats.pixel_math.sparse_histogram import supplemental_count_histogram
 from hats.pixel_math.spatial_index import (
     SPATIAL_INDEX_COLUMN,
     SPATIAL_INDEX_ORDER,
@@ -44,8 +46,8 @@ def __init__(
         lowest_order: int = 0,
         highest_order: int = 7,
         drop_empty_siblings: bool = False,
-        partition_size: int | None = None,
-        threshold: int | None = None,
+        partition_rows: int | None = None,
+        partition_bytes: int | None = None,
         should_generate_moc: bool = True,
         moc_max_order: int = 10,
         use_pyarrow_types: bool = True,
@@ -71,10 +73,12 @@ def __init__(
         drop_empty_siblings : bool, default False
             When determining final partitioning, if 3 of 4 pixels are empty,
             keep only the non-empty pixel
-        partition_size : int or None, default None
-            The desired partition size, in number of rows.
-        threshold : int or None, default None
-            The maximum number of data points per pixel.
+        partition_rows : int or None, default None
+            The maximum partition size, in number of rows. If specified,
+            'partition_bytes' must be None.
+        partition_bytes : int or None, default None
+            The maximum partition size, in bytes. If specified,
+            'partition_rows' must be None.
         should_generate_moc : bool, default True
             Should we generate a MOC (multi-order coverage map) of the data.
             It can improve performance when joining/crossmatching to other hats-sharded datasets.
@@ -92,7 +96,7 @@ def __init__(
         self.lowest_order = lowest_order
         self.highest_order = highest_order
         self.drop_empty_siblings = drop_empty_siblings
-        self.threshold = self._calculate_threshold(partition_size, threshold)
+        self.partition_rows, self.partition_bytes = self._calculate_threshold(partition_rows, partition_bytes)
 
         if ra_column is None:
             ra_column = self._find_column("ra")
@@ -103,11 +107,31 @@ def __init__(
         if dataframe[ra_column].isna().any() or dataframe[dec_column].isna().any():
             raise ValueError(f"NaN values found in {ra_column}/{dec_column} columns")
 
+        # Don't allow deprecated arguments.
+        for deprecated_arg in ["threshold", "partition_size"]:
+            if deprecated_arg in kwargs:
+                raise ValueError(
+                    f"'{deprecated_arg}' is deprecated; use 'partition_rows' or 'partition_bytes' instead."
+                )
+
+        # Don't allow users to specify hats_max_rows or hats_max_bytes directly.
+        for invalid_arg in ["hats_max_rows", "hats_max_bytes"]:
+            if invalid_arg in kwargs:
+                raise ValueError(
+                    f"{invalid_arg} should not be provided in kwargs; "
+                    "use 'partition_rows' or 'partition_bytes' instead"
+                )
+
+        # Set partitioning kwarg to pass to catalog info creation.
+        if self.partition_rows is not None:
+            kwargs = dict(kwargs, hats_max_rows=self.partition_rows)
+        elif self.partition_bytes is not None:
+            kwargs = dict(kwargs, hats_max_bytes=self.partition_bytes)
+
         self.catalog_info = self._create_catalog_info(
             ra_column=ra_column,
             dec_column=dec_column,
             total_rows=len(self.dataframe),
-            hats_max_rows=self.threshold,
             **kwargs,
         )
         self.should_generate_moc = should_generate_moc
@@ -130,28 +154,50 @@ def _find_column(self, search_term: str) -> str:
             raise ValueError(f"Found {n_matches} possible columns for {search_term}")
         return matches[0]
 
-    def _calculate_threshold(self, partition_size: int | None = None, threshold: int | None = None) -> int:
-        """Calculates the number of points per HEALPix pixel (threshold) for the desired partition size."""
+    def _calculate_threshold(
+        self,
+        partition_rows: int | None = None,
+        partition_bytes: int | None = None,
+    ) -> tuple[int | None, int | None]:
+        """Verifies partition thresholds and sets a default if necessary.
+
+        If partition_rows is provided, no partition exceeds the maximum number of rows;
+        if partition_bytes is provided, no partition exceeds the maximum memory size.
+
+        If neither partition_rows nor partition_bytes is provided, a default
+        partition size of approximately 1 GiB is used.
+
+        Parameters
+        ----------
+        partition_rows : int or None, default None
+            The desired partition size maximum, in number of rows.
+        partition_bytes : int or None, default None
+            The desired partition size maximum, in bytes.
+
+        Returns
+        -------
+        tuple[int or None, int or None]
+            The validated partition_rows and partition_bytes.
+
+        Raises
+        ------
+        ValueError
+            If both partition_rows and partition_bytes are specified.
+        """
         self.df_total_memory = self.dataframe.memory_usage(deep=True).sum()
         if self.df_total_memory > (1 << 30) or len(self.dataframe) > 1_000_000:
             warnings.warn(
                 "from_dataframe is not intended for large datasets. "
                 "Consider using hats-import: https://hats-import.readthedocs.io/",
                 RuntimeWarning,
             )
-        if threshold is not None and partition_size is not None:
-            raise ValueError("Specify only one: threshold or partition_size")
-        if threshold is None:
-            if partition_size is not None:
-                # Round the number of partitions to the next integer, otherwise the
-                # number of pixels per partition may exceed the threshold
-                num_partitions = math.ceil(len(self.dataframe) / partition_size)
-                threshold = len(self.dataframe) // num_partitions
-            else:
-                # Each partition in memory will be of roughly 1Gib
-                partition_memory = self.df_total_memory / len(self.dataframe)
-                threshold = math.ceil((1 << 30) / partition_memory)
-        return threshold
+
+        if partition_rows is not None and partition_bytes is not None:
+            raise ValueError("Specify only one: partition_rows or partition_bytes")
+        if partition_rows is None and partition_bytes is None:
+            # Default to 1 GiB partitions
+            partition_bytes = 1 << 30
+        return partition_rows, partition_bytes
 
     def _create_catalog_info(
         self,
@@ -238,18 +284,38 @@ def _compute_pixel_list(self) -> list[HealpixPixel]:
         list[HealpixPixel]
             HEALPix pixels for the final partitioning.
         """
-        raw_histogram = generate_histogram(
-            self.dataframe,
-            highest_order=self.highest_order,
-            ra_column=self.catalog_info.ra_column,
-            dec_column=self.catalog_info.dec_column,
-        )
+        # Generate histograms.
+        if self.partition_rows is not None:
+            row_count_histogram = generate_histogram(
+                self.dataframe,
+                highest_order=self.highest_order,
+                ra_column=self.catalog_info.ra_column,
+                dec_column=self.catalog_info.dec_column,
+            )
+            mem_size_histogram = None
+        else:
+            row_mem_sizes = get_mem_size_per_row(self.dataframe)
+            mapped_pixels = hp.radec2pix(
+                self.highest_order,
+                self.dataframe[self.catalog_info.ra_column].values,
+                self.dataframe[self.catalog_info.dec_column].values,
+            )
+            (row_count_sparse_histo, mem_size_sparse_histo) = supplemental_count_histogram(
+                mapped_pixels,
+                row_mem_sizes,
+                highest_order=self.highest_order,
+            )
+            row_count_histogram = row_count_sparse_histo.to_array()
+            mem_size_histogram = mem_size_sparse_histo.to_array()
 
+        # Generate alignment.
         alignment = hc.pixel_math.generate_alignment(
-            raw_histogram,
+            row_count_histogram,
             highest_order=self.highest_order,
             lowest_order=self.lowest_order,
-            threshold=self.threshold,
+            threshold=(self.partition_rows if self.partition_rows is not None else self.partition_bytes),
             drop_empty_siblings=self.drop_empty_siblings,
+            mem_size_histogram=mem_size_histogram,
         )
         pixel_list = list({HealpixPixel(tup[0], tup[1]) for tup in alignment if not tup is None})
         return list(np.array(pixel_list)[get_pixel_argsort(pixel_list)])
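
In the bytewise path above, hats supplies the per-row size estimates (get_mem_size_per_row) and the paired sparse histograms (supplemental_count_histogram). A simplified sketch of the underlying bucketing idea in plain numpy, with hypothetical stand-in values for the per-row pixels and sizes (this is not the hats implementation):

    import numpy as np

    # Stand-ins: HEALPix pixel index of each row at the highest order, and
    # the estimated in-memory size of each row, in bytes.
    pixel_of_row = np.array([4, 4, 7, 7, 7, 9])
    bytes_per_row = np.array([120.0, 80.0, 200.0, 150.0, 150.0, 90.0])

    n_pixels = 12 * 4**1  # 12 * 4**order pixels at a given order; toy order of 1

    # Row-count histogram: how many rows land in each pixel.
    row_count_histogram = np.bincount(pixel_of_row, minlength=n_pixels)

    # Memory-size histogram: total estimated bytes per pixel; the alignment
    # can then keep splitting pixels whose byte total exceeds the threshold.
    mem_size_histogram = np.bincount(pixel_of_row, weights=bytes_per_row, minlength=n_pixels)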

src/lsdb/loaders/dataframe/from_dataframe.py

Lines changed: 10 additions & 8 deletions
@@ -17,8 +17,8 @@ def from_dataframe(
     lowest_order: int = 0,
     highest_order: int = 7,
     drop_empty_siblings: bool = True,
-    partition_size: int | None = None,
-    threshold: int | None = None,
+    partition_rows: int | None = None,
+    partition_bytes: int | None = None,
     margin_order: int = -1,
     margin_threshold: float | None = 5.0,
     should_generate_moc: bool = True,
@@ -50,10 +50,12 @@ def from_dataframe(
     drop_empty_siblings : bool, default True
         When determining final partitioning, if 3 of 4 pixels are empty,
         keep only the non-empty pixel
-    partition_size : int or None, default None
-        The desired partition size, in number of rows.
-    threshold : int or None, default None
-        The maximum number of data points per pixel.
+    partition_rows : int or None, default None
+        The desired partition size, in number of rows. Only one of
+        `partition_rows` or `partition_bytes` should be specified.
+    partition_bytes : int or None, default None
+        The desired partition size, in bytes. Only one of
+        `partition_rows` or `partition_bytes` should be specified.
     margin_order : int, default -1
         The order at which to generate the margin cache.
     margin_threshold : float or None, default 5
@@ -91,8 +93,8 @@ def from_dataframe(
         lowest_order=lowest_order,
         highest_order=highest_order,
         drop_empty_siblings=drop_empty_siblings,
-        partition_size=partition_size,
-        threshold=threshold,
+        partition_rows=partition_rows,
+        partition_bytes=partition_bytes,
         should_generate_moc=should_generate_moc,
         moc_max_order=moc_max_order,
         use_pyarrow_types=use_pyarrow_types,
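
Given the loader-side validation above, migrating callers should hit clear errors rather than silent behavior changes. A hedged sketch of the expected failure modes, assuming extra keywords flow through from_dataframe to the loader (the one-row DataFrame is hypothetical):

    import lsdb
    import pandas as pd

    df = pd.DataFrame({"ra": [45.0], "dec": [0.5]})

    try:
        # Old keyword: rejected with a migration hint.
        lsdb.from_dataframe(df, threshold=500)
    except ValueError as err:
        print(err)  # 'threshold' is deprecated; use 'partition_rows' or 'partition_bytes' instead.

    try:
        # Only one partition cap may be given at a time.
        lsdb.from_dataframe(df, partition_rows=500, partition_bytes=1 << 20)
    except ValueError as err:
        print(err)  # Specify only one: partition_rows or partition_bytes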

tests/lsdb/catalog/test_catalog.py

Lines changed: 1 addition & 1 deletion
@@ -921,7 +921,7 @@ def divme(df, _=None):
         return 1 / df["a"]
 
     # Force every row into a separate partition
-    nfc = lsdb.from_dataframe(nf, ra_column="a", dec_column="b", partition_size=1)
+    nfc = lsdb.from_dataframe(nf, ra_column="a", dec_column="b", partition_rows=1)
 
     with pytest.raises(RuntimeError, match=r"function divme to partition 3: Not so fast"):
         nfc.map_partitions(divme, include_pixel=False).compute()

tests/lsdb/io/test_to_hats.py

Lines changed: 1 addition & 1 deletion
@@ -163,7 +163,7 @@ def test_save_catalog_point_map(small_sky_order1_df, tmp_path):
         catalog_name="small_sky_order1",
         lowest_order=6,
         highest_order=8,
-        threshold=500,
+        partition_rows=500,
     )
 
     small_sky_order1_catalog.write_catalog(
