1. 为什么需要算子化

在业务开发过程中,架构研发会说:“我不想了解业务,这不是我该关心的事。何况有一些业务代码写的还很乱”。业务研发会说:“这次我要新接入一个下游服务,但是现在服务性能不行接不了,架构优化不动不给力”。两者相互耦合,相互牵扯,在此过程中,算子化的设计思想就被提出了。

算子化主要做了以下两件事:

  • 数据依赖描述计算:用 DAG 描述输入输出与节点依赖关系
  • 性能优化和业务逻辑正交:在 DAG 通过依赖描述解决并行设计问题,业务逻辑收敛到算子Op中。

下面从分层架构看,一套可落地的算子化服务端通常包含哪些部分。


2. 算子化的架构分层

一个可维护的算子化服务端通常分为四层:

┌─────────────────┬──────────────────────────────────────────┐
│  API Layer      │  HTTP/gRPC 入口、限流熔断                │
├─────────────────┼──────────────────────────────────────────┤
│  Graph Layer    │  算子图 DAG 定义、建图                   │
├─────────────────┼──────────────────────────────────────────┤
│  Runtime Layer  │  调度器框架                              │
├─────────────────┼──────────────────────────────────────────┤
│  Operator Layer │  具体算子:召回、过滤、打分、重排……(业务逻辑)│
└─────────────────┴──────────────────────────────────────────┘

关键约束:

  • Graph 无状态——有状态的操作需要隔离到API Layer,例如限流、熔断
  • Runtime 可观测——只负责调度、资源与观测,该层负责埋点性能指标,对业务代码无入侵

这样,架构研发关注API LayerRuntime Layer,业务研发关注Graph LayerOperator Layer。此外,Graph Layer在做数据流建模时,需要架构把关,避免走偏


3. 算子图建模

对于算子图,我们通常希望它至少支持:

  1. 算子级热插拔:可熔断(故障隔离)、可短路(热摘除)
  2. 依赖显式声明:拓扑可读、可 diff,方便实验与 Code Review
  3. 配置级演进:加/摘算子、改并行宽度,尽量不动无关算子代码

要满足这些能力,设计时必须隔离控制流与数据流。二者若耦在算子实现里,每次摘流、熔断、改拓扑都会牵动业务逻辑,热插拔就名存实亡。

数据流回答「算什么、传什么」——算子之间通过有类型的输入/输出连接,依赖关系构成 DAG。召回列表、特征、打分结果等业务载荷只在这条流里传递;不应依赖全局变量、隐藏单例,或 RequestContext 里的「魔法字段」在算子间偷传数据。

控制流回答「要不要算、怎么算」——是否执行、是否并行、是否超时、是否熔断、是否跳过下游,属于 Graph + Runtime 的策略。算子可以返回 Status::kSkipStatus::kDegraded 等信号,但不应在内部硬编码「下游是谁、失败时替谁兜底」——那会把拓扑焊死在代码里。

flowchart LR subgraph dataFlow["数据流(Graph 声明)"] A[Op A] -->|data_a| B[Op B] -->|data_b| C[Op C] end subgraph ctrlFlow["控制流(Runtime 执行)"] P["NodePolicy<br/>enabled · circuit · timeout"] end P -.-> A P -.-> B P -.-> C
维度数据流控制流
关注点输入/输出契约、依赖边调度、熔断、短路、超时、降级
声明位置Graph Layer(DAG 定义)Graph + Runtime(策略与执行)
典型变更加特征、换模型、改输出 schema摘流、限流、扩缩、故障隔离

落到设计约束:

  • 算子只产出数据,不决定后继——后继是否执行,由调度器根据依赖边与策略判断。
  • 短路/熔断不破坏契约——被跳过的算子,下游要么不依赖其输出(真分支),要么引擎提供约定好的空输出/默认输出,避免 nullptr 散落各处。
  • Context 只承载横切元数据——trace、arena、超时令牌、实验分桶可进 RequestContext;跨算子的业务结果必须走显式 Data Edge。

