Search     or:     and:
 LINUX 
 Language 
 Kernel 
 Package 
 Book 
 Test 
 OS 
 Forum 
iakovlev.org

Глава 23 : Многопоточность

Код данной главы лежит тут

Использование fork для клонирования процессов имеет свои недостатки : стоимость этой операции достаточно высока , и обмен данными между родителем и потомком в этом случае - нетривиальная задача , решаемая с помощью IPC.

Программные потоки - threads - иногда называются облегченными процессами - lightweight processes. Для его создания требуется на порядок меньше времени. Все потоки внутри процесса разделяют :

  
 	глобальные переменные 
 	данные
 	открытые дескрипторы файлов
 	обработчики сигналов
 	текущий рабочий каталог
 
У каждого потока имеется собственные :
  
 	идентификатор
 	стек
 	приоритет
 
Потоки создаются функцией
  
 	int pthread_create (pthread_t * t , const pthread_attr_t * attr , void *arg)
 
 	возвращает 0 в случае успеха
 
При создании потока мы должны указать , какую функцию он будет выполнять . Выполнение потока начинается с нее , а завершение - либо явно с помощью pthread_exit , либо неявно при выходе из этой функции .

Функция pthread_join выполняет аналогичную роль , что и waitpid для fork :

  
 	int pthread_join( pthread_t t , void status)
 
 	возвращает 0 в случае успеха
 
Узнать свой идентификатор потока может с помощью
  
  	pthread_t pthread_self()
 
 	это аналог getpid для процессов
 
Поток может быть либо присоединенным (joinable) , либо отсоединенным (detached). В первом случае поток после своего завершения остается в подвешенном состоянии и не освобождает своих ресурсов до тех пор , пока не будет вызвана для него pthread_join(). Во втором случае поток сам все освобождает .

Функция pthread_detach меняет состояние потока , превращая его из присоединенного в отсоединенный . Поток сам может вызвать эту функцию

  
 	pthread_detach(pthread_self())	
 
Одним из способов завершения является функция pthread_exit(void * status).

Теперь мы перепишем эхо-сервер , приведенный в главе 5 . Для каждого клиента вместо процесса будет создаваться поток .

  
 //threads/tcpserv01.c
 
 int main(int argc, char **argv)
 {
 	int				listenfd, connfd;
 	socklen_t		addrlen, len;
 	struct sockaddr	*cliaddr;
 
 	if (argc == 2)
 		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
 	else if (argc == 3)
 		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
 	else
 		err_quit("usage: tcpserv01 [  ] ");
 
 	cliaddr = Malloc(addrlen);
 
 	for ( ; ; ) {
 		len = addrlen;
 		connfd = Accept(listenfd, cliaddr, &len);
 
 		Pthread_create(NULL, NULL, &doit, (void *) connfd);
 	}
 }
 
 static void *
 doit(void *arg)
 {
 	Pthread_detach(pthread_self());
 	str_echo((int) arg);	/* same function as before */
 	Close((int) arg);		/* we are done with connected socket */
 	return(NULL);
 }
 
 
Когда accept возвращает управление , мы создаем поток и передаем в функцию потока doit дескриптор присоединенного сокета . Внутри потка мы вызываем Pthread_detach и делаем поток отсоединенным . Мы обязаны в ней закрыть сокет с помощью close() , потому что за поток этого никто не сделает .

В коде родительского процесса есть одна проблема : дескриптор сокета connfd не синхронизирован , и может возникнуть ситуацию , когда один и тот же дескриптор будет передан двум разным потокам . Для этого мы перепишем функцию main :

  
 //threads/tcpserv02.c
 
 int main(int argc, char **argv)
 {
 	int				listenfd, *iptr;
 	socklen_t		addrlen, len;
 	struct sockaddr	*cliaddr;
 
 	if (argc == 2)
 		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
 	else if (argc == 3)
 		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
 	else
 		err_quit("usage: tcpserv01 [  ] ");
 
 	cliaddr = Malloc(addrlen);
 
 	for ( ; ; ) {
 		len = addrlen;
 		iptr = Malloc(sizeof(int));
 		*iptr = Accept(listenfd, cliaddr, &len);
 
 		Pthread_create(NULL, NULL, &doit, iptr);
 	}
 }
 
 static void *
 doit(void *arg)
 {
 	int		connfd;
 
 	connfd = *((int *) arg);
 	free(arg);
 
 	Pthread_detach(pthread_self());
 	str_echo(connfd);		/* same function as before */
 	Close(connfd);			/* we are done with connected socket */
 	return(NULL);
 }
 
