From ba68446f827d68ec0fe12857399d4e369289666b Mon Sep 17 00:00:00 2001 From: Nathael Pajani Date: Thu, 11 Apr 2019 20:18:05 +0200 Subject: [PATCH] Update MQTT subscribe example to add keep-alive (ping) packets --- mqtt_sub/README | 42 +++++++++++++++++++++++++++++++++++++ mqtt_sub/main.c | 12 +++++++++++ mqtt_sub/mqtt_comm.c | 50 +++++++++++++++++++++++++++++++++++++------- mqtt_sub/mqtt_comm.h | 1 + 4 files changed, 97 insertions(+), 8 deletions(-) create mode 100644 mqtt_sub/README diff --git a/mqtt_sub/README b/mqtt_sub/README new file mode 100644 index 0000000..ea50f66 --- /dev/null +++ b/mqtt_sub/README @@ -0,0 +1,42 @@ +MQTT protocol example, Subscriber part + +Copyright 2019 Nathael Pajani + + +/* **************************************************************************** + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *************************************************************************** */ + +This example shows the support of the subscriber part ot he MQTT protocol +implementation provided in lib/protocols/mqtt.c + +This implementation has been designed to implement only the MQTT packet +building and decoding, as specified by the MQTT protocol specification, and +leave all the MQTT message flow implementation to the application, as it is not +part of the MQTT protocol specification and is application dependent. + +One can thus implement either a simple "single in-flight packet" mechanism for +very lightweight applications or complex multi-packet mechanisms. + +This example subscribes to the temperature publication topic created by the +publish example. + +As MQTT requires a lossless, ordered, addressable transport protocol, we used a +simple protocol over serial communication, which would allow communication of +multiple "clients" over an RS485 serial link. +This protocol is decoded by the bridge example provided in host/mqtt_bridge, +which simply transfers the MQTT part of received packets on a TCP socket +connected to an MQTT message broker, and encapsulate MQTT packets received on +ths socket in our simple protocol before sending them on the serial link. diff --git a/mqtt_sub/main.c b/mqtt_sub/main.c index 69d600b..d4ff388 100644 --- a/mqtt_sub/main.c +++ b/mqtt_sub/main.c @@ -120,6 +120,12 @@ void data_rx(uint8_t c) } } +volatile int mqtt_send_ping_flag = 0; +void set_flag_mqtt_need_ping(uint32_t tick) +{ + mqtt_send_ping_flag = 1; +} + /***************************************************************************** */ void system_init() { @@ -148,6 +154,8 @@ int main(void) /* Connect to brocker and start MQTT protocol handling */ mqtt_init(UART0, DEBUG_UART); + /* For tests, send MQTT ping packets every 2 seconds */ + add_systick_callback(set_flag_mqtt_need_ping, 2*1000); while (1) { /* Check for received packet on serial line and handle them */ @@ -160,6 +168,10 @@ int main(void) } packet_ok = NULL; } + if (mqtt_send_ping_flag == 1) { + mqtt_send_ping(UART0, DEBUG_UART); + mqtt_send_ping_flag = 0; + } chenillard(250); } return 0; diff --git a/mqtt_sub/mqtt_comm.c b/mqtt_sub/mqtt_comm.c index ecbb3eb..aca4341 100644 --- a/mqtt_sub/mqtt_comm.c +++ b/mqtt_sub/mqtt_comm.c @@ -169,8 +169,8 @@ int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart) struct mqtt_connack_reply_pkt* reply = (struct mqtt_connack_reply_pkt*)(buf + HEADER_SIZE); ret = mqtt_check_connack_reply_pkt(reply); if (ret != 0) { - uprintf(dbg_uart, "Not a connack packet (ret: %d) ... protocol flow error\n", ret); - return -3; + uprintf(dbg_uart, "Not a connack packet (ret: %d) ... protocol flow error unless pub pkt ...\n", ret); + goto chek_for_pub_pkt; } /* Valid packet, check return code */ if (reply->ret_code == MQTT_CONNACK_ACCEPTED) { @@ -195,7 +195,7 @@ int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart) ret = mqtt_check_suback_reply_pkt(reply, header->data_len); if (ret != 0) { uprintf(dbg_uart, "Not a suback packet (ret: %d) ... protocol flow error\n", ret); - return -5; + goto chek_for_pub_pkt; } if (ntohs(reply->acked_pkt_id) != sub_packet_id) { uprintf(dbg_uart, "Not a suback for the right packet (got %d, our is %d) ...\n", @@ -214,8 +214,6 @@ int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart) sub_packet_id++; break; } - case MQTT_WAITING_PUBACK: - case MQTT_WAITING_PUBREC: case MQTT_WAITING_PUBREL: { /* Check for this being a pubrel (release) and send pubcomp (complete) */ struct mqtt_publish_reply_pkt* pub_comp = (struct mqtt_publish_reply_pkt*)(txbuf + HEADER_SIZE); @@ -224,7 +222,7 @@ int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart) int len = 0; if (ret < 0) { uprintf(dbg_uart, "MQTT packet is not a valid pubrel.\n"); - return -7; + goto chek_for_pub_pkt; } uprintf(dbg_uart, "MQTT pubrel received, sending pubcomp\n"); len = mqtt_pack_publish_reply_pkt((uint8_t*)pub_comp, ntohs(pub_rel->acked_pkt_id), MQTT_CONTROL_PUBCOMP); @@ -242,10 +240,24 @@ int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart) mqtt_comm_state = MQTT_IDLE; break; } + case MQTT_WAITING_PINGRESP: { + struct mqtt_ping_pkt* ppkt = (struct mqtt_ping_pkt*)(buf + HEADER_SIZE); + int ret = 0; + ret = mqtt_check_ping_reply_pkt(ppkt); + if (ret < 0) { + uprintf(dbg_uart, "MQTT packet is not a valid ping reply packet: %d.\n", ret); + goto chek_for_pub_pkt; + } + mqtt_comm_state = MQTT_IDLE; + break; + } + case MQTT_WAITING_PUBREC: case MQTT_WAITING_PUBCOMP: case MQTT_WAITING_UNSUBACK: - case MQTT_WAITING_PINGRESP: - case MQTT_IDLE: { + case MQTT_WAITING_PUBACK: + uprintf(dbg_uart, "Received something, but current state (%d) is invlaid\n", mqtt_comm_state); + case MQTT_IDLE: + chek_for_pub_pkt: { struct mqtt_publish_pkt pub_rx; struct mqtt_publish_reply_pkt* pub_ack = (struct mqtt_publish_reply_pkt*)(txbuf + HEADER_SIZE); int ret = 0, len = 0; @@ -288,3 +300,25 @@ int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart) return 0; } +int mqtt_send_ping(int comm_uart, int dbg_uart) +{ + int len = 0, ret = 0; + memset(txbuf, 0, MQTT_BUFF_SIZE); + len = mqtt_pack_ping_pkt(txbuf + HEADER_SIZE); + if (len <= 0) { + uprintf(dbg_uart, "MQTT pack ping pkt error : %d\n", len); + return len; + } + /* Add our packet header */ + add_packet_header(len); + + /* Send ping MQTT packet */ + ret = serial_write(comm_uart, (char*)txbuf, (len + HEADER_SIZE)); + if (ret < 0) { + uprintf(dbg_uart, "MQTT send error: %d\n", ret); + return ret; + } + mqtt_comm_state = MQTT_WAITING_PINGRESP; + return 0; +} + diff --git a/mqtt_sub/mqtt_comm.h b/mqtt_sub/mqtt_comm.h index fbcf49c..a9368af 100644 --- a/mqtt_sub/mqtt_comm.h +++ b/mqtt_sub/mqtt_comm.h @@ -70,5 +70,6 @@ int mqtt_temp_publish(int comm_uart, int dbg_uart, int temp); int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart); +int mqtt_send_ping(int comm_uart, int dbg_uart); #endif /* MQTT_COMM_H */ -- 2.43.0