TCP/IP, Net Video Player

Why write this article?

There are many TCP/IP examples, but most of them are trivial.

I’d like to learn TCP/IP and do something cool with it.

A video player is a good choice.

I am not an expert on distributed system design. Please let me know if I did something weird 🙂

What’s in this article?

An implementation-oriented introduction to the TCP/IP protocol.

  1. A simple introduction to TCP/IP.
  2. A simple C++ implementation of a video player that uses the TCP/IP protocol.
    1. System design
    2. Message queue
    3. Serialization
    4. TCP package synchronization

1. What is TCP/IP?

TCP is a protocol (a set of rules) for transferring data between 2 processes (actually, you can call the API anywhere, and even transfer data within a single thread). One process is called the server, and the other is called the client.

IP is an addressing system.

Putting TCP and IP together: a server binds to an address that consists of an IP address and a port number, and a client connects to that address. Then the client and server send and receive data using the TCP protocol.

TCP/IP is ubiquitous. For example, when you send an image to a friend through an app, the image gets serialized into bytes. Then the bytes are transferred by TCP/IP to a server. The server then uses TCP/IP again to transfer the data to your friend. Note that a real system can have multiple client/server pairs.

TCP/IP More Details

TCP/IP connects a server and a client.

A server and a client establish a connection and send/receive data to/from each other.

  1. Establish a connection
    1. The server binds to an address (specified by an IP address and a port number) and waits for clients.
    2. A client tries to connect to the server at that address.
    3. The server and client confirm that they are connected with a handshake.
  2. Server and client send/receive data.
  3. The client disconnects, and the server waits for a new client.

TCP is well defined. On Linux, it is exposed through system calls, which are C APIs.

The APIs can be summarized in this picture.

Picture from: www.geeksforgeeks.org/tcp-server-client-implementation-in-c/

The connection-establishment step is implemented in C with these calls:

socket, setsockopt, bind, listen, connect, accept

There are many tutorials about the C API. I will skip them, since the details of calling these functions are not important here.

That said, the C TCP/IP API is painful to use. For more info about it:
TCP Server-Client implementation in C
https://www.geeksforgeeks.org/tcp-server-client-implementation-in-c/
Beej’s Guide to Network Programming
https://beej.us/guide/bgnet/html//index.html
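For readers who want a concrete picture of these calls anyway, here is a minimal sketch, assuming Linux, that runs both the server steps and the client steps in one thread (as noted above, TCP can even be used within a single thread). The server binds to 127.0.0.1 on port 0, which lets the kernel pick a free port. The function name loopback_round_trip is hypothetical, and error handling is reduced to returning an empty string.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <string>

// Hypothetical helper: server and client steps in one thread.
// Returns the message the server received, or "" on any error.
std::string loopback_round_trip(const std::string& message)
{
    // 1. Server: create a socket, allow address reuse, bind, listen.
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_fd < 0) return "";
    int yes = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));

    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(0); // port 0: the kernel picks a free port
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
    if (bind(listen_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0
        || listen(listen_fd, 1) < 0) {
        close(listen_fd);
        return "";
    }
    // Ask which port the kernel assigned, so the client knows the full address.
    socklen_t len = sizeof(addr);
    getsockname(listen_fd, reinterpret_cast<sockaddr*>(&addr), &len);

    // 2. Client: connect to the server's address (IP + port).
    int client_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (client_fd < 0 || connect(client_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0) {
        if (client_fd >= 0) close(client_fd);
        close(listen_fd);
        return "";
    }

    // 3. Server: accept the pending connection (the handshake already completed).
    int conn_fd = accept(listen_fd, nullptr, nullptr);

    // 4. Send/recv. A small message normally arrives in one recv() call,
    //    but in general recv() may return fewer bytes than were sent.
    send(client_fd, message.data(), message.size(), 0);
    char buf[256] = {};
    ssize_t n = conn_fd >= 0 ? recv(conn_fd, buf, sizeof(buf), 0) : -1;

    close(client_fd);
    if (conn_fd >= 0) close(conn_fd);
    close(listen_fd);
    return n > 0 ? std::string(buf, static_cast<size_t>(n)) : "";
}
```

The order matters: the server must be listening before the client calls connect(), while accept() can come afterwards, because the kernel completes the handshake against the listen backlog.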

However, the real problems are in step 2 (send/recv data). For example:

  1. The TCP recv API is a blocking call, which means the calling thread hangs until data arrives. How do we hide this from users?
  2. How do we implement serialization?
  3. When transferring data, TCP can cut the data into pieces. How do we recover the original data?
  4. Due to the network or other reasons, many messages can arrive at once. Do we throw away the old ones? How do we keep track of the history of data?
  5. If we lose the connection, how do we exit or recover?

We learn by doing.

Let’s build a net video player.

2. Net Video Player Implementation

2.1 The goal and requirements

A server process reads images and sends them to a client. The client receives the images and displays them. When the Enter key is pressed on the client side, the client sends a control command to the server, and the server stops/resumes playing.

Note that the server only sends images and receives controls, and the client only sends controls and receives images. So each direction of the TCP stream carries only one type of message. This requirement makes the system simple to implement and easy to understand.

The guideline for these requirements: a system that is easy to understand.
UDP is a better choice for a video streaming player, because TCP “insists” on delivering all data (it retransmits lost packets and delivers everything in order), which can hurt real-time performance.
But using TCP is just fine, and it is easier to implement.
For more info please read:
Beej’s Guide to Network Programming
https://beej.us/guide/bgnet/html//index.html

2.2 The C API

The C API for TCP can be abstracted as in this picture:

Picture from: www.geeksforgeeks.org/tcp-server-client-implementation-in-c/

After initialization, the server and client behave the same: they send and receive a data stream.

So we can have a base class, tcp_peer, that handles send/recv. tcp_server and tcp_client inherit from tcp_peer, and each implements its own TCP initialization functions.
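A minimal sketch of this design, assuming Linux sockets. The class names TcpPeerSketch, TcpServerSketch and TcpClientSketch are hypothetical; unlike the article's TcpPeer, this version has no templates, send buffer or recv thread. It only shows the split: the base class owns the connected socket and the send/recv logic, and each subclass implements its own initialize().

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdint>
#include <string>

// Base class: owns the connected socket and implements send/recv once.
class TcpPeerSketch {
public:
    virtual ~TcpPeerSketch()
    {
        if (connected_fd_ >= 0) close(connected_fd_);
    }

    // Server: accept a client. Client: connect to a server.
    virtual bool initialize() = 0;

    // Small messages only; a real peer would loop like sendall/recv_all.
    bool send_bytes(const std::string& data)
    {
        return send(connected_fd_, data.data(), data.size(), 0) == static_cast<ssize_t>(data.size());
    }

    std::string recv_some()
    {
        char buf[256];
        ssize_t n = recv(connected_fd_, buf, sizeof(buf), 0);
        return n > 0 ? std::string(buf, static_cast<size_t>(n)) : std::string();
    }

protected:
    int connected_fd_ = -1;
};

// Server: listens on 127.0.0.1 at a kernel-chosen port (error checks omitted for brevity).
class TcpServerSketch : public TcpPeerSketch {
public:
    TcpServerSketch()
    {
        listen_fd_ = socket(AF_INET, SOCK_STREAM, 0);
        sockaddr_in addr{};
        addr.sin_family = AF_INET;
        addr.sin_port = htons(0);
        addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
        bind(listen_fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
        listen(listen_fd_, 1);
        socklen_t len = sizeof(addr);
        getsockname(listen_fd_, reinterpret_cast<sockaddr*>(&addr), &len);
        port_ = ntohs(addr.sin_port);
    }
    ~TcpServerSketch() override
    {
        if (listen_fd_ >= 0) close(listen_fd_);
    }
    bool initialize() override
    {
        connected_fd_ = accept(listen_fd_, nullptr, nullptr);
        return connected_fd_ >= 0;
    }
    uint16_t port() const { return port_; }

private:
    int listen_fd_ = -1;
    uint16_t port_ = 0;
};

// Client: connects to 127.0.0.1 at the given port.
class TcpClientSketch : public TcpPeerSketch {
public:
    explicit TcpClientSketch(uint16_t port) : port_(port) {}
    bool initialize() override
    {
        connected_fd_ = socket(AF_INET, SOCK_STREAM, 0);
        sockaddr_in addr{};
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port_);
        addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
        return connect(connected_fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0;
    }

private:
    uint16_t port_;
};
```

Because the server is already listening when the client connects, the two initialize() calls can even run in one thread: client first (connect), then server (accept). After that, both objects are used through the same send_bytes/recv_some code.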

Picture from: www.geeksforgeeks.org/tcp-server-client-implementation-in-c/

3. Send Implementation

Send can be broken into 3 steps.

  1. Serialize the object into bytes.
  2. Write bytes into a buffer.
  3. Send all available data in the buffer.

The user calls send_to_peer, which serializes a message and hands the bytes to the send buffer to send.

// The TCP peer base class, shared by the server and the client
template <typename SendMessageType, typename RecvMessageType>
class TcpPeer {
public:
    bool send_to_peer(const SendMessageType& message)
    {
        // TODO: direct serialize to send buffer
        char buffer[message::size_of_message<SendMessageType>()];
        message::serialize<SendMessageType>(message, buffer);

        return send_buffer_.write_to_buff_and_trige_send(buffer, tcp_data_.connected_sockfd);
    }

    ...

    TcpSendBuffer<message::size_of_message<SendMessageType>()> send_buffer_;
    ... 
};

send_buffer_ writes the bytes into a circular buffer and then sends all available data in the buffer.

template <size_t CellSizeByte>
class TcpSendBuffer {
public:
    // Copy the data into the buffer, then trigger a TCP send.
    // Not guaranteed to publish successfully.
    bool write_to_buff_and_trige_send(char const* const data_ptr, const Socket& connected_client)
    {
        // copy data to buffer
        if (send_buffer_.write(data_ptr)) {
            return send_avaliable_data_in_queue_to_client(connected_client);
        } else {
            std::cout << "write failed!" << std::endl;
            return false;
        }
    }

    bool send_avaliable_data_in_queue_to_client(const Socket& connected_client)
    {
        auto icp_send_function = [&connected_client](char* const data_ptr) {
            package_sync::send_control_package(connected_client);
            if (sendall(connected_client, data_ptr, CellSizeByte) == false) {
                return false;
            }

            return true;
        };

        // send everything in buffer
        while (send_buffer_.process(icp_send_function) == true)
            ;

        return true;
    }

private:
    static constexpr size_t BUFFER_LENGTH{ 10 };
    CircularBuffer<BUFFER_LENGTH, CellSizeByte> send_buffer_;
};

The buffer is not essential for sending, but it is critical for receiving. I will discuss the buffer later.

Let’s discuss the serialization first.

3.1 Serialization/Deserialization

Data is passed as bytes on the internet.

To pass a C++ object over the internet, we need to convert the object to bytes, pass the bytes, and convert the bytes back to a C++ object.

Converting an object to bytes is serialization.

Converting bytes back to an object is deserialization.

Serialization Example

Consider serializing a 32-bit integer.

#include <cstdint>
#include <cstring>

constexpr uint32_t size_of_int32_t()
{
    return sizeof(int32_t);
}

void serialize_int32_t(const int32_t& obj, char* const buffer)
{
    memcpy(buffer, &obj, sizeof(int32_t));
}

void deserialize_int32_t(char const* const buffer, int32_t& obj)
{
    memcpy(&obj, buffer, sizeof(int32_t));
}

We just need to copy the memory for fundamental types.

int can confuse computers. For historical reasons, the C++ standard only guarantees that int is at least 16 bits wide, so its size can differ between compilers and systems.
To solve this problem, engineers specify exactly which integer type to use. For example, a 32-bit integer is int32_t in C++.
Read this for more info:
http://www.cplusplus.com/reference/cstdint/
I assume both machines use the same byte order (for example, both are x86), so we don’t need to consider the big-endian vs little-endian problem. If needed, it can be handled easily with C API calls such as htonl/ntohl.
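If the two machines might differ in byte order, a minimal sketch of an endian-safe variant converts the value to network byte order (big-endian) with htonl before copying, and back with ntohl after. The two function names are hypothetical; htonl/ntohl are standard POSIX calls from <arpa/inet.h>.

```cpp
#include <arpa/inet.h>
#include <cstdint>
#include <cstring>

// Hypothetical endian-safe variants of serialize_int32_t / deserialize_int32_t.
void serialize_int32_network_order(const int32_t& obj, char* const buffer)
{
    // htonl works on unsigned values; the bit pattern of obj is preserved.
    const uint32_t net = htonl(static_cast<uint32_t>(obj));
    memcpy(buffer, &net, sizeof(net));
}

void deserialize_int32_network_order(char const* const buffer, int32_t& obj)
{
    uint32_t net = 0;
    memcpy(&net, buffer, sizeof(net));
    obj = static_cast<int32_t>(ntohl(net));
}
```

On the wire, the value 1 is then always the bytes 00 00 00 01, whatever the sender's native byte order is.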

Serialization Interface

I implemented the serialization interface with template specialization.

A C++ template is basically hard-coding with compiler support.

The compiler generates a copy of the templated function for each type we use it with. Since hard-coded functions are fast, template code is fast. But because there are many copies of the template function, compile time is longer and the binary is larger.

C++ template specialization says: I want to hard-code this template function specifically for one type.

Introduction to C++ template:
https://www.geeksforgeeks.org/templates-cpp/

An awesome book about C++. It also covers templates.
C++ High Performance: Boost and optimize the performance of your C++17 code
https://www.amazon.com/gp/product/B01MZX1E3Q/ref=ppx_yo_dt_b_search_asin_title?ie=UTF8&psc=1

Serialization Implementation

The base template,

#include <cassert>
#include <cstdint>
#include <iostream>
#include <typeinfo>

template <typename T>
constexpr uint32_t size_of_message()
{
    // strcat into a string literal is invalid, so use a fixed message here
    assert(false && "template specialization not implemented for this type");
    return 0;
}

template <typename T>
void serialize(const T& obj, char* const buffer)
{
    std::cerr << "template specialization not implemented for " << typeid(T).name() << std::endl;
    assert(false);
}

template <typename T>
void deserialize(char const* const buffer, T& obj)
{
    std::cerr << "template specialization not implemented for " << typeid(T).name() << std::endl;
    assert(false);
}

No type should instantiate this base template. If a type has no specialization, the base case reports an error at run time (assert only fires in builds where NDEBUG is not defined).

static_assert can report an error at compile time, but producing a message that includes the type name at compile time is tricky.
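One common way to get the compile-time error anyway is a static_assert whose condition depends on T, so it only fires when the base template is actually instantiated. This is a sketch of that idiom, not the article's code: the helper always_false is hypothetical, and the message is a fixed string, since static_assert cannot print typeid(T).name(). Compilers usually name the offending type in the instantiation backtrace.

```cpp
#include <cstdint>
#include <type_traits>

// Hypothetical helper: always false, but the compiler cannot know that until T is known.
template <typename T>
struct always_false : std::false_type {};

// Base template: compiling a call for an unspecialized type fails with this message.
template <typename T>
constexpr uint32_t size_of_message()
{
    static_assert(always_false<T>::value, "serialization: no template specialization for this type");
    return 0;
}

// Explicit specialization: the base template body is never instantiated for double.
template <>
constexpr uint32_t size_of_message<double>()
{
    return sizeof(double);
}
```

With this, size_of_message<double>() compiles, while size_of_message<float>() is rejected at compile time instead of asserting at run time.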

Template specialization for double:

// Note the syntax for template specialization: template <>, then the concrete type
template <>
constexpr uint32_t size_of_message<double>()
{
    return sizeof(double);
}

template <>
void serialize<double>(const double& obj, char* const buffer)
{
    // assume same platform
    memcpy(buffer, &obj, sizeof(double));
}

template <>
void deserialize<double>(char const* const buffer, double& obj)
{
    // assume same platform
    memcpy(&obj, buffer, sizeof(double));
}

We say to the compiler: Hey dude! Please hard-code size_of_message, serialize and deserialize this way for double!

Now, we can serialize/deserialize a double like this,

double var = 23333;

// Since size_of_message is a constexpr function, 
// we can allocate array using it.
char bytes[size_of_message<double>()];

// same as serialize<double>(var, bytes)
serialize(var, bytes);

// the serialized double is now in bytes

double var_deserialized = 0;
deserialize(bytes, var_deserialized);

assert(var_deserialized == var);

The compiler calls the specializations for double.

To implement serialization for images and video control commands, we just need to specialize these 3 functions.
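For example, the control message could be a small struct. The sketch below assumes a hypothetical ControlCommand with a single bool field (the repository's actual control type may differ). To stay self-contained it only declares the base templates instead of repeating the asserting versions, so an unsupported type fails to build rather than asserting at run time.

```cpp
#include <cstdint>
#include <cstring>

// Base templates: declared only; each supported type provides a specialization.
template <typename T> constexpr uint32_t size_of_message();
template <typename T> void serialize(const T& obj, char* const buffer);
template <typename T> void deserialize(char const* const buffer, T& obj);

// Hypothetical control message: the client asks the server to pause or resume.
struct ControlCommand {
    bool pause;
};

template <>
constexpr uint32_t size_of_message<ControlCommand>()
{
    return 1; // one byte on the wire
}

template <>
void serialize<ControlCommand>(const ControlCommand& obj, char* const buffer)
{
    buffer[0] = obj.pause ? 1 : 0;
}

template <>
void deserialize<ControlCommand>(char const* const buffer, ControlCommand& obj)
{
    obj.pause = (buffer[0] != 0);
}
```

A memcpy of the whole struct would also work for such a trivially copyable type; writing one explicit byte just makes the wire format obvious.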

For cv::Mat serialization, please refer to the code.
We could also do:
1. template specialization for all fundamental types at once, using std::enable_if and std::is_fundamental.
2. recursive calls into the specialized functions for composite types.
Fancy templates confuse people, and confusing code is bad code. I’d like to stick with the simple implementation.
But it is a cool design. Try it yourself.
A good book about templates and generic C++ programming:
C++ High Performance: Boost and optimize the performance of your C++17 code
 Björn Andrist, Viktor Sehr
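A sketch of idea 1 for readers who want to try it: one template, enabled through std::enable_if, replaces the per-type specializations for double, int32_t and so on. It uses std::is_arithmetic, a subset of std::is_fundamental that excludes void and std::nullptr_t (sizeof(void) is invalid). The basic_ prefix marks these names as hypothetical, and, as in the article, both sides are assumed to share the same byte order.

```cpp
#include <cstdint>
#include <cstring>
#include <type_traits>

// Enabled only for arithmetic types (bool, char, int32_t, double, ...).
template <typename T,
    typename std::enable_if<std::is_arithmetic<T>::value, int>::type = 0>
constexpr uint32_t basic_size_of_message()
{
    return sizeof(T);
}

template <typename T,
    typename std::enable_if<std::is_arithmetic<T>::value, int>::type = 0>
void basic_serialize(const T& obj, char* const buffer)
{
    memcpy(buffer, &obj, sizeof(T)); // same byte order assumed on both sides
}

template <typename T,
    typename std::enable_if<std::is_arithmetic<T>::value, int>::type = 0>
void basic_deserialize(char const* const buffer, T& obj)
{
    memcpy(&obj, buffer, sizeof(T));
}
```

Calling basic_serialize on a struct fails to compile, because no overload is enabled for it; a composite type would instead serialize its fields one by one through these calls (idea 2).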

4. Receive implementation

Receive is tricky to implement because:

  1. The recv API is a blocking call.
  2. We want to keep the history of data.

To solve these problems, we can

  1. Use a recv thread.
  2. Use a message buffer (queue, vector, list …)

4.1 Recv thread

By default, the C recv API is a blocking call: every time you call it, your program stops and waits for new data. That’s not practical. For a video player, the user should still be able to interact with the program while the underlying TCP socket is waiting for data.

So we need a separate thread to call recv.
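A minimal sketch of the idea, assuming Linux: a background thread blocks in recv() while the calling thread stays free. To keep it self-contained, it uses socketpair() to create two already-connected sockets instead of a real TCP server/client pair, and a std::promise to hand the received bytes back to the main thread. The function name recv_in_background_demo is hypothetical.

```cpp
#include <sys/socket.h>
#include <unistd.h>
#include <future>
#include <string>
#include <thread>

// Hypothetical demo: the worker thread blocks in recv(); the main thread does not.
std::string recv_in_background_demo()
{
    int fds[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return "";

    std::promise<std::string> received;
    std::future<std::string> result = received.get_future();

    // Background thread: blocks inside recv() until data (or EOF) arrives.
    std::thread recv_thread([&received, fd = fds[1]]() {
        char buf[64];
        ssize_t n = recv(fd, buf, sizeof(buf), 0);
        received.set_value(n > 0 ? std::string(buf, static_cast<size_t>(n)) : std::string());
    });

    // The main thread is not blocked here: it could handle key presses, draw frames, etc.
    // Eventually the "peer" sends data, which wakes up the recv thread.
    send(fds[0], "image", 5, 0);

    std::string data = result.get(); // wait for the background thread's result
    recv_thread.join();
    close(fds[0]);
    close(fds[1]);
    return data;
}
```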

4.2 Message buffer

What happens if we receive more than one image before the user processes them?

We could keep only the newest one, but that is sub-optimal. A simple yet powerful solution is a message queue that keeps all new messages, so the user can process them later.

In sum, to have a usable system, we need,

  1. A thread to recv data, so that a user won’t be blocked.
  2. A message queue to save all the data, so that we don’t lose messages.

Message Buffer Implementation

#include <cassert>
#include <cstdint>
#include <cstring>
#include <mutex>
#include <vector>

template <uint32_t BufferLength, uint32_t CellSizeByte>
class CircularBuffer {
public:
    using Byte = char;
    struct Cell {
        // a mutex to protect each cell for better concurrency.
        std::mutex mtx;
        Byte blob[CellSizeByte];
    };

    bool has_data()
    {
        std::lock_guard<std::mutex> lck(index_mtx_);
        assert(write_idx_ >= read_idx_);
        return write_idx_ != read_idx_;
    }

    bool write(char const* const data_ptr)
    {
        const size_t write_idx_warp = write_idx_ % BufferLength;
        {
            std::lock_guard<std::mutex> lck(buffer_.at(write_idx_warp).mtx);
            memcpy(buffer_[write_idx_warp].blob, data_ptr, CellSizeByte);
        }

        {
            std::lock_guard<std::mutex> lck(index_mtx_);
            ++write_idx_;

            if (write_idx_ - read_idx_ > BufferLength) {
                read_idx_ = write_idx_ - BufferLength;
            }
        }
        return true;
    }

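    // Apply process_function to the oldest unread cell, then mark it as read.
    // Returns false if the buffer is empty, otherwise the status of process_function.
    template <typename ProcessFunction>
    bool process(ProcessFunction& process_function)
    {
        size_t idx = 0;
        {
            // read the indexes under the lock to avoid a data race with write()
            std::lock_guard<std::mutex> lck(index_mtx_);
            if (read_idx_ == write_idx_) {
                return false;
            }
            idx = read_idx_;
        }

        bool status = false;
        {
            std::lock_guard<std::mutex> lck(buffer_.at(idx % BufferLength).mtx);
            status = process_function(buffer_[idx % BufferLength].blob);
        }

        {
            std::lock_guard<std::mutex> lck(index_mtx_);
            // write() may have already moved read_idx_ past idx (buffer overflow)
            if (read_idx_ == idx) {
                ++read_idx_;
            }
        }
        return status;
    }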

private:
    // separate the index mutex from the data mutexes for efficiency
    std::mutex index_mtx_;

    size_t read_idx_{ 0 };
    size_t write_idx_{ 0 };

    std::vector<Cell> buffer_ = std::vector<Cell>(BufferLength);
};

The message buffer is implemented as a circular buffer.

Since we assume messages are fixed-size, the internal storage is a vector of fixed-size byte arrays. When the buffer is full, write() overwrites the oldest cell and moves read_idx_ forward, so the oldest message is dropped.

We have one mutex for the indexes and one mutex per cell. Why? In a single-mutex design, only one thread can operate on the buffer at a time. In the multi-mutex design, multiple threads can operate on the buffer at once (but only one thread can operate on a given cell), which gives higher concurrency.

process() is basically read-and-process: if there is unread data, the ProcessFunction operates on the oldest unread cell. If the ProcessFunction just copies the cell out, process() is a plain read.
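To see the overwrite policy and the read-by-copy use of process() in isolation, here is a compact single-mutex variant (the simpler design the multi-mutex buffer is compared against). The class name SimpleRing is hypothetical; it keeps the same index arithmetic as CircularBuffer but trades concurrency for simplicity.

```cpp
#include <array>
#include <cstddef>
#include <cstring>
#include <mutex>

// Hypothetical single-mutex ring buffer with the same indexing as CircularBuffer.
template <size_t BufferLength, size_t CellSizeByte>
class SimpleRing {
public:
    void write(const char* data_ptr)
    {
        std::lock_guard<std::mutex> lck(mtx_);
        memcpy(cells_[write_idx_ % BufferLength].data(), data_ptr, CellSizeByte);
        ++write_idx_;
        if (write_idx_ - read_idx_ > BufferLength) {
            read_idx_ = write_idx_ - BufferLength; // full: drop the oldest message
        }
    }

    // Apply process_function to the oldest unread cell; returns false if empty.
    template <typename ProcessFunction>
    bool process(ProcessFunction& process_function)
    {
        std::lock_guard<std::mutex> lck(mtx_);
        if (read_idx_ == write_idx_) return false;
        bool status = process_function(cells_[read_idx_ % BufferLength].data());
        ++read_idx_;
        return status;
    }

private:
    std::mutex mtx_; // one lock for everything: simple, but one thread at a time
    size_t read_idx_{ 0 };
    size_t write_idx_{ 0 };
    std::array<std::array<char, CellSizeByte>, BufferLength> cells_{};
};
```

With a capacity of 3, writing 'a' through 'e' and then reading with a copying lambda yields only 'c', 'd', 'e': the two oldest messages were overwritten.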

Recv Thread Implementation

void start_recv_thread(Socket connected_socket)
{
    recv_thread_ = std::thread(
        [this, connected_socket]() {
            ...
            this->recv_data_loop(connected_socket);
            ...
        });
}


void recv_data_loop(Socket connected_socket)
{
    while (control::program_exit() == false) {
        bool status = recv_data_blob_and_write_to_queue(connected_socket);
        if (status == false) {
            std::cout << "recv data thread quit" << std::endl;
            break;
        } else {
            std::cout << "background thread receive data" << std::endl;
        }
    }
}

// Blocking call
// TODO: if sender crash in the middle of sending, the function is blocked forever.
bool recv_data_blob_and_write_to_queue(Socket connected_socket)
{
    // TODO: super large array in stack is not good.
    char buf[CellSizeByte];
    namespace sync = comms::package_sync;
    sync::SyncStatus status = sync::wait_for_control_packge(...);
    if (status == sync::SyncStatus::success) {
        ...
            bool recv_status = recv_all(...);
            if (recv_status == false) {
                return false;
            }
        ...


        buffer_.write(buf);
        return true;
    } else {
        std::cout << "tcp recv fail" << std::endl;
        return false;
    }
}

CircularBuffer<BUFFER_LENGTH, CellSizeByte> buffer_;
std::thread recv_thread_;


start_recv_thread: we call it when the client/server starts. The thread calls recv_data_loop.

recv_data_loop: control::program_exit() makes sure we jump out of the loop when Ctrl+C is pressed. Inside the while loop, we call recv_data_blob_and_write_to_queue.

recv_data_blob_and_write_to_queue: it tries to synchronize the TCP stream and receive all packages of an object (discussed below). Note that recv_all is a blocking call. If recv_all returns true, we have received all bytes of an object, and we copy them into the buffer.

4.3 Package Management

Recv all packages

TCP guarantees that data is delivered in order.

But it does not preserve message boundaries: it may split or regroup what you sent.

For example, a client sends 100 bytes and then 200 bytes. The server may receive 10 bytes, then 120 bytes, then 70 bytes, and then the remaining 100 bytes.

Big surprise!

To solve this problem, we need a function that keeps calling recv until all bytes of a message have arrived.

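// https://beej.us/guide/bgnet/html//index.html
#include <sys/socket.h>
#include <sys/types.h>
#include <cassert>

// Keep calling recv() until exactly want_size_byte bytes have been received.
// Returns false on a socket error or if the peer closed the connection.
bool recv_all(int socket, char* buf, int want_size_byte)
{
    int total = 0;

    while (total < want_size_byte) {
        // recv() may return fewer bytes than requested
        ssize_t n = recv(socket, buf + total, want_size_byte - total, 0);
        if (n == -1) {
            return false; // socket error
        }
        if (n == 0) {
            return false; // peer closed the connection
        }
        total += static_cast<int>(n);
    }
    assert(want_size_byte == total && "len != total");

    return true;
}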
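send() has the mirror-image problem: it may transmit fewer bytes than requested. TcpSendBuffer calls a sendall() helper that is not shown here; the sketch below follows the sendall() loop from Beej's guide, with an int socket as an assumption (the article passes its own Socket type). It is not the repository's code.

```cpp
#include <sys/socket.h>
#include <sys/types.h>

// Keep calling send() until all len bytes are written. Returns false on error.
// Note: if the peer has closed the connection, send() may raise SIGPIPE;
// on Linux, passing MSG_NOSIGNAL as the flag avoids that.
bool sendall(int socket, const char* buf, int len)
{
    int total = 0;
    while (total < len) {
        // send() may write only part of the remaining bytes
        ssize_t n = send(socket, buf + total, static_cast<size_t>(len - total), 0);
        if (n == -1) {
            return false;
        }
        total += static_cast<int>(n);
    }
    return true;
}
```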

Message Separator/Header

There is another potential problem: we may not be able to receive all the bytes of an object. For example, the server may start sending before the client process starts. In this case, we need to know the boundaries of messages.

The general solution is to add a header to each message. But since our application only transfers a single type of fixed-size message, an end-of-message indicator is just fine.
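For reference, a common form of the general header approach is a length prefix: each message is sent as a 4-byte length in network byte order followed by the payload, and the receiver first reads exactly 4 bytes, then exactly that many payload bytes. A minimal sketch, assuming Linux sockets; send_framed, recv_framed and the helpers read_exact/write_all (the same idea as recv_all and sendall) are hypothetical names, not the repository's code.

```cpp
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <cstdint>
#include <string>

// Loop until exactly len bytes are read (same idea as recv_all).
static bool read_exact(int fd, char* buf, size_t len)
{
    size_t total = 0;
    while (total < len) {
        ssize_t n = recv(fd, buf + total, len - total, 0);
        if (n <= 0) return false; // error or peer closed
        total += static_cast<size_t>(n);
    }
    return true;
}

// Loop until all len bytes are sent.
static bool write_all(int fd, const char* buf, size_t len)
{
    size_t total = 0;
    while (total < len) {
        ssize_t n = send(fd, buf + total, len - total, 0);
        if (n < 0) return false;
        total += static_cast<size_t>(n);
    }
    return true;
}

// Header: payload length as a 4-byte big-endian integer, then the payload.
bool send_framed(int fd, const std::string& payload)
{
    const uint32_t len_net = htonl(static_cast<uint32_t>(payload.size()));
    return write_all(fd, reinterpret_cast<const char*>(&len_net), sizeof(len_net))
        && write_all(fd, payload.data(), payload.size());
}

bool recv_framed(int fd, std::string& payload)
{
    uint32_t len_net = 0;
    if (!read_exact(fd, reinterpret_cast<char*>(&len_net), sizeof(len_net))) return false;
    payload.resize(ntohl(len_net));
    return payload.empty() || read_exact(fd, &payload[0], payload.size());
}
```

Unlike a fixed-size cell, this handles messages of any size; the trade-off is one extra read per message and a per-message allocation on the receiver.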

The code to indicate end-of-message is in https://github.com/yimuw/yimu-blog/blob/master/comms/tcp_ip/comms_utils.h

Strictly speaking, we don’t need a separator, since TCP guarantees that all packages are delivered. But if we want to send general messages, the refactor is easy.

The Big Picture for Receive

Literally, a big picture.

Code

Code is here: https://github.com/yimuw/yimu-blog/tree/master/comms/tcp_ip

It was a fun project. Try running it on 2 computers. Remember that you can press the Enter key to stop/resume the video.

Discussion

This is a toy example.

We can do more.

  1. The IP address was a LAN address. How do we make it work with a WAN address?
  2. The message size is fixed. How do we make it work for arbitrary messages?
  3. Using UDP is another option. Why not try it?
  4. How do we test a distributed system?
I don’t know how to
1. Make the C API work for WAN IP.
2. Test a distributed system.
Please help!

TCP/IP is the basis of the internet.

Fancy distributed system frameworks are built on top of the TCP/UDP system calls. The differences in their performance come from implementation, system design and different trade-offs.

They have a humble start:

int sockfd = socket(domain, type, protocol);

// yes, int

Paper unlock

Software Infrastructure for an Autonomous Ground Vehicle
Matthew McNaughton, Christopher R. Baker, Tugrul Galatali, Bryan Salesky, Christopher Urmson, Jason Ziglar

In Section V (Inter-Process Communications) of the paper, the authors design a distributed system to send/receive messages. Within a computer, Unix domain sockets are used to send data. TCP/IP is used to send data between computers.

What’s next?

In that paper, the authors send data within a computer using Unix domain sockets. The Unix domain socket API is very similar to the TCP one, so implementing Unix domain sockets isn’t much fun.

There is another more interesting way to do inter-process communication.

Shared-Memory.
