复古 护眼 海天 深邃 暗黑 默认

ElasticJob 提供自定义的 Spring Boot Starter,可以与 Spring Boot 配合使用。 基于 ElasticJob Spring Boot Starter 使用 ElasticJob ,用户无需手动创建 CoordinatorRegistryCenter、JobBootstrap 等实例, 只需实现核心作业逻辑并辅以少量配置,即可利用轻量、无中心化的 ElasticJob 解决分布式调度问题。

以下内容仅通过 Spring Boot 3 作为演示。 相关内容在 Spring Boot 2 上仍可能有效,但由于 Spring Boot 2 已经结束维护,不对 Spring Boot 2 做任何可用性假设。

作业配置

实现作业逻辑

作业逻辑实现与 ElasticJob 的其他使用方式并没有较大的区别,只需将当前作业注册为 Spring 容器中的 bean。

线程安全问题

Bean 默认是单例的,如果该作业实现会在同一个进程内被创建出多个 JobBootstrap 的实例, 可以考虑设置 Scope 为 prototype

@Component
public class SpringBootDataflowJob implements DataflowJob<Foo> {
    
    @Override
    public List<Foo> fetchData(final ShardingContext shardingContext) {
        // 获取数据
    }
    
    @Override
    public void processData(final ShardingContext shardingContext, final List<Foo> data) {
        // 处理数据
    }
}

配置协调服务与作业

在配置文件中指定 ElasticJob 所使用的 Zookeeper。配置前缀为 elasticjob.reg-center

elasticjob.jobs 是一个 Map,key 为作业名称,value 为作业类型与配置。 Starter 会根据该配置自动创建 OneOffJobBootstrapScheduleJobBootstrap 的实例并注册到 Spring 容器中。

配置参考:

elasticjob:
  regCenter:
    serverLists: localhost:6181
    namespace: elasticjob-springboot
  jobs:
    dataflowJob:
      elasticJobClass: org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob
      cron: 0/5 * * * * ?
      shardingTotalCount: 3
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
    scriptJob:
      elasticJobType: SCRIPT
      cron: 0/10 * * * * ?
      shardingTotalCount: 3
      props:
        script.command.line: "echo SCRIPT Job: "

作业启动

定时调度

定时调度作业在 Spring Boot 应用程序启动完成后会自动启动,无需其他额外操作。

一次性调度

一次性调度的作业的执行权在开发者手中,开发者可以在需要调用作业的位置注入 OneOffJobBootstrap, 通过 execute() 方法执行作业。

用户不应该使用 jakarta.annotation.Resource 等部分违反 Spring Boot 最佳实践的注解来注入定义的一次性任务的 Spring Bean。

OneOffJobBootstrap bean 的名称通过属性 jobBootstrapBeanName 配置,注入时需要指定依赖的 bean 名称。 具体配置请参考配置文档

elasticjob:
  jobs:
    myOneOffJob:
      elasticJobType: SCRIPT
      jobBootstrapBeanName: myOneOffJobBean
      shardingTotalCount: 9
      props:
        script.command.line: "echo Manual SCRIPT Job: "
import org.apache.shardingsphere.elasticjob.bootstrap.type.OneOffJobBootstrap;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Objects;

@RestController
public class OneOffJobController {
    // 通过 "@Autowired" 注入
    @Autowired
    @Qualifier("myOneOffJobBean")
    private ObjectProvider<OneOffJobBootstrap> myOneOffJobProvider;

    @GetMapping("/execute2")
    public String executeOneOffJob2() {
        OneOffJobBootstrap myOneOffJob = myOneOffJobProvider.getIfAvailable();
        Objects.requireNonNull(myOneOffJob);
        myOneOffJob.execute();
        return "{\"msg\":\"OK\"}";
    }
}

配置错误处理策略

使用 ElasticJob 过程中当作业发生异常后,可采用以下错误处理策略。

错误处理策略名称 说明 是否内置 是否默认 是否需要额外配置
记录日志策略 记录作业异常日志,但不中断作业执行
抛出异常策略 抛出系统异常并中断作业执行
忽略异常策略 忽略系统异常且不中断作业执行
邮件通知策略 发送邮件消息通知,但不中断作业执行
企业微信通知策略 发送企业微信消息通知,但不中断作业执行
钉钉通知策略 发送钉钉消息通知,但不中断作业执行

记录日志策略

elasticjob:
  regCenter:
    ...
  jobs:
    ...
    jobErrorHandlerType: LOG 

抛出异常策略

elasticjob:
  regCenter:
    ...
  jobs:
    ...
    jobErrorHandlerType: THROW 

忽略异常策略

elasticjob:
  regCenter:
    ...
  jobs:
    ...
    jobErrorHandlerType: IGNORE 

邮件通知策略

请参考 这里 了解更多。

Maven POM:

<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-error-handler-email</artifactId>
    <version>${latest.release.version}</version>
</dependency>
elasticjob:
  regCenter:
    ...
  jobs:
    ...
    jobErrorHandlerType: EMAIL 
    props:
      email:
        host: host
        port: 465
        username: username
        password: password
        useSsl: true
        subject: ElasticJob error message
        from: from@xxx.xx
        to: to1@xxx.xx,to2@xxx.xx
        cc: cc@xxx.xx
        bcc: bcc@xxx.xx
        debug: false

企业微信通知策略

请参考 这里 了解更多。

Maven POM:

<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-error-handler-wechat</artifactId>
    <version>${latest.release.version}</version>
</dependency>
elasticjob:
  regCenter:
    ...
  jobs:
    ...
    jobErrorHandlerType: WECHAT 
    props:
      wechat:
        webhook: you_webhook
        connectTimeout: 3000
        readTimeout: 5000

钉钉通知策略

请参考 这里 了解更多。

Maven POM:

<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-error-handler-dingtalk</artifactId>
    <version>${latest.release.version}</version>
</dependency>
elasticjob:
  regCenter:
    ...
  jobs:
    ...
    jobErrorHandlerType: DINGTALK 
    props:
      dingtalk:
        webhook: you_webhook
        keyword: you_keyword
        secret: you_secret
        connectTimeout: 3000
        readTimeout: 5000