Update MQTT subscribe example to add keep-alive (ping) packets
authorNathael Pajani <nathael.pajani@ed3l.fr>
Thu, 11 Apr 2019 18:18:05 +0000 (20:18 +0200)
committerNathael Pajani <nathael.pajani@ed3l.fr>
Tue, 8 Nov 2022 16:14:23 +0000 (17:14 +0100)
mqtt_sub/README [new file with mode: 0644]
mqtt_sub/main.c
mqtt_sub/mqtt_comm.c
mqtt_sub/mqtt_comm.h

diff --git a/mqtt_sub/README b/mqtt_sub/README
new file mode 100644 (file)
index 0000000..ea50f66
--- /dev/null
@@ -0,0 +1,42 @@
+MQTT protocol example, Subscriber part
+
+Copyright 2019 Nathael Pajani <nathael.pajani@ed3l.fr>
+
+
+/* ****************************************************************************
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ *************************************************************************** */
+
+This example shows the support of the subscriber part ot he MQTT protocol
+implementation provided in lib/protocols/mqtt.c
+
+This implementation has been designed to implement only the MQTT packet
+building and decoding, as specified by the MQTT protocol specification, and
+leave all the MQTT message flow implementation to the application, as it is not
+part of the MQTT protocol specification and is application dependent.
+
+One can thus implement either a simple "single in-flight packet" mechanism for
+very lightweight applications or complex multi-packet mechanisms.
+
+This example subscribes to the temperature publication topic created by the
+publish example.
+
+As MQTT requires a lossless, ordered, addressable transport protocol, we used a
+simple protocol over serial communication, which would allow communication of
+multiple "clients" over an RS485 serial link.
+This protocol is decoded by the bridge example provided in host/mqtt_bridge,
+which simply transfers the MQTT part of received packets on a TCP socket
+connected to an MQTT message broker, and encapsulate MQTT packets received on
+ths socket in our simple protocol before sending them on the serial link.
index 69d600b..d4ff388 100644 (file)
@@ -120,6 +120,12 @@ void data_rx(uint8_t c)
        }
 }
 
+volatile int mqtt_send_ping_flag = 0;
+void set_flag_mqtt_need_ping(uint32_t tick)
+{
+       mqtt_send_ping_flag = 1;
+}
+
 /***************************************************************************** */
 void system_init()
 {
@@ -148,6 +154,8 @@ int main(void)
 
        /* Connect to brocker and start MQTT protocol handling */
        mqtt_init(UART0, DEBUG_UART);
+       /* For tests, send MQTT ping packets every 2 seconds */
+       add_systick_callback(set_flag_mqtt_need_ping, 2*1000);
 
        while (1) {
                /* Check for received packet on serial line and handle them */
@@ -160,6 +168,10 @@ int main(void)
                        }
                        packet_ok = NULL;
                }
+               if (mqtt_send_ping_flag == 1) {
+                       mqtt_send_ping(UART0, DEBUG_UART);
+                       mqtt_send_ping_flag = 0;
+               }
                chenillard(250);
        }
        return 0;
index ecbb3eb..aca4341 100644 (file)
@@ -169,8 +169,8 @@ int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart)
                        struct mqtt_connack_reply_pkt* reply = (struct mqtt_connack_reply_pkt*)(buf + HEADER_SIZE);
                        ret = mqtt_check_connack_reply_pkt(reply);
                        if (ret != 0) {
-                               uprintf(dbg_uart, "Not a connack packet (ret: %d) ... protocol flow error\n", ret);
-                               return -3;
+                               uprintf(dbg_uart, "Not a connack packet (ret: %d) ... protocol flow error unless pub pkt ...\n", ret);
+                               goto chek_for_pub_pkt;
                        }
                        /* Valid packet, check return code */
                        if (reply->ret_code == MQTT_CONNACK_ACCEPTED) {
@@ -195,7 +195,7 @@ int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart)
                        ret = mqtt_check_suback_reply_pkt(reply, header->data_len);
                        if (ret != 0) {
                                uprintf(dbg_uart, "Not a suback packet (ret: %d) ... protocol flow error\n", ret);
-                               return -5;
+                               goto chek_for_pub_pkt;
                        }
                        if (ntohs(reply->acked_pkt_id) != sub_packet_id) {
                                uprintf(dbg_uart, "Not a suback for the right packet (got %d, our is %d) ...\n",
@@ -214,8 +214,6 @@ int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart)
                        sub_packet_id++;
                        break;
                }