Каждый раз перед вызовом accept мы выделяем память для дескриптора , каждый поток получает свой дескриптор и освобождает память .

В параллельном программировании существует проблема совместного доступа разных потоков к глобальным данным , которые не синхронизированы . В следующем примере создаются 2 потока , которые по очереди делают инкремент одной глобальной переменной. Это пример того , как НЕ НАДО писать :

  
 #define	NLOOP 5000
 
 int				counter;		/* this is incremented by the threads */
 
 void	*doit(void *);
 
 int
 main(int argc, char **argv)
 {
 	pthread_t	tidA, tidB;
 
 	Pthread_create(&tidA, NULL, &doit, NULL);
 	Pthread_create(&tidB, NULL, &doit, NULL);
 
 	Pthread_join(tidA, NULL);
 	Pthread_join(tidB, NULL);
 
 	exit(0);
 }
 
 void *
 doit(void *vptr)
 {
 	int		i, val;
 
 	for (i = 0; i < NLOOP; i++) {
 		val = counter;
 		printf("%d: %d\n", pthread_self(), val + 1);
 		counter = val + 1;
 	}
 
 	return(NULL);
 }
 
 
Правильным является следующий подход , когда мы используем мьютекс .
  
 
 pthread_mutex_t	counter_mutex = PTHREAD_MUTEX_INITIALIZER;
 ...
 
 	for (i = 0; i < NLOOP; i++) {
 		Pthread_mutex_lock(&counter_mutex);
 
 		val = counter;
 		printf("%d: %d\n", pthread_self(), val + 1);
 		counter = val + 1;
 
 		Pthread_mutex_unlock(&counter_mutex);
 	}
 
Код внутри мьютекса гарантированно блокируется от доступа другим процессам .

Но это решение неоптимально , поскольку тратится дополнительное время процессора на проверку . Более правильным решением является использование условной переменной (conditional variable) в комбинации с мьютексом .

Условная переменная - это переменная типа pthread_cond_t , которая используется в 2-х функциях :

  
 	int pthread_cond_wait   ( pthread_cond_t *ptr , pthread_mutex_t * m)
 	int pthread_cond_signal ( pthread_cond_t *ptr , pthread_mutex_t * m)
 
 	обе возвращают 0 в случае успеха
 
Пример использования комбинации мютекс + условная переменная :

  
 #define	Pthread_mutex_lock(mptr) \
 	{	int  n; \
 		if ( (n = pthread_mutex_lock(mptr)) != 0) \
 			{ errno = n; err_sys("pthread_mutex_lock error"); } \
 	}
 #define	Pthread_mutex_unlock(mptr) \
 	{	int  n; \
 		if ( (n = pthread_mutex_unlock(mptr)) != 0) \
 			{ errno = n; err_sys("pthread_mutex_unlock error"); } \
 	}
 #define	Pthread_cond_wait(cptr,mptr) \
 	{	int  n; \
 		if ( (n = pthread_cond_wait(cptr,mptr)) != 0) \
 			{ errno = n; err_sys("pthread_cond_wait error"); } \
 	}
 #define	Pthread_cond_signal(cptr) \
 	{	int  n; \
 		if ( (n = pthread_cond_signal(cptr)) != 0) \
 			{ errno = n; err_sys("pthread_cond_signal error"); } \
 	}
 
 #define	NLOOP	   	 50
 #define	BUFFSIZE	 10
 
 struct buf_t {
   int		b_buf[BUFFSIZE];	/* the buffer which contains integer items */
   int		b_nitems;			/* #items currently in buffer */
   int		b_nextget;
   int		b_nextput;
   pthread_mutex_t	b_mutex;
   pthread_cond_t	b_cond_consumer;	/* consumer waiting to get */
   pthread_cond_t	b_cond_producer;	/* producer waiting to put */
 } buf_t;
 
 void	*produce_loop(void *);
 void	*consume_loop(void *);
 
 int
 main(int argc, char **argv)
 {
 	int			n;
 	pthread_t	tidA, tidB;
 
 	printf("main, addr(stack) = %x, addr(global) = %x, addr(func) = %x\n",
 			&n, &buf_t, &produce_loop);
 	if ( (n = pthread_create(&tidA, NULL, &produce_loop, NULL)) != 0)
 		errno = n, err_sys("pthread_create error for A");
 	if ( (n = pthread_create(&tidB, NULL, &consume_loop, NULL)) != 0)
 		errno = n, err_sys("pthread_create error for B");
 
 		/* wait for both threads to terminate */
 	if ( (n = pthread_join(tidA, NULL)) != 0)
 		errno = n, err_sys("pthread_join error for A");
 	if ( (n = pthread_join(tidB, NULL)) != 0)
 		errno = n, err_sys("pthread_join error for B");
 
 	exit(0);
 }
 
 void
 produce(struct buf_t *bptr, int val)
 {
 	Pthread_mutex_lock(&bptr->b_mutex);
 		/* Wait if buffer is full */
 	while (bptr->b_nitems >= BUFFSIZE)
 		Pthread_cond_wait(&bptr->b_cond_producer, &bptr->b_mutex);
 
 		/* There is room, store the new value */
 	printf("produce %d\n", val);
 	bptr->b_buf[bptr->b_nextput] = val;
 	if (++bptr->b_nextput >= BUFFSIZE)
 		bptr->b_nextput = 0;
 	bptr->b_nitems++;
 
 		/* Signal consumer */
 	Pthread_cond_signal(&bptr->b_cond_consumer);
 	Pthread_mutex_unlock(&bptr->b_mutex);
 }
 
 int
 consume(struct buf_t *bptr)
 {
 	int		val;
 
 	Pthread_mutex_lock(&bptr->b_mutex);
 		/* Wait if buffer is empty */
 	while (bptr->b_nitems <= 0)
 		Pthread_cond_wait(&bptr->b_cond_consumer, &bptr->b_mutex);
 
 		/* There is data, fetch the value */
 	val = bptr->b_buf[bptr->b_nextget];
 	printf("consume %d\n", val);
 	if (++bptr->b_nextget >= BUFFSIZE)
 		bptr->b_nextget = 0;
 	bptr->b_nitems--;
 
 		/* Signal producer; it might be waiting for space to store */
 	Pthread_cond_signal(&bptr->b_cond_producer);
 	Pthread_mutex_unlock(&bptr->b_mutex);
 
 	return(val);
 }
 
 void *
 produce_loop(void *vptr)
 {
 	int		i;
 
 	printf("produce_loop thread, addr(stack) = %x\n", &i);
 	for (i = 0; i < NLOOP; i++) {
 		produce(&buf_t, i);
 	}
 
 	return(NULL);
 }
 
 void *
 consume_loop(void *vptr)
 {
 	int		i, val;
 
 	printf("consume_loop thread, addr(stack) = %x\n", &i);
 	for (i = 0; i < NLOOP; i++) {
 		val = consume(&buf_t);
 	}
 
 	return(NULL);
 }
 
 

Яковлев С: По мотивам этой главы я написал тестовое приложение , которое выполняет следующую работу в модели "producer-consumer":

 1 Собираем утилиту и запускаем ее с 2-мя параметрами ,
    первый параметр - размер линейного списка , второй параметр - число потоков
 2 Создаем 2 массива потоков , которые параллельно начинают "насиловать" эту очередь :
 		добавлять в нее элементы и забирать их из нее .  
   Запустить можно например так :
     prodcons 100000 10 
 
 Код главной функции :
 
 int main(int argc, char **argv)
 {
 	int			i, nthreads, count[MAXNTHREADS] , count2[MAXNTHREADS];
 	pthread_t	tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];
 
 	List = NULL ;
 
 	if (argc != 3)
 	{
 		printf("usage: prodcons <#items> <#threads>\n");
 		exit(0);
 	}
 	nitems = atoi(argv[1]);
 	nthreads = atoi(argv[2]);
 
 	for (i = 0; i < nthreads; i++) 
 	{
 		count[i] = 0;
 		pthread_create(&tid_produce[i], NULL, produce, &count[i]);
 		count2[i] = 0;
 		pthread_create(&tid_consume[i], NULL, consume, &count2[i]);
 	}
 
 	for (i = 0; i < nthreads; i++) 
 	{
 		pthread_join(tid_produce[i], NULL);
 		pthread_join(tid_consume[i], NULL);
 		printf("count[%d] = %d  count2[%d] = %d\n", i, count[i], i, count2[i]);	
 	}
 
 	printf("producer_sum =%d  consumer_sum =%d\n" , put.producer_sum , get.consumer_sum);  // results in len == 3
 	printf("producer counter =%d  consumer counter =%d\n" , put.count , get.count);  // results in len == 3
     printf("queue length =%d\n",Length("List",List));  // results in len == 3
 
 	exit(0);
 }
 
 Исходники prodcons
 
 

Оставьте свой комментарий !

Ваше имя:
Комментарий:
Оба поля являются обязательными

 Автор  Комментарий к данной статье