spring batch 学习 (转载)


1简介

SpringBatch是一个批处理的框架,作为一个Spring组件,提供了通过使用Spring的依赖注入(dependency injection)来处理批处理的条件。

业务场景

·周期性的提交批处理

·把一个任务并行处理

·消息驱动应用分级处理

·大规模并行批处理

·手工或调度使任务失败之后重新启动

·有依赖步骤的顺序执行(使用工作流驱动扩展)

·处理时跳过部分记录

·成批事务:为小批量的或有的存储过程/脚本的场景使用

SpringBatch使用三层架构,三层分别为应用、核心和基础服务。应用层是用户写的批处理任务。核心层包含执行和控制任务必须的核心类。如JobLauncherJobStep的实现。应用和核心层基于一层公用的基础服务。基础服务包括通用的ReaderWritersRetryTemplate

Spring Batch作为Spring的子项目,是一款基于Spring的企业批处理框架。通过它可以构建出健壮的企业批处理应用。Spring Batch不仅提供了统一的读写接口、丰富的任务处理方式、灵活的事务管理及并发处理,同时还支持日志、监控、任务重启与跳过等特性,大大简化了批处理应用开发,将开发人员从复杂的任务配置管理过程中解放出来,使他们可以更多地去关注核心的业务处理过程。

Spring Batch是一款批处理应用框架,不是调度框架。它只关注批处理任务相关的问题,如事务、并发、监控、执行等,并不提供相应的调度功能。因此,如果我们希望批处理任务定期执行,可结合Quartz等成熟的调度框架实现。

2任务定义

Spring Batch将批处理任务称为一个Job,同时,Job下分为多个StepStep是一个独立的、顺序的处理步骤,包含该步骤批处理中需要的所有信息。多个批处理Step按照一定的流程组成一个Job

<job id="messageJob">

<step id="messageStep">

<tasklet>

<chunk reader="messageReader" processor="messageProcessor"

writer="messageWriter" commit-interval="5"

chunk-completion-policy="">

</chunk>

</tasklet>

</step>

</job>

commit-intervalchunk-completion-policy属性。前者指定了该Step中事务提交的粒度,取值为5即表明每当处理完毕读入的5条数据时,提交一次事务。后者指定了Step的完成策略,即当什么情况发生时表明该Step已经完成,可以转入后续处理。如果没有明确指定相应的类,Spring Batch使用默认策略,即当读入数据为空时认为Step结束。

3运行时管理

Spring Batch提供了如表1所示的类用于记录每个Job的运行信息:

1.运行时类信息

类名

描述

JobInstance

该类记录了 Job 的运行实例。举例:10 月和 11 月分别执行同一 Job,将生成两个 JobInstance。主要信息有:标识、版本、Job 名称、Job 参数

JobExecution

该类记录了 Job 的运行记录。如上面的示例,Job 第一次运行失败,第二次运行成功,那么将形成两条运行记录,但是对应的是同一个运行实例。主要信息有:Job 的运行时间、运行状态等。

JobParameters

该类记录了 Job 的运行参数

ExecutionContext

该类主要用于开发人员存储任务运行过程中的相关信息(以键值对形式),主要分为 Job 和 Step 两个范围

StepExecution

该类与 JobExecution 类似,主要记录了 Step 的运行记录。包括此次运行读取记录条数、输出记录条数、提交次数、回滚次数、读跳过条数、处理跳过条数、写跳过条数等信息

Spring Batch通过JobRepository接口维护所有Job的运行信息,此外JobLauncherrun方法也返回一个JobExecution对象,通过该对象可以方便的获得Job其他的运行信息,代码如下所示:

ClassPathXmlApplicationContext c =

new ClassPathXmlApplicationContext("message_job.xml");

SimpleJobLauncher launcher = new SimpleJobLauncher();

launcher.setJobRepository((JobRepository) c.getBean("jobRepository

Map<String,JobParameter> parameters = new HashMap<String,JobParameter>();

parameters.put(RUN_MONTH_KEY,new JobParameter("2011-10"));

JobExecution je =

launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters));

System.out.println(je);

System.out.println(je.getJobInstance());

System.out.println(je.getStepExecutions());

4常用组件

Spring batch现有的读写操作已经支持对文本文件、XML文件、数据库等支持,基本的读写类库如下:

FlatFileItemReader

FlatFileItemWriter

StaxEventItemReader

StaxEventItemWriter

JdbcCursorItemReader/JdbcPagingItemReader

JdbcBatchItemWriter

Spring Batch学习总结

5动态参数传递

步骤一:启动job时,设定参数JobParameters。如下:

HashMap<String, JobParameter> parameters = new HashMap<String, JobParameter>();

parameters.put("database.sql", new JobParameter("insert into ps.TDR_PCC_UPSELL_DATA values(1397724800, 8, '4', 'FUP_Month_jx_1', 'FUP_Month_jx_2', 1, 1, 2,4)"));

parameters.put("database.output.file", new JobParameter("file:d:/temp/new/ymq.csv"));

