esp-mqtt subscribe and retained messages

User avatar
PaulVdBergh
Posts: 42
Joined: Fri Feb 23, 2018 4:45 pm
Location: Brasschaat, Belgium

esp-mqtt subscribe and retained messages

Postby PaulVdBergh » Sat Aug 25, 2018 8:18 pm

Hi All,

I'm having problems with receiving retained messages using the esp-mqtt component.

My setup:
Developing on Debian 9.5.0 running in a Win10 VirtualBox (5.2.18) using Eclipse Photon (4.8.0) and ESP-IDF (v3.2-dev-596-g7abed5fc-dirty).
Mosquitto MQTT broker running on RPi 3B+

My Code:

Code: Select all

/*
 * main.cpp
 *
 *  Created on: Aug 25, 2018
 *      Author: paul
 */

#include <cstring>

#include <freertos/FreeRTOS.h>
#include <freertos/event_groups.h>

#include <esp_event_loop.h>
#include <esp_log.h>
#include <esp_wifi.h>
#include <mqtt_client.h>
#include <nvs_flash.h>

#include "ESP32Tools.h"

using namespace IoTTV2;

#ifndef CONFIG_WIFI_SSID
//#define CONFIG_WIFI_SSID "IoT_and_Trains"
#define CONFIG_WIFI_SSID "devolo-8f4"
#endif

#ifndef CONFIG_WIFI_PASSWORD
//#define CONFIG_WIFI_PASSWORD "IoTTESP32"
#define CONFIG_WIFI_PASSWORD "AYBJMEITONIPHEBQ"
#endif

#ifndef CONFIG_MQTT_BROKER_IP
#define CONFIG_MQTT_BROKER_IP "mqtt://192.168.1.100"
#endif

static const char* TAG = "IoTT_ESP32";

static EventGroupHandle_t   wifi_event_group;
const static int CONNECTED_BIT = BIT0;

static esp_err_t wifi_event_handler(void* ctx, system_event_t* event)
{
   switch(event->event_id)
   {
      case SYSTEM_EVENT_STA_START:
      {
         ESP_LOGI(TAG, "SYSTEM_EVENT_STA_START");
         ESP_ERROR_CHECK(esp_wifi_connect());
         break;
      }

      case SYSTEM_EVENT_STA_GOT_IP:
      {
         ESP_LOGI(TAG, "SYSTEM_EVENT_STA_GOT_IP");
         xEventGroupSetBits(wifi_event_group, CONNECTED_BIT);
         break;
      }

      case SYSTEM_EVENT_STA_DISCONNECTED:
      {
         ESP_LOGI(TAG, "SYSTEM_EVENT_STA_DISCONNECTED");
         ESP_ERROR_CHECK(esp_wifi_connect());
         xEventGroupClearBits(wifi_event_group, CONNECTED_BIT);
         break;
      }

      default:
      {
         break;
      }
   }
   return ESP_OK;
}

static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
{
   esp_mqtt_client_handle_t client = event->client;
   int msg_id;

   switch(event->event_id)
   {
      case MQTT_EVENT_CONNECTED:
      {
            ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
            msg_id = esp_mqtt_client_subscribe(client, "#", 0);
            esp_mqtt_client_publish(client, ESP32_Tools::getMqttStatusTopic(), "Online", strlen("Online"), 0, 1);
            ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
         break;
      }

      case MQTT_EVENT_DISCONNECTED:
      {
         ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
         break;
      }

      case MQTT_EVENT_SUBSCRIBED:
      {
            ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
            break;
      }

      case MQTT_EVENT_UNSUBSCRIBED:
      {
         ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
         break;
      }

      case MQTT_EVENT_PUBLISHED:
      {
         ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
         break;
      }

      case MQTT_EVENT_DATA:
      {
            ESP_LOGI(TAG, "MQTT_EVENT_DATA");
            printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
            printf("DATA=%.*s\r\n", event->data_len, event->data);
         break;
      }

      case MQTT_EVENT_ERROR:
      {
         ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
         break;
      }
   }
   return ESP_OK;
}

static void wifi_init(void)
{
   tcpip_adapter_init();
   wifi_event_group = xEventGroupCreate();
   ESP_ERROR_CHECK(esp_event_loop_init(wifi_event_handler, NULL));

   wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
   ESP_ERROR_CHECK(esp_wifi_init(&cfg));
   ESP_ERROR_CHECK(esp_wifi_set_storage(WIFI_STORAGE_RAM));

   wifi_config_t wifi_config;
   memset(&wifi_config, 0, sizeof(wifi_config));
   memcpy(&wifi_config.sta.ssid, CONFIG_WIFI_SSID, strlen(CONFIG_WIFI_SSID));
   memcpy(&wifi_config.sta.password, CONFIG_WIFI_PASSWORD, strlen(CONFIG_WIFI_PASSWORD));

   ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA));
   ESP_ERROR_CHECK(esp_wifi_set_config(ESP_IF_WIFI_STA, &wifi_config));
   ESP_LOGI(TAG, "Start the WIFI SSID:[%s] password:[%s]", CONFIG_WIFI_SSID, CONFIG_WIFI_PASSWORD);
   ESP_ERROR_CHECK(esp_wifi_start());

   ESP_LOGI(TAG, "Waiting for wifi");
   xEventGroupWaitBits(wifi_event_group, CONNECTED_BIT, false, true, portMAX_DELAY);
   ESP_LOGI(TAG, "Wifi connected to %s", CONFIG_WIFI_SSID);
}

static void mqtt_app_start(void)
{
   esp_mqtt_client_config_t mqtt_cfg;
   memset(&mqtt_cfg, 0, sizeof(mqtt_cfg));
   mqtt_cfg.uri = CONFIG_MQTT_BROKER_IP;
   mqtt_cfg.event_handle = mqtt_event_handler;

   mqtt_cfg.client_id = ESP32_Tools::getMqttUniqueName();
   mqtt_cfg.lwt_topic = ESP32_Tools::getMqttStatusTopic();
   mqtt_cfg.lwt_msg = "Offline";
   mqtt_cfg.lwt_msg_len = strlen(mqtt_cfg.lwt_msg);
   mqtt_cfg.lwt_qos = 0;
   mqtt_cfg.lwt_retain = 1;
   //   mqtt_cfg.user_context = (void*)xxx;

   esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
   esp_mqtt_client_start(client);
}

/*
 * Main program entry point
 */
extern "C" void app_main(void)
{
   ESP_LOGI(TAG, "Startup...");
   ESP_LOGI(TAG, "Free memory: %d bytes", esp_get_free_heap_size());
   ESP_LOGI(TAG, "IDF version: %s", esp_get_idf_version());

   nvs_flash_init();
   wifi_init();
   mqtt_app_start();
}


The problem appears when multiple retained messages are received when connecting to the broker. Monitoring the broker gives this result:
root@raspberrypi:~# mosquitto_sub -h "192.168.1.100" -t "#" -d -v
Client mosqsub/1848-raspberryp sending CONNECT
Client mosqsub/1848-raspberryp received CONNACK
Client mosqsub/1848-raspberryp sending SUBSCRIBE (Mid: 1, Topic: #, QoS: 0)
Client mosqsub/1848-raspberryp received SUBACK
Subscribed (mid: 1): 0
Client mosqsub/1848-raspberryp received PUBLISH (d0, q0, r1, m0, 'TBTIoT/Devices/11936008677624121892/Status', ... (6 bytes))
TBTIoT/Devices/11936008677624121892/Status Online
Client mosqsub/1848-raspberryp received PUBLISH (d0, q0, r1, m0, 'IoTT/Devices/11936008677624121892/Status', ... (7 bytes))
IoTT/Devices/11936008677624121892/Status Offline
Client mosqsub/1848-raspberryp received PUBLISH (d0, q0, r1, m0, 'IoTT/Devices/ESP32_11A738/Status', ... (6 bytes))
IoTT/Devices/ESP32_11A738/Status Online

While ESP32 only receives/displays the first message:
paul@debian-Testing:~/IoTTV2/IoTT_ESP32$ make monitor
MONITOR
--- idf_monitor on /dev/ttyUSB1 115200 ---
--- Quit: Ctrl+] | Menu: Ctrl+T | Help: Ctrl+T followed by Ctrl+H ---
ets Jun 8 2016 00:22:57

rst:0x1 (POWERON_RESET),boot:0x3e (SPI_FAST_FLASH_BOOT)
flash read err, 1000
Falling back to built-in command interpreter.
OK
>ets Jun 8 2016 00:22:57

