Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,23 @@ CAPTCHA_ENABLED=false
# Default: 120 seconds (2 minutes)
CAPTCHA_TIMEOUT_SECONDS=120

# Enable duplicate message spam detection (true/false)
# Detects and restricts users who paste the same message repeatedly
DUPLICATE_SPAM_ENABLED=true

# Time window in seconds for detecting duplicate messages
DUPLICATE_SPAM_WINDOW_SECONDS=120

# Number of similar messages within the window before restricting
DUPLICATE_SPAM_THRESHOLD=2

# Minimum normalized text length to consider (avoids false positives on short messages)
DUPLICATE_SPAM_MIN_LENGTH=20

# Similarity threshold (0.0-1.0) for matching duplicate messages
# 0.95 catches minor edits, 0.97 only near-exact copies, 0.90 is more aggressive
DUPLICATE_SPAM_SIMILARITY=0.95

# Path to groups.json for multi-group support (optional)
# If this file exists, per-group settings are loaded from it instead of the
# GROUP_ID/WARNING_TOPIC_ID/etc. fields above. See groups.json.example.
Expand Down
14 changes: 12 additions & 2 deletions groups.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
"captcha_timeout_seconds": 120,
"new_user_probation_hours": 72,
"new_user_violation_threshold": 3,
"rules_link": "https://t.me/pythonID/290029/321799"
"rules_link": "https://t.me/pythonID/290029/321799",
"duplicate_spam_enabled": true,
"duplicate_spam_window_seconds": 120,
"duplicate_spam_threshold": 2,
"duplicate_spam_min_length": 20,
"duplicate_spam_similarity": 0.95
},
{
"group_id": -1009876543210,
Expand All @@ -21,6 +26,11 @@
"captcha_timeout_seconds": 180,
"new_user_probation_hours": 168,
"new_user_violation_threshold": 2,
"rules_link": "https://t.me/mygroup/rules"
"rules_link": "https://t.me/mygroup/rules",
"duplicate_spam_enabled": true,
"duplicate_spam_window_seconds": 60,
"duplicate_spam_threshold": 2,
"duplicate_spam_min_length": 20,
"duplicate_spam_similarity": 0.90
}
]
5 changes: 5 additions & 0 deletions src/bot/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ class Settings(BaseSettings):
captcha_timeout_seconds: int = 120
new_user_probation_hours: int = 72 # 3 days default
new_user_violation_threshold: int = 3 # restrict after this many violations
duplicate_spam_enabled: bool = True
duplicate_spam_window_seconds: int = 120
duplicate_spam_threshold: int = 2
duplicate_spam_min_length: int = 20
duplicate_spam_similarity: float = 0.95
groups_config_path: str = "groups.json"
logfire_token: str | None = None
logfire_service_name: str = "pythonid-bot"
Expand Down
15 changes: 15 additions & 0 deletions src/bot/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,21 @@ def format_hours_display(hours: int) -> str:
"📌 [Peraturan Grup]({rules_link})"
)

# Duplicate message spam notifications.
# Both templates take the placeholders {user_mention}, {count} and
# {rules_link}; paragraphs are separated by a blank line.

# Used when the user was restricted.
DUPLICATE_SPAM_RESTRICTION = "\n\n".join(
    (
        "🚫 *Spam Pesan Duplikat*",
        "{user_mention} telah dibatasi karena mengirim pesan yang sama "
        "sebanyak {count} kali dalam waktu singkat.",
        "📌 [Peraturan Grup]({rules_link})",
    )
)

# Used when the restriction call failed, so the text only reports deletion.
DUPLICATE_SPAM_RESTRICTION_NO_RESTRICT = "\n\n".join(
    (
        "🚫 *Spam Pesan Duplikat*",
        "Pesan duplikat dari {user_mention} telah dihapus "
        "({count} pesan yang sama dalam waktu singkat).",
        "📌 [Peraturan Grup]({rules_link})",
    )
)

# Whitelisted URL domains for new user probation
# These domains are allowed even during probation period
# Matches exact domain or subdomains (e.g., "github.com" matches "www.github.com")
Expand Down
10 changes: 10 additions & 0 deletions src/bot/group_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ class GroupConfig(BaseModel):
new_user_probation_hours: int = 72
new_user_violation_threshold: int = 3
rules_link: str = "https://t.me/pythonID/290029/321799"
duplicate_spam_enabled: bool = True
duplicate_spam_window_seconds: int = 120
duplicate_spam_threshold: int = 2
duplicate_spam_min_length: int = 20
duplicate_spam_similarity: float = 0.95

