How to fix Java Kafka sends that never succeed: the Kafka broker advertised.listeners setting
I created a Spring Boot project integrated with Kafka, but sending messages to Kafka never succeeds. The project configuration is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.lodestone</groupId>
<artifactId>lodestone-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>lodestone-kafka</name>
<description>Lodestone kafka</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.12.RELEASE</version>
<relativePath />
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/log4j-over-slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
spring:
  kafka:
    bootstrap-servers:
      - 192.168.52.131:9092
    consumer:
      auto-offset-reset: earliest
      group-id: console-consumer-53989
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
logging:
  level:
    root: DEBUG
    org:
      springframework: DEBUG
      mybatis: DEBUG
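For reference, Spring Boot maps these properties straight onto the ordinary kafka-clients producer configuration. A rough non-Spring equivalent (a sketch only; the class name and the "hello" payload are made up here, assuming kafka-clients 0.10.x on the classpath) would be:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class PlainProducerExample {

	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.52.131:9092");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
			// send() is asynchronous; get() blocks until the broker acknowledges the record
			RecordMetadata metadata = producer.send(new ProducerRecord<>("test", "hello")).get();
			System.out.println("written to " + metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset());
		}
	}
}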
Producer code:
package com.lodestone.kafka.producer;

import java.util.Date;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.lodestone.kafka.message.LodestoneMessage;

@Component
public class Sender {

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	public void sendMessage() {
		LodestoneMessage message = new LodestoneMessage();
		message.setId(UUID.randomUUID().toString().replaceAll("-", ""));
		message.setMsg(UUID.randomUUID().toString());
		message.setSendTime(new Date());
		kafkaTemplate.send("test", JSON.toJSONString(message));
	}
}
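One thing worth noting while troubleshooting: KafkaTemplate.send() is asynchronous, so a failing send does not throw in the calling thread. A small variant of the method above (a sketch, not part of the original project; the method name is illustrative, spring-kafka 1.x as managed by Boot 1.5.12 assumed) attaches a callback so failures show up in the application log immediately:

// Extra imports needed in Sender.java for this variant:
// import org.springframework.kafka.support.SendResult;
// import org.springframework.util.concurrent.ListenableFuture;
// import org.springframework.util.concurrent.ListenableFutureCallback;
public void sendMessageWithCallback(String payload) {
	ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", payload);
	future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
		@Override
		public void onSuccess(SendResult<String, String> result) {
			// RecordMetadata carries the topic, partition and offset the record was written to
			System.out.println("Sent OK: " + result.getRecordMetadata());
		}
		@Override
		public void onFailure(Throwable ex) {
			// With an unreachable advertised address, the failure surfaces here instead of disappearing
			System.err.println("Send failed: " + ex.getMessage());
		}
	});
}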
Message class definition:
package com.lodestone.kafka.message;

import java.io.Serializable;
import java.util.Date;

public class LodestoneMessage implements Serializable {

	private static final long serialVersionUID = -6847574917429814430L;

	private String id;
	private String msg;
	private Date sendTime;

	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getMsg() {
		return msg;
	}
	public void setMsg(String msg) {
		this.msg = msg;
	}
	public Date getSendTime() {
		return sendTime;
	}
	public void setSendTime(Date sendTime) {
		this.sendTime = sendTime;
	}
}
Spring Boot application startup class:
package com.lodestone.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

import com.lodestone.kafka.producer.Sender;

@SpringBootApplication
public class LodestoneKafkaApplication {

	public static void main(String[] args) throws InterruptedException {
		ApplicationContext app = SpringApplication.run(LodestoneKafkaApplication.class, args);
		// send a few test messages
		for (int i = 0; i < 5; i++) {
			Sender sender = app.getBean(Sender.class);
			sender.sendMessage();
			Thread.sleep(500);
		}
	}
}
With the application log level set to DEBUG, the following error appears on startup:
2018-04-21 16:05:10.755 DEBUG 10272 --- [ad | producer-1] : Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_111]
	at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[na:1.8.0_111]
	...
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236) [kafka-clients-0.10.1.1.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:148) [kafka-clients-0.10.1.1.jar:na]
	at java.lang.Thread.run(Unknown Source) [na:1.8.0_111]
The first line of the error shows: Connection with localhost/127.0.0.1 disconnected
But application.yml clearly configures the Kafka server address as 192.168.52.131:9092, while the actual connection to the Kafka server is made to localhost/127.0.0.1, which is why the client cannot connect.
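One way to see what address the broker is actually handing out is to ask it for topic metadata directly. A small diagnostic sketch (a hypothetical helper, not part of the original project; plain kafka-clients 0.10.x API): the bootstrap connection to 192.168.52.131:9092 succeeds, but the partition leader comes back as localhost, which is exactly the address the producer then tries to use.

package com.lodestone.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

public class BrokerAddressCheck {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "192.168.52.131:9092");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
			for (PartitionInfo p : producer.partitionsFor("test")) {
				// leader() is the host:port the broker advertised for this partition;
				// with the misconfiguration described here it shows up as localhost
				System.out.println("partition " + p.partition() + " leader: " + p.leader());
			}
		}
	}
}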
After some searching (Baidu), I learned that when setting up Kafka, the advertised.listeners property needs to be set.
The property is described in config/server.properties as follows:
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
# advertised.listeners=PLAINTEXT://your.host.name:9092
Translated, this means: the hostname and port here are what the broker advertises to producers and consumers. If not set, the listeners configuration is used; if listeners is not configured either, the hostname and port returned by java.net.InetAddress.getCanonicalHostName() are used, which for IPv4 is basically localhost.
"PLAINTEXT" is the protocol; the possible values include PLAINTEXT and SSL. The hostname can be an IP address, or "0.0.0.0" to listen on all network interfaces; an empty hostname means only the default network interface is used.
In other words, if advertised.listeners is not configured, the listeners configuration is what gets announced to message producers and consumers, and this happens when producers and consumers fetch metadata.
So I reset advertised.listeners as follows:
advertised.listeners=PLAINTEXT://192.168.52.131:9092
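Put together with the listeners setting, the relevant part of config/server.properties then looks roughly like this (using 0.0.0.0 as the bind address is my assumption for a broker that should accept connections on every interface):

# Bind the PLAINTEXT listener on all interfaces
listeners=PLAINTEXT://0.0.0.0:9092
# Address handed to producers and consumers in metadata responses
advertised.listeners=PLAINTEXT://192.168.52.131:9092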
Note that if the Kafka cluster has multiple nodes, each node must set this value according to its own actual hostname and port.
After every node has been configured, restart the Spring Boot application; it then connects to the Kafka server normally and messages are sent successfully.