Помощь - Поиск - Пользователи - Календарь
Полная версия этой страницы: Сложная синхронизация
Форум на CrossPlatform.RU > Библиотеки > Qt > Qt Ввод/Вывод, Сеть. Межпроцессное взаимодействие
mezmay
Работаю с Jack audio connection kit API, нужна обработка звука в "реальном" времени. Сама обработка делается в callback-функции, вызываемой из другого потока. Но в описании сказано, что нельзя применять длительные операции, в т.ч. мьютексы. Но как тогда работать с общими с основным потоком данными? Какие варианты?

Цитата
int jack_set_process_callback ( jack_client_t * client,
JackProcessCallback process_callback,
void * arg
)

Tell the Jack server to call process_callback whenever there is work be done, passing arg as the second argument.

The code in the supplied function must be suitable for real-time execution. That means that it cannot call functions that might block for a long time. This includes all I/O functions (disk, TTY, network), malloc, free, printf, pthread_mutex_lock, sleep, wait, poll, select, pthread_join, pthread_cond_wait, etc, etc.


Итак, у меня два потока - GUI и поток обработки звука. Как ясно из первого сообщения, в потоке обработки звука нельзя пользоваться мьютексами и т.д. И есть как минимум пять переменных :

int shift_left;
float volume_left;
float volume_right
Filter *filter_left;
Filter *filter_right;

shift_left - текущая задержка левого канала,
volume_left, volume_right - громкость каналов.

GUI поток пишет значения этих переменных, поток обработки звука - читает. Как синхронизировать? Особенно интересуют объекты типа Filter...
FireBlack
Цитата(mezmay @ 27.12.2014, 6:40) *
GUI поток пишет значения этих переменных, поток обработки звука - читает. Как синхронизировать? Особенно интересуют объекты типа Filter...

А если просто отправлять эти значения из GUI потока с помощью сигнала, а в потоке обработке звука ловить их слотом?! Только сконнектить их с помощью Queued Connection. Однако, учтите, что в потоке обработке звука должен быть свой QEventLoop.
mezmay
Я не управляю этим потоком, только callback задаю
mezmay
Хотя можно получить posix thread id, а значит наверное и QThread создать...
lanz
Используйте например
http://qt-project.org/doc/qt-4.8/qatomicint.html
http://qt-project.org/doc/qt-4.8/qatomicpointer.html

Это если вы создаете объект типа фильтр в коллбэке и подменяете.
Если же вы просто используете разделяемый объект, то все сложнее, но направление то-же, используйте атомарные операции, они дешевле мьютексов.
Iron Bug
если я правильно понимаю идею, то это вообще изначально неверно организована работа с риалтаймовскими данными. обычно в callback'е данные только складываются в некий буфер. а потом уже другой поток их читает и что-то с ними делает. в этом другом потоке делай хоть что. а в callback'е никаких ожиданий быть не может. обычно делается кольцевой буфер, туда пишутся данные и атомарно меняются указатели на конец данных. а читающий поток атомарно меняет указатель на начало данных.
mezmay
Цитата(Iron Bug @ 27.12.2014, 23:08) *
если я правильно понимаю идею, то это вообще изначально неверно организована работа с риалтаймовскими данными. обычно в callback'е данные только складываются в некий буфер. а потом уже другой поток их читает и что-то с ними делает. в этом другом потоке делай хоть что. а в callback'е никаких ожиданий быть не может. обычно делается кольцевой буфер, туда пишутся данные и атомарно меняются указатели на конец данных. а читающий поток атомарно меняет указатель на начало данных.


Но в родном примере:
/**
* The process callback for this JACK application.
* It is called by JACK at the appropriate times.
*/
int process (jack_nframes_t nframes, void *arg)
{
    jack_default_audio_sample_t *out = (jack_default_audio_sample_t *) jack_port_get_buffer (output_port, nframes);
    jack_default_audio_sample_t *in = (jack_default_audio_sample_t *) jack_port_get_buffer (input_port, nframes);

    memcpy (out, in, sizeof (jack_default_audio_sample_t) * nframes);
  
    return 0;      
}


