Search     or:     and:
 LINUX 
 Language 
 Kernel 
 Package 
 Book 
 Test 
 OS 
 Forum 
 iakovlev.org 
 Books
  Краткое описание
 Linux
 W. R. Стивенс TCP 
 W. R. Стивенс IPC 
 A.Rubini-J.Corbet 
 K. Bauer 
 Gary V. Vaughan 
 Д Вилер 
 В. Сталлинг 
 Pramode C.E. 
 Steve Pate 
 William Gropp 
 K.A.Robbins 
 С Бекман 
 Р Стивенс 
 Ethereal 
 Cluster 
 Languages
 C
 Perl
 M.Pilgrim 
 А.Фролов 
 Mendel Cooper 
 М Перри 
 Kernel
 C.S. Rodriguez 
 Robert Love 
 Daniel Bovet 
 Д Джеф 
 Максвелл 
 G. Kroah-Hartman 
 B. Hansen 
NEWS
Последние статьи :
  Тренажёр 16.01   
  Эльбрус 05.12   
  Алгоритмы 12.04   
  Rust 07.11   
  Go 25.12   
  EXT4 10.11   
  FS benchmark 15.09   
  Сетунь 23.07   
  Trees 25.06   
  Apache 03.02   
 
TOP 20
 Linux Kernel 2.6...5170 
 Trees...940 
 Максвелл 3...871 
 Go Web ...823 
 William Gropp...803 
 Ethreal 3...787 
 Gary V.Vaughan-> Libtool...774 
 Ethreal 4...771 
 Rodriguez 6...766 
 Ext4 FS...755 
 Clickhouse...755 
 Steve Pate 1...755 
 Ethreal 1...742 
 Secure Programming for Li...732 
 C++ Patterns 3...716 
 Ulrich Drepper...698 
 Assembler...695 
 DevFS...662 
 Стивенс 9...651 
 MySQL & PosgreSQL...632 
 
  01.01.2024 : 3621733 посещений 

iakovlev.org

Глава 7 : Мьютексы и условные переменные

Исходники для этой страницы лежат тут

Взаимные исключения - mutual exclusion или mutex - и условные переменные - conditional variables - являются основными инструментами синхронизации . Они применяются как правило для потоков в рамках одного родительского процесса.

Схема работы мьютекса следующая :

       
 	блокировать_mutex()...
 	критическая область 
 	разблокировать_mutex()...
 
pthread_mutex_t - базовый тип мьютекса.

Мютекс можно создать либо статически , либо динамически :

       
 Статически :
 	static pthread_mutex_t	lock = PTHREAD_MUTEX_INITIALIZER;
 Динамически:
 	pthread_mutex_init
 
Для установки-снятия блокировка мьютекса есть 3 функции :
       
 	int pthread_mutex_lock(pthread_mutex_t * mptr);
 	int pthread_mutex_trylock(pthread_mutex_t * mptr);
 	int pthread_mutex_unlock(pthread_mutex_t * mptr);
 
 В случае успеха все 3 возвращают 0 .
 
Первая отличается от второй тем , что будет ждать в случае , если мьютекс уже заблокирован , а вторая сразу вернет ошибку .

Классической задачей синхронизации является схема производитель-потребитель :
Имеется несколько потоков-производителей , которые заполняют интовый массив значениями по порядку , а один поток-потребитель проверяет правильность заполнения .

       
 //mutex/prodcons2.c
 
 int		nitems;			/* read-only by producer and consumer */
 struct {
   pthread_mutex_t	mutex;
   int	buff[MAXNITEMS];
   int	nput;
   int	nval;
 } shared = { PTHREAD_MUTEX_INITIALIZER };
 
 void	*produce(void *), *consume(void *);
 
 int
 main(int argc, char **argv)
 {
 	int			i, nthreads, count[MAXNTHREADS];
 	pthread_t	tid_produce[MAXNTHREADS], tid_consume;
 
 	if (argc != 3)
 		err_quit("usage: prodcons2 <#items> <#threads>");
 	nitems = min(atoi(argv[1]), MAXNITEMS);
 	nthreads = min(atoi(argv[2]), MAXNTHREADS);
 
 	Set_concurrency(nthreads);
 		/* 4start all the producer threads */
 	for (i = 0; i < nthreads; i++) {
 		count[i] = 0;
 		Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
 	}
 
 		/* 4wait for all the producer threads */
 	for (i = 0; i < nthreads; i++) {
 		Pthread_join(tid_produce[i], NULL);
 		printf("count[%d] = %d\n", i, count[i]);	
 	}
 
 		/* 4start, then wait for the consumer thread */
 	Pthread_create(&tid_consume, NULL, consume, NULL);
 	Pthread_join(tid_consume, NULL);
 
 	exit(0);
 }
 /* end main */
 
 /* include producer */
 void *
 produce(void *arg)
 {
 	for ( ; ; ) {
 		Pthread_mutex_lock(&shared.mutex);
 		if (shared.nput >= nitems) {
 			Pthread_mutex_unlock(&shared.mutex);
 			return(NULL);		/* array is full, we're done */
 		}
 		shared.buff[shared.nput] = shared.nval;
 		shared.nput++;
 		shared.nval++;
 		Pthread_mutex_unlock(&shared.mutex);
 		*((int *) arg) += 1;
 	}
 }
 
 void *
 consume(void *arg)
 {
 	int		i;
 
 	for (i = 0; i < nitems; i++) {
 		if (shared.buff[i] != i)
 			printf("buff[%d] = %d\n", i, shared.buff[i]);
 	}
 	return(NULL);
 }
 /* end producer */
 
 
Глобальные переменные мы обьединяем в структуру shared вместе с мьютексом . Первый аргумент командной строки - это размерность массива , второй - число создаваемых потоков . Каждый поток вызывает функцию produce . id-шники потоков хранятся в массиве tid_produce. Критическая область кода - это функция produce , которую мы блокируем с помощью
       
 	Pthread_mutex_lock(&shared.mutex);
 	...
 	Pthread_mutex_unlock(&shared.mutex);
 
Запустив эту команду , мы должны получить результат что-то типа :
       
 >> prodcons2 1000000 5
 count[0]=123456
 count[1]=4456
 count[2]=56345
 count[3]=456
 count[4]=1256
 
Т.е. запущено 5 потоков , которые в сумме породили массив на 1000000 элементов , причем каждый успел заполнить различное количество элементов массива. Если мы закомментируем блокировку в функции produce , то все элементы будут созданы первым потоком .

Теперь изменим предыдущий пример , запустив потребителя сразу же после запуска всех производителей. Это даст возможность потребителю сразу обрабатывать данные по мере их поступления. Для этого нужно синхронизировать данные .

       
 //mutex/prodcons3.c
 
 int main(int argc, char **argv)
 {
 	int			i, nthreads, count[MAXNTHREADS];
 	pthread_t	tid_produce[MAXNTHREADS], tid_consume;
 
 	if (argc != 3)
 		err_quit("usage: prodcons3 <#items> <#threads>");
 	nitems = min(atoi(argv[1]), MAXNITEMS);
 	nthreads = min(atoi(argv[2]), MAXNTHREADS);
 
 		/* 4create all producers and one consumer */
 	Set_concurrency(nthreads + 1);
 	for (i = 0; i < nthreads; i++) {
 		count[i] = 0;
 		Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
 	}
 	Pthread_create(&tid_consume, NULL, consume, NULL);
 
 		/* 4wait for all producers and the consumer */
 	for (i = 0; i < nthreads; i++) {
 		Pthread_join(tid_produce[i], NULL);
 		printf("count[%d] = %d\n", i, count[i]);	
 	}
 	Pthread_join(tid_consume, NULL);
 
 	exit(0);
 }
 
 
Меняется функция consume , которая вызывает новую функцию consume_wait :

 void
 consume_wait(int i)
 {
 	for ( ; ; ) {
 		Pthread_mutex_lock(&shared.mutex);
 		if (i < shared.nput) {
 			Pthread_mutex_unlock(&shared.mutex);
 			return;			/* an item is ready */
 		}
 		Pthread_mutex_unlock(&shared.mutex);
 	}
 }
 
 void *
 consume(void *arg)
 {
 	int		i;
 
 	for (i = 0; i < nitems; i++) {
 		consume_wait(i);
 		if (shared.buff[i] != i)
 			printf("buff[%d] = %d\n", i, shared.buff[i]);
 	}
 	return(NULL);
 }
 /* end consume */
      
 
Функция consume_wait ждет , пока производители не создадут 1-й элемент . Для этого блокируется мьютекс и проверяется индекс производителя nput . Это цикл проверки называется опросом (spinning или polling) и является по сути лишней тратой времени процессора . Было бы лучше использовать какое-то другое средство для проверки , которое происходило бы при наступлении определенного события .

Мьютекс используется для блокировки , а условная переменная - для ожидания . Условная переменная представляет из себя тип pthread_cond_t , и для работы с ней есть 2 функции :

       
 	int pthread_cond_wait(pthread_cond_t *cptr , pthread_mutex_t *mptr)
 	int pthread_cond_signal(pthread_cond_t *cptr)
 
  В первой функции оба параметра являются обязательными .
 
Мы в очередной раз переписываем предыдущий пример. Переменные nput и nval ассоциируются с мьютексом , и мы их обьединим в структуру put . В другой структуре , nready , содержутся счетчик , мьютекс и условная переменная . Условная переменная инициируется с помощью PTHREAD_COND_INITIALIZER .
       
 //mutex/prodcons6.c
 
 #define	MAXNITEMS 		1000000
 #define	MAXNTHREADS			100
 
 		/* globals shared by threads */
 int		nitems;				/* read-only by producer and consumer */
 int		buff[MAXNITEMS];
 struct {
   pthread_mutex_t	mutex;
   int				nput;	/* next index to store */
   int				nval;	/* next value to store */
 } put = { PTHREAD_MUTEX_INITIALIZER };
 
 struct {
   pthread_mutex_t	mutex;
   pthread_cond_t	cond;
   int				nready;	/* number ready for consumer */
 } nready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
 
 
