C++进程间通信

一.为什么进程间需要通信?

1.数据传输

一个进程需要将它的数据发送给另一个进程;

2.资源共享

多个进程之间共享同样的资源;

3.事件通知

一个进程需要向另一个或一组进程发送消息,通知它们发生了某种事件;

4.进程控制

有些进程希望完全控制另一个进程的执行(如Debug进程),该控制进程希望能够拦截另一个进程的所有操作,并能够及时知道它的状态改变。

基于以上几个原因,所以就有了进程间通信的概念,那么进程间通信的原理是什么呢?目前有哪几种进程间通信的机制?他们是如何实现进程间通信的呢?在这篇文章中我会就这几个问题进行详细的讲解。

 

二.进程间通信的原理

每个进程各自有不同的用户地址空间,任何一个进程的全局变量在另一个进程中都看不到,所以进程之间要交换数据必须通过内核,在内核中开辟一块缓冲区,进程1把数据从用户空间拷到内核缓冲区,进程2再从内核缓冲区把数据读走,内核提供的这种机制称为进程间通信机制。

主要的过程如下图所示:

 

三.进程间通信的几种方式

常见的几种进程间通信方式:

  • 1.管道(Pipe):管道可用于具有亲缘关系进程间的通信,允许一个进程和另一个与它有共同祖先的进程之间进行通信。
  • 2.命名管道(named pipe/FIFO):命名管道克服了管道没有名字的限制,因此,除具有管道所具有的功能外,它还允许无亲缘关系进程间的通信。命名管道在文件系统中有对应的文件名。命名管道通过命令mkfifo或系统调用mkfifo来创建。
  • 3.共享内存:使得多个进程可以访问同一块内存空间,是最快的可用IPC形式。是针对其他通信机制运行效率较低而设计的。往往与其它通信机制,如信号量结合使用,来达到进程间的同步及互斥。
  • 4.信号(Signal):信号是比较复杂的通信方式,用于通知接受进程有某种事件发生,除了用于进程间通信外,进程还可以发送信号给进程本身;Linux除了支持Unix早期信号语义函数sigal外,还支持语义符合Posix.1标准的信号函数sigaction(实际上,该函数是基于BSD的,BSD为了实现可靠信号机制,又能够统一对外接口,用sigaction函数重新实现了signal函数)。
  • 5.内存映射(mmap):mmap是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回写脏页面到对应的文件磁盘上,即完成了对文件的操作而不必再调用read,write等系统调用函数。相反,内核空间对这段区域的修改也直接反映用户空间,从而可以实现不同进程间的文件共享。
  • 6.消息(Message)队列:消息队列是消息链式队列,消息被读完就删除,可以供多个进程间通信。有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。消息队列克服了信号承载信息量少,管道只能承载无格式字节流以及缓冲区大小受限等缺点

具体展开来看:

1.管道(pipe)

管道又名匿名管道,这是一种最基本的IPC机制,由pipe函数创建:

int _pipe[2] = {0, 0};
int ret = pipe(_pipe);  // 创建无名管道,参数返回写和读文件操作符;ret返回值:成功返回0,失败返回-1;

调用pipe函数时在内核中开辟一块缓冲区用于通信,它有一个读端,一个写端:_pipe[0]指向管道的读端,_pipe[1]指向管道的写端。所以管道在用户程序看起来就像一个打开的文件,通过read(_pipe[0])或者write(_pipe[1])向这个文件读写数据,其实是在读写内核缓冲区。

使用管道的通信过程:

1.父进程调用pipe开辟管道,得到两个文件描述符指向管道的两端。

2.父进程调用fork创建子进程,那么子进程也有两个文件描述符指向同一管道。

3.父进程关闭管道读端,子进程关闭管道写端。父进程可以往管道里写,子进程可以从管道里读,管道是用环形队列实现的,数据从写端流入从读端流出,这样就实现了进程间通信。

管道出现的四种特殊情况:

1.写端关闭,读端不关闭;

那么管道中剩余的数据都被读取后,再次read会返回0,就像读到文件末尾一样。

2.写端不关闭,但是也不写数据,读端不关闭;

此时管道中剩余的数据都被读取之后再次read会被阻塞,直到管道中有数据可读了才重新读取数据并返回;

3.读端关闭,写端不关闭;

此时该进程会收到信号SIGPIPE,通常会导致进程异常终止。

4.读端不关闭,但是也不读取数据,写端不关闭;

此时当写端被写满之后再次write会阻塞,直到管道中有空位置了才会写入数据并重新返回。

使用管道的缺点:

1.两个进程通过一个管道只能实现单向通信,如果想双向通信必须再重新创建一个管道或者使用sockpair才可以解决这类问题;

2.只能用于具有亲缘关系的进程间通信,例如父子,兄弟进程。

匿名管道实例:

