Статьи‎ > ‎Фильтры‎ > ‎

Потоковая модель фильтров

Нити и критические секции

Рассмотрим важнейший вопрос о нитях и критических секциях в фильтрах DirectShow при их использовании в различных аспектах и шаги, которые нужно предпринять для избежания краха и блокировок при разработкке собственных фильтров.

Примеры этго раздела используют псевдокод для иллюстрации того, что нужно будет реализовать. Предполагается, что пользовательские фильтры используют классы, происходящие от следующих базовых классов DirectShow:

CMyInputPin: Происходит от CBaseInputPin - абстрактного класса для реализации входящих контактов:

CMyOutputPin: Происходит от класса CBaseOutputPin - абстрактного класса для реализации исходящих контактов:


CMyFilter: Происходит от класса CBaseFilter - абстрактного класса для реализации фильтров:

CMyInputAllocator: Распределитель памяти входящего контакта, происходит от класса CMemAllocator - он поддерживает интерфейс IMemAllocator и происходит от класса CBaseAllocator:


Не для каждого фильтра нужен пользовательский распределитель. Для многих фильтров достаточно класса CMemAllocator.

Потоковость и нити приложения

Любое приложение DirectShow содержит как минимум две важнейших нити: нить приложения и одну или больше потоковых нитей. Сэмплы перемещаются на потоковых нитях, а изменения состояний происходят на нити приложения. Главная потоковая нить создается фильтром источника или анализа. Другие фильтры могут создавать рабочие потоки, перемещающие сэмплы, и они учитываются потоковыми нитями. Некоторые методы вызываются нитью приложения, тогда как другие - потоковыми нитями. Например:

  • Потоковая нить: IMemInputPin::Receive, IMemInputPin::ReceiveMultiple, IPin::EndOfStream, IMemAllocator::GetBuffer.
  • Нить приложения: IMediaFilter::Pause, IMediaFilter::Run, IMediaFilter::Stop, IMediaSeeking::SetPosition, IPin::BeginFlush, IPin::EndFlush.
  • И та, и другая: IPin::NewSegment.

Обладание разделенными потоковыми нитями позволяет данным перемещаться через граф в то время, пока нить приложения ждет пользовательского ввода. Опасность многопоточности состоит, однако, в том, что фильтр может создать ресурс, находясь в приостановленном состоянии (на нити приложения), использовать его внутри потокового метода и освободить его при остановке (тоже на нити приложения). Если быть невнимательным, то потоковая нить может попытаться использовать ресурс после его освобождения. Решением есть использование критических секций для защиты ресурса и синхронизация потоковых методов с изменением состояний.

Фильтру необходима одна критическая секция для защиты состояния фильтра. Класс CBaseFilter имеет переменную-член для критической секции - CBaseFilter::m_pLock. Эта критическая секция называется замком фильтра (filter lock). Каждому входящему контакту также нужна критическая секция для защиты ресурсов, используемых потокововой нитью. Эти критические секции называются потоковыми замками (streaming lock); их нужно объявить в производных классах контактов. Проще использовать класс CCritSec, являющийся оберточным для объекта Windows CRITICAL_SECTION и может быть замкнут (locked) использованием класса CAutoLock. Класс CCritSec дает возможность также использовать некоторые функции отладки.

Когда фильтр останавливается или сбрасывается, он должен синхронизировать нить приложения с потоковой нитью. Для предотвращения взаимоблокировки сначала нужно разблокировать потоковую нить, которая может быть блокирована по следующим причинам:

  • Она ожидает получения сэмпла внутри метода IMemAllocator::GetBuffer, т.к. используются все сэмплы распределителя памяти.
  • Производится ожидание другого фильтра на предмет возвращения из потокового метода, такого как Receive.
  • Производится ожидание внутри из собственных потоковых методов некоторого ресурса.
  • Фильтр-рендерер ожидает времени демонстрации некоторого сэмпла.
  • Фильтр-рендерер ожидает внутри метода Receive при приостановке.

Т.о., когда фильтр останавливается или сбрасывается, он может делать следующее:

  • Освободить все удерживаемые (независомо от причины) сэмплы. Тем самым разблокируются вызовы метода GetBuffer.
  • Вернуться изо всех потоковых методов так быстро, как это возможно. Если потоковый метод ожидает каких бы то ни было ресурсов, нужно немедленно прекратить это ожидание.
  • Начать отказ от сэмплов в методе Receive, так что потоковая нить прекращает иметь доступ к любым ресурсам. (Класс CBaseInputPin поддерживает это поведение автоматически).
  • Метод Stop должен отменить (decommit) все распределители фильтра. (CBaseInputPon поддерживает такое поведение автоматически).

