Легенда:
   новое сообщение
    закрытая нитка
    новое сообщение
    в закрытой нитке
    старое сообщение
         
		 | 
- Напоминаю, что масса вопросов по функционированию форума снимается после прочтения его описания.
 - Новичкам также крайне полезно ознакомиться с данным документом.
   
  |   | 
Использование windows messages не принципиально на мой...  31.12.07 07:22  Число просмотров: 5009
 Автор: void <Grebnev Valery> Статус: Elderman
 | 
 
Использование windows messages не принципиально на мой взгляд. Это просто существующий дизайн, к которому привыкли местные. Если зайдёт в тупик, то буду отстаивать другие решения (пока не решил какие).
 
 > Если с сообщением будет передаваться только индекс > очередного элемента, то может быть это оформить как > event-объект ядра, который только фиксирует факт увеличения > индекса. 
 Поясни, пожалуйта, детали. Просто чувствую, что здесь может быть очень неплохое зерно.
 
 > И даже если не хватит времени обрабатывать каждое > событие, если видно, что индекс увеличился на n по > сравнению с прошлым разом, эти n сообщений и обрабатывать. 
 Если правильно понял тебя, то ты попал в яблочка. Последние обновления - самые важные. Их и надо обрабатывать. Если по какой-то причине пропущены предудущие месаджи - не беда.
 
 Спасибо. С Новым годом! Всех тебе благ, Дмитрий, в новом году!
 | 
 
| 
<programming>
 |  
 
[Win32] Две проблемы с передачей сообщения thred-у  28.12.07 09:20  
 Автор: void <Grebnev Valery> Статус: Elderman
 | 
 
Подскажите пожалуйста бест практис о следующем.
 Приложение Publisher посылает обновление приложению Router. Roter пересылает это сообщение вначале в свой workthread (где сообщение обрабатывается), и далее пересылается приложению Subscriber:
 
 	HRESULT Router::OnPublisherPush(const string& skey1, const string& skey2, long key3, char* data, size_t len )
 	{
 		// Pack the publisher message
 		ConcreteTopicA* topicData = new ConcreteTopicA(INVALIDE_TOPIC_ID, skey1, skey2, key3);
 		memmove(topicData->m_data, data, len);
 		
 		// Queue message to the Router thread
 		if (!::PostThreadMessage(m_RouterMsgThreadID, WM_USER_ON_PUBLISHER_PUSH, 0, (LPARAM) topicData ) )
 		{
 			DWORD err = GetLastError();
 			delete topicData;
 			if ( ERROR_NOT_ENOUGH_QUOTA == err ) {
 				std::cout << "\nERROR_NOT_ENOUGH_QUOTA" << std::endl;
 				g_cnt_lost ++;
 				//::SwitchToThread();
 			}
 			return E_FAIL;
 		}
 		//::Sleep(0);
 		return S_OK;
 	}
 
 В рабочем потоке это сообщение, topicData, распаковывается в transmittedTopic,  обрабатывается и удаляется delete transmittedTopic:
 
 	DWORD WINAPI Router::RouterMessageThreadProc( LPVOID lpParameter )
 	{
 		MSG msg;
 		// Create the message queue
 		...
 		...
 		BOOL bRet;  
 		while( (bRet = ::GetMessage( &msg, NULL, 0, 0 )) != 0)
 		{ 
 			if (bRet != -1 error) 
 			{
 				if (WM_TIMER ==  msg.message) {
 					...
 					...
 					}
 				}
 				else if (WM_USER_ON_PUBLISHER_PUSH == msg.message) {
 					//Unpack the message
 					ConcreteTopicA* transmittedTopic = (ConcreteTopicA*) msg.lParam;
 
 					// TO DO
 					...
 					...
 
 					delete transmittedTopic;
 					...
 					...
 					...
 
 Первая проблема, в том, что для обработки сообщения внутри RouterMessageThreadProc надо:
 1) создать  структуру ConcreteTopicA* topicData = new ConcreteTopicA
 2) скопировать данные memmove из transmittedTopic для последующей обработки
 3) удалить структуру delete transmittedTopic;
 Всё это (new, memmove, delete) может быть накладно и долго как интерфейса Publisher, так и для самого Router. Кроме того, поскольку Publisher может весьма интенсивно обновлять данные, это может  фрагментировать память и снизить общий перформанс.
 
 
 Вторая проблема в том, что при интенсивной посылке сообщений в поток, часть сообщений теряется из очереди из-за её переполнения (GetLastError ==ERROR_NOT_ENOUGH_QUOTA ). Теряется приблизительно 50-80 сообщений из 1000 000. Изменение, размера очереди потока по умолчанию (10000 для W2k/XP/W2k3) - не подходит.  Кроме того? при переполнении очереди появляется другая, вполне ожидаемая и более существенная проблема - сообщения по таймеру потока (WM_TIMER) вообще теряют всякий смысл.
 
 По поводу первой проблемы - смутно мелькает такая мысль: данные Publisher копировать в преаллокированный глобальный массив, а в поток передавать индекс элемента этого массива. Массив никогда не очищать и не освобождать. Для синхронизации сообщений дополнительно использовать вектор индексов "свободных фреймов"  памяти.
 
 По поводу второй проблемы - не мелькает никакая мысль, даже смутно.
 
 Может ли кто-нибудь помочь советом, примером? 
 Спасибо.
 | 
 
 
  | 
По поводу new/delete самого ConcreteTopic-а: можно...  28.12.07 15:22  
 Автор: amirul <Serge> Статус: The Elderman
 | 
 
> Первая проблема, в том, что для обработки сообщения внутри > RouterMessageThreadProc надо: > 1) создать  структуру ConcreteTopicA* topicData = new > ConcreteTopicA > 2) скопировать данные memmove из transmittedTopic для > последующей обработки > 3) удалить структуру delete transmittedTopic; > Всё это (new, memmove, delete) может быть накладно и долго > как интерфейса Publisher, так и для самого Router. Кроме > того, поскольку Publisher может весьма интенсивно обновлять > данные, это может  фрагментировать память и снизить общий > перформанс. 
 По поводу new/delete самого ConcreteTopic-а: можно организовать пул (пример пула насколько я помню есть у страуструпа в его C++ Programming Language). Идея в том, чтобы выделить некоторое количество памяти, порезать ее на одинаковые куски и связать куски в односвязный список. Выделение и освобождение памяти в этом пуле - простейшие операции добавления и удаления элемента в голову списка и имеют сложность O(const). Если надо поддержка увеличения, то можно выделять chunk-ами по. В принципе и писать то ничего не надо, все уже есть в boost::pool и в Loki::SmallObjAllocator. Причем решается одновременно и проблема оверхеда по времени и проблема фрагментации памяти.
 
 Что до memmove, то я бы предпочел использовать какую нибудь схему передачи владения (или совместного владения): std::auto_ptr, boost::shared_ptr, boost::intrusive_ptr, Loki::SmartPtr и т.п.. Вместо копирования передать владение (выделить в одном месте, удалить в другом).
 
 > Вторая проблема в том, что при интенсивной посылке > сообщений в поток, часть сообщений теряется из очереди > из-за её переполнения (GetLastError > ==ERROR_NOT_ENOUGH_QUOTA ). Теряется приблизительно 50-80 > сообщений из 1000 000. Изменение, размера очереди потока по > умолчанию (10000 для W2k/XP/W2k3) - не подходит.  Кроме > того? при переполнении очереди появляется другая, вполне > ожидаемая и более существенная проблема - сообщения по > таймеру потока (WM_TIMER) вообще теряют всякий смысл. 
 А это обязательно использовать стандартные виндовые сообщения?
 Как уже написали, самопальная FIFO (std::list или даже std::deque) будет гораздо эффективнее и при этом будет уверенность, что потерявшихся сообщений не будет.
 | 
 
 
  |   | 
Не стал пока делать  20.01.08 05:31  
 Автор: void <Grebnev Valery> Статус: Elderman
 | 
 
Не стал пока делать 
 " ...схему передачи владения (или совместного владения):...", равно как и использованье "стандартных" memory pool. Сделал простой массив назеранных кусков преадллокированного буфера.
 Возможо, что написанный класс CMemoryArray далеко неоптимален, но  получилось, наверное, то что и должно было получиться. Увеличение производительности не более 3%. 
 Три процента, всё же лучше чем ничего, хотя вцелом код с использование CMemoryArray получается более корявым по сравнению с new/delete.
 | 
 
 
  |   | 
Приблизительно это я имел ввиду. У Страуструпа не помню...  31.12.07 07:01  
 Автор: void <Grebnev Valery> Статус: Elderman
 | 
 
> По поводу new/delete самого ConcreteTopic-а: можно > организовать пул (пример пула насколько я помню есть у > страуструпа в его C++ Programming Language). Идея в том, > чтобы выделить некоторое количество памяти, порезать ее на > одинаковые куски и связать куски в односвязный список. > Выделение и освобождение памяти в этом пуле - простейшие > операции добавления и удаления элемента в голову списка и > имеют сложность O(const). 
 Приблизительно это я имел ввиду. У Страуструпа не помню такого. Попробую в инете поискать достойные прототипы. В принципе ясно, как это должно бы работать.
 
 > Если надо поддержка увеличения, > то можно выделять chunk-ами по.  
 Поддержка увеличения не нужна. По крайне мере так видится сейчас.
 
 > А это обязательно использовать стандартные виндовые > сообщения? > Как уже написали, самопальная FIFO (std::list или даже > std::deque) будет гораздо эффективнее и при этом будет > уверенность, что потерявшихся сообщений не будет. 
 Не обязательно в принципе. Но в этом проекте, другие девелоперы используют виндовую очередь. Я только делаю там свой кусок. Так сложилось исторически (попросту был копипастнут кусок майкрософтовского примера RTD сервера). То, что и в других критических по производительности проектах виндовая очередь ведёт себя плохо - это факт. Местная публика лечит это увеличением размера очереди до 100 000. На мой вкус это лажа - приложение делает вид, что работает. Масштабируемости 0. Мульпроцессорной обработки - думаю, не больше. 
 
 Отчасти  (как временное решение ?) можно подлечить проблему потери сообщений, если после посылки сообщения переключать контекст потока ::SwitchToThread(), см. ниже. Тогда в рабочем потоке мы "успеваем" вытащить элемент до переполнения очереди. По крайней мере в стресс-тестах (~20000 месаджей в секунду) сообщения не теряются:
 
 HRESULT Router::OnPublisherPush(const string& skey1, const string& skey2, long key3, char* data, size_t len )
 	{
 		// Pack the publisher message
 		ConcreteTopicA* topicData = new ConcreteTopicA(INVALIDE_TOPIC_ID, skey1, skey2, key3);
 		...
 		if (!::PostThreadMessage(g_RouterMsgThreadID, WM_USER_ON_PUBLISHER_PUSH, 0, (LPARAM) topicData ) )   {
 			delete topicData;
 			...
 			return E_FAIL;
 		}
 		::SwitchToThread();
 		return S_OK;
 	}
 
 Здесь две проблемы: как сам понимаешь, "улучшение синхронизации" при помощи Sleep(0) - полная лажа и никаких гарантий на будущее, поскольку проблема по существу не решается.
 Вторая проблема - интерфейс Router::OnPublisherPush может быть синхроный (он таковым сейчас пока и является COM/DCOM). Поэтому, решение со ::SwitchToThread() до завершения функции Router::OnPublisherPush будет держать Publisher. А это пострашнее любых задержек и потерь месаджей на Router-е.  Publisher выполняет основную работу в системе.
 Короче, надо тестировать всю систему, чтобы сказать можно ли использовать ::SwitchToThread(), или нет с точки зрения производительности приложения Publisher.
 
 Спасибо тебе за мнение, Сергей.
 С Новым Годом! 
 | 
 
 
  |   |   | 
У Страуструпа это было в 19.4.2 Распределители памяти,...  02.01.08 14:38  
 Автор: amirul <Serge> Статус: The Elderman
 | 
 
> Приблизительно это я имел ввиду. У Страуструпа не помню > такого. Попробую в инете поискать достойные прототипы. В > принципе ясно, как это должно бы работать. 
 У Страуструпа это было в 19.4.2 Распределители памяти, определяемые пользователем, а достойные прототипы я уже привел: в частности я бы использовал boost::pool
 | 
 
 
  |   |   | 
