施用 Spring Batch 构建企业级批处理应用

使用 Spring Batch 构建企业级批处理应用引言总述本系列文章旨在通过示例搭建以及特性介绍,详细讲述如何利

使用 Spring Batch 构建企业级批处理应用
引言

总述

本系列文章旨在通过示例搭建以及特性介绍,详细讲述如何利用 Spring Batch 开发企业批处理应用。本系列文章共分为三部分,第一部分初步介绍了批处理以及 Spring Batch 的相关概念,同时搭建了一个简单的基于 Spring Batch 的批处理应用。第二部分介绍了 Step Flow 以及并发支持。第三部分则主要介绍了 Spring Batch 对任务监控的支持。下面让我们进入第一部分内容。

什么是批处理

在现代企业应用当中,面对复杂的业务以及海量的数据,除了通过庞杂的人机交互界面进行各种处理外,还有一类工作,不需要人工干预,只需要定期读入大批量数据,然后完成相应业务处理并进行归档。这类工作即为“批处理”。

从上面的描述可以看出,批处理应用有如下几个特点:

    * 数据量大,少则百万,多则上亿的数量级。
    * 不需要人工干预,由系统根据配置自动完成。
    * 与时间相关,如每天执行一次或每月执行一次。

同时,批处理应用又明显分为三个环节:

    * 读数据,数据可能来自文件、数据库或消息队列等
    * 数据处理,如电信支撑系统的计费处理
    * 写数据,将输出结果写入文件、数据库或消息队列等

因此,从系统架构上,应重点考虑批处理应用的事务粒度、日志监控、执行、资源管理(尤其存在并发的情况下)。从系统设计上,应重点考虑数据读写与业务处理的解耦,提高复用性以及可测试性。

什么是 Spring Batch

Spring Batch 作为 Spring 的子项目,是一款基于 Spring 的企业批处理框架。通过它可以构建出健壮的企业批处理应用。Spring Batch 不仅提供了统一的读写接口、丰富的任务处理方式、灵活的事务管理及并发处理,同时还支持日志、监控、任务重启与跳过等特性,大大简化了批处理应用开发,将开发人员从复杂的任务配置管理过程中解放出来,使他们可以更多地去关注核心的业务处理过程。

另外我们还需要知道,Spring Batch 是一款批处理应用框架,不是调度框架。它只关注批处理任务相关的问题,如事务、并发、监控、执行等,并不提供相应的调度功能。因此,如果我们希望批处理任务定期执行,可结合 Quartz 等成熟的调度框架实现。

下面将通过一个示例详细介绍如何使用 Spring Batch 搭建批处理应用。这个示例比较简单,对系统中所有用户发送一封缴费提醒通知。此处,我们简单地将缴费提醒输出到控制台。当然,随着介绍的深入,我将逐渐丰富该功能,使其最终完整展示 Spring Batch 的各种特性。

回页首

环境搭建

首先,从 Spring 官方网站下载 Spring Batch 发布包(见 参考资源)。本文基于 Spring Batch 2.1.6(当前最新版本为 2.1.8)以及 Spring 2.5.6 版本构建。我们可以看到 Spring Batch 共包含 spring-batch-core 和 spring-batch-infrastructure 两个包。spring-batch-core 主要包含批处理领域相关类,而 spring-batch-infrastructure 提供了一个基础访问处理框架。

接下来,让我们新建一个 Eclipse 工程,并将 Spring Batch 以及 Spring 相关包添加到依赖环境,如 图 1 所示

图 1. 依赖环境


环境搭建完成后,让我们看一下如何一步步构建一个批处理应用。

回页首

构建应用

如“引言”中所述 Spring Batch 按照关注点的不同,将整个批处理过程分为三部分:读、处理、写,从而将批处理应用进行合理解耦。同时,Spring Batch 还针对读、写操作提供了多种实现,如消息、文件、数据库。对于数据库,还提供了 Hibernate、iBatis、JPA 等常见 ORM 框架的读、写接口支持。

对象定义

首先我们需要编写用户以及消息类,比较简单,如清单 1 和 清单 2 所示:

清单 1. User 类
package org.springframework.batch.sample;

public class User {
private String name;
private Integer age;
public String getName()
Unknown macro: {return name;}

public void setName(String name)
Unknown macro: {this.name = name;}

public Integer getAge()
Unknown macro: {return age;}

public void setAge(Integer age)
Unknown macro: {this.age = age;}

} |
清单 2. Message 类
package org.springframework.batch.sample;

