VirtualBox

source: vbox/trunk/src/libs/curl-7.83.1/lib/mqtt.c@ 97138

最後變更 在這個檔案從97138是 95312,由 vboxsync 提交於 3 年 前

libs/{curl,libxml2}: OSE export fixes, bugref:8515

  • 屬性 svn:eol-style 設為 native
檔案大小: 22.0 KB
 
1/***************************************************************************
2 * _ _ ____ _
3 * Project ___| | | | _ \| |
4 * / __| | | | |_) | |
5 * | (__| |_| | _ <| |___
6 * \___|\___/|_| \_\_____|
7 *
8 * Copyright (C) 2020 - 2022, Daniel Stenberg, <[email protected]>, et al.
9 * Copyright (C) 2019, Björn Stenberg, <[email protected]>
10 *
11 * This software is licensed as described in the file COPYING, which
12 * you should have received as part of this distribution. The terms
13 * are also available at https://curl.se/docs/copyright.html.
14 *
15 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
16 * copies of the Software, and permit persons to whom the Software is
17 * furnished to do so, under the terms of the COPYING file.
18 *
19 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20 * KIND, either express or implied.
21 *
22 ***************************************************************************/
23
24#include "curl_setup.h"
25
26#ifndef CURL_DISABLE_MQTT
27
28#include "urldata.h"
29#include <curl/curl.h>
30#include "transfer.h"
31#include "sendf.h"
32#include "progress.h"
33#include "mqtt.h"
34#include "select.h"
35#include "strdup.h"
36#include "url.h"
37#include "escape.h"
38#include "warnless.h"
39#include "curl_printf.h"
40#include "curl_memory.h"
41#include "multiif.h"
42#include "rand.h"
43
44/* The last #include file should be: */
45#include "memdebug.h"
46
/* First byte of the MQTT control packets this file sends or recognizes. The
   receive path masks the byte with 0xf0 before comparing, so the packet type
   lives in the high nibble. */
#define MQTT_MSG_CONNECT    0x10
#define MQTT_MSG_CONNACK    0x20
#define MQTT_MSG_PUBLISH    0x30
#define MQTT_MSG_SUBSCRIBE  0x82
#define MQTT_MSG_SUBACK     0x90
#define MQTT_MSG_DISCONNECT 0xe0

/* Number of bytes read after the fixed header when verifying a CONNACK and
   a SUBACK respectively */
#define MQTT_CONNACK_LEN 2
#define MQTT_SUBACK_LEN 3

