引言
简介
OpenStack TaskFlow是一个用于编排和协调OpenStack任务的框架。它提供了一种简单的方式来定义和管理任务流程,以及处理任务之间的依赖关系。TaskFlow支持任务的状态管理、任务重试、任务失败处理、任务取消等功能,可以帮助开发人员更好地管理和控制任务的执行过程。TaskFlow整体架构如下:
TaskFlow 分为提供同步执行能力的基础部分(上图左半部分)和依赖外部支撑提供异步运行的可选部分(右半部分),目前 OpenStack 的组件主要用的还是基础部分的功能。上图右半部分可以用于编排运行在分布式集群环境的工作流,例如通过worker架构安装一个一主二从架构的mysql集群。
另外TaskFlow提供了一些Openstack用到的场景和代码用例,例如创建虚拟机,创建数据卷等。具体可参考:
docs.openstack.org/taskflow/la…
优势
Taskflow 以一个python依赖包的方式进行发布,方便其他python程序进行集成;TaskFlow内部很多模块都先定义抽象接口并后提供多种的实现,方便用户根据自己业务场景进行选择或者自定义扩展。
框架入口
框架入口是源码文件 taskflow.helpers.py 定义的 load() 和 run() 函数:
# 创建一个engine对象,engine类型通过参数 engine= 来指定
def load(flow, store=None, flow_detail=None, book=None,
backend=None, namespace=ENGINES_NAMESPACE,
engine=ENGINE_DEFAULT, **kwargs):
# 创建一个engine对象并执行engine,返回 flow的执行结果。engine类型通过参数 engine= 来指定
def run(flow, store=None, flow_detail=None, book=None,
backend=None, namespace=ENGINES_NAMESPACE,
engine=ENGINE_DEFAULT, **kwargs):
下面demo代码执行一个简单的taskflow:
class Task1(task.Task):
def execute(self, *args, **kwargs):
print("执行Task1 ......")
time.sleep(15)
return True
def revert(self, *args, **kwargs):
print("Task1 执行失败,开始回滚...")
class Task2(task.Task):
def execute(self, *args, **kwargs):
print("执行Task2...")
time.sleep(10)
return True
flow = gf.Flow("taskflow demo").add(*[Task1(), Task2()])
engine = taskflow.engines.load(flow)
engine.run()
核心概念
TaskFlow涉及较多的概念,本节内容将分别对它的的核心的念进行解说。
Atom
Atom是指一个原子操作,它是TaskFlow中的基本执行单元。Atom操作是不可分割的,要么全部执行成功,要么全部回滚。这意味着,如果在执行Atom操作时发生错误,TaskFlow会自动回滚到操作之前的状态,以确保数据的一致性和完整性。
Atom操作通常用于执行一些简单的任务,例如创建或删除一个资源,或者执行一些简单的计算。
Atom定义了与任务执行以及回滚相关的函数接口:
# 执行任务前调用
def pre_execute(self):
# 执行任务后调用
def post_execute(self):
# 任务执行的逻辑
def execute(self, *args, **kwargs):
# 回滚操作,execute函数执行失败时调用
def revert(self, *args, **kwargs):
# 回滚函数执行之后调用
def post_revert(self):
Task
Task继承了Atom:
class Task(atom.Atom, metaclass=abc.ABCMeta):
"""An abstraction that defines a potential piece of work.
This potential piece of work is expected to be able to contain
functionality that defines what can be executed to accomplish that work
as well as a way of defining what can be executed to reverted/undo that
same piece of work.
"""
每个Task都有一个唯一的名称和一组输入和输出。输入是Task执行所需的数据,输出是Task执行后产生的数据。Task可以依赖于其他Task的输出,这样就可以构建复杂的任务流。使用TaskFlow时可以通过继承Task类来创建具体的业务任务。从源码上看相比于Atom,Task多了一个 _notifier 属性,用于监听事件。当前已经定义的Event有:
TASK_EVENTS = (EVENT_UPDATE_PROGRESS,)
Retry
Retry 也继承了Atom:
class Retry(atom.Atom, metaclass=abc.ABCMeta):
"""A class that can decide how to resolve execution failures.
This abstract base class is used to inherit from and provide different
strategies that will be activated upon execution failures. Since a retry
object is an atom it may also provide :meth:`~taskflow.retry.Retry.execute`
and :meth:`~taskflow.retry.Retry.revert` methods to alter the inputs of
connected atoms (depending on the desired strategy to be used this can be
quite useful).
NOTE(harlowja): the :meth:`~taskflow.retry.Retry.execute` and
:meth:`~taskflow.retry.Retry.revert` and
:meth:`~taskflow.retry.Retry.on_failure` will automatically be given
a ``history`` parameter, which contains information about the past
decisions and outcomes that have occurred (if available).
"""
Flow
Flow表示一个任务流程,定义了一组任务及其之间的依赖关系,同时可以定义控制流程,例如循环、条件分支。Flow可以看作是一个有向无环图(DAG),其中节点表示任务,边表示任务之间的依赖关系。
Flow 模式(pattern)
- liner_flow : 线性模式,任务按照指定的顺序依次执行,每个任务的输出作为下一个任务的输入,这是最简单的模式。
- graph_flow: 图模式,任务与任务之间以有向无环图(DAG)的模式来组织。任务之间可以并行执行,一个任务之间可以传递数据。
Engine
engine是一个flow运行的入口和核心控制器。负责管理任务的状态、执行任务、处理任务之间的依赖关系以及处理任务执行过程中的异常情况。Taskflow内部很多的模块组件都是以插件的方式进行加载的,Engine也不例外,从源码的 setup.cfg 配置文件可知道目前提供以下几种实现:
taskflow.engines =
default = taskflow.engines.action_engine.engine:SerialActionEngine
serial = taskflow.engines.action_engine.engine:SerialActionEngine
parallel = taskflow.engines.action_engine.engine:ParallelActionEngine
worker-based = taskflow.engines.worker_based.engine:WorkerBasedActionEngine
workers = taskflow.engines.worker_based.engine:WorkerBasedActionEngine
SerialActionEngine
SerialActionEngine为默认的engine,它是一种串行执行引擎,可以按照指定的顺序依次执行任务。它内部有一个 _task_executor,负责Task的执行:
class SerialActionEngine(ActionEngine):
"""Engine that runs tasks in serial manner."""
def __init__(self, flow, flow_detail, backend, options):
super(SerialActionEngine, self).__init__(flow, flow_detail,
backend, options)
self._task_executor = executor.SerialTaskExecutor()
SerialTaskExecutor是一个串行执行器,可以在源码找到它的定义:
class SerialTaskExecutor(TaskExecutor):
"""Executes tasks one after another."""
def __init__(self):
self._executor = futurist.SynchronousExecutor()
def start(self):
self._executor.restart()
def stop(self):
self._executor.shutdown()
所以即使是Flow定义了多分支并行的Task,它们在engine内部也是串行执行的。
ParallelActionEngine
并行执行任务的engine,提供了多线程和多进程两种并行方式。
class ParallelActionEngine(ActionEngine):
def __init__(self, flow, flow_detail, backend, options):
super(ParallelActionEngine, self).__init__(flow, flow_detail,
backend, options)
# This ensures that any provided executor will be validated before
# we get to far in the compilation/execution pipeline...
self._task_executor = self._fetch_task_executor(self._options)
启动一个ParallelActionEngine时,通过指定option来选择_task_executor的类型,从而选择不同的任务并行运行方式:
========================= ===============================================
Type provided Executor used
========================= ===============================================
|cft|.ThreadPoolExecutor :class:`~.executor.ParallelThreadTaskExecutor`
|cfp|.ProcessPoolExecutor :class:`~.|pe|.ParallelProcessTaskExecutor`
|cf|._base.Executor :class:`~.executor.ParallelThreadTaskExecutor`
========================= ===============================================
# One of these types should match when a object (non-string) is provided
# for the 'executor' option.
#
# NOTE(harlowja): the reason we use the library/built-in futures is to
# allow for instances of that to be detected and handled correctly, instead
# of forcing everyone to use our derivatives (futurist or other)...
_executor_cls_matchers = [
_ExecutorTypeMatch((futures.ThreadPoolExecutor,),
executor.ParallelThreadTaskExecutor),
_ExecutorTypeMatch((futures.ProcessPoolExecutor,),
process_executor.ParallelProcessTaskExecutor),
_ExecutorTypeMatch((futures.Executor,),
executor.ParallelThreadTaskExecutor),
]
# One of these should match when a string/text is provided for the
# 'executor' option (a mixed case equivalent is allowed since the match
# will be lower-cased before checking).
_executor_str_matchers = [
# 多进程并发模型,每个task都有自己独立的python解析器
_ExecutorTextMatch(frozenset(['processes', 'process']),
process_executor.ParallelProcessTaskExecutor),
# 多线程并发模型
_ExecutorTextMatch(frozenset(['thread', 'threads', 'threaded']),
executor.ParallelThreadTaskExecutor),
# 基于协程并发模型的executor,实现单线程的并发执行
_ExecutorTextMatch(frozenset(['greenthread', 'greenthreads',
'greenthreaded']),
executor.ParallelGreenThreadTaskExecutor),
]
# Used when no executor is provided (either a string or object)...
_default_executor_cls = executor.ParallelThreadTaskExecutor
WorkerBasedActionEngine
用于分布式执行环境中, 使用WorkerBaseActionEngine执行后,执行模型变成以下形式:
server: 负责接收网络请求;将Request转换为一个work并通过work生成一个endpoint。Server定义了两个路由:
pr.NOTIFY | 处理任务状态变化 |
---|---|
pr.REQUEST | 处理执行任务请求 |
参考源码文件 taskflow.engines.worker_based.server.py
class Server(object):
"""Server implementation that waits for incoming tasks requests."""
def __init__(self, topic, exchange, executor, endpoints,
url=None, transport=None, transport_options=None,
retry_options=None):
# 定义两个路由
type_handlers = {
pr.NOTIFY: dispatcher.Handler(
self._delayed_process(self._process_notify),
validator=functools.partial(pr.Notify.validate,
response=False)),
pr.REQUEST: dispatcher.Handler(
self._delayed_process(self._process_request),
validator=pr.Request.validate),
}
self._executor = executor
self._proxy = proxy.Proxy(topic, exchange,
type_handlers=type_handlers,
url=url, transport=transport,
transport_options=transport_options,
retry_options=retry_options)
self._topic = topic
self._endpoints = dict([(endpoint.name, endpoint)
for endpoint in endpoints])
endpoint:task在worker上的执行入口。通过调用Executor的接口来负责task的生成,执行和回滚:
lass Endpoint(object):
"""Represents a single task with execute/revert methods."""
... 省略部分代码
def generate(self, name=None):
# NOTE(skudriashev): Note that task is created here with the `name`
# argument passed to its constructor. This will be a problem when
# task's constructor requires any other arguments.
return self._task_cls(name=name)
def execute(self, task, **kwargs):
event, result = self._executor.execute_task(task, **kwargs).result()
return result
def revert(self, task, **kwargs):
event, result = self._executor.revert_task(task, **kwargs).result()
return result
Executor: 真正执行任务的资源池,与另外两种模式(SeriesActionEngine和ParallelActionEngine)的Executor是一样的角色,只是它部署在了Worker进程中,默认实现是WorkerTaskExecutor。
Compile
编译是taskflow启动engine执行Flow前的一个关键步骤,它将用户定义的Flow转换为引擎内部可识别的执行图 _execute_graph,execute_graph 的底层存储直接复用了开源项目 networkx 对图结构的操作能力。编译的过程中会产生一些辅助对象(help objects)并存储到engine对象中去,help object用于辅助后面engine执行task任务,分析Flow以及自己一些内部的活动。
compile() 函数执行后,会将 graph以及第一个node封装到自己的变量 _compilation 中去
@fasteners.locked
def compile(self):
"""Compiles the contained item into a compiled equivalent."""
if self._compilation is None:
self._pre_compile()
try:
# node : first node for current graph
graph, node = self._compile(self._root, parent=None)
except Exception:
with excutils.save_and_reraise_exception():
# Always clear the history, to avoid retaining junk
# in memory that isn't needed to be in memory if
# compilation fails...
self._history.clear()
else:
self._post_compile(graph, node)
if self._freeze:
graph.freeze()
node.freeze()
self._compilation = Compilation(graph, node)
return self._compilation
Backend 和 Storage
Backend和Storage在TaskFlow架构中的位置如下:
TaskFlow提供了多种Backend实现,包括SQLAlchemy、Zookeeper、Redis等,如果需要高可用性和分布式支持,可以选择Zookeeper或Redis作为Backend;如果需要支持多种数据库,可以选择SQLAlchemy作为Backend。
在run一个flow或者load一个engine的时候可以指定一个backend,也就是说在一个应用中用户可以很方面选择不同的backend来存储TaskFlow的状态。
通过taskflow源码文件 setup.cfg 可以知道目前有以下几种实现:
taskflow.persistence =
dir = taskflow.persistence.backends.impl_dir:DirBackend
file = taskflow.persistence.backends.impl_dir:DirBackend
memory = taskflow.persistence.backends.impl_memory:MemoryBackend
mysql = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend
postgresql = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend
sqlite = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend
zookeeper = taskflow.persistence.backends.impl_zookeeper:ZkBackend
Provides
Provides是指任务所提供的输出,即任务执行后产生的结果。Provides可以是一个值、一个对象或者一个状态,它们可以被其他任务所依赖和使用。
Provides的作用是让任务之间能够协同工作,提高任务执行的效率和可靠性。通过Provides,任务可以将自己的输出传递给其他任务,从而实现任务之间的协作。provides 在任务之间传递消息的使用方式如下:
核心流程
TaskFlow的核心流程分为3步:定义Flow --> 组件初始化 --> 执行Flow。下面分别细讲这三个步骤。
定义Flow
直接使用代码定义Flow对象,下面是定义一个flow的示例:
class InitEnvTask(task.Task):
"""定义环境初始化任务"""
def __init__(self, ip_addr):
super(InitEnvTask, self).__init__(name="InitEnvTask")
self.ip_addr = ip_addr
def execute(self, *args, **kwargs):
print("对机器 %s 做环境初始化操作。" % self.ip_addr)
time.sleep(15)
return True
def revert(self, *args, **kwargs):
print("执行失败,开始回滚...")
class InstallSoftwareTask(task.Task):
"""定义软件安装任务:"""
def __init__(self, ip_addr, sorftware):
super(InstallSoftwareTask, self).__init__(name="InstallSoftwareTask")
self.ip_addr = ip_addr
def execute(self, *args, **kwargs):
print("开始在机器 %s 上安装软件 %s ." % (self.ip_addr, sorftware))
time.sleep(10)
return True
def gen_install_taskflow(ip_addr, flow_name=None):
""""生成一个Flow:
|---> 安装mysql ---|
初始化环境--->| |---> 安装springboot app应用
|---> 安装redis ---|
"""
init_env_job = InitEnvTask(ip_addr)
install_mysql_job = InstallSoftwareTask(ip_addr, 'mysql')
install_redis_job = InstallSoftwareTask(ip_addr, 'redis')
install_springboot_app_job = InstallSoftwareTask(ip_addr, 'springboot-app')
if flow_name is None:
flow_name = 'workflow for springboot app installation'
# 加入所有的节点
flow = gf.Flow(flow_name)
.add(*[init_env_job, install_mysql_job, install_redis_job, install_springboot_app_job])
# 定义task依赖关系:
flow.link(u=init_env_job, v=install_mysql_job)
flow.link(u=init_env_job, v=install_redis_job)
flow.link(u=install_mysql_job, v=install_springboot_app_job)
flow.link(u=install_redis_job, v=install_springboot_app_job)
return flow
如下图所示:
环境初始化
调用load() 函数, 根据配置信息初始化storage,persistence,engine等组件并返回engine对象。
执行Flow
整个Flow的执行过程包括几个核心步骤: compile --> prepare --> validate --> build state machine --> start
compile 负责把Flow编译为内部可执行的ExecuteGraph;
prepare 负责将为准备好storage用于存储engine,flow以及task的状态数据;
build state machine 负责定义好Engine的状态机转换规则以及注册状态机变更钩子函数;
start 负责启动Engine状态机,开始执行任务,入口是 taskflow.engines.action_engine.engine.py 的 run_iter(self, timeout=None) : 函数:
def run_iter(self, timeout=None):
# 将用户定义的Flow转换为引擎内部使用的执行图 execute_graph
self.compile()
# 准备好storage用于存储engine,flow以及task的状态数据
self.prepare()
# 启动前做一次校验操作
self.validate()
... 省略部分代码 ...
with _start_stop(self._task_executor, self._retry_executor):
self._change_state(states.RUNNING)
.... 省略部分代码 ....
try:
closed = False
# 构建有限状态机,是驱动整个DAG调度的入口
machine, memory = self._runtime.builder.build(
self._statistics, timeout=timeout,
gather_statistics=self._gather_statistics)
# 使用状态机的FiniteRunner组件作为调度入口
r = runners.FiniteRunner(machine)
# 启动engine的有限状态机,开始执行调度DAG任务
for transition in r.run_iter(builder.START):
last_transitions.append(transition)
_prior_state, new_state = transition
... 省略部分代码 ....
... 省略部分代码 ...
finally:
if w is not None:
w.stop()
self._statistics['active_for'] = w.elapsed()
上面代码中,构建有限状态机是核心,源码文件 taskflow.engines.action_engine.builder 的 build(self, statistics, timeout=None, gather_statistics=True) 函数定义了engine的状态列表,状态转换规则以及状态发生变更时触发的钩子函数:
build(self, statistics, timeout=None, gather_statistics=True):
... 省略部分代码...
# FiniteMachine 是有限状态机库automaton的一种实现,另一种是HierarchicalFiniteMachine
m = machines.FiniteMachine()
# 定义所有的状态列表
m.add_state(GAME_OVER, **state_kwargs)
m.add_state(UNDEFINED, **state_kwargs)
m.add_state(st.ANALYZING, **state_kwargs)
m.add_state(st.RESUMING, **state_kwargs)
m.add_state(st.REVERTED, terminal=True, **state_kwargs)
m.add_state(st.SCHEDULING, **state_kwargs)
m.add_state(st.SUCCESS, terminal=True, **state_kwargs)
m.add_state(st.SUSPENDED, terminal=True, **state_kwargs)
m.add_state(st.WAITING, **state_kwargs)
m.add_state(st.FAILURE, terminal=True, **state_kwargs)
m.default_start_state = UNDEFINED
# 定义状态转换规则以及能够接受的事件类型。
m.add_transition(GAME_OVER, st.REVERTED, REVERTED)
m.add_transition(GAME_OVER, st.SUCCESS, SUCCESS)
m.add_transition(GAME_OVER, st.SUSPENDED, SUSPENDED)
m.add_transition(GAME_OVER, st.FAILURE, FAILED)
# START事件是整个Flow调度的入口事件
m.add_transition(UNDEFINED, st.RESUMING, START)
m.add_transition(st.ANALYZING, GAME_OVER, FINISH)
m.add_transition(st.ANALYZING, st.SCHEDULING, SCHEDULE)
m.add_transition(st.ANALYZING, st.WAITING, WAIT)
m.add_transition(st.RESUMING, st.SCHEDULING, SCHEDULE)
m.add_transition(st.SCHEDULING, st.WAITING, WAIT)
m.add_transition(st.WAITING, st.ANALYZING, ANALYZE)
# 注册状态发生变更时(接收到的事件)需要调用钩子函数
m.add_reaction(GAME_OVER, FINISH, game_over)
m.add_reaction(st.ANALYZING, ANALYZE, analyze)
m.add_reaction(st.RESUMING, START, resume)
# 钩子函数schedule是启动整个Flow调度的入口,选择第一批次task来执行,并驱动整个Flow向后执行
m.add_reaction(st.SCHEDULING, SCHEDULE, schedule)
m.add_reaction(st.WAITING, WAIT, wait)
m.freeze()
return (m, memory)
整个DAG的驱动通过automaton的有限状态机来实现,而启动则从函数 run_iter(self, timeout=None) 开始,最终整个engine的状态转移过程可总结为下图:
参考:docs.openstack.org/taskflow/la…
整个执行过程各种组价之间的调用关系,官方提供的图解如下:
状态机 automaton
本节内容从 telnetning-notebook.readthedocs.io/zh/latest/p… 处引用,由于taskflow的DAG的执行是依赖automaton来驱动的,因此automaton是理解taskflow内部执行调度的基础,下面内容直接从 链接地址拷贝而来。
automaton 是一个状态转换机库。OpenStack 的 taksflow 库就是基于 automaton 实现的。
在状态转换机中比较常见的是有限状态机
,理解起来就是定义一个系统的几个状态,给出各个状态之间装换的条件,然后状态加触发条件就决定了系统的下一个状态。我们还可以在状态转换的各个阶段插入自己想要实现的逻辑。
因此,一个有限状态机中可以提取出几个重要的元素:
- 状态列表,系统可能存在的状态
- 转换条件,状态转换的条件
- 钩子函数,状态转换前后调用函数
一个最简单的有限状态机定义如下:
from automaton import machines
m = machines.FiniteMachine() #FiniteMachine 有限状态机
m.add_state('up')
m.add_state('down')
m.add_transition('down', 'up', 'jump')
m.add_transition('up', 'down', 'fall')
m.defualt_start_state = 'down'
print(m.pformat())
# output:
# +-------+-------+------+----------+---------+
# | Start | Event | End | On Enter | On Exit |
# +-------+-------+------+----------+---------+
# | down | jump | up | . | . |
# | up | fall | down | . | . |
# +-------+-------+------+----------+---------+
当要使用状态机时,需要先初始化状态机,然后可以调用 process_event 来触发转换条件:
from automaton import machines
m = machines.FiniteMachine()
m.add_state('up')
m.add_state('down')
m.add_transition('down', 'up', 'jump')
m.add_transition('up', 'down', 'fall')
m.default_start_state = 'down'
m.initialize() # 初始化状态机
m.process_event('jump')
print(m.pformat())
# +---------+-------+------+----------+---------+
# | Start | Event | End | On Enter | On Exit |
# +---------+-------+------+----------+---------+
# | down[^] | jump | up | . | . |
# | @up | fall | down | . | . |
# +---------+-------+------+----------+---------+
print(m.current_state) # up
print(m.terminated) # False
m.process_event('fall')
print(m.pformat())
# +----------+-------+------+----------+---------+
# | Start | Event | End | On Enter | On Exit |
# +----------+-------+------+----------+---------+
# | @down[^] | jump | up | . | . |
# | up | fall | down | . | . |
# +----------+-------+------+----------+---------+
print(m.current_state) # down
print(m.terminated) # False