/*
 * 进程间通信-pipe匿名管道方式
 *  管道是特殊的文件,底层用队列实现,管道中内容,读完就删除了
 *  缺点:只能在父子进程或有亲缘关系的进程中使用
 *
 * 编译: 
 *      g++ mars/ipc/pipe.cc -o build/ipc-pipe
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <iostream>

int main() {
    // 创建pipe管道
    int _pipe[2] = {0, 0};
    int ret = pipe(_pipe);  // 创建无名管道,参数返回写和读文件操作符
    if (ret == -1) {
        std::cout << "create pipe fail!" << std::endl;
        return 1;
    }
    std::cout << "create pipe: " << _pipe[0] << _pipe[1] << std::endl;

    // fork一个子进程出来(在fork的时候,子进程会拿到父进程的内存拷贝,同时子进程和父进程都可以拿到之前创建的管道并访问文件描述符)
    pid_t cid = fork();  
    if (cid < 0) {   //fork失败
        std::cout << "fork fail!" << std::endl;
        return 2;
    }
    // 测试子进程写->父进程读消息
    if (cid == 0){  // 子进程执行逻辑
        std::cout << "Child Writing..." << std::endl;
        close(_pipe[0]);    //子进程关闭管道读文件操作符
        // 发送数据到pipe
        int count = 5;
        const char* msg = "I'm a child msg from pipe!";
        while (count--) {
            write(_pipe[1], msg, strlen(msg));
            sleep(1);
        }
        close(_pipe[1]);
        exit(1);    //子进程退出
    } else{  // 父进程执行逻辑
        std::cout << "Father Reading..." << std::endl;
        close(_pipe[1]);    //子进程关闭管道写文件操作符
        // 读取pipe数据
        char msg[1024];
        int count = 5;
        while (true) {
            ssize_t n = read(_pipe[0], msg, sizeof(msg) - 1);
            if (n > 0) {
                msg[n] = '\0';
                std::cout << "recive from child: " << msg << std::endl;
            } else {
                std::cout << "read empty!" << std::endl;
                //exit(1);
                break;
            }
        }
        // 等待子进程结束再退出父进程
        if (waitpid(cid, 0, 0) != -1) {
            std::cout << "child closed!" << std::endl;
        }
    }
    return 0;
}


// 运行结果 
$ ./build/ipc-pipe 
create pipe: 34
Father Reading...
Child Writing...
recive from child: I'm a child msg from pipe!
recive from child: I'm a child msg from pipe!
recive from child: I'm a child msg from pipe!
recive from child: I'm a child msg from pipe!
recive from child: I'm a child msg from pipe!
read empty!
child closed!

 

2.命名管道(fifo)

上一种进程间通信的方式是匿名的,所以只能用于具有亲缘关系的进程间通信,命名管道的出现正好解决了这个问题。FIFO不同于管道之处在于它提供一个路径名与之关联,以FIFO的文件形式存储文件系统中。命名管道是一个设备文件,因此即使进程与创建FIFO的进程不存在亲缘关系,只要可以访问该路径,就能够通过FIFO相互通信。

命名管道的创建与读写:

1).是在程序中使用系统函数建立命名管道;

2).是在Shell下交互地建立一个命名管道,Shell方式下可使用mknod或mkfifo命令来创建管道,两个函数均定义在头文件sys/stat.h中;

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
int mknod(const char *pathname, mode_t mode, dev_t dev);
#include <sys/types.h>
#include <sys/stat.h>
int mkfifo(const char *pathname, mode_t mode);

返回值:都是成功返回0,失败返回-1;

path为创建的命名管道的全路径名;

mod为创建的命名管道的模式,指明其存取权限;

dev为设备值,该值取决于文件创建的种类,它只在创建设备文件时才会用到;

mkfifo函数的作用:在文件系统中创建一个文件,该文件用于提供FIFO功能,即命名管道。

命名管道的特点:

1.命名管道是一个存在于硬盘上的文件,而管道是存在于内存中的特殊文件。所以当使用命名管道的时候必须先open将其打开。

2.命名管道可以用于任何两个进程之间的通信,不管这两个进程是不是父子进程,也不管这两个进程之间有没有关系。

命名管道实例:

/*
 * 进程间通信-fifo有名管道server
 *  命名管道是一个设备文件,因此即使进程与创建FIFO的进程不存在亲缘关系,只要可以访问该路径,就能够通过FIFO相互通信
 *
 * 编译:
 *      g++ mars/ipc/fifo-server.cc -o build/ipc-fifo-server
 */
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>

