一、概述

Ray学习随笔(1)启动过程源码阅读对 ray 本身的启动流程进程了分析,Ray学习随笔(2)ray-streaming源码阅读对 ray-streaming 的启动过程进行了分析.由于 ray-streaming-state 并没有被实际使用,所以前文并没有进行深入阅读.本篇具体阅读学习相关代码和业务逻辑.

二、Java 源码分析

2.1 目录结构和文件功能

整个 state 源码包的目录结构如下所示.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
state
├── KeyValueState.java -> key-value类型状态存储接口,提供get和put方法.
├── PartitionRecord.java -> 分区值记录,包含分区ID和具体值.
├── StateException.java -> 状态异常.
├── StateStoreManager.java -> 状态存储事务接口.
├── StorageRecord.java -> 检查点记录,包含检查点ID和具体值.
├── backend
│ ├── AbstractKeyStateBackend.java -> 将状态存储事务接口进行实现,通过代理提供value/list/map三种类型的状态存储,通过put,get访问代理获取数据(需要状态描述确定存储表),通过代理实现事务基本调用,具体实现靠strage. keyGroupIndex,currentKey,currentCheckpointId作用未知
│ ├── AbstractStateBackend.java -> 状态后端的父类抽象,通过config构建状态策略,存储后端类型为keyvalue和keymap. keyGroupIndex作用未知
│ ├── BackendType.java -> 状态后端类型,目前只提供memory.(getEnum传入config配置,但逻辑上只返回memory类型,无视config配置).
│ ├── KeyStateBackend.java -> 状态存储事务接口AbstractKeyStateBackend的代理部分具体实现,提供value/list/map三种类型代理构建. numberOfKeyGroups,keyGroup作用未知
│ ├── OperatorStateBackend.java -> 状态存储事务接口AbstractKeyStateBackend的代理部分具体实现,仅适用于操作符级别数据存储,只提供list类型代理构建,SplitList和UnionList的区别在于是否对代理标识符.setSplit(true)(Split代理没有调用init,会在get/put时触发init,逻辑上union也不需要init了)
│ ├── StateBackendBuilder.java -> 提供状态后端的具体构建,
│ ├── StateStrategy.java -> 状态保存模型,DUAL_VERSION允许回滚,SINGLE_VERSION只保留当前版本.
│ └── impl
│ └── MemoryStateBackend.java -> 内存状态后端的具体实现,提供三种类型存储的构建代理,具体构建函数在store中,直接忽略tableName属性和序列化属性,目测这些是用来个关系型存储后端确定表名和数据序列化用的.
├── config
│ ├── ConfigHelper.java ->
│ └── ConfigKey.java -> 状态存储config键值存储,允许设置存储后端类型,表名,策略,NUMBER_PER_CHECKPOINT(默认值5,作用未知)
├── keystate
│ ├── KeyGroup.java -> 键值组,包含分组键值起点和终点.所有键组的分组最大值默认情况下为作业的最大并行度.
│ ├── KeyGroupAssignment.java -> 键值组调度算法,getKeyGroup通过作业最大并行度,task并行度,task索引构建keygroup. assignKeyGroupIndexForKey通过键值哈希和键组数量判断键值所属keygroup索引.computeKeyGroupToTask计算每个算子不同分组id对应的子任务id.
│ ├── desc -> 所有描述都需要提供数据类型,但没有进行利用,无法进行数据序列化和反序列化.(后端支持不完全,mem后端不需要关注序列化问题,所以只预留接口,没进行实现)
│ │ ├── AbstractStateDescriptor.java -> 状态描述抽象,包含状态名称,状态表名(目前无用),状态类型(内存后端无用).状态名称通过getIdentify对其他调用提供身份名.
│ │ ├── ListStateDescriptor.java -> list状态描述实现,需要提供value类型,提供状态build函数,可选设置ListState的partitionNum和index.
│ │ ├── MapStateDescriptor.java -> map状态描述实现,需要提供key和value的键值类型.
│ │ └── ValueStateDescriptor.java -> value状态描述实现,需要设置默认值和value类型.
│ └── state
│ ├── ListState.java -> 继承自一元状态接口,将单一value值改为list类型,进而提供对list的修改函数add,update.
│ ├── MapState.java -> 继承自一元状态接口,将单一value值改为map类型,进而提供map相关操作.
│ ├── State.java -> 状态接口,提供setCurrentKey函数(作用未知,猜测设置状态key值)
│ ├── UnaryState.java -> 一元状态接口,每个状态仅包含一个value值,只提供get
│ ├── ValueState.java -> 继承自一元状态接口,允许修改状态value值,提供update
│ ├── impl
│ │ ├── ListStateImpl.java -> ListState的实现,通过StateHelper实现存储.
│ │ ├── MapStateImpl.java -> MapState的实现,通过StateHelper实现存储.
│ │ ├── OperatorStateImpl.java -> ListState关于操作符的实现,通过StateHelper实现存储.init操作会执行scan扫描存储后端是否有对应数据,目测用于恢复数据,然后执行splitList,将数据切分成PartitionRecord,追加在覆盖原有数据上,操作符状态key值会在基本描述的基础上加入并行度和子任务索引.(splitList和scan逻辑需要仔细梳理)
│ │ ├── StateHelper.java -> 状态辅助类,成员变量AbstractKeyStateBackend用于最终存储数据,成员变量AbstractStateDescriptor用于存储状态描述
│ │ └── ValueStateImpl.java -> ValueState的实现,通过StateHelper实现存储.
│ └── proxy
│ ├── ListStateStoreManagerProxy.java -> 继承自StateStoreManagerProxy,对ListState进行代理包装,对接状态和后端,通过判断stateDescriptor.isOperatorList()提供MapStateImpl和OperatorStateImpl
│ ├── MapStateStoreManagerProxy.java -> 继承自StateStoreManagerProxy,对MapState进行代理包装,对接状态和后端,只提供MapStateImpl
│ └── ValueStateStoreManagerProxy.java -> 继承自StateStoreManagerProxy,对ValueState进行代理包装,对接状态和后端,只提供ValueStateImpl
├── serialization
│ ├── KeyMapStoreSerializer.java -> keymap序列化和反序列化接口
│ ├── KeyValueStoreSerialization.java -> keyvalue序列化和反序列化接口
│ ├── Serializer.java -> FST序列化框架封装
│ └── impl
│ ├── AbstractSerialization.java -> 序列化器抽象,generateRowKeyPrefix函数生成数据的row key,函数名应该改为generateRowKeyWithPrefix
│ ├── DefaultKeyMapStoreSerializer.java -> key提供keyWithPrefix序列化, map Key直接序列化,map value直接序列化,key未提供反序列化.
│ └── DefaultKeyValueStoreSerialization.java -> key只提供keyWithPrefix的序列化,value直接序列化,key未提供反序列化.
├── store
│ ├── KeyMapStore.java -> keymap存储接口继承自KeyValue存储接口,
│ ├── KeyValueStore.java -> KeyValue存储接口,数据读写外提供用于刷写到硬盘的flush函数,清理缓存的clearCache,结束存储的close
│ └── impl -> 内存实现均利用newConcurrentMap,map子类型为hashmap.
│ ├── MemoryKeyMapStore.java -> keymap存储的内存实现,可以直接存map,也可以存具体值.删除只能以map为目标,不支持map value删除
│ └── MemoryKeyValueStore.java -> keyvalue存储的内存实现,直接存值.
└── strategy
├── AbstractStateStoreManager.java -> 事务管理抽象,内置可读写的frontStore,远程存储的kvStore,只读的middleStore.
├── DualStateStoreManager.java ->
├── MVStateStoreManager.java -> 多版本存储策略实现,利用外部存储mvcc
└── StateStoreManagerProxy.java -> 状态存储代理,利用的后端由AbstractKeyStateBackend提供,目前只有mem

