Глава 27 : Клиент-сервер
Код данной главы лежит тут
Серверную архитектуру можно разбить на следующие направления :
1 Последовательные серверы - в текущий момент может обрабатывать одного клиента ,
остальные стоят в очереди
2 Параллельные серверы на основе fork - традиционное большинство юниксовых серверов
3 Параллельные серверы на основе select
4 Параллельные серверы на основе потоков
В этой главе рассматриваются еще 2 разновидности параллельных серверов :
1 Предварительное создание дочерних процессов - preforking . При запуске сервера создается
пул дочерних процессов
2 Предварительное создание дочерних потоков - prethreading . При запуске сервера создается
пул дочерних процессов
Сравнительные характеристики различных серверных архитектур можно свести в таблице :
|
Для всех серверов будет использоваться одна версия клиента -
//server/client.c
#define MAXN 16384 /* max #bytes to request from server */
int
main(int argc, char **argv)
{
int i, j, fd, nchildren, nloops, nbytes;
pid_t pid;
ssize_t n;
char request[MAXLINE], reply[MAXN];
if (argc != 6)
err_quit("usage: client < hostname or IPaddr> <#children> "
"<#loops/child> <#bytes/request>");
nchildren = atoi(argv[3]);
nloops = atoi(argv[4]);
nbytes = atoi(argv[5]);
snprintf(request, sizeof(request), "%d\n", nbytes); /* newline at end */
for (i = 0; i < nchildren; i++) {
if ( (pid = Fork()) == 0) { /* child */
for (j = 0; j < nloops; j++) {
fd = Tcp_connect(argv[1], argv[2]);
Write(fd, request, strlen(request));
if ( (n = Readn(fd, reply, nbytes)) != nbytes)
err_quit("server returned %d bytes", n);
Close(fd); /* TIME_WAIT on client, not server */
}
printf("child %d done\n", i);
exit(0);
}
/* parent loops around to fork() again */
}
while (wait(NULL) > 0) /* now parent waits for all children */
;
if (errno != ECHILD)
err_sys("wait error");
exit(0);
}
Каждый раз при запуске клиента мы задаем ip , порт сервера , число дочерних процессов ,
порождаемых с помощью fork , число запросов , которое каждый дочерний процесс будет посылать серверу ,
и количество байт , отсылаемых сервером. Родительский процесс клиента форкает дочерние процессы ,
и каждый из них порождает коннект с сервером .
Потом клиент закрывает каждое соединение , при этом TIME_WAIT имеет место на его стороне.
Формат запуска клиента из командной строки :
# client < hostname or IPaddr> < port> < #children> < #loops/child> < #bytes/request>
Рассмотрим параллельный сервер с одним дочерним процессом для каждого клиента .
//server/serv01.c
int
main(int argc, char **argv)
{
int listenfd, connfd;
pid_t childpid;
void sig_chld(int), sig_int(int), web_child(int);
socklen_t clilen, addrlen;
struct sockaddr *cliaddr;
if (argc == 2)
listenfd = Tcp_listen(NULL, argv[1], &addrlen);
else if (argc == 3)
listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
else
err_quit("usage: serv01 [ < host> ] < port#>");
cliaddr = Malloc(addrlen);
Signal(SIGCHLD, sig_chld);
Signal(SIGINT, sig_int);
for ( ; ; ) {
clilen = addrlen;
if ( (connfd = accept(listenfd, cliaddr, &clilen)) < 0) {
if (errno == EINTR)
continue; /* back to for() */
else
err_sys("accept error");
}
if ( (childpid = Fork()) == 0) { /* child process */
Close(listenfd); /* close listening socket */
web_child(connfd); /* process the request */
exit(0);
}
Close(connfd); /* parent closes connected socket */
}
}
/* end serv01 */
/* include sigint */
void
sig_int(int signo)
{
void pr_cpu_time(void);
pr_cpu_time();
exit(0);
}
/* end sigint */
Рассмотрим вариант preforking-сервера , который работает для систем 4.4BSD .
Сервер генерирует сразу при старте необходимое количество процессов.
Если в какой-то момент их число будет исчерпано , сервер будет продолжать обслуживать клиентов ,
но процесс замедлится .
Сервер должен постоянно проверять количество свободных дочерних процессов , и когда их число становится
ниже допустимой нормы , генерировать их опять .
С другой стороны , если их слишком много , сервер должен их прибивать .
Рассмотрим первую версию такого сервера .
//server/serv02.c
static int nchildren;
static pid_t *pids;
int
main(int argc, char **argv)
{
int listenfd, i;
socklen_t addrlen;
void sig_int(int);
pid_t child_make(int, int, int);
if (argc == 3)
listenfd = Tcp_listen(NULL, argv[1], &addrlen);
else if (argc == 4)
listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
else
err_quit("usage: serv02 [ < host> ] < port#> <#children>");
nchildren = atoi(argv[argc-1]);
pids = Calloc(nchildren, sizeof(pid_t));
for (i = 0; i < nchildren; i++)
pids[i] = child_make(i, listenfd, addrlen); /* parent returns */
Signal(SIGINT, sig_int);
for ( ; ; )
pause(); /* everything done by children */
}
/* end serv02 */
/* include sigint */
void
sig_int(int signo)
{
int i;
void pr_cpu_time(void);
/* 4terminate all children */
for (i = 0; i < nchildren; i++)
kill(pids[i], SIGTERM);
while (wait(NULL) > 0) /* wait for all children */
;
if (errno != ECHILD)
err_sys("wait error");
pr_cpu_time();
exit(0);
}
/* end sigint */
Дополнительный аргумент в командной строке указывает , сколько нужно задать
дочерних процессов . Выделяется массив , в который записываются из pid-ы.
Дочерний процесс генерится с помощью child_make :
//server/child02.c
pid_t
child_make(int i, int listenfd, int addrlen)
{
pid_t pid;
void child_main(int, int, int);
if ( (pid = Fork()) > 0)
return(pid); /* parent */
child_main(i, listenfd, addrlen); /* never returns */
}
/* end child_make */
/* include child_main */
void
child_main(int i, int listenfd, int addrlen)
{
int connfd;
void web_child(int);
socklen_t clilen;
struct sockaddr *cliaddr;
cliaddr = Malloc(addrlen);
printf("child %ld starting\n", (long) getpid());
for ( ; ; ) {
clilen = addrlen;
connfd = Accept(listenfd, cliaddr, &clilen);
web_child(connfd); /* process the request */
Close(connfd);
}
}
/* end child_main */
В следующей версии сервера будет реализована защита вызова функции accept
при помощи блокировки на основе функции fcntl .
В главной функции main будет добавлена функция my_lock_init перед началом цикла ,
в котором будут создаваться дочерние процессы .
В функции child_main появляется блокировка перед вызовом функции accept и снятие блокировки
после ее завершения .
my_lock_wait();
connfd = Accept(listenfd, cliaddr, &clilen);
my_lock_release();
Функция my_lock_init :
void
my_lock_init(char *pathname)
{
char lock_file[1024];
/* 4must copy caller's string, in case it's a constant */
strncpy(lock_file, pathname, sizeof(lock_file));
Mktemp(lock_file);
lock_fd = Open(lock_file, O_CREAT | O_WRONLY, FILE_MODE);
Unlink(lock_file); /* but lock_fd remains open */
lock_it.l_type = F_WRLCK;
lock_it.l_whence = SEEK_SET;
lock_it.l_start = 0;
lock_it.l_len = 0;
unlock_it.l_type = F_UNLCK;
unlock_it.l_whence = SEEK_SET;
unlock_it.l_start = 0;
unlock_it.l_len = 0;
}
/* end my_lock_init */
Создается временный файл , который хранится до тех пор , пока есть ссылки.
Инициализируются 2 структуры flock : одна для блокировки файла , другая для снятия .
Функции , устанавливающие и снимающие блокировку на файл :
/* include my_lock_wait */
void
my_lock_wait()
{
int rc;
while ( (rc = fcntl(lock_fd, F_SETLKW, &lock_it)) < 0) {
if (errno == EINTR)
continue;
else
err_sys("fcntl error for my_lock_wait");
}
}
void
my_lock_release()
{
if (fcntl(lock_fd, F_SETLKW, &unlock_it) < 0)
err_sys("fcntl error for my_lock_release");
}
/* end my_lock_wait */
Модернизируем последнюю версию сервера и заменим блокировку на основе файловых операций
блокировкой с помощью мьютексов .
Функция my_lock_init теперь будет выглядеть так :
void
my_lock_init(char *pathname)
{
int fd;
pthread_mutexattr_t mattr;
fd = Open("/dev/zero", O_RDWR, 0);
mptr = Mmap(0, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
Close(fd);
Pthread_mutexattr_init(&mattr);
Pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
Pthread_mutex_init(mptr, &mattr);
}
/* end my_lock_init */
/* include my_lock_wait */
void
my_lock_wait()
{
Pthread_mutex_lock(mptr);
}
void
my_lock_release()
{
Pthread_mutex_unlock(mptr);
}
/* end my_lock_wait */
Последней версией сервера с предварительным порождением процессов будет версия ,
в которой accept вызывается только родителем , и который передает присоединенный сокет потомку .
Создадим структуру , содержащую информацию о дочернем процессе :
typedef struct {
pid_t child_pid; /* process ID */
int child_pipefd; /* parent's stream pipe to/from child */
int child_status; /* 0 = ready */
long child_count; /* #connections handled */
} Child;
Функция child_make :
pid_t
child_make(int i, int listenfd, int addrlen)
{
int sockfd[2];
pid_t pid;
void child_main(int, int, int);
Socketpair(AF_LOCAL, SOCK_STREAM, 0, sockfd);
if ( (pid = Fork()) > 0) {
Close(sockfd[1]);
cptr[i].child_pid = pid;
cptr[i].child_pipefd = sockfd[0];
cptr[i].child_status = 0;
return(pid); /* parent */
}
Dup2(sockfd[1], STDERR_FILENO); /* child's stream pipe to parent */
Close(sockfd[0]);
Close(sockfd[1]);
Close(listenfd); /* child does not need this open */
child_main(i, listenfd, addrlen); /* never returns */
}
Функция main :
//server/serv05.c
static int nchildren;
int
main(int argc, char **argv)
{
int listenfd, i, navail, maxfd, nsel, connfd, rc;
void sig_int(int);
pid_t child_make(int, int, int);
ssize_t n;
fd_set rset, masterset;
socklen_t addrlen, clilen;
struct sockaddr *cliaddr;
if (argc == 3)
listenfd = Tcp_listen(NULL, argv[1], &addrlen);
else if (argc == 4)
listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
else
err_quit("usage: serv05 [ ] <#children>");
FD_ZERO(&masterset);
FD_SET(listenfd, &masterset);
maxfd = listenfd;
cliaddr = Malloc(addrlen);
nchildren = atoi(argv[argc-1]);
navail = nchildren;
cptr = Calloc(nchildren, sizeof(Child));
/* 4prefork all the children */
for (i = 0; i < nchildren; i++) {
child_make(i, listenfd, addrlen); /* parent returns */
FD_SET(cptr[i].child_pipefd, &masterset);
maxfd = max(maxfd, cptr[i].child_pipefd);
}
Signal(SIGINT, sig_int);
for ( ; ; ) {
rset = masterset;
if (navail <= 0)
FD_CLR(listenfd, &rset); /* turn off if no available children */
nsel = Select(maxfd, &rset, NULL, NULL, NULL);
/* 4check for new connections */
if (FD_ISSET(listenfd, &rset)) {
clilen = addrlen;
connfd = Accept(listenfd, cliaddr, &clilen);
for (i = 0; i < nchildren; i++)
if (cptr[i].child_status == 0)
break; /* available */
if (i == nchildren)
err_quit("no available children");
cptr[i].child_status = 1; /* mark child as busy */
cptr[i].child_count++;
navail--;
n = Write_fd(cptr[i].child_pipefd, "", 1, connfd);
Close(connfd);
if (--nsel == 0)
continue; /* all done with select() results */
}
/* 4find any newly-available children */
for (i = 0; i < nchildren; i++) {
if (FD_ISSET(cptr[i].child_pipefd, &rset)) {
if ( (n = Read(cptr[i].child_pipefd, &rc, 1)) == 0)
err_quit("child %d terminated unexpectedly", i);
cptr[i].child_status = 0;
navail++;
if (--nsel == 0)
break; /* all done with select() results */
}
}
}
}
Теперь перейдем к серверу на основе потоков : в ней содержится один поток для каждого клиента .
//server/serv06.c
int
main(int argc, char **argv)
{
int listenfd, connfd;
void sig_int(int);
void *doit(void *);
pthread_t tid;
socklen_t clilen, addrlen;
struct sockaddr *cliaddr;
if (argc == 2)
listenfd = Tcp_listen(NULL, argv[1], &addrlen);
else if (argc == 3)
listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
else
err_quit("usage: serv06 [ < host> ] < port#>");
cliaddr = Malloc(addrlen);
Signal(SIGINT, sig_int);
for ( ; ; ) {
clilen = addrlen;
connfd = Accept(listenfd, cliaddr, &clilen);
Pthread_create(&tid, NULL, &doit, (void *) connfd);
}
}
Основной поток блокируется в вызове функции accept , и каждый раз , когда приходит
новое клиентское соединение , pthread_create создает новый поток с функцией doit .
Потоки неприсоединенные .
В следующей версии мы напишем сервер , в котором каждый поток будет вызывать accept .
Структура , содержащая информацию о потоке :
//server/pthread07.h
typedef struct {
pthread_t thread_tid; /* thread ID */
long thread_count; /* #connections handled */
} Thread;
Thread *tptr; /* array of Thread structures; calloc'ed */
Функция main :
//server/serv07.c
pthread_mutex_t mlock = PTHREAD_MUTEX_INITIALIZER;
int
main(int argc, char **argv)
{
int i;
void sig_int(int), thread_make(int);
if (argc == 3)
listenfd = Tcp_listen(NULL, argv[1], &addrlen);
else if (argc == 4)
listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
else
err_quit("usage: serv07 [ < host> ] < port#> <#threads>");
nthreads = atoi(argv[argc-1]);
tptr = Calloc(nthreads, sizeof(Thread));
for (i = 0; i < nthreads; i++)
thread_make(i); /* only main thread returns */
Signal(SIGINT, sig_int);
for ( ; ; )
pause(); /* everything done by threads */
}
Фугкция thread_main ;
//server/pthread07.c
void
thread_make(int i)
{
void *thread_main(void *);
Pthread_create(&tptr[i].thread_tid, NULL, &thread_main, (void *) i);
return; /* main thread returns */
}
void *
thread_main(void *arg)
{
int connfd;
void web_child(int);
socklen_t clilen;
struct sockaddr *cliaddr;
cliaddr = Malloc(addrlen);
printf("thread %d starting\n", (int) arg);
for ( ; ; ) {
clilen = addrlen;
Pthread_mutex_lock(&mlock);
connfd = Accept(listenfd, cliaddr, &clilen);
Pthread_mutex_unlock(&mlock);
tptr[(int) arg].thread_count++;
web_child(connfd); /* process the request */
Close(connfd);
}
}
Ну и наконец последняя версия сервера : главный поток создает пул потоков ,
после чего он же вызывает accept и передает клиентское соединение одному
из свободных потоков .
Структура потока :
//server/pthread08.h
typedef struct {
pthread_t thread_tid; /* thread ID */
long thread_count; /* #connections handled */
} Thread;
Thread *tptr; /* array of Thread structures; calloc'ed */
#define MAXNCLI 32
int clifd[MAXNCLI], iget, iput;
pthread_mutex_t clifd_mutex;
pthread_cond_t clifd_cond;
Функция main :
//server/serv08.c
static int nthreads;
pthread_mutex_t clifd_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t clifd_cond = PTHREAD_COND_INITIALIZER;
int
main(int argc, char **argv)
{
int i, listenfd, connfd;
void sig_int(int), thread_make(int);
socklen_t addrlen, clilen;
struct sockaddr *cliaddr;
if (argc == 3)
listenfd = Tcp_listen(NULL, argv[1], &addrlen);
else if (argc == 4)
listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
else
err_quit("usage: serv08 [ ] <#threads>");
cliaddr = Malloc(addrlen);
nthreads = atoi(argv[argc-1]);
tptr = Calloc(nthreads, sizeof(Thread));
iget = iput = 0;
/* 4create all the threads */
for (i = 0; i < nthreads; i++)
thread_make(i); /* only main thread returns */
Signal(SIGINT, sig_int);
for ( ; ; ) {
clilen = addrlen;
connfd = Accept(listenfd, cliaddr, &clilen);
Pthread_mutex_lock(&clifd_mutex);
clifd[iput] = connfd;
if (++iput == MAXNCLI)
iput = 0;
if (iput == iget)
err_quit("iput = iget = %d", iput);
Pthread_cond_signal(&clifd_cond);
Pthread_mutex_unlock(&clifd_mutex);
}
}
/* end serv08 */
void
sig_int(int signo)
{
int i;
void pr_cpu_time(void);
pr_cpu_time();
for (i = 0; i < nthreads; i++)
printf("thread %d, %ld connections\n", i, tptr[i].thread_count);
exit(0);
}
Функция thread_make :
void
thread_make(int i)
{
void *thread_main(void *);
Pthread_create(&tptr[i].thread_tid, NULL, &thread_main, (void *) i);
return; /* main thread returns */
}
void *
thread_main(void *arg)
{
int connfd;
void web_child(int);
printf("thread %d starting\n", (int) arg);
for ( ; ; ) {
Pthread_mutex_lock(&clifd_mutex);
while (iget == iput)
Pthread_cond_wait(&clifd_cond, &clifd_mutex);
connfd = clifd[iget]; /* connected socket to service */
if (++iget == MAXNCLI)
iget = 0;
Pthread_mutex_unlock(&clifd_mutex);
tptr[(int) arg].thread_count++;
web_child(connfd); /* process the request */
Close(connfd);
}
}
|