int main() {
    std::string name = ".myfifo";

    // 创建一个存取权限为0666的命名管道
    unlink(name.c_str());
    int namepipe = mkfifo(name.c_str(), S_IFIFO | 0666);  
    if (namepipe == -1) {
        perror("mkfifo fail!");
        exit(1);
    }

    // 只写方式打开该命名管道
    int fd = open(name.c_str(), O_WRONLY);  // O_RDWR);
    if (fd == -1) {
        perror("open fifo fail!");
        exit(2);
    }
    // 向fifo管道发送数据
    char buf[1024];
    while (1) {
        printf("sendto fifo: ");
        fflush(stdout);
        ssize_t n = read(0, buf, sizeof(buf) - 1);  //从标准输入获取消息
        if (n > 0) {
            buf[n - 1] = '\0';  //过滤掉从标准输入中获取的换行
            if (write(fd, buf, n) == -1) {  //把该消息写入到命名管道中
                perror("write fifo fail!");
                exit(3);
            }
        }
    }
    close(fd);

    return 0;
}



/*
 * 进程间通信-fifo有名管道client
 *  命名管道是一个设备文件,因此即使进程与创建FIFO的进程不存在亲缘关系,只要可以访问该路径,就能够通过FIFO相互通信
 *
 * 编译:
 *      g++ mars/ipc/fifo-client.cc -o build/ipc-fifo-client
 */
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>

int main() {
    std::string name = ".myfifo";
    // 只读方式打开该命名管道
    int fd = open(name.c_str(), O_RDONLY);  // O_RDWR);
    if (fd == -1) {
        perror("open fifo fail!");
        exit(1);
    }
    // 接收管道数据
    char buf[1024];
    while (1) {
        ssize_t s = read(fd, buf, sizeof(buf) - 1);
        if (s > 0) {
            printf("receive from fifo: %s\n", buf);
        } else {  //读失败或者是读取到字符结尾
            perror("read fifo fail!");
            exit(2);
        }
    }
    close(fd);

    return 0;
}



// 运行结果
$ ./build/ipc-fifo-server 
sendto fifo: hello
sendto fifo: 123
sendto fifo: fifo

$ ./build/ipc-fifo-client
receive from fifo: hello
receive from fifo: 123
receive from fifo: fifo

 

3.共享内存(shm)

共享内存的原理图:

与共享内存有关的函数:

key_t	ftok(const char *, int);//创建key
int	shmget(key_t, size_t, int);//创建共享内存对象
void	*shmat (int, const void *, int);//内存映射
int	shmdt(const void *);//将用户空间的内存释放
int	shmctl(int, int, struct shmid_ds *)//将内核空间的内存释放

共享内存的特点:

共享内存是这几种进程间通信方式中效率最高的。但是因为共享内存没有提供相应的互斥机制,所以一般共享内存都和信号量配合起来使用。

为什仫共享内存的方式比其他进程间通信的方式效率高?

消息队列,FIFO,管道的消息传递方式一般为 :

1).服务器获取输入的信息;

2).通过管道,消息队列等写入数据至内存中,通常需要将该数据拷贝到内核中;

3).客户从内核中将数据拷贝到自己的客户端进程中;

4).然后再从进程中拷贝到输出文件;

上述过程通常要经过4次拷贝,才能完成文件的传递。

而共享内存只需要:

1).输入内容到共享内存区

2).从共享内存输出到文件

上述过程不涉及到内核的拷贝,这些进程间数据的传递就不再通过执行任何进入内核的系统调用来传递彼此的数据,节省了时间,所以共享内存是这几种进程间通信方式中效率最高的

共享内存实例:

/**
 * @file	share-memory.h
 * @brief   进程间通信-共享内存头文件封装
 *              共享内存创建之后,一直存在于内核中,读完之后,内容还存在,直到被删除或系统关闭
 * @author	yanjingang
 */
#pragma once

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/shm.h>
#include <unistd.h>
#include <iostream>

#define PATHNAME "."
#define PROCID 'b' //0x6666
#define MEMSIZE 4096*1

using namespace std;

namespace mars {
namespace ipc {
    // 共享内存类
    class ShareMemory{
        private:
            // key
            int key_;
            // 共享内存ID
            int shmid_;
            // 共享内存大小
            int size_;
            // 创建共享内存
            int create_share_memory();
        public:
            ShareMemory();
            ShareMemory(const size_t m_size);
            virtual ~ShareMemory();
            // 变量返回
            int key(){return key_;}
            int shmid(){return shmid_;}
            int size(){return size_;}
            // 共享内存指针
            char* mem;
    };


    // 构造函数
    ShareMemory::ShareMemory(){
        this->size_ = MEMSIZE;
        int ret = this->create_share_memory();
        if(ret != 0){
            cout << "create_share_memory fail!" << endl;
        }
    }
    // 构造函数
    ShareMemory::ShareMemory(const size_t m_size){
        this->size_ = m_size;
        int ret = this->create_share_memory();
        if(ret != 0){
            cout << "create_share_memory fail!" << endl;
        }
    }
    
    // 创建共享内存
    int ShareMemory::create_share_memory(){
        // 创建key, 以实现非亲缘关系进程间通信
        key_ = ftok(PATHNAME, PROCID);  
        if (key_ == -1) {
            cout << "create key file..." << endl;
            FILE* fp;
            if ((fp=fopen(PATHNAME,"a")) == nullptr){
                cout << "keyfile created failed" << endl;
                return -2;
            }
            fclose(fp);
            key_ = ftok(PATHNAME, PROCID);  
            if (key_ == -1) {
                cout << "key created failed" << endl;
                return -1;
            }
        }
        cout << "mq key:" << key_ << endl;

        // 创建共享内存
        shmid_ = shmget(key_, this->size(), IPC_CREAT | 0777);    // 以ftok创建的key,需要IPC_CREAT参数 
        //shmid_ = shmget(IPC_PRIVATE, 128, 0777);   // 在内核中生成共享内存的对象;相当于缓存,只能实现有亲缘关系进程间通信
        if (shmid_ == -1) {
            cout << "shmget create share memory fail!" << endl;
            return -3;
        }
        cout << "shmget create share memory success! shmid:" << shmid() << " size:" << size() << endl;
        
        // 返回这块内存的虚拟地址(将申请的共享内存挂接在该进程的页表上,是将虚拟内存和物理内存相对应)
        mem = (char*)shmat(shmid_, NULL, 0);
        if (mem == nullptr) {
            cout << "shmat share memory mapping fail!" << endl;
            return -4;
        }

        return 0;
    }


    // 析构函数
    ShareMemory::~ShareMemory()
    {
        // 将用户空间的内存释放
        shmdt(mem);
        // 将内核空间的内存释放
        shmctl(shmid_, IPC_RMID, NULL);
    }

}
}


/**
 * @file	share-memory-set.cc
 * @brief   进程间通信-共享内存set测试
 *              共享内存创建之后,一直存在于内核中,读完之后,内容还存在,直到被删除或系统关闭
 * @author	yanjingang
 * @note    编译:g++ mars/ipc/share-memory-set.cc -std=c++11 -Wall -o build/ipc-share-memory-set
 */
#include "share-memory.h"

using namespace std;

int main() {
    mars::ipc::ShareMemory shm; //(4096);

    while (true) {
        // cout << "input content: ";
        // fgets(shm.mem, shm.size(), stdin);
        printf("input content: ");
        fflush(stdout);
        ssize_t n = read(0, shm.mem, 4096 - 1);  //从标准输入获取消息
        if (n > 0) {
            shm.mem[n - 1] = '\0';  //过滤掉从标准输入中获取的换行
        }
        cout << "get share memory content: " << shm.mem << endl;
    }

    return 0;
}



/**
 * @file	share-memory-get.cc
 * @brief   进程间通信-共享内存get测试
 *              共享内存创建之后,一直存在于内核中,读完之后,内容还存在,直到被删除或系统关闭
 * @author	yanjingang
 * @note    编译:g++ mars/ipc/share-memory-get.cc -std=c++11 -Wall -o build/ipc-share-memory-get
 */
#include "share-memory.h"

using namespace std;

int main() {
    mars::ipc::ShareMemory shm; //(4096);

    while (true) {
        sleep(1);
        cout << "get share memory content: " << strlen(shm.mem) << " - " << shm.mem << endl;
    }

    return 0;
}



// 执行结果
$ ./build/ipc-share-memory-set
mq key:1644681260
shmget create share memory success! shmid:589825 size:4096
input content: abc
get share memory content: abc
input content: 123
get share memory content: 123
input content: xyz
get share memory content: xyz

$ ./build/ipc-share-memory-get                                                   
mq key:1644681260
shmget create share memory success! shmid:589825 size:4096
get share memory content: 0 -
get share memory content: 3 - abc
get share memory content: 3 - 123
get share memory content: 3 - 123
get share memory content: 3 - xyz
get share memory content: 3 - xyz


// 查看共享内存
$ ipcs -m
IPC status from <running system> as of Mon Jan  4 19:29:31 CST 2021
T     ID     KEY        MODE       OWNER    GROUP
Shared Memory:
m 327682 0x6607d82c --rw-rw-rw- yanjingang    staff

// 关闭共享内存
$ ipcrm -m 327682

4.信号通信(signal)

信号通信特点:

只能使用内核中已经存在的信号对象,可以通过kill -l或/usr/include/sys/signal.h查看内核中支持多少种信号;

raise函数把信号发给当前进程; alarm函数:定时一段时间发出信号给当前进程,终止进程;sleep;while(1); pause()与sleep状态一样。

信号通信实例:

/*
 * 进程间通信-信号
 *  使用内核中已经存在的信号对象
 *  kill -l  OR /usr/include/sys/signal.h 查看内核中支持多少种信号
 *      HUP INT QUIT ILL TRAP ABRT EMT FPE KILL BUS SEGV SYS PIPE ALRM TERM URG STOP TSTP CONT CHLD TTIN TTOU IO XCPU XFSZ VTALRM PROF WINCH INFO USR1 USR2
 *
 * 编译:
 *      g++ mars/ipc/signal.cc -o build/signal
 */