/* Length of the generated client id, without the terminating null byte */
#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
57
58/*
59 * Forward declarations.
60 */
61
62static CURLcode mqtt_do(struct Curl_easy *data, bool *done);
63static CURLcode mqtt_done(struct Curl_easy *data,
64 CURLcode status, bool premature);
65static CURLcode mqtt_doing(struct Curl_easy *data, bool *done);
66static int mqtt_getsock(struct Curl_easy *data, struct connectdata *conn,
67 curl_socket_t *sock);
68static CURLcode mqtt_setup_conn(struct Curl_easy *data,
69 struct connectdata *conn);
70
71/*
72 * MQTT protocol handler.
73 */
74
75const struct Curl_handler Curl_handler_mqtt = {
76 "MQTT", /* scheme */
77 mqtt_setup_conn, /* setup_connection */
78 mqtt_do, /* do_it */
79 mqtt_done, /* done */
80 ZERO_NULL, /* do_more */
81 ZERO_NULL, /* connect_it */
82 ZERO_NULL, /* connecting */
83 mqtt_doing, /* doing */
84 ZERO_NULL, /* proto_getsock */
85 mqtt_getsock, /* doing_getsock */
86 ZERO_NULL, /* domore_getsock */
87 ZERO_NULL, /* perform_getsock */
88 ZERO_NULL, /* disconnect */
89 ZERO_NULL, /* readwrite */
90 ZERO_NULL, /* connection_check */
91 ZERO_NULL, /* attach connection */
92 PORT_MQTT, /* defport */
93 CURLPROTO_MQTT, /* protocol */
94 CURLPROTO_MQTT, /* family */
95 PROTOPT_NONE /* flags */
96};
97
98static CURLcode mqtt_setup_conn(struct Curl_easy *data,
99 struct connectdata *conn)
100{
101 /* allocate the HTTP-specific struct for the Curl_easy, only to survive
102 during this request */
103 struct MQTT *mq;
104 (void)conn;
105 DEBUGASSERT(data->req.p.mqtt == NULL);
106
107 mq = calloc(1, sizeof(struct MQTT));
108 if(!mq)
109 return CURLE_OUT_OF_MEMORY;
110 data->req.p.mqtt = mq;
111 return CURLE_OK;
112}
113
114static CURLcode mqtt_send(struct Curl_easy *data,
115 char *buf, size_t len)
116{
117 CURLcode result = CURLE_OK;
118 struct connectdata *conn = data->conn;
119 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
120 struct MQTT *mq = data->req.p.mqtt;
121 ssize_t n;
122 result = Curl_write(data, sockfd, buf, len, &n);
123 if(!result)
124 Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
125 if(len != (size_t)n) {
126 size_t nsend = len - n;
127 char *sendleftovers = Curl_memdup(&buf[n], nsend);
128 if(!sendleftovers)
129 return CURLE_OUT_OF_MEMORY;
130 mq->sendleftovers = sendleftovers;
131 mq->nsend = nsend;
132 }
133 else {
134 mq->sendleftovers = NULL;
135 mq->nsend = 0;
136 }
137 return result;
138}
139
140/* Generic function called by the multi interface to figure out what socket(s)
141 to wait for and for what actions during the DOING and PROTOCONNECT
142 states */
143static int mqtt_getsock(struct Curl_easy *data,
144 struct connectdata *conn,
145 curl_socket_t *sock)
146{
147 (void)data;
148 sock[0] = conn->sock[FIRSTSOCKET];
149 return GETSOCK_READSOCK(FIRSTSOCKET);
150}
151
/*
 * mqtt_encode_len() stores 'len' in 'buf' using the MQTT variable-length
 * "remaining length" encoding: seven value bits per byte, least significant
 * group first, with bit 0x80 set on every byte that is followed by another.
 *
 * 'buf' must have room for 4 bytes. At least one byte is always written, so
 * a length of zero is encoded as the single byte 0x00. Lengths above
 * 268435455 do not fit in four bytes; only the low 28 bits are stored then
 * and the fourth byte keeps its continuation bit set.
 *
 * Returns the number of bytes written (1 to 4).
 */
static int mqtt_encode_len(char *buf, size_t len)
{
  int i = 0;

  do {
    unsigned char encoded = (unsigned char)(len % 0x80);
    len /= 0x80;
    if(len)
      /* more groups follow */
      encoded |= 0x80;
    buf[i++] = (char)encoded;
  } while((len > 0) && (i < 4));

  return i;
}
167
/*
 * add_passwd() writes the password field of a CONNECT packet at offset
 * 'start' in 'pkt': a two-byte big-endian length followed by the 'plen'
 * password bytes. It also sets the 0x40 bit in the connect flags byte,
 * which sits at offset remain_pos + 8.
 *
 * Returns 1 without touching 'pkt' if the password is longer than 0xffff
 * bytes, otherwise 0.
 */
