How to make part of a Job's code run only once in Java Quartz Scheduler
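I'm using the Quartz scheduler in a Java Servlet environment. It is initialized from web.xml, and I have set up two separate jobs with their own triggers; the configuration file is quartz.properties in the resources directory. The problem: the Kafka consumer initialization code in NewLineJob runs on every job execution, and I want that initialization to run only once. My configuration and code are below: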
web.xml configuration
<!DOCTYPE web-app PUBLIC
 "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
 "http://java.sun.com/dtd/web-app_2_3.dtd" >
<web-app>
  <display-name>Archetype Created Web Application</display-name>
  <context-param>
    <param-name>quartz:config-file</param-name>
    <param-value>quartz.properties</param-value>
  </context-param>
  <context-param>
    <param-name>quartz:shutdown-on-unload</param-name>
    <param-value>true</param-value>
  </context-param>
  <context-param>
    <param-name>quartz:wait-on-shutdown</param-name>
    <param-value>true</param-value>
  </context-param>
  <context-param>
    <param-name>quartz:start-on-load</param-name>
    <param-value>true</param-value>
  </context-param>
  <listener>
    <listener-class>org.quartz.ee.servlet.QuartzInitializerListener</listener-class>
  </listener>
  <listener>
    <listener-class>com.hpe.statistics.StatisticsQuartzListener</listener-class>
  </listener>
  <servlet>
    <servlet-name>StatisticsIvr</servlet-name>
    <display-name>StatisticsIvr</display-name>
    <description></description>
    <servlet-class>StatisticsIvr</servlet-class>
  </servlet>
  <servlet-mapping>
    <servlet-name>StatisticsIvr</servlet-name>
    <url-pattern>/StatisticsIvr</url-pattern>
  </servlet-mapping>
</web-app>
StatisticsQuartzListener class code
public void contextInitialized(ServletContextEvent ctx) {
    JobDetail newfilejob = JobBuilder.newJob(NewFileJob.class)
            .withIdentity("fileJob", "group1").build();
    // As posted, this also schedules NewFileJob; NewLineJob.class was probably intended.
    JobDetail newlinejob = JobBuilder.newJob(NewFileJob.class)
            .withIdentity("lineJob", "group2").build();

    Trigger filetrigger = TriggerBuilder
            .newTrigger()
            .withIdentity("fileTrigger", "group1")
            .startNow()
            .withSchedule(
                    SimpleScheduleBuilder.simpleSchedule()
                            .withIntervalInMinutes(6).repeatForever())
            .build();
    Trigger linetrigger = TriggerBuilder
            .newTrigger()
            .withIdentity("lineTrigger", "group2")
            .startNow()
            .withSchedule(
                    SimpleScheduleBuilder.simpleSchedule()
                            .withIntervalInMinutes(2).repeatForever())
            .build();

    try {
        scheduler1 = ((StdSchedulerFactory) ctx.getServletContext()
                .getAttribute(QuartzInitializerListener.QUARTZ_FACTORY_KEY))
                .getScheduler();
        scheduler1.scheduleJob(newfilejob, filetrigger);
    } catch (SchedulerException e) {
    }
    try {
        scheduler2 = ((StdSchedulerFactory) ctx.getServletContext()
                .getAttribute(QuartzInitializerListener.QUARTZ_FACTORY_KEY))
                .getScheduler();
        scheduler2.scheduleJob(newlinejob, linetrigger);
    } catch (SchedulerException e) {
    }
}
quartz.properties configuration
# Main Quartz configuration
org.quartz.scheduler.skipUpdateCheck = true
org.quartz.scheduler.instanceName = StatisticsQuartzListener
org.quartz.scheduler.jobFactory.class = org.quartz.simpl.SimpleJobFactory
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 5
NewLineJob class code (excerpt)
public class NewLineJob implements Job {
    public static final String KAFKAHOST = "kafkahost";
    public static final String KAFKAPORT = "kafkaport";
    public static final String KAFKATOPICNAME = "topicname";

    KafkaConsumer<String, String> consumer;
    HashMap<String, Long> consumerMap = new HashMap<String, Long>();
    File lastModifiedFile;
    String flag = "off";

    public void execute(JobExecutionContext context) throws JobExecutionException {
        // dataMap is not declared anywhere in this excerpt
        String kafkahost = dataMap.getString(KAFKAHOST);
        String kafkaport = dataMap.getString(KAFKAPORT);
        String topicname = dataMap.getString(KAFKATOPICNAME);
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
        Date d = new Date();
        String timeStamp = sf.format(d);
        int i = 0;
        String writeString;

        if (flag.equals("off")) {
            flag = "on";
            Properties propsConsumer = new Properties();
            propsConsumer.put("bootstrap.servers", kafkahost + ":" + kafkaport);
            propsConsumer.put("group.id", "test");
            propsConsumer.put("enable.auto.commit", "true");
            propsConsumer.put("auto.commit.interval.ms", "1000");
            propsConsumer.put("session.timeout.ms", "30000");
            propsConsumer.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            propsConsumer.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<String, String>(propsConsumer);
            // Note: the snippet is not shown in full
        }
    }
}
Solution: initialize the Kafka consumer only once
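Quartz's default job factories create a new instance of the Job class for every execution (the SimpleJobFactory configured in quartz.properties above does exactly that). Non-static fields such as flag and consumer therefore start from their initial values on each run, so the flag.equals("off") check always passes and the consumer is rebuilt every time. The initialization has to move somewhere that runs only once. Here are two options: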
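To make the effect concrete, here is a minimal, Quartz-free sketch. SimpleJob, FlagJob and runScheduler are hypothetical stand-ins rather than Quartz types; the only behaviour borrowed from Quartz is that a fresh job object is built for each run. It compares a guard stored in an instance field (as in the question) with a guard stored in a static field.

```java
import java.util.function.Supplier;

public class InstancePerRunDemo {
    /** Hypothetical stand-in for a job; not the real org.quartz.Job interface. */
    interface SimpleJob {
        void execute();
    }

    static int instanceInits = 0; // how often the instance-field guard let init through
    static int staticInits = 0;   // how often the static-field guard let init through

    static class FlagJob implements SimpleJob {
        String flag = "off";                 // instance field: reset for every new object
        static boolean initialized = false;  // static field: shared by all objects

        @Override
        public void execute() {
            if (flag.equals("off")) {        // same guard as in the question
                flag = "on";
                instanceInits++;
            }
            if (!initialized) {              // guard backed by static state
                initialized = true;
                staticInits++;
            }
        }
    }

    /** Mimics a scheduler that builds a new job object for each execution. */
    static void runScheduler(Supplier<SimpleJob> factory, int runs) {
        for (int i = 0; i < runs; i++) {
            factory.get().execute();
        }
    }

    public static void main(String[] args) {
        runScheduler(FlagJob::new, 3);
        System.out.println("instance-guarded inits: " + instanceInits); // 3
        System.out.println("static-guarded inits: " + staticInits);     // 1
    }
}
```

With three simulated runs the instance-field guard fires three times and the static guard once, which is why both options below rely on state that outlives a single job object.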
Option 1: use a static initializer block
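Move the Kafka consumer initialization into a static initializer block. A static block runs exactly once, when the class is first initialized, so no matter how many Job instances are created afterwards the consumer is not initialized again: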
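import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import javax.servlet.ServletContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.impl.StdSchedulerFactory;

public class NewLineJob implements Job {
    public static final String KAFKAHOST = "kafkahost";
    public static final String KAFKAPORT = "kafkaport";
    public static final String KAFKATOPICNAME = "topicname";

    // Static fields, shared by every Job instance
    private static KafkaConsumer<String, String> consumer;
    private static HashMap<String, Long> consumerMap = new HashMap<>();

    // Static initializer block, runs only once
    static {
        // Read the parameters from a config file or the ServletContext rather than hard-coding them
        ServletContext servletContext = null;
        try {
            // The ServletContext can be reached through the Quartz scheduler context.
            // Caveat: QuartzInitializerListener only stores it there when web.xml sets the
            // quartz:scheduler-context-servlet-context-key context-param; the key
            // "org.quartz.servletContext" used here assumes that param has this value.
            StdSchedulerFactory factory = new StdSchedulerFactory();
            Scheduler scheduler = factory.getScheduler();
            servletContext = (ServletContext) scheduler.getContext().get("org.quartz.servletContext");
        } catch (SchedulerException e) {
            e.printStackTrace();
        }

        // Fallback values are placeholders (9092 is Kafka's usual default port); adjust as needed
        String kafkahost = servletContext != null ? servletContext.getInitParameter(KAFKAHOST) : "localhost";
        String kafkaport = servletContext != null ? servletContext.getInitParameter(KAFKAPORT) : "9092";
        String topicname = servletContext != null ? servletContext.getInitParameter(KAFKATOPICNAME) : "default-topic";

        Properties propsConsumer = new Properties();
        propsConsumer.put("bootstrap.servers", kafkahost + ":" + kafkaport);
        propsConsumer.put("group.id", "test");
        propsConsumer.put("enable.auto.commit", "true");
        propsConsumer.put("auto.commit.interval.ms", "1000");
        propsConsumer.put("session.timeout.ms", "30000");
        propsConsumer.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        propsConsumer.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<>(propsConsumer);
        consumer.subscribe(Collections.singletonList(topicname));
    }

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // Use the already-initialized consumer directly
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            // Message handling logic
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
    }
}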
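One limitation of a static block is that it runs before execute() is ever called, so it cannot read values from the job's data map the way the code in the question does. A possible alternative, which is a different technique from the static block and is shown only as a sketch, is a synchronized lazy getter that builds the shared object on first use. ExpensiveClient and getClient are hypothetical names; ExpensiveClient merely stands in for something like a KafkaConsumer.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LazyResourceDemo {
    /** Counts constructions so the "only once" claim can be checked. */
    static final AtomicInteger created = new AtomicInteger();

    /** Hypothetical stand-in for an expensive client such as a KafkaConsumer. */
    static final class ExpensiveClient {
        final String servers;

        ExpensiveClient(String servers) {
            this.servers = servers;
            created.incrementAndGet();
        }
    }

    private static ExpensiveClient client; // guarded by the class lock

    /** Builds the client on the first call; later calls return the same object. */
    static synchronized ExpensiveClient getClient(String servers) {
        if (client == null) {
            client = new ExpensiveClient(servers);
        }
        return client;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> getClient("localhost:9092"));
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        System.out.println("clients created: " + created.get()); // 1
    }
}
```

In a job, the first lines of execute() could read the host and port and pass them to such a getter. The arguments of later calls are ignored, so this only fits when the configuration does not change between runs.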
Option 2: initialize in the servlet listener and store the consumer in the context
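Since Quartz is started through a servlet listener, the Kafka consumer can be created in the initialization method of StatisticsQuartzListener and stored in the ServletContext; the job then fetches it from the context when it runs: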
First, modify StatisticsQuartzListener:
import java.util.Collections;
import java.util.Properties;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.ee.servlet.QuartzInitializerListener;
import org.quartz.impl.StdSchedulerFactory;

public class StatisticsQuartzListener implements ServletContextListener {
    @Override
    public void contextInitialized(ServletContextEvent ctx) {
        // 1. Initialize the Kafka consumer and store it in the ServletContext
        ServletContext servletContext = ctx.getServletContext();
        String kafkahost = servletContext.getInitParameter("kafkahost");
        String kafkaport = servletContext.getInitParameter("kafkaport");
        String topicname = servletContext.getInitParameter("topicname");

        Properties propsConsumer = new Properties();
        propsConsumer.put("bootstrap.servers", kafkahost + ":" + kafkaport);
        propsConsumer.put("group.id", "test");
        propsConsumer.put("enable.auto.commit", "true");
        propsConsumer.put("auto.commit.interval.ms", "1000");
        propsConsumer.put("session.timeout.ms", "30000");
        propsConsumer.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        propsConsumer.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(propsConsumer);
        consumer.subscribe(Collections.singletonList(topicname));
        servletContext.setAttribute("kafkaNewLineConsumer", consumer);

        // 2. Original Quartz job and trigger setup
        JobDetail newfilejob = JobBuilder.newJob(NewFileJob.class)
                .withIdentity("fileJob", "group1").build();
        JobDetail newlinejob = JobBuilder.newJob(NewLineJob.class)
                .withIdentity("lineJob", "group2").build();

        Trigger filetrigger = TriggerBuilder.newTrigger()
                .withIdentity("fileTrigger", "group1")
                .startNow()
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInMinutes(6).repeatForever())
                .build();
        Trigger linetrigger = TriggerBuilder.newTrigger()
                .withIdentity("lineTrigger", "group2")
                .startNow()
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInMinutes(2).repeatForever())
                .build();

        try {
            Scheduler scheduler = ((StdSchedulerFactory) servletContext
                    .getAttribute(QuartzInitializerListener.QUARTZ_FACTORY_KEY)).getScheduler();
            scheduler.scheduleJob(newfilejob, filetrigger);
            scheduler.scheduleJob(newlinejob, linetrigger);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void contextDestroyed(ServletContextEvent ctx) {
        // Close the Kafka consumer when the application shuts down
        KafkaConsumer<String, String> consumer = (KafkaConsumer<String, String>)
                ctx.getServletContext().getAttribute("kafkaNewLineConsumer");
        if (consumer != null) {
            consumer.close();
        }
    }
}
Then modify NewLineJob:
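import java.time.Duration;
import javax.servlet.ServletContext;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerException;

public class NewLineJob implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            // Get the ServletContext from the Quartz scheduler context.
            // Caveat: QuartzInitializerListener only stores it there when web.xml sets the
            // quartz:scheduler-context-servlet-context-key context-param; the key
            // "org.quartz.servletContext" used here assumes that param has this value.
            ServletContext servletContext = (ServletContext) context.getScheduler()
                    .getContext().get("org.quartz.servletContext");
            KafkaConsumer<String, String> consumer = (KafkaConsumer<String, String>)
                    servletContext.getAttribute("kafkaNewLineConsumer");

            // Use the consumer for the actual work
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            // Message handling logic...
        } catch (SchedulerException e) {
            // getContext() declares SchedulerException, which execute() cannot throw directly
            throw new JobExecutionException(e);
        }
    }
}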
Additional notes
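- Kafka consumer thread safety: KafkaConsumer is not thread-safe. If the job is allowed to run concurrently (Quartz allows this by default), several job threads using the same consumer at the same time will cause problems. Either add the @DisallowConcurrentExecution annotation to the job to forbid concurrent execution, or create a separate consumer for each thread (which gives up "initialize only once", so it is a trade-off).
- Releasing resources: make sure the Kafka consumer is closed when the web application shuts down, to avoid resource leaks. Option 2 already handles this in the contextDestroyed method.
- Where the configuration comes from: put the Kafka parameters in context-param entries in web.xml, or in a separate configuration file, rather than hard-coding them, so they are easy to change later.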
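As an illustration of the thread-safety note, the following sketch shows one way to keep two runs from touching a shared, non-thread-safe object at the same time, using a plain ReentrantLock. This is a swapped-in technique and not what @DisallowConcurrentExecution does internally; in particular, this sketch skips an overlapping run. runExclusively is a hypothetical helper name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class NonOverlappingRunDemo {
    private static final ReentrantLock LOCK = new ReentrantLock();

    /** Runs the task only if no other thread is currently running one; returns whether it ran. */
    static boolean runExclusively(Runnable task) {
        if (!LOCK.tryLock()) {
            return false; // another run is in progress, so skip this one
        }
        try {
            task.run();
            return true;
        } finally {
            LOCK.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch inside = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // First run: holds the lock until the main thread releases it
        Thread first = new Thread(() -> runExclusively(() -> {
            inside.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        first.start();
        inside.await();

        boolean overlappingRan = runExclusively(() -> { }); // attempted while the first run is active
        release.countDown();
        first.join();
        boolean laterRan = runExclusively(() -> { });       // attempted after the first run finished

        System.out.println("overlapping run executed: " + overlappingRan); // false
        System.out.println("later run executed: " + laterRan);             // true
    }
}
```

A job body that polls the shared consumer could be passed in as the task, so only one thread ever uses the consumer at a time.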
This question comes from Stack Exchange; asked by Lakshmi.




