
WilliaBlog.Net

I dream in code

About the author

Robert Williams is an internet application developer for the Salem Web Network.
Disclaimer

The opinions expressed herein are my own personal opinions and do not represent my employer's view in any way.


Using System.Threading.Tasks and BlockingCollections to FTP multiple Files at the same time

I recently needed to write an application that would loop through a queue of files and FTP them to our Content Delivery Network for streaming. Users upload files, and our administrators can mark some of them as urgent. Urgent files need to jump to the front of the queue; everything else should be ordered by broadcast date. My initial code was basically a loop that looked something like this:

while (GetFtpQueue().Count > 0)
{
    // Gather all the info from the db, FTP the file, then clean up
    // (move the source file, update the db, etc.)
}
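The ordering rule described above (urgent items jump to the front, everything else by broadcast date) can be sketched with LINQ. The QueueItem shape here is hypothetical; the post doesn't show the real schema:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class FtpQueueOrdering
{
    // Hypothetical shape of a queue row -- the real db schema isn't shown in the post.
    public class QueueItem
    {
        public string Name { get; set; }
        public bool IsUrgent { get; set; }
        public DateTime BroadcastDate { get; set; }
    }

    // Urgent items jump to the front of the queue; everything else
    // stays in broadcast-date order.
    public static List<QueueItem> Order(IEnumerable<QueueItem> items)
    {
        return items.OrderByDescending(i => i.IsUrgent)
                    .ThenBy(i => i.BroadcastDate)
                    .ToList();
    }

    static void Main()
    {
        var ordered = Order(new[]
        {
            new QueueItem { Name = "show.mp4",     IsUrgent = false, BroadcastDate = new DateTime(2011, 4, 20) },
            new QueueItem { Name = "breaking.mp4", IsUrgent = true,  BroadcastDate = new DateTime(2011, 4, 25) },
            new QueueItem { Name = "sermon.mp3",   IsUrgent = false, BroadcastDate = new DateTime(2011, 4, 18) },
        });

        Console.WriteLine(string.Join(", ", ordered.Select(i => i.Name)));
        // breaking.mp4, sermon.mp3, show.mp4
    }
}
```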

It worked beautifully while we were just uploading small audio files, but as soon as we started adding a lot of video files to the queue it became so slow that it might take two hours or more to upload a single video file. So we experimented with FileZilla to see how many concurrent uploads we could add before the overall speed of each upload started to drop. We found that at our location, four simultaneous FTP uploads hit the sweet spot: instead of uploading one file at 500 kb/s we could upload all four, each one still at that speed, quadrupling our throughput.

I read up on the new threading classes in .NET 4.0 and began refactoring my FTP application. I decided to use the Task Factory to manage the threads, in conjunction with a BlockingCollection, to create a classic Producer/Consumer pattern. My first attempt looked a lot like this:

int maxThreads = 4;
var filesToFtp = new BlockingCollection<FtpItem>(maxThreads);
var processedFiles = new BlockingCollection<FtpItem>();

// Stage #1: Producer
var initFtp = Task.Factory.StartNew(() =>
{
    try
    {
        while (GetFtpQueue().Count > 0)
        {
            // Gather all the info from the db and use it to create FtpItem objects.
            // Add them to filesToFtp, which only allows maxThreads items in at a time
            // (this allows an urgent item to jump to the top while current items are still FTPing).
            filesToFtp.Add(new FtpItem { ... });
        }
    }
    finally { filesToFtp.CompleteAdding(); }
});

// Stage #2: Consumer of the initFtp task and producer for the cleanup task
var process = Task.Factory.StartNew(() =>
{
    try
    {
        foreach (var file in filesToFtp.GetConsumingEnumerable())
        {
            // Ftp the file, then add it to the list of processed files.
            processedFiles.Add(file);
        }
    }
    finally { processedFiles.CompleteAdding(); }
});

// Stage #3
var cleanup = Task.Factory.StartNew(() =>
{
    foreach (var file in processedFiles.GetConsumingEnumerable())
    {
        // Clean up (move the source file, update the db, etc.)
    }
});

Task.WaitAll(initFtp, process, cleanup);

Initially, this looked quite promising. I wrote a bare-bones version like the one above that just called Thread.Sleep to simulate work and iterated through a list of ints. I was able to verify that each "stage" was running on its own thread, that it never allowed more than 4 items through at a time, that I could add items to the front of the queue and get them processed next, and that it never tried to 'clean up' a file until that file had passed through both stage 1 and stage 2. However, I did notice that the elapsed time was the same as when I ran a similar unit test in a simple while loop. It might be obvious to you why this is, but at the time I put it down to a limitation of the unit test and pushed my new code to production. The first thing I noticed was that it wasn't any faster. Not even slightly.

It took me hours of staring at the code to finally figure out why my multithreaded code was not running any faster, but the answer is simple: I only created one consumer of filesToFtp. I had incorrectly assumed that because I was creating up to 4 FtpItems at a time, and the FTP process was running on its own thread, it would consume as many as it could. In reality, while each of the three stages in the code above runs on its own thread, the whole process still happens in series. Stage 1 doesn't create 4 items at once; it creates them one after the other. Stage 2 does begin working before stage 1 is complete (as soon as there is an item to consume), but then it is busy FTPing that first item until the upload finishes, and only then will it grab the next file.
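A stripped-down version of that unit test makes the bottleneck visible: with a single consumer the simulated uploads run back to back, while multiple consumers overlap them. Thread.Sleep stands in for the FTP transfer; everything else here is a sketch, not the production code:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class ConsumerCountDemo
{
    // Push itemCount dummy "files" through the pipeline with the given number
    // of consumers; each "upload" is simulated by Thread.Sleep(workMs).
    public static TimeSpan Run(int itemCount, int workMs, int consumers)
    {
        var queue = new BlockingCollection<int>(consumers);
        var sw = Stopwatch.StartNew();

        var tasks = new List<Task>
        {
            // Producer: one item at a time, blocking when the collection is full.
            Task.Factory.StartNew(() =>
            {
                try { for (int i = 0; i < itemCount; i++) queue.Add(i); }
                finally { queue.CompleteAdding(); }
            })
        };

        // Consumers: each one "uploads" whatever it can pull from the queue.
        for (int c = 0; c < consumers; c++)
        {
            tasks.Add(Task.Factory.StartNew(() =>
            {
                foreach (var item in queue.GetConsumingEnumerable())
                    Thread.Sleep(workMs); // stand-in for the FTP upload
            }));
        }

        Task.WaitAll(tasks.ToArray());
        return sw.Elapsed;
    }

    static void Main()
    {
        Console.WriteLine("1 consumer:  {0:F0} ms", Run(8, 200, 1).TotalMilliseconds);
        Console.WriteLine("4 consumers: {0:F0} ms", Run(8, 200, 4).TotalMilliseconds);
    }
}
```

With one consumer, 8 items at 200 ms each take roughly 1600 ms; with four consumers they overlap and finish in roughly a quarter of that.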

To resolve this problem, I simply wrapped stage 2 in a for loop and created an IList of Tasks to wait on:

int maxThreads = 4;
var filesToFtp = new BlockingCollection<FtpItem>(maxThreads);
var processedFiles = new BlockingCollection<FtpItem>();
IList<Task> tasks = new List<Task>();

// Stage #1: Producer
tasks.Add(Task.Factory.StartNew(() =>
{
    try
    {
        while (GetFtpQueue().Count > 0)
        {
            // Gather all the info from the db and use it to create FtpItem objects.
            // Add them to filesToFtp, which only allows maxThreads items in at a time
            // (this allows an urgent item to jump to the top while current items are still FTPing).
            filesToFtp.Add(new FtpItem { ... });
        }
    }
    finally { filesToFtp.CompleteAdding(); }
}));

// Start multiple instances of the ftp process
for (int i = 0; i < maxThreads; i++)
{
    // Stage #2 Consumer of initFtpTask and Producer for Cleanup Task
	tasks.Add(Task.Factory.StartNew(() =>
	{
		try
		{
			foreach (var file in filesToFtp.GetConsumingEnumerable())
			{
				// Ftp the file, then add it to the list of processed files.
				processedFiles.Add(file);
			}
		}
		finally { processedFiles.CompleteAdding(); }
	}));
}

// Stage #3
tasks.Add(Task.Factory.StartNew(() =>
{
	foreach (var file in processedFiles.GetConsumingEnumerable())
	{
		// Clean up (move the source file, update the db, etc.)
	}
}));

Task.WaitAll(tasks.ToArray());

I reran the unit test and it was faster! Very nearly four times faster, in fact. Wahoo! I updated the code, published my changes, and sat back. Sure enough, the FTP process finally started to make up some ground. In the meantime, I went back to my unit test and began tweaking. The first thing I noticed was that sometimes I would get a "System.InvalidOperationException: The BlockingCollection&lt;T&gt; has been marked as complete with regards to additions." Luckily, this didn't take a lot of head scratching to figure out: the first thread to reach the 'finally' clause of stage 2 closed the processedFiles collection, leaving the other three threads hanging. A final refactoring resolved the issue:
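An alternative to keeping a separate list of wait handles is to let the last FTP consumer close the downstream collection itself, by counting down a shared counter with Interlocked.Decrement. This is a sketch of that variation with dummy work, not the code from the download:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class LastConsumerCloses
{
    // Push itemCount dummy "files" through the two-stage pipeline; the last
    // FTP consumer to finish is the one that calls CompleteAdding.
    public static int RunPipeline(int itemCount, int maxThreads)
    {
        var filesToFtp = new BlockingCollection<int>(maxThreads);
        var processedFiles = new BlockingCollection<int>();

        // Stage #1: Producer
        Task.Factory.StartNew(() =>
        {
            try { for (int i = 0; i < itemCount; i++) filesToFtp.Add(i); }
            finally { filesToFtp.CompleteAdding(); }
        });

        // Stage #2: multiple FTP consumers
        int remainingConsumers = maxThreads;
        for (int i = 0; i < maxThreads; i++)
        {
            Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (var file in filesToFtp.GetConsumingEnumerable())
                        processedFiles.Add(file); // "Ftp" the file, then pass it on
                }
                finally
                {
                    // Only the last consumer to finish closes the collection,
                    // so no thread trips over a premature CompleteAdding.
                    if (Interlocked.Decrement(ref remainingConsumers) == 0)
                        processedFiles.CompleteAdding();
                }
            });
        }

        // Stage #3: cleanup, run inline here for simplicity
        int cleaned = 0;
        foreach (var file in processedFiles.GetConsumingEnumerable())
            cleaned++; // clean up
        return cleaned;
    }

    static void Main()
    {
        Console.WriteLine(LastConsumerCloses.RunPipeline(10, 4)); // prints 10: every file survives
    }
}
```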

int maxThreads = 4;
var filesToFtp = new BlockingCollection<FtpItem>(maxThreads);
var processedFiles = new BlockingCollection<FtpItem>();
IList<Task> tasks = new List<Task>();

// Maintain a separate list of wait handles for the FTP Tasks,
// since we need to know when they all complete in order to close the processedFiles blocking collection.
IList<Task> ftpProcessTasks = new List<Task>();

// Stage #1: Producer
tasks.Add(Task.Factory.StartNew(() =>
{
	try
	{
		while (GetFtpQueue().Count > 0)
		{
			// Gather all the info from the db and use it to create FtpItem objects.
			// Add them to filesToFtp, which only allows maxThreads items in at a time
			// (this allows an urgent item to jump to the top while current items are still FTPing).
			filesToFtp.Add(new FtpItem { ... });
		}
	}
	finally { filesToFtp.CompleteAdding(); }
}));

// Start multiple instances of the ftp process
for (int i = 0; i < maxThreads; i++)
{
	// Stage #2 Consumer of initFtpTask and Producer for Cleanup Task
	ftpProcessTasks.Add(Task.Factory.StartNew(() =>
	{
		try
		{
			foreach (var file in filesToFtp.GetConsumingEnumerable())
			{
				// Ftp the file
				// Add to list of processedFiles
				processedFiles.Add(file);
			}
		}
	}));
}

// Stage #3
tasks.Add(Task.Factory.StartNew(() =>
{
	foreach (var file in processedFiles.GetConsumingEnumerable())
	{
		// Clean up (move the source file, update the db, etc.)
	}
}));


// When all the FTP Threads complete
Task.WaitAll(ftpProcessTasks.ToArray());

// Notify the stage #3 cleanup task that there is no need to wait, there will be no more processedFiles.
processedFiles.CompleteAdding();

// Make sure all the other tasks are complete too.
Task.WaitAll(tasks.ToArray());
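For what it's worth, the extra blocking WaitAll on the FTP tasks could also be replaced with a continuation: Task.Factory.ContinueWhenAll runs a delegate once all of the supplied tasks have finished, so CompleteAdding can be scheduled rather than waited for. A sketch of that approach, again with dummy work standing in for the FTP and database calls:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class ContinuationClose
{
    public static int RunPipeline(int itemCount, int maxThreads)
    {
        var filesToFtp = new BlockingCollection<int>(maxThreads);
        var processedFiles = new BlockingCollection<int>();
        var tasks = new List<Task>();
        var ftpProcessTasks = new List<Task>();

        // Stage #1: Producer
        tasks.Add(Task.Factory.StartNew(() =>
        {
            try { for (int i = 0; i < itemCount; i++) filesToFtp.Add(i); }
            finally { filesToFtp.CompleteAdding(); }
        }));

        // Stage #2: multiple FTP consumers
        for (int i = 0; i < maxThreads; i++)
        {
            ftpProcessTasks.Add(Task.Factory.StartNew(() =>
            {
                foreach (var file in filesToFtp.GetConsumingEnumerable())
                    processedFiles.Add(file); // "Ftp" the file, then pass it on
            }));
        }

        // Schedule CompleteAdding to run when every FTP task has finished,
        // instead of blocking on a second WaitAll.
        tasks.Add(Task.Factory.ContinueWhenAll(ftpProcessTasks.ToArray(),
            completed => processedFiles.CompleteAdding()));

        // Stage #3: cleanup
        int cleaned = 0;
        tasks.Add(Task.Factory.StartNew(() =>
        {
            foreach (var file in processedFiles.GetConsumingEnumerable())
                cleaned++; // clean up
        }));

        Task.WaitAll(tasks.Concat(ftpProcessTasks).ToArray());
        return cleaned;
    }

    static void Main()
    {
        Console.WriteLine(ContinuationClose.RunPipeline(10, 4)); // prints 10
    }
}
```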

Download a working example (Just enter your FTP Server details prior to running):

ProducerConsumer.zip (11.18 mb)


Posted by Williarob on Monday, April 18, 2011 11:47 AM