kafka消费数据,webSocket实时推送数据到前端

news/2025/2/12 21:21:37/

1.导入webSocket依赖

 <!--websocket依赖包--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>

2.编写webSocket类

package com.skyable.device.config.webSocket;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;/*** @author Administrator*/
@ServerEndpoint("/vehicle/{domainId}")
@Component
@Slf4j
public class WebSocketServer {/*** concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。*/private static final Set<Session> SESSIONS = new HashSet<>();/*** 连接关闭调用的方法*/@OnClosepublic void onClose() {log.info("webSocket link close");}/*** @param error*/@OnErrorpublic void onError(Throwable error) {error.printStackTrace();}/*** 接收数据** @param data*/public static void sendDataToClients(String data) {for (Session session : SESSIONS) {try {session.getBasicRemote().sendText(data);} catch (IOException e) {e.printStackTrace();}}}@OnOpenpublic void onOpen(Session session) {/*** 与某个客户端的连接会话,需要通过它来给客户端发送数据*//*** 接收domainId*/SESSIONS.add(session);sendDataToClients();}public void sendDataToClients() {for (Session session : SESSIONS) {try {session.getBasicRemote().sendText("webSocket link succeed");} catch (IOException e) {e.printStackTrace();}}}
}
package com.skyable.device.config.webSocket;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** @author Administrator*/
@EnableWebSocket
@Configuration
public class WebSocketConfig {

