Source for BYD Atto 3 battery state of charge

Look at this code. It works perfect for me. The BYD app runs on a badly scratched Samsung Galaxy S10. At least now I have a use for it.

import json
import subprocess
import time
import xml.etree.ElementTree as ET
import paho.mqtt.client as mqtt

# ===== MQTT Einstellungen =====
MQTT_BROKER = "192.168.1.40"
MQTT_PORT = 1883
MQTT_USER = "user"
MQTT_PASS = "password"
MQTT_TOPIC_BASE = "byd/app"

# ===== XML Mapping =====
XML_MAP = {
    "com.byd.bydautolink:id/h_km_tv": "range_km",
    "com.byd.bydautolink:id/tv_batter_percentage": "battery_soc",
    "com.byd.bydautolink:id/tv_charging_tip": "charging_tip",
    "com.byd.bydautolink:id/tv_banner": "warning",
    "com.byd.bydautolink:id/car_inner_temperature": "cabin_temp",
    "com.byd.bydautolink:id/tem_tv": "ac_set_temp",
    "com.byd.bydautolink:id/h_car_name_tv": "car_status",
    "com.byd.bydautolink:id/tv_update_time": "last_update",
    "com.byd.bydautolink:id/tire_tv_01": "tire_pressure_fl",
    "com.byd.bydautolink:id/tire_tv_02": "tire_pressure_rl",
    "com.byd.bydautolink:id/tire_tv_03": "tire_pressure_fr",
    "com.byd.bydautolink:id/tire_tv_04": "tire_pressure_rr",
    "com.byd.bydautolink:id/iv_value_front_hood": "front_hood",
    "com.byd.bydautolink:id/iv_value_door": "door",
    "com.byd.bydautolink:id/iv_value_window": "window",
    "com.byd.bydautolink:id/iv_value_sunroof": "sunroof",
    "com.byd.bydautolink:id/iv_value_trunk": "trunk",
    "com.byd.bydautolink:id/tv_value_mileage": "odometer",
    "com.byd.bydautolink:id/tv_recent_energy_value": "recent_energy_value",
    "com.byd.bydautolink:id/tv_total_energy_value": "total_energy_value",
    "com.byd.bydautolink:id/tv_tv_car_state_value": "car_state_value",
    "com.byd.bydautolink:id/tv_car_travel_state_title": "car_travel_state_title",
}

# ===== ADB Aktionen =====
ACTIONS = {
    "refresh": ("input tap 546 370", 2),
    "car_status": ("input tap 284 1131", 1),
    "car_position": ("input tap 854 1822", 1),
    "navigation": ("input tap 726 1983", 1),
    "map_back": ("input tap 92 242", 1),
    "finish": ("input tap 84 170", 1),
    "swipe_up": ("input swipe 500 1500 500 500 500", 2),
    "swipe_down": ("input swipe 500 500 500 1500 500", 2),
    "stop_google_maps": ("am force-stop com.google.android.apps.maps", 2),
}


# ===== MQTT Setup =====
def connect_mqtt():
    client = mqtt.Client()
    client.username_pw_set(MQTT_USER, MQTT_PASS)
    client.on_disconnect = lambda c, u, rc: print("⚠️ MQTT getrennt, versuche Reconnect...")
    client.connect(MQTT_BROKER, MQTT_PORT, 60)
    client.loop_start()
    return client


client = connect_mqtt()


# ===== ADB Hilfsfunktionen =====
def adb_command(cmd: str, wait: float = 1.0):
    subprocess.run(["adb", "shell"] + cmd.split(), check=False)
    if wait:
        time.sleep(wait)


def perform(action: str):
    if action not in ACTIONS:
        print(f"❌ Unbekannte Aktion: {action}")
        return
    cmd, wait = ACTIONS[action]
    adb_command(cmd, wait)


def get_ui_dump(filename="window_dump.xml"):
    remote_file = f"/sdcard/{filename}"
    adb_command(f"uiautomator dump {remote_file}", 1)
    subprocess.run(["adb", "pull", remote_file, filename],
                   check=False,
                   stdout=subprocess.DEVNULL,
                   stderr=subprocess.DEVNULL)
    return filename


# ===== XML Parsing =====
def extract_values(xml_file: str):
    tree = ET.parse(xml_file)
    root = tree.getroot()
    values = {}

    for node in root.iter("node"):
        rid = node.attrib.get("resource-id", "")
        text = node.attrib.get("text", "")
        bounds = node.attrib.get("bounds", "")
        cls = node.attrib.get("class", "")

        if not text:
            continue

        if rid in XML_MAP:
            values[XML_MAP[rid]] = text
        elif cls == "android.widget.TextView" and "[83,153]" in bounds:
            values["car_position"] = text
    return values


# ===== MQTT Publish Funktionen =====
def publish_car_position(raw_coords):
    try:
        lat, lon = map(str.strip, raw_coords.split(","))
        payload = {
            "latitude": float(lat),
            "longitude": float(lon),
            "gps_accuracy": 10,
            "status": "driving"
        }
        client.publish(f"{MQTT_TOPIC_BASE}/location", json.dumps(payload))
        print(f"MQTT publish -> {payload}")
    except Exception as e:
        print(f"Fehler beim Verarbeiten der Koordinaten: {e}")


def publish_values(values: dict):
    if not values:
        print("⚠️ Keine Werte gefunden.")
        return

    # Einzelne Sensoren
    for key, val in values.items():
        topic = f"{MQTT_TOPIC_BASE}/{key}"
        client.publish(topic, val)
        print(f"➡️ Publish {topic} -> {val}")


# ===== Hauptschleife =====
def main_loop():
    while True:
        all_values = {}

        # Reihenfolge der Dumps/Aktionen
        dumps_sequence = [
            ("window_dump_1.xml", []),
            ("window_dump_2.xml", ["swipe_up", "car_status"]),
            ("window_dump_3.xml", ["swipe_up"]),
            ("window_dump_4.xml", ["finish", "swipe_down", "car_position"]),
            ("window_dump_5.xml", ["navigation"]),
        ]

        for filename, actions in dumps_sequence:
            for action in actions:
                perform(action)
            get_ui_dump(filename)
            all_values.update(extract_values(filename))

        # Position separat publishen
        if "car_position" in all_values:
            publish_car_position(all_values["car_position"])

        # Alle Sensorwerte publishen
        publish_values(all_values)

        # Google Maps stoppen & zurück
        perform("stop_google_maps")
        perform("map_back")

        time.sleep(60)


if __name__ == "__main__":
    try:
        main_loop()
    except KeyboardInterrupt:
        print("🚪 Script beendet.")
        client.loop_stop()
        client.disconnect()
1 Like