Ray论文

[Ray]Ray: A Distributed Framework for Emerging AI Applications

image-20230329135318199

image-20230329140956911

与supervised learning不同的是,training和serving可以由不同的系统分别处理,而在RL中,所有的training, serving, simulation这三个workloads都紧密地耦合在一个应用程序中,它们之间有严格的延迟要求。 目前没有框架支持这样的耦合。

因此,框架需要满足以下要求:

  1. Fine-grained, heterogeneous computations. 计算持续时间范围 从毫秒(如 taking an action)到小时量级(如 training a complex policy)。此外,训练经常要求异质的硬件(如 CPUs,GPUs或者TPUs)。
  2. Flexible computation model. RL应用要求 无状态stateless和有状态的stateful 计算。无状态计算可以在系统的任何节点上执行,这使得在需要时很容易实现负载平衡和将计算转移到数据上。因此,无状态计算很适合细粒度的模拟和数据处理,如从图像或视频中提取特征。相比之下,有状态的计算很适合于实现参数服务器,在GPU支持的数据上进行重复计算,或者运行第三方的 仿真器,这些仿真器不暴露它们的状态。
  3. Dynamic execution. RL应用的几个组成部分需要动态执行,因为计算完成的顺序并不总是事先知道的(如 仿真完成的顺序),而一个计算的结果可以决定未来的计算(如 仿真的结果决定我们是否需要进行更多的模拟)。

需要一个框架:

  • 支持异构的、动态的计算图;
  • 每秒处理百万级任务数且只有毫秒级的延迟
  • 批量同步并行系统bulk-synchronous parallel system

编程模型和计算模型

Ray提供了actor 和 task-parallel两重编程抽象。不像CIEL,只提供任务并行抽象,或者Orleans和Akka,只提供了actor抽象。

编程模型

  • Tasks。一个任务task表示在无状态工作器上执行一个远程函数。当一个带有@remote注解的remote函数被调用,会立即返回一个future对象,然后调用ray.get()即可得到结果。

    比如说

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    
    import time
    import ray
    
    ray.init()
    
    @ray.remote
    def f():
        time.sleep(1)
        return 1
    
    # Execute f in parallel.
    object_ids = [f.remote() for i in range(4)]
    results = ray.get(object_ids)
    

    f.remote()调用运行时,将返回一个future对象,存储在object_ids中,调用ray.get(object_ids)可以得到返回值的列表。

    remote函数操作的数据是不可变的对象,remote函数是无状态,无副作用的,使得幂等性得以保证,从而简化了容错的处理。

  • Actors。一个角色actor代表一个有状态的stateful计算。每个角色都暴露了可以被远程调用的方法,并被连续执行。一个方法的执行类似于一个任务task,因为它是远程执行并返回一个未来,但不同的是它是在一个有状态的worker上执行的。一个actor的句柄handle可以传递给其他角色或任务,使他们有可能调用该角色的方法。

    image-20230329163905360

    Tasks vs. actors tradeoffs

    image-20230329163811643

    Ray API

    为了满足上一节提到的heterogeneityflexibility需求,通过三种方程增强API的功能:

    1. 处理不同时长的并发任务,采用ray.wait(),等待前k个可用结果,而不是像ray.get()等待所有结果;
    2. 处理资源异质的任务,让开发者指定资源需求以使Ray scheduler能够高效管理资源;
    3. 提高灵活性flexibility,允许nested remote functions,也就是remote functions能够调用其他remote functions。这也是实现高可用的重要部分,因为能够让多个进程以分布式方式调用remote functions。

计算模型

一个计算图中有两类节点nodes:(1)data objects,(2)remote function invocations or tasks。

两类边edges:(1)data edges;(2)control edges。前者捕捉data objects与tasks之间的依赖(如 data object $D$是任务$T$的输出,就添加一条从$T$到$D$的data edge)。后者捕捉嵌套remote functions得到的计算依赖(如任务$T_1$调用任务$T_2$,就添加一条从$T_1$到$T_2$的control edges)。

为了捕获同一actor的后续方法调用中的状态依赖性,添加了第三种类型的边:stateful edge。(如 在相同actor上,方法$M_j$正好在方法$M_i$之后调用,就添加一条从$M_i$到$M_j$的stateful edge)。

有状态边有两种好处,

  • 一是使得我们可以把actor嵌入到无状态的任务图中,因为actor维护着自己的一个调用链和状态。
  • 二是,有状态边可以用来维护世系(lineage)信息,世系可以帮助Ray重建数据对象,包括remote function创建的或是actor method创建的。

image-20230329135402069

架构

Ray架构由(1)实现API的应用层,(2)提供高可扩展性和容错性的系统层 组成。

image-20230329144117190

应用层

应用层由三类进程process组成:

  1. Driver:执行用户程序的进程;
  2. Worker: 被driver或另一个worker调用的 执行任务(remote functions)的无状态进程。workers被自动启动,并由系统层分配任务。当一个remote function被声明时,该函数会自动发布给所有workers。worker以串行方式执行任务,在不同的任务中不保留本地状态。
  3. Actor: 一个有状态的进程,当被调用时,只执行它所暴露的方法。与 Worker 不同,Actor 是由 Worker 或 Driver显示实例化的。与Workers一样,Actor也是以串行方式执行方法,只是每个方法都依赖于前一个方法的执行所产生的状态。

