Смекни!
smekni.com

Эффективная многопоточность (стр. 2 из 5)

DWORD QueueUserAPC( PAPCFUNC pfnAPC, // APC функция HANDLE hThread, // хендл потока ULONG_PTR dwData // параметр APC функции);

Рассмотрим небольшой пример ее использования (проверка ошибок устранена для повышения наглядности).

const int _SOME_MAGIC_VALUE = 5;DWORD CALLBACK trd1(LPVOID p){ HANDLE hEvent = (HANDLE)p; SetEvent(hEvent); int i = 0; while(i < _SOME_MAGIC_VALUE){ SleepEx(INFINITE, true); cout << i++ << endl; } return 0;}VOID CALLBACK APCProc(ULONG_PTR dwParam){ cout << "APC Proc #" << dwParam; cout << " threadid :" << GetCurrentThreadId() << endl;}int main(){ HANDLE hEvent = CreateEvent(0, false, false, NULL); DWORD trd_id = 0; HANDLE hThread = CreateThread(0, 0, trd1, hEvent, 0, &trd_id); cout << "Thread id is 0x" << hex << trd_id << endl; WaitForSingleObject(hEvent, INFINITE); for(int i = 0;i < _SOME_MAGIC_VALUE;i++){ QueueUserAPC(APCProc, hThread, i); } WaitForSingleObject(hThread, 1000);CloseHandle(hThread); return 0;}

Несмотря на кажущуюся простоту, пример довольно сложен, и не всегда можно предсказать, что будет на экране после его завершения. Давайте разберем его в форме вопрос-ответ.

Почему я синхронизирую потоки с помощью события?

Если этого не сделать, то все пять APC-запросов выполнятся еще до того, как функция потока trd1 получит управление. Это произойдет потому, что сама система в процессе создания потока использует механизм APC-вызовов для инициализации потока. С его помощью, например, происходит вызов всех функций DllMain с параметром DLL_THREAD_ATTACH, если, конечно, вы не вызывали DisableThreadLibraryCalls для какой-либо библиотеки.

Почему на экран выводится странный результат?

Thread id is 0x68cAPC Proc #0 threadid :68c0APC Proc #1 threadid :68cAPC Proc #2 threadid :68cAPC Proc #3 threadid :68cAPC Proc #4 threadid :68c1

Как я уже говорил, для каждого потока система организует очередь из APC-запросов, так что в момент обработки первого запроса потоком система успевает добавить в очередь все остальные запросы, которые и выполняются при следующем вызове SleepEx. Результат сильно зависит от загруженности системы, так что у вас он может быть другим: например, все запросы успеют выполниться за одну итерацию.

Почему, если закомментировать тело APCProc, на экран выводится следующее?

Thread id is 0x7b401234

Так как теперь эта процедура фактически ничего не делает, система не успевает добавить новый запрос в очередь до завершения обработки предыдущего, так что каждый SleepEx обрабатывает «свой» APC-запрос.

Теперь вам должно быть понятно, как использовать данный механизм для организации пула потоков. Вот примерный сценарий для фиксированного количества потоков в пуле: в главном потоке приложения создаются несколько рабочих потоков, каждый из которых сразу переходит в состояние тревожного ожидания специально созданного основным потоком события. Когда приходит клиентский запрос, главный поток передает APC-запрос одному из рабочих потоков. Рабочий поток пробуждается и выполняет функцию, поставленную в очередь главным потоком. При этом он не покидает функции WaitForSingleObjectEx. То есть выполнение APC-запроса производится как бы внутри функции WaitForSingleObjectEx. После завершения выполнения запроса управление передается функции WaitForSingleObjectEx, которая, в свою очередь, передает управление основному коду потока, возвращая WAIT_IO_COMPLETION.

При получении управления рабочий поток должен проанализировать значение, возвращенное этой функцией. Если оно равно WAIT_IO_COMPLETION, то причиной выхода из функции WaitForSingleObjectEx было завершение обработки APC-запроса – поток при этом должен снова перейти в состояние ожидания события. Если же возвращается значение WAIT_OBJECT_0, то причиной выхода была установка события в сигнальное состояние главным потоком приложения. При этом рабочий поток должен завершиться.

Это очень простая схема (например, рабочие потоки вместо ожидания могут выполнять какую-то другую полезную работу), но она довольно неплохо объясняет механизм использования APC для организации пула.

SetWaitableTimer

Эта функция появилась с версии 4.0. Она позволяет активировать таймер, который через заданный период времени в 100-наносекундных интервалах или при наступлении заданного абсолютного времени переходит в сигнальное состояние. Кроме этого, можно указать процедуру завершения, которая будет вызвана с помощью APC-запроса в данном потоке. Для процедуры завершения можно указать дополнительный параметр. Функция хороша тем, что она не привязана к окнам и циклу выборки сообщений, как, например, SetTimer. С ее помощью можно использовать таймеры в любых приложениях, включая консольные и сервисы. Однако у SetWaitableTimer есть и некоторые недостатки:

функция завершения всегда вызывается в потоке, вызвавшем SetWaitableTimer;

поток должен быть в состоянии тревожного ожидания, чтобы обработать APC-запрос;

второй APC-запрос начнет обрабатываться только после окончания обработки предыдущего запроса, то есть запросы обрабатываются последовательно.

Все эти проблемы решает объект "очередь таймеров", о котором речь пойдет позже.

Порт завершения ввода/вывода

Это, безусловно, один из самых мощных и сложных объектов исполнительной системы. Он специально предназначен для оптимизации обработки клиентских запросов в серверных приложениях. Он не только организует очередь запросов, но и эффективно управляет их обработкой.

Основная идея порта завершения ввода/вывода (в дальнейшем просто порта) состоит в том, чтобы эффективно расходовать процессорное время при обработке клиентских запросов. Обработка должна вестись параллельно несколькими потоками, но строго до определенного момента, когда число потоков будет равняться максимальному значению. После этого новые потоки перестают создаваться, а запросы ставятся в очередь к существующим потокам. Давайте разберемся в этом поподробнее.

Как работает порт

При создании порта указывается максимальное количество активных потоков, способных обрабатывать клиентские запросы параллельно. Так как количество реально работающих параллельно потоков на компьютере равно количеству процессоров, то указание большего максимального количества активных потоков не выгодно. Почему? Дело в том, что для исполнения нескольких потоков на одном процессоре, системе приходится постоянно переключать процессор между потоками, эмулируя, таким образом, параллельность, однако это переключение, называемое переключением контекстов – довольно дорогая операция. Избежать ее можно только одним способом – не создавать параллельно работающие потоки в количестве большем, чем число процессоров. Таким образом, при создании порта, казалось бы, нужно указывать в качестве максимального количества активных потоков число процессоров в системе, но здесь есть одна тонкость. Допустим, у нас однопроцессорный компьютер и, соответственно, клиентские запросы мы обрабатываем в одном потоке. Что будет, если клиентский запрос придет в момент выполнения синхронной операции с диском или в момент ожидания какого-либо объекта этим потоком? Он будет ждать, пока поток не закончит свою работу, но ведь процессор в это время бездействует, потому что поток заблокирован на синхронной операции или на каком-либо объекте. Когда процессор бездействует, а клиентский запрос не обрабатывается – это плохо. Мы приходим к выводу о том, что всегда должен существовать резервный поток, который подхватывал бы запросы в момент, когда «основной» поток выполняет блокирующие операции, и процессор бездействует.

Работа с файлами (в самом широком смысле слова) очень тесно связана с многопоточностью и обработкой запросов на сервере. Сокет или pipe – это тоже файлы. Чтобы обрабатывать запросы через эти каналы параллельно, нужен порт. Давайте рассмотрим функцию создания порта и связи его с файлом (зачем-то разработчики из Microsoft объединили две эти функции в одну; в исполнительной системе эти две функции выполняют сервисы NtCreateIoCompletion и NtSetInformationFile, соответственно).

HANDLE CreateIoCompletionPort ( HANDLE FileHandle, // хендлфайлаHANDLE ExistingCompletionPort, // хендл порта завершения ввода/вывода ULONG_PTR CompletionKey, // ключ завершения DWORD NumberOfConcurrentThreads // максимальное число параллельных потоков);

Для простого создания порта нужно в качестве первого параметра передать INVALID_HANDLE_VALUE, а в качестве второго и третьего – 0. Для связывания файла с портом нужно указать первые три параметра и проигнорировать четвертый.

После того, как файл (под файлом здесь подразумевается объект подсистемы Win32, который реализуется с помощью объекта "файл исполнительной системы", к таковым относятся файлы, сокеты, почтовые ящики, именованные каналы и проч.) связан с портом, окончания всех асинхронных запросов ввода/вывода попадают в очередь порта и могут быть обработаны пулом потоков. Следующие функции могут быть использованы с портом завершения для обработки асинхронных операций ввода/вывода:

ConnectNamedPipe – ожидает подключения клиента к именованному каналу.

DeviceIoControl – низкоуровневый ввод/вывод.

LockFileEx – блокировка региона файла.

ReadDirectoryChangesW – ожидание изменений в директории.

ReadFile – чтение файла.

TransactNamedPipe – Комбинированное чтение и запись по именованному каналу, осуществляемые за одну сетевую операцию.

WaitCommEvent – ожидание события последовательного интерфейса (СОМ-порт).

WriteFile – запись в файл.

Если вы не хотите, чтобы окончание асинхронного ввода/вывода обрабатывалось портом (например, когда вам не важен результат операции), нужно использовать следующий трюк [1]. Нужно установить поле hEvent структуры OVERLAPPED равным описателю события с установленным первым битом. Делается это примерно так:

OVERLAPPED ov = {0};ov.hEvent = CreateEvent(...);ov.hEvent = (HANDLE)((DWORD_PTR)(ov.hEvent) | 1);

И не забывайте сбрасывать младший бит при закрытии хендла события.