Легенда:
новое сообщение
закрытая нитка
новое сообщение
в закрытой нитке
старое сообщение
|
- Напоминаю, что масса вопросов по функционированию форума снимается после прочтения его описания.
- Новичкам также крайне полезно ознакомиться с данным документом.
|
Простой ответ 20.08.08 11:09 Число просмотров: 2522
Автор: IgorR <Igor Razin> Статус: Member
|
Не надо городить огород и разделять приход данных на синхронный и асинхронный :) И тогда все будет красиво, понятно, сухо и комфортно.
|
<programming>
|
[Win32] простой вопрос по iocp 20.08.08 09:56
Автор: void <Grebnev Valery> Статус: Elderman
|
Хорошо бы получить ответ на один маленький давнишний вопрос... Не задавал его раньше, поскольку и так работает... Есть функция, которая весьма стандартно "вычитывает" данные из overlapped socket (впрочем не важно что это сокет - это может быть любой oveplapped хендл)
bool read(DWORD bytes_to_read)
{
if (m_io_status == IOSTATUS_NOOP|m_io_status == IOSTATUS_COMPLETED)
{
// reset counters and start reading
::ZeroMemory( get_overlapped(), sizeof(OVERLAPPED)); //reset overlapped
m_bytes_done = 0; // how many bytes we've received
m_bytes_left = bytes_to_read; // how many bytes we have left to receive
}
else if (m_io_status == IOSTATUS_PENDING|m_io_status == IOSTATUS_COMPLETED_SYNCHRONOUSLY)
{
DWORD async_done = (DWORD) get_overlapped()->InternalHigh;
m_bytes_done += async_done;
m_bytes_left -= async_done;
if ( 0 == m_bytes_left)
{
// reading is completed
m_io_status = IOSTATUS_COMPLETED;
return true;
}
else if (m_io_status == IOSTATUS_PENDING)
{
// continue reading asynchronous results
return true;
}
else
{
// queue next overlapped operation
}
}
else
{
m_io_status = IOSTATUS_ERROR;
return false;
}
// queue next overlapped operation
if ( read((LPBYTE)m_buffer + m_bytes_done, m_bytes_left) )
{
// completed synchronously
m_io_status = IOSTATUS_COMPLETED_SYNCHRONOUSLY;
return true;
}
else
{
DWORD err = ::GetLastError();
if (err == ERROR_IO_PENDING )
{
// operation will complete in the future
m_io_status = IOSTATUS_PENDING;
return true;
}
else
{
m_io_status = IOSTATUS_ERROR;
return false;
}
}
}
---
Это работает нормально. Хотелось бы чуток лучше - если операция заканчивается синхронно (см. m_io_status = IOSTATUS_COMPLETED_SYNCHRONOUSLY), то получив сразу же ! данные, я их не обрабатываю, поскольку IOCP всё равно поместит соответствующий completion status в очередь.
Хотелось бы сделать что-нить типа (вычитывать данные, которые драйвер вернул синхронно, сразу же)
while ( read((LPBYTE)m_buffer + m_bytes_done, m_bytes_left) )
{
// completed synchronously
m_io_status = IOSTATUS_COMPLETED_SYNCHRONOUSLY;
DWORD async_done = (DWORD) get_overlapped()->InternalHigh;
m_bytes_done += async_done;
m_bytes_left -= async_done;
if ( 0 == m_bytes_left)
{
// reading is completed
m_io_status = IOSTATUS_COMPLETED;
return true;
}
}
DWORD err = ::GetLastError();
if (err == ERROR_IO_PENDING )
{
// operation will complete in the future
m_io_status = IOSTATUS_PENDING;
return true;
}
else
{
m_io_status = IOSTATUS_ERROR;
return false;
}
---
Не работает ... Данные завершения синхронных операций "портятся" последующими вызовами GetQueuedCompletionStatus. Проблема в том, что для операций, которые queued на IOCP, и завершились синхронно, IOCP всё равно поставит соответстуючие issues в свою очередь. Так что следующие вызовы GetQueuedCompletionStatus будут продолжать возвращать данные которые уже были получены.
Это можно побороть?
Спасибо !
|
|
Простой ответ 20.08.08 11:09
Автор: IgorR <Igor Razin> Статус: Member
|
Не надо городить огород и разделять приход данных на синхронный и асинхронный :) И тогда все будет красиво, понятно, сухо и комфортно.
|
| |
Простой ответ... 20.08.08 19:36
Автор: void <Grebnev Valery> Статус: Elderman Отредактировано 21.08.08 03:42 Количество правок: 1
|
> Не надо городить огород и разделять приход данных на > синхронный и асинхронный
Если (WSARecv/ReadFile) возвращают 0 или TRUE, соответственно, для сокета/файла/пайпа который на IOCP, то операция чтения была завершена синхронно, иначе если (DWORD err = WSAGetLastError / GetLastError) == (WSA_IO_PENDING / ERROR_IO_PENDING) - асинхронно. Это функциональность асинхронного IO не зависит от наших желаний - "разделять" или "...не разделять приход данных на синхронный и асинхронный...".
> :) И тогда все будет красиво, понятно, сухо и комфортно.
Тогда будет то, что я написал в первом постинге. Оно работает. Но вместо того, чтобы сразу обработать данные, которые пришли синхронно, придётся получать их из очереди IOCP (это дольше).
В этом был и постинг.
|
| | |
Понятно, что я говорил про логику обработки. 21.08.08 09:02
Автор: IgorR <Igor Razin> Статус: Member
|
> Это функциональность > асинхронного IO не зависит от наших желаний - "разделять" > или "...не разделять приход данных на синхронный и > асинхронный...". Понятно, что я говорил про логику обработки.
> Тогда будет то, что я написал в первом постинге. Оно > работает. Но вместо того, чтобы сразу обработать данные, > которые пришли синхронно, придётся получать их из очереди > IOCP (это дольше). Сильно дольше? Из-за этого ломать всю красивую схему асинхронной работы?
|
| | | |
Нет не сильно, но теоретически дольше. Поскольку FIFO... 21.08.08 09:55
Автор: void <Grebnev Valery> Статус: Elderman
|
> Сильно дольше? Из-за этого ломать всю красивую схему > асинхронной работы? Нет не сильно, но теоретически дольше. Поскольку FIFO запросов IOCP - это по существу очередь (KLIST кажись), то, как говориться - это зависит насколько дольше. Ну и потом, есть старый вопрос, который я всегда оставлял "на потом". Думал, мож кто победил такое.
|
| | | | |
Даже не теоретически, а реально дольше. Но это такие копейки... 21.08.08 10:31
Автор: IgorR <Igor Razin> Статус: Member
|
> Нет не сильно, но теоретически дольше. Даже не теоретически, а реально дольше. Но это такие копейки по сравнению с остальным, что даже говорить как-то. Это в простом случае. А вот такой момент: в одном потоке ты вызываешь WSARecv с синхронным завершением (и логикой обработки сразу), и одновременно в другом у тебя срабатывает GetQueuedCompletionStatus на этот приход. Что делать?
В общем не ломай голову над несуществующей проблемой :)
|
| | | | | |
Не совсем так. Я постараюсь далее показать на цифрах... 22.08.08 09:17
Автор: void <Grebnev Valery> Статус: Elderman Отредактировано 22.08.08 09:28 Количество правок: 2
|
> > Нет не сильно, но теоретически дольше. > Даже не теоретически, а реально дольше. Но это такие > копейки по сравнению с остальным, что даже говорить как-то.
Не совсем так. Я постараюсь далее показать на цифрах. Ключевой поинт - если завершение операции синхронное (в данном случае), то поток может вычитывать данные из системного кэша (которые туда поместил драйвер) без переключения контекста. Это не аргумент?
> Это в простом случае. А вот такой момент: в одном потоке ты > вызываешь WSARecv с синхронным завершением (и логикой > обработки сразу), и одновременно в другом у тебя > срабатывает GetQueuedCompletionStatus на этот приход. Что > делать?
А сам-то что думаешь, что делать? Давай на минуту отвлечёмся от темы топика. Пусть, как я написал в реализации функции read (DWORD bytes_to_read) (см. в начале нитки), все запросы обрабатываются, как асинхронные (т.е имеем стандартное решение, когда количество полученных байт берётся из OVEPLAPPED::InternalHigh по завершению ::GetQueuedCompletionStatus(...), и нет этой ). Пусть мы имеем 4CPU и число конкурерентных потоков разрешённых использовать IOCP - 8. Тогда с большой вероятностью два или более потока (на разных CPU) могут одновременно обрабатывать разные pending issue из очереди IOCP для одно и того же сокета вне всякой последовательности:
1) Клиент посылает пакеты 1Mb + 1 Mb + ...
2) Драйвер получил данные (0.5Mb) и поместил issue в очередь IOCP
3) Поток 1 получил данные (0.5Mb) на ::GetQueuedCompletionStatus(...), начал обработку и .... произошло переключение контекста потока. Ужос !
4) Драйвер получил данные (0.3Mb) и поместил issue в очередь IOCP
5) Поток 8 получил данные (0.3Mb) на ::GetQueuedCompletionStatus(...), начал обработку и .... с радостью продолжил поломав при этом последовательность байт (поток 1 ещё не закончил или даже не начинал). Ужос !
6) Потоки 2,3,4,5,6,7 такого натворят, что только держись.
Чтобы прекратить это безобразие, я бы сделал так (кстати это решает вопрос, что делать, если во время синхронной обработки придёт завершение pending I/O):
static DWORD WINAPI worker_thread_proc(LPVOID lParam)
{
Socket_overlapped_acceptor& acceptor =reinterpret_cast<Socket_overlapped_acceptorgt;(lParam);
DWORD num_bytes = 0;
ULONG_PTR comp_key = 0;
OVERLAPPED* poverlapped = NULL;
while ( false == acceptor.m_exit_pool_thread_flag /* in addition to ::PostQueuedCompletionStatus(m_completion_port, 0, CK_EXIT_THREAD, NULL) for all the threads*/)
{
BOOL fOk = ::GetQueuedCompletionStatus(acceptor.m_completion_port, &num_bytes, &comp_key, &poverlapped, IOCP_TIMEOUT);
if(fOk)
{
...
...
if (comp_key == CK_READ)
{
bool queue_next_overlapped_request = true;
_ASSERT(poverlapped);
Client_proxy_connector* connector = static_cast<Client_proxy_connector*>(poverlapped);
Lock lock(&connector->m_cs);
do
{
// Call of the function "connector->read()" returns false, if there has been an I/O error.
// Read operation returns true in two cases:
// - socket connector has read all the data requested (reading is completed).
// - socket connector has partially read data; there are some data left (I/O pending).
if(false == connector->read(CONNECTOR_PACKET_SIZE))
{
LOG("Failed reading data from the connector.");
acceptor.release_client(connector);
break;
}
else if(true == connector->is_io_completed() )
{
if (connector->is_on_command())
{
// The command is OK. Proceed the command and read next one.
}
else
{
LOG("Error: invalid command.");
acceptor.release_client(connector);
break;
}
}
else
{
// IO pending. Wait for IO completion on ::GetQueuedCompletionStatus(...)
queue_next_overlapped_request = false;
}
}
while (true == queue_next_overlapped_request);
}
...
...
}
else
{
// TO DO on error
}
}
return (0);
}
---
Поправь, если я не прав.
> В общем не ломай голову над несуществующей проблемой :)
Немного поломаю що :( Я не могу кансельнуть дубликаты issues на IOCP для операций которые завершились синхронно. ::CancelIo((HANDLE) m_socket) не помогает.
|
| | | | | | |
Ээээ.. Чо? :) Есть 2 буфера - драйвера и твой,... 22.08.08 11:09
Автор: IgorR <Igor Razin> Статус: Member
|
> Ключевой поинт - если завершение операции синхронное (в > данном случае), то поток может вычитывать данные из > системного кэша (которые туда поместил драйвер) без > переключения контекста. Ээээ.. Чо? :) Есть 2 буфера - драйвера и твой, пользовательский (я за сокеты говорю). При любом завершении данные из сокетного копируются в твой (если сокетный не установлен в 0), из которого ты и будешь брать. А уж переключится контекст или нет - оно неведомо в любом случае. Не понимаю, где ты тут выиграть хочешь?
> Тогда с большой вероятностью два или > более потока (на разных CPU) могут одновременно > обрабатывать разные pending issue из очереди IOCP для одно > и того же сокета вне всякой > последовательности: Ёптель. Ты несколько раз подряд WSARecv вызываешь что-ли? Если да, то ничем помочь не могу, я извращениями не занимаюсь ;) Если нет, то, конечно, никаких таких проблем нет. Кстати, для одновременных запросов WSARecv вполне логично их нумеровать, таким образом восстанавливая последовательность (но это, повторюсь, извращение).
|
| | | | | | | |
Возможно твоя критика верна... 22.08.08 22:01
Автор: void <Grebnev Valery> Статус: Elderman
|
Возможно твоя критика верна...
> > Ключевой поинт - если завершение операции синхронное > (в > > данном случае), то поток может вычитывать данные из > > системного кэша (которые туда поместил драйвер) без > > переключения контекста. > Ээээ.. Чо? :) Есть 2 буфера - драйвера и твой, > пользовательский (я за сокеты говорю). При любом завершении > данные из сокетного копируются в твой (если сокетный не > установлен в 0), из которого ты и будешь брать. А уж > переключится контекст или нет - оно неведомо в любом > случае. Не понимаю, где ты тут выиграть хочешь?
Имелось ввиду, что при синхронном завершении чтения данные уже были в буфере драйвера, поэтому последующий вызов read() скопировал данные из буфера драйвера в пользовательский буфер (если размер, размер буфера сокета не 0, иначе драйвер прямо поместил данные в буфер пользователя).
Далее, если операция чтения завершается синхронно несколько раз подряд, то в таком случае можно продолжать "вычитывать" данные из одного и того же потока, не обращаять к очереди IOCP вызывая ::GetQueuedCompletionStatus(...). Т.е. вычитываем, пока вычитывается синхронно. Всё это делается из одного и тогоже потока. Напротив, если мы вызываем ::GetQueuedCompletionStatus(...), то нет гарантии что IOCP будет использовать тот же поток. Конечно, поскольу очередь активных потоков - LIFO, и из за оптимизации IOCP, скорее всего будет использован тот же поток. Но не факт.
> > > Тогда с большой вероятностью два или > > более потока (на разных CPU) могут одновременно > > обрабатывать разные pending issue из очереди IOCP для > одно > > и того же сокета вне всякой > > последовательности: > Ёптель. Ты несколько раз подряд WSARecv вызываешь что-ли? > Если да, то ничем помочь не могу, я извращениями не > занимаюсь ;)
Если "мешать в одном огороде" обработку pending и синхронных операций (то, что ты критикуешь) - это неизбежно, и да, WSARecv (или ReadFile) вызывается несколько раз подряд (фактически столько раз, сколько раз чтение завершается синхронно). Если же операция обрабатыается в логике аппликации как peding(вне зависимости от того, как она фактически завершиласть, синхронно или асинхронно), то WSARecv (или ReadFile) вызывается только один раз.
> Если нет, то, конечно, никаких таких проблем > нет.
Согласен.
> Кстати, для одновременных запросов WSARecv вполне > логично их нумеровать, таким образом восстанавливая > последовательность (но это, повторюсь, извращение).
В принципе хорошая мысль (кажется, народ так и делает). Тогда придётся поддерживать вектор буферов, и каким-то образом "пересобирать" результирующую последовательность байт в соответствии с их "нумерацией". Лично мне кажется это слишком сложным. Проще эксклюзивно заблокировать до тех пор, пока он не завершит чтение. Блокироваться будет только доступ к одному сокету из 1000.
|
| | | | | | | | |
В общем, по моему мнению, не надо портить красивую... 25.08.08 13:55
Автор: IgorR <Igor Razin> Статус: Member
|
В общем, по моему мнению, не надо портить красивую асинхронную модель. Тем более что выгоды от этого крайне сомнительные. Вот если ты протестируешь оба варианта и явно покажешь выигрыш в скорости, тогда можно будет пообсуждать :)
А так, есть другие более насущные проблемы, например, создание эффективного динамического пула рабочих потоков.
|
| | | | | | | | | |
Согласен с тобой. Ты прав, конечно. У нас работает очень... 26.08.08 04:59
Автор: void <Grebnev Valery> Статус: Elderman
|
> В общем, по моему мнению, не надо портить красивую > асинхронную модель. Тем более что выгоды от этого крайне > сомнительные. Вот если ты протестируешь оба варианта и явно > покажешь выигрыш в скорости, тогда можно будет пообсуждать > :) > А так, есть другие более насущные проблемы, например, > создание эффективного динамического пула рабочих потоков.
Согласен с тобой. Ты прав, конечно. У нас работает очень грамотный парень. Ну ... он приблизительно такого же мнения, что и ты. Так что брошу всё это пока.
Спасибо за критику!
|
| | | | | | | |
Кстати, не знаю как там сокеты, но read/write практически всего выполняется прямо в пользовательский буфер 22.08.08 12:29
Автор: amirul <Serge> Статус: The Elderman
|
Так называемый type3 метод
|
| | | | | | | | |
Кстати, интересно, а можно ли этим управлять для PIPE? Для... 22.08.08 22:06
Автор: void <Grebnev Valery> Статус: Elderman
|
> Так называемый type3 метод Кстати, интересно, а можно ли этим управлять для PIPE? Для сокета "управление" заключается в установке размера буфера сокета в ноль. А если данные читаются из PIPE, то как сказать, что использовать пользовательский буфер напрямую?
|
| | | | | | | | |
Да, в сокетах так же можно. 22.08.08 12:32
Автор: IgorR <Igor Razin> Статус: Member
|
|
|
|