C# Tutorial - Using The ThreadPool

Skill

C# Tutorial - Using The ThreadPool

Posted in:

Ah, good old multi-threading. Always fun, and often a source of headaches. With C# and .NET, those headaches don't go away, but there are some nice wrappers that make working with threads a little bit easier. Today we are going to take a look at how to use C#'s ThreadPool - which is probably the simplest way to make a multi-threaded C# app.

A thread pool takes away all the need to manage your threads - all you have to do is essentially say "hey! someone should go do this work!", and a thread in the process' thread pool will pick up the task and go execute it. And that is all there is to it. Granted, you still have to keep threads from stepping on each other's toes, and you probably care about when these 'work items' are completed - but it is at least a really easy way to queue up a work item.

In fact, working with the ThreadPool is so easy, I'm going to throw all the code at you at once. Below is a pretty simple test app that gives 5 (or NumThreads) work items to the ThreadPool, waits for them all to complete, and then prints out all the answers. I will walk through the code step by step below:

using System;
using System.Threading;

namespace ThreadPoolTest
{
  class Program
  {
    private const int NumThreads = 5;

    private static int[] inputArray;
    private static double[] resultArray;
    private static ManualResetEvent[] resetEvents;

    private static void Main(string[] args)
    {
      inputArray = new int[NumThreads];
      resultArray = new double[NumThreads];
      resetEvents = new ManualResetEvent[NumThreads];

      Random rand = new Random();
      for (int s = 0; s < NumThreads; s++)
      {
        inputArray[s] = rand.Next(1,5000000);
        resetEvents[s] = new ManualResetEvent(false);
        ThreadPool.QueueUserWorkItem(new WaitCallback(DoWork), (object)s);
      }

      Console.WriteLine("Waiting...");

      WaitHandle.WaitAll(resetEvents);

      Console.WriteLine("And the answers are: ");
      for (int i = 0; i < NumThreads; i++)
        Console.WriteLine(inputArray[i] + " -> " + resultArray[i]);
    }

    private static void DoWork(object o)
    {
      int index = (int)o;

      for (int i = 1; i < inputArray[index]; i++)
        resultArray[index] += 1.0 / (i * (i + 1));

      resetEvents[index].Set();
    }
  }
}

We have three arrays at the top of the program: one for input to the work items (inputArray), one for the results (resultArray), and one for the ManualResetEvents (resetEvents). The first two are self explanatory, but what is a ManualResetEvent? Well, it is an object that allows one thread to signal another thread when something happens. In the case of this code, we use these events to signal the main thread that a work item has been completed.

So we initialize these arrays, and then we get to a for loop, which is where we will be pushing out these work items. First, we make a random value for the initial input (cause random stuff is always more fun!), then we create a ManualResetEvent with its signaled state initially set to false, and then we queue the work item. Thats right, all you have to do to push a work item out for the ThreadPool to do is call ThreadPool.QueueUserWorkItem.

So what are we queuing here? Well, we are saying that a thread in the thread pool should run the method DoWork, with the argument s. Any method that you want to queue up for the thread pool to run needs to take one argument, an object, and return void. The argument will end up being whatever you passed in as the second argument to the QueueUserWorkItem call - and in this case is the 'index' of this work item (the index in the various arrays that it needs to work with). And it makes sense that the method would have to return void - because it isn't actually returning 'to' anything, it is running out there all on its own as a separate thread.

So what are we doing in this DoWork function? Not that much in this case, just a simple summation. The important part is the very last call of the function, which is hit when all the work for this work item is done - resetEvents[index].Set(). This triggers the ManualResetEvent for this work item - signaling the main thread that the work is all done here.

Back up in main thread land, after it has pushed all these work items onto the ThreadPool queue, we hit the very important call WaitHandle.WaitAll(resetEvents). This causes the main thread to block here until all the ManualResetEvent objects in the resetEvents array signal. When all of them have signaled, that means that all the work units have been completed, and so we continue on and print out all the results. The results change because we are seeding with random values, but here is one example output:

Waiting...
And the answers are:
3780591 -> 0.991001809831479
3555614 -> 0.991163782231558
2072717 -> 0.989816715560308
2264396 -> 0.989982111762391
544144 -> 0.99066981542858

Pretty simple, eh? There are a couple things to note, though. The default thread pool size for a process is 25 threads, and while you can change this number, this resource is not infinite. If all of the threads in the pool are currently occupied with other tasks, new work items will be queued up, but they won't get worked on until one of the occupied threads finishes its current task. This generally isn't a problem unless you are giving the pool very large quantities of work. And really, you should never assume that a task is executed immediately after you queue it, because there is no guarantee of that at all.

That's it for this intro to thread pools in C#. If there are any questions, leave them below - especially if they push on the more advanced aspects of threads and thread pools (cause then I'll have an excuse to write some more threading tutorials!).

Carl
02/04/2008 - 02:59

Thanks mate! Great help!

reply

BJ
02/11/2008 - 06:27

Thanks man
that was a good intro to Threading

reply


04/02/2008 - 06:14

Hi ,
this is a queue .. What if i have some 1000 items in an array that needs to be updated to the database 50 at a time ie a function that takes a parameter and based on that parameter it will fetch some data and update the database.
Can we make sure that once a thread has finished a work it will start next work without waiting for the other threads to finish ..
thanks in advance

reply

Anonymous
06/18/2008 - 11:54

Kumar,

The answer to this question is yes. But you have to research the JOIN keyword in System.Threading in order to have a "new thread" wait for an "old thread" to complete.

reply

BeBorz
08/07/2008 - 12:09

hw can i pass more than one argument to my Dowork function ?

reply

The Reddest
08/07/2008 - 14:03

Kind of. The WaitCallback delegate only allows you to specify a single object as the parameter, however, since an array is still an object, you could fill one with each argument you need and pass the array to the DoWork function. This has the downside of having to know what index is what object, but it's a solution.

A better solution is to simply create a small container class that has members for each object. Then simply pass the container class to the DoWork function.

reply

William
08/09/2008 - 16:44

This code worked well in my project. One question? If I attempt to run more than 64 threads I get the following error: "The number of waithandles must be less than or equal to 64." I spent several hours researching the error and could not find a satisfactory answer.

My project takes a set of name / address records from a database and runs each one against a remote web service in an effort to update the person name, address or phone. The input records could number in the thousands and it speeds processing if I can make several hundred requests to the remote service at a time.

reply

Doug Gale
11/17/2008 - 06:08

This is a response to William's question, "How do you wait for more than 64"

The answer is, "don't do that". What you should do is use ONE event object, and have an "int" (I'll call it numBusy) which tracks how many workers are not finished yet. Lets say you are going to run 100 work items. Before queueing up any work items, set numBusy to 100 and proceed to queue up the work items.

Summary of changes:
1) Add "int numBusy" member variable.
2) Get rid of the array "resetEvents" and create a single "doneEvent" event.
3) At the end of the DoWork function, remove "resetEvents[index].Set();" add the following code:

if (Interlocked.Decrement(ref numBusy) == 0)
{
    doneEvent.Set();
}

And to wait for all of the work items to complete, you simply do:

doneEvent.WaitOne();

How does it work? "Interlocked.Decrement" will subtract 1 from an int in a thread-safe manner, and return the new value. When the last work item has decremented the value to zero, it sets the event.

This technique is far more efficient than setting a bunch of separate events. Setting an event is a pretty expensive operation, because it has to make an expensive call all the way down into kernel mode to set the event. An interlocked decrement is one instruction, and will take nanoseconds to complete. This technique will work for over 2 billion work items (up to int.MaxValue items).

For robust code, you should wrap the work in a try-finally block, and put the interlocked decrement code in the finally. This way, it won't hang forever if one of the work items cause an exception to be thrown.

reply

S@W
12/18/2008 - 02:44

Hello Doug Gale!

The solution you presented for William’s problem is very good one. Can you suggest me any alternatives to it if we do not know the number of threads to be spawned before hand.

Waiting for your reply.

Thanks!

reply

Yang
09/18/2008 - 07:42

nice article well written...however if i have 50 workitems and i should use 5 threads (max). Then what changes in the code are required.

reply

Olivier Goossens Bara
11/22/2008 - 11:31

Great article !

However my problem ist to detect the end of only one thread launched by the threadpool

And I can not use WaitAll because my second thread makes call to the main thread using events : DoWork is calling itself some method in the secundary thread

So, how can I detect that DoWork is finished in order to enable some button in a form in the main thread ?

reply

candyman
01/30/2009 - 15:31

Olivier
Check if you are in the background thread.
You dont want your background thread to call foreground methods

reply

Chaitanya
03/11/2009 - 05:14

what if i want to run a loop more than number of threads? I mean what if i need to call Dowork function more than number of threads in Threadpool? how do we return status here ?

reply

William
04/08/2009 - 00:54

I've been using this code in production for about 4 months and it works well but I would like to warn everyone about an issue you might encounter.

QueueUserWorkItem uses the static thread pool which is limited to 25 threads per process. In my case I used the code in an a web service that makes multiple calls in parallel to a remote web service. I setup my code to make 25 calls in parallel using QueueUserWorkItem not realizing that .net uses this same thread pool for it's processes. To make matters worse, when I set up the web service in IIS I put it in the same application pool as the asp.net web site that was calling the web service. Everytime the web site called the web service, the web service would consume the available threads in the thread pool, causing the web site to freeze. The web site would unfreeze when threads became available.

The resolution was to place the web service and the web site in separate application pools. This creates a separate static pool for each process and significantly increased the performance of the system as a whole.

reply

Anonymous
04/10/2009 - 10:46

How do you:

2) Get rid of the array "resetEvents" and create a single "doneEvent" event.

?

reply

chrisn
04/27/2009 - 14:17

Answer to question "How do you get rid of the array "resetEvents" and create a single "doneEvent" event?"

I haven't used this code in a production enviroment but it seems to work OK.

using System;
using System.Threading;

namespace ThreadPoolTest
{
    class Program
    {
        private static int numBusy = 100;
        private const int NumThreads = 100;
        private static ManualResetEvent doneEvent;

        private static void Main(string[] args)
        {
            doneEvent = new ManualResetEvent(false);
            for (int s = 0; s < NumThreads; s++)
            {
                ThreadPool.QueueUserWorkItem(
                    new WaitCallback(DoWork), (object)s);
            }
            doneEvent.WaitOne();
        }

        private static void DoWork(object o)
        {
            try
            {
                // do work here
                int index = (int)o;
                Console.WriteLine(index);
                Console.WriteLine("numbusy {0}: ", numBusy);
            }
            catch
            {
                // error handling goes here
            }
            finally
            {
                if (Interlocked.Decrement(ref numBusy) == 0)
                {
                    doneEvent.Set();
                }
            }
        }
    }
}

reply

Anonymous
05/11/2009 - 08:03

Thank you for the code example. In the application I have, the above approach works if I run everything from the dev studio. But if I launch the app directly, my app hangs (doneEvent.WaitOne()). I have searched and not found a plausible explanation for this. I would appreciate any pointers to address this.

reply

Question
05/03/2009 - 13:38

Hello,

Really nice, and the comments were quite helpful too.. but I have a question; Mainly because I am concerned about this sentence from the article: This generally isn't a problem unless you are giving the pool very large quantities of work.

I have a web service WSA (lets say this will have the main thread) and it will call another method F() within the WS. This method F() does a LOT of xml processing and database inserts and deletes, for each and every call. The website which calls this webservice is a very high volume one. There are easily more than 500 requests per minute, across multiple web servers.

The bottomline is the method F() does a lot of work.. The Calling Thread does not need any results back from F(), so I thought I would use :
ThreadPool.QueueUserWorkItem(F,object state) .
I wasn't planning on using the ManualResetEvents, and even after reading the article, I don't think I need to, since I don't need the results back. Is that correct?

My concern is, when WSA calls F(), if F() takes longer compared to the speed with which WSA itself receives the requests, what are the repercussions? I mean, will the main thread (in WSA) still be able to keep up and continue with its work irrespective of F(), or will it create problems at some point? I thought that the fact that we're calling asynchronously means that it wouldn't create problems for the main thread, but because of the high volume and the critical nature of this call, I just want to make sure.

reply

chrisn
05/05/2009 - 02:09

I use a System.Collections.Generic.Queue to hold pending requests. In my case I'm processing records from a .csv file so I load each row into an object and insert the object into the queue. I then use a While loop that runs until the queue is empty. Each iteration of the loop spawns 50 threads that deenqueue the requests and make the appropriate calls to DoWork. Let me know if you need a code sample and I will throw something together.

You might also want to check out this article on MSDN:

http://msdn.microsoft.com/en-us/magazine/dd419664.aspx

It's not 100% relevant but might give you some ideas.

reply

Anonymous
05/25/2009 - 16:25

chrisn, I really appreciate if you could post a sample code here utilizing the Queue object as you mentioned above.

reply

chrisn
06/07/2009 - 14:38

Here's a sample using a queue. I used XElements because my application makes calls to external web services but you could enqueue a text file or objects.

using System;
using System.Collections.Generic;
using System.Xml.Linq;

namespace QueueDemo
{
    class Program
    {
        static int numthreads = 25;
        static Queue<XElement> inputqueue;
        static XElement[] inputArray;
        static XElement[] resultArray;
        static System.Threading.ManualResetEvent[] resetEvents;
       
