Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions runtime/datamate-python/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,29 @@ def build_database_url(self):
# 文件存储配置(共享文件系统)
file_storage_path: str = "/data/files"

# ==================== 配比任务并行复制配置 ====================
# 动态并发计算参数(全闪存储高性能场景默认值)

# 并发下限(最少并发数)
ratio_copy_min_concurrent: int = 8

# 并发上限(最多并发数,防止资源耗尽)
ratio_copy_max_concurrent: int = 128

# CPU核心系数(每个核心贡献的并发数,全闪存储建议4.0)
ratio_copy_cpu_factor: float = 4.0

# 每并发任务预估内存占用(MB)
ratio_copy_memory_per_task_mb: int = 32

# 内存安全保留比例(保留给其他进程)
ratio_copy_memory_reserve_ratio: float = 0.2

# 是否启用动态计算(False则使用固定值)
ratio_copy_dynamic_concurrent: bool = True

# 固定并发数(当 dynamic_concurrent=False 时使用)
ratio_copy_fixed_concurrent: int = 10
Comment on lines +87 to +109
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description says “Add 8 new configuration parameters”, but this diff adds 7 (ratio_copy_min_concurrent, ratio_copy_max_concurrent, ratio_copy_cpu_factor, ratio_copy_memory_per_task_mb, ratio_copy_memory_reserve_ratio, ratio_copy_dynamic_concurrent, ratio_copy_fixed_concurrent). Please reconcile the PR description with the actual change (or add the missing config if something was intended).

Copilot uses AI. Check for mistakes.

# 全局设置实例
settings = Settings()
157 changes: 129 additions & 28 deletions runtime/datamate-python/app/module/ratio/service/ratio_task.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import List, Optional, Dict, Any
from typing import List, Optional, Dict, Any, Tuple
import random
import json
import os
Expand All @@ -10,6 +10,7 @@
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.logging import get_logger
from app.core.config import settings
from app.db.models.base_entity import LineageNode, LineageEdge
from app.db.models.ratio_task import RatioInstance, RatioRelation
from app.db.models import Dataset, DatasetFiles
Expand All @@ -18,6 +19,7 @@
from app.module.shared.common.lineage import LineageService
from app.module.shared.schema import TaskStatus, NodeType, EdgeType
from app.module.ratio.schema.ratio_task import FilterCondition
from app.module.shared.util.resource_utils import get_concurrent_for_ratio_copy, get_system_resource_info

logger = get_logger(__name__)

