MQTT subscriber example
[lpc1224] / apps / base / mqtt_sub / mqtt_comm.c
1 /****************************************************************************
2  *   apps/base/mqtt_sub/mqtt_comm.c
3  *
4  * MQTT client example using data from onboard TMP101 I2C temperature sensor
5  *
6  * Copyright 2013-2014 Nathael Pajani <nathael.pajani@ed3l.fr>
7  *
8  *
9  * This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  *************************************************************************** */
25 #include "lib/stdio.h"
26 #include "lib/errno.h"
27 #include "lib/utils.h"
28 #include "lib/protocols/mqtt.h"
30 #include "drivers/serial.h"
32 #include "mqtt_comm.h"
35 /***************************************************************************** */
36 /* MQTT protocol */
38 /* Our MQTT buffer. MQTT packets may be up to 2^28 bytes, but we will keep them
39  * very small for our needs */
40 uint8_t txbuf[MQTT_BUFF_SIZE];
41 uint16_t app_board_addr = APP_BOARD_ADDRESS;
43 /* This is our static connect data */
44 const struct mqtt_connect_pkt mqtt_conn = {
45         .keep_alive_seconds = 600,
46         .clean_session_flag = MQTT_SESSION_NEW, /* or MQTT_SESSION_RESUME */
47         .client_id = "tsub",
48         .will_topic = { .name = "will/tsub", .QoS = 0 },
49         .will_retain = 1,
50         .will_msg_size = 4,
51         .will_msg = (uint8_t*)"Bye",
52 };
54 uint8_t mqtt_comm_state = MQTT_UNCONNECTED;
57 void add_packet_header(int pkt_len)
58 {
59         struct app_header* header = (struct app_header*)txbuf;
60         uint8_t i = 0, cksum = 0;
62         /* Compute data checksum */
63         for (i = HEADER_SIZE; i < (HEADER_SIZE + pkt_len); i++) {
64                 cksum += txbuf[i];
65         }
66         
67         /* Add the header */
68         header->start = FIRST_PACKET_CHAR;
69         header->type = 'M';
70         header->addr = htons(app_board_addr);
71         header->seqnum = 0;
72         header->data_len = pkt_len;
73         header->data_cksum = cksum;
74         header->header_cksum = 0; /* Erase header checksum from previous packet */
75         /* Compute header chksum */
76         cksum = 0;
77         for (i = 0; i < HEADER_SIZE; i++) {
78                 cksum += txbuf[i];
79         }
80         header->header_cksum = (uint8_t)(256 - cksum);
81 }
83 int mqtt_init(int comm_uart, int dbg_uart)
84 {
85         int len = 0, ret = 0;
87         memset(txbuf, 0, MQTT_BUFF_SIZE);
88         /* Create the MQTT connect packet */
89         len = mqtt_pack_connect_packet(&mqtt_conn, (txbuf + HEADER_SIZE), (MQTT_BUFF_SIZE - HEADER_SIZE));
90         if (len <= 0) {
91                 uprintf(dbg_uart, "MQTT connect pkt pack error : %d\n", len);
92                 return -E2BIG;
93         }
94         /* Add our packet header */
95         add_packet_header(len);
97         /* Send connect MQTT packet */
98         ret = serial_write(comm_uart, (char*)txbuf, (len + HEADER_SIZE));
99         if (ret < 0) {
100                 uprintf(dbg_uart, "MQTT send error: %d\n", ret);
101                 return ret;
102         }
103         mqtt_comm_state = MQTT_WAITING_CONNECT_ACK;
104         return 0;
108 struct mqtt_topic topics[] = {
109         { .name = "temp/test", .QoS = 2, },
110         { .name = "will/temp", .QoS = 1, },
111 #ifdef TEST
112         { .name = "#/bla", .QoS = 0, }, /* This one will fail, for test purpose */
113 #else
114         { .name = "+/bla", .QoS = 0, },
115 #endif
116 };
117 struct mqtt_sub_pkt sub_pkt = {
118         .nb_topics = 3,
119         .topics = topics,
120 };
121 static uint16_t sub_packet_id = 1;
123 int mqtt_temp_subscribe(int comm_uart, int dbg_uart)
125         int len = 0, ret = 0;
127         memset(txbuf, 0, MQTT_BUFF_SIZE);
128         /* Create the MQTT publish packet */
129         sub_pkt.packet_id = sub_packet_id;
130         len = mqtt_pack_subscribe_pkt(&sub_pkt, (txbuf + HEADER_SIZE), (MQTT_BUFF_SIZE - HEADER_SIZE));
131         if (len <= 0) {
132                 uprintf(dbg_uart, "MQTT subscribe pkt pack error : %d\n", len);
133                 return len;
134         }
135         /* Add our packet header */
136         add_packet_header(len);
138         /* Send the subscribe MQTT packet */
139         ret = serial_write(comm_uart, (char*)txbuf, (len + HEADER_SIZE));
140         if (ret < 0) {
141                 uprintf(dbg_uart, "MQTT send error: %d\n", ret);
142                 return ret;
143         }
144         mqtt_comm_state = MQTT_WAITING_SUBACK;
145         return 0;
149 int mqtt_handle_packet(char* buf, int comm_uart, int dbg_uart)
151         struct app_header* header = (struct app_header*)buf;
152         int ret = 0;
154         /* Decode address */
155         header->addr = ntohs(header->addr);
156         if ((!buf) || (header->addr != APP_BOARD_ADDRESS)) {
157                 /* This one is not for us, drop it */
158                 uprintf(dbg_uart, "Dropping packet, not for us\n");
159                 return -1;
160         }
161         if (header->type != 'M') {
162                 uprintf(dbg_uart, "Dropping packet, not MQTT\n");
163                 return -2;
164         }
165         /* This is an MQTT packet for us ... handle it */
166         switch (mqtt_comm_state) {
167                 case MQTT_WAITING_CONNECT_ACK: {
168                         /* We must not receive anything before the connect acknowledge */
169                         struct mqtt_connack_reply_pkt* reply = (struct mqtt_connack_reply_pkt*)(buf + HEADER_SIZE);
170                         ret = mqtt_check_connack_reply_pkt(reply);
171                         if (ret != 0) {
172                                 uprintf(dbg_uart, "Not a connack packet (ret: %d) ... protocol flow error\n", ret);
173                                 return -3;
174                         }
175                         /* Valid packet, check return code */
176                         if (reply->ret_code == MQTT_CONNACK_ACCEPTED) {
177                                 uprintf(dbg_uart, "Connection accepted, sending subscriptions\n");
178                                 mqtt_comm_state = MQTT_IDLE;
179                                 ret = mqtt_temp_subscribe(comm_uart, dbg_uart);
180                                 if (ret != 0) {
181                                         uprintf(dbg_uart, "Unable to send our subscriptions: %d\n", ret);
182                                         return ret;
183                                 }
184                         } else {
185                                 uprintf(dbg_uart, "Connection refused: %d\n", reply->ret_code);
186                                 mqtt_comm_state = MQTT_UNCONNECTED;
187                                 return -4;
188                         }
189                         break;
190                 }
191                 case MQTT_WAITING_SUBACK: {
192                         struct mqtt_sub_reply_pkt* reply = (struct mqtt_sub_reply_pkt*)(buf + HEADER_SIZE);
193                         uint8_t* acks = (uint8_t*)(buf + HEADER_SIZE + sizeof(struct mqtt_sub_reply_pkt));
194                         int i = 0;
195                         ret = mqtt_check_suback_reply_pkt(reply, header->data_len);
196                         if (ret != 0) {
197                                 uprintf(dbg_uart, "Not a suback packet (ret: %d) ... protocol flow error\n", ret);
198                                 return -5;
199                         }
200                         if (ntohs(reply->acked_pkt_id) != sub_packet_id) {
201                                 uprintf(dbg_uart, "Not a suback for the right packet (got %d, our is %d) ...\n",
202                                                         ntohs(reply->acked_pkt_id), sub_packet_id);
203                                 return -6;
204                         }
205                         /* Check that all subscriptions got accepted */
206                         for (i = 0; i < sub_pkt.nb_topics; i++) {
207                                 if (acks[i] > MQTT_QoS_2) {
208                                         /* Display info on debug UART ... not much to be done about it */
209                                         uprintf(dbg_uart, "Subscription n°%d (%s) got refused\n", i, sub_pkt.topics[i]);
210                                 }
211                         }
212                         uprintf(dbg_uart, "Subscriptions accepted\n");
213                         mqtt_comm_state = MQTT_IDLE;
214                         sub_packet_id++;
215                         break;
216                 }
217                 case MQTT_WAITING_PUBACK:
218                 case MQTT_WAITING_PUBREC:
219                 case MQTT_WAITING_PUBREL: {
220                         /* Check for this being a pubrel (release) and send pubcomp (complete) */
221                         struct mqtt_publish_reply_pkt* pub_comp = (struct mqtt_publish_reply_pkt*)(txbuf + HEADER_SIZE);
222                         struct mqtt_publish_reply_pkt* pub_rel = (struct mqtt_publish_reply_pkt*)(buf + HEADER_SIZE);
223                         int ret = mqtt_check_publish_reply_pkt(pub_rel, MQTT_CONTROL_PUBREL);
224                         int len = 0;
225                         if (ret < 0) {
226                                 uprintf(dbg_uart, "MQTT packet is not a valid pubrel.\n");
227                                 return -7;
228                         }
229                         uprintf(dbg_uart, "MQTT pubrel received, sending pubcomp\n");
230                         len = mqtt_pack_publish_reply_pkt((uint8_t*)pub_comp, ntohs(pub_rel->acked_pkt_id), MQTT_CONTROL_PUBCOMP);
231                         if (len != 4) {
232                                 uprintf(dbg_uart, "MQTT pack pubrel error: %d\n", len);
233                                 return len;
234                         }
235                         /* Add our packet header */
236                         add_packet_header(len);
237                         ret = serial_write(comm_uart, (char*)&txbuf, (len + HEADER_SIZE));
238                         if (ret < 0) {
239                                 uprintf(dbg_uart, "MQTT send error: %d\n", ret);
240                                 return ret;
241                         }
242                         mqtt_comm_state = MQTT_IDLE;
243                         break;
244                 }
245                 case MQTT_WAITING_PUBCOMP:
246                 case MQTT_WAITING_UNSUBACK:
247                 case MQTT_WAITING_PINGRESP:
248                 case MQTT_IDLE: {
249                         struct mqtt_publish_pkt pub_rx;
250                         struct mqtt_publish_reply_pkt* pub_ack = (struct mqtt_publish_reply_pkt*)(txbuf + HEADER_SIZE);
251                         int ret = 0, len = 0;
253                         /* Make sure this is a publish packet */
254                         ret = mqtt_unpack_publish_packet(&pub_rx, (uint8_t*)(buf + HEADER_SIZE),
255                                                                                                         header->data_len);
256                         if (ret <= 0) {
257                                 uprintf(dbg_uart, "Received packet in idle state is not a publish packet (got %d): %d.\n",
258                                                         ((buf[HEADER_SIZE] >> 4) & 0x0F), ret);
259                                 return -7;
260                         }
261                         uprintf(dbg_uart, "Received publication\n");
262                         /* Send the pub ack packet ? */
263                         if (pub_rx.topic.QoS != 0) {
264                                 uint8_t type = MQTT_CONTROL_PUBACK;
265                                 if (pub_rx.topic.QoS == MQTT_QoS_2) {
266                                         type = MQTT_CONTROL_PUBREC;
267                                 }
268                                 len = mqtt_pack_publish_reply_pkt((uint8_t*)pub_ack, pub_rx.packet_id, type);
269                                 if (len != 4) {
270                                         uprintf(dbg_uart, "MQTT pack puback error: %d\n", len);
271                                         return len;
272                                 }
273                                 /* Add our packet header */
274                                 add_packet_header(len);
275                                 ret = serial_write(comm_uart, (char*)&txbuf, (len + HEADER_SIZE));
276                                 if (ret < 0) {
277                                         uprintf(dbg_uart, "MQTT send error: %d\n", ret);
278                                         return ret;
279                                 }
280                                 if (pub_rx.topic.QoS == MQTT_QoS_2) {
281                                         mqtt_comm_state = MQTT_WAITING_PUBREL;
282                                 }
283                         }
284                         /* Perform something with the packet ... */
285                         break;
286                 }
287         }
288         return 0;