Mqtt persistent mode

Ah just answered my own question, found the use case. So I guess what you would be looking for the queue of messages to be processed by changing the sensor’s state multiple times in rapid succession. This way your utility meter (or whatever your accumulator sensor is) would see all those state changes and accumulate all of them.

Well definitely make sure to mention this use case in the feature request. I’m not sure how feasible it is but perhaps someone will figure it out.

I understand why phanosp wants the MQTT Sensor’s topic to receive a queue of all values that occurred while Home Assistant was offline (or merely disconnected from the MQTT broker). However, persistence would apply for all subscribed topics and here’s how it would affect, for example, an MQTT Binary Sensor for a door or motion sensor.

  • While Home Assistant is disconnected from the broker, the broker creates a queue containing every on/off state (open/close) of the motion sensor.

  • The moment Home Assistant re-connects to the broker, it would receive the queued payloads and rapidly toggle the binary_sensor’s state.

  • An automation monitoring the binary_sensor would be triggered repeatedly and perform actions in succession that were never anticipated when it was designed (like sending a flurry of consecutive notifications or toggling devices repeatedly).

2 Likes

Hi Mike and thanks for clarifying things up. I was suspecting that the mqtt integration was not supporting persistent session but I was not sure. Basically what I want is what you explain on your second post. HA should receive all messages for a certain period which it was down/offline and a utilitiy meter should accumulated them. Not sure if the utility meter will be able to do that since to my understanding in order to accumulate “correctly” all the info the time frames should also be consistent. I mean if my wattage was 500 watt for 5 minutes and then its was 1500 watt for 1 minute and then 500 watt again the utility meter should take this into consideration.

To my understanding the broker should include timestamps so utility meter in HA should not have a problem with this but I am not sure. If someone can clarify things this will be better.

Regarding Taras comment I was under the impression that I can have persistent mode in MQTT per topic not necessarily for everything and everyone that is subscribe to the mosquito broker. For example have something on the sensor like

sensor energy_red:
  platform: mqtt
  persistent: true
  client_id: sesnor_red
  qos: 2
  ...

This way the problem you mention in your post should never occurred unless you specifically ask for that topic to be persistent.

Not sure 100% if this how it will work. Maybe someone can also clarify things here as well. If what I mention above is correct I will continue to make a feature request

Regards,

Phanos

A timestamp for the MQTT message would allow you to reconstruct the timeline to perform the integral calculation needed. But it would also require the MQTT integration to pass that timestamp on to the recorder as the timestamp of the change, and I very much doubt that. But that would be nice indeed, if not for all the side effects of a very long down time. Automations would need to be suppressed.

I was under the impression that persistent topics would replace the previous ones for that topic. So what you are saying is that is not the case per se for persistent clients, as they will queue them until read?

I also do not think the utility_meter sensor is capable of converting from W to kWh. Are we talking about a different sensor, or is this just a thought experiment? I’m constantly amazed that there seems to be a sensor type for everything, but I cannot always find the right one that easy.

Hi Edwin,

the utility meter does not convert W to kWh directly. First I get the watt in a sensor of platform: mqtt (current consumption), then I use a sensor of platform: intergration to accumulate the wattage. Then the platform:integration is passed to a utility meter to get the value.

I do not pass the timestamps anywhere so I am not sure if they are actually needed to be pass directly or somehow are being done by the mqtt integration.

Another setup that I have I am sending the values to an influxdb and then I can graph the data using grafana. In this case I do not pass timestamps and influxdb takes cares of the timestamps. Not sure though 100% maybe I am missing something here. Maybe influxdb (being a time series platform) simply adds the timestamps when the message is received.

True told although that I am using persistent in the influxdb/grafana system I am not sure if any data received after the system was down have any advert effect and those data are being record incorrectly. Since both grafana/influxdb and mosquito broker are on the same machine I can not really test it through to be sure.

