--- /dev/null
+MQTT protocol example, Subscriber part
+
+Copyright 2019 Nathael Pajani <nathael.pajani@ed3l.fr>
+
+
+/* ****************************************************************************
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ *************************************************************************** */
+
+This example shows the support of the subscriber part ot he MQTT protocol
+implementation provided in lib/protocols/mqtt.c
+
+This implementation has been designed to implement only the MQTT packet
+building and decoding, as specified by the MQTT protocol specification, and
+leave all the MQTT message flow implementation to the application, as it is not
+part of the MQTT protocol specification and is application dependent.
+
+One can thus implement either a simple "single in-flight packet" mechanism for
+very lightweight applications or complex multi-packet mechanisms.
+
+This example subscribes to the temperature publication topic created by the
+publish example.
+
+As MQTT requires a lossless, ordered, addressable transport protocol, we used a
+simple protocol over serial communication, which would allow communication of
+multiple "clients" over an RS485 serial link.
+This protocol is decoded by the bridge example provided in host/mqtt_bridge,
+which simply transfers the MQTT part of received packets on a TCP socket
+connected to an MQTT message broker, and encapsulate MQTT packets received on
+ths socket in our simple protocol before sending them on the serial link.
}
}
+volatile int mqtt_send_ping_flag = 0;
+void set_flag_mqtt_need_ping(uint32_t tick)
+{
+ mqtt_send_ping_flag = 1;
+}
+
/***************************************************************************** */
void system_init()
{
/* Connect to brocker and start MQTT protocol handling */
mqtt_init(UART0, DEBUG_UART);
+ /* For tests, send MQTT ping packets every 2 seconds */
+ add_systick_callback(set_flag_mqtt_need_ping, 2*1000);
while (1) {
/* Check for received packet on serial line and handle them */
}
packet_ok = NULL;
}
+ if (mqtt_send_ping_flag == 1) {
+ mqtt_send_ping(UART0, DEBUG_UART);
+ mqtt_send_ping_flag = 0;
+ }
chenillard(250);
}
return 0;
struct mqtt_connack_reply_pkt* reply = (struct mqtt_connack_reply_pkt*)(buf + HEADER_SIZE);
ret = mqtt_check_connack_reply_pkt(reply);
if (ret != 0) {
- uprintf(dbg_uart, "Not a connack packet (ret: %d) ... protocol flow error\n", ret);
- return -3;
+ uprintf(dbg_uart, "Not a connack packet (ret: %d) ... protocol flow error unless pub pkt ...\n", ret);
+ goto chek_for_pub_pkt;
}
/* Valid packet, check return code */
if (reply->ret_code == MQTT_CONNACK_ACCEPTED) {
ret = mqtt_check_suback_reply_pkt(reply, header->data_len);
if (ret != 0) {
uprintf(dbg_uart, "Not a suback packet (ret: %d) ... protocol flow error\n", ret);
- return -5;
+ goto chek_for_pub_pkt;
}
if (ntohs(reply->acked_pkt_id) != sub_packet_id) {
uprintf(dbg_uart, "Not a suback for the right packet (got %d, our is %d) ...\n",
sub_packet_id++;
break;
}
- case MQTT_WAITING_PUBACK:
- case MQTT_WAITING_PUBREC:
case MQTT_WAITING_PUBREL: {
/* Check for this being a pubrel (release) and send pubcomp (complete) */
struct mqtt_publish_reply_pkt* pub_comp = (struct mqtt_publish_reply_pkt*)(txbuf + HEADER_SIZE);
int len = 0;
if (ret < 0) {
uprintf(dbg_uart, "MQTT packet is not a valid pubrel.\n");
- return -7;
+ goto chek_for_pub_pkt;
}
uprintf(dbg_uart, "MQTT pubrel received, sending pubcomp\n");
len = mqtt_pack_publish_reply_pkt((uint8_t*)pub_comp, ntohs(pub_rel->acked_pkt_id), MQTT_CONTROL_PUBCOMP);
mqtt_comm_state = MQTT_IDLE;
break;
}
+ case MQTT_WAITING_PINGRESP: {
+ struct mqtt_ping_pkt* ppkt = (struct mqtt_ping_pkt*)(buf + HEADER_SIZE);
+ int ret = 0;
+ ret = mqtt_check_ping_reply_pkt(ppkt);
+ if (ret < 0) {
+ uprintf(dbg_uart, "MQTT packet is not a valid ping reply packet: %d.\n", ret);
+ goto chek_for_pub_pkt;
+ }
+ mqtt_comm_state = MQTT_IDLE;
+ break;
+ }
+ case MQTT_WAITING_PUBREC:
case MQTT_WAITING_PUBCOMP:
case MQTT_WAITING_UNSUBACK:
- case MQTT_WAITING_PINGRESP:
- case MQTT_IDLE: {
+ case MQTT_WAITING_PUBACK:
+ uprintf(dbg_uart, "Received something, but current state (%d) is invlaid\n", mqtt_comm_state);
+ case MQTT_IDLE:
+ chek_for_pub_pkt: {
struct mqtt_publish_pkt pub_rx;
struct mqtt_publish_reply_pkt* pub_ack = (struct mqtt_publish_reply_pkt*)(txbuf + HEADER_SIZE);
int ret = 0, len = 0;
return 0;
}
+int mqtt_send_ping(int comm_uart, int dbg_uart)
+{
+ int len = 0, ret = 0;
+ memset(txbuf, 0, MQTT_BUFF_SIZE);
+ len = mqtt_pack_ping_pkt(txbuf + HEADER_SIZE);
+ if (len <= 0) {
+ uprintf(dbg_uart, "MQTT pack ping pkt error : %d\n", len);
+ return len;
+ }
+ /* Add our packet header */
+ add_packet_header(len);
+
+ /* Send ping MQTT packet */
+ ret = serial_write(comm_uart, (char*)txbuf, (len + HEADER_SIZE));
+ if (ret < 0) {
+ uprintf(dbg_uart, "MQTT send error: %d\n", ret);
+ return ret;
+ }
+ mqtt_comm_state = MQTT_WAITING_PINGRESP;
+ return 0;
+}
+