#include <signal.h>
#include <zconf.h>
#include <iostream>

using namespace std;

// 信号触发自定义回调函数
void process(int signum) {
    cout << "process signum=" << signum << endl;
    int i = 0;
    while (i < 6) {
        i++;
        sleep(1);
        cout << "process:" << i << endl;
    }
}

int main() {
    signal(14, process);  //接收到14的信号后,执行process
    //signal(14, SIG_IGN);  //接收到14的信号后,忽略
    //signal(14, SIG_DFL);  //接收到14的信号后,安装默认方式处理
    cout << "before get signal." << endl;
    //raise(SIGALRM);  // 模拟信号(SIGINT:程序终止(interrupt)信号;SIGABRT:程序的异常终止,如调用 abort;SIGSEGV:非法访问内存;SIGILL:检测非法指令;SIGFPE:错误的算术运算,比如除以零或导致溢出的操作;SIGTERM:发送到程序的终止请求.)
    alarm(10);  // 10s之后终止当前进程,触发SIGALRM
    cout << "after get signal." << endl;
    int i = 0;
    while (i < 20) {
        i++;
        sleep(1);
        cout << "main:" << i << endl;
    }
    return 0;
}

// 运行结果
$ ./build/signal                        
before get signal.
after get signal.
main:1
main:2
main:3
main:4
main:5
main:6
main:7
main:8
main:9
process signum=14  //10s之后被alarm(10)终止当前进程,触发SIGALRM信号,执行process
process:1
process:2
process:3
process:4
process:5
process:6
main:10
main:11
main:12
main:13
main:14
main:15
main:16
main:17
main:18
main:19
main:20

 

5.内存映射(mmap)

mmap是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回写脏页面到对应的文件磁盘上,即完成了对文件的操作而不必再调用read,write等系统调用函数。相反,内核空间对这段区域的修改也直接反映用户空间,从而可以实现不同进程间的文件共享。

mmap内存映射的实现过程,总的来说可以分为三个阶段:

(一)进程启动映射过程,并在虚拟地址空间中为映射创建虚拟映射区域

1、进程在用户空间调用库函数mmap,原型:void *mmap(void *start, size_t length, int prot, int flags, int fd, off_t offset);

2、在当前进程的虚拟地址空间中,寻找一段空闲的满足要求的连续的虚拟地址

3、为此虚拟区分配一个vm_area_struct结构,接着对这个结构的各个域进行了初始化

4、将新建的虚拟区结构(vm_area_struct)插入进程的虚拟地址区域链表或树中

(二)调用内核空间的系统调用函数mmap(不同于用户空间函数),实现文件物理地址和进程虚拟地址的一一映射关系

5、为映射分配了新的虚拟地址区域后,通过待映射的文件指针,在文件描述符表中找到对应的文件描述符,通过文件描述符,链接到内核“已打开文件集”中该文件的文件结构体(struct file),每个文件结构体维护着和这个已打开文件相关各项信息。

6、通过该文件的文件结构体,链接到file_operations模块,调用内核函数mmap,其原型为:int mmap(struct file *filp, struct vm_area_struct *vma),不同于用户空间库函数。

7、内核mmap函数通过虚拟文件系统inode模块定位到文件磁盘物理地址。

8、通过remap_pfn_range函数建立页表,即实现了文件地址和虚拟地址区域的映射关系。此时,这片虚拟地址并没有任何数据关联到主存中。

(三)进程发起对这片映射空间的访问,引发缺页异常,实现文件内容到物理内存(主存)的拷贝

注:前两个阶段仅在于创建虚拟区间并完成地址映射,但是并没有将任何文件数据的拷贝至主存。真正的文件读取是当进程发起读或写操作时。

9、进程的读或写操作访问虚拟地址空间这一段映射地址,通过查询页表,发现这一段地址并不在物理页面上。因为目前只建立了地址映射,真正的硬盘数据还没有拷贝到内存中,因此引发缺页异常。

10、缺页异常进行一系列判断,确定无非法操作后,内核发起请求调页过程。

11、调页过程先在交换缓存空间(swap cache)中寻找需要访问的内存页,如果没有则调用nopage函数把所缺的页从磁盘装入到主存中。

12、之后进程即可对这片主存进行读或者写的操作,如果写操作改变了其内容,一定时间后系统会自动回写脏页面到对应磁盘地址,也即完成了写入到文件的过程。

注:修改过的脏页面并不会立即更新回文件中,而是有一段时间的延迟,可以调用msync()来强制同步, 这样所写的内容就能立即保存到文件里了。

mmap函数的具体定义:

void *mmap(void *start,size_t length,int prot,int flags,int fd,off_t offsize);
具体参数含义:
start :  指向欲映射的内存起始地址,通常设为 NULL,代表让系统自动选定地址,映射成功后返回该地址。
length:  代表将文件中多大的部分映射到内存(必须为4k的整倍数,原因是,内存的最小粒度是页,而进程虚拟地址空间和内存的映射也是以页为单位。为了匹配内存的操作,mmap从磁盘到虚拟地址空间的映射也必须是页)
prot  :  映射区域的保护方式。可以为以下几种方式的组合:
PROT_EXEC 映射区域可被执行
PROT_READ 映射区域可被读取
PROT_WRITE 映射区域可被写入
PROT_NONE 映射区域不能存取
flags :  影响映射区域的各种特性。在调用mmap()时必须要指定MAP_SHARED 或MAP_PRIVATE。
MAP_FIXED 如果参数start所指的地址无法成功建立映射时,则放弃映射,不对地址做修正。通常不鼓励用此旗标。
MAP_SHARED 对映射区域的写入数据会复制回文件内,而且允许其他映射该文件的进程共享。
MAP_PRIVATE 对映射区域的写入操作会产生一个映射文件的复制,即私人的“写入时复制”(copy on write)对此区域作的任何修改都不会写回原来的文件内容。
MAP_ANONYMOUS建立匿名映射。此时会忽略参数fd,不涉及文件,而且映射区域无法和其他进程共享。
MAP_DENYWRITE只允许对映射区域的写入操作,其他对文件直接写入的操作将会被拒绝。
MAP_LOCKED 将映射区域锁定住,这表示该区域不会被置换(swap)。
fd    :  要映射到内存中的文件描述符。如果使用匿名内存映射时,即flags中设置了MAP_ANONYMOUS,fd设为-1。有些系统不支持匿名内存映射,则可以使用fopen打开/dev/zero文件,
然后对该文件进行映射,可以同样达到匿名内存映射的效果。
offset:文件映射的偏移量,通常设置为0,代表从文件最前方开始对应,offset必须是PAGE_SIZE的整数倍。

返回值:
若映射成功则返回映射区的内存起始地址,否则返回MAP_FAILED(-1),错误原因存于errno 中。

错误代码:
EBADF  参数fd 不是有效的文件描述词
EACCES 存取权限有误。如果是MAP_PRIVATE 情况下文件必须可读,使用MAP_SHARED则要有PROT_WRITE以及该文件要能写入。
EINVAL 参数start、length 或offset有一个不合法。
EAGAIN 文件被锁住,或是有太多内存被锁住。
ENOMEM 内存不足。

用户层的调用很简单,其具体功能就是直接将物理内存直接映射到用户虚拟内存,使用户空间可以直接对物理空间操作。但是对于内核层而言,其具体实现比较复杂。

内存映射实例:

/*
 * 进程间通信-内存映射
 *
 * 编译:
 *      g++ mars/ipc/mmap.cc -o build/mmap
 * 
 * 运行:
 *      ./build/mmap p1
 *      ./build/mmap p2
 */
#include <fcntl.h>
#include <sys/mman.h>
#include <zconf.h>
#include <iostream>

using namespace std;

// 数据结构体
struct Persion {
    int sex;
    int age;
};

// 创建并返回内存映射指针
struct Persion* get_mmap(size_t length) {
    std::string name = ".mymmap";

    // 准备内存映射的共享文件
    //unlink(name.c_str());
    int fd = open(name.c_str(), O_CREAT | O_RDWR, 0666);
    if (fd == -1) {
        perror("open file fail!");
        exit(2);
    }
    ftruncate(fd, length);
    // 申请内存
    struct Persion* mm = (struct Persion*)mmap(
        nullptr,                    // 分配的首地址
        length,                     // 分配内存大小(必须是页的整数倍, 32位1页=4k)
        PROT_WRITE | PROT_READ,     // 映射区域保护权限:读|写
        MAP_SHARED,                 // 对映射区域的写入数据会复制回文件内,而且允许其他映射该文件的进程共享
        fd,                         // 要映射到内存中的文件描述符
        0);                         // 文件映射的偏移量,通常设置为0,必须是页的整数倍
    close(fd);
    if (mm == MAP_FAILED) {
        cout << "mmap error" << endl;
        return nullptr;
    }
    return mm;
}

int main(int argc,char *argv[]){
    // 参数
    std::string name = "default";
    if(argc >= 2){
	    cout << "arg: " << argc << " - " << argv[1] << endl;
        name = argv[1];
    }

    // 测试
    size_t length = 4096;
    //int pid = fork();
    //if (pid == 0) {
    while (true)
    {
        sleep(1);
        struct Persion* mm = get_mmap(length);
        cout << name <<" mm:" << mm << endl;
        // get
        cout << "mm get:  age=" << mm->age << " sex=" << mm->sex << endl;
        // set
        mm->sex = rand()%2;     //2
        mm->age = rand()%100;   //3
        cout << "mm set:  age=" << mm->age << " sex=" << mm->sex << endl;
        int mun_ret = munmap(mm, length);  // 释放指针指向的内存区域,并制定释放的内存大小(即使不释放,内容也会在文件中修改)
        if (mun_ret == -1) {
            cout << "munmap error" << endl;
        }
    }
    return 0;
}




