前言
Ray 是一个高性能的分布式执行引擎,开源的人工智能框架,目标之一在于:让开发者可以用一个运行在笔记本电脑上的原型算法,仅需添加数行代码就能轻松转为适合于计算机集群运行的(或单个多核心计算机的)高性能分布式应用.这样的框架需要包含手动优化系统的性能优势,同时又不需要用户关心那些调度、数据传输和硬件错误等问题.(摘自oschina-Ray 分布式执行引擎)
本文分析基于 Ray 官方 1.0.0-rc2 版本。
一、过程分析
1.1 python 部分
1.1.1 scripts.py
Ray 中,命令行启动的起点对应的是 python/ray/scripts/scripts.py
.该脚本中提供了对所有 ray 命令的执行入口.
以最核心的 ray start
进行分析. 该命令对应脚本中的 358 行 start
函数.
- 通过
@click.option
添加对各种命令信息的支持,经过处理以后,将数据通过ray_params = ray.parameter.RayParams
构建为 ray 内部通用的参数,然后判断该命令是否为头节点启动命令. - 如果为头节点,则第一步对参数信息进行相关校验,重点逻辑为:头节点只允许自行创建 redis 服务,不允许设置 redis_address.校验后将信息和 autoscaling_config(用于自动扩容相关配置,指向自动扩容相关 yaml 文件的路径)更新到 ray_params 中.参数更新完毕以后,第二步通过
node = ray.node.Node(ray_params, head=True, shutdown_at_exit=block, spawn_reaper=block)
启动头节点.第三步将结果输出到命令行中. - 如果不为头节点,则第一步对参数信息进行相关校验,然后等待连接 redis 服务并创建客户端(该步中创建客户端仅为校验同节点是否启动多个 worker).第二步通过
ray.node.Node(ray_params, head=False, shutdown_at_exit=block, spawn_reaper=block)
启动工作节点.第三步将结果输出到命令行中. - 随后如果命令行中带有
--block
参数,继续等待命令输入.
1.1.2 node.py
start
函数所有节点最后都利用 ray.node.Node
进行启动,该脚本位于 python/ray/node.py
. 构建函数在 44 行 __init__
中.
- 首先从
ray_params
获取配置,并进行相关校验,配置 plasma_store. - 若为 head 节点,则调用
self.start_head_processes()
启动进程,然后创建redis_client
并存储 head 节点session_name
,session_dir
,temp_dir
等信息. - 调用
self.start_ray_processes()
启动进程.
start_head_processes
函数在 python/ray/node.py
的 795 行,该函数内部通过 self.start_redis()
,self.start_gcs_server()
,self.start_monitor()
,self.start_dashboard(require_dashboard)
四步启动对应进程.
self.start_redis()
最终调用 ray.services.start_redis
启动 redis.
self.start_gcs_server()
最终调用 ray.services.start_gcs_server
启动 GCS_SERVER.
self.start_monitor()
最终调用 ray.services.start_monitor
启动监控进程,该进程同时负责自动扩容工作.
self.start_dashboard(require_dashboard)
最终调用 ray.services.start_dashboard
启动监控面板.
start_ray_processes
函数在 python/ray/node.py
的 812 行,该函数内部通过 self.start_plasma_store(plasma_directory, object_store_memory)
,self.start_raylet(plasma_directory, object_store_memory)
,可选 self.start_reporter()
,可选 self.start_log_monitor()
四步启动对应进程.
self.start_plasma_store(plasma_directory, object_store_memory)
最终调用 ray.services.start_plasma_store
启动内存共享服务.
self.start_raylet(plasma_directory, object_store_memory)
最终调用 ray.services.start_raylet
启动 raylet 进程.
1.1.3 services.py
所有启动命令,最后都通过 ray.services
相关命令启动,该类函数均位于 python/ray/services.py
中.启动分为三种,一种时直接调用 C++后端进行启动,这里包括 GCS,Raylet,plasma 等;一种是调用 python 脚本,这里包括 dashboard,monitor 等;一种是直接调用第三方组件启动,包括 redis 等.
三种启动除了执行路径不尽相同,其他过程大同小异.本文依托位于 1173 行的 start_gcs_server
函数继续分析.
- 第一步根据传入参数构建
command
,命令的第一个参数为对应可执行文件路径,大部分定义在了services.py
的开头,GCS_SERVER 相关的 GCS_SERVER_EXECUTABLE 路径为core/src/ray/gcs/gcs_server
;部分启动过程仍依靠相关 python 文件和第三方库,不涉及 c++,启动路径直接卸写在数中. - 第二步通过
start_ray_process
函数执行命令,该函数首先判断环境变量中是否还有对应配置并修改对应参数,最后通过ConsolePopen
执行启动命令.
1.1.4 monitor.py
self.start_monitor()
监控进程启动时所设置的文件路径为 python/ray/monitor.py
.
在 if __name__ == "__main__":
中,读取参数并构建 monitor,随后执行 run 方法启动监控进程.
在 __init__
函数中,具体执行过程如下:
-
第一步初始化状态存储(目前后端只有 redis 客户端).
1
2
3
4
5# Initialize the Redis clients.
ray.state.state._initialize_global_state(
redis_address, redis_password=redis_password)
self.redis = ray.services.create_redis_client(
redis_address, password=redis_password) -
第二步激活订阅服务,用于获取集群信息.
-
第三步判断是否传入自动扩容文件路径
autoscaling_config
,传入则额外配置标准自动扩容服务autoscaler = StandardAutoscaler(autoscaling_config, self.load_metrics)
在 _run(self)
函数中,具体执行过程如下:
-
订阅消息
1
2
3
4
5
6self.psubscribe(ray.gcs_utils.XRAY_HEARTBEAT_BATCH_PATTERN)
self.psubscribe(ray.gcs_utils.XRAY_JOB_PATTERN)
if self.autoscaler:
self.subscribe(
ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL) -
循环执行自动扩容命令和消息处理.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15# Handle messages from the subscription channels.
while True:
# Process autoscaling actions
if self.autoscaler:
# Only used to update the load metrics for the autoscaler.
self.update_raylet_map()
self.autoscaler.update()
# Process a round of messages.
self.process_messages()
# Wait for a heartbeat interval before processing the next round of
# messages.
time.sleep(
ray._config.raylet_heartbeat_timeout_milliseconds() * 1e-3)
1.1.4.1 autoscaler.py
自动扩容脚本路径为 python/ray/autoscaler/autoscaler.py
. 该脚本在创建过程中首先会 通过self.reset(errors_fatal=True)
调用 node_provider
的 get_node_provider
创建 provider
句柄;然后会启动 node_launcher
,并将句柄 provider
和自动扩容队列 queue
以及其他信息传入其中,随后在 autoscaler.update()
的自动扩容过程中,就是正常的读取对比配置,然后调用 provider
移除节点,通过 launch_new_node
将需要启动的节点放到自动扩容队列中,由 node_launcher
周期执行扩容.
1.1.4.2 node_launcher.py
node_launcher
的路径为 python/ray/autoscaler/node_launcher.py
. 该脚本继承自 threading.Thread
会将自己注册为进程,周期性检查队列,并判断执行 self.provider.create_node(node_config, node_tags, count)
.
1.1.4.2 node_provider.py
node_provider.py
为抽象脚本,路径为 python/ray/autoscaler/node_provider.py
. get_node_provider
会根据传入 provider_config["type"]
的不同,构建不同的 importer.目前支持如下:
1 | NODE_PROVIDERS = { |
1.2 C++部分
ray 中,依托 C++代码创建的服务有 gcs,raylet 和 plasma.其中 plasma 早期继承在 ray 中,后期孵化为独立项目.
1.2.1 gcs_server
首先根据 python 文件中 GCS_SERVER_EXECUTABLE 配置搜寻编译工程的 BUILD.bazel
文件,得到如下配置.
1 | copy_to_workspace( |
然后根据配置中的 srcs 项找对应编译配置,如下.可以断定 c++部分源码入口为 src/ray/gcs/gcs_server/gcs_server_main.cc
.
1 | cc_binary( |
在对应的 gcs_server_main.cc
的 main 函数中,函数执行过程如下:
- 通过
gflags
工具对传入的参数进行预处理,读取参数并配置到config_map
中. - 通过
ray::stats::Init
启动监控进程 - 然后构建通信进程
boost::asio::io_service::work work(main_service)
- 紧跟着构建 GCS_SERVER 参数
gcs_server_config
并通过ray::gcs::GcsServer gcs_server(gcs_server_config, main_service)
创建 GCS_SERVER 并注册关闭句柄 - 通过
gcs_server.Start()
启动 gcs_server 并通过main_service.run()
保持通信服务工作.
在 src/ray/gcs/gcs_server/gcs_server.cc
中, void GcsServer::Start()
函数执行以下过程:
- 第一步通过
InitBackendClient()
初始化后端服务,目前仅有 redis; - 第二步在 redis 中注册订阅服务
std::make_shared<gcs::GcsPubSub>
和存储服务std::make_shared<gcs::RedisGcsTableStorage>
; - 第三步初始化 GCS 节点管理服务
InitGcsNodeManager()
和 GCS 分组服务InitGcsPlacementGroupManager()
; - 第四步注册各类 RPC 服务,用于其他进程和 GCS 通信.
1.2.2 raylet
同理分析,raylet 最终的源码入口路径为 src/ray/raylet/main.cc
的 main 函数,其执行过程如下:
- 通过
gflags
工具对传入的参数进行预处理. - 构建通信进程
boost::asio::io_service::work work(main_service)
- 构建
GcsClientOptions
并通过std::make_shared<ray::gcs::ServiceBasedGcsClient>(client_options)
启动 gcs_client. - 若为 head 节点,则首先通过
gcs_client->Nodes().AsyncSetInternalConfig(raylet_config)
将配置信息写入 gcs.raylet_config
配置来源为用户传入的config_list
. - 在异步读取过程中通过 raylet_config 初始化
RayConfig
;从RayConfig
中获取信息创建node_manager_config
和object_manager_config
;初始化监控ray::stats::Init(global_tags, metrics_agent_port)
和 raylet 节点server.reset(new ray::raylet::Raylet( main_service, raylet_socket_name, node_ip_address, redis_address,redis_port,redis_password, node_manager_config, object_manager_config, gcs_client, metrics_export_port))
;最后启动 rayletserver->Start()
- 注册 raylet 关闭句柄
signals.async_wait(handler)
启动通信进程main_service.run()
.
1.2.3 plasma_store
同理分析,raylet 最终的源码入口路径为 src/ray/plasma/store_exec.cc
的 main 函数核心调用如下
1 | plasma::plasma_store_runner.reset( |
在 src/ray/object_manager/plasma/store_runner.cc
的 Start()
函数中,直接调用了第三方组件 plasma::ExternalStores::ExtractStoreName(external_store_endpoint_, &name))
创建存储环境.
二、总结
- 通过对 ray 的启动过程
scripts.py
446 行的分析,可以确定 ray 目前仅支持 redis 作为 gcs 后端并且必须由 ray 自行在 head 上启动,无法指定自己启动的 redis 集群. - ray 在启动 raylet 时,若为头节点,会首先将
config_list
数据直接存储在 redis 中,然后所有节点复用读取 redis 数据并创建 raylet 的操作,所有 head 节点的config_list
配置会同步到所有 worker 节点中. - ray 在 python 脚本阶段也与 redis 进行了交互,后续修改存储后端时,需要将 python 脚本相关文件也进行修改.
- ray 的自动扩容仅发生在传入
autoscaling_config
参数的条件下,该参数执指向一个 yaml 配置文件.相关配置模板在python/ray/autoscaler
中. - 通过
ray up
启动时,从 调用顺序python/ray/autoscaler/commands.py
,python/ray/autoscaler/updater.py
来看,会将自动扩容配置以ray_bootstrap_config.yaml
写入云服务器中.