1. Overview

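This article analyzes the ray-streaming source code and looks at the operations involved in starting a job. ray-streaming descends from Flink, and its source was contributed by a team at Ant. At present Ray's stream processing is still fairly rudimentary: it only supports single-input operations, it offers no support for time windows, and its checkpoint logic consists of only the most basic in-memory and local file system implementations.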

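ray-streaming delivers its functionality mainly through three packages. The overall design sits on top of Ray Java actors: a JobMaster handles the scheduling and fault-tolerant recovery of streaming jobs, and multi-language support comes from the Java actors and Python actors that Ray provides natively. Compared with Flink, ray-streaming uses a different data transfer strategy and a different checkpoint strategy. When it takes a checkpoint, it saves the offsets of the input and output buffer pools, which are built in C++. This simplifies the recovery logic and avoids storing intermediate data.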
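The idea of checkpointing offsets rather than data can be illustrated with a small sketch. It is only a conceptual model, not ray-streaming's channel code: the real buffers are implemented in C++, and the Channel and Reader classes below are hypothetical names introduced for illustration. The sketch assumes the upstream buffer keeps its messages until they are no longer needed, so a restarted reader can resume from a saved offset.

```java
import java.util.ArrayList;
import java.util.List;

class OffsetReplaySketch {
  /** Hypothetical upstream buffer: it keeps every message so a reader can replay. */
  static class Channel {
    private final List<String> messages = new ArrayList<>();

    void write(String message) {
      messages.add(message);
    }

    /** Returns a copy of all messages at or after the given offset. */
    List<String> readFrom(int offset) {
      return new ArrayList<>(messages.subList(offset, messages.size()));
    }
  }

  /** Hypothetical reader whose only checkpointed state is its read offset. */
  static class Reader {
    private final Channel channel;
    private int offset;

    Reader(Channel channel, int offset) {
      this.channel = channel;
      this.offset = offset;
    }

    /** Consumes everything that is currently available and advances the offset. */
    List<String> poll() {
      List<String> batch = channel.readFrom(offset);
      offset += batch.size();
      return batch;
    }

    /** A checkpoint is just the current offset. */
    int checkpoint() {
      return offset;
    }
  }

  public static void main(String[] args) {
    Channel channel = new Channel();
    channel.write("a");
    channel.write("b");

    Reader reader = new Reader(channel, 0);
    reader.poll();
    int savedOffset = reader.checkpoint(); // only this number is stored

    channel.write("c");
    reader.poll(); // "c" is consumed, then assume the reader fails

    // Recovery: a new reader resumes from the saved offset and sees "c" again.
    Reader recovered = new Reader(channel, savedOffset);
    System.out.println(recovered.poll());
  }
}
```

The point of the design is that the checkpoint itself stays tiny: the messages are replayed from the buffer instead of being copied into the checkpoint.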

2. Java source code analysis

2.1 streaming-api

2.1.1 StreamingContext

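ray/streaming/api/context/StreamingContext.java is the starting point of every streaming job and holds the basic context of the stream computation. Its two most important functions are withConfig and execute. withConfig adds job configuration: it takes the configuration as a map, and everything in it is submitted later when the job runs. execute is where the streaming job is finally submitted for execution. It traverses backwards from all of the job's sinks, builds the job graph through JobGraphBuilder, and then optimizes the job through JobGraphOptimizer. (The current optimization strategy only walks the tasks starting from the data sources and merges them. It has a known problem: when the same data source has two or more sinks, the job fails at execution time.) Finally, execute connects to or creates a Ray cluster and submits the job through JobClient.submit. The JobClient interface lives in streaming-api, while its concrete implementation lives in streaming-runtime.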

/**
 * Construct job DAG, and execute the job.
 */
public void execute(String jobName) {
  JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(this.streamSinks, jobName);
  JobGraph originalJobGraph = jobGraphBuilder.build();
  this.jobGraph = new JobGraphOptimizer(originalJobGraph).optimize();
  jobGraph.printJobGraph();
  LOG.info("JobGraph digraph\n{}", jobGraph.generateDigraph());

  if (!Ray.isInitialized()) {
    if (Config.MEMORY_CHANNEL.equalsIgnoreCase(jobConfig.get(Config.CHANNEL_TYPE))) {
      Preconditions.checkArgument(!jobGraph.isCrossLanguageGraph());
      ClusterStarter.startCluster(false, true);
      LOG.info("Created local cluster for job {}.", jobName);
    } else {
      ClusterStarter.startCluster(jobGraph.isCrossLanguageGraph(), false);
      LOG.info("Created multi process cluster for job {}.", jobName);
    }
    Runtime.getRuntime().addShutdownHook(new Thread(StreamingContext.this::stop));
  } else {
    LOG.info("Reuse existing cluster.");
  }

  ServiceLoader<JobClient> serviceLoader = ServiceLoader.load(JobClient.class);
  Iterator<JobClient> iterator = serviceLoader.iterator();
  Preconditions.checkArgument(iterator.hasNext(),
      "No JobClient implementation has been provided.");
  JobClient jobClient = iterator.next();
  jobClient.submit(jobGraph, jobConfig);
}

2.1.2 DataStream

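Starting from StreamingContext, the first step is to build the data source operator. ray/streaming/api/stream/DataStreamSource.java exposes the interface for this; the user passes the StreamingContext into it so that the chain of operator calls can be recorded. All subsequent computations are built on ray/streaming/api/stream/DataStream.java, which records the call relationships step by step as calls are chained. All stream operations are encapsulated in the ray/streaming/api/stream directory. A stream ends at ray/streaming/api/stream/DataStreamSink.java: the sink registers the whole chain back into the StreamingContext, where it is used to build the job. If a stream has no sink, the streaming context does not keep it and it is never executed.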
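The registration pattern described here can be sketched in a few lines. This is a simplified model under stated assumptions, not the ray-streaming API: Context and Stream are hypothetical stand-ins for StreamingContext and DataStream, and operators are reduced to plain names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class StreamChainSketch {
  /** Hypothetical stand-in for StreamingContext: it only remembers sinks. */
  static class Context {
    final List<Stream> sinks = new ArrayList<>();
  }

  /** Hypothetical stand-in for DataStream: each stream remembers its parent. */
  static class Stream {
    final Context context;
    final Stream parent; // null for a source
    final String name;

    Stream(Context context, Stream parent, String name) {
      this.context = context;
      this.parent = parent;
      this.name = name;
    }

    static Stream source(Context context, String name) {
      return new Stream(context, null, name);
    }

    Stream map(String name) {
      return new Stream(context, this, name);
    }

    /** Only a sink registers itself, and with it the whole chain, in the context. */
    Stream sink(String name) {
      Stream sink = new Stream(context, this, name);
      context.sinks.add(sink);
      return sink;
    }
  }

  /** Walks from a sink back to its source and returns the names in source-to-sink order. */
  static List<String> chainOf(Stream sink) {
    List<String> names = new ArrayList<>();
    for (Stream s = sink; s != null; s = s.parent) {
      names.add(s.name);
    }
    Collections.reverse(names);
    return names;
  }

  public static void main(String[] args) {
    Context context = new Context();
    Stream.source(context, "source").map("map").sink("sink");
    Stream.source(context, "orphan-source").map("orphan-map"); // no sink: never registered

    for (Stream sink : context.sinks) {
      System.out.println(chainOf(sink));
    }
  }
}
```

Because the context only holds sinks, the second chain in main is invisible to it, which mirrors why a stream without a sink is never executed.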

2.1.3 operator&function

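Every DataStream wraps an Operator. The Operator implementations live in ray/streaming/operator, and whenever an Operator involves an actual computation, that computation is carried out by a Function, whose path is ray/streaming/function. Functions come in two kinds, those exposed directly to users and those called internally by the system, stored in ray/streaming/api/function/impl and ray/streaming/api/function/internal respectively.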

2.2 streaming-runtime

The call hierarchy of the classes in the runtime layer is as follows:

JobClientImpl
└──>JobMaster <───────────────────────────┐
├──>JobScheduler │
│ └──>WorkerLifecycleController │
│ └──>RemoteCallWorker │
│ └──>JobWorker │<──┐
│ ├──>RemoteCallMaster ─┙<──┥
│ ├──>ContextBackend │
│ └──>StreamTask ────┴───────┐
│ └──>Processor │
│ ├──>Operator │
│ └──>StreamingRuntimeContext<┙
│ ├──>KeyStateBackend
│ └──>OperatorStateBackend
├──>GraphManager
├──>ResourceManager
├──>ContextBackend
├──>CheckpointCoordinator
└──>FailoverCoordinator

2.2.1 JobClientImpl

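ray/streaming/runtime/client/JobClientImpl.java is the concrete implementation of JobClient. Its submit function takes the job graph and the job configuration, creates the JobMaster (the core of ray-streaming) from the configuration, and then invokes JobMaster's submitJob function through a Ray remote call, which formally starts the job.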

2.2.2 JobMaster

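When ray/streaming/runtime/master/JobMaster.java is created, it builds a StreamingConfig from the job configuration and uses it to initialize the storage backend ContextBackend, which is still quite simple, as well as the JobMasterRuntimeContext. It also checks the context to decide whether the job has to be recovered from a checkpoint, and if so runs the corresponding init operation.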

public boolean submitJob(ActorHandle<JobMaster> jobMasterActor, JobGraph jobGraph) {
  LOG.info("Begin submitting job using logical plan: {}.", jobGraph);

  this.jobMasterActor = jobMasterActor;

  // init manager
  graphManager = new GraphManagerImpl(runtimeContext);
  resourceManager = new ResourceManagerImpl(runtimeContext);

  // build and set graph into runtime context
  ExecutionGraph executionGraph = graphManager.buildExecutionGraph(jobGraph);
  runtimeContext.setJobGraph(jobGraph);
  runtimeContext.setExecutionGraph(executionGraph);

  // init scheduler
  try {
    scheduler = new JobSchedulerImpl(this);
    scheduler.scheduleJob(graphManager.getExecutionGraph());
  } catch (Exception e) {
    LOG.error("Failed to submit job.", e);
    return false;
  }
  return true;
}

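JobMaster's submitJob function takes the remote handle of the JobMaster itself and the job graph. Execution then proceeds in the following stages:

  • Initialize the manager services: create GraphManager and ResourceManager from the JobMasterRuntimeContext.
  • Build the execution graph: GraphManager converts the job graph into the final ExecutionGraph, and the job is registered in the JobMasterRuntimeContext. (Building the execution graph mainly means creating the physical execution vertices for each operator according to its parallelism.)
  • Schedule the job: the scheduleJob function of JobSchedulerImpl schedules and runs the streaming job.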
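The expansion from a logical graph to physical vertices can be sketched as follows. This is a minimal illustration that assumes each operator is described only by a name and a parallelism; the expand function and the "name-index" naming scheme are hypothetical and are not taken from GraphManagerImpl.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ExecutionGraphSketch {
  /**
   * Creates one physical vertex per parallel instance of each operator.
   * The "operator-index" naming is an assumption made for this sketch.
   */
  static List<String> expand(Map<String, Integer> parallelismByOperator) {
    List<String> vertices = new ArrayList<>();
    for (Map.Entry<String, Integer> entry : parallelismByOperator.entrySet()) {
      for (int index = 0; index < entry.getValue(); index++) {
        vertices.add(entry.getKey() + "-" + index);
      }
    }
    return vertices;
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps the operators in insertion order.
    Map<String, Integer> jobGraph = new LinkedHashMap<>();
    jobGraph.put("source", 1);
    jobGraph.put("map", 2);
    jobGraph.put("sink", 1);
    System.out.println(expand(jobGraph));
  }
}
```

In the real execution graph each physical vertex also carries edges, resources and a worker actor; the sketch keeps only the fan-out by parallelism.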

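JobMaster's init function takes a boolean that indicates whether the job is being recovered from a checkpoint. Its main purpose is to start the CheckpointCoordinator, which drives checkpointing, and the FailoverCoordinator, which drives failure recovery. It then saves the current state to the storage backend.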

// init coordinators
checkpointCoordinator = new CheckpointCoordinator(this);
checkpointCoordinator.start();
failoverCoordinator = new FailoverCoordinator(this, isRecover);
failoverCoordinator.start();

saveContext();

JobMaster's reportJobWorkerCommit function is called by JobWorker. It manages checkpoints by calling the corresponding function of CheckpointCoordinator.

public byte[] reportJobWorkerCommit(byte[] reportBytes) {
  Boolean ret = false;
  RemoteCall.BaseWorkerCmd reportPb;
  try {
    reportPb = RemoteCall.BaseWorkerCmd.parseFrom(reportBytes);
    ActorId actorId = ActorId.fromBytes(reportPb.getActorId().toByteArray());
    long remoteCallCost = System.currentTimeMillis() - reportPb.getTimestamp();
    LOG.info("Vertex {}, request job worker commit cost {}ms, actorId={}.",
        getExecutionVertex(actorId), remoteCallCost, actorId);
    RemoteCall.WorkerCommitReport commit =
        reportPb.getDetail().unpack(RemoteCall.WorkerCommitReport.class);
    WorkerCommitReport report = new WorkerCommitReport(actorId, commit.getCommitCheckpointId());
    ret = checkpointCoordinator.reportJobWorkerCommit(report);
  } catch (InvalidProtocolBufferException e) {
    LOG.error("Parse job worker commit has exception.", e);
  }
  return RemoteCall.BoolResult.newBuilder().setBoolRes(ret).build().toByteArray();
}

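JobMaster's requestJobWorkerRollback function is likewise called by JobWorker. When a task is judged to need a rollback, it calls the corresponding function of FailoverCoordinator to perform failure recovery.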

public byte[] requestJobWorkerRollback(byte[] requestBytes) {
  Boolean ret = false;
  RemoteCall.BaseWorkerCmd requestPb;
  try {
    requestPb = RemoteCall.BaseWorkerCmd.parseFrom(requestBytes);
    ActorId actorId = ActorId.fromBytes(requestPb.getActorId().toByteArray());
    long remoteCallCost = System.currentTimeMillis() - requestPb.getTimestamp();
    ExecutionGraph executionGraph = graphManager.getExecutionGraph();
    Optional<BaseActorHandle> rayActor = executionGraph.getActorById(actorId);
    if (!rayActor.isPresent()) {
      LOG.warn("Skip this invalid rollback, actor id {} is not found.", actorId);
      return RemoteCall.BoolResult.newBuilder().setBoolRes(false).build().toByteArray();
    }
    ExecutionVertex exeVertex = getExecutionVertex(actorId);
    LOG.info("Vertex {}, request job worker rollback cost {}ms, actorId={}.",
        exeVertex, remoteCallCost, actorId);
    RemoteCall.WorkerRollbackRequest rollbackPb
        = RemoteCall.WorkerRollbackRequest.parseFrom(requestPb.getDetail().getValue());
    exeVertex.setPid(rollbackPb.getWorkerPid());
    // To find old container where slot is located in.
    String hostname = "";
    Optional<Container> container = ResourceUtil.getContainerById(
        resourceManager.getRegisteredContainers(),
        exeVertex.getContainerId()
    );
    if (container.isPresent()) {
      hostname = container.get().getHostname();
    }
    WorkerRollbackRequest request = new WorkerRollbackRequest(
        actorId, rollbackPb.getExceptionMsg(), hostname, exeVertex.getPid()
    );

    ret = failoverCoordinator.requestJobWorkerRollback(request);
    LOG.info("Vertex {} request rollback, exception msg : {}.",
        exeVertex, rollbackPb.getExceptionMsg());

  } catch (Throwable e) {
    LOG.error("Parse job worker rollback has exception.", e);
  }
  return RemoteCall.BoolResult.newBuilder().setBoolRes(ret).build().toByteArray();
}

2.2.3 JobScheduler

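ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java is given the JobMaster when it is created. From it the scheduler obtains the manager services and the job configuration, and it creates the WorkerLifecycleController that manages the execution of the individual operators.

JobScheduler's scheduleJob function processes the physical execution graph passed to it and starts the job, in the following stages:

  • Preparation: prepareResourceAndCreateWorker prepares resources and creates the corresponding JobWorkers, then generateActorMappings generates the mapping between worker nodes, which is used for data distribution.
  • Start: initAndStart starts the streaming job.

JobScheduler's prepareResourceAndCreateWorker function processes the physical execution graph in the following stages:

  • Prepare computing resources: resourceManager.assignResource requests resources from the Ray cluster.
  • Create worker nodes: createWorkers creates all worker nodes. It ends up calling the createWorkers function of WorkerLifecycleController.

JobScheduler's initAndStart function processes the physical execution graph in the following stages:

  • Build the worker contexts: from the physical execution graph and the JobMaster handle, build the JobWorkerContext of every JobWorker.
  • Initialize the worker nodes: initWorkers initializes all worker nodes. It ends up calling the initWorkers function of WorkerLifecycleController.
  • Initialize the master: initMaster initializes the master node, which calls back into the init function of JobMaster.
  • Start the worker nodes: startWorkers starts all tasks. It ends up calling the startWorkers function of WorkerLifecycleController.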

2.2.4 WorkerLifecycleController

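ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java manages the lifecycle of all worker nodes and provides operations such as create, initialize, start and destroy.

WorkerLifecycleController's asyncBatchExecute is a shared wrapper for asynchronous execution. It uses the supplyAsync function of Java's built-in CompletableFuture class to run the tasks asynchronously, and it also checks the results of the tasks.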
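A minimal sketch of such a batch helper is shown below. The signature is an assumption made for illustration, since the article does not list the real asyncBatchExecute, and the sketch runs on the common fork-join pool rather than on whatever executor ray-streaming uses.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

class AsyncBatchSketch {
  /**
   * Runs the operation for every item asynchronously and returns true
   * only if every invocation returned true.
   */
  static <T> boolean asyncBatchExecute(Function<T, Boolean> operation, List<T> items) {
    // Submit everything first so the operations can run concurrently.
    List<CompletableFuture<Boolean>> futures = items.stream()
        .map(item -> CompletableFuture.supplyAsync(() -> operation.apply(item)))
        .collect(Collectors.toList());

    // join() blocks until the future completes; collect the overall result.
    boolean allSucceeded = true;
    for (CompletableFuture<Boolean> future : futures) {
      if (!future.join()) {
        allSucceeded = false;
      }
    }
    return allSucceeded;
  }

  public static void main(String[] args) {
    List<String> vertices = Arrays.asList("source-0", "map-0", "map-1");
    boolean created = asyncBatchExecute((String name) -> !name.isEmpty(), vertices);
    System.out.println("all workers created: " + created);
  }
}
```

Submitting all futures before joining any of them is what makes the batch concurrent; joining inside the map step would serialize the calls.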

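WorkerLifecycleController's createWorkers creates the JobWorkers through asyncBatchExecute. The function that runs asynchronously is createWorker. It is applied to each vertex of the physical execution graph: depending on the vertex's language it creates a Java-based or a Python-based JobWorker, and then registers the new JobWorker back into the execution graph for the later initialization and start steps.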

private boolean createWorker(ExecutionVertex executionVertex) {
  LOG.info("Start to create worker actor for vertex: {} with resource: {}, workeConfig: {}.",
      executionVertex.getExecutionVertexName(), executionVertex.getResource(),
      executionVertex.getWorkerConfig());

  Language language = executionVertex.getLanguage();

  BaseActorHandle actor;
  if (Language.JAVA == language) {
    actor = Ray.actor(JobWorker::new, executionVertex)
        .setResources(executionVertex.getResource())
        .setMaxRestarts(-1)
        .remote();
  } else {
    RemoteCall.ExecutionVertexContext.ExecutionVertex vertexPb
        = new GraphPbBuilder().buildVertex(executionVertex);
    actor = Ray.actor(
        PyActorClass.of("ray.streaming.runtime.worker", "JobWorker"), vertexPb.toByteArray())
        .setResources(executionVertex.getResource())
        .setMaxRestarts(-1)
        .remote();
  }

  if (null == actor) {
    LOG.error("Create worker actor failed.");
    return false;
  }

  executionVertex.setWorkerActor(actor);

  LOG.info("Worker actor created, actor: {}, vertex: {}.",
      executionVertex.getWorkerActorId(), executionVertex.getExecutionVertexName());
  return true;
}

WorkerLifecycleController's initWorkers initializes the JobWorkers. It calls the initWorker function of RemoteCallWorker for each worker, passing in its JobWorkerContext and initializing its task.

public boolean initWorkers(
    Map<ExecutionVertex, JobWorkerContext> vertexToContextMap, int timeout) {
  LOG.info("Begin initiating workers: {}.", vertexToContextMap);
  long startTime = System.currentTimeMillis();

  Map<ObjectRef<Boolean>, ActorId> rayObjects = new HashMap<>();
  vertexToContextMap.entrySet().forEach((entry -> {
    ExecutionVertex vertex = entry.getKey();
    rayObjects.put(RemoteCallWorker.initWorker(vertex.getWorkerActor(), entry.getValue()),
        vertex.getWorkerActorId());
  }));

  List<ObjectRef<Boolean>> objectRefList = new ArrayList<>(rayObjects.keySet());

  LOG.info("Waiting for workers' initialization.");
  WaitResult<Boolean> result = Ray.wait(objectRefList, objectRefList.size(), timeout);
  if (result.getReady().size() != objectRefList.size()) {
    LOG.error("Initializing workers timeout[{} ms].", timeout);
    return false;
  }

  LOG.info("Finished waiting workers' initialization.");
  LOG.info("Workers initialized. Cost {} ms.", System.currentTimeMillis() - startTime);
  return true;
}

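WorkerLifecycleController's startWorkers tells the JobWorkers to start their tasks. It calls the rollback function of RemoteCallWorker for each JobWorker, and the JobWorker starts its StreamTask through rollback. To keep the computation logic correct, all data sources are started first, and the remaining tasks are started afterwards.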

public boolean startWorkers(ExecutionGraph executionGraph, long lastCheckpointId, int timeout) {
  LOG.info("Begin starting workers.");
  long startTime = System.currentTimeMillis();
  List<ObjectRef<Object>> objectRefs = new ArrayList<>();

  // start source actors 1st
  executionGraph.getSourceActors()
      .forEach(actor -> objectRefs.add(RemoteCallWorker.rollback(actor, lastCheckpointId)));

  // then start non-source actors
  executionGraph.getNonSourceActors()
      .forEach(actor -> objectRefs.add(RemoteCallWorker.rollback(actor, lastCheckpointId)));

  WaitResult<Object> result = Ray.wait(objectRefs, objectRefs.size(), timeout);
  if (result.getReady().size() != objectRefs.size()) {
    LOG.error("Starting workers timeout[{} ms].", timeout);
    return false;
  }

  LOG.info("Workers started. Cost {} ms.", System.currentTimeMillis() - startTime);
  return true;
}

2.2.4 RemoteCallWorker&RemoteCallMaster

ray/streaming/runtime/rpc/RemoteCallWorker.java handles the remote-call logic for all JobWorkers.

RemoteCallWorker's initWorker function passes a JobWorkerContext to a worker and calls the init function of the corresponding JobWorker.

public static ObjectRef<Boolean> initWorker(BaseActorHandle actor, JobWorkerContext context) {
  LOG.info("Call worker to initiate, actor: {}, context: {}.", actor.getId(), context);
  ObjectRef<Boolean> result;

  // python
  if (actor instanceof PyActorHandle) {
    result = ((PyActorHandle) actor).task(PyActorMethod.of("init", Boolean.class),
        context.getPythonWorkerContextBytes()).remote();
  } else {
    // java
    result = ((ActorHandle<JobWorker>) actor).task(JobWorker::init, context).remote();
  }

  LOG.info("Finished calling worker to initiate.");
  return result;
}

RemoteCallWorker's rollback function starts the computation through JobWorker's rollback function. It supports both starting from a checkpoint and starting from scratch.

public static ObjectRef rollback(BaseActorHandle actor, final Long checkpointId) {
  LOG.info("Call worker to start, actor: {}.", actor.getId());
  ObjectRef result;

  // python
  if (actor instanceof PyActorHandle) {
    RemoteCall.CheckpointId checkpointIdPb = RemoteCall.CheckpointId.newBuilder()
        .setCheckpointId(checkpointId)
        .build();
    result = ((PyActorHandle) actor)
        .task(PyActorMethod.of("rollback"),
            checkpointIdPb.toByteArray()
        ).remote();
  } else {
    // java
    result = ((ActorHandle<JobWorker>) actor)
        .task(JobWorker::rollback, checkpointId, System.currentTimeMillis()).remote();
  }

  LOG.info("Finished calling worker to start.");
  return result;
}

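ray/streaming/runtime/rpc/RemoteCallMaster.java handles the remote-call logic for the JobMaster. It is mainly used to synchronize checkpoint logic and contains two functions, reportJobWorkerCommitAsync and requestJobWorkerRollback.

RemoteCallMaster's reportJobWorkerCommitAsync function is called by StreamTask to report that a checkpoint has been completed.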

public static ObjectRef<byte[]> reportJobWorkerCommitAsync(
    ActorHandle<JobMaster> actor,
    WorkerCommitReport commitReport) {
  RemoteCall.WorkerCommitReport commit = RemoteCall.WorkerCommitReport.newBuilder()
      .setCommitCheckpointId(commitReport.commitCheckpointId)
      .build();
  Any detail = Any.pack(commit);
  RemoteCall.BaseWorkerCmd cmd = RemoteCall.BaseWorkerCmd.newBuilder()
      .setActorId(ByteString.copyFrom(commitReport.fromActorId.getBytes()))
      .setTimestamp(System.currentTimeMillis())
      .setDetail(detail).build();

  return actor.task(JobMaster::reportJobWorkerCommit, cmd.toByteArray()).remote();
}

RemoteCallMaster's requestJobWorkerRollback function is called by JobWorker. When an exception is detected, it notifies the JobMaster so that a global rollback can be performed.

public static Boolean requestJobWorkerRollback(
    ActorHandle<JobMaster> actor,
    WorkerRollbackRequest rollbackRequest) {
  RemoteCall.WorkerRollbackRequest request = RemoteCall.WorkerRollbackRequest.newBuilder()
      .setExceptionMsg(rollbackRequest.getRollbackExceptionMsg())
      .setWorkerHostname(rollbackRequest.getHostname())
      .setWorkerPid(rollbackRequest.getPid()).build();
  Any detail = Any.pack(request);
  RemoteCall.BaseWorkerCmd cmd = RemoteCall.BaseWorkerCmd.newBuilder()
      .setActorId(ByteString.copyFrom(rollbackRequest.fromActorId.getBytes()))
      .setTimestamp(System.currentTimeMillis())
      .setDetail(detail).build();
  ObjectRef<byte[]> ret = actor.task(
      JobMaster::requestJobWorkerRollback, cmd.toByteArray()).remote();
  byte[] res = ret.get();
  return PbResultParser.parseBoolResult(res);
}

2.2.5 JobWorker

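ray/streaming/runtime/worker/JobWorker.java is the smallest unit registered in the Ray cluster, and the StreamTask inside a JobWorker is the smallest unit of a streaming job.

When a JobWorker is created, it builds its worker configuration from the physical execution vertex and constructs its own storage backend ContextBackend (the same type of backend as the one used by JobMaster, but each stores its data independently). It then checks whether the actor has been restarted. If it has not, the worker saves its context and construction ends. If it has, and the backend holds a saved JobWorkerContext, the worker calls init directly to initialize itself from the recovered context and then calls requestRollback to ask for the task to be started.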

public JobWorker(ExecutionVertex executionVertex) {
  LOG.info("Creating job worker.");

  // TODO: the following 3 lines is duplicated with that in init(), try to optimise it later.
  this.executionVertex = executionVertex;
  this.workerConfig = new StreamingWorkerConfig(executionVertex.getWorkerConfig());
  this.contextBackend = ContextBackendFactory.getContextBackend(this.workerConfig);

  LOG.info("Ray.getRuntimeContext().wasCurrentActorRestarted()={}",
      Ray.getRuntimeContext().wasCurrentActorRestarted());
  if (!Ray.getRuntimeContext().wasCurrentActorRestarted()) {
    saveContext();
    LOG.info("Job worker is fresh started, init success.");
    return;
  }

  LOG.info("Begin load job worker checkpoint state.");

  byte[] bytes = CheckpointStateUtil.get(contextBackend, getJobWorkerContextKey());
  if (bytes != null) {
    JobWorkerContext context = Serializer.decode(bytes);
    LOG.info("Worker recover from checkpoint state, byte len={}, context={}.", bytes.length,
        context);
    init(context);
    requestRollback("LoadCheckpoint request rollback in new actor.");
  } else {
    LOG.error(
        "Worker is reconstructed, but can't load checkpoint. " +
            "Check whether you checkpoint state is reliable. Current checkpoint state is {}.",
        contextBackend.getClass().getName());
  }
}

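JobWorker's init function rebuilds the worker's information from the context and saves it. (These steps were already performed when the JobWorker was created, although the context information they rely on is not exactly the same.)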

public Boolean init(JobWorkerContext workerContext) {
  // IMPORTANT: some test cases depends on this log to find workers' pid,
  // be careful when changing this log.
  LOG.info("Initiating job worker: {}. Worker context is: {}, pid={}.",
      workerContext.getWorkerName(), workerContext, EnvUtil.getJvmPid());

  this.workerContext = workerContext;
  this.executionVertex = workerContext.getExecutionVertex();
  this.workerConfig = new StreamingWorkerConfig(executionVertex.getWorkerConfig());
  // init state backend
  this.contextBackend = ContextBackendFactory.getContextBackend(this.workerConfig);

  LOG.info("Initiating job worker succeeded: {}.", workerContext.getWorkerName());
  saveContext();
  return true;
}

public synchronized void saveContext() {
  byte[] contextBytes = Serializer.encode(workerContext);
  String key = getJobWorkerContextKey();
  LOG.info("Saving context, worker context={}, serialized byte length={}, key={}.", workerContext,
      contextBytes.length, key);
  CheckpointStateUtil.put(contextBackend, key, contextBytes);
}

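JobWorker's rollback function actually creates the StreamTask that performs the computation, and it is responsible for that task's lifecycle management and checkpoint management. rollback inspects the checkpoint ID passed in: when the task already exists, its checkpoint ID equals the one passed in, and the task is still in its initial state, the remaining steps are skipped. Otherwise the data transfer channel is created as usual, createStreamTask creates the task, and StreamTask's recover function starts the computation. At present, streaming tasks only support source operations and single-input operations.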

public CallResult<ChannelRecoverInfo> rollback(Long checkpointId, Long startRollbackTs) {
  synchronized (initialStateChangeLock) {
    if (task != null && task.isAlive() && checkpointId == task.lastCheckpointId &&
        task.isInitialState) {
      return CallResult.skipped("Task is already in initial state, skip this rollback.");
    }
  }
  long remoteCallCost = System.currentTimeMillis() - startRollbackTs;

  LOG.info("Start rollback[{}], checkpoint is {}, remote call cost {}ms.",
      executionVertex.getExecutionJobVertexName(), checkpointId, remoteCallCost);

  rollbackCount++;
  if (rollbackCount > 1) {
    isRecreate.set(true);
  }

  try {
    //Init transfer
    TransferChannelType channelType = workerConfig.transferConfig.channelType();
    if (TransferChannelType.NATIVE_CHANNEL == channelType) {
      transferHandler = new TransferHandler();
    }

    if (task != null) {
      // make sure the task is closed
      task.close();
      task = null;
    }

    // create stream task
    task = createStreamTask(checkpointId);
    ChannelRecoverInfo channelRecoverInfo = task.recover(isRecreate.get());
    isNeedRollback = false;

    LOG.info("Rollback job worker success, checkpoint is {}, channelRecoverInfo is {}.",
        checkpointId, channelRecoverInfo);

    return CallResult.success(channelRecoverInfo);
  } catch (Exception e) {
    LOG.error("Rollback job worker has exception.", e);
    return CallResult.fail(ExceptionUtils.getStackTrace(e));
  }
}

private StreamTask createStreamTask(long checkpointId) {
  StreamTask task;
  StreamProcessor streamProcessor = ProcessBuilder
      .buildProcessor(executionVertex.getStreamOperator());
  LOG.debug("Stream processor created: {}.", streamProcessor);

  if (streamProcessor instanceof SourceProcessor) {
    task = new SourceStreamTask(streamProcessor, this, checkpointId);
  } else if (streamProcessor instanceof OneInputProcessor) {
    task = new OneInputStreamTask(streamProcessor, this, checkpointId);
  } else {
    throw new RuntimeException("Unsupported processor type:" + streamProcessor);
  }
  LOG.info("Stream task created: {}.", task);
  return task;
}
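The skip check at the top of rollback makes repeated rollback requests for the same checkpoint idempotent. The sketch below isolates that idea under simplifying assumptions: the class, its fields and the string results are hypothetical, the task is reduced to a counter, and the liveness check on the task thread is left out.

```java
class RollbackSketch {
  private Long currentCheckpointId; // null until a task has been created
  private boolean initialState;     // true until the task has processed data
  private int tasksCreated;

  /** Recreates the task unless it already sits untouched at the same checkpoint. */
  synchronized String rollback(long checkpointId) {
    // Long == long unboxes the Long, so this is a numeric comparison.
    if (currentCheckpointId != null && currentCheckpointId == checkpointId && initialState) {
      return "skipped";
    }
    // Stand-in for closing the old task and creating a new one.
    tasksCreated++;
    currentCheckpointId = checkpointId;
    initialState = true;
    return "success";
  }

  /** Called once the task has processed data and left its initial state. */
  synchronized void markProcessed() {
    initialState = false;
  }

  synchronized int tasksCreated() {
    return tasksCreated;
  }

  public static void main(String[] args) {
    RollbackSketch worker = new RollbackSketch();
    System.out.println(worker.rollback(0L)); // first start
    System.out.println(worker.rollback(0L)); // duplicate request
    worker.markProcessed();
    System.out.println(worker.rollback(0L)); // the task has moved on, so it is recreated
  }
}
```

A duplicate request therefore costs nothing, while a request that arrives after the task has started processing always rebuilds it.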

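JobWorker has three checkpoint-related functions: triggerCheckpoint triggers a checkpoint, notifyCheckpointTimeout reports that a checkpoint has timed out, and clearExpiredCheckpoint cleans up expired checkpoints. All of them end up calling the corresponding methods of StreamTask.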

public Boolean triggerCheckpoint(Long barrierId) {
  LOG.info("Receive trigger, barrierId is {}.", barrierId);
  if (task != null) {
    return task.triggerCheckpoint(barrierId);
  }
  return false;
}

public Boolean notifyCheckpointTimeout(Long checkpointId) {
  LOG.info("Notify checkpoint timeout, checkpoint id is {}.", checkpointId);
  if (task != null) {
    task.notifyCheckpointTimeout(checkpointId);
  }
  return true;
}

public Boolean clearExpiredCheckpoint(Long expiredStateCpId, Long expiredQueueCpId) {
  LOG.info("Clear expired checkpoint state, checkpoint id is {}; " +
          "Clear expired queue msg, checkpoint id is {}",
      expiredStateCpId, expiredQueueCpId);
  if (task != null) {
    if (expiredStateCpId > 0) {
      task.clearExpiredCpState(expiredStateCpId);
    }
    task.clearExpiredQueueMsg(expiredQueueCpId);
  }
  return true;
}

JobWorker has two fault-tolerance functions: requestRollback notifies the JobMaster that the task needs to be rolled back, while checkIfNeedRollback simply reports whether a rollback is needed.

public void requestRollback(String exceptionMsg) {
  LOG.info("Request rollback.");
  isNeedRollback = true;
  isRecreate.set(true);
  boolean requestRet = RemoteCallMaster.requestJobWorkerRollback(
      workerContext.getMaster(), new WorkerRollbackRequest(
          workerContext.getWorkerActorId(),
          exceptionMsg,
          EnvUtil.getHostName(),
          EnvUtil.getJvmPid()
      ));
  if (!requestRet) {
    LOG.warn("Job worker request rollback failed! exceptionMsg={}.", exceptionMsg);
  }
}

public Boolean checkIfNeedRollback(Long startCallTs) {
  // No save checkpoint in this query.
  long remoteCallCost = System.currentTimeMillis() - startCallTs;
  LOG.info("Finished checking if need to rollback with result: {}, rpc delay={}ms.",
      isNeedRollback, remoteCallCost);
  return isNeedRollback;
}

2.2.6 StreamTask

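ray/streaming/runtime/worker/tasks/StreamTask.java is the base class of all streaming tasks and implements Runnable. When it is created, it sets itself up as a separate thread so that stream data can be processed continuously. StreamTask has several specialized subclasses that handle different kinds of tasks; the inheritance hierarchy is shown below. They fall into source tasks and input tasks, and input tasks are further divided into one-input and two-input tasks, matching the different operator implementations. Everything that concerns the operator itself is implemented in StreamTask, while the data processing is pushed down into the run function of each concrete StreamTask. When run catches an exception, a rollback is triggered.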

StreamTask
├──>InputStreamTask
│   ├──>OneInputStreamTask
│   └──>TwoInputStreamTask
└──>SourceStreamTask

protected StreamTask(Processor processor, JobWorker jobWorker, long lastCheckpointId) {
  this.processor = processor;
  this.jobWorker = jobWorker;
  this.checkpointState = jobWorker.contextBackend;
  this.lastCheckpointId = lastCheckpointId;

  this.thread = new Thread(Ray.wrapRunnable(this), this.getClass().getName() + "-" + System.currentTimeMillis());
  this.thread.setDaemon(true);
}

StreamTask's recover function starts the task. It first calls prepareTask to set up the task's initial state, and then starts the thread, which starts the computation.

public ChannelRecoverInfo recover(boolean isRecover) {

  if (isRecover) {
    LOG.info("Stream task begin recover.");
  } else {
    LOG.info("Stream task first start begin.");
  }
  prepareTask(isRecover);

  // start runner
  ChannelRecoverInfo recoverInfo = new ChannelRecoverInfo(new HashMap<>());
  if (reader != null) {
    recoverInfo = reader.getQueueRecoverInfo();
  }

  thread.setUncaughtExceptionHandler((t, e) -> LOG.error("Uncaught exception in runner thread.", e));
  LOG.info("Start stream task: {}.", this.getClass().getSimpleName());
  thread.start();

  if (isRecover) {
    LOG.info("Stream task recover end.");
  } else {
    LOG.info("Stream task first start finished.");
  }

  return recoverInfo;
}
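The pattern of a task that owns a daemon thread, is started by recover, and asks for a rollback when its loop fails can be sketched as follows. This is a simplified model: TaskThreadSketch, its integer inputs and the rollbackRequested flag are hypothetical, and a plain Thread is used where the real code wraps the task with Ray.wrapRunnable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TaskThreadSketch implements Runnable {
  private final List<Integer> inputs;
  private final Thread thread;
  final List<Integer> outputs = new ArrayList<>();
  volatile boolean rollbackRequested = false;

  TaskThreadSketch(List<Integer> inputs) {
    this.inputs = inputs;
    // The thread is created here but only started by recover().
    this.thread = new Thread(this, "task-" + System.currentTimeMillis());
    this.thread.setDaemon(true);
  }

  /** Stand-in for StreamTask.recover(): preparation would happen here, then the start. */
  void recover() {
    thread.start();
  }

  /** Waits for the task thread to finish; used only to make the sketch deterministic. */
  void await() {
    try {
      thread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for the task.", e);
    }
  }

  @Override
  public void run() {
    try {
      for (int value : inputs) {
        outputs.add(100 / value); // throws ArithmeticException when value is 0
      }
    } catch (Exception e) {
      // Stand-in for requestRollback(...): record that a rollback is needed.
      rollbackRequested = true;
    }
  }

  public static void main(String[] args) {
    TaskThreadSketch task = new TaskThreadSketch(Arrays.asList(1, 0, 4));
    task.recover();
    task.await();
    System.out.println("rollback requested: " + task.rollbackRequested);
  }
}
```

Marking the thread as a daemon means an unfinished task does not keep the JVM alive, which suits a worker whose lifetime is controlled by its actor.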

StreamTask's prepareTask function prepares the resources the task needs, in the following steps:

  • Use the argument passed in to decide whether the task is being created for the first time, and if it is not, fetch the corresponding data from the storage backend.
  • (Optional) If this is a checkpoint recovery and the checkpoint contains data, call the Processor's loadCheckpoint function, which is ultimately implemented by the user.
  • Create the data write and read endpoints, DataWriter and DataReader.
  • Call openProcessor to pass the data along and run the open phase of the task.

private void prepareTask(boolean isRecreate) {
  LOG.info("Preparing stream task, isRecreate={}.", isRecreate);
  ExecutionVertex executionVertex = jobWorker.getExecutionVertex();

  // set vertex info into config for native using
  jobWorker.getWorkerConfig().workerInternalConfig.setProperty(WorkerInternalConfig.WORKER_NAME_INTERNAL,
      executionVertex.getExecutionVertexName());
  jobWorker.getWorkerConfig().workerInternalConfig.setProperty(WorkerInternalConfig.OP_NAME_INTERNAL,
      executionVertex.getExecutionJobVertexName());

  OperatorCheckpointInfo operatorCheckpointInfo = new OperatorCheckpointInfo();
  byte[] bytes = null;

  // Fetch checkpoint from storage only in recreate mode not for new startup
  // worker
  // in rescaling or something like that.
  if (isRecreate) {
    String cpKey = genOpCheckpointKey(lastCheckpointId);
    LOG.info("Getting task checkpoints from state, cpKey={}, checkpointId={}.", cpKey, lastCheckpointId);
    bytes = CheckpointStateUtil.get(checkpointState, cpKey);
    if (bytes == null) {
      String msg = String.format("Task recover failed, checkpoint is null! cpKey=%s", cpKey);
      throw new RuntimeException(msg);
    }
  }

  // when use memory state, if actor throw exception, will miss state
  if (bytes != null) {
    operatorCheckpointInfo = Serializer.decode(bytes);
    processor.loadCheckpoint(operatorCheckpointInfo.processorCheckpoint);
    LOG.info("Stream task recover from checkpoint state, checkpoint bytes len={}, checkpointInfo={}.", bytes.length,
        operatorCheckpointInfo);
  }

  // writer
  if (!executionVertex.getOutputEdges().isEmpty()) {
    LOG.info("Register queue writer, channels={}, outputCheckpoints={}.", executionVertex.getOutputChannelIdList(),
        operatorCheckpointInfo.outputPoints);
    writer = new DataWriter(executionVertex.getOutputChannelIdList(), executionVertex.getOutputActorList(),
        operatorCheckpointInfo.outputPoints, jobWorker.getWorkerConfig());
  }

  // reader
  if (!executionVertex.getInputEdges().isEmpty()) {
    LOG.info("Register queue reader, channels={}, inputCheckpoints={}.", executionVertex.getInputChannelIdList(),
        operatorCheckpointInfo.inputPoints);
    reader = new DataReader(executionVertex.getInputChannelIdList(), executionVertex.getInputActorList(),
        operatorCheckpointInfo.inputPoints, jobWorker.getWorkerConfig());
  }

  openProcessor();

  LOG.debug("Finished preparing stream task.");
}

StreamTask's openProcessor function triggers the open phase of the task. Its main job is to create the StreamingRuntimeContext the task needs and then call the processor's open function to pass the context in.

private void openProcessor() {
  ExecutionVertex executionVertex = jobWorker.getExecutionVertex();
  List<ExecutionEdge> outputEdges = executionVertex.getOutputEdges();

  Map<String, List<String>> opGroupedChannelId = new HashMap<>();
  Map<String, List<BaseActorHandle>> opGroupedActor = new HashMap<>();
  Map<String, Partition> opPartitionMap = new HashMap<>();
  for (int i = 0; i < outputEdges.size(); ++i) {
    ExecutionEdge edge = outputEdges.get(i);
    String opName = edge.getTargetExecutionJobVertexName();
    if (!opPartitionMap.containsKey(opName)) {
      opGroupedChannelId.put(opName, new ArrayList<>());
      opGroupedActor.put(opName, new ArrayList<>());
    }
    opGroupedChannelId.get(opName).add(executionVertex.getOutputChannelIdList().get(i));
    opGroupedActor.get(opName).add(executionVertex.getOutputActorList().get(i));
    opPartitionMap.put(opName, edge.getPartition());
  }
  opPartitionMap.keySet().forEach(opName -> {
    collectors.add(new OutputCollector(writer, opGroupedChannelId.get(opName), opGroupedActor.get(opName),
        opPartitionMap.get(opName)));
  });

  RuntimeContext runtimeContext = new StreamingRuntimeContext(executionVertex, jobWorker.getWorkerConfig().configMap,
      executionVertex.getParallelism());

  processor.open(collectors, runtimeContext);
}
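The loop in openProcessor groups the output channels by the name of the downstream operator, so that one collector can be created per downstream operator. A compact way to express the same grouping is shown in the sketch below. It is an illustration only: groupChannelsByOperator is a hypothetical helper, and edges are reduced to two parallel lists of names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class OutputGroupingSketch {
  /**
   * Groups channel ids by the downstream operator they lead to.
   * targetOperators.get(i) is the operator reached through channelIds.get(i).
   */
  static Map<String, List<String>> groupChannelsByOperator(
      List<String> targetOperators, List<String> channelIds) {
    Map<String, List<String>> grouped = new LinkedHashMap<>();
    for (int i = 0; i < targetOperators.size(); i++) {
      // computeIfAbsent creates the list the first time an operator is seen.
      grouped.computeIfAbsent(targetOperators.get(i), name -> new ArrayList<>())
          .add(channelIds.get(i));
    }
    return grouped;
  }

  public static void main(String[] args) {
    Map<String, List<String>> grouped = groupChannelsByOperator(
        Arrays.asList("map", "map", "sink"),
        Arrays.asList("channel-0", "channel-1", "channel-2"));
    // One collector would be created per key of this map.
    System.out.println(grouped);
  }
}
```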

StreamTask's main checkpoint-related operations are:

  • triggerCheckpoint, which triggers a checkpoint. It may only be called on a data source.
  • doCheckpoint, which performs the checkpoint. It first records the data offsets of the messaging layer (every task saves and distributes its DataWriter offsets, and the offsets passed in from upstream are saved as the DataReader offsets), then continues to broadcast the checkpoint barrier downstream, calls the processor's saveCheckpoint function to run the user's checkpoint method, and finally calls saveCpStateAndReport to save the checkpoint information and report the result.
  • saveCpStateAndReport, which calls saveCp and reportCommit in turn.
  • saveCp, the function that finally saves the checkpoint. It calls the utility class CheckpointStateUtil to store the checkpoint data in the backend.
  • reportCommit, which reports to the JobMaster through RemoteCallMaster that the checkpoint is complete.
  • clearExpiredCpState and clearExpiredQueueMsg, which clean up expired local checkpoint data.

public boolean triggerCheckpoint(Long barrierId) {
throw new UnsupportedOperationException("Only source operator supports trigger checkpoints.");
}

public void doCheckpoint(long checkpointId, Map<String, OffsetInfo> inputPoints) {
Map<String, OffsetInfo> outputPoints = null;
if (writer != null) {
outputPoints = writer.getOutputCheckpoints();
RemoteCall.Barrier barrierPb = RemoteCall.Barrier.newBuilder().setId(checkpointId).build();
ByteBuffer byteBuffer = ByteBuffer.wrap(barrierPb.toByteArray());
byteBuffer.order(ByteOrder.nativeOrder());
writer.broadcastBarrier(checkpointId, byteBuffer);
}

LOG.info("Start do checkpoint, cp id={}, inputPoints={}, outputPoints={}.", checkpointId, inputPoints,
outputPoints);

this.lastCheckpointId = checkpointId;
Serializable processorCheckpoint = processor.saveCheckpoint();

try {
OperatorCheckpointInfo opCpInfo = new OperatorCheckpointInfo(inputPoints, outputPoints, processorCheckpoint,
checkpointId);
saveCpStateAndReport(opCpInfo, checkpointId);
} catch (Exception e) {
// there will be exceptions when flush state to backend.
// we ignore the exception to prevent failover
LOG.error("Processor or op checkpoint exception.", e);
}

LOG.info("Operator do checkpoint {} finish.", checkpointId);
}

private void saveCpStateAndReport(OperatorCheckpointInfo operatorCheckpointInfo, long checkpointId) {
saveCp(operatorCheckpointInfo, checkpointId);
reportCommit(checkpointId);

LOG.info("Finish save cp state and report, checkpoint id is {}.", checkpointId);
}

private void saveCp(OperatorCheckpointInfo operatorCheckpointInfo, long checkpointId) {
byte[] bytes = Serializer.encode(operatorCheckpointInfo);
String cpKey = genOpCheckpointKey(checkpointId);
LOG.info("Saving task checkpoint, cpKey={}, byte len={}, checkpointInfo={}.", cpKey, bytes.length,
operatorCheckpointInfo);
synchronized (checkpointState) {
if (outdatedCheckpoints.contains(checkpointId)) {
LOG.info("Outdated checkpoint, skip save checkpoint.");
outdatedCheckpoints.remove(checkpointId);
} else {
CheckpointStateUtil.put(checkpointState, cpKey, bytes);
}
}
}

private void reportCommit(long checkpointId) {
final JobWorkerContext context = jobWorker.getWorkerContext();
LOG.info("Report commit async, checkpoint id {}.", checkpointId);
RemoteCallMaster.reportJobWorkerCommitAsync(context.getMaster(),
new WorkerCommitReport(context.getWorkerActorId(), checkpointId));
}

public void notifyCheckpointTimeout(long checkpointId) {
String cpKey = genOpCheckpointKey(checkpointId);
try {
synchronized (checkpointState) {
if (checkpointState.exists(cpKey)) {
checkpointState.remove(cpKey);
} else {
outdatedCheckpoints.add(checkpointId);
}
}
} catch (Exception e) {
LOG.error("Notify checkpoint timeout failed, checkpointId is {}.", checkpointId, e);
}
}

public void clearExpiredCpState(long checkpointId) {
String cpKey = genOpCheckpointKey(checkpointId);
try {
checkpointState.remove(cpKey);
} catch (Exception e) {
LOG.error("Failed to remove key {} from state backend.", cpKey, e);
}
}

public void clearExpiredQueueMsg(long checkpointId) {
// get operator checkpoint
String cpKey = genOpCheckpointKey(checkpointId);
byte[] bytes;
try {
bytes = checkpointState.get(cpKey);
} catch (Exception e) {
LOG.error("Failed to get key {} from state backend.", cpKey, e);
return;
}
if (bytes != null) {
final OperatorCheckpointInfo operatorCheckpointInfo = Serializer.decode(bytes);
long cpId = operatorCheckpointInfo.checkpointId;
if (writer != null) {
writer.clearCheckpoint(cpId);
}
}
}
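
The interplay between saveCp and notifyCheckpointTimeout in the listing above can be hard to follow, so here is a minimal sketch of it. It assumes a plain HashMap in place of the real state backend; the class name CheckpointStoreSketch and the key format are hypothetical and are not part of ray-streaming.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the save/timeout race handled by saveCp and
// notifyCheckpointTimeout. Not the real ray-streaming class.
class CheckpointStoreSketch {
  private final Map<String, byte[]> state = new HashMap<>(); // plays the role of checkpointState
  private final Set<Long> outdated = new HashSet<>();        // plays the role of outdatedCheckpoints

  private static String key(long checkpointId) {
    return "op-cp-" + checkpointId; // assumed key format, not the real genOpCheckpointKey
  }

  // Returns true if the snapshot was stored, false if it was skipped as outdated.
  synchronized boolean save(long checkpointId, byte[] bytes) {
    if (outdated.remove(checkpointId)) {
      return false; // the timeout arrived first, so the late snapshot is dropped
    }
    state.put(key(checkpointId), bytes);
    return true;
  }

  // A timeout either deletes an existing snapshot or marks the id as outdated,
  // so that a snapshot arriving later is not stored.
  synchronized void notifyTimeout(long checkpointId) {
    if (state.remove(key(checkpointId)) == null) {
      outdated.add(checkpointId);
    }
  }

  synchronized boolean exists(long checkpointId) {
    return state.containsKey(key(checkpointId));
  }
}
```

Whichever of the two calls arrives first, a timed-out checkpoint never leaves state behind in this sketch, which appears to be the intent of the outdatedCheckpoints set in the original code.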

StreamTask's requestRollback function asks the JobWorker to roll the task back.

protected void requestRollback(String exceptionMsg) {
  jobWorker.requestRollback(exceptionMsg);
}
2.2.6.1 SourceStreamTask

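ray/streaming/runtime/worker/tasks/SourceStreamTask.java is the dedicated implementation for data sources. Its two key pieces are the run function, which reads data, and the triggerCheckpoint function, which triggers a checkpoint.

The run function of SourceStreamTask loops over two steps: a checkpoint check and a data read. Ordinary processors handle data in process, but a source has no input, so it defines a separate fetch function instead. When the loop picks up a barrierId, it calls doCheckpoint, which broadcasts the checkpoint barrier downstream.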

public void run() {
LOG.info("Source stream task thread start.");
Long barrierId;
try {
while (running) {
isInitialState = false;

// check checkpoint
barrierId = pendingBarrier.get();
if (barrierId != null) {
// Important: because cp maybe timeout, master will use the old checkpoint id again
if (pendingBarrier.compareAndSet(barrierId, null)) {
// source fetcher only have outputPoints
LOG.info("Start to do checkpoint {}, worker name is {}.",
barrierId, jobWorker.getWorkerContext().getWorkerName());

doCheckpoint(barrierId, null);

LOG.info("Finish to do checkpoint {}.", barrierId);
} else {
// pendingCheckpointId has modify, should not happen
LOG.warn("Pending checkpointId modify unexpected, expect={}, now={}.", barrierId,
pendingBarrier.get());
}
}

sourceProcessor.fetch();
}
} catch (Throwable e) {
if (e instanceof ChannelInterruptException ||
ExceptionUtils.getRootCause(e) instanceof ChannelInterruptException) {
LOG.info("queue has stopped.");
} else {
// occur error, need to rollback
LOG.error("Last success checkpointId={}, now occur error.", lastCheckpointId, e);
requestRollback(ExceptionUtils.getStackTrace(e));
}
}

LOG.info("Source stream task thread exit.");
}

The triggerCheckpoint function of SourceStreamTask stores the checkpoint ID in pendingBarrier, where the run loop picks it up on its next iteration.

public boolean triggerCheckpoint(Long barrierId) {
  return pendingBarrier.compareAndSet(null, barrierId);
}
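
The handshake between triggerCheckpoint and the run loop can be sketched on its own. The example below assumes that pendingBarrier is an AtomicReference<Long>, which is what the compareAndSet calls suggest; the class PendingBarrierSketch and its poll method are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the single-slot barrier handoff between the master
// (trigger) and the source loop (poll).
class PendingBarrierSketch {
  private final AtomicReference<Long> pendingBarrier = new AtomicReference<>();

  // Master side: succeeds only if no barrier is currently waiting.
  boolean trigger(Long barrierId) {
    return pendingBarrier.compareAndSet(null, barrierId);
  }

  // Source-loop side: claims the waiting barrier, or returns null if there is none.
  Long poll() {
    Long id = pendingBarrier.get();
    // compareAndSet compares references, so it must be given the exact object read above.
    if (id != null && pendingBarrier.compareAndSet(id, null)) {
      return id;
    }
    return null;
  }
}
```

Because the slot holds at most one value, a second trigger is rejected until the loop has consumed the first barrier.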
2.2.6.2 InputStreamTask

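ray/streaming/runtime/worker/tasks/InputStreamTask.java implements the main data-processing loop; its subclasses only distinguish between processing types. The run function keeps reading messages from reader and, depending on the message type, either calls processor to handle the data or performs a checkpoint.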

public void run() {
try {
while (running) {
ChannelMessage item;

// reader.read() will change the consumer state once it got an item. This lock is to
// ensure worker can get correct isInitialState value in exactly-once-mode's rollback.
synchronized (jobWorker.initialStateChangeLock) {
item = reader.read(readTimeoutMillis);
if (item != null) {
isInitialState = false;
} else {
continue;
}
}

if (item instanceof DataMessage) {
DataMessage dataMessage = (DataMessage) item;
byte[] bytes = new byte[dataMessage.body().remaining() - 1];
byte typeId = dataMessage.body().get();
dataMessage.body().get(bytes);
Object obj;
if (typeId == Serializer.JAVA_TYPE_ID) {
obj = javaSerializer.deserialize(bytes);
} else {
obj = crossLangSerializer.deserialize(bytes);
}
processor.process(obj);
} else if (item instanceof BarrierMessage) {
final BarrierMessage queueBarrier = (BarrierMessage) item;
byte[] barrierData = new byte[queueBarrier.getData().remaining()];
queueBarrier.getData().get(barrierData);
RemoteCall.Barrier barrierPb = RemoteCall.Barrier.parseFrom(barrierData);
final long checkpointId = barrierPb.getId();
LOG.info("Start to do checkpoint {}, worker name is {}.", checkpointId,
jobWorker.getWorkerContext().getWorkerName());

final Map<String, OffsetInfo> inputPoints = queueBarrier.getInputOffsets();
doCheckpoint(checkpointId, inputPoints);
LOG.info("Do checkpoint {} success.", checkpointId);
}
}
} catch (Throwable throwable) {
if (throwable instanceof ChannelInterruptException ||
ExceptionUtils.getRootCause(throwable) instanceof ChannelInterruptException) {
LOG.info("queue has stopped.");
} else {
// error occurred, need to rollback
LOG.error("Last success checkpointId={}, now occur error.", lastCheckpointId, throwable);
requestRollback(ExceptionUtils.getStackTrace(throwable));
}
}
LOG.info("Input stream task thread exit.");
stopped = true;
}
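
The DataMessage branch above reads one type-id byte and then hands the rest of the body to a serializer. The sketch below reproduces only that framing, assuming a UTF-8 string payload in place of the real serializers; TypedPayloadSketch and the two type-id values are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a body laid out as [1-byte type id][payload bytes].
class TypedPayloadSketch {
  static final byte JAVA_TYPE_ID = 0;       // assumed value, for illustration only
  static final byte CROSS_LANG_TYPE_ID = 1; // assumed value, for illustration only

  static ByteBuffer encode(byte typeId, String text) {
    byte[] payload = text.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(1 + payload.length);
    buf.put(typeId).put(payload);
    buf.flip(); // switch from writing to reading
    return buf;
  }

  static String decode(ByteBuffer body) {
    // Size the array before consuming the type id, as the original loop does.
    byte[] bytes = new byte[body.remaining() - 1];
    byte typeId = body.get();
    body.get(bytes);
    String text = new String(bytes, StandardCharsets.UTF_8);
    // The prefix stands in for choosing javaSerializer or crossLangSerializer.
    return (typeId == JAVA_TYPE_ID ? "java:" : "cross-lang:") + text;
  }
}
```

Putting the type id in front of the payload lets one channel carry records produced by both Java and Python operators.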

2.2.7 StreamingRuntimeContext

ray/streaming/runtime/worker/context/StreamingRuntimeContext.java implements the RuntimeContext interface from streaming-api and stores the task's basic information at construction time. The ExecutionVertex passed in is used only to obtain taskId and taskIndex.

public StreamingRuntimeContext(
    ExecutionVertex executionVertex, Map<String, String> config,
    int parallelism) {
  this.taskId = executionVertex.getExecutionVertexId();
  this.config = config;
  this.taskIndex = executionVertex.getExecutionVertexIndex();
  this.parallelism = parallelism;
}

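The remaining functions of StreamingRuntimeContext are accessors for its member variables. The ones worth noting concern KeyStateBackend and OperatorStateBackend, two state-backend classes from streaming-state: so far only setters are provided, and no later initialization or invocation was found.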

2.2.8 Processor

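ray/streaming/runtime/core/processor/Processor.java is where each streaming task does its actual processing. It has several specialized subclasses for different kinds of computation; the class hierarchy is as follows.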

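Processor --> The parent interface; mainly declares the function signatures.
└───>StreamProcessor --> Implements every function except `process` and provides the basic functionality. Calls different Operator functions at different stages.
     ├──>SourceProcessor --> Exposes the `fetch` function for reading from a data source.
     ├──>OneInputProcessor --> Handles single-input processing by calling the operator's `processElement` function.
     └──>TwoInputProcessor --> Similar to the single-input case, but first determines which upstream the input belongs to, then calls the operator's `processElement` function.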

Processor exposes five main interface functions, one for each stage of a task.

void open(List<Collector> collectors, RuntimeContext runtimeContext);

void process(T t);

/**
* See {@link Function#saveCheckpoint()}.
*/
Serializable saveCheckpoint();

/**
* See {@link Function#loadCheckpoint(Serializable)}.
*/
void loadCheckpoint(Serializable checkpointObject);

void close();
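
To show how these five functions fit together across a task's lifetime, here is a minimal sketch. It assumes a simplified interface without the Collector and RuntimeContext arguments; ProcessorSketch and CountingProcessor are hypothetical names and do not exist in ray-streaming.

```java
import java.io.Serializable;

// Hypothetical, simplified version of the five-stage processor contract.
interface ProcessorSketch<T> {
  void open();
  void process(T value);
  Serializable saveCheckpoint();
  void loadCheckpoint(Serializable checkpoint);
  void close();
}

// Counts the records it has seen; the count is the only checkpointed state.
class CountingProcessor implements ProcessorSketch<String> {
  private long count;
  private boolean opened;

  @Override
  public void open() {
    opened = true;
  }

  @Override
  public void process(String value) {
    if (!opened) {
      throw new IllegalStateException("process() called before open()");
    }
    count++;
  }

  @Override
  public Serializable saveCheckpoint() {
    return count; // boxed to Long, which is Serializable
  }

  @Override
  public void loadCheckpoint(Serializable checkpoint) {
    count = (Long) checkpoint;
  }

  @Override
  public void close() {
    opened = false;
  }

  long count() {
    return count;
  }
}
```

After a rollback, a fresh processor instance would call loadCheckpoint with the saved object and then continue processing from the restored count.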

2.2.9 ContextBackend

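ray/streaming/runtime/context/ContextBackend.java is the simple checkpoint-backend abstraction in streaming-runtime and exposes four functions. Two implementations follow: MemoryContextBackend, based on a HashMap, and AtomicFsBackend, based on the local file system, whose path is read from config.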

/**
* check if key exists in state
*
* @return true if exists
*/
boolean exists(final String key) throws Exception;

/**
* get content by key
*
* @param key key
* @return the StateBackend
*/
byte[] get(final String key) throws Exception;

/**
* put content by key
*
* @param key key
* @param value content
*/
void put(final String key, final byte[] value) throws Exception;

/**
* remove content by key
*
* @param key key
*/
void remove(final String key) throws Exception;
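
A HashMap-based backend along the lines described above could look roughly like the following. This is a sketch written against a copy of the four-function interface, not the actual MemoryContextBackend source; BackendSketch and HashMapBackendSketch are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical copy of the four-function backend contract.
interface BackendSketch {
  boolean exists(String key) throws Exception;
  byte[] get(String key) throws Exception;
  void put(String key, byte[] value) throws Exception;
  void remove(String key) throws Exception;
}

// In-memory implementation; the contents are lost when the process exits.
class HashMapBackendSketch implements BackendSketch {
  private final Map<String, byte[]> store = new HashMap<>();

  @Override
  public boolean exists(String key) {
    return store.containsKey(key);
  }

  @Override
  public byte[] get(String key) {
    return store.get(key); // null when the key is absent
  }

  @Override
  public void put(String key, byte[] value) {
    store.put(key, value);
  }

  @Override
  public void remove(String key) {
    store.remove(key);
  }
}
```

An in-memory backend like this is only useful for tests and local runs, since the checkpoint disappears with the process; a file-system backend has to survive restarts and therefore needs to write atomically.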

2.2.10 BaseCoordinator

ray/streaming/runtime/master/coordinator/BaseCoordinator.java is the parent class of CheckpointCoordinator and FailoverCoordinator. It implements Runnable and runs itself on a dedicated thread.

public BaseCoordinator(JobMaster jobMaster) {
this.jobMaster = jobMaster;
this.runtimeContext = jobMaster.getRuntimeContext();
this.graphManager = jobMaster.getGraphManager();
}

public void start() {
thread = new Thread(Ray.wrapRunnable(this),
this.getClass().getName() + "-" + System.currentTimeMillis());
thread.start();
}

public void stop() {
closed = true;

try {
if (thread != null) {
thread.join(30000);
}
} catch (InterruptedException e) {
LOG.error("Coordinator thread exit has exception.", e);
}
}
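
The start/stop pattern above is a common way to run a polling loop on its own thread. The sketch below shows the same pattern without Ray, so it omits Ray.wrapRunnable; CoordinatorSketch and TickingCoordinator are hypothetical, and the volatile flag is an assumption about how closed should be declared for the loop to see the update.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical base class: a Runnable that owns its thread and stops cooperatively.
abstract class CoordinatorSketch implements Runnable {
  protected volatile boolean closed; // volatile so the loop thread observes stop()
  private Thread thread;

  void start() {
    thread = new Thread(this, getClass().getName() + "-" + System.currentTimeMillis());
    thread.start();
  }

  // Returns true if the worker thread has exited within the timeout.
  boolean stop(long joinMillis) {
    closed = true;
    if (thread == null) {
      return true;
    }
    try {
      thread.join(joinMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    }
    return !thread.isAlive();
  }
}

// Example subclass: increments a counter until it is asked to stop.
class TickingCoordinator extends CoordinatorSketch {
  final AtomicInteger ticks = new AtomicInteger();

  @Override
  public void run() {
    while (!closed) {
      ticks.incrementAndGet();
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        return;
      }
    }
  }
}
```

The bounded join mirrors the 30-second join in the original stop function: the caller waits for a clean exit but does not block forever if the loop is stuck.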
2.2.10.1 CheckpointCoordinator

The run function of CheckpointCoordinator mainly keeps polling the checkpoint commands in runtimeContext and reacts to each one.

public void run() {
while (!closed) {
try {
final BaseWorkerCmd command = runtimeContext.cpCmds.poll(1, TimeUnit.SECONDS);
if (command != null) {
if (command instanceof WorkerCommitReport) {
processCommitReport((WorkerCommitReport) command);
} else {
interruptCheckpoint();
}
}

if (!pendingCheckpointActors.isEmpty()) {
// if wait commit report timeout, this cp fail, and restart next cp
if (timeoutOnWaitCheckpoint()) {
LOG.warn("Waiting for checkpoint {} timeout, pending cp actors is {}.",
runtimeContext.lastCheckpointId,
graphManager.getExecutionGraph().getActorName(pendingCheckpointActors));

interruptCheckpoint();
}
} else {
maybeTriggerCheckpoint();
}
} catch (Throwable e) {
LOG.error("Checkpoint coordinator occur err.", e);
try {
interruptCheckpoint();
} catch (Throwable interruptE) {
LOG.error("Ignore interrupt checkpoint exception in catch block.");
}
}
}
LOG.warn("Checkpoint coordinator thread exit.");
}

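When run receives a normal checkpoint report sent back by a JobWorker, it calls processCommitReport. Each WorkerCommitReport marks one JobWorker as done, and that worker is removed from the pending set. Once the set is empty, the checkpoint round is complete: clearExpiredCpStateAndQueueMsg tells every task to clean up the previous round's checkpoint data, and the JobMaster then saves its own context.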

private void processCommitReport(WorkerCommitReport commitReport) {
LOG.info("Start process commit report {}, from actor name={}.", commitReport,
graphManager.getExecutionGraph().getActorName(commitReport.fromActorId));

try {
Preconditions.checkArgument(
commitReport.commitCheckpointId == runtimeContext.lastCheckpointId,
"expect checkpointId %s, but got %s",
runtimeContext.lastCheckpointId, commitReport);

if (!pendingCheckpointActors.contains(commitReport.fromActorId)) {
LOG.warn("Invalid commit report, skipped.");
return;
}

pendingCheckpointActors.remove(commitReport.fromActorId);
LOG.info("Pending actors after this commit: {}.",
graphManager.getExecutionGraph().getActorName(pendingCheckpointActors));

// checkpoint finish
if (pendingCheckpointActors.isEmpty()) {
// actor finish
runtimeContext.checkpointIds.add(runtimeContext.lastCheckpointId);

if (clearExpiredCpStateAndQueueMsg()) {
// save master context
jobMaster.saveContext();

LOG.info("Finish checkpoint: {}.", runtimeContext.lastCheckpointId);
} else {
LOG.warn("Fail to do checkpoint: {}.", runtimeContext.lastCheckpointId);
}
}

LOG.info("Process commit report {} success.", commitReport);
} catch (Throwable e) {
LOG.warn("Process commit report has exception.", e);
}
}
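
The bookkeeping in processCommitReport reduces to a pending set that shrinks as reports arrive. The following sketch assumes actors are identified by plain strings and returns false for a stale checkpoint ID instead of failing a precondition; CommitTrackerSketch is a hypothetical name.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of tracking which actors still owe a commit report.
class CommitTrackerSketch {
  private final Set<String> pending = new HashSet<>();
  private long lastCheckpointId;

  // Starts a new round: every actor becomes pending and the id is incremented.
  void trigger(List<String> actorIds) {
    pending.addAll(actorIds);
    ++lastCheckpointId;
  }

  // Returns true exactly when this report completes the current round.
  boolean onCommit(String actorId, long checkpointId) {
    if (checkpointId != lastCheckpointId) {
      return false; // report for another round
    }
    if (!pending.remove(actorId)) {
      return false; // unknown actor or duplicate report
    }
    return pending.isEmpty();
  }
}
```

In the original code the point where the set becomes empty is where the expired state is cleared and the master context is saved.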

clearExpiredCpStateAndQueueMsg fetches all actors from the execution graph and notifies them, through clearExpiredCheckpointParallel, to clean up expired checkpoints.

private boolean clearExpiredCpStateAndQueueMsg() {
// queue msg must clear when first checkpoint finish
List<BaseActorHandle> allActor = graphManager.getExecutionGraph().getAllActors();
if (1 == runtimeContext.checkpointIds.size()) {
Long msgExpiredCheckpointId = runtimeContext.checkpointIds.get(0);
RemoteCallWorker.clearExpiredCheckpointParallel(allActor, 0L, msgExpiredCheckpointId);
}

if (runtimeContext.checkpointIds.size() > 1) {
Long stateExpiredCpId = runtimeContext.checkpointIds.remove(0);
Long msgExpiredCheckpointId = runtimeContext.checkpointIds.get(0);
RemoteCallWorker
.clearExpiredCheckpointParallel(allActor, stateExpiredCpId, msgExpiredCheckpointId);
}
return true;
}
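
Which IDs are passed to the workers is easier to see in isolation. The sketch below keeps only the list manipulation and returns the pair that would be sent; RetentionSketch and onCheckpointFinished are hypothetical names, and the method also performs the add that processCommitReport does before calling the clean-up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how the expired checkpoint ids are derived.
class RetentionSketch {
  private final List<Long> checkpointIds = new ArrayList<>();

  // Returns {stateExpiredCpId, msgExpiredCpId} for the clean-up call.
  long[] onCheckpointFinished(long checkpointId) {
    checkpointIds.add(checkpointId);
    if (checkpointIds.size() == 1) {
      // First finished checkpoint: no older state exists, only queue messages expire.
      return new long[] {0L, checkpointIds.get(0)};
    }
    // Drop the oldest id (its state expires); the new head bounds the queue messages.
    long stateExpired = checkpointIds.remove(0);
    long msgExpired = checkpointIds.get(0);
    return new long[] {stateExpired, msgExpired};
  }
}
```

With checkpoints finishing in the order 1, 2, 3 this yields the pairs (0, 1), (1, 2) and (2, 3), so the list holds a single ID between rounds and state is kept only for the most recent checkpoint.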

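When run finds that a checkpoint has failed or timed out, it calls interruptCheckpoint. Its logic is similar to clearExpiredCpStateAndQueueMsg, except that it calls notifyCheckpointTimeoutParallel on the actors. It ends by calling maybeTriggerCheckpoint to decide whether to start a new checkpoint.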

private void interruptCheckpoint() {
// notify checkpoint timeout is time-consuming while many workers crash or
// container failover.
if (interruptedCheckpointSet.contains(runtimeContext.lastCheckpointId)) {
LOG.warn("Skip interrupt duplicated checkpoint id : {}.", runtimeContext.lastCheckpointId);
return;
}
interruptedCheckpointSet.add(runtimeContext.lastCheckpointId);
LOG.warn("Interrupt checkpoint, checkpoint id : {}.", runtimeContext.lastCheckpointId);

List<BaseActorHandle> allActor = graphManager.getExecutionGraph().getAllActors();
if (runtimeContext.lastCheckpointId > runtimeContext.getLastValidCheckpointId()) {
RemoteCallWorker
.notifyCheckpointTimeoutParallel(allActor, runtimeContext.lastCheckpointId);
}

if (!pendingCheckpointActors.isEmpty()) {
pendingCheckpointActors.clear();
}
maybeTriggerCheckpoint();
}

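After a checkpoint finishes normally, or after a failure has been handled, maybeTriggerCheckpoint is called. It first checks whether the checkpoint interval has elapsed and, if so, calls triggerCheckpoint.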

private void maybeTriggerCheckpoint() {
  if (readyToTrigger()) {
    triggerCheckpoint();
  }
}

private boolean readyToTrigger() {
  return (System.currentTimeMillis() - runtimeContext.lastCpTimestamp) >=
      cpIntervalSecs * 1000;
}

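triggerCheckpoint is the function that finally triggers a checkpoint. Through RemoteCallWorker it calls the JobWorker's triggerCheckpoint, sending the checkpoint ID to every source task and starting the checkpoint.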

private void triggerCheckpoint() {
interruptedCheckpointSet.clear();
if (LOG.isInfoEnabled()) {
LOG.info("Start trigger checkpoint {}.", runtimeContext.lastCheckpointId + 1);
}

List<ActorId> allIds = graphManager.getExecutionGraph().getAllActorsId();
// do the checkpoint
pendingCheckpointActors.addAll(allIds);

// inc last checkpoint id
++runtimeContext.lastCheckpointId;

final List<ObjectRef> sourcesRet = new ArrayList<>();

graphManager.getExecutionGraph().getSourceActors().forEach(actor -> {
sourcesRet.add(RemoteCallWorker.triggerCheckpoint(
actor, runtimeContext.lastCheckpointId));
});

for (ObjectRef rayObject : sourcesRet) {
if (rayObject.get() instanceof RayException) {
LOG.warn("Trigger checkpoint has exception.", (RayException) rayObject.get());
throw (RayException) rayObject.get();
}
}
runtimeContext.lastCpTimestamp = System.currentTimeMillis();
LOG.info("Trigger checkpoint success.");
}
2.2.10.2 FailoverCoordinator

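FailoverCoordinator's requestJobWorkerRollback function is exposed to the JobMaster. It decides whether to add a rollback request to the runtime context, skipping any request that duplicates one already queued.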

public Boolean requestJobWorkerRollback(WorkerRollbackRequest request) {
LOG.info("Request job worker rollback {}.", request);
boolean ret;
if (!isDuplicateRequest(request)) {
ret = runtimeContext.foCmds.offer(request);
} else {
LOG.warn("Skip duplicated worker rollback request, {}.", request.toString());
return true;
}
jobMaster.saveContext();
if (!ret) {
LOG.warn("Request job worker rollback failed, because command queue is full.");
}
return ret;
}
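
The two outcomes worth noticing here are the duplicate case, which returns true without enqueueing, and the full-queue case, where offer returns false. A minimal sketch, assuming that foCmds is a bounded BlockingQueue and identifying requests by the actor ID alone; RollbackQueueSketch is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of de-duplicated, bounded rollback requests.
class RollbackQueueSketch {
  private final BlockingQueue<String> foCmds;

  RollbackQueueSketch(int capacity) {
    foCmds = new ArrayBlockingQueue<>(capacity);
  }

  // Returns true if the request is queued or already pending, false if the queue is full.
  boolean request(String fromActorId) {
    if (foCmds.contains(fromActorId)) {
      return true; // duplicate: treated as accepted, nothing is enqueued
    }
    return foCmds.offer(fromActorId); // non-blocking; false when the queue is full
  }

  int size() {
    return foCmds.size();
  }
}
```

Using offer rather than put means the remote call never blocks the JobMaster; the caller sees false and can retry.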

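The run function of FailoverCoordinator likewise keeps polling for commands, here the failover commands in foCmds. When it receives a WorkerRollbackRequest, it calls dealWithRollbackRequest to roll the computation back.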

public void run() {
while (!closed) {
try {
final BaseWorkerCmd command;
// see rollback() for lock reason
synchronized (cmdLock) {
command = jobMaster.getRuntimeContext().foCmds.poll(1, TimeUnit.SECONDS);
}
if (null == command) {
continue;
}
if (command instanceof WorkerRollbackRequest) {
jobMaster.getRuntimeContext().unfinishedFoCmds.add(command);
dealWithRollbackRequest((WorkerRollbackRequest) command);
}
} catch (Throwable e) {
LOG.error("Fo coordinator occur err.", e);
}
}
LOG.warn("Fo coordinator thread exit.");
}

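FailoverCoordinator's dealWithRollbackRequest function first looks up the execution vertex that requested the rollback, then checks whether the rollback is still needed, and finally calls interruptCheckpointAndRollback to perform it.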

private void dealWithRollbackRequest(WorkerRollbackRequest rollbackRequest) {
LOG.info("Start deal with rollback request {}.", rollbackRequest);

ExecutionVertex exeVertex = getExeVertexFromRequest(rollbackRequest);

// Reset pid for new-rollback actor.
if (null != rollbackRequest.getPid() &&
!rollbackRequest.getPid().equals(WorkerRollbackRequest.DEFAULT_PID)) {
exeVertex.setPid(rollbackRequest.getPid());
}

if (isRollbacking.get(exeVertex)) {
LOG.info("Vertex {} is rollbacking, skip rollback again.", exeVertex);
return;
}

String hostname = "";
Optional<Container> container = ResourceUtil.getContainerById(
jobMaster.getResourceManager().getRegisteredContainers(),
exeVertex.getContainerId()
);
if (container.isPresent()) {
hostname = container.get().getHostname();
}

if (rollbackRequest.isForcedRollback) {
interruptCheckpointAndRollback(rollbackRequest);
} else {
asyncRemoteCaller.checkIfNeedRollbackAsync(exeVertex.getWorkerActor(), res -> {
if (!res) {
LOG.info("Vertex {} doesn't need to rollback, skip it.", exeVertex);
return;
}
interruptCheckpointAndRollback(rollbackRequest);
}, throwable -> {
LOG.error("Exception when calling checkIfNeedRollbackAsync, maybe vertex is dead" +
", ignore this request, vertex={}.", exeVertex, throwable);
});
}

LOG.info("Deal with rollback request {} success.", rollbackRequest);
}

private Boolean isDuplicateRequest(WorkerRollbackRequest request) {
try {
Object[] foCmdsArray = runtimeContext.foCmds.toArray();
for (Object cmd : foCmdsArray) {
if (request.fromActorId.equals(((BaseWorkerCmd) cmd).fromActorId)) {
return true;
}
}
} catch (Exception e) {
LOG.warn("Check request is duplicated failed.", e);
}
return false;
}

private ExecutionVertex getExeVertexFromRequest(WorkerRollbackRequest rollbackRequest) {
ActorId actorId = rollbackRequest.fromActorId;
Optional<BaseActorHandle> rayActor = graphManager.getExecutionGraph().getActorById(actorId);
if (!rayActor.isPresent()) {
throw new RuntimeException("Can not find ray actor of ID " + actorId);
}
return getExecutionVertex(rollbackRequest.fromActorId);
}

private ExecutionVertex getExecutionVertex(BaseActorHandle actor) {
return graphManager.getExecutionGraph().getExecutionVertexByActorId(actor.getId());
}

private ExecutionVertex getExecutionVertex(ActorId actorId) {
return graphManager.getExecutionGraph().getExecutionVertexByActorId(actorId);
}

FailoverCoordinator's interruptCheckpointAndRollback function performs the rollback through rollback and then adds an interrupt-checkpoint request to the runtime context.

private void interruptCheckpointAndRollback(WorkerRollbackRequest rollbackRequest) {
// assign a cascadingGroupId
if (rollbackRequest.cascadingGroupId == null) {
rollbackRequest.cascadingGroupId = currentCascadingGroupId++;
}
// get last valid checkpoint id then call worker rollback
rollback(jobMaster.getRuntimeContext().getLastValidCheckpointId(), rollbackRequest,
currentCascadingGroupId);
// we interrupt current checkpoint for 2 considerations:
// 1. current checkpoint might be timeout, because barrier might be lost after failover. so we
// interrupt current checkpoint to avoid waiting.
// 2. when we want to rollback vertex to n, job finished checkpoint n+1 and cleared state
// of checkpoint n.
jobMaster.getRuntimeContext().cpCmds.offer(new InterruptCheckpointRequest());
}

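FailoverCoordinator's rollback function rolls back the JobWorker that made the request. Once that succeeds, it queues forced rollback requests for the upstream operators whose queues the worker reported as having lost data, and saves the JobMaster state. The cascade can continue upstream in this way until it reaches the source operators.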

private void rollback(
long checkpointId, WorkerRollbackRequest rollbackRequest,
long cascadingGroupId) {
ExecutionVertex exeVertex = getExeVertexFromRequest(rollbackRequest);
LOG.info("Call vertex {} to rollback, checkpoint id is {}, cascadingGroupId={}.",
exeVertex, checkpointId, cascadingGroupId);

isRollbacking.put(exeVertex, true);

asyncRemoteCaller.rollback(exeVertex.getWorkerActor(), checkpointId, result -> {
List<WorkerRollbackRequest> newRollbackRequests = new ArrayList<>();
switch (result.getResultEnum()) {
case SUCCESS:
ChannelRecoverInfo recoverInfo = result.getResultObj();
LOG.info("Vertex {} rollback done, dataLostQueues={}, msg={}, cascadingGroupId={}.",
exeVertex, recoverInfo.getDataLostQueues(), result.getResultMsg(), cascadingGroupId);
// rollback upstream if vertex reports abnormal input queues
newRollbackRequests =
cascadeUpstreamActors(recoverInfo.getDataLostQueues(), exeVertex, cascadingGroupId);
break;
case SKIPPED:
LOG.info("Vertex skip rollback, result = {}, cascadingGroupId={}.", result,
cascadingGroupId);
break;
default:
LOG.error(
"Rollback vertex {} failed, result={}, cascadingGroupId={}," +
" rollback this worker again after {} ms.",
exeVertex, result, cascadingGroupId, ROLLBACK_RETRY_TIME_MS);
Thread.sleep(ROLLBACK_RETRY_TIME_MS);
LOG.info("Add rollback request for {} again, cascadingGroupId={}.", exeVertex,
cascadingGroupId);
newRollbackRequests.add(
new WorkerRollbackRequest(exeVertex, "", "Rollback failed, try again.", false)
);
break;
}

// lock to avoid executing new rollback requests added.
// consider such a case: A->B->C, C cascade B, and B cascade A
// if B is rollback before B's rollback request is saved, and then JobMaster crashed,
// then A will never be rollback.
synchronized (cmdLock) {
jobMaster.getRuntimeContext().foCmds.addAll(newRollbackRequests);
// this rollback request is finished, remove it.
jobMaster.getRuntimeContext().unfinishedFoCmds.remove(rollbackRequest);
jobMaster.saveContext();
}
isRollbacking.put(exeVertex, false);
}, throwable -> {
LOG.error("Exception when calling vertex to rollback, vertex={}.", exeVertex, throwable);
isRollbacking.put(exeVertex, false);
});

LOG.info("Finish rollback vertex {}, checkpoint id is {}.", exeVertex, checkpointId);
}

private List<WorkerRollbackRequest> cascadeUpstreamActors(
Set<String> dataLostQueues, ExecutionVertex fromVertex, long cascadingGroupId) {
List<WorkerRollbackRequest> cascadedRollbackRequest = new ArrayList<>();
// rollback upstream if vertex reports abnormal input queues
dataLostQueues.forEach(q -> {
BaseActorHandle upstreamActor =
graphManager.getExecutionGraph().getPeerActor(fromVertex.getWorkerActor(), q);
ExecutionVertex upstreamExeVertex = getExecutionVertex(upstreamActor);
// vertexes that has already cascaded by other vertex in the same level
// of graph should be ignored.
if (isRollbacking.get(upstreamExeVertex)) {
return;
}
LOG.info("Call upstream vertex {} of vertex {} to rollback, cascadingGroupId={}.",
upstreamExeVertex, fromVertex, cascadingGroupId);
String hostname = "";
Optional<Container> container = ResourceUtil.getContainerById(
jobMaster.getResourceManager().getRegisteredContainers(),
upstreamExeVertex.getContainerId()
);
if (container.isPresent()) {
hostname = container.get().getHostname();
}
// force upstream vertexes to rollback
WorkerRollbackRequest upstreamRequest = new WorkerRollbackRequest(
upstreamExeVertex, hostname, String.format("Cascading rollback from %s", fromVertex), true
);
upstreamRequest.cascadingGroupId = cascadingGroupId;
cascadedRollbackRequest.add(upstreamRequest);
});
return cascadedRollbackRequest;
}
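
The cascading behaviour can be sketched synchronously. The example below assumes the data-lost upstream vertices are known in advance as a map, whereas the real code learns them asynchronously from each worker's rollback result. It also uses a visited set where the original uses the isRollbacking map, which is reset once a rollback finishes. CascadeSketch and rollbackOrder are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, synchronous sketch of cascading a rollback upstream.
class CascadeSketch {
  // lostUpstreams.get(v) lists the upstream vertices whose queues v reports as data-lost.
  static List<String> rollbackOrder(Map<String, List<String>> lostUpstreams, String start) {
    List<String> order = new ArrayList<>();
    Set<String> rollbacking = new HashSet<>(); // each vertex is rolled back at most once
    Deque<String> foCmds = new ArrayDeque<>(); // pending rollback requests
    foCmds.add(start);
    rollbacking.add(start);
    while (!foCmds.isEmpty()) {
      String vertex = foCmds.poll();
      order.add(vertex);
      for (String upstream : lostUpstreams.getOrDefault(vertex, Collections.emptyList())) {
        // Skip vertices already cascaded by another vertex on the same level.
        if (rollbacking.add(upstream)) {
          foCmds.add(upstream);
        }
      }
    }
    return order;
  }
}
```

For the chain A->B->C used in the code comment above, a failure at C that loses B's queue, followed by B losing A's queue, rolls the vertices back in the order C, B, A.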

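III. Summary

  • ray-streaming supports a Python API whose backend runs the same way as the Java one; it mainly provides a _PYTHON_GATEWAY through which Python connects to the Java JobMaster. To support a new operator, the corresponding interfaces must be added in streaming/python/datastream.py, streaming/python/operator.py, and streaming/python/function.py.
  • ray-streaming currently supports only simple computation: two-input processing is not supported, and the single-input path lacks any handling of time.
  • Checkpointing in ray-streaming currently has only a simple in-memory backend and a local-file backend. The state backend streaming-state exposes only function interfaces and has not been integrated into the runtime.
  • Judging from the call logic, the streaming-state backend should be created in StreamTask.
  • The DataMessage used to pass messages between tasks carries timestamp information, but the Record used for data processing has already lost it, so time-based processing cannot be supported effectively.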