Ray 学习随笔(3)ray-streaming-state 源码阅读
创建于 2021-06-03
更新于 2021-06-03
科技
ray

前言

Ray 源码分析系列笔记。

一、概述

Ray 学习随笔(1)启动过程源码阅读对 `ray` 本身的启动流程进程了分析,Ray 学习随笔(2)ray-streaming 源码阅读对 `ray-streaming` 的启动过程进行了分析。由于 `ray-streaming-state` 并没有被实际使用,所以前文并没有进行深入阅读。本篇具体阅读学习相关代码和业务逻辑。

二、Java 源码分析

2.1 目录结构和文件功能

整个 state 源码包的目录结构如下所示。

bash
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
state ├── KeyValueState.java -> key-value类型状态存储接口,提供get和put方法. ├── PartitionRecord.java -> 分区值记录,包含分区ID和具体值. ├── StateException.java -> 状态异常. ├── StateStoreManager.java -> 状态存储事务接口. ├── StorageRecord.java -> 检查点记录,包含检查点ID和具体值. ├── backend │ ├── AbstractKeyStateBackend.java -> 将状态存储事务接口进行实现,通过代理提供value/list/map三种类型的状态存储,通过put,get访问代理获取数据(需要状态描述确定存储表),通过代理实现事务基本调用,具体实现靠strage. keyGroupIndex,currentKey,currentCheckpointId作用未知 │ ├── AbstractStateBackend.java -> 状态后端的父类抽象,通过config构建状态策略,存储后端类型为keyvalue和keymap. keyGroupIndex作用未知 │ ├── BackendType.java -> 状态后端类型,目前只提供memory.(getEnum传入config配置,但逻辑上只返回memory类型,无视config配置). │ ├── KeyStateBackend.java -> 状态存储事务接口AbstractKeyStateBackend的代理部分具体实现,提供value/list/map三种类型代理构建. numberOfKeyGroups,keyGroup作用未知 │ ├── OperatorStateBackend.java -> 状态存储事务接口AbstractKeyStateBackend的代理部分具体实现,仅适用于操作符级别数据存储,只提供list类型代理构建,SplitList和UnionList的区别在于是否对代理标识符.setSplit(true)(Split代理没有调用init,会在get/put时触发init,逻辑上union也不需要init了) │ ├── StateBackendBuilder.java -> 提供状态后端的具体构建, │ ├── StateStrategy.java -> 状态保存模型,DUAL_VERSION允许回滚,SINGLE_VERSION只保留当前版本. │ └── impl │ └── MemoryStateBackend.java -> 内存状态后端的具体实现,提供三种类型存储的构建代理,具体构建函数在store中,直接忽略tableName属性和序列化属性,目测这些是用来个关系型存储后端确定表名和数据序列化用的. ├── config │ ├── ConfigHelper.java -> │ └── ConfigKey.java -> 状态存储config键值存储,允许设置存储后端类型,表名,策略,NUMBER_PER_CHECKPOINT(默认值5,作用未知) ├── keystate │ ├── KeyGroup.java -> 键值组,包含分组键值起点和终点.所有键组的分组最大值默认情况下为作业的最大并行度. │ ├── KeyGroupAssignment.java -> 键值组调度算法,getKeyGroup通过作业最大并行度,task并行度,task索引构建keygroup. assignKeyGroupIndexForKey通过键值哈希和键组数量判断键值所属keygroup索引.computeKeyGroupToTask计算每个算子不同分组id对应的子任务id. │ ├── desc -> 所有描述都需要提供数据类型,但没有进行利用,无法进行数据序列化和反序列化.(后端支持不完全,mem后端不需要关注序列化问题,所以只预留接口,没进行实现) │ │ ├── AbstractStateDescriptor.java -> 状态描述抽象,包含状态名称,状态表名(目前无用),状态类型(内存后端无用).状态名称通过getIdentify对其他调用提供身份名. │ │ ├── ListStateDescriptor.java -> list状态描述实现,需要提供value类型,提供状态build函数,可选设置ListState的partitionNum和index. │ │ ├── MapStateDescriptor.java -> map状态描述实现,需要提供key和value的键值类型. │ │ └── ValueStateDescriptor.java -> value状态描述实现,需要设置默认值和value类型. │ └── state │ ├── ListState.java -> 继承自一元状态接口,将单一value值改为list类型,进而提供对list的修改函数add,update. │ ├── MapState.java -> 继承自一元状态接口,将单一value值改为map类型,进而提供map相关操作. │ ├── State.java -> 状态接口,提供setCurrentKey函数(作用未知,猜测设置状态key值) │ ├── UnaryState.java -> 一元状态接口,每个状态仅包含一个value值,只提供get │ ├── ValueState.java -> 继承自一元状态接口,允许修改状态value值,提供update │ ├── impl │ │ ├── ListStateImpl.java -> ListState的实现,通过StateHelper实现存储. │ │ ├── MapStateImpl.java -> MapState的实现,通过StateHelper实现存储. │ │ ├── OperatorStateImpl.java -> ListState关于操作符的实现,通过StateHelper实现存储.init操作会执行scan扫描存储后端是否有对应数据,目测用于恢复数据,然后执行splitList,将数据切分成PartitionRecord,追加在覆盖原有数据上,操作符状态key值会在基本描述的基础上加入并行度和子任务索引.(splitList和scan逻辑需要仔细梳理) │ │ ├── StateHelper.java -> 状态辅助类,成员变量AbstractKeyStateBackend用于最终存储数据,成员变量AbstractStateDescriptor用于存储状态描述 │ │ └── ValueStateImpl.java -> ValueState的实现,通过StateHelper实现存储. │ └── proxy │ ├── ListStateStoreManagerProxy.java -> 继承自StateStoreManagerProxy,对ListState进行代理包装,对接状态和后端,通过判断stateDescriptor.isOperatorList()提供MapStateImpl和OperatorStateImpl │ ├── MapStateStoreManagerProxy.java -> 继承自StateStoreManagerProxy,对MapState进行代理包装,对接状态和后端,只提供MapStateImpl │ └── ValueStateStoreManagerProxy.java -> 继承自StateStoreManagerProxy,对ValueState进行代理包装,对接状态和后端,只提供ValueStateImpl ├── serialization │ ├── KeyMapStoreSerializer.java -> keymap序列化和反序列化接口 │ ├── KeyValueStoreSerialization.java -> keyvalue序列化和反序列化接口 │ ├── Serializer.java -> FST序列化框架封装 │ └── impl │ ├── AbstractSerialization.java -> 序列化器抽象,generateRowKeyPrefix函数生成数据的row key,函数名应该改为generateRowKeyWithPrefix │ ├── DefaultKeyMapStoreSerializer.java -> key提供keyWithPrefix序列化, map Key直接序列化,map value直接序列化,key未提供反序列化. │ └── DefaultKeyValueStoreSerialization.java -> key只提供keyWithPrefix的序列化,value直接序列化,key未提供反序列化. ├── store │ ├── KeyMapStore.java -> keymap存储接口继承自KeyValue存储接口, │ ├── KeyValueStore.java -> KeyValue存储接口,数据读写外提供用于刷写到硬盘的flush函数,清理缓存的clearCache,结束存储的close │ └── impl -> 内存实现均利用newConcurrentMap,map子类型为hashmap. │ ├── MemoryKeyMapStore.java -> keymap存储的内存实现,可以直接存map,也可以存具体值.删除只能以map为目标,不支持map value删除 │ └── MemoryKeyValueStore.java -> keyvalue存储的内存实现,直接存值. └── strategy ├── AbstractStateStoreManager.java -> 事务管理抽象,内置可读写的frontStore,远程存储的kvStore,只读的middleStore. ├── DualStateStoreManager.java -> ├── MVStateStoreManager.java -> 多版本存储策略实现,利用外部存储mvcc └── StateStoreManagerProxy.java -> 状态存储代理,利用的后端由AbstractKeyStateBackend提供,目前只有mem

2.2 重点参数研究

keyGroupIndex 数据存储相关,目前无用

  • AbstractKeyStateBackend 的 getKeyGroupIndex 在 StateStoreManagerProxy 被调用,修改 stateStrategy 对应值,用在 get 和 put 前,但是后续实现中没有操作该对象。
  • AbstractKeyStateBackend 的 setKeyGroupIndex 在 StateHelper 中被调用,调用父方法 setKeyGroupIndex 和 resetKeyGroupIndex 未被调用。
  • KeyStateBackend 的 setCurrentKey 方法会根据 key 信息直接修改该值。
  • AbstractStateBackend 的 setKeyGroupIndex 目前无调用
  • AbstractStateStoreManager 的 setKeyGroupIndex 在 StateStoreManagerProxy 被调用,父方法 get 和 put 后续实现中没有操作该对象。父方法 setKeyGroupIndex 未被调用

currentCheckpointId 检查点相关

  • AbstractKeyStateBackend 的 setCheckpointId 在 runtime/worker/context/StreamingRuntimeContext.java 中被调用,调用父类的 setCheckpointId 未被调用,状态检查点目前应该没有实现
  • AbstractKeyStateBackend 的 setCheckpointId 在同文件 setContext 方法中被调用,调用父类的 setContext 未被调用
  • AbstractKeyStateBackend 的 getCheckpointId 在 StateStoreManagerProxy 被调用,用在 put 和 get 操作中,put 直接根据检查点构建存储记录 StorageRecord,get 在 daul,mv 中有不同实现。

