
Sample Code for Integrating Kafka with Spring Boot

2019-11-26 10:06:02
Source: reposted
Contributed by: a site user

This article walks through sample code for integrating Kafka with Spring Boot. It is shared here for others and kept as a note for myself.

System environment

The Kafka service runs on a remote server with the following setup:

  1. Ubuntu 16.04 LTS
  2. kafka_2.12-0.11.0.0.tgz
  3. zookeeper-3.5.2-alpha.tar.gz

Integration steps

1. Create a Spring Boot project and add the required dependencies:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.laravelshao.springboot</groupId>
    <artifactId>spring-boot-integration-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring-boot-integration-kafka</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!--kafka-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-json</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2. Add the configuration; a YAML file is used here:

spring:
  kafka:
    bootstrap-servers: X.X.X.X:9092
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: test
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: com.laravelshao.springboot.kafka
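To make the mapping explicit, the sketch below lists the raw Kafka client property names that these spring.kafka.* keys correspond to. The class name KafkaPropertyNames and its two methods are hypothetical and exist only for illustration; Spring Boot assembles the equivalent maps itself, so nothing like this needs to be added to the project. X.X.X.X is the same placeholder address used in the configuration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only: Spring Boot builds these maps
// itself from the spring.kafka.* keys in the YAML file.
final class KafkaPropertyNames {

    // Producer-side client properties implied by the YAML.
    static Map<String, Object> producerProperties() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "X.X.X.X:9092"); // placeholder address
        props.put("value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        return props;
    }

    // Consumer-side client properties implied by the YAML.
    static Map<String, Object> consumerProperties() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "X.X.X.X:9092"); // placeholder address
        props.put("group.id", "test");
        props.put("auto.offset.reset", "earliest");
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // Entries under consumer.properties are passed through unchanged.
        props.put("spring.json.trusted.packages", "com.laravelshao.springboot.kafka");
        return props;
    }

    public static void main(String[] args) {
        consumerProperties().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Spring Boot also fills in defaults that are not listed here, such as String serializers and deserializers for record keys.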

3. Create the message class:

package com.laravelshao.springboot.kafka;

public class Message {

    private Integer id;
    private String msg;

    // No-argument constructor, needed for JSON deserialization
    public Message() {
    }

    public Message(Integer id, String msg) {
        this.id = id;
        this.msg = msg;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", msg='" + msg + '\'' +
                '}';
    }
}
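Jackson-based JSON deserializers, such as the JsonDeserializer configured earlier, typically create the object through its no-argument constructor and then fill in each property through its setter, which is why Message provides both. The sketch below imitates that with plain reflection. It is not Jackson's implementation; BeanPopulationSketch and populate are hypothetical names, and the nested Message is a trimmed copy of the class above.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: imitates bean-style deserialization with reflection.
final class BeanPopulationSketch {

    // Trimmed copy of the article's Message class.
    public static class Message {
        private Integer id;
        private String msg;

        public Message() {
        }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getMsg() { return msg; }
        public void setMsg(String msg) { this.msg = msg; }
    }

    // Creates a bean via its no-arg constructor, then calls one setter per field.
    static <T> T populate(Class<T> type, Map<String, Object> fields) {
        try {
            T bean = type.getDeclaredConstructor().newInstance();
            for (Map.Entry<String, Object> field : fields.entrySet()) {
                String name = field.getKey();
                String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
                // Looks up the setter by the runtime type of the value.
                Method method = type.getMethod(setter, field.getValue().getClass());
                method.invoke(bean, field.getValue());
            }
            return bean;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot populate " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("id", 1);
        fields.put("msg", "test topic message 1");
        Message message = populate(Message.class, fields);
        System.out.println(message.getId() + " " + message.getMsg()); // prints "1 test topic message 1"
    }
}
```

If the no-argument constructor were removed from Message, the getDeclaredConstructor() call in this sketch would fail, which mirrors the kind of error a bean-style deserializer reports in that situation.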

4. Create the producer:

package com.laravelshao.springboot.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Created by shaoqinghua on 2018/3/23.
 */
@Component
public class Producer {

    private static final Logger log = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void send(String topic, Message message) {
        // send() is asynchronous: the log line below is written once the record
        // has been handed to the producer, not once the broker has acknowledged it.
        kafkaTemplate.send(topic, message);
        log.info("Producer->topic:{}, message:{}", topic, message);
    }
}

5. Create the consumer, using the @KafkaListener annotation to listen on the topic:

package com.laravelshao.springboot.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Created by shaoqinghua on 2018/3/23.
 */
@Component
public class Consumer {

    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    // Invoked once for every record received from test_topic
    @KafkaListener(topics = "test_topic")
    public void receive(ConsumerRecord<String, Message> consumerRecord) {
        log.info("Consumer->topic:{}, value:{}", consumerRecord.topic(), consumerRecord.value());
    }
}

6. Test sending and consuming:

package com.laravelshao.springboot;

import com.laravelshao.springboot.kafka.Message;
import com.laravelshao.springboot.kafka.Producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class IntegrationKafkaApplication {

    public static void main(String[] args) throws InterruptedException {
        ApplicationContext context = SpringApplication.run(IntegrationKafkaApplication.class, args);
        Producer producer = context.getBean(Producer.class);

        // Sends 9 messages (ids 1 to 9), one every 2 seconds
        for (int i = 1; i < 10; i++) {
            producer.send("test_topic", new Message(i, "test topic message " + i));
            Thread.sleep(2000);
        }
    }
}

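In the log output, each message is sent and then consumed in turn.

Troubleshooting

Deserialization exception (the custom message class is not in a package trusted by spring-kafka's JsonDeserializer):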

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.719 Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test_topic-0 at offset 9. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.laravelshao.springboot.kafka.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
 at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)
 at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113)
 at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:191)
 at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
 at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
 at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
 at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
 at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:667)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.lang.Thread.run(Thread.java:745)

Solution: add the package of the message class to the trusted packages:

spring:
  kafka:
    consumer:
      properties:
        spring:
          json:
            trusted:
              packages: com.laravelshao.springboot.kafka
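The check behind this error can be pictured as a package allow-list lookup. The consumer reads the class name that the producer attached to the record (spring-kafka's JsonSerializer sends it in a __TypeId__ header by default) and refuses to deserialize into that class unless its package is trusted. The following is a simplified sketch of that idea, not spring-kafka's actual code; TrustedPackagesSketch, addTrustedPackages and isTrusted are hypothetical names, and exact package matching is an assumption. The default packages and the "*" wildcard are taken from the error message above.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Simplified illustration of a trusted-packages check (not spring-kafka's code).
final class TrustedPackagesSketch {

    // Defaults as listed in the error message: [java.util, java.lang]
    private final Set<String> trustedPackages =
            new LinkedHashSet<>(Arrays.asList("java.util", "java.lang"));

    void addTrustedPackages(String... packages) {
        trustedPackages.addAll(Arrays.asList(packages));
    }

    // Returns true if the class's package is on the allow-list, or "*" is set.
    boolean isTrusted(String className) {
        if (trustedPackages.contains("*")) {
            return true; // trust all
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trustedPackages.contains(packageName); // assumption: exact match
    }

    public static void main(String[] args) {
        String type = "com.laravelshao.springboot.kafka.Message";
        TrustedPackagesSketch mapper = new TrustedPackagesSketch();
        System.out.println(mapper.isTrusted(type)); // prints "false"

        mapper.addTrustedPackages("com.laravelshao.springboot.kafka");
        System.out.println(mapper.isTrusted(type)); // prints "true"
    }
}
```

Listing the specific package, as the configuration does, is the narrower choice; trusting "*" is only appropriate when every producer writing to the topic is trusted.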

That is all for this article; I hope it is helpful for your learning.
