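Preface
Notes from a series analyzing the Ray source code.
1. Overview
This article analyzes the ray-streaming source code, focusing on the operations involved in starting a job. ray-streaming is modeled on Flink, and its source was contributed by a team at Ant Group. Ray's stream processing is still at an early stage: it supports only single-input operators, it has no time-window support, and its checkpoint logic offers only the most basic in-memory and local-filesystem implementations.
ray-streaming is organized into three main packages and is built on top of Ray's Java actors. It constructs a JobMaster to schedule streaming jobs and handle fault recovery, and it supports multiple languages through the Java actors and Python actors that Ray provides natively. Compared with Flink, ray-streaming uses a different data-transfer strategy and a different checkpoint strategy: when taking a checkpoint, it records the offsets of the input and output buffer pools, which are implemented in C++. This simplifies recovery and avoids storing intermediate data.
2. Java source analysis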
2.1 streaming-api
2.1.1 StreamingContext
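ray/streaming/api/context/StreamingContext.java is the starting point of every streaming job and holds the basic context of the computation. Two of its methods matter most. The first is withConfig, which adds job configuration: it takes the configuration as a map, and everything in it is submitted later when the job runs. The second is execute, where the job is finally submitted for execution. It traverses backwards from every sink, builds the job graph with JobGraphBuilder, and then optimizes it with JobGraphOptimizer. (The current optimization only walks the job from the data sources and merges operators. It has a known problem: when one data source feeds two or more sinks, the job fails at execution.) Finally it connects to or creates a Ray cluster and submits the job through JobClient.submit. The JobClient interface is defined in streaming-api, while its implementation lives in streaming-runtime.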
    public void execute(String jobName) {
      // Build the job graph backwards from the registered sinks, then optimize it.
      JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(this.streamSinks, jobName);
      JobGraph originalJobGraph = jobGraphBuilder.build();
      this.jobGraph = new JobGraphOptimizer(originalJobGraph).optimize();
      jobGraph.printJobGraph();
      LOG.info("JobGraph digraph\n{}", jobGraph.generateDigraph());

      // Start a cluster only if Ray is not already initialized.
      if (!Ray.isInitialized()) {
        if (Config.MEMORY_CHANNEL.equalsIgnoreCase(jobConfig.get(Config.CHANNEL_TYPE))) {
          Preconditions.checkArgument(!jobGraph.isCrossLanguageGraph());
          ClusterStarter.startCluster(false, true);
          LOG.info("Created local cluster for job {}.", jobName);
        } else {
          ClusterStarter.startCluster(jobGraph.isCrossLanguageGraph(), false);
          LOG.info("Created multi process cluster for job {}.", jobName);
        }
        Runtime.getRuntime().addShutdownHook(new Thread(StreamingContext.this::stop));
      } else {
        LOG.info("Reuse existing cluster.");
      }

      // Locate the JobClient implementation (provided by streaming-runtime) and submit.
      ServiceLoader<JobClient> serviceLoader = ServiceLoader.load(JobClient.class);
      Iterator<JobClient> iterator = serviceLoader.iterator();
      Preconditions.checkArgument(iterator.hasNext(),
          "No JobClient implementation has been provided.");
      JobClient jobClient = iterator.next();
      jobClient.submit(jobGraph, jobConfig);
    }
2.1.2 DataStream
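Starting from StreamingContext, the first step is to build a data source operator. It is exposed through ray/streaming/api/stream/DataStreamSource.java, and the user passes the StreamingContext into it so that the chain of operators can be recorded. Every later computation is built on ray/streaming/api/stream/DataStream.java: each call wraps the previous stream, so the call relationships are recorded step by step. All stream operations are packaged in the ray/streaming/api/stream directory. A stream ends at ray/streaming/api/stream/DataStreamSink.java. The sink registers the whole operator chain back into the StreamingContext, where it is used to build the job. A stream without a sink is never saved in the streaming context and is never executed.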
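The registration rule above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: StreamChainSketch, Context, Stream, and buildEdges are hypothetical names, not the ray-streaming API. Each stream remembers its upstream stream, only sink() registers with the context, and the graph is built by walking upstream from the registered sinks.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical miniature of the stream-chaining idea; not the ray-streaming API. */
final class StreamChainSketch {
  /** Stand-in for StreamingContext: it only remembers sinks. */
  static final class Context {
    final List<Stream> sinks = new ArrayList<>();
  }

  /** Stand-in for DataStream: each stream remembers its upstream stream. */
  static final class Stream {
    final Context context;
    final String name;
    final Stream input; // null for a source

    Stream(Context context, String name, Stream input) {
      this.context = context;
      this.name = name;
      this.input = input;
    }

    Stream map(String opName) {
      return new Stream(context, opName, this);
    }

    /** Only sink() registers the chain with the context. */
    Stream sink(String opName) {
      Stream sink = new Stream(context, opName, this);
      context.sinks.add(sink);
      return sink;
    }
  }

  /** Walks upstream from every registered sink and collects "from->to" edges. */
  static Set<String> buildEdges(Context context) {
    Set<String> edges = new LinkedHashSet<>();
    for (Stream sink : context.sinks) {
      for (Stream s = sink; s.input != null; s = s.input) {
        edges.add(s.input.name + "->" + s.name);
      }
    }
    return edges;
  }
}
```

In this model a branch that never reaches a sink leaves no trace in the result, which mirrors the behavior described for streams without a sink.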
2.1.3 operator&function
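Every DataStream wraps an Operator. The Operator implementations live under ray/streaming/operator, and whenever an Operator performs an actual computation, the work is done by a Function, found under ray/streaming/function. Functions come in two kinds, those exposed directly to users and those called internally by the system, stored in ray/streaming/api/function/impl and ray/streaming/api/function/internal respectively.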
2.2 streaming-runtime
At the runtime level, the classes call one another in the following order:
    JobClientImpl
    └──>JobMaster <───────────────────────────┐
        ├──>JobScheduler                      │
        │   └──>WorkerLifecycleController     │
        │       └──>RemoteCallWorker          │
        │           └──>JobWorker             │<──┐
        │               ├──>RemoteCallMaster ─┘<──┤
        │               ├──>ContextBackend        │
        │               └──>StreamTask ───────────┴─────────┐
        │                   └──>Processor                   │
        │                       ├──>Operator                │
        │                       └──>StreamingRuntimeContext<┘
        │                           ├──>KeyStateBackend
        │                           └──>OperatorStateBackend
        ├──>GraphManager
        ├──>ResourceManager
        ├──>ContextBackend
        ├──>CheckpointCoordinator
        └──>FailoverCoordinator
2.2.1 JobClientImpl
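ray/streaming/runtime/client/JobClientImpl.java is the concrete implementation of JobClient. Its submit method takes the job graph and the job configuration, creates the JobMaster (the core of the ray-streaming implementation) from that configuration, and then uses a Ray remote call to run JobMaster's submitJob method, which starts the job.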
2.2.2 JobMaster
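When ray/streaming/runtime/master/JobMaster.java is constructed, it creates a StreamingConfig from the job configuration and uses it to initialize the currently rudimentary storage backend ContextBackend and the JobMasterRuntimeContext. It also checks the context to decide whether the job must be recovered from a checkpoint, and runs the corresponding init steps if so.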
    public boolean submitJob(ActorHandle<JobMaster> jobMasterActor, JobGraph jobGraph) {
      LOG.info("Begin submitting job using logical plan: {}.", jobGraph);

      this.jobMasterActor = jobMasterActor;

      // Create the management services.
      graphManager = new GraphManagerImpl(runtimeContext);
      resourceManager = new ResourceManagerImpl(runtimeContext);

      // Build the execution graph and record it in the runtime context.
      ExecutionGraph executionGraph = graphManager.buildExecutionGraph(jobGraph);
      runtimeContext.setJobGraph(jobGraph);
      runtimeContext.setExecutionGraph(executionGraph);

      // Schedule the job.
      try {
        scheduler = new JobSchedulerImpl(this);
        scheduler.scheduleJob(graphManager.getExecutionGraph());
      } catch (Exception e) {
        LOG.error("Failed to submit job.", e);
        return false;
      }
      return true;
    }
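JobMaster's submitJob method takes the remote handle of the JobMaster itself and the job graph. Execution then proceeds in the following phases:
Initialize the management services: create the GraphManager and the ResourceManager from the JobMasterRuntimeContext.
Build the execution graph: use the GraphManager to turn the job graph into the final ExecutionGraph and register the job in the JobMasterRuntimeContext. (Building the execution graph mainly means creating the physical execution vertices that correspond to each operator's parallelism.)
Schedule the job: JobSchedulerImpl's scheduleJob method schedules and runs the streaming job.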
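The parenthetical in the second phase (one physical vertex per operator instance) can be sketched as follows. This is a simplified illustration, not GraphManagerImpl: the class name ExecutionGraphSketch and the "name-index" naming scheme are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of expanding logical operators by parallelism. */
final class ExecutionGraphSketch {
  /** Expands operator name -> parallelism into one entry per physical instance. */
  static List<String> expand(Map<String, Integer> parallelismByOperator) {
    List<String> executionVertices = new ArrayList<>();
    for (Map.Entry<String, Integer> entry : parallelismByOperator.entrySet()) {
      for (int index = 0; index < entry.getValue(); index++) {
        // The "name-index" naming scheme is illustrative only.
        executionVertices.add(entry.getKey() + "-" + index);
      }
    }
    return executionVertices;
  }
}
```

With an insertion-ordered map such as LinkedHashMap, a source with parallelism 1 followed by a map operator with parallelism 2 would expand into three execution vertices.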
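JobMaster's init method takes a boolean that says whether the job is being recovered from a checkpoint. Its main purpose is to start the CheckpointCoordinator, which handles checkpoints, and the FailoverCoordinator, which handles failure recovery. It then saves the current state to the storage backend.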
    checkpointCoordinator = new CheckpointCoordinator(this);
    checkpointCoordinator.start();

    failoverCoordinator = new FailoverCoordinator(this, isRecover);
    failoverCoordinator.start();

    saveContext();
JobMaster's reportJobWorkerCommit method is called by JobWorker. It manages checkpoints by delegating to the corresponding CheckpointCoordinator method.
    public byte[] reportJobWorkerCommit(byte[] reportBytes) {
      Boolean ret = false;
      RemoteCall.BaseWorkerCmd reportPb;
      try {
        reportPb = RemoteCall.BaseWorkerCmd.parseFrom(reportBytes);
        ActorId actorId = ActorId.fromBytes(reportPb.getActorId().toByteArray());
        long remoteCallCost = System.currentTimeMillis() - reportPb.getTimestamp();
        LOG.info("Vertex {}, request job worker commit cost {}ms, actorId={}.",
            getExecutionVertex(actorId), remoteCallCost, actorId);
        RemoteCall.WorkerCommitReport commit =
            reportPb.getDetail().unpack(RemoteCall.WorkerCommitReport.class);
        WorkerCommitReport report =
            new WorkerCommitReport(actorId, commit.getCommitCheckpointId());
        ret = checkpointCoordinator.reportJobWorkerCommit(report);
      } catch (InvalidProtocolBufferException e) {
        LOG.error("Parse job worker commit has exception.", e);
      }
      return RemoteCall.BoolResult.newBuilder().setBoolRes(ret).build().toByteArray();
    }
JobMaster's requestJobWorkerRollback method is also called by JobWorker. When a task is judged to need a rollback, it delegates to the corresponding FailoverCoordinator method to recover from the failure.
    public byte[] requestJobWorkerRollback(byte[] requestBytes) {
      Boolean ret = false;
      RemoteCall.BaseWorkerCmd requestPb;
      try {
        requestPb = RemoteCall.BaseWorkerCmd.parseFrom(requestBytes);
        ActorId actorId = ActorId.fromBytes(requestPb.getActorId().toByteArray());
        long remoteCallCost = System.currentTimeMillis() - requestPb.getTimestamp();
        ExecutionGraph executionGraph = graphManager.getExecutionGraph();
        Optional<BaseActorHandle> rayActor = executionGraph.getActorById(actorId);
        if (!rayActor.isPresent()) {
          LOG.warn("Skip this invalid rollback, actor id {} is not found.", actorId);
          return RemoteCall.BoolResult.newBuilder().setBoolRes(false).build().toByteArray();
        }
        ExecutionVertex exeVertex = getExecutionVertex(actorId);
        LOG.info("Vertex {}, request job worker rollback cost {}ms, actorId={}.",
            exeVertex, remoteCallCost, actorId);
        RemoteCall.WorkerRollbackRequest rollbackPb =
            RemoteCall.WorkerRollbackRequest.parseFrom(requestPb.getDetail().getValue());
        exeVertex.setPid(rollbackPb.getWorkerPid());

        // Look up the hostname of the container that hosts this vertex.
        String hostname = "";
        Optional<Container> container = ResourceUtil.getContainerById(
            resourceManager.getRegisteredContainers(), exeVertex.getContainerId());
        if (container.isPresent()) {
          hostname = container.get().getHostname();
        }

        WorkerRollbackRequest request = new WorkerRollbackRequest(
            actorId, rollbackPb.getExceptionMsg(), hostname, exeVertex.getPid());
        ret = failoverCoordinator.requestJobWorkerRollback(request);
        LOG.info("Vertex {} request rollback, exception msg : {}.",
            exeVertex, rollbackPb.getExceptionMsg());
      } catch (Throwable e) {
        LOG.error("Parse job worker rollback has exception.", e);
      }
      return RemoteCall.BoolResult.newBuilder().setBoolRes(ret).build().toByteArray();
    }
2.2.3 JobScheduler
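ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java takes the JobMaster at construction, obtains the management services and the job configuration from it, and creates the WorkerLifecycleController that manages the execution of individual operators.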
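JobScheduler's scheduleJob method processes the physical execution graph passed to it and starts the job, in the following phases:
Preparation: prepareResourceAndCreateWorker prepares resources and creates the corresponding JobWorkers, then generateActorMappings generates the mapping between compute nodes that is used for data distribution.
Start: initAndStart starts the streaming job.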
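JobScheduler's prepareResourceAndCreateWorker method processes the physical execution graph in the following phases:
Prepare compute resources: request resources from the Ray cluster through resourceManager.assignResource.
Create compute nodes: createWorkers creates all compute nodes, ultimately by calling WorkerLifecycleController's createWorkers method.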
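JobScheduler's initAndStart method processes the physical execution graph in the following phases:
Build the worker contexts: from the physical execution graph and the JobMaster handle, build a JobWorkerContext for every JobWorker.
Initialize the compute nodes: initWorkers initializes all compute nodes, ultimately by calling WorkerLifecycleController's initWorkers method.
Initialize the master: initMaster initializes the master, which in turn calls back into JobMaster's init method.
Start the compute nodes: startWorkers starts all tasks, ultimately by calling WorkerLifecycleController's startWorkers method.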
2.2.4 WorkerLifecycleController
ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java manages the lifecycle of all compute nodes and provides operations such as create, initialize, start, and destroy.
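WorkerLifecycleController's asyncBatchExecute is a shared wrapper for asynchronous execution. It runs the given operations asynchronously through supplyAsync on Java's built-in CompletableFuture class and also verifies their results.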
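The article does not show the body of asyncBatchExecute, so the following is only a minimal sketch of the pattern it describes, assuming that the batch succeeds only when every operation returns true. AsyncBatchSketch is a hypothetical name and the signature is an assumption.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical sketch of a batch-async helper; not the ray-streaming implementation. */
final class AsyncBatchSketch {
  /** Runs operation on every item asynchronously; true only if all calls return true. */
  static <T> boolean asyncBatchExecute(Function<T, Boolean> operation, List<T> items) {
    // Submit everything first so the operations can run concurrently.
    List<CompletableFuture<Boolean>> futures = items.stream()
        .map(item -> CompletableFuture.supplyAsync(() -> operation.apply(item)))
        .collect(Collectors.toList());
    // join() blocks until a future completes; allMatch stops at the first false result.
    return futures.stream().allMatch(CompletableFuture::join);
  }
}
```

If an operation throws, join() rethrows it wrapped in a CompletionException, so a caller that wants a plain false in that case would need to catch it.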
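WorkerLifecycleController's createWorkers creates the JobWorkers through asyncBatchExecute. The function executed asynchronously is createWorker. It is applied to each vertex of the physical execution graph in turn, creates a Java-based or Python-based JobWorker depending on the vertex's language, and then registers the created JobWorker back into the execution graph for the later initialization and start steps.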
    private boolean createWorker(ExecutionVertex executionVertex) {
      LOG.info("Start to create worker actor for vertex: {} with resource: {}, workeConfig: {}.",
          executionVertex.getExecutionVertexName(),
          executionVertex.getResource(),
          executionVertex.getWorkerConfig());

      Language language = executionVertex.getLanguage();

      BaseActorHandle actor;
      if (Language.JAVA == language) {
        actor = Ray.actor(JobWorker::new, executionVertex)
            .setResources(executionVertex.getResource())
            .setMaxRestarts(-1)
            .remote();
      } else {
        // Python workers receive the vertex as serialized protobuf bytes.
        RemoteCall.ExecutionVertexContext.ExecutionVertex vertexPb =
            new GraphPbBuilder().buildVertex(executionVertex);
        actor = Ray.actor(
                PyActorClass.of("ray.streaming.runtime.worker", "JobWorker"),
                vertexPb.toByteArray())
            .setResources(executionVertex.getResource())
            .setMaxRestarts(-1)
            .remote();
      }

      if (null == actor) {
        LOG.error("Create worker actor failed.");
        return false;
      }

      executionVertex.setWorkerActor(actor);

      LOG.info("Worker actor created, actor: {}, vertex: {}.",
          executionVertex.getWorkerActorId(), executionVertex.getExecutionVertexName());
      return true;
    }
WorkerLifecycleController's initWorkers initializes the JobWorkers. It calls RemoteCallWorker's initWorker for each worker in turn, passing the JobWorkerContext and initializing the task.
    public boolean initWorkers(
        Map<ExecutionVertex, JobWorkerContext> vertexToContextMap, int timeout) {
      LOG.info("Begin initiating workers: {}.", vertexToContextMap);
      long startTime = System.currentTimeMillis();

      Map<ObjectRef<Boolean>, ActorId> rayObjects = new HashMap<>();
      vertexToContextMap.entrySet().forEach((entry -> {
        ExecutionVertex vertex = entry.getKey();
        rayObjects.put(
            RemoteCallWorker.initWorker(vertex.getWorkerActor(), entry.getValue()),
            vertex.getWorkerActorId());
      }));

      List<ObjectRef<Boolean>> objectRefList = new ArrayList<>(rayObjects.keySet());

      LOG.info("Waiting for workers' initialization.");
      WaitResult<Boolean> result = Ray.wait(objectRefList, objectRefList.size(), timeout);
      if (result.getReady().size() != objectRefList.size()) {
        LOG.error("Initializing workers timeout[{} ms].", timeout);
        return false;
      }

      LOG.info("Finished waiting workers' initialization.");
      LOG.info("Workers initialized. Cost {} ms.", System.currentTimeMillis() - startTime);
      return true;
    }
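WorkerLifecycleController's startWorkers tells the JobWorkers to start their tasks. It calls RemoteCallWorker's rollback for each worker in turn, and each JobWorker starts its StreamTask through rollback. To keep the computation consistent, the calls go to all data sources first and then to the remaining tasks one by one.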
    public boolean startWorkers(
        ExecutionGraph executionGraph, long lastCheckpointId, int timeout) {
      LOG.info("Begin starting workers.");
      long startTime = System.currentTimeMillis();
      List<ObjectRef<Object>> objectRefs = new ArrayList<>();

      // Sources are called first, then every other worker.
      executionGraph.getSourceActors()
          .forEach(actor -> objectRefs.add(RemoteCallWorker.rollback(actor, lastCheckpointId)));

      executionGraph.getNonSourceActors()
          .forEach(actor -> objectRefs.add(RemoteCallWorker.rollback(actor, lastCheckpointId)));

      WaitResult<Object> result = Ray.wait(objectRefs, objectRefs.size(), timeout);
      if (result.getReady().size() != objectRefs.size()) {
        LOG.error("Starting workers timeout[{} ms].", timeout);
        return false;
      }

      LOG.info("Workers started. Cost {} ms.", System.currentTimeMillis() - startTime);
      return true;
    }
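Both initWorkers and startWorkers follow the same shape: fire all remote calls, then wait for every result within a timeout and fail if any is missing. A rough analogue using only the JDK is sketched below, with CompletableFuture standing in for Ray's ObjectRef and Ray.wait. WaitAllSketch and allReadyWithin are hypothetical names, and treating a failed call as "not ready" is an assumption of this sketch rather than a statement about Ray.wait.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical JDK-only analogue of "wait for all calls or time out". */
final class WaitAllSketch {
  /** Returns true only if every call completes normally within the timeout. */
  static boolean allReadyWithin(List<CompletableFuture<?>> calls, long timeoutMillis) {
    CompletableFuture<Void> all =
        CompletableFuture.allOf(calls.toArray(new CompletableFuture<?>[0]));
    try {
      all.get(timeoutMillis, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException | ExecutionException e) {
      // Assumption: a timeout or a failed call both count as "not started".
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```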
2.2.4 RemoteCallWorker&RemoteCallMaster
ray/streaming/runtime/rpc/RemoteCallWorker.java handles the remote-call logic for all JobWorkers.
RemoteCallWorker's initWorker method passes the JobWorkerContext to a worker and calls the init method of the corresponding JobWorker.
    public static ObjectRef<Boolean> initWorker(BaseActorHandle actor, JobWorkerContext context) {
      LOG.info("Call worker to initiate, actor: {}, context: {}.", actor.getId(), context);
      ObjectRef<Boolean> result;

      // Python workers receive the context as serialized bytes.
      if (actor instanceof PyActorHandle) {
        result = ((PyActorHandle) actor).task(
            PyActorMethod.of("init", Boolean.class),
            context.getPythonWorkerContextBytes()).remote();
      } else {
        result = ((ActorHandle<JobWorker>) actor).task(JobWorker::init, context).remote();
      }

      LOG.info("Finished calling worker to initiate.");
      return result;
    }
RemoteCallWorker's rollback method starts the computation through JobWorker's rollback method. It supports both starting from a checkpoint and starting fresh.
    public static ObjectRef rollback(BaseActorHandle actor, final Long checkpointId) {
      LOG.info("Call worker to start, actor: {}.", actor.getId());
      ObjectRef result;

      if (actor instanceof PyActorHandle) {
        RemoteCall.CheckpointId checkpointIdPb = RemoteCall.CheckpointId.newBuilder()
            .setCheckpointId(checkpointId)
            .build();
        result = ((PyActorHandle) actor)
            .task(PyActorMethod.of("rollback"), checkpointIdPb.toByteArray())
            .remote();
      } else {
        result = ((ActorHandle<JobWorker>) actor)
            .task(JobWorker::rollback, checkpointId, System.currentTimeMillis())
            .remote();
      }

      LOG.info("Finished calling worker to start.");
      return result;
    }
ray/streaming/runtime/rpc/RemoteCallMaster.java handles the remote-call logic for the JobMaster. It is mainly used to synchronize checkpoint logic and contains two methods, reportJobWorkerCommitAsync and requestJobWorkerRollback.
RemoteCallMaster's reportJobWorkerCommitAsync method is called by StreamTask to report that a checkpoint has completed.
    public static ObjectRef<byte[]> reportJobWorkerCommitAsync(
        ActorHandle<JobMaster> actor, WorkerCommitReport commitReport) {
      RemoteCall.WorkerCommitReport commit = RemoteCall.WorkerCommitReport.newBuilder()
          .setCommitCheckpointId(commitReport.commitCheckpointId)
          .build();
      Any detail = Any.pack(commit);
      RemoteCall.BaseWorkerCmd cmd = RemoteCall.BaseWorkerCmd.newBuilder()
          .setActorId(ByteString.copyFrom(commitReport.fromActorId.getBytes()))
          .setTimestamp(System.currentTimeMillis())
          .setDetail(detail).build();

      return actor.task(JobMaster::reportJobWorkerCommit, cmd.toByteArray()).remote();
    }
RemoteCallMaster's requestJobWorkerRollback method is called by JobWorker. When an exception is detected, it asks the JobMaster to perform a global rollback of the job.
    public static Boolean requestJobWorkerRollback(
        ActorHandle<JobMaster> actor, WorkerRollbackRequest rollbackRequest) {
      RemoteCall.WorkerRollbackRequest request = RemoteCall.WorkerRollbackRequest.newBuilder()
          .setExceptionMsg(rollbackRequest.getRollbackExceptionMsg())
          .setWorkerHostname(rollbackRequest.getHostname())
          .setWorkerPid(rollbackRequest.getPid()).build();
      Any detail = Any.pack(request);
      RemoteCall.BaseWorkerCmd cmd = RemoteCall.BaseWorkerCmd.newBuilder()
          .setActorId(ByteString.copyFrom(rollbackRequest.fromActorId.getBytes()))
          .setTimestamp(System.currentTimeMillis())
          .setDetail(detail).build();

      // Unlike the commit report, this call blocks until the master answers.
      ObjectRef<byte[]> ret = actor.task(
          JobMaster::requestJobWorkerRollback, cmd.toByteArray()).remote();
      byte[] res = ret.get();
      return PbResultParser.parseBoolResult(res);
    }
2.2.5 JobWorker
ray/streaming/runtime/worker/JobWorker.java is the smallest unit registered with the Ray cluster, and the StreamTask it contains is the smallest unit of a streaming computation.
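When a JobWorker is constructed, it builds its worker configuration from the physical execution vertex and creates its own storage backend, ContextBackend (the same type of backend that the JobMaster uses, but with separate storage). It then checks whether the actor was restarted. A freshly started worker simply saves its context and returns. A restarted worker tries to load its JobWorkerContext from the backend; if one is found, it calls init directly to initialize the node from the recovered context and then calls requestRollback to ask for the task to be started.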
    public JobWorker(ExecutionVertex executionVertex) {
      LOG.info("Creating job worker.");

      this.executionVertex = executionVertex;
      this.workerConfig = new StreamingWorkerConfig(executionVertex.getWorkerConfig());
      this.contextBackend = ContextBackendFactory.getContextBackend(this.workerConfig);

      LOG.info("Ray.getRuntimeContext().wasCurrentActorRestarted()={}",
          Ray.getRuntimeContext().wasCurrentActorRestarted());
      // A fresh start only saves the context; recovery happens on restart.
      if (!Ray.getRuntimeContext().wasCurrentActorRestarted()) {
        saveContext();
        LOG.info("Job worker is fresh started, init success.");
        return;
      }

      LOG.info("Begin load job worker checkpoint state.");

      byte[] bytes = CheckpointStateUtil.get(contextBackend, getJobWorkerContextKey());
      if (bytes != null) {
        JobWorkerContext context = Serializer.decode(bytes);
        LOG.info("Worker recover from checkpoint state, byte len={}, context={}.",
            bytes.length, context);
        init(context);
        requestRollback("LoadCheckpoint request rollback in new actor.");
      } else {
        LOG.error(
            "Worker is reconstructed, but can't load checkpoint. "
                + "Check whether you checkpoint state is reliable. Current checkpoint state is {}.",
            contextBackend.getClass().getName());
      }
    }
JobWorker's init method rebuilds the node's information from the worker context and saves it. (The same steps run in the JobWorker constructor, but the context they depend on differs.)
    public Boolean init(JobWorkerContext workerContext) {
      LOG.info("Initiating job worker: {}. Worker context is: {}, pid={}.",
          workerContext.getWorkerName(), workerContext, EnvUtil.getJvmPid());

      this.workerContext = workerContext;
      this.executionVertex = workerContext.getExecutionVertex();
      this.workerConfig = new StreamingWorkerConfig(executionVertex.getWorkerConfig());
      this.contextBackend = ContextBackendFactory.getContextBackend(this.workerConfig);

      LOG.info("Initiating job worker succeeded: {}.", workerContext.getWorkerName());
      saveContext();
      return true;
    }

    public synchronized void saveContext() {
      byte[] contextBytes = Serializer.encode(workerContext);
      String key = getJobWorkerContextKey();
      LOG.info("Saving context, worker context={}, serialized byte length={}, key={}.",
          workerContext, contextBytes.length, key);
      CheckpointStateUtil.put(contextBackend, key, contextBytes);
    }
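JobWorker's rollback method creates the StreamTask that actually runs the computation, and it manages that task's lifecycle and checkpoints. It first inspects the checkpoint ID passed in: if a task already exists, is alive, is still in its initial state, and has the same checkpoint ID, the remaining steps are skipped. Otherwise it sets up the data-transfer channel, closes any existing task, creates a new one through createStreamTask, and starts the computation through StreamTask's recover method. Streaming tasks currently support only source operators and single-input operators.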
    public CallResult<ChannelRecoverInfo> rollback(Long checkpointId, Long startRollbackTs) {
      synchronized (initialStateChangeLock) {
        // Skip if the running task is still untouched at this checkpoint.
        if (task != null
            && task.isAlive()
            && checkpointId == task.lastCheckpointId
            && task.isInitialState) {
          return CallResult.skipped("Task is already in initial state, skip this rollback.");
        }
      }
      long remoteCallCost = System.currentTimeMillis() - startRollbackTs;

      LOG.info("Start rollback[{}], checkpoint is {}, remote call cost {}ms.",
          executionVertex.getExecutionJobVertexName(), checkpointId, remoteCallCost);

      rollbackCount++;
      if (rollbackCount > 1) {
        isRecreate.set(true);
      }

      try {
        // Init transfer.
        TransferChannelType channelType = workerConfig.transferConfig.channelType();
        if (TransferChannelType.NATIVE_CHANNEL == channelType) {
          transferHandler = new TransferHandler();
        }

        if (task != null) {
          // Make sure the old task is closed before creating a new one.
          task.close();
          task = null;
        }

        // Create the stream task and start it.
        task = createStreamTask(checkpointId);
        ChannelRecoverInfo channelRecoverInfo = task.recover(isRecreate.get());
        isNeedRollback = false;

        LOG.info("Rollback job worker success, checkpoint is {}, channelRecoverInfo is {}.",
            checkpointId, channelRecoverInfo);

        return CallResult.success(channelRecoverInfo);
      } catch (Exception e) {
        LOG.error("Rollback job worker has exception.", e);
        return CallResult.fail(ExceptionUtils.getStackTrace(e));
      }
    }

    private StreamTask createStreamTask(long checkpointId) {
      StreamTask task;
      StreamProcessor streamProcessor = ProcessBuilder
          .buildProcessor(executionVertex.getStreamOperator());
      LOG.debug("Stream processor created: {}.", streamProcessor);

      if (streamProcessor instanceof SourceProcessor) {
        task = new SourceStreamTask(streamProcessor, this, checkpointId);
      } else if (streamProcessor instanceof OneInputProcessor) {
        task = new OneInputStreamTask(streamProcessor, this, checkpointId);
      } else {
        throw new RuntimeException("Unsupported processor type:" + streamProcessor);
      }
      LOG.info("Stream task created: {}.", task);
      return task;
    }
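The skip condition at the top of rollback makes repeated rollback calls to an untouched task harmless. The sketch below isolates that idea in a self-contained class. RollbackSketch, onFirstRecord, and the string results are hypothetical stand-ins for JobWorker, the task's state change, and CallResult.

```java
/** Hypothetical model of the "skip rollback while still in initial state" guard. */
final class RollbackSketch {
  private final Object lock = new Object();
  private Long lastCheckpointId; // null until a task has been created
  private boolean initialState;
  private int tasksCreated;

  /** Returns "skipped" when an untouched task already sits at this checkpoint. */
  String rollback(long checkpointId) {
    synchronized (lock) {
      if (lastCheckpointId != null && lastCheckpointId == checkpointId && initialState) {
        return "skipped";
      }
      // Stand-in for closing the old task and creating a new one.
      lastCheckpointId = checkpointId;
      initialState = true;
      tasksCreated++;
      return "success";
    }
  }

  /** Called when the task processes its first record; the state is no longer initial. */
  void onFirstRecord() {
    synchronized (lock) {
      initialState = false;
    }
  }

  int tasksCreated() {
    synchronized (lock) {
      return tasksCreated;
    }
  }
}
```

Once the task has processed data, a rollback to the same checkpoint is no longer skipped, because the task's state has moved past what the checkpoint describes.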
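JobWorker has three checkpoint-related methods: triggerCheckpoint triggers a checkpoint, notifyCheckpointTimeout reports that a checkpoint timed out, and clearExpiredCheckpoint cleans up expired checkpoints. All of them end up calling the corresponding methods on StreamTask.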
    public Boolean triggerCheckpoint(Long barrierId) {
      LOG.info("Receive trigger, barrierId is {}.", barrierId);
      if (task != null) {
        return task.triggerCheckpoint(barrierId);
      }
      return false;
    }

    public Boolean notifyCheckpointTimeout(Long checkpointId) {
      LOG.info("Notify checkpoint timeout, checkpoint id is {}.", checkpointId);
      if (task != null) {
        task.notifyCheckpointTimeout(checkpointId);
      }
      return true;
    }

    public Boolean clearExpiredCheckpoint(Long expiredStateCpId, Long expiredQueueCpId) {
      LOG.info("Clear expired checkpoint state, checkpoint id is {}; "
          + "Clear expired queue msg, checkpoint id is {}",
          expiredStateCpId, expiredQueueCpId);
      if (task != null) {
        if (expiredStateCpId > 0) {
          task.clearExpiredCpState(expiredStateCpId);
        }
        task.clearExpiredQueueMsg(expiredQueueCpId);
      }
      return true;
    }
JobWorker has two fault-tolerance methods: requestRollback tells the JobMaster that the task needs a rollback, while checkIfNeedRollback only reports whether a rollback is needed.
    public void requestRollback(String exceptionMsg) {
      LOG.info("Request rollback.");
      isNeedRollback = true;
      isRecreate.set(true);
      boolean requestRet = RemoteCallMaster.requestJobWorkerRollback(
          workerContext.getMaster(),
          new WorkerRollbackRequest(
              workerContext.getWorkerActorId(),
              exceptionMsg,
              EnvUtil.getHostName(),
              EnvUtil.getJvmPid()));
      if (!requestRet) {
        LOG.warn("Job worker request rollback failed! exceptionMsg={}.", exceptionMsg);
      }
    }

    public Boolean checkIfNeedRollback(Long startCallTs) {
      long remoteCallCost = System.currentTimeMillis() - startCallTs;
      LOG.info("Finished checking if need to rollback with result: {}, rpc delay={}ms.",
          isNeedRollback, remoteCallCost);
      return isNeedRollback;
    }
2.2.6 StreamTask
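ray/streaming/runtime/worker/tasks/StreamTask.java is the base class of all streaming tasks and implements Runnable. On construction it wraps itself in a dedicated thread so that stream data can be processed continuously. StreamTask has several specialized subclasses that handle different kinds of tasks; the class hierarchy is shown below. They fall into two groups, source tasks and input tasks, and input tasks are further divided into one-input and two-input, matching the different operator implementations. Everything that concerns the operator itself is implemented in StreamTask, while data processing is pushed down to the run method of the concrete StreamTask subclass. When run catches an exception, it triggers a rollback.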
    StreamTask
    ├──>InputStreamTask
    │   ├──>OneInputStreamTask
    │   └──>TwoInputStreamTask
    └──>SourceStreamTask
    protected StreamTask(Processor processor, JobWorker jobWorker, long lastCheckpointId) {
      this.processor = processor;
      this.jobWorker = jobWorker;
      this.checkpointState = jobWorker.contextBackend;
      this.lastCheckpointId = lastCheckpointId;

      // The task runs in its own daemon thread, started later by recover().
      this.thread = new Thread(Ray.wrapRunnable(this),
          this.getClass().getName() + "-" + System.currentTimeMillis());
      this.thread.setDaemon(true);
    }
StreamTask's recover method starts the task. It first calls prepareTask to set up the task's initial state and then starts the thread, which starts the computation.
    public ChannelRecoverInfo recover(boolean isRecover) {
      if (isRecover) {
        LOG.info("Stream task begin recover.");
      } else {
        LOG.info("Stream task first start begin.");
      }
      prepareTask(isRecover);

      // Start the runner.
      ChannelRecoverInfo recoverInfo = new ChannelRecoverInfo(new HashMap<>());
      if (reader != null) {
        recoverInfo = reader.getQueueRecoverInfo();
      }

      thread.setUncaughtExceptionHandler(
          (t, e) -> LOG.error("Uncaught exception in runner thread.", e));
      LOG.info("Start stream task: {}.", this.getClass().getSimpleName());
      thread.start();

      if (isRecover) {
        LOG.info("Stream task recover end.");
      } else {
        LOG.info("Stream task first start finished.");
      }

      return recoverInfo;
    }
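StreamTask's prepareTask method prepares the resources the task needs, in the following steps:
Use the argument passed in to decide whether the task is being created for the first time, and if it is not, fetch the corresponding checkpoint data from the storage backend.
(Optional) When recovering from a checkpoint that contains data, call the Processor's loadCheckpoint method, which is ultimately implemented by the user.
Create the data write and read entry points, DataWriter and DataReader.
Call openProcessor to pass the context along and run the task's open phase.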
    private void prepareTask(boolean isRecreate) {
      LOG.info("Preparing stream task, isRecreate={}.", isRecreate);
      ExecutionVertex executionVertex = jobWorker.getExecutionVertex();

      // Set vertex info into the worker's internal config.
      jobWorker.getWorkerConfig().workerInternalConfig.setProperty(
          WorkerInternalConfig.WORKER_NAME_INTERNAL,
          executionVertex.getExecutionVertexName());
      jobWorker.getWorkerConfig().workerInternalConfig.setProperty(
          WorkerInternalConfig.OP_NAME_INTERNAL,
          executionVertex.getExecutionJobVertexName());

      OperatorCheckpointInfo operatorCheckpointInfo = new OperatorCheckpointInfo();
      byte[] bytes = null;

      // Fetch the operator checkpoint when the task is being recreated.
      if (isRecreate) {
        String cpKey = genOpCheckpointKey(lastCheckpointId);
        LOG.info("Getting task checkpoints from state, cpKey={}, checkpointId={}.",
            cpKey, lastCheckpointId);
        bytes = CheckpointStateUtil.get(checkpointState, cpKey);
        if (bytes == null) {
          String msg = String.format("Task recover failed, checkpoint is null! cpKey=%s", cpKey);
          throw new RuntimeException(msg);
        }
      }

      // Restore the processor from the checkpoint, if there is one.
      if (bytes != null) {
        operatorCheckpointInfo = Serializer.decode(bytes);
        processor.loadCheckpoint(operatorCheckpointInfo.processorCheckpoint);
        LOG.info("Stream task recover from checkpoint state, checkpoint bytes len={}, "
            + "checkpointInfo={}.", bytes.length, operatorCheckpointInfo);
      }

      // Writer.
      if (!executionVertex.getOutputEdges().isEmpty()) {
        LOG.info("Register queue writer, channels={}, outputCheckpoints={}.",
            executionVertex.getOutputChannelIdList(), operatorCheckpointInfo.outputPoints);
        writer = new DataWriter(
            executionVertex.getOutputChannelIdList(),
            executionVertex.getOutputActorList(),
            operatorCheckpointInfo.outputPoints,
            jobWorker.getWorkerConfig());
      }

      // Reader.
      if (!executionVertex.getInputEdges().isEmpty()) {
        LOG.info("Register queue reader, channels={}, inputCheckpoints={}.",
            executionVertex.getInputChannelIdList(), operatorCheckpointInfo.inputPoints);
        reader = new DataReader(
            executionVertex.getInputChannelIdList(),
            executionVertex.getInputActorList(),
            operatorCheckpointInfo.inputPoints,
            jobWorker.getWorkerConfig());
      }

      openProcessor();

      LOG.debug("Finished preparing stream task.");
    }
StreamTask's openProcessor method triggers the task's open phase. Its main job is to create the StreamingRuntimeContext that the task needs and then call the processor's open method to pass the context along.
    private void openProcessor() {
      ExecutionVertex executionVertex = jobWorker.getExecutionVertex();
      List<ExecutionEdge> outputEdges = executionVertex.getOutputEdges();

      // Group output channels, actors, and partitions by downstream operator name.
      Map<String, List<String>> opGroupedChannelId = new HashMap<>();
      Map<String, List<BaseActorHandle>> opGroupedActor = new HashMap<>();
      Map<String, Partition> opPartitionMap = new HashMap<>();
      for (int i = 0; i < outputEdges.size(); ++i) {
        ExecutionEdge edge = outputEdges.get(i);
        String opName = edge.getTargetExecutionJobVertexName();
        if (!opPartitionMap.containsKey(opName)) {
          opGroupedChannelId.put(opName, new ArrayList<>());
          opGroupedActor.put(opName, new ArrayList<>());
        }
        opGroupedChannelId.get(opName).add(executionVertex.getOutputChannelIdList().get(i));
        opGroupedActor.get(opName).add(executionVertex.getOutputActorList().get(i));
        opPartitionMap.put(opName, edge.getPartition());
      }

      // One collector per downstream operator.
      opPartitionMap.keySet().forEach(opName -> {
        collectors.add(new OutputCollector(
            writer,
            opGroupedChannelId.get(opName),
            opGroupedActor.get(opName),
            opPartitionMap.get(opName)));
      });

      RuntimeContext runtimeContext = new StreamingRuntimeContext(
          executionVertex,
          jobWorker.getWorkerConfig().configMap,
          executionVertex.getParallelism());

      processor.open(collectors, runtimeContext);
    }
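The grouping loop in openProcessor collects the output channels that lead to the same downstream operator, so that one collector can serve each operator. A compact sketch of just that grouping step follows. ChannelGroupingSketch and groupByTarget are hypothetical names, and each edge is reduced to a pair of strings for the sake of the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of grouping output channels by downstream operator. */
final class ChannelGroupingSketch {
  /** Each edge is {targetOperatorName, channelId}; returns operator -> channel ids. */
  static Map<String, List<String>> groupByTarget(List<String[]> edges) {
    Map<String, List<String>> grouped = new LinkedHashMap<>();
    for (String[] edge : edges) {
      // Create the list on first sight of an operator, then append the channel.
      grouped.computeIfAbsent(edge[0], name -> new ArrayList<>()).add(edge[1]);
    }
    return grouped;
  }
}
```

computeIfAbsent replaces the explicit containsKey check used in the original loop; the resulting grouping is the same.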
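StreamTask's main checkpoint-related operations are the following:
triggerCheckpoint triggers a checkpoint. It may only be called on a data source.
doCheckpoint performs a checkpoint. It first records the data offsets of the messaging layer (every task saves and distributes its DataWriter offsets, and the offsets passed in from upstream are saved as the DataReader offsets). It then broadcasts the barrier carrying the checkpoint downstream, calls the processor's saveCheckpoint method to run the user's checkpoint logic, and finally calls saveCpStateAndReport to save the checkpoint and report the result.
saveCpStateAndReport calls saveCp and then reportCommit.
saveCp is where the checkpoint is actually saved. It uses the CheckpointStateUtil helper to store the checkpoint data in the backend.
reportCommit reports to the JobMaster through RemoteCallMaster that the checkpoint has completed.
clearExpiredCpState and clearExpiredQueueMsg clean up expired local checkpoint data.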
    public boolean triggerCheckpoint(Long barrierId) {
      throw new UnsupportedOperationException("Only source operator supports trigger checkpoints.");
    }

    public void doCheckpoint(long checkpointId, Map<String, OffsetInfo> inputPoints) {
      Map<String, OffsetInfo> outputPoints = null;
      if (writer != null) {
        outputPoints = writer.getOutputCheckpoints();
        RemoteCall.Barrier barrierPb = RemoteCall.Barrier.newBuilder().setId(checkpointId).build();
        ByteBuffer byteBuffer = ByteBuffer.wrap(barrierPb.toByteArray());
        byteBuffer.order(ByteOrder.nativeOrder());
        writer.broadcastBarrier(checkpointId, byteBuffer);
      }

      LOG.info("Start do checkpoint, cp id={}, inputPoints={}, outputPoints={}.",
          checkpointId, inputPoints, outputPoints);

      this.lastCheckpointId = checkpointId;
      Serializable processorCheckpoint = processor.saveCheckpoint();

      try {
        OperatorCheckpointInfo opCpInfo = new OperatorCheckpointInfo(
            inputPoints, outputPoints, processorCheckpoint, checkpointId);
        saveCpStateAndReport(opCpInfo, checkpointId);
      } catch (Exception e) {
        // The error is only logged here.
        LOG.error("Processor or op checkpoint exception.", e);
      }

      LOG.info("Operator do checkpoint {} finish.", checkpointId);
    }

    private void saveCpStateAndReport(
        OperatorCheckpointInfo operatorCheckpointInfo, long checkpointId) {
      saveCp(operatorCheckpointInfo, checkpointId);
      reportCommit(checkpointId);

      LOG.info("Finish save cp state and report, checkpoint id is {}.", checkpointId);
    }

    private void saveCp(OperatorCheckpointInfo operatorCheckpointInfo, long checkpointId) {
      byte[] bytes = Serializer.encode(operatorCheckpointInfo);
      String cpKey = genOpCheckpointKey(checkpointId);
      LOG.info("Saving task checkpoint, cpKey={}, byte len={}, checkpointInfo={}.",
          cpKey, bytes.length, operatorCheckpointInfo);
      synchronized (checkpointState) {
        // Skip checkpoints that were already reported as timed out.
        if (outdatedCheckpoints.contains(checkpointId)) {
          LOG.info("Outdated checkpoint, skip save checkpoint.");
          outdatedCheckpoints.remove(checkpointId);
        } else {
          CheckpointStateUtil.put(checkpointState, cpKey, bytes);
        }
      }
    }

    private void reportCommit(long checkpointId) {
      final JobWorkerContext context = jobWorker.getWorkerContext();
      LOG.info("Report commit async, checkpoint id {}.", checkpointId);
      RemoteCallMaster.reportJobWorkerCommitAsync(
          context.getMaster(),
          new WorkerCommitReport(context.getWorkerActorId(), checkpointId));
    }

    public void notifyCheckpointTimeout(long checkpointId) {
      String cpKey = genOpCheckpointKey(checkpointId);
      try {
        synchronized (checkpointState) {
          if (checkpointState.exists(cpKey)) {
            checkpointState.remove(cpKey);
          } else {
            outdatedCheckpoints.add(checkpointId);
          }
        }
      } catch (Exception e) {
        LOG.error("Notify checkpoint timeout failed, checkpointId is {}.", checkpointId, e);
      }
    }

    public void clearExpiredCpState(long checkpointId) {
      String cpKey = genOpCheckpointKey(checkpointId);
      try {
        checkpointState.remove(cpKey);
      } catch (Exception e) {
        LOG.error("Failed to remove key {} from state backend.", cpKey, e);
      }
    }

    public void clearExpiredQueueMsg(long checkpointId) {
      // Get the operator checkpoint.
      String cpKey = genOpCheckpointKey(checkpointId);
      byte[] bytes;
      try {
        bytes = checkpointState.get(cpKey);
      } catch (Exception e) {
        LOG.error("Failed to get key {} from state backend.", cpKey, e);
        return;
      }
      if (bytes != null) {
        final OperatorCheckpointInfo operatorCheckpointInfo = Serializer.decode(bytes);
        long cpId = operatorCheckpointInfo.checkpointId;
        if (writer != null) {
          writer.clearCheckpoint(cpId);
        }
      }
    }
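saveCp and notifyCheckpointTimeout cooperate so that a checkpoint reported as timed out is never left in the backend, whichever of the two calls arrives first. The following self-contained sketch models that interplay with an in-memory map. CheckpointStoreSketch and its method names are hypothetical, and the map stands in for the real storage backend.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model of the save/timeout interplay. */
final class CheckpointStoreSketch {
  private final Map<Long, byte[]> backend = new HashMap<>();
  private final Set<Long> outdated = new HashSet<>();

  /** Saves the state unless the checkpoint already timed out; returns whether it was saved. */
  synchronized boolean save(long checkpointId, byte[] state) {
    if (outdated.remove(checkpointId)) {
      return false; // the timeout arrived first, so the save is dropped
    }
    backend.put(checkpointId, state);
    return true;
  }

  /** Removes a saved checkpoint, or remembers the id so that a later save is dropped. */
  synchronized void notifyTimeout(long checkpointId) {
    if (backend.remove(checkpointId) == null) {
      outdated.add(checkpointId);
    }
  }

  synchronized boolean contains(long checkpointId) {
    return backend.containsKey(checkpointId);
  }
}
```

Both methods take the same lock, which is what keeps the "check, then put or remove" sequences from interleaving.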
StreamTask's requestRollback function asks the JobWorker to roll the job back.
    protected void requestRollback(String exceptionMsg) {
      jobWorker.requestRollback(exceptionMsg);
    }
2.2.6.1 SourceStreamTask
ray/streaming/runtime/worker/tasks/SourceStreamTask.java
is the task implementation dedicated to data sources. Its two key functions are run, which reads data, and triggerCheckpoint, which triggers a checkpoint.
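SourceStreamTask's run function loops over two steps: checking for a pending checkpoint and reading data. Ordinary processors handle data through process, but because a data source takes no input, a separate fetch function is defined for it. When a barrierId is obtained, run calls doCheckpoint to send the checkpoint barrier downstream.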
    public void run() {
      LOG.info("Source stream task thread start.");
      Long barrierId;
      try {
        while (running) {
          isInitialState = false;

          barrierId = pendingBarrier.get();
          if (barrierId != null) {
            if (pendingBarrier.compareAndSet(barrierId, null)) {
              LOG.info("Start to do checkpoint {}, worker name is {}.",
                  barrierId, jobWorker.getWorkerContext().getWorkerName());

              doCheckpoint(barrierId, null);

              LOG.info("Finish to do checkpoint {}.", barrierId);
            } else {
              LOG.warn("Pending checkpointId modify unexpected, expect={}, now={}.",
                  barrierId, pendingBarrier.get());
            }
          }

          sourceProcessor.fetch();
        }
      } catch (Throwable e) {
        if (e instanceof ChannelInterruptException
            || ExceptionUtils.getRootCause(e) instanceof ChannelInterruptException) {
          LOG.info("queue has stopped.");
        } else {
          LOG.error("Last success checkpointId={}, now occur error.", lastCheckpointId, e);
          requestRollback(ExceptionUtils.getStackTrace(e));
        }
      }

      LOG.info("Source stream task thread exit.");
    }
SourceStreamTask's triggerCheckpoint function stores the checkpoint ID in pendingBarrier, where the run loop picks it up on its next iteration.
    public boolean triggerCheckpoint(Long barrierId) {
      return pendingBarrier.compareAndSet(null, barrierId);
    }
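The hand-off between triggerCheckpoint and the run loop is a single-slot mailbox built on compare-and-set. Below is a minimal sketch of that pattern, assuming pendingBarrier is an AtomicReference<Long> (the listings use get and compareAndSet on it, but its declaration is not shown here); the class PendingBarrierSlot and the method name poll are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Single-slot mailbox: one thread posts a barrier id, the task loop claims it.
class PendingBarrierSlot {
  private final AtomicReference<Long> pendingBarrier = new AtomicReference<>();

  // Called from the remote-call side; succeeds only when the slot is empty.
  boolean trigger(Long barrierId) {
    return pendingBarrier.compareAndSet(null, barrierId);
  }

  // Called from the task loop; returns the claimed barrier id, or null.
  Long poll() {
    Long barrierId = pendingBarrier.get();
    if (barrierId != null && pendingBarrier.compareAndSet(barrierId, null)) {
      return barrierId;
    }
    return null;
  }
}
```

Because trigger refuses to overwrite an unclaimed id, a second checkpoint request that arrives before the loop has consumed the first one returns false instead of silently replacing it.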
2.2.6.2 InputStreamTask
ray/streaming/runtime/worker/tasks/InputStreamTask.java
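implements the main data processing path; its subclasses only distinguish between processing types. In run, it keeps reading messages from the reader and, depending on the message type, either calls the processor to handle the data or performs a checkpoint.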
    public void run() {
      try {
        while (running) {
          ChannelMessage item;

          synchronized (jobWorker.initialStateChangeLock) {
            item = reader.read(readTimeoutMillis);
            if (item != null) {
              isInitialState = false;
            } else {
              continue;
            }
          }

          if (item instanceof DataMessage) {
            DataMessage dataMessage = (DataMessage) item;
            byte[] bytes = new byte[dataMessage.body().remaining() - 1];
            byte typeId = dataMessage.body().get();
            dataMessage.body().get(bytes);
            Object obj;
            if (typeId == Serializer.JAVA_TYPE_ID) {
              obj = javaSerializer.deserialize(bytes);
            } else {
              obj = crossLangSerializer.deserialize(bytes);
            }
            processor.process(obj);
          } else if (item instanceof BarrierMessage) {
            final BarrierMessage queueBarrier = (BarrierMessage) item;
            byte[] barrierData = new byte[queueBarrier.getData().remaining()];
            queueBarrier.getData().get(barrierData);
            RemoteCall.Barrier barrierPb = RemoteCall.Barrier.parseFrom(barrierData);
            final long checkpointId = barrierPb.getId();
            LOG.info("Start to do checkpoint {}, worker name is {}.",
                checkpointId, jobWorker.getWorkerContext().getWorkerName());

            final Map<String, OffsetInfo> inputPoints = queueBarrier.getInputOffsets();
            doCheckpoint(checkpointId, inputPoints);
            LOG.info("Do checkpoint {} success.", checkpointId);
          }
        }
      } catch (Throwable throwable) {
        if (throwable instanceof ChannelInterruptException
            || ExceptionUtils.getRootCause(throwable) instanceof ChannelInterruptException) {
          LOG.info("queue has stopped.");
        } else {
          LOG.error("Last success checkpointId={}, now occur error.",
              lastCheckpointId, throwable);
          requestRollback(ExceptionUtils.getStackTrace(throwable));
        }
      }
      LOG.info("Input stream task thread exit.");
      stopped = true;
    }
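The DataMessage branch above reads a one-byte type id before the payload to choose between the Java and cross-language serializers. The sketch below reproduces only that framing. TypedFrame and its methods are hypothetical, and the two constant values are assumptions, since the actual value of Serializer.JAVA_TYPE_ID is not shown in the listing.

```java
import java.nio.ByteBuffer;

// Illustrative framing: [1-byte type id][payload bytes].
class TypedFrame {
  static final byte JAVA_TYPE_ID = 0;       // assumed value
  static final byte CROSS_LANG_TYPE_ID = 1; // assumed value

  static ByteBuffer encode(byte typeId, byte[] payload) {
    ByteBuffer buf = ByteBuffer.allocate(1 + payload.length);
    buf.put(typeId).put(payload);
    buf.flip(); // switch from writing to reading
    return buf;
  }

  // Peeks at the type id without moving the buffer position.
  static byte typeOf(ByteBuffer body) {
    return body.get(body.position());
  }

  // Same read order as the run loop: size the array first, then consume
  // the type byte, then copy the remaining bytes.
  static byte[] payloadOf(ByteBuffer body) {
    byte[] bytes = new byte[body.remaining() - 1];
    body.get();      // consume the type id
    body.get(bytes); // copy the payload
    return bytes;
  }
}
```

Note that the array must be sized before the type byte is consumed, exactly as in the listing; sizing it afterwards with remaining() - 1 would drop the last payload byte.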
2.2.7 StreamingRuntimeContext
ray/streaming/runtime/worker/context/StreamingRuntimeContext.java
is the implementation of RuntimeContext from streaming-api. On construction it stores the basic information of the task. The ExecutionVertex passed in is used only to obtain taskId and taskIndex.
    public StreamingRuntimeContext(
        ExecutionVertex executionVertex, Map<String, String> config, int parallelism) {
      this.taskId = executionVertex.getExecutionVertexId();
      this.config = config;
      this.taskIndex = executionVertex.getExecutionVertexIndex();
      this.parallelism = parallelism;
    }
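The remaining functions of StreamingRuntimeContext are getters and setters for its member variables. The two worth noting are KeyStateBackend and OperatorStateBackend, state backend classes from streaming-state: at present only setters are provided for them, and no later initialization or call sites were found.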
2.2.8 Processor
ray/streaming/runtime/core/processor/Processor.java
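is where every streaming task does its actual processing. It has several specialized subclasses that handle different kinds of computation; the class hierarchy is as follows.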
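    Processor                  --> parent interface; mainly declares the function interface.
    └── StreamProcessor        --> implements every function except `process` and provides the basic functionality, calling different Operator functions at different stages.
        ├── SourceProcessor    --> exposes the `fetch` function for reading from the data source.
        ├── OneInputProcessor  --> exposes single-input processing; calls the operator's `processElement` function.
        └── TwoInputProcessor  --> similar to single input, but first determines which upstream stream the input belongs to, then calls the operator's `processElement` function.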
Processor declares five main functions, one for each stage of a task's lifecycle.
    void open(List<Collector> collectors, RuntimeContext runtimeContext);

    void process(T t);

    Serializable saveCheckpoint();

    void loadCheckpoint(Serializable checkpointObject);

    void close();
2.2.9 ContextBackend
ray/streaming/runtime/context/ContextBackend.java
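is a simple implementation of the checkpoint backend in streaming-runtime, exposing four functions. Two implementations are provided: MemoryContextBackend, based on a HashMap, and AtomicFsBackend, based on the local file system, with the file system path read from the config.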
    boolean exists(final String key) throws Exception;

    byte[] get(final String key) throws Exception;

    void put(final String key, final byte[] value) throws Exception;

    void remove(final String key) throws Exception;
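To make the four-function contract concrete, here is a minimal sketch of a HashMap-backed implementation in the spirit of MemoryContextBackend. It is not the Ray class: SimpleBackend and InMemoryBackend are hypothetical names, and the interface is redeclared locally so that the example is self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Local mirror of the four functions listed above (hypothetical name).
interface SimpleBackend {
  boolean exists(String key) throws Exception;

  byte[] get(String key) throws Exception;

  void put(String key, byte[] value) throws Exception;

  void remove(String key) throws Exception;
}

// HashMap-backed implementation; everything is lost when the process exits.
class InMemoryBackend implements SimpleBackend {
  private final Map<String, byte[]> kv = new HashMap<>();

  @Override
  public boolean exists(String key) {
    return kv.containsKey(key);
  }

  @Override
  public byte[] get(String key) {
    return kv.get(key); // null when the key is absent
  }

  @Override
  public void put(String key, byte[] value) {
    kv.put(key, value);
  }

  @Override
  public void remove(String key) {
    kv.remove(key);
  }
}
```

A backend of this kind is only suitable for local testing, because a checkpoint held in a worker's heap cannot outlive that worker.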
2.2.10 BaseCoordinator
ray/streaming/runtime/master/coordinator/BaseCoordinator.java
is the parent class of CheckpointCoordinator and FailoverCoordinator. It implements Runnable and runs itself on a dedicated thread.
    public BaseCoordinator(JobMaster jobMaster) {
      this.jobMaster = jobMaster;
      this.runtimeContext = jobMaster.getRuntimeContext();
      this.graphManager = jobMaster.getGraphManager();
    }

    public void start() {
      thread = new Thread(Ray.wrapRunnable(this),
          this.getClass().getName() + "-" + System.currentTimeMillis());
      thread.start();
    }

    public void stop() {
      closed = true;

      try {
        if (thread != null) {
          thread.join(30000);
        }
      } catch (InterruptedException e) {
        LOG.error("Coordinator thread exit has exception.", e);
      }
    }
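The start and stop pair above follows a common pattern: a Runnable that owns its thread and exits when a flag is set. A minimal sketch of the pattern without any Ray dependency is shown below. LoopingWorker is a hypothetical class, Ray.wrapRunnable is deliberately left out, and declaring the flag volatile is this sketch's choice, since the declaration of closed is not shown in the listing.

```java
// A Runnable that owns its thread and stops cooperatively via a flag.
class LoopingWorker implements Runnable {
  private volatile boolean closed = false; // volatile so the loop sees the update
  private Thread thread;

  public void start() {
    thread = new Thread(this, getClass().getName() + "-" + System.currentTimeMillis());
    thread.start();
  }

  @Override
  public void run() {
    while (!closed) {
      try {
        Thread.sleep(5); // stands in for polling a command queue with a timeout
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  public void stop() {
    closed = true;
    try {
      if (thread != null) {
        thread.join(30000); // wait up to 30 s, as in the listing
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  boolean isAlive() {
    return thread != null && thread.isAlive();
  }
}
```

The bounded wait inside the loop matters: a loop that blocked indefinitely would never re-check the flag, and stop would only return after the join timeout.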
2.2.10.1 CheckpointCoordinator
The main flow of CheckpointCoordinator's run function is to keep polling the checkpoint commands in runtimeContext and act on each one.
    public void run() {
      while (!closed) {
        try {
          final BaseWorkerCmd command = runtimeContext.cpCmds.poll(1, TimeUnit.SECONDS);
          if (command != null) {
            if (command instanceof WorkerCommitReport) {
              processCommitReport((WorkerCommitReport) command);
            } else {
              interruptCheckpoint();
            }
          }

          if (!pendingCheckpointActors.isEmpty()) {
            if (timeoutOnWaitCheckpoint()) {
              LOG.warn("Waiting for checkpoint {} timeout, pending cp actors is {}.",
                  runtimeContext.lastCheckpointId,
                  graphManager.getExecutionGraph().getActorName(pendingCheckpointActors));

              interruptCheckpoint();
            }
          } else {
            maybeTriggerCheckpoint();
          }
        } catch (Throwable e) {
          LOG.error("Checkpoint coordinator occur err.", e);
          try {
            interruptCheckpoint();
          } catch (Throwable interruptE) {
            LOG.error("Ignore interrupt checkpoint exception in catch block.");
          }
        }
      }
      LOG.warn("Checkpoint coordinator thread exit.");
    }
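When the command that run receives is a normal checkpoint report sent back by a JobWorker, it calls processCommitReport. Each WorkerCommitReport marks one JobWorker's checkpoint as complete, and that JobWorker is removed from the pending set. Once the set is empty, the checkpoint round is complete: clearExpiredCpStateAndQueueMsg notifies all tasks to clean up the previous round's checkpoint data, and the JobMaster then saves its own context as well.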
    private void processCommitReport(WorkerCommitReport commitReport) {
      LOG.info("Start process commit report {}, from actor name={}.", commitReport,
          graphManager.getExecutionGraph().getActorName(commitReport.fromActorId));

      try {
        Preconditions.checkArgument(
            commitReport.commitCheckpointId == runtimeContext.lastCheckpointId,
            "expect checkpointId %s, but got %s",
            runtimeContext.lastCheckpointId, commitReport);

        if (!pendingCheckpointActors.contains(commitReport.fromActorId)) {
          LOG.warn("Invalid commit report, skipped.");
          return;
        }

        pendingCheckpointActors.remove(commitReport.fromActorId);
        LOG.info("Pending actors after this commit: {}.",
            graphManager.getExecutionGraph().getActorName(pendingCheckpointActors));

        if (pendingCheckpointActors.isEmpty()) {
          runtimeContext.checkpointIds.add(runtimeContext.lastCheckpointId);

          if (clearExpiredCpStateAndQueueMsg()) {
            jobMaster.saveContext();
            LOG.info("Finish checkpoint: {}.", runtimeContext.lastCheckpointId);
          } else {
            LOG.warn("Fail to do checkpoint: {}.", runtimeContext.lastCheckpointId);
          }
        }

        LOG.info("Process commit report {} success.", commitReport);
      } catch (Throwable e) {
        LOG.warn("Process commit report has exception.", e);
      }
    }
clearExpiredCpStateAndQueueMsg fetches all actors from the execution graph and notifies each of them to clean up expired checkpoints.
    private boolean clearExpiredCpStateAndQueueMsg() {
      List<BaseActorHandle> allActor = graphManager.getExecutionGraph().getAllActors();
      if (1 == runtimeContext.checkpointIds.size()) {
        Long msgExpiredCheckpointId = runtimeContext.checkpointIds.get(0);
        RemoteCallWorker.clearExpiredCheckpointParallel(allActor, 0L, msgExpiredCheckpointId);
      }

      if (runtimeContext.checkpointIds.size() > 1) {
        Long stateExpiredCpId = runtimeContext.checkpointIds.remove(0);
        Long msgExpiredCheckpointId = runtimeContext.checkpointIds.get(0);
        RemoteCallWorker
            .clearExpiredCheckpointParallel(allActor, stateExpiredCpId, msgExpiredCheckpointId);
      }
      return true;
    }
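The two branches above decide which checkpoint's state and which checkpoint's queue messages may be discarded. The sketch below isolates that decision as a pure function, so it can be followed without the remote calls. ExpiryPlanner and plan are hypothetical names, and returning a two-element array (or null for an empty list) is a simplification made for this example.

```java
import java.util.List;

class ExpiryPlanner {
  // Returns {stateExpiredCpId, msgExpiredCpId}, or null when nothing can be
  // cleared. Like the listing, it removes the oldest id when two or more
  // completed checkpoints are tracked.
  static long[] plan(List<Long> checkpointIds) {
    if (checkpointIds.size() == 1) {
      // Only one completed checkpoint: no older state to drop (id 0),
      // but queue messages up to this checkpoint can be cleared.
      return new long[] {0L, checkpointIds.get(0)};
    }
    if (checkpointIds.size() > 1) {
      long stateExpired = checkpointIds.remove(0); // oldest checkpoint's state
      long msgExpired = checkpointIds.get(0);      // newest remaining checkpoint
      return new long[] {stateExpired, msgExpired};
    }
    return null;
  }
}
```

For example, starting from an empty list and completing checkpoints 1 and then 2, the first call yields {0, 1} and the second yields {1, 2}, after which only checkpoint 2 remains tracked.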
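When run detects that a checkpoint has gone wrong (it receives a command other than a commit report, the wait times out, or an exception is thrown), it calls interruptCheckpoint. The function follows the same pattern as clearExpiredCpStateAndQueueMsg, notifying all actors in parallel, only through a different remote call. It ends by calling maybeTriggerCheckpoint to decide whether to start a new checkpoint.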
    private void interruptCheckpoint() {
      if (interruptedCheckpointSet.contains(runtimeContext.lastCheckpointId)) {
        LOG.warn("Skip interrupt duplicated checkpoint id : {}.",
            runtimeContext.lastCheckpointId);
        return;
      }
      interruptedCheckpointSet.add(runtimeContext.lastCheckpointId);
      LOG.warn("Interrupt checkpoint, checkpoint id : {}.", runtimeContext.lastCheckpointId);

      List<BaseActorHandle> allActor = graphManager.getExecutionGraph().getAllActors();
      if (runtimeContext.lastCheckpointId > runtimeContext.getLastValidCheckpointId()) {
        RemoteCallWorker
            .notifyCheckpointTimeoutParallel(allActor, runtimeContext.lastCheckpointId);
      }

      if (!pendingCheckpointActors.isEmpty()) {
        pendingCheckpointActors.clear();
      }
      maybeTriggerCheckpoint();
    }
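Once a checkpoint has finished normally, or once failure handling is done, maybeTriggerCheckpoint is called. It first checks whether the checkpoint interval has elapsed and, if so, calls triggerCheckpoint.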
    private void maybeTriggerCheckpoint() {
      if (readyToTrigger()) {
        triggerCheckpoint();
      }
    }

    private boolean readyToTrigger() {
      return (System.currentTimeMillis() - runtimeContext.lastCpTimestamp)
          >= cpIntervalSecs * 1000;
    }
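triggerCheckpoint is the function that finally starts a checkpoint. Through RemoteCallWorker it calls triggerCheckpoint on the JobWorkers, sending the checkpoint ID to every source node and thereby starting the checkpoint round.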
    private void triggerCheckpoint() {
      interruptedCheckpointSet.clear();
      if (LOG.isInfoEnabled()) {
        LOG.info("Start trigger checkpoint {}.", runtimeContext.lastCheckpointId + 1);
      }

      List<ActorId> allIds = graphManager.getExecutionGraph().getAllActorsId();
      pendingCheckpointActors.addAll(allIds);

      ++runtimeContext.lastCheckpointId;
      final List<ObjectRef> sourcesRet = new ArrayList<>();

      graphManager.getExecutionGraph().getSourceActors().forEach(actor -> {
        sourcesRet.add(RemoteCallWorker.triggerCheckpoint(
            actor, runtimeContext.lastCheckpointId));
      });

      for (ObjectRef rayObject : sourcesRet) {
        if (rayObject.get() instanceof RayException) {
          LOG.warn("Trigger checkpoint has exception.", (RayException) rayObject.get());
          throw (RayException) rayObject.get();
        }
      }
      runtimeContext.lastCpTimestamp = System.currentTimeMillis();
      LOG.info("Trigger checkpoint success.");
    }
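Taken together, triggerCheckpoint and processCommitReport implement a simple round protocol: every actor is marked pending when a round starts, and the round completes when the last one reports. The sketch below shows only that bookkeeping. CheckpointRound is a hypothetical class, actors are identified by plain strings rather than ActorId, and a stale checkpoint id simply returns false here, whereas the listing rejects it through Preconditions.checkArgument.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Tracks which actors still owe a commit report for the current round.
class CheckpointRound {
  private final Set<String> pendingActors = new HashSet<>();
  private long lastCheckpointId = 0;

  // Starts a new round: every actor must report before it completes.
  long trigger(List<String> allActorIds) {
    pendingActors.addAll(allActorIds);
    return ++lastCheckpointId;
  }

  // Records one commit report; returns true when it completes the round.
  boolean commit(String actorId, long checkpointId) {
    if (checkpointId != lastCheckpointId || !pendingActors.remove(actorId)) {
      return false; // stale checkpoint id, or unknown or duplicate actor
    }
    return pendingActors.isEmpty();
  }
}
```

Only the source actors receive the trigger call, yet all actors are placed in the pending set, because the barrier reaches downstream tasks through the data channels and each of them reports its own commit.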
2.2.10.2 FailoverCoordinator
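FailoverCoordinator's requestJobWorkerRollback function is exposed to the JobMaster. It decides whether a rollback request should be added to the runtime context: duplicate requests are skipped, and new ones are offered to the foCmds queue.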
    public Boolean requestJobWorkerRollback(WorkerRollbackRequest request) {
      LOG.info("Request job worker rollback {}.", request);
      boolean ret;
      if (!isDuplicateRequest(request)) {
        ret = runtimeContext.foCmds.offer(request);
      } else {
        LOG.warn("Skip duplicated worker rollback request, {}.", request.toString());
        return true;
      }
      jobMaster.saveContext();
      if (!ret) {
        LOG.warn("Request job worker rollback failed, because command queue is full.");
      }
      return ret;
    }
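The function above combines duplicate suppression with a non-blocking offer to a bounded queue. A minimal sketch of that combination follows. RollbackQueue is a hypothetical class, requests are reduced to the id of the requesting actor, and the use of ArrayBlockingQueue is an assumption, since the concrete type of foCmds is not shown in the listing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Bounded command queue that ignores a request from an actor already queued.
class RollbackQueue {
  private final BlockingQueue<String> foCmds;

  RollbackQueue(int capacity) {
    foCmds = new ArrayBlockingQueue<>(capacity);
  }

  // Returns true when the request is queued or already pending,
  // and false when the queue is full.
  boolean request(String fromActorId) {
    if (foCmds.contains(fromActorId)) {
      return true; // duplicate: treated as accepted, as in the listing
    }
    return foCmds.offer(fromActorId); // offer never blocks; false means full
  }

  int size() {
    return foCmds.size();
  }
}
```

Returning true for a duplicate tells the caller that its rollback is already scheduled, so it has no reason to retry.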
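FailoverCoordinator's run function likewise polls a command queue continuously, in this case the failover queue foCmds. When it receives a WorkerRollbackRequest, it calls dealWithRollbackRequest to roll the computation back.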
    public void run() {
      while (!closed) {
        try {
          final BaseWorkerCmd command;
          synchronized (cmdLock) {
            command = jobMaster.getRuntimeContext().foCmds.poll(1, TimeUnit.SECONDS);
          }
          if (null == command) {
            continue;
          }
          if (command instanceof WorkerRollbackRequest) {
            jobMaster.getRuntimeContext().unfinishedFoCmds.add(command);
            dealWithRollbackRequest((WorkerRollbackRequest) command);
          }
        } catch (Throwable e) {
          LOG.error("Fo coordinator occur err.", e);
        }
      }
      LOG.warn("Fo coordinator thread exit.");
    }
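FailoverCoordinator's dealWithRollbackRequest function first resolves the execution vertex that requested the rollback from the message. It then checks whether the rollback is valid: the vertex must not already be rolling back and, unless the rollback is forced, the worker must confirm that it needs one. Finally it calls interruptCheckpointAndRollback to perform the rollback.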
    private void dealWithRollbackRequest(WorkerRollbackRequest rollbackRequest) {
      LOG.info("Start deal with rollback request {}.", rollbackRequest);

      ExecutionVertex exeVertex = getExeVertexFromRequest(rollbackRequest);

      if (null != rollbackRequest.getPid()
          && !rollbackRequest.getPid().equals(WorkerRollbackRequest.DEFAULT_PID)) {
        exeVertex.setPid(rollbackRequest.getPid());
      }

      if (isRollbacking.get(exeVertex)) {
        LOG.info("Vertex {} is rollbacking, skip rollback again.", exeVertex);
        return;
      }

      String hostname = "";
      Optional<Container> container = ResourceUtil.getContainerById(
          jobMaster.getResourceManager().getRegisteredContainers(),
          exeVertex.getContainerId()
      );
      if (container.isPresent()) {
        hostname = container.get().getHostname();
      }

      if (rollbackRequest.isForcedRollback) {
        interruptCheckpointAndRollback(rollbackRequest);
      } else {
        asyncRemoteCaller.checkIfNeedRollbackAsync(exeVertex.getWorkerActor(), res -> {
          if (!res) {
            LOG.info("Vertex {} doesn't need to rollback, skip it.", exeVertex);
            return;
          }
          interruptCheckpointAndRollback(rollbackRequest);
        }, throwable -> {
          LOG.error("Exception when calling checkIfNeedRollbackAsync, maybe vertex is dead"
              + ", ignore this request, vertex={}.", exeVertex, throwable);
        });
      }

      LOG.info("Deal with rollback request {} success.", rollbackRequest);
    }

    private Boolean isDuplicateRequest(WorkerRollbackRequest request) {
      try {
        Object[] foCmdsArray = runtimeContext.foCmds.toArray();
        for (Object cmd : foCmdsArray) {
          if (request.fromActorId.equals(((BaseWorkerCmd) cmd).fromActorId)) {
            return true;
          }
        }
      } catch (Exception e) {
        LOG.warn("Check request is duplicated failed.", e);
      }
      return false;
    }

    private ExecutionVertex getExeVertexFromRequest(WorkerRollbackRequest rollbackRequest) {
      ActorId actorId = rollbackRequest.fromActorId;
      Optional<BaseActorHandle> rayActor =
          graphManager.getExecutionGraph().getActorById(actorId);
      if (!rayActor.isPresent()) {
        throw new RuntimeException("Can not find ray actor of ID " + actorId);
      }
      return getExecutionVertex(rollbackRequest.fromActorId);
    }

    private ExecutionVertex getExecutionVertex(BaseActorHandle actor) {
      return graphManager.getExecutionGraph().getExecutionVertexByActorId(actor.getId());
    }

    private ExecutionVertex getExecutionVertex(ActorId actorId) {
      return graphManager.getExecutionGraph().getExecutionVertexByActorId(actorId);
    }
FailoverCoordinator's interruptCheckpointAndRollback function performs the rollback through rollback and then adds an interrupt-checkpoint request to the runtime context.
    private void interruptCheckpointAndRollback(WorkerRollbackRequest rollbackRequest) {
      if (rollbackRequest.cascadingGroupId == null) {
        rollbackRequest.cascadingGroupId = currentCascadingGroupId++;
      }

      rollback(jobMaster.getRuntimeContext().getLastValidCheckpointId(), rollbackRequest,
          currentCascadingGroupId);

      jobMaster.getRuntimeContext().cpCmds.offer(new InterruptCheckpointRequest());
    }
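In FailoverCoordinator's rollback function, once the JobWorker that requested the rollback has rolled back successfully, new rollback requests are queued for the upstream operators whose queues lost data, and the JobMaster state is saved. The rollback thus cascades upstream, in the worst case all the way to the source operators.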
    private void rollback(
        long checkpointId, WorkerRollbackRequest rollbackRequest, long cascadingGroupId) {
      ExecutionVertex exeVertex = getExeVertexFromRequest(rollbackRequest);
      LOG.info("Call vertex {} to rollback, checkpoint id is {}, cascadingGroupId={}.",
          exeVertex, checkpointId, cascadingGroupId);

      isRollbacking.put(exeVertex, true);

      asyncRemoteCaller.rollback(exeVertex.getWorkerActor(), checkpointId, result -> {
        List<WorkerRollbackRequest> newRollbackRequests = new ArrayList<>();
        switch (result.getResultEnum()) {
          case SUCCESS:
            ChannelRecoverInfo recoverInfo = result.getResultObj();
            LOG.info("Vertex {} rollback done, dataLostQueues={}, msg={}, cascadingGroupId={}.",
                exeVertex, recoverInfo.getDataLostQueues(), result.getResultMsg(),
                cascadingGroupId);
            newRollbackRequests = cascadeUpstreamActors(
                recoverInfo.getDataLostQueues(), exeVertex, cascadingGroupId);
            break;
          case SKIPPED:
            LOG.info("Vertex skip rollback, result = {}, cascadingGroupId={}.",
                result, cascadingGroupId);
            break;
          default:
            LOG.error(
                "Rollback vertex {} failed, result={}, cascadingGroupId={},"
                    + " rollback this worker again after {} ms.",
                exeVertex, result, cascadingGroupId, ROLLBACK_RETRY_TIME_MS);
            Thread.sleep(ROLLBACK_RETRY_TIME_MS);
            LOG.info("Add rollback request for {} again, cascadingGroupId={}.",
                exeVertex, cascadingGroupId);
            newRollbackRequests.add(
                new WorkerRollbackRequest(exeVertex, "", "Rollback failed, try again.", false)
            );
            break;
        }

        synchronized (cmdLock) {
          jobMaster.getRuntimeContext().foCmds.addAll(newRollbackRequests);
          jobMaster.getRuntimeContext().unfinishedFoCmds.remove(rollbackRequest);
          jobMaster.saveContext();
        }
        isRollbacking.put(exeVertex, false);
      }, throwable -> {
        LOG.error("Exception when calling vertex to rollback, vertex={}.", exeVertex, throwable);
        isRollbacking.put(exeVertex, false);
      });

      LOG.info("Finish rollback vertex {}, checkpoint id is {}.", exeVertex, checkpointId);
    }

    private List<WorkerRollbackRequest> cascadeUpstreamActors(
        Set<String> dataLostQueues, ExecutionVertex fromVertex, long cascadingGroupId) {
      List<WorkerRollbackRequest> cascadedRollbackRequest = new ArrayList<>();
      dataLostQueues.forEach(q -> {
        BaseActorHandle upstreamActor =
            graphManager.getExecutionGraph().getPeerActor(fromVertex.getWorkerActor(), q);
        ExecutionVertex upstreamExeVertex = getExecutionVertex(upstreamActor);

        if (isRollbacking.get(upstreamExeVertex)) {
          return;
        }

        LOG.info("Call upstream vertex {} of vertex {} to rollback, cascadingGroupId={}.",
            upstreamExeVertex, fromVertex, cascadingGroupId);

        String hostname = "";
        Optional<Container> container = ResourceUtil.getContainerById(
            jobMaster.getResourceManager().getRegisteredContainers(),
            upstreamExeVertex.getContainerId()
        );
        if (container.isPresent()) {
          hostname = container.get().getHostname();
        }

        WorkerRollbackRequest upstreamRequest = new WorkerRollbackRequest(
            upstreamExeVertex, hostname,
            String.format("Cascading rollback from %s", fromVertex), true
        );
        upstreamRequest.cascadingGroupId = cascadingGroupId;
        cascadedRollbackRequest.add(upstreamRequest);
      });
      return cascadedRollbackRequest;
    }
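The cascading behaviour can be pictured as a breadth-first walk towards the sources that visits each vertex at most once. The sketch below models that walk synchronously, under simplifying assumptions: CascadePlanner and rollbackOrder are hypothetical names, vertices are plain strings, upstreamOf is assumed to list only those upstream vertices whose queues lost data, and a vertex is marked as rolling back when it is queued, whereas the listing marks it when the remote rollback call is issued.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class CascadePlanner {
  // Returns the order in which vertices would be rolled back, starting from
  // the vertex that requested the rollback and moving upstream.
  static List<String> rollbackOrder(String start, Map<String, List<String>> upstreamOf) {
    List<String> order = new ArrayList<>();
    Set<String> rollingBack = new HashSet<>(); // plays the role of isRollbacking
    Deque<String> queue = new ArrayDeque<>();  // plays the role of foCmds
    queue.add(start);
    rollingBack.add(start);
    while (!queue.isEmpty()) {
      String vertex = queue.poll();
      order.add(vertex);
      for (String upstream : upstreamOf.getOrDefault(vertex, Collections.emptyList())) {
        if (rollingBack.add(upstream)) { // skip vertices already scheduled
          queue.add(upstream);
        }
      }
    }
    return order;
  }
}
```

With a diamond-shaped graph, where a sink reads from two map vertices that share one source, the shared source is scheduled once even though both map vertices point to it.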
III. Summary
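ray-streaming supports a Python API whose backend runs the same way; it mainly provides a _PYTHON_GATEWAY through which Python connects to the Java JobMaster. To support a new operator, the corresponding interfaces need to be added in streaming/python/datastream.py, streaming/python/operator.py, and streaming/python/function.py.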
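ray-streaming currently supports only simple computations: it does not support two-input processing, and single-input processing lacks any handling of time.
ray-streaming's checkpointing currently has only a simple in-memory backend and a local-file backend. The state backend streaming-state keeps only its function interfaces and has not been integrated into the computation system.
Judging from the call logic, the streaming-state state backend should be created in StreamTask.
DataMessage, which carries messages between tasks, contains timestamp information, but Record, which is used for data processing, has already lost it, so time-based processing cannot be supported effectively.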
References