Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 71 additions & 5 deletions python/hsfs/builtin_transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#

import math
from typing import Union

import pandas as pd
from hsfs.hopsworks_udf import udf
Expand All @@ -25,26 +26,78 @@


@udf(float, drop=["feature"])
def min_max_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
def min_max_scaler(
feature: Union[pd.Series, int, float],
statistics: TransformationStatistics = feature_statistics,
) -> Union[pd.Series, int, float]:
"""
Scales the feature to the range [0, 1] using the min and max values of the feature in the training data.

# Arguments
feature: `Union[pd.Series, int, float]`. The feature to scale between 0 and 1.
statistics: `TransformationStatistics`. The training dataset statistics of the feature, this is computed and passed automatically by Hopsworks when the transformation function is executed.

# Returns
`pd.Series`. The scaled feature.
"""
return (feature - statistics.feature.min) / (
statistics.feature.max - statistics.feature.min
)


@udf(float, drop=["feature"])
def standard_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
def standard_scaler(
feature: Union[pd.Series, int, float],
statistics: TransformationStatistics = feature_statistics,
) -> Union[pd.Series, int, float]:
"""
Standardize features by transforming the feature to have a mean of 0 and a standard deviation of 1.

# Arguments
feature: `Union[pd.Series, int, float]`. The feature to standardize.
statistics: `TransformationStatistics`. The training dataset statistics of the feature, this is computed and passed automatically by Hopsworks when the transformation function is executed.

# Returns
`pd.Series`. The scaled feature.
"""
return (feature - statistics.feature.mean) / statistics.feature.stddev


@udf(float, drop=["feature"])
def robust_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
def robust_scaler(
feature: Union[pd.Series, int, float],
statistics: TransformationStatistics = feature_statistics,
) -> Union[pd.Series, int, float]:
"""
Robust scale features by transforming the feature to have a median of 0 and a interquartile range of 1.

# Arguments
feature: `Union[pd.Series, int, float]`. The feature to robust scale.
statistics: `TransformationStatistics`. The training dataset statistics of the feature, this is computed and passed automatically by Hopsworks when the transformation function is executed.

# Returns
`pd.Series`. The scaled feature.
"""
return (feature - statistics.feature.percentiles[49]) / (
statistics.feature.percentiles[74] - statistics.feature.percentiles[24]
)


@udf(int, drop=["feature"], mode="pandas")
def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
def label_encoder(
feature: Union[pd.Series, str],
statistics: TransformationStatistics = feature_statistics,
) -> Union[pd.Series, int]:
"""
Encode categorical features into numerical features.

# Arguments
feature: `Union[pd.Series, str]`. The feature to encode.
statistics: `TransformationStatistics`. The training dataset statistics of the feature, this is computed and passed automatically by Hopsworks when the transformation function is executed.

# Returns
`pd.Series`. The encoded feature.
"""
unique_data = sorted([value for value in statistics.feature.unique_values])
value_to_index = {value: index for index, value in enumerate(unique_data)}
# Unknown categories not present in training dataset are encoded as -1.
Expand All @@ -57,7 +110,20 @@ def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Serie


@udf(bool, drop=["feature"], mode="pandas")
def one_hot_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
def one_hot_encoder(
feature: Union[pd.Series, str],
statistics: TransformationStatistics = feature_statistics,
) -> pd.DataFrame:
"""
Encode categorical features as a one-hot numeric array.

# Arguments
feature: `Union[pd.Series, str]`. The feature to encode.
statistics: `TransformationStatistics`. The training dataset statistics of the feature, this is computed and passed automatically by Hopsworks when the transformation function is executed.

# Returns
`pd.DataFrame`. A pandas dataframe with the one-hot encoded features.
"""
unique_data = [value for value in statistics.feature.unique_values]

# One hot encode features. Re-indexing to set missing categories to False and drop categories not in training data statistics.
Expand Down
127 changes: 83 additions & 44 deletions python/hsfs/hopsworks_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations

import ast
import copy
Expand All @@ -38,7 +39,12 @@

class UDFExecutionMode(Enum):
"""
Class that store the possible execution types of UDF's.
Possible execution modes for a transformation function. The execution mode specifies the way in which a transformation function is executed in Hopsworks.

Currently Hopsworks supports three execution modes for a transformation function.
- `default` - This mode executes the transformation function as a Pandas UDF during training dataset generation or batch inference and as a Python UDF during feature vector retrieval.
- `pandas` - This mode would always execute the transformation function as a Pandas UDF.
- `python` - This mode would always execute the transformation function as a Python UDF.
"""

DEFAULT = "default"
Expand Down Expand Up @@ -76,36 +82,37 @@ def udf(
return_type: Union[List[type], type],
drop: Optional[Union[str, List[str]]] = None,
mode: Literal["default", "python", "pandas"] = "default",
) -> "HopsworksUdf":
) -> HopsworksUdf:
"""
Create an User Defined Function that can be and used within the Hopsworks Feature Store to create transformation functions.

Hopsworks UDF's are user defined functions that executes as 'pandas_udf' when executing
in spark engine and as pandas functions in the python engine. The pandas udf/pandas functions
gets as inputs pandas Series's and can provide as output a pandas Series or a pandas DataFrame.
A Hopsworks udf is defined using the `hopsworks_udf` decorator. The outputs of the defined UDF
must be mentioned in the decorator as a list of python types.
Decorator used to define a User Defined Function (UDF) in Hopsworks.

UDF's defined using this decorator can be attached to:
- Feature groups: To create On-Demand transformation functions.
- Feature views: To create Model-Dependent transformation functions.

!!! example
```python
from hopsworks import udf

@udf(float)
def add_one(data1):
return data1 + 1
@udf(float, drop=["data2"], mode="python")
def add(data1, data2):
return data1 + data2
```

# Arguments
return_type: `Union[List[type], type]`. The output types of the defined UDF
drop: `Optional[Union[str, List[str]]]`. The features to be dropped after application of transformation functions. Default's to None.
mode: `Literal["default", "python", "pandas"]`. The exection mode of the UDF. Default's to 'default'
return_type: The type of the output features returned from the transformation function.
drop: List that contains the names of the arguments to be removed from the dataframe after the application of the transformation function. The features mapped to these arguments are removed from the final output dataframe after all transformation functions are executed.
mode: The execution mode of the UDF determines the way in which the UDF is executed during training dataset generation, batch data retrieval and online feature vector retrieval. Currently Hopsworks supports three execution modes for a transformation function:
- `default` - This mode executes the transformation function as a Pandas UDF during training dataset generation or batch inference and as a Python UDF during feature vector retrieval.
- `pandas` - This mode would always execute the transformation function as a Pandas UDF.
- `python` - This mode would always execute the transformation function as a Python UDF.
Defaults to 'default'.

# Returns
`HopsworksUdf`: The metadata object for hopsworks UDF's.
`HopsworksUdf`: The metadata object that contains all information to execute the UDF in both python and spark engines.

# Raises
`hopsworks.client.exceptions.FeatureStoreException` : If unable to create UDF.
`hopsworks.client.exceptions.FeatureStoreException` : If unable to create the UDF.
"""

