1.介绍
当系统有大量数据需要从数据库导入Elasticsearch时,使用Spring Batch可以提高导入的效率。Spring Batch使用ItemReader分页读取数据,ItemWriter批量写数据。由于Spring Batch没有提供Elasticsearch的ItemWriter和ItemReader,本示例中自定义一个ElasticsearchItemWriter(以及ElasticsearchItemReader),用于批量导入。
2.示例
2.1 pom.xml
本文使用spring data jest连接ES(也可以使用spring data elasticsearch连接ES),ES版本为5.5.3
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates -->
    <groupId>com.hfcsbc.estl</groupId>
    <artifactId>es-etl</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>es-etl</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.M7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- Source side: JPA over PostgreSQL -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>

        <!-- Batch processing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>

        <!-- Target side: Elasticsearch through the Jest client -->
        <dependency>
            <groupId>com.github.vanroy</groupId>
            <artifactId>spring-boot-starter-data-jest</artifactId>
            <version>3.0.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <!-- The parent is a milestone build, so the Spring snapshot/milestone repositories are declared -->
    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>
2.2 实体类及repository
package com.hfcsbc.esetl.domain;

import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;

/**
 * A person, mapped twice: as a JPA entity (the import source) and as an
 * Elasticsearch document in index/type {@code person} (the import target).
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}.
 *
 * Create by pengchao on 2018/2/23
 */
@Document(indexName = "person", type = "person", shards = 1, replicas = 0, refreshInterval = "-1")
@Entity
@Data
public class Person {

    /** Primary key of the JPA entity. */
    @Id
    private Long id;

    private String name;

    /** Associated address; one-to-one in JPA, declared as a nested field for Elasticsearch. */
    @OneToOne
    @Field(type = FieldType.Nested)
    private Address address;
}
package com.hfcsbc.esetl.domain;

import lombok.Data;

import javax.persistence.Entity;
import javax.persistence.Id;

/**
 * JPA entity holding an address, identified by {@code id}.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}.
 *
 * Create by pengchao on 2018/2/23
 */
@Entity
@Data
public class Address {

    /** Primary key of the JPA entity. */
    @Id
    private Long id;

    private String name;
}
package com.hfcsbc.esetl.repository.jpa;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Spring Data JPA repository for {@link Person}, keyed by {@code Long}.
 * Declares no methods of its own; everything comes from {@link JpaRepository}.
 *
 * Create by pengchao on 2018/2/23
 */
public interface PersonRepository extends JpaRepository<Person, Long> {
}
package com.hfcsbc.esetl.repository.es;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

/**
 * Spring Data Elasticsearch repository for {@link Person}, keyed by {@code Long}.
 * Declares no methods of its own; everything comes from {@link ElasticsearchRepository}.
 *
 * Create by pengchao on 2018/2/23
 */
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}
2.3 配置elasticsearchItemWriter
package com.hfcsbc.esetl.itemWriter;

import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;

import java.util.List;

/**
 * Spring Batch {@link ItemWriter} that indexes each chunk of {@link Person}
 * items into Elasticsearch through {@link EsPersonRepository}.
 *
 * <p>The class also implements {@link ItemWriteListener} and
 * {@link StepExecutionListener}; all listener callbacks are intentionally
 * empty and exist only as extension points.
 *
 * Create by pengchao on 2018/2/23
 */
public class ElasticsearchItemWriter
        implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener {

    private final EsPersonRepository personRepository;

    /**
     * @param personRepository Elasticsearch repository that receives every chunk
     */
    public ElasticsearchItemWriter(EsPersonRepository personRepository) {
        this.personRepository = personRepository;
    }

    /** No-op hook invoked before a chunk is written. */
    @Override
    public void beforeWrite(List<? extends Person> items) {
    }

    /** No-op hook invoked after a chunk has been written. */
    @Override
    public void afterWrite(List<? extends Person> items) {
    }

    /** No-op hook invoked when writing a chunk failed. */
    @Override
    public void onWriteError(Exception exception, List<? extends Person> items) {
    }

    /** No-op hook invoked before the step starts. */
    @Override
    public void beforeStep(StepExecution stepExecution) {
    }

    /**
     * No-op hook invoked after the step finishes.
     *
     * @return {@code null}, which per the {@code StepExecutionListener}
     *         contract leaves the step's exit status unchanged
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }

    /**
     * Indexes the whole chunk with a single repository call.
     *
     * @param items the chunk handed over by the step
     * @throws Exception if the repository call fails
     */
    @Override
    public void write(List<? extends Person> items) throws Exception {
        // The implementing class AbstractElasticsearchRepository routes saveAll
        // to elasticsearchOperations.bulkIndex(queries), i.e. a bulk index.
        personRepository.saveAll(items);
    }
}
2.4 配置ElasticsearchItemReader(本示例未使用,仅供参考)
package com.hfcsbc.esetl.itemReader;

import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;

import java.util.Iterator;

/**
 * Paging Spring Batch reader that pulls documents from Elasticsearch.
 *
 * <p>Each call to {@link #doPageRead()} applies the current {@code page} and
 * {@code pageSize} (inherited from {@link AbstractPaginatedDataItemReader}) to
 * the query before executing it, so successive calls walk through the result
 * set and the reader stops once a page comes back empty.
 *
 * <p>Note: the supplied {@link SearchQuery} is mutated (its pageable is
 * replaced on every page read), so do not share it with other components.
 * NOTE(review): this is from/size paging; Elasticsearch limits how deep it can
 * go (index setting {@code index.max_result_window}) - confirm the expected
 * result size or switch to scrolling for very large exports.
 *
 * @param <T> type of the items produced by this reader
 *
 * Create by pengchao on 2018/2/24
 */
