To make the connection to the MQTT broker it is possible to use one of the many languages that have specific libraries for it and even directly with libraries such as Mosquito MQTT.
In this case, we are going to see an example made with Java and SpringBoot.
Connecting to MQTT with Java and SpringBoot
To include the library in a Maven project, we will have to add the following dependency:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.4.6</version>
</dependency>
Client configuration
When using the library, the first thing we must do to receive messages from an MQTT broker is to obtain an implementation of the IMqttClient interface. This interface contains all the methods required by an application to establish a connection with the server and receive messages.
The library comes with two implementations of this interface, an asynchronous (MqttAsyncClient) and a synchronous (MqttClient). In our case, we will focus on the synchronous version.
The configuration itself is a two-step process: we first create an instance of the MqttClient class and then connect it to our server.
Creating a new IMqttClient instance
The following code snippet shows how to create a new synchronous IMqttClient instance:
String subscriberId = UUID.randomUUID().toString();
IMqttClient subscriber = new MqttClient("ssl://mqtt-dev.cmobility30.es:8883",subscriberId);
In this case, we are using the simplest constructor available, which takes the address of the endpoint of our MQTT broker and a client identifier, which uniquely identifies our client.
Connection with the server
Our newly created MqttClient instance is not connected to the server. We do this by calling its connect() method, optionally passing an MqttConnectOptions instance that allows us to customize some aspects of the protocol.
In particular, we can use those options to pass additional information such as security credentials, session recovery mode, reconnection mode, etc.
The MqttConnectionOptions class exposes those options as simple properties that we can set using normal set methods. We only need to set the required properties for our scenario; the rest will assume default values.
The code used to establish a connection to the server looks like this:
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
subscriber.connect(options);
Here, we define our connection options so that:
-
The library will automatically try to reconnect to the server in the event of a network failure
-
Will discard unsent messages from a previous run
-
Connection timeout is set to 10 seconds
Receive messages
To receive messages from the MQTT broker, we need to use one of the variants of the subscribe() method, which allows us to specify:
-
One or more topic filters for the messages we want to receive.
-
The associated QoS
-
The callback handler to process received messages
In the following example, we show how to add a message sink to an existing IMqttClient instance to receive messages from a given topic.
subscriber.subscribe("TOPIC", (tpic, msg) -> {
byte[] payload = msg.getPayload();
});
With this example we have seen how easy it is to implement a client to an MQTT broker.
If you want more information, you can find it here.