Смекни!
smekni.com

MSSQL 2005 (Yukon) – работа с очередями и асинхронная обработка данных (стр. 3 из 7)

Сначала займемся получателем. Для получения сообщения служит команда RECEIVE, которая сильно напоминает обычный SELECT, только вместо имени таблицы указывается имя очереди. К слову, и команда SELECT для очереди работает (поскольку с точки зрения базы данных очередь – это обычная таблица), показывая ее содержимое, но ничего из нее не удаляя. Команда же RECEIVE выбирает данные из очереди, удаляя выбранные сообщения. Однако если очередь пуста, RECEIVE отработает вхолостую и вернет пустой набор данных, а хотелось бы, чтобы кто-то караулил очередь, и RECEIVE бы срабатывала, как только в очереди что-то появится. К счастью, в этом нет ничего сложного, достаточно обернуть RECEIVE в WAITFOR. Итак, в отдельном окне выполняем следующую команду для своевременного получения сообщения:

WAITFOR(RECEIVE cast(message_body as nvarchar(MAX)) FROM [TargetQueue])

После выполнения этой команды подключение замрет в ожидании сообщения из очереди. Теперь самое время заняться отправителем. У него задачка посложнее, надо начать диалог и передать сообщение с идентификатором открытого диалога.

DECLARE @convHandler uniqueidentifier-- началодиалога--BEGIN DIALOG @convHandler FROM SERVICE [SourceService] TO SERVICE 'TargetService' ON CONTRACT [TestContract];-- посылкасообщения--SEND ON CONVERSATION @convHandler MESSAGE TYPE [TestType] (N'Message!!!')-- завершениедиалога--END CONVERSATION @convHandler

Если после отправки сообщения вернуться в окошко, где ожидали его получения, можно увидеть, что сообщение успешно получено.

Стоит заметить, что TargetService при создании диалога взят в кавычки, а SourceService – нет. Дело в том, что TargetService может быть создан на совершенно другом сервере, и просто отсутствовать на сервере, где начинается диалог такого сервиса.

Как можно видеть из примера, в каком именно диалоге отправлять сообщение, определяется некой меткой (handler), которая возвращается при создании диалога, и представляет собой GUID. Если ее в какой-то момент потерять, то завершить диалог можно будет только административными методами, узнав этот GUID из служебных представлений (catalog view). Эта же метка приезжает к получателю вместе с сообщением, и выбрав эту метку из очереди, можно отправить сообщение обратно в том же диалоге.

Асинхронные триггеры

Теперь рассмотрим, как можно использовать коммуникативные возможности Service Broker на сервере. Например, можно использовать его для реализации асинхронных триггеров, причем не только для DML- и DDL-операций, но и для событий, отслеживаемых профайлером (trace events), и если DML-триггеры придется реализовывать отчасти с применением обычных, то для DDL-триггеров и событий профайлера предусмотрен специальный механизм.

Асинхронные DML-триггеры

Начнем с DML, идея которых, в общем-то, должна быть очевидна. Допустим, у нас есть очень большая таблица (Very_Big_Table), для отчетов по которой надо периодически считать некие агрегатные значения. Поскольку таблица очень большая, то агрегаты считаются очень долго. Отчет не всегда должен быть актуальным, но всегда – согласованным, и строиться должен максимально быстро. Это значит, что в идеале агрегаты должны быть посчитаны заранее. Делать пересчет данных в обычном триггере накладно для операций обновления, так как расчет агрегатов происходит долго, как уже было упомянуто. И тут на помощь приходит Service Broker. В обычном триггере на изменение Very_Big_Table создается диалог (строго говоря, мало что мешает создать диалог заранее, разве что проблемы с запоминанием метки при развертывании) и отправляется сообщение, о том что таблица изменилась. Это занимает минимум времени, а изменяющий процесс идет дальше заниматься своими делами. Получатель же начинает не торопясь пересчитывать эти занудные агрегаты, чтобы к моменту, когда понадобится отчет, все уже было готово.

Вот как это может выглядеть. Сначала создадим необходимые тестовые таблички:

CREATE TABLE Very_Big_Table(ID int IDENTITY, Data bigint, [Time] DateTime)GO-- заполнимтаблицуданными--INSERT INTO Very_Big_Table(Data, [Time])SELECT object_id, create_date FROM sys.objectsGO-- табличка для вычисленного агрегата--CREATE TABLE Big_Aggregate(Agg bigint, [Time] DateTime)GO-- Ну и проинициализируем ее--INSERT INTO Big_Aggregate(Agg, [Time])SELECT Sum(Data), GetDate() FROM Very_Big_Table

Теперь триггер на изменение очень большой таблички. Здесь мы сильно мудрствовать не будем, воспользуемся уже готовыми метаданными из предыдущего примера:

CREATE TRIGGER AsyncAggregate ON Very_Big_Table FOR INSERT, UPDATE, DELETEAS DECLARE @convHandler uniqueidentifier BEGIN DIALOG @convHandler FROM SERVICE [SourceService] TO SERVICE 'TargetService' ON CONTRACT [TestContract]; SEND ON CONVERSATION @convHandler MESSAGE TYPE [TestType] (N'The data hase been changed')END CONVERSATION @convHandlerGO

Передавать в сообщении никакой ценной информации нам не надо, так как принимающая сторона должна просто узнать, о том, что таблица поменялась, а признаком этого служит сам факт доставки сообщения. Более того, в данной ситуации нет необходимости даже вызывать команду SEND, так как закрытие диалога (END CONVERSATION) вызывает посылку специального сообщения об этом печальном событии на принимающую сторону. Однако в реальной ситуации может понадобиться передать некоторую информацию, и если ее необходимо структурировать, то придется воспользоваться XML.

