Shell 与 Python 自动化运维:从重复操作到智能脚本的工程实践 Shell 与 Python 自动化运维从重复操作到智能脚本的工程实践一、运维自动化的核心价值消除重复减少人为失误运维工作中大量时间消耗在重复性操作上批量检查服务器状态、清理过期日志、同步配置文件、执行数据库备份。手动操作不仅效率低更容易出错——一个参数输错可能导致服务中断。自动化脚本的核心价值不是替代人工而是将易出错的手动操作转化为可审计、可重复的自动化流程。更深层的问题是知识沉淀。运维经验往往存在于个人脑中人员变动后知识流失。自动化脚本是运维知识的代码化载体——脚本中包含了操作步骤、错误处理、边界条件判断这些信息不会随人员变动而丢失。二、自动化运维脚本的架构设计flowchart TB subgraph 脚本分层 A[基础工具层br/SSH/SCP/HTTP 客户端] -- B[任务编排层br/并行执行 错误处理] B -- C[业务逻辑层br/巡检/清理/备份/部署] end subgraph 执行模式 C -- D1[单机执行br/本地脚本] C -- D2[批量执行br/Ansible / Parallel SSH] C -- D3[定时执行br/Cron / Airflow] end subgraph 输出与报告 D1 -- E[执行日志br/结构化 JSON] D2 -- E D3 -- E E -- F[报告生成br/HTML/Markdown] F -- G[告警通知br/IM/邮件] end style B fill:#f9f,stroke:#333 style E fill:#9ff,stroke:#333三、自动化运维脚本的核心实现3.1 批量巡检脚本#!/bin/bash # cluster-healthcheck.sh —— K8s 集群健康巡检脚本 set -euo pipefail REPORT_FILEhealthcheck-$(date %Y%m%d-%H%M%S).md CLUSTER_NAME${CLUSTER_NAME:-production} ALERT_WEBHOOK${ALERT_WEBHOOK:-} echo # 集群健康巡检报告 $REPORT_FILE echo **集群**: $CLUSTER_NAME $REPORT_FILE echo **时间**: $(date %Y-%m-%d %H:%M:%S) $REPORT_FILE echo $REPORT_FILE CRITICAL_ISSUES0 WARNING_ISSUES0 # 1. 节点状态检查 echo ## 1. 节点状态 $REPORT_FILE NOT_READY_NODES$(kubectl get nodes --no-headers | grep -v Ready || true) if [ -n $NOT_READY_NODES ]; then echo ❌ 以下节点不健康: $REPORT_FILE echo $REPORT_FILE echo $NOT_READY_NODES $REPORT_FILE echo $REPORT_FILE CRITICAL_ISSUES$((CRITICAL_ISSUES 1)) else echo ✅ 所有节点健康 $REPORT_FILE fi # 节点资源压力 echo $REPORT_FILE echo ### 节点资源使用 $REPORT_FILE echo | 节点 | CPU 请求 | CPU 限制 | 内存请求 | 内存限制 | 状态 | $REPORT_FILE echo | --- | --- | --- | --- | --- | --- | $REPORT_FILE kubectl get nodes -o json | jq -r .items[] | | \(.metadata.name) | \(.status.allocatable.cpu) | \(.status.allocatable.cpu) | \(.status.allocatable.memory) | \(.status.allocatable.memory) | \(.status.conditions[] | select(.typeReady) | .status) | $REPORT_FILE 2/dev/null || true # 2. Pod 状态检查 echo $REPORT_FILE echo ## 2. Pod 状态 $REPORT_FILE # CrashLoopBackOff CRASHING_PODS$(kubectl get pods -A --field-selectorstatus.phase!Running --no-headers 2/dev/null | grep -E CrashLoopBackOff|Error|OOMKilled || true) if [ -n $CRASHING_PODS ]; then echo ❌ 异常 Pod: $REPORT_FILE echo $REPORT_FILE echo $CRASHING_PODS $REPORT_FILE echo $REPORT_FILE CRITICAL_ISSUES$((CRITICAL_ISSUES 1)) else echo ✅ 所有 Pod 运行正常 $REPORT_FILE fi # 高重启次数 Pod HIGH_RESTART_PODS$(kubectl get pods -A --no-headers 2/dev/null | \ awk {if ($50 5) print $0} || true) if [ -n $HIGH_RESTART_PODS ]; then echo $REPORT_FILE echo ⚠️ 重启次数 5 的 Pod: $REPORT_FILE echo $REPORT_FILE echo $HIGH_RESTART_PODS $REPORT_FILE echo $REPORT_FILE WARNING_ISSUES$((WARNING_ISSUES 1)) fi # 3. 资源限制检查 echo $REPORT_FILE echo ## 3. 资源限制 $REPORT_FILE NO_LIMIT_PODS$(kubectl get pods -A -o json 2/dev/null | \ jq -r .items[] | select(.spec.containers[]?.resources.limits null) | \(.metadata.namespace)/\(.metadata.name) | sort -u || true) if [ -n $NO_LIMIT_PODS ]; then echo ⚠️ 以下 Pod 未设置资源限制: $REPORT_FILE echo $REPORT_FILE echo $NO_LIMIT_PODS $REPORT_FILE echo $REPORT_FILE WARNING_ISSUES$((WARNING_ISSUES 1)) else echo ✅ 所有 Pod 已设置资源限制 $REPORT_FILE fi # 4. PVC 磁盘使用检查 echo $REPORT_FILE echo ## 4. 存储使用 $REPORT_FILE HIGH_USAGE_PVCS$(kubectl get pods -A -o json 2/dev/null | \ jq -r .items[] | .metadata.namespace as $ns | .metadata.name as $pod | .spec.volumes[]? | select(.persistentVolumeClaim ! null) | \($ns)/\($pod): \(.persistentVolumeClaim.claimName) | sort -u || true) if [ -n $HIGH_USAGE_PVCS ]; then echo PVC 挂载情况: $REPORT_FILE echo $REPORT_FILE echo $HIGH_USAGE_PVCS $REPORT_FILE echo $REPORT_FILE fi # 5. 最近事件 echo $REPORT_FILE echo ## 5. 最近事件最近 1 小时 $REPORT_FILE RECENT_EVENTS$(kubectl get events -A --sort-by.lastTimestamp \ --field-selectortypeWarning --no-headers 2/dev/null | tail -20 || true) if [ -n $RECENT_EVENTS ]; then echo $REPORT_FILE echo $RECENT_EVENTS $REPORT_FILE echo $REPORT_FILE fi # 汇总 echo $REPORT_FILE echo --- $REPORT_FILE echo **汇总**: ❌ 严重问题: $CRITICAL_ISSUES | ⚠️ 警告: $WARNING_ISSUES $REPORT_FILE # 发送告警 if [ $CRITICAL_ISSUES -gt 0 ] [ -n $ALERT_WEBHOOK ]; then curl -s -X POST $ALERT_WEBHOOK \ -H Content-Type: application/json \ -d {\content\: \ 集群 $CLUSTER_NAME 巡检发现 $CRITICAL_ISSUES 个严重问题请查看报告\} fi echo echo 巡检完成报告已生成: $REPORT_FILE echo 严重问题: $CRITICAL_ISSUES | 警告: $WARNING_ISSUES3.2 Python 自动化运维框架#!/usr/bin/env python3 # ops_automation.py —— 运维自动化框架 import subprocess import json import time import logging from dataclasses import dataclass, field from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Callable, Optional logging.basicConfig( levellogging.INFO, format%(asctime)s [%(levelname)s] %(message)s ) logger logging.getLogger(__name__) dataclass class TaskResult: task_name: str success: bool output: str error: str duration_ms: float 0 dataclass class Task: name: str func: Callable[[], TaskResult] timeout: int 300 # 秒 retry: int 1 critical: bool True # 失败是否中断后续任务 class OpsAutomation: 运维自动化框架任务编排 并行执行 错误处理 def __init__(self, max_workers: int 4): self.tasks: list[Task] [] self.results: list[TaskResult] [] self.max_workers max_workers def add_task(self, task: Task) - None: self.tasks.append(task) def run_sequential(self) - list[TaskResult]: 顺序执行任务失败可中断 for task in self.tasks: result self._execute_task(task) self.results.append(result) if not result.success and task.critical: logger.error(f关键任务失败: {task.name}中止执行) break return self.results def run_parallel(self) - list[TaskResult]: 并行执行所有任务 with ThreadPoolExecutor(max_workersself.max_workers) as executor: futures { executor.submit(self._execute_task, task): task for task in self.tasks } for future in as_completed(futures): task futures[future] try: result future.result(timeouttask.timeout) self.results.append(result) except Exception as e: self.results.append(TaskResult( task_nametask.name, successFalse, errorstr(e), )) return self.results def _execute_task(self, task: Task) - TaskResult: 执行单个任务支持重试 last_error None for attempt in range(task.retry): try: start time.time() result task.func() result.duration_ms (time.time() - start) * 1000 if result.success: logger.info(f✓ {task.name} ({result.duration_ms:.0f}ms)) return result else: last_error result.error logger.warning(f✗ {task.name}: {result.error}) except Exception as e: last_error str(e) logger.warning(f✗ {task.name} 异常: {e}) if attempt task.retry - 1: wait 2 ** attempt logger.info(f 重试 {attempt 1}/{task.retry}等待 {wait}s) time.sleep(wait) return TaskResult( task_nametask.name, successFalse, errorlast_error or Unknown error, ) def generate_report(self) - str: 生成执行报告 success_count sum(1 for r in self.results if r.success) fail_count sum(1 for r in self.results if not r.success) report f# 运维自动化执行报告\n\n report f**执行时间**: {time.strftime(%Y-%m-%d %H:%M:%S)}\n report f**结果**: ✅ {success_count} 成功 | ❌ {fail_count} 失败\n\n report | 任务 | 状态 | 耗时 | 错误 |\n report | --- | --- | --- | --- |\n for r in self.results: status ✅ if r.success else ❌ error r.error[:50] if r.error else report f| {r.task_name} | {status} | {r.duration_ms:.0f}ms | {error} |\n return report # 使用示例数据库备份自动化 def backup_database() - TaskResult: PostgreSQL 数据库备份 timestamp time.strftime(%Y%m%d_%H%M%S) backup_file f/backup/db_{timestamp}.sql.gz try: result subprocess.run( [ pg_dump, -h, localhost, -U, postgres, -d, production, --formatcustom, |, gzip, , backup_file ], capture_outputTrue, textTrue, timeout1800 ) if result.returncode ! 0: return TaskResult( task_namedatabase_backup, successFalse, errorresult.stderr[:200], ) # 验证备份文件 file_size os.path.getsize(backup_file) if file_size 1024: # 小于 1KB 可能是空备份 return TaskResult( task_namedatabase_backup, successFalse, errorf备份文件过小: {file_size} bytes, ) return TaskResult( task_namedatabase_backup, successTrue, outputf备份完成: {backup_file} ({file_size / 1024 / 1024:.1f}MB), ) except subprocess.TimeoutExpired: return TaskResult( task_namedatabase_backup, successFalse, error备份超时30分钟, ) except Exception as e: return TaskResult( task_namedatabase_backup, successFalse, errorstr(e), ) import os if __name__ __main__: automation OpsAutomation(max_workers2) automation.add_task(Task(namedatabase_backup, funcbackup_database, retry2)) automation.run_sequential() print(automation.generate_report())四、自动化脚本的工程化要求幂等性脚本必须支持重复执行而不产生副作用。例如创建目录前先检查是否已存在修改配置前先备份原文件部署前先检查当前版本。错误处理每个关键操作都必须有错误处理——命令执行失败时输出有意义的错误信息而非静默失败。使用set -euo pipefail确保脚本在错误时立即退出。日志与审计脚本执行过程必须记录日志包括操作时间、操作对象、执行结果。日志应输出到标准输出供日志采集器收集和本地文件供事后审计。五、总结Shell 和 Python 自动化运维脚本将重复性操作转化为可审计、可重复的自动化流程。批量巡检脚本替代手动检查自动化框架提供任务编排和错误处理能力。脚本工程化的核心要求是幂等性、错误处理和日志审计。自动化不是目的而是手段——目标是减少人为失误、提高操作一致性、沉淀运维知识。