首先老规矩, 引入maven依赖
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
关于kafka-clients的消息生产者:
1 @Slf4j 2 public class KafkaProducerClient { 3 4 public static void pushMsg(String msg) throws Exception { 5 Properties props = new Properties(); 6 props.put("bootstrap.servers", KafkaConstant.KAFKA_SERVER_ADDRESS); 7 props.put("acks", "0"); 8 props.put("retries", 0); 9 props.put("batch.size", 16384);10 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");11 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");12 Producer producer = new KafkaProducer(props);13 ProducerRecordrecord = new ProducerRecord<>(KafkaConstant.KAFKA_TOPIC_NAME, 0, "123", msg);14 producer.send(record, new Callback() {15 @Override16 public void onCompletion(RecordMetadata metadata, Exception e) {17 if (e != null) {18 e.printStackTrace();19 }20 log.info("pushMsg of msg: {}, metadata: {}", msg, metadata);21 }22 });23 producer.close();24 }25 26 }
关于kafka-clients的消息消费者
1 @Slf4j 2 public class KafkaConsumerClient extends Thread { 3 4 private KafkaConsumerClient() { 5 } 6 7 /** 8 * 初始化consumer 9 */10 public void initKafkaConsumer () {11 log.info("init Kafka Consumer");12 new KafkaConsumerClient().start();13 }14 15 @Override16 public void run() {17 Properties props = new Properties();18 19 props.put("bootstrap.servers", KafkaConstant.KAFKA_SERVER_ADDRESS);20 props.put("group.id", "1");21 props.put("enable.auto.commit", "true");22 props.put("auto.commit.interval.ms", "1000");23 props.put("session.timeout.ms", "30000");24 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");25 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");26 27 Consumerconsumer = new KafkaConsumer<>(props);28 consumer.subscribe(Arrays.asList(KafkaConstant.KAFKA_TOPIC_NAME));29 consumer.seekToBeginning(new ArrayList<>());30 31 // ===== 拿到所有的topic ===== //32 Map > listTopics = consumer.listTopics();33 Set >> entries = listTopics.entrySet();34 35 while (true) {36 ConsumerRecords records = consumer.poll(1000 * 60);37 for(ConsumerRecord record : records) {38 System.out.println("[fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value() + "]");39 }40 }41 }42 }
我们需要在项目启动的时候将消费者consumer启动起来
1
然后调用生产者producer时, 消费者consumer就能拿到消息
如:
1 @Override 2 public void pushMsgById(Long id) throws Exception { 3 User user = new User(); 4 user.setId(id); 5 user.setUsername("test11111111"); 6 user.setPassword("test22222222"); 7 String str = JsonUtil.toCompactJsonString(user); 8 log.info("pushMsgById is user: {}", str); 9 KafkaProducerClient.pushMsg(str);10 }
得到结果: