A Tutorial on Shared Memory Inter-Process Communication

What’s in this article

  • A simple tutorial of Linux Shared Memory.
  • Design of a shared memory inter-process communication(IPC) system.
  • Implementation in C++

Inter-Process Communication (IPC)

IPC basically means a process can send/receive data from other processes. Many modern software architectures are based on IPC.

As a rule of thumb, breaking software into processes (verse have a simple process with multi-threads) makes the software development and maintenance easier.

The benefits come from we can clearly define the input/output between processes. As a result, the internal implementation of these processes becomes physically independent which makes them easy to develop and maintain.

To support the input/output relationship between processes, we need an IPC system.

There are many ways to IPC. I discussed the TCP/IP communication in this post

In that post, I implemented a net video player using Linux TCP/IP API. Due to the steaming nature of TCP, we designed a protocol to break large messages into small packages. We send packages by TCP. The receiver collects all the packages and reconstructs the original message. It works fine for most the cases.

However, if you have large images (10 MB), and you want to send the images across processes. Break a large image into packages, send them, and resemble them can be a huge overhead.

Are there other ways to do it?

Yes, we can send large files by Shared Memory between processes.

For shared memory IPC, we allocate a block of shared memory for 2 processes. Then one thread can write data to the memory and the other thread can read from it.

Allocating shared memory is easy. In Linux, there are simple API calls to do it.

A simple toy example for shared-memory IPC: link
Another good tutorial on Shared memory: link

However, the shared-memory is just raw memory. If two processes write to it at the same time, there will be data racing problems.

We have to synchronize write/read for Shared Memory IPC.

Shared Memory Synchronization

To make it simple, let’s start with the familiar multi-threads single-process case.

In the multi-threads single process environment, the process’ memory is shared across threads.

To avoid the racing conditions, we have to synchronize each thread. There are lots of built-in modules for thread synchronization. For example, we can use a Lock to make sure only one thread writes to the block of memory at one time.

For multiple processes shared memory IPC, it is the same.

Lots of processes can write to the same block of shared-memory. We have to synchronize them. We have to make sure only one process can write to the same block of shared memory at the same time. However, there are no build-in modules to do the synchronization for processes.

For a shared memory IPC system, we have to implement the synchronization.

Atomic Variables

Read/write Atomic variables is safe in a multi-thread environment.

Objects of atomic types are the only C++ objects that are free from data races; that is, if one thread writes to an atomic object while another thread reads from it, the behavior is well-defined.

The synchronization for Atomic variables is done at the instruction level. As a result, Atomic variables are safe in a multi-threading environment as well as a multi-process environment.

Normal C++ variables are unsafe in a multi-process environment. Even for a simple read, normal C++ variables are unsafe. It’s due to memory architecture and compiler optimization in the instruction level. Please read Memory Ordering at Compile Time and Memory Ordering at Runtime for details.

One simple idea to guard the shared-memory is to use a lock. Interestingly we can implement an inter-process lock by an Atomic variable.

My understanding of the Software Engineering is answering the question of how to handle complexity.
To save my hair, I start with something simple.

The Inter-process Lock

We can use an atomic variable to implement an inter-process lock. And we can use this lock to protect shared memory.

The simplest lock we can implement using an atomic variable is a Spin Lock.

class Spinlock {
public:
    std::atomic_flag flag_{ ATOMIC_FLAG_INIT };

    void lock()
    {
        while (!try_lock()) {
            usleep(speed_time_us);
        }
    }

    inline bool try_lock()
    {
        return !flag_.test_and_set();
    }

    void unlock()
    {
        flag_.clear();
    }

    // config
    uint32_t speed_time_us = 50;
};

The flag_ is an atomic flag variable. It is basically a bool. flag_.test_and_set() sets the flag to true. flag_.clear() sets the flag to false.

Assuming lock is shared between 2 processes. When a process A called the lock(), the flag_ is set the true. Process B also call the lock(). But the try_lock() function return false since the flag_ is true. Process B will sleep for 50 us and call try_lock() again until flag_ become false.

Once process A finished. It calls unlock(). The flag_ is set to false. For process B, It is still trying. In this time, the try_lock() returns true. Process B finishes the call to lock() and do whatever in the critical section. In the end, process B calls the unlock() to free the lock.

class lock_guard {
public:
    Lock& l_;

    lock_guard(Lock& l)
        : l_(l)
    {
        l_.lock();
    }

