minio 分布式文件管理

news/2024/12/16 0:52:15/

一、minio 是什么?

MinIO构建分布式文件系统,MinIO 是一个非常轻量的服务,可以很简单的和其他应用的结合使用,它兼容亚马逊 S3 云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等。

官网:https://www.minio.org.cn/

二、minio的部署

本项目采用docker搭建

首先需要创建,文件存储的目录。以后上传的文件,在这4个目录中都会进行存储(即:一个文件存储4份),保证数据的安全性

 mkdir -p /root/minio_data/data1mkdir -p /root/minio_data/data2mkdir -p /root/minio_data/data3mkdir -p /root/minio_data/data4
docker run -p 9000:9000 -p 9001:9001 --name minio \-v /root/minio_data/data1:/data1 \-v /root/minio_data/data2:/data2 \-v /root/minio_data/data3:/data3 \-v /root/minio_data/data4:/data4 \-e "MINIO_ROOT_USER=minioadmin" \-e "MINIO_ROOT_PASSWORD=minioadmin" \minio/minio server /data{1...4} --console-address ":9001"
  • 9000端口是作为S3 API端口,用于API的调用,9001端口用于Web控制台
  • minio/minio: 这是Docker镜像的名称
  • server /data{1...4}: 这部分告诉MinIO以服务器模式启动,并且使用/data1/data2/data3, 和 /data4这四个目录作为存储位置。
  • --console-address ":9001": 这个参数指定了MinIO Web控制台的监听地址和端口。这里设置为":9001",意味着Web控制台将监听容器内的9001端口。
  • 访问地址:http://ip地址:9001   账号:minioadmin 密码:minioadmin
  • 93dac0ad23b94507b17dfece45f17685.png

三、基本使用方法 

1.创建一个bucket

创建一个测试bucket,用以存储文件

69036144c9f1407da346466d7b05d7c8.png

 2.上传文件

f44f9ec88d30422cb22088411c5e3687.png

上传文件后,我们可以发现在,data1 data2 data3 data4 目录下都进行了存储

28be8fdb29d64de7a1eee118b265691c.png

测试minio的数据恢复过程:

1、首先删除一个目录。

删除目录后仍然可以在web控制台上传文件和下载文件。

稍等片刻删除的目录自动恢复。

2、删除两个目录。

删除两个目录也会自动恢复。

3、删除三个目录 。

由于 集合中共有4块硬盘,有大于一半的硬盘损坏数据无法恢复。

此时报错:We encountered an internal error, please try again.  (Read failed.  Insufficient number of drives online)在线驱动器数量不足。

 

四、项目依赖

这些项目中会用到的依赖

<dependency><groupId>io.minio</groupId><artifactId>minio</artifactId><version>8.4.3</version>
</dependency>
<dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId><version>4.8.1</version>
</dependency><!--根据扩展名取mimetype--><dependency><groupId>com.j256.simplemagic</groupId><artifactId>simplemagic</artifactId><version>1.17</version></dependency><dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.11</version></dependency>

 需要将访问权限设置public,这样远程才能够访问到4ff11f59df064b6484ea2023b3776798.png

需要三个参数才能连接到minio服务。

dbca3be3b94845a9aa407b398e233346.png

五、图片上传

1.本地测试

包含上传文件、删除文件、下载文件、检查完整性

package com.xuecheng.media;import com.j256.simplemagic.ContentInfo;
import com.j256.simplemagic.ContentInfoUtil;
import io.minio.*;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.compress.utils.IOUtils;
import org.junit.jupiter.api.Test;
import org.springframework.http.MediaType;import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FilterInputStream;/*** @description 测试MinIO* @author Mr.M* @date 2022/9/11 21:24* @version 1.0*/
public class MinioTest {static MinioClient minioClient =MinioClient.builder().endpoint("http://124.70.208.223:8089/") //9000端口用于API调用.credentials("minioadmin", "minioadmin").build();private String getMimeType(String extension){if(extension==null)extension = "";//根据扩展名取出mimeTypeContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension);//根据扩展名获取MIME类型,比如.mp4文件的MIME类型是video/mp4//通用mimeType,字节流String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE;if(extensionMatch!=null){mimeType = extensionMatch.getMimeType();}return mimeType;}//上传文件@Testvoid upload() {try {String filename="E:\\Users\\31118\\Pictures\\Snipaste_2024-11-10_23-08-04.png";String bucketName = "001/test001.jpg";String bucket ="testbucket";String mimeType = getMimeType(".jpg");UploadObjectArgs testbucket = UploadObjectArgs.builder().bucket(bucket).filename(filename) //本地文件路径.object(bucketName) //上传到bucket下的路径.contentType(mimeType)//默认根据扩展名确定文件.build();minioClient.uploadObject(testbucket);check(filename,bucketName,bucket);System.out.println("上传成功");} catch (Exception e) {e.printStackTrace();System.out.println("上传失败");}}//删除文件@Testvoid delete(){try {RemoveObjectArgs testbucket = RemoveObjectArgs.builder().bucket("testbucket").object("001/test001.jpg").build();minioClient.removeObject(testbucket);System.out.println("删除成功");} catch (Exception e) {e.printStackTrace();System.out.println("删除失败");}}//查看/下载文件@Testvoid getFile() {GetObjectArgs getObjectArgs = GetObjectArgs.builder().bucket("testbucket").object("001/test001.jpg").build();try(FilterInputStream inputStream = minioClient.getObject(getObjectArgs);FileOutputStream outputStream = new FileOutputStream(new File("E:\\图片.gif"));//输出路径) {IOUtils.copy(inputStream,outputStream);} catch (Exception e) {e.printStackTrace();}}//对上传之后和下载完成后的文件进行完整性检查,防止丢包//将上传完成后的文件和本地的临时文件的md5的值进行比对,如果一致,则说明上传和下载成功void check(String fileName,String bucketName,String bucket){GetObjectArgs getObjectArgs = GetObjectArgs.builder().bucket(bucket).object(bucketName).build();//校验文件的完整性对文件的内容进行md5try {//获取远程文件的md5FilterInputStream fileInputStream1 = minioClient.getObject(getObjectArgs);String source_md5 = DigestUtils.md5Hex(fileInputStream1);//获取本地文件的md5FileInputStream fileInputStream = new FileInputStream(new File(fileName));String local_md5 = DigestUtils.md5Hex(fileInputStream);if(source_md5.equals(local_md5)){System.out.println("下载成功");}}catch (Exception e){e.printStackTrace();}}
}

 上传文件时contentType("")属性并不是强制要求设置的,但一般建议设置,以便浏览器进行识别该文件的类型

 

2、java服务器远程部署-图片上传

minio:endpoint: http://124.70.208.223:9000 #API访问路径accessKey: minioadmin #登录账号secretKey: minioadmin #登录密码bucket:files: mediafiles #文件/图片 存在的位置videofiles: video #视频存储的位置

 

文件上传时,获取md5,作为主键保存在文件表中

后续上传的如果是同一个文件时,他们的md5的值是一致的,不在进行二次存储

配置类注册,方便后面直接使用

package com.xuecheng.media.config;import io.minio.MinioClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @description minio配置*/@Configuration
public class MinioConfig {@Value("${minio.endpoint}")private String endpoint;@Value("${minio.accessKey}")private String accessKey;@Value("${minio.secretKey}")private String secretKey;@Beanpublic MinioClient minioClient() {return MinioClient.builder().endpoint(endpoint).credentials(accessKey, secretKey).build();}
}

控制层接收到MultipartFile后,这是获取一些常见属性的办法,方便对文件进行存储

    @ApiOperation("上传文件")@RequestMapping(value = "/upload/coursefile",consumes = MediaType.MULTIPART_FORM_DATA_VALUE) //对文件类型进行声明public UploadFileResultDto upload(@RequestPart("filedata") MultipartFile filedata) throws IOException {//文件大小long fileSize = filedata.getSize();//文件名称String originalFilename = filedata.getOriginalFilename();//创建临时文件File tempFile = File.createTempFile("minio", "temp"); //createTempFile 方法会生成一个唯一的文件名,该文件名由前缀、一个随机生成的字符串和后缀组成。例如/minio1234567890temp//上传的文件拷贝到临时文件filedata.transferTo(tempFile);//文件路径String absolutePath = tempFile.getAbsolutePath();}

 

 3.上传文件

 public boolean addMediaFilesToMinIO(String localFilePath,String mimeType,String bucket, String objectName) {try {UploadObjectArgs testbucket = UploadObjectArgs.builder().bucket(bucket).object(objectName).filename(localFilePath).contentType(mimeType).build();minioClient.uploadObject(testbucket);log.debug("上传文件到minio成功,bucket:{},objectName:{}",bucket,objectName);System.out.println("上传成功");return true;} catch (Exception e) {e.printStackTrace();log.error("上传文件到minio出错,bucket:{},objectName:{},错误原因:{}",bucket,objectName,e.getMessage(),e);XueChengPlusException.cast("上传文件到文件系统失败");}return false;}

4.需要用到的工具方法

获取文件Md5

 //获取文件的md5private String getFileMd5(File file) {try (FileInputStream fileInputStream = new FileInputStream(file)) {String fileMd5 = DigestUtils.md5Hex(fileInputStream);return fileMd5;} catch (Exception e) {e.printStackTrace();return null;}}

获取年月日结构目录

 //获取文件默认存储目录路径 年/月/日private String getDefaultFolderPath() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");String format = sdf.format(new Date());return format.replace("-", "/")+"/";}

根据扩展名获取MIME类型

