Preface
Notes from a series analyzing the Ray source code.
1. Overview
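This article analyzes the source code of ray-streaming, focusing on what happens when a job starts up. ray-streaming is derived from Flink, and its source code was contributed by a team at Ant Group. Stream processing in Ray is still fairly rudimentary. Only single-input operations are supported and there is no time-window support. Checkpointing offers only the most basic in-memory and local-file-system implementations.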
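ray-streaming is built from three main packages, and the whole design sits on top of Ray Java actors. A `JobMaster` schedules streaming jobs and handles fault recovery. Multi-language support comes from Ray's native Java and Python actors.

Compared with Flink, ray-streaming uses different data-transfer and checkpoint strategies. When it takes a checkpoint, it records the offsets of the input/output buffer pools implemented in C++. This simplifies recovery and avoids storing intermediate data.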
2. Java Source Analysis
2.1 streaming-api
2.1.1 StreamingContext
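`ray/streaming/api/context/StreamingContext.java` is the starting point of every streaming job and holds its basic context. Its two most important methods are `withConfig` and `execute`.

`withConfig` takes the job configuration as a `Map`. All of it is submitted later, when the job runs.

`execute` is where the job is finally submitted:
- It walks backwards from all stream sinks and builds the job graph with `JobGraphBuilder`.
- It then optimizes the graph with `JobGraphOptimizer`. The current optimization only traverses the graph from the sources and merges operators. It has a known problem: when one source feeds two or more sinks, the job fails at runtime.
- Finally, it connects to an existing Ray cluster or starts a new one, and submits the job through `JobClient.submit`.

The `JobClient` interface is defined in `streaming-api`, but its implementation lives in `streaming-runtime`. It is discovered at runtime through `ServiceLoader`.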
```java
/**
 * Construct job DAG, and execute the job.
 */
public void execute(String jobName) {
  JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(this.streamSinks, jobName);
  JobGraph originalJobGraph = jobGraphBuilder.build();
  this.jobGraph = new JobGraphOptimizer(originalJobGraph).optimize();
  jobGraph.printJobGraph();
  LOG.info("JobGraph digraph\n{}", jobGraph.generateDigraph());

  if (!Ray.isInitialized()) {
    if (Config.MEMORY_CHANNEL.equalsIgnoreCase(jobConfig.get(Config.CHANNEL_TYPE))) {
      Preconditions.checkArgument(!jobGraph.isCrossLanguageGraph());
      ClusterStarter.startCluster(false, true);
      LOG.info("Created local cluster for job {}.", jobName);
    } else {
      ClusterStarter.startCluster(jobGraph.isCrossLanguageGraph(), false);
      LOG.info("Created multi process cluster for job {}.", jobName);
    }
    Runtime.getRuntime().addShutdownHook(new Thread(StreamingContext.this::stop));
  } else {
    LOG.info("Reuse existing cluster.");
  }

  ServiceLoader<JobClient> serviceLoader = ServiceLoader.load(JobClient.class);
  Iterator<JobClient> iterator = serviceLoader.iterator();
  Preconditions.checkArgument(iterator.hasNext(),
      "No JobClient implementation has been provided.");
  JobClient jobClient = iterator.next();
  jobClient.submit(jobGraph, jobConfig);
}
```
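The sketch below makes the backward traversal in `execute` concrete. It does not reproduce `JobGraphBuilder`. It assumes only that each stream node keeps a reference to its single input, which matches the single-input limitation noted in the overview. `SketchNode` and `ReverseGraphBuilder` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stream node: like a DataStream, each node only
// knows its upstream input (null for a source).
final class SketchNode {
  final String name;
  final SketchNode input;

  SketchNode(String name, SketchNode input) {
    this.name = name;
    this.input = input;
  }
}

// Hypothetical sketch of building a job DAG by walking backwards from every sink.
final class ReverseGraphBuilder {
  static List<String> buildEdges(List<SketchNode> sinks) {
    Set<SketchNode> visited = new HashSet<>();
    List<String> edges = new ArrayList<>();
    for (SketchNode sink : sinks) {
      SketchNode current = sink;
      // Stop at the source, or at a node already reached from an earlier sink.
      while (current != null && visited.add(current)) {
        if (current.input != null) {
          edges.add(current.input.name + "->" + current.name);
        }
        current = current.input;
      }
    }
    return edges;
  }
}
```

The traversal starts at each sink and follows input links until it reaches a source or an already visited node. As a result, an upstream operator shared by two sinks is recorded only once.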
2.1.2 DataStream
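Building a job starts from `StreamingContext` with a source operator, exposed through `ray/streaming/api/stream/DataStreamSource.java`. The user passes the `StreamingContext` into it so that the chain of operators can be recorded as it is built.

Every subsequent transformation is built on `ray/streaming/api/stream/DataStream.java`. Each chained call records its link to the previous stream. All stream operations live in the `ray/streaming/api/stream` directory.

A stream ends at `ray/streaming/api/stream/DataStreamSink.java`. The sink registers the whole chain back into the `StreamingContext` so that the job can be built. If a stream has no sink, the context does not keep it and it is never executed.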
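A small, hypothetical fluent API can illustrate the sink-registration rule. `MiniContext` and `MiniStream` are invented for this sketch and are much simpler than the real `StreamingContext` and `DataStream`. They only show two things: each stream links to its parent, and only a sink adds the chain to the context.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamingContext: it only remembers sinks.
final class MiniContext {
  final List<MiniStream> sinks = new ArrayList<>();
}

// Hypothetical stand-in for DataStream: each stream records its parent and
// the name of the operation that produced it.
final class MiniStream {
  final MiniContext context;
  final MiniStream parent;
  final String op;

  private MiniStream(MiniContext context, MiniStream parent, String op) {
    this.context = context;
    this.parent = parent;
    this.op = op;
  }

  static MiniStream source(MiniContext context, String name) {
    return new MiniStream(context, null, "source:" + name);
  }

  // A non-terminal transformation only links a new stream to this one.
  MiniStream transform(String opName) {
    return new MiniStream(context, this, opName);
  }

  // Only a sink registers the chain back into the context.
  MiniStream sink() {
    MiniStream sinkStream = new MiniStream(context, this, "sink");
    context.sinks.add(sinkStream);
    return sinkStream;
  }

  // Walks parent links back to the source to describe the whole chain.
  String describeChain() {
    return parent == null ? op : parent.describeChain() + " -> " + op;
  }
}
```

A branch that never calls `sink()` still creates stream objects. However, nothing in `sinks` leads to it, so it would never be built or executed.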
2.1.3 operator&function
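Every `DataStream` wraps an `Operator`, and operators are implemented under `ray/streaming/operator`. Whenever an `Operator` performs an actual computation, it delegates to a `Function`, located under `ray/streaming/api/function`.

Functions come in two kinds:
- Functions exposed directly to users live in `ray/streaming/api/function/impl`.
- Functions used internally by the system live in `ray/streaming/api/function/internal`.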
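The division of labor between operators and functions can be sketched as follows, assuming a map-style operator. `SketchMapFunction`, `SketchCollector` and `SketchMapOperator` are hypothetical names, not the classes in `ray/streaming/operator`. The function holds only user logic, while the operator owns the runtime plumbing such as output collectors.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical user-facing function: pure user logic, no runtime concerns.
interface SketchMapFunction<T, R> {
  R map(T value);
}

// Hypothetical output channel abstraction.
interface SketchCollector<T> {
  void collect(T value);
}

// Hypothetical operator: owns the runtime plumbing and delegates the
// computation itself to the wrapped function.
final class SketchMapOperator<T, R> {
  private final SketchMapFunction<T, R> function;
  private final List<SketchCollector<R>> collectors = new ArrayList<>();

  SketchMapOperator(SketchMapFunction<T, R> function) {
    this.function = function;
  }

  // Called once before processing, like an operator's open phase.
  void open(List<SketchCollector<R>> outputs) {
    collectors.addAll(outputs);
  }

  // Applies the user function and forwards the result to every collector.
  void processElement(T value) {
    R result = function.map(value);
    for (SketchCollector<R> collector : collectors) {
      collector.collect(result);
    }
  }
}
```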
2.2 streaming-runtime
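The call hierarchy of the runtime classes is shown below. The arrows on the right mark calls back up the tree:
- `RemoteCallMaster` calls `JobMaster`.
- `StreamTask` calls `JobWorker` and `RemoteCallMaster`.
- `StreamTask` creates the `StreamingRuntimeContext`.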
```text
JobClientImpl
 └──>JobMaster <───────────────────────────┐
     ├──>JobScheduler                      │
     │   └──>WorkerLifecycleController     │
     │       └──>RemoteCallWorker          │
     │           └──>JobWorker             │<──┐
     │               ├──>RemoteCallMaster ─┙<──┥
     │               ├──>ContextBackend        │
     │               └──>StreamTask ───────────┴─────────┐
     │                   └──>Processor                   │
     │                       ├──>Operator                │
     │                       └──>StreamingRuntimeContext<┙
     │                           ├──>KeyStateBackend
     │                           └──>OperatorStateBackend
     ├──>GraphManager
     ├──>ResourceManager
     ├──>ContextBackend
     ├──>CheckpointCoordinator
     └──>FailoverCoordinator
```
2.2.1 JobClientImpl
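`ray/streaming/runtime/client/JobClientImpl.java` is the concrete implementation of `JobClient`. Its `submit` method does the following:
- It takes the job graph and the job configuration.
- It uses the configuration to create the `JobMaster` actor, the core of ray-streaming.
- It calls `JobMaster.submitJob` through a Ray remote call, which formally starts the streaming job.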
2.2.2 JobMaster
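When `ray/streaming/runtime/master/JobMaster.java` is created, it builds a `StreamingConfig` from the job configuration. It uses that config to initialize `ContextBackend`, the currently simple storage backend, as well as the `JobMasterRuntimeContext`. It also checks the context to decide whether the job must be recovered from a checkpoint, and then runs `init` accordingly.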
```java
public boolean submitJob(ActorHandle<JobMaster> jobMasterActor, JobGraph jobGraph) {
  LOG.info("Begin submitting job using logical plan: {}.", jobGraph);

  this.jobMasterActor = jobMasterActor;

  // init manager
  graphManager = new GraphManagerImpl(runtimeContext);
  resourceManager = new ResourceManagerImpl(runtimeContext);

  // build and set graph into runtime context
  ExecutionGraph executionGraph = graphManager.buildExecutionGraph(jobGraph);
  runtimeContext.setJobGraph(jobGraph);
  runtimeContext.setExecutionGraph(executionGraph);

  // init scheduler
  try {
    scheduler = new JobSchedulerImpl(this);
    scheduler.scheduleJob(graphManager.getExecutionGraph());
  } catch (Exception e) {
    LOG.error("Failed to submit job.", e);
    return false;
  }
  return true;
}
```
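`JobMaster.submitJob` takes the `JobMaster`'s own remote actor handle and the job graph, then proceeds in the following stages:
- Initialize the managers: create `GraphManager` and `ResourceManager` from the `JobMasterRuntimeContext`.
- Build the execution graph: `GraphManager` turns the job graph into the final `ExecutionGraph`, and both graphs are registered in the `JobMasterRuntimeContext`. Building the execution graph mainly means creating physical execution vertices according to each operator's parallelism.
- Schedule the job: `JobSchedulerImpl.scheduleJob` schedules and runs the streaming job.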
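The sketch below shows how a job graph can be expanded into an execution graph by parallelism. It assumes every logical edge is connected all-to-all between the parallel instances, which is what a partitioned (shuffle) edge needs. The real `GraphManager` may wire other partition types differently. `ExecutionGraphSketch` and the `name-index` vertex naming are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of turning logical vertices into execution vertices.
final class ExecutionGraphSketch {
  // One execution vertex per parallel instance, named "<vertex>-<index>".
  static Map<String, List<String>> expandVertices(Map<String, Integer> parallelism) {
    Map<String, List<String>> instances = new LinkedHashMap<>();
    for (Map.Entry<String, Integer> entry : parallelism.entrySet()) {
      List<String> names = new ArrayList<>();
      for (int i = 0; i < entry.getValue(); i++) {
        names.add(entry.getKey() + "-" + i);
      }
      instances.put(entry.getKey(), names);
    }
    return instances;
  }

  // Assumes all-to-all wiring: every upstream instance feeds every downstream one.
  static List<String> expandEdges(Map<String, List<String>> instances,
      List<String[]> logicalEdges) {
    List<String> edges = new ArrayList<>();
    for (String[] edge : logicalEdges) {
      for (String from : instances.get(edge[0])) {
        for (String to : instances.get(edge[1])) {
          edges.add(from + "->" + to);
        }
      }
    }
    return edges;
  }
}
```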
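`JobMaster.init` takes a boolean that indicates whether the job is being recovered from a checkpoint. Its main job is to start two coordinators: the `CheckpointCoordinator`, which drives checkpoints, and the `FailoverCoordinator`, which handles failure recovery. It then saves the current state to the storage backend.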
```java
// init coordinators
checkpointCoordinator = new CheckpointCoordinator(this);
checkpointCoordinator.start();
failoverCoordinator = new FailoverCoordinator(this, isRecover);
failoverCoordinator.start();

saveContext();
```
`JobMaster.reportJobWorkerCommit` is called from the `JobWorker` side. It manages checkpoints by calling the corresponding methods of `CheckpointCoordinator`.
```java
public byte[] reportJobWorkerCommit(byte[] reportBytes) {
  Boolean ret = false;
  RemoteCall.BaseWorkerCmd reportPb;
  try {
    reportPb = RemoteCall.BaseWorkerCmd.parseFrom(reportBytes);
    ActorId actorId = ActorId.fromBytes(reportPb.getActorId().toByteArray());
    long remoteCallCost = System.currentTimeMillis() - reportPb.getTimestamp();
    LOG.info("Vertex {}, request job worker commit cost {}ms, actorId={}.",
        getExecutionVertex(actorId), remoteCallCost, actorId);
    RemoteCall.WorkerCommitReport commit =
        reportPb.getDetail().unpack(RemoteCall.WorkerCommitReport.class);
    WorkerCommitReport report = new WorkerCommitReport(actorId, commit.getCommitCheckpointId());
    ret = checkpointCoordinator.reportJobWorkerCommit(report);
  } catch (InvalidProtocolBufferException e) {
    LOG.error("Parse job worker commit has exception.", e);
  }
  return RemoteCall.BoolResult.newBuilder().setBoolRes(ret).build().toByteArray();
}
```
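`JobMaster.requestJobWorkerRollback` is also called from `JobWorker`. When a worker decides that the job needs to roll back, this method calls the corresponding method of `FailoverCoordinator` to perform recovery.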
```java
public byte[] requestJobWorkerRollback(byte[] requestBytes) {
  Boolean ret = false;
  RemoteCall.BaseWorkerCmd requestPb;
  try {
    requestPb = RemoteCall.BaseWorkerCmd.parseFrom(requestBytes);
    ActorId actorId = ActorId.fromBytes(requestPb.getActorId().toByteArray());
    long remoteCallCost = System.currentTimeMillis() - requestPb.getTimestamp();
    ExecutionGraph executionGraph = graphManager.getExecutionGraph();
    Optional<BaseActorHandle> rayActor = executionGraph.getActorById(actorId);
    if (!rayActor.isPresent()) {
      LOG.warn("Skip this invalid rollback, actor id {} is not found.", actorId);
      return RemoteCall.BoolResult.newBuilder().setBoolRes(false).build().toByteArray();
    }
    ExecutionVertex exeVertex = getExecutionVertex(actorId);
    LOG.info("Vertex {}, request job worker rollback cost {}ms, actorId={}.",
        exeVertex, remoteCallCost, actorId);
    RemoteCall.WorkerRollbackRequest rollbackPb =
        RemoteCall.WorkerRollbackRequest.parseFrom(requestPb.getDetail().getValue());
    exeVertex.setPid(rollbackPb.getWorkerPid());
    // To find old container where slot is located in.
    String hostname = "";
    Optional<Container> container = ResourceUtil.getContainerById(
        resourceManager.getRegisteredContainers(),
        exeVertex.getContainerId()
    );
    if (container.isPresent()) {
      hostname = container.get().getHostname();
    }
    WorkerRollbackRequest request = new WorkerRollbackRequest(
        actorId,
        rollbackPb.getExceptionMsg(),
        hostname,
        exeVertex.getPid()
    );

    ret = failoverCoordinator.requestJobWorkerRollback(request);
    LOG.info("Vertex {} request rollback, exception msg : {}.",
        exeVertex, rollbackPb.getExceptionMsg());

  } catch (Throwable e) {
    LOG.error("Parse job worker rollback has exception.", e);
  }
  return RemoteCall.BoolResult.newBuilder().setBoolRes(ret).build().toByteArray();
}
```
2.2.3 JobScheduler
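`ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java` takes the `JobMaster` in its constructor and obtains the managers and the job configuration from it. It also creates the `WorkerLifecycleController`, which manages the execution of individual operator workers.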
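`JobScheduler.scheduleJob` takes the physical execution graph and starts the job in two stages:
- Preparation: `prepareResourceAndCreateWorker` prepares resources and creates the corresponding `JobWorker`s. `generateActorMappings` then builds the mappings between execution vertices that are used for data distribution.
- Start: `initAndStart` launches the streaming job.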
`JobScheduler.prepareResourceAndCreateWorker` processes the execution graph in these stages:
- Prepare compute resources: request resources from the Ray cluster through `resourceManager.assignResource`.
- Create workers: `createWorkers` creates all workers, ultimately by calling `WorkerLifecycleController.createWorkers`.
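`JobScheduler.initAndStart` processes the execution graph in these stages:
- Build contexts: from the execution graph and the `JobMaster` handle, build each `JobWorker`'s context, `JobWorkerContext`.
- Initialize workers: `initWorkers` initializes all workers, ultimately by calling `WorkerLifecycleController.initWorkers`.
- Initialize the master: `initMaster` initializes the master, which in turn calls back into `JobMaster.init`.
- Start workers: `startWorkers` starts all streaming tasks, ultimately by calling `WorkerLifecycleController.startWorkers`.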
2.2.4 WorkerLifecycleController
`ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java` manages the lifecycle of all workers. It provides creation, initialization, start and destruction.
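`WorkerLifecycleController.asyncBatchExecute` is a shared asynchronous helper. It runs the per-worker operation for every vertex asynchronously with `CompletableFuture.supplyAsync` from the Java standard library, and then checks the results.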
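Here is a minimal sketch of the `asyncBatchExecute` pattern. It assumes the per-worker operation returns a `Boolean` success flag, as `createWorker` does. `AsyncBatchSketch` is a hypothetical name, and the real helper's signature and error handling may differ.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch: run one action per item asynchronously and succeed
// only if every action returns true.
final class AsyncBatchSketch {
  static <T> boolean executeAll(List<T> items, Function<T, Boolean> action) {
    List<CompletableFuture<Boolean>> futures = items.stream()
        .map(item -> CompletableFuture.supplyAsync(() -> action.apply(item)))
        .collect(Collectors.toList());
    // Wait for all actions; an exception thrown by any action surfaces here
    // as a CompletionException.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    // Every future is complete now, so join() returns immediately.
    return futures.stream().allMatch(f -> Boolean.TRUE.equals(f.join()));
  }
}
```

The sketch waits on `allOf` first, so every operation has finished before the results are inspected. A `false` result from one worker therefore does not leave the others unobserved.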
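`WorkerLifecycleController.createWorkers` creates the `JobWorker`s through `asyncBatchExecute`, with `createWorker` as the per-vertex operation. For each vertex of the physical execution graph, `createWorker` does two things:
- It creates a Java-based or Python-based `JobWorker` actor, depending on the vertex's language.
- It registers the new actor back into the execution graph for the later initialization and start steps.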
```java
private boolean createWorker(ExecutionVertex executionVertex) {
  LOG.info("Start to create worker actor for vertex: {} with resource: {}, workeConfig: {}.",
      executionVertex.getExecutionVertexName(), executionVertex.getResource(),
      executionVertex.getWorkerConfig());

  Language language = executionVertex.getLanguage();

  BaseActorHandle actor;
  if (Language.JAVA == language) {
    actor = Ray.actor(JobWorker::new, executionVertex)
        .setResources(executionVertex.getResource())
        .setMaxRestarts(-1)
        .remote();
  } else {
    RemoteCall.ExecutionVertexContext.ExecutionVertex vertexPb =
        new GraphPbBuilder().buildVertex(executionVertex);
    actor = Ray.actor(
        PyActorClass.of("ray.streaming.runtime.worker", "JobWorker"),
        vertexPb.toByteArray())
        .setResources(executionVertex.getResource())
        .setMaxRestarts(-1)
        .remote();
  }

  if (null == actor) {
    LOG.error("Create worker actor failed.");
    return false;
  }

  executionVertex.setWorkerActor(actor);

  LOG.info("Worker actor created, actor: {}, vertex: {}.",
      executionVertex.getWorkerActorId(), executionVertex.getExecutionVertexName());
  return true;
}
```
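`WorkerLifecycleController.initWorkers` initializes the `JobWorker`s. It passes each worker its `JobWorkerContext` through `RemoteCallWorker.initWorker`. It then waits with `Ray.wait` until all initializations have finished, and fails if they do not finish within the timeout.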
```java
public boolean initWorkers(
    Map<ExecutionVertex, JobWorkerContext> vertexToContextMap, int timeout) {
  LOG.info("Begin initiating workers: {}.", vertexToContextMap);
  long startTime = System.currentTimeMillis();

  Map<ObjectRef<Boolean>, ActorId> rayObjects = new HashMap<>();
  vertexToContextMap.entrySet().forEach((entry -> {
    ExecutionVertex vertex = entry.getKey();
    rayObjects.put(RemoteCallWorker.initWorker(vertex.getWorkerActor(), entry.getValue()),
        vertex.getWorkerActorId());
  }));

  List<ObjectRef<Boolean>> objectRefList = new ArrayList<>(rayObjects.keySet());

  LOG.info("Waiting for workers' initialization.");
  WaitResult<Boolean> result = Ray.wait(objectRefList, objectRefList.size(), timeout);
  if (result.getReady().size() != objectRefList.size()) {
    LOG.error("Initializing workers timeout[{} ms].", timeout);
    return false;
  }

  LOG.info("Finished waiting workers' initialization.");
  LOG.info("Workers initialized. Cost {} ms.", System.currentTimeMillis() - startTime);
  return true;
}
```
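`WorkerLifecycleController.startWorkers` tells the `JobWorker`s to start computing. It notifies each worker through `RemoteCallWorker.rollback`, and the worker starts its `StreamTask` inside `JobWorker.rollback`. To keep the computation consistent, it issues the calls to all source actors first, followed by the calls to the remaining actors. It then waits for all of them within the timeout.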
```java
public boolean startWorkers(ExecutionGraph executionGraph, long lastCheckpointId, int timeout) {
  LOG.info("Begin starting workers.");
  long startTime = System.currentTimeMillis();
  List<ObjectRef<Object>> objectRefs = new ArrayList<>();

  // start source actors 1st
  executionGraph.getSourceActors()
      .forEach(actor -> objectRefs.add(RemoteCallWorker.rollback(actor, lastCheckpointId)));

  // then start non-source actors
  executionGraph.getNonSourceActors()
      .forEach(actor -> objectRefs.add(RemoteCallWorker.rollback(actor, lastCheckpointId)));

  WaitResult<Object> result = Ray.wait(objectRefs, objectRefs.size(), timeout);
  if (result.getReady().size() != objectRefs.size()) {
    LOG.error("Starting workers timeout[{} ms].", timeout);
    return false;
  }

  LOG.info("Workers started. Cost {} ms.", System.currentTimeMillis() - startTime);
  return true;
}
```
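The ordering in `startWorkers` can be sketched with `CompletableFuture` standing in for Ray object refs. This substitution is an assumption made for illustration. `StartOrderSketch` and the `start` callback are hypothetical, and `allOf(...).get(timeout)` plays the role of `Ray.wait` plus the ready-count check. As in the original, the source calls are only issued first. The method does not wait for the sources to finish before issuing the other calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical sketch of startWorkers: issue source calls first, then the rest,
// and wait once for all of them with a timeout.
final class StartOrderSketch {
  static boolean startAll(List<String> sources, List<String> nonSources,
      Function<String, CompletableFuture<Void>> start, List<String> issueOrder, long timeoutMs) {
    List<CompletableFuture<Void>> pending = new ArrayList<>();
    for (String actor : sources) {
      issueOrder.add(actor);
      pending.add(start.apply(actor));
    }
    for (String actor : nonSources) {
      issueOrder.add(actor);
      pending.add(start.apply(actor));
    }
    try {
      // Plays the role of Ray.wait(...) plus the ready-count check.
      CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
          .get(timeoutMs, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException | ExecutionException e) {
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```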
2.2.4 RemoteCallWorker&RemoteCallMaster
`ray/streaming/runtime/rpc/RemoteCallWorker.java` handles all remote calls to `JobWorker`s.
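`RemoteCallWorker.initWorker` passes a `JobWorkerContext` to a worker and calls that `JobWorker`'s `init` method. For a Python worker, the context is sent as bytes to the Python `init` method.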
```java
public static ObjectRef<Boolean> initWorker(BaseActorHandle actor, JobWorkerContext context) {
  LOG.info("Call worker to initiate, actor: {}, context: {}.", actor.getId(), context);
  ObjectRef<Boolean> result;

  // python
  if (actor instanceof PyActorHandle) {
    result = ((PyActorHandle) actor).task(PyActorMethod.of("init", Boolean.class),
        context.getPythonWorkerContextBytes()).remote();
  } else {
    // java
    result = ((ActorHandle<JobWorker>) actor).task(JobWorker::init, context).remote();
  }

  LOG.info("Finished calling worker to initiate.");
  return result;
}
```
`RemoteCallWorker.rollback` starts the computation through `JobWorker.rollback`, which supports both starting from a checkpoint and starting fresh.
```java
public static ObjectRef rollback(BaseActorHandle actor, final Long checkpointId) {
  LOG.info("Call worker to start, actor: {}.", actor.getId());
  ObjectRef result;

  // python
  if (actor instanceof PyActorHandle) {
    RemoteCall.CheckpointId checkpointIdPb = RemoteCall.CheckpointId.newBuilder()
        .setCheckpointId(checkpointId)
        .build();
    result = ((PyActorHandle) actor)
        .task(PyActorMethod.of("rollback"),
            checkpointIdPb.toByteArray()
        ).remote();
  } else {
    // java
    result = ((ActorHandle<JobWorker>) actor)
        .task(JobWorker::rollback, checkpointId, System.currentTimeMillis()).remote();
  }

  LOG.info("Finished calling worker to start.");
  return result;
}
```
`ray/streaming/runtime/rpc/RemoteCallMaster.java` handles remote calls to the `JobMaster`. It is mainly used to coordinate checkpoints and recovery, and consists of two functions: `reportJobWorkerCommitAsync` and `requestJobWorkerRollback`.
`RemoteCallMaster.reportJobWorkerCommitAsync` is called by `StreamTask` to report that a checkpoint has completed.
```java
public static ObjectRef<byte[]> reportJobWorkerCommitAsync(
    ActorHandle<JobMaster> actor, WorkerCommitReport commitReport) {
  RemoteCall.WorkerCommitReport commit = RemoteCall.WorkerCommitReport.newBuilder()
      .setCommitCheckpointId(commitReport.commitCheckpointId)
      .build();
  Any detail = Any.pack(commit);
  RemoteCall.BaseWorkerCmd cmd = RemoteCall.BaseWorkerCmd.newBuilder()
      .setActorId(ByteString.copyFrom(commitReport.fromActorId.getBytes()))
      .setTimestamp(System.currentTimeMillis())
      .setDetail(detail).build();

  return actor.task(JobMaster::reportJobWorkerCommit, cmd.toByteArray()).remote();
}
```
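`RemoteCallMaster.requestJobWorkerRollback` is called by `JobWorker` when it detects an error, and asks the `JobMaster` to perform a global rollback. Unlike the commit report, it blocks until the `JobMaster` replies.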
```java
public static Boolean requestJobWorkerRollback(
    ActorHandle<JobMaster> actor, WorkerRollbackRequest rollbackRequest) {
  RemoteCall.WorkerRollbackRequest request = RemoteCall.WorkerRollbackRequest.newBuilder()
      .setExceptionMsg(rollbackRequest.getRollbackExceptionMsg())
      .setWorkerHostname(rollbackRequest.getHostname())
      .setWorkerPid(rollbackRequest.getPid()).build();

  Any detail = Any.pack(request);
  RemoteCall.BaseWorkerCmd cmd = RemoteCall.BaseWorkerCmd.newBuilder()
      .setActorId(ByteString.copyFrom(rollbackRequest.fromActorId.getBytes()))
      .setTimestamp(System.currentTimeMillis())
      .setDetail(detail).build();

  ObjectRef<byte[]> ret = actor.task(
      JobMaster::requestJobWorkerRollback, cmd.toByteArray()).remote();
  byte[] res = ret.get();
  return PbResultParser.parseBoolResult(res);
}
```
2.2.5 JobWorker
`ray/streaming/runtime/worker/JobWorker.java` is the smallest unit registered with the Ray cluster. The `StreamTask` inside a `JobWorker` is the smallest unit of a streaming job.
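When a `JobWorker` is created, it builds its worker configuration and its own storage backend, `ContextBackend`, from the execution vertex. The backend has the same type as the one the `JobMaster` uses, but the stored data is separate. The worker then checks whether its actor has been restarted:
- A freshly started worker simply saves its context.
- A restarted worker loads its saved `JobWorkerContext` from the backend. If one is found, it calls `init` with the recovered context and then calls `requestRollback` to ask for the computation to be restarted.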
```java
public JobWorker(ExecutionVertex executionVertex) {
  LOG.info("Creating job worker.");

  // TODO: the following 3 lines is duplicated with that in init(), try to optimise it later.
  this.executionVertex = executionVertex;
  this.workerConfig = new StreamingWorkerConfig(executionVertex.getWorkerConfig());
  this.contextBackend = ContextBackendFactory.getContextBackend(this.workerConfig);

  LOG.info("Ray.getRuntimeContext().wasCurrentActorRestarted()={}",
      Ray.getRuntimeContext().wasCurrentActorRestarted());
  if (!Ray.getRuntimeContext().wasCurrentActorRestarted()) {
    saveContext();
    LOG.info("Job worker is fresh started, init success.");
    return;
  }

  LOG.info("Begin load job worker checkpoint state.");

  byte[] bytes = CheckpointStateUtil.get(contextBackend, getJobWorkerContextKey());
  if (bytes != null) {
    JobWorkerContext context = Serializer.decode(bytes);
    LOG.info("Worker recover from checkpoint state, byte len={}, context={}.",
        bytes.length, context);
    init(context);
    requestRollback("LoadCheckpoint request rollback in new actor.");
  } else {
    LOG.error(
        "Worker is reconstructed, but can't load checkpoint. " +
            "Check whether you checkpoint state is reliable. Current checkpoint state is {}.",
        contextBackend.getClass().getName());
  }
}
```
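The fresh-start and restart branches of the constructor can be sketched with an in-memory map standing in for `ContextBackend`. The sketch makes three simplifications:
- `MemoryBackendSketch` and `WorkerSketch` are hypothetical names.
- A `String` stands in for the serialized `JobWorkerContext`.
- The `restarted` flag replaces `Ray.getRuntimeContext().wasCurrentActorRestarted()`.

In the original, the fresh-start branch calls `saveContext()` and `init` saves again later; the sketch collapses these into a single save.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for ContextBackend.
final class MemoryBackendSketch {
  private final Map<String, byte[]> store = new HashMap<>();

  void put(String key, byte[] value) {
    store.put(key, value);
  }

  byte[] get(String key) {
    return store.get(key);
  }
}

// Hypothetical worker mirroring the two constructor branches of JobWorker.
final class WorkerSketch {
  String context;            // stands in for JobWorkerContext
  boolean rollbackRequested; // stands in for calling requestRollback(...)

  WorkerSketch(MemoryBackendSketch backend, String key, String freshContext, boolean restarted) {
    if (!restarted) {
      // Fresh start: persist the context so a later restart can find it.
      context = freshContext;
      backend.put(key, context.getBytes(StandardCharsets.UTF_8));
      return;
    }
    // Restart: recover the saved context, then ask for a rollback.
    byte[] bytes = backend.get(key);
    if (bytes != null) {
      context = new String(bytes, StandardCharsets.UTF_8);
      rollbackRequested = true;
    }
  }
}
```

If the backend holds nothing for a restarted worker, the sketch leaves the worker without a context and does not request a rollback. The original behaves the same way and only logs an error in that case.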
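`JobWorker.init` rebuilds the worker's state from the given context and saves it. These steps already ran in the constructor, but here they are driven by the full `JobWorkerContext` rather than only the execution vertex.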
```java
public Boolean init(JobWorkerContext workerContext) {
  // IMPORTANT: some test cases depends on this log to find workers' pid,
  // be careful when changing this log.
  LOG.info("Initiating job worker: {}. Worker context is: {}, pid={}.",
      workerContext.getWorkerName(), workerContext, EnvUtil.getJvmPid());
  this.workerContext = workerContext;
  this.executionVertex = workerContext.getExecutionVertex();
  this.workerConfig = new StreamingWorkerConfig(executionVertex.getWorkerConfig());
  // init state backend
  this.contextBackend = ContextBackendFactory.getContextBackend(this.workerConfig);

  LOG.info("Initiating job worker succeeded: {}.", workerContext.getWorkerName());
  saveContext();
  return true;
}

public synchronized void saveContext() {
  byte[] contextBytes = Serializer.encode(workerContext);
  String key = getJobWorkerContextKey();
  LOG.info("Saving context, worker context={}, serialized byte length={}, key={}.",
      workerContext, contextBytes.length, key);
  CheckpointStateUtil.put(contextBackend, key, contextBytes);
}
```
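`JobWorker.rollback` is where the `StreamTask` that performs the computation is actually created. The worker also manages that task's lifecycle and checkpoints.

`rollback` skips the remaining steps when all of the following hold:
- A task already exists and is alive.
- The task is still in its initial state.
- Its last checkpoint ID equals the requested one.

Otherwise, `rollback` does the following:
- It sets up the data-transfer channel.
- It closes any existing task.
- It creates a new task with `createStreamTask`.
- It starts the task with `StreamTask.recover`.

Currently only source tasks and single-input tasks are supported.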
```java
public CallResult<ChannelRecoverInfo> rollback(Long checkpointId, Long startRollbackTs) {
  synchronized (initialStateChangeLock) {
    if (task != null && task.isAlive() && checkpointId == task.lastCheckpointId
        && task.isInitialState) {
      return CallResult.skipped("Task is already in initial state, skip this rollback.");
    }
  }
  long remoteCallCost = System.currentTimeMillis() - startRollbackTs;

  LOG.info("Start rollback[{}], checkpoint is {}, remote call cost {}ms.",
      executionVertex.getExecutionJobVertexName(), checkpointId, remoteCallCost);

  rollbackCount++;
  if (rollbackCount > 1) {
    isRecreate.set(true);
  }

  try {
    //Init transfer
    TransferChannelType channelType = workerConfig.transferConfig.channelType();
    if (TransferChannelType.NATIVE_CHANNEL == channelType) {
      transferHandler = new TransferHandler();
    }

    if (task != null) {
      // make sure the task is closed
      task.close();
      task = null;
    }

    // create stream task
    task = createStreamTask(checkpointId);
    ChannelRecoverInfo channelRecoverInfo = task.recover(isRecreate.get());
    isNeedRollback = false;

    LOG.info("Rollback job worker success, checkpoint is {}, channelRecoverInfo is {}.",
        checkpointId, channelRecoverInfo);

    return CallResult.success(channelRecoverInfo);
  } catch (Exception e) {
    LOG.error("Rollback job worker has exception.", e);
    return CallResult.fail(ExceptionUtils.getStackTrace(e));
  }
}

private StreamTask createStreamTask(long checkpointId) {
  StreamTask task;
  StreamProcessor streamProcessor = ProcessBuilder
      .buildProcessor(executionVertex.getStreamOperator());
  LOG.debug("Stream processor created: {}.", streamProcessor);

  if (streamProcessor instanceof SourceProcessor) {
    task = new SourceStreamTask(streamProcessor, this, checkpointId);
  } else if (streamProcessor instanceof OneInputProcessor) {
    task = new OneInputStreamTask(streamProcessor, this, checkpointId);
  } else {
    throw new RuntimeException("Unsupported processor type:" + streamProcessor);
  }
  LOG.info("Stream task created: {}.", task);
  return task;
}
```
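The skip condition at the top of `rollback` can be isolated as a pure predicate. `TaskSnapshot` and `RollbackGuardSketch` are hypothetical names. The sketch uses primitive `long` for checkpoint IDs so that `==` compares values. Between two boxed `Long` objects, `==` would compare references instead.

```java
// Hypothetical snapshot of the StreamTask fields that rollback() inspects.
final class TaskSnapshot {
  final boolean alive;
  final long lastCheckpointId;
  final boolean initialState;

  TaskSnapshot(boolean alive, long lastCheckpointId, boolean initialState) {
    this.alive = alive;
    this.lastCheckpointId = lastCheckpointId;
    this.initialState = initialState;
  }
}

final class RollbackGuardSketch {
  // Mirrors the skip condition in JobWorker.rollback: a live task that was
  // restored from the requested checkpoint and is still in its initial state
  // does not need to be rebuilt.
  static boolean shouldSkip(TaskSnapshot task, long requestedCheckpointId) {
    return task != null
        && task.alive
        && task.lastCheckpointId == requestedCheckpointId
        && task.initialState;
  }
}
```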
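`JobWorker` has three checkpoint-related methods, all of which delegate to the corresponding `StreamTask` methods:
- `triggerCheckpoint` triggers a checkpoint.
- `notifyCheckpointTimeout` reports that a checkpoint has timed out.
- `clearExpiredCheckpoint` cleans up expired checkpoints.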
```java
public Boolean triggerCheckpoint(Long barrierId) {
  LOG.info("Receive trigger, barrierId is {}.", barrierId);
  if (task != null) {
    return task.triggerCheckpoint(barrierId);
  }
  return false;
}

public Boolean notifyCheckpointTimeout(Long checkpointId) {
  LOG.info("Notify checkpoint timeout, checkpoint id is {}.", checkpointId);
  if (task != null) {
    task.notifyCheckpointTimeout(checkpointId);
  }
  return true;
}

public Boolean clearExpiredCheckpoint(Long expiredStateCpId, Long expiredQueueCpId) {
  LOG.info("Clear expired checkpoint state, checkpoint id is {}; " +
      "Clear expired queue msg, checkpoint id is {}",
      expiredStateCpId, expiredQueueCpId);
  if (task != null) {
    if (expiredStateCpId > 0) {
      task.clearExpiredCpState(expiredStateCpId);
    }
    task.clearExpiredQueueMsg(expiredQueueCpId);
  }
  return true;
}
```
`JobWorker` has two fault-tolerance methods:
- `requestRollback` tells the `JobMaster` that the job needs to roll back.
- `checkIfNeedRollback` only reports whether a rollback is pending.
```java
public void requestRollback(String exceptionMsg) {
  LOG.info("Request rollback.");
  isNeedRollback = true;
  isRecreate.set(true);
  boolean requestRet = RemoteCallMaster.requestJobWorkerRollback(
      workerContext.getMaster(), new WorkerRollbackRequest(
          workerContext.getWorkerActorId(),
          exceptionMsg,
          EnvUtil.getHostName(),
          EnvUtil.getJvmPid()
      ));
  if (!requestRet) {
    LOG.warn("Job worker request rollback failed! exceptionMsg={}.", exceptionMsg);
  }
}

public Boolean checkIfNeedRollback(Long startCallTs) {
  // No save checkpoint in this query.
  long remoteCallCost = System.currentTimeMillis() - startCallTs;
  LOG.info("Finished checking if need to rollback with result: {}, rpc delay={}ms.",
      isNeedRollback, remoteCallCost);
  return isNeedRollback;
}
```
2.2.6 StreamTask
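`ray/streaming/runtime/worker/tasks/StreamTask.java` is the base class of all streaming tasks. It implements `Runnable`, and its constructor wraps the task in a dedicated daemon thread that processes the stream continuously.

`StreamTask` has several specialized subclasses for different kinds of tasks, as shown in the hierarchy below. There are two main kinds, source tasks and input tasks. Input tasks are further split into one-input and two-input tasks, matching the different operator implementations.

Everything concerning the operator itself is implemented in `StreamTask`. Data processing is pushed down to each subclass's `run` method. When `run` catches an exception, it triggers a rollback.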
```text
StreamTask
 ├──>InputStreamTask
 │   ├──>OneInputStreamTask
 │   └──>TwoInputStreamTask
 └──>SourceStreamTask
```
```java
protected StreamTask(Processor processor, JobWorker jobWorker, long lastCheckpointId) {
  this.processor = processor;
  this.jobWorker = jobWorker;
  this.checkpointState = jobWorker.contextBackend;
  this.lastCheckpointId = lastCheckpointId;

  this.thread = new Thread(Ray.wrapRunnable(this), this.getClass().getName()
      + "-" + System.currentTimeMillis());
  this.thread.setDaemon(true);
}
```
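The thread-per-task design of `StreamTask` can be sketched with plain Java threads. `LoopTaskSketch` and `FailingLoopTask` are hypothetical names. The sketch omits `Ray.wrapRunnable`, processors and channels, and a `Consumer<String>` stands in for `requestRollback`.

```java
import java.util.function.Consumer;

// Hypothetical sketch of the StreamTask pattern: the task is a Runnable that
// owns a dedicated daemon thread and reports failures through a callback.
abstract class LoopTaskSketch implements Runnable {
  protected volatile boolean running = true;
  private final Thread thread;
  private final Consumer<String> rollbackRequester;

  LoopTaskSketch(String name, Consumer<String> rollbackRequester) {
    this.rollbackRequester = rollbackRequester;
    this.thread = new Thread(this, name);
    this.thread.setDaemon(true); // does not keep the JVM alive, like StreamTask's thread
  }

  void start() {
    thread.start();
  }

  void stop() {
    running = false;
  }

  // Waits up to the given time for the thread to finish; true if it has exited.
  boolean awaitExit(long millis) {
    try {
      thread.join(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return !thread.isAlive();
  }

  @Override
  public final void run() {
    try {
      while (running) {
        processOnce();
      }
    } catch (Exception e) {
      // Plays the role of StreamTask.requestRollback(...).
      rollbackRequester.accept(e.getMessage());
    }
  }

  // One unit of work, e.g. fetching from a source or processing one record.
  protected abstract void processOnce() throws Exception;
}

// Hypothetical task that fails on its third unit of work.
final class FailingLoopTask extends LoopTaskSketch {
  private int processed;

  FailingLoopTask(Consumer<String> rollbackRequester) {
    super("failing-loop-task", rollbackRequester);
  }

  @Override
  protected void processOnce() {
    processed++;
    if (processed == 3) {
      throw new IllegalStateException("boom at record 3");
    }
  }
}
```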
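`StreamTask.recover` starts the task. It first calls `prepareTask` to set up the task's initial state, then starts the thread, which starts the computation. It returns the channel recovery information taken from the `DataReader`, if the task has one.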
```java
public ChannelRecoverInfo recover(boolean isRecover) {

  if (isRecover) {
    LOG.info("Stream task begin recover.");
  } else {
    LOG.info("Stream task first start begin.");
  }
  prepareTask(isRecover);

  // start runner
  ChannelRecoverInfo recoverInfo = new ChannelRecoverInfo(new HashMap<>());
  if (reader != null) {
    recoverInfo = reader.getQueueRecoverInfo();
  }

  thread.setUncaughtExceptionHandler((t, e) -> LOG.error("Uncaught exception in runner thread.", e));
  LOG.info("Start stream task: {}.", this.getClass().getSimpleName());
  thread.start();

  if (isRecover) {
    LOG.info("Stream task recover end.");
  } else {
    LOG.info("Stream task first start finished.");
  }

  return recoverInfo;
}
```
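`StreamTask.prepareTask` prepares the resources the task needs, in the following steps:
- Use the `isRecreate` argument to decide whether the task is being recreated rather than started for the first time. If it is, fetch the task's checkpoint from the storage backend; a missing checkpoint raises an exception.
- (Optional) If the task is recovering and the checkpoint contains data, call the processor's `loadCheckpoint`, which is ultimately implemented by the user.
- Create the data writer `DataWriter` and the data reader `DataReader`, seeded with the checkpointed output and input offsets. Each is created only when the vertex has output or input edges, respectively.
- Call `openProcessor` to pass these along and run the task's `open` phase.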
```java
private void prepareTask(boolean isRecreate) {
  LOG.info("Preparing stream task, isRecreate={}.", isRecreate);
  ExecutionVertex executionVertex = jobWorker.getExecutionVertex();

  // set vertex info into config for native using
  jobWorker.getWorkerConfig().workerInternalConfig.setProperty(
      WorkerInternalConfig.WORKER_NAME_INTERNAL, executionVertex.getExecutionVertexName());
  jobWorker.getWorkerConfig().workerInternalConfig.setProperty(
      WorkerInternalConfig.OP_NAME_INTERNAL, executionVertex.getExecutionJobVertexName());

  OperatorCheckpointInfo operatorCheckpointInfo = new OperatorCheckpointInfo();
  byte[] bytes = null;

  // Fetch checkpoint from storage only in recreate mode not for new startup
  // worker
  // in rescaling or something like that.
  if (isRecreate) {
    String cpKey = genOpCheckpointKey(lastCheckpointId);
    LOG.info("Getting task checkpoints from state, cpKey={}, checkpointId={}.",
        cpKey, lastCheckpointId);
    bytes = CheckpointStateUtil.get(checkpointState, cpKey);
    if (bytes == null) {
      String msg = String.format("Task recover failed, checkpoint is null! cpKey=%s", cpKey);
      throw new RuntimeException(msg);
    }
  }

  // when use memory state, if actor throw exception, will miss state
  if (bytes != null) {
    operatorCheckpointInfo = Serializer.decode(bytes);
    processor.loadCheckpoint(operatorCheckpointInfo.processorCheckpoint);
    LOG.info("Stream task recover from checkpoint state, checkpoint bytes len={}, checkpointInfo={}.",
        bytes.length, operatorCheckpointInfo);
  }

  // writer
  if (!executionVertex.getOutputEdges().isEmpty()) {
    LOG.info("Register queue writer, channels={}, outputCheckpoints={}.",
        executionVertex.getOutputChannelIdList(), operatorCheckpointInfo.outputPoints);
    writer = new DataWriter(executionVertex.getOutputChannelIdList(),
        executionVertex.getOutputActorList(), operatorCheckpointInfo.outputPoints,
        jobWorker.getWorkerConfig());
  }

  // reader
  if (!executionVertex.getInputEdges().isEmpty()) {
    LOG.info("Register queue reader, channels={}, inputCheckpoints={}.",
        executionVertex.getInputChannelIdList(), operatorCheckpointInfo.inputPoints);
    reader = new DataReader(executionVertex.getInputChannelIdList(),
        executionVertex.getInputActorList(), operatorCheckpointInfo.inputPoints,
        jobWorker.getWorkerConfig());
  }

  openProcessor();

  LOG.debug("Finished preparing stream task.");
}
```
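`StreamTask.openProcessor` activates the task's `open` phase in three steps:
- It groups the output channels and actors by target operator and creates one `OutputCollector` per target operator.
- It builds the `StreamingRuntimeContext` the task needs.
- It passes both to the processor's `open` method.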
```java
private void openProcessor() {
  ExecutionVertex executionVertex = jobWorker.getExecutionVertex();
  List<ExecutionEdge> outputEdges = executionVertex.getOutputEdges();

  Map<String, List<String>> opGroupedChannelId = new HashMap<>();
  Map<String, List<BaseActorHandle>> opGroupedActor = new HashMap<>();
  Map<String, Partition> opPartitionMap = new HashMap<>();
  for (int i = 0; i < outputEdges.size(); ++i) {
    ExecutionEdge edge = outputEdges.get(i);
    String opName = edge.getTargetExecutionJobVertexName();
    if (!opPartitionMap.containsKey(opName)) {
      opGroupedChannelId.put(opName, new ArrayList<>());
      opGroupedActor.put(opName, new ArrayList<>());
    }
    opGroupedChannelId.get(opName).add(executionVertex.getOutputChannelIdList().get(i));
    opGroupedActor.get(opName).add(executionVertex.getOutputActorList().get(i));
    opPartitionMap.put(opName, edge.getPartition());
  }
  opPartitionMap.keySet().forEach(opName -> {
    collectors.add(new OutputCollector(writer, opGroupedChannelId.get(opName),
        opGroupedActor.get(opName), opPartitionMap.get(opName)
    ));
  });

  RuntimeContext runtimeContext = new StreamingRuntimeContext(executionVertex,
      jobWorker.getWorkerConfig().configMap, executionVertex.getParallelism());

  processor.open(collectors, runtimeContext);
}
```
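The grouping step in `openProcessor` relies on the output edges and the output channel list being index-aligned. The hypothetical sketch below reproduces the same grouping with `computeIfAbsent`, using plain strings for operator names and channel IDs. It produces one group, and therefore one `OutputCollector`, per downstream operator. `LinkedHashMap` keeps the groups in first-seen order, whereas the original uses `HashMap`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the channel grouping in openProcessor.
final class ChannelGroupingSketch {
  // targetOps.get(i) is the downstream operator of output edge i and
  // channelIds.get(i) is the channel carrying that edge.
  static Map<String, List<String>> groupByTarget(List<String> targetOps, List<String> channelIds) {
    if (targetOps.size() != channelIds.size()) {
      throw new IllegalArgumentException("edges and channels must be index-aligned");
    }
    Map<String, List<String>> grouped = new LinkedHashMap<>();
    for (int i = 0; i < targetOps.size(); i++) {
      grouped.computeIfAbsent(targetOps.get(i), op -> new ArrayList<>()).add(channelIds.get(i));
    }
    return grouped;
  }
}
```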
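The checkpoint-related operations of `StreamTask` are mainly the following:
- `triggerCheckpoint` triggers a checkpoint. Only sources may call it; the base class throws `UnsupportedOperationException`.
- `doCheckpoint` performs a checkpoint in four steps:
  - It records the message offsets. Every task saves and distributes its `DataWriter` output offsets, and the offsets passed in from upstream are saved as its `DataReader` input offsets.
  - It broadcasts a barrier carrying the checkpoint ID downstream.
  - It calls the processor's `saveCheckpoint` to run the user's checkpoint logic.
  - It calls `saveCpStateAndReport` to persist the checkpoint and report the result.
- `saveCpStateAndReport` calls `saveCp` and then `reportCommit`. `saveCp` performs the actual save, writing the checkpoint data to the backend through the utility class `CheckpointStateUtil`. `reportCommit` reports the completed checkpoint to the `JobMaster` through `RemoteCallMaster`.
- `notifyCheckpointTimeout` deletes a timed-out checkpoint if it has already been saved. Otherwise it marks the checkpoint as outdated so that a late `saveCp` skips it.
- `clearExpiredCpState` and `clearExpiredQueueMsg` clean up expired checkpoint state and expired queue messages locally.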
```java
public boolean triggerCheckpoint(Long barrierId) {
  throw new UnsupportedOperationException("Only source operator supports trigger checkpoints.");
}

public void doCheckpoint(long checkpointId, Map<String, OffsetInfo> inputPoints) {
  Map<String, OffsetInfo> outputPoints = null;
  if (writer != null) {
    outputPoints = writer.getOutputCheckpoints();
    RemoteCall.Barrier barrierPb = RemoteCall.Barrier.newBuilder().setId(checkpointId).build();
    ByteBuffer byteBuffer = ByteBuffer.wrap(barrierPb.toByteArray());
    byteBuffer.order(ByteOrder.nativeOrder());
    writer.broadcastBarrier(checkpointId, byteBuffer);
  }

  LOG.info("Start do checkpoint, cp id={}, inputPoints={}, outputPoints={}.",
      checkpointId, inputPoints, outputPoints);

  this.lastCheckpointId = checkpointId;
  Serializable processorCheckpoint = processor.saveCheckpoint();

  try {
    OperatorCheckpointInfo opCpInfo = new OperatorCheckpointInfo(
        inputPoints, outputPoints, processorCheckpoint, checkpointId);
    saveCpStateAndReport(opCpInfo, checkpointId);
  } catch (Exception e) {
    // there will be exceptions when flush state to backend.
    // we ignore the exception to prevent failover
    LOG.error("Processor or op checkpoint exception.", e);
  }

  LOG.info("Operator do checkpoint {} finish.", checkpointId);
}

private void saveCpStateAndReport(
    OperatorCheckpointInfo operatorCheckpointInfo, long checkpointId) {
  saveCp(operatorCheckpointInfo, checkpointId);
  reportCommit(checkpointId);
  LOG.info("Finish save cp state and report, checkpoint id is {}.", checkpointId);
}

private void saveCp(OperatorCheckpointInfo operatorCheckpointInfo, long checkpointId) {
  byte[] bytes = Serializer.encode(operatorCheckpointInfo);
  String cpKey = genOpCheckpointKey(checkpointId);
  LOG.info("Saving task checkpoint, cpKey={}, byte len={}, checkpointInfo={}.",
      cpKey, bytes.length, operatorCheckpointInfo);
  synchronized (checkpointState) {
    if (outdatedCheckpoints.contains(checkpointId)) {
      LOG.info("Outdated checkpoint, skip save checkpoint.");
      outdatedCheckpoints.remove(checkpointId);
    } else {
      CheckpointStateUtil.put(checkpointState, cpKey, bytes);
    }
  }
}

private void reportCommit(long checkpointId) {
  final JobWorkerContext context = jobWorker.getWorkerContext();
  LOG.info("Report commit async, checkpoint id {}.", checkpointId);
  RemoteCallMaster.reportJobWorkerCommitAsync(context.getMaster(),
      new WorkerCommitReport(context.getWorkerActorId(), checkpointId));
}

public void notifyCheckpointTimeout(long checkpointId) {
  String cpKey = genOpCheckpointKey(checkpointId);
  try {
    synchronized (checkpointState) {
      if (checkpointState.exists(cpKey)) {
        checkpointState.remove(cpKey);
      } else {
        outdatedCheckpoints.add(checkpointId);
      }
    }
  } catch (Exception e) {
    LOG.error("Notify checkpoint timeout failed, checkpointId is {}.", checkpointId, e);
  }
}

public void clearExpiredCpState(long checkpointId) {
  String cpKey = genOpCheckpointKey(checkpointId);
  try {
    checkpointState.remove(cpKey);
  } catch (Exception e) {
    LOG.error("Failed to remove key {} from state backend.", cpKey, e);
  }
}

public void clearExpiredQueueMsg(long checkpointId) {
  // get operator checkpoint
  String cpKey = genOpCheckpointKey(checkpointId);
  byte[] bytes;
  try {
    bytes = checkpointState.get(cpKey);
  } catch (Exception e) {
    LOG.error("Failed to get key {} from state backend.", cpKey, e);
    return;
  }
  if (bytes != null) {
    final OperatorCheckpointInfo operatorCheckpointInfo = Serializer.decode(bytes);
    long cpId = operatorCheckpointInfo.checkpointId;
    if (writer != null) {
      writer.clearCheckpoint(cpId);
    }
  }
}
```
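The `saveCp` / `notifyCheckpointTimeout` pair above resolves a race. The master's timeout notice can arrive either before or after the worker has saved its state for that checkpoint, and whichever side arrives second does the cleanup. The following is a minimal, self-contained sketch of that handshake. It assumes a plain `HashMap` as the state store. The class `CheckpointStore`, its methods and its key format are hypothetical and are not part of ray-streaming.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the save/timeout handshake; not the ray-streaming class.
class CheckpointStore {
  private final Map<String, byte[]> state = new HashMap<>();
  private final Set<Long> outdatedCheckpoints = new HashSet<>();

  private static String key(long checkpointId) {
    return "op-checkpoint-" + checkpointId; // hypothetical key format
  }

  // Worker side: called once the snapshot for checkpointId is ready.
  void save(long checkpointId, byte[] bytes) {
    synchronized (state) {
      if (outdatedCheckpoints.remove(checkpointId)) {
        // The timeout notice arrived first: drop the late snapshot.
        return;
      }
      state.put(key(checkpointId), bytes);
    }
  }

  // Master side: called when checkpointId is declared timed out.
  void notifyTimeout(long checkpointId) {
    synchronized (state) {
      if (state.remove(key(checkpointId)) == null) {
        // Snapshot not saved yet: remember to discard it when it arrives.
        outdatedCheckpoints.add(checkpointId);
      }
    }
  }

  boolean contains(long checkpointId) {
    synchronized (state) {
      return state.containsKey(key(checkpointId));
    }
  }
}
```

Both paths lock the same object, as the original does with `synchronized (checkpointState)`. This is what makes the "check, then remove or remember" sequence atomic.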
The `requestRollback` function of `StreamTask` asks the job to roll back.
```java
protected void requestRollback(String exceptionMsg) {
  jobWorker.requestRollback(exceptionMsg);
}
```
2.2.6.1 SourceStreamTask
`ray/streaming/runtime/worker/tasks/SourceStreamTask.java` is the dedicated task for data sources. Its two key functions are `run`, which reads data, and `triggerCheckpoint`, which triggers checkpoints.
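The `run` function of `SourceStreamTask` loops over two steps: checking for a pending checkpoint and reading data. Other tasks process data through `process`. A source has no input, so it defines a separate `fetch` function instead. When a `barrierId` is pending, `run` calls `doCheckpoint`, which broadcasts the checkpoint barrier downstream.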
```java
public void run() {
  LOG.info("Source stream task thread start.");
  Long barrierId;
  try {
    while (running) {
      isInitialState = false;

      // check checkpoint
      barrierId = pendingBarrier.get();
      if (barrierId != null) {
        // Important: because cp maybe timeout, master will use the old checkpoint id again
        if (pendingBarrier.compareAndSet(barrierId, null)) {
          // source fetcher only have outputPoints
          LOG.info("Start to do checkpoint {}, worker name is {}.",
              barrierId, jobWorker.getWorkerContext().getWorkerName());

          doCheckpoint(barrierId, null);

          LOG.info("Finish to do checkpoint {}.", barrierId);
        } else {
          // pendingCheckpointId has modify, should not happen
          LOG.warn("Pending checkpointId modify unexpected, expect={}, now={}.",
              barrierId, pendingBarrier.get());
        }
      }

      sourceProcessor.fetch();
    }
  } catch (Throwable e) {
    if (e instanceof ChannelInterruptException ||
        ExceptionUtils.getRootCause(e) instanceof ChannelInterruptException) {
      LOG.info("queue has stopped.");
    } else {
      // occur error, need to rollback
      LOG.error("Last success checkpointId={}, now occur error.", lastCheckpointId, e);
      requestRollback(ExceptionUtils.getStackTrace(e));
    }
  }

  LOG.info("Source stream task thread exit.");
}
```
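The `triggerCheckpoint` function of `SourceStreamTask` puts the checkpoint ID into `pendingBarrier`, where the `run` loop finds it on its next iteration. It uses `compareAndSet(null, barrierId)`, so the call fails (returns `false`) if another barrier is still pending.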
```java
public boolean triggerCheckpoint(Long barrierId) {
  return pendingBarrier.compareAndSet(null, barrierId);
}
```
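Together, `triggerCheckpoint` and the `run` loop form a single-slot handoff between the coordinator's call and the source thread, built on `compareAndSet`. Below is a minimal sketch of that handoff. It assumes an `AtomicReference<Long>` like the `pendingBarrier` field. The class `BarrierSlot` is hypothetical and leaves out the actual checkpoint work.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the single-slot barrier handoff used by SourceStreamTask.
class BarrierSlot {
  private final AtomicReference<Long> pendingBarrier = new AtomicReference<>();

  // Coordinator side: succeeds only if no barrier is currently pending.
  boolean trigger(long barrierId) {
    return pendingBarrier.compareAndSet(null, barrierId);
  }

  // Source-thread side: returns the pending barrier id and clears the slot, or null.
  Long poll() {
    Long barrierId = pendingBarrier.get();
    if (barrierId == null) {
      return null;
    }
    // compareAndSet compares references, so pass the exact object read above.
    return pendingBarrier.compareAndSet(barrierId, null) ? barrierId : null;
  }
}
```

`AtomicReference.compareAndSet` uses reference identity, not `equals`. For that reason both the original `run` loop and this sketch clear the slot with the very `Long` object returned by `get()`.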
2.2.6.2 InputStreamTask
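`ray/streaming/runtime/worker/tasks/InputStreamTask.java` implements the actual data-processing path; its subclasses only distinguish between input types. In `run`, it keeps reading messages from `reader`. Depending on the message type, it either passes the data to `processor` or performs a checkpoint.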
```java
public void run() {
  try {
    while (running) {
      ChannelMessage item;

      // reader.read() will change the consumer state once it got an item. This lock is to
      // ensure worker can get correct isInitialState value in exactly-once-mode's rollback.
      synchronized (jobWorker.initialStateChangeLock) {
        item = reader.read(readTimeoutMillis);
        if (item != null) {
          isInitialState = false;
        } else {
          continue;
        }
      }

      if (item instanceof DataMessage) {
        DataMessage dataMessage = (DataMessage) item;
        byte[] bytes = new byte[dataMessage.body().remaining() - 1];
        byte typeId = dataMessage.body().get();
        dataMessage.body().get(bytes);
        Object obj;
        if (typeId == Serializer.JAVA_TYPE_ID) {
          obj = javaSerializer.deserialize(bytes);
        } else {
          obj = crossLangSerializer.deserialize(bytes);
        }
        processor.process(obj);
      } else if (item instanceof BarrierMessage) {
        final BarrierMessage queueBarrier = (BarrierMessage) item;
        byte[] barrierData = new byte[queueBarrier.getData().remaining()];
        queueBarrier.getData().get(barrierData);
        RemoteCall.Barrier barrierPb = RemoteCall.Barrier.parseFrom(barrierData);
        final long checkpointId = barrierPb.getId();
        LOG.info("Start to do checkpoint {}, worker name is {}.",
            checkpointId, jobWorker.getWorkerContext().getWorkerName());

        final Map<String, OffsetInfo> inputPoints = queueBarrier.getInputOffsets();
        doCheckpoint(checkpointId, inputPoints);
        LOG.info("Do checkpoint {} success.", checkpointId);
      }
    }
  } catch (Throwable throwable) {
    if (throwable instanceof ChannelInterruptException ||
        ExceptionUtils.getRootCause(throwable) instanceof ChannelInterruptException) {
      LOG.info("queue has stopped.");
    } else {
      // error occurred, need to rollback
      LOG.error("Last success checkpointId={}, now occur error.", lastCheckpointId, throwable);
      requestRollback(ExceptionUtils.getStackTrace(throwable));
    }
  }
  LOG.info("Input stream task thread exit.");
  stopped = true;
}
```
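`InputStreamTask` assumes that each `DataMessage` body starts with a one-byte serializer type ID followed by the payload. This is why it allocates `remaining() - 1` bytes before reading the ID. The sketch below reproduces just that framing with `java.nio.ByteBuffer`. The type-ID values and the `Frames` class are hypothetical stand-ins for `Serializer.JAVA_TYPE_ID` and the real serializers.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical framing sketch: [1-byte type id][payload bytes].
class Frames {
  static final byte JAVA_TYPE_ID = 0;        // hypothetical value
  static final byte CROSS_LANG_TYPE_ID = 1;  // hypothetical value

  static ByteBuffer encode(byte typeId, byte[] payload) {
    ByteBuffer buf = ByteBuffer.allocate(1 + payload.length);
    buf.put(typeId).put(payload);
    buf.flip(); // switch to read mode, as a reader would receive it
    return buf;
  }

  // Mirrors the read path in InputStreamTask.run: size the payload first, then read the id.
  static String describe(ByteBuffer body) {
    byte[] bytes = new byte[body.remaining() - 1];
    byte typeId = body.get();
    body.get(bytes);
    String text = new String(bytes, StandardCharsets.UTF_8);
    return (typeId == JAVA_TYPE_ID ? "java:" : "cross-lang:") + text;
  }
}
```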
2.2.7 StreamingRuntimeContext
`ray/streaming/runtime/worker/context/StreamingRuntimeContext.java` implements `RuntimeContext` from `streaming-api` and stores the task's basic information when it is created. The `ExecutionVertex` passed in is used only to obtain `taskId` and `taskIndex`.
```java
public StreamingRuntimeContext(
    ExecutionVertex executionVertex,
    Map<String, String> config,
    int parallelism) {
  this.taskId = executionVertex.getExecutionVertexId();
  this.config = config;
  this.taskIndex = executionVertex.getExecutionVertexIndex();
  this.parallelism = parallelism;
}
```
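The remaining functions of `StreamingRuntimeContext` just read and write its fields. Two of them deserve attention: `KeyStateBackend` and `OperatorStateBackend`. These state-backend classes from `streaming-state` currently have only setters here, and no later initialization or use of them could be found.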
2.2.8 Processor
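`ray/streaming/runtime/core/processor/Processor.java` is where each streaming task actually processes data. It has several specialized subclasses for different kinds of computation. The class hierarchy is as follows.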
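```text
Processor                       --> Interface parent; mainly declares the function signatures.
└───> StreamProcessor           --> Implements every function except `process`; calls the matching Operator method at each stage.
      ├──> SourceProcessor      --> Exposes `fetch` for reading from the data source.
      ├──> OneInputProcessor    --> Single-input processing; calls the operator's `processElement`.
      └──> TwoInputProcessor    --> Like the single-input case, but first determines which upstream stream the input belongs to, then calls the operator's `processElement`.
```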
`Processor` declares five methods, one for each stage of a task's lifecycle.
```java
void open(List<Collector> collectors, RuntimeContext runtimeContext);

void process(T t);

/**
 * See {@link Function#saveCheckpoint()}.
 */
Serializable saveCheckpoint();

/**
 * See {@link Function#loadCheckpoint(Serializable)}.
 */
void loadCheckpoint(Serializable checkpointObject);

void close();
```
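To make the five-stage lifecycle concrete, here is a toy processor that counts records and checkpoints its count. The `SimpleProcessor` interface is a simplified, hypothetical stand-in for `Processor`. It drops the `Collector` and `RuntimeContext` parameters so that the example compiles on its own.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical version of the Processor lifecycle (no collectors/context).
interface SimpleProcessor<T> {
  void open();
  void process(T t);
  Serializable saveCheckpoint();
  void loadCheckpoint(Serializable checkpointObject);
  void close();
}

// Counts processed records; the count is the checkpointed state.
class CountingProcessor implements SimpleProcessor<String> {
  private long count;
  final List<String> events = new ArrayList<>();

  public void open() { events.add("open"); }

  public void process(String record) { count++; }

  // The boxed Long is Serializable, so it can be returned as the snapshot directly.
  public Serializable saveCheckpoint() { return count; }

  public void loadCheckpoint(Serializable checkpointObject) {
    count = (Long) checkpointObject;
  }

  public void close() { events.add("close"); }

  long count() { return count; }
}
```

Restoring from a snapshot discards everything processed after it. This is the behavior that the coordinators rely on when they roll a worker back to the last valid checkpoint.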
2.2.9 ContextBackend
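`ray/streaming/runtime/context/ContextBackend.java` is the simple checkpoint backend of `streaming-runtime` and exposes four methods. It has two implementations: `MemoryContextBackend`, backed by a `HashMap`, and `AtomicFsBackend`, backed by the local file system, whose path is read from `config`.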
```java
/**
 * check if key exists in state
 *
 * @return true if exists
 */
boolean exists(final String key) throws Exception;

/**
 * get content by key
 *
 * @param key key
 * @return the StateBackend
 */
byte[] get(final String key) throws Exception;

/**
 * put content by key
 *
 * @param key key
 * @param value content
 */
void put(final String key, final byte[] value) throws Exception;

/**
 * remove content by key
 *
 * @param key key
 */
void remove(final String key) throws Exception;
```
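A backend for this four-method interface can be very small. The sketch below shows what a `HashMap`-based in-memory backend might look like. The interface is the one quoted above, without its Javadoc. `InMemoryBackend` is a hypothetical name and is not the actual `MemoryContextBackend` source. State held this way is lost if the process exits.

```java
import java.util.HashMap;
import java.util.Map;

// The four-method interface quoted above (Javadoc omitted).
interface ContextBackend {
  boolean exists(final String key) throws Exception;
  byte[] get(final String key) throws Exception;
  void put(final String key, final byte[] value) throws Exception;
  void remove(final String key) throws Exception;
}

// Hypothetical HashMap-backed implementation; state is lost when the process exits.
class InMemoryBackend implements ContextBackend {
  private final Map<String, byte[]> kv = new HashMap<>();

  @Override
  public synchronized boolean exists(String key) { return kv.containsKey(key); }

  @Override
  public synchronized byte[] get(String key) { return kv.get(key); } // null if absent

  @Override
  public synchronized void put(String key, byte[] value) { kv.put(key, value); }

  @Override
  public synchronized void remove(String key) { kv.remove(key); }
}
```

The overriding methods are allowed to drop `throws Exception`. As a result, callers that hold an `InMemoryBackend` reference need no exception handling, while callers that hold the interface type still do.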
2.2.10 BaseCoordinator
`ray/streaming/runtime/master/coordinator/BaseCoordinator.java` is the parent class of `CheckpointCoordinator` and `FailoverCoordinator`. It implements `Runnable` and runs itself on a dedicated thread.
```java
public BaseCoordinator(JobMaster jobMaster) {
  this.jobMaster = jobMaster;
  this.runtimeContext = jobMaster.getRuntimeContext();
  this.graphManager = jobMaster.getGraphManager();
}

public void start() {
  thread = new Thread(Ray.wrapRunnable(this),
      this.getClass().getName() + "-" + System.currentTimeMillis());
  thread.start();
}

public void stop() {
  closed = true;

  try {
    if (thread != null) {
      thread.join(30000);
    }
  } catch (InterruptedException e) {
    LOG.error("Coordinator thread exit has exception.", e);
  }
}
```
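The start/stop pattern in `BaseCoordinator` is a common one. A loop is guarded by a `closed` flag, and shutdown ends with a bounded `join`. A minimal sketch follows. `LoopingCoordinator` is hypothetical, and `Ray.wrapRunnable` is omitted. The flag is declared `volatile` here so that the worker thread is guaranteed to see the update; the code above does not show whether the original field is `volatile`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of BaseCoordinator's thread lifecycle.
class LoopingCoordinator implements Runnable {
  private volatile boolean closed = false;
  private Thread thread;
  final AtomicInteger iterations = new AtomicInteger();

  void start() {
    thread = new Thread(this, getClass().getName() + "-" + System.currentTimeMillis());
    thread.start();
  }

  @Override
  public void run() {
    while (!closed) {
      iterations.incrementAndGet();
      try {
        Thread.sleep(10); // stands in for the 1-second queue poll
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  // Sets the flag, waits up to timeoutMillis, and reports whether the thread exited.
  boolean stop(long timeoutMillis) {
    closed = true;
    if (thread == null) {
      return true;
    }
    try {
      thread.join(timeoutMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return !thread.isAlive();
  }
}
```

The loop blocks for at most one poll interval per iteration, so a bounded `join` (30 seconds in the original) is enough for the thread to notice `closed` and exit.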
2.2.10.1 CheckpointCoordinator
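The `run` function of `CheckpointCoordinator` keeps polling checkpoint commands from `runtimeContext` and acts on them:

- A commit report is processed.
- Any other command interrupts the current checkpoint.
- A checkpoint that has waited too long is interrupted.
- When no checkpoint is pending, a new one may be triggered.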
```java
public void run() {
  while (!closed) {
    try {
      final BaseWorkerCmd command = runtimeContext.cpCmds.poll(1, TimeUnit.SECONDS);
      if (command != null) {
        if (command instanceof WorkerCommitReport) {
          processCommitReport((WorkerCommitReport) command);
        } else {
          interruptCheckpoint();
        }
      }

      if (!pendingCheckpointActors.isEmpty()) {
        // if wait commit report timeout, this cp fail, and restart next cp
        if (timeoutOnWaitCheckpoint()) {
          LOG.warn("Waiting for checkpoint {} timeout, pending cp actors is {}.",
              runtimeContext.lastCheckpointId,
              graphManager.getExecutionGraph().getActorName(pendingCheckpointActors));

          interruptCheckpoint();
        }
      } else {
        maybeTriggerCheckpoint();
      }
    } catch (Throwable e) {
      LOG.error("Checkpoint coordinator occur err.", e);
      try {
        interruptCheckpoint();
      } catch (Throwable interruptE) {
        LOG.error("Ignore interrupt checkpoint exception in catch block.");
      }
    }
  }
  LOG.warn("Checkpoint coordinator thread exit.");
}
```
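When `run` receives a `WorkerCommitReport` (the normal completion message sent back by a `JobWorker`), it calls `processCommitReport`. This function checks that the report belongs to the current checkpoint and removes the reporting worker from the set of pending checkpoint actors. An empty set means that this round of checkpointing is complete. The checkpoint ID is then recorded, and `clearExpiredCpStateAndQueueMsg` tells all tasks to clear data from earlier checkpoints. Finally, `JobMaster` saves its own context.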
```java
private void processCommitReport(WorkerCommitReport commitReport) {
  LOG.info("Start process commit report {}, from actor name={}.", commitReport,
      graphManager.getExecutionGraph().getActorName(commitReport.fromActorId));

  try {
    Preconditions.checkArgument(
        commitReport.commitCheckpointId == runtimeContext.lastCheckpointId,
        "expect checkpointId %s, but got %s",
        runtimeContext.lastCheckpointId, commitReport);

    if (!pendingCheckpointActors.contains(commitReport.fromActorId)) {
      LOG.warn("Invalid commit report, skipped.");
      return;
    }

    pendingCheckpointActors.remove(commitReport.fromActorId);
    LOG.info("Pending actors after this commit: {}.",
        graphManager.getExecutionGraph().getActorName(pendingCheckpointActors));

    // checkpoint finish
    if (pendingCheckpointActors.isEmpty()) {
      // actor finish
      runtimeContext.checkpointIds.add(runtimeContext.lastCheckpointId);

      if (clearExpiredCpStateAndQueueMsg()) {
        // save master context
        jobMaster.saveContext();

        LOG.info("Finish checkpoint: {}.", runtimeContext.lastCheckpointId);
      } else {
        LOG.warn("Fail to do checkpoint: {}.", runtimeContext.lastCheckpointId);
      }
    }

    LOG.info("Process commit report {} success.", commitReport);
  } catch (Throwable e) {
    LOG.warn("Process commit report has exception.", e);
  }
}
```
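The commit-report bookkeeping boils down to a set of actors that still owe a report for the current checkpoint ID. Below is a simplified sketch that uses `String` actor IDs. `CheckpointRound` is hypothetical and leaves out the remote calls, logging and `saveContext`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of CheckpointCoordinator's pending-actor bookkeeping.
class CheckpointRound {
  long lastCheckpointId = 0;
  final Set<String> pendingActors = new HashSet<>();
  final List<Long> completedIds = new ArrayList<>();

  // Like triggerCheckpoint: every actor must report for the new id.
  void trigger(Collection<String> allActors) {
    pendingActors.addAll(allActors);
    ++lastCheckpointId;
  }

  // Like processCommitReport: returns true when this report completes the round.
  boolean onCommit(String actorId, long checkpointId) {
    if (checkpointId != lastCheckpointId || !pendingActors.remove(actorId)) {
      return false; // stale id or unknown/duplicate actor: ignore
    }
    if (pendingActors.isEmpty()) {
      completedIds.add(lastCheckpointId);
      return true;
    }
    return false;
  }
}
```

In the original, a stale ID fails `Preconditions.checkArgument`, and the resulting exception is caught and logged. The net effect is the same as the early `return false` here.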
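`clearExpiredCpStateAndQueueMsg` gets all actors from the execution graph and notifies them in parallel to clean up expired checkpoint data. After the first completed checkpoint, only queue messages are cleared, and the expired state ID is `0`. Once more than one checkpoint has completed, the oldest ID is removed from the list and its state expires. The new oldest ID is then passed as the message-expiry ID.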
```java
private boolean clearExpiredCpStateAndQueueMsg() {
  // queue msg must clear when first checkpoint finish
  List<BaseActorHandle> allActor = graphManager.getExecutionGraph().getAllActors();
  if (1 == runtimeContext.checkpointIds.size()) {
    Long msgExpiredCheckpointId = runtimeContext.checkpointIds.get(0);
    RemoteCallWorker.clearExpiredCheckpointParallel(allActor, 0L, msgExpiredCheckpointId);
  }

  if (runtimeContext.checkpointIds.size() > 1) {
    Long stateExpiredCpId = runtimeContext.checkpointIds.remove(0);
    Long msgExpiredCheckpointId = runtimeContext.checkpointIds.get(0);
    RemoteCallWorker
        .clearExpiredCheckpointParallel(allActor, stateExpiredCpId, msgExpiredCheckpointId);
  }
  return true;
}
```
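The choice of which IDs to pass to `clearExpiredCheckpointParallel` can be isolated as a pure function over the list of completed checkpoint IDs. This sketch mutates the list the same way the original does and returns the `(stateExpiredId, msgExpiredId)` pair. The class `Expiry` and its `long[]` return shape are hypothetical.

```java
import java.util.List;

// Hypothetical pure version of the id selection in clearExpiredCpStateAndQueueMsg.
class Expiry {
  // Mutates completedIds like the original; returns {stateExpiredId, msgExpiredId} or null.
  static long[] select(List<Long> completedIds) {
    if (completedIds.size() == 1) {
      // First finished checkpoint: no state to expire yet, only queue messages.
      return new long[] {0L, completedIds.get(0)};
    }
    if (completedIds.size() > 1) {
      long stateExpiredId = completedIds.remove(0); // drop the oldest completed id
      long msgExpiredId = completedIds.get(0);      // the new oldest id
      return new long[] {stateExpiredId, msgExpiredId};
    }
    return null; // nothing completed yet
  }
}
```

Because the oldest ID is removed on every later completion, the list never grows beyond the newest completed checkpoint once the job has completed more than one checkpoint.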
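`run` calls `interruptCheckpoint` in three cases: it receives any other command, a checkpoint times out, or an exception occurs. The function ends much like `clearExpiredCpStateAndQueueMsg`, with a parallel call to all actors. The difference is that it calls `notifyCheckpointTimeoutParallel`, and only if the current checkpoint ID is newer than the last valid one. It then clears the pending actors and calls `maybeTriggerCheckpoint` once to decide whether to start a new checkpoint.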
```java
private void interruptCheckpoint() {
  // notify checkpoint timeout is time-consuming while many workers crash or
  // container failover.
  if (interruptedCheckpointSet.contains(runtimeContext.lastCheckpointId)) {
    LOG.warn("Skip interrupt duplicated checkpoint id : {}.", runtimeContext.lastCheckpointId);
    return;
  }
  interruptedCheckpointSet.add(runtimeContext.lastCheckpointId);
  LOG.warn("Interrupt checkpoint, checkpoint id : {}.", runtimeContext.lastCheckpointId);

  List<BaseActorHandle> allActor = graphManager.getExecutionGraph().getAllActors();
  if (runtimeContext.lastCheckpointId > runtimeContext.getLastValidCheckpointId()) {
    RemoteCallWorker
        .notifyCheckpointTimeoutParallel(allActor, runtimeContext.lastCheckpointId);
  }

  if (!pendingCheckpointActors.isEmpty()) {
    pendingCheckpointActors.clear();
  }
  maybeTriggerCheckpoint();
}
```
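`maybeTriggerCheckpoint` is called after a checkpoint completes, or after a failed one has been handled. It first checks whether the checkpoint interval has elapsed since the last trigger and, if so, calls `triggerCheckpoint`.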
```java
private void maybeTriggerCheckpoint() {
  if (readyToTrigger()) {
    triggerCheckpoint();
  }
}

private boolean readyToTrigger() {
  return (System.currentTimeMillis() - runtimeContext.lastCpTimestamp) >=
      cpIntervalSecs * 1000;
}
```
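`readyToTrigger` is a plain elapsed-time comparison. The sketch below passes the current time in as a parameter so that the rule can be checked deterministically. `IntervalGate` and its `nowMillis` parameter are hypothetical additions for testability. The sketch also multiplies by `1000L`, which avoids `int` overflow if the interval is stored as an `int`. The type of `cpIntervalSecs` is not shown above.

```java
// Hypothetical, testable version of readyToTrigger with an explicit clock value.
class IntervalGate {
  private final long intervalSecs;
  private long lastCpTimestamp;

  IntervalGate(long intervalSecs, long lastCpTimestamp) {
    this.intervalSecs = intervalSecs;
    this.lastCpTimestamp = lastCpTimestamp;
  }

  // True once at least intervalSecs have passed since the last trigger.
  boolean readyToTrigger(long nowMillis) {
    return nowMillis - lastCpTimestamp >= intervalSecs * 1000L;
  }

  // Like the end of triggerCheckpoint: record when the last checkpoint was triggered.
  void markTriggered(long nowMillis) {
    lastCpTimestamp = nowMillis;
  }
}
```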
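`triggerCheckpoint` is where a checkpoint actually starts. It adds all actors to the pending set and increments the checkpoint ID. It then uses `RemoteCallWorker` to call `triggerCheckpoint` on the `JobWorker` of every source actor, which starts the checkpoint.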
```java
private void triggerCheckpoint() {
  interruptedCheckpointSet.clear();
  if (LOG.isInfoEnabled()) {
    LOG.info("Start trigger checkpoint {}.", runtimeContext.lastCheckpointId + 1);
  }

  List<ActorId> allIds = graphManager.getExecutionGraph().getAllActorsId();

  // do the checkpoint
  pendingCheckpointActors.addAll(allIds);

  // inc last checkpoint id
  ++runtimeContext.lastCheckpointId;

  final List<ObjectRef> sourcesRet = new ArrayList<>();

  graphManager.getExecutionGraph().getSourceActors().forEach(actor -> {
    sourcesRet.add(RemoteCallWorker.triggerCheckpoint(
        actor, runtimeContext.lastCheckpointId));
  });

  for (ObjectRef rayObject : sourcesRet) {
    if (rayObject.get() instanceof RayException) {
      LOG.warn("Trigger checkpoint has exception.", (RayException) rayObject.get());
      throw (RayException) rayObject.get();
    }
  }
  runtimeContext.lastCpTimestamp = System.currentTimeMillis();
  LOG.info("Trigger checkpoint success.");
}
```
2.2.10.2 FailoverCoordinator
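The `requestJobWorkerRollback` function of `FailoverCoordinator` is exposed to `JobMaster` and decides whether to add a rollback request to the context. A request from an actor that already has one queued is skipped. Otherwise, the request is offered to the failover command queue and the `JobMaster` context is saved.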
```java
public Boolean requestJobWorkerRollback(WorkerRollbackRequest request) {
  LOG.info("Request job worker rollback {}.", request);
  boolean ret;
  if (!isDuplicateRequest(request)) {
    ret = runtimeContext.foCmds.offer(request);
  } else {
    LOG.warn("Skip duplicated worker rollback request, {}.", request.toString());
    return true;
  }
  jobMaster.saveContext();
  if (!ret) {
    LOG.warn("Request job worker rollback failed, because command queue is full.");
  }
  return ret;
}
```
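`requestJobWorkerRollback` combines a linear scan over the queued commands for duplicates with a non-blocking `offer`, which returns `false` when a bounded queue is full. The sketch below uses `java.util.concurrent.ArrayBlockingQueue`. `RollbackInbox`, the `String` actor IDs and the capacity are hypothetical; the real queue type and capacity of `foCmds` are not shown here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model: requests are identified only by the requesting actor id.
class RollbackInbox {
  private final BlockingQueue<String> foCmds;

  RollbackInbox(int capacity) {
    this.foCmds = new ArrayBlockingQueue<>(capacity);
  }

  // Mirrors requestJobWorkerRollback: a duplicate counts as accepted.
  boolean request(String fromActorId) {
    for (Object cmd : foCmds.toArray()) {
      if (fromActorId.equals(cmd)) {
        return true; // already queued, skip
      }
    }
    return foCmds.offer(fromActorId); // false if the queue is full
  }

  int size() {
    return foCmds.size();
  }
}
```

Reporting a duplicate as success matches the original. The caller only needs to know that some rollback for that actor is queued, not that this particular request was added.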
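The `run` function of `FailoverCoordinator` likewise keeps polling, but from the failover command queue (`foCmds`). When it receives a `WorkerRollbackRequest`, it calls `dealWithRollbackRequest` to roll the computation back.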
```java
public void run() {
  while (!closed) {
    try {
      final BaseWorkerCmd command;
      // see rollback() for lock reason
      synchronized (cmdLock) {
        command = jobMaster.getRuntimeContext().foCmds.poll(1, TimeUnit.SECONDS);
      }
      if (null == command) {
        continue;
      }
      if (command instanceof WorkerRollbackRequest) {
        jobMaster.getRuntimeContext().unfinishedFoCmds.add(command);
        dealWithRollbackRequest((WorkerRollbackRequest) command);
      }
    } catch (Throwable e) {
      LOG.error("Fo coordinator occur err.", e);
    }
  }
  LOG.warn("Fo coordinator thread exit.");
}
```
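The `dealWithRollbackRequest` function of `FailoverCoordinator` first looks up the execution vertex that sent the rollback request. It then checks whether the rollback is valid: a vertex that is already rolling back is skipped, and for a non-forced request the worker is asked via `checkIfNeedRollbackAsync`. Finally, it calls `interruptCheckpointAndRollback` to perform the rollback.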
```java
private void dealWithRollbackRequest(WorkerRollbackRequest rollbackRequest) {
  LOG.info("Start deal with rollback request {}.", rollbackRequest);

  ExecutionVertex exeVertex = getExeVertexFromRequest(rollbackRequest);

  // Reset pid for new-rollback actor.
  if (null != rollbackRequest.getPid() &&
      !rollbackRequest.getPid().equals(WorkerRollbackRequest.DEFAULT_PID)) {
    exeVertex.setPid(rollbackRequest.getPid());
  }

  if (isRollbacking.get(exeVertex)) {
    LOG.info("Vertex {} is rollbacking, skip rollback again.", exeVertex);
    return;
  }

  String hostname = "";
  Optional<Container> container = ResourceUtil.getContainerById(
      jobMaster.getResourceManager().getRegisteredContainers(),
      exeVertex.getContainerId()
  );
  if (container.isPresent()) {
    hostname = container.get().getHostname();
  }

  if (rollbackRequest.isForcedRollback) {
    interruptCheckpointAndRollback(rollbackRequest);
  } else {
    asyncRemoteCaller.checkIfNeedRollbackAsync(exeVertex.getWorkerActor(), res -> {
      if (!res) {
        LOG.info("Vertex {} doesn't need to rollback, skip it.", exeVertex);
        return;
      }
      interruptCheckpointAndRollback(rollbackRequest);
    }, throwable -> {
      LOG.error("Exception when calling checkIfNeedRollbackAsync, maybe vertex is dead" +
          ", ignore this request, vertex={}.", exeVertex, throwable);
    });
  }

  LOG.info("Deal with rollback request {} success.", rollbackRequest);
}

private Boolean isDuplicateRequest(WorkerRollbackRequest request) {
  try {
    Object[] foCmdsArray = runtimeContext.foCmds.toArray();
    for (Object cmd : foCmdsArray) {
      if (request.fromActorId.equals(((BaseWorkerCmd) cmd).fromActorId)) {
        return true;
      }
    }
  } catch (Exception e) {
    LOG.warn("Check request is duplicated failed.", e);
  }

  return false;
}

private ExecutionVertex getExeVertexFromRequest(WorkerRollbackRequest rollbackRequest) {
  ActorId actorId = rollbackRequest.fromActorId;
  Optional<BaseActorHandle> rayActor =
      graphManager.getExecutionGraph().getActorById(actorId);
  if (!rayActor.isPresent()) {
    throw new RuntimeException("Can not find ray actor of ID " + actorId);
  }
  return getExecutionVertex(rollbackRequest.fromActorId);
}

private ExecutionVertex getExecutionVertex(BaseActorHandle actor) {
  return graphManager.getExecutionGraph().getExecutionVertexByActorId(actor.getId());
}

private ExecutionVertex getExecutionVertex(ActorId actorId) {
  return graphManager.getExecutionGraph().getExecutionVertexByActorId(actorId);
}
```
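The `interruptCheckpointAndRollback` function of `FailoverCoordinator` calls `rollback` to return the vertex to the last valid checkpoint. It then adds an interrupt-checkpoint request to the context.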
```java
private void interruptCheckpointAndRollback(WorkerRollbackRequest rollbackRequest) {
  // assign a cascadingGroupId
  if (rollbackRequest.cascadingGroupId == null) {
    rollbackRequest.cascadingGroupId = currentCascadingGroupId++;
  }
  // get last valid checkpoint id then call worker rollback
  rollback(jobMaster.getRuntimeContext().getLastValidCheckpointId(), rollbackRequest,
      currentCascadingGroupId);
  // we interrupt current checkpoint for 2 considerations:
  // 1. current checkpoint might be timeout, because barrier might be lost after failover. so we
  // interrupt current checkpoint to avoid waiting.
  // 2. when we want to rollback vertex to n, job finished checkpoint n+1 and cleared state
  // of checkpoint n.
  jobMaster.getRuntimeContext().cpCmds.offer(new InterruptCheckpointRequest());
}
```
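The `rollback` function of `FailoverCoordinator` first waits for the requesting `JobWorker` to roll back successfully. It then creates forced rollback requests for the upstream operators whose queues into that worker lost data. These requests are added to the failover queue, and the `JobMaster` context is saved. The rollback therefore cascades upstream, potentially all the way to the source operators. If a rollback fails, the same worker is retried after `ROLLBACK_RETRY_TIME_MS`.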
```java
private void rollback(
    long checkpointId, WorkerRollbackRequest rollbackRequest, long cascadingGroupId) {
  ExecutionVertex exeVertex = getExeVertexFromRequest(rollbackRequest);
  LOG.info("Call vertex {} to rollback, checkpoint id is {}, cascadingGroupId={}.",
      exeVertex, checkpointId, cascadingGroupId);

  isRollbacking.put(exeVertex, true);

  asyncRemoteCaller.rollback(exeVertex.getWorkerActor(), checkpointId, result -> {
    List<WorkerRollbackRequest> newRollbackRequests = new ArrayList<>();
    switch (result.getResultEnum()) {
      case SUCCESS:
        ChannelRecoverInfo recoverInfo = result.getResultObj();
        LOG.info("Vertex {} rollback done, dataLostQueues={}, msg={}, cascadingGroupId={}.",
            exeVertex, recoverInfo.getDataLostQueues(), result.getResultMsg(),
            cascadingGroupId);
        // rollback upstream if vertex reports abnormal input queues
        newRollbackRequests = cascadeUpstreamActors(recoverInfo.getDataLostQueues(), exeVertex,
            cascadingGroupId);
        break;
      case SKIPPED:
        LOG.info("Vertex skip rollback, result = {}, cascadingGroupId={}.", result,
            cascadingGroupId);
        break;
      default:
        LOG.error(
            "Rollback vertex {} failed, result={}, cascadingGroupId={}," +
                " rollback this worker again after {} ms.",
            exeVertex, result, cascadingGroupId, ROLLBACK_RETRY_TIME_MS);
        Thread.sleep(ROLLBACK_RETRY_TIME_MS);
        LOG.info("Add rollback request for {} again, cascadingGroupId={}.", exeVertex,
            cascadingGroupId);
        newRollbackRequests.add(
            new WorkerRollbackRequest(exeVertex, "", "Rollback failed, try again.", false)
        );
        break;
    }

    // lock to avoid executing new rollback requests added.
    // consider such a case: A->B->C, C cascade B, and B cascade A
    // if B is rollback before B's rollback request is saved, and then JobMaster crashed,
    // then A will never be rollback.
    synchronized (cmdLock) {
      jobMaster.getRuntimeContext().foCmds.addAll(newRollbackRequests);
      // this rollback request is finished, remove it.
      jobMaster.getRuntimeContext().unfinishedFoCmds.remove(rollbackRequest);
      jobMaster.saveContext();
    }
    isRollbacking.put(exeVertex, false);
  }, throwable -> {
    LOG.error("Exception when calling vertex to rollback, vertex={}.", exeVertex, throwable);
    isRollbacking.put(exeVertex, false);
  });

  LOG.info("Finish rollback vertex {}, checkpoint id is {}.", exeVertex, checkpointId);
}

private List<WorkerRollbackRequest> cascadeUpstreamActors(
    Set<String> dataLostQueues, ExecutionVertex fromVertex, long cascadingGroupId) {
  List<WorkerRollbackRequest> cascadedRollbackRequest = new ArrayList<>();
  // rollback upstream if vertex reports abnormal input queues
  dataLostQueues.forEach(q -> {
    BaseActorHandle upstreamActor =
        graphManager.getExecutionGraph().getPeerActor(fromVertex.getWorkerActor(), q);
    ExecutionVertex upstreamExeVertex = getExecutionVertex(upstreamActor);
    // vertexes that has already cascaded by other vertex in the same level
    // of graph should be ignored.
    if (isRollbacking.get(upstreamExeVertex)) {
      return;
    }
    LOG.info("Call upstream vertex {} of vertex {} to rollback, cascadingGroupId={}.",
        upstreamExeVertex, fromVertex, cascadingGroupId);

    String hostname = "";
    Optional<Container> container = ResourceUtil.getContainerById(
        jobMaster.getResourceManager().getRegisteredContainers(),
        upstreamExeVertex.getContainerId()
    );
    if (container.isPresent()) {
      hostname = container.get().getHostname();
    }
    // force upstream vertexes to rollback
    WorkerRollbackRequest upstreamRequest = new WorkerRollbackRequest(
        upstreamExeVertex, hostname, String.format("Cascading rollback from %s", fromVertex),
        true
    );
    upstreamRequest.cascadingGroupId = cascadingGroupId;
    cascadedRollbackRequest.add(upstreamRequest);
  });
  return cascadedRollbackRequest;
}
```
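The cascading rollback can be pictured as a walk upstream along the queues that reported data loss, skipping vertices that are already being rolled back. The sketch below runs that walk synchronously over a small in-memory graph. `CascadeSketch` and its map input are hypothetical simplifications, and so is the rule that each vertex is rolled back at most once per walk. In ray-streaming, the lost queues come from each worker's `ChannelRecoverInfo`, and requests flow asynchronously through `foCmds`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical synchronous model of rollback() + cascadeUpstreamActors().
class CascadeSketch {
  // dataLostUpstreams.get(v): upstream vertices whose queues into v lost data after v rolled back.
  static List<String> rollbackOrder(String failed, Map<String, List<String>> dataLostUpstreams) {
    List<String> order = new ArrayList<>();
    Set<String> rolledBack = new HashSet<>();
    Deque<String> requests = new ArrayDeque<>(); // plays the role of foCmds
    requests.add(failed);
    while (!requests.isEmpty()) {
      String vertex = requests.poll();
      if (!rolledBack.add(vertex)) {
        continue; // already handled, similar to the isRollbacking check
      }
      order.add(vertex);
      List<String> upstreams =
          dataLostUpstreams.getOrDefault(vertex, Collections.<String>emptyList());
      for (String upstream : upstreams) {
        if (!rolledBack.contains(upstream)) {
          requests.add(upstream); // forced rollback request for the upstream vertex
        }
      }
    }
    return order;
  }
}
```

For the `A->B->C` case in the source comment, a failure at `C` rolls back `C`, then `B`, then `A`. In a diamond where two branches share a source, the source is rolled back only once.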
III. Summary
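`ray-streaming` also provides a Python API that runs on the same backend. Its main addition is a `_PYTHON_GATEWAY` through which Python connects to the Java `JobMaster`. Supporting a new operator requires adding the corresponding interfaces in `streaming/python/datastream.py`, `streaming/python/operator.py` and `streaming/python/function.py`.

`ray-streaming` currently supports only simple computations. It does not support two-input processing, and single-input processing has no handling of time. Checkpointing currently has only a simple in-memory backend and a local file-system backend. The state backend `streaming-state` keeps only its function interfaces and is not integrated into the computation system.

- Judging from the call logic, the state backend `streaming-state` should be created in `StreamTask`.
- `DataMessage`, which carries messages between `Task`s, includes timestamp information. `Record`, which is used for data processing, has already lost it, so time-based processing cannot be supported effectively.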