Уж лучше самодельная очередь...  02.01.08 07:36  
 Автор: void <Grebnev Valery> Статус: Elderman
 | 
 
> Здесь две проблемы: как сам понимаешь, "улучшение > синхронизации" при помощи Sleep(0) - полная лажа и никаких > гарантий на будущее, поскольку проблема по существу не > решается. 
 Смешно... Получается с небольшой потерей производительности (~7%) защитить виндовую очередь от переполнения при помощи семафора, инициализируемого ~ размером очереди из реестра виндовз. Такой вот конгломерат - очередь виндовз с семафором ;))
 
 | 
 
 
  | 
Для передачи сообщений потокам хорошо использовать IOCP или...  28.12.07 11:51  
 Автор: IgorR <Igor Razin> Статус: Member
 | 
 
| 
Для передачи сообщений потокам хорошо использовать IOCP или самодельную FIFO очередь.
 | 
 
 
  |   | 
В части самодельной FIFO - ну что тут скажешь ... понятно.  31.12.07 08:11  
 Автор: void <Grebnev Valery> Статус: Elderman
 | 
 
> Для передачи сообщений потокам хорошо использовать IOCP или > самодельную FIFO очередь. В части самодельной FIFO - ну что тут скажешь ... понятно.
 
 В части IOCP для не I/O запросов. Поправьте, если я не прав (я слабый программист) -
 на мой взгляд, это наиболее простое и ясное программирование для Windows для тех задач, где это можно применить. Есть и подводные камни и вопросы, которые придётся отстаивать:
 
 1) для производительных систем (более 50 000 не I/O запросов в секунду, до 100 000) использование  простых решений "всё в одном флаконе", типа QueueUserWorkItemможет_не_работать даже для минимального кол-ва потоков WT_EXECUTEINPERSISTENTTHREAD, если после посылки элемента queue в очередь IOCP  не переключать контекст ::SwitchToThread(), типа:
 
 	static HRESULT OnPublisherPush(const string& skey1, const string& skey2, long key3, char* data, size_t len )
 	{
 		// Pack the publisher message
 		ConcreteTopicA* topicData = new ConcreteTopicA(INVALIDE_TOPIC_ID, skey1, skey2, key3);
 		...		
 		// Queue message to the IOCP
 		if (!::QueueUserWorkItem(Router::RouterWorkItemProc, (PVOID) topicData, WT_EXECUTEINPERSISTENTTHREAD /*1 thread*/ ) ) 
 			delete topicData;
 			...
 			return E_FAIL;
 		}
 		...
 		::SwitchToThread();
 		return S_OK;
 	}
 
 Причина в том, FIFO запросов IOCP забивается полностью. Реально при 1000 элементах очереди производительность снижается в разы. При 10 000 и более - может и reboot компа понадобтся. Увеличение числа потоков на порту (WT_EXECUTEDEFAULT, WT_EXECUTELONGFUNCTION) только ухудшат ситуацию, если посылки в порт запроса не переключиться ::SwitchToThread(). 
 
 2) Использование контроля количества потоков и размера очереди запросов IOCP () несомненно улучшает ситуацию. Трюк в том, что можно переключаться ::SwitchToThread() не каждый раз после посылки очередного месаджа, когда очередь достигает N - элементов. Реально, у меня получается  оптимальное число N ~ 100-250  и должно быть разным на разном хардваре:
 
 	static HRESULT OnPublisherPush(const string& skey1, const string& skey2, long key3, char* data, size_t len )
 	{
 		// Pack the publisher message
 		ConcreteTopicA* topicData = new ConcreteTopicA(INVALIDE_TOPIC_ID, skey1, skey2, key3);
 		...		
 		// Queue message to the IOCP
 		if (!::PostQueuedCompletionStatus( g_hIOCP, 0, 0, (OVERLAPPED*)topicData ))  {
 
 			delete topicData;
 			...
 			return E_FAIL;
 		}
 		...
 
 		if (GetIoCompletionQueueLen(g_hIOCP) > 250)
 		       ::SwitchToThread();
 		return S_OK;
 	}
 
 Врядли можно здесь оспаривать главные преимущества - масштабируемость на многопроцессорных компютерах и простота программирования. Однако, это не очень привычный подход для не I/O запросов. 
 
 Мне IOCP  больше нравится для многопроцессорной обработки, чем самодельный FIFO. Тем более, что "самодельный" нормальный пул потоков (работающий с этой FIFO) с контролем их состояний - очень трудная задача.
 
 Спасибо, Игорь за мнение. С Новым Годом!
 | 
 
 
  | 
