Can I just reset everything in a flow from a catch (esp semaphores)?

I have a flow that handles four thermostats and heating/cooling zones in my home. For a variety of reasons it processes a lot of events rapidly, and parts need to be single-streamed with semaphores. On top of that I occasionally (very infrequently) get Z-Wave errors, so a “catch” is needed.

I’ve tried to make it so that it properly unwinds whatever was going on in the catch, but one difficulty is the semaphores. I can unwind one (or any fixed number) but what I really want to do is say “clear all pending messages and reset all semaphores”.

Kind of like a deploy does (at least it seems to).

Is there any such facility to put inside a catch?

Trying to do the right number of “leaves” inside a catch (with the significant risk of getting another error inside the catch) is hard, especially since I have two levels of semaphores.

Basically a “reset everything” type statement?

Linwood

An interesting question. After a week with no answers, I’ll have a go.

In the world of transactional processing, some language systems have evolved to permit multiple-transaction sequences with a commit/rollback command that either confirms all transactions in the sequence group, or reverses any already processed back to the starting point. The classic example is booking a flight, a hotel, and a rental car: you either want all to succeed or none.

Node-RED is an event-based system and does not have such features built in. As far as I understand it, the Node-RED runtime (the back end, not the editor) operates as a single thread on one core, so only one thing is ever done at any one point in time. NR provides pseudo-concurrency, so two messages active in one sequence (flow) will be processed together node by node rather than one message running to completion first. The actual node processing is driven by a task manager with a queue: the next node in the queue is given its input message and left to execute until it completes, and the output message is then put back onto the queue, allocated to the next node in the sequence.
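That hop-by-hop interleaving can be sketched in plain JavaScript. This is a toy model for illustration only, not the real Node-RED scheduler:

```javascript
// Toy model: each (node, message) hop is one item on a single work queue,
// so two messages entering a three-node flow alternate hop by hop
// rather than one running all the way through first.
function runFlow(nodes, messages) {
  const queue = messages.map(msg => ({ step: 0, msg }));
  const trace = [];
  while (queue.length > 0) {
    const item = queue.shift();            // take the next pending hop
    const node = nodes[item.step];
    trace.push(`${node}:${item.msg}`);     // "process" the message at this node
    if (item.step + 1 < nodes.length) {
      queue.push({ step: item.step + 1, msg: item.msg }); // requeue for the next node
    }
  }
  return trace;
}

// Two messages through inject -> function -> action:
// A and B take turns at each hop instead of A finishing first.
const trace = runFlow(["inject", "function", "action"], ["A", "B"]);
```

Running it shows A and B interleaving: inject:A, inject:B, function:A, function:B, action:A, action:B.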

So, at the level of the Node-RED process engine, cancelling any in-flight work would require asking Node-RED to remove specific queue items from the work queue. I agree that ‘Deploy’ can wipe the queue clean, and this works for either all flows or just one flow. Since Node-RED has back-end API calls, including GET and SET flows/state, it may indeed be possible to send an API call to ‘stop’ all/one flow.

I do not believe that there is an easy way to execute such a ‘break’ in an NR flow, and forcing a hard-stop of a flow from within the flow itself will probably cause the entire flow to stop, including the post-stop processing required. The Catch node only operates on the same flow, so stopping the entire flow would close all sequences including the Catch node. The alternative is to build the ‘work queue’ inside a flow and manage that. Node-RED certainly has the tools to queue messages.

As long as you are single-point processing the work, then a single queue is already there. The Delay node can either delay for a given period of time, or rate-limit as x messages in a period. Both can be used to queue any backlog, and there are several input-flags that can be used to manipulate the queue.

The msg.reset option is probably the one you are interested in. Adding in a new or existing message with the field ‘reset’ set to anything will cause the node to discard any queued messages (and the ‘reset’ input message). I use this quite a lot with queued tasks to cancel pending work in a queue, which can be either a data working message or a semaphore message.

There is also the option for ‘flush’ and ‘toFront’ permitting more advanced queue management. Flush is useful for example in the Join node to end a manual join on a given trigger.

msg.reset is actually used across several queuing nodes - delay, trigger, join, rbe (report by exception).
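The semantics described above can be sketched as a toy queue. This is my reading of the documented msg.reset / msg.flush / msg.toFront behaviour, not the Delay node's actual implementation:

```javascript
// Toy sketch of Delay-node-style queue control flags:
// - reset: discard everything queued (and the reset message itself)
// - flush: release n queued messages (true = all of them)
// - toFront: queue-jump to the head instead of the tail
class MsgQueue {
  constructor() { this.items = []; }
  push(msg) {
    if ("reset" in msg) { this.items = []; return []; }  // wipe the queue
    if ("flush" in msg) {                                // release queued messages
      const n = msg.flush === true ? this.items.length : msg.flush;
      return this.items.splice(0, n);
    }
    if (msg.toFront) this.items.unshift(msg);            // jump the queue
    else this.items.push(msg);
    return [];
  }
}
```

So a Catch node wired into the Delay node's input can send a message with just a `reset` property to drop all pending work in one go.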

Screenshot: nodes where msg.reset can be used.

A test-bed flow:

[{"id":"ef225326f9d90494","type":"delay","z":"fbaea1aaab14874b","name":"","pauseType":"delay","timeout":"1","timeoutUnits":"days","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":620,"y":400,"wires":[["b5bce079c5c8a642"]]},{"id":"7e0ac074b6f99299","type":"inject","z":"fbaea1aaab14874b","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"[1..6].(\"M\" & $)","payloadType":"jsonata","x":150,"y":320,"wires":[["df5467c8c9eee08d"]]},{"id":"df5467c8c9eee08d","type":"split","z":"fbaea1aaab14874b","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","property":"payload","x":290,"y":320,"wires":[["ed41d3d3a5af0991"]]},{"id":"a3f0d061b1386a6c","type":"inject","z":"fbaea1aaab14874b","name":"reset","props":[{"p":"reset","v":"yes please","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":330,"y":380,"wires":[["ef225326f9d90494"]]},{"id":"34b1d53d8ac98ecd","type":"inject","z":"fbaea1aaab14874b","name":"flush 1","props":[{"p":"flush","v":"1","vt":"num"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":330,"y":420,"wires":[["ef225326f9d90494"]]},{"id":"b5bce079c5c8a642","type":"debug","z":"fbaea1aaab14874b","name":"debug 518","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":850,"y":400,"wires":[]},{"id":"2f15020007292266","type":"inject","z":"fbaea1aaab14874b","name":"flush all","props":[{"p":"flush","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":330,"y":460,"wires":[["ef225326f9d90494"]]},{"id":"ed41d3d3a5af0991","type":"change","z":"fbaea1aaab14874b","name":"tidy","rules":[{"t":"delete","p":"parts","pt":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":410,"y":320,"wires":[["ef225326f9d90494","9df2dc75ab140164"]]},{"id":"6b9ddad0908832b3","type":"inject","z":"fbaea1aaab14874b","name":"to front","props":[{"p":"toFront","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":330,"y":600,"wires":[["9df2dc75ab140164"]]},{"id":"9df2dc75ab140164","type":"delay","z":"fbaea1aaab14874b","name":"","pauseType":"rate","timeout":"1","timeoutUnits":"days","rate":"1","nbRateUnits":"1","rateUnits":"day","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":630,"y":520,"wires":[["895d47e4336682eb"]]},{"id":"895d47e4336682eb","type":"debug","z":"fbaea1aaab14874b","name":"debug 519","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":850,"y":520,"wires":[]},{"id":"801e361105b5762f","type":"inject","z":"fbaea1aaab14874b","name":"to front & flush 1","props":[{"p":"toFront","v":"true","vt":"bool"},{"p":"flush","v":"1","vt":"num"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":360,"y":640,"wires":[["9df2dc75ab140164"]]},{"id":"99d8ca0477f8de7b","type":"inject","z":"fbaea1aaab14874b","name":"reset","props":[{"p":"reset","v":"yes please","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":330,"y":520,"wires":[["9df2dc75ab140164"]]},{"id":"bd341784b4f66e8f","type":"inject","z":"fbaea1aaab14874b","name":"flush 1","props":[{"p":"flush","v":"1","vt":"num"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":330,"y":560,"wires":[["9df2dc75ab140164"]]}]

In any computer language where the options are not already available in either code or operating system, I find that you end up having to write the stuff yourself…

I’ve had this flow sitting around for a while; it may be useful. It can disable/enable a flow (tab) from the API. It works, but I’ve never actually used it for anything.

[{"id":"4b82b0cc.556d58","type":"tab","label":"disable flows","disabled":false,"info":"","env":[]},{"id":"3dc73e50.d81c9a","type":"inject","z":"4b82b0cc.556d58","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"test","payload":"true","payloadType":"bool","x":160,"y":200,"wires":[["5f57da6d.0f5b64"]]},{"id":"dd42ff7.cd1438","type":"http request","z":"4b82b0cc.556d58","name":"Get single flow","method":"GET","ret":"obj","paytoqs":"ignore","url":"http://localhost:1880/flow/{{flowId}}","tls":"","persist":false,"proxy":"","insecureHTTPParser":false,"authType":"basic","senderr":false,"headers":[],"x":1000,"y":200,"wires":[["6e4f731e.abe73c"]]},{"id":"b9a36067.e23358","type":"debug","z":"4b82b0cc.556d58","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","x":1610,"y":200,"wires":[]},{"id":"6e4f731e.abe73c","type":"change","z":"4b82b0cc.556d58","name":"","rules":[{"t":"set","p":"payload.disabled","pt":"msg","to":"flowStatus","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":1210,"y":200,"wires":[["c7dc25cd.221b18"]]},{"id":"42cba935.33793","type":"inject","z":"4b82b0cc.556d58","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"test","payload":"false","payloadType":"bool","x":160,"y":320,"wires":[["5f57da6d.0f5b64"]]},{"id":"5f57da6d.0f5b64","type":"change","z":"4b82b0cc.556d58","name":"","rules":[{"t":"set","p":"flowStatus","pt":"msg","to":"payload","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":390,"y":200,"wires":[["c435d2e3.28e67"]]},{"id":"c7dc25cd.221b18","type":"http request","z":"4b82b0cc.556d58","name":"Update flow","method":"PUT","ret":"obj","paytoqs":"ignore","url":"http://localhost:1880/flow/{{payload.id}}","tls":"","persist":false,"proxy":"","insecureHTTPParser":false,"authType":"basic","senderr":false,"headers":[],"x":1430,"y":200,"wires":[["b9a36067.e23358"]]},{"id":"c435d2e3.28e67","type":"http request","z":"4b82b0cc.556d58","name":"Get flows","method":"GET","ret":"obj","paytoqs":"ignore","url":"http://localhost:1880/flows","tls":"","persist":false,"proxy":"","insecureHTTPParser":false,"authType":"basic","senderr":false,"headers":[],"x":580,"y":200,"wires":[["13d6bb95.3ba83c"]]},{"id":"13d6bb95.3ba83c","type":"change","z":"4b82b0cc.556d58","name":"get flowId by name","rules":[{"t":"set","p":"flowId","pt":"msg","to":"*[type = 'tab' and label = $$.topic].id","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":790,"y":200,"wires":[["dd42ff7.cd1438"]]},{"id":"1384c0a8.e3b88f","type":"comment","z":"4b82b0cc.556d58","name":"Enable flow","info":"","x":150,"y":280,"wires":[]},{"id":"abc9854d.c17598","type":"comment","z":"4b82b0cc.556d58","name":"Disable flow","info":"","x":150,"y":160,"wires":[]},{"id":"56008fd8.ad753","type":"comment","z":"4b82b0cc.556d58","name":"This will enable or disable a flow (tab)  The topic should be the flow name.","info":"","x":340,"y":80,"wires":[]},{"id":"432cbfc7.8cef6","type":"inject","z":"4b82b0cc.556d58","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"My Flow","payload":"true","payloadType":"bool","x":170,"y":460,"wires":[["5f57da6d.0f5b64"]]},{"id":"c8eb6499.0a599","type":"inject","z":"4b82b0cc.556d58","name":"","repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"My Flow","payload":"false","payloadType":"bool","x":170,"y":580,"wires":[["5f57da6d.0f5b64"]]},{"id":"7bcfd22b.9aa0b4","type":"comment","z":"4b82b0cc.556d58","name":"Enable flow","info":"","x":150,"y":540,"wires":[]},{"id":"3145edc5.e295d2","type":"comment","z":"4b82b0cc.556d58","name":"Disable flow","info":"","x":150,"y":420,"wires":[]}]
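The core of that flow, stripped of the HTTP plumbing, can be sketched in plain JavaScript. The label-to-id lookup mirrors the flow's JSONata expression; buildDisableRequest is a made-up helper name, and the PUT body is simplified compared to the real Admin API round trip (the flow GETs the full single-flow config first):

```javascript
// Given the array returned by GET /flows, find the tab whose label matches,
// and build the PUT /flow/:id request that toggles its "disabled" flag.
function buildDisableRequest(flows, label, disabled) {
  // same selection as the JSONata: *[type = 'tab' and label = $$.topic].id
  const tab = flows.find(n => n.type === "tab" && n.label === label);
  if (!tab) return null;                          // no tab with that name
  return {
    url: `http://localhost:1880/flow/${tab.id}`,  // Admin API endpoint
    method: "PUT",
    body: { ...tab, disabled },                   // simplified: real flow PUTs the full flow config
  };
}
```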

Sorry for the delay; real life intervened, plus I needed to think about this. But maybe the real question I should be asking is not how to fix the catch and take/leaves, but why I had to do all that in the first place. It has a lot to do with queuing, which I do not understand well, and your description makes me wonder why what I had does not work.

Here’s the basic flow: events (several are possible) trigger a function node, which calculates how each thermostat should be set, in simple procedural code. It uses node.send to emit updates in the proper order, and a single action node is expected to execute each in order.

The presumptions I had were:

  • The function node will execute to completion before it starts again (i.e. one node will not be invoked again before it completes; pending messages will queue up).

  • Pending messages will queue up without loss and stay in order.

  • node.send output (and the final output of the function node) will also queue up in order, and

  • The action node will execute these in order.

If all those things were true, I would not have had issues; I had issues, which led me to believe some aspect of NR was not queuing these as I believed it should, so I tried to force it with take/leaves. Which works fine except for failure recovery.

But let me go back to the very beginning – can you (or someone) help me understand if my assumptions are wrong?

Just to illustrate the simplified version: this is all I really need. I just want this single-streamed, so that each event coming in from the left side queues a run of the function node, and all the messages emitted from the function node (via node.send plus the final exit) execute in the action node in order.

But when it was set up like this, once in a while I got bad results because they operated out of order, e.g. turning heat on then off got executed as heat off then on for one zone.

Honestly this doesn’t benefit from NR that much, but HA’s scripting language is too difficult for all the decisions made in that function node; JavaScript works better.

PS. And yes, I’d want a catch in here, but it wouldn’t have to do anything but perhaps log the error, since the function node emits all settings each time, so it would clean up after itself if it missed one once, at worst when the average temperature next changes (which is often, given the number of zones and sensors).

Node-RED operates on a single-thread basis. Once anything is handed to a node, that node does what it wants to do before handing back to the execution queue. While the node is doing its stuff, nothing else is going on (as far as I understand this stuff).

Each of your trigger nodes can fire, generate a message, and exit to add this message to the NR execution queue. The messages will then be taken off the queue and handed to the function node. The NR execution queue means that more than one message can be in this flow at any one time, and (importantly) if there is more than one message in the flow, these will progress in step ‘together’ rather than one message going all the way through before the next message starts.

It is possible for a message to start at eg the top trigger, then a second message to start elsewhere, then the first message to move to the function node, then the second to move to the function node, then the first to move to the action node, then the second to move to the action node. I suspect that, generally, they don’t get ‘out of order’ but someone else may know better…

However, if your function node is asynchronous, then it can carry on doing stuff while the execution queue passes the/a/part of a message on to the action node, making the situation even more complicated. You could have one new message waiting to start the function node, one half of a message in the function node that is still doing things with the other half of the (same) message about to start the action node, at the same time as an earlier message is actually in the action node.

If the problem is really that, a message starting the function node must fully complete the function node work (in totality) and then run the action node, before any further triggers create messages that start in the function node…

… you need to queue all input triggers in a queue to be released only when any preceding messages have gone all the way through function and action.

Although it would be interesting to build a complete queue and feedback loop or semaphore interlocking to prevent more than one message within a part of the flow, an easy solution is to rate-limit all input trigger messages by a sufficient time period that each message will have naturally cleared the flow before the next message is released.

This is my suggestion, based on KISS.

There is also the question of whether, if any of the triggers fire while a message is already in flight, a new trigger should be allowed to progress: the pending action from the first message may update the state upon which the new message was based, rendering the new message obsolete. To this end I recommend using ‘Drop intermediate messages’. You will have to work out a time period for the rate limiting. Note that the very first message to arrive will be passed through, then a timer starts for the period in milliseconds. Any message arriving during this interval will be queued/discarded accordingly. After the time period is up, either a message is popped off the queue and released, restarting a new exclusion period, or the next new message will pass straight through and restart a new exclusion period.
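The drop-intermediate behaviour can be sketched like this. It is modelled on my understanding of the Delay node's rate-limit mode, and DropRateLimiter is a made-up name for illustration:

```javascript
// Sketch of rate-limit with "drop intermediate messages": the first message
// passes through and starts an exclusion window; anything arriving inside
// that window is discarded; the next arrival after the window passes and
// restarts it.
class DropRateLimiter {
  constructor(periodMs) {
    this.periodMs = periodMs;
    this.last = -Infinity;   // time of the last message that passed
  }
  offer(msg, nowMs) {
    if (nowMs - this.last < this.periodMs) return null; // inside the window: drop
    this.last = nowMs;                                  // outside: pass, restart window
    return msg;
  }
}
```

With a 5-second period, messages offered at t=0s, 1s, 2s and 6s would pass, drop, drop, pass.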

I’m not completely following that, but let me clarify one aspect. It does raise a question about the documentation.

It doesn’t matter at all what order the triggering events occur.

It doesn’t even matter in what order the function node’s emitted messages, for any one invocation, are processed by the action node.

All that matters is that the function node is not re-entrant, i.e. it completes one input message and exits before it is re-invoked for the next, and that its group of messages stays together and is executed (the whole group) before the group from the next invocation goes through.

Now, that said, in one of your screenshots I noticed a setting on the action node I had not seen before. Notice the “queue”. The documentation says this applies when not connected to Home Assistant, which I don’t care about since all the things invoking it come from Home Assistant, but… might that also apply to a backlog of messages? Not sure, but I am setting it to queue.

Also, node.send is called “asynchronous” in the documentation. I interpret that as meaning you can keep your javascript running and these messages start in flight, but if I do a node.done before the exit of the function node, I assume these are all grouped together. But… maybe there’s a larger meaning to asynchronous in this case?

My GUESS is either that the function node is actually re-entrant, i.e. it may be running more than once at the same time, or that the messages emitted by the function node are not staying in order.

The problem only occurs about once a week, and I can’t catch it in the act or force it, so I am grasping a bit at straws to resolve it. But if I can resolve the queuing issue, I can remove the semaphores, and my CATCH issue goes away.

Can you elaborate on this asynchronous aspect? Are you referring to my use of node.send, or is there some place that this is specified?

I do not care, for a given invocation of the function node, if the emitted messages start processing before the function node completes.

I do care if another instance of the function node can start and both are emitting (and mixing up) messages emitted.
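One part of this can be checked in isolation: within a single synchronous invocation of the function node's code, node.send() calls are ordinary function calls and so fire in the order written. Here node is a hand-made stub, not the real function-node API:

```javascript
// Stub of the function-node API: record every emitted message so the
// call order is visible.
const sent = [];
const node = {
  send: msg => sent.push(msg),    // record each emitted message
  done: () => sent.push("done"),  // completion marker
};

// Simplified thermostat function-node body: one update per zone,
// emitted in the order the logic decides.
for (const zone of ["zone1", "zone2", "zone3"]) {
  node.send({ payload: { zone, action: "set" } });
}
node.done();
```

This only shows ordering within one invocation; whether two invocations can interleave their sends is exactly the open question.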

As far as my understanding of how Node-RED works:

Node.js, and therefore Node-RED, is definitely single thread. Only ONE “thing” can be going on at any one time, be that the NR system, or one node doing stuff.

Node-RED is definitely asynchronous-processing. More than one message can be processed through a flow.

What this means is, for two messages, each message gets added to an internal queue. The top message is popped off, handed to the next node, and that node has complete use of the system, until the node finishes, then the node is done and the message is put back on the queue.

Two separate messages can go through a flow, each effectively taking turns on the next node it is due to reach. At any one point, the node that is running has sole use of the resource, finishing and sending its message before anything else gets a turn.

I don’t understand asynchronous processing within a node. I have read up on this, but my blunt guess is that no one really understands this well enough to be able to explain it in clear English that I can understand (challenge anyone?..)

My expectation is that, for things like the http request node, the node makes an http call, then hands back ‘control’ to NR so that other stuff can carry on, and when the response eventually comes back, the node springs back into life, builds the message, sends the message, and is done.

I would suggest that you post your questions on the Node-RED forum. These are the people who actually write and support Node-RED.

In doing yet more digging, I did find this Node-RED forum discussion and I noted the part about function nodes.

An important aspect to the function node is that it is executed in a VM sandbox and is isolated from the actual runtime. The send and done functions are mapped to an object named node along with some other properties. To be clear, the function code entered by a user is wrapped as below…

If function nodes are actually executed asynchronously to the runtime, and the node then uses asynchronous processing, who knows just what will happen?

Not sure about that… Under normal node operation, yes. If you use node.send() and node.done() I am no longer sure.

Almost 100% sure this is not guaranteed.

Edit
Worth reading up on cloning of messages in the asynchronous message processing environment, and how it is problematic with function nodes that use multiple node.send() and the need to impose cloning in such cases.

Thanks. Out for the evening but I need to do some more digging.

I did try this: Inject node → Function node → debug, and in the function node I did this horrible thing:

function sleep(milliseconds) {
  // Busy-wait: burn CPU until the elapsed time passes, so the function
  // node stays blocked (nothing asynchronous) for the whole duration.
  var start = new Date().getTime();
  for (var i = 0; i < 1e7; i++) {
    if ((new Date().getTime() - start) > milliseconds) {
      break;
    }
  }
}
var start = new Date().getTime();
sleep(2000);          // block for ~2 seconds
msg.payload = start;  // record when this invocation started
return msg;

It’s essentially a “sleep” by running out computer time, so the node should stay busy for 2 seconds.

I then hit the inject several times, rapidly (like maybe 2 per second).

All the output appears at once after a long delay (about 2 × the number of injects, in seconds). I guess this is consistent with the idea that NR as a whole is single-streamed, and also that the function node doesn’t start a new instance until the old one completes, but it appears it didn’t let the debug node do anything before all messages inbound to the function node were done.


If I use a Delay node instead, and hit inject as quickly as I can, there is a 2-second delay, then all five come out very quickly. The Delay node apparently accepts each message and starts its timer independently; it does not wait until the first completes. So a fast stream of messages still ends up fast. That’s consistent, I think, with the documentation, but is a bit at odds with the idea that a node completes before it accepts another message.

Bottom line, still confused, but obviously need to do some more research.

It does give me an idea though. I don’t need to process every event. Maybe I can put a switch node or something in front of the function, and one at the end, that checks a “busy” context variable, and discard any messages while busy. Not so much force synchronization of message processing, but just prevent anything queuing up at all until I’m done (and a catch to say “done”). That might be more reliable than a semaphore.
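That busy-flag idea can be sketched like this. All names here are made up for illustration; in Node-RED the flag would live in flow context, with the enter check done by a switch or function node in front of the main function node, and release called at the end of the flow or from the Catch node:

```javascript
// A gate that drops incoming messages while a run is in flight,
// re-opened explicitly when the run finishes (or when a catch fires).
function makeBusyGate() {
  let busy = false;                // in NR this would be a flow-context variable
  return {
    enter(msg) {                   // placed in front of the function node
      if (busy) return null;       // a run is in flight: discard this message
      busy = true;                 // claim the flow
      return msg;                  // pass the message through
    },
    release() { busy = false; },   // called at the end of the flow / in the catch
  };
}
```

Unlike a semaphore with take/leave counting, there is nothing to unwind after an error: the catch just calls release once and the gate is open again.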

Be careful. Debug nodes do their stuff, but the actual job of shifting the output from the runtime to the editor is done by the WebSocket back to the NR server. There is no guarantee that debug messages are not queued. I would not assume that the order in which a debug message arrived in the editor relates to the order in which the debug node was executed.

Yup. The Delay node is a queue: arriving messages are held, then the node exits and processing goes back and accepts the next message… Each message in the Delay node is independent.

At least we can agree on that :laughing:

Yup. If only one thing is going on, it can’t possibly get muddled with anything else. There is a lot to be said for one-thread processing.

Enjoy!

Sorry, I should have expanded and formatted the payload. I had the start time in the payload of each output, so I can see that they started at 2-second intervals.

What was a little surprising to me is that the time stamp on the debug message itself is not from when it originated, it appears to be from when it is displayed; to me that’s a bug not a feature.

I need to stare at all this tomorrow. Thank you for spending time on it.