Skip to content

Liam0205/pineapple

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

220 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Pineapple

高性能 DAG 流水线引擎。Python 声明,Go 执行,JSON 解耦。

算子只需声明输入/输出字段,引擎自动推导依赖、构建 DAG、并行调度——你专注业务逻辑,Pineapple 负责把它跑快。

适用于任何需要多步骤数据处理流水线的场景:搜索/推荐/广告排序、特征工程、实时数据加工、规则引擎、ML 推理前后处理,等等。

⚠️ Pre-1.0 阶段:Pineapple 尚处于 pre-1.0 开发阶段,API 和行为语义可能在版本间发生不兼容变更,不保证向后兼容。生产环境使用请锁定具体版本。

Breaking Change (v0.5.6)if_ / elseif_ 条件中的字段引用必须使用 {{field_name}} 模板语法。旧写法 flow.if_("item_count > 0") 需改为 flow.if_("{{item_count}} > 0")。字段名用双花括号标记,其余部分为原生 Lua 表达式。

架构概览

名称 组件 语言 职责
Pine 执行引擎 Go 解析配置、构建 DAG、并行调度算子
Apple DSL 引擎 Python 声明式描述业务逻辑,编译输出 JSON 配置
Pineapple 完整系统 Go + Python 二者协同,通过 JSON 配置解耦
Python DSL  ──(compile)──>  JSON 配置文件
                                │
                                v
                 Go 引擎解析 JSON,推导算子依赖
                                │
                                v
                  构建 DAG,拓扑排序,并行执行

工程团队用 Go 开发高性能算子;业务团队用 Python DSL 编排逻辑。两侧通过 JSON 配置彻底解耦——业务迭代不需要重编译 Go 代码,Go 服务自动热加载配置变更。

核心优势

数据驱动的隐式构图 — 算子只需声明输入/输出字段,引擎自动推导 RAW/WAW/WAR 依赖关系并构建 DAG,无需手动连线。构建阶段自动执行传递性归约,移除冗余边,调度器只保留保持可达性的最小边集。

无锁并行调度 — DAG 中无依赖的算子自动并行执行,充分利用多核。

编译期校验 — Python 编译器在部署前检测死代码、字段缺失、写后未读等问题,将错误拦截在开发阶段。

Lua 嵌入扩展 — 内置 Lua 算子支持轻量级的自定义计算和条件分支,无需新增 Go 代码即可实现灵活逻辑。简单逻辑下 Lua 仅比 Go 原生慢约 1.3x,详见 Lua vs Go 性能对比

白盒可观测 — 引擎内部始终记录算子级别的执行 trace(耗时、跳过状态)。请求方通过 common._return_trace = true 控制是否在响应中返回 trace;默认不返回。trace 仅包含实际执行或跳过的算子,DAG 中止时未开始执行的算子不会出现。通过可插拔的 pkg/metrics.Provider 接口支持 Prometheus 等外部监控系统接入,内置 /stats 端点提供零配置基线。支持全局 debug 开关(Flow(debug=True)pine.WithDebug(true)),开启后所有算子 trace 附带输入/输出快照,可直观追踪每步的 item 数量变化。也可对单个算子开启 debug=True 做定点排查。

动态资源管理pkg/resource 提供后台定时刷新的内存资源管理器,无锁读、刷新失败保留旧值。资源通过 ResourceSchema 注册,codegen 自动生成 Python 类型类,DSL 声明后编译到统一 JSON 配置。

配置热加载 — 服务运行时监控配置文件变更,自动无停机重载引擎和资源配置,业务迭代立即生效。

HTTP Middlewareserver.Config.Middlewares 接受标准 func(http.Handler) http.Handler 切片,按切片顺序从外到内包装。用于注入访问日志、认证、限流等横切逻辑,同时保留 config hot-reload 和 graceful shutdown 等内置能力。

行存/列存可切换 — DataFrame 支持行存(RowFrame)和列存(ColumnFrame)两种存储模式,通过 JSON 配置的 "storage_mode": "row"|"column" 选择。列存在大规模 item 场景下减少 GC 压力和对象分配。两种实现均通过内部 sync.RWMutex 保证并发安全,调度器无需持有外部锁。

全局日志前缀 — 通过 Apple DSL 的 Flow(log_prefix="[svc] ") 或 Go 侧的 pine.WithLogPrefix("[svc] ") 为所有日志(包括第三方算子)统一添加前缀。Go Option 优先于 JSON 配置。底层使用 log.SetPrefix() 覆盖标准库全局 logger,确保无遗漏。

