From: Nathael Pajani Date: Thu, 31 Oct 2024 13:30:57 +0000 (+0100) Subject: Add host app for Scialys Linux extention board X-Git-Url: http://git.techno-innov.fr/?a=commitdiff_plain;h=HEAD;p=soft%2Flpc122x%2Fscialys Add host app for Scialys Linux extention board Decode scialys frames and send relevent information to MQTT brocker --- diff --git a/host/scialys_mqtt_bridge/.gitignore b/host/scialys_mqtt_bridge/.gitignore new file mode 100644 index 0000000..aae5aca --- /dev/null +++ b/host/scialys_mqtt_bridge/.gitignore @@ -0,0 +1,20 @@ +# +# NOTE! Please use 'git ls-files -i --exclude-standard' +# command after changing this file, to see if there are +# any tracked files which get ignored after the change. +# +# Normal rules +# +*.o +*.d +*.bin +*.elf +*.map +*/objs/* +*.zip +*.svg +*.dump +*.img +*.bak +tags +scialys_mqtt_bridge diff --git a/host/scialys_mqtt_bridge/Makefile b/host/scialys_mqtt_bridge/Makefile new file mode 100644 index 0000000..6c2b5aa --- /dev/null +++ b/host/scialys_mqtt_bridge/Makefile @@ -0,0 +1,32 @@ +CROSS_COMPILE ?= aarch64-linux-gnu- +CC = $(CROSS_COMPILE)gcc + +CFLAGS = -Wall -O2 -Wextra -DHOST_HAS_STDINT -DHOST_SIDE + +EXEC = scialys_mqtt_bridge + +all: $(EXEC) + + +OBJDIR = objs +SRC = $(shell find . -name \*.c) +OBJS = ${SRC:%.c=${OBJDIR}/%.o} +INCLUDES = includes/ + +${OBJDIR}/%.o: %.c + @mkdir -p $(dir $@) + @echo "-- compiling" $< + @$(CC) -MMD -MP -MF ${OBJDIR}/$*.d $(CFLAGS) $< -c -o $@ -I$(INCLUDES) + +$(EXEC): $(OBJS) + @echo "Linking $@ ..." + @$(CC) $(LDFLAGS) -o $@ $(OBJS) + @echo Done. + + +clean: + find ${OBJDIR} -name "*.o" -exec rm {} \; + find ${OBJDIR} -name "*.d" -exec rm {} \; + +mrproper: clean + rm -f $(EXEC) diff --git a/host/scialys_mqtt_bridge/README b/host/scialys_mqtt_bridge/README new file mode 100644 index 0000000..ab05aa1 --- /dev/null +++ b/host/scialys_mqtt_bridge/README @@ -0,0 +1,4 @@ +Host app for Scialys Linux extention board : + +Decode scialys frames and send relevent information to MQTT brocker + diff --git a/host/scialys_mqtt_bridge/handlers.c b/host/scialys_mqtt_bridge/handlers.c new file mode 100644 index 0000000..3466e8c --- /dev/null +++ b/host/scialys_mqtt_bridge/handlers.c @@ -0,0 +1,166 @@ +/**************************************************************************** + * Host app for drzelec : + * Receive and decode data from Scialys module and send data to MQTT brocker + * + * + * Copyright 2024 Nathael Pajani + * + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + ****************************************************************************/ + + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "scialys_uart_comm.h" +#include "mqtt_comm.h" + +#define DATE_BUF_SIZE 128 + + +int scialys_info_send(struct mqtt_info* mqtti, struct sc_module* sc) +{ + int ret = 0; + if (strncmp(sc->sc_info.module_version, sc->sc_info_old.module_version, MAX_VLEN) != 0) { + mqtt_publish(mqtti, "info/scialys/module_version", (uint8_t*)(sc->sc_info.module_version), + strnlen(sc->sc_info.module_version, MAX_VLEN)); + } + if (strncmp(sc->sc_info.soft_version, sc->sc_info_old.soft_version, MAX_VLEN) != 0) { + mqtt_publish(mqtti, "info/scialys/soft_version", (uint8_t*)(sc->sc_info.soft_version), + strnlen(sc->sc_info.soft_version, MAX_VLEN)); + } + /* Copy new Scialys info to old one */ + memcpy(&sc->sc_info_old, &sc->sc_info, sizeof(struct scialys_info)); + + return ret; +} + + +#define SC_CONF_CHECK_AND_PUBLISH(field, print_msg, unit) \ + if (sc->sc_conf.field != sc->sc_conf_old.field) { \ + mqtt_publish(mqtti, "data/scialys/conf/"#field, (uint8_t*)&(sc->sc_conf.field), sizeof(sc->sc_conf.field)); \ + printf(" " print_msg ": %d " unit "\n", sc->sc_conf.field); \ + } + +int scialys_config_send(struct mqtt_info* mqtti, struct sc_module* sc) +{ + int ret = 0; + + printf(" List of changes :\n"); + + SC_CONF_CHECK_AND_PUBLISH(grid_power_limit, "Grid power limit", "kW"); + SC_CONF_CHECK_AND_PUBLISH(source_power_max, "Source power limit", "W"); + SC_CONF_CHECK_AND_PUBLISH(source_has_power_ok, "Source power OK value", "W"); + SC_CONF_CHECK_AND_PUBLISH(load_power_max, "Load power max usage", "W"); + SC_CONF_CHECK_AND_PUBLISH(conf_max_temp, "Maximum heating temperature", "°C"); + SC_CONF_CHECK_AND_PUBLISH(enter_forced_mode_temp, "Enter forced mode temperature", "°C"); + SC_CONF_CHECK_AND_PUBLISH(auto_forced_mode_value, "Auto forced mode command value", "%%"); + SC_CONF_CHECK_AND_PUBLISH(auto_forced_target_heater_temp, "Auto forced target heater temp", "°C"); + SC_CONF_CHECK_AND_PUBLISH(auto_forced_heater_delay, "Auto forced heater delay", "hours"); + SC_CONF_CHECK_AND_PUBLISH(auto_forced_heater_duration, "Auto forced duration", "hours"); + SC_CONF_CHECK_AND_PUBLISH(auto_force_type, "Auto forced target type", ""); /* Off, Min, Max, Timer, Target */ + SC_CONF_CHECK_AND_PUBLISH(manual_forced_mode_value, "Manual forced mode command value", "%%"); + SC_CONF_CHECK_AND_PUBLISH(manual_target_heater_temp, "Manual forced target heater temp", "°C"); + SC_CONF_CHECK_AND_PUBLISH(manual_activation_duration, "Manual forced duration", "hours"); + SC_CONF_CHECK_AND_PUBLISH(manual_force_type, "Manual forced type", ""); /* Off, Min, Max, Timer, Target */ + SC_CONF_CHECK_AND_PUBLISH(load_type, "Load type", ""); /* LOAD_TYPE_AC_RES, LOAD_TYPE_AC_IND or LOAD_TYPE_DC */ + SC_CONF_CHECK_AND_PUBLISH(never_force, "Never force", ""); /* 0 or 1 */ + + /* Copy new Scialys info to old one */ + memcpy(&sc->sc_conf_old, &sc->sc_conf, sizeof(struct scialys_config)); + + return ret; +} + + +#define SC_DATA_CHECK_AND_PUBLISH(field) \ + if (sc->sc_data.field != sc->sc_data_old.field) { \ + mqtt_publish(mqtti, "data/scialys/data/"#field, (uint8_t*)&(sc->sc_data.field), sizeof(sc->sc_data.field)); \ + } +int scialys_data_send(struct mqtt_info* mqtti, struct sc_module* sc) +{ + int ret = 0; + + /* Publish full data first, for easier debug of Scialys behavior */ + mqtt_publish(mqtti, "data/scialys/full", (uint8_t*)&(sc->sc_data), sizeof(struct scialys_data)); + /* Publish all data separatly */ + SC_DATA_CHECK_AND_PUBLISH(solar_prod_value); + SC_DATA_CHECK_AND_PUBLISH(home_conso_value); + SC_DATA_CHECK_AND_PUBLISH(water_centi_degrees); + SC_DATA_CHECK_AND_PUBLISH(deci_degrees_power); + SC_DATA_CHECK_AND_PUBLISH(deci_degrees_disp); + SC_DATA_CHECK_AND_PUBLISH(load_power_lowest); + SC_DATA_CHECK_AND_PUBLISH(load_power_highest); + SC_DATA_CHECK_AND_PUBLISH(command_val); + SC_DATA_CHECK_AND_PUBLISH(act_cmd); + SC_DATA_CHECK_AND_PUBLISH(mode); + SC_DATA_CHECK_AND_PUBLISH(flags); + /* And re-publish flags separately */ + /* FIXME */ + + /* Copy new Scialys data to old one */ + memcpy(&sc->sc_data_old, &sc->sc_data, sizeof(struct scialys_data)); + + return ret; +} + + +int handle_module_data(struct sc_module* sc_mod, struct mqtt_info* mqtti, int pkt_type) +{ + char date_str[DATE_BUF_SIZE]; + time_t now = time(NULL); + + if (pkt_type != TYPE_DATA) { + ctime_r(&now, date_str); + date_str[strlen(date_str) - 1] = '\0'; /* Remove trailing \n */ + } + + switch (pkt_type) { + case TYPE_CONFIG: + printf("--- %s : ", date_str); + printf(" Updated config\n"); + scialys_config_send(mqtti, sc_mod); + break; + case TYPE_INFO: + printf("--- %s : ", date_str); + printf(" Received info : %s - %s\n", sc_mod->sc_info.module_version, sc_mod->sc_info.soft_version); + scialys_info_send(mqtti, sc_mod); + break; + case TYPE_DATA: + scialys_data_send(mqtti, sc_mod); + break; + } + return 0; +} + +int handle_udp_request(struct sc_module* scialys, char* buf, int len, struct sockaddr* addr, socklen_t addr_len) +{ + return 0; +} + +int mqtt_handle_rxpub_request(struct sc_module* scialys, struct mqtt_pub* publi) +{ + return 0; +} diff --git a/host/scialys_mqtt_bridge/handlers.h b/host/scialys_mqtt_bridge/handlers.h new file mode 100644 index 0000000..aaf2717 --- /dev/null +++ b/host/scialys_mqtt_bridge/handlers.h @@ -0,0 +1,42 @@ +/**************************************************************************** + * Host app for drzelec : Receive and decode data from Scialys module + * + * + * Copyright 2024 Nathael Pajani + * + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + ****************************************************************************/ + + +#ifndef HANDLERS_H +#define HANDLERS_H + +#include +#include +#include "mqtt_comm.h" + + +int handle_module_data(struct sc_module* scialys, struct mqtt_info* mqtti, int pkt_type); + + +int handle_udp_request(struct sc_module* scialys, char* buf, int len, struct sockaddr* addr, socklen_t addr_len); + + +int mqtt_handle_rxpub_request(struct sc_module* scialys, struct mqtt_pub* publi); + + + +#endif /* HANDLERS_H */ diff --git a/host/scialys_mqtt_bridge/list.h b/host/scialys_mqtt_bridge/list.h new file mode 100644 index 0000000..c584086 --- /dev/null +++ b/host/scialys_mqtt_bridge/list.h @@ -0,0 +1,231 @@ +/* + * list.h + * + * This code is from the linux kernel. It is a very simple and powerfull doubly + * linked list implementation. + * Not everything has been taken from the original file. + */ + +#ifndef _LINUX_LIST_H +#define _LINUX_LIST_H + + +/* linux/kernel.h */ + +/** + * container_of - cast a member of a structure out to the containing structure + * @ptr: the pointer to the member. + * @type: the type of the container struct this is embedded in. + * @member: the name of the member within the struct. + * + */ +#define container_of(ptr, type, member) ({ \ + const typeof( ((type *)0)->member ) *__mptr = (ptr); \ + (type *)( (char *)__mptr - offsetof(type,member) );}) + + + +/* linux/stddef.h */ + +#ifdef __compiler_offsetof +#define offsetof(TYPE,MEMBER) __compiler_offsetof(TYPE,MEMBER) +#else +#define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER) +#endif + + + +/* linux/list.h */ + +/* + * Simple doubly linked list implementation. + * + * Some of the internal functions ("__xxx") are useful when + * manipulating whole lists rather than single entries, as + * sometimes we already know the next/prev entries and we can + * generate better code by using them directly rather than + * using the generic single-entry routines. + */ + +struct list_head { + struct list_head *next, *prev; +}; + +/* + * These are non-NULL pointers that will result in page faults + * under normal circumstances, used to verify that nobody uses + * non-initialized list entries. + */ +#define LIST_POISON1 ((void *) 0x00100100) +#define LIST_POISON2 ((void *) 0x00200200) + + +#define LIST_HEAD_INIT(name) { &(name), &(name) } + +#define LIST_HEAD(name) \ + struct list_head name = LIST_HEAD_INIT(name) + +static inline void INIT_LIST_HEAD(struct list_head *list) +{ + list->next = list; + list->prev = list; +} + +/* + * Insert a new entry between two known consecutive entries. + * + * This is only for internal list manipulation where we know + * the prev/next entries already! + */ +static inline void __list_add(struct list_head *new, + struct list_head *prev, + struct list_head *next) +{ + next->prev = new; + new->next = next; + new->prev = prev; + prev->next = new; +} + +/** + * list_add - add a new entry + * @new: new entry to be added + * @head: list head to add it after + * + * Insert a new entry after the specified head. + * This is good for implementing stacks. + */ +static inline void list_add(struct list_head *new, struct list_head *head) +{ + __list_add(new, head, head->next); +} + + +/** + * list_add_tail - add a new entry + * @new: new entry to be added + * @head: list head to add it before + * + * Insert a new entry before the specified head. + * This is useful for implementing queues. + */ +static inline void list_add_tail(struct list_head *new, struct list_head *head) +{ + __list_add(new, head->prev, head); +} + +/* + * Delete a list entry by making the prev/next entries + * point to each other. + * + * This is only for internal list manipulation where we know + * the prev/next entries already! + */ +static inline void __list_del(struct list_head * prev, struct list_head * next) +{ + next->prev = prev; + prev->next = next; +} + +/** + * list_del - deletes entry from list. + * @entry: the element to delete from the list. + * Note: list_empty() on entry does not return true after this, the entry is + * in an undefined state. + */ +static inline void __list_del_entry(struct list_head *entry) +{ + __list_del(entry->prev, entry->next); +} + +static inline void list_del(struct list_head *entry) +{ + __list_del(entry->prev, entry->next); + entry->next = LIST_POISON1; + entry->prev = LIST_POISON2; +} + +/** + * list_is_last - tests whether @list is the last entry in list @head + * @list: the entry to test + * @head: the head of the list + */ +static inline int list_is_last(const struct list_head *list, + const struct list_head *head) +{ + return list->next == head; +} + +/** + * list_empty - tests whether a list is empty + * @head: the list to test. + */ +static inline int list_empty(const struct list_head *head) +{ + return head->next == head; +} + +/** + * list_entry - get the struct for this entry + * @ptr: the struct list_head pointer. + * @type: the type of the struct this is embedded in. + * @member: the name of the list_struct within the struct. + */ +#define list_entry(ptr, type, member) \ + container_of(ptr, type, member) + +/** + * list_first_entry - get the first element from a list + * @ptr: the list head to take the element from. + * @type: the type of the struct this is embedded in. + * @member: the name of the list_struct within the struct. + * + * Note, that list is expected to be not empty. + */ +#define list_first_entry(ptr, type, member) \ + list_entry((ptr)->next, type, member) + +/** + * list_last_entry - get the last element from a list + * @ptr: the list head to take the element from. + * @type: the type of the struct this is embedded in. + * @member: the name of the list_struct within the struct. + * + * Note, that list is expected to be not empty. + */ +#define list_last_entry(ptr, type, member) \ + list_entry((ptr)->prev, type, member) + +/** + * list_next_entry - get the next element in list + * @pos: the type * to cursor + * @member: the name of the list_struct within the struct. + */ +#define list_next_entry(pos, member) \ + list_entry((pos)->member.next, typeof(*(pos)), member) + +/** + * list_for_each_entry - iterate over list of given type + * @pos: the type * to use as a loop cursor. + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + */ +#define list_for_each_entry(pos, head, member) \ + for (pos = list_first_entry(head, typeof(*pos), member); \ + &pos->member != (head); \ + pos = list_next_entry(pos, member)) + +/** + * list_for_each_entry_safe - iterate over list of given type safe against removal of list entry + * @pos: the type * to use as a loop cursor. + * @n: another type * to use as temporary storage + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + */ +#define list_for_each_entry_safe(pos, n, head, member) \ + for (pos = list_first_entry(head, typeof(*pos), member), \ + n = list_next_entry(pos, member); \ + &pos->member != (head); \ + pos = n, n = list_next_entry(n, member)) + +#endif /* _LINUX_LIST_H */ diff --git a/host/scialys_mqtt_bridge/main.c b/host/scialys_mqtt_bridge/main.c new file mode 100644 index 0000000..4bce55f --- /dev/null +++ b/host/scialys_mqtt_bridge/main.c @@ -0,0 +1,330 @@ +/**************************************************************************** + * Host app for drzelec : + * Receive and decode data from Scialys module and send data to MQTT brocker + * + * + * Copyright 2024 Nathael Pajani + * + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY ; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + ****************************************************************************/ + + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "serial_utils.h" +#include "sock_utils.h" +#include "scialys_uart_comm.h" +#include "handlers.h" +#include "mqtt_comm.h" + +#undef FULL_DEBUG + +#define BUF_SIZE 1024 + +#define PROG_NAME "Scialys info to MQTT bridge" +#define VERSION "0.1" + + +void help(char *prog_name) +{ + fprintf(stderr, "---------------- "PROG_NAME" --------------------------------\n"); + fprintf(stderr, "Usage: %s [options]\n" \ + " Available options:\n" \ + " \t -s | --mqtt_server_ip : Connect to the specified MQTT server\n" \ + " \t -p | --mqtt_server_port : Use the specified MQTT server port instead of default (1883)\n" \ + " \t -l | --listen_port : Start listening for messages on this port\n" \ + " \t -d | --device : Serial device to use for serial communication with the module\n" \ + " \t -h | --help : Print this help\n" \ + " \t -v | --version : Print programm version\n", prog_name); + fprintf(stderr, "-----------------------------------------------------------------------\n"); +} + +/* Our only subscription : We want each and every command sent to us, so + * use wilcard to get them all. */ +struct mqtt_topic topics[] = { + { .name = "action/scialys/#", .QoS = 1, }, +}; + +const struct mqtt_sub_pkt mqtt_sub = { + .packet_id = 1, /* We send only one subscription packet, and it's always the first one. */ + .nb_topics = 1, + .topics = topics, +}; + +/* This is our static connect data */ +const struct mqtt_connect_pkt mqtt_conn = { + .keep_alive_seconds = 600, + .clean_session_flag = MQTT_SESSION_NEW, /* or MQTT_SESSION_RESUME */ + .client_id = "scialys", + .will_topic = { .name = "will/scialys/presence", .QoS = MQTT_QoS_0 }, + .will_retain = 0, + .will_msg = "Scialys MQTT Bridge Error", + .username = "home", + .password = "pass", +}; + + +int main(int argc, char* argv[]) +{ + /* Serial */ + char* device = NULL; + struct sc_module sc_mod; + /* MQTT server */ + struct mqtt_info mqtti = { + .conn = &mqtt_conn, + .sub = &mqtt_sub, + .server_ip = NULL, + .server_port = 1883, + .socket = -1, + .txbuf = NULL, + .rxbuf = NULL, + .rx_idx = 0, + .comm_state = MQTT_UNCONNECTED, + .curr_rx_payload_len = 0, + }; + /* TCP Server */ + int port = -1; + int server_socket = -1; + /* For select */ + fd_set read_fds; + int max_fd = 0; + + while(1) { + int option_index = 0; + int c = 0; + + struct option long_options[] = { + {"mqtt_server_ip", required_argument, 0, 's'}, + {"mqtt_server_port", required_argument, 0, 'p'}, + {"listen_port", required_argument, 0, 'l'}, + {"device", required_argument, 0, 'd'}, + {"help", no_argument, 0, 'h'}, + {"version", no_argument, 0, 'v'}, + {0, 0, 0, 0} + }; + + c = getopt_long(argc, argv, "s:p:l:d:hv", long_options, &option_index); + + /* no more options to parse */ + if (c == -1) break; + switch (c) { + /* s, MQTT server IP */ + case 's': + mqtti.server_ip = optarg; + break; + + /* p, mqtt server port */ + case 'p': + mqtti.server_port = strtoul(optarg, NULL, 0); + break; + + /* l, listen port */ + case 'l': + port = strtoul(optarg, NULL, 0); + break; + + /* d, device name */ + case 'd': + device = optarg; + break; + + /* v, version */ + case 'v': + printf("%s Version %s\n", PROG_NAME, VERSION); + return 0; + break; + + /* h, help */ + case 'h': + default: + help(argv[0]); + return 0; + } + } + + + /* Some parameters are mandatory */ + if ((mqtti.server_ip == NULL) || (port == -1) || (device == NULL)) { + printf("Error, need all of MQTT server IP, serial (tty) device and local IP port number\n"); + help(argv[0]); + return -1; + } + + /* Open tty */ + memset(&sc_mod, 0, sizeof(struct sc_module)); + sc_mod.fd = serial_setup(device); + if (sc_mod.fd < 0) { + printf("Unable to open specified serial port %s\n", device); + return -2; + } + + /* Create UDP server socket */ + server_socket = create_bound_udp_socket(port, NULL); + if (server_socket <= 0) { + printf("Unable to open the server UDP socket on port %d\n", port); + return -3; + } + + /* Connect to MQTT server */ + if (mqtt_init(&mqtti) != 0) { + printf("Unable to connect and initialise communication with MQTT server.\n"); + return -4; + } + INIT_LIST_HEAD(&(mqtti.pub_tx_list)); + INIT_LIST_HEAD(&(mqtti.pub_rx_list)); + INIT_LIST_HEAD(&(mqtti.sub_list)); + + /* ************************************************* */ + /* Initial FD set setup */ + FD_ZERO(&read_fds); + FD_SET(STDIN_FILENO, &read_fds); /* Add stdin */ + FD_SET(sc_mod.fd, &read_fds); /* Serial link */ + FD_SET(server_socket, &read_fds); /* New connexions */ + FD_SET(mqtti.socket, &read_fds); /* MQTT Server */ + max_fd = mqtti.socket + 1; /* No close, this one is the last open, so it's the higest */ + + + /* ************************************************* */ + /* And never stop handling data ! */ + while (1) { + int nb = 0, len = 0, ret = 0; + fd_set tmp_read_fds = read_fds; + char buf[BUF_SIZE]; + + /* select() call .... be kind to other processes */ + nb = select(max_fd, &tmp_read_fds, NULL, NULL, NULL); + /* Errors here are bad ones .. exit ?? */ + if (nb < 0) { + perror ("select failed"); + printf("Error: select failed, this is critical.\n"); + return -10; + } + + /* Data from Ethernet side on UDP server socket */ + if (FD_ISSET(server_socket, &tmp_read_fds)) { + struct sockaddr_in addr; + socklen_t addr_len = sizeof(struct sockaddr_in); + + /* Receive the new data */ + memset(buf, 0, BUF_SIZE); + len = recvfrom(server_socket, buf, BUF_SIZE, 0, (struct sockaddr *)&addr, &addr_len); + if (len > 0) { + handle_udp_request(&sc_mod, buf, len, (struct sockaddr *)&addr, addr_len); + } else { + /* Wait .. we received something but nothing came in ? */ + perror("UDP receive error"); + printf("\nError on UDP packet reception (ret: %d)\n", len); + } + } + + /* Data from MQTT socket */ + if (FD_ISSET(mqtti.socket, &tmp_read_fds)) { + int ret = mqtt_handle_data(&mqtti); + if (ret != 0) { + FD_CLR(mqtti.socket, &read_fds); + ret = mqtt_init(&mqtti); + if (ret != 0) { + printf("Unable to connect and initialise new communication with MQTT server.\n"); + break; /* Exit infinite loop */ + } + FD_SET(mqtti.socket, &read_fds); + } + } + + /* Read user input if any */ + if (FD_ISSET(STDIN_FILENO, &tmp_read_fds)) { + memset(buf, 0, BUF_SIZE); + len = read(STDIN_FILENO, buf, BUF_SIZE); + /* Do not know how to handle it yet, nothing defined. */ + } + + /* Handle module messages */ + if (FD_ISSET(sc_mod.fd, &tmp_read_fds)) { + int idx = 0; + memset(buf, 0, BUF_SIZE); + /* Get serial data and try to build a packet */ + len = read(sc_mod.fd, buf, BUF_SIZE); + if (len < 0) { + perror("serial read error"); + /* FIXME : handle errors */ + } else if (len == 0) { + /* EOF (End Of File) : close and wait for device to come back ... */ + printf("\nError : Serial link disapeared (EOF) ... Waiting for it to comme back ...\n"); + /* FIXME : Report error to MQTT brocker */ + FD_CLR(sc_mod.fd, &read_fds); + close(sc_mod.fd); + sc_mod.fd = -1; + while (sc_mod.fd < 0) { + sc_mod.fd = serial_setup(device); + usleep(2000); + } + FD_SET(sc_mod.fd, &read_fds); /* Serial link */ + } + while (idx < len) { + ret = protocol_decode(buf[idx], &sc_mod); + /* Check return code to know if we have a valid packet */ + if (ret == -1) { +#ifdef FULL_DEBUG + /* Anything that's not part of a packet is printed as is (debug output) */ + printf("%c", buf[idx]); +#endif + } else if (ret < 0) { + printf("\nError in received packet. (ret: %d)\n", ret); + /* FIXME : dump packet for debugging */ + } else if (ret == 0) { + /* Packet is being built */ + } else { + /* Valid packet received */ + handle_module_data(&sc_mod, &mqtti, ret); + } + idx++; + } + } + + /* Handle MQTT Publish packets */ + if (! list_empty(&(mqtti.pub_rx_list))) { + struct mqtt_pub* publi = NULL, * tmp_publi = NULL; + list_for_each_entry_safe(publi, tmp_publi, &mqtti.pub_rx_list, list) { + /* Handle the packet */ + printf("Received MQTT publish for topic %s\n", publi->pkt.topic.name); + /* FIXME : check that topic is for us */ + /* FIXME : Handle the packet (send the right packet to sc_mod) */ + /* We are done with the packet, acknowledge and remove it from list */ + mqtt_ack_publish(&mqtti, publi); + } + } + + } /* End of infinite loop */ + + close(sc_mod.fd); + close(server_socket); + return 0; +} + + diff --git a/host/scialys_mqtt_bridge/mqtt.c b/host/scialys_mqtt_bridge/mqtt.c new file mode 100644 index 0000000..86f037a --- /dev/null +++ b/host/scialys_mqtt_bridge/mqtt.c @@ -0,0 +1,616 @@ +/**************************************************************************** + * Host app for drzportail + * + * + * Copyright 2020 Nathael Pajani + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *************************************************************************** */ + +#include +#include +#include +#include + +#include "mqtt.h" + +/* + * MQTT client implementation - helpers for cimmunication part + * + * For protocol defiition, refer to + * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html + * + * This implementation has a limitation on the subscription mechanism : the protocol allows for + * multiple subscriptions / unsubscriptions using a single subscribe packet, which is not supported + * by this code for code simplicity, memory usage limitation, and packets size limitation. + * All other features of the protocol should be fully supported. + * + * This code is the implementation of the packets part of MQTT communication. + * The MQTT protocol requires a lossless, ordered transport protocol layer with an address mechanism + * which is not part of the code found in mqtt.c and mqtt.h, making it possible to use any underlying + * communication layer which fulfills the above requirements. + * + * This code only handles the packet encoding and decoding according to the MQTT specification and + * provides usage information in order to respect the protocol flow, but does not provide the communication + * parts of the protocol, which should be application specific. + * + */ + + + +/***************************************************************************** */ +/* MQTT helper functions */ + +/* Encode remaining length + * Remaining length is encoded on 1 to four bytes. The least significant seven bits of each byte encode the + * data, and the most significant bit is used to indicate that there are following bytes in the representation. + * The size is encoded in little endianness (least significant 7 bits on first byte and so on). + * Refer to section 2.2.3 of mqtt-v3.1.1-os for translation of remaining_length on wire. + */ +static int encode_remaining_length(uint8_t* buf, uint32_t length) +{ + int idx = 0; + + do { + buf[idx] = length & 0x7F; + length >>= 7; + if (length > 0) { + buf[idx] |= 0x80; + } + idx++; + } while (length > 0); + + return idx; +} +/* Decode remaining length. + * Return the number of length bytes. length value is updated. + */ +int decode_remaining_length(uint8_t* buf, uint32_t* length) +{ + int idx = 0; + + do { + *length += (buf[idx] & 0x7F) << (7 * idx); + } while ((buf[idx++] & 0x80) && (idx <= 3)); + + return idx; +} + + +/* Pack a string to MQTT strings format. + * The string must already be UTF8 encoded. + */ +static int mqtt_pack_str(uint8_t *buf, char* str, uint16_t len) +{ + uint16_t tmp = htons(len); + memcpy(buf, &tmp, 2); + memcpy((buf + 2), str, len); + return len + 2; +} + +/* Pack fixed header */ +static int mqtt_pack_fixed_header(uint8_t *buf, uint8_t control, uint8_t flags, uint32_t length) +{ + *buf = (control << 4) | flags; + return 1 + encode_remaining_length((buf + 1), length); +} + + + +/***************************************************************************** */ +/* Create MQTT connect packet + * This function must be called in order to create the connect MQTT packet used to + * connect to the server. + */ +int mqtt_pack_connect_packet(const struct mqtt_connect_pkt* pkt, uint8_t* buf, uint32_t buf_size) +{ + struct mqtt_connect_pkt_fixed_payload fxpl; + uint32_t remaining_length = 0; + uint32_t len = 0; + uint16_t client_id_len = 0, will_topic_len = 0, will_msg_size = 0; + uint16_t username_len = 0, password_len = 0; + + if ((pkt == NULL) || (buf == NULL)) { + return -EINVAL; + } + /* Fixed payload part */ + memset(&fxpl, 0, sizeof(struct mqtt_connect_pkt_fixed_payload)); + fxpl.proto_name_len = htons(4); + memcpy(&(fxpl.protocol_name), MQTT_PROTOCOL_NAME, 4); + fxpl.protocol_level = MQTT_PROTOCOL_LEVEL; + fxpl.keep_alive_seconds = htons(pkt->keep_alive_seconds); + remaining_length = sizeof(struct mqtt_connect_pkt_fixed_payload); + + /* Client id is (almost) mandatory */ + if (pkt->client_id != NULL) { + client_id_len = strlen(pkt->client_id); + /* Update packet payload size */ + remaining_length += client_id_len + 2; + } else { + return -EINVAL; + } + + /* Set connect flags and compute message size for optionnal parts */ + if (pkt->clean_session_flag) { + fxpl.connect_flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; + } + /* Optionnal will message */ + if ((pkt->will_topic.name != NULL) && (pkt->will_msg != NULL)) { + fxpl.connect_flags |= MQTT_CONNECT_FLAG_WILL_FLAG; + fxpl.connect_flags |= MQTT_CONNECT_FLAG_WILL_QoS(pkt->will_topic.QoS); + if (pkt->will_retain != 0) { + fxpl.connect_flags |= MQTT_CONNECT_FLAG_WILL_RETAIN; + } + will_topic_len = strlen(pkt->will_topic.name); + will_msg_size = strlen(pkt->will_msg); + /* Update packet payload size */ + remaining_length += will_topic_len + 2; + remaining_length += will_msg_size + 2; + } + /* Optionnal username and password */ + if (pkt->username != NULL) { + username_len = strlen(pkt->username); + fxpl.connect_flags |= MQTT_CONNECT_FLAG_USER_NAME; + /* Update packet payload size */ + remaining_length += username_len + 2; + if (pkt->password != NULL) { + password_len = strlen(pkt->password); + fxpl.connect_flags |= MQTT_CONNECT_FLAG_PASSWORD; + /* Update packet payload size */ + remaining_length += password_len + 2; + } + } + + /* Build MQTT Connect packet */ + /* Fixed header */ + len = mqtt_pack_fixed_header(buf, MQTT_CONTROL_CONNECT, 0, remaining_length); + if (remaining_length + len > buf_size) { + return -E2BIG; + } + /* Fixed payload */ + memcpy((buf + len), &fxpl, sizeof(struct mqtt_connect_pkt_fixed_payload)); + len += sizeof(struct mqtt_connect_pkt_fixed_payload); + + /* Client ID is mandatory, even if length of ID string is 0 */ + len += mqtt_pack_str((buf + len), pkt->client_id, client_id_len); + + /* Add optionnal Will message - NULL pointers checks already made above */ + if (will_msg_size != 0) { + uint16_t tmp = htons(will_msg_size); + len += mqtt_pack_str((buf + len), pkt->will_topic.name, will_topic_len); + memcpy((buf + len), &tmp, 2); + memcpy((buf + len + 2), pkt->will_msg, will_msg_size); + len += will_msg_size + 2; + } + /* Add optionnal username and password */ + if (pkt->username != NULL) { + len += mqtt_pack_str((buf + len), pkt->username, username_len); + /* Add password too ? */ + if (pkt->password != NULL) { + len += mqtt_pack_str((buf + len), pkt->password, password_len); + } + } + + return len; +} + + +/***************************************************************************** */ +/* Check MQTT connack packet + * This function may get called to check a supposed connect acknowledge packet. + * The function checks for conformance to the MQTT protocol and returns 0 if the packet is valid, + * regardless of the retrun code value. + */ +int mqtt_check_connack_reply_pkt(const struct mqtt_connack_reply_pkt* pkt) +{ + if (pkt == NULL) { + return -EINVAL; + } + if (pkt->control != (MQTT_CONTROL_CONNACK << 4)) { + return -EPROTO; + } + if (pkt->rem_len != 2) { + return -EPROTO; + } + if (pkt->flags & 0xFE) { + return -EPROTO; + } + if (pkt->ret_code >= MQTT_CONNACK_MAX) { + return -EPROTO; + } + if (pkt->ret_code != 0) { + return -EREMOTEIO; + } + return 0; +} + + + +/***************************************************************************** */ +/* Create MQTT publish packet + * This function must be called in order to create a publish MQTT packet used to + * publish data on a topic (send data to the server). + */ +int mqtt_pack_publish_packet(const struct mqtt_publish_pkt* pkt, uint8_t* buf, uint32_t buf_size) +{ + uint32_t remaining_length = 0; + uint32_t len = 0; + uint16_t topic_len = 0; + uint16_t tmp_packet_id = 0; + uint8_t publish_flags = 0; + + if ((pkt == NULL) || (buf == NULL)) { + return -EINVAL; + } + /* Check packet consistency */ + if ((pkt->message_size != 0) && (pkt->application_message == NULL)) { + return -ENODATA; + } + /* Compute message size + * There is no "message size" field in the publish packet, as the message size can be computed by + * substracting the topic string size and packet id size from the packet "remaining length" field. + */ + remaining_length = pkt->message_size; + /* Packet ID is 2 bytes long. If QoS is 0, then the packet must not include a packet ID */ + if (pkt->topic.QoS != 0) { + remaining_length += 2; + } + /* Topic is mandatory */ + if (pkt->topic.name == NULL) { + return -EINVAL; + } + topic_len = strlen(pkt->topic.name); + /* Update packet payload size */ + remaining_length += topic_len + 2; + + /* Set publish flags */ + publish_flags = MQTT_PUBLISH_QoS(pkt->topic.QoS); + if (pkt->dup_flag) { + publish_flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; + } + if (pkt->retain_flag) { + publish_flags |= MQTT_PUBLISH_RETAIN; + } + + /* Build MQTT Publish packet */ + /* Fixed header */ + len = mqtt_pack_fixed_header(buf, MQTT_CONTROL_PUBLISH, publish_flags, remaining_length); + if (remaining_length + len > buf_size) { + return -E2BIG; + } + /* Topic is mandatory */ + len += mqtt_pack_str((buf + len), pkt->topic.name, topic_len); + /* Packet ID */ + if (pkt->topic.QoS != 0) { + tmp_packet_id = htons(pkt->packet_id); + memcpy((buf + len), &tmp_packet_id, 2); + len += 2; + } + /* Add optionnal application message */ + if (pkt->message_size != 0) { + memcpy((buf + len), pkt->application_message, pkt->message_size); + len += pkt->message_size; + } + + return len; +} + + +/***************************************************************************** */ +/* Unpack MQTT publish packet + * This function must be called in order to transform a received publish MQTT packet to a + * mqtt_publish_pkt structure. + * The function also checks the validity of the packet. + * All returned pointers within the struct will point to parts of the provided buffer, so the buffer + * must not be discarded after the call. + * if the return value is positive, it is the topic string length. + */ +int mqtt_unpack_publish_packet(struct mqtt_publish_pkt* pkt, uint8_t* buf, uint32_t size) +{ + int ret = 0; + uint32_t idx = 0, pkt_len = 0; + uint16_t tmp = 0, topic_len = 0; + if ((pkt == NULL) || (buf == NULL)) { + return -EINVAL; + } + /* Check type */ + if ((buf[0] & 0xF0) != (MQTT_CONTROL_PUBLISH << 4)) { + return -EPROTO; + } + /* Check packet size */ + ret = decode_remaining_length((buf + 1), &pkt_len); + if ((ret > 4) || ((ret + pkt_len + 1) != size)) { + return -EPROTO; + } + idx = 1 + ret; + /* Decode flags */ + pkt->topic.QoS = (buf[0] >> 1) & 0x03; + pkt->dup_flag = (buf[0] & MQTT_PUBLISH_DUP); + pkt->retain_flag = (buf[0] & MQTT_PUBLISH_RETAIN); + if (pkt->topic.QoS > 2) { + return -EPROTO; + } + /* Decode topic string */ + memcpy(&tmp, (buf + idx), 2); + topic_len = ntohs(tmp); + pkt->topic.name = (char*)(buf + idx + 2); + idx += topic_len + 2; + /* Does it fit in the remaining length ? */ + tmp = idx; + if (pkt->topic.QoS != 0) { + tmp += 2; + } + if (tmp > size) { + return -EPROTO; + } + /* Decode packet ID */ + if (pkt->topic.QoS != 0) { + memcpy(&tmp, (buf + idx), 2); + pkt->packet_id = ntohs(tmp); + idx += 2; + } + /* Get application message */ + pkt->message_size = size - idx; + if (pkt->message_size != 0) { + pkt->application_message = (buf + idx); + } + return topic_len; +} + +/***************************************************************************** */ +/* Build MQTT puback, pubrec, pubrel or pubcomp packet, used in the publish acknowledge one-way or + * two-way hand-check mechanism. + */ +int mqtt_pack_publish_reply_pkt(uint8_t* buf, uint16_t acked_pkt_id, uint8_t type) +{ + uint16_t tmp_acked_pkt_id = 0; + if (buf == NULL) { + return -EINVAL; + } + buf[0] = (type << 4); + if (type == MQTT_CONTROL_PUBREL) { + buf[0] |= MQTT_PUBREL_FLAG; + } + buf[1] = 0x02; + tmp_acked_pkt_id = htons(acked_pkt_id); + memcpy((buf + 2), &tmp_acked_pkt_id, 2); + + return 4; +} + +/***************************************************************************** */ +/* Check MQTT puback, pubrec, pubrel or pubcomp packet + * This function may get called to check a supposed publish acknowledge packet in either one-way or + * two-way hand-check mechanism. + */ +int mqtt_check_publish_reply_pkt(struct mqtt_publish_reply_pkt* pkt, uint8_t type) +{ + uint8_t awaited_control = 0; + + if (pkt == NULL) { + return -EINVAL; + } + /* Check control byte */ + awaited_control = (type << 4); + /* The pubrel packet must also have a "pubrel" flag ... the protocol provides no indication about the + * reason behind this */ + if (type == MQTT_CONTROL_PUBREL) { + awaited_control |= MQTT_PUBREL_FLAG; + } + if (pkt->control != awaited_control) { + return -EPROTO; + } + /* Check size */ + if (pkt->rem_len != 2) { + return -EPROTO; + } + /* Valid packet */ + return 0; +} + + +/* Subscribe and unsubscribe packets only differ by the addition of the QoS byte after each topic. */ +static int mqtt_pack_sub_unsub(const struct mqtt_sub_pkt* pkt, + uint8_t* buf, uint32_t buf_size, uint8_t type) +{ + uint32_t remaining_length = 0; + uint32_t len = 0, i = 0; + uint16_t topic_len = 0; + uint16_t tmp_packet_id = 0; + + if ((pkt == NULL) || (buf == NULL)) { + return -EINVAL; + } + if ((type != MQTT_CONTROL_SUBSCRIBE) && (type != MQTT_CONTROL_UNSUBSCRIBE)) { + return -EINVAL; + } + /* Check packet consistency */ + if ((pkt->nb_topics == 0) || (pkt->topics == NULL)) { + return -ENODATA; + } + /* Limit the number of topics to 125 in orer to keep the variable length field on one byte in + * the subscribe acknowledge packet. */ + if (pkt->nb_topics > 125) { + return -EINVAL; + } + /* Compute message size + * Packet ID is 2 bytes long. */ + remaining_length = 2; + for (i = 0; i < pkt->nb_topics; i++) { + if (pkt->topics[i].name == NULL) { + return -EINVAL; + } + topic_len = strlen(pkt->topics[i].name); + /* Update packet payload size. */ + remaining_length += topic_len + 2; + if (type == MQTT_CONTROL_SUBSCRIBE) { + /* Add one for the associated QoS for each topic */ + remaining_length += 1; + } + } + + /* Build MQTT Subscribe or Unsubscribe packet */ + /* Fixed header */ + len = mqtt_pack_fixed_header(buf, type, MQTT_SUBSCRIBE_FLAG, remaining_length); + if (remaining_length + len > buf_size) { + return -E2BIG; + } + /* Packet ID */ + tmp_packet_id = htons(pkt->packet_id); + memcpy((buf + len), &tmp_packet_id, 2); + len += 2; + /* Topic(s) */ + for (i = 0; i < pkt->nb_topics; i++) { + topic_len = strlen(pkt->topics[i].name); + len += mqtt_pack_str((buf + len), pkt->topics[i].name, topic_len); + if (type == MQTT_CONTROL_SUBSCRIBE) { + /* Add the associated QoS */ + buf[len++] = pkt->topics[i].QoS & 0x03; + } + } + + return len; +} + +/***************************************************************************** */ +/* Build MQTT subscribe packet + * This function must be called in order to create a subscribe MQTT packet used to + * subscibe on a topic (or multiple topics) in order to receive data published on this + * or these topics. + * We limit the number of subscriptions sent at once to 125 in order to get a fixed size + * subscription acknoledgement packet. + */ +int mqtt_pack_subscribe_pkt(const struct mqtt_sub_pkt* pkt, uint8_t* buf, uint32_t buf_size) +{ + return mqtt_pack_sub_unsub(pkt, buf, buf_size, MQTT_CONTROL_SUBSCRIBE); +} + +/***************************************************************************** */ +/* Build MQTT unsubscribe packet + * This function must be called in order to create an unsubscribe MQTT packet used to unsubscibe + * from a topic (or multiple topics) in order to stop receiving data published on this or these + * topics. + */ +int mqtt_pack_unsubscribe_pkt(const struct mqtt_sub_pkt* pkt, uint8_t* buf, uint32_t buf_size) +{ + return mqtt_pack_sub_unsub(pkt, buf, buf_size, MQTT_CONTROL_UNSUBSCRIBE); +} + + +/***************************************************************************** */ +/* Check MQTT suback packet + * This function may get called to check a supposed subscribe acknowledge packet. + * The function checks for conformance to the MQTT protocol and returns 0 if the packet is valid, + * regardless of the return codes received. + * len must be the length of the full packet received, which includes the mqtt_subscribe_reply_pkt + * structure and all the return codes received. + */ +int mqtt_check_suback_reply_pkt(const struct mqtt_sub_reply_pkt* pkt, uint8_t len) +{ + int i = 0; + uint8_t* ret_codes = NULL; + if (pkt == NULL) { + return -EINVAL; + } + if (pkt->control != (MQTT_CONTROL_SUBACK << 4)) { + return -EPROTO; + } + if ((pkt->rem_len > 127) || (pkt->rem_len != (len - 2))) { + return -EPROTO; + } + ret_codes = (uint8_t*)pkt + 4; + for (i = 0; i < (pkt->rem_len - 2); i++) { + if ((ret_codes[i] != MQTT_SUBSCRIBE_FAILURE) && (ret_codes[i] > MQTT_QoS_2)) { + return -EREMOTEIO; + } + } + /* Valid packet */ + return 0; +} + + +/***************************************************************************** */ +/* Check MQTT unsuback packet + * This function may get called to check a supposed unsubscribe acknowledge packet. + * The function checks for conformance to the MQTT protocol and returns 0 if the packet is valid. + */ +int mqtt_check_unsuback_reply_pkt(const struct mqtt_sub_reply_pkt* pkt) +{ + if (pkt == NULL) { + return -EINVAL; + } + if (pkt->control != (MQTT_CONTROL_UNSUBACK << 4)) { + return -EPROTO; + } + if (pkt->rem_len != 2) { + return -EPROTO; + } + /* Valid packet */ + return 0; +} + + +/***************************************************************************** */ +/* Build MQTT ping packet + * This one is a fixed packet, easy. + */ +int mqtt_pack_ping_pkt(uint8_t* buf) +{ + struct mqtt_ping_pkt pkt = { + .control = (MQTT_CONTROL_PINGREQ << 4), + .rem_len = 0, + }; + if (buf == NULL) { + return -EINVAL; + } + memcpy(buf, &pkt, sizeof(struct mqtt_ping_pkt)); + return sizeof(struct mqtt_ping_pkt); +} + + +/***************************************************************************** */ +/* Check MQTT ping reply packet */ +int mqtt_check_ping_reply_pkt(const struct mqtt_ping_pkt* pkt) +{ + if (pkt == NULL) { + return -EINVAL; + } + if (pkt->control != (MQTT_CONTROL_PINGRESP << 4)) { + return -EPROTO; + } + if (pkt->rem_len != 0) { + return -EPROTO; + } + return 0; +} + + +/***************************************************************************** */ +/* Build MQTT disconnect packet + * This one is a fixed packet, easy. + */ +int mqtt_pack_disconnect_pkt(uint8_t* buf) +{ + struct mqtt_disconnect_pkt pkt = { + .control = (MQTT_CONTROL_DISCONNECT << 4), + .rem_len = 0, + }; + if (buf == NULL) { + return -EINVAL; + } + memcpy(buf, &pkt, sizeof(struct mqtt_disconnect_pkt)); + return sizeof(struct mqtt_disconnect_pkt); +} + diff --git a/host/scialys_mqtt_bridge/mqtt.h b/host/scialys_mqtt_bridge/mqtt.h new file mode 100644 index 0000000..26f80fa --- /dev/null +++ b/host/scialys_mqtt_bridge/mqtt.h @@ -0,0 +1,337 @@ +/**************************************************************************** + * Host app for drzportail + * + * + * Copyright 2020 Nathael Pajani + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *************************************************************************** */ + +/* + * MQTT client implementation. + * + * For protocol defiition, refer to + * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html + * + * This code is the implementation of the packets part of MQTT communication. + * The MQTT protocol requires a lossless, ordered transport protocol layer with an address mechanism + * which is not part of the code found in mqtt.c and mqtt.h, making it possible to use any underlying + * communication layer which fulfills the above requirements. + * + * This code only handles the packet encoding and decoding according to the MQTT specification and + * provides usage information in order to respect the protocol flow, but does not provide the communication + * parts of the protocol, which should be application specific. + * This lets the application designer choose between any of the possible MQTT flow available, from single + * "in-flight" message window to a single server to multiple servers and complex message queues. + * + */ + +#ifndef MQTT_H +#define MQTT_H + +/***************************************************************************** */ + + +/* Protocol identifiers for MQTT v3.1.1. */ +#define MQTT_PROTOCOL_LEVEL 0x04 +#define MQTT_PROTOCOL_NAME "MQTT" + + +/* MQTT control packet types. */ +enum MQTT_message_types { + MQTT_CONTROL_CONNECT = 1, + MQTT_CONTROL_CONNACK, + MQTT_CONTROL_PUBLISH, + MQTT_CONTROL_PUBACK, + MQTT_CONTROL_PUBREC, + MQTT_CONTROL_PUBREL, + MQTT_CONTROL_PUBCOMP, + MQTT_CONTROL_SUBSCRIBE, + MQTT_CONTROL_SUBACK, + MQTT_CONTROL_UNSUBSCRIBE, + MQTT_CONTROL_UNSUBACK, + MQTT_CONTROL_PINGREQ, + MQTT_CONTROL_PINGRESP, + MQTT_CONTROL_DISCONNECT +}; + +/* MQTT QoS levels */ +#define MQTT_QoS_0 (0) +#define MQTT_QoS_1 (1) +#define MQTT_QoS_2 (2) + + +struct mqtt_topic { + char* name; + uint8_t QoS; /* 0, 1 or 2 */ +}; + + +/* Decode remaining length. + * Return the number of length bytes. length value is updated. + */ +int decode_remaining_length(uint8_t* buf, uint32_t* length); + +/***************************************************************************** */ +/* Connect and Connack packets */ + +/* CONNECT packet flags */ +#define MQTT_CONNECT_FLAG_CLEAN_SESSION (0x01 << 1) +#define MQTT_CONNECT_FLAG_WILL_FLAG (0x01 << 2) +#define MQTT_CONNECT_FLAG_WILL_QoS(x) (((x) & 0x03) << 3) +#define MQTT_CONNECT_FLAG_WILL_RETAIN (0x01 << 5) +#define MQTT_CONNECT_FLAG_PASSWORD (0x01 << 6) +#define MQTT_CONNECT_FLAG_USER_NAME (0x01 << 7) + +struct mqtt_connect_pkt_fixed_payload { + uint16_t proto_name_len; /* network endian */ + uint8_t protocol_name[4]; /* "MQTT" : use MQTT_PROTOCOL_NAME */ + uint8_t protocol_level; /* use MQTT_PROTOCOL_LEVEL */ + uint8_t connect_flags; + uint16_t keep_alive_seconds; /* network endian */ +}; + +#define MQTT_SESSION_RESUME 0x00 +#define MQTT_SESSION_NEW 0x01 +struct mqtt_connect_pkt { + uint16_t keep_alive_seconds; + uint8_t clean_session_flag; /* Either MQTT_SESSION_RESUME or MQTT_SESSION_NEW */ + char* client_id; + struct mqtt_topic will_topic; + uint8_t will_retain; /* 0 or 1 */ + char* will_msg; + char* username; + char* password; +}; + +/* Create MQTT connect packet + * This function must be called in order to create the connect MQTT packet used to connect to the + * server. + * The client must send a connect packet whenever the server does not acknoledge a publish, + * subscribe, unsubscribe, or ping packet, or when the server explicitely closes the connection. + * + * Caller must provide a buffer "buf" big enougth to receive the whole packet and indicate it's size + * in "buf_size". + * Returns the used buffer size (packet size) on success. + * Return value is negative on error : + * -EINVAL if either buf or pkt is NULL, + * -EINVAL if client_id string provided in mqtt_connect_pkt struct is NULL (it's length may be 0, + * but pointer cannot bu NULL). + * -E2BIG if buffer is not big enough for packet. + */ +int mqtt_pack_connect_packet(const struct mqtt_connect_pkt* pkt, uint8_t* buf, uint32_t buf_size); + + +/* Connack return codes returned in a CONNACK packet */ +enum MQTT_connack_return_codes { + MQTT_CONNACK_ACCEPTED = 0, + MQTT_CONNACK_REFUSED_PROTOCOL_VERSION = 1, + MQTT_CONNACK_REFUSED_IDENTIFIER_REJECTED = 2, + MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE = 3, + MQTT_CONNACK_REFUSED_BAD_USER_NAME_OR_PASSWORD = 4, + MQTT_CONNACK_REFUSED_NOT_AUTHORIZED = 5, + MQTT_CONNACK_MAX, +}; + +/* MQTT connack packet */ +struct mqtt_connack_reply_pkt { + uint8_t control; /* Paquet type : must be (MQTT_CONTROL_CONNACK << 4) */ + uint8_t rem_len; /* Remaining length : must be 0x02 */ + uint8_t flags; /* Connect acknowledge flags */ + uint8_t ret_code; /* Connect return code */ +} __attribute__ ((__packed__)); + +/* Connect acknowledge flags bit 0 set to 1 only if connect with Clean session flag was set to 0 and + * the server accepts the connection and has a stored session for this client id */ +#define MQTT_CONNACK_SESSION_PRESENT (0x01 << 0) + +/* Check MQTT connack packet + * This function may get called to check a supposed connect acknowledge packet. + * The function checks for conformance to the MQTT protocol and returns 0 if the packet is valid and + * the server accepted the connection. + * The function returns -EINVAL if pkt is NULL, -EPROTO in case of invalid connack packet or + * -EREMOTEIO in case of connection refused by the server (ret_code is then valid in the packet). + */ +int mqtt_check_connack_reply_pkt(const struct mqtt_connack_reply_pkt* pkt); + +/***************************************************************************** */ +/* publish and puback packets */ + +#define MQTT_PUBLISH_DUP (0x01 << 3) +#define MQTT_PUBLISH_QoS(x) (((x) & 0x03) << 1) +#define MQTT_PUBLISH_RETAIN (0x01 << 0) + +/* MQTT publish paquet + * A publish control packet is sent from a Client to a Server or from Server to a Client to transport + * an Application Message. */ +struct mqtt_publish_pkt { + struct mqtt_topic topic; + uint16_t packet_id; /* Packet identifier is required for publish if QoS > 0 */ + uint8_t dup_flag; /* Accept 1 or MQTT_PUBLISH_DUP as flag set */ + uint8_t retain_flag; + uint32_t message_size; + uint8_t* application_message; +}; + +/* Create MQTT publish packet + * This function must be called in order to create a publish MQTT packet used to publish data on a + * topic (send data to the server). + * The application message size can be 0, in which case the application_message pointer in the + * mqtt_publish_pkt struct may be NULL + * + * Caller must provide a buffer "buf" big enougth to receive the whole packet and indicate it's size + * in "buf_size". + * Returns the used buffer size (packet size) on success. + * Return value is negative on error : + * -EINVAL if either buf or pkt is NULL, + * -EINVAL if client_id string provided in mqtt_connect_pkt struct is NULL (its length may be 0, + * but pointer cannot be NULL). + * -E2BIG if buffer is not big enough for packet. + */ +int mqtt_pack_publish_packet(const struct mqtt_publish_pkt* pkt, uint8_t* buf, uint32_t buf_size); + + +/* Unpack MQTT publish packet + * This function must be called in order to transform a received publish MQTT packet to a + * mqtt_publish_pkt structure. + * The function also checks the validity of the packet. + * All returned pointers within the struct will point to parts of the provided buffer, so the buffer + * must not be discarded after the call. + * if the return value is positive, it is the topic string length. + */ +int mqtt_unpack_publish_packet(struct mqtt_publish_pkt* pkt, uint8_t* buf, uint32_t size); + + +#define MQTT_PUBREL_FLAG (0x01 << 1) + +/* MQTT publish replies packet, used for puback, pubrec, pubrel and pubcomp */ +/* control paquet type must be either (MQTT_CONTROL_PUBACK << 4) if QoS = 1 (no further reply required) + * or (MQTT_CONTROL_PUBREC << 4) if QoS = 2 and then a publish release must be received or sent + * (MQTT_CONTROL_PUBREL << 4), answered by a publish complete packet (MQTT_CONTROL_PUBCOMP << 4) */ +struct mqtt_publish_reply_pkt { + uint8_t control; /* Packet type */ + uint8_t rem_len; /* Remaining length : must be 0x02 */ + uint16_t acked_pkt_id; /* Id of packet that is being acknowledged, in network endianness */ +} __attribute__ ((__packed__)); + +/* Build MQTT puback, pubrec, pubrel or pubcomp packet, used in the publish acknowledge one-way or + * two-way hand-check mechanism. + * The provided buf must be big enough to hold 4 bytes. + * Note that buf may safely be cast to or from a mqtt_publish_reply_pkt struct pointer. + * type is one of MQTT_CONTROL_PUBACK, MQTT_CONTROL_PUBREC, MQTT_CONTROL_PUBREL or MQTT_CONTROL_PUBCOMP. + * acked_pkt_id is the ID of the concerned publish packet, in host endianness. + * Returns the used buffer size (4 bytes) on success, or -EINVAL in case of a NULL buf pointer. + */ +int mqtt_pack_publish_reply_pkt(uint8_t* buf, uint16_t acked_pkt_id, uint8_t type); + +/* Check MQTT puback, pubrec, pubrel or pubcomp packet + * This function may get called to check a supposed publish acknowledge packet in either one-way or + * two-way hand-check mechanism. + * type is either MQTT_CONTROL_PUBACK, MQTT_CONTROL_PUBREC, MQTT_CONTROL_PUBREL or MQTT_CONTROL_PUBCOMP. + * The function checks for conformance to the MQTT protocol and returns 0 if the packet is valid. + * The function returns -EPROTO in case of protocol error or -EINVAL in case of a NULL pkt pointer. + */ +int mqtt_check_publish_reply_pkt(struct mqtt_publish_reply_pkt* pkt, uint8_t type); + + + +/***************************************************************************** */ +/* subsribe, unsubsribe, suback and unsuback packets */ + +#define MQTT_SUBSCRIBE_FLAG (0x01 << 1) +#define MQTT_UNSUBSCRIBE_FLAG MQTT_SUBSCRIBE_FLAG + +/* MQTT subscribe or unsubsribe packet */ +struct mqtt_sub_pkt { + uint16_t packet_id; /* Packet identifier */ + uint8_t nb_topics; /* Number of topics in the topics table. Limited to 125 */ + struct mqtt_topic* topics; /* Table of topics */ +}; + +/* Build MQTT subscribe packet + * This function must be called in order to create a subscribe MQTT packet used to + * subscibe on a topic (or multiple topics) in order to receive data published on this + * or these topics. + * We limit the number of subscriptions sent at once to 125 in order to get a fixed size + * subscription acknoledgement packet. + */ +int mqtt_pack_subscribe_pkt(const struct mqtt_sub_pkt* pkt, uint8_t* buf, uint32_t buf_size); + +/* Build MQTT unsubscribe packet + * This function must be called in order to create an unsubscribe MQTT packet used to unsubscibe + * from a topic (or multiple topics) in order to stop receiving data published on this or these + * topics. + */ +int mqtt_pack_unsubscribe_pkt(const struct mqtt_sub_pkt* pkt, uint8_t* buf, uint32_t buf_size); + + +/* MQTT subscribe or unsubscribe reply packet */ +struct mqtt_sub_reply_pkt { + uint8_t control; /* Packet type */ + uint8_t rem_len; /* Remaining length : this is valid for up to 125 subscriptions sent at once ... */ + uint16_t acked_pkt_id; /* Id of packet that is being acknowledged, in network endianness */ +} __attribute__ ((__packed__)); + +#define MQTT_SUBSCRIBE_FAILURE (0x80) + +/* Check MQTT suback packet + * This function may get called to check a supposed subscribe acknowledge packet. + * The function checks for conformance to the MQTT protocol and returns 0 if the packet is valid, + * regardless of the return codes received. + * len must be the length of the full packet received, which includes the mqtt_subscribe_reply_pkt + * structure and all the return codes received. + */ +int mqtt_check_suback_reply_pkt(const struct mqtt_sub_reply_pkt* pkt, uint8_t len); + +/* Check MQTT unsuback packet + * This function may get called to check a supposed unsubscribe acknowledge packet. + * The function checks for conformance to the MQTT protocol and returns 0 if the packet is valid. + */ +int mqtt_check_unsuback_reply_pkt(const struct mqtt_sub_reply_pkt* pkt); + + + + +/***************************************************************************** */ +/* MQTT ping request and reply packet */ +struct mqtt_ping_pkt { + uint8_t control; /* Packet type : either (MQTT_CONTROL_PINGREQ << 4) or (MQTT_CONTROL_PINGRESP << 4) */ + uint8_t rem_len; /* Remaining length : must be 0x00 */ +} __attribute__ ((__packed__)); + +/* Build MQTT ping packet + * This one is a fixed packet, easy. + */ +int mqtt_pack_ping_pkt(uint8_t* buf); + +/* Check MQTT ping reply packet */ +int mqtt_check_ping_reply_pkt(const struct mqtt_ping_pkt* pkt); + + + +/***************************************************************************** */ +/* MQTT disconnect */ +/* This one may not be very useful in most applications, but it's defined by the MQTT standard and + * including the definition here takes no room in either code or data ... + */ +struct mqtt_disconnect_pkt { + uint8_t control; /* Packet type : (MQTT_CONTROL_DISCONNECT << 4) */ + uint8_t rem_len; /* Remaining length : must be 0x00 */ +} __attribute__ ((__packed__)); + + + +#endif /* MQTT_H */ + diff --git a/host/scialys_mqtt_bridge/mqtt_comm.c b/host/scialys_mqtt_bridge/mqtt_comm.c new file mode 100644 index 0000000..0cf8f4b --- /dev/null +++ b/host/scialys_mqtt_bridge/mqtt_comm.c @@ -0,0 +1,607 @@ +/**************************************************************************** + * Host app for drzportail + * + * + * Copyright 2022 Nathael Pajani + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *************************************************************************** */ + +#include +#include +#include +#include +#include +#include +#include + +#include "sock_utils.h" +#include "mqtt_comm.h" +#include "mqtt.h" + +/* + * MQTT client implementation - communication part + * + * For protocol defiition, refer to + * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html + * + * This implementation has a limitation on the subscription mechanism : the protocol allows for + * multiple subscriptions / unsubscriptions using a single subscribe packet, which is not supported + * by this code for code simplicity, memory usage limitation, and packets size limitation. + * All other features of the protocol should be fully supported. + * + * The MQTT protocol requires a lossless, ordered transport protocol layer with an address mechanism. + * This code is the implementation of the MQTT transport part of the communication, over TCP-IP + * + * All the packet encoding and decoding is handled by code in mqtt.c + * + */ + + +/********************************************************************************************/ +/* MQTT Subscription */ + +int mqtt_send_subscriptions(struct mqtt_info* mqtti) +{ + int len = 0, ret = 0; + + /* Create the MQTT subscribe packet */ + len = mqtt_pack_subscribe_pkt(mqtti->sub, mqtti->txbuf, MQTT_BUFF_SIZE); + if (len <= 0) { + printf("MQTT subscribe packet pack error : %d\n", len); + return -8; + } + + ret = send(mqtti->socket, mqtti->txbuf, len, 0); + if (ret != len) { + perror("Unable to send MQTT subscribe packet:"); + return -9; + } + + mqtti->comm_state = MQTT_WAITING_SUBACK; + return 0; +} + +int mqtt_handle_suback(struct mqtt_info* mqtti) +{ + struct mqtt_sub_reply_pkt* reply = (struct mqtt_sub_reply_pkt*)(mqtti->rxbuf); + int ret = 0, i = 0; + uint8_t* ret_codes = NULL; + + ret = mqtt_check_suback_reply_pkt(reply, (mqtti->sub->nb_topics + sizeof(struct mqtt_sub_reply_pkt))); + if (ret != 0) { + printf("Not a valid suback MQTT packet (ret: %d) ... protocol flow error\n", ret); + return -10; + } + /* Check that the packet ID is the right one */ + if (ntohs(reply->acked_pkt_id) != mqtti->sub->packet_id) { + printf("Suback packet has wrong packet ID (%d) ... protocol flow error\n", ntohs(reply->acked_pkt_id)); + return -11; + } + /* Valid packet, check return code for each topic sent */ + ret_codes = (uint8_t*)(mqtti->rxbuf + 4); + for (i = 0; i < mqtti->sub->nb_topics; i++) { + if (ret_codes[i] == MQTT_SUBSCRIBE_FAILURE) { + printf("Subscription %d refused (%s)\n", i, mqtti->sub->topics[i].name); + return -12; + } else { + printf("Subscription %d accepted (%s)\n", i, mqtti->sub->topics[i].name); + } + } + printf("All MQTT subscriptions accepted\n"); + mqtti->comm_state = MQTT_IDLE; + return 0; +} + +/********************************************************************************************/ +/* MQTT connect */ + +void free_publish_packet(struct mqtt_pub* publi, int level); + +void mqtt_close(struct mqtt_info* mqtti, int rewind) +{ + /* Note : All FALLTHROUGH !!! */ + switch (rewind) { + case 2: + { + /* Free all Rx publish messages list */ + struct mqtt_pub* publi = NULL, * tmp_publi = NULL; + list_for_each_entry_safe(publi, tmp_publi, &mqtti->pub_rx_list, list) { + free_publish_packet(publi, 4); + } + list_for_each_entry_safe(publi, tmp_publi, &mqtti->pub_tx_list, list) { + free_publish_packet(publi, 4); + } + } + __attribute__ ((fallthrough)); /* Fallthrough */ + case 1: + close(mqtti->socket); + mqtti->comm_state = MQTT_UNCONNECTED; + /* Note : Do not loose fd number so it can be removed from select */ + __attribute__ ((fallthrough)); /* Fallthrough */ + case 0: + free(mqtti->txbuf); + free(mqtti->rxbuf); + } +} + +/* Init MQTT internal structure and connect to the server */ +int mqtt_init(struct mqtt_info* mqtti) +{ + int ret = 0, len = 0; + + /* Allocate buffers */ + mqtti->txbuf = malloc(MQTT_BUFF_SIZE); + if (mqtti->txbuf == NULL) { + printf("Unable to allocate MQTT Tx buffer.\n"); + return -1; + } + mqtti->rxbuf = malloc(MQTT_BUFF_SIZE); + if (mqtti->rxbuf == NULL) { + printf("Unable to allocate MQTT Rx buffer.\n"); + free(mqtti->txbuf); + return -1; + } + memset(mqtti->txbuf, 0, MQTT_BUFF_SIZE); + memset(mqtti->rxbuf, 0, MQTT_BUFF_SIZE); + + /* Connect to MQTT server */ + mqtti->socket = socket_tcp_client(mqtti->server_ip, mqtti->server_port); + if (mqtti->socket <= 0) { + printf("Unable to connect to MQTT server %s on port %d\n", mqtti->server_ip, mqtti->server_port); + mqtt_close(mqtti, 0); + return -2; + } + + /* Create the MQTT connect packet */ + len = mqtt_pack_connect_packet(mqtti->conn, mqtti->txbuf, MQTT_BUFF_SIZE); + if (len <= 0) { + printf("MQTT connect packet pack error : %d\n", len); + mqtt_close(mqtti, 1); + return -3; + } + + ret = send(mqtti->socket, mqtti->txbuf, len, 0); + if (ret != len) { + perror("Unable to send MQTT connect packet:"); + mqtt_close(mqtti, 1); + return -4; + } + + mqtti->comm_state = MQTT_WAITING_CONNECT_ACK; + return 0; +} + +int mqtt_handle_connack(struct mqtt_info* mqtti) +{ + struct mqtt_connack_reply_pkt* reply = (struct mqtt_connack_reply_pkt*)(mqtti->rxbuf); + int ret = mqtt_check_connack_reply_pkt(reply); + + if (ret != 0) { + printf("Not a connack MQTT packet (ret: %d) ... protocol flow error\n", ret); + return -5; + } + /* Valid packet, check return code */ + if (reply->ret_code != MQTT_CONNACK_ACCEPTED) { + printf("Connection refused: %d\n", reply->ret_code); + mqtti->comm_state = MQTT_UNCONNECTED; + return -6; + } + printf("MQTT Connection accepted\n"); + /* Send our subscriptions */ + ret = mqtt_send_subscriptions(mqtti); + if (ret != 0) { + printf("Unable to send MQTT Subscriptions.\n"); + return -7; + } + mqtti->comm_state = MQTT_WAITING_SUBACK; + return 0; +} + +/********************************************************************************************/ +/* MQTT Publish */ + +void free_publish_packet(struct mqtt_pub* publi, int level) +{ + /* Note : All FALLTHROUGH !!! */ + switch (level) { + case 4: + list_del(&publi->list); + __attribute__ ((fallthrough)); /* Fallthrough */ + case 3: + if (publi->pkt.application_message != NULL) { + free(publi->pkt.application_message); + } + __attribute__ ((fallthrough)); /* Fallthrough */ + case 2: + free(publi->pkt.topic.name); + __attribute__ ((fallthrough)); /* Fallthrough */ + case 1: + default: + free(publi); + } +} + +int mqtt_handle_rx_publi(struct mqtt_info* mqtti, int len) +{ + struct mqtt_pub* publi = NULL; + int topic_len = 0; + char* tmp = NULL; + + publi = malloc(sizeof(struct mqtt_pub)); + if (publi == NULL) { + printf("Unable to get memory for received publish packet structure\n"); + return -30; + } + memset(publi, 0, sizeof(struct mqtt_pub)); + + topic_len = mqtt_unpack_publish_packet(&(publi->pkt), mqtti->rxbuf, len); + if (topic_len <= 0) { + printf("Invalid publish message received.\n"); + free_publish_packet(publi, 1); + return -31; + } + + /* Copy topic */ + tmp = publi->pkt.topic.name; + publi->pkt.topic.name = malloc(topic_len + 1); + if (publi->pkt.topic.name == NULL) { + printf("Unable to get memory for received publish message topic\n"); + free_publish_packet(publi, 1); + return -32; + } + memcpy(publi->pkt.topic.name, tmp, topic_len); + publi->pkt.topic.name[topic_len] = '\0'; + + /* Copy message */ + tmp = (char*)publi->pkt.application_message; + publi->pkt.application_message = malloc(publi->pkt.message_size); + if (publi->pkt.application_message == NULL) { + printf("Unable to get memory for received publish application message\n"); + free_publish_packet(publi, 2); + return -33; + } + memcpy(publi->pkt.application_message, tmp, publi->pkt.message_size); + + /* Set reception date */ + publi->date = time(NULL); + + /* And add to the list of received messages */ + list_add(&publi->list, &mqtti->pub_rx_list); + + return 0; +} + + +/* Call this one once the packet has been taken care of (from main loop) */ +int mqtt_ack_publish(struct mqtt_info* mqtti, struct mqtt_pub* publi) +{ + int ret = 0, len = 0; + /* Put puback packet on tx buffer */ + len = mqtt_pack_publish_reply_pkt(mqtti->txbuf, publi->pkt.packet_id, MQTT_CONTROL_PUBACK); + /* Send buffer content */ + ret = send(mqtti->socket, mqtti->txbuf, len, 0); + /* And free publish packet */ + free_publish_packet(publi, 4); + /* If unable to send, the close connection to MQTT brocker */ + if (ret != len) { + perror("Unable to send MQTT puback packet:"); + mqtt_close(mqtti, 2); + return -1; + } + return 0; +} + +/* Get the next ID which is not in use in the list of packets sent, and not 0 or 1. + * 1 is used for the subscribe packet + * 0 is returned when there's no more ids available + */ +int get_next_free_pkt_id(struct mqtt_info* mqtti) +{ + struct mqtt_pub* publi = NULL; + static int next = 51; + uint8_t ids[256]; + int i = 0; + + /* All available */ + memset(ids, 0, 256); + /* And mark used ones */ + list_for_each_entry(publi, &mqtti->pub_tx_list, list) { + ids[publi->pkt.packet_id] = 1; + } + /* Increment "next" ID to use different IDs even if the first one is available */ + next++; + if (next >= 256) { + next = 2; + } + /* Find the first available packet ID (but not ID 0 and 1) */ + for (i = next; i < 256; i++) { + if (ids[i] == 0) { + return i; + } + } + return 0; +} + +/* Call this to publish a packet with data to be sent to MQTT brocker */ +int mqtt_publish(struct mqtt_info* mqtti, char* topic, uint8_t* msg, int msg_len) +{ + struct mqtt_pub* publi = NULL; + int topic_len = 0, len = 0, ret = 0; + + if ((mqtti == NULL) || (topic == NULL) || (msg == NULL)) { + return -EINVAL; + } + publi = malloc(sizeof(struct mqtt_pub)); + if (publi == NULL) { + printf("Unable to get memory for publish packet structure\n"); + return -ENOMEM; + } + + /* Copy topic */ + topic_len = strlen(topic); + publi->pkt.topic.name = malloc(topic_len + 1); + if (publi->pkt.topic.name == NULL) { + printf("Unable to get memory for publish message topic\n"); + free_publish_packet(publi, 1); + return -ENOMEM; + } + memcpy(publi->pkt.topic.name, topic, topic_len + 1); + publi->pkt.topic.QoS = MQTT_QoS_1; + + /* Copy message */ + if (msg_len != 0) { + publi->pkt.application_message = malloc(msg_len); + if (publi->pkt.application_message == NULL) { + printf("Unable to get memory for publish application message\n"); + free_publish_packet(publi, 2); + return -ENOMEM; + } + memcpy(publi->pkt.application_message, msg, msg_len); + } else { + publi->pkt.application_message = NULL; + } + publi->pkt.message_size = msg_len; + + /* Set dup and retain */ + publi->pkt.dup_flag = 0; /* First time the packet is being sent */ + publi->pkt.retain_flag = 0; + + /* Set sending date */ + publi->date = time(NULL); + + /* Get next publish packet ID */ + publi->pkt.packet_id = get_next_free_pkt_id(mqtti); + if (publi->pkt.packet_id == 0) { + printf("Too many packets waiting for acknoledgement, stop sending more.\n"); + free_publish_packet(publi, 3); + return -EBUSY; + } + + /* Build and send the packet ! */ + len = mqtt_pack_publish_packet(&publi->pkt, mqtti->txbuf, MQTT_BUFF_SIZE); + if (len <= 0) { + printf("Unable to pack publish message\n"); + free_publish_packet(publi, 3); + return len; + } + ret = send(mqtti->socket, mqtti->txbuf, len, 0); + if (ret != len) { + perror("Unable to send MQTT publish packet:"); + free_publish_packet(publi, 3); + mqtt_close(mqtti, 2); + return -ECOMM; + } + + /* And add to the list of sent messages */ + list_add(&publi->list, &mqtti->pub_tx_list); + mqtti->comm_state = MQTT_WAITING_PUBACK; + + return 0; +} + + +int mqtt_handle_puback(struct mqtt_info* mqtti) +{ + struct mqtt_pub* publi = NULL, * tmp_publi = NULL; + struct mqtt_publish_reply_pkt* reply; + int ret = 0; + /* Parse packet */ + reply = (struct mqtt_publish_reply_pkt*)(mqtti->rxbuf); + ret = mqtt_check_publish_reply_pkt(reply, MQTT_CONTROL_PUBACK); + if (ret != 0) { + printf("Not a puback packet (ret: %d) ... protocol flow error\n", ret); + return -40; + } + /* Find publication */ + list_for_each_entry_safe(publi, tmp_publi, &mqtti->pub_tx_list, list) { + if (publi->pkt.packet_id != ntohs(reply->acked_pkt_id)) { + continue; + } + free_publish_packet(publi, 4); + return 0; + } + /* Not found */ + printf("Received an unrequested puback (id : %d)...\n", ntohs(reply->acked_pkt_id)); + return -41; +} + +/********************************************************************************************/ +/* MQTT Stream handling */ + +/* Check that the buffer contains a full packet, and return it's length. + * If the buffer will never be big enough for the length, return -1. + * If we need to wait for more data, return 0. + */ +int mqtt_stream_check_len(struct mqtt_info* mqtti) +{ + uint32_t len = 0; + int nb_len_bytes = 0; + /* We need at least two bytes to check whether we received enough data for a packet */ + if (mqtti->rx_idx < 2) { + return 0; + } + /* Small packets - most common case */ + if (mqtti->rxbuf[1] <= 127) { + if (mqtti->rx_idx >= (mqtti->rxbuf[1] + 2)) { + return (mqtti->rxbuf[1] + 2); + } else { + return 0; + } + } + /* Bigger packet - try to compute packet size. + * We know we need to have more than 129 bytes in rx buffer, so do not bother computing size + * if we don't even have received those bytes. + */ + if (mqtti->rx_idx <= 129) { + return 0; + } + nb_len_bytes = decode_remaining_length((mqtti->rxbuf + 1), &len); + if (mqtti->rx_idx >= (len + nb_len_bytes + 1)) { + mqtti->curr_rx_payload_len = len; + return (len + nb_len_bytes + 1); + } else if ((len + nb_len_bytes + 1) > MQTT_BUFF_SIZE) { + /* Full packet does not fit in empty rx buffer ... */ + return -21; + } + return 0; +} + +void mqtt_consume_packet(struct mqtt_info* mqtti, uint32_t len) +{ + memmove(mqtti->rxbuf, (mqtti->rxbuf + len), (mqtti->rx_idx - len)); + mqtti->rx_idx -= len; +} + +/* Check that there is a packet, and handle it. + * Loop to handle multiple packets read at once. + */ +int mqtt_stream_decode(struct mqtt_info* mqtti) +{ + int ret = 0; + do { + uint32_t len = mqtt_stream_check_len(mqtti); + + if (len <= 0) { + return len; + } + + /* We must not receive anything before the connect acknowledge. Check this first. */ + if (mqtti->comm_state == MQTT_WAITING_CONNECT_ACK) { + ret = mqtt_handle_connack(mqtti); + mqtt_consume_packet(mqtti, len); /* Consume the connack packet */ + if (ret != 0) { + return ret; + } + continue; + } + + if (mqtti->comm_state == MQTT_WAITING_SUBACK) { + ret = mqtt_handle_suback(mqtti); + mqtt_consume_packet(mqtti, len); /* Consume the suback packet */ + if (ret != 0) { + return ret; + } + continue; + } + + /* We know we have a packet, with a payload of curr_rx_payload_len */ + switch ((mqtti->rxbuf[0] & 0xF0) >> 4) { + case MQTT_CONTROL_PUBLISH: + /* Handle publication and send puback. These can happen at any time. */ + ret = mqtt_handle_rx_publi(mqtti, len); + mqtt_consume_packet(mqtti, len); /* Consume the publish packet */ + if (ret != 0) { + return ret; + } + break; + + case MQTT_CONTROL_PUBACK: + if (mqtti->comm_state == MQTT_WAITING_PUBACK) { + ret = mqtt_handle_puback(mqtti); + mqtt_consume_packet(mqtti, len); /* Consume the puback packet */ + if (ret != 0) { + return ret; + } + } else { + printf("Received a puback while idle ...\n"); + mqtt_consume_packet(mqtti, len); /* Consume the puback packet anyway ... */ + return -12; + } + if (list_empty(&(mqtti->pub_tx_list))) { + mqtti->comm_state = MQTT_IDLE; + } + break; + + /* These should not be received when we do not specifically wait for them */ + case MQTT_CONTROL_CONNACK: + case MQTT_CONTROL_SUBACK: + /* We do not unsubscribe and we do not send ping request ... */ + case MQTT_CONTROL_UNSUBACK: + case MQTT_CONTROL_PINGRESP: + /* These are for QoS of 2 (exchange exactly once), which we do not use */ + case MQTT_CONTROL_PUBCOMP: + case MQTT_CONTROL_PUBREC: + case MQTT_CONTROL_PUBREL: + /* And these are only Client to Server */ + case MQTT_CONTROL_PINGREQ: + case MQTT_CONTROL_SUBSCRIBE: + case MQTT_CONTROL_UNSUBSCRIBE: + case MQTT_CONTROL_DISCONNECT: + default: + printf("Received an unsupported MQTT packet: %d (len: %d, rem: %d)\n", + ((mqtti->rxbuf[0] & 0xF0) >> 4), len, mqtti->rx_idx); + mqtt_consume_packet(mqtti, len); /* Consume the packet anyway */ + break; + } + } while (mqtti->rx_idx > 0); + + return 0; +} + +int mqtt_handle_data(struct mqtt_info* mqtti) +{ + int ret = 0, len = 0; + + len = recv(mqtti->socket, (mqtti->rxbuf + mqtti->rx_idx), (MQTT_BUFF_SIZE - mqtti->rx_idx), 0); + if (len > 0) { + /* We received some data */ + mqtti->rx_idx += len; + ret = mqtt_stream_decode(mqtti); + if (ret < 0) { + printf("MQTT stream decoding error : %d\n", ret); + ret = -1; + } + /* if (ret == 0) then we need more data to form a packet */ + } else if (len == 0) { + /* Server closed the connection ? */ + printf("MQTT Server closed the connection ???\n"); + ret = -2; + } else { + /* Wait .. we received something but nothing came in ? Close socket */ + perror("MQTT: Receive error for client TCP connection"); + ret = -3; + } + + if (ret < 0) { + mqtt_close(mqtti, 2); + return ret; + } + return 0; +} + + + + diff --git a/host/scialys_mqtt_bridge/mqtt_comm.h b/host/scialys_mqtt_bridge/mqtt_comm.h new file mode 100644 index 0000000..53cf551 --- /dev/null +++ b/host/scialys_mqtt_bridge/mqtt_comm.h @@ -0,0 +1,125 @@ +/**************************************************************************** + * Host app for drzportail + * + * + * Copyright 2022 Nathael Pajani + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *************************************************************************** */ + +#include +#include "mqtt.h" + +/* + * MQTT client implementation - communication part + * + * For protocol defiition, refer to + * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html + * + * This implementation has a limitation on the subscription mechanism : the protocol allows for + * multiple subscriptions / unsubscriptions using a single subscribe packet, which is not supported + * by this code for code simplicity, memory usage limitation, and packets size limitation. + * All other features of the protocol should be fully supported. + * + * The MQTT protocol requires a lossless, ordered transport protocol layer with an address mechanism. + * This code is the implementation of the MQTT transport part of the communication, over TCP-IP + * + * All the packet encoding and decoding is handled by code in mqtt.c + * + */ + +#ifndef MQTT_COMM +#define MQTT_COMM + +#include "mqtt.h" +#include "list.h" + + +/* That's quite big, and should far more than enough for about anything */ +#define MQTT_BUFF_SIZE 4096 + + +/* Track publish packet sent, so we can send them again on timeout and + * make sure they all got ack'ed. + */ +struct mqtt_pub { + struct mqtt_publish_pkt pkt; + time_t date; + struct list_head list; +}; + + +/* Track our subscriptions. + * Finally not used as we have only one. Kept for possible future use, or + * for other future apps */ +struct mqtt_subscriptions { + struct mqtt_sub_pkt* pkt; + time_t date; + struct list_head list; + int accepted; +}; + + +/* Our main data structure for MQTT handling part */ +struct mqtt_info { + const struct mqtt_connect_pkt* conn; + const struct mqtt_sub_pkt* sub; + int socket; + char* server_ip; + uint16_t server_port; + uint8_t comm_state; + uint8_t next_packet_id; + uint8_t* txbuf; + uint8_t* rxbuf; + uint16_t rx_idx; /* Protocol allows for far more, not our buffer. */ + uint16_t curr_rx_payload_len; + struct list_head pub_tx_list; + struct list_head pub_rx_list; + struct list_head sub_list; +}; + + +/* Possible states for MQTT communication flow */ +enum mqtt_client_states { + MQTT_UNCONNECTED, + MQTT_IDLE, + MQTT_WAITING_CONNECT_ACK, + MQTT_WAITING_PUBACK, + MQTT_WAITING_PUBREC, + MQTT_WAITING_PUBREL, + MQTT_WAITING_PUBCOMP, + MQTT_WAITING_SUBACK, + MQTT_WAITING_UNSUBACK, + MQTT_WAITING_PINGRESP, +}; + +/* Setup Connection to MQTT brocker */ +int mqtt_init(struct mqtt_info* mqtti); + +/* Handle data received on mqtt stream */ +int mqtt_handle_data(struct mqtt_info* mqtti); + +/* Call this to publish a packet with data to be sent to MQTT brocker */ +int mqtt_publish(struct mqtt_info* mqtti, char* topic, uint8_t* msg, int msg_len); + +/* Call this one once the packet has been taken care of (from main loop) */ +int mqtt_ack_publish(struct mqtt_info* mqtti, struct mqtt_pub* publi); + + + +#endif /* MQTT_COMM */ + + + diff --git a/host/scialys_mqtt_bridge/scialys_uart_comm.c b/host/scialys_mqtt_bridge/scialys_uart_comm.c new file mode 100644 index 0000000..fc1af28 --- /dev/null +++ b/host/scialys_mqtt_bridge/scialys_uart_comm.c @@ -0,0 +1,151 @@ +/**************************************************************************** + * Host app for drzelec : + * Receive and decode data from Scialys module and send data to MQTT brocker + * + * + * Copyright 2024 Nathael Pajani + * + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + ****************************************************************************/ + + +/* + * Decodes the frames continuously sent by the scialys module on his serial line (UART1) + */ + + +#include +#include +#include +#include + +#undef DEBUG + +#ifdef DEBUG +#include +#endif + +#include "scialys_uart_comm.h" + + + +#define BUF_SIZE 80 +uint8_t data[BUF_SIZE]; +unsigned int data_idx = 0; + +unsigned int pkt_type = TYPE_NONE; + +int protocol_decode(uint8_t c, struct sc_module* scialys) +{ + /* Start of packet */ + if (data_idx == 0) { + switch (c) { + case TYPE_DATA: + pkt_type = TYPE_DATA; + break; + case TYPE_INFO: + pkt_type = TYPE_INFO; + break; + case TYPE_CONFIG: + pkt_type = TYPE_CONFIG; + break; + } + if (pkt_type != TYPE_NONE) { + data[0] = c; + data_idx = 1; + return 0; + } + return -1; + } + if (data_idx > 0) { + data[data_idx++] = c; + if (pkt_type == TYPE_DATA) { + /* Full header received, check for header validity */ + if (data_idx == 4) { + uint8_t sum = data[0] + data[1] + data[2] + data[3]; + if (sum != 0) { + scialys->errors_count++; + data_idx = 0; + return -1; + } + } + /* Full message received, validate data checksum */ + if (data_idx >= sizeof(struct scialys_data)) { + uint8_t i = 0, sum = 0; + for (i = 4; i < sizeof(struct scialys_data); i++) { + sum += data[i]; + } + data_idx = 0; /* Do it before we get a chance to return */ + pkt_type = TYPE_NONE; + if (sum != data[3]) { + scialys->errors_count++; + return -1; + } + /* Data is valid */ + memcpy(&scialys->sc_data, data, sizeof(struct scialys_data)); + return TYPE_DATA; + } + } else if (pkt_type == TYPE_CONFIG) { + /* Full config received, validate data checksum */ + if (data_idx >= sizeof(struct scialys_config)) { + uint8_t i = 0, sum = 0; + for (i = 0; i < sizeof(struct scialys_config); i++) { + sum += data[i]; + } + data_idx = 0; /* Do it before we get a chance to return */ + pkt_type = TYPE_NONE; + if (sum != 0) { + scialys->errors_count++; + return -1; + } + /* Config is valid */ + memcpy(&scialys->sc_conf, data, sizeof(struct scialys_config)); + return TYPE_CONFIG; + } + } else if (pkt_type == TYPE_INFO) { + /* Full message received, check for checksum */ + #define VERSION_BUF_LEN 16 + if (data_idx == VERSION_BUF_LEN) { + uint8_t i = 0, sum = 0; + for (i = 2; i < VERSION_BUF_LEN; i++) { + sum += data[i]; + } + data_idx = 0; /* Do it before we get a chance to return */ + pkt_type = TYPE_NONE; + if (sum != data[1]) { + scialys->errors_count++; + return -1; + } + for (i = 2; i < VERSION_BUF_LEN; i++) { + if (data[i] == '-') { + scialys->sc_info.module_version[i] = '\0'; + i++; + break; + } + scialys->sc_info.module_version[i-2] = data[i]; + } + memcpy(&scialys->sc_info.soft_version, (data + i), (VERSION_BUF_LEN - i)); + scialys->sc_info.compile_version = strtoul((data + i + 2), NULL, 10); + return TYPE_INFO; + } + } + return 0; + } + return -1; /* This byte is not part of a packet */ +} + + + diff --git a/host/scialys_mqtt_bridge/scialys_uart_comm.h b/host/scialys_mqtt_bridge/scialys_uart_comm.h new file mode 100644 index 0000000..e20dcc7 --- /dev/null +++ b/host/scialys_mqtt_bridge/scialys_uart_comm.h @@ -0,0 +1,223 @@ +/**************************************************************************** + * Host app for drzelec : + * Receive and decode data from Scialys module and send data to MQTT brocker + * + * + * Copyright 2024 Nathael Pajani + * + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + ****************************************************************************/ + + +#include + +enum packet_type { + TYPE_NONE = 0, + TYPE_DATA = '#', + TYPE_INFO = '&', + TYPE_CONFIG = '@', +}; + + +/******************************************************************************/ +/* Scialys data, as sent on UART1 or stored on SD card. + * + * Header fields : + * The start byte is dedicated to UART communication and is always '#" + * Note that this byte may be repeated within the data packet, so the rest of the header + * should be checked before handling a packet + * The first checksum byte is so that the sum of all header bytes is zero + * The version byte indicates the version of this structure. + * The length is fixed so knowing the version gives the length. + * The data checksum is equal to the sum of all the data bytes + * + * Data fields : + * ... + * The flags can be expanded to a "struct flags" (see below) by shifting one bit to each byte. + * + */ + +#define DATA_VERSION 0x01 +#define DATA_HEADER_LEN 4 + +struct scialys_data { + /* Header */ + uint8_t start; + uint8_t cksum; + uint8_t version; + uint8_t data_cksum; + /* Data */ + uint32_t solar_prod_value; + uint32_t home_conso_value; + int water_centi_degrees; + int deci_degrees_power; + int deci_degrees_disp; + uint16_t load_power_lowest; + uint16_t load_power_highest; + uint8_t command_val; + uint8_t act_cmd; + uint8_t mode; + uint8_t flags; +} __attribute__ ((__packed__)); + +struct flags { + uint8_t fan_on; + uint8_t force_fan; + uint8_t error_shutdown; + uint8_t temp_shutdown; + uint8_t overvoltage; + uint8_t external_disable; + uint8_t forced_heater_mode; + uint8_t manual_activation_request; +}; + +/******************************************************************************/ +/* Configuration storage */ + +/* This struct is stored to user flash when user validates new configuration + * and read from user flash upon statup + * Default values are defined within config.c + * Note that temperature values are stored as hundredth of degrees centigrade (1/100) + * + * Meaning of fields : + * + * grid_power_limit : + * Maximum power which can be used from the external (paid) power source, specified (in kWatt) + * source_power_max : + * Maximum power which can be produced by installation (in Watts) + * source_has_power_ok : + * Power production value to be considered as enough to trigger delayed force heating (in Watts) + * load_power_max : + * Maximum power used by the load. Currently unused by code (in Watts) + * conf_max_temp : + * Maximum temperature at which heating stops, whatever the condition (production or manual + * force included) [20 .. 90] + * + * enter_forced_mode_temp : + * temperature bellow which the system enters auto forced heating mode + * degrees centigrade [0 .. 50] (best not to set above 40) + * auto_forced_mode_value : + * command value to be used when in auto forced mode [10 .. 100] + * auto_forced_target_heater_temp : + * temperature up to which the water is heated when in auto forced mode [20 .. 90] + * auto_forced_heater_delay : + * time to wait before starting auto forced heating, in order to wait for production to start + * (sunrise, wind, ...) + * auto_forced_heater_duration : + * duration (hours) of auto forced heating [1 .. 10] + * auto_force_type : + * type of termination condition for manual force mode (off (no auto force) temp, time, both (first)) + * manual_forced_mode_value : + * command value to be used when in manual forced mode [10 .. 100] + * manual_target_heater_temp : + * temperature up to which the water is heated when in manual forced mode [20 .. 90] + * manual_activation_duration : + * duration (hours) of manual forced heating [1 .. 10] + * manual_force_type : + * type of termination condition for manual force mode (temp, time, both (first)) + * never_force : + * if set to 1, prevent entering manual and automatic forced modes + */ +#define CONFIG_OK 0x4F4B /* "OK" in ASCII */ +#define CONFIG_VERSION 0x02 +struct scialys_config { + /* Config version and info */ + uint8_t conf_version; + uint8_t checksum; + uint16_t config_ok; + /* Config limits */ + uint16_t grid_power_limit; /* specified in kWatt */ + uint16_t source_power_max; /* specified in Watt */ + uint16_t source_has_power_ok; /* specified in Watt */ + uint16_t load_power_max; /* specified in Watt */ + uint16_t conf_max_temp; /* degrees centigrade (0 - 90°C) */ + /* Auto Force limits */ + uint16_t enter_forced_mode_temp; /* degrees centigrade (2 - 60°C) */ + uint16_t auto_forced_mode_value; /* % : 25 - 100 */ + uint16_t auto_forced_target_heater_temp; /* degrees centigrade (0 - 90°C) */ + uint16_t auto_forced_heater_delay; /* Hours : 0 - 5 */ + uint16_t auto_forced_heater_duration; /* Hours : 1 - 5 */ + uint16_t auto_force_type; /* Off, Min, Max, Timer, Target */ + /* Manual Force limits */ + uint16_t manual_forced_mode_value; /* % : 25 - 100 */ + uint16_t manual_target_heater_temp; /* degrees centigrade (0 - 90°C) */ + uint16_t manual_activation_duration; + uint16_t manual_force_type; /* Off, Min, Max, Timer, Target */ + /* Other */ + uint16_t load_type; /* LOAD_TYPE_AC_RES, LOAD_TYPE_AC_IND or LOAD_TYPE_DC */ + uint16_t never_force; /* 0 or 1 */ +} __attribute__((packed)); + +enum force_types { + FORCE_TYPE_OFF, + FORCE_TYPE_MIN, + FORCE_TYPE_MAX, + FORCE_TYPE_TIMER, + FORCE_TYPE_TARGET, + NB_FORCE_TYPE, +}; + +enum load_types { + LOAD_TYPE_AC_RES, + LOAD_TYPE_AC_IND, + LOAD_TYPE_DC, + NB_LOAD_TYPES, +}; + + +/******************************************************************************/ +#define MAX_VLEN 12 +struct scialys_info { + char module_version[MAX_VLEN]; + char soft_version[MAX_VLEN]; + unsigned int compile_version; +} __attribute__((packed)); + +/******************************************************************************/ +/* Handle packet reception */ +struct sc_module { + int fd; + /* The latest data packet received */ + struct scialys_data sc_data; + struct scialys_data sc_data_old; + /* The latest config received */ + struct scialys_config sc_conf; + struct scialys_config sc_conf_old; + /* The latest info received */ + struct scialys_info sc_info; + struct scialys_info sc_info_old; + /* packet counts */ + uint32_t packet_rx_count; + uint32_t packet_tx_count; + uint32_t errors_count; +}; + +/* This function must be called for every received character. + * If the character is part of a packet but the packet is being built, then the function returns 0. + * When the character is the last one of a valid packet, then the function returns the packet size + * and the packet in rx_data->rx_packet is valid. + * If the character is the last one of a packet which has an invalid data checksum, this function + * returns -2 and the data is lost. + * If the character is not part of a packet it returns -1. The character may then be part of + * a debug message (and displayed by the host), or any other kind of communication. + * When a set of consecutive characters have been used to build a packet but the packet is + * not valid due to header error, then the function returns -3 (checksum error) or -4 (data size + * error). The data in rx_data->rx_packet is the received data but is not valid. + * The corresponding data size is always sizeof(struct header). + */ +int protocol_decode(uint8_t c, struct sc_module* scialys); + + diff --git a/host/scialys_mqtt_bridge/serial_utils.c b/host/scialys_mqtt_bridge/serial_utils.c new file mode 100644 index 0000000..5e17e42 --- /dev/null +++ b/host/scialys_mqtt_bridge/serial_utils.c @@ -0,0 +1,59 @@ +/********************************************************************* + * + * Serial utility functions + * + * + * Copyright 2012-2014 Nathael Pajani + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *********************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#define SERIAL_BAUD B115200 + + +int serial_setup(char* name) +{ + struct termios tio; + int fd = -1; + + /* Open serial port */ + fd = open(name, O_RDWR); + if (fd < 0) { + perror("Unable to open communication with companion chip"); + return -1; + } + /* Setup serial port */ + memset(&tio, 0, sizeof(tio)); + tio.c_cflag = CS8 | CREAD | CLOCAL; /* 8n1, see termios.h for more information */ + tio.c_cc[VMIN] = 1; + tio.c_cc[VTIME] = 5; + cfsetospeed(&tio, SERIAL_BAUD); + cfsetispeed(&tio, SERIAL_BAUD); + tcsetattr(fd, TCSANOW, &tio); + + return fd; +} + diff --git a/host/scialys_mqtt_bridge/serial_utils.h b/host/scialys_mqtt_bridge/serial_utils.h new file mode 100644 index 0000000..7c17bcf --- /dev/null +++ b/host/scialys_mqtt_bridge/serial_utils.h @@ -0,0 +1,32 @@ +/********************************************************************* + * + * Serial utility functions + * + * + * Copyright 2012 Nathael Pajani + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *********************************************************************/ +#ifndef SERIAL_UTILS_H +#define SERIAL_UTILS_H + +/* Setup serial comunication, using name if given or saved name if name is NULL + * SERIAL_BAUD B38400 + * c_cflag (CS7 | PARENB | CREAD | CLOCAL) (7e1) + */ +int serial_setup(char* name); + +#endif /* SERIAL_UTILS_H */ + diff --git a/host/scialys_mqtt_bridge/sock_utils.c b/host/scialys_mqtt_bridge/sock_utils.c new file mode 100644 index 0000000..4dcda3f --- /dev/null +++ b/host/scialys_mqtt_bridge/sock_utils.c @@ -0,0 +1,245 @@ +/***************************************************************************** + * + * socket utils + * + * + * Copyright 2012 Nathael Pajani + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *****************************************************************************/ + +#include /* perror */ +#include /* errno */ +#include /* memset */ +#include /* close, fcntl */ +#include /* F_SETFL and O_NONBLOCK for fcntl */ + +/* For socket stuff */ +#include /* For portability for socket stuff */ +#include +#include +#include /* For TCP_NODELAY to disable nagle algorithm */ +#include + +/* This is used to allow quicker sending of small packets on wires */ +static int sox_disable_nagle_algorithm(int socket) +{ + const int on = 1; + return setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, (char*)&on, sizeof( int )); +} + +/* Creates a TCP socket on the specified "port", bind to port and setup to + * accept connections with listen, with up to "nb_pending" pending connections + * in the queue. + * Returns the socket, after disabling nagle algorithm. + * If "our" is not null, it will be used to store the listening socket information. + * If "port" is 0, then "our" must contain all requiered information for bind call. + */ +int socket_tcp_server(int port, int nb_pending, struct sockaddr_in* our) +{ + struct sockaddr_in local; + struct sockaddr_in* tmp; + int s; + int optval = 1; + + if (our == NULL ) { + tmp = &local; + if (port == 0) { + fprintf(stderr, "No port and no address information provided, won't be able to bind, aborting\n"); + return -1; + } + } else { + tmp = our; + } + + if (port != 0) { + memset(tmp, 0, sizeof(struct sockaddr_in)); + tmp->sin_family = AF_INET; + tmp->sin_addr.s_addr = htonl(INADDR_ANY); + tmp->sin_port = htons((short)port); + } + + if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("Unable to create TCP socket"); + return -1; + } + + if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void *)&optval, sizeof(int)) == -1) { + perror("Unable to set reuseaddress on socket"); + goto err_close; + } + if (setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, (void *)&optval, sizeof(int)) == -1) { + perror("Unable to set keepalive on socket"); + goto err_close; + } + + if (bind(s, (struct sockaddr *)tmp, sizeof(struct sockaddr_in)) < 0) { + perror("Unable to bind TCP port"); + goto err_close; + } + + if (listen(s, nb_pending)) { + perror("Unable to listen to TCP port"); + goto err_close; + } + + if (sox_disable_nagle_algorithm(s) < 0) { + perror("Unable to disable nagle's algorithm"); + goto err_close; + } + return s; + +err_close: + close(s); + return -1; +} + + +/* Creates a (non blocking) TCP socket and connect to the specified IP and port + * Returns the socket, after disabling nagle algorithm. + */ +int socket_tcp_client(char* ip, int port) +{ + struct sockaddr_in addr; + int s, ret; + + if ((ip == NULL) || (port == 0)) { + printf("Aborted socket creation, need both IP address and port\n"); + return -1; + } + addr.sin_addr.s_addr = inet_addr(ip); + addr.sin_port = htons(port); + addr.sin_family = AF_INET; + + s = socket(AF_INET, SOCK_STREAM, 0); + if (s < 0) { + perror("Unable to create TCP socket"); + return -1; + } + + ret = connect(s, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)); + if (ret != 0) { + perror("Unable to connect to peer"); + close(s); + return -1; + } + + /* This is used to allow quicker sending of small packets on wires */ + if (sox_disable_nagle_algorithm(s) < 0) { + perror("Unable to disable nagle's algorithm"); + close(s); + return -1; + } + + return s; +} + + + +/* Creates an UDP socket on the specified "port" and bind to port but no specific interface. + * Returns the socket. + * If "our" is not null, it will be used to store the listening socket information. + * "port" must not be 0.. + */ +int create_bound_udp_socket(int port, struct sockaddr_in* our) +{ + struct sockaddr_in local; + struct sockaddr_in* tmp; + int s; + int optval = 1; + + if (port == 0) { + return -1; + } + + if (our == NULL ) { + tmp = &local; + } else { + tmp = our; + } + + memset(tmp, 0, sizeof(struct sockaddr_in)); + tmp->sin_family = AF_INET; + tmp->sin_addr.s_addr = htonl(INADDR_ANY); + tmp->sin_port = htons((short)port); + + if ((s = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { + perror("Unable to create UDP socket"); + return -1; + } + + if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void *)&optval, sizeof(int)) == -1) { + perror("Unable to set reuseaddress on socket"); + goto err_close; + } + + if (bind(s, (struct sockaddr *)tmp, sizeof(struct sockaddr_in)) < 0) { + perror("Unable to bind UDP port"); + goto err_close; + } + + return s; + +err_close: + close(s); + return -1; +} + + + +/* Creates an UDP socket bound to no specific interface. + * The kernel choses the port to bind to. + * Returns the socket. + */ +int create_broadcast_udp_socket(void) +{ + struct sockaddr_in local; + struct sockaddr_in* tmp = &local; + int s; + int optval = 1; + + memset(tmp, 0, sizeof(struct sockaddr_in)); + tmp->sin_family = AF_INET; + tmp->sin_addr.s_addr = htonl(INADDR_ANY); + tmp->sin_port = 0; + + if ((s = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { + perror("Unable to create UDP socket"); + return -1; + } + + if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void *)&optval, sizeof(int)) == -1) { + perror("Unable to set reuseaddress on socket"); + goto err_close; + } + + if (setsockopt(s, SOL_SOCKET, SO_BROADCAST, (void *)&optval, sizeof(int)) == -1) { + perror("Unable to set broadcast on socket"); + goto err_close; + } + + if (bind(s, (struct sockaddr *)tmp, sizeof(struct sockaddr_in)) < 0) { + perror("Unable to bind UDP port"); + goto err_close; + } + + return s; + +err_close: + close(s); + return -1; +} + + diff --git a/host/scialys_mqtt_bridge/sock_utils.h b/host/scialys_mqtt_bridge/sock_utils.h new file mode 100644 index 0000000..2ece5e5 --- /dev/null +++ b/host/scialys_mqtt_bridge/sock_utils.h @@ -0,0 +1,58 @@ +/***************************************************************************** + * + * socket utils + * + * + * Copyright 2012 Nathael Pajani + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + *****************************************************************************/ + +#ifndef SOCK_UTILS_H +#define SOCK_UTILS_H + +#include +#include /* struct sockaddr_in */ + +/* Creates a TCP socket on the specified "port", bind to port and setup to + accept connections with listen, with up to "nb_pending" pending connections + in the queue. + Returns the socket, after disabling nagle algorithm. + If "our" is not null, it will be used to store the listening socket information. + */ +int socket_tcp_server(int port, int nb_pending, struct sockaddr_in* our); + +/* Creates a TCP socket and connect to the specified IP and port + * Returns the socket, after disabling nagle algorithm. + */ +int socket_tcp_client(char* ip, int port); + + +/* Creates an UDP socket on the specified "port" and bind to port but no specific interface. + * Returns the socket. + * If "our" is not null, it will be used to store the listening socket information. + * "port" must not be 0.. + */ +int create_bound_udp_socket(int port, struct sockaddr_in* our); + + +/* Creates an UDP socket bound to no specific interface. + * The kernel choses the port to bind to. + * Returns the socket. + */ +int create_broadcast_udp_socket(void); + +#endif /* SOCK_UTILS_H */ +