#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ MySQL主从同步状态监控脚本 - 每3小时通过cron执行一次 - 监控关键字段:Slave_IO_Running, Slave_SQL_Running, Seconds_Behind_Master - 异常时发送邮件告警 - 完整日志记录 """
import pymysql import smtplib import logging import os import sys import socket from datetime import datetime from email.mime.text import MIMEText from email.header import Header
# ==================== 配置区(按实际修改)==================== # 数据库配置(监控从库) DB_CONFIG = { 'host': '127.0.0.1', # 从库IP 'port': 3306, 'user': '', # 建议创建专用监控账号 'password': '', 'database': 'mysql', # 连接必需库 'charset': 'utf8mb4', 'connect_timeout': 10 }
# 邮件配置(使用126邮箱SMTP) MAIL_CONFIG = { 'smtp_server': '', 'smtp_port': 465, 'sender': '', # 发件人邮箱(需与授权码匹配) 'password': '', # 126邮箱SMTP授权码(非登录密码!) 'receiver': '', 'subject_prefix': '[MySQL同步告警]' }
# 监控阈值 ALERT_THRESHOLD = 1800 # Seconds_Behind_Master 超过300秒告警 LOG_FILE = '/var/log/mysql_slave_monitor.log' # 建议使用有写入权限的路径
# ==================== 日志配置 ==================== os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S', handlers=[ logging.FileHandler(LOG_FILE, encoding='utf-8'), logging.StreamHandler(sys.stdout) # cron执行时可选保留,方便调试 ] ) logger = logging.getLogger(__name__)
# ==================== 邮件发送函数 ==================== def send_alert_email(issue_details: str, slave_status: dict): """发送告警邮件""" try: # 构建邮件内容 hostname = socket.gethostname() ip_addr = socket.gethostbyname(hostname) body = f""" 【MySQL主从同步异常告警】 检测时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 服务器:{hostname} ({ip_addr}) 从库地址:{DB_CONFIG['host']}:{DB_CONFIG['port']}
异常详情: {issue_details}
请及时处理! """
msg = MIMEText(body, 'plain', 'utf-8') msg['From'] = Header(MAIL_CONFIG['sender']) msg['To'] = Header(MAIL_CONFIG['receiver']) msg['Subject'] = Header(f"{MAIL_CONFIG['subject_prefix']} 主从同步异常 - {hostname}", 'utf-8')
# 发送邮件(SSL) with smtplib.SMTP_SSL(MAIL_CONFIG['smtp_server'], MAIL_CONFIG['smtp_port'], timeout=15) as smtp: smtp.login(MAIL_CONFIG['sender'], MAIL_CONFIG['password']) smtp.sendmail(MAIL_CONFIG['sender'], [MAIL_CONFIG['receiver']], msg.as_string())
logger.info("告警邮件发送成功") return True except Exception as e: logger.error(f"邮件发送失败: {str(e)}", exc_info=True) return False
# ==================== 主监控逻辑 ==================== def check_replication(): """检查主从同步状态""" conn = None cursor = None issues = [] slave_status = {}
try: # 连接数据库 conn = pymysql.connect(**DB_CONFIG) cursor = conn.cursor(pymysql.cursors.DictCursor) cursor.execute("SHOW SLAVE STATUS") result = cursor.fetchone()
if not result: err_msg = "未检测到从库配置(SHOW SLAVE STATUS 无结果)" logger.error(err_msg) issues.append(err_msg) else: slave_status = result io_running = result.get('Slave_IO_Running', '').strip() sql_running = result.get('Slave_SQL_Running', '').strip() seconds_behind = result.get('Seconds_Behind_Master')
# 检查IO线程 if io_running != 'Yes': issues.append(f"Slave_IO_Running 异常: {io_running}") logger.warning(f"IO线程异常: {io_running}")
# 检查SQL线程 if sql_running != 'Yes': issues.append(f"Slave_SQL_Running 异常: {sql_running}") logger.warning(f"SQL线程异常: {sql_running}")
# 检查延迟(None视为异常) if seconds_behind is None: issues.append("Seconds_Behind_Master 为 NULL(可能线程已停止)") logger.warning("同步延迟为NULL") elif seconds_behind > ALERT_THRESHOLD: issues.append(f"同步延迟过高: {seconds_behind}s (阈值: {ALERT_THRESHOLD}s)") logger.warning(f"同步延迟: {seconds_behind}s")
# 记录正常状态(便于日志追踪) if not issues: logger.info(f"同步正常 | IO:{io_running} | SQL:{sql_running} | 延迟:{seconds_behind}s") return True
# 存在异常且需告警 if issues: issue_summary = "\n".join([f"• {i}" for i in issues]) logger.error(f"检测到同步异常:\n{issue_summary}") if send_alert_email(issue_summary, slave_status): return False else: logger.critical("告警邮件发送失败,需人工介入!") return False
return True
except pymysql.err.OperationalError as e: logger.error(f"数据库连接失败: {e}", exc_info=True) send_alert_email(f"数据库连接失败: {str(e)}", {}) return False except Exception as e: logger.error(f"监控过程发生未知错误: {str(e)}", exc_info=True) send_alert_email(f"脚本执行异常: {str(e)}", {}) return False finally: if cursor: cursor.close() if conn: conn.close()
# ==================== 程序入口 ==================== if __name__ == '__main__': logger.info("=" * 50) logger.info("开始执行MySQL主从同步状态检查") success = check_replication() status = "✓ 检查完成(状态正常)" if success else "✗ 检查完成(存在异常)" logger.info(status) sys.exit(0 if success else 1)
|