Search     or:     and:
 LINUX 
 Language 
 Kernel 
 Package 
 Book 
 Test 
 OS 
 Forum 
iakovlev.org

Глава 27 : Клиент-сервер

Код данной главы лежит тут

Серверную архитектуру можно разбить на следующие направления :

  
 	1 Последовательные серверы - в текущий момент может обрабатывать одного клиента ,
 		остальные стоят в очереди
 	2 Параллельные серверы на основе fork - традиционное большинство юниксовых серверов
 	3 Параллельные серверы на основе select 
 	4 Параллельные серверы на основе потоков 
 
В этой главе рассматриваются еще 2 разновидности параллельных серверов :
  
 	1 Предварительное создание дочерних процессов - preforking . При запуске сервера создается
 		пул дочерних процессов  
 	2 Предварительное создание дочерних потоков - prethreading . При запуске сервера создается
 		пул дочерних процессов
 
Сравнительные характеристики различных серверных архитектур можно свести в таблице :
Для всех серверов будет использоваться одна версия клиента -
  
 //server/client.c
 
 #define	MAXN	16384		/* max #bytes to request from server */
 
 int
 main(int argc, char **argv)
 {
 	int		i, j, fd, nchildren, nloops, nbytes;
 	pid_t	pid;
 	ssize_t	n;
 	char	request[MAXLINE], reply[MAXN];
 
 	if (argc != 6)
 		err_quit("usage: client < hostname or IPaddr>  <#children> "
 				 "<#loops/child> <#bytes/request>");
 
 	nchildren = atoi(argv[3]);
 	nloops = atoi(argv[4]);
 	nbytes = atoi(argv[5]);
 	snprintf(request, sizeof(request), "%d\n", nbytes); /* newline at end */
 
 	for (i = 0; i < nchildren; i++) {
 		if ( (pid = Fork()) == 0) {		/* child */
 			for (j = 0; j < nloops; j++) {
 				fd = Tcp_connect(argv[1], argv[2]);
 
 				Write(fd, request, strlen(request));
 
 				if ( (n = Readn(fd, reply, nbytes)) != nbytes)
 					err_quit("server returned %d bytes", n);
 
 				Close(fd);		/* TIME_WAIT on client, not server */
 			}
 			printf("child %d done\n", i);
 			exit(0);
 		}
 		/* parent loops around to fork() again */
 	}
 
 	while (wait(NULL) > 0)	/* now parent waits for all children */
 		;
 	if (errno != ECHILD)
 		err_sys("wait error");
 
 	exit(0);
 }
  	
 
Каждый раз при запуске клиента мы задаем ip , порт сервера , число дочерних процессов , порождаемых с помощью fork , число запросов , которое каждый дочерний процесс будет посылать серверу , и количество байт , отсылаемых сервером. Родительский процесс клиента форкает дочерние процессы , и каждый из них порождает коннект с сервером . Потом клиент закрывает каждое соединение , при этом TIME_WAIT имеет место на его стороне. Формат запуска клиента из командной строки :
  
 # client < hostname or IPaddr> < port> < #children> < #loops/child> < #bytes/request>
 
Рассмотрим параллельный сервер с одним дочерним процессом для каждого клиента .
  
 //server/serv01.c 
 
 int
 main(int argc, char **argv)
 {
 	int					listenfd, connfd;
 	pid_t				childpid;
 	void				sig_chld(int), sig_int(int), web_child(int);
 	socklen_t			clilen, addrlen;
 	struct sockaddr		*cliaddr;
 
 	if (argc == 2)
 		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
 	else if (argc == 3)
 		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
 	else
 		err_quit("usage: serv01 [ < host> ] < port#>");
 	cliaddr = Malloc(addrlen);
 
 	Signal(SIGCHLD, sig_chld);
 	Signal(SIGINT, sig_int);
 
 	for ( ; ; ) {
 		clilen = addrlen;
 		if ( (connfd = accept(listenfd, cliaddr, &clilen)) < 0) {
 			if (errno == EINTR)
 				continue;		/* back to for() */
 			else
 				err_sys("accept error");
 		}
 
 		if ( (childpid = Fork()) == 0) {	/* child process */
 			Close(listenfd);	/* close listening socket */
 			web_child(connfd);	/* process the request */
 			exit(0);
 		}
 		Close(connfd);			/* parent closes connected socket */
 	}
 }
 /* end serv01 */
 
 /* include sigint */
 void
 sig_int(int signo)
 {
 	void	pr_cpu_time(void);
 
 	pr_cpu_time();
 	exit(0);
 }
 /* end sigint */
 
 