    ~lock_guard()
    {
        l_.unlock();
    }
};

lock_guard is a helper class to manage Lock. It used a programming technique called RAII. lock_guard is a simplified version of std::lock_guard.

When lock_guard is constructed, it calls the lock() function. When it is destructed, it calls the unlock().

We use the lock_guard like this.

Spinlock critical_section_lock;

{
    lock_guard lock(critical_section_lock);
    
    ... critical section...
    ... the destruction is controlled by {} ...
}

The critical section is protected by the lock.

With the inter-process lock implemented, we can use it to protect the shared memory.

Please read C++ Concurrency in Action for a detail discussion about locks and Atomic variables.

Message Queue In the Shared Memory

Recall the motivation of shared-memory communication. There is a writer keeps generating data. There are multiple readers consuming the data. Each reader does a different task using the full data generated by the writer.

For example, the writer keeps getting images from a camera and sends them to readers. A reader gets the images and does obstacle detection. Another reader gets the images and sends them to a remote monitor.

What we can do is basically: having a shared memory for all processes. The writer copies objects into the shared memory. The readers copy data from shared memory. The copy operation is guarded by locks.

As I did in the TCP/IP article, having a queue makes the inter-process communication practical. The reader and the writer can process the data at its own pace.

So, the design for the Shared-Memory IPC is:

  1. Put a message queue in the shared memory.
  2. The message queue is protected by the Inter-process lock.

To use the message queue, users need to

  1. construct the message queue in the shared memory, or get a pointer to the message queue in the shared memory.
  2. read/write to the queue.

Nothing fancy. Let’s go into the implementation.

The Message Implementation

constexpr size_t RAW_DATA_CHAR_SIZE = 1024 * 1024 * 10;

struct DataBlob {
    double check_sum = -1;
    char raw_data[RAW_DATA_CHAR_SIZE];
};

struct Cell {
    Lock writer_lock_;
    Semaphore smf_;

    DataBlob data;
};

For simplicity, I assume the data is fixed size. Each cell (message) is protected by a Lock and a Semaphore.

The Lock is used to protect write to the message cell. i.e. When a process is writing to the cell, no other processes can read the cell.

Semaphore is basically a safe counter for how many reader are reading the current cell. The Semaphore is used to support multiple readers. Each active reader increases the Semaphore by one when it is copying data. When the reader finishes, the Semaphore is decreased by one. A writer is allowed to write to the cell when the Semaphore is 0.

The principle is to increase concurrency (total data processed at a period of time).

  1. Having a lock for each cell. If the writer is writing to a cell, the readers read other cells just fine.
  2. Using Semaphore allows multiple readers to read a cell at the same time.

The Semaphore is implemented using std::atomic.

class Semaphore {
public:
    std::atomic<int32_t> count_;

    inline void increase()
    {
        count_++;
    }

    inline void decrease()
    {
        count_--;
    }

    inline bool is_zero()
    {
        int32_t expect = 0;
        return count_.compare_exchange_strong(expect, 0);
    }
};

The Shared Queue

This Shared Queue has shared data for all processes. It lives in the shared memory.

struct MutipleReaderQueueShared {
    bool try_write(const DataBlob& d)
    {
        Cell& m = messages_[write_idx_ % QUEUE_LEN];

        // If some processes is reading, do nothing
        if (!m.smf_.is_zero()) {
            return false;
        }

        {
            // The write operation is protected by a lock *in the message*.
            lock_guard lock(m.writer_lock_);
            m.data = d;

            {
                // Index operation is protected by lock for *write* and *read*.
                // Because index operation is light,
                // using a lock doesn't hurt the performance much.
                lock_guard lock(operation_lock_);
                ++write_idx_;

                // Queue warp around.
                if (write_idx_ - farthest_read_idx_ > QUEUE_LEN) {
                    int32_t last_farest_read_idx = farthest_read_idx_;
                    farthest_read_idx_++;

                    assert(farthest_read_idx_ = write_idx_ - QUEUE_LEN);
                    // doesn't hold when int overflow
                    assert(farthest_read_idx_ > last_farest_read_idx);
                }
            }
        }

        return true;
    }

    bool write(const DataBlob& d)
    {
        while (!try_write(d)) {
            // Unlikely. Happends when the queue warp around.
            std::cout << "write|spinning" << std::endl;
            usleep(1000);
        }

        return true;
    }