def wrapper(func: Callable) -> HopsworksUdf:
Expand Down Expand Up @@ -147,21 +154,23 @@ class HopsworksUdf:
"""
Meta data for user defined functions.

Stores meta data required to execute the user defined function in both spark and python engine.
The class generates uses the metadata to dynamically generate user defined functions based on the
engine it is executed in.
This class stores all the meta data required to execute the user defined function in both spark and python engine.
This metadata is used to generate the source code to execute the function in both python and spark.

!!! Note
UDFs in Hopsworks should only be created using the `udf` decorator. It should not be directly created by using the constructor.

# Arguments
func : `Union[Callable, str]`. The transformation function object or the source code of the transformation function.
return_types : `Union[List[type], type, List[str], str]`. A python type or a list of python types that denotes the data types of the columns output from the transformation functions.
name : `Optional[str]`. Name of the transformation function.
transformation_features : `Optional[List[TransformationFeature]]`. A list of objects of `TransformationFeature` that maps the feature used for transformation to their corresponding statistics argument names if any
transformation_function_argument_names : `Optional[List[str]]`. The argument names of the transformation function.
return_types : `Union[List[type], type, List[str], str]`. A python type or a list of python types that denotes the data types of the columns returned from the UDF.
name : `Optional[str]`. Name of the UDF.
transformation_features : `Optional[List[TransformationFeature]]`. A list of objects of `TransformationFeature` that maps the argument names of the UDF to the feature names that are passed as input to it when the UDF is executed.
transformation_function_argument_names : `Optional[List[str]]`. The names of the arguments used by the UDF.
dropped_argument_names : `Optional[List[str]]`. The arguments to be dropped from the final DataFrame after the transformation functions are applied.
dropped_feature_names : `Optional[List[str]]`. The feature name corresponding to the arguments names that are dropped
feature_name_prefix: `Optional[str]`. Prefixes if any used in the feature view.
output_column_names: `Optional[List[str]]`. The names of the output columns returned from the transformation function.
generate_output_col_names: `bool`. Generate default output column names for the transformation function. Default's to True.
dropped_feature_names : `Optional[List[str]]`. The feature names corresponding to the argument names that are dropped.
feature_name_prefix: `Optional[str]`. Prefixes if any used in the feature view. These are required if an on-demand feature from a feature group is added to a feature view using a prefix.
output_column_names: `Optional[List[str]]`. The names of the output columns returned from the UDF.
generate_output_col_names: `bool`. Generate default output column names for the UDF. Defaults to True.
"""

# Mapping for converting python types to spark types - required for creating pandas UDF's.
Expand Down Expand Up @@ -697,9 +706,11 @@ def renaming_wrapper(*args):
# returning executed function object
return eval("renaming_wrapper", scope)

def __call__(self, *features: List[str]) -> "HopsworksUdf":
def __call__(self, *features: List[str]) -> HopsworksUdf:
"""
Set features to be passed as arguments to the user defined functions
Set features to be passed as arguments to the user defined functions.

This function maps the UDF argument names to the passed feature names. When executing the UDF, Hopsworks extracts the data corresponding to these features and passes them to the UDF.

# Arguments
features: Name of features to be passed to the User Defined function
Expand Down Expand Up @@ -745,9 +756,16 @@ def __call__(self, *features: List[str]) -> "HopsworksUdf":
udf.dropped_features = updated_dropped_features
return udf

def alias(self, *args: str):
def alias(self, *args: str) -> HopsworksUdf:
"""
Set the names of the transformed features output by the UDF.

# Arguments
args: `str`. The names of the output features returned from the UDF.
# Returns
`HopsworksUdf`: The UDF object with the updated output feature names.
# Raises
`FeatureStoreException`: If the output feature names are not strings or if the number of output feature names does not match the number of features returned by the UDF.
"""
if len(args) == 1 and isinstance(args[0], list):
# If a single list is passed, use it directly
Expand Down Expand Up @@ -893,8 +911,8 @@ def json(self) -> str:

@classmethod
def from_response_json(
cls: "HopsworksUdf", json_dict: Dict[str, Any]
) -> "HopsworksUdf":
cls: HopsworksUdf, json_dict: Dict[str, Any]
) -> HopsworksUdf:
"""
Function that constructs the class object from its json serialization.