@field_validator("group_id")
@classmethod
Expand Down Expand Up @@ -181,6 +186,11 @@ def build_group_registry(settings: object) -> GroupRegistry:
new_user_probation_hours=settings.new_user_probation_hours,
new_user_violation_threshold=settings.new_user_violation_threshold,
rules_link=settings.rules_link,
duplicate_spam_enabled=settings.duplicate_spam_enabled,
duplicate_spam_window_seconds=settings.duplicate_spam_window_seconds,
duplicate_spam_threshold=settings.duplicate_spam_threshold,
duplicate_spam_min_length=settings.duplicate_spam_min_length,
duplicate_spam_similarity=settings.duplicate_spam_similarity,
)
registry.register(config)

Expand Down
215 changes: 215 additions & 0 deletions src/bot/handlers/duplicate_spam.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
"""
Duplicate message spam detection handler.

This module detects users who spam by repeatedly posting the same or
very similar messages within a short time window. When the threshold
is reached, the message that triggered detection is deleted and the
user is restricted; earlier copies of the message are left in place.

Uses an in-memory rolling window per (group_id, user_id) to track
recent messages. No database state is needed — restrictions applied
here are NOT reversible via the DM unrestriction flow (no UserWarning
record is created).
"""

import logging
import re
import unicodedata
from collections import deque
from dataclasses import dataclass
from datetime import UTC, datetime
from difflib import SequenceMatcher

from telegram import Update
from telegram.ext import ApplicationHandlerStop, ContextTypes

from bot.constants import (
DUPLICATE_SPAM_RESTRICTION,
DUPLICATE_SPAM_RESTRICTION_NO_RESTRICT,
RESTRICTED_PERMISSIONS,
)
from bot.group_config import GroupConfig, get_group_config_for_update
from bot.services.telegram_utils import get_user_mention

logger = logging.getLogger(__name__)

RECENT_MESSAGES_KEY = "duplicate_spam_recent"


@dataclass
class RecentMessage:
"""A recent message entry for duplicate detection."""

timestamp: datetime
normalized_text: str
message_id: int


def normalize_text(text: str) -> str:
    """
    Normalize text for duplicate comparison.

    Applies Unicode NFKC normalization, lowercases, removes every
    character that is neither a word character nor whitespace
    (punctuation, emoji and other symbols), then collapses runs of
    whitespace into a single space and trims both ends.

    Args:
        text: Raw message text or caption.

    Returns:
        The normalized text; may be empty.
    """
    # Lowercase after NFKC: some compatibility characters decompose into
    # uppercase ASCII letters, which an earlier lower() would miss.
    text = unicodedata.normalize("NFKC", text).lower()
    text = re.sub(r"[^\w\s]", "", text, flags=re.UNICODE)
    # Collapse whitespace last so that removing a free-standing symbol
    # (e.g. "hello , world !") cannot leave doubled or trailing spaces.
    return re.sub(r"\s+", " ", text).strip()


def is_similar(a: str, b: str, threshold: float = 0.95) -> bool:
    """
    Check if two normalized texts are similar enough to be duplicates.

    Args:
        a: First normalized text.
        b: Second normalized text.
        threshold: Minimum ``SequenceMatcher`` ratio (0.0-1.0) required
            for the texts to count as duplicates.

    Returns:
        True when the texts are equal or their similarity ratio is at
        least ``threshold``.
    """
    if a == b:
        return True
    # autojunk must be off: with the default heuristic, sequences of 200+
    # items have their frequent characters ignored during matching, which
    # can drive the ratio of near-identical long messages towards zero.
    matcher = SequenceMatcher(None, a, b, autojunk=False)
    # real_quick_ratio() and quick_ratio() are cheap upper bounds on
    # ratio(), so they reject clearly different texts before the
    # expensive full comparison without changing the result.
    return (
        matcher.real_quick_ratio() >= threshold
        and matcher.quick_ratio() >= threshold
        and matcher.ratio() >= threshold
    )


