Lutron Grafik Eye telnet based integration

I have an older Lutron GRX-CI lighting system that I would like to get integrated with HA. It is simply a telnet session where you can send commands to both set and read lighting zones and scenes. I am trying to find an existing component that might make the work a little easier.

1 Like

The Denon mediaplayer platform is using telnet.

There are more: https://github.com/home-assistant/home-assistant/search?utf8=✓&q=telnetlib

1 Like

Thanks fabaff. Will check them out.

@drizzay Andre, just came across this old thread of yours. While HA has a telnet integration, it doesn’t handle login or commands other than on/off. Consequently, for a hospitality client needing to control a large legacy Lutron GRAFIK Eye 4000 system, we used an AppDaemon Home Assistant plug-in to run Python code that responds to changes to an HA drop-down helper listing the Lutron scenes, issues the appropriate telnet commands, and reports the status.

There are a lot of small hotels & large restaurants on Lutron GRAFIK Eye, and they may not think that it can be integrated with anything current like Home Assistant, but it can.

I actually cobbled something together that has worked for years. I used shell commands. I recently asked ChatGPT to write code for Home Assistant for the Grafik Eye and it did it better, but I haven’t implemented it yet.
lol.

import json
import telnetlib
import time
import paho.mqtt.client as mqtt
import logging
import sys
import warnings
import re

# --- CALIBRATION SETTING ---
# Seconds that scale how long a zone raise/lower command is left running:
# the zone handler sleeps (|delta %| / 100) * FADE_TIME between start and stop.
FADE_TIME = 5.0
# ---------------------------

# Hide warnings whose message mentions telnetlib.
warnings.filterwarnings("ignore", message=".*telnetlib.*")
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

logger.info("πŸš€ Script starting up...")

# Load the options file; any failure here is fatal (exit status 1).
try:
    with open('/data/options.json') as options_file:
        config = json.load(options_file)
    logger.info("βœ… Configuration loaded")
except Exception as e:
    logger.error(f"❌ Options error: {e}")
    sys.exit(1)

# Top-level configuration sections; each falls back to an empty container.
GE_CONF = config.get('grafik_eye', {})
MQTT_CONF = config.get('mqtt', {})
UNITS = config.get('units', [])

# Shared runtime state.
tn = None               # active telnet session, or None
authenticated = False   # True once the telnet login has been sent
zone_levels = {}        # {unit id: {zone id: last published percentage}}

# Scene id ("0".."16") -> single protocol character ("0".."9", "A".."G"),
# plus the reverse table used when decoding status replies.
SCENE_TO_CHAR = dict(zip((str(n) for n in range(17)), "0123456789ABCDEFG"))
STATUS_CHAR_TO_SCENE = {char: scene for scene, char in SCENE_TO_CHAR.items()}

def lutron_hex_to_pct(hex_str, zone_index):
    """Convert a hexadecimal zone level into a rounded percentage.

    The value is scaled so that 0x7F (127) corresponds to 100.

    Args:
        hex_str: Level as a hexadecimal string, e.g. ``"7F"``.
        zone_index: Zero-based position of the value within the reply. A
            value numerically equal to its own position is discarded.

    Returns:
        The level as an integer percentage, or ``None`` when the value is
        discarded or cannot be parsed as hexadecimal.
    """
    # Only the parse can legitimately fail; catching just these two keeps
    # KeyboardInterrupt/SystemExit from being swallowed.
    try:
        val = int(hex_str, 16)
    except (TypeError, ValueError):
        return None
    # NOTE(review): a reading equal to its own index is treated as "no data";
    # presumably a device quirk - confirm against the protocol documentation.
    if val == zone_index:
        return None
    return round((val / 127) * 100)

def publish_discovery():
    """Announce every configured unit to Home Assistant via MQTT discovery.

    For each entry in ``UNITS`` this publishes, with ``retain=True``:

    * one ``select`` config listing the unit's scenes (skipped when the unit
      has no scenes configured), and
    * six ``number`` configs (zones 1-6, range 0-100 %), named from the
      unit's ``zones`` list where available and ``Zone <n>`` otherwise.
    """
    logger.info("πŸ“‘ [DISCOVERY] Starting publication...")
    for unit in UNITS:
        unit_id = unit['id']
        unit_name = unit.get('name', f"Grafik Eye {unit_id}")
        unit_topic = f"{MQTT_CONF['base_topic']}/unit_{unit_id}"
        # Shared device record so all entities of a unit group together.
        device = {
            "identifiers": [f"ge_unit_{unit_id}"],
            "name": unit_name,
            "manufacturer": "Lutron",
            "model": "Grafik Eye 3000",
            "sw_version": "v5.7",
        }

        # Scene Select Discovery
        scenes = unit.get('scenes', [])
        if scenes:
            select_config = {
                "name": f"{unit_name} Scene",
                "unique_id": f"ge_u{unit_id}_s",
                "command_topic": f"{unit_topic}/scene/set",
                "state_topic": f"{unit_topic}/scene/state",
                # A scene without a name is listed under its id.
                "options": [scene.get('name', str(scene['id'])) for scene in scenes],
                "device": device,
            }
            client.publish(
                f"homeassistant/select/ge_u{unit_id}/config",
                json.dumps(select_config),
                retain=True,
            )

        # Zone Sliders Discovery
        zone_names = unit.get('zones', [])
        for zone_id in range(1, 7):
            if zone_id <= len(zone_names):
                zone_name = zone_names[zone_id - 1]
            else:
                zone_name = f"Zone {zone_id}"
            number_config = {
                "name": f"{zone_name}",
                "unique_id": f"ge_u{unit_id}_z{zone_id}",
                "command_topic": f"{unit_topic}/zone_{zone_id}/set",
                "state_topic": f"{unit_topic}/zone_{zone_id}/state",
                "min": 0,
                "max": 100,
                "step": 1,
                "unit_of_measurement": "%",
                "device": device,
            }
            client.publish(
                f"homeassistant/number/ge_u{unit_id}_z{zone_id}/config",
                json.dumps(number_config),
                retain=True,
            )
    logger.info("πŸ“‘ [DISCOVERY] Complete")

