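I am trying to deploy a Kafka broker in a Docker container on a Mesos cluster.
Specifically, I have a Mesos cluster on which I deploy various Docker containers, using Marathon as the init system. All containers have service ports and are reachable through a proxy (HAProxy).
Problem
When I deploy a Kafka container with Marathon, I can create a topic and list all topics, but I cannot run the produce/consume commands. The produce command gives me the following error: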
[2016-01-18 11:10:09,926] WARN Failed to send producer request with correlation id 11 to broker 0 with data for partitions [test,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
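The Docker image I am using is spotify/kafka, which has ZooKeeper and Kafka preinstalled. The image works fine when I run it with the docker run command.
I am using the following Marathon JSON file to deploy the container: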
{
  "id": "spotify-kafka.marathon",
  "cmd": "export ADVERTISED_HOST=$HOST; export ADVERTISED_PORT=9092; env; supervisord -n",
  "container": {
    "type": "DOCKER",
    "docker": {
      "image": "spotify/kafka",
      "network": "BRIDGE",
      "portMappings": [
        {"containerPort": 2181, "hostPort": 0, "servicePort": 20000},
        {"containerPort": 9092, "hostPort": 0, "servicePort": 20500}
      ]
    }
  },
  "cpus": 0.5,
  "mem": 1024.0,
  "instances": 1
}
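The cmd exports some env vars that set the internal host IP and port. The external ports are random and are picked up by HAProxy, which routes them to static ports.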
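One thing that may be worth checking in this setup: with "hostPort": 0 the broker's port 9092 is published on a random host port, while the cmd advertises 9092. Below is a minimal Python sketch for comparing the two. It assumes that Marathon exports HOST (the agent hostname) and PORT1 (the host port assigned to the second port mapping). The function names and example values are hypothetical and only illustrate the comparison.

```python
def reachable_endpoint(env):
    """Return (host, port) under which the broker is published on the agent.

    Assumption: Marathon sets HOST to the agent hostname and PORT1 to the
    host port assigned to the second port mapping (containerPort 9092).
    """
    return env["HOST"], int(env["PORT1"])


def advertised_endpoint(env):
    """Return (host, port) that the cmd in the Marathon JSON asks Kafka to advertise."""
    return env["ADVERTISED_HOST"], int(env["ADVERTISED_PORT"])


if __name__ == "__main__":
    # Example values only; inside the container these would come from os.environ.
    env = {
        "HOST": "agent-1",
        "PORT1": "31002",
        "ADVERTISED_HOST": "agent-1",
        "ADVERTISED_PORT": "9092",
    }
    print("advertised:", advertised_endpoint(env))
    print("reachable: ", reachable_endpoint(env))
    print("match:", advertised_endpoint(env) == reachable_endpoint(env))
```

If the two endpoints differ, a client that follows the advertised address would try a port that is not published on the host. That might explain a connection-level failure, but it is only a guess about this particular cluster.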
The commands I use to talk to Kafka come from the documentation:
https://kafka.apache.org/documentation.html#quickstart
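I have also tried other images, such as ches/kafka, wurstmeister/kafka, and one I built myself. I also found https://github.com/mesos/kafka, where, after building it, you can send commands to port 7000 and deploy brokers to the cluster; this failed for me. Ideally, I would like an image that already contains both ZooKeeper and Kafka, like the spotify image.
Update 1
I changed the Marathon JSON file and exported some variables that seemed to be needed. The final JSON looks like this: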
{
  "id": "spotify-kafka.marathon",
  "cmd": "export ADVERTISED_HOST=$HOST; export ADVERTISED_PORT=9092; export PORT_9092=9092; export PORT=2181; export PORT0=2181; export PORT1=9092; export PORT_2181=2181 ; env; supervisord -n",
  "container": {
    "type": "DOCKER",
    "docker": {
      "image": "192.168.1.235:5000/spotify-kafka",
      "network": "BRIDGE",
      "portMappings": [
        {"containerPort": 2181, "hostPort": 0, "servicePort": 20000},
        {"containerPort": 9092, "hostPort": 0, "servicePort": 20500}
      ]
    }
  },
  "cpus": 0.5,
  "mem": 1024.0,
  "instances": 1
}
This change gave me a different result when I tried to produce a message:
[2016-01-19 11:02:09,297] WARN Error while fetching metadata [{TopicMetadata for topic test -> No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,309] WARN Error while fetching metadata [{TopicMetadata for topic test -> No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,310] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,416] WARN Error while fetching metadata [{TopicMetadata for topic test -> No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,422] WARN Error while fetching metadata [{TopicMetadata for topic test -> No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,422] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,528] WARN Error while fetching metadata [{TopicMetadata for topic test -> No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,533] WARN Error while fetching metadata [{TopicMetadata for topic test -> No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,533] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,639] WARN Error while fetching metadata [{TopicMetadata for topic test -> No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,644] WARN Error while fetching metadata [{TopicMetadata for topic test -> No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,644] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,750] WARN Error while fetching metadata [{TopicMetadata for topic test -> No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,750] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,751] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
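Update 2 – Solution
After some digging online, I found this repository: https://github.com/tobilg/docker-kafka-marathon/. Its author created a shell script that automatically generates the properties file for you. You can also scale this container and run multiple Kafka broker instances. The only drawback for me is that it relies on an external ZooKeeper server, but I don't think it would be hard to solve that by installing ZooKeeper in the image.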
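To illustrate the idea of generating the properties file from the environment, here is a rough Python sketch. It is not the script from that repository, only my own approximation of the approach. It assumes that Marathon exports HOST and PORT0, and that the broker can bind the assigned port directly (for example with host networking). ZOOKEEPER_CONNECT and BROKER_ID are hypothetical variable names chosen for this example.

```python
def render_broker_properties(env):
    """Build the contents of a Kafka server.properties file from env vars.

    Assumptions: HOST and PORT0 are provided by Marathon; ZOOKEEPER_CONNECT
    and BROKER_ID are hypothetical names used only in this sketch.
    """
    host = env["HOST"]
    port = env["PORT0"]
    zookeeper = env.get("ZOOKEEPER_CONNECT", "localhost:2181")
    broker_id = env.get("BROKER_ID", "0")
    lines = [
        "broker.id=" + broker_id,
        "port=" + port,
        # Advertise the address that clients outside the container can reach.
        "advertised.host.name=" + host,
        "advertised.port=" + port,
        "zookeeper.connect=" + zookeeper,
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Example values only; a real entrypoint would pass os.environ instead.
    print(render_broker_properties({"HOST": "agent-1", "PORT0": "31005"}))
```

When scaling to several instances, each broker would need its own broker.id, so the default of "0" here is only suitable for a single broker.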
So I am marking this as solved.