From 672f7fdffcbfc4a4352627dbb111b9f6ce0aa7f2 Mon Sep 17 00:00:00 2001 From: Nathael Pajani Date: Sat, 16 Mar 2019 17:36:26 +0100 Subject: [PATCH] Add subscriber example. Under test. --- mqtt_sub/Makefile | 21 ++++ mqtt_sub/main.c | 167 ++++++++++++++++++++++++++ mqtt_sub/mqtt_comm.c | 278 +++++++++++++++++++++++++++++++++++++++++++ mqtt_sub/mqtt_comm.h | 74 ++++++++++++ 4 files changed, 540 insertions(+) create mode 100644 mqtt_sub/Makefile create mode 100644 mqtt_sub/main.c create mode 100644 mqtt_sub/mqtt_comm.c create mode 100644 mqtt_sub/mqtt_comm.h diff --git a/mqtt_sub/Makefile b/mqtt_sub/Makefile new file mode 100644 index 0000000..41ae555 --- /dev/null +++ b/mqtt_sub/Makefile @@ -0,0 +1,21 @@ +# Makefile for apps + +MODULE = $(shell basename $(shell cd .. && pwd && cd -)) +NAME = $(shell basename $(CURDIR)) + +# Add this to your ~/.vimrc in order to get proper function of :make in vim : +# let $COMPILE_FROM_IDE = 1 +ifeq ($(strip $(COMPILE_FROM_IDE)),) + PRINT_DIRECTORY = --no-print-directory +else + PRINT_DIRECTORY = + LANG = C +endif + +.PHONY: $(NAME).bin +$(NAME).bin: + @make -C ../../.. ${PRINT_DIRECTORY} NAME=$(NAME) MODULE=$(MODULE) apps/$(MODULE)/$(NAME)/$@ + +clean mrproper: + @make -C ../../.. ${PRINT_DIRECTORY} $@ + diff --git a/mqtt_sub/main.c b/mqtt_sub/main.c new file mode 100644 index 0000000..69d600b --- /dev/null +++ b/mqtt_sub/main.c @@ -0,0 +1,167 @@ +/**************************************************************************** + * apps/base/mqtt_sub/main.c + * + * MQTT client example using data from onboard TMP101 I2C temperature sensor + * + * Copyright 2013-2014 Nathael Pajani + * + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *************************************************************************** */ + + +#include "core/system.h" +#include "core/systick.h" +#include "core/pio.h" + +#include "lib/stdio.h" +#include "lib/errno.h" +#include "lib/utils.h" +#include "lib/protocols/mqtt.h" + +#include "drivers/i2c.h" +#include "drivers/serial.h" +#include "drivers/gpio.h" + +#include "extdrv/status_led.h" +#include "extdrv/tmp101_temp_sensor.h" + +#include "mqtt_comm.h" + +#define MODULE_VERSION 0x04 +#define MODULE_NAME "GPIO Demo Module" + + +#define SELECTED_FREQ FREQ_SEL_48MHz + +/***************************************************************************** */ +/* Pins configuration */ +/* pins blocks are passed to set_pins() for pins configuration. + * Unused pin blocks can be removed safely with the corresponding set_pins() call + * All pins blocks may be safelly merged in a single block for single set_pins() call.. + */ +const struct pio_config common_pins[] = { + /* UART 0 */ + { LPC_UART0_RX_PIO_0_1, LPC_IO_DIGITAL }, + { LPC_UART0_TX_PIO_0_2, LPC_IO_DIGITAL }, + /* UART 1 */ + { LPC_UART1_RX_PIO_0_8, LPC_IO_DIGITAL }, + { LPC_UART1_TX_PIO_0_9, LPC_IO_DIGITAL }, + /* I2C 0 */ + { LPC_I2C0_SCL_PIO_0_10, (LPC_IO_DIGITAL | LPC_IO_OPEN_DRAIN_ENABLE) }, + { LPC_I2C0_SDA_PIO_0_11, (LPC_IO_DIGITAL | LPC_IO_OPEN_DRAIN_ENABLE) }, + ARRAY_LAST_PIO, +}; + +const struct pio status_led_green = LPC_GPIO_1_4; +const struct pio status_led_red = LPC_GPIO_1_5; + + +uint8_t rx_packets[2][MQTT_BUFF_SIZE]; +volatile uint8_t* rxbuf = rx_packets[0]; +volatile uint8_t* packet_ok = NULL; +void data_rx(uint8_t c) +{ + static uint8_t idx = 0; + static uint8_t sum = 0; + /* Not receiving a packet yet, wait for FIRST_PACKET_CHAR, drop */ + if ((idx == 0) && (c != FIRST_PACKET_CHAR)) { + return; + } + rxbuf[idx++] = c; + sum += c; + /* If header received, make sure checksum is OK */ + if (idx == HEADER_SIZE) { + uint8_t start = 1; + if (sum == 0) { + /* Header received and checksum OK */ + return; + } + /* Shift buffer content if buffer holds a FIRST_PACKET_CHAR */ + while ((rxbuf[start] != FIRST_PACKET_CHAR) && (start <= HEADER_SIZE)) { + start++; + } + idx = 0; /* Restart at 0 (and if no FIRST_PACKET_CHAR found, this drops everything */ + sum = 0; + /* FIRST_PACKET_CHAR found ? */ + while (start < HEADER_SIZE) { + rxbuf[idx] = rxbuf[start++]; + sum += rxbuf[idx]; + idx++; + } + } else if (idx > HEADER_SIZE) { + struct app_header* header = (struct app_header*)rxbuf; + /* Header received, check for data part */ + if (idx == (HEADER_SIZE + header->data_len)) { + /* Full packet received, move to next packet */ + if (sum == header->data_cksum) { + packet_ok = rxbuf; + if (rxbuf == rx_packets[0]) { + rxbuf = rx_packets[1]; + } else { + rxbuf = rx_packets[0]; + } + } + idx = 0; + sum = 0; + } + } +} + +/***************************************************************************** */ +void system_init() +{ + /* Stop the watchdog */ + startup_watchdog_disable(); /* Do it right now, before it gets a chance to break in */ + system_brown_out_detection_config(0); /* No ADC used */ + system_set_default_power_state(); + clock_config(SELECTED_FREQ); + set_pins(common_pins); + gpio_on(); + status_led_config(&status_led_green, &status_led_red); + /* System tick timer MUST be configured and running in order to use the sleeping + * functions */ + systick_timer_on(1); /* 1ms */ + systick_start(); +} + + +/***************************************************************************** */ +#define DEBUG_UART UART0 +int main(void) +{ + system_init(); + uart_on(UART0, 115200, data_rx); + uart_on(UART1, 115200, NULL); + + /* Connect to brocker and start MQTT protocol handling */ + mqtt_init(UART0, DEBUG_UART); + + while (1) { + /* Check for received packet on serial line and handle them */ + if (packet_ok != NULL) { + int ret = mqtt_handle_packet((char*)packet_ok, UART0, DEBUG_UART); + if (ret != 0) { + uprintf(DEBUG_UART, "Packet handling returned error %d\n", ret); + } else { + uprintf(DEBUG_UART, "Packet handled\n"); + } + packet_ok = NULL; + } + chenillard(250); + } + return 0; +} + diff --git a/mqtt_sub/mqtt_comm.c b/mqtt_sub/mqtt_comm.c new file mode 100644 index 0000000..bc2fbcc --- /dev/null +++ b/mqtt_sub/mqtt_comm.c @@ -0,0 +1,278 @@ +/**************************************************************************** + * apps/base/mqtt_pub/mqtt_comm.c + * + * MQTT client example using data from onboard TMP101 I2C temperature sensor + * + * Copyright 2013-2014 Nathael Pajani + * + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *************************************************************************** */ + + +#include "lib/stdio.h" +#include "lib/errno.h" +#include "lib/utils.h" +#include "lib/protocols/mqtt.h" + +#include "drivers/serial.h" + +#include "mqtt_comm.h" + + +/***************************************************************************** */ +/* MQTT protocol */ + +/* Our MQTT buffer. MQTT packets may be up to 2^28 bytes, but we will keep them + * very small for our needs */ +uint8_t txbuf[MQTT_BUFF_SIZE]; +uint16_t app_board_addr = APP_BOARD_ADDRESS; + +/* This is our static connect data */ +const struct mqtt_connect_pkt mqtt_conn = { + .keep_alive_seconds = 600, + .clean_session_flag = MQTT_SESSION_NEW, /* or MQTT_SESSION_RESUME */ + .client_id = "tsub", + .will_topic = { .name = "will/tsub", .QoS = 0 }, + .will_retain = 1, + .will_msg_size = 4, + .will_msg = (uint8_t*)"Bye", +}; + +uint8_t mqtt_comm_state = MQTT_UNCONNECTED; + + +void add_packet_header(int pkt_len) +{ + struct app_header* header = (struct app_header*)txbuf; + uint8_t i = 0, cksum = 0; + + /* Compute data checksum */ + for (i = HEADER_SIZE; i < (HEADER_SIZE + pkt_len); i++) { + cksum += txbuf[i]; + } + + /* Add the header */ + header->start = FIRST_PACKET_CHAR; + header->type = 'M'; + header->addr = htons(app_board_addr); + header->seqnum = 0; + header->data_len = pkt_len; + header->data_cksum = cksum; + header->header_cksum = 0; /* Erase header checksum from previous packet */ + /* Compute header chksum */ + cksum = 0; + for (i = 0; i < HEADER_SIZE; i++) { + cksum += txbuf[i]; + } + header->header_cksum = (uint8_t)(256 - cksum); +} + +int mqtt_init(int comm_uart, int dbg_uart) +{ + int len = 0, ret = 0; + + memset(txbuf, 0, MQTT_BUFF_SIZE); + /* Create the MQTT connect packet */ + len = mqtt_pack_connect_packet(&mqtt_conn, (txbuf + HEADER_SIZE), (MQTT_BUFF_SIZE - HEADER_SIZE)); + if (len <= 0) { + uprintf(dbg_uart, "MQTT connect pkt pack error : %d\n", len); + return -E2BIG; + } + /* Add our packet header */ + add_packet_header(len); + + /* Send connect MQTT packet */ + ret = serial_write(comm_uart, (char*)txbuf, (len + HEADER_SIZE)); + if (ret < 0) { + uprintf(dbg_uart, "MQTT send error: %d\n", ret); + return ret; + } + mqtt_comm_state = MQTT_WAITING_CONNECT_ACK; + return 0; +} + + +struct mqtt_topic topics[] = { + { .name = "temp/test", .QoS = 2, }, + { .name = "will/temp", .QoS = 1, }, +#ifdef TEST + { .name = "#/bla", .QoS = 0, }, /* This one will fail, for test purpose */ +#else + { .name = "+/bla", .QoS = 0, }, +#endif +}; +struct mqtt_sub_pkt sub_pkt = { + .nb_topics = 3, + .topics = topics, +}; +static uint16_t sub_packet_id = 1; + +int mqtt_temp_subscribe(int comm_uart, int dbg_uart) +{ + int len = 0, ret = 0; + + memset(txbuf, 0, MQTT_BUFF_SIZE); + /* Create the MQTT publish packet */ + sub_pkt.packet_id = sub_packet_id; + len = mqtt_pack_subscribe_pkt(&sub_pkt, (txbuf + HEADER_SIZE), (MQTT_BUFF_SIZE - HEADER_SIZE)); + if (len <= 0) { + uprintf(dbg_uart, "MQTT subscribe pkt pack error : %d\n", len); + return len; + } + /* Add our packet header */ + add_packet_header(len); + + /* Send the subscribe MQTT packet */ + ret = serial_write(comm_uart, (char*)txbuf, (len + HEADER_SIZE)); + if (ret < 0) { + uprintf(dbg_uart, "MQTT send error: %d\n", ret); + return ret; + } + mqtt_comm_state = MQTT_WAITING_SUBACK; + return 0; +} + + +int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart) +{ + struct app_header* header = (struct app_header*)buf; + int ret = 0; + + /* Decode address */ + header->addr = ntohs(header->addr); + if ((!buf) || (header->addr != APP_BOARD_ADDRESS)) { + /* This one is not for us, drop it */ + uprintf(dbg_uart, "Dropping packet, not for us\n"); + return -1; + } + if (header->type != 'M') { + uprintf(dbg_uart, "Dropping packet, not MQTT\n"); + return -2; + } + /* This is an MQTT packet for us ... handle it */ + switch (mqtt_comm_state) { + case MQTT_WAITING_CONNECT_ACK: { + /* We must not receive anything before the connect acknowledge */ + struct mqtt_connack_reply_pkt* reply = (struct mqtt_connack_reply_pkt*)(buf + HEADER_SIZE); + ret = mqtt_check_connack_reply_pkt(reply); + if (ret != 0) { + uprintf(dbg_uart, "Not a connack packet (ret: %d) ... protocol flow error\n", ret); + return -3; + } + /* Valid packet, check return code */ + if (reply->ret_code == MQTT_CONNACK_ACCEPTED) { + uprintf(dbg_uart, "Connection accepted, sending subscriptions\n"); + mqtt_comm_state = MQTT_IDLE; + ret = mqtt_temp_subscribe(comm_uart, dbg_uart); + if (ret != 0) { + uprintf(dbg_uart, "Unable to send our subscriptions: %d\n", ret); + return ret; + } + } else { + uprintf(dbg_uart, "Connection refused: %d\n", reply->ret_code); + mqtt_comm_state = MQTT_UNCONNECTED; + return -4; + } + break; + } + case MQTT_WAITING_SUBACK: { + struct mqtt_sub_reply_pkt* reply = (struct mqtt_sub_reply_pkt*)(buf + HEADER_SIZE); + uint8_t* acks = (uint8_t*)(buf + HEADER_SIZE + sizeof(struct mqtt_sub_reply_pkt)); + int i = 0; + ret = mqtt_check_suback_reply_pkt(reply, header->data_len); + if (ret != 0) { + uprintf(dbg_uart, "Not a suback packet (ret: %d) ... protocol flow error\n", ret); + return -5; + } + if (ntohs(reply->acked_pkt_id) != sub_packet_id) { + uprintf(dbg_uart, "Not a suback for the right packet (got %d, our is %d) ...\n", + ntohs(reply->acked_pkt_id), sub_packet_id); + return -6; + } + /* Check that all subscriptions got accepted */ + for (i = 0; i < sub_pkt.nb_topics; i++) { + if (acks[i] > MQTT_QoS_2) { + /* Display info on debug UART ... not much to be done about it */ + uprintf(dbg_uart, "Subscription n°%d (%s) got refused\n", i, sub_pkt.topics[i]); + } + } + uprintf(dbg_uart, "Subscriptions accepted\n"); + mqtt_comm_state = MQTT_IDLE; + sub_packet_id++; + break; + } + case MQTT_WAITING_PUBACK: + case MQTT_WAITING_PUBREC: + case MQTT_WAITING_PUBREL: { + /* Check for this being a pubrel (release) and send pubcomp (complete */ + struct mqtt_publish_reply_pkt* ack = (struct mqtt_publish_reply_pkt*)(buf + HEADER_SIZE); + int ret = mqtt_check_publish_reply_pkt(ack, MQTT_CONTROL_PUBREL); + int len = 0; + if (ret < 0) { + uprintf(dbg_uart, "MQTT packet is not a valid pubrel.\n"); + return -7; + } + len = mqtt_pack_publish_reply_pkt((uint8_t*)ack, htons(ack->acked_pkt_id), MQTT_CONTROL_PUBCOMP); + ret = serial_write(comm_uart, (char*)txbuf, (len + HEADER_SIZE)); + if (ret < 0) { + uprintf(dbg_uart, "MQTT send error: %d\n", ret); + return ret; + } + mqtt_comm_state = MQTT_IDLE; + break; + } + case MQTT_WAITING_PUBCOMP: + case MQTT_WAITING_UNSUBACK: + case MQTT_WAITING_PINGRESP: + case MQTT_IDLE: { + struct mqtt_publish_pkt pub_rx; + struct mqtt_publish_reply_pkt pub_ack; + int ret = 0, len = 0; + + /* Make sure this is a publish packet */ + ret = mqtt_unpack_publish_packet(&pub_rx, (uint8_t*)(buf + HEADER_SIZE), + header->data_len); + if (ret <= 0) { + uprintf(dbg_uart, "Received packet in idle state is not a publish packet (got %d): %d.\n", + ((buf[HEADER_SIZE] >> 4) & 0x0F), ret); + return -7; + } + uprintf(dbg_uart, "Received publication\n"); + /* Send the pub ack packet ? */ + if (pub_rx.topic.QoS != 0) { + uint8_t type = MQTT_CONTROL_PUBACK; + if (pub_rx.topic.QoS == MQTT_QoS_2) { + type = MQTT_CONTROL_PUBREC; + } + len = mqtt_pack_publish_reply_pkt((uint8_t*)&pub_ack, pub_rx.packet_id, type); + ret = serial_write(comm_uart, (char*)txbuf, (len + HEADER_SIZE)); + if (ret < 0) { + uprintf(dbg_uart, "MQTT send error: %d\n", ret); + return ret; + } + if (pub_rx.topic.QoS == MQTT_QoS_2) { + mqtt_comm_state = MQTT_WAITING_PUBREL; + } else { + mqtt_comm_state = MQTT_IDLE; + } + } + /* Perform something with the packet ... */ + break; + } + } + return 0; +} + diff --git a/mqtt_sub/mqtt_comm.h b/mqtt_sub/mqtt_comm.h new file mode 100644 index 0000000..fbcf49c --- /dev/null +++ b/mqtt_sub/mqtt_comm.h @@ -0,0 +1,74 @@ +/**************************************************************************** + * apps/base/mqtt_pub/mqtt_comm.h + * + * MQTT client example using data from onboard TMP101 I2C temperature sensor + * + * Copyright 2013-2014 Nathael Pajani + * + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *************************************************************************** */ + +#ifndef MQTT_COMM_H +#define MQTT_COMM_H + +#include "lib/stdio.h" +#include "lib/errno.h" +#include "lib/utils.h" +#include "lib/protocols/mqtt.h" + +#define APP_BOARD_ADDRESS 0x0102 + + +/***************************************************************************** */ +/* MQTT protocol */ + +/* MQTT packets may be up to 2^28 bytes, but we will keep them very small for our needs */ +#define MQTT_BUFF_SIZE 62 +#define FIRST_PACKET_CHAR '#' + +#define HEADER_SIZE sizeof(struct app_header) +struct app_header { + uint8_t start; /* '#' */ + uint8_t type; /* 'M' for MQTT messages */ + uint16_t addr; + uint8_t seqnum; + uint8_t header_cksum; + uint8_t data_len; + uint8_t data_cksum; +}; + +/* Possible states for MQTT communication flow */ +enum mqtt_client_states { + MQTT_UNCONNECTED, + MQTT_IDLE, + MQTT_WAITING_CONNECT_ACK, + MQTT_WAITING_PUBACK, + MQTT_WAITING_PUBREC, + MQTT_WAITING_PUBREL, + MQTT_WAITING_PUBCOMP, + MQTT_WAITING_SUBACK, + MQTT_WAITING_UNSUBACK, + MQTT_WAITING_PINGRESP, +}; + +int mqtt_init(int comm_uart, int dbg_uart); + +int mqtt_temp_publish(int comm_uart, int dbg_uart, int temp); + +int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart); + +#endif /* MQTT_COMM_H */ + -- 2.43.0