在一些常见的编程情形中,使用任务也许能提升性能。为了简化变成,静态类System.Threading.Tasks.Parallel封装了这些常见的情形,它内部使用Task对象。
Parallel.For & Parallel.Foreach & Pararllel.Invoke
Parallel.For(0, 1000, (i) => { //i是从0开始一直到1000结束 }); var lst = new List(); Parallel.ForEach(lst, (s) => { //do something }); //Invoke Parallel.Invoke( () => { }, () => { }, () => { } );
三个不同的方式做并行操作,Invoke是自己定义的并行,像上面的代码,只会开三个线程去做,而For和Foreach和普通的没什么区别只是并行去做了.
这里需要注意的是:这个三个方法都会堵塞当前线程,要等待所有线程都做完了以后才能往下执行,这是有用的,当进行数据分析的时候,每条数据都是独立的,这个时候等待所有线程做完是有意义的,而如果需要不堵塞线程可以用TASK来包一层,具体请参照。在并发的时候如果有某些线程出现了异常,这个时候不会中断线程,会在所有线程结束的时候抛出AggregateException(并不是所有异常都能用这个捕获),我们通过这个异常可以知道所有的异常:
Parallel.ForEach(new List{ "aa", "bb", "cc", "dd", "ee", }, (b) => { Thread.Sleep(1000); throw new Exception(b); }); } catch (AggregateException ex) { foreach (var exception in ex.Flatten().InnerExceptions) { Console.WriteLine(exception.Message); } }
ParallelOptions
看一下官方给出的解释:
public class ParallelOptions { public ParallelOptions(); //允许取消操作 public CancellationToken CancellationToken { get; set; } //允许指定可以并发操作的最大工作项目数 public int MaxDegreeOfParallelism { get; set; } //允许指定要使用哪个TaskScheduler public TaskScheduler TaskScheduler { get; set; } }
这个类提供我们最大并发数量的设置,取消操作的Token赋值,以及任务调度器的设置(关于这个,需要另拉一章来说,此处只关注前面两个)
//定义线程取消的一个对象 var cancel = new CancellationTokenSource(); var po = new ParallelOptions() { CancellationToken = cancel.Token, MaxDegreeOfParallelism = 2, }; //为了不堵塞线程,这里开启一个线程 Task.Run(() => { try { Parallel.For(0, 1000, po, (i) => { Thread.Sleep(1000); po.CancellationToken.ThrowIfCancellationRequested(); Console.WriteLine(i); }); } catch (AggregateException ex) { foreach (var e in ex.Flatten().InnerExceptions) { Console.WriteLine(e.Message); } } catch (Exception ex) { //OperationCanceledException Console.WriteLine(ex.GetType().Name); } //不设置取消的TOKEN }, CancellationToken.None); Thread.Sleep(10 * 1000); cancel.Cancel();
线程最大并行数位2个,如果取消的话,整个Parallel会全部取消,并且抛出OperationCanceledException异常。
Parallel.For & Parallel.Foreach的重载
我只挑选了一个重载来说
public static ParallelLoopResult For(int fromInclusive, int toExclusive, Func localInit, Func body, Action localFinally);
来看一下官方的解释:
localInit:任务局部初始化,为参与工作的每一个任务都调用一次该委托这个委托在任务被要求处理一个工作项之前调用的。
boy:为参与工作的各个线程锁处理的每一项都调用一次该委托
localFinally:任务局部终结委托,为参与工作的每一个任务都调用一次该委托,这个委托是在任务处理好派给它的所有工作项之后调用的。即使主体委托代码引发一个未处理的异常,也会调用它。
public static void Parallel_For_Local_Test(){ int[] nums = Enumerable.Range(0, 1000000).ToArray (); long total = 0; ParallelLoopResult result = Parallel.For(0, nums.Length, () => { return 0; }, (j, loop, subtotal) => { // 延长任务时间,更方便观察下面得出的结论 Thread.SpinWait(200); Console.WriteLine("当前线程ID为:{0},j为{1},subtotal为:{2}。" , Thread.CurrentThread.ManagedThreadId, j.ToString(), subtotal.ToString()); if (j == 23) loop.Break(); if (j > loop.LowestBreakIteration) { Thread.Sleep(4000); Console.WriteLine("j为{0},等待4s种,用于判断已开启且大于阻断迭代是否会运行完。", j.ToString()); } Console.WriteLine("j为{0},LowestBreakIteration为:{1}", j.ToString(), loop.LowestBreakIteration); subtotal += nums[j]; return subtotal; }, (finalResult) => Interlocked.Add(ref total, finalResult) ); Console.WriteLine("total值为:{0}", total.ToString()); if (result.IsCompleted) Console.WriteLine("循环执行完毕"); else Console.WriteLine("{0}" , result.LowestBreakIteration.HasValue ? "调用了Break()阻断循环." : "调用了Stop()终止循环.");}
看一下输出
分析一下:
a) 泛型类型参数TLocal为本地线程数据类型,本示例设置为long。
b) 三个委托的参数解析body(j, loop, subtotal):首先初始委托localInit中返回了0,所以body委托中参数subtotal的初始值即为0,body委托的参数j对应的是当前迭代索引,参数loop为当前迭代状态ParallelLoopState对象;localFinally委托参数为body委托的返回值。
c) 三个委托三个阶段中都可能并行运行,因此您必须同步对任何共享变量的访问,如示例中在finally委托中使用了System.Threading.Interlocked对象。
d) 在索引为23的迭代中调用Break()后:
i. 索引小于23的所有迭代仍会运行(即使还未开始处理),并在退出循环之前处理完。
ii. 索引大于 23 的迭代若还未开启则会被放弃;若已处于运行中则会在退出循环之前处理完。
e) 对于调用Break()之后,在任何循环迭代中访问LowestBreakIteration属性都会返回调用Break()的迭代对应的索引。
注:
ParallelLoopState
可用来使 Tasks.Parallel 循环的迭代与其他迭代交互,并为 Parallel 类的循环提供提前退出循环的功能。此类的实例不要自行创建,它由 Parallel 类创建并提供给每个循环项,并且只应该在提供此实例的“循环内部”使用。
public class ParallelLoopState{ // 获取循环的任何迭代是否已引发相应迭代未处理的异常。 public bool IsExceptional { get; } // 获取循环的任何迭代是否已调用 ParallelLoopState.Stop()。 public bool IsStopped { get; } // 获取在Parallel循环中调用 ParallelLoopState.Break() 的最低循环迭代。 public long? LowestBreakIteration { get; } // 获取循环的当前迭代是否应基于此迭代或其他迭代发出的请求退出。 public bool ShouldExitCurrentIteration { get; } //通知Parallel循环当前迭代”之后”的其他迭代不需要运行。 public void Break(); //通知Parallel循环当前迭代“之外”的所有其他迭代不需要运行。 public void Stop();}
Break()
Break()用于通知Parallel循环当前迭代“之后”的其他迭代不需要运行。例如,对于从 0 到 1000 并行迭代的 for 循环,如果在第 100 次迭代调用 Break(),则低于 100 的所有迭代仍会运行(即使还未开始处理),并在退出循环之前处理完。从 101 到 1000 中还未开启的迭代则会被放弃。
对于已经在执行的长时间运行迭代,Break()将为已运行还未结束的迭代对应ParallelLoopResult结构的LowestBreakIteration属性设置为调用Bread()迭代项的索引。
Stop()
Stop() 用于通知Parallel循环当前迭代“之外”的所有其他迭代不需要运行,无论它们是位于当前迭代的上方还是下方。
对于已经在执行的长时间运行迭代,可以检查 IsStopped属性,在观测到是 true 时提前退出。
Stop 通常在基于搜索的算法中使用,在找到一个结果之后就不需要执行其他任何迭代。(比如在看视频或漫画时自动匹配响应最快的服务器)
ShouldExitCurrentIteration 属性
当循环的迭代调用 Break 或 Stop时,或一个迭代引发异常,或取消循环时,Parallel 类将主动尝试禁止开始执行循环的其他迭代。但是,可能有无法阻止其他迭代启动的情况。也可能是长时间运行的迭代已经开始执行的情况。在此类情况下,迭代可以通过显式检查 ShouldExitCurrentIteration 属性,在该属性返回 true 时停止执行。
LowestBreakIteration 属性
返回过程中调用过Break方法的最低的项,如果从来没有调用过Break则返回null.
这里还有一个好玩的地方, 我们知道Parallel.For & Parallel.Foreach会返回一个ParallelLoopResult类型的对象,我们可以通过IsComplete()来判断是否结束并发,但是如果我们用了ParallelLoopState的Break(), 这个时候IsComplete()返回的是false,并且LowestBreakIteration属性不为null,如果调用的是Stop(), IsComplete()返回的是true,且LowestBreakIteration属性为null.