ubuntu24.04部署单节点kafka_2.13-3.8.1

server/2024/12/16 4:34:36/

ubuntu24.04部署单节点kafka_2.13-3.8.1

下载地址推荐使用清华镜像源下载
https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.8.1/kafka_2.13-3.8.1.tgz

部署kafka部署

# 解压kafka压缩包
sudo tar -zxvf kafka_2.13-3.8.1.tgz -C /usr/local/# 改变权限及所有权
sudo chown -R hadoop:hadoopeco kafka_2.13-3.8.1/# 启动zookeeper
/usr/local/kafka_2.13-3.8.1$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties# 启动kafka
./bin/kafka-server-start.sh ./config/server.properties &# kafka操作
./bin/kafka-topics.sh --bootstrap-server lh007:9092 --list./bin/kafka-topics.sh --bootstrap-server lh007:9092 --create --replication-factor 1 --partitions 1 --topic test./bin/kafka-console-producer.sh --bootstrap-server lh007:9092 --topic test./bin/kafka-console-consumer.sh --bootstrap-server lh007:9092 --topic test

设置一键启动脚本

#!/bin/bash# 定义Kafka安装目录
KAFKA_HOME="/usr/local/kafka_2.13-3.8.1"
PID_FILE="kafka_pids.txt"# 函数:检查命令执行是否成功
check_command() {if [ $? -ne 0 ]; thenecho "Error: Failed to execute the last command."exit 1fi
}# 函数:启动Zookeeper,并等待其准备就绪
start_zookeeper() {echo "Starting Zookeeper..."$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties &ZOOKEEPER_PID=$!# 等待几秒钟以确保Zookeeper已经启动sleep 5# 检查Zookeeper进程是否存在if ! ps -p $ZOOKEEPER_PID > /dev/null; thenecho "Error: Zookeeper failed to start."exit 1elseecho "Zookeeper started with PID: $ZOOKEEPER_PID"fi
}# 函数:启动Kafka,并等待其准备就绪
start_kafka() {echo "Starting Kafka..."$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &KAFKA_PID=$!# 等待几秒钟以确保Kafka已经启动sleep 5# 检查Kafka进程是否存在if ! ps -p $KAFKA_PID > /dev/null; thenecho "Error: Kafka broker failed to start."exit 1elseecho "Kafka broker has been started with PID: $KAFKA_PID"fi
}# 函数:停止Zookeeper和Kafka
stop_services() {if [ -f "$PID_FILE" ]; thenwhile IFS= read -r line; dokill -9 "$line" 2>/dev/null && echo "Stopped service with PID: $line"done < "$PID_FILE"rm -f "$PID_FILE"echo "All services have been stopped."elseecho "No PID file found, services may not be running."fi
}# 主逻辑
if [ $# -eq 0 ]; thenACTION="start"
elseACTION="$1"
ficase "$ACTION" instart)start_zookeeperstart_kafka# 保存PIDs到文件,以便以后可以停止服务echo $ZOOKEEPER_PID > "$PID_FILE"echo $KAFKA_PID >> "$PID_FILE"echo "PIDs have been saved to $PID_FILE for stopping services later."check_commandecho "All services have been started successfully.";;stop)stop_services;;*)echo "Usage: $0 {start|stop}"exit 1;;
esac

操作日志


hadoop@lh007:~/scripts$ ./kafka_control.sh start
Starting Zookeeper...
[2024-12-11 09:23:54,782] INFO Reading configuration from: /usr/local/kafka_2.13-3.8.1/config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2024-12-11 09:23:54,788] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2024-12-11 09:23:54,788] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2024-12-11 09:23:54,789] INFO observerMasterPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2024-12-11 09:23:54,789] INFO metricsProvider.className is org.apache.zookeeper.metrics.impl.DefaultMetricsProvider (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2024-12-11 09:23:54,792] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2024-12-11 09:23:54,792] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2024-12-11 09:23:54,792] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2024-12-11 09:23:54,792] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2024-12-11 09:23:54,794] INFO Log4j 1.2 jmx support not found; jmx disabled. (org.apache.zookeeper.jmx.ManagedUtil)
[2024-12-11 09:23:54,795] INFO Reading configuration from: /usr/local/kafka_2.13-3.8.1/config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2024-12-11 09:23:54,796] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2024-12-11 09:23:54,796] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2024-12-11 09:23:54,796] INFO observerMasterPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2024-12-11 09:23:54,796] INFO metricsProvider.className is org.apache.zookeeper.metrics.impl.DefaultMetricsProvider (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2024-12-11 09:23:54,796] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2024-12-11 09:23:54,815] INFO ServerMetrics initialized with provider org.apache.zookeeper.metrics.impl.DefaultMetricsProvider@4590c9c3 (org.apache.zookeeper.server.ServerMetrics)
[2024-12-11 09:23:54,818] INFO ACL digest algorithm is: SHA1 (org.apache.zookeeper.server.auth.DigestAuthenticationProvider)
[2024-12-11 09:23:54,818] INFO zookeeper.DigestAuthenticationProvider.enabled = true (org.apache.zookeeper.server.auth.DigestAuthenticationProvider)
[2024-12-11 09:23:54,822] INFO zookeeper.snapshot.trust.empty : false (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2024-12-11 09:23:54,835] INFO  (org.apache.zookeeper.server.ZooKeeperServer)
[2024-12-11 09:23:54,836] INFO   ______                  _                                           (org.apache.zookeeper.server.ZooKeeperServer)
[2024-12-11 09:23:54,836] INFO  |___  /                 | |                                          (org.apache.zookeeper.server.ZooKeeperServer)
[2024-12-11 09:23:54,836] INFO     / /    ___     ___   | | __   ___    ___   _ __     ___   _ __    (org.apache.zookeeper.server.ZooKeeperServer)
[2024-12-11 09:23:54,836] INFO    / /    / _ \   / _ \  | |/ /  / _ \  / _ \ | '_ \   / _ \ | '__| (org.apache.zookeeper.server.ZooKeeperServer)
[2024-12-11 09:23:54,836] INFO   / /__  | (_) | | (_) | |   <  |  __/ |  __/ | |_) | |  __/ | |     (org.apache.zookeeper.server.ZooKeeperServer)
[2024-12-11 09:23:54,836] INFO  /_____|  \___/   \___/  |_|\_\  \___|  \___| | .__/   \___| |_| (org.apache.zookeeper.server.ZooKeeperServer)epoch 0, of which 47 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
Kafka broker has been started with PID: 199769
PIDs have been saved to kafka_pids.txt for stopping services later.
All services have been started successfully.
hadoop@lh007:~/scripts$ jps
200310 Jps
199769 Kafka
199295 QuorumPeerMain
hadoop@lh007:~/scripts$ cat kafka_control.sh
#!/bin/bash

