业务场景: 由于工作需要,需要在两台服务器的java服务之间通过netty建立链接,将大文件(几百G到TB级别)从机器A上的serverA发送到机器B上的serverB。
实现方法设计:
- 系统现有的实现方法:将业务方存储在服务器上的文件,在传输之前,对文件进行分片,以定义的规则将文件分为大小20MB的分片存储在服务器中。同步时以异步的方式同步分片,当然A服务器上的文件同步到B服务器时也是以分片的形式存储,此时需要在B服务器上按照规则将文件进行组装。 【优点:传输时使用异步方式并发同步,加快了同步效率。缺点:文件上传时将文件分片/同步完成后需要将分片进行组装 当文件过大时,这两个步骤需要消耗大量的时间(单线程模式下一个8G左右的文件采用NIO方式拷贝分片大概需要1分16秒)】
- 计划整改系统现有实现方式,省略文件上传时的拷贝分片与文件下载时的组合拷贝,不对文件进行分片,一个文件为一整个实体,利用netty建立长连接,单线程模式进行传输。【优点:省略了上传准备和下载准备的两次拷贝。缺点:传输时采用单线程模式传输,会使文件在传输上的时间消耗变大。】
- 还在设计与思考中。。。。
方案二 的调研DEMO:
- pom.xml 的依赖
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.6.Final</version></dependency>
-
案例项目结构
-
对服务端Service的分析
FileUploadServer.java
public class FileUploadServer {public static void main(String[] args) {int port = 8080; // 服务端的默认端口if (args != null && args.length > 0) {port = Integer.valueOf(args[0]);}new FileUploadServer().bind(port);}public void bind(int port) {EventLoopGroup boosGroup = new NioEventLoopGroup(); //服务端的管理线程EventLoopGroup workerGroup = new NioEventLoopGroup(); //服务端的工作线程//ServerBootstrap负责初始化netty服务器,并且开始监听端口的socket请求ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boosGroup, workerGroup) //绑定管理线程和工作线程.channel(NioServerSocketChannel.class) //ServerSocketChannelFactory 有两种选择,一种是NioServerSocketChannelFactory,一种是OioServerSocketChannelFactory。 .option(ChannelOption.SO_BACKLOG, 124) //BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。.childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(new ObjectEncoder());channel.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(null)));channel.pipeline().addLast(new FileUploadServerHandler()); // 自定义Handler}});try {ChannelFuture future = bootstrap.bind(port).sync();future.channel().closeFuture().sync(); //保证了服务一直启动,相当于一个死循环} catch (InterruptedException e) {e.printStackTrace();} finally {boosGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
FileUploadServerHandler.java 自定义的Handler继承了ChannelInboundHandlerAdapter类
@ChannelHandler.Sharable
public class FileUploadServerHandler extends ChannelInboundHandlerAdapter { //继承ChannelInboundHandlerAdapterprivate int byteRead;private volatile Long start = 0L;private String file_dir = "D:\\new_start\\file\\target";@Override //当前channel从远端读取到数据时执行public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FilePacket) {FilePacket filePacket = (FilePacket) msg;byte[] bytes = filePacket.getBytes();byteRead = filePacket.getEndPos();String md5 = filePacket.getFile_md5();String path = file_dir + File.separator + md5;File file = new File(path);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");randomAccessFile.seek(start);randomAccessFile.write(bytes);start = start + byteRead;if (byteRead > 0) {ctx.writeAndFlush(start);} else {randomAccessFile.close();}}}@Override //ChannelHandler回调方法出现异常时被回调public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
ChannelHandler用于处理Channel对应的事件。ChannelHandler接口中定义了如下三个方法
- void handlerAdded(ChannelHandlerContext ctx) throws Exception;
在当前ChannelHander加入ChannelHandlerContext中时被回调 - void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
在当前ChannelHander从ChannelHandlerContext中移除时被回调 - void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
在当前ChannelHandler回调方法出现异常时被回调
程序开发过程中主要实现ChannelHandler的子接口ChannelInboundHandler和ChannelOutboundHandler。
框架提供了ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter和ChannelDuplexHandler这三个适配类,继承适配器类,并实现你需要的方法。
ChannelInboundHandler中的回调方法于触发时间:(参考博文:https://www.jianshu.com/p/96a50869b527)
- channelRegistered 当前channel注册到EventLoop时触发
- channelUnregistered 当前channel从EventLoop取消注册时触发
- channelActive 当前channel激活的时候的时候触发
- channelInactive 当前channel不活跃的时候,生命周期末时触发
- channelRead 当前channel从远端读取到数据时触发(最常用)
- channelReadComplete 当前channel read消费完读取的数据的时候被触发
- userEventTriggered 用户事件触发的时候
- channelWritabilityChanged channel的写状态变化的时候触发
解析重写的channelRead方法
@Override //当前channel从远端读取到数据时执行public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FilePacket) { //服务端于客户端共同定义好的发送对象FilePacket filePacket = (FilePacket) msg;byte[] bytes = filePacket.getBytes(); //文件的字节数组内容byteRead = filePacket.getEndPos(); //此次接收客户端发送的字节长度String md5 = filePacket.getFile_md5();String path = file_dir + File.separator + md5; //文件存储的位置File file = new File(path);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw"); //使用RandomAccessFile读取文件 rw权限读写randomAccessFile.seek(start); //设置此次写操作,文件的起始偏移量 randomAccessFile.write(bytes); //将接收到的字节写入到文件start = start + byteRead;if (byteRead > 0) { //文件未结束ctx.writeAndFlush(start); //将下次(即 未读) 的文件起始位置返回给客户端} else {randomAccessFile.close(); //接收完毕 关闭文件 (存在问题,可能会存在文件一直未关的情况)}}}
- 对客户端Client的分析
FileUploadClient.java
public class FileUploadClient {public static void main(String[] args) {int port = 8080;if (args != null && args.length > 0) {port = Integer.valueOf(args[0]);}FilePacket filePacket = new FilePacket();File file = new File("D:\\new_start\\file\\origin\\nohup.out");String fileMd5 = file.getName();filePacket.setFile(file);filePacket.setFile_md5(fileMd5);filePacket.setStartPos(0); //要传输的文件的初始信息new FileUploadClient().connect("127.0.0.1", port, filePacket);}public void connect(String host, int port, final FilePacket filePacket) {EventLoopGroup group = new NioEventLoopGroup(); //只需要一个线程组,和服务端有所不同Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(new ObjectEncoder());channel.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)));channel.pipeline().addLast(new FileUploadClientHandler(filePacket)); //自定义的handler}});ChannelFuture future = null;try {future = bootstrap.connect(host, port).sync(); //使得链接保持future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {group.shutdownGracefully();}}}
FileUploadClientHandler.java 用户自定义的客户端Handler 继承了ChannelInboundHandlerAdapter
@ChannelHandler.Sharable
public class FileUploadClientHandler extends ChannelInboundHandlerAdapter {private int byteRead;private volatile Long start = 0l; //使用Long 当传输的文件大于2G时,Integer类型会不够表达文件的长度private volatile int lastLength = 0;public RandomAccessFile randomAccessFile;private FilePacket filePacket;//构造器,FilePacket作为参数public FileUploadClientHandler(FilePacket filePacket) {if (filePacket.getFile().exists()) {if (!filePacket.getFile().isFile()) {System.out.println("Not a file:" + filePacket.getFile());}}this.filePacket = filePacket;}@Override //当前channel激活的时候的时候触发 优先于channelRead方法执行 (我的理解,只执行一次)public void channelActive(ChannelHandlerContext ctx) throws Exception {randomAccessFile = new RandomAccessFile(filePacket.getFile(), "r");randomAccessFile.seek(filePacket.getStartPos());lastLength = Integer.MAX_VALUE / 4 ; //每次发送的文件块数的长度byte[] bytes = new byte[lastLength]; if ((byteRead = randomAccessFile.read(bytes)) != -1) {filePacket.setEndPos(byteRead);filePacket.setBytes(bytes);ctx.writeAndFlush(filePacket);} else {System.out.println("文件已读完");}}@Override //当前channel从远端读取到数据时触发public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof Long) { //客户端发送FilePacket 到服务端,服务端处理完文件当前部分的数据,返回下次文件段的起始位置start = (Long) msg;if (start != -1) {randomAccessFile = new RandomAccessFile(filePacket.getFile(), "r");randomAccessFile.seek(start); //将服务端返回的数据设置此次读操作,文件的起始偏移量System.out.println("文件长度:" + (randomAccessFile.length()));System.out.println("此次分片开始位置:" + start);System.out.println("块儿长度:" + (Integer.MAX_VALUE / 4));System.out.println("剩余长度:" + (randomAccessFile.length()-start));Long a = randomAccessFile.length() - start;int lastLength = (int) (Integer.MAX_VALUE / 4);if (a < lastLength) {System.out.println("最后一片长度:"+a);lastLength = a.intValue();}byte[] bytes = new byte[lastLength];//这个判断关闭判断调整存在漏洞if ((byteRead = randomAccessFile.read(bytes)) != -1 && (randomAccessFile.length()-start) > 0) {System.out.println("byte长度:" + bytes.length);System.out.println("-----------------" + bytes.length);filePacket.setEndPos(byteRead);filePacket.setBytes(bytes);ctx.writeAndFlush(filePacket);} else { randomAccessFile.close(); ctx.close();System.out.println("文件已读完------" + byteRead);}}}}@Override //在当前ChannelHandler回调方法出现异常时被回调public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
- 文件传递模型
FilePacket.java
@Data
public class FilePacket extends Packet implements Serializable {private File file;private String file_md5;private int startPos;private byte[] bytes;private int endPos;public File getFile() {return file;}public void setFile(File file) {this.file = file;}public String getFile_md5() {return file_md5;}public void setFile_md5(String file_md5) {this.file_md5 = file_md5;}public int getStartPos() {return startPos;}public void setStartPos(int startPos) {this.startPos = startPos;}public byte[] getBytes() {return bytes;}public void setBytes(byte[] bytes) {this.bytes = bytes;}public int getEndPos() {return endPos;}public void setEndPos(int endPos) {this.endPos = endPos;}@Overridepublic Byte getCommand() {return FILE;}}
Packet.java
@Data
public abstract class Packet {private Byte type;public abstract Byte getCommand();}
Connand.java
public interface Command {Byte FILE = 1;}
Demo运行效果
如下图为8G文件在两个netty服务之间传输的结果,文件分段传输,需要等待传输完成后再接着传输下一段,时间上其实相当于阻塞,考虑到效率其实不建议采用。
总结:
进期的工作项目定位是一个文件的传输系统,并不会对传输的文件做存储,只是一个传输的桥梁。网络编程要保证文件的可靠性,又要确保传输的效率。此案例实现了在不对文件进行分片存储的传输。项目开发中,应该会实现两套方案。方案一,修改上传规则;方案二,修改同步规则。
曾使用MappedByteBuffer通过多线程的模式对文件进行组装,案例未通过测试。(组装好的文件有损坏,或长度于源文件不同的情况,且测试结果不稳定。)
测试代码如下,希望看见的同胞可对测试代码进行指点。
TestMappedByteBufferDownloadWithThread.java类
public class TestMappedByteBufferDownloadWithThread {public static void main(String[] args) throws IOException {long chunkCount =32646;String chunkBasePath = "D:\\new_start\\file\\523";File dir = new File(chunkBasePath);File[] files = dir.listFiles();for(File f : files){FileThread thread = new FileThread(f , chunkCount); //线程Thread类thread.start();}}
}
FileThread.java 类
public class FileThread extends Thread {private File file ;private Long chunkCount ;public FileThread(File file, Long chunkCount) {this.file = file;this.chunkCount = chunkCount;}@Overridepublic void run() {try{String targetPath = "D:\\new_start\\file\\HTTP协议详细介绍_8.pdf";File downloadFile = new File(targetPath);if(!downloadFile.exists()){downloadFile.createNewFile();}RandomAccessFile randFile = new RandomAccessFile(downloadFile,"rw");FileChannel outChannel = randFile.getChannel();String[] split = file.getName().split("_");long position = Integer.parseInt(split[1]) * chunkCount;FileChannel inChannel = new FileInputStream(file).getChannel();if(file.getName().equals("chunk_100")){System.out.println("file.getName():"+file.getName()+"-------file.length():"+file.length());}MappedByteBuffer mbb = outChannel.map(FileChannel.MapMode.READ_WRITE, position, file.length());if(file.getName().equals("chunk_100")){System.out.println("mbb.getLong():"+mbb.getLong()+"-------file.length():"+file.length());}inChannel.read(mbb);mbb.flip();outChannel.write(mbb);inChannel.close();outChannel.close();}catch (Exception e ){e.printStackTrace();}}}
本案例原型来自GitHub,链接: https://github.com/zhangji-hhu/BigFileTransfer.git 作者:zhangji-hhu 侵删。
参考文章:https://www.jianshu.com/p/96a50869b527 作者:土豆肉丝盖浇饭