QueuedWorker

From Yefu's notes
Jump to: navigation, search
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace Yefu.Utils
{
    /// <summary>
    /// Queues jobs in one or more priority queues and runs them on a fixed set of worker threads.
    /// </summary>
    class QueuedWorker
    {
        /// <summary>
        /// Monitor object guarding <see cref="JobQueues"/>; workers wait on it and
        /// <see cref="AddJob(IJob, int)"/> pulses it. A private object is used instead of
        /// <c>this</c> so that outside code locking the instance cannot block the workers
        /// or consume their pulses.
        /// </summary>
        private readonly object syncRoot = new object();

        /// <summary>
        /// The worker threads started by the constructor.
        /// </summary>
        private readonly List<Thread> workers = new List<Thread>();

        /// <summary>
        /// Multiple queues to queue the jobs. JobQueues[0] has the highest priority.
        /// Access must be synchronized on <see cref="SyncRoot"/>.
        /// </summary>
        protected Queue<IJob>[] JobQueues;

        /// <summary>
        /// Gets the object a derived class must lock while using <see cref="JobQueues"/>,
        /// <see cref="QueueEmpty"/> or <see cref="Dequeue"/>.
        /// </summary>
        protected object SyncRoot
        {
            get
            {
                return syncRoot;
            }
        }

        /// <summary>
        /// Gets an integer indicating the lowest priority (the index of the last queue).
        /// </summary>
        /// <value>The lowest priority.</value>
        public int LowestPriority
        {
            get
            {
                return JobQueues.Length - 1;
            }
        }

        /// <summary>
        /// Gets a value indicating whether every priority queue is empty.
        /// Does no locking of its own; call it while holding <see cref="SyncRoot"/>.
        /// </summary>
        /// <value><c>true</c> if no queue holds a job; otherwise, <c>false</c>.</value>
        protected bool QueueEmpty
        {
            get
            {
                foreach (Queue<IJob> q in JobQueues)
                {
                    if (q.Count != 0)
                    {
                        return false;
                    }
                }
                return true;
            }
        }

        /// <summary>
        /// Retrieves a job from the queues, starting from the first one (highest priority).
        /// Does no locking of its own; call it while holding <see cref="SyncRoot"/>.
        /// </summary>
        /// <returns>The next job, or <c>null</c> when every queue is empty.</returns>
        protected IJob Dequeue()
        {
            foreach (Queue<IJob> q in JobQueues)
            {
                if (q.Count != 0)
                {
                    return q.Dequeue();
                }
            }
            return null;
        }

        /// <summary>
        /// Adds the job with the lowest priority.
        /// </summary>
        /// <param name="job">The job.</param>
        public void AddJob(IJob job)
        {
            AddJob(job, LowestPriority);
        }

        /// <summary>
        /// Adds the job to the queue of the given priority and wakes one waiting worker.
        /// </summary>
        /// <param name="job">The job. A <c>null</c> job is queued but skipped by the worker that dequeues it.</param>
        /// <param name="priority">The priority of the job, between 0 (highest priority) and
        /// <see cref="LowestPriority"/>. Values outside that range are clamped to the nearest bound.</param>
        public void AddJob(IJob job, int priority)
        {
            lock (syncRoot)
            {
                // Clamp instead of throwing: too-large values fall to the lowest priority,
                // negative values rise to the highest.
                int slot = Math.Max(0, Math.Min(priority, LowestPriority));
                JobQueues[slot].Enqueue(job);
                Monitor.Pulse(syncRoot);
            }
        }

        /// <summary>
        /// Body of each worker thread: waits until a job is queued, takes the
        /// highest-priority one and runs it outside the lock. Never returns.
        /// </summary>
        private void worker_thread()
        {
            for (; ; )
            {
                IJob cmd;
                lock (syncRoot)
                {
                    // Re-check after every wake-up: another worker may already have
                    // taken the job that triggered the pulse.
                    while (QueueEmpty)
                    {
                        Monitor.Wait(syncRoot);
                    }
                    cmd = Dequeue();
                }
                // cmd is null only when a null job was queued; skip it.
                if (cmd != null)
                {
                    cmd.DoJob();
                }
            }
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="QueuedWorker"/> class with 1 worker.
        /// </summary>
        public QueuedWorker()
            : this(1)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="QueuedWorker"/> class with n workers
        /// and a single priority level.
        /// </summary>
        /// <param name="n_workers">The number of workers.</param>
        public QueuedWorker(int n_workers)
            : this(n_workers, 1)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="QueuedWorker"/> class and starts its workers.
        /// </summary>
        /// <param name="n_workers">The number of workers. No worker is started when this is less than 1.</param>
        /// <param name="n_priority">The number of priority levels; must be at least 1.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="n_priority"/> is less than 1.</exception>
        public QueuedWorker(int n_workers, int n_priority)
        {
            // Without at least one queue every AddJob call would fail on the array index.
            if (n_priority < 1)
            {
                throw new ArgumentOutOfRangeException("n_priority", n_priority, "At least one priority level is required.");
            }

            JobQueues = new Queue<IJob>[n_priority];
            for (int i = 0; i < n_priority; i++)
            {
                JobQueues[i] = new Queue<IJob>();
            }
            for (int i = 0; i < n_workers; i++)
            {
                Thread worker = new Thread(new ThreadStart(worker_thread));
                worker.Name = "Worker Thread #" + i;
                // The worker loop never ends; as a foreground thread it would keep the
                // process alive after the application has otherwise finished.
                worker.IsBackground = true;
                workers.Add(worker);
                worker.Start();
            }
        }

        /// <summary>
        /// An interface for jobs.
        /// </summary>
        public interface IJob
        {
            /// <summary>
            /// Does the job. Called on one of the worker threads.
            /// </summary>
            void DoJob();
        }
    }
}


Example

/// <summary>
/// Example form: each click on button1 queues 100 jobs on a pool of 10 workers.
/// </summary>
public partial class Form1 : Form
{
    public Form1()
    {
        InitializeComponent();
    }

    // Worker pool with 10 threads, shared by all clicks.
    Yefu.Utils.QueuedWorker w = new Yefu.Utils.QueuedWorker(10);

    /// <summary>
    /// Queues 100 jobs at the default (lowest) priority.
    /// </summary>
    private void button1_Click(object sender, EventArgs e)
    {
        for (int i = 0; i < 100; i++)
        {
            thisjob j = new thisjob();
            w.AddJob(j);
        }
    }

    /// <summary>
    /// Sample job: prints a shared counter, increments it and sleeps for a random time below one second.
    /// </summary>
    private class thisjob : Yefu.Utils.QueuedWorker.IJob
    {
        static int counter = 0;
        static Random rand = new Random(0);
        // Random instance members are not thread-safe, and DoJob runs on several workers at once.
        static readonly object randLock = new object();

        #region IJob Members

        public void DoJob()
        {
            // Atomic increment so concurrent jobs never lose an update; subtract one
            // to report the value before the increment.
            int current = System.Threading.Interlocked.Increment(ref counter) - 1;
            Console.WriteLine("Counter :" + current);

            int delay;
            lock (randLock)
            {
                delay = rand.Next(1000);
            }
            System.Threading.Thread.Sleep(delay);
        }

        #endregion
    }
}


<comments/>