SpringBoot整合Quartz 实现分布式定时任务调度

爱被打了一巴掌 2023-09-30 22:22 92阅读 0赞

一、Quartz 集群架构
Quartz 是 Java 领域最著名的开源任务调度工具。

在上篇文章中,我们详细的介绍了 Quartz 的单体应用实践,如果只在单体环境中应用,Quartz 未必是最好的选择,例如Spring Scheduled一样也可以实现任务调度,并且与SpringBoot无缝集成,支持注解配置,非常简单,但是它有个缺点就是在集群环境下,会导致任务被重复调度!

而与之对应的 Quartz 提供了极为广用的特性,如任务持久化、集群部署和分布式调度任务等等,正因如此,基于 Quartz 任务调度功能在系统开发中应用极为广泛!

在集群环境下,Quartz 集群中的每个节点是一个独立的 Quartz 应用,没有负责集中管理的节点,而是通过数据库表来感知另一个应用,利用数据库锁的方式来实现集群环境下进行并发控制,每个任务当前运行的有效节点有且只有一个!

bd56cb604a5442408b6b300e137e6612.jpg

特别需要注意的是:分布式部署时需要保证各个节点的系统时间一致!

二、数据表初始化
数据库表结构官网已经提供,我们可以直接访问Quartz对应的官方网站,找到对应的版本,然后将其下载!

watermark_type_d3F5LXplbmhlaQ_shadow_50_text_Q1NETiBA5Zu-5Zu-5bCP5reY5rCUX3JlYWw_size_20_color_FFFFFF_t_70_g_se_x_16

我选择的是quartz-2.3.0-distribution.tar.gz,下载完成之后将其解压,在文件中搜索sql,在里面选择适合当前环境的数据库脚本文件,然后将其初始化到数据库中即可!

watermark_type_d3F5LXplbmhlaQ_shadow_50_text_Q1NETiBA5Zu-5Zu-5bCP5reY5rCUX3JlYWw_size_20_color_FFFFFF_t_70_g_se_x_16 1

