MQTT short note
What is mqtt
mqtt 是個基於 Pub/Sub 模式的 protocol ,主要會由一個 message broker 與多個 client 組成。
What is Pub/Sub
在 Pub/Sub 的傳輸模式下訊息的傳輸不會以一對一的方式傳送訊息,而是將訊息發布到一個 Topic 中,而所有有訂閱該 Topic 的用戶都會收到訊息。可以想像為 slack 的頻道,我可以在 d1 中發訊息(發布訊息到 d1 這個 topic),而只會有在 d1 這個頻道的使用者收到訊息(有訂閱 d1 topic 的使用者)。不過在 Pub/Sub 模式下收到訊息不會知道是誰發佈的。(除非在訊息中有寫)
MQTT 傳輸模式
QOS
- QoS 0: 最多一次傳送:client 傳送訊息後不在乎訊息是否真的有其他 client 端收到。
- QoS 1: 至少一次傳送:client 傳送訊息後會等待接收方回應,如果沒有回應則會重新再傳一次。(此方法可以確保訊息會送達,但不會確保訊息不會重複)
- QoS 2: 正好一次傳送:與 QoS 2 相似但會確保訊息不會重複。
How
目前打算使用 emqx 作為 message broker ,讓所有使用者作為 client 端,同時 chat services 也會作為 client 發布訊息給所有 client 端。
Client
以下我會用 typescript 搭配 MQTT.js 示範
import mqtt 並定義設定
1import mqtt from 'mqtt'
2
3const options = {
4 clean: true, // 當 clean 設為 true 時,每次 client 連線到 broker 後不會接續上一次的 session
5 connectTimeout: 4000,
6 clientId: 'typescript_mqtt_client', // clientId 必須為獨一的,不能同時有相同 clientid 的 client 連上 message broker
7}
8const client = mqtt.connect("ws://localhost:8083/mqtt", options)
接著我們定義我們在不同 event 的 callback function。
1client.on('connect',()=>{
2 client.subscribe('mytopic')
3})
4
5client.on('message', (topic, message) => {
6 console.log('receive message:', topic, message.toString())
7})
8
9client.on('reconnect', (error) => {
10 console.log('reconnecting:', error)
11})
12
13client.on('error', (error) => {
14 console.log('Connection failed:', error)
15})
以下我會用 golang 搭配 paho-mqtt-golang 示範
先定義 mqtt 的設定
1 opts := mqtt.NewClientOptions()
2 opts.AddBroker("tcp://localhost:1883").SetClientID("golang_mqtt_client")
3 opts.ConnectTimeout = 2 * time.Second
4 opts.WriteTimeout = 2 * time.Second
5 opts.KeepAlive = 10
6 opts.PingTimeout = 2 * time.Second
7 opts.ConnectRetry = true
8 opts.AutoReconnect = true
定義 mqtt 在發生某些 event 時的 call back function
1 opts.OnConnectionLost = func(c mqtt.Client, e error) {
2 log.Println("connection lost")
3 }
4 opts.OnConnect = func(c mqtt.Client) {
5 log.Println("connected")
6 }
7 opts.OnReconnecting = func(c mqtt.Client, co *mqtt.ClientOptions) {
8 log.Println("reconnecting")
9 }
成功與 message broker 建立連線後發布一個訊息到 “mytopic” 中。
1 client := mqtt.NewClient(opts)
2 if token := client.Connect(); token.Wait() && token.Error() != nil {
3 panic(token.Error())
4 }
5 log.Println("Connection is up")
6 t := client.Publish("mytopic", 1, false, "test")
7 go func() {
8 <-t.Done()
9 if t.Error() != nil {
10 log.Println("ERROR")
11 }
12 }()
13 time.Sleep(6 * time.Second)
14 client.Disconnect(250)
如果剛剛的 typescript client 還在運行,應該會看到 receive message: mytopic test
。
EMQx
Extension Hook
當 emqx 發生某些 events 時,會對我們的 grpc server 端呼叫 rpc call 傳送訊息給我們,我們可以根據 emqx 傳給我們的資料做相對應的處理。
舉例:
1service HookProvider {
2 rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};
3 rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};
4 rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {};
5}
當有使用者上線後會 emqx 會呼叫 OnClientConnect
我們可以將該使用者的狀態設定為上線,當使用者傳送訊息時 emqx 會呼叫 OnMessagePublish
我們可以將該訊息存入至 DB 中。
Extension Protocol
與 extension hook 概念相似,不同的是我們不再是以監聽的概念,而是 emqx 會傳訊息給我們,我們需要回傳訊息給 emqx , emqx 會根據我們回傳的訊息做相對應的動作。
舉例:
1service ConnectionAdapter {
2 rpc Authenticate(AuthenticateRequest) returns (CodeResponse) {};
3 rpc Publish(PublishRequest) returns (CodeResponse) {};
4}
5enum ResultCode {
6 SUCCESS = 0;
7 UNKNOWN = 1;
8 CONN_PROCESS_NOT_ALIVE = 2;
9 REQUIRED_PARAMS_MISSED = 3;
10 PARAMS_TYPE_ERROR = 4;
11 PERMISSION_DENY = 5;
12}
13
14message CodeResponse {
15 ResultCode code = 1;
16 string message = 2;
17}
當使用者嘗試透過 mqtt 連到我們的 message broker 時, emqx 會呼叫 Authenticate
,我們收到 AuthenticateRequest
訊息後可以檢視該使用者並回傳 ResultCode
告訴 emqx 該不該讓此使用者連線。
補充
exhook
1service HookProvider { 2 3 rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {}; 4 5 rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {}; 6 7 rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {}; 8 9 rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {}; 10 11 rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {}; 12 13 rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {}; 14 15 rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {}; 16 17 rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {}; 18 19 rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {}; 20 21 rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {}; 22 23 rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {}; 24 25 rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {}; 26 27 rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {}; 28 29 rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {}; 30 31 rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {}; 32 33 rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {}; 34 35 rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {}; 36 37 rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {}; 38 39 rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {}; 40 41 rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {}; 42 43 rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {}; 44}
exproto
1service ConnectionAdapter { 2 3 // -- socket layer 4 5 rpc Send(SendBytesRequest) returns (CodeResponse) {}; 6 7 rpc Close(CloseSocketRequest) returns (CodeResponse) {}; 8 9 // -- protocol layer 10 11 rpc Authenticate(AuthenticateRequest) returns (CodeResponse) {}; 12 13 rpc StartTimer(TimerRequest) returns (CodeResponse) {}; 14 15 // -- pub/sub layer 16 17 rpc Publish(PublishRequest) returns (CodeResponse) {}; 18 19 rpc Subscribe(SubscribeRequest) returns (CodeResponse) {}; 20 21 rpc Unsubscribe(UnsubscribeRequest) returns (CodeResponse) {}; 22} 23 24service ConnectionHandler { 25 26 // -- socket layer 27 28 rpc OnSocketCreated(stream SocketCreatedRequest) returns (EmptySuccess) {}; 29 30 rpc OnSocketClosed(stream SocketClosedRequest) returns (EmptySuccess) {}; 31 32 rpc OnReceivedBytes(stream ReceivedBytesRequest) returns (EmptySuccess) {}; 33 34 // -- pub/sub layer 35 36 rpc OnTimerTimeout(stream TimerTimeoutRequest) returns (EmptySuccess) {}; 37 38 rpc OnReceivedMessages(stream ReceivedMessagesRequest) returns (EmptySuccess) {}; 39}