前言
Ray 是一个高性能的分布式执行引擎,开源的人工智能框架,目标之一在于:让开发者可以用一个运行在笔记本电脑上的原型算法,仅需添加数行代码就能轻松转为适合于计算机集群运行的(或单个多核心计算机的)高性能分布式应用。这样的框架需要包含手动优化系统的性能优势,同时又不需要用户关心那些调度、数据传输和硬件错误等问题.(摘自 oschina-Ray 分布式执行引擎)
本文分析基于 Ray 官方 1.0.0-rc2 版本。
一、过程分析
1.1 python 部分
1.1.1 scripts.py
Ray 中,命令行启动的起点对应的是 python/ray/scripts/scripts.py
.该脚本中提供了对所有 ray 命令的执行入口。
以最核心的 ray start
进行分析。该命令对应脚本中的 358 行 start
函数。
- 通过
@click.option
添加对各种命令信息的支持,经过处理以后,将数据通过ray_params = ray.parameter.RayParams
构建为 ray 内部通用的参数,然后判断该命令是否为头节点启动命令。 - 如果为头节点,则第一步对参数信息进行相关校验,重点逻辑为:头节点只允许自行创建 redis 服务,不允许设置 redis_address.校验后将信息和 autoscaling_config(用于自动扩容相关配置,指向自动扩容相关 yaml 文件的路径)更新到 ray_params 中。参数更新完毕以后,第二步通过
node = ray.node.Node(ray_params, head=True, shutdown_at_exit=block, spawn_reaper=block)
启动头节点。第三步将结果输出到命令行中。 - 如果不为头节点,则第一步对参数信息进行相关校验,然后等待连接 redis 服务并创建客户端 (该步中创建客户端仅为校验同节点是否启动多个 worker).第二步通过
ray.node.Node(ray_params, head=False, shutdown_at_exit=block, spawn_reaper=block)
启动工作节点。第三步将结果输出到命令行中。 - 随后如果命令行中带有
--block
参数,继续等待命令输入。
1.1.2 node.py
start
函数所有节点最后都利用 ray.node.Node
进行启动,该脚本位于 python/ray/node.py
. 构建函数在 44 行 __init__
中。
- 首先从
ray_params
获取配置,并进行相关校验,配置 plasma_store. - 若为 head 节点,则调用
self.start_head_processes()
启动进程,然后创建redis_client
并存储 head 节点session_name
,session_dir
,temp_dir
等信息。 - 调用
self.start_ray_processes()
启动进程。
start_head_processes
函数在 python/ray/node.py
的 795 行,该函数内部通过 self.start_redis()
,self.start_gcs_server()
,self.start_monitor()
,self.start_dashboard(require_dashboard)
四步启动对应进程。
self.start_redis()
最终调用 ray.services.start_redis
启动 redis.
self.start_gcs_server()
最终调用 ray.services.start_gcs_server
启动 GCS_SERVER.
self.start_monitor()
最终调用 ray.services.start_monitor
启动监控进程,该进程同时负责自动扩容工作。
self.start_dashboard(require_dashboard)
最终调用 ray.services.start_dashboard
启动监控面板。
start_ray_processes
函数在 python/ray/node.py
的 812 行,该函数内部通过 self.start_plasma_store(plasma_directory, object_store_memory)
,self.start_raylet(plasma_directory, object_store_memory)
,可选 self.start_reporter()
,可选 self.start_log_monitor()
四步启动对应进程。
self.start_plasma_store(plasma_directory, object_store_memory)
最终调用 ray.services.start_plasma_store
启动内存共享服务。
self.start_raylet(plasma_directory, object_store_memory)
最终调用 ray.services.start_raylet
启动 raylet 进程。
1.1.3 services.py
所有启动命令,最后都通过 ray.services
相关命令启动,该类函数均位于 python/ray/services.py
中。启动分为三种,一种时直接调用 C++ 后端进行启动,这里包括 GCS,Raylet,plasma 等;一种是调用 python 脚本,这里包括 dashboard,monitor 等;一种是直接调用第三方组件启动,包括 redis 等。
三种启动除了执行路径不尽相同,其他过程大同小异。本文依托位于 1173 行的 start_gcs_server
函数继续分析。
- 第一步根据传入参数构建
command
,命令的第一个参数为对应可执行文件路径,大部分定义在了services.py
的开头,GCS_SERVER 相关的 GCS_SERVER_EXECUTABLE 路径为core/src/ray/gcs/gcs_server
;部分启动过程仍依靠相关 python 文件和第三方库,不涉及 c++,启动路径直接卸写在数中。 - 第二步通过
start_ray_process
函数执行命令,该函数首先判断环境变量中是否还有对应配置并修改对应参数,最后通过ConsolePopen
执行启动命令。
1.1.4 monitor.py
self.start_monitor()
监控进程启动时所设置的文件路径为 python/ray/monitor.py
.
在 if __name__ == "__main__":
中,读取参数并构建 monitor,随后执行 run 方法启动监控进程。
在 __init__
函数中,具体执行过程如下:
-
第一步初始化状态存储 (目前后端只有 redis 客户端).
python12345# Initialize the Redis clients. ray.state.state._initialize_global_state( redis_address, redis_password=redis_password) self.redis = ray.services.create_redis_client( redis_address, password=redis_password)
-
第二步激活订阅服务,用于获取集群信息。
-
第三步判断是否传入自动扩容文件路径
autoscaling_config
,传入则额外配置标准自动扩容服务autoscaler = StandardAutoscaler(autoscaling_config, self.load_metrics)
在 _run(self)
函数中,具体执行过程如下:
-
订阅消息
python123456self.psubscribe(ray.gcs_utils.XRAY_HEARTBEAT_BATCH_PATTERN) self.psubscribe(ray.gcs_utils.XRAY_JOB_PATTERN) if self.autoscaler: self.subscribe( ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL)
-
循环执行自动扩容命令和消息处理。
python123456789101112131415# Handle messages from the subscription channels. while True: # Process autoscaling actions if self.autoscaler: # Only used to update the load metrics for the autoscaler. self.update_raylet_map() self.autoscaler.update() # Process a round of messages. self.process_messages() # Wait for a heartbeat interval before processing the next round of # messages. time.sleep( ray._config.raylet_heartbeat_timeout_milliseconds() * 1e-3)
1.1.4.1 autoscaler.py
自动扩容脚本路径为 python/ray/autoscaler/autoscaler.py
. 该脚本在创建过程中首先会 通过self.reset(errors_fatal=True)
调用 node_provider
的 get_node_provider
创建 provider
句柄;然后会启动 node_launcher
,并将句柄 provider
和自动扩容队列 queue
以及其他信息传入其中,随后在 autoscaler.update()
的自动扩容过程中,就是正常的读取对比配置,然后调用 provider
移除节点,通过 launch_new_node
将需要启动的节点放到自动扩容队列中,由 node_launcher
周期执行扩容。
1.1.4.2 node_launcher.py
node_launcher
的路径为 python/ray/autoscaler/node_launcher.py
. 该脚本继承自 threading.Thread
会将自己注册为进程,周期性检查队列,并判断执行 self.provider.create_node(node_config, node_tags, count)
.
1.1.4.2 node_provider.py
node_provider.py
为抽象脚本,路径为 python/ray/autoscaler/node_provider.py
. get_node_provider
会根据传入 provider_config["type"]
的不同,构建不同的 importer.目前支持如下:
python123456789NODE_PROVIDERS = { "local": import_local, "aws": import_aws, "gcp": import_gcp, "azure": import_azure, "kubernetes": import_kubernetes, "docker": None, "external": import_external # Import an external module }
1.2 C++ 部分
ray 中,依托 C++ 代码创建的服务有 gcs,raylet 和 plasma.其中 plasma 早期继承在 ray 中,后期孵化为独立项目。
1.2.1 gcs_server
首先根据 python 文件中 GCS_SERVER_EXECUTABLE 配置搜寻编译工程的 BUILD.bazel
文件,得到如下配置。
bazel12345copy_to_workspace( name = "cp_gcs_server", srcs = [":gcs_server"], dstdir = "python/ray/core/src/ray/gcs", )
然后根据配置中的 srcs 项找对应编译配置,如下。可以断定 c++ 部分源码入口为 src/ray/gcs/gcs_server/gcs_server_main.cc
.
cc12345678910111213cc_binary( name = "gcs_server", srcs = [ "src/ray/gcs/gcs_server/gcs_server_main.cc", ], copts = COPTS, visibility = ["//java:__subpackages__"], deps = [ ":gcs_server_lib", ":stats_lib", "@com_github_gflags_gflags//:gflags", ], )
在对应的 gcs_server_main.cc
的 main 函数中,函数执行过程如下:
- 通过
gflags
工具对传入的参数进行预处理,读取参数并配置到config_map
中。 - 通过
ray::stats::Init
启动监控进程 - 然后构建通信进程
boost::asio::io_service::work work(main_service)
- 紧跟着构建 GCS_SERVER 参数
gcs_server_config
并通过ray::gcs::GcsServer gcs_server(gcs_server_config, main_service)
创建 GCS_SERVER 并注册关闭句柄 - 通过
gcs_server.Start()
启动 gcs_server 并通过main_service.run()
保持通信服务工作。
在 src/ray/gcs/gcs_server/gcs_server.cc
中,void GcsServer::Start()
函数执行以下过程:
- 第一步通过
InitBackendClient()
初始化后端服务,目前仅有 redis; - 第二步在 redis 中注册订阅服务
std::make_shared<gcs::GcsPubSub>
和存储服务std::make_shared<gcs::RedisGcsTableStorage>
; - 第三步初始化 GCS 节点管理服务
InitGcsNodeManager()
和 GCS 分组服务InitGcsPlacementGroupManager()
; - 第四步注册各类 RPC 服务,用于其他进程和 GCS 通信。
1.2.2 raylet
同理分析,raylet 最终的源码入口路径为 src/ray/raylet/main.cc
的 main 函数,其执行过程如下:
- 通过
gflags
工具对传入的参数进行预处理。 - 构建通信进程
boost::asio::io_service::work work(main_service)
- 构建
GcsClientOptions
并通过std::make_shared<ray::gcs::ServiceBasedGcsClient>(client_options)
启动 gcs_client. - 若为 head 节点,则首先通过
gcs_client->Nodes().AsyncSetInternalConfig(raylet_config)
将配置信息写入 gcs.raylet_config
配置来源为用户传入的config_list
. - 在异步读取过程中通过 raylet_config 初始化
RayConfig
;从RayConfig
中获取信息创建node_manager_config
和object_manager_config
;初始化监控ray::stats::Init(global_tags, metrics_agent_port)
和 raylet 节点server.reset(new ray::raylet::Raylet( main_service, raylet_socket_name, node_ip_address, redis_address,redis_port,redis_password, node_manager_config, object_manager_config, gcs_client, metrics_export_port))
;最后启动 rayletserver->Start()
- 注册 raylet 关闭句柄
signals.async_wait(handler)
启动通信进程main_service.run()
.
1.2.3 plasma_store
同理分析,raylet 最终的源码入口路径为 src/ray/plasma/store_exec.cc
的 main 函数核心调用如下
cc123456plasma::plasma_store_runner.reset( new plasma::PlasmaStoreRunner(socket_name, system_memory, hugepages_enabled, plasma_directory, external_store_endpoint)); plasma::plasma_store_runner->Start(); plasma::plasma_store_runner.reset();
在 src/ray/object_manager/plasma/store_runner.cc
的 Start()
函数中,直接调用了第三方组件 plasma::ExternalStores::ExtractStoreName(external_store_endpoint_, &name))
创建存储环境。
二、总结
- 通过对 ray 的启动过程
scripts.py
446 行的分析,可以确定 ray 目前仅支持 redis 作为 gcs 后端并且必须由 ray 自行在 head 上启动,无法指定自己启动的 redis 集群。 - ray 在启动 raylet 时,若为头节点,会首先将
config_list
数据直接存储在 redis 中,然后所有节点复用读取 redis 数据并创建 raylet 的操作,所有 head 节点的config_list
配置会同步到所有 worker 节点中。 - ray 在 python 脚本阶段也与 redis 进行了交互,后续修改存储后端时,需要将 python 脚本相关文件也进行修改。
- ray 的自动扩容仅发生在传入
autoscaling_config
参数的条件下,该参数执指向一个 yaml 配置文件。相关配置模板在python/ray/autoscaler
中。 - 通过
ray up
启动时,从 调用顺序python/ray/autoscaler/commands.py
,python/ray/autoscaler/updater.py
来看,会将自动扩容配置以ray_bootstrap_config.yaml
写入云服务器中。