| 
 
 
 
 Легенда:
  новое сообщение 
  закрытая нитка 
  новое сообщение 
  в закрытой нитке 
  старое сообщение   | 
Напоминаю, что масса вопросов по функционированию форума снимается после прочтения его описания.
Новичкам также крайне полезно ознакомиться с данным документом.
|  | Для передачи сообщений потокам хорошо использовать IOCP или...  28.12.07 11:51  Число просмотров: 5216 Автор: IgorR <Igor Razin> Статус: Member
 |  
| Для передачи сообщений потокам хорошо использовать IOCP или самодельную FIFO очередь. |  | <programming> |  
| [Win32] Две проблемы с передачей сообщения thred-у  28.12.07 09:20 Автор: void <Grebnev Valery> Статус: Elderman
 |  
| Подскажите пожалуйста бест практис о следующем. Приложение Publisher посылает обновление приложению Router. Roter пересылает это сообщение вначале в свой workthread (где сообщение обрабатывается), и далее пересылается приложению Subscriber:
 
 HRESULT Router::OnPublisherPush(const string& skey1, const string& skey2, long key3, char* data, size_t len )
 {
 // Pack the publisher message
 ConcreteTopicA* topicData = new ConcreteTopicA(INVALIDE_TOPIC_ID, skey1, skey2, key3);
 memmove(topicData->m_data, data, len);
 
 // Queue message to the Router thread
 if (!::PostThreadMessage(m_RouterMsgThreadID, WM_USER_ON_PUBLISHER_PUSH, 0, (LPARAM) topicData ) )
 {
 DWORD err = GetLastError();
 delete topicData;
 if ( ERROR_NOT_ENOUGH_QUOTA == err ) {
 std::cout << "\nERROR_NOT_ENOUGH_QUOTA" << std::endl;
 g_cnt_lost ++;
 //::SwitchToThread();
 }
 return E_FAIL;
 }
 //::Sleep(0);
 return S_OK;
 }
 
 В рабочем потоке это сообщение, topicData, распаковывается в transmittedTopic,  обрабатывается и удаляется delete transmittedTopic:
 
 DWORD WINAPI Router::RouterMessageThreadProc( LPVOID lpParameter )
 {
 MSG msg;
 // Create the message queue
 ...
 ...
 BOOL bRet;
 while( (bRet = ::GetMessage( &msg, NULL, 0, 0 )) != 0)
 {
 if (bRet != -1 error)
 {
 if (WM_TIMER ==  msg.message) {
 ...
 ...
 }
 }
 else if (WM_USER_ON_PUBLISHER_PUSH == msg.message) {
 //Unpack the message
 ConcreteTopicA* transmittedTopic = (ConcreteTopicA*) msg.lParam;
 
 // TO DO
 ...
 ...
 
 delete transmittedTopic;
 ...
 ...
 ...
 
 Первая проблема, в том, что для обработки сообщения внутри RouterMessageThreadProc надо:
 1) создать  структуру ConcreteTopicA* topicData = new ConcreteTopicA
 2) скопировать данные memmove из transmittedTopic для последующей обработки
 3) удалить структуру delete transmittedTopic;
 Всё это (new, memmove, delete) может быть накладно и долго как интерфейса Publisher, так и для самого Router. Кроме того, поскольку Publisher может весьма интенсивно обновлять данные, это может  фрагментировать память и снизить общий перформанс.
 
 
 Вторая проблема в том, что при интенсивной посылке сообщений в поток, часть сообщений теряется из очереди из-за её переполнения (GetLastError ==ERROR_NOT_ENOUGH_QUOTA ). Теряется приблизительно 50-80 сообщений из 1000 000. Изменение, размера очереди потока по умолчанию (10000 для W2k/XP/W2k3) - не подходит.  Кроме того? при переполнении очереди появляется другая, вполне ожидаемая и более существенная проблема - сообщения по таймеру потока (WM_TIMER) вообще теряют всякий смысл.
 
 По поводу первой проблемы - смутно мелькает такая мысль: данные Publisher копировать в преаллокированный глобальный массив, а в поток передавать индекс элемента этого массива. Массив никогда не очищать и не освобождать. Для синхронизации сообщений дополнительно использовать вектор индексов "свободных фреймов"  памяти.
 
 По поводу второй проблемы - не мелькает никакая мысль, даже смутно.
 
 Может ли кто-нибудь помочь советом, примером?
 Спасибо.
 |  
|  | По поводу new/delete самого ConcreteTopic-а: можно...  28.12.07 15:22 Автор: amirul <Serge> Статус: The Elderman
 |  
| > Первая проблема, в том, что для обработки сообщения внутри > RouterMessageThreadProc надо:
 > 1) создать  структуру ConcreteTopicA* topicData = new
 > ConcreteTopicA
 > 2) скопировать данные memmove из transmittedTopic для
 > последующей обработки
 > 3) удалить структуру delete transmittedTopic;
 > Всё это (new, memmove, delete) может быть накладно и долго
 > как интерфейса Publisher, так и для самого Router. Кроме
 > того, поскольку Publisher может весьма интенсивно обновлять
 > данные, это может  фрагментировать память и снизить общий
 > перформанс.
 
 По поводу new/delete самого ConcreteTopic-а: можно организовать пул (пример пула насколько я помню есть у страуструпа в его C++ Programming Language). Идея в том, чтобы выделить некоторое количество памяти, порезать ее на одинаковые куски и связать куски в односвязный список. Выделение и освобождение памяти в этом пуле - простейшие операции добавления и удаления элемента в голову списка и имеют сложность O(const). Если надо поддержка увеличения, то можно выделять chunk-ами по. В принципе и писать то ничего не надо, все уже есть в boost::pool и в Loki::SmallObjAllocator. Причем решается одновременно и проблема оверхеда по времени и проблема фрагментации памяти.
 
 Что до memmove, то я бы предпочел использовать какую нибудь схему передачи владения (или совместного владения): std::auto_ptr, boost::shared_ptr, boost::intrusive_ptr, Loki::SmartPtr и т.п.. Вместо копирования передать владение (выделить в одном месте, удалить в другом).
 
 > Вторая проблема в том, что при интенсивной посылке
 > сообщений в поток, часть сообщений теряется из очереди
 > из-за её переполнения (GetLastError
 > ==ERROR_NOT_ENOUGH_QUOTA ). Теряется приблизительно 50-80
 > сообщений из 1000 000. Изменение, размера очереди потока по
 > умолчанию (10000 для W2k/XP/W2k3) - не подходит.  Кроме
 > того? при переполнении очереди появляется другая, вполне
 > ожидаемая и более существенная проблема - сообщения по
 > таймеру потока (WM_TIMER) вообще теряют всякий смысл.
 
 А это обязательно использовать стандартные виндовые сообщения?
 Как уже написали, самопальная FIFO (std::list или даже std::deque) будет гораздо эффективнее и при этом будет уверенность, что потерявшихся сообщений не будет.
 |  
|  |  | Не стал пока делать  20.01.08 05:31 Автор: void <Grebnev Valery> Статус: Elderman
 |  
| Не стал пока делать " ...схему передачи владения (или совместного владения):...", равно как и использованье "стандартных" memory pool. Сделал простой массив назеранных кусков преадллокированного буфера.
 Возможо, что написанный класс CMemoryArray далеко неоптимален, но  получилось, наверное, то что и должно было получиться. Увеличение производительности не более 3%.
 Три процента, всё же лучше чем ничего, хотя вцелом код с использование CMemoryArray получается более корявым по сравнению с new/delete.
 |  
|  |  | Приблизительно это я имел ввиду. У Страуструпа не помню...  31.12.07 07:01 Автор: void <Grebnev Valery> Статус: Elderman
 |  
