消息中间件Kafka快速入门

news/2024/11/25 17:34:40/

前言

Kafka是基于zookeeper管理的,所以要先安装zookeeper,如果是单机模式,zookeeper安装比较简单,本文就介绍一下单机如何搭建kafka,以及基本的java demo。

环境搭建

Zookeeper 安装

http://mirrors.cnnic.cn/apache/zookeeper/ 下载zookeeper
解压安装
解压tar –zxvf
修改zookeeper config 把cfg改成 zoo.cfg 单机zookeeper不需要配置
启动

bin/zkServer.sh start

kafka安装

下载 http://kafka.apache.org/downloads.html
解压安装
解压tar –zxvf
启动

 sh kafka-server-start.sh ../config/server.properties &

没有异常就算是成功的了。

消息测试

打开2个crt客户端,一个做producer,一个做consumer,在producer里面发送消息,就可以立即在consumer里面接收到。

1.生产消息producer

 Sh kafka-console-producer.sh --broker-list  127.0.0.1:9092  --topic test

2.消费消息方consumer

 Sh bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning          

3.创建topic命令

sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic message

对于每一个topic都会在/tmp/kafka-logs/ 生成一个文

4.获取topic 列表

sh kafka-topics.sh --list --zookeeper localhost:2181 test

Java Demo

需要依赖zookeeper和kafka的jar

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>0.9.0.0</version>
</dependency>
<dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.7</version></dependency>

发送消息

public class KafKaProducer {//zookeeper 地址public static  String zookeeperConnect = "10.101.14.230:2181";public static String topicName = "message"; //topic nameprivate static Producer createProducer() {Properties properties = new Properties();properties.put("zookeeper.connect", zookeeperConnect);// 声明zkproperties.put("serializer.class", "kafka.serializer.StringEncoder"); //配置value的序列化类properties.put("key.serializer.class", "kafka.serializer.StringEncoder");     //配置key的序列化类properties.put("metadata.broker.list", "10.101.14.230:9092");// 声明kafkareturn new Producer<Integer, String>(new ProducerConfig(properties));}public static void main(String[] args) {try {Producer producer = createProducer();producer.send(new KeyedMessage<Integer, String>(topicName, "message:hello"));TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}
}

接收消息

注意接收消息,是客户端主动去pull,没有消息时就会阻塞

public class KafkaConsumer {public static  String zookeeperConnect = "10.101.14.230:2181";private final ConsumerConnector consumer;private KafkaConsumer() {Properties props = new Properties();props.put("zookeeper.connect", zookeeperConnect);//声明zkprops.put("group.id", "jd-group2");   //group 代表一个消费组props.put("zookeeper.session.timeout.ms", "4000");  //zk连接超时props.put("zookeeper.sync.time.ms", "200");props.put("auto.commit.interval.ms", "1000");props.put("auto.offset.reset", "smallest");props.put("serializer.class", "kafka.serializer.StringEncoder");//序列化类ConsumerConfig config = new ConsumerConfig(props);consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);}void consume() {Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(KafKaProducer.topicName, new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());Map<String, List<KafkaStream<String, String>>> consumerMap =consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);KafkaStream<String, String> stream = consumerMap.get(KafKaProducer.topicName).get(0);ConsumerIterator<String, String> it = stream.iterator();while (it.hasNext()){//该地方没有消息时会阻塞System.out.printf("接受到信息:");System.out.println(it.next().message());}}public static void main(String[] args) {//更好的方法是启动一个线程new KafkaConsumer().consume();}
}

可以看到结果,只要新建一个group,都会把历史消息也接受到

遇到的问题

用java 发送的遇到了这样的一个问题,网上搜了很久,看到说只要是在服务器上安装都有这样的问题,本地的服务就没有问题,按照帖子的方法修改kafka服务器的host.name也没解决。后来在使用hbase的时候,发现也是链接不上,然后 Debug hbase的时候发现,连接zookeeper里面有这样一段代码。

InetSocketAddress remoteAddr=new InetSocketAddress("e010101014230.zmf",37556);
if(remoteAddr.isUnresolved()){ // 表示链接不上。System.out.print("isUnresolved");
}else{System.out.print("resolved");
}
properties.put("zookeeper.connect",10.101.14.230:2181);// 声明zk
properties.put("zookeeper.connect",” e010101014230.zmf:2181);//机器名称

我就感觉zookeeper 的作用在hbase 和 kafka 是类似的,所以两个应该是同一个问题,果然debug了一遍就解决了。不管你在开始设置zookeeper地址的时候是用ip地址,还是机器名称,到了InetSocketAddress这里他都会转换成机器名称,所以就链接不上。解决办法就是:在hosts文件里加上 10.101.14.230 e010101014230.zmf

kafka原理

kafka原理中最重要的一点是,他的消息推送,是消费者主动pull的,这样实现起来相对简单,而且还有个好处就是,消费者按需按能力消费,消费服务器不会有积压问题。

Broker

一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。

Partition

就是分块的意思,一个topic可以分为多个 partition,可以分布到不同的机子上。同一个topic的partition在一台机子上只能有一个。

Replication

Partition的多个备份

Topic

话题,队列,消费

Producer

Producer 客户端通过zookeeper 获取到所有broker,可以动态更新
Producer直接通过socket发送消息到broker
消息被路由到哪个partition上,有producer客户端决定
Consumer与topic关系以及机制
Consumer是主动pull topic,没有消息时会阻塞。
每个consumer属于一个consumer group,对于同一条消息一个group只接收到一次。
一个group 有多个consumer,那么topic就会负载均衡的发送到每个consumer里面去。
如果所有的consumer都是属于不同的group,那所有的人都会接收到消息

Zookeeper 的作用

Producer端使用zookeeper用来"发现"broker列表, 以及和Topic下每个partition leader建立socket连接并发送消息.(如果partition失效了,就接收到消息)
2) Broker端使用zookeeper用来注册broker信息,已经监测partition leader存活性.

  1. Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息.

