Docker运行kafka

我在使用Docker运行Kafka时一直不能将消息发送成功,通过修改docker镜像最终解决,记录下。。。

Posted by Rong on March 7, 2019

这几天一直想学习个中间件,自己比较熟悉的语言环境下有Kafka和RocketMQ,在之前的工作中有使用Kafka,但那时候项目太急,自己没有深入了解过,而RocketMQ是阿里开源的一款消息中间件,技术沙龙和公众号等也是感觉很活跃,目前自己稍微有点时间,所以计划先了解下Kafka。 Kafka官方图 先简单打个开发环境,选用虚拟机docker快速部署个。

$ docker images          
REPOSITORY                  TAG                 IMAGE ID            CREATED             SIZE
harisekhon/hbase            latest              2e0874ffdb84        4 months ago        615MB
zookeeper                   latest              89f7884dcc4e        5 months ago        148MB
wurstmeister/kafka          latest              e4c6cedf70f9        7 months ago        312MB
tomcat                      latest              df50c9d355cf        8 months ago        463MB
sheepkiller/kafka-manager   latest              4e4a8c5dabab        11 months ago       463MB
# 拉取相关镜像启动
$ docker run --name my-kafka -p 9092:9092 --restart=always -e KAFKA_ADVERTISED_HOST_NAME=kafka01 -e KAFKA_CREATE_TOPICS="test:1:1" -e KAFKA_ZOOKEEPER_CONNECT=10.0.0.72:2181 -d wurstmeister/kafka:latest

$ docker run -it --restart=always --name=my-kafka-manager2 -p 9000:9000 -e ZK_HOSTS="10.0.0.72:2181" -d sheepkiller/kafka-manager

# 启动几个镜像后GUI查看下

kafka-manager 一切都貌似正常,赶紧写个demo发个消息试试


@Component("simpleProducer")
public class SimpleProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleProducer.class);
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    /**
     * sendMessage 发送消息
     * @param topicName topic名称
     * @param data 数据
     * @author MZRong
     * @date 2019/3/6 15:03
     */
    public void sendMessage(String topicName,String data){
        LOGGER.info(MessageFormat.format("开始向Kafka推送数据:{0}", data));
        try {
            kafkaTemplate.send(topicName, data);

            LOGGER.info("推送数据成功!");
        } catch (Exception e) {
            LOGGER.error(MessageFormat.format("推送数据出错,topic:{0},data:{1}"
                    ,topicName,data));
        }
    }
}

/**
 * 通过HTTP发送数据 
 */
     @RequestMapping("/send")
    public String send(String data) {
        producer.sendMessage(TOPIC, data);
        return "发送数据【" + data + "】成功!";
    }
// 我使用idea的HTTP client
GET http://localhost:8099/kafka/send?data="hello kafka"
Accept: */*
Cache-Control: no-cache

控制台狂抛异常,org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE}
参考stackoverflow 解决下问题先。

# 先进入docker容器
$ docker exec -it my-kafka /bin/bash
bash-4.4# cd opt/kafka
bash-4.4# vi config/server.properties
# 取消注释,并填写自己的IP进去
advertised.listeners=PLAINTEXT://10.0.0.72:9092 
# 保存并重启Kafka

重新发送消息:

  • 2019-03-07 14:37:34.136 INFO 26292 — [nio-8099-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0 2019-03-07 14:37:34.141 INFO 26292 — [nio-8099-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : aaa7af6d4a11b29d 2019-03-07 14:37:36.811 INFO 26292 — [nio-8099-exec-1] com.sha.kafka.producer.SimpleProducer : 推送数据成功! 2019-03-07 14:38:50.660 INFO 26292 — [nio-8099-exec-3] com.sha.kafka.producer.SimpleProducer : 开始向Kafka推送数据:”hello kafka” 2019-03-07 14:38:50.661 INFO 26292 — [nio-8099-exec-3] com.sha.kafka.producer.SimpleProducer : 推送数据成功!

刚开始以为是docker的镜像版本和客户端不一致,官网也有个说明,服务端的版本不能低于客户端,客户端向下兼容。maven仓库各种搜索和检查,累哭~

消费者就比较好说了,做监听就行

    @KafkaListener(id="test",topics={"test"})
    public void listen(String data){
        System.out.println("SimpleConsumer收到消息:" + data);
        LOGGER.info(MessageFormat.format("SimpleConsumer收到消息:{0}", data));
    }
2019-03-07 14:37:58.723  INFO 25156 --- [     test-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=test] Successfully joined group with generation 1
2019-03-07 14:37:58.723  INFO 25156 --- [     test-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=test] Setting newly assigned partitions []
2019-03-07 14:37:58.723  INFO 25156 --- [     test-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: []
2019-03-07 14:37:58.735  INFO 25156 --- [     test-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test-0]
SimpleConsumer收到消息:"hello kafka"
2019-03-07 14:38:50.717  INFO 25156 --- [     test-0-C-1] com.sha.kafka.consumer.SimpleConsumer    : SimpleConsumer收到消息:"hello kafka"

收获

再温习docker命令,刚看是以为是端口映射失败,也是愚钝。。。 了解spring boot 发送消息到Kafka。 最后放一张docker命令图

docker常用图 docker常用命令

撒花★,°:.☆( ̄▽ ̄)/$:.°★