天天热推荐:elastic-job源码(2)-选举机制
(相关资料图)
public void registerStartUpInfo(final boolean enabled) { //开始所有的监听器 listenerManager.startAllListeners(); //选举leader /{namespace}/leader/election/instance 放置选举出来的服务器 leaderService.electLeader(); //{namespace}/{ipservers} 设置enable处理 serverService.persistOnline(enabled); //临时节点 /{namespave}/instances 放置运行服务实例信息 instanceService.persistOnline(); //开启一个异步服务 if (!reconcileService.isRunning()) { reconcileService.startAsync(); }}listenerManager.startAllListeners();会开启一个选举相关的listenerManager ElectionListenerManager.classleaderService.electLeader();执行选举功能 第一步:执行选举功能
public void electLeader() { log.debug("Elect a new leader now."); this.jobNodeStorage.executeInLeader("leader/election/latch", new LeaderService.LeaderElectionExecutionCallback()); log.debug("Leader election completed.");}
public void executeInLeader(String key, LeaderExecutionCallback callback) { try { LeaderLatch latch = new LeaderLatch(this.client, key); try { latch.start(); latch.await(); callback.execute(); } catch (Throwable var7) { try { latch.close(); } catch (Throwable var6) { var7.addSuppressed(var6); } throw var7; } latch.close(); } catch (Exception var8) { this.handleException(var8); }}{job name}/leader/election/latch节点加zk锁,在抢到锁之后,调用callback对象中的execute方法
class LeaderElectionExecutionCallback implements LeaderExecutionCallback { @Override public void execute() { //{jobname}/leader/election/instance 不存在 if (!hasLeader()) { //创建临时节点 {jobname}/leader/election/instance 值为 当前运行的实例值 例如:10.100.16.75@-@134642 前面是ip地址,后面是产生的随机数 //当应用实例与zk断开重新连接时,该节点信息会清除 jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId()); } }}第二步:ElectionListenerManager.class开启监听
@Overridepublic void start() { addDataListener(new LeaderElectionJobListener()); addDataListener(new LeaderAbdicationJobListener());}执行start方法有两个监听LeaderElectionJobListener:用于leader宕机之后重新选举监听LeaderAbdicationJobListener :用于监听leader宕机数据处理 LeaderElectionJobListener.java
@Overridepublic void onChange(final DataChangedEvent event) { //1.schedulerMap 和 jobInstanceMap 没有job信息 //2.{jobname}/service/{ip} 节点数据为DISABLE 或者 ({jobname}/leader/election/instance 节点的类型为删除且{jobname}/servers 节点的值是ENABLED 且 {jobname}/instances 节点下有其他的在线实例) //当前运行的job实例宕机,并且有其他运行的实例 if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(event.getKey(), event.getValue()) || isPassiveElection(event.getKey(), event.getType()))) { //重新选举 leaderService.electLeader(); }}
LeaderAbdicationJobListener.java
@Overridepublic void onChange(final DataChangedEvent event) { //{jobname}/leader/election/instance节点的实例id和JobRegistry对象中的实例id相等 //{jobname}/service/{ip}/ 是DISABLED //就是实例下线 if (leaderService.isLeader() && isLocalServerDisabled(event.getKey(), event.getValue())) { //删除{jobname}/leader/election/instance 节点 leaderService.removeLeader(); }}
标签:
为您推荐
-
人民网杭州9月17日电 (记者孙博洋)9月16日至17日,中国质量(杭州)大会在浙江杭州举行。在16日举行...
2021-09-18