NDU Gateway

Integrate legacy and third-party systems with NDU platform using IoT Gateway.

MQTT Connector Configuration

This guide will help you get familiar with the MQTT Connector configuration for NDU Gateway. Use the general configuration to enable this Connector. The purpose of this Connector is to connect to an external MQTT broker and subscribe to a data feed from devices. The Connector can also push data to MQTT brokers based on updates/commands from NDU.

This Connector is useful when you have a local MQTT broker in your facility or corporate network and you would like to push data from this broker to NDU.

The connector configuration file is described below.

Connector configuration: mqtt.json

The connector configuration is a JSON file that contains information about how to connect to the external MQTT broker, which topics to use when subscribing to the data feed, and how to process the data. Let’s review the format of the configuration file using the example below.


Example of an MQTT Connector config file. The example listed below connects to an MQTT broker in a local network, deployed on a server with IP 192.168.1.100. The connector uses basic MQTT auth with a username and password. It then subscribes to a list of topics using the topic filters from the mapping section. See the description below for more info.
{
  "broker": {
    "name":"Default Local Broker",
    "host":"192.168.1.100",
    "port":1883,
    "security": {
      "type": "basic",
      "username": "user",
      "password": "password"
    }
  },
  "mapping": [
    {
      "topicFilter": "/sensor/data",
      "converter": {
        "type": "json",
        "deviceNameJsonExpression": "${serialNumber}",
        "deviceTypeJsonExpression": "${sensorType}",
        "timeout": 60000,
        "attributes": [
          {
            "type": "string",
            "key": "model",
            "value": "${sensorModel}"
          }
        ],
        "timeseries": [
          {
            "type": "double",
            "key": "temperature",
            "value": "${temp}"
          },
          {
            "type": "double",
            "key": "humidity",
            "value": "${hum}"
          }
        ]
      }
    },
    {
      "topicFilter": "/sensor/+/data",
      "converter": {
        "type": "json",
        "deviceNameTopicExpression": "(?<=sensor\/)(.*?)(?=\/data)",
        "deviceTypeTopicExpression": "Thermometer",
        "timeout": 60000,
        "attributes": [
          {
            "type": "string",
            "key": "model",
            "value": "${sensorModel}"
          }
        ],
        "timeseries": [
          {
            "type": "double",
            "key": "temperature",
            "value": "${temp}"
          },
          {
            "type": "double",
            "key": "humidity",
            "value": "${hum}"
          }
        ]
      }
    },
    {
      "topicFilter": "/custom/sensors/+",
      "converter": {
        "type": "custom",
        "extension": "CustomMqttUplinkConverter",
        "extension-config": {
            "temperatureBytes" : 2,
            "humidityBytes" :  2,
            "batteryLevelBytes" : 1
        }
      }
    }
  ],
  "connectRequests": [
    {
      "topicFilter": "sensor/connect",
      "deviceNameJsonExpression": "${SerialNumber}"
    },
    {
      "topicFilter": "sensor/+/connect",
      "deviceNameTopicExpression": "(?<=sensor\/)(.*?)(?=\/connect)"
    }
  ],
  "disconnectRequests": [
    {
      "topicFilter": "sensor/disconnect",
      "deviceNameJsonExpression": "${SerialNumber}"
    },
    {
      "topicFilter": "sensor/+/disconnect",
      "deviceNameTopicExpression": "(?<=sensor\/)(.*?)(?=\/disconnect)"
    }
  ],
  "attributeUpdates": [
    {
      "deviceNameFilter": "SmartMeter.*",
      "attributeFilter": "uploadFrequency",
      "topicExpression": "sensor/${deviceName}/${attributeKey}",
      "valueExpression": "{\"${attributeKey}\":\"${attributeValue}\"}"
    }
  ],
  "serverSideRpc": [
    {
      "deviceNameFilter": ".*",
      "methodFilter": "echo",
      "requestTopicExpression": "sensor/${deviceName}/request/${methodName}/${requestId}",
      "responseTopicExpression": "sensor/${deviceName}/response/${methodName}/${requestId}",
      "responseTimeout": 10000,
      "valueExpression": "${params}"
    },
    {
      "deviceNameFilter": ".*",
      "methodFilter": "no-reply",
      "requestTopicExpression": "sensor/${deviceName}/request/${methodName}/${requestId}",
      "valueExpression": "${params}"
    }
  ]
}
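
Before deploying a file like this, it can help to confirm that it parses and to list the topic filters it declares. The following is a minimal sketch, not part of the gateway: the helper name check_config and the shortened CONFIG_TEXT are illustrative assumptions, and the fallback values for host and port are the defaults listed in the “broker” table of this guide.

```python
import json

# Shortened copy of the example configuration (converters trimmed for brevity).
CONFIG_TEXT = """
{
  "broker": {"name": "Default Local Broker", "host": "192.168.1.100", "port": 1883},
  "mapping": [
    {"topicFilter": "/sensor/data", "converter": {"type": "json"}},
    {"topicFilter": "/sensor/+/data", "converter": {"type": "json"}},
    {"topicFilter": "/custom/sensors/+", "converter": {"type": "custom"}}
  ],
  "connectRequests": [{"topicFilter": "sensor/connect"}],
  "disconnectRequests": [{"topicFilter": "sensor/disconnect"}]
}
"""


def check_config(text):
    """Parse the config and return (host, port, topic filters). Hypothetical helper."""
    config = json.loads(text)  # raises ValueError if the file is not valid JSON
    broker = config.get("broker", {})
    filters = [
        entry["topicFilter"]
        for section in ("mapping", "connectRequests", "disconnectRequests")
        for entry in config.get(section, [])
    ]
    return broker.get("host", "localhost"), broker.get("port", 1883), filters


host, port, filters = check_config(CONFIG_TEXT)
print(host, port, filters)
```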

Section “broker”

| Parameter | Default value | Description |
| --- | --- | --- |
| name | Default Broker | Broker name for logs and saving to persistent devices. |
| host | localhost | MQTT broker hostname or IP address. |
| port | 1883 | MQTT port on the broker. |

Subsection “security”

The “security” subsection provides configuration for client authorization at the MQTT broker.

One type of security configuration is basic. For authorization, the username/password combination provided in this section of the config is used.

| Parameter | Default value | Description |
| --- | --- | --- |
| type | basic | Type of authorization. |
| username | username | Username for authorization. |
| password | password | Password for authorization. |

The security subsection in the configuration file will look like this:

    "security": {
      "type": "basic",
      "username": "username",
      "password": "password"
    }

Anonymous auth is the simplest option. It is useful for testing on public MQTT brokers, such as test.mosquitto.org.

| Parameter | Default value | Description |
| --- | --- | --- |
| type | anonymous | Type of authorization. |

The security subsection in the configuration file will look like this:

    "security": {
      "type": "anonymous"
    }

The table below describes the parameters for configuring certificate-based authorization on the MQTT broker.

| Parameter | Default value | Description |
| --- | --- | --- |
| caCert | /etc/thingsboard-gateway/ca.pem | Path to CA file. |
| privateKey | /etc/thingsboard-gateway/privateKey.pem | Path to private key file. |
| cert | /etc/thingsboard-gateway/certificate.pem | Path to certificate file. |

The security subsection in the configuration file will look like this:

  "security":{
    "caCert": "/etc/thingsboard-gateway/ca.pem",
    "privateKey": "/etc/thingsboard-gateway/privateKey.pem",
    "cert": "/etc/thingsboard-gateway/certificate.pem"
  }

Section “mapping”

This configuration section contains an array of topics that the gateway will subscribe to after connecting to the broker, along with settings for processing incoming messages (converter).

| Parameter | Default value | Description |
| --- | --- | --- |
| topicFilter | /sensor/data | Topic address for subscribing. |

The topicFilter supports the special symbols ‘#’ and ‘+’, which allow subscribing to multiple topics.
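
To illustrate how the two wildcards behave, here is a small sketch of MQTT topic matching: ‘+’ stands for exactly one topic level and ‘#’ for the current level and everything below it. The function name topic_matches is an assumption made for this example; in practice the broker performs this matching, not the gateway configuration.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if the topic matches the filter, honouring '+' and '#'."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches this level and all levels below it.
            return True
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            # '+' accepts any single level; anything else must match literally.
            return False
    # Without '#', filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("/sensor/+/data", "/sensor/SN-001/data"))
print(topic_matches("/sensor/+/data", "/sensor/data"))
print(topic_matches("/sensor/#", "/sensor/SN-001/data"))
```

Note that a leading slash creates an empty first level, so “/sensor/data” and “sensor/data” are different topics.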

Let’s assume we would like to subscribe to and process the following data from Thermometer devices:

| Example Name | Topic | Topic Filter | Payload | Comments |
| --- | --- | --- | --- | --- |
| Example 1 | /sensor/data | /sensor/data | {"serialNumber": "SN-001", "sensorType": "Thermometer", "sensorModel": "T1000", "temp": 42, "hum": 58} | Device Name is part of the payload |
| Example 2 | /sensor/SN-001/data | /sensor/+/data | {"sensorType": "Thermometer", "sensorModel": "T1000", "temp": 42, "hum": 58} | Device Name is part of the topic |

Now let’s review how we can configure the JSON converter to parse this data.

Subsection “converter”

This subsection contains configuration for processing incoming messages.

Types of MQTT converters:

  1. json – Default converter
  2. custom – Custom converter (you can write it yourself, and it will be used to convert incoming data from the broker)

The JSON converter is the default converter. It looks for deviceName, deviceType, attributes and telemetry in the incoming message from the broker, using the rules described in this subsection:

| Parameter | Default value | Description |
| --- | --- | --- |
| type | json | Tells the connector that the default converter will be used for converting data from the topic. |
| deviceNameJsonExpression | ${serialNumber} | Simple JSON expression used to look up the device name in the incoming message (the parameter “serialNumber” will be used as the device name). |
| deviceTypeJsonExpression | ${sensorType} | Simple JSON expression used to look up the device type in the incoming message (the parameter “sensorType” will be used as the device type). |
| timeout | 60000 | Timeout for triggering the “Device Disconnected” event. |
| attributes | | This subsection contains the parameters of the incoming message that will be interpreted as attributes for the device. |
| … type | string | Type of incoming data for the current attribute. |
| … key | model | Attribute name that will be sent to the NDU instance. |
| … value | ${sensorModel} | Simple JSON expression used to look up the value in the incoming message; it will be sent to the NDU instance as the value of the key parameter. |
| timeseries | | This subsection contains the parameters of the incoming message that will be interpreted as telemetry for the device. |
| … type | double | Type of incoming data for the current telemetry. |
| … key | temperature | Telemetry name that will be sent to the NDU instance. |
| … value | ${temp} | Simple JSON expression used to look up the value in the incoming message; it will be sent to the NDU instance as the value of the key parameter. |

Parameters in the attributes and telemetry sections may differ from those presented above, but they will have the same structure.

The mapping subsection for Example 1 will look like:

    {
      "topicFilter": "/sensor/data",
      "converter": {
        "type": "json",
        "deviceNameJsonExpression": "${serialNumber}",
        "deviceTypeJsonExpression": "${sensorType}",
        "timeout": 60000,
        "attributes": [
          {
            "type": "string",
            "key": "model",
            "value": "${sensorModel}"
          }
        ],
        "timeseries": [
          {
            "type": "double",
            "key": "temperature",
            "value": "${temp}"
          },
          {
            "type": "double",
            "key": "humidity",
            "value": "${hum}"
          }
        ]
      }
    }
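
To make the effect of this mapping concrete, the sketch below applies the same expressions to the Example 1 payload. It is an illustration under stated assumptions, not the gateway's converter: the functions lookup and convert, the CASTS table, and the shape of the returned dictionary are all hypothetical, and only plain "${key}" expressions are handled.

```python
import re

# Converter settings copied from the Example 1 mapping.
CONVERTER = {
    "deviceNameJsonExpression": "${serialNumber}",
    "deviceTypeJsonExpression": "${sensorType}",
    "attributes": [{"type": "string", "key": "model", "value": "${sensorModel}"}],
    "timeseries": [
        {"type": "double", "key": "temperature", "value": "${temp}"},
        {"type": "double", "key": "humidity", "value": "${hum}"},
    ],
}

# Assumed casts for the "type" field; only the two types used above are covered.
CASTS = {"string": str, "double": float}


def lookup(expression, payload):
    """Resolve a simple "${key}" expression against the decoded JSON payload."""
    match = re.fullmatch(r"\$\{(\w+)\}", expression)
    if match is None:
        raise ValueError(f"unsupported expression: {expression}")
    return payload[match.group(1)]


def convert(converter, payload):
    """Build a device record from the payload (output shape is an assumption)."""
    def fields(section):
        return {
            item["key"]: CASTS[item["type"]](lookup(item["value"], payload))
            for item in converter.get(section, [])
        }
    return {
        "deviceName": lookup(converter["deviceNameJsonExpression"], payload),
        "deviceType": lookup(converter["deviceTypeJsonExpression"], payload),
        "attributes": fields("attributes"),
        "telemetry": fields("timeseries"),
    }


payload = {"serialNumber": "SN-001", "sensorType": "Thermometer",
           "sensorModel": "T1000", "temp": 42, "hum": 58}
result = convert(CONVERTER, payload)
print(result)
```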

Mapping for Example 2 will look like:

    {
      "topicFilter": "/sensor/+/data",
      "converter": {
        "type": "json",
        "deviceNameTopicExpression": "(?<=sensor\/)(.*?)(?=\/data)",
        "deviceTypeTopicExpression": "Thermometer",
        "timeout": 60000,
        "attributes": [
          {
            "type": "string",
            "key": "model",
            "value": "${sensorModel}"
          }
        ],
        "timeseries": [
          {
            "type": "double",
            "key": "temperature",
            "value": "${temp}"
          },
          {
            "type": "double",
            "key": "humidity",
            "value": "${hum}"
          }
        ]
      }
    }
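
The deviceNameTopicExpression above is a regular expression with a lookbehind and a lookahead, so only the text between “sensor/” and “/data” is returned. The sketch below checks it against the Example 2 topic with Python's re module; the helper name device_name_from_topic is an assumption for illustration. In JSON, “\/” is simply an escaped “/”.

```python
import re

# Same pattern as in the Example 2 mapping.
DEVICE_NAME_TOPIC_EXPRESSION = r"(?<=sensor\/)(.*?)(?=\/data)"


def device_name_from_topic(expression, topic):
    """Return the part of the topic matched by the expression, or None."""
    match = re.search(expression, topic)
    return match.group(0) if match else None


print(device_name_from_topic(DEVICE_NAME_TOPIC_EXPRESSION, "/sensor/SN-001/data"))
print(device_name_from_topic(DEVICE_NAME_TOPIC_EXPRESSION, "/sensor/data"))
```

The second call returns None because the topic has no level between “sensor/” and “/data”.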

A custom converter is a converter written for a specific device:

| Parameter | Default value | Description |
| --- | --- | --- |
| type | custom | Tells the connector that a custom converter will be used for converting data from the topic. |
| extension | CustomMqttUplinkConverter | Name of the custom converter class. |
| extension-config | | This subsection is the configuration for the custom converter. In the default example it contains the telemetry keys and the number of bytes for each. |
| … temperatureBytes | 2 | In the default example, the first 2 bytes of the received message are interpreted as the temperature telemetry key (the substring “Bytes” is removed if present). |
| … humidityBytes | 2 | In the default example, the third and fourth bytes of the received message are interpreted as the humidity telemetry key (the substring “Bytes” is removed if present). |
| … batteryLevelBytes | 1 | In the default example, the fifth byte of the received message is interpreted as the batteryLevel telemetry key (the substring “Bytes” is removed if present). |

All parameters from this subsection, together with the topic, are passed as a dictionary to the converter object during initialization.

The converter subsection in the configuration will look like:

      "converter": {
        "type": "custom",
        "extension": "CustomMqttUplinkConverter",
        "extension-config": {
            "temperatureBytes" : 2,
            "humidityBytes" :  2,
            "batteryLevelBytes" : 1
        }
      }
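
The byte layout described by extension-config can be sketched as follows. This is not the CustomMqttUplinkConverter class itself: the function decode_payload is hypothetical, and reading each slice as a big-endian unsigned integer is an assumption made only to produce readable numbers.

```python
# Same values as the extension-config above; the order of the keys defines the layout.
EXTENSION_CONFIG = {"temperatureBytes": 2, "humidityBytes": 2, "batteryLevelBytes": 1}


def decode_payload(config, payload):
    """Split a binary message into telemetry values according to the config."""
    telemetry = {}
    offset = 0
    for name, length in config.items():
        key = name.replace("Bytes", "")  # "temperatureBytes" -> "temperature"
        chunk = payload[offset:offset + length]
        # Assumption: each field is a big-endian unsigned integer.
        telemetry[key] = int.from_bytes(chunk, "big")
        offset += length
    return telemetry


message = bytes([0x00, 0x2A, 0x00, 0x3A, 0x5F])
telemetry = decode_payload(EXTENSION_CONFIG, message)
print(telemetry)
```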

Note: You can specify multiple mapping objects inside the array.

The mapping process subscribes to the MQTT topics using the topicFilter parameter of the mapping object. Each message published to this topic by other devices or applications is analyzed to extract the device name, type and data (attributes and/or timeseries values). By default, the gateway uses the JSON converter, but it is possible to provide a custom converter. See examples in the source code.

Section “connectRequests”

NDU allows sending RPC commands and notifications about device attribute updates to the device. To send them, the platform needs to know whether the target device is connected and which gateway or session is currently used to connect the device. If your device is constantly sending telemetry data, NDU already knows how to push notifications. If your device just connects to the MQTT broker and waits for commands/updates, you need to send a message to the Gateway to inform it that the device is connected to the broker.

1. Name in a message from the broker:

| Parameter | Default value | Description |
| --- | --- | --- |
| topicFilter | sensors/connect | Topic address on the broker where information about newly connected devices is sent. |
| deviceNameJsonExpression | ${SerialNumber} | JSON-path expression used to look up the new device name. |

2. Name in the topic address:

| Parameter | Default value | Description |
| --- | --- | --- |
| topicFilter | sensor/+/connect | Topic address on the broker where information about newly connected devices is sent. |
| deviceNameTopicExpression | (?<=sensor\/)(.*?)(?=\/connect) | Regular expression used to look up the device name in the topic path. |

This section in the configuration file looks like:

  "connectRequests": [
    {
      "topicFilter": "sensors/connect",
      "deviceNameJsonExpression": "${SerialNumber}"
    },
    {
      "topicFilter": "sensor/+/connect",
      "deviceNameTopicExpression": "(?<=sensor\/)(.*?)(?=\/connect)"
    }
  ]

In this case, the following messages are valid:

mosquitto_pub -h YOUR_MQTT_BROKER_HOST -p YOUR_MQTT_BROKER_PORT -t "sensors/connect" -m '{"SerialNumber":"SN-001"}'
mosquitto_pub -h YOUR_MQTT_BROKER_HOST -p YOUR_MQTT_BROKER_PORT -t "sensor/SN-001/connect" -m ''

Section “disconnectRequests”

This configuration section is optional.
The configuration provided in this section is used to get information from the broker about disconnecting devices.
When a device disconnects from the MQTT broker, you need to send a message to the Gateway to inform it that the device is disconnected from the broker.

1. Name in a message from the broker:

| Parameter | Default value | Description |
| --- | --- | --- |
| topicFilter | sensors/disconnect | Topic address on the broker where information about disconnected devices is sent. |
| deviceNameJsonExpression | ${SerialNumber} | JSON-path expression used to look up the device name. |

2. Name in the topic address:

| Parameter | Default value | Description |
| --- | --- | --- |
| topicFilter | sensor/+/disconnect | Topic address on the broker where information about disconnected devices is sent. |
| deviceNameTopicExpression | (?<=sensor\/)(.*?)(?=\/disconnect) | Regular expression used to look up the device name in the topic path. |

This section in the configuration file looks like:

  "disconnectRequests": [
    {
      "topicFilter": "sensors/disconnect",
      "deviceNameJsonExpression": "${SerialNumber}"
    },
    {
      "topicFilter": "sensor/+/disconnect",
      "deviceNameTopicExpression": "(?<=sensor\/)(.*?)(?=\/disconnect)"
    }
  ]

In this case, the following messages are valid:

mosquitto_pub -h YOUR_MQTT_BROKER_HOST -p YOUR_MQTT_BROKER_PORT -t "sensors/disconnect" -m '{"SerialNumber":"SN-001"}'
mosquitto_pub -h YOUR_MQTT_BROKER_HOST -p YOUR_MQTT_BROKER_PORT -t "sensor/SN-001/disconnect" -m ''
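
The connect and disconnect rules can be pictured as maintaining a set of currently connected devices. The sketch below is a simplified model under stated assumptions, not the gateway's implementation: the names device_name, on_message and connected are hypothetical, only the ‘+’ wildcard is supported, and the JSON expression is reduced to a plain key lookup. The JSON payload key must match the expression exactly, including case.

```python
import json
import re

CONNECT_RULES = [
    {"topicFilter": "sensors/connect", "deviceNameJsonExpression": "${SerialNumber}"},
    {"topicFilter": "sensor/+/connect",
     "deviceNameTopicExpression": r"(?<=sensor\/)(.*?)(?=\/connect)"},
]
DISCONNECT_RULES = [
    {"topicFilter": "sensors/disconnect", "deviceNameJsonExpression": "${SerialNumber}"},
    {"topicFilter": "sensor/+/disconnect",
     "deviceNameTopicExpression": r"(?<=sensor\/)(.*?)(?=\/disconnect)"},
]

connected = set()


def device_name(rule, topic, payload):
    """Return the device name if the topic matches the rule, else None."""
    # Turn the topic filter into a regex; '+' matches exactly one level.
    pattern = re.escape(rule["topicFilter"]).replace(r"\+", "[^/]+")
    if not re.fullmatch(pattern, topic):
        return None
    if "deviceNameJsonExpression" in rule:
        key = rule["deviceNameJsonExpression"][2:-1]  # "${SerialNumber}" -> "SerialNumber"
        return json.loads(payload).get(key)
    match = re.search(rule["deviceNameTopicExpression"], topic)
    return match.group(0) if match else None


def on_message(topic, payload):
    """Update the set of connected devices for one incoming message."""
    for rule in CONNECT_RULES:
        name = device_name(rule, topic, payload)
        if name:
            connected.add(name)
    for rule in DISCONNECT_RULES:
        name = device_name(rule, topic, payload)
        if name:
            connected.discard(name)


on_message("sensors/connect", '{"SerialNumber": "SN-001"}')
on_message("sensor/SN-002/connect", "")
on_message("sensor/SN-001/disconnect", "")
print(connected)
```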

Section “attributeUpdates”

This configuration section is optional.
NDU allows you to provision device attributes and fetch some of them from the device application. You can treat this as remote configuration for devices. Your devices are able to request shared attributes from NDU. See the user guide for more details.

The “attributeUpdates” configuration allows configuring the format of the messages that carry attribute updates to the device.

| Parameter | Default value | Description |
| --- | --- | --- |
| deviceNameFilter | SmartMeter.* | Regular expression for the device name, used to determine which function to execute. |
| attributeFilter | uploadFrequency | Regular expression for the attribute name, used to determine which function to execute. |
| topicExpression | sensor/${deviceName}/${attributeKey} | JSON-path expression used to create the topic address to which the message is sent. |
| valueExpression | {\"${attributeKey}\":\"${attributeValue}\"} | JSON-path expression used to create the message data that is sent to the topic. |

This section in the configuration file looks like:

  "attributeUpdates": [
    {
      "deviceNameFilter": "SmartMeter.*",
      "attributeFilter": "uploadFrequency",
      "topicExpression": "sensor/${deviceName}/${attributeKey}",
      "valueExpression": "{\"${attributeKey}\":\"${attributeValue}\"}"
    }
  ]
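
As a rough illustration of how the two expressions expand, the sketch below fills in the placeholders for one attribute update. It is an assumption-laden model rather than gateway code: the function render_attribute_update is hypothetical, string.Template is used only because it happens to share the ${name} placeholder syntax, and treating the filters as full-match regular expressions is an assumption.

```python
import re
from string import Template

# Same rule as in the configuration above (JSON escapes already decoded).
RULE = {
    "deviceNameFilter": "SmartMeter.*",
    "attributeFilter": "uploadFrequency",
    "topicExpression": "sensor/${deviceName}/${attributeKey}",
    "valueExpression": '{"${attributeKey}":"${attributeValue}"}',
}


def render_attribute_update(rule, device, key, value):
    """Return (topic, message) for an update, or None if the filters do not match."""
    if not re.fullmatch(rule["deviceNameFilter"], device):
        return None
    if not re.fullmatch(rule["attributeFilter"], key):
        return None
    values = {"deviceName": device, "attributeKey": key, "attributeValue": value}
    topic = Template(rule["topicExpression"]).substitute(values)
    message = Template(rule["valueExpression"]).substitute(values)
    return topic, message


update = render_attribute_update(RULE, "SmartMeter-1", "uploadFrequency", 60)
print(update)
```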

Server side RPC commands

NDU allows sending RPC commands to the device that is connected to NDU directly or via Gateway.

The configuration provided in this section is used for sending RPC requests from NDU to the device.

| Parameter | Default value | Description |
| --- | --- | --- |
| deviceNameFilter | SmartMeter.* | Regular expression for the device name, used to determine which function to execute. |
| methodFilter | echo | Regular expression for the method name, used to determine which function to execute. |
| requestTopicExpression | sensor/${deviceName}/request/${methodName}/${requestId} | JSON-path expression used to create the topic address to which the RPC request is sent. |
| responseTopicExpression | sensor/${deviceName}/response/${methodName}/${requestId} | JSON-path expression used to create the topic address to subscribe to for the response message. |
| responseTimeout | 10000 | Value in milliseconds. If no response arrives within this period after the request is sent, the gateway unsubscribes from the response topic. |
| valueExpression | ${params} | JSON-path expression used to create the data sent to the broker. |

There are 2 options for an RPC request:

  1. With response – If responseTopicExpression exists in the configuration, the gateway will try to subscribe to it and wait for a response.
  2. Without response – If responseTopicExpression does not exist in the configuration, the gateway will just send the message and won’t wait for a response.

This section in the configuration file looks like:

  "serverSideRpc": [
    {
      "deviceNameFilter": ".*",
      "methodFilter": "echo",
      "requestTopicExpression": "sensor/${deviceName}/request/${methodName}/${requestId}",
      "responseTopicExpression": "sensor/${deviceName}/response/${methodName}/${requestId}",
      "responseTimeout": 10000,
      "valueExpression": "${params}"
    },
    {
      "deviceNameFilter": ".*",
      "methodFilter": "no-reply",
      "requestTopicExpression": "sensor/${deviceName}/request/${methodName}/${requestId}",
      "valueExpression": "${params}"
    }
  ]

You can use deviceNameFilter and methodFilter to apply different mapping rules for different devices/methods. Once the Gateway receives an RPC request from the server to the device, it publishes the corresponding message based on requestTopicExpression and valueExpression. If you expect a reply to the request from the device, you should also specify responseTopicExpression and responseTimeout. The Gateway will subscribe to the “response” topic and wait for the device reply until the responseTimeout (in milliseconds) expires.

Example of an RPC request (rpc-request.json) that needs to be sent from the server:

{
  "method": "echo",
  "params": {
    "message": "Hello!"
  }
}
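
To see which topics such a request would touch, the sketch below expands the serverSideRpc expressions for this request. It is a model under assumptions, not the gateway's code: the function build_rpc, the device name “SN-001” and the request id 1 are made up for the example, the filters are treated as full-match regular expressions, and ${params} is assumed to be the JSON-encoded params object.

```python
import json
import re
from string import Template

# The two rules from the serverSideRpc section above.
RPC_RULES = [
    {
        "deviceNameFilter": ".*",
        "methodFilter": "echo",
        "requestTopicExpression": "sensor/${deviceName}/request/${methodName}/${requestId}",
        "responseTopicExpression": "sensor/${deviceName}/response/${methodName}/${requestId}",
        "responseTimeout": 10000,
        "valueExpression": "${params}",
    },
    {
        "deviceNameFilter": ".*",
        "methodFilter": "no-reply",
        "requestTopicExpression": "sensor/${deviceName}/request/${methodName}/${requestId}",
        "valueExpression": "${params}",
    },
]


def build_rpc(rules, device, request_id, request):
    """Pick the first matching rule and expand its topic and value expressions."""
    for rule in rules:
        if not re.fullmatch(rule["deviceNameFilter"], device):
            continue
        if not re.fullmatch(rule["methodFilter"], request["method"]):
            continue
        values = {
            "deviceName": device,
            "methodName": request["method"],
            "requestId": request_id,
            "params": json.dumps(request["params"]),
        }
        response = rule.get("responseTopicExpression")
        return {
            "requestTopic": Template(rule["requestTopicExpression"]).substitute(values),
            "payload": Template(rule["valueExpression"]).substitute(values),
            # None means the request is sent without waiting for a reply.
            "responseTopic": Template(response).substitute(values) if response else None,
            "responseTimeout": rule.get("responseTimeout"),
        }
    return None


rpc_request = {"method": "echo", "params": {"message": "Hello!"}}
echo = build_rpc(RPC_RULES, "SN-001", 1, rpc_request)
print(echo)
```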

Next steps

Explore guides related to main NDU features: