Asynchronous Events

From eqqon

Revision as of 23:15, 19 December 2007 by Henon (Talk | contribs)
Jump to: navigation, search

Decoupling Threads via Asynchronous Events

Sometimes it is necessary to fire events without having to wait for them to return. A publisher (an object that exposes events) might run in a real-time thread which must not be delayed by subscriber's execution times. In such a case we need to asynchronously fire an event. Creating a new thread for each event is usually not desired due to efficiency reasons. We present SimpleAsyncPublisher, a very simple solution, which queues all events to be published asynchronously and fires them off on a dedicated thread. SimpleAsyncPublisher runs only one thread (instead of a thread-pool) which means that all queued events will have to wait until the previous events have returned. The obvious advantage of this behavior is that subsequently fired events do not interfere with each other. A disadvantage with long running event handlers might be, that the queue could overflow. However, if events are sent only sporadically and the subscriber's event handlers don't exercise any expensive computations SimpleAsyncPublisher is feasible.

class SimpleAsyncPublisher

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace eqqon
{
    public class SimpleAsyncPublisher : IDisposable
    {
        bool m_abort = false;
        AutoResetEvent m_waithandle;
        Queue<ICommand> m_queue;
        Thread m_thread;

        public event Action<Exception> InvocationFailed;

        public SimpleAsyncPublisher()
        {
            m_queue = new Queue<ICommand>();
            m_waithandle = new AutoResetEvent(false);
            m_thread = new Thread(new ThreadStart(ThreadLoop));
            m_thread.Start();
        }

        // Asynchronously call the given event
        public void Publish(MulticastDelegate d, params object[] args)
        {
            Publish(new MulticastCommand(d, args));
        }

        public void Publish(ICommand cmd)
        {
            m_queue.Enqueue(cmd);
            m_waithandle.Set();
        }

        public void Dispose()
        {
            m_abort = true;
            m_waithandle.Set();
        }

        private void ThreadLoop()
        {
            while (!m_abort)
            {
                m_waithandle.WaitOne();
                if (m_abort) Thread.CurrentThread.Abort();
                while (m_queue.Count > 0)
                    Send(m_queue.Dequeue());
            }
        }

        private void Send(ICommand command)
        {
            try
            {
                command.Send();
            }
            catch (Exception e)
            {
                if (InvocationFailed != null)
                    InvocationFailed(e);
                else
                    throw;
            }
        }
    }

    public interface ICommand
    {
        void Send();
    }

    // Command holds a delegate and its parameters, ready to be call.
    public class MulticastCommand : ICommand
    {
        MulticastDelegate m_delegate;
        object[] m_arguments;

        public MulticastCommand(MulticastDelegate d, params object[] args)
        {
            if (d == null) throw new ArgumentException("Delegate must not be null!", "d");
            m_delegate = d;
            m_arguments = args;
        }

        public void Send()
        {
            foreach (Delegate d in m_delegate.GetInvocationList())
                d.Method.Invoke(d.Target, m_arguments);
        }
    }

}

Test program and output

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace eqqon
{
    class Program
    {
        public static void TestMethod1(string s)
        {
            Thread.Sleep(100);
            Console.WriteLine("TestMethod1:"+ s);
        }

        public static void TestMethod2(string s)
        {
            Thread.Sleep(100);
            Console.WriteLine("TestMethod2:" + s);
        }

        public static void TestMethod3(string s)
        {
            Thread.Sleep(100);
            Console.WriteLine("TestMethod3:" + s);
        }

        public static event Action<string> TestEvent1;
        public static event Action<string> TestEvent2;

        static void Main(string[] args)
        {
            TestEvent1 += TestMethod1;
            TestEvent1 += TestMethod2;
            TestEvent2 += TestMethod3;

            using (SimpleAsyncPublisher publisher = new SimpleAsyncPublisher())
            {
                Console.WriteLine("Publishing async event TestEvent1");
                publisher.Publish(TestEvent1, "Hello");
                Console.WriteLine("Publishing async event TestEvent2");
                publisher.Publish(TestEvent2, "World!");
                Console.WriteLine("Done.");
                Console.ReadLine();
            }
        }
    }
}

Output
Publishing async event TestEvent1
Publishing async event TestEvent2
Done.
TestMethod1:Hello
TestMethod2:Hello
TestMethod3:World!

--Henon 00:14, 20 December 2007 (CET)