#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
K8s 巡检工具
功能:全面检查 K8s 集群健康状态,生成炫酷 HTML + Word 报告
依赖:kubernetes, jinja2, requests, pyyaml, python-docx
用法:python k8sinspect.py [--namespace NAMESPACE] [--all-namespaces] [--format {html,word,both}]
"""

import os
import sys
import json
import time
import base64
import argparse
import subprocess
from datetime import datetime, timedelta, timezone
from collections import defaultdict
from typing import Dict, List, Tuple, Optional, Any

# 第三方库导入检查
missing_libs = []
try:
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException
except ImportError:
    missing_libs.append("kubernetes")
try:
    import jinja2
except ImportError:
    missing_libs.append("jinja2")
try:
    import requests
except ImportError:
    missing_libs.append("requests")
try:
    import yaml
except ImportError:
    missing_libs.append("pyyaml")
try:
    from docx import Document
    from docx.shared import Inches, Pt, RGBColor
    from docx.enum.text import WD_ALIGN_PARAGRAPH
    from docx.enum.table import WD_TABLE_ALIGNMENT
    from docx.oxml.ns import qn
    from docx.oxml import OxmlElement
except ImportError:
    missing_libs.append("python-docx")

if missing_libs:
    print(f"缺少依赖库: {', '.join(missing_libs)}")
    print("请执行: pip install kubernetes jinja2 requests pyyaml python-docx")
    sys.exit(1)

# =============================================================
# Global configuration
# =============================================================
# Output directory for generated reports; overridable through the
# REPORT_DIR environment variable.
REPORT_DIR = os.environ.get("REPORT_DIR", "/tmp/k8s-inspect")
# NOTE(review): the THRESHOLD_* values below are not referenced by any code
# visible in this file — confirm whether they are still needed.
THRESHOLD_CPU = 80          # CPU alert threshold (%)
THRESHOLD_MEM = 80          # Memory alert threshold (%)
THRESHOLD_DISK = 85         # Disk alert threshold (%)
THRESHOLD_RESTART = 5       # Pod restart-count alert threshold
CERT_WARN_DAYS = 30         # Warn this many days before a certificate expires

# Alert webhooks (optional); an empty string disables the channel
DINGTALK_WEBHOOK = os.environ.get("DINGTALK_WEBHOOK", "")
WECOM_WEBHOOK = os.environ.get("WECOM_WEBHOOK", "")

# Make sure the report directory exists (runs at import time)
os.makedirs(REPORT_DIR, exist_ok=True)

# =============================================================
# HTML 模板 (Jinja2)
# =============================================================
HTML_TEMPLATE = """





K8s 巡检报告



🔍 K8s 集群巡检报告
生成时间:{{ timestamp }}
{{ node_total }}
总节点数
{{ node_ready }}
就绪节点
{{ pod_total }}
总 Pod 数
{{ ns_total }}
命名空间数
🖥️ 节点健康检查
{% for node in nodes %} {% endfor %}
节点名称状态角色K8s 版本CPU 使用内存使用运行时间内核版本
{{ node.name }} {{ node.status }} {{ node.role }} {{ node.version }} {{ node.cpu_usage }} {{ node.mem_usage }} {{ node.uptime }} {{ node.kernel }}
🚀 Pod 状态检查
{{ pod_stats.running }}
Running
{{ pod_stats.abnormal }}
异常 Pod
{{ pod_stats.pending }}
Pending
{% for pod in abnormal_pods %} {% endfor %}
命名空间Pod 名称状态重启次数运行时间所在节点镜像
{{ pod.namespace }} {{ pod.name }} {{ pod.status }} {{ pod.restarts }} {{ pod.age }} {{ pod.node }} {{ pod.image }}
💰 资源配额 & 存储检查
{% for quota in resource_quotas %} {% endfor %}
命名空间配额名称CPU 请求CPU 限制内存请求内存限制Pod 数量
{{ quota.namespace }} {{ quota.name }} {{ quota.cpu_req }} {{ quota.cpu_limit }} {{ quota.mem_req }} {{ quota.mem_limit }} {{ quota.pods }}

{% for pvc in pvcs %} {% endfor %}
命名空间PVC 名称状态容量存储类访问模式
{{ pvc.namespace }} {{ pvc.name }} {{ pvc.status }} {{ pvc.capacity }} {{ pvc.storage_class }} {{ pvc.access_modes }}
🔒 证书安全检查
{% for cert in certificates %} {% endfor %}
证书名称到期时间剩余天数状态
{{ cert.name }} {{ cert.expire_date }} {{ cert.days_left }} 天 {{ cert.status_text }}
🌐 网络组件检查
{% for comp in network_components %} {% endfor %}
组件命名空间期望副本就绪副本状态
{{ comp.name }} {{ comp.namespace }} {{ comp.desired }} {{ comp.ready }} {{ comp.status_text }}
⚡ 近期异常事件(最近 1 小时)
{% for event in warning_events %} {% endfor %}
时间命名空间类型对象原因消息
{{ event.last_timestamp }} {{ event.namespace }} Warning {{ event.object }} {{ event.reason }} {{ event.message }}
巡检时间:{{ timestamp }} | 报告由 k8s_inspect.py 自动生成
""" # ============================================================= # K8s 巡检类 # ============================================================= class K8sInspector: def __init__(self, namespace: str = None, all_namespaces: bool = True): # 加载 kubeconfig try: config.load_incluster_config() except config.ConfigException: config.load_kube_config() self.core_v1 = client.CoreV1Api() self.apps_v1 = client.AppsV1Api() self.batch_v1 = client.BatchV1Api() self.storage_v1 = client.StorageV1Api() self.networking_v1 = client.NetworkingV1Api() self.rbac_v1 = client.RbacAuthorizationV1Api() self.custom_objects = client.CustomObjectsApi() self.namespace = namespace self.all_namespaces = all_namespaces self.report_data = { "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "node_total": 0, "node_ready": 0, "pod_total": 0, "ns_total": 0, "nodes": [], "pod_stats": {"running": 0, "abnormal": 0, "pending": 0}, "abnormal_pods": [], "resource_quotas": [], "pvcs": [], "certificates": [], "network_components": [], "warning_events": [] } # 收集基础统计数据 self._collect_basic_stats() def _collect_basic_stats(self): """收集节点、Pod、命名空间总数""" try: nodes = self.core_v1.list_node() self.report_data["node_total"] = len(nodes.items) self.report_data["node_ready"] = sum( 1 for n in nodes.items if any(c.type == "Ready" and c.status == "True" for c in n.status.conditions) ) except ApiException: pass try: pods = self.core_v1.list_pod_for_all_namespaces() self.report_data["pod_total"] = len(pods.items) except ApiException: pass try: ns = self.core_v1.list_namespace() self.report_data["ns_total"] = len(ns.items) except ApiException: pass def _get_node_metrics(self) -> Dict[str, Dict]: """通过 metrics.k8s.io 获取节点 CPU/内存使用率""" metrics = {} try: result = self.custom_objects.list_cluster_custom_object( group="metrics.k8s.io", version="v1beta1", plural="nodes" ) for item in result.get("items", []): name = item["metadata"]["name"] cpu = item["usage"]["cpu"] mem = item["usage"]["memory"] metrics[name] = {"cpu": cpu, 
"memory": mem} except Exception: pass return metrics def check_nodes(self): """节点健康检查""" print("[INFO] 检查节点健康状态...") nodes = [] node_metrics = self._get_node_metrics() try: node_list = self.core_v1.list_node() for node in node_list.items: name = node.metadata.name status = "NotReady" conditions = {c.type: c.status for c in node.status.conditions} if conditions.get("Ready") == "True": status = "Ready" # 角色 role = "worker" if "node-role.kubernetes.io/control-plane" in node.metadata.labels or \ "node-role.kubernetes.io/master" in node.metadata.labels: role = "control-plane" # 版本信息 version = node.status.node_info.kubelet_version kernel = node.status.node_info.kernel_version # 运行时间 uptime = "N/A" if node.metadata.creation_timestamp: delta = datetime.now(node.metadata.creation_timestamp.tzinfo) - node.metadata.creation_timestamp days = delta.days hours = delta.seconds // 3600 uptime = f"{days}d{hours}h" # CPU/内存使用 cpu_usage = "N/A" mem_usage = "N/A" if name in node_metrics: cpu_usage = node_metrics[name]["cpu"] mem_usage = node_metrics[name]["memory"] status_class = "ok" if status == "Ready" else "err" nodes.append({ "name": name, "status": status, "status_class": status_class, "role": role, "version": version, "cpu_usage": cpu_usage, "mem_usage": mem_usage, "uptime": uptime, "kernel": kernel }) except ApiException as e: print(f"[ERROR] 获取节点列表失败: {e}") self.report_data["nodes"] = nodes def check_pods(self): """Pod 状态检查""" print("[INFO] 检查 Pod 状态...") abnormal_pods = [] running = 0 pending = 0 abnormal = 0 try: if self.all_namespaces: pods = self.core_v1.list_pod_for_all_namespaces() else: pods = self.core_v1.list_namespaced_pod(namespace=self.namespace or "default") for pod in pods.items: status = pod.status.phase if status == "Running": running += 1 elif status == "Pending": pending += 1 abnormal += 1 elif status in ["Failed", "Unknown"]: abnormal += 1 # 检查容器状态中的异常 container_statuses = pod.status.container_statuses or [] for cs in container_statuses: if 
cs.state.waiting: reason = cs.state.waiting.reason if reason in ["CrashLoopBackOff", "ImagePullBackOff", "ErrImagePull", "OOMKilled"]: abnormal += 1 if status == "Running": running -= 1 break # 收集非 Running 或异常 Pod if status != "Running" or any( cs.state.waiting and cs.state.waiting.reason in ["CrashLoopBackOff", "ImagePullBackOff", "ErrImagePull"] for cs in (pod.status.container_statuses or []) ): pod_status = pod.status.phase restarts = sum(cs.restart_count for cs in (pod.status.container_statuses or [])) age = "N/A" if pod.status.start_time: delta = datetime.now(pod.status.start_time.tzinfo) - pod.status.start_time age = f"{delta.days}d{delta.seconds//3600}h" if delta.days > 0 else f"{delta.seconds//60}m" node = pod.spec.node_name or "N/A" image = "N/A" if pod.spec.containers: image = pod.spec.containers[0].image status_class = "warn" if pod_status in ["Failed", "Unknown"]: status_class = "err" for cs in (pod.status.container_statuses or []): if cs.state.waiting and cs.state.waiting.reason in ["CrashLoopBackOff", "OOMKilled", "ImagePullBackOff"]: status_class = "err" pod_status = cs.state.waiting.reason break abnormal_pods.append({ "namespace": pod.metadata.namespace, "name": pod.metadata.name, "status": pod_status, "status_class": status_class, "restarts": restarts, "age": age, "node": node, "image": image }) self.report_data["pod_stats"] = { "running": running, "abnormal": abnormal, "pending": pending } self.report_data["abnormal_pods"] = abnormal_pods except ApiException as e: print(f"[ERROR] 获取 Pod 列表失败: {e}") def check_resources(self): """资源配额和 PVC 检查""" print("[INFO] 检查资源配额与存储...") quotas = [] pvcs = [] try: # ResourceQuota if self.all_namespaces: quota_list = self.core_v1.list_resource_quota_for_all_namespaces() else: quota_list = self.core_v1.list_namespaced_resource_quota(namespace=self.namespace or "default") for quota in quota_list.items: ns = quota.metadata.namespace name = quota.metadata.name hard = quota.status.hard or {} used = quota.status.used or 
{} def format_quota(key): u = used.get(key, "0") h = hard.get(key, "0") return f"{u}/{h}" if h != "0" else "N/A" quotas.append({ "namespace": ns, "name": name, "cpu_req": format_quota("requests.cpu"), "cpu_limit": format_quota("limits.cpu"), "mem_req": format_quota("requests.memory"), "mem_limit": format_quota("limits.memory"), "pods": format_quota("pods") }) except ApiException as e: print(f"[WARN] 获取 ResourceQuota 失败: {e}") try: # PVC if self.all_namespaces: pvc_list = self.core_v1.list_persistent_volume_claim_for_all_namespaces() else: pvc_list = self.core_v1.list_namespaced_persistent_volume_claim(namespace=self.namespace or "default") for pvc in pvc_list.items: status = pvc.status.phase capacity = pvc.status.capacity.get("storage", "N/A") if pvc.status.capacity else "N/A" sc = pvc.spec.storage_class_name or "N/A" access_modes = ", ".join(pvc.spec.access_modes) if pvc.spec.access_modes else "N/A" status_class = "ok" if status == "Bound" else "err" pvcs.append({ "namespace": pvc.metadata.namespace, "name": pvc.metadata.name, "status": status, "status_class": status_class, "capacity": capacity, "storage_class": sc, "access_modes": access_modes }) except ApiException as e: print(f"[WARN] 获取 PVC 失败: {e}") self.report_data["resource_quotas"] = quotas self.report_data["pvcs"] = pvcs def check_certificates(self): """证书到期检查""" print("[INFO] 检查证书到期时间...") certs = [] # 检查 kubeadm 证书(如果可用) try: result = subprocess.run( ["kubeadm", "certs", "check-expiration"], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: lines = result.stdout.strip().split("\n") for line in lines[1:]: # 跳过表头 parts = line.split() if len(parts) >= 7: name = parts[0] expire_date = f"{parts[2]} {parts[3]} {parts[4]}" residual = parts[6] # e.g., "364d" days_left = int(''.join(filter(str.isdigit, residual))) if residual else 0 cls = "err" if days_left <= 7 else ("warn" if days_left <= CERT_WARN_DAYS else "ok") status_text = "紧急" if days_left <= 7 else ("警告" if days_left <= 
CERT_WARN_DAYS else "正常") certs.append({ "name": name, "expire_date": expire_date, "days_left": days_left, "class": cls, "status_text": status_text }) except Exception as e: print(f"[WARN] kubeadm 证书检查失败: {e}") # 检查 Ingress TLS 证书 try: secrets = self.core_v1.list_secret_for_all_namespaces(field_selector="type=kubernetes.io/tls") for secret in secrets.items: if "tls.crt" not in secret.data: continue cert_data = base64.b64decode(secret.data["tls.crt"]).decode("utf-8") # 使用 openssl 解析 proc = subprocess.run( ["openssl", "x509", "-noout", "-enddate"], input=cert_data, capture_output=True, text=True ) if proc.returncode == 0: enddate_str = proc.stdout.strip().split("=")[1] expire_ts = datetime.strptime(enddate_str, "%b %d %H:%M:%S %Y %Z") now = datetime.now(expire_ts.tzinfo) days_left = (expire_ts - now).days cls = "err" if days_left <= 7 else ("warn" if days_left <= CERT_WARN_DAYS else "ok") status_text = "紧急" if days_left <= 7 else ("警告" if days_left <= CERT_WARN_DAYS else "正常") certs.append({ "name": f"TLS:{secret.metadata.namespace}/{secret.metadata.name}", "expire_date": enddate_str, "days_left": days_left, "class": cls, "status_text": status_text }) except Exception as e: print(f"[WARN] Ingress TLS 证书检查失败: {e}") self.report_data["certificates"] = certs def check_network(self): """网络组件检查""" print("[INFO] 检查网络组件...") components = [] target_components = ["coredns", "kube-proxy", "calico-node", "flannel", "cilium"] try: # 获取所有 Deployment 和 DaemonSet deps = self.apps_v1.list_deployment_for_all_namespaces() dss = self.apps_v1.list_daemon_set_for_all_namespaces() all_workloads = [] for dep in deps.items: all_workloads.append({ "kind": "Deployment", "name": dep.metadata.name, "namespace": dep.metadata.namespace, "desired": dep.spec.replicas, "ready": dep.status.ready_replicas or 0 }) for ds in dss.items: all_workloads.append({ "kind": "DaemonSet", "name": ds.metadata.name, "namespace": ds.metadata.namespace, "desired": ds.status.desired_number_scheduled, "ready": 
ds.status.number_ready or 0 }) for wl in all_workloads: if any(comp in wl["name"].lower() for comp in target_components): cls = "ok" if wl["ready"] >= wl["desired"] else "warn" status_text = "正常" if cls == "ok" else "降级" components.append({ "name": wl["name"], "namespace": wl["namespace"], "desired": wl["desired"], "ready": wl["ready"], "class": cls, "status_text": status_text }) except ApiException as e: print(f"[WARN] 网络组件检查失败: {e}") self.report_data["network_components"] = components def check_events(self): """近期 Warning 事件""" print("[INFO] 检查近期异常事件...") events_data = [] try: events = self.core_v1.list_event_for_all_namespaces( field_selector="type=Warning", limit=50 ) # 按时间排序(最新的在前面) sorted_events = sorted( events.items, key=lambda e: e.last_timestamp or e.event_time or datetime.min, reverse=True ) # 过滤最近1小时 one_hour_ago = datetime.now(sorted_events[0].last_timestamp.tzinfo) - timedelta(hours=1) if sorted_events else None for event in sorted_events: if one_hour_ago and event.last_timestamp and event.last_timestamp < one_hour_ago: continue events_data.append({ "last_timestamp": event.last_timestamp.strftime("%H:%M:%S") if event.last_timestamp else "N/A", "namespace": event.metadata.namespace, "object": f"{event.involved_kind}/{event.involved_name}", "reason": event.reason, "message": (event.message or "")[:80] + ("..." 
if len(event.message or "") > 80 else "") }) except ApiException as e: print(f"[WARN] 获取事件失败: {e}") self.report_data["warning_events"] = events_data def generate_html_report(self) -> str: """生成 HTML 报告""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") report_file = os.path.join(REPORT_DIR, f"k8s_report_{timestamp}.html") template = jinja2.Template(HTML_TEMPLATE) html_content = template.render(**self.report_data) with open(report_file, "w", encoding="utf-8") as f: f.write(html_content) print(f"[INFO] HTML 报告已生成: {report_file}") return report_file def generate_word_report(self) -> str: """生成 Word 报告 (.docx)""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") report_file = os.path.join(REPORT_DIR, f"k8s_report_{timestamp}.docx") doc = Document() # 设置文档默认字体 style = doc.styles['Normal'] style.font.name = '微软雅黑' style._element.rPr.rFonts.set(qn('w:eastAsia'), '微软雅黑') style.font.size = Pt(10) # 标题 title = doc.add_heading('K8s 集群巡检报告', level=1) title.alignment = WD_ALIGN_PARAGRAPH.CENTER run = title.runs[0] run.font.color.rgb = RGBColor(56, 189, 248) run.font.size = Pt(28) run.font.bold = True subtitle = doc.add_paragraph(f"生成时间:{self.report_data['timestamp']}") subtitle.alignment = WD_ALIGN_PARAGRAPH.CENTER subtitle.runs[0].font.color.rgb = RGBColor(148, 163, 184) doc.add_paragraph() # 概览统计表格 summary_table = doc.add_table(rows=1, cols=4) summary_table.style = 'Table Grid' summary_table.alignment = WD_TABLE_ALIGNMENT.CENTER headers = ['总节点数', '就绪节点', '总 Pod 数', '命名空间数'] values = [ self.report_data['node_total'], self.report_data['node_ready'], self.report_data['pod_total'], self.report_data['ns_total'] ] hdr_cells = summary_table.rows[0].cells for i, header in enumerate(headers): hdr_cells[i].text = header hdr_cells[i].paragraphs[0].runs[0].font.bold = True hdr_cells[i].paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER row_cells = summary_table.add_row().cells for i, val in enumerate(values): row_cells[i].text = str(val) row_cells[i].paragraphs[0].alignment = 
WD_ALIGN_PARAGRAPH.CENTER doc.add_paragraph() # 辅助函数:添加章节标题 def add_section_title(text): heading = doc.add_heading(text, level=2) run = heading.runs[0] run.font.color.rgb = RGBColor(56, 189, 248) run.font.size = Pt(18) return heading # 辅助函数:设置单元格颜色(通过背景色) def set_cell_background(cell, hex_color): shading_elm = OxmlElement('w:shd') shading_elm.set(qn('w:fill'), hex_color) cell._tc.get_or_add_tcPr().append(shading_elm) # 1. 节点健康检查 add_section_title('🖥️ 节点健康检查') if self.report_data['nodes']: node_table = doc.add_table(rows=1, cols=8) node_table.style = 'Table Grid' node_headers = ['节点名称', '状态', '角色', 'K8s版本', 'CPU使用', '内存使用', '运行时间', '内核版本'] for i, h in enumerate(node_headers): node_table.rows[0].cells[i].text = h node_table.rows[0].cells[i].paragraphs[0].runs[0].font.bold = True for node in self.report_data['nodes']: cells = node_table.add_row().cells cells[0].text = node['name'] cells[1].text = node['status'] cells[2].text = node['role'] cells[3].text = node['version'] cells[4].text = node['cpu_usage'] cells[5].text = node['mem_usage'] cells[6].text = node['uptime'] cells[7].text = node['kernel'] # 状态颜色标记 if node['status_class'] == 'ok': cells[1].paragraphs[0].runs[0].font.color.rgb = RGBColor(0, 208, 132) elif node['status_class'] == 'err': cells[1].paragraphs[0].runs[0].font.color.rgb = RGBColor(239, 68, 68) else: cells[1].paragraphs[0].runs[0].font.color.rgb = RGBColor(245, 158, 11) else: doc.add_paragraph("无节点数据") doc.add_paragraph() # 2. 
Pod 状态检查 add_section_title('🚀 Pod 状态检查') stats = self.report_data['pod_stats'] pod_summary = doc.add_paragraph() pod_summary.add_run(f"Running: {stats['running']} ").bold = True pod_summary.add_run(f"异常 Pod: {stats['abnormal']} ").font.color.rgb = RGBColor(239, 68, 68) pod_summary.add_run(f"Pending: {stats['pending']}").font.color.rgb = RGBColor(245, 158, 11) if self.report_data['abnormal_pods']: pod_table = doc.add_table(rows=1, cols=7) pod_table.style = 'Table Grid' pod_headers = ['命名空间', 'Pod名称', '状态', '重启次数', '运行时间', '所在节点', '镜像'] for i, h in enumerate(pod_headers): pod_table.rows[0].cells[i].text = h pod_table.rows[0].cells[i].paragraphs[0].runs[0].font.bold = True for pod in self.report_data['abnormal_pods']: cells = pod_table.add_row().cells cells[0].text = pod['namespace'] cells[1].text = pod['name'] cells[2].text = pod['status'] cells[3].text = str(pod['restarts']) cells[4].text = pod['age'] cells[5].text = pod['node'] cells[6].text = pod['image'] if pod['status_class'] == 'err': cells[2].paragraphs[0].runs[0].font.color.rgb = RGBColor(239, 68, 68) elif pod['status_class'] == 'warn': cells[2].paragraphs[0].runs[0].font.color.rgb = RGBColor(245, 158, 11) else: cells[2].paragraphs[0].runs[0].font.color.rgb = RGBColor(0, 208, 132) else: doc.add_paragraph("无异常 Pod") doc.add_paragraph() # 3. 
资源配额 add_section_title('💰 资源配额检查') if self.report_data['resource_quotas']: quota_table = doc.add_table(rows=1, cols=7) quota_table.style = 'Table Grid' quota_headers = ['命名空间', '配额名称', 'CPU请求', 'CPU限制', '内存请求', '内存限制', 'Pod数量'] for i, h in enumerate(quota_headers): quota_table.rows[0].cells[i].text = h quota_table.rows[0].cells[i].paragraphs[0].runs[0].font.bold = True for q in self.report_data['resource_quotas']: cells = quota_table.add_row().cells cells[0].text = q['namespace'] cells[1].text = q['name'] cells[2].text = q['cpu_req'] cells[3].text = q['cpu_limit'] cells[4].text = q['mem_req'] cells[5].text = q['mem_limit'] cells[6].text = q['pods'] else: doc.add_paragraph("无 ResourceQuota 配置") doc.add_paragraph() # 4. PVC 状态 add_section_title('💾 PVC 状态检查') if self.report_data['pvcs']: pvc_table = doc.add_table(rows=1, cols=6) pvc_table.style = 'Table Grid' pvc_headers = ['命名空间', 'PVC名称', '状态', '容量', '存储类', '访问模式'] for i, h in enumerate(pvc_headers): pvc_table.rows[0].cells[i].text = h pvc_table.rows[0].cells[i].paragraphs[0].runs[0].font.bold = True for p in self.report_data['pvcs']: cells = pvc_table.add_row().cells cells[0].text = p['namespace'] cells[1].text = p['name'] cells[2].text = p['status'] cells[3].text = p['capacity'] cells[4].text = p['storage_class'] cells[5].text = p['access_modes'] if p['status_class'] == 'ok': cells[2].paragraphs[0].runs[0].font.color.rgb = RGBColor(0, 208, 132) else: cells[2].paragraphs[0].runs[0].font.color.rgb = RGBColor(239, 68, 68) else: doc.add_paragraph("无 PVC 资源") doc.add_paragraph() # 5. 
证书检查 add_section_title('🔒 证书安全检查') if self.report_data['certificates']: cert_table = doc.add_table(rows=1, cols=4) cert_table.style = 'Table Grid' cert_headers = ['证书名称', '到期时间', '剩余天数', '状态'] for i, h in enumerate(cert_headers): cert_table.rows[0].cells[i].text = h cert_table.rows[0].cells[i].paragraphs[0].runs[0].font.bold = True for c in self.report_data['certificates']: cells = cert_table.add_row().cells cells[0].text = c['name'] cells[1].text = c['expire_date'] cells[2].text = f"{c['days_left']} 天" cells[3].text = c['status_text'] if c['class'] == 'err': cells[2].paragraphs[0].runs[0].font.color.rgb = RGBColor(239, 68, 68) cells[3].paragraphs[0].runs[0].font.color.rgb = RGBColor(239, 68, 68) elif c['class'] == 'warn': cells[2].paragraphs[0].runs[0].font.color.rgb = RGBColor(245, 158, 11) cells[3].paragraphs[0].runs[0].font.color.rgb = RGBColor(245, 158, 11) else: cells[2].paragraphs[0].runs[0].font.color.rgb = RGBColor(0, 208, 132) cells[3].paragraphs[0].runs[0].font.color.rgb = RGBColor(0, 208, 132) else: doc.add_paragraph("无证书信息") doc.add_paragraph() # 6. 
网络组件 add_section_title('🌐 网络组件检查') if self.report_data['network_components']: net_table = doc.add_table(rows=1, cols=5) net_table.style = 'Table Grid' net_headers = ['组件', '命名空间', '期望副本', '就绪副本', '状态'] for i, h in enumerate(net_headers): net_table.rows[0].cells[i].text = h net_table.rows[0].cells[i].paragraphs[0].runs[0].font.bold = True for comp in self.report_data['network_components']: cells = net_table.add_row().cells cells[0].text = comp['name'] cells[1].text = comp['namespace'] cells[2].text = str(comp['desired']) cells[3].text = str(comp['ready']) cells[4].text = comp['status_text'] if comp['class'] == 'ok': cells[3].paragraphs[0].runs[0].font.color.rgb = RGBColor(0, 208, 132) cells[4].paragraphs[0].runs[0].font.color.rgb = RGBColor(0, 208, 132) else: cells[3].paragraphs[0].runs[0].font.color.rgb = RGBColor(245, 158, 11) cells[4].paragraphs[0].runs[0].font.color.rgb = RGBColor(245, 158, 11) else: doc.add_paragraph("无网络组件信息") doc.add_paragraph() # 7. 异常事件 add_section_title('⚡ 近期异常事件(最近 1 小时)') if self.report_data['warning_events']: event_table = doc.add_table(rows=1, cols=6) event_table.style = 'Table Grid' event_headers = ['时间', '命名空间', '类型', '对象', '原因', '消息'] for i, h in enumerate(event_headers): event_table.rows[0].cells[i].text = h event_table.rows[0].cells[i].paragraphs[0].runs[0].font.bold = True for e in self.report_data['warning_events']: cells = event_table.add_row().cells cells[0].text = e['last_timestamp'] cells[1].text = e['namespace'] cells[2].text = 'Warning' cells[3].text = e['object'] cells[4].text = e['reason'] cells[5].text = e['message'] cells[2].paragraphs[0].runs[0].font.color.rgb = RGBColor(245, 158, 11) cells[4].paragraphs[0].runs[0].font.color.rgb = RGBColor(245, 158, 11) else: doc.add_paragraph("无 Warning 事件") # 页脚 doc.add_paragraph() footer = doc.add_paragraph(f"巡检时间:{self.report_data['timestamp']} | 报告由 k8s_inspect.py 自动生成") footer.alignment = WD_ALIGN_PARAGRAPH.CENTER footer.runs[0].font.color.rgb = RGBColor(71, 85, 105) 
footer.runs[0].font.size = Pt(9) doc.save(report_file) print(f"[INFO] Word 报告已生成: {report_file}") return report_file def run_full_inspection(self, output_format: str = "both"): """执行完整巡检""" print("=" * 60) print("开始 K8s 全面巡检...") print("=" * 60) self.check_nodes() self.check_pods() self.check_resources() self.check_certificates() self.check_network() self.check_events() reports = {} if output_format in ["html", "both"]: reports["html"] = self.generate_html_report() if output_format in ["word", "both"]: reports["word"] = self.generate_word_report() # 发送告警(如果有异常) if DINGTALK_WEBHOOK or WECOM_WEBHOOK: self._send_alerts() return reports def _send_alerts(self): """发送钉钉/企微告警""" abnormal_count = self.report_data["pod_stats"]["abnormal"] if abnormal_count == 0: return msg = f"""## ⚠️ K8s 巡检告警 > 发现 **{abnormal_count}** 个异常 Pod,请及时处理! > > 巡检时间:{self.report_data['timestamp']} > 完整报告:http://your-nginx/reports/latest.html""" if DINGTALK_WEBHOOK: try: requests.post(DINGTALK_WEBHOOK, json={ "msgtype": "markdown", "markdown": {"title": "K8s 集群异常告警", "text": msg} }, timeout=5) except Exception as e: print(f"[WARN] 钉钉告警发送失败: {e}") if WECOM_WEBHOOK: try: requests.post(WECOM_WEBHOOK, json={ "msgtype": "markdown", "markdown": {"content": msg} }, timeout=5) except Exception as e: print(f"[WARN] 企业微信告警发送失败: {e}") # ============================================================= # 命令行入口 # ============================================================= def main(): parser = argparse.ArgumentParser(description="K8s 全面巡检脚本") parser.add_argument("--namespace", "-n", help="指定命名空间(默认所有命名空间)") parser.add_argument("--all-namespaces", "-A", action="store_true", default=True, help="检查所有命名空间(默认)") parser.add_argument("--output-dir", "-o", help="报告输出目录") parser.add_argument("--format", "-f", choices=["html", "word", "both"], default="both", help="输出报告格式 (默认: both)") args = parser.parse_args() if args.output_dir: global REPORT_DIR REPORT_DIR = args.output_dir os.makedirs(REPORT_DIR, exist_ok=True) 
inspector = K8sInspector( namespace=args.namespace, all_namespaces=args.all_namespaces ) reports = inspector.run_full_inspection(output_format=args.format) print("\n✅ 巡检完成!") for fmt, path in reports.items(): print(f"{fmt.upper()} 报告: {path}") if __name__ == "__main__": main()

使用示例

# 安装依赖
pip install kubernetes jinja2 requests pyyaml python-docx

# 默认生成 HTML + Word 两种报告
python3 k8s_inspect.py

# 仅生成 HTML 报告
python3 k8s_inspect.py -f html

# 仅生成 Word 报告,并指定输出目录
python3 k8s_inspect.py -f word -o /data/k8s-reports

# 检查指定命名空间
python3 k8s_inspect.py -n default

报告效果预览

  • HTML 报告:深色仪表盘风格,炫酷直观。
  • Word 报告:白底黑字专业文档风格,关键状态用彩色字体标注,适合正式汇报和存档。

原文地址: https://www.cveoy.top/t/topic/qGtp 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录