

Kafka operations from Java fail to execute


Problem description

I'm using kafka-clients to work with Kafka, but it never succeeds and I can't figure out why. The relevant code and configuration are posted below. If anyone knows what's wrong, please point it out. Thanks!

Environment and dependencies

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>

The JDK version is 1.8, the Kafka version is 2.12-0.10.2.0, and the server runs CentOS 7.

Test code

TestBase.java

public class TestBase {
    protected Logger log = LoggerFactory.getLogger(this.getClass());
    protected String kafka_server = "192.168.60.160:9092";
    protected String topic = "zlikun_topic";
}

ProducerTest.java

public class ProducerTest extends TestBase {

    protected Properties props = new Properties();

    @Before
    public void init() {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
    }

    @Test
    public void test() throws InterruptedException {
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // send messages
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.printf("offset = %d, partition = %d \n", recordMetadata.offset(), recordMetadata.partition());
                    } else {
                        log.error("send error !", e);
                    }
                }
            });
        }
        TimeUnit.SECONDS.sleep(3);
        producer.close();
    }
}

ConsumerTest.java

public class ConsumerTest extends TestBase {

    private Properties props = new Properties();

    @Before
    public void init() {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "zlikun");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    }

    @Test
    public void test() {
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        // consumer.assign(Arrays.asList(new TopicPartition(topic, 1)));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

Problem

# The test topic was created manually
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic zlikun_topic
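(For reference, the topic and its four partitions can be double-checked from the broker host with the describe option, assuming the same ZooKeeper address as above:)

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic zlikun_topic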

Console output

[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time

Answers

Answer 1:

I tested it and it works fine for me: https://github.com/MOBX/kafka...

Since you're getting a TimeoutException, I'd suggest first checking that the connection to the Kafka cluster actually works. If that doesn't help, try downgrading kafka-clients to 0.8.2.0.
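(The downgrade only changes the dependency version in the POM; whether 0.8.2.0 actually behaves differently here is untested, so treat this as a sketch:)

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.0</version>
</dependency>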

Answer 2:

I turned the log level up to DEBUG and, looking at the output, found that the failure is caused by the broker's hostname not being resolvable.
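(The logging backend isn't shown in the question; assuming a plain log4j setup, something like the following log4j.properties raises the Kafka client's logging to DEBUG so the connection attempts become visible:)

# log4j.properties (assumed backend) - show Kafka client connection attempts
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
log4j.logger.org.apache.kafka=DEBUG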

2017-04-11 13:49:46.046 [main] DEBUG org.apache.kafka.clients.NetworkClient - Error connecting to node 0 at m160:9092:
java.io.IOException: Can't resolve address: m160:9092
	at org.apache.kafka.common.network.Selector.connect(Selector.java:182)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629)
	at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:57)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:768)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:684)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:347)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:275)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
	at com.zlikun.mq.ConsumerTest.test(ConsumerTest.java:34)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.nio.channels.UnresolvedAddressException
	at sun.nio.ch.Net.checkAddress(Net.java:107)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:649)
	at org.apache.kafka.common.network.Selector.connect(Selector.java:179)
	... 36 more

A blog post I found online (http://blog.sina.com.cn/s/blo...) supports this as well. Likewise, after I added the hostname to the hosts file on the client machine, the tests ran normally. That doesn't feel like a proper solution, though; relying on hosts entries in a real deployment would be a maintenance burden, so I'm wondering whether there is a better way.
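(The workaround simply maps the hostname advertised by the broker, m160 in the DEBUG log above, to the broker's IP from the test setup, roughly:)

# /etc/hosts on the client machine
192.168.60.160    m160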

[2017/04/11 16:16] I just found an article online (http://www.tuicool.com/articl...) that solved the problem!
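(The linked article isn't reproduced here, but a common broker-side fix for this symptom in Kafka 0.10.x is to have the broker advertise an address that clients can resolve, rather than its bare hostname, e.g. in config/server.properties; the values below are assumed from the setup in this question:)

# config/server.properties on the broker (assumed values)
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.60.160:9092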

Tags: java