I have an older Lutron GRX-CI lighting system that I would like to get integrated with HA. It is simply a telnet session where you can send commands to both set and read lighting zones and scenes. I am trying to find an existing component that might make the work a little easier.
The Denon mediaplayer platform is using telnet.
Thanks fabaff. Will check them out.
@drizzay Andre, just came across this old thread of yours. While HA has a telnet integration, it doesn’t handle login or commands other than on/off. Consequently, for a hospitality client needing to control a large legacy Lutron GRAFIK Eye 4000 system, we were able to use an AppDaemon Home Assistant plug-in to run the Python code that responds to the changes to an HA drop-down helper listing the Lutron scenes, issues the appropriate telnet commands, and reports the status.
There are a lot of small hotels & large restaurants on Lutron GRAFIK Eye, and they may not think that it can be integrated with anything current like Home Assistant, but it can.
I actually cobbled something together that has worked for years. I used shell commands. I recently asked ChatGPT to write code for Home Assistant for the GRAFIK Eye and it did it better, but I haven’t implemented it yet.
lol.
import json
import telnetlib
import time
import paho.mqtt.client as mqtt
import logging
import sys
import warnings
import re
# --- CALIBRATION SETTING ---
# Seconds a zone is assumed to need to ramp across its full 0-100 % range.
# on_message() scales this by the requested change to time raise/lower commands.
FADE_TIME = 5.0
# ---------------------------

# NOTE(review): this filter is installed after `import telnetlib` has already
# executed, so it cannot suppress a warning raised at import time — confirm
# whether it needs to move above the import.
warnings.filterwarnings("ignore", message=".*telnetlib.*")
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.info("π Script starting up...")

# Load the add-on options; the script cannot do anything useful without them.
try:
    with open('/data/options.json', encoding='utf-8') as f:
        config = json.load(f)
    # NOTE(review): this literal was split over two lines by a mis-decoded
    # character (a syntax error); restored as the check-mark emoji — confirm.
    logger.info("✅ Configuration loaded")
except Exception as e:
    logger.error(f"β Options error: {e}")
    sys.exit(1)

GE_CONF = config.get('grafik_eye', {})   # telnet settings: host, port, user
MQTT_CONF = config.get('mqtt', {})       # broker, port, user, password, base_topic
UNITS = config.get('units', [])          # list of unit dicts: id, name, scenes, zones

# Shared runtime state.
tn = None               # active telnetlib.Telnet session, or None
authenticated = False   # True once connect_telnet() has completed the login
zone_levels = {}        # {unit id: {zone id: last published percentage}}

# Scene id (as a string, "0".."16") -> single command character:
# "0".."9" map to themselves, "10".."16" map to "A".."G".
SCENE_TO_CHAR = {str(i): (str(i) if i < 10 else chr(55 + i)) for i in range(17)}
# Reverse lookup used when parsing status replies.
STATUS_CHAR_TO_SCENE = {v: k for k, v in SCENE_TO_CHAR.items()}
def lutron_hex_to_pct(hex_str, zone_index):
    """Convert a hexadecimal zone level to a whole-number percentage.

    The value is scaled so that 0x7F (127) corresponds to 100.

    Args:
        hex_str: Level as a hexadecimal string, e.g. ``"7F"``.
        zone_index: Zero-based position of this value within the reply.

    Returns:
        The rounded percentage, or ``None`` when ``hex_str`` is not valid
        hexadecimal or when the parsed value equals ``zone_index``.
    """
    try:
        val = int(hex_str, 16)
    except (ValueError, TypeError):
        # Unparseable token (line noise) or not a string at all.
        return None
    # NOTE(review): a value equal to its own position is discarded. Presumably
    # this filters a placeholder/echo in the reply, but it also hides a genuine
    # level of 0 in the first position — confirm against the device.
    if val == zone_index:
        return None
    return round((val / 127) * 100)
