高性能 DAG 流水线引擎。Python 声明,Go 执行,JSON 解耦。
算子只需声明输入/输出字段,引擎自动推导依赖、构建 DAG、并行调度——你专注业务逻辑,Pineapple 负责把它跑快。
适用于任何需要多步骤数据处理流水线的场景:搜索/推荐/广告排序、特征工程、实时数据加工、规则引擎、ML 推理前后处理,等等。
⚠️ Pre-1.0 阶段:Pineapple 尚处于 pre-1.0 开发阶段,API 和行为语义可能在版本间发生不兼容变更,不保证向后兼容。生产环境使用请锁定具体版本。
Breaking Change (v0.5.6):
if_/elseif_条件中的字段引用必须使用{{field_name}}模板语法。旧写法flow.if_("item_count > 0")需改为flow.if_("{{item_count}} > 0")。字段名用双花括号标记,其余部分为原生 Lua 表达式。
| 名称 | 组件 | 语言 | 职责 |
|---|---|---|---|
| Pine | 执行引擎 | Go | 解析配置、构建 DAG、并行调度算子 |
| Apple | DSL 引擎 | Python | 声明式描述业务逻辑,编译输出 JSON 配置 |
| Pineapple | 完整系统 | Go + Python | 二者协同,通过 JSON 配置解耦 |
Python DSL ──(compile)──> JSON 配置文件
│
v
Go 引擎解析 JSON,推导算子依赖
│
v
构建 DAG,拓扑排序,并行执行
工程团队用 Go 开发高性能算子;业务团队用 Python DSL 编排逻辑。两侧通过 JSON 配置彻底解耦——业务迭代不需要重编译 Go 代码,Go 服务自动热加载配置变更。
数据驱动的隐式构图 — 算子只需声明输入/输出字段,引擎自动推导 RAW/WAW/WAR 依赖关系并构建 DAG,无需手动连线。构建阶段自动执行传递性归约,移除冗余边,调度器只保留保持可达性的最小边集。
无锁并行调度 — DAG 中无依赖的算子自动并行执行,充分利用多核。
编译期校验 — Python 编译器在部署前检测死代码、字段缺失、写后未读等问题,将错误拦截在开发阶段。
Lua 嵌入扩展 — 内置 Lua 算子支持轻量级的自定义计算和条件分支,无需新增 Go 代码即可实现灵活逻辑。简单逻辑下 Lua 仅比 Go 原生慢约 1.3x,详见 Lua vs Go 性能对比。
白盒可观测 — 引擎内部始终记录算子级别的执行 trace(耗时、跳过状态)。请求方通过 common._return_trace = true 控制是否在响应中返回 trace;默认不返回。trace 仅包含实际执行或跳过的算子,DAG 中止时未开始执行的算子不会出现。通过可插拔的 pkg/metrics.Provider 接口支持 Prometheus 等外部监控系统接入,内置 /stats 端点提供零配置基线。支持全局 debug 开关(Flow(debug=True) 或 pine.WithDebug(true)),开启后所有算子 trace 附带输入/输出快照,可直观追踪每步的 item 数量变化。也可对单个算子开启 debug=True 做定点排查。
动态资源管理 — pkg/resource 提供后台定时刷新的内存资源管理器,无锁读、刷新失败保留旧值。资源通过 ResourceSchema 注册,codegen 自动生成 Python 类型类,DSL 声明后编译到统一 JSON 配置。
配置热加载 — 服务运行时监控配置文件变更,自动无停机重载引擎和资源配置,业务迭代立即生效。
HTTP Middleware — server.Config.Middlewares 接受标准 func(http.Handler) http.Handler 切片,按切片顺序从外到内包装。用于注入访问日志、认证、限流等横切逻辑,同时保留 config hot-reload 和 graceful shutdown 等内置能力。
行存/列存可切换 — DataFrame 支持行存(RowFrame)和列存(ColumnFrame)两种存储模式,通过 JSON 配置的 "storage_mode": "row"|"column" 选择。列存在大规模 item 场景下减少 GC 压力和对象分配。两种实现均通过内部 sync.RWMutex 保证并发安全,调度器无需持有外部锁。
全局日志前缀 — 通过 Apple DSL 的 Flow(log_prefix="[svc] ") 或 Go 侧的 pine.WithLogPrefix("[svc] ") 为所有日志(包括第三方算子)统一添加前缀。Go Option 优先于 JSON 配置。底层使用 log.SetPrefix() 覆盖标准库全局 logger,确保无遗漏。
文档自动生成 — 算子和资源的 Type、Description、参数描述在注册时强制填写,codegen 自动生成 Python 类型绑定和 Markdown 文档,保证代码与文档永远同步。
Schema 即约束 — Register() 强制校验算子元信息完整性,缺少 Type、Description 或参数描述则启动时直接 panic,从源头杜绝文档缺失。
- Go 1.22+
- Python 3.10+
git clone https://github.com/Liam0205/pineapple.git
cd pineapple
go mod download创建 demo.py(所有算子方法返回 Flow 自身,支持链式调用 flow.recall_static(...).transform_by_lua(...).reorder_sort(...)):
from apple.flow import Flow
flow = Flow(
name="demo",
common_input=["user_age"],
item_output=["item_id", "item_final_price"],
)
# 召回:静态候选集
flow.recall_static(
item_output=["item_id", "item_price"],
items=[
{"item_id": "a", "item_price": 100.0},
{"item_id": "b", "item_price": 200.0},
{"item_id": "c", "item_price": 50.0},
],
)
# 特征计算:用 Lua 根据用户年龄打折
flow.transform_by_lua(
common_input=["user_age"],
item_input=["item_price"],
item_output=["item_final_price"],
lua_script="""
function discount()
if user_age < 18 then
return item_price * 0.8
else
return item_price
end
end
""",
function_for_item="discount",
)
# 排序:按最终价格降序
flow.reorder_sort(
item_input=["item_final_price"],
field="item_final_price",
order="desc",
)
# 编译输出 JSON 配置
with open("pipeline.json", "w") as f:
f.write(flow.compile())
print("pipeline.json generated")python3 demo.pygo run ./cmd/pineapple-server -config pipeline.json -addr :8080curl -s -X POST http://localhost:8080/execute \
-H "Content-Type: application/json" \
-d '{
"common": {"user_age": 16},
"items": []
}' | python3 -m json.tool预期返回(16 岁用户享受 8 折,结果仅含 item_output 声明的字段):
{
"common": {"user_age": 16},
"items": [
{"item_id": "b", "item_final_price": 160.0},
{"item_id": "a", "item_final_price": 80.0},
{"item_id": "c", "item_final_price": 40.0}
],
"trace": [...]
}修改 demo.py 后重新运行 python3 demo.py,服务自动热加载新配置,无需重启。
每个算子实现两个方法:
type Operator interface {
Init(params map[string]any) error
Execute(ctx context.Context, input *OperatorInput, output *OperatorOutput) error
}Init:接收业务参数,做一次性初始化Execute:每次请求调用,从input读数据、向output写数据
在 init() 中调用 pine.Register(),所有元信息字段必填:
package myop
import (
"context"
pine "github.com/Liam0205/pineapple"
)
func init() {
pine.Register(pine.OperatorSchema{
Name: "transform_my_custom",
Type: pine.OpTypeTransform,
Description: "Computes a custom feature for each item.",
Params: map[string]pine.ParamSpec{
"field": {Type: "string", Required: true, Description: "Input field name."},
"factor": {Type: "float64", Required: false, Default: 1.0, Description: "Scaling factor."},
},
}, func() pine.Operator {
return &MyCustomOp{}
})
}
type MyCustomOp struct {
field string
factor float64
}
func (o *MyCustomOp) Init(params map[string]any) error {
o.field = params["field"].(string)
o.factor = params["factor"].(float64)
return nil
}
func (o *MyCustomOp) Execute(_ context.Context, in *pine.OperatorInput, out *pine.OperatorOutput) error {
for i := 0; i < in.ItemCount(); i++ {
val := in.Item(i, o.field).(float64)
out.SetItem(i, o.field+"_scaled", val*o.factor)
}
return nil
}| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
Name |
string | Yes | 算子唯一标识,蛇形命名,前缀体现类型(recall_/transform_/filter_/merge_/reorder_/observe_) |
Type |
OperatorType | Yes | 类型(Recall / Transform / Filter / Merge / Reorder / Observe) |
Description |
string | Yes | 一句话功能描述 |
Params[k].Type |
string | Yes | "string" / "int64" / "float64" / "bool" / "any" |
Params[k].Required |
bool | Yes | 是否必填 |
Params[k].Default |
any | No | 可选参数的默认值 |
Params[k].Description |
string | Yes | 参数描述 |
缺少
Type、Description或任一参数的Description将导致启动 panic。
在源文件顶部添加 Metadata contract 注释段,codegen 会将其解析到文档中:
// Operator: transform_my_custom
// Type: Transform
// ...
//
// Metadata contract (typical usage):
// CommonInput: []
// CommonOutput: []
// ItemInput: [<field>]
// ItemOutput: [<field>_scaled]
package myop# 生成 Python DSL 绑定 + 算子文档
go run ./cmd/pineapple-codegen \
-output apple_generated \
-doc-dir doc/operators \
-operators-dir operators这将自动生成:
apple_generated/operators.py— 带类型提示的 Python 算子类apple_generated/__init__.py— 算子导出列表apple_generated/resources.py— 带类型提示的 Python 资源类(若有注册资源)apple_generated/resources_init.py— 资源导出列表(若有注册资源)doc/operators/<name>.md— 每个算子的文档doc/operators/README.md— 按分类索引
# 单个算子包
go test ./operators/transform/...
# 全量测试
go test ./...
# 覆盖率报告
go test -coverprofile=coverage.out -covermode=atomic ./...
go tool cover -func=coverage.out
# Python 测试(含覆盖率)
python3 -m pytest apple/tests/ -v --cov=apple --cov-report=termRedis 算子测试使用 miniredis 提供内存级 Redis 服务,无需外部 Redis 实例。HTTP handler 测试使用标准库 net/http/httptest。
# Go
golangci-lint run ./...
# Python
ruff check apple/# JSON 配置解析
go test -run=^$ -fuzz=FuzzLoad -fuzztime=30s -parallel=4 ./internal/config/
# DAG 构建
go test -run=^$ -fuzz=FuzzBuild -fuzztime=30s -parallel=4 ./internal/dag/
# DataFrame 行存/列存语义一致性
go test -run=^$ -fuzz=FuzzApplyOutputStorageEquivalence -fuzztime=30s -parallel=4 ./internal/dataframe/
# data_parallel 与单 shard 语义一致性
go test -run=^$ -fuzz=FuzzDataParallelEquivalence -fuzztime=30s -parallel=4 ./internal/runtime/# 默认测试包含轻量 HTTP 并发覆盖;高压测试需显式开启
PINEAPPLE_STRESS=1 GOMAXPROCS=$(nproc) go test -race -run TestServerHighConcurrencyStress -count=1 -timeout=10m ./pkg/server/
# HTTP 吞吐 benchmark:可调复杂 DAG 的深度、宽度、fan-in、算子 CPU 强度、items、workers、reload
GOMAXPROCS=$(nproc) go test -run=^$ -bench=BenchmarkHTTPServerComplexDAGThroughput -benchmem -benchtime=5s ./pkg/server \
-args -pineapple.bench.depth=8 -pineapple.bench.width=32 -pineapple.bench.fanin=4 \
-pineapple.bench.work=100000 -pineapple.bench.items=0 -pineapple.bench.workers=256 \
-pineapple.bench.reload=true资源(特征索引、AB 配置等需要定时刷新的数据)与算子对称,走 Go Schema → codegen Python 类 → DSL 声明 → 编译到统一 JSON 的全流程。
在 init() 中调用 pine.RegisterResource():
package myresource
import (
pine "github.com/Liam0205/pineapple"
"github.com/Liam0205/pineapple/pkg/resource"
)
func init() {
pine.RegisterResource(pine.ResourceSchema{
Name: "feature_index",
Description: "User feature lookup table.",
DefaultInterval: 600, // 刷新间隔(秒),0 → 默认 10min
Params: map[string]pine.ParamSpec{
"dsn": {Type: "string", Required: true, Description: "Database DSN."},
},
}, func(params map[string]any) (resource.Fetcher, error) {
dsn := params["dsn"].(string)
return newFeatureIndexFetcher(dsn), nil
})
}| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
Name |
string | Yes | 资源唯一标识,蛇形命名 |
Description |
string | Yes | 一句话功能描述 |
DefaultInterval |
int | No | 刷新间隔(秒),0 → 默认 10min |
Params[k] |
ParamSpec | — | 与算子共用 ParamSpec(Type、Required、Default、Description) |
codegen 会生成带类型的 Python 资源类。在 pipeline 文件中:
from apple_generated.resources import FeatureIndexResource
flow.resource("my_index", FeatureIndexResource(dsn="host:3306/db"))编译后,统一 JSON 配置包含 resource_config:
{
"pipeline_config": { ... },
"pipeline_group": { ... },
"flow_contract": { ... },
"resource_config": {
"my_index": {
"type": "feature_index",
"interval": 600,
"params": {"dsn": "host:3306/db"}
}
}
}import "github.com/Liam0205/pineapple/pkg/resource"
func (o *MyOp) Execute(ctx context.Context, in *pine.OperatorInput, out *pine.OperatorOutput) error {
rp := resource.FromContext(ctx)
if rp == nil {
return nil // 未注入,降级处理
}
idx, ok := rp.Get("my_index")
if !ok {
return nil // 资源未就绪,降级
}
table := idx.(*FeatureTable)
// 使用 table ...
return nil
}详见 动态资源管理设计文档。
from apple.flow import Flow
flow = Flow(
name="my_pipeline",
common_input=["user_id", "user_age"], # 请求级上下文字段
item_output=["item_id", "item_score"], # 最终输出字段
)# 召回候选集
flow.recall_static(
item_output=["item_id", "item_score"],
items=[...],
)
# 过滤
flow.filter_condition(
item_input=["item_status"],
field="item_status",
value="offline",
)
# 特征处理
flow.transform_normalize(
item_input=["item_score"],
item_output=["item_score_norm"],
field="item_score",
)
# 截断
flow.filter_truncate(
top_n=50,
)
# 排序
flow.reorder_sort(
item_input=["item_score_norm"],
field="item_score_norm",
order="desc",
)flow.if_("{{is_new_user}}") \
.transform_dispatch(
common_input=["default_score"],
item_output=["item_score"],
common_field="default_score",
item_field="item_score",
) \
.else_() \
.transform_by_lua(
common_input=["user_id"],
item_input=["item_id"],
item_output=["item_score"],
lua_script="...",
function_for_item="score",
) \
.end_if_()from apple.flow import Flow, SubFlow
normalize_sub = SubFlow(name="normalize")
normalize_sub.transform_normalize(
item_input=["raw_score"],
item_output=["norm_score"],
field="raw_score",
)
flow = Flow(
name="main",
common_input=["user_id"],
item_output=["item_id", "norm_score"],
sub_flows=[normalize_sub],
)
flow.recall_static(...)
# normalize_sub 的算子会被展开到 flow 中当算子依赖外部数据时,在 pipeline 中声明资源。codegen 会为每种资源生成带类型的 Python 类:
from apple_generated.resources import FeatureIndexResource
flow = Flow(name="my_pipeline", ...)
# 声明资源
flow.resource("my_index", FeatureIndexResource(dsn="host:3306/db"))
# 算子引用资源(通过 resource_name 参数)
flow.recall_feature_index(
resource_name="my_index",
item_output=["item_id", "score"],
)编译器会校验所有 resource_name 引用是否有匹配的资源声明,未声明即报错。
每个算子调用需要声明它读写的字段:
| 参数 | 含义 |
|---|---|
common_input |
读取的请求级字段 |
common_output |
写入的请求级字段 |
item_input |
读取的物品级字段 |
item_output |
写入的物品级字段 |
item_defaults |
物品级字段默认值 |
common_defaults |
请求级字段默认值 |
sources |
合并算子的数据来源 |
debug |
启用此算子的调试快照 |
data_parallel |
数据并行分片数(仅 Transform,需空 common_output) |
Recall 身份由算子名前缀 (
recall_) 自动推导,无需手动声明。
# 编译为 JSON 字符串
json_str = flow.compile()
# 编译为 dict(不写文件)
config = flow.compile_dict()编译器自动执行以下校验:
- 字段覆盖 — 算子声明读取的字段必须有上游产出
- 死代码检测 — 产出字段未被下游消费的算子会被标记(observe 类算子豁免)
- 写后覆写 — 检测同一字段被多次写入
- 控制流完整性 —
if_必须有对应的end_if_ - 空分支检测 — 控制块的每个分支下必须有至少一个业务算子,空分支(如
if_("{{cond}}").end_if_())会报错 - 数据并行约束 —
data_parallel > 1时必须是 Transform 算子且common_output为空 - 参数-元数据一致性 —
transform_resource_lookup的lookup_key必须出现在item_input,output_field必须出现在item_output,防止业务参数与元数据声明不匹配导致运行时静默错误 - 控制算子可辨识命名 — 条件分支编译后的控制算子使用
if_1、elseif_2、else_3等显式名称,在 DAG 可视化中可直观辨识
执行 pipeline。
请求体:
{
"common": {"user_id": "123", "user_age": 25},
"items": []
}响应体:
{
"common": {"user_id": "123", "user_age": 25},
"items": [
{"item_id": "a", "item_score": 0.95}
],
"warnings": [],
"trace": [
{
"name": "recall_static_ABA9A7",
"duration_ms": 0.123,
"skipped": false
}
]
}健康检查。返回 {"status": "ok"}。
引擎运行统计。返回复合 JSON 结构,包含四个维度:
{
"operators": {"<operator_name>": {"exec_count": 100, "skip_count": 0, ...}},
"scheduler": {"run_count": 100, "peak_concurrency": 4},
"server": {"reload_count": 3, "reload_error_count": 0, "last_reload_duration_ns": 5234000},
"operator_detail": {"<operator_name>": {"borrow_count": 100, ...}}
}operators:per-operator 累计统计scheduler:调度器级统计(运行次数、峰值并发)server:配置热重载统计operator_detail:实现StatsProvider接口的算子自定义统计(如 Lua pool 指标)
返回编译后的 DAG 结构可视化。通过查询参数选择输出格式和渲染模式:
| 参数 | 值 | 说明 |
|---|---|---|
format |
dot(默认) |
Graphviz DOT 格式,可通过 dot -Tsvg 渲染 |
format |
mermaid |
Mermaid flowchart 格式,可嵌入 Markdown |
collapse |
subflow |
将同一 SubFlow 内的算子折叠为单个聚合节点 |
节点按算子类型着色(Recall 绿、Transform 蓝、Filter 橙、Merge 紫、Reorder 黄、Observe 灰),标签包含算子名。
# 获取 DOT 格式
curl http://localhost:8080/dag
# 渲染为 SVG
curl -s http://localhost:8080/dag | dot -Tsvg -o dag.svg
# 获取 Mermaid 格式
curl http://localhost:8080/dag?format=mermaid
# SubFlow 折叠渲染
curl http://localhost:8080/dag?format=dot&collapse=subflowpineapple/
├── apple/ # Python DSL (Apple)
│ ├── base.py # 算子基类
│ ├── resource.py # 资源基类 + ResourceDecl
│ ├── flow.py # Flow/SubFlow 声明(含资源声明)
│ ├── compiler.py # 编译器:DSL -> JSON(含 resource_config)
│ ├── validator.py # 静态校验器
│ ├── control.py # 控制流 (if/else) 支持
│ ├── generated/ # 自动生成的 Python 绑定
│ └── tests/ # Python 测试
├── cmd/
│ ├── pineapple-server/ # HTTP 服务入口
│ └── pineapple-codegen/ # 代码 & 文档生成工具
├── pkg/
│ ├── metrics/ # 可插拔指标接口 (Counter/Gauge/Histogram + Nop)
│ ├── resource/ # 动态资源管理 (ResourceManager)
│ ├── server/ # 可复用 HTTP 服务库
│ └── codegen/ # 可复用代码生成库
├── design_doc/ # 设计文档 (01-12)
├── doc/
│ ├── operators/ # 自动生成的算子文档
│ └── reports/ # 测试 & 性能报告
├── internal/ # Go 内部包
│ ├── config/ # JSON 配置解析
│ ├── dag/ # DAG 构建与拓扑排序
│ ├── dataframe/ # DataFrame 实现
│ ├── registry/ # 算子注册表
│ ├── runtime/ # 调度器、trace、stats
│ └── types/ # 核心类型定义
├── operators/ # 内置算子实现
│ ├── transform/ # transform_dispatch, transform_normalize, transform_resource_lookup, transform_by_remote_pineapple
│ ├── filter/ # filter_condition, filter_truncate
│ ├── lua/ # transform_by_lua (Lua 嵌入)
│ ├── merge/ # merge_dedup
│ ├── observe/ # observe_log
│ ├── recall/ # recall_static, recall_resource
│ └── reorder/ # reorder_sort
├── integration/ # 集成测试
├── benchmarks/ # 性能基准测试
└── testdata/ # 测试用 JSON 配置
-
设计文档
-
算子参考文档 — 所有内置算子的详细说明、参数、用法示例
GET /stats 端点始终可用,基于 atomic 计数器,零外部依赖。覆盖调度器执行、算子耗时、Lua pool 使用、配置热重载四个维度。
Pineapple 通过 pkg/metrics.Provider 接口支持外部指标导出,核心库不依赖 prometheus/client_golang。接入方在自己的项目中实现 Provider(约 80 行),通过 pine.WithMetrics() 或 server.Config.Metrics 注入即可。
// 在 server wrapper 中注入 Prometheus provider
mp := promadapter.New(prometheus.DefaultRegisterer)
server.Run(server.Config{
ConfigPath: *configPath,
Addr: *addr,
Metrics: mp,
})详见 可观测性设计文档。
算子可选实现 StatsProvider 接口暴露自定义统计到 /stats 的 operator_detail 字段,或实现 MetricsAware 接口向 Provider 注册自定义 Prometheus 指标。
第三方项目可以在不修改 pineapple 源码的前提下添加自定义算子和资源。核心思路:写自己的算子/资源包,通过 blank import 注册到全局 registry,然后用 pkg/server 和 pkg/codegen 构建自己的服务和 Python 绑定。
my-project/
├── go.mod # require github.com/Liam0205/pineapple
├── operators/
│ └── my_scorer/
│ └── scorer.go # init() { pine.Register(schema, factory) }
├── resources/
│ └── feature_index/
│ └── feature_index.go # init() { pine.RegisterResource(schema, factory) }
├── cmd/
│ ├── my-server/
│ │ └── main.go # import 算子 + 资源 → server.Run()
│ └── my-codegen/
│ └── main.go # import 算子 + 资源 → codegen.Run()
├── apple_generated/ # codegen 产出(算子 + 资源的 Python 绑定)
└── pipelines/
└── my_pipeline.py # DSL 声明算子 + 资源 → 编译统一 JSON
package main
import (
"flag"
"log"
_ "github.com/Liam0205/pineapple/operators" // 内置算子
_ "my-project/operators/my_scorer" // 自定义算子
_ "my-project/resources/feature_index" // 自定义资源
"github.com/Liam0205/pineapple/pkg/server"
)
func main() {
configPath := flag.String("config", "", "Unified JSON config (pipeline + resources)")
addr := flag.String("addr", ":8080", "Listen address")
flag.Parse()
if err := server.Run(server.Config{
ConfigPath: *configPath,
Addr: *addr,
}); err != nil {
log.Fatal(err)
}
}package main
import (
"flag"
"fmt"
"os"
_ "github.com/Liam0205/pineapple/operators"
_ "my-project/operators/my_scorer"
_ "my-project/resources/feature_index"
"github.com/Liam0205/pineapple/pkg/codegen"
)
func main() {
output := flag.String("output", "apple_generated", "Python output dir")
docDir := flag.String("doc-dir", "", "Doc output dir")
opsDir := flag.String("operators-dir", "operators", "Go operators source")
flag.Parse()
if err := codegen.Run(codegen.Config{OutputDir: *output, DocDir: *docDir, OpsDir: *opsDir}); err != nil {
fmt.Fprintf(os.Stderr, "codegen: %v\n", err)
os.Exit(1)
}
}详见 发布与第三方扩展设计文档。