        static void Main(string[] args)
        {
            inputqueue = new Queue<XElement>();

            LoadQueue();

            while (inputqueue.Count != 0)
            {
                // if there are fewer records to process than threads then reduce the thread count
                if (inputqueue.Count < numthreads)
                {
                    numthreads = inputqueue.Count;
                }

                inputArray = new XElement[numthreads];
                resultArray = new XElement[numthreads];
                resetEvents = new System.Threading.ManualResetEvent[numthreads];

                for (int s = 0; s < numthreads; s++)
                {
                    if (inputqueue.Count == 0)
                    {
                        break;
                    }
                    else
                    {
                        // deenque the request
                        XElement request = inputqueue.Dequeue();
                        // add an Id to the node so DoWork knows which reset event to trigger
                        request.Add(new XElement("ThreadId", s));
                        inputArray[s] = request;
                        resetEvents[s] = new System.Threading.ManualResetEvent(false);
                        System.Threading.ThreadPool.QueueUserWorkItem(new System.Threading.WaitCallback(DoWork), inputArray[s]);
                    }
                }
                System.Threading.WaitHandle.WaitAll(resetEvents);
                for (int i = 0; i < numthreads; i++)
                {
                    // do something with the result depending on status
                    switch (resultArray[i].Element("Result").Value)
                    {
                        case "false":
                            Console.WriteLine("Failure");
                            break;
                        case "true":
                            Console.WriteLine("Success");
                            break;
                        default:
                            Console.WriteLine("Error");
                            break;
                    }
                }
            }
            Console.ReadLine();
        }
        static void LoadQueue()
        {
            // generate 50 test requests and add them to the inputqueue
            List<XElement> requests = new List<XElement>();
            for (int i = 0; i < 50; i++)
            {
                // the requests could be anything, in my case they are XML used to make requests to an external web service
                inputqueue.Enqueue(new XElement("Request", new XElement("Parameter1", "valueA"), new XElement("Parameter2", "valueB")));
            }
        }
        static void DoWork(object o)
        {
            XElement request = (XElement)o;
            // call external service and set result array to response based on threadid
            resultArray[Int32.Parse(request.Element("ThreadId").Value)]      
                = new XElement("Response", new XElement("Result", Int32.Parse(request.Element("ThreadId").Value) < 12 ? true : false)); // simulate response
            // get the threadid and reset
            resetEvents[Int32.Parse(request.Element("ThreadId").Value)].Set();    
        }
    }
}

reply

Abhishek Udiya
11/14/2009 - 03:18

Hichrisn ,
Thank so much for nice article.I want to some advice to u in "static void DoWork(object o)" this method,

resultArray[Int32.Parse(request.Element("ThreadId").Value)]
= new XElement("Response", new XElement("Result", Int32.Parse(request.Element("ThreadId").Value) < 12 ? true : false));

on which basis u r comparing ThreadId < 12. This 12 is ur no. of Xml requests or Maximum ThreadId of xml request.
Pls reply as soon as possible.
abhishek.udiya@diaspark.com

reply

Michael Dausmann
07/13/2009 - 21:21

Thanks mr Tallest! fantastic article. This stuff can be soooo complicated but you managed to keep your code and explanation terrifically tight and really help me understand what was going on.

Thank you also commenters, all food for thought.

Michael

reply

blueVic
11/23/2009 - 04:54

Dear all,
I have a multithread program, each thread is do some work and write out to "Logfile.txt". All thread is write to "Logfile.txt". Now I want to use a thread that manage write above file, it's mean thread1 write file complete and then thread2 write file...

But I can't write program as description above.
can you help me ???

reply

Anonymous
01/27/2010 - 08:34

Excellent article - most helpful!

reply

vikram
03/03/2010 - 01:06

Hi in my case I create a thread pool of size n if n jobs are availbale for processing in database.But once it starts processing polling stops and though some file are ready for processing they have to wait until the the previous thread pool completes.So i want to add threads to the thread pool even if it processes other jobs how to do that.
This is peice of code where i have used thread pooling:
doneEvents[i] = new ManualResetEvent(false);
PreprocessThreadManager p = new PreprocessThreadManager(i, doneEvents[i]);
preprocessArray[i] = p;

Preprocessor preprocessRequest = new Preprocessor();
preprocessRequest.JobID = jobID;
preprocessRequest.UUID = UUID;
preprocessRequest.InFile = sourceFile;
preprocessRequest.OutFile = destinationFile;
////preprocessRequest.ABitRate = Convert.ToInt32(ConfigurationManager.AppSettings["Civolution_ABitRate"].ToString());
ThreadPool.QueueUserWorkItem(p.ThreadPoolCallback, preprocessRequest);
i++;

reply

Add Comment

Put code snippets inside language tags:
[language] [/language]

Examples:
[javascript] [/javascript]
[actionscript] [/actionscript]
[csharp] [/csharp]

See here for supported languages.

Javascript must be enabled to submit anonymous comments - or you can login.

Sponsors