文章目录
- 一:newScheduledThreadPool(周期性线程池)
- 1.1 特点
- 1.2 核心线程数
- 1.3 创建实例
- 1.4 常用方法
- 1.4.1 schedule方法
- 1.4.2 scheduleAtFixedRate方法
- 1.4.3 scheduleWithFixedDelay方法
- 二:多线程下载展示文件总大小、剩余时间、下载速率
- 2.1 文件总大小
- 2.2 累计下载大小
- 2.2.1 Volatile
- 2.2.2 LongAdder
- 2.3 前一次下载的大小
- 2.4 重写run方法
- 三:编写下载器
- 3.1 编写http工具类
- 3.2 核心下载功能
- 3.3 编写分片下载的线程DownloaderTask
- 3.4 合并文件
- 3.5 清理临时文件
- 3.6 编写main方法
一:newScheduledThreadPool(周期性线程池)
1.1 特点
延时启动 、定时启动 、可以自定义最大线程池数量
1.2 核心线程数
int cpuNubmer = Runtime.getRuntime().availableProcessors();
1.3 创建实例
public ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(cpuNubmer);
1.4 常用方法
1.4.1 schedule方法
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;/*** @description 测试周期性线程池ScheduledThread-schedule方法*/
public class ScheduledThreadTest {public static void main(String[] args) throws Exception {ScheduledExecutorService executorService = ScheduledThread.getInstance();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");// 延迟2秒执行下一个任务System.out.println("当前时间=" + sdf.format(new Date()));for (int i = 0; i < 9; i++) {int finalI = i;ScheduledFuture future = executorService.schedule(new Callable<String>() {@Overridepublic String call() throws Exception {int value = finalI;System.out.println("时间=" + sdf.format(new Date()) + ",线程=" + Thread.currentThread().getName() + ",任务=" + value);
// Thread.sleep(3);return "call";}}, 2, TimeUnit.SECONDS);System.out.println(future.get());}executorService.shutdown();}
}
1.4.2 scheduleAtFixedRate方法
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
- scheduleAtFixedRate :是以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕,如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。
- initialDelay:表示首次延迟2秒执行
- period :表示周期执行的时间为6秒,即表示会重复执行,重复执行的间隔时间为6秒
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** 测试周期性线程池ScheduledThread-scheduleAtFixedRate方法* initialDelay 表示首次延迟2秒执行* period 表示周期执行的时间为6秒,即表示会重复执行,重复执行的间隔时间为6秒* scheduleAtFixedRate ,是以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕,* 如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。*/
public class ScheduledThreadTest2 {public static void main(String[] args) throws Exception {ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");System.out.println("当前时间="+sdf.format(new Date()));// 延迟2秒执行下一个任务executorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {System.out.println("begin=" + sdf.format(new Date()));try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("end=" + sdf.format(new Date()));}}, 2, 4, TimeUnit.SECONDS);}
// 周期性执行
// 当执行时间小于period时,下次执行时间= 当前执行开始时间+period
// 当前时间=2021-09-16 05:21:59
// begin=2021-09-16 05:22:01
// end=2021-09-16 05:22:05
// begin=2021-09-16 05:22:07
// end=2021-09-16 05:22:11
// 当执行时间大于period时,下次执行时间 = 当前执行结束时间
// 当前时间=2021-09-16 05:23:13
// begin=2021-09-16 05:23:15
// end=2021-09-16 05:23:20
// begin=2021-09-16 05:23:20
// end=2021-09-16 05:23:25
}
1.4.3 scheduleWithFixedDelay方法
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** @description 测试周期性线程池ScheduledThread-scheduleWithFixedDelay方法* scheduleWithFixedDelay,是从上一个任务结束时开始计时,period时间过去后,再次执行下一次任务。*/
public class ScheduleWithFixedDelayTest {public static void main(String[] args) throws Exception {ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");System.out.println("当前时间="+sdf.format(new Date()));// 延迟2秒执行下一个任务executorService.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {System.out.println("begin=" + sdf.format(new Date()));try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("end=" + sdf.format(new Date()));}}, 2, 2, TimeUnit.SECONDS);}
// 周期性执行查询
// 执行结果如下:当前时间延迟2秒时间后开始执行线程,下次执行的时间= 当前执行结束时间+period
// 当前时间=2021-09-16 05:13:28
// begin=2021-09-16 05:13:31
// end=2021-09-16 05:13:36
// begin=2021-09-16 05:13:38
// end=2021-09-16 05:13:43
// begin=2021-09-16 05:13:45
// end=2021-09-16 05:13:50
}
二:多线程下载展示文件总大小、剩余时间、下载速率
2.1 文件总大小
/*** 文件总大小*/public long httpFileContentSize;
2.2 累计下载大小
public static volatile LongAdder downSize = new LongAdder();
2.2.1 Volatile
Volatile关键字的作用主要有如下两个:
1.线程的可见性:当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。
2. 顺序一致性:禁止指令重排序。
2.2.2 LongAdder
LongAdder可以是计数器的增强版,高并发下性能会更好,适合频繁的更新,但是不太频繁读取,汇总统计信息时使用分成多个操作单元,不同线程更新不同的单元,只有需要汇总的时候才计算所有单元的操作。其实就是将内存中操作的变量拆分出来,让它变成多个变量(这里和ConcurrentHashMap的原理就很相似了),然后让线程去竞争这些变量,将这些变量处理完后,然后在进行求和,这样降低了变量的并发度,减少了CAS失败次数。
2.3 前一次下载的大小
/*** 前一次下载的大小* currentDownSize - preDownSize就是一秒内下载了多少*/public double preDownSize;
2.4 重写run方法
package com.sysg.file.core;import com.sysg.file.constant.FileConstant;import java.util.concurrent.atomic.LongAdder;/*** 展示下载信息的线程*/
public class DownloadInfoThread implements Runnable {/*** 文件总大小*/public long httpFileContentSize;/*** 累计下载大小* Volatile关键字的作用主要有如下两个:* 1.线程的可见性:当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。* 2. 顺序一致性:禁止指令重排序。** LongAdder可以是计数器的增强版,高并发下性能会更好,适合频繁的更新,但是不太频繁读取,* 汇总统计信息时使用分成多个操作单元,不同线程更新不同的单元,只有需要汇总的时候才计算所有单元的操作。** 其实就是将内存中操作的变量拆分出来,让它变成多个变量( 这里和ConcurrentHashMap的原理就很相似了),* 然后让线程去竞争这些变量,将这些变量处理完后,然后在进行求和,这样降低了变量的并发度,减少了CAS失败次数*/public static volatile LongAdder downSize = new LongAdder();/*** 前一次下载的大小* currentDownSize - preDownSize就是一秒内下载了多少*/public double preDownSize;public DownloadInfoThread(long httpFileContentSize){this.httpFileContentSize = httpFileContentSize;}/*** 通过定时任务线程池,让其每一秒执行一次*/@Overridepublic void run() {//计算文件总大小,单位MBString fileSize = String.format("%.2f", httpFileContentSize/FileConstant.MB);//计算每秒下载速度,单位MBdouble speed = Double.parseDouble(String.format("%.2f", (downSize.doubleValue() - preDownSize)/FileConstant.MB));//计算后,将当前下载大小作为上一次的,currentDownSize在不断变化preDownSize = downSize.doubleValue();//剩余文件大小,文件总大小 - 本地已下载文件的大小 - 当前下载的大小double remainSize = httpFileContentSize - downSize.doubleValue();//估算剩余时间String remainTime = String.format("%.1f", (remainSize / FileConstant.MB / speed));//判断剩余时间是否为无限大if("Infinity".equals(remainTime)){remainTime = "-";}//计算已下载大小String alreadyDownSize = String.format("%.2f", downSize.doubleValue() / FileConstant.MB);String downInfo = String.format("已下载%sMB,文件总大小%sMB,下载速度%smb/s,剩余时间%ss",alreadyDownSize,fileSize,speed,remainTime);System.out.println("\r");System.out.println(downInfo);}
}
三:编写下载器
以下载QQ客户端的url为例:https://dldir1.qq.com/qqfile/qq/PCQQ9.7.9/QQ9.7.9.29059.exe
3.1 编写http工具类
package com.sysg.file.utils;import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;/*** http工具类*/
public class HttpUtil {/*** 得到下载文件长度** @param url url* @return long*/public static long getHttpFileContentLength(String url){HttpURLConnection httpURLConnection = null;try {httpURLConnection = getHttpURLConnection(url);return httpURLConnection.getContentLength();} catch (Exception e) {e.printStackTrace();return 0;} finally {if(httpURLConnection != null){httpURLConnection.disconnect();}}}/*** 分片下载* @param url 下载地址* @param startPos 下载起始位置* @param endPos 下载结束位置* @return*/public static HttpURLConnection getHttpURLConnection(String url,long startPos,long endPos){HttpURLConnection httpURLConnection = getHttpURLConnection(url);LogUtil.info("下载的区间是:{}-{}",startPos,endPos);if(httpURLConnection != null){if (endPos != 0){//bytes=100-200httpURLConnection.setRequestProperty("RANGE","BYTES="+ startPos + "-" + endPos);} else {//下载最后一部分时endPos会赋值为0,此时如果为0,就表示会下载最后一部分httpURLConnection.setRequestProperty("RANGE","BYTES="+ startPos + "-");}return httpURLConnection;}return null;}/*** 获取HttpURLConnection连接对象* @param url 文件的地址* @return*/public static HttpURLConnection getHttpURLConnection(String url){try {//建立连接URL httpUrl = new URL(url);HttpURLConnection urlConnection =(HttpURLConnection)httpUrl.openConnection();//向文件所在服务器发送标识信息,模拟浏览器urlConnection.setRequestProperty("User-Agent","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36");return urlConnection;} catch (IOException e) {e.printStackTrace();return null;}}/*** 获取下载文件的文件名* @param url* @return*/public static String getHttpFileName(String url){//获取最后一次“/”出现的下标int index = url.lastIndexOf("/");String fileName = url.substring(index + 1);if(!fileName.endsWith(".exe")){fileName = fileName + ".exe";}return fileName;}}
3.2 核心下载功能
/*** 下载器** @param url url*/public void downloader(String url){//获取下载的文件名String fileName = HttpUtil.getHttpFileName(url);File file = new File(FileConstant.DOWNLOAD_PATH);if(!file.exists()){file.mkdirs();}//拼接文件下载路径String downPath = FileConstant.DOWNLOAD_PATH + fileName;//获取本地文件大小long localFileLength = FileUtil.getFileContentLength(downPath);//获取连接对象HttpURLConnection httpURLConnection = null;//获取需要下载下载的文件大小int contentLength = 0;try {httpURLConnection = HttpUtil.getHttpURLConnection(url);contentLength = httpURLConnection.getContentLength();//判断文件是否已经下载过if(localFileLength >= contentLength){LogUtil.info("无须重新下载:{}",fileName);return;}//创建获取下载信息的任务对象DownloadInfoThread downloadInfoThread = new DownloadInfoThread(contentLength);/*** 将任务交给线程执行,每隔一秒执行一次* initialDelay:延迟一秒执行* period:每隔一秒执行一次*/scheduledExecutorService.scheduleAtFixedRate(downloadInfoThread,1,1, TimeUnit.SECONDS);//调用切分任务的方法splitFile(url);countDownLatch.await();//合并文件if(mergeFile(fileName)){clearTemp(fileName);}} catch (Exception e) {e.printStackTrace();} finally {System.out.println("\r");System.out.println("下载完成");if(httpURLConnection != null){//关掉连接对象httpURLConnection.disconnect();}//关闭线程scheduledExecutorService.shutdownNow();//关闭线程池fixedThreadPool.shutdown();}}
3.3 编写分片下载的线程DownloaderTask
package com.sysg.file.core;import com.sysg.file.constant.FileConstant;
import com.sysg.file.utils.HttpUtil;
import com.sysg.file.utils.LogUtil;
import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.util.concurrent.CountDownLatch;/*** 分块下载任务*/
public class DownloaderTask implements Runnable {/*** url*/private final String url;/*** 下载的起始位置*/private final long startPos;/*** 下载的结束位置*/private final long endPos;/*** 标识当前下载块*/private final int part;/*** 程序计数器*/private CountDownLatch countDownLatch;public DownloaderTask(String url, long startPos, long endPos, int part , CountDownLatch countDownLatch) {this.url = url;this.startPos = startPos;this.endPos = endPos;this.part = part;this.countDownLatch = countDownLatch;}@Overridepublic void run() {//获取文件名String httpFileName = HttpUtil.getHttpFileName(url);//下载路径 + 分块文件名httpFileName = FileConstant.DOWNLOAD_PATH + httpFileName + ".temp" + part;//获取分块下载的链接HttpURLConnection httpURLConnection = HttpUtil.getHttpURLConnection(url, startPos, endPos);try(InputStream inputStream = httpURLConnection.getInputStream();BufferedInputStream bis = new BufferedInputStream(inputStream);RandomAccessFile randomAccessFile = new RandomAccessFile(httpFileName,"rw");){byte[] buffer = new byte[FileConstant.BYTE_SIZE];int len = -1;//循环读取数据while((len = bis.read(buffer)) != -1){//一秒内下载数据之和,通过原子类进行操作DownloadInfoThread.downSize.add(len);randomAccessFile.write(buffer,0,len);}} catch (FileNotFoundException e) {LogUtil.error("下载文件不存在:{}",url);} catch (Exception e) {LogUtil.error("url出现异常,此url有问题,请换一个试试");} finally {countDownLatch.countDown();//关闭链接httpURLConnection.disconnect();}}
}
3.4 合并文件
/*** 合并文件** @param fileName 文件名称* @return boolean 文件合并是否成功*/public boolean mergeFile(String fileName){LogUtil.info("开始合并文件:{}",fileName);byte[] buffer = new byte[FileConstant.BYTE_SIZE];int len = -1;try (RandomAccessFile accessFile = new RandomAccessFile(FileConstant.DOWNLOAD_PATH + fileName, "rw")){for (int i = 0; i < FileConstant.THREAD_NUM; i++) {try (FileInputStream fileInputStream = new FileInputStream(FileConstant.DOWNLOAD_PATH + fileName + ".temp" + i);BufferedInputStream bis = new BufferedInputStream(fileInputStream);) {while ((len = bis.read(buffer)) != -1){accessFile.write(buffer,0,len);}} catch (FileNotFoundException e) {e.printStackTrace();}}LogUtil.info("结束合并文件:{}",fileName);return true;} catch (Exception e) {e.printStackTrace();return false;}}
3.5 清理临时文件
/*** 清理临时文件** @param fileName 文件名称*/public void clearTemp(String fileName){for (int i = 0; i < FileConstant.THREAD_NUM; i++) {//删除临时文件File file = new File(FileConstant.DOWNLOAD_PATH,fileName + ".temp" + i);if(file.exists()){file.delete();}}}
3.6 编写main方法
public static void main(String[] args) {String url = "https://dldir1.qq.com/qqfile/qq/PCQQ9.7.9/QQ9.7.9.29059.exe";new Downloader().downloader(url);}