工作流引擎

工作流引擎以各种各样的姿态在历史上反复出现:

  • IFTTT
  • Zapier
  • Pipedream
  • Integromat
  • n8n
  • Node-RED
  • Huggin
  • Beehive
  • Trigger Happer
  • Apache NiFi
  • Apache Airflow: 通过Python的运算符重载编写DAG作为工作流程.
  • Apache Flink
  • Apache ODE
  • Apache Airavata
  • Dagster: Airflow杀手, DAG能够直接以函数调用形式表示(分支条件通过多返回值实现, 太不直观).
  • Perfect: Airflow杀手, 和Dagster很相似, 但给人的感觉不如Dagster.
  • Amazon Simple Workflow (SWF)
  • Luigi
  • Airbyte
  • Conductor
  • mara-pipelines
  • XState: 以执行副作用为主要功能的状态图, 与持久化存储结合后可以作为工作流程使用.
  • <<Azure Durable Functions>>:
    最早使用生成器做编排, 也最成熟的例子.
    底层使用了事件溯源以支持重建状态(从中断中恢复).
    底层框架是开源的: https://github.com/Azure/durabletask

为采取生成器+事件溯源机制所做的妥协:

  • 为了允许重建状态, 编写代码的约束很多, 每个步骤都需要具有确定性.
  • 由于重放, 编排器内相同的代码会重复执行.
    算不上大问题, 因为大部份编排代码只是占用极少量内存和CPU时间而已, 这种交换往往是划算的.
  • [[https://cadenceworkflow.io/][<<Cadence>>]]: Uber的工作流引擎, 和Azure Durable Functions高度相似.
  • CI/CD引擎:
    尽管CI/CI引擎可能不提供从中断中恢复的功能, 但它们也属于工作流引擎, 只不过它们的原语更倾向于特定类型的任务.
  • ...

所有这些引擎都是在为不同身份的用户服务,
以完成某种特定目的(自动化, 大数据处理等)下的ETL(Extract, Transform, Load)任务或其他类型的长时运行过程,
甚至手工编写的脚本也可以归于此类(尽管缺乏一些关键特性, 但大多数框架都是从脚本起步的),
这是它们如此相似的根本原因.

工作流编排框架有很多细节差异, 但在抽象层面, 这些差异大多是可以被忽略的,
将工作流编排框架在抽象层面上进一步拆解, 可以发现它们相似的本质.

  • 容错性, 可中断, 可从中断处继续.
  • 可扩展性, 并行化, 分布式, 可以水平伸缩.
  • 记录错误, 允许之后从错误中恢复任务.
  • 主要的流程编排可以于一处完成.
  • 能够与现有的其他服务集成.
  • 任务的运行位置通常需要借助日志来追踪.

在抽象层面是相同的.

FBP通常是以一种编程模式出现, 它的侧重点在于让计算过程可以以组合组件的方式来实现.
因此FBP通常不在乎计算过程的并行性, 并且倾向于使用UI而不是DSL来组合组件.

许多工作流编排框架需要运行一个K8s集群, Hadoop集群, 或是其他分布式集群,
然而这对于99%的ETL任务来说都是不必要的, 单机解决这些任务的速度可能会是集群的几百几千倍.

在现实世界里:

  • GB规模的ETL任务可以通过组合Unix程序解决问题.
  • TB规模的ETL任务可以通过一个单机的SQLite方案解决, 通过压缩数据库可以提高处理性能.

一些工作流编排框架明显偏向个人用例, 它们的最大的问题在于缺乏应用场景, 典型例子是IFTTT.

很多用例都像是生造出来的, 让人觉得根本无关紧要, 但仍有不少使用者陷入有一把锤子后看什么都是钉子的陷阱.

应该围绕实际需求构建用于工作流的工具, 而不是基于工具发现工作流.

当使用者的目标是与第三方服务集成, 实现一些简单的操作时, 低代码平台通常足够使用.
一旦使用者的需求超出服务供应商的预设时, 情况往往会急转直下, 以至于很难期待低代码平台能够在复杂场景下发挥作用.

低代码只在需要的时候写代码, 但经常会发现到处都需要写代码, 或者通过不写代码的方式解决问题比写代码还要繁琐.
一些低代码平台会提供脚本语言作为组件, 但通常实现得相当差, 充满局限性.

与一般的认识相反, 这种矛盾实际上是可调和的, 因为矛盾本质上只是一个封装问题:
对于新的问题, 首先通过低级代码来实现它, 然后再将其逐步抽象为高级代码就行了.
但是, 由于低级代码所需要的编程技能, 导致不可能成为一种流行选项,
这一点从Zapier等服务对自定义代码的支持程度(只提供了最低限度的功能)就可以看出.

在线服务通常会预先集成与第三方服务的协同, 考虑到一些集成的申请难度和维护成本(例如Twitter API),
使用这些在线服务有时会是达成目的的唯一方法.

像Pipedream, Zapier, Integromat这样的服务基本上是直接以与第三方服务的集成为卖点的.
这类以集成为卖点的服务具有很大的脆弱性, 因为它们依赖于自己不能控制的东西.
由于这类服务的这种显著特征, 它们的角色与其说是一个工作流, 更像是一个向Webhook/邮箱地址发送通知消息的胶水.

函数链指的是按顺序逐个执行多个子过程.
每个子过程的结果都会被存储, 以便在中断时恢复.

// 互不相关的子过程的函数链
const result1 = await sub1()
const result2 = await sub2()
const result3 = await sub3()
// 相关的子过程的函数链
const result1 = await sub1()
const result2 = await sub2(result1)
const result3 = await sub3(result2)

扇出: 将一份数据在不拆分的情况下发送给多个子过程.

sub1(data)
sub2(data)

根据实现, 有的扇出支持聚合运行结果(扇入, fan-in):

const [result1, result2] = await Promise.all([
sub1(data)
, sub2(data)
])

协同工作流的最低级形式, 表达力最差, 所有协同工作流都可以转换为此类数据结构.

典型代表是Apache Airflow, 已经逐渐被不需要手动定义DAG的项目取代.

processor1
.pipe([processor2, processor3])
.pipe(processor4) // same as `processor2.pipe(processor4)` and `processor3.pipe(processor4)`
processor1 >> [processor2, processor3] >> processor4
{
processor1: [
processor2
, processor3
]
, processor2: processor4
, processor3: processor4
}
  • 连接是手动建立的, 更容易满足类型定义.
  • 依赖关系(无论是正向或反向的)需要用代码显式定义, 这导致依赖关系定义和实现的垂直距离太远.
  • 难以观察/取消正在运行的过程.
  • 表达时支持扇出, 但不支持将扇出的结果重新聚合在一起.

流水线的本质是DAG, 只是在表达上有意朝同步代码靠拢.
流水线中的每个子过程都会立即开始并行运行, 任务可以真正从中途开始运行.

pipeline会根据各级流水线的完成速度, 自动调节并行任务的数量, 以确定一个适合当前网络的负载.

pipeline(
from('url')
, process('fetch', async (err, input) => {
await queue.enqueue('url', input.payload, input.priority)
})
, process('parse', err => {
// 抛出错误, 让其中断运行
throw e
})
, callback(async data => {
await dao.upsert(data)
})
)

语义层面上, pipeline不支持重新聚合扇出的结果(即扇回, fan-back-in), 因此每个结果会独自发往下游.

pipeline(
fanOut({
'route-1': [
process('process-1', (err, input) => {
// ...
})
]
, 'route-2': [
process('process-2', (err, input) => {
// ...
})
]
})
, to('result')
)

pipeline可以使用支持缓冲区的消息队列来引入allSettled原语, 其他原语无法真正被支持.

pipeline(
// 启动allSettled时, 创建一个缓冲区或多阶段提交, 此提交的id会作为元数据传给后续子过程.
allSettled(
[
process('process-1', (err, input) => {
// ...
})
// 如果process的输出为result, 作为分支里的最后一个流程, result将作为0号索引值提交.
// 如果process的输出为error, 则之后的流程不会运行, error将作为0号索引值提交.
]
, [
filter(condition)// 如果filter的输出为false, 则之后的流程不会运行, 1号索引值会以空值提交.
, process('process-2', (err, input) => {
// ...
})
// 如果process的输出为result, 作为分支里的最后一个流程, result将作为1号索引值提交.
// 如果process的输出为error, 则之后的流程不会运行, error将作为1号索引值提交.
]
)
, callback(async ([result1, result2]) => {
// ...
})
)

由于子过程的原子化, 真正的all语义(在一个子过程出现错误后立即停止并行的其他子流程)是无法在工作流程中实现的.

all的一种替代实现是将allSettled的error替换成空值, 然后忽略掉存在空值的情况:

pipeline(
// 启动allSettled时, 创建一个缓冲区或多阶段提交, 此提交的id会作为元数据传给后续子过程.
all(
[
process('process-1', (err, input) => {
// ...
})
// 如果process的输出为result, 作为分支里的最后一个流程, result将作为0号索引值提交.
// 如果process的输出为error, 则之后的流程不会运行, undefined将作为0号索引值提交.
]
, [
filter(condition)// 如果filter的输出为false, 则之后的流程不会运行, 1号索引值会以空值提交.
, process('process-2', (err, input) => {
// ...
})
// 如果process的输出为result, 作为分支里的最后一个流程, result将作为1号索引值提交.
// 如果process的输出为error, 则之后的流程不会运行, undefined将作为1号索引值提交.
]
)
// 只有result1和result2都不为undefined的情况下, callback才会运行.
// 若需要集中处理错误, 则应该使用allSettled.
, callback(([result1, result2]) => {
// ...
})
)

由于子过程的原子化, 真正的race语义(在一个子过程返回后立即停止并行的其他子流程)是无法在工作流程中实现的.

在pipeline里无法实现race原语:
在"每一个子流程都出错/中断"的情况下, 没有一个子流程能够完成提交, 因为这些子流程根本不知道其他子流程的运行情况.

race的一种不完美的替代实现是将allSettled的结果加上时间戳, 然后只处理时间戳最早的结果.

  • 通过编写对应格式的消息, 任务可以从工作流程的中间开始运行.
  • 中间状态会被尽快垃圾回收.
  • 运行效率高, 资源浪费少.
  • 子过程对它的上游和下游尽可能保持无知, 因此需要人工保证输入输出的类型安全.
  • 难以取消正在运行的过程.
  • 无法与外部世界通过事件交互.

编排工作流的最低级形式, 表达力最差, 所有编排工作流都可以转换为此类数据结构.

  • 类似于游戏AI的开发情况, 状态机和状态图很快会变得难以维护.

一种用生成器编排工作流程的模式.

用生成器描述工作流程, 任务在编排器的推进下按设定好的流程执行.
子过程产生的中间状态会以事件溯源的形式保存进事件存储(Event Store), 配合生成器可以实现工作流程的状态重建, 从而能够从中断处恢复.

除了使用生成器描述工作流程外, 底层会有几种不同的实现方式:

  • 尽可能不中断任务的模式:
    启动的任务会一直存在于内存里, 总是在同步等待它所需的事件, 除非发生故障重启, 否则直到任务结束前都不会被回收.
    这种模式下, 无谓的计算资源占用最小, 内存资源占用最大.
  • 尽可能中断任务的模式:
    启动的任务一旦开始等待事件, 就立即将内存中的任务回收, 将被等待的事件添加到事件中心(Event Hub)里.
    当事件中心收到对应事件时, 就根据事件存储重建任务, 继续运行直到遇到下一个等待事件或任务运行完成.
    这种模式下, 内存资源占用最小, 无谓的计算资源占用最大.
    这种模式通常会更有吸引力, 因为无谓的计算资源占总计算资源的比例在任务数量较少的情况下通常会被认为小到可以忽略.
    然而, 如果编排器需要验证每一个输入和输出对(以验证工作流程够不够纯), 深匹配算法会立即成为工作流程中的减速器之一.

具体代码表示形式可参考Azure Durable Functions和Cadence.
甚至也可以参考redux-saga, 因为它本质上也是用生成器编写的可执行状态图.

需要支持 all, allSettled, race 三种原语, 其中 race 原语使超时成为可能.
和协同一样, 这三种原语除了allSettled以外都不能真正还原其语义(只能忽略不需要的结果, 不能取消子过程).

workflow(function* () {
const [resultA, resultB] = yield allSettled([
call('fetch', urlA) // 调用子过程
, call('fetch', urlB) // 调用子过程
])
})

和协同方案不同, 编排不需要任何消息队列就可以完成扇入, 因为每个子过程都会将结果写入event store.
每个子过程在写入后都会查询扇入情况以判断是否应该唤醒workflow.

workflow(function* () {
yield waitForExternalEvent('UserInput')
console.log(Date.now())
})

向外部事件总线注册一个回调, 在调用的时间点后出现UserInput事件时,
就会在Event Store的指定位置写入一个事件, 然后唤醒工作流程.

计算运行本质上是一种带参数的"等待外部事件".

workflow(function* () {
yield schedule('2222-2-22 22:22:22') // 于指定时间之后唤醒
console.log(Date.now())
})

实现此功能需要消息队列支持让消息在指定时间之后才能出列.

本质上是计划运行的一种, 只是表现形式不同.

workflow(function* () {
console.log(Date.now())
yield delay(1000 * 60) // 60秒后唤醒
console.log(Date.now())
})
workflow(function* () {
// ...
yield continueAsNew(params)
})

事件溯源无法存储无限数量的事件, 在超长事件序列的工作流程里, 重建工作流程的性能表现也很差.
为解决性能问题, 允许用continueAsNew指令从头开始继续运行workflow, 但清空它已有的所有事件序列.

  • 表现力接近于同步代码, 一个工作流程可以相对较快地从一个非分布式的编排式项目转换而来.
  • 取消正在运行的过程相对容易.
  • 可以通过catch块来捕捉多个yield的错误, 以执行相同的回滚操作.
  • 在运行中途可以与外部世界通过事件交互.
  • 可以保留下中间数据用作其他用途.
  • 由于生成器工作流程是从头到尾执行的, 任务不可能真正从中途开始运行, 不能创建从中间状态起始的任务.
    尽管在技术层面上, 的确可以模拟出需要插入的中间状态之前的事件, 或者让编排器能够针对某一特定类型的输入进行跳过.
    但这终归是一种hack, 背离了使用这种模式的初衷.
  • 由于中间状态是以事件方式存储的, 变更工作流程时, 原有的事件序列很可能会因为无法适应新的工作流程而报废.
  • 运行效率较低.
  • 产生很多非必要I/O流量.
  • 由于事件溯源对结果的顺序要求, 工作流内的多线程代码无法抢占式运行.
    举例来说, 如果代码想要以生产者消费者模式运行, 则消费者返回内容时:
    • 如果是无序返回, 则每次运行的结果顺序会不同.
    • 如果是有序返回, 则会强制要求分块, 这导致每次只有组内最后一个结果返回后, 下一组任务才能运行.
      为了不浪费性能, 需要设置一个为分块大小数倍的缓冲区用来保存尚未整组完成的返回结果.
      无论如何, 这些为了满足顺序的方案都会 对代码的可读性造成严重的负面影响, 违背使用这种模式的初衷.
      作为参考, 在Azure Durable Functions里, 直接限制只能运行单线程代码.
  • 生成器的语法形式通常不适合函数式编程, 因为 yield 运算符只能在生成器这一层使用.
    一种替代方案是使用 yield*, 这要求将相关的所有函数改写为支持生成器的形式, 显然会造成很大的改变, 因此很不可取.

不会, 因为新事件在写入Event Store时会指定index.

举例来说, 作为调度器的消息队列认为工作流程A没有及时调用子过程A来完成任务, 将消息重置回waiting状态.
工作流程B从调度器拉取被重置的任务, 从工作流程A的中断处继续运行, 调用子过程A.
然而, 被认为没有及时完成任务的工作流程A实际上仍在工作, 此时子过程A执行完毕, 将新的事件附加给了Event Store, 并继续接下来的流程.
接着, 工作流程B的子过程A执行完毕, 试图将新的事件附加给Event Store的指定index, 但发现指定index已经有事件写入了,
于是它发现自己是多余的, 丛而自主结束运行.

不会, 因为每个子过程在开始时会检查相应索引位置是否已经存在事件.

唯一会造成子过程重复运行的情况是作为调度器的消息队列在连续多次重置了消息状态,
但每次重置启动的重复的工作流程都没有来得及完成下一阶段的子任务.

之所以会出现这种情况, 很可能是因为调度器的消息队列有一个过短的超时时间,
或子过程的运行时长到了不合理的程度(比如等待用户交互, 但用户长时间处于离线状态).

在子过程开始时, 写入一个started事件, 该事件记录子过程预期的最晚结束时间戳.
如果到了预期的最晚结束时间, 子过程仍然没有完成, 则该子过程应该停止自己(或至少不会继续向Event Store写入事件),
期望下一个工作流程重新开始此子过程.

重复运行的工作流程在发现started事件时, 会通过对比当前时间来决定是启动子过程还是结束自身运行.

支持async/await的语言可以用async/await替代生成器, 因为基于生成器的编排器本质上只要满足几个条件, 而这些条件async/await都满足.

一般来说, 使用async/await还有一些额外的好处:

  • 大部分第三方库支持async/await, 而不支持Generator, 因此async/await更有用.
  • 由于async/await底层的Future/Promise类型是一种无阻塞数据结构, 使用起来会比强制阻塞的Generator更灵活.
    例如可以通过并发调用来返回无序结果, 因此也能更好地支持多线程.
  • 实现起来比生成器容易.