Expand Down Expand Up @@ -1000,32 +1018,41 @@ def from_response_json(

@property
def return_types(self) -> List[str]:
"""Get the output types of the UDF"""
"""
Types of the output features returned by the UDF. These return types are used to execute the UDF as a PandasUDF or Udf in the Spark Engine.
"""
# Update the number of outputs for one hot encoder to match the number of unique values for the feature
if self.function_name == "one_hot_encoder" and self.transformation_statistics:
self.update_return_type_one_hot()
return self._return_types

@property
def function_name(self) -> str:
"""Get the function name of the UDF"""
"""The name of the UDF."""
return self._function_name

@property
def statistics_required(self) -> bool:
"""Get if statistics for any feature is required by the UDF"""
"""
True if any feature requires training dataset statistics in the defined UDF.

Arguments in a UDF can request training dataset statistics to be computed by setting the default value of the argument `statistics` to an instance of `TransformationStatistics` when defining the UDF.
"""
return bool(self.statistics_features)

@property
def transformation_statistics(
self,
) -> Optional[TransformationStatistics]:
"""Feature statistics required for the defined UDF"""
"""
The statistics being used when executing the transformation function.
This property is set internally by Hopsworks when generating training datasets and when a feature view is initialized for serving using the `init_serving` function.
"""
return self._statistics

@property
def output_column_names(self) -> List[str]:
"""Output columns names of the transformation function"""
"""The names of the features output by the UDF. The dataframe output after applying the transformation function will use these names to store the output columns."""
if self._feature_name_prefix:
return [
self._feature_name_prefix + output_col_name
Expand All @@ -1037,7 +1064,10 @@ def output_column_names(self) -> List[str]:
@property
def transformation_features(self) -> List[str]:
"""
List of feature names to be used in the User Defined Function.
The names of the features whose data should be passed as input to the UDF. When executing the UDF, the data corresponding to these features is extracted and passed to the UDF.

By default, Hopsworks would use the name of the UDF's arguments as the feature names to be passed as input to the transformation function.
However, this behaviour can be overridden by explicitly passing the required feature names when calling the UDF.
"""
if self._feature_name_prefix:
return [
Expand All @@ -1054,7 +1084,7 @@ def transformation_features(self) -> List[str]:
@property
def unprefixed_transformation_features(self) -> List[str]:
"""
List of feature name used in the transformation function without the feature name prefix.
The unprefixed names of the features whose data should be passed as input to the UDF.
"""
return [
transformation_feature.feature_name
Expand All @@ -1063,13 +1093,19 @@ def unprefixed_transformation_features(self) -> List[str]:

@property
def feature_name_prefix(self) -> Optional[str]:
"""The feature name prefix that needs to be added to the feature names"""
"""
The prefix that is applied to input and output feature names of a transformation function.

A prefix is set internally by Hopsworks if an on-demand feature from a feature group is added to a feature view using a prefix.
"""
return self._feature_name_prefix

@property
def statistics_features(self) -> List[str]:
"""
List of feature names that require statistics
List of feature names that require the training dataset statistics.

Arguments in a UDF can request training dataset statistics to be computed by setting the default value of the argument `statistics` to an instance of `TransformationStatistics` when defining the UDF.
"""
return [
transformation_feature.feature_name
Expand All @@ -1090,7 +1126,7 @@ def _statistics_argument_mapping(self) -> Dict[str, str]:
@property
def _statistics_argument_names(self) -> List[str]:
"""
List of argument names required for statistics
List of argument names required for statistics.
"""
return [
transformation_feature.statistic_argument_name
Expand All @@ -1101,7 +1137,7 @@ def _statistics_argument_names(self) -> List[str]:
@property
def dropped_features(self) -> List[str]:
"""
List of features that will be dropped after the UDF is applied.
List of features that will be dropped from the output dataframe. These features are dropped from the final output dataframe after all UDFs in the feature view/feature group are executed.
"""
if self._feature_name_prefix and self._dropped_features:
return [
Expand All @@ -1113,6 +1149,9 @@ def dropped_features(self) -> List[str]:

@property
def execution_mode(self) -> UDFExecutionMode:
"""
The execution modes for a transformation function. The execution mode specifies the way in which a transformation function is executed in Hopsworks.
"""
return self._execution_mode

@property
Expand Down
Loading
Loading