Custom integration using requests_html

I’ve been stuck for so long now, so i’m asking for help. I’m working on making a scraping sensor using requests_html as the site i’m scraping ha js content. My current working file is a bit of a mess, but I keep trying to change and move functions around to get it to scrape. Initially I tried to do it non-async, i get the html and can view it in the logs, but the command r.html.render(sleep=2) keeps resulting in RuntimeError: There is no current event loop in thread 'SyncWorker_4'. So now I’ve tried to changing it to be async, but it’s still the same line that fails, however now with a different error: RuntimeError: Blocking calls must be done in the executor or a separate thread; Use `await hass.async_add_executor_job()` at custom_components.euronext.sensor.py, line 129: await self._response.html.arender()
This is the code i’m currently working on, it’s a bit of a mess:

##############################
# configuration.yaml example #
##############################

# sensor:
  # - platform: euronext
    # funds:
      # - NO0010582984.DKGLBIX-WOMF
      # - NO0010337959.DIUSA-WOMF
      # - NO0010582976.DKNORIX-WOMF

from datetime import timedelta
import logging

from requests_html import AsyncHTMLSession
import voluptuous as vol

from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
    ATTR_ATTRIBUTION,
    CONF_SCAN_INTERVAL,
    CONF_CURRENCY
)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity

_LOGGER = logging.getLogger(__name__)

ATTRIBUTION = "Data provided by Euronext"

CONF_FUNDS = 'funds'

DEFAULT_SCAN_INTERVAL = timedelta(minutes=1)


PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
    {
        vol.Required(CONF_FUNDS): vol.All(cv.ensure_list),
        vol.Optional(CONF_SCAN_INTERVAL, default=DEFAULT_SCAN_INTERVAL): cv.time_period
    }
)


# def scrape_data(input):
    # try:
        # _LOGGER.debug('scraping......')
        # session = requests_html.AsyncHTMLSession()
        # response = session.get('https://live.euronext.com/en/product/funds/'+input)
        # _LOGGER.debug(response.html.html)
        # response.html.render(sleep=2)
        # return response, session
    # except AttributeError:
        # print('AttributeError for %s, please check configuration', input)


async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
    '''Setup the Euronext sensor. '''
    _LOGGER.debug('setting up......')
    funds = config.get(CONF_FUNDS)
    unit_of_measurement = config.get(CONF_CURRENCY, 'kr')
    _LOGGER.info("List of funds: %s", funds)
    for fund in funds:
        _LOGGER.info("for-loop fund: %s", fund)
        try:
            _LOGGER.info('Configuring fund %s', fund)
            if ' ' not in fund:
                # session = AsyncHTMLSession()
                async_add_entities([EuronextSensor(fund, unit_of_measurement)], True)
            else:
                _LOGGER.error('Values for "fund:" can not contain spaces, found "%s"', fund)
        except ValueError:
            _LOGGER.error('Error loading fund %s, please check config', fund)

    _LOGGER.info('Setup of funds complete')


class EuronextSensor(Entity):
    '''Representation of the Euronext Fond sensor.'''
    def __init__(self, fund, unit_of_measurement):
        self._fund = fund
        self._asession = None
        self._state = None
        self._unique_id = None
        self._name = ""
        self._icon = "mdi:timer-sand-empty"
        self._unit_of_measurement = unit_of_measurement
        self._attrs = {ATTR_ATTRIBUTION: ATTRIBUTION}
        # self._response = None

    @property
    def name(self):
        '''Return the name of the sensor.'''
        return self._name

    @property
    def state(self):
        '''Return the state of the device.'''
        return self._state

    @property
    def unique_id(self):
        '''Return the unique ID.'''
        return self._unique_id

    @property
    def unit_of_measurement(self):
        '''Return the unit of measurement of this entity.'''
        return self._unit_of_measurement

    @property
    def state_attributes(self):
        '''Return the state attributes.'''
        return self._attrs


    @property
    def icon(self):
        '''Return icon to use based on preformance.'''
        return self._icon

    async def async_update(self):
        # _LOGGER.debug('updating......')
        _LOGGER.info('Requesting new data for %s', self._fund)
        
        _LOGGER.debug('scraping......')
        self._asession = AsyncHTMLSession()
        self._response = await self._asession.get('https://live.euronext.com/en/product/funds/'+self._fund)
        # _LOGGER.debug(self._response.html.html)
        # _LOGGER.debug('scraping......')
        # response = session.get('https://live.euronext.com/en/product/funds/'+input)
        # _LOGGER.debug(self._response.html.html)
        await self._response.html.arender(sleep=2)
        # self._response = scrape_data(self._fund)
        # _LOGGER.info('Data updated for fund %s (%s)', self._fund, self._response.html.find('h1', first=True).text)
        self._name = self._response.html.find('h1', first=True).text
        _LOGGER.debug(self._name)
        self._state = self._response.html.find('#header-instrument-price', first=True).text.replace(',', '.').replace(' ', '')
        self._unique_id = self._response.html.find('.enx-symbol-top-custom', first=True).text
        
        date = self._response.html.find('#fs_fund_nav_block tr:nth-of-type(3) td:nth-of-type(3)', first=True).text
        historical = self._response.html.find('#fs_fund_historical_prices_block table', first=True)
        historical = [[td.text for td in tr.find('td')[:-2]] for tr in historical.find('tr')][2:]
        self._attrs = {item[0]: item[1].replace(',', '.') + ' %' for item in historical if item[1] != '-'}
        self._attrs['Dato'] = f'{date[8:10]}.{date[5:7]}.{date[0:4]}'
        
        iconval = float(response.html.find('.data-24', first=True).text.replace(',', '.'))
        self._icon = 'mdi:trending-up' if iconval > 0 else 'mdi:trending-down' if iconval < 0 else 'mdi:trending-neutral'

I have no experience with this specific library, but have a small amount of experience with the concepts. I may not have terminology exactly right.

When you make code that can plug-in to another application, you want to be sure that when the app calls into your code your code doesn’t hold up the main application. One way this might be done is to run the main application in a separate thread from the plugins. If a plugin thread gets blocked, it doesn’t affect the main app.

Threading relies on the Operating System to share time between the different threads. The OS may at any moment pause one thread and swap it in for another, but for a single-threaded plugin the code doesn’t really have to be aware that it may be paused and resumed. This is your non-async function.

Another way you could do this is to have one thread run an event loop. This event loop can manage several coroutines. A coroutine is function written in a special way that allows it to co-exist with other functions on the same thread. Functions written in this special way are marked as async.

These async functions make use of special functions to ‘give up’ their time slot and allow other coroutines to run. An async function is one which declares it is aware that it must give up time and not rely on the OS threading to swap tasks.

They give up time by either yielding or awaiting. A yield simply says “check if any other coroutines want to run on the event loop then resume me.” An await starts a new co-routine to run the function you are awaiting, and says “I yield my time until this new co-routine finishes.”

This is why you cannot call an async function from a normal thread. There is no event loop to handle the yielding or awaiting that the function has declared it can do.


So, taking a look at your code.

# def scrape_data(input):
    # try:
        # _LOGGER.debug('scraping......')
        # session = requests_html.AsyncHTMLSession()
        # response = session.get('https://live.euronext.com/en/product/funds/'+input)
        # _LOGGER.debug(response.html.html)
        # response.html.render(sleep=2)
        # return response, session
    # except AttributeError:
        # print('AttributeError for %s, please check configuration', input)

Here your function is not marked async. You create an AsyncHTMLSession. Judging by the name, there is a pretty good chance that it is going to give you async functions, so you’re gonna need an event loop. If you want to call this from a normal thread, perhaps there is non-async HTMLSession object you could be creating instead?

        self._asession = AsyncHTMLSession()
        self._response = await self._asession.get('https://live.euronext.com/en/product/funds/'+self._fund)
        # _LOGGER.debug(self._response.html.html)
        # _LOGGER.debug('scraping......')
        # response = session.get('https://live.euronext.com/en/product/funds/'+input)
        # _LOGGER.debug(self._response.html.html)
        await self._response.html.arender(sleep=2)
        # self._response = scrape_data(self._fund)
        # _LOGGER.info('Data updated for fund %s (%s)', self._fund, self._response.html.find('h1', first=True).text)
        self._name = self._response.html.find('h1', first=True).text
        _LOGGER.debug(self._name)
        self._state = self._response.html.find('#header-instrument-price', first=True).text.replace(',', '.').replace(' ', '')
        self._unique_id = self._response.html.find('.enx-symbol-top-custom', first=True).text
        

Here we are within an async function, and… well, it looks okay to me. I would expect that error message if you called ‘await’ on a non-async function, but you called arender() from an AsyncHTMLSession. I can’t imagine why a name like that wouldn’t be an async function. So I can’t right off tell you why you are getting that error. But, what it is suggestion you do is use:

await hass.async_add_executor_job()

I’d bet that async_add_executor_job will take a non-async function, spin off a new thread to run the function, and then await that thread finishing. That is, it will call a non-async function (which you pass it) in a way that can be awaited. I would expect ‘arender’ to already be this way. But since it apparently is not, I believe it is suggesting you call

await hass.async_add_executor_job( self._response.html.arender )

Notice I passed in ‘arender’ and not ‘arender()’. I need to pass it the function to call. If I passed arender(), that would instead call the function and pass the result, not the function itself. Passing the function like this would work if you had no parameters to pass in, but since you want to pass in ‘sleep’ we need a workaround. There’s a few ways to do this. I’d try something like:

await hass.async_add_executor_job( lambda: self._response.html.arender(sleep=2) )

This (hopefully) works by essentially creating a new, nameless and parameter-less function (the lambda function) and returning that address. From within that a function you make the real call with the parameters. ‘async_add_executor_job’ gets to call a function with no parameters, you get to pass the parameters where it matters, everyone is happy.


I have no idea why your code isn’t working, but maybe my rambling will give you some insight to fix the problem.

Thank you for your response. I have the same perception about async_add_executor_job(), which is wierd as the .arender() is supposedly the async way of rendering the target, as opposed to .render() which is for non-async fuction calls. I also find it wierd (Maybe because of my lacking knowledge about async) that i get the RuntimeError: There is no current event loop in thread 'SyncWorker_4' when i try to do it synchronous. I mean, i shouldn’t have to deal with event loops unless it async, right?

Adding your suggestion of lambda to that equation get’s rid of that error, but it doesn’t seem to apply the sleep arguement. I have a _LOGGER.info(self._response.html.html) before and after that render function, and they show up in the log just 1 second aparat, even though i have the sleep argument set to 5 secs, and the html looks the same.

I have figured out ways of doing this differently, but I have not done it yet as I don’t like it and want to make this work. I also know that requests_html has to download the web browser, and maybe that get’s interrupted due to the integration failing. I don’t know if trying a different approach might be easier, such as replacing requests_html with selenium could be worth a shot, or maybe other ways. Perhaps if I can figure out how to use the drupal/ajax api call, as it seems that is how Euronext get it’s data.

I also find it wierd (Maybe because of my lacking knowledge about async) that i get the RuntimeError: There is no current event loop in thread 'SyncWorker_4' when i try to do it synchronous. I mean, i shouldn’t have to deal with event loops unless it async, right?

It might still be async. You created it from an AsyncHTMLSession object:

session = requests_html.AsyncHTMLSession()
response = session.get('https://live.euronext.com/en/product/funds/'+input)
response.html.render(sleep=2)

Since you had an AsyncHTMLSession, it is possible that everything that you get from that (the response, and then the HTML from the response) is async as well. If you used

session = requests_html.HTMLSession()

maybe everything that is created by session will now be synchronous? You’d think with names like ‘render’ and ‘arender’ that one would be async, one would not, but maybe it really just matters what kind of HTMLSession made it.

I have a _LOGGER.info(self._response.html.html) before and after that render function, and they show up in the log just 1 second aparat, even though i have the sleep argument set to 5 secs, and the html looks the same.

What if you logged something else besides self._response.html.html? Is it possible for it to provide HTML without rendering? I’m wondering if maybe this log statement is causing some behind-the-scenes rendering, which then causes the actual ‘render’ call to just nope out and say “I already did that”. If you just logged “TEST” would there be a proper delay at the wait?

Just to take the relevant documentation real quick:

>>> from requests_html import HTMLSession
>>> session = HTMLSession()
>>> r = session.get('https://python.org/')
>>> r.html.render()
>>> from requests_html import AsyncHTMLSession
>>> asession = AsyncHTMLSession()
>>> r = await asession.get('https://python.org/')
>>> await r.html.arender()

I did change the bits around when i changed from synchronous to async. I did set up a test environment and had it working in PyCharm, but I needed to add the sleep=2 argument to the render for it to fully render. That was on a few years old midrange laptop, i’m running the HA dev setup on a Pi 3B+, so I tried adding more time due to the weaker hardware.

I do receive the HTML, but only the empty template without the DOM elements filled in by JavaScript, that’s what the render function does. I think that putting arender within a lambda within the await hass.async_add_executor_job(), it awaits the hass-function to be executed, but not the nested content inside of it.

  • If I keep the full line with lambda, I get the next error which is related to the selector as the content has not been rendered.
  • If I keep hass.async_add_executor_job without lambda, i get TypeError: 'coroutine' object is not callable.
  • If I only keep await self._response.html.arender(sleep=5), i get RuntimeError: Blocking calls must be done in the executor or a separate thread; Use `await hass.async_add_executor_job()` at custom_components/euronext/sensor.py, line 149: await self._response.html.arender(sleep=5).

However, I noticed something intersting in that log: INFO (MainThread) [pyppeteer.chromium_downloader] Starting Chromium download. So that might be it, i need to let HA allow me to download chromium for this to work.

I have now once again gone back to sync, making sure it’s all “correct”, but I still get RuntimeError: There is no current event loop in thread 'SyncWorker_2'

Any luck with this? I’m trying to do the same thing and everything seems to be working async apart from the Chrome download.
Looking at the pyppeteer code it appears the chromium download process itself is not async, so this might be bricking the whole script.

Unfortunately no, I took a different approach with my custom_component and use the Morningstar source instead of Euronext. There i can use aiohttp instead of requests_html, but I don’t like it as that source is much slower to update. It helps me improve my Python skills though and keeps me going. I hope to come back to this project at a later stage and finalize it. I can’t remember from the top of my head, but there are ways to call non-async functions from an async component, but we might have to break down requests_html and run the pyppeteer functions directly…