264. Java: Consuming JSON from Kafka Topic A, Parsing It into Individual Records, and Writing Them to Kafka Topic B

devtools / 2024-09-19 11:47:22 / Tags: java, kafka, programming languages

1. Purpose

Because Hive here runs as a single-node environment, parsing the huge raw JSON payloads inside Hive would be far too slow. The JSON therefore has to be parsed into individual fields, one CSV record per message, before it ever reaches Hive.
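For example, a static-queue message from topic A like the one below (field names match the listener in section 3.7; the values are hypothetical) is flattened into one CSV line per `queueList` entry:

```json
{
  "deviceNo": "RS001",
  "sourceDeviceType": "radar",
  "sn": "SN123",
  "model": "M1",
  "createTime": "2024-09-09 11:47:22.123",
  "data": {
    "queueList": [
      { "laneNo": "1", "laneType": "2", "queueCount": "5",
        "queueLen": "35.0", "queueHead": "10.0", "queueTail": "45.0" }
    ]
  }
}
```

which would land in topic B as `RS001,radar,SN123,M1,2024-09-09 11:47:22,1,2,5,35.0,10.0,45.0`, with `createTime` trimmed to second resolution.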

2. Creating the Spring Boot Project in IDEA

3. Project Files

3.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hurys</groupId>
    <artifactId>hurrys-jw-kafka</artifactId>
    <version>1.0.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.6.13</spring-boot.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

3.2 application.yml

kafka:
  servers: 192.168.10.12:9092

server:
  port: 9830

spring:
  application:
    name: jw-kafka
  kafka:
    bootstrap-servers: ${kafka.servers}
    consumer:
      group-id: jw-kafka
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

3.3 log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- The "status" attribute controls Log4j2's own internal logging; it can be left unset. Setting it to "trace" shows detailed internal Log4j2 output. -->
<!-- monitorInterval: Log4j2 re-reads the configuration file and reconfigures itself at this interval, in seconds. -->
<Configuration status="OFF" monitorInterval="600">
    <!-- Level priority: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL.
         To see DEBUG logs, change <ThresholdFilter level="INFO"> under <RollingFile name="RollingFileInfo">
         and set <root level="DEBUG">. -->
    <!-- Variables -->
    <Properties>
        <!-- Output pattern: %d date, %thread thread name, %-5level level padded to 5 characters,
             %msg message, %n newline; %logger{36} would cap the logger name at 36 characters. -->
        <property name="LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %class{36} %M() @%L - %msg%n"/>
        <!-- Log storage paths -->
        <property name="FILE_PATH" value="/home/hurys-log/jw-kafka"/>
        <property name="FILE_DAY_PATH" value="/home/hurys-log/jw-kafka/%d{yyyy-MM}/%d{yyyy-MM-dd}"/>
    </Properties>

    <Appenders>
        <!-- Console appender -->
        <Console name="Console" target="SYSTEM_OUT">
            <!-- Accept messages at this level and above (onMatch), reject the rest (onMismatch) -->
            <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY"/>
            <!-- Output format -->
            <PatternLayout pattern="${LOG_PATTERN}"/>
        </Console>

        <!-- INFO file: whenever a file exceeds the size limit it is archived and compressed
             under the year-month folder. -->
        <RollingFile name="RollingFileInfo" fileName="${FILE_PATH}/info.log"
                     filePattern="${FILE_DAY_PATH}/INFO-%d{yyyy-MM-dd}_%i.log.gz">
            <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <Policies>
                <!-- interval: how often to roll over; the default is 1 hour -->
                <TimeBasedTriggeringPolicy modulate="true" interval="1"/>
                <!-- Per-file size limit; pairs with the %i counter in filePattern -->
                <SizeBasedTriggeringPolicy size="100MB"/>
            </Policies>
            <!-- Without DefaultRolloverStrategy, at most 7 files per folder are kept before overwriting -->
            <DefaultRolloverStrategy max="20">
                <!-- maxDepth is 3 from basePath down to %d{yyyy-MM}/%d{yyyy-MM-dd}/INFO-%d{yyyy-MM-dd}_%i.log.gz -->
                <Delete basePath="${FILE_PATH}" maxDepth="3">
                    <!-- age must match filePattern's granularity: the pattern goes down to dd, so use "xd" ("xD" has no effect).
                         Prefer a value > 2, or recently written files may still be in use and fail to delete. -->
                    <IfLastModified age="30d"/>
                </Delete>
            </DefaultRolloverStrategy>
        </RollingFile>

        <!-- WARN file, same rotation scheme -->
        <RollingFile name="RollingFileWarn" fileName="${FILE_PATH}/warn.log"
                     filePattern="${FILE_DAY_PATH}/WARN-%d{yyyy-MM-dd}_%i.log.gz">
            <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <Policies>
                <TimeBasedTriggeringPolicy modulate="true" interval="1"/>
                <SizeBasedTriggeringPolicy size="100MB"/>
            </Policies>
            <DefaultRolloverStrategy max="15"/>
        </RollingFile>

        <!-- ERROR file, same rotation scheme -->
        <RollingFile name="RollingFileError" fileName="${FILE_PATH}/error.log"
                     filePattern="${FILE_DAY_PATH}/ERROR-%d{yyyy-MM-dd}_%i.log.gz">
            <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="100MB"/>
            </Policies>
            <DefaultRolloverStrategy max="15"/>
        </RollingFile>
    </Appenders>

    <!-- Logger nodes configure individual loggers, e.g. different levels for specific packages. -->
    <!-- An appender only takes effect once a logger references it. -->
    <loggers>
        <!-- Filter out noisy DEBUG output from MyBatis -->
        <logger name="org.mybatis" level="error" additivity="false">
            <AppenderRef ref="Console"/>
        </logger>
        <!-- additivity="false": the child logger writes only to its own appenders, not the parent's -->
        <Logger name="org.springframework" level="error" additivity="false">
            <AppenderRef ref="Console"/>
        </Logger>
        <root level="INFO">
            <appender-ref ref="Console"/>
            <appender-ref ref="RollingFileInfo"/>
            <appender-ref ref="RollingFileWarn"/>
            <appender-ref ref="RollingFileError"/>
        </root>
    </loggers>
</Configuration>

3.4 KafkaConstants

package com.hurys.kafka.constant;

public interface KafkaConstants {

    /** Static queue data */
    String TOPIC_INTERNAL_DATA_STATIC_QUEUE = "topic_internal_data_static_queue";

    /** Dynamic queue data */
    String TOPIC_INTERNAL_DATA_DYNAMIC_QUEUE = "topic_internal_data_dynamic_queue";

    // The constants below are referenced by KafkaServiceListener but were missing from
    // the original listing; the topic names are assumed to follow the same pattern.

    /** Turn-ratio data */
    String TOPIC_INTERNAL_DATA_TURN_RATIO = "topic_internal_data_turn_ratio";

    /** Area data */
    String TOPIC_INTERNAL_DATA_AREA = "topic_internal_data_area";

    /** Statistics data */
    String TOPIC_INTERNAL_DATA_STATISTICS = "topic_internal_data_statistics";

    /** Event resource data */
    String TOPIC_INTERNAL_DATA_EVENT_RESOURCE = "topic_internal_data_event_resource";

    /** Event data */
    String TOPIC_INTERNAL_DATA_EVENT = "topic_internal_data_event";
}

3.5 JsonUtil

package com.hurys.kafka.util;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;

public class JsonUtil {

    /**
     * Serializes an object to a JSON string without dropping null-valued fields.
     *
     * @param object the object to serialize
     * @return the resulting JSON string
     */
    public static String objectToJson(Object object) {
        return JSON.toJSONString(object, SerializerFeature.WriteMapNullValue);
    }
}

3.6 KafkaApplication

package com.hurys.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}

3.7 KafkaServiceListener

package com.hurys.kafka.listener;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.hurys.kafka.constant.KafkaConstants;
import com.hurys.kafka.util.JsonUtil;
import lombok.extern.log4j.Log4j2;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;

/**
 * Kafka consumer service: parses JSON messages from the internal topics and
 * forwards one CSV line per record to the corresponding db topics.
 *
 * @author wangjing
 * @date 2024/09/09
 */
@Service
@Log4j2
public class KafkaServiceListener {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    // 1. Turn-ratio data
    @KafkaListener(topics = KafkaConstants.TOPIC_INTERNAL_DATA_TURN_RATIO)
    public void processData(String message) {
        try {
            JSONObject jsonObject = JSON.parseObject(message);
            // System.out.println("raw message: " + JsonUtil.objectToJson(jsonObject));
            // Extract the radar fields
            String device_no = jsonObject.getString("deviceNo");
            String source_device_type = jsonObject.getString("sourceDeviceType");
            String sn = jsonObject.getString("sn");
            String model = jsonObject.getString("model");
            String createTime = jsonObject.getString("createTime");
            String create_time = createTime.substring(0, 19);

            JSONObject data = jsonObject.getJSONObject("data");
            String cycle = data.getString("cycle");
            String volume_sum = data.getString("volumeSum");
            String speed_avg = data.getString("speedAvg");
            String volume_left = data.getString("volumeLeft");
            String speed_left = data.getString("speedLeft");
            String volume_straight = data.getString("volumeStraight");
            String speed_straight = data.getString("speedStraight");
            String volume_right = data.getString("volumeRight");
            String speed_right = data.getString("speedRight");
            String volume_turn = data.getString("volumeTurn");
            String speed_turn = data.getString("speedTurn");

            String outputLine = String.join(",", device_no, source_device_type, sn, model, create_time,
                    cycle, volume_sum, speed_avg, volume_left, speed_left, volume_straight, speed_straight,
                    volume_right, speed_right, volume_turn, speed_turn);
            // System.out.println("outputLine 1: " + outputLine);
            kafkaTemplate.send("topic_db_data_turn_ratio", outputLine);
        } catch (Exception e) {
            log.error("process turn_ratio error", e);
        }
    }

    // 2. Static queue data
    @KafkaListener(topics = KafkaConstants.TOPIC_INTERNAL_DATA_STATIC_QUEUE)
    public void processData2(String message) {
        try {
            JSONObject jsonObject = JSON.parseObject(message);
            // Extract the radar fields
            String device_no = jsonObject.getString("deviceNo");
            String source_device_type = jsonObject.getString("sourceDeviceType");
            String sn = jsonObject.getString("sn");
            String model = jsonObject.getString("model");
            String createTime = jsonObject.getString("createTime");
            String create_time = createTime.substring(0, 19);

            JSONObject data = jsonObject.getJSONObject("data");
            List<JSONObject> queueList = data.getJSONArray("queueList").toJavaList(JSONObject.class);
            for (JSONObject queueItem : queueList) {
                String lane_no = queueItem.getString("laneNo");
                String lane_type = queueItem.getString("laneType");
                String queue_count = queueItem.getString("queueCount");
                String queue_len = queueItem.getString("queueLen");
                String queue_head = queueItem.getString("queueHead");
                String queue_tail = queueItem.getString("queueTail");

                String outputLine = String.join(",", device_no, source_device_type, sn, model, create_time,
                        lane_no, lane_type, queue_count, queue_len, queue_head, queue_tail);
                System.out.println("outputLine 2: " + outputLine);
                kafkaTemplate.send("topic_db_data_static_queue", outputLine);
            }
        } catch (Exception e) {
            log.error("process static_queue error", e);
        }
    }

    // 7. Area data
    @KafkaListener(topics = KafkaConstants.TOPIC_INTERNAL_DATA_AREA)
    public void processData7(String message) {
        try {
            JSONObject jsonObject = JSON.parseObject(message);
            // Extract the radar fields
            String device_no = jsonObject.getString("deviceNo");
            String source_device_type = jsonObject.getString("sourceDeviceType");
            String sn = jsonObject.getString("sn");
            String model = jsonObject.getString("model");
            String createTime = jsonObject.getString("createTime");
            String create_time = createTime.substring(0, 19);

            JSONObject data = jsonObject.getJSONObject("data");
            List<JSONObject> areaStatusList = data.getJSONArray("areaStatusList").toJavaList(JSONObject.class);
            for (JSONObject areaStatus : areaStatusList) {
                String area_no = areaStatus.getString("areaNo");
                List<JSONObject> laneStatusList = areaStatus.getJSONArray("laneStatusList").toJavaList(JSONObject.class);
                for (JSONObject laneItem : laneStatusList) {
                    String lane_no = laneItem.getString("laneNo");
                    String lane_type = laneItem.getString("laneType");
                    String target_count = laneItem.getString("targetCount");
                    String space_occupancy = laneItem.getString("spaceOccupancy");
                    String pareto = laneItem.getString("pareto");
                    String speed_avg = laneItem.getString("speedAvg");
                    String speed_head = laneItem.getString("speedHead");
                    String speed_tail = laneItem.getString("speedTail");
                    String pos_head = laneItem.getString("posHead");
                    String pos_tail = laneItem.getString("posTail");
                    String average_arrival_time = laneItem.getString("averageArrivalTime");
                    String head_position = laneItem.getString("headPosition");
                    String tail_position = laneItem.getString("tailPosition");

                    String outputLine = String.join(",", device_no, source_device_type, sn, model, create_time,
                            lane_no, lane_type, target_count, space_occupancy, pareto, speed_avg, speed_head,
                            speed_tail, pos_head, pos_tail, area_no, average_arrival_time, head_position,
                            tail_position);
                    // System.out.println("outputLine 7: " + outputLine);
                    kafkaTemplate.send("topic_db_data_area", outputLine);
                }
            }
        } catch (Exception e) {
            log.error("process area error", e);
        }
    }

    // 8. Statistics data
    @KafkaListener(topics = KafkaConstants.TOPIC_INTERNAL_DATA_STATISTICS)
    public void processData8(String message) {
        try {
            JSONObject jsonObject = JSON.parseObject(message);
            // Extract the radar fields
            String device_no = jsonObject.getString("deviceNo");
            String source_device_type = jsonObject.getString("sourceDeviceType");
            String sn = jsonObject.getString("sn");
            String model = jsonObject.getString("model");
            String createTime = jsonObject.getString("createTime");
            String create_time = createTime.substring(0, 19);

            JSONObject data = jsonObject.getJSONObject("data");
            String cycle = data.getString("cycle");
            List<JSONObject> sectionList = data.getJSONArray("sectionList").toJavaList(JSONObject.class);
            for (JSONObject sectionStatus : sectionList) {
                String section_no = sectionStatus.getString("sectionNo");
                List<JSONObject> coilList = sectionStatus.getJSONArray("coilList").toJavaList(JSONObject.class);
                for (JSONObject coilItem : coilList) {
                    String lane_no = coilItem.getString("laneNo");
                    String lane_type = coilItem.getString("laneType");
                    String coil_no = coilItem.getString("coilNo");
                    String volume_sum = coilItem.getString("volumeSum");
                    String volume_person = coilItem.getString("volumePerson");
                    String volume_car_non = coilItem.getString("volumeCarNon");
                    String volume_car_small = coilItem.getString("volumeCarSmall");
                    String volume_car_middle = coilItem.getString("volumeCarMiddle");
                    String volume_car_big = coilItem.getString("volumeCarBig");
                    String speed_avg = coilItem.getString("speedAvg");
                    String speed_85 = coilItem.getString("speed85");
                    String time_occupancy = coilItem.getString("timeOccupancy");
                    String average_headway = coilItem.getString("averageHeadway");
                    String average_gap = coilItem.getString("averageGap");

                    String outputLine = String.join(",", device_no, source_device_type, sn, model, create_time,
                            cycle, lane_no, lane_type, section_no, coil_no, volume_sum, volume_person,
                            volume_car_non, volume_car_small, volume_car_middle, volume_car_big, speed_avg,
                            speed_85, time_occupancy, average_headway, average_gap);
                    // System.out.println("outputLine 8: " + outputLine);
                    kafkaTemplate.send("topic_db_data_statistics", outputLine);
                }
            }
        } catch (Exception e) {
            log.error("process statistics error", e);
        }
    }

    // 9. Event resource data
    @KafkaListener(topics = KafkaConstants.TOPIC_INTERNAL_DATA_EVENT_RESOURCE)
    public void processData9(String message) {
        try {
            JSONObject jsonObject = JSON.parseObject(message);
            // Extract the radar fields
            String device_no = jsonObject.getString("deviceNo");
            String source_device_type = jsonObject.getString("sourceDeviceType");
            String sn = jsonObject.getString("sn");
            String model = jsonObject.getString("model");
            String createTime = jsonObject.getString("createTime");
            String create_time = createTime.substring(0, 19);

            JSONObject data = jsonObject.getJSONObject("data");
            String event_id = data.getString("eventId");
            // Take the first element of each array as text
            JSONArray pictureArray = data.getJSONArray("picture");
            String picture = pictureArray.getString(0);
            JSONArray videoArray = data.getJSONArray("video");
            String video = videoArray.getString(0);

            String outputLine = String.join(",", device_no, source_device_type, sn, model, create_time,
                    event_id, picture, video);
            // System.out.println("outputLine 9: " + outputLine);
            kafkaTemplate.send("topic_db_data_event_resource", outputLine);
        } catch (Exception e) {
            log.error("process event_resource error", e);
        }
    }

    // 10. Event data
    @KafkaListener(topics = KafkaConstants.TOPIC_INTERNAL_DATA_EVENT)
    public void processData10(String message) {
        try {
            JSONObject jsonObject = JSON.parseObject(message);
            // System.out.println("raw message: " + JsonUtil.objectToJson(jsonObject));
            // Extract the radar fields
            String device_no = jsonObject.getString("deviceNo");
            String source_device_type = jsonObject.getString("sourceDeviceType");
            String sn = jsonObject.getString("sn");
            String model = jsonObject.getString("model");
            String createTime = jsonObject.getString("createTime");
            String create_time = createTime.substring(0, 19);
            String event_id = jsonObject.getString("eventId");
            String event_type = jsonObject.getString("eventType");
            String state = jsonObject.getString("state");

            switch (event_type) {
                case "QueueOverrun": {
                    // Handle QueueOverrun events
                    JSONObject data = jsonObject.getJSONObject("data");
                    String station = data.getString("station");
                    String flow = data.getString("flow");
                    List<JSONObject> queueList = data.getJSONArray("queueList").toJavaList(JSONObject.class);
                    for (JSONObject queueItem : queueList) {
                        String lane_no = queueItem.getString("laneNo");
                        String queue_len = queueItem.getString("queueLen");
                        String geography_head = queueItem.getString("geographyHead");
                        String geography_tail = queueItem.getString("geographyTail");
                        String queue_count = queueItem.getString("queueCount");
                        String speed_avg = queueItem.getString("speedAvg");
                        // Columns that do not apply to this event type are emitted as null placeholders
                        String event_type_detail = null, area_no = null, lane_no_original = null,
                                target_id = null, target_type = null, speed = null, limit_speed = null,
                                pos_x = null, pos_y = null, pos_z = null, longitude = null, latitude = null,
                                altitude = null, area_num = null, space_occupancy = null,
                                congestion_grade = null, congestion_length = null, length = null,
                                width = null, height = null, vehicle_type = null, vehicle_color = null,
                                plate_type = null, plate_color = null, plate_number = null;

                        String outputLine = String.join(",", device_no, source_device_type, sn, model,
                                create_time, event_id, event_type, event_type_detail, state, area_no,
                                station, flow, lane_no, lane_no_original, target_id, target_type,
                                queue_len, queue_count, speed, speed_avg, limit_speed, pos_x, pos_y, pos_z,
                                geography_head, geography_tail, longitude, latitude, altitude, area_num,
                                space_occupancy, congestion_grade, congestion_length, length, width, height,
                                vehicle_type, vehicle_color, plate_type, plate_color, plate_number);
                        // System.out.println("outputLine 10: " + outputLine);
                        kafkaTemplate.send("topic_db_data_event", outputLine);
                    }
                    break;
                }
                case "Debris": {
                    // Handle Debris events
                    JSONObject data12 = jsonObject.getJSONObject("data");
                    String area_no12 = data12.getString("areaNo");
                    String station12 = data12.getString("station");
                    String pos_x12 = data12.getString("posX");
                    String pos_y12 = data12.getString("posY");
                    String pos_z12 = data12.getString("posZ");
                    String longitude12 = data12.getString("longitude");
                    String latitude12 = data12.getString("latitude");
                    String altitude12 = data12.getString("altitude");
                    String length12 = data12.getString("length");
                    String width12 = data12.getString("width");
                    String height12 = data12.getString("height");
                    // Columns that do not apply to this event type are emitted as null placeholders
                    String event_type_detail12 = null, flow12 = null, lane_no12 = null,
                            lane_no_original12 = null, target_id12 = null, target_type12 = null,
                            queue_len12 = null, queue_count12 = null, speed12 = null, speed_avg12 = null,
                            limit_speed12 = null, geography_head12 = null, geography_tail12 = null,
                            area_num12 = null, space_occupancy12 = null, congestion_grade12 = null,
                            congestion_length12 = null, vehicle_type12 = null, vehicle_color12 = null,
                            plate_type12 = null, plate_color12 = null, plate_number12 = null;

                    String outputLine12 = String.join(",", device_no, source_device_type, sn, model,
                            create_time, event_id, event_type, event_type_detail12, state, area_no12,
                            station12, flow12, lane_no12, lane_no_original12, target_id12, target_type12,
                            queue_len12, queue_count12, speed12, speed_avg12, limit_speed12, pos_x12,
                            pos_y12, pos_z12, geography_head12, geography_tail12, longitude12, latitude12,
                            altitude12, area_num12, space_occupancy12, congestion_grade12,
                            congestion_length12, length12, width12, height12, vehicle_type12,
                            vehicle_color12, plate_type12, plate_color12, plate_number12);
                    // System.out.println("outputLine 10 (Debris): " + outputLine12);
                    kafkaTemplate.send("topic_db_data_event", outputLine12);
                    break;
                }
                default:
                    // No handling for other event types
                    break;
            }
        } catch (Exception e) {
            log.error("process event error", e);
        }
    }
}
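The event handler pads columns that do not apply to a given event type with String variables set to null; when joined into the CSV line, Java renders each one as the literal text "null", which downstream consumers have to treat as a null marker. A stdlib-only sketch of that detail, plus the `substring(0, 19)` timestamp trim used throughout the listener (class name and values are illustrative, not from the project):

```java
public class CsvRowDemo {

    // Build one comma-separated row; String.join renders null elements as the
    // literal text "null", matching the null-padded columns the event handler emits.
    static String toCsvRow(String... fields) {
        return String.join(",", fields);
    }

    public static void main(String[] args) {
        // createTime arrives as "yyyy-MM-dd HH:mm:ss.SSS"; substring(0, 19)
        // keeps only the second-resolution part, as in the listener.
        String createTime = "2024-09-09 11:47:22.123";
        String create_time = createTime.substring(0, 19);

        String row = toCsvRow("RS001", "radar", create_time, null, "42");
        System.out.println(row); // RS001,radar,2024-09-09 11:47:22,null,42
    }
}
```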

4. Start the KafkaApplication, Then Check the Output in a Consumer Window on Kafka Topic B

4.1 Start the KafkaApplication

4.2 Open a consumer window on Kafka topic B
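With the application running, the flattened rows can be checked with Kafka's standard console consumer (broker address from application.yml; the script path depends on where Kafka is installed):

```shell
# Consume one of the topic B streams (here: static queue) from the beginning
bin/kafka-console-consumer.sh \
  --bootstrap-server 192.168.10.12:9092 \
  --topic topic_db_data_static_queue \
  --from-beginning
```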

Done!

