I have a heat meter at home and can read it and display the data in Home Assistant.
Things required:
- USB reading head (e.g. Hichi USB reading head)
- Python Script Pro integration
- Connect the USB reading head to Home Assistant and install the Python Script Pro integration. Then upload the script below to Home Assistant, e.g. with the File Editor add-on.
- Create the sensors (they are MQTT sensors, see the configuration below).
- Call the script from an automation; the data should then be transferred successfully.
How the script works:
- A connection is established to the USB reading head.
- The data is read from the meter and converted to JSON.
- The JSON data is transferred to Home Assistant via MQTT.
Create the sensors:
# Heat meter (Wärmemengenzähler) sensors
# MQTT sensors for the heat meter. Every sensor listens on the same topic
# ("mbus/data") and picks the record whose "type" matches out of
# value_json.body.records. All sensors share one device entry.
mqtt:
  sensor:
    # Total energy
    - name: "Energieverbrauch"
      state_topic: "mbus/data"
      value_template: >-
        {% for record in value_json.body.records %}
        {% if record.type == "VIFUnit.ENERGY_WH" %}
        {{ record.value }}
        {% endif %}
        {% endfor %}
      unit_of_measurement: "Wh"
      unique_id: wmz_energieverbrauch
      device_class: energy
      state_class: total_increasing
      device:
        name: "Wärmemengenzähler"
        identifiers:
          - "22835426"
        manufacturer: "Itron"
        model: "CF Echo 2"
    # Volume (record type VIFUnit.VOLUME)
    - name: "Volumenstrom"
      state_topic: "mbus/data"
      value_template: >-
        {% for record in value_json.body.records %}
        {% if record.type == "VIFUnit.VOLUME" %}
        {{ record.value }}
        {% endif %}
        {% endfor %}
      unit_of_measurement: "m³"
      unique_id: wmz_volumen
      device:
        name: "Wärmemengenzähler"
        identifiers:
          - "22835426"
        manufacturer: "Itron"
        model: "CF Echo 2"
    # Current power
    - name: "Power"
      state_topic: "mbus/data"
      value_template: >-
        {% for record in value_json.body.records %}
        {% if record.type == "VIFUnit.POWER_W" %}
        {{ record.value }}
        {% endif %}
        {% endfor %}
      unit_of_measurement: "W"
      unique_id: wmz_Power
      device:
        name: "Wärmemengenzähler"
        identifiers:
          - "22835426"
        manufacturer: "Itron"
        model: "CF Echo 2"
    # Flow (supply) temperature
    - name: "Vorlauf"
      state_topic: "mbus/data"
      value_template: >-
        {% for record in value_json.body.records %}
        {% if record.type == "VIFUnit.FLOW_TEMPERATURE" %}
        {{ record.value }}
        {% endif %}
        {% endfor %}
      unit_of_measurement: "°C"
      unique_id: wmz_vorlauf
      device:
        name: "Wärmemengenzähler"
        identifiers:
          - "22835426"
        manufacturer: "Itron"
        model: "CF Echo 2"
    # Return temperature
    - name: "Rücklauf"
      state_topic: "mbus/data"
      value_template: >-
        {% for record in value_json.body.records %}
        {% if record.type == "VIFUnit.RETURN_TEMPERATURE" %}
        {{ record.value }}
        {% endif %}
        {% endfor %}
      unit_of_measurement: "°C"
      unique_id: wmz_rücklauf
      device:
        name: "Wärmemengenzähler"
        identifiers:
          - "22835426"
        manufacturer: "Itron"
        model: "CF Echo 2"
    # Temperature difference between flow and return
    - name: "Differenz"
      state_topic: "mbus/data"
      value_template: >-
        {% for record in value_json.body.records %}
        {% if record.type == "VIFUnit.TEMPERATURE_DIFFERENCE" %}
        {{ record.value }}
        {% endif %}
        {% endfor %}
      unit_of_measurement: "K"
      unique_id: wmz_differenz
      device:
        name: "Wärmemengenzähler"
        identifiers:
          - "22835426"
        manufacturer: "Itron"
        model: "CF Echo 2"
Script for reading out the data:
import serial
import time
import binascii
import logging
import meterbus
import paho.mqtt.client as mqtt
import json
# Configure logging: INFO level, timestamped "<time> - <level> - <message>" lines.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by every function and by the main script below.
logger = logging.getLogger(__name__)
# MQTT configuration (adjust to your setup!)
MQTT_BROKER = "xxx" # e.g. 192.168.178.20
MQTT_PORT = 1883
# Credentials are only applied when both user and password are non-empty.
MQTT_USER = "xxx"
MQTT_PASSWORD = "xxx"
# Topic the JSON payload is published to; must match the sensors' state_topic.
MQTT_TOPIC = "mbus/data"
# MQTT Callbacks
def on_connect(client, userdata, flags, rc):
    """Log the outcome of an MQTT connection attempt.

    Installed as the client's ``on_connect`` callback. A result code ``rc`` of 0 is
    logged as success; any other value is logged as an error together with the code.
    """
    if rc != 0:
        logger.error(f"Verbindung mit MQTT Broker fehlgeschlagen, Fehlercode: {rc}")
        return
    logger.info("Erfolgreich mit MQTT Broker verbunden!")
def on_publish(client, userdata, mid):
    """Log that a message was handed off to the broker, including its message id."""
    confirmation = f"Daten erfolgreich an MQTT gesendet (MID: {mid})"
    logger.info(confirmation)
def on_disconnect(client, userdata, rc):
    """Log a disconnect and try to reconnect after an unexpected one.

    Installed as the client's ``on_disconnect`` callback. ``rc == 0`` means the
    disconnect was requested and is only logged. Any other code is logged as a
    warning and followed by a single ``client.reconnect()`` attempt.
    """
    if rc == 0:
        logger.info("Erfolgreich vom MQTT Broker getrennt.")
        return

    logger.warning(f"Verbindung zum MQTT Broker getrennt, Fehlercode: {rc}. Versuche erneut zu verbinden...")
    try:
        client.reconnect()  # automatic reconnect
    except OSError as e:
        # reconnect() opens a new socket and raises when the broker is unreachable.
        # Letting that escape from a callback would abort the network loop, so it
        # is logged instead.
        logger.error(f"Wiederverbindung zum MQTT Broker fehlgeschlagen: {e}")
# === get_data ==================================================================================
def get_data(ser=None, test_data=None):
    """Return one raw response from the meter, or the supplied test data.

    When ``test_data`` is truthy it is returned unchanged and ``ser`` is not touched.
    Otherwise 528 ``0x55`` bytes are written to ``ser``, the function pauses for
    350 ms, switches the port to even parity, writes a fixed 5-byte request and
    reads up to 620 bytes.

    Returns the bytes read, or ``None`` when nothing was received or a
    ``serial.SerialException`` occurred (both cases are logged).
    """
    if test_data:
        logger.info("Verwende Testdaten...")
        return test_data

    try:
        # NOTE(review): presumably the wake-up preamble for the optical interface,
        # sent with the port's initial parity before switching to even parity.
        ser.write(b'\x55' * 528)
        time.sleep(0.350)
        ser.parity = serial.PARITY_EVEN
        logger.info("Daten lesen")
        # NOTE(review): looks like an M-Bus short frame requesting data from
        # address 0xFE - confirm against the meter documentation.
        request = b'\x10\x5B\xFE\x59\x16'
        ser.write(request)
        response = ser.read(620)
    except serial.SerialException as e:
        logger.error(f"Fehler bei der seriellen Kommunikation in get_data: {e}")
        return None

    if not response:
        logger.warning("Keine Daten vom seriellen Port empfangen.")
        return None

    hex_dump = binascii.hexlify(response)
    logger.info(f"gelesene Daten (hex): {hex_dump.decode()}")
    return response
# === daten_filtern ==============================================================================
def daten_filtern(daten):
    """Extract the first complete M-Bus long frame from the raw serial data.

    A long frame has the layout ``68 L L 68 <L bytes> CS 16``, i.e. ``L + 6`` bytes
    in total. The frame end is derived from the length field instead of searching
    for the next ``0x16``: that value may also occur inside the payload, which
    would cut the telegram short.

    Args:
        daten: Raw bytes read from the meter (may contain leading echo/garbage
            bytes), or ``None``.

    Returns:
        The bytes of the first frame whose header and stop byte are consistent,
        or ``None`` when ``daten`` is ``None``, contains no start byte or contains
        no complete frame.
    """
    if daten is None:
        return None

    start_byte = b'\x68'
    end_value = 0x16

    start_index = daten.find(start_byte)
    if start_index == -1:
        logger.warning("Kein Startbyte mehr gefunden.")
        return None

    # Try every 0x68 as a candidate frame start; a stray 0x68 in front of the real
    # telegram is skipped because its header does not validate.
    while start_index != -1:
        header = daten[start_index:start_index + 4]
        if len(header) == 4 and header[3] == 0x68 and header[1] == header[2]:
            frame_end = start_index + header[1] + 6
            if frame_end <= len(daten) and daten[frame_end - 1] == end_value:
                gefilterte_daten = daten[start_index:frame_end]
                logger.info(f"Gefilterte Daten (hex): {binascii.hexlify(gefilterte_daten).decode()}")
                return gefilterte_daten
        start_index = daten.find(start_byte, start_index + 1)

    logger.warning("Kein Endbyte nach dem Startbyte gefunden.")
    return None
def transform_json(json_string):
    """Round the float record values of an M-Bus JSON document to two decimals.

    The document is expected to be a JSON object whose ``"body"`` object holds a
    ``"records"`` list (the sensor templates read ``body.records``). Every record
    whose ``"value"`` is a float is rounded to two decimal places. The entries of
    ``"body"`` are additionally merged into the top level, while ``"body"`` itself
    stays in place.

    Args:
        json_string: JSON text to transform.

    Returns:
        The transformed document as a JSON string indented by four spaces, or
        ``None`` when ``json_string`` cannot be parsed or is not a JSON object.
    """
    try:
        data = json.loads(json_string)
    except (json.JSONDecodeError, TypeError) as e:
        logger.error(f"JSON konnte nicht geparst werden: {e}")
        return None

    if not isinstance(data, dict):
        return None

    body_content = data.get("body")
    if not isinstance(body_content, dict):
        body_content = {}

    # Round floating point numbers in place, so data["body"] is updated as well.
    for record in body_content.get("records") or []:
        if isinstance(record, dict) and isinstance(record.get("value"), float):
            record["value"] = round(record["value"], 2)

    data.update(body_content)
    return json.dumps(data, indent=4)
# === main ========================================================================================
def _publish_to_mqtt(payload):
    """Publish ``payload`` once to ``MQTT_TOPIC`` and disconnect again.

    Connection and publish errors are logged and not raised. Once the network
    loop has been started it is always stopped and the client disconnected, even
    when publishing fails.
    """
    # Create and configure the MQTT client
    client = mqtt.Client()
    if MQTT_USER and MQTT_PASSWORD:
        client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
    # Register callbacks
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    try:
        client.connect(MQTT_BROKER, MQTT_PORT, 60)
        client.loop_start()
        try:
            client.publish(MQTT_TOPIC, payload)
            # Give the background loop time to send the message.
            time.sleep(1)
        finally:
            client.loop_stop()
            client.disconnect()
        logger.info("MQTT Operationen abgeschlossen.")
    except Exception as e:
        logger.error(f"Fehler bei der MQTT Verbindung oder beim Senden: {e}")


# The script runs top to bottom: read the meter, extract and decode the frame,
# publish the JSON via MQTT. It is deliberately not wrapped in a __main__ guard.
logger.info('Starte Hauptprogramm')
try:
    # Define test data
    #test_daten_hex = "684d4d680800722654832277040904360000000c78265483220406493500000c14490508000b2d0000000b3b0000000a5a18060a5e89020b61883200046d0d0c2c310227c80309fd0e2209fd0f470f00008d16"
    #test_daten = bytes.fromhex(test_daten_hex)
    # Either test with the test data...
    #roh_daten = get_data(test_data=test_daten)
    # ...or read from the serial interface (comment out for tests!)
    # The context manager closes the port even if get_data raises.
    with serial.Serial(
        "/dev/ttyUSB1",
        baudrate=2400,
        bytesize=8,
        parity=serial.PARITY_NONE,
        stopbits=1,
        timeout=10,
    ) as ser:
        roh_daten = get_data(ser)

    if roh_daten:
        gefilterte_daten = daten_filtern(roh_daten)
        if gefilterte_daten:
            try:
                frame = meterbus.load(gefilterte_daten)
                json_data = frame.to_JSON()
                logger.info(f"M-Bus Daten (JSON): {json_data}")
                # Transform the JSON; fall back to the untransformed data on failure
                transformed_json = transform_json(json_data)
                if transformed_json:
                    logger.info(f"Transformierte M-Bus Daten (JSON): {transformed_json}")
                    json_data_to_mqtt = transformed_json
                else:
                    logger.error("Fehler bei der JSON Transformation. Sende untransformierte Daten.")
                    json_data_to_mqtt = json_data
                _publish_to_mqtt(json_data_to_mqtt)
            except meterbus.exceptions.MBusFrameDecodeError as e:
                logger.error(f"Fehler beim Decodieren des M-Bus Frames: {e}")
            except Exception as e:
                logger.error(f"Unerwarteter Fehler beim Parsen des M-Bus Frames: {e}")
        else:
            logger.info("Keine gültigen Daten zum Filtern gefunden.")
    else:
        logger.info("Keine Daten empfangen (weder seriell noch Testdaten).")
except serial.SerialException as e:
    logger.error(f"Fehler bei der seriellen Kommunikation im Hauptprogramm: {e}")
except Exception as e:
    logger.error(f"Unerwarteter Fehler im Hauptprogramm: {e}")
logger.info('Programm beendet')
Calling the script:
# Run the reader script; "file" is a parameter of the action and belongs under "data".
action: python_script.exec
data:
  file: wmz_mbus/wmz_mbus.py