Рассмотрим вариант preforking-сервера , который работает для систем 4.4BSD . Сервер генерирует сразу при старте необходимое количество процессов. Если в какой-то момент их число будет исчерпано , сервер будет продолжать обслуживать клиентов , но процесс замедлится . Сервер должен постоянно проверять количество свободных дочерних процессов , и когда их число становится ниже допустимой нормы , генерировать их опять . С другой стороны , если их слишком много , сервер должен их прибивать . Рассмотрим первую версию такого сервера .

  
 //server/serv02.c
 
 static int		nchildren;
 static pid_t	*pids;
 
 int
 main(int argc, char **argv)
 {
 	int			listenfd, i;
 	socklen_t	addrlen;
 	void		sig_int(int);
 	pid_t		child_make(int, int, int);
 
 	if (argc == 3)
 		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
 	else if (argc == 4)
 		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
 	else
 		err_quit("usage: serv02 [ < host> ] < port#> <#children>");
 	nchildren = atoi(argv[argc-1]);
 	pids = Calloc(nchildren, sizeof(pid_t));
 
 	for (i = 0; i < nchildren; i++)
 		pids[i] = child_make(i, listenfd, addrlen);	/* parent returns */
 
 	Signal(SIGINT, sig_int);
 
 	for ( ; ; )
 		pause();	/* everything done by children */
 }
 /* end serv02 */
 
 /* include sigint */
 void
 sig_int(int signo)
 {
 	int		i;
 	void	pr_cpu_time(void);
 
 		/* 4terminate all children */
 	for (i = 0; i < nchildren; i++)
 		kill(pids[i], SIGTERM);
 	while (wait(NULL) > 0)		/* wait for all children */
 		;
 	if (errno != ECHILD)
 		err_sys("wait error");
 
 	pr_cpu_time();
 	exit(0);
 }
 /* end sigint */
 
 
Дополнительный аргумент в командной строке указывает , сколько нужно задать дочерних процессов . Выделяется массив , в который записываются из pid-ы. Дочерний процесс генерится с помощью child_make :
  
 //server/child02.c
 
 pid_t
 child_make(int i, int listenfd, int addrlen)
 {
 	pid_t	pid;
 	void	child_main(int, int, int);
 
 	if ( (pid = Fork()) > 0)
 		return(pid);		/* parent */
 
 	child_main(i, listenfd, addrlen);	/* never returns */
 }
 /* end child_make */
 
 /* include child_main */
 void
 child_main(int i, int listenfd, int addrlen)
 {
 	int				connfd;
 	void			web_child(int);
 	socklen_t		clilen;
 	struct sockaddr	*cliaddr;
 
 	cliaddr = Malloc(addrlen);
 
 	printf("child %ld starting\n", (long) getpid());
 	for ( ; ; ) {
 		clilen = addrlen;
 		connfd = Accept(listenfd, cliaddr, &clilen);
 
 		web_child(connfd);		/* process the request */
 		Close(connfd);
 	}
 }
 /* end child_main */
 
 
В следующей версии сервера будет реализована защита вызова функции accept при помощи блокировки на основе функции fcntl . В главной функции main будет добавлена функция my_lock_init перед началом цикла , в котором будут создаваться дочерние процессы . В функции child_main появляется блокировка перед вызовом функции accept и снятие блокировки после ее завершения .
  
 		my_lock_wait();
 		connfd = Accept(listenfd, cliaddr, &clilen);
 		my_lock_release();
 
Функция my_lock_init :
  
 void
 my_lock_init(char *pathname)
 {
     char	lock_file[1024];
 
 		/* 4must copy caller's string, in case it's a constant */
     strncpy(lock_file, pathname, sizeof(lock_file));
     Mktemp(lock_file);
 
     lock_fd = Open(lock_file, O_CREAT | O_WRONLY, FILE_MODE);
     Unlink(lock_file);			/* but lock_fd remains open */
 
 	lock_it.l_type = F_WRLCK;
 	lock_it.l_whence = SEEK_SET;
 	lock_it.l_start = 0;
 	lock_it.l_len = 0;
 
 	unlock_it.l_type = F_UNLCK;
 	unlock_it.l_whence = SEEK_SET;
 	unlock_it.l_start = 0;
 	unlock_it.l_len = 0;
 }
 /* end my_lock_init */
 
 
Создается временный файл , который хранится до тех пор , пока есть ссылки. Инициализируются 2 структуры flock : одна для блокировки файла , другая для снятия . Функции , устанавливающие и снимающие блокировку на файл :
  
 /* include my_lock_wait */
 void
 my_lock_wait()
 {
     int		rc;
     
     while ( (rc = fcntl(lock_fd, F_SETLKW, &lock_it)) < 0) {
 		if (errno == EINTR)
 			continue;
     	else
 			err_sys("fcntl error for my_lock_wait");
 	}
 }
 
 void
 my_lock_release()
 {
     if (fcntl(lock_fd, F_SETLKW, &unlock_it) < 0)
 		err_sys("fcntl error for my_lock_release");
 }
 /* end my_lock_wait */
 
Модернизируем последнюю версию сервера и заменим блокировку на основе файловых операций блокировкой с помощью мьютексов . Функция my_lock_init теперь будет выглядеть так :
  
 void
 my_lock_init(char *pathname)
 {
 	int		fd;
 	pthread_mutexattr_t	mattr;
 
 	fd = Open("/dev/zero", O_RDWR, 0);
 
 	mptr = Mmap(0, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE,
 				MAP_SHARED, fd, 0);
 	Close(fd);
 
 	Pthread_mutexattr_init(&mattr);
 	Pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
 	Pthread_mutex_init(mptr, &mattr);
 }
 /* end my_lock_init */
 
 /* include my_lock_wait */
 void
 my_lock_wait()
 {
 	Pthread_mutex_lock(mptr);
 }
 
 void
 my_lock_release()
 {
 	Pthread_mutex_unlock(mptr);
 }
 /* end my_lock_wait */
 	
 
Последней версией сервера с предварительным порождением процессов будет версия , в которой accept вызывается только родителем , и который передает присоединенный сокет потомку . Создадим структуру , содержащую информацию о дочернем процессе :
  
 typedef struct {
   pid_t		child_pid;		/* process ID */
   int		child_pipefd;	/* parent's stream pipe to/from child */
   int		child_status;	/* 0 = ready */
   long		child_count;	/* #connections handled */
 } Child;
 
Функция child_make :
  
 pid_t
 child_make(int i, int listenfd, int addrlen)
 {
 	int		sockfd[2];
 	pid_t	pid;
 	void	child_main(int, int, int);
 
 	Socketpair(AF_LOCAL, SOCK_STREAM, 0, sockfd);
 
 	if ( (pid = Fork()) > 0) {
 		Close(sockfd[1]);
 		cptr[i].child_pid = pid;
 		cptr[i].child_pipefd = sockfd[0];
 		cptr[i].child_status = 0;
 		return(pid);		/* parent */
 	}
 
 	Dup2(sockfd[1], STDERR_FILENO);		/* child's stream pipe to parent */
 	Close(sockfd[0]);
 	Close(sockfd[1]);
 	Close(listenfd);					/* child does not need this open */
 	child_main(i, listenfd, addrlen);	/* never returns */
 }
 