Maybe someone with more knowledge can clarify things more about how does the mqtt integration in home assistant and platform:integration work here.

Regards,

Phanos

All state changes are timestamped. The sensor uses the timestamps that are recorded to reconstruct the timeline. So that is why the sensor can do the calculation.

But the main question is, does the recorder use the timestamp now() when it receives a state change, or can the integration (MQTT) specify one of its own when applying the state change. And if it can, does it? I suspect the recorder will make up the timestamp on its own. In that case MQTT processing the queue in rapid succession will not fix the downtime period properly.

Hi Edwin,

I see what you mean. If all messages that will be received though the broker due to downtime will have the same timestamp that will create many more issues and solve none I think.

In a perfect world messages sent to the mosquito broker should (at least in theory) include the timestamp of the client who published it at the time it sent it. If this is in the specification of mosquito broker then the mqtt integration of Home Assistant should be able to implement this as well.

But I do not have more knowledge on the matter. I will try to research little bit more before I make a feature request. Maybe in the meantime someone else with more knowledge on the matter can shed more light on this.

Regards,

Phanos

The documentation link you provided in your first post explains it is a persistent session. The term “session” in this context means from the moment a client connects to the broker to when it disconnects. All of the client’s subscriptions during the persistent session are stored.

Which part of the documentation led you to believe you could specify persistence by topic?

Hi Taras,

my comments come from experiencing with the command line mosquito_sub. When a client (a single command/client subscripts to a topic), the user can provide the ability for that command/subscription to be persistent and with a unique client id.

So as I understand it you can have more than one subscribing client of the same machine with different topics and client_ids. For example

mosquitto_sub -h broker_host -t "waterheater/command/#" -u "xxxxxxxx" -P "xxxxx" -q 2 -c -i client_id_1
mosquitto_sub -h broker_host -t "wattage/consumption/#" -u "xxxxxxxx" -P "xxxxx" -q 2 -c -i client_id_2

Of course I am not expert and I could be wrong here. Do you know more and can clarify things better please?

Regards,

Phanos

I already clarified it in my previous post but if it failed to help you understand the concept, I suggest you review HiveMQ’s documentation or this one:

MQTT Message Queuing & Persistent Session

It’s not true. Or rather @phanosp didn’t mention session in his the first post different way than mentioning MQTT essentials docs.
For sake of clarifying the feature: the “persistent session” feature is to collect generated messages even if the client has disconnected, in order to deliver those messages later.
The persistent session is initialized by connecting with cleanSession = false. And it will collect messages in case the client has disconnected. To disable queuing messages, the client has to connect with cleanSession = true. It’s even mentioned in docs you linked.

But also it’s true that the persistence is not set by topic. But by client id.
However… it doesn’t mean HA cannot make 2 connections to MQTT, one with and one without persistence

What is “not true”?

Phanos first post does use the term “mqtt session” in the same manner as HiveMQ’s documentation for “persistent session”. However, in a later post, they make the assumption that persistence can be per topic.

That is not true; it’s per session.

TBH I had no problem understanding his intention. I’m not even sure using ‘mqtt session’ in his case is totally wrong. Especially in the context of the persistence feature.

It’s not true either. It’s not per session, It’s per device ID. :wink: By connecting and disconnecting you can create multiple different sessions (depending on level of networking we taking into consideration). However if all those sessions manifest the same device id, it allows creating persistent session within mqtt protocol.

When a client connects to the broker, the session it establishes can be either “clean” or “persistent”. For example, Home Assistant establishes clean sessions.

What is this “deviceID” you are referring to? If you mean “clientID” (like in HiveMQ’s documentation) then that serves to identify the one session from another (a client can establish multiple concurrent sessions with the broker).

So this isn’t really possible tbh. I’ll try to explain the two challenges you face here, one is feasible but not supported the other is probably impossible. I mean I’m not going to definitively rule it out but extremely unlikely to happen seems like a fair assessment.

The first is that HA does not support importing historical data. An integration can only change the state of an entity right now. There is no way for an integration to change the history of an entity and say “this entity had this state from time X to time Y”. There’s a number of feature requests around this capability, particularly when it comes to historical energy data (here, here and here for example). It makes sense but its not possible currently.

The best an integration could do right now would be to rapidly change states for the queued messages. This ensures all state changes occurred but there’s nothing it can really do with the timestamp of those messages currently.

The second thing you’d need is for an integration to be able to import historical data and trigger state changes as if they occurred at that moment in time. Remember, each integration is totally independent. The energy meter integration has no idea about MQTT or anything like that. The only thing it is doing listening for is a state change event for an entity with id sensor.energy_red. Same with potentially many other things (templates for example).

So for this accumulator to be able to work correctly it would somehow need to receive and account for a backdated state change event. This is a major problem because to properly process a backdated state change event most integrations would need to see everything as it existed at that time. Think about a simple template like this for example:

{{ states('sensor.energy_red') | float + states('sensor.energy_blue') | float }}

If this template was used in a template sensor or a template trigger how would this work if it receives a state change event for sensor.energy_red in the past? It would also have to know the state of sensor.energy_blue at that moment in time as well. If it used the current value of the blue sensor but the past value of the red sensor then the result is nonsense.

Perhaps this second feature request could be somehow limited to just energy meter or integrations that only look at the value of a single entity. But if there’s any way an integration could look at the state of multiple entities there’s really no way to handle a backdated state change event. Recreating the entire state machine at that moment in time is just not feasible.

1 Like

Hi Mike,

given your explanation I do come to agree that is unlikely what I request to be able to be implemented fast and easy on HA :). I mean so far I am sending the current wattage of each phase (red, yellow, blue) to an MQTT sensor, then I create sensor to add them all together and then use a sensor with platform: integration to accumulate the wattage in kWh which in turn I pass that sensor to a utility meter.

Since HA can not “change older entries” in history it is very unlikely that I can achieve what I describe earlier when the client is offline/disconnected as you describing in you latest post.

Maybe the solution to my problem is to have the publishing client (the one that sends the three phase’s value to the broker), to first calculate the total accumulation in kWh and then sent that value to HA (through broker of course) as a sensor that does not have the current wattage but rather the total kWh so far.

In this case even if the messages are lost at some point when HA establishes connection to the broker again it will receive a value of the accumulated wattage of time “now” which I believe is more in line with the description about how HA works.

Thanks

Phanos

1 Like

Not sure why you are mentioning concurrent connections. We are talking here about the persistent session as a remedy against losing data when an mqtt client is disconnected. The persistent session allows receiving the sequence of missing data after reconnection.

Other than that, Mike above very well explained the usage of the persistent session in context of Home Assistant.

Review the thread and find the post by phanos where two MQTT topics are subscribed in separate sessions.

However my response was to your post which introduced an incorrect description of persistence mode’s scope.

I wouldn’t call it “incorrect description” just because I differently named the string used for client identification. But yes, you are right. it’s clientId

Actually he wanted to have separated connections from HA for each entity, which is not supported by HA. However, it should be explicitly said, that it’s not possible to create two or more connections identified by the same clientId. That;s why I don’t understand why you mentioned it in context of my contribution.

Reviewing the thread I’m still hitting your answer which sounds wrong to me:

The documentation link you provided in your first post explains it is a persistent session . The term “session” in this context means from the moment a client connects to the broker to when it disconnects. All of the client’s subscriptions during the persistent session are stored.

The documentation he provided mentions persistent session mode, which obviously doesn’t end with a client disconnection. In fact, after disconnection, the server still queues data subscribed by connection identified with a particular clientID. Any next connection identified by the same clientID is just a continuation of the previous (interrupted) connection. This allows receiving missing data (in the meantime stored on server side)

Possibly because you misread it.