public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

    private final ElasticsearchOperations elasticsearchOperations;
    private final SearchQuery query;
    private final Class<? extends T> targetType;

    /**
     * @param elasticsearchOperations operations object used to run the query
     * @param query                   search to execute; its pageable is overwritten per page
     * @param targetType              class the hits are mapped to
     */
    public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations,
                                   SearchQuery query,
                                   Class<? extends T> targetType) {
        // A name is required to build execution-context keys when state saving
        // is on; provide a default the caller can still override via setName.
        setName(ElasticsearchItemReader.class.getSimpleName());
        this.elasticsearchOperations = elasticsearchOperations;
        this.query = query;
        this.targetType = targetType;
    }

    /**
     * Fetches the page identified by the inherited {@code page}/{@code pageSize} fields.
     *
     * @return iterator over the hits of the current page
     */
    @Override
    @SuppressWarnings("unchecked") // Iterator is read-only, so widening "? extends T" to T is safe
    protected Iterator<T> doPageRead() {
        // Without this the same first page would be returned on every call.
        query.setPageable(PageRequest.of(page, pageSize));
        return (Iterator<T>) elasticsearchOperations.queryForList(query, targetType).iterator();
    }

    /** Nothing to validate: all collaborators are supplied through the constructor. */
    @Override
    public void afterPropertiesSet() throws Exception {
    }
}
2.5 配置spring batch需要的配置
package com.hfcsbc.esetl.config;import com.hfcsbc.esetl.itemWriter.ElasticsearchItemWriter;import com.hfcsbc.esetl.repository.es.EsPersonRepository;import com.hfcsbc.esetl.domain.Person;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.launch.support.RunIdIncrementer;import org.springframework.batch.core.repository.JobRepository;import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;import org.springframework.batch.item.ItemReader;import org.springframework.batch.item.ItemWriter;import org.springframework.batch.item.database.JpaPagingItemReader;import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.transaction.PlatformTransactionManager;import javax.persistence.EntityManagerFactory;import javax.sql.DataSource;/** * Create by pengchao on 2018/2/23 */@Configuration@EnableBatchProcessingpublic class BatchConfig { @Autowired private EsPersonRepository personRepository; @Bean public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory){ JpaPagingItemReader<Person> reader = new JpaPagingItemReader<Person>(); String sqlQuery = "select * from person"; try { JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<Person>(); queryProvider.setSqlQuery(sqlQuery); queryProvider.setEntityClass(Person.class); queryProvider.afterPropertiesSet(); reader.setEntityManagerFactory(entityManagerFactory); reader.setPageSize(10000); reader.setQueryProvider(queryProvider); 
reader.afterPropertiesSet(); reader.setSaveState(true); } catch (Exception e) { e.printStackTrace(); } return reader; } @Bean public ElasticsearchItemWriter itemWriter(){ return new ElasticsearchItemWriter(personRepository); } @Bean public Step step(StepBuilderFactory stepBuilderFactory, ItemReader itemReader, ItemWriter itemWriter){ return stepBuilderFactory .get("step1") .chunk(10000) .reader(itemReader) .writer(itemWriter) .build(); } @Bean public Job job(JobBuilderFactory jobBuilderFactory, Step step){ return jobBuilderFactory .get("importJob") .incrementer(new RunIdIncrementer()) .flow(step) .end() .build(); } /** * spring batch执行时会创建一些自身需要的表,这里指定表创建的位置:dataSource * @param dataSource * @param manager * @return */ @Bean public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager){ JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(manager); jobRepositoryFactoryBean.setDatabaseType("postgres"); try { return jobRepositoryFactoryBean.getObject(); } catch (Exception e) { e.printStackTrace(); } return null; }}
2.6 配置数据库及es的连接地址
spring:
  redis:
    host: 192.168.1.222
  data:
    # Elasticsearch connection used by the Jest client
    jest:
      uri: http://192.168.1.222:9200
      username: elastic
      password: changeme
  jpa:
    database: POSTGRESQL
    show-sql: true
    hibernate:
      ddl-auto: update
  # Source database
  datasource:
    platform: postgres
    url: jdbc:postgresql://192.168.1.222:5433/person
    username: hfcb
    password: hfcb
    driver-class-name: org.postgresql.Driver
    max-active: 2
spring.batch.initialize-schema: always
2.7 配置入口类
package com.hfcsbc.esetl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

/**
 * Application entry point.
 *
 * <p>Spring Boot's own Elasticsearch auto-configurations are excluded
 * (presumably because the Jest starter supplies the Elasticsearch client -
 * confirm). Repository scanning is split per store so each Spring Data module
 * only sees its own sub-package: {@code Person} is both a JPA entity and an
 * Elasticsearch document, so a scan covering both packages could bind the
 * same repository interface to both stores.
 */
@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository.es")
@EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
public class EsEtlApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(EsEtlApplication.class, args);
    }
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持武林网。
新闻热点
疑难解答