在此前提下,「热摘除一个重排算子」改图配置即可;「熔断一个频繁超时的召回源」在该边挂策略即可,Rank 算子无需跟着改。

下面用一段精简代码说明二者如何分工:算子只管数据变换,Runtime 只管是否执行

// ── 数据流:节点、依赖、显式数据绑定 ──
ADD_NODE(A);
ADD_NODE(B);
ADD_NODE(C);
ADD_NODE(D);

ADD_EDGE(A, B);
ADD_EDGE(A, C);
ADD_EDGE(B, D);
ADD_EDGE(C, D);

ASSIGN_DATA(A, data_a, B, data_b);
ASSIGN_DATA(A, data_a, C, data_c);
ASSIGN_DATA(B, data_b, D, data_d);
ASSIGN_DATA(C, data_c, D, data_d);

// ── 控制流:与数据边正交,Runtime 解释执行 ──
DISABLE_NODE(B);                 // 热摘除:不执行 B,注入兜底值
flowchart TB A[Op A] --> B[Op B] A --> C[Op C] B --> D[Op D] C --> D B -.-x|DISABLE| skipB[跳过 B · 注入 fallback]

4. 数据建模

§3 里 ASSIGN_DATA(A, data_a, B, data_b) 只声明了数据从哪来到哪去,并未保证 A 的输出结构与 B 的输入结构一致。实际链路里,上游算子换字段名、增删列、改类型是常态——若让 B 直接读 A 的内部结构,两者就会耦死,热插拔与独立演进都无从谈起。

因此需要在边上增加数据对齐层:A 产出 data_a,Runtime 按映射表转换为 B 能消费的 data_b,再调用 B。

A 执行 → data_a (Schema A) ──[Mapping A→B]──→ data_b (Schema B) → B 执行
flowchart LR A["Op A<br/>Schema A"] -->|data_a| M["Mapping<br/>A → B"] M -->|data_b| B["Op B<br/>Schema B"]

对齐通常用**映射表(Field Mapping Table)**描述,挂在 Graph 的 Data Edge 上,而不是写进算子代码里:

映射类型含义示例
直传同名字段拷贝ids → ids
重命名(通常不允许、建议改成同名)字段改名recall_score → rank_feat
变换(通常不允许、建议改成同类型)类型/语义转换int64 id → string id
默认值上游缺失时填充source = ""
// ── 各算子维护自己的 Schema(算子内可见,算子间不可见)──
struct SchemaA {           // A 的输出
    Span<ItemId> ids;
    Span<float> recall_scores;
    Span<uint8_t> sources;
};

struct SchemaB {           // B 的输入
    Span<ItemId> ids;
    Span<float> rank_features;
    Span<uint8_t> sources;
};

// ── 映射表:声明 A.data_a → B.data_b 的字段对齐规则 ──
BEGIN_MAPPING(A, data_a, B, data_b)
    MAP_DIRECT(ids, ids)
    MAP_RENAME(recall_scores, rank_features)
    MAP_TRANSFORM(sources, sources, CastSource)   // 可选变换函数
    MAP_DEFAULT(rank_features, 0.0f)              // A 未产出时兜底
END_MAPPING()

// ── 与 §3 建图 DSL 组合 ──
ADD_NODE(A);
ADD_NODE(B);
ADD_EDGE(A, B);
ASSIGN_DATA(A, data_a, B, data_b);
BIND_MAPPING(A, data_a, B, data_b, mapping_a2b);  // 边绑定映射表

// ── Runtime 在边上执行对齐(宏展开示意)──
Status AlignAndRun(RequestContext& ctx) {
    const SchemaA& raw = slots_.Get<A>("data_a");
    SchemaB& aligned = slots_.Alloc<B>("data_b");

    for (const auto& rule : mapping_a2b) {
        if (!rule.Apply(raw, &aligned)) {
            if (!rule.has_default) return Status::kError;
            rule.FillDefault(&aligned);
        }
    }
    return ops_["B"]->Execute(ctx, aligned, &slots_["data_c"]);
}

