avatar

分布式调度问题

调度指的就是定时任务,分布式调度的意思就是在分布式集群环境下定时任务。

什么是分布式调度

  • 运行在分布式集群环境下的调度任务(同一个定时任务部署多份,只应该有一个定时任务在执行)
  • 分布式调度-》定时任务的分布式=》定时任务的拆分(即为把一个大的作业任务拆分为多个小的作业任务,同时执行)

定时任务与消息队列的区别

  • 共同点:
    • 异步处理:比如注册,下单事件
    • 应用解耦:不管定时任务还是MQ都可以作为两个应用质检的齿轮是实现应用解耦,这个齿轮额可以中转数据,当然单体服务不需要考虑这些,服务拆分的时候往往都会考虑
    • 流量削峰:双十一的时候,任务作业和MQ都可以用来扛流量,后端系统根据服务能力定时处理订单或者从MQ抓取订单抓取到一个订单到来事件的话触发处理,对于前端用户来说,看到的结果是已经下单成功了,下单时不受任何影响的
  • 不同点,本质不同:
    定时任务是时间驱动,而MQ是事件驱动;
    时间驱动不可代替,比如金融系统每日的利息结算,不是说利息来一条(利息到来事件)就结算一下,而往往是通过定时任务批处理计算;
    所以,定时任务作业更倾向于批处理,MQ倾向于逐条处理。

定时任务的实现方式

定时任务实现方式有很多,早期没有定时任务框架的时候,我们会是用JDK中的Timer机制和多线程机制(Runnable+线程休眠)来实现定时或者隔一段时间执行某一段程序,后来有了定时任务框架,比如大名鼎鼎的Quartz任务调度框架,使用时间表达式(包括:秒、分、时、日、周、年),配置某一个任务什么时间去执行

Quartz回顾

  • 引入Quartz的jar包
    <dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.2</version>
    </dependency>
  • 创建一个job任务
    public class DemoJob implements Job {
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
    System.out.println("定时任务执行逻辑");
    }
    }
  • 执行调度器
    public class QuartzMain {

    /**
    * main中开启定时任务
    * @param args
    */
    public static void main(String[] args) throws SchedulerException {
    // 1、创建任务调度器
    Scheduler scheduler = createScheduler();
    // 2、创建一个任务
    JobDetail job = createJob();
    // 3、创建任务的时间触发器
    Trigger trigger = createTrigger();
    // 4、使用任务调度器根据时间触发器执行我们的任务
    scheduler.scheduleJob(job, trigger);
    scheduler.start();
    }

    // 3、创建任务的时间触发器
    public static Trigger createTrigger(){
    // 创建时间触发器
    CronTrigger cronTrigger = TriggerBuilder.newTrigger()
    .withIdentity("triggerName","myTrigger" )
    .startNow()
    .withSchedule(CronScheduleBuilder.cronSchedule("*/2 * * * * ?")).build();
    return cronTrigger;
    }

    //2、创建一个任务
    public static JobDetail createJob(){
    JobBuilder jobBuilder = JobBuilder.newJob(DemoJob.class);
    jobBuilder.withIdentity("jobName","myJob");
    return jobBuilder.build();
    }

    //1、创建任务调度器
    public static Scheduler createScheduler() throws SchedulerException {
    SchedulerFactory factory = new StdSchedulerFactory();
    return factory.getScheduler();
    }
    }
  • 点击运行就可以在控制台看到每隔两秒出现一次定时任务执行逻辑 的字样

分布式调度框架Elastic-job

Elastic-job是当当网开源的一个分布式调度解决方案,基于Quartz二次开发的,由两个相互独立的子项目Elastic-job-Lite和Elastic-job-Cloud组成。我们要学习的是Elastic-job-lite,它定位为轻量级五中心化解决方案,使用jar包的形式提供分布式任务的协调服务,而Elastic-job-cloud子项目需要结合Mesos以及Docker在云环境下使用。

主要功能

  • 分布式调度协调:
    在分布式环境中,任务能够按照指定的调度策略执行,并且能够避免同一任务多实例重复执行
  • 丰富的调度策略:
    基于成熟的定时任务作业框架Quartz cron表达式执行定时任务
  • 弹性扩容缩容:
    当集群中增加某一个实例,它应当也能够被选举并执行任务,当集群减少一个实例时,它所执行的任务能被转移到别的实例来执行
  • 失效转移:
    某实例在任务执行失败后,会被转移到其他实例执行
  • 错过执行作业重触发:
    因为某种原因导致作业错过执行,自动记录错过执行的作业,并在上次作业完成后自动触发
  • 支持并行调度:
    支持任务分片,任务分片是指将一个任务分为多个小任务项在多个实例同时执行
  • 作业分片一致性:
    当任务被分片后,保证同一分片在分布式环境中仅有一个执行实例

Elastic-Job_Lite应用