    std::atomic<int32_t> write_idx_{ 0 };
    std::atomic<int32_t> farthest_read_idx_{ 0 };

    Lock operation_lock_;
    Cell messages_[QUEUE_LEN];
};

  1. Cell messages[QUEUE_LEN] is the buffer. Each element is protected by a lock and a semaphore. The Cell is defined in the previous section.
  2. std::atomic write_idx_ and std::atomic farthest_read_idx_ are the standard index operation for circular buffer. The farthest_read_idx_ is the beginning of valid data in the circular buffer. It provides information for readers to recover if readers are out of sync.
  3. Multiple readers can read at the same time. Lock operation_lock_ locks index operations. We need to make sure the index operations happen together for a write/read operation.

The Local Queue

Each process has local data for the queue. Because of the design, each reader process reads the queue independently. So each queue in the reader process should have its only read index.

The local queue has,

  1. A pointer to the shared queue MutipleReaderQueueShared* q_shared_ in the shared memory.
  2. Read related data and functions.
bool try_read(DataBlob& d)
{
    Cell* m_ptr;
    {
        // make sure index operation is locked.
        // Index operation is light-weighted.
        // Using a lock doesn't hurt performance much.
        lock_guard lock(q_shared_->operation_lock_);

        if (read_idx >= q_shared_->write_idx_) {
            return false;
        }

        // Jump to lastest message.
        // It is the flexibility of shared memory. In TCP, you can't do this.
        if (read_idx < q_shared_->farthest_read_idx_) {
            const int32_t last_read_idx = read_idx;
            read_idx = q_shared_->farthest_read_idx_.load();
            // It doesn't hold when int overflow
            assert(last_read_idx < read_idx);
        }

        m_ptr = &q_shared_->messages_[read_idx % QUEUE_LEN];
        
        // Using a semaphore to track how many process is reading the current message.
        // "signal" a reader is here
        m_ptr->smf_.increase();

        // Check if someone is writing to this cell
        // This only happen if the queue warp around.
        if (!m_ptr->writer_lock_.try_lock()) {
            m_ptr->smf_.decrease();
            std::cout << "try_read|someone is writing" << std::endl;
            std::cout << "try_read|m.test_num :" << m_ptr->data.check_sum << std::endl;
            return false;
        } else {
            // Unlock it since I lock the cell in if statement.
            m_ptr->writer_lock_.unlock();
        }

        ++read_idx;
    }

    d = m_ptr->data;

    // TODO: what if program crashes here? RAII ?
    m_ptr->smf_.decrease();

    return true;
}

Note, in line 15~22, If the read_idx points to an invalid cell, we jump to the farthest available message directly. In TCP, there is no way to skip messages due to the streaming nature of the TCP protocol.

For example, If we just want to read the latest message, we can jump to the latest message directly. Whereas, we can’t do that in TCP.

The Cons for Shared-memory IPC: we need to implement an IPC system from scratch.
The Pros: we are free to do whatever we want. (Freedom usually comes with hair loss).

There are some very scary locks operations and interactions. I am not 100% sure my implementation is bug-free since I can’t find a good way to test an IPC system.

Shared-Memory Allocation

There is a Linux API for shared-memory. Not very interesting, just confusing Linux C APIs.

template <class T>
T* mmap_shmem(const std::string& shmem_name, std::string& error_msg)
{
    int fd = -1;
    fd = shm_open(shmem_name.c_str(), O_CREAT | O_RDWR, 0666);

    if (fd == -1) {
        error_msg = "open";
        return nullptr;
    }
    if (ftruncate(fd, sizeof(T))) {
        error_msg = "ftruncate";
        close(fd);
        return nullptr;
    }

    T* ret = (T*)mmap(0, sizeof(T), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);

    if (ret == MAP_FAILED) {
        error_msg = "mmap";
        return nullptr;
    }
    return ret;
}

Note, each shared-memory block is indexed by a string.

In this function, we allocate a block of shared-memory call shmem_name with the size of type T, and return a pointer to the uninitialized T.

MutipleReaderQueueShared* allocate_shmem(const std::string& key)
{
    MutipleReaderQueueShared* shm_ptr = nullptr;

    std::string error_msg;
    shm_ptr = mmap_shmem<MutipleReaderQueueShared>(key, error_msg);
    assert(shm_ptr);

    return shm_ptr;
}