Функция main :
  
 //server/serv05.c
 
 static int		nchildren;
 
 int
 main(int argc, char **argv)
 {
 	int			listenfd, i, navail, maxfd, nsel, connfd, rc;
 	void		sig_int(int);
 	pid_t		child_make(int, int, int);
 	ssize_t		n;
 	fd_set		rset, masterset;
 	socklen_t	addrlen, clilen;
 	struct sockaddr	*cliaddr;
 
 	if (argc == 3)
 		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
 	else if (argc == 4)
 		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
 	else
 		err_quit("usage: serv05 [  ]  <#children>");
 
 	FD_ZERO(&masterset);
 	FD_SET(listenfd, &masterset);
 	maxfd = listenfd;
 	cliaddr = Malloc(addrlen);
 
 	nchildren = atoi(argv[argc-1]);
 	navail = nchildren;
 	cptr = Calloc(nchildren, sizeof(Child));
 
 		/* 4prefork all the children */
 	for (i = 0; i < nchildren; i++) {
 		child_make(i, listenfd, addrlen);	/* parent returns */
 		FD_SET(cptr[i].child_pipefd, &masterset);
 		maxfd = max(maxfd, cptr[i].child_pipefd);
 	}
 
 	Signal(SIGINT, sig_int);
 
 	for ( ; ; ) {
 		rset = masterset;
 		if (navail <= 0)
 			FD_CLR(listenfd, &rset);	/* turn off if no available children */
 		nsel = Select(maxfd, &rset, NULL, NULL, NULL);
 
 			/* 4check for new connections */
 		if (FD_ISSET(listenfd, &rset)) {
 			clilen = addrlen;
 			connfd = Accept(listenfd, cliaddr, &clilen);
 
 			for (i = 0; i < nchildren; i++)
 				if (cptr[i].child_status == 0)
 					break;				/* available */
 
 			if (i == nchildren)
 				err_quit("no available children");
 			cptr[i].child_status = 1;	/* mark child as busy */
 			cptr[i].child_count++;
 			navail--;
 
 			n = Write_fd(cptr[i].child_pipefd, "", 1, connfd);
 			Close(connfd);
 			if (--nsel == 0)
 				continue;	/* all done with select() results */
 		}
 
 			/* 4find any newly-available children */
 		for (i = 0; i < nchildren; i++) {
 			if (FD_ISSET(cptr[i].child_pipefd, &rset)) {
 				if ( (n = Read(cptr[i].child_pipefd, &rc, 1)) == 0)
 					err_quit("child %d terminated unexpectedly", i);
 				cptr[i].child_status = 0;
 				navail++;
 				if (--nsel == 0)
 					break;	/* all done with select() results */
 			}
 		}
 	}
 }
 
Теперь перейдем к серверу на основе потоков : в ней содержится один поток для каждого клиента .
  
 //server/serv06.c
 
 int
 main(int argc, char **argv)
 {
 	int				listenfd, connfd;
 	void			sig_int(int);
 	void			*doit(void *);
 	pthread_t		tid;
 	socklen_t		clilen, addrlen;
 	struct sockaddr	*cliaddr;
 
 	if (argc == 2)
 		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
 	else if (argc == 3)
 		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
 	else
 		err_quit("usage: serv06 [ < host> ] < port#>");
 	cliaddr = Malloc(addrlen);
 
 	Signal(SIGINT, sig_int);
 
 	for ( ; ; ) {
 		clilen = addrlen;
 		connfd = Accept(listenfd, cliaddr, &clilen);
 
 		Pthread_create(&tid, NULL, &doit, (void *) connfd);
 	}
 }
 
 
Основной поток блокируется в вызове функции accept , и каждый раз , когда приходит новое клиентское соединение , pthread_create создает новый поток с функцией doit . Потоки неприсоединенные .

В следующей версии мы напишем сервер , в котором каждый поток будет вызывать accept . Структура , содержащая информацию о потоке :

  
 //server/pthread07.h 
 
 typedef struct {
   pthread_t		thread_tid;		/* thread ID */
   long			thread_count;	/* #connections handled */
 } Thread;
 Thread	*tptr;		/* array of Thread structures; calloc'ed */
 
Функция main :
  
 //server/serv07.c
 
 pthread_mutex_t	mlock = PTHREAD_MUTEX_INITIALIZER;
 
 int
 main(int argc, char **argv)
 {
 	int		i;
 	void	sig_int(int), thread_make(int);
 
 	if (argc == 3)
 		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
 	else if (argc == 4)
 		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
 	else
 		err_quit("usage: serv07 [ < host> ] < port#> <#threads>");
 	nthreads = atoi(argv[argc-1]);
 	tptr = Calloc(nthreads, sizeof(Thread));
 
 	for (i = 0; i < nthreads; i++)
 		thread_make(i);			/* only main thread returns */
 
 	Signal(SIGINT, sig_int);
 
 	for ( ; ; )
 		pause();	/* everything done by threads */
 }
  
 
