Add python script to custom_scripts in project root so AWX can find it..

This commit is contained in:
2025-08-07 13:47:00 -05:00
parent 099dcf7e61
commit eae894e681

250
custom_scripts/linode_inventory.py Executable file → Normal file
View File

@@ -1,82 +1,208 @@
#!/usr/bin/env python3
import os
import sys
import json
import requests
import sys
import os
import urllib.request
import urllib.error
from argparse import ArgumentParser
API_URL = "https://api.linode.com/v4/linode/instances"
TOKEN = os.getenv("LINODE_TOKEN")
class LinodeInventory:
def __init__(self):
self.api_token = os.environ.get('LINODE_API_TOKEN')
if not self.api_token:
raise ValueError("LINODE_API_TOKEN environment variable is required")
if not TOKEN:
print("ERROR: LINODE_TOKEN environment variable not set", file=sys.stderr)
sys.exit(1)
self.base_url = "https://api.linode.com/v4"
headers = {
"Authorization": f"Bearer {TOKEN}",
"Content-Type": "application/json"
}
def make_request(self, endpoint):
"""Make HTTP request to Linode API using urllib"""
url = f"{self.base_url}{endpoint}"
req = urllib.request.Request(url)
req.add_header('Authorization', f'Bearer {self.api_token}')
req.add_header('Content-Type', 'application/json')
def get_linode_instances():
instances = []
page = 1
while True:
r = requests.get(API_URL, headers=headers, params={"page": page})
if r.status_code != 200:
print(f"ERROR: Failed to fetch Linodes (status code {r.status_code})", file=sys.stderr)
sys.exit(1)
data = r.json()
instances.extend(data["data"])
if not data["pages"] or page >= data["pages"]:
break
page += 1
return instances
try:
with urllib.request.urlopen(req, timeout=30) as response:
data = response.read().decode('utf-8')
return json.loads(data)
except urllib.error.HTTPError as e:
error_msg = e.read().decode('utf-8') if e.fp else str(e)
raise Exception(f"HTTP Error {e.code}: {error_msg}")
except urllib.error.URLError as e:
raise Exception(f"URL Error: {e}")
except Exception as e:
raise Exception(f"Request failed: {e}")
def build_inventory():
instances = get_linode_instances()
def get_instances(self):
"""Fetch all Linode instances"""
try:
# Get first page to check pagination
response = self.make_request("/linode/instances?page=1")
instances = response['data']
inventory = {
"_meta": {
"hostvars": {}
}
}
# Handle pagination if there are more pages
if response.get('pages', 1) > 1:
for page in range(2, response['pages'] + 1):
page_response = self.make_request(f"/linode/instances?page={page}")
instances.extend(page_response['data'])
for linode in instances:
label = linode["label"]
ipv4 = linode["ipv4"][0] if linode["ipv4"] else None
region = linode["region"]
tags = linode.get("tags", [])
return instances
except Exception as e:
print(f"Error fetching instances: {e}", file=sys.stderr)
return []
if not ipv4:
continue
inventory["_meta"]["hostvars"][label] = {
"ansible_host": ipv4,
"linode_id": linode["id"],
"region": region,
"tags": tags,
"type": linode["type"]
def generate_inventory(self):
"""Generate Ansible inventory from Linode instances"""
inventory = {
'_meta': {
'hostvars': {}
},
'all': {
'children': ['ungrouped']
},
'ungrouped': {
'hosts': []
}
}
# Group by region
region_group = f"region_{region}"
inventory.setdefault(region_group, {"hosts": []})["hosts"].append(label)
# Group definitions
regions = {}
types = {}
statuses = {}
# Group by tag
for tag in tags:
tag_group = f"tag_{tag}"
inventory.setdefault(tag_group, {"hosts": []})["hosts"].append(label)
try:
instances = self.get_instances()
return inventory
if not instances:
print("No instances found or API request failed", file=sys.stderr)
return inventory
for instance in instances:
# Use Linode label as hostname
hostname = instance['label']
# Get primary IPv4 address
ipv4_addresses = instance.get('ipv4', [])
primary_ip = ipv4_addresses[0] if ipv4_addresses else None
if not primary_ip:
print(f"Warning: No IPv4 address found for {hostname}", file=sys.stderr)
continue
# Add to ungrouped hosts
inventory['ungrouped']['hosts'].append(hostname)
# Host variables
linode_tags = instance.get('tags', [])
inventory['_meta']['hostvars'][hostname] = {
'ansible_host': primary_ip,
'ansible_user': 'phlux', # Set default SSH user
'linode_id': instance['id'],
'linode_label': instance['label'],
'linode_region': instance['region'],
'linode_type': instance['type'],
'linode_status': instance['status'],
'linode_ipv4': instance.get('ipv4', []),
'linode_ipv6': instance.get('ipv6'),
'linode_tags': linode_tags,
'linode_specs': instance.get('specs', {}),
'linode_hypervisor': instance.get('hypervisor'),
'linode_created': instance.get('created'),
'linode_updated': instance.get('updated'),
'linode_group': instance.get('group', ''),
'linode_image': instance.get('image', {}).get('id', '') if isinstance(instance.get('image'), dict) else '',
'linode_backups': instance.get('backups', {}).get('enabled', False) if isinstance(instance.get('backups'), dict) else False,
# Add individual tag variables for easy access
'is_debian': 'Debian' in linode_tags,
'is_ubuntu': 'Ubuntu' in linode_tags,
'is_k3s': 'k3s' in linode_tags,
'is_control_plane': 'control-plane' in linode_tags,
'is_worker_node': 'worker-node' in linode_tags,
# Tag string for easy filtering
'tag_string': ','.join(linode_tags).lower(),
}
# Group by region
region_group = f"region_{instance['region'].replace('-', '_')}"
if region_group not in regions:
regions[region_group] = {'hosts': []}
regions[region_group]['hosts'].append(hostname)
# Group by instance type
type_group = f"type_{instance['type'].replace('-', '_').replace('.', '_')}"
if type_group not in types:
types[type_group] = {'hosts': []}
types[type_group]['hosts'].append(hostname)
# Group by status
status_group = f"status_{instance['status']}"
if status_group not in statuses:
statuses[status_group] = {'hosts': []}
statuses[status_group]['hosts'].append(hostname)
# Group by tags
for tag in instance.get('tags', []):
tag_group = f"tag_{tag.replace('-', '_').replace(' ', '_').replace('.', '_')}"
if tag_group not in inventory:
inventory[tag_group] = {'hosts': []}
inventory[tag_group]['hosts'].append(hostname)
# Group by Linode group (if set)
if instance.get('group'):
group_name = f"group_{instance['group'].replace('-', '_').replace(' ', '_')}"
if group_name not in inventory:
inventory[group_name] = {'hosts': []}
inventory[group_name]['hosts'].append(hostname)
except Exception as e:
print(f"Error generating inventory: {e}", file=sys.stderr)
return inventory
# Add all groups to inventory
inventory.update(regions)
inventory.update(types)
inventory.update(statuses)
# Add group children to 'all'
all_groups = list(regions.keys()) + list(types.keys()) + list(statuses.keys())
tag_groups = [k for k in inventory.keys() if k.startswith('tag_')]
group_groups = [k for k in inventory.keys() if k.startswith('group_')]
all_groups.extend(tag_groups)
all_groups.extend(group_groups)
if all_groups:
inventory['all']['children'].extend(all_groups)
return inventory
def get_host_vars(self, hostname):
"""Get variables for a specific host"""
inventory = self.generate_inventory()
return inventory['_meta']['hostvars'].get(hostname, {})
def main():
if len(sys.argv) == 2 and sys.argv[1] == "--list":
print(json.dumps(build_inventory(), indent=2))
elif len(sys.argv) == 2 and sys.argv[1] == "--host":
print(json.dumps({})) # Not used
else:
print("Usage: linode_inventory.py --list", file=sys.stderr)
parser = ArgumentParser(description='Linode Dynamic Inventory')
parser.add_argument('--list', action='store_true', help='List all hosts')
parser.add_argument('--host', help='Get variables for specific host')
args = parser.parse_args()
try:
inventory = LinodeInventory()
if args.list:
result = inventory.generate_inventory()
print(json.dumps(result, indent=2))
elif args.host:
result = inventory.get_host_vars(args.host)
print(json.dumps(result, indent=2))
else:
parser.print_help()
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
if __name__ == '__main__':
main()