def publish_discovery():
    """Publish retained Home Assistant MQTT discovery configs for every unit.

    For each entry in ``UNITS`` this announces one ``select`` entity (only
    when the unit lists scenes) and six ``number`` entities, one per zone.
    Zone names come from the unit's ``zones`` list, falling back to
    ``"Zone <n>"`` for positions the list does not cover.
    """
    logger.info("π‘ [DISCOVERY] Starting publication...")
    for unit in UNITS:
        unit_id = unit['id']
        label = unit.get('name', f"Grafik Eye {unit_id}")
        topic_root = f"{MQTT_CONF['base_topic']}/unit_{unit_id}"
        device = {
            "identifiers": [f"ge_unit_{unit_id}"],
            "name": label,
            "manufacturer": "Lutron",
            "model": "Grafik Eye 3000",
            "sw_version": "v5.7",
        }

        # Scene Select Discovery
        scenes = unit.get('scenes', [])
        if scenes:
            select_cfg = {
                "name": f"{label} Scene",
                "unique_id": f"ge_u{unit_id}_s",
                "command_topic": f"{topic_root}/scene/set",
                "state_topic": f"{topic_root}/scene/state",
                "options": [sc.get('name', str(sc['id'])) for sc in scenes],
                "device": device,
            }
            client.publish(
                f"homeassistant/select/ge_u{unit_id}/config",
                json.dumps(select_cfg),
                retain=True,
            )

        # Zone Sliders Discovery
        zone_names = unit.get('zones', [])
        for zone_no in range(1, 7):
            if zone_no <= len(zone_names):
                zone_label = zone_names[zone_no - 1]
            else:
                zone_label = f"Zone {zone_no}"
            number_cfg = {
                "name": f"{zone_label}",
                "unique_id": f"ge_u{unit_id}_z{zone_no}",
                "command_topic": f"{topic_root}/zone_{zone_no}/set",
                "state_topic": f"{topic_root}/zone_{zone_no}/state",
                "min": 0,
                "max": 100,
                "step": 1,
                "unit_of_measurement": "%",
                "device": device,
            }
            client.publish(
                f"homeassistant/number/ge_u{unit_id}_z{zone_no}/config",
                json.dumps(number_cfg),
                retain=True,
            )
    logger.info("π‘ [DISCOVERY] Complete")
