Skip to content
作者: daily5am | 创建: - | 更新: -
字数: - | 预计阅读: - 分钟 | 访问量: --

监控告警

AI生成声明: 本文档由AI辅助生成,旨在提供监控告警的基础知识和实践指南。

🎯 学习目标

通过本章节的学习,你将能够:

  • 理解监控告警的重要性
  • 掌握监控指标的分类和选择
  • 了解常见的监控工具
  • 学习告警策略和最佳实践

📚 监控体系

监控层次

1. 基础设施监控

  • 服务器监控: CPU、内存、磁盘、网络
  • 网络监控: 带宽、延迟、丢包率
  • 存储监控: 容量、IOPS、读写速度

2. 应用监控

  • 性能监控: 响应时间、吞吐量、错误率
  • 业务监控: 订单量、支付成功率、用户活跃度
  • 日志监控: 错误日志、异常日志、访问日志

3. 用户体验监控

  • 真实用户监控(RUM): 页面加载时间、交互响应时间
  • 合成监控: 模拟用户行为监控
  • 可用性监控: 服务可用性、SLA达成情况

🔍 关键监控指标

系统指标

CPU使用率

python
import psutil

def get_cpu_metrics():
    """Collect CPU metrics from psutil.

    Returns:
        A dict with the keys ``cpu_percent``, ``cpu_count`` and ``cpu_load``,
        holding the results of ``psutil.cpu_percent(interval=1)``,
        ``psutil.cpu_count()`` and ``psutil.getloadavg()`` respectively.
    """
    snapshot = {}
    snapshot['cpu_percent'] = psutil.cpu_percent(interval=1)
    snapshot['cpu_count'] = psutil.cpu_count()
    # The original snippet marks the load average as Linux/Unix.
    snapshot['cpu_load'] = psutil.getloadavg()
    return snapshot

内存使用率

python
def get_memory_metrics():
    """Collect memory metrics from psutil.

    Returns:
        A dict with the ``total``, ``available``, ``used`` and ``percent``
        attributes of ``psutil.virtual_memory()``.
    """
    stats = psutil.virtual_memory()
    wanted_fields = ('total', 'available', 'used', 'percent')
    return {field: getattr(stats, field) for field in wanted_fields}

磁盘使用率

python
def get_disk_metrics(path='/'):
    """Collect disk usage metrics for one filesystem path.

    Args:
        path: Path whose containing filesystem is inspected. Defaults to
            ``'/'``, which is what this function always used before the
            parameter existed.

    Returns:
        A dict with the ``total``, ``used``, ``free`` and ``percent``
        attributes of ``psutil.disk_usage(path)``.
    """
    usage = psutil.disk_usage(path)
    return {
        'total': usage.total,
        'used': usage.used,
        'free': usage.free,
        'percent': usage.percent,
    }

应用指标

响应时间

python
import time
from functools import wraps

