Thank you so much for sharing your work on that.
I tried your step by step instructions but get stuck on the read test with that :
Can you maybe help?
thanks
Thank you so much for sharing your work on that.
I tried your step by step instructions but get stuck on the read test with that :
If you exit python, then run “pip list” at the command prompt, does xcom_proto 0.3.3 appear on the list of installed packages?
after the
houseload = xcom.getValue(param.AC_POWER_OUT)
try hitting the enter key and wait for the “>>>” python prompt to appear before you enter the print commands.
Also, check that your Moxa is configured like this where you have added the ip address of the machine that is running the python script on the destination ip address line 2. Also the data packing section of the Moxa configuration should be the same as shown.
Thank you for your answer. I tried with Python prompt line by line but still the same problem. Probably a Python problem. I will reinstall xcom lan library.
my Moxa config is as shown in your answer.
Had you also run these three import commands prior to all other python commands?
Yes I did.
I tried to reinstall xcom-protocol-master.zip. I still have same error.
By the way thank you very much for your Studer RESTFUL tutorial, It is working very well for me : ( https://community.home-assistant.io/t/studer-innotec-solar-components-integration )
I just tried it and found that from the command prompt, I have to execute the assign statements separate from the print statements. Try entering:
with XcomLANTCP(port=4001) as xcom:
soc = xcom.getValue(param.BATT_SOC)
gridpower = xcom.getValue(param.AC_POWER_IN)
houseload = xcom.getValue(param.AC_POWER_OUT)
Then hit the return key to get back the python command prompt “>>>”
Then enter the print statements:
print('soc = ',soc, '%')
print('gridpower = ',gridpower, 'kW')
print('houseload = ',houseload, 'kW')
Other things to check:
It seems like the assignment statements aren’t assigning values to the variables soc, gridpower and houseload. Are you sure that you entered the correct ip address of the machine where the python package was installed into the Moxa ip server list? Could you have a firewall or anything else on that machine that blocks ip traffic?
To install the Xcom library and run the script locally on the same pi that runs HA, I followed the instructions here for creating a venv virtual python environment: Installing Red on Debian 12 Bookworm — Red - Discord Bot 3.5.5 documentation
Later I will look into making the script self starting so that it will auto start whenever the pi restarts or if the script were to stop.
It is now working after having tried your last recommandation to go back to comamnd prompt, thanks ! : )
Glad to hear it’s now working. Sorry I lead you astray saying there was no waiting between the read and print commands.
Update to script for calling Xcom-LAN library functions. The main change was adding the capability for HA to pause SCOM to allow the nightly datalog uploads to the Studer portal to take place.
import sys
import datetime
import time
import requests
import logging
import math
# XcomLANTCP custom imports
from xcom_proto import XcomP as param # Used for the xcom.getValueByID
from xcom_proto import XcomC # Used for the xcom.getValue
from xcom_proto import XcomLANTCP
class HomeAssistant:
def __init__(self, url: str, token: str):
self.url = url
self.token = token
self.log = logging.getLogger("HomeAssistant")
def sendData(self, sensorName: str, readableName: str, sensorState, sensorUnit: str, binarySensor=False):
if binarySensor:
url = self.url + "binary_sensor." + sensorName
else:
url = self.url + "sensor." + sensorName
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
if sensorUnit == "":
attr = {
"friendly_name": readableName
}
else:
attr = {
"unit_of_measurement": sensorUnit,
"friendly_name": readableName
}
data = {
"state": sensorState,
"attributes": attr
}
self.log.debug(data)
resp = requests.post(url=url, headers=headers, json=data)
if 200 <= resp.status_code <= 201:
self.log.debug(f"HA send OK ({resp.status_code})")
else:
self.log.error(f"HA send failed! ({resp.status_code} / {url})")
def read_sensor(self, sensor_name: str):
url = self.url + f"sensor.{sensor_name}"
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
resp = requests.get(url=url, headers=headers)
if 200 <= resp.status_code <= 201:
data = resp.json()
sensor_state = data.get("state")
return sensor_state
else:
self.log.error(f"HA read failed! ({resp.status_code} / {url})")
return None
HOME_ASS_URL = "http://<ip>:8123/api/states/" # use loopbak ip address 127.0.0.1 if this script runs on same machine as HA
HOME_ASS_TOKEN = "<your token>" # must be made using an administrative account, account should be local access only
ha = HomeAssistant(HOME_ASS_URL, HOME_ASS_TOKEN)
def main():
# Passed parameters from command line
num_measurements = int(sys.argv[1]) if len(sys.argv) > 1 else 1
delay = float(sys.argv[2]) if len(sys.argv) > 2 else 1
# Initialize variables
measurements = 0 # Initialize loop counter measurements to 0
previous_studer_python_script_loop_delay = 0
previous_datetime = 0
scom_error_code = 0
previous_tariff_price = 0
previous_studer_smart_boost_limit = 0
previous_studer_soc_level_for_backup = 0
previous_studer_inverter_charge_current = 0
while (measurements < num_measurements) or (num_measurements < 0):
try:
# When the loop delay value received from Home Assistant HA) is negative, pause XcomLAN activity indefinitly until a positive value is received.
# This is to allow the Studer portal to receive the nightly datalog upload which can't occur when there is other XcomLAN traffic.
# The upload takes place sometime after midnight. An automation in HA can be used to start the pause by setting the delay value to a negaive value shortly after midnight.
# Make sure that the RCC clock is correctly set, you will know by checking the today solar energy production totalizer "scom_solar_prod_vs1" HA sensor that this script creates,
# is reset to 0 at midnight.
# The XcomLAN traffic can be unpaused by an HA automation either with a fixed timer, a RESTful sensor that monitors the Studer portal for a new datalog,
# or a combination of the two.
# Get loop delay value from HA, and check if it is negative. If it is negative, pause XcomLAN activity indefinitly until a positive value is received from HA.
current_studer_python_script_loop_delay = ha.read_sensor(sensor_name="ha_scom_studer_python_script_loop_delay")
start_datetime = datetime.datetime.now()
if float(current_studer_python_script_loop_delay) < 0:
if current_studer_python_script_loop_delay != previous_studer_python_script_loop_delay:
delay = float(current_studer_python_script_loop_delay)
print(f"At {start_datetime} the Current Studer Python Script Loop Delay is: {current_studer_python_script_loop_delay} s Loop paused due to negative delay")
previous_studer_python_script_loop_delay = current_studer_python_script_loop_delay
continue # Skip the rest of the loop (XcomLAN activity) and restart at begining of loop
# Test if loop delay value has changed since last pass through loop, if it has
if current_studer_python_script_loop_delay != previous_studer_python_script_loop_delay:
delay = float(current_studer_python_script_loop_delay)
print(f"At {start_datetime} the Current Studer Python Script Loop Delay is: {current_studer_python_script_loop_delay} s")
time.sleep(float(current_studer_python_script_loop_delay)) # Pause the loop using the latest delay value
# Calculate seconds since last read
previous_studer_python_script_loop_delay = current_studer_python_script_loop_delay
if previous_datetime != 0:
delta = start_datetime - previous_datetime
seconds_since_last_read = delta.total_seconds()
else:
seconds_since_last_read = "NaN"
previous_datetime = start_datetime
# Not currently used. (Part A) Used to measure time required to open TCP Channel
xcom_channel_open_start_datetime = datetime.datetime.now()
# Open Xcom channel
with XcomLANTCP(port=4001) as xcom: # Do not move this to before the start of the while loop since TCP channel might close if delay > 40 seconds.
# Not currently used. (Part B) Used to measure time required to open TCP Channel
xcom_channel_open_complete_datetime = datetime.datetime.now()
xcom_channel_open_time = (xcom_channel_open_complete_datetime - xcom_channel_open_start_datetime).total_seconds()
# Perform Xcom reads
# Energy
has_solar = "NaN" # 1 place holder, 11015 Model of VarioTrack
solar_prod_vs1 = xcom.getValue(param.VS_PV_PROD, dstAddr=701) # 2A
solar_prod_vs2 = xcom.getValue(param.VS_PV_PROD, dstAddr=702) # 2B
yesterday_solar_energy_vs1 = xcom.getValueByID(15027, XcomC.TYPE_FLOAT, dstAddr=701) # 3A There was an error in the library parameter list for 15027, but it has now been corrected
yesterday_solar_energy_vs2 = xcom.getValueByID(15027, XcomC.TYPE_FLOAT, dstAddr=702) # 3B
hasInverter = "NaN" # 4 place holder, 3124 ID type
grid_prod = xcom.getValue(param.AC_ENERGY_IN_CURR_DAY, dstAddr=101) # 5
ac_energy_in_prev_day = xcom.getValue(param.AC_ENERGY_IN_PREV_DAY, dstAddr=101) # 6
ac_energy_out_curr_day = xcom.getValue(param.AC_ENERGY_OUT_CURR_DAY, dstAddr=101) # 7
ac_energy_out_prev_day = xcom.getValue(param.AC_ENERGY_OUT_PREV_DAY, dstAddr=101) # 8
# Battery
has_battery_status_processor = "NaN" # 9 place holder, 15074 ID type
battery_charge = xcom.getValue(param.BATT_CHARGE, dstAddr=601) # 10
yesterday_battery_charge = xcom.getValue(param.BATT_CHARGE_PREV_DAY, dstAddr=601) # 11
battery_discharge = xcom.getValue(param.BATT_DISCHARGE, dstAddr=601) # 12
yesterday_battery_discharge = xcom.getValue(param.BATT_DISCHARGE_PREV_DAY, dstAddr=601) # 13
has_ac_coupling = "NaN" # 14 place holder
ac_coupling = "NaN" # 15 place holder
yesterday_ac_coupling = "NaN" # 16 place holder
battery_voltage = xcom.getValue(param.BATT_VOLTAGE, dstAddr=601) # 17
battery_current = xcom.getValue(param.BATT_CURRENT, dstAddr=601) # 18
soc = xcom.getValue(param.BATT_SOC, dstAddr=601) # 19
battery_temp = xcom.getValue(param.BATT_TEMP) # 20
batt_cycle_phase = xcom.getValue(param.BATT_CYCLE_PHASE_XT, dstAddr=101) # 21
soc_level_for_grid_feeding = xcom.getValue(param.SOC_LEVEL_FOR_GRID_FEEDING, dstAddr=601)
# Power
solar_power_vs1 = xcom.getValue(param.VS_PV_POWER, dstAddr=701) # 22A
solar_power_vs2 = xcom.getValue(param.VS_PV_POWER, dstAddr=702) # 22B
gridpower = xcom.getValue(param.AC_POWER_IN, dstAddr=101) # 23
houseload = xcom.getValue(param.AC_POWER_OUT, dstAddr=101) # 24
battpower = xcom.getValue(param.BATT_POWER) # 25
has_ac_coupling = "NaN" # 26 place holder
ac_coupling = "NaN" # 27 place holder
ac_in_current = xcom.getValue(param.AC_CURRENT_IN, dstAddr=101) # 28
ac_out_current = xcom.getValue(param.AC_CURRENT_OUT, dstAddr=101) # 29
ac_voltage_in = xcom.getValue(param.AC_VOLTAGE_IN, dstAddr=101) # 30
ac_voltage_out = xcom.getValue(param.AC_VOLTAGE_OUT, dstAddr=101) # 31
ac_freq_in = xcom.getValue(param.AC_FREQ_IN, dstAddr=101) # 32
ac_freq_out = xcom.getValue(param.AC_FREQ_OUT, dstAddr=101) # 33
# Push data to Home Assistant
# Status
ha.sendData("scom_error_code", "Scom Error Code", scom_error_code, " ")
ha.sendData("scom_seconds_since_last_read", "Scom Seconds Since Last Read", seconds_since_last_read, "s")
# Energy
ha.sendData("scom_solar_prod_vs1", "Scom Solar Prod VS1", round(solar_prod_vs1, 2), "kWh") #2A
ha.sendData("scom_solar_prod_vs2", "Scom Solar Prod VS2", round(solar_prod_vs2, 2), "kWh") #2B
if not math.isnan(yesterday_solar_energy_vs1):
ha.sendData("scom_yesterday_solar_energy_vs1", "Scom Yesterday Solar Energy VS1", round(yesterday_solar_energy_vs1, 2), "kWh") #3A
if not math.isnan(yesterday_solar_energy_vs1):
ha.sendData("scom_yesterday_solar_energy_vs2", "Scom Yesterday Solar Energy VS2", round(yesterday_solar_energy_vs2, 2), "kWh") #3B
ha.sendData("scom_grid_prod", "Scom Grid Production", round(grid_prod, 2), "kWh") #5
ha.sendData("scom_ac_energy_in_prev_day", "Scom AC Energy In Previous Day", round(ac_energy_in_prev_day, 2), "kWh") #6
ha.sendData("scom_ac_energy_out_curr_day", "Scom AC Energy Out Current Day", round(ac_energy_out_curr_day, 2), "kWh") #7
ha.sendData("scom_ac_energy_out_prev_day", "Scom AC Energy Out Previous Day", round(ac_energy_out_prev_day, 2), "kWh") #8
# Battery
ha.sendData("scom_battery_charge", "Scom Battery Charge", round(battery_charge, 2), "Ah") #10
ha.sendData("scom_yesterday_battery_charge", "Scom Yesterday Battery Charge", round(yesterday_battery_charge, 2), "Ah") #11
ha.sendData("scom_battery_discharge", "Scom Battery Discharge", round(battery_discharge, 2), "Ah") #12
ha.sendData("scom_yesterday_battery_discharge", "Scom Yesterday Battery Discharge", round(yesterday_battery_discharge, 2), "Ah") #13
ha.sendData("scom_battery_voltage", "Scom Battery Voltage", round(battery_voltage, 2), "V") #17
ha.sendData("scom_battery_current", "Scom Battery Current", round(battery_current, 2), "A") #18
ha.sendData("scom_soc", "Scom Battery State of Charge", int(soc), "%") #19
ha.sendData("scom_battery_temp", "Scom Battery Temperature", round(battery_temp, 2), "°C") #20
ha.sendData("scom_batt_cycle_phase", "Scom Battery Cycle Phase", batt_cycle_phase, "") #21
ha.sendData("scom_soc_level_for_grid_feeding", "Scom SOC Level for Grid Feeding", soc_level_for_grid_feeding, "%")
# Power
ha.sendData("scom_solar_power_vs1", "Scom Solar Power VS1", round(solar_power_vs1, 3), "kW") #22A
ha.sendData("scom_solar_power_vs2", "Scom Solar Power VS2", round(solar_power_vs2, 3), "kW") #22B
ha.sendData("scom_gridpower", "Scom Grid Power", round(gridpower, 3), "kW") #23
ha.sendData("scom_houseload", "Scom Houseload Power", round(houseload, 3), "kW") #24
ha.sendData("scom_battpower", "Scom Battery Power", round(battpower/1000, 3), "kW") #25
ha.sendData("scom_ac_in_current", "Scom AC In Current", round(ac_in_current, 2), "A") #28
ha.sendData("scom_ac_out_current", "Scom AC Out Current", round(ac_out_current, 2), "A") #29
ha.sendData("scom_ac_voltage_in", "Scom AC Voltage In", int(ac_voltage_in), "V") #30
ha.sendData("scom_ac_voltage_out", "Scom AC Voltage Out", int(ac_voltage_out), "V") #31
ha.sendData("scom_ac_freq_in", "Scom AC Frequency In", round(ac_freq_in, 2), "Hz") #32
ha.sendData("scom_ac_freq_out", "Scom AC Frequency Out", round(ac_freq_out, 2), "Hz") #33
# Read values from HA, test if changed, and if changed, print to console, and write new values to Studer RAM.
current_tariff_price = ha.read_sensor(sensor_name="tariff_price") # reads spot electric rate from an HA sensor in my installation that is updated every 15 minutes
if current_tariff_price != previous_tariff_price:
print(f"At {start_datetime} the Current Tariff Price is: {current_tariff_price} €/kWh")
previous_tariff_price = current_tariff_price
current_studer_smart_boost_limit = ha.read_sensor(sensor_name="ha_scom_studer_smart_boost_limit")
if current_studer_smart_boost_limit != previous_studer_smart_boost_limit:
xcom.setValue(param.SMART_BOOST_LIMIT, float(current_studer_smart_boost_limit), dstAddr=101)
print(f"At {start_datetime} the Current Studer Smart Boost Limit is: {current_studer_smart_boost_limit} %")
previous_studer_smart_boost_limit = current_studer_smart_boost_limit
current_studer_inverter_charge_current = ha.read_sensor(sensor_name="ha_scom_studer_inverter_charge_current")
if current_studer_inverter_charge_current != previous_studer_inverter_charge_current:
xcom.setValue(param.BATTERY_CHARGE_CURR, float(current_studer_inverter_charge_current), dstAddr=101) # Writing to RAM is not working for this parameter, but writing to flash is. Send a question to Studer to investigate why.
# xcom.setValue(param.BATTERY_CHARGE_CURR, float(current_studer_inverter_charge_current), dstAddr=101, propertyID=XcomC.QSP_VALUE) # One time test write to flash to see on Rcc
print(f"At {start_datetime} the Current Studer Inverter Charge Current is: {current_studer_inverter_charge_current} A")
previous_studer_inverter_charge_current = current_studer_inverter_charge_current
current_studer_soc_level_for_backup = ha.read_sensor(sensor_name="ha_scom_studer_soc_level_for_backup")
if current_studer_soc_level_for_backup != previous_studer_soc_level_for_backup:
if float(current_studer_soc_level_for_backup) >= float(soc_level_for_grid_feeding): # The maximum allowed value cannot be greater than SOC Level for Grid Feeding
current_studer_soc_level_for_backup = soc_level_for_grid_feeding - 1.0
xcom.setValue(param.SOC_LEVEL_FOR_BACKUP, float(current_studer_soc_level_for_backup), dstAddr=601)
print(f"At {start_datetime} the Current Studer SOC Level for Backup is: {current_studer_soc_level_for_backup} %")
previous_studer_soc_level_for_backup = current_studer_soc_level_for_backup
scom_error_code = 0
if num_measurements >= 0:
measurements += 1
except AssertionError as e:
print("Assertion Error at ", datetime.datetime.now(), e)
scom_error_code = 1
except ValueError as e:
print("Value Error at ", datetime.datetime.now(), e)
scom_error_code = 2
except ConnectionRefusedError as e:
print("Connection Refused Error at ", datetime.datetime.now(), e)
scom_error_code = 3
except ConnectionResetError as e:
print("Connection Reset Error at ", datetime.datetime.now(), e)
scom_error_code = 4
if __name__ == "__main__": #1 move this to the end
main()
HA template in configuration.yaml to check status of Studer datalog last update:
rest:
# Studer Datalog Last Date request, provides state change when Last Available Datalog date changes. Unfortunatly it doesn't give the time of the update.
# Since an automation needs to pause the SCOM communication to allow the DatalogUse upload to the Studer portal after midnight,
# use this sensor to trigger the automation to resume (end pause) of SCOM communication.
- authentication: basic
method: GET
scan_interval: 10
timeout: 300
resource: "https://api.studer-innotec.com/api/v1/datalog/available-dates/<your system id number>"
headers:
Accept: application/json
UHASH: !secret UHASH
PHASH: !secret PHASH
sensor:
- name: "Studer Datalog First Date"
unique_id: "studer_datalog_first_date"
value_template: "{{ value_json[0] }}"
# SENSOR INTEGRATIONS
sensor:
# Date and time stamp for Studer Datalog update in portal, produces a sensor with a timestamp to indicate when the Datalog was uploaded to the Studer portal.
- platform: template
sensors:
studer_datalog_timestamp:
friendly_name: "Studer Datalog Timestamp"
unique_id: "studer_datalog_timestamp"
value_template: >-
{% set date_state = states('sensor.studer_datalog_first_date') %}
{% set current_time = now().strftime('%H:%M:%S') %}
{% if date_state != state_attr('sensor.studer_datalog_timestamp', 'last_date') %}
{{ date_state | regex_replace('T00:00:00$', 'T' + current_time) }}
{% else %}
{{ states('sensor.studer_datalog_timestamp') }}
{% endif %}
attribute_templates:
last_date: "{{ states('sensor.studer_datalog_first_date') }}"
last_updated: "{{ now().isoformat() }}"
HA Automation to pause SCOM to allow datalog upload to cloud. This automation sends pause to python script before midnight, then waits for RESTful datalog sensor state change to unpause SCOM.
I made a change to this automation that eliminated the 4 minute delay on the datlog state change trigger. Comparing Wireshark reports and the datalog timestamp, I could see that the datalog update timestamp occurred after the datalog upload to the server had completed, so no trigger delay is needed. I also noted that in the case of my system which includes the following components: 1 Xtender inverter, 2 Vario String charge controllers, 1 Rcc interface, 1 Xcom-LAN Ethernet to Studer bus adapter, 1 Xcom-CAN CAN bus to Studer bus adapter, and 1 PylonTech server rack battery bank, the upload duration is just over 1.5 minutes.
alias: SCOM Pause
description: Datalog uploads to Studer portal are possible only when SCOM is paused.
trigger:
- platform: time
at: "23:53:00"
id: Pause SCOM Loop
- platform: state
entity_id:
- sensor.studer_datalog_first_date
for:
hours: 0
minutes: 0
seconds: 0
id: datalog_uploaded
- platform: time
at: "00:45:00"
id: Short loop delay
condition: []
action:
- if:
- condition: trigger
id:
- Pause SCOM Loop
then:
- service: input_number.set_value
data:
value: -1
target:
entity_id: input_number.studer_python_script_loop_delay
- service: input_number.set_value
data:
value: 1
target:
entity_id: input_number.ha_scom_studer_scom_pause
enabled: false
- if:
- condition: trigger
id:
- Short loop delay
- datalog_uploaded
then:
- service: input_number.set_value
target:
entity_id: input_number.studer_python_script_loop_delay
data:
value: 3
- service: input_number.set_value
data:
value: 0
target:
entity_id: input_number.ha_scom_studer_scom_pause
enabled: false
mode: single
I followed the instructions and read functionality is working fine. Writing SMART_BOOST_LIMIT is not getting the value there. It does not give any error, but the value stays at 100%. Just to make sure, the value we are trying to modify is “Limitation of the power Boost”, parameter id 1607, right?
Yes, the smart boost level is parameter 1607. If you set it to a low value like 2.0 or 0.0, and the actual state of charge is above the SOC Level for backup, the power taken from the battery should be very small, and the power to feed the house should mostly come from the grid.
If you are using the default write, it writes to RAM instead of flash memory. Writes to RAM can’t be read from either Xcom or the RCC, you can only confirm by the behavior of the PV system. I write to RAM and not flash since the flash is only rated for about 1000 writes and I don’t want to wear it out.
Exactly, that was my only issue with writing! Writing was working, but I was not able to read the value back which made me believe writing failed.
Today I noticed the system was not able to boost at all although all related parameters were showing correct in the portal. Setting 1607 again to a differenct value in the portal, and then back to 100% solved the issue, so I was just left at 0% after my write tests.
Glad it worked out for you I’m really like that the author of the Xcom-LAN library gave us the option (default) to write to RAM to avoid wearing out the flash memory. Too bad Studer didn’t give us a way to read the RAM, but at least we can monitor system behavior.
Now we have in this thread a well described working method for implementing and testing the Studer local communication. Next step would be to put it into a working HA configuration. What would be the best method for starting the python script automatically when HA is restarted?
I haven’t tried it yet, but this sounds like it will work to both automatically start the script and to restart it should it stop: https://www.youtube.com/watch?v=nvx9jJhSELQ
There was a mention in the first post of using the Xcom-485i for local communications. Has anybody tried that?