3 changes: 2 additions & 1 deletion .gitignore
@@ -144,4 +144,5 @@ Digraph.gv.pdf
.fn_graph_cache

# Sandbox files for developers
sandbox.py
.DS_Store
fn_graph/.DS_Store
4 changes: 1 addition & 3 deletions fn_graph/examples/car_savings.py
@@ -1,12 +1,11 @@
"""
A simple example showing basic functionality.
"""
#%%
from random import choice, random

import pandas as pd
import plotly.express as px
from fn_graph import Composer
from fn_graph.examples.solution.composer import PipelineComposer as Composer

prices = [random() * 100_000 + 50000 for _ in range(10)]

@@ -18,7 +17,6 @@ def get_car_prices():
price=prices,
)
)

return df


45 changes: 7 additions & 38 deletions fn_graph/examples/finance.py
@@ -1,67 +1,46 @@
"""
In finance, the Sharpe ratio (also known as the Sharpe index, the Sharpe measure,
and the reward-to-variability ratio) measures the performance of an investment
(e.g., a security or portfolio) compared to a risk-free asset, after adjusting
for its risk. It is defined as the difference between the returns of the
investment and the risk-free return, divided by the standard deviation of the
investment (i.e., its volatility). It represents the additional amount of return
that an investor receives per unit of increase in risk.

This shows how to calculate the Sharpe ratio for a small portfolio of shares. The
share data is pulled from Yahoo Finance and the analysis is done in pandas. We assume
a risk-free rate of zero.
In finance, the Sharpe ratio measures the performance of an investment compared
to a risk-free asset, after adjusting for its risk.

This shows how to calculate the Sharpe ratio for a small portfolio of shares.
"""

from datetime import date
from math import sqrt
from pathlib import Path

import matplotlib.pyplot as plt
import pandas as pd
import yfinance as yf
from fn_graph import Composer
from pandas.plotting import register_matplotlib_converters

from fn_graph.examples.solution.composer import PipelineComposer as Composer

register_matplotlib_converters()
plt.style.use("fivethirtyeight")


def closing_prices(share_allocations, start_date, end_date):
"""
The closing prices of our portfolio pulled with yfinance.
"""
data = yf.download(
" ".join(share_allocations.keys()), start=start_date, end=end_date
)
return data["Close"]


def normalised_returns(closing_prices):
"""
Normalise the returns as a ratio of the initial price.
"""
return closing_prices / closing_prices.iloc[0, :]


def positions(normalised_returns, share_allocations, initial_total_position):
"""
Our total positions over time given an initial allocation.
"""

allocations = pd.DataFrame(
{
symbol: normalised_returns[symbol] * allocation
for symbol, allocation in share_allocations.items()
}
)

return allocations * initial_total_position


def total_position(positions):
"""
The total value of our portfolio
"""
return positions.sum(axis=1)


@@ -70,23 +49,14 @@ def positions_plot(positions):


def cumulative_return(total_position):
"""
The cumulative return of our portfolio
"""
return 100 * (total_position[-1] / total_position[0] - 1)
return 100 * (total_position.iloc[-1] / total_position.iloc[0] - 1)


def daily_return(total_position):
"""
The daily return of our portfolio
"""
return total_position.pct_change(1)


def sharpe_ratio(daily_return):
"""
The sharpe ratio of the portfolio assuming a zero risk free rate.
"""
return daily_return.mean() / daily_return.std()


@@ -116,5 +86,4 @@ def annual_sharpe_ratio(sharpe_ratio, trading_days_in_a_year=252):
.cache()
)

# Just for uniformity with the rest of the examples
f = composer
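
A minimal, self-contained sketch of the calculation this composer wires together, assuming a zero risk-free rate. The total_position values are made up for illustration, and the sqrt(252) annualisation is the conventional choice implied by the trading_days_in_a_year=252 default above; the body of annual_sharpe_ratio is not visible in this hunk.

# Illustrative only: mirrors daily_return, sharpe_ratio, and the assumed annualisation.
from math import sqrt

import pandas as pd

total_position = pd.Series([100_000, 101_200, 100_800, 102_500, 103_100])  # made-up values

daily_return = total_position.pct_change(1)               # as in daily_return() above
sharpe_ratio = daily_return.mean() / daily_return.std()   # zero risk-free rate
annual_sharpe_ratio = sqrt(252) * sharpe_ratio            # assumed annualisation over 252 trading days

print(round(annual_sharpe_ratio, 2))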
61 changes: 7 additions & 54 deletions fn_graph/examples/machine_learning.py
@@ -1,61 +1,40 @@
"""
A simple machine learning example that builds a classifier for the standard iris dataset.

This example uses scikit-learn to build a classifier to detect the species of an iris
flower based on attributes of the flower. Based on the parameters, different types of models
can be trained, and preprocessing can be turned on and off. It also showcases the integration
of visualisations to measure the accuracy of the model.
"""

from fn_graph import Composer
import sklearn, sklearn.datasets, sklearn.svm, sklearn.linear_model, sklearn.metrics
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pylab as plt

from fn_graph.examples.solution.composer import PipelineComposer as Composer


def iris():
"""
Load the classic iris dataset
"""
return sklearn.datasets.load_iris()


def data(iris):
"""
Pull out the data as pandas DataFrame
"""
df_train = pd.DataFrame(
iris.data, columns=["feature{}".format(i) for i in range(4)]
)
return df_train.assign(y=iris.target)


def investigate_data(data):
"""
Check for any visual correlations using seaborn
"""
return sns.pairplot(data, hue="y")


def preprocess_data(data, do_preprocess):
"""
Preprocess the data by scaling depending on the parameter

We make sure we don't mutate the data because that is better practice.
"""
processed = data.copy()
if do_preprocess:
processed.iloc[:, :-1] = sklearn.preprocessing.scale(processed.iloc[:, :-1])
return processed


def split_data(preprocess_data):
"""
Split the data into test and train sets
"""
return dict(
zip(
("training_features", "test_features", "training_target", "test_target"),
@@ -64,9 +43,6 @@ def split_data(preprocess_data):
)


# This is done verbosely on purpose, but it could be more concise


def training_features(split_data):
return split_data["training_features"]

@@ -84,43 +60,25 @@ def test_target(split_data):


def model(training_features, training_target, model_type):
"""
Train the model
"""
if model_type == "ols":
model = sklearn.linear_model.LogisticRegression()
m = sklearn.linear_model.LogisticRegression()
elif model_type == "svm":
model = sklearn.svm.SVC()
m = sklearn.svm.SVC()
else:
raise ValueError("invalid model selection, choose either 'ols' or 'svm'")
model.fit(training_features, training_target)
return model
m.fit(training_features, training_target)
return m


def predictions(model, test_features):
"""
Make some predictions for the test data
"""
return model.predict(test_features)


def classification_metrics(predictions, test_target):
"""
Show some standard classification metrics
"""
return sklearn.metrics.classification_report(test_target, predictions)


def plot_confusion_matrix(
cm, target_names, title="Confusion matrix", cmap=plt.cm.Blues
):
"""
Plots a confusion matrix using matplotlib.

This is just a regular function that is not in the composer.
Shamelessly taken from https://scikit-learn.org/0.15/auto_examples/model_selection/plot_confusion_matrix.html
"""

def plot_confusion_matrix(cm, target_names, title="Confusion matrix", cmap=plt.cm.Blues):
plt.imshow(cm, interpolation="nearest", cmap=cmap)
plt.title(title)
plt.colorbar()
@@ -134,19 +92,14 @@ def plot_confusion_matrix(


def confusion_matrix(predictions, test_target):
"""
Show the confusion matrix
"""
cm = sklearn.metrics.confusion_matrix(test_target, predictions)
return plot_confusion_matrix(cm, ["setosa", "versicolor", "virginica"])


f = (
Composer()
.update_parameters(
# Parameter controlling the model type (ols, svm)
model_type="ols",
# Parameter enabling data preprocessing
do_preprocess=True,
)
.update(
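
All three examples make the same substitution: the stock from fn_graph import Composer is replaced with from fn_graph.examples.solution.composer import PipelineComposer as Composer, so the existing composer-building code is exercised against the solution's PipelineComposer unchanged. Below is a small sketch of that pattern, assuming PipelineComposer preserves the Composer interface the examples rely on (update, update_parameters, cache); the toy functions and the calculate call are illustrative only.

# Illustrative only; assumes PipelineComposer is a drop-in replacement for fn_graph.Composer.
from fn_graph.examples.solution.composer import PipelineComposer as Composer


def base(amount, multiplier):
    return amount * multiplier


def doubled(base):
    return base * 2


f = (
    Composer()
    .update_parameters(amount=10, multiplier=3)  # parameters, as in the examples above
    .update(base, doubled)                       # functions wired together by argument name
    .cache()
)

# Assumed: the usual fn_graph calculate() entry point is still available on PipelineComposer.
print(f.calculate("doubled"))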
7 changes: 7 additions & 0 deletions fn_graph/examples/solution/.gitignore
@@ -0,0 +1,7 @@
venv/
__pycache__/
*.pyc
artifacts/
logs/
mnt/
.env
3 changes: 3 additions & 0 deletions fn_graph/examples/solution/artifact_store/__init__.py
@@ -0,0 +1,3 @@
from .base import BaseArtifactStore
from .fs import LocalFSArtifactStore
from .s3 import S3ArtifactStore
14 changes: 14 additions & 0 deletions fn_graph/examples/solution/artifact_store/base.py
@@ -0,0 +1,14 @@
from abc import ABC, abstractmethod
from typing import Any

class BaseArtifactStore(ABC):
@abstractmethod
def put(self, key: str, value: Any) -> None: ...
@abstractmethod
def get(self, key: str) -> Any: ...
@abstractmethod
def exists(self, key: str) -> bool: ...
@abstractmethod
def delete(self, key: str) -> None: ...
@abstractmethod
def metadata(self, key: str) -> dict: ...
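
The contract above is small enough that a toy in-memory implementation makes the expected behaviour concrete. This is a hypothetical illustration, not part of the diff.

# Hypothetical in-memory store, shown only to illustrate the BaseArtifactStore contract.
from typing import Any

from fn_graph.examples.solution.artifact_store import BaseArtifactStore


class InMemoryArtifactStore(BaseArtifactStore):
    def __init__(self):
        self._items = {}  # key -> stored value

    def put(self, key: str, value: Any) -> None:
        self._items[key] = value

    def get(self, key: str) -> Any:
        return self._items[key]

    def exists(self, key: str) -> bool:
        return key in self._items

    def delete(self, key: str) -> None:
        self._items.pop(key, None)

    def metadata(self, key: str) -> dict:
        # Plain in-memory objects have no size or mtime to report.
        return {"type": type(self._items[key]).__name__}


store = InMemoryArtifactStore()
store.put("model", {"accuracy": 0.97})
assert store.exists("model") and store.get("model")["accuracy"] == 0.97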
48 changes: 48 additions & 0 deletions fn_graph/examples/solution/artifact_store/fs.py
@@ -0,0 +1,48 @@
import os
from pathlib import Path
from typing import Any

import cloudpickle

from .base import BaseArtifactStore


class LocalFSArtifactStore(BaseArtifactStore):
def __init__(self, base_dir: str, run_id: str):
self.base_dir = Path(base_dir)
self.run_id = run_id
self._run_dir = self.base_dir / run_id
self._run_dir.mkdir(parents=True, exist_ok=True)

def _path(self, key: str) -> Path:
return self._run_dir / f"{key}.pkl"

def put(self, key: str, value: Any) -> None:
path = self._path(key)
print(f"[LocalFSArtifactStore] writing {key} to {path}", flush=True)
tmp_path = path.with_suffix(".tmp")
data = cloudpickle.dumps(value, protocol=4)
tmp_path.write_bytes(data)
os.replace(tmp_path, path)
print(f"[LocalFSArtifactStore] {key} written, size: {len(data)} bytes", flush=True)

def get(self, key: str) -> Any:
path = self._path(key)
print(f"[LocalFSArtifactStore] loading {key} from {path}", flush=True)
result = cloudpickle.loads(path.read_bytes())
print(f"[LocalFSArtifactStore] {key} loaded, type: {type(result).__name__}", flush=True)
return result

def exists(self, key: str) -> bool:
result = self._path(key).exists()
print(f"[LocalFSArtifactStore] exists({key}): {result}", flush=True)
return result

def delete(self, key: str) -> None:
path = self._path(key)
if path.exists():
path.unlink()

def metadata(self, key: str) -> dict:
stat = os.stat(self._path(key))
return {"size": stat.st_size, "mtime": stat.st_mtime}