def monitor_response_time(func):
    """Decorator that records how long each successful call to ``func`` takes.

    On success the elapsed time in milliseconds is passed to
    ``record_metric('response_time', <ms>, {'function': <name>})``.
    If ``func`` raises, ``record_error(<name>, <message>)`` is called and the
    exception is re-raised unchanged; no response time is recorded.

    ``record_metric`` and ``record_error`` are not defined in this snippet and
    are expected to be provided by the surrounding module.

    Args:
        func: The callable to wrap.

    Returns:
        The wrapped callable, with ``func``'s metadata preserved by ``wraps``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter() is monotonic, so the measured duration cannot go
        # negative or jump when the system clock is adjusted mid-call.
        start_time = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            # Only failures of func itself are reported as errors.
            record_error(func.__name__, str(e))
            raise

        response_time = (time.perf_counter() - start_time) * 1000  # milliseconds

        # Recorded outside the try block so that a failure in the metrics
        # backend is not misreported as an error of the wrapped function.
        record_metric('response_time', response_time, {
            'function': func.__name__
        })

        return result
    return wrapper

# 使用示例
@monitor_response_time
def process_request(data):
    """Example handler: sleep for 0.1 seconds and report success.

    Args:
        data: Request payload; not used by this example.

    Returns:
        The dict ``{'status': 'success'}``.
    """
    # Stand-in for real request processing.
    simulated_work_seconds = 0.1
    time.sleep(simulated_work_seconds)
    outcome = {'status': 'success'}
    return outcome

错误率

python
class ErrorRateMonitor:
    """Track the request error rate over a sliding time window.

    Timestamps are taken from ``time.time()``. Entries older than
    ``window_size`` seconds are discarded each time a request is recorded.
    """

    def __init__(self, window_size=60):
        """Create a monitor.

        Args:
            window_size: Length of the sliding window in seconds.
        """
        # Imported here so the class does not rely on a module-level import.
        from collections import deque

        self.window_size = window_size
        # Timestamps are appended in arrival order, so expired entries sit at
        # the left end and can be dropped one by one instead of rebuilding
        # the whole sequence on every request.
        self.errors = deque()
        self.total_requests = deque()

    def record_request(self, success=True):
        """Record one request and prune entries that left the window.

        Args:
            success: ``False`` marks the request as an error.
        """
        current_time = time.time()
        self.total_requests.append(current_time)

        if not success:
            self.errors.append(current_time)

        self._clean_old_data(current_time)

    def get_error_rate(self):
        """Return errors / requests for the current window.

        Returns:
            A float in ``[0.0, 1.0]``; ``0.0`` when no request falls inside
            the window.
        """
        window_start = time.time() - self.window_size
        recent_requests = sum(1 for t in self.total_requests if t > window_start)
        if recent_requests == 0:
            return 0.0

        recent_errors = sum(1 for t in self.errors if t > window_start)
        return recent_errors / recent_requests

    def _clean_old_data(self, current_time):
        """Drop timestamps that are not newer than ``current_time - window_size``."""
        cutoff = current_time - self.window_size
        for timestamps in (self.errors, self.total_requests):
            while timestamps and timestamps[0] <= cutoff:
                timestamps.popleft()

业务指标

python
class BusinessMetrics:
    """In-memory accumulator for order-related business metrics.

    ``metrics`` holds the order count, summed revenue, the set of user ids
    that placed an order, and the most recently computed conversion rate.
    """

    def __init__(self):
        """Start with zeroed counters and an empty user set."""
        initial_state = {
            'orders': 0,
            'revenue': 0.0,
            'active_users': set(),
            'conversion_rate': 0.0
        }
        self.metrics = initial_state

    def record_order(self, user_id, amount):
        """Count one order, add its amount to revenue and remember the user.

        Args:
            user_id: Identifier added to the ``active_users`` set.
            amount: Value added to ``revenue``.
        """
        totals = self.metrics
        totals['orders'] += 1
        totals['revenue'] += amount
        totals['active_users'].add(user_id)

    def calculate_conversion_rate(self, total_visitors):
        """Compute ordering users divided by ``total_visitors``.

        Args:
            total_visitors: Number of visitors used as the denominator.

        Returns:
            The computed rate, which is also stored under
            ``metrics['conversion_rate']``. When ``total_visitors`` is 0 the
            method returns ``0.0`` and leaves the stored value untouched.
        """
        if total_visitors == 0:
            return 0.0
        rate = len(self.metrics['active_users']) / total_visitors
        self.metrics['conversion_rate'] = rate
        return rate

🏗️ 监控工具

Prometheus + Grafana

Prometheus配置

yaml
# prometheus.yml
global:
  scrape_interval: 15s      # how often targets are scraped
  evaluation_interval: 15s  # how often rules are evaluated

scrape_configs:
  - job_name: 'app'
    metrics_path: '/metrics'
    static_configs:
      # The Python example exposes its metrics with start_http_server(8000),
      # so the scrape target has to use port 8000.
      - targets: ['localhost:8000']

Python应用集成

python
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Metric definitions
# Counter with the labels 'method' and 'endpoint'.
request_count = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint'])
# Histogram of request durations; no labels are declared.
request_duration = Histogram('http_request_duration_seconds', 'HTTP request duration')
# Gauge for the number of active connections.
active_connections = Gauge('active_connections', 'Number of active connections')

# 记录指标
def handle_request(method, endpoint, data=None):
    """Process one request while updating the Prometheus metrics.

    Increments ``request_count`` for the given labels, then runs
    ``process_request`` while ``request_duration`` times it and
    ``active_connections`` counts it as in progress.

    Args:
        method: Value for the ``method`` label of ``request_count``.
        endpoint: Value for the ``endpoint`` label of ``request_count``.
        data: Payload forwarded to ``process_request``. Optional so existing
            two-argument callers keep working.
    """
    request_count.labels(method=method, endpoint=endpoint).inc()

    # track_inprogress() increments the gauge on entry and decrements it on
    # exit, even when process_request raises, so the gauge reflects requests
    # currently being handled instead of growing without bound.
    with active_connections.track_inprogress(), request_duration.time():
        process_request(data)

# Start the Prometheus metrics HTTP server; 8000 is the port argument.
start_http_server(8000)

ELK Stack

Elasticsearch + Logstash + Kibana

python
import logging
from pythonjsonlogger import jsonlogger

# Configure JSON-formatted logging: a stream handler whose records are
# rendered by python-json-logger's JsonFormatter.
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)

# getLogger() is called without a name, so the handler and level below are
# applied to the root logger.
logger = logging.getLogger()
logger.addHandler(logHandler)
logger.setLevel(logging.INFO)

# Emit a log record; the keys in `extra` are attached to the record as
# additional fields.
logger.info("User logged in", extra={
    'user_id': '12345',
    'ip': '192.168.1.1',
    'timestamp': '2024-01-20T10:00:00Z'
})

自定义监控

python
import time
import threading
from collections import defaultdict

class SimpleMonitor:
    """Minimal in-memory metric store.

    Samples are kept per metric name as dicts with the keys ``value``,
    ``timestamp`` (from ``time.time()``) and ``tags``. Access to the store is
    guarded by ``self.lock``.
    """

    def __init__(self):
        """Create an empty store and the lock that guards it."""
        self.metrics = defaultdict(list)
        self.lock = threading.Lock()

    def record(self, metric_name, value, tags=None):
        """Append one sample to ``metric_name``.

        Args:
            metric_name: Name of the metric series.
            value: Sample value.
            tags: Optional dict stored with the sample; ``{}`` when omitted.
        """
        sample = {
            'value': value,
            'timestamp': time.time(),
            'tags': tags or {}
        }
        with self.lock:
            self.metrics[metric_name].append(sample)

    def get_metrics(self, metric_name, window_size=300):
        """Return the samples of ``metric_name`` from the last ``window_size`` seconds.

        Args:
            metric_name: Name of the metric series.
            window_size: Look-back window in seconds.

        Returns:
            A new list of sample dicts; empty when the metric is unknown or
            has no samples inside the window.
        """
        cutoff = time.time() - window_size
        with self.lock:
            # .get() avoids the defaultdict side effect of creating an empty
            # series every time an unknown metric name is queried.
            samples = self.metrics.get(metric_name, ())
            return [m for m in samples if m['timestamp'] > cutoff]

    def get_statistics(self, metric_name, window_size=300):
        """Summarize the recent samples of ``metric_name``.

        Args:
            metric_name: Name of the metric series.
            window_size: Look-back window in seconds.

        Returns:
            ``None`` when there are no recent samples, otherwise a dict with
            the keys ``count``, ``min``, ``max``, ``avg`` and ``sum``.
        """
        metrics = self.get_metrics(metric_name, window_size)
        if not metrics:
            return None

        values = [m['value'] for m in metrics]
        total = sum(values)
        return {
            'count': len(values),
            'min': min(values),
            'max': max(values),
            'avg': total / len(values),
            'sum': total
        }

🚨 告警策略

告警规则

阈值告警

python
class AlertManager:
    """Evaluate a fixed list of threshold rules against a metrics mapping."""

    def __init__(self):
        """Install the built-in rules for CPU, memory and error rate."""
        # (metric, threshold, operator, severity)
        rule_rows = (
            ('cpu_usage', 80, '>', 'warning'),
            ('memory_usage', 90, '>', 'critical'),
            ('error_rate', 0.05, '>', 'warning'),
        )
        self.rules = [
            {
                'metric': metric,
                'threshold': limit,
                'operator': symbol,
                'severity': level
            }
            for metric, limit, symbol, level in rule_rows
        ]

    def check_alerts(self, metrics):
        """Return one alert dict for every rule whose condition holds.

        Args:
            metrics: Mapping of metric name to current value. Rules whose
                metric is absent (or ``None``) are skipped.

        Returns:
            A list of dicts with the keys ``metric``, ``value``,
            ``threshold``, ``severity`` and ``message``, in rule order.
        """
        triggered = []

        for rule in self.rules:
            name, limit, symbol, level = (
                rule[key]
                for key in ('metric', 'threshold', 'operator', 'severity')
            )

            observed = metrics.get(name)
            if observed is None:
                continue
            if not self._compare(observed, symbol, limit):
                continue

            triggered.append({
                'metric': name,
                'value': observed,
                'threshold': limit,
                'severity': level,
                'message': f"{name} {symbol} {limit}"
            })

        return triggered

    def _compare(self, value, operator, threshold):
        """Apply the comparison named by ``operator`` to value and threshold.

        Supported operators are ``>``, ``<``, ``>=``, ``<=`` and ``==``; any
        other operator yields ``False``.
        """
        # The comparisons are wrapped in lambdas so that only the selected
        # one is evaluated.
        comparisons = (
            ('>', lambda: value > threshold),
            ('<', lambda: value < threshold),
            ('>=', lambda: value >= threshold),
            ('<=', lambda: value <= threshold),
            ('==', lambda: value == threshold),
        )
        for symbol, outcome in comparisons:
            if operator == symbol:
                return outcome()
        return False

告警抑制

python
class AlertSuppressor:
    """Suppress repeated alerts for the same metric within a time window."""

    def __init__(self):
        """Start with no recorded alerts and the built-in suppression windows."""
        # Metric name -> time.time() of the last alert that was let through.
        self.active_alerts = {}
        # Metric name -> suppression window in seconds.
        self.suppression_rules = {
            'cpu_usage': 300,  # no repeat alert within 5 minutes
            'memory_usage': 300,
            'error_rate': 60  # no repeat alert within 1 minute
        }

    def should_alert(self, alert):
        """Decide whether ``alert`` should be sent.

        Args:
            alert: Dict with at least the key ``metric``.

        Returns:
            ``True`` when more than the metric's suppression window (300
            seconds when the metric has no rule) has passed since the last
            alert that was let through; the current time is then stored for
            the metric. ``False`` otherwise.
        """
        key = alert['metric']
        now = time.time()

        elapsed = now - self.active_alerts.get(key, 0)
        window = self.suppression_rules.get(key, 300)

        allowed = elapsed > window
        if allowed:
            self.active_alerts[key] = now
        return allowed

告警通知

python
class AlertNotifier:
    """Dispatch alerts to one or more notification channels."""

    # Seconds to wait for an HTTP endpoint; without a timeout a stalled
    # endpoint would block alert delivery indefinitely.
    _REQUEST_TIMEOUT = 10

    def __init__(self):
        """Register the handler method for each supported channel name."""
        self.channels = {
            'email': self.send_email,
            'sms': self.send_sms,
            'webhook': self.send_webhook,
            'slack': self.send_slack
        }

    def notify(self, alert, channels=None):
        """Send ``alert`` through each requested channel.

        Channel names that are not registered are skipped. A failure in one
        channel is printed and does not stop delivery to the others.

        Args:
            alert: Alert dict passed to the channel handlers.
            channels: Iterable of channel names; defaults to
                ``['email', 'webhook']``.
        """
        if channels is None:
            channels = ['email', 'webhook']

        for channel in channels:
            handler = self.channels.get(channel)
            if handler is None:
                continue
            try:
                handler(alert)
            except Exception as e:
                # Best-effort delivery: report the failure and carry on with
                # the remaining channels.
                print(f"告警发送失败 {channel}: {e}")

    def send_email(self, alert):
        """Send the alert by e-mail (placeholder that prints the message)."""
        # Implement the e-mail sending logic here.
        print(f"发送邮件告警: {alert['message']}")

    def send_sms(self, alert):
        """Send the alert by SMS (placeholder that prints the message).

        The 'sms' channel is registered in ``__init__``; without this method
        creating an ``AlertNotifier`` raised ``AttributeError``.
        """
        # Implement the SMS sending logic here.
        print(f"发送短信告警: {alert['message']}")

    def send_webhook(self, alert):
        """POST the alert dict as JSON to the webhook URL."""
        import requests
        requests.post('https://webhook.url', json=alert,
                      timeout=self._REQUEST_TIMEOUT)

    def send_slack(self, alert):
        """POST a text message built from severity and message to Slack."""
        import requests
        payload = {
            'text': f"🚨 {alert['severity'].upper()}: {alert['message']}"
        }
        requests.post('https://hooks.slack.com/...', json=payload,
                      timeout=self._REQUEST_TIMEOUT)

📊 监控面板

Grafana仪表板配置

json
{
  "dashboard": {
    "title": "应用监控",
    "panels": [
      {
        "title": "CPU使用率",
        "targets": [
          {
            "expr": "cpu_usage_percent",
            "legendFormat": "{{instance}}"
          }
        ]
      },
      {
        "title": "响应时间",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le))",
            "legendFormat": "P95"
          }
        ]
      }
    ]
  }
}

⚠️ 最佳实践

1. 告警规则设计

  • 避免告警风暴: 合理设置告警阈值和抑制
  • 分级告警: 根据严重程度分级
  • 告警聚合: 相关告警聚合发送

2. 监控覆盖

  • 全链路监控: 覆盖所有关键路径
  • 关键指标: 重点监控核心指标
  • 业务指标: 监控业务相关指标

3. 性能考虑

  • 采样: 高频率指标适当采样
  • 聚合: 在采集端聚合减少数据量
  • 存储: 合理设置数据保留策略

📖 推荐资源

💡 下一步


最后更新时间: 2025-01-20