
How to make part of a Job's code run only once in Java Quartz Scheduler

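I'm using the Quartz scheduler in a Java Servlet environment. It is initialized through web.xml, I have set up two separate Jobs with their own triggers, and the configuration file is quartz.properties in the resources directory. The problem: the Kafka consumer initialization code in NewLineJob runs every time the Job executes, and I want that initialization to run only once. Here are my configuration and code: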

web.xml configuration

<!DOCTYPE web-app PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN" "http://java.sun.com/dtd/web-app_2_3.dtd" > 
<web-app> 
    <display-name>Archetype Created Web Application</display-name> 
    <context-param> 
        <param-name>quartz:config-file</param-name> 
        <param-value>quartz.properties</param-value> 
    </context-param> 
    <context-param> 
        <param-name>quartz:shutdown-on-unload</param-name> 
        <param-value>true</param-value> 
    </context-param> 
    <context-param> 
        <param-name>quartz:wait-on-shutdown</param-name> 
        <param-value>true</param-value> 
    </context-param> 
    <context-param> 
        <param-name>quartz:start-on-load</param-name> 
        <param-value>true</param-value> 
    </context-param> 
    <listener> 
        <listener-class>org.quartz.ee.servlet.QuartzInitializerListener</listener-class> 
    </listener> 
    <listener> 
        <listener-class>com.hpe.statistics.StatisticsQuartzListener</listener-class> 
    </listener> 
    <servlet> 
        <servlet-name>StatisticsIvr</servlet-name> 
        <display-name>StatisticsIvr</display-name> 
        <description></description> 
        <servlet-class>StatisticsIvr</servlet-class> 
    </servlet> 
    <servlet-mapping> 
        <servlet-name>StatisticsIvr</servlet-name> 
        <url-pattern>/StatisticsIvr</url-pattern> 
    </servlet-mapping> 
</web-app>

StatisticsQuartzListener class code

public void contextInitialized(ServletContextEvent ctx) { 
    JobDetail newfilejob = JobBuilder.newJob(NewFileJob.class) 
            .withIdentity("fileJob", "group1").build(); 
    JobDetail newlinejob = JobBuilder.newJob(NewLineJob.class) 
            .withIdentity("lineJob", "group2").build(); 
    Trigger filetrigger = TriggerBuilder 
            .newTrigger() 
            .withIdentity("fileTrigger", "group1") 
            .startNow() 
            .withSchedule( SimpleScheduleBuilder.simpleSchedule() 
                    .withIntervalInMinutes(6).repeatForever()) 
            .build(); 
    Trigger linetrigger = TriggerBuilder 
            .newTrigger() 
            .withIdentity("lineTrigger", "group2") 
            .startNow() 
            .withSchedule( SimpleScheduleBuilder.simpleSchedule() 
                    .withIntervalInMinutes(2).repeatForever()) 
            .build(); 
    try { 
        scheduler1 = ((StdSchedulerFactory) ctx.getServletContext() 
                .getAttribute( QuartzInitializerListener.QUARTZ_FACTORY_KEY)) 
                .getScheduler(); 
        scheduler1.scheduleJob(newfilejob, filetrigger); 
    } catch (SchedulerException e) { } 
    try { 
        scheduler2 = ((StdSchedulerFactory) ctx.getServletContext() 
                .getAttribute( QuartzInitializerListener.QUARTZ_FACTORY_KEY)) 
                .getScheduler(); 
        scheduler2.scheduleJob(newlinejob, linetrigger); 
    } catch (SchedulerException e) { } 
}

quartz.properties configuration

# Main Quartz configuration 
org.quartz.scheduler.skipUpdateCheck = true 
org.quartz.scheduler.instanceName = StatisticsQuartzListener 
org.quartz.scheduler.jobFactory.class = org.quartz.simpl.SimpleJobFactory 
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool 
org.quartz.threadPool.threadCount = 5

NewLineJob class code (excerpt)

public class NewLineJob implements Job{ 
    public static final String KAFKAHOST="kafkahost"; 
    public static final String KAFKAPORT="kafkaport"; 
    public static final String KAFKATOPICNAME="topicname"; 
    KafkaConsumer<String, String> consumer; 
    HashMap<String,Long> consumerMap=new HashMap<String,Long>(); 
    File lastModifiedFile; 
    String flag="off"; 

    public void execute(JobExecutionContext context) throws JobExecutionException { 
        JobDataMap dataMap = context.getMergedJobDataMap(); 
        String kafkahost=dataMap.getString(KAFKAHOST); 
        String kafkaport=dataMap.getString(KAFKAPORT); 
        String topicname=dataMap.getString(KAFKATOPICNAME); 
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd-HH-mm"); 
        Date d = new Date(); 
        String timeStamp = sf.format(d); 
        int i=0; 
        String writeString; 

        if(flag.equals("off")) { 
            flag="on"; 
            Properties propsConsumer = new Properties(); 
            propsConsumer.put("bootstrap.servers", kafkahost+":"+kafkaport); 
            propsConsumer.put("group.id", "test"); 
            propsConsumer.put("enable.auto.commit", "true"); 
            propsConsumer.put("auto.commit.interval.ms", "1000"); 
            propsConsumer.put("session.timeout.ms", "30000"); 
            propsConsumer.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
            propsConsumer.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
            consumer = new KafkaConsumer<String, String>(propsConsumer);
            // Note: the rest of this snippet was not shown
        }
    }
}

Solution: initialize the Kafka consumer only once

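Quartz does not reuse Job objects: each time a trigger fires, the JobFactory creates a new instance of the Job class (the SimpleJobFactory configured in quartz.properties simply instantiates it), and that instance is not used again for later runs. Instance fields such as flag and consumer therefore start from their initial values on every execution, so flag is always "off" and a new KafkaConsumer is built each time. The initialization has to live somewhere that outlives a single Job instance; there are two reliable ways to do that.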
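To see why flag never stays "on", here is a JDK-only sketch (no Quartz dependency) that mimics what a job factory such as SimpleJobFactory does: it creates a fresh instance of the job class via reflection for every firing. The names PerRunFactoryDemo, DemoJob, InstanceFlagJob and StaticFlagJob are hypothetical stand-ins, not Quartz types. The instance-field version re-initializes on every run, while the static-field version initializes only on the first one.

```java
import java.util.ArrayList;
import java.util.List;

public class PerRunFactoryDemo {

    // Hypothetical stand-in for org.quartz.Job
    public interface DemoJob {
        boolean execute(); // true if this run performed the "one-time" initialization
    }

    // Same pattern as the question's NewLineJob: the flag is an instance field
    public static class InstanceFlagJob implements DemoJob {
        private String flag = "off";
        public boolean execute() {
            if (flag.equals("off")) { flag = "on"; return true; }
            return false;
        }
    }

    // The flag is static, so it outlives each individual instance
    public static class StaticFlagJob implements DemoJob {
        private static String flag = "off";
        public boolean execute() {
            if (flag.equals("off")) { flag = "on"; return true; }
            return false;
        }
    }

    // Mimics a job factory: one fresh instance per firing
    static DemoJob newJob(Class<? extends DemoJob> jobClass) {
        try {
            return jobClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    // Fires the job `runs` times and records whether each run (re)initialized
    public static List<Boolean> fire(Class<? extends DemoJob> jobClass, int runs) {
        List<Boolean> initialized = new ArrayList<>();
        for (int i = 0; i < runs; i++) {
            initialized.add(newJob(jobClass).execute());
        }
        return initialized;
    }

    public static void main(String[] args) {
        System.out.println(fire(InstanceFlagJob.class, 3)); // [true, true, true]
        System.out.println(fire(StaticFlagJob.class, 3));   // [true, false, false]
    }
}
```

A plain static flag like this is not thread-safe on its own; it only behaves here because the runs are sequential.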

Option 1: Use a static initializer block

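Move the Kafka consumer initialization into a static initializer block. A static block runs exactly once, when the JVM first initializes the class (typically the first time Quartz instantiates NewLineJob), so no matter how many Job instances are created afterwards, the consumer is not initialized again. Because a static block has no access to the JobDataMap, the settings are read from ServletContext init parameters instead. One caveat: if the block throws (for example, on an invalid bootstrap.servers value), class initialization fails with ExceptionInInitializerError and every later run fails with NoClassDefFoundError until the application is redeployed.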

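import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import javax.servlet.ServletContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.impl.StdSchedulerFactory;

// The consumer is shared and KafkaConsumer is not thread-safe: never run two executions at once
@DisallowConcurrentExecution
public class NewLineJob implements Job{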
    public static final String KAFKAHOST="kafkahost";
    public static final String KAFKAPORT="kafkaport";
    public static final String KAFKATOPICNAME="topicname";
    
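    // Static fields: shared by every NewLineJob instance
    private static KafkaConsumer<String, String> consumer;
    private static HashMap<String,Long> consumerMap = new HashMap<>();

    // Static initializer block: runs only once, when the class is initialized
    static {
        // Read the parameters from a config file or the ServletContext; avoid hard-coding them
        ServletContext servletContext = null;
        try {
            // getScheduler() should return the already-running scheduler, as long as this factory
            // loads the same quartz.properties (same instanceName) from the classpath.
            // The ServletContext is only stored under "org.quartz.servletContext" if web.xml
            // sets quartz:scheduler-context-servlet-context-key to that value.
            StdSchedulerFactory factory = new StdSchedulerFactory();
            Scheduler scheduler = factory.getScheduler();
            servletContext = (ServletContext) scheduler.getContext().get("org.quartz.servletContext");
        } catch (SchedulerException e) {
            e.printStackTrace();
        }

        // The fallback values are placeholders: replace them with real defaults
        String kafkahost = servletContext != null ? servletContext.getInitParameter(KAFKAHOST) : "default-host";
        String kafkaport = servletContext != null ? servletContext.getInitParameter(KAFKAPORT) : "default-port";
        String topicname = servletContext != null ? servletContext.getInitParameter(KAFKATOPICNAME) : "default-topic";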
        
        Properties propsConsumer = new Properties();
        propsConsumer.put("bootstrap.servers", kafkahost+":"+kafkaport);
        propsConsumer.put("group.id", "test");
        propsConsumer.put("enable.auto.commit", "true");
        propsConsumer.put("auto.commit.interval.ms", "1000");
        propsConsumer.put("session.timeout.ms", "30000");
        propsConsumer.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        propsConsumer.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        consumer = new KafkaConsumer<>(propsConsumer);
        consumer.subscribe(Collections.singletonList(topicname));
    }

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // Use the consumer that was already initialized in the static block
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            // Message-handling logic
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
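The static block above cannot read the JobDataMap, which is where the original NewLineJob gets its Kafka settings. A common alternative, swapped in here and not part of the original answer, is lazy initialization on the first execute() call, guarded by double-checked locking on a volatile static field. The sketch uses only the JDK: FakeConsumer is a hypothetical stand-in for KafkaConsumer, the host and port strings are made-up values, and a 5-thread pool imitates threadCount = 5. In a real NewLineJob, the lambda passed to getConsumer would build the Properties and the KafkaConsumer from the JobDataMap values.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyOnceDemo {

    // Hypothetical stand-in for KafkaConsumer<String, String>
    public static class FakeConsumer {
        public final String bootstrapServers;
        FakeConsumer(String bootstrapServers) { this.bootstrapServers = bootstrapServers; }
    }

    // Counts how many times the (expensive) initialization actually ran
    public static final AtomicInteger CREATED = new AtomicInteger();

    // volatile is required for double-checked locking to be safe
    private static volatile FakeConsumer consumer;

    // Returns the shared consumer, creating it on the first call only
    private static FakeConsumer getConsumer(Supplier<FakeConsumer> factory) {
        FakeConsumer c = consumer;
        if (c == null) {                      // first check, without the lock
            synchronized (LazyOnceDemo.class) {
                c = consumer;
                if (c == null) {              // second check, under the lock
                    c = factory.get();
                    consumer = c;
                }
            }
        }
        return c;
    }

    // What execute() would do: read the settings (here: arguments) and get the shared consumer
    public static FakeConsumer execute(String host, String port) {
        return getConsumer(() -> {
            CREATED.incrementAndGet();
            return new FakeConsumer(host + ":" + port);
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 20; i++) {
            pool.submit(() -> execute("kafka-host", "9092"));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("consumers created: " + CREATED.get()); // consumers created: 1
    }
}
```

Once the consumer exists, settings passed on later runs are ignored, which matches the "initialize once" requirement but means a configuration change needs a restart.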

Option 2: Initialize in the Servlet listener and store it in the context

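Since Quartz is started by a Servlet listener, we can create the Kafka consumer in the contextInitialized method of StatisticsQuartzListener, store it in the ServletContext, and have the Job fetch it from there when it runs. Two prerequisites: kafkahost, kafkaport and topicname must be defined as <context-param> entries in web.xml, and, for the Job to reach the ServletContext at all, web.xml must also set quartz:scheduler-context-servlet-context-key to org.quartz.servletContext, which tells QuartzInitializerListener to store the ServletContext in the scheduler context under that key.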

First, modify StatisticsQuartzListener

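import java.util.Collections;
import java.util.Properties;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.ee.servlet.QuartzInitializerListener;
import org.quartz.impl.StdSchedulerFactory;

public class StatisticsQuartzListener implements ServletContextListener {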
    @Override
    public void contextInitialized(ServletContextEvent ctx) {
        // 1. Create the Kafka consumer and store it in the ServletContext
        ServletContext servletContext = ctx.getServletContext();
        String kafkahost = servletContext.getInitParameter("kafkahost");
        String kafkaport = servletContext.getInitParameter("kafkaport");
        String topicname = servletContext.getInitParameter("topicname");
        
        Properties propsConsumer = new Properties();
        propsConsumer.put("bootstrap.servers", kafkahost+":"+kafkaport);
        propsConsumer.put("group.id", "test");
        propsConsumer.put("enable.auto.commit", "true");
        propsConsumer.put("auto.commit.interval.ms", "1000");
        propsConsumer.put("session.timeout.ms", "30000");
        propsConsumer.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        propsConsumer.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(propsConsumer);
        consumer.subscribe(Collections.singletonList(topicname));
        servletContext.setAttribute("kafkaNewLineConsumer", consumer);

        // 2. The original Quartz Job and Trigger setup
        JobDetail newfilejob = JobBuilder.newJob(NewFileJob.class)
                .withIdentity("fileJob", "group1").build();
        JobDetail newlinejob = JobBuilder.newJob(NewLineJob.class)
                .withIdentity("lineJob", "group2").build();
        
        Trigger filetrigger = TriggerBuilder.newTrigger()
                .withIdentity("fileTrigger", "group1")
                .startNow()
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInMinutes(6).repeatForever())
                .build();
        Trigger linetrigger = TriggerBuilder.newTrigger()
                .withIdentity("lineTrigger", "group2")
                .startNow()
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInMinutes(2).repeatForever())
                .build();

        try {
            Scheduler scheduler = ((StdSchedulerFactory) servletContext.getAttribute(QuartzInitializerListener.QUARTZ_FACTORY_KEY)).getScheduler();
            scheduler.scheduleJob(newfilejob, filetrigger);
            scheduler.scheduleJob(newlinejob, linetrigger);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

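    @Override
    public void contextDestroyed(ServletContextEvent ctx) {
        ServletContext servletContext = ctx.getServletContext();
        // Listeners are destroyed in reverse declaration order, so this runs before
        // QuartzInitializerListener shuts the scheduler down. Stop it here first (waiting
        // for running Jobs) so the consumer is not closed while a Job is still polling.
        try {
            ((StdSchedulerFactory) servletContext.getAttribute(QuartzInitializerListener.QUARTZ_FACTORY_KEY))
                    .getScheduler().shutdown(true);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
        // Close the Kafka consumer when the application shuts down
        @SuppressWarnings("unchecked")
        KafkaConsumer<String, String> consumer = (KafkaConsumer<String, String>) servletContext.getAttribute("kafkaNewLineConsumer");
        if (consumer != null) {
            consumer.close();
        }
    }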
}

Then modify NewLineJob

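@DisallowConcurrentExecution // KafkaConsumer is not thread-safe: never run two executions at once
public class NewLineJob implements Job{
    @Override
    @SuppressWarnings("unchecked")
    public void execute(JobExecutionContext context) throws JobExecutionException {
        ServletContext servletContext;
        try {
            // Get the ServletContext from the Quartz scheduler context
            // (requires quartz:scheduler-context-servlet-context-key in web.xml)
            servletContext = (ServletContext) context.getScheduler().getContext().get("org.quartz.servletContext");
        } catch (SchedulerException e) {
            throw new JobExecutionException(e);
        }
        if (servletContext == null) {
            throw new JobExecutionException("ServletContext not found in the scheduler context");
        }
        KafkaConsumer<String, String> consumer = (KafkaConsumer<String, String>) servletContext.getAttribute("kafkaNewLineConsumer");

        // Use the shared consumer for the business logic
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // Message-handling logic...
    }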
}
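The lifecycle in Option 2 (create once at startup, share through a context attribute, stop the scheduler before closing at shutdown) can be sketched with JDK classes alone. In this hypothetical model, a ConcurrentHashMap plays the role of the ServletContext attributes, a ScheduledExecutorService plays the Quartz scheduler, and FakeConsumer plays KafkaConsumer; apart from the attribute key, none of these names come from the original code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedResourceLifecycleDemo {

    // Hypothetical stand-in for KafkaConsumer: counts polls and refuses use after close()
    public static class FakeConsumer implements AutoCloseable {
        public final AtomicInteger polls = new AtomicInteger();
        public volatile boolean closed;

        void poll() {
            if (closed) throw new IllegalStateException("consumer already closed");
            polls.incrementAndGet();
        }

        @Override
        public void close() { closed = true; }
    }

    // Same attribute name as in Option 2
    static final String KEY = "kafkaNewLineConsumer";

    public static FakeConsumer runLifecycle() {
        // Stands in for the ServletContext attribute table
        Map<String, Object> attributes = new ConcurrentHashMap<>();

        // "contextInitialized": create the consumer exactly once and publish it
        attributes.put(KEY, new FakeConsumer());
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

        // The job looks the shared consumer up on every run instead of creating one;
        // fixed-delay scheduling never runs two executions of this task at once
        Runnable job = () -> ((FakeConsumer) attributes.get(KEY)).poll();
        scheduler.scheduleWithFixedDelay(job, 0, 10, TimeUnit.MILLISECONDS);

        try {
            Thread.sleep(100); // let the job fire a few times
            // "contextDestroyed": stop the scheduler and wait for a running job to finish...
            scheduler.shutdown();
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            scheduler.shutdownNow();
        }
        // ...and only then close the shared consumer
        FakeConsumer consumer = (FakeConsumer) attributes.remove(KEY);
        consumer.close();
        return consumer;
    }

    public static void main(String[] args) {
        FakeConsumer c = runLifecycle();
        System.out.println("polls: " + c.polls.get() + ", closed: " + c.closed);
    }
}
```

The exact poll count depends on timing, so the sketch prints it rather than predicting it; the point is that every run reuses one consumer and close() happens only after the scheduler has stopped.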

Additional notes

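  • Thread safety of the Kafka consumer: KafkaConsumer is not thread-safe, and it throws ConcurrentModificationException when two threads use it at the same time. If your Job may run concurrently (Quartz allows this by default), several Job threads can end up operating on the same consumer. In that case, either annotate the Job class with @DisallowConcurrentExecution, which stops Quartz from running two executions of the same JobDetail at once, or give each thread its own consumer (which gives up "initialize only once", so weigh the trade-off).
  • Releasing resources: close the Kafka consumer when the web application shuts down to avoid resource leaks; Option 2 does this in its contextDestroyed method.
  • Where configuration comes from: put the Kafka settings in web.xml <context-param> entries or in a separate configuration file rather than hard-coding them, so they are easy to change later.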
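For the per-thread alternative in the first note, ThreadLocal is one way to give each worker thread its own consumer; this is a swapped-in technique, not something the original answer shows. In the JDK-only sketch below, FakeConsumer is a hypothetical stand-in for KafkaConsumer. With a 5-thread pool, as with threadCount = 5 in quartz.properties, five consumers get created rather than one; with real Kafka, each would join the consumer group as a separate member and each would need to be closed.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PerThreadConsumerDemo {

    // Hypothetical stand-in for KafkaConsumer
    public static class FakeConsumer { }

    // Each worker thread lazily gets its own consumer the first time it asks for one
    private static final ThreadLocal<FakeConsumer> CONSUMER =
            ThreadLocal.withInitial(FakeConsumer::new);

    // Runs `runs` job executions on `threads` workers and counts the distinct consumers used
    public static int distinctConsumers(int threads, int runs) {
        Set<FakeConsumer> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < runs; i++) {
            pool.execute(() -> seen.add(CONSUMER.get()));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // A fixed pool starts a new thread for each of the first `threads` tasks,
        // so every worker runs at least once and builds its own consumer
        System.out.println("distinct consumers: " + distinctConsumers(5, 50)); // distinct consumers: 5
    }
}
```

This trades the single shared consumer for thread confinement: no locking is needed, but "initialize once" becomes "initialize once per worker thread".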

This question comes from Stack Exchange; it was asked by Lakshmi.
