一、概述

Ray 是一个高性能的分布式执行引擎,开源的人工智能框架,目标之一在于:让开发者可以用一个运行在笔记本电脑上的原型算法,仅需添加数行代码就能轻松转为适合于计算机集群运行的(或单个多核心计算机的)高性能分布式应用.这样的框架需要包含手动优化系统的性能优势,同时又不需要用户关心那些调度、数据传输和硬件错误等问题.(摘自oschina-Ray 分布式执行引擎

本文分析基于 Ray 官方 1.0.0-rc2 版本.

二、过程分析

2.1 python 部分

2.1.1 scripts.py

Ray 中,命令行启动的起点对应的是 python/ray/scripts/scripts.py .该脚本中提供了对所有 ray 命令的执行入口.

以最核心的 ray start 进行分析. 该命令对应脚本中的 358 行 start 函数.

  • 通过 @click.option 添加对各种命令信息的支持,经过处理以后,将数据通过 ray_params = ray.parameter.RayParams 构建为 ray 内部通用的参数,然后判断该命令是否为头节点启动命令.
  • 如果为头节点,则第一步对参数信息进行相关校验,重点逻辑为:头节点只允许自行创建 redis 服务,不允许设置 redis_address.校验后将信息和 autoscaling_config(用于自动扩容相关配置,指向自动扩容相关 yaml 文件的路径)更新到 ray_params 中.参数更新完毕以后,第二步通过 node = ray.node.Node(ray_params, head=True, shutdown_at_exit=block, spawn_reaper=block)启动头节点.第三步将结果输出到命令行中.
  • 如果不为头节点,则第一步对参数信息进行相关校验,然后等待连接 redis 服务并创建客户端(该步中创建客户端仅为校验同节点是否启动多个 worker).第二步通过 ray.node.Node(ray_params, head=False, shutdown_at_exit=block, spawn_reaper=block) 启动工作节点.第三步将结果输出到命令行中.
  • 随后如果命令行中带有 --block 参数,继续等待命令输入.

2.1.2 node.py

start 函数所有节点最后都利用 ray.node.Node 进行启动,该脚本位于 python/ray/node.py. 构建函数在 44 行 __init__ 中.

  • 首先从 ray_params 获取配置,并进行相关校验,配置 plasma_store.
  • 若为 head 节点,则调用 self.start_head_processes() 启动进程,然后创建 redis_client 并存储 head 节点 session_name,session_dir,temp_dir等信息.
  • 调用 self.start_ray_processes()启动进程.

start_head_processes函数在 python/ray/node.py 的 795 行,该函数内部通过 self.start_redis(),self.start_gcs_server(),self.start_monitor(),self.start_dashboard(require_dashboard) 四步启动对应进程.

self.start_redis() 最终调用 ray.services.start_redis 启动 redis.
self.start_gcs_server() 最终调用 ray.services.start_gcs_server 启动 GCS_SERVER.
self.start_monitor() 最终调用 ray.services.start_monitor启动监控进程,该进程同时负责自动扩容工作.
self.start_dashboard(require_dashboard) 最终调用 ray.services.start_dashboard 启动监控面板.


start_ray_processes函数在 python/ray/node.py 的 812 行,该函数内部通过 self.start_plasma_store(plasma_directory, object_store_memory),self.start_raylet(plasma_directory, object_store_memory),可选 self.start_reporter(),可选 self.start_log_monitor() 四步启动对应进程.

self.start_plasma_store(plasma_directory, object_store_memory) 最终调用 ray.services.start_plasma_store启动内存共享服务.
self.start_raylet(plasma_directory, object_store_memory) 最终调用 ray.services.start_raylet启动 raylet 进程.

2.1.3 services.py

所有启动命令,最后都通过 ray.services 相关命令启动,该类函数均位于 python/ray/services.py 中.启动分为三种,一种时直接调用 C++后端进行启动,这里包括 GCS,Raylet,plasma 等;一种是调用 python 脚本,这里包括 dashboard,monitor 等;一种是直接调用第三方组件启动,包括 redis 等.

三种启动除了执行路径不尽相同,其他过程大同小异.本文依托位于 1173 行的 start_gcs_server函数继续分析.

  • 第一步根据传入参数构建 command ,命令的第一个参数为对应可执行文件路径,大部分定义在了 services.py 的开头,GCS_SERVER 相关的 GCS_SERVER_EXECUTABLE 路径为 core/src/ray/gcs/gcs_server;部分启动过程仍依靠相关 python 文件和第三方库,不涉及 c++,启动路径直接卸写在数中.
  • 第二步通过 start_ray_process 函数执行命令,该函数首先判断环境变量中是否还有对应配置并修改对应参数,最后通过 ConsolePopen 执行启动命令.

2.1.4 monitor.py

self.start_monitor() 监控进程启动时所设置的文件路径为 python/ray/monitor.py.

if __name__ == "__main__": 中,读取参数并构建 monitor,随后执行 run 方法启动监控进程.

__init__ 函数中,具体执行过程如下:

  • 第一步初始化状态存储(目前后端只有 redis 客户端).

    1
    2
    3
    4
    5
    # Initialize the Redis clients.
    ray.state.state._initialize_global_state(
    redis_address, redis_password=redis_password)
    self.redis = ray.services.create_redis_client(
    redis_address, password=redis_password)
  • 第二步激活订阅服务,用于获取集群信息.

  • 第三步判断是否传入自动扩容文件路径 autoscaling_config ,传入则额外配置标准自动扩容服务 autoscaler = StandardAutoscaler(autoscaling_config, self.load_metrics)

_run(self)函数中,具体执行过程如下:

  • 订阅消息

    1
    2
    3
    4
    5
    6
    self.psubscribe(ray.gcs_utils.XRAY_HEARTBEAT_BATCH_PATTERN)
    self.psubscribe(ray.gcs_utils.XRAY_JOB_PATTERN)

    if self.autoscaler:
    self.subscribe(
    ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL)
  • 循环执行自动扩容命令和消息处理.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    # Handle messages from the subscription channels.
    while True:
    # Process autoscaling actions
    if self.autoscaler:
    # Only used to update the load metrics for the autoscaler.
    self.update_raylet_map()
    self.autoscaler.update()

    # Process a round of messages.
    self.process_messages()

    # Wait for a heartbeat interval before processing the next round of
    # messages.
    time.sleep(
    ray._config.raylet_heartbeat_timeout_milliseconds() * 1e-3)
2.1.4.1 autoscaler.py

自动扩容脚本路径为 python/ray/autoscaler/autoscaler.py. 该脚本在创建过程中首先会 通过self.reset(errors_fatal=True) 调用 node_providerget_node_provider 创建 provider 句柄;然后会启动 node_launcher,并将句柄 provider 和自动扩容队列 queue 以及其他信息传入其中,随后在 autoscaler.update() 的自动扩容过程中,就是正常的读取对比配置,然后调用 provider移除节点,通过 launch_new_node 将需要启动的节点放到自动扩容队列中,由 node_launcher 周期执行扩容.

2.1.4.2 node_launcher.py

node_launcher 的路径为 python/ray/autoscaler/node_launcher.py . 该脚本继承自 threading.Thread 会将自己注册为进程,周期性检查队列,并判断执行 self.provider.create_node(node_config, node_tags, count) .

2.1.4.2 node_provider.py

node_provider.py 为抽象脚本,路径为 python/ray/autoscaler/node_provider.py. get_node_provider 会根据传入 provider_config["type"] 的不同,构建不同的 importer.目前支持如下:

1
2
3
4
5
6
7
8
9
NODE_PROVIDERS = {
"local": import_local,
"aws": import_aws,
"gcp": import_gcp,
"azure": import_azure,
"kubernetes": import_kubernetes,
"docker": None,
"external": import_external # Import an external module
}

2.2 C++部分

ray 中,依托 C++代码创建的服务有 gcs,raylet 和 plasma.其中 plasma 早期继承在 ray 中,后期孵化为独立项目.

2.2.1 gcs_server

首先根据 python 文件中 GCS_SERVER_EXECUTABLE 配置搜寻编译工程的 BUILD.bazel 文件,得到如下配置.

1
2
3
4
5
copy_to_workspace(
name = "cp_gcs_server",
srcs = [":gcs_server"],
dstdir = "python/ray/core/src/ray/gcs",
)

然后根据配置中的 srcs 项找对应编译配置,如下.可以断定 c++部分源码入口为 src/ray/gcs/gcs_server/gcs_server_main.cc.

1
2
3
4
5
6
7
8
9
10
11
12
13
cc_binary(
name = "gcs_server",
srcs = [
"src/ray/gcs/gcs_server/gcs_server_main.cc",
],
copts = COPTS,
visibility = ["//java:__subpackages__"],
deps = [
":gcs_server_lib",
":stats_lib",
"@com_github_gflags_gflags//:gflags",
],
)

在对应的 gcs_server_main.cc的 main 函数中,函数执行过程如下:

  • 通过 gflags 工具对传入的参数进行预处理,读取参数并配置到 config_map 中.
  • 通过 ray::stats::Init 启动监控进程
  • 然后构建通信进程 boost::asio::io_service::work work(main_service)
  • 紧跟着构建 GCS_SERVER 参数 gcs_server_config 并通过 ray::gcs::GcsServer gcs_server(gcs_server_config, main_service) 创建 GCS_SERVER 并注册关闭句柄
  • 通过 gcs_server.Start() 启动 gcs_server 并通过 main_service.run()保持通信服务工作.

src/ray/gcs/gcs_server/gcs_server.cc 中, void GcsServer::Start()函数执行以下过程:

  • 第一步通过 InitBackendClient() 初始化后端服务,目前仅有 redis;
  • 第二步在 redis 中注册订阅服务 std::make_shared<gcs::GcsPubSub> 和存储服务 std::make_shared<gcs::RedisGcsTableStorage>;
  • 第三步初始化 GCS 节点管理服务 InitGcsNodeManager()和 GCS 分组服务 InitGcsPlacementGroupManager();
  • 第四步注册各类 RPC 服务,用于其他进程和 GCS 通信.

2.2.2 raylet

同理分析,raylet 最终的源码入口路径为 src/ray/raylet/main.cc 的 main 函数,其执行过程如下:

  • 通过 gflags 工具对传入的参数进行预处理.
  • 构建通信进程 boost::asio::io_service::work work(main_service)
  • 构建 GcsClientOptions 并通过 std::make_shared<ray::gcs::ServiceBasedGcsClient>(client_options) 启动 gcs_client.
  • 若为 head 节点,则首先通过 gcs_client->Nodes().AsyncSetInternalConfig(raylet_config) 将配置信息写入 gcs. raylet_config 配置来源为用户传入的 config_list.
  • 在异步读取过程中通过 raylet_config 初始化 RayConfig;从 RayConfig 中获取信息创建 node_manager_configobject_manager_config ;初始化监控 ray::stats::Init(global_tags, metrics_agent_port) 和 raylet 节点 server.reset(new ray::raylet::Raylet( main_service, raylet_socket_name, node_ip_address, redis_address,redis_port,redis_password, node_manager_config, object_manager_config, gcs_client, metrics_export_port)) ;最后启动 raylet server->Start()
  • 注册 raylet 关闭句柄 signals.async_wait(handler) 启动通信进程 main_service.run() .

2.2.3 plasma_store

同理分析,raylet 最终的源码入口路径为 src/ray/plasma/store_exec.cc 的 main 函数核心调用如下

1
2
3
4
5
6
plasma::plasma_store_runner.reset(
new plasma::PlasmaStoreRunner(socket_name, system_memory, hugepages_enabled,
plasma_directory, external_store_endpoint));

plasma::plasma_store_runner->Start();
plasma::plasma_store_runner.reset();

src/ray/object_manager/plasma/store_runner.ccStart() 函数中,直接调用了第三方组件 plasma::ExternalStores::ExtractStoreName(external_store_endpoint_, &name)) 创建存储环境.

三、总结

  • 通过对 ray 的启动过程 scripts.py 446 行的分析,可以确定 ray 目前仅支持 redis 作为 gcs 后端并且必须由 ray 自行在 head 上启动,无法指定自己启动的 redis 集群.
  • ray 在启动 raylet 时,若为头节点,会首先将 config_list 数据直接存储在 redis 中,然后所有节点复用读取 redis 数据并创建 raylet 的操作,所有 head 节点的config_list配置会同步到所有 worker 节点中.
  • ray 在 python 脚本阶段也与 redis 进行了交互,后续修改存储后端时,需要将 python 脚本相关文件也进行修改.
  • ray 的自动扩容仅发生在传入 autoscaling_config 参数的条件下,该参数执指向一个 yaml 配置文件.相关配置模板在 python/ray/autoscaler 中.
  • 通过 ray up 启动时,从 调用顺序 python/ray/autoscaler/commands.py , python/ray/autoscaler/updater.py 来看,会将自动扩容配置以 ray_bootstrap_config.yaml 写入云服务器中.