文章目录
- 〇、代码逻辑
- 一、搭建Server
- 1.引入依赖
- 2.搭建一个简单的Server
- 二、搭建WebSocket建立连接
- 1.修改Server,增加一些支持
- 2.自定义一个WebSocketHandler
- 三、功能实现——用户注册上线
- 1.先定义一个工具类Result,用于封装服务端返回消息
- 2.封装客户端指令
- 3.完善WebSocketHandler
- 4.给Server添加一个存放用户映射关系的Map
- 5.定义ConnectionHandler,用于处理用户上线功能的逻辑
- 6.最后,运行项目来进行测试
- 四、功能实现——私聊
- 1.扩展Command类,增加封装后的消息协议
- 2.修改WebSocketHandler,增加对聊天功能的处理ChatHandler
- 3.定义ChatHandler用于处理聊天任务
- 4.运行项目进行测试
- 五、功能实现——群聊
- 1.给Server类添加一个channel组,实现系统默认群聊组
- 2.给CommandType增加一个加入群聊指令的枚举类型
- 3.在WebSocketHandler中增加加入群组功能的处理
- 4.定义JoinHandler,用于处理加入群聊的业务逻辑
- 5.在ChatHandler中增加发送群聊消息的代码
- 6.运行项目进行测试
项目源码: github
websocket在线测试工具(在没有前端的情况下也可以与Server连接并进行通信): http://websocket-test.com/
在前两篇博客中我介绍了Java IO通信模型和Netty核心概念,在这篇博客中我会展示如何使用Netty开发一个仿WeChat通讯工具——SmartChat。
我们都知道,Netty是一个异步基于事件驱动的高性能网络通信框架,下面我们使用它来一步步搭建SmartChat。
〇、代码逻辑
一、搭建Server
首先,使用熟悉的IDE工具创建一个Java项目,我命名为了SmartChat。
1.引入依赖
<!-- netty -->
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.85.Final</version>
</dependency>
<!-- lombok工具 -->
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version><scope>provided</scope>
</dependency>
<!-- json -->
<dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.18</version>
</dependency>
2.搭建一个简单的Server
package tracy;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class Server {public static void start(){//主线程池EventLoopGroup bossPool=new NioEventLoopGroup();//副线程池EventLoopGroup workPool=new NioEventLoopGroup();//用于监听端口ServerBootstrap bootstrap=new ServerBootstrap();bootstrap.group(bossPool,workPool)//放入两个线程池.channel(NioServerSocketChannel.class)//指定channel.childHandler(new ChannelInitializer<SocketChannel>() {//初始化channel@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {}});//监听端口ChannelFuture future=bootstrap.bind(8080);}
}
- 在项目启动类中调用Server类的start():
package tracy;/*** 启动类*/
public class Main {public static void main(String[] args) {System.out.println("SmartChat!!!");System.out.println("Hello and welcome!");Server.start();}
}
- 运行启动类:
二、搭建WebSocket建立连接
websocket用于在服务端和客户端之间建立连接。
1.修改Server,增加一些支持
package tracy;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;public class Server {public static void start () throws InterruptedException{//主线程池EventLoopGroup bossPool=new NioEventLoopGroup();//副线程池EventLoopGroup workPool=new NioEventLoopGroup();//用于监听端口ServerBootstrap bootstrap=new ServerBootstrap();bootstrap.group(bossPool,workPool)//放入两个线程池.channel(NioServerSocketChannel.class)//指定channel.childHandler(new ChannelInitializer<SocketChannel>() {//初始化channel@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//获取pipeline,pipeline的工作是基于责任链模式ChannelPipeline pipeline=socketChannel.pipeline();//添加一些handler//http编码解码器pipeline.addLast(new HttpServerCodec())//对大数据量的支持.addLast(new ChunkedWriteHandler())//对http消息进行聚合.addLast(new HttpObjectAggregator(1024*24))//对websocket进行支持.addLast(new WebSocketServerProtocolHandler("/"))//websocket具体怎么处理,需要自定义.addLast(new WebSocketHandler());}});//监听端口bootstrap.bind(8080);}
}
2.自定义一个WebSocketHandler
package tracy;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;/*** TextWebSocketFrame表示消息体为文本类型*/
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {//先做一个最简单的处理,把消息内容直接打印出来System.out.println("客户端消息:"+msg.text());}
}
- 启动项目后去websocket在线测试网站http://websocket-test.com/进行测试:
- 成功:
三、功能实现——用户注册上线
功能概述: 用户A要想与用户B进行私聊,首先需要保存用户A和B和Server的连接标识,即需要保存映射关系。
- 优化目录结构:
首先,我们先来优化一下目录结构,使其更清晰。
command用来存放客户端的指令,handler用来存放我们的一些处理逻辑,util存放工具类。
1.先定义一个工具类Result,用于封装服务端返回消息
package tracy.util;import com.alibaba.fastjson2.JSON;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.AllArgsConstructor;
import lombok.Data;import java.time.LocalDateTime;@Data
@AllArgsConstructor
public class Result {private String name;//消息类型private LocalDateTime time;private String message;//消息内容public static TextWebSocketFrame fail(String message){return new TextWebSocketFrame(JSON.toJSONString(new Result("系统消息",LocalDateTime.now(),message)));}public static TextWebSocketFrame success(String message){return new TextWebSocketFrame(JSON.toJSONString(new Result("系统消息",LocalDateTime.now(),message)));}public static TextWebSocketFrame success(String name,String message){return new TextWebSocketFrame(JSON.toJSONString(new Result(name,LocalDateTime.now(),message)));}
}
2.封装客户端指令
- 定义一个Command类,用于描述客户端的指令:
package tracy.command;import lombok.Data;@Data
public class Command {//用户昵称private String nickname;//指令private Integer code;
}
- 定义一个CommandType枚举类型,用于罗列客户端的指令类型:
package tracy.command;import lombok.AllArgsConstructor;
import lombok.Getter;@Getter
@AllArgsConstructor
public enum CommandType{//指令:建立连接CONNECTION(1),//指令:错误指令ERROR(0),;private final Integer code;public static CommandType match(Integer code){//遍历枚举类型的所有值,看输入的code是否能与某一个匹配上for (CommandType value:CommandType.values()){if(value.getCode().equals(code))return value;}//匹配不上,说明输入的指令不是合法的枚举类return ERROR;}
}
3.完善WebSocketHandler
package tracy.handler;import com.alibaba.fastjson2.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import tracy.command.*;
import tracy.util.Result;/*** TextWebSocketFrame表示消息体为文本类型*/
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {//将消息解析成一个Command能兼容的对象try{//将json形式的文本解析成Command类Command command= JSON.parseObject(msg.text(),Command.class);//每一种指令都定义一个对应的Handler来进行处理switch(CommandType.match(command.getCode())){case CONNECTION: ConnectionHandler.execute(command,ctx);break;default: ctx.channel().writeAndFlush(Result.fail("不支持的CODE"));}}catch (Exception e){e.printStackTrace();ctx.channel().writeAndFlush(Result.fail("不支持的CODE"));}}
}
4.给Server添加一个存放用户映射关系的Map
//用于保存映射关系
public static final Map<String,Channel> USERS=new ConcurrentHashMap<>();
5.定义ConnectionHandler,用于处理用户上线功能的逻辑
package tracy.handler;import com.alibaba.fastjson2.JSON;
import io.netty.channel.ChannelHandlerContext;
import tracy.Server;
import tracy.command.Command;
import tracy.util.Result;public class ConnectionHandler {public static void execute(Command command, ChannelHandlerContext ctx){//容错处理:避免相同昵称的用户重复上线if(Server.USERS.containsKey(command.getNickname())){ctx.channel().writeAndFlush(Result.fail("该用户已上线,请更换昵称后重试!"));ctx.channel().disconnect();return;}//将用户加入到服务端的映射队列中Server.USERS.put(command.getNickname(),ctx.channel());//返回一条表示用户上线成功的消息ctx.channel().writeAndFlush(Result.success("与服务端连接建立成功!"));//再以json字符串的形式返回当前在线的用户列表ctx.channel().writeAndFlush(Result.success(JSON.toJSONString(Server.USERS.keySet())));}
}
6.最后,运行项目来进行测试
http://websocket-test.com/
wang001上线成功
{"nickname": "tracy001","code": 1
}
新开一个测试窗口,tracy002上线成功
{"nickname": "tracy002","code": 1
}
新开一个测试窗口,tracy001上线失败,原因是昵称重复
{"nickname": "tracy001","code": 1
}
新开一个测试窗口,tracy003上线失败,原因是指令=2不合法
{"nickname": "tracy003","code": 2
}
四、功能实现——私聊
1.扩展Command类,增加封装后的消息协议
- Command:
package tracy.command;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@AllArgsConstructor
@NoArgsConstructor
public class Command {//用户昵称private String nickname;//指令private Integer code;//消息private Message message;
}
- CommandType也需要增加相应的枚举值:
//指令:聊天
CHAT(2),
- Message:
package tracy.command;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message{//消息类型private Integer type;//接收对象private String target;//内容private String content;public Message(Integer type,String target){this.type=type;this.target=target;}public Message(Integer type){this.type=type;}
}
- MessageType:
package tracy.command;import lombok.AllArgsConstructor;
import lombok.Getter;@Getter
@AllArgsConstructor
public enum MessageType {//私聊PRIVATE(1),//群聊GROUP(2),//错误ERROR(0);private Integer type;public static MessageType match(Integer type){//遍历枚举类型的所有值,看输入的type否能与某一个匹配上for (MessageType value:MessageType.values()){if(value.getType().equals(type))return value;}//匹配不上,说明输入的type不是合法的枚举类return ERROR;}
}
2.修改WebSocketHandler,增加对聊天功能的处理ChatHandler
package tracy.handler;import com.alibaba.fastjson2.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import tracy.command.*;
import tracy.util.Result;/*** TextWebSocketFrame表示消息体为文本类型*/
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {try{//将json形式的文本解析成Command类Command command= JSON.parseObject(msg.text(),Command.class);//每一种指令都定义一个对应的Handler来进行处理switch(CommandType.match(command.getCode())){case CONNECTION: ConnectionHandler.execute(command,ctx);break;case CHAT: ChatHandler.execute(command,ctx);break;default: ctx.channel().writeAndFlush(Result.fail("不支持的CODE"));}}catch (Exception e){e.printStackTrace();ctx.channel().writeAndFlush(Result.fail("发送消息格式错误,请确认后再试"));}}
}
3.定义ChatHandler用于处理聊天任务
package tracy.handler;import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import tracy.Server;
import tracy.command.Command;
import tracy.command.Message;
import tracy.command.MessageType;
import tracy.util.Result;public class ChatHandler {//用户聊天逻辑public static void execute(Command command, ChannelHandlerContext ctx){try{Message message=command.getMessage();//按不同聊天类型进行处理switch(MessageType.match(message.getType())){//私聊case PRIVATE: {//信息接收对象为空String target=message.getTarget();Channel channel=Server.USERS.get(target);if(target==null||target.isEmpty()){ctx.channel().writeAndFlush(Result.fail("接收者信息为空,请确认后再试"));return;}//信息接收对象不存在else if(channel==null){ctx.channel().writeAndFlush(Result.fail("接收者"+target+"不存在,请确认后再试"));return;}//信息接收对象下线了else if(!channel.isActive()){ctx.channel().writeAndFlush(Result.fail("接收者"+target+"已下线,请确认后再试"));return;}else{channel.writeAndFlush(Result.success("私聊消息("+command.getNickname()+")",message.getContent()));}};break;//群聊,先空着,下一章实现case GROUP: ;break;default: ctx.channel().writeAndFlush(Result.fail("不支持的TYPE"));}}catch (Exception e){e.printStackTrace();ctx.channel().writeAndFlush(Result.fail("发送消息格式错误,请确认后再试"));}}
}
4.运行项目进行测试
http://websocket-test.com/
wang001、wang002、wang003上线成功
wang001向wang002发送消息,wang002接收到了
{"nickname": "wang001","code": 2,"message": {"type": 1,"target": "wang002","content": "你好,我是1号"}
}
wang001向并不存在的wang004发送消息,失败
wang003下线,然后wang001向下线的wang003发送消息,失败
五、功能实现——群聊
功能概述:系统提供一个群里组,但是用户需要有加入群聊这个操作,才能进行后续的收发群聊消息。
1.给Server类添加一个channel组,实现系统默认群聊组
//添加一个channel组,用于实现群聊一对多通信
public static final ChannelGroup CHANNEL_GROUP=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
2.给CommandType增加一个加入群聊指令的枚举类型
//指令:加入群聊
JOIN(3),
3.在WebSocketHandler中增加加入群组功能的处理
switch(CommandType.match(command.getCode())){case CONNECTION: ConnectionHandler.execute(command,ctx);break;case CHAT: ChatHandler.execute(command,ctx);break;case JOIN: JoinHandler.execute(ctx);break;default: ctx.channel().writeAndFlush(Result.fail("不支持的CODE"));}
4.定义JoinHandler,用于处理加入群聊的业务逻辑
package tracy.handler;import io.netty.channel.ChannelHandlerContext;
import tracy.Server;
import tracy.util.Result;public class JoinHandler {public static void execute(ChannelHandlerContext ctx){Server.CHANNEL_GROUP.add(ctx.channel());ctx.channel().writeAndFlush(Result.success("加入系统默认群聊成功"));}
}
5.在ChatHandler中增加发送群聊消息的代码
switch(MessageType.match(message.getType())){//私聊case PRIVATE: {……};break;//群聊case GROUP: {Server.CHANNEL_GROUP.writeAndFlush(Result.success("群聊消息("+command.getNickname()+")",message.getContent()));};break;default: ctx.channel().writeAndFlush(Result.fail("不支持的TYPE"));}
6.运行项目进行测试
http://websocket-test.com/
自行完成这一工作。
- 加入群聊json:
{"nickname": "wang001","code": 3
}
- 发送群聊消息json:
{"nickname": "wang001","code": 2,"message": {"type": 2,"content": "你好,我是1号"}
}
成功!
到这里,我们就完成了mini版微信聊天工具SmartChat的开发工作了,在此基础上,可以增加前端的开发,以及更多功能的实现,实际上开发步骤都是类似的,只是针对不同的功能具体的业务逻辑不同罢了,感兴趣的同学可以尝试着扩展一下SmartChat的功能。
源码请看文章的最顶部。感谢阅读。