本章节主要是学习WebSocket, 目标快速入门, 能够回答以下问题:
WebSocket和HTTP的区别.
WebSocket使用的是全双工通讯协议, 与其他类似协议有啥区别?
WebSocket中的常用注解有哪些?
通过本章节的学习, 应该可以回答上来这几个问题.
8.1 WebSocket概念快速理解
WebSocket 是HTML5一种新的协议。 主要应用范围为实时通讯相关领域, 比如聊天软件, 实况更新和社交订阅等.
WebSocket 实现了浏览器与服务器全双工通信.
全双工通信就是我们现在的电话, 双方都可以讲话.
半双工通信就是指一个时间段内只有一个动作, 就是以前的对讲机, 同时只能有一个人说话, 说完需要加一个"over", 以便让别人说话.
单工通信更常见, 就是遥控 你的电视遥控只能发,不能收.
8.1.1 为什么HTML5提出WebSocket
思考这样一类问题, 现在有这样一类简单的需求需要你实现:
在网页不刷新的情况下, 搭建一个网页版聊天室;
网页不刷新, 并刷新购物车内的商品个数
网页不刷新, 实时更新朋友的位置.
为什么一定要强调网页不刷新呢? 因为如果网页刷新,你就可以使用HTTP请求那套, 你发起一个请求, 服务器给你一个响应了.但是网页刷新在实时性要求较高的业务中根本没办法满足需求.
所以在HTML5之前. 通常的解决方案如下:
采用轮询的方式。即:通过js不断的请求服务器,查看是否有新数据,如果有,就获取到新数据
这种方案有很明显的缺点: js发出的大部分请求都没有获取到数据更新, 从而对双方机器造成了严重的资源浪费.
所以, 为了提出一个更好的方案, HTML5提出了WebSocket技术.
8.1.2 http与websocket的区别
http协议是短连接,每次请求之后,都会关闭连接,下次重新请求数据,需要再次打开链接.
WebSocket协议是一种长链接,只需要通过一次请求来初始化链接,然后所有的请求和响应都是通过这个TCP链接
进行通讯。
不理解的话,我举个例子.
HTTP就是发微信语音, 每次都需要先找到联系人, 然后按一下那个录音和发送.
WebSocket则与语音通话, 之后所有的对话都通过语音直接进行通讯. (双方只需要控制好发送的内容, 对流量也不会造成太大浪费)
8.2 WebSocket 的Java版demo快速实现
多说无益, 我们开始实战吧.
8.2.1 新建itcast-websocket工程(Maven工程)
首先先引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>itcast-websocket2</artifactId><version>1.0-SNAPSHOT</version><!-- 我们用的maven的tomcat插件运行, 所以在maven中一定要配置打包方式为为war包 --><packaging>war</packaging><dependencies><!-- websocket所需依赖 --><dependency><groupId>javax</groupId><artifactId>javaee-api</artifactId><version>7.0</version><scope>provided</scope></dependency></dependencies><build><plugins><!-- java编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.2</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><!-- 配置Tomcat插件 --><plugin><groupId>org.apache.tomcat.maven</groupId><artifactId>tomcat7-maven-plugin</artifactId><version>2.2</version><configuration><port>8082</port><path>/</path></configuration></plugin></plugins></build>
</project>
然后新建一个demo类, 话都在注释里:
package cn.itcast.websocket;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;/*** Websocket快速入门demo, 阅读代码前最好理解'WebSocket生命周期概念'* 连接->正常收发信息/异常消息->正常收发信息/异常消息->.....->正常收发信息/异常消息->断开连接** @author 过道*/
// @ServerEndpoint 申明这是一个websocket服务, 可以简单类比为Controller注解
@ServerEndpoint("/websocket/{uid}")
public class MyWebsocket {// @OnOpen 该注解标识的方法将在建立连接后执行// @OnOpen 标识的方法可以接收到session对象,就是客户端与服务端建立的长连接通道// @PathParam, 从路径中的{}中读取内容, 与SpringMVC一致.@OnOpenpublic void onOpen(Session session, @PathParam("uid") String uid) throwsIOException {// 连接成功, 使用 session 的 api 发送文字session.getBasicRemote().sendText(uid + ",你好,欢迎连接WebSocket!");}// 关闭连接时, 会触发对这个注解@OnClose标识的方法的调用@OnClosepublic void onClose() {System.out.println(this + "关闭连接");}/*** 该方法用于接收客户端发来的消息** @param message 发来的消息数据* @param session 会话对象(也是通道)*/@OnMessagepublic void onMessage(String message, Session session) throws IOException {System.out.println("接收到消息:" + message);session.getBasicRemote().sendText("消息已收到.");}
}
写完之后, 我们使用maven的tomcat插件运行下项目:
8.2.2 进行测试
打开网页: http://www.easyswoole.com/wstool.html
然后url改为
ws://localhost:8082/websocket/12
如图所示, 点击开始连接, 连接成功后发送就可以了.
8.3 SpringBoot整合WebSocket
为了避免麻烦, 我们直接在8.2的demo项目上改进
pom.xml修改为下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><!--spring boot的支持--><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.0.RELEASE</version></parent><groupId>cn.itcast.websocket</groupId><artifactId>itcast-websocket</artifactId><version>1.0-SNAPSHOT</version><!-- 这里不用管, 我们之后会用boot的main方式启动 --><packaging>war</packaging><dependencies><!--<dependency>--><!--<groupId>javax</groupId>--><!--<artifactId>javaee-api</artifactId>--><!--<version>7.0</version>--><!--<scope>provided</scope>--><!--</dependency>--><!-- 修改为SpringBoot的webstocket --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency></dependencies><build><plugins><!-- java编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.2</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><!-- 配置Tomcat插件 --><plugin><groupId>org.apache.tomcat.maven</groupId><artifactId>tomcat7-maven-plugin</artifactId><version>2.2</version><configuration><port>8082</port><path>/</path></configuration></plugin></plugins></build>
</project>
8.3.1 快速接入SpringBoot
在Spring中,处理消息的具体业务逻辑需要实现WebSocketHandler接口。
package cn.itcast.websocket.spring;import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;import java.io.IOException;/*** 在Spring中, 我们需要实现WebSocketHandler接口, 并且需要注册为组件,交给IOC池子维护.*/
@Component
public class MyHandler extends TextWebSocketHandler {/*** 这里就是我们demo版本的 @onMessage 哦,每次收到信息都会走这个方法.*/@Overridepublic void handleTextMessage(WebSocketSession session, TextMessage message)throws IOException {System.out.println("获取到消息 >> " + message.getPayload());// 向客户端发送消息session.sendMessage(new TextMessage("消息已收到"));// 如果收到了 '10', 那么给另一方发送0,1,2,3,... 9if(message.getPayload().equals("10")){for (int i = 0; i < 10; i++) {session.sendMessage(new TextMessage("消息 -> " + i));try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 这个就是我们demo版本中的 @OnOpen 了, 建立连接后会调用这个.*/@Overridepublic void afterConnectionEstablished(WebSocketSession session) throwsException {session.sendMessage(new TextMessage(" 你好!欢迎连接到ws服务"));}/*** 这就是demo版本的 @OnClose 注解, 关闭连接后会触发这个.*/@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status)throws Exception {System.out.println("断开连接!");}
}
编写配置类
package cn.itcast.websocket.spring;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;/*** WebSocket相关配置类*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {// 从IOC池子中取出我们自定义的socket处理器@Autowiredprivate MyHandler myHandler;@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {// 将webSocket收到的所有路径中带有 '/ws'的交给myHandler处理registry.addHandler(myHandler, "/ws").setAllowedOrigins("*");}}
编写启动类
package cn.itcast.websocket;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class MyApplication {public static void main(String[] args) {SpringApplication.run(MyApplication.class, args);}
}
然后运行后打开测试网站http://www.easyswoole.com/wstool.html
8.3.2 websocket拦截器
在Spring中提供了websocket拦截器,可以在建立连接之前写些业务逻辑,比如校验登录等。
新建一个 MyHandshakeInterceptor 拦截器
package cn.itcast.websocket.spring;import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;import java.util.Map;@Component
public class MyHandshakeInterceptor implements HandshakeInterceptor {/*** 握手之前,若返回false,则不建立链接*/@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponseresponse, WebSocketHandler wsHandler, Map<String, Object> attributes) throwsException {//将用户id放入socket处理器的会话(WebSocketSession)中attributes.put("uid", 1001);System.out.println("开始握手。。。。。。。");return true;}@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponseresponse, WebSocketHandler wsHandler, Exception exception) {System.out.println("握手成功啦。。。。。。");}
}
修改WebSocketConfig类注册拦截器
package cn.itcast.websocket.spring;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;/*** WebSocket相关配置类*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {// 从IOC池子中取出我们自定义的socket处理器@Autowiredprivate MyHandler myHandler;@Autowiredprivate MyHandshakeInterceptor myHandshakeInterceptor;@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(this.myHandler, "/ws").setAllowedOrigins("*")// 追加注册一个拦截器的代码.addInterceptors(this.myHandshakeInterceptor);}
}
然后为了证明我们再在MyHandler的afterConnectionEstablished 修改下, 加上对方的uid
/*** 这个就是我们demo版本中的 @OnOpen 了, 建立连接后会调用这个.*/@Overridepublic void afterConnectionEstablished(WebSocketSession session) throwsException {// 获取uidInteger uid = (Integer) session.getAttributes().get("uid");session.sendMessage(new TextMessage(uid + ", 你好!欢迎连接到ws服务"));}
再次启动并测试, 发现ok
8.4 使用WebSocket搭建即时通讯系统
在这里我们不连接数据库, 只做简单的缓存, 之后在其他一章节中连接数据库.
8.4.1 引入依赖
pom.xml文件如下.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>itcast-haoke-im-webstocket</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.0.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.4</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency></dependencies><build><plugins><!-- java编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.2</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin></plugins></build>
</project>
项目结构图如下所示 :
我们先只实现WebSocket相关内容
8.4.2 搭建pojo和对应的DAO层
package cn.itcast.haoke.im.pojo;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Date;@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Message {private long id;// 消息体暂不支持复杂消息.private String msg;/*** 消息状态,1-未读,2-已读*/private Integer status;// 发送的时间和已读的时间private Date sendDate;private Date readDate;// 发送方和接收方private User from;private User to;
}
用到的User类代码如下:
package cn.itcast.haoke.im.pojo;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class User {private Long id;private String username;
}
MessageDao如下, 我这里先用HashMap模拟数据库
package cn.itcast.haoke.im.dao;import cn.itcast.haoke.im.pojo.Message;import java.util.List;public interface MessageDAO {/*** 查询点对点聊天记录*/List<Message> findListByFromAndTo(Long fromId, Long toId, Integer page, Integerrows);/*** 根据id查询数据*/Message findMessageById(Long id);/*** 新增消息数据** * @param message* * @return*/Message saveMessage(Message message);void updateMessageState(long id, int i);
}
具体实现如下:
package cn.itcast.haoke.im.dao.impl;import cn.itcast.haoke.im.dao.MessageDAO;
import cn.itcast.haoke.im.pojo.Message;
import org.springframework.stereotype.Component;import java.util.*;
import java.util.concurrent.atomic.AtomicLong;/*** 我们在内存中简单实现一个查询类, 之后使用数据库替换.*/
@Component
public class MessageDAOImpl implements MessageDAO {private static AtomicLong ID_BUILDER = new AtomicLong(1L);Map<Long, Message> db = new HashMap<>();@Overridepublic List<Message> findListByFromAndTo(Long fromId, Long toId, Integer page, Integer rows) {// 遍历所有信息, 找到满足的信息Collection<Message> values = db.values();// 先查出所有内容List<Message> messageList = new ArrayList<>();for (Message value : values) {if (value.getFrom() != null && Objects.equals(value.getFrom().getId(), fromId)) {if (value.getTo() != null && Objects.equals(value.getTo().getId(), toId)) {messageList.add(value);}}}// 处理分页return messageList.subList(page * rows, rows);}@Overridepublic Message findMessageById(Long id) {return db.get(id);}@Overridepublic Message saveMessage(Message message) {// 按照调用顺序生成一个idmessage.setId(ID_BUILDER.getAndIncrement());db.put(message.getId(), message);return message;}@Overridepublic void updateMessageState(long id, int i) {Message messageById = findMessageById(id);if (messageById != null) {// 因为是在map中操作, 所以直接修改状态就可以了.messageById.setStatus(i);}}
}
因为我们用的是假数据, 所以再写一些默认的假数据
package cn.itcast.haoke.im.pojo;import java.util.HashMap;
import java.util.Map;public class UserData {public static final Map<Long, User> USER_MAP = new HashMap<>();static {USER_MAP.put(1001L, User.builder().id(1001L).username("zhangsan").build());USER_MAP.put(1002L, User.builder().id(1002L).username("lisi").build());USER_MAP.put(1003L, User.builder().id(1003L).username("wangwu").build());USER_MAP.put(1004L, User.builder().id(1004L).username("zhaoliu").build());USER_MAP.put(1005L, User.builder().id(1005L).username("sunqi").build());}
}
8.4.3 接入WebSocket
首先, 我们按照之前的demo版流程, 编写一个Handler来处理WebSocket的几个生命周期.
package cn.itcast.haoke.im.websocket;import cn.itcast.haoke.im.dao.MessageDAO;
import cn.itcast.haoke.im.pojo.Message;
import cn.itcast.haoke.im.pojo.UserData;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;import java.util.HashMap;
import java.util.Map;/*** webSocket 消息处理器** @author 过道*/
@Component
public class MessageHandler extends TextWebSocketHandler {@Autowiredprivate MessageDAO messageDAO;private static final ObjectMapper MAPPER = new ObjectMapper();// 记录所有在线的终端, 并配置唯一标识.private static final Map<Long, WebSocketSession> SESSIONS = new HashMap<>();@Overridepublic void afterConnectionEstablished(WebSocketSession session) {// 将当前用户的session放置到map中,后面会使用相应的session通信Long uid = (Long) session.getAttributes().get("uid");SESSIONS.put(uid, session);}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessagetextMessage) throws Exception {// 解析消息中的发送方, 接收方, 消息内容.Long uid = (Long) session.getAttributes().get("uid");JsonNode jsonNode = MAPPER.readTree(textMessage.getPayload());Long toId = jsonNode.get("toId").asLong();String msg = jsonNode.get("msg").asText();Message message = Message.builder()// 假装发送用户和接受用户都是从数据库中查出来的.from(UserData.USER_MAP.get(uid)).to(UserData.USER_MAP.get(toId)).msg(msg).build();// 存入数据库message = this.messageDAO.saveMessage(message);// 判断to用户是否在线WebSocketSession toSession = SESSIONS.get(toId);if (toSession != null && toSession.isOpen()) {// 在线且可收消息的话, 实时发送给接收方.//TODO 具体格式需要和前端对接toSession.sendMessage(new TextMessage(MAPPER.writeValueAsString(message)));// 更新消息状态为已读this.messageDAO.updateMessageState(message.getId(), 2);}}
}
为了简单起见,我们直接使用url来传当前用户的id, 格式如下
ws://{服务器url}:{服务器端口}/ws/{当前用户id}
示例如下:
ws://localhost:8080/ws/1002
ws://localhost:8080/ws/1001
约定好id传输格式,我们需要在每个连接建立时, 将uid放入到attributes中, 所以需要用到拦截器
package cn.itcast.haoke.im.websocket;import org.apache.commons.lang3.StringUtils;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;import java.util.Map;@Component
public class MessageHandshakeInterceptor implements HandshakeInterceptor {/*** 解析路径中的uid, 并放入 attributes 中, 以便之后使用*/@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponseresponse, WebSocketHandler wsHandler, Map<String, Object> attributes) throwsException {String path = request.getURI().getPath();String[] ss = StringUtils.split(path, '/');if (ss.length != 2) {return false;}if (!StringUtils.isNumeric(ss[1])) {return false;}attributes.put("uid", Long.valueOf(ss[1]));return true;}@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponseresponse, WebSocketHandler wsHandler, Exception exception) {}
}
现在, 只需要我们将我们自定义的拦截器和Handler配置到Spring中, 让Spring将接收到的请求转发给我们就ok了
package cn.itcast.haoke.im.websocket;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Autowiredprivate MessageHandler messageHandler;@Autowiredprivate MessageHandshakeInterceptor messageHandshakeInterceptor;@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(this.messageHandler, "/ws/{uid}").setAllowedOrigins("*").addInterceptors(this.messageHandshakeInterceptor);}
}
8.4.4 编写启动类并测试
package cn.itcast.haoke.im;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ImApplication {public static void main(String[] args) {SpringApplication.run(ImApplication.class, args);}
}
启动项目, 启动成功;
这次还是打开在线测试工具 http://www.easyswoole.com/wstool.html
注意要打开两份
一份的url : ws://localhost:8080/ws/1001
另一份的url: ws://localhost:8080/ws/1002
点击连接, 都连接成功.
我们打开url是 1001的页面, 发送内容
{
"toId":1002,
"msg" : "你好, 1002"
}
然后看一眼url为1002的页面, 确实收到了内容. 如此测试就可以了.