// 执行结果
$ ./build/mmap p1                   
arg: 2 - p1
p1 mm:0x102e1e000
mm get:  age=0 sex=0
mm set:  age=29 sex=1
p1 mm:0x102e1e000
mm get:  age=49 sex=1
mm set:  age=12 sex=0
p1 mm:0x102e1e000
mm get:  age=58 sex=1
mm set:  age=69 sex=1


$ ./build/mmap p2
arg: 2 - p2
p2 mm:0x105490000
mm get:  age=29 sex=1
mm set:  age=49 sex=1
p2 mm:0x105490000
mm get:  age=12 sex=0
mm set:  age=58 sex=1
p2 mm:0x105490000
mm get:  age=69 sex=1
mm set:  age=72 sex=0

 

6.消息队列(msg)

特点

消息队列是个链式队列;读完内容之后,消息就删除了; 一个消息队列可以供两个进程双向通信。

消息队列的可以用在特定的每个消息都需要处理而不是只关心最新状态的多进程消息共享场景。

使用

int msgget(key_t, int);//创建消息队列对象
int msgsnd(int, const void *, size_t, int)//发送消息
ssize_t msgrcv(int, void *, size_t, long, int)//接收消息
int msgctl(int, int, struct msqid_ds *)//删除、设置读取消息队列对象

消息队列实例:

/**
 * @file	msgqueue.h
 * @brief   进程间通信-消息队列头文件封装
 *              消息队列是个链式队列;读完内容之后,消息就删除了; 一个消息队列可以供两个进程双向通信
 * @author	yanjingang
 */
#include <unistd.h>
#include <sys/msg.h>
#include <iostream>
#include <thread>

#define PATHNAME "."
#define PROCID 'b' //0x6666
#define SIZE 128*1

using namespace std;

namespace mars {
namespace ipc {
    // 自定义消息格式
    struct Msg {
        long type;  //msgp可以是任何类型的结构体,但第一个字段必须为long类型,即表明此发送消息的类型msgtyp,msgrcv根据此接收消息
        char content[SIZE];
    };

    // 消息队列类
    class MsgQueue{
        private:
            // key_
            int key_;
            // 消息队列标识符
            int msqid_;
            // 队列停止标志
            bool stop_;
        public:
            MsgQueue();
            virtual ~MsgQueue();
            // 变量返回
            int key(){return key_;}
            int stop(){return stop_;}
            int msqid(){return msqid_;}
            // 写入队列消息
            int send(Msg* msg);
            // 读取队列消息
            int receive(const int msgtyp, Msg* msg);

    };


    // 构造函数
    inline MsgQueue::MsgQueue()
        :   stop_(false)
    {
        // 同样和共享内存一样使用ftok生成key
        key_ = ftok(PATHNAME, PROCID);  
        if (key_ == -1) {
            cout << "create key file..." << endl;
            FILE* fp;
            if ((fp=fopen(PATHNAME,"a")) == nullptr){
                cout << "keyfile created failed" << endl;
                return;
            }
            fclose(fp);
            key_ = ftok(PATHNAME, PROCID);  
            if (key_ == -1) {
                cout << "key created failed" << endl;
                return;
            }
        }
        cout << "mq key:" << key_ << endl;

        // 在内核空间创建消息队列(返回消息队列标识符msqid)
        msqid_ = msgget(key_, IPC_CREAT | 0777);
        if (msqid_ == -1) {
            cout << "msg created failed" << endl;
            return;
        }
        cout << "mq msqid:" << msqid_ << endl;
    }
    // 析构函数
    inline MsgQueue::~MsgQueue()
    {
        // 停止标志置true
        stop_ = true;
        // 关闭消息队列
        msgctl(msqid_, IPC_RMID, NULL);
    }

    /**
     * 写入队列消息
     * 
     * return: 0成功,-1失败
     * */
    int MsgQueue::send(Msg* msg) {
        if (stop_) {
            cout << "send mq has stoped!" << endl;
            return -1;
        }
        // 将新的消息添加到消息队列的尾端
        int ret = msgsnd(
            msqid_,          //消息队列标识符(由msgget生成)
            //(void*)&msg,    //指向用户自定义的缓冲区(自定义msgp)
            (void*)msg,    //指向用户自定义的缓冲区(自定义msgp)
            SIZE,            //发送信息的大小。范围在0~系统对消息队列的限制值
            0               //指定在达到系统为消息队列限定的界限时应采取的操作(IPC_NOWAIT 如果需要等待,则不发送消息并且调用进程立即返回,errno为EAGAIN;如果设置为0,则调用进程挂起执行,直到达到系统所规定的最大值为止,并发送消息)
        );
        return ret;
    }