文档自动生成 — 算子和资源的 Type、Description、参数描述在注册时强制填写,codegen 自动生成 Python 类型绑定和 Markdown 文档,保证代码与文档永远同步。

Schema 即约束Register() 强制校验算子元信息完整性,缺少 Type、Description 或参数描述则启动时直接 panic,从源头杜绝文档缺失。

Quick Start

环境要求

  • Go 1.22+
  • Python 3.10+

1. 克隆项目

git clone https://github.com/Liam0205/pineapple.git
cd pineapple
go mod download

2. 编写 Python Pipeline

创建 demo.py(所有算子方法返回 Flow 自身,支持链式调用 flow.recall_static(...).transform_by_lua(...).reorder_sort(...)):

from apple.flow import Flow

flow = Flow(
    name="demo",
    common_input=["user_age"],
    item_output=["item_id", "item_final_price"],
)

# 召回:静态候选集
flow.recall_static(
    item_output=["item_id", "item_price"],
    items=[
        {"item_id": "a", "item_price": 100.0},
        {"item_id": "b", "item_price": 200.0},
        {"item_id": "c", "item_price": 50.0},
    ],
)

# 特征计算:用 Lua 根据用户年龄打折
flow.transform_by_lua(
    common_input=["user_age"],
    item_input=["item_price"],
    item_output=["item_final_price"],
    lua_script="""
function discount()
  if user_age < 18 then
    return item_price * 0.8
  else
    return item_price
  end
end
""",
    function_for_item="discount",
)

# 排序:按最终价格降序
flow.reorder_sort(
    item_input=["item_final_price"],
    field="item_final_price",
    order="desc",
)

# 编译输出 JSON 配置
with open("pipeline.json", "w") as f:
    f.write(flow.compile())

print("pipeline.json generated")

3. 生成配置

python3 demo.py

4. 启动服务

go run ./cmd/pineapple-server -config pipeline.json -addr :8080

5. 发送请求

curl -s -X POST http://localhost:8080/execute \
  -H "Content-Type: application/json" \
  -d '{
    "common": {"user_age": 16},
    "items": []
  }' | python3 -m json.tool

预期返回(16 岁用户享受 8 折,结果仅含 item_output 声明的字段):

{
  "common": {"user_age": 16},
  "items": [
    {"item_id": "b", "item_final_price": 160.0},
    {"item_id": "a", "item_final_price": 80.0},
    {"item_id": "c", "item_final_price": 40.0}
  ],
  "trace": [...]
}

6. 迭代

修改 demo.py 后重新运行 python3 demo.py,服务自动热加载新配置,无需重启。

算子开发指南(工程视角)

Operator 接口

每个算子实现两个方法:

type Operator interface {
    Init(params map[string]any) error
    Execute(ctx context.Context, input *OperatorInput, output *OperatorOutput) error
}
  • Init:接收业务参数,做一次性初始化
  • Execute:每次请求调用,从 input 读数据、向 output 写数据

注册算子

init() 中调用 pine.Register(),所有元信息字段必填

package myop

import (
    "context"
    pine "github.com/Liam0205/pineapple"
)

func init() {
    pine.Register(pine.OperatorSchema{
        Name:        "transform_my_custom",
        Type:        pine.OpTypeTransform,
        Description: "Computes a custom feature for each item.",
        Params: map[string]pine.ParamSpec{
            "field":  {Type: "string", Required: true, Description: "Input field name."},
            "factor": {Type: "float64", Required: false, Default: 1.0, Description: "Scaling factor."},
        },
    }, func() pine.Operator {
        return &MyCustomOp{}
    })
}

type MyCustomOp struct {
    field  string
    factor float64
}

func (o *MyCustomOp) Init(params map[string]any) error {
    o.field = params["field"].(string)
    o.factor = params["factor"].(float64)
    return nil
}

func (o *MyCustomOp) Execute(_ context.Context, in *pine.OperatorInput, out *pine.OperatorOutput) error {
    for i := 0; i < in.ItemCount(); i++ {
        val := in.Item(i, o.field).(float64)
        out.SetItem(i, o.field+"_scaled", val*o.factor)
    }
    return nil
}

Schema 字段说明

