Глава 23 : Многопоточность
Код данной главы лежит тут
Использование fork для клонирования процессов имеет свои недостатки :
стоимость этой операции достаточно высока , и обмен данными между родителем и потомком
в этом случае - нетривиальная задача , решаемая с помощью IPC.
Программные потоки - threads - иногда называются облегченными процессами - lightweight processes.
Для его создания требуется на порядок меньше времени.
Все потоки внутри процесса разделяют :
глобальные переменные
данные
открытые дескрипторы файлов
обработчики сигналов
текущий рабочий каталог
У каждого потока имеется собственные :
идентификатор
стек
приоритет
Потоки создаются функцией
int pthread_create (pthread_t * t , const pthread_attr_t * attr , void *arg)
возвращает 0 в случае успеха
При создании потока мы должны указать , какую функцию он будет выполнять .
Выполнение потока начинается с нее , а завершение - либо явно с помощью pthread_exit ,
либо неявно при выходе из этой функции .
Функция pthread_join выполняет аналогичную роль , что и waitpid для fork :
int pthread_join( pthread_t t , void status)
возвращает 0 в случае успеха
Узнать свой идентификатор потока может с помощью
pthread_t pthread_self()
это аналог getpid для процессов
Поток может быть либо присоединенным (joinable) , либо отсоединенным (detached).
В первом случае поток после своего завершения остается в подвешенном состоянии и не освобождает
своих ресурсов до тех пор , пока не будет вызвана для него pthread_join().
Во втором случае поток сам все освобождает .
Функция pthread_detach меняет состояние потока , превращая его из присоединенного
в отсоединенный . Поток сам может вызвать эту функцию
pthread_detach(pthread_self())
Одним из способов завершения является функция pthread_exit(void * status).
Теперь мы перепишем эхо-сервер , приведенный в главе 5 .
Для каждого клиента вместо процесса будет создаваться поток .
//threads/tcpserv01.c
int main(int argc, char **argv)
{
int listenfd, connfd;
socklen_t addrlen, len;
struct sockaddr *cliaddr;
if (argc == 2)
listenfd = Tcp_listen(NULL, argv[1], &addrlen);
else if (argc == 3)
listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
else
err_quit("usage: tcpserv01 [ ] ");
cliaddr = Malloc(addrlen);
for ( ; ; ) {
len = addrlen;
connfd = Accept(listenfd, cliaddr, &len);
Pthread_create(NULL, NULL, &doit, (void *) connfd);
}
}
static void *
doit(void *arg)
{
Pthread_detach(pthread_self());
str_echo((int) arg); /* same function as before */
Close((int) arg); /* we are done with connected socket */
return(NULL);
}
Когда accept возвращает управление , мы создаем поток и передаем в функцию потока doit
дескриптор присоединенного сокета . Внутри потка мы вызываем Pthread_detach и делаем поток
отсоединенным . Мы обязаны в ней закрыть сокет с помощью close() , потому что за поток
этого никто не сделает .
В коде родительского процесса есть одна проблема : дескриптор сокета connfd не синхронизирован ,
и может возникнуть ситуацию , когда один и тот же дескриптор будет передан двум разным потокам .
Для этого мы перепишем функцию main :
//threads/tcpserv02.c
int main(int argc, char **argv)
{
int listenfd, *iptr;
socklen_t addrlen, len;
struct sockaddr *cliaddr;
if (argc == 2)
listenfd = Tcp_listen(NULL, argv[1], &addrlen);
else if (argc == 3)
listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
else
err_quit("usage: tcpserv01 [ ] ");
cliaddr = Malloc(addrlen);
for ( ; ; ) {
len = addrlen;
iptr = Malloc(sizeof(int));
*iptr = Accept(listenfd, cliaddr, &len);
Pthread_create(NULL, NULL, &doit, iptr);
}
}
static void *
doit(void *arg)
{
int connfd;
connfd = *((int *) arg);
free(arg);
Pthread_detach(pthread_self());
str_echo(connfd); /* same function as before */
Close(connfd); /* we are done with connected socket */
return(NULL);
}
Каждый раз перед вызовом accept мы выделяем память для дескриптора , каждый поток получает
свой дескриптор и освобождает память .
В параллельном программировании существует проблема совместного доступа разных потоков к глобальным данным ,
которые не синхронизированы .
В следующем примере создаются 2 потока , которые по очереди делают инкремент одной глобальной переменной.
Это пример того , как НЕ НАДО писать :
#define NLOOP 5000
int counter; /* this is incremented by the threads */
void *doit(void *);
int
main(int argc, char **argv)
{
pthread_t tidA, tidB;
Pthread_create(&tidA, NULL, &doit, NULL);
Pthread_create(&tidB, NULL, &doit, NULL);
Pthread_join(tidA, NULL);
Pthread_join(tidB, NULL);
exit(0);
}
void *
doit(void *vptr)
{
int i, val;
for (i = 0; i < NLOOP; i++) {
val = counter;
printf("%d: %d\n", pthread_self(), val + 1);
counter = val + 1;
}
return(NULL);
}
Правильным является следующий подход , когда мы используем мьютекс .
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;
...
for (i = 0; i < NLOOP; i++) {
Pthread_mutex_lock(&counter_mutex);
val = counter;
printf("%d: %d\n", pthread_self(), val + 1);
counter = val + 1;
Pthread_mutex_unlock(&counter_mutex);
}
Код внутри мьютекса гарантированно блокируется от доступа другим процессам .
Но это решение неоптимально , поскольку тратится дополнительное время процессора на проверку .
Более правильным решением является использование условной переменной (conditional variable)
в комбинации с мьютексом .
Условная переменная - это переменная типа pthread_cond_t , которая используется в 2-х функциях :
int pthread_cond_wait ( pthread_cond_t *ptr , pthread_mutex_t * m)
int pthread_cond_signal ( pthread_cond_t *ptr , pthread_mutex_t * m)
обе возвращают 0 в случае успеха
Пример использования комбинации мютекс + условная переменная :
#define Pthread_mutex_lock(mptr) \
{ int n; \
if ( (n = pthread_mutex_lock(mptr)) != 0) \
{ errno = n; err_sys("pthread_mutex_lock error"); } \
}
#define Pthread_mutex_unlock(mptr) \
{ int n; \
if ( (n = pthread_mutex_unlock(mptr)) != 0) \
{ errno = n; err_sys("pthread_mutex_unlock error"); } \
}
#define Pthread_cond_wait(cptr,mptr) \
{ int n; \
if ( (n = pthread_cond_wait(cptr,mptr)) != 0) \
{ errno = n; err_sys("pthread_cond_wait error"); } \
}
#define Pthread_cond_signal(cptr) \
{ int n; \
if ( (n = pthread_cond_signal(cptr)) != 0) \
{ errno = n; err_sys("pthread_cond_signal error"); } \
}
#define NLOOP 50
#define BUFFSIZE 10
struct buf_t {
int b_buf[BUFFSIZE]; /* the buffer which contains integer items */
int b_nitems; /* #items currently in buffer */
int b_nextget;
int b_nextput;
pthread_mutex_t b_mutex;
pthread_cond_t b_cond_consumer; /* consumer waiting to get */
pthread_cond_t b_cond_producer; /* producer waiting to put */
} buf_t;
void *produce_loop(void *);
void *consume_loop(void *);
int
main(int argc, char **argv)
{
int n;
pthread_t tidA, tidB;
printf("main, addr(stack) = %x, addr(global) = %x, addr(func) = %x\n",
&n, &buf_t, &produce_loop);
if ( (n = pthread_create(&tidA, NULL, &produce_loop, NULL)) != 0)
errno = n, err_sys("pthread_create error for A");
if ( (n = pthread_create(&tidB, NULL, &consume_loop, NULL)) != 0)
errno = n, err_sys("pthread_create error for B");
/* wait for both threads to terminate */
if ( (n = pthread_join(tidA, NULL)) != 0)
errno = n, err_sys("pthread_join error for A");
if ( (n = pthread_join(tidB, NULL)) != 0)
errno = n, err_sys("pthread_join error for B");
exit(0);
}
void
produce(struct buf_t *bptr, int val)
{
Pthread_mutex_lock(&bptr->b_mutex);
/* Wait if buffer is full */
while (bptr->b_nitems >= BUFFSIZE)
Pthread_cond_wait(&bptr->b_cond_producer, &bptr->b_mutex);
/* There is room, store the new value */
printf("produce %d\n", val);
bptr->b_buf[bptr->b_nextput] = val;
if (++bptr->b_nextput >= BUFFSIZE)
bptr->b_nextput = 0;
bptr->b_nitems++;
/* Signal consumer */
Pthread_cond_signal(&bptr->b_cond_consumer);
Pthread_mutex_unlock(&bptr->b_mutex);
}
int
consume(struct buf_t *bptr)
{
int val;
Pthread_mutex_lock(&bptr->b_mutex);
/* Wait if buffer is empty */
while (bptr->b_nitems <= 0)
Pthread_cond_wait(&bptr->b_cond_consumer, &bptr->b_mutex);
/* There is data, fetch the value */
val = bptr->b_buf[bptr->b_nextget];
printf("consume %d\n", val);
if (++bptr->b_nextget >= BUFFSIZE)
bptr->b_nextget = 0;
bptr->b_nitems--;
/* Signal producer; it might be waiting for space to store */
Pthread_cond_signal(&bptr->b_cond_producer);
Pthread_mutex_unlock(&bptr->b_mutex);
return(val);
}
void *
produce_loop(void *vptr)
{
int i;
printf("produce_loop thread, addr(stack) = %x\n", &i);
for (i = 0; i < NLOOP; i++) {
produce(&buf_t, i);
}
return(NULL);
}
void *
consume_loop(void *vptr)
{
int i, val;
printf("consume_loop thread, addr(stack) = %x\n", &i);
for (i = 0; i < NLOOP; i++) {
val = consume(&buf_t);
}
return(NULL);
}
Яковлев С: По мотивам этой главы я написал тестовое приложение , которое выполняет
следующую работу в модели "producer-consumer":
1 Собираем утилиту и запускаем ее с 2-мя параметрами ,
первый параметр - размер линейного списка , второй параметр - число потоков
2 Создаем 2 массива потоков , которые параллельно начинают "насиловать" эту очередь :
добавлять в нее элементы и забирать их из нее .
Запустить можно например так :
prodcons 100000 10
Код главной функции :
int main(int argc, char **argv)
{
int i, nthreads, count[MAXNTHREADS] , count2[MAXNTHREADS];
pthread_t tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];
List = NULL ;
if (argc != 3)
{
printf("usage: prodcons <#items> <#threads>\n");
exit(0);
}
nitems = atoi(argv[1]);
nthreads = atoi(argv[2]);
for (i = 0; i < nthreads; i++)
{
count[i] = 0;
pthread_create(&tid_produce[i], NULL, produce, &count[i]);
count2[i] = 0;
pthread_create(&tid_consume[i], NULL, consume, &count2[i]);
}
for (i = 0; i < nthreads; i++)
{
pthread_join(tid_produce[i], NULL);
pthread_join(tid_consume[i], NULL);
printf("count[%d] = %d count2[%d] = %d\n", i, count[i], i, count2[i]);
}
printf("producer_sum =%d consumer_sum =%d\n" , put.producer_sum , get.consumer_sum); // results in len == 3
printf("producer counter =%d consumer counter =%d\n" , put.count , get.count); // results in len == 3
printf("queue length =%d\n",Length("List",List)); // results in len == 3
exit(0);
}
Исходники prodcons
|