Netty学习之路一(大文件传输案例分析)

news/2024/11/18 14:32:18/

业务场景: 由于工作需要,需要在两台服务器的java服务之间通过netty建立链接,将大文件(几百G到TB级别)从机器A上的serverA发送到机器B上的serverB。

实现方法设计:

  1. 系统现有的实现方法:将业务方存储在服务器上的文件,在传输之前,对文件进行分片,以定义的规则将文件分为大小20MB的分片存储在服务器中。同步时以异步的方式同步分片,当然A服务器上的文件同步到B服务器时也是以分片的形式存储,此时需要在B服务器上按照规则将文件进行组装。 【优点:传输时使用异步方式并发同步,加快了同步效率。缺点:文件上传时将文件分片/同步完成后需要将分片进行组装 当文件过大时,这两个步骤需要消耗大量的时间(单线程模式下一个8G左右的文件采用NIO方式拷贝分片大概需要1分16秒)】
  2. 计划整改系统现有实现方式,省略文件上传时的拷贝分片与文件下载时的组合拷贝,不对文件进行分片,一个文件为一整个实体,利用netty建立长连接,单线程模式进行传输。【优点:省略了上传准备和下载准备的两次拷贝。缺点:传输时采用单线程模式传输,会使文件在传输上的时间消耗变大。】
  3. 还在设计与思考中。。。。

方案二 的调研DEMO:

  1. pom.xml 的依赖
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.6.Final</version></dependency>
  1. 案例项目结构
    案例项目类结构

  2. 对服务端Service的分析

FileUploadServer.java

public class FileUploadServer {public static void main(String[] args) {int port = 8080; // 服务端的默认端口if (args != null && args.length > 0) {port = Integer.valueOf(args[0]);}new FileUploadServer().bind(port);}public void bind(int port) {EventLoopGroup boosGroup = new NioEventLoopGroup(); //服务端的管理线程EventLoopGroup workerGroup = new NioEventLoopGroup(); //服务端的工作线程//ServerBootstrap负责初始化netty服务器,并且开始监听端口的socket请求ServerBootstrap bootstrap = new ServerBootstrap();  bootstrap.group(boosGroup, workerGroup)   //绑定管理线程和工作线程.channel(NioServerSocketChannel.class)   //ServerSocketChannelFactory 有两种选择,一种是NioServerSocketChannelFactory,一种是OioServerSocketChannelFactory。 .option(ChannelOption.SO_BACKLOG, 124)  //BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。.childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(new ObjectEncoder());channel.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(null)));channel.pipeline().addLast(new FileUploadServerHandler()); // 自定义Handler}});try {ChannelFuture future = bootstrap.bind(port).sync();future.channel().closeFuture().sync(); //保证了服务一直启动,相当于一个死循环} catch (InterruptedException e) {e.printStackTrace();} finally {boosGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

FileUploadServerHandler.java 自定义的Handler继承了ChannelInboundHandlerAdapter类

@ChannelHandler.Sharable
public class FileUploadServerHandler extends ChannelInboundHandlerAdapter { //继承ChannelInboundHandlerAdapterprivate int byteRead;private volatile Long start = 0L;private String file_dir = "D:\\new_start\\file\\target";@Override   //当前channel从远端读取到数据时执行public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FilePacket) {FilePacket filePacket = (FilePacket) msg;byte[] bytes = filePacket.getBytes();byteRead = filePacket.getEndPos();String md5 = filePacket.getFile_md5();String path = file_dir + File.separator + md5;File file = new File(path);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");randomAccessFile.seek(start);randomAccessFile.write(bytes);start = start + byteRead;if (byteRead > 0) {ctx.writeAndFlush(start);} else {randomAccessFile.close();}}}@Override  //ChannelHandler回调方法出现异常时被回调public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}

ChannelHandler用于处理Channel对应的事件。ChannelHandler接口中定义了如下三个方法

  • void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    在当前ChannelHander加入ChannelHandlerContext中时被回调
  • void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
    在当前ChannelHander从ChannelHandlerContext中移除时被回调
  • void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
    在当前ChannelHandler回调方法出现异常时被回调

程序开发过程中主要实现ChannelHandler的子接口ChannelInboundHandler和ChannelOutboundHandler。
框架提供了ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter和ChannelDuplexHandler这三个适配类,继承适配器类,并实现你需要的方法。

ChannelInboundHandler中的回调方法于触发时间:(参考博文:https://www.jianshu.com/p/96a50869b527)

  • channelRegistered 当前channel注册到EventLoop时触发
  • channelUnregistered 当前channel从EventLoop取消注册时触发
  • channelActive 当前channel激活的时候的时候触发
  • channelInactive 当前channel不活跃的时候,生命周期末时触发
  • channelRead 当前channel从远端读取到数据时触发(最常用)
  • channelReadComplete 当前channel read消费完读取的数据的时候被触发
  • userEventTriggered 用户事件触发的时候
  • channelWritabilityChanged channel的写状态变化的时候触发

解析重写的channelRead方法

  @Override   //当前channel从远端读取到数据时执行public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FilePacket) {   //服务端于客户端共同定义好的发送对象FilePacket filePacket = (FilePacket) msg;byte[] bytes = filePacket.getBytes();   //文件的字节数组内容byteRead = filePacket.getEndPos();   //此次接收客户端发送的字节长度String md5 = filePacket.getFile_md5();String path = file_dir + File.separator + md5;  //文件存储的位置File file = new File(path);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");  //使用RandomAccessFile读取文件  rw权限读写randomAccessFile.seek(start);  //设置此次写操作,文件的起始偏移量 randomAccessFile.write(bytes);  //将接收到的字节写入到文件start = start + byteRead;if (byteRead > 0) { //文件未结束ctx.writeAndFlush(start);  //将下次(即 未读) 的文件起始位置返回给客户端} else {randomAccessFile.close();   //接收完毕 关闭文件  (存在问题,可能会存在文件一直未关的情况)}}}
  1. 对客户端Client的分析

FileUploadClient.java

public class FileUploadClient {public static void main(String[] args) {int port = 8080;if (args != null && args.length > 0) {port = Integer.valueOf(args[0]);}FilePacket filePacket = new FilePacket();File file = new File("D:\\new_start\\file\\origin\\nohup.out");String fileMd5 = file.getName();filePacket.setFile(file);filePacket.setFile_md5(fileMd5);filePacket.setStartPos(0);     //要传输的文件的初始信息new FileUploadClient().connect("127.0.0.1", port, filePacket);}public void connect(String host, int port, final FilePacket filePacket) {EventLoopGroup group = new NioEventLoopGroup();  //只需要一个线程组,和服务端有所不同Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(new ObjectEncoder());channel.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)));channel.pipeline().addLast(new FileUploadClientHandler(filePacket));  //自定义的handler}});ChannelFuture future = null;try {future = bootstrap.connect(host, port).sync();   //使得链接保持future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {group.shutdownGracefully();}}}

FileUploadClientHandler.java 用户自定义的客户端Handler 继承了ChannelInboundHandlerAdapter

@ChannelHandler.Sharable
public class FileUploadClientHandler extends ChannelInboundHandlerAdapter {private int byteRead;private volatile Long start = 0l;   //使用Long 当传输的文件大于2G时,Integer类型会不够表达文件的长度private volatile int lastLength = 0;public RandomAccessFile randomAccessFile;private FilePacket filePacket;//构造器,FilePacket作为参数public FileUploadClientHandler(FilePacket filePacket) {if (filePacket.getFile().exists()) {if (!filePacket.getFile().isFile()) {System.out.println("Not a file:" + filePacket.getFile());}}this.filePacket = filePacket;}@Override    //当前channel激活的时候的时候触发  优先于channelRead方法执行  (我的理解,只执行一次)public void channelActive(ChannelHandlerContext ctx) throws Exception {randomAccessFile = new RandomAccessFile(filePacket.getFile(), "r");randomAccessFile.seek(filePacket.getStartPos());lastLength = Integer.MAX_VALUE / 4 ;   //每次发送的文件块数的长度byte[] bytes = new byte[lastLength];    if ((byteRead = randomAccessFile.read(bytes)) != -1) {filePacket.setEndPos(byteRead);filePacket.setBytes(bytes);ctx.writeAndFlush(filePacket);} else {System.out.println("文件已读完");}}@Override  //当前channel从远端读取到数据时触发public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof Long) {    //客户端发送FilePacket 到服务端,服务端处理完文件当前部分的数据,返回下次文件段的起始位置start = (Long) msg;if (start != -1) {randomAccessFile = new RandomAccessFile(filePacket.getFile(), "r");randomAccessFile.seek(start);  //将服务端返回的数据设置此次读操作,文件的起始偏移量System.out.println("文件长度:" + (randomAccessFile.length()));System.out.println("此次分片开始位置:" + start);System.out.println("块儿长度:" + (Integer.MAX_VALUE / 4));System.out.println("剩余长度:" + (randomAccessFile.length()-start));Long a = randomAccessFile.length() - start;int lastLength = (int) (Integer.MAX_VALUE / 4);if (a < lastLength) {System.out.println("最后一片长度:"+a);lastLength = a.intValue();}byte[] bytes = new byte[lastLength];//这个判断关闭判断调整存在漏洞if ((byteRead = randomAccessFile.read(bytes)) != -1 && (randomAccessFile.length()-start) > 0) {System.out.println("byte长度:" + bytes.length);System.out.println("-----------------" + bytes.length);filePacket.setEndPos(byteRead);filePacket.setBytes(bytes);ctx.writeAndFlush(filePacket);} else {  randomAccessFile.close();   ctx.close();System.out.println("文件已读完------" + byteRead);}}}}@Override  //在当前ChannelHandler回调方法出现异常时被回调public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
  1. 文件传递模型
    FilePacket.java
@Data
public class FilePacket extends Packet implements Serializable {private File file;private String file_md5;private int startPos;private byte[] bytes;private int endPos;public File getFile() {return file;}public void setFile(File file) {this.file = file;}public String getFile_md5() {return file_md5;}public void setFile_md5(String file_md5) {this.file_md5 = file_md5;}public int getStartPos() {return startPos;}public void setStartPos(int startPos) {this.startPos = startPos;}public byte[] getBytes() {return bytes;}public void setBytes(byte[] bytes) {this.bytes = bytes;}public int getEndPos() {return endPos;}public void setEndPos(int endPos) {this.endPos = endPos;}@Overridepublic Byte getCommand() {return FILE;}}

Packet.java

@Data
public abstract class Packet {private Byte type;public abstract Byte getCommand();}

Connand.java

public interface Command {Byte FILE = 1;}

Demo运行效果
如下图为8G文件在两个netty服务之间传输的结果,文件分段传输,需要等待传输完成后再接着传输下一段,时间上其实相当于阻塞,考虑到效率其实不建议采用。
运行结果

总结:
进期的工作项目定位是一个文件的传输系统,并不会对传输的文件做存储,只是一个传输的桥梁。网络编程要保证文件的可靠性,又要确保传输的效率。此案例实现了在不对文件进行分片存储的传输。项目开发中,应该会实现两套方案。方案一,修改上传规则;方案二,修改同步规则。
曾使用MappedByteBuffer通过多线程的模式对文件进行组装,案例未通过测试。(组装好的文件有损坏,或长度于源文件不同的情况,且测试结果不稳定。)
测试代码如下,希望看见的同胞可对测试代码进行指点。
TestMappedByteBufferDownloadWithThread.java类

public class TestMappedByteBufferDownloadWithThread {public static void main(String[] args) throws IOException {long chunkCount =32646;String chunkBasePath = "D:\\new_start\\file\\523";File dir = new File(chunkBasePath);File[] files = dir.listFiles();for(File f : files){FileThread thread = new FileThread(f , chunkCount);  //线程Thread类thread.start();}}
}

FileThread.java 类

public class FileThread extends Thread {private File file ;private Long chunkCount ;public FileThread(File file, Long chunkCount) {this.file = file;this.chunkCount = chunkCount;}@Overridepublic void run() {try{String targetPath = "D:\\new_start\\file\\HTTP协议详细介绍_8.pdf";File downloadFile = new File(targetPath);if(!downloadFile.exists()){downloadFile.createNewFile();}RandomAccessFile randFile = new RandomAccessFile(downloadFile,"rw");FileChannel outChannel = randFile.getChannel();String[] split = file.getName().split("_");long position = Integer.parseInt(split[1]) * chunkCount;FileChannel inChannel = new FileInputStream(file).getChannel();if(file.getName().equals("chunk_100")){System.out.println("file.getName():"+file.getName()+"-------file.length():"+file.length());}MappedByteBuffer mbb = outChannel.map(FileChannel.MapMode.READ_WRITE, position, file.length());if(file.getName().equals("chunk_100")){System.out.println("mbb.getLong():"+mbb.getLong()+"-------file.length():"+file.length());}inChannel.read(mbb);mbb.flip();outChannel.write(mbb);inChannel.close();outChannel.close();}catch (Exception e ){e.printStackTrace();}}}

本案例原型来自GitHub,链接: https://github.com/zhangji-hhu/BigFileTransfer.git 作者:zhangji-hhu 侵删。
参考文章:https://www.jianshu.com/p/96a50869b527 作者:土豆肉丝盖浇饭


http://www.ppmy.cn/news/546766.html

相关文章

大文件传输(gofastdfs)

简介&#xff1a; 1、使用的是go-fast&#xff0c;支持大文件传输。 2、参考的资料&#xff1a;https://gitee.com/sjqzhang/go-fastdfs/blob/master/README-en.md 3、下载fileserver.exe 4、找到“断点续传示例”,点击“更多客户端请参考”&#xff0c;下载“tus-java-client”…

如何进行大文件传输?

本文首发微信公众号&#xff1a;码上观世界 网络文件传输的应用场景很多&#xff0c;如网络聊天的点对点传输、文件同步网盘的上传与下载、文件上传到分布式文件存储器等&#xff0c;其传输速度主要受限于网络带宽、存储器大小、CPU处理速度以及磁盘读写速度&#xff0c;尤其是…

完美解码终于出新版本了

新版本叫做完美者解码。。名字好山寨。一步到位的播放软件。

固定完美解码播放器尺寸

右键“播放 ”“播放设置”“播放窗口尺寸 自己调或者选”勾选“仅在播放视频时调整”

【经验分享】为什么视频画面解码失败之后显示的是绿幕?

项目场景&#xff1a; 项目场景&#xff1a;我们在处理视频数据解码时&#xff0c;经常会遇到解码失败出现绿幕的情况&#xff0c;这个时候一般我们会去检查解码端的程序代码。 问题描述 在出现问题的时候&#xff0c;是否有考虑过以下的问题&#xff1a; “为什么是绿幕&a…

php让视频自动全屏播放,完美解码怎么设置打开视频文件就全屏

想不想在电脑上实现打开视频文件就直接全屏播放&#xff0c;一般播放器做不到吧&#xff0c;完美解码这个万能的视频播放器就能实现&#xff0c;下面就教你如何设置。 软件名称&#xff1a;完美解码(PureCodec) 全能型影音解码包20200922软件大小&#xff1a;104MB更新时间&…

关于kgm文件的解码

寻找可以完成 kgm to mp3的解码源代码文件 这里的 kgm 是酷狗的音乐格式&#xff1f;现在酷狗没有开放解密算法 kgm 网传这其实就是mp3&#xff0c;不过是被酷狗加密过的&#xff0c;加密算法未知。 网传可以调用酷狗自己的库进行转换 https://download.csdn.net/download/we…