C#异步编程之数据并行及任务并行

news/2024/11/16 15:28:42/

基于Parallel.ForEach的数据并行使用

1.数据非并行

var items = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
DateTime t1 = DateTime.Now;
foreach (var item in items)
{Console.WriteLine("数据非并行输出:{0}", item);
}

2.数据并行,只要使用Parallel.ForEach

Parallel.ForEach(items, item => Console.WriteLine("数据并行输出:{0}", item));

3.非并行与并行耗时对比

var items = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
DateTime t1 = DateTime.Now;
foreach (var item in items)
{Console.WriteLine("数据非并行输出:{0}", item);
}
DateTime t2 = DateTime.Now;
TimeSpan t3 = t2 - t1;
Console.WriteLine("非数据并行输出耗费时间:毫秒:{0},微秒:{1}", t3.Milliseconds, t3.Microseconds);t1 = DateTime.Now;
Parallel.ForEach(items, item => Console.WriteLine("数据并行输出:{0}", item));
t2 = DateTime.Now;
t3 = t2 - t1;
Console.WriteLine("数据并行输出耗费时间:毫秒:{0},微秒:{1}", t3.Milliseconds, t3.Microseconds);

 基于Parallel.For的数据并行使用

1. Parallel.For返回一个ParallelLoopResult结构

public struct ParallelLoopResult
{internal bool _completed;internal long? _lowestBreakIteration;public bool IsCompleted => _completed;public long? LowestBreakIteration => _lowestBreakIteration;
}

下面的result类型为 ParallelLoopResult结构

var result = Parallel.For(1, 101, (i, state) => {
});

 完整示例:

