Commit 92a5608

[FSTORE-1863] Delta insert triggers FeatureStoreException: 'numTargetRowsInserted' (#665) (#672)
1 parent: 324320a

File tree

2 files changed (+274, −25 lines)

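Context for the change: before this commit, _get_last_commit_metadata read the latest history entry's metrics unconditionally, indexing operation_metrics["numTargetRowsInserted"] for every commit past version 0. A plain Delta WRITE (as produced by an insert) only reports metrics such as numOutputRows, so the lookup raised a KeyError that surfaced as the FeatureStoreException named in the commit title. The new code instead filters the table history down to data-changing operations (MERGE and WRITE), picks the oldest and latest of those by version, reads each metric with .get() and a default of 0, and returns None when no data-changing commit exists.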

python/hsfs/core/delta_engine.py

Lines changed: 50 additions & 25 deletions
@@ -170,42 +170,67 @@ def _generate_merge_query(self, source_alias, updates_alias):
 
     @staticmethod
     def _get_last_commit_metadata(spark_context, base_path):
+        """
+        Retrieve oldest and last data-changing commits (MERGE/WRITE) from a Delta table.
+        Uses shared filtering logic for both Spark and delta-rs.
+        """
+        data_ops = ["MERGE", "WRITE"]
+
+        # --- Get commit history ---
         fg_source_table = DeltaTable.forPath(spark_context, base_path)
+        history = fg_source_table.history()
+        history_records = [r.asDict() for r in history.collect()]
+
+        if not history_records:
+            return None
+
+        # --- Shared logic below ---
+        filtered = [c for c in history_records if c.get("operation") in data_ops]
+        if not filtered:
+            return None
+
+        # oldest = smallest version, latest = largest version
+        oldest_commit = min(filtered, key=lambda c: c["version"])
+        last_commit = max(filtered, key=lambda c: c["version"])
+
+        return DeltaEngine._get_delta_feature_group_commit(last_commit, oldest_commit)
 
-        # Get info about the latest commit
-        last_commit = fg_source_table.history(1).first().asDict()
-        version = last_commit["version"]
+    @staticmethod
+    def _get_delta_feature_group_commit(last_commit, oldest_commit):
+        # Extract info about the latest commit
+        operation = last_commit["operation"]
         commit_timestamp = util.convert_event_time_to_timestamp(
             last_commit["timestamp"]
         )
         commit_date_string = util.get_hudi_datestr_from_timestamp(commit_timestamp)
         operation_metrics = last_commit["operationMetrics"]
 
-        # Get info about the oldest remaining commit
-        oldest_commit = fg_source_table.history().orderBy("version").first().asDict()
+        # Extract info about the oldest remaining commit
         oldest_commit_timestamp = util.convert_event_time_to_timestamp(
             oldest_commit["timestamp"]
         )
 
-        if version == 0:
-            fg_commit = feature_group_commit.FeatureGroupCommit(
-                commitid=None,
-                commit_date_string=commit_date_string,
-                commit_time=commit_timestamp,
-                rows_inserted=operation_metrics["numOutputRows"],
-                rows_updated=0,
-                rows_deleted=0,
-                last_active_commit_time=oldest_commit_timestamp,
-            )
-        else:
-            fg_commit = feature_group_commit.FeatureGroupCommit(
-                commitid=None,
-                commit_date_string=commit_date_string,
-                commit_time=commit_timestamp,
-                rows_inserted=operation_metrics["numTargetRowsInserted"],
-                rows_updated=operation_metrics["numTargetRowsUpdated"],
-                rows_deleted=operation_metrics["numTargetRowsDeleted"],
-                last_active_commit_time=oldest_commit_timestamp,
-            )
+        # Default all to zero
+        rows_inserted = 0
+        rows_updated = 0
+        rows_deleted = 0
+
+        # Depending on operation, set the relevant metrics
+        if operation == "WRITE":
+            rows_inserted = operation_metrics.get("numOutputRows", 0)
+        elif operation == "MERGE":
+            rows_inserted = operation_metrics.get("numTargetRowsInserted", 0)
+            rows_updated = operation_metrics.get("numTargetRowsUpdated", 0)
+            rows_deleted = operation_metrics.get("numTargetRowsDeleted", 0)
+
+        fg_commit = feature_group_commit.FeatureGroupCommit(
+            commitid=None,
+            commit_date_string=commit_date_string,
+            commit_time=commit_timestamp,
+            rows_inserted=rows_inserted,
+            rows_updated=rows_updated,
+            rows_deleted=rows_deleted,
+            last_active_commit_time=oldest_commit_timestamp,
+        )
 
         return fg_commit
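
To make the failure mode concrete: Delta reports different operationMetrics keys per operation, and the pre-fix code indexed the MERGE-specific key for any commit past version 0. The sketch below is illustrative only, not part of the commit; the sample metrics dict is an assumption modeled on the keys Delta Lake emits for WRITE commits, while the access pattern mirrors the diff above.

    # Minimal sketch, assuming Delta-style operationMetrics for a WRITE commit.
    write_commit = {
        "operation": "WRITE",
        "operationMetrics": {"numOutputRows": "10"},  # no numTargetRows* keys
    }
    metrics = write_commit["operationMetrics"]

    # Old code path: unconditional MERGE-key lookup raises KeyError, which
    # surfaced as FeatureStoreException: 'numTargetRowsInserted'.
    try:
        rows_inserted = metrics["numTargetRowsInserted"]
    except KeyError as err:
        print(f"old lookup fails: KeyError {err}")

    # New code path: branch on the operation and default missing metrics to 0.
    rows_inserted = rows_updated = rows_deleted = 0
    if write_commit["operation"] == "WRITE":
        rows_inserted = metrics.get("numOutputRows", 0)
    elif write_commit["operation"] == "MERGE":
        rows_inserted = metrics.get("numTargetRowsInserted", 0)
        rows_updated = metrics.get("numTargetRowsUpdated", 0)
        rows_deleted = metrics.get("numTargetRowsDeleted", 0)

    print(rows_inserted, rows_updated, rows_deleted)  # 10 0 0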
New test file · Lines changed: 224 additions & 0 deletions
@@ -0,0 +1,224 @@
#
# Copyright 2025 Hopsworks AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from hsfs.core.delta_engine import DeltaEngine
from hsfs.feature_group_commit import FeatureGroupCommit


class TestDeltaEngine:

    def test_get_last_commit_metadata_spark(self, mocker):
        # Arrange
        mock_history_data = [
            {"version": 1, "operation": "WRITE", "timestamp": "2024-01-01T00:00:00Z"},
            {"version": 2, "operation": "MERGE", "timestamp": "2024-01-02T00:00:00Z"},
            {"version": 3, "operation": "OPTIMIZE", "timestamp": "2024-01-03T00:00:00Z"},
        ]

        # Create fake Rows with asDict()
        mock_rows = [mocker.MagicMock(asDict=lambda row=row: row) for row in mock_history_data]

        # Mock Spark DataFrame
        mock_spark_df = mocker.MagicMock()
        mock_spark_df.collect.return_value = mock_rows

        # Mock DeltaTable
        mock_delta_table = mocker.MagicMock()
        mock_delta_table.history.return_value = mock_spark_df

        mocker_get_delta_feature_group_commit = mocker.patch(
            "hsfs.core.delta_engine.DeltaEngine._get_delta_feature_group_commit",
            return_value="result",
        )

        # Patch DeltaTable
        mocker.patch("delta.tables.DeltaTable.forPath", return_value=mock_delta_table)

        # Act
        result = DeltaEngine._get_last_commit_metadata(mocker.MagicMock(), "s3://some/path")

        # Assert
        assert result == "result"
        mocker_get_delta_feature_group_commit.assert_called_once_with(
            mock_history_data[1], mock_history_data[0]
        )

    def test_get_last_commit_metadata_empty_history(self, mocker):
        # Arrange
        mock_history_data = []

        # Create fake Rows with asDict()
        mock_rows = [mocker.MagicMock(asDict=lambda row=row: row) for row in mock_history_data]

        # Mock Spark DataFrame
        mock_spark_df = mocker.MagicMock()
        mock_spark_df.collect.return_value = mock_rows

        # Mock DeltaTable
        mock_delta_table = mocker.MagicMock()
        mock_delta_table.history.return_value = mock_spark_df

        mocker_get_delta_feature_group_commit = mocker.patch(
            "hsfs.core.delta_engine.DeltaEngine._get_delta_feature_group_commit",
            return_value="result",
        )

        # Patch DeltaTable
        mocker.patch("delta.tables.DeltaTable.forPath", return_value=mock_delta_table)

        # Act
        result = DeltaEngine._get_last_commit_metadata(None, "s3://some/path")

        # Assert
        assert result is None
        mocker_get_delta_feature_group_commit.assert_not_called()

    def test_get_last_commit_metadata_one_history_entry(self, mocker):
        # Arrange
        mock_history_data = [
            {"version": 1, "operation": "WRITE", "timestamp": "2024-01-01T00:00:00Z"},
        ]

        # Create fake Rows with asDict()
        mock_rows = [mocker.MagicMock(asDict=lambda row=row: row) for row in mock_history_data]

        # Mock Spark DataFrame
        mock_spark_df = mocker.MagicMock()
        mock_spark_df.collect.return_value = mock_rows

        # Mock DeltaTable
        mock_delta_table = mocker.MagicMock()
        mock_delta_table.history.return_value = mock_spark_df

        mocker_get_delta_feature_group_commit = mocker.patch(
            "hsfs.core.delta_engine.DeltaEngine._get_delta_feature_group_commit",
            return_value="result",
        )

        # Patch DeltaTable
        mocker.patch("delta.tables.DeltaTable.forPath", return_value=mock_delta_table)

        # Act
        result = DeltaEngine._get_last_commit_metadata(None, "s3://some/path")

        # Assert
        assert result == "result"
        mocker_get_delta_feature_group_commit.assert_called_once_with(
            mock_history_data[0], mock_history_data[0]
        )

    def test_get_last_commit_metadata_one_history_entry_optimize(self, mocker):
        # Arrange
        mock_history_data = [
            {"version": 1, "operation": "OPTIMIZE", "timestamp": "2024-01-01T00:00:00Z"},
        ]

        # Create fake Rows with asDict()
        mock_rows = [mocker.MagicMock(asDict=lambda row=row: row) for row in mock_history_data]

        # Mock Spark DataFrame
        mock_spark_df = mocker.MagicMock()
        mock_spark_df.collect.return_value = mock_rows

        # Mock DeltaTable
        mock_delta_table = mocker.MagicMock()
        mock_delta_table.history.return_value = mock_spark_df

        mocker_get_delta_feature_group_commit = mocker.patch(
            "hsfs.core.delta_engine.DeltaEngine._get_delta_feature_group_commit",
            return_value="result",
        )

        # Patch DeltaTable
        mocker.patch("delta.tables.DeltaTable.forPath", return_value=mock_delta_table)

        # Act
        result = DeltaEngine._get_last_commit_metadata(None, "s3://some/path")

        # Assert
        assert result is None
        mocker_get_delta_feature_group_commit.assert_not_called()

    def test_get_delta_feature_group_commit_merge(self, mocker):
        # Arrange
        last_commit = {
            "operation": "MERGE",
            "timestamp": "2024-01-02T12:00:00Z",
            "operationMetrics": {
                "numTargetRowsInserted": 10,
                "numTargetRowsUpdated": 5,
                "numTargetRowsDeleted": 2,
            },
        }
        oldest_commit = {
            "timestamp": "2024-01-01T08:00:00Z",
        }

        mocker.patch(
            "hsfs.core.delta_engine.util.convert_event_time_to_timestamp",
            side_effect=lambda ts: ts,
        )
        mocker.patch(
            "hsfs.core.delta_engine.util.get_hudi_datestr_from_timestamp",
            side_effect=lambda ts: f"date-{ts}",
        )

        # Act
        fg_commit = DeltaEngine._get_delta_feature_group_commit(last_commit, oldest_commit)

        # Assert
        assert isinstance(fg_commit, FeatureGroupCommit)
        assert fg_commit.commit_time == "2024-01-02T12:00:00Z"
        assert fg_commit.commit_date_string == "date-2024-01-02T12:00:00Z"
        assert fg_commit.rows_inserted == 10
        assert fg_commit.rows_updated == 5
        assert fg_commit.rows_deleted == 2
        assert fg_commit.last_active_commit_time == "2024-01-01T08:00:00Z"

    def test_get_delta_feature_group_commit_write(self, mocker):
        # Arrange
        last_commit = {
            "operation": "WRITE",
            "timestamp": "2024-01-02T12:00:00Z",
            "operationMetrics": {
                "numOutputRows": 10
            },
        }
        oldest_commit = {
            "timestamp": "2024-01-01T08:00:00Z",
        }

        mocker.patch(
            "hsfs.core.delta_engine.util.convert_event_time_to_timestamp",
            side_effect=lambda ts: ts,
        )
        mocker.patch(
            "hsfs.core.delta_engine.util.get_hudi_datestr_from_timestamp",
            side_effect=lambda ts: f"date-{ts}",
        )

        # Act
        fg_commit = DeltaEngine._get_delta_feature_group_commit(last_commit, oldest_commit)

        # Assert
        assert isinstance(fg_commit, FeatureGroupCommit)
        assert fg_commit.commit_time == "2024-01-02T12:00:00Z"
        assert fg_commit.commit_date_string == "date-2024-01-02T12:00:00Z"
        assert fg_commit.rows_inserted == 10
        assert fg_commit.rows_updated == 0
        assert fg_commit.rows_deleted == 0
        assert fg_commit.last_active_commit_time == "2024-01-01T08:00:00Z"

    def test_get_delta_feature_group_commit_other(self, mocker):
        # Arrange
        last_commit = {
            "operation": "OPTIMIZE",
            "timestamp": "2024-01-02T12:00:00Z",
            "operationMetrics": {},
        }
        oldest_commit = {
            "timestamp": "2024-01-01T08:00:00Z",
        }

        mocker.patch(
            "hsfs.core.delta_engine.util.convert_event_time_to_timestamp",
            side_effect=lambda ts: ts,
        )
        mocker.patch(
            "hsfs.core.delta_engine.util.get_hudi_datestr_from_timestamp",
            side_effect=lambda ts: f"date-{ts}",
        )

        # Act
        fg_commit = DeltaEngine._get_delta_feature_group_commit(last_commit, oldest_commit)

        # Assert
        assert isinstance(fg_commit, FeatureGroupCommit)
        assert fg_commit.commit_time == "2024-01-02T12:00:00Z"
        assert fg_commit.commit_date_string == "date-2024-01-02T12:00:00Z"
        assert fg_commit.rows_inserted == 0
        assert fg_commit.rows_updated == 0
        assert fg_commit.rows_deleted == 0
        assert fg_commit.last_active_commit_time == "2024-01-01T08:00:00Z"
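
These tests need only pytest plus the pytest-mock plugin (which provides the mocker fixture); all Spark and Delta interactions are mocked out. A typical invocation might look like the following, where the test file path is hypothetical since the new file's name is not shown above:

    pytest python/tests/core/test_delta_engine.py -k "delta_feature_group_commit"  # path assumed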
