Yungen's blog

MQTT short note

What is mqtt

mqtt 是個基於 Pub/Sub 模式的 protocol ,主要會由一個 message broker 與多個 client 組成。

What is Pub/Sub

在 Pub/Sub 的傳輸模式下訊息的傳輸不會以一對一的方式傳送訊息,而是將訊息發布到一個 Topic 中,而所有有訂閱該 Topic 的用戶都會收到訊息。可以想像為 slack 的頻道,我可以在 d1 中發訊息(發布訊息到 d1 這個 topic),而只會有在 d1 這個頻道的使用者收到訊息(有訂閱 d1 topic 的使用者)。不過在 Pub/Sub 模式下收到訊息不會知道是誰發佈的。(除非在訊息中有寫)

MQTT 傳輸模式

QOS

  1. QoS 0: 最多一次傳送:client 傳送訊息後不在乎訊息是否真的有其他 client 端收到。
  2. QoS 1: 至少一次傳送:client 傳送訊息後會等待接收方回應,如果沒有回應則會重新再傳一次。(此方法可以確保訊息會送達,但不會確保訊息不會重複)
  3. QoS 2: 正好一次傳送:與 QoS 2 相似但會確保訊息不會重複。

How

目前打算使用 emqx 作為 message broker ,讓所有使用者作為 client 端,同時 chat services 也會作為 client 發布訊息給所有 client 端。

Client

以下我會用 typescript 搭配 MQTT.js 示範

import mqtt 並定義設定

1import mqtt from 'mqtt'
2
3const options = {
4  		clean: true, // 當 clean 設為 true 時,每次 client 連線到 broker 後不會接續上一次的 session
5      connectTimeout: 4000,
6      clientId: 'typescript_mqtt_client', // clientId 必須為獨一的,不能同時有相同 clientid 的 client 連上 message broker
7}
8const client = mqtt.connect("ws://localhost:8083/mqtt", options)

接著我們定義我們在不同 event 的 callback function。

 1client.on('connect',()=>{
 2  client.subscribe('mytopic')
 3})
 4
 5client.on('message', (topic, message) => {
 6  console.log('receive message:', topic, message.toString())
 7})
 8
 9client.on('reconnect', (error) => {
10    console.log('reconnecting:', error)
11})
12
13client.on('error', (error) => {
14    console.log('Connection failed:', error)
15})

以下我會用 golang 搭配 paho-mqtt-golang 示範

先定義 mqtt 的設定

1	opts := mqtt.NewClientOptions()
2	opts.AddBroker("tcp://localhost:1883").SetClientID("golang_mqtt_client")
3	opts.ConnectTimeout = 2 * time.Second
4	opts.WriteTimeout = 2 * time.Second
5	opts.KeepAlive = 10
6	opts.PingTimeout = 2 * time.Second
7	opts.ConnectRetry = true
8	opts.AutoReconnect = true

定義 mqtt 在發生某些 event 時的 call back function

1	opts.OnConnectionLost = func(c mqtt.Client, e error) {
2		log.Println("connection lost")
3	}
4	opts.OnConnect = func(c mqtt.Client) {
5		log.Println("connected")
6	}
7	opts.OnReconnecting = func(c mqtt.Client, co *mqtt.ClientOptions) {
8		log.Println("reconnecting")
9	}

成功與 message broker 建立連線後發布一個訊息到 “mytopic” 中。

 1	client := mqtt.NewClient(opts)
 2	if token := client.Connect(); token.Wait() && token.Error() != nil {
 3		panic(token.Error())
 4	}
 5	log.Println("Connection is up")
 6	t := client.Publish("mytopic", 1, false, "test")
 7	go func() {
 8		<-t.Done()
 9		if t.Error() != nil {
10			log.Println("ERROR")
11		}
12	}()
13	time.Sleep(6 * time.Second)
14	client.Disconnect(250)

如果剛剛的 typescript client 還在運行,應該會看到 receive message: mytopic test

EMQx

Extension Hook

當 emqx 發生某些 events 時,會對我們的 grpc server 端呼叫 rpc call 傳送訊息給我們,我們可以根據 emqx 傳給我們的資料做相對應的處理。

舉例:

1service HookProvider {
2	rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};	
3	rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};
4	rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {};
5}

當有使用者上線後會 emqx 會呼叫 OnClientConnect 我們可以將該使用者的狀態設定為上線,當使用者傳送訊息時 emqx 會呼叫 OnMessagePublish 我們可以將該訊息存入至 DB 中。

Extension Protocol

與 extension hook 概念相似,不同的是我們不再是以監聽的概念,而是 emqx 會傳訊息給我們,我們需要回傳訊息給 emqx , emqx 會根據我們回傳的訊息做相對應的動作。

舉例:

 1service ConnectionAdapter {
 2	rpc Authenticate(AuthenticateRequest) returns (CodeResponse) {};
 3	rpc Publish(PublishRequest) returns (CodeResponse) {};
 4}
 5enum ResultCode {
 6  SUCCESS = 0;
 7  UNKNOWN = 1;
 8  CONN_PROCESS_NOT_ALIVE = 2;
 9  REQUIRED_PARAMS_MISSED = 3;
10  PARAMS_TYPE_ERROR = 4;
11  PERMISSION_DENY = 5;
12}
13
14message CodeResponse {
15  ResultCode code = 1;
16  string message = 2;
17}

當使用者嘗試透過 mqtt 連到我們的 message broker 時, emqx 會呼叫 Authenticate ,我們收到 AuthenticateRequest 訊息後可以檢視該使用者並回傳 ResultCode 告訴 emqx 該不該讓此使用者連線。

補充

#Posts