Sensors for Telekom Speedport Smart 4

Okay, I finally found a working solution.

1. Install https://github.com/AlexxIT/PythonScriptsPro (see also https://smarterkram.de/2524/) and restart HomeAssistant.

2. Add “sensor include” and requirements to configuration.yaml:

...
sensor: !include_dir_merge_list includes/sensors/
...
# Python Scripts Pro (HACS)
python_script:
  requirements:
  - influxdb==5.3.1
  - lxml==4.9.2
  - pycryptodome==3.17
  - requests==2.28.2

3. Create folders /config/www/json/ and /config/python_scripts/ if necessary.

4. Copy the dsl-monitoring-script (thanks again @aaronk6!) and save it to /config/python_scripts/ as “smart4-vdsl.py”:

#!/usr/bin/env python3

import json
import sys
import os
import argparse
import requests
import datetime
from time import sleep
from influxdb import line_protocol
from Crypto.Cipher import AES

FORMAT_JSON = 'json'
FORMAT_INFLUXDB = 'influxdb'
FORMAT_RAW = 'raw'

OUTPUT_FORMATS = [ FORMAT_JSON, FORMAT_INFLUXDB, FORMAT_RAW ]

DEFAULT_HOSTNAME = '169.254.2.1'
DEFAULT_PORT = 80
DEFAULT_KEY = 'cdc0cac1280b516e674f0057e4929bca84447cca8425007e33a88a5cf598a190'
DEFAULT_FORMAT = FORMAT_JSON
STATUS_ROUTE = '/data/Status.json'

HTTP_TIMEOUT = 5
MAX_RETRIES = 3
RETRY_WAIT = 3

MEASUREMENT_NAME = "smart4_vdsl_status"
REPORT_FIELDS = [ 'dsl_link_status', 'dsl_downstream', 'dsl_upstream', 'firmware_version' ]
# Fields are strings by default. Cast these to integers:
INTEGER_FIELDS = [ 'dsl_downstream', 'dsl_upstream' ]

def http_get_encrypted_json(encryptionKey, url, params={}):
    res = None

    for i in range(MAX_RETRIES+1):
        try:
            headers = { 'Accept': 'application/json' }
            response = requests.get(url, params=params, headers=headers, timeout=HTTP_TIMEOUT)

            try:
                res = response.json()
            except ValueError:
                try:
                    decrypted = decrypt_response(encryptionKey, response.text)
                    res = json.loads(decrypted)
                except ValueError:
                    eprint("Decryption or JSON parsing failed")
                    continue

        except Exception as e:
            eprint("Error: %s" % e)
            if i < MAX_RETRIES:
                eprint("Will do %i. retry in %i sec(s)..." % (i+1, RETRY_WAIT ))
                sleep(RETRY_WAIT)
            else:
                eprint("Maximum number of retries exceeded, giving up")
            continue
        break

    return res

def decrypt_response(keyHex, data):
    # thanks to https://stackoverflow.com/a/69054338/1387396

    key = bytes.fromhex(keyHex)
    nonce = bytes.fromhex(keyHex)[:8]

    ciphertextTag = bytes.fromhex(data)
    ciphertext = ciphertextTag[:-16]
    tag = ciphertextTag[-16:]

    cipher = AES.new(key, AES.MODE_CCM, nonce)
    decrypted = cipher.decrypt_and_verify(ciphertext, tag)
    return decrypted.decode('utf-8')

def get_field(report, name):
    field = next((x for x in report if x['varid'] == name), None)
    if name in INTEGER_FIELDS:
        return int(field['varvalue'])
    return field['varvalue']

def get_vdsl_status(hostname, port, key, raw=False):
    url = "http://%s:%i%s" % ( hostname, port, STATUS_ROUTE )
    report = http_get_encrypted_json(key, url)

    if raw: return report

    status = {}
    for field in REPORT_FIELDS:
        status[field] = get_field(report, field)

    return status

def get_data_points(vdsl_status):
    data = { "points": [] }
    time = get_current_utc_time()

    data["points"].append({
        "measurement": MEASUREMENT_NAME,
        "fields": vdsl_status,
        "time": time
    })
    return data

def get_formatted_output(status, format):
    if (format == FORMAT_INFLUXDB):
        return line_protocol.make_lines(get_data_points(status))
    elif (format in [ FORMAT_JSON, FORMAT_RAW ]):
        return json.dumps(status)
    else:
        eprint("Unknown format %s" % format)
        return ''

def get_current_utc_time():
    return datetime.datetime.utcnow().replace(microsecond=0).isoformat()

def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--hostname", default=DEFAULT_HOSTNAME,
                        help="Specify hostname or IP address (defaults to %s)" % DEFAULT_HOSTNAME)
    parser.add_argument("--port", default=DEFAULT_PORT, type=int,
                        help="Specify port (defaults to %i" % DEFAULT_PORT)
    parser.add_argument("--key", default=DEFAULT_KEY,
                        help="Specify key for AES decryption (defaults to %s)" % DEFAULT_KEY)
    parser.add_argument("--format", default=DEFAULT_FORMAT,
                        help="Specify the output format (one of %s; defaults to %s)" % ( ', '.join(OUTPUT_FORMATS), DEFAULT_FORMAT ) )

    params = parser.parse_args()

    status = get_vdsl_status(params.hostname, params.port, params.key, params.format == FORMAT_RAW)
    output = get_formatted_output(status, params.format)

    if(output == ''): exit(1)

    print(output)

if __name__ == '__main__':
    main()

5. Add the following code to you secrets.yaml:

...
# Speedport Smart 4-JSON-Adresse
Speedport_Smart_4_json_link: https://homeassistant.<your_domain>.<your_tld>/local/json/Speedport_Smart_4_Status.json
...

6. Save the following code to /config/includes/sensors/Speedport_Smart_4.yaml:

# Python-Aufruf

  - platform: command_line
    name: Speedport Smart 4 - Python-Script-Aufruf
    command: "python3 /config/python_scripts/smart4-vdsl.py --hostname 192.168.1.1 --format raw > /config/www/json/Speedport_Smart_4_Status.json"
    scan_interval: 45


# Telekom Speedport Smart 4-Sensoren

  - platform: rest
    name: Speedport Smart 4 - Router-Name
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'device_name') | map(attribute='varvalue') | first | default }}"

  - platform: rest
    name: Speedport Smart 4 - Router-Status
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'router_state') | map(attribute='varvalue') | first | default }}"

  - platform: rest
    name: Speedport Smart 4 - DSL-Link-Status
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'dsl_link_status') | map(attribute='varvalue') | first | default }}"

  - platform: rest
    name: Speedport Smart 4 - Online-Status
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'onlinestatus') | map(attribute='varvalue') | first | default }}"

  - platform: rest
    name: Speedport Smart 4 - Tage online
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'days_online') | map(attribute='varvalue') | first | default }}"

  - platform: rest
    name: Speedport Smart 4 - Zeit online
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'time_online') | map(attribute='varvalue') | first | default }}"

  - platform: rest
    name: Speedport Smart 4 - Internet online seit
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'inet_uptime') | map(attribute='varvalue') | first | default }}"

  - platform: rest
    name: Speedport Smart 4 - Dual-Stack-Modus
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: >
       {% if (value_json | selectattr('varid', 'eq', 'dualstack') | map(attribute='varvalue') | first | default) == "0" %}
         nein
       {% elif (value_json | selectattr('varid', 'eq', 'dualstack') | map(attribute='varvalue') | first | default) == "1" %}
         ja
       {% else %}
         "{{ value_json | selectattr('varid', 'eq', 'dualstack') | map(attribute='varvalue') | first | default }}"
       {% endif %}

  - platform: rest
    name: Speedport Smart 4 - DSL-Tunnel
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: >
       {% if (value_json | selectattr('varid', 'eq', 'dsl_tunnel') | map(attribute='varvalue') | first | default) == "0" %}
         offline
       {% elif (value_json | selectattr('varid', 'eq', 'dsl_tunnel') | map(attribute='varvalue') | first | default) == "1" %}
         online
       {% else %}
         "{{ value_json | selectattr('varid', 'eq', 'dsl_tunnel') | map(attribute='varvalue') | first | default }}"
       {% endif %}

  - platform: rest
    name: Speedport Smart 4 - LTE-Tunnel
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: >
       {% if (value_json | selectattr('varid', 'eq', 'lte_tunnel') | map(attribute='varvalue') | first | default) == "0" %}
         offline
       {% elif (value_json | selectattr('varid', 'eq', 'lte_tunnel') | map(attribute='varvalue') | first | default) == "1" %}
         online
       {% else %}
         "{{ value_json | selectattr('varid', 'eq', 'lte_tunnel') | map(attribute='varvalue') | first | default }}"
       {% endif %}

  - platform: rest
    name: Speedport Smart 4 - Hybrid-Tunnel
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: >
       {% if (value_json | selectattr('varid', 'eq', 'hybrid_tunnel') | map(attribute='varvalue') | first | default) == "0" %}
         offline
       {% elif (value_json | selectattr('varid', 'eq', 'hybrid_tunnel') | map(attribute='varvalue') | first | default) == "1" %}
         online
       {% else %}
         "{{ value_json | selectattr('varid', 'eq', 'hybrid_tunnel') | map(attribute='varvalue') | first | default }}"
       {% endif %}

  - platform: rest
    name: Speedport Smart 4 - Domain-Name
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'domain_name') | map(attribute='varvalue') | first | default }}"

  - platform: rest
    name: Speedport Smart 4 - Status
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'status') | map(attribute='varvalue') | first | default }}"

  - platform: rest
    name: Speedport Smart 4 - Datum + Zeit
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'datetime') | map(attribute='varvalue') | first | default }}"

  - platform: rest
    name: Speedport Smart 4 - Firmware-Version
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'firmware_version') | map(attribute='varvalue') | first | default }}"

  - platform: rest
    name: Speedport Smart 4 - Serien-Nummer
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'serial_number') | map(attribute='varvalue') | first | default }}"

  - platform: rest
    name: Speedport Smart 4 - DSL-Downstream
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'dsl_downstream') | map(attribute='varvalue') | first | default | float/1000000 }}"
    unit_of_measurement: "MBit/s"

  - platform: rest
    name: Speedport Smart 4 - DSL-Upstream
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'dsl_upstream') | map(attribute='varvalue') | first | default | float/1000000 }}"
    unit_of_measurement: "MBit/s"

  - platform: rest
    name: Speedport Smart 4 - DSL-PoP
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'dsl_pop') | map(attribute='varvalue') | first | default }}"

  - platform: rest
    name: Speedport Smart 4 - WLAN-SSID (2,4 GHz)
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'wlan_ssid') | map(attribute='varvalue') | first | default }}"

  - platform: rest
    name: Speedport Smart 4 - WLAN-SSID (5 GHz)
    resource: !secret Speedport_Smart_4_json_link
    scan_interval: 60
    value_template: "{{ value_json | selectattr('varid', 'eq', 'wlan_5ghz_ssid') | map(attribute='varvalue') | first | default }}"

7. Restart HomeAssistant - your sensors should now be ready.

There are even more sensors available than configured here - I will configure them as soon as my hybrid 5G-DSL-internet access has been installed in May.

Best regards,
David

EDIT 17.04.:
Changed the way the JSON values are read out in order to make sensor output more reliable.

1 Like