1. Why write this post?
Cloud computing enables people to use a huge amount of computational power to solve a problem.
A recent result in deep learning (GPT-3) shows that a very large but architecturally simple neural network can achieve impressive results. However, training and running inference on such a huge network may not be possible on a single GPU.
Distributed training is quite popular today. Basically, we break the training task into small sub-tasks and assign each sub-task to a different machine.
For inference, we can wait until Nvidia gives us a super GPU. Or, in my opinion, it’s easier to send the data to the cloud over 5G, run the inference on multiple GPUs there (assuming the huge network can be parallelized), and send back the result.
In this post, I’d like to implement a concrete cloud computing system using the Zookeeper library and the Kafka library.
It’s a simple tutorial for people who are not familiar with the topic (and also a study note for myself).
2. The Goal
2.1 The System
The goal is to build a cloud computing pipeline in Java using the Zookeeper library and the Kafka library.
Assume there are many worker nodes (worker nodes are simply programs running on different physical computers). Each worker node runs computation requests from a client and sends the results back to the client.
A client splits a large computation task into pieces and sends them to different worker nodes. After all the worker nodes finish, the client combines the results.
There is a monitor that shows the status of the workers. Each worker uses the Kafka message queue to send status info to the monitor.
2.2 Problems
The idea is simple. However, there are two main problems in this system.
- How does the client know about the existence of all the worker nodes?
  - This problem is called service discovery.
  - One possible solution is:
    - Define a master node by putting the master’s address in the config.
    - Worker nodes register with the master node and send it their information (e.g. IP address, port number).
    - When the client has a task, it asks the master node for the addresses of the workers.
  - Zookeeper already implements this kind of registry for us (it plays the role of the master node), so we do not have to write it ourselves.
- How do we represent a computational task?
  - A simple solution is to send a binary and its config to a worker node. The worker node runs the binary with the config. Once the worker finishes its computation, it sends back the outputs of the task.
  - We can do this with HTTP requests.
In the later sections, I will discuss how to solve the service discovery problem and how to send tasks with HTTP requests.
3. Service Discovery by Zookeeper
Recall that service discovery is about how a node in a distributed system learns about the existence (and the addresses) of the other nodes.
Zookeeper provides a shared, file-system-like tree of nodes (called znodes) that every program in the cluster can read and write.
In our application, worker nodes register themselves with Zookeeper by saving their addresses in this shared file system.
When a client wants to know all the available workers, the client can query all the registered workers’ addresses from the Zookeeper. Then the client can simply send computation requests to these workers.
In the following subsections, we will implement service discovery with Zookeeper.
3.1 Start Zookeeper
3.2 Worker nodes connect to Zookeeper
In the worker node implementation, we need to (1) connect to Zookeeper and (2) store the worker’s address in Zookeeper.
To connect to the Zookeeper,
```java
...
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
...

public class Application implements Watcher {
    private static final String ZOOKEEPER_ADDRESS = "localhost:2181";
    private static final int SESSION_TIMEOUT = 3000;
    private static final int DEFAULT__SERVER_PORT = 8081;
    private ZooKeeper zooKeeper;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        int serverPort = args.length == 1 ? Integer.parseInt(args[0]) : DEFAULT__SERVER_PORT;
        Application application = new Application();
        ZooKeeper zooKeeper = application.connectToZookeeper();
        ...
    }

    public ZooKeeper connectToZookeeper() throws IOException {
        this.zooKeeper = new ZooKeeper(ZOOKEEPER_ADDRESS, SESSION_TIMEOUT, this);
        return zooKeeper;
    }
    ...
}
```
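We create a Zookeeper client instance with `new ZooKeeper(ZOOKEEPER_ADDRESS, SESSION_TIMEOUT, this);`.
Note that the `Application` class implements the `Watcher` interface, which is Zookeeper’s event handler: Zookeeper calls its `process()` method for connection state changes and for events on watched znodes. The `ZooKeeper` constructor returns before the session is fully established; the connection completes asynchronously and is reported to the watcher as an event. I will skip the details for now.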
3.3 Register the worker node to Zookeeper
The Zookeeper shared file system stores data in a tree structure, very similar to the file system tree of an operating system.
In a file system, to save the addresses of all workers (servers), we could:
- Create a directory called `workers`.
- Under the `workers` directory, create a file for each worker that holds its address and other information.
Zookeeper works the same way:
- We create a znode called `workers`. It plays the role of a directory in a file system.
- Under the `workers` znode, we create a child znode for each worker and store the worker’s address in it.
One difference: a znode can hold data and have children at the same time, so “directory” and “file” are only analogies.
To create the “directory” znode,
```java
import org.apache.zookeeper.*;
...

public class ServiceRegistry implements Watcher {
    private static final String REGISTRY_ZNODE = "/workers";

    public ServiceRegistry(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
        createServiceRegistryZnode();
    }
    ...

    private void createServiceRegistryZnode() {
        try {
            if (zooKeeper.exists(REGISTRY_ZNODE, false) == null) {
                zooKeeper.create(REGISTRY_ZNODE, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } ...
    }
    ...
}
```
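The call `zooKeeper.create(REGISTRY_ZNODE, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);` creates the “directory” znode `/workers` with empty data and an open ACL (anyone can read and write it).
The znode is created with `CreateMode.PERSISTENT`, which means it is not deleted when the program that created it disconnects from Zookeeper. Because the `exists` check and the `create` call are two separate requests, two workers starting at the same moment can both see `null`; the second `create` then fails with `KeeperException.NodeExistsException`, which is harmless here and can safely be ignored.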
To save a worker’s address,
```java
...
public class ServiceRegistry implements Watcher {
    private static final String REGISTRY_ZNODE = "/workers";
    ...

    public void registerToCluster(String metadata) throws KeeperException, InterruptedException {
        if (this.currentZnode != null) {
            System.out.println("Already registered to service registry");
            return;
        }
        this.currentZnode = zooKeeper.create(REGISTRY_ZNODE + "/n_", metadata.getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Registered to service registry");
    }
    ...
}
```
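We call `zooKeeper.create(REGISTRY_ZNODE + "/n_", metadata.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);`.
This creates a child znode under the “directory” znode `/workers` and stores the string `metadata` in it. SEQUENTIAL means Zookeeper appends a unique, increasing 10-digit counter to the requested name `n_`, producing names such as `n_0000000037`; `create` returns the full path that was actually created, which we keep in `currentZnode`. EPHEMERAL means the znode is deleted automatically when the Zookeeper session of the program that created it ends, for example when the worker shuts down, or crashes and its session times out.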
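To make the PERSISTENT and EPHEMERAL_SEQUENTIAL semantics concrete, here is a toy, in-memory model of the `/workers` registry. It is only a sketch under stated assumptions, not Zookeeper and not the project’s code: `ToyRegistry`, `createEphemeralSequential` and `closeSession` are hypothetical names, a “session” is just a string, and nothing is replicated or persisted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Toy in-memory model of the /workers registry (NOT Zookeeper, NOT the project's code).
 * The parent behaves like a PERSISTENT znode; children behave like EPHEMERAL_SEQUENTIAL znodes.
 */
class ToyRegistry {
    /** What a child node holds: the session that created it and its data. */
    private static final class Child {
        final String ownerSession;
        final String data;

        Child(String ownerSession, String data) {
            this.ownerSession = ownerSession;
            this.data = data;
        }
    }

    private final Map<String, Child> children = new TreeMap<>(); // sorted by name
    private int nextSequence = 0;                                 // per-parent counter

    /** Like create("/workers/" + prefix, data, ..., EPHEMERAL_SEQUENTIAL); returns the generated name. */
    synchronized String createEphemeralSequential(String prefix, String data, String sessionId) {
        // Zookeeper also appends a zero-padded 10-digit counter (e.g. n_0000000037), but its real
        // numbers need not be consecutive; this toy simply counts creations.
        String name = prefix + String.format("%010d", nextSequence++);
        children.put(name, new Child(sessionId, data));
        return name;
    }

    /** Ending a session deletes every ephemeral child it owns; the parent itself stays. */
    synchronized void closeSession(String sessionId) {
        children.values().removeIf(child -> child.ownerSession.equals(sessionId));
    }

    synchronized List<String> getChildren() {
        return Collections.unmodifiableList(new ArrayList<>(children.keySet()));
    }

    synchronized String getData(String name) {
        Child child = children.get(name);
        return child == null ? null : child.data;
    }

    public static void main(String[] args) {
        ToyRegistry workers = new ToyRegistry();
        workers.createEphemeralSequential("n_", "http://yimu-mate:8081", "session-8081");
        workers.createEphemeralSequential("n_", "http://yimu-mate:8082", "session-8082");
        System.out.println(workers.getChildren()); // [n_0000000000, n_0000000001]
        workers.closeSession("session-8082");       // e.g. the worker on port 8082 is killed
        System.out.println(workers.getChildren()); // [n_0000000000]
    }
}
```

The key design point carried over from Zookeeper is that cleanup is tied to the session, not to an explicit “unregister” call, so a crashed worker disappears from the registry without doing anything.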
The worker program (or process) calls `ServiceRegistry` like this:
```java
String currentServerAddress = String.format("http://%s:%d",
        InetAddress.getLocalHost().getCanonicalHostName(), serverPort);
serviceRegistry.registerToCluster(currentServerAddress);
```
We create a worker znode that stores the address (host name and port number) of the current worker, e.g. http://yimu-mate:8081.
3.4 Run the code
Download Kafka from this link. Kafka is a distributed messaging system; the version used here (kafka_2.12-2.2.0) relies on Zookeeper, and Zookeeper is included in its release.
First, make sure the port number in `config/zookeeper.properties` (inside the Kafka directory) is 2181:
```
...
clientPort=2181
...
```
Then, start Zookeeper by,
yimu@yimu-mate:~/kafka_2.12-2.2.0$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
We can use a command line tool to check the current znodes in zookeeper.
```
yimu@yimu-mate:~/kafka_2.12-2.2.0$ ./bin/zookeeper-shell.sh localhost:2181
ls /
[]
```
At the beginning, there is no znode under the root `/`.
Then, we compile and start a worker node at (ip: localhost, port: 8081).
```
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/worker$ mvn clean package
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/worker$ java -jar ./target/worker-1.0-SNAPSHOT-jar-with-dependencies.jar 8081
```
In the zookeeper command line tool, I can check for the new znode.
```
yimu@yimu-mate:~/kafka_2.12-2.2.0$ ./bin/zookeeper-shell.sh localhost:2181
ls /
[workers]
ls /workers
[n_0000000037]
```
The “directory” znode `/workers` has been created, and so has the worker znode `n_0000000037`.
We can check the metadata stored in `n_0000000037` with the `get` command.
```
yimu@yimu-mate:~/kafka_2.12-2.2.0$ ./bin/zookeeper-shell.sh localhost:2181
get /workers/n_0000000037
http://yimu-mate:8081
[..other info...]
```
The address of the worker node associated with `n_0000000037` is http://yimu-mate:8081.
I start another worker at (ip: localhost, port: 8082). The `ls` command shows the new worker.
```
yimu@yimu-mate:~/kafka_2.12-2.2.0$ ./bin/zookeeper-shell.sh localhost:2181
ls /workers
[n_0000000037, n_0000000038]
get /workers/n_0000000038
http://yimu-mate:8082
[...other info...]
```
If I kill the worker on port 8082, its ephemeral znode is deleted once its session ends, and the available workers become:
```
yimu@yimu-mate:~/kafka_2.12-2.2.0$ ./bin/zookeeper-shell.sh localhost:2181
ls /workers
[n_0000000037]
get /workers/n_0000000037
http://yimu-mate:8081
[...other info...]
```
In sum, Zookeeper makes the service discovery easy. To get the addresses of servers, the client simply needs to read the addresses from the Zookeeper shared file system.
3.5 Client side
The client sends computational requests to workers. The workers do the computation and send back results to the client.
To achieve this, the client simply needs to go to the Zookeeper shared file system and read the addresses of the available workers.
```java
import org.apache.zookeeper.*;
...

public class ServiceRegistry implements Watcher {
    private static final String REGISTRY_ZNODE = "/workers";
    private final ZooKeeper zooKeeper;
    private String currentZnode = null;
    private List<String> allServiceAddresses = null;

    public synchronized List<String> getAllServiceAddresses() throws KeeperException, InterruptedException {
        if (allServiceAddresses == null) {
            updateAddresses();
        }
        return allServiceAddresses;
    }
    ...

    private synchronized void updateAddresses() throws KeeperException, InterruptedException {
        List<String> workerZnodes = zooKeeper.getChildren(REGISTRY_ZNODE, this);
        List<String> addresses = new ArrayList<>(workerZnodes.size());
        for (String workerZnode : workerZnodes) {
            String workerFullPath = REGISTRY_ZNODE + "/" + workerZnode;
            Stat stat = zooKeeper.exists(workerFullPath, false);
            if (stat == null) {
                continue;
            }
            byte[] addressBytes = zooKeeper.getData(workerFullPath, false, stat);
            String address = new String(addressBytes);
            addresses.add(address);
        }
        this.allServiceAddresses = Collections.unmodifiableList(addresses);
        System.out.println("The cluster addresses are: " + this.allServiceAddresses);
    }
    ...
}
```
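In `updateAddresses()`, we simply iterate over the children of `/workers` and read each worker’s address from the shared file system.
Passing `this` as the watcher to `getChildren` registers a watch, so Zookeeper notifies the registry when workers join or leave. A Zookeeper watch fires only once, so the handler has to call `getChildren` again (for example via `updateAddresses()`) to keep watching. The `exists` check skips a worker that disappeared after `getChildren`; a worker can still vanish right after `exists`, in which case `getData` throws `KeeperException.NoNodeException`.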
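The caching in `getAllServiceAddresses()` follows a common pattern: load lazily, hand out an immutable snapshot, and rebuild it when a watch fires. Below is a dependency-free sketch of just that pattern. `AddressCache` and `onChildrenChanged()` are hypothetical names; in the real registry the fetcher would be the `getChildren`/`getData` loop above, and the refresh would be triggered from `Watcher.process()` on a `NodeChildrenChanged` event for `/workers`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/** Sketch of a lazily loaded, watch-refreshed address list (hypothetical helper). */
class AddressCache {
    private final Supplier<List<String>> fetcher; // stand-in for reading /workers from Zookeeper
    private List<String> snapshot = null;         // null means "not loaded yet"

    AddressCache(Supplier<List<String>> fetcher) {
        this.fetcher = fetcher;
    }

    /** First call loads from the registry; later calls reuse the cached snapshot. */
    synchronized List<String> getAll() {
        if (snapshot == null) {
            refresh();
        }
        return snapshot;
    }

    /** To be called when the registry reports that the set of workers changed. */
    synchronized void onChildrenChanged() {
        refresh();
    }

    private void refresh() {
        // Copy, then wrap: callers never see a list that later changes under them
        snapshot = Collections.unmodifiableList(new ArrayList<>(fetcher.get()));
    }

    public static void main(String[] args) {
        List<String> registry = new ArrayList<>(List.of("http://yimu-mate:8081"));
        AddressCache cache = new AddressCache(() -> registry);
        System.out.println(cache.getAll()); // [http://yimu-mate:8081]
        registry.add("http://yimu-mate:8082"); // a worker joins...
        cache.onChildrenChanged();             // ...and the watch fires
        System.out.println(cache.getAll()); // [http://yimu-mate:8081, http://yimu-mate:8082]
    }
}
```

Making both methods `synchronized`, as the original registry does, keeps a refresh triggered from the Zookeeper event thread from racing with a client thread that is reading the list.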
The service discovery by ZooKeeper is done!
Thanks to ZooKeeper, service discovery becomes a simple file system operation. What an elegant solution!
Next, we will define the computational tasks sent by the client.
4. The Cloud Computing Task
Let’s do a simple example.
Let’s say we want to compute the sum of the integers from 0 up to (but not including) 500000000. To do it with the cloud computing system, we need to split the task into sub-tasks.
A sub-task can be defined as computing the sum of the integers from a (inclusive) to b (exclusive). It can be implemented as,
```java
import java.io.*;
import java.math.BigInteger;

public class Application {
    public static void main(String[] args) throws FileNotFoundException {
        BigInteger start = new BigInteger(args[0]);
        BigInteger end = new BigInteger(args[1]);
        BigInteger sum = new BigInteger("0");
        for (; start.compareTo(end) < 0; start = start.add(BigInteger.ONE)) {
            sum = sum.add(start);
        }
        System.out.println("sum: " + sum);
        PrintWriter p = new PrintWriter(new FileOutputStream("output.txt", false));
        p.println(sum.toString());
        p.close();
        System.out.println("file saved");
    }
}
```
The program takes a start and an end as arguments, adds up every integer k with start <= k < end (the sum is 0 if end <= start), prints the sum, and saves it to output.txt.
To compute the sum from 0 to 500000000, we can break the task into five half-open ranges:
- the sum from 0 to 100000000;
- the sum from 100000000 to 200000000;
- the sum from 200000000 to 300000000;
- the sum from 300000000 to 400000000;
- the sum from 400000000 to 500000000.
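Hand-typing range boundaries is error-prone, and a closed form makes the combined result easy to check: the sum of the integers in [a, b) is (b - a)(a + b - 1) / 2. The sketch below shows both ideas. `RangeTasks`, `split` and `expectedSum` are hypothetical helpers, not part of the project, and `split` assumes `(end - start) * parts` fits in a `long`.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helpers: generate sub-task arguments and the expected total. */
class RangeTasks {
    /** Splits [start, end) into `parts` contiguous chunks, formatted as "lo hi" for addIntegers. */
    static List<String> split(long start, long end, int parts) {
        if (parts <= 0 || end < start) {
            throw new IllegalArgumentException("need parts > 0 and end >= start");
        }
        List<String> args = new ArrayList<>(parts);
        long length = end - start;
        for (int i = 0; i < parts; i++) {
            // Chunk i is [start + length*i/parts, start + length*(i+1)/parts): consecutive chunks
            // share a boundary, so no integer is skipped or counted twice.
            long lo = start + length * i / parts;
            long hi = start + length * (i + 1) / parts;
            args.add(lo + " " + hi);
        }
        return args;
    }

    /** Sum of k for start <= k < end; 0 when end <= start, matching the addIntegers loop. */
    static BigInteger expectedSum(long start, long end) {
        if (end <= start) {
            return BigInteger.ZERO;
        }
        BigInteger terms = BigInteger.valueOf(end - start);
        BigInteger firstPlusLast = BigInteger.valueOf(start).add(BigInteger.valueOf(end - 1));
        // terms * (first + last) is always even, so the division is exact
        return terms.multiply(firstPlusLast).divide(BigInteger.valueOf(2));
    }

    public static void main(String[] args) {
        for (String arg : split(0, 500_000_000L, 5)) {
            System.out.println(arg);
        }
        System.out.println("expected sum: " + expectedSum(0, 500_000_000L));
    }
}
```

Running `main` prints the five argument strings `0 100000000` through `400000000 500000000`, followed by `expected sum: 124999999750000000`, which the client’s combined result can be compared against.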
For each sub-task, we can send the task (program binary and arguments) to a worker as an HTTP request. The worker computes the result and sends it back as an HTTP response. In the end, the client combines the results.
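The request/response exchange itself can be sketched with the JDK alone (Java 11+): `com.sun.net.httpserver.HttpServer` on the worker side, as in the worker code below, and `java.net.http.HttpClient` on the client side. To stay self-contained, this sketch makes a simplifying assumption: the POST body is just the text `start end` and the worker sums the range in-process, instead of receiving a serialized `ComputingTask` with a jar. `HttpTaskDemo`, `startWorker` and `sendTask` are hypothetical names.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

class HttpTaskDemo {
    /** Starts a toy worker on 127.0.0.1; port 0 lets the OS pick a free port. */
    static HttpServer startWorker(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
        server.createContext("/task", exchange -> {
            if (!exchange.getRequestMethod().equalsIgnoreCase("POST")) {
                exchange.sendResponseHeaders(405, -1); // -1: response has no body
                exchange.close();
                return;
            }
            // Simplified payload: "start end" as text instead of a serialized ComputingTask
            String body = new String(exchange.getRequestBody().readAllBytes(), StandardCharsets.UTF_8);
            String[] range = body.trim().split("\\s+");
            BigInteger end = new BigInteger(range[1]);
            BigInteger sum = BigInteger.ZERO;
            for (BigInteger k = new BigInteger(range[0]); k.compareTo(end) < 0; k = k.add(BigInteger.ONE)) {
                sum = sum.add(k);
            }
            byte[] response = sum.toString().getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, response.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(response);
            }
            exchange.close();
        });
        server.start();
        return server;
    }

    /** Client side: POST the task arguments to <workerUrl>/task and return the response body. */
    static String sendTask(String workerUrl, String taskArgs) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
        HttpRequest request = HttpRequest.newBuilder(URI.create(workerUrl + "/task"))
                .POST(HttpRequest.BodyPublishers.ofString(taskArgs))
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("worker returned HTTP " + response.statusCode());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        HttpServer worker = startWorker(0);
        try {
            String url = "http://127.0.0.1:" + worker.getAddress().getPort();
            System.out.println(sendTask(url, "0 10")); // 0 + 1 + ... + 9 = 45
        } finally {
            worker.stop(0);
        }
    }
}
```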
In the client code,
```java
...
class Task {
    Task(String id, String bin, String arg) {
        this.id = id;
        this.binName = bin;
        this.argName = arg;
    }

    String id;
    String binName;
    String argName;
    CompletableFuture<String> serverResponse;
    String resultPath;
}

public class Application {
    static private byte[] readData(String fileName) throws IOException {
        ....
    }

    static private List<String> getWorkAddress() {
        try {
            WorkerRegistry workerRegistry = new WorkerRegistry();
            return workerRegistry.getAllServiceAddresses();
        } ...
    }

    public static void main(String[] args) throws Exception {
        List<String> workAddresses = getWorkAddress();
        ...
        ArrayList<Task> tasks = new ArrayList<Task>();
        final String PATH_TO_BIN = "./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar";
        // Five half-open ranges [start, end) covering [0, 500000000)
        tasks.add(new Task("task1", PATH_TO_BIN, "0 100000000"));
        tasks.add(new Task("task2", PATH_TO_BIN, "100000000 200000000"));
        tasks.add(new Task("task3", PATH_TO_BIN, "200000000 300000000"));
        tasks.add(new Task("task4", PATH_TO_BIN, "300000000 400000000"));
        tasks.add(new Task("task5", PATH_TO_BIN, "400000000 500000000"));

        // Scatter: read the binary and send each task to a worker, round-robin
        for (int i = 0; i < tasks.size(); ++i) {
            Task task = tasks.get(i);
            task.id += "-" + System.currentTimeMillis();
            byte[] bin = readData(task.binName);
            ComputingTask computingTask = new ComputingTask(bin, task.argName, task.binName, task.id);
            WebClient webClient = new WebClient();
            String workerAddress = workAddresses.get(i % workAddresses.size());
            System.out.println(String.format("send task [%s] to server [%s]", task.id, workerAddress));
            task.serverResponse = webClient.sendTask(workerAddress + "/task", ComputingTask.serialize(computingTask));
        }

        // Gather: wait for every response and add up the partial sums.
        // get() reports a failed request as an ExecutionException, which the catch block handles.
        BigInteger sum = new BigInteger("0");
        for (int i = 0; i < tasks.size(); ++i) {
            Task task = tasks.get(i);
            try {
                String result = task.serverResponse.get();
                System.out.println(String.format("recv result for task : %s", task.id));
                BigInteger subResult = new BigInteger(result.replace("\n", ""));
                sum = sum.add(subResult);
            } catch (InterruptedException | ExecutionException e) {
                System.out.println("exception when getting response from servers");
                e.printStackTrace();
                throw e;
            }
        }
        System.out.println("sum: " + sum);
    }
}
```
The `Task` class holds all the data related to one cloud computing task.
In `main`, we first break the job into 5 sub-tasks (`task1` to `task5`).
Then, for each task, we read the binary and send the binary and its arguments to a worker, choosing the worker round-robin with `i % workAddresses.size()`.
Finally, we wait for each worker’s response and add the partial results together.
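Stripped of the binary shipping, the client’s loop is a scatter/gather over `CompletableFuture`s. Here is a minimal sketch of that shape, assuming a hypothetical `send` function in place of `WebClient.sendTask(...)`, which, like the original, yields a `CompletableFuture<String>` per task. `ScatterGather` is a hypothetical name.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

class ScatterGather {
    /** Sends task i to worker i % n, then waits for all replies and adds them up. */
    static BigInteger run(List<String> workers, List<String> taskArgs,
                          BiFunction<String, String, CompletableFuture<String>> send) {
        if (workers.isEmpty()) {
            throw new IllegalStateException("no workers registered");
        }
        // Scatter: every request is in flight before we wait on any of them
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int i = 0; i < taskArgs.size(); i++) {
            String worker = workers.get(i % workers.size()); // round-robin
            pending.add(send.apply(worker, taskArgs.get(i)));
        }
        // Gather: join() rethrows a failed request as an unchecked CompletionException
        BigInteger total = BigInteger.ZERO;
        for (CompletableFuture<String> reply : pending) {
            total = total.add(new BigInteger(reply.join().trim()));
        }
        return total;
    }

    public static void main(String[] args) {
        // Fake workers that compute the range sum locally, just to exercise the flow
        BiFunction<String, String, CompletableFuture<String>> fakeSend = (worker, range) ->
                CompletableFuture.supplyAsync(() -> {
                    String[] p = range.split(" ");
                    long lo = Long.parseLong(p[0]);
                    long hi = Long.parseLong(p[1]);
                    return String.valueOf(hi <= lo ? 0 : (hi - lo) * (lo + hi - 1) / 2);
                });
        System.out.println(run(List.of("http://w1:8081", "http://w2:8082"),
                List.of("0 10", "10 20", "20 30"), fakeSend)); // prints 435
    }
}
```

Here `join()` is used, which reports a failed request as an unchecked `CompletionException`; `get()` instead throws the checked `ExecutionException`, so pick whichever matches the error handling around the loop.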
On the worker side,
```java
...
public class Worker {
    private static final String TASK_ENDPOINT = "/task";
    private static final String STATUS_ENDPOINT = "/status";
    private final int port;
    private HttpServer server;
    private LinkedList<String> tasksProcessed;

    public Worker(int port) {
        tasksProcessed = new LinkedList<String>();
        this.port = port;
    }

    private void startKafkaSendingThread() {
        KafkaStatusThread t = new KafkaStatusThread(this.tasksProcessed,
                server.getAddress().getHostString() + port);
        t.start();
    }

    public void startServer() {
        try {
            this.server = HttpServer.create(new InetSocketAddress(port), 0);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        ...
        HttpContext taskContext = server.createContext(TASK_ENDPOINT);
        taskContext.setHandler(this::handleTaskRequest);
        server.setExecutor(Executors.newFixedThreadPool(1));
        server.start();
        ...
    }
    ...

    private void handleTaskRequest(HttpExchange exchange) throws IOException {
        if (!exchange.getRequestMethod().equalsIgnoreCase("post")) {
            exchange.close();
            return;
        }
        System.out.println("============ Handling new computing request =============");
        byte[] requestBytes = exchange.getRequestBody().readAllBytes();
        messages.ComputingTask task = messages.ComputingTask.deserialize(requestBytes);
        trackTasks(task.taskId);
        TaskStatus status = calculateResponse(task.bin, task.taskId, task.binName, task.args);
        if (status.success) {
            sendResponse(status.result, exchange, 200);
        } else {
            sendResponse(status.result, exchange, 400);
        }
        System.out.println("============ Done handling request =============");
    }
    ...

    private void setUp(byte[] requestBytes, String taskDir, String binName) throws IOException {
        ...
    }

    private byte[] postProcess(String taskDir) throws IOException {
        ...
    }

    private TaskStatus calculateResponse(byte[] requestBytes, String taskDir, String binName, String binArgs) {
        try {
            setUp(requestBytes, taskDir, binName);
            startCommand(taskDir, binName, binArgs);
            return new TaskStatus(postProcess(taskDir), true);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            return new TaskStatus(new byte[0], false);
        }
    }

    private void startCommand(String taskDir, String binName, String binArgs) throws IOException, InterruptedException {
        File file = new File(binName);
        String command = "java -jar " + file.getName() + " " + binArgs;
        String fullCommand = String.format("cd %s; %s", taskDir, command);
        System.out.println(String.format("Launching worker instance : %s ", fullCommand));
        String[] cmd1 = { "/bin/sh", "-c", fullCommand };
        Process p1 = Runtime.getRuntime().exec(cmd1);
        p1.waitFor();
        System.out.println("process finish!");
    }
    ...

    private void sendResponse(byte[] responseBytes, HttpExchange exchange, int code) throws IOException {
        exchange.sendResponseHeaders(code, responseBytes.length);
        OutputStream outputStream = exchange.getResponseBody();
        outputStream.write(responseBytes);
        outputStream.flush();
        outputStream.close();
        exchange.close();
    }
}
```
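In `startServer()`, we register `handleTaskRequest` as the handler for the `/task` endpoint. The executor is a fixed pool with one thread, so each worker handles one request at a time.
`handleTaskRequest` deserializes the request into a `ComputingTask`, and `calculateResponse` saves the binary (`setUp`), runs it with its arguments (`startCommand`), and collects the result (`postProcess`). Once the task is finished, the result bytes are sent back with status 200, or with status 400 if something failed.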
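The `cd %s; %s` string passed to `/bin/sh -c` works, but `ProcessBuilder` can set the working directory and take the arguments as a list, so no shell quoting is involved. The sketch below also checks the exit code and sends the child’s output to a file, since a child that writes a lot to a pipe nobody reads can block. It is an alternative, not the project’s code; `TaskLauncher`, the `process.log` file name and the timeout parameter are assumptions.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

class TaskLauncher {
    /** Builds ["<this JVM's java>", "-jar", jarName, arg1, arg2, ...]. */
    static List<String> javaJarCommand(String jarName, String binArgs) {
        List<String> cmd = new ArrayList<>();
        // Reuse the JVM running the worker rather than whatever "java" is on PATH
        cmd.add(System.getProperty("java.home") + File.separator + "bin" + File.separator + "java");
        cmd.add("-jar");
        cmd.add(jarName);
        for (String arg : binArgs.trim().split("\\s+")) {
            if (!arg.isEmpty()) {
                cmd.add(arg);
            }
        }
        return cmd;
    }

    /** Runs the command inside taskDir and returns its exit code. */
    static int run(File taskDir, List<String> command, long timeoutSeconds)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .directory(taskDir)                               // replaces "cd <taskDir>; ..."
                .redirectErrorStream(true)                        // merge stderr into stdout
                .redirectOutput(new File(taskDir, "process.log")) // output never fills a pipe
                .start();
        if (!process.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
            process.destroyForcibly();
            throw new IOException("task timed out after " + timeoutSeconds + " s");
        }
        return process.exitValue();
    }

    public static void main(String[] args) throws Exception {
        File dir = Files.createTempDirectory("task-").toFile();
        // "java -version" stands in for "java -jar addIntegers-...jar 0 100000000"
        List<String> cmd = List.of(javaJarCommand("unused.jar", "").get(0), "-version");
        System.out.println("exit code: " + run(dir, cmd, 30) + ", log in " + dir);
    }
}
```

With this shape, a caller such as `calculateResponse` could treat a non-zero exit code as a failed task, the same way it already treats exceptions.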
4.1 Run the task
Now we can test the cloud computing task.
First download Kafka with Zookeeper from this link.
Then, start the Zookeeper server. Please make sure the Zookeeper port is 2181.
yimu@yimu-mate:~/kafka_2.12-2.2.0$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
Also, start a Kafka server; I will explain its usage in the next section. Please make sure that in the Kafka server config (1) `log.dirs` points to a valid, writable directory if you are using Ubuntu and (2) `listeners=PLAINTEXT://:9092` is set.
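For reference, the relevant lines of `config/server-0.properties` might look like the fragment below, assuming the file was copied from the stock `config/server.properties`. `broker.id`, `listeners`, `log.dirs` and `zookeeper.connect` are standard Kafka broker settings; the `log.dirs` value is only a placeholder for a writable directory of your choice.

```properties
broker.id=0
listeners=PLAINTEXT://:9092
# placeholder: any writable directory
log.dirs=/home/<user>/kafka-logs-0
# the Zookeeper started above
zookeeper.connect=localhost:2181
```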
yimu@yimu-mate:~/kafka_2.12-2.2.0$ ./bin/kafka-server-start.sh config/server-0.properties
Then compile these 3 Java applications by running `mvn clean package` in each of their directories.
```
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing$ ls
addIntegers cloudclient worker
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing$ cd addIntegers
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/addIntegers$ mvn clean package
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/addIntegers$ cd ..
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing$ cd cloudclient
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/cloudclient$ mvn clean package
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/cloudclient$ cd ..
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing$ cd worker
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/worker$ mvn clean package
```
Then start a worker in a terminal. If you see a few Kafka warnings, you can ignore them for now.
```
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/worker$ java -jar ./target/worker-1.0-SNAPSHOT-jar-with-dependencies.jar 8081
Successfully connected to Zookeeper
server start!
kafka producer created
kafka update thread start
start kafka thread
Registered to service registry
```
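Now, copy the addIntegers binary (addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar from addIntegers/target) into the cloudclient folder, since the client reads it from `./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar`, and start the client. You should be able to see the following output.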
```
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/cloudclient$ java -jar target/cloud.client-1.0-SNAPSHOT-jar-with-dependencies.jar
Successfully connected to Zookeeper
worker address: http://yimu-mate:8081
send task [task1-1594001158933] to server [http://yimu-mate:8081]
send task [task2-1594001159256] to server [http://yimu-mate:8081]
send task [task3-1594001159277] to server [http://yimu-mate:8081]
send task [task4-1594001159301] to server [http://yimu-mate:8081]
send task [task5-1594001159335] to server [http://yimu-mate:8081]
recv result for task : task1-1594001158933
recv result for task : task2-1594001159256
recv result for task : task3-1594001159277
recv result for task : task4-1594001159301
recv result for task : task5-1594001159335
sum: 45449999845000000
```
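Note that in this run task4 and task5 were launched with the ranges `300000000 40000000` and `40000000 50000000`, as the worker output below shows. Because addIntegers only loops while start < end, task4 contributes 0, so the printed sum 45449999845000000 is the sum over [0, 300000000) plus [40000000, 50000000), not the sum over [0, 500000000), which is 124999999750000000.
The worker prints the following: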
```
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/worker$ java -jar ./target/worker-1.0-SNAPSHOT-jar-with-dependencies.jar 8081
Successfully connected to Zookeeper
server start!
kafka producer created
kafka update thread start
start kafka thread
Registered to service registry
============ Handling new computing request =============
saving bin at:task2-1594001159256/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task2-1594001159256; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 100000000 200000000
process finish!
============ Done handling request =============
============ Handling new computing request =============
saving bin at:task1-1594001158933/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task1-1594001158933; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 0 100000000
process finish!
============ Done handling request =============
============ Handling new computing request =============
saving bin at:task3-1594001159277/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task3-1594001159277; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 200000000 300000000
process finish!
============ Done handling request =============
============ Handling new computing request =============
saving bin at:task4-1594001159301/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task4-1594001159301; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 300000000 40000000
process finish!
============ Done handling request =============
============ Handling new computing request =============
saving bin at:task5-1594001159335/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task5-1594001159335; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 40000000 50000000
process finish!
============ Done handling request =============
```
4.2 Multiple Workers
We can add another worker at port 8082 by,
```
yimu@yimu-mate:~/yimu-blog/comms/cloud_computing/worker$ java -jar ./target/worker-1.0-SNAPSHOT-jar-with-dependencies.jar 8082
Successfully connected to Zookeeper
server start!
kafka producer created
kafka update thread start
start kafka thread
Registered to service registry
```
Now the client can automatically discover these 2 workers through Zookeeper and send tasks to both of them. Let’s start the client again.
```
yimu@yimu-mate:~/Desktop/blog/yimu-blog/comms/cloud_computing/cloudclient$ java -jar target/cloud.client-1.0-SNAPSHOT-jar-with-dependencies.jar
Successfully connected to Zookeeper
worker address: http://yimu-mate:8082
worker address: http://yimu-mate:8081
send task [task1-1594001356150] to server [http://yimu-mate:8082]
send task [task2-1594001356492] to server [http://yimu-mate:8081]
send task [task3-1594001356512] to server [http://yimu-mate:8082]
send task [task4-1594001356537] to server [http://yimu-mate:8081]
send task [task5-1594001356578] to server [http://yimu-mate:8082]
recv result for task : task1-1594001356150
recv result for task : task2-1594001356492
recv result for task : task3-1594001356512
recv result for task : task4-1594001356537
recv result for task : task5-1594001356578
sum: 45449999845000000
```
You can see that 3 tasks went to the worker at http://yimu-mate:8082 and 2 tasks went to http://yimu-mate:8081. The client assigns task i to worker i % 2, and Zookeeper listed the 8082 worker first.
The outputs from the worker at 8081 are,
```
============ Handling new computing request =============
saving bin at:task2-1594001356492/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task2-1594001356492; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 100000000 200000000
process finish!
============ Done handling request =============
============ Handling new computing request =============
saving bin at:task4-1594001356537/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task4-1594001356537; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 300000000 40000000
process finish!
============ Done handling request =============
```
The outputs from the worker at port 8082 are,
```
============ Handling new computing request =============
saving bin at:task3-1594001356512/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task3-1594001356512; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 200000000 300000000
process finish!
============ Done handling request =============
============ Handling new computing request =============
saving bin at:task1-1594001356150/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task1-1594001356150; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 0 100000000
process finish!
============ Done handling request =============
============ Handling new computing request =============
saving bin at:task5-1594001356578/./addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar
bin received and saved
Launching worker instance : cd task5-1594001356578; java -jar addIntegers-1.0-SNAPSHOT-jar-with-dependencies.jar 40000000 50000000
process finish!
============ Done handling request =============
```
5. Kafka Message Queue
Message queues are an important tool for streaming data processing.
Kafka is a distributed message queue. Let’s use it to build a task monitor.
The workers send their lists of processed tasks to the task monitor through Kafka. The task monitor reads the messages and displays them.
We can define a message type called `WorkerStatus`:
```java
package messages.kafka;

import java.util.LinkedList;

public class WorkerStatus {
    public WorkerStatus() {
    }

    public WorkerStatus(String workerName, LinkedList<String> processedTask) {
        this.processedTask = processedTask;
        this.workerName = workerName;
    }

    private String workerName = new String();
    private LinkedList<String> processedTask = new LinkedList<String>();

    public String getWorkerName() {
        return this.workerName;
    }

    public LinkedList<String> getProcessedTask() {
        return this.processedTask;
    }
}
```
The worker sends `WorkerStatus` messages to a Kafka topic called `workers_status`.
On the worker side we can send the message like this,
```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaHelper<T> {
    private static final String TOPIC = "workers_status";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
    private String typeSerializerName;
    private Producer<Long, T> kafkaProducer;

    public KafkaHelper(String typeSerializerName) {
        this.typeSerializerName = typeSerializerName;
        this.kafkaProducer = createKafkaProducer(BOOTSTRAP_SERVERS);
        System.out.println("kafka producer created");
    }

    public void produceMessages(T message) throws ExecutionException, InterruptedException {
        ProducerRecord<Long, T> record = new ProducerRecord<>(TOPIC, message);
        // send() is asynchronous; get() blocks until the broker acknowledges the record
        RecordMetadata recordMetadata = kafkaProducer.send(record).get();
    }

    public Producer<Long, T> createKafkaProducer(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "events-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.typeSerializerName);
        return new KafkaProducer<>(properties);
    }
}
```
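Note that the serializer for the `WorkerStatus` type is passed to `KafkaHelper` as `typeSerializerName` and set with `ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG` in `createKafkaProducer`. Please check the code for the implementation of the serializer.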
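As an illustration of what such a serializer pair has to do, here is one possible dependency-free wire format: UTF-8 text with the worker name on the first line and the comma-separated task ids on the second. This is a sketch, not the project’s serializer. In real code, methods like these would sit inside classes implementing Kafka’s `Serializer<WorkerStatus>` (`serialize(String topic, WorkerStatus data)`) and `Deserializer<WorkerStatus>` (`deserialize(String topic, byte[] data)`). `WorkerStatusCodec` and its `Decoded` holder are hypothetical, and the format assumes names and task ids never contain a newline or a comma.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class WorkerStatusCodec {
    /** Decoded form; mirrors WorkerStatus's two fields. */
    static final class Decoded {
        final String workerName;
        final List<String> processedTasks;

        Decoded(String workerName, List<String> processedTasks) {
            this.workerName = workerName;
            this.processedTasks = processedTasks;
        }
    }

    /** What the body of a Serializer<WorkerStatus>.serialize(topic, data) could do. */
    static byte[] encode(String workerName, List<String> processedTasks) {
        return (workerName + "\n" + String.join(",", processedTasks)).getBytes(StandardCharsets.UTF_8);
    }

    /** What the body of a Deserializer<WorkerStatus>.deserialize(topic, bytes) could do. */
    static Decoded decode(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8);
        int newline = text.indexOf('\n');
        if (newline < 0) {
            throw new IllegalArgumentException("missing newline after worker name");
        }
        String tasks = text.substring(newline + 1);
        List<String> taskList = tasks.isEmpty()
                ? new ArrayList<>()
                : new ArrayList<>(Arrays.asList(tasks.split(",")));
        return new Decoded(text.substring(0, newline), Collections.unmodifiableList(taskList));
    }

    public static void main(String[] args) {
        byte[] wire = encode("yimu-mate:8081", List.of("task2-1599935832948", "task4-1599935833005"));
        Decoded status = decode(wire);
        System.out.println(status.workerName + " -> " + status.processedTasks);
    }
}
```

A text format like this is easy to inspect while debugging; a structured format such as JSON would be the safer choice once names or task ids may contain arbitrary characters.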
The worker monitor simply needs to subscribe to the `workers_status` topic and process the messages.
```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.LongDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import messages.kafka.*;

/**
 * Apache Kafka - Building Kafka Consumers, Scalability and Pub/Sub
 */
public class Application {
    private static final String TOPIC = "workers_status";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";

    public static void main(String[] args) {
        String consumerGroup = "defaultConsumerGroup";
        if (args.length == 1) {
            consumerGroup = args[0];
        }
        System.out.println("Consumer is part of consumer group " + consumerGroup);
        Consumer<Long, WorkerStatus> kafkaConsumer = createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup);
        consumeMessages(TOPIC, kafkaConsumer);
    }

    public static void consumeMessages(String topic, Consumer<Long, WorkerStatus> kafkaConsumer) {
        kafkaConsumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<Long, WorkerStatus> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            if (consumerRecords.isEmpty()) {
                // do something else
            }
            for (ConsumerRecord<Long, WorkerStatus> record : consumerRecords) {
                System.out.println(String.format("Status for task %s", record.value().getWorkerName()));
                System.out.println("current processed tasks:");
                String tasksInfo = String.join(",", record.value().getProcessedTask());
                System.out.println(tasksInfo);
            }
            // do something with the records
            kafkaConsumer.commitAsync();
        }
    }

    public static Consumer<Long, WorkerStatus> createKafkaConsumer(String bootstrapServers, String consumerGroup) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, WorkerStatusDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new KafkaConsumer<>(properties);
    }
}
```
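In `createKafkaConsumer`, we set the deserializer for `WorkerStatus` (`WorkerStatusDeserializer`) with `ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG`. In the `consumeMessages` function, we keep polling the topic and print the status of every worker. Since auto-commit is disabled, the consumer commits its offsets with `commitAsync()` after each poll.
Now all the workers send status messages to the monitor task, and the monitor task prints the status messages it receives.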
5.2 Run the worker monitor task
We can run the worker monitor binary by,
yimu@yimu-mate:~/Desktop/blog/yimu-blog/comms/cloud_computing/workermoniter$ java -jar target/workermoniter-1.0-SNAPSHOT-jar-with-dependencies.jar
The monitor outputs the current status of workers. For example,
```
Status for task 0:0:0:0:0:0:0:08081
current processed tasks:
task2-1599935832948,task4-1599935833005,task2-1599936085190,task4-1599936085269,task2-1599936286741,task4-1599936286807,task2-1599936366300,task4-1599936366369,task2-1599936632817,task4-1599936632861
Status for task 0:0:0:0:0:0:0:08082
current processed tasks:
task1-1599936286357,task3-1599936286772,task5-1599936286837,task1-1599936365916,task3-1599936366333,task5-1599936366409,task3-1599936632842,task1-1599936632439,task5-1599936632903
```
6. Conclusion
In this post, we built a simple cloud computing system.
- We used Zookeeper to do the service discovery. A client can find all worker nodes easily without having to handle the corner cases of a distributed system (such as crashed workers) itself.
- We used the HTTP request to send computation tasks to workers and receive results from them.
- We used Kafka to send worker status to a monitor task.
If you want to learn more about distributed systems, I highly recommend this course (in fact, almost all of the material in this post is based on it).