How to check when your UPS is actively supporting the power with NUT

Hello,
I configured NUT adding to read my UPS data via USB. It looks to work fine because I can see some new entities in HA but I cannot understand what entity should I use to verify if my UPS is actively supporting the power.
I can see these sensors:

 sensor.zinto1000_battery_charge	100	state: Online
unit_of_measurement: %
friendly_name: Zinto1000 Battery Charge
device_class: battery
 sensor.zinto1000_input_voltage	229.1	state: Online
unit_of_measurement: V
friendly_name: Zinto1000 Input Voltage
icon: mdi:flash
 sensor.zinto1000_load	15	state: Online
unit_of_measurement: %
friendly_name: Zinto1000 Load
icon: mdi:gauge
 sensor.zinto1000_status	Online	state: Online
unit_of_measurement: 
friendly_name: Zinto1000 Status
icon: mdi:information-outline
 sensor.zinto1000_status_data	OL	state: Online
unit_of_measurement: 
friendly_name: Zinto1000 Status Data
icon: mdi:information-outline
 sensor.zinto1000_ups_type	offline / line interactive	state: Online
unit_of_measurement: 
friendly_name: Zinto1000 UPS Type
icon: mdi:information-outline

Thanks

1 Like

Pull the plug :wink: and watch the changes. This was how I found the various state changes in NUT config. Note, I send NUT via MQTT and not directly into HA, so my sensor names and attributes are non-standard. But I think the state info is same as you will see in direct NUT configuration:

  - alias: 'Home power outage'
    trigger:
    - platform: state
      entity_id: sensor.ups_status
      to: 'OB DISCHRG'
    action:
      service: notify.pushover
      data:
        message: "Power outage detected by UPS: {{ (state_attr('sensor.ups_status', 'battery_runtime') | float / 60) | round(0) }} minutes of battery power remaining."
        data:
          priority: 1
          sound: gamelan
  - alias: 'Home power recharging'
    trigger:
    - platform: state
      entity_id: sensor.ups_status
      to: 'OL CHRG'
      from: 'OB DISCHRG'
    action:
      service: notify.pushover
      data:
        message: 'Power restored detected by UPS, recharging'
        data:
          priority: 1
          sound: gamelan
  - alias: 'Home power restored no recharging'
    trigger:
    - platform: state
      entity_id: sensor.ups_status
      to: 'OL'
      from: 'OB DISCHRG'
    action:
      service: notify.pushover
      data:
        message: 'Power restored detected by UPS, no recharging done'
        data:
          priority: 1
          sound: gamelan
  - alias: 'Home power restored recharging started'
    trigger:
    - platform: state
      entity_id: sensor.ups_status
      to: 'OL CHRG'
      from: 'OL DISCHRG'
    action:
      service: notify.pushover
      data:
        message: 'Power restored detected by UPS, recharging started'
        data:
          priority: 1
          sound: gamelan
  - alias: 'Home power restored after recharge'
    trigger:
    - platform: state
      entity_id: sensor.ups_status
      to: 'OL'
      from: 'OL CHRG'
    action:
      service: notify.pushover
      data:
        message: 'UPS recharged'
        data:
          priority: 1
          sound: gamelan
2 Likes

This is not possible just with nut isn’t it?
I need some bridge “nut to MQTT” right?

Yes, I have a python script that loops every 30 seconds, executes upsmon, formats the output as JSON and publishes to MQTT.

I was having communications timeout issues with NUT and CyberPower UPS, so I went down this route so I could debug that issue. Never changed it. I sort of like using MQTT for my sensors general anyway.

3 Likes

any chance you can post a gist of this to github or post up your repo?

Code is pretty ugly, apologies. This python code is running on a raspberry pi zero plus that is USB attached to cyberpower UPS. On boot launches a tmux session that waits for network to be ready and then runs the python script that stays in loop querying NUT and publishing status to MQTT.


----- code on raspberry pi to launch tmux session running python code on boot
Notes: had to change first line, do the wait for network to startup and run tmux as pi not root

sudo chmod +x /etc/rc.local

sudo -s
vi /etc/rc.local

#!/bin/bash
#
#######!/bin/sh -e
#
# rc.local
#
# This script is executed at the end of each multiuser runlevel.
# Make sure that the script will "exit 0" on success or any other
# value on error.
#
# In order to enable or disable this script just change the execution
# bits.
#
# By default this script does nothing.

# Print the IP address
#_IP=$(hostname -I) || true
#if [ "$_IP" ]; then
#  printf "My IP address is %s\n" "$_IP"
#fi

# wait for networks to start

STATE="error";

while [  $STATE == "error" ]; do
    #do a ping and check that its not a default message or change to grep for something else
    STATE=$(ping -q -w 1 -c 1 `ip r | grep default | cut -d ' ' -f 3` > /dev/null && echo ok || echo error)

    #sleep for 2 seconds and try again
    sleep 2
done

su pi -c 'tmux new-session -d -s watts'
su pi -c 'tmux send-keys \'/home/pi/wattsup/wattsup_script_v01.py' C-m'


su pi -c 'tmux new-session -d -s ups'
su pi -c 'tmux send-keys cd\ /home/pi C-m'
su pi -c 'tmux send-keys /home/pi/upsmon/upsmon_script_v01.py C-m'


exit 0


----- in HA, this is the MQTT sensor


on home assistant
sensors.yaml

# nut ups monitor
  - platform: mqtt
    name: "UPS Status"
    state_topic: "upsmon/status"
    value_template: "{{ value_json.ups_status }}"
    json_attributes: 
      - "timestamp"
      - "input_voltage"
      - "ups_status"
      - "battery_runtime"
      - "battery_runtime_low"
      - "battery_voltage"
      - "output_voltage"
      - "battery_charge"
      - "ups_load"
      - "ups_test_result"
      - "ups_model"
      - "ups_vendorid"
      - "ups_serial"
 

----- # python code that loops reading NUT upsc, converts to JSON and publishes to MQTT
#!/usr/bin/env python

# upsmon_script_v01.py
# 2019-09-16
# 
# using mqtt example from here: https://techtutorialsx.com/2017/04/14/python-publishing-messages-to-mqtt-topic/

import sys
import os
import subprocess
import datetime
import time
import re
import paho.mqtt.client as mqttClient
import logging
import logging.handlers
import csv
import json
import pprint

####################

# mqtt server IP address
mqttAddress = "192.168.2.13"

# mqtt topics
upsmon_topic = "upsmon/status"

# global flag to indicate connection to mqtt server
Connected = False

####################

# Logging setup

LOG_FILENAME = 'upsmon.log'

# Set up a specific logger with our desired output level
my_logger = logging.getLogger('upsmon')
my_logger.setLevel(logging.INFO)

# setup log format
formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s',
                              datefmt='%Y-%m-%d %H:%M:%S')


# Add the log message handler to the logger
handler = logging.handlers.RotatingFileHandler(
              LOG_FILENAME, backupCount=5)

handler.setFormatter(formatter)

my_logger.addHandler(handler)

# Roll over on application start
my_logger.handlers[0].doRollover()


def onConnect(client, userdata, flags, rc):
  global Connected
  # connect to mqtt server
  if rc == 0:
    Connected = True
  else:
    Connected = False

def main():

  global Connected

  my_logger.info("startup") 
  # set up connection to mqtt server
  client = mqttClient.Client("media-pi-01-upsmon")
  client.onConnect = onConnect
  client.connect(mqttAddress)

  client.loop_start()

  # pause to connection to complete
  time.sleep(0.5)

  my_logger.info("connected to mqtt server")

  try:
    while True:
#      print("=========================== start")
      retrieve_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
      # get output from nut upsc command
      p = subprocess.Popen('upsc cyberpower1@localhost', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
      rawline = p.communicate()

#      print(type(rawline))
#      pprint.pprint(rawline)

      my_logger.debug("At : " + retrieve_time + ", Received string from shell: [[" + rawline[0] + "]]")

      if len(rawline[0]) > 0:
          try:

             # check if NUT is not communicating with UPS
             if rawline[0].find("battery") == -1 :
               dct_string = "{\"timestamp\": " + retrieve_time + ", \"ups_status\": \"OFFLINE\"}"
               client.publish(upsmon_topic, dct_string, 1) 
               my_logger.error("NUT not communicating with UPS: " + dct_string + ", rawline = " + rawline[0])
               time.sleep(15.0)
               continue

             vals = rawline[0].split("\n")

             dct = {}

             for x in vals:
               if len(x) == 0 :
                 continue
               if x.find(":") == False :
                 continue
               if x.find("driver") <> -1 :
                 continue
               if x.find("device.type") <> -1 :
                 continue
               if x.find("battery.type") <> -1 :
                 continue
               if x.find("ups.timer") <> -1 :
                 continue
               if x.find("ups.delay") <> -1 :
                 continue
               if x.find("battery.charge.low") <> -1 :
                 continue
#               if x.find(".low") <> -1 :
#                 continue
               if x.find(".beeper") <> -1 :
                 continue
               if x.find(".mfr.date") <> -1 :
                 continue
               if x.find("battery.charge.warning") <> -1 :
                 continue
               if x.find("device.mfr") <> -1 :
                 continue
               if x.find("device.model") <> -1 :
                 continue
               if x.find("device.serial") <> -1 :
                 continue
               if x.find("ups.productid") <> -1 :
                 continue
               if x.find("battery.mfr.date") <> -1 :
                 continue
               if x.find("battery.voltage.nominal") <> -1 :
                 continue
               if x.find("ups.realpower.nominal") <> -1 :
                 continue
               if x.find("input.voltage.nominal") <> -1 :
                 continue
#               if x.find("ups.serial") <> -1 :
#                 continue
               if x.find("ups.mfr") <> -1 :
                 continue
#               if x.find("ups.test.result") <> -1 :
#                 continue
#               if x.find("ups.model") <> -1 :
#                 continue
               tup = x.split(":")
               # bush league code to strip spaces from the two elements
               tup = [tup[0].strip().replace(".", "_"), tup[1].strip()]
               #since list has only two items, use this trick to convert the list to a dictionary
               dct.update(dict([tup]))
#               print x
             dct.update({'timestamp': retrieve_time})
             dct_string = str(dct).replace("'","\"")
#             print(dct_string)

#             print("----- publishing to mqtt server")
             client.publish(upsmon_topic, dct_string, 1)
          except Exception as e:
            my_logger.error("Exception triggered: " + str(e))

#      print("=========================== end")
      time.sleep(15.0)
 
  except KeyboardInterrupt:

    client.disconnect()
    client.loop_stop()
    my_logger.info("keyboard interrupt")

if __name__ == '__main__':
  main()

-----

2 Likes

I also found this doc http://rogerprice.org/NUT/ConfigExamples.A5.pdf where you can find some NUT status.

Thanks! NUT is one of the really amazing open source projects. You just sit there and say ‘thank you!’ and ‘wow!’ to the amazing folks that created it and maintain it.

Back to my original answer to your first query. Since I rely on HA’s events and the value that NUT publishes as the UPS status, ‘OL CHRG’, ‘OL DISCHRG’, ‘OL’ … etc… If you do the same, I do recommend you test your UPS and the status changes that NUT passes along with various changes in state by the UPS. I am not sure, but I suspect that if you go the same route, the state change sequences and status string returned can be UPS specific. In the case of my CyberPower units, until I looked in detail during tests, I was missing some of the routes the UPS went from ‘on battery’ back to ‘fully charged and back on mains’.

Just a question: If I am not mistaken the status changes not immediately in HA. Is there any parameter to set the read interval fo the values in NUT or HA?

The pollinterval in UPS.CONF controls the rate at which NUT queries the UPS, this is where I had my problem with my CyberPower units, the default value of 2 seconds caused my UPS to quit talking to NUT over the USB connection. I ended up with a value of 15 seconds and it has been solid for over a year of continuous operation. I did not try to tune it down to a smaller value, so I do not know what the minimum is for the CyberPower:

[cyberpower1]
        driver = usbhid-ups
        port = auto
        desc = "CyberPower BRG850AVRLCD"
        pollinterval = 15

Since I am doing a rather inefficient polling loop of 15 seconds in my python code that polls NUT’s upsc utility and publishes to MQTT, Home Assistant will see the changes somewhere between 15 and 30 seconds after an event occurs at the UPS. This was what I saw during my testing.

If you use the direct HA to NUT integration, I looked at the HA documentation and I do not see a mention of the default sensor scan interval for NUT, nor do I see that it supports the ‘scan_interval:’ configuration key. Again I recommend doing testing with your configuration. And you can also look in GITHUB for the HA code for NUT and see if you can figure out it’s scanning interval.