IMAP Email Content - Character encoding

Hi all,
I was able to configure IMAP. However, when the sensor reads a message body that contains accented characters, it garbles the text as shown below.

Email:
Tipo de Alarme: Perda de Vídeo
Alarme no Canal No.: 8
Horário do inicio do alarme(D/M/A H:M:S): 27/07/2017 21:36:01
Nome do dispositivo de alarme: MHDX

Sensor:
Tipo de Alarme: Perda de V=C3=ADdeo
Alarme no Canal No.: 8
Hor=C3=A1rio do inicio do alarme(D/M/A H:M:S): 27/07/2017 21:36:01
Nome do dispositivo de alarme: MHDX

Is there any clue how to fix it?

Appreciate any help.
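
For context, the `=C3=AD` and `=C3=A1` sequences in the sensor output look like quoted-printable transfer encoding of UTF-8 bytes that was never decoded. The snippet below is a minimal sketch, assuming that is the case, that decodes one of the lines above with the standard library `quopri` module:

```python
import quopri

# One line of the sensor output shown above, as raw bytes.
raw = b"Tipo de Alarme: Perda de V=C3=ADdeo"

# Undo the quoted-printable encoding, then interpret the bytes as UTF-8.
decoded = quopri.decodestring(raw).decode("utf-8")
print(decoded)
```

If the output matches the original email text, the fix belongs where the sensor reads the message payload.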

Solution: I created a custom sensor as a copy of the original imap_email_content.py, with my changes marked custom_changes.

"""
Email sensor support.

For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.email/
"""
import logging
import datetime
import email
from collections import deque

import voluptuous as vol

from homeassistant.helpers.entity import Entity
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
    CONF_NAME, CONF_PORT, CONF_USERNAME, CONF_PASSWORD, CONF_VALUE_TEMPLATE,
    CONTENT_TYPE_TEXT_PLAIN, ATTR_DATE)
import homeassistant.helpers.config_validation as cv

# custom_changes: extra imports used by get_msg_text
import re
from unicodedata import normalize

_LOGGER = logging.getLogger(__name__)

CONF_SERVER = 'server'
CONF_SENDERS = 'senders'

ATTR_FROM = 'from'
ATTR_BODY = 'body'
ATTR_SUBJECT = 'subject'

DEFAULT_PORT = 993

PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Optional(CONF_NAME): cv.string,
    vol.Required(CONF_USERNAME): cv.string,
    vol.Required(CONF_PASSWORD): cv.string,
    vol.Required(CONF_SERVER): cv.string,
    vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
    vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
})


def setup_platform(hass, config, add_devices, discovery_info=None):
    """Set up the Email sensor platform."""
    reader = EmailReader(
        config.get(CONF_USERNAME), config.get(CONF_PASSWORD),
        config.get(CONF_SERVER), config.get(CONF_PORT))

    value_template = config.get(CONF_VALUE_TEMPLATE)
    if value_template is not None:
        value_template.hass = hass
    sensor = EmailContentSensor(
        hass, reader, config.get(CONF_NAME) or config.get(CONF_USERNAME),
        config.get(CONF_SENDERS), value_template)

    if sensor.connected:
        add_devices([sensor])
    else:
        return False


class EmailReader(object):
    """A class to read emails from an IMAP server."""

    def __init__(self, user, password, server, port):
        """Initialize the Email Reader."""
        self._user = user
        self._password = password
        self._server = server
        self._port = port
        self._last_id = None
        self._unread_ids = deque([])
        self.connection = None

    def connect(self):
        """Login and setup the connection."""
        import imaplib
        try:
            self.connection = imaplib.IMAP4_SSL(self._server, self._port)
            self.connection.login(self._user, self._password)
            return True
        except imaplib.IMAP4.error:
            _LOGGER.error("Failed to login to %s", self._server)
            return False

    def _fetch_message(self, message_uid):
        """Get an email message from a message id."""
        _, message_data = self.connection.uid(
            'fetch', message_uid, '(RFC822)')

        raw_email = message_data[0][1]
        email_message = email.message_from_bytes(raw_email)
        return email_message

    def read_next(self):
        """Read the next email from the email server."""
        import imaplib
        try:
            self.connection.select()

            if not self._unread_ids:
                search = "SINCE {0:%d-%b-%Y}".format(datetime.date.today())
                if self._last_id is not None:
                    search = "UID {}:*".format(self._last_id)

                _, data = self.connection.uid("search", None, search)
                self._unread_ids = deque(data[0].split())

            while self._unread_ids:
                message_uid = self._unread_ids.popleft()
                if self._last_id is None or int(message_uid) > self._last_id:
                    self._last_id = int(message_uid)
                    return self._fetch_message(message_uid)

        except imaplib.IMAP4.error:
            _LOGGER.info(
                "Connection to %s lost, attempting to reconnect", self._server)
            try:
                self.connect()
            except imaplib.IMAP4.error:
                _LOGGER.error("Failed to reconnect")


class EmailContentSensor(Entity):
    """Representation of an EMail sensor."""

    def __init__(self, hass, email_reader, name, allowed_senders,
                 value_template):
        """Initialize the sensor."""
        self.hass = hass
        self._email_reader = email_reader
        self._name = name
        self._allowed_senders = [sender.upper() for sender in allowed_senders]
        self._value_template = value_template
        self._last_id = None
        self._message = None
        self._state_attributes = None
        self.connected = self._email_reader.connect()

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def state(self):
        """Return the current email state."""
        return self._message

    @property
    def device_state_attributes(self):
        """Return other state attributes for the message."""
        return self._state_attributes

    def render_template(self, email_message):
        """Render the message template."""
        variables = {
            ATTR_FROM: EmailContentSensor.get_msg_sender(email_message),
            ATTR_SUBJECT: EmailContentSensor.get_msg_subject(email_message),
            ATTR_DATE: email_message['Date'],
            ATTR_BODY: EmailContentSensor.get_msg_text(email_message)
        }
        return self._value_template.render(variables)

    def sender_allowed(self, email_message):
        """Check if the sender is in the allowed senders list."""
        return EmailContentSensor.get_msg_sender(email_message).upper() in (
            sender for sender in self._allowed_senders)

    @staticmethod
    def get_msg_sender(email_message):
        """Get the parsed message sender from the email."""
        return str(email.utils.parseaddr(email_message['From'])[1])

    @staticmethod
    def get_msg_subject(email_message):
        """Decode the message subject."""
        decoded_header = email.header.decode_header(email_message['Subject'])
        header = email.header.make_header(decoded_header)
        return str(header)

    @staticmethod
    def get_msg_text(email_message):
        """
        Get the message text from the email.

        Will look for text/plain or use text/html if not found.
        """
        message_text = None
        message_html = None
        message_untyped_text = None

        for part in email_message.walk():
            # custom_changes: use the declared charset, default to UTF-8
            charset = part.get_content_charset() or 'utf-8'
            if part.get_content_type() == CONTENT_TYPE_TEXT_PLAIN:
                if message_text is None:
                    # custom_changes: decode=True undoes the transfer
                    # encoding (quoted-printable or base64)
                    message_text = str(
                        part.get_payload(decode=True), charset, "ignore")
            elif part.get_content_type() == 'text/html':
                if message_html is None:
                    message_html = part.get_payload()
            elif part.get_content_type().startswith('text'):
                if message_untyped_text is None:
                    # custom_changes: same decoding as for text/plain
                    message_untyped_text = str(
                        part.get_payload(decode=True), charset, "ignore")

        if message_text is not None:
            # custom_changes: replace CRLF line breaks with spaces
            message_text = re.sub(r"\r\n", " ", message_text)
            # custom_changes: strip accents and other non-ASCII characters
            message_text = normalize(
                'NFKD', message_text).encode('ASCII', 'ignore').decode('ascii')
            return message_text

        if message_html is not None:
            return message_html

        if message_untyped_text is not None:
            # custom_changes: replace CRLF line breaks with spaces
            message_untyped_text = re.sub(r"\r\n", " ", message_untyped_text)
            # custom_changes: strip accents and other non-ASCII characters
            message_untyped_text = normalize(
                'NFKD', message_untyped_text).encode(
                    'ASCII', 'ignore').decode('ascii')
            return message_untyped_text

        return email_message.get_payload()

    def update(self):
        """Read emails and publish state change."""
        email_message = self._email_reader.read_next()

        if email_message is None:
            return

        if self.sender_allowed(email_message):
            message_body = EmailContentSensor.get_msg_text(email_message)

            if self._value_template is not None:
                message_body = self.render_template(email_message)

            self._message = message_body
            self._state_attributes = {
                ATTR_FROM:
                    EmailContentSensor.get_msg_sender(email_message),
                ATTR_SUBJECT:
                    EmailContentSensor.get_msg_subject(email_message),
                ATTR_DATE:
                    email_message['Date']
            }
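
The key change in the sensor above is calling `get_payload(decode=True)` and then decoding the resulting bytes with the part's charset. Here is a minimal standalone sketch of that idea outside Home Assistant. The helpers `decode_part` and `build_raw`, and the `camera@example.com` address, are made up for illustration and are not part of the sensor:

```python
import base64
import email
import quopri


def decode_part(part):
    """Return the text of a MIME part (hypothetical helper)."""
    charset = part.get_content_charset() or "utf-8"
    # decode=True undoes quoted-printable or base64 and returns bytes.
    return part.get_payload(decode=True).decode(charset, "ignore")


def build_raw(body, transfer_encoding):
    """Build a minimal raw email for this demonstration (hypothetical)."""
    headers = (
        b"From: camera@example.com\r\n"
        b"Subject: Alarm\r\n"
        b"MIME-Version: 1.0\r\n"
        b"Content-Type: text/plain; charset=\"utf-8\"\r\n"
        b"Content-Transfer-Encoding: " + transfer_encoding + b"\r\n\r\n"
    )
    return headers + body


text = "Horário do inicio do alarme"
payload = text.encode("utf-8")

# The same text sent once as quoted-printable and once as base64.
qp_message = email.message_from_bytes(
    build_raw(quopri.encodestring(payload), b"quoted-printable"))
b64_message = email.message_from_bytes(
    build_raw(base64.b64encode(payload), b"base64"))

print(qp_message.get_payload())   # still transfer-encoded
print(decode_part(qp_message))    # readable text
print(decode_part(b64_message))   # readable text
```

Because `decode=True` looks at the Content-Transfer-Encoding header, the same helper should cover both the quoted-printable case from the first post and base64-encoded bodies.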

Hi, this is really interesting.
In my case the email body was base64 encoded, but I don't know how to modify the code to decode it.

I added:
import base64

Then I changed
message_html = part.get_payload()
to
message_html = base64.b64decode(part.get_payload())

That didn't work.

I also tried changing

if message_html is not None:
    return message_html

to

if message_html is not None:
    message.html = base64.b64decode(message_html)
    return message_html

That didn't work either.

It should be simple, but I am doing something wrong. I am not a coder; I probably need to change a string or something, I just don't know where.
Thanks

OK, got it.

I needed to change the return line to this:

return base64.b64decode(message_text).decode('utf-8')

done

Great! Glad it is working for you.

Hi Fabio,

Please give more info, such as where to put the code. I'm at the point where I get this:

from: [email protected]
subject: Test
date: Tue, 11 May 2021 15:42:08 +0200
body: QWxhcm0gRXZlbnQ6IFRlc3QNCkFsYXJtIElucHV0IENoYW5uZWwgTm8uOiANCkFsYXJtIElucHV0
IENoYW5uZWwgTmFtZTogDQpBbGFybSBTdGFydCBUaW1lIChEL00vWSBIOk06Uyk6IDExLzA1LzIw
MjEgMTU6NDI6MDgNCkFsYXJtIERldmljZSBOYW1lOiBYVlINCkFsYXJtIE5hbWU6IA0KU2VuZGVy
IElQIEFkZHJlc3M6IDE5Mi4xNjguMC4xMCAoMDowOjA6OjApDQo=

friendly_name: nvr_motion
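
To check what such a body contains, it can be decoded by hand. This is a small sketch using the standard library `base64` module on the body posted above. It assumes the decoded bytes are UTF-8 text:

```python
import base64

# The body attribute from the post above, one string per line.
body = (
    "QWxhcm0gRXZlbnQ6IFRlc3QNCkFsYXJtIElucHV0IENoYW5uZWwgTm8uOiANCkFsYXJtIElucHV0\n"
    "IENoYW5uZWwgTmFtZTogDQpBbGFybSBTdGFydCBUaW1lIChEL00vWSBIOk06Uyk6IDExLzA1LzIw\n"
    "MjEgMTU6NDI6MDgNCkFsYXJtIERldmljZSBOYW1lOiBYVlINCkFsYXJtIE5hbWU6IA0KU2VuZGVy\n"
    "IElQIEFkZHJlc3M6IDE5Mi4xNjguMC4xMCAoMDowOjA6OjApDQo=\n"
)

# b64decode discards the line breaks by default (validate=False).
decoded = base64.b64decode(body).decode("utf-8")

for line in decoded.splitlines():
    print(line)
```

The decoded text is the alarm message itself, which confirms the body only needs a base64 decoding step in the sensor.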

I don't use it anymore; my alarm panel now has an integration.
But you need to load the IMAP sensor as a custom component; then you can change whichever files you want.

Download this folder: core/homeassistant/components/imap_email_content at master · home-assistant/core · GitHub

Then place the "imap_email_content" folder in your "config\custom_components" folder, and it will be loaded as a custom component.
The sensor.py file is the one you can change.

Thank you. One more question, I hope: which line did you change?

"needed this line to change"

No idea anymore, that was 2 years ago.
If I read my earlier replies correctly, it was probably this line, around line 230:

return message_text

to

return base64.b64decode(message_text).decode('utf-8')