Preface
本篇主要记录任务调度相关框架知识.
任务调度这个在日常开发中非常经典, 比如每天固定时刻同步用户信息、或者是动态的活动开始与结束时间, 亦或者每天早上8点发条短信鼓励一下自己今天努力填坑之类的. . .
Quartz
Quartz是一个功能丰富的开源作业调度库, 几乎可以集成在任何Java应用程序中 - 从最小的独立应用程序到最大的电子商务系统. Quartz可用于创建简单或复杂的计划, 以执行数十, 数百甚至数万个作业;将任务定义为标准Java组件的作业, 这些组件可以执行几乎任何可以编程的程序. Quartz Scheduler包含许多企业级功能, 例如支持JTA事务和集群.
主要成员
Scheduler
- 与调度器交互的主要API.Job
- 需要被调度器调度的任务必须实现的接口.JobDetail
- 用于定义任务的实例.Trigger
- 用于定义调度器何时调度任务执行的组件.JobBuilder
- 用于定义或创建JobDetail
的实例 .TriggerBuilder
- 用于定义或创建触发器实例.
构建流程
定义
ScheduleFactory
,Schedule
实例对象通过该工厂接口的实现类获取.定义
JobDetail
实例对象, 该对象需要指定名称、组和Job
接口的Class
信息.定义
Trigger
实例对象, 通过该对象设置触发任务的相关信息, 如起始时间、重复次数等.向
Schedule
中注册JobDetail
和Trigger
, 有两种方式:- 通过
Schedule
的schedule方法注册, 此时它自动让Trigger
和JobDetail
绑定. - 通过
addJob
和scheduleJob
方法注册, 此时需要手动设置Trigger
的关联的Job
组名和Job
名称, 让Trigger
和JobDetail
绑定.
- 通过
启动调度器(调用
Schedule
对象的start
方法).
运行模式
内部运行图:
与Spring集成
在Spring
中使用Quartz
有两种方式实现: MethodInvokingJobDetailFactoryBean
和QuartzJobBean
. 其中MethodInvokingJobDetailFactoryBean
不支持存储到数据库, 会报java.io.NotSerializableException
.
xml方式声明Job
MethodInvokingJobDetailFactoryBean
先来看一下MethodInvokingJobDetailFactoryBean
的方式(指定targetObject
与targetMethod
再通过反射调用):
1 | <!-- 使用MethodInvokingJobDetailFactoryBean, 任务类可以不实现Job接口, 通过targetMethod指定调用方法--> |
QuartzJobBean
一般很少会使用上述方式, 一般是使用QuartzJobBean
:
1 | <bean name="redisKeySpaceMetricReportJob" class="org.springframework.scheduling.quartz.JobDetailFactoryBean"> |
1 | public class RedisKeySpaceMetricReportScheduleJob extends QuartzJobBean { |
JobFactory
Quartz是通过JobFactory#newJob()
接口返回Job
实例的, 默认实现SimpleJobFactory
是通过jobClass.newInstance()
反射构建实例的.
在Spring中, 也是类似地通过反射构建Job
实例, 不同的是在此实例上做了扩展(注入Spring Bean).
AdaptableJobFactory
只是简单地通过反射构建Job
, SpringBeanJobFactory
继承AdaptableJobFactory
并重写createJobInstance
方法, 把jobDataMap
跟triggerDataMap
中的bean
注入到Job
实例当中:
1 |
|
其中isEligibleForPropertyPopulation()
:
1 | protected boolean isEligibleForPropertyPopulation(Object jobObject) { |
所以要获得注入bean
的支持, 有两步, 第一继承QuartzJobBean
, 在构建JobDetail
时在jobDataMap
中注入Spring Bean.
不过这种方法也有缺点, 理论上我们是不应该关注Job
中依赖了哪些Spring Bean, 这样耦合度太大. 所以在Spring Boot中已经优化掉了这一点.
Spring Boot自动配置
在Spring Boot 中通过QuartzAutoConfiguration
自动配置Quartz相关类并对SpringBeanJobFactory
进行了扩展:
1 | class AutowireCapableBeanJobFactory extends SpringBeanJobFactory { |
这样我们只需要在Job
实现类中用@Autowired
或@Resource
注解声明需要注入的Spring Bean即可.
Spring Boot提供SchedulerFactoryBeanCustomizer
定制SchedulerFactoryBean
, 比如换一个JobFactory
(从Spring IoC容器中获取无状态Job
):
1 |
|
1 |
|
Job的增删改
1 | /** |
信息类:
1 |
|
其他问题
Durability
当设置了JobDetail.setDurability(true)
, 当job
不再有trigger
引用它的时候, Quartz
也不要删除job
.
Misfire
由于某些原因(比如Worker线程池满了)导致任务没有及时执行, 此时扫描Misfire的线程就会把它们找出来并按照Misfire指令处理这个任务. 比如CronTrigger
的默认策略是CronTrigger.MISFIRE_INSTRUCTION_FIRE_ONCE_NOW
,也可以自己指定:
1 | CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJobInfo.getCron()).withMisfireHandlingInstructionDoNothing(); |
maxBatchSize
一次拉取trigger的最大数量, 默认是1, 可通过org.quartz.scheduler.batchTriggerAcquisitionMaxCount
改写. 但是在集群环境下, 不建议设置为很大值. 如果值 > 1, 并且使用了 JDBC JobStore的话, org.quartz.jobStore.acquireTriggersWithinLock
属性必须设置为true
, 以避免”弄脏”数据.
更多参数配置: https://blog.csdn.net/zixiao217/article/details/53091812
性能问题
由于Quartz的集群是通过底层调度依赖数据库的悲观锁, 谁先抢到谁调度, 这样会导致节点负载不均衡, 并且影响性能.
Spring Scheduler
Spring Scheduler相对Quartz来说比较轻量级, 通过简单的配置就可以使用了, 但灵活度不如Quartz
开启配置
Xml方式
1 | <task:scheduler id="scheduler" pool-size="50"/> |
- 如果不设置
pool-size
, 默认是1, 会导致任务单线程执行.
Java配置方式
1 |
|
@EnableScheduling
表示告诉Spring开启Scheduler- 实现
SchedulingConfigurer
是为了配置线程池
使用
Xml方式
1 | <task:scheduled-tasks scheduler="myScheduler"> |
1 |
|
注解声明方式
使用@Scheduled
可以非常简单地就声明一个任务:
1 |
|
@Scheduled
有几个参数:
cron
: cron表达式, 指定任务在特定时间执行;fixedDelay
: 表示上一次任务执行完成后多久再次执行, 参数类型为long, 单位ms;fixedDelayString
: 与fixedDelay
含义一样, 只是参数类型变为String;fixedRate
: 表示按一定的频率执行任务, 参数类型为long, 单位ms;fixedRateString
: 与fixedRate
的含义一样, 只是将参数类型变为String;initialDelay
: 表示延迟多久再第一次执行任务, 参数类型为long, 单位ms;initialDelayString
: 与initialDelay
的含义一样, 只是将参数类型变为String;zone
: 时区, 默认为当前时区, 一般没有用到.
Cron表达式
想了解Cron最好的方法是看Quartz的官方文档. 本节也会大致介绍一下.
Cron表达式由6~7项组成, 中间用空格分开. 从左到右依次是: 秒、分、时、日、月、周几、年(可省略). 值可以是数字, 也可以是以下符号:*
: 所有值都匹配?
: 无所谓, 不关心, 通常放在“周几”里,
: 或者/
: 增量值-
: 区间
下面举几个例子, 看了就知道了:0 * * * * *
: 每分钟(当秒为0的时候)0 0 * * * *
: 每小时(当秒和分都为0的时候)*/10 * * * * *
: 每10秒0 5/15 * * * *
: 每小时的5分、20分、35分、50分0 0 9,13 * * *
: 每天的9点和13点0 0 8-10 * * *
: 每天的8点、9点、10点0 0/30 8-10 * * *
: 每天的8点、8点半、9点、9点半、10点0 0 9-17 * * MON-FRI
: 每周一到周五的9点、10点…直到17点(含)0 0 0 25 12 ?
: 每年12约25日圣诞节的0点0分0秒(午夜)0 30 10 * * ? 2016
: 2016年每天的10点半
其中的?
在用法上其实和*
是相同的. 但是*
语义上表示全匹配, 而?
并不代表全匹配, 而是不关心. 比如对于0 0 0 5 8 ? 2016
来说, 2016年8月5日是周五, ?
表示我不关心它是周几. 而0 0 0 5 8 * 2016
中的*
表示周一也行, 周二也行……语义上和2016年8月5日冲突了, 你说谁优先生效呢.
不记得也没关系, 记住Cron Maker也可以, 它可以在线生成cron表达式.
时间轮
时间轮算法可以类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。
例子, 使用Netty中的时间轮实现:
1 | import io.netty.util.HashedWheelTimer; |
Elastic Job
官网: Elastic Job
Elastic Job 与 Sping Cloud 集成解决依赖冲突问题
由于Elastic Job自身的 curator-client
,curator-framework
,curator-recipes
与Spring Cloud组件中的curator-client
,curator-framework
,curator-recipes
有版本冲突,在启动过程会报如下错误:
1 | 2018-07-06 18:19:34.403 | epms | WARN | IP: | main | AnnotationConfigServletWebServerApplicationContext | 558 | refresh | Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'reqAspect' defined in file [/home/ybd/data/git-repo/bitbucket/epms/epms-core/target/classes/com/yanglaoban/epms/core/aop/ReqAspect.class]: Unsatisfied dependency expressed through constructor parameter 0; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'disruptorConfig': Injection of resource dependencies failed; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'delayHandler' defined in file [/home/ybd/data/git-repo/bitbucket/epms/epms-core/target/classes/com/yanglaoban/epms/core/pubsub/disruptor/handler/DelayHandler.class]: Unsatisfied dependency expressed through constructor parameter 0; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'delayService' defined in file [/home/ybd/data/git-repo/bitbucket/epms/epms-core/target/classes/com/yanglaoban/epms/core/domain/service/DelayService.class]: Unsatisfied dependency expressed through constructor parameter 0; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'elasticJobService' defined in file [/home/ybd/data/git-repo/bitbucket/epms/epms-core/target/classes/com/yanglaoban/epms/core/elasticjob/ElasticJobService.class]: Unsatisfied dependency expressed through constructor parameter 0; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'regCenter' defined in class path resource [com/yanglaoban/epms/core/elasticjob/config/ElasticJobConfig.class]: Invocation of init method failed; nested exception is java.lang.NoClassDefFoundError: org/apache/curator/connection/StandardConnectionHandlingPolicy |
解决方式是排除Elastic Job中curator
相关依赖,重新导入:
1 | <properties> |
Spting Boot 集成
Github: https://github.com/yinjihuan/elastic-job-spring-boot-starter
pom.xml:
1 | <dependency> |
还需要加上repository配置:
1 | <repository> |
yml配置:
1 | elastic: |
只需一个注解即可开启任务:
1 | "0 0 0 * * ?", failover = true, misfire = true, overwrite = true, (name = JobName, cron = |
eventTraceRdbDataSource = "dataSource"
是启用事件追踪, 但在最新版的Spring Boot 中并不会创建 JOB_EXECUTION_LOG
与 JOB_STATUS_TRACE_LOG
这两个记录表, 最好是手动创建, 下面是建表语句:
1 | CREATE TABLE `JOB_EXECUTION_LOG` ( |
运维平台
ElasticJob提供了一个运维平台拱查看任务执行详情.
需要clone ElasticJob源码并install, 会生成运维平台的压缩包, 解压后通过脚本可一键启动运维平台.
根据启动脚本的内容, 可做成Docker镜像, 只需将lib包中的jar包copy进去再按照脚本的启动方式配置entrypoint即可.
Dockerfile:
1 | FROM frolvlad/alpine-oraclejre8:slim |
docker-compose.yml:
1 | version: '3.7' |
auth.properties:
1 | root.username=admin |
Redis Keyspace Notifications
通过设置一个过期键, 并在过期的时候回调监听者实现延迟任务.
可参考: Spring监听Redis Keyspace Event
Redis的pub/sub机制存在一个硬伤,官网内容如下
原:Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost.就是说Redis的发布/订阅目前是即发即弃(fire and forget)模式的,因此无法实现事件的可靠通知。也就是说,如果发布/订阅的客户端断链之后又重连,则在客户端断链期间的所有事件都丢失了。
RabbitMQ 延迟队列
这是一个不错的方案, 结合 rabbitmq_delayed_message_exchange
插件可以很优雅地做到延迟任务.
可参考: 延迟队列
基于 Redis Sorted Set 轮训的延迟任务
- 将关键数据以及执行时间戳分别作为
Sorted Set
的member
和score
添加到Sorted Set
中. - 通过周期任务使用
ZRANGEBYSCORE
命令读取指定数量的数据并删除Sorted Set
中对应的数据.
对于第二部需要使用 lua 保证原子性:
1 | local zset_key = KEYS[1] |
ZRANGEBYSCORE
的时间复杂度为 O(log(N)+M)
, 为了避免带来性能问题, 我们可以对key取模进行哈希处理.
其他
XXL Job
由个人开源的中心化分布式调度平台:
http://www.xuxueli.com/xxl-job/#/
Saturn
唯品会基于 Elastic Job 开发的分布式任务调度平台: