Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
/.idea
.env
logs/*.log
.venv/
3 changes: 3 additions & 0 deletions index/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
__pycache__
.idea
out/
5 changes: 5 additions & 0 deletions index/.idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions index/.idea/index.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions index/.idea/inspectionProfiles/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions index/.idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions index/.idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions index/.idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions index/.python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.11
24 changes: 24 additions & 0 deletions index/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[project]
name = "defeed-index"
version = "0.1.0"
description = "Topic-modeling index for defeed activities, built on BERTopic"
authors = [
{ name = "Bart", email = "[email protected]" }
]
requires-python = ">=3.11"
dependencies = [
"bertopic>=0.17.3",
"numpy>=2.3.3",
"pandas>=2.3.2",
"scikit-learn>=1.7.2",
"psycopg2-binary>=2.9.9",
"python-dotenv>=1.0.0",
"datamapplot>=0.6.4",
]

[project.scripts]
defeed-index = "defeed_index:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
19 changes: 19 additions & 0 deletions index/src/defeed_index/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from .registry import Registry
from .repository import ActivityRepository, ActivityRepositoryConfig
from .types import (
Activity, DecoratedActivity, ActivitySummary,
SearchRequest, SearchResult, SortBy, Period
)

__all__ = [
"Registry",
"ActivityRepository",
"ActivityRepositoryConfig",
"Activity",
"DecoratedActivity",
"ActivitySummary",
"SearchRequest",
"SearchResult",
"SortBy",
"Period",
]
92 changes: 92 additions & 0 deletions index/src/defeed_index/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import logging
from typing import List, Optional, Dict, Any
from bertopic import BERTopic
from bertopic.representation import KeyBERTInspired
from hdbscan import HDBSCAN
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import CountVectorizer
import pandas as pd
from datetime import datetime, timedelta

from umap import UMAP

from .repository import ActivityRepository
from .types import DecoratedActivity, SearchRequest, SearchResult


class Registry:
    """
    Registry provides an abstraction layer on top of the ActivityRepository
    and implements activity analysis using BERTopic for topic modeling.

    After a successful ``seed()`` the three lists ``activities``,
    ``documents`` and ``topics`` are index-aligned: ``documents[i]`` is the
    short summary of ``activities[i]`` and ``topics[i]`` is the topic id
    assigned to it.
    """

    def __init__(self, repository: ActivityRepository):
        """Store the repository and build the (unfitted) BERTopic pipeline."""
        self.repository = repository
        self.logger = logging.getLogger(__name__)
        self.activities: List[DecoratedActivity] = []
        self.documents: List[str] = []
        self.topics: List[int] = []

        self.vectorizer_model = CountVectorizer(
            ngram_range=(1, 2),
            stop_words="english",
            min_df=2,  # Require at least 2 documents for a term
            max_df=0.95,  # Ignore terms that appear in more than 95% of documents
        )

        self.embedding_model = SentenceTransformer("thenlper/gte-small")
        self.umap_model = UMAP(
            n_components=5,
            min_dist=0.0,
            metric='cosine',
            random_state=42,  # Set for reproducibility
        )

        self.hdbscan_model = HDBSCAN(
            min_cluster_size=5,
            metric='euclidean',
            cluster_selection_method='eom',
        )

        self.representation_model = KeyBERTInspired()

        self.topic_model = BERTopic(
            vectorizer_model=self.vectorizer_model,
            embedding_model=self.embedding_model,
            representation_model=self.representation_model,
            umap_model=self.umap_model,
            hdbscan_model=self.hdbscan_model,
            verbose=True,
            # NOTE(review): per the BERTopic docs min_topic_size is not used
            # when a custom hdbscan_model is supplied, so the effective
            # minimum is presumably min_cluster_size above - confirm.
            min_topic_size=3,
            nr_topics="auto",  # Let BERTopic determine the optimal number of topics
        )

    def seed(self, lookback_days: int = 10) -> None:
        """
        Load existing activities from the repository and seed the BERTopic index.

        Args:
            lookback_days: Only activities created within this many days
                before now are loaded. Defaults to 10.

        Activities that have no short summary are skipped, because the
        repository returns ``summary=None`` for rows lacking summary text
        and there would be nothing to feed the topic model.
        """
        # NOTE(review): datetime.now() is a naive local timestamp - confirm
        # this matches how created_at is stored in the database.
        loaded = self.repository.list(
            from_date=datetime.now() - timedelta(days=lookback_days),
        )

        if not loaded:
            self.logger.warning("No activities found in database")
            self.activities, self.documents, self.topics = [], [], []
            return

        # full_summary is formatted in Markdown with predefined sections.
        # This will skew the topic modeling results, so use short_summary instead.
        usable = [
            activity
            for activity in loaded
            if activity.summary is not None and activity.summary.short_summary
        ]

        skipped = len(loaded) - len(usable)
        if skipped:
            self.logger.warning(
                "Skipping %d activities without a short summary", skipped
            )

        if not usable:
            self.logger.warning("No summarized activities available for topic modeling")
            self.activities, self.documents, self.topics = [], [], []
            return

        # Keep activities and documents index-aligned so topic assignments
        # can be mapped back to the activity they belong to.
        self.activities = usable
        self.documents = [activity.summary.short_summary for activity in usable]

        self.logger.info("Prepared %d documents for topic modeling", len(self.documents))

        embeddings = self.embedding_model.encode(self.documents, show_progress_bar=True)

        # The second return value (per-document probabilities) is not used.
        self.topics, _ = self.topic_model.fit_transform(self.documents, embeddings)

        self.logger.info("BERTopic modeling completed:")
        # The count is over distinct topic ids as returned by fit_transform.
        self.logger.info(" - Found %d topics", len(set(self.topics)))
        self.logger.info(" - Processed %d documents", len(self.documents))
134 changes: 134 additions & 0 deletions index/src/defeed_index/repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import logging
import os
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

import psycopg2
import psycopg2.extras

from .types import Activity, ActivitySummary, DecoratedActivity, SearchRequest, SearchResult, SortBy, Period

@dataclass
class ActivityRepositoryConfig:
    """Connection settings used to build the PostgreSQL connection string."""

    host: str
    port: int
    database: str
    user: str
    # Kept out of the generated repr so the secret is not exposed when the
    # config object is logged or shown in a traceback.
    password: str = field(repr=False)

class ActivityRepository:
    """Read-only access to the ``activities`` table over psycopg2."""

    def __init__(self, config: ActivityRepositoryConfig):
        """Derive the connection string from *config*; no connection is opened yet."""
        self.connection_string = _build_connection_string(config)
        self.logger = logging.getLogger(__name__)

    def list(
        self,
        source_ids: Optional[List[str]] = None,
        from_date: Optional[datetime] = None,
        limit: Optional[int] = None,
    ) -> List[DecoratedActivity]:
        """
        Read activities that have an embedding, newest first.
        This is the main function needed for seeding BERTopic.

        Args:
            source_ids: If given, only rows whose ``source_uid`` is in this list.
            from_date: If given, only rows with ``created_at >= from_date``.
            limit: If given, the maximum number of rows to return.

        Returns:
            The matching rows deserialized into DecoratedActivity objects.

        Raises:
            Exception: Any error raised while connecting, querying or
                deserializing is logged and re-raised unchanged.
        """
        query = """
            SELECT
                id,
                uid,
                source_uid,
                source_type,
                title,
                body,
                url,
                image_url,
                created_at,
                short_summary,
                full_summary,
                raw_json,
                embedding
            FROM activities
            WHERE embedding IS NOT NULL
        """

        # Values are always passed as bound parameters, never interpolated.
        params = []

        if source_ids is not None:
            query += " AND source_uid = ANY(%s)"
            params.append(source_ids)

        if from_date is not None:
            query += " AND created_at >= %s"
            params.append(from_date)

        query += " ORDER BY created_at DESC"

        if limit is not None:
            query += " LIMIT %s"
            params.append(limit)

        try:
            conn = self._get_connection()
            try:
                # `with conn` only commits or rolls back the transaction;
                # it does not close the connection, hence the finally below.
                with conn:
                    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                        cur.execute(query, params)
                        rows = cur.fetchall()
            finally:
                conn.close()

            activities = [_deserialize_decorated_activity(dict(row)) for row in rows]

            self.logger.info(f"Loaded {len(activities)} activities from database")
            return activities

        except Exception as e:
            self.logger.error(f"Failed to get activities: {e}")
            raise

    def _get_connection(self):
        """Open a new psycopg2 connection; the caller is responsible for closing it."""
        return psycopg2.connect(self.connection_string)


def _build_connection_string(config: ActivityRepositoryConfig) -> str:
    """Build a ``postgresql://`` connection URI from *config*.

    The user, password and database name are percent-encoded so that
    reserved characters such as ``@``, ``:`` or ``/`` in a credential do not
    change how the URI is split into its parts. Values made only of
    unreserved characters produce exactly the same URI as before.

    The ``:password`` part is omitted when the password is empty.
    """
    from urllib.parse import quote

    credentials = quote(config.user, safe="")
    if config.password:
        credentials += ":" + quote(config.password, safe="")

    database = quote(config.database, safe="")
    return f"postgresql://{credentials}@{config.host}:{config.port}/{database}"

def _deserialize_decorated_activity(row: Dict[str, Any]) -> DecoratedActivity:
    """Assemble a DecoratedActivity from a single database row.

    The row supplies the activity fields, the optional summary, the
    embedding (a missing or falsy value becomes None) and the similarity
    (0.0 when the row has no 'similarity' key).
    """
    raw_embedding = row.get('embedding')
    return DecoratedActivity(
        activity=_deserialize_activity(row),
        summary=_deserialize_activity_summary(row),
        embedding=raw_embedding if raw_embedding else None,
        similarity=row.get('similarity', 0.0),
    )

def _deserialize_activity(row: Dict[str, Any]) -> Activity:
    """Map the activity columns of a database row onto an Activity.

    Raises KeyError if any of the expected columns is absent from *row*.
    """
    # Columns whose name is identical to the Activity field they populate.
    same_name_columns = (
        'source_uid',
        'source_type',
        'title',
        'body',
        'url',
        'image_url',
        'created_at',
        'raw_json',
    )
    # NOTE(review): uid is filled from the 'id' column even though the query
    # also selects a 'uid' column - confirm this is intended.
    return Activity(
        uid=row['id'],
        **{column: row[column] for column in same_name_columns},
    )

def _deserialize_activity_summary(row: Dict[str, Any]) -> Optional[ActivitySummary]:
    """Return an ActivitySummary for *row*, or None unless both summaries are set.

    A summary is only produced when 'short_summary' and 'full_summary' are
    both present and non-empty.
    """
    short_text = row.get('short_summary')
    full_text = row.get('full_summary')
    if not short_text or not full_text:
        return None
    return ActivitySummary(short_summary=short_text, full_summary=full_text)
Loading