A helper function calls the allocate_shmem to allocate the shared memory for a MutipleReaderQueueShared. Note, the size if MutipleReaderQueueShared is known at the compile time.

A process needs to initialize the MutipleReaderQueueShared. I dictated the writer process to do it. To make the writer process feeling better, there is a SharedMemoryGuard to guard the shared-memory by RAII.

class SharedMemoryGuard {
public:
    SharedMemoryGuard(const std::string& key)
    {
        hacky_init_helper = new MutipleReaderQueueShared();

        shm_ptr_ = allocate_shmem(key);

        //hacky init
        memcpy(shm_ptr_, hacky_init_helper, sizeof(MutipleReaderQueueShared));
    }

    ~SharedMemoryGuard()
    {
        memcpy(shm_ptr_, hacky_init_helper, sizeof(MutipleReaderQueueShared));
        munmap(shm_ptr_, sizeof(MutipleReaderQueueShared));

        delete hacky_init_helper;
    }

    MutipleReaderQueueShared* hacky_init_helper;
    MutipleReaderQueueShared* shm_ptr_;
};

On the Users Side

Recall the use case is: a writer writes images, multiple readers read those images independently.

The writer allocates the shared memory and writes images to it.

Readers get a pointer to the shared memory and read from it.

The Writer

#include "comms.h"
#include "utils.h"

#include <opencv2/core/core.hpp>
#include <opencv2/highgui/highgui.hpp>

int main(int argc, char* argv[])
{
    ...
    
    std::string image_dir_path = argv[1];

    SharedMemoryGuard shared_mem("shared_memory");
    MutipleReaderQueue queue_write;

    queue_write.q_shared_ = shared_mem.shm_ptr_;

    DataBlob* d = new DataBlob;
    for (size_t i = 0; i < 725; ++i) {
       ...

        const cv::Mat image = cv::imread(path);
        
        const std::vector<char> smat = serialize_cvmat(image);
        std::copy(smat.begin(), smat.end(), d->raw_data);

        queue_write.write(*d);
        ...
    }
    ...
}

Line 13~16, initialization of the shared-memory IPC. The name for the shared-memory block is shared_memory.

Line 24~25, serialize OpenCV image. For shallow copyable objects, we can simply copy the objects. But the OpenCV image has data in the heap. So, we still need to do the serialization.

We can probably do a type trait (or concept) for shallow copyable objects and vice versa.

Line 27, write to the shared queue.

The Readers

#include "comms.h"
#include "utils.h"

#include "opencv2/imgproc/imgproc.hpp"
#include <opencv2/core/core.hpp>
#include <opencv2/highgui/highgui.hpp>

using time_point = std::chrono::steady_clock::time_point;

int main()
{
    MutipleReaderQueue queue_read;
    queue_read.q_shared_ = allocate_shmem("shared_memory");

    DataBlob* d = new DataBlob;
    
    ...

    while (true) {
        ...

        while (queue_read.read(*d)) {
            cv::Mat mat = deserialize_cvmat(d->raw_data);

            // do whatever you want for the image
        }
        ...
    }

    return 0;
}

Line 12~13, ask Linux to retrieve the shared-memory block called shared_memory.

Line 22, read all available data from the shared queue. But only keep the latest one.

Line 25, do whatever you want for the image. different readers do different tasks here.


Code link

A not very exciting demo. The simpleScreenRecorder take too much resource.

Discussion

The Readers-Writers Problem

In basically implemented the Readers-Writers Problem using shared memory for IPC.

Computer Systems: A Programmer’s Perspective

Tests?

I still don’t know how to test an IPC system.

The input and output for the IPC system depend on networking or system load. In other words, the input and output are not defined. So, unit tests don’t work for the IPC system.

I would only test the necessary conditions for an IPC system as the best effort.

Please reference me to available resources.

4 thoughts on “A Tutorial on Shared Memory Inter-Process Communication

  1. Hallo, can you please explain, how the the Inter-process lock works exactly ? because the std::atomic_flag flag_ ins’t shared between process A and B, right ? How does process B know, if process A has set the flag ?

  2. Are you really sure that “Atomic variables are safe in a multi-threading environment as well as a multi-process environment”? In multi core environments? I don’t think so.

    1. Atomicity is achieved by CPU instructions. In the perspective of CPU, thread and process are the same. Let me know if you have a counter example.

Leave a Reply