И сбрасывание, и остановка происходят на нити приложения. Фильтр останавливается в ответ на вызов метода IMediaControl::Stop. Менеджер графа фильтров выдает команду остановки в порядке снизу вверх, начиная от рендерера и возвращаясь назад фильтру источника. Команда остановки происходит полностью внутри метода фильтра CBaseFilter::Stop. Когда метод возвращает управление, фильтр должен находиться в остановленном состоянии.

Сбрасывание происходит, обычно, при поиске. Команда сброса стартует с фильтра источника или анализа и идет вниз по потоку. Сброс происходит в два этапа: метод IPin::BeginFlush сообщает фильтру о необходимости отмены всех стящих в очереди и входящих данных; метод IPin::EndFlush сообщает фильтру о разрешении снова принимать данные. Сброс требует двух этапов, т.к. BeginFlush вызывается на нити приложения, а потоковая нить тем временем продолжает пересылать данные. Т.о., некоторые сэмплы могут придти после вызова BeginFlush. Фильтр их должен сбросить. Все сэмплы, что пришли после вызова EndFlush гарантированно новые, и они будут переданы.

Последующие разделы будут содержать примеры, показывающие, как реализовывать наиболее важные методы фильтров, такие, как Pause, Receive и т.д. так, чтобы избежать блокировок. Каждый фильтр имеет разные требования, но, однако, нужно будет адаптировать эти примеры к своим фильтрам.

Примечание. Базовые классы CTransformFilter и CTransInPlaceFilter реализовывают многое, о чем было сказано в этом разделе. Если вы пишете трансформационный фильтр и ваш фильтр не ожидает событий внутри потокового метода или удерживается на сэмплах вне Receive, то эти базовые классы будут подходящими.

Приостановка

Все изменения состояний фильтров долдны поддерживать замыкание фильтра (filter lock). В методе Pause можно создавать произвольные ресурсы, необходимые фильтру:

HRESULT CMyFilter::Pause() 
{
CAutoLock lock_it(m_pLock);
/* Create filter resources. */
return CBaseFilter::Pause();
}

Метод CBaseFilter::Pause устанавливает корректное состояние на фильтре (State_Paused) и вызывает метод CBasePin::Active на каждом соединенном контакте фильтра. Метод Active информирует контакт, что фильтр стал активным (?). Если контакт создает ресурсы, нужно переписать метод Active следующим образом:

HRESULT CMyInputPin::Active()
{
// You do not need to hold the filter lock here. It is already held in Pause./* Create pin resources. */return CBaseInputPin::Active()
}

Передача и получение сэмплов

Следующий псевдокод показывает как реализовать метод IMemInput::Receive:

HRESULT CMyInputPin::Receive(IMediaSample *pSample)
{
CAutoLock cObjectLock(&m_csReceive);

// Perhaps the filter needs to wait on something.
WaitForSingleObject(m_hSomeEventThatReceiveNeedsToWaitOn, INFINITE);

// Before using resources, make sure it is safe to proceed. Do not// continue if the base-class method returns anything besides S_OK.
hr = CBaseInputPin::Receive(pSample);
if (hr != S_OK)
{
return hr;
}
/* It is safe to use resources allocated in Active and Pause. */// Deliver sample(s), via your output pin(s).for (each output pin)
pOutputPin->Deliver(pSample);

return hr;
}

Метод Receive блокирует поток, но не фильтр. Фильтру может быть необходимо дождаться некоторого события перед началом обработки данных - это показано здесь вызовом WaitForSingleObject. Это нужно не всегда. Метод CBaseInputPin::Receive проверяет некоторые общие потоковые условия. Он возвращает VFW_E_WRONG_STATE, если фильтр остановлен, S_FALSE, если фильтр сброшен (flushing) и т.д. Любой код возврата, отличающийся от S_OK свидетельствует о том, что метод Receive должен немедленно вернуть управление и не обрабатывать данных.

После обработки сэмпла он отправляется нижележащему фильтру посредством вызова CBaseOutputPin::Deliver. Этот вспомогательный метод вызывает IMemInputPin::Receive на нижележащем входящем контакте. Фильтр может передать сэмпл нескольким контактам.

Получение конца потока

Когда входящий контакт принимает сообщение об окончании потока, он распространяет этот вызов вниз по потоку. Все нижележащие фильтры, принимающие данные от этого входящего контакта, должны также получить сообщение об окончании потока. Опять-таки, нужно использовать потоковую, а ни фильтровую блокировку. Если у фильтра есть отложенные данные, которые он еще не передал, фильтр должен передать их теперь, перед отправкой сообщения об окончании потока. После окончания потока нельзя посылать каких бы то ни было данных.

HRESULT CMyInputPin::EndOfStream()
{
CAutoLock lock_it(&m_csReceive);

/* If the pin has not delivered all of the data in the stream
(based on what it received previously), do so now. */// Propagate EndOfStream call downstream, via your output pin(s).for (each output pin)
{
hr = pOutputPin->DeliverEndOfStream();
}
return S_OK;
}

Метод CBaseOutputPin::DeliverEndOfStream вызывает метод IPin::EndOfStream на нижележащем входящем контакте.

Сброс данных

Следующий псевдокод показывает, как можно реализовать метод IPin::BeginFlush:

HRESULT CMyInputPin::BeginFlush()
{
CAutoLock lock_it(m_pLock);

// Будем уверены, что метод Receive будет возвращать ошибку.
HRESULT hr = CBaseInputPin::BeginFlush();

// Заставим нижележащие фильтры освободить сэмплы. Если наш метод Receive // блокирован в GetBuffer или Deliver, деблокируем его.for (each output pin)
{
hr = pOutputPin->DeliverBeginFlush();
}

// Разблокируем наш метод Receive если он ожидает события.
SetEvent(m_hSomeEventThatReceiveNeedsToWaitOn);

// Здесь метод Receive не будет блокирован. Блокируем напоследок поток.// (Это не необходимо, если это последний шаг.)
{
CAutoLock lock_2(&m_csReceive);

/* Теперь будет сохранено все, что может привести к краху, если будет
выполнен метод Receive (???). */
}
return hr;
}

Когда начинается сброс, метод BeginFlush блокирует фильтр, который сериализирует изменение состояния (???). Это еще не безопасно для потоковой блокировки, т.к. сброс происходит на нити приложения и потоковая нить может находиться внутри вызова метода Receive. Контакту необходимо гарантировать, что Receive не будет заблокирован и что любые последующие вызовы Receive будут возвращать ошибку. Метод CBaseInputPin::BeginFlush устанавливает внутренний флаг - CBaseInputPin::m_bFlushing. Когда этот флаг равен TRUE - метод Receive возвращает ошибку.

Передавая вызов BeginFlush вниз по потоку, контакт гарантирует, что нижележащие фильтры освобождают свои сэмплы и возвращаются из вызовов Receive. Этот возврат гарантирует, что все входящие контакты не будут блокированы вызовами GetBuffer или Receive. Если метод Receive вашего контакта ожидает некоего события (для получения ресурса, например), метод BeginFlush должен сбросить это ожидание насильной установкой события. И тогда метод Receive гарантированно вернет управление, и флаг m_bFlushing упредит новые вызовы Receive.

Для некоторых фильтров BeginFlush - это все, что нужно. Метод EndFlush сообщает фильтру, что он снова может начать прием сэмплов. Другим фильтрам может понадобиться использование переменных или ресурсов в BeginFlush, которые используеются и в Receive. В этом случае фильтр должен сначала заблокировать поток. Будьте уверены, что это не так перед выполнением любых предыдущих шагов, т.к. это может привести к блокировке.

Метод EndFlush блокирует фильтр и распространяет вызов вниз по течению:

HRESULT CMyInputPin::EndFlush()
{
CAutoLock lock_it(m_pLock);
for (each output pin)
hr = pOutputPin->DeliverEndFlush();
return CBaseInputPin::EndFlush();
}

Метод CBaseInputPin::EndFlush сбрасывает флаг m_bFlushing в FALSE, который позволяет методу Receive снова начать получение данных. Это должно быть последним шагом в EndFlush, т.к. не должен получать никаких сэмплов до тех пор, пока сброс не будет завершен и все нижележащие фильтры не будут уведомлены.

Остановка

