Automation Patterns¶
Automation scales your operations
Once you have basic scripting down you need patterns for making scripts reliable , repeatable , and robust
Idempotency
An idempotent operation produces the same result no matter how many times you run it:
# BAD - creates duplicate content on re-run
echo "config line" >> /etc/service.conf
# GOOD - checks first, only adds if missing
grep -q "config line" /etc/service.conf || echo "config line" >> /etc/service.conf
Logging
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('automation.log'),
logging.StreamHandler()
]
)
logging.info("Starting automation run")
logging.warning("Non-critical issue detected")
logging.error("Critical failure, aborting")
Retry Logic
import time
from functools import wraps
def retry(max_attempts=3, delay=1):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts - 1:
raise
logging.warning(f"Attempt {attempt + 1} failed: {e}")
time.sleep(delay * (attempt + 1))
return None
return wrapper
return decorator
@retry(max_attempts=3, delay=2)
def fetch_data(url):
return requests.get(url, timeout=5)
Configuration Management
# config.yaml
targets:
- host: 192.168.1.1
ports: [22, 80, 443]
- host: 192.168.1.2
ports: [3306, 5432]
import yaml
with open('config.yaml') as f:
config = yaml.safe_load(f)
Error Handling Pattern
import sys
def run_command(cmd, timeout=30):
"""Run shell command with timeout and error handling"""
import subprocess
try:
result = subprocess.run(
cmd, shell=True, capture_output=True,
text=True, timeout=timeout
)
if result.returncode != 0:
logging.error(f"Command failed: {result.stderr}")
return None
return result.stdout.strip()
except subprocess.TimeoutExpired:
logging.error(f"Command timed out after {timeout}s")
return None
Parallel Execution
from concurrent.futures import ThreadPoolExecutor, as_completed
def scan_target(target):
host, ports = target
open_ports = []
for port in ports:
if check_port(host, port):
open_ports.append(port)
return host, open_ports
targets = [('192.168.1.1', [22, 80]), ('192.168.1.2', [443, 8080])]
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(scan_target, t): t for t in targets}
for future in as_completed(futures):
host, ports = future.result()
print(f"{host}: open ports {ports}")
Scheduling
# Cron - every day at 2AM
0 2 * * * /path/to/script.sh
# Systemd timer
[Unit]
Description=Weekly Security Scan
[Timer]
OnCalendar=weekly
Persistent=true
[Install]
WantedBy=timers.target