JobParameters jobParameters = new JobParameters(parameters);

JobExecution result = launcher.run(job, jobParameters);

步骤二:在处理方法中,获取参数,方法如下,注意必须设定scope="step"

<step id="step_procedure" next="step_output">

<tasklet ref="callProcedure" transaction-manager="transactionManager">

</tasklet>

</step>

<bean:bean id="callProcedure"scope="step"

class="com.abc.springbatch.ProcedureTaskLet">

<bean:property name="dataSource" ref="iqDataSource" />

<bean:property name="sql" value="#{jobParameters['database.sql']}" />

</bean:bean>

6从数据库中读取数据

<step id="step_output">

<tasklet transaction-manager="transactionManager">

<chunk reader="outputDataReader" writer="outputDataWriter2" processor="dataItemProcessor" commit-interval="10"/>

</tasklet>

</step>

<bean:bean id="outputDataReader" scope="step"

class="org.springframework.batch.item.database.JdbcCursorItemReader">

<bean:property name="dataSource" ref="iqDataSource" /> --设定数据源

<bean:property name="sql" value="#{jobParameters['database.searchsql']}" /> --查询SQL

<bean:property name="rowMapper" ref="mapperUpsellData" /> --从查询结果到java对象的转换

</bean:bean>

<bean:bean id="mapperUpsellData" class="org.springframework.jdbc.core.BeanPropertyRowMapper">

<bean:property name="mappedClass" value="com.abc.bean.Data"/>

</bean:bean>

7读取结果处理输出到文件

7.1从java对象中直接取部分字段输出

<bean:bean id="outputDataWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">

<bean:property name="resource" value="#{jobParameters['database.output.file']}"/>

<bean:property name="lineAggregator">

<bean:bean

class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">

<bean:property name="delimiter" value=","></bean:property>//各字段之间的分割符

<bean:property name="fieldExtractor">

<bean:bean

class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">

<!--bean对象中某些字段进行输出-->

<bean:property name="names"

value="subscriberIdentifier,originalServiceName,targetServiceName,period,consumption1,consumption2,consumption3">

</bean:property>

</bean:bean>

</bean:property>

</bean:bean>

</bean:property>

</bean:bean>

7.2特殊格式处理的输出

将读取结果进行一定的格式处理再进行输出,格式的处理如果比较复杂,需要自己实现process进行处理,然后在write中进行输出。

<step id="step_output">

<tasklet transaction-manager="transactionManager">

<chunk reader="outputDataReader" writer="outputDataWriter2" processor="dataItemProcessor" commit-interval="10"/>

</tasklet>

</step>

<bean:bean id="dataItemProcessor" class="com.abc.springbatch.DataItemProcessor">

</bean:bean>

<bean:bean id="outputDataWriter2" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">

<bean:property name="resource" value="#{jobParameters['database.output.file']}"/>

<bean:property name="lineAggregator">

<bean:bean

class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">

<!--下面这个分隔符设置基本没有,可以删除-->

<bean:property name="delimiter" value=","></bean:property>

<!--下面这个类直接process过程将传递过来的对象进行输出-->

<bean:property name="fieldExtractor">

<bean:bean

class="org.springframework.batch.item.file.transform.PassThroughFieldExtractor">

</bean:bean>

</bean:property>

</bean:bean>

</bean:property>

</bean:bean>

Process类的实现:

public class DataItemProcessor implements ItemProcessor<UpsellData, String>

{

@Override

public String process(UpsellData item) throws Exception

{

//进行格式的特殊处理后输出

return item.getOutputString(",");

}

}

8某个步骤只执行存储过程

直接在tasklet中实现,方法如下:

<step id="step_procedure" next="step_output">

<tasklet ref="callProcedure" transaction-manager="transactionManager">

</tasklet>

</step>

不需要再实现chunk,因为chunk必须有readerwriter

9步骤之间传递参数

各步骤的处理不能在tasklet中实现,而是在chunk中实现。实现细节参见官方文档11.8. Passing Data to Future Steps

转自:http://blog.csdn.net/takeiteasy2009/article/details/8556251

优质内容筛选与推荐>>
1、将 &#36152&#26131 转换成gb2312中文编码
2、DS博客作业01--日期抽象数据类型设计与实现
3、.net core 配制文件读取操作
4、利用XAF中的FeatureCenter例子的,直接打开DetailView
5、CentOS 下PHP的卸载


长按二维码向我转账

受苹果公司新规定影响,微信 iOS 版的赞赏功能被关闭,可通过二维码转账支持公众号。

    阅读
    好看
    已推荐到看一看
    你的朋友可以在“发现”-“看一看”看到你认为好看的文章。
    已取消,“好看”想法已同步删除
    已推荐到看一看 和朋友分享想法
    最多200字,当前共 发送

    已发送

    朋友将在看一看看到

    确定
    分享你的想法...
    取消

    分享想法到看一看

    确定
    最多200字,当前共

    发送中

    网络异常,请稍后重试

    微信扫一扫
    关注该公众号