WiFi PowerPet Door

With apologies in advance for waking up this old thread, has anyone created a non-MQTT HA integration based on the above API information? I have just begun the migration from another ecosystem to HA, and would like to avoid dealing with MQTT because my learning curve is steep enough right now with just the HA basics.

FWIW, my use case is to disable the inside sensor when my lightning detector senses an incoming thunderstorm, and to trigger the outside lights to turn on after dark when the door opens…

1 Like

I would like the ability to control this via HA also. Any interest in funding this project? These doors are not cheap, and I’m willing to throw down some money to get it integrated.

2 Likes

I slapped together a simple Q&D shell script to poll the door status and control some of the functions from HA. Not something that I would share, but good enough to turn on the outside light (most of the time!) through an automation when Sir Barxalot lets himself out after dark.

As I become more comfortable/familiar with the HA environment, I may take a stab at writing this properly in Python at some point in the future…

Sorry. I appear to have missed these posts and questions. Here is the code as requested. It’s VERY messy but has been running for over a year without issue. You will need to add your MQTT info and create MQTT sensors/buttons in HA to control it.

import socket
from queue import Queue
from threading import Thread
import time
import json
from datetime import datetime
import paho.mqtt.client as mqtt

def connectMQTT():
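    server="FIXME"    # MQTT broker hostname or IP
    t = 1833          # unused; client.connect() below falls back to paho's default port (1883)
    user = "FIXME"    # MQTT username
    passwd = "FIXME"  # MQTT password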
    rc=-1
    while rc!=0:
        client = mqtt.Client()
        client.username_pw_set(user,passwd)
        client.on_message=on_message
        client.on_disconnect=on_disconnect
        client.on_connect=on_connect
        client.connect(server)
        test=client.publish("foo/bar","test")
        rc = test.rc
        log("rc="+str(test.rc))
        log("mid="+str(test.mid))
        log("published="+str(test.is_published()))
        if rc !=0:
          log("Unable to publish. Retrying in 5 seconds....")
          time.sleep(5)
        else:
          log("Connected!")
    return client


def pp(c,topic,payload):
    rc=-1
    fails=0
    totalfails=0
    while rc!=0:
        if fails > 5:
            fails=0
            c=connectMQTT()
        test=c.publish(topic,payload)
        rc = test.rc
        if rc!=0:
            log("rc="+str(test.rc))
            log("mid="+str(test.mid))
            log("published="+str(test.is_published()))
            log("Unable to publish. Retrying in 1 seconds....")
            time.sleep(1)
            fails=fails+1
            totalfails=totalfails+1
        else:
            log(topic)
            log(payload)
    return c


def on_connect(client, userdata, flags, rc):
    client.subscribe("dog/cmd")
    log("Subscribing...")


def on_disconnect(client, userdata, rc):
    if rc != 0:
        log("Unexpected MQTT disconnection. Will auto-reconnect")

def on_message(client, userdata, message):
    topics = str(message.topic).strip().split("/")
    if topics[0]=="dog":
        if topics[1]=="cmd":
            cmd=message.payload.decode("utf-8")
            add_command(q,cmd)
            return 0


def log(obj):
    now = datetime.now()
    dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
    print(dt_string + " - " + str(obj))


def add_command(q, cmd, safe=True):
    shortcuts= {'DISABLE_INSIDE':'{"config":"DISABLE_INSIDE","msgId":759,"dir":"p2d"}',
    'ENABLE_INSIDE':'{"config":"ENABLE_INSIDE","msgId":806,"dir":"p2d"}',
    'DISABLE_OUTSIDE':'{"config":"DISABLE_OUTSIDE","msgId":809,"dir":"p2d"}',
    'ENABLE_OUTSIDE':'{"config":"ENABLE_OUTSIDE","msgId":810,"dir":"p2d"}',
    'OPEN_AND_HOLD':'{"cmd":"OPEN_AND_HOLD","msgId":812,"dir":"p2d"}',
    'HOLD':'{"cmd":"OPEN_AND_HOLD","msgId":812,"dir":"p2d"}',
    'OPEN':'{"cmd":"OPEN","msgId":799,"dir":"p2d"}',
    'CLOSE':'{"cmd":"CLOSE","msgId":815,"dir":"p2d"}',
    'POWER_ON':'{"config":"POWER_ON","msgId":820,"dir":"p2d"}',
    'POWER_OFF':'{"config":"POWER_OFF","msgId":818,"dir":"p2d"}',
    'GET_DOOR_STATUS':'{"config":"GET_DOOR_STATUS","msgId":754,"dir":"p2d"}',
    'STATUS':'{"config":"GET_DOOR_STATUS","msgId":754,"dir":"p2d"}',
    'GET_SETTINGS':'{"config":"GET_SETTINGS","msgId":752,"dir":"p2d"}',
    'SETTINGS':'{"config":"GET_SETTINGS","msgId":752,"dir":"p2d"}',
    'RESET':'RESET'}
    log("Incoming Command - " + cmd)
    if cmd.upper() in shortcuts:
        cmd2=shortcuts[cmd.upper()]
        q.put(cmd2)
        if cmd.upper().startswith("ENABLE") or cmd.upper().startswith("DISABLE") or cmd.upper().startswith("POWER"):
            q.put('{"config":"GET_SETTINGS","msgId":752,"dir":"p2d"}')
    elif not safe and cmd:
        if cmd != "":
            q.put(cmd)


def commands(q):#, event):
    while True:
        cmd = input("")
        add_command(q, cmd, False)



def logger(d,mqtt_client):
    door_status=""
    door_raw={}
    settings={}
    door_status_old=None
    door_raw_old=None
    settings_old=None
    ping=time.time()
    healthy=False
    healthy_old=None
    read_delay_seconds=.1
    ping_heath_seconds=30
    refresh_time=300
    next_refresh_time=time.time()+refresh_time
    while True:
        if not d.empty():
            telem=d.get()
            #log(telem)
            try:
                telem_obj = json.loads(telem)
                if "door_status" in telem_obj:
                    door_status = telem_obj['door_status']
                    door_raw = telem_obj
                    if door_status != door_status_old or json.dumps(door_raw) != json.dumps(door_raw_old):
                        log("Door = " + str(door_status))
                        mqtt_client = pp(mqtt_client,("dog/door"),door_status)
                        mqtt_client = pp(mqtt_client,("dog/door_detail"),json.dumps(door_raw))
                        next_refresh_time=time.time()+refresh_time
                    door_status_old=door_status
                    door_raw_old=door_raw
                    healthy=True
                if "settings" in telem_obj:
                    settings = telem_obj['settings']
                    if json.dumps(settings) != json.dumps(settings_old):
                        log("Settings = " + str(settings))
                        mqtt_client = pp(mqtt_client,("dog/door_settings"),json.dumps(settings))
                        next_refresh_time=time.time()+refresh_time
                    settings_old=settings
                    healthy=True
                if "PONG" in telem_obj:
                    ping = int(telem_obj['PONG'])
                    mqtt_client = pp(mqtt_client,("dog/door_ping"),str(ping))
                    healthy=True
                if time.time() > next_refresh_time:
                    log("Refresh MQTT data...")
                    log("Door = " + str(door_status))
                    log("Settings = " + str(settings))
                    mqtt_client = pp(mqtt_client,("dog/door"),door_status_old)
                    mqtt_client = pp(mqtt_client,("dog/door_detail"),json.dumps(door_raw_old))
                    mqtt_client = pp(mqtt_client,("dog/door_settings"),json.dumps(settings_old))
            except:
                telem_obj={}
        else:
            time.sleep(read_delay_seconds)




def broker_connection(q,d,TCP_IP,TCP_PORT,mqtt_client):#, event):
    connected=False
    encoding='utf-8'
    lf=b"\n"
    ping_seconds=10
    door_seconds=5
    setting_seconds=30
    error_seconds=3
    read_delay_seconds=.1
    while True:
        try:
            s.close()
        except:
            i=0
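        log("Connecting...")
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(1)
            s.connect((TCP_IP, TCP_PORT))
            s.setblocking(0)
            connected=True
        except OSError as e:
            # An unreachable door used to raise here and kill this thread; retry instead
            log("Connect failed - " + str(e))
            connected=False
            time.sleep(error_seconds)
            continue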
        for c in ('{"cmd":"CANCEL_FIRMWARE_UPDATE","msgId":797,"dir":"p2d"}', '{"config":"GET_DOOR_STATUS","msgId":754,"dir":"p2d"}', '{"config":"GET_SETTINGS","msgId":752,"dir":"p2d"}'):
            try:
                sent=s.send(c.encode(encoding)+lf)
            except:
                sent=0
            if sent==0:
                connected=False
            time.sleep(.75)
        i=0
        nextPing = time.time()+ping_seconds
        nextDoor = time.time()+door_seconds
        nextSetting = time.time()+setting_seconds
        while connected:
            i=i+1
            full_msg = ""
            while True:
                try:
                    msg = s.recv(1).decode("utf-8")
                except:
                    msg=""
                if len(msg) <= 0:
                    break
                full_msg += msg
            if len(full_msg) > 0:
                for data in full_msg.replace("}{","}±{").split("±"):
                    d.put(data)
            else:
                time.sleep(read_delay_seconds)
            if not q.empty():
                cmd = q.get()
                if cmd=="RESET":
                    connected=False
                else:
                    try:
                        log("Sending - "+cmd)
                        sent=s.send(cmd.encode(encoding)+lf)
                        log("Sent - "+cmd)
                    except:
                        sent=0
                    if sent==0:
                        connected=False
            elif time.time()>nextDoor:
                nextDoor = time.time()+door_seconds
                door='{"config":"GET_DOOR_STATUS","msgId":754,"dir":"p2d"}'
                try:
                    sent=s.send(door.encode(encoding)+lf)
                except:
                    sent=0
                if sent==0:
                    connected=False
            elif time.time()>nextSetting:
                nextSetting = time.time()+setting_seconds
                sett='{"config":"GET_SETTINGS","msgId":752,"dir":"p2d"}'
                try:
                    sent=s.send(sett.encode(encoding)+lf)
                except:
                    sent=0
                if sent==0:
                    connected=False
            elif time.time()>nextPing:
                nextPing = time.time()+ping_seconds
                ping='{"PING":"'+str(round(time.time()*1000))+'","msgId":764,"dir":"p2d"}'
                try:
                    sent=s.send(ping.encode(encoding)+lf)
                except:
                    sent=0
                if sent==0:
                    connected=False
        time.sleep(error_seconds)



def dog_door():
    mqtt_client=connectMQTT()
    mqtt_client.loop_start()
    myBoker = Thread(target=broker_connection, args=(q,d,'FIXME',3000,mqtt_client))  # FIXME = the door's IP address
    myBoker.setDaemon(True)
    myBoker.start()
    myLogger = Thread(target=logger, args=(d,mqtt_client,))
    myLogger.setDaemon(True)
    myLogger.start()
    #commands(q) #Uncomment this to have interactive from the cmdline
    while True:
        time.sleep(1)


q = Queue()
d = Queue()
if __name__ == "__main__":
    dog_door()
3 Likes

Thanks for doing the pcap and sharing your MQTT script! I just installed my hitecpet door today, and checking on HA was one of my first steps.

I have the zwavejs2mqtt installed, but I’m not currently using MQTT for anything. I’d rather use a non-MQTT option unless I have to. I’m not sure what would be involved in porting your work?

So basically the Python above is acting as a connector to the JSON serial (really telnet) commands that the door uses. Possible approaches are:

  • HiTecPet <—> Native HA/HACS integration
  • HiTecPet <—> MQTT/(Python Code Above) <—> HA MQTT Sensor / MQTT Switch
  • HiTecPet <—> Local Python Script <—> HA Command Line Sensor / Command Line Switch
    • You would just rewrite the above to take arguments on the command line and do your bidding/report status.
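
For anyone curious what that third option might look like, here is a rough, untested-against-a-real-door sketch. The command strings are copied verbatim from the script above; everything else (the `send_command`, `extract_status`, and `main` names, the 3 second timeout, and the guess that the door answers a single request right away) is my own assumption, not something confirmed by the pcap.

```python
import argparse
import json
import socket

# Command strings copied verbatim from the script earlier in the thread.
COMMANDS = {
    "OPEN": '{"cmd":"OPEN","msgId":799,"dir":"p2d"}',
    "CLOSE": '{"cmd":"CLOSE","msgId":815,"dir":"p2d"}',
    "DISABLE_INSIDE": '{"config":"DISABLE_INSIDE","msgId":759,"dir":"p2d"}',
    "ENABLE_INSIDE": '{"config":"ENABLE_INSIDE","msgId":806,"dir":"p2d"}',
    "STATUS": '{"config":"GET_DOOR_STATUS","msgId":754,"dir":"p2d"}',
}


def send_command(host, name, port=3000, timeout=3.0):
    """Send one command and return whatever text the door sends back."""
    payload = COMMANDS[name.upper()].encode("utf-8") + b"\n"
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(payload)
        try:
            return sock.recv(4096).decode("utf-8", errors="replace")
        except socket.timeout:
            return ""  # assumption: silence just means no reply in time


def extract_status(reply):
    """Pull door_status out of a single-object reply, else return None."""
    try:
        return json.loads(reply).get("door_status")
    except (ValueError, AttributeError):
        # Not JSON, several objects run together, or not a JSON object
        return None


def main(argv=None):
    parser = argparse.ArgumentParser(description="One-shot pet door command")
    parser.add_argument("host", help="IP address of the door")
    parser.add_argument("command", type=str.upper, choices=sorted(COMMANDS))
    args = parser.parse_args(argv)
    reply = send_command(args.host, args.command)
    status = extract_status(reply)
    # Fall back to the raw reply when no door_status field was found
    print(status if status is not None else reply.strip())
    return 0
```

To run it as a script you would add `raise SystemExit(main())` at the bottom and call something like `python petdoor.py 192.0.2.10 status` (the file name and address are placeholders) from an HA command line sensor or switch. Keep in mind that each call opens its own connection, so it will probably fight with anything else that is already connected to the door.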

I don’t have zwave in my setup but the idea is certainly not something you should give up on. There is a thriving zwave community here that would be able to better direct you.

Another option would be some sort of generic telnet bridge. I would not be surprised if someone somewhere has already solved that - you could start with Telnet Switch and also look at the work here.

Hope that helps!

1 Like

I did mine as a simple/ugly shell script and command line sensors + switches. Skimming the Python code from @int-rnd above, it appears that both of us are polling the door for status every 3-5 seconds versus keeping an open socket and listening for status changes as they happen. Our OCD psycho dog can get through the door so fast when he’s in full knucklehead mode that the polling interval completely misses the door being opened.

IMHO, the proper way to do this is with a non-polling listener as a native HA/HACS integration. I have developed add-on code for another huge OSS project that reads live UDP network broadcasts in Python that’s being used by thousands of people around the world, and setting up a TCP socket listener for the pet door seems trivial. I just need time to look at the HA developer docs, to see if/how it can be done as a listener and not as the more common polled sensor…
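
To make the listener idea a bit more concrete, here is a minimal sketch using `asyncio` streams. It assumes the door pushes status messages on its own over the open connection (not verified here) and that messages can arrive run together with no separator, which is what the `}{` splitting in the script above suggests. The `split_messages` and `listen` names and the `on_message` callback are hypothetical, not part of any HA API.

```python
import asyncio
import json

_decoder = json.JSONDecoder()


def split_messages(buffer):
    """Decode every complete JSON object in buffer.

    Returns (list_of_objects, leftover_text). The leftover is an
    incomplete trailing object that needs more data before it parses.
    """
    messages = []
    pos = 0
    while pos < len(buffer):
        # raw_decode() does not skip leading whitespace, so do it here
        while pos < len(buffer) and buffer[pos].isspace():
            pos += 1
        if pos >= len(buffer):
            break
        try:
            obj, end = _decoder.raw_decode(buffer, pos)
        except json.JSONDecodeError:
            break  # incomplete (or unparseable) tail; wait for more data
        messages.append(obj)
        pos = end
    return messages, buffer[pos:]


async def listen(host, on_message, port=3000):
    """Hold one connection open and hand each decoded message to on_message."""
    reader, writer = await asyncio.open_connection(host, port)
    buffer = ""
    try:
        while True:
            chunk = await reader.read(4096)
            if not chunk:
                break  # the door closed the connection
            buffer += chunk.decode("utf-8", errors="replace")
            messages, buffer = split_messages(buffer)
            for message in messages:
                on_message(message)
    finally:
        writer.close()
        await writer.wait_closed()
```

Something like `asyncio.run(listen("192.0.2.10", print))` (placeholder address) would then print each message as it arrives instead of waiting for the next poll. Reconnect handling, and a guard for non-JSON garbage that would otherwise sit in the buffer forever, are left out to keep this short.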

Here is my attempt.

It’s a native integration (no MQTT). It’s also my first attempt at writing an integration for Home Assistant.
However, it DOES work for me: I can open and close the door, see its status, enable and disable the inside and outside sensors and auto mode, and power it off and on.

3 Likes

I bought our non-WiFi HiTec door just before they released the wifi one, so the only smart control I have for it is that I have it plugged into an HA controlled outlet, so I can basically just turn it on and turn it off.

Can anyone tell me what the WiFi door is using for network connectivity? Maybe they just hung an ESP8266 or variant off the original board, and I could do the same…

I’m pretty sure that it is just an ESP32 or ESP8266 of some sort, because it is always powered on even when the door’s power button is toggled off.

It may be worth contacting High Tech Pet and asking if they will sell you the parts. They do sell repair parts for almost everything in the door, including the electronics and microphones/harnesses…

I did just that. $199.99 for the circuit board for the Wifi version. I told them that was ridiculous and I’m going to try to stick an ESP32 on a spare non-Wifi board I have and if successful, provide the ESP32 modules and instructions to anyone who wants to upgrade their old version.

1 Like

Did you get a link or a part number for that? I’m here trying to convert my old (2019) edition to modern wifi specifically so that I can integrate with HA.

The tech support person who responded to my inquiry only said that I would need to call in to order one of the wifi enabled circuit boards for $199.99. :expressionless:

1 Like

Did you hack that into a cheaper dog door? Care to share?

I’m unfortunately getting an instantaneous error when trying to configure my door. The plugin tells me “Could not connect to remote host” but it’s so instant I’m not even sure I believe that. HA has a “ping” virtual device that I installed and it pings the door no problem.

I wonder if there’s a manual way to configure the door via YAML? Or maybe someone just knows what’s going wrong with my setup workflow? I don’t see anything obvious in the logs, but I don’t know if I’m looking in all the right places.

The only suggestion that I can make is to telnet to port 3000 of the door’s IP address, to confirm that it answers and that you are using the correct IP address…
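
If you don’t have a telnet client handy, roughly the same check can be done from Python. This is only a sketch: `port_answers` is a made-up helper name and the 3 second timeout is arbitrary. It tests whether a TCP connection to port 3000 can be opened at all, which an ICMP ping cannot tell you.

```python
import socket


def port_answers(host, port=3000, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # The connection is closed again as soon as the with-block exits
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError as err:
        # Covers refused connections, timeouts, and unreachable hosts
        print(f"{host}:{port} did not answer: {err}")
        return False
```

Calling `port_answers("192.0.2.10")` with the door’s real address in place of the placeholder should return `True` if the door is listening.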

Yeah that works fine (unfortunately?). I’m also very careful to terminate the session cleanly before trying to run my integration, as these doors are apparently very temperamental about only having 1 client connected.

I’m thinking something is broken with the config workflow and I’m hoping someone else here can share how to manually configure in YAML files?

1 Like

Is this still an active, functioning product? I need to replace our current dumb dog door, and even though the High Tech door is a bit pricey, being able to integrate it with HA would be amazing for our home.

Thank you!

@corporategoth’s driver is actively being maintained, and AFAIK High Tech Pet is still in business. They are also still selling repair parts on their web site.

My only tip is to look at YouTube for “hacks” to make standard coin cell batteries work in the collar ultrasonic transmitter, because their proprietary replacement batteries are pricey…

Thank you for your reply. I was considering their rechargeable collar that is now available.