本章节会重点讲解NIO的Selector、ByteBuffer和Channel三大组件。NIO即非阻塞IO。
1.1 三大组件
1.1.1 Channel(通道) & Buffer(缓冲区)
channel有一点类似于stream,他就是读写数据的双向通道,可以从channel将数据读入buffer,也可以将buffer的数据写入channel,而之前的stream要么是输入,要么是输出,channel比stream更为底层。
常见的Channel:
- FileChannel :文件
- DatagramChannel :UDP
- SocketChannel :socket
- ServerSocketChannel :socket
Buffer用来缓冲读写数据,常见的Buffer:
- ByteBuffer
- MappedByteBuffer
- DirectByteBuffer
- HeapByteBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
1.1.2 Selector(选择器)
服务器的三种设计模式
-
多线程版设计
缺点:
1)内存占用高
2)线程上下文切换成本高
3)只适合连接数少的场景
-
线程池版设计
缺点:
1)阻塞模式下,线程仅能处理一个socket
连接
2)仅适合短连接场景
-
selector
(选择器)版设计selector
的作用就是配合一个线程来管理多个channel
,获取这些channel
上发生的事件,这些channel
工作在非阻塞模式下,不会让线程吊死在一个channel
上,适合连接数多,流量低的场景。
调用selector
的select()
会阻塞直到channel
发生读写就绪事件,这些事件发生,select
方法就会返回这些事件交给thread
来处理。
1.2 ByteBuffer
1.2.1 基本使用
-
创建测试文件
test.txt
123456789qazxsw
-
FileChannel
测试//获取FileChannel 1.输入输出流;2.RandomAccessFile JDK提供的文本编辑工具 try (FileChannel channel = new FileInputStream("F:\\test.txt").getChannel()) {//建立缓冲区ByteBuffer buffer = ByteBuffer.allocate(1024);int len = 0;while (true){//从 channel 读取数据,即向 buffer 写入len = channel.read(buffer);if(len == -1){break;}//打印 buffer 内容buffer.flip(); //切换到读模式while (buffer.hasRemaining()){byte b = buffer.get();logger.info("获取的内容:{}",(char)b);}buffer.clear(); //切换到写模式} } catch (IOException e) {e.printStackTrace(); }
1.2.2 内部结构
ByteBuffer
有以下重要属性
capacity
position
limit
开始
写模式下,position
是写入位置,limit
等于容量,下图表示写入字节后的状态
flip
动作发生后,position
切换为读取位置,limit
切换为读取限制
读取字节后,此时的状态为:
clear
动作发生后,状态切换为写状态
compact
动作是将未读完的数据向前压缩,然后切换至写状态
1.2.3 常见的方法
-
分配空间
ByteBuffer.allocate(1024); //使用java的堆内存 ByteBuffer.allocateDirect(1024); //使用直接内存
-
写入数据
//调用channel的read方法 channel.read(buffer); //调用bytebuffer的put方法 buffer.put((byte) 0x61); //存入单独的字节 buffer.put(new byte[]{0x62,0x63,0x64}); //存入字节数组 ... ...
-
读取数据
//调用channel的write方法 channel.write(buffer); //调用ByteBuffer的get方法 System.out.println(buffer.get()); //获取当前指定字节 System.out.println(buffer.get(2)); //获取指定index字节 ... ... //使用rewind方法可以使positios重新定位到0,即达到重复读取数据的效果 buffer.rewind(); //重新读取数据
-
由写模式切换到读模式
buffer.flip(); //切换到读模式
-
由读模式切换到写模式
buffer.clear(); //清空buffer,重新开始填入 buffer.compact(); //将未读取的数据向前压缩,在其后填入
-
标记与复位
buffer.mark(); //标记当前position buffer.reset(); //复位到上次标记的position
-
字符串与
ByteBuffer
的相互转换//字符串转ByteBuffer ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("你好"); ByteBuffer buffer2 = Charset.forName("UTF-8").encode("你好"); ByteBuffer buffer3 = ByteBuffer.allocate(1024); buffer3.put("你好".getBytes()); ByteBuffer buffer4 = ByteBuffer.wrap("你好".getBytes()); //ByteBuffer转字符串 CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer1); System.out.println(charBuffer.toString());
1.2.4 分散读(Scattering Reads)
-
准备一个文本文件,其内容如下
123456789
-
分散读取到多个ByteBuffer
public static void ScatteringReads(String filePath){try (RandomAccessFile file = new RandomAccessFile(filePath,"rw")){FileChannel channel = file.getChannel();//声明多个ByteBufferByteBuffer buffer1 = ByteBuffer.allocate(10);ByteBuffer buffer2 = ByteBuffer.allocate(10);ByteBuffer buffer3 = ByteBuffer.allocate(10);//读入数据channel.read(new ByteBuffer[]{buffer1,buffer2,buffer3});//切换到写模式buffer1.flip();buffer2.flip();buffer3.flip();//读取第一个字节System.out.println((char) buffer1.get());System.out.println((char) buffer2.get());System.out.println((char) buffer3.get());} catch (Exception e) {e.printStackTrace();} }
1.2.5 集中写(Gathering Writes)
-
集中写入文件
public static void GatheringReads(String filePath){try (FileChannel channel = new RandomAccessFile(filePath, "rw").getChannel()) {ByteBuffer buffer = StandardCharsets.UTF_8.encode("你好");ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("李焕英");channel.write(new ByteBuffer[]{buffer,buffer1});} catch (IOException e) {e.printStackTrace();} }
1.2.6 粘包半包解析
-
原始数据说明
-
原始数据
Hello,word \n
I am fyy \n
He is a apply
-
粘包现象(多条数据拼接到一起)
Hello,word\nI am fyy\nHe is
-
半包现象(完整数据被截断)
Hello,word\nI am fyy\nHe is
a apply
-
-
解析粘包半包
public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(1024);buffer.put("Hello,word\nI am fyy\nHe is".getBytes());split(buffer);buffer.put("a apply".getBytes());split(buffer); }public static void split(ByteBuffer buffer){//切换未读buffer.flip();for (int i = 0; i < buffer.limit(); i++) {if(buffer.get(i) == '\n'){int length = i + 1 - buffer.position();ByteBuffer target = ByteBuffer.allocate(length);//读取for (int j = 0; j < length; j++) {target.put(buffer.get());}}}//切换写buffer.compact(); //不能使用clear }
1.3 文件编程(FileChannel
)
注意:FileChannel
只能工作在阻塞模式下
1.3.1 FileChannel简介
-
获取:
不能直接打开
FileChannel
,必须通过FileInputStream
、FileOutputStream
或者RandomAccessFile
来获取FileChannel
,他们都提供有getChannel()
方法- 通过FileInputStream获取的channel只能读
- 通过FileOutputStream获取的channel只能写
- 通过RandomAccessFile获取的channel依据构造RandomAccessFile时的读写模式决定
- r :只读
- rw :读写
- rwd :读写,写操作时同步的刷新到磁盘,刷新内容
- rws :读写,写操作时同步的刷新到磁盘,刷新内容和元数据
//通过FileInputStream获取channel FileChannel channel1 = new FileInputStream(filePath).getChannel(); //通过FileOutputStream获取channel FileChannel channel2 = new FileOutputStream(filePath).getChannel(); //通过RandomAccessFile获取channel FileChannel channel3 = new RandomAccessFile(filePath, "rw").getChannel();
-
读取:
从channel读取数据填充到ByteBuffer,返回值表示读到的字节,-1表示结束
len = channel.read(buffer);
-
写入:
将一个buffer写入channel
//通过FileOutputStream获取channel FileChannel channel = new FileOutputStream(filePath).getChannel(); buffer.put("你好".getBytes()); //填入数据 buffer.flip(); //切换到读模式 //为了将buffer数据全部放入,使用while while (buffer.hasRemaining()){channel.write(buffer); }
-
关闭:
channel必须关闭
- 通过调用
FileInputStream
、FileOutputStream
或者RandomAccessFile
的close
方法 - 通过try-catch的自动资源管理(ARM)
//调用close方法 FileChannel channel1 = null; try {channel1 = new FileInputStream(filePath).getChannel(); }finally {channel1.close(); } //try-catch的自动资源管理(ARM) try (FileChannel channel = new FileInputStream(filePath).getChannel()) {}
- 通过调用
-
位置:
获取当前位置
long position = channel.position();
设置位置
long newPositios = 100L; channel.position(newPositios);
设置当前位置时,如果设置为文件的末尾
- 读取会返回-1,既没有数据
- 写入会追加内容,如果position超出文件末尾,中间会存在空洞(00)
1.3.2 两个channel数据传输
try (FileChannel readChannel = new FileInputStream(readPath).getChannel();FileChannel writeChannel = new FileOutputStream(writePath).getChannel()) {//效率高,底层使用零拷贝技术,最大支持2G文件传输readChannel.transferTo(0,readChannel.size(),writeChannel);//大于2G的文件可以多次传输long size = readChannel.size();for (long left = size; left > 0; ) {left -= readChannel.transferTo((size -left),left,writeChannel);}
}catch (Exception e){e.printStackTrace();
}
1.3.3 Path
JDK7开始引入了Path和Paths类
-
Path 用来表示文件路径
-
Paths时工具类,用来获取Path实例
Path path = Paths.get("F:\\test.txt");
-
.
:当前路径 -
..
:上一级路径Path path = Paths.get("F:\\test.txt\\..\\test2.txt");
-
1.3.4 Files
-
检查文件是否存在
Path path = Paths.get("F:\\test.txt"); System.out.println(Files.exists(path));
-
创建目录
//创建一级目录,存在:FileAlreadyExistsException;多级:NoSuchFileException Path path1 = Paths.get("F:\\test2"); System.out.println(Files.createDirectory(path1));//创建多级目录 Path path2 = Paths.get("F:\\test2\\eee\\222"); System.out.println(Files.createDirectories(path2));
-
文件拷贝
//文件拷贝,文件存在:FileAlreadyExistsException Path source = Paths.get("F:\\test.txt"); Path target = Paths.get("F:\\test2.txt"); Files.copy(source,target); //覆盖拷贝 Files.copy(source,target, StandardCopyOption.REPLACE_EXISTING);
-
文件移动
//文件移动ATOMIC_MOVE保证文件移动的原子性 Files.move(source,target,StandardCopyOption.ATOMIC_MOVE);
-
删除文件
//文件删除 Files.delete(source); //文件不存在:NoSuchFileException Files.deleteIfExists(source);
-
删除目录
//删除目录:存在内容:DirectoryNotEmptyException Files.delete(target); //使用walkFileTree Files.walkFileTree(Paths.get("F:\\test2"),new SimpleFileVisitor<Path>(){//进入文件前@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {Files.deleteIfExists(file); //删除文件return super.visitFile(file, attrs);}//推出文件夹时@Overridepublic FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {Files.deleteIfExists(dir); //删除文件夹return super.postVisitDirectory(dir, exc);} });
-
文件遍历
AtomicInteger dirCount = new AtomicInteger(); AtomicInteger fileCount = new AtomicInteger(); Files.walkFileTree(Paths.get("F:\\"),new SimpleFileVisitor<Path>(){//进入目录前@Overridepublic FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {dirCount.incrementAndGet();return super.preVisitDirectory(dir, attrs);}//进入文件前@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {fileCount.incrementAndGet();return super.visitFile(file, attrs);} });
-
多级目录拷贝
String sourcePath = "F:\\test"; String targetPath = "F:\\test2"; Files.walk(Paths.get(sourcePath)).forEach(path3 -> {try {String targetName = path3.toString().replace(sourcePath,targetPath);if(Files.isDirectory(path3)){//创建拷贝目录Files.createDirectory(Paths.get(targetName));}if(Files.isRegularFile(path3)){//拷贝文件Files.copy(path3,Paths.get(targetName));}}catch (Exception e){e.printStackTrace();} });
1.4 网络编程
1.4.1 非阻塞 vs 阻塞
-
阻塞
- 在没有数据可读时,包括数据复制过程中,线程必须阻塞等待,不会占用CPU,但线程相当于闲置
- 为了减少线程数,可以采用线程池技术
- 即使存在线程池,如果建立的连接过多,长时间的inactive,会阻塞线程池中的所有线程
演示代码
//服务器: //1.声明全局的ByteBuffer用于处理客户端数据 ByteBuffer buffer = ByteBuffer.allocate(1024); //2.声明服务器通道,等待客户端接入 ServerSocketChannel ssc = ServerSocketChannel.open(); //3.绑定端口 ssc.bind(new InetSocketAddress("127.0.0.1",8080)); //4.声明一个集合,保存连接的客户端 List<SocketChannel> socketChannels = new ArrayList<>(); while (true){//5.阻塞SocketChannel socketChannel = ssc.accept(); //阻塞线程,等待客户端接入socketChannels.add(socketChannel); //保存新接入的客户端for (SocketChannel sc : socketChannels) {sc.read(buffer); //阻塞线程,等待客户端数据buffer.flip();ByteBufferUtil.debugRead(buffer);buffer.clear();} } //客户端: SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress("127.0.0.1",8080)); while (true){sc.write(StandardCharsets.UTF_8.encode("123456"));Thread.sleep(5000); }
-
非阻塞
- 在channel没有可读事件时,线程不必阻塞,它可以去处理其它有可读事件的channel
- 数据的复制过程中,线程实际还是阻塞的
- 写数据时,线程只是等待数据写入channel即刻,无需等channel通过网络发送数据出去
演示代码
//服务器: //1.声明全局的ByteBuffer用于处理客户端数据 ByteBuffer buffer = ByteBuffer.allocate(1024); //2.声明服务器通道,等待客户端接入 ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); //改为非阻塞模式 //3.绑定端口 ssc.bind(new InetSocketAddress("127.0.0.1",8080)); //4.声明一个集合,保存连接的客户端 List<SocketChannel> socketChannels = new ArrayList<>(); while (true){//5.阻塞SocketChannel socketChannel = ssc.accept(); //非阻塞线程,等待客户端接入if(socketChannel != null){socketChannels.add(socketChannel); //保存新接入的客户端socketChannel.configureBlocking(false); //改为非阻塞模式}for (SocketChannel sc : socketChannels) {int read = sc.read(buffer);//非阻塞,等待客户端数据if(read > 0){log.info("客户端:{}",sc);buffer.flip();log.info("客户端信息:{}",StandardCharsets.UTF_8.decode(buffer));buffer.clear();}} } //客户端: SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress("127.0.0.1",8080)); while (true){sc.write(StandardCharsets.UTF_8.encode("123456"));Thread.sleep(5000); }
-
多路复用
线程配合Selector完成对多个channel可读事件的监控,称之为多路复用
- 仅针对网络IO
- Selector保证了有可读事件才去读取
- channel输入的数据一旦准备好,会触发Selector的可读事件
演示代码
//服务器 //声明Selector,管理多个Channel Selector selector = Selector.open(); ByteBuffer buffer = ByteBuffer.allocate(1024); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); //设置为非阻塞模式 //注册到selector,并获得监听的key SelectionKey sscKey = ssc.register(selector, 0, null); sscKey.interestOps(SelectionKey.OP_ACCEPT); //注册感兴趣的事件 ssc.bind(new InetSocketAddress("127.0.0.1",8080)); while (true){//关注事件int select = selector.select(); //无事件时阻塞//处理事件Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()){SelectionKey key = iter.next();//key.cancel(); //取消处理,避免死循环if(key.isAcceptable()){ServerSocketChannel channel = (ServerSocketChannel)key.channel();SocketChannel sc = channel.accept();sc.configureBlocking(false); //设置为非阻塞模式SelectionKey scKey = sc.register(selector, 0, null); //注册到selector,并获得监听的keyscKey.interestOps(SelectionKey.OP_READ); //注册感兴趣的事件}if(key.isReadable()){try {SocketChannel channel = (SocketChannel)key.channel();int read = channel.read(buffer); //正常断开返回-1if(read == -1){key.cancel(); //取消这个key的所有监听事件}else{buffer.flip();log.info("数据内容:{}",StandardCharsets.UTF_8.decode(buffer));buffer.clear();}}catch (Exception e){key.cancel(); //取消这个key的所有监听事件}}iter.remove(); //处理完删除key,否则下次会处理到已处理的key} }
1.4.2 消息边界
一般情况下会存在三种边界问题:
-
消息过长,超出bytebuffer容量
-
半包现象,消息长度大于bytebuffer一半容量但未超出
-
粘包现象,消息长度过于短
解决思路:
- 固定消息长度,服务器于客户端约定长度,都严格遵守,缺点是浪费带宽
- 约定分隔符,按分隔符拆分,缺点是效率低
- TLV格式:即Type(类型)-Length(长度)-Value(数据),这样类型、长度已知,就可以合理的分配buffer,缺点是buffer需要提前分配,如内容过大会影响server的吞吐量
- Http1.1 是TLV格式
- Http1.2 是LTV格式
//拆分分隔符
public static void selectorServer() throws IOException {//声明Selector,管理多个ChannelSelector selector = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false); //设置为非阻塞模式//注册到selector,并获得监听的keySelectionKey sscKey = ssc.register(selector, 0, null);sscKey.interestOps(SelectionKey.OP_ACCEPT); //注册感兴趣的事件ssc.bind(new InetSocketAddress("127.0.0.1",8080));while (true){//关注事件int select = selector.select(); //无事件时阻塞//处理事件Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()){SelectionKey key = iter.next();//key.cancel(); //取消处理,避免死循环if(key.isAcceptable()){ServerSocketChannel channel = (ServerSocketChannel)key.channel();SocketChannel sc = channel.accept();sc.configureBlocking(false); //设置为非阻塞模式ByteBuffer buffer = ByteBuffer.allocate(10); //注册附件SelectionKey scKey = sc.register(selector, 0, buffer); //注册到selector,并获得监听的keyscKey.interestOps(SelectionKey.OP_READ); //注册感兴趣的事件}if(key.isReadable()){try {SocketChannel channel = (SocketChannel)key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment(); //获取key关联的附件int read = channel.read(buffer); //正常断开返回-1if(read == -1){key.cancel(); //取消这个key的所有监听事件}else{split(buffer);if(buffer.position() == buffer.limit()){ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); //扩容//复制拷贝buffer.flip();newBuffer.put(buffer);//替换附件key.attach(newBuffer);}}}catch (Exception e){key.cancel(); //取消这个key的所有监听事件}}iter.remove(); //处理完删除key,否则下次会处理到已处理的key}}
}public static void split(ByteBuffer buffer){//切换未读buffer.flip();for (int i = 0; i < buffer.limit(); i++) {if(buffer.get(i) == '\n'){int length = i - buffer.position();ByteBuffer target = ByteBuffer.allocate(length);//读取for (int j = 0; j < length; j++) {target.put(buffer.get());}buffer.get(); //读取分隔符,丢掉target.flip();System.out.println(StandardCharsets.UTF_8.decode(target));target.clear();}}//切换写buffer.compact(); //不能使用clear
}
1.4.2 处理可写事件
//服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));while (true){selector.select();Iterator<SelectionKey> keys = selector.selectedKeys().iterator();while (keys.hasNext()) {SelectionKey key = keys.next();keys.remove();if(key.isAcceptable()){SocketChannel sc = ssc.accept();sc.configureBlocking(false);SelectionKey scKey = sc.register(selector, 0, null);scKey.interestOps(SelectionKey.OP_READ);//准备数据StringBuilder sb = new StringBuilder();for (int i = 0; i < 3000000; i++) {sb.append("a");}ByteBuffer buffer = StandardCharsets.UTF_8.encode(sb.toString());//写入客户端int writeSize = sc.write(buffer);System.out.println("实际写入:" + writeSize);if (buffer.hasRemaining()) {scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);scKey.attach(buffer);}}if(key.isWritable()){SocketChannel sc = (SocketChannel)key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();int writeSize = sc.write(buffer);System.out.println("实际写入:" + writeSize);//清理附件if(!buffer.hasRemaining()){key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);key.attach(null);}}}
}
//客户端
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("127.0.0.1",8080));while (true){ByteBuffer buffer = ByteBuffer.allocate(1024);sc.read(buffer);buffer.flip();System.out.println(StandardCharsets.UTF_8.decode(buffer));buffer.clear();
}
1.4.3 优化
//基于多线程实现服务器
public static void main(String[] args) throws IOException {//指定主线程名称Thread.currentThread().setName("BOSS"); //负责处理客户端连接事件ServerSocketChannel ssc = ServerSocketChannel.open(); //创建服务器连接ssc.configureBlocking(false); //设置为非阻塞模式Selector boss = Selector.open(); //声明选择器SelectionKey bossKey = ssc.register(boss, 0, null); //注册选择器bossKey.interestOps(SelectionKey.OP_ACCEPT); //关注连接事件ssc.bind(new InetSocketAddress(8080)); //绑定端口//创建多个Worker,并初始化Worker[] workers = new Worker[5];for (int i = 0; i < workers.length; i++) {workers[i] = new Worker("worker-"+i);}AtomicInteger index = new AtomicInteger();while (true){boss.select();Iterator<SelectionKey> keys = boss.selectedKeys().iterator();while (keys.hasNext()) {SelectionKey key = keys.next(); //获取事件的keykeys.remove(); //删除事件,防止重复处理if(key.isAcceptable()){SocketChannel sc = ssc.accept();log.info("创建连接{}",sc.getRemoteAddress());sc.configureBlocking(false); //设置为非阻塞模式//关联workers[index.getAndIncrement() % workers.length].register(sc); //worker初始化}}}
}static class Worker implements Runnable{private String name; //线程名称private Selector selector; //Selectorprivate Thread thread; //工作线程private volatile boolean start = false; //是否已创建线程及Selectorpublic Worker(String name) {this.name = name;}public void register(SocketChannel sc) throws IOException {if (!start) {selector = Selector.open();thread = new Thread(this,name);thread.start(); //启动线程start = true;}selector.wakeup(); //唤醒sc.register(selector,SelectionKey.OP_READ,null);}//执行读写事件@Overridepublic void run() {while (true){try {selector.select();Iterator<SelectionKey> keys = selector.selectedKeys().iterator();while (keys.hasNext()) {SelectionKey key = keys.next();keys.remove();if(key.isReadable()){SocketChannel sc = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);sc.read(buffer);buffer.flip();log.info("内容" + StandardCharsets.UTF_8.decode(buffer));buffer.clear();}}} catch (IOException e) {log.error("处理读写事件异常:{}",e.getMessage());}}}
}
1.4.3 UDP
UDP是无连接的,客户端发送数据不会管服务器是否启动,服务器的receive方法会将接收的数据存入bytebuffer,但是如果数据报文超过buffer大小,多出来的数据会被抛弃。
- 服务器的实现
try (DatagramChannel dc = DatagramChannel.open()) {dc.socket().bind(new InetSocketAddress(8080));dc.configureBlocking(false);Selector selector = Selector.open();SelectionKey dcKey = dc.register(selector, 0, null);dcKey.interestOps(SelectionKey.OP_READ);while (true){selector.select();Iterator<SelectionKey> keys = selector.selectedKeys().iterator();while (keys.hasNext()) {SelectionKey key = keys.next();keys.remove();if(key.isReadable()){ByteBuffer buffer = ByteBuffer.allocate(1024);dc.receive(buffer);buffer.flip();System.out.println(StandardCharsets.UTF_8.decode(buffer));buffer.clear();}}}
} catch (IOException e) {System.out.println("服务器异常:" + e.getMessage());
}
- 客户端实现
while (true){try (DatagramChannel dc = DatagramChannel.open()) {ByteBuffer buffer = StandardCharsets.UTF_8.encode("你好,李焕英");InetSocketAddress address = new InetSocketAddress("127.0.0.1",8080);dc.send(buffer,address);} catch (Exception e) {System.out.println("客户端异常:" + e.getMessage());}Thread.sleep(1000);
}