#!/usr/bin/env python3
"""Ansible dynamic inventory script backed by the Linode API v4.

The API token is read from the ``LINODE_API_TOKEN`` environment variable.
``--list`` prints the full inventory as JSON; ``--host NAME`` prints the
variables recorded for a single host.
"""

import json
import sys
import os
import urllib.request
import urllib.error
from argparse import ArgumentParser


class LinodeInventory:
    """Build an Ansible inventory dictionary from Linode instances."""

    def __init__(self):
        """Read the API token from the environment.

        Raises:
            ValueError: If ``LINODE_API_TOKEN`` is unset or empty.
        """
        self.api_token = os.environ.get('LINODE_API_TOKEN')
        if not self.api_token:
            raise ValueError("LINODE_API_TOKEN environment variable is required")

        self.base_url = "https://api.linode.com/v4"

    def make_request(self, endpoint):
        """Make an authenticated GET request to the Linode API using urllib.

        Args:
            endpoint: Path (including any query string) appended to
                ``self.base_url``.

        Returns:
            The response body decoded as UTF-8 and parsed as JSON.

        Raises:
            Exception: On an HTTP error status, a URL/connection error, or any
                other failure while reading or parsing the response. The
                original exception is attached as ``__cause__``.
        """
        url = f"{self.base_url}{endpoint}"

        req = urllib.request.Request(url)
        req.add_header('Authorization', f'Bearer {self.api_token}')
        req.add_header('Content-Type', 'application/json')

        try:
            with urllib.request.urlopen(req, timeout=30) as response:
                data = response.read().decode('utf-8')
                return json.loads(data)
        except urllib.error.HTTPError as e:
            # HTTPError must be handled before URLError (it is a subclass).
            # errors='replace' keeps a non-UTF-8 error body from masking the
            # real HTTP failure with a UnicodeDecodeError.
            if e.fp:
                error_msg = e.read().decode('utf-8', errors='replace')
            else:
                error_msg = str(e)
            raise Exception(f"HTTP Error {e.code}: {error_msg}") from e
        except urllib.error.URLError as e:
            raise Exception(f"URL Error: {e}") from e
        except Exception as e:
            raise Exception(f"Request failed: {e}") from e

    def get_instances(self):
        """Fetch all Linode instances, following pagination.

        Returns:
            A list of instance dictionaries. If any request fails, the error
            is reported on stderr and an empty list is returned, so a partial
            page set is never treated as the full inventory.
        """
        try:
            # The first page tells us how many pages exist in total.
            response = self.make_request("/linode/instances?page=1")
            instances = response['data']

            total_pages = response.get('pages', 1)
            for page in range(2, total_pages + 1):
                page_response = self.make_request(f"/linode/instances?page={page}")
                instances.extend(page_response['data'])

            return instances
        except Exception as e:
            print(f"Error fetching instances: {e}", file=sys.stderr)
            return []

    @staticmethod
    def _group_name(prefix, value, unsafe_chars=''):
        """Return ``prefix_value`` with each char in *unsafe_chars* replaced by ``_``."""
        for char in unsafe_chars:
            value = value.replace(char, '_')
        return f"{prefix}_{value}"

    @staticmethod
    def _image_id(image):
        """Return the image identifier, or ``''`` when no image is set.

        The value is used as-is when it is a string; a mapping with an ``id``
        key is also accepted for backward compatibility.
        """
        if not image:
            return ''
        if isinstance(image, dict):
            return image.get('id', '')
        return image

    def _build_hostvars(self, instance, primary_ip):
        """Return the host variable dictionary for one instance.

        Raises:
            KeyError: If a required field (``id``, ``label``, ``region``,
                ``type`` or ``status``) is missing from *instance*.
        """
        # 'backups' may be absent or null; treat both as "not enabled".
        backups = instance.get('backups') or {}
        return {
            'ansible_host': primary_ip,
            'linode_id': instance['id'],
            'linode_label': instance['label'],
            'linode_region': instance['region'],
            'linode_type': instance['type'],
            'linode_status': instance['status'],
            'linode_ipv4': instance.get('ipv4', []),
            'linode_ipv6': instance.get('ipv6'),
            'linode_tags': instance.get('tags', []),
            'linode_specs': instance.get('specs', {}),
            'linode_hypervisor': instance.get('hypervisor'),
            'linode_created': instance.get('created'),
            'linode_updated': instance.get('updated'),
            'linode_group': instance.get('group', ''),
            'linode_image': self._image_id(instance.get('image')),
            'linode_backups': backups.get('enabled', False),
        }

    def generate_inventory(self):
        """Generate an Ansible inventory from Linode instances.

        Every instance with an IPv4 address is listed under ``ungrouped`` and
        under one group each for its region, type and status, plus one group
        per tag and one for its legacy ``group`` value when set. Instances
        without an IPv4 address, or with missing/malformed required fields,
        are skipped with a warning on stderr; the remaining hosts are still
        fully grouped.

        Returns:
            The inventory dictionary (``_meta``, ``all``, ``ungrouped`` and
            the generated groups).
        """
        inventory = {
            '_meta': {'hostvars': {}},
            'all': {'children': ['ungrouped']},
            'ungrouped': {'hosts': []},
        }

        instances = self.get_instances()
        if not instances:
            print("No instances found or API request failed", file=sys.stderr)
            return inventory

        # One mapping per category, so the order of 'all' children is
        # explicit rather than inferred from key prefixes.
        regions = {}
        types = {}
        statuses = {}
        tags = {}
        legacy_groups = {}

        for instance in instances:
            # Compute everything for the instance first and only then commit
            # it, so a bad record never leaves a half-registered host.
            try:
                hostname = instance['label']

                ipv4_addresses = instance.get('ipv4') or []
                if not ipv4_addresses:
                    print(f"Warning: No IPv4 address found for {hostname}", file=sys.stderr)
                    continue

                hostvars = self._build_hostvars(instance, ipv4_addresses[0])
                region_group = self._group_name('region', instance['region'], '-')
                type_group = self._group_name('type', instance['type'], '-.')
                status_group = self._group_name('status', instance['status'])
                tag_groups = [
                    self._group_name('tag', tag, '- .')
                    for tag in instance.get('tags') or []
                ]
                legacy_group = None
                if instance.get('group'):
                    legacy_group = self._group_name('group', instance['group'], '- ')
            except (KeyError, AttributeError, TypeError) as e:
                print(f"Warning: Skipping malformed instance record: {e!r}", file=sys.stderr)
                continue

            inventory['ungrouped']['hosts'].append(hostname)
            inventory['_meta']['hostvars'][hostname] = hostvars
            regions.setdefault(region_group, {'hosts': []})['hosts'].append(hostname)
            types.setdefault(type_group, {'hosts': []})['hosts'].append(hostname)
            statuses.setdefault(status_group, {'hosts': []})['hosts'].append(hostname)
            for tag_group in tag_groups:
                tags.setdefault(tag_group, {'hosts': []})['hosts'].append(hostname)
            if legacy_group is not None:
                legacy_groups.setdefault(legacy_group, {'hosts': []})['hosts'].append(hostname)

        # Register the groups and list them as children of 'all' in a stable
        # order: regions, types, statuses, tags, legacy groups.
        for category in (regions, types, statuses, tags, legacy_groups):
            inventory.update(category)
            inventory['all']['children'].extend(category)

        return inventory

    def get_host_vars(self, hostname):
        """Get variables for a specific host.

        Regenerates the full inventory and returns the entry for *hostname*,
        or an empty dictionary when the host is not present.
        """
        inventory = self.generate_inventory()
        return inventory['_meta']['hostvars'].get(hostname, {})


def main():
    """Parse command-line arguments and print the requested JSON.

    Prints help when neither ``--list`` nor ``--host`` is given. Any error is
    reported on stderr and the process exits with status 1.
    """
    parser = ArgumentParser(description='Linode Dynamic Inventory')
    parser.add_argument('--list', action='store_true', help='List all hosts')
    parser.add_argument('--host', help='Get variables for specific host')

    args = parser.parse_args()

    try:
        inventory = LinodeInventory()

        if args.list:
            result = inventory.generate_inventory()
            print(json.dumps(result, indent=2))
        elif args.host:
            result = inventory.get_host_vars(args.host)
            print(json.dumps(result, indent=2))
        else:
            parser.print_help()
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()