diff --git a/.gitignore b/.gitignore
index 8cd9d08..058ceab 100644
--- a/.gitignore
+++ b/.gitignore
@@ -144,4 +144,6 @@ Digraph.gv.pdf
 .fn_graph_cache

 # Sandbox files for developers
-sandbox.py
\ No newline at end of file
+sandbox.py
+.DS_Store
+fn_graph/.DS_Store
diff --git a/fn_graph/examples/car_savings.py b/fn_graph/examples/car_savings.py
index 4660cc6..f17876f 100644
--- a/fn_graph/examples/car_savings.py
+++ b/fn_graph/examples/car_savings.py
@@ -1,12 +1,11 @@
 """
 A simple example showing basic functionality.
 """
-#%%
 from random import choice, random

 import pandas as pd
 import plotly.express as px

-from fn_graph import Composer
+from fn_graph.examples.solution.composer import PipelineComposer as Composer

 prices = [random() * 100_000 + 50000 for _ in range(10)]
@@ -18,7 +17,6 @@ def get_car_prices():
             price=prices,
         )
     )
-
     return df


diff --git a/fn_graph/examples/finance.py b/fn_graph/examples/finance.py
index 29bce6e..2a9a333 100644
--- a/fn_graph/examples/finance.py
+++ b/fn_graph/examples/finance.py
@@ -1,35 +1,25 @@
 """
-In finance, the Sharpe ratio (also known as the Sharpe index, the Sharpe measure,
-and the reward-to-variability ratio) measures the performance of an investment
-(e.g., a security or portfolio) compared to a risk-free asset, after adjusting
-for its risk. It is defined as the difference between the returns of the
-investment and the risk-free return, divided by the standard deviation of the
-investment (i.e., its volatility). It represents the additional amount of return
-that an investor receives per unit of increase in risk.
-
-This shows how to calculate a the Sharoe ratio for a small portfolio of shares. The
-share data us pulled from yahoo finance and the analysis is done in pandas. We assume
-a risk free rate of zero.
+In finance, the Sharpe ratio measures the performance of an investment compared
+to a risk-free asset, after adjusting for its risk.
+
+This shows how to calculate the Sharpe ratio for a small portfolio of shares.
 """
 from datetime import date
 from math import sqrt
-from pathlib import Path

 import matplotlib.pyplot as plt
 import pandas as pd
 import yfinance as yf
-from fn_graph import Composer
 from pandas.plotting import register_matplotlib_converters

+from fn_graph.examples.solution.composer import PipelineComposer as Composer
+
 register_matplotlib_converters()

 plt.style.use("fivethirtyeight")


 def closing_prices(share_allocations, start_date, end_date):
-    """
-    The closing prices of our portfolio pulled with yfinance.
-    """
     data = yf.download(
         " ".join(share_allocations.keys()), start=start_date, end=end_date
     )
@@ -37,31 +27,20 @@ def closing_prices(share_allocations, start_date, end_date):


 def normalised_returns(closing_prices):
-    """
-    Normalise the returns as a ratio of the initial price.
-    """
     return closing_prices / closing_prices.iloc[0, :]


 def positions(normalised_returns, share_allocations, initial_total_position):
-    """
-    Our total positions ovr time given an initial allocation.
-    """
-
     allocations = pd.DataFrame(
         {
             symbol: normalised_returns[symbol] * allocation
             for symbol, allocation in share_allocations.items()
         }
     )
-
     return allocations * initial_total_position


 def total_position(positions):
-    """
-    The total value of out portfolio
-    """
     return positions.sum(axis=1)
@@ -70,23 +49,14 @@ def positions_plot(positions):


 def cumulative_return(total_position):
-    """
-    The cumulative return of our portfolio
-    """
-    return 100 * (total_position[-1] / total_position[0] - 1)
+    return 100 * (total_position.iloc[-1] / total_position.iloc[0] - 1)


 def daily_return(total_position):
-    """
-    The daily return of our portfolio
-    """
     return total_position.pct_change(1)


 def sharpe_ratio(daily_return):
-    """
-    The sharpe ratio of the portfolio assuming a zero risk free rate.
-    """
     return daily_return.mean() / daily_return.std()
@@ -116,5 +86,4 @@ def annual_sharpe_ratio(sharpe_ratio, trading_days_in_a_year=252):
     .cache()
 )

-# Just for uniformity with the rest of the examples
 f = composer
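A quick sanity check of the arithmetic this pipeline performs (illustrative only, not part of the diff; it assumes synthetic returns in place of the yfinance data):

    import numpy as np
    import pandas as pd

    # Stand-in for daily_return(total_position)
    daily = pd.Series(np.random.default_rng(0).normal(0.0005, 0.01, 252))

    sharpe = daily.mean() / daily.std()   # sharpe_ratio node, risk-free rate assumed zero
    annual = (252 ** 0.5) * sharpe        # annual_sharpe_ratio with trading_days_in_a_year=252
    print(round(sharpe, 4), round(annual, 4))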
diff --git a/fn_graph/examples/machine_learning.py b/fn_graph/examples/machine_learning.py
index 7b9dea3..43d6699 100644
--- a/fn_graph/examples/machine_learning.py
+++ b/fn_graph/examples/machine_learning.py
@@ -1,13 +1,7 @@
 """
 A simple machine learning example that builds a classifier for the standard
 iris dataset.
-
-This example uses scikit-learn to build a a classifier to detect the species of an iris
-flower based on attributes of the flower. Based on the parameters different types of models
-can be trained, and preprocessing can be turned on and off. It also show cases the integration
-of visualisations to measure the accuracy of the model.
 """
-from fn_graph import Composer
 import sklearn, sklearn.datasets, sklearn.svm, sklearn.linear_model, sklearn.metrics
 from sklearn.model_selection import train_test_split
 import pandas as pd
@@ -15,18 +9,14 @@
 import seaborn as sns
 import matplotlib.pylab as plt

+from fn_graph.examples.solution.composer import PipelineComposer as Composer
+

 def iris():
-    """
-    Load the classic iris dataset
-    """
     return sklearn.datasets.load_iris()


 def data(iris):
-    """
-    Pull out the data as pandas DataFrame
-    """
     df_train = pd.DataFrame(
         iris.data, columns=["feature{}".format(i) for i in range(4)]
     )
@@ -34,18 +24,10 @@


 def investigate_data(data):
-    """
-    Check for any visual correlations using seaborn
-    """
     return sns.pairplot(data, hue="y")


 def preprocess_data(data, do_preprocess):
-    """
-    Preprocess the data by scaling depending on the parameter
-
-    We make sure we don't mutate the data because that is better practice.
-    """
     processed = data.copy()
     if do_preprocess:
         processed.iloc[:, :-1] = sklearn.preprocessing.scale(processed.iloc[:, :-1])
@@ -53,9 +35,6 @@ def split_data(preprocess_data):
-    """
-    Split the data into test and train sets
-    """
     return dict(
         zip(
             ("training_features", "test_features", "training_target", "test_target"),
         )
     )

-# This is done verbosely purpose, but it could be more concise
-
-
 def training_features(split_data):
     return split_data["training_features"]
@@ -84,43 +60,25 @@ def test_target(split_data):
     return split_data["test_target"]


 def model(training_features, training_target, model_type):
-    """
-    Train the model
-    """
     if model_type == "ols":
-        model = sklearn.linear_model.LogisticRegression()
+        m = sklearn.linear_model.LogisticRegression()
     elif model_type == "svm":
-        model = sklearn.svm.SVC()
+        m = sklearn.svm.SVC()
     else:
         raise ValueError("invalid model selection, choose either 'ols' or 'svm'")
-    model.fit(training_features, training_target)
-    return model
+    m.fit(training_features, training_target)
+    return m


 def predictions(model, test_features):
-    """
-    Make some predictions foo the test data
-    """
     return model.predict(test_features)


 def classification_metrics(predictions, test_target):
-    """
-    Show some standard classification metrics
-    """
     return sklearn.metrics.classification_report(test_target, predictions)


-def plot_confusion_matrix(
-    cm, target_names, title="Confusion matrix", cmap=plt.cm.Blues
-):
-    """
-    Plots a confusion matrix using matplotlib.
-
-    This is just a regular function that is not in the composer.
-    Shamelessly taken from https://scikit-learn.org/0.15/auto_examples/model_selection/plot_confusion_matrix.html
-    """
-
+def plot_confusion_matrix(cm, target_names, title="Confusion matrix", cmap=plt.cm.Blues):
     plt.imshow(cm, interpolation="nearest", cmap=cmap)
     plt.title(title)
     plt.colorbar()
@@ -134,9 +92,6 @@ def plot_confusion_matrix(


 def confusion_matrix(predictions, test_target):
-    """
-    Show the confusion matrix
-    """
     cm = sklearn.metrics.confusion_matrix(test_target, predictions)
     return plot_confusion_matrix(cm, ["setosa", "versicolor", "virginica"])
@@ -144,9 +99,7 @@
 f = (
     Composer()
     .update_parameters(
-        # Parameter controlling the model type (ols, svc)
         model_type="ols",
-        # Parameter enabling data preprocessing
         do_preprocess=True,
     )
     .update(
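With the import swapped, the example's composer f is a PipelineComposer, so it can be driven end to end from a YAML config. A minimal run sketch (assumes the solution package's local imports resolve, e.g. by running from fn_graph/examples/solution, and that config/iris.yaml from later in this diff is present):

    from fn_graph.examples.machine_learning import f

    # calculate() is the orchestrator override; config may be a path or a dict
    results = f.calculate(["classification_metrics"], config="config/iris.yaml")
    print(results["classification_metrics"])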
diff --git a/fn_graph/examples/solution/.gitignore b/fn_graph/examples/solution/.gitignore
new file mode 100644
index 0000000..55b73ca
--- /dev/null
+++ b/fn_graph/examples/solution/.gitignore
@@ -0,0 +1,7 @@
+venv/
+__pycache__/
+*.pyc
+artifacts/
+logs/
+mnt/
+.env
diff --git a/fn_graph/examples/solution/artifact_store/__init__.py b/fn_graph/examples/solution/artifact_store/__init__.py
new file mode 100644
index 0000000..a637009
--- /dev/null
+++ b/fn_graph/examples/solution/artifact_store/__init__.py
@@ -0,0 +1,3 @@
+from .base import BaseArtifactStore
+from .fs import LocalFSArtifactStore
+from .s3 import S3ArtifactStore
diff --git a/fn_graph/examples/solution/artifact_store/base.py b/fn_graph/examples/solution/artifact_store/base.py
new file mode 100644
index 0000000..127566f
--- /dev/null
+++ b/fn_graph/examples/solution/artifact_store/base.py
@@ -0,0 +1,14 @@
+from abc import ABC, abstractmethod
+from typing import Any
+
+class BaseArtifactStore(ABC):
+    @abstractmethod
+    def put(self, key: str, value: Any) -> None: ...
+    @abstractmethod
+    def get(self, key: str) -> Any: ...
+    @abstractmethod
+    def exists(self, key: str) -> bool: ...
+    @abstractmethod
+    def delete(self, key: str) -> None: ...
+    @abstractmethod
+    def metadata(self, key: str) -> dict: ...
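Any store implementing these five methods plugs into the orchestrator. A minimal in-memory sketch (hypothetical, not shipped in this diff; handy for unit tests):

    from typing import Any

    from artifact_store.base import BaseArtifactStore  # adjust to match sys.path


    class DictArtifactStore(BaseArtifactStore):
        """Keeps artifacts in a plain dict instead of disk or S3."""

        def __init__(self):
            self._data: dict = {}

        def put(self, key: str, value: Any) -> None:
            self._data[key] = value

        def get(self, key: str) -> Any:
            return self._data[key]

        def exists(self, key: str) -> bool:
            return key in self._data

        def delete(self, key: str) -> None:
            self._data.pop(key, None)

        def metadata(self, key: str) -> dict:
            # No real file stats to report for an in-memory store
            return {"size": None, "mtime": None}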
in ("404", "NoSuchKey"): + result = False + else: + raise + print(f"[S3ArtifactStore] exists({key}): {result}", flush=True) + return result + + def delete(self, key: str) -> None: + self._client.delete_object(Bucket=self.bucket, Key=self._key(key)) + + def metadata(self, key: str) -> dict: + response = self._client.head_object(Bucket=self.bucket, Key=self._key(key)) + return {"size": response["ContentLength"], "last_modified": response["LastModified"]} diff --git a/fn_graph/examples/solution/composer.py b/fn_graph/examples/solution/composer.py new file mode 100644 index 0000000..680a235 --- /dev/null +++ b/fn_graph/examples/solution/composer.py @@ -0,0 +1,225 @@ +""" +PipelineComposer — drop-in replacement for fn_graph.Composer. + +The only change needed to use this instead of plain fn_graph: + + # Before + from fn_graph import Composer + + # After (only this line changes) + from fn_graph.examples.solution.composer import PipelineComposer as Composer + +Everything else — update(), update_parameters(), calculate() — works identically. + +What this adds on top of plain fn_graph: +- Each node runs through a pluggable executor (in-process, Docker, or Lambda) +- Each node's output is saved to an artifact store (local disk, S3, etc.) +- Re-runs skip nodes whose outputs already exist (memoization) +- If a node fails, downstream nodes are skipped automatically +- Retry with backoff on Docker nodes +- All of this is configured via a YAML file — no code changes needed +""" + +import inspect +import networkx as nx +from fn_graph import Composer + +from artifact_store.base import BaseArtifactStore +from config import get_executor, load_config, get_artifact_store, get_node_config + + +class PipelineComposer(Composer): + """ + Subclass of fn_graph.Composer. + + Inherits update(), update_parameters(), update_from(), cache() unchanged. + Only calculate() is overridden — it now routes each node through our + executor and artifact store instead of running everything in one process. + """ + + def __init__(self, *, _functions=None, _parameters=None): + super().__init__(_functions=_functions, _parameters=_parameters) + + # ── Ensure chained calls return PipelineComposer, not base Composer ─────── + + def update(self, *args, **kwargs): + base = super().update(*args, **kwargs) + return PipelineComposer(_functions=base._functions, _parameters=base._parameters) + + def update_parameters(self, **kwargs): + base = super().update_parameters(**kwargs) + return PipelineComposer(_functions=base._functions, _parameters=base._parameters) + + def update_from(self, other): + base = super().update_from(other) + return PipelineComposer(_functions=base._functions, _parameters=base._parameters) + + # ── Graph helpers ───────────────────────────────────────────────────────── + + def dag(self): + """ + Build the dependency graph from function signatures. + fn_graph's implicit contract: if function B has a parameter named A, + then B depends on A. We use this to build the execution order. 
diff --git a/fn_graph/examples/solution/composer.py b/fn_graph/examples/solution/composer.py
new file mode 100644
index 0000000..680a235
--- /dev/null
+++ b/fn_graph/examples/solution/composer.py
@@ -0,0 +1,225 @@
+"""
+PipelineComposer — drop-in replacement for fn_graph.Composer.
+
+The only change needed to use this instead of plain fn_graph:
+
+    # Before
+    from fn_graph import Composer
+
+    # After (only this line changes)
+    from fn_graph.examples.solution.composer import PipelineComposer as Composer
+
+Everything else — update(), update_parameters(), calculate() — works identically.
+
+What this adds on top of plain fn_graph:
+- Each node runs through a pluggable executor (in-process, Docker, or Lambda)
+- Each node's output is saved to an artifact store (local disk, S3, etc.)
+- Re-runs skip nodes whose outputs already exist (memoization)
+- If a node fails, downstream nodes are skipped automatically
+- Retry with backoff on Docker nodes
+- All of this is configured via a YAML file — no code changes needed
+"""
+
+import inspect
+
+import networkx as nx
+from fn_graph import Composer
+
+from artifact_store.base import BaseArtifactStore
+from config import get_executor, load_config, get_artifact_store, get_node_config
+
+
+class PipelineComposer(Composer):
+    """
+    Subclass of fn_graph.Composer.
+
+    Inherits update(), update_parameters(), update_from(), cache() unchanged.
+    Only calculate() is overridden — it now routes each node through our
+    executor and artifact store instead of running everything in one process.
+    """
+
+    def __init__(self, *, _functions=None, _parameters=None):
+        super().__init__(_functions=_functions, _parameters=_parameters)
+
+    # ── Ensure chained calls return PipelineComposer, not base Composer ───────
+
+    def update(self, *args, **kwargs):
+        base = super().update(*args, **kwargs)
+        return PipelineComposer(_functions=base._functions, _parameters=base._parameters)
+
+    def update_parameters(self, **kwargs):
+        base = super().update_parameters(**kwargs)
+        return PipelineComposer(_functions=base._functions, _parameters=base._parameters)
+
+    def update_from(self, other):
+        base = super().update_from(other)
+        return PipelineComposer(_functions=base._functions, _parameters=base._parameters)
+
+    # ── Graph helpers ─────────────────────────────────────────────────────────
+
+    def dag(self):
+        """
+        Build the dependency graph from function signatures.
+        fn_graph's implicit contract: if function B has a parameter named A,
+        then B depends on A. We use this to build the execution order.
+        """
+        g = nx.DiGraph()
+        all_names = set(self._functions) | set(self._parameters)
+        for name in all_names:
+            g.add_node(name)
+        for fname, fn in self._functions.items():
+            for param in inspect.signature(fn).parameters:
+                if param in all_names:
+                    g.add_edge(param, fname)
+        return g
+
+    def functions(self):
+        """Returns all node functions registered in this composer."""
+        return dict(self._functions)
+
+    def parameters(self):
+        """
+        Returns pipeline-level parameters in the format:
+            { name: (type, value) }
+        These are the initial inputs seeded into the artifact store before
+        any node runs.
+        """
+        return {k: (type(v), v) for k, v in self._parameters.items()}
+
+    def _resolve_predecessors(self, node_name):
+        """
+        For a given node, yields (param_name, source_node) pairs.
+        This tells the orchestrator exactly which upstream outputs to
+        load from the artifact store as inputs for this node.
+        """
+        fn = self._functions.get(node_name)
+        if fn is None:
+            return
+        all_names = set(self._functions) | set(self._parameters)
+        for param in inspect.signature(fn).parameters:
+            if param in all_names:
+                yield (param, param)
+
+    # ── calculate() — the orchestrator ───────────────────────────────────────
+
+    def calculate(self, outputs, config="config/toy_local.yaml", **kwargs):
+        """
+        Override of fn_graph's calculate().
+
+        This is the orchestrator — it:
+        1. Reads the DAG and determines execution order (topological sort)
+        2. Seeds pipeline parameters into the artifact store
+        3. For each node in order:
+           a. Checks if output already exists (memoization — skip if yes)
+           b. Checks if any upstream node failed (skip if yes)
+           c. Loads only the inputs that node needs from the artifact store
+           d. Dispatches the node to the configured executor
+              (in-process / Docker / Lambda — set in YAML config)
+           e. Saves the output back to the artifact store
+        4. Returns results for the requested output nodes
+
+        Args:
+            outputs: list of node names you want results for,
+                     or None / [] to return all node results
+            config: path to YAML config file, or a pre-loaded config dict
+
+        Returns:
+            dict of { node_name: result }
+        """
+        # Load config from file or use pre-loaded dict
+        if isinstance(config, str):
+            cfg = load_config(config)
+        else:
+            cfg = config
+
+        on_failure = cfg["pipeline"].get("on_failure", "stop")
+        artifact_store = get_artifact_store(cfg)
+
+        dag = self.dag()
+        funcs = self.functions()
+        params = {name: val for name, (_, val) in self.parameters().items()}
+
+        # Topological sort — nodes run in dependency order
+        topo_order = list(nx.topological_sort(dag))
+        total = len(topo_order)
+
+        print(f"\n[PipelineComposer] starting pipeline run", flush=True)
+        print(f"[PipelineComposer] execution order: {topo_order}", flush=True)
+        print(f"[PipelineComposer] on_failure: {on_failure}", flush=True)
+
+        # Step 1 — seed pipeline parameters into artifact store
+        # These are the initial inputs (e.g. raw_path, model_type etc.)
+        print("\n[PipelineComposer] seeding parameters", flush=True)
+        for name, value in params.items():
+            artifact_store.put(name, value)
+            print(f"[PipelineComposer] seeded: {name} = {value}", flush=True)
+
+        failed_nodes = set()
+
+        print("\n[PipelineComposer] beginning node execution", flush=True)
+        for i, node_name in enumerate(topo_order):
+            print(f"\n[PipelineComposer] --- node: {node_name} ({i+1}/{total}) ---", flush=True)
+
+            # Skip parameter nodes — they're already in the artifact store
+            if node_name in params:
+                print(f"[PipelineComposer] skipping parameter node: {node_name}", flush=True)
+                continue
+
+            # Memoization — if this node's output already exists, skip it
+            # Change run_id in the YAML config to force a fresh run
+            if artifact_store.exists(node_name):
+                print(f"[PipelineComposer] output exists, skipping: {node_name}", flush=True)
+                continue
+
+            # If any upstream node failed, skip this node too
+            deps = list(dag.predecessors(node_name))
+            blocked_by = [d for d in deps if d in failed_nodes]
+            if blocked_by:
+                print(f"[PipelineComposer] skipping {node_name}: upstream failed: {blocked_by}", flush=True)
+                failed_nodes.add(node_name)
+                continue
+
+            # Load only the inputs this node declared — not the full pipeline state
+            # This minimises data transfer, especially important for Docker/Lambda
+            predecessors = list(self._resolve_predecessors(node_name))
+            input_names = [p for p, _ in predecessors]
+            print(f"[PipelineComposer] loading inputs: {input_names}", flush=True)
+            kwargs_node = {p: artifact_store.get(n) for p, n in predecessors}
+
+            # Get the executor for this node (memory / docker / lambda)
+            # Configured per-node in the YAML file
+            node_config = get_node_config(cfg, node_name)
+            executor = get_executor(node_config)
+            print(f"[PipelineComposer] executor: {type(executor).__name__}", flush=True)
+
+            try:
+                # Run the node and save output to artifact store
+                result = executor.execute(node_name, funcs[node_name], kwargs_node)
+                artifact_store.put(node_name, result)
+                print(f"[PipelineComposer] node {node_name} done", flush=True)
+
+            except Exception as e:
+                print(f"[PipelineComposer] ERROR in '{node_name}': {e}", flush=True)
+                failed_nodes.add(node_name)
+                if on_failure == "finish_running":
+                    # Keep going — other independent nodes can still run
+                    continue
+                else:
+                    # Stop the pipeline immediately
+                    raise
+
+        if failed_nodes:
+            print(f"\n[PipelineComposer] finished with failures: {failed_nodes}", flush=True)
+        else:
+            print(f"\n[PipelineComposer] pipeline complete", flush=True)
+
+        # Collect results from artifact store
+        all_results = {
+            name: artifact_store.get(name)
+            for name in funcs
+            if artifact_store.exists(name)
+        }
+
+        # Return only the requested outputs, or everything if none specified
+        if outputs:
+            return {k: all_results[k] for k in outputs if k in all_results}
+        return all_results
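The drop-in contract from the module docstring, end to end; a minimal sketch assuming a one-node pipeline and the toy_local config defined later in this diff (run from the solution directory so the local imports resolve):

    from fn_graph.examples.solution.composer import PipelineComposer as Composer


    def greeting(name):
        return f"hello, {name}"


    f = Composer().update(greeting=greeting).update_parameters(name="world")

    # Same call shape as plain fn_graph; executors and store come from the YAML
    print(f.calculate(["greeting"], config="config/toy_local.yaml"))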
config["artifact_store"]["type"] + nodes = config.get("nodes", {}) + print(f"[config] run_id: {run_id}", flush=True) + print(f"[config] artifact store type: {store_type}", flush=True) + print(f"[config] nodes configured: {list(nodes.keys())}", flush=True) + return config + + +def get_executor(node_config: dict): + """ + Build the right executor from a node config dict. + Supports retry config on DockerExecutor. + """ + if node_config is None: + node_config = {"executor": "memory"} + + executor_type = node_config.get("executor", "memory") + print(f"[config] creating executor: {executor_type}", flush=True) + + if executor_type == "memory": + return InMemoryExecutor() + + elif executor_type == "docker": + return DockerExecutor( + image=node_config.get("image", "fn_graph_worker_v2"), + retry=node_config.get("retry"), # None = no retry = 1 attempt + ) + + elif executor_type == "lambda": + return LambdaExecutor( + function_name=node_config["function_name"], + region=node_config["region"], + ) + + else: + raise ValueError(f"Unknown executor type: '{executor_type}'") + + +def get_artifact_store(config: dict): + store_cfg = config["artifact_store"] + store_type = store_cfg["type"] + run_id = config["pipeline"]["run_id"] + print(f"[config] creating artifact store: {store_type}", flush=True) + + if store_type == "fs": + return LocalFSArtifactStore(base_dir=store_cfg["base_dir"], run_id=run_id) + + elif store_type == "s3": + return S3ArtifactStore( + bucket=store_cfg["bucket"], + run_id=run_id, + region=store_cfg.get("region", "us-east-1"), + ) + + else: + raise ValueError(f"Unknown artifact store type: '{store_type}'") + + +def get_node_config(config: dict, node_name: str) -> dict: + """ + Returns the config for a node. + Falls back to '*' wildcard, then to memory default. 
diff --git a/fn_graph/examples/solution/config/finance.yaml b/fn_graph/examples/solution/config/finance.yaml
new file mode 100644
index 0000000..436933e
--- /dev/null
+++ b/fn_graph/examples/solution/config/finance.yaml
@@ -0,0 +1,11 @@
+pipeline:
+  run_id: finance_run_001
+  on_failure: stop
+
+artifact_store:
+  type: fs
+  base_dir: ./artifacts
+
+nodes:
+  "*":
+    executor: memory
diff --git a/fn_graph/examples/solution/config/iris.yaml b/fn_graph/examples/solution/config/iris.yaml
new file mode 100644
index 0000000..96e21ec
--- /dev/null
+++ b/fn_graph/examples/solution/config/iris.yaml
@@ -0,0 +1,61 @@
+pipeline:
+  run_id: iris_run_001
+  on_failure: finish_running  # keep going if a node fails
+
+artifact_store:
+  type: fs
+  base_dir: ./artifacts
+
+nodes:
+  # --- in-process ---
+  iris:
+    executor: memory
+  investigate_data:
+    executor: memory
+  training_target:
+    executor: memory
+  test_target:
+    executor: memory
+  confusion_matrix:
+    executor: memory
+
+  # --- docker ---
+  data:
+    executor: docker
+    image: fn_graph_worker_v2
+  preprocess_data:
+    executor: docker
+    image: fn_graph_worker_v2
+  model:
+    executor: docker
+    image: fn_graph_worker_v2
+    retry:
+      max_attempts: 3
+      delay_seconds: 5
+      backoff: exponential  # linear | exponential
+
+  # --- lambda ---
+  split_data:
+    executor: lambda
+    function_name: fn_graph_worker_lambda
+    region: us-east-1
+  training_features:
+    executor: lambda
+    function_name: fn_graph_worker_lambda
+    region: us-east-1
+  test_features:
+    executor: lambda
+    function_name: fn_graph_worker_lambda
+    region: us-east-1
+  predictions:
+    executor: lambda
+    function_name: fn_graph_worker_lambda
+    region: us-east-1
+  classification_metrics:
+    executor: lambda
+    function_name: fn_graph_worker_lambda
+    region: us-east-1
+
+  # --- default for any node not listed above ---
+  "*":
+    executor: memory
diff --git a/fn_graph/examples/solution/config/toy_docker.yaml b/fn_graph/examples/solution/config/toy_docker.yaml
new file mode 100644
index 0000000..c6db1f3
--- /dev/null
+++ b/fn_graph/examples/solution/config/toy_docker.yaml
@@ -0,0 +1,19 @@
+pipeline:
+  run_id: toy_docker_001
+  on_failure: finish_running
+
+artifact_store:
+  type: fs
+  base_dir: ./artifacts
+
+nodes:
+  run_model:
+    executor: docker
+    image: fn_graph_worker_v2
+    retry:
+      max_attempts: 3
+      delay_seconds: 2
+      backoff: exponential
+
+  "*":
+    executor: memory
diff --git a/fn_graph/examples/solution/config/toy_lambda.yaml b/fn_graph/examples/solution/config/toy_lambda.yaml
new file mode 100644
index 0000000..85c8d1b
--- /dev/null
+++ b/fn_graph/examples/solution/config/toy_lambda.yaml
@@ -0,0 +1,16 @@
+pipeline:
+  run_id: toy_lambda_001
+  on_failure: finish_running
+
+artifact_store:
+  type: fs
+  base_dir: ./artifacts
+
+nodes:
+  run_model:
+    executor: lambda
+    function_name: fn_graph_worker_lambda
+    region: us-east-1
+
+  "*":
+    executor: memory
diff --git a/fn_graph/examples/solution/config/toy_local.yaml b/fn_graph/examples/solution/config/toy_local.yaml
new file mode 100644
index 0000000..73ac462
--- /dev/null
+++ b/fn_graph/examples/solution/config/toy_local.yaml
@@ -0,0 +1,11 @@
+pipeline:
+  run_id: toy_run_001
+  on_failure: finish_running
+
+artifact_store:
+  type: fs
+  base_dir: ./artifacts
+
+nodes:
+  "*":
+    executor: memory
diff --git a/fn_graph/examples/solution/deploy_lambda.py b/fn_graph/examples/solution/deploy_lambda.py
new file mode 100644
index 0000000..649d7ba
--- /dev/null
+++ b/fn_graph/examples/solution/deploy_lambda.py
@@ -0,0 +1,163 @@
+"""
+Deploy fn_graph Lambda worker.
+
+Two modes:
+  --mode zip (default)  Package handler + cloudpickle into a .zip, upload directly.
+                        No ECR, no Docker needed. Use this for testing.
+  --mode ecr            Build Docker image, push to ECR, deploy container Lambda.
+                        Use this for production (handles sklearn/numpy size limit).
+
+Usage:
+    # Zip deploy (testing — no ECR needed)
+    python deploy_lambda.py --mode zip --region us-east-1 --role arn:aws:iam::123456789012:role/my-role
+
+    # ECR deploy (production)
+    python deploy_lambda.py --mode ecr --region us-east-1 --account 123456789012 --role arn:aws:iam::123456789012:role/my-role
+"""
+
+import io
+import subprocess
+import sys
+import zipfile
+from pathlib import Path
+
+
+def run(cmd: list, check=True):
+    print(f"[deploy] $ {' '.join(cmd)}", flush=True)
+    return subprocess.run(cmd, capture_output=False, text=True, check=check)
+
+
+def deploy_zip(args):
+    import tempfile, shutil
+
+    print(f"\n[deploy/zip] function: {args.function}")
+    print(f"[deploy/zip] region: {args.region}")
+
+    print("\n[deploy/zip] === step 1: building zip ===")
+    zip_buffer = io.BytesIO()
+    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
+        handler_path = Path(__file__).parent / "worker" / "lambda_handler.py"
+        print(f"[deploy/zip] adding {handler_path}", flush=True)
+        zf.write(handler_path, "lambda_handler.py")
+
+        print(f"[deploy/zip] installing cloudpickle...", flush=True)
+        tmp = Path(tempfile.mkdtemp())
+        subprocess.run(
+            [sys.executable, "-m", "pip", "install", "cloudpickle", "-t", str(tmp), "-q"],
+            check=True,
+        )
+        for file in tmp.rglob("*"):
+            if file.is_file() and "__pycache__" not in str(file):
+                zf.write(file, str(file.relative_to(tmp)))
+        shutil.rmtree(tmp)
+
+    zip_bytes = zip_buffer.getvalue()
+    print(f"[deploy/zip] zip size: {len(zip_bytes) / (1024*1024):.2f} MB", flush=True)
+
+    print("\n[deploy/zip] === step 2: create or update Lambda function ===")
+    import boto3
+
+    client = boto3.client("lambda", region_name=args.region)
+
+    try:
+        client.get_function(FunctionName=args.function)
+        exists = True
+    except client.exceptions.ResourceNotFoundException:
+        exists = False
+
+    if exists:
+        print(f"[deploy/zip] updating existing function: {args.function}", flush=True)
+        client.update_function_code(FunctionName=args.function, ZipFile=zip_bytes)
+    else:
+        if not args.role:
+            print("[deploy/zip] ERROR: --role is required when creating a new Lambda function.")
+            sys.exit(1)
+        print(f"[deploy/zip] creating new function: {args.function}", flush=True)
+        client.create_function(
+            FunctionName=args.function,
+            Runtime="python3.12",
+            Role=args.role,
+            Handler="lambda_handler.handler",
+            Code={"ZipFile": zip_bytes},
+            Timeout=300,
+            MemorySize=512,
+        )
+
+    print(f"\n[deploy/zip] done. '{args.function}' is live in {args.region}")
+    print(f"[deploy/zip] test with: python toy_pipeline.py --config config/toy_lambda.yaml")
+
+
+def deploy_ecr(args):
+    if not args.account:
+        print("[deploy/ecr] ERROR: --account is required for ECR deploy.")
+        sys.exit(1)
+
+    image_name = args.function
+    ecr_registry = f"{args.account}.dkr.ecr.{args.region}.amazonaws.com"
+    ecr_uri = f"{ecr_registry}/{image_name}:latest"
+
+    print(f"\n[deploy/ecr] region: {args.region}")
+    print(f"[deploy/ecr] account: {args.account}")
+    print(f"[deploy/ecr] ECR URI: {ecr_uri}")
+
+    print("\n[deploy/ecr] === step 1: ECR login ===")
+    login = subprocess.run(
+        ["aws", "ecr", "get-login-password", "--region", args.region],
+        capture_output=True, text=True, check=True,
+    )
+    subprocess.run(
+        ["docker", "login", "--username", "AWS", "--password-stdin", ecr_registry],
+        input=login.stdout, text=True, check=True,
+    )
+
+    print("\n[deploy/ecr] === step 2: ensure ECR repo exists ===")
+    subprocess.run(
+        ["aws", "ecr", "create-repository", "--repository-name", image_name, "--region", args.region],
+        capture_output=True, check=False,
+    )
+
+    print("\n[deploy/ecr] === step 3: build image ===")
+    run(["docker", "build", "-f", "worker/Dockerfile.lambda", "-t", image_name, "worker/"])
+
+    print("\n[deploy/ecr] === step 4: tag and push ===")
+    run(["docker", "tag", f"{image_name}:latest", ecr_uri])
+    run(["docker", "push", ecr_uri])
+
+    print("\n[deploy/ecr] === step 5: create or update Lambda function ===")
+    check = subprocess.run(
+        ["aws", "lambda", "get-function", "--function-name", args.function, "--region", args.region],
+        capture_output=True, check=False,
+    )
+
+    if check.returncode == 0:
+        run(["aws", "lambda", "update-function-code", "--function-name", args.function,
+             "--image-uri", ecr_uri, "--region", args.region])
+    else:
+        if not args.role:
+            print("[deploy/ecr] ERROR: --role is required when creating a new Lambda function.")
+            sys.exit(1)
+        run(["aws", "lambda", "create-function", "--function-name", args.function,
+             "--package-type", "Image", "--code", f"ImageUri={ecr_uri}",
+             "--role", args.role, "--region", args.region, "--timeout", "300", "--memory-size", "512"])
+
+    print(f"\n[deploy/ecr] done. '{args.function}' is live at {ecr_uri}")
+    print(f"[deploy/ecr] test with: python toy_pipeline.py --config config/toy_lambda.yaml")
+
+
+def main():
+    import argparse
+
+    parser = argparse.ArgumentParser(description="Deploy fn_graph Lambda worker.")
+    parser.add_argument("--mode", choices=["zip", "ecr"], default="zip",
+                        help="zip = direct upload, no ECR (default). ecr = Docker image via ECR.")
+    parser.add_argument("--region", required=True, help="AWS region, e.g. us-east-1")
+    parser.add_argument("--account", default="", help="AWS account ID (required for ecr mode)")
+    parser.add_argument("--function", default="fn_graph_worker_lambda", help="Lambda function name")
+    parser.add_argument("--role", default="", help="IAM role ARN (required on first deploy)")
+    args = parser.parse_args()
+
+    if args.mode == "zip":
+        deploy_zip(args)
+    else:
+        deploy_ecr(args)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/fn_graph/examples/solution/executor/__init__.py b/fn_graph/examples/solution/executor/__init__.py
new file mode 100644
index 0000000..9972aba
--- /dev/null
+++ b/fn_graph/examples/solution/executor/__init__.py
@@ -0,0 +1,4 @@
+from .base import BaseExecutor
+from .memory import InMemoryExecutor
+from .docker import DockerExecutor
+from .lambda_executor import LambdaExecutor
diff --git a/fn_graph/examples/solution/executor/base.py b/fn_graph/examples/solution/executor/base.py
new file mode 100644
index 0000000..6163a44
--- /dev/null
+++ b/fn_graph/examples/solution/executor/base.py
@@ -0,0 +1,6 @@
+from abc import ABC, abstractmethod
+from typing import Any, Callable
+
+class BaseExecutor(ABC):
+    @abstractmethod
+    def execute(self, node_name: str, fn: Callable, kwargs: dict) -> Any: ...
diff --git a/fn_graph/examples/solution/executor/docker.py b/fn_graph/examples/solution/executor/docker.py
new file mode 100644
index 0000000..7b6a463
--- /dev/null
+++ b/fn_graph/examples/solution/executor/docker.py
@@ -0,0 +1,102 @@
+import base64
+import inspect
+import socket
+import subprocess
+import time
+from typing import Any, Callable
+
+import cloudpickle
+import requests
+
+from .base import BaseExecutor
+
+_HEALTH_TIMEOUT = 30
+_HEALTH_INTERVAL = 0.5
+
+
+def _free_port() -> int:
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        # SO_REUSEADDR must be set before bind to have any effect
+        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        s.bind(("", 0))
+        return s.getsockname()[1]
+
+
+class DockerExecutor(BaseExecutor):
+    def __init__(self, image: str, retry: dict = None):
+        self.image = image
+        # retry config: { max_attempts, delay_seconds, backoff: linear|exponential }
+        self.retry = retry or {"max_attempts": 1, "delay_seconds": 0, "backoff": "linear"}
+
+    def _run_once(self, node_name: str, fn: Callable, kwargs: dict) -> Any:
+        print(f"[DockerExecutor] starting container for node: {node_name}", flush=True)
+        print(f"[DockerExecutor] image: {self.image}", flush=True)
+
+        port = _free_port()
+        print(f"[DockerExecutor] assigned port: {port}", flush=True)
+
+        container_id = None
+        try:
+            result = subprocess.run(
+                ["docker", "run", "-d", "-p", f"{port}:8000", self.image],
+                capture_output=True, text=True, check=True,
+            )
+            container_id = result.stdout.strip()
+            print(f"[DockerExecutor] container id: {container_id}", flush=True)
+
+            deadline = time.time() + _HEALTH_TIMEOUT
+            while True:
+                try:
+                    resp = requests.get(f"http://localhost:{port}/health", timeout=2)
+                    if resp.status_code == 200:
+                        break
+                except requests.exceptions.ConnectionError:
+                    pass
+                if time.time() > deadline:
+                    raise TimeoutError(f"Worker container did not become healthy within {_HEALTH_TIMEOUT}s")
+                time.sleep(_HEALTH_INTERVAL)
+
+            print(f"[DockerExecutor] container healthy, sending work", flush=True)
+
+            fn_source = inspect.getsource(fn)
+            kwargs_b64 = base64.b64encode(cloudpickle.dumps(kwargs, protocol=4)).decode()
+
+            print(f"[DockerExecutor] posting to /execute, inputs: {list(kwargs.keys())}", flush=True)
+            resp = requests.post(
+                f"http://localhost:{port}/execute",
+                json={"node_name": node_name, "fn_source": fn_source, "kwargs_b64": kwargs_b64},
+                timeout=300,
+            )
+            print(f"[DockerExecutor] response received, status: {resp.status_code}", flush=True)
+
+            if resp.status_code == 500:
+                payload = resp.json()
+                print(f"[DockerExecutor] ERROR in node '{node_name}': {payload.get('error')}", flush=True)
+                print(f"[DockerExecutor] traceback:\n{payload.get('traceback', '')}", flush=True)
+                raise RuntimeError(f"Worker error in node '{node_name}': {payload.get('error')}")
+
+            resp.raise_for_status()
+            payload = resp.json()
+            result = cloudpickle.loads(base64.b64decode(payload["result_b64"]))
+            print(f"[DockerExecutor] node {node_name} complete, output type: {type(result).__name__}", flush=True)
+            return result
+        finally:
+            if container_id:
+                subprocess.run(["docker", "stop", container_id], capture_output=True)
+                subprocess.run(["docker", "rm", container_id], capture_output=True)
+                print(f"[DockerExecutor] container stopped and removed", flush=True)
+
+    def execute(self, node_name: str, fn: Callable, kwargs: dict) -> Any:
+        max_attempts = self.retry.get("max_attempts", 1)
+        delay = self.retry.get("delay_seconds", 0)
+        backoff = self.retry.get("backoff", "linear")
+
+        for attempt in range(1, max_attempts + 1):
+            try:
+                return self._run_once(node_name, fn, kwargs)
+            except Exception as e:
+                if attempt == max_attempts:
+                    print(f"[DockerExecutor] all {max_attempts} attempts failed for node '{node_name}'", flush=True)
+                    raise
+                wait = delay * (2 ** (attempt - 1)) if backoff == "exponential" else delay
+                print(f"[DockerExecutor] attempt {attempt} failed: {e}. retrying in {wait}s...", flush=True)
+                time.sleep(wait)
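The retry schedule in execute() maps the YAML retry block to wait times as follows; for example, the iris config's model node (max_attempts: 3, delay_seconds: 5, backoff: exponential) waits 5s, then 10s:

    delay, backoff = 5, "exponential"
    for attempt in range(1, 3):  # waits happen after attempts 1 and 2; attempt 3 raises
        wait = delay * (2 ** (attempt - 1)) if backoff == "exponential" else delay
        print(f"after attempt {attempt}: wait {wait}s")  # 5s, then 10s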
diff --git a/fn_graph/examples/solution/executor/lambda_executor.py b/fn_graph/examples/solution/executor/lambda_executor.py
new file mode 100644
index 0000000..0fa1908
--- /dev/null
+++ b/fn_graph/examples/solution/executor/lambda_executor.py
@@ -0,0 +1,57 @@
+import base64
+import inspect
+import json
+from typing import Any, Callable
+
+import boto3
+import cloudpickle
+
+from .base import BaseExecutor
+
+_LAMBDA_PAYLOAD_LIMIT = 6 * 1024 * 1024  # 6 MB
+
+
+class LambdaExecutor(BaseExecutor):
+    def __init__(self, function_name: str, region: str):
+        self.function_name = function_name
+        self.region = region
+
+    def execute(self, node_name: str, fn: Callable, kwargs: dict) -> Any:
+        print(f"[LambdaExecutor] invoking Lambda for node: {node_name}", flush=True)
+        print(f"[LambdaExecutor] function: {self.function_name}, region: {self.region}", flush=True)
+
+        fn_source = inspect.getsource(fn)
+        kwargs_b64 = base64.b64encode(cloudpickle.dumps(kwargs, protocol=4)).decode()
+        payload = json.dumps({"node_name": node_name, "fn_source": fn_source, "kwargs_b64": kwargs_b64})
+        payload_bytes = payload.encode()
+        size_mb = len(payload_bytes) / (1024 * 1024)
+
+        if len(payload_bytes) > _LAMBDA_PAYLOAD_LIMIT:
+            print(f"[LambdaExecutor] ERROR: payload {size_mb:.2f}MB exceeds Lambda 6MB limit", flush=True)
+            raise ValueError(
+                f"Payload for node '{node_name}' is {size_mb:.2f}MB, which exceeds the 6MB Lambda "
+                "limit. Switch to an S3 artifact store and pass S3 paths instead."
+            )
+
+        print(f"[LambdaExecutor] payload size: {size_mb:.2f}MB, invoking...", flush=True)
+
+        client = boto3.client("lambda", region_name=self.region)
+        response = client.invoke(
+            FunctionName=self.function_name,
+            InvocationType="RequestResponse",
+            Payload=payload_bytes,
+        )
+        print(f"[LambdaExecutor] Lambda response received", flush=True)
+
+        raw = response["Payload"].read()
+        result_payload = json.loads(raw)
+
+        status_code = result_payload.get("statusCode", 200)
+        if status_code == 500 or "error" in result_payload:
+            print(f"[LambdaExecutor] ERROR in node '{node_name}': {result_payload.get('error')}", flush=True)
+            print(f"[LambdaExecutor] traceback:\n{result_payload.get('traceback', '')}", flush=True)
+            raise RuntimeError(f"Lambda error in node '{node_name}': {result_payload.get('error')}")
+
+        result = cloudpickle.loads(base64.b64decode(result_payload["result_b64"]))
+        print(f"[LambdaExecutor] node {node_name} complete, output type: {type(result).__name__}", flush=True)
+        return result
diff --git a/fn_graph/examples/solution/executor/memory.py b/fn_graph/examples/solution/executor/memory.py
new file mode 100644
index 0000000..cebe81a
--- /dev/null
+++ b/fn_graph/examples/solution/executor/memory.py
@@ -0,0 +1,12 @@
+from typing import Any, Callable
+
+from .base import BaseExecutor
+
+
+class InMemoryExecutor(BaseExecutor):
+    def execute(self, node_name: str, fn: Callable, kwargs: dict) -> Any:
+        print(f"[InMemoryExecutor] running node: {node_name} directly in process", flush=True)
+        print(f"[InMemoryExecutor] inputs: {list(kwargs.keys())}", flush=True)
+        result = fn(**kwargs)
+        print(f"[InMemoryExecutor] node {node_name} complete, output type: {type(result).__name__}", flush=True)
+        return result
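All three executors share BaseExecutor.execute(node_name, fn, kwargs); the in-memory one shows the contract most directly (illustrative):

    from executor.memory import InMemoryExecutor


    def double(load_data):
        return [r * 2 for r in load_data]


    result = InMemoryExecutor().execute("double", double, {"load_data": [1, 2, 3]})
    print(result)  # [2, 4, 6]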
diff --git a/fn_graph/examples/solution/run_pipeline.py b/fn_graph/examples/solution/run_pipeline.py
new file mode 100644
index 0000000..8f2f617
--- /dev/null
+++ b/fn_graph/examples/solution/run_pipeline.py
@@ -0,0 +1,134 @@
+"""
+Entry point for the pluggable pipeline orchestration layer.
+
+Usage:
+    python run_pipeline.py --pipeline fn_graph.examples.machine_learning --config config/iris.yaml
+    python run_pipeline.py --pipeline fn_graph.examples.finance --config config/finance.yaml
+"""
+
+import argparse
+import importlib
+import sys
+from datetime import datetime
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).parent))
+
+try:
+    from dotenv import load_dotenv
+
+    load_dotenv()
+except ImportError:
+    pass
+
+from config import load_config, get_artifact_store, get_node_config
+from composer import PipelineComposer
+
+
+class _Tee:
+    """Duplicates writes to both stdout and a log file."""
+
+    def __init__(self, real, log_file):
+        self._real = real
+        self._log = log_file
+
+    def write(self, data):
+        self._real.write(data)
+        self._log.write(data)
+
+    def flush(self):
+        self._real.flush()
+        self._log.flush()
+
+    def fileno(self):
+        return self._real.fileno()
+
+
+def _setup_log(pipeline: str, run_id: str) -> Path:
+    pipeline_name = pipeline.split(".")[-1]
+    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
+    log_dir = Path(__file__).parent / "logs" / pipeline_name / run_id
+    log_dir.mkdir(parents=True, exist_ok=True)
+    log_path = log_dir / f"{timestamp}.log"
+    return log_path
+
+
+def main():
+    print("[run_pipeline] starting", flush=True)
+
+    parser = argparse.ArgumentParser(description="Run an fn_graph pipeline.")
+    parser.add_argument("--pipeline", required=True, help="Dotted module path, e.g. fn_graph.examples.machine_learning")
+    parser.add_argument("--config", required=True, help="Path to config yaml, e.g. config/iris.yaml")
+    args = parser.parse_args()
+
+    print(f"[run_pipeline] pipeline: {args.pipeline}", flush=True)
+    print(f"[run_pipeline] config: {args.config}", flush=True)
+
+    module = importlib.import_module(args.pipeline)
+    f = module.f
+
+    # Pandas >= 2.0 compatibility patch for finance pipeline
+    if hasattr(module, "cumulative_return"):
+        import inspect as _inspect
+
+        try:
+            _src = _inspect.getsource(module.cumulative_return)
+        except OSError:
+            _src = ""
+        if "total_position[-1]" in _src:
+
+            def cumulative_return(total_position):
+                return 100 * (total_position.iloc[-1] / total_position.iloc[0] - 1)
+
+            f = f.update(cumulative_return=cumulative_return)
+            print("[run_pipeline] applied pandas compatibility patch: cumulative_return", flush=True)
+
+    config = load_config(args.config)
+    run_id = config["pipeline"]["run_id"]
+
+    # Logging setup
+    log_path = _setup_log(args.pipeline, run_id)
+    log_file = open(log_path, "w", encoding="utf-8")
+    sys.stdout = _Tee(sys.__stdout__, log_file)
+    print(f"[run_pipeline] logging to: {log_path}", flush=True)
+
+    artifact_store = get_artifact_store(config)
+    print(f"[run_pipeline] artifact store: {type(artifact_store).__name__}", flush=True)
+
+    # Report the executor each node will use, resolved per node from the YAML
+    all_nodes = list(f.dag().nodes())
+    execution_config = {node: get_node_config(config, node) for node in all_nodes}
+
+    print("[run_pipeline] executors configured:", flush=True)
+    for node_name, node_cfg in execution_config.items():
+        print(f"    {node_name}: {node_cfg.get('executor', 'memory')}", flush=True)
+
+    # Rebuild f as a PipelineComposer and let calculate() orchestrate; the
+    # per-node executors and artifact store are resolved from the config dict.
+    pipeline = PipelineComposer(_functions=f._functions, _parameters=f._parameters)
+    results = pipeline.calculate(None, config=config)
+
+    print("\n" + "=" * 60, flush=True)
+    print("=== Pipeline Results ===", flush=True)
+    import matplotlib.figure
+
+    for name, value in results.items():
+        if isinstance(value, str):
+            print(f"\n[{name}]\n{value}", flush=True)
+        elif isinstance(value, (int, float)):
+            print(f"[{name}] {value}", flush=True)
+        elif isinstance(value, matplotlib.figure.Figure):
+            out_path = Path(args.config).parent / f"{name}.png"
+            value.savefig(out_path)
+            print(f"[{name}] figure saved to {out_path}", flush=True)
+        elif hasattr(value, "get_figure"):
+            out_path = Path(args.config).parent / f"{name}.png"
+            value.get_figure().savefig(out_path)
+            print(f"[{name}] figure saved to {out_path}", flush=True)
+        else:
+            print(f"[{name}] {type(value).__name__}", flush=True)
+
+    print(f"\n[run_pipeline] log saved to: {log_path}", flush=True)
+    log_file.close()
+    sys.stdout = sys.__stdout__
+
+
+if __name__ == "__main__":
+    main()
diff --git a/fn_graph/examples/solution/toy_pipeline.py b/fn_graph/examples/solution/toy_pipeline.py
new file mode 100644
index 0000000..02320a2
--- /dev/null
+++ b/fn_graph/examples/solution/toy_pipeline.py
@@ -0,0 +1,90 @@
+"""
+Toy pipeline — tests the full orchestration layer without needing
+QXR mono repo access or real Docker/Lambda.
+
+Four dummy nodes: load_data -> preprocess -> run_model -> postprocess
+
+Run (all in memory):
+    python toy_pipeline.py --config config/toy_local.yaml
+
+Run (model node in docker, rest in memory):
+    python toy_pipeline.py --config config/toy_docker.yaml
+"""
+
+import sys
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).parent))
+
+from fn_graph import Composer
+
+# ── Node functions ────────────────────────────────────────────────────────────
+# These are "interior functions" — we never touch them.
+# They receive kwargs matching upstream node names.
+
+
+def load_data(raw_path: str) -> list:
+    print(f"  [load_data] reading from {raw_path}")
+    return [{"id": 1, "value": 10}, {"id": 2, "value": 20}]
+
+
+def preprocess(load_data: list) -> list:
+    print(f"  [preprocess] normalising {len(load_data)} records")
+    return [{"id": r["id"], "value": r["value"] / 100.0} for r in load_data]
+
+
+def run_model(preprocess: list) -> list:
+    print(f"  [run_model] scoring {len(preprocess)} samples")
+    return [{"id": r["id"], "score": round(r["value"] * 0.9, 4)} for r in preprocess]
+
+
+def postprocess(run_model: list) -> dict:
+    print(f"  [postprocess] filtering results")
+    results = [s for s in run_model if s["score"] > 0.05]
+    return {"results": results, "count": len(results)}
+
+
+# ── Build fn_graph Composer ───────────────────────────────────────────────────
+# fn_graph infers the DAG from function signatures — no explicit wiring needed.
+
+f = (
+    Composer()
+    .update(
+        load_data=load_data,
+        preprocess=preprocess,
+        run_model=run_model,
+        postprocess=postprocess,
+    )
+    .update_parameters(raw_path="/data/input.csv")
+)
+
+
+# ── Entry point ───────────────────────────────────────────────────────────────
+
+if __name__ == "__main__":
+    import argparse
+
+    import networkx as nx
+
+    from config import load_config, get_node_config
+    from composer import PipelineComposer
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--config", default="config/toy_local.yaml")
+    args = parser.parse_args()
+
+    config = load_config(args.config)
+
+    # Wrap the plain Composer in a PipelineComposer; calculate() resolves the
+    # per-node executors and the artifact store from the loaded config dict.
+    pipeline = PipelineComposer(_functions=f._functions, _parameters=f._parameters)
+
+    all_nodes = list(pipeline.dag().nodes())
+    execution_config = {node: get_node_config(config, node) for node in all_nodes}
+
+    print("\nTopological order:", list(nx.topological_sort(pipeline.dag())))
+    print("Executors:", {n: c.get("executor") for n, c in execution_config.items()})
+
+    results = pipeline.calculate(None, config=config)
+
+    print("\n── Final outputs ──")
+    for name, value in results.items():
+        print(f"  {name}: {value}")
diff --git a/fn_graph/examples/solution/worker/Dockerfile b/fn_graph/examples/solution/worker/Dockerfile
new file mode 100644
index 0000000..9928c36
--- /dev/null
+++ b/fn_graph/examples/solution/worker/Dockerfile
@@ -0,0 +1,22 @@
+FROM python:3.12-slim
+
+WORKDIR /app
+
+# Install dependencies
+RUN pip install --no-cache-dir \
+    fastapi \
+    uvicorn \
+    cloudpickle \
+    scikit-learn \
+    pandas \
+    numpy \
+    seaborn \
+    matplotlib \
+    networkx
+
+# Copy only the worker server — node functions arrive at runtime via fn_source
+COPY server.py .
+
+EXPOSE 8000
+
+CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
diff --git a/fn_graph/examples/solution/worker/Dockerfile.lambda b/fn_graph/examples/solution/worker/Dockerfile.lambda
new file mode 100644
index 0000000..3276ab3
--- /dev/null
+++ b/fn_graph/examples/solution/worker/Dockerfile.lambda
@@ -0,0 +1,17 @@
+# Lambda container image — pushed to ECR, invoked by LambdaExecutor
+FROM public.ecr.aws/lambda/python:3.12
+
+# Install dependencies
+RUN pip install --no-cache-dir \
+    cloudpickle \
+    scikit-learn \
+    pandas \
+    numpy \
+    seaborn \
+    matplotlib \
+    networkx
+
+# Copy handler — node functions arrive at runtime via fn_source in event payload
+COPY lambda_handler.py ${LAMBDA_TASK_ROOT}/
+
+CMD ["lambda_handler.handler"]
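Both workers speak the protocol used by DockerExecutor and LambdaExecutor: function source via inspect.getsource, inputs as base64-encoded cloudpickle. A hypothetical manual test against a locally running Docker worker (assumes the image above is built and serving on port 8000):

    import base64

    import cloudpickle
    import requests

    payload = {
        "node_name": "double",
        "fn_source": "def double(x):\n    return 2 * x\n",
        "kwargs_b64": base64.b64encode(cloudpickle.dumps({"x": 21}, protocol=4)).decode(),
    }
    resp = requests.post("http://localhost:8000/execute", json=payload, timeout=30)
    print(cloudpickle.loads(base64.b64decode(resp.json()["result_b64"])))  # 42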
diff --git a/fn_graph/examples/solution/worker/lambda_handler.py b/fn_graph/examples/solution/worker/lambda_handler.py
new file mode 100644
index 0000000..fde8240
--- /dev/null
+++ b/fn_graph/examples/solution/worker/lambda_handler.py
@@ -0,0 +1,82 @@
+"""
+Lambda handler — runs inside the Lambda container (ECR image).
+Receives the same payload as the Docker worker's /execute endpoint,
+executes the node function, and returns the result.
+"""
+import base64
+import json
+import traceback as tb_module
+
+import cloudpickle
+
+# Pre-populate namespace same as Docker worker so node functions work unchanged
+_BASE_NAMESPACE: dict = {}
+for _mod_name, _alias in [
+    ("sklearn", "sklearn"),
+    ("sklearn.datasets", "sklearn.datasets"),
+    ("sklearn.svm", "sklearn.svm"),
+    ("sklearn.linear_model", "sklearn.linear_model"),
+    ("sklearn.metrics", "sklearn.metrics"),
+    ("sklearn.preprocessing", "sklearn.preprocessing"),
+    ("sklearn.model_selection", "sklearn.model_selection"),
+    ("pandas", "pd"),
+    ("numpy", "np"),
+    ("seaborn", "sns"),
+    ("matplotlib.pylab", "plt"),
+    ("matplotlib", "matplotlib"),
+    ("networkx", "nx"),
+]:
+    try:
+        import importlib as _il
+
+        _mod = _il.import_module(_mod_name)
+        _BASE_NAMESPACE[_alias] = _mod
+        _BASE_NAMESPACE[_mod_name.split(".")[0]] = _il.import_module(_mod_name.split(".")[0])
+    except ImportError:
+        pass
+
+try:
+    from sklearn.model_selection import train_test_split as _tts
+
+    _BASE_NAMESPACE["train_test_split"] = _tts
+except ImportError:
+    pass
+
+
+def handler(event, context):
+    """
+    AWS Lambda entry point.
+
+    Expected event keys:
+        node_name   str   name of the node function to run
+        fn_source   str   source code of the function (inspect.getsource)
+        kwargs_b64  str   base64(cloudpickle(kwargs dict))
+
+    Returns:
+        { result_b64: str } on success
+        { statusCode: 500, error: str, traceback: str } on failure
+    """
+    node_name = event.get("node_name", "")
+    print(f"[lambda_handler] received node: {node_name}", flush=True)
+
+    try:
+        fn_source = event["fn_source"]
+        kwargs_b64 = event["kwargs_b64"]
+
+        namespace = dict(_BASE_NAMESPACE)
+        exec(fn_source, namespace)
+        fn = namespace[node_name]
+        print(f"[lambda_handler] function loaded: {node_name}", flush=True)
+
+        kwargs = cloudpickle.loads(base64.b64decode(kwargs_b64))
+        print(f"[lambda_handler] inputs: {list(kwargs.keys())}", flush=True)
+
+        result = fn(**kwargs)
+        print(f"[lambda_handler] {node_name} complete, output type: {type(result).__name__}", flush=True)
+
+        result_b64 = base64.b64encode(cloudpickle.dumps(result, protocol=4)).decode()
+        return {"result_b64": result_b64}
+
+    except Exception as e:
+        tb = tb_module.format_exc()
+        print(f"[lambda_handler] ERROR in node '{node_name}': {e}", flush=True)
+        print(f"[lambda_handler] traceback:\n{tb}", flush=True)
+        return {"statusCode": 500, "error": str(e), "traceback": tb}
diff --git a/fn_graph/examples/solution/worker/server.py b/fn_graph/examples/solution/worker/server.py
new file mode 100644
index 0000000..1a7155f
--- /dev/null
+++ b/fn_graph/examples/solution/worker/server.py
@@ -0,0 +1,81 @@
+import base64
+import traceback as tb_module
+
+import cloudpickle
+from fastapi import FastAPI
+from fastapi.responses import JSONResponse
+from pydantic import BaseModel
+
+app = FastAPI()
+
+_BASE_NAMESPACE: dict = {}
+for _mod_name, _alias in [
+    ("sklearn", "sklearn"),
+    ("sklearn.datasets", "sklearn.datasets"),
+    ("sklearn.svm", "sklearn.svm"),
+    ("sklearn.linear_model", "sklearn.linear_model"),
+    ("sklearn.metrics", "sklearn.metrics"),
+    ("sklearn.preprocessing", "sklearn.preprocessing"),
+    ("sklearn.model_selection", "sklearn.model_selection"),
+    ("pandas", "pd"),
+    ("numpy", "np"),
+    ("seaborn", "sns"),
+    ("matplotlib.pylab", "plt"),
+    ("matplotlib", "matplotlib"),
+    ("networkx", "nx"),
+]:
+    try:
+        import importlib as _il
+
+        _mod = _il.import_module(_mod_name)
+        _BASE_NAMESPACE[_alias] = _mod
+        _BASE_NAMESPACE[_mod_name.split(".")[0]] = _il.import_module(_mod_name.split(".")[0])
+    except ImportError:
+        pass
+
+try:
+    from sklearn.model_selection import train_test_split as _tts
+
+    _BASE_NAMESPACE["train_test_split"] = _tts
+except ImportError:
+    pass
+
+
+class ExecuteRequest(BaseModel):
+    node_name: str
+    fn_source: str
+    kwargs_b64: str
+
+
+@app.get("/health")
+def health():
+    print("[worker] health check received", flush=True)
+    return {"status": "ok"}
+
+
+@app.post("/execute")
+def execute(req: ExecuteRequest):
+    try:
+        print(f"[worker] received request for node: {req.node_name}", flush=True)
+        print(f"[worker] fn_source length: {len(req.fn_source)} chars", flush=True)
+
+        namespace = dict(_BASE_NAMESPACE)
+        exec(req.fn_source, namespace)
+        fn = namespace[req.node_name]
+        print(f"[worker] function loaded: {req.node_name}", flush=True)
+
+        kwargs = cloudpickle.loads(base64.b64decode(req.kwargs_b64))
+        print(f"[worker] inputs loaded: {list(kwargs.keys())}", flush=True)
+
+        print(f"[worker] running {req.node_name}...", flush=True)
+        result = fn(**kwargs)
+        print(f"[worker] {req.node_name} complete, output type: {type(result).__name__}", flush=True)
+
+        result_b64 = base64.b64encode(cloudpickle.dumps(result, protocol=4)).decode()
+        return {"result_b64": result_b64}
+    except Exception as e:
+        tb = tb_module.format_exc()
+        print(f"[worker] ERROR in node '{req.node_name}': {e}", flush=True)
+        print(f"[worker] traceback:\n{tb}", flush=True)
+        return JSONResponse(
+            status_code=500,
+            content={"error": str(e), "traceback": tb},
+        )
diff --git a/fn_graph/examples/stock_market.py b/fn_graph/examples/stock_market.py
index 3bbd426..08f546b 100644
--- a/fn_graph/examples/stock_market.py
+++ b/fn_graph/examples/stock_market.py
@@ -1,34 +1,26 @@
 """
-A simple example that works out the market capitalization fo a couple of stocks.
+A simple example that works out the market capitalization of a couple of stocks.
 """
-# %%
-from fn_graph import Composer
 from pathlib import Path
+
 import pandas as pd
 import numpy as np
 import plotly.express as px

+from fn_graph.examples.solution.composer import PipelineComposer as Composer
+
 data_path = Path(__file__).parent


 def share_prices():
-    """
-    Load the share price data
-    """
     return pd.read_csv(data_path / "share_prices.csv", parse_dates=["datetime"])


 def shares_in_issue():
-    """
-    Load the shares issued data
-    """
     return pd.read_csv(data_path / "shares_in_issue.csv", parse_dates=["datetime"])


 def daily_share_prices(share_prices):
-    """
-    Ensure that every day has the full set of share prices
-    """
     return (
         share_prices.groupby("share_code")
         .apply(lambda df: df.set_index("datetime").resample("1D").ffill().reset_index())
@@ -38,25 +30,16 @@ def daily_share_prices(share_prices):


 def market_cap(daily_share_prices, shares_in_issue):
-    """
-    Merge the datasets intelligently over time and calculate market cap
-    """
     return pd.merge_asof(
         daily_share_prices, shares_in_issue, on="datetime", by="share_code"
     ).assign(market_cap=lambda df: df.share_price * df.shares_in_issue)


 def total_market_cap(market_cap):
-    """
-    Workout the total market cap
-    """
     return market_cap.groupby("datetime", as_index=False).market_cap.sum()


 def total_market_cap_change(total_market_cap, swing_threshold):
-    """
-    Calculate the changes in market cap
-    """
     return total_market_cap.assign(
         market_cap_change=lambda df: df.market_cap.diff()
     ).assign(
@@ -67,9 +50,6 @@ def total_market_cap_change(total_market_cap, swing_threshold):


 def plot_market_caps(market_cap):
-    """
-    Plot the individual market caps
-    """
     return px.area(
         market_cap,
         x="datetime",
@@ -80,16 +60,10 @@ def plot_market_caps(market_cap):


 def plot_total_market_cap(total_market_cap):
-    """
-    Plot the total market cap
-    """
     return px.line(total_market_cap, x="datetime", y="market_cap")


 def plot_market_cap_changes(total_market_cap_change):
-    """
-    Plot the market cap changes
-    """
     return px.bar(
         total_market_cap_change,
         x="datetime",