public class Message {
private String content;
public String getContent()
Unknown macro: {return content;}

public void setContent(String content)
Unknown macro: {this.content = content;}

} |
读写及处理接口

首先,所有 Spring Batch 的读操作均需要实现 ItemReader 接口,而且 Spring Batch 为我们提供了多种默认实现,尤其是基于 ORM 框架的读接口,同时支持基于游标和分页两类操作。因此,多数情况下我们并不需要手动编写 ItemReader 类,而是直接使用相应实现类即可。

在该示例中,我们使用 org.springframework.batch.item.file.FlatFileItemReader 类从文件中进行信息读入,用户信息格式定义如 清单 3 所示。

清单 3. 用户信息
User1,20
User2,21
User3,22
User4,23
User5,24
User6,25
User7,26
User8,27
User9,28
User10,29

该类封装了文件读操作,仅仅需要我们手动设置 LineMapper 与访问文件路径即可。Spring Batch 通过 LineMapper 可以将文件中的一行映射为一个对象。我们不难发现,Spring Batch 将文件操作封装为类似 Spring JDBC 风格的接口,这也与 Spring 一贯倡导的接口统一是一致的。此处我们使用 org.springframework.batch.item.file.mapping.DefaultLineMapper 进行行映射。读操作的配置信息如 清单 4 所示:

清单 4. message_job.xml
<beans:bean id="messageReader"
>
<beans:property name="lineMapper" ref="lineMapper">
</beans:property>
<beans:property name="resource"
value="classpath:/users.txt"></beans:property>
</beans:bean>
<beans:bean id="lineMapper"
+ user.getName() + ",please pay promptly at the end of this month."); return m; }

} |
任务定义

通过上面一节,我们已经完成了批处理任务的读数据、处理过程、写数据三个过程。那么,我们如何将这三部分结合在一起完成批处理任务呢?

Spring Batch 将批处理任务称为一个 Job,同时,Job 下分为多个 Step。Step 是一个独立的、顺序的处理步骤,包含该步骤批处理中需要的所有信息。多个批处理 Step 按照一定的流程组成一个 Job。通过这样的设计方式,我们可以灵活配置 Job 的处理过程。

接下来,让我们看一下如何配置缴费通知的 Job,如 清单 8 所示:

清单 8. message_job.xml
<job id="messageJob">
<step id="messageStep">
<tasklet>
<chunk reader="messageReader" processor="messageProcessor"
writer="messageWriter" commit-interval="5"
chunk-completion-policy="">
</chunk>
</tasklet>
</step>
</job>

如上,我们定义了一个名为“messageJob”的 Job,该 Job 仅包含一个 Step。在配置 Step 的过程中,我们不仅要指定读数据、处理、写数据相关的 bean,还要指定 commit-interval 和 chunk-completion-policy 属性。前者指定了该 Step 中事务提交的粒度,取值为 5 即表明每当处理完毕读入的 5 条数据时,提交一次事务。后者指定了 Step 的完成策略,即当什么情况发生时表明该 Step 已经完成,可以转入后续处理。由于没有明确指定相应的类,Spring Batch 使用默认策略,即当读入数据为空时认为 Step 结束。

最后,我们还需要配置一个 JobRepository 并为其指定一个事务管理器,该类用于对 Job 进行管理,如 清单 9 所示:

清单 9. message_job.xml
<beans:bean id="jobRepository"
ref="transactionManager" />
</beans:bean>

<beans:bean id="transactionManager"
restartable="true">
<step id="messageStep">
<tasklet>
<chunk reader="messageReader" processor="messageProcessor"
writer="messageWriter"
commit-interval="5" chunk-completion-policy="" retry-limit="2">
<retryable-exception-classes>
<include />
</retryable-exception-classes>
</chunk>
</tasklet>
</step>
</job>

我们可以看到,主要分两部分:首先,需要设置重试次数,其次是当执行过程中捕获到哪些异常时需要重试。如果在执行过程中捕获到重试异常列表中的异常信息,则进行重试操作。如果重试操作达到最大次数仍提示异常,则认为任务执行失败。对于异常信息的配置,除了通过 include 配置包含列表外,也可以通过 exclude 配置排除列表。

由于通过配置进行的 Step 重试是自动的,因此较难控制(多用于网络访问异常等不需要人工干预的情况)。可以考虑一下本示例,如果有一个用户的信息有问题,名字为空,不能发送缴费通知,步骤重试便不合适了,此时我们可以对 Job 进行重试操作。

