Глава 6 : Очереди сообщений System V
Исходники для этой страницы лежат тут
Очередь сообщений можно рассматривать как связный список сообщений .
Сообщения помещаются в очередь и извлекаются из нее.
Каждое сообщение есть запись , и каждому сообщению присваивается приоритет.
Отличие очереди от фифо в том , что для фифо нельзя произвести запись в канал до тех пор ,
пока не появится считывающий данные процесс.
Для добавления в очередь нет необходимости в ожидающем процессе .
Отличие очереди от каналов также в том , что данные в очереди сохраняются и после завершения работы
процесса , который создал эту очередь , в то время как в каналах это невозможно .
Ядро хранит очереди в обьектах типа msgid_ds :
Пример очереди из 3-х сообщений :
Создать новую очередь или получить доступ к уже существующей можно с помощью
int msgget(key_t key , int flag)
Функция возвращает положительный идентификатор в случае успеха.
Ключ нужно получить с помощью ftok.
Флаг представляет из себя комбинацию флагов на чтение-запись.
При создании новой очереди инициализируются поля :
msg_perm.uid - пользователь
msg_perm.cuid - группа
msg_perm.mode - флаги
msg_qnum = msg_lspid = msg_lrpid = msg_time = 0
msg_ctime - текущее время
msg_qbytes - системное ограничение на размер очереди
Помещать сообщения в очередь можно с помощью
int msgsnd(int msgid , const void *ptr , size_t len , int flag)
Возвращает 0 в случае успеха
Указатель ptr указывает на тип сообщения :
struct msgbuf
{
long mtype;
char mtext[]
}
Можно указывать свои собственные произвольные типы сообщений -
например такое , которое будет состоять из 2-х полей - числа и текста :
struct my_msgbuf
{
long mtype;
int m1;
char mtext[1024];
}
Аргумент flag в функции msgsnd может быть 0 либо IPC_NOWAIT.
Последний означает , что если в очереди нет свободного места , ядро не будет ждать ,
когда оно появится , и блокировки не будет . В этом случае msgsnd вернет ошибку .
Сообщение может быть считано с помощью функции
ssize_t msgrcv(int msgid , const void *ptr , size_t len , long type , int flag)
ptr указывает на приемник данных.
Управлять очередями сообщений можно с помощью
int msgctl(int msgid , int cmd , struct msgid_ds *buf)
Возвращает 0 в случае успеха
Аргумент cmd может принимать 3 значения :
IPC_RMID - удаление очереди с идентификатором msgid
IPC_SET - модификация пермишинов очереди
IPC_STAT - возвращает структуру очереди
Напишем программу , которая создаст очередь , поместит в нее одно-байтовое сообщение ,
вызовет функцию msgstat , выполнит команду ipcs , а затем удалит очередь .
//svmsg/ctl.c
int main(int argc, char **argv)
{
int msqid;
struct msqid_ds info;
struct msgbuf buf ;
msqid = Msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT);
buf.mtype = 1;
buf.mtext[0] = 1;
Msgsnd(msqid, &buf, 1, 0);
Msgctl(msqid, IPC_STAT, &info);
printf("read-write: %03o, cbytes = %lu, qnum = %lu, qbytes = %lu\n",
info.msg_perm.mode & 0777, (long) info.msg_cbytes,
(long) info.msg_qnum, (long) info.msg_qbytes);
system("ipcs -q");
Msgctl(msqid, IPC_RMID, NULL);
exit(0);
}
У меня вывод был такой
read-write: 000, cbytes = 0, qnum = 0, qbytes = 0
------ Message Queues --------
key msqid owner perms used-bytes messages
0x00000000 425984 root 311 1 1
Напишем клиент-серверную программу с использованием 2-х очередей сообщений ,
одна из них будет передавать сообщения от клиента серверу , вторая наоборот
Сценарий работы следующий : запускаем сначала сервер , потом в другом окне клиента.
В клиенте набираем путь к файлу , содержание которого хотим получить .
Сервер создает обе очереди , если они уже существуют , это не страшно ,
ибо мы не указываем флаг IPC_EXCL .
В функции server() будут вызваны функции mesg_send и mesg_recv :
//svmsgcliserv/server_main.c
int main(int argc, char **argv)
{
int readid, writeid;
readid = Msgget(MQ_KEY1, SVMSG_MODE | IPC_CREAT);
writeid = Msgget(MQ_KEY2, SVMSG_MODE | IPC_CREAT);
server(readid, writeid);
exit(0);
}
Клиент открывает созданные сервером очереди :
//svmsgcliserv/client_main.c
int main(int argc, char **argv)
{
int readid, writeid;
/* 4assumes server has created the queues */
writeid = Msgget(MQ_KEY1, 0);
readid = Msgget(MQ_KEY2, 0);
client(readid, writeid);
/* 4now we can delete the queues */
Msgctl(readid, IPC_RMID, NULL);
Msgctl(writeid, IPC_RMID, NULL);
exit(0);
}
Функции mesg_send и mesg_recv :
ssize_t mesg_send(int id, struct mymesg *mptr)
{
return(msgsnd(id, &(mptr->mesg_type), mptr->mesg_len, 0));
}
ssize_t mesg_recv(int id, struct mymesg *mptr)
{
ssize_t n;
n = msgrcv(id, &(mptr->mesg_type), MAXMESGDATA, mptr->mesg_type, 0);
mptr->mesg_len = n; /* return #bytes of data */
return(n); /* -1 on error, 0 at EOF, else >0 */
}
Наличие поля type в каждом сообщении дает возможность установить произвольный доступ к сообщениям.
В pipe и fifo данные считываются именно в том порядке , в котором они поступили.
В очередях можно установить произвольный доступ .
В следующем примере будет один сервер обслуживать одновременно несколько клиентов ,
причем мы создадим всего одну очередь , которая будет обслуживать всех.
Клиент в качестве своего идентификатора будет отсылать серверу pid процесса .
Сообщение , посылаемое серверу , будет иметь тип 1 , сообщения , посылаемые сервером ,
будут иметь pid текущего клиента . Создается единственная очередь , идентификатор которой
будет использоваться в качестве аргумента.
Сценарий все тот же : запускаем сервер , а затем несколько клиентов , каждый из которых запрашивает
путь к файлу и получает от сервера его содержимое ,
и все это выполняется с помощью одной очереди :
//svmsgmpxlq/server_main.c
int main(int argc, char **argv)
{
int msqid;
msqid = Msgget(MQ_KEY1, SVMSG_MODE | IPC_CREAT);
server(msqid, msqid); /* same queue for both directions */
exit(0);
}
void server(int readfd, int writefd)
{
FILE *fp;
char *ptr;
pid_t pid;
ssize_t n;
struct mymesg mesg;
for ( ; ; ) {
/* 4read pathname from IPC channel */
mesg.mesg_type = 1;
if ( (n = Mesg_recv(readfd, &mesg)) == 0) {
err_msg("pathname missing");
continue;
}
mesg.mesg_data[n] = '\0'; /* null terminate pathname */
if ( (ptr = strchr(mesg.mesg_data, ' ')) == NULL) {
err_msg("bogus request: %s", mesg.mesg_data);
continue;
}
*ptr++ = 0; /* null terminate PID, ptr = pathname */
pid = atol(mesg.mesg_data);
mesg.mesg_type = pid; /* for messages back to client */
if ( (fp = fopen(ptr, "r")) == NULL) {
/* 4error: must tell client */
snprintf(mesg.mesg_data + n, sizeof(mesg.mesg_data) - n,
": can't open, %s\n", strerror(errno));
mesg.mesg_len = strlen(ptr);
memmove(mesg.mesg_data, ptr, mesg.mesg_len);
Mesg_send(writefd, &mesg);
} else {
/* 4fopen succeeded: copy file to IPC channel */
while (Fgets(mesg.mesg_data, MAXMESGDATA, fp) != NULL) {
mesg.mesg_len = strlen(mesg.mesg_data);
Mesg_send(writefd, &mesg);
}
Fclose(fp);
}
/* 4send a 0-length message to signify the end */
mesg.mesg_len = 0;
Mesg_send(writefd, &mesg);
}
}
int main(int argc, char **argv)
{
int msqid;
/* 4server must create the queue */
msqid = Msgget(MQ_KEY1, 0);
client(msqid, msqid); /* same queue for both directions */
exit(0);
}
void client(int readfd, int writefd)
{
size_t len;
ssize_t n;
char *ptr;
struct mymesg mesg;
/* 4start buffer with pid and a blank */
snprintf(mesg.mesg_data, MAXMESGDATA, "%ld ", (long) getpid());
len = strlen(mesg.mesg_data);
ptr = mesg.mesg_data + len;
/* 4read pathname */
Fgets(ptr, MAXMESGDATA - len, stdin);
len = strlen(mesg.mesg_data);
if (mesg.mesg_data[len-1] == '\n')
len--; /* delete newline from fgets() */
mesg.mesg_len = len;
mesg.mesg_type = 1;
/* 4write PID and pathname to IPC channel */
Mesg_send(writefd, &mesg);
/* 4read from IPC, write to standard output */
mesg.mesg_type = getpid();
while ( (n = Mesg_recv(readfd, &mesg)) > 0)
Write(STDOUT_FILENO, mesg.mesg_data, n);
}
Мы только что имели дело с последовательным сервером , все клиенты вставали в одну очередь
на обслуживание .
Теперь мы напишем другую программу , в которой сервер станет параллельным и будет работать
по такой схеме :
|
Каждый клиент будет создавать свою очередь с ключом IPC_PRIVATE.
Здесь вместо pid процесса клиенты будут сообщать серверу id очереди ,
с которой сервер и будет работать .
//svmsgmpxnq/client_main.c
int main(int argc, char **argv)
{
int readid, writeid;
/* 4server must create its well-known queue */
writeid = Msgget(MQ_KEY1, 0);
/* 4we create our own private queue */
readid = Msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT);
client(readid, writeid);
/* 4and delete our private queue */
Msgctl(readid, IPC_RMID, NULL);
exit(0);
}
void client(int readid, int writeid)
{
size_t len;
ssize_t n;
char *ptr;
struct mymesg mesg;
/* 4start buffer with msqid and a blank */
snprintf(mesg.mesg_data, MAXMESGDATA, "%d ", readid);
len = strlen(mesg.mesg_data);
ptr = mesg.mesg_data + len;
/* 4read pathname */
Fgets(ptr, MAXMESGDATA - len, stdin);
len = strlen(mesg.mesg_data);
if (mesg.mesg_data[len-1] == '\n')
len--; /* delete newline from fgets() */
mesg.mesg_len = len;
mesg.mesg_type = 1;
/* 4write msqid and pathname to server's well-known queue */
Mesg_send(writeid, &mesg);
/* 4read from our queue, write to standard output */
while ( (n = Mesg_recv(readid, &mesg)) > 0)
Write(STDOUT_FILENO, mesg.mesg_data, n);
}
Поскольку для каждого клиента будет порождаться процесс , нужно позаботиться о зомби.
Функция обработки SIGCHLD :
void sig_chld(int signo)
{
pid_t pid;
int stat;
while ( (pid = waitpid(-1, &stat, WNOHANG)) > 0)
;
return;
}
В ней происходит опрос статусов всех созданных дочерних процессов.
При выходе из этой функции происходит нежелательное прерывание другой серверной функции - msgrcv .
Для обработки возврата из функции sig_chld мы напишем обертку Mesg_recv , которая будет проверять ,
что вернула функция sig_chld , и если это ошибка , мы просто еще раз вызываем mesg_recv.
void server(int readid, int writeid)
{
FILE *fp;
char *ptr;
ssize_t n;
struct mymesg mesg;
void sig_chld(int);
Signal(SIGCHLD, sig_chld);
for ( ; ; ) {
/* 4read pathname from our well-known queue */
mesg.mesg_type = 1;
if ( (n = Mesg_recv(readid, &mesg)) == 0) {
err_msg("pathname missing");
continue;
}
mesg.mesg_data[n] = '\0'; /* null terminate pathname */
if ( (ptr = strchr(mesg.mesg_data, ' ')) == NULL) {
err_msg("bogus request: %s", mesg.mesg_data);
continue;
}
*ptr++ = 0; /* null terminate msgid, ptr = pathname */
writeid = atoi(mesg.mesg_data);
if (Fork() == 0) { /* child */
if ( (fp = fopen(ptr, "r")) == NULL) {
/* 4error: must tell client */
snprintf(mesg.mesg_data + n, sizeof(mesg.mesg_data) - n,
": can't open, %s\n", strerror(errno));
mesg.mesg_len = strlen(ptr);
memmove(mesg.mesg_data, ptr, mesg.mesg_len);
Mesg_send(writeid, &mesg);
} else {
/* 4fopen succeeded: copy file to client's queue */
while (Fgets(mesg.mesg_data, MAXMESGDATA, fp) != NULL) {
mesg.mesg_len = strlen(mesg.mesg_data);
Mesg_send(writeid, &mesg);
}
Fclose(fp);
}
/* 4send a 0-length message to signify the end */
mesg.mesg_len = 0;
Mesg_send(writeid, &mesg);
exit(0); /* child terminates */
}
/* parent just loops around */
}
}
ssize_t mesg_recv(int id, struct mymesg *mptr)
{
ssize_t n;
n = msgrcv(id, &(mptr->mesg_type), MAXMESGDATA, mptr->mesg_type, 0);
mptr->mesg_len = n; /* return #bytes of data */
return(n); /* -1 on error, 0 at EOF, else >0 */
}
/* end mesg_recv */
/* include Mesg_recv */
ssize_t Mesg_recv(int id, struct mymesg *mptr)
{
ssize_t n;
do {
n = mesg_recv(id, mptr);
} while (n == -1 && errno == EINTR);
if (n == -1)
err_sys("mesg_recv error");
return(n);
}
Теперь несколько слов об недостатках очередей System V : они идентифицируются не дескрипторами ,
а идентификаторами , поэтому с ними нельзя использовать функции select / poll .
Этот недостаток проявляется тогда , когда нужно написать приложение ,
которое одновременно работает с сетевыми соединениями и с IPC.
|