字段 类型 必填 说明
Name string Yes 算子唯一标识,蛇形命名,前缀体现类型(recall_/transform_/filter_/merge_/reorder_/observe_
Type OperatorType Yes 类型(Recall / Transform / Filter / Merge / Reorder / Observe)
Description string Yes 一句话功能描述
Params[k].Type string Yes "string" / "int64" / "float64" / "bool" / "any"
Params[k].Required bool Yes 是否必填
Params[k].Default any No 可选参数的默认值
Params[k].Description string Yes 参数描述

缺少 TypeDescription 或任一参数的 Description 将导致启动 panic。

注释中的 Metadata Contract(可选)

在源文件顶部添加 Metadata contract 注释段,codegen 会将其解析到文档中:

// Operator: transform_my_custom
// Type: Transform
// ...
//
// Metadata contract (typical usage):
//   CommonInput:  []
//   CommonOutput: []
//   ItemInput:    [<field>]
//   ItemOutput:   [<field>_scaled]
package myop

生成代码和文档

# 生成 Python DSL 绑定 + 算子文档
go run ./cmd/pineapple-codegen \
  -output apple_generated \
  -doc-dir doc/operators \
  -operators-dir operators

这将自动生成:

  • apple_generated/operators.py — 带类型提示的 Python 算子类
  • apple_generated/__init__.py — 算子导出列表
  • apple_generated/resources.py — 带类型提示的 Python 资源类(若有注册资源)
  • apple_generated/resources_init.py — 资源导出列表(若有注册资源)
  • doc/operators/<name>.md — 每个算子的文档
  • doc/operators/README.md — 按分类索引

测试

# 单个算子包
go test ./operators/transform/...

# 全量测试
go test ./...

# 覆盖率报告
go test -coverprofile=coverage.out -covermode=atomic ./...
go tool cover -func=coverage.out

# Python 测试(含覆盖率)
python3 -m pytest apple/tests/ -v --cov=apple --cov-report=term

Redis 算子测试使用 miniredis 提供内存级 Redis 服务,无需外部 Redis 实例。HTTP handler 测试使用标准库 net/http/httptest

Lint

# Go
golangci-lint run ./...

# Python
ruff check apple/

Fuzz 测试

# JSON 配置解析
go test -run=^$ -fuzz=FuzzLoad -fuzztime=30s -parallel=4 ./internal/config/

# DAG 构建
go test -run=^$ -fuzz=FuzzBuild -fuzztime=30s -parallel=4 ./internal/dag/

# DataFrame 行存/列存语义一致性
go test -run=^$ -fuzz=FuzzApplyOutputStorageEquivalence -fuzztime=30s -parallel=4 ./internal/dataframe/

# data_parallel 与单 shard 语义一致性
go test -run=^$ -fuzz=FuzzDataParallelEquivalence -fuzztime=30s -parallel=4 ./internal/runtime/

并发压力测试

# 默认测试包含轻量 HTTP 并发覆盖;高压测试需显式开启
PINEAPPLE_STRESS=1 GOMAXPROCS=$(nproc) go test -race -run TestServerHighConcurrencyStress -count=1 -timeout=10m ./pkg/server/

# HTTP 吞吐 benchmark:可调复杂 DAG 的深度、宽度、fan-in、算子 CPU 强度、items、workers、reload
GOMAXPROCS=$(nproc) go test -run=^$ -bench=BenchmarkHTTPServerComplexDAGThroughput -benchmem -benchtime=5s ./pkg/server \
  -args -pineapple.bench.depth=8 -pineapple.bench.width=32 -pineapple.bench.fanin=4 \
  -pineapple.bench.work=100000 -pineapple.bench.items=0 -pineapple.bench.workers=256 \
  -pineapple.bench.reload=true

动态资源管理

资源(特征索引、AB 配置等需要定时刷新的数据)与算子对称,走 Go Schema → codegen Python 类 → DSL 声明 → 编译到统一 JSON 的全流程。

注册资源 Schema

init() 中调用 pine.RegisterResource()

package myresource

import (
    pine "github.com/Liam0205/pineapple"
    "github.com/Liam0205/pineapple/pkg/resource"
)

func init() {
    pine.RegisterResource(pine.ResourceSchema{
        Name:            "feature_index",
        Description:     "User feature lookup table.",
        DefaultInterval: 600,  // 刷新间隔(秒),0 → 默认 10min
        Params: map[string]pine.ParamSpec{
            "dsn": {Type: "string", Required: true, Description: "Database DSN."},
        },
    }, func(params map[string]any) (resource.Fetcher, error) {
        dsn := params["dsn"].(string)
        return newFeatureIndexFetcher(dsn), nil
    })
}

ResourceSchema 字段说明

字段 类型 必填 说明
Name string Yes 资源唯一标识,蛇形命名
Description string Yes 一句话功能描述
DefaultInterval int No 刷新间隔(秒),0 → 默认 10min
Params[k] ParamSpec 与算子共用 ParamSpec(Type、Required、Default、Description)

DSL 中声明资源

codegen 会生成带类型的 Python 资源类。在 pipeline 文件中:

from apple_generated.resources import FeatureIndexResource

flow.resource("my_index", FeatureIndexResource(dsn="host:3306/db"))

编译后,统一 JSON 配置包含 resource_config

{
  "pipeline_config": { ... },
  "pipeline_group": { ... },
  "flow_contract": { ... },
  "resource_config": {
    "my_index": {
      "type": "feature_index",
      "interval": 600,
      "params": {"dsn": "host:3306/db"}
    }
  }
}

算子中读取资源

import "github.com/Liam0205/pineapple/pkg/resource"

func (o *MyOp) Execute(ctx context.Context, in *pine.OperatorInput, out *pine.OperatorOutput) error {
    rp := resource.FromContext(ctx)
    if rp == nil {
        return nil // 未注入,降级处理
    }
    idx, ok := rp.Get("my_index")
    if !ok {
        return nil // 资源未就绪,降级
    }
    table := idx.(*FeatureTable)
    // 使用 table ...
    return nil
}

详见 动态资源管理设计文档

Pipeline 编写指南(算法视角)

基本用法

from apple.flow import Flow

flow = Flow(
    name="my_pipeline",
    common_input=["user_id", "user_age"],   # 请求级上下文字段
    item_output=["item_id", "item_score"],  # 最终输出字段
)

链式调用算子

# 召回候选集
flow.recall_static(
    item_output=["item_id", "item_score"],
    items=[...],
)

# 过滤
flow.filter_condition(
    item_input=["item_status"],
    field="item_status",
    value="offline",
)

# 特征处理
flow.transform_normalize(
    item_input=["item_score"],
    item_output=["item_score_norm"],
    field="item_score",
)

# 截断
flow.filter_truncate(
    top_n=50,
)

# 排序
flow.reorder_sort(
    item_input=["item_score_norm"],
    field="item_score_norm",
    order="desc",
)

条件分支

flow.if_("{{is_new_user}}") \
    .transform_dispatch(
        common_input=["default_score"],
        item_output=["item_score"],
        common_field="default_score",
        item_field="item_score",
    ) \
.else_() \
    .transform_by_lua(
        common_input=["user_id"],
        item_input=["item_id"],
        item_output=["item_score"],
        lua_script="...",
        function_for_item="score",
    ) \
.end_if_()

SubFlow 复用

from apple.flow import Flow, SubFlow

normalize_sub = SubFlow(name="normalize")
normalize_sub.transform_normalize(
    item_input=["raw_score"],
    item_output=["norm_score"],
    field="raw_score",
)

flow = Flow(
    name="main",
    common_input=["user_id"],
    item_output=["item_id", "norm_score"],
    sub_flows=[normalize_sub],
)
flow.recall_static(...)
# normalize_sub 的算子会被展开到 flow 中

资源声明

当算子依赖外部数据时,在 pipeline 中声明资源。codegen 会为每种资源生成带类型的 Python 类:

from apple_generated.resources import FeatureIndexResource

flow = Flow(name="my_pipeline", ...)

# 声明资源
flow.resource("my_index", FeatureIndexResource(dsn="host:3306/db"))

# 算子引用资源(通过 resource_name 参数)
flow.recall_feature_index(
    resource_name="my_index",
    item_output=["item_id", "score"],
)

编译器会校验所有 resource_name 引用是否有匹配的资源声明,未声明即报错。

Metadata 声明

每个算子调用需要声明它读写的字段:

参数 含义
common_input 读取的请求级字段
common_output 写入的请求级字段
item_input 读取的物品级字段
item_output 写入的物品级字段
item_defaults 物品级字段默认值
common_defaults 请求级字段默认值
sources 合并算子的数据来源
debug 启用此算子的调试快照
data_parallel 数据并行分片数(仅 Transform,需空 common_output)

Recall 身份由算子名前缀 (recall_) 自动推导,无需手动声明。

编译和校验

# 编译为 JSON 字符串
json_str = flow.compile()

# 编译为 dict(不写文件)
config = flow.compile_dict()

编译器自动执行以下校验:

  • 字段覆盖 — 算子声明读取的字段必须有上游产出
  • 死代码检测 — 产出字段未被下游消费的算子会被标记(observe 类算子豁免)
  • 写后覆写 — 检测同一字段被多次写入
  • 控制流完整性if_ 必须有对应的 end_if_
  • 空分支检测 — 控制块的每个分支下必须有至少一个业务算子,空分支(如 if_("{{cond}}").end_if_())会报错
  • 数据并行约束data_parallel > 1 时必须是 Transform 算子且 common_output 为空
  • 参数-元数据一致性transform_resource_lookuplookup_key 必须出现在 item_inputoutput_field 必须出现在 item_output,防止业务参数与元数据声明不匹配导致运行时静默错误
  • 控制算子可辨识命名 — 条件分支编译后的控制算子使用 if_1elseif_2else_3 等显式名称,在 DAG 可视化中可直观辨识

API 参考

POST /execute

执行 pipeline。

请求体:

{
  "common": {"user_id": "123", "user_age": 25},
  "items": []
}

响应体:

{
  "common": {"user_id": "123", "user_age": 25},
  "items": [
    {"item_id": "a", "item_score": 0.95}
  ],
  "warnings": [],
  "trace": [
    {
      "name": "recall_static_ABA9A7",
      "duration_ms": 0.123,
      "skipped": false
    }
  ]
}

GET /health

健康检查。返回 {"status": "ok"}

GET /stats

引擎运行统计。返回复合 JSON 结构,包含四个维度:

{
  "operators": {"<operator_name>": {"exec_count": 100, "skip_count": 0, ...}},
  "scheduler": {"run_count": 100, "peak_concurrency": 4},
  "server": {"reload_count": 3, "reload_error_count": 0, "last_reload_duration_ns": 5234000},
  "operator_detail": {"<operator_name>": {"borrow_count": 100, ...}}
}
  • operators:per-operator 累计统计
  • scheduler:调度器级统计(运行次数、峰值并发)
  • server:配置热重载统计
  • operator_detail:实现 StatsProvider 接口的算子自定义统计(如 Lua pool 指标)

GET /dag

返回编译后的 DAG 结构可视化。通过查询参数选择输出格式和渲染模式:

参数 说明
format dot(默认) Graphviz DOT 格式,可通过 dot -Tsvg 渲染
format mermaid Mermaid flowchart 格式,可嵌入 Markdown
collapse subflow 将同一 SubFlow 内的算子折叠为单个聚合节点

节点按算子类型着色(Recall 绿、Transform 蓝、Filter 橙、Merge 紫、Reorder 黄、Observe 灰),标签包含算子名。

# 获取 DOT 格式
curl http://localhost:8080/dag

# 渲染为 SVG
curl -s http://localhost:8080/dag | dot -Tsvg -o dag.svg

# 获取 Mermaid 格式
curl http://localhost:8080/dag?format=mermaid

# SubFlow 折叠渲染
curl http://localhost:8080/dag?format=dot&collapse=subflow

项目结构

pineapple/
├── apple/                  # Python DSL (Apple)
│   ├── base.py             #   算子基类
│   ├── resource.py         #   资源基类 + ResourceDecl
│   ├── flow.py             #   Flow/SubFlow 声明(含资源声明)
│   ├── compiler.py         #   编译器:DSL -> JSON(含 resource_config)
│   ├── validator.py        #   静态校验器
│   ├── control.py          #   控制流 (if/else) 支持
│   ├── generated/          #   自动生成的 Python 绑定
│   └── tests/              #   Python 测试
├── cmd/
│   ├── pineapple-server/   # HTTP 服务入口
│   └── pineapple-codegen/  # 代码 & 文档生成工具
├── pkg/
│   ├── metrics/            # 可插拔指标接口 (Counter/Gauge/Histogram + Nop)
│   ├── resource/           # 动态资源管理 (ResourceManager)
│   ├── server/             # 可复用 HTTP 服务库
│   └── codegen/            # 可复用代码生成库
├── design_doc/             # 设计文档 (01-12)
├── doc/
│   ├── operators/          # 自动生成的算子文档
│   └── reports/            # 测试 & 性能报告
├── internal/               # Go 内部包
│   ├── config/             #   JSON 配置解析
│   ├── dag/                #   DAG 构建与拓扑排序
│   ├── dataframe/          #   DataFrame 实现
│   ├── registry/           #   算子注册表
│   ├── runtime/            #   调度器、trace、stats
│   └── types/              #   核心类型定义
├── operators/              # 内置算子实现
│   ├── transform/          #   transform_dispatch, transform_normalize, transform_resource_lookup, transform_by_remote_pineapple
│   ├── filter/             #   filter_condition, filter_truncate
│   ├── lua/                #   transform_by_lua (Lua 嵌入)
│   ├── merge/              #   merge_dedup
│   ├── observe/            #   observe_log
│   ├── recall/             #   recall_static, recall_resource
│   └── reorder/            #   reorder_sort
├── integration/            # 集成测试
├── benchmarks/             # 性能基准测试
└── testdata/               # 测试用 JSON 配置

文档链接

可观测性

内置统计

GET /stats 端点始终可用,基于 atomic 计数器,零外部依赖。覆盖调度器执行、算子耗时、Lua pool 使用、配置热重载四个维度。

Prometheus 接入

Pineapple 通过 pkg/metrics.Provider 接口支持外部指标导出,核心库不依赖 prometheus/client_golang。接入方在自己的项目中实现 Provider(约 80 行),通过 pine.WithMetrics()server.Config.Metrics 注入即可。

// 在 server wrapper 中注入 Prometheus provider
mp := promadapter.New(prometheus.DefaultRegisterer)
server.Run(server.Config{
    ConfigPath: *configPath,
    Addr:       *addr,
    Metrics:    mp,
})

详见 可观测性设计文档

自定义算子指标

算子可选实现 StatsProvider 接口暴露自定义统计到 /statsoperator_detail 字段,或实现 MetricsAware 接口向 Provider 注册自定义 Prometheus 指标。

第三方扩展

第三方项目可以在不修改 pineapple 源码的前提下添加自定义算子和资源。核心思路:写自己的算子/资源包,通过 blank import 注册到全局 registry,然后用 pkg/serverpkg/codegen 构建自己的服务和 Python 绑定。

my-project/
├── go.mod                    # require github.com/Liam0205/pineapple
├── operators/
│   └── my_scorer/
│       └── scorer.go         # init() { pine.Register(schema, factory) }
├── resources/
│   └── feature_index/
│       └── feature_index.go  # init() { pine.RegisterResource(schema, factory) }
├── cmd/
│   ├── my-server/
│   │   └── main.go           # import 算子 + 资源 → server.Run()
│   └── my-codegen/
│       └── main.go           # import 算子 + 资源 → codegen.Run()
├── apple_generated/          # codegen 产出(算子 + 资源的 Python 绑定)
└── pipelines/
    └── my_pipeline.py        # DSL 声明算子 + 资源 → 编译统一 JSON

Server wrapper 示例

package main

import (
    "flag"
    "log"

    _ "github.com/Liam0205/pineapple/operators" // 内置算子
    _ "my-project/operators/my_scorer"            // 自定义算子
    _ "my-project/resources/feature_index"        // 自定义资源
    "github.com/Liam0205/pineapple/pkg/server"
)

func main() {
    configPath := flag.String("config", "", "Unified JSON config (pipeline + resources)")
    addr := flag.String("addr", ":8080", "Listen address")
    flag.Parse()
    if err := server.Run(server.Config{
        ConfigPath: *configPath,
        Addr:       *addr,
    }); err != nil {
        log.Fatal(err)
    }
}

Codegen wrapper 示例

package main

import (
    "flag"
    "fmt"
    "os"

    _ "github.com/Liam0205/pineapple/operators"
    _ "my-project/operators/my_scorer"
    _ "my-project/resources/feature_index"
    "github.com/Liam0205/pineapple/pkg/codegen"
)

func main() {
    output := flag.String("output", "apple_generated", "Python output dir")
    docDir := flag.String("doc-dir", "", "Doc output dir")
    opsDir := flag.String("operators-dir", "operators", "Go operators source")
    flag.Parse()
    if err := codegen.Run(codegen.Config{OutputDir: *output, DocDir: *docDir, OpsDir: *opsDir}); err != nil {
        fmt.Fprintf(os.Stderr, "codegen: %v\n", err)
        os.Exit(1)
    }
}

详见 发布与第三方扩展设计文档

About

High-performance DAG pipeline engine — declare in Python, execute in Go, decouple with JSON.

Topics

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors