Глава 7 : Мьютексы и условные переменные
Исходники для этой страницы лежат тут
Взаимные исключения - mutual exclusion или mutex - и условные переменные -
conditional variables - являются основными инструментами синхронизации .
Они применяются как правило для потоков в рамках одного родительского процесса.
Схема работы мьютекса следующая :
блокировать_mutex()...
критическая область
разблокировать_mutex()...
pthread_mutex_t - базовый тип мьютекса.
Мютекс можно создать либо статически , либо динамически :
Статически :
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
Динамически:
pthread_mutex_init
Для установки-снятия блокировка мьютекса есть 3 функции :
int pthread_mutex_lock(pthread_mutex_t * mptr);
int pthread_mutex_trylock(pthread_mutex_t * mptr);
int pthread_mutex_unlock(pthread_mutex_t * mptr);
В случае успеха все 3 возвращают 0 .
Первая отличается от второй тем , что будет ждать в случае , если мьютекс уже заблокирован ,
а вторая сразу вернет ошибку .
Классической задачей синхронизации является схема производитель-потребитель :
|
Имеется несколько потоков-производителей , которые заполняют интовый массив значениями по порядку ,
а один поток-потребитель проверяет правильность заполнения .
//mutex/prodcons2.c
int nitems; /* read-only by producer and consumer */
struct {
pthread_mutex_t mutex;
int buff[MAXNITEMS];
int nput;
int nval;
} shared = { PTHREAD_MUTEX_INITIALIZER };
void *produce(void *), *consume(void *);
int
main(int argc, char **argv)
{
int i, nthreads, count[MAXNTHREADS];
pthread_t tid_produce[MAXNTHREADS], tid_consume;
if (argc != 3)
err_quit("usage: prodcons2 <#items> <#threads>");
nitems = min(atoi(argv[1]), MAXNITEMS);
nthreads = min(atoi(argv[2]), MAXNTHREADS);
Set_concurrency(nthreads);
/* 4start all the producer threads */
for (i = 0; i < nthreads; i++) {
count[i] = 0;
Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
/* 4wait for all the producer threads */
for (i = 0; i < nthreads; i++) {
Pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]);
}
/* 4start, then wait for the consumer thread */
Pthread_create(&tid_consume, NULL, consume, NULL);
Pthread_join(tid_consume, NULL);
exit(0);
}
/* end main */
/* include producer */
void *
produce(void *arg)
{
for ( ; ; ) {
Pthread_mutex_lock(&shared.mutex);
if (shared.nput >= nitems) {
Pthread_mutex_unlock(&shared.mutex);
return(NULL); /* array is full, we're done */
}
shared.buff[shared.nput] = shared.nval;
shared.nput++;
shared.nval++;
Pthread_mutex_unlock(&shared.mutex);
*((int *) arg) += 1;
}
}
void *
consume(void *arg)
{
int i;
for (i = 0; i < nitems; i++) {
if (shared.buff[i] != i)
printf("buff[%d] = %d\n", i, shared.buff[i]);
}
return(NULL);
}
/* end producer */
Глобальные переменные мы обьединяем в структуру shared вместе с мьютексом .
Первый аргумент командной строки - это размерность массива , второй - число создаваемых потоков .
Каждый поток вызывает функцию produce . id-шники потоков хранятся в массиве tid_produce.
Критическая область кода - это функция produce , которую мы блокируем с помощью
Pthread_mutex_lock(&shared.mutex);
...
Pthread_mutex_unlock(&shared.mutex);
Запустив эту команду , мы должны получить результат что-то типа :
>> prodcons2 1000000 5
count[0]=123456
count[1]=4456
count[2]=56345
count[3]=456
count[4]=1256
Т.е. запущено 5 потоков , которые в сумме породили массив на 1000000 элементов ,
причем каждый успел заполнить различное количество элементов массива.
Если мы закомментируем блокировку в функции produce , то все элементы будут созданы первым потоком .
Теперь изменим предыдущий пример , запустив потребителя сразу же после запуска всех производителей.
Это даст возможность потребителю сразу обрабатывать данные по мере их поступления.
Для этого нужно синхронизировать данные .
//mutex/prodcons3.c
int main(int argc, char **argv)
{
int i, nthreads, count[MAXNTHREADS];
pthread_t tid_produce[MAXNTHREADS], tid_consume;
if (argc != 3)
err_quit("usage: prodcons3 <#items> <#threads>");
nitems = min(atoi(argv[1]), MAXNITEMS);
nthreads = min(atoi(argv[2]), MAXNTHREADS);
/* 4create all producers and one consumer */
Set_concurrency(nthreads + 1);
for (i = 0; i < nthreads; i++) {
count[i] = 0;
Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
Pthread_create(&tid_consume, NULL, consume, NULL);
/* 4wait for all producers and the consumer */
for (i = 0; i < nthreads; i++) {
Pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]);
}
Pthread_join(tid_consume, NULL);
exit(0);
}
Меняется функция consume , которая вызывает новую функцию consume_wait :
void
consume_wait(int i)
{
for ( ; ; ) {
Pthread_mutex_lock(&shared.mutex);
if (i < shared.nput) {
Pthread_mutex_unlock(&shared.mutex);
return; /* an item is ready */
}
Pthread_mutex_unlock(&shared.mutex);
}
}
void *
consume(void *arg)
{
int i;
for (i = 0; i < nitems; i++) {
consume_wait(i);
if (shared.buff[i] != i)
printf("buff[%d] = %d\n", i, shared.buff[i]);
}
return(NULL);
}
/* end consume */
Функция consume_wait ждет , пока производители не создадут 1-й элемент .
Для этого блокируется мьютекс и проверяется индекс производителя nput .
Это цикл проверки называется опросом (spinning или polling) и является по сути
лишней тратой времени процессора .
Было бы лучше использовать какое-то другое средство для проверки ,
которое происходило бы при наступлении определенного события .
Мьютекс используется для блокировки , а условная переменная - для ожидания .
Условная переменная представляет из себя тип pthread_cond_t , и для работы с ней есть 2 функции :
int pthread_cond_wait(pthread_cond_t *cptr , pthread_mutex_t *mptr)
int pthread_cond_signal(pthread_cond_t *cptr)
В первой функции оба параметра являются обязательными .
Мы в очередной раз переписываем предыдущий пример.
Переменные nput и nval ассоциируются с мьютексом , и мы их обьединим в структуру put .
В другой структуре , nready , содержутся счетчик , мьютекс и условная переменная .
Условная переменная инициируется с помощью PTHREAD_COND_INITIALIZER .
//mutex/prodcons6.c
#define MAXNITEMS 1000000
#define MAXNTHREADS 100
/* globals shared by threads */
int nitems; /* read-only by producer and consumer */
int buff[MAXNITEMS];
struct {
pthread_mutex_t mutex;
int nput; /* next index to store */
int nval; /* next value to store */
} put = { PTHREAD_MUTEX_INITIALIZER };
struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
int nready; /* number ready for consumer */
} nready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
Функции produce и consume :
void *
produce(void *arg)
{
for ( ; ; ) {
Pthread_mutex_lock(&put.mutex);
if (put.nput >= nitems) {
Pthread_mutex_unlock(&put.mutex);
return(NULL); /* array is full, we're done */
}
buff[put.nput] = put.nval;
put.nput++;
put.nval++;
Pthread_mutex_unlock(&put.mutex);
Pthread_mutex_lock(&nready.mutex);
if (nready.nready == 0)
Pthread_cond_signal(&nready.cond);
nready.nready++;
Pthread_mutex_unlock(&nready.mutex);
*((int *) arg) += 1;
}
}
void *
consume(void *arg)
{
int i;
for (i = 0; i < nitems; i++) {
Pthread_mutex_lock(&nready.mutex);
while (nready.nready == 0)
Pthread_cond_wait(&nready.cond, &nready.mutex);
nready.nready--;
Pthread_mutex_unlock(&nready.mutex);
if (buff[i] != i)
printf("buff[%d] = %d\n", i, buff[i]);
}
return(NULL);
}
Для блокировки теперь используется put.mutex .
nready.nready - это счетчик , в котором хранися число элементов , готовых для обработки .
Перед его увеличением мы проверяем , не было ли оно нулевым , и если да , то вызывается функция
pthread_cond_signal , позволяющая возобновит работу потребителя , который ждет ,
когда эта переменная станет болше нуля. Этот счетчик используется совместно потребителем и
производителями , поэтому доступ к нему осуществляется с помощью мьютекса .
Потребитель просто ждет , когда значение счетчика nready.nready станет больше нуля .
Если его значение равно нулю , мы вызываем pthread_cond_wait , при этом выполняются два атомарных процесса :
1 разблокировка nready.mutex
2 поток приостанавливается , пока другой поток не вызовет pthread_cond_signal
При выходе из pthread_cond_wait блокируется nready.mutex.
Если тут мы обнаруживаем , что счетчик больше нуля , мы обнуляем его ,
зная при этом , что мьютекс заблокирован , и разблокируем мьютекс .
Условный код , передающий сигнал условной переменой , выглядит так :
...
Pthread_mutex_lock
установка истинного значения условия
Pthread_cond_signal
Pthread_mutex_unlock
Условный код , проверяющий условие и приостанавливающий процесс , если условие не выполняется ,
выглядит так :
Pthread_mutex_lock
while(условие ложно)
Pthread_cond_wait
изменение условия
Pthread_mutex_unlock
Можно воспользоваться функцией pthread_cond_broadcast для пробуждения всех процессов ,
заблокированных в ожидании сигнала данной условной переменной .
int pthread_cond_broadcast(pthread_cond_t * ptr)
int pthread_cond_timedwait(pthread_cond_t * ptr, pthread_mutex_t *mptr , ...)
В случае успеха обе возвращают 0
pthread_cond_timedwait позволяет установить ограничение на время блокировки процесса в абсолютном формате,
т.е. число секунд с 1970 года .
Инициализировать мьютексы и условные переменные можно с помощью других функций :
int pthread_mutex_init
int pthread_mutex_destroy
int pthread_cond_init
int pthread_cond_destroy
Все возвращают 0 в случае успеха
Атрибуты имеют тип pthread_mutexattr_t и pthread_condattr_t соответственно ,
для их инициализации и удаления есть свой набор функций . После инициализации их можно изменить с помощью другого
набора специальных функций .
Следующий код показывает , как надо проинициализировать мьютекс , чтобы его можно было использовать
нескольким процессам :
pthread_mutex_t *mptr;
pthread_mutexattr_t mattr;
mprt = pthread_mutexattr_init(&mattr);
pthread_mutexattr_setpshared(&mattr,PTHREAD_PROCESS_SHARED)
pthread_mutex_init(mptr,&mattr);
Когда мьютекс используется совместно несколькими процессами ,
всегда есть возможность , что процесс будет завершен во время работы с заблокированным ресурсом .
И нет способа заставить систему автоматически снять эту блокировку .
Единственный тип блокировки , снимаемый автоматически - блокировка fcntl .
При использовании семафоров System V можно указать ядру ,
следует ли автоматом снимать блокировку .
Т.о. мьютексы и взаимные исключения могут инициализироваться как статически , так и динамически .
Динамическая инициализация позволяет указывать дополнительные атрибуты .
|