Parallel processing of a list of items using a dynamic number of threads

by Heathesh 20. June 2011 01:46

While working on a project I came across an interesting problem: I had a list of items to process, and I wanted to process a fixed number of them at the same time, with that number read from a settings file. To illustrate how I did this, let's start off with the items I was going to process. In this case, let's say I had a class called "Queue", and each "Queue" item was something I needed to do some work with.

    /// <summary>
    /// Queue entity
    /// </summary>
    public class Queue
    {
        /// <summary>
        /// Gets or sets the id
        /// </summary>
        public int Id { get; set; }

        /// <summary>
        /// Gets or sets the name
        /// </summary>
        public string Name { get; set; }
    }

In my application, I have a list of these Queue items. I want to process more than one at a time, and I want to control how many are processed at once from a settings file. So I added a settings file called "QueueSettings.settings" with a setting called NumberOfThreads, which I set to "3".
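
Since the value lives in a file that anyone can edit, it may be worth sanity-checking it before use. The following is a minimal sketch and not part of the original project: ThreadSetting.Parse is a hypothetical helper that takes the raw text of a setting and falls back to a default when the text is missing, not a whole number, or less than 1.

```csharp
using System;

public static class ThreadSetting
{
    // Hypothetical helper (an assumption, not from the original project):
    // turns the raw text of a setting into a usable thread count, returning
    // the fallback when the text is not a whole number or is less than 1.
    public static int Parse(string raw, int fallback)
    {
        int value;
        if (int.TryParse(raw, out value) && value >= 1)
        {
            return value;
        }

        return fallback;
    }
}
```

For example, ThreadSetting.Parse("3", 1) returns 3, while ThreadSetting.Parse("abc", 1) and ThreadSetting.Parse("0", 1) both return 1. If the setting is already typed as an int in the settings designer, only the range check is relevant.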

Now let's pretend my List of queued items to process looks like this:

            List<Queue> queuedItems = new List<Queue>();
           
            queuedItems.Add(new Queue { Id = 1, Name = "Apple" });
            queuedItems.Add(new Queue { Id = 2, Name = "Banana" });
            queuedItems.Add(new Queue { Id = 3, Name = "Orange" });
            queuedItems.Add(new Queue { Id = 4, Name = "Pear" });
            queuedItems.Add(new Queue { Id = 5, Name = "Peach" });

And for simplicity's sake, the method that actually processes the Queue item looks like this:

        /// <summary>
        /// Process the queued item
        /// </summary>
        /// <param name="queue"></param>
        private static void processQueuedItem(Queue queue)
        {
            Console.WriteLine("Processed: {0} - {1}", queue.Id, queue.Name);
        }

Now we need a method that calls the above method for each item, several items at a time. First, add the following using directive to the application:

        using System.Threading.Tasks;

Then we create the method that does the actual processing:

        /// <summary>
        /// Processes the queued items
        /// </summary>
        /// <param name="queuedItems"></param>
        /// <param name="threadsToUse"></param>
        private static void processQueuedItems(List<Queue> queuedItems, int threadsToUse)
        {
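            //guard against a zero or negative setting, which would otherwise cause a divide by zero below
            if (threadsToUse < 1)
            {
                throw new ArgumentOutOfRangeException("threadsToUse", "The number of threads must be at least 1");
            }

            //calculate how many times we need to loop, by determining how many times we'll do a full set of
            //items and adding one for the remainder of items. For example, if you have 5 items and you're threading
            //3 at a time, you need to run twice, once to do the first 3, then once to finish off the last 2
            int timesToLoop = ((queuedItems.Count % threadsToUse) > 0 ? 1 : 0) + queuedItems.Count / threadsToUse;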

            //set a current record number which we will use to make sure we don't process the same item
            //more than once
            int currentRecordNumber = 0;

            //loop through the amount of times we need to loop
            for (int i = 0; i < timesToLoop; i++)
            {
                if ((currentRecordNumber + threadsToUse) > queuedItems.Count)
                {
                    //this means we're doing the remainder of items
                    int lastThreadsToUse = (queuedItems.Count % threadsToUse);
                    Console.WriteLine("Processing {0} items", lastThreadsToUse);

                    //run in parallel the number of remainder items by calling the process queued item method
                    Parallel.For(0, lastThreadsToUse, offset => processQueuedItem(queuedItems[currentRecordNumber + offset]));
                }
                else
                {
                    //this means we're doing a full set of items
                    Console.WriteLine("Processing {0} items", threadsToUse);

                    //run in parallel the number of threads we've been passed by calling the process queued item method
                    Parallel.For(0, threadsToUse, offset => processQueuedItem(queuedItems[currentRecordNumber + offset]));
                }

                //increment the current record number by the number of threads we've processed
                currentRecordNumber += threadsToUse;
            }
        }
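
The batch arithmetic in the method above can be checked in isolation. The sketch below is illustrative only; BatchMath and BatchSizes are hypothetical names that do not appear in the original project. It lists the size of each batch for a given item count and batch size, assuming the batch size is at least 1.

```csharp
using System;
using System.Collections.Generic;

public static class BatchMath
{
    // Returns the size of each batch when itemCount items are processed
    // batchSize at a time; the last batch holds whatever is left over.
    // Assumes batchSize >= 1 (a smaller value would never advance the loop).
    public static List<int> BatchSizes(int itemCount, int batchSize)
    {
        var sizes = new List<int>();
        for (int start = 0; start < itemCount; start += batchSize)
        {
            sizes.Add(Math.Min(batchSize, itemCount - start));
        }

        return sizes;
    }

    public static void Main()
    {
        // 5 items processed 3 at a time: one full batch, then the remaining 2
        Console.WriteLine(string.Join(", ", BatchSizes(5, 3))); // prints "3, 2"
    }
}
```

Using Math.Min on the remaining count gives the same batch sizes as the separate "full set" and "remainder" branches, without needing the modulo calculation.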


I'm using a console application to test this, hence all the Console.WriteLine calls. The method should be self-explanatory: it works out how many batches are needed, then runs each batch in parallel, waiting for one batch to finish before starting the next. Bear in mind that Parallel.For treats the batch size as an upper limit on concurrency; the runtime's scheduler decides how many threads actually run at once. For the sake of full disclosure, here's the "Main" method of my console app to show how I tested this:

        /// <summary>
        /// Main application thread
        /// </summary>
        /// <param name="args"></param>
        static void Main(string[] args)
        {
            //create a dummy list of queue items
            List<Queue> queuedItems = new List<Queue>();

            queuedItems.Add(new Queue { Id = 1, Name = "Apple" });
            queuedItems.Add(new Queue { Id = 2, Name = "Banana" });
            queuedItems.Add(new Queue { Id = 3, Name = "Orange" });
            queuedItems.Add(new Queue { Id = 4, Name = "Pear" });
            queuedItems.Add(new Queue { Id = 5, Name = "Peach" });

            //get the number of items we want to process in parallel from the settings file we
            //created before
            int threadsToUse = QueueSettings.Default.NumberOfThreads;

            //call the method to do the threading etc.
            processQueuedItems(queuedItems, threadsToUse);
        }
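
One drawback of fixed batches is that each batch waits for its slowest item before the next batch starts. As an alternative sketch (not the approach used above), the Task Parallel Library can cap concurrency directly through ParallelOptions.MaxDegreeOfParallelism. CappedProcessor and ProcessAll are hypothetical names introduced here for illustration, and the Queue class is repeated only so the sample compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Queue
{
    public int Id { get; set; }
    public string Name { get; set; }
}

public static class CappedProcessor
{
    // Processes every item, allowing at most maxParallel to run at once.
    // Unlike fixed batches, a new item can start as soon as a running one finishes.
    // maxParallel must be a positive number (or -1 for no limit).
    public static void ProcessAll(List<Queue> items, int maxParallel, Action<Queue> process)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };
        Parallel.ForEach(items, options, process);
    }

    public static void Main()
    {
        var items = new List<Queue>
        {
            new Queue { Id = 1, Name = "Apple" },
            new Queue { Id = 2, Name = "Banana" },
            new Queue { Id = 3, Name = "Orange" }
        };

        // The order of the output lines is not guaranteed
        ProcessAll(items, 2, q => Console.WriteLine("Processed: {0} - {1}", q.Id, q.Name));
    }
}
```

In the console app above, the call would look like CappedProcessor.ProcessAll(queuedItems, threadsToUse, processQueuedItem). As with Parallel.For, the limit is a ceiling, and items are not necessarily processed in list order.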


Happy Threading!

Development | .Net | Visual Studio 2010 | VS2010