Build a Cloud Computing System with ZooKeeper and Kafka

1. Why write this post?

Cloud computing lets people apply far more computational power to a problem than a single machine can provide.

A recent result in deep learning (GPT-3) shows that a very large but architecturally simple neural network can obtain impressive results. However, training and running inference on such a large network may not be feasible on a single GPU.

Distributed training is quite popular today. Basically, we break the training task into small sub-tasks and assign the sub-tasks to different machines.

For inference, we can wait until Nvidia gives us a super GPU. Or, in my mind, it’s easier to send the data to a cloud over 5G. In the cloud, we can use multiple GPUs to run the inference (assuming the huge network is parallelizable) and send back the inference result.

In this post, I’d like to implement a concrete cloud computing system using the ZooKeeper library and the Kafka library.

It’s a simple tutorial for people who are not familiar with the topic (and also a study note for myself).

Source code

2. The Goal

2.1 The System

The goal is to build a cloud computing pipeline in Java using the Zookeeper library and the Kafka library.

Assume there are many worker nodes (a worker node is basically a program running on a separate physical computer). Each worker node runs the computation requests from a client and sends the results back to the client.

A client splits a large computation task into pieces and sends them to different worker nodes. After all the worker nodes finish, the client combines all the results.

There is a monitor that shows the status of the servers. Each server (worker) uses the Kafka message queue to send status info to the monitor.

2.2 Problems

The idea is simple. However, there are two main problems in this system.

  1. How does the client know the existence of all the worker nodes?
    1. This problem is called service discovery.
    2. One possible solution is:
      1. Define a master node by configuring the master’s address in the config.
      2. Worker nodes register with the master node and send it their information (e.g. IP address, port number).
      3. When the client has a task, it requests the addresses of the workers from the master node.
    3. This solution is already implemented by the ZooKeeper library.
  2. How to represent a computational task?
    1. A simple solution is to send a binary and its config to the worker node. The worker node runs the binary with the config. Once the worker finishes its computation, it sends back the outputs of the task.
    2. We can achieve this with HTTP requests.
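The master-node idea from the list above can be sketched in a few lines. This is only an in-memory illustration and not ZooKeeper code; the class name MasterRegistry and its methods are hypothetical names made up for this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the master node described above.
class MasterRegistry {
    // workerId -> address, kept in registration order
    private final Map<String, String> workers = new LinkedHashMap<>();

    // A worker announces itself with its address, e.g. "http://host-a:8081".
    synchronized void register(String workerId, String address) {
        workers.put(workerId, address);
    }

    // Called when a worker leaves (or is detected as dead).
    synchronized void unregister(String workerId) {
        workers.remove(workerId);
    }

    // The client asks for a snapshot of all currently known worker addresses.
    synchronized List<String> lookupAll() {
        return new ArrayList<>(workers.values());
    }

    public static void main(String[] args) {
        MasterRegistry master = new MasterRegistry();
        master.register("w1", "http://host-a:8081");
        master.register("w2", "http://host-b:8082");
        System.out.println(master.lookupAll());
        master.unregister("w2");
        System.out.println(master.lookupAll());
    }
}
```

What the sketch leaves out is exactly the hard part: the master itself can fail, and somebody has to notice when a worker dies without unregistering. Those are the cases ZooKeeper takes care of.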

In the later sections, I will discuss how to solve the Service Discovery problem and how to implement the HTTP request.

3. Service Discovery by Zookeeper

Recall that service discovery is the problem of how a program learns about the existence of the other nodes in a distributed system.

ZooKeeper provides a shared, file-system-like tree of nodes (called znodes) for programs in the cloud.

In our application, worker nodes register themselves with ZooKeeper and save their addresses in the ZooKeeper shared file system.

When a client wants to know all the available workers, it queries all the registered workers’ addresses from ZooKeeper. Then the client can simply send computation requests to these workers.

In the next sections, we will implement service discovery with ZooKeeper.

3.1 Start Zookeeper

3.2 Worker nodes connect to Zookeeper

In the worker node implementation, we need to (1) connect to ZooKeeper and (2) store the worker’s address in ZooKeeper.

To connect to ZooKeeper,

...
import java.io.IOException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
...
public class Application implements Watcher {
    private static final String ZOOKEEPER_ADDRESS = "localhost:2181";
    private static final int SESSION_TIMEOUT = 3000;
    private static final int DEFAULT__SERVER_PORT = 8081;
    private ZooKeeper zooKeeper;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException
    {
        int serverPort = args.length == 1 ? Integer.parseInt(args[0]) : DEFAULT__SERVER_PORT;
        Application application = new Application();
        ZooKeeper zooKeeper = application.connectToZookeeper();
        
        ...
    }
    public ZooKeeper connectToZookeeper() throws IOException
    {
        this.zooKeeper = new ZooKeeper(ZOOKEEPER_ADDRESS, SESSION_TIMEOUT, this);
        return zooKeeper;
    }
    ...
}

We create a Zookeeper instance by new ZooKeeper(ZOOKEEPER_ADDRESS, SESSION_TIMEOUT, this);.

Note that the Application class implements the Watcher interface, which is the event handler interface of ZooKeeper. That is why we can pass this as the third argument of the constructor. I will skip the handler itself for now.

3.3 Register the worker node to Zookeeper

The Zookeeper shared file system stores data in a tree structure. It’s very similar to the tree file system of an operating system.

In a file system, to save the addresses of all workers (servers) we can,

  1. Create a directory called workers.
  2. Under the workers directory, create a file for each worker to save its address and other information.

In ZooKeeper, it’s similar,

  1. We create a znode called workers. It is similar to a directory in a file system.
  2. Under the workers znode, we create a node for each worker. In the node, we put the address of the worker.

To create the “directory” znode,

import org.apache.zookeeper.*;
...
public class ServiceRegistry implements Watcher {
    private static final String REGISTRY_ZNODE = "/workers";
    public ServiceRegistry(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
        createServiceRegistryZnode();
    }
    ...
    private void createServiceRegistryZnode() {
        try {
            if (zooKeeper.exists(REGISTRY_ZNODE, false) == null) {
                zooKeeper.create(REGISTRY_ZNODE, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } ...
    }
    ...
}

zooKeeper.create(REGISTRY_ZNODE, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); creates the “directory” znode.

We created a znode called workers with the create mode CreateMode.PERSISTENT. This means that when the program disconnects from ZooKeeper, the workers znode is not deleted.

To save a worker’s address,

...
public class ServiceRegistry implements Watcher {
    private static final String REGISTRY_ZNODE = "/workers";
    ...
    public void registerToCluster(String metadata) throws KeeperException, InterruptedException {
        if (this.currentZnode != null) {
            System.out.println("Already registered to service registry");
            return;
        }
        this.currentZnode = zooKeeper.create(REGISTRY_ZNODE + "/n_", metadata.getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Registered to service registry");
    }
    ...
}

We called

zooKeeper.create(REGISTRY_ZNODE + "/n_", metadata.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

This creates a znode under the “directory” znode workers. Because of the SEQUENTIAL part of the mode, ZooKeeper appends a unique, increasing sequence number to the name prefix n_. We save the string metadata in the node. EPHEMERAL means the node is deleted when the program’s session with ZooKeeper ends, for example when the worker disconnects or crashes.
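To make the naming concrete, here is a small sketch of how a sequential name is formed: the sequence number is appended to the given path prefix, zero-padded to 10 digits. The class SequentialNames is a hypothetical stand-in written for this post; in the real system the counter lives inside ZooKeeper, not in our program.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: mimics how the name of a sequential znode is formed.
class SequentialNames {
    // Stand-in for the counter that ZooKeeper keeps for the parent znode.
    private final AtomicInteger counter;

    SequentialNames(int firstSequenceNumber) {
        this.counter = new AtomicInteger(firstSequenceNumber);
    }

    // Appends the next sequence number, zero-padded to 10 digits, to the prefix.
    String next(String pathPrefix) {
        return pathPrefix + String.format("%010d", counter.getAndIncrement());
    }

    public static void main(String[] args) {
        SequentialNames names = new SequentialNames(37);
        System.out.println(names.next("/workers/n_")); // /workers/n_0000000037
        System.out.println(names.next("/workers/n_")); // /workers/n_0000000038
    }
}
```

Because the number only grows, two workers that register at the same time can never end up with the same znode name.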

The worker program (or process) calls the ServiceRegistry like this,

String currentServerAddress = String.format("http://%s:%d", InetAddress.getLocalHost().getCanonicalHostName(), serverPort);
serviceRegistry.registerToCluster(currentServerAddress);

We create a worker znode that stores the address (host name and port number) of the current worker.
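As a quick check of the address format, the sketch below builds the same kind of string and shows how a client could turn it into the URL of the /task endpoint used later in this post. The helper class WorkerAddress and the host name worker-host are made up for the example.

```java
import java.net.URI;

// Illustration only: builds a worker address and derives the task endpoint from it.
class WorkerAddress {
    // Same format string as in the worker code above.
    static String build(String host, int port) {
        return String.format("http://%s:%d", host, port);
    }

    // The client appends the endpoint path to the stored address.
    static URI taskEndpoint(String workerAddress) {
        return URI.create(workerAddress + "/task");
    }

    public static void main(String[] args) {
        String address = build("worker-host", 8081);
        URI endpoint = taskEndpoint(address);
        System.out.println(address);            // http://worker-host:8081
        System.out.println(endpoint.getHost()); // worker-host
        System.out.println(endpoint.getPort()); // 8081
        System.out.println(endpoint.getPath()); // /task
    }
}
```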

3.4 Run the code

Download Kafka from this link. Kafka is a messaging system that, in this release, relies on ZooKeeper for coordination, so ZooKeeper is included in its release.

First, make sure the port number in config/zookeeper.properties is 2181.

...
clientPort=2181
...

Then, start Zookeeper by,

yimu@yimu-mate:~/kafka_2.12-2.2.0$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

We can use a command line tool to check the current znodes in zookeeper.

yimu@yimu-mate:~/kafka_2.12-2.2.0$ ./bin/zookeeper-shell.sh localhost:2181
ls /
[]

At the beginning, there is no znode under the root /.

Then, we compile and start a worker node at (ip: localhost, port: 8081).

yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/worker$ mvn clean package
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/worker$ java -jar ./target/worker-1.0-SNAPSHOT-jar-with-dependencies.jar  8081

In the zookeeper command line tool, I can check for the new znode.

yimu@yimu-mate:~/kafka_2.12-2.2.0$ ./bin/zookeeper-shell.sh localhost:2181
ls /
[workers]
ls /workers
[n_0000000037]

The “directory” znode /workers is created. The worker znode n_0000000037 is also created.

We can check the meta-data inside the n_0000000037 by using the get command.

yimu@yimu-mate:~/kafka_2.12-2.2.0$ ./bin/zookeeper-shell.sh localhost:2181
get /workers/n_0000000037
http://yimu-mate:8081
[..other info...]

The address of the worker node associated with n_0000000037 is http://yimu-mate:8081.

I start another worker at (ip: localhost, port: 8082). The ls command shows the new worker.

yimu@yimu-mate:~/kafka_2.12-2.2.0$ ./bin/zookeeper-shell.sh localhost:2181
ls /workers
[n_0000000037, n_0000000038]
get /workers/n_0000000038
http://yimu-mate:8082
[...other info...]

If I kill the worker on port 8082, its ephemeral znode disappears and the available workers become,

yimu@yimu-mate:~/kafka_2.12-2.2.0$ ./bin/zookeeper-shell.sh localhost:2181
ls /workers
[n_0000000037]
get /workers/n_0000000037
http://yimu-mate:8081
[...other info...]

In sum, ZooKeeper makes service discovery easy. To get the addresses of the servers, the client simply needs to read them from the ZooKeeper shared file system.

3.5 Client side

The client sends computational requests to workers. The workers do the computation and send back results to the client.

To achieve this, the client simply needs to go to the ZooKeeper shared file system and read the addresses of the available workers.

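import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
...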
public class ServiceRegistry implements Watcher {
    private static final String REGISTRY_ZNODE = "/workers";
    private final ZooKeeper zooKeeper;
    private String currentZnode = null;
    private List<String> allServiceAddresses = null;
    public synchronized List<String> getAllServiceAddresses() throws KeeperException, InterruptedException {
        if (allServiceAddresses == null) {
            updateAddresses();
        }
        return allServiceAddresses;
    }
    ...
    private synchronized void updateAddresses() throws KeeperException, InterruptedException {
        List<String> workerZnodes = zooKeeper.getChildren(REGISTRY_ZNODE, this);
        List<String> addresses = new ArrayList<>(workerZnodes.size());
        for (String workerZnode : workerZnodes) {
            String workerFullPath = REGISTRY_ZNODE + "/" + workerZnode;
            Stat stat = zooKeeper.exists(workerFullPath, false);
            if (stat == null) {
                continue;
            }
            byte[] addressBytes = zooKeeper.getData(workerFullPath, false, stat);
            String address = new String(addressBytes);
            addresses.add(address);
        }
        this.allServiceAddresses = Collections.unmodifiableList(addresses);
        System.out.println("The cluster addresses are: " + this.allServiceAddresses);
    }
    ...
}

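In updateAddresses(), we simply iterate over the children of /workers and save the address stored in each worker znode. Passing this to getChildren registers the registry as a watcher, so it can be notified when the list of workers changes.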
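A standard ZooKeeper watch fires only once, so a cache of addresses has to set the watch again every time it reads the children. The sketch below models that pattern in memory. WatchedChildren and AddressCache are hypothetical classes for illustration; for brevity the child names are the addresses themselves, while in the post the address is stored as the data of the znode.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: an in-memory model of a znode's children with one-time watches.
class WatchedChildren {
    private final List<String> children = new ArrayList<>();
    private final List<Runnable> watchers = new ArrayList<>();

    // Like getChildren(path, watcher): returns a snapshot and leaves a one-time watch.
    synchronized List<String> getChildren(Runnable watcher) {
        if (watcher != null) {
            watchers.add(watcher);
        }
        return new ArrayList<>(children);
    }

    void add(String child) {
        change(() -> children.add(child));
    }

    void remove(String child) {
        change(() -> children.remove(child));
    }

    // Applies the change, then fires and forgets every registered watch.
    private void change(Runnable mutation) {
        List<Runnable> toNotify;
        synchronized (this) {
            mutation.run();
            toNotify = new ArrayList<>(watchers);
            watchers.clear(); // a watch is triggered at most once
        }
        toNotify.forEach(Runnable::run);
    }
}

// Keeps a fresh copy of the worker list by re-registering the watch on every refresh.
class AddressCache {
    private final WatchedChildren registry;
    private volatile List<String> addresses = List.of();

    AddressCache(WatchedChildren registry) {
        this.registry = registry;
        refresh();
    }

    private void refresh() {
        addresses = registry.getChildren(this::refresh);
    }

    List<String> get() {
        return addresses;
    }
}
```

If refresh() passed null instead of this::refresh, the cache would see the first change and then silently go stale.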

The service discovery by ZooKeeper is done!

Thanks to ZooKeeper, service discovery becomes a simple file system operation. What an elegant solution!

Next, we will define the computational tasks sent by the client.

4. The Cloud Computing Task

Let’s do a simple example.

Let’s say we want to compute the sum of the integers from 0 to 50000000. To use the cloud computing system for this, we need to split the task into sub-tasks.

A sub-task can be defined as computing the sum from integer a (inclusive) to integer b (exclusive). It can be implemented as,

import java.io.*;
import java.math.BigInteger;
public class Application {
    public static void main(String[] args) throws FileNotFoundException
    {
        BigInteger start = new BigInteger(args[0]);
        BigInteger end = new BigInteger(args[1]);
        
        BigInteger sum = new BigInteger("0");
        for(; start.compareTo(end) < 0; start = start.add(BigInteger.ONE)) {
            sum = sum.add(start);
        }
        System.out.println("sum: " + sum);
        PrintWriter p = new PrintWriter(new FileOutputStream("output.txt", false));
        p.println(sum.toString());
        p.close();
        System.out.println("file saved");
    }
}

The program takes a start and an end as arguments. It sums the integers from start up to, but not including, end, and saves the result into an output file (output.txt).
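To check the result of a sub-task without running the loop, we can use the closed form for an arithmetic series: the sum of a, a+1, ..., b-1 is (b - a) * (a + b - 1) / 2. The class RangeSum below is a helper I made up for this check; it is not part of the system.

```java
import java.math.BigInteger;

// Illustration only: closed-form check for the sum computed by the sub-task program.
class RangeSum {
    // Sum of the integers a, a+1, ..., b-1. An empty range (a >= b) gives 0, like the loop.
    static BigInteger closedForm(BigInteger a, BigInteger b) {
        if (a.compareTo(b) >= 0) {
            return BigInteger.ZERO;
        }
        BigInteger count = b.subtract(a);
        BigInteger firstPlusLast = a.add(b).subtract(BigInteger.ONE);
        return count.multiply(firstPlusLast).divide(BigInteger.valueOf(2));
    }

    // Same loop as in the sub-task program, used here only for comparison.
    static BigInteger loop(BigInteger a, BigInteger b) {
        BigInteger sum = BigInteger.ZERO;
        for (BigInteger i = a; i.compareTo(b) < 0; i = i.add(BigInteger.ONE)) {
            sum = sum.add(i);
        }
        return sum;
    }

    public static void main(String[] args) {
        BigInteger a = BigInteger.valueOf(3);
        BigInteger b = BigInteger.valueOf(8);
        System.out.println(loop(a, b));       // 25
        System.out.println(closedForm(a, b)); // 25
        // The whole task of this section: 0 (inclusive) to 50000000 (exclusive).
        System.out.println(closedForm(BigInteger.ZERO, BigInteger.valueOf(50000000L))); // 1249999975000000
    }
}
```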

To compute the sum from 0 to 50000000, we can break the task into:

  1. compute the sum from 0 to 10000000;
  2. the sum from 10000000 to 20000000;
  3. the sum from 20000000 to 30000000;
  4. the sum from 30000000 to 40000000;
  5. the sum from 40000000 to 50000000.

For each sub-task, we can send the task (program binary and arguments) to a worker as an HTTP request. The worker computes the result and sends it back as an HTTP response. In the end, the client combines the results.
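Before looking at the real client and worker, here is a minimal sketch of such a request and response round trip using only the JDK (Java 11 or newer). It is not the WebClient of this project, whose code is not shown in this post. The class TaskRoundTrip is a made-up example: its toy /task endpoint only replies with the number of bytes it received, and the client side returns a CompletableFuture like the serverResponse field used below.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

// Illustration only: a toy /task endpoint plus an asynchronous client call.
class TaskRoundTrip {
    // Starts a server on a free local port; it answers each request with the number of body bytes received.
    static HttpServer startServer() {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("localhost", 0), 0);
            server.createContext("/task", exchange -> {
                byte[] body = exchange.getRequestBody().readAllBytes();
                byte[] reply = Integer.toString(body.length).getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, reply.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(reply);
                }
                exchange.close();
            });
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Sends the payload as a POST and returns a future that completes with the response body.
    static CompletableFuture<String> sendTask(HttpClient client, String url, byte[] payload) {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .POST(HttpRequest.BodyPublishers.ofByteArray(payload))
                .build();
        return client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenApply(HttpResponse::body);
    }

    public static void main(String[] args) {
        HttpServer server = startServer();
        try {
            String url = "http://localhost:" + server.getAddress().getPort() + "/task";
            byte[] payload = "0 10000000".getBytes(StandardCharsets.UTF_8);
            CompletableFuture<String> response = sendTask(HttpClient.newHttpClient(), url, payload);
            System.out.println("worker replied: " + response.join());
        } finally {
            server.stop(0);
        }
    }
}
```

Because sendTask returns immediately, the client can fire off all sub-tasks first and wait for the answers afterwards, which is what lets the workers run in parallel.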

In the client code,

...
class Task {
    Task(String id, String bin, String arg)
    {
        this.id = id;
        this.binName = bin;
        this.argName = arg;
    }
    String id;
    String binName;
    String argName;
    CompletableFuture<String> serverResponse;
    String resultPath;
}

public class Application {
    static private byte[] readData(String fileName) throws IOException
    {
        ....
    }
    static private List<String> getWorkAddress()
    {
        try {
            WorkerRegistry workerRegistry = new WorkerRegistry();
            return workerRegistry.getAllServiceAddresses();
        } ...
    }
    public static void main(String[] args) throws Exception
    {
        List<String> workAddresses = getWorkAddress();
        ...
        
        ArrayList<Task> tasks = new ArrayList<Task>();
        final String PATH_TO_BIN = "./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar";
        // Each sub-task sums the range from its first argument (inclusive) to its second (exclusive).
        tasks.add(new Task("task1", PATH_TO_BIN, "0 10000000"));
        tasks.add(new Task("task2", PATH_TO_BIN, "10000000 20000000"));
        tasks.add(new Task("task3", PATH_TO_BIN, "20000000 30000000"));
        tasks.add(new Task("task4", PATH_TO_BIN, "30000000 40000000"));
        tasks.add(new Task("task5", PATH_TO_BIN, "40000000 50000000"));
        for (int i = 0; i < tasks.size(); ++i) {
            Task task = tasks.get(i);
            task.id += "-" + System.currentTimeMillis();
            byte[] bin = readData(task.binName);
            ComputingTask computingTask = new ComputingTask(bin, task.argName, task.binName, task.id);
            WebClient webClient = new WebClient();
            System.out.println(String.format("send task [%s] to server [%s]", task.id, workAddresses.get(i % workAddresses.size())));
            task.serverResponse = webClient.sendTask(workAddresses.get(i % workAddresses.size()) + "/task",
                ComputingTask.serialize(computingTask));
        }
        BigInteger sum = new BigInteger("0");
        for (int i = 0; i < tasks.size(); ++i) {
            Task task = tasks.get(i);
            try {
                // get() blocks until the worker's response has arrived.
                String result = task.serverResponse.get();
                System.out.println(String.format("recv result for task : %s", task.id));
                BigInteger subResult = new BigInteger(result.replace("\n", ""));
                sum = sum.add(subResult);
            } catch (InterruptedException | ExecutionException e) {
                System.out.println("exception when getting response from servers");
                e.printStackTrace();
                throw e;
            }
        }
        System.out.println("sum: " + sum);
    }
}

The Task class holds all the data related to one cloud computing task.

At the beginning of main, we break the task into 5 sub-tasks.

In the first loop, for each task, we read the binary and send the binary and its arguments to a worker. Workers are picked in round-robin order with i % workAddresses.size().

In the second loop, we wait for the responses and combine the results from the workers.
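The round-robin rule is easy to try out in isolation. The sketch below applies the same i % size rule to a list of task ids and a list of worker addresses; the class RoundRobin is a made-up helper, not part of the client.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: the i % workers.size() assignment used by the client.
class RoundRobin {
    // Returns, for every worker, the list of task ids assigned to it.
    static Map<String, List<String>> assign(List<String> taskIds, List<String> workers) {
        if (workers.isEmpty()) {
            throw new IllegalArgumentException("no workers available");
        }
        Map<String, List<String>> plan = new LinkedHashMap<>();
        for (String worker : workers) {
            plan.put(worker, new ArrayList<>());
        }
        for (int i = 0; i < taskIds.size(); ++i) {
            String worker = workers.get(i % workers.size());
            plan.get(worker).add(taskIds.get(i));
        }
        return plan;
    }

    public static void main(String[] args) {
        List<String> tasks = List.of("task1", "task2", "task3", "task4", "task5");
        List<String> workers = List.of("http://yimu-mate:8082", "http://yimu-mate:8081");
        // {http://yimu-mate:8082=[task1, task3, task5], http://yimu-mate:8081=[task2, task4]}
        System.out.println(assign(tasks, workers));
    }
}
```

With 5 tasks and 2 workers, the first worker in the list gets 3 tasks and the second gets 2. The rule does not look at how busy a worker is, so it is only fair when the sub-tasks take about the same time.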

On the worker side,

...
public class Worker {
    private static final String TASK_ENDPOINT = "/task";
    private static final String STATUS_ENDPOINT = "/status";
    private final int port;
    private HttpServer server;
    private LinkedList<String> tasksProcessed;
    public Worker(int port)
    {
        tasksProcessed = new LinkedList<String>();
        this.port = port;
    }
    private void startKafkaSendingThread()
    {
        KafkaStatusThread t = new KafkaStatusThread(this.tasksProcessed, server.getAddress().getHostString() + port);
        t.start();
    }
    public void startServer()
    {
        try {
            this.server = HttpServer.create(new InetSocketAddress(port), 0);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        ...
        HttpContext taskContext = server.createContext(TASK_ENDPOINT);
        taskContext.setHandler(this::handleTaskRequest);
        server.setExecutor(Executors.newFixedThreadPool(1));
        server.start();
        ...
    }
    ...
    
    private void handleTaskRequest(HttpExchange exchange) throws IOException
    {
        if (!exchange.getRequestMethod().equalsIgnoreCase("post")) {
            exchange.close();
            return;
        }
        System.out.println("============ Handling new computing request =============");
        byte[] requestBytes = exchange.getRequestBody().readAllBytes();
        messages.ComputingTask task = messages.ComputingTask.deserialize(requestBytes);
        trackTasks(task.taskId);
        TaskStatus status = calculateResponse(task.bin, task.taskId, task.binName, task.args);
        if (status.success) {
            sendResponse(status.result, exchange, 200);
        } else {
            sendResponse(status.result, exchange, 400);
        }
        System.out.println("============ Done handling request =============");
    }
    ...
    private void setUp(byte[] requestBytes, String taskDir, String binName) throws IOException
    {
        ...
    }
    private byte[] postProcess(String taskDir) throws IOException
    {
        ...
    }
    private TaskStatus calculateResponse(byte[] requestBytes, String taskDir, String binName, String binArgs)
    {
        try {
            setUp(requestBytes, taskDir, binName);
            startCommand(taskDir, binName, binArgs);
            return new TaskStatus(postProcess(taskDir), true);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            return new TaskStatus(new byte[0], false);
        }
    }
    private void startCommand(String taskDir, String binName, String binArgs) throws IOException, InterruptedException
    {
        File file = new File(binName);
        String command = "java -jar " + file.getName() + " " + binArgs;
        String fullCommand = String.format("cd %s; %s", taskDir, command);
        System.out.println(String.format("Launching worker instance : %s ", fullCommand));
        String[] cmd1 = { "/bin/sh", "-c", fullCommand };
        Process p1 = Runtime.getRuntime().exec(cmd1);
        p1.waitFor();
        System.out.println("process finish!");
    }
    ...
    private void sendResponse(byte[] responseBytes, HttpExchange exchange, int code) throws IOException
    {
        exchange.sendResponseHeaders(code, responseBytes.length);
        OutputStream outputStream = exchange.getResponseBody();
        outputStream.write(responseBytes);
        outputStream.flush();
        outputStream.close();
        exchange.close();
    }
}

In startServer(), we register handleTaskRequest as the handler for the /task endpoint, which receives the computation requests.

In handleTaskRequest, we basically deserialize the request, save the binary, and run it with its arguments (see calculateResponse and startCommand). Once the task is finished, we send the result bytes back in the HTTP response.
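startCommand builds one shell string and hands it to /bin/sh. An alternative, sketched below under my own assumptions, is to pass the arguments as a list to ProcessBuilder and to set the working directory directly, so no shell has to parse the task arguments. The class TaskLauncher and its methods are hypothetical and not part of the worker code.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: launching the task binary without going through a shell.
class TaskLauncher {
    // Builds the argument vector: java -jar <jar file name> <arg1> <arg2> ...
    static List<String> buildCommand(String binName, String binArgs) {
        List<String> command = new ArrayList<>();
        command.add("java");
        command.add("-jar");
        command.add(new File(binName).getName());
        String trimmed = binArgs.trim();
        if (!trimmed.isEmpty()) {
            command.addAll(Arrays.asList(trimmed.split("\\s+")));
        }
        return command;
    }

    // Runs the command inside taskDir, waits for it, and returns its exit code.
    static int run(String taskDir, String binName, String binArgs)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(buildCommand(binName, binArgs))
                .directory(new File(taskDir)) // replaces the "cd taskDir" part of the shell string
                .inheritIO()                  // show the task's output in the worker's console
                .start();
        return process.waitFor();
    }

    public static void main(String[] args) {
        // [java, -jar, addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar, 0, 10000000]
        System.out.println(buildCommand("./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar", "0 10000000"));
    }
}
```

Returning the exit code also gives the worker a way to report a failed task, which the version above cannot do because it ignores the result of waitFor().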

4.1 Run the task

Now we can test the cloud computing task.

First download Kafka with Zookeeper from this link.

Then, start the Zookeeper server. Please make sure the Zookeeper port is 2181.

yimu@yimu-mate:~/kafka_2.12-2.2.0$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

Also, start a Kafka server. I will explain its usage in the next section. Please make sure that (1) log.dirs in the Kafka server config points to a valid directory if you are using Ubuntu and (2) the config contains listeners=PLAINTEXT://:9092.

yimu@yimu-mate:~/kafka_2.12-2.2.0$ ./bin/kafka-server-start.sh  config/server-0.properties

Then compile these 3 Java applications by running mvn clean package in each of their directories.

yimu@yimu-mate:~/yimu-blog/comms/cloud_computing$ ls
addIntegers  cloudclient  worker
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing$ cd addIntegers
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/addIntegers$ mvn clean package
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/addIntegers$ cd ..
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing$ cd cloudclient
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/cloudclient$ mvn clean package
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/cloudclient$ cd ..
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing$ cd worker
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/worker$ mvn clean package

Then start a worker in a terminal. If you see a few Kafka warnings, you can ignore them for now.

yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/worker$ java -jar ./target/worker-1.0-SNAPSHOT-jar-with-dependencies.jar  8081
Successfully connected to Zookeeper
server start!
kafka producer created
kafka update thread start
start kafka thread
Registered to service registry

Now, copy the binary for addIntegers to the cloudclient folder and start the client. You should be able to see the following output.

yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/cloudclient$ java -jar target/cloud.client-1.0-SNAPSHOT-jar-with-dependencies.jar 
Successfully connected to Zookeeper
worker address: http://yimu-mate:8081
send task [task1-1594001158933] to server [http://yimu-mate:8081]
send task [task2-1594001159256] to server [http://yimu-mate:8081]
send task [task3-1594001159277] to server [http://yimu-mate:8081]
send task [task4-1594001159301] to server [http://yimu-mate:8081]
send task [task5-1594001159335] to server [http://yimu-mate:8081]
recv result for task : task1-1594001158933
recv result for task : task2-1594001159256
recv result for task : task3-1594001159277
recv result for task : task4-1594001159301
recv result for task : task5-1594001159335
sum: 45449999845000000

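Note that the printed sum belongs to the ranges that were passed in this recorded run, which are visible in the worker log below: 0 to 100000000, 100000000 to 200000000, 200000000 to 300000000, 300000000 to 40000000 (an empty range, because the start is larger than the end) and 40000000 to 50000000. Summing those ranges gives 45449999845000000.

The worker prints the following output,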

yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/worker$ java -jar ./target/worker-1.0-SNAPSHOT-jar-with-dependencies.jar  8081
Successfully connected to Zookeeper
server start!
kafka producer created
kafka update thread start
start kafka thread
Registered to service registry
============ Handling new computing request =============
saving bin at:task2-1594001159256/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task2-1594001159256; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 100000000 200000000 
process finish!
============ Done handling request =============
============ Handling new computing request =============
saving bin at:task1-1594001158933/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task1-1594001158933; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 0 100000000 
process finish!
============ Done handling request =============
============ Handling new computing request =============
saving bin at:task3-1594001159277/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task3-1594001159277; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 200000000 300000000 
process finish!
============ Done handling request =============
============ Handling new computing request =============
saving bin at:task4-1594001159301/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task4-1594001159301; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 300000000 40000000 
process finish!
============ Done handling request =============
============ Handling new computing request =============
saving bin at:task5-1594001159335/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task5-1594001159335; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 40000000 50000000 
process finish!
============ Done handling request =============

4.2 Multiple Workers

We can add another worker at port 8082 by,

yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/worker$ java -jar ./target/worker-1.0-SNAPSHOT-jar-with-dependencies.jar  8082
Successfully connected to Zookeeper
server start!
kafka producer created
kafka update thread start
start kafka thread
Registered to service registry

Then the client can automatically detect these 2 workers using ZooKeeper and send tasks to both of them. Let’s start the client again.

yimu@yimu-mate:~/Desktop/blog/yimu-blog/comms/cloud_computing/cloudclient$ java -jar target/cloud.client-1.0-SNAPSHOT-jar-with-dependencies.jar 
Successfully connected to Zookeeper
worker address: http://yimu-mate:8082
worker address: http://yimu-mate:8081
send task [task1-1594001356150] to server [http://yimu-mate:8082]
send task [task2-1594001356492] to server [http://yimu-mate:8081]
send task [task3-1594001356512] to server [http://yimu-mate:8082]
send task [task4-1594001356537] to server [http://yimu-mate:8081]
send task [task5-1594001356578] to server [http://yimu-mate:8082]
recv result for task : task1-1594001356150
recv result for task : task2-1594001356492
recv result for task : task3-1594001356512
recv result for task : task4-1594001356537
recv result for task : task5-1594001356578
sum: 45449999845000000

You can see that 3 tasks went to the worker at http://yimu-mate:8082 and 2 tasks went to the worker at http://yimu-mate:8081.

The outputs from the worker at 8081 are,

============ Handling new computing request =============
saving bin at:task2-1594001356492/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task2-1594001356492; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 100000000 200000000 
process finish!
============ Done handling request =============
============ Handling new computing request =============
saving bin at:task4-1594001356537/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task4-1594001356537; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 300000000 40000000 
process finish!
============ Done handling request =============

The outputs from the worker at port 8082 are,

============ Handling new computing request =============
saving bin at:task3-1594001356512/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task3-1594001356512; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 200000000 300000000 
process finish!
============ Done handling request =============
============ Handling new computing request =============
saving bin at:task1-1594001356150/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task1-1594001356150; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 0 100000000 
process finish!
============ Done handling request =============
============ Handling new computing request =============
saving bin at:task5-1594001356578/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task5-1594001356578; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 40000000 50000000 
process finish!
============ Done handling request =============

5. Kafka Message Queue

A message queue is an important tool for streaming data processing.

Kafka is a distributed message queue library. Let’s use the message queue to build a task monitor.

Each worker sends its list of processed tasks to the task monitor through Kafka. The task monitor reads the messages and displays them.

We can define a message type called WorkerStatus.

package messages.kafka;
import java.util.LinkedList;
public class WorkerStatus {
    public WorkerStatus() {
    }
    public WorkerStatus(String workerName, LinkedList<String> processedTask) {
        this.processedTask = processedTask;
        this.workerName = workerName;
    }
    private String workerName = new String();
    private LinkedList<String> processedTask = new LinkedList<String>();
    public String getWorkerName()
    {
        return this.workerName;
    }
    public LinkedList<String> getProcessedTask()
    {
        return this.processedTask;
    }
}

The worker sends the WorkerStatus messages to a Kafka topic called workers_status.

On the worker side we can send the message like this,

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaHelper<T> {
    private static final String TOPIC = "workers_status";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
    private String typeSerializerName;
    private Producer<Long, T> kafkaProducer;
    public KafkaHelper(String typeSerializerName)
    {
        this.typeSerializerName = typeSerializerName;
        this.kafkaProducer = createKafkaProducer(BOOTSTRAP_SERVERS);
        System.out.println("kafka producer created");
    }
    public void produceMessages(T message) throws ExecutionException, InterruptedException
    {
        ProducerRecord<Long, T> record = new ProducerRecord<>(TOPIC, message);
        RecordMetadata recordMetadata = kafkaProducer.send(record).get();
    }
    public Producer<Long, T> createKafkaProducer(String bootstrapServers)
    {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "events-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.typeSerializerName);
        return new KafkaProducer<>(properties);
    }
}

Note that we set the serializer for the WorkerStatus type through ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG in createKafkaProducer. Please check the code for the implementation of the serializer.
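Since the serializer itself is not shown here, the sketch below illustrates one possible way to turn a worker status into bytes and back, using only the JDK. This is my own assumption and not the encoding used in the repository. StatusCodec and its nested Status class are made-up names. In a real Kafka serializer, logic like encode would sit inside the serialize(String topic, T data) method of Kafka's Serializer interface, and decode inside the matching deserializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Illustration only: a hypothetical byte encoding for a worker status message.
class StatusCodec {
    // Minimal stand-in for the WorkerStatus message type.
    static final class Status {
        final String workerName;
        final List<String> processedTasks;

        Status(String workerName, List<String> processedTasks) {
            this.workerName = workerName;
            this.processedTasks = processedTasks;
        }
    }

    // Layout: worker name, number of tasks, then each task id.
    static byte[] encode(Status status) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(status.workerName);
            out.writeInt(status.processedTasks.size());
            for (String task : status.processedTasks) {
                out.writeUTF(task);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in the same order they were written.
    static Status decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String workerName = in.readUTF();
            int count = in.readInt();
            List<String> tasks = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                tasks.add(in.readUTF());
            }
            return new Status(workerName, tasks);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Status sent = new Status("worker-8081", List.of("task2", "task4"));
        Status received = decode(encode(sent));
        System.out.println(received.workerName + " " + received.processedTasks);
    }
}
```

The only rule that matters is that the producer and the consumer agree on the layout, because Kafka itself treats the message value as opaque bytes.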

The worker monitor simply needs to subscribe to the workers_status topic and process the messages.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.LongDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import messages.kafka.*;
/**
 * Apache Kafka - Building Kafka Consumers, Scalability and Pub/Sub
 */
public class Application {
    private static final String TOPIC = "workers_status";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
    public static void main(String[] args) {
        String consumerGroup = "defaultConsumerGroup";
        if (args.length == 1) {
            consumerGroup = args[0];
        }
        System.out.println("Consumer is part of consumer group " + consumerGroup);
        Consumer<Long, WorkerStatus> kafkaConsumer = createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup);
        consumeMessages(TOPIC, kafkaConsumer);
    }
    public static void consumeMessages(String topic, Consumer<Long, WorkerStatus> kafkaConsumer) {
        kafkaConsumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<Long, WorkerStatus> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            if (consumerRecords.isEmpty()) {
                // do something else
            }
            for (ConsumerRecord<Long, WorkerStatus> record : consumerRecords) {
                System.out.println(String.format("Status for task %s", record.value().getWorkerName()));
                System.out.println("current processed tasks:");
                String tasksInfo = String.join(",", record.value().getProcessedTask());
                System.out.println(tasksInfo);
            }
            // do something with the records
            kafkaConsumer.commitAsync();
        }
    }
    public static Consumer<Long, WorkerStatus> createKafkaConsumer(String bootstrapServers, String consumerGroup) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, WorkerStatusDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new KafkaConsumer<>(properties);
    }
}

In createKafkaConsumer, we set the deserializer for WorkerStatus through ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG. In the consumeMessages function, we keep polling and print the status of all workers.

Now, all the workers send status messages to the monitor task. The monitor task prints the received status messages.

5.2 Run the worker monitor task

We can run the worker monitor binary by,

yimu@yimu-mate:~/Desktop/blog/yimu-blog/comms/cloud_computing/workermoniter$ java -jar target/workermoniter-1.0-SNAPSHOT-jar-with-dependencies.jar

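The monitor outputs the current status of the workers. Each worker is identified by its bind address (0:0:0:0:0:0:0:0, the wildcard address) immediately followed by its port, because the worker builds its name by concatenating the two without a separator. For example,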

Status for task 0:0:0:0:0:0:0:08081
current processed tasks:
task2-1599935832948,task4-1599935833005,task2-1599936085190,task4-1599936085269,task2-1599936286741,task4-1599936286807,task2-1599936366300,task4-1599936366369,task2-1599936632817,task4-1599936632861
Status for task 0:0:0:0:0:0:0:08082
current processed tasks:
task1-1599936286357,task3-1599936286772,task5-1599936286837,task1-1599936365916,task3-1599936366333,task5-1599936366409,task3-1599936632842,task1-1599936632439,task5-1599936632903

6. Conclusion

In this post, we built a simple cloud computing system.

  1. We used ZooKeeper for service discovery. A client is able to find all worker nodes easily without handling all the corner cases of a distributed system itself.
  2. We used HTTP requests to send computation tasks to workers and receive results from them.
  3. We used Kafka to send server status to a monitor task.

If you want to learn more about distributed systems, I highly recommend this course (in fact, almost all of the material in this post is based on it).
