ElasticJob 提供自定义的 Spring Boot Starter,可以与 Spring Boot 配合使用。 基于 ElasticJob Spring Boot Starter 使用 ElasticJob ,用户无需手动创建 CoordinatorRegistryCenter、JobBootstrap 等实例, 只需实现核心作业逻辑并辅以少量配置,即可利用轻量、无中心化的 ElasticJob 解决分布式调度问题。
以下内容仅通过 Spring Boot 3 作为演示。 相关内容在 Spring Boot 2 上仍可能有效,但由于 Spring Boot 2 已经结束维护,不对 Spring Boot 2 做任何可用性假设。
作业逻辑实现与 ElasticJob 的其他使用方式并没有较大的区别,只需将当前作业注册为 Spring 容器中的 bean。
线程安全问题
Bean 默认是单例的,如果该作业实现会在同一个进程内被创建出多个 JobBootstrap
的实例,
可以考虑设置 Scope 为 prototype
。
@Component
public class SpringBootDataflowJob implements DataflowJob<Foo> {
@Override
public List<Foo> fetchData(final ShardingContext shardingContext) {
// 获取数据
}
@Override
public void processData(final ShardingContext shardingContext, final List<Foo> data) {
// 处理数据
}
}
在配置文件中指定 ElasticJob 所使用的 Zookeeper。配置前缀为 elasticjob.reg-center
。
elasticjob.jobs
是一个 Map,key 为作业名称,value 为作业类型与配置。
Starter 会根据该配置自动创建 OneOffJobBootstrap
或 ScheduleJobBootstrap
的实例并注册到 Spring 容器中。
配置参考:
elasticjob:
regCenter:
serverLists: localhost:6181
namespace: elasticjob-springboot
jobs:
dataflowJob:
elasticJobClass: org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob
cron: 0/5 * * * * ?
shardingTotalCount: 3
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
scriptJob:
elasticJobType: SCRIPT
cron: 0/10 * * * * ?
shardingTotalCount: 3
props:
script.command.line: "echo SCRIPT Job: "
定时调度作业在 Spring Boot 应用程序启动完成后会自动启动,无需其他额外操作。
一次性调度的作业的执行权在开发者手中,开发者可以在需要调用作业的位置注入 OneOffJobBootstrap
,
通过 execute()
方法执行作业。
用户不应该使用 jakarta.annotation.Resource
等部分违反 Spring Boot 最佳实践的注解来注入定义的一次性任务的 Spring Bean。
OneOffJobBootstrap
bean 的名称通过属性 jobBootstrapBeanName 配置,注入时需要指定依赖的 bean 名称。
具体配置请参考配置文档。
elasticjob:
jobs:
myOneOffJob:
elasticJobType: SCRIPT
jobBootstrapBeanName: myOneOffJobBean
shardingTotalCount: 9
props:
script.command.line: "echo Manual SCRIPT Job: "
import org.apache.shardingsphere.elasticjob.bootstrap.type.OneOffJobBootstrap;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Objects;
@RestController
public class OneOffJobController {
// 通过 "@Autowired" 注入
@Autowired
@Qualifier("myOneOffJobBean")
private ObjectProvider<OneOffJobBootstrap> myOneOffJobProvider;
@GetMapping("/execute2")
public String executeOneOffJob2() {
OneOffJobBootstrap myOneOffJob = myOneOffJobProvider.getIfAvailable();
Objects.requireNonNull(myOneOffJob);
myOneOffJob.execute();
return "{\"msg\":\"OK\"}";
}
}
使用 ElasticJob 过程中当作业发生异常后,可采用以下错误处理策略。
错误处理策略名称 | 说明 | 是否内置 | 是否默认 | 是否需要额外配置 |
---|---|---|---|---|
记录日志策略 | 记录作业异常日志,但不中断作业执行 | 是 | 是 | |
抛出异常策略 | 抛出系统异常并中断作业执行 | 是 | ||
忽略异常策略 | 忽略系统异常且不中断作业执行 | 是 | ||
邮件通知策略 | 发送邮件消息通知,但不中断作业执行 | 是 | ||
企业微信通知策略 | 发送企业微信消息通知,但不中断作业执行 | 是 | ||
钉钉通知策略 | 发送钉钉消息通知,但不中断作业执行 | 是 |
elasticjob:
regCenter:
...
jobs:
...
jobErrorHandlerType: LOG
elasticjob:
regCenter:
...
jobs:
...
jobErrorHandlerType: THROW
elasticjob:
regCenter:
...
jobs:
...
jobErrorHandlerType: IGNORE
请参考 这里 了解更多。
Maven POM:
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-error-handler-email</artifactId>
<version>${latest.release.version}</version>
</dependency>
elasticjob:
regCenter:
...
jobs:
...
jobErrorHandlerType: EMAIL
props:
email:
host: host
port: 465
username: username
password: password
useSsl: true
subject: ElasticJob error message
from: from@xxx.xx
to: to1@xxx.xx,to2@xxx.xx
cc: cc@xxx.xx
bcc: bcc@xxx.xx
debug: false
请参考 这里 了解更多。
Maven POM:
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-error-handler-wechat</artifactId>
<version>${latest.release.version}</version>
</dependency>
elasticjob:
regCenter:
...
jobs:
...
jobErrorHandlerType: WECHAT
props:
wechat:
webhook: you_webhook
connectTimeout: 3000
readTimeout: 5000
请参考 这里 了解更多。
Maven POM:
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-error-handler-dingtalk</artifactId>
<version>${latest.release.version}</version>
</dependency>
elasticjob:
regCenter:
...
jobs:
...
jobErrorHandlerType: DINGTALK
props:
dingtalk:
webhook: you_webhook
keyword: you_keyword
secret: you_secret
connectTimeout: 3000
readTimeout: 5000