def update_status():
    """Poll the Grafik Eye over telnet and publish scene and zone state.

    Sends ``:G`` and publishes the current scene of every configured unit
    (by scene name when one is configured, otherwise by scene id). Then
    sends ``:rzi <unit>`` for each unit and publishes each of the first six
    zone levels whose percentage differs from the last published value.

    Does nothing unless a telnet session is open and authenticated. When
    the session fails with a connection-level error, the socket is closed
    and ``authenticated`` is cleared so the main loop reconnects; any other
    error is logged and the poll is abandoned until the next cycle.

    NOTE(review): this is called from both the main loop and the MQTT
    callback, with no locking around the shared telnet session.
    """
    global tn, authenticated
    if not authenticated or tn is None:
        return

    try:
        base_topic = MQTT_CONF['base_topic']

        # 1. STATUS SYNC USING :G (Handles Scene State)
        tn.read_very_eager()  # discard anything left over from earlier traffic
        tn.write(b":G\r")
        time.sleep(0.6)
        g_resp = tn.read_very_eager().decode('ascii', errors='ignore').strip()

        # One status character per unit position. The class must include
        # 'G' (scene 16 in SCENE_TO_CHAR) and 'M', otherwise the match is
        # cut short at the first such character and later units are lost.
        g_match = re.search(r'~:ss\s+([0-9A-GM]+)', g_resp, re.IGNORECASE)
        if g_match:
            for i, char in enumerate(g_match.group(1).upper()):
                # NOTE(review): 'M' is taken to mean "no unit at this
                # address"; skip it rather than stop, since later positions
                # may still hold units - confirm against the protocol doc.
                if char == 'M':
                    continue
                u_id = i + 1
                unit_cfg = next((u for u in UNITS if u['id'] == u_id), None)
                if not unit_cfg:
                    continue
                s_id = STATUS_CHAR_TO_SCENE.get(char, "0")
                # Find the Name for this Scene ID to update the HA dropdown.
                # 'scenes' and 'name' are optional, exactly as in discovery,
                # so a unit without them must not abort the whole poll.
                name_match = next(
                    (s for s in unit_cfg.get('scenes', []) if str(s['id']) == s_id),
                    None,
                )
                state_val = name_match.get('name', s_id) if name_match else s_id
                client.publish(f"{base_topic}/unit_{u_id}/scene/state", state_val, retain=True)

        # 2. ZONE LEVELS (:rzi)
        for unit in UNITS:
            u_id = unit['id']
            tn.write(f":rzi {u_id}\r".encode('ascii'))
            time.sleep(0.6)
            z_resp = tn.read_very_eager().decode('ascii', errors='ignore').strip()
            z_match = re.search(fr'~:zi\s+{u_id}\s+([0-9A-F\s]+)', z_resp, re.IGNORECASE)
            if not z_match:
                continue
            levels = zone_levels.setdefault(u_id, {})
            # Only the first six values are zones exposed to Home Assistant.
            for i, hex_val in enumerate(z_match.group(1).split()[:6]):
                z_id = i + 1
                pct = lutron_hex_to_pct(hex_val, i)
                # Publish only on change to avoid flooding the broker.
                if pct is not None and levels.get(z_id) != pct:
                    levels[z_id] = pct
                    client.publish(f"{base_topic}/unit_{u_id}/zone_{z_id}/state", str(pct), retain=True)
    except (EOFError, OSError) as e:
        # The telnet session is gone: drop it so the main loop reconnects
        # instead of logging the same failure on every poll.
        logger.error(f"❌ [POLL] Error: {e}")
        logger.warning("[TELNET] Connection lost; will reconnect")
        authenticated = False
        dead_session, tn = tn, None
        if dead_session is not None:
            try:
                dead_session.close()
            except OSError:
                pass
    except Exception as e:
        logger.error(f"❌ [POLL] Error: {e}")

