Samsung AC

I’ve put together a somewhat easier solution for everybody who has an older model (the ones that listen on port 2878).

I have put together a script (with the help of ChatGPT, based on the JS script from one of your sources) to get the token from an old-version (port 2878) AC.

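It can be run from an HA terminal like this: python3 ./ac_2878_get_token.py 192.168.1.100 — you just need to place the ac14k_m.pem file in the same folder and run the script from that folder, because the script loads the certificate by its relative file name.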

The pem file is here.

For controlling the AC the following integration can be used (you can find it in HACS) once you have the token.

And this is the script to get the token; just save it as ac_2878_get_token.py.

import socket
import ssl
import logging
import re
import argparse

class SamsungAirconditioner:
    """Minimal client that retrieves an access token from a Samsung AC.

    The device is contacted over TLS (forced to TLSv1) on port 2878 and the
    line-based XML exchange is followed until a ``Token="..."`` attribute
    shows up in a received line.

    Attributes:
        options: dict of connection settings; only ``'ip'`` is stored.
        logger: object providing ``debug`` and ``info`` in the style of
            ``logging.Logger`` (lazy ``%``-formatting arguments).
        token: the token extracted from the device, or ``None`` until one
            has been received.
    """

    # Exact (whitespace-stripped) lines that receive special handling.
    _LINE_DRC = 'DRC-1.00'
    _LINE_INVALIDATE = (
        '<?xml version="1.0" encoding="utf-8" ?>'
        '<Update Type="InvalidateAccount"/>'
    )
    _LINE_TOKEN_READY = (
        '<?xml version="1.0" encoding="utf-8" ?>'
        '<Response Type="GetToken" Status="Ready"/>'
    )
    _LINE_AUTH_FAILED = (
        '<?xml version="1.0" encoding="utf-8" ?>'
        '<Response Status="Fail" Type="Authenticate" ErrorCode="301" />'
    )
    _TOKEN_REQUEST = b'<Request Type="GetToken" />\r\n'

    # Patterns are compiled once instead of on every received line.
    _TOKEN_RE = re.compile(r'Token="(.*?)"')
    _STATUS_ATTR_RE = re.compile(r'Attr ID="(.*?)" Value="(.*?)"')
    _DEVICE_ATTR_RE = re.compile(r'Attr ID="(.*?)" Type=".*?" Value="(.*?)"')

    def __init__(self, ip, logger):
        """Store the target address and logger; no connection is made here.

        Args:
            ip: address of the air conditioner, passed to
                ``socket.create_connection``.
            logger: logger used for ``debug``/``info`` messages.
        """
        self.options = {'ip': ip}
        self.logger = logger
        self.token = None

    def get_token(self, callback):
        """Connect to the AC and report the token (or an error) via callback.

        Blocks until a token is received, an authentication failure is
        reported, the peer closes the connection, or a socket/TLS error
        occurs.

        Args:
            callback: callable invoked as ``callback(None, token)`` on
                success or ``callback(exception)`` on failure.

        Raises:
            ValueError: if ``callback`` is not callable.
            OSError / ssl.SSLError: if the TLS context cannot be prepared
                (for example ``ac14k_m.pem`` is missing); these are raised
                before the connection attempt and are not routed to the
                callback.
        """
        if not callable(callback):
            raise ValueError("callback is mandatory for get_token")

        # The context is pinned to TLSv1 and the OpenSSL security level is
        # lowered to 0 so that legacy cipher suites are permitted.
        # NOTE(review): ssl.PROTOCOL_TLSv1 is deprecated since Python 3.10.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.set_ciphers('ALL:@SECLEVEL=0')

        # Client certificate/key presented to the AC; the path is relative
        # to the current working directory.
        context.load_cert_chain(certfile='ac14k_m.pem')
        # The server certificate is not verified at all.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

        ip = self.options['ip']
        try:
            with socket.create_connection((ip, 2878)) as raw_socket:
                with context.wrap_socket(raw_socket, server_hostname=ip) as tls_socket:
                    # A '%s' placeholder is required, otherwise the details
                    # dict is silently dropped from the log output.
                    self.logger.info(
                        'connected %s',
                        {'ipaddr': ip, 'port': 2878, 'tls': True},
                    )

                    # Buffer raw bytes and decode complete lines only, so a
                    # multi-byte UTF-8 character split across two recv()
                    # calls cannot raise UnicodeDecodeError.
                    buffer = b""
                    while True:
                        chunk = tls_socket.recv(1024)
                        if not chunk:
                            if not self.token:
                                callback(Exception('premature eof'))
                            break

                        buffer += chunk
                        while b'\n' in buffer:
                            raw_line, buffer = buffer.split(b'\n', 1)
                            line = raw_line.decode('utf-8', errors='replace')
                            if self._handle_line(line, tls_socket.sendall, callback):
                                return

        except (socket.error, ssl.SSLError) as e:
            # Errors after a token was delivered are not reported again.
            if not self.token:
                callback(e)

    def _handle_line(self, line, send, callback):
        """Process one received line; return True when the exchange is over.

        Args:
            line: decoded line without its trailing ``'\\n'``.
            send: callable accepting ``bytes``, used to write to the device.
            callback: the callback given to ``get_token``.

        Returns:
            True if ``callback`` was invoked with a final result (token or
            authentication failure), False if reading should continue.
        """
        stripped = line.strip()
        self.logger.debug('read: %s', stripped)

        if stripped == self._LINE_DRC:
            return False
        if stripped == self._LINE_INVALIDATE:
            send(self._TOKEN_REQUEST)
            return False
        if stripped == self._LINE_TOKEN_READY:
            self.logger.info('waiting for token - turn on the AC in the next 30 seconds')
            return False
        if stripped == self._LINE_AUTH_FAILED:
            callback(Exception('Failed authentication'))
            return True

        # Any line carrying a Token attribute ends the exchange successfully.
        token_match = self._TOKEN_RE.search(line)
        if token_match:
            self.token = token_match.group(1)
            self.logger.info('authenticated')
            callback(None, self.token)
            return True

        # Status update: only the first attribute on the line is logged.
        if 'Update Type="Status"' in line:
            state_match = self._STATUS_ATTR_RE.search(line)
            if state_match:
                state = {state_match.group(1): state_match.group(2)}
                self.logger.info('stateChange %s', state)
                return False

        # Full device state: collect every attribute found on the line.
        if 'Response Type="DeviceState" Status="Okay"' in line:
            matches = (
                self._DEVICE_ATTR_RE.search(part) for part in line.split("><")
            )
            state = {m.group(1): m.group(2) for m in matches if m}
            # With a placeholder an empty dict is logged as '{}' instead of
            # triggering a string-formatting error inside logging.
            self.logger.info('stateChange %s', state)

        return False


# Console logger for the script: every record from DEBUG upwards is written
# to the default stream with a timestamp, logger name and level prefix.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)

logger = logging.getLogger('SamsungAirconditioner')
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

# Define the callback function to handle the response from get_token
def token_callback(error, token=None):
    """Print the outcome of a token request to standard output.

    Args:
        error: a truthy value (typically an exception) when the request
            failed, otherwise a falsy value such as ``None``.
        token: the retrieved token; only printed when ``error`` is falsy.
    """
    if not error:
        print("Retrieved token: {}".format(token))
        return
    print("Error: {}".format(error))

# Command-line interface: one positional argument, the IP address of the AC.
parser = argparse.ArgumentParser(
    description="Samsung Air Conditioner Token Retrieval",
)
parser.add_argument(
    'ip',
    type=str,
    help='IP address of the Samsung Air Conditioner',
)
args = parser.parse_args()

# Build the client for the requested address and start the token exchange;
# the result (token or error) is reported through token_callback.
aircon = SamsungAirconditioner(ip=args.ip, logger=logger)
aircon.get_token(token_callback)

@wifi75, @MatAlpi, @sebdoan, @peterbuga, @ventudan