比如.mp4文件的MIME类型是video/mp4

 private String getMimeType(String extension){ //传入.jpgif(extension==null)extension = "";//根据扩展名取出mimeTypeContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension);//通用mimeType,字节流String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE;if(extensionMatch!=null){mimeType = extensionMatch.getMimeType();}return mimeType;}

这边只进行关键信息的展示,数据库相关操作根据项目自行处理

图片的访问链接是:

服务器ip:9000/mediafiles/2024/11/27/e0abb735ab793fae5568c2ed537ab37c.jpg

注意9000是API地址,9001是web服务地址

 

六、视频上传-断点续传

minio限制,视频至少以5mb,划分

1.文件上传前检查文件是否已上传

先通过前端计算出视频md5的值,传给后端,检测该视频是否已经存在,

    @ApiOperation(value = "文件上传前检查文件")@PostMapping("/upload/checkfile")public RestResponse<Boolean> checkfile(@RequestParam("fileMd5") String fileMd5) throws Exception {return bigFilesService.checkFile(fileMd5);}
    @Overridepublic RestResponse<Boolean> checkFile(String fileMd5) {//查询文件信息MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);if (mediaFiles != null) {//桶String bucket = mediaFiles.getBucket();//存储目录String filePath = mediaFiles.getFilePath();//文件流InputStream stream = null;try {stream = minioClient.getObject(GetObjectArgs.builder().bucket(bucket).object(filePath).build());if (stream != null) {//文件已存在return RestResponse.success(true);}} catch (Exception e) {log.info("文件不存在,准备开始分块上传");}}//文件不存在return RestResponse.success(false);}

 2.分块上传前检测分块是否已上传

根据后端的响应信息,若该视频不存在,前端对视频划分为一个个分块,并计算每个分块的md5值

将分块的md5值,传给后端,判断该分块是否存在,若该分块不存在则上传分块

 

  @ApiOperation(value = "分块文件上传前的检测")@PostMapping("/upload/checkchunk")public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) throws Exception {return bigFilesService.checkChunk(fileMd5,chunk);}
 @Overridepublic RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex) {//得到分块文件目录String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);//得到分块文件的路径String chunkFilePath = chunkFileFolderPath + chunkIndex;//文件流InputStream fileInputStream = null;try {fileInputStream = minioClient.getObject(GetObjectArgs.builder().bucket(bucket_videoFiles).object(chunkFilePath).build());if (fileInputStream != null) {//分块已存在log.info("分块{}已存在",chunkIndex);return RestResponse.success(true);}} catch (Exception e) {//minio中没有该分块,上传分块log.info("分块{}不存在,开始上传",chunkIndex);}//分块未存在return RestResponse.success(false);}

3.上传分块

    @Overridepublic RestResponse uploadChunk(String fileMd5, int chunk,String localFilePath) {//得到分块文件的目录路径String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);//得到分块文件的路径String chunkFilePath = chunkFileFolderPath + chunk;try {//将文件存储至minIOaddMediaFilesToMinIO(localFilePath, bucket_videoFiles,chunkFilePath);return RestResponse.success(true);} catch (Exception ex) {ex.printStackTrace();log.debug("上传分块文件:{},失败:{}",chunkFilePath,ex.getMessage());}return RestResponse.validfail(false,"上传分块失败");}/*** @description 将文件写入minIO* @param localFilePath  文件地址* @param bucket  桶* @param objectName 对象名称* @return void* @author Mr.M* @date 2022/10/12 21:22*/public boolean addMediaFilesToMinIO(String localFilePath,String mimeType,String bucket, String objectName) {try {UploadObjectArgs testbucket = UploadObjectArgs.builder().bucket(bucket).object(objectName).filename(localFilePath).contentType(mimeType).build();minioClient.uploadObject(testbucket);log.debug("上传文件到minio成功,bucket:{},objectName:{}",bucket,objectName);System.out.println("上传成功");return true;} catch (Exception e) {e.printStackTrace();log.error("上传文件到minio出错,bucket:{},objectName:{},错误原因:{}",bucket,objectName,e.getMessage(),e);XueChengPlusException.cast("上传文件到文件系统失败");}return false;}

md5目录结构

分块存储目录:d/a/da112e234adasdasd/chunk

    //得到分块文件的目录private String getChunkFileFolderPath(String fileMd5) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";}

修改文件大小的限制

 前端对文件分块的大小为5MB,SpringBoot web默认上传文件的大小限制为1MB

spring:servlet:multipart:max-file-size: 50MB #单个文件的大小限制max-request-size: 50MB #单次请求的大小限制

4.合并分块

@ApiOperation(value = "合并文件")
@PostMapping("/upload/mergechunks")
public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5, @RequestParam("fileName") String fileName, @RequestParam("chunkTotal") int chunkTotal) throws Exception {Long companyId = 1232141425L;UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto();uploadFileParamsDto.setFileType("001002");uploadFileParamsDto.setTags("课程视频");uploadFileParamsDto.setRemark("");uploadFileParamsDto.setFilename(fileName);return bigFilesService.mergechunks(companyId,fileMd5,chunkTotal,uploadFileParamsDto);
}
 @Overridepublic RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) {//=====获取分块文件路径=====String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);//组成将分块文件路径组成 List<ComposeSource>List<ComposeSource> sourceObjectList = Stream.iterate(0, i -> ++i) //从0开始,迭代到chunkTotal,依次获取所有分块文件,0 1 2.limit(chunkTotal).map(i -> ComposeSource.builder().bucket(bucket_videoFiles).object(chunkFileFolderPath+i).build()).collect(Collectors.toList());//=====合并=====//文件名称String fileName = uploadFileParamsDto.getFilename();//文件扩展名String extName = fileName.substring(fileName.lastIndexOf("."));//合并文件路径String mergeFilePath = getFilePathByMd5(fileMd5, extName);try {//合并文件ObjectWriteResponse response = minioClient.composeObject(ComposeObjectArgs.builder().bucket(bucket_videoFiles).object(mergeFilePath).sources(sourceObjectList).build());log.debug("合并文件成功:{}",mergeFilePath);} catch (Exception e) {log.debug("合并文件失败,fileMd5:{},异常:{}",fileMd5,e.getMessage(),e);return RestResponse.validfail(false, "合并文件失败。");}// ====验证md5====File minioFile = downloadFileFromMinIO(bucket_videoFiles,mergeFilePath);if(minioFile == null){log.debug("下载合并后文件失败,mergeFilePath:{}",mergeFilePath);return RestResponse.validfail(false, "下载合并后文件失败。");}try (InputStream newFileInputStream = new FileInputStream(minioFile)) {//minio上文件的md5值String md5Hex = DigestUtils.md5Hex(newFileInputStream);//比较md5值,不一致则说明文件不完整if(!fileMd5.equals(md5Hex)){return RestResponse.validfail(false, "文件合并校验失败,最终上传失败。");}//文件大小uploadFileParamsDto.setFileSize(minioFile.length());}catch (Exception e){log.debug("校验文件失败,fileMd5:{},异常:{}",fileMd5,e.getMessage(),e);return RestResponse.validfail(false, "文件合并校验失败,最终上传失败。");}finally {if(minioFile!=null){ //删除下载的临时文件minioFile.delete();}}//文件入库currentProxy.addMediaFilesToDb(companyId,fileMd5,uploadFileParamsDto,bucket_videoFiles,mergeFilePath);//=====清除分块文件=====clearChunkFiles(chunkFileFolderPath,chunkTotal);return RestResponse.success(true);}

下载至本地,用于md5检测

 

获得合并后文件存储路径

    private String getFilePathByMd5(String fileMd5,String fileExt){return   fileMd5.substring(0,1) + "/" + fileMd5.substring(1,2) + "/" + fileMd5 + "/" +fileMd5 +fileExt;}

将上传后的文件下载至本地

将下载文件的md5的值与前端传递过来时视频md5值进行比较,判断视频上传时是否出现丢包

 public File downloadFileFromMinIO(String bucket,String objectName){//临时文件File minioFile = null;FileOutputStream outputStream = null;try{InputStream stream = minioClient.getObject(GetObjectArgs.builder().bucket(bucket).object(objectName).build());//创建临时文件minioFile=File.createTempFile("minio", ".merge");outputStream = new FileOutputStream(minioFile);IOUtils.copy(stream,outputStream);return minioFile;} catch (Exception e) {e.printStackTrace();}finally {if(outputStream!=null){try {outputStream.close();} catch (IOException e) {e.printStackTrace();}}}return null;}

删除分块

视频上传成功后,删除之前上传的分块

   private void clearChunkFiles(String chunkFileFolderPath,int chunkTotal){try {List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i).limit(chunkTotal).map(i -> new DeleteObject(chunkFileFolderPath.concat(Integer.toString(i)))).collect(Collectors.toList());RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("video").objects(deleteObjects).build();Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);results.forEach(r->{DeleteError deleteError = null;try {deleteError = r.get();} catch (Exception e) {e.printStackTrace();log.error("清楚分块文件失败,objectname:{}",deleteError.objectName(),e);}});} catch (Exception e) {e.printStackTrace();log.error("清楚分块文件失败,chunkFileFolderPath:{}",chunkFileFolderPath,e);}}

分块文件清理问题

上传一个文件进行分块上传,上传一半不传了,之前上传到minio的分块文件要清理吗?怎么做的?

1、在数据库中有一张文件表记录minio中存储的文件信息。

2、文件开始上传时会写入文件表,状态为上传中,上传完成会更新状态为上传完成。

3、当一个文件传了一半不再上传了说明该文件没有上传完成,会有定时任务去查询文件表中的记录,如果文件未上传完成则删除minio中没有上传成功的文件目录。

 

视频文件格式转换

视频文件的格式有很多中,我们需要把视频格式统一转换为mp4,下面以avi文件格式转换为mp4格式举例

FFmpeg进行媒体文件的转换

ffmpeg的安装及基本使用:

xxl-job分布式任务调度

由于媒体文件转换需要处理的时间,我们采用xxl-job进行分布式任务调度

xxl-job的基本使用方法:

多服务执行:

23c389d5ceb543d99564fc2a2f9d2c52.png

-Dserver.port=63051 -Dxxl.job.executor.port=9998

什么是乐观锁、悲观锁?

synchronized是一种悲观锁,在执行被synchronized包裹的代码时需要首先获取锁,没有拿到锁则无法执行,是总悲观的认为别的线程会去抢,所以要悲观锁。

乐观锁的思想是它不认为会有线程去争抢,尽管去执行,如果没有执行成功就再去重试。

为了防止多个分布式任务,执行同一个行为,需要使用分布锁进行来控制

1、基于数据库实现分布锁

利用数据库主键唯一性的特点,或利用数据库唯一索引、行级锁的特点,多个线程同时去更新相同的记录,谁更新成功谁就抢到锁。

数据库表的设计

在上传文件之后,将需要格式转换的文件,存入media_process数据库

b90b028bcd4547988dfec01730c0e1a5.png

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;-- ----------------------------
-- Table structure for media_process
-- ----------------------------
DROP TABLE IF EXISTS `media_process`;
CREATE TABLE `media_process`  (`id` bigint NOT NULL AUTO_INCREMENT,`file_id` varchar(120) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '文件标识',`filename` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '文件名称',`bucket` varchar(128) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '存储桶',`file_path` varchar(512) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '存储路径',`status` varchar(12) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '状态,1:未处理,2:处理成功  3处理失败 4处理中',`create_date` datetime NOT NULL COMMENT '上传时间',`finish_date` datetime NULL DEFAULT NULL COMMENT '完成时间',`fail_count` int NULL DEFAULT 0 COMMENT '失败次数',`url` varchar(1024) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '媒资文件访问地址',`errormsg` varchar(1024) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '失败原因',PRIMARY KEY (`id`) USING BTREE,UNIQUE INDEX `unique_fileid`(`file_id` ASC) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 15 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = DYNAMIC;SET FOREIGN_KEY_CHECKS = 1;

谁先抢到,谁处理

视频处理完成后,转存如 media_process_history表中,在media_process表中,删除该条记录


SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;-- ----------------------------
-- Table structure for media_process_history
-- ----------------------------
DROP TABLE IF EXISTS `media_process_history`;
CREATE TABLE `media_process_history`  (`id` bigint NOT NULL AUTO_INCREMENT,`file_id` varchar(120) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '文件标识',`filename` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '文件名称',`bucket` varchar(128) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '存储源',`status` varchar(12) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '状态,1:未处理,2:处理成功  3处理失败',`create_date` datetime NOT NULL COMMENT '上传时间',`finish_date` datetime NOT NULL COMMENT '完成时间',`url` varchar(1024) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '媒资文件访问地址',`fail_count` int NULL DEFAULT 0 COMMENT '失败次数',`file_path` varchar(512) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '文件路径',`errormsg` varchar(1024) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '失败原因',PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 12 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = DYNAMIC;SET FOREIGN_KEY_CHECKS = 1;

 

   @XxlJob("videoJobHandler")public void videoJobHandler() throws Exception {// 分片参数int shardIndex = XxlJobHelper.getShardIndex();int shardTotal = XxlJobHelper.getShardTotal();List<MediaProcess> mediaProcessList = null;int size = 0;try {//取出cpu核心数作为一次处理数据的条数int processors = Runtime.getRuntime().availableProcessors();//获取待处理视频//一次处理视频数量不要超过cpu核心数,避免CPU超载mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, processors);size = mediaProcessList.size();log.debug("取出待处理视频任务{}条", size);if (size < 0) {return;}} catch (Exception e) {e.printStackTrace();return;}//启动size个线程的线程池ExecutorService threadPool = Executors.newFixedThreadPool(size);//计数器,用于等待所有线程执行完毕CountDownLatch countDownLatch = new CountDownLatch(size);//将处理任务加入线程池mediaProcessList.forEach(mediaProcess -> {threadPool.execute(() -> { //所以线程,通过循环同时启动try {//任务idLong taskId = mediaProcess.getId();//抢占任务,将任务status状态改为4正在处理boolean b = mediaFileProcessService.startTask(taskId);if (!b) {return;}log.debug("开始执行任务:{}", mediaProcess);//下边是处理逻辑//桶String bucket = mediaProcess.getBucket();//存储路径String filePath = mediaProcess.getFilePath();//原始视频的md5值String fileId = mediaProcess.getFileId();//原始文件名称String filename = mediaProcess.getFilename();//将要处理的文件下载到服务器上File originalFile = bigFilesService.downloadFileFromMinIO(mediaProcess.getBucket(), mediaProcess.getFilePath());if (originalFile == null) {log.debug("下载待处理文件失败,originalFile:{}", mediaProcess.getBucket().concat(mediaProcess.getFilePath()));mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "下载待处理文件失败");return;}//处理结束的视频文件File mp4File = null;//创建临时文件,作为转化后的文件try {mp4File = File.createTempFile("mp4", ".mp4");} catch (IOException e) {log.error("创建mp4临时文件失败");//保存任务是失败的结果mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "创建mp4临时文件失败");return;}//视频处理结果String result = "";try {String absolutePath = mp4File.getAbsolutePath();//包含了,文件名String localPath = absolutePath.substring(0, absolutePath.lastIndexOf("\\")+1);//开始处理视频Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpegpath, originalFile.getAbsolutePath(), mp4File.getName(),localPath);//开始视频转换,成功将返回successresult = videoUtil.generateMp4();} catch (Exception e) {e.printStackTrace();log.error("处理视频文件:{},出错:{}", mediaProcess.getFilePath(), e.getMessage());mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "下载待处理文件失败");}if (!result.equals("success")) {//记录错误信息log.error("处理视频失败,视频地址:{},错误信息:{}", bucket + filePath, result);mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, result);return;}//将mp4上传至minio//mp4在minio的存储路径String objectName = getFilePath(fileId, ".mp4");//访问urlString url = "/" + bucket + "/" + objectName;try {bigFilesService.addMediaFilesToMinIO(mp4File.getAbsolutePath(), "video/mp4", bucket, objectName);//将url存储至数据,并更新状态为成功,并将待处理视频记录删除存入历史mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "2", fileId, url, null);} catch (Exception e) {log.error("上传视频失败或入库失败,视频地址:{},错误信息:{}", bucket + objectName, e.getMessage());//最终还是失败了mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "处理后视频上传或入库失败");}}finally {countDownLatch.countDown();  //线程数减一}});});//等待,给一个充裕的超时时间,防止无限等待,到达超时时间还没有处理完成则结束任务countDownLatch.await(30, TimeUnit.MINUTES);}private String getFilePath(String fileMd5,String fileExt){return   fileMd5.substring(0,1) + "/" + fileMd5.substring(1,2) + "/" + fileMd5 + "/" +fileMd5 +fileExt;}

 

当前需要处理的视频文件,需要根据计算机 当前计算机启动的服务下标(从0开始...),当前计算机启动服务总个数 和 计算机的线程数计算得出,因为若计算机的线程数为8,一次性最多处理8个视频

sql语句这样设计的目的是为了给每个服务(执行器),分配任务。一台8核的计算机,一次性最多分配8个任务

 @Overridepublic List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count) {return mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count);}/*** @description 根据分片参数获取待处理任务,一次处理视频数量不要超过cpu核心数,避免CPU超载* @param shardTotal  分片总数* @param shardIndex  分片序号* @param count 任务数* @return java.util.List<com.xuecheng.media.model.po.MediaProcess>* @author Mr.M* @date 2022/9/14 8:54*/@Select("select * from media_process t where t.id % #{shardTotal} = #{shardIndex} and (t.status = '1' or t.status = '3') and t.fail_count < 3 limit #{count}")List<MediaProcess> selectListByShardIndex(@Param("shardTotal") int shardTotal, @Param("shardIndex") int shardIndex, @Param("count") int count);

 Sql语句的查询,原理如下

f5cc1cce94384b7799acf265302e436d.png

 

上边两个执行器实例那么分片总数为2,序号为0、1,从任务1开始,如下:

1  %  2 = 1    执行器2执行

2  %  2 =  0    执行器1执行

3  %  2 =  1     执行器2执行

以此类推.

 

一个服务(执行器),所以线程同时执行,为了防止多个线程执行的是同一个任务,当前线程执行时需要前开启任务时,将数据库的状态设置为4,表示正在处理中,防止下次执行时,被其他执行器抢占

    /*** 开启一个任务* @param id 任务id* @return 更新记录数*/@Update("update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=#{id}")int startTask(@Param("id") long id);

若视频转换过程中出现异常,失败次数+1,失败次数达到3此不在执行。

若视频转换成功,修改任务状态为2,并将其存入历史进程表中,在当前表中删除该条记录

@Transactional@Overridepublic void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) {//查出任务,如果不存在则直接返回MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId);if(mediaProcess == null){return ;}//处理失败,更新任务处理结果LambdaQueryWrapper<MediaProcess> queryWrapperById = new LambdaQueryWrapper<MediaProcess>().eq(MediaProcess::getId, taskId);//处理失败if(status.equals("3")){MediaProcess mediaProcess_u = new MediaProcess();mediaProcess_u.setStatus("3");mediaProcess_u.setErrormsg(errorMsg);mediaProcess_u.setFailCount(mediaProcess.getFailCount()+1);mediaProcessMapper.update(mediaProcess_u,queryWrapperById);log.debug("更新任务处理状态为失败,任务信息:{}",mediaProcess_u);return ;}//任务处理成功MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId);if(mediaFiles!=null){//更新媒资文件中的访问urlmediaFiles.setUrl(url);mediaFilesMapper.updateById(mediaFiles);}//处理成功,更新url和状态mediaProcess.setUrl(url);mediaProcess.setStatus("2");mediaProcess.setFinishDate(LocalDateTime.now());mediaProcessMapper.updateById(mediaProcess);//添加到历史记录MediaProcessHistory mediaProcessHistory = new MediaProcessHistory();BeanUtils.copyProperties(mediaProcess, mediaProcessHistory);mediaProcessHistoryMapper.insert(mediaProcessHistory);//删除mediaProcessmediaProcessMapper.deleteById(mediaProcess.getId());}

工具类

检查视频时长,校验两个视频时长是否相等,等待进程处理完毕

package com.xuecheng.base.utils;import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;/*** 此文件作为视频文件处理父类,提供:* 1、查看视频时长* 2、校验两个视频的时长是否相等**/
public class VideoUtil {String ffmpeg_path;//ffmpeg的安装位置public VideoUtil(String ffmpeg_path){this.ffmpeg_path = ffmpeg_path;}//检查视频时间是否一致public Boolean check_video_time(String source,String target) {String source_time = get_video_time(source);//取出时分秒source_time = source_time.substring(source_time.lastIndexOf(":")+1);String target_time = get_video_time(target);//取出时分秒target_time = target_time.substring(target_time.lastIndexOf(":")+1);if(source_time == null || target_time == null){return false;}float v1 = Float.parseFloat(source_time);float v2 = Float.parseFloat(target_time);float abs = Math.abs(v1 - v2);if(abs<1){//转化是会有细微差距,属于正常现象return true;}return false;}//获取视频时间(时:分:秒:毫秒)public String get_video_time(String video_path) {/*ffmpeg -i  lucene.mp4*/List<String> commend = new ArrayList<String>();commend.add(ffmpeg_path);commend.add("-i");commend.add(video_path);try {ProcessBuilder builder = new ProcessBuilder();builder.command(commend);//将标准输入流和错误输入流合并,通过标准输入流程读取信息builder.redirectErrorStream(true);Process p = builder.start();String outstring = waitFor(p);System.out.println(outstring);int start = outstring.trim().indexOf("Duration: ");if(start>=0){int end = outstring.trim().indexOf(", start:");if(end>=0){String time = outstring.substring(start+10,end);if(time!=null && !time.equals("")){return time.trim();}}}} catch (Exception ex) {ex.printStackTrace();}return null;}//等待一个外部进程(通过Process对象表示)完成,并在此过程中捕获该进程的标准输出和错误输出。public String waitFor(Process p) {InputStream in = null;InputStream error = null;String result = "error";int exitValue = -1;StringBuffer outputString = new StringBuffer();try {in = p.getInputStream();error = p.getErrorStream();boolean finished = false;int maxRetry = 600;//每次休眠1秒,最长执行时间10分种int retry = 0;while (!finished) {if (retry > maxRetry) {return "error";}try {while (in.available() > 0) {Character c = new Character((char) in.read());outputString.append(c);System.out.print(c);}while (error.available() > 0) {Character c = new Character((char) in.read());outputString.append(c);System.out.print(c);}//进程未结束时调用exitValue将抛出异常exitValue = p.exitValue();finished = true;} catch (IllegalThreadStateException e) {Thread.currentThread().sleep(1000);//休眠1秒retry++;}}} catch (Exception e) {e.printStackTrace();} finally {if (in != null) {try {in.close();} catch (IOException e) {System.out.println(e.getMessage());}}}return outputString.toString();}public static void main(String[] args) throws IOException {String ffmpeg_path = "D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe";//ffmpeg的安装位置VideoUtil videoUtil = new VideoUtil(ffmpeg_path);String video_time = videoUtil.get_video_time("E:\\ffmpeg_test\\1.avi");System.out.println(video_time);}
}

avi格式转mp4格式

package com.xuecheng.base.utils;import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;public class Mp4VideoUtil extends VideoUtil {String ffmpeg_path;//ffmpeg的安装位置String video_path;String mp4_name;String mp4folder_path;public Mp4VideoUtil(String ffmpeg_path, String video_path, String mp4_name, String mp4folder_path){super(ffmpeg_path);this.ffmpeg_path = ffmpeg_path;this.video_path = video_path;this.mp4_name = mp4_name;this.mp4folder_path = mp4folder_path;}//清除已生成的mp4private void clear_mp4(String mp4_path){//删除原来已经生成的m3u8及ts文件File mp4File = new File(mp4_path);if(mp4File.exists() && mp4File.isFile()){mp4File.delete();}}/*** 视频编码,生成mp4文件* @return 成功返回success,失败返回控制台日志*/public String generateMp4(){//清除已生成的mp4
//        clear_mp4(mp4folder_path+mp4_name);clear_mp4(mp4folder_path);/*ffmpeg.exe -i  lucene.avi -c:v libx264 -s 1280x720 -pix_fmt yuv420p -b:a 63k -b:v 753k -r 18 .\lucene.mp4*/List<String> commend = new ArrayList<String>();//commend.add("D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe");commend.add(ffmpeg_path);commend.add("-i");
//        commend.add("D:\\BaiduNetdiskDownload\\test1.avi");commend.add(video_path);commend.add("-c:v");commend.add("libx264");commend.add("-y");//覆盖输出文件commend.add("-s");commend.add("1280x720");commend.add("-pix_fmt");commend.add("yuv420p");commend.add("-b:a");commend.add("63k");commend.add("-b:v");commend.add("753k");commend.add("-r");commend.add("18");commend.add(mp4folder_path  + mp4_name );String outstring = null;try {ProcessBuilder builder = new ProcessBuilder();builder.command(commend);//将标准输入流和错误输入流合并,通过标准输入流程读取信息builder.redirectErrorStream(true);Process p = builder.start();outstring = waitFor(p);} catch (Exception ex) {ex.printStackTrace();}Boolean check_video_time = this.check_video_time(video_path, mp4folder_path + mp4_name);if(!check_video_time){return outstring;}else{return "success";}}public static void main(String[] args) throws IOException {//ffmpeg的路径String ffmpeg_path = "F:\\environment\\ffmpeg-7.0.2-full_build\\bin\\ffmpeg.exe";//ffmpeg的安装位置//源avi视频的路径String video_path = "E:\\Users\\31118\\Videos\\1.avi";//转换后mp4文件的名称String mp4_name = "1.mp4";//转换后mp4文件的路径String mp4_path = "E:\\Users\\31118\\Videos\\"; //结尾路径,需要加上\\//创建工具类对象Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path);//开始视频转换,成功将返回successString s = videoUtil.generateMp4();System.out.println(s);}
}

任务补偿机制

如果有线程抢占了某个视频的处理任务,如果线程处理过程中挂掉了,该视频的状态将会一直是处理中,其它线程将无法处理,这个问题需要用补偿机制。

单独启动一个任务找到待处理任务表中超过执行期限但仍在处理中的任务,将任务的状态改为执行失败。

任务执行期限是处理一个视频的最大时间,比如定为30分钟,通过任务的启动时间去判断任务是否超过执行期限。

 


http://www.ppmy.cn/news/1555452.html

相关文章

Elasticsearch02-安装7.x

零、文章目录 Elasticsearch02-安装7.x 1、Windows安装Elasticsearch &#xff08;1&#xff09;JDK安装 Elasticsearch是基于java开发的&#xff0c;所以需要安装JDK。我们安装的Elasticsearch版本是7.15&#xff0c;对应JDK至少1.8版本以上。也可以不安装jdk&#xff0c;…

说说聊聊CNCF(云原生计算基金会)

简介 官网地址&#xff1a;https://www.cncf.io/ Cloud Native Computing Foundation 旨在通过培养和维持开源、供应商中立项目的生态系统来推动这种范式的采用。我们将最先进的模式大众化&#xff0c;让每个人都能使用这些创新。 Cloud Native Computing Foundation&#xff…

力扣-图论-9【算法学习day.59】

前言 ###我做这类文章一个重要的目的还是给正在学习的大家提供方向和记录学习过程&#xff08;例如想要掌握基础用法&#xff0c;该刷哪些题&#xff1f;&#xff09;我的解析也不会做的非常详细&#xff0c;只会提供思路和一些关键点&#xff0c;力扣上的大佬们的题解质量是非…

STM32 出租车计价器系统设计(一) 江科大源码改写

STM32 出租车计价器系统设计 功能目标 驱动步进电机模拟车轮旋转&#xff0c;并实现调速功能。 设置车轮周长和单价&#xff0c;检测车轮转速和运转时间。 计算并显示行驶里程和价格。 硬件材料 28BYJ48 五线四相步进电机和 ULN2003 驱动板模块 测速传感器模块 嵌入式小系统…

快速上手Neo4j图关系数据库

参考视频&#xff1a; 【IT老齐589】快速上手Neo4j网状关系图库 1 Neo4j简介 Neo4j是一个图数据库&#xff0c;是知识图谱的基础 在Neo4j中&#xff0c;数据的基本构建块包括&#xff1a; 节点(Nodes)关系(Relationships)属性(Properties)标签(Labels) 1.1 节点(Nodes) 节点…

分布式全文检索引擎ElasticSearch-基本概念介绍

一、索引类型 索引&#xff0c;可以理解是我们的目录&#xff0c;看一本书的时候&#xff0c;可以根据目录准确快速定位到某一页&#xff0c;那么索引就可以帮我们快速定位到某条数据在庞大的数据表的哪一个位置。 我们常见的索引包括正排索引和倒排索引 1、正排索引 正排索…

算法训练(leetcode)二刷第三十五天 | *121. 买卖股票的最佳时机、*122. 买卖股票的最佳时机 II、*123. 买卖股票的最佳时机 III

刷题记录 *121. 买卖股票的最佳时机贪心*动态规划 *122. 买卖股票的最佳时机 II*123. 买卖股票的最佳时机 III *121. 买卖股票的最佳时机 leetcode题目地址 贪心 记录最低价格作为买入价格&#xff0c;并用当前价格减去最低价格获得利润。 时间复杂度&#xff1a; O ( n )…

二、pxe安装失败,交换机tcpdump dhcp数据包

在交换机上使用 tcpdump 抓取 DHCP 数据包可以帮助你监控和分析 DHCP 流量,这对于故障排除网络配置问题或了解 DHCP 服务器与客户端之间的交互非常有用。不过需要注意的是,并不是所有的交换机都支持直接运行 tcpdump,这通常是在 Linux 或类 Unix 系统上使用的命令行工具。对…