首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 服务器 > Apache >

【Apache Kafka】代码范例

2013-10-13 
【Apache Kafka】代码实例前提:已经配置好kafka。若未安装,可以参照【Apache Kafka】安装升级指南已在eclipse里

【Apache Kafka】代码实例

前提:

已经配置好kafka。若未安装,可以参照【Apache Kafka】安装升级指南已在eclipse里面安装scala插件。Eclipse Kepler中在Help->Eclipse Markectplace中搜索Scalar,然后安装即可。使用maven构建kafka测试project在eclipse中。创建topic:在kafka的安装目录下执行bin/kafka-create-topic.sh --zookeeper 192.168.20.99:2181 --replica 1 --partition 1 --topic test启动consumer:在kafka的安装目录下执行bin/kafka-console-consumer.sh --zookeeper 192.168.20.99:2181 --topic test --from-beginning

pom.xml文件如下:

所有kafka依赖的jar包都在com.sksamuel.kafka下面。其中kafka使用的版本是0.8.0-beta1,kafka是2.10。

package com.iflytek.cpcloud.kafka.kafkatest;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import kafka.consumer.ConsumerConfig;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;public class ConsumerTest extends Thread {private final ConsumerConnector consumer;private final String topic;public static void main(String[] args) {ConsumerTest consumerThread = new ConsumerTest("test");consumerThread.start();}public ConsumerTest(String topic) {consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());this.topic = topic;}private static ConsumerConfig createConsumerConfig() {Properties props = new Properties();props.put("zookeeper.connect", "192.168.20.99:2181");props.put("group.id", "0");props.put("zookeeper.session.timeout.ms", "400000");props.put("zookeeper.sync.time.ms", "200");props.put("auto.commit.interval.ms", "1000");return new ConsumerConfig(props);}public void run() {Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(1));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);ConsumerIterator<byte[], byte[]> it = stream.iterator();while (it.hasNext())System.out.println(new String(it.next().message()));}}

在kafka-console-producer端输入的数据会回显到eclipse的console中。

以上程序参考kafka-0.8.0-bata1中的example。


热点排行