系统层

系统层由三个主要部分组成:(1)global control store;(2)distributed scheduler;(3)distributed object store。所有成分都是横向可扩展的并具有容错性。

Global Control Store (GCS)

全局控制状态(GCS)是一个中心化的存储,使用Redis实现,负责存储task描述,remote方法代码,计算图,对象的位置,以及每个调度事件。 将所用控制状态集中存储,有这么几个好处: 1)使得集群中其他组件、节点可以无状态。这样的话可以大大提升水平扩展的能力。 2)简化了容错。 3)方便对程序debug,profile。

当然,集中式存储带来的隐忧就是瓶颈问题,基于Redis,可以做分片(sharding)来缓解单点瓶颈。并且为每个分片提供热副本(hot replica)【是什么?】来容错。 鉴于在Ray中,每个task, object, method等都有一个唯一的伪随机ID与其一一对应,使得分片间能够更好地负载均衡。

Bottom-Up Distributed Scheduler

image-20230329150629299

分布式自底向上调度器(distributed bottom-up scheduler),负责任务的调度。Ray启动时,会启动一个全局的调度器global scheduler,实现上是一个redis server,会在每个从节点上启动一个local scheduler,每个节点上提交的task率先提交给各自的local scheduler进行调度,如果local scheduler调度不了,则上推给global scheduler,进行集群全局范围的调度。 Ray这样设计的目标是调度可扩展性以及快速,低延迟的调度。这样设计带来的好处就是Ray调度任务非常之快。

那么Ray这样设计的灵感来自于哪里呢?

通常的分布式集群计算框架实现的都是中心化,集中式的调度,如Hadoop, Spark, CIEL, Dryad等。这种方式虽然简化了设计,但是扩展性不好

提升调度扩展性有几种方式: 1)批量调度。调度器批量提交任务给worker节点,以摊销提交任务带来的固定开销。Drizzle框架实现的就是这种。 2)层次调度。即全局调度器(global scheduler)将任务图划分到各个节点的本地调度器(local scheduler)。Canary框架实现了这种调度。 3)并行调度。多个全局调度器同时进行任务调度。这是Sparrow框架所做的。

但是他们都有各自的缺陷。 批量调度仍然需要一个全局调度器来处理所有任务。 层次调度假设任务图是已知的,即假设任务图是静态的。 并行调度假设每个全局调度器调度独立的作业。

Ray希望做到的是高可扩展性,处理动态任务图,并且可能处理来自同一个作业的任务。

Ray的自底向上调度器类似层次调度,不同的是,一个节点生成的task首先提交到各自的local scheduler,由local scheduler进行调度。除非本地节点过载了,或者本地节点不能满足task的资源需求,或者task的输入不在本地节点等因素出现,否则本地节点可以完成调度。

其实最后一条,task的输入不在本地,仍有方式在本地调度,因为object store可以实现快速的数据对象转运。

看到这里,读者可能想问,Ray的local scheduler是怎样判断是否该调度到本地节点还是上推到global scheduler呢?

Ray是这样处理的,local scheduler会维护一个任务队列(task queue),每次调度任务时它会检查当前任务队列的长度,如果超过一定的阈值,那么认为本机过载了,不在调度当前任务而是将其转给上层全局调度器。

  • 这个阈值为0,则调度为集中式的调度,全靠global scheduler负责。
  • 这个阈值为无穷大,则调度为去中心化的分布式调度,所有任务都有本地节点负责。

在框架设计上,local scheduler每隔一段时间会发送心跳包给GCS,注意不是直接发送给global scheduler,心跳包中会包含local scheduler的负载信息,GCS收到以后记录此信息,转发给global scheduler。 当收到local scheduler转发来的任务时,global scheduler使用最新的负载信息,以及人物的输入数据对象的位置和大小,来决定将task分发到哪个节点去运行。

如果global scheduler成为了瓶颈,那么采用多个副本,local scheduler随机选择一个global scheduler去转发任务。

In-Memory Distributed Object Store

remote函数(task)的输入和输出数据都存储在Distributed Object Store中。 调用一个函数时,其输入首先被隐式执行ray.put,存储object store中,其输出也会被放入object store中,函数返回一个object id为该输出数据的唯一id,调用ray.get(object_id)才可获得任务返回的实际数据。

同一个节点上使用**共享内存(shared memory)来实现object store,这使得两个不同的worker进程或者driver和worker之间能够零拷贝(zero copy)**地访问共同的数据。具体实现采用了Apache Arrow中的plasma storeApache Arrow是一种跨平台的内存数据交换格式。可参看介绍,等。

如果一个task,其输入不在本地,那么object store会把数据从所在地拷贝到本地,这样可以避免热数据带来的瓶颈,同时可以加快程序执行速度,因为直接在本地内存中操作。当然,数据的传送开销也是免不了的。这也将提升计算受限工作流的吞吐量,计算受限是许多AI应用共有的特征。

为了简化系统设计,简化容错,Ray像其它内存计算系统如Spark, Dryad一样,其操作的数据是不可变的(immutable),即存储在object store中的数据不能改变,如RDD一般。

Ray同时也不支持分布式对象,如分布式大矩阵,当然可以通过在应用层自行实现(通过一系列的futures)。

Implementation

https://github.com/ray-project/ray

end-to-end example(a+b, return c)

图(a)中,节点1(N1)向GCS注册了函数add(),N1调用,在N2执行。

image-20230329153550816

图(b)中,N1用ray.get()得到了add()的结果。c的Object Table在第四步被创建,并在步骤6中(被复制到N1后)被更新。

image-20230329153608901

原文参见:http://whatbeg.com/2018/03/15/ray-paper.html

对象重建

像Spark,CIEL一样,Ray根据世系来重建对象。除此之外,Ray还支持有状态操作算子(Actor)的重建。

要注意的是,如果世系中包含有状态边,则可能涉及到Actor的重新初始化,并且可能涉及到很长一条世系链的重放。对于某些应用,Ray为了减少Actor的重建时间,加入了checkpoint机制,从checkpoint进行恢复。

为了保证低延迟,如果plasma store满了以后,会通过LRU机制剔除陈旧的对象到磁盘。【到磁盘哪里?】 在Ray中,plasma store即object store挂载在Linux属于tmpfs文件系统的/dev/shm目录上,该目录不在磁盘上,而在内存里。已经存入/dev/shm中的对象不能被其他应用程序使用,但是/dev/shm没被使用的部分可被其他应用程序使用。 /dev/shm区域的大小一般是总内存的1/2。即如果你的机器是64G内存,那么此区域只有32G可用,Ray会再保留一点余量,大概用于object store的空间只有26-27GB左右,也就是说,如果Ray处理的数据过大,大到object store都装不下或者在运行时需要频繁evict甚至evict掉其他必要数据的时候,往往程序就会崩溃。

相关工作

下表对相关工作与Ray的相似点与不同点做一总结。

比较的系统框架 相似点 不同点
动态任务图 CIEL 动态任务图机制,futures抽象,通过世系容错 Ray还提供Actor抽象,并实现了全局控制面板,自底向上调度器和采用了内存对象存储而不是文件存储,扩展到Python语言
Dask 支持动态任务图,wait原语,futures抽象,Python语言 Dask是中心化调度方式,不提供actor抽象,不提供容错
数据流系统 Hadoop/Spark Hadoop/Spark计算模型更加限定,实现了BSP执行模型,假设同一阶段的task执行同样的计算,并有着相似的执行时间。Ray还提供Actor抽象,并实现了全局控制面板和调度器。
Dryad Dryad放松了Spark的假设,但是也没有实现动态计算图。Ray还提供Actor抽象,并实现了全局控制面板和调度器。
Naiad 对于一些工作流提升了可扩展性。 只支持静态计算图。
Actor系统 Orleans 提供虚拟actor-based抽象。 Orleans可以使actor的多个实例同时运行。需要显式checkpoint,提供at-least-once语义。而Ray提供exactly-once语义。[注]
Erlang Actor抽象。 需显式处理容错。Erlang的全局状态存储不适合共享大对象。
C++ Actor Framework Actor抽象。 需显式处理容错。不支持数据共享。
全局控制状态和调度 SDN 全局控制状态 Ray解耦了状态信息存储和逻辑实现(调度器),存储与计算可以独立扩展
Distributed File System,如GFS 全局控制状态 Ray解耦了状态信息存储和逻辑实现(调度器),存储与计算可以独立扩展
Distributed Frameworks (如MapReduce, BOOM) 全局控制状态 与BOOM相比,Ray解耦了状态信息存储和逻辑实现(调度器),存储与计算可以独立扩展
Resource Management (Omega) 全局共享状态 Ray增加了两层调度器来均衡负载,面向毫秒级别的任务调度
Sparrow 去中心化调度 各调度器自行其是,独立决策
Mesos 两级层次调度 顶层调度器可能成为瓶颈
Canary 去中心化调度 Canary每个调度器负责计算图的一部分,但不支持动态计算图
机器学习框架 Tensorflow 面向深度学习,对CPU、GPU资源利用很好,不支持更通用的计算工作流,Tensorflow Fold提供对动态计算图的一些支持,但仍无法全部支持执行时对任务执行进度,任务完成时间和错误的响应而编辑DAG图。支持底层消息传输和同步原语,但是过于底层使得用起来有点像MPI。
MXNet 同Tensorflow
OpenMPI 高性能 很难编程,需要显式处理异构和动态任务图,需要程序员显式处理容错

[注] 实时计算/流式计算的三种语义: At-most-once:每条记录最多只能被处理一次 At-least-once:每条记录最少要被处理一次 Exactly-once:每条记录有且仅被处理一次

References