Parallel processing of a list of items using a dynamic number of threads

by Heathesh 20. June 2011 01:46

While working on a project I came across an interesting problem. I had a list of items to process, and I wanted to process a certain number of them at the same time, with that number set in a settings file. To illustrate how I did this, let's start off with the items I was going to process. In this case, let's say I had a class called "Queue", and each "Queue" item was something I needed to do some work with.

    /// <summary>
    /// Queue entity
    /// </summary>
    public class Queue
    {
        /// <summary>
        /// Gets or sets the id
        /// </summary>
        public int Id { get; set; }

        /// <summary>
        /// Gets or sets the name
        /// </summary>
        public string Name { get; set; }
    }

Now, in my application I have a list of these Queue items. I want to process more than one at a time, and I want to set how many are processed at a time in a settings file. So I added a settings file called "QueueSettings.settings" and added an int setting called NumberOfThreads, which I set to 3.

Now let's pretend my List of queued items to process looks like this:

            List<Queue> queuedItems = new List<Queue>();
           
            queuedItems.Add(new Queue { Id = 1, Name = "Apple" });
            queuedItems.Add(new Queue { Id = 2, Name = "Banana" });
            queuedItems.Add(new Queue { Id = 3, Name = "Orange" });
            queuedItems.Add(new Queue { Id = 4, Name = "Pear" });
            queuedItems.Add(new Queue { Id = 5, Name = "Peach" });

And for simplicity's sake, the method that actually processes a Queue item looks like this:

        /// <summary>
        /// Process the queued item
        /// </summary>
        /// <param name="queue"></param>
        private static void processQueuedItem(Queue queue)
        {
            Console.WriteLine("Processed: {0} - {1}", queue.Id, queue.Name);
        }

Now we need to create a method that calls the above method for each item. First, we add the following using directive to our application:

        using System.Threading.Tasks;

Then we create the method that does the actual processing:

        /// <summary>
        /// Processes the queued items
        /// </summary>
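        /// <param name="queuedItems">the items to process</param>
        /// <param name="threadsToUse">the maximum number of items to process at the same time</param>
        private static void processQueuedItems(List<Queue> queuedItems, int threadsToUse)
        {
            //guard against a zero or negative setting, which would otherwise cause a divide by zero below
            if (threadsToUse < 1)
                throw new ArgumentOutOfRangeException("threadsToUse", "The number of threads to use must be at least 1.");
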
            //calculate how many times we need to loop, by determining how many times we'll do a full set of
            //items and adding one for the remainder of items. For example, if you have 5 items and you're threading
            //3 at a time, you need to run twice, once to do the first 3, then once to finish off the last 2
            int timesToLoop = ((queuedItems.Count % threadsToUse) > 0 ? 1 : 0) + queuedItems.Count / threadsToUse;

            //set a current record number which we will use to make sure we don't process the same item
            //more than once
            int currentRecordNumber = 0;

            //loop through the amount of times we need to loop
            for (int i = 0; i < timesToLoop; i++)
            {
                if ((currentRecordNumber + threadsToUse) > queuedItems.Count)
                {
                    //this means we're doing the remainder of items
                    int lastThreadsToUse = (queuedItems.Count % threadsToUse);
                    Console.WriteLine("Processing {0} items", lastThreadsToUse);

                    //run the remainder items in parallel by calling the process queued item method
                    Parallel.For(0, lastThreadsToUse, index => processQueuedItem(queuedItems[currentRecordNumber + index]));
                }
                else
                {
                    //this means we're doing a full set of items
                    Console.WriteLine("Processing {0} items", threadsToUse);

                    //run a full set of items in parallel by calling the process queued item method
                    Parallel.For(0, threadsToUse, index => processQueuedItem(queuedItems[currentRecordNumber + index]));
                }

                //Parallel.For only returns once every item in the set has been processed, so it's safe to
                //move on: increment the current record number by the number of items in a full set
                currentRecordNumber += threadsToUse;
            }
        }
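The loop count in processQueuedItems is a ceiling division: 5 items at 3 per set gives 5 / 3 = 1 full set, plus 1 for the remainder, so 2 loops. As a minimal sketch (BatchPlanner and GetBatches are hypothetical names, not part of the original code), the same set boundaries can be computed up front with the common (count + size - 1) / size idiom:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: computes the same sets of items that processQueuedItems walks through.
public static class BatchPlanner
{
    /// <summary>
    /// Splits itemCount items into consecutive batches of at most batchSize items.
    /// Each tuple holds the index of the first item and the number of items in the batch.
    /// </summary>
    public static List<(int Start, int Count)> GetBatches(int itemCount, int batchSize)
    {
        if (batchSize < 1)
            throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be at least 1.");

        // ceiling division: same value as timesToLoop in processQueuedItems
        int batchCount = (itemCount + batchSize - 1) / batchSize;

        var batches = new List<(int Start, int Count)>(batchCount);
        for (int i = 0; i < batchCount; i++)
        {
            int start = i * batchSize;

            // the last batch only takes whatever is left over
            int count = Math.Min(batchSize, itemCount - start);
            batches.Add((start, count));
        }

        return batches;
    }
}
```

For the five fruit items and a batch size of 3 this yields two batches: 3 items starting at index 0 and 2 items starting at index 3, matching the two Parallel.For calls the loop above makes.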


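I'm using a Console application to test this, hence all the Console.WriteLine calls. The processQueuedItems method works out how many sets of items it needs, then processes the items in sets of up to threadsToUse items at a time, waiting for each set to finish before starting the next, until it's done. Note that Parallel.For runs the work on the calling thread and thread pool threads, so threadsToUse is really an upper limit on how many items run at once rather than a guaranteed number of dedicated threads. For the sake of complete disclosure, here's the "Main" method of my console app to show you how I tested this: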

        /// <summary>
        /// Main application thread
        /// </summary>
        /// <param name="args"></param>
        static void Main(string[] args)
        {
            //create a dummy list of queue items
            List<Queue> queuedItems = new List<Queue>();

            queuedItems.Add(new Queue { Id = 1, Name = "Apple" });
            queuedItems.Add(new Queue { Id = 2, Name = "Banana" });
            queuedItems.Add(new Queue { Id = 3, Name = "Orange" });
            queuedItems.Add(new Queue { Id = 4, Name = "Pear" });
            queuedItems.Add(new Queue { Id = 5, Name = "Peach" });

            //get the number of threads we want to run in parallel from the setting file we
            //created before
            int threadsToUse = QueueSettings.Default.NumberOfThreads;

            //call the method to do the threading etc.
            processQueuedItems(queuedItems, threadsToUse);
        }
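If you don't need the strict set-by-set behaviour, the Task Parallel Library can enforce the limit for you: Parallel.ForEach accepts a ParallelOptions object whose MaxDegreeOfParallelism property caps how many items are processed at once. This is a swapped-in alternative, not the approach from this post; ThrottledProcessor and ProcessAll are hypothetical names, and the peak-concurrency counter exists only so the limit can be checked.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Same shape as the Queue entity above
public class Queue
{
    public int Id { get; set; }
    public string Name { get; set; }
}

public static class ThrottledProcessor
{
    /// <summary>
    /// Processes every item with at most maxConcurrency items in flight at once.
    /// Returns the highest number of items observed running at the same time.
    /// </summary>
    public static int ProcessAll(IEnumerable<Queue> items, int maxConcurrency, Action<Queue> process)
    {
        if (maxConcurrency < 1)
            throw new ArgumentOutOfRangeException(nameof(maxConcurrency));

        var options = new ParallelOptions { MaxDegreeOfParallelism = maxConcurrency };
        int running = 0;
        int maxObserved = 0;

        Parallel.ForEach(items, options, item =>
        {
            int now = Interlocked.Increment(ref running);

            // record a new peak with a compare-and-swap loop
            int peak = Volatile.Read(ref maxObserved);
            while (now > peak)
            {
                int previous = Interlocked.CompareExchange(ref maxObserved, now, peak);
                if (previous == peak)
                    break;           // we stored the new peak
                peak = previous;     // another item changed it first; compare again
            }

            try
            {
                process(item);
            }
            finally
            {
                Interlocked.Decrement(ref running);
            }
        });

        return maxObserved;
    }
}
```

You would pass QueueSettings.Default.NumberOfThreads as maxConcurrency. Unlike the set-by-set version, a new item starts as soon as any running item finishes, so one slow item does not hold up the rest of its set. MaxDegreeOfParallelism is an upper bound; fewer items may run at once.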


Happy Threading!


Development | .Net | Visual Studio 2010 | VS2010

Generic implementation of System.Web.Cache to get or read and add or store items in the cache

by Heathesh 29. November 2010 21:48

I have a bunch of settings stored in the database that will probably never change. I would rather not make a database call every time I want to read a setting value, so to minimize database calls I decided to implement a simple manager class to get items from, and add items to, the cache.

To start off with, I created my manager class as a static class and called it CacheManager:

    /// <summary>
    /// Cache manager to store and read items in the cache
    /// </summary>
    public static class CacheManager
    {
    }


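Next, I added the using directives I knew the class would need (System for DateTime and TimeSpan, System.Web for HttpContext):

    using System;
    using System.Diagnostics;
    using System.Reflection;
    using System.Text;
    using System.Web;
    using System.Web.Caching;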


I'm adding System.Diagnostics (for StackTrace) and System.Reflection (for MethodBase) so I can dynamically create the "keys" I'll use to cache my items. I created a method that generates this key as follows:

        /// <summary>
        /// Gets the cache key for a particular method
        /// </summary>
        /// <param name="parameters"></param>
        /// <returns></returns>
        private static string getCacheKey(params object[] parameters)
        {
            StackTrace stackTrace = new StackTrace();
            MethodBase methodBase = stackTrace.GetFrame(2).GetMethod();
            StringBuilder cacheKey = new StringBuilder();

            //the start of the key will be the full method namespace underscore the method name
            cacheKey.AppendFormat("{0}_{1}", methodBase.ReflectedType.FullName, methodBase.Name);

            //add any parameters as trailing values in the key
            if (parameters != null && parameters.Length > 0)
                foreach (object parameter in parameters)
                    cacheKey.AppendFormat("_{0}", parameter);

            //return the key we've generated
            return cacheKey.ToString();
        }


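The "key" you use must be unique for each unique "object" you store in the cache. The method above looks two frames up the stack to find the calling method (frame 0 is getCacheKey itself, frame 1 is the CacheManager method that called it, and frame 2 is the method whose result is being cached), then appends any parameters you pass in, so every method and parameter combination gets its own key. Bear in mind that in an optimized Release build the JIT may inline methods, which removes stack frames and shifts that index; marking the methods involved with [MethodImpl(MethodImplOptions.NoInlining)] guards against that. This should become clearer later on.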
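To see the key format without a web application or a database, here is a minimal console sketch of the same stack-frame technique. CacheKeyDemo, KeyFor and the string-returning SettingsManager methods are hypothetical stand-ins that return the key instead of caching anything. The NoInlining attributes and the GC.KeepAlive calls are there on the assumption that an optimizing JIT could otherwise inline a method or turn its final call into a tail call, either of which removes a stack frame and makes frame 2 point at the wrong method.

```csharp
using System;
using System.Diagnostics;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Text;

// Hypothetical stand-in for CacheManager: it only builds the key, it does not cache anything.
public static class CacheKeyDemo
{
    // Same logic as getCacheKey: frame 0 is this method, frame 1 is KeyFor
    // (playing the part of Get/Add) and frame 2 is the method whose result is cached.
    [MethodImpl(MethodImplOptions.NoInlining)]
    private static string GetCacheKey(params object[] parameters)
    {
        MethodBase methodBase = new StackTrace().GetFrame(2).GetMethod();
        StringBuilder cacheKey = new StringBuilder();
        cacheKey.AppendFormat("{0}_{1}", methodBase.ReflectedType.FullName, methodBase.Name);

        if (parameters != null)
            foreach (object parameter in parameters)
                cacheKey.AppendFormat("_{0}", parameter);

        return cacheKey.ToString();
    }

    [MethodImpl(MethodImplOptions.NoInlining)]
    public static string KeyFor(params object[] parameters)
    {
        string key = GetCacheKey(parameters);
        GC.KeepAlive(parameters); // work after the call keeps it out of tail position
        return key;
    }
}

// Hypothetical data methods that return their cache key instead of reading a database.
public static class SettingsManager
{
    [MethodImpl(MethodImplOptions.NoInlining)]
    public static string ReadAll()
    {
        string key = CacheKeyDemo.KeyFor();
        GC.KeepAlive(key); // keeps the KeyFor call out of tail position
        return key;
    }

    [MethodImpl(MethodImplOptions.NoInlining)]
    public static string ReadByName(string name)
    {
        string key = CacheKeyDemo.KeyFor(name);
        GC.KeepAlive(name); // keeps the KeyFor call out of tail position
        return key;
    }
}
```

Calling SettingsManager.ReadAll() and SettingsManager.ReadByName("Theme") should give keys ending in SettingsManager_ReadAll and SettingsManager_ReadByName_Theme, prefixed by the namespace if the class is declared inside one.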

The next thing I wanted to do was create the methods to get items from and add items to the cache, so I created the following two. The comments should be self-explanatory:

        /// <summary>
        /// Gets an item from the cache
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="parameters"></param>
        /// <returns></returns>
        public static T Get<T>(params object[] parameters)
        {
            //generate the "key" for the cached item
            string key = getCacheKey(parameters);

            //make sure the current http context is not null
            if (HttpContext.Current == null)
                return default(T);

            //read the cache only once, because the item could expire between two separate reads
            object cachedItem = HttpContext.Current.Cache.Get(key);

            //return the cached item if it exists and is of the expected type, otherwise T's default value
            return cachedItem is T ? (T)cachedItem : default(T);
        }

        /// <summary>
        /// Adds the relevant item to the cache and returns it as well
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="itemToCache"></param>
        /// <param name="hoursToCache"></param>
        /// <param name="parameters"></param>
        /// <returns></returns>
        public static T Add<T>(T itemToCache, int hoursToCache, params object[] parameters)
        {
            //generate the "key" for the cached item
            string key = getCacheKey(parameters);

            //only cache when there is an http context and something to cache
            //(Cache.Add leaves any existing entry with the same key in place)
            if (HttpContext.Current != null && itemToCache != null)
                HttpContext.Current.Cache.Add(key, itemToCache, null, DateTime.Now.AddHours(hoursToCache), Cache.NoSlidingExpiration, CacheItemPriority.BelowNormal, null);

            //return the cached item
            return itemToCache;
        }


As you can see, each method lets you pass in an array of parameters, which is then passed to the getCacheKey method and used to generate the unique key. Now, to implement my caching, I have a method called "ReadAll" in my SettingsManager class, and this is how I implemented the caching:

        /// <summary>
        /// Reads all the settings and returns a list of them
        /// </summary>
        /// <returns></returns>
        public static List<Setting> ReadAll()
        {
            //try and get the list of items from the cache
            List<Setting> settings = CacheManager.Get<List<Setting>>();

            //if we found something in the cache, return it
            if (settings != null && settings.Count > 0)
                return settings;

            //if we get here, we haven't found anything so connect to the database and populate the list of settings from there, but first initialize the list
            settings = new List<Setting>();

            //now populate from the database
            using (SqlConnection connection = new SqlConnection(ConfigurationManager.ConnectionStrings["EngineDatabase"].ConnectionString))
            {
                connection.Open();

                using (SqlCommand command = new SqlCommand("SettingReadAll", connection))
                {
                    command.CommandType = System.Data.CommandType.StoredProcedure;

                    using (SqlDataReader dataReader = command.ExecuteReader())
                        while (dataReader.Read())
                            settings.Add(populateSettingFromReader(dataReader));
                }

                connection.Close();
            }

            //now add the list to the cache for 24 hours, and return it
            return CacheManager.Add<List<Setting>>(settings, 24);
        }


The "populateSettingFromReader" method is a simple helper I created to build a "Setting" object from a data reader:

        /// <summary>
        /// Populates a setting object from the data reader
        /// </summary>
        /// <param name="dataReader"></param>
        /// <returns></returns>
        private static Setting populateSettingFromReader(SqlDataReader dataReader)
        {
            return new Setting
            {
                Id = Convert.ToInt32(dataReader["Id"]),
                Name = Convert.ToString(dataReader["Name"]),
                Value = Convert.ToString(dataReader["Value"])
            };
        }


My "Setting" class is a simple entity class that looks like so:

    /// <summary>
    /// Setting entity
    /// </summary>
    public class Setting
    {
        /// <summary>
        /// Gets or sets the Id
        /// </summary>
        public int Id { get; set; }
        /// <summary>
        /// Gets or sets the Name
        /// </summary>
        public string Name { get; set; }
        /// <summary>
        /// Gets or sets the Value
        /// </summary>
        public string Value { get; set; }
    }


As you can see, the "ReadAll" method has no parameters, so you don't get to see how the getCacheKey method works with parameters. However, the cache key generated for this method would be something like "[Namespace].SettingsManager_ReadAll".

To illustrate what the getCacheKey would return for a method with parameters, I created a method that would only retrieve one setting at a time using the setting's name as a parameter:

        /// <summary>
        /// Reads the setting for a specified setting name
        /// </summary>
        /// <param name="name"></param>
        /// <returns></returns>
        public static Setting ReadByName(string name)
        {
            //get the value from the cache passing in the name as a parameter to generate the unique key
            Setting setting = CacheManager.Get<Setting>(name);

            if (setting != null)
                return setting;

            //if we get here, we haven't found anything so connect to the database and populate the list of settings from there
            using (SqlConnection connection = new SqlConnection(ConfigurationManager.ConnectionStrings["EngineDatabase"].ConnectionString))
            {
                connection.Open();

                using (SqlCommand command = new SqlCommand("SettingReadByName", connection))
                {
                    command.CommandType = System.Data.CommandType.StoredProcedure;
                    command.Parameters.Add(new SqlParameter("@Name", name));

                    using (SqlDataReader dataReader = command.ExecuteReader())
                        if (dataReader.Read())
                            setting = populateSettingFromReader(dataReader);
                }

                connection.Close();
            }

            //here I call the cache method passing in the "name" variable passed into this method as a parameter
            //which will be used as part of the cache key
            return CacheManager.Add<Setting>(setting, 24, name);
        }


I called this method like so:

            //get the theme setting value
            Setting setting = SettingsManager.ReadByName("Theme");

Now, as you can see, the value of the parameter I'm passing in is "Theme", so for this method the cache key would be something like "[Namespace].SettingsManager_ReadByName_Theme". Note the trailing "_Theme".
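Both ReadAll and ReadByName follow the same read-through pattern: check the cache, load from the database on a miss, then store the result with an absolute expiry. The sketch below shows that flow in isolation, assuming a ConcurrentDictionary as a stand-in for HttpContext.Current.Cache (unlike the ASP.NET cache, it never evicts under memory pressure) and a delegate as a stand-in for the database call. SimpleCache and GetOrAdd are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory stand-in for HttpContext.Current.Cache, for illustration outside ASP.NET.
public static class SimpleCache
{
    private static readonly ConcurrentDictionary<string, (object Value, DateTime ExpiresUtc)> items =
        new ConcurrentDictionary<string, (object Value, DateTime ExpiresUtc)>();

    public static T GetOrAdd<T>(string key, int hoursToCache, Func<T> load)
    {
        // 1. try the cache first, ignoring entries whose absolute expiry has passed
        if (items.TryGetValue(key, out var entry) && entry.ExpiresUtc > DateTime.UtcNow && entry.Value is T cached)
            return cached;

        // 2. cache miss: load the value (in the post, this is the database call)
        T value = load();

        // 3. only cache non-null values, as CacheManager.Add does
        if (value != null)
            items[key] = ((object)value, DateTime.UtcNow.AddHours(hoursToCache));

        return value;
    }
}
```

As with the original Get/Add pair, two requests that miss at the same moment can both run the loader; for read-mostly settings that is usually harmless.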

Happy caching!


Development | .Net | Visual Studio 2010 | VS2010




http://heathesh.com