-               case MQTT_WAITING_PUBACK:
-               case MQTT_WAITING_PUBREC:
                case MQTT_WAITING_PUBREL: {
                        /* Check for this being a pubrel (release) and send pubcomp (complete) */
                        struct mqtt_publish_reply_pkt* pub_comp = (struct mqtt_publish_reply_pkt*)(txbuf + HEADER_SIZE);
@@ -224,7 +222,7 @@ int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart)
                        int len = 0;
                        if (ret < 0) {
                                uprintf(dbg_uart, "MQTT packet is not a valid pubrel.\n");
-                               return -7;
+                               goto chek_for_pub_pkt;
                        }
                        uprintf(dbg_uart, "MQTT pubrel received, sending pubcomp\n");
                        len = mqtt_pack_publish_reply_pkt((uint8_t*)pub_comp, ntohs(pub_rel->acked_pkt_id), MQTT_CONTROL_PUBCOMP);
@@ -242,10 +240,24 @@ int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart)
                        mqtt_comm_state = MQTT_IDLE;
                        break;
                }
+               case MQTT_WAITING_PINGRESP: {
+                       struct mqtt_ping_pkt* ppkt = (struct mqtt_ping_pkt*)(buf + HEADER_SIZE);
+                       int ret = 0;
+                       ret = mqtt_check_ping_reply_pkt(ppkt);
+                       if (ret < 0) {
+                               uprintf(dbg_uart, "MQTT packet is not a valid ping reply packet: %d.\n", ret);
+                               goto chek_for_pub_pkt;
+                       }
+                       mqtt_comm_state = MQTT_IDLE;
+                       break;
+               }
+               case MQTT_WAITING_PUBREC:
                case MQTT_WAITING_PUBCOMP:
                case MQTT_WAITING_UNSUBACK:
-               case MQTT_WAITING_PINGRESP:
-               case MQTT_IDLE: {
+               case MQTT_WAITING_PUBACK:
+                       uprintf(dbg_uart, "Received something, but current state (%d) is invlaid\n", mqtt_comm_state);
+               case MQTT_IDLE:
+               chek_for_pub_pkt: {
                        struct mqtt_publish_pkt pub_rx;
                        struct mqtt_publish_reply_pkt* pub_ack = (struct mqtt_publish_reply_pkt*)(txbuf + HEADER_SIZE);
                        int ret = 0, len = 0;
@@ -288,3 +300,25 @@ int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart)
        return 0;
 }
 
+int mqtt_send_ping(int comm_uart, int dbg_uart)
+{
+       int len = 0, ret = 0;
+       memset(txbuf, 0, MQTT_BUFF_SIZE);
+       len = mqtt_pack_ping_pkt(txbuf + HEADER_SIZE);
+       if (len <= 0) {
+               uprintf(dbg_uart, "MQTT pack ping pkt error : %d\n", len);
+               return len;
+       }
+       /* Add our packet header */
+       add_packet_header(len);
+
+       /* Send ping MQTT packet */
+       ret = serial_write(comm_uart, (char*)txbuf, (len + HEADER_SIZE));
+       if (ret < 0) {
+               uprintf(dbg_uart, "MQTT send error: %d\n", ret);
+               return ret;
+       }
+       mqtt_comm_state = MQTT_WAITING_PINGRESP;
+       return 0;
+}
+
index fbcf49c..a9368af 100644 (file)
@@ -70,5 +70,6 @@ int mqtt_temp_publish(int comm_uart, int dbg_uart, int temp);
 
 int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart);
 
+int mqtt_send_ping(int comm_uart, int dbg_uart);
 #endif  /* MQTT_COMM_H */