首页IT科技yarn框架第一版是( )年发布(【深入浅出 Yarn 架构与实现】2-4 Yarn 基础库 – 状态机库)

yarn框架第一版是( )年发布(【深入浅出 Yarn 架构与实现】2-4 Yarn 基础库 – 状态机库)

时间2025-05-02 09:35:20分类IT科技浏览3334
导读:当一个服务拥有太多处理逻辑时,会导致代码结构异常的混乱,很难分辨一段逻辑是在哪个阶段发挥作用的。 这时就可以引入状态机模型,帮助代码结构变得清晰。...

当一个服务拥有太多处理逻辑时           ,会导致代码结构异常的混乱                  ,很难分辨一段逻辑是在哪个阶段发挥作用的            。

这时就可以引入状态机模型      ,帮助代码结构变得清晰                 。

一           、状态机库概述

一)简介

状态机由一组状态组成:

【初始状态 -> 中间状态 -> 最终状态】      。

在一个状态机中        ,每个状态会接收一组特定的事件                  ,根据事件类型进行处理         ,并转换到下一个状态         。当转换到最终状态时则退出                 。

二)状态转换方式

状态间转换会有下面这三种类型:

三)Yarn 状态机类

在 Yarn 中提供了一个工厂类 StateMachineFactory 来帮助定义状态机         。如何使用     ,我们直接写个 demo      。

二                  、案例 demo

在上一篇文章《Yarn 服务库和事件库》案例基础上进行扩展                  ,增加状态机库的内容                 。如果还不了解服务库和事件库的同学            ,建议先学习下上一篇文章            。

案例已上传至 github  ,有帮助可以点个 ⭐️

https://github.com/Simon-Ace/hadoop-yarn-study-demo/tree/master/state-demo

一)状态机实现

状态机实现                 ,可以直接嵌入到上篇文章中的 AsyncDispatcher使用   。

这里仅给出状态机JobStateMachine以及各种事件处理的代码                 。完整的代码项目执行               ,请到 github demo 中查看              。 import com.shuofxz.event.JobEvent; import com.shuofxz.event.JobEventType; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.*; import java.util.EnumSet; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /* * 可参考 Yarn 中实现的状态机对象: * ResourceManager 中的 RMAppImpl      、RMApp- AttemptImpl        、RMContainerImpl 和 RMNodeImpl, * NodeManager 中 的 ApplicationImpl                  、 ContainerImpl 和 LocalizedResource              , * MRAppMaster 中的 JobImpl         、TaskImpl 和 TaskAttemptImpl 等 * */ @SuppressWarnings({"rawtypes", "unchecked"}) public class JobStateMachine implements EventHandler<JobEvent> { private final String jobID; private EventHandler eventHandler; private final Lock writeLock; private final Lock readLock; // 定义状态机 protected static final StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent> stateMachineFactory = new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>(JobStateInternal.NEW) .addTransition(JobStateInternal.NEW, JobStateInternal.INITED, JobEventType.JOB_INIT, new InitTransition()) .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP, JobEventType.JOB_START, new StartTransition()) .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING, JobEventType.JOB_SETUP_COMPLETED, new SetupCompletedTransition()) .addTransition(JobStateInternal.RUNNING, EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED), JobEventType.JOB_COMPLETED, new JobTasksCompletedTransition()) .installTopology(); private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine; public JobStateMachine(String jobID, EventHandler eventHandler) { this.jobID = jobID; // 多线程异步处理                  ,state 有可能被同时读写   ,使用读写锁来避免竞争 ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); this.eventHandler = eventHandler; stateMachine = stateMachineFactory.make(this); } protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() { return stateMachine; } public static class InitTransition implements SingleArcTransition<JobStateMachine, JobEvent> { @Override public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) { System.out.println("Receiving event " + jobEvent); // do something... // 完成后发送新的 Event —— JOB_START jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_START)); } } public static class StartTransition implements SingleArcTransition<JobStateMachine, JobEvent> { @Override public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) { System.out.println("Receiving event " + jobEvent); jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_SETUP_COMPLETED)); } } public static class SetupCompletedTransition implements SingleArcTransition<JobStateMachine, JobEvent> { @Override public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) { System.out.println("Receiving event " + jobEvent); jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_COMPLETED)); } } public static class JobTasksCompletedTransition implements MultipleArcTransition<JobStateMachine, JobEvent, JobStateInternal> { @Override public JobStateInternal transition(JobStateMachine jobStateMachine, JobEvent jobEvent) { System.out.println("Receiving event " + jobEvent); // 这是多结果状态部分           ,因此需要人为制定后续状态 // 这里整个流程结束                  ,设置一下对应的状态 boolean flag = true; if (flag) { return JobStateInternal.SUCCEEDED; } else { return JobStateInternal.KILLED; } } } @Override public void handle(JobEvent jobEvent) { try { // 注意这里为了避免静态条件      ,使用了读写锁 writeLock.lock(); JobStateInternal oldState = getInternalState(); try { getStateMachine().doTransition(jobEvent.getType(), jobEvent); } catch (InvalidStateTransitionException e) { System.out.println("Cant handle this event at current state!"); } if (oldState != getInternalState()) { System.out.println("Job Transitioned from " + oldState + " to " + getInternalState()); } } finally { writeLock.unlock(); } } public JobStateInternal getInternalState() { readLock.lock(); try { return getStateMachine().getCurrentState(); } finally { readLock.unlock(); } } public enum JobStateInternal { NEW, SETUP, INITED, RUNNING, SUCCEEDED, KILLED } }

二)状态机可视化

hadoop 中提供了状态机可视化的工具类 VisualizeStateMachine.java        ,可以拷贝到我们的工程中使用。

根据提示                  ,运行需要三个参数: Usage: %s <GraphName> <class[,class[,...]]> <OutputFile>%n

运行后会在项目根目录生成图文件 jsm.gv               。

需要使用 graphviz工具将 gv 文件转换成 png 文件: # linux 安装 yum install graphviz # mac 安装 brew install graphviz

转换:

dot -Tpng jsm.gv > jsm.png

可视化状态机展示:

再使用这个工具对 Yarn 中的 Application 状态进行展示:

三)如果不用状态机库

【思考】

如果不用状态机         ,代码结构会是什么样呢?

下面这样的代码     ,如果要增加或修改逻辑可能就是很痛苦的一件事情了                 。 // 一堆的函数调用 // 一堆的 if 嵌套 // 或者 switch case

三     、总结

本节对 Yarn 状态机库进行了介绍   。实际使用时会结合事件库                  、服务库一同使用            。

状态机库的使用帮助代码结构更加的清晰                  ,新增状态处理逻辑只需要增加一个状态类别            ,或者增加一个方法处理对应类型的事件即可                 。将整个处理逻辑进行了拆分  ,便于编写和维护      。

参考文章:

源码|Yarn的事件驱动模型与状态机

创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!

展开全文READ MORE
xp桌面文字大小怎么调(windowsXP桌面字体模糊怎么办) windows11关闭自动更新(Windows 11 Build 22000.176 (KB5006050) 更新推送(附更新内容+安装))