Elastic-Job依赖于Zookeeper进行分布式协调,所有需要安装Zookeeper(3.4.6版本以上),关于Zookeeper在后续会有介绍,这里只需要明白Zookeeper的本质功能是:存储+通知

  • 安装Zookeeper(单例配置)

  • 引入jar包

    <dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
    </dependency>
  • 创建一个任务,即一个job

    /**
    * Elastic-Job-Lite定时任务业务逻辑处理类
    */
    public class ArchivieJob implements SimpleJob {

    /**
    * 业务逻辑,每次定时任务执行都会执行一次
    * @param shardingContext
    */
    @Override
    public void execute(ShardingContext shardingContext) {
    System.out.println("定时任务执行逻辑");
    }
    }
  • 需要进行一些配置

    public class ElasticJobMain {
    public static void main(String[] args) {
    // 配置分布式协调服务 (zookeeper注册中心) namespace 可以自定义,同一命名空间下可以建立多个任务
    ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "default");
    // 创建注册中心对象
    CoordinatorRegistryCenter coordinatorRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
    coordinatorRegistryCenter.init();
    // 配置任务(时间事件、定时任务业务逻辑、调度器)
    // shardingTotalCount 任务分片数,先置为1
    JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("job-one", "*/2 * * * * * ?", 1).build();
    SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,ArchivieJob.class.getName());
    // 创建调度器 overwrite 每次执行完任务配置覆盖
    new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build())
    .init();
    }
    }
  • 开启多个实例,进行测试,此时如果开两个实例,会随机选择一个实例执行任务,如果当运行实例的服务挂掉,另一个服务会紧接着继续执行。

  • Leader选举机制,每个Elastic-job的任务执行实例App作为Zookeeper欸点客户端来操作Zookeeper的znode

    • 多个实例同时创建/leader节点
    • /leader节点只能创建一个,后创建的会失败,创建成功的实例会选为leader节点,执行该任务

Elastic-Job-Lite轻量级去中心化的特点

  • 去中心化:
    • 执行节点对等(程序和jar一样,唯一不一样的可能是分片)
    • 定时调度自触发(没有中心调度节点分配)
    • 服务自发现(过注册中心的服务发现)
    • 主节点非固定
  • 轻量级:
    • All in jar 必要依赖仅仅是一个zookeeper
    • 非独立部署的中间件,就是jar程序

Elastic-job任务分片

一个大的非常耗时的作业job,比如:一次要处理一亿的数据,那这一亿的数据存储在数据库中,如果用一个作业节点处理一亿数据要很久,在互联网领域是不太能接收的,互联网领域更希望机器的增加去横向扩展处理能力。所以,Elastic-job可以把作业分为多个task(每一个task就是一个任务分片),每一个task交给具体的一个机器实例去处理(一个机器实例是可以处理多个task的),但是具体每一个task执行什么逻辑由我们自己来指定。

Strategy策略定义这些分片项怎么去分配到各个机器上去,默认是平均去分的,可以定制,比如某一个机器负载比较高或者预配置比较高,那么就可以写策略。分片和作业本身是通过一个注册中心协调的,因为在分布式环境下,状态数据肯定集中到一点,才可以在分布式中沟通。

分片使用方式

在配置中JobCoreConfiguration配置任务分片数以及task参数

 // shardingTotalCount 设置分片参数
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("job-one", "*/2 * * * * ?", 3)
// 设置分片参数
.shardingItemParameters("0=test1,1=test2,2=test3")
.build();

然后在job中可以通过shardingContext获取到分片参数

/**
* Elastic-Job-Lite定时任务业务逻辑处理类
*/
public class ArchivieJob implements SimpleJob {

/**
* 业务逻辑,每次定时任务执行都会执行一次
* @param shardingContext
*/
@Override
public void execute(ShardingContext shardingContext) {
// 任务分片 shardingContext.getShardingParameter() 参数
System.out.println("定时任务执行分片逻辑"+shardingContext.getShardingParameter());
}
}

执行后,就可以看到每个进程获取到的分片数

Elastic-job弹性扩容

新增加一个运行实例,他会自动给注册到注册中心,注册中心发现新的服务上线,注册中心会通知ElasticJob进行重新分片,那么总得片项有多少,那么就可以搞多少个实例机器,比如完全可以分1000片,那么就可以搞1000台机器一起执行作业
注意:

  • 分片项也是一个Job配置,修改配置,重新分片,在下一次定时运行之前会重新调用分片算法,那么这个分片算法的结果就是:哪台机器运行那一片,这个结果存储在zk中,主节点会把分片给分好放到注册中心去,然后执行节点从注册中心获取信息(执行节点在定时任务开启的时候获取相应的分片)
  • 如果所有的节点挂掉剩下一个节点,那么所有分片都会指向剩余的一个节点,这也是ElasticJob的高可用。
文章作者: zenshin
文章链接: https://zlh.giserhub.com/2022/01/06/cl35o0mr7003yp4tg5umq9trr/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 zenshin's blog
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论