生产者消费者线程在Queue中实现多线程同步
使用C#进行多线程编程经常会用队列池进行线程同步的方法,实现就用到Queue。Queue是线程安全的(Thread?safe),但不是泛型的,对象出列时需要进行拆箱转换。也有人会马上想到Queue<T>,但可惜的是泛型Queue<T>却不是线程安全,我们需要用其它编程方法来实现它。
下面介绍一种方法,它能够使用泛型Queue<T>进行线程同步,但是需要用到?lock?关键字以及?AutoResetEvent?和?ManualResetEvent?类对主线程和两个辅助线程进行线程同步。
该示例创建两个辅助线程。一个线程生成元素并将它们存储在非线程安全的泛型队列中。另一个线程使用此队列中的项。另外,主线程定期显示队列的内容,以便该队列可由三个线程进行访问。lock?关键字用于同步对队列的访问,以确保队列的状态不会被破坏。
除了只是使用?lock?关键字来防止同时访问以外,还可以用两个事件对象提供进一步的同步。一个事件对象用来通知辅助线程终止,另一个事件对象由制造者线程用来在有新项添加到队列中时通知使用者线程。这两个事件对象封装在一个名为?SyncEvents?的类中。这使事件可以轻松传递给表示制造者线程和使用者线程的对象。SyncEvents?类按如下方式定义:
public class SyncEvents{ public SyncEvents() { _newItemEvent = new AutoResetEvent(false); _exitThreadEvent = new ManualResetEvent(false); _eventArray = new WaitHandle[2]; _eventArray[0] = _newItemEvent; _eventArray[1] = _exitThreadEvent; } public EventWaitHandle ExitThreadEvent { get { return _exitThreadEvent; } } public EventWaitHandle NewItemEvent { get { return _newItemEvent; } } public WaitHandle[] EventArray { get { return _eventArray; } } private EventWaitHandle _newItemEvent; private EventWaitHandle _exitThreadEvent; private WaitHandle[] _eventArray;}
?
对“新项”事件使用?AutoResetEvent?类,因为您希望每当使用者线程响应此事件后,此事件都能自动重置。或者,将?ManualResetEvent?类用于“退出”事件,因为您希望当此事件终止时有多个线程响应。如果您改为使用?AutoResetEvent,则仅在一个线程响应该事件以后,该事件就还原到非终止状态。另一个线程不会响应,因此在这种情况下,将无法终止。
SyncEvents?类创建两个事件,并将它们以两种不同的形式存储:一种是作为?EventWaitHandle(它是?AutoResetEvent?和?ManualResetEvent?的基类),一种是作为基于?WaitHandle?的数组。如关于使用者线程的讨论中所述,此数组是必需的,因为它使使用者线程可以响应两个事件中的任何一个。
使用者线程和制造者线程分别由名为?Consumer?和?Producer?的类表示。这两个类都定义了一个名为?ThreadRun?的方法。这些方法用作?Main?方法创建的辅助线程的入口点。
Producer?类定义的?ThreadRun?方法如下所示:
public class Producer{ public Producer(Queue<int> queue, SyncEvents syncEvents) { _queue = queue; _syncEvents = syncEvents; } Queue<int> _queue; SyncEvents _syncEvents; public void ThreadRun() { int count = 0; Random r = new Random(); while (!_syncEvents.ExitThreadEvent.WaitOne(0, false)) { lock (((ICollection) _queue).SyncRoot) { while (_queue.Count < 20) { _queue.Enqueue(r.Next(0, 100)); _syncEvents.NewItemEvent.Set(); count++; } } } Console.WriteLine("Producer thread: produced {0} items", count); }}
?
此方法一直循环,直到“退出线程”事件变为终止状态。此事件的状态由?WaitOne?方法使用?SyncEvents?类定义的?ExitThreadEvent?属性进行测试。在这种情况下,检查该事件的状态不会阻止当前线程,因为?WaitOne?使用的第一个参数为零,这表示该方法应立即返回。如果?WaitOne?返回?true,则说明该事件当前处于终止状态。如果是这样,ThreadRun?方法将返回,其效果相当于终止执行此方法的辅助线程。
在“退出线程”事件终止前,Producer.ThreadStart?方法将尝试在队列中保留?20?项。项只是?0?到?100?之间的一个整数。在添加新项前,必须锁定该集合,以防止使用者线程和主线程同时访问该集合。这一点是使用?lock?关键字完成的。传递给?lock?的参数是通过?ICollection?接口公开的?SyncRoot?字段。此字段专门为同步线程访问而提供。对该集合的独占访问权限被授予?lock?后面的代码块中包含的所有指令。对于制造者添加到队列中的每个新项,都将调用“新项”事件的?Set?方法。这将通知使用者线程离开挂起状态并开始处理新项。
Consumer?对象还定义名为?ThreadRun?的方法。与制造者的?ThreadRun?类似,此方法由?Main?方法创建的辅助线程执行。然而,使用者的?ThreadStart?必须响应两个事件。Consumer.ThreadRun?方法如下所示:
public class Consumer{ public Consumer(Queue<int> queue, SyncEvents syncEvents) { _queue = queue; _syncEvents = syncEvents; } Queue<int> _queue; SyncEvents _syncEvents; public void ThreadRun() { int count = 0; while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1) { lock (((ICollection) _queue).SyncRoot) { int item = _queue.Dequeue(); } count++; } Console.WriteLine("Consumer Thread: consumed {0} items", count); }}
?
此方法使用?WaitAny?来阻止使用者线程,直到所提供的数组中的任意一个等待句柄变为终止状态。在这种情况下,数组中有两个句柄,一个用来终止辅助线程,另一个用来指示有新项添加到集合中。WaitAny?返回变为终止状态的事件的索引。“新项”事件是数组中的第一个事件,因此索引零表示新项。在这种情况下,检查索引?1(它指示“退出线程”事件),并使用它来确定此方法是否继续使用项。如果“新项”事件处于终止状态,您将通过?lock?获得对集合的独占访问权限并使用新项。因为此示例生成并使用数千个项,所以不显示使用的每个项,而是使用?Main?定期显示队列中的内容,如下面所演示的那样。
Main?方法首先创建一个队列(该队列的内容将被生成和使用)和?SyncEvents?的一个实例(已在前面演示):
Queue<int> queue = new Queue<int>();SyncEvents syncEvents = new SyncEvents();
?
然后,Main?配置?Producer?和?Consumer?对象以供辅助线程使用。然而,此步骤并不创建或启动实际的辅助线程:
Producer producer = new Producer(queue, syncEvents);Consumer consumer = new Consumer(queue, syncEvents);Thread producerThread = new Thread(producer.ThreadRun);Thread consumerThread = new Thread(consumer.ThreadRun);producerThread.Start();consumerThread.Start();
?
请注意,队列和同步事件对象作为构造函数参数同时传递给?Consumer?和?Producer?线程。这提供了两个对象,它们具有执行各自任务所需的共享资源。然后创建两个新的?Thread?对象,并使用每个对象的?ThreadRun?方法作为参数。每个辅助线程在启动时都将此参数用作线程的入口点。
接着,Main?通过调用?Start?方法来启动两个辅助线程,如下所示:
producerThread.Start();consumerThread.Start();
?
此时,创建了两个新的辅助线程,它们独立于当前正在执行?Main?方法的主线程开始异步执行过程。事实上,Main?接下来要做的事情是通过调用?Sleep?方法将主线程挂起。该方法将当前正在执行的线程挂起指定的时间(毫秒)。在此时间间隔过后,Main?将重新激活,这时它将显示队列的内容。Main?重复此过程四次,如下所示:
//主线程显示for (int i = 0; i < 12; i++){ Thread.Sleep(1000); ShowQueueContents(queue);}
?
最后,Main?通过调用“退出线程”事件的?Set?方法通知辅助线程终止,然后对每个辅助线程调用?Join?方法以阻止主线程,直到每个辅助线程都响应该事件并终止。
有一个线程同步的最终示例:ShowQueueContents?方法。与制造者线程和使用者线程类似,此方法使用?lock?获得对队列的独占访问权限。然而在这种情况下,独占访问非常重要,因为?ShowQueueContents?对整个集合进行枚举。对集合进行枚举是一个特别容易由于异步操作而造成数据损坏的操作,因为它需要遍历整个集合的内容。
请注意,ShowQueueContents?是由主线程执行的,因为它被?Main?调用。这意味着,当此方法获得对项队列的独占访问权限时,既阻止了制造者线程访问队列,也阻止了使用者线程访问队列。ShowQueueContents?锁定队列并枚举其内容:
?
?
private static void ShowQueueContents(Queue<int> q){ lock (((ICollection) q).SyncRoot) { foreach (int item in q) { Console.Write("{0} ", item); } } Console.WriteLine();}
?
完整源代码如下
using System;using System.Collections;using System.Collections.Generic;using System.Threading;namespace cjl{ public static class Ubb { static void Main(string[] args) { Queue<int> queue = new Queue<int>(); SyncEvents syncEvents = new SyncEvents(); Producer producer = new Producer(queue, syncEvents); Consumer consumer = new Consumer(queue, syncEvents); Thread producerThread = new Thread(producer.ThreadRun); Thread consumerThread = new Thread(consumer.ThreadRun); producerThread.Start(); consumerThread.Start(); //主线程显示 for (int i = 0; i < 12; i++) { Thread.Sleep(1000); ShowQueueContents(queue); } //设置退出信号,退出 syncEvents.ExitThreadEvent.Set(); } private static void ShowQueueContents(Queue<int> q) { lock (((ICollection) q).SyncRoot) { foreach (int item in q) { Console.Write("{0} ", item); } } Console.WriteLine(); } } public class SyncEvents { public SyncEvents() { _newItemEvent = new AutoResetEvent(false); _exitThreadEvent = new ManualResetEvent(false); _eventArray = new WaitHandle[2]; _eventArray[0] = _newItemEvent; _eventArray[1] = _exitThreadEvent; } public EventWaitHandle ExitThreadEvent { get { return _exitThreadEvent; } } public EventWaitHandle NewItemEvent { get { return _newItemEvent; } } public WaitHandle[] EventArray { get { return _eventArray; } } private EventWaitHandle _newItemEvent; private EventWaitHandle _exitThreadEvent; private WaitHandle[] _eventArray; } public class Producer { public Producer(Queue<int> queue, SyncEvents syncEvents) { _queue = queue; _syncEvents = syncEvents; } Queue<int> _queue; SyncEvents _syncEvents; public void ThreadRun() { int count = 0; Random r = new Random(); while (!_syncEvents.ExitThreadEvent.WaitOne(0, false)) { lock (((ICollection) _queue).SyncRoot) { while (_queue.Count < 20) { _queue.Enqueue(r.Next(0, 100)); _syncEvents.NewItemEvent.Set(); count++; } } } Console.WriteLine("Producer thread: produced {0} items", count); } } public class Consumer { public Consumer(Queue<int> queue, SyncEvents syncEvents) { _queue = queue; _syncEvents = syncEvents; } Queue<int> _queue; SyncEvents _syncEvents; public void ThreadRun() { int count = 0; while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1) { lock (((ICollection) _queue).SyncRoot) { int item = _queue.Dequeue(); } count++; } Console.WriteLine("Consumer Thread: consumed {0} items", count); } }}
?