基于Parallel.ForEach的数据并行使用
1.数据非并行
var items = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
DateTime t1 = DateTime.Now;
foreach (var item in items)
{Console.WriteLine("数据非并行输出:{0}", item);
}
2.数据并行,只要使用Parallel.ForEach
Parallel.ForEach(items, item => Console.WriteLine("数据并行输出:{0}", item));
3.非并行与并行耗时对比
var items = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
DateTime t1 = DateTime.Now;
foreach (var item in items)
{Console.WriteLine("数据非并行输出:{0}", item);
}
DateTime t2 = DateTime.Now;
TimeSpan t3 = t2 - t1;
Console.WriteLine("非数据并行输出耗费时间:毫秒:{0},微秒:{1}", t3.Milliseconds, t3.Microseconds);t1 = DateTime.Now;
Parallel.ForEach(items, item => Console.WriteLine("数据并行输出:{0}", item));
t2 = DateTime.Now;
t3 = t2 - t1;
Console.WriteLine("数据并行输出耗费时间:毫秒:{0},微秒:{1}", t3.Milliseconds, t3.Microseconds);
基于Parallel.For的数据并行使用
1. Parallel.For返回一个ParallelLoopResult结构
public struct ParallelLoopResult
{internal bool _completed;internal long? _lowestBreakIteration;public bool IsCompleted => _completed;public long? LowestBreakIteration => _lowestBreakIteration;
}
下面的result类型为 ParallelLoopResult结构
var result = Parallel.For(1, 101, (i, state) => {
});
完整示例:
//Parallel.For返回一个ParallelLoopResult结构
var result = Parallel.For(1, 101, (i, state) => {int delay;lock (rnd);delay = rnd.Next(1, 1001);//随机生成1到1001之间随机数Thread.Sleep(delay);//随机休眠线程//循环调用了退出方法Breakif (state.ShouldExitCurrentIteration){if (state.LowestBreakIteration < i){Console.WriteLine("循环调用了退出方法Break()");return;}}if (i == breakIndex){Console.WriteLine($"随机数与索引相同,将退出循环,索引: {i}");state.Break();//退出循环}Console.WriteLine($"完成循环遍历,当前循环索引: {i}");});if (result.LowestBreakIteration.HasValue)Console.WriteLine($"\nLowest Break Iteration: {result.LowestBreakIteration}");
elseConsole.WriteLine($"\nNo lowest break iteration.");
基于Parallel.ForEach的数据并行操作数据并使用局部变量
//初始化数组
int[] input = { 4, 1, 6, 2, 9, 5, 10, 3 };
int sum = 0;try
{Parallel.ForEach(input, //要并行遍历的集合() => 0, //线程本地初始化//n当前遍历元素,loopState:并行状态对象,localSum局部变量(n, loopState, localSum) => //匿名函数,localSum会初始化为0{localSum += n;//累加元素//输出当前线程ID,元素值,与累加后的元素值Console.WriteLine("Thread={0}, n={1}, localSum={2}", Thread.CurrentThread.ManagedThreadId, n, localSum);//返回累加后的值return localSum;},//任务动作,传入线程本地变量localSum(localSum) => Interlocked.Add(ref sum, localSum) //多线程元子性操作共享变量);//输出并行操作后的结果Console.WriteLine("\nSum={0}", sum);
}
catch (AggregateException e) //捕获线程操作异常
{Console.WriteLine("并行操作数据异常\n{0}", e.Message.ToString());
}
基于Parallel.Invoke的任务并行操作
//静态函数实现,供并行任务使用
static void BasicAction()
{Console.WriteLine("静态方法BasicAction, Thread={0}", Thread.CurrentThread.ManagedThreadId);
}
try
{Parallel.Invoke(BasicAction, // 任务1 - 静态方法() => // 任务2 - lambda表达式{Console.WriteLine($"lambda表达式, Thread:{Thread.CurrentThread.ManagedThreadId}");},delegate () // 任务3 - 肉联委托{Console.WriteLine($"肉联委托, Thread:{Thread.CurrentThread.ManagedThreadId}");});
}
catch (AggregateException e) //捕获任务并行异常
{Console.WriteLine($"抛出异常:{e.InnerException.ToString()}");
}
基于Parallel.ForEachAsync的异常并行操作,异步方法要添加await关键字
await Parallel.ForEachAsync(Enumerable.Range(1, 100),async (_, _) => {await Task.Delay(1000);});
在任务中使用数据并行:
//创建任务线程
Task _t = new Task(() => {Console.WriteLine($"这个是任务子线程");Parallel.ForEach(items, (item, state) =>{Console.WriteLine($"{item},{state}");});
});
Console.WriteLine($"任务线程:{_t.Id}");
_t.Start();//开始任务
任务的运行及等待
Thread.CurrentThread.Name = "主线程";
//创建任务并运行
Task taskA = Task.Run(() => {Thread.CurrentThread.Name = "任务子线程";Console.WriteLine($"当前线程名:-> '{Thread.CurrentThread.Name}',线程ID:{Thread.CurrentThread.ManagedThreadId}");Console.WriteLine("这个是在任务中输出的信息"); //使用匿名函数});
Console.WriteLine($"当前线程名: '{Thread.CurrentThread.Name}',线程ID:{Thread.CurrentThread.ManagedThreadId}");
taskA.Wait();//等待任务完成
任务分离:
//分离任务
var outer = Task.Factory.StartNew(() =>
{Console.WriteLine("任务开始...");var child = Task.Factory.StartNew(() =>{Thread.SpinWait(5000000);Console.WriteLine("任务分离成功.");});
});
outer.Wait();
Console.WriteLine("任务结束.");
任务阻塞:
//阻塞任务
Task[] tasks = new Task[3]
{Task.Factory.StartNew(() => Console.WriteLine("任务1.")),Task.Factory.StartNew(() => Console.WriteLine("任务2.")),Task.Factory.StartNew(() => Console.WriteLine("任务3."))
};
Task.WaitAll(tasks);//阻塞直接所有任务完成
多任务使用:
//多任务使用
Task[] taskArray = new Task[10];
for (int i = 0; i < taskArray.Length; i++)
{//使用任务工厂启动任务taskArray[i] = Task.Factory.StartNew((Object obj) =>{CustomData data = obj as CustomData;//实例化类对象if (data == null) return;data.ThreadNum = Thread.CurrentThread.ManagedThreadId;//赋值当前线程ID},new CustomData() { Name = i, CreationTime = DateTime.Now.Ticks });
};
Task.WaitAll(taskArray);
//遍历任务
foreach (var task in taskArray)
{var data = task.AsyncState as CustomData;if (data != null)Console.WriteLine("任务 #{0} 已创建于 {1}, 在线程 #{2}.",data.Name, data.CreationTime, data.ThreadNum);
}class CustomData
{public long CreationTime;public int Name;public int ThreadNum;
}
完整Demo:
// See https://aka.ms/new-console-template for more informationusing System;
using System.Diagnostics;var items = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
DateTime t1 = DateTime.Now;
foreach (var item in items)
{Console.WriteLine("数据非并行输出:{0}", item);
}
DateTime t2 = DateTime.Now;
TimeSpan t3 = t2 - t1;
Console.WriteLine("非数据并行输出耗费时间:毫秒:{0},微秒:{1}", t3.Milliseconds, t3.Microseconds);t1 = DateTime.Now;
Parallel.ForEach(items, item => Console.WriteLine("数据并行输出:{0}", item));
t2 = DateTime.Now;
t3 = t2 - t1;
Console.WriteLine("数据并行输出耗费时间:毫秒:{0},微秒:{1}", t3.Milliseconds, t3.Microseconds);var rnd = new Random();
Console.WriteLine("开始遍历...");
int breakIndex = rnd.Next(1, 11);
Console.WriteLine($"Will call Break at iteration {breakIndex}\n");//Parallel.For返回一个ParallelLoopResult结构
var result = Parallel.For(1, 101, (i, state) => {int delay;lock (rnd);delay = rnd.Next(1, 1001);//随机生成1到1001之间随机数Thread.Sleep(delay);//随机休眠线程//循环调用了退出方法Breakif (state.ShouldExitCurrentIteration){if (state.LowestBreakIteration < i){Console.WriteLine("循环调用了退出方法Break()");return;}}if (i == breakIndex){Console.WriteLine($"随机数与索引相同,将退出循环,索引: {i}");state.Break();//退出循环}Console.WriteLine($"完成循环遍历,当前循环索引: {i}");});if (result.LowestBreakIteration.HasValue)Console.WriteLine($"\nLowest Break Iteration: {result.LowestBreakIteration}");
elseConsole.WriteLine($"\nNo lowest break iteration.");//初始化数组
int[] input = { 4, 1, 6, 2, 9, 5, 10, 3 };
int sum = 0;try
{Parallel.ForEach(input, //要并行遍历的集合() => 0, //线程本地初始化//n当前遍历元素,loopState:并行状态对象,localSum局部变量(n, loopState, localSum) => //匿名函数,localSum会初始化为0{localSum += n;//累加元素//输出当前线程ID,元素值,与累加后的元素值Console.WriteLine("Thread={0}, n={1}, localSum={2}", Thread.CurrentThread.ManagedThreadId, n, localSum);//返回累加后的值return localSum;},//任务动作,传入线程本地变量localSum(localSum) => Interlocked.Add(ref sum, localSum) //多线程元子性操作共享变量);//输出并行操作后的结果Console.WriteLine("\nSum={0}", sum);
}
catch (AggregateException e) //捕获线程操作异常
{Console.WriteLine("并行操作数据异常\n{0}", e.Message.ToString());
}//静态函数实现,供并行任务使用
static void BasicAction()
{Console.WriteLine("静态方法BasicAction, Thread={0}", Thread.CurrentThread.ManagedThreadId);
}
try
{Parallel.Invoke(BasicAction, // 任务1 - 静态方法() => // 任务2 - lambda表达式{Console.WriteLine($"lambda表达式, Thread:{Thread.CurrentThread.ManagedThreadId}");},delegate () // 任务3 - 肉联委托{Console.WriteLine($"肉联委托, Thread:{Thread.CurrentThread.ManagedThreadId}");});
}
catch (AggregateException e) //捕获任务并行异常
{Console.WriteLine($"抛出异常:{e.InnerException.ToString()}");
}var watch = Stopwatch.StartNew();
Console.WriteLine(watch.ElapsedMilliseconds);
Console.WriteLine($"当前机器的CPU数量:{Environment.ProcessorCount}");
watch.Restart();//使用异步并行方法
await Parallel.ForEachAsync(Enumerable.Range(1, 100),async (_, _) => {await Task.Delay(1000);});
watch.Stop();
Console.WriteLine($"花费时间:{watch.ElapsedMilliseconds}");
watch.Restart();Thread.CurrentThread.Name = "主线程";
//创建任务线程
Task _t = new Task(() => {Console.WriteLine($"这个是任务子线程");Parallel.ForEach(items, (item, state) =>{Console.WriteLine($"{item},{state}");});
});
Console.WriteLine($"任务线程:{_t.Id}");
_t.Start();//开始任务Thread.CurrentThread.Name = "主线程";
//创建任务并运行
Task taskA = Task.Run(() => {Thread.CurrentThread.Name = "任务子线程";Console.WriteLine($"当前线程名:-> '{Thread.CurrentThread.Name}',线程ID:{Thread.CurrentThread.ManagedThreadId}");Console.WriteLine("这个是在任务中输出的信息"); //使用匿名函数});
Console.WriteLine($"当前线程名: '{Thread.CurrentThread.Name}',线程ID:{Thread.CurrentThread.ManagedThreadId}");
taskA.Wait();//等待任务完成//分离任务
var outer = Task.Factory.StartNew(() =>
{Console.WriteLine("任务开始...");var child = Task.Factory.StartNew(() =>{Thread.SpinWait(5000000);Console.WriteLine("任务分离成功.");});
});
outer.Wait();
Console.WriteLine("任务结束.");//阻塞任务
Task[] tasks = new Task[3]
{Task.Factory.StartNew(() => Console.WriteLine("任务1.")),Task.Factory.StartNew(() => Console.WriteLine("任务2.")),Task.Factory.StartNew(() => Console.WriteLine("任务3."))
};
Task.WaitAll(tasks);//阻塞直接所有任务完成//多任务使用
Task[] taskArray = new Task[10];
for (int i = 0; i < taskArray.Length; i++)
{//使用任务工厂启动任务taskArray[i] = Task.Factory.StartNew((Object obj) =>{CustomData data = obj as CustomData;//实例化类对象if (data == null) return;data.ThreadNum = Thread.CurrentThread.ManagedThreadId;//赋值当前线程ID},new CustomData() { Name = i, CreationTime = DateTime.Now.Ticks });
};
Task.WaitAll(taskArray);
//遍历任务
foreach (var task in taskArray)
{var data = task.AsyncState as CustomData;if (data != null)Console.WriteLine("任务 #{0} 已创建于 {1}, 在线程 #{2}.",data.Name, data.CreationTime, data.ThreadNum);
}class CustomData
{public long CreationTime;public int Name;public int ThreadNum;
}