//Parallel.For返回一个ParallelLoopResult结构
var result = Parallel.For(1, 101, (i, state) => {int delay;lock (rnd);delay = rnd.Next(1, 1001);//随机生成1到1001之间随机数Thread.Sleep(delay);//随机休眠线程//循环调用了退出方法Breakif (state.ShouldExitCurrentIteration){if (state.LowestBreakIteration < i){Console.WriteLine("循环调用了退出方法Break()");return;}}if (i == breakIndex){Console.WriteLine($"随机数与索引相同,将退出循环,索引: {i}");state.Break();//退出循环}Console.WriteLine($"完成循环遍历,当前循环索引: {i}");});if (result.LowestBreakIteration.HasValue)Console.WriteLine($"\nLowest Break Iteration: {result.LowestBreakIteration}");
elseConsole.WriteLine($"\nNo lowest break iteration.");

基于Parallel.ForEach的数据并行操作数据并使用局部变量

//初始化数组
int[] input = { 4, 1, 6, 2, 9, 5, 10, 3 };
int sum = 0;try
{Parallel.ForEach(input,                          //要并行遍历的集合() => 0,                        //线程本地初始化//n当前遍历元素,loopState:并行状态对象,localSum局部变量(n, loopState, localSum) =>     //匿名函数,localSum会初始化为0{localSum += n;//累加元素//输出当前线程ID,元素值,与累加后的元素值Console.WriteLine("Thread={0}, n={1}, localSum={2}", Thread.CurrentThread.ManagedThreadId, n, localSum);//返回累加后的值return localSum;},//任务动作,传入线程本地变量localSum(localSum) => Interlocked.Add(ref sum, localSum) //多线程元子性操作共享变量);//输出并行操作后的结果Console.WriteLine("\nSum={0}", sum);
}
catch (AggregateException e) //捕获线程操作异常
{Console.WriteLine("并行操作数据异常\n{0}", e.Message.ToString());
}

基于Parallel.Invoke的任务并行操作

//静态函数实现,供并行任务使用
static void BasicAction()
{Console.WriteLine("静态方法BasicAction, Thread={0}", Thread.CurrentThread.ManagedThreadId);
}
try
{Parallel.Invoke(BasicAction,    // 任务1 - 静态方法() =>           // 任务2 - lambda表达式{Console.WriteLine($"lambda表达式, Thread:{Thread.CurrentThread.ManagedThreadId}");},delegate ()     // 任务3 - 肉联委托{Console.WriteLine($"肉联委托, Thread:{Thread.CurrentThread.ManagedThreadId}");});
}
catch (AggregateException e) //捕获任务并行异常
{Console.WriteLine($"抛出异常:{e.InnerException.ToString()}");
}

基于Parallel.ForEachAsync的异常并行操作,异步方法要添加await关键字

await Parallel.ForEachAsync(Enumerable.Range(1, 100),async (_, _) => {await Task.Delay(1000);});

在任务中使用数据并行:

//创建任务线程
Task _t = new Task(() => {Console.WriteLine($"这个是任务子线程");Parallel.ForEach(items, (item, state) =>{Console.WriteLine($"{item},{state}");});
});
Console.WriteLine($"任务线程:{_t.Id}");
_t.Start();//开始任务

任务的运行及等待

Thread.CurrentThread.Name = "主线程";
//创建任务并运行
Task taskA = Task.Run(() => {Thread.CurrentThread.Name = "任务子线程";Console.WriteLine($"当前线程名:-> '{Thread.CurrentThread.Name}',线程ID:{Thread.CurrentThread.ManagedThreadId}");Console.WriteLine("这个是在任务中输出的信息"); //使用匿名函数});
Console.WriteLine($"当前线程名: '{Thread.CurrentThread.Name}',线程ID:{Thread.CurrentThread.ManagedThreadId}");
taskA.Wait();//等待任务完成

任务分离:

//分离任务
var outer = Task.Factory.StartNew(() =>
{Console.WriteLine("任务开始...");var child = Task.Factory.StartNew(() =>{Thread.SpinWait(5000000);Console.WriteLine("任务分离成功.");});
});
outer.Wait();
Console.WriteLine("任务结束.");

任务阻塞:

//阻塞任务
Task[] tasks = new Task[3]
{Task.Factory.StartNew(() => Console.WriteLine("任务1.")),Task.Factory.StartNew(() => Console.WriteLine("任务2.")),Task.Factory.StartNew(() => Console.WriteLine("任务3."))
};
Task.WaitAll(tasks);//阻塞直接所有任务完成

多任务使用:

//多任务使用
Task[] taskArray = new Task[10];
for (int i = 0; i < taskArray.Length; i++)
{//使用任务工厂启动任务taskArray[i] = Task.Factory.StartNew((Object obj) =>{CustomData data = obj as CustomData;//实例化类对象if (data == null) return;data.ThreadNum = Thread.CurrentThread.ManagedThreadId;//赋值当前线程ID},new CustomData() { Name = i, CreationTime = DateTime.Now.Ticks });
};
Task.WaitAll(taskArray);
//遍历任务
foreach (var task in taskArray)
{var data = task.AsyncState as CustomData;if (data != null)Console.WriteLine("任务 #{0} 已创建于 {1}, 在线程 #{2}.",data.Name, data.CreationTime, data.ThreadNum);
}class CustomData
{public long CreationTime;public int Name;public int ThreadNum;
}

完整Demo:

// See https://aka.ms/new-console-template for more informationusing System;
using System.Diagnostics;var items = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
DateTime t1 = DateTime.Now;
foreach (var item in items)
{Console.WriteLine("数据非并行输出:{0}", item);
}
DateTime t2 = DateTime.Now;
TimeSpan t3 = t2 - t1;
Console.WriteLine("非数据并行输出耗费时间:毫秒:{0},微秒:{1}", t3.Milliseconds, t3.Microseconds);t1 = DateTime.Now;
Parallel.ForEach(items, item => Console.WriteLine("数据并行输出:{0}", item));
t2 = DateTime.Now;
t3 = t2 - t1;
Console.WriteLine("数据并行输出耗费时间:毫秒:{0},微秒:{1}", t3.Milliseconds, t3.Microseconds);var rnd = new Random();
Console.WriteLine("开始遍历...");
int breakIndex = rnd.Next(1, 11);
Console.WriteLine($"Will call Break at iteration {breakIndex}\n");//Parallel.For返回一个ParallelLoopResult结构
var result = Parallel.For(1, 101, (i, state) => {int delay;lock (rnd);delay = rnd.Next(1, 1001);//随机生成1到1001之间随机数Thread.Sleep(delay);//随机休眠线程//循环调用了退出方法Breakif (state.ShouldExitCurrentIteration){if (state.LowestBreakIteration < i){Console.WriteLine("循环调用了退出方法Break()");return;}}if (i == breakIndex){Console.WriteLine($"随机数与索引相同,将退出循环,索引: {i}");state.Break();//退出循环}Console.WriteLine($"完成循环遍历,当前循环索引: {i}");});if (result.LowestBreakIteration.HasValue)Console.WriteLine($"\nLowest Break Iteration: {result.LowestBreakIteration}");
elseConsole.WriteLine($"\nNo lowest break iteration.");//初始化数组
int[] input = { 4, 1, 6, 2, 9, 5, 10, 3 };
int sum = 0;try
{Parallel.ForEach(input,                          //要并行遍历的集合() => 0,                        //线程本地初始化//n当前遍历元素,loopState:并行状态对象,localSum局部变量(n, loopState, localSum) =>     //匿名函数,localSum会初始化为0{localSum += n;//累加元素//输出当前线程ID,元素值,与累加后的元素值Console.WriteLine("Thread={0}, n={1}, localSum={2}", Thread.CurrentThread.ManagedThreadId, n, localSum);//返回累加后的值return localSum;},//任务动作,传入线程本地变量localSum(localSum) => Interlocked.Add(ref sum, localSum) //多线程元子性操作共享变量);//输出并行操作后的结果Console.WriteLine("\nSum={0}", sum);
}
catch (AggregateException e) //捕获线程操作异常
{Console.WriteLine("并行操作数据异常\n{0}", e.Message.ToString());
}//静态函数实现,供并行任务使用
static void BasicAction()
{Console.WriteLine("静态方法BasicAction, Thread={0}", Thread.CurrentThread.ManagedThreadId);
}
try
{Parallel.Invoke(BasicAction,    // 任务1 - 静态方法() =>           // 任务2 - lambda表达式{Console.WriteLine($"lambda表达式, Thread:{Thread.CurrentThread.ManagedThreadId}");},delegate ()     // 任务3 - 肉联委托{Console.WriteLine($"肉联委托, Thread:{Thread.CurrentThread.ManagedThreadId}");});
}
catch (AggregateException e) //捕获任务并行异常
{Console.WriteLine($"抛出异常:{e.InnerException.ToString()}");
}var watch = Stopwatch.StartNew();
Console.WriteLine(watch.ElapsedMilliseconds);
Console.WriteLine($"当前机器的CPU数量:{Environment.ProcessorCount}");
watch.Restart();//使用异步并行方法
await Parallel.ForEachAsync(Enumerable.Range(1, 100),async (_, _) => {await Task.Delay(1000);});
watch.Stop();
Console.WriteLine($"花费时间:{watch.ElapsedMilliseconds}");
watch.Restart();Thread.CurrentThread.Name = "主线程";
//创建任务线程
Task _t = new Task(() => {Console.WriteLine($"这个是任务子线程");Parallel.ForEach(items, (item, state) =>{Console.WriteLine($"{item},{state}");});
});
Console.WriteLine($"任务线程:{_t.Id}");
_t.Start();//开始任务Thread.CurrentThread.Name = "主线程";
//创建任务并运行
Task taskA = Task.Run(() => {Thread.CurrentThread.Name = "任务子线程";Console.WriteLine($"当前线程名:-> '{Thread.CurrentThread.Name}',线程ID:{Thread.CurrentThread.ManagedThreadId}");Console.WriteLine("这个是在任务中输出的信息"); //使用匿名函数});
Console.WriteLine($"当前线程名: '{Thread.CurrentThread.Name}',线程ID:{Thread.CurrentThread.ManagedThreadId}");
taskA.Wait();//等待任务完成//分离任务
var outer = Task.Factory.StartNew(() =>
{Console.WriteLine("任务开始...");var child = Task.Factory.StartNew(() =>{Thread.SpinWait(5000000);Console.WriteLine("任务分离成功.");});
});
outer.Wait();
Console.WriteLine("任务结束.");//阻塞任务
Task[] tasks = new Task[3]
{Task.Factory.StartNew(() => Console.WriteLine("任务1.")),Task.Factory.StartNew(() => Console.WriteLine("任务2.")),Task.Factory.StartNew(() => Console.WriteLine("任务3."))
};
Task.WaitAll(tasks);//阻塞直接所有任务完成//多任务使用
Task[] taskArray = new Task[10];
for (int i = 0; i < taskArray.Length; i++)
{//使用任务工厂启动任务taskArray[i] = Task.Factory.StartNew((Object obj) =>{CustomData data = obj as CustomData;//实例化类对象if (data == null) return;data.ThreadNum = Thread.CurrentThread.ManagedThreadId;//赋值当前线程ID},new CustomData() { Name = i, CreationTime = DateTime.Now.Ticks });
};
Task.WaitAll(taskArray);
//遍历任务
foreach (var task in taskArray)
{var data = task.AsyncState as CustomData;if (data != null)Console.WriteLine("任务 #{0} 已创建于 {1}, 在线程 #{2}.",data.Name, data.CreationTime, data.ThreadNum);
}class CustomData
{public long CreationTime;public int Name;public int ThreadNum;
}


http://www.ppmy.cn/news/84916.html

相关文章

自学网络安全遇到问题怎么解决?路线是什么

自学网络安全很容易学着学着就迷茫了&#xff0c;找到源头问题&#xff0c;解决它就可以了&#xff0c;所以首先咱们聊聊&#xff0c;学习网络安全方向通常会有哪些问题&#xff0c;看到后面有惊喜哦 1、打基础时间太长 学基础花费很长时间&#xff0c;光语言都有几门&#xf…

调优圣经:零基础精通Jmeter分布式压测,10Wqps+超高并发

高并发压测的场景 在40岁老架构师尼恩的读者社群&#xff08;50&#xff09;中&#xff0c;很多小伙伴拿不到offer&#xff0c;或者拿不到好的offer。 尼恩经常给大家 优化项目&#xff0c;优化简历&#xff0c;挖掘技术亮点。 在指导简历的过程中&#xff0c; Java 调优是一…

【C++/嵌入式笔试面试八股】一、23.结构体指针 | 指针和引用 | 万能指针 | 野指针

结构体指针 28.将结构体作为参数向函数中传递 传递方式有两种: 值传递地址传递,利用操作符 -> 可以通过结构体指针访问结构体属性//学生结构体定义 struct student {//成员列表string name; //姓名int age; //年龄int score; //分数 };//值传递

Golang如何在VS Code中配置和调试Gin

学习目标: 了解如何在VS Code中配置Golang的Gin框架学习安装Go扩展和Delve调试器通过一个简单的例子学习如何Gin框架学习内容: 1. Gin 框架的介绍: Gin是一个使用Go语言编写的Web框架。它通过提供一组简单的API来帮助开发人员构建高性能、可扩展的Web应用程序。 以下是Gi…

Linux防火墙----firewalld

文章目录 一、firewalld概述二、firewalld 与 iptables 的区别三、firewalld 区域的概念四、firewalld数据处理流程五、firewalld防火墙的配置方法5.1 使用firewall-config 图形工具5.2 编写/etc/firewalld/中的配置文件5.3使用firewall-cmd 命令行工具 一、firewalld概述 fir…

PyG的Planetoid无法直接下载Cora等数据集的解决方法

问题描述&#xff1a; 在使用PyG的时候&#xff0c;通常会涉及到一些公共数据集的下载&#xff0c;由于网络问题&#xff0c;导致无法下载出现以下问题&#xff1a; 尝试了很多的方法都没有成功&#xff08;主要是个人比较菜&#xff01;&#xff09;。但是皇天不负有心人&am…

Java反射机制深入详解

一.概念 反射就是把Java的各种成分映射成相应的Java类。 Class类的构造方法是private&#xff0c;由JVM创建。 反射是java语言的一个特性&#xff0c;它允程序在运行时&#xff08;注意不是编译的时候&#xff09;来进行自我检查并且对内部的成员进行操作。例如它允许一个ja…

Java基础(45)字符流的使用

字符输入流 Reader 类是所有字符流输入类的父类&#xff0c;该类定义了许多方法&#xff0c;这些方法对所有子类都是有效的。 Reader 类的常用子类如下。 CharArrayReader 类&#xff1a;将字符数组转换为字符输入流&#xff0c;从中读取字符。StringReader 类&#xff1a;将字…