def update_status():
    """Poll the controller over telnet and publish scene and zone state.

    Sends ``:G`` and parses the ``~:ss`` reply, one character per unit, to
    publish each configured unit's current scene. Then sends ``:rzi <unit>``
    for every configured unit and parses the ``~:zi`` reply; a zone level is
    published only when it differs from the value cached in ``zone_levels``.

    Does nothing while no telnet session is established. If the connection
    fails mid-poll the session is closed and ``authenticated`` is cleared so
    that the main loop reconnects.
    """
    global tn, authenticated, zone_levels
    if not authenticated or tn is None:
        return
    try:
        # 1. STATUS SYNC USING :G (Handles Scene State)
        tn.read_very_eager()  # discard anything left over from earlier commands
        tn.write(b":G\r")
        time.sleep(0.6)
        g_resp = tn.read_very_eager().decode('ascii', errors='ignore').strip()
        # The class includes G (SCENE_TO_CHAR maps scene 16 to "G") and M so
        # that neither truncates the captured status string.
        g_match = re.search(r'~:ss\s+([0-9A-GM]+)', g_resp, re.IGNORECASE)
        if g_match:
            for i, char in enumerate(g_match.group(1).upper()):
                # NOTE(review): 'M' is taken to mean "no unit at this
                # position", so only that position is skipped — confirm
                # against the controller's protocol documentation.
                if char == 'M':
                    continue
                u_id = i + 1
                s_id = STATUS_CHAR_TO_SCENE.get(char, "0")
                unit_cfg = next((u for u in UNITS if u['id'] == u_id), None)
                if unit_cfg is None:
                    continue
                # Find the Name for this Scene ID to update the HA dropdown.
                # Falls back to the id, matching the option text that
                # publish_discovery() advertises for unnamed scenes.
                scene_cfg = next(
                    (s for s in (unit_cfg.get('scenes') or []) if str(s['id']) == s_id),
                    None,
                )
                state_val = scene_cfg.get('name', s_id) if scene_cfg else s_id
                client.publish(
                    f"{MQTT_CONF['base_topic']}/unit_{u_id}/scene/state",
                    state_val,
                    retain=True,
                )

        # 2. ZONE LEVELS (:rzi)
        for unit in UNITS:
            u_id = unit['id']
            tn.write(f":rzi {u_id}\r".encode('ascii'))
            time.sleep(0.6)
            z_resp = tn.read_very_eager().decode('ascii', errors='ignore').strip()
            z_match = re.search(
                fr'~:zi\s+{re.escape(str(u_id))}\s+([0-9A-F\s]+)',
                z_resp,
                re.IGNORECASE,
            )
            if not z_match:
                continue
            unit_levels = zone_levels.setdefault(u_id, {})
            # Only the first six values are zones exposed to Home Assistant.
            for z_id, hex_val in enumerate(z_match.group(1).split()[:6], start=1):
                pct = lutron_hex_to_pct(hex_val, z_id - 1)
                if pct is not None and unit_levels.get(z_id) != pct:
                    unit_levels[z_id] = pct
                    client.publish(
                        f"{MQTT_CONF['base_topic']}/unit_{u_id}/zone_{z_id}/state",
                        str(pct),
                        retain=True,
                    )
    except (EOFError, OSError) as e:
        # The telnet connection is gone: drop the session so the main loop's
        # "not authenticated" branch reconnects instead of polling a dead socket.
        logger.error(f"β [POLL] Error: {e}")
        authenticated = False
        if tn is not None:
            try:
                tn.close()
            except OSError:
                pass
        tn = None
    except Exception as e:
        logger.error(f"β [POLL] Error: {e}")
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: announce entities and subscribe on success.

    Args:
        client: The MQTT client that connected.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 is treated as success.
    """
    if rc != 0:
        # Previously ignored silently, leaving no trace of a refused connection.
        logger.error(f"[MQTT] Connection refused (rc={rc})")
        return
    # NOTE(review): this literal was split over two lines by a mis-decoded
    # character (a syntax error); restored as the check-mark emoji — confirm.
    logger.info("✅ [MQTT] Connected")
    publish_discovery()
    for unit in UNITS:
        client.subscribe(f"{MQTT_CONF['base_topic']}/unit_{unit['id']}/#")
def on_message(client, userdata, msg):
    """MQTT message callback: translate Home Assistant commands to telnet.

    Two command topics are handled, both ending in ``/set``:

    * ``.../unit_<u>/zone_<z>/set`` — payload is a target percentage. The
      zone is ramped by sending a start command, sleeping for a time
      proportional to the requested change (``FADE_TIME`` for a full 100 %
      swing), then sending the stop command. Changes smaller than 3 points
      are ignored.
    * ``.../unit_<u>/scene/set`` — payload is a scene name as advertised by
      ``publish_discovery()``; the matching scene is selected on the unit.

    After either command ``update_status()`` is called to publish the new
    state. Messages are ignored while no telnet session is established. All
    exceptions are logged rather than propagated.

    Note: this callback blocks for the duration of the ramp.
    """
    global tn, zone_levels
    try:
        topic_parts = msg.topic.split('/')
        payload = msg.payload.decode().strip()
        if not authenticated or tn is None:
            return
        # Route on the exact topic segments rather than a substring search, so
        # a base topic that happens to contain "zone_" cannot misroute a
        # scene command into the zone branch.
        if len(topic_parts) < 3 or topic_parts[-1] != 'set':
            return
        target = topic_parts[-2]

        # Handle Slider (Zone) Control
        if target.startswith('zone_'):
            z_id = int(target.split('_')[-1])
            u_id = int(topic_parts[-3].split('_')[-1])
            target_val = int(float(payload))
            current_val = zone_levels.get(u_id, {}).get(z_id, 0)
            diff = target_val - current_val
            if abs(diff) < 3:
                return
            # :B.../:C is used when the level must go up, :D.../:E when down.
            if diff > 0:
                start, stop = f":B{u_id}{z_id}\r", ":C\r"
            else:
                start, stop = f":D{u_id}{z_id}\r", ":E\r"
            duration = (abs(diff) / 100.0) * FADE_TIME
            tn.write(start.encode('ascii'))
            time.sleep(duration)
            tn.write(stop.encode('ascii'))
            time.sleep(0.5)
            update_status()

        # Handle Named Scene Selection
        elif target == 'scene':
            u_id = int(topic_parts[-3].split('_')[-1])
            unit_cfg = next((u for u in UNITS if u['id'] == u_id), None)
            if unit_cfg is None:
                return
            # Compare against the same label publish_discovery() advertises,
            # so scenes configured without a name remain selectable by id.
            scene_match = next(
                (
                    s for s in (unit_cfg.get('scenes') or [])
                    if s.get('name', str(s['id'])) == payload
                ),
                None,
            )
            if scene_match is None:
                logger.warning(f"Unknown scene '{payload}' for Unit {u_id}")
                return
            s_id = str(scene_match['id'])
            char = SCENE_TO_CHAR.get(s_id, '0')
            logger.info(f"π Setting Scene {payload} (ID {s_id}) on Unit {u_id}")
            tn.write(f":A{char}{u_id}\r".encode('ascii'))
            time.sleep(1.0)
            update_status()
    except Exception as e:
        logger.error(f"MQTT Error: {e}")
def connect_telnet():
    """Open the telnet session to the controller and send the login name.

    Any previous session is closed first. On success the new session is
    stored in the global ``tn`` and ``authenticated`` is set to True; on
    failure ``tn`` is left as ``None``, ``authenticated`` as False, and the
    reason is logged.

    Returns:
        True when the session was opened and the login name sent,
        False otherwise.
    """
    global tn, authenticated
    authenticated = False
    if tn is not None:
        # Release the old socket before reconnecting.
        try:
            tn.close()
        except OSError:
            pass
        tn = None

    session = None
    try:
        session = telnetlib.Telnet(GE_CONF['host'], GE_CONF['port'], timeout=5)
        # NOTE(review): the result of read_until is not checked, so the login
        # is sent even if no "login:" prompt arrived within the timeout.
        session.read_until(b"login:", timeout=2)
        session.write(GE_CONF.get('user', 'nwk').encode('ascii') + b"\r")
        time.sleep(1.5)
        session.read_very_eager()  # discard whatever the login produced
    except Exception as e:
        # Best-effort: report failure to the caller, but say why, and do not
        # swallow KeyboardInterrupt/SystemExit as a bare except would.
        logger.error(f"[TELNET] Connection failed: {e}")
        if session is not None:
            try:
                session.close()
            except OSError:
                pass
        return False

    tn = session
    authenticated = True
    logger.info("π’ [TELNET] Connected")
    return True
# --- Start-up and main polling loop ---
# on_connect uses the (client, userdata, flags, rc) signature, i.e. callback
# API version 1. paho-mqtt 2.x exposes CallbackAPIVersion and expects the
# version to be stated; older releases have no such attribute.
_callback_api = getattr(mqtt, "CallbackAPIVersion", None)
client = mqtt.Client(_callback_api.VERSION1) if _callback_api is not None else mqtt.Client()
if MQTT_CONF.get('user'):
    client.username_pw_set(MQTT_CONF['user'], MQTT_CONF.get('password', ''))
client.on_connect, client.on_message = on_connect, on_message

if not connect_telnet():
    authenticated = False

try:
    client.connect(MQTT_CONF['broker'], MQTT_CONF['port'], 60)
    client.loop_start()
except Exception:
    # Log the cause with traceback instead of hiding it behind a bare except.
    # NOTE(review): the script keeps running without MQTT after this; consider
    # whether it should retry or exit.
    logger.exception("β [MQTT] Connection failed")

# Poll every 5 s; while the telnet session is down, retry every 30 s.
while True:
    if not authenticated:
        if not connect_telnet():
            time.sleep(30)
            continue
        # Re-announce entities after a reconnect; the poll below then
        # publishes fresh state (previously it ran twice in a row here).
        publish_discovery()
    update_status()
    time.sleep(5)