Pipeline Design Pattern

Implementation and examples of the pipeline design pattern


Category: Design Patterns Tags: C#


Introduction

    The pipeline design pattern is used when we need to perform multiple tasks in sequence on a piece of data. An item flows through the pipeline, passing through multiple stages, each of which performs some task on it.
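To make the idea concrete, here is a minimal synchronous sketch, assuming each stage is simply a function that takes an item and returns the (possibly modified) item. The class SimplePipeline and its methods are hypothetical names used only for illustration. There are no buffers or threads here, so each item is processed from start to finish on the calling thread.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal pipeline: no buffers, no threads.
// Each stage is a function that receives an item and returns it (possibly changed).
public sealed class SimplePipeline<T>
{
    private readonly List<Func<T, T>> _stages = new List<Func<T, T>>();

    // Appends a stage and returns the pipeline so calls can be chained.
    public SimplePipeline<T> AddStage(Func<T, T> stage)
    {
        _stages.Add(stage);
        return this;
    }

    // Runs the item through every stage, in the order the stages were added.
    public T Process(T item)
    {
        foreach (var stage in _stages)
        {
            item = stage(item);
        }
        return item;
    }
}
```

For example, new SimplePipeline<string>().AddStage(s => s.Trim()).AddStage(s => s.ToUpperInvariant()).Process("  water ") returns "WATER". The implementation in this article goes further and gives every stage its own buffer and worker tasks, so that different stages can work on different items at the same time.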

Fig 1: Pipeline design pattern

Real-world Example: Think of it as a water purifier. A water purifier performs multiple tasks on water to make it drinkable. First, water is collected in a small container; from there it moves into a stage where the purifier removes sodium, and it is then stored in the next container. In the next step, the purifier removes carbon impurities and stores the water in the following container, and this continues until the water has flowed through all the stages, such as RO, UF and UV.

Software Example: Consider a social networking site where millions of posts and comments are published every day, and the system has to flag violent or abusive posts asynchronously. We can build a pipeline-based system that pulls data from the database servers, finds blacklisted words in the posts, and updates their status back in the database.

Implementation

    We will use the .NET Core Worker Service project template for this implementation. Let’s implement the pipeline pattern for the software example given in the introduction. We start by creating the required models:

public abstract class BaseBufferItem
{
    public int RetryCount { get; set; }
}

public class BufferItem : BaseBufferItem
{
    public int ItemId { get; set; }
    public string Text { get; set; }
    public bool ContainsBadWords { get; set; }
}

RetryCount records how many times processing of an item has been retried after a failure, so that a failing item is retried only a limited number of times. Now let’s create a class for a step:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class Step<T> where T : BaseBufferItem
{
    // Holds data for processing (bounded to bufferSize items)
    public BlockingCollection<T> Buffer { get; }
    // Action we want to perform on each item
    public Func<T, Task<T>> StepActionAsync { get; }
    // Should retry in case of failure?
    public bool ShouldRetry { get; set; }
    // Maximum number of retries per item when ShouldRetry is true
    public int MaxRetryCount { get; set; }
    // Number of concurrent tasks which will be running to process this step
    public int DegreeOfParallelism { get; set; } = 1;

    public Step(int bufferSize, Func<T, Task<T>> stepActionAsync)
    {
        Buffer = new BlockingCollection<T>(bufferSize);
        StepActionAsync = stepActionAsync;
    }
}

The comments describe each property, but we will focus on the two most important ones: Buffer and StepActionAsync. Buffer holds the items waiting to be processed by a step; the tasks running that step take items from Buffer and execute StepActionAsync on each one.
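Buffer is created with a bounded capacity (bufferSize), which gives the pipeline back-pressure: when a step's buffer is full, Add blocks until a consumer frees a slot, so a fast producer cannot run far ahead of a slow step. The standalone sketch below shows this behaviour of BlockingCollection<T> with a capacity of 2. It uses TryAdd, which returns false immediately instead of blocking when the collection is full; BoundedBufferDemo is a hypothetical name.

```csharp
using System.Collections.Concurrent;

public static class BoundedBufferDemo
{
    // Returns whether a third item fits in a full buffer of capacity 2,
    // and whether it fits after a consumer has taken one item.
    public static (bool acceptedWhenFull, bool acceptedAfterTake, int taken) Run()
    {
        using var buffer = new BlockingCollection<int>(boundedCapacity: 2);
        buffer.Add(1);
        buffer.Add(2);

        // The buffer is full: Add(3) would block here, TryAdd(3) returns false at once.
        bool acceptedWhenFull = buffer.TryAdd(3);

        // A consumer removes the oldest item (FIFO by default), freeing one slot.
        int taken = buffer.Take();
        bool acceptedAfterTake = buffer.TryAdd(3);

        return (acceptedWhenFull, acceptedAfterTake, taken);
    }
}
```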

using System;

// Process-wide counters for the pipeline; every static accessor takes the lock
public class ItemsSummery
{
    public UInt64 NumberOfTotalItemsEnequed;
    public UInt64 NumberOfItemsSuccessfullyProcessed;
    public UInt64 NumberOfItemsFailedToProcess;

    private static readonly ItemsSummery _itemsSummery = new ItemsSummery();
    private static readonly object _object = new object();

    private ItemsSummery()
    { }

    public static UInt64 GetNumberOfTotalItemsEnequed()
    {
        lock (_object)
        {
            return _itemsSummery.NumberOfTotalItemsEnequed;
        }
    }
    public static UInt64 GetNumberOfItemsSuccessfullyProcessed()
    {
        lock (_object)
        {
            return _itemsSummery.NumberOfItemsSuccessfullyProcessed;
        }
    }
    public static UInt64 GetNumberOfItemsFailedToProcess()
    {
        lock (_object)
        {
            return _itemsSummery.NumberOfItemsFailedToProcess;
        }
    }

    public static UInt64 IncrementNumberOfTotalItemsEnequed()
    {
        lock (_object)
        {
            _itemsSummery.NumberOfTotalItemsEnequed++;
            return _itemsSummery.NumberOfTotalItemsEnequed;
        }
    }
    public static UInt64 IncrementNumberOfItemsSuccessfullyProcessed()
    {
        lock (_object)
        {
            _itemsSummery.NumberOfItemsSuccessfullyProcessed++;
            return _itemsSummery.NumberOfItemsSuccessfullyProcessed;
        }
    }
    public static UInt64 IncrementNumberOfItemsFailedToProcess()
    {
        lock (_object)
        {
            _itemsSummery.NumberOfItemsFailedToProcess++;
            return _itemsSummery.NumberOfItemsFailedToProcess;
        }
    }

    public static bool AreAllItemsProcessed()
    {
        lock (_object)
        {
            return _itemsSummery.NumberOfTotalItemsEnequed == (_itemsSummery.NumberOfItemsSuccessfullyProcessed + _itemsSummery.NumberOfItemsFailedToProcess);
        }
    }
}
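The ItemsSummery class above guards every counter with a lock. When all you need is an atomic increment, System.Threading.Interlocked is a lighter-weight option; the hypothetical AtomicCounters class below sketches it (using long rather than UInt64). There is a trade-off: AreAllItemsProcessed relies on the lock to read all three counters as one consistent snapshot, which three separate Interlocked reads would not guarantee, so this sketch deliberately leaves that method out.

```csharp
using System.Threading;

// Hypothetical lock-free counters, as an alternative to the lock-based ItemsSummery.
public sealed class AtomicCounters
{
    private long _enqueued;
    private long _succeeded;
    private long _failed;

    // Interlocked.Increment performs an atomic read-modify-write and returns the new value.
    public long IncrementEnqueued() => Interlocked.Increment(ref _enqueued);
    public long IncrementSucceeded() => Interlocked.Increment(ref _succeeded);
    public long IncrementFailed() => Interlocked.Increment(ref _failed);

    // Interlocked.Read gives an atomic 64-bit read, even on 32-bit platforms.
    public long Enqueued => Interlocked.Read(ref _enqueued);
    public long Succeeded => Interlocked.Read(ref _succeeded);
    public long Failed => Interlocked.Read(ref _failed);
}
```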

The ItemsSummery class keeps a thread-safe summary of how many items have been enqueued so far, how many have been processed successfully and how many have failed. We will also use this class later for the graceful shutdown of the service. Now we will implement the pipeline class; below is its structure:

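using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class Pipeline<T> where T : BaseBufferItem
{
    // Steps in execution order
    private readonly List<Step<T>> _steps;
    // Worker tasks started by StartPipeline
    private readonly List<Task> _tasks;

    // Raised after an item has passed the last step
    public event Func<T, Task> OnFinishAsync;

    public Pipeline()
    {
        _steps = new List<Step<T>>();
        _tasks = new List<Task>();
    }
}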
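OnFinishAsync is declared as an event of type Func<T, Task>, so several handlers can subscribe to it. Invoking such a multicast delegate, as StartStepAsync later does with await OnFinishAsync(output), calls every handler but returns only the Task of the last one, so the other handlers are not awaited. If more than one asynchronous handler is expected, one option is to invoke each handler separately and await them together, as in this sketch; FinishNotifier and RaiseAsync are hypothetical names.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: raises an async event and awaits every subscriber.
public sealed class FinishNotifier<T>
{
    public event Func<T, Task> OnFinishAsync;

    public Task RaiseAsync(T item)
    {
        // Copy the delegate so a concurrent unsubscribe cannot null it mid-call.
        var handlers = OnFinishAsync;
        if (handlers == null)
        {
            return Task.CompletedTask;
        }

        // Invoke each handler separately and wait for all of their tasks,
        // instead of awaiting only the Task returned by the last handler.
        var tasks = handlers.GetInvocationList()
                            .Cast<Func<T, Task>>()
                            .Select(handler => handler(item));
        return Task.WhenAll(tasks);
    }
}
```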

The class holds a list of steps and a list of the tasks that run them. The OnFinishAsync event can be used to run some work after an item has successfully passed the last step. Next, we add methods for adding steps and items:

public void AddStep(Step<T> step)
{
    _steps.Add(step);
}

public bool AddItem(T item)
{
    var firstStep = _steps[0];
    if (!firstStep.Buffer.IsAddingCompleted)
    {
        try
        {
            firstStep.Buffer.Add(item);
            ItemsSummery.IncrementNumberOfTotalItemsEnequed();
            return true;
        }
        catch
        {
            Console.WriteLine("unable to add item to pipeline");
        }
    }
    return false;
}

AddStep appends a step to the steps list. AddItem adds a new item to the buffer of the first step so that it can be processed by the pipeline; it returns false if the pipeline no longer accepts items.
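AddItem guards against a shutdown that has already started. The short sketch below (CompleteAddingDemo is a hypothetical name) shows the BlockingCollection<T> behaviour it relies on: after CompleteAdding, Add throws InvalidOperationException, yet items already in the buffer can still be consumed, which is what lets the pipeline drain during a graceful shutdown. The IsAddingCompleted check alone cannot rule out the exception, because CompleteAdding may be called between the check and the Add; that is why AddItem also catches.

```csharp
using System;
using System.Collections.Concurrent;

public static class CompleteAddingDemo
{
    public static (bool addThrew, int drained, bool isCompleted) Run()
    {
        using var buffer = new BlockingCollection<int>(boundedCapacity: 10);
        buffer.Add(1);
        buffer.Add(2);

        // No more additions: this is what StopPipelineAsync does to the first step.
        buffer.CompleteAdding();

        bool addThrew = false;
        try
        {
            buffer.Add(3);
        }
        catch (InvalidOperationException)
        {
            addThrew = true;
        }

        // Items added before CompleteAdding are still delivered; the loop
        // ends once the buffer is empty instead of blocking forever.
        int drained = 0;
        foreach (int item in buffer.GetConsumingEnumerable())
        {
            drained++;
        }

        // IsCompleted is true once adding is completed and the buffer is empty.
        return (addThrew, drained, buffer.IsCompleted);
    }
}
```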

public void StartPipeline()
{
    for (int i = 0; i < _steps.Count; i++)
    {
        int localStepIndex = i;
        for (int degreeOfParallelism = 0; degreeOfParallelism < _steps[i].DegreeOfParallelism; degreeOfParallelism++)
        {
            var task = Task.Run(async () => await StartStepAsync(localStepIndex));
            _tasks.Add(task);
        }
    }
}

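StartPipeline is the main method: it starts the pipeline by launching, for each step, as many tasks as that step's DegreeOfParallelism (via Task.Run, so they run on thread-pool threads). Every task runs the same method, StartStepAsync, for its step.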
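The loop in StartPipeline copies i into localStepIndex before passing it to the lambda. In C#, a for loop has a single loop variable shared by all iterations, so a lambda that captured i directly could observe a later value (possibly _steps.Count, which would be out of range) by the time its task runs. This standalone sketch, with the hypothetical class LoopCaptureDemo, shows the difference deterministically by invoking the lambdas after the loop has finished.

```csharp
using System;
using System.Collections.Generic;

public static class LoopCaptureDemo
{
    // Captures the for-loop variable directly: every lambda shares one 'i'.
    public static List<int> CaptureLoopVariable()
    {
        var funcs = new List<Func<int>>();
        for (int i = 0; i < 3; i++)
        {
            funcs.Add(() => i);
        }
        // The loop has ended, so every lambda sees the final value of 'i'.
        return funcs.ConvertAll(f => f());
    }

    // Copies the loop variable first, as StartPipeline does with localStepIndex.
    public static List<int> CaptureLocalCopy()
    {
        var funcs = new List<Func<int>>();
        for (int i = 0; i < 3; i++)
        {
            int local = i;   // a fresh variable per iteration
            funcs.Add(() => local);
        }
        return funcs.ConvertAll(f => f());
    }
}
```

CaptureLoopVariable returns 3, 3, 3 while CaptureLocalCopy returns 0, 1, 2.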

private async Task StartStepAsync(int stepIndex)
{
    int numberOfSteps = _steps.Count;
    var step = _steps[stepIndex];
    var isFinalStep = (stepIndex == (numberOfSteps - 1));

    foreach (var input in step.Buffer.GetConsumingEnumerable())
    {
        try
        {
            var output = await step.StepActionAsync(input);

            if (!isFinalStep)
            {
                var nextStep = _steps[stepIndex + 1];
                nextStep.Buffer.Add(output);
            }
            else
            {
                if(OnFinishAsync != null) 
                    await OnFinishAsync(output);

                ItemsSummery.IncrementNumberOfItemsSuccessfullyProcessed();
            }
        }
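        catch
        {
            bool requeued = false;
            if (step.ShouldRetry && step.MaxRetryCount > input.RetryCount)
            {
                input.RetryCount++;
                try
                {
                    // Re-queue on the same step. TryAdd does not block when the buffer
                    // is full (a blocking Add could wait forever on this step's own
                    // buffer), and it throws once CompleteAdding has been called.
                    requeued = step.Buffer.TryAdd(input);
                }
                catch (InvalidOperationException)
                {
                }
            }

            // Count the item as failed if it could not be retried, so that
            // StopPipelineAsync does not wait for it forever.
            if (!requeued)
            {
                ItemsSummery.IncrementNumberOfItemsFailedToProcess();
            }
        }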
    }
}

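StartStepAsync contains a loop that reads the step's buffer. GetConsumingEnumerable, provided by BlockingCollection, blocks until an item is available for processing, and the loop ends once the buffer has been marked as complete for adding and is empty. When an item is received, StepActionAsync is executed on it and the result is passed to the next step's buffer; this continues up to the last step of the pipeline, where OnFinishAsync is raised and the item is counted as successfully processed. If an exception is thrown, the item is retried by adding it back to the buffer of the step where it failed, provided ShouldRetry is set and the item's RetryCount is still below MaxRetryCount; otherwise it is counted as failed.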
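To see what MaxRetryCount means in practice, the single-threaded sketch below replays the same retry rule on one buffer: a failed item goes back to the end of the buffer with RetryCount incremented while MaxRetryCount is greater than RetryCount, otherwise it is counted as failed. With a MaxRetryCount of 2, an item is therefore attempted at most three times. RetryItem and RetryDemo are hypothetical names; the step action signals failure by throwing, and TryTake (with no timeout) stops the loop once the buffer is empty, since nothing else adds items here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for BufferItem.
public class RetryItem
{
    public int Id { get; set; }
    public int RetryCount { get; set; }
}

public static class RetryDemo
{
    // Processes every item with the same retry rule as StartStepAsync's catch block.
    public static (List<int> succeeded, List<int> failed) Run(
        IEnumerable<RetryItem> items, Action<RetryItem> stepAction, int maxRetryCount)
    {
        using var buffer = new BlockingCollection<RetryItem>();
        foreach (var input in items)
        {
            buffer.Add(input);
        }

        var succeeded = new List<int>();
        var failed = new List<int>();

        // TryTake without a timeout returns false as soon as the buffer is empty.
        while (buffer.TryTake(out var item))
        {
            try
            {
                stepAction(item);
                succeeded.Add(item.Id);
            }
            catch (Exception)
            {
                if (maxRetryCount > item.RetryCount)
                {
                    item.RetryCount++;
                    buffer.Add(item);   // back of the queue, retried later
                }
                else
                {
                    failed.Add(item.Id);
                }
            }
        }

        return (succeeded, failed);
    }
}
```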

public async Task StopPipelineAsync()
{
    _steps[0].Buffer.CompleteAdding();
    while (!ItemsSummery.AreAllItemsProcessed())
    {
        await Task.Delay(1000);
    }
    _steps.ForEach(x => x.Buffer.CompleteAdding());
    await Task.WhenAll(_tasks);
}

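StopPipelineAsync performs a graceful shutdown of the pipeline. It first marks the first step's buffer as complete, so no new items can be added, and then checks ItemsSummery once per second until every enqueued item has either been processed successfully or failed. Only then does it mark every step's buffer as complete, which ends the GetConsumingEnumerable loops, and wait for all step tasks to finish.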
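Polling ItemsSummery works, but a common alternative for shutting down a BlockingCollection pipeline is to propagate completion stage by stage: when a stage's GetConsumingEnumerable loop ends, it calls CompleteAdding on the next stage's buffer. The sketch below shows this with two single-task stages over plain integers; StagedShutdownDemo and RunStage are hypothetical names. It assumes no retries, because a stage cannot re-queue into its own buffer once that buffer has been completed, and with several tasks per stage the next buffer should only be completed after all of them have finished.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class StagedShutdownDemo
{
    // Hypothetical stage runner: drains 'input', writes results to 'output',
    // then marks 'output' complete so the next stage's loop can end too.
    private static Task RunStage(BlockingCollection<int> input,
                                 BlockingCollection<int> output,
                                 Func<int, int> action)
    {
        return Task.Run(() =>
        {
            try
            {
                // Ends once 'input' is marked complete and empty.
                foreach (int item in input.GetConsumingEnumerable())
                {
                    output.Add(action(item));
                }
            }
            finally
            {
                // Propagate completion even if 'action' throws.
                output.CompleteAdding();
            }
        });
    }

    public static async Task<int[]> RunAsync(IEnumerable<int> items)
    {
        using var first = new BlockingCollection<int>(boundedCapacity: 10);
        using var second = new BlockingCollection<int>(boundedCapacity: 10);
        using var results = new BlockingCollection<int>(); // unbounded sink

        Task[] stages =
        {
            RunStage(first, second, x => x * 2),
            RunStage(second, results, x => x + 1),
        };

        foreach (int item in items)
        {
            first.Add(item);
        }

        // Graceful stop: refuse new input, then let each stage drain in turn.
        first.CompleteAdding();
        await Task.WhenAll(stages);

        return results.ToArray();
    }
}
```

With one task per stage and the default FIFO buffers, input order is preserved, so RunAsync(new[] { 1, 2, 3, 4, 5 }) yields 3, 5, 7, 9, 11.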

Now let’s test our pipeline. Since we created the project from the Worker Service project template, it already contains a Worker class; paste the code below into the Worker class file:

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using PipelinePattern.Models;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;

namespace PipelinePattern
{
    public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;
        private readonly Pipeline<BufferItem> _pipeline;
        private readonly CancellationTokenSource _cancellationTokenSource;

        List<string> badWords = new List<string> { "Kill", "Murder", "Fuck" };
        string[] sampleText = new string[]
            {
                "You are smart",
                "I will Kill you",
                "Get the fuck out of here",
                "I love food"
            };

        public Worker(ILogger<Worker> logger)
        {
            _logger = logger;
            _pipeline = GetPipeline();
            _cancellationTokenSource = new CancellationTokenSource();
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            Random random = new Random();
            _pipeline.StartPipeline();
            int id = 0;
            //Add items to the pipeline until service is stopped
            while (!_cancellationTokenSource.IsCancellationRequested)
            {
                await Task.Delay(500);
                id++;
                BufferItem item = new BufferItem();
                item.ItemId = id;
                // adding a random text from samples (Random.Next's upper bound is
                // exclusive, so pass Length to make every sample reachable)
                item.Text = sampleText[random.Next(sampleText.Length)];
                if (_pipeline.AddItem(item))
                    Console.WriteLine($"Created item {id}");
            }
        }

        public override async Task StopAsync(CancellationToken cancellationToken)
        {
            _cancellationTokenSource.Cancel();
            await _pipeline.StopPipelineAsync();
            PrintSummery();
        }

        private Pipeline<BufferItem> GetPipeline()
        {
            Pipeline<BufferItem> pipeline = new Pipeline<BufferItem>();

            //Step 1
            Func<BufferItem, Task<BufferItem>> findBadWords = async (BufferItem input) =>
            {
                await Task.Delay(1000);
                if (badWords.Any(x => input.Text.Contains(x, StringComparison.OrdinalIgnoreCase)))
                {
                    input.ContainsBadWords = true;
                }

                Console.WriteLine($"Processed step one for item {input.ItemId} ThreadID {Thread.CurrentThread.ManagedThreadId}");
                return input;
            };
            Step<BufferItem> stepOne = new Step<BufferItem>(10, findBadWords) { DegreeOfParallelism = 2 };
            pipeline.AddStep(stepOne);

            //Step 2
            Func<BufferItem, Task<BufferItem>> funcTwo = async (BufferItem input) =>
            {
                await Task.Delay(1000);
                // Write logic here to call an api
                Console.WriteLine($"Calling API to update status of itemid: {input.ItemId}, text: \"{input.Text}\" in database, Contains bad words: {input.ContainsBadWords}");
                Console.WriteLine($"Processed step two for item {input.ItemId} ThreadID {Thread.CurrentThread.ManagedThreadId}");
                return input;
            };
            Step<BufferItem> stepTwo = new Step<BufferItem>(10, funcTwo) { DegreeOfParallelism = 1 };
            pipeline.AddStep(stepTwo);

            //On Finish
            pipeline.OnFinishAsync += async (BufferItem input) =>
            {
                Console.WriteLine($"Item {input.ItemId} processed");
                await Task.CompletedTask;
            };

            return pipeline;
        }

        private void PrintSummery()
        {
            Console.WriteLine("-----------Summary-------------");
            Console.WriteLine($"Total items enqueued: {ItemsSummery.GetNumberOfTotalItemsEnequed()}");
            Console.WriteLine($"Total items successfully processed: {ItemsSummery.GetNumberOfItemsSuccessfullyProcessed()}");
            Console.WriteLine($"Total items failed to process: {ItemsSummery.GetNumberOfItemsFailedToProcess()}");
        }
    }
}

In the class above, the GetPipeline method creates the pipeline object with the required steps; each step is given a function that processes an item and a buffer limit. We added two steps: one finds bad words in the text and the other updates the item's status in the database (simulated here by a console message). The ExecuteAsync method runs a loop that creates a dummy item every 500 ms and adds it to the pipeline until the service is stopped. The output of the code is shown below:

info: Microsoft.Hosting.Lifetime[0]
      Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
      Hosting environment: Development
info: Microsoft.Hosting.Lifetime[0]
      Content root path: C:\Users\nikhiljo\source\repos\PipelinePattern\PipelinePattern
Created item 1
Created item 2
Processed step one for item 1 ThreadID 12
Created item 3
info: Microsoft.Hosting.Lifetime[0]
      Application is shutting down...
Processed step one for item 2 ThreadID 10
Processed step one for item 3 ThreadID 4
Calling API to update status of itemid: 1, text: "Get the fuck out of here" in database, Contains bad words: True
Processed step two for item 1 ThreadID 10
Item 1 processed
Calling API to update status of itemid: 2, text: "I will Kill you" in database, Contains bad words: True
Processed step two for item 2 ThreadID 10
Item 2 processed
Calling API to update status of itemid: 3, text: "You are smart" in database, Contains bad words: False
Processed step two for item 3 ThreadID 9
Item 3 processed
-----------Summary-------------
Total items enqueued: 3
Total items successfully processed: 3
Total items failed to process: 0

As we can see, 3 items were processed and every step was executed for each of them. We stopped the application (by pressing Ctrl+C) just after item 3 was added, but the application kept running until all items in the pipeline had been processed; this is called a graceful shutdown.
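The findBadWords step in GetPipeline uses a case-insensitive substring match, so it would also flag harmless text in which a blacklisted word is part of a longer word, such as "skills" containing "kill". If that matters, one option (a swapped-in technique, not the code used above) is to match whole words with a regular expression using \b word boundaries, as in this sketch; BadWordChecks is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public static class BadWordChecks
{
    // Same test as findBadWords: case-insensitive substring match.
    public static bool ContainsSubstring(string text, IEnumerable<string> badWords) =>
        badWords.Any(w => text.Contains(w, StringComparison.OrdinalIgnoreCase));

    // Whole-word match: \b anchors the (escaped) word at word boundaries.
    public static bool ContainsWholeWord(string text, IEnumerable<string> badWords) =>
        badWords.Any(w => Regex.IsMatch(text, $@"\b{Regex.Escape(w)}\b", RegexOptions.IgnoreCase));
}
```

Both methods agree on "I will Kill you", but only ContainsSubstring flags "Improve your skills".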


Last modified about 23 days ago
Nikhil Joshi

CEO & Founder at Dotnetlovers
