
Commit feec123

sc
Signed-off-by: Praateek <[email protected]>
1 parent dbf9a52 commit feec123

File tree: 2 files changed (+4 lines, -5 lines)


nemo_curator/stages/deduplication/semantic/identify_duplicates.py

Lines changed: 2 additions & 1 deletion
@@ -89,7 +89,8 @@ def process_batch(self, tasks: list[FileGroupTask]) -> list[FileGroupTask]:
 
         all_files = [file for task in tasks for file in task.data]
         # Read using filters
-
+        # We read file by file since list[files] when files are remote urls can fail
+        # See https://github.com/pandas-dev/pandas/issues/62922
         df: pd.DataFrame = pd.concat(
            [
                pd.read_parquet(
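
For readers unfamiliar with the pattern the new comment describes, here is a minimal, self-contained sketch of reading parquet files one at a time and concatenating the results, instead of passing the whole list of paths to pd.read_parquet at once (the failure mode with lists of remote URLs noted in the linked pandas issue). The directory name, column names, and similarity threshold are invented for illustration and are not the stage's actual schema.

import os
import pandas as pd

# Toy data so the sketch runs locally; in the real stage these would be the
# task's (possibly remote) parquet files.
os.makedirs("dup_scores", exist_ok=True)
pd.DataFrame({"id": [1, 2], "cosine_sim_score": [0.91, 0.42]}).to_parquet("dup_scores/part-0.parquet")
pd.DataFrame({"id": [3, 4], "cosine_sim_score": [0.88, 0.10]}).to_parquet("dup_scores/part-1.parquet")

all_files = ["dup_scores/part-0.parquet", "dup_scores/part-1.parquet"]
filters = [("cosine_sim_score", ">=", 0.8)]  # hypothetical threshold

# Read each file individually, applying the filter at read time, then concat.
df = pd.concat(
    [pd.read_parquet(f, columns=["id", "cosine_sim_score"], filters=filters) for f in all_files],
    ignore_index=True,
)
print(df)  # only the rows passing the similarity filter remain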

nemo_curator/stages/text/deduplication/removal.py

Lines changed: 2 additions & 4 deletions
@@ -76,9 +76,7 @@ def process(self, task: DocumentBatch) -> DocumentBatch:
         # Filter the parquet files for IDs to remove within this range
         read_dupes_t0 = time.perf_counter()
 
-        # we use pq.read_table instead of pd.read_parquet since ids_to_remove_path is a directory
-        # and it might error out when the directory is a cloud path
-        removal_table = pd.read_parquet(
+        removal_df = pd.read_parquet(
             self.ids_to_remove_path,
             filters=[(self.duplicate_id_field, ">=", min_id), (self.duplicate_id_field, "<=", max_id)],
             columns=[self.duplicate_id_field],
@@ -89,7 +87,7 @@ def process(self, task: DocumentBatch) -> DocumentBatch:
8987

9088
# Filter out documents with IDs in the removal set using pandas
9189
time_to_remove_t0 = time.perf_counter()
92-
removal_ids = set(removal_table[self.duplicate_id_field].tolist())
90+
removal_ids = set(removal_df[self.duplicate_id_field].tolist())
9391
df = df[~df[self.id_field].isin(removal_ids)]
9492
removal_ids_time = time.perf_counter() - time_to_remove_t0
9593
self._log_metrics(
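
The surviving code path in these hunks reads the removal IDs with a range filter pushed down to the parquet reader, then drops matching documents. A rough, self-contained sketch of that pattern follows; the directory layout, column name, and ID range are hypothetical stand-ins for the stage's configuration.

import os
import pandas as pd

# Toy setup: a small "documents" frame and a directory of parquet files
# holding the IDs marked for removal.
os.makedirs("ids_to_remove", exist_ok=True)
pd.DataFrame({"doc_id": [1_500, 1_800]}).to_parquet("ids_to_remove/part-0.parquet")

df = pd.DataFrame({"doc_id": [5, 1_500, 1_800, 3_000], "text": ["a", "b", "c", "d"]})
min_id, max_id = 1_000, 2_000  # hypothetical ID range for this task

# Read only the ID column from the directory, pushing the range filter down to
# the parquet reader so row groups outside [min_id, max_id] can be skipped.
removal_df = pd.read_parquet(
    "ids_to_remove",
    filters=[("doc_id", ">=", min_id), ("doc_id", "<=", max_id)],
    columns=["doc_id"],
)

# Drop every document whose ID appears in the removal set.
removal_ids = set(removal_df["doc_id"].tolist())
df = df[~df["doc_id"].isin(removal_ids)]
print(df)  # rows 1_500 and 1_800 are gone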
