Kafka简介及使用PHP处理Kafka消息

news/2025/1/15 22:59:08/

Kafka简介及使用PHP处理Kafka消息

Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。

 

Kafka的特点:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。【据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)】
  • 支持Kafka Server间的消息分区,同时保证每个Partition内的消息顺序传输。
  • 分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器。
  • 消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。
  • 同时支持离线数据处理和实时数据处理。

Kafka的架构:

Kafka简介及使用PHP处理Kafka消息-kafka架构图

 

Kafka的整体架构非常简单,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。客户端和服务器端的通信,是基于简单,高性能,且与编程语言无关的TCP协议。

 

Kafka基本概念:

  • Topic:特指Kafka处理的消息源(feeds of messages)的不同分类。
  • Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
  • Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。
  • Producers:消息和数据生产者,向Kafka的一个topic发布消息的过程叫做producers。
  • Consumers:消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers。
  • Broker:缓存代理,Kafa集群中的一台或多台服务器统称为broker。

 

Kafka消息发送的流程:

Kafka简介及使用PHP处理Kafka消息-Kafka消息发送

 

下面是PHP生产、消费Kafka消息的例子(假设已经配置好Kafka):

1.从zookeeper源码src/c/src安装zookeeper c client

 
cd zookeeper-3.4.8/src/c./configuremake && make install


2.编译php libzookper扩展

 

git clone https://github.com/Timandes/libzookeeper.gitcd libzookeeperphpize./configure--with-libzookeeper=/usr/local/bin/cli_mtmake && makeinstall

 

3.编译php zookeeper扩展

git clone https://github.com/andreiz/php-zookeeper.gitcd php-zookeeperphpize./configuremake && make install

4.修改php.ini配置,添加libzookeeper和php-zookeeper扩展

extension=libzookeeper.soextension=zookeeper.so

PHP处理Kafka消息

1.启动zookeeper和kafka

./bin/zookeeper-server-start.sh config/zookeeper.properties./bin/kafka-server-start.sh config/server.properties

2.创建由2个partition组成的、名为testtopic的topic

kafka_2.11-0.10.0.0/bin/kafka-topics.sh --create--zookeeper localhost:2181 --replication-factor --partitions --topic testtopic


3.composer安装nmred/kafka-php

1composer require "nmred/kafka-php"

4.producer.php代码

 
  1.  

    <php require_once('./vendor/autoload.php'); $produce=/Kafka/Produce::getInstance('localhost:2181',3000); $produce->setRequireAck(-1); $topicName='testtopic';//获取到topic下可用的partitions$partitions=$produce->getAvailablePartitions($topicName);$partitionCount=count($partitions); $count=1;//可以处理的消费者数量(可以理解为server数量)while(true){    $message=json_encode(array('uid'=>$count,'age'=>$count%100,'datetime'=>date('Y-m-d H:i:s')));     //发送消息到不同的partition   $partitionId=$count%$partitionCount;    $produce->setMessages('testtopic',$partitionId,array($message));   $result=$produce->send();    var_dump($result);     $count++;   echo"producer sleeping/n";   sleep(1);}

     

5、consumer.php代码

<?php require_once('./vendor/autoload.php'); //获取需要处理的partitionId$partitionId = isset($argv[1]) ? intval($argv[1]) :0; $consumer =/Kafka/Consumer::getInstance('localhost:2181'); $consumer->setGroup('test-consumer-group');$consumer->setPartition('testtopic', $partitionId);$consumer->setFromOffset(true);$consumer->setMaxBytes(102400); while(true){    $topic = $consumer->fetch();     foreach ($topic as $topicName => $partition{        foreach ($partition as $partId => $messageSet{            foreach ($messageSet as $message){                var_dump($message);           }        }    }    echo"consumer sleeping/n";   sleep(1);}

 

6、在3个终端界面分别运行

7、两个consumer脚本依次收到producer发送的消息

php-kafka-consumer-output

 


http://www.ppmy.cn/news/610553.html

相关文章

ecplise 设置代码自动提示功能的设置

看下面的图把 1 然后找到java 第三步 然后输入 abcdefghijklmnopqrstuvwxyz. 下面看下效果 这个是提示&#xff0c;很方便把

MySQL开发医药管理系统_java Web开发医药后台管理系统mysql版本源代码下载,支持中英文...

package com.lyq.dao;import com.lyq.persistence.Medicine;import com.lyq.util.HibernateFilter;/*** 药品数据库操作类** author Li Yong Qiang*/public class MedicineDao extends SupperDao {/*** 查询药品信息** param id* return Medicine*/public Medicine loadMedicin…

图形数据标准化

图形数据标准化 AWS&#xff0c;Google&#xff0c;Neo4j&#xff0c;Oracle。这些只是在W3C关于图形数据的Web标准化的W3C研讨会上代表的一些供应商&#xff0c;内容必然会促进数据管理中最热门的部分&#xff1a;Graph的采用。 让许多供应商互相交谈&#xff0c;更不用说团结…

机器学习——决策树1(三种算法)

要开始了…内心还是有些复杂的 因为涉及到熵…单纯的熵&#xff0c;可以单纯 复杂的熵&#xff0c;如何能通俗理解呢… 我也没有底气&#xff0c;且写且思考吧 1. 决策树分类思想 首先&#xff0c;决策树的思想&#xff0c;有点儿像KNN里的KD树。 KNN里的KD树&#xff0c;是每…

Java设计模式:观察者模式

观察者模式 观察者模式又称为发布/订阅(Publish/Subscribe)模式 在对象之间定义了一对多的依赖&#xff0c;这样一来&#xff0c;当一个对象改变状态&#xff0c;依赖它的对象会收到通知并自动更新。 如果这句话不好理解 可以这样理解&#xff0c; 微信公众号发布消息&…

Centos7下安装MongoDB

简介 MongoDB 是一个基于分布式 文件存储的NoSQL数据库由C语言编写&#xff0c;运行稳定&#xff0c;性能高旨在为 WEB 应用提供可扩展的高性能数据存储解决方案查看官方网站 MongoDB特点 模式自由 :可以把不同结构的文档存储在同一个数据库里面向集合的存储&#xff1a;适合…

开源组件websocket-sharp中基于webapi的httpserver使用体验

一、背景 因为需要做金蝶ERP的二次开发&#xff0c;金蝶ERP的开放性真是不错&#xff0c;但是二次开发金蝶一般使用引用BOS.dll的方式&#xff0c;这个dll对newtonsoft.json.dll这个库是强引用&#xff0c;必须要用4.0版本&#xff0c;而asp.net mvc的webapi client对newtonsof…

mysql 5.7 1054_MySQL5.7更改密码时出现ERROR 1054 (42S22)的解决方法

MySQL5.7更改密码时出现ERROR 1054 (42S22)的解决方法发布时间&#xff1a;2020-10-14 16:01:38来源&#xff1a;脚本之家阅读&#xff1a;81作者&#xff1a;剑侠365新安装的MySQL5.7&#xff0c;登录时提示密码错误&#xff0c;安装的时候并没有更改密码&#xff0c;后来通过…