Bridge - MQTT

Bridge - MQTT

MQTT Bridge enables machbase-neo to send and receive message to/from any external MQTT brokers.

๐Ÿ“ข
The beauty of the MQTT bridge comes when any existing “MQTT based” platforms adopt machbase-neo, there is no changes required on the existing system.
  • Send messages to external MQTT broker
flowchart LR
  machbase-neo --PUBLISH-->external-system
  subgraph machbase-neo
      direction LR
      machbase[("machbase
                  engine")] --read--> tql
      tql["TQL Script"] --> bridge("bridge(mqtt)")
  end
  subgraph external-system
    direction LR
    broker[[MQTT broker]] --> subscriber["application
                                        (subscriber)"]
  end
  • Receive messages from external MQTT broker
flowchart RL
    external-system --PUBLISH--> machbase-neo
    machbase-neo --SUBSCRIBE--> external-system
    subgraph machbase-neo
        direction RL
        bridge("bridge(mqtt)") --> subscriber
        subscriber["TQL Script"] --Write--> machbase
        machbase[("machbase
                    engine")]
    end
    subgraph external-system
        direction RL
        client["application
              (publisher)"] --PUBLISH--> mqtt[["MQTT broker"]]
    end

Register a bridge to an external MQTT broker

Register a bridge

bridge add -t mqtt my_mqtt broker=127.0.0.1:1883 id=client-id;

A mqtt bridge just defines how machbase-neo can connect to the external MQTT broker, See the subscriber section below to get it to receive messages.

Available connect options

OptionDescriptionexample
brokerbroker address, If the broker has redundant access points, use multiple “broker” optionsbroker=192.0.1.100:1883
idclient id
usernameusername
passwordpassword
keepalivekeepalive in duration formatkeepalive=30s
cleansessioncleansessioncleansession=1 cleansession=false
cafileca cert (*.pem) file pathTLS
keyclient private key (*.pem) file pathTLS
certclient certificate (*.pem) file pathTLS

When all three of cafile, key, cert options are set, the secure mqtt connection is enabled with TLS.

Send messages

Run mosuqitto_sub with debug mode (-d) option. This will receive messages from topic ’neo/messages’ via mosquitto broker when machbase-neo publish messages to the topic.

mosquitto_sub -d -h 127.0.0.1 -p 1883 -i client-app -t neo/messages                                            1 โ†ต
Client client-app sending CONNECT
Client client-app received CONNACK (0)
Client client-app sending SUBSCRIBE (Mid: 1, Topic: neo/messages, QoS: 0, Options: 0x00)
Client client-app received SUBACK
Subscribed (mid: 1): 0

Make a TQL script that call the publish() function of the bridge.

โ„น๏ธ
TIMER This example uses FAKE() and execute manually for the briefness, the “publish” feature of bridges will be useful and powerful when it combines with Timer to send data automatically by any given schedule.
1
2
3
4
5
6
7
8
FAKE(linspace(0,10, 5))
SCRIPT({
  ctx := import("context")
  br := ctx.bridge("my_mqtt")
  br.publish("neo/messages", "The message number is "+ctx.value(0))
  ctx.yieldKey(ctx.key(), ctx.value()...)
})
CSV()

As soon as executing the script above the mosquitto_sub prints out the messages that it receives on the screen.

mosquitto_sub -d -h 127.0.0.1 -p 1883 -i client-app -t neo/messages                                            1 โ†ต
... omit ...
Client client-app received PUBLISH (d0, q0, r0, m0, 'neo/messages', ... (23 bytes))
The message number is 0
Client client-app received PUBLISH (d0, q0, r0, m0, 'neo/messages', ... (25 bytes))
The message number is 2.5
Client client-app received PUBLISH (d0, q0, r0, m0, 'neo/messages', ... (23 bytes))
The message number is 5
Client client-app received PUBLISH (d0, q0, r0, m0, 'neo/messages', ... (25 bytes))
The message number is 7.5
Client client-app received PUBLISH (d0, q0, r0, m0, 'neo/messages', ... (24 bytes))
The message number is 10

Receive messages - Subscriber

Let’s make an example that receives messages from MQTT broker and storing the data into database utilizing bridge and subscriber.

In this demonstration we will use the mosquitto as MQTT broker and mosquitto_pub as MQTT client. These tools are simulating an “external” system.

flowchart RL
    external-system --PUBLISH--> machbase-neo
    machbase-neo --SUBSCRIBE--> external-system
    subgraph machbase-neo
        direction RL
        bridge("bridge(mq)") --> subscriber
        subscriber["mqttsubr.tql"] --Write--> machbase
        machbase[("machbase
                    engine")]
    end
    subgraph external-system
        direction RL
        client["mosquitto_pub"] --PUBLISH--> mqtt[["mosquitto"]]
    end

Run MQTT Broker

The MQTT bridge of machbase-neo should work with any MQTT broker that is compatible MQTT v3.1.1 specification.

If you don’t have an installed MQTT broker, get and run mosquitto for the demo. https://mosquitto.org

$ mosquitto -p 1883

1691466522: mosquitto version 2.0.15 starting
1691466522: Using default config.
1691466522: Starting in local only mode. Connections will only be possible from clients running on this machine.
1691466522: Create a configuration file which defines a listener to allow remote access.
1691466522: For more details see https://mosquitto.org/documentation/authentication-methods/
1691466522: Opening ipv4 listen socket on port 1883.
1691466522: Opening ipv6 listen socket on port 1883.
1691466522: mosquitto version 2.0.15 running

Register a bridge

Open machbase-neo shell, and execute bridge add... command.

bridge add -t mqtt my_mqtt broker=127.0.0.1:1883 id=demo;

It defines the way how machbase-neo can connect to the designated broker.

machbase-neoยป bridge list;
โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ
โ”‚ NAME    โ”‚ TYPE     โ”‚ CONNECTION                      โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚ my_mqtt โ”‚ mqtt     โ”‚ broker=127.0.0.1:1883 id=demo   โ”‚
โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ

When the bridge my_mqtt successfully registered, machbase-neo connects to the broker and mosquitto shows the connection log like below.

If there is any network problem or the broker is down, machbase-neo does periodically retry to connect, so that it keeps the best efforts to make the bridge available.

1691466529: New connection from 127.0.0.1:65440 on port 1883.
1691466529: New client connected from 127.0.0.1:65440 as demo (p2, c1, k30).

Data transforming script

Open machbase-neo TQL editor, and copy the code below and save it as mqttsubr.tql.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
BYTES(payload())
SCRIPT({
  // get current time
  times := import("times")
  ts := times.now()
  // get the payload of the message
  ctx := import("context")
  val := ctx.value()
  // parse json
  json := import("json")
  msg := json.decode(val[0])
  // yield key-value records
  ctx.yield(msg.client+".temp", ts, msg.data.temp)
  ctx.yield(msg.client+".humidity", ts, msg.data.humidity)
})
APPEND(table("example"))

This script gets the payload of incoming messages with BYTES(payload()). It parses and transforms the data shape to write into the table example of machbase-neo.

First it gets current time to apply the timestamp with ts := times.now(), since the message doesn’t have a timestamp. Decode the JSON payload and yields two new records which are .temp and .humidity.

Register a subscriber

Open machbase-neo shell to add a new subscriber which makes a pipeline between the bridge and the tql script.

It specifies …

  • --autostart makes the subscriber starts along with machbase-neo starts
  • --qos 1 subscribe to the topic QoS 1, MQTT bridges support QoS 0 and 1
  • mqttsubr name of the subscriber
  • my_mqtt name of bridge that the subscriber is going to use
  • iot/sensor topic name to subscribe. it supports standard MQTT topic syntax includes # and +
subscriber add --autostart --qos 1 mqttsubr my_mqtt iot/sensor /mqttsubr.tql;

Check the newly register subscriber is in RUNNING state, since it is registered with --autostart option.

machbase-neoยป subscriber list;
โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ
โ”‚ NAME     โ”‚ BRIDGE  โ”‚ TOPIC      โ”‚ TQL           โ”‚ AUTOSTART โ”‚ STATE   โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚ MQTTSUBR โ”‚ my_mqtt โ”‚ iot/sensor โ”‚ /mqttsubr.tql โ”‚ true      โ”‚ RUNNING โ”‚
โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ

Send messages with mosquitto_pub

Make a data file data.json as like below.

{
  "client":"mqtt-demo",
  "data":{"temp":34.1, "humidity":67.8}
}

Execute mosquitto_pub publishing the data.json to the MQTT broker.

mosquitto_pub -d -h 127.0.0.1 -p 1883 \
    -t iot/sensor \
    -f data.json

Query the stored data.

machbase-neoยป select * from example where name in ('mqtt-demo.temp', 'mqtt-demo.humidity');
โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ
โ”‚ ROWNUM โ”‚ NAME               โ”‚ TIME(LOCAL)             โ”‚ VALUE     โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚      1 โ”‚ mqtt-demo.temp     โ”‚ 2023-08-08 13:51:37.923 โ”‚ 34.100000 โ”‚
โ”‚      2 โ”‚ mqtt-demo.humidity โ”‚ 2023-08-08 13:51:37.923 โ”‚ 67.800000 โ”‚
โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ
Last updated on