static int add_passwd(const char *passwd, const size_t plen,
                      char *pkt, const size_t start, int remain_pos)
{
  const size_t flags_at = remain_pos + 8;
  char *field;

  if(plen > 0xffff)
    return 1;

  /* flag the presence of a password */
  pkt[flags_at] |= 0x40;

  /* length prefix, most significant byte first, then the data */
  field = &pkt[start];
  field[0] = (char)((plen >> 8) & 0xFF);
  field[1] = (char)(plen & 0xFF);
  memcpy(field + 2, passwd, plen);
  return 0;
}
186
187/* add user to the CONN packet */
188static int add_user(const char *username, const size_t ulen,
189 unsigned char *pkt, const size_t start, int remain_pos)
190{
191 /* magic number that need to be set properly */
192 const size_t conn_flags_pos = remain_pos + 8;
193 if(ulen > 0xffff)
194 return 1;
195
196 /* set username flag */
197 pkt[conn_flags_pos] |= 0x80;
198 /* length of username provided */
199 pkt[start] = (unsigned char)((ulen >> 8) & 0xFF);
200 pkt[start + 1] = (unsigned char)(ulen & 0xFF);
201 memcpy(&pkt[start + 2], username, ulen);
202 return 0;
203}
204
205/* add client ID to the CONN packet */
206static int add_client_id(const char *client_id, const size_t client_id_len,
207 char *pkt, const size_t start)
208{
209 if(client_id_len != MQTT_CLIENTID_LEN)
210 return 1;
211 pkt[start] = 0x00;
212 pkt[start + 1] = MQTT_CLIENTID_LEN;
213 memcpy(&pkt[start + 2], client_id, MQTT_CLIENTID_LEN);
214 return 0;
215}
216
217/* Set initial values of CONN packet */
218static int init_connpack(char *packet, char *remain, int remain_pos)
219{
220 /* Fixed header starts */
221 /* packet type */
222 packet[0] = MQTT_MSG_CONNECT;
223 /* remaining length field */
224 memcpy(&packet[1], remain, remain_pos);
225 /* Fixed header ends */
226
227 /* Variable header starts */
228 /* protocol length */
229 packet[remain_pos + 1] = 0x00;
230 packet[remain_pos + 2] = 0x04;
231 /* protocol name */
232 packet[remain_pos + 3] = 'M';
233 packet[remain_pos + 4] = 'Q';
234 packet[remain_pos + 5] = 'T';
235 packet[remain_pos + 6] = 'T';
236 /* protocol level */
237 packet[remain_pos + 7] = 0x04;
238 /* CONNECT flag: CleanSession */
239 packet[remain_pos + 8] = 0x02;
240 /* keep-alive 0 = disabled */
241 packet[remain_pos + 9] = 0x00;
242 packet[remain_pos + 10] = 0x3c;
243 /*end of variable header*/
244 return remain_pos + 10;
245}
246
247static CURLcode mqtt_connect(struct Curl_easy *data)
248{
249 CURLcode result = CURLE_OK;
250 int pos = 0;
251 int rc = 0;
252 /*remain length*/
253 int remain_pos = 0;
254 char remain[4] = {0};
255 size_t packetlen = 0;
256 size_t payloadlen = 0;
257 size_t start_user = 0;
258 size_t start_pwd = 0;
259 char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
260 const size_t clen = strlen("curl");
261 char *packet = NULL;
262
263 /* extracting username from request */
264 const char *username = data->state.aptr.user ?
265 data->state.aptr.user : "";
266 const size_t ulen = strlen(username);
267 /* extracting password from request */
268 const char *passwd = data->state.aptr.passwd ?
269 data->state.aptr.passwd : "";
270 const size_t plen = strlen(passwd);
271
272 payloadlen = ulen + plen + MQTT_CLIENTID_LEN + 2;
273 /* The plus 2 are for the MSB and LSB describing the length of the string to
274 * be added on the payload. Refer to spec 1.5.2 and 1.5.4 */
275 if(ulen)
276 payloadlen += 2;
277 if(plen)
278 payloadlen += 2;
279
280 /* getting how much occupy the remain length */
281 remain_pos = mqtt_encode_len(remain, payloadlen + 10);
282
283 /* 10 length of variable header and 1 the first byte of the fixed header */
284 packetlen = payloadlen + 10 + remain_pos + 1;
285
286 /* allocating packet */
287 if(packetlen > 268435455)
288 return CURLE_WEIRD_SERVER_REPLY;
289 packet = malloc(packetlen);
290 if(!packet)
291 return CURLE_OUT_OF_MEMORY;
292 memset(packet, 0, packetlen);
293
294 /* set initial values for CONN pack */
295 pos = init_connpack(packet, remain, remain_pos);
296
297 result = Curl_rand_hex(data, (unsigned char *)&client_id[clen],
298 MQTT_CLIENTID_LEN - clen + 1);
299 /* add client id */
300 rc = add_client_id(client_id, strlen(client_id), packet, pos + 1);
301 if(rc) {
302 failf(data, "Client ID length mismatched: [%lu]", strlen(client_id));
303 result = CURLE_WEIRD_SERVER_REPLY;
304 goto end;
305 }
306 infof(data, "Using client id '%s'", client_id);
307
308 /* position where starts the user payload */
309 start_user = pos + 3 + MQTT_CLIENTID_LEN;
310 /* position where starts the password payload */
311 start_pwd = start_user + ulen;
312 /* if user name was provided, add it to the packet */
313 if(ulen) {
314 start_pwd += 2;
315
316 rc = add_user(username, ulen,
317 (unsigned char *)packet, start_user, remain_pos);
318 if(rc) {
319 failf(data, "Username is too large: [%lu]", ulen);
320 result = CURLE_WEIRD_SERVER_REPLY;
321 goto end;
322 }
323 }
324
325 /* if passwd was provided, add it to the packet */
326 if(plen) {
327 rc = add_passwd(passwd, plen, packet, start_pwd, remain_pos);
328 if(rc) {
329 failf(data, "Password is too large: [%lu]", plen);
330 result = CURLE_WEIRD_SERVER_REPLY;
331 goto end;
332 }
333 }
334
335 if(!result)
336 result = mqtt_send(data, packet, packetlen);
337
338end:
339 if(packet)
340 free(packet);
341 Curl_safefree(data->state.aptr.user);
342 Curl_safefree(data->state.aptr.passwd);
343 return result;
344}
345
346static CURLcode mqtt_disconnect(struct Curl_easy *data)
347{
348 CURLcode result = CURLE_OK;
349 struct MQTT *mq = data->req.p.mqtt;
350 result = mqtt_send(data, (char *)"\xe0\x00", 2);
351 Curl_safefree(mq->sendleftovers);
352 return result;
353}
354
355static CURLcode mqtt_verify_connack(struct Curl_easy *data)
356{
357 CURLcode result;
358 struct connectdata *conn = data->conn;
359 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
360 unsigned char readbuf[MQTT_CONNACK_LEN];
361 ssize_t nread;
362
363 result = Curl_read(data, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
364 if(result)
365 goto fail;
366
367 Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
368
369 /* fixme */
370 if(nread < MQTT_CONNACK_LEN) {
371 result = CURLE_WEIRD_SERVER_REPLY;
372 goto fail;
373 }
374
375 /* verify CONNACK */
376 if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
377 failf(data, "Expected %02x%02x but got %02x%02x",
378 0x00, 0x00, readbuf[0], readbuf[1]);
379 result = CURLE_WEIRD_SERVER_REPLY;
380 }
381
382fail:
383 return result;
384}
385
386static CURLcode mqtt_get_topic(struct Curl_easy *data,
387 char **topic, size_t *topiclen)
388{
389 char *path = data->state.up.path;
390 if(strlen(path) > 1)
391 return Curl_urldecode(path + 1, 0, topic, topiclen, REJECT_NADA);
392 failf(data, "No MQTT topic found. Forgot to URL encode it?");
393 return CURLE_URL_MALFORMAT;
394}
395
396static CURLcode mqtt_subscribe(struct Curl_easy *data)
397{
398 CURLcode result = CURLE_OK;
399 char *topic = NULL;
400 size_t topiclen;
401 unsigned char *packet = NULL;
402 size_t packetlen;
403 char encodedsize[4];
404 size_t n;
405 struct connectdata *conn = data->conn;
406
407 result = mqtt_get_topic(data, &topic, &topiclen);
408 if(result)
409 goto fail;
410
411 conn->proto.mqtt.packetid++;
412
413 packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
414 + 2 bytes topic length + QoS byte */
415 n = mqtt_encode_len((char *)encodedsize, packetlen);
416 packetlen += n + 1; /* add one for the control packet type byte */
417
418 packet = malloc(packetlen);
419 if(!packet) {
420 result = CURLE_OUT_OF_MEMORY;
421 goto fail;
422 }
423
424 packet[0] = MQTT_MSG_SUBSCRIBE;
425 memcpy(&packet[1], encodedsize, n);
426 packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
427 packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
428 packet[3 + n] = (topiclen >> 8) & 0xff;
429 packet[4 + n ] = topiclen & 0xff;
430 memcpy(&packet[5 + n], topic, topiclen);
431 packet[5 + n + topiclen] = 0; /* QoS zero */
432
433 result = mqtt_send(data, (char *)packet, packetlen);
434
435fail:
436 free(topic);
437 free(packet);
438 return result;
439}
440
441/*
442 * Called when the first byte was already read.
443 */
444static CURLcode mqtt_verify_suback(struct Curl_easy *data)
445{
446 CURLcode result;
447 struct connectdata *conn = data->conn;
448 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
449 unsigned char readbuf[MQTT_SUBACK_LEN];
450 ssize_t nread;
451 struct mqtt_conn *mqtt = &conn->proto.mqtt;
452
453 result = Curl_read(data, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
454 if(result)
455 goto fail;
456
457 Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
458
459 /* fixme */
460 if(nread < MQTT_SUBACK_LEN) {
461 result = CURLE_WEIRD_SERVER_REPLY;
462 goto fail;
463 }
464
465 /* verify SUBACK */
466 if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
467 readbuf[1] != (mqtt->packetid & 0xff) ||
468 readbuf[2] != 0x00)
469 result = CURLE_WEIRD_SERVER_REPLY;
470
471fail:
472 return result;
473}
474
475static CURLcode mqtt_publish(struct Curl_easy *data)
476{
477 CURLcode result;
478 char *payload = data->set.postfields;
479 size_t payloadlen;
480 char *topic = NULL;
481 size_t topiclen;
482 unsigned char *pkt = NULL;
483 size_t i = 0;
484 size_t remaininglength;
485 size_t encodelen;
486 char encodedbytes[4];
487 curl_off_t postfieldsize = data->set.postfieldsize;
488
489 if(!payload)
490 return CURLE_BAD_FUNCTION_ARGUMENT;
491 if(postfieldsize < 0)
492 payloadlen = strlen(payload);
493 else
494 payloadlen = (size_t)postfieldsize;
495
496 result = mqtt_get_topic(data, &topic, &topiclen);
497 if(result)
498 goto fail;
499
500 remaininglength = payloadlen + 2 + topiclen;
501 encodelen = mqtt_encode_len(encodedbytes, remaininglength);
502
503 /* add the control byte and the encoded remaining length */
504 pkt = malloc(remaininglength + 1 + encodelen);
505 if(!pkt) {
506 result = CURLE_OUT_OF_MEMORY;
507 goto fail;
508 }
509
510 /* assemble packet */
511 pkt[i++] = MQTT_MSG_PUBLISH;
512 memcpy(&pkt[i], encodedbytes, encodelen);
513 i += encodelen;
514 pkt[i++] = (topiclen >> 8) & 0xff;
515 pkt[i++] = (topiclen & 0xff);
516 memcpy(&pkt[i], topic, topiclen);
517 i += topiclen;
518 memcpy(&pkt[i], payload, payloadlen);
519 i += payloadlen;
520 result = mqtt_send(data, (char *)pkt, i);
521
522fail:
523 free(pkt);
524 free(topic);
525 return result;
526}
527
/*
 * mqtt_decode_len() decodes an MQTT variable-length "remaining length"
 * field: seven value bits per byte, least significant group first, where
 * bit 0x80 marks that another byte follows.
 *
 * At most 'buflen' bytes of 'buf' are consumed; decoding stops after the
 * first byte without the continuation bit. If 'lenbytes' is not NULL it
 * receives the number of bytes consumed.
 *
 * Returns the decoded length (0 when 'buflen' is 0).
 */
