调度指的就是定时任务,分布式调度的意思就是在分布式集群环境下定时任务。
什么是分布式调度
- 运行在分布式集群环境下的调度任务(同一个定时任务部署多份,只应该有一个定时任务在执行)
- 分布式调度-》定时任务的分布式=》定时任务的拆分(即为把一个大的作业任务拆分为多个小的作业任务,同时执行)
定时任务与消息队列的区别
- 共同点:
- 异步处理:比如注册,下单事件
- 应用解耦:不管定时任务还是MQ都可以作为两个应用质检的齿轮是实现应用解耦,这个齿轮额可以中转数据,当然单体服务不需要考虑这些,服务拆分的时候往往都会考虑
- 流量削峰:双十一的时候,任务作业和MQ都可以用来扛流量,后端系统根据服务能力定时处理订单或者从MQ抓取订单抓取到一个订单到来事件的话触发处理,对于前端用户来说,看到的结果是已经下单成功了,下单时不受任何影响的
- 不同点,本质不同:
定时任务是时间驱动,而MQ是事件驱动;
时间驱动不可代替,比如金融系统每日的利息结算,不是说利息来一条(利息到来事件)就结算一下,而往往是通过定时任务批处理计算;
所以,定时任务作业更倾向于批处理,MQ倾向于逐条处理。
定时任务的实现方式
定时任务实现方式有很多,早期没有定时任务框架的时候,我们会是用JDK中的Timer机制和多线程机制(Runnable+线程休眠)来实现定时或者隔一段时间执行某一段程序,后来有了定时任务框架,比如大名鼎鼎的Quartz任务调度框架,使用时间表达式(包括:秒、分、时、日、周、年),配置某一个任务什么时间去执行
Quartz回顾
- 引入Quartz的jar包
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency> - 创建一个job任务
public class DemoJob implements Job {
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println("定时任务执行逻辑");
}
} - 执行调度器
public class QuartzMain {
/**
* main中开启定时任务
* @param args
*/
public static void main(String[] args) throws SchedulerException {
// 1、创建任务调度器
Scheduler scheduler = createScheduler();
// 2、创建一个任务
JobDetail job = createJob();
// 3、创建任务的时间触发器
Trigger trigger = createTrigger();
// 4、使用任务调度器根据时间触发器执行我们的任务
scheduler.scheduleJob(job, trigger);
scheduler.start();
}
// 3、创建任务的时间触发器
public static Trigger createTrigger(){
// 创建时间触发器
CronTrigger cronTrigger = TriggerBuilder.newTrigger()
.withIdentity("triggerName","myTrigger" )
.startNow()
.withSchedule(CronScheduleBuilder.cronSchedule("*/2 * * * * ?")).build();
return cronTrigger;
}
//2、创建一个任务
public static JobDetail createJob(){
JobBuilder jobBuilder = JobBuilder.newJob(DemoJob.class);
jobBuilder.withIdentity("jobName","myJob");
return jobBuilder.build();
}
//1、创建任务调度器
public static Scheduler createScheduler() throws SchedulerException {
SchedulerFactory factory = new StdSchedulerFactory();
return factory.getScheduler();
}
} - 点击运行就可以在控制台看到每隔两秒出现一次
定时任务执行逻辑
的字样
分布式调度框架Elastic-job
Elastic-job是当当网开源的一个分布式调度解决方案,基于Quartz二次开发的,由两个相互独立的子项目Elastic-job-Lite和Elastic-job-Cloud组成。我们要学习的是Elastic-job-lite,它定位为轻量级五中心化解决方案,使用jar包的形式提供分布式任务的协调服务,而Elastic-job-cloud子项目需要结合Mesos以及Docker在云环境下使用。
主要功能
- 分布式调度协调:
在分布式环境中,任务能够按照指定的调度策略执行,并且能够避免同一任务多实例重复执行 - 丰富的调度策略:
基于成熟的定时任务作业框架Quartz cron表达式执行定时任务 - 弹性扩容缩容:
当集群中增加某一个实例,它应当也能够被选举并执行任务,当集群减少一个实例时,它所执行的任务能被转移到别的实例来执行 - 失效转移:
某实例在任务执行失败后,会被转移到其他实例执行 - 错过执行作业重触发:
因为某种原因导致作业错过执行,自动记录错过执行的作业,并在上次作业完成后自动触发 - 支持并行调度:
支持任务分片,任务分片是指将一个任务分为多个小任务项在多个实例同时执行 - 作业分片一致性:
当任务被分片后,保证同一分片在分布式环境中仅有一个执行实例
Elastic-Job_Lite应用
Elastic-Job依赖于Zookeeper进行分布式协调,所有需要安装Zookeeper(3.4.6版本以上),关于Zookeeper在后续会有介绍,这里只需要明白Zookeeper的本质功能是:存储+通知
安装Zookeeper(单例配置)
- (下载)[https://zookeeper.apache.org/releases.html]解压到相应的目录下
- 在conf文件夹下,复制一份zoo_sample.cfg,改名为zoo.cfg,该文件为zookeeper的配置文件
- bin目录下,zkServer.cmd(windows下启动文件)/zkServer.sh(linux下启动文件),启动服务
引入jar包
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>创建一个任务,即一个job
/**
* Elastic-Job-Lite定时任务业务逻辑处理类
*/
public class ArchivieJob implements SimpleJob {
/**
* 业务逻辑,每次定时任务执行都会执行一次
* @param shardingContext
*/
public void execute(ShardingContext shardingContext) {
System.out.println("定时任务执行逻辑");
}
}需要进行一些配置
public class ElasticJobMain {
public static void main(String[] args) {
// 配置分布式协调服务 (zookeeper注册中心) namespace 可以自定义,同一命名空间下可以建立多个任务
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "default");
// 创建注册中心对象
CoordinatorRegistryCenter coordinatorRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
coordinatorRegistryCenter.init();
// 配置任务(时间事件、定时任务业务逻辑、调度器)
// shardingTotalCount 任务分片数,先置为1
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("job-one", "*/2 * * * * * ?", 1).build();
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,ArchivieJob.class.getName());
// 创建调度器 overwrite 每次执行完任务配置覆盖
new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build())
.init();
}
}开启多个实例,进行测试,此时如果开两个实例,会随机选择一个实例执行任务,如果当运行实例的服务挂掉,另一个服务会紧接着继续执行。
Leader选举机制,每个Elastic-job的任务执行实例App作为Zookeeper欸点客户端来操作Zookeeper的znode
- 多个实例同时创建/leader节点
- /leader节点只能创建一个,后创建的会失败,创建成功的实例会选为leader节点,执行该任务
Elastic-Job-Lite轻量级去中心化的特点
- 去中心化:
- 执行节点对等(程序和jar一样,唯一不一样的可能是分片)
- 定时调度自触发(没有中心调度节点分配)
- 服务自发现(过注册中心的服务发现)
- 主节点非固定
- 轻量级:
- All in jar 必要依赖仅仅是一个zookeeper
- 非独立部署的中间件,就是jar程序
Elastic-job任务分片
一个大的非常耗时的作业job,比如:一次要处理一亿的数据,那这一亿的数据存储在数据库中,如果用一个作业节点处理一亿数据要很久,在互联网领域是不太能接收的,互联网领域更希望机器的增加去横向扩展处理能力。所以,Elastic-job可以把作业分为多个task(每一个task就是一个任务分片),每一个task交给具体的一个机器实例去处理(一个机器实例是可以处理多个task的),但是具体每一个task执行什么逻辑由我们自己来指定。
Strategy策略定义这些分片项怎么去分配到各个机器上去,默认是平均去分的,可以定制,比如某一个机器负载比较高或者预配置比较高,那么就可以写策略。分片和作业本身是通过一个注册中心协调的,因为在分布式环境下,状态数据肯定集中到一点,才可以在分布式中沟通。
分片使用方式
在配置中JobCoreConfiguration
配置任务分片数以及task参数
// shardingTotalCount 设置分片参数 |
然后在job中可以通过shardingContext
获取到分片参数
/** |
执行后,就可以看到每个进程获取到的分片数
Elastic-job弹性扩容
新增加一个运行实例,他会自动给注册到注册中心,注册中心发现新的服务上线,注册中心会通知ElasticJob进行重新分片,那么总得片项有多少,那么就可以搞多少个实例机器,比如完全可以分1000片,那么就可以搞1000台机器一起执行作业
注意:
- 分片项也是一个Job配置,修改配置,重新分片,在下一次定时运行之前会重新调用分片算法,那么这个分片算法的结果就是:哪台机器运行那一片,这个结果存储在zk中,主节点会把分片给分好放到注册中心去,然后执行节点从注册中心获取信息(执行节点在定时任务开启的时候获取相应的分片)
- 如果所有的节点挂掉剩下一个节点,那么所有分片都会指向剩余的一个节点,这也是ElasticJob的高可用。