操作脚本

hadoop@lh007:~/scripts$ cat my_kafka_options_en.sh
#!/bin/bash# 定义Kafka安装目录和其他常量
KAFKA_HOME="/usr/local/kafka_2.13-3.8.1"
BOOTSTRAP_SERVER="lh007:9092"# 函数:显示帮助信息
show_help() {echo "Usage: $0 {list|create|produce|consume} [options]"echo "Commands:"echo "  list       List all topics."echo "  create     Create a new topic with specified replication factor and partitions."echo "  produce    Start a console producer to send messages to the specified topic."echo "  consume    Start a console consumer to receive messages from the specified topic."echo "Options for 'create':"echo "  --replication-factor <value>"echo "  --partitions <value>"echo "  --topic <name>"echo "Options for 'produce' and 'consume':"echo "  --topic <name>"
}# 主逻辑
case "$1" inlist)shift$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVER --list;;create)shiftif [ $# -lt 6 ]; thenecho "Error: Not enough arguments for creating a topic."show_helpexit 1fiwhile [[ $# -gt 0 ]]; docase "$1" in--replication-factor)REPLICATION_FACTOR="$2"shift 2;;--partitions)PARTITIONS="$2"shift 2;;--topic)TOPIC_NAME="$2"shift 2;;*)echo "Unknown option: $1"show_helpexit 1;;esacdone$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVER --create --replication-factor $REPLICATION_FACTOR --partitions $PARTITIONS --topic $TOPIC_NAME;;produce)shiftwhile [[ $# -gt 0 ]]; docase "$1" in--topic)TOPIC_NAME="$2"shift 2;;*)echo "Unknown option: $1"show_helpexit 1;;esacdoneif [ -z "$TOPIC_NAME" ]; thenecho "Error: Topic name is required for producing messages."show_helpexit 1fi$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server $BOOTSTRAP_SERVER --topic $TOPIC_NAME;;consume)shiftwhile [[ $# -gt 0 ]]; docase "$1" in--topic)TOPIC_NAME="$2"shift 2;;*)echo "Unknown option: $1"show_helpexit 1;;esacdoneif [ -z "$TOPIC_NAME" ]; thenecho "Error: Topic name is required for consuming messages."show_helpexit 1fi$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --topic $TOPIC_NAME --from-beginning;;*)show_helpexit 1;;
esac

中文版脚本

hadoop@lh007:~/scripts$ cat my_kafka_options_zn.sh
#!/bin/bash# 定义Kafka安装目录和其他常量
KAFKA_HOME="/usr/local/kafka_2.13-3.8.1"
BOOTSTRAP_SERVER="lh007:9092"# 函数:显示帮助信息(中文)
show_help() {echo "用法: $0 {list|create|produce|consume} [选项]"echo "命令:"echo "  list       列出所有主题."echo "  create     创建一个新主题,并指定复制因子和分区数."echo "  produce    启动控制台生产者,向指定主题发送消息."echo "  consume    启动控制台消费者,从指定主题接收消息."echo "创建主题的选项:"echo "  --replication-factor <值>   设置复制因子"echo "  --partitions <值>           设置分区数量"echo "  --topic <名称>              指定主题名称"echo "启动生产和消费的选项:"echo "  --topic <名称>              指定主题名称"
}# 主逻辑
case "$1" inlist)shiftecho "正在列出所有主题..."$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVER --list;;create)shiftif [ $# -lt 6 ]; thenecho "错误: 创建主题时参数不足."show_helpexit 1fiwhile [[ $# -gt 0 ]]; docase "$1" in--replication-factor)REPLICATION_FACTOR="$2"shift 2;;--partitions)PARTITIONS="$2"shift 2;;--topic)TOPIC_NAME="$2"shift 2;;*)echo "未知选项: $1"show_helpexit 1;;esacdoneecho "正在创建新主题: $TOPIC_NAME, 复制因子: $REPLICATION_FACTOR, 分区数: $PARTITIONS..."$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVER --create --replication-factor $REPLICATION_FACTOR --partitions $PARTITIONS --topic $TOPIC_NAME;;produce)shiftwhile [[ $# -gt 0 ]]; docase "$1" in--topic)TOPIC_NAME="$2"shift 2;;*)echo "未知选项: $1"show_helpexit 1;;esacdoneif [ -z "$TOPIC_NAME" ]; thenecho "错误: 发送消息时必须指定主题名称."show_helpexit 1fiecho "正在启动控制台生产者,向主题 '$TOPIC_NAME' 发送消息..."$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server $BOOTSTRAP_SERVER --topic $TOPIC_NAME;;consume)shiftwhile [[ $# -gt 0 ]]; docase "$1" in--topic)TOPIC_NAME="$2"shift 2;;*)echo "未知选项: $1"show_helpexit 1;;esacdoneif [ -z "$TOPIC_NAME" ]; thenecho "错误: 接收消息时必须指定主题名称."show_helpexit 1fiecho "正在启动控制台消费者,从主题 '$TOPIC_NAME' 接收消息..."$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --topic $TOPIC_NAME --from-beginning;;*)show_helpexit 1;;
esac

欢迎关注、您的支持与回复,是我继续开发更多技术文档分享的动力。欢迎扫描关注WX:夏目的杂货店


http://www.ppmy.cn/server/150529.html

相关文章

【软件工程】第六章·考虑对象(UML、UML在软件开发中的应用、面向对象方法的软件开发)

&#x1f308; 个人主页&#xff1a;十二月的猫-CSDN博客 &#x1f525; 系列专栏&#xff1a; &#x1f3c0;软件开发必练内功_十二月的猫的博客-CSDN博客 &#x1f4aa;&#x1f3fb; 十二月的寒冬阻挡不了春天的脚步&#xff0c;十二点的黑夜遮蔽不住黎明的曙光 目录 1. 前…

TDengine Flink集成

Flink 集成 TDengine 主要涉及在 Flink 项目中配置与 TDengine 的连接&#xff0c;实现数据的读取和写入。以下是一个详细的指南&#xff0c;介绍如何在 Flink 中集成 TDengine&#xff1a; 一、准备工作 安装并启动 Flink&#xff1a; 下载并解压 Flink 安装包。启动 Flink …

MacOs 日常故障排除troubleshooting

1. 关闭开机自启动 app X macOs 15.1 System settings -> General -> Login Items & Extensions->Open at Login -> Select app X and click -

Nginx 缓存那些事儿:原理、配置和最佳实践

Nginx 缓存那些事儿&#xff1a;原理、配置和最佳实践 在当今的互联网世界&#xff0c;网站的访问量和数据处理量不断攀升&#xff0c;如何确保用户能够快速、稳定地访问我们的网站&#xff0c;已经成为每个运维工程师面临的挑战。幸运的是&#xff0c;Nginx 作为一款高性能的…

C语言:const的用法

有时候我们希望定义这样一种变量&#xff0c;它的值不能被改变&#xff0c;在整个作用域中都保持固定。例如&#xff0c;用一个变量来表示班级的最大人数&#xff0c;或者表示缓冲区的大小。为了满足这一要求&#xff0c;可以使用 const 关键字对变量加以限定&#xff1a; con…

交换瓶子(图论 贪心)

1224. 交换瓶子 - AcWing题库 把每一个瓶子看成一个点&#xff0c;从每个瓶子向他应该在的那个位置的瓶子连一条边 通过这个方式&#xff0c;我们就可以连出n条边 观察可以发现这些图有特点&#xff1a; n个点 连成n条边 因为每个点会指向它应该在的位置的那个点&#xff…

Rust学习路线图

‌Rust是一种现代的系统编程语言&#xff0c;专注于性能、安全性和并发性。它在没有垃圾回收器的情况下实现了这些目标&#xff0c;使其成为许多其他语言不擅长的用例中的有用语言。其语法与C相似&#xff0c;但Rust在保持高性能的同时提供了更好的内存安全性。 获取路线图 你…

小程序-基于java+SSM+Vue的优购电商小程序设计与实现

项目运行 1.运行环境&#xff1a;最好是java jdk 1.8&#xff0c;我们在这个平台上运行的。其他版本理论上也可以。 2.IDE环境&#xff1a;IDEA&#xff0c;Eclipse,Myeclipse都可以。推荐IDEA; 3.tomcat环境&#xff1a;Tomcat 7.x,8.x,9.x版本均可 4.硬件环境&#xff1a…