ZeroMQ (also known as ØMQ) is a high-performance asynchronous messaging library aimed at use in distributed or concurrent applications. It provides a messaging framework that abstracts away the complexities of socket programming, making it easier to create scalable and fault-tolerant applications.
Key Features of ZeroMQ:
- Socket Abstraction: Unlike traditional TCP sockets, ZeroMQ provides more advanced socket types like REQ/REP, PUB/SUB, PUSH/PULL, PAIR, etc., which implement various messaging patterns.
- Asynchronous I/O: ZeroMQ supports non-blocking I/O operations, allowing applications to perform other tasks while waiting for messages, which enhances performance in concurrent environments.
- Multiple Transports: ZeroMQ supports multiple underlying transports, including TCP, IPC (inter-process communication), in-process (for threads within the same process), and multicast (over UDP).
- Scalability: It is designed to scale from small applications to large-scale distributed systems. It can efficiently handle many connections, large volumes of messages, and different network topologies.
- Flexible Messaging Patterns: ZeroMQ allows you to implement various messaging patterns, such as publish/subscribe, request/reply, and pipeline (task distribution), which can be combined to build complex communication patterns.
- Language Bindings: It has bindings for many programming languages, including C, C++, Python, Go, Java, and more, making it a versatile choice for cross-language communication.
Use Cases:
- Distributed Systems: ZeroMQ is commonly used in distributed systems where components need to communicate across different machines.
- Real-Time Applications: It's well-suited for real-time systems that require low-latency messaging.
- Service-Oriented Architectures: ZeroMQ can be used to build microservices or other service-oriented architectures that need robust communication mechanisms.
- Financial Systems: Due to its performance and reliability, it is often used in financial applications that require rapid message passing.
Overall, ZeroMQ simplifies the process of creating distributed applications by providing an easy-to-use and powerful messaging layer.
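The pipeline pattern mentioned among the features is not shown in the examples that follow. A PUSH socket hands its messages to the connected PULL peers in round-robin order. The sketch below models only that distribution rule in plain Java, without JeroMQ, and ignores queues, high-water marks and reconnection. `RoundRobinSketch` and `distribute` are hypothetical names chosen for illustration, not library API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of PUSH-style round-robin distribution; not JeroMQ code.
class RoundRobinSketch {

    // Assigns each task to the next worker in turn and returns one list per worker.
    static List<List<String>> distribute(List<String> tasks, int workerCount) {
        List<List<String>> perWorker = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            perWorker.add(new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            perWorker.get(i % workerCount).add(tasks.get(i));
        }
        return perWorker;
    }

    public static void main(String[] args) {
        List<String> tasks = List.of("t0", "t1", "t2", "t3", "t4", "t5");
        List<List<String>> result = distribute(tasks, 3);
        for (int w = 0; w < result.size(); w++) {
            System.out.println("worker " + w + " <- " + result.get(w));
        }
    }
}
```

With six tasks and three workers, each worker ends up with two tasks, which is why the pattern suits evenly sized units of work.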
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>io.tiago</groupId>
    <artifactId>lollo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>9</source>
                    <target>9</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <junit.version>5.6.0</junit.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.zeromq</groupId>
            <artifactId>jeromq</artifactId>
            <version>0.5.3</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
Request Reply Pattern
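It works like point-to-point communication: the client sends a request and blocks until the server replies, so the reply doubles as an acknowledgement that the message was delivered.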
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class ServerRequestReply {
    public static void main(String[] args) {
        // ZContext closes the sockets it created when the try block exits
        try (ZContext context = new ZContext()) {
            ZMQ.Socket socket = context.createSocket(SocketType.REP);
            socket.bind("tcp://*:5555");
            while (!Thread.currentThread().isInterrupted()) {
                byte[] request = socket.recv(0); // block until a request is received
                System.out.println("Received: [" + new String(request, ZMQ.CHARSET) + "]");
                // A REP socket must answer each request before it can receive the next one
                String response = "Hello, world!";
                socket.send(response.getBytes(ZMQ.CHARSET), 0);
            }
        }
    }
}
import org.zeromq.SocketType;
import org.zeromq.ZMQ;

public class ClientRequestReply {
    public static void main(String[] args) {
        // try-with-resources closes the socket first, then terminates the context,
        // so no explicit close() or term() call is needed
        try (ZMQ.Context context = ZMQ.context(1);
             ZMQ.Socket requester = context.socket(SocketType.REQ)) {
            requester.connect("tcp://localhost:5555");
            requester.send("Hello".getBytes(ZMQ.CHARSET), 0);
            byte[] reply = requester.recv(0); // block until the server replies
            String replyValue = new String(reply, ZMQ.CHARSET);
            System.out.println("Acknowledged: " + replyValue);
        }
    }
}
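A REQ socket enforces strict alternation: each send must be followed by a recv before the next send. This is what lets the client above treat the reply as an acknowledgement. The following is a minimal model of that rule in plain Java, assuming nothing about JeroMQ internals. `LockstepSketch` and its methods are hypothetical names, and the real socket reports the violation in its own way rather than with the exception used here.

```java
// Hypothetical model of the REQ socket's send/recv alternation; not JeroMQ code.
class LockstepSketch {

    private boolean awaitingReply = false;

    // Sending is only legal when no reply is outstanding.
    void send(String request) {
        if (awaitingReply) {
            throw new IllegalStateException("send() called twice without recv()");
        }
        awaitingReply = true;
    }

    // Receiving is only legal after a request has been sent.
    // The reply is passed in because this model has no network behind it.
    String recv(String replyFromServer) {
        if (!awaitingReply) {
            throw new IllegalStateException("recv() called before send()");
        }
        awaitingReply = false;
        return replyFromServer;
    }

    public static void main(String[] args) {
        LockstepSketch req = new LockstepSketch();
        req.send("Hello");
        System.out.println("Reply: " + req.recv("Hello, world!"));
        try {
            req.send("first");
            req.send("second"); // violates the alternation
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

The same constraint applies in mirror image to the REP side, which must receive before it sends.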
Pub Sub Pattern
The publish-subscribe pattern is used for one-to-many distribution of data from a single publisher to multiple subscribers. Each subscriber registers a prefix and receives only the messages that start with it.
package io.tiago.zero;

import org.zeromq.SocketType;
import org.zeromq.ZMQ;

// Despite the "Server" name, this class is a subscriber
public class ServerPubSubOne {
    public static void main(String[] args) {
        try (ZMQ.Context ctx = ZMQ.context(1)) {
            try (ZMQ.Socket sub = ctx.socket(SocketType.SUB)) {
                // connect needs a concrete host; the "*" wildcard is only valid for bind
                sub.connect("tcp://localhost:12345");
                // receive only messages whose content starts with "B"
                sub.subscribe("B".getBytes(ZMQ.CHARSET));
                while (!Thread.currentThread().isInterrupted()) {
                    System.out.println("SUB: " + sub.recvStr());
                }
            }
        }
    }
}
package io.tiago.zero;

import org.zeromq.SocketType;
import org.zeromq.ZMQ;

// Despite the "Server" name, this class is a subscriber
public class ServerPubSubTwo {
    public static void main(String[] args) {
        try (ZMQ.Context ctx = ZMQ.context(1)) {
            try (ZMQ.Socket sub = ctx.socket(SocketType.SUB)) {
                // connect needs a concrete host; the "*" wildcard is only valid for bind
                sub.connect("tcp://localhost:12345");
                // receive only messages whose content starts with "A"
                sub.subscribe("A".getBytes(ZMQ.CHARSET));
                while (!Thread.currentThread().isInterrupted()) {
                    System.out.println("SUB: " + sub.recvStr());
                }
            }
        }
    }
}
package io.tiago.zero;

import org.zeromq.SocketType;
import org.zeromq.ZMQ;

import java.util.Random;

// Despite the "Client" name, this class is the publisher
public class ClientPubSub {
    public static void main(String[] args) throws InterruptedException {
        try (ZMQ.Context ctx = ZMQ.context(1)) {
            try (ZMQ.Socket pub = ctx.socket(SocketType.PUB)) {
                pub.bind("tcp://*:12345");
                Random rand = new Random(System.currentTimeMillis());
                while (!Thread.currentThread().isInterrupted()) {
                    // Messages look like "C-01234": a letter from A to J, a dash, five digits.
                    // The leading letter is what the subscribers filter on.
                    String msg = String.format("%c-%05d", 'A' + rand.nextInt(10), rand.nextInt(100000));
                    System.out.println("Sending: " + msg);
                    pub.send(msg);
                    Thread.sleep(500);
                }
            }
        }
    }
}
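Which subscriber sees which message follows from prefix matching: a SUB socket delivers a message when its bytes begin with one of the subscribed prefixes. The sketch below models that rule in plain Java, without JeroMQ, so it can be run in isolation. `PrefixFilterSketch` and its `matches` helper are hypothetical names for illustration and are not part of the library.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical model of SUB-side prefix filtering; not JeroMQ code.
class PrefixFilterSketch {

    // Returns true when the message bytes start with the subscription bytes.
    // An empty subscription matches every message.
    static boolean matches(String message, String subscription) {
        byte[] msg = message.getBytes(StandardCharsets.UTF_8);
        byte[] sub = subscription.getBytes(StandardCharsets.UTF_8);
        if (sub.length > msg.length) {
            return false;
        }
        for (int i = 0; i < sub.length; i++) {
            if (msg[i] != sub[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Messages shaped like the ones ClientPubSub publishes.
        List<String> published = List.of("A-00042", "B-12345", "C-99999");
        for (String msg : published) {
            System.out.println(msg
                    + " -> subscriber \"A\": " + matches(msg, "A")
                    + ", subscriber \"B\": " + matches(msg, "B"));
        }
    }
}
```

Because the publisher above picks one of ten letters at random, each of the two subscribers should see roughly one message in ten, and messages starting with C through J reach neither.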
Reference
https://zeromq.org/socket-api/