Search     or:     and:
 LINUX 
 Language 
 Kernel 
 Package 
 Book 
 Test 
 OS 
 Forum 
 iakovlev.org 
 Books
  Краткое описание
 Linux
 W. R. Стивенс TCP 
 W. R. Стивенс IPC 
 A.Rubini-J.Corbet 
 K. Bauer 
 Gary V. Vaughan 
 Д Вилер 
 В. Сталлинг 
 Pramode C.E. 
 Steve Pate 
 William Gropp 
 K.A.Robbins 
 С Бекман 
 Р Стивенс 
 Ethereal 
 Cluster 
 Languages
 C
 Perl
 M.Pilgrim 
 А.Фролов 
 Mendel Cooper 
 М Перри 
 Kernel
 C.S. Rodriguez 
 Robert Love 
 Daniel Bovet 
 Д Джеф 
 Максвелл 
 G. Kroah-Hartman 
 B. Hansen 
NEWS
Последние статьи :
  Тренажёр 16.01   
  Эльбрус 05.12   
  Алгоритмы 12.04   
  Rust 07.11   
  Go 25.12   
  EXT4 10.11   
  FS benchmark 15.09   
  Сетунь 23.07   
  Trees 25.06   
  Apache 03.02   
 
TOP 20
 Linux Kernel 2.6...5169 
 Trees...938 
 Максвелл 3...870 
 Go Web ...821 
 William Gropp...802 
 Ethreal 3...786 
 Gary V.Vaughan-> Libtool...772 
 Ethreal 4...770 
 Rodriguez 6...763 
 Ext4 FS...754 
 Steve Pate 1...754 
 Clickhouse...753 
 Ethreal 1...741 
 Secure Programming for Li...731 
 C++ Patterns 3...716 
 Ulrich Drepper...696 
 Assembler...694 
 DevFS...660 
 Стивенс 9...649 
 MySQL & PosgreSQL...630 
 
  01.01.2024 : 3621733 посещений 

iakovlev.org

Глава 6 : Очереди сообщений System V

Исходники для этой страницы лежат тут

Очередь сообщений можно рассматривать как связный список сообщений . Сообщения помещаются в очередь и извлекаются из нее. Каждое сообщение есть запись , и каждому сообщению присваивается приоритет. Отличие очереди от фифо в том , что для фифо нельзя произвести запись в канал до тех пор , пока не появится считывающий данные процесс. Для добавления в очередь нет необходимости в ожидающем процессе . Отличие очереди от каналов также в том , что данные в очереди сохраняются и после завершения работы процесса , который создал эту очередь , в то время как в каналах это невозможно .

Ядро хранит очереди в обьектах типа msgid_ds :
Пример очереди из 3-х сообщений :
Создать новую очередь или получить доступ к уже существующей можно с помощью

       
 	int msgget(key_t key , int flag)
 
Функция возвращает положительный идентификатор в случае успеха. Ключ нужно получить с помощью ftok. Флаг представляет из себя комбинацию флагов на чтение-запись.

При создании новой очереди инициализируются поля :

       
 	msg_perm.uid  - пользователь
 	msg_perm.cuid - группа
 	msg_perm.mode - флаги
 	msg_qnum = msg_lspid = msg_lrpid = msg_time = 0 
 	msg_ctime - текущее время
 	msg_qbytes - системное ограничение на размер очереди
 
Помещать сообщения в очередь можно с помощью
       
 	int msgsnd(int msgid , const void *ptr , size_t len , int flag)
 	
 	Возвращает 0 в случае успеха
 
Указатель ptr указывает на тип сообщения :
       
 	struct msgbuf
 	{
 		long mtype;
 		char mtext[]
 	}
 
Можно указывать свои собственные произвольные типы сообщений - например такое , которое будет состоять из 2-х полей - числа и текста :
       
 	struct my_msgbuf
 	{
 		long mtype;
 		int m1;
 		char mtext[1024];
 	}
 
Аргумент flag в функции msgsnd может быть 0 либо IPC_NOWAIT. Последний означает , что если в очереди нет свободного места , ядро не будет ждать , когда оно появится , и блокировки не будет . В этом случае msgsnd вернет ошибку .

Сообщение может быть считано с помощью функции

       
 	ssize_t msgrcv(int msgid , const void *ptr , size_t len , long type , int flag)
 
ptr указывает на приемник данных.

Управлять очередями сообщений можно с помощью

       
 	int msgctl(int msgid , int cmd , struct msgid_ds *buf)
 
 	Возвращает 0 в случае успеха
 
Аргумент cmd может принимать 3 значения :
       
 	IPC_RMID - удаление очереди с идентификатором msgid
 	IPC_SET  - модификация пермишинов очереди
 	IPC_STAT - возвращает структуру очереди
 
Напишем программу , которая создаст очередь , поместит в нее одно-байтовое сообщение , вызовет функцию msgstat , выполнит команду ipcs , а затем удалит очередь .

       
 //svmsg/ctl.c
 
 int main(int argc, char **argv)
 {
 	int				msqid;
 	struct msqid_ds	info;
 	struct msgbuf buf ;
 
 
 	msqid = Msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT);
 
 	buf.mtype = 1;
 	buf.mtext[0] = 1;
 	Msgsnd(msqid, &buf, 1, 0);
 
 	Msgctl(msqid, IPC_STAT, &info);
 	printf("read-write: %03o, cbytes = %lu, qnum = %lu, qbytes = %lu\n",
 		   info.msg_perm.mode & 0777, (long) info.msg_cbytes,
 		   (long) info.msg_qnum, (long) info.msg_qbytes);
 
 	system("ipcs -q");
 
 	Msgctl(msqid, IPC_RMID, NULL);
 	exit(0);
 }
 
У меня вывод был такой
       
 read-write: 000, cbytes = 0, qnum = 0, qbytes = 0
 
 ------ Message Queues --------
 key        msqid      owner      perms      used-bytes   messages
 0x00000000 425984     root       311        1            1
 
Напишем клиент-серверную программу с использованием 2-х очередей сообщений , одна из них будет передавать сообщения от клиента серверу , вторая наоборот

Сценарий работы следующий : запускаем сначала сервер , потом в другом окне клиента. В клиенте набираем путь к файлу , содержание которого хотим получить . Сервер создает обе очереди , если они уже существуют , это не страшно , ибо мы не указываем флаг IPC_EXCL . В функции server() будут вызваны функции mesg_send и mesg_recv :

       
 //svmsgcliserv/server_main.c
 
 int main(int argc, char **argv)
 {
 	int		readid, writeid;
 
 	readid = Msgget(MQ_KEY1, SVMSG_MODE | IPC_CREAT);
 	writeid = Msgget(MQ_KEY2, SVMSG_MODE | IPC_CREAT);
 
 	server(readid, writeid);
 
 	exit(0);
 }
 
Клиент открывает созданные сервером очереди :
       
 //svmsgcliserv/client_main.c
 
 int main(int argc, char **argv)
 {
 	int		readid, writeid;
 
 		/* 4assumes server has created the queues */
 	writeid = Msgget(MQ_KEY1, 0);
 	readid = Msgget(MQ_KEY2, 0);
 
 	client(readid, writeid);
 
 		/* 4now we can delete the queues */
 	Msgctl(readid, IPC_RMID, NULL);
 	Msgctl(writeid, IPC_RMID, NULL);
 
 	exit(0);
 }
 
 
Функции mesg_send и mesg_recv :
       
  ssize_t mesg_send(int id, struct mymesg *mptr)
 {
 	return(msgsnd(id, &(mptr->mesg_type), mptr->mesg_len, 0));
 }
 
 
 ssize_t mesg_recv(int id, struct mymesg *mptr)
 {
 	ssize_t	n;
 
 	n = msgrcv(id, &(mptr->mesg_type), MAXMESGDATA, mptr->mesg_type, 0);
 	mptr->mesg_len = n;		/* return #bytes of data */
 
 	return(n);				/* -1 on error, 0 at EOF, else >0 */
 }
 
Наличие поля type в каждом сообщении дает возможность установить произвольный доступ к сообщениям. В pipe и fifo данные считываются именно в том порядке , в котором они поступили. В очередях можно установить произвольный доступ .

В следующем примере будет один сервер обслуживать одновременно несколько клиентов , причем мы создадим всего одну очередь , которая будет обслуживать всех. Клиент в качестве своего идентификатора будет отсылать серверу pid процесса . Сообщение , посылаемое серверу , будет иметь тип 1 , сообщения , посылаемые сервером , будут иметь pid текущего клиента . Создается единственная очередь , идентификатор которой будет использоваться в качестве аргумента.

Сценарий все тот же : запускаем сервер , а затем несколько клиентов , каждый из которых запрашивает путь к файлу и получает от сервера его содержимое , и все это выполняется с помощью одной очереди :

       
 //svmsgmpxlq/server_main.c
 
 int main(int argc, char **argv)
 {
 	int		msqid;
 
 	msqid = Msgget(MQ_KEY1, SVMSG_MODE | IPC_CREAT);
 
 	server(msqid, msqid);	/* same queue for both directions */
 
 	exit(0);
 }
 
 void server(int readfd, int writefd)
 {
 	FILE	*fp;
 	char	*ptr;
 	pid_t	pid;
 	ssize_t	n;
 	struct mymesg	mesg;
 
 	for ( ; ; ) {
 			/* 4read pathname from IPC channel */
 		mesg.mesg_type = 1;
 		if ( (n = Mesg_recv(readfd, &mesg)) == 0) {
 			err_msg("pathname missing");
 			continue;
 		}
 		mesg.mesg_data[n] = '\0';	/* null terminate pathname */
 
 		if ( (ptr = strchr(mesg.mesg_data, ' ')) == NULL) {
 			err_msg("bogus request: %s", mesg.mesg_data);
 			continue;
 		}
 
 		*ptr++ = 0;			/* null terminate PID, ptr = pathname */
 		pid = atol(mesg.mesg_data);
 		mesg.mesg_type = pid;	/* for messages back to client */
 
 		if ( (fp = fopen(ptr, "r")) == NULL) {
 				/* 4error: must tell client */
 			snprintf(mesg.mesg_data + n, sizeof(mesg.mesg_data) - n,
 					 ": can't open, %s\n", strerror(errno));
 			mesg.mesg_len = strlen(ptr);
 			memmove(mesg.mesg_data, ptr, mesg.mesg_len);
 			Mesg_send(writefd, &mesg);
 	
 		} else {
 				/* 4fopen succeeded: copy file to IPC channel */
 			while (Fgets(mesg.mesg_data, MAXMESGDATA, fp) != NULL) {
 				mesg.mesg_len = strlen(mesg.mesg_data);
 				Mesg_send(writefd, &mesg);
 			}
 			Fclose(fp);
 		}
 	
 			/* 4send a 0-length message to signify the end */
 		mesg.mesg_len = 0;
 		Mesg_send(writefd, &mesg);
 	}
 }
 
 
 
 
 
 int main(int argc, char **argv)
 {
 	int		msqid;
 
 		/* 4server must create the queue */
 	msqid = Msgget(MQ_KEY1, 0);
 
 	client(msqid, msqid);	/* same queue for both directions */
 
 	exit(0);
 }
 
 void client(int readfd, int writefd)
 {
 	size_t	len;
 	ssize_t	n;
 	char	*ptr;
 	struct mymesg	mesg;
 
 		/* 4start buffer with pid and a blank */
 	snprintf(mesg.mesg_data, MAXMESGDATA, "%ld ", (long) getpid());
 	len = strlen(mesg.mesg_data);
 	ptr = mesg.mesg_data + len;
 
 		/* 4read pathname */
 	Fgets(ptr, MAXMESGDATA - len, stdin);
 	len = strlen(mesg.mesg_data);
 	if (mesg.mesg_data[len-1] == '\n')
 		len--;				/* delete newline from fgets() */
 	mesg.mesg_len = len;
 	mesg.mesg_type = 1;
 
 		/* 4write PID and pathname to IPC channel */
 	Mesg_send(writefd, &mesg);
 
 		/* 4read from IPC, write to standard output */
 	mesg.mesg_type = getpid();
 	while ( (n = Mesg_recv(readfd, &mesg)) > 0)
 		Write(STDOUT_FILENO, mesg.mesg_data, n);
 }
 
 

Мы только что имели дело с последовательным сервером , все клиенты вставали в одну очередь на обслуживание .

Теперь мы напишем другую программу , в которой сервер станет параллельным и будет работать по такой схеме :
Каждый клиент будет создавать свою очередь с ключом IPC_PRIVATE. Здесь вместо pid процесса клиенты будут сообщать серверу id очереди , с которой сервер и будет работать .

       
 //svmsgmpxnq/client_main.c
 
 int main(int argc, char **argv)
 {
 	int		readid, writeid;
 
 		/* 4server must create its well-known queue */
 	writeid = Msgget(MQ_KEY1, 0);
 		/* 4we create our own private queue */
 	readid = Msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT);
 
 	client(readid, writeid);
 
 		/* 4and delete our private queue */
 	Msgctl(readid, IPC_RMID, NULL);
 
 	exit(0);
 }
 
 void client(int readid, int writeid)
 {
 	size_t	len;
 	ssize_t	n;
 	char	*ptr;
 	struct mymesg	mesg;
 
 		/* 4start buffer with msqid and a blank */
 	snprintf(mesg.mesg_data, MAXMESGDATA, "%d ", readid);
 	len = strlen(mesg.mesg_data);
 	ptr = mesg.mesg_data + len;
 
 		/* 4read pathname */
 	Fgets(ptr, MAXMESGDATA - len, stdin);
 	len = strlen(mesg.mesg_data);
 	if (mesg.mesg_data[len-1] == '\n')
 		len--;				/* delete newline from fgets() */
 	mesg.mesg_len = len;
 	mesg.mesg_type = 1;
 
 		/* 4write msqid and pathname to server's well-known queue */
 	Mesg_send(writeid, &mesg);
 
 		/* 4read from our queue, write to standard output */
 	while ( (n = Mesg_recv(readid, &mesg)) > 0)
 		Write(STDOUT_FILENO, mesg.mesg_data, n);
 }
 
 
Поскольку для каждого клиента будет порождаться процесс , нужно позаботиться о зомби. Функция обработки SIGCHLD :
       
 void sig_chld(int signo)
 {
 	pid_t	pid;
 	int		stat;
 
 	while ( (pid = waitpid(-1, &stat, WNOHANG)) > 0)
 		;
 	return;
 }
 
В ней происходит опрос статусов всех созданных дочерних процессов. При выходе из этой функции происходит нежелательное прерывание другой серверной функции - msgrcv . Для обработки возврата из функции sig_chld мы напишем обертку Mesg_recv , которая будет проверять , что вернула функция sig_chld , и если это ошибка , мы просто еще раз вызываем mesg_recv.
       
 void server(int readid, int writeid)
 {
 	FILE	*fp;
 	char	*ptr;
 	ssize_t	n;
 	struct mymesg	mesg;
 	void	sig_chld(int);
 
 	Signal(SIGCHLD, sig_chld);
 
 	for ( ; ; ) {
 			/* 4read pathname from our well-known queue */
 		mesg.mesg_type = 1;
 		if ( (n = Mesg_recv(readid, &mesg)) == 0) {
 			err_msg("pathname missing");
 			continue;
 		}
 		mesg.mesg_data[n] = '\0';	/* null terminate pathname */
 
 		if ( (ptr = strchr(mesg.mesg_data, ' ')) == NULL) {
 			err_msg("bogus request: %s", mesg.mesg_data);
 			continue;
 		}
 		*ptr++ = 0;			/* null terminate msgid, ptr = pathname */
 		writeid = atoi(mesg.mesg_data);
 
 		if (Fork() == 0) {		/* child */
 			if ( (fp = fopen(ptr, "r")) == NULL) {
 					/* 4error: must tell client */
 				snprintf(mesg.mesg_data + n, sizeof(mesg.mesg_data) - n,
 						 ": can't open, %s\n", strerror(errno));
 				mesg.mesg_len = strlen(ptr);
 				memmove(mesg.mesg_data, ptr, mesg.mesg_len);
 				Mesg_send(writeid, &mesg);
 		
 			} else {
 					/* 4fopen succeeded: copy file to client's queue */
 				while (Fgets(mesg.mesg_data, MAXMESGDATA, fp) != NULL) {
 					mesg.mesg_len = strlen(mesg.mesg_data);
 					Mesg_send(writeid, &mesg);
 				}
 				Fclose(fp);
 			}
 		
 				/* 4send a 0-length message to signify the end */
 			mesg.mesg_len = 0;
 			Mesg_send(writeid, &mesg);
 			exit(0);		/* child terminates */
 		}
 		/* parent just loops around */
 	}
 }
 
 
 
 
 ssize_t mesg_recv(int id, struct mymesg *mptr)
 {
 	ssize_t	n;
 
 	n = msgrcv(id, &(mptr->mesg_type), MAXMESGDATA, mptr->mesg_type, 0);
 	mptr->mesg_len = n;		/* return #bytes of data */
 
 	return(n);				/* -1 on error, 0 at EOF, else >0 */
 }
 /* end mesg_recv */
 
 /* include Mesg_recv */
 ssize_t Mesg_recv(int id, struct mymesg *mptr)
 {
 	ssize_t	n;
 
 	do {
 		n = mesg_recv(id, mptr);
 	} while (n == -1 && errno == EINTR);
 
 	if (n == -1)
 		err_sys("mesg_recv error");
 
 	return(n);
 }
 

Теперь несколько слов об недостатках очередей System V : они идентифицируются не дескрипторами , а идентификаторами , поэтому с ними нельзя использовать функции select / poll . Этот недостаток проявляется тогда , когда нужно написать приложение , которое одновременно работает с сетевыми соединениями и с IPC.

Оставьте свой комментарий !

Ваше имя:
Комментарий:
Оба поля являются обязательными

 Автор  Комментарий к данной статье