Streaming between Threads or Processes
From eqqon
(Difference between revisions)
Henon (Talk | contribs)
(New page: == Threading and Streaming == To let one thread send data to other threads without interfering with them we combined a '''Queue''' with an '''AutoResetEvent''' in our '''Stream''' class. T...)
(New page: == Threading and Streaming == To let one thread send data to other threads without interfering with them we combined a '''Queue''' with an '''AutoResetEvent''' in our '''Stream''' class. T...)
Latest revision as of 15:46, 30 October 2007
Threading and Streaming
To let one thread send data to other threads without interfering with them we combined a Queue with an AutoResetEvent in our Stream class. The stream maintains it's own receiver thread which sleeps (without polling) until some token is available. Then the OnReceive event is fired which executes the processing routines without delaying the sender. This decouples the sender and the receiver's execution contexts from each other which is needed for realtime applications. For optimal efficiency the generic Queue<T> has been chosen to avoid a lot of boxing and unboxing operations.
Stream<T>
using System; using System.Threading; using System.Collections; using System.Collections.Generic; namespace Threading { public class Stream<T> { AutoResetEvent e; Queue<T> q; Thread receiver; bool abort; public Stream() { q = new Queue<T>(); e = new AutoResetEvent(false); StartReceiving(); } public AutoResetEvent WaitHandle { get { return e; } set { e = value; } } public void Send(T value) { q.Enqueue(value); e.Set(); } public T Receive() { return q.Dequeue(); } public void StopReceiving() { abort = true; e.Set(); } public void StartReceiving() { abort = false; receiver = new Thread(delegate() { //Console.WriteLine("Starting Receiver"); while (true) { //Console.Write("."); e.WaitOne(); if (abort) Thread.CurrentThread.Abort(); if (OnReceive == null) continue; while (q.Count > 0) OnReceive(q.Dequeue()); } }); receiver.Start(); } public event Action<T> OnReceive; } }