Цитата
int jack_set_process_callback ( jack_client_t * сlient, 
    JackProcessCallback    process_callback,
    void * arg )


Tell the Jack server to call process_callback whenever there is work be done, passing arg as the second argument.

The code in the supplied function must be suitable for real-time execution. That means that it cannot call functions that might block for a long time. This includes all I/O functions (disk, TTY, network), malloc, free, printf, pthread_mutex_lock, sleep, wait, poll, select, pthread_join, pthread_cond_wait, etc, etc.


Как тут можно использовать дополнительный буфер, ведь данные принимаются и отправляются в одном месте?
mezmay
Вот полистал API, обратил внимание на функцию в разделе Creating and managing client threads:

Цитата
int jack_client_create_thread    (    jack_client_t *     client,
jack_native_thread_t *     thread,
int     priority,
int     realtime,
void *(*)(void *)     start_routine,
void *     arg
)

Create a thread for JACK or one of its clients. The thread is created executing start_routine with arg as its sole argument.

Parameters:
client the JACK client for whom the thread is being created. May be NULL if the client is being created within the JACK server.
thread place to return POSIX thread ID.
priority thread priority, if realtime.
realtime true for the thread to use realtime scheduling. On some systems that may require special privileges.
start_routine function the thread calls when it starts.
arg parameter passed to the start_routine.
Returns:
0, if successful; otherwise some error number.


но пока не придумал как использовать и надо ли.
mezmay
Не знаю как это реализовать, но правильно работать было бы так:
у нас два потока A - GUI поток и B - поток обработки и ввода/вывода звука.
Все переменные, используемые в обработке звука, должны "жить" в потоке B (в котором должен крутиться QEventLoop). Если их (данные) надо изменить из A (GUI), то делаем это через Queued Connection...
lanz
Все верно, в потоке B устанавливаете нужный коллбэк, потом запускаете event loop. В этом коллбэке делаете всю обработку и выводите данные.
Коллбэк через void*args связан с объектом, который живет в потоке B - так получаете все нужные параметры для обработки.
Поток А через Queued connection выставляет параметры на объекк в B, Qt делает всю синхронизацию.
mezmay
Вопрос в том как сделать этот поток B. Jack работает и вызывает коллбэки в своем потоке, но как мне зауправлять этим потоком?
lanz
Не надо им управлять, вам нужно просто передавать в коллбэк правильные данные
Вроде
struct data {
QAtomicInt guard;
int shift_left;
float volume_left;
float volume_right;
Filter *filter_left;
Filter *filter_right;
}

Цитата
int jack_client_create_thread ( jack_client_t * client,
jack_native_thread_t * thread,
int priority,
int realtime,
void *(*)(void *) start_routine,
void * arg <- Сюда передайте указатель на структуру данных с QAtomicInt например для синхронизации


В коллбэке arg примет то же значение что вы передали, если я все правильно понял(обычно так работает)
В начале коллбэка лочите структуру(например используя testAndSet на QAtomicInt) или ждете пока она освободится, затем читаете данные, разлочиваете.

В GUI треде лочите структуру или ждете пока она освободится, записываете данные, разлочиваете.
Для отладки попробуйте сначала то же самое с мутексами, если будет тормозить, переходите на atomic. Можно будет поиграть и с разными барьерами, чтобы улучшить быстродействие.

Пока структура залочена, вы просто ничего с ней не делаете.
mezmay
Цитата
лочите структуру(например используя testAndSet на QAtomicInt) или ждете пока она освободится

Это можете подробнее объяснить?
lanz
Ну во первых, можно использовать обычный QMutex, я не думаю что он будет сильно хуже самопального.
Во вторых, заводите два значения например(это упрощенный мутекс)
int LOCKED = 1;
int UNLOCKED = 0;

Потом лочите структуру
while (!str->guard.testAndSetOrtdered(UNLOCKED, LOCKED));
//Здесь можно пользоватся
str->guard.fetchAndStoreOrdered(UNLOCKED);
// Здесь структура разлочена

В третьих можно использовать две одинаковых структуры и один QAtomicPointer, суть такая - меняем структуру на которую не указывает указатель в данный момент, и с тех пор больше не трогаем структуру, это в принципе упрощенный вариант того, что предложила Iron Bug, т.е. кольцевой буфер из двух элементов, возможно потребуется завести два буфера - один для входящих данных, один для исходящих.
mezmay
guard.fetchAndStoreOrdered(UNLOCKED);

Я правильно понимаю что эта функция просто присваивает значение (ну и возвращает которое было)? Она в любом случает сделает присваивание за 1 раз, цикл не нужен?
lanz
Да, она просто перезаписывает.
Iron Bug
Цитата(lanz @ 11.1.2015, 15:52) *
это упрощенный мутекс

это не мьютекс, это спинлок (spinlock). там используется специальная ассемблерная синхронизация (fence), которая гарантирует, что весь код, описанный выше "забора" будет выполнен до его вызова - отсюда наименование "ordered" в имени функции, это идёт из типов ассемблерной синхронизации работы конвейера по выборке и распараллеливании выполнения инструкций. и эта синхронизация касается, в том числе, внутренних очередей выборки команд у разных ядер процессора.
спинлок отличается от мьютекса тем, что при залоченном ресурсе поток будет долбиться в него, пока не получит доступ, не уступая процессорное время другим потокам. спинлок шустрее мьютекса (за счёт отсутствия загрузки контекстов спящих потоков при переключении), но больше грузит проц. обрати внимание, что на одноядерном проце или на машине с одним простым процессором спинлок сожрёт всё процессорное время и, скорее всего, завесит машину. так что его использовать можно, но если ты уверен, что у тебя не возникнет проблем с залочиванием потоков.

да, про циклические буферы я говорила не в смысле синхронизации, а в смысле передачи данных между потоками. синхронизация доступа потоков к очередям или буферам может быть какая угодно. можно по-разному реализовать обмен, но чем шустрее будут работать связанные с работой железа callback'и - тем правильнее. железяка (или неуправляемый входящий поток) не может ожидать и получаемые данные всегда имеют приоритет. отдавать их можно и с задержкой, а принимать надо по мере поступления.
mezmay
Цитата(Iron Bug @ 13.1.2015, 9:46) *
это спинлок (spinlock)
Цитата(Iron Bug @ 13.1.2015, 9:46) *
синхронизация доступа потоков к очередям или буферам может быть какая угодно
какие еще есть варианты (не через мьютексы и семафоры)?

Цитата(Iron Bug @ 13.1.2015, 9:46) *
...не уступая процессорное время другим потокам
разве потокам не поочередно предоставляются отрезки времени?
lanz
Цитата(Iron Bug @ 13.1.2015, 9:46) *
это не мьютекс, это спинлок (spinlock)

Технически, мьютекс это семафор с одним ресурсом, так что тут нет противоречий. Спинлок это деталь реализации именно операции захвата семафора. Но это все непринципиально :lol:

Цитата
спинлок отличается от мьютекса тем, что при залоченном ресурсе поток будет долбиться в него

Технически :lol: , можно реализовать мьютекс так, чтобы он сначала пытался захватится спинлоком и только потом уходил в переключение.

Цитата(Iron Bug @ 13.1.2015, 9:46) *
обрати внимание, что на одноядерном проце или на машине с одним простым процессором спинлок сожрёт всё процессорное время и, скорее всего, завесит машину

Если нет ОС, которая переключает потоки, тогда да. Но если ОС не переключает потоки, зачем синхронизация? Про завесит тоже не понял.

Цитата
какие еще есть варианты (не через мьютексы и семафоры)?

Мьютекс и семафор это абстрактные понятия, реализации их могут быть разные(см. выше), но я знаю только через объекты ОС и через атомарные операции/volatile переменные.

Цитата
разве потокам не поочередно предоставляются отрезки времени?

Да, но можно отказаться от своего отрезка и засуспендить поток до наступления определенного события, ОС разбудит. Обычно этот способ предпочтительный, потому что не использует процессорное время для ожидания освобождения ресурса. Но соответственно он и медленнее.
Iron Bug
Цитата(lanz @ 13.1.2015, 22:16) *
Технически, мьютекс это семафор с одним ресурсом, так что тут нет противоречий. Спинлок это деталь реализации именно операции захвата семафора. Но это все непринципиально

это как раз принципиально. потому что базируется на разной реализации и используется для разных целей.
упрощённо разницу я пояснила. если углубляться, нужно ковырять до ассемблерных вызовов и принципов работы очередей команд и синхронизации обращений к памяти. просто из имени этой функции очевидно, что она базируется на ассемблерной синхронизации. иначе такое странное название не придумать: оно просто отражает смысл конкретного вида синхронизации. в последних версиях GCC все виды синхронизации стали функциями стандартной библиотеки, до этого всё делалось вручную, на ассемблере. и ОС тут ни при чём, это архитектура процессора.
если хочешь понять разницу - читай про реализацию мьютексов и спинлоков. можно, например, здесь:
http://stackoverflow.com/questions/5869825...nstead-of-mutex
там не подробно, но понятно и доступно написано в первом ответе.
lanz
Цитата
упрощённо разницу я пояснила. если углубляться, нужно ковырять до ассемблерных вызовов и принципов работы очередей команд и синхронизации обращений к памяти

Мы о разных вещах говорим, то что тред использующий мьютекс засыпает, возможно только при наличии ОС. К самой идее мьютекса это не относится.
Например QMutex использует события под виндой и pthread_mutex_t под лин. Но при этом и тот и тот вроде как мьютекс.

Это я к тому, что вопрос в том как не использовать мьютексы неверен. Правильно спросить - как реализовать синхронизацию, не используя системный-объект-мьютекс или другие системные объекты.
На этот счет есть например такие ссылки
http://en.wikipedia.org/wiki/Dekker%27s_algorithm
http://en.wikipedia.org/wiki/Peterson%27s_algorithm
http://en.wikipedia.org/wiki/Lamport%27s_bakery_algorithm
http://en.wikipedia.org/wiki/Szymanski%27s_Algorithm
Все они так или иначе полагаются на атомарные операции, которые реализаванны через семейство QAtomic, который позволяет использовать всевозможные барьеры кроссплатформенно.
Про барьеры можно почитать тут:
http://www.rdrop.com/users/paulmck/scalabi...2010.07.23a.pdf
lanz
Наврал немного, QMutex под лин использует
https://ru.wikipedia.org/wiki/Фьютекс
Опять наврал.
Век живи, век учись :lol:
Iron Bug
я не о мьютесках писала, а о спинлоках. в них тред не засыпает. и это надо понимать, чтобы не наступить на суровые грабли с дедлоками.
lanz
Цитата
я не о мьютесках писала, а о спинлоках. в них тред не засыпает. и это надо понимать, чтобы не наступить на суровые грабли с дедлоками.

Какие суровые грабли?
Разницы между мьютексами и спинлоками относительно дедлоков нет никакой. То что тред засыпает никак не спасет от дедлоков, если они возможны.
А если мьютекс гибридный? :lol:

Вот дедлок с мьютексами:
mutex A;
mutex B;

A:
{
  lock A;
  wait 3000;
  lock B;
  unlock B;
  unlock A;
}

B:
{
  lock B;
  wait 1000;
  lock A;
  unlock A;
  unlock B;
}

Нет разницы, если бы я заменил слово mutex на spinlock или semaphore.

В продолжение темы про барьеры, относительно конкретно Qt
http://blog.qt.digia.com/blog/2009/10/06/m...ring-semantics/
Iron Bug
я не буду спорить. возомжно, когда-нибудь позже ты поймёшь разницу. я просто предупредила, что спинлоки могут создавать блокирование системы на некоторых архитектурах.
lanz
Цитата
я просто предупредила, что спинлоки могут создавать блокирование системы на некоторых архитектурах

Каким образом?
Iron Bug
Цитата(lanz @ 17.1.2015, 3:49) *
Цитата
я просто предупредила, что спинлоки могут создавать блокирование системы на некоторых архитектурах

Каким образом?

Цитата(Iron Bug @ 13.1.2015, 11:46) *
спинлок отличается от мьютекса тем, что при залоченном ресурсе поток будет долбиться в него, пока не получит доступ, не уступая процессорное время другим потокам. спинлок шустрее мьютекса (за счёт отсутствия загрузки контекстов спящих потоков при переключении), но больше грузит проц. обрати внимание, что на одноядерном проце или на машине с одним простым процессором спинлок сожрёт всё процессорное время и, скорее всего, завесит машину. так что его использовать можно, но если ты уверен, что у тебя не возникнет проблем с залочиванием потоков.
lanz
Цитата
обрати внимание, что на одноядерном проце или на машине с одним простым процессором спинлок сожрёт всё процессорное время и, скорее всего, завесит машину

Не совсем так

Возможно два случая
- когда ОС может переключить тред, который пытается захватить спинлок - тут нет никаких проблем, когда понадобится, тред переключится, спинлок освободится
- ОС не может переключить тред - если ОС не может преключить тред, тогда любой вечный цикл, спинлок или нет, завешивает всю систему, с этим как бы никто и не спорит

http://www.makelinux.net/ldd3/chp-5-sect-5
Цитата
If a nonpreemptive uniprocessor system ever went into a spin on a lock, it would spin forever; no other thread would ever be able to obtain the CPU to release the lock.

Цитата
For this reason, spinlock operations on uniprocessor systems without preemption enabled are optimized to do nothing, with the exception of the ones that change the IRQ masking status.

(барабанная дробь)
Цитата
The kernel preemption case is handled by the spinlock code itself. Any time kernel code holds a spinlock, preemption is disabled on the relevant processor.

То есть если я пишу в режиме ядра под Linux и использую spinlock_t, то процессор зависнет, если я попытаюсь захватить один и тот же спинлок два раза на одном процессоре.
Вот это интересно, и про это могли бы и рассказать, раз уж обмолвились :lol:
Iron Bug
в десятый раз: архитектура проца не зависит от оси. спинлок на синхронизации доступа к памяти - это ассемблерный код. линюкс, венда, что угодно одинаково используют fence. и он совершенно одинаково лочит обращения. и неважно, какой там уровень и какая система.
lanz
fence ничего не блокирует, он просто запрещает перестановку различных операций на процессоре.

Вот например код mutex_lock в ядре использует спинлок:
http://www.cs.fsu.edu/~baker/devices/lxr/h...el/mutex.c#L207
Iron Bug
слушай, мне надоело тут заниматься культпросветом. не понимаешь - не приводи код из ядра. читай Linux Device Drivers, главу 5, про спинлоки:
Цитата
Spinlocks are, by their nature, intended for use on multiprocessor systems, although
a uniprocessor workstation running a preemptive kernel behaves like SMP, as far as
concurrency is concerned. If a nonpreemptive uniprocessor system ever went into a
spin on a lock, it would spin forever; no other thread would ever be able to obtain
the CPU to release the lock. For this reason, spinlock operations on uniprocessor sys-
tems without preemption enabled are optimized to do nothing, with the exception of
the ones that change the IRQ masking status. Because of preemption, even if you
never expect your code to run on an SMP system, you still need to implement proper
locking.

а тот отрывок, что ты привёл, просто не соберётся на однопроцессорных архитектурах без прерываний. это работает только в случае с прерываниями, иначе проц зависнет. и последние ядра требуют опеделённых характеристик от железа. собственно, раньше таких локов и не было. а сейчас, скорее всего, это разруливается флагами сборки ядра.
lanz
Нормально жи общались :lol:

Я привел именно этот отрывок несколькими постами выше, с тем же выделенным жирным текстом.
На однопроцессорных архитектурах без прерываний, любой цикл while(true) завесит систему!
Спинлок в этом отношении обычный цикл и фенсы и атомарность тут не причем.
Почему именно спинлок не угодил?
Iron Bug
Спинлок не обычный цикл. В нём часто запрещены прерывания. И под вендой, кстати, совершенно то же самое. Это всё исходит из архитектуры процессоров и не зависит от систем.
lanz
Со всем согласен, кроме вот этого
Цитата
Это всё исходит из архитектуры процессоров и не зависит от систем.

Можно написать спинлок так чтобы он запрещал прерывания, а можно так чтобы не запрещал, и это как раз свойство самого спинлока или его реализации в ОС.
Вот например спинлок из pthreads без запрещения прерываний
(раз уж мне нельзя код из ядра носить :lol: )
http://code.metager.de/source/xref/gnu/gli...ead_spin_lock.c
Iron Bug
надеюсь, ты понимаешь, что так или иначе ты всё равно полезешь из этой функции к залочиванию ресурсов средствами процессора. в GCC это функция в итоге вызывает __arch_compare_and_exchange, а она (та-даам!) залочивает прерывания и (та-даам!) не работает на системах типа i386 с одним процессором.
потому нет никакой "системной" атомарности. атомарность обеспечивается процессором. система так или иначе лезет всё к тем же инструкциям синхронизации и обойти их она никак не может.
lanz
Цитата
надеюсь, ты понимаешь, что так или иначе ты всё равно полезешь из этой функции к залочиванию ресурсов средствами процессора

Конечно.
Цитата
потому нет никакой "системной" атомарности. атомарность обеспечивается процессором. система так или иначе лезет всё к тем же инструкциям синхронизации и обойти их она никак не может.

Абсолютно точно.
И отсюда вывод, что мьютекс, что спинлок - сводятся к одному и тому же. Только мьютекс засыпает а спинлок нет.

Цитата
__arch_compare_and_exchange, а она (та-даам!) залочивает прерывания

Вот же она
http://code.metager.de/source/xref/gnu/gli...its/atomic.h#61

Точнее вот она из предыдущего примера
http://code.metager.de/source/xref/gnu/gli...ts/atomic.h#185
Не вижу где залочивает прерывания.

Iron Bug
xchgX залочивает шину, вне зависимости от наличия или отсутствия префикса lock:
http://www.fermimn.gov.it/linux/quarta/x86/xchg.htm
иначе не было бы никакой атомарности.
lanz
Ну и пусть залочивает, выключение прерываний перед-во время захвата спинлока и залочивание шины на одну инструкцию это же совсем разные вещи.
И повторюсь, мьютекс так же использует атомарные операции - а значит и он будет лочить шину.

Если же ваш посыл в том, что беспрерывное залочивание шины в попытках захвата спинлока скажется на производительности, то вы совершенно правы.
Та реализация спинлока, которую я предлагал страдает от этого, но например в той же pthreads спинлок сделан по другому, без блокировки при чтении.
А в более современных процессорах залочивается только этот адрес.
Для просмотра полной версии этой страницы, пожалуйста, пройдите по ссылке.
Форум IP.Board © 2001-2018 IPS, Inc.