def _get_recent_messages(
    context: ContextTypes.DEFAULT_TYPE, group_id: int, user_id: int
) -> deque[RecentMessage]:
    """
    Return the message history deque for a (group, user) pair.

    The histories live in ``context.bot_data`` under
    ``RECENT_MESSAGES_KEY``; both the outer mapping and the per-pair
    deque are created on first access.
    """
    histories: dict[tuple[int, int], deque[RecentMessage]] = (
        context.bot_data.setdefault(RECENT_MESSAGES_KEY, {})
    )
    pair = (group_id, user_id)
    try:
        return histories[pair]
    except KeyError:
        history: deque[RecentMessage] = deque()
        histories[pair] = history
        return history


def _prune_old_messages(
    dq: deque[RecentMessage], window_seconds: int, now: datetime
) -> None:
    """
    Drop entries from the left of the deque that fall outside the window.

    An entry is dropped when its age relative to ``now`` is strictly
    greater than ``window_seconds``; pruning stops at the first entry
    that is still inside the window.
    """
    while dq:
        age_seconds = (now - dq[0].timestamp).total_seconds()
        if age_seconds <= window_seconds:
            break
        dq.popleft()


def count_similar_in_window(
    dq: deque[RecentMessage], normalized: str, threshold: float = 0.95
) -> int:
    """
    Count the tracked messages whose text is similar to ``normalized``.

    Each entry's ``normalized_text`` is compared with ``is_similar``
    using the given ``threshold``.
    """
    matches = 0
    for entry in dq:
        if is_similar(normalized, entry.normalized_text, threshold):
            matches += 1
    return matches


async def handle_duplicate_spam(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """
    Detect and act on repeated near-identical messages from one user.

    Messages are skipped when the update has no message or sender, the
    group has no config or has the feature disabled, the sender is a bot
    or a known group admin, there is no text/caption, or the normalized
    text is shorter than the configured minimum length.

    Otherwise the message is recorded in the sender's in-memory history
    for the group. When the number of similar messages inside the time
    window (including this one) reaches the configured threshold, this
    message is deleted, the user is restricted and a notification is
    sent.

    Raises:
        ApplicationHandlerStop: After a duplicate has been handled.
    """
    message = update.message
    if not message or not message.from_user:
        return

    group_config = get_group_config_for_update(update)
    if group_config is None or not group_config.duplicate_spam_enabled:
        return

    sender = message.from_user
    if sender.is_bot:
        return

    admins_by_group = context.bot_data.get("group_admin_ids", {})
    if sender.id in admins_by_group.get(group_config.group_id, []):
        return

    raw_text = message.text or message.caption
    if not raw_text:
        return

    fingerprint = normalize_text(raw_text)
    if len(fingerprint) < group_config.duplicate_spam_min_length:
        return

    seen_at = datetime.now(UTC)
    history = _get_recent_messages(context, group_config.group_id, sender.id)
    _prune_old_messages(
        history, group_config.duplicate_spam_window_seconds, seen_at
    )

    # Count earlier matches before recording the current message so it
    # is not compared against itself.
    earlier_matches = count_similar_in_window(
        history, fingerprint, group_config.duplicate_spam_similarity
    )
    history.append(
        RecentMessage(
            timestamp=seen_at,
            normalized_text=fingerprint,
            message_id=message.message_id,
        )
    )

    total_count = earlier_matches + 1
    if total_count < group_config.duplicate_spam_threshold:
        return

    user_mention = get_user_mention(sender)

    logger.info(
        f"Duplicate spam detected: user_id={sender.id}, "
        f"group_id={group_config.group_id}, count={total_count}"
    )

    # Only the triggering message is deleted here; a failed deletion is
    # logged and does not prevent the restriction below.
    try:
        await message.delete()
        logger.info(f"Deleted duplicate spam from user_id={sender.id}")
    except Exception:
        logger.error(
            f"Failed to delete duplicate spam: user_id={sender.id}",
            exc_info=True,
        )

    await _enforce_restriction(
        context, group_config, sender, user_mention, total_count
    )

    raise ApplicationHandlerStop


async def _enforce_restriction(
    context: ContextTypes.DEFAULT_TYPE,
    group_config: GroupConfig,
    user: object,
    user_mention: str,
    count: int,
) -> None:
    """
    Restrict the user, then post a notification to the warning topic.

    Both steps are best-effort: a failure in either one is logged and
    swallowed. The notification wording depends on whether the
    restriction call succeeded.

    Args:
        context: Handler context providing the bot instance.
        group_config: Config of the group the spam was posted in.
        user: The offending user; only its ``id`` attribute is read.
        user_mention: Pre-formatted mention inserted into the notification.
        count: Number of similar messages detected, shown in the text.
    """
    was_restricted = False
    try:
        await context.bot.restrict_chat_member(
            chat_id=group_config.group_id,
            user_id=user.id,
            permissions=RESTRICTED_PERMISSIONS,
        )
        was_restricted = True
        logger.info(f"Restricted user_id={user.id} for duplicate spam")
    except Exception:
        logger.error(
            f"Failed to restrict user for duplicate spam: user_id={user.id}",
            exc_info=True,
        )

    try:
        if was_restricted:
            message_template = DUPLICATE_SPAM_RESTRICTION
        else:
            message_template = DUPLICATE_SPAM_RESTRICTION_NO_RESTRICT
        notification_text = message_template.format(
            user_mention=user_mention,
            count=count,
            rules_link=group_config.rules_link,
        )
        await context.bot.send_message(
            chat_id=group_config.group_id,
            message_thread_id=group_config.warning_topic_id,
            text=notification_text,
            parse_mode="Markdown",
        )
        logger.info(f"Sent duplicate spam notification for user_id={user.id}")
    except Exception:
        logger.error(
            f"Failed to send duplicate spam notification: user_id={user.id}",
            exc_info=True,
        )
12 changes: 11 additions & 1 deletion src/bot/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from bot.group_config import get_group_registry, init_group_registry
from bot.handlers import captcha
from bot.handlers.anti_spam import handle_inline_keyboard_spam, handle_new_user_spam
from bot.handlers.duplicate_spam import handle_duplicate_spam
from bot.handlers.dm import handle_dm
from bot.handlers.message import handle_message
from bot.handlers.topic_guard import guard_warning_topic
Expand Down Expand Up @@ -294,7 +295,16 @@ def main() -> None:
)
logger.info("Registered handler: anti_spam_handler (group=0)")