Функции produce и consume :
       
 void *
 produce(void *arg)
 {
 	for ( ; ; ) {
 		Pthread_mutex_lock(&put.mutex);
 		if (put.nput >= nitems) {
 			Pthread_mutex_unlock(&put.mutex);
 			return(NULL);		/* array is full, we're done */
 		}
 		buff[put.nput] = put.nval;
 		put.nput++;
 		put.nval++;
 		Pthread_mutex_unlock(&put.mutex);
 
 		Pthread_mutex_lock(&nready.mutex);
 		if (nready.nready == 0)
 			Pthread_cond_signal(&nready.cond);
 		nready.nready++;
 		Pthread_mutex_unlock(&nready.mutex);
 
 		*((int *) arg) += 1;
 	}
 }
 
 void *
 consume(void *arg)
 {
 	int		i;
 
 	for (i = 0; i < nitems; i++) {
 		Pthread_mutex_lock(&nready.mutex);
 		while (nready.nready == 0)
 			Pthread_cond_wait(&nready.cond, &nready.mutex);
 		nready.nready--;
 		Pthread_mutex_unlock(&nready.mutex);
 
 		if (buff[i] != i)
 			printf("buff[%d] = %d\n", i, buff[i]);
 	}
 	return(NULL);
 }
 
Для блокировки теперь используется put.mutex . nready.nready - это счетчик , в котором хранися число элементов , готовых для обработки . Перед его увеличением мы проверяем , не было ли оно нулевым , и если да , то вызывается функция pthread_cond_signal , позволяющая возобновит работу потребителя , который ждет , когда эта переменная станет болше нуля. Этот счетчик используется совместно потребителем и производителями , поэтому доступ к нему осуществляется с помощью мьютекса .

Потребитель просто ждет , когда значение счетчика nready.nready станет больше нуля . Если его значение равно нулю , мы вызываем pthread_cond_wait , при этом выполняются два атомарных процесса :

       
 	1 разблокировка nready.mutex
 	2 поток приостанавливается , пока другой поток не вызовет pthread_cond_signal 
 
При выходе из pthread_cond_wait блокируется nready.mutex. Если тут мы обнаруживаем , что счетчик больше нуля , мы обнуляем его , зная при этом , что мьютекс заблокирован , и разблокируем мьютекс .

Условный код , передающий сигнал условной переменой , выглядит так :

       
 	...	
 	Pthread_mutex_lock
 	установка истинного значения условия
 	Pthread_cond_signal
 	Pthread_mutex_unlock
 
Условный код , проверяющий условие и приостанавливающий процесс , если условие не выполняется , выглядит так :
       
 	Pthread_mutex_lock
 	while(условие ложно)	
 		Pthread_cond_wait
 	изменение условия
 	Pthread_mutex_unlock
 
Можно воспользоваться функцией pthread_cond_broadcast для пробуждения всех процессов , заблокированных в ожидании сигнала данной условной переменной .
       
  int pthread_cond_broadcast(pthread_cond_t * ptr)
  int pthread_cond_timedwait(pthread_cond_t * ptr, pthread_mutex_t *mptr , ...)
 
  В случае успеха обе возвращают 0
 
pthread_cond_timedwait позволяет установить ограничение на время блокировки процесса в абсолютном формате, т.е. число секунд с 1970 года .

Инициализировать мьютексы и условные переменные можно с помощью других функций :

       
 	int pthread_mutex_init
 	int pthread_mutex_destroy
 	int pthread_cond_init
 	int pthread_cond_destroy
 	
 Все возвращают 0 в случае успеха
 
Атрибуты имеют тип pthread_mutexattr_t и pthread_condattr_t соответственно , для их инициализации и удаления есть свой набор функций . После инициализации их можно изменить с помощью другого набора специальных функций .

Следующий код показывает , как надо проинициализировать мьютекс , чтобы его можно было использовать нескольким процессам :

       
 	pthread_mutex_t *mptr;
 	pthread_mutexattr_t mattr;
 	mprt = pthread_mutexattr_init(&mattr);
 	pthread_mutexattr_setpshared(&mattr,PTHREAD_PROCESS_SHARED)
 	pthread_mutex_init(mptr,&mattr); 
 
Когда мьютекс используется совместно несколькими процессами , всегда есть возможность , что процесс будет завершен во время работы с заблокированным ресурсом . И нет способа заставить систему автоматически снять эту блокировку . Единственный тип блокировки , снимаемый автоматически - блокировка fcntl . При использовании семафоров System V можно указать ядру , следует ли автоматом снимать блокировку .

Т.о. мьютексы и взаимные исключения могут инициализироваться как статически , так и динамически . Динамическая инициализация позволяет указывать дополнительные атрибуты .

Оставьте свой комментарий !

Ваше имя:
Комментарий:
Оба поля являются обязательными

 Автор  Комментарий к данной статье