rst:0x10 (RTCWDT_RTC_RESET),boot:0x3e (SPI_FAST_FLASH_BOOT)
configsip: 0, SPIWP:0xee
clk_drv:0x00,q_drv:0x00,d_drv:0x00,cs0_drv:0x00,hd_drv:0x00,wp_drv:0x00
mode:DIO, clock div:2
load:0x3fff0010,len:4
load:0x3fff0014,len:5956
load:0x40078000,len:9308
load:0x40080400,len:5780
entry 0x40080744
I (177) boot: ESP-IDF v3.2-dev-596-g7abed5fc-dirty 2nd stage bootloader
I (177) boot: compile time 21:49:14
I (235) boot: Enabling RNG early entropy source...
I (235) boot: SPI Speed : 40MHz
I (236) boot: SPI Mode : DIO
I (243) boot: SPI Flash Size : 4MB
I (256) boot: Partition Table:
I (267) boot: ## Label Usage Type ST Offset Length
I (289) boot: 0 nvs WiFi data 01 02 00009000 00006000
I (313) boot: 1 phy_init RF data 01 01 0000f000 00001000
I (336) boot: 2 factory factory app 00 00 00010000 00100000
I (359) boot: End of partition table
I (372) esp_image: segment 0: paddr=0x00010020 vaddr=0x3f400020 size=0x18f78 (102264) map
I (621) esp_image: segment 1: paddr=0x00028fa0 vaddr=0x3ffb0000 size=0x03380 ( 13184) load
I (655) esp_image: segment 2: paddr=0x0002c328 vaddr=0x3ffb3380 size=0x00000 ( 0) load
I (656) esp_image: segment 3: paddr=0x0002c330 vaddr=0x40080000 size=0x00400 ( 1024) load
0x40080000: _WindowOverflow4 at /home/paul/esp/esp-idf/components/freertos/xtensa_vectors.S:1685

I (679) esp_image: segment 4: paddr=0x0002c738 vaddr=0x40080400 size=0x038d8 ( 14552) load
I (741) esp_image: segment 5: paddr=0x00030018 vaddr=0x400d0018 size=0x808d4 (526548) map
0x400d0018: _stext at ??:?

I (1878) esp_image: segment 6: paddr=0x000b08f4 vaddr=0x40083cd8 size=0x0ca5c ( 51804) load
0x40083cd8: DPORT_SEQUENCE_REG_READ at /home/paul/esp/esp-idf/components/spi_flash/flash_mmap.c:320
(inlined by) spi_flash_mmap_pages at /home/paul/esp/esp-idf/components/spi_flash/flash_mmap.c:193

I (2012) esp_image: segment 7: paddr=0x000bd358 vaddr=0x400c0000 size=0x00000 ( 0) load
I (2014) esp_image: segment 8: paddr=0x000bd360 vaddr=0x50000000 size=0x00000 ( 0) load
I (2094) boot: Loaded app from partition at offset 0x10000
I (2095) boot: Disabling RNG early entropy source...
I (2097) cpu_start: Pro cpu up.
I (2107) cpu_start: Starting app cpu, entry point is 0x400811cc
0x400811cc: call_start_cpu1 at /home/paul/esp/esp-idf/components/esp32/cpu_start.c:226

I (1) cpu_start: App cpu up.
I (2141) heap_init: Initializing. RAM available for dynamic allocation:
I (2161) heap_init: At 3FFAE6E0 len 00001920 (6 KiB): DRAM
I (2180) heap_init: At 3FFB9678 len 00026988 (154 KiB): DRAM
I (2200) heap_init: At 3FFE0440 len 00003BC0 (14 KiB): D/IRAM
I (2219) heap_init: At 3FFE4350 len 0001BCB0 (111 KiB): D/IRAM
I (2239) heap_init: At 40090734 len 0000F8CC (62 KiB): IRAM
I (2259) cpu_start: Pro cpu start user code
I (81) cpu_start: Starting scheduler on PRO CPU.
I (0) cpu_start: Starting scheduler on APP CPU.
I (82) IoTT_ESP32: Startup...
I (82) IoTT_ESP32: Free memory: 272252 bytes
I (82) IoTT_ESP32: IDF version: v3.2-dev-596-g7abed5fc-dirty
I (132) wifi: wifi driver task: 3ffc0f00, prio:23, stack:3584, core=0
I (132) wifi: wifi firmware version: 633012a
I (132) wifi: config NVS flash: enabled
I (132) wifi: config nano formating: disabled
I (142) system_api: Base MAC address is not set, read default base MAC address from BLK0 of EFUSE
I (142) system_api: Base MAC address is not set, read default base MAC address from BLK0 of EFUSE
I (192) wifi: Init dynamic tx buffer num: 32
I (192) wifi: Init data frame dynamic rx buffer num: 32
I (192) wifi: Init management frame dynamic rx buffer num: 32
I (202) wifi: Init static rx buffer size: 1600
I (202) wifi: Init static rx buffer num: 10
I (202) wifi: Init dynamic rx buffer num: 32
I (212) IoTT_ESP32: Start the WIFI SSID:[devolo-8f4] password:[AYBJMEITONIPHEBQ]
I (282) phy: phy_version: 3960, 5211945, Jul 18 2018, 10:40:07, 0, 0
I (282) wifi: mode : sta (24:0a:c4:11:a7:38)
I (282) IoTT_ESP32: Waiting for wifi
I (282) IoTT_ESP32: SYSTEM_EVENT_STA_START
I (1612) wifi: n:11 2, o:1 0, ap:255 255, sta:11 2, prof:1
I (2602) wifi: state: init -> auth (b0)
I (2612) wifi: state: auth -> assoc (0)
I (2622) wifi: state: assoc -> run (10)
I (2642) wifi: connected with devolo-8f4, channel 11
I (2642) wifi: pm start, type: 1

I (4632) event: sta ip: 192.168.1.33, mask: 255.255.255.0, gw: 192.168.1.1
I (4632) IoTT_ESP32: SYSTEM_EVENT_STA_GOT_IP
I (4632) IoTT_ESP32: Wifi connected to devolo-8f4
I (4632) ESP32_Tools: MqttUniqueName = ESP32_11A738
I (4642) ESP32_Tools: MqttStatusTopic = IoTT/Devices/ESP32_11A738/Status
I (4662) MQTT_CLIENT: Sending MQTT CONNECT message, type: 1, id: 0000
I (4672) IoTT_ESP32: MQTT_EVENT_CONNECTED
I (4672) IoTT_ESP32: sent subscribe successful, msg_id=25508
I (4672) IoTT_ESP32: MQTT_EVENT_SUBSCRIBED, msg_id=25508
I (4672) MQTT_CLIENT: deliver_publish, message_length_read=145, message_length=52
I (4682) IoTT_ESP32: MQTT_EVENT_DATA
TOPIC=TBTIoT/Devices/11936008677624121892/Status
DATA=Online
I (4902) MQTT_CLIENT: deliver_publish, message_length_read=42, message_length=42
I (4902) IoTT_ESP32: MQTT_EVENT_DATA
TOPIC=IoTT/Devices/ESP32_11A738/Status
DATA=Online

I expected to receive all three retained messages at startup of the esp-mqtt component.

Am I overlooking something?

Thanks for your suggestions.

Paul.

User avatar
PaulVdBergh
Posts: 42
Joined: Fri Feb 23, 2018 4:45 pm
Location: Brasschaat, Belgium

Re: esp-mqtt subscribe and retained messages

Postby PaulVdBergh » Sat Aug 25, 2018 9:36 pm

What worries me is the log
I (4672) MQTT_CLIENT: deliver_publish, message_length_read=145, message_length=52
This makes me think that all the retained messages are effectively delivered to the ESP (message_length_read=145), but only the first message (message_length=52) is processed and dispatched in the system...
I'm suspecting the statements in mqtt_client.c at lines 486-487:

Code: Select all

        if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length)
            break;
Isn't the break statement breaking out of the do ... while() loop, thereby skipping the remaining messages?

User avatar
fly135
Posts: 353
Joined: Wed Jan 03, 2018 8:33 pm
Location: Orlando, FL

Re: esp-mqtt subscribe and retained messages

Postby fly135 » Mon Aug 27, 2018 8:15 pm

When I run into a situation like this I put some print statements in to see what that "lost" data would be. That might help you understand what's going on. That test statement looks like it breaks if it sees that it got as much or more than it expected.

John A

Who is online

Users browsing this forum: No registered users and 1 guest