学成在线 - 第3章任务补偿机制实现 + 分块文件清理

ops/2024/10/18 5:40:56/

7.9 额外实现

7.9.1 任务补偿机制

问题:如果有线程抢占了某个视频的处理任务,如果线程处理过程中挂掉了,该视频的状态将会一直是处理中,其它线程将无法处理,这个问题需要用补偿机制。

单独启动一个任务找到待处理任务表中超过执行期限但仍在处理中的任务,将任务的状态改为执行失败。

任务执行期限是处理一个视频的最大时间,比如定为30分钟,通过任务的启动时间去判断任务是否超过执行期限。

大家思考这个sql该如何实现?

大家尝试自己实现此任务补偿机制。

数据库表结构

media_process:

在这里插入图片描述

根据status字段判断视频文件是否正在处理,如果status值为4(正在处理),且当前时间减去create_date超过30分钟,就把status的值修改为3(处理失败),并更新其他字段的值。

具体实现流程
  1. 编写检查超时的sql

    SELECT * FROM `media_process_history`
    WHERE TIMESTAMPDIFF(MINUTE,create_date,finish_date) < 30
    
  2. 在media_process的mapper接口中添加相应的接口

    MediaProcessMapper.java

    java">/*** 查询是否有执行超过30分钟的视频处理任务* @param nowDate 当前任务执行的时间* @return List<MediaProcess>*/
    @Select("SELECT * FROM media_process_history WHERE TIMESTAMPDIFF(MINUTE,create_date,#{date}) > 30")
    List<MediaProcess> selectTimeoutProcess(@Param("date")Date nowDate);
    
  3. 实现service代码

    java">/*** 超时任务列表* @param shardIndex 分片序号* @param shardTotal 分片总数* @param count 获取记录数(cpu核心数)* @return List<MediaProcess>*/
    @Override
    public List<MediaProcess> getTimeoutMediaProcessList(int shardIndex, int shardTotal, int count) {List<MediaProcess> mediaTimeoutProcessList = mediaProcessMapper.selectTimeoutProcess(LocalDateTime.now(), shardIndex, shardTotal, count);return mediaTimeoutProcessList;
    }
    
  4. 业务逻辑在task中实现

    查询相应超时记录,把status和errMsg字段更新后保存到media_process表里。

    java">/*** 处理视频超时任务* @throws Exception*/
    @XxlJob("videoTimeoutJobHandler")
    public void videoTimeoutJobHandler() throws Exception {// 分片参数int shardIndex = XxlJobHelper.getShardIndex();int shardTotal = XxlJobHelper.getShardTotal();List<MediaProcess> mediaTimeoutProcessList = null;int size = 0;try {// 取出cpu核心数作为一次检查超时的记录条数int availableProcessors = Runtime.getRuntime().availableProcessors();mediaTimeoutProcessList = mediaFileProcessService.getTimeoutMediaProcessList(LocalDateTime.now(), shardIndex, shardTotal, availableProcessors);size = mediaTimeoutProcessList.size();log.debug("取出的超时任务数量是 {} 条", size);if (size <= 0) {return ;}} catch (Exception e) {log.error("取出的超时任务超时", size);e.printStackTrace();}// 启动size个线程的线程池ExecutorService fixedThreadPool = Executors.newFixedThreadPool(size);// 定义size个插销CountDownLatch countDownLatch = new CountDownLatch(size);mediaTimeoutProcessList.forEach(mediaProcess -> {// 将任务加入线程池fixedThreadPool.execute(() -> {try {// 把status值设置为3,并把errMsg更新Long taskId = mediaProcess.getId();String fileId = mediaProcess.getFileId();mediaFileProcessService.saveProcessFinishStatus(taskId, "3", fileId, null, "处理视频超时(30分钟)");log.debug("删除超时任务");} catch (Exception e) {e.printStackTrace();log.error("删除超时任务失败,失败原因: {}", e.getMessage());} finally {countDownLatch.countDown();}});});// 等待,给一个充裕的超时事件,防止无限等待,到达超时时间还没有处理完成则结束任务countDownLatch.await(30, TimeUnit.MINUTES);
    }
    

7.9.2 达到最大失败次数(暂时未实现)

问题:需要找到前端对应的接口。

当任务达到最大失败次数时一般就说明程序处理此视频存在问题,这种情况就需要人工处理,在页面上会提示失败的信息,人工可手动执行该视频进行处理,或通过其它转码工具进行视频转码,转码后直接上传mp4视频。

7.9.3 分块文件清理问题

上传一个文件进行分块上传,上传一半不传了,之前上传到minio的分块文件要清理吗?怎么做的?

1、在数据库中有一张文件表记录minio中存储的文件信息。

2、文件开始上传时会写入文件表,状态为上传中,上传完成会更新状态为上传完成。(实现过程中并没有往文件表中插入正在上传的文件记录,只是在media_minio_files临时保存了分块信息)

3、当一个文件传了一半不再上传了说明该文件没有上传完成,会有定时任务去查询文件表中的记录,如果文件未上传完成则删除minio中没有上传成功的文件目录。

实现思路

难点:怎么判断文件上传了一半不再上传了?上传分块文件只在上传视频时有用,上传分块文件时不会往media_files表里添加记录,只有所有分块上传完成,并合并成功后,才会往media_files里插入数据。所以要建立一个media_minio_files表,每上传一个分块就往media_minio_files表里插入一条分块信息,记录包括分块的上传时间。如果超过30分钟分块记录还没有被删除,说明上传到一半不传了,把minio里的分块目录删除,并删除对应的数据库里的记录。在文件合并成功后,数据库里的分块文件上传记录也要删除。

media_minio_files表结构

在这里插入图片描述

具体实现流程
  1. 上传文件块的时候,上传一个文件块完毕要把这个块的信息保存到表里。

    java">/*** 把文件分块信息入库* @param fileMd5 文件md5* @param fileName 文件名称* @param bucket minio桶* @param objectName minio中块的存储路径* @param chunkSize 块大小* @return MediaMinioFiles*/
    @Transactional
    @Override
    public MediaMinioFiles addMediaChunkToDb(String fileMd5, String fileName, String bucket, String objectName, Long chunkSize) {MediaMinioFiles mediaMinioFile = new MediaMinioFiles();mediaMinioFile.setFileId(fileMd5);mediaMinioFile.setFilename(fileName);mediaMinioFile.setBucket(bucket);mediaMinioFile.setFilePath(objectName);mediaMinioFile.setFileSize(chunkSize);mediaMinioFile.setStatus("4");  // 上传中int insert = mediaMinioFilesMapper.insert(mediaMinioFile);if (insert < 0) {log.error("保存分块文件信息到数据库失败,{}", mediaMinioFile.toString());XueChengPlusException.cast("保存分块文件信息失败");}log.debug("保存分块文件信息到数据库成功,{}", mediaMinioFile.toString());return mediaMinioFile;
    }/*** 上传分块* @param fileMd5 文件md5* @param chunk 分块序号* @param localChunkFilePath 分块文件本地路径* @return RestResponse*/
    @Override
    public RestResponse uploadChunk(String fileMd5, int chunk, String localChunkFilePath) {// 得到分块文件的目录路径String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);// 得到分块文件的路径String chunkFilePath = chunkFileFolderPath + chunk;// mimeTypeString mimeType = getMimeType(null);// 将文件存储到 minioboolean b = addMediaFilesToMinIO(localChunkFilePath, mimeType, bucketVideoFiles, chunkFilePath);// 将分块信息存到数据库中currentProxy.addMediaChunkToDb(fileMd5, null, bucketVideoFiles, chunkFilePath, 1024 * 5L);if (!b) {log.debug("上传分块文件失败: {}", chunkFilePath);return RestResponse.validfail(false, "上传分块失败");}log.debug("上传分块文件成功: {}", chunkFilePath);return RestResponse.success(true);
    }
    
  2. 如果没有上传失败,在成功合并块文件之后,要把上面数据库中记录删除。

    java">/*** 删除数据库中的分块文件上传记录* @param fileMd5 文件md5*/
    @Transactional
    public void clearChunkFromDb(String fileMd5) {LambdaQueryWrapper<MediaMinioFiles> deleteQueryWrapper = new LambdaQueryWrapper<>();deleteQueryWrapper.eq(MediaMinioFiles::getFileId, fileMd5);try {mediaMinioFilesMapper.delete(deleteQueryWrapper);log.debug("删除分块上传记录成功");} catch (Exception e) {e.printStackTrace();log.error("删除分块上传记录失败");}
    }
    

    在合并分块的最后几行:

    java">// 文件入库
    currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucketVideoFiles, mergeFilePath);
    // 清除文件分块
    clearChunkFiles(chunkFileFolderPath, chunkTotal);
    // 清除数据库中文件分块上传信息
    currentProxy.clearChunkFromDb(fileMd5);
    return RestResponse.success(true);
    
  3. 如果上传文件一半失败,要根据数据库中media_minio_files表中存的块的路径删除所有超时块。

    首先查询所有的超时块,得到上传的超时块列表:

    java">/*** 拿到数据库中所有上传超时的文件分块信息* @param time 当前任务执行时间* @return List<MediaMinioFiles>*/
    @Override
    public List<MediaMinioFiles> getChunkTimeoutFiles(LocalDateTime time) {List<MediaMinioFiles> mediaMinioFiles = mediaMinioFilesMapper.selectTimeoutChunks(DateUtil.toDateTime(time));return mediaMinioFiles;
    }
    

    删除minio中所有超时块,并且删除所有超时记录

    java">@XxlJob("videoChunkTimeoutJobHandler")
    public void videoChunkTimeoutJobHandler() {log.debug(">>>>>>>>>> 开始执行检查上传超时块任务");// 拿到所有的超时分块任务List<MediaMinioFiles> chunkTimeoutFiles = mediaFileService.getChunkTimeoutFiles(LocalDateTime.now());if (chunkTimeoutFiles == null) {  // 没有超时任务log.debug("没有超时任务");return ;}// 根据记录中的file_path,删除minio中的所有文件chunkTimeoutFiles.forEach(chunk -> {String filePath = chunk.getFilePath();mediaFileService.clearSingleChunkFile(filePath);});// 删除数据库中对应的记录mediaFileService.clearChunkFromDb(null, LocalDateTime.now());
    }
    

