Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4077646
init
bubriks Aug 7, 2025
b7ee740
ruff fix
bubriks Aug 7, 2025
86b58c6
small fix
bubriks Aug 7, 2025
7cdb799
Merge branch 'main' into FSTORE-1795
bubriks Aug 7, 2025
b712c4e
update fixtures
bubriks Aug 7, 2025
1a81dd1
some test fixes
bubriks Aug 7, 2025
9286187
ruff fix
bubriks Aug 7, 2025
34ae32f
test fixes
bubriks Aug 7, 2025
fe9550a
ruff fix
bubriks Aug 7, 2025
3f68bfb
data source fixes
bubriks Aug 8, 2025
26e23ba
improve docs and bug fixes
bubriks Aug 8, 2025
a202745
move td to data source
bubriks Aug 13, 2025
e9bfb40
Merge branch 'main' into FSTORE-1795
bubriks Aug 13, 2025
d41c9e5
ruff fix
bubriks Aug 13, 2025
4c8c309
fix tests
bubriks Aug 14, 2025
c183407
add more tests for ExternalFeatureGroupEngine.save
bubriks Aug 14, 2025
528789d
Merge branch 'main' into FSTORE-1795
bubriks Aug 29, 2025
9884991
Merge branch 'main' into FSTORE-1795
bubriks Sep 10, 2025
effe203
updates to make training datasets work
bubriks Sep 10, 2025
ec659d0
ruff fix
bubriks Sep 10, 2025
eaeb198
test fix
bubriks Sep 10, 2025
5fd50af
test fixes
bubriks Sep 10, 2025
03690bc
add get_feature_groups_provenance to data source
bubriks Sep 10, 2025
bec0276
ruff fix
bubriks Sep 10, 2025
67d5883
add data_source_api docs
bubriks Sep 10, 2025
cd7fc6b
add get_training_datasets_provenance to storage connector
bubriks Sep 11, 2025
318438c
ruff fix
bubriks Sep 11, 2025
1f28528
small java client fix
bubriks Sep 11, 2025
2007be5
update java client for data source
bubriks Sep 11, 2025
456764f
feedback fix
bubriks Sep 12, 2025
0b3fd0a
ruff
bubriks Sep 12, 2025
ff4f7ac
typing
bubriks Sep 12, 2025
3253904
Merge branch 'main' into FSTORE-1795
bubriks Sep 15, 2025
99e697c
remove return types due to issues with circular import when generatin…
bubriks Sep 15, 2025
1b1e81a
ruff fix
bubriks Sep 15, 2025
baaf4bd
Merge branch 'main' into FSTORE-1795
bubriks Sep 22, 2025
59fd25c
Merge branch 'main' into FSTORE-1795
bubriks Sep 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/templates/api/data_source_api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Data Source

## Retrieval

{{ds_get}}

## Properties

{{data_source_properties}}

## Methods

{{data_source_methods}}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
this.onlineTopicName = onlineTopicName;
this.eventTime = eventTime;
this.onlineConfig = onlineConfig;
this.storageConnector = storageConnector;
this.dataSource.setStorageConnector(storageConnector);
this.dataSource.setPath(path);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
this.notificationTopicName = notificationTopicName;
this.eventTime = eventTime;
this.onlineConfig = onlineConfig;
this.storageConnector = storageConnector;
this.dataSource.setStorageConnector(storageConnector);
this.dataSource.setPath(path);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,8 @@ public class DataSource extends RestDto<DataSource> {
@Setter
private String path = "";

@Getter
@Setter
private StorageConnector storageConnector = null;

}
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,6 @@ public abstract class FeatureGroupBase<T> {
@Setter
protected OnlineConfig onlineConfig;

@Getter
@Setter
protected StorageConnector storageConnector;

@Getter
@Setter
protected DataSource dataSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,34 @@ public StorageConnector getStorageConnector(String name) throws FeatureStoreExce
return storageConnectorApi.getByName(this, name, StorageConnector.class);
}

/**
* Get a previously created data source from the feature store.
*
* <p>data sources encapsulate all information needed for the execution engine to read and write to a specific
* storage.
*
* <p>If you want to connect to the online feature store, see the getOnlineDataSource` method to get the
* JDBC connector for the Online Feature Store.
*
* <pre>
* {@code
* // get feature store handle
* FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
* DataSource ds = fs.getDataSource("ds_name");
* }
* </pre>
*
* @param name Name of the data source to retrieve.
* @return DataSource Data source object.
* @throws FeatureStoreException If unable to retrieve DataSource from the feature store.
* @throws IOException Generic IO exception.
*/
public DataSource getDataSource(String name) throws FeatureStoreException, IOException {
DataSource dataSource = new DataSource();
dataSource.setStorageConnector(getStorageConnector(name));
return dataSource;
}