Фугкция thread_main ;
  
 //server/pthread07.c
 
 void
 thread_make(int i)
 {
 	void	*thread_main(void *);
 
 	Pthread_create(&tptr[i].thread_tid, NULL, &thread_main, (void *) i);
 	return;		/* main thread returns */
 }
 
 void *
 thread_main(void *arg)
 {
 	int				connfd;
 	void			web_child(int);
 	socklen_t		clilen;
 	struct sockaddr	*cliaddr;
 
 	cliaddr = Malloc(addrlen);
 
 	printf("thread %d starting\n", (int) arg);
 	for ( ; ; ) {
 		clilen = addrlen;
     	Pthread_mutex_lock(&mlock);
 		connfd = Accept(listenfd, cliaddr, &clilen);
 		Pthread_mutex_unlock(&mlock);
 		tptr[(int) arg].thread_count++;
 
 		web_child(connfd);		/* process the request */
 		Close(connfd);
 	}
 }
 
 
Ну и наконец последняя версия сервера : главный поток создает пул потоков , после чего он же вызывает accept и передает клиентское соединение одному из свободных потоков . Структура потока :
  
 //server/pthread08.h
 
 typedef struct {
   pthread_t		thread_tid;		/* thread ID */
   long			thread_count;	/* #connections handled */
 } Thread;
 Thread	*tptr;		/* array of Thread structures; calloc'ed */
 
 #define	MAXNCLI	32
 int					clifd[MAXNCLI], iget, iput;
 pthread_mutex_t		clifd_mutex;
 pthread_cond_t		clifd_cond;
 
Функция main :
  
 //server/serv08.c
 
 static int			nthreads;
 pthread_mutex_t		clifd_mutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t		clifd_cond = PTHREAD_COND_INITIALIZER;
 
 int
 main(int argc, char **argv)
 {
 	int			i, listenfd, connfd;
 	void		sig_int(int), thread_make(int);
 	socklen_t	addrlen, clilen;
 	struct sockaddr	*cliaddr;
 
 	if (argc == 3)
 		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
 	else if (argc == 4)
 		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
 	else
 		err_quit("usage: serv08 [  ]  <#threads>");
 	cliaddr = Malloc(addrlen);
 
 	nthreads = atoi(argv[argc-1]);
 	tptr = Calloc(nthreads, sizeof(Thread));
 	iget = iput = 0;
 
 		/* 4create all the threads */
 	for (i = 0; i < nthreads; i++)
 		thread_make(i);		/* only main thread returns */
 
 	Signal(SIGINT, sig_int);
 
 	for ( ; ; ) {
 		clilen = addrlen;
 		connfd = Accept(listenfd, cliaddr, &clilen);
 
 		Pthread_mutex_lock(&clifd_mutex);
 		clifd[iput] = connfd;
 		if (++iput == MAXNCLI)
 			iput = 0;
 		if (iput == iget)
 			err_quit("iput = iget = %d", iput);
 		Pthread_cond_signal(&clifd_cond);
 		Pthread_mutex_unlock(&clifd_mutex);
 	}
 }
 /* end serv08 */
 
 void
 sig_int(int signo)
 {
 	int		i;
 	void	pr_cpu_time(void);
 
 	pr_cpu_time();
 
 	for (i = 0; i < nthreads; i++)
 		printf("thread %d, %ld connections\n", i, tptr[i].thread_count);
 
 	exit(0);
 }
 
Функция thread_make :
  
 void
 thread_make(int i)
 {
 	void	*thread_main(void *);
 
 	Pthread_create(&tptr[i].thread_tid, NULL, &thread_main, (void *) i);
 	return;		/* main thread returns */
 }
 
 void *
 thread_main(void *arg)
 {
 	int		connfd;
 	void	web_child(int);
 
 	printf("thread %d starting\n", (int) arg);
 	for ( ; ; ) {
     	Pthread_mutex_lock(&clifd_mutex);
 		while (iget == iput)
 			Pthread_cond_wait(&clifd_cond, &clifd_mutex);
 		connfd = clifd[iget];	/* connected socket to service */
 		if (++iget == MAXNCLI)
 			iget = 0;
 		Pthread_mutex_unlock(&clifd_mutex);
 		tptr[(int) arg].thread_count++;
 
 		web_child(connfd);		/* process the request */
 		Close(connfd);
 	}
 }
 

Оставьте свой комментарий !

Ваше имя:
Комментарий:
Оба поля являются обязательными

 Автор  Комментарий к данной статье