# Handler 10: Group message handler - monitors messages in monitored
# Handler 10: Duplicate message spam handler - detects repeated identical messages
application.add_handler(
MessageHandler(
filters.ChatType.GROUPS & ~filters.COMMAND,
handle_duplicate_spam,
)
)
logger.info("Registered handler: duplicate_spam_handler (group=0)")

# Handler 11: Group message handler - monitors messages in monitored
# groups and warns/restricts users with incomplete profiles
application.add_handler(
MessageHandler(
Expand Down
30 changes: 30 additions & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,36 @@ def test_captcha_timeout_timedelta(self, monkeypatch):

assert settings.captcha_timeout_timedelta == timedelta(seconds=90)

def test_duplicate_spam_defaults(self, monkeypatch):
    """Test that every duplicate_spam field has the expected default."""
    monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "test_token")
    monkeypatch.setenv("GROUP_ID", "-100999")
    monkeypatch.setenv("WARNING_TOPIC_ID", "1")

    settings = Settings(_env_file=None)

    assert settings.duplicate_spam_enabled is True
    assert settings.duplicate_spam_window_seconds == 120
    assert settings.duplicate_spam_threshold == 2
    assert settings.duplicate_spam_min_length == 20
    # The similarity setting was added with the other four fields and
    # needs the same coverage.
    assert settings.duplicate_spam_similarity == 0.95

def test_duplicate_spam_from_env(self, monkeypatch):
    """Test that every duplicate_spam field is read from the environment."""
    monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "test_token")
    monkeypatch.setenv("GROUP_ID", "-100999")
    monkeypatch.setenv("WARNING_TOPIC_ID", "1")
    monkeypatch.setenv("DUPLICATE_SPAM_ENABLED", "false")
    monkeypatch.setenv("DUPLICATE_SPAM_WINDOW_SECONDS", "300")
    monkeypatch.setenv("DUPLICATE_SPAM_THRESHOLD", "5")
    monkeypatch.setenv("DUPLICATE_SPAM_MIN_LENGTH", "50")
    # Cover the float-valued similarity setting as well.
    monkeypatch.setenv("DUPLICATE_SPAM_SIMILARITY", "0.9")

    settings = Settings(_env_file=None)

    assert settings.duplicate_spam_enabled is False
    assert settings.duplicate_spam_window_seconds == 300
    assert settings.duplicate_spam_threshold == 5
    assert settings.duplicate_spam_min_length == 50
    assert settings.duplicate_spam_similarity == 0.9


class TestSettingsValidation:
def test_group_id_must_be_negative(self, monkeypatch):
Expand Down
Loading