/**
* Get a previously created HopsFs compliant storage connector from the feature store.
*
Expand Down
318 changes: 318 additions & 0 deletions java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureViewBase.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public StreamFeatureGroup(FeatureStoreBase featureStore, @NonNull String name, I
this.onlineTopicName = onlineTopicName;
this.eventTime = eventTime;
this.timeTravelFormat = timeTravelFormat;
this.storageConnector = storageConnector;
this.onlineConfig = onlineConfig;
this.dataSource.setStorageConnector(storageConnector);
this.dataSource.setPath(path);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public class TrainingDatasetBase {

@Getter
@Setter
protected StorageConnector storageConnector;
protected DataSource dataSource;

@Getter
@Setter
Expand All @@ -138,14 +138,19 @@ public TrainingDatasetBase(Integer version, String description, DataFormat dataF
TrainingDatasetType trainingDatasetType, Float validationSize, Float testSize,
String trainStart, String trainEnd, String validationStart,
String validationEnd, String testStart, String testEnd, Integer timeSplitSize,
FilterLogic extraFilterLogic, Filter extraFilter)
FilterLogic extraFilterLogic, Filter extraFilter, DataSource dataSource)
throws FeatureStoreException, ParseException {
this.version = version;
this.description = description;
this.dataFormat = dataFormat != null ? dataFormat : DataFormat.PARQUET;
this.coalesce = coalesce != null ? coalesce : false;
this.location = location;
this.storageConnector = storageConnector;
if (dataSource == null) {
this.dataSource = new DataSource();
this.dataSource.setStorageConnector(storageConnector);
this.dataSource.setPath(location);
} else {
this.dataSource = dataSource;
}
this.trainSplit = trainSplit;
this.splits = splits == null ? Lists.newArrayList() : splits;
this.seed = seed;
Expand All @@ -155,7 +160,7 @@ public TrainingDatasetBase(Integer version, String description, DataFormat dataF
this.eventStartTime = eventStartTime != null ? FeatureGroupUtils.getDateFromDateString(eventStartTime) : null;
this.eventEndTime = eventEndTime != null ? FeatureGroupUtils.getDateFromDateString(eventEndTime) : null;
this.trainingDatasetType = trainingDatasetType != null ? trainingDatasetType :
getTrainingDatasetType(storageConnector);
getTrainingDatasetType(dataSource);
setValTestSplit(validationSize, testSize);
setTimeSeriesSplits(timeSplitSize, trainStart, trainEnd, validationStart, validationEnd, testStart, testEnd);
if (extraFilter != null) {
Expand Down Expand Up @@ -225,10 +230,10 @@ public void setLabel(List<String> label) {
this.label = label.stream().map(String::toLowerCase).collect(Collectors.toList());
}

public TrainingDatasetType getTrainingDatasetType(StorageConnector storageConnector) {
if (storageConnector == null) {
public TrainingDatasetType getTrainingDatasetType(DataSource dataSource) {
if (dataSource == null || dataSource.getStorageConnector() == null) {
return TrainingDatasetType.HOPSFS_TRAINING_DATASET;
} else if (storageConnector.getStorageConnectorType() == StorageConnectorType.HOPSFS) {
} else if (dataSource.getStorageConnector().getStorageConnectorType() == StorageConnectorType.HOPSFS) {
return TrainingDatasetType.HOPSFS_TRAINING_DATASET;
} else {
return TrainingDatasetType.EXTERNAL_TRAINING_DATASET;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public ExternalFeatureGroup(FeatureStore featureStore, @NonNull String name, Int
this.description = description;
this.primaryKeys = primaryKeys != null
? primaryKeys.stream().map(String::toLowerCase).collect(Collectors.toList()) : null;
this.storageConnector = storageConnector;
this.features = features;
this.statisticsConfig = statisticsConfig != null ? statisticsConfig : new StatisticsConfig();
this.eventTime = eventTime;
Expand All @@ -97,6 +96,7 @@ public ExternalFeatureGroup(FeatureStore featureStore, @NonNull String name, Int
this.topicName = topicName;
this.notificationTopicName = notificationTopicName;
this.onlineConfig = onlineConfig;
this.dataSource.setStorageConnector(storageConnector);
this.dataSource.setPath(path);
this.dataSource.setQuery(query);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
this.notificationTopicName = notificationTopicName;
this.eventTime = eventTime;
this.onlineConfig = onlineConfig;
this.storageConnector = storageConnector;
this.dataSource.setStorageConnector(storageConnector);
this.dataSource.setPath(path);
}

Expand Down
Loading
Loading