[学成在线]06-视频分片上传

news/2025/3/25 20:54:56/

上传视频

需求分析

  1. 教学机构人员进入媒资管理列表查询自己上传的媒资文件。

点击“媒资管理”

进入媒资管理列表页面查询本机构上传的媒资文件。

  1. 教育机构用户在"媒资管理"页面中点击 "上传视频" 按钮。

点击“上传视频”打开上传页面

  1. 选择要上传的文件,自动执行文件上传。

  1. 视频上传成功会自动处理,处理完成可以预览视频。

断点续传

概念介绍

需求背景

通常视频文件都比较大,所以对于媒资系统上传文件的需求要满足大文件的上传要求。http协议本身对上传文件大小没有限制,但是客户的网络环境质量、电脑硬件环境等参差不齐,如果一个大文件快上传完了网断了没有上传完成,需要客户重新上传,用户体验非常差,所以对于大文件上传的要求最基本的是断点续传。

什么是断点续传

引用百度百科:断点续传指的是在下载或上传时,将下载或上传任务(一个文件或一个压缩包)人为的划分为几个部分,每一个部分采用一个线程进行上传或下载,如果碰到网络故障,可以从已经上传或下载的部分开始继续上传下载未完成的部分,而没有必要从头开始上传下载,断点续传可以提高节省操作时间,提高用户体验性。

断点续传流程如下图:

流程如下:

1、前端上传前先把文件分成块

2、一块一块的上传,上传中断后重新上传,已上传的分块则不用再上传

3、各分块上传完成最后在服务端合并文件

分块与合并原理

为了更好的理解文件分块上传的原理,下边用java代码测试文件的分块与合并。

文件分块的流程如下:

1、获取源文件长度

2、根据设定的分块文件的大小计算出块数

3、从源文件读数据依次向每一个块文件写数据。

package com.xuecheng.media;
/*** @author Mr.M* @version 1.0* @description 大文件处理测试* @date 2022/9/13 9:21*/public class BigFileTest {//测试文件分块方法@Testpublic void testChunk() throws IOException {File sourceFile = new File("C:\\Users\\Lenovo\\Desktop\\学成在线项目—视频\\day06\\Day6-01.上传视频-什么是断点续传.mp4"); // 原始文件String chunkPath = "D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\chunk\\"; // 分块文件存储目录File chunkFolder = new File(chunkPath);if (!chunkFolder.exists()) {chunkFolder.mkdirs();}//分块大小long chunkSize = 1024 * 1024 * 1;//分块数量long chunkNum = (long) Math.ceil(sourceFile.length() * 1.0 / chunkSize);System.out.println("分块总数:"+chunkNum);//缓冲区大小byte[] b = new byte[1024];//使用RandomAccessFile访问文件(变化流), "r"表示读取流, "rw"表示写入流RandomAccessFile raf_read = new RandomAccessFile(sourceFile, "r");//分块for (int i = 0; i < chunkNum; i++) {//创建分块文件File file = new File(chunkPath + i);if(file.exists()){file.delete();}boolean newFile = file.createNewFile();if (newFile) {//向分块文件中写数据RandomAccessFile raf_write = new RandomAccessFile(file, "rw");int len = -1;while ((len = raf_read.read(b)) != -1) {raf_write.write(b, 0, len);if (file.length() >= chunkSize) {break;}}raf_write.close();System.out.println("完成分块"+i);}}raf_read.close();}
}

执行结果: 目标文件被分片的储存在文件夹中

]文件合并流程:

1、找到要合并的文件并按文件合并的先后进行排序。

2、创建合并文件

3、依次从合并的文件中读取数据向合并文件写入数

