博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
关于kafka-clients JAVA API的基本使用
阅读量:6625 次
发布时间:2019-06-25

本文共 3429 字,大约阅读时间需要 11 分钟。

首先老规矩, 引入maven依赖

1 
2
org.apache.kafka
3
kafka-clients
4
1.0.0
5

关于kafka-clients的消息生产者: 

1 @Slf4j 2 public class KafkaProducerClient { 3  4     public static void pushMsg(String msg) throws Exception { 5         Properties props = new Properties(); 6         props.put("bootstrap.servers", KafkaConstant.KAFKA_SERVER_ADDRESS); 7         props.put("acks", "0"); 8         props.put("retries", 0); 9         props.put("batch.size", 16384);10         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");11         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");12         Producer producer = new KafkaProducer(props);13         ProducerRecord
record = new ProducerRecord<>(KafkaConstant.KAFKA_TOPIC_NAME, 0, "123", msg);14 producer.send(record, new Callback() {15 @Override16 public void onCompletion(RecordMetadata metadata, Exception e) {17 if (e != null) {18 e.printStackTrace();19 }20 log.info("pushMsg of msg: {}, metadata: {}", msg, metadata);21 }22 });23 producer.close();24 }25 26 }

关于kafka-clients的消息消费者

1 @Slf4j 2 public class KafkaConsumerClient extends Thread { 3      4     private KafkaConsumerClient() { 5     } 6  7     /** 8      * 初始化consumer 9      */10     public void initKafkaConsumer () {11         log.info("init Kafka Consumer");12         new KafkaConsumerClient().start();13     }14     15     @Override16     public void run() {17         Properties props = new Properties();18 19         props.put("bootstrap.servers", KafkaConstant.KAFKA_SERVER_ADDRESS);20         props.put("group.id", "1");21         props.put("enable.auto.commit", "true");22         props.put("auto.commit.interval.ms", "1000");23         props.put("session.timeout.ms", "30000");24         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");25         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");26 27         Consumer
consumer = new KafkaConsumer<>(props);28 consumer.subscribe(Arrays.asList(KafkaConstant.KAFKA_TOPIC_NAME));29 consumer.seekToBeginning(new ArrayList<>());30 31 // ===== 拿到所有的topic ===== //32 Map
> listTopics = consumer.listTopics();33 Set
>> entries = listTopics.entrySet();34 35 while (true) {36 ConsumerRecords
records = consumer.poll(1000 * 60);37 for(ConsumerRecord
record : records) {38 System.out.println("[fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value() + "]");39 }40 }41 }42 }

我们需要在项目启动的时候将消费者consumer启动起来

1 

然后调用生产者producer时, 消费者consumer就能拿到消息

如: 

1 @Override 2 public void pushMsgById(Long id) throws Exception { 3     User user = new User(); 4     user.setId(id); 5     user.setUsername("test11111111"); 6     user.setPassword("test22222222"); 7     String str = JsonUtil.toCompactJsonString(user); 8     log.info("pushMsgById is user: {}", str); 9     KafkaProducerClient.pushMsg(str);10 }

得到结果: 

 

转载于:https://www.cnblogs.com/yanwu0527/p/9083386.html

你可能感兴趣的文章
醒醒吧少年,只用Cucumber不能帮助你BDD
查看>>
众安质量学堂文章汇总
查看>>
AsyncHttpSupport并发发送请求
查看>>
一名女程序员对iOS的想法
查看>>
Cloud Native未来值得关注的方向:Service Mesh简介
查看>>
西班牙现新型电费退款网络诈骗 侨胞需谨防上当
查看>>
JVM新生代和老年代配置原则
查看>>
昆明滇池水质达30年来最好 百名“市民河长”守卫“母亲河”
查看>>
太合音乐发布“少年红星音乐计划” 力促00后创作浪潮
查看>>
ICO后STO也被明令禁止,区块链并不是法外之地
查看>>
浅尝辄止,React是如何工作的
查看>>
ECS vs Kubernetes:相似但是不同
查看>>
6000多套iOS源码大分享
查看>>
一个完整Java Web项目背后的密码
查看>>
前端笔试题面试题记录(上)| 掘金技术征文
查看>>
移动端页面分享快照生成总结
查看>>
收发数据的原理(上)
查看>>
AccessibilityService 从入门到出轨
查看>>
七层网络协议-tcp/ip协议
查看>>
React 学习资源
查看>>