例如,我使用的数据库是mysql-5.7,因此我选择的是tables_mysql_innodb.sql脚本,具体内容如下:

  1. DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
  2. DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
  3. DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
  4. DROP TABLE IF EXISTS QRTZ_LOCKS;
  5. DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
  6. DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
  7. DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
  8. DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
  9. DROP TABLE IF EXISTS QRTZ_TRIGGERS;
  10. DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
  11. DROP TABLE IF EXISTS QRTZ_CALENDARS;
  12. CREATE TABLE QRTZ_JOB_DETAILS(
  13. SCHED_NAME VARCHAR(120) NOT NULL,
  14. JOB_NAME VARCHAR(190) NOT NULL,
  15. JOB_GROUP VARCHAR(190) NOT NULL,
  16. DESCRIPTION VARCHAR(250) NULL,
  17. JOB_CLASS_NAME VARCHAR(250) NOT NULL,
  18. IS_DURABLE VARCHAR(1) NOT NULL,
  19. IS_NONCONCURRENT VARCHAR(1) NOT NULL,
  20. IS_UPDATE_DATA VARCHAR(1) NOT NULL,
  21. REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
  22. JOB_DATA BLOB NULL,
  23. PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP))
  24. ENGINE=InnoDB;
  25. CREATE TABLE QRTZ_TRIGGERS (
  26. SCHED_NAME VARCHAR(120) NOT NULL,
  27. TRIGGER_NAME VARCHAR(190) NOT NULL,
  28. TRIGGER_GROUP VARCHAR(190) NOT NULL,
  29. JOB_NAME VARCHAR(190) NOT NULL,
  30. JOB_GROUP VARCHAR(190) NOT NULL,
  31. DESCRIPTION VARCHAR(250) NULL,
  32. NEXT_FIRE_TIME BIGINT(13) NULL,
  33. PREV_FIRE_TIME BIGINT(13) NULL,
  34. PRIORITY INTEGER NULL,
  35. TRIGGER_STATE VARCHAR(16) NOT NULL,
  36. TRIGGER_TYPE VARCHAR(8) NOT NULL,
  37. START_TIME BIGINT(13) NOT NULL,
  38. END_TIME BIGINT(13) NULL,
  39. CALENDAR_NAME VARCHAR(190) NULL,
  40. MISFIRE_INSTR SMALLINT(2) NULL,
  41. JOB_DATA BLOB NULL,
  42. PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
  43. FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
  44. REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP))
  45. ENGINE=InnoDB;
  46. CREATE TABLE QRTZ_SIMPLE_TRIGGERS (
  47. SCHED_NAME VARCHAR(120) NOT NULL,
  48. TRIGGER_NAME VARCHAR(190) NOT NULL,
  49. TRIGGER_GROUP VARCHAR(190) NOT NULL,
  50. REPEAT_COUNT BIGINT(7) NOT NULL,
  51. REPEAT_INTERVAL BIGINT(12) NOT NULL,
  52. TIMES_TRIGGERED BIGINT(10) NOT NULL,
  53. PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
  54. FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
  55. REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
  56. ENGINE=InnoDB;
  57. CREATE TABLE QRTZ_CRON_TRIGGERS (
  58. SCHED_NAME VARCHAR(120) NOT NULL,
  59. TRIGGER_NAME VARCHAR(190) NOT NULL,
  60. TRIGGER_GROUP VARCHAR(190) NOT NULL,
  61. CRON_EXPRESSION VARCHAR(120) NOT NULL,
  62. TIME_ZONE_ID VARCHAR(80),
  63. PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
  64. FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
  65. REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
  66. ENGINE=InnoDB;
  67. CREATE TABLE QRTZ_SIMPROP_TRIGGERS
  68. (
  69. SCHED_NAME VARCHAR(120) NOT NULL,
  70. TRIGGER_NAME VARCHAR(190) NOT NULL,
  71. TRIGGER_GROUP VARCHAR(190) NOT NULL,
  72. STR_PROP_1 VARCHAR(512) NULL,
  73. STR_PROP_2 VARCHAR(512) NULL,
  74. STR_PROP_3 VARCHAR(512) NULL,
  75. INT_PROP_1 INT NULL,
  76. INT_PROP_2 INT NULL,
  77. LONG_PROP_1 BIGINT NULL,
  78. LONG_PROP_2 BIGINT NULL,
  79. DEC_PROP_1 NUMERIC(13,4) NULL,
  80. DEC_PROP_2 NUMERIC(13,4) NULL,
  81. BOOL_PROP_1 VARCHAR(1) NULL,
  82. BOOL_PROP_2 VARCHAR(1) NULL,
  83. PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
  84. FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
  85. REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
  86. ENGINE=InnoDB;
  87. CREATE TABLE QRTZ_BLOB_TRIGGERS (
  88. SCHED_NAME VARCHAR(120) NOT NULL,
  89. TRIGGER_NAME VARCHAR(190) NOT NULL,
  90. TRIGGER_GROUP VARCHAR(190) NOT NULL,
  91. BLOB_DATA BLOB NULL,
  92. PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
  93. INDEX (SCHED_NAME,TRIGGER_NAME, TRIGGER_GROUP),
  94. FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
  95. REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
  96. ENGINE=InnoDB;
  97. CREATE TABLE QRTZ_CALENDARS (
  98. SCHED_NAME VARCHAR(120) NOT NULL,
  99. CALENDAR_NAME VARCHAR(190) NOT NULL,
  100. CALENDAR BLOB NOT NULL,
  101. PRIMARY KEY (SCHED_NAME,CALENDAR_NAME))
  102. ENGINE=InnoDB;
  103. CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (
  104. SCHED_NAME VARCHAR(120) NOT NULL,
  105. TRIGGER_GROUP VARCHAR(190) NOT NULL,
  106. PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP))
  107. ENGINE=InnoDB;
  108. CREATE TABLE QRTZ_FIRED_TRIGGERS (
  109. SCHED_NAME VARCHAR(120) NOT NULL,
  110. ENTRY_ID VARCHAR(95) NOT NULL,
  111. TRIGGER_NAME VARCHAR(190) NOT NULL,
  112. TRIGGER_GROUP VARCHAR(190) NOT NULL,
  113. INSTANCE_NAME VARCHAR(190) NOT NULL,
  114. FIRED_TIME BIGINT(13) NOT NULL,
  115. SCHED_TIME BIGINT(13) NOT NULL,
  116. PRIORITY INTEGER NOT NULL,
  117. STATE VARCHAR(16) NOT NULL,
  118. JOB_NAME VARCHAR(190) NULL,
  119. JOB_GROUP VARCHAR(190) NULL,
  120. IS_NONCONCURRENT VARCHAR(1) NULL,
  121. REQUESTS_RECOVERY VARCHAR(1) NULL,
  122. PRIMARY KEY (SCHED_NAME,ENTRY_ID))
  123. ENGINE=InnoDB;
  124. CREATE TABLE QRTZ_SCHEDULER_STATE (
  125. SCHED_NAME VARCHAR(120) NOT NULL,
  126. INSTANCE_NAME VARCHAR(190) NOT NULL,
  127. LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
  128. CHECKIN_INTERVAL BIGINT(13) NOT NULL,
  129. PRIMARY KEY (SCHED_NAME,INSTANCE_NAME))
  130. ENGINE=InnoDB;
  131. CREATE TABLE QRTZ_LOCKS (
  132. SCHED_NAME VARCHAR(120) NOT NULL,
  133. LOCK_NAME VARCHAR(40) NOT NULL,
  134. PRIMARY KEY (SCHED_NAME,LOCK_NAME))
  135. ENGINE=InnoDB;
  136. CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
  137. CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);
  138. CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
  139. CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
  140. CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
  141. CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
  142. CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
  143. CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
  144. CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
  145. CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
  146. CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
  147. CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
  148. CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
  149. CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);
  150. CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
  151. CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
  152. CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
  153. CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
  154. CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
  155. CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
  156. commit;

watermark_type_d3F5LXplbmhlaQ_shadow_50_text_Q1NETiBA5Zu-5Zu-5bCP5reY5rCUX3JlYWw_size_20_color_FFFFFF_t_70_g_se_x_16 2

其中,QRTZ_LOCKS 就是 Quartz 集群实现同步机制的行锁表!

三、Quartz 集群实践

1、引入quartz依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-quartz</artifactId>
  4. </dependency>

2、创建 application.properties 配置文件,再配置文件中加入quartz配置

  1. #调度配置
  2. org:
  3. quartz:
  4. scheduler:
  5. #调度器实例名称
  6. instanceName: MaterialFromEamScheduler
  7. #调度器实例编号自动生成
  8. instanceId: AUTO
  9. #是否在Quartz执行一个job前使用UserTransaction
  10. wrapJobExecutionInUserTransaction: false
  11. #线程池配置
  12. threadPool:
  13. #线程池的实现类
  14. class: org.quartz.simpl.SimpleThreadPool
  15. #线程池中的线程数量
  16. threadCount: 10
  17. #线程优先级
  18. threadPriority: 5
  19. #配置是否启动自动加载数据库内的定时任务,默认true
  20. threadsInheritContextClassLoaderOfInitializingThread: true
  21. #持久化方式配置
  22. jobStore:
  23. #JobDataMaps是否都为String类型
  24. useProperties: true
  25. #数据表的前缀,默认QRTZ_
  26. tablePrefix: QRTZ_
  27. #最大能忍受的触发超时时间
  28. misfireThreshold: 60000
  29. #是否以集群方式运行
  30. isClustered: true
  31. #调度实例失效的检查时间间隔,单位毫秒
  32. clusterCheckinInterval: 2000
  33. maxMisfiresToHandleAtATime: 1
  34. #数据保存方式为数据库持久化
  35. class: org.quartz.impl.jdbcjobstore.JobStoreTX
  36. #数据库代理类,一般org.quartz.impl.jdbcjobstore.StdJDBCDelegate可以满足大部分数据库
  37. driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
  38. # 是否开启悲观锁控制集群中trigger并发
  39. acquireTriggersWithinLock: true
  40. #数据库别名 随便取
  41. dataSource: qzDS
  42. #数据库连接池,将其设置为druid
  43. dataSource:
  44. qzDS:
  45. connectionProvider:
  46. class: com.mes.material.quartz.provider.DruidConnectionProvider
  47. #数据库引擎
  48. driver: org.postgresql.Driver
  49. #数据库连接
  50. URL: jdbc:postgresql://127.0.0.1:5432/mes_cloud_stag_yqs?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
  51. #数据库用户
  52. user: postgres
  53. #数据库密码
  54. password: postgres
  55. #允许最大连接
  56. maxConnection: 100

3、注册 Quartz 任务工厂

  1. import org.quartz.spi.TriggerFiredBundle;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
  4. import org.springframework.scheduling.quartz.AdaptableJobFactory;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class QuartzJobFactory extends AdaptableJobFactory {
  8. @Autowired
  9. private AutowireCapableBeanFactory capableBeanFactory;
  10. @Override
  11. protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
  12. //调用父类的方法
  13. Object jobInstance = super.createJobInstance(bundle);
  14. //进行注入
  15. capableBeanFactory.autowireBean(jobInstance);
  16. return jobInstance;
  17. }
  18. }