Spring Batch 允许重复执行未成功的 Job,而每次执行即为一次重试操作。示例代码如 清单 14 所示:

清单 14. Main 类
Map<String,JobParameter> parameters = new HashMap<String,JobParameter>();
parameters.put(RUN_MONTH_KEY,new JobParameter("2011-10"));
launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters));
Thread.sleep(10000);
launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters));

您可以通过如下步骤查看运行结果:首先,将 users.txt 文件中的第 7 行(之所以指定该行,便于验证事务提交以及重复执行的起始位置)的用户名修改为空。其次,运行示例。最后,在程序出现异常提示时,更新第 7 行的用户名(为了便于演示,程序在两次任务执行过程中等待 10 秒钟)。

您可以在控制台中很明显的看到,任务先打印了 5 条记录(第一次事务提交),然后出现异常信息,待我们将错误更正后,又打印了 5 条记录,任务最终成功完成。

从输出结果,我们可以知道 Spring Batch 是从出错的事务边界内第一条记录重复执行的,这样便确保了数据完整性,而且所有这一切对于用户均是透明的。

那么 Spring Batch 是如何做到这一步的呢?这与 Spring Batch 的运行时管理是分不开的。

回页首

运行时管理

Spring Batch 提供了如 表 1 所示的类用于记录每个 Job 的运行信息:

表 1. 运行时类信息
类名 描述
JobInstance 该类记录了 Job 的运行实例。举例:10 月和 11 月分别执行同一 Job,将生成两个 JobInstance。主要信息有:标识、版本、Job 名称、Job 参数
JobExecution 该类记录了 Job 的运行记录。如上面的示例,Job 第一次运行失败,第二次运行成功,那么将形成两条运行记录,但是对应的是同一个运行实例。主要信息有:Job 的运行时间、运行状态等。
JobParameters 该类记录了 Job 的运行参数
ExecutionContext 该类主要用于开发人员存储任务运行过程中的相关信息(以键值对形式),主要分为 Job 和 Step 两个范围
StepExecution 该类与 JobExecution 类似,主要记录了 Step 的运行记录。包括此次运行读取记录条数、输出记录条数、提交次数、回滚次数、读跳过条数、处理跳过条数、写跳过条数等信息

Spring Batch 通过 JobRepository 接口维护所有 Job 的运行信息,此外 JobLauncher 的 run 方法也返回一个 JobExecution 对象,通过该对象可以方便的获得 Job 其他的运行信息,代码如 清单 15 所示:

清单 15. Main 类
Map<String,JobParameter> parameters = new HashMap<String,JobParameter>();
parameters.put(RUN_MONTH_KEY,new JobParameter("2011-10"));
JobExecution je =
launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters));
System.out.println(je);
System.out.println(je.getJobInstance());
System.out.println(je.getStepExecutions());

输出信息如 清单 16 所示:

清单 16. 输出结果
JobExecution: id=0, version=2, startTime=Tue Nov 15 21:00:09 CST 2011,
endTime=Tue Nov 15 21:00:09 CST 2011, lastUpdated=Tue Nov 15 21:00:09 CST 2011,
status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=,
job=[JobInstance: id=0, version=0, JobParameters=Unknown macro: {run.month=2011-10}

], Job=messageJob]

JobInstance: id=0, version=0, JobParameters=[
&linkCreation=true&fromPageId=18351269" class="createlink">
Unknown macro: {run.month=2011-10}

], Job=messageJob]

JobInstance: id=0, version=0, JobParameters=[
, Job=messageJob

[StepExecution: id=1, version=5, name=messageStep, status=COMPLETED,
exitStatus=COMPLETED, readCount=10, filterCount=0, writeCount=10 readSkipCount=0,
writeSkipCount=0, processSkipCount=0, commitCount=3 , rollbackCount=0,
exitDescription=] |
从日志您可以发现事务一共提交了 3 次,这与前面的说明是不一致的。之所以会如此是因为当事务提交粒度恰好可以被记录数整除时,事务会有一次空提交。

关于 Spring Batch 运行时信息管理,将在讲解 Job 监控时再详细介绍,此处不再赘述,你也可以查看 Spring Batch 参考资料了解相关信息。


总结

本文通过一个简单示例演示了如何构建 Spring Batch 应用,同时介绍了 Spring Batch 的相关核心概念。希望您通过本文可以掌握