    /**
     * Registers the JSR-356 endpoint exporter so that classes annotated with
     * {@code @ServerEndpoint} are detected and published by the embedded
     * servlet container.
     *
     * @return the endpoint exporter bean
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3.kafka消费数据后调用webSocket方法

  /*** 获取kafka数据** @param*/@Overridepublic void saveBatch(String jsonValue) {ObjectMapper objectMapper = new ObjectMapper();try {//位置JsonNode jsonNode = objectMapper.readTree(jsonValue);if (jsonNode.has(VehicleConstant.LOCATION)) {RealTimePosition realTimePosition = new RealTimePosition();JsonNode locationNode = jsonNode.get("location");String vehicleId = locationNode.get("vehicleId").asText();double longitude = Double.parseDouble(locationNode.get("longitude").asText());double latitude = Double.parseDouble(locationNode.get("latitude").asText());long timeStamp = locationNode.get("timestamp").asLong();realTimePosition.setTimeStamp(timeStamp);realTimePosition.setLatitude(String.valueOf(latitude));realTimePosition.setLongitude(String.valueOf(longitude));realTimePosition.setVehicleId(vehicleId);VehicleLocationVo locationVo = deviceMapMapper.selectLonLat(vehicleId);if (!Objects.isNull(locationVo)) {//计算距离RedisUtil.addLocation(vehicleId, Double.parseDouble(locationVo.getLongitude()), Double.parseDouble(locationVo.getLatitude()), "l1");RedisUtil.addLocation(vehicleId, longitude, latitude, "l2");Double result = RedisUtil.calculateDistance(vehicleId, "l1", "l2");Double meters = RedisUtil.convertMilesToKilometers(result);DecimalFormat decimalFormat = new DecimalFormat("#.###");String distance = decimalFormat.format(meters);realTimePosition.setDistance(Double.parseDouble(distance));} else {realTimePosition.setDistance(0);}//获取省份Map<String, Object> position = addressUtil.getPosition(longitude, latitude, null, null, null);Map data = (Map) position.get("data");String provinceName = data.get("shortname").toString().replaceAll("\"", "");realTimePosition.setArea(provinceName);deviceMapMapper.insertRealTimePosition(realTimePosition);RedisUtil.addZSetValue(VehicleConstant.VEHICLE_LOCATION, String.valueOf(vehicleId), timeStamp);}} catch (JsonProcessingException e) {e.printStackTrace();}try {//报警JsonNode jsonNode = objectMapper.readTree(jsonValue);if 
(jsonNode.has(VehicleConstant.ALERT)) {JsonNode alertNode = jsonNode.get("alert");String vehicleId = alertNode.get("vehicleId").asText();Integer alertType = alertNode.get("alertType").asInt();long timeStamp = alertNode.get("timestamp").asLong();Alerts alerts = new Alerts();alerts.setAlertType(alertType);alerts.setTimeStamp(timeStamp);alerts.setVehicleId(vehicleId);deviceMapMapper.insertAlerts(alerts);RedisUtil.addZSetValue(VehicleConstant.VEHICLE_ALERT, String.valueOf(vehicleId), timeStamp);}} catch (JsonProcessingException e) {e.printStackTrace();}//webSocket发送消息VehicleAllVo vehicles = vehicles();WebSocketServer.sendDataToClients(vehicles.toString());}

4.发送消息内容

VehicleAllVo vehicles = vehicles();
该方法就是发送的具体内容

5.kafka消费者

package com.skyable.device.listener.Vehicle;import com.alibaba.fastjson.JSON;
import com.skyable.common.config.CloudApplicationContext;
import com.skyable.common.constants.kafka.KafkaTopicConstants;
import com.skyable.device.config.webSocket.WebSocketServer;
import com.skyable.device.entity.vehicle.Vehicle;
import com.skyable.device.service.DeviceMapService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;import java.util.List;
import java.util.stream.Collectors;/*** Description:** @author yangJun* @date: 2023-08-18-14:12*/
@Service
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class VehicleDataKafkaListener {
    // @Component removed: @Service is itself meta-annotated with @Component,
    // so stacking both was redundant.

    private final DeviceMapService deviceMapService;

    /**
     * Batch listener for vehicle telemetry records; forwards each record
     * value to the persistence service.
     *
     * @param recordList one polled batch of Kafka records
     */
    @KafkaListener(topics = KafkaTopicConstants.TOPIC_VEHICLE_RECORD, groupId = "rx_1_thing", containerFactory = "batchFactory")
    public void dealDeviceDataToScript(List<ConsumerRecord<String, String>> recordList) {
        // Sequential processing: the original parallelStream() destroyed the
        // per-partition ordering Kafka guarantees, which matters because the
        // service computes distances between consecutive positions.
        recordList.stream()
                .map(ConsumerRecord::value)
                .forEach(this::saveVehicleData);
    }

    /**
     * Logs and delegates one raw JSON payload to the persistence service.
     *
     * @param jsonValue the record value exactly as received from Kafka
     */
    private void saveVehicleData(String jsonValue) {
        // Parameterized logging avoids eager string concatenation.
        log.info("kafka data: {}", jsonValue);
        deviceMapService.saveBatch(jsonValue);
    }
}
package com.skyable.device.listener.Vehicle;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import java.util.HashMap;
import java.util.Map;/*** @ClassName KafkaConsumerConfig* @Description Kafka消费者配置* @Author gaoy* @Date 2021/2/25 15:02*/
@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String servers;@Value("${spring.kafka.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${spring.kafka.consumer.concurrency}")private int concurrency;@Value("${spring.kafka.consumer.max-poll-records}")private int maxPollRecords;/*** 批量消费工厂bean* @return*/@BeanKafkaListenerContainerFactory batchFactory() {ConcurrentKafkaListenerContainerFactory factory = newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));// 开启批量监听factory.setBatchListener(true);factory.setConcurrency(concurrency);// 设置手动提交ackMode// factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}@Beanpublic Map consumerConfigs() {Map<String,Object> props = new HashMap<>();props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);//设置每次接收Message的数量props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//开启幂等性。props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);return props;}}


http://www.ppmy.cn/news/1061409.html

相关文章

工具--录屏软件

记录下录屏软件 ScreenToGif 官网 &#xff1a;https://www.screentogif.com/downloads 我下载的是 Installer 版本。 录屏&#xff0c;默认输出为 gif 。录制的 gif 清晰&#xff0c;且容量低。需要录gif的话主推&#xff01; 录制后输出为 mp4 的话提示要下载 FFmpeg &a…

Redis 7 教程 数据类型 基础篇

🌹 引导 Commands | Redishttps://redis.io/commands/Redis命令中心(Redis commands) -- Redis中国用户组(CRUG)Redis命令大全,显示全部已知的redis命令,redis集群相关命令,近期也会翻译过来,Redis命令参考,也可以直接输入命令进行命令检索。

引入本地 jar 包教程

将本地 jar 包&#xff0c;放到 resource 目录下&#xff0c;在 pom.xml 文件中加入如下依赖&#xff1a; <dependency><groupId>com.hk</groupId><artifactId>examples</artifactId><version>1.0</version><scope>system<…

2023高教社杯数学建模思路 - 复盘:光照强度计算的优化模型

文章目录 0 赛题思路1 问题要求2 假设约定3 符号约定4 建立模型5 模型求解6 实现代码 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 问题要求 现在已知一个教室长为15米&#xff0c;宽为12米&…

Ubuntu22.0网络/网卡丢失

Ubuntu22.0开机突然连不上网了&#xff0c;右上角网络图标消失了&#xff0c;设置里网络也没有了“有线”&#xff0c;只剩下VPN了&#xff0c;试了好多种办法&#xff0c;最终终于解决了。 看到有些直接用的下面的两条命令&#xff0c;有解决的&#xff0c;不过我这不行。 s…

5分钟理解NPL算法 之 马尔可夫链 Markov Chain

马尔可夫链&#xff08;Markov Chain&#xff09; 马尔可夫链是一种简单的推理模型。用于描述受当前事件影响下的下一事件发生概率。在预测学科中广泛应用。例如股票预测、文字推理、路线推荐等。 他的核心思路是&#xff1a;假设事件顺序为: X 1 , X 2 , X 3 , . . . . . X…

LeetCode 面试题 02.01. 移除重复节点

文章目录 一、题目二、C# 题解 一、题目 编写代码&#xff0c;移除未排序链表中的重复节点。保留最开始出现的节点。 点击此处跳转题目。 示例1: 输入&#xff1a;[1, 2, 3, 3, 2, 1] 输出&#xff1a;[1, 2, 3] 示例2: 输入&#xff1a;[1, 1, 1, 1, 2] 输出&#xff1a;[1, …

Desnet模型详解

模型介绍 DenseNet的主要思想是密集连接&#xff0c;它在卷积神经网络&#xff08;CNN&#xff09;中引入了密集块&#xff08;Dense Block&#xff09;&#xff0c;在这些块中&#xff0c;每个层都与前面所有层直接连接。这种设计可以让信息更快速地传播&#xff0c;有助于解…