Commit 24be64d

Commit message: fixed spark unit tests
Parent: 1f50c23

File tree: 2 files changed (+55, -57 lines)

python/hsfs/builtin_transformations.py

Lines changed: 12 additions & 21 deletions
@@ -48,9 +48,9 @@ def robust_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
     If IQR is zero (constant feature), the function centers the data by the
     median without scaling to avoid division by zero.
     """
-    q1 = statistics.feature.percentiles[25]
-    q2 = statistics.feature.percentiles[50]
-    q3 = statistics.feature.percentiles[75]
+    q1 = statistics.feature.percentiles[24]
+    q2 = statistics.feature.percentiles[49]
+    q3 = statistics.feature.percentiles[74]
     iqr = q3 - q1

     scaled_feature = feature.astype("float64")
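For context, a minimal standalone sketch (not part of this commit) of the robust-scaling behavior the docstring above describes: center by the median and divide by the IQR, falling back to centering only when the IQR is zero. The quartile values are hard-coded here instead of being read from statistics.feature.percentiles, and the helper name is illustrative.

import pandas as pd

def robust_scale_sketch(feature: pd.Series, q1: float, q2: float, q3: float) -> pd.Series:
    # Center by the median (q2); scale by the IQR (q3 - q1) unless it is zero.
    scaled = feature.astype("float64")
    iqr = q3 - q1
    if iqr == 0:
        return scaled - q2
    return (scaled - q2) / iqr

print(robust_scale_sketch(pd.Series([1.0, 2.0, 3.0, 100.0]), q1=1.75, q2=2.5, q3=3.25).tolist())
# [-1.0, -0.3333333333333333, 0.3333333333333333, 65.0]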
@@ -168,9 +168,9 @@ def equal_frequency_binner(
     - NaN inputs remain NaN.
     """
     s = feature.astype("float64")
-    q1 = statistics.feature.percentiles[25]
-    q2 = statistics.feature.percentiles[50]
-    q3 = statistics.feature.percentiles[75]
+    q1 = statistics.feature.percentiles[24]
+    q2 = statistics.feature.percentiles[49]
+    q3 = statistics.feature.percentiles[74]

     # Check if we have valid quartiles
     if any(pd.isna([q1, q2, q3])):
@@ -217,9 +217,9 @@ def quantile_binner(feature: pd.Series, statistics=feature_statistics) -> pd.Series:

     # Use quartiles: 25th, 50th, 75th percentiles
     p = statistics.feature.percentiles
-    q25 = p[25]  # Q1
-    q50 = p[50]  # Q2 (median)
-    q75 = p[75]  # Q3
+    q25 = p[24]  # Q1
+    q50 = p[49]  # Q2 (median)
+    q75 = p[74]  # Q3

     # Check if we have valid quartiles
     if any(pd.isna([q25, q50, q75])):
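The two binner hunks above bucket values by the same three quartiles. A rough standalone sketch of that idea follows; the bin labels and edge handling are illustrative and the actual hsfs implementation may differ, but NaN inputs stay NaN as the docstring notes.

import numpy as np
import pandas as pd

def quartile_bin_sketch(feature: pd.Series, q25: float, q50: float, q75: float) -> pd.Series:
    # Map each value to bin 0-3 depending on which quartile range it falls in.
    s = feature.astype("float64")
    bins = pd.Series(
        np.searchsorted([q25, q50, q75], s, side="right"), index=s.index, dtype="float64"
    )
    bins[s.isna()] = np.nan  # NaN inputs remain NaN
    return bins

print(quartile_bin_sketch(pd.Series([1.0, 30.0, 60.0, 90.0, np.nan]), 25.0, 50.0, 75.0).tolist())
# [0.0, 1.0, 2.0, 3.0, nan]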
@@ -325,7 +325,7 @@ def rank_normalizer(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
     return pd.Series(result, index=feature.index)


-@udf(float, mode="pandas")
+@udf(float, drop=["feature"], mode="pandas")
 def winsorize(
     feature: pd.Series, statistics=feature_statistics, context: dict | None = None
 ) -> pd.Series:
@@ -341,46 +341,37 @@ def winsorize(
     numerical_feature = feature.astype("float64")
     percentiles = statistics.feature.percentiles

-    print("1")
     # Defaults: 1 and 99 percentiles
     p_low = 1
     p_high = 99
     if isinstance(context, dict):
         p_low = context.get("p_low", p_low)
         p_high = context.get("p_high", p_high)

-    print("2")
     try:
         li = int(round(float(p_low)))
         ui = int(round(float(p_high)))
     except Exception:
         li, ui = 1, 99

-    print("3")
     # Bound indices
     max_idx = len(percentiles) - 1
     li = max(0, min(max_idx, li))
     ui = max(0, min(max_idx, ui))

-    print("4")
-
     # Ensure li < ui
     if li >= ui:
         li, ui = 1, min(99, max_idx)

     lower = percentiles[li]
     upper = percentiles[ui]

-    print("5")
     # Invalid bounds → return unchanged
     if pd.isna(lower) or pd.isna(upper) or lower > upper:
         return numerical_feature

-    print("6")
-    # Winsorize (no rows dropped)
-    clipped = numerical_feature.where(numerical_feature >= lower, lower).where(
-        numerical_feature <= upper, upper
-    )
+    # Winsorize (no rows dropped), preserving NaN values
+    clipped = numerical_feature.clip(lower=lower, upper=upper)

     return pd.Series(clipped, index=feature.index)
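A quick standalone check (not part of the commit) of the behavior the new comment relies on: Series.clip bounds values to [lower, upper] while leaving NaN untouched, whereas the removed two-step .where() construction silently replaced NaN with the lower bound, because NaN >= lower evaluates to False.

import math
import pandas as pd

s = pd.Series([0.0, 1.0, 50.0, 99.0, 120.0, math.nan])
lower, upper = 1.0, 99.0

# New approach: out-of-range values are clipped, NaN stays NaN.
print(s.clip(lower=lower, upper=upper).tolist())
# [1.0, 1.0, 50.0, 99.0, 99.0, nan]

# Old approach: the NaN row fails the `>= lower` check and is filled with `lower`.
print(s.where(s >= lower, lower).where(s <= upper, upper).tolist())
# [1.0, 1.0, 50.0, 99.0, 99.0, 1.0]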

python/tests/test_builtin_winsorize.py

Lines changed: 43 additions & 36 deletions
@@ -2,43 +2,30 @@

 import pandas as pd
 from hsfs import engine as hopsworks_engine
+from hsfs import transformation_function
 from hsfs.builtin_transformations import winsorize
 from hsfs.core.feature_descriptive_statistics import FeatureDescriptiveStatistics
 from hsfs.engine import python as python_engine
-from python.hsfs import transformation_function
-from python.hsfs.transformation_function import TransformationType
-
-
-def _set_percentiles(pcts):
-    winsorize.transformation_statistics = [
-        FeatureDescriptiveStatistics(feature_name="feature", percentiles=pcts)
-    ]
-    winsorize.transformation_context = None  # reset each test
-
-
-def _run_raw(series: pd.Series) -> pd.Series:
-    """Call the raw UDF function directly so input stays a Series."""
-    fn = winsorize.get_udf(online=True)
-
-    return fn(series)
+from hsfs.transformation_function import TransformationType


 def test_winsorize_default_thresholds():
-    engine = python_engine.Engine()
-    hopsworks_engine.set_instance(engine=engine, engine_type="python")
-
-    percentiles = list(range(100))  # [0..99]
-    _set_percentiles(percentiles)
-
-    s = pd.Series([0.0, 1.0, 50.0, 99.0, 120.0, math.nan])
+    # Arrange
+    df = pd.DataFrame(
+        {
+            "col_0": [0.0, 1.0, 50.0, 99.0, 120.0, math.nan],
+            "other": ["a", "b", "c", "d", "e", "f"],
+        }
+    )

     tf = transformation_function.TransformationFunction(
         hopsworks_udf=winsorize("col_0"),
         featurestore_id=1,
         transformation_type=TransformationType.MODEL_DEPENDENT,
     )

-    percentiles = [float(i) for i in range(100)]
+    # Percentiles from 0th to 100th (101 values)
+    percentiles = [float(i) for i in range(101)]
     tf.transformation_statistics = [
         FeatureDescriptiveStatistics(feature_name="col_0", percentiles=percentiles)
     ]
@@ -47,35 +34,55 @@ def test_winsorize_default_thresholds():
     hopsworks_engine.set_instance(engine=engine, engine_type="python")

     # Act
-    out = engine._apply_transformation_function([tf], s)
-
-    out = _run_raw(s)
+    result = engine._apply_transformation_function([tf], df)

+    # Assert - defaults clip at 1st and 99th percentiles (values 1 and 99)
+    assert list(result.columns) == ["other", "winsorize_col_0_"]
     expected = pd.Series([1.0, 1.0, 50.0, 99.0, 99.0, math.nan])
+    expected.name = "winsorize_col_0_"

-    for got, want in zip(out.tolist(), expected.tolist()):
+    for got, want in zip(result["winsorize_col_0_"].tolist(), expected.tolist()):
         if math.isnan(want):
             assert math.isnan(got)
         else:
             assert got == want


 def test_winsorize_context_override():
-    engine = python_engine.Engine()
-    hopsworks_engine.set_instance(engine=engine, engine_type="python")
+    # Arrange
+    df = pd.DataFrame(
+        {
+            "col_0": [2.0, 5.0, 95.0, 96.0, math.nan],
+            "other": ["a", "b", "c", "d", "e"],
+        }
+    )

-    percentiles = list(range(100))
-    _set_percentiles(percentiles)
+    tf = transformation_function.TransformationFunction(
+        hopsworks_udf=winsorize("col_0"),
+        featurestore_id=1,
+        transformation_type=TransformationType.MODEL_DEPENDENT,
+    )

-    s = pd.Series([2.0, 5.0, 95.0, 96.0, math.nan])
+    # Percentiles from 0th to 100th (101 values)
+    percentiles = [float(i) for i in range(101)]
+    tf.transformation_statistics = [
+        FeatureDescriptiveStatistics(feature_name="col_0", percentiles=percentiles)
+    ]

-    winsorize.transformation_context = {"p_low": 5, "p_high": 95}
+    engine = python_engine.Engine()
+    hopsworks_engine.set_instance(engine=engine, engine_type="python")

-    out = _run_raw(s)
+    # Act - Override percentile thresholds via context parameter
+    result = engine._apply_transformation_function(
+        [tf], df, transformation_context={"p_low": 5, "p_high": 95}
+    )

+    # Assert - clips at 5th and 95th percentiles (values 5 and 95)
+    assert list(result.columns) == ["other", "winsorize_col_0_"]
     expected = pd.Series([5.0, 5.0, 95.0, 95.0, math.nan])
+    expected.name = "winsorize_col_0_"

-    for got, want in zip(out.tolist(), expected.tolist()):
+    for got, want in zip(result["winsorize_col_0_"].tolist(), expected.tolist()):
         if math.isnan(want):
             assert math.isnan(got)
         else:
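Independent of the hsfs engine wiring, a small sketch of the arithmetic the two tests above assert: with the 101-value percentile list [0.0, ..., 100.0], the default thresholds pick percentiles[1] and percentiles[99], while a context of {"p_low": 5, "p_high": 95} picks percentiles[5] and percentiles[95]. The helper below hard-wires the statistics and mirrors only the clipping step of the builtin UDF; its name is illustrative.

import math
import pandas as pd

percentiles = [float(i) for i in range(101)]  # same statistics as in the tests

def winsorize_sketch(s: pd.Series, p_low: int = 1, p_high: int = 99) -> pd.Series:
    # Clip at the chosen percentile values; NaN passes through unchanged.
    return s.astype("float64").clip(lower=percentiles[p_low], upper=percentiles[p_high])

s = pd.Series([2.0, 5.0, 95.0, 96.0, math.nan])
print(winsorize_sketch(s).tolist())                      # [2.0, 5.0, 95.0, 96.0, nan]
print(winsorize_sketch(s, p_low=5, p_high=95).tolist())  # [5.0, 5.0, 95.0, 95.0, nan]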