Теперь займемся принимающей стороной. Для начала создадим процедуру пересчета агрегата:

CREATE PROCEDURE AggRecalculate AS -- очисткаочереди-- RECEIVE * FROM [TargetQueue] -- небольшая задержка для имитации действительно долгого расчета-- WAITFOR DELAY '00:00:02' UPDATE Big_Aggregate SET Agg = (SELECT SUM(Data) FROM Very_Big_Table), [Time] = GetDate()GO

Процедура готова, но есть одна проблема. Как выполнить эту процедуру при появлении сообщения в очереди? Конечно, можно, как и раньше, обернуть RECEIVE в WAITFOR, но в этом случае кто-то должен запусить процедуру, чтобы она начала ждать сообщений из очереди. И мало того, сообщение-то у нас может быть не одно. Значит, нужно чтобы после получения кто-то активизировал процедуру снова. Другими словами, нужен некий монитор, который следил бы за состоянием очереди и при появлении в ней сообщений вызывал нашу процедуру. К счастью, все уже сделано за нас. Такой монитор имеется в Service Broker, и для его включения достаточно немного изменить параметры очереди, указав, какую процедуру надо вызвать при получении сообщения:

ALTER QUEUE [TargetQueue] WITH ACTIVATION( STATUS = ON, PROCEDURE_NAME = AggRecalculate, MAX_QUEUE_READERS = 1, EXECUTE AS OWNER)

Ключевое слово здесь, конечно же, ACTIVATION, то есть активация. Однако если параметр STATUS у нее выставлен в OFF, она не сработает. Как несложно догадаться, в параметре PROCEDURE_NAME указывается имя процедуры, которая будет вызвана при активации, а в EXECUTE AS – от имени какого пользователя эта процедура будет вызвана. Параметр MAX_QUEUE_READERS определяет максимальное количество процедур, которое одновременно может быть запущено для разгребания очереди. Если во время работы процедуры поступили новые сообщения, то запускается еще один экземпляр этой процедуры, и так до максимального разрешенного количества или опустошения очереди.

Теперь все готово для эксперимента, можно приступать. Сначала обновим нашу «очень_большую_таблицу», и тут же заберем данные из таблички агрегатов, затем подождем чуть-чуть и снова заберем агрегированные данные, чтобы увидеть, как они изменились после работы процедуры перерасчета, автоматически запущенной Service Broker-ом.

UPDATE Very_Big_Table SET Data = Data + 10 WHERE ID=1SELECT * FROM Big_AggregateWAITFOR DELAY '00:00:05'SELECT * FROM Big_Aggregate-- Результат:--Agg Time-------------------- -----------------------76577545551 13:44:37.98776577545561 13:59:24.630

Как можно заметить, все отлично сработало, произошло асинхронное обновление агрегатной таблицы. Вся прелесть в том, что агрегатная таблица может находиться на совершенно другом сервере в другой стране, надо только настроить подключение соответствующим образом, что совсем не сложно.

Асинхронные DDL и SQL-Trace триггеры (Event Notification)

Для реализации асинхронных триггеров на DDL-операции и события профайлера существует специальный механизм, Event Notification (извещение о событии).

ПРИМЕЧАНИЕНадо учитывать, что в связи с асинхронностью данного механизма породившие это извещение изменения в базе или на сервере, не отменятся в случае отката извещения, как это было бы в DDL-триггере. Они – уже свершившийся факт. И еще один нюанс: поскольку события профайлера работают вне транзакций, то даже если изменение на сервере, вызвавшее посылку сообщения, не увенчается успехом, то само сообщение все равно будет доставлено до получателя, однако для DDL-событий это не работает, так как DDL-операции работают в рамках транзакции и в случае отмены DDL транзакции сообщение отправлено не будет.

Как не сложно догадаться, этот механизм отслеживает события, на которые есть подписчики, и посылает соответствующее сообщение. Для того чтобы механизм сообщений заработал, достаточно создать очередь и сервис получателя с предопределенным контрактом [http://schemas.microsoft.com/SQL/Notifications/PostEventNotification], все остальное - и контракт, и диалог, и сервис с очередью отправителя, уже реализовано. Затем надо создать объект EventNotification, связывающий нужное событие с сервисом – и готово. На практике, допустим, для асинхронного аудита подключений к серверу и отключений от оного, это может выглядеть следующим образом:

-- сначала создадим очередь получателя, при желании -- здесь можно назначить процедуру обработки новых сообщений--CREATE QUEUE [LoginQueue]GO-- затем необходимо создать сервис со специальным контрактом,-- в котором уже есть необходимые типы сообщений--CREATE SERVICE [LoginService] ON QUEUE [LoginQueue]( [http://schemas.microsoft.com/SQL/Notifications/PostEventNotification])GO-- Ну а теперь можно создать и сам Event Notification, связывающий-- серверные события с сервисом доставки сообщения--CREATE Event Notification auditLogin ON SERVER FOR Audit_Login, Audit_Logout TO SERVICE 'LoginService', 'current database'

Здесь ‘current database’ – это константа, которая говорит о том, что в качестве механизма доставки будет использоваться экземпляр Service Broker-а, установленный в текущей базе. Указание этого экземпляра является необходимым параметром при создании уведомления.