package com.xuecheng.media;
/*** @author Mr.M* @version 1.0* @description 大文件处理测试* @date 2022/9/13 9:21*/public class BigFileTest {//测试文件合并方法@Testpublic void testMerge() throws IOException {//块文件目录File chunkFolder = new File("D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\chunk\\");//原始文件File originalFile = new File("C:\\Users\\Lenovo\\Desktop\\学成在线项目—视频\\day06\\Day6-01.上传视频-什么是断点续传.mp4");//合并文件File mergeFile = new File("D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\Day6-01.上传视频-什么是断点续传.mp4");if (mergeFile.exists()) {mergeFile.delete();}//创建新的合并文件mergeFile.createNewFile();//用于写文件RandomAccessFile raf_write = new RandomAccessFile(mergeFile, "rw");//指针指向文件顶端raf_write.seek(0);//缓冲区byte[] b = new byte[1024];//分块列表File[] fileArray = chunkFolder.listFiles();// 转成集合,便于排序List<File> fileList = Arrays.asList(fileArray);// 从小到大排序Collections.sort(fileList, new Comparator<File>() {@Overridepublic int compare(File o1, File o2) {return Integer.parseInt(o1.getName()) - Integer.parseInt(o2.getName());}});//开始合并文件for (File chunkFile : fileList) {RandomAccessFile raf_read = new RandomAccessFile(chunkFile, "rw");int len = -1;while ((len = raf_read.read(b)) != -1) {raf_write.write(b, 0, len);}raf_read.close();}raf_write.close();//校验文件try (FileInputStream fileInputStream = new FileInputStream(originalFile);FileInputStream mergeFileStream = new FileInputStream(mergeFile);) {//取出原始文件的md5String originalMd5 = DigestUtils.md5Hex(fileInputStream);//取出合并文件的md5进行比较String mergeFileMd5 = DigestUtils.md5Hex(mergeFileStream);if (originalMd5.equals(mergeFileMd5)) {System.out.println("合并文件成功");} else {System.out.println("合并文件失败");}}}
}

执行结果: 分片文件被合并为正常文件

视频上传流程

下图是上传视频的整体流程:

1、前端对文件进行分块。

2、前端上传分块文件前请求媒资服务检查文件是否存在,如果已经存在则不再上传。

3、如果分块文件不存在则前端开始上传

4、前端请求媒资服务上传分块。

5、媒资服务将分块上传至MinIO。

6、前端将分块上传完毕请求媒资服务合并分块。

7、媒资服务判断分块上传完成则请求MinIO合并文件。

8、合并完成校验合并后的文件是否完整,如果完整则上传完成,否则删除文件。

minio合并文件测试

1、将分块文件上传至minio, minio限制每个分片文件不小于5M

/*** @description 测试MinIO*/
public class MinioTest {static MinioClient minioClient =MinioClient.builder().endpoint("http://192.168.101.65:9000").credentials("minioadmin", "minioadmin").build();// 将分块文件上传至minio@Testpublic void uploadChunk() {String chunkFolderPath = "D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\chunk\\";File chunkFolder = new File(chunkFolderPath);//分块文件File[] files = chunkFolder.listFiles();//将分块文件上传至miniofor (int i = 0; i < files.length; i++) {try {UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder().bucket("testbucket").object("chunk/" + i).filename(files[i].getAbsolutePath()).build();minioClient.uploadObject(uploadObjectArgs);System.out.println("上传分块成功" + i);} catch (Exception e) {e.printStackTrace();}}}}

2、通过minio的合并文件

/*** @description 测试MinIO*/
public class MinioTest {static MinioClient minioClient =MinioClient.builder().endpoint("http://192.168.101.65:9000").credentials("minioadmin", "minioadmin").build();//合并文件,要求分块文件最小5M@Testpublic void test_merge() throws Exception {// 分块文件集合(传统方式)
//        List<ComposeSource> sources = new ArrayList<>();
//        for (int i = 0; i < 10; i++) {
//            // 构建文件信息
//            ComposeSource composeSource = ComposeSource.builder().bucket("testbucket").object("chunk/".concat(Integer.toString(i))).build();
//            sources.add(composeSource);
//        }// 分块文件集合(steam流)List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(10).map(i -> ComposeSource.builder().bucket("testbucket").object("chunk/".concat(Integer.toString(i))).build()).collect(Collectors.toList());// 指定合并后的文件名// 通过sources指定源文件ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder().bucket("testbucket").object("01.上传视频-什么是断点续传.mp4").sources(sources).build();// 合并文件minioClient.composeObject(composeObjectArgs);}
}

3、分块文件使用后就没用了, 清除分块文件

/*** @description 测试MinIO*/
public class MinioTest {static MinioClient minioClient =MinioClient.builder().endpoint("http://192.168.101.65:9000").credentials("minioadmin", "minioadmin").build();//清除分块文件@Testpublic void test_removeObjects() {//合并分块完成将分块文件清除List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i).limit(10).map(i -> new DeleteObject("chunk/".concat(Integer.toString(i)))).collect(Collectors.toList());//构建参数RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("testbucket").objects(deleteObjects).build();//执行删除Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);results.forEach(r -> {DeleteError deleteError = null;try {deleteError = r.get();} catch (Exception e) {e.printStackTrace();}});}}

接口定义

根据上传视频流程,定义接口,与前端的约定是操作成功返回{code:0}否则返回{code:-1}

从课程资料中拷贝RestResponse.java类到base工程下的model包下。

/*** @author Mr.M* @version 1.0* @description 通用结果类型* @date 2022/9/13 14:44*/@Data
@ToString
public class RestResponse<T> {/*** 响应编码,0为正常,-1错误*/private int code;/*** 响应提示信息*/private String msg;/*** 响应内容*/private T result;public RestResponse() {this(0, "success");}public RestResponse(int code, String msg) {this.code = code;this.msg = msg;}/*** 错误信息的封装** @param msg* @param <T>* @return*/public static <T> RestResponse<T> validfail(String msg) {RestResponse<T> response = new RestResponse<T>();response.setCode(-1);response.setMsg(msg);return response;}public static <T> RestResponse<T> validfail(T result, String msg) {RestResponse<T> response = new RestResponse<T>();response.setCode(-1);response.setResult(result);response.setMsg(msg);return response;}/*** 添加正常响应数据(包含响应内容)** @return RestResponse Rest服务封装相应数据*/public static <T> RestResponse<T> success(T result) {RestResponse<T> response = new RestResponse<T>();response.setResult(result);return response;}public static <T> RestResponse<T> success(T result, String msg) {RestResponse<T> response = new RestResponse<T>();response.setResult(result);response.setMsg(msg);return response;}/*** 添加正常响应数据(不包含响应内容)** @return RestResponse Rest服务封装相应数据*/public static <T> RestResponse<T> success() {return new RestResponse<T>();}public Boolean isSuccessful() {return this.code == 0;}}

定义接口如下:

/*** @author Mr.M* @version 1.0* @description 大文件上传接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {@ApiOperation(value = "文件上传前检查文件")@PostMapping("/upload/checkfile")public RestResponse<Boolean> checkfile(@RequestParam("fileMd5") String fileMd5) throws Exception {return null;}@ApiOperation(value = "分块文件上传前的检测")@PostMapping("/upload/checkchunk")public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {return null;}@ApiOperation(value = "上传分块文件")@PostMapping("/upload/uploadchunk")public RestResponse uploadchunk(@RequestParam("file") MultipartFile file,@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {return null;}@ApiOperation(value = "合并文件")@PostMapping("/upload/mergechunks")public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5,@RequestParam("fileName") String fileName,@RequestParam("chunkTotal") int chunkTotal) throws Exception {return null;}}

service开发

校验方法

首先实现检查文件方法和检查分块方法, 在MediaFileService中定义service接口如下

/*** @author Mr.M* @version 1.0* @description 媒资文件管理业务类* @date 2022/9/10 8:55*/
public interface MediaFileService {/*** @description 检查文件是否存在* @param fileMd5 文件的md5* @return com.xuecheng.base.model.RestResponse<java.lang.Boolean> false不存在,true存在* @author Mr.M* @date 2022/9/13 15:38*/public RestResponse<Boolean> checkFile(String fileMd5);/*** @description 检查分块是否存在* @param fileMd5  文件的md5* @param chunkIndex  分块序号* @return com.xuecheng.base.model.RestResponse<java.lang.Boolean> false不存在,true存在* @author Mr.M* @date 2022/9/13 15:39*/public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex);}

service接口实现方法

package com.xuecheng.media.api;import com.xuecheng.base.model.RestResponse;
import com.xuecheng.media.mapper.MediaFilesMapper;
import com.xuecheng.media.service.MediaFileService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;/*** @author Mr.M* @version 1.0* @description 大文件上传接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {@AutowiredMediaFileService mediaFileService;@ApiOperation(value = "文件上传前检查文件")@PostMapping("/upload/checkfile")public RestResponse<Boolean> checkfile(@RequestParam("fileMd5") String fileMd5) throws Exception {return mediaFileService.checkFile(fileMd5);}@ApiOperation(value = "分块文件上传前的检测")@PostMapping("/upload/checkchunk")public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {RestResponse<Boolean> booleanRestResponse = mediaFileService.checkChunk(fileMd5, chunk);return booleanRestResponse;}@ApiOperation(value = "上传分块文件")@PostMapping("/upload/uploadchunk")public RestResponse uploadchunk(@RequestParam("file") MultipartFile file,@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {return null;}@ApiOperation(value = "合并文件")@PostMapping("/upload/mergechunks")public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5,@RequestParam("fileName") String fileName,@RequestParam("chunkTotal") int chunkTotal) throws Exception {return null;}}

在接口中调用service提供的检查文件方法和检查分块方法

/*** @author Mr.M* @version 1.0* @description 大文件上传接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {@AutowiredMediaFileService mediaFileService;@ApiOperation(value = "文件上传前检查文件")@PostMapping("/upload/checkfile")public RestResponse<Boolean> checkfile(@RequestParam("fileMd5") String fileMd5) throws Exception {return mediaFileService.checkFile(fileMd5);}@ApiOperation(value = "分块文件上传前的检测")@PostMapping("/upload/checkchunk")public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {RestResponse<Boolean> booleanRestResponse = mediaFileService.checkChunk(fileMd5, chunk);return booleanRestResponse;}
}
上传方法

定义service接口

/*** @author Mr.M* @version 1.0* @description 媒资文件管理业务类* @date 2022/9/10 8:55*/
public interface MediaFileService {/*** @description 上传分块* @param fileMd5  文件md5* @param chunk  分块序号* @param localChunkFilePath  分块文件本地路径* @return com.xuecheng.base.model.RestResponse* @author Mr.M* @date 2022/9/13 15:50*/public RestResponse uploadChunk(String fileMd5,int chunk,String localChunkFilePath);}

接口实现:

/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/10 8:58*/
@Service
@Slf4j
public class MediaFileServiceImpl implements MediaFileService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMinioClient minioClient;//普通文件桶@Value("${minio.bucket.files}")private String bucket_mediafiles;//视频文件桶@Value("${minio.bucket.videofiles}")private String bucket_video;/*** @param localFilePath 文件地址* @param bucket        桶* @param objectName    对象名称* @return void* @description 将文件写入minIO* @author Mr.M* @date 2022/10/12 21:22*/public boolean addMediaFilesToMinIO(String localFilePath, String mimeType, String bucket, String objectName) {try {// 构建文件参数UploadObjectArgs testbucket = UploadObjectArgs.builder().bucket(bucket).object(objectName).filename(localFilePath).contentType(mimeType).build();// 执行上传操作minioClient.uploadObject(testbucket);log.debug("上传文件到minio成功,bucket:{},objectName:{}", bucket, objectName);System.out.println("上传成功");return true;} catch (Exception e) {e.printStackTrace();log.error("上传文件到minio出错,bucket:{},objectName:{},错误原因:{}", bucket, objectName, e.getMessage(), e);XueChengPlusException.cast("上传文件到文件系统失败");}return false;}// 根据文件扩展名取出mimeTypeprivate String getMimeType(String extension) {if (extension == null)extension = "";//根据扩展名取出mimeTypeContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension);//通用mimeType,字节流String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE;if (extensionMatch != null) {mimeType = extensionMatch.getMimeType();}return mimeType;}//得到分块文件的目录private String getChunkFileFolderPath(String fileMd5) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";}@Overridepublic RestResponse uploadChunk(String fileMd5, int chunk, String localChunkFilePath) {//得到分块文件的目录路径String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);//得到分块文件的路径String chunkFilePath = chunkFileFolderPath + chunk;//mimeTypeString mimeType = getMimeType(null);//将文件存储至minIOboolean b = addMediaFilesToMinIO(localChunkFilePath, mimeType, bucket_video, chunkFilePath);if (!b) {log.debug("上传分块文件失败:{}", chunkFilePath);return RestResponse.validfail(false, "上传分块失败");}log.debug("上传分块文件成功:{}",chunkFilePath);return RestResponse.success(true);}}

完善接口

/*** @author Mr.M* @version 1.0* @description 大文件上传接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {@AutowiredMediaFileService mediaFileService;@ApiOperation(value = "上传分块文件")@PostMapping("/upload/uploadchunk")public RestResponse uploadchunk(@RequestParam("file") MultipartFile file,@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {// 创建一个临时文件File tempFile = File.createTempFile("minio", "temp");file.transferTo(tempFile);// 获取文件路径String localFilePath = tempFile.getAbsolutePath();RestResponse restResponse = mediaFileService.uploadChunk(fileMd5, chunk, localFilePath);return restResponse;}}

接口测试

  1. 更新前端文件
  • 将uploadtools.ts文件覆盖前端工程src/utils 目录下的同名文件,
  • 把前端切换文件增大到10M

  • 将 media-add-dialog.vue文件覆盖前端工程src\module-organization\pages\media-manage\components目录下的同名文件
  1. 修改后端配置
  • 前端对文件分块的大小为5MB,SpringBoot web默认上传文件的大小限制为1MB,这里需要在media-api工程修改配置如下:
spring:servlet:multipart:max-file-size: 50MBmax-request-size: 50MB

  • max-file-size: 单个文件的大小限制
  • Max-request-size: 单次请求的大小限制
  1. 启动前后端服务, 联调

合并方法

定义service接口

/*** @author Mr.M* @version 1.0* @description 媒资文件管理业务类* @date 2022/9/10 8:55*/
public interface MediaFileService {/*** @description 合并分块* @param companyId  机构id* @param fileMd5  文件md5* @param chunkTotal 分块总和* @param uploadFileParamsDto 文件信息* @return com.xuecheng.base.model.RestResponse* @author Mr.M* @date 2022/9/13 15:56*/public RestResponse mergechunks(Long companyId,String fileMd5,int chunkTotal,UploadFileParamsDto uploadFileParamsDto);
}

接口实现:

/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/10 8:58*/
@Service
@Slf4j
public class MediaFileServiceImpl implements MediaFileService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMinioClient minioClient;//普通文件桶@Value("${minio.bucket.files}")private String bucket_mediafiles;//视频文件桶@Value("${minio.bucket.videofiles}")private String bucket_video;@AutowiredMediaFileService currentProxy;/*** 合并分块** @param companyId           机构id* @param fileMd5             文件md5* @param chunkTotal          分块总和* @param uploadFileParamsDto 文件信息* @return*/@Overridepublic RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) {// 1.找到分块文件, 调用minio的sdk进行文件合并// 分块文件目录String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);// 1.1分块文件集合(steam流)List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(chunkTotal).map(i -> ComposeSource.builder().bucket(bucket_video).object(chunkFileFolderPath.concat(Integer.toString(i))).build()).collect(Collectors.toList());//源文件名称String fileName = uploadFileParamsDto.getFilename();//文件扩展名String extension = fileName.substring(fileName.lastIndexOf("."));//合并后文件的objectNameString objectName = getFilePathByMd5(fileMd5, extension);// 1.2指定合并后的文件信息ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder().bucket(bucket_video).object(objectName) // 合并后的文件objectName.sources(sources)   // 通过sources指定源文件.build();// 1.3合并文件try {minioClient.composeObject(composeObjectArgs);} catch (Exception e) {e.printStackTrace();log.debug("合并文件失败,fileMd5:{},异常:{}", fileMd5, e.getMessage(), e);return RestResponse.validfail(false, "合并文件失败。");}// 2.校验合并后的文件和源文件是否一致// 先下载合并后的文件File file = downloadFileFromMinIO(bucket_video, objectName);try (FileInputStream fileInputStream = new FileInputStream(file)) {//计算合并后文件的md5值String mergeFile_md5 = DigestUtils.md5Hex(fileInputStream);//比较原始文件和合并后文件的MD5值if (!fileMd5.equals(mergeFile_md5)) {log.error("校验合并文件md5值不一致,原始文件:{},合并文件:{}", fileMd5, mergeFile_md5);return RestResponse.validfail(false, "文件合并校验失败");}//保存文件大小uploadFileParamsDto.setFileSize(file.length());} catch (Exception e) {e.printStackTrace();return RestResponse.validfail(false, "文件合并校验失败");}// 3.将文件信息入库MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_video, objectName);if (mediaFiles == null) {return RestResponse.validfail(false, "文件入库失败");}// 4.清理分块文件clearChunkFiles(chunkFileFolderPath, chunkTotal);return RestResponse.success(true);}//得到分块文件的目录private String getChunkFileFolderPath(String fileMd5) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";}/*** 得到合并后的文件的地址** @param fileMd5 文件id即md5值* @param fileExt 文件扩展名* @return*/private String getFilePathByMd5(String fileMd5, String fileExt) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt;}/*** 从minio下载文件** @param bucket     桶* @param objectName 对象名称* @return 下载后的文件*/public File downloadFileFromMinIO(String bucket, String objectName) {//临时文件File minioFile = null;FileOutputStream outputStream = null;try {InputStream stream = minioClient.getObject(GetObjectArgs.builder().bucket(bucket).object(objectName).build());//创建临时文件minioFile = File.createTempFile("minio", ".merge");outputStream = new FileOutputStream(minioFile);IOUtils.copy(stream, outputStream);return minioFile;} catch (Exception e) {e.printStackTrace();} finally {if (outputStream != null) {try {outputStream.close();} catch (IOException e) {e.printStackTrace();}}}return null;}/*** 清除分块文件** @param chunkFileFolderPath 分块文件路径* @param chunkTotal          分块文件总数*/private void clearChunkFiles(String chunkFileFolderPath, int chunkTotal) {try {//待删除分块文件列表List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i).limit(chunkTotal).map(i -> new DeleteObject(chunkFileFolderPath.concat(Integer.toString(i)))).collect(Collectors.toList());// 分块文件信息RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("video").objects(deleteObjects).build();// 清除分块文件Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);// 真正删除分块文件results.forEach(r -> {DeleteError deleteError = null;try {deleteError = r.get();} catch (Exception e) {e.printStackTrace();log.error("清除分块文件失败,objectname:{}", deleteError.objectName(), e);}});} catch (Exception e) {e.printStackTrace();log.error("清除分块文件失败,chunkFileFolderPath:{}", chunkFileFolderPath, e);}}}

controller完善

/*** @author Mr.M* @version 1.0* @description 大文件上传接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {@AutowiredMediaFileService mediaFileService;@ApiOperation(value = "合并文件")@PostMapping("/upload/mergechunks")public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5,@RequestParam("fileName") String fileName,@RequestParam("chunkTotal") int chunkTotal) throws Exception {// todo: 机构idLong companyId = 1232141425L;UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto();uploadFileParamsDto.setFileType("001002");uploadFileParamsDto.setTags("课程视频");uploadFileParamsDto.setRemark("");uploadFileParamsDto.setFilename(fileName);return mediaFileService.mergechunks(companyId, fileMd5, chunkTotal, uploadFileParamsDto);}}

功能测试: 下边进行前后端联调

  1. 上传一个视频测试合并分块的执行逻辑

进入service方法逐行跟踪。

  1. 断点续传测试

上传一部分后,停止刷新浏览器再重新上传,通过浏览器日志发现已经上传过的分块不再重新上传

  1. 文件分片上传后合并分片, 合并完成后删除分片文件

update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=?

多个线程同时执行上边的sql只会有一个线程执行成功。

  1. 什么是乐观锁、悲观锁?
  • synchronized是一种悲观锁,在执行被synchronized包裹的代码时需要首先获取锁,没有拿到锁则无法执行,是总悲观的认为别的线程会去抢,所以要悲观锁。
  • 乐观锁的思想是它不认为会有线程去争抢,尽管去执行,如果没有执行成功就再去重试。

定义mapper

package com.xuecheng.media.mapper;/*** <p>*  Mapper 接口* </p>** @author itcast*/
public interface MediaProcessMapper extends BaseMapper<MediaProcess> {/*** 开启一个任务* @param id 任务id* @return 更新记录数*/@Update("update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=#{id}")int startTask(@Param("id") long id);}

service方法

package com.xuecheng.media.service;/*** @author Mr.M* @version 1.0* @description 媒资文件处理业务方法* @date 2022/9/10 8:55*/
public interface MediaFileProcessService {/***  开启一个任务* @param id 任务id* @return true开启任务成功,false开启任务失败*/public boolean startTask(long id);}
package com.xuecheng.media.service.impl;/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/14 14:41*/
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMediaProcessMapper mediaProcessMapper;//实现如下public boolean startTask(long id) {int result = mediaProcessMapper.startTask(id);return result<=0?false:true;}}

更新任务状态

任务处理完成需要更新任务处理结果,任务执行成功更新视频的URL、及任务处理结果,将待处理任务记录删除,同时向历史任务表添加记录。

在MediaFileProcessService接口添加方法

package com.xuecheng.media.service;/*** @author Mr.M* @version 1.0* @description 媒资文件处理业务方法* @date 2022/9/10 8:55*/
public interface MediaFileProcessService {/*** @description 保存任务结果* @param taskId  任务id* @param status 任务状态* @param fileId  文件id* @param url url* @param errorMsg 错误信息* @return void* @author Mr.M* @date 2022/10/15 11:29*/void saveProcessFinishStatus(Long taskId,String status,String fileId,String url,String errorMsg);}

service接口方法实现如下:

package com.xuecheng.media.service.impl;/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/14 14:41*/
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMediaProcessMapper mediaProcessMapper;@AutowiredMediaProcessHistoryMapper mediaProcessHistoryMapper;@Transactional@Overridepublic void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) {//查出任务,如果不存在则直接返回MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId);if(mediaProcess == null){return ;}//处理失败,更新任务处理结果LambdaQueryWrapper<MediaProcess> queryWrapperById = new LambdaQueryWrapper<MediaProcess>().eq(MediaProcess::getId, taskId);//处理失败if(status.equals("3")){MediaProcess mediaProcess_u = new MediaProcess();mediaProcess_u.setStatus("3");mediaProcess_u.setErrormsg(errorMsg);mediaProcess_u.setFailCount(mediaProcess.getFailCount()+1);mediaProcessMapper.update(mediaProcess_u,queryWrapperById);log.debug("更新任务处理状态为失败,任务信息:{}",mediaProcess_u);return ;}//任务处理成功MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId);if(mediaFiles!=null){//更新媒资文件中的访问urlmediaFiles.setUrl(url);mediaFilesMapper.updateById(mediaFiles);}//处理成功,更新url和状态mediaProcess.setUrl(url);mediaProcess.setStatus("2");mediaProcess.setFinishDate(LocalDateTime.now());mediaProcessMapper.updateById(mediaProcess);//添加到历史记录MediaProcessHistory mediaProcessHistory = new MediaProcessHistory();BeanUtils.copyProperties(mediaProcess, mediaProcessHistory);mediaProcessHistoryMapper.insert(mediaProcessHistory);//删除mediaProcessmediaProcessMapper.deleteById(mediaProcess.getId());}}

视频处理

视频采用并发处理,每个视频使用一个线程去处理,每次处理的视频数量不要超过cpu核心数。

所有视频处理完成结束本次执行,为防止代码异常出现无限期等待则添加超时设置,到达超时时间还没有处理完成仍结束任务。

定义任务类VideoTask 如下:

package com.xuecheng.media.jobhandler;/*** 视频处理任务类** @author Mr.M* @version 1.0* @description TODO* @date 2022/10/15 11:58*/
@Slf4j
@Component
public class VideoTask {@AutowiredMediaFileProcessService mediaFileProcessService;@AutowiredMediaFileService mediaFileService;@Value("${videoprocess.ffmpegpath}")private String ffmpeg_path;@XxlJob("videoJobHandler")public void videoJobHandler() throws Exception {// 分片参数int shardIndex = XxlJobHelper.getShardIndex(); // 执行器序号,从0开始int shardTotal = XxlJobHelper.getShardTotal();  // 执行器总数//取出cpu核心数作为一次处理数据的条数int processors = Runtime.getRuntime().availableProcessors();//查询待处理的任务List<MediaProcess> mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, processors);//实际查到的任务数int size = mediaProcessList.size();log.debug("取到视频处理任务数:" + size);if (size <= 0) {return;}//创建线程池ExecutorService executorService = Executors.newFixedThreadPool(size);// 使用的计数器CountDownLatch countDownLatch = new CountDownLatch(size);mediaProcessList.forEach(mediaProcess -> {// 将任务加入线程池executorService.execute(() -> {try {Long taskId = mediaProcess.getId(); // 任务id// 开启任务boolean b = mediaFileProcessService.startTask(taskId);if (!b) {log.debug("抢占任务失败, 任务id: {}", taskId);return;}// 准备参数String bucket = mediaProcess.getBucket();  //桶String filePath = mediaProcess.getFilePath();  //存储路径String fileId = mediaProcess.getFileId(); //原始视频的md5值//将要处理的文件下载到服务器上File originalFile = mediaFileService.downloadFileFromMinIO(bucket, filePath);if (originalFile == null) {log.debug("下载待处理文件失败,originalFile:{}", mediaProcess.getBucket().concat(mediaProcess.getFilePath()));mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "下载待处理文件失败");return;}//处理结束的视频文件File mp4File = null;try {mp4File = File.createTempFile("mp4", ".mp4");} catch (IOException e) {log.error("创建mp4临时文件失败");mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "创建mp4临时文件失败");return;}//视频处理结果String result = "";try {//开始处理视频Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path, originalFile.getAbsolutePath(), mp4File.getName(), mp4File.getAbsolutePath());//开始视频转换,成功将返回successresult = videoUtil.generateMp4();} catch (Exception e) {e.printStackTrace();log.error("处理视频文件:{},出错:{}", mediaProcess.getFilePath(), e.getMessage());}if (!result.equals("success")) {//记录错误信息log.error("处理视频失败,视频地址:{},错误信息:{}", bucket + filePath, result);mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, result);return;}//将mp4上传至minio//mp4在minio的存储路径String objectName = getFilePath(fileId, ".mp4");//访问urlString url = "/" + bucket + "/" + objectName;try {mediaFileService.addMediaFilesToMinIO(mp4File.getAbsolutePath(), "video/mp4", bucket, objectName);//将url存储至数据,并更新状态为成功,并将待处理视频记录删除存入历史mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "2", fileId, url, null);} catch (Exception e) {log.error("上传视频失败或入库失败,视频地址:{},错误信息:{}", bucket + objectName, e.getMessage());//最终还是失败了mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "处理后视频上传或入库失败");}} finally {// 计数器减1countDownLatch.countDown();}});});//等待,给一个充裕的超时时间,防止无限等待,到达超时时间还没有处理完成则结束任务countDownLatch.await(30, TimeUnit.MINUTES);}private String getFilePath(String fileMd5, String fileExt) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt;}}

测试

基本测试

进入xxl-job调度中心添加执行器和视频处理任务

  1. 添加执行器

  1. 视频处理任务

  • 在xxl-job配置任务调度策略:
    • 1)配置阻塞处理策略为:丢弃后续调度。
    • 2)配置视频处理调度时间间隔不用根据视频处理时间去确定,可以配置的小一些,如:5分钟,即使到达调度时间如果视频没有处理完会丢弃调度请求。

  1. 配置完成开始测试视频处理:
  • 首先上传至少4个视频,非mp4格式。

  • 在xxl-job启动视频处理任务

  • 观察媒资管理服务后台日志

失败测试

1、先停止调度中心的视频处理任务。

2、上传视频,手动修改待处理任务表中file_path字段为一个不存在的文件地址

3、启动任务

观察任务处理失败后是否会重试,并记录失败次数。

抢占任务测试

1、修改调度中心中视频处理任务的阻塞处理策略为“覆盖之间的调度”

2、在抢占任务代码处打断点并选择支持多线程方式

3、在抢占任务代码处的下边两行代码分别打上断点,避免观察时代码继续执行。

4、启动任务

此时多个线程执行都停留在断点处

依次放行,观察同一个任务只会被一个线程抢占成功。


http://www.ppmy.cn/news/1583047.html

相关文章

OPENCV数字识别(非手写数字/采用模板匹配)

这篇文章的重点在于 模板匹配 的使用。模板匹配是计算机视觉中的一项基本技术&#xff0c;它通过比对输入图像与模板图像的相似度&#xff0c;来进行目标识别。对于数字识别&#xff0c;特别是标准数字的识别&#xff0c;模板匹配非常有效。 请看效果&#xff1a; 文章结构 …

在shell脚本内部获取该脚本所在目录的绝对路径

目录 需求描述 方法一&#xff1a;使用 dirname 和 readlink 命令 方法二&#xff1a;使用 BASH_SOURCE 变量 方法三&#xff1a;仅使用纯 Bash 实现 需求描述 工作中经常有这样情况&#xff0c;需要在脚本内部获取该脚本自己所在目录的绝对路径。 假如有一个脚本/a/b/c/…

Excel online开始支持Copilot高级数据分析:Python提供强大的数据见解

前文讲过Excel中的copilot可以直接调用Python进行高级数据分析&#xff1a; Copilot&#xff1a;Excel中的Python高级分析来了 Python in Excel高级分析&#xff1a;一键RFM分析 超越DeepSeek&#xff1a;Copilot in Excel高级数据分析原生支持Python无需安装软件 零代码、…

git,openpnp - 根据安装程序打包名称找到对应的源码版本

文章目录 git,openpnp - 根据安装程序打包名称找到对应的源码版本概述笔记备注 - 提交时间不可以作为查找提交记录的依据END git,openpnp - 根据安装程序打包名称找到对应的源码版本 概述 想在openpnp官方最新稳定版上改一改&#xff0c;首先就得知道官方打包的安装程序对应的…

基于Spring Boot的供应商管理系统的设计与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导&#xff0c;欢迎高校老师/同行前辈交流合作✌。 技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;…

2025年Postman的五大替代工具

虽然Postman是一个广泛使用的API测试工具&#xff0c;但许多用户在使用过程中会遇到各种限制和不便。因此&#xff0c;可能需要探索替代解决方案。本文介绍了10款强大的替代工具&#xff0c;它们能够有效替代Postman&#xff0c;成为你API测试工具箱的一部分。 什么是Postman&…

自动驾驶背后的数学:多模态传感器融合的简单建模

上一篇博客自动驾驶背后的数学:特征提取中的线性变换与非线性激活 以单个传感器为例,讲解了特征提取中的线性变换与非线性激活。 这一篇将以多模态传感器融合为例,讲解稍复杂的线性变换和非线性激活应用场景。 (一)权重矩阵的张量积分解 y = W x + b = [ w 11 ⋯ w 1 n ⋮…

VS010生成可由MATLAB2016调用的DLL文件方法

亲测实用&#xff0c;不用配置杂七杂八的依赖项 1&#xff1a;新建Win32的DLL输出项目 2&#xff1a;修改为release模式 3&#xff1a;添加calc.cpp文件&#xff0c;即要导出的函数myadd&#xff1a; #include "calc.h" __declspec(dllexport) int myadd(int a,in…