| > По поводу new/delete самого ConcreteTopic-а: можно > организовать пул (пример пула насколько я помню есть у
 > страуструпа в его C++ Programming Language). Идея в том,
 > чтобы выделить некоторое количество памяти, порезать ее на
 > одинаковые куски и связать куски в односвязный список.
 > Выделение и освобождение памяти в этом пуле - простейшие
 > операции добавления и удаления элемента в голову списка и
 > имеют сложность O(const).
 
 Приблизительно это я имел ввиду. У Страуструпа не помню такого. Попробую в инете поискать достойные прототипы. В принципе ясно, как это должно бы работать.
 
 > Если надо поддержка увеличения,
 > то можно выделять chunk-ами по.
 
 Поддержка увеличения не нужна. По крайне мере так видится сейчас.
 
 > А это обязательно использовать стандартные виндовые
 > сообщения?
 > Как уже написали, самопальная FIFO (std::list или даже
 > std::deque) будет гораздо эффективнее и при этом будет
 > уверенность, что потерявшихся сообщений не будет.
 
 Не обязательно в принципе. Но в этом проекте, другие девелоперы используют виндовую очередь. Я только делаю там свой кусок. Так сложилось исторически (попросту был копипастнут кусок майкрософтовского примера RTD сервера). То, что и в других критических по производительности проектах виндовая очередь ведёт себя плохо - это факт. Местная публика лечит это увеличением размера очереди до 100 000. На мой вкус это лажа - приложение делает вид, что работает. Масштабируемости 0. Мульпроцессорной обработки - думаю, не больше.
 
 Отчасти  (как временное решение ?) можно подлечить проблему потери сообщений, если после посылки сообщения переключать контекст потока ::SwitchToThread(), см. ниже. Тогда в рабочем потоке мы "успеваем" вытащить элемент до переполнения очереди. По крайней мере в стресс-тестах (~20000 месаджей в секунду) сообщения не теряются:
 
 HRESULT Router::OnPublisherPush(const string& skey1, const string& skey2, long key3, char* data, size_t len )
 {
 // Pack the publisher message
 ConcreteTopicA* topicData = new ConcreteTopicA(INVALIDE_TOPIC_ID, skey1, skey2, key3);
 ...
 if (!::PostThreadMessage(g_RouterMsgThreadID, WM_USER_ON_PUBLISHER_PUSH, 0, (LPARAM) topicData ) )   {
 delete topicData;
 ...
 return E_FAIL;
 }
 ::SwitchToThread();
 return S_OK;
 }
 
 Здесь две проблемы: как сам понимаешь, "улучшение синхронизации" при помощи Sleep(0) - полная лажа и никаких гарантий на будущее, поскольку проблема по существу не решается.
 Вторая проблема - интерфейс Router::OnPublisherPush может быть синхроный (он таковым сейчас пока и является COM/DCOM). Поэтому, решение со ::SwitchToThread() до завершения функции Router::OnPublisherPush будет держать Publisher. А это пострашнее любых задержек и потерь месаджей на Router-е.  Publisher выполняет основную работу в системе.
 Короче, надо тестировать всю систему, чтобы сказать можно ли использовать ::SwitchToThread(), или нет с точки зрения производительности приложения Publisher.
 
 Спасибо тебе за мнение, Сергей.
 С Новым Годом!
 |  
|  |  |  | У Страуструпа это было в 19.4.2 Распределители памяти,...  02.01.08 14:38 Автор: amirul <Serge> Статус: The Elderman
 |  
| > Приблизительно это я имел ввиду. У Страуструпа не помню > такого. Попробую в инете поискать достойные прототипы. В
 > принципе ясно, как это должно бы работать.
 
 У Страуструпа это было в 19.4.2 Распределители памяти, определяемые пользователем, а достойные прототипы я уже привел: в частности я бы использовал boost::pool
 |  
|  |  |  | Уж лучше самодельная очередь...  02.01.08 07:36 Автор: void <Grebnev Valery> Статус: Elderman
 |  
