Sending KNX telegrams via appdaemon

Sad, I am so close to finishing this. What I do not understand in the integration where to look for is the lines of code which connect the KNX bus with xknx and travel calculator.

My appdaemon code is pretty straight forward. I have put in the cover element and KNX address. However not the KNX actor is handling this, but the app daemon code.

Let me share what I have so far for others to make use of it. Perhaps we can make this happen. There must be somewhere an example of a successful usage of the ´knx.send´ service from within appdaemon.

Here is the change in the knx section

knx:
  ...
  expose:
    - type: 'percentU8'
      entity_id: cover.arbeiten
      attribute: position
      address: '1/3/8'
  ...
  fire_event: true
  fire_event_filter:
    - "1/3/1-6"     # Rolladen Ergeschoss Lang/Kurz Objekte
    - "2/3/1-4"     # Rolladen Obergeschoss Lang/Kurz Objekte
    - "1/3/7-12"    # Rolladen Ergeschoss Status Objekte
    - "2/3/9-12"    # Rolladen Obergeschoss Status Objekte

Here is the knx cover item

  - platform: knx
    name: "Arbeiten"
    move_short_address: '1/3/1'
    move_long_address: '1/3/2'
    position_address: '1/3/7'       # My actor has now position adress.
    position_state_address: '1/3/8' # Same here. Both of these will be taken care by app daemon
    travelling_time_down: 12
    travelling_time_up: 12

Here is my appdaemon app in file cover_update_position.py

import appdaemon.plugins.hass.hassapi as hass
import json
from datetime import datetime, timedelta

class CoverUpdatePosition(hass.Hass):

    def initialize(self):
        self.listen_event(self.on_state_change, "knx_event", address = self.args["move_long_address"])
        self.listen_event(self.on_stop,         "knx_event", address = self.args["move_short_address"])
        self.listen_event(self.on_set_position, "knx_event", address = self.args["position_address"])

        self.currentPosition     = 1.0                # 0.0 to 1.0 representing 1.0 is top, 0.0 is botton
        self.timestampLastChange = self.datetime()    # ts in milliseconds of last Motion
        self.directionLastMotion = 0                  # Direction -1 = down; +1 = up

    # The following 5 routines take care of all updating the self.currentPosition by
    # just listening to the STOP/UP/DOWN command on the knx bus

    def on_state_change(self, event_name, data, *kwargs):
        self.update_position()
        self.directionLastMotion = -1 if data['data'] == 1 else +1 
        self.handle = self.run_in(self.timeout_handler, self.args["travelling_time"] + 2)

    def on_stop(self, event_name, data, *kwargs):
        self.update_position()
        self.directionLastMotion = 0 

    def timeout_handler(self, *kwargs):
        self.update_position()
        self.directionLastMotion = 0

    def update_position(self):
        self.currentPosition = (
            max(
                min(
                    1.0, 
                    self.currentPosition + 
                        self.directionLastMotion * 
                        (self.datetime() - self.timestampLastChange).total_seconds() / 
                        self.args["travelling_time"]
                ), 
                0.0)
            )
        self.timestampLastChange = self.datetime()
        self.log('Updating position to %f ' % (self.currentPosition))
        self.cover_set_position()

    # Here the newly calculated position is sent to the KNX position adress.
    # the actuall knx.send service is not working yet

    def cover_set_position(self, *kwargs):
        data = {"address": self.args["position_address"], "data": [int( 255.0 * self.currentPosition )]}
        self.log("setting Position %f" %(self.currentPosition))
        # self.call_service("knx/send", data = data)

    # The following 4 routines take care for setting the position to be translated
    # into proper UP/DOWN/STOP commands to respect the requried changes.

    def on_set_position(self, event_name, data, *kwargs):
        desiredPosition  = data['data'][0] / 255
        desiredDirection = +1 if desiredPosition - self.currentPosition > 0 else -1
        desiredTime      = abs(desiredPosition - self.currentPosition) * self.args["travelling_time"]
        self.log('sending SET notification with (from %f to %f direction %d time %f)' % (self.currentPosition, desiredPosition, desiredDirection, desiredTime))
        self.cover_down() if desiredDirection == -1 else self.cover_up()
        self.handle = self.run_in(self.cover_stop, desiredTime)

    # Here the UP/DOWN/STOP commands is sent to the KNX position adress.
    # the actuall knx.send service is not working yet

    def cover_stop(self, *kwargs):
        self.log('send STOP')
        data = {"address": self.args["move_short_address"], "data": [1]}
        # self.call_service("knx/send", data = data)

    def cover_up(self, *kwargs):
        self.log('send UP')
        data = {"address": self.args["move_long_address"], "data": [0]}
        # self.call_service("knx/send", data = data)

    def cover_down(self, *kwargs):
        self.log('send DOWN')
        data = {"address": self.args["move_long_address"], "data": [1]}
        # self.call_service("knx/send", data = data)

and this is the actual definition of the app in apps.yaml.

cup_arbeiten:
  module: cover_update_position
  class: CoverUpdatePosition
  move_short_address: '1/3/1'
  move_long_address: '1/3/2'
  position_address: '1/3/7'
  travelling_time: 12

I hope someone finds this useful and helps me to get this working. I am happy to assist.