Before long there will be a new breed of programmer. The boss asks: can this feature be done by tomorrow?

He sneaks out his phone and asks an AI, and the AI swears it can.
The next day the programmer asks the AI: are you done yet?
The AI replies: who are you?
The goal is a Python script that sweeps MySQL servers for sensitive data and uses a large language model to analyze what it finds. The requirements:

1. Data collection:

Connect to multiple MySQL servers and collect every database, table, and column, along with the first 5 sample rows per table.

Gather server information, access records, log configuration, file privileges, and UDF details.

2. Staged analysis (using a large language model):

Table level: for each table, identify sensitive fields, assess security risks, and infer the table's purpose.

Database level: per database, aggregate the sensitive fields across all of its tables and infer what the database is for.

Server level: combine the per-database results with the server information into an overall security assessment and business-purpose report.

3. Result export:

Export all analysis results to Excel.

For tables containing sensitive fields, export every record; for all other tables, export only the sample data.

Summarize the server information, access records, log configuration, file privileges, and UDF details, each with a Chinese explanation.
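Both versions of the script below read their settings from environment variables via python-dotenv. For reference, a hypothetical .env could look like this; every variable name matches an os.getenv call in the code, and every value is a placeholder:

# Hypothetical .env — placeholder values only
DB_HOST=localhost
DB_USER=root
DB_PASSWORD=yourpassword
DB_PORT=3306
DB_CHARSET=utf8mb4
OPENAI_API_KEY=your-openai-key
OPENAI_MODEL=gpt-4
OUTPUT_FILE=sensitive_data_analysis.xlsx
MAX_RETRY=3
REQUEST_DELAY=1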
The complete staged-analysis script:

import os
import json
import pymysql
import openai
import pandas as pd
import time
import logging
from dotenv import load_dotenv

# Load settings from .env
load_dotenv()

# ============ Configuration ============
DB_CONFIG = {
    "host": os.getenv("DB_HOST", "localhost"),
    "user": os.getenv("DB_USER", "root"),
    "password": os.getenv("DB_PASSWORD", "yourpassword"),
    "port": int(os.getenv("DB_PORT", 3306)),
    "charset": os.getenv("DB_CHARSET", "utf8mb4")
}

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "sensitive_data_analysis.xlsx")
MAX_RETRY = int(os.getenv("MAX_RETRY", 3))          # retries per OpenAI call
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1))  # seconds between requests

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")


# ====== Database connection ======
def connect_db():
    """Open a connection to the MySQL server."""
    try:
        conn = pymysql.connect(**DB_CONFIG)
        logging.info("数据库连接成功")
        return conn
    except Exception as e:
        logging.error(f"数据库连接失败: {e}")
        return None


# ====== Data collection ======
def collect_db_info(conn):
    """
    Collect data from the MySQL server:
    - every non-system database, its tables, columns, and first 5 sample rows
    - server info, access records, log configuration, file privileges, UDF info
    """
    logging.info("正在采集数据库信息...")
    db_structure = {}
    server_info = {}
    access_logs = []
    log_configs = {}
    file_privileges = []
    udf_info = []

    with conn.cursor() as cursor:
        # Basic server information
        try:
            cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
            version, hostname, port, timezone, datadir = cursor.fetchone()
            server_info = {
                '版本': version,
                '主机名': hostname,
                '端口': port,
                '时区': timezone,
                '数据目录': datadir
            }
        except Exception as e:
            logging.error(f"采集服务器信息失败: {e}")

        # Current client connections (requires the PROCESS privilege)
        try:
            cursor.execute("SHOW PROCESSLIST;")
            access_logs = cursor.fetchall()
        except Exception as e:
            logging.warning(f"无法查看访问记录: {e}")

        # Logging-related variables
        try:
            cursor.execute("SHOW VARIABLES LIKE '%log%'")
            log_configs = {row[0]: row[1] for row in cursor.fetchall()}
        except Exception as e:
            logging.warning(f"无法查看日志配置: {e}")

        # File/process privileges (requires access to the mysql schema)
        try:
            cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
            file_privileges = cursor.fetchall()
        except Exception as e:
            logging.warning(f"无法查看文件权限: {e}")

        # User-defined functions (a common privilege-escalation vector)
        try:
            cursor.execute("SELECT * FROM mysql.func")
            udf_info = cursor.fetchall()
        except Exception as e:
            logging.warning(f"无法查看 UDF 信息: {e}")

        # Enumerate all non-system databases and their tables
        try:
            cursor.execute("SHOW DATABASES")
            databases = [db[0] for db in cursor.fetchall()]
        except Exception as e:
            logging.error(f"获取数据库列表失败: {e}")
            databases = []

        for db in databases:
            if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
                continue  # skip system schemas
            try:
                cursor.execute(f"USE `{db}`")
                cursor.execute("SHOW TABLES")
                tables = [table[0] for table in cursor.fetchall()]
            except Exception as e:
                logging.warning(f"跳过数据库 {db},原因:{e}")
                continue
            db_structure[db] = {}
            for table in tables:
                try:
                    cursor.execute(f"DESCRIBE `{table}`")
                    columns = [col[0] for col in cursor.fetchall()]
                except Exception as e:
                    logging.warning(f"获取 {db}.{table} 字段信息失败: {e}")
                    continue
                try:
                    cursor.execute(f"SELECT * FROM `{table}` LIMIT 5")
                    samples = cursor.fetchall()
                except Exception as e:
                    logging.warning(f"获取 {db}.{table} 样本数据失败: {e}")
                    samples = []
                db_structure[db][table] = {
                    "columns": columns,
                    "samples": samples
                }
    return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info


# ====== OpenAI analysis ======
def call_openai_api(prompt):
    """Call the OpenAI API with retries and a delay between requests."""
    openai.api_key = OPENAI_API_KEY
    for attempt in range(MAX_RETRY):
        try:
            response = openai.ChatCompletion.create(
                model=MODEL_NAME,
                messages=[
                    {"role": "system", "content": "你是一个数据库安全与业务分析专家。"},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=2000
            )
            time.sleep(REQUEST_DELAY)
            return response['choices'][0]['message']['content'].strip()
        except Exception as e:
            logging.warning(f"OpenAI API 调用失败 (尝试 {attempt+1}/{MAX_RETRY}):{e}")
            time.sleep(REQUEST_DELAY)
    return ""


def analyze_table(db, table, table_data, server_info):
    """
    Stage 1: analyze a single table. The model looks for sensitive fields and
    security risks and guesses whether the table backs a Web service and what for.
    Returns: { "sensitive_fields": [...], "table_usage": "..." }
    """
    prompt = (
        f"请基于下面提供的 MySQL 数据库【{db}】中表【{table}】的结构和样本数据,"
        "识别可能的敏感信息(如身份证号、手机号、邮箱、密码等)和潜在安全风险,"
        "同时判断该表是否支撑某个 Web 服务,并说明该服务可能的用途(例如电商、内容发布、监控等)。\n"
        # Ask for double-quoted JSON so json.loads() below can parse the reply
        "请用中文输出分析结果,且只输出一个合法的 JSON 对象(字段名用双引号),格式为:\n"
        '{"sensitive_fields": ["敏感字段", ...], "table_usage": "可能用途说明"}\n'
        # default=str: sample rows may contain datetime/bytes values that
        # json.dumps cannot serialize otherwise
        "数据如下:\n" + json.dumps(table_data, ensure_ascii=False, indent=2, default=str) +
        "\n服务器信息如下:\n" + json.dumps(server_info, ensure_ascii=False, indent=2, default=str)
    )
    response = call_openai_api(prompt)
    try:
        result = json.loads(response)
    except json.JSONDecodeError:
        logging.error(f"分析 {db}.{table} 失败,响应:{response}")
        result = {}
    return result


def analyze_tables(db_structure, server_info):
    """
    Stage 1 over every table. Returns:
    { database: { table: analysis result, ... }, ... }
    """
    table_analysis = {}
    for db, tables in db_structure.items():
        table_analysis[db] = {}
        for table, data in tables.items():
            logging.info(f"正在分析表 {db}.{table} ...")
            result = analyze_table(db, table, data, server_info)
            table_analysis[db][table] = result
    return table_analysis


def summarize_database(db, table_analysis):
    """
    Stage 2: summarize one database from its per-table results — all sensitive
    fields found, plus a guess at the Web service the database supports.
    Returns: { "all_sensitive_fields": [...], "database_usage": "..." }
    """
    prompt = (
        f"请基于下面提供的数据库【{db}】中各数据表的分析结果,"
        "汇总出该数据库中所有被识别出的敏感字段,并判断该数据库支撑的 Web 服务可能的用途(如电商、内容发布、监控等)。\n"
        "请用中文输出分析结果,且只输出一个合法的 JSON 对象(字段名用双引号),格式为:\n"
        '{"all_sensitive_fields": ["敏感字段", ...], "database_usage": "总体用途说明"}\n'
        "数据如下:\n" + json.dumps(table_analysis, ensure_ascii=False, indent=2)
    )
    response = call_openai_api(prompt)
    try:
        result = json.loads(response)
    except json.JSONDecodeError:
        logging.error(f"数据库 {db} 汇总分析失败,响应:{response}")
        result = {}
    return result


def summarize_all_databases(db_structure, table_analysis):
    """
    Stage 2 over every database. Returns:
    { database: { "all_sensitive_fields": [...], "database_usage": "...", "tables": {...} } }
    """
    summary = {}
    for db in db_structure.keys():
        logging.info(f"正在汇总分析数据库 {db} ...")
        summary[db] = summarize_database(db, table_analysis.get(db, {}))
        summary[db]["tables"] = table_analysis.get(db, {})
    return summary


def analyze_server_overall(server_info, db_summary, access_logs, log_configs, file_privileges, udf_info):
    """
    Stage 3: whole-server analysis. The model combines the server info, the
    per-database summaries, and the other collected data into an overall risk
    and business-function report.
    Returns: { "server_overall": "..." }
    """
    prompt = (
        "请基于下面提供的 MySQL 服务器信息、各数据库(对应 Web 服务)的汇总分析、访问记录、日志配置、文件权限、UDF 信息,"
        "生成该 MySQL 服务器的整体安全和业务功能分析报告,描述可能存在的风险以及服务器支持哪些类型的 Web 服务(如电商、内容发布、监控等)。\n"
        "请用中文输出分析结果,且只输出一个合法的 JSON 对象(字段名用双引号),格式为:\n"
        '{"server_overall": "整体描述信息"}\n'
        "服务器信息:\n" + json.dumps(server_info, ensure_ascii=False, indent=2, default=str) +
        "\n数据库汇总:\n" + json.dumps(db_summary, ensure_ascii=False, indent=2, default=str) +
        "\n访问记录:\n" + json.dumps(access_logs, ensure_ascii=False, indent=2, default=str) +
        "\n日志配置:\n" + json.dumps(log_configs, ensure_ascii=False, indent=2, default=str) +
        "\n文件权限:\n" + json.dumps(file_privileges, ensure_ascii=False, indent=2, default=str) +
        "\nUDF 信息:\n" + json.dumps(udf_info, ensure_ascii=False, indent=2, default=str)
    )
    response = call_openai_api(prompt)
    try:
        result = json.loads(response)
    except json.JSONDecodeError:
        logging.error(f"MySQL 服务器整体分析失败,响应:{response}")
        result = {}
    return result


# ====== Excel export ======
def export_overall_report(writer, db_summary, server_overall):
    """
    Write the overall-report sheet: one row per database summary, one row for
    the whole-server analysis, plus a row of Chinese column explanations.
    """
    rows = []
    for db, info in db_summary.items():
        rows.append({
            "数据库": db,
            "敏感字段汇总": ", ".join(info.get("all_sensitive_fields", [])),
            "总体用途": info.get("database_usage", "")
        })
    overall_df = pd.DataFrame(rows, columns=["数据库", "敏感字段汇总", "总体用途"])
    # Append the whole-server analysis as a final row
    overall_df.loc[len(overall_df)] = ["MySQL服务器", "", server_overall.get("server_overall", "")]

    explanations = {
        "数据库": "各数据库名称(通常对应不同的 Web 服务)",
        "敏感字段汇总": "各数据库中所有被识别出的敏感字段集合,提示可能存在的隐私风险",
        "总体用途": "对每个数据库支撑的 Web 服务总体用途的描述,及整体服务器的业务支持情况"
    }
    # Insert the explanation row directly under the header
    explanation_row = [explanations.get(col, "") for col in overall_df.columns]
    explanation_df = pd.DataFrame([explanation_row], columns=overall_df.columns)
    combined_df = pd.concat([explanation_df, overall_df], ignore_index=True)
    combined_df.to_excel(writer, sheet_name="总体报告", index=False)


def export_field_explanations(writer, sheet_name, df, field_explanations):
    """Write a sheet with a row of Chinese field explanations under the header."""
    explanation_row = [field_explanations.get(col, "") for col in df.columns]
    explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
    combined_df = pd.concat([explanation_df, df], ignore_index=True)
    combined_df.to_excel(writer, sheet_name=sheet_name, index=False)


def export_to_excel(db_structure, db_summary, table_analysis, server_info, access_logs, log_configs, file_privileges, udf_info, server_overall):
    """
    Write the Excel workbook: overall report, server info, log configuration,
    access records, file privileges, UDF info, and one sheet per table.
    """
    logging.info("正在导出数据到 Excel...")
    with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
        # Overall report
        export_overall_report(writer, db_summary, server_overall)

        # Server information
        server_df = pd.DataFrame([server_info])
        common_explanations = {
            "版本": "数据库版本号",
            "主机名": "数据库所在主机名称",
            "端口": "数据库服务端口",
            "时区": "服务器时区设置",
            "数据目录": "数据库数据存放目录"
        }
        export_field_explanations(writer, "服务器信息", server_df, common_explanations)

        # Log configuration
        log_df = pd.DataFrame(list(log_configs.items()), columns=["配置项", "值"])
        log_explanations = {
            "配置项": "日志相关配置项",
            "值": "对应配置项的值"
        }
        export_field_explanations(writer, "日志配置", log_df, log_explanations)

        # Access records
        if access_logs:
            access_df = pd.DataFrame(access_logs)
            access_explanations = {col: "访问记录字段" for col in access_df.columns}
            export_field_explanations(writer, "访问记录", access_df, access_explanations)

        # File privileges
        if file_privileges:
            file_df = pd.DataFrame(file_privileges)
            file_explanations = {col: "文件权限相关字段" for col in file_df.columns}
            export_field_explanations(writer, "文件权限", file_df, file_explanations)

        # UDF info
        if udf_info:
            udf_df = pd.DataFrame(udf_info)
            udf_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
            export_field_explanations(writer, "UDF信息", udf_df, udf_explanations)

        # One sheet per table, with sensitive columns flagged in the first row
        for db, tables in db_structure.items():
            for table, content in tables.items():
                data = content["samples"]
                columns = content["columns"]
                df = pd.DataFrame(data, columns=columns)
                table_result = table_analysis.get(db, {}).get(table, {})
                sensitive_fields = table_result.get("sensitive_fields", [])
                sensitive_marks = ["敏感字段" if col in sensitive_fields else "" for col in columns]
                mark_row = pd.DataFrame([sensitive_marks], columns=columns)
                df = pd.concat([mark_row, df], ignore_index=True)
                field_explanations = {col: f"{col} 的中文解释" for col in columns}
                sheet_name = f"{db}_{table}"[:31]  # Excel sheet names max out at 31 chars
                export_field_explanations(writer, sheet_name, df, field_explanations)
    logging.info(f"数据导出完成:{OUTPUT_FILE}")


# ====== Main flow ======
def main():
    conn = connect_db()
    if not conn:
        return

    # Collect everything from the MySQL server
    db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
    conn.close()

    # Stage 1: analyze each table individually
    table_analysis = analyze_tables(db_structure, server_info)

    # Stage 2: summarize the per-table results for each database
    db_summary = summarize_all_databases(db_structure, table_analysis)

    # Stage 3: overall analysis of the server plus all database summaries
    server_overall = analyze_server_overall(server_info, db_summary, access_logs, log_configs, file_privileges, udf_info)

    # Export all analysis results to Excel
    export_to_excel(db_structure, db_summary, table_analysis, server_info, access_logs, log_configs, file_privileges, udf_info, server_overall)
    logging.info("整体流程完成!")


if __name__ == "__main__":
    main()
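One fragile spot in all three analysis functions: the model's reply goes straight into json.loads, and chat models often wrap the requested object in prose or a code fence, in which case the parse fails and the result silently becomes {}. A minimal hardening sketch (my own addition, not part of the original script) that falls back to the first {...} span in the reply:

import json
import re


def parse_model_json(response):
    """Best-effort parse of a reply that should contain one JSON object.

    Tries the raw string first, then falls back to the first {...} span,
    which also survives replies wrapped in code fences or prose.
    """
    try:
        return json.loads(response)
    except json.JSONDecodeError:
        match = re.search(r"\{.*\}", response, re.DOTALL)
        if match:
            try:
                return json.loads(match.group(0))
            except json.JSONDecodeError:
                pass
    return {}

Swapping this in for the bare json.loads calls in analyze_table, summarize_database, and analyze_server_overall keeps one badly formatted reply from discarding a whole table's analysis.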
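The requirements also call for exporting every record from tables flagged as containing sensitive fields, but as written the script above only ever exports the 5 sample rows collected up front (the multi-server variant below behaves the same way). A sketch of how that gap could be closed (hypothetical helper, not in the original; it interpolates database and table names the same way the collector does, so it shares the script's assumption that identifiers are trusted):

import pymysql


def fetch_full_table(config, db, table, limit=100000):
    """Re-connect and pull up to `limit` rows from one flagged table."""
    params = {k: v for k, v in config.items() if k != "name"}
    conn = pymysql.connect(**params)
    try:
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT * FROM `{db}`.`{table}` LIMIT %s", (limit,))
            return cursor.fetchall()
    finally:
        conn.close()

In export_to_excel, tables whose sensitive_fields list is non-empty would then build their DataFrame from this full result instead of content["samples"].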
An extended variant audits several MySQL servers in one run: it reads a MYSQL_CONFIGS JSON array, collects the same data from each server, sends everything to the model in a single combined analysis instead of the three-stage pipeline, and writes per-server sheets to the workbook.

import os
import json
import pymysql
import openai
import pandas as pd
import time
import logging
from dotenv import load_dotenv

# Load settings from .env
load_dotenv()

# ============ Configuration ============
# Used when the MYSQL_CONFIGS environment variable is not provided
DEFAULT_DB_CONFIG = {
    "host": os.getenv("DB_HOST", "localhost"),
    "user": os.getenv("DB_USER", "root"),
    "password": os.getenv("DB_PASSWORD", "yourpassword"),
    "port": int(os.getenv("DB_PORT", 3306)),
    "charset": os.getenv("DB_CHARSET", "utf8mb4")
}

# MYSQL_CONFIGS is a JSON array, e.g.:
# [{"name": "mysql1", "host": "...", "user": "...", "password": "...", "port": 3306, "charset": "utf8mb4"}, {...}]
MYSQL_CONFIGS_JSON = os.getenv("MYSQL_CONFIGS", "")
if MYSQL_CONFIGS_JSON:
    try:
        DB_CONFIGS = json.loads(MYSQL_CONFIGS_JSON)
    except Exception as e:
        logging.error("解析 MYSQL_CONFIGS 失败,使用默认配置。错误:" + str(e))
        DB_CONFIGS = [DEFAULT_DB_CONFIG]
else:
    DB_CONFIGS = [DEFAULT_DB_CONFIG]

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "sensitive_data_analysis.xlsx")
MAX_RETRY = int(os.getenv("MAX_RETRY", 3))          # OpenAI API retry count
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1))  # delay between requests, seconds

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")


# ====== Database connection ======
def connect_db(config):
    """Open one database connection from a config dict."""
    try:
        # "name" is our own label, not a pymysql.connect() argument
        params = {k: v for k, v in config.items() if k != "name"}
        conn = pymysql.connect(**params)
        logging.info(f"数据库连接成功: {config.get('name', config['host'])}")
        return conn
    except Exception as e:
        logging.error(f"数据库连接失败 {config.get('name', config['host'])}: {e}")
        return None


# ====== Data collection ======
def collect_db_info(conn):
    """
    Enumerate one server: database structure with sample rows, server info,
    access records, log configuration, file privileges, and UDF info.
    """
    logging.info("正在采集数据库信息...")
    db_structure = {}
    server_info = {}
    access_logs = []
    log_configs = {}
    file_privileges = []
    udf_info = []

    with conn.cursor() as cursor:
        try:
            # Basic server information
            cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
            version, hostname, port, timezone, datadir = cursor.fetchone()
            server_info = {
                '版本': version,
                '主机名': hostname,
                '端口': port,
                '时区': timezone,
                '数据目录': datadir
            }
        except Exception as e:
            logging.error(f"采集服务器信息失败: {e}")

        try:
            # Current client connections (requires the PROCESS privilege)
            cursor.execute("SHOW PROCESSLIST;")
            access_logs = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看访问记录 (SHOW PROCESSLIST): " + str(e))

        try:
            # Logging-related variables
            cursor.execute("SHOW VARIABLES LIKE '%log%'")
            log_configs = {row[0]: row[1] for row in cursor.fetchall()}
        except Exception as e:
            logging.warning("无法查看日志配置: " + str(e))

        try:
            # File/process privileges (sample query; may need adjusting)
            cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
            file_privileges = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看文件权限: " + str(e))

        try:
            # UDFs (a common privilege-escalation vector)
            cursor.execute("SELECT * FROM mysql.func")
            udf_info = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看 UDF 信息: " + str(e))

        try:
            # Database structure and sample data
            cursor.execute("SHOW DATABASES")
            databases = [db[0] for db in cursor.fetchall()]
        except Exception as e:
            logging.error("获取数据库列表失败: " + str(e))
            databases = []

        for db in databases:
            if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
                continue  # skip system schemas
            try:
                cursor.execute(f"USE `{db}`")
                cursor.execute("SHOW TABLES")
                tables = [table[0] for table in cursor.fetchall()]
            except Exception as e:
                logging.warning(f"跳过数据库 {db},原因:{e}")
                continue

            db_structure[db] = {}

            for table in tables:
                try:
                    cursor.execute(f"DESCRIBE `{table}`")
                    columns = [col[0] for col in cursor.fetchall()]
                except Exception as e:
                    logging.warning(f"获取 {db}.{table} 字段信息失败: {e}")
                    continue

                try:
                    cursor.execute(f"SELECT * FROM `{table}` LIMIT 5")
                    samples = cursor.fetchall()
                except Exception as e:
                    logging.warning(f"获取 {db}.{table} 样本数据失败: {e}")
                    samples = []

                db_structure[db][table] = {
                    "columns": columns,
                    "samples": samples
                }

    return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info


# ====== OpenAI analysis ======
def call_openai_api(prompt):
    """Call the OpenAI API with retries and a delay between requests."""
    openai.api_key = OPENAI_API_KEY
    for attempt in range(MAX_RETRY):
        try:
            response = openai.ChatCompletion.create(
                model=MODEL_NAME,
                messages=[
                    {"role": "system", "content": "你是一个数据库安全分析专家。"},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=2000
            )
            time.sleep(REQUEST_DELAY)
            return response['choices'][0]['message']['content'].strip()
        except Exception as e:
            logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
            time.sleep(REQUEST_DELAY)
    return ""


def analyze_with_openai(data):
    """
    One combined analysis over everything collected from all servers:
    sensitive fields, penetration risks, and what Web services each server
    appears to support.
    """
    logging.info("正在通过 OpenAI 分析...")
    prompt = (
        "请基于下面提供的多个 MySQL 服务器的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
        "识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
        "视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
        "请结合字段名和样本数据判断敏感信息。"
        "\n另外,请分析每个 MySQL 服务器是否支撑某个 Web 服务,以及该 Web 服务器可能的功能用途(如电商、内容发布、监控等)。"
        # Ask for double-quoted JSON so json.loads() below can parse the reply
        "\n请用中文输出分析结果,且只输出一个合法的 JSON 对象(字段名和字符串用双引号),格式如下:\n"
        '{\n "sensitive_fields": {"MySQL服务器标识": {"数据库": {"表": ["敏感字段", ...]}}},\n'
        ' "server_analysis": {"MySQL服务器标识": "服务器相关风险描述及 Web 服务功能说明"},\n'
        ' "access_analysis": {"MySQL服务器标识": "访问记录相关风险描述"}\n}\n\n'
        # default=str: collected rows may contain datetime/bytes values
        f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2, default=str)}"
    )
    response = call_openai_api(prompt)
    try:
        analysis_result = json.loads(response)
        logging.info("OpenAI 分析完成!")
        return analysis_result
    except json.JSONDecodeError:
        logging.error("OpenAI 响应解析失败,原始响应:" + response)
        return {}


# ====== Excel export ======
def export_overall_report(writer, analysis_result, conn_id):
    """
    Write one server's overall-report sheet, with a Chinese explanation for
    each analysis item.
    """
    explanations = {
        "sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,可能泄露个人隐私。",
        "server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,以及该服务器支持的 Web 服务功能及可能用途。",
        "access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
    }

    report_rows = []
    for key, explanation in explanations.items():
        value = analysis_result.get(key, {}).get(conn_id, {})
        report_rows.append({
            "分析项": key,
            "结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
            "中文解释": explanation
        })
    report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
    sheet_name = f"总体报告_{conn_id}"[:31]
    report_df.to_excel(writer, sheet_name=sheet_name, index=False)


def export_field_explanations(writer, sheet_name, df, field_explanations):
    """Write a sheet with a row of Chinese field explanations under the header."""
    explanation_row = [field_explanations.get(col, "") for col in df.columns]
    explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
    combined_df = pd.concat([explanation_df, df], ignore_index=True)
    combined_df.to_excel(writer, sheet_name=sheet_name, index=False)


def export_to_excel_multiple(overall_data, analysis_result):
    """
    Export the data collected from multiple MySQL servers to Excel: per server,
    an overall report, server info, log configuration, access records, file
    privileges, UDF info, and one sheet per table with analysis marks.
    overall_data has the shape:
    {
        "db_structures": {conn_id: {database: {table: {"columns": [...], "samples": [...]}, ...}, ...}},
        "server_infos": {conn_id: {...}},
        "access_logs": {conn_id: [...]},
        "log_configs": {conn_id: {...}},
        "file_privileges": {conn_id: [...]},
        "udf_infos": {conn_id: [...]}
    }
    """
    logging.info("正在导出数据到 Excel...")
    with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
        # One group of sheets per MySQL server
        for conn_id in overall_data["server_infos"]:
            # Overall report
            export_overall_report(writer, analysis_result, conn_id)

            # Server information
            sheet_name = f"服务器信息_{conn_id}"[:31]
            server_df = pd.DataFrame([overall_data["server_infos"][conn_id]])
            common_field_explanations = {
                "版本": "数据库版本号",
                "主机名": "数据库所在主机名称",
                "端口": "数据库服务端口",
                "时区": "服务器时区设置",
                "数据目录": "数据库数据存放目录"
            }
            export_field_explanations(writer, sheet_name, server_df, common_field_explanations)

            # Log configuration (two-column layout)
            sheet_name = f"日志配置_{conn_id}"[:31]
            log_df = pd.DataFrame(list(overall_data["log_configs"][conn_id].items()), columns=['配置项', '值'])
            log_explanations = {
                "配置项": "日志相关配置项",
                "值": "对应配置项的值"
            }
            export_field_explanations(writer, sheet_name, log_df, log_explanations)

            # Access records
            if overall_data["access_logs"].get(conn_id):
                sheet_name = f"访问记录_{conn_id}"[:31]
                access_df = pd.DataFrame(overall_data["access_logs"][conn_id])
                access_field_explanations = {col: "访问记录字段" for col in access_df.columns}
                export_field_explanations(writer, sheet_name, access_df, access_field_explanations)

            # File privileges
            if overall_data["file_privileges"].get(conn_id):
                sheet_name = f"文件权限_{conn_id}"[:31]
                file_df = pd.DataFrame(overall_data["file_privileges"][conn_id])
                file_field_explanations = {col: "文件权限相关字段" for col in file_df.columns}
                export_field_explanations(writer, sheet_name, file_df, file_field_explanations)

            # UDF info
            if overall_data["udf_infos"].get(conn_id):
                sheet_name = f"UDF信息_{conn_id}"[:31]
                udf_df = pd.DataFrame(overall_data["udf_infos"][conn_id])
                udf_field_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
                export_field_explanations(writer, sheet_name, udf_df, udf_field_explanations)

            # Per-table sheets, with sensitive columns flagged in the first row
            db_structures = overall_data["db_structures"].get(conn_id, {})
            for db, tables in db_structures.items():
                for table, content in tables.items():
                    data = content['samples']
                    columns = content['columns']
                    df = pd.DataFrame(data, columns=columns)
                    sensitive_cols = analysis_result.get('sensitive_fields', {}).get(conn_id, {}).get(db, {}).get(table, [])
                    sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
                    mark_row = pd.DataFrame([sensitive_marks], columns=columns)
                    df = pd.concat([mark_row, df], ignore_index=True)
                    field_explanations = {col: f"{col} 的中文解释" for col in columns}
                    sheet_name = f"{conn_id}_{db}_{table}"[:31]
                    export_field_explanations(writer, sheet_name, df, field_explanations)
    logging.info(f"数据导出完成:{OUTPUT_FILE}")


# ====== Main flow ======
def main():
    overall_data = {
        "db_structures": {},
        "server_infos": {},
        "access_logs": {},
        "log_configs": {},
        "file_privileges": {},
        "udf_infos": {}
    }
    # Collect from every configured MySQL server
    for config in DB_CONFIGS:
        conn_id = config.get("name", config.get("host", "mysql_default"))
        conn = connect_db(config)
        if not conn:
            continue
        db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
        overall_data["db_structures"][conn_id] = db_structure
        overall_data["server_infos"][conn_id] = server_info
        overall_data["access_logs"][conn_id] = access_logs
        overall_data["log_configs"][conn_id] = log_configs
        overall_data["file_privileges"][conn_id] = file_privileges
        overall_data["udf_infos"][conn_id] = udf_info
        conn.close()

    # One combined OpenAI analysis over everything collected
    analysis_result = analyze_with_openai(overall_data)
    export_to_excel_multiple(overall_data, analysis_result)
    logging.info("整体流程完成!")


if __name__ == "__main__":
    main()
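To point the multi-server variant at several instances, MYSQL_CONFIGS takes a JSON array matching the shape shown in the configuration comment. A hypothetical .env entry (hosts and credentials are placeholders):

MYSQL_CONFIGS='[{"name": "mysql1", "host": "10.0.0.5", "user": "auditor", "password": "***", "port": 3306, "charset": "utf8mb4"}, {"name": "mysql2", "host": "10.0.0.6", "user": "auditor", "password": "***", "port": 3307, "charset": "utf8mb4"}]'

Each entry's "name" becomes the conn_id that prefixes that server's sheet names in the workbook.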