工作流引擎

工作流引擎以各种各样的姿态在历史上反复出现:

  • IFTTT
  • Zapier
  • Pipedream
  • Integromat
  • n8n
  • Node-RED
  • Huggin
  • Beehive
  • Trigger Happer
  • Apache NiFi
  • Apache Airflow: 通过Python的运算符重载编写DAG作为工作流程.
  • Apache Flink
  • Apache ODE
  • Apache Airavata
  • Dagster: Airflow杀手, DAG能够直接以函数调用形式表示(分支条件通过多返回值实现, 太不直观).
  • Perfect: Airflow杀手, 和Dagster很相似, 但给人的感觉不如Dagster.
  • Amazon Simple Workflow (SWF)
  • Luigi
  • Airbyte
  • Conductor
  • mara-pipelines
  • XState: 以执行副作用为主要功能的状态图, 与持久化存储结合后可以作为工作流程使用.
  • <<Azure Durable Functions>>: 最早使用生成器做编排, 也最成熟的例子.
    底层使用了事件溯源以支持从中断中恢复(由于事件本身被用于保存中间状态, 该功能只可能通过事件溯源来实现).
    底层框架是开源的: https://github.com/Azure/durabletask
    为采取生成器+事件溯源机制所做的妥协:
    • 为了允许重建状态, 编写代码的约束很多, 每个步骤都需要具有确定性.
    • 由于重放, 编排器内相同的代码会重复执行
      (算不上大问题, 因为大部份编排代码只是占用极少量内存和CPU时间而已, 用它们换取可读性是划算的).
  • [[https://cadenceworkflow.io/][<<Cadence>>]]: Uber的工作流引擎, 和Azure Durable Functions高度相似.
  • CI/CD引擎:
    尽管CI/CI引擎可能不提供从中断中恢复的功能, 但它们也属于工作流引擎, 只不过它们的原语更倾向于特定类型的任务.
  • ...

所有这些引擎都是在为不同身份的用户服务,
以完成某种特定目的(自动化, 大数据处理等)下的ETL(Extract, Transform, Load)任务或其他类型的长时运行过程,
甚至手工编写的脚本也可以归于此类(尽管缺乏一些关键特性, 但大多数框架都是从脚本起步的),
这是它们如此相似的根本原因.

工作流编排框架有很多细节差异, 但在抽象层面, 这些差异大多是可以被忽略的,
将工作流编排框架在抽象层面上进一步拆解, 可以发现它们相似的本质.

  • 容错性, 可中断, 可从中断处继续.
  • 可扩展性, 并行化, 分布式, 可以水平伸缩.
  • 记录错误, 允许之后从错误中恢复任务.
  • 主要的流程编排可以于一处完成.
  • 能够与现有的其他服务集成.
  • 任务的运行位置通常需要借助日志来追踪.

在抽象层面是相同的.

FBP通常是以一种编程模式出现, 它的侧重点在于让计算过程可以以组合组件的方式来实现.
因此FBP通常不在乎计算过程的并行性, 并且倾向于使用UI而不是DSL来组合组件.

函数链指的是按顺序逐个执行多个子过程.
每个子过程的结果都会被存储, 以便在中断时恢复.

// 互不相关的子过程的函数链
const result1 = await sub1()
const result2 = await sub2()
const result3 = await sub3()
// 相关的子过程的函数链
const result1 = await sub1()
const result2 = await sub2(result1)
const result3 = await sub3(result2)

扇出: 将一份数据在不拆分的情况下发送给多个子过程.

sub1(data)
sub2(data)

根据实现, 有的扇出支持聚合运行结果(扇入, fan-in):

const [result1, result2] = await Promise.all([
sub1(data)
, sub2(data)
])

这是一种偏DSL的模式, 有多种表现方式.

典型代表是Apache Airflow, 已经逐渐被不需要手动定义DAG的项目取代.

processor1
.pipe([processor2, processor3])
.pipe(processor4) // same as `processor2.pipe(processor4)` and `processor3.pipe(processor4)`
processor1 >> [processor2, processor3] >> processor4
{
processor1: [
processor2
, processor3
]
, processor2: processor4
, processor3: processor4
}
  • 连接是手动建立的, 更容易满足类型定义.
  • DAG模式的节点依赖关系(无论是正向或反向的)是显式定义的, 导致代码编写繁琐, 实现和依赖关系定义的垂直距离也太远.
  • 难以取消正在运行的过程.
  • 支持扇出, 但不支持将扇出的结果重新聚合在一起.

流水线的本质是DAG, 只是在表达上有意朝同步代码靠拢.
流水线中的每个子过程都会立即开始并行运行, 任务可以真正从中途开始运行.

pipeline会根据各级流水线的完成速度, 自动调节并行任务的数量, 以确定一个适合当前网络的负载.

pipeline(
from('url')
, process('fetch', async (err, input) => {
await queue.enqueue('url', input.payload, input.priority)
})
// 存档点, 便于插入从中间状态开始的任务.
, save('raw')
, process('parse', err => {
// 抛出错误, 让其中断运行
throw e
})
, save('data')
, callback(async data => {
await dao.upsert(data)
})
)

语义层面上, pipeline不支持重新聚合扇出的结果(即扇回, fan-back-in), 因此每个结果会独自发往下游.

pipeline(
fanOut({
'route-1': [
process('process-1', async (err, input) => {
// ...
})
]
, 'route-2': [
process('process-2', async (err, input) => {
// ...
})
]
})
, to('result')
)

pipeline可以使用支持缓冲区的消息队列来引入allSettled原语, 其他原语无法真正被支持.

pipeline(
// 启动allSettled时, 创建一个缓冲区或多阶段提交, 此提交的id会作为元数据传给后续子过程.
allSettled(
[
process('process-1', async (err, input) => {
// ...
})
// 如果process的输出为result, 作为分支里的最后一个流程, result将作为0号索引值提交.
// 如果process的输出为error, 则之后的流程不会运行, error将作为0号索引值提交.
]
, [
guard(() => random() ? true : false)// 如果guard的输出为false, 则之后的流程不会运行, 1号索引值会以空值提交.
, process('process-2', async (err, input) => {
// ...
})
// 如果process的输出为result, 作为分支里的最后一个流程, result将作为1号索引值提交.
// 如果process的输出为error, 则之后的流程不会运行, error将作为1号索引值提交.
]
)
, callback(async ([result1, result2]) => {
// ...
})
)

由于子过程的原子化, 真正的all语义(在一个子过程出现错误后立即停止并行的其他子流程)是无法在工作流程中实现的.

all的一种替代实现是将allSettled的error替换成空值, 然后忽略掉存在空值的情况:

pipeline(
// 启动allSettled时, 创建一个缓冲区或多阶段提交, 此提交的id会作为元数据传给后续子过程.
all(
[
process('process-1', async (err, input) => {
// ...
})
// 如果process的输出为result, 作为分支里的最后一个流程, result将作为0号索引值提交.
// 如果process的输出为error, 则之后的流程不会运行, undefined将作为0号索引值提交.
]
, [
guard(() => random() ? true : false)// 如果guard的输出为false, 则之后的流程不会运行, 1号索引值会以空值提交.
, process('process-2', async (err, input) => {
// ...
})
// 如果process的输出为result, 作为分支里的最后一个流程, result将作为1号索引值提交.
// 如果process的输出为error, 则之后的流程不会运行, undefined将作为1号索引值提交.
]
)
// 只有result1和result2都不为undefined的情况下, callback才会运行.
// 若需要集中处理错误, 则应该使用allSettled.
, callback(async ([result1, result2]) => {
// ...
})
)

由于子过程的原子化, 真正的race语义(在一个子过程返回后立即停止并行的其他子流程)是无法在工作流程中实现的.

在pipeline里无法实现race原语:
在"每一个子流程都出错/中断"的情况下, 没有一个子流程能够完成提交, 因为这些子流程根本不知道其他子流程的运行情况.

race的一种不完美的替代实现是将allSettled的结果加上时间戳, 然后只处理时间戳最早的结果.

  • 子过程对它的上游和下游尽可能保持无知, 因此需要人工保证输入输出的类型安全.
  • 难以取消正在运行的过程.

持久化状态图可以被用来实现编排.

状态图的上下文需要保存一个id或计数值, 以实现乐观并发控制, 防止重复运行的状态图意外覆盖结果.

  • 支持无限循环.
  • 表达力较差, 需要通过画图来帮助梳理工作流程.

一种用生成器编排工作流程的模式.
生成器内的所有子过程被生成器整合为一个完整的工作流程, 每个子过程在编排器的推进下执行.
子过程产生的中间状态会以事件的形式保存进Event Store, 配合生成器可以实现工作流程的状态重建, 从而能够从中断处恢复.

如果工作流程的完成速度较慢, 则可能出现很多挂起等待的任务, 生成器内相关的变量亦无法被垃圾回收.
因此工作流程实现通常会在执行新的yield后立即返回(以便垃圾回收), 等yield的任务完成后再重启工作流程.

用于"向一个对象追加消息"/"向一个事件序列追加事件"的存储仓库, 仅支持追加和删除整个实体.

具体代码表示形式也可以参考Azure Durable Functions和Cadence, 甚至redux-saga.

workflow(async function* () {
// 事件名: get-url_succeeded, get-url_failed.
const url = yield call('get-url')
let raw
try {
// 事件名: fetch_succeeded, fetch_failed.
raw = yield call('fetch', url)
} catch ({ err, payload, priority }) {
// 只要此处不引入新的yield, 并且会结束生成器, 则直接进行I/O操作是安全的.
// 以创建新任务的形式重试.
// 另一种重试方案是直接写一个while循环, 但这在连续失败的情况下会为事件序列增加大量失败事件.
await queue.enqueue('url', url, priority)
return
}
// 如果解析错误, 则会在此处中断工作流程的执行, 事件序列被保留用于待开发者修复错误后从中断处恢复.
// 事件名: parse_succeeded, parse_failed.
const data = yield function parse() {
// ...
return result
}
// 在工作流程的结尾处直接进行I/O操作是安全的.
await dao.upsert(data)
}

需要支持 all, allSettled, race 三种原语, 其中 race 原语使超时成为可能.
和pipeline一样, 这三种原语除了allSettled以外都不能真正还原其语义(只能忽略不需要的结果, 不能取消子过程).

workflow(async function* () {
const [resultA, resultB] = yield allSettled([
call('fetch', urlA)
, call('fetch', urlB)
])
})

和pipeline不同, 不需要任何消息队列就可以完成扇入, 因为每个子过程都会将结果写入event store. 每个子过程在写入后都会查询扇入情况以判断是否应该唤醒workflow.

workflow(async function* () {
yield waitForExternalEvent('UserInput')
console.log(Date.now())
})

向外部事件总线注册一个回调, 在调用的时间点后出现UserInput事件时,
就会在Event Store的指定位置写入一个事件, 然后唤醒工作流程.

workflow(async function* () {
yield schedule('2022-2-22 22:22:22') // 于指定时间之后唤醒
console.log(Date.now())
})

实现此功能需要消息队列支持让消息在指定时间之后才能出列.

本质上是计划运行的一种, 只是表现形式不同.

workflow(async function* () {
console.log(Date.now())
yield delay(1000 * 60) // 60秒后唤醒
console.log(Date.now())
})
workflow(async function* () {
// ...
yield continueAsNew(params)
})

事件溯源无法存储无限数量的事件, 在超长事件序列的工作流程里, 重建工作流程的性能表现也很差.
为解决性能问题, 允许用continueAsNew指令重用一个workflow id, 但清空它已有的所有事件序列.

  • 表现力接近于同步代码, 一个工作流程可以很快地从一个非分布式的编排式项目转换而来.
  • 取消正在运行的过程较为容易, 只需要删除掉对应的事件序列.
    (编排器本身在检测到事件序列不存在时, 选择停止运行).
  • 可以通过catch块来捕捉多个yield的错误, 以执行相同的回滚操作.
  • 可以通过外部事件"唤醒"特定的工作流程.
  • 有状态, 因此能够重新利用不同子过程的运行结果.
  • 由于生成器工作流程是从头到尾执行的, 任务不可能真正从中途开始运行, 不能创建从中间状态起始的任务.
    尽管在技术层面上, 的确可以模拟出需要插入的中间状态之前的事件, 或者让编排器能够针对某一特定类型的输入进行跳过,
    但终归是远不如其他方案直观方便.
  • 生成器yield的返回值类型依赖于手动定义.
  • 生成器内的代码会在中断恢复时重复执行, 需要人工保证代码的正确性, 并且会占用一定的内存和CPU时间.
  • 由于中间状态是以事件方式存储的, 变更工作流程的顺序时, 原有的事件序列会因为无法适应新的工作流程而报废.

不会, 因为新事件在写入Event Store时会指定index.

举例来说, 作为调度器的消息队列认为工作流程A没有及时调用子过程A来完成任务, 将消息重置回waiting状态.
工作流程B从调度器拉取被重置的任务, 从工作流程A的中断处继续运行, 调用子过程A.
然而, 被认为没有及时完成任务的工作流程A实际上仍在工作, 此时子过程A执行完毕, 将新的事件附加给了Event Store, 并继续接下来的流程.
接着, 工作流程B的子过程A执行完毕, 试图将新的事件附加给Event Store的指定index, 但发现指定index已经有事件写入了, 于是结束自身运行.

不会, 因为每个子过程在开始时会检查相应索引位置是否已经存在事件.

唯一会造成子过程重复运行的情况是作为调度器的消息队列在连续多次重置了消息状态,
但每次重置启动的重复的工作流程都没有来得及完成下一阶段的子任务.

之所以会出现这种情况, 很可能是因为调度器的消息队列有一个过短的超时时间,
或子过程的运行时长到了不合理的程度(比如等待用户交互, 但用户长时间处于离线状态).

在子过程开始时, 写入一个started事件, 该事件记录子过程预期的最晚结束时间戳.
如果到了预期的最晚结束时间, 子过程仍然没有完成, 则该子过程应该停止自己(或至少不会继续向Event Store写入事件),
期望下一个工作流程重新开始此子过程.

重复运行的工作流程在发现started事件时, 会通过对比当前时间来决定是启动子过程还是结束自身运行.

许多工作流编排框架需要运行一个K8s集群, Hadoop集群, 或是其他分布式集群,
然而这对于99.9%的ETL任务来说都是不必要的, 单机解决这些任务的速度可能会是集群的几百几千倍.

在现实世界里:

  • GB规模的ETL任务可以通过组合Unix程序解决问题.
  • TB规模的ETL任务可以通过一个单机的SQLite方案解决问题.

一些工作流编排框架明显偏向个人使用, 它们的最大的问题在于缺乏应用场景, 典型例子是IFTTT.

很多用例都像是生造出来的, 让人觉得根本无关紧要, 但仍有不少使用者陷入有一把锤子后看什么都是钉子的陷阱.

应该围绕实际需求构建用于工作流的工具, 而不是基于工具发现工作流.

当使用者的目标是与第三方服务集成, 实现一些简单的操作时, 低代码平台通常能够发挥它的作用,
但这只适用于一些预设的场景, 不能期待低代码平台能够在复杂场景下发挥作用.

低代码只在需要的时候写代码, 但经常会发现到处都需要写代码, 或者通过不写代码的方式解决问题比写代码还要繁琐.
一些低代码平台会提供脚本语言作为组件, 但通常实现得相当差, 充满局限性.

在线服务通常会预先集成与第三方服务的协同, 考虑到一些集成的申请难度和维护成本(例如Twitter API),
使用这些在线服务有时会是达成目的的唯一方法.

像Pipedream, Zapier, Integromat这样的服务基本上是直接以与第三方服务的集成为卖点的.