From d20f7a7f4849a82f3622bed32f8b641e0246f3c9 Mon Sep 17 00:00:00 2001 From: Nathael Pajani Date: Sun, 10 Mar 2019 14:23:35 +0100 Subject: [PATCH] Split mqtt publish example in two files. --- mqtt_pub/main.c | 201 +---------------------------------------- mqtt_pub/mqtt_comm.c | 208 +++++++++++++++++++++++++++++++++++++++++++ mqtt_pub/mqtt_comm.h | 74 +++++++++++++++ 3 files changed, 283 insertions(+), 200 deletions(-) create mode 100644 mqtt_pub/mqtt_comm.c create mode 100644 mqtt_pub/mqtt_comm.h diff --git a/mqtt_pub/main.c b/mqtt_pub/main.c index d96ba07..2d2c8ba 100644 --- a/mqtt_pub/main.c +++ b/mqtt_pub/main.c @@ -38,6 +38,7 @@ #include "extdrv/status_led.h" #include "extdrv/tmp101_temp_sensor.h" +#include "mqtt_comm.h" #define MODULE_VERSION 0x04 #define MODULE_NAME "GPIO Demo Module" @@ -104,206 +105,6 @@ int temp_read(int uart_num) return deci_degrees; } -/***************************************************************************** */ -/* MQTT protocol */ - -/* Our MQTT buffer. MQTT packets may be up to 2^28 bytes, but we will keep them - * very small for our needs */ -#define MQTT_BUFF_SIZE 62 -#define FIRST_PACKET_CHAR '#' -uint8_t txbuf[MQTT_BUFF_SIZE]; -uint16_t app_board_addr = APP_BOARD_ADDRESS; - -/* This is our static connect data */ -const struct mqtt_connect_pkt mqtt_conn = { - .keep_alive_seconds = 600, - .clean_session_flag = MQTT_SESSION_RESUME, - .client_id = "test", - .will_topic = "will/temp/test", - .will_msg_size = 4, - .will_msg = (uint8_t*)"Bye", - .will_QoS = 0, - .will_retain = 1, - .username = "test", - .password = "pass", -}; - -#define HEADER_SIZE sizeof(struct app_header) -struct app_header { - uint8_t start; /* '#' */ - uint8_t type; /* 'M' for MQTT messages */ - uint16_t addr; - uint8_t seqnum; - uint8_t header_cksum; - uint8_t data_len; - uint8_t data_cksum; -}; - -/* Possible states for MQTT communication flow */ -enum mqtt_client_states { - MQTT_UNCONNECTED, - MQTT_IDLE, - MQTT_WAITING_CONNECT_ACK, - MQTT_WAITING_PUBACK, - MQTT_WAITING_PUBREC, - MQTT_WAITING_PUBREL, - MQTT_WAITING_PUBCOMP, - MQTT_WAITING_SUBACK, - MQTT_WAITING_UNSUBACK, - MQTT_WAITING_PINGRESP, -}; -uint8_t mqtt_comm_state = MQTT_UNCONNECTED; - - -void add_packet_header(int pkt_len) -{ - struct app_header* header = (struct app_header*)txbuf; - uint8_t i = 0, cksum = 0; - - /* Compute data checksum */ - for (i = HEADER_SIZE; i < (HEADER_SIZE + pkt_len); i++) { - cksum += txbuf[i]; - } - - /* Add the header */ - header->start = FIRST_PACKET_CHAR; - header->type = 'M'; - header->addr = htons(app_board_addr); - header->seqnum = 0; - header->data_len = pkt_len; - header->data_cksum = cksum; - /* Compute header chksum */ - cksum = 0; - for (i = 0; i < HEADER_SIZE; i++) { - cksum += txbuf[i]; - } - header->header_cksum = (uint8_t)(256 - cksum); -} - -int mqtt_init(int comm_uart, int dbg_uart) -{ - int len = 0, ret = 0; - - memset(txbuf, 0, MQTT_BUFF_SIZE); - /* Create the MQTT connect packet */ - len = mqtt_pack_connect_packet(&mqtt_conn, (txbuf + HEADER_SIZE), (MQTT_BUFF_SIZE - HEADER_SIZE)); - if (len <= 0) { - uprintf(dbg_uart, "MQTT connect pkt pack error : %d\n", len); - return -E2BIG; - } - /* Add our packet header */ - add_packet_header(len); - - /* Send connect MQTT packet */ - ret = serial_write(comm_uart, (char*)txbuf, (len + HEADER_SIZE)); - if (ret < 0) { - uprintf(dbg_uart, "MQTT send error: %d\n", ret); - return ret; - } - mqtt_comm_state = MQTT_WAITING_CONNECT_ACK; - return 0; -} - - -struct mqtt_publish_pkt pub_pkt = { - .topic = "temp/test", - .QoS = 1, - .retain_flag = 1, - .message_size = sizeof(int), -}; -static uint16_t pub_packet_id = 1; - -int mqtt_temp_publish(int comm_uart, int dbg_uart, int temp) -{ - int len = 0, ret = 0; - - memset(txbuf, 0, MQTT_BUFF_SIZE); - /* Create the MQTT publish packet */ - pub_pkt.packet_id = pub_packet_id; - pub_pkt.application_message = (uint8_t*)&temp; - len = mqtt_pack_publish_packet(&pub_pkt, (txbuf + HEADER_SIZE), (MQTT_BUFF_SIZE - HEADER_SIZE)); - if (len <= 0) { - uprintf(dbg_uart, "MQTT publish pkt pack error : %d\n", len); - return -E2BIG; - } - /* Add our packet header */ - add_packet_header(len); - - /* Send publish MQTT packet */ - ret = serial_write(comm_uart, (char*)txbuf, (len + HEADER_SIZE)); - if (ret < 0) { - uprintf(dbg_uart, "MQTT send error: %d\n", ret); - return ret; - } - mqtt_comm_state = MQTT_WAITING_PUBACK; - return 0; -} - - -int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart) -{ - struct app_header* header = (struct app_header*)buf; - int ret = 0; - - /* Decode address */ - header->addr = ntohs(header->addr); - if ((!buf) || (header->addr != APP_BOARD_ADDRESS)) { - /* This one is not for us, drop it */ - uprintf(dbg_uart, "Dropping packet, not for us\n"); - return -1; - } - if (header->type != 'M') { - uprintf(dbg_uart, "Dropping packet, not MQTT\n"); - return -2; - } - /* This is an MQTT packet for us ... handle it */ - switch (mqtt_comm_state) { - case MQTT_WAITING_CONNECT_ACK: { - /* We must not receive anything before the connect acknowledge */ - struct mqtt_connack_response_pkt* response = (struct mqtt_connack_response_pkt*)(buf + HEADER_SIZE); - ret = mqtt_check_connack_response_pkt(response); - if (ret != 0) { - uprintf(dbg_uart, "Not a connack packet (ret: %d) ... protocol flow error\n", ret); - return -3; - } - /* Valid packet, check return code */ - if (response->ret_code == MQTT_CONNACK_ACCEPTED) { - uprintf(dbg_uart, "Connection accepted\n"); - mqtt_comm_state = MQTT_IDLE; - } else { - uprintf(dbg_uart, "Connection refused: %d\n", response->ret_code); - mqtt_comm_state = MQTT_UNCONNECTED; - return -4; - } - break; - } - case MQTT_WAITING_PUBACK: { - struct mqtt_publish_response_pkt* response = (struct mqtt_publish_response_pkt*)(buf + HEADER_SIZE); - ret = mqtt_check_publish_response_pkt(response, MQTT_WAITING_PUBACK); - if (ret != 0) { - uprintf(dbg_uart, "Not a connack packet (ret: %d) ... protocol flow error\n", ret); - return -5; - } - if (response->acked_pkt_id != pub_packet_id) { - uprintf(dbg_uart, "Not a connack for the right packet (got %d, our is %d) ...\n", - response->acked_pkt_id, pub_packet_id); - return -6; - } - mqtt_comm_state = MQTT_IDLE; - pub_packet_id++; - break; - } - case MQTT_WAITING_PUBREC: - case MQTT_WAITING_PUBREL: - case MQTT_WAITING_PUBCOMP: - case MQTT_WAITING_SUBACK: - case MQTT_WAITING_UNSUBACK: - case MQTT_WAITING_PINGRESP: - break; - } - return 0; -} - uint8_t rx_packets[2][MQTT_BUFF_SIZE]; volatile uint8_t* rxbuf = rx_packets[0]; diff --git a/mqtt_pub/mqtt_comm.c b/mqtt_pub/mqtt_comm.c new file mode 100644 index 0000000..0a7be41 --- /dev/null +++ b/mqtt_pub/mqtt_comm.c @@ -0,0 +1,208 @@ +/**************************************************************************** + * apps/base/mqtt_pub/mqtt_comm.c + * + * MQTT client example using data from onboard TMP101 I2C temperature sensor + * + * Copyright 2013-2014 Nathael Pajani + * + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *************************************************************************** */ + + +#include "lib/stdio.h" +#include "lib/errno.h" +#include "lib/utils.h" +#include "lib/protocols/mqtt.h" + +#include "drivers/serial.h" + +#include "mqtt_comm.h" + + +/***************************************************************************** */ +/* MQTT protocol */ + +/* Our MQTT buffer. MQTT packets may be up to 2^28 bytes, but we will keep them + * very small for our needs */ +uint8_t txbuf[MQTT_BUFF_SIZE]; +uint16_t app_board_addr = APP_BOARD_ADDRESS; + +/* This is our static connect data */ +const struct mqtt_connect_pkt mqtt_conn = { + .keep_alive_seconds = 600, + .clean_session_flag = MQTT_SESSION_RESUME, + .client_id = "test", + .will_topic = "will/temp/test", + .will_msg_size = 4, + .will_msg = (uint8_t*)"Bye", + .will_QoS = 0, + .will_retain = 1, + .username = "test", + .password = "pass", +}; + +uint8_t mqtt_comm_state = MQTT_UNCONNECTED; + + +void add_packet_header(int pkt_len) +{ + struct app_header* header = (struct app_header*)txbuf; + uint8_t i = 0, cksum = 0; + + /* Compute data checksum */ + for (i = HEADER_SIZE; i < (HEADER_SIZE + pkt_len); i++) { + cksum += txbuf[i]; + } + + /* Add the header */ + header->start = FIRST_PACKET_CHAR; + header->type = 'M'; + header->addr = htons(app_board_addr); + header->seqnum = 0; + header->data_len = pkt_len; + header->data_cksum = cksum; + /* Compute header chksum */ + cksum = 0; + for (i = 0; i < HEADER_SIZE; i++) { + cksum += txbuf[i]; + } + header->header_cksum = (uint8_t)(256 - cksum); +} + +int mqtt_init(int comm_uart, int dbg_uart) +{ + int len = 0, ret = 0; + + memset(txbuf, 0, MQTT_BUFF_SIZE); + /* Create the MQTT connect packet */ + len = mqtt_pack_connect_packet(&mqtt_conn, (txbuf + HEADER_SIZE), (MQTT_BUFF_SIZE - HEADER_SIZE)); + if (len <= 0) { + uprintf(dbg_uart, "MQTT connect pkt pack error : %d\n", len); + return -E2BIG; + } + /* Add our packet header */ + add_packet_header(len); + + /* Send connect MQTT packet */ + ret = serial_write(comm_uart, (char*)txbuf, (len + HEADER_SIZE)); + if (ret < 0) { + uprintf(dbg_uart, "MQTT send error: %d\n", ret); + return ret; + } + mqtt_comm_state = MQTT_WAITING_CONNECT_ACK; + return 0; +} + + +struct mqtt_publish_pkt pub_pkt = { + .topic = "temp/test", + .QoS = 1, + .retain_flag = 1, + .message_size = sizeof(int), +}; +static uint16_t pub_packet_id = 1; + +int mqtt_temp_publish(int comm_uart, int dbg_uart, int temp) +{ + int len = 0, ret = 0; + + memset(txbuf, 0, MQTT_BUFF_SIZE); + /* Create the MQTT publish packet */ + pub_pkt.packet_id = pub_packet_id; + pub_pkt.application_message = (uint8_t*)&temp; + len = mqtt_pack_publish_packet(&pub_pkt, (txbuf + HEADER_SIZE), (MQTT_BUFF_SIZE - HEADER_SIZE)); + if (len <= 0) { + uprintf(dbg_uart, "MQTT publish pkt pack error : %d\n", len); + return -E2BIG; + } + /* Add our packet header */ + add_packet_header(len); + + /* Send publish MQTT packet */ + ret = serial_write(comm_uart, (char*)txbuf, (len + HEADER_SIZE)); + if (ret < 0) { + uprintf(dbg_uart, "MQTT send error: %d\n", ret); + return ret; + } + mqtt_comm_state = MQTT_WAITING_PUBACK; + return 0; +} + + +int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart) +{ + struct app_header* header = (struct app_header*)buf; + int ret = 0; + + /* Decode address */ + header->addr = ntohs(header->addr); + if ((!buf) || (header->addr != APP_BOARD_ADDRESS)) { + /* This one is not for us, drop it */ + uprintf(dbg_uart, "Dropping packet, not for us\n"); + return -1; + } + if (header->type != 'M') { + uprintf(dbg_uart, "Dropping packet, not MQTT\n"); + return -2; + } + /* This is an MQTT packet for us ... handle it */ + switch (mqtt_comm_state) { + case MQTT_WAITING_CONNECT_ACK: { + /* We must not receive anything before the connect acknowledge */ + struct mqtt_connack_response_pkt* response = (struct mqtt_connack_response_pkt*)(buf + HEADER_SIZE); + ret = mqtt_check_connack_response_pkt(response); + if (ret != 0) { + uprintf(dbg_uart, "Not a connack packet (ret: %d) ... protocol flow error\n", ret); + return -3; + } + /* Valid packet, check return code */ + if (response->ret_code == MQTT_CONNACK_ACCEPTED) { + uprintf(dbg_uart, "Connection accepted\n"); + mqtt_comm_state = MQTT_IDLE; + } else { + uprintf(dbg_uart, "Connection refused: %d\n", response->ret_code); + mqtt_comm_state = MQTT_UNCONNECTED; + return -4; + } + break; + } + case MQTT_WAITING_PUBACK: { + struct mqtt_publish_response_pkt* response = (struct mqtt_publish_response_pkt*)(buf + HEADER_SIZE); + ret = mqtt_check_publish_response_pkt(response, MQTT_WAITING_PUBACK); + if (ret != 0) { + uprintf(dbg_uart, "Not a connack packet (ret: %d) ... protocol flow error\n", ret); + return -5; + } + if (response->acked_pkt_id != pub_packet_id) { + uprintf(dbg_uart, "Not a connack for the right packet (got %d, our is %d) ...\n", + response->acked_pkt_id, pub_packet_id); + return -6; + } + mqtt_comm_state = MQTT_IDLE; + pub_packet_id++; + break; + } + case MQTT_WAITING_PUBREC: + case MQTT_WAITING_PUBREL: + case MQTT_WAITING_PUBCOMP: + case MQTT_WAITING_SUBACK: + case MQTT_WAITING_UNSUBACK: + case MQTT_WAITING_PINGRESP: + break; + } + return 0; +} + diff --git a/mqtt_pub/mqtt_comm.h b/mqtt_pub/mqtt_comm.h new file mode 100644 index 0000000..fbcf49c --- /dev/null +++ b/mqtt_pub/mqtt_comm.h @@ -0,0 +1,74 @@ +/**************************************************************************** + * apps/base/mqtt_pub/mqtt_comm.h + * + * MQTT client example using data from onboard TMP101 I2C temperature sensor + * + * Copyright 2013-2014 Nathael Pajani + * + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *************************************************************************** */ + +#ifndef MQTT_COMM_H +#define MQTT_COMM_H + +#include "lib/stdio.h" +#include "lib/errno.h" +#include "lib/utils.h" +#include "lib/protocols/mqtt.h" + +#define APP_BOARD_ADDRESS 0x0102 + + +/***************************************************************************** */ +/* MQTT protocol */ + +/* MQTT packets may be up to 2^28 bytes, but we will keep them very small for our needs */ +#define MQTT_BUFF_SIZE 62 +#define FIRST_PACKET_CHAR '#' + +#define HEADER_SIZE sizeof(struct app_header) +struct app_header { + uint8_t start; /* '#' */ + uint8_t type; /* 'M' for MQTT messages */ + uint16_t addr; + uint8_t seqnum; + uint8_t header_cksum; + uint8_t data_len; + uint8_t data_cksum; +}; + +/* Possible states for MQTT communication flow */ +enum mqtt_client_states { + MQTT_UNCONNECTED, + MQTT_IDLE, + MQTT_WAITING_CONNECT_ACK, + MQTT_WAITING_PUBACK, + MQTT_WAITING_PUBREC, + MQTT_WAITING_PUBREL, + MQTT_WAITING_PUBCOMP, + MQTT_WAITING_SUBACK, + MQTT_WAITING_UNSUBACK, + MQTT_WAITING_PINGRESP, +}; + +int mqtt_init(int comm_uart, int dbg_uart); + +int mqtt_temp_publish(int comm_uart, int dbg_uart, int temp); + +int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart); + +#endif /* MQTT_COMM_H */ + -- 2.43.0