Commit b407d1c

Merge pull request #22 from feast-dev/compute-engine
Compute engine
2 parents e959053 + 7b3023d commit b407d1c

File tree

10 files changed: 4499 additions & 1 deletion


.gitignore

Lines changed: 3 additions & 1 deletion
@@ -12,4 +12,6 @@ terraform.tfstate.backup
 **/derby.log
 **/metastore_db/*
 .env
-.idea
+.idea
+.venv
+**/.ipynb_checkpoints

module_5_compute_engine/README.md

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
# Feast Compute Engine Demo
This is a demo of the Feast Compute Engine, which lets you run feature transformations (filter, aggregation, custom UDFs, etc.) on a compute engine of your choice.

## Install and Run
See the notebook [compute_example.ipynb](compute_example.ipynb) for details.
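The feature-view definition itself is not part of this diff, but the notebook below applies a view named order_stats over the Snowflake TPCH orders source. A minimal sketch of what that definition could look like, assuming the experimental BatchFeatureView API — the aggregations parameter belongs to the alpha compute-engine work and may differ between Feast releases, and the data_sources/entities module names are hypothetical:

from datetime import timedelta

from feast import Aggregation, BatchFeatureView, Field
from feast.types import Float32

from data_sources import tpch_sf  # SnowflakeSource, defined later in this diff
from entities import customer     # Entity keyed on O_CUSTKEY, defined later in this diff

# Sketch only: aggregations on BatchFeatureView come from the alpha
# compute-engine API and may not exist in older Feast releases.
order_stats = BatchFeatureView(
    name="order_stats",
    entities=[customer],
    ttl=timedelta(days=365 * 40),  # wide TTL so 1992-era TPCH rows stay retrievable
    schema=[Field(name="O_TOTALPRICE", dtype=Float32)],
    aggregations=[
        Aggregation(column="O_TOTALPRICE", function="sum"),
    ],
    source=tpch_sf,
)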
module_5_compute_engine/compute_example.ipynb

Lines changed: 334 additions & 0 deletions

@@ -0,0 +1,334 @@
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "0c4646a7-272d-44ec-95fe-3480a267e173",
     "showTitle": false,
     "title": ""
    },
    "collapsed": true,
    "jupyter": {
     "outputs_hidden": true
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "zsh:1: no matches found: feast[spark,aws,redis]\r\n",
      "Note: you may need to restart the kernel to use updated packages.\n"
     ]
    }
   ],
   "source": [
    "%pip install feast[spark,postgres,snowflake]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "source": [
    "## Apply feature definition"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "outputs": [],
   "source": [
    "import os\n",
    "os.chdir(\"./feature_repo\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/Users/haoxu/dev/feature_store/feast/sdk/python/feast/batch_feature_view.py:93: RuntimeWarning: Batch feature views are experimental features in alpha development. Some functionality may still be unstable so functionality can change in the future.\n",
      "  warnings.warn(\n",
      "WARNING: Using incubator modules: jdk.incubator.vector\n",
      "Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties\n",
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "25/06/10 20:39:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
      "25/06/10 20:39:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "No project found in the repository. Using project name feast_demo_compute_engine defined in feature_store.yaml\n",
      "Applying changes for project feast_demo_compute_engine\n",
      "Deploying infrastructure for order_stats\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "INFO:py4j.clientserver:Closing down clientserver connection\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "0"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "os.system(\"feast apply\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "source": [
    "## Load Feature store API"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "37baef9e-ffac-4cf9-ab6c-778e0321d544",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "from feast import FeatureStore\n",
    "store = FeatureStore(repo_path=\".\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "source": [
    "## Materialization"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "source": [
    "### Local"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "83ad34fa-cbd3-4100-8c01-40720ddbc14e",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Materializing \u001b[1m\u001b[32m1\u001b[0m feature views from \u001b[1m\u001b[32m1992-04-20 00:00:00+00:00\u001b[0m to \u001b[1m\u001b[32m2025-04-21 00:00:00+00:00\u001b[0m into the \u001b[1m\u001b[32mpostgres\u001b[0m online store.\n",
      "\n",
      "\u001b[1m\u001b[32morder_stats\u001b[0m:\n",
      "Elapsed time: 84.29606699943542 seconds\n"
     ]
    }
   ],
   "source": [
    "from datetime import datetime\n",
    "import time\n",
    "\n",
    "start_time = time.time()\n",
    "store.materialize(\n",
    "    start_date=datetime(1992,4,20),\n",
    "    end_date=datetime(2025,4,21),\n",
    ")\n",
    "end_time = time.time()\n",
    "elapsed_time = end_time - start_time\n",
    "print(f\"Elapsed time: {elapsed_time} seconds\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Retrieve feature data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'O_CUSTKEY': [397082], 'O_TOTALPRICE': [304962.84375]}"
      ]
     },
     "execution_count": 15,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "store.get_online_features(\n",
    "    features=[\"order_stats:O_TOTALPRICE\"],\n",
    "    entity_rows=[{\"O_CUSTKEY\": 397082}]\n",
    ").to_dict()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Spark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Materializing \u001b[1m\u001b[32m1\u001b[0m feature views from \u001b[1m\u001b[32m1992-04-20 00:00:00+00:00\u001b[0m to \u001b[1m\u001b[32m2025-04-21 00:00:00+00:00\u001b[0m into the \u001b[1m\u001b[32mpostgres\u001b[0m online store.\n",
      "\n",
      "\u001b[1m\u001b[32morder_stats\u001b[0m:\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 8:===================================================> (10 + 1) / 11]"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Elapsed time: 90.78104996681213 seconds\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      " "
     ]
    }
   ],
   "source": [
    "# Run in local mode\n",
    "from datetime import datetime\n",
    "import time\n",
    "\n",
    "start_time = time.time()\n",
    "store.materialize(\n",
    "    start_date=datetime(1992,4,20),\n",
    "    end_date=datetime(2025,4,21),\n",
    ")\n",
    "end_time = time.time()\n",
    "elapsed_time = end_time - start_time\n",
    "print(f\"Elapsed time: {elapsed_time} seconds\")"
   ]
  }
 ],
 "metadata": {
  "application/vnd.databricks.v1+notebook": {
   "dashboards": [],
   "language": "python",
   "notebookMetadata": {
    "mostRecentlyExecutedCommandWithImplicitDF": {
     "commandId": 402528431658022,
     "dataframes": [
      "_sqldf"
     ]
    },
    "pythonIndentUnit": 2
   },
   "notebookName": "Feast demo",
   "notebookOrigID": 1254642919516165,
   "widgets": {}
  },
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.9"
  },
  "vscode": {
   "interpreter": {
    "hash": "7d634b9af180bcb32a446a43848522733ff8f5bbf0cc46dba1a83bede04bf237"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
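The notebook relies on a feature_store.yaml (project feast_demo_compute_engine, with a postgres online store per the logs above) that is not part of this diff; presumably the Local vs. Spark timing comparison comes from switching the materialization engine in that file between runs. A plausible sketch, assuming a Snowflake offline store and Feast's spark.engine materialization engine; every connection value is a hypothetical placeholder:

project: feast_demo_compute_engine
registry: data/registry.db
provider: local
offline_store:
  type: snowflake.offline
  account: YOUR_ACCOUNT      # placeholder
  user: YOUR_USER            # placeholder
  password: YOUR_PASSWORD    # placeholder
  role: YOUR_ROLE            # placeholder
  warehouse: YOUR_WAREHOUSE  # placeholder
  database: SNOWFLAKE_SAMPLE_DATA
online_store:
  type: postgres
  host: localhost
  port: 5432
  database: feast            # placeholder
  user: feast                # placeholder
  password: feast            # placeholder
batch_engine:
  type: spark.engine         # assumption: swapped with the default local engine between runs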
Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
from feast import (
    Field,
    FileSource,
    PushSource,
    RequestSource,
    SnowflakeSource
)
from feast.types import Int64, Float32

# driver_stats = SparkSource(
#     name="driver_stats_source",
#     path="../data/driver_stats_lat_lon.parquet",
#     timestamp_field="event_timestamp",
#     created_timestamp_column="created",
#     description="A table describing the stats of a driver based on hourly logs",
# )


tpch_sf = SnowflakeSource(
    database="SNOWFLAKE_SAMPLE_DATA",
    schema="TPCH_SF10",
    table="ORDERS",
    timestamp_field="O_ORDERDATE"
)
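Since the offline data lives in Snowflake's public TPCH sample, the same source also supports point-in-time training retrieval through the standard FeatureStore API. A minimal sketch, assuming the demo's order_stats view has been applied; the entity key is the one queried in the notebook, and the event timestamp is illustrative:

from datetime import datetime, timezone

import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# One row per (entity key, event time) to join features against,
# point-in-time correct as of each timestamp.
entity_df = pd.DataFrame(
    {
        "O_CUSTKEY": [397082],
        "event_timestamp": [datetime(1998, 8, 2, tzinfo=timezone.utc)],
    }
)

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["order_stats:O_TOTALPRICE"],
).to_df()
print(training_df.head())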
Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
from feast import (
    Entity,
    ValueType,
)

customer = Entity(
    name="customer",
    join_keys=["O_CUSTKEY"],
    value_type=ValueType.INT64,
    description="Customer id",
)
Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
from feast import FeatureService

from features import *

# feature_service = FeatureService(
#     name="model_v1",
#     features=[bfv[["conv_rate"]]],
# )
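A working variant of the commented-out service above, assuming the demo's order_stats view is importable from the features module referenced by the star import (that module's contents are not shown in this diff):

from feast import FeatureService

from features import order_stats  # assumption: defined in the demo's features module

model_v1 = FeatureService(
    name="model_v1",
    # [["..."]] subsets a feature view to the named columns.
    features=[order_stats[["O_TOTALPRICE"]]],
)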