映射表的核心收益:

  • A、B 可独立迭代——A 加字段只改 SchemaA 和映射表,B 的 Execute 签名不变。
  • 对齐逻辑可配置——映射表可由 Graph 配置加载,实验时可换一张表而不重新编译算子。
  • 与 control flow 正交——DISABLE_NODE(B) 时走 FALLBACK;映射表只在「B 确实要执行」时生效。

工程上,映射表建议在建图期校验:字段是否存在、类型是否可转换、必填列是否有 MAP_DEFAULT 兜底;校验失败直接拒绝发布,避免线上才暴露对齐错误。


5. 内存与请求级 Arena

5.1 请求级 Arena

单条 query 处理约 500ms 时,瓶颈通常在 I/O、RPC 与模型推理,而不是 malloc 本身——此时引入 Arena 不是为了再抠几微秒,而是为了在「多算子、多中间结果、长生命周期」的链路里,把内存管理从分散释放变成整请求回收

一条请求在 DAG 里可能产生大量短生命周期对象:召回列表、过滤后的副本、特征块、临时 buffer……若每个算子各自 new/delete

  • 正确性成本高:拓扑一复杂,就容易漏释放、重复释放,或在异步/降级路径上生命周期不一致。
  • 并发下仍会有全局分配器压力:500ms 不代表 QPS 低;1000 QPS 同时有数百个在途请求,全局 heap 的锁竞争与缓存行抖动依然可见,只是不是首要矛盾。
  • 峰值不可预期:中间对象散落在堆上,难以估算「单请求到底占多少内存」,容量规划与 OOM 排查都变难。

请求级 Arena 的做法是:一次请求一块池,请求结束 Reset() 一次性归还——算子只负责 Allocate,不负责 free

class Arena {
public:
    void* Allocate(size_t size, size_t align = alignof(std::max_align_t));
    template <typename T, typename... Args>
    T* New(Args&&... args);
    void Reset();  // 请求结束调用,O(1) 批量回收
};

struct ItemList {
    Item* items;   // 来自 ctx.arena
    size_t size;
};

// 算子输出写入 arena;下游只读或原地修改,生命周期 <= 本请求
Status RecallOp::Execute(RequestContext& ctx,
                         const Query& in,
                         ItemList* out) {
    out->items = ctx.arena->NewArray<Item>(in.limit * 2);
    out->size = Fetch(ctx, in, out->items);
    return Status::kOk;
}

// 请求入口
Status Handle(RequestContext& ctx, const Query& q, Response* resp) {
    Arena arena;
    ctx.arena = &arena;
    auto status = runtime.Execute(ctx, q, resp);
    arena.Reset();  // 500ms 后统一回收,中间算子无需 delete
    return status;
}

在 500ms 量级下,Arena 的主要收益可以概括为:

flowchart LR Req[请求开始] --> Arena[请求级 Arena] Arena --> Op1[Op 1 Allocate] Op1 --> Op2[Op 2 Allocate] Op2 --> OpN[Op N Allocate] OpN --> Reset["Reset() 批量回收"] Reset --> End[请求结束]
收益说明
生命周期清晰所有中间结果随请求生灭,算子不必约定「谁分配谁释放」
批量回收一次 Reset() 代替 N 次 delete,简化异常/短路/熔断路径
峰值可估按「单请求 arena 上限」做限流与容量规划
与算子解耦换拓扑、加算子,不必逐条审查内存归属

若链路极短、中间对象极少,Arena 确实可以不上;但当算子图变长、中间数据结构变多,Arena 首先是工程可靠性工具,其次才是性能工具。


6. 执行引擎:DAG

执行引擎(Runtime)处于架构分层的中心:向上承接 Graph 的拓扑与策略,向下调度 Operator 的业务计算。它的设计目标不是「把 DAG 跑通」这么简单,而是同时满足两件事:

  • 业务正交:算子只写「算什么」,不感知调度、埋点、超时、实验路由;
  • 可观测性内建:每个节点的耗时、状态、输入输出规模,由 Runtime 统一采集,业务代码零入侵。
