这几天一直想学习个中间件,自己比较熟悉的语言环境下有Kafka和RocketMQ,在之前的工作中有使用Kafka,但那时候项目太急,自己没有深入了解过,而RocketMQ是阿里开源的一款消息中间件,技术沙龙和公众号等也是感觉很活跃,目前自己稍微有点时间,所以计划先了解下Kafka。 先简单打个开发环境,选用虚拟机docker快速部署个。
$ docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
harisekhon/hbase latest 2e0874ffdb84 4 months ago 615MB
zookeeper latest 89f7884dcc4e 5 months ago 148MB
wurstmeister/kafka latest e4c6cedf70f9 7 months ago 312MB
tomcat latest df50c9d355cf 8 months ago 463MB
sheepkiller/kafka-manager latest 4e4a8c5dabab 11 months ago 463MB
# 拉取相关镜像启动
$ docker run --name my-kafka -p 9092:9092 --restart=always -e KAFKA_ADVERTISED_HOST_NAME=kafka01 -e KAFKA_CREATE_TOPICS="test:1:1" -e KAFKA_ZOOKEEPER_CONNECT=10.0.0.72:2181 -d wurstmeister/kafka:latest
$ docker run -it --restart=always --name=my-kafka-manager2 -p 9000:9000 -e ZK_HOSTS="10.0.0.72:2181" -d sheepkiller/kafka-manager
# 启动几个镜像后GUI查看下
一切都貌似正常,赶紧写个demo发个消息试试
@Component("simpleProducer")
public class SimpleProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleProducer.class);
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
/**
* sendMessage 发送消息
* @param topicName topic名称
* @param data 数据
* @author MZRong
* @date 2019/3/6 15:03
*/
public void sendMessage(String topicName,String data){
LOGGER.info(MessageFormat.format("开始向Kafka推送数据:{0}", data));
try {
kafkaTemplate.send(topicName, data);
LOGGER.info("推送数据成功!");
} catch (Exception e) {
LOGGER.error(MessageFormat.format("推送数据出错,topic:{0},data:{1}"
,topicName,data));
}
}
}
/**
* 通过HTTP发送数据
*/
@RequestMapping("/send")
public String send(String data) {
producer.sendMessage(TOPIC, data);
return "发送数据【" + data + "】成功!";
}
// 我使用idea的HTTP client
GET http://localhost:8099/kafka/send?data="hello kafka"
Accept: */*
Cache-Control: no-cache
控制台狂抛异常,org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE}
参考stackoverflow 解决下问题先。
# 先进入docker容器
$ docker exec -it my-kafka /bin/bash
bash-4.4# cd opt/kafka
bash-4.4# vi config/server.properties
# 取消注释,并填写自己的IP进去
advertised.listeners=PLAINTEXT://10.0.0.72:9092
# 保存并重启Kafka
重新发送消息:
- 2019-03-07 14:37:34.136 INFO 26292 — [nio-8099-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0 2019-03-07 14:37:34.141 INFO 26292 — [nio-8099-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : aaa7af6d4a11b29d 2019-03-07 14:37:36.811 INFO 26292 — [nio-8099-exec-1] com.sha.kafka.producer.SimpleProducer : 推送数据成功! 2019-03-07 14:38:50.660 INFO 26292 — [nio-8099-exec-3] com.sha.kafka.producer.SimpleProducer : 开始向Kafka推送数据:”hello kafka” 2019-03-07 14:38:50.661 INFO 26292 — [nio-8099-exec-3] com.sha.kafka.producer.SimpleProducer : 推送数据成功!
刚开始以为是docker的镜像版本和客户端不一致,官网也有个说明,服务端的版本不能低于客户端,客户端向下兼容。maven仓库各种搜索和检查,累哭~
消费者就比较好说了,做监听就行
@KafkaListener(id="test",topics={"test"})
public void listen(String data){
System.out.println("SimpleConsumer收到消息:" + data);
LOGGER.info(MessageFormat.format("SimpleConsumer收到消息:{0}", data));
}
2019-03-07 14:37:58.723 INFO 25156 --- [ test-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=test] Successfully joined group with generation 1
2019-03-07 14:37:58.723 INFO 25156 --- [ test-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=test] Setting newly assigned partitions []
2019-03-07 14:37:58.723 INFO 25156 --- [ test-2-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: []
2019-03-07 14:37:58.735 INFO 25156 --- [ test-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [test-0]
SimpleConsumer收到消息:"hello kafka"
2019-03-07 14:38:50.717 INFO 25156 --- [ test-0-C-1] com.sha.kafka.consumer.SimpleConsumer : SimpleConsumer收到消息:"hello kafka"
收获
再温习docker命令,刚看是以为是端口映射失败,也是愚钝。。。 了解spring boot 发送消息到Kafka。 最后放一张docker命令图
撒花★,°:.☆( ̄▽ ̄)/$:.°★ 。