| > Здесь две проблемы: как сам понимаешь, "улучшение > синхронизации" при помощи Sleep(0) - полная лажа и никаких
 > гарантий на будущее, поскольку проблема по существу не
 > решается.
 
 Смешно... Получается с небольшой потерей производительности (~7%) защитить виндовую очередь от переполнения при помощи семафора, инициализируемого ~ размером очереди из реестра виндовз. Такой вот конгломерат - очередь виндовз с семафором ;))
 
 |  
|  | Для передачи сообщений потокам хорошо использовать IOCP или...  28.12.07 11:51 Автор: IgorR <Igor Razin> Статус: Member
 |  
| Для передачи сообщений потокам хорошо использовать IOCP или самодельную FIFO очередь. |  
|  |  | В части самодельной FIFO - ну что тут скажешь ... понятно.  31.12.07 08:11 Автор: void <Grebnev Valery> Статус: Elderman
 |  
| > Для передачи сообщений потокам хорошо использовать IOCP или > самодельную FIFO очередь.
 В части самодельной FIFO - ну что тут скажешь ... понятно.
 
 В части IOCP для не I/O запросов. Поправьте, если я не прав (я слабый программист) -
 на мой взгляд, это наиболее простое и ясное программирование для Windows для тех задач, где это можно применить. Есть и подводные камни и вопросы, которые придётся отстаивать:
 
 1) для производительных систем (более 50 000 не I/O запросов в секунду, до 100 000) использование  простых решений "всё в одном флаконе", типа QueueUserWorkItemможет_не_работать даже для минимального кол-ва потоков WT_EXECUTEINPERSISTENTTHREAD, если после посылки элемента queue в очередь IOCP  не переключать контекст ::SwitchToThread(), типа:
 
 static HRESULT OnPublisherPush(const string& skey1, const string& skey2, long key3, char* data, size_t len )
 {
 // Pack the publisher message
 ConcreteTopicA* topicData = new ConcreteTopicA(INVALIDE_TOPIC_ID, skey1, skey2, key3);
 ...
 // Queue message to the IOCP
 if (!::QueueUserWorkItem(Router::RouterWorkItemProc, (PVOID) topicData, WT_EXECUTEINPERSISTENTTHREAD /*1 thread*/ ) )
 delete topicData;
 ...
 return E_FAIL;
 }
 ...
 ::SwitchToThread();
 return S_OK;
 }
 
 Причина в том, FIFO запросов IOCP забивается полностью. Реально при 1000 элементах очереди производительность снижается в разы. При 10 000 и более - может и reboot компа понадобтся. Увеличение числа потоков на порту (WT_EXECUTEDEFAULT, WT_EXECUTELONGFUNCTION) только ухудшат ситуацию, если посылки в порт запроса не переключиться ::SwitchToThread().
 
 2) Использование контроля количества потоков и размера очереди запросов IOCP () несомненно улучшает ситуацию. Трюк в том, что можно переключаться ::SwitchToThread() не каждый раз после посылки очередного месаджа, когда очередь достигает N - элементов. Реально, у меня получается  оптимальное число N ~ 100-250  и должно быть разным на разном хардваре:
 
 static HRESULT OnPublisherPush(const string& skey1, const string& skey2, long key3, char* data, size_t len )
 {
 // Pack the publisher message
 ConcreteTopicA* topicData = new ConcreteTopicA(INVALIDE_TOPIC_ID, skey1, skey2, key3);
 ...
 // Queue message to the IOCP
 if (!::PostQueuedCompletionStatus( g_hIOCP, 0, 0, (OVERLAPPED*)topicData ))  {
 
 delete topicData;
 ...
 return E_FAIL;
 }
 ...
 
 if (GetIoCompletionQueueLen(g_hIOCP) > 250)
 ::SwitchToThread();
 return S_OK;
 }
 
 Врядли можно здесь оспаривать главные преимущества - масштабируемость на многопроцессорных компютерах и простота программирования. Однако, это не очень привычный подход для не I/O запросов.
 
 Мне IOCP  больше нравится для многопроцессорной обработки, чем самодельный FIFO. Тем более, что "самодельный" нормальный пул потоков (работающий с этой FIFO) с контролем их состояний - очень трудная задача.
 
 Спасибо, Игорь за мнение. С Новым Годом!
 |  
|  | а использование windows messages принципиально?  28.12.07 10:49 Автор: dl <Dmitry Leonov>
 |  
| > По поводу первой проблемы - смутно мелькает такая мысль: > данные Publisher копировать в преаллокированный глобальный
 > массив, а в поток передавать индекс элемента этого массива.
 > Массив никогда не очищать и не освобождать. Для
 > синхронизации сообщений дополнительно использовать вектор
 > индексов "свободных фреймов"  памяти.
 > По поводу второй проблемы - не мелькает никакая мысль, даже
 > смутно.
 
 
 Если с сообщением будет передаваться только индекс очередного элемента, то может быть это оформить как event-объект ядра, который только фиксирует факт увеличения индекса. И даже если не хватит времени обрабатывать каждое событие, если видно, что индекс увеличился на n по сравнению с прошлым разом, эти n сообщений и обрабатывать.
 |  
|  |  | Использование windows messages не принципиально на мой...  31.12.07 07:22 Автор: void <Grebnev Valery> Статус: Elderman
 |  
| Использование windows messages не принципиально на мой взгляд. Это просто существующий дизайн, к которому привыкли местные. Если зайдёт в тупик, то буду отстаивать другие решения (пока не решил какие). 
 > Если с сообщением будет передаваться только индекс
 > очередного элемента, то может быть это оформить как
 > event-объект ядра, который только фиксирует факт увеличения
 > индекса.
 
 Поясни, пожалуйта, детали. Просто чувствую, что здесь может быть очень неплохое зерно.
 
 > И даже если не хватит времени обрабатывать каждое
 > событие, если видно, что индекс увеличился на n по
 > сравнению с прошлым разом, эти n сообщений и обрабатывать.
 
 Если правильно понял тебя, то ты попал в яблочка. Последние обновления - самые важные. Их и надо обрабатывать. Если по какой-то причине пропущены предудущие месаджи - не беда.
 
 Спасибо. С Новым годом! Всех тебе благ, Дмитрий, в новом году!
 |  
|  |  |  | примерно то же, что говорилось в других ответах  31.12.07 12:50 Автор: dl <Dmitry Leonov>
 |  
| > > Если с сообщением будет передаваться только индекс > > очередного элемента, то может быть это оформить как
 > > event-объект ядра, который только фиксирует факт
 > увеличения
 > > индекса.
 > Поясни, пожалуйта, детали. Просто чувствую, что здесь может
 > быть очень неплохое зерно.
 > > И даже если не хватит времени обрабатывать каждое
 > > событие, если видно, что индекс увеличился на n по
 > > сравнению с прошлым разом, эти n сообщений и
 > обрабатывать.
 > Если правильно понял тебя, то ты попал в яблочка. Последние
 > обновления - самые важные. Их и надо обрабатывать. Если по
 > какой-то причине пропущены предудущие месаджи - не беда.
 
 Использовать для передачи собственно данных некую очередь в глобальной или разделяемой (если общение между разными процессами) памяти. С сообщением передавать индекс очередного элемента этой очереди - либо в данных сообщения (если это windows message), либо опять же через глобальную/разделяемую память (если объект ядра). Хранить индекс последнего обработанного сообщения, при приходе очередного проверять, случились ли пробелы в обработке - если передавать индекс новой порции данных через глобальную/разделяемую память, на обработку всегда будет поступать последний из переданных. А дальше уже решать, что делать с пропущенными и на какую глубину.
 
 > Спасибо. С Новым годом! Всех тебе благ, Дмитрий, в новом
 > году!
 
 Аналогично :)
 |  
 
 
 |  |