static size_t mqtt_decode_len(unsigned char *buf,
                              size_t buflen, size_t *lenbytes)
{
  size_t value = 0;
  size_t scale = 1;
  size_t used = 0;

  while(used < buflen) {
    unsigned char octet = buf[used++];
    value += (octet & 127) * scale;
    scale *= 128;
    if(!(octet & 128))
      /* last byte of the field */
      break;
  }

  if(lenbytes)
    *lenbytes = used;

  return value;
}
547
548#ifdef CURLDEBUG
549static const char *statenames[]={
550 "MQTT_FIRST",
551 "MQTT_REMAINING_LENGTH",
552 "MQTT_CONNACK",
553 "MQTT_SUBACK",
554 "MQTT_SUBACK_COMING",
555 "MQTT_PUBWAIT",
556 "MQTT_PUB_REMAIN",
557
558 "NOT A STATE"
559};
560#endif
561
562/* The only way to change state */
563static void mqstate(struct Curl_easy *data,
564 enum mqttstate state,
565 enum mqttstate nextstate) /* used if state == FIRST */
566{
567 struct connectdata *conn = data->conn;
568 struct mqtt_conn *mqtt = &conn->proto.mqtt;
569#ifdef CURLDEBUG
570 infof(data, "%s (from %s) (next is %s)",
571 statenames[state],
572 statenames[mqtt->state],
573 (state == MQTT_FIRST)? statenames[nextstate] : "");
574#endif
575 mqtt->state = state;
576 if(state == MQTT_FIRST)
577 mqtt->nextstate = nextstate;
578}
579
580
581/* for the publish packet */
582#define MQTT_HEADER_LEN 5 /* max 5 bytes */
583
584static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done)
585{
586 CURLcode result = CURLE_OK;
587 struct connectdata *conn = data->conn;
588 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
589 ssize_t nread;
590 unsigned char *pkt = (unsigned char *)data->state.buffer;
591 size_t remlen;
592 struct mqtt_conn *mqtt = &conn->proto.mqtt;
593 struct MQTT *mq = data->req.p.mqtt;
594 unsigned char packet;
595
596 switch(mqtt->state) {
597 MQTT_SUBACK_COMING:
598 case MQTT_SUBACK_COMING:
599 result = mqtt_verify_suback(data);
600 if(result)
601 break;
602
603 mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
604 break;
605
606 case MQTT_SUBACK:
607 case MQTT_PUBWAIT:
608 /* we are expecting PUBLISH or SUBACK */
609 packet = mq->firstbyte & 0xf0;
610 if(packet == MQTT_MSG_PUBLISH)
611 mqstate(data, MQTT_PUB_REMAIN, MQTT_NOSTATE);
612 else if(packet == MQTT_MSG_SUBACK) {
613 mqstate(data, MQTT_SUBACK_COMING, MQTT_NOSTATE);
614 goto MQTT_SUBACK_COMING;
615 }
616 else if(packet == MQTT_MSG_DISCONNECT) {
617 infof(data, "Got DISCONNECT");
618 *done = TRUE;
619 goto end;
620 }
621 else {
622 result = CURLE_WEIRD_SERVER_REPLY;
623 goto end;
624 }
625
626 /* -- switched state -- */
627 remlen = mq->remaining_length;
628 infof(data, "Remaining length: %zd bytes", remlen);
629 if(data->set.max_filesize &&
630 (curl_off_t)remlen > data->set.max_filesize) {
631 failf(data, "Maximum file size exceeded");
632 result = CURLE_FILESIZE_EXCEEDED;
633 goto end;
634 }
635 Curl_pgrsSetDownloadSize(data, remlen);
636 data->req.bytecount = 0;
637 data->req.size = remlen;
638 mq->npacket = remlen; /* get this many bytes */
639 /* FALLTHROUGH */
640 case MQTT_PUB_REMAIN: {
641 /* read rest of packet, but no more. Cap to buffer size */
642 struct SingleRequest *k = &data->req;
643 size_t rest = mq->npacket;
644 if(rest > (size_t)data->set.buffer_size)
645 rest = (size_t)data->set.buffer_size;
646 result = Curl_read(data, sockfd, (char *)pkt, rest, &nread);
647 if(result) {
648 if(CURLE_AGAIN == result) {
649 infof(data, "EEEE AAAAGAIN");
650 }
651 goto end;
652 }
653 if(!nread) {
654 infof(data, "server disconnected");
655 result = CURLE_PARTIAL_FILE;
656 goto end;
657 }
658 Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
659
660 mq->npacket -= nread;
661 k->bytecount += nread;
662 Curl_pgrsSetDownloadCounter(data, k->bytecount);
663
664 /* if QoS is set, message contains packet id */
665
666 result = Curl_client_write(data, CLIENTWRITE_BODY, (char *)pkt, nread);
667 if(result)
668 goto end;
669
670 if(!mq->npacket)
671 /* no more PUBLISH payload, back to subscribe wait state */
672 mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
673 break;
674 }
675 default:
676 DEBUGASSERT(NULL); /* illegal state */
677 result = CURLE_WEIRD_SERVER_REPLY;
678 goto end;
679 }
680 end:
681 return result;
682}
683
684static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
685{
686 CURLcode result = CURLE_OK;
687 *done = FALSE; /* unconditionally */
688
689 result = mqtt_connect(data);
690 if(result) {
691 failf(data, "Error %d sending MQTT CONN request", result);
692 return result;
693 }
694 mqstate(data, MQTT_FIRST, MQTT_CONNACK);
695 return CURLE_OK;
696}
697
698static CURLcode mqtt_done(struct Curl_easy *data,
699 CURLcode status, bool premature)
700{
701 struct MQTT *mq = data->req.p.mqtt;
702 (void)status;
703 (void)premature;
704 Curl_safefree(mq->sendleftovers);
705 return CURLE_OK;
706}
707
708static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
709{
710 CURLcode result = CURLE_OK;
711 struct connectdata *conn = data->conn;
712 struct mqtt_conn *mqtt = &conn->proto.mqtt;
713 struct MQTT *mq = data->req.p.mqtt;
714 ssize_t nread;
715 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
716 unsigned char *pkt = (unsigned char *)data->state.buffer;
717 unsigned char byte;
718
719 *done = FALSE;
720
721 if(mq->nsend) {
722 /* send the remainder of an outgoing packet */
723 char *ptr = mq->sendleftovers;
724 result = mqtt_send(data, mq->sendleftovers, mq->nsend);
725 free(ptr);
726 if(result)
727 return result;
728 }
729
730 infof(data, "mqtt_doing: state [%d]", (int) mqtt->state);
731 switch(mqtt->state) {
732 case MQTT_FIRST:
733 /* Read the initial byte only */
734 result = Curl_read(data, sockfd, (char *)&mq->firstbyte, 1, &nread);
735 if(result)
736 break;
737 else if(!nread) {
738 failf(data, "Connection disconnected");
739 *done = TRUE;
740 result = CURLE_RECV_ERROR;
741 break;
742 }
743 Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
744 /* remember the first byte */
745 mq->npacket = 0;
746 mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
747 /* FALLTHROUGH */
748 case MQTT_REMAINING_LENGTH:
749 do {
750 result = Curl_read(data, sockfd, (char *)&byte, 1, &nread);
751 if(!nread)
752 break;
753 Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
754 pkt[mq->npacket++] = byte;
755 } while((byte & 0x80) && (mq->npacket < 4));
756 if(nread && (byte & 0x80))
757 /* MQTT supports up to 127 * 128^0 + 127 * 128^1 + 127 * 128^2 +
758 127 * 128^3 bytes. server tried to send more */
759 result = CURLE_WEIRD_SERVER_REPLY;
760 if(result)
761 break;
762 mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
763 mq->npacket = 0;
764 if(mq->remaining_length) {
765 mqstate(data, mqtt->nextstate, MQTT_NOSTATE);
766 break;
767 }
768 mqstate(data, MQTT_FIRST, MQTT_FIRST);
769
770 if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
771 infof(data, "Got DISCONNECT");
772 *done = TRUE;
773 }
774 break;
775 case MQTT_CONNACK:
776 result = mqtt_verify_connack(data);
777 if(result)
778 break;
779
780 if(data->state.httpreq == HTTPREQ_POST) {
781 result = mqtt_publish(data);
782 if(!result) {
783 result = mqtt_disconnect(data);
784 *done = TRUE;
785 }
786 mqtt->nextstate = MQTT_FIRST;
787 }
788 else {
789 result = mqtt_subscribe(data);
790 if(!result) {
791 mqstate(data, MQTT_FIRST, MQTT_SUBACK);
792 }
793 }
794 break;
795
796 case MQTT_SUBACK:
797 case MQTT_PUBWAIT:
798 case MQTT_PUB_REMAIN:
799 result = mqtt_read_publish(data, done);
800 break;
801
802 default:
803 failf(data, "State not handled yet");
804 *done = TRUE;
805 break;
806 }
807
808 if(result == CURLE_AGAIN)
809 result = CURLE_OK;
810 return result;
811}
812
813#endif /* CURL_DISABLE_MQTT */
注意: 瀏覽 TracBrowser 來幫助您使用儲存庫瀏覽器

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette