# Updated script for calling the Xcom-LAN library functions. The main change adds the ability for Home Assistant (HA) to pause SCOM traffic so that the nightly datalog upload to the Studer portal can take place.
import sys
import datetime
import time
import requests
import logging
import math
# XcomLANTCP custom imports
from xcom_proto import XcomP as param # Used for the xcom.getValueByID
from xcom_proto import XcomC # Used for the xcom.getValue
from xcom_proto import XcomLANTCP
class HomeAssistant:
    """Small client for the Home Assistant REST states endpoint.

    ``url`` is the base of the states endpoint (it must end so that
    ``url + "sensor.<name>"`` is a valid entity URL) and ``token`` is sent as
    a bearer token on every request.
    """

    def __init__(self, url: str, token: str, timeout: float = 10.0):
        """Store the connection settings.

        Args:
            url: Base URL that entity ids are appended to.
            token: Access token sent in the ``Authorization`` header.
            timeout: Seconds to wait for Home Assistant before giving up on
                a request. Without a timeout a stalled connection would
                block the caller forever.
        """
        self.url = url
        self.token = token
        self.timeout = timeout
        self.log = logging.getLogger("HomeAssistant")

    def _headers(self) -> dict:
        """Return the HTTP headers shared by every request."""
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }

    def sendData(self, sensorName: str, readableName: str, sensorState, sensorUnit: str, binarySensor=False):
        """Create or update one entity state in Home Assistant.

        Args:
            sensorName: Entity name without the domain prefix.
            readableName: Value for the ``friendly_name`` attribute.
            sensorState: Value stored as the entity state.
            sensorUnit: Unit of measurement; an empty string omits the
                ``unit_of_measurement`` attribute.
            binarySensor: Post to ``binary_sensor.<name>`` instead of
                ``sensor.<name>``.

        Failures (transport errors or a status other than 200/201) are
        logged and otherwise ignored; nothing is returned.
        """
        domain = "binary_sensor." if binarySensor else "sensor."
        url = self.url + domain + sensorName

        if sensorUnit == "":
            attr = {
                "friendly_name": readableName
            }
        else:
            attr = {
                "unit_of_measurement": sensorUnit,
                "friendly_name": readableName
            }
        data = {
            "state": sensorState,
            "attributes": attr
        }
        self.log.debug(data)

        try:
            resp = requests.post(url=url, headers=self._headers(), json=data, timeout=self.timeout)
        except requests.RequestException as exc:
            # Treat transport problems like a bad status: report and carry on.
            self.log.error("HA send failed! (%s / %s)", exc, url)
            return

        if 200 <= resp.status_code <= 201:
            self.log.debug("HA send OK (%s)", resp.status_code)
        else:
            self.log.error("HA send failed! (%s / %s)", resp.status_code, url)

    def read_sensor(self, sensor_name: str):
        """Return the state of ``sensor.<sensor_name>``.

        Returns:
            The ``state`` field of the JSON response, or ``None`` when the
            request fails (transport error or a status other than 200/201).
        """
        url = self.url + f"sensor.{sensor_name}"

        try:
            resp = requests.get(url=url, headers=self._headers(), timeout=self.timeout)
        except requests.RequestException as exc:
            self.log.error("HA read failed! (%s / %s)", exc, url)
            return None

        if 200 <= resp.status_code <= 201:
            return resp.json().get("state")

        self.log.error("HA read failed! (%s / %s)", resp.status_code, url)
        return None
# Base URL of the Home Assistant states endpoint. Use the loopback IP address
# 127.0.0.1 if this script runs on the same machine as Home Assistant.
HOME_ASS_URL = "http://<ip>:8123/api/states/"

# Access token; it must be made using an administrative account, and that
# account should be restricted to local access only.
HOME_ASS_TOKEN = "<your token>"

# Module-level client used for all Home Assistant traffic.
ha = HomeAssistant(url=HOME_ASS_URL, token=HOME_ASS_TOKEN)
# Seconds to wait between Home Assistant polls while XcomLAN traffic is paused
# or while the loop-delay sensor cannot be read. Without this wait the loop
# would issue back-to-back HTTP requests to Home Assistant.
_HA_POLL_SECONDS = 1.0

# Home Assistant sensor that holds the loop delay (negative value = pause).
_LOOP_DELAY_SENSOR = "ha_scom_studer_python_script_loop_delay"


def _ha_number(raw_state, sensor_name):
    """Convert a state returned by ``ha.read_sensor`` to ``float``.

    Raises:
        ValueError: if ``raw_state`` is ``None`` (the read failed) or is not
            numeric. Using ``ValueError`` for both cases lets the main loop
            report them through its existing "Value Error" handler instead
            of crashing on ``float(None)``.
    """
    if raw_state is None:
        raise ValueError(f"Home Assistant sensor '{sensor_name}' could not be read")
    return float(raw_state)


def _read_and_publish(xcom, scom_error_code, seconds_since_last_read):
    """Read all measurements over the open Xcom channel and push them to HA.

    All reads are performed first, then all values are published, in the
    same order as before. The trailing numbers in the comments are the
    author's slot numbers; slots 1, 4, 9, 14-16, 26 and 27 are placeholders
    that are not read yet.

    Returns:
        The SOC level for grid feeding that was read, because the caller
        needs it to limit the SOC level for backup.
    """
    # --- Perform Xcom reads ---
    # Energy
    solar_prod_vs1 = xcom.getValue(param.VS_PV_PROD, dstAddr=701)  # 2A
    solar_prod_vs2 = xcom.getValue(param.VS_PV_PROD, dstAddr=702)  # 2B
    # There was an error in the library parameter list for 15027, but it has now been corrected.
    yesterday_solar_energy_vs1 = xcom.getValueByID(15027, XcomC.TYPE_FLOAT, dstAddr=701)  # 3A
    yesterday_solar_energy_vs2 = xcom.getValueByID(15027, XcomC.TYPE_FLOAT, dstAddr=702)  # 3B
    grid_prod = xcom.getValue(param.AC_ENERGY_IN_CURR_DAY, dstAddr=101)  # 5
    ac_energy_in_prev_day = xcom.getValue(param.AC_ENERGY_IN_PREV_DAY, dstAddr=101)  # 6
    ac_energy_out_curr_day = xcom.getValue(param.AC_ENERGY_OUT_CURR_DAY, dstAddr=101)  # 7
    ac_energy_out_prev_day = xcom.getValue(param.AC_ENERGY_OUT_PREV_DAY, dstAddr=101)  # 8
    # Battery
    battery_charge = xcom.getValue(param.BATT_CHARGE, dstAddr=601)  # 10
    yesterday_battery_charge = xcom.getValue(param.BATT_CHARGE_PREV_DAY, dstAddr=601)  # 11
    battery_discharge = xcom.getValue(param.BATT_DISCHARGE, dstAddr=601)  # 12
    yesterday_battery_discharge = xcom.getValue(param.BATT_DISCHARGE_PREV_DAY, dstAddr=601)  # 13
    battery_voltage = xcom.getValue(param.BATT_VOLTAGE, dstAddr=601)  # 17
    battery_current = xcom.getValue(param.BATT_CURRENT, dstAddr=601)  # 18
    soc = xcom.getValue(param.BATT_SOC, dstAddr=601)  # 19
    battery_temp = xcom.getValue(param.BATT_TEMP)  # 20
    batt_cycle_phase = xcom.getValue(param.BATT_CYCLE_PHASE_XT, dstAddr=101)  # 21
    soc_level_for_grid_feeding = xcom.getValue(param.SOC_LEVEL_FOR_GRID_FEEDING, dstAddr=601)
    # Power
    solar_power_vs1 = xcom.getValue(param.VS_PV_POWER, dstAddr=701)  # 22A
    solar_power_vs2 = xcom.getValue(param.VS_PV_POWER, dstAddr=702)  # 22B
    gridpower = xcom.getValue(param.AC_POWER_IN, dstAddr=101)  # 23
    houseload = xcom.getValue(param.AC_POWER_OUT, dstAddr=101)  # 24
    battpower = xcom.getValue(param.BATT_POWER)  # 25
    ac_in_current = xcom.getValue(param.AC_CURRENT_IN, dstAddr=101)  # 28
    ac_out_current = xcom.getValue(param.AC_CURRENT_OUT, dstAddr=101)  # 29
    ac_voltage_in = xcom.getValue(param.AC_VOLTAGE_IN, dstAddr=101)  # 30
    ac_voltage_out = xcom.getValue(param.AC_VOLTAGE_OUT, dstAddr=101)  # 31
    ac_freq_in = xcom.getValue(param.AC_FREQ_IN, dstAddr=101)  # 32
    ac_freq_out = xcom.getValue(param.AC_FREQ_OUT, dstAddr=101)  # 33

    # --- Push data to Home Assistant ---
    # Status (the error code is the one recorded by the previous pass)
    ha.sendData("scom_error_code", "Scom Error Code", scom_error_code, " ")
    ha.sendData("scom_seconds_since_last_read", "Scom Seconds Since Last Read", seconds_since_last_read, "s")
    # Energy
    ha.sendData("scom_solar_prod_vs1", "Scom Solar Prod VS1", round(solar_prod_vs1, 2), "kWh")  # 2A
    ha.sendData("scom_solar_prod_vs2", "Scom Solar Prod VS2", round(solar_prod_vs2, 2), "kWh")  # 2B
    # The "yesterday" values are only published when they are not NaN.
    # Each value is checked on its own (the VS2 publish used to be guarded
    # by the VS1 value).
    if not math.isnan(yesterday_solar_energy_vs1):
        ha.sendData("scom_yesterday_solar_energy_vs1", "Scom Yesterday Solar Energy VS1", round(yesterday_solar_energy_vs1, 2), "kWh")  # 3A
    if not math.isnan(yesterday_solar_energy_vs2):
        ha.sendData("scom_yesterday_solar_energy_vs2", "Scom Yesterday Solar Energy VS2", round(yesterday_solar_energy_vs2, 2), "kWh")  # 3B
    ha.sendData("scom_grid_prod", "Scom Grid Production", round(grid_prod, 2), "kWh")  # 5
    ha.sendData("scom_ac_energy_in_prev_day", "Scom AC Energy In Previous Day", round(ac_energy_in_prev_day, 2), "kWh")  # 6
    ha.sendData("scom_ac_energy_out_curr_day", "Scom AC Energy Out Current Day", round(ac_energy_out_curr_day, 2), "kWh")  # 7
    ha.sendData("scom_ac_energy_out_prev_day", "Scom AC Energy Out Previous Day", round(ac_energy_out_prev_day, 2), "kWh")  # 8
    # Battery
    ha.sendData("scom_battery_charge", "Scom Battery Charge", round(battery_charge, 2), "Ah")  # 10
    ha.sendData("scom_yesterday_battery_charge", "Scom Yesterday Battery Charge", round(yesterday_battery_charge, 2), "Ah")  # 11
    ha.sendData("scom_battery_discharge", "Scom Battery Discharge", round(battery_discharge, 2), "Ah")  # 12
    ha.sendData("scom_yesterday_battery_discharge", "Scom Yesterday Battery Discharge", round(yesterday_battery_discharge, 2), "Ah")  # 13
    ha.sendData("scom_battery_voltage", "Scom Battery Voltage", round(battery_voltage, 2), "V")  # 17
    ha.sendData("scom_battery_current", "Scom Battery Current", round(battery_current, 2), "A")  # 18
    ha.sendData("scom_soc", "Scom Battery State of Charge", int(soc), "%")  # 19
    ha.sendData("scom_battery_temp", "Scom Battery Temperature", round(battery_temp, 2), "°C")  # 20
    ha.sendData("scom_batt_cycle_phase", "Scom Battery Cycle Phase", batt_cycle_phase, "")  # 21
    ha.sendData("scom_soc_level_for_grid_feeding", "Scom SOC Level for Grid Feeding", soc_level_for_grid_feeding, "%")
    # Power
    ha.sendData("scom_solar_power_vs1", "Scom Solar Power VS1", round(solar_power_vs1, 3), "kW")  # 22A
    ha.sendData("scom_solar_power_vs2", "Scom Solar Power VS2", round(solar_power_vs2, 3), "kW")  # 22B
    ha.sendData("scom_gridpower", "Scom Grid Power", round(gridpower, 3), "kW")  # 23
    ha.sendData("scom_houseload", "Scom Houseload Power", round(houseload, 3), "kW")  # 24
    # The battery power reading is divided by 1000 before being published as kW.
    ha.sendData("scom_battpower", "Scom Battery Power", round(battpower/1000, 3), "kW")  # 25
    ha.sendData("scom_ac_in_current", "Scom AC In Current", round(ac_in_current, 2), "A")  # 28
    ha.sendData("scom_ac_out_current", "Scom AC Out Current", round(ac_out_current, 2), "A")  # 29
    ha.sendData("scom_ac_voltage_in", "Scom AC Voltage In", int(ac_voltage_in), "V")  # 30
    ha.sendData("scom_ac_voltage_out", "Scom AC Voltage Out", int(ac_voltage_out), "V")  # 31
    ha.sendData("scom_ac_freq_in", "Scom AC Frequency In", round(ac_freq_in, 2), "Hz")  # 32
    ha.sendData("scom_ac_freq_out", "Scom AC Frequency Out", round(ac_freq_out, 2), "Hz")  # 33

    return soc_level_for_grid_feeding


def main():
    """Poll the Studer system and exchange values with Home Assistant (HA).

    Command line arguments:
        1. Number of measurement passes (default 1); a negative number means
           "run forever".
        2. Initial loop delay in seconds (default 1). It is still parsed for
           backward compatibility, but the delay actually applied on every
           pass is the one read from the HA loop-delay sensor.

    Pausing: when the loop delay received from HA is negative, all XcomLAN
    activity is paused until a non-negative value is received. This allows
    the Studer portal to receive the nightly datalog upload, which cannot
    occur while there is other XcomLAN traffic. The upload takes place some
    time after midnight, so an HA automation can start the pause by setting
    the delay to a negative value shortly after midnight. Make sure the RCC
    clock is set correctly; you will know it is when the "scom_solar_prod_vs1"
    sensor created by this script resets to 0 at midnight. The traffic can be
    unpaused by an HA automation with a fixed timer, with a RESTful sensor
    that watches the Studer portal for a new datalog, or with both.

    Errors raised during a pass are printed and recorded in
    ``scom_error_code`` (1 assertion, 2 value, 3 connection refused,
    4 connection reset), which is published to HA by the next successful pass.
    """
    # Passed parameters from command line
    num_measurements = int(sys.argv[1]) if len(sys.argv) > 1 else 1
    delay = float(sys.argv[2]) if len(sys.argv) > 2 else 1  # superseded by the HA value on the first pass

    # Loop state
    measurements = 0
    scom_error_code = 0
    previous_datetime = None
    # The "previous" values start at 0 while HA states arrive as other types,
    # so every setpoint is written to the Studer system on the first pass.
    previous_loop_delay = 0
    previous_tariff_price = 0
    previous_smart_boost_limit = 0
    previous_inverter_charge_current = 0
    previous_backup_target = None  # last SOC-for-backup value written (float)

    while (measurements < num_measurements) or (num_measurements < 0):
        try:
            # Get the loop delay from HA before doing anything on the Xcom side.
            current_loop_delay = ha.read_sensor(sensor_name=_LOOP_DELAY_SENSOR)
            start_datetime = datetime.datetime.now()
            try:
                loop_delay = _ha_number(current_loop_delay, _LOOP_DELAY_SENSOR)
                if not math.isfinite(loop_delay):
                    raise ValueError(f"loop delay {current_loop_delay!r} is not a finite number")
            except ValueError:
                # No usable delay: wait before retrying so that the loop does
                # not spin, then report through the ValueError handler below.
                time.sleep(_HA_POLL_SECONDS)
                raise

            if loop_delay < 0:
                # Paused: announce it once per change, then keep polling HA
                # (with a short wait) without touching the Xcom channel.
                if current_loop_delay != previous_loop_delay:
                    delay = loop_delay
                    print(f"At {start_datetime} the Current Studer Python Script Loop Delay is: {current_loop_delay} s Loop paused due to negative delay")
                    previous_loop_delay = current_loop_delay
                time.sleep(_HA_POLL_SECONDS)
                continue  # Skip the rest of the loop (XcomLAN activity) and restart at the beginning

            if current_loop_delay != previous_loop_delay:
                delay = loop_delay
                print(f"At {start_datetime} the Current Studer Python Script Loop Delay is: {current_loop_delay} s")
            previous_loop_delay = current_loop_delay
            time.sleep(loop_delay)  # Pause the loop using the latest delay value

            # Seconds between the start of this pass and the start of the previous one
            if previous_datetime is not None:
                seconds_since_last_read = (start_datetime - previous_datetime).total_seconds()
            else:
                seconds_since_last_read = "NaN"
            previous_datetime = start_datetime

            # Do not move this to before the start of the while loop since the
            # TCP channel might close if delay > 40 seconds.
            with XcomLANTCP(port=4001) as xcom:
                soc_level_for_grid_feeding = _read_and_publish(xcom, scom_error_code, seconds_since_last_read)

                # Read setpoints from HA; when one has changed, print it to
                # the console and write the new value to the Studer system.

                # Spot electricity rate from an HA sensor that is updated every 15 minutes (print only)
                current_tariff_price = ha.read_sensor(sensor_name="tariff_price")
                if current_tariff_price != previous_tariff_price:
                    print(f"At {start_datetime} the Current Tariff Price is: {current_tariff_price} €/kWh")
                    previous_tariff_price = current_tariff_price

                current_smart_boost_limit = ha.read_sensor(sensor_name="ha_scom_studer_smart_boost_limit")
                if current_smart_boost_limit != previous_smart_boost_limit:
                    xcom.setValue(
                        param.SMART_BOOST_LIMIT,
                        _ha_number(current_smart_boost_limit, "ha_scom_studer_smart_boost_limit"),
                        dstAddr=101,
                    )
                    print(f"At {start_datetime} the Current Studer Smart Boost Limit is: {current_smart_boost_limit} %")
                    previous_smart_boost_limit = current_smart_boost_limit

                current_inverter_charge_current = ha.read_sensor(sensor_name="ha_scom_studer_inverter_charge_current")
                if current_inverter_charge_current != previous_inverter_charge_current:
                    # Writing to RAM is not working for this parameter, but
                    # writing to flash (propertyID=XcomC.QSP_VALUE) is. Send a
                    # question to Studer to investigate why.
                    xcom.setValue(
                        param.BATTERY_CHARGE_CURR,
                        _ha_number(current_inverter_charge_current, "ha_scom_studer_inverter_charge_current"),
                        dstAddr=101,
                    )
                    print(f"At {start_datetime} the Current Studer Inverter Charge Current is: {current_inverter_charge_current} A")
                    previous_inverter_charge_current = current_inverter_charge_current

                current_soc_level_for_backup = ha.read_sensor(sensor_name="ha_scom_studer_soc_level_for_backup")
                backup_target = _ha_number(current_soc_level_for_backup, "ha_scom_studer_soc_level_for_backup")
                grid_feeding_level = float(soc_level_for_grid_feeding)
                if backup_target >= grid_feeding_level:
                    # The backup level must stay below the SOC level for grid feeding.
                    backup_target = grid_feeding_level - 1.0
                    current_soc_level_for_backup = backup_target
                # Compare the value that would be written, not the raw HA
                # state; otherwise a clamped value looks "changed" on every
                # pass and is rewritten each time.
                if backup_target != previous_backup_target:
                    xcom.setValue(param.SOC_LEVEL_FOR_BACKUP, backup_target, dstAddr=601)
                    print(f"At {start_datetime} the Current Studer SOC Level for Backup is: {current_soc_level_for_backup} %")
                    previous_backup_target = backup_target

            # The pass completed: clear the error code and count the pass.
            scom_error_code = 0
            if num_measurements >= 0:
                measurements += 1

        except AssertionError as e:
            print("Assertion Error at ", datetime.datetime.now(), e)
            scom_error_code = 1
        except ValueError as e:
            print("Value Error at ", datetime.datetime.now(), e)
            scom_error_code = 2
        except ConnectionRefusedError as e:
            print("Connection Refused Error at ", datetime.datetime.now(), e)
            scom_error_code = 3
        except ConnectionResetError as e:
            print("Connection Reset Error at ", datetime.datetime.now(), e)
            scom_error_code = 4
# Script entry point: start the polling loop only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()