The usual caveats. This worked (or seemed to work so far!) for me, use it at your own risk, but it speeded up an annoying job. Made with Claude
Usage is:
python3 convert.py configuration.yaml
#!/usr/bin/env python3
"""
YAML Template Legacy Configuration Migrator (Text-based)
Automatically migrates legacy 'platform: template' definitions to modern
template syntax while preserving original YAML formatting.
"""
import re
import sys
import shutil
from pathlib import Path
from typing import Dict, List, Tuple, Any
from datetime import datetime
from collections import defaultdict
import yaml
class TextBasedTemplateMigrator:
"""Migrates legacy templates using text manipulation to preserve formatting"""
# Mapping for domains that don't follow simple pluralization
DOMAIN_KEY_MAP = {
'binary_sensor': 'sensors', # binary_sensor uses 'sensors:' not 'binary_sensors:'
'switch': 'switches',
}
def __init__(self, yaml_file_path: str):
self.file_path = Path(yaml_file_path)
self.original_content = None
self.modified_content = None
self.warnings = []
self.migrations = []
self.backup_file = None
self.legacy_blocks = []
self.original_entity_count = 0
self.converted_entity_count = 0
def load_file(self) -> bool:
"""Load the original file"""
try:
with open(self.file_path, 'r') as f:
self.original_content = f.read()
self.modified_content = self.original_content
return True
except FileNotFoundError:
self.warnings.append(f"File not found: {self.file_path}")
return False
def create_backup(self) -> bool:
"""Create a backup of the original file"""
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = self.file_path.parent / f"{self.file_path.stem}_backup_{timestamp}.yaml"
shutil.copy2(self.file_path, backup_path)
self.backup_file = backup_path
return True
except Exception as e:
self.warnings.append(f"Failed to create backup: {e}")
return False
def parse_yaml_for_extraction(self) -> Dict[str, List[Dict]]:
"""Parse YAML to extract legacy templates (for logic, not output)"""
try:
loader = yaml.SafeLoader
loader.add_constructor('!include', lambda loader, node: f"!include {node.value}")
loader.add_constructor('!secret', lambda loader, node: f"!secret {node.value}")
loader.add_multi_constructor('!', lambda loader, suffix, node: f"!{suffix}")
config = yaml.load(self.original_content, Loader=loader)
except:
self.warnings.append("Could not parse YAML for extraction")
return {}
legacy_templates = defaultdict(list)
if not config:
return legacy_templates
for domain, domain_config in config.items():
if not isinstance(domain_config, list):
domain_config = [domain_config] if domain_config else []
for item in domain_config:
if not isinstance(item, dict):
continue
if item.get('platform') == 'template':
# Get the correct key for nested entities (handle special cases like binary_sensor)
entity_key = self.DOMAIN_KEY_MAP.get(domain, f'{domain}s')
# Format 1: Nested entities under domain plural key (e.g., sensors:, locks:)
if entity_key in item and isinstance(item[entity_key], dict):
for entity_name, entity_config in item[entity_key].items():
if isinstance(entity_config, dict):
legacy_templates[domain].append({
'entity_name': entity_name,
'config': entity_config
})
# Format 2: Direct entity with name at top level
elif 'name' in item:
# Generate entity_id from name
entity_name = item['name'].lower().replace(' ', '_').replace('-', '_')
# Remove special characters
entity_name = re.sub(r'[^a-z0-9_]', '', entity_name)
# Create config without platform and name keys
entity_config = {k: v for k, v in item.items() if k not in ['platform', 'name']}
entity_config['friendly_name'] = item['name']
legacy_templates[domain].append({
'entity_name': entity_name,
'config': entity_config
})
return legacy_templates
def find_and_remove_legacy_blocks(self) -> bool:
"""Find and remove legacy platform: template blocks from text"""
lines = self.modified_content.split('\n')
new_lines = []
i = 0
in_template_block = False
template_block_start = None
list_item_indent = None
while i < len(lines):
line = lines[i]
current_indent = len(line) - len(line.lstrip())
stripped = line.strip()
# Check if this line starts a platform: template block
if re.match(r'^\s*-\s+platform:\s+template\s*$', line):
# Start of a template block
in_template_block = True
template_block_start = i
# The indent of the '-' character
list_item_indent = current_indent
# Store info about what we're removing
self.legacy_blocks.append({
'text': line,
'lines': (i, i)
})
i += 1
continue
# If we're in a template block, check if we've reached the end
if in_template_block:
# End conditions:
# 1. Empty line followed by another list item at same level
# 2. A new list item at the same or lower indent level
# 3. A new top-level key (no indent, ends with :)
# 4. Line with indent at or less than the list item indent (unless it's empty)
if stripped == '':
# Empty line - could be end or just spacing
new_lines.append(line)
i += 1
continue
elif current_indent <= list_item_indent:
# We've outdented back to or past the list item level - end of block
in_template_block = False
template_block_start = None
list_item_indent = None
# Don't skip this line, process it normally
new_lines.append(line)
i += 1
continue
else:
# Still inside the template block, skip this line
i += 1
continue
# Normal processing - keep the line
new_lines.append(line)
i += 1
self.modified_content = '\n'.join(new_lines)
return len(self.legacy_blocks) > 0
def generate_modern_templates(self, legacy_templates: Dict[str, List[Dict]]) -> str:
"""Generate modern template YAML"""
if not legacy_templates:
return ""
lines = []
lines.append("template:")
for domain in sorted(legacy_templates.keys()):
entities = legacy_templates[domain]
lines.append(f"- {domain}:")
for entity in entities:
entity_name = entity['entity_name']
config = entity['config']
lines.append(f" - default_entity_id: {domain}.{entity_name}")
# Quote friendly_name if it contains special YAML characters
friendly_name = config.get('friendly_name', entity_name)
if ':' in friendly_name or '{' in friendly_name or '[' in friendly_name:
lines.append(f" name: '{friendly_name}'")
else:
lines.append(f" name: {friendly_name}")
# Handle state/value_template
value_template = config.get('value_template', '')
if value_template:
# Check if it's multi-line
if '\n' in value_template:
# Use literal block scalar for multi-line
lines.append(f" state: |")
# Indent the template
for template_line in value_template.split('\n'):
lines.append(f" {template_line}")
else:
# Check if template has single quotes - if so, use double quotes and escape
if "'" in value_template and '"' not in value_template:
# Use double quotes and escape any internal double quotes
escaped = value_template.replace('"', '\\"')
lines.append(f' state: "{escaped}"')
elif "'" in value_template and '"' in value_template:
# Has both - use literal block scalar
lines.append(f" state: |")
lines.append(f" {value_template}")
else:
# Safe to use single quotes
lines.append(f" state: '{value_template}'")
# Add other attributes
skip_keys = {'friendly_name', 'value_template', 'platform'}
for key in sorted(config.keys()):
if key not in skip_keys:
value = config[key]
if isinstance(value, str):
lines.append(f" {key}: {value}")
elif isinstance(value, bool):
lines.append(f" {key}: {str(value).lower()}")
elif isinstance(value, dict):
# Handle nested dictionaries (like lock/unlock actions)
lines.append(f" {key}:")
self._add_nested_dict(lines, value, indent=6)
elif isinstance(value, list):
# Handle lists
lines.append(f" {key}:")
for item in value:
if isinstance(item, dict):
lines.append(f" -")
self._add_nested_dict(lines, item, indent=8)
else:
lines.append(f" - {item}")
else:
lines.append(f" {key}: {value}")
self.migrations.extend([
{
'domain': domain,
'entity_name': e['entity_name'],
'status': 'converted'
}
for e in entities
])
return '\n'.join(lines)
def _add_nested_dict(self, lines: List[str], d: dict, indent: int):
"""Helper to add nested dictionary content with proper indentation"""
indent_str = ' ' * indent
for key, value in d.items():
if isinstance(value, dict):
lines.append(f"{indent_str}{key}:")
self._add_nested_dict(lines, value, indent + 2)
elif isinstance(value, list):
lines.append(f"{indent_str}{key}:")
for item in value:
if isinstance(item, dict):
lines.append(f"{indent_str} -")
self._add_nested_dict(lines, item, indent + 4)
else:
lines.append(f"{indent_str} - {item}")
elif isinstance(value, str):
# Quote strings that might need it
if ':' in value or '{' in value or value.startswith('!'):
lines.append(f'{indent_str}{key}: "{value}"')
else:
lines.append(f"{indent_str}{key}: {value}")
elif isinstance(value, bool):
lines.append(f"{indent_str}{key}: {str(value).lower()}")
else:
lines.append(f"{indent_str}{key}: {value}")
def insert_modern_templates(self, modern_yaml: str) -> bool:
"""Insert modern templates into the config"""
if not modern_yaml:
return False
lines = self.modified_content.split('\n')
# Find existing template: section
template_idx = None
for i, line in enumerate(lines):
if re.match(r'^template:\s*$', line):
template_idx = i
break
if template_idx is not None:
# Find where the template section ends
insert_idx = template_idx + 1
while insert_idx < len(lines):
line = lines[insert_idx]
# Stop if we hit a new top-level key (not indented)
if line.strip() and not line.startswith(' '):
break
insert_idx += 1
# Remove the 'template:' line from our generated YAML
generated_lines = modern_yaml.split('\n')
if generated_lines[0] == 'template:':
generated_lines = generated_lines[1:]
# Insert the new content before the next top-level section
new_lines = lines[:insert_idx] + generated_lines + [''] + lines[insert_idx:]
self.modified_content = '\n'.join(new_lines)
else:
# No existing template section, insert at end
self.modified_content = self.modified_content.rstrip() + '\n\n' + modern_yaml + '\n'
return True
def count_jinja_templates(self, content: str) -> int:
"""Count Jinja2 template occurrences ({{...}}) as a sanity check"""
# Find all {{ ... }} patterns, handling multiline
pattern = r'\{\{[^}]*\}\}'
matches = re.findall(pattern, content, re.DOTALL)
return len(matches)
def count_legacy_platform_templates(self, content: str) -> int:
"""Count legacy 'platform: template' blocks"""
count = 0
lines = content.split('\n')
for i, line in enumerate(lines):
# Match both list style and dict style
if re.search(r'(-\s+)?platform:\s+template\s*$', line):
count += 1
return count
def validate_conversion(self) -> bool:
"""Validate that all templates were converted"""
# Count entities that were migrated
self.converted_entity_count = len(self.migrations)
# Count Jinja2 templates before and after (sanity check)
original_jinja_count = self.count_jinja_templates(self.original_content)
modified_jinja_count = self.count_jinja_templates(self.modified_content)
# Count legacy blocks that were found
legacy_block_count = len(self.legacy_blocks)
# Validation checks
validation_passed = True
# Check 1: Did we find and convert the expected number of entities?
if self.converted_entity_count != self.original_entity_count:
self.warnings.append(
f"Entity count mismatch: Found {self.original_entity_count} legacy entities "
f"but only converted {self.converted_entity_count}"
)
validation_passed = False
# Check 2: Sanity check - Jinja2 template count should be similar (within reason)
# Allow some difference due to formatting changes or other templates in config
jinja_diff = abs(original_jinja_count - modified_jinja_count)
if jinja_diff > self.converted_entity_count * 2: # Allow ~2 templates per entity
self.warnings.append(
f"Large Jinja2 template count difference: Original had {original_jinja_count}, "
f"converted has {modified_jinja_count} (difference: {jinja_diff})"
)
# Check 3: Ensure we removed the legacy blocks
remaining_legacy = self.count_legacy_platform_templates(self.modified_content)
if remaining_legacy > 0:
self.warnings.append(
f"Warning: {remaining_legacy} legacy 'platform: template' blocks still remain in file"
)
validation_passed = False
return validation_passed
def migrate(self) -> Tuple[bool, str]:
"""Execute the full migration"""
if not self.load_file():
return False, "Failed to load file"
if not self.create_backup():
self.warnings.append("Warning: backup creation failed, continuing anyway")
# Parse to get legacy template info
legacy_templates = self.parse_yaml_for_extraction()
if not legacy_templates:
return False, "No legacy template definitions found"
# Count original entities found
self.original_entity_count = sum(len(entities) for entities in legacy_templates.values())
# Generate modern format
modern_yaml = self.generate_modern_templates(legacy_templates)
# Remove legacy blocks from text
self.find_and_remove_legacy_blocks()
# Insert modern templates
self.insert_modern_templates(modern_yaml)
# Validate the conversion
validation_passed = self.validate_conversion()
message = f"Migrated {len(self.migrations)} entities"
if not validation_passed:
message += " (with validation warnings - see report)"
return True, message
def write_config(self) -> bool:
"""Write the modified config back to file"""
try:
with open(self.file_path, 'w') as f:
f.write(self.modified_content)
return True
except Exception as e:
self.warnings.append(f"Failed to write config: {e}")
return False
def get_report(self) -> Dict[str, Any]:
"""Generate a migration report"""
return {
'file': str(self.file_path),
'backup': str(self.backup_file) if self.backup_file else None,
'migrations': self.migrations,
'total_migrated': len(self.migrations),
'original_count': self.original_entity_count,
'converted_count': self.converted_entity_count,
'warnings': self.warnings,
'success': len(self.warnings) == 0
}
def print_report(report: Dict[str, Any]):
"""Pretty print the migration report"""
print("\n" + "=" * 70)
print("YAML TEMPLATE MIGRATION REPORT")
print("=" * 70)
print(f"\nFile: {report['file']}")
print(f"Total Entities Migrated: {report['total_migrated']}")
# Validation info
print(f"\nđ VALIDATION:")
print(f" Original entities found: {report['original_count']}")
print(f" Entities converted: {report['converted_count']}")
if report['original_count'] == report['converted_count']:
print(f" â All entities successfully converted!")
else:
print(f" â ď¸ Mismatch detected!")
if report['backup']:
print(f"\nđž Backup Created: {report['backup']}")
if report['migrations']:
print(f"\nâ Successfully migrated {len(report['migrations'])} entities:")
by_domain = defaultdict(list)
for mig in report['migrations']:
by_domain[mig['domain']].append(mig['entity_name'])
for domain in sorted(by_domain.keys()):
entities = by_domain[domain]
print(f" {domain}: {len(entities)} entity/entities")
for entity in sorted(entities):
print(f" - {entity}")
if report['warnings']:
print(f"\nâ ď¸ WARNINGS ({len(report['warnings'])}):")
for warning in report['warnings']:
print(f" - {warning}")
else:
print("\nâ No warnings!")
print("\n" + "=" * 70)
def main():
if len(sys.argv) < 2:
print("Usage: python3 yaml_template_migrator.py <yaml_file> [--dry-run]")
print("\nExample:")
print(" python3 yaml_template_migrator.py configuration.yaml")
print(" python3 yaml_template_migrator.py configuration.yaml --dry-run")
sys.exit(1)
yaml_file = sys.argv[1]
dry_run = '--dry-run' in sys.argv
migrator = TextBasedTemplateMigrator(yaml_file)
success, message = migrator.migrate()
if not success:
print(f"â Migration failed: {message}")
sys.exit(1)
print(f"â {message}")
if not dry_run:
if migrator.write_config():
print("â Configuration file updated")
else:
print("â Failed to write configuration file")
sys.exit(1)
else:
print("(Dry run - no changes written)")
report = migrator.get_report()
print_report(report)
if __name__ == '__main__':
main()