sequenceDiagram participant R as Runtime participant O as Operator R->>R: ScopedSpan / ScopedTimer alt policy.ShouldRun R->>O: Execute(in, out) O-->>R: Status R->>R: NodeMetrics.Record else 热摘除 / 熔断 R->>R: InjectFallback end

6.1 业务正交:Runtime 包装,算子裸奔

算子对外只暴露 Execute(ctx, in, out);Runtime 在调用前后注入横切逻辑,算子内部不出现 metrics、trace、熔断判断:

enum class Status { kOk, kSkip, kDegraded, kError };

class Operator {
public:
    virtual ~Operator() = default;
    virtual Status Execute(RequestContext& ctx,
                         const void* in, void* out) = 0;
};

// Runtime 统一包装——业务算子不继承、不调用这部分
class Runtime {
public:
    Status RunNode(int node_id, RequestContext& ctx) {
        auto& node = graph_.Node(node_id);
        ScopedSpan span(ctx.trace, node.name);       // 自动 trace
        ScopedTimer timer(&node_metrics_[node_id]);  // 自动耗时

        if (!policy_.ShouldRun(node_id)) {
            node_metrics_[node_id].skip_cnt++;
            return InjectFallback(node_id, ctx);
        }

        Status st = ops_[node_id]->Execute(ctx,
                                           LoadInput(node_id, ctx),
                                           AllocOutput(node_id, ctx));
        node_metrics_[node_id].Record(st);
        return st;
    }

private:
    GraphDef graph_;
    std::vector<Operator*> ops_;
    std::vector<NodeMetrics> node_metrics_;
    ControlPolicy policy_;
};

Recall、Rank、ReRank 的实现里只有算法逻辑;换调度策略、改埋点格式、加全链路 trace,只改 Runtime,不动业务算子——这就是 §2 里「Runtime 可观测、对业务无入侵」的落地方式。

6.2 可观测性:从拓扑自动生成观测面

可观测性不应依赖每个算子手写 LOG / METRIC。Graph 一旦定义,Runtime 天然知道:

观测维度来源业务是否参与
节点耗时Runtime 包装计时
成功/Skip/Degrade/ErrorStatus 返回值否(只返回状态)
输入/输出规模对齐后 schema 的 size 字段否(Runtime 读元信息)
调用链 Trace按拓扑生成 Span 父子关系
实验分桶RequestContext::exp否(Context 注入)
struct NodeMetrics {
    Counter ok, skip, degraded, error;
    Histogram latency_us;
    Histogram input_size, output_size;

    void Record(Status st) {
        switch (st) {
            case Status::kOk:       ok.Inc(); break;
            case Status::kSkip:     skip.Inc(); break;
            case Status::kDegraded: degraded.Inc(); break;
            default:                error.Inc(); break;
        }
    }
};

// 500ms 链路上,定位 tail latency 通常看 P99 最慢节点
// Runtime 按 node_id 上报,无需业务侧逐段埋点
void ExportMetrics(const std::vector<NodeMetrics>& m, const GraphDef& g) {
    for (int i = 0; i < g.size(); ++i) {
        Report(g.Node(i).name, m[i]);  // Recall / Rank / ReRank 各一条
    }
}

线上排查「这次请求为什么慢」:trace 按 DAG 展开,直接看到哪个节点拖尾;若某节点 degraded 飙高,结合 §3 的 CIRCUIT_BREAKER 策略即可定位,而不必翻业务代码里的 if-else。


总结

算子化不是换框架,而是把服务端计算拆成四层协作:算子写业务,Graph 声明依赖与策略,Runtime 负责调度与观测,API 承接有状态横切

全文贯穿一条原则——控制流与数据流分离:数据用 DAG + 映射表对齐,熔断/短路/热插拔改配置;算子只产数据,Runtime 统一埋点。500ms 级链路上,Arena 解决中间对象的生命周期,可观测性从拓扑自动生成,业务代码保持干净。

落地时记住三件事:依赖显式声明、边上做数据对齐、横切逻辑不进算子。做到这三点,换策略、摘节点、查慢请求才不需要推翻重来。