From de276b8ffe36bd5c05347301aaafb2bb644738e4 Mon Sep 17 00:00:00 2001 From: Nathael Pajani Date: Tue, 8 Nov 2022 16:44:38 +0100 Subject: [PATCH] Move host side app to a sub-directory of the related lpc122x app --- mqtt_pub/host_bridge/.gitignore | 1 + mqtt_pub/host_bridge/Makefile | 32 +++ mqtt_pub/host_bridge/README | 33 +++ mqtt_pub/host_bridge/list.h | 231 +++++++++++++++++++ mqtt_pub/host_bridge/main.c | 261 +++++++++++++++++++++ mqtt_pub/host_bridge/protocol.c | 343 ++++++++++++++++++++++++++++ mqtt_pub/host_bridge/protocol.h | 149 ++++++++++++ mqtt_pub/host_bridge/serial_utils.c | 59 +++++ mqtt_pub/host_bridge/serial_utils.h | 32 +++ mqtt_pub/host_bridge/sock_utils.c | 70 ++++++ mqtt_pub/host_bridge/sock_utils.h | 35 +++ 11 files changed, 1246 insertions(+) create mode 100644 mqtt_pub/host_bridge/.gitignore create mode 100644 mqtt_pub/host_bridge/Makefile create mode 100644 mqtt_pub/host_bridge/README create mode 100644 mqtt_pub/host_bridge/list.h create mode 100644 mqtt_pub/host_bridge/main.c create mode 100644 mqtt_pub/host_bridge/protocol.c create mode 100644 mqtt_pub/host_bridge/protocol.h create mode 100644 mqtt_pub/host_bridge/serial_utils.c create mode 100644 mqtt_pub/host_bridge/serial_utils.h create mode 100644 mqtt_pub/host_bridge/sock_utils.c create mode 100644 mqtt_pub/host_bridge/sock_utils.h diff --git a/mqtt_pub/host_bridge/.gitignore b/mqtt_pub/host_bridge/.gitignore new file mode 100644 index 0000000..68ebdb9 --- /dev/null +++ b/mqtt_pub/host_bridge/.gitignore @@ -0,0 +1 @@ +mqtt_bridge diff --git a/mqtt_pub/host_bridge/Makefile b/mqtt_pub/host_bridge/Makefile new file mode 100644 index 0000000..3ab932f --- /dev/null +++ b/mqtt_pub/host_bridge/Makefile @@ -0,0 +1,32 @@ +#CROSS_COMPILE ?= arm-linux-gnueabihf- +CC = $(CROSS_COMPILE)gcc + +CFLAGS = -Wall -O2 -Wextra + +EXEC = mqtt_bridge + +all: $(EXEC) + + +OBJDIR = objs +SRC = $(shell find . -name \*.c) +OBJS = ${SRC:%.c=${OBJDIR}/%.o} +INCLUDES = includes/ + +${OBJDIR}/%.o: %.c + @mkdir -p $(dir $@) + @echo "-- compiling" $< + @$(CC) -MMD -MP -MF ${OBJDIR}/$*.d $(CFLAGS) $< -c -o $@ -I$(INCLUDES) + +$(EXEC): $(OBJS) + @echo "Linking $@ ..." + @$(CC) $(LDFLAGS) -o $@ $(OBJS) + @echo Done. + + +clean: + find ${OBJDIR} -name "*.o" -exec rm {} \; + find ${OBJDIR} -name "*.d" -exec rm {} \; + +mrproper: clean + rm -f $(EXEC) diff --git a/mqtt_pub/host_bridge/README b/mqtt_pub/host_bridge/README new file mode 100644 index 0000000..7ec5122 --- /dev/null +++ b/mqtt_pub/host_bridge/README @@ -0,0 +1,33 @@ +MQTT protocol bridge example + +Copyright 2019 Nathael Pajani + + +/* **************************************************************************** + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *************************************************************************** */ + +This example is a bridge used to demonstrate the support of the MQTT protocol. +It implements the serial protocol used in apps/base/mqtt_sub and +apps/base/mqtt_pub examples and forwards the MQTT packets to a MQTT broker +like mosquitto. + +As MQTT requires a lossless, ordered, addressable transport protocol, we used a +simple protocol over serial communication, which would allow communication of +multiple "clients" over an RS485 serial link. +This protocol is decoded by the bridge example provided in host/mqtt_bridge, +which simply transfers the MQTT part of received packets on a TCP socket +connected to an MQTT message broker, and encapsulate MQTT packets received on +this socket in our simple protocol before sending them on the serial link. diff --git a/mqtt_pub/host_bridge/list.h b/mqtt_pub/host_bridge/list.h new file mode 100644 index 0000000..c584086 --- /dev/null +++ b/mqtt_pub/host_bridge/list.h @@ -0,0 +1,231 @@ +/* + * list.h + * + * This code is from the linux kernel. It is a very simple and powerfull doubly + * linked list implementation. + * Not everything has been taken from the original file. + */ + +#ifndef _LINUX_LIST_H +#define _LINUX_LIST_H + + +/* linux/kernel.h */ + +/** + * container_of - cast a member of a structure out to the containing structure + * @ptr: the pointer to the member. + * @type: the type of the container struct this is embedded in. + * @member: the name of the member within the struct. + * + */ +#define container_of(ptr, type, member) ({ \ + const typeof( ((type *)0)->member ) *__mptr = (ptr); \ + (type *)( (char *)__mptr - offsetof(type,member) );}) + + + +/* linux/stddef.h */ + +#ifdef __compiler_offsetof +#define offsetof(TYPE,MEMBER) __compiler_offsetof(TYPE,MEMBER) +#else +#define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER) +#endif + + + +/* linux/list.h */ + +/* + * Simple doubly linked list implementation. + * + * Some of the internal functions ("__xxx") are useful when + * manipulating whole lists rather than single entries, as + * sometimes we already know the next/prev entries and we can + * generate better code by using them directly rather than + * using the generic single-entry routines. + */ + +struct list_head { + struct list_head *next, *prev; +}; + +/* + * These are non-NULL pointers that will result in page faults + * under normal circumstances, used to verify that nobody uses + * non-initialized list entries. + */ +#define LIST_POISON1 ((void *) 0x00100100) +#define LIST_POISON2 ((void *) 0x00200200) + + +#define LIST_HEAD_INIT(name) { &(name), &(name) } + +#define LIST_HEAD(name) \ + struct list_head name = LIST_HEAD_INIT(name) + +static inline void INIT_LIST_HEAD(struct list_head *list) +{ + list->next = list; + list->prev = list; +} + +/* + * Insert a new entry between two known consecutive entries. + * + * This is only for internal list manipulation where we know + * the prev/next entries already! + */ +static inline void __list_add(struct list_head *new, + struct list_head *prev, + struct list_head *next) +{ + next->prev = new; + new->next = next; + new->prev = prev; + prev->next = new; +} + +/** + * list_add - add a new entry + * @new: new entry to be added + * @head: list head to add it after + * + * Insert a new entry after the specified head. + * This is good for implementing stacks. + */ +static inline void list_add(struct list_head *new, struct list_head *head) +{ + __list_add(new, head, head->next); +} + + +/** + * list_add_tail - add a new entry + * @new: new entry to be added + * @head: list head to add it before + * + * Insert a new entry before the specified head. + * This is useful for implementing queues. + */ +static inline void list_add_tail(struct list_head *new, struct list_head *head) +{ + __list_add(new, head->prev, head); +} + +/* + * Delete a list entry by making the prev/next entries + * point to each other. + * + * This is only for internal list manipulation where we know + * the prev/next entries already! + */ +static inline void __list_del(struct list_head * prev, struct list_head * next) +{ + next->prev = prev; + prev->next = next; +} + +/** + * list_del - deletes entry from list. + * @entry: the element to delete from the list. + * Note: list_empty() on entry does not return true after this, the entry is + * in an undefined state. + */ +static inline void __list_del_entry(struct list_head *entry) +{ + __list_del(entry->prev, entry->next); +} + +static inline void list_del(struct list_head *entry) +{ + __list_del(entry->prev, entry->next); + entry->next = LIST_POISON1; + entry->prev = LIST_POISON2; +} + +/** + * list_is_last - tests whether @list is the last entry in list @head + * @list: the entry to test + * @head: the head of the list + */ +static inline int list_is_last(const struct list_head *list, + const struct list_head *head) +{ + return list->next == head; +} + +/** + * list_empty - tests whether a list is empty + * @head: the list to test. + */ +static inline int list_empty(const struct list_head *head) +{ + return head->next == head; +} + +/** + * list_entry - get the struct for this entry + * @ptr: the struct list_head pointer. + * @type: the type of the struct this is embedded in. + * @member: the name of the list_struct within the struct. + */ +#define list_entry(ptr, type, member) \ + container_of(ptr, type, member) + +/** + * list_first_entry - get the first element from a list + * @ptr: the list head to take the element from. + * @type: the type of the struct this is embedded in. + * @member: the name of the list_struct within the struct. + * + * Note, that list is expected to be not empty. + */ +#define list_first_entry(ptr, type, member) \ + list_entry((ptr)->next, type, member) + +/** + * list_last_entry - get the last element from a list + * @ptr: the list head to take the element from. + * @type: the type of the struct this is embedded in. + * @member: the name of the list_struct within the struct. + * + * Note, that list is expected to be not empty. + */ +#define list_last_entry(ptr, type, member) \ + list_entry((ptr)->prev, type, member) + +/** + * list_next_entry - get the next element in list + * @pos: the type * to cursor + * @member: the name of the list_struct within the struct. + */ +#define list_next_entry(pos, member) \ + list_entry((pos)->member.next, typeof(*(pos)), member) + +/** + * list_for_each_entry - iterate over list of given type + * @pos: the type * to use as a loop cursor. + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + */ +#define list_for_each_entry(pos, head, member) \ + for (pos = list_first_entry(head, typeof(*pos), member); \ + &pos->member != (head); \ + pos = list_next_entry(pos, member)) + +/** + * list_for_each_entry_safe - iterate over list of given type safe against removal of list entry + * @pos: the type * to use as a loop cursor. + * @n: another type * to use as temporary storage + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + */ +#define list_for_each_entry_safe(pos, n, head, member) \ + for (pos = list_first_entry(head, typeof(*pos), member), \ + n = list_next_entry(pos, member); \ + &pos->member != (head); \ + pos = n, n = list_next_entry(n, member)) + +#endif /* _LINUX_LIST_H */ diff --git a/mqtt_pub/host_bridge/main.c b/mqtt_pub/host_bridge/main.c new file mode 100644 index 0000000..4d26a48 --- /dev/null +++ b/mqtt_pub/host_bridge/main.c @@ -0,0 +1,261 @@ +/**************************************************************************** + * MQTT serial bridge. + * + * main.c + * + * + * Copyright 2019 Nathael Pajani + * + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + ****************************************************************************/ + + +/* This protocol handler is designed to run on a host. */ + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "protocol.h" +#include "serial_utils.h" +#include "sock_utils.h" + + +#define BUF_SIZE 1024 + +#define PROG_NAME "MQTT protocol bridge" +#define VERSION "0.1" + + +void help(char *prog_name) +{ + fprintf(stderr, "---------------- "PROG_NAME" --------------------------------\n"); + fprintf(stderr, "Usage: %s [options]\n" \ + " Available options:\n" \ + " \t -a | --addr : IP address of MQTT server to connect to\n" \ + " \t -p | --port : Port on which the MQTT server is running\n" \ + " \t -d | --device : Serial device to use for serial communication with the module\n" \ + " \t -h | --help : Print this help\n" \ + " \t -v | --version : Print programm version\n" \ + " All other arguments are data for the command, handled depending on the command.\n", prog_name); + fprintf(stderr, "-----------------------------------------------------------------------\n"); +} + + +int main(int argc, char* argv[]) +{ + /* Main data structure */ + struct internal_data glob; + + memset(&glob, 0, sizeof(struct internal_data)); + + /* Parse options */ + while(1) { + int option_index = 0; + int c = 0; + + struct option long_options[] = { + {"addr", required_argument, 0, 'a'}, + {"port", required_argument, 0, 'p'}, + {"device", required_argument, 0, 'd'}, + {"help", no_argument, 0, 'h'}, + {"version", no_argument, 0, 'v'}, + {0, 0, 0, 0} + }; + + c = getopt_long(argc, argv, "a:p:d:hv", long_options, &option_index); + + /* no more options to parse */ + if (c == -1) break; + switch (c) { + /* a, server ip address */ + case 'a': + glob.ip = optarg; + break; + + /* p, server port */ + case 'p': + glob.port = strtoul(optarg, NULL, 0); + break; + + /* d, device */ + case 'd': + glob.device = optarg; + break; + + /* v, version */ + case 'v': + printf("%s Version %s\n", PROG_NAME, VERSION); + return 0; + break; + + /* h, help */ + case 'h': + default: + help(argv[0]); + return 0; + } + } + + /* Need Serial port and server IP address and port as parameter */ + if ((glob.device == NULL) || (glob.ip == NULL) || (glob.port == 0)) { + printf("Error, need both serial (tty) device and IP address and port number\n"); + help(argv[0]); + return -1; + } + + /* Open tty */ + glob.ser.fd = serial_setup(glob.device); + if (glob.ser.fd < 0) { + printf("Unable to open specified serial port %s\n", glob.device); + return -2; + } + + /* ************************************************* */ + /* Global data init : prepare list of clients */ + INIT_LIST_HEAD(&glob.clients); + + /* ************************************************* */ + /* Initial file descriptor set setup */ + FD_ZERO(&(glob.read_fds)); + FD_SET(glob.ser.fd, &(glob.read_fds)); /* Serial link */ + glob.max_fd = glob.ser.fd + 1; /* No close, this one is the last open, so it's the higest */ + + + /* ************************************************* */ + /* And never stop handling data ! */ + while (1) { + /* Select related vars */ + fd_set tmp_read_fds; + int nb = 0; + /* And buffers ones */ + char buf[BUF_SIZE]; + + /* select() call .... be kind to other processes */ + memcpy(&tmp_read_fds, &glob.read_fds, sizeof(fd_set)); /* There's no pointers in an fd_set, so memcpy is OK */ + nb = select(glob.max_fd, &tmp_read_fds, NULL, NULL, NULL); + + /* Errors here are bad ones .. exit ?? */ + if (nb < 0) { + perror ("select failed"); + printf("BRIDGE Error: select failed, this is critical.\n"); + return -10; + } + + /* Data from Ethernet side ? check all clients */ + if (! list_empty(&glob.clients)) { + struct client* client = NULL, * tmp_client = NULL; + list_for_each_entry_safe(client, tmp_client, &glob.clients, list) { + int len = 0; + if (FD_ISSET(client->socket, &tmp_read_fds)) { + /* Track the number of found file descriptors */ + nb--; + /* Receive the new data */ + memset(buf, 0, BUF_SIZE); + len = read(client->socket, buf, BUF_SIZE); + printf("BRIDGE: Got data for client %d: %d bytes\n", client->address, len); + if (len > 0) { + int ret = 0, used = 0; + do { + /* Try to recover a packet within received data */ + ret = server_stream_decode((buf + used), (len - used), client); + /* If MQTT packet is malformed, then close connection */ + if (ret == -1) { + printf("BRIDGE: Error on TCP packet reception : bad packet size\n"); + len = -1; + break; + } + /* If packet is complete, then send it to the client */ + if (client->rx_ptr == client->data_len) { + handle_server_data(&glob.ser, client); + } + used += ret; + } while ((ret > 0) && ((len - used) > 0)); + } else if (len == 0) { + /* Server closed the connection ? */ + printf("BRIDGE: Server closed the connection for client %d\n", client->address); + len = -1; + } else { + /* Wait .. we received something but nothing came in ? Close socket */ + perror("BRIDGE: Receive error for client TCP connection"); + printf("BRIDGE: Error on TCP packet reception (ret: %d)\n", len); + len = -1; + } + /* Handle errors either from read() or from server_stream_decode() */ + if (len < 0) { + /* Remove client socket from select list, close socket, tell the client that the TCP + * connection got closed and remove client from list of clients */ + remove_client(&glob, client); + } + } + } + } + + /* Handle module messages */ + if (FD_ISSET(glob.ser.fd, &tmp_read_fds)) { + int idx = 0, len = 0; + /* Track the number of found file descriptors */ + nb--; + /* Get serial data and try to build a packet */ + memset(buf, 0, BUF_SIZE); + len = read(glob.ser.fd, buf, BUF_SIZE); + if (len < 0) { + perror("BRIDGE: serial read error"); + /* FIXME : handle errors */ + } else if (len == 0) { + /* Wait .. we received something but nothing came in ? */ + printf("\nBRIDGE: Error, got activity on serial link, but no data ...\n"); + } + while (idx < len) { + int ret = serial_protocol_decode(buf[idx], &glob.ser); + /* Check return code to know if we have a valid packet */ + if (ret == -1) { + /* Anything that's not part of a packet is printed as is (debug output) */ + printf("%c", buf[idx]); + } else if (ret < 0) { + printf("\nBRIDGE: Error in received packet. (ret: %d)\n", ret); + } else if (ret == 0) { + /* Packet is being built */ + } else { + /* Valid packet received */ + handle_serial_data(&glob); + } + idx++; + } + } + + if (nb != 0) { + /* This should never happen, kept for debug only ! */ + printf("BRIDGE: Looked through all clients ... but still got %d fd set !\n", nb); + return -5; + } + } /* End of infinite loop */ + + return 0; +} + + diff --git a/mqtt_pub/host_bridge/protocol.c b/mqtt_pub/host_bridge/protocol.c new file mode 100644 index 0000000..57b3352 --- /dev/null +++ b/mqtt_pub/host_bridge/protocol.c @@ -0,0 +1,343 @@ +/**************************************************************************** + * protocol.c + * + * + * Copyright 2019 Nathael Pajani + * + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + ****************************************************************************/ + + +/* Host side implementation of the communication protocol */ + +#include +#include +#include +#include +#include +#include +#include "protocol.h" +#include "sock_utils.h" + + +/******************************************************************************/ +/* Handle packet reception, including checksums + * 'sum' is used to sum all the received characters, and if the last byte of sum is 0 for each + * part (header and data) then the packet is valid. + * 'full_size' is the size of the whole packet, including header, updated as soon as the header + * is checked and valid + * + * This function must be called for every received character. + * If the character is part of a packet but the packet is being built, then the function returns 0. + * When the character is the last one of a valid packet, then the function returns the packet size + * and the packet in ser->rx_packet is valid. + * If the character is the last one of a packet which has an invalid data checksum, this function + * returns -2 and the data is lost. + * If the character is not part of a packet it returns -1. The character may then be part of + * a debug message (and displayed by the host), or any other kind of communication. + * When a set of consecutive characters have been used to build a packet but the packet is + * not valid due to header error, then the function returns -3 (checksum error) or -4 (data size + * error). The data in ser->rx_packet is the received data but is not valid. + * The corresponding data size is always sizeof(struct header). + */ +int serial_protocol_decode(char c, struct serial* ser) +{ + struct app_header* info = (struct app_header*)(&(ser->packet)); + int ret = 0; + + /* Do not start reception before receiving the packet start character */ + if ((ser->rx_ptr == 0) && (c != FIRST_PACKET_CHAR)) { + /* This one id not a critical error, but this character is not part of a packet */ + return -1; + } + + /* Store the new byte in the packet */ + ser->packet[ser->rx_ptr++] = c; + ser->sum += c; + + /* Is this packet valid ? (at end of header reception) */ + if (ser->rx_ptr == HEADER_SIZE) { + /* Checksum OK ? */ + if (ser->sum != 0) { + ret = -2; + goto next_packet; + } + /* Start the new checksum for data (if any) */ + ser->sum = 0; + ser->full_size = HEADER_SIZE + info->data_len; + /* Make sure the packet will fit in the buffer */ + if (ser->full_size > MQTT_BUFF_SIZE) { + ret = -3; + goto next_packet; + } + } + + /* Did we receive the whole packet ? */ + if (ser->rx_ptr == ser->full_size) { + if (ser->sum != info->data_cksum) { + ret = -4; + goto next_packet; + } + ser->pkt_ok_size = ser->full_size; + ret = ser->full_size; + /* And get ready to receive the next packet */ + goto next_packet; + } + + return 0; + +next_packet: +#ifdef DEBUG + printf("BRIDGE: Current rx_ptr: %d, packet full size: %d, ret: %d\n", ser->rx_ptr, ser->full_size, ret); + if (ser->rx_ptr >= HEADER_SIZE) { + printf("BRIDGE: Pkt: type: %d, addr: %d, seq: %d, data len: %d\n", + info->type, ntohs(info->addr), info->seqnum, info->data_len); + } +#endif + /* Wether the packet was OK or not doesn't matter, go on for a new one :) */ + ser->full_size = 0; + ser->rx_ptr = 0; + ser->sum = 0; + return ret; +} + + +/******************************************************************************/ +/* This function handles sending packets to clients connected over the serial link. + * It returns the number of bytes of data sent. + */ +int serial_send_packet(struct serial* ser, struct client* target) +{ + struct app_header* info; + uint32_t len = HEADER_SIZE + target->data_len; + uint32_t i = 0, sent = 0; + uint8_t sum = 0; + + info = (struct app_header*)target->packet; + + /* Fill header */ + info->start = '#'; + info->type = 'M'; /* MQTT packet */ + info->addr = htons(target->address); + info->seqnum = target->seqnum; + info->data_len = target->data_len; + + /* Compute data checksum */ + for (i = HEADER_SIZE; i < len; i++) { + sum += target->packet[i]; + } + info->data_cksum = sum; + + /* Compute header checksum */ + sum = 0; + info->header_cksum = 0; /* Erase checksum of previous packet */ + for (i = 0; i < HEADER_SIZE; i++) { + sum += target->packet[i]; + } + info->header_cksum = ((uint8_t)(256 - sum)); + + /* And send the packet on the serial link */ + while (sent < len) { + int ret = write(ser->fd, (target->packet + sent), (len - sent)); + if (ret >= 0) { + sent += ret; + } else { + /* Sending error ... */ + perror("Serial send error:"); + /* FIXME : handle / report errors */ + break; + } + } + /* Packet got sent, get ready for the next one */ + target->data_len = 0; + target->rx_ptr = 0; + return sent; +} + + +/******************************************************************************/ +int handle_server_data(struct serial* ser, struct client* client) +{ + return serial_send_packet(ser, client); +} + + +/******************************************************************************/ +/* This function only decodes the "remaining length" field, in order to pack a complete packet + * before sending it to the device. + * Aside from getting a valid MQTTT packet size there is no check on validity of either content + * or protocol. + * On success, returns the number of bytes used from buf. + * Returns 0 if there was not enough data to read the size or fill the data buffer. + * Returns -1 if length is invalid and the connection should be closed. + */ +int server_stream_decode(char* buf, int len, struct client* client) +{ + char* mqttbuf = client->packet + HEADER_SIZE; + int idx = 0; + uint32_t size = 0; + + /* Try to get enough bytes to read the size of the packet */ + while ((client->data_len == 0) && (idx < len)) { + /* Read one byte */ + mqttbuf[client->rx_ptr++] = buf[idx++]; + /* Check for size */ + if ((client->rx_ptr >= 2) && ((mqttbuf[client->rx_ptr - 1] & 0x80) == 0)) { + int len_idx = 1; + uint32_t length = 0; + do { + length += (mqttbuf[len_idx] & 0x7F) << (7 * (len_idx - 1)); + } while (buf[len_idx++] & 0x80); + client->data_len = length + len_idx; + break; + } + /* If we already received 5 bytes and were not able to read a packet size, then + * there is a protocol error */ + if (client->rx_ptr >= 5) { + return -1; + } + } + /* Some MQTT packet have no data, We are done */ + if (client->rx_ptr == client->data_len) { + return idx; + } + /* Is there any more data available ? */ + if ((len - idx) == 0) { + return len; + } + /* Copy buffer to internal buffer, up to the size of one packet */ + size = len - idx; + if ((client->rx_ptr + size) > client->data_len) { + size = client->data_len - client->rx_ptr; + } + memcpy((mqttbuf + client->rx_ptr), (buf + idx), size); + client->rx_ptr += size; + return idx + size; +} + + +/******************************************************************************/ +int tcp_send_packet(struct serial* ser, struct client* client) +{ + struct app_header* info = NULL; + int ret = 0, size = 0; + info = (struct app_header*)ser->packet; + /* Store sequence number */ + client->seqnum = info->seqnum; + /* And send packet to the server (broker) */ + size = (ser->pkt_ok_size - HEADER_SIZE); + ret = write(client->socket, (ser->packet + HEADER_SIZE), size); + if (ret != size) { + perror("BRIDGE: Socket write error:"); + printf("BRIDGE: Transmit error for client(%d) TCP connection", client->address); + return ret; + } + ser->pkt_ok_size = 0; + return 0; +} + + +/******************************************************************************/ +void remove_client(struct internal_data* glob, struct client* client) +{ + struct client* ctmp = NULL; + + /* Remove client socket from select list and close socket */ + FD_CLR(client->socket, &glob->read_fds); + close(client->socket); + /* Tell the client that the TCP connection got closed ? */ + /* FIXME */ + /* Remove client from list of clients */ + list_del(&client->list); + /* Update max_fd */ + if (list_empty(&glob->clients)) { + glob->max_fd = glob->ser.fd + 1; + } else { + list_for_each_entry(ctmp, &glob->clients, list) { + if (ctmp->socket >= glob->max_fd) { + glob->max_fd = ctmp->socket + 1; + } + } + } + free(client); +} + +/******************************************************************************/ +/* This function is used to match the received packet to an existing client, or create a new + * one and open its connection to the server if the client is not in the list. + */ +#define MQTT_CONTROL_CONNECT (0x01 << 4) +int handle_serial_data(struct internal_data* glob) +{ + struct client* client = NULL; + struct app_header* info = NULL; + uint16_t address = 0; + int found = 0, ret = 0; + + if (glob == NULL) { + return -1; + } + + info = (struct app_header*)glob->ser.packet; + address = ntohs(info->addr); + + /* Read address from serial input buffer */ + if (! list_empty(&glob->clients)) { + list_for_each_entry(client, &glob->clients, list) { + if (client->address == address) { + found = 1; + break; + } + } + } + + /* Need to create a new client ? */ + if (found == 0) { + client = malloc(sizeof(struct client)); + memset(client, 0, sizeof(struct client)); + /* Create TCP client socket */ + client->socket = socket_tcp_client(glob->ip, glob->port); + if (client->socket <= 0) { + printf("BRIDGE: Unable to open the TCP socket on port %d\n", glob->port); + return -2; + } + FD_SET(client->socket, &glob->read_fds); + if (client->socket >= glob->max_fd) { + glob->max_fd = client->socket + 1; + } + /* Add the client to the list */ + list_add(&client->list, &glob->clients); + + /* Store address and sequence number */ + client->address = address; + /* This is a new client. + * We could check that the packet is an MQTT connect packet and refuse to send it if + * it is not, but we would then have to tell the client. + * Sending the packet anyway will have the server close the connection, which will + * be forwarded to the client, forcing him to re-open the connection using a valid + * connect packet. + */ + } + + /* We now have a valid client, */ + ret = tcp_send_packet(&glob->ser, client); + if (ret < 0) { + remove_client(glob, client); + return -1; + } + return 0; +} + diff --git a/mqtt_pub/host_bridge/protocol.h b/mqtt_pub/host_bridge/protocol.h new file mode 100644 index 0000000..ec5fa15 --- /dev/null +++ b/mqtt_pub/host_bridge/protocol.h @@ -0,0 +1,149 @@ +/**************************************************************************** + * protocol.h + * + * + * Copyright 2019 Nathael Pajani + * + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + ****************************************************************************/ + + +/* Host side implementation of the communication protocol */ + +#ifndef PROTOCOL_HOST_H +#define PROTOCOL_HOST_H + + +#include +#include +#include "list.h" + +#define MQTT_BUFF_SIZE 64 +#define FIRST_PACKET_CHAR '#' +#define HEADER_SIZE sizeof(struct app_header) +struct app_header { + uint8_t start; /* '#' */ + uint8_t type; /* 'M' for MQTT messages */ + uint16_t addr; + uint8_t seqnum; + uint8_t header_cksum; + uint8_t data_len; + uint8_t data_cksum; +}; + + +/******************************************************************************/ +/* Handle TCP packet reception + * 'rx_ptr' points to current rx position in MQTT buffer while receiving data. MQTT buffer starts + * at 'HEADER_SIZE' in packet buffer. + * in order to leave some place for the header in the buffer. + * 'rx_ptr' and 'data_len' should be uint32_t for MQTT protocol, but we limmited our packets to + * 64 bytes, so uint8_t is OK. + */ +struct client { + struct list_head list; + int socket; + int mqtt_state; + uint16_t address; + uint8_t seqnum; + char packet[MQTT_BUFF_SIZE]; + uint8_t rx_ptr; /* Position for next received char in MQTT buffer */ + uint8_t data_len; /* Decoded data length of MQTT part of packet */ +}; + + +/******************************************************************************/ +/* Handle packet reception, including checksums */ +/* 'sum' is used to sum all the received characters, and if the last byte of sum is 0 for each + * part (header and data) then the packet is valid. + * 'full_size' is the size of the whole packet, including header, updated as soon as the header + * is checked and valid + * 'rx_ptr' and 'full_size' should be uint32_t for MQTT protocol, but we limmited our packets to + * 64 bytes, so uint8_t is OK. + */ +struct serial { + int fd; /* Serial link file descriptor */ + /* Packet building data */ + char packet[MQTT_BUFF_SIZE]; + uint8_t rx_ptr; + uint8_t sum; + uint8_t full_size; + uint8_t pkt_ok_size; /* Size of received and valid packet, only set once packet is checked */ +}; + + +/******************************************************************************/ +struct internal_data { + /* Serial link */ + char* device; /* Serial device name */ + struct serial ser; + /* Common MQTT Broker information */ + char* ip; + int port; + /* Handle list of clients and file descriptors on which we can receive data */ + fd_set read_fds; + int max_fd; + struct list_head clients; /* List of clients */ +}; + + + +/******************************************************************************/ +/* Handle packet reception, including checksums + * This function must be called for every received character. + * If the character is part of a packet but the packet is being built, then the function returns 0. + * When the character is the last one of a valid packet, then the function returns the packet size + * and the packet in rx_data->rx_packet is valid. + * If the character is the last one of a packet which has an invalid data checksum, this function + * returns -2 and the data is lost. + * If the character is not part of a packet it returns -1. The character may then be part of + * a debug message (and displayed by the host), or any other kind of communication. + * When a set of consecutive characters have been used to build a packet but the packet is + * not valid due to header error, then the function returns -3 (checksum error) or -4 (data size + * error). The data in rx_data->rx_packet is the received data but is not valid. + * The corresponding data size is always sizeof(struct header). + */ +int serial_protocol_decode(char c, struct serial* ser); + + +/******************************************************************************/ +/* This function handles sending packets to clients connected over the serial link. + * It returns the number of bytes of data sent. + */ +int serial_send_packet(struct serial* ser, struct client* target); + +/******************************************************************************/ +/* This function is used to match the received packet to an existing client, or create a new + * one and open its connection to the server if the client is not in the list. + */ +int handle_serial_data(struct internal_data* glob); + + +/******************************************************************************/ +int handle_server_data(struct serial* ser, struct client* client); + +/******************************************************************************/ +int server_stream_decode(char* buf, int len, struct client* client); + +/******************************************************************************/ +int tcp_send_packet(struct serial* ser, struct client* client); + +/******************************************************************************/ +void remove_client(struct internal_data* glob, struct client* client); + + +#endif /* PROTOCOL_HOST_H */ + diff --git a/mqtt_pub/host_bridge/serial_utils.c b/mqtt_pub/host_bridge/serial_utils.c new file mode 100644 index 0000000..5e17e42 --- /dev/null +++ b/mqtt_pub/host_bridge/serial_utils.c @@ -0,0 +1,59 @@ +/********************************************************************* + * + * Serial utility functions + * + * + * Copyright 2012-2014 Nathael Pajani + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *********************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#define SERIAL_BAUD B115200 + + +int serial_setup(char* name) +{ + struct termios tio; + int fd = -1; + + /* Open serial port */ + fd = open(name, O_RDWR); + if (fd < 0) { + perror("Unable to open communication with companion chip"); + return -1; + } + /* Setup serial port */ + memset(&tio, 0, sizeof(tio)); + tio.c_cflag = CS8 | CREAD | CLOCAL; /* 8n1, see termios.h for more information */ + tio.c_cc[VMIN] = 1; + tio.c_cc[VTIME] = 5; + cfsetospeed(&tio, SERIAL_BAUD); + cfsetispeed(&tio, SERIAL_BAUD); + tcsetattr(fd, TCSANOW, &tio); + + return fd; +} + diff --git a/mqtt_pub/host_bridge/serial_utils.h b/mqtt_pub/host_bridge/serial_utils.h new file mode 100644 index 0000000..7c17bcf --- /dev/null +++ b/mqtt_pub/host_bridge/serial_utils.h @@ -0,0 +1,32 @@ +/********************************************************************* + * + * Serial utility functions + * + * + * Copyright 2012 Nathael Pajani + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *********************************************************************/ +#ifndef SERIAL_UTILS_H +#define SERIAL_UTILS_H + +/* Setup serial comunication, using name if given or saved name if name is NULL + * SERIAL_BAUD B38400 + * c_cflag (CS7 | PARENB | CREAD | CLOCAL) (7e1) + */ +int serial_setup(char* name); + +#endif /* SERIAL_UTILS_H */ + diff --git a/mqtt_pub/host_bridge/sock_utils.c b/mqtt_pub/host_bridge/sock_utils.c new file mode 100644 index 0000000..a7832ee --- /dev/null +++ b/mqtt_pub/host_bridge/sock_utils.c @@ -0,0 +1,70 @@ +/***************************************************************************** + * + * socket utils + * + * + * Copyright 2012 Nathael Pajani + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *****************************************************************************/ + +#include /* perror */ +#include /* memset */ +#include /* close, fcntl */ +#include /* F_SETFL and O_NONBLOCK for fcntl */ + +/* For socket stuff */ +#include /* For portability for socket stuff */ +#include +#include +#include +#include /* For TCP_NODELAY to disable nagle algorithm */ + + +int socket_tcp_client(char* ip, int port) +{ + struct sockaddr_in addr; + int s, ret; + const int on = 1; + + if ((ip == NULL) || (port == 0)) { + printf("BRIDGE: Aborted socket creation, need both IP address and port\n"); + return -1; + } + addr.sin_addr.s_addr = inet_addr(ip); + addr.sin_port = htons(port); + addr.sin_family = AF_INET; + + s = socket(AF_INET, SOCK_STREAM, 0); + if (s < 0) { + printf("BRIDGE: Socket creation error\n"); + return -1; + } + + ret = connect(s, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)); + if (ret != 0) { + close(s); + printf("BRIDGE: Socket unable to connect to peer"); + return -1; + } + + /* This is used to allow quicker sending of small packets on wires */ + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&on, sizeof(int)); + + return s; +} + + + diff --git a/mqtt_pub/host_bridge/sock_utils.h b/mqtt_pub/host_bridge/sock_utils.h new file mode 100644 index 0000000..5b97104 --- /dev/null +++ b/mqtt_pub/host_bridge/sock_utils.h @@ -0,0 +1,35 @@ +/***************************************************************************** + * + * socket utils + * + * + * Copyright 2012 Nathael Pajani + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *****************************************************************************/ + +#ifndef SOCK_UTILS_H +#define SOCK_UTILS_H + +#include /* struct sockaddr_in */ + + +/* Creates a socket and connect to the specified IP and port + * Returns the socket, after disabling nagle algorithm. + */ +int socket_tcp_client(char* ip, int port); + +#endif /* SOCK_UTILS_H */ + -- 2.43.0