4、注册调度工厂

本文采用自定义数据源实现

  1. import com.mes.material.constants.MaterialConstants;
  2. import com.mes.material.quartz.factory.QuartzJobFactory;
  3. import com.mes.material.quartz.listener.SimpleJobListener;
  4. import com.mes.material.quartz.listener.SimpleSchedulerListener;
  5. import com.mes.material.quartz.listener.SimpleTriggerListener;
  6. import org.quartz.JobKey;
  7. import org.quartz.Scheduler;
  8. import org.quartz.SchedulerException;
  9. import org.quartz.impl.matchers.EverythingMatcher;
  10. import org.quartz.impl.matchers.KeyMatcher;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.beans.factory.annotation.Value;
  13. import org.springframework.context.annotation.Bean;
  14. import org.springframework.context.annotation.Configuration;
  15. import org.springframework.scheduling.quartz.SchedulerFactoryBean;
  16. import javax.sql.DataSource;
  17. import java.io.IOException;
  18. import java.util.Properties;
  19. /** @Author:
  20. * @Description: 3.5、注册调度工厂
  21. * @Date: 15:42 2022/8/10
  22. * @Param
  23. * @return
  24. **/
  25. @Configuration
  26. public class QuartzConfig {
  27. @Autowired
  28. private QuartzJobFactory jobFactory;
  29. @Autowired
  30. private SimpleSchedulerListener simpleSchedulerListener;
  31. @Autowired
  32. private SimpleJobListener simpleJobListener;
  33. @Autowired
  34. private SimpleTriggerListener simpleTriggerListener;
  35. @Autowired
  36. private DataSource dataSource;
  37. @Value("${org.quartz.scheduler.instanceName}")
  38. private String instanceName;
  39. @Value("${org.quartz.scheduler.instanceId}")
  40. private String instanceId;
  41. @Value("${org.quartz.threadPool.class}")
  42. private String threadPoolClass;
  43. @Value("${org.quartz.threadPool.threadCount}")
  44. private String threadCount;
  45. @Value("${org.quartz.threadPool.threadPriority}")
  46. private String threadPriority;
  47. @Value("${org.quartz.jobStore.class}")
  48. private String jobStoreClass;
  49. @Value("${org.quartz.jobStore.isClustered}")
  50. private String isClustered;
  51. @Value("${org.quartz.jobStore.clusterCheckinInterval}")
  52. private String clusterCheckinInterval;
  53. @Value("${org.quartz.jobStore.maxMisfiresToHandleAtATime}")
  54. private String maxMisfiresToHandleAtATime;
  55. @Value("${org.quartz.jobStore.misfireThreshold}")
  56. private String misfireThreshold;
  57. @Value("${org.quartz.jobStore.tablePrefix}")
  58. private String tablePrefix;
  59. @Value("${org.quartz.jobStore.acquireTriggersWithinLock}")
  60. private String acquireTriggersWithinLock;
  61. @Value("${org.quartz.jobStore.driverDelegateClass}")
  62. private String driverDelegateClass;
  63. @Value("${org.quartz.jobStore.dataSource}")
  64. private String dataSourceName;
  65. @Value("${org.quartz.dataSource.qzDS.connectionProvider.class}")
  66. private String connectionProviderClass;
  67. @Value("${org.quartz.dataSource.qzDS.driver}")
  68. private String driver;
  69. @Value("${org.quartz.dataSource.qzDS.URL}")
  70. private String url;
  71. @Value("${org.quartz.dataSource.qzDS.user}")
  72. private String user;
  73. @Value("${org.quartz.dataSource.qzDS.password}")
  74. private String password;
  75. @Value("${org.quartz.dataSource.qzDS.maxConnection}")
  76. private String maxConnection;
  77. @Value("${org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread}")
  78. private String threadsInheritContextClassLoaderOfInitializingThread;
  79. @Bean
  80. public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
  81. //创建SchedulerFactoryBean
  82. SchedulerFactoryBean factory = new SchedulerFactoryBean();
  83. //quartz参数
  84. Properties prop = new Properties();
  85. //调度器实例名称
  86. prop.put("org.quartz.scheduler.instanceName", instanceName);
  87. //调度器实例编号自动生成
  88. prop.put("org.quartz.scheduler.instanceId", instanceId);
  89. //线程池配置--线程池的实现类
  90. prop.put("org.quartz.threadPool.class", threadPoolClass);
  91. //线程池中的线程数量
  92. prop.put("org.quartz.threadPool.threadCount", threadCount);
  93. //配置是否启动自动加载数据库内的定时任务,默认true
  94. prop.put("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread", threadsInheritContextClassLoaderOfInitializingThread);
  95. //线程优先级
  96. prop.put("org.quartz.threadPool.threadPriority", threadPriority);
  97. //JobStore配置-数据保存方式为数据库持久化
  98. prop.put("org.quartz.jobStore.class", jobStoreClass);
  99. //集群配置--是否以集群方式运行
  100. prop.put("org.quartz.jobStore.isClustered", isClustered);
  101. //调度实例失效的检查时间间隔,单位毫秒
  102. prop.put("org.quartz.jobStore.clusterCheckinInterval", clusterCheckinInterval);
  103. prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", maxMisfiresToHandleAtATime);
  104. //最大能忍受的触发超时时间
  105. prop.put("org.quartz.jobStore.misfireThreshold", misfireThreshold);
  106. //数据表的前缀,默认QRTZ_
  107. prop.put("org.quartz.jobStore.tablePrefix", tablePrefix);
  108. //是否开启悲观锁控制集群中trigger并发
  109. prop.put("org.quartz.jobStore.acquireTriggersWithinLock", acquireTriggersWithinLock);
  110. //数据库代理类,一般org.quartz.impl.jdbcjobstore.StdJDBCDelegate可以满足大部分数据库
  111. prop.put("org.quartz.jobStore.driverDelegateClass", driverDelegateClass);
  112. //数据库别名 随便取
  113. prop.put("org.quartz.jobStore.dataSource",dataSourceName);
  114. //数据库连接池,将其设置为druid
  115. prop.put("org.quartz.dataSource.qzDS.connectionProvider.class",connectionProviderClass);
  116. //数据库引擎
  117. prop.put("org.quartz.dataSource.qzDS.driver",driver);
  118. //数据库连接
  119. prop.put("org.quartz.dataSource.qzDS.URL",url);
  120. //数据库用户
  121. prop.put("org.quartz.dataSource.qzDS.user",user);
  122. //数据库密码
  123. prop.put("org.quartz.dataSource.qzDS.password",password);
  124. //允许最大连接
  125. prop.put("org.quartz.dataSource.qzDS.maxConnection",maxConnection);
  126. factory.setQuartzProperties(prop);
  127. //支持在JOB实例中注入其他的业务对象
  128. factory.setJobFactory(jobFactory);
  129. factory.setApplicationContextSchedulerContextKey("applicationContextKey");
  130. //这样当spring关闭时,会等待所有已经启动的quartz job结束后spring才能完全shutdown。
  131. factory.setWaitForJobsToCompleteOnShutdown(true);
  132. //是否覆盖己存在的Job
  133. factory.setOverwriteExistingJobs(false);
  134. //QuartzScheduler 延时启动,应用启动完后 QuartzScheduler 再启动
  135. factory.setStartupDelay(10);
  136. //使用数据源,自定义数据源(采用项目数据源)
  137. //在quartz.properties配置文件中,去掉org.quartz.jobStore.dataSource配置
  138. // factory.setDataSource(dataSource);
  139. return factory;
  140. }
  141. /**
  142. * 通过SchedulerFactoryBean获取Scheduler的实例
  143. * @return
  144. * @throws IOException
  145. * @throws SchedulerException
  146. */
  147. @Bean(name = "scheduler")
  148. public Scheduler scheduler() throws IOException, SchedulerException {
  149. Scheduler scheduler = schedulerFactoryBean().getScheduler();
  150. //全局添加监听器
  151. //添加SchedulerListener监听器
  152. scheduler.getListenerManager().addSchedulerListener(simpleSchedulerListener);
  153. // 添加JobListener, 支持带条件匹配监听器
  154. scheduler.getListenerManager().addJobListener(simpleJobListener, KeyMatcher.keyEquals(JobKey.jobKey(MaterialConstants.QURATZ_NAME,MaterialConstants.QURATZ_GROUP_NAME)));
  155. // 添加triggerListener,设置全局监听
  156. scheduler.getListenerManager().addTriggerListener(simpleTriggerListener, EverythingMatcher.allTriggers());
  157. return scheduler;
  158. }
  159. }

5、重新设置 Quartz 数据连接池

  1. import com.alibaba.druid.pool.DruidDataSource;
  2. import org.quartz.SchedulerException;
  3. import org.quartz.utils.ConnectionProvider;
  4. import java.sql.Connection;
  5. import java.sql.SQLException;
  6. /** @Author:
  7. * @Description: 3.6、重新设置 Quartz 数据连接池
  8. * @Date: 15:40 2022/8/10
  9. * @Param
  10. * @return
  11. **/
  12. public class DruidConnectionProvider implements ConnectionProvider {
  13. /**
  14. * 常量配置,与quartz.properties文件的key保持一致(去掉前缀),同时提供set方法,Quartz框架自动注入值。
  15. * @return
  16. * @throws SQLException
  17. */
  18. /**JDBC驱动*/
  19. public String driver;
  20. /**JDBC连接串*/
  21. public String URL;
  22. /**数据库用户名*/
  23. public String user;
  24. /**数据库用户密码*/
  25. public String password;
  26. /**数据库最大连接数*/
  27. public int maxConnection;
  28. /**数据库SQL查询每次连接返回执行到连接池,以确保它仍然是有效的。*/
  29. public String validationQuery;
  30. private boolean validateOnCheckout;
  31. private int idleConnectionValidationSeconds;
  32. public String maxCachedStatementsPerConnection;
  33. private String discardIdleConnectionsSeconds;
  34. public static final int DEFAULT_DB_MAX_CONNECTIONS = 10;
  35. public static final int DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION = 120;
  36. //Druid连接池
  37. private DruidDataSource datasource;
  38. @Override
  39. public Connection getConnection() throws SQLException {
  40. return datasource.getConnection();
  41. }
  42. @Override
  43. public void shutdown() throws SQLException {
  44. datasource.close();
  45. }
  46. @Override
  47. public void initialize() throws SQLException {
  48. if (this.URL == null) {
  49. throw new SQLException("DBPool could not be created: DB URL cannot be null");
  50. }
  51. if (this.driver == null) {
  52. throw new SQLException("DBPool driver could not be created: DB driver class name cannot be null!");
  53. }
  54. if (this.maxConnection < 0) {
  55. throw new SQLException("DBPool maxConnectins could not be created: Max connections must be greater than zero!");
  56. }
  57. datasource = new DruidDataSource();
  58. try{
  59. datasource.setDriverClassName(this.driver);
  60. } catch (Exception e) {
  61. try {
  62. throw new SchedulerException("Problem setting driver class name on datasource: " + e.getMessage(), e);
  63. } catch (SchedulerException e1) {
  64. }
  65. }
  66. datasource.setUrl(this.URL);
  67. datasource.setUsername(this.user);
  68. datasource.setPassword(this.password);
  69. datasource.setMaxActive(this.maxConnection);
  70. datasource.setMinIdle(1);
  71. datasource.setMaxWait(0);
  72. datasource.setMaxPoolPreparedStatementPerConnectionSize(DEFAULT_DB_MAX_CONNECTIONS);
  73. if (this.validationQuery != null) {
  74. datasource.setValidationQuery(this.validationQuery);
  75. if(!this.validateOnCheckout){
  76. datasource.setTestOnReturn(true);
  77. }else{
  78. datasource.setTestOnBorrow(true);
  79. }
  80. datasource.setValidationQueryTimeout(this.idleConnectionValidationSeconds);
  81. }
  82. }
  83. public String getDriver() {
  84. return driver;
  85. }
  86. public void setDriver(String driver) {
  87. this.driver = driver;
  88. }
  89. public String getURL() {
  90. return URL;
  91. }
  92. public void setURL(String URL) {
  93. this.URL = URL;
  94. }
  95. public String getUser() {
  96. return user;
  97. }
  98. public void setUser(String user) {
  99. this.user = user;
  100. }
  101. public String getPassword() {
  102. return password;
  103. }
  104. public void setPassword(String password) {
  105. this.password = password;
  106. }
  107. public int getMaxConnection() {
  108. return maxConnection;
  109. }
  110. public void setMaxConnection(int maxConnection) {
  111. this.maxConnection = maxConnection;
  112. }
  113. public String getValidationQuery() {
  114. return validationQuery;
  115. }
  116. public void setValidationQuery(String validationQuery) {
  117. this.validationQuery = validationQuery;
  118. }
  119. public boolean isValidateOnCheckout() {
  120. return validateOnCheckout;
  121. }
  122. public void setValidateOnCheckout(boolean validateOnCheckout) {
  123. this.validateOnCheckout = validateOnCheckout;
  124. }
  125. public int getIdleConnectionValidationSeconds() {
  126. return idleConnectionValidationSeconds;
  127. }
  128. public void setIdleConnectionValidationSeconds(int idleConnectionValidationSeconds) {
  129. this.idleConnectionValidationSeconds = idleConnectionValidationSeconds;
  130. }
  131. public DruidDataSource getDatasource() {
  132. return datasource;
  133. }
  134. public void setDatasource(DruidDataSource datasource) {
  135. this.datasource = datasource;
  136. }
  137. public String getDiscardIdleConnectionsSeconds() {
  138. return discardIdleConnectionsSeconds;
  139. }
  140. public void setDiscardIdleConnectionsSeconds(String discardIdleConnectionsSeconds) {
  141. this.discardIdleConnectionsSeconds = discardIdleConnectionsSeconds;
  142. }
  143. }

创建完成之后,还需要在配置文件中设置一下即可!

  1. org:
  2. quartz:
  3. dataSource:
  4. qzDS:
  5. connectionProvider:
  6. class: com.mes.material.quartz.provider.DruidConnectionProvider

6、编写 Job 具体任务类

  1. package com.mes.material.quartz.tfjob;
  2. import com.mes.material.eam.domain.MaterialRequestVo;
  3. import com.mes.material.eam.enums.InterfaceTypeEnum;
  4. import com.mes.material.eam.service.StrategyService;
  5. import org.quartz.DisallowConcurrentExecution;
  6. import org.quartz.Job;
  7. import org.quartz.JobExecutionContext;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.beans.factory.annotation.Value;
  12. import org.springframework.data.redis.core.RedisTemplate;
  13. import java.util.concurrent.TimeUnit;
  14. /**
  15. * @Author:
  16. * @Description: 3.7、编写 Job 具体任务类
  17. * @Date Create in 15:43 2022/8/10
  18. * @Modified By:
  19. */
  20. //此标记用在实现Job的类上面,意思是不允许并发执行,按照我之前的理解是不允许调度框架在同一时刻调用Job类,后来经过测试发现并不是这样,
  21. // 而是Job(任务)的执行时间[比如需要10秒]大于任务的时间间隔[Interval(5秒)],那么默认情况下,调度框架为了能让
  22. //任务按照我们预定的时间间隔执行,会马上启用新的线程执行任务。否则的话会等待任务执行完毕以后再重新执行!(这样会导致任务的执行不是按照我们预先定义的时间间隔执行)
  23. //测试代码,这是官方提供的例子。设定的时间间隔为3秒,但job执行时间是5秒,设置@DisallowConcurrentExecution以后程序会等任务执行完毕以后再去执行,
  24. // 否则会在3秒时再启用新的线程执行
  25. @DisallowConcurrentExecution
  26. public class TfCommandJob implements Job {
  27. @Autowired
  28. private StrategyService strategyService;
  29. private static final Logger log = LoggerFactory.getLogger(TfCommandJob.class);
  30. private static final String lock_key = "material_from_eam_info";
  31. private static final String lock_value = "material_from_eam_info_zkaw";
  32. @Value("${schedule.expire}")
  33. private long timeOut;
  34. @Autowired
  35. private RedisTemplate redisTemplate;
  36. @Override
  37. public void execute(JobExecutionContext context) {
  38. try {
  39. //分布式锁
  40. boolean lock = false;
  41. try {
  42. //如果返回true,说明key不存在,获取到锁
  43. lock = redisTemplate.opsForValue().setIfAbsent(lock_key, lock_value);
  44. log.info("是否获取到锁:" + lock);
  45. if (lock) {
  46. log.info("获取到锁,开启定时任务!");
  47. //设置过期时间
  48. redisTemplate.expire(lock_key, timeOut, TimeUnit.SECONDS);
  49. //调用EAM接口,同步物资数据
  50. log.info("调用EAM接口,同步物资数据");
  51. String type = InterfaceTypeEnum.autoMaterial.getCode();
  52. MaterialRequestVo materialRequestVo = new MaterialRequestVo();
  53. materialRequestVo.setType(type);
  54. strategyService.render(type).materialInfoMesToRoma(materialRequestVo);
  55. } else {
  56. log.info("其他系统正在执行此项任务");
  57. return;
  58. }
  59. } catch (Exception e) {
  60. e.printStackTrace();
  61. } catch (Throwable throwable) {
  62. throw new RuntimeException("分布式锁执行发生异常" + throwable.getMessage(), throwable);
  63. }
  64. } catch (Exception e) {
  65. log.error("任务执行失败",e);
  66. }
  67. }
  68. }

7、编写 Quartz 服务层接口

  1. package com.mes.material.quartz.service;
  2. import java.util.Map;
  3. /**
  4. * @Author:
  5. * @Description: 3.8、编写 Quartz 服务层接口
  6. * @Date Create in 14:14 2022/5/16
  7. * @Modified By:
  8. */
  9. public interface IQuartzJobService {
  10. /**
  11. * 添加任务可以传参数
  12. * @param clazzName
  13. * @param jobName
  14. * @param groupName
  15. * @param cronExp
  16. * @param param
  17. */
  18. void addJob(String clazzName, String jobName, String groupName, String cronExp, Map<String, Object> param);
  19. /**
  20. * 暂停任务
  21. * @param jobName
  22. * @param groupName
  23. */
  24. void pauseJob(String jobName, String groupName);
  25. /**
  26. * 恢复任务
  27. * @param jobName
  28. * @param groupName
  29. */
  30. void resumeJob(String jobName, String groupName);
  31. /**
  32. * 立即运行一次定时任务
  33. * @param jobName
  34. * @param groupName
  35. */
  36. void runOnce(String jobName, String groupName);
  37. /**
  38. * 更新任务
  39. * @param jobName
  40. * @param groupName
  41. * @param cronExp
  42. * @param param
  43. */
  44. void updateJob(String jobName, String groupName, String cronExp, Map<String, Object> param);
  45. /**
  46. * 删除任务
  47. * @param jobName
  48. * @param groupName
  49. */
  50. void deleteJob(String jobName, String groupName);
  51. /**
  52. * 启动所有任务
  53. */
  54. void startAllJobs();
  55. /**
  56. * 暂停所有任务
  57. */
  58. void pauseAllJobs();
  59. /**
  60. * 恢复所有任务
  61. */
  62. void resumeAllJobs();
  63. /**
  64. * 关闭所有任务
  65. */
  66. void shutdownAllJobs();
  67. }

8、编写 Quartz 服务层接口实现类

  1. package com.mes.material.quartz.service.impl;
  2. import com.mes.material.quartz.service.IQuartzJobService;
  3. import org.quartz.*;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Service;
  8. import java.util.Map;
  9. /**
  10. * @Author:
  11. * @Description:
  12. * @Date Create in 14:15 2022/5/16
  13. * @Modified By:
  14. */
  15. @Service
  16. public class IQuartzJobServiceImpl implements IQuartzJobService {
  17. private static final Logger log = LoggerFactory.getLogger(IQuartzJobServiceImpl.class);
  18. @Autowired
  19. private Scheduler scheduler;
  20. @Override
  21. public void addJob(String clazzName, String jobName, String groupName, String cronExp, Map<String, Object> param) {
  22. try {
  23. // 启动调度器,默认初始化的时候已经启动
  24. // scheduler.start();
  25. //构建job信息
  26. Class<? extends Job> jobClass = (Class<? extends Job>) Class.forName(clazzName);
  27. JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, groupName).build();
  28. //表达式调度构建器(即任务执行的时间)
  29. CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExp);
  30. //按新的cronExpression表达式构建一个新的trigger
  31. CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, groupName).withSchedule(scheduleBuilder).build();
  32. //获得JobDataMap,写入数据
  33. if (param != null) {
  34. trigger.getJobDataMap().putAll(param);
  35. }
  36. scheduler.scheduleJob(jobDetail, trigger);
  37. } catch (Exception e) {
  38. log.error("创建任务失败", e);
  39. }
  40. }
  41. @Override
  42. public void pauseJob(String jobName, String groupName) {
  43. try {
  44. scheduler.pauseJob(JobKey.jobKey(jobName, groupName));
  45. } catch (SchedulerException e) {
  46. log.error("暂停任务失败", e);
  47. }
  48. }
  49. @Override
  50. public void resumeJob(String jobName, String groupName) {
  51. try {
  52. scheduler.resumeJob(JobKey.jobKey(jobName, groupName));
  53. } catch (SchedulerException e) {
  54. log.error("恢复任务失败", e);
  55. }
  56. }
  57. @Override
  58. public void runOnce(String jobName, String groupName) {
  59. try {
  60. scheduler.triggerJob(JobKey.jobKey(jobName, groupName));
  61. } catch (SchedulerException e) {
  62. log.error("立即运行一次定时任务失败", e);
  63. }
  64. }
  65. @Override
  66. public void updateJob(String jobName, String groupName, String cronExp, Map<String, Object> param) {
  67. try {
  68. TriggerKey triggerKey = TriggerKey.triggerKey(jobName, groupName);
  69. CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
  70. if (cronExp != null) {
  71. // 表达式调度构建器
  72. CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExp);
  73. // 按新的cronExpression表达式重新构建trigger
  74. trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
  75. }
  76. //修改map
  77. if (param != null) {
  78. trigger.getJobDataMap().putAll(param);
  79. }
  80. // 按新的trigger重新设置job执行
  81. scheduler.rescheduleJob(triggerKey, trigger);
  82. } catch (Exception e) {
  83. log.error("更新任务失败", e);
  84. }
  85. }
  86. @Override
  87. public void deleteJob(String jobName, String groupName) {
  88. try {
  89. //暂停、移除、删除
  90. scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, groupName));
  91. scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, groupName));
  92. scheduler.deleteJob(JobKey.jobKey(jobName, groupName));
  93. } catch (Exception e) {
  94. log.error("删除任务失败", e);
  95. }
  96. }
  97. @Override
  98. public void startAllJobs() {
  99. try {
  100. scheduler.start();
  101. } catch (Exception e) {
  102. log.error("开启所有的任务失败", e);
  103. }
  104. }
  105. @Override
  106. public void pauseAllJobs() {
  107. try {
  108. scheduler.pauseAll();
  109. } catch (Exception e) {
  110. log.error("暂停所有任务失败", e);
  111. }
  112. }
  113. @Override
  114. public void resumeAllJobs() {
  115. try {
  116. scheduler.resumeAll();
  117. } catch (Exception e) {
  118. log.error("恢复所有任务失败", e);
  119. }
  120. }
  121. @Override
  122. public void shutdownAllJobs() {
  123. try {
  124. if (!scheduler.isShutdown()) {
  125. // 需谨慎操作关闭scheduler容器
  126. // scheduler生命周期结束,无法再 start() 启动scheduler
  127. scheduler.shutdown(true);
  128. }
  129. } catch (Exception e) {
  130. log.error("关闭所有的任务失败", e);
  131. }
  132. }
  133. }

9、编写 contoller 服务

先编写一个调用实体类

  1. package com.mes.material.quartz.config;
  2. import lombok.Data;
  3. import java.io.Serializable;
  4. import java.util.Map;
  5. /**
  6. * @Author:
  7. * @Description:
  8. * @Date Create in 14:16 2022/5/16
  9. * @Modified By:
  10. */
  11. @Data
  12. public class QuartzConfigDTO implements Serializable {
  13. private static final long serialVersionUID = 1L;
  14. /**
  15. * 任务名称
  16. */
  17. private String jobName;
  18. /**
  19. * 任务所属组
  20. */
  21. private String groupName;
  22. /**
  23. * 任务执行类
  24. */
  25. private String jobClass;
  26. /**
  27. * 任务调度时间表达式
  28. */
  29. private String cronExpression;
  30. /**
  31. * 附加参数
  32. */
  33. private Map<String, Object> param;
  34. }

web服务接口

  1. @RestController
  2. @RequestMapping("/test")
  3. public class TestController {
  4. private static final Logger log = LoggerFactory.getLogger(TestController.class);
  5. @Autowired
  6. private QuartzJobService quartzJobService;
  7. /**
  8. * 添加新任务
  9. * @param configDTO
  10. * @return
  11. */
  12. @RequestMapping("/addJob")
  13. public Object addJob(@RequestBody QuartzConfigDTO configDTO) {
  14. quartzJobService.addJob(configDTO.getJobClass(), configDTO.getJobName(), configDTO.getGroupName(), configDTO.getCronExpression(), configDTO.getParam());
  15. return HttpStatus.OK;
  16. }
  17. /**
  18. * 暂停任务
  19. * @param configDTO
  20. * @return
  21. */
  22. @RequestMapping("/pauseJob")
  23. public Object pauseJob(@RequestBody QuartzConfigDTO configDTO) {
  24. quartzJobService.pauseJob(configDTO.getJobName(), configDTO.getGroupName());
  25. return HttpStatus.OK;
  26. }
  27. /**
  28. * 恢复任务
  29. * @param configDTO
  30. * @return
  31. */
  32. @RequestMapping("/resumeJob")
  33. public Object resumeJob(@RequestBody QuartzConfigDTO configDTO) {
  34. quartzJobService.resumeJob(configDTO.getJobName(), configDTO.getGroupName());
  35. return HttpStatus.OK;
  36. }
  37. /**
  38. * 立即运行一次定时任务
  39. * @param configDTO
  40. * @return
  41. */
  42. @RequestMapping("/runOnce")
  43. public Object runOnce(@RequestBody QuartzConfigDTO configDTO) {
  44. quartzJobService.runOnce(configDTO.getJobName(), configDTO.getGroupName());
  45. return HttpStatus.OK;
  46. }
  47. /**
  48. * 更新任务
  49. * @param configDTO
  50. * @return
  51. */
  52. @RequestMapping("/updateJob")
  53. public Object updateJob(@RequestBody QuartzConfigDTO configDTO) {
  54. quartzJobService.updateJob(configDTO.getJobName(), configDTO.getGroupName(), configDTO.getCronExpression(), configDTO.getParam());
  55. return HttpStatus.OK;
  56. }
  57. /**
  58. * 删除任务
  59. * @param configDTO
  60. * @return
  61. */
  62. @RequestMapping("/deleteJob")
  63. public Object deleteJob(@RequestBody QuartzConfigDTO configDTO) {
  64. quartzJobService.deleteJob(configDTO.getJobName(), configDTO.getGroupName());
  65. return HttpStatus.OK;
  66. }
  67. /**
  68. * 启动所有任务
  69. * @return
  70. */
  71. @RequestMapping("/startAllJobs")
  72. public Object startAllJobs() {
  73. quartzJobService.startAllJobs();
  74. return HttpStatus.OK;
  75. }
  76. /**
  77. * 暂停所有任务
  78. * @return
  79. */
  80. @RequestMapping("/pauseAllJobs")
  81. public Object pauseAllJobs() {
  82. quartzJobService.pauseAllJobs();
  83. return HttpStatus.OK;
  84. }
  85. /**
  86. * 恢复所有任务
  87. * @return
  88. */
  89. @RequestMapping("/resumeAllJobs")
  90. public Object resumeAllJobs() {
  91. quartzJobService.resumeAllJobs();
  92. return HttpStatus.OK;
  93. }
  94. /**
  95. * 关闭所有任务
  96. * @return
  97. */
  98. @RequestMapping("/shutdownAllJobs")
  99. public Object shutdownAllJobs() {
  100. quartzJobService.shutdownAllJobs();
  101. return HttpStatus.OK;
  102. }
  103. }

10、调用quartz

217a2642ac84463c98d9cb4f6675b9ea.png

常量值

e5d687a51df748e397b14ff55203536f.png

11、注册监听器(选用)

  • 创建任务调度监听器

    package com.mes.material.quartz.listener;

    import org.quartz.*;
    import org.quartz.listeners.SchedulerListenerSupport;
    import org.springframework.stereotype.Component;

    /**

    • @Author:
    • @Description: 注册监听器(选用)
    • @Date Create in 14:47 2022/5/16
    • @Modified By:
      */

    @Component
    public class SimpleSchedulerListener extends SchedulerListenerSupport {

    1. @Override
    2. public void jobScheduled(Trigger trigger) {
    3. System.out.println("任务被部署时被执行");
    4. }
    5. @Override
    6. public void jobUnscheduled(TriggerKey triggerKey) {
    7. System.out.println("任务被卸载时被执行");
    8. }
    9. @Override
    10. public void triggerFinalized(Trigger trigger) {
    11. System.out.println("任务完成了它的使命,光荣退休时被执行");
    12. }
    13. @Override
    14. public void triggerPaused(TriggerKey triggerKey) {
    15. System.out.println(triggerKey + "(一个触发器)被暂停时被执行");
    16. }
    17. @Override
    18. public void triggersPaused(String triggerGroup) {
    19. System.out.println(triggerGroup + "所在组的全部触发器被停止时被执行");
    20. }
    21. @Override
    22. public void triggerResumed(TriggerKey triggerKey) {
    23. System.out.println(triggerKey + "(一个触发器)被恢复时被执行");
    24. }
    25. @Override
    26. public void triggersResumed(String triggerGroup) {
    27. System.out.println(triggerGroup + "所在组的全部触发器被回复时被执行");
    28. }
    29. @Override
    30. public void jobAdded(JobDetail jobDetail) {
    31. System.out.println("一个JobDetail被动态添加进来");
    32. }
    33. @Override
    34. public void jobDeleted(JobKey jobKey) {
    35. System.out.println(jobKey + "被删除时被执行");
    36. }
    37. @Override
    38. public void jobPaused(JobKey jobKey) {
    39. System.out.println(jobKey + "被暂停时被执行");
    40. }
    41. @Override
    42. public void jobsPaused(String jobGroup) {
    43. System.out.println(jobGroup + "(一组任务)被暂停时被执行");
    44. }
    45. @Override
    46. public void jobResumed(JobKey jobKey) {
    47. System.out.println(jobKey + "被恢复时被执行");
    48. }
    49. @Override
    50. public void jobsResumed(String jobGroup) {
    51. System.out.println(jobGroup + "(一组任务)被恢复时被执行");
    52. }
    53. @Override
    54. public void schedulerError(String msg, SchedulerException cause) {
    55. System.out.println("出现异常" + msg + "时被执行");
    56. cause.printStackTrace();
    57. }
    58. @Override
    59. public void schedulerInStandbyMode() {
    60. System.out.println("scheduler被设为standBy等候模式时被执行");
    61. }
    62. @Override
    63. public void schedulerStarted() {
    64. System.out.println("scheduler启动时被执行");
    65. }
    66. @Override
    67. public void schedulerStarting() {
    68. System.out.println("scheduler正在启动时被执行");
    69. }
    70. @Override
    71. public void schedulerShutdown() {
    72. System.out.println("scheduler关闭时被执行");
    73. }
    74. @Override
    75. public void schedulerShuttingdown() {
    76. System.out.println("scheduler正在关闭时被执行");
    77. }
    78. @Override
    79. public void schedulingDataCleared() {
    80. System.out.println("scheduler中所有数据包括jobs, triggers和calendars都被清空时被执行");
    81. }

    }

  • 创建任务触发监听器

    package com.mes.material.quartz.listener;

    import org.quartz.JobExecutionContext;
    import org.quartz.Trigger;
    import org.quartz.listeners.TriggerListenerSupport;
    import org.springframework.stereotype.Component;

    /**

    • @Author:
    • @Description: 任务触发监听器
    • @Date Create in 14:48 2022/5/16
    • @Modified By:
      */
      @Component
      public class SimpleTriggerListener extends TriggerListenerSupport {

      /**

      • Trigger监听器的名称
      • @return
        */
        @Override
        public String getName() {
        return “mySimpleTriggerListener”;
        }

        /**

      • Trigger被激发 它关联的job即将被运行
      • @param trigger
      • @param context
        */
        @Override
        public void triggerFired(Trigger trigger, JobExecutionContext context) {
        System.out.println(“myTriggerListener.triggerFired()”);
        }

        /**

      • Trigger被激发 它关联的job即将被运行, TriggerListener 给了一个选择去否决 Job 的执行,如果返回TRUE 那么任务job会被终止
      • @param trigger
      • @param context
      • @return
        */
        @Override
        public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
        System.out.println(“myTriggerListener.vetoJobExecution()”);
        return false;
        }

        /**

      • 当Trigger错过被激发时执行,比如当前时间有很多触发器都需要执行,但是线程池中的有效线程都在工作,
      • 那么有的触发器就有可能超时,错过这一轮的触发。
      • @param trigger
        */
        @Override
        public void triggerMisfired(Trigger trigger) {
        System.out.println(“myTriggerListener.triggerMisfired()”);
        }

        /**

      • 任务完成时触发
      • @param trigger
      • @param context
      • @param triggerInstructionCode
        */
        @Override
        public void triggerComplete(Trigger trigger, JobExecutionContext context, Trigger.CompletedExecutionInstruction triggerInstructionCode) {
        System.out.println(“myTriggerListener.triggerComplete()”);
        }
        }
  • 创建任务执行监听器

    package com.mes.material.quartz.listener;

    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.quartz.listeners.JobListenerSupport;
    import org.springframework.stereotype.Component;

    /**

    • @Author:
    • @Description: 任务执行监听器
    • @Date Create in 14:49 2022/5/16
    • @Modified By:
      */

    @Component
    public class SimpleJobListener extends JobListenerSupport {

  1. /**
  2. * job监听器名称
  3. * @return
  4. */
  5. @Override
  6. public String getName() {
  7. return "mySimpleJobListener";
  8. }
  9. /**
  10. * 任务被调度前
  11. * @param context
  12. */
  13. @Override
  14. public void jobToBeExecuted(JobExecutionContext context) {
  15. System.out.println("simpleJobListener监听器,准备执行:"+context.getJobDetail().getKey());
  16. }
  17. /**
  18. * 任务调度被拒了
  19. * @param context
  20. */
  21. @Override
  22. public void jobExecutionVetoed(JobExecutionContext context) {
  23. System.out.println("simpleJobListener监听器,取消执行:"+context.getJobDetail().getKey());
  24. }
  25. /**
  26. * 任务被调度后
  27. * @param context
  28. * @param jobException
  29. */
  30. @Override
  31. public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
  32. System.out.println("simpleJobListener监听器,执行结束:"+context.getJobDetail().getKey());
  33. }
  34. }
  • 最后,将监听器注册到Scheduler

    @Autowired
    private SimpleSchedulerListener simpleSchedulerListener;

    @Autowired
    private SimpleJobListener simpleJobListener;

    @Autowired
    private SimpleTriggerListener simpleTriggerListener;

    @Bean(name = “scheduler”)
    public Scheduler scheduler() throws IOException, SchedulerException {

    1. Scheduler scheduler = schedulerFactoryBean().getScheduler();
    2. //全局添加监听器
    3. //添加SchedulerListener监听器
    4. scheduler.getListenerManager().addSchedulerListener(simpleSchedulerListener);
    5. // 添加JobListener, 支持带条件匹配监听器
    6. scheduler.getListenerManager().addJobListener(simpleJobListener, KeyMatcher.keyEquals(JobKey.jobKey("myJob", "myGroup")));
    7. // 添加triggerListener,设置全局监听
    8. scheduler.getListenerManager().addTriggerListener(simpleTriggerListener, EverythingMatcher.allTriggers());
    9. return scheduler;

    }

本文主要围绕springboot + quartz + postgresql实现持久化分布式调度进行介绍,所有的代码功能,都测试过。

发表评论

表情:
评论列表 (有 0 条评论,92人围观)

还没有评论,来说两句吧...

相关阅读