2.2 重点参数研究

keyGroupIndex 数据存储相关,目前无用

  • AbstractKeyStateBackend 的 getKeyGroupIndex 在 StateStoreManagerProxy 被调用,修改 stateStrategy 对应值,用在 get 和 put 前,但是后续实现中没有操作该对象.
  • AbstractKeyStateBackend 的 setKeyGroupIndex 在 StateHelper 中被调用,调用父方法 setKeyGroupIndex 和 resetKeyGroupIndex 未被调用.
  • KeyStateBackend 的 setCurrentKey 方法会根据 key 信息直接修改该值.
  • AbstractStateBackend 的 setKeyGroupIndex 目前无调用
  • AbstractStateStoreManager 的 setKeyGroupIndex 在 StateStoreManagerProxy 被调用,父方法 get 和 put 后续实现中没有操作该对象.父方法 setKeyGroupIndex 未被调用

currentCheckpointId 检查点相关

  • AbstractKeyStateBackend 的 setCheckpointId 在 runtime/worker/context/StreamingRuntimeContext.java 中被调用,调用父类的 setCheckpointId 未被调用,状态检查点目前应该没有实现
  • AbstractKeyStateBackend 的 setCheckpointId 在同文件 setContext 方法中被调用,调用父类的 setContext 未被调用
  • AbstractKeyStateBackend 的 getCheckpointId 在 StateStoreManagerProxy 被调用,用在 put 和 get 操作中,put 直接根据检查点构建存储记录 StorageRecord,get 在 daul,mv 中有不同实现.

currentKey 数据查询相关

  • AbstractKeyStateBackend 的 getCurrentKey 在 StateHelper 中被 getStateKey 调用,将 descName 和 currentKey 组合,构建存储键值,descName 通过 descriptor.getIdentify(),具体值为 descriptor 的 name 变量.
  • AbstractKeyStateBackend 的 setCurrentKey 在 runtime/worker/context/StreamingRuntimeContext.java 中被调用,父函数 setCurrentKey 未见调用
  • AbstractKeyStateBackend 的 setCurrentKey 在同文件 setContext 方法中被调用,调用父类的 setContext 未被调用
  • AbstractKeyStateBackend 的 setCurrentKey 在 StateHelper 中被调用,调用方法 setCurrentKey 在 stateimpl 中被调用,后续应为用户调用
  • AbstractKeyStateBackend 的 setCurrentKey 在 KeyStateBackend 中被实现,在赋值前会修改 KeyGroupIndex(KeyGroupAssignment.assignKeyGroupIndexForKey(currentKey, numberOfKeyGroups))策略为直接 hash 取模
  • AbstractKeyStateBackend 的 setCurrentKey 在 OperatorStateBackend 中被实现,直接赋值
    currentKey 切换会造成同样查询结果不同,currentKey 参与 StateKey 构建.descriptor 的 name 同样参与构建.
    currentKey 应该与操作符或 task 绑定,descriptor 的 name 可以参与用户 state 绑定或 window 绑定,不同 window 不同 descriptor.

numberOfKeyGroups

  • KeyStateBackend 的 numberOfKeyGroups 在构建时被赋值,目前只在 test 中进行构建.
  • KeyStateBackend 的 numberOfKeyGroups 在 setCurrentKey 时被调用,根据调用情况来看,该变量应该为当前操作符并行度.
  • KeyStateBackend 的 getNumberOfKeyGroups 未见调用

keyGroup

  • KeyStateBackend 的 keyGroup 在构建时被赋值,目前只在 test 中进行构建.
  • KeyStateBackend 的 getkeyGroup 未见调用

2.3 调用逻辑

整个 state 对外暴露的后端接口为 OperatorStateBackendKeyStateBackend ,其中后者源码注释中表示应该只用于键值类型数据流,但是目前计算系统中未进行明细判断.后端的创建由 StateBackendBuilder 提供.在状态后端创建完成以后,两种后端对外暴露的存储类型不尽相同.当执行 commit 事务时,会将存储信息持久化.获取具体的装填后端存储时,需要传入 Descriptor 进行辅助.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* TransactionState interface.
* <p>
* Streaming State should implement transaction in case of failure, which in our case is four
* default method, finish, commit, ackCommit, rollback.
*/
public interface StateStoreManager {

/**
* The finish method is used when the batched data is all saved in state. Normally, serialization
* job is done here.
*/
void finish(long checkpointId);

/**
* The commit method is used for persistent, and can be used in another thread to reach async
* state commit. Normally, data persistent is done here.
*/
void commit(long checkpointId);

/**
* The ackCommit method is used for cleaning the last checkpoint, and must be called after commit
* in the same thread.
*/
void ackCommit(long checkpointId, long timeStamp);

/**
* The rollback method is used for recovering the checkpoint.
*/
void rollBack(long checkpointId);
}

三、总结

  • state 已经提供了完整的状态支持,ray-streaming 后续发展中应该会将 runtime 中的 content 整体替换成 state 中的 backend.
  • state 目前仅支持内存作为状态存储后端,处于实验性阶段.后续如果对代码进行实际使用,应该提供其他可用后端.