
Example: Bulk-Importing Data into Elasticsearch with Spring Batch

2019-11-26 10:14:54
Source: reposted
Contributed by: site user

1. Introduction

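When a system needs to import a large amount of data from a database into Elasticsearch, Spring Batch can make the import more efficient. Spring Batch reads data page by page through an ItemReader and writes it in batches through an ItemWriter. Because Spring Batch does not provide an ItemWriter or ItemReader for Elasticsearch, this example defines a custom ElasticsearchItemWriter (and an ElasticsearchItemReader) for the bulk import.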
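The read-a-page, write-a-batch idea can be illustrated without any framework. The following is a minimal sketch using only the JDK; the class ChunkSketch, its methods, and the in-memory lists standing in for the database and the index are all hypothetical and are not part of Spring Batch.

```java
import java.util.ArrayList;
import java.util.List;

/** JDK-only sketch of chunk-oriented processing: read one page, write it in one call, repeat. */
class ChunkSketch {

    /** Hypothetical paging reader: returns rows [page * pageSize, (page + 1) * pageSize). */
    static List<Integer> readPage(List<Integer> table, int page, int pageSize) {
        int from = Math.min(page * pageSize, table.size());
        int to = Math.min(from + pageSize, table.size());
        return new ArrayList<>(table.subList(from, to));
    }

    /** Copies every row from source to sink one page at a time; returns the number of bulk writes. */
    static int importAll(List<Integer> source, List<Integer> sink, int pageSize) {
        int writes = 0;
        for (int page = 0; ; page++) {
            List<Integer> chunk = readPage(source, page, pageSize);
            if (chunk.isEmpty()) {
                break; // an empty page means the source is exhausted
            }
            sink.addAll(chunk); // stand-in for a single bulk write
            writes++;
        }
        return writes;
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            source.add(i);
        }
        List<Integer> sink = new ArrayList<>();
        int writes = importAll(source, sink, 10);
        System.out.println(writes + " bulk writes, " + sink.size() + " rows");
    }
}
```

With 25 rows and a page size of 10, the loop performs three writes (10, 10, and 5 rows). Spring Batch adds transactions, restart state, and error handling around the same basic loop.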

2. Example

2.1 pom.xml

This article uses spring data jest to connect to ES (spring data elasticsearch would also work). The ES version is 5.5.3.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.hfcsbc.estl</groupId>
  <artifactId>es-etl</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>es-etl</name>
  <description>Demo project for Spring Boot</description>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.M7</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
      <groupId>com.github.vanroy</groupId>
      <artifactId>spring-boot-starter-data-jest</artifactId>
      <version>3.0.0.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>io.searchbox</groupId>
      <artifactId>jest</artifactId>
      <version>5.3.2</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

  <repositories>
    <repository>
      <id>spring-snapshots</id>
      <name>Spring Snapshots</name>
      <url>https://repo.spring.io/snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
    <repository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>spring-snapshots</id>
      <name>Spring Snapshots</name>
      <url>https://repo.spring.io/snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </pluginRepository>
    <pluginRepository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </pluginRepository>
  </pluginRepositories>
</project>

2.2 Entity classes and repositories

package com.hfcsbc.esetl.domain;

import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;

/**
 * Create by pengchao on 2018/2/23
 */
@Document(indexName = "person", type = "person", shards = 1, replicas = 0, refreshInterval = "-1")
@Entity
@Data
public class Person {
  @Id
  private Long id;
  private String name;
  @OneToOne
  @Field(type = FieldType.Nested)
  private Address address;
}

package com.hfcsbc.esetl.domain;

import lombok.Data;

import javax.persistence.Entity;
import javax.persistence.Id;

/**
 * Create by pengchao on 2018/2/23
 */
@Entity
@Data
public class Address {
  @Id
  private Long id;
  private String name;
}

package com.hfcsbc.esetl.repository.jpa;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Create by pengchao on 2018/2/23
 */
public interface PersonRepository extends JpaRepository<Person, Long> {
}

package com.hfcsbc.esetl.repository.es;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

/**
 * Create by pengchao on 2018/2/23
 */
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}

2.3 Configuring the ElasticsearchItemWriter

package com.hfcsbc.esetl.itemWriter;

import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;

import java.util.List;

/**
 * Create by pengchao on 2018/2/23
 */
public class ElasticsearchItemWriter implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener {

  private EsPersonRepository personRepository;

  public ElasticsearchItemWriter(EsPersonRepository personRepository) {
    this.personRepository = personRepository;
  }

  // The listener callbacks below are intentionally empty; they are hooks for logging or metrics.

  @Override
  public void beforeWrite(List<? extends Person> items) {
  }

  @Override
  public void afterWrite(List<? extends Person> items) {
  }

  @Override
  public void onWriteError(Exception exception, List<? extends Person> items) {
  }

  @Override
  public void beforeStep(StepExecution stepExecution) {
  }

  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    // Returning null leaves the step's exit status unchanged
    return null;
  }

  @Override
  public void write(List<? extends Person> items) throws Exception {
    // saveAll in the implementing class AbstractElasticsearchRepository calls
    // elasticsearchOperations.bulkIndex(queries), so the whole chunk is indexed in bulk
    personRepository.saveAll(items);
  }
}
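To make the bulk-indexing step more concrete, the sketch below builds the newline-delimited body that the Elasticsearch _bulk endpoint accepts: one action line followed by one source line per document. It is a JDK-only illustration. The class BulkBodySketch is hypothetical, the documents are reduced to an id and a name, no JSON escaping is done, and the payload that spring data jest actually sends may differ in field order and metadata.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** JDK-only sketch of the NDJSON body accepted by the Elasticsearch _bulk endpoint. */
class BulkBodySketch {

    /**
     * Emits one "index" action line and one source line per document.
     * Assumes names contain no characters that need JSON escaping.
     */
    static String bulkBody(String index, String type, Map<Long, String> namesById) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Long, String> e : namesById.entrySet()) {
            sb.append("{\"index\":{\"_index\":\"").append(index)
              .append("\",\"_type\":\"").append(type)
              .append("\",\"_id\":\"").append(e.getKey()).append("\"}}\n");
            sb.append("{\"name\":\"").append(e.getValue()).append("\"}\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<Long, String> people = new LinkedHashMap<>(); // keeps insertion order
        people.put(1L, "Alice");
        people.put(2L, "Bob");
        System.out.print(bulkBody("person", "person", people));
    }
}
```

Sending all documents of a chunk in one such request avoids a network round trip per document, which is where most of the speed-up over item-by-item indexing comes from.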

2.4 Configuring the ElasticsearchItemReader (not used in this example; for reference only)

package com.hfcsbc.esetl.itemReader;

import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.util.ClassUtils;

import java.util.Iterator;

/**
 * Create by pengchao on 2018/2/24
 */
public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

  private final ElasticsearchOperations elasticsearchOperations;
  private final SearchQuery query;
  private final Class<? extends T> targetType;

  public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends T> targetType) {
    // The reader needs a name to store its position in the ExecutionContext
    setName(ClassUtils.getShortName(ElasticsearchItemReader.class));
    this.elasticsearchOperations = elasticsearchOperations;
    this.query = query;
    this.targetType = targetType;
  }

  @Override
  @SuppressWarnings("unchecked")
  protected Iterator<T> doPageRead() {
    // Request only the current page; otherwise every call would return the same results
    query.setPageable(PageRequest.of(page, pageSize));
    return (Iterator<T>) elasticsearchOperations.queryForList(query, targetType).iterator();
  }

  @Override
  public void afterPropertiesSet() throws Exception {
  }
}
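A paginated reader calls doPageRead repeatedly, advancing a page counter each time, and stops when a call yields no items, so each call is expected to return a different slice of the data. The sketch below is a simplified, JDK-only model of that contract. PagedReaderSketch and its nested PagedReader class are hypothetical and are not the Spring Batch source.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** JDK-only model of a paginated reader: read() drains the current page, then asks for the next. */
class PagedReaderSketch {

    abstract static class PagedReader<T> {
        protected int page = 0;
        protected final int pageSize;
        private Iterator<T> results;

        PagedReader(int pageSize) {
            this.pageSize = pageSize;
        }

        /** Returns the items of the current page; an empty iterator signals the end of the data. */
        protected abstract Iterator<T> doPageRead();

        /** Returns the next item, or null once every page has been consumed. */
        T read() {
            if (results == null || !results.hasNext()) {
                results = doPageRead();
                page++;
                if (results == null || !results.hasNext()) {
                    return null;
                }
            }
            return results.next();
        }
    }

    /** Reads everything from an in-memory list that stands in for the index. */
    static <T> List<T> readAll(List<T> data, int pageSize) {
        PagedReader<T> reader = new PagedReader<T>(pageSize) {
            @Override
            protected Iterator<T> doPageRead() {
                // Slice the data by the current page number, like a from/size query
                int from = Math.min(page * this.pageSize, data.size());
                int to = Math.min(from + this.pageSize, data.size());
                return data.subList(from, to).iterator();
            }
        };
        List<T> out = new ArrayList<>();
        for (T item = reader.read(); item != null; item = reader.read()) {
            out.add(item);
        }
        return out;
    }
}
```

If doPageRead ignored the page counter and always returned the full result list, read() in this model would never return null and the loop would not terminate.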

2.5 Spring Batch configuration

package com.hfcsbc.esetl.config;

import com.hfcsbc.esetl.itemWriter.ElasticsearchItemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;

/**
 * Create by pengchao on 2018/2/23
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {

  @Autowired
  private EsPersonRepository personRepository;

  // Reads Person rows from the database page by page using a native SQL query
  @Bean
  public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory) {
    JpaPagingItemReader<Person> reader = new JpaPagingItemReader<Person>();
    String sqlQuery = "select * from person";
    try {
      JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<Person>();
      queryProvider.setSqlQuery(sqlQuery);
      queryProvider.setEntityClass(Person.class);
      queryProvider.afterPropertiesSet();
      reader.setEntityManagerFactory(entityManagerFactory);
      reader.setPageSize(10000);
      reader.setQueryProvider(queryProvider);
      reader.afterPropertiesSet();
      reader.setSaveState(true);
    } catch (Exception e) {
      e.printStackTrace();
    }
    return reader;
  }

  @Bean
  public ElasticsearchItemWriter itemWriter() {
    return new ElasticsearchItemWriter(personRepository);
  }

  // The chunk size matches the reader's page size, so each page is written as one chunk
  @Bean
  public Step step(StepBuilderFactory stepBuilderFactory,
           ItemReader<Person> itemReader,
           ItemWriter<Person> itemWriter) {
    return stepBuilderFactory
        .get("step1")
        .<Person, Person>chunk(10000)
        .reader(itemReader)
        .writer(itemWriter)
        .build();
  }

  @Bean
  public Job job(JobBuilderFactory jobBuilderFactory, Step step) {
    return jobBuilderFactory
        .get("importJob")
        .incrementer(new RunIdIncrementer())
        .flow(step)
        .end()
        .build();
  }

  /**
   * Spring Batch creates some tables of its own when it runs; this bean specifies
   * where those tables are created: dataSource
   * @param dataSource
   * @param manager
   * @return
   */
  @Bean
  public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager) {
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setDataSource(dataSource);
    jobRepositoryFactoryBean.setTransactionManager(manager);
    jobRepositoryFactoryBean.setDatabaseType("postgres");
    try {
      return jobRepositoryFactoryBean.getObject();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return null;
  }
}
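Because the reader's page size and the step's chunk size are both 10000 in this configuration, each chunk corresponds roughly to one database page, one bulk write, and one transaction commit. The small calculation below estimates how many such chunks a table needs. The class ChunkMath and the row count are hypothetical.

```java
/** Estimates how many fixed-size chunks are needed to cover a number of rows. */
class ChunkMath {

    /** Ceiling division: the last chunk may be only partially filled. */
    static long batches(long rowCount, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return (rowCount + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        long rows = 1_234_567L; // hypothetical table size
        System.out.println(batches(rows, 10_000)); // prints 124
    }
}
```

A larger chunk size means fewer commits and fewer bulk requests but more memory per chunk, so the value is worth tuning against the size of the documents being indexed.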

2.6 Configuring the database and ES connection addresses

spring:
  redis:
    host: 192.168.1.222
  data:
    jest:
      uri: http://192.168.1.222:9200
      username: elastic
      password: changeme
  jpa:
    database: POSTGRESQL
    show-sql: true
    hibernate:
      ddl-auto: update
  datasource:
    platform: postgres
    url: jdbc:postgresql://192.168.1.222:5433/person
    username: hfcb
    password: hfcb
    driver-class-name: org.postgresql.Driver
    max-active: 2

spring.batch.initialize-schema: always

2.7 Application entry class

package com.hfcsbc.esetl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository")
@EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
public class EsEtlApplication {

  public static void main(String[] args) {
    SpringApplication.run(EsEtlApplication.class, args);
  }
}

That is all for this article. I hope it helps with your studies.
