Легенда:
новое сообщение
закрытая нитка
новое сообщение
в закрытой нитке
старое сообщение
|
- Напоминаю, что масса вопросов по функционированию форума снимается после прочтения его описания.
- Новичкам также крайне полезно ознакомиться с данным документом.
| |
В части самодельной FIFO - ну что тут скажешь ... понятно. 31.12.07 08:11 Число просмотров: 5334
Автор: void <Grebnev Valery> Статус: Elderman
|
> Для передачи сообщений потокам хорошо использовать IOCP или > самодельную FIFO очередь. В части самодельной FIFO - ну что тут скажешь ... понятно.
В части IOCP для не I/O запросов. Поправьте, если я не прав (я слабый программист) -
на мой взгляд, это наиболее простое и ясное программирование для Windows для тех задач, где это можно применить. Есть и подводные камни и вопросы, которые придётся отстаивать:
1) для производительных систем (более 50 000 не I/O запросов в секунду, до 100 000) использование простых решений "всё в одном флаконе", типа QueueUserWorkItemможет_не_работать даже для минимального кол-ва потоков WT_EXECUTEINPERSISTENTTHREAD, если после посылки элемента queue в очередь IOCP не переключать контекст ::SwitchToThread(), типа:
static HRESULT OnPublisherPush(const string& skey1, const string& skey2, long key3, char* data, size_t len )
{
// Pack the publisher message
ConcreteTopicA* topicData = new ConcreteTopicA(INVALIDE_TOPIC_ID, skey1, skey2, key3);
...
// Queue message to the IOCP
if (!::QueueUserWorkItem(Router::RouterWorkItemProc, (PVOID) topicData, WT_EXECUTEINPERSISTENTTHREAD /*1 thread*/ ) )
delete topicData;
...
return E_FAIL;
}
...
::SwitchToThread();
return S_OK;
}
Причина в том, FIFO запросов IOCP забивается полностью. Реально при 1000 элементах очереди производительность снижается в разы. При 10 000 и более - может и reboot компа понадобтся. Увеличение числа потоков на порту (WT_EXECUTEDEFAULT, WT_EXECUTELONGFUNCTION) только ухудшат ситуацию, если посылки в порт запроса не переключиться ::SwitchToThread().
2) Использование контроля количества потоков и размера очереди запросов IOCP () несомненно улучшает ситуацию. Трюк в том, что можно переключаться ::SwitchToThread() не каждый раз после посылки очередного месаджа, когда очередь достигает N - элементов. Реально, у меня получается оптимальное число N ~ 100-250 и должно быть разным на разном хардваре:
static HRESULT OnPublisherPush(const string& skey1, const string& skey2, long key3, char* data, size_t len )
{
// Pack the publisher message
ConcreteTopicA* topicData = new ConcreteTopicA(INVALIDE_TOPIC_ID, skey1, skey2, key3);
...
// Queue message to the IOCP
if (!::PostQueuedCompletionStatus( g_hIOCP, 0, 0, (OVERLAPPED*)topicData )) {
delete topicData;
...
return E_FAIL;
}
...
if (GetIoCompletionQueueLen(g_hIOCP) > 250)
::SwitchToThread();
return S_OK;
}
Врядли можно здесь оспаривать главные преимущества - масштабируемость на многопроцессорных компютерах и простота программирования. Однако, это не очень привычный подход для не I/O запросов.
Мне IOCP больше нравится для многопроцессорной обработки, чем самодельный FIFO. Тем более, что "самодельный" нормальный пул потоков (работающий с этой FIFO) с контролем их состояний - очень трудная задача.
Спасибо, Игорь за мнение. С Новым Годом!
|
<programming>
|
[Win32] Две проблемы с передачей сообщения thred-у 28.12.07 09:20
Автор: void <Grebnev Valery> Статус: Elderman
|
Подскажите пожалуйста бест практис о следующем.
Приложение Publisher посылает обновление приложению Router. Roter пересылает это сообщение вначале в свой workthread (где сообщение обрабатывается), и далее пересылается приложению Subscriber:
HRESULT Router::OnPublisherPush(const string& skey1, const string& skey2, long key3, char* data, size_t len )
{
// Pack the publisher message
ConcreteTopicA* topicData = new ConcreteTopicA(INVALIDE_TOPIC_ID, skey1, skey2, key3);
memmove(topicData->m_data, data, len);
// Queue message to the Router thread
if (!::PostThreadMessage(m_RouterMsgThreadID, WM_USER_ON_PUBLISHER_PUSH, 0, (LPARAM) topicData ) )
{
DWORD err = GetLastError();
delete topicData;
if ( ERROR_NOT_ENOUGH_QUOTA == err ) {
std::cout << "\nERROR_NOT_ENOUGH_QUOTA" << std::endl;
g_cnt_lost ++;
//::SwitchToThread();
}
return E_FAIL;
}
//::Sleep(0);
return S_OK;
}
В рабочем потоке это сообщение, topicData, распаковывается в transmittedTopic, обрабатывается и удаляется delete transmittedTopic:
DWORD WINAPI Router::RouterMessageThreadProc( LPVOID lpParameter )
{
MSG msg;
// Create the message queue
...
...
BOOL bRet;
while( (bRet = ::GetMessage( &msg, NULL, 0, 0 )) != 0)
{
if (bRet != -1 error)
{
if (WM_TIMER == msg.message) {
...
...
}
}
else if (WM_USER_ON_PUBLISHER_PUSH == msg.message) {
//Unpack the message
ConcreteTopicA* transmittedTopic = (ConcreteTopicA*) msg.lParam;
// TO DO
...
...
delete transmittedTopic;
...
...
...
Первая проблема, в том, что для обработки сообщения внутри RouterMessageThreadProc надо:
1) создать структуру ConcreteTopicA* topicData = new ConcreteTopicA
2) скопировать данные memmove из transmittedTopic для последующей обработки
3) удалить структуру delete transmittedTopic;
Всё это (new, memmove, delete) может быть накладно и долго как интерфейса Publisher, так и для самого Router. Кроме того, поскольку Publisher может весьма интенсивно обновлять данные, это может фрагментировать память и снизить общий перформанс.
Вторая проблема в том, что при интенсивной посылке сообщений в поток, часть сообщений теряется из очереди из-за её переполнения (GetLastError ==ERROR_NOT_ENOUGH_QUOTA ). Теряется приблизительно 50-80 сообщений из 1000 000. Изменение, размера очереди потока по умолчанию (10000 для W2k/XP/W2k3) - не подходит. Кроме того? при переполнении очереди появляется другая, вполне ожидаемая и более существенная проблема - сообщения по таймеру потока (WM_TIMER) вообще теряют всякий смысл.
По поводу первой проблемы - смутно мелькает такая мысль: данные Publisher копировать в преаллокированный глобальный массив, а в поток передавать индекс элемента этого массива. Массив никогда не очищать и не освобождать. Для синхронизации сообщений дополнительно использовать вектор индексов "свободных фреймов" памяти.
По поводу второй проблемы - не мелькает никакая мысль, даже смутно.
Может ли кто-нибудь помочь советом, примером?
Спасибо.
|
|
По поводу new/delete самого ConcreteTopic-а: можно... 28.12.07 15:22
Автор: amirul <Serge> Статус: The Elderman
|
> Первая проблема, в том, что для обработки сообщения внутри > RouterMessageThreadProc надо: > 1) создать структуру ConcreteTopicA* topicData = new > ConcreteTopicA > 2) скопировать данные memmove из transmittedTopic для > последующей обработки > 3) удалить структуру delete transmittedTopic; > Всё это (new, memmove, delete) может быть накладно и долго > как интерфейса Publisher, так и для самого Router. Кроме > того, поскольку Publisher может весьма интенсивно обновлять > данные, это может фрагментировать память и снизить общий > перформанс.
По поводу new/delete самого ConcreteTopic-а: можно организовать пул (пример пула насколько я помню есть у страуструпа в его C++ Programming Language). Идея в том, чтобы выделить некоторое количество памяти, порезать ее на одинаковые куски и связать куски в односвязный список. Выделение и освобождение памяти в этом пуле - простейшие операции добавления и удаления элемента в голову списка и имеют сложность O(const). Если надо поддержка увеличения, то можно выделять chunk-ами по. В принципе и писать то ничего не надо, все уже есть в boost::pool и в Loki::SmallObjAllocator. Причем решается одновременно и проблема оверхеда по времени и проблема фрагментации памяти.
Что до memmove, то я бы предпочел использовать какую нибудь схему передачи владения (или совместного владения): std::auto_ptr, boost::shared_ptr, boost::intrusive_ptr, Loki::SmartPtr и т.п.. Вместо копирования передать владение (выделить в одном месте, удалить в другом).
> Вторая проблема в том, что при интенсивной посылке > сообщений в поток, часть сообщений теряется из очереди > из-за её переполнения (GetLastError > ==ERROR_NOT_ENOUGH_QUOTA ). Теряется приблизительно 50-80 > сообщений из 1000 000. Изменение, размера очереди потока по > умолчанию (10000 для W2k/XP/W2k3) - не подходит. Кроме > того? при переполнении очереди появляется другая, вполне > ожидаемая и более существенная проблема - сообщения по > таймеру потока (WM_TIMER) вообще теряют всякий смысл.
А это обязательно использовать стандартные виндовые сообщения?
Как уже написали, самопальная FIFO (std::list или даже std::deque) будет гораздо эффективнее и при этом будет уверенность, что потерявшихся сообщений не будет.
|
| |
Не стал пока делать 20.01.08 05:31
Автор: void <Grebnev Valery> Статус: Elderman
|
Не стал пока делать
" ...схему передачи владения (или совместного владения):...", равно как и использованье "стандартных" memory pool. Сделал простой массив назеранных кусков преадллокированного буфера.
Возможо, что написанный класс CMemoryArray далеко неоптимален, но получилось, наверное, то что и должно было получиться. Увеличение производительности не более 3%.
Три процента, всё же лучше чем ничего, хотя вцелом код с использование CMemoryArray получается более корявым по сравнению с new/delete.
|
| |
Приблизительно это я имел ввиду. У Страуструпа не помню... 31.12.07 07:01
Автор: void <Grebnev Valery> Статус: Elderman
|
> По поводу new/delete самого ConcreteTopic-а: можно > организовать пул (пример пула насколько я помню есть у > страуструпа в его C++ Programming Language). Идея в том, > чтобы выделить некоторое количество памяти, порезать ее на > одинаковые куски и связать куски в односвязный список. > Выделение и освобождение памяти в этом пуле - простейшие > операции добавления и удаления элемента в голову списка и > имеют сложность O(const).
Приблизительно это я имел ввиду. У Страуструпа не помню такого. Попробую в инете поискать достойные прототипы. В принципе ясно, как это должно бы работать.
> Если надо поддержка увеличения, > то можно выделять chunk-ами по.
Поддержка увеличения не нужна. По крайне мере так видится сейчас.
> А это обязательно использовать стандартные виндовые > сообщения? > Как уже написали, самопальная FIFO (std::list или даже > std::deque) будет гораздо эффективнее и при этом будет > уверенность, что потерявшихся сообщений не будет.
Не обязательно в принципе. Но в этом проекте, другие девелоперы используют виндовую очередь. Я только делаю там свой кусок. Так сложилось исторически (попросту был копипастнут кусок майкрософтовского примера RTD сервера). То, что и в других критических по производительности проектах виндовая очередь ведёт себя плохо - это факт. Местная публика лечит это увеличением размера очереди до 100 000. На мой вкус это лажа - приложение делает вид, что работает. Масштабируемости 0. Мульпроцессорной обработки - думаю, не больше.
Отчасти (как временное решение ?) можно подлечить проблему потери сообщений, если после посылки сообщения переключать контекст потока ::SwitchToThread(), см. ниже. Тогда в рабочем потоке мы "успеваем" вытащить элемент до переполнения очереди. По крайней мере в стресс-тестах (~20000 месаджей в секунду) сообщения не теряются:
HRESULT Router::OnPublisherPush(const string& skey1, const string& skey2, long key3, char* data, size_t len )
{
// Pack the publisher message
ConcreteTopicA* topicData = new ConcreteTopicA(INVALIDE_TOPIC_ID, skey1, skey2, key3);
...
if (!::PostThreadMessage(g_RouterMsgThreadID, WM_USER_ON_PUBLISHER_PUSH, 0, (LPARAM) topicData ) ) {
delete topicData;
...
return E_FAIL;
}
::SwitchToThread();
return S_OK;
}
Здесь две проблемы: как сам понимаешь, "улучшение синхронизации" при помощи Sleep(0) - полная лажа и никаких гарантий на будущее, поскольку проблема по существу не решается.
Вторая проблема - интерфейс Router::OnPublisherPush может быть синхроный (он таковым сейчас пока и является COM/DCOM). Поэтому, решение со ::SwitchToThread() до завершения функции Router::OnPublisherPush будет держать Publisher. А это пострашнее любых задержек и потерь месаджей на Router-е. Publisher выполняет основную работу в системе.
Короче, надо тестировать всю систему, чтобы сказать можно ли использовать ::SwitchToThread(), или нет с точки зрения производительности приложения Publisher.
Спасибо тебе за мнение, Сергей.
С Новым Годом!
|
| | |
У Страуструпа это было в 19.4.2 Распределители памяти,... 02.01.08 14:38
Автор: amirul <Serge> Статус: The Elderman
|
> Приблизительно это я имел ввиду. У Страуструпа не помню > такого. Попробую в инете поискать достойные прототипы. В > принципе ясно, как это должно бы работать.
У Страуструпа это было в 19.4.2 Распределители памяти, определяемые пользователем, а достойные прототипы я уже привел: в частности я бы использовал boost::pool
|
| | |
Уж лучше самодельная очередь... 02.01.08 07:36
Автор: void <Grebnev Valery> Статус: Elderman
|
> Здесь две проблемы: как сам понимаешь, "улучшение > синхронизации" при помощи Sleep(0) - полная лажа и никаких > гарантий на будущее, поскольку проблема по существу не > решается.
Смешно... Получается с небольшой потерей производительности (~7%) защитить виндовую очередь от переполнения при помощи семафора, инициализируемого ~ размером очереди из реестра виндовз. Такой вот конгломерат - очередь виндовз с семафором ;))
|
|
Для передачи сообщений потокам хорошо использовать IOCP или... 28.12.07 11:51
Автор: IgorR <Igor Razin> Статус: Member
|
Для передачи сообщений потокам хорошо использовать IOCP или самодельную FIFO очередь.
|
| |
В части самодельной FIFO - ну что тут скажешь ... понятно. 31.12.07 08:11
Автор: void <Grebnev Valery> Статус: Elderman
|
> Для передачи сообщений потокам хорошо использовать IOCP или > самодельную FIFO очередь. В части самодельной FIFO - ну что тут скажешь ... понятно.
В части IOCP для не I/O запросов. Поправьте, если я не прав (я слабый программист) -
на мой взгляд, это наиболее простое и ясное программирование для Windows для тех задач, где это можно применить. Есть и подводные камни и вопросы, которые придётся отстаивать:
1) для производительных систем (более 50 000 не I/O запросов в секунду, до 100 000) использование простых решений "всё в одном флаконе", типа QueueUserWorkItemможет_не_работать даже для минимального кол-ва потоков WT_EXECUTEINPERSISTENTTHREAD, если после посылки элемента queue в очередь IOCP не переключать контекст ::SwitchToThread(), типа:
static HRESULT OnPublisherPush(const string& skey1, const string& skey2, long key3, char* data, size_t len )
{
// Pack the publisher message
ConcreteTopicA* topicData = new ConcreteTopicA(INVALIDE_TOPIC_ID, skey1, skey2, key3);
...
// Queue message to the IOCP
if (!::QueueUserWorkItem(Router::RouterWorkItemProc, (PVOID) topicData, WT_EXECUTEINPERSISTENTTHREAD /*1 thread*/ ) )
delete topicData;
...
return E_FAIL;
}
...
::SwitchToThread();
return S_OK;
}
Причина в том, FIFO запросов IOCP забивается полностью. Реально при 1000 элементах очереди производительность снижается в разы. При 10 000 и более - может и reboot компа понадобтся. Увеличение числа потоков на порту (WT_EXECUTEDEFAULT, WT_EXECUTELONGFUNCTION) только ухудшат ситуацию, если посылки в порт запроса не переключиться ::SwitchToThread().
2) Использование контроля количества потоков и размера очереди запросов IOCP () несомненно улучшает ситуацию. Трюк в том, что можно переключаться ::SwitchToThread() не каждый раз после посылки очередного месаджа, когда очередь достигает N - элементов. Реально, у меня получается оптимальное число N ~ 100-250 и должно быть разным на разном хардваре:
static HRESULT OnPublisherPush(const string& skey1, const string& skey2, long key3, char* data, size_t len )
{
// Pack the publisher message
ConcreteTopicA* topicData = new ConcreteTopicA(INVALIDE_TOPIC_ID, skey1, skey2, key3);
...
// Queue message to the IOCP
if (!::PostQueuedCompletionStatus( g_hIOCP, 0, 0, (OVERLAPPED*)topicData )) {
delete topicData;
...
return E_FAIL;
}
...
if (GetIoCompletionQueueLen(g_hIOCP) > 250)
::SwitchToThread();
return S_OK;
}
Врядли можно здесь оспаривать главные преимущества - масштабируемость на многопроцессорных компютерах и простота программирования. Однако, это не очень привычный подход для не I/O запросов.
Мне IOCP больше нравится для многопроцессорной обработки, чем самодельный FIFO. Тем более, что "самодельный" нормальный пул потоков (работающий с этой FIFO) с контролем их состояний - очень трудная задача.
Спасибо, Игорь за мнение. С Новым Годом!
|
|
а использование windows messages принципиально? 28.12.07 10:49
Автор: dl <Dmitry Leonov>
|
> По поводу первой проблемы - смутно мелькает такая мысль: > данные Publisher копировать в преаллокированный глобальный > массив, а в поток передавать индекс элемента этого массива. > Массив никогда не очищать и не освобождать. Для > синхронизации сообщений дополнительно использовать вектор > индексов "свободных фреймов" памяти. > По поводу второй проблемы - не мелькает никакая мысль, даже > смутно.
Если с сообщением будет передаваться только индекс очередного элемента, то может быть это оформить как event-объект ядра, который только фиксирует факт увеличения индекса. И даже если не хватит времени обрабатывать каждое событие, если видно, что индекс увеличился на n по сравнению с прошлым разом, эти n сообщений и обрабатывать.
|
| |
Использование windows messages не принципиально на мой... 31.12.07 07:22
Автор: void <Grebnev Valery> Статус: Elderman
|
Использование windows messages не принципиально на мой взгляд. Это просто существующий дизайн, к которому привыкли местные. Если зайдёт в тупик, то буду отстаивать другие решения (пока не решил какие).
> Если с сообщением будет передаваться только индекс > очередного элемента, то может быть это оформить как > event-объект ядра, который только фиксирует факт увеличения > индекса.
Поясни, пожалуйта, детали. Просто чувствую, что здесь может быть очень неплохое зерно.
> И даже если не хватит времени обрабатывать каждое > событие, если видно, что индекс увеличился на n по > сравнению с прошлым разом, эти n сообщений и обрабатывать.
Если правильно понял тебя, то ты попал в яблочка. Последние обновления - самые важные. Их и надо обрабатывать. Если по какой-то причине пропущены предудущие месаджи - не беда.
Спасибо. С Новым годом! Всех тебе благ, Дмитрий, в новом году!
|
| | |
примерно то же, что говорилось в других ответах 31.12.07 12:50
Автор: dl <Dmitry Leonov>
|
> > Если с сообщением будет передаваться только индекс > > очередного элемента, то может быть это оформить как > > event-объект ядра, который только фиксирует факт > увеличения > > индекса. > Поясни, пожалуйта, детали. Просто чувствую, что здесь может > быть очень неплохое зерно. > > И даже если не хватит времени обрабатывать каждое > > событие, если видно, что индекс увеличился на n по > > сравнению с прошлым разом, эти n сообщений и > обрабатывать. > Если правильно понял тебя, то ты попал в яблочка. Последние > обновления - самые важные. Их и надо обрабатывать. Если по > какой-то причине пропущены предудущие месаджи - не беда.
Использовать для передачи собственно данных некую очередь в глобальной или разделяемой (если общение между разными процессами) памяти. С сообщением передавать индекс очередного элемента этой очереди - либо в данных сообщения (если это windows message), либо опять же через глобальную/разделяемую память (если объект ядра). Хранить индекс последнего обработанного сообщения, при приходе очередного проверять, случились ли пробелы в обработке - если передавать индекс новой порции данных через глобальную/разделяемую память, на обработку всегда будет поступать последний из переданных. А дальше уже решать, что делать с пропущенными и на какую глубину.
> Спасибо. С Новым годом! Всех тебе благ, Дмитрий, в новом > году!
Аналогично :)
|
|
|