using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;using System.Collections.Concurrent;using System.Threading;namespace 并发编程{ class PRogram { static ConcurrentQueue<Order> orderQueue = new ConcurrentQueue<Order>(); static void Main(string[] args) { ProductData<Order> productData = new ProductData<Order>(orderQueue); Thread.Sleep(TimeSpan.FromSeconds(2)); //C#并发编程(一) //单线程处理 //SingleThreadDo(); //多线程处理 //MutilThreadDo(); //并行处理 //ParallelDo(); //异步处理 AsyncDo(); while (true) { Thread.Sleep(TimeSpan.FromSeconds(1)); if (orderQueue.Count == 0) { Console.WriteLine("订单全部处理完成"); } } } /// <summary> /// 异步 /// </summary> static void AsyncDo() { int maxTaskCount = 100; int currentTaskCount = 0; OrderMgr orderMgr = new OrderMgr(); Order order; while (true) { if (orderQueue.TryDequeue(out order)) { Console.WriteLine("当前还剩:{0}", orderQueue.Count); while (currentTaskCount > maxTaskCount) { } var delegateAct = new Action(() => { orderMgr.SaveOrder(order); }); delegateAct.BeginInvoke(new AsyncCallback(r => { delegateAct.EndInvoke(r); Interlocked.Decrement(ref currentTaskCount); }), null); Interlocked.Increment(ref currentTaskCount); } } } /// <summary> /// 并行处理 /// </summary> static void ParallelDo() { OrderMgr orderMgr = new OrderMgr(); Order order; while (orderQueue.Count > 0) { List<Order> orderList = new List<Order>(); if (orderQueue.TryDequeue(out order)) { orderList.Add(order); if (orderQueue.Count > 100) { for (int i = 0; i < 100; i++) { orderQueue.TryDequeue(out order); orderList.Add(order); } } //Parallel中的任务全部执行完成才会往下走 Parallel.ForEach(orderList, (p) => { orderMgr.SaveOrder(order); Console.WriteLine("当前还剩:{0}", orderQueue.Count); }); } } } /// <summary> /// 多线程处理 /// </summary> static void MutilThreadDo() { int maxTaskCount = 100; int currentTaskCount = 0; OrderMgr orderMgr = new OrderMgr(); Order order; while (orderQueue.Count > 0) { if (orderQueue.TryDequeue(out order)) { Console.WriteLine("当前还剩:{0}", orderQueue.Count); while (currentTaskCount > maxTaskCount) { } Task.Factory.StartNew(() => { orderMgr.SaveOrder(order); Interlocked.Decrement(ref currentTaskCount); }); Interlocked.Increment(ref currentTaskCount); } } } /// <summary> /// 单线程处理 /// </summary> static void SingleThreadDo() { OrderMgr orderMgr = new OrderMgr(); Order order; while (orderQueue.Count > 0) { if (orderQueue.TryDequeue(out order)) { Console.WriteLine("当前还剩:{0}", orderQueue.Count); orderMgr.SaveOrder(order); } } } } public class ProductData<T> where T : Order, new() { ConcurrentQueue<T> queue; public ProductData(ConcurrentQueue<T> queue) { this.queue = queue; Data(); } private void Data() { for (int j = 0; j < 100; j++) { queue.Enqueue(new T { ID = Guid.NewGuid(), NO = string.Format("J{0:00000000}", j) }); } var i = 1; Task.Factory.StartNew(() => { while (true) { queue.Enqueue(new T { ID = Guid.NewGuid(), NO = string.Format("{0:00000000}", i) }); i++; Thread.Sleep(TimeSpan.FromMilliseconds(100));//每1S生产10个订单 } }); } } public class Order { public Guid ID { get; set; } public string NO { get; set; } } public class OrderMgr { public OrderMgr() { } public string SaveOrder(Order order) { Console.WriteLine("订单:" + order.NO + ",处理完成"); Thread.Sleep(TimeSpan.FromSeconds(2));//假设每个订单的处理时间为2S return order.NO; } }}
新闻热点
疑难解答