Worker Thread Class

Juval Löwy wrote an article that does a great job of describing a worker thread class and providing an example of one. Since many of us are not yet able to move to .NET 4, we do not yet have the threading improvements that apparently await us there. So I adopted his worker thread class and made a few changes that fill some gaps for me. The listing is below, along with comments describing where I changed Juval’s original work.

 /// <summary>
 /// Thread management wrapper class. Base code taken from a devx.com article by Juval Löwy.
 /// Additions to the original:
 /// - a mutex-guarded exception string so that exception information can be stored by the
 ///   worker and read by the thread that created it;
 /// - the worker body is supplied as a <see cref="ThreadFunc"/> delegate, with several
 ///   constructor overloads (named / auto-named, auto-start or not);
 /// - a <see cref="ThreadObjects"/> instance is handed to the delegate so the worker can
 ///   poll <see cref="ThreadObjects.EndLoop"/> and exchange a pass-through object with the caller;
 /// - three wait handles (started / done / exception) that callers can wait on.
 /// The wrapper is meant to let callers stop a worker co-operatively (via EndLoop) and only
 /// falls back to aborting the thread when it does not stop within the allotted time.
 /// </summary>
public class WorkerThread : IDisposable
{
    /// <summary>
    /// Signature of the worker body. It receives the shared <see cref="ThreadObjects"/> so it can:
    /// 1. check whether it has been asked to stop, and
    /// 2. read a value passed in and/or hand a value back to the caller.
    /// </summary>
    public delegate void ThreadFunc(ThreadObjects threadObjects);

    /// <summary>The <see cref="ThreadFunc"/> delegate that <see cref="Worker"/> invokes.</summary>
    protected ThreadFunc threadFunc = null;

    /// <summary>Set (signaled) by <see cref="Worker"/> just before it invokes the delegate.</summary>
    protected ManualResetEvent m_WorkerStartedEvent;
    /// <summary>Set (signaled) by <see cref="Worker"/> when it finishes, whether normally or through an exception.</summary>
    protected ManualResetEvent m_WorkerDoneEvent;
    /// <summary>Set (signaled) by <see cref="Worker"/> when it catches an exception.</summary>
    protected ManualResetEvent m_WorkerExceptionEvent;

    /// <summary>The system thread used to implement the worker thread.</summary>
    protected Thread m_ThreadObj;

    /// <summary>Holds the objects used to initialize, communicate with, and pull data back from the worker thread.</summary>
    protected ThreadObjects m_mutexedObjects;

    /// <summary>Holds the exception details (if any) recorded by the worker so that they can be
    /// picked up by the thread that started it.</summary>
    protected string m_ExceptionInfo;

    /// <summary>Controls access to <see cref="m_ExceptionInfo"/>.</summary>
    protected Mutex m_ExceptionInfoMutex;

    /// <summary>True once <see cref="Dispose"/> has released the handles; volatile because it is
    /// read by whichever thread touches <see cref="ExceptionInfo"/>.</summary>
    private volatile bool m_Disposed;

    /// <summary>
    /// This object is passed into the worker so it can monitor when to stop,
    /// and get / set the pass-through object.
    /// </summary>
    public class ThreadObjects
    {
        /// <summary>If true, tells the worker to stop processing (must be honoured by the worker's own code).</summary>
        protected bool _mEndLoop;
        /// <summary>Controls access to <see cref="_mEndLoop"/>.</summary>
        protected Mutex _endLoopMutex;

        /// <summary>Used to pass data into and out of the worker thread.</summary>
        protected object _mPassThrough;
        /// <summary>Controls access to <see cref="_mPassThrough"/>.</summary>
        protected Mutex _mPassThroughMutex;

        private string _name = "";

        /// <summary>
        /// Initializes a new instance of the <see cref="ThreadObjects"/> class and creates its mutexes.
        /// </summary>
        public ThreadObjects()
        {
            _endLoopMutex = new Mutex();
            _mPassThroughMutex = new Mutex();
        }

        /// <summary>
        /// Gets or sets a value indicating whether the worker thread should stop processing.
        /// This starts out false and is set to true by the caller to tell the worker to stop.
        /// The worker function should check this value periodically so that the thread can be
        /// shut down gracefully.
        /// </summary>
        /// <value><c>true</c> if processing should stop; otherwise, <c>false</c>.</value>
        public bool EndLoop
        {
            get
            {
                _endLoopMutex.WaitOne();
                try
                {
                    return _mEndLoop;
                }
                finally
                {
                    // Release in finally so an exception (e.g. a thread abort) raised while
                    // the mutex is held does not leave it abandoned.
                    _endLoopMutex.ReleaseMutex();
                }
            }
            set
            {
                _endLoopMutex.WaitOne();
                try
                {
                    _mEndLoop = value;
                }
                finally
                {
                    _endLoopMutex.ReleaseMutex();
                }
            }
        }

        /// <summary>
        /// Gets or sets an object value that the worker thread can use or set.
        /// It is recommended that the caller only set this before starting the thread and only
        /// read it after the thread is complete; otherwise the values are possibly unreliable
        /// and require careful management.
        /// For instance, this could hold an array of strings describing the status of a thread
        /// that cyclically performs a task. The thread could keep it updated, dropping old
        /// entries and keeping just the last 20 events, so a status monitor could look at what
        /// is happening from time to time without forcing the thread to stop and restart.
        /// Only the reference itself is guarded by the mutex; the referenced object is not.
        /// </summary>
        /// <value>Any value that the caller / worker wishes to pass through.</value>
        public object PassThrough
        {
            get
            {
                _mPassThroughMutex.WaitOne();
                try
                {
                    return _mPassThrough;
                }
                finally
                {
                    _mPassThroughMutex.ReleaseMutex();
                }
            }
            set
            {
                _mPassThroughMutex.WaitOne();
                try
                {
                    _mPassThrough = value;
                }
                finally
                {
                    _mPassThroughMutex.ReleaseMutex();
                }
            }
        }

        /// <summary>Closes both mutexes. After this, <see cref="EndLoop"/> and
        /// <see cref="PassThrough"/> can no longer be used.</summary>
        internal void Close()
        {
            _endLoopMutex.Close();
            _mPassThroughMutex.Close();
        }

        /// <summary>Gets or sets the name of the thread.</summary>
        public string Name
        {
            get { return _name; }
            set { _name = value; }
        }
    }

    /// <summary>
    /// Gets or sets a value indicating whether the worker thread should stop processing.
    /// Forwards to <see cref="ThreadObjects.EndLoop"/> on the object handed to the worker.
    /// </summary>
    /// <value><c>true</c> if processing should stop; otherwise, <c>false</c>.</value>
    public bool EndLoop
    {
        get { return m_mutexedObjects.EndLoop; }
        set { m_mutexedObjects.EndLoop = value; }
    }

    /// <summary>
    /// Gets or sets an object value that the worker thread can use or set.
    /// Forwards to <see cref="ThreadObjects.PassThrough"/> on the object handed to the worker;
    /// see that property for usage recommendations.
    /// </summary>
    /// <value>Any value that the caller / worker wishes to pass through.</value>
    public object PassThrough
    {
        get { return m_mutexedObjects.PassThrough; }
        set { m_mutexedObjects.PassThrough = value; }
    }

    /// <summary>
    /// Gets the ThreadState of the thread.
    /// </summary>
    /// <value>The <see cref="System.Threading.ThreadState"/> of the underlying thread.</value>
    // Fully qualified: System.Diagnostics also declares a ThreadState type, which makes the
    // short name ambiguous when both namespaces are imported.
    public System.Threading.ThreadState State
    {
        get { return m_ThreadObj.ThreadState; }
    }

    /// <summary>Gets the thread being managed.</summary>
    public Thread Thread
    {
        get { return m_ThreadObj; }
    }

    /// <summary>
    /// Gets or sets the exception info: a holding place for information about an exception
    /// that occurred in the thread. <see cref="Worker"/> stores the thread name followed by
    /// " exception:" and the exception message. It is an empty string unless something has
    /// been stored, which lets the caller determine whether the thread processed successfully.
    /// The value stays readable after <see cref="Dispose"/>.
    /// </summary>
    /// <value>The exception info, or an empty string if there was no exception.</value>
    public string ExceptionInfo
    {
        get
        {
            if (m_Disposed)
            {
                // The guarding mutex has been closed by Dispose; read the field directly.
                return m_ExceptionInfo;
            }

            m_ExceptionInfoMutex.WaitOne();
            try
            {
                return m_ExceptionInfo;
            }
            finally
            {
                m_ExceptionInfoMutex.ReleaseMutex();
            }
        }
        set
        {
            if (m_Disposed)
            {
                // The guarding mutex has been closed by Dispose; write the field directly.
                m_ExceptionInfo = value;
                return;
            }

            m_ExceptionInfoMutex.WaitOne();
            try
            {
                m_ExceptionInfo = value;
            }
            finally
            {
                m_ExceptionInfoMutex.ReleaseMutex();
            }
        }
    }

    /// <summary>
    /// Gets a generated thread name built from the full type name of <see cref="WorkerThread"/>
    /// and the current system clock ticks. It is meant to help tell apart threads that were not
    /// explicitly named; two names generated within the same clock tick will be identical.
    /// </summary>
    /// <value>The name of the generated thread.</value>
    private static string GenThreadName
    {
        get { return typeof(WorkerThread).FullName + DateTime.Now.Ticks.ToString(); }
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="WorkerThread"/> class
    /// using a <see cref="ThreadFunc"/> delegate and a thread name. The thread is created
    /// but not started.
    /// </summary>
    /// <param name="threadFuncIn"><see cref="ThreadFunc"/> delegate to run on the worker thread.
    /// It is passed an object that allows the worker function to get input and/or set a result,
    /// and to monitor whether the caller has requested that the worker stop processing.
    /// If null, the worker records an error in <see cref="ExceptionInfo"/> when it runs.
    /// </param>
    /// <param name="threadName">Name of the thread. If this is made meaningful, then
    /// it makes debugging easier, so it is recommended that it be set accordingly.</param>
    public WorkerThread(ThreadFunc threadFuncIn, string threadName)
    {
        m_mutexedObjects = new ThreadObjects();
        m_mutexedObjects.Name = threadName;
        m_mutexedObjects.EndLoop = false;

        m_ExceptionInfo = "";
        m_ExceptionInfoMutex = new Mutex();

        // All three events start out non-signaled.
        m_WorkerStartedEvent = new ManualResetEvent(false);
        m_WorkerDoneEvent = new ManualResetEvent(false);
        m_WorkerExceptionEvent = new ManualResetEvent(false);

        threadFunc = threadFuncIn;
        m_ThreadObj = new Thread(new ThreadStart(Worker));
        m_ThreadObj.Name = threadName;
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="WorkerThread"/> class
    /// using a <see cref="ThreadFunc"/> delegate. The thread name is generated
    /// using <see cref="GenThreadName"/>, which takes the full name of the WorkerThread type and
    /// appends the system clock ticks. The thread is created but not started.
    /// </summary>
    /// <param name="threadFunc"><see cref="ThreadFunc"/> delegate to run on the worker thread.</param>
    public WorkerThread(ThreadFunc threadFunc)
        : this(threadFunc, GenThreadName)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="WorkerThread"/> class with a generated
    /// thread name (see <see cref="GenThreadName"/>).
    /// </summary>
    /// <param name="autoStart"><c>true</c> if the thread should start as soon as it is set up.</param>
    /// <param name="threadFunc"><see cref="ThreadFunc"/> delegate to run on the worker thread.</param>
    public WorkerThread(bool autoStart, ThreadFunc threadFunc)
        : this(autoStart, threadFunc, GenThreadName)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="WorkerThread"/> class.
    /// </summary>
    /// <param name="autoStart"><c>true</c> if the thread should start as soon as it is set up.</param>
    /// <param name="threadFunc"><see cref="ThreadFunc"/> delegate to run on the worker thread.</param>
    /// <param name="threadName">Name of the thread. If this is made meaningful, then
    /// it makes debugging easier, so it is recommended that it be set accordingly.</param>
    public WorkerThread(bool autoStart, ThreadFunc threadFunc, string threadName)
        : this(threadFunc, threadName)
    {
        if (autoStart)
        {
            Start();
        }
    }

    /// <summary>
    /// This <see cref="WaitHandle"/> indicates that the worker has started. It is set by
    /// <see cref="Worker"/> immediately before the delegate is invoked; it is not set when no
    /// delegate was supplied.
    /// See also <seealso cref="WorkerDoneHandle"/> and <seealso cref="WorkerExceptionHandle"/>
    /// </summary>
    /// <value>The handle.</value>
    public WaitHandle WorkerStartedHandle
    {
        get { return m_WorkerStartedEvent; }
    }

    /// <summary>
    /// This <see cref="WaitHandle"/> indicates that the worker has finished. It is set at the
    /// end of <see cref="Worker"/>, whether the delegate returned normally or threw.
    /// See also <seealso cref="WorkerStartedHandle"/> and <seealso cref="WorkerExceptionHandle"/>
    /// </summary>
    /// <value>The handle.</value>
    public WaitHandle WorkerDoneHandle
    {
        get { return m_WorkerDoneEvent; }
    }

    /// <summary>
    /// This <see cref="WaitHandle"/> indicates that the worker terminated in an exception state.
    /// It is set when an exception is caught in <see cref="Worker"/>.
    /// See also <seealso cref="WorkerStartedHandle"/> and <seealso cref="WorkerDoneHandle"/>
    /// </summary>
    /// <value>The handle.</value>
    public WaitHandle WorkerExceptionHandle
    {
        get { return m_WorkerExceptionEvent; }
    }

    /// <summary>
    /// Starts the thread after checking that it exists and is not alive. This is called
    /// automatically if autoStart is set in the call to the constructor.
    /// </summary>
    /// <exception cref="NullReferenceException">There is no thread object.</exception>
    /// <exception cref="ThreadStateException">The thread is already running.</exception>
    public void Start()
    {
        if (m_ThreadObj == null)
        {
            // Exception type kept as-is for callers that already catch it.
            throw new NullReferenceException("Start called on null thread object.");
        }

        if (m_ThreadObj.IsAlive)
        {
            throw new ThreadStateException("Start called on thread that is already running.");
        }

        m_ThreadObj.Start();
    }

    /// <summary>
    /// Ensures that the thread is killed (see <see cref="Kill()"/>) and then closes every handle
    /// owned by this object. The handles are closed here rather than in Kill because callers may
    /// still want to inspect them after killing the thread to check on what happened.
    /// Calling Dispose more than once is harmless.
    /// </summary>
    public void Dispose()
    {
        if (m_Disposed)
        {
            return;
        }

        Kill();

        // Flag first so ExceptionInfo stops touching the mutex that is about to be closed.
        m_Disposed = true;

        m_mutexedObjects.Close();
        m_ExceptionInfoMutex.Close();
        m_WorkerStartedEvent.Close();
        m_WorkerDoneEvent.Close();
        m_WorkerExceptionEvent.Close();
    }

    /// <summary>
    /// Kills this thread. If there is no thread or it is not alive, the function returns without
    /// doing anything. Otherwise EndLoop is set and the caller waits, using Join, for the thread
    /// to die. If the thread is still alive after <paramref name="timeout"/>, it is aborted.
    /// The worker should never require this of the caller, but a worker calling code outside of
    /// our control may hang, so the worker function should free its resources in its own
    /// exception handling.
    /// </summary>
    /// <param name="timeout">How long to wait for the worker to stop on its own before aborting it.</param>
    public void Kill(TimeSpan timeout)
    {
        // Kill is called on the client thread - must use the cached thread object.
        if (m_ThreadObj == null)
        {
            return;
        }

        if (!IsAlive)
        {
            return;
        }

        m_mutexedObjects.EndLoop = true;

        // Give the worker the chance to notice EndLoop and exit by itself.
        Join(timeout);

        if (m_ThreadObj.IsAlive)
        {
            // The worker did not stop within the timeout - abort it.
            // NOTE(review): Thread.Abort is not supported on .NET Core / .NET 5+.
            m_ThreadObj.Abort();
        }
    }

    /// <summary>
    /// Kills this thread, allowing it two minutes to stop on its own before it is aborted.
    /// See <see cref="Kill(TimeSpan)"/>.
    /// </summary>
    public void Kill()
    {
        Kill(new TimeSpan(0, 2, 0));
    }

    /// <summary>
    /// Joins the thread: returns immediately if there is no thread or it is already dead,
    /// otherwise waits for the thread to die.
    /// </summary>
    /// <exception cref="InvalidOperationException">Called from the worker thread itself,
    /// which would otherwise wait on itself forever.</exception>
    public void Join()
    {
        if (m_ThreadObj == null)
        {
            return;
        }

        if (!IsAlive)
        {
            return;
        }

        ThrowIfCalledOnWorkerThread();

        m_ThreadObj.Join();
    }

    /// <summary>
    /// Joins with the specified timeout in milliseconds.
    /// </summary>
    /// <param name="millisecondsTimeout">The timeout in milliseconds.</param>
    /// <returns><c>true</c> if the thread is not running (any more); <c>false</c> if the
    /// timeout elapsed first.</returns>
    public bool Join(int millisecondsTimeout)
    {
        return Join(TimeSpan.FromMilliseconds(millisecondsTimeout));
    }

    /// <summary>
    /// Joins with the specified timeout.
    /// </summary>
    /// <param name="timeout">The timeout.</param>
    /// <returns><c>true</c> if there is no thread, it was already dead, or it terminated within
    /// the timeout; <c>false</c> if the timeout elapsed first.</returns>
    /// <exception cref="InvalidOperationException">Called from the worker thread itself.</exception>
    public bool Join(TimeSpan timeout)
    {
        if (m_ThreadObj == null)
        {
            return true;
        }

        if (!IsAlive)
        {
            return true;
        }

        ThrowIfCalledOnWorkerThread();

        return m_ThreadObj.Join(timeout);
    }

    /// <summary>Throws when the calling thread is the managed worker thread itself.</summary>
    private void ThrowIfCalledOnWorkerThread()
    {
        if (System.Threading.Thread.CurrentThread.ManagedThreadId == m_ThreadObj.ManagedThreadId)
        {
            throw new InvalidOperationException("Attempting to Join a thread that has the same identity as the current thread.");
        }
    }

    /// <summary>
    /// Gets or sets the name of the thread. Setting it also updates the name stored in the
    /// <see cref="ThreadObjects"/> handed to the worker. On runtimes where Thread.Name is
    /// write-once, setting a name after the constructor has already assigned one throws
    /// <see cref="InvalidOperationException"/>.
    /// </summary>
    /// <value>The name of the thread.</value>
    public string Name
    {
        get { return m_ThreadObj.Name; }
        set
        {
            // Thread first: if the runtime rejects the rename, the stored copy stays unchanged.
            m_ThreadObj.Name = value;
            m_mutexedObjects.Name = value;
        }
    }

    /// <summary>
    /// Gets a value indicating whether the worker thread is alive.
    /// </summary>
    /// <value><c>true</c> if the worker thread is alive; otherwise, <c>false</c>.</value>
    public bool IsAlive
    {
        get
        {
            System.Diagnostics.Debug.Assert(m_ThreadObj != null);
            return m_ThreadObj.IsAlive;
        }
    }

    /// <summary>
    /// Thread entry point: runs the supplied delegate wrapped in an exception handler.
    /// If the delegate throws (or no delegate was supplied), the current thread's name and the
    /// exception message are stored in <see cref="ExceptionInfo"/> and the exception event is
    /// set. The done event is set in every case.
    /// </summary>
    protected void Worker()
    {
        try
        {
            if (threadFunc == null)
            {
                throw new NullReferenceException("WorkerThread function expected.");
            }

            m_WorkerStartedEvent.Set(); // indicate that we have started
            threadFunc.Invoke(m_mutexedObjects);
        }
        catch (Exception ex)
        {
            ExceptionInfo = System.Threading.Thread.CurrentThread.Name + " exception:" + ex.Message;
            m_WorkerExceptionEvent.Set();
        }
        finally
        {
            m_WorkerDoneEvent.Set();
        }
    }
}

Posted on March 15, 2011, in .Net, C#, programming, Threading. Bookmark the permalink. Leave a comment.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: