mqttclient.subscribe(topics, qos, callback)

Subscribe to one or more specific MQTT topics

Availability

Agent

Parameters

| Name | Type | Description |
| --- | --- | --- |
| topics | String or blob, or array of strings/blobs | Topic(s) to be subscribed to. Must be valid UTF-8 |
| qos | Integer or array of integers | Optional delivery mode constant(s) |
| callback | Function | Optional function to be called with the MQTT broker’s acknowledgement |

Returns

Integer — 0, or an error code

Description

This method allows you to subscribe to the specified topic or topics.

To subscribe to multiple topics, pass the names of the topics (each a string or a blob containing valid UTF-8 text) in an array. This array may include no more than ten topics, or an error will be thrown.
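If you need more than ten topics, one option is to split the list into batches and issue one subscribe request per batch. The following is a minimal sketch of that idea; chunkTopics() and MAX_TOPICS_PER_CALL are hypothetical names introduced for illustration and are not part of the imp API.

```squirrel
// Assumed limit, taken from the ten-topic rule described above
const MAX_TOPICS_PER_CALL = 10;

// Hypothetical helper: split an array of topics into arrays
// that each hold no more than MAX_TOPICS_PER_CALL entries
function chunkTopics(topics) {
    local batches = [];
    for (local i = 0; i < topics.len(); i += MAX_TOPICS_PER_CALL) {
        local end = i + MAX_TOPICS_PER_CALL;
        if (end > topics.len()) end = topics.len();
        batches.append(topics.slice(i, end));
    }
    return batches;
}
```

Each batch returned by chunkTopics() could then be passed to its own mqttclient.subscribe() call.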

You can request a delivery mode by passing one of the following integer constants into the qos parameter:

| Constant | Value | Description |
| --- | --- | --- |
| mqtt.AT_MOST_ONCE | 0 | The message is sent but receipt is not confirmed by the recipient so there is no guarantee of delivery. The message is not stored and re-transmitted by the sender |
| mqtt.AT_LEAST_ONCE | 1 | The message may be delivered more than once. The sender stores and periodically re-transmits the message until the receiver acknowledges receipt of the message |
| mqtt.EXACTLY_ONCE | 2 | The message is delivered only once. The sender stores and periodically re-transmits the message until the receiver acknowledges receipt of the message, but subsequent sends mark the message as a duplicate, allowing the receiver to process the message only once. This also utilizes multiple acknowledge messages transmitted by sender and recipient. It is the safest but slowest QoS level |

Note that this is only a request: the broker may apply a different delivery mode. To discover the actual delivery mode, provide a callback function. It has one parameter, qosMode, which receives an integer (or an array of integers if the subscribe request included multiple topics) indicating the actual delivery mode, or 128 (0x80) if the broker rejected the subscription request:

| Value | State | Actual QoS |
| --- | --- | --- |
| 0x00 | Success | mqtt.AT_MOST_ONCE |
| 0x01 | Success | mqtt.AT_LEAST_ONCE |
| 0x02 | Success | mqtt.EXACTLY_ONCE |
| 0x80 | Failure | N/A |
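The values listed above can be mapped to readable text when logging callback results. The following is a minimal sketch; describeQos() and describeQosResult() are hypothetical helper names, not part of the imp API, and the returned strings are illustrative only.

```squirrel
// Hypothetical helper: convert one value from the subscription
// callback into a human-readable string
function describeQos(value) {
    switch (value) {
        case 0x00: return "subscribed, at most once";
        case 0x01: return "subscribed, at least once";
        case 0x02: return "subscribed, exactly once";
        case 0x80: return "rejected by broker";
    }
    return "unexpected value " + value;
}

// Hypothetical helper: the callback receives either an integer or an
// array of integers, so treat both cases as an array and describe each entry
function describeQosResult(result) {
    local values = (typeof result == "array") ? result : [result];
    local descriptions = [];
    foreach (value in values) descriptions.append(describeQos(value));
    return descriptions;
}
```

Inside a subscription callback, you might log each entry of describeQosResult(qosMode) with server.log().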

You can provide the QoS value as a single integer or as an array of integers. In the case of a single integer, its value will be applied to all of the topics included in the subscription request:

mqttclient.subscribe("mytopic", 0, function(qos) {
    server.log("Subscribed QoS = " + qos);
});

mqttclient.subscribe(["mytopic1", "mytopic2"], 0, function(qosList) {
    foreach (qos in qosList) server.log("Subscribed QoS = " + qos);
});

To request specific QoS values for specific topics, pass in an array of integers. The topic and QoS arrays must contain the same number of elements, or an error will be thrown. The nth topic will be subscribed with the nth QoS value:

mqttclient.subscribe(["mytopic1", "mytopic2"], [0, 2], function(qosList) {
    foreach (qos in qosList) server.log("Subscribed QoS = " + qos);
});

An exception will be thrown if you call subscribe() on an mqttclient instance that is currently disconnected.

We recommend that you wrap all mqttclient.subscribe() calls in a try...catch statement. Even if the client is connected at the time of the call, the socket could still fail during the send process itself.
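One way to apply this recommendation is a small wrapper that handles both a thrown exception and a non-zero return value. This is a sketch only: safeSubscribe() and its onError parameter are hypothetical names introduced here, and the client argument is assumed to be an mqttclient instance.

```squirrel
// Hypothetical wrapper around subscribe(). Returns true if the request
// was issued, or false if the call threw or returned an error code.
function safeSubscribe(client, topics, qos, onAck, onError) {
    try {
        local result = client.subscribe(topics, qos, onAck);
        if (result != 0) {
            // subscribe() returns 0, or an error code
            onError("subscribe() returned error code " + result);
            return false;
        }
        return true;
    } catch (err) {
        // Thrown if, for example, the client is currently disconnected
        onError(err);
        return false;
    }
}

// Example usage on an agent, assuming 'client' is a connected mqttclient:
// safeSubscribe(client, "mytopic", mqtt.AT_MOST_ONCE,
//     function(qosMode) { server.log("Subscribed QoS = " + qosMode); },
//     function(err) { server.error("Subscribe failed: " + err); });
```

Passing the error handler in as a function keeps the wrapper independent of how the application chooses to log or retry.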

If you subsequently disconnect the mqttclient instance that you used to subscribe to topics, you will need to re-subscribe to those topics if you later reconnect it. Reconnecting does not cause these subscriptions to be remade automatically. The recommended approach is to place all your calls to mqttclient.subscribe() in a callback function registered using mqttclient.onconnect(). This is shown in the example code below.

Note: Inbound and outbound MQTT messages are subject to rate limits.

Example Code

The following code sets up an MQTT client and connects to the specified MQTT broker. If the connection is made, the code attempts to subscribe to a topic using mqttclient.subscribe(). If this succeeds in turn, the code registers the handler that will log any incoming messages that have been posted to the chosen topic.

// CONSTANTS
const URL  = "tcp://test.mosquitto.org";
const PORT = 1883;
const UN   = "generalgrugger";
const PW   = "gaztakh1ghc0mmand";

// GLOBALS
local client = null;
local cid = "imp.mqtt.test." + imp.configparams.deviceid;

// FUNCTIONS
function error(code) {
    switch (code) {
        case -1: return "Socket Error";
        case 1:  return "Unsupported MQTT version";
        case 2:  return "Client ID rejected";
        case 3:  return "Server unavailable";
        case 4:  return "Invalid username/password";
        case 5:  return "Not authorized";
        default: return "Unknown error";
    }
}

function onMessage(message) {
    // Called on receipt of a message
    server.log("Message \'" + message.message + "\' received under topic \'" + message.topic + "\'");
}

function onConnect(resultCode) {
    // Called when the client connects (or fails to connect) to the MQTT broker
    local s = URL + ":" + PORT.tostring();
    if (resultCode == 0) {
        server.log("Connected to " + s);

        // We're connected, so try to subscribe to a topic
        client.subscribe("imp.mqtt.test.pings",
                         mqtt.AT_MOST_ONCE,
                         function(qosMode) {
                             // This is the subscription acknowledgement callback
                             if (qosMode < 0x80) {
                                 // We are subscribed, so inform the user...
                                 server.log("Subscribed to \'imp.mqtt.test.pings\' with delivery mode: " + qosMode + ".");

                                 // ...and set the message handler
                                 client.onmessage(onMessage);
                             } else {
                                 // We couldn't subscribe for some reason so inform the user
                                 server.log("Not subscribed to \'imp.mqtt.test.pings\'.");
                             }
                         });

    } else {
        server.error("Failed to connect to " + s + " Error: " + error(resultCode));
    }
}

function onLost() {
    // Called when the connection to the MQTT broker is unexpectedly lost
    local s = URL + ":" + PORT.tostring();
    server.log("Connection to " + s + " broken");
}

// RUNTIME START
// Instance an MQTT client
client = mqtt.createclient();

// Set up the initial handlers
client.onconnect(onConnect);
client.onconnectionlost(onLost);

// Connect with credentials
local options = {};
options.username <- UN;
options.password <- PW;
client.connect(URL + ":" + PORT.tostring(), cid, options);