Q: Зачем это?
A: Библиотека содержит расширяемый набор классов и интерфейсов, представляющих для .NET разработчика простой инструмент для реализации собственной СМО.
Q: Пример использования?
A: Например, Вы пишете приложение которое принимает из сети пакеты, фильтрует, разбирает и складывает их в базу, при этом, каждый из перечисленных шагов выносится в отдельную СМО (отдельный(-ые) поток(и) со своей очередью). Обработка задач из очереди на каждом шаге происходит независимо от других шагов в отдельном потоке, что позволяет достичь максимальной производительности системы целиком.
Q: Как устроена?
A: Основу библиотеки NQueueing представляет интерфейс IServerQueue.
Интерфейс является generic-типом, и типизируется типом задачи. Все обработчики очередей реализуют данный интерфейс.
Уровнем выше, находится абстрактный базовый класс для всех обработчиков очередей ServerQueueBase, инкапсулирующий в себя общие свойства любой СМО (например: объект очередь и массив потоков).
Следующий уровень абстракции - это конкретные реализации различных типов СМО, унаследованные от ServerQueueBase классы: StandartServerQueue (для поштучной обработки задач из очереди) и BatchServerQueue (для пакетной обработки):
Q: Как использовать?
A: Использовать библиотеку достаточно просто:
- Скачать библиотеку.
- Добавить на нее reference в проекте. После чего станет доступен namespace NQueueing.
- В зависимости от типа задачи, выбрать соответствующий тип СМО (StandartServerQueue, BatchServerQueue итп).
- Далее создать экземпляр СМО (выбранной на предыдущем шаге), передав в конструктор ссылку на делегат, в котором и описывается процесс обработки пришедшей задачи, и указав параметры СМО: длина очереди, число потоков обработки.
- При помощи потоко-безопасных TryEnqueue/Enqueue ставить задачи в очередь.
using System;
using System.Threading;
// Добавить reference на библиотеку
using NQueueing;
namespace NQueueingTestApp
{
class Program
{
// Имитация случайной задержки
static Random rnd = new Random(DateTime.Now.Millisecond);
static void RandomSleep(int min, int max)
{
Thread.Sleep(rnd.Next(min, max));
}
static void ProcessingOne(int item)
{
// Обработать здесь задачу item...
// Обработка происходит в отдельном потоке.
RandomSleep(10, 300); // Случайная задержка в обработке
// Номер текущего потока, от 0 до (maxThreadsCount-1)
string name = Thread.CurrentThread.Name;
Console.WriteLine("{0} / Thread №: {1}", item, name);
}
static void Main(string[] args)
{
// Создадим Систему Массового Обслуживания состоящую
// из 3-х потоков и очередью длины 10.
IServerQueue<int> smo = new StandartServerQueue<int>(ProcessingOne, 10, 3);
smo.Start(); // Запускаем процесс обработки
// Добавим 15 задач в очередь
for (int task = 0; task < 15; task++)
{
// Поставим задачу в очередь в текущем потоке
EnqueueStatus result = smo.TryEnqueue(task);
RandomSleep(10, 100); // Случайная задержка при добавлении
Console.WriteLine("{0} / Enqueue: {1}", task, result);
}
smo.Close(); // Не забываем остановить
Console.ReadKey();
}
}
}
Q: Расширяемость?
A: Да, такая возможность есть. Для этого необходимо реализовать интерфейс IServerQueue, либо реализовать наследника от базового класса ServerQueueBase.
Буду благодарен за Ваши отзывы, рекомендации и конструктивные советы.
Ссылки:
"и очередью длины 10"
ОтветитьУдалитьчто такое "длина очереди" ?
@zerkms
ОтветитьУдалитьStandartServerQueue(ProcessingOne, 10, 3);
Имелось в виду максимальный размер размер накопителя или максимальная длина очереди.
Другими словами это параметр K в нотации Кендалла:
"K: The number of places in the system
The capacity of the system, or the maximum number of customers allowed in the system including those in service. When the number is at this maximum, further arrivals are turned away. If this number is omitted, the capacity is assumed to be unlimited, or infinite."
http://en.wikipedia.org/wiki/Kendall%27s_notation
Чем она лучше/хуже NServiceBus?
ОтветитьУдалитьВсё-таки не совсем понятна сфера применения, и от этого непонятно зачем нужна эта библиотека.
ОтветитьУдалитьДля "приложения которое принимает из сети пакеты, фильтрует, разбирает и складывает их в базу" при разумной нагрузке думаю проще использовать Task Parallel Library и BlockingCollection.
@Александр
ОтветитьУдалитьNServiceBus - более ориентирована на взаимодействие нескольких систем. Насколько я помню, в NServiceBus главное это транспорт заявки.
NQueueing фактически очередь и потоки Queue+Threads.
@Konstantin
ОтветитьУдалитьTask Parallel Library и BlockingCollection все это из .NET 4.0.
Здравствуйте!
ОтветитьУдалитьЯ новичек в .NET, есть простая задача - нужна очередь потоков, максимальное количество одновременно выполняемых потоков допустим 10, в очередь можно в любой момент добавлять потоки, которые стартуют когда подойдёт их "очередь".
Как это сделать попроще? Любые .NET-технологии.
@Hermann
ОтветитьУдалитьНапример используя NQueueing так:
static void ProcessingOne(Thread item)
{
item.Start();
item.Join();
}
static void Main(string[] args)
{
IServerQueue smo = new StandartServerQueue(ProcessingOne, 64, 10);
smo.Start();
for (int i = 0; i < 15; i++)
{
smo.Enqueue(new Thread(() => { /* Ваш thread */ }));
RandomSleep(10, 100);
}
smo.Close();
}
Великолепно, спасибо большое!
ОтветитьУдалитьИ маленький вопрос, потоки стартуют в порядке добавления в очередь, или могут в любом порядке? По этому маленькому примеру кажется что в любом порядке:
IServerQueue smo = new StandartServerQueue(ProcessingOne, 512, 10);
smo.Start();
for (int i = 0; i < 200; i++)
{
int i1 = i;
smo.Enqueue(new Thread(() => { Console.WriteLine(i1); Thread.Sleep(5000); }));
}
smo.Stop();
Хм, парсер съедает знаки меньше-больше...
ОтветитьУдалитьДа, в любом порядке.
ОтветитьУдалитьМожет произойти следующая ситуация: извлекается первый поток из очереди и пока ему делают item.Start(); другой поток извлекает уже второй поток из очереди и ему же успевает сделать item.Start(); быстрее первого. Тогда второй окажется запущен быстрее первого.
По правде говоря у меня возник точно такой же вопрос как и у Konstantin Savelev'а.
ОтветитьУдалитьДело в том, что Task Parallel Library и Blocking Collection можно использовать в .NET Framework 3.5 SP1.
Есть релиз библиотеки Reactive Extensions for .NET 3.5, в которую как раз входит то, что раньше называли Parallel Extensions CTP.
Соответственно вопрос. Есть ли какие-то выгодные отличия в API этой библиотеки, ориентированной на определенную задачу от более обобщенного API у Parallel Extensions?
...еще раз прочитал исходную задачу. Вполне возможно что Parallel Extensions тут не очень подойдет. Возможно что тут будет более полезно использовать как раз Reactive Extensions и прокидывать приходящие данные через разные этапы обработки, при этом на каждом из этапе использовать свой Rx-планировщик (IScheduler).
Конечно это при условии что внешний мир проталкивает нам данные (пакеты), а не наша система вытягивает эти данные извне.
Rx, TPL это инструменты для широкого применения, а nQueueing это готовый компонент, решающий конкретную задачу - как можно быстро и просто построить систему массового обслуживания.
ОтветитьУдалитьЭтот комментарий был удален автором.
ОтветитьУдалитьПриветствую! Интересный проект, не совсем понятно:
ОтветитьУдалить"При помощи потоко-безопасных TryEnqueue/Enqueue ставить задачи в очередь"
А сразу обработать задачу, не ставя её в очередь, нельзя?
И можно ещё вопрос, как происходит извлечение задачи из очереди?
ОтветитьУдалить>> А сразу обработать задачу, не ставя её в очередь, нельзя?
ОтветитьУдалитьЕсли интенсивность потока заявок превышает интенсивность обработки, то мы не откидываем заявку, а ставим в очередь, иначе пришлось бы ее откинуть.
>> И можно ещё вопрос, как происходит извлечение задачи из очереди?
В этой версии посредством "опроса очереди" polling. В данном случае это не оптимально. Правильно было бы использовать push механизм. Хотя для batch очереди polling оправдан, собственно для таких очередей я и задумывал проект.
Если Вам необходимо реализовать producing-consuming pattern, то я рекомендую использовать System.Collections.Concurrent.ConcurrentQueue в качестве очереди. Правда это .NET4.0
>>Если интенсивность потока заявок превышает интенсивность обработки, то мы не откидываем заявку, а ставим в очередь, иначе пришлось бы ее откинуть.
ОтветитьУдалитьНет, я имею ввиду случаи, когда интенсивность потока не превышает интенсивность обработки, а просто имеются не занятые обработкой потоки. Т.е. напрямую назначить свободному потоку обработку задачи, игнорируя тем самым дополнительный перенос задачи в очередь.
Мне сегодня этот компонент помог, поставленная задача решена, спасибо)
Ещё правда возникла проблема с тем, что в текущий момент времени не удаётся определить длину очереди и её средний показатель, за всё время работы СМО.
Имею ввиду текущее колличество задач, поставленных в очередь.
ОтветитьУдалитьНу если интенсивность потока ниже чем интенсивность обработки, то тогда очередь не нужна (так как она будет всегда пустой). В таком случае обрабатывать заявки надо в пуле потоков: либо стандартным (http://msdn.microsoft.com/ru-ru/library/system.threading.threadpool.aspx), либо написать свой.
ОтветитьУдалитьДлину очереди можно вычислить теоретически либо измерить практически. В первом случае надо знать показатели системы, а во втором достаточно задать размер очереди с "запасом" и погонять, при этом собирая статистику.
Здравствуйте Илья. Я новичек в C# и сейчас у меня есть задачка составить СМО. Есть несколько вопросов: 1. Как мне сделать, чтобы задача подавалась раз в 20 минут? 2. Я так понимаю если есть 3 свободных потока и 10 задач, то берутся сразу 3 задачи на выполнение, по освобождению поток берутся еще 3. А если моя задача должна выполняться сначала в 1 потоке(допустим 10 минут), после должна выполняться во 2 потоке(8 минут), как мне это реализовать?
ОтветитьУдалить>> 1. Как мне сделать, чтобы задача подавалась раз в 20 минут?
ОтветитьУдалитьВам нужно использовать планировщик задач. Есть 2 пути решения: написать свой и очень простой либо использовать готовую библиотеку. Свой планировщик пишется с использованием таймера (см http://msdn.microsoft.com/en-us/library/system.threading.timer.aspx), из готовых например http://quartznet.sourceforge.net.
>>А если моя задача должна выполняться сначала в 1 потоке(допустим 10 минут), после должна выполняться во 2 потоке(8 минут), как мне это реализовать?
Блоки выполнения можно соединять либо параллельно либо последовательно. В вашем случае нужно именно последовательно. Если рассмотреть мой код, то в методе ProcessingOne нужно сделать новую задачу, и запланировать ее в новой СМО: smo1.TryEnqueue(task1);
PS: Олег, моя статья очень сильно устарела, и я настоятельно рекомендую вам воспользоваться TPL для решения этой задачи: http://msdn.microsoft.com/en-us/library/dd460717.aspx, или
TPL Dataflow: http://msdn.microsoft.com/en-us/devlabs/gg585582.aspx.