Метод Stop должен разблокировать метод Receive и отменить (decommit) распределители фильтра. Отмена распределителя форсирует вызовы из всех стоящих в очереди вызовов GetBuffer, которые разблокируют вышележащие фильтры, что ожидают сэмплы. Метод Stop накладывает блокировку на фильтр и затем вызывает метод CBaseFilter::Stop, вызывающий метод CBasePin::Inactive на всех контактах фильтра:

HRESULT CMyFilter::Stop()
{
CAutoLock lock_it(m_pLock);
// Inactivate all the pins, to protect the filter resources.
hr = CBaseFilter::Stop();

/* Safe to destroy filter resources used by the streaming thread. */return hr;
}

Метод входящих контактов Inactive нужно переписать следующим образом:

HRESULT CMyInputPin::Inactive()
{
// You do not need to hold the filter lock here. // It is already locked in Stop.// Unblock Receive.
SetEvent(m_hSomeEventThatReceiveNeedsToWaitOn);

// Make sure Receive will fail. // This also decommits the allocator.
HRESULT hr = CBaseInputPin::Inactive();

// Make sure Receive has completed, and is not using resources.
{
CAutoLock c(&m_csReceive);

/* It is now safe to destroy filter resources used by the
streaming thread. */
}
return hr;
}

Получение буферов

Если ваш фильтр имеет пользовательский распределитель, использующий ресурсы фильтра, метод GetBuffer должен блокировать поток, как с другими потоковыми методами:

HRESULT CMyInputAllocator::GetBuffer(
IMediaSample **ppBuffer,
REFERENCE_TIME *pStartTime,
REFERENCE_TIME *pEndTime,
DWORD dwFlags)
{
CAutoLock cObjectLock(&m_csReceive);

/* Use resources. */return CMemAllocator::GetBuffer(ppBuffer, pStartTime, pEndTime, dwFlags);
}

Потоковые нити и менеджер графа фильтров

Когда менеджер графа фильтров останавливает граф, он ждет завершения всех потоковых нитей. Для фильтров это влечет следующее:

  • Фильтр никогда не должен вызывать методы менеджера графа фильтров из потоковой нити.

Менеджер графа фильтров использует критические секции для синхронизации своих собственных операций. Если потоковая нить пытается удержать критическую секцию, это может вызвать взаимоблокировку. Предположим, например, что другой поток останавливает граф. Этот поток блокирует граф фильтра и ждет, пока ваш фильтр пректатит принимать данные. Если ваш фильтр будет ожидать блокировки, он никогда не остановится, что приведет к дедлоку.

  • Фильтр никогда не должен вызывать AddRef и QueryInterface менеджера графа фильтра из потоковой нити.

Если фильтр удерживает счетчик ссылок менеджера графа фильтров (через AddRef или QueryInterface), он может быть последним объектом, удерживающий счетчик ссылок. Когда фильтр вызывает Release, менеджер графа фильтров уничтожает его. Внутри этого процесса менеджер графа фильтров пыается остановить граф, что ведет к ожиданию завершения потковой нити. Однако, это ожидание происходит внутри потоковой нити, так что потоковая нить не может завершиться. Результатом будет дедлок. (НЕ ПОНЯЛ ИЛИ СОВСЕМ НЕ ПОНЯЛ ОБА ПУНКТА)

Резюме

Потоковой нитью вызываются следующие методы:

  • IMemInputPin::Receive
  • IMemInputPin::ReceiveMultiple
  • IPin::EndOfStream
  • IPin::NewSegment
  • IMemAllocator::GetBuffer

Следующие методы вызываются нитью приложения:

  • Изменения состояний: IBaseFilter::JoinFilterGraph, IMediaFilter::Pause, IMediaFilterLLRun, IMediaFilter::Stop, IQualityControl::SetSink.
  • Ссылочные часы: IMediaFilter::GetSyncSource, IMediaFilter::SetSyncSource.
  • Операции с контактами: IBaseFilter::FindPin, IPin::Connect, IPin::ConnectedTo, IPin::ConnectionMediaType, IPin::Disconnect, IPin::ReceiveConnection.
  • Функции распределителей: IMemInputPin::GetAllocator, IMemInputPin::NotifyAllocator.
  • Сбрасывание: IPin::BeginFlush, IPin::EndFlush.

Это не исчерпывающий список. Когда вы пишете фильтр, вы должны рассмотреть какие методы изменяют состояния фильтров и какие методы исполняются на потоковых операциях.

Comments