Streaming between Threads or Processes

From eqqon

(Difference between revisions)
Jump to: navigation, search
Henon (Talk | contribs)
(New page: == Threading and Streaming == To let one thread send data to other threads without interfering with them we combined a '''Queue''' with an '''AutoResetEvent''' in our '''Stream''' class. T...)

Latest revision as of 15:46, 30 October 2007

Threading and Streaming

To let one thread send data to other threads without interfering with them we combined a Queue with an AutoResetEvent in our Stream class. The stream maintains it's own receiver thread which sleeps (without polling) until some token is available. Then the OnReceive event is fired which executes the processing routines without delaying the sender. This decouples the sender and the receiver's execution contexts from each other which is needed for realtime applications. For optimal efficiency the generic Queue<T> has been chosen to avoid a lot of boxing and unboxing operations.

Stream<T>

using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;

namespace Threading
{
    public class Stream<T>
    {
        AutoResetEvent e;
        Queue<T> q;
        Thread receiver;
        bool abort;

        public Stream()
        {
            q = new Queue<T>();
            e = new AutoResetEvent(false);
            StartReceiving();
        }

        public AutoResetEvent WaitHandle
        {
            get { return e; }
            set { e = value; }
        }

        public void Send(T value)
        {
            q.Enqueue(value);
            e.Set();
        }

        public T Receive()
        {
            return q.Dequeue();
        }

        public void StopReceiving() { abort = true; e.Set(); }

        public void StartReceiving()
        {
            abort = false;
            receiver = new Thread(delegate()
            {
                //Console.WriteLine("Starting Receiver");
                while (true)
                {
                    //Console.Write(".");
                    e.WaitOne();
                    if (abort) Thread.CurrentThread.Abort();
                    if (OnReceive == null) continue;
                    while (q.Count > 0)
                        OnReceive(q.Dequeue());
                }
            });
            receiver.Start();
        }

        public event Action<T> OnReceive;
    }
}