    // 
    /**
     * 读取队列消息
     * 
     * return: 成功时返回实际读取到的消息数据长度;出错返回-1,错误原因存于error中
     * */
    int MsgQueue::receive(const int msgtyp, Msg* msg) {
        if (stop_) {
            cout << "receive mq has stoped!" << endl;
            return -1;
        }
        int ret = msgrcv(
            msqid_,          //消息队列标识符
            //(void*)&msg,    //指向用户自定义的缓冲区
            (void*)msg,    //指向用户自定义的缓冲区
            SIZE,            //接收信息的大小.如果收到的消息大于msgsz,并且msgflg&MSG_NOERROR为真,则将该消息截至msgsz字节,并且不发送截断提示
            msgtyp,        //指定请求的消息类型(msgtyp=0:收到的任意类型消息;msgtyp>0:收到的指定msgtyp类型消息。msgtyp<0:收到的小于或等于msgtyp绝对值的消息。)
            0               //指定所需类型的消息不再队列上时的将要采取的操作(如果设置了 IPC_NOWAIT ,若需要等待,则调用进程立即返回,同时返回-1,并设置errno为ENOMSG;如果未设置IPC_NOWAIT,则调用进程挂起执行,直至收到消息或出现异常)
        );
        return ret;
    }

}
}


/**
 * @file	msgqueue-send.cc
 * @brief   进程间通信-消息队列send测试
 * @author	yanjingang
 * @note    编译:g++ mars/ipc/msgqueue-send.cc -std=c++11 -Wall -o build/ipc-msgqueue-send
 */
#include "msgqueue.h"

#include <unistd.h>
#include <sys/msg.h>
#include <iostream>

using namespace std;

int main() {
    mars::ipc::MsgQueue mq;
    int msgtyp = 100;

    // 写消息队列
    cout << "Sending..." << endl;
    struct mars::ipc::Msg msg;
    msg.type = msgtyp;
    while (true) {
        // 将标准输入字符串保存到消息体中
        //cout << "input content: ";
        //fgets(msg.content, SIZE, stdin);
        cout << "input content: ";
        fflush(stdout);
        ssize_t n = read(0, msg.content, SIZE - 1);  //从标准输入获取消息
        if (n > 0) {
            msg.content[n - 1] = '\0';  //过滤掉从标准输入中获取的换行
        }
        
        // 发送消息
        int ret = mq.send(&msg);
        cout << "send msg ret=" << ret << ".  msg:type=" << msg.type << ",content=" << msg.content << endl;
    }
    
    //std::thread thw(mq.send, &msg);
    return 0;
}


/**
 * @file	msgqueue-receive.cc
 * @brief   进程间通信-消息队列receive测试
 * @author	yanjingang
 * @note    编译:g++ mars/ipc/msgqueue-receive.cc -std=c++11 -Wall -o build/ipc-msgqueue-receive
 */
#include "msgqueue.h"

#include <unistd.h>
#include <sys/msg.h>
#include <iostream>
#include <thread>

using namespace std;

int main() {
    mars::ipc::MsgQueue mq;
    int msgtyp = 100;

    // 读消息队列
    cout << "Receiving..." << endl;
    while (!mq.stop()) {
        struct mars::ipc::Msg msg;
        int ret = mq.receive(msgtyp, &msg);
        cout << "receive msg. ret=" << ret << "  msg:type=" << msg.type << ",content=" << msg.content << endl;
    }
    //std::thread thr(mq.receive, msgtyp);
    return 0;
}


// 运行结果
$ ./build/ipc-msgqueue-send                                              
mq key:1644681260
mq msqid:458752
Sending...
input content: abc
send msg ret=0.  msg:type=100,content=abc
input content: 123
send msg ret=0.  msg:type=100,content=123
input content: xxx
send msg ret=0.  msg:type=100,content=xxx

$ ./build/ipc-msgqueue-receive                                                   
mq key:1644681260
mq msqid:458752
Receiving...
receive msg. ret=128  msg:type=100,content=abc
receive msg. ret=128  msg:type=100,content=123
receive msg. ret=128  msg:type=100,content=xxx

 

各种进程间通信方法各有优劣,当追求极致通信性能时可以考虑共享内存,想要传输特定自定义格式并具备缓冲队列能力的话可以考虑消息队列,大家根据实际场景需要选择对应合适的通信方法即可。

yan 21.1.4

参考:

浅析进程间通信的几种方式

fanflame/cplusplus_ipc

认真分析mmap:是什么 为什么 怎么用

C/C++ 使用mmap/munmap函数分配内存

 

 

欢迎关注下方“非著名资深码农“公众号进行交流~

发表评论

邮箱地址不会被公开。