Octopus Energy Agile Tariff

Here’s my script for working out the cheapest n-block cost, which I call octoplan.py. Seems to do broadly the same as @badguy’s octoblock but without needing appdaemon. It’s a bit less “integrated” as a result, of course: it’s read with a command_line sensor to give the cheapest times display as per my screenshot above; and also queried with a bash script (run via a shell_command automation) that writes the immersion start time to a text file that HA reads with another command_line sensor. Clunky, but I understand what it’s doing.

The from and to arguments need to be in recognizable time formats, and it’ll probably fail in all sorts of horrible ways if you feed it invalid arguments.
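
One way to make bad arguments fail more politely is to validate them while argparse is parsing. This is only a rough sketch, not what octoplan.py does: it assumes ISO 8601 input and swaps in the standard library's datetime.fromisoformat for dateutil's more forgiving parser. The local_time helper and the from_ts/to_ts destination names are made up for illustration.

```python
import argparse
import datetime


def local_time(text):
    """Hypothetical argparse type: ISO 8601 string -> aware local datetime."""
    try:
        parsed = datetime.datetime.fromisoformat(text)
    except ValueError:
        raise argparse.ArgumentTypeError("not an ISO 8601 time: %r" % text)
    # astimezone() with no argument treats a naive datetime as local time
    return parsed.astimezone()


argparser = argparse.ArgumentParser(description='Finds cheapest Agile chunks')
argparser.add_argument('-f', '--from', dest='from_ts', type=local_time)
argparser.add_argument('-t', '--to', dest='to_ts', type=local_time)

# Parse a fixed argument list here rather than sys.argv, purely as a demo
args = argparser.parse_args(['--from', '2020-01-01T23:00',
                             '--to', '2020-01-02T07:00'])
print(args.from_ts.isoformat(), args.to_ts.isoformat())
```

With a type function like this, an unparseable time makes argparse print a usage message and exit instead of throwing a traceback further down the script.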

My most recent addition is the --latest flag, which I use for the immersion schedule. If there are two or more periods with exactly the same cost in the time span being evaluated, the default is to return the earliest. If you specify --latest, it’ll instead return the latest block, which makes more sense for an overnight immersion run.
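
The tie-breaking can be shown on its own with a small sketch. The cheapest_index function and the prices are made up for illustration, but the reversed-list trick is the same one the script uses further down.

```python
def cheapest_index(costs, latest=False):
    """Index of the minimum cost; on ties pick the earliest or the latest."""
    lowest = min(costs)
    if latest:
        # search the reversed list, then map back to the original index
        return len(costs) - 1 - costs[::-1].index(lowest)
    return costs.index(lowest)


costs = [9.5, 4.2, 7.0, 4.2, 8.1]   # made-up prices in p/kWh
print(cheapest_index(costs))               # 1
print(cheapest_index(costs, latest=True))  # 3
```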

The first line refers to octopus_lib. This is client.py by Octopus’s David Winterbottom (from here), which I have downloaded and renamed to octopus_lib.py to give me half a chance of remembering what it is in three years’ time. You will need to ensure you have installed the Python requests library as well as the imports below (dateutil is installed as py-dateutil).

The final section of the script generates output, a list of time/cost dictionaries giving the cheapest start time for each averaging period, from 30m in output[0] to 4h in output[7] (so output[i] covers i+1 half-hours). The final line prints out the JSON for 30m, 1h, 2h and 4h only, but you can select others if needed.
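
For anyone who finds the matrix hard to follow, the same idea can be sketched as a sliding-window average. This is a simplified illustration with made-up prices and a hypothetical window_averages helper, not a drop-in replacement for the script.

```python
def window_averages(costs, blocks):
    """Average cost of every run of `blocks` consecutive half-hours."""
    return [round(sum(costs[i:i + blocks]) / blocks, 2)
            for i in range(len(costs) - blocks + 1)]


costs = [10.0, 6.0, 4.0, 8.0, 12.0]   # made-up half-hourly prices
for blocks in (1, 2, 4):
    averages = window_averages(costs, blocks)
    start = averages.index(min(averages))   # earliest cheapest start slot
    print("%.1fh: start at slot %d, average %.2f"
          % (blocks * 0.5, start, averages[start]))
```

If fewer than `blocks` half-hours are available, the list of averages comes back empty, so a real script needs to check for that before calling min().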

If you use it and it breaks, you get to keep all the pieces. Please be considerate in querying Octopus’s API: I run this half-hourly via an automation trigger (see earlier posts).

from octopus_lib import APIClient                                               
import argparse                                                                 
import datetime                                                                 
import json                                                                     
from dateutil import parser                                                     
from dateutil.tz import tzlocal                                                 

APIKEY = 'YOUR_API_KEY'
REGIONCODE = 'E'                 # replace with your region code

argparser = argparse.ArgumentParser(description='Finds cheapest Agile chunks')  

argparser.add_argument('-f',                                                    
                       '--from',                                                
                       help='Start time for search',                            
                       required=False)                                          
argparser.add_argument('-t',                                                    
                       '--to',                                                  
                       help='End time for search',                              
                       required=False)                                          
argparser.add_argument('-l',                                                    
                       '--latest',                                              
                       help='Find latest rather than earliest occurrence',      
                       action='store_true',                                     
                       required=False)                                          

args = vars(argparser.parse_args())                                             

from_ts = None                                                                  
to_ts = None                                                                    

if args['from'] is not None:                                                    
    from_ts = parser.parse(args['from']).astimezone(tzlocal())                  

if args['to'] is not None:                                                      
    to_ts = parser.parse(args['to']).astimezone(tzlocal())                      

latest = args['latest']                                                         

# pull in the rates                                                             
octo = APIClient(APIKEY)                                                        
now = datetime.datetime.now()                                                   
rates = octo.agile_tariff_unit_rates(REGIONCODE, period_from=now)                      
# API returns rates as latest-first, fix that...                                
rates['results'].reverse()                                                      

# Extract costs and times into lists                                            
costs = []                                                                      
times = []                                                                      
for result in rates['results']:                                                 
    period_str = result['valid_from']                                           
    time_ts = parser.parse(timestr=period_str).astimezone(tzlocal())            
    if from_ts is not None and time_ts < from_ts:                               
        # start specified and not yet reached: continue searching               
        continue                                                                
    if to_ts is not None and time_ts >= to_ts:                                  
        # end specified and reached: break out                                  
        break                                                                   
    times.append(time_ts)                                                       
    costs.append(result['value_inc_vat'])                                       

# Build a matrix of half-hour periods (vertical) and 1-8 averaging periods      
# (horizontal) (so second column is hour average costs)                         
cost_matrix = []                                                                

for x in range(len(costs)):                                                     
    cost_row = [costs[x]]                                                       
    for y in range(1, 8):                                                       
        if len(costs) - x < y + 1:                                              
            # table ends before averaging period                                
            cost_row.append(None)                                               
        else:                                                                   
            # average of this half-hour and the y half-hours that follow it
            cost_total = sum(costs[x:x + y + 1])
            cost_row.append(round(cost_total / (y + 1), 2))
    cost_matrix.append(cost_row)                                                

output = []                                                                     
for x in range(8):                                                              
    # Build column, find minimum                                                
    cost_col = []                                                               
    for y in range(len(costs)):                                                 
        if cost_matrix[y][x] is not None:                                       
            cost_col.append(cost_matrix[y][x])                                  

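    if not cost_col:
        # too few half-hours in range for this averaging period:
        # emit a JSON null rather than crashing on min() of an empty list
        output.append(None)
        continue

    if latest:
        mindex = len(cost_col) - 1 - cost_col[::-1].index(min(cost_col))
    else:
        mindex = cost_col.index(min(cost_col))

    # None entries only occur at the tail of a column, so mindex lines up
    # with the times list
    output.append({"time": "%s" % times[mindex].isoformat(),
                   "cost": "%.2f" % cost_col[mindex]})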

# output a list for cheapest 30m, 1h, 2h and 4h                                 
print(json.dumps([output[0], output[1], output[3], output[7]]))                                                     