Skip to content

Automation Patterns

Automation scales your operations.
Once you have basic scripting down, you need patterns for making scripts reliable, repeatable, and robust.

Idempotency

An idempotent operation produces the same result no matter how many times you run it:

# BAD - creates duplicate content on re-run
echo "config line" >> /etc/service.conf

# GOOD - checks first, only adds if missing
# -x requires the whole line to match and -F treats the text literally, so a
# longer line that merely contains "config line", or regex metacharacters in
# the text, cannot make the check pass by accident.
grep -qxF "config line" /etc/service.conf || echo "config line" >> /etc/service.conf

Logging

import logging

# Route every record at INFO or above to a log file and to a stream handler,
# both sharing one timestamped format.
log_handlers = [
    logging.FileHandler('automation.log'),
    logging.StreamHandler(),
]
logging.basicConfig(
    handlers=log_handlers,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Sample records at increasing severity, emitted through the root logger.
logging.info("Starting automation run")
logging.warning("Non-critical issue detected")
logging.error("Critical failure, aborting")

Retry Logic

import time
from functools import wraps

def retry(max_attempts=3, delay=1, exceptions=(Exception,)):
    """Return a decorator that re-runs a function when it raises.

    The wrapped function is called up to ``max_attempts`` times. After a
    failed attempt that is not the last one, a warning is logged and the
    wrapper sleeps ``delay * attempt_number`` seconds (linear backoff)
    before trying again. The exception raised by the final attempt
    propagates to the caller unchanged.

    Args:
        max_attempts: Total number of calls to make, including the first.
            Must be at least 1.
        delay: Base sleep in seconds; the wait after attempt ``n`` is
            ``delay * n``.
        exceptions: Exception class, or tuple of classes, that trigger a
            retry. Any other exception propagates immediately. Defaults to
            ``Exception``, i.e. every ordinary error is retried.

    Returns:
        A decorator that wraps a callable with the retry behaviour.

    Raises:
        ValueError: If ``max_attempts`` is less than 1. Without this check
            the wrapped function would never be called and the wrapper
            would silently return ``None``.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Attempts are numbered from 1 so the number can be used directly
            # in the log message and as the backoff multiplier.
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    if attempt == max_attempts:
                        # Out of attempts: surface the original error.
                        raise
                    logging.warning("Attempt %d failed: %s", attempt, exc)
                    time.sleep(delay * attempt)
        return wrapper
    return decorator

@retry(max_attempts=3, delay=2)
def fetch_data(url):
    """Send a GET request to ``url`` with a 5-second timeout and return the response.

    The function is decorated with ``retry(max_attempts=3, delay=2)``, so a
    request that raises is attempted up to three times in total.

    NOTE(review): ``requests`` is not imported in the visible part of this
    file -- confirm it is imported before this function is called.
    """
    response = requests.get(url, timeout=5)
    return response

Configuration Management

# config.yaml
# Each entry under "targets" pairs one host address with a list of port numbers.
targets:
  - host: 192.168.1.1
    ports: [22, 80, 443]
  - host: 192.168.1.2
    ports: [3306, 5432]

import yaml
# Decode the file as UTF-8 explicitly; without an encoding argument open()
# uses the platform's locale encoding, so the same file could parse
# differently from one machine to the next.
with open('config.yaml', encoding='utf-8') as f:
    # safe_load only constructs plain data types (dicts, lists, scalars).
    config = yaml.safe_load(f)

Error Handling Pattern

import sys

def run_command(cmd, timeout=30):
    """Run a command with a timeout and return its stripped standard output.

    Args:
        cmd: Either a command string, which is passed to the shell (so
            pipes, globs and redirections work), or a sequence of
            arguments, which is executed directly without a shell.
            Prefer the sequence form whenever any part of the command
            comes from outside the script: a shell string built from
            untrusted text allows command injection.
        timeout: Maximum number of seconds to wait for the command.

    Returns:
        The command's stdout with surrounding whitespace stripped, or
        ``None`` if the command exited non-zero, timed out, or could not
        be started. Every failure is logged at ERROR level.
    """
    import subprocess

    # Only string commands go through the shell. With shell=True a list is
    # mis-executed on POSIX (only the first element is treated as the
    # command), so argument sequences are run directly instead.
    use_shell = isinstance(cmd, (str, bytes))
    try:
        result = subprocess.run(
            cmd, shell=use_shell, capture_output=True,
            text=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        logging.error("Command timed out after %ss", timeout)
        return None
    except OSError as exc:
        # Raised when the program itself cannot be launched, e.g. a missing
        # executable in the no-shell case.
        logging.error("Command could not be started: %s", exc)
        return None

    if result.returncode != 0:
        logging.error("Command failed: %s", result.stderr)
        return None
    return result.stdout.strip()

Parallel Execution

from concurrent.futures import ThreadPoolExecutor, as_completed

def scan_target(target):
    """Check every port of one target and report which ones are open.

    Args:
        target: A ``(host, ports)`` pair, where ``ports`` is an iterable of
            port numbers to check.

    Returns:
        A ``(host, open_ports)`` tuple. ``open_ports`` lists, in their
        original order, the ports for which ``check_port(host, port)``
        returned a truthy value.
    """
    address, candidate_ports = target
    reachable = [port for port in candidate_ports if check_port(address, port)]
    return address, reachable

targets = [('192.168.1.1', [22, 80]), ('192.168.1.2', [443, 8080])]

with ThreadPoolExecutor(max_workers=10) as executor:
    # Map each future back to its target so a failure can be attributed
    # to the host that caused it.
    futures = {executor.submit(scan_target, t): t for t in targets}
    # as_completed yields futures as they finish, not in submission order.
    for future in as_completed(futures):
        try:
            host, ports = future.result()
        except Exception as exc:
            # result() re-raises whatever scan_target raised in the worker.
            # Log it and continue so one failing host does not discard the
            # results of the others.
            failed_host = futures[future][0]
            logging.error("Scan of %s failed: %s", failed_host, exc)
            continue
        print(f"{host}: open ports {ports}")

Scheduling

# Cron - every day at 2AM
# Fields: minute, hour, day of month, month, day of week, then the command.
0 2 * * * /path/to/script.sh

# Systemd timer
[Unit]
Description=Weekly Security Scan

[Timer]
# "weekly" is systemd's shorthand for Monday at 00:00.
OnCalendar=weekly
# Persistent=true: a run missed while the timer was inactive (e.g. the machine
# was off) is triggered as soon as the timer is activated again.
Persistent=true

[Install]
# Enabling the unit attaches it to timers.target.
WantedBy=timers.target