![](https://img-blog.csdnimg.cn/img_convert/355045d6e930c4f63c6327ec7088fe87.png)
1.导入webSocket依赖
<!-- Spring Boot WebSocket starter dependency --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
2.编写webSocket类
package com.skyable.device.config.webSocket;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;/*** @author Administrator*/
@ServerEndpoint("/vehicle/{domainId}")
@Component
@Slf4j
public class WebSocketServer {/*** concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。*/private static final Set<Session> SESSIONS = new HashSet<>();/*** 连接关闭调用的方法*/@OnClosepublic void onClose() {log.info("webSocket link close");}/*** @param error*/@OnErrorpublic void onError(Throwable error) {error.printStackTrace();}/*** 接收数据** @param data*/public static void sendDataToClients(String data) {for (Session session : SESSIONS) {try {session.getBasicRemote().sendText(data);} catch (IOException e) {e.printStackTrace();}}}@OnOpenpublic void onOpen(Session session) {/*** 与某个客户端的连接会话,需要通过它来给客户端发送数据*//*** 接收domainId*/SESSIONS.add(session);sendDataToClients();}public void sendDataToClients() {for (Session session : SESSIONS) {try {session.getBasicRemote().sendText("webSocket link succeed");} catch (IOException e) {e.printStackTrace();}}}
}
package com.skyable.device.config.webSocket;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** @author Administrator*/
/**
 * Spring configuration that enables WebSocket support and registers the
 * exporter bean for {@code @ServerEndpoint}-annotated classes.
 */
@EnableWebSocket
@Configuration
public class WebSocketConfig {

    /**
     * Creates the {@link ServerEndpointExporter} bean.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
3.kafka消费数据后调用webSocket方法
/*** 获取kafka数据** @param*/@Overridepublic void saveBatch(String jsonValue) {ObjectMapper objectMapper = new ObjectMapper();try {//位置JsonNode jsonNode = objectMapper.readTree(jsonValue);if (jsonNode.has(VehicleConstant.LOCATION)) {RealTimePosition realTimePosition = new RealTimePosition();JsonNode locationNode = jsonNode.get("location");String vehicleId = locationNode.get("vehicleId").asText();double longitude = Double.parseDouble(locationNode.get("longitude").asText());double latitude = Double.parseDouble(locationNode.get("latitude").asText());long timeStamp = locationNode.get("timestamp").asLong();realTimePosition.setTimeStamp(timeStamp);realTimePosition.setLatitude(String.valueOf(latitude));realTimePosition.setLongitude(String.valueOf(longitude));realTimePosition.setVehicleId(vehicleId);VehicleLocationVo locationVo = deviceMapMapper.selectLonLat(vehicleId);if (!Objects.isNull(locationVo)) {//计算距离RedisUtil.addLocation(vehicleId, Double.parseDouble(locationVo.getLongitude()), Double.parseDouble(locationVo.getLatitude()), "l1");RedisUtil.addLocation(vehicleId, longitude, latitude, "l2");Double result = RedisUtil.calculateDistance(vehicleId, "l1", "l2");Double meters = RedisUtil.convertMilesToKilometers(result);DecimalFormat decimalFormat = new DecimalFormat("#.###");String distance = decimalFormat.format(meters);realTimePosition.setDistance(Double.parseDouble(distance));} else {realTimePosition.setDistance(0);}//获取省份Map<String, Object> position = addressUtil.getPosition(longitude, latitude, null, null, null);Map data = (Map) position.get("data");String provinceName = data.get("shortname").toString().replaceAll("\"", "");realTimePosition.setArea(provinceName);deviceMapMapper.insertRealTimePosition(realTimePosition);RedisUtil.addZSetValue(VehicleConstant.VEHICLE_LOCATION, String.valueOf(vehicleId), timeStamp);}} catch (JsonProcessingException e) {e.printStackTrace();}try {//报警JsonNode jsonNode = objectMapper.readTree(jsonValue);if (jsonNode.has(VehicleConstant.ALERT)) 
{JsonNode alertNode = jsonNode.get("alert");String vehicleId = alertNode.get("vehicleId").asText();Integer alertType = alertNode.get("alertType").asInt();long timeStamp = alertNode.get("timestamp").asLong();Alerts alerts = new Alerts();alerts.setAlertType(alertType);alerts.setTimeStamp(timeStamp);alerts.setVehicleId(vehicleId);deviceMapMapper.insertAlerts(alerts);RedisUtil.addZSetValue(VehicleConstant.VEHICLE_ALERT, String.valueOf(vehicleId), timeStamp);}} catch (JsonProcessingException e) {e.printStackTrace();}//webSocket发送消息VehicleAllVo vehicles = vehicles();WebSocketServer.sendDataToClients(vehicles.toString());}
4.发送消息内容
VehicleAllVo vehicles = vehicles();
vehicles() 方法的返回值就是要推送给客户端的具体内容
5.kafka消费者
package com.skyable.device.listener.Vehicle;import com.alibaba.fastjson.JSON;
import com.skyable.common.config.CloudApplicationContext;
import com.skyable.common.constants.kafka.KafkaTopicConstants;
import com.skyable.device.config.webSocket.WebSocketServer;
import com.skyable.device.entity.vehicle.Vehicle;
import com.skyable.device.service.DeviceMapService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;import java.util.List;
import java.util.stream.Collectors;/*** Description:** @author yangJun* @date: 2023-08-18-14:12*/
/**
 * Kafka batch listener that forwards each vehicle record's payload to
 * {@link DeviceMapService#saveBatch}.
 */
@Service
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class VehicleDataKafkaListener {

    private final DeviceMapService deviceMapService;

    /**
     * Handles one polled batch of vehicle records.
     *
     * <p>Records are processed one after another in list order rather than on a
     * parallel stream, so the per-record work does not run concurrently on the
     * shared common pool and records are not handled out of order.
     *
     * @param recordList the records delivered by the batch container factory
     */
    @KafkaListener(topics = KafkaTopicConstants.TOPIC_VEHICLE_RECORD, groupId = "rx_1_thing", containerFactory = "batchFactory")
    public void dealDeviceDataToScript(List<ConsumerRecord<String, String>> recordList) {
        for (ConsumerRecord<String, String> consumerRecord : recordList) {
            saveVehicleData(consumerRecord.value());
        }
    }

    /**
     * Logs the payload and hands it to the device map service.
     *
     * @param jsonValue the record value as JSON text
     */
    private void saveVehicleData(String jsonValue) {
        log.info("kafka data:{}", jsonValue);
        deviceMapService.saveBatch(jsonValue);
    }
}
package com.skyable.device.listener.Vehicle;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import java.util.HashMap;
import java.util.Map;/*** @ClassName KafkaConsumerConfig* @Description Kafka消费者配置* @Author gaoy* @Date 2021/2/25 15:02*/
@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String servers;@Value("${spring.kafka.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${spring.kafka.consumer.concurrency}")private int concurrency;@Value("${spring.kafka.consumer.max-poll-records}")private int maxPollRecords;/*** 批量消费工厂bean* @return*/@BeanKafkaListenerContainerFactory batchFactory() {ConcurrentKafkaListenerContainerFactory factory = newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));// 开启批量监听factory.setBatchListener(true);factory.setConcurrency(concurrency);// 设置手动提交ackMode// factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}@Beanpublic Map consumerConfigs() {Map<String,Object> props = new HashMap<>();props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);//设置每次接收Message的数量props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//开启幂等性。props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);return props;}}