总结

这个教程比较简单,适合刚刚使用过消息队列,还不了解其原理的人看看。也提供了简单的java demo,对理解复杂的notify 框架还是挺有帮助的。

推荐2篇比较好的文章:
[1] http://blog.csdn.net/derekjiang/article/details/9053863/
[2] http://blog.csdn.net/hmsiwtv/article/details/46960053


http://www.ppmy.cn/news/6082.html

相关文章

Python 函数用法和底层分析

【无限嚣张&#xff08;菜菜&#xff09;】&#xff1a;hello您好&#xff0c;我是菜菜&#xff0c;很高兴您能来访我的博客&#xff0c;我是一名爱好编程学习研究的菜菜&#xff0c;每天分享自己的学习&#xff0c;想法&#xff0c;博客来源与自己的学习项目以及编程中遇到问题…

详细设计说明书(GB8567——88)基于协同的在线表格forture-sheet

详细设计说明书 1引言 1.1编写目的 该文档在概要设计的基础上&#xff0c;进一步的细化系统结构&#xff0c;展示了软件结构的图标&#xff0c;物理设计、数据结构设计、及算法设计、详细的介绍了系统各个模块是如何实现的&#xff0c;包括涉及到的算法&#xff0c;逻辑流程…

Redis常见面试题(七)

目录 1、什么是缓存预热? 2、什么是缓存热备? 3、什么是缓存雪崩? 4、如何解决缓存雪崩? 5、什么是缓存穿透? 6、如何解决缓存穿透? 7、什么是缓存击穿? 8、如何解决缓存击穿? 9、什么是缓存抖动? 10、如何解决缓存抖动? 11、什么是缓存无底洞? 12、如何…

Vue基础8之Vue组件化编程、非单文件组件与单文件组件

Vue基础8Vue组件化编程对组件的理解一些概念的理解非单文件组件基本使用几个注意点组件的嵌套VueComponent一个重要的内置关系先导篇&#xff1a;原型对象正文&#xff08;可以理解为类的继承&#xff09;单文件组件Vue组件化编程 对组件的理解 传统方式&#xff1a; 使用组…

路由器的工作原理(计算机网络-网络层)

目录 路由器的构成 转发和路由选择的区别 典型的路由器结构 交换结构 输出端口 路由器与交换机的比较 两种基于存储转发的分组交换设备的比较 交换机和路由器各有的应用场合 三层交换机 三层交换机的应用 路由器的构成 路由器的任务 路由器是一种具有多个输入端口和多…

Vue + Element-ui实现后台管理系统---项目搭建 + ⾸⻚布局实现

目录&#xff1a;导读 项目搭建 ⾸⻚布局实现 一、项目搭建 1、环境搭建 2、项目初期搭建 二、Main.vue 三、左侧栏部分(CommonAside.vue) 四、header部分(CommonHeader.vue) 五、Home.vue 写在最后 项目搭建 ⾸⻚布局实现 这篇主要讲解 项目搭建 后台⾸⻚布局实现…

Sentinel

Sentinel—高可用流量管理框架/服务容错组件 一.为什么要用Sentinel? 1.微服务架构中当某服务挂掉的时候常见的原因有哪些&#xff1f; 1.异常没处理 比如DB连接失败&#xff0c;文件读取失败等 2.突然的流量激增 比如&#xff1a;用户经常会在京东、淘宝、天猫、拼多多…

Android -- 每日一问:如何实现自定义View?

经典回答 回忆一下&#xff0c;你去面试时常被问到的自定义 View 方面的问题是那些。有没有&#xff1a; invalidate 和 postInvalidate 方法的区别&#xff1f;自定义 View 的绘制流程&#xff1f;View 的 Touch 事件分发流程&#xff1f; 因为在实际的工作中并不是每个人都…