currentKey 数据查询相关

  • AbstractKeyStateBackend 的 getCurrentKey 在 StateHelper 中被 getStateKey 调用,将 descName 和 currentKey 组合,构建存储键值,descName 通过 descriptor.getIdentify(),具体值为 descriptor 的 name 变量。
  • AbstractKeyStateBackend 的 setCurrentKey 在 runtime/worker/context/StreamingRuntimeContext.java 中被调用,父函数 setCurrentKey 未见调用
  • AbstractKeyStateBackend 的 setCurrentKey 在同文件 setContext 方法中被调用,调用父类的 setContext 未被调用
  • AbstractKeyStateBackend 的 setCurrentKey 在 StateHelper 中被调用,调用方法 setCurrentKey 在 stateimpl 中被调用,后续应为用户调用
  • AbstractKeyStateBackend 的 setCurrentKey 在 KeyStateBackend 中被实现,在赋值前会修改 KeyGroupIndex(KeyGroupAssignment.assignKeyGroupIndexForKey(currentKey, numberOfKeyGroups)) 策略为直接 hash 取模
  • AbstractKeyStateBackend 的 setCurrentKey 在 OperatorStateBackend 中被实现,直接赋值
    currentKey 切换会造成同样查询结果不同,currentKey 参与 StateKey 构建.descriptor 的 name 同样参与构建。
    currentKey 应该与操作符或 task 绑定,descriptor 的 name 可以参与用户 state 绑定或 window 绑定,不同 window 不同 descriptor.

numberOfKeyGroups

  • KeyStateBackend 的 numberOfKeyGroups 在构建时被赋值,目前只在 test 中进行构建。
  • KeyStateBackend 的 numberOfKeyGroups 在 setCurrentKey 时被调用,根据调用情况来看,该变量应该为当前操作符并行度。
  • KeyStateBackend 的 getNumberOfKeyGroups 未见调用

keyGroup

  • KeyStateBackend 的 keyGroup 在构建时被赋值,目前只在 test 中进行构建。
  • KeyStateBackend 的 getkeyGroup 未见调用

2.3 调用逻辑

整个 state 对外暴露的后端接口为 OperatorStateBackendKeyStateBackend ,其中后者源码注释中表示应该只用于键值类型数据流,但是目前计算系统中未进行明细判断。后端的创建由 StateBackendBuilder 提供。在状态后端创建完成以后,两种后端对外暴露的存储类型不尽相同。当执行 commit 事务时,会将存储信息持久化。获取具体的装填后端存储时,需要传入 Descriptor 进行辅助。

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/** * TransactionState interface. * <p> * Streaming State should implement transaction in case of failure, which in our case is four * default method, finish, commit, ackCommit, rollback. */ public interface StateStoreManager { /** * The finish method is used when the batched data is all saved in state. Normally, serialization * job is done here. */ void finish(long checkpointId); /** * The commit method is used for persistent, and can be used in another thread to reach async * state commit. Normally, data persistent is done here. */ void commit(long checkpointId); /** * The ackCommit method is used for cleaning the last checkpoint, and must be called after commit * in the same thread. */ void ackCommit(long checkpointId, long timeStamp); /** * The rollback method is used for recovering the checkpoint. */ void rollBack(long checkpointId); }

三、总结

  • state 已经提供了完整的状态支持,ray-streaming 后续发展中应该会将 runtime 中的 content 整体替换成 state 中的 backend.
  • state 目前仅支持内存作为状态存储后端,处于实验性阶段。后续如果对代码进行实际使用,应该提供其他可用后端。

参考

本文作者: 有次元袋的 tiger
本文链接: https://www.superheaoz.top/2021/06/3687/
版权声明: 本站点所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 我的个人天地