Expand Down Expand Up @@ -78,66 +80,55 @@ async def create_task(

@staticmethod
async def execute_dataset_ratio_task(instance_id: str) -> None:
"""Execute a ratio task in background.

Supported ratio_method:
- DATASET: randomly select counts files from each source dataset
- TAG: randomly select counts files matching relation.filter_conditions tags

Steps:
- Mark instance RUNNING
- For each relation: fetch ACTIVE files, optionally filter by tags
- Copy selected files into target dataset
- Update dataset statistics and mark instance SUCCESS/FAILED
"""
async with AsyncSessionLocal() as session: # type: AsyncSession
async with AsyncSessionLocal() as session:
try:
# Load instance and relations
inst_res = await session.execute(select(RatioInstance).where(RatioInstance.id == instance_id))
instance: Optional[RatioInstance] = inst_res.scalar_one_or_none()
if not instance:
logger.error(f"Ratio instance not found: {instance_id}")
return
logger.info(f"start execute ratio task: {instance_id}")

logger.info(f"Starting ratio task {instance_id}")
logger.info(f"System resources: {get_system_resource_info()}")

rel_res = await session.execute(
select(RatioRelation).where(RatioRelation.ratio_instance_id == instance_id)
)
relations: List[RatioRelation] = list(rel_res.scalars().all())

# Mark running
instance.status = TaskStatus.RUNNING.name

# Load target dataset
ds_res = await session.execute(select(Dataset).where(Dataset.id == instance.target_dataset_id))
target_ds: Optional[Dataset] = ds_res.scalar_one_or_none()
if not target_ds:
logger.error(f"Target dataset not found for instance {instance_id}")
instance.status = TaskStatus.FAILED.name
return

added_count, added_size = await RatioTaskService.handle_ratio_relations(relations,session, target_ds)
max_concurrent = get_concurrent_for_ratio_copy(settings)
logger.info(f"Using {max_concurrent} concurrent workers for ratio task {instance_id}")

# Update target dataset statistics
target_ds.file_count = (target_ds.file_count or 0) + added_count # type: ignore
target_ds.size_bytes = (target_ds.size_bytes or 0) + added_size # type: ignore
# If target dataset has files, mark it ACTIVE
if (target_ds.file_count or 0) > 0: # type: ignore
added_count, added_size = await RatioTaskService.handle_ratio_relations_parallel(
relations, session, target_ds, max_concurrent
)

target_ds.file_count = (target_ds.file_count or 0) + added_count
target_ds.size_bytes = (target_ds.size_bytes or 0) + added_size
if (target_ds.file_count or 0) > 0:
target_ds.status = "ACTIVE"

# Done
instance.status = TaskStatus.COMPLETED.name
logger.info(f"Dataset ratio execution completed: instance={instance_id}, files={added_count}, size={added_size}, {instance.status}")
logger.info(f"Ratio task completed: {instance_id}, files={added_count}, size={added_size}")

await RatioTaskService._add_task_to_graph(
session=session,
src_relations=relations,
task=instance,
dst_dataset=target_ds,
)
except Exception as e:
logger.exception(f"Dataset ratio execution failed for {instance_id}: {e}")
logger.exception(f"Ratio task failed for {instance_id}: {e}")
try:
# Try mark failed
inst_res = await session.execute(select(RatioInstance).where(RatioInstance.id == instance_id))
instance = inst_res.scalar_one_or_none()
if instance:
Expand All @@ -147,6 +138,116 @@ async def execute_dataset_ratio_task(instance_id: str) -> None:
finally:
await session.commit()

@staticmethod
async def handle_ratio_relations_parallel(
relations: list[RatioRelation],
session: AsyncSession,
target_ds: Dataset,
max_concurrent: int = 10
) -> Tuple[int, int]:
existing_path_rows = await session.execute(
select(DatasetFiles.file_path).where(DatasetFiles.dataset_id == target_ds.id)
)
existing_paths = set(p for p in existing_path_rows.scalars().all() if p)
source_paths = set()

all_copy_tasks: List[Tuple[DatasetFiles, str, str]] = []

for rel in relations:
if not rel.source_dataset_id or not rel.counts or rel.counts <= 0:
continue

files = await RatioTaskService.get_files(rel, session)
if not files:
continue

pick_n = min(rel.counts or 0, len(files))
chosen = random.sample(files, pick_n) if pick_n < len(files) else files

for file in chosen:
if file.file_path in source_paths:
continue

dst_prefix = f"/dataset/{target_ds.id}/"
file_name = RatioTaskService.get_new_file_name(dst_prefix, existing_paths, file)
new_path = dst_prefix + file_name

file_record = DatasetFiles(
dataset_id=target_ds.id,
file_name=file_name,
file_path=new_path,
file_type=file.file_type,
file_size=file.file_size,
check_sum=file.check_sum,
tags=file.tags,
tags_updated_at=datetime.now(),
dataset_filemetadata=file.dataset_filemetadata,
status="ACTIVE",
)

all_copy_tasks.append((file_record, file.file_path, new_path))
existing_paths.add(new_path)
source_paths.add(file.file_path)

if not all_copy_tasks:
return 0, 0

dst_dir = f"/dataset/{target_ds.id}/"
await asyncio.to_thread(os.makedirs, dst_dir, exist_ok=True)
Comment on lines +171 to +196
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New code hard-codes the dataset path prefix as "/dataset" when constructing dst_prefix/dst_dir. The codebase already has settings.dm_file_path_prefix for this purpose; hard-coding here makes the ratio task ignore that configuration and increases drift if the prefix ever changes. Consider building paths from settings.dm_file_path_prefix (or a shared helper) instead of embedding the literal string in the new parallel implementation.

Copilot uses AI. Check for mistakes.

Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asyncio.Semaphore(max_concurrent) will deadlock if max_concurrent is 0 (tasks will wait forever) and may error for negative values. Since max_concurrent comes from config, add validation/clamping (e.g., ensure it’s at least 1) either in get_concurrent_for_ratio_copy/calculate_optimal_concurrent or immediately before constructing the semaphore.

Suggested change
original_max_concurrent = max_concurrent
try:
max_concurrent = max(1, int(max_concurrent))
except (TypeError, ValueError):
max_concurrent = 1
if max_concurrent != original_max_concurrent:
logger.warning(
f"Invalid max_concurrent={original_max_concurrent}; using {max_concurrent} instead"
)

Copilot uses AI. Check for mistakes.
semaphore = asyncio.Semaphore(max_concurrent)
successful_records: List[DatasetFiles] = []
added_count = 0
added_size = 0

async def copy_with_semaphore(
file_record: DatasetFiles,
src_path: str,
dst_path: str
) -> Tuple[bool, DatasetFiles]:
async with semaphore:
try:
file_dst_dir = os.path.dirname(dst_path)
if file_dst_dir != dst_dir:
await asyncio.to_thread(os.makedirs, file_dst_dir, exist_ok=True)

Comment on lines +195 to +213
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dst_dir includes a trailing slash (e.g. "/dataset//"), but os.path.dirname(dst_path) returns "/dataset/". This makes if file_dst_dir != dst_dir always true, causing an extra os.makedirs(...) call for every file copy. Normalize paths (e.g., strip trailing slash or compare Path(dst_dir)/Path(file_dst_dir)) and avoid per-file mkdir when all files share the same target directory.

Copilot uses AI. Check for mistakes.
try:
await asyncio.to_thread(os.link, src_path, dst_path)
except OSError:
try:
await asyncio.to_thread(os.symlink, src_path, dst_path)
except OSError:
await asyncio.to_thread(shutil.copy2, src_path, dst_path)

return True, file_record
except Exception as e:
logger.error(f"Copy failed: {src_path} -> {dst_path}: {e}")
return False, file_record

tasks = [copy_with_semaphore(rec, src, dst) for rec, src, dst in all_copy_tasks]
results = await asyncio.gather(*tasks, return_exceptions=True)

for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Copy task {i} raised exception: {result}")
continue
success, file_record = result
if success:
added_count += 1
added_size += int(file_record.file_size or 0)
successful_records.append(file_record)

if successful_records:
session.add_all(successful_records)
await session.flush()

logger.info(
f"Parallel copy completed: {added_count}/{len(all_copy_tasks)} files, "
f"{added_size} bytes, {len(results) - added_count} failures"
Comment on lines +227 to +246
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tasks = [copy_with_semaphore(...) for ...] followed by asyncio.gather(*tasks) creates all tasks up front and retains a full results list. For large datasets this can cause high memory usage and event-loop overhead even though actual copy concurrency is limited by the semaphore. Consider processing copies in bounded batches or using a worker-queue pattern (fixed number of worker tasks consuming from an asyncio.Queue) so outstanding tasks are also bounded.

Suggested change
tasks = [copy_with_semaphore(rec, src, dst) for rec, src, dst in all_copy_tasks]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Copy task {i} raised exception: {result}")
continue
success, file_record = result
if success:
added_count += 1
added_size += int(file_record.file_size or 0)
successful_records.append(file_record)
if successful_records:
session.add_all(successful_records)
await session.flush()
logger.info(
f"Parallel copy completed: {added_count}/{len(all_copy_tasks)} files, "
f"{added_size} bytes, {len(results) - added_count} failures"
copy_queue: asyncio.Queue = asyncio.Queue()
for rec, src, dst in all_copy_tasks:
await copy_queue.put((rec, src, dst))
async def worker(worker_index: int) -> None:
nonlocal added_count, added_size
while True:
item = await copy_queue.get()
if item is None:
copy_queue.task_done()
break
file_record, src_path, dst_path = item
try:
success, copied_file_record = await copy_with_semaphore(
file_record, src_path, dst_path
)
if success:
added_count += 1
added_size += int(copied_file_record.file_size or 0)
successful_records.append(copied_file_record)
except Exception as e:
logger.error(f"Copy worker {worker_index} raised exception: {e}")
finally:
copy_queue.task_done()
worker_count = max(1, min(max_concurrent, len(all_copy_tasks)))
workers = [
asyncio.create_task(worker(i))
for i in range(worker_count)
]
await copy_queue.join()
for _ in range(worker_count):
await copy_queue.put(None)
await asyncio.gather(*workers, return_exceptions=True)
if successful_records:
session.add_all(successful_records)
await session.flush()
logger.info(
f"Parallel copy completed: {added_count}/{len(all_copy_tasks)} files, "
f"{added_size} bytes, {len(all_copy_tasks) - added_count} failures"

Copilot uses AI. Check for mistakes.
)

return added_count, added_size

@staticmethod
async def handle_ratio_relations(relations: list[RatioRelation], session, target_ds: Dataset) -> tuple[int, int]:
# Preload existing target file paths for deduplication
Expand Down
Loading
Loading