Spring Batch + Elasticsearch + JMS
Processing data with Spring Batch, ArtemisMQ and Elasticsearch: a JMS message carrying a table name triggers a batch job that reads the latest row from MySQL and indexes it into Elasticsearch.
animal.sql
CREATE TABLE IF NOT EXISTS ANIMAL (
id int(11) PRIMARY KEY NOT NULL AUTO_INCREMENT,
scientificName VARCHAR(50),
status VARCHAR(50),
veterinarian VARCHAR(50),
created DATE
);
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.tiago</groupId>
<artifactId>spring-batch</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>3.4.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
</dependency>
<!-- JAXB modules, required when running on Java 11+ -->
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-core</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
<version>2.3.0</version>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
</project>
application.properties
logging.level.org.springframework.jdbc=DEBUG
logging.level.org.springframework.batch=DEBUG
# MAIN DB
spring.datasource.jdbcUrl=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=12345678
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
# SECONDARY DB
spring.second-db.jdbcUrl=jdbc:mysql://192.168.1.7:3306/test
spring.second-db.username=api
spring.second-db.password=12345678
spring.second-db.driverClassName=com.mysql.cj.jdbc.Driver
spring.batch.initialize-schema=always
spring.datasource.initialization-mode=always
spring.main.allow-bean-definition-overriding=true
spring.batch.job.enabled=false
# ELASTIC SEARCH
elasticsearch.clustername=elasticsearch
elasticsearch.host=localhost
elasticsearch.port=9300
DatasourceConfig.java
package com.tiago.config;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
@Configuration
public class DatasourceConfig {
@Bean(name = "db1")
@ConfigurationProperties(prefix = "spring.datasource")
@Primary
public DataSource dataSource1() {
return DataSourceBuilder.create().build();
}
@Bean(name = "jdbcTemplate1")
public JdbcTemplate jdbcTemplate1(@Qualifier("db1") DataSource ds) {
return new JdbcTemplate(ds);
}
@Bean(name = "db2")
@ConfigurationProperties(prefix = "spring.second-db")
public DataSource dataSource2() {
return DataSourceBuilder.create().build();
}
@Bean(name = "jdbcTemplate2")
public JdbcTemplate jdbcTemplate2(@Qualifier("db2") DataSource ds) {
return new JdbcTemplate(ds);
}
}
ElasticsearchConfig.java
package com.tiago.config;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import java.net.InetAddress;
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.tiago.repository")
public class ElasticsearchConfig {
@Value("${elasticsearch.host}")
private String esHost;
@Value("${elasticsearch.port}")
private int esPort;
@Value("${elasticsearch.clustername}")
private String esClusterName;
@Bean
public Client client() throws Exception {
Settings settings = Settings.builder()
.put("cluster.name", esClusterName)
.build();
TransportClient client = new PreBuiltTransportClient(settings);
client.addTransportAddress(new TransportAddress(InetAddress.getByName(esHost), esPort));
return client;
}
@Bean
public ElasticsearchOperations elasticsearchTemplate() throws Exception {
return new ElasticsearchTemplate(client());
}
}
JmsConfig.java
package com.tiago.config;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
@Configuration
@EnableJms
public class JmsConfig {
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
}
Animal.java
package com.tiago.model.es;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import java.util.Date;
@Document(indexName = "animal-idx", type = "animals")
public class Animal {
@Id
private String id;
private String scientificName;
private String status;
private String veterinarian;
private Date created;
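// getters and setters omitted for brevity; EsItemWriter relies on them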
}
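The batch beans below also reference a JDBC-side model class, com.tiago.model.Animal, which is not reproduced in this listing. A minimal sketch, assuming only the fields implied by AnimalRowMapper and EsItemWriter:
package com.tiago.model;
import java.util.Date;
public class Animal {
    private int id;
    private String scientificName;
    private String status;
    private String veterinarian;
    private Date created;
    // getters and setters omitted for brevity
}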
AnimalRepository.java
package com.tiago.repository;
import com.tiago.model.es.Animal;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
public interface AnimalRepository extends ElasticsearchRepository<Animal, String> {
}
EsItemReader.java
package com.tiago.beans.es;
import com.tiago.model.Animal;
import com.tiago.util.AnimalRowMapper;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.util.logging.Logger;
@Component
@StepScope
public class EsItemReader implements ItemReader<JdbcCursorItemReader<Animal>> {
private static final Logger LOGGER = Logger.getLogger(EsItemReader.class.getSimpleName());
@Value("#{jobParameters['tableName']}")
private String tableName;
@Autowired
@Qualifier("jdbcTemplate1")
private JdbcTemplate jdbcTemplate;
public JdbcCursorItemReader<Animal> read() throws Exception {
LOGGER.info("** READING DATABASE ENTRIES! **");
final String SQL = "SELECT * FROM " + this.tableName + " ORDER BY id DESC LIMIT 1";
return new JdbcCursorItemReaderBuilder<Animal>()
.sql(SQL)
.dataSource(this.jdbcTemplate.getDataSource())
.rowMapper(new AnimalRowMapper())
.name("replication-cursor")
.build();
}
}
EsItemWriter.java
package com.tiago.beans.es;
import com.tiago.model.Animal;
import com.tiago.repository.AnimalRepository;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
import java.util.logging.Logger;
@Component
public class EsItemWriter implements ItemWriter<Animal> {
private static final Logger LOGGER = Logger.getLogger(EsItemWriter.class.getSimpleName());
@Autowired
private AnimalRepository animalRepository;
@Override
public void write(List<? extends Animal> items) throws Exception {
LOGGER.info("** WRITING DATABASE ENTRIES INTO ES! **");
for (Animal item : items) {
Optional<com.tiago.model.es.Animal> animal = animalRepository.findById(String.valueOf(item.getId()));
if (animal.isPresent()) {
animal.get().setId(String.valueOf(item.getId()));
animal.get().setStatus(item.getStatus());
animal.get().setScientificName(item.getScientificName());
animal.get().setVeterinarian(item.getVeterinarian());
animal.get().setCreated(item.getCreated());
animalRepository.save(animal.get());
} else {
com.tiago.model.es.Animal a = new com.tiago.model.es.Animal();
a.setId(String.valueOf(item.getId()));
a.setStatus(item.getStatus());
a.setScientificName(item.getScientificName());
a.setVeterinarian(item.getVeterinarian());
a.setCreated(item.getCreated());
animalRepository.save(a);
}
}
}
}
EsItemProcessor.java
package com.tiago.beans.es;
import com.tiago.model.Animal;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
import java.util.logging.Logger;
@Component
public class EsItemProcessor implements ItemProcessor<Animal, Animal> {
private static final Logger LOGGER = Logger.getLogger(EsItemProcessor.class.getSimpleName());
@Override
public Animal process(Animal animal) throws Exception {
LOGGER.info("** PROCESSING DB ENTRIES! **");
if (animal.getStatus().equalsIgnoreCase("bad")) {
animal.setStatus("changed from bad to good");
}
return animal;
}
}
JobCompletionNotificationListener.java
package com.tiago.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
private static final Logger LOG = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
@Autowired
@Qualifier("jdbcTemplate2")
private JdbcTemplate jdbcTemplate;
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
LOG.info("JOB FINISHED");
}
}
}
ElasticSearchJob.java
package com.tiago.jobs;
import com.tiago.beans.es.EsItemProcessor;
import com.tiago.beans.es.EsItemReader;
import com.tiago.beans.es.EsItemWriter;
import com.tiago.listener.JobCompletionNotificationListener;
import com.tiago.model.Animal;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ElasticSearchJob {
@Autowired
private JobBuilderFactory jobBuilders;
@Autowired
private StepBuilderFactory stepBuilders;
@Autowired
private EsItemReader readerEs;
@Autowired
private EsItemWriter writerEs;
@Bean
public EsItemProcessor processorReplica() {
return new EsItemProcessor();
}
@Bean
public Job esJob(JobCompletionNotificationListener listener) throws Exception {
return jobBuilders.get("es").listener(listener)
.incrementer(new RunIdIncrementer())
.flow(step1())
.end().build();
}
@Bean
public Step step1() throws Exception {
return stepBuilders.get("step1").<Animal, Animal>chunk(1)
.reader(readerEs.read())
.processor(processorReplica())
.writer(writerEs)
.faultTolerant()
.skipLimit(1)
.skip(NullPointerException.class)
.build();
}
}
MessageDrivenBean.java
package com.tiago.mdb;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class MessageDrivenBean {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
@JmsListener(destination = "batch-job", containerFactory = "myFactory")
public void receiveMessage(String message) throws Exception {
System.out.println(message);
JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
.addString("tableName", message).toJobParameters();
JobExecution execution = jobLauncher.run(job, jobParameters);
System.out.println("Exit Status : " + execution.getStatus());
}
}
App.java
package com.tiago;
import com.tiago.config.DatasourceConfig;
import com.tiago.config.JmsConfig;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;
import org.springframework.jms.core.JmsTemplate;
@SpringBootApplication
@EnableBatchProcessing
@Import({DatasourceConfig.class, JmsConfig.class})
@ComponentScan({"com.tiago"})
public class App implements CommandLineRunner {
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
@Override
public void run(String... args) throws Exception {
String tableName = "animal";
jmsTemplate.convertAndSend("batch-job", tableName);
}
}
Flat Files
Loading a fixed-length flat file into MySQL with a FlatFileItemReader and a JdbcBatchItemWriter.
animals.txt
101120210423
28Pongo pygmaeus good John 20130415
11Pongo pygmaeus bad Ziggy 20130424
04Elephas maximus indicus bad Be 20130324
15Panthera pardus orientalis good Tiago 20130912
18Lycaon pictus good Tiago 20130220
25Gorilla beringei graueri bad Tiago 20130730
17Pongo pygmaeus bad Be 20130107
25Lycaon pictus good Be 20130928
17Panthera pardus orientalis good Tiago 20131213
25Diceros bicornis bad John 20130506
15Spheniscus mendiculus good Ziggy 20130315
29Elephas maximus indicus good John 20130912
23Panthera tigris tigris bad Ziggy 20131209
12Diceros bicornis good Fran 20131230
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.tiago</groupId>
<artifactId>spring-batch</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>3.4.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
</project>
src/main/resources/application.properties
logging.level.org.springframework.jdbc=DEBUG
logging.level.org.springframework.batch=DEBUG
spring.datasource.jdbcUrl=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=12345678
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.batch.initialize-schema=always
spring.datasource.initialization-mode=always
spring.main.allow-bean-definition-overriding=true
src/main/resources/schema.sql
CREATE TABLE IF NOT EXISTS ANIMAL (
AID int(11) PRIMARY KEY NOT NULL AUTO_INCREMENT,
id int(11) NOT NULL,
scientificName VARCHAR(50),
status VARCHAR(50),
veterinarian VARCHAR(50),
created DATE
);
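The listener, mappers and batch beans in this section reference com.tiago.entity.Animal, which is not reproduced here. A minimal sketch, assuming the fields implied by AnimalRowMapper and AnimalFieldSetMapper:
package com.tiago.entity;
import java.util.Date;
public class Animal {
    private int id;
    private String scientificName;
    private String status;
    private String veterinarian;
    private Date created;
    // getters and setters omitted for brevity; JdbcBatchItemWriter binds to them by property name
}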
JobCompletionNotificationListener.java
package com.tiago.listener;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import com.tiago.entity.Animal;
import com.tiago.util.AnimalRowMapper;
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
private static final Logger LOG = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
LOG.info("JOB FINISHED");
List<Animal> results = jdbcTemplate.query("SELECT * FROM ANIMAL", new AnimalRowMapper());
results.forEach((Animal animal) -> LOG.info("FOUND " + animal.getScientificName() + " IN THE DATABASE."));
}
}
}
FlatFileConfiguration.java
package com.tiago.config;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import com.tiago.beans.flat.FlatItemProcessor;
import com.tiago.beans.flat.FlatItemReader;
import com.tiago.beans.flat.FlatItemWriter;
import com.tiago.listener.JobCompletionNotificationListener;
import com.tiago.entity.Animal;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
@Configuration
public class FlatFileConfiguration {
@Value("${spring.datasource.jdbcUrl}")
private String url;
@Value("${spring.datasource.driverClassName}")
private String driver;
@Value("${spring.datasource.username}")
private String username;
@Value("${spring.datasource.password}")
private String password;
@Autowired
private JobBuilderFactory jobBuilders;
@Autowired
private StepBuilderFactory stepBuilders;
@Autowired
private FlatItemReader flatItemReader;
@Autowired
private FlatItemWriter flatItemWriter;
@Bean
public ItemProcessor<Animal, Animal> processor() {
return new FlatItemProcessor();
}
@Bean
public Job flatJob(JobCompletionNotificationListener listener) throws Exception {
return jobBuilders.get("flatFiles").listener(listener)
.incrementer(new RunIdIncrementer())
.flow(step1())
.end().build();
}
@Bean
public Step step1() throws Exception {
return stepBuilders.get("step1").<Animal, Animal>chunk(1000)
.reader(flatItemReader.read())
.processor(processor())
.writer(flatItemWriter.writer(getDataSource()))
.faultTolerant()
.skipLimit(1)
.skip(NullPointerException.class)
.build();
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Bean
public DataSource getDataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName(driver);
dataSource.setUrl(url);
dataSource.setUsername(username);
dataSource.setPassword(password);
return dataSource;
}
}
BlankLineRecordSeparatorPolicy.java
package com.tiago.util;
import org.springframework.batch.item.file.separator.SimpleRecordSeparatorPolicy;
public class BlankLineRecordSeparatorPolicy extends SimpleRecordSeparatorPolicy {
@Override
public boolean isEndOfRecord(String line) {
return line.trim().length() != 0 && super.isEndOfRecord(line);
}
@Override
public String postProcess(String line) {
if (line == null || line.trim().length() == 0) {
return null;
}
return super.postProcess(line);
}
@Override
public String preProcess(String line) {
if (line == null || line.trim().length() == 0) {
return null;
}
return super.preProcess(line);
}
}
AnimalRowMapper.java
package com.tiago.util;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import com.tiago.entity.Animal;
public class AnimalRowMapper implements RowMapper<Animal> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "scientificName";
public static final String STATUS = "status";
private static final String VETERINARIAN = "veterinarian";
private static final String CREATED = "created";
public Animal mapRow(ResultSet rs, int rowNum) throws SQLException {
Animal animal = new Animal();
animal.setId(rs.getInt(ID_COLUMN));
animal.setScientificName(rs.getString(NAME_COLUMN));
animal.setStatus(rs.getString(STATUS));
animal.setVeterinarian(rs.getString(VETERINARIAN));
animal.setCreated(rs.getDate(CREATED));
return animal;
}
}
AnimalFieldSetMapper.java
package com.tiago.util;
import com.tiago.entity.Animal;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
public class AnimalFieldSetMapper implements FieldSetMapper<Animal> {
@Override
public Animal mapFieldSet(FieldSet fieldSet) {
Animal animal = new Animal();
animal.setId(fieldSet.readInt("ID"));
animal.setScientificName(fieldSet.readString("scientificName"));
animal.setStatus(fieldSet.readString("status"));
animal.setVeterinarian(fieldSet.readString("veterinarian"));
animal.setCreated(fieldSet.readDate("created", "yyyyMMdd"));
return animal;
}
}
FlatItemReader.java
package com.tiago.beans.flat;
import com.tiago.entity.Animal;
import com.tiago.util.AnimalFieldSetMapper;
import com.tiago.util.BlankLineRecordSeparatorPolicy;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.FixedLengthTokenizer;
import org.springframework.batch.item.file.transform.Range;
import org.springframework.core.io.FileSystemResource;
import org.springframework.stereotype.Component;
import java.util.logging.Logger;
@Component
public class FlatItemReader {
private static final Logger LOGGER = Logger.getLogger("FlatItemReader.class");
private FlatFileItemReader<Animal> itemReader;
public ItemReader<Animal> read() throws Exception {
LOGGER.info("** READING FLAT FILE! **");
if (itemReader == null) {
itemReader = new FlatFileItemReader<>();
itemReader.setResource(new FileSystemResource("data/animals.txt"));
itemReader.setLinesToSkip(1);
DefaultLineMapper<Animal> lineMapper = new DefaultLineMapper<>();
FixedLengthTokenizer lineTokenizer = new FixedLengthTokenizer();
lineTokenizer.setNames(new String[]{"ID", "scientificName", "status", "veterinarian", "created"});
lineTokenizer.setColumns(new Range[]{new Range(1, 2), new Range(3, 32), new Range(33, 39), new Range(43, 50), new Range(93, 100)});
lineMapper.setLineTokenizer(lineTokenizer);
lineMapper.setFieldSetMapper(new AnimalFieldSetMapper());
itemReader.setRecordSeparatorPolicy(new BlankLineRecordSeparatorPolicy());
itemReader.setLineMapper(lineMapper);
}
return itemReader;
}
}
FlatItemProcessor.java
package com.tiago.beans.flat;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
import com.tiago.entity.Animal;
import java.util.logging.Logger;
@Component
public class FlatItemProcessor implements ItemProcessor<Animal, Animal> {
private static final Logger LOGGER = Logger.getLogger(FlatItemProcessor.class.getName());
@Override
public Animal process(Animal animal) throws Exception {
LOGGER.info("** PROCESSING FLAT FILE! **");
if(animal.getStatus().trim().equalsIgnoreCase("GOOD")) {
return animal;
}
return null;
}
}
FlatItemWriter.java
package com.tiago.beans.flat;
import javax.sql.DataSource;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.stereotype.Component;
import com.tiago.entity.Animal;
import java.util.logging.Logger;
@Component
public class FlatItemWriter {
private static final Logger LOGGER = Logger.getLogger(FlatItemWriter.class.getName());
public ItemWriter<Animal> writer(DataSource dataSource) throws Exception {
LOGGER.info("** WRITING FLAT FILE! **");
JdbcBatchItemWriter<Animal> writer = new JdbcBatchItemWriter<>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.setSql("INSERT INTO ANIMAL (id, aid, scientificName, status, veterinarian, created) "
+ " VALUES (:id, :aid, :scientificName, :status, :veterinarian, :created)");
writer.setDataSource(dataSource);
writer.setAssertUpdates(true);
writer.afterPropertiesSet();
return writer;
}
}
App.java
package com.tiago;
import com.tiago.config.FlatFileConfiguration;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;
@SpringBootApplication
@EnableBatchProcessing
@Import({FlatFileConfiguration.class})
@ComponentScan({"com.tiago"})
public class App {
public static void main(String[] args) {
System.exit(SpringApplication.exit(SpringApplication.run(App.class, args)));
}
}