def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: publish discovery and subscribe to unit topics.

    Args:
        client: The MQTT client that connected; used for the subscriptions.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 is treated as success.
    """
    if rc != 0:
        # Previously ignored silently, which left no trace of a refused
        # connection (e.g. bad credentials) in the log.
        logger.error(f"❌ [MQTT] Connection refused (rc={rc})")
        return
    logger.info("βœ… [MQTT] Connected")
    publish_discovery()
    for unit in UNITS:
        client.subscribe(f"{MQTT_CONF['base_topic']}/unit_{unit['id']}/#")

def on_message(client, userdata, msg):
    """MQTT message callback: translate Home Assistant commands to telnet.

    Two kinds of topic are handled; everything else is ignored:

    * ``.../unit_<u>/zone_<z>/set`` - payload is the target percentage. The
      difference from the cached level decides which start/stop command
      pair is sent; the start command is left running for a time
      proportional to the difference (scaled by ``FADE_TIME``). Differences
      below 3 are ignored.
    * ``.../unit_<u>/scene/set`` - payload is a scene name as advertised in
      discovery (the scene's ``name``, or its id when it has no name).

    After either command the status is polled again so Home Assistant sees
    the result. Messages are dropped while the telnet session is not
    available. All errors are logged, never raised.

    NOTE(review): the sleeps below run inside the MQTT callback and block
    the client's network thread for their duration.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Incoming message; ``topic`` and ``payload`` are read.
    """
    try:
        topic_parts = msg.topic.split('/')
        payload = msg.payload.decode().strip()
        if not authenticated or tn is None:
            return

        # Handle Slider (Zone) Control
        if 'zone_' in msg.topic and msg.topic.endswith('/set'):
            z_id = int(topic_parts[-2].split('_')[-1])
            u_id = int(topic_parts[-3].split('_')[-1])
            target_val = int(float(payload))
            current_val = zone_levels.get(u_id, {}).get(z_id, 0)
            diff = target_val - current_val
            if abs(diff) < 3:
                return
            if diff > 0:
                start, stop = f":B{u_id}{z_id}\r", ":C\r"
            else:
                start, stop = f":D{u_id}{z_id}\r", ":E\r"
            duration = (abs(diff) / 100.0) * FADE_TIME
            tn.write(start.encode('ascii'))
            time.sleep(duration)
            tn.write(stop.encode('ascii'))
            time.sleep(0.5)
            update_status()

        # Handle Named Scene Selection
        elif msg.topic.endswith('/scene/set'):
            u_id = int(topic_parts[-3].split('_')[-1])
            unit_cfg = next((u for u in UNITS if u['id'] == u_id), None)
            if not unit_cfg:
                return
            # Match on the same label discovery advertises: the scene name,
            # or its id when no name is configured. Indexing s['name']
            # directly raised KeyError for unnamed scenes.
            scene_match = next(
                (
                    s for s in unit_cfg.get('scenes', [])
                    if s.get('name', str(s['id'])) == payload
                ),
                None,
            )
            if not scene_match:
                logger.warning(f"Unknown scene {payload!r} for Unit {u_id}; ignored")
                return
            s_id = str(scene_match['id'])
            char = SCENE_TO_CHAR.get(s_id, '0')
            logger.info(f"🎭 Setting Scene {payload} (ID {s_id}) on Unit {u_id}")
            tn.write(f":A{char}{u_id}\r".encode('ascii'))
            time.sleep(1.0)
            update_status()
    except Exception as e:
        logger.error(f"MQTT Error: {e}")

def connect_telnet():
    """Open the telnet session to the Grafik Eye interface and log in.

    Any previous session is closed first. On success the module-level
    ``tn`` holds the new session and ``authenticated`` is True; on failure
    ``tn`` is None, ``authenticated`` is False and the reason is logged.

    NOTE(review): the login name is sent without checking the reply, so
    "authenticated" means "login sent", not "login confirmed".

    Returns:
        True when the session was opened and the login sent, else False.
    """
    global tn, authenticated

    # Start from a clean slate so a reconnect neither leaks the old socket
    # nor leaves a stale "authenticated" flag behind if this attempt fails.
    authenticated = False
    previous, tn = tn, None
    if previous is not None:
        try:
            previous.close()
        except OSError:
            pass

    session = None
    try:
        session = telnetlib.Telnet(GE_CONF['host'], GE_CONF['port'], timeout=5)
        session.read_until(b"login:", timeout=2)
        session.write(GE_CONF.get('user', 'nwk').encode('ascii') + b"\r")
        time.sleep(1.5)
        session.read_very_eager()  # discard the login banner/response
    except Exception as e:
        logger.error(f"❌ [TELNET] Connection failed: {e}")
        # Close a half-open session instead of leaking its socket.
        if session is not None:
            try:
                session.close()
            except OSError:
                pass
        return False

    tn = session
    authenticated = True
    logger.info("🟒 [TELNET] Connected")
    return True

# MQTT client wired to the connect/message callbacks defined in this file.
client = mqtt.Client()
if MQTT_CONF.get('user'):
    client.username_pw_set(MQTT_CONF['user'], MQTT_CONF.get('password', ''))
client.on_connect = on_connect
client.on_message = on_message

# Initial telnet connection; a failure is retried by the loop below.
if not connect_telnet():
    authenticated = False

# Start the MQTT network thread. Catch Exception (not a bare except) so
# Ctrl-C / SystemExit still propagate, and log why the connection failed.
# NOTE(review): when this fails the network loop is never started, so
# nothing is published until the script is restarted.
try:
    client.connect(MQTT_CONF['broker'], MQTT_CONF['port'], 60)
    client.loop_start()
except Exception as e:
    logger.error(f"❌ [MQTT] Connection failed: {e}")

# Main loop: poll every 5 s; while telnet is down, retry every 30 s.
while True:
    if not authenticated:
        if not connect_telnet():
            time.sleep(30)
            continue
        # Re-announce entities after a reconnect; the poll just below then
        # refreshes state, so no extra poll is needed here.
        publish_discovery()
    update_status()
    time.sleep(5)