а использование windows messages принципиально?  28.12.07 10:49  
 Автор: dl <Dmitry Leonov> 
 | 
 
> По поводу первой проблемы - смутно мелькает такая мысль: > данные Publisher копировать в преаллокированный глобальный > массив, а в поток передавать индекс элемента этого массива. > Массив никогда не очищать и не освобождать. Для > синхронизации сообщений дополнительно использовать вектор > индексов "свободных фреймов"  памяти. > По поводу второй проблемы - не мелькает никакая мысль, даже > смутно. 
 
 Если с сообщением будет передаваться только индекс очередного элемента, то может быть это оформить как event-объект ядра, который только фиксирует факт увеличения индекса. И даже если не хватит времени обрабатывать каждое событие, если видно, что индекс увеличился на n по сравнению с прошлым разом, эти n сообщений и обрабатывать.
 | 
 
 
  |   | 
Использование windows messages не принципиально на мой...  31.12.07 07:22  
 Автор: void <Grebnev Valery> Статус: Elderman
 | 
 
Использование windows messages не принципиально на мой взгляд. Это просто существующий дизайн, к которому привыкли местные. Если зайдёт в тупик, то буду отстаивать другие решения (пока не решил какие).
 
 > Если с сообщением будет передаваться только индекс > очередного элемента, то может быть это оформить как > event-объект ядра, который только фиксирует факт увеличения > индекса. 
 Поясни, пожалуйта, детали. Просто чувствую, что здесь может быть очень неплохое зерно.
 
 > И даже если не хватит времени обрабатывать каждое > событие, если видно, что индекс увеличился на n по > сравнению с прошлым разом, эти n сообщений и обрабатывать. 
 Если правильно понял тебя, то ты попал в яблочка. Последние обновления - самые важные. Их и надо обрабатывать. Если по какой-то причине пропущены предудущие месаджи - не беда.
 
 Спасибо. С Новым годом! Всех тебе благ, Дмитрий, в новом году!
 | 
 
 
  |   |   | 
примерно то же, что говорилось в других ответах  31.12.07 12:50  
 Автор: dl <Dmitry Leonov> 
 | 
 
> > Если с сообщением будет передаваться только индекс > > очередного элемента, то может быть это оформить как > > event-объект ядра, который только фиксирует факт > увеличения > > индекса. > Поясни, пожалуйта, детали. Просто чувствую, что здесь может > быть очень неплохое зерно. > > И даже если не хватит времени обрабатывать каждое > > событие, если видно, что индекс увеличился на n по > > сравнению с прошлым разом, эти n сообщений и > обрабатывать. > Если правильно понял тебя, то ты попал в яблочка. Последние > обновления - самые важные. Их и надо обрабатывать. Если по > какой-то причине пропущены предудущие месаджи - не беда. 
 Использовать для передачи собственно данных некую очередь в глобальной или разделяемой (если общение между разными процессами) памяти. С сообщением передавать индекс очередного элемента этой очереди - либо в данных сообщения (если это windows message), либо опять же через глобальную/разделяемую память (если объект ядра). Хранить индекс последнего обработанного сообщения, при приходе очередного проверять, случились ли пробелы в обработке - если передавать индекс новой порции данных через глобальную/разделяемую память, на обработку всегда будет поступать последний из переданных. А дальше уже решать, что делать с пропущенными и на какую глубину.
 
 > Спасибо. С Новым годом! Всех тебе благ, Дмитрий, в новом > году! 
 Аналогично :)
 | 
 
 
  
 
 | 
 |