http://www.ppmy.cn/ops/37999.html

相关文章

[C++]哈希应用-布隆过滤器快速入门

布隆过滤器 布隆过滤器&#xff08;Bloom Filter&#xff09;是一个由布隆在1970年提出的概率型数据结构&#xff0c;它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器的主要特点是高效的插入和查询&#xff0c;可以用于检索一个元素是否在一个集合中。 原理…

射频无源器件之耦合器

一. 耦合器的作用 在射频电路中,射频耦合器将一路微波功率按比例分成几路,用于检测或监测信号,如功率测量和波检测,还可改变信号的幅度、相位等特性,以满足不同的通信需求。根据输入与耦合端的功率差,常被分为5dB、6dB、10dB等耦合器。射频耦合器的类型主要包括定向耦合…

uniapp引入vant组件库

在 UniApp 中引入 Vant 组件库的完整步骤通常如下&#xff1a; 安装 Vant&#xff1a; 首先&#xff0c;你需要通过 npm 或 yarn 安装 Vant。打开项目的根目录&#xff0c;然后在命令行中执行以下命令&#xff1a; 使用 npm&#xff1a; npm install vant 或者使用 yarn&…

JUC下的ScheduledThreadPoolExecutor详解

ScheduledThreadPoolExecutor是Java并发编程框架中一个强大且灵活的线程池实现&#xff0c;专为定时与周期性任务而设计。作为ThreadPoolExecutor的子类&#xff0c;它不仅继承了线程池管理的高效与灵活性&#xff0c;还内置了基于优先级队列的延迟任务调度机制&#xff0c;支持…

数据库(MySQL)—— 函数

数据库&#xff08;MySQL&#xff09;—— 函数 字符串函数数值函数日期函数流程函数 我们今天来看MySQL中为我们提供了哪些函数&#xff1a; MySQL中的函数主要分为以下四类&#xff1a; 字符串函数、数值函数、日期函数、流程函数。 字符串函数 函数功能CONCAT(S1, S2, ……

6.Nginx

Nginx反向代理 将前端发送的动态请求有Nginx转发到后端服务器 那为何要多一步转发而不直接发送到后端呢&#xff1f; 反向代理的好处&#xff1a; 提高访问速度&#xff08;可以在nginx做缓存&#xff0c;如果请求的是同样的接口地址&#xff0c;这样就不用多次请求后端&#…

LeetCode763:划分字母区间

题目描述 给你一个字符串 s 。我们要把这个字符串划分为尽可能多的片段&#xff0c;同一字母最多出现在一个片段中。 注意&#xff0c;划分结果需要满足&#xff1a;将所有划分结果按顺序连接&#xff0c;得到的字符串仍然是 s 。 返回一个表示每个字符串片段的长度的列表。 …

Unity初级---初识生命周期

1. Awake() &#xff1a;唤醒函数&#xff0c;最先执行的函数&#xff0c;只执行一次&#xff0c;当脚本文件挂载的对象被激活时调用 2. OnEnable() &#xff0c;OnDisable()&#xff1a;当脚本启用和禁用时触发&#xff0c;可执行多次&#xff0c;触发的前提是脚本挂载的对象…