Skip to content

Commit ae8a535

Browse files
authored
[FSTORE-1906] Improve online schema validation (#728)
1 parent 3026272 commit ae8a535

File tree

6 files changed

+298
-150
lines changed

6 files changed

+298
-150
lines changed

python/hsfs/core/feature_group_engine.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,11 @@ def save(
107107
)
108108

109109
if (
110-
feature_group.online_enabled
111-
and not feature_group.embedding_index
112-
and validation_options.get("online_schema_validation", True)
110+
not validation_options
111+
or (
112+
validation_options.get("online_schema_validation", True) # for backwards compatibility
113+
and validation_options.get("schema_validation", True)
114+
)
113115
):
114116
# validate df schema
115117
dataframe_features = DataFrameValidator().validate_schema(
@@ -190,9 +192,11 @@ def insert(
190192
)
191193

192194
if (
193-
feature_group.online_enabled
194-
and not feature_group.embedding_index
195-
and validation_options.get("online_schema_validation", True)
195+
not validation_options
196+
or (
197+
validation_options.get("online_schema_validation", True) # for backwards compatibility
198+
and validation_options.get("schema_validation", True)
199+
)
196200
):
197201
# validate df schema
198202
dataframe_features = DataFrameValidator().validate_schema(

python/hsfs/core/schema_validation.py

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,6 @@ def _raise_validation_error(self, errors):
6060

6161
def validate_schema(self, feature_group, df, df_features):
6262
"""Common validation rules"""
63-
if feature_group.online_enabled is False:
64-
logger.warning("Feature group is not online enabled. Skipping validation")
65-
return df_features
66-
if feature_group._embedding_index is not None:
67-
logger.warning("Feature group is embedding type. Skipping validation")
68-
return df_features
69-
7063
validator = self.get_validator(df)
7164
if validator is None:
7265
# If no validator is found for this type, skip validation and return df_features
@@ -251,33 +244,57 @@ def _validate_df_specifics(self, feature_group, df):
251244
is_pk_null = False
252245
is_string_length_exceeded = False
253246

254-
# Check for null values in primary key columns
255-
for pk in feature_group.primary_key:
256-
if df.filter(df[pk].isNull()).count() > 0:
247+
# Collect string columns and primary key columns
248+
string_cols = [
249+
f.name for f in df.schema.fields if isinstance(f.dataType, StringType)
250+
]
251+
pk_cols = list(feature_group.primary_key) if feature_group.primary_key else []
252+
253+
# Build a single aggregation expression list:
254+
# - sum(when(col.isNull(),1).otherwise(0)) for pk null counts
255+
# - max(length(col)) for string max lengths
256+
agg_exprs = []
257+
for pk in pk_cols:
258+
agg_exprs.append(
259+
sf.sum(sf.when(sf.col(pk).isNull(), 1).otherwise(0)).alias(
260+
f"__nulls_{pk}"
261+
)
262+
)
263+
for col in string_cols:
264+
agg_exprs.append(sf.max(sf.length(sf.col(col))).alias(f"__maxlen_{col}"))
265+
266+
# If there is nothing to compute, return early
267+
if not agg_exprs:
268+
return errors, column_lengths, is_pk_null, is_string_length_exceeded
269+
270+
# Execute a single aggregation job
271+
agg_row = df.agg(*agg_exprs).collect()[0]
272+
273+
# Evaluate primary key nulls
274+
for pk in pk_cols:
275+
null_count = agg_row[f"__nulls_{pk}"]
276+
if null_count is not None and int(null_count) > 0:
257277
errors[pk] = f"Primary key column {pk} contains null values."
258278
is_pk_null = True
259279

260-
# Check string lengths for string columns
261-
for field in df.schema.fields:
262-
if isinstance(field.dataType, StringType):
263-
col = field.name
264-
# Compute max length - PySpark specific way
265-
currentmax_row = df.select(sf.max(sf.length(col))).collect()[0][0]
266-
currentmax = 0 if currentmax_row is None else currentmax_row
267-
268-
col_max_len = (
269-
self.get_online_varchar_length(
270-
self.get_feature_from_list(col, feature_group.features)
271-
)
272-
if feature_group.features
273-
else 100
280+
# Evaluate string length violations
281+
for col in string_cols:
282+
currentmax = agg_row[f"__maxlen_{col}"]
283+
currentmax = 0 if currentmax is None else int(currentmax)
284+
285+
col_max_len = (
286+
self.get_online_varchar_length(
287+
self.get_feature_from_list(col, feature_group.features)
274288
)
289+
if feature_group.features
290+
else 100
291+
)
275292

276-
if col_max_len is not None and currentmax > col_max_len:
277-
errors[col] = (
278-
f"String length exceeded. Column {col} has string values longer than maximum column limit of {col_max_len} characters."
279-
)
280-
column_lengths[col] = currentmax
281-
is_string_length_exceeded = True
293+
if col_max_len is not None and currentmax > col_max_len:
294+
errors[col] = (
295+
f"String length exceeded. Column {col} has string values longer than maximum column limit of {col_max_len} characters."
296+
)
297+
column_lengths[col] = currentmax
298+
is_string_length_exceeded = True
282299

283300
return errors, column_lengths, is_pk_null, is_string_length_exceeded

python/hsfs/feature_group.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2911,7 +2911,7 @@ def save(
29112911
* key `run_validation` boolean value, set to `False` to skip validation temporarily on ingestion.
29122912
* key `save_report` boolean value, set to `False` to skip upload of the validation report to Hopsworks.
29132913
* key `ge_validate_kwargs` a dictionary containing kwargs for the validate method of Great Expectations.
2914-
* key `online_schema_validation` boolean value, set to `True` to validate the schema for online ingestion.
2914+
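* key `schema_validation` boolean value, defaults to `True`; set to `False` to skip validating the dataframe schema. The legacy key `online_schema_validation` is still honored for backwards compatibility.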
29152915
wait: Wait for job and online ingestion to finish before returning, defaults to `False`.
29162916
Shortcut for write_options `{"wait_for_job": False, "wait_for_online_ingestion": False}`.
29172917
@@ -3120,7 +3120,7 @@ def insert(
31203120
* key `ge_validate_kwargs` a dictionary containing kwargs for the validate method of Great Expectations.
31213121
* key `fetch_expectation_suite` a boolean value, by default `True`, to control whether the expectation
31223122
suite of the feature group should be fetched before every insert.
3123-
* key `online_schema_validation` boolean value, set to `True` to validate the schema for online ingestion.
3123+
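* key `schema_validation` boolean value, defaults to `True`; set to `False` to skip validating the dataframe schema. The legacy key `online_schema_validation` is still honored for backwards compatibility.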
31243124
wait: Wait for job and online ingestion to finish before returning, defaults to `False`.
31253125
Shortcut for write_options `{"wait_for_job": False, "wait_for_online_ingestion": False}`.
31263126
transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime.

python/tests/core/test_feature_group_engine.py

Lines changed: 71 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,24 @@ def test_save_ge_report(self, mocker):
148148
# Assert
149149
assert mock_engine_get_instance.return_value.save_dataframe.call_count == 0
150150

151-
def test_insert(self, mocker):
151+
@pytest.mark.parametrize(
152+
"online_enabled,validation_options,should_validate_schema",
153+
[
154+
# Online enabled
155+
(True, None, True),
156+
(True, {}, True),
157+
(True, {"online_schema_validation": False}, False),
158+
(True, {"schema_validation": False}, False),
159+
# Not enabled
160+
(False, None, True),
161+
(False, {}, True),
162+
(False, {"online_schema_validation": False}, False),
163+
(False, {"schema_validation": False}, False),
164+
],
165+
)
166+
def test_insert(
167+
self, online_enabled, validation_options, should_validate_schema, mocker
168+
):
152169
# Arrange
153170
feature_store_id = 99
154171

@@ -162,6 +179,9 @@ def test_insert(self, mocker):
162179
)
163180
mocker.patch("hsfs.core.great_expectation_engine.GreatExpectationEngine")
164181
mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")
182+
mock_validate_schema = mocker.patch(
183+
"hsfs.core.schema_validation.DataFrameValidator.validate_schema"
184+
)
165185

166186
fg_engine = feature_group_engine.FeatureGroupEngine(
167187
feature_store_id=feature_store_id
@@ -174,6 +194,7 @@ def test_insert(self, mocker):
174194
primary_key=[],
175195
foreign_key=[],
176196
partition_key=[],
197+
online_enabled=online_enabled,
177198
)
178199

179200
# Act
@@ -184,11 +205,59 @@ def test_insert(self, mocker):
184205
operation=None,
185206
storage=None,
186207
write_options=None,
208+
validation_options=validation_options,
187209
)
188210

189211
# Assert
190-
assert mock_fg_api.return_value.delete_content.call_count == 0
191212
assert mock_engine_get_instance.return_value.save_dataframe.call_count == 1
213+
assert mock_fg_api.return_value.delete_content.call_count == 0
214+
assert mock_validate_schema.called == should_validate_schema
215+
216+
def test_insert_storage(self, mocker):
217+
# Arrange
218+
feature_store_id = 99
219+
220+
mocker.patch("hsfs.engine.get_type")
221+
mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance")
222+
mocker.patch(
223+
"hsfs.core.feature_group_engine.FeatureGroupEngine.save_feature_group_metadata"
224+
)
225+
mocker.patch(
226+
"hsfs.core.feature_group_engine.FeatureGroupEngine._verify_schema_compatibility"
227+
)
228+
mocker.patch("hsfs.core.great_expectation_engine.GreatExpectationEngine")
229+
mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")
230+
231+
fg_engine = feature_group_engine.FeatureGroupEngine(
232+
feature_store_id=feature_store_id
233+
)
234+
235+
fg = feature_group.FeatureGroup(
236+
name="test",
237+
version=1,
238+
featurestore_id=feature_store_id,
239+
primary_key=[],
240+
foreign_key=[],
241+
partition_key=[],
242+
)
243+
244+
# Act
245+
with pytest.raises(exceptions.FeatureStoreException) as e_info:
246+
fg_engine.insert(
247+
feature_group=fg,
248+
feature_dataframe=None,
249+
overwrite=None,
250+
operation=None,
251+
storage="online",
252+
write_options=None,
253+
)
254+
255+
# Assert
256+
assert mock_fg_api.return_value.delete_content.call_count == 0
257+
assert mock_engine_get_instance.return_value.save_dataframe.call_count == 0
258+
assert (
259+
str(e_info.value) == "Online storage is not enabled for this feature group."
260+
)
192261

193262
def test_insert_transformation_functions(self, mocker):
194263
# Arrange
@@ -343,52 +412,6 @@ def test_insert_ge_report(self, mocker):
343412
assert mock_fg_api.return_value.delete_content.call_count == 0
344413
assert mock_engine_get_instance.return_value.save_dataframe.call_count == 0
345414

346-
def test_insert_storage(self, mocker):
347-
# Arrange
348-
feature_store_id = 99
349-
350-
mocker.patch("hsfs.engine.get_type")
351-
mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance")
352-
mocker.patch(
353-
"hsfs.core.feature_group_engine.FeatureGroupEngine.save_feature_group_metadata"
354-
)
355-
mocker.patch(
356-
"hsfs.core.feature_group_engine.FeatureGroupEngine._verify_schema_compatibility"
357-
)
358-
mocker.patch("hsfs.core.great_expectation_engine.GreatExpectationEngine")
359-
mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")
360-
361-
fg_engine = feature_group_engine.FeatureGroupEngine(
362-
feature_store_id=feature_store_id
363-
)
364-
365-
fg = feature_group.FeatureGroup(
366-
name="test",
367-
version=1,
368-
featurestore_id=feature_store_id,
369-
primary_key=[],
370-
foreign_key=[],
371-
partition_key=[],
372-
)
373-
374-
# Act
375-
with pytest.raises(exceptions.FeatureStoreException) as e_info:
376-
fg_engine.insert(
377-
feature_group=fg,
378-
feature_dataframe=None,
379-
overwrite=None,
380-
operation=None,
381-
storage="online",
382-
write_options=None,
383-
)
384-
385-
# Assert
386-
assert mock_fg_api.return_value.delete_content.call_count == 0
387-
assert mock_engine_get_instance.return_value.save_dataframe.call_count == 0
388-
assert (
389-
str(e_info.value) == "Online storage is not enabled for this feature group."
390-
)
391-
392415
def test_insert_overwrite(self, mocker):
393416
# Arrange
394417
feature_store_id = 99

0 commit comments

Comments
 (0)