从sftp下载大文件到浏览器
- 问题
- 方案
- 相关依赖包
- 相关代码片段(后端)
- 文件信息缓存工具类-FileChunkCache
- 文件信息对象-FileDetail
- sftp传输进度监控-FileProgressMonitor
- 切片工具类-ChunkService
- 文件下载服务-AsyncDownloadService
问题
近期遇到直接使用sftp下载文件到前端,前端下载一部分后就会卡住,分析可能是response缓存原因,是故采取切片分式下载到前端,即分多个请求去下载大文件,最终在前端对多个切片进行合并大文件。
方案
查询资料,没有了解到jsch sftp有相关切片下载知识,所以考虑先将文件下载到后端(使用线程异步下载),再下载到前端,具体设计思路如下:
相关依赖包
<dependency><groupId>com.jcraft</groupId><artifactId>jsch</artifactId><version>0.1.55</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.5</version></dependency>
相关代码片段(后端)
文件信息缓存工具类-FileChunkCache
用于缓存下载文件的信息
public class FileChunkCache {private FileChunkCache() {}/*** 缓存下载文件对象信息(3小时)*/private static final TimedCache<String, FileDetail> TIMED_CACHE;static {// 缓存下载文件对象信息(3小时)TIMED_CACHE = new TimedCache<>(1000 * 60 * 60 * 3L);}public static FileDetail get(String fileKey) {return TIMED_CACHE.get(fileKey, false);}public static void put(String fileKey, FileDetail fileDetail) {TIMED_CACHE.put(fileKey, fileDetail);}public static void remove(String fileKey) {TIMED_CACHE.remove(fileKey);}public static void clear() {TIMED_CACHE.clear();}public static boolean containsKey(String fileKey) {return TIMED_CACHE.containsKey(fileKey);}public static int size() {return TIMED_CACHE.size();}
}
文件信息对象-FileDetail
记录文件下载状态、分片信息、连接通道
public class FileDetail {/*** 文件名*/private String fileName;/*** 文件路径(本地保存路径)*/private String filePath;/*** 文件大小*/private long fileSize;/*** 分片大小*/private long chunkSize;/*** 分片数量*/private long chunkNum;/*** 文件对象key标识*/private String fileKey;/*** 连接session*/@JsonIgnoreprivate Session session;/*** sftp通道*/@JsonIgnoreprivate ChannelSftp channelSftp;/*** 下载状态(枚举类,自行定义)*/private DownloadStatus downloadStatus = DownloadStatus.NOT_DOWNLOADED;public FileDetail() {}public void connectSession() throws JSchException {this.session.connect();}public void connectChannelSftp() throws JSchException {this.channelSftp.connect();}public void closeConnect() {if (this.session != null) {this.session.disconnect();}if (this.channelSftp != null) {this.channelSftp.disconnect();}}public void setDownloadStatus(DownloadStatus downloadStatus) {this.downloadStatus = downloadStatus;FileChunkCache.put(this.fileKey, this);}
}
sftp传输进度监控-FileProgressMonitor
实时sftp监控下载进度
@Slf4j
public class FileProgressMonitor implements SftpProgressMonitor {/*** 默认间隔时间为2秒*/private static final long PROGRESS_INTERVAL = 2000;/*** 记录传输是否结束*/private boolean isEnd = false;/*** 记录已传输的数据总大小*/private long transfer = 0;/*** 记录文件总大小*/private final long fileSize;/*** 记录文件路径*/private final String filePath;/*** 记录文件信息*/private final FileDetail fileDetail;/*** 定时器对象*/private ScheduledExecutorService scheduledExecutorService;/*** 记录是否已启动记时器*/private boolean isScheduled = false;private Date startTime;private Date endTime;/*** 构造方法中初始化文件大小*/public FileProgressMonitor(long fileSize, String filePath, FileDetail fileDetail) {this.fileSize = fileSize;this.filePath = filePath;this.fileDetail = fileDetail;}/*** 输出当前传输进度信息*/public void outCurrentDetails() {// 判断传输是否已结束if (!isEnd()) {log.info("文件:{} 传输中...", filePath);long transmissionSize = getTransfer();if (transmissionSize != fileSize) {// 判断当前已传输数据大小是否等于文件总大小log.info("当前传输:{} bytes", transmissionSize);sendProgressMessage(transmissionSize);} else {log.info("文件已传输完成。");// 如果当前已传输数据大小等于文件总大小,说明已完成,设置endsetEnd(true);}} else {log.info("文件:【{}】传输完成。关闭进度监视器", filePath);stop();}}/*** 启动监视器*/public void start() {log.info("尝试启动进度监视器。");if (scheduledExecutorService == null) {scheduledExecutorService = new ScheduledThreadPoolExecutor(1,new BasicThreadFactory.Builder().namingPattern("sftp-schedule-pool-%d").daemon(true).build());}scheduledExecutorService.scheduleAtFixedRate(this::outCurrentDetails, 1000, PROGRESS_INTERVAL, TimeUnit.MILLISECONDS);isScheduled = true;log.info("进度监视器启动。");}/*** 关闭监视器*/public void stop() {log.info("尝试停止进度监视器。");if (scheduledExecutorService != null) {scheduledExecutorService.shutdownNow();scheduledExecutorService = null;isScheduled = false;}log.info("进度监视器停止了。");}/*** 输出进度条信息** @param transmissionSize 当前已传输数据大小*/private void sendProgressMessage(long transmissionSize) {if (fileSize != 0) {double d = ((double) transmissionSize * 100) / (double) fileSize;DecimalFormat df = new DecimalFormat("#.##");log.info("文件【{}】已传输进度:{}", filePath, df.format(d) + "%");} else {log.info("进度消息,文件:{},文件总大小:{},当前传输大小:{}", filePath, fileSize, transmissionSize);}}/*** 记录已传输数据大小** @param count 当次传输数据大小*/private synchronized void add(long count) {transfer = transfer + count;}/*** 获取当前已传输数据大小** @return 当前已传输数据大小*/private synchronized long getTransfer() {return transfer;}/*** 设置传输是否结束** @param isEnd 是否结束*/private synchronized void setEnd(boolean isEnd) {this.isEnd = isEnd;}/*** 判断传输是否结束** @return 是否结束*/private synchronized boolean isEnd() {return isEnd;}@Overridepublic void init(int op, String src, String dest, long max) {log.info("开始传输文件:{},文件大小:{}", filePath, max);this.startTime = new Date();this.fileDetail.setDownloadStatus(DownloadStatus.BE_DOWNLOADING);}/*** 实现SftpProgressMonitor接口的count方法*/@Overridepublic boolean count(long count) {if (isEnd()) {return false;}if (!isScheduled) {start();}add(count);return true;}/*** 实现了SftpProgressMonitor接口的end方法*/@Overridepublic void end() {this.endTime = new Date();setEnd(true);stop();// 计算耗时long time = endTime.getTime() - startTime.getTime();this.fileDetail.setDownloadStatus(DownloadStatus.DOWNLOAD_SUCCESS);log.info("文件:{},传输结束,耗时:{}ms", filePath, time);}
}
切片工具类-ChunkService
对本地文件做切片处理
@Service
@Slf4j
public class ChunkService {/*** 获取分片数据(从正在下载的文件中返回)** @param fileKey 文件key* @param chunkSize 分片大小* @param resultFileName 文件名* @param offset 分片偏移量* @return 分片数据*/public byte[] getChunkOnDownloadFile(String fileKey, Integer chunkSize, String resultFileName, long offset, HttpServletResponse response) {File file = new File(resultFileName);// 重试次数int retryCount = 0;// 当前文件大小long currentFileSize = file.length();// 当前文件大小是否达到分片大小while (offset + chunkSize > currentFileSize) {// 重试次数大于100次则退出if (retryCount > 120) {throw new RuntimeException("重试达到最大值");}FileDetail fileDetail = FileChunkCache.get(fileKey);if (DownloadStatus.DOWNLOAD_FAILED.equals(fileDetail.getDownloadStatus())) {throw new RuntimeException("当前文件sftp下载失败");}// 休眠1秒后再次获取try {log.info("文件大小未达到分片大小,休眠1秒后再次获取,分片大小:{},偏移量:{},文件大小:{},当前文件下载状态:{}", chunkSize, offset, currentFileSize, fileDetail.getDownloadStatus().getLabel());Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException("线程休眠异常", e);}retryCount++;// 重新获取文件大小file = new File(resultFileName);currentFileSize = file.length();}return getChunk(chunkSize, resultFileName, offset, response);}/*** 获取分片数据** @param chunkSize 分片大小* @param resultFileName 文件名* @param offset 分片偏移量* @return 分片数据*/public byte[] getChunk(Integer chunkSize, String resultFileName, long offset, HttpServletResponse response) {try (RandomAccessFile randomAccessFile = new RandomAccessFile(resultFileName, "r")) {// 定位到该分片的偏移量randomAccessFile.seek(offset);//读取byte[] buffer = new byte[chunkSize];randomAccessFile.read(buffer);return buffer;} catch (IOException e) {throw new RuntimeException("读取文件分片数据失败", e);}}}
文件下载服务-AsyncDownloadService
@Slf4j
@Service
public class AsyncDownloadService {// 本地下载目录private String localFilePath = "/data/test/";// 文件分片大小private long fileChunkSize=1024*1024*100L;@Autowiredprivate ChunkService chunkService;// 线程池private static final ThreadFactory FACTORY = new ThreadFactoryBuilder().setNameFormat("sftp-pool-%d").build();/*** 任务执行线程池*/private static final ExecutorService POOL = new ThreadPoolExecutor(1, 10, 1L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1),FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());public String createFileKey() {// 创建唯一标识随机字符串String s = RandomUtil.randomString(6);// 当前时间String dateTime = DateUtil.format(new Date(), "yyyyMMddHHmmssSSS");// 文件唯一标识(时间戳+随机字符串 保证唯一性)return dateTime + s;}/*** 异步从sftp下载文件** @param fileName 文件名* @param remoteDir 远程文件路径* @param sftpHost sftp主机* @param sftpUser sftp用户名* @param sftpPassword sftp密码* @param port sftp端口* @return 文件对象*/public FileDetail downloadInSftp(String fileName,String remoteDir,String sftpHost,String sftpUser,String sftpPassword,int port) {FileDetail fileDetail = new FileDetail();fileDetail.setFileName(fileName);// 远程文件路径String remoteFile = remoteDir + fileName;// 获取文件名尾缀String suffix = FileUtil.getSuffix(fileName);// 获取文件名String fileNamePrefix = FileUtil.getPrefix(fileName);String fileKey = createFileKey();fileDetail.setFileKey(fileKey);suffix = StringUtils.isBlank(suffix) ? "" : "." + suffix;// 文件名重构String fileNameRebuild = fileNamePrefix + "_" + fileKey + suffix;String localFile = FilePathUtil.normalizePath(localFilePath + DateUtil.format(new Date(), "yyyyMMdd") + File.separator + fileNameRebuild);fileDetail.setFilePath(localFile);// 父目录创建File file = new File(localFile);if (!file.getParentFile().exists()) {FileUtil.mkdir(file.getParentFile());}try {JSch jsch = new JSch();Session session = null;session = jsch.getSession(sftpUser, sftpHost, port);session.setPassword(sftpPassword);session.setConfig("StrictHostKeyChecking", "no");fileDetail.setSession(session);fileDetail.connectSession();ChannelSftp channelSftp = (ChannelSftp) session.openChannel("sftp");fileDetail.setChannelSftp(channelSftp);fileDetail.connectChannelSftp();SftpATTRS attrs = channelSftp.stat(remoteFile);// 文件总大小long fileSize = attrs.getSize();fileDetail.setFileSize(fileSize);// 使用线程池下载文件downloadFile(fileDetail, remoteFile, localFile, fileSize);} catch (Exception e) {log.error("sftp连接失败", e);throw new RuntimeException("sftp连接异常", e);}// 计算分片信息setChunk(fileDetail);FileChunkCache.put(fileKey, fileDetail);return fileDetail;}/*** 线程内部下载文件** @param fileDetail 文件对象* @param remoteFile 远程文件路径* @param localFile 本地文件路径* @param fileSize 文件大小*/public void downloadFile(FileDetail fileDetail, String remoteFile, String localFile, long fileSize) {POOL.execute(() -> {try (// 本地文件输出流FileOutputStream outputStream = new FileOutputStream(localFile)) {// 下载文件到输出流fileDetail.getChannelSftp().get(remoteFile, outputStream, new FileProgressMonitor(fileSize, localFile, fileDetail), ChannelSftp.OVERWRITE, 0);} catch (Exception e) {fileDetail.setDownloadStatus(DownloadStatus.DOWNLOAD_FAILED);log.error("下载文件失败", e);} finally {// 关闭连接fileDetail.closeConnect();}});}/*** 设置分片信息** @param fileDetail 文件对象*/public void setChunk(FileDetail fileDetail) {long fileSize = fileDetail.getFileSize();fileDetail.setChunkSize(fileChunkSize);long fragmentNum = new BigDecimal(fileSize).divide(new BigDecimal(fileDetail.getChunkSize()), 0, RoundingMode.UP).longValue();fileDetail.setChunkNum(fragmentNum);}/*** 分片下载** @param fileKey 文件唯一标识* @param index 分片索引*/public void fragmentDownload(String fileKey, Integer index, HttpServletResponse response) {FileDetail fileDetail = FileChunkCache.get(fileKey);if (fileDetail == null) {throw new RuntimeException("未存在下载文件信息");}long fileSize = fileDetail.getFileSize();// 分片大小long chunkSize = fileDetail.getChunkSize();// 分片偏移量long offset = index * chunkSize;long chunkNum = fileDetail.getChunkNum();// 最后一片分片,重新计算分片大小if (index == chunkNum - 1) {chunkSize = fileSize - offset;}log.info("下载分片数据,文件:{},文件总大小:{},当前分片索引:{},本次分片大小:{}", fileDetail, fileSize, index, chunkSize);byte[] chunk = chunkService.getChunkOnDownloadFile(fileKey, (int) chunkSize, fileDetail.getFilePath(), offset, response);// 设置响应头try {String fileName = URLEncoder.encode(fileDetail.getFileName(), "UTF-8");response.addHeader("Content-Disposition", "attachment;filename=" + fileName);response.addHeader("Content-Length", "" + (chunk.length));response.setContentType("application/octet-stream");// 写出数据ServletOutputStream outputStream = response.getOutputStream();outputStream.write(chunk);outputStream.flush();outputStream.close();} catch (Exception e) {response.setStatus(HttpResponseStatus.REQUEST_EXCEPTION.getCode());throw new RuntimeException("写回分片信息异常", e);}}public void downloadComplete(String fileKey) {FileDetail fileDetail = FileChunkCache.get(fileKey);if (fileDetail == null) {throw new RuntimeException("未存在下载文件信息");}// 删除缓存FileChunkCache.remove(fileKey);// 删除本地文件FileUtil.del(fileDetail.getFilePath());}
}