"数据库": db,
"敏感字段汇总": ", ".join(info.get("all_sensitive_fields", [])),
"总体用途": info.get("database_usage", "")
})
overall_df = pd.DataFrame(rows, columns=["数据库", "敏感字段汇总", "总体用途"])
overall_df.loc[len(overall_df)] = ["MySQL服务器", "", server_overall.get("server_overall", "")]

explanations = {
"数据库": "各数据库名称(通常对应不同的 Web 服务)",
"敏感字段汇总": "各数据库中所有被识别出的敏感字段集合",
"总体用途": "每个数据库支撑的 Web 服务总体用途及服务器业务支持情况"
}
explanation_row = [explanations.get(col, "") for col in overall_df.columns]
explanation_df = pd.DataFrame([explanation_row], columns=overall_df.columns)
combined_df = pd.concat([explanation_df, overall_df], ignore_index=True)
combined_df.to_excel(writer, sheet_name="总体报告", index=False)

def export_field_explanations(writer, sheet_name, df, field_explanations):
explanation_row = [field_explanations.get(col, "") for col in df.columns]
explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
combined_df = pd.concat([explanation_df, df], ignore_index=True)
combined_df.to_excel(writer, sheet_name=sheet_name, index=False)

def export_to_excel(db_structure, db_summary, table_analysis, server_info, access_logs, log_configs, file_privileges, udf_info, server_overall):
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
export_overall_report(writer, db_summary, server_overall)

server_df = pd.DataFrame([server_info])
common_explanations = {
"版本": "数据库版本号",
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}
export_field_explanations(writer, "服务器信息", server_df, common_explanations)

log_df = pd.DataFrame(list(log_configs.items()), columns=["配置项", "值"])
log_explanations = {"配置项": "日志相关配置项", "值": "对应配置项的值"}
export_field_explanations(writer, "日志配置", log_df, log_explanations)

if access_logs:
access_df = pd.DataFrame(access_logs)
access_explanations = {col: "访问记录字段" for col in access_df.columns}
export_field_explanations(writer, "访问记录", access_df, access_explanations)

if file_privileges:
file_df = pd.DataFrame(file_privileges)
file_explanations = {col: "文件权限相关字段" for col in file_df.columns}
export_field_explanations(writer, "文件权限", file_df, file_explanations)

if udf_info:
udf_df = pd.DataFrame(udf_info)
udf_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
export_field_explanations(writer, "UDF信息", udf_df, udf_explanations)

for db, tables in db_structure.items():
for table, content in tables.items():
data = content["samples"]
columns = content["columns"]
df = pd.DataFrame(data, columns=columns)
table_result = table_analysis.get(db, {}).get(table, {})
sensitive_fields = table_result.get("sensitive_fields", [])
sensitive_marks = ["敏感字段" if col in sensitive_fields else "" for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
field_explanations = {col: f"{col} 的中文解释" for col in columns}
sheet_name = f"{db}_{table}"[:31]
export_field_explanations(writer, sheet_name, df, field_explanations)
logging.info(f"数据导出完成:{OUTPUT_FILE}")

def main():
conn = connect_db()
if not conn:
return

db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
conn.close()
def call_openai_api(prompt):
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全与业务分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败 (尝试 {attempt+1}/{MAX_RETRY}):{e}")
time.sleep(REQUEST_DELAY)
return ""

def analyze_table(db, table, table_data, server_info):
prompt = (
f"请基于下面提供的 MySQL 数据库【{db}】中表【{table}】的结构和样本数据,"
"识别可能的敏感信息(如身份证号、手机号、邮箱、密码等)和潜在安全风险,"
"同时判断该表是否支撑某个 Web 服务,并说明该服务可能的用途(例如电商、内容发布、监控等)。\n"
"请用中文输出分析结果,格式为:\n"
"{ 'sensitive_fields': [敏感字段, ...], 'table_usage': '可能用途说明' }\n"
"数据如下:\n" + json.dumps(table_data, ensure_ascii=False, indent=2) +
"\n服务器信息如下:\n" + json.dumps(server_info, ensure_ascii=False, indent=2)
)
response = call_openai_api(prompt)
try:
result = json.loads(response)
except json.JSONDecodeError:
logging.error(f"分析 {db}.{table} 失败,响应:{response}")
result = {}
return result

def analyze_tables(db_structure, server_info):
table_analysis = {}
for db, tables in db_structure.items():
table_analysis[db] = {}
for table, data in tables.items():
logging.info(f"正在分析表 {db}.{table} ...")
result = analyze_table(db, table, data, server_info)
table_analysis[db][table] = result
return table_analysis

def summarize_database(db, table_analysis):
prompt = (
f"请基于下面提供的数据库【{db}】中各数据表的分析结果,"
"汇总出该数据库中所有被识别出的敏感字段,并判断该数据库支撑的 Web 服务可能的用途。\n"
"请用中文输出分析结果,格式为:\n"
"{ 'all_sensitive_fields': [敏感字段, ...], 'database_usage': '总体用途说明' }\n"
"数据如下:\n" + json.dumps(table_analysis, ensure_ascii=False, indent=2)
)
response = call_openai_api(prompt)
try:
result = json.loads(response)
except json.JSONDecodeError:
logging.error(f"数据库 {db} 汇总分析失败,响应:{response}")
result = {}
return result

def summarize_all_databases(db_structure, table_analysis):
summary = {}
for db in db_structure.keys():
logging.info(f"正在汇总分析数据库 {db} ...")
summary[db] = summarize_database(db, table_analysis.get(db, {}))
summary[db]["tables"] = table_analysis.get(db, {})
return summary

def analyze_server_overall(server_info, db_summary, access_logs, log_configs, file_privileges, udf_info):
prompt = (
"请基于下面提供的 MySQL 服务器信息、各数据库汇总分析、访问记录、日志配置、文件权限、UDF 信息,"
"生成该 MySQL 服务器的整体安全和业务功能分析报告,描述可能存在的风险及支持的 Web 服务类型。\n"
"请用中文输出分析结果,格式为:\n"
"{ 'server_overall': '整体描述信息' }\n"
"服务器信息:\n" + json.dumps(server_info, ensure_ascii=False, indent=2) +
"\n数据库汇总:\n" + json.dumps(db_summary, ensure_ascii=False, indent=2) +
"\n访问记录:\n" + json.dumps(access_logs, ensure_ascii=False, indent=2) +
"\n日志配置:\n" + json.dumps(log_configs, ensure_ascii=False, indent=2) +
"\n文件权限:\n" + json.dumps(file_privileges, ensure_ascii=False, indent=2) +
"\nUDF 信息:\n" + json.dumps(udf_info, ensure_ascii=False, indent=2)
)
response = call_openai_api(prompt)
try:
result = json.loads(response)
except json.JSONDecodeError:
logging.error(f"MySQL 服务器整体分析失败,响应:{response}")
result = {}
return result

def export_overall_report(writer, db_summary, server_overall):
rows = []
for db, info in db_summary.items():
rows.append({
import os
import json
import pymysql
import openai
import pandas as pd
import time
import logging
from dotenv import load_dotenv

load_dotenv()

DB_CONFIG = {
"host": os.getenv("DB_HOST", "localhost"),
"user": os.getenv("DB_USER", "root"),
"password": os.getenv("DB_PASSWORD", "yourpassword"),
"port": int(os.getenv("DB_PORT", 3306)),
"charset": os.getenv("DB_CHARSET", "utf8mb4")
}

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "sensitive_data_analysis.xlsx")
MAX_RETRY = int(os.getenv("MAX_RETRY", 3))
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1))

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

def connect_db():
try:
conn = pymysql.connect(**DB_CONFIG)
logging.info("数据库连接成功")
return conn
except Exception as e:
logging.error(f"数据库连接失败: {e}")
return None

def collect_db_info(conn):
logging.info("正在采集数据库信息...")
db_structure = {}
server_info = {}
access_logs = []
log_configs = {}
file_privileges = []
udf_info = []

with conn.cursor() as cursor:
try:
cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
version, hostname, port, timezone, datadir = cursor.fetchone()
server_info = {
'版本': version,
'主机名': hostname,
'端口': port,
'时区': timezone,
'数据目录': datadir
}
except Exception as e:
logging.error(f"采集服务器信息失败: {e}")

try:
cursor.execute("SHOW PROCESSLIST;")
access_logs = cursor.fetchall()
except Exception as e:
logging.warning(f"无法查看访问记录: {e}")

try:
cursor.execute("SHOW VARIABLES LIKE '%log%'")
log_configs = {row[0]: row[1] for row in cursor.fetchall()}
except Exception as e:
logging.warning(f"无法查看日志配置: {e}")

try:
cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
file_privileges = cursor.fetchall()
except Exception as e:
logging.warning(f"无法查看文件权限: {e}")

try:
cursor.execute("SELECT * FROM mysql.func")
udf_info = cursor.fetchall()
except Exception as e:
logging.warning(f"无法查看 UDF 信息: {e}")

try:
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
except Exception as e:
logging.error(f"获取数据库列表失败: {e}")
databases = []

for db in databases:
if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
continue
try:
cursor.execute(f"USE {db}")
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
except Exception as e:
logging.warning(f"跳过数据库 {db},原因:{e}")
continue
db_structure[db] = {}
for table in tables:
try:
cursor.execute(f"DESCRIBE {table}")
columns = [col[0] for col in cursor.fetchall()]
except Exception as e:
logging.warning(f"获取 {db}.{table} 字段信息失败: {e}")
continue
try:
cursor.execute(f"SELECT * FROM {table} LIMIT 5")
samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# 导出文件权限
if file_privileges:
file_df = pd.DataFrame(file_privileges)
file_explanations = {col: "文件权限相关字段" for col in file_df.columns}
export_field_explanations(writer, "文件权限", file_df, file_explanations)

# 导出 UDF 信息
if udf_info:
udf_df = pd.DataFrame(udf_info)
udf_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
export_field_explanations(writer, "UDF信息", udf_df, udf_explanations)

# 导出各数据表分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content["samples"]
columns = content["columns"]
df = pd.DataFrame(data, columns=columns)
# 如果有敏感字段信息,第一行标记
table_result = table_analysis.get(db, {}).get(table, {})
sensitive_fields = table_result.get("sensitive_fields", [])
sensitive_marks = ["敏感字段" if col in sensitive_fields else "" for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
field_explanations = {col: f"{col} 的中文解释" for col in columns}
sheet_name = f"{db}_{table}"[:31]
export_field_explanations(writer, sheet_name, df, field_explanations)
logging.info(f"数据导出完成:{OUTPUT_FILE}")


# ====== 主流程 ======
def main():
conn = connect_db()
if not conn:
return

# 采集 MySQL 服务器数据
db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
conn.close()

# 第一步:对每个数据表进行单独分析
table_analysis = analyze_tables(db_structure, server_info)

# 第二步:对每个数据库的所有数据表分析结果进行汇总
db_summary = summarize_all_databases(db_structure, table_analysis)

# 第三步:结合服务器整体信息和各数据库汇总,再进行整体分析
server_overall = analyze_server_overall(server_info, db_summary, access_logs, log_configs, file_privileges, udf_info)

# 导出所有分析结果到 Excel
export_to_excel(db_structure, db_summary, table_analysis, server_info, access_logs, log_configs, file_privileges, udf_info, server_overall)
logging.info("整体流程完成!")


if name == "main":
main()
"""
对整个 MySQL 服务器进行整体分析(第三阶段),
利用大模型结合服务器信息、各数据库汇总信息以及其他采集数据,
输出总体风险描述和服务器功能说明。
返回格式:{ 'server_overall': '整体描述信息' }
"""
prompt = (
"请基于下面提供的 MySQL 服务器信息、各数据库(对应 Web 服务)的汇总分析、访问记录、日志配置、文件权限、UDF 信息,"
"生成该 MySQL 服务器的整体安全和业务功能分析报告,描述可能存在的风险以及服务器支持哪些类型的 Web 服务(如电商、内容发布、监控等)。\n"
"请用中文输出分析结果,格式为:\n"
"{ 'server_overall': '整体描述信息' }\n"
"服务器信息:\n" + json.dumps(server_info, ensure_ascii=False, indent=2) +
"\n数据库汇总:\n" + json.dumps(db_summary, ensure_ascii=False, indent=2) +
"\n访问记录:\n" + json.dumps(access_logs, ensure_ascii=False, indent=2) +
"\n日志配置:\n" + json.dumps(log_configs, ensure_ascii=False, indent=2) +
"\n文件权限:\n" + json.dumps(file_privileges, ensure_ascii=False, indent=2) +
"\nUDF 信息:\n" + json.dumps(udf_info, ensure_ascii=False, indent=2)
)
response = call_openai_api(prompt)
try:
result = json.loads(response)
except json.JSONDecodeError:
logging.error(f"MySQL 服务器整体分析失败,响应:{response}")
result = {}
return result


# ====== 导出 Excel ======
def export_overall_report(writer, db_summary, server_overall):
"""
导出总体报告工作表,包含各数据库汇总和整体服务器分析,并增加中文解释
"""
rows = []
for db, info in db_summary.items():
rows.append({
"数据库": db,
"敏感字段汇总": ", ".join(info.get("all_sensitive_fields", [])),
"总体用途": info.get("database_usage", "")
})
overall_df = pd.DataFrame(rows, columns=["数据库", "敏感字段汇总", "总体用途"])
# 加入服务器整体分析
overall_df.loc[len(overall_df)] = ["MySQL服务器", "", server_overall.get("server_overall", "")]

explanations = {
"数据库": "各数据库名称(通常对应不同的 Web 服务)",
"敏感字段汇总": "各数据库中所有被识别出的敏感字段集合,提示可能存在的隐私风险",
"总体用途": "对每个数据库支撑的 Web 服务总体用途的描述,及整体服务器的业务支持情况"
}
# 在第一行下增加解释
explanation_row = [explanations.get(col, "") for col in overall_df.columns]
explanation_df = pd.DataFrame([explanation_row], columns=overall_df.columns)
combined_df = pd.concat([explanation_df, overall_df], ignore_index=True)
combined_df.to_excel(writer, sheet_name="总体报告", index=False)


def export_field_explanations(writer, sheet_name, df, field_explanations):
"""在导出的工作表中,增加字段中文解释行"""
explanation_row = [field_explanations.get(col, "") for col in df.columns]
explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
combined_df = pd.concat([explanation_df, df], ignore_index=True)
combined_df.to_excel(writer, sheet_name=sheet_name, index=False)


def export_to_excel(db_structure, db_summary, table_analysis, server_info, access_logs, log_configs, file_privileges, udf_info, server_overall):
"""
导出 Excel 文件,包含总体报告、服务器信息、日志配置、访问记录、文件权限、UDF 信息,以及各数据表分析结果
"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 导出总体报告
export_overall_report(writer, db_summary, server_overall)

# 导出服务器信息
server_df = pd.DataFrame([server_info])
common_explanations = {
"版本": "数据库版本号",
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}
export_field_explanations(writer, "服务器信息", server_df, common_explanations)

# 导出日志配置
log_df = pd.DataFrame(list(log_configs.items()), columns=["配置项", "值"])
log_explanations = {
"配置项": "日志相关配置项",
"值": "对应配置项的值"
}
export_field_explanations(writer, "日志配置", log_df, log_explanations)

# 导出访问记录
if access_logs:
access_df = pd.DataFrame(access_logs)
access_explanations = {col: "访问记录字段" for col in access_df.columns}
export_field_explanations(writer, "访问记录", access_df, access_explanations)
try:
cursor.execute(f"SELECT * FROM {table} LIMIT 5")
samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info


# ====== OpenAI 分析函数 ======
def call_openai_api(prompt):
"""调用 OpenAI API,增加重试和请求延迟"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全与业务分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败 (尝试 {attempt+1}/{MAX_RETRY}):{e}")
time.sleep(REQUEST_DELAY)
return ""


def analyze_table(db, table, table_data, server_info):
"""
分析单个数据表(第一阶段分析),利用大模型分析表中的敏感信息和潜在风险,
并判断该表是否与某个 Web 服务相关。
返回格式:{ 'sensitive_fields': [...], 'table_usage': '可能用途说明' }
"""
prompt = (
f"请基于下面提供的 MySQL 数据库【{db}】中表【{table}】的结构和样本数据,"
"识别可能的敏感信息(如身份证号、手机号、邮箱、密码等)和潜在安全风险,"
"同时判断该表是否支撑某个 Web 服务,并说明该服务可能的用途(例如电商、内容发布、监控等)。\n"
"请用中文输出分析结果,格式为:\n"
"{ 'sensitive_fields': [敏感字段, ...], 'table_usage': '可能用途说明' }\n"
"数据如下:\n" + json.dumps(table_data, ensure_ascii=False, indent=2) +
"\n服务器信息如下:\n" + json.dumps(server_info, ensure_ascii=False, indent=2)
)
response = call_openai_api(prompt)
try:
result = json.loads(response)
except json.JSONDecodeError:
logging.error(f"分析 {db}.{table} 失败,响应:{response}")
result = {}
return result


def analyze_tables(db_structure, server_info):
"""
对每个数据表进行分析(第一阶段),返回结果:
{ 数据库: { 表: 分析结果, ... }, ... }
"""
table_analysis = {}
for db, tables in db_structure.items():
table_analysis[db] = {}
for table, data in tables.items():
logging.info(f"正在分析表 {db}.{table} ...")
result = analyze_table(db, table, data, server_info)
table_analysis[db][table] = result
return table_analysis


def summarize_database(db, table_analysis):
"""
对一个数据库内的所有数据表的分析结果进行汇总(第二阶段),调用大模型进一步分析,
以生成该数据库(对应一个 Web 服务)的总体敏感信息和用途描述。
返回格式:{ 'all_sensitive_fields': [...], 'database_usage': '总体用途说明' }
"""
prompt = (
f"请基于下面提供的数据库【{db}】中各数据表的分析结果,"
"汇总出该数据库中所有被识别出的敏感字段,并判断该数据库支撑的 Web 服务可能的用途(如电商、内容发布、监控等)。\n"
"请用中文输出分析结果,格式为:\n"
"{ 'all_sensitive_fields': [敏感字段, ...], 'database_usage': '总体用途说明' }\n"
"数据如下:\n" + json.dumps(table_analysis, ensure_ascii=False, indent=2)
)
response = call_openai_api(prompt)
try:
result = json.loads(response)
except json.JSONDecodeError:
logging.error(f"数据库 {db} 汇总分析失败,响应:{response}")
result = {}
return result


def summarize_all_databases(db_structure, table_analysis):
"""
对每个数据库进行第二阶段的汇总分析,
返回结果格式:
{ 数据库: { 'all_sensitive_fields': [...], 'database_usage': '总体用途说明', 'tables': { ... } } }
"""
summary = {}
for db in db_structure.keys():
logging.info(f"正在汇总分析数据库 {db} ...")
summary[db] = summarize_database(db, table_analysis.get(db, {}))
summary[db]["tables"] = table_analysis.get(db, {})
return summary


def analyze_server_overall(server_info, db_summary, access_logs, log_configs, file_privileges, udf_info):
import os
import json
import pymysql
import openai
import pandas as pd
import time
import logging
from dotenv import load_dotenv

# 加载 .env 文件
load_dotenv()

# ============ 配置区 ============
DB_CONFIG = {
"host": os.getenv("DB_HOST", "localhost"),
"user": os.getenv("DB_USER", "root"),
"password": os.getenv("DB_PASSWORD", "yourpassword"),
"port": int(os.getenv("DB_PORT", 3306)),
"charset": os.getenv("DB_CHARSET", "utf8mb4")
}

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "sensitive_data_analysis.xlsx")
MAX_RETRY = int(os.getenv("MAX_RETRY", 3))
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1))

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
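For reference, a hypothetical `.env` file matching the defaults read above (every value is a placeholder, not a real credential):

```
DB_HOST=localhost
DB_USER=root
DB_PASSWORD=yourpassword
DB_PORT=3306
DB_CHARSET=utf8mb4
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4
OUTPUT_FILE=sensitive_data_analysis.xlsx
MAX_RETRY=3
REQUEST_DELAY=1
```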


# ====== 数据库连接 ======
def connect_db():
"""建立与 MySQL 服务器的连接"""
try:
conn = pymysql.connect(**DB_CONFIG)
logging.info("数据库连接成功")
return conn
except Exception as e:
logging.error(f"数据库连接失败: {e}")
return None


# ====== 数据采集 ======
def collect_db_info(conn):
"""
采集 MySQL 服务器数据:
- 所有非系统数据库、数据表、字段及前 5 条样本数据
- 服务器信息、访问记录、日志配置、文件权限、UDF 信息
"""
logging.info("正在采集数据库信息...")
db_structure = {}
server_info = {}
access_logs = []
log_configs = {}
file_privileges = []
udf_info = []

with conn.cursor() as cursor:
# 服务器基本信息
try:
cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
version, hostname, port, timezone, datadir = cursor.fetchone()
server_info = {
'版本': version,
'主机名': hostname,
'端口': port,
'时区': timezone,
'数据目录': datadir
}
except Exception as e:
logging.error(f"采集服务器信息失败: {e}")

try:
cursor.execute("SHOW PROCESSLIST;")
access_logs = cursor.fetchall()
except Exception as e:
logging.warning("无法查看访问记录: " + str(e))

try:
cursor.execute("SHOW VARIABLES LIKE '%log%'")
log_configs = {row[0]: row[1] for row in cursor.fetchall()}
except Exception as e:
logging.warning("无法查看日志配置: " + str(e))

try:
cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
file_privileges = cursor.fetchall()
except Exception as e:
logging.warning("无法查看文件权限: " + str(e))

try:
cursor.execute("SELECT * FROM mysql.func")
udf_info = cursor.fetchall()
except Exception as e:
logging.warning("无法查看 UDF 信息: " + str(e))

# 采集所有非系统数据库及其表信息
try:
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
except Exception as e:
logging.error("获取数据库列表失败: " + str(e))
databases = []

for db in databases:
if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
continue
try:
cursor.execute(f"USE {db}")
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
except Exception as e:
logging.warning(f"跳过数据库 {db},原因:{e}")
continue
db_structure[db] = {}
for table in tables:
try:
cursor.execute(f"DESCRIBE {table}")
columns = [col[0] for col in cursor.fetchall()]
except Exception as e:
logging.warning(f"获取 {db}.{table} 字段信息失败: {e}")
continue
try:
cursor.execute(f"SELECT * FROM {table} LIMIT 5")
samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
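For orientation, the `db_structure` value returned above has the following shape (database, table, and sample values here are invented for illustration):

```python
# Sketch of db_structure as assembled by collect_db_info
db_structure = {
    "shop_db": {                                  # one key per non-system database
        "users": {                                # one key per table
            "columns": ["id", "phone", "password"],
            "samples": [(1, "13800000000", "5f4dcc3b...")],  # at most 5 rows
        },
    },
}
```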


# ====== OpenAI 分析函数 ======
def call_openai_api(prompt):
"""调用 OpenAI API,增加重试和请求延迟"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全与业务分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败 (尝试 {attempt+1}/{MAX_RETRY}):{e}")
time.sleep(REQUEST_DELAY)
return ""


def analyze_table(db, table, table_data, server_info):
"""
分析单个数据表(第一阶段分析),利用大模型分析表中的敏感信息和潜在风险,
并判断该表是否与某个 Web 服务相关。
返回格式:{ 'sensitive_fields': [...], 'table_usage': '可能用途说明' }
"""
prompt = (
f"请基于下面提供的 MySQL 数据库【{db}】中表【{table}】的结构和样本数据,"
"识别可能的敏感信息(如身份证号、手机号、邮箱、密码等)和潜在安全风险,"
"同时判断该表是否支撑某个 Web 服务,并说明该服务可能的用途(例如电商、内容发布、监控等)。\n"
"请用中文输出分析结果,格式为:\n"
"{ 'sensitive_fields': [敏感字段, ...], 'table_usage': '可能用途说明' }\n"
"数据如下:\n" + json.dumps(table_data, ensure_ascii=False, indent=2) +
"\n服务器信息如下:\n" + json.dumps(server_info, ensure_ascii=False, indent=2)
)
response = call_openai_api(prompt)
try:
result = json.loads(response)
except json.JSONDecodeError:
logging.error(f"分析 {db}.{table} 失败,响应:{response}")
result = {}
return result
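Since the reply is parsed directly with `json.loads`, the model has to answer with a bare JSON object; a valid reply to this prompt would look like the following (field values invented for illustration):

```json
{ "sensitive_fields": ["phone", "id_card"], "table_usage": "电商平台的用户账户表" }
```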


def analyze_tables(db_structure, server_info):
"""
对每个数据表进行分析(第一阶段),返回结果:
{ 数据库: { 表: 分析结果, ... }, ... }
"""
table_analysis = {}
for db, tables in db_structure.items():
table_analysis[db] = {}
for table, data in tables.items():
logging.info(f"正在分析表 {db}.{table} ...")
result = analyze_table(db, table, data, server_info)
table_analysis[db][table] = result
return table_analysis


def summarize_database(db, table_analysis):
"""
对一个数据库内的所有数据表的分析结果进行汇总(第二阶段),调用大模型进一步分析,
以生成该数据库(对应一个 Web 服务)的总体敏感信息和用途描述。
返回格式:{ 'all_sensitive_fields': [...], 'database_usage': '总体用途说明' }
"""
prompt = (
f"请基于下面提供的数据库【{db}】中各数据表的分析结果,"
"汇总出该数据库中所有被识别出的敏感字段,并判断该数据库支撑的 Web 服务可能的用途(如电商、内容发布、监控等)。\n"
"请用中文输出分析结果,格式为:\n"
"{ 'all_sensitive_fields': [敏感字段, ...], 'database_usage': '总体用途说明' }\n"
"数据如下:\n" + json.dumps(table_analysis, ensure_ascii=False, indent=2)
)
response = call_openai_api(prompt)
try:
result = json.loads(response)
except json.JSONDecodeError:
logging.error(f"数据库 {db} 汇总分析失败,响应:{response}")
result = {}
return result


def summarize_all_databases(db_structure, table_analysis):
"""
对每个数据库进行第二阶段的汇总分析,
返回结果格式:
{ 数据库: { 'all_sensitive_fields': [...], 'database_usage': '总体用途说明', 'tables': { ... } } }
"""
summary = {}
for db in db_structure.keys():
logging.info(f"正在汇总分析数据库 {db} ...")
summary[db] = summarize_database(db, table_analysis.get(db, {}))
summary[db]["tables"] = table_analysis.get(db, {})
return summary


def analyze_server_overall(server_info, db_summary, access_logs, log_configs, file_privileges, udf_info):
    """
    对整个 MySQL 服务器进行整体分析(第三阶段),
    利用大模型结合服务器信息、各数据库汇总信息以及其他采集数据,
    输出总体风险描述和服务器功能说明。
    返回格式:{ "server_overall": "整体描述信息" }
    """
    prompt = (
        "请基于下面提供的 MySQL 服务器信息、各数据库(对应 Web 服务)的汇总分析、访问记录、日志配置、文件权限、UDF 信息,"
        "生成该 MySQL 服务器的整体安全和业务功能分析报告,描述可能存在的风险以及服务器支持哪些类型的 Web 服务(如电商、内容发布、监控等)。\n"
        "请用中文输出分析结果,并且只返回严格的 JSON(双引号),格式为:\n"
        '{ "server_overall": "整体描述信息" }\n'
        "服务器信息:\n" + json.dumps(server_info, ensure_ascii=False, indent=2) +
        "\n数据库汇总:\n" + json.dumps(db_summary, ensure_ascii=False, indent=2) +
        "\n访问记录:\n" + json.dumps(access_logs, ensure_ascii=False, indent=2) +
        "\n日志配置:\n" + json.dumps(log_configs, ensure_ascii=False, indent=2) +
        "\n文件权限:\n" + json.dumps(file_privileges, ensure_ascii=False, indent=2) +
        "\nUDF 信息:\n" + json.dumps(udf_info, ensure_ascii=False, indent=2)
    )
    response = call_openai_api(prompt)
    try:
        result = json.loads(response)
    except json.JSONDecodeError:
        logging.error(f"MySQL 服务器整体分析失败,响应:{response}")
        result = {}
    return result


# ====== 导出 Excel ======
def export_overall_report(writer, db_summary, server_overall):
    """
    导出总体报告工作表,包含各数据库汇总和整体服务器分析,并增加中文解释
    """
    rows = []
    for db, info in db_summary.items():
        rows.append({
            "数据库": db,
            "敏感字段汇总": ", ".join(info.get("all_sensitive_fields", [])),
            "总体用途": info.get("database_usage", "")
        })
    overall_df = pd.DataFrame(rows, columns=["数据库", "敏感字段汇总", "总体用途"])
    # 加入服务器整体分析
    overall_df.loc[len(overall_df)] = ["MySQL服务器", "", server_overall.get("server_overall", "")]

    explanations = {
        "数据库": "各数据库名称(通常对应不同的 Web 服务)",
        "敏感字段汇总": "各数据库中所有被识别出的敏感字段集合,提示可能存在的隐私风险",
        "总体用途": "对每个数据库支撑的 Web 服务总体用途的描述,及整体服务器的业务支持情况"
    }
    # 在表头下增加解释行
    explanation_row = [explanations.get(col, "") for col in overall_df.columns]
    explanation_df = pd.DataFrame([explanation_row], columns=overall_df.columns)
    combined_df = pd.concat([explanation_df, overall_df], ignore_index=True)
    combined_df.to_excel(writer, sheet_name="总体报告", index=False)


def export_field_explanations(writer, sheet_name, df, field_explanations):
    """在导出的工作表中,增加字段中文解释行"""
    explanation_row = [field_explanations.get(col, "") for col in df.columns]
    explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
    combined_df = pd.concat([explanation_df, df], ignore_index=True)
    combined_df.to_excel(writer, sheet_name=sheet_name, index=False)


def export_to_excel(db_structure, db_summary, table_analysis, server_info, access_logs, log_configs, file_privileges, udf_info, server_overall):
    """
    导出 Excel 文件,包含总体报告、服务器信息、日志配置、访问记录、文件权限、UDF 信息,以及各数据表分析结果
    """
    logging.info("正在导出数据到 Excel...")
    with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
        # 导出总体报告
        export_overall_report(writer, db_summary, server_overall)

        # 导出服务器信息
        server_df = pd.DataFrame([server_info])
        common_explanations = {
            "版本": "数据库版本号",
            "主机名": "数据库所在主机名称",
            "端口": "数据库服务端口",
            "时区": "服务器时区设置",
            "数据目录": "数据库数据存放目录"
        }
        export_field_explanations(writer, "服务器信息", server_df, common_explanations)

        # 导出日志配置
        log_df = pd.DataFrame(list(log_configs.items()), columns=["配置项", "值"])
        log_explanations = {
            "配置项": "日志相关配置项",
            "值": "对应配置项的值"
        }
        export_field_explanations(writer, "日志配置", log_df, log_explanations)

        # 导出访问记录
        if access_logs:
            access_df = pd.DataFrame(access_logs)
            access_explanations = {col: "访问记录字段" for col in access_df.columns}
            export_field_explanations(writer, "访问记录", access_df, access_explanations)

        # 导出文件权限
        if file_privileges:
            file_df = pd.DataFrame(file_privileges)
            file_explanations = {col: "文件权限相关字段" for col in file_df.columns}
            export_field_explanations(writer, "文件权限", file_df, file_explanations)

        # 导出 UDF 信息
        if udf_info:
            udf_df = pd.DataFrame(udf_info)
            udf_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
            export_field_explanations(writer, "UDF信息", udf_df, udf_explanations)

        # 导出各数据表分析结果
        for db, tables in db_structure.items():
            for table, content in tables.items():
                data = content["samples"]
                columns = content["columns"]
                df = pd.DataFrame(data, columns=columns)
                # 如果有敏感字段信息,第一行标记
                table_result = table_analysis.get(db, {}).get(table, {})
                sensitive_fields = table_result.get("sensitive_fields", [])
                sensitive_marks = ["敏感字段" if col in sensitive_fields else "" for col in columns]
                mark_row = pd.DataFrame([sensitive_marks], columns=columns)
                df = pd.concat([mark_row, df], ignore_index=True)
                field_explanations = {col: f"{col} 的中文解释" for col in columns}
                sheet_name = f"{db}_{table}"[:31]
                export_field_explanations(writer, sheet_name, df, field_explanations)
    logging.info(f"数据导出完成:{OUTPUT_FILE}")


# ====== 主流程 ======
def main():
    conn = connect_db()
    if not conn:
        return

    # 采集 MySQL 服务器数据
    db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
    conn.close()

    # 第一步:对每个数据表进行单独分析
    table_analysis = analyze_tables(db_structure, server_info)

    # 第二步:对每个数据库的所有数据表分析结果进行汇总
    db_summary = summarize_all_databases(db_structure, table_analysis)

    # 第三步:结合服务器整体信息和各数据库汇总,再进行整体分析
    server_overall = analyze_server_overall(server_info, db_summary, access_logs, log_configs, file_privileges, udf_info)

    # 导出所有分析结果到 Excel
    export_to_excel(db_structure, db_summary, table_analysis, server_info, access_logs, log_configs, file_privileges, udf_info, server_overall)
    logging.info("整体流程完成!")


if __name__ == "__main__":
    main()
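All three stages above parse the model reply with a bare `json.loads`, which fails as soon as the model wraps its answer in a Markdown code fence or prepends a sentence. A hedged helper sketch (the name `extract_json` is my own, not part of the script) could be called in place of each `json.loads`:

```python
import json
import re

def extract_json(text):
    """Best-effort extraction of a JSON object from an LLM reply.

    Strips Markdown code fences, then falls back to the first {...} span.
    Returns {} on failure, mirroring the script's existing error handling.
    """
    text = re.sub(r"^```(?:json)?\s*|\s*```$", "", text.strip())
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        match = re.search(r"\{.*\}", text, re.S)
        if match:
            try:
                return json.loads(match.group(0))
            except json.JSONDecodeError:
                pass
        return {}
```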
import os
import json
import pymysql
import openai
import pandas as pd
import time
import logging
from dotenv import load_dotenv

# 加载 .env 文件
load_dotenv()

# ============ 配置区 ============
# 如果未提供 MYSQL_CONFIGS 环境变量,则使用单一默认配置
DEFAULT_DB_CONFIG = {
    "host": os.getenv("DB_HOST", "localhost"),
    "user": os.getenv("DB_USER", "root"),
    "password": os.getenv("DB_PASSWORD", "yourpassword"),
    "port": int(os.getenv("DB_PORT", 3306)),
    "charset": os.getenv("DB_CHARSET", "utf8mb4")
}

# MYSQL_CONFIGS 应为 JSON 格式的数组,形如:
# [{"name": "mysql1", "host": "...", "user": "...", "password": "...", "port": 3306, "charset": "utf8mb4"}, {...}]
MYSQL_CONFIGS_JSON = os.getenv("MYSQL_CONFIGS", "")
if MYSQL_CONFIGS_JSON:
    try:
        DB_CONFIGS = json.loads(MYSQL_CONFIGS_JSON)
    except Exception as e:
        logging.error("解析 MYSQL_CONFIGS 失败,使用默认配置。错误:" + str(e))
        DB_CONFIGS = [DEFAULT_DB_CONFIG]
else:
    DB_CONFIGS = [DEFAULT_DB_CONFIG]

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "sensitive_data_analysis.xlsx")
MAX_RETRY = int(os.getenv("MAX_RETRY", 3))  # OpenAI API 重试次数
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1))  # 每个请求延迟 1 秒

# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
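A hypothetical way to produce the `MYSQL_CONFIGS` value for `.env` (hosts and credentials below are placeholders):

```python
import json

# Hypothetical connection entries; replace hosts and credentials with real ones.
example_configs = [
    {"name": "mysql1", "host": "db1.internal", "user": "audit", "password": "secret1", "port": 3306, "charset": "utf8mb4"},
    {"name": "mysql2", "host": "db2.internal", "user": "audit", "password": "secret2", "port": 3307, "charset": "utf8mb4"},
]

# Paste the printed line into .env as the MYSQL_CONFIGS variable.
print("MYSQL_CONFIGS=" + json.dumps(example_configs))
```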

# ====== 数据库连接 ======
def connect_db(config):
    """建立单个数据库连接"""
    try:
        # "name" 仅用于标识,并非 pymysql.connect 的参数
        conn_params = {k: v for k, v in config.items() if k != "name"}
        conn = pymysql.connect(**conn_params)
        logging.info(f"数据库连接成功: {config.get('name', config['host'])}")
        return conn
    except Exception as e:
        logging.error(f"数据库连接失败 {config.get('name', config['host'])}: {e}")
        return None


# ====== 数据采集 ======
def collect_db_info(conn):
    """
    枚举数据库信息:包括数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息
    """
    logging.info("正在采集数据库信息...")
    db_structure = {}
    server_info = {}
    access_logs = []
    log_configs = {}
    file_privileges = []
    udf_info = []

    with conn.cursor() as cursor:
        try:
            # 获取服务器信息
            cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
            version, hostname, port, timezone, datadir = cursor.fetchone()
            server_info = {
                '版本': version,
                '主机名': hostname,
                '端口': port,
                '时区': timezone,
                '数据目录': datadir
            }
        except Exception as e:
            logging.error(f"采集服务器信息失败: {e}")

        try:
            # 获取访问记录(需要权限)
            cursor.execute("SHOW PROCESSLIST;")
            access_logs = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看访问记录 (SHOW PROCESSLIST): " + str(e))

        try:
            # 日志配置
            cursor.execute("SHOW VARIABLES LIKE '%log%'")
            log_configs = {row[0]: row[1] for row in cursor.fetchall()}
        except Exception as e:
            logging.warning("无法查看日志配置: " + str(e))

        try:
            # 文件权限(示例查询,实际可能需要调整)
            cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
            file_privileges = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看文件权限: " + str(e))

        try:
            # UDF 提权信息
            cursor.execute("SELECT * FROM mysql.func")
            udf_info = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看 UDF 信息: " + str(e))

        try:
            # 获取数据库结构和样本数据
            cursor.execute("SHOW DATABASES")
            databases = [db[0] for db in cursor.fetchall()]
        except Exception as e:
            logging.error("获取数据库列表失败: " + str(e))
            databases = []

        for db in databases:
            if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
                continue  # 跳过系统库
try:
cursor.execute(f"USE {db}")
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
except Exception as e:
logging.warning(f"跳过数据库 {db},原因:{e}")
continue

db_structure[db] = {}

for table in tables:
try:
cursor.execute(f"DESCRIBE {table}")
columns = [col[0] for col in cursor.fetchall()]
except Exception as e:
logging.warning(f"获取 {db}.{table} 字段信息失败: {e}")
continue

try:
cursor.execute(f"SELECT * FROM {table} LIMIT 5")
samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取 {db}.{table} 样本数据失败: {e}")
samples = []

db_structure[db][table] = {
"columns": columns,
"samples": samples
}

return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info


# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API,增加重试机制及请求延迟"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(REQUEST_DELAY)
return ""


def analyze_with_openai(data):
"""
使用 OpenAI 分析所有 MySQL 采集的数据,识别敏感信息和潜在渗透风险,
并分析各服务器是否支撑某类 Web 服务及其可能用途。
"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"请基于下面提供的多个 MySQL 服务器的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
"识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
"视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
"请结合字段名和样本数据判断敏感信息。"
"\n另外,请分析每个 MySQL 服务器是否支撑某个 Web 服务,以及该 Web 服务器可能的功能用途(如电商、内容发布、监控等)。"
"\n请用中文输出分析结果,格式如下:\n"
"{\n 'sensitive_fields': {MySQL服务器标识: {数据库: {表: [敏感字段, ...], ...}, ...},\n"
" 'server_analysis': {MySQL服务器标识: 服务器相关风险描述及 Web 服务功能说明, ...},\n"
" 'access_analysis': {MySQL服务器标识: 访问记录相关风险描述, ...}\n}\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)
response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}


# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result, conn_id):
"""
导出总体报告工作表,内容详略得当,并增加每个字段的中文解释
"""
explanations = {
"sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,可能泄露个人隐私。",
"server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,以及该服务器支持的 Web 服务功能及可能用途。",
"access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
}

report_rows = []
for key, explanation in explanations.items():
value = analysis_result.get(key, {}).get(conn_id, {})
report_rows.append({
"分析项": key,
"结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
"中文解释": explanation
})
report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
sheet_name = f"总体报告_{conn_id}"[:31]
report_df.to_excel(writer, sheet_name=sheet_name, index=False)


def export_field_explanations(writer, sheet_name, df, field_explanations):
    """
    在导出的每个工作表中,增加表头下方的字段中文解释行
    """
    explanation_row = [field_explanations.get(col, "") for col in df.columns]
    explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
    combined_df = pd.concat([explanation_df, df], ignore_index=True)
    combined_df.to_excel(writer, sheet_name=sheet_name, index=False)


def export_to_excel_multiple(overall_data, analysis_result):
    """
    导出多个 MySQL 服务器采集的数据到 Excel,包括每个服务器的总体报告、服务器信息、
    日志配置、访问记录、文件权限、UDF 信息,以及各数据表及分析结果。
    overall_data 为一个字典,包含各服务器采集的信息,结构如下:
    {
        "db_structures": {conn_id: {数据库: {表: {columns: [...], samples: [...]}, ...}, ...}},
        "server_infos": {conn_id: {服务器信息}},
        "access_logs": {conn_id: [...]},
        "log_configs": {conn_id: {...}},
        "file_privileges": {conn_id: [...]},
        "udf_infos": {conn_id: [...]}
    }
    """
    logging.info("正在导出数据到 Excel...")
    with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
        # 遍历每个 MySQL 服务器
        for conn_id in overall_data["server_infos"]:
            # 导出总体报告
            export_overall_report(writer, analysis_result, conn_id)

            # 导出服务器信息
            sheet_name = f"服务器信息_{conn_id}"[:31]
            server_df = pd.DataFrame([overall_data["server_infos"][conn_id]])
            common_field_explanations = {
                "版本": "数据库版本号",
                "主机名": "数据库所在主机名称",
                "端口": "数据库服务端口",
                "时区": "服务器时区设置",
                "数据目录": "数据库数据存放目录"
            }
            export_field_explanations(writer, sheet_name, server_df, common_field_explanations)

            # 导出日志配置
            sheet_name = f"日志配置_{conn_id}"[:31]
            log_df = pd.DataFrame(list(overall_data["log_configs"][conn_id].items()), columns=['配置项', '值'])
            log_explanations = {
                "配置项": "日志相关配置项",
                "值": "对应配置项的值"
            }
            export_field_explanations(writer, sheet_name, log_df, log_explanations)

            # 导出访问记录
            if overall_data["access_logs"].get(conn_id):
                sheet_name = f"访问记录_{conn_id}"[:31]
                access_df = pd.DataFrame(overall_data["access_logs"][conn_id])
                access_field_explanations = {col: "访问记录字段" for col in access_df.columns}
                export_field_explanations(writer, sheet_name, access_df, access_field_explanations)

            # 导出文件权限
            if overall_data["file_privileges"].get(conn_id):
                sheet_name = f"文件权限_{conn_id}"[:31]
                file_df = pd.DataFrame(overall_data["file_privileges"][conn_id])
                file_field_explanations = {col: "文件权限相关字段" for col in file_df.columns}
                export_field_explanations(writer, sheet_name, file_df, file_field_explanations)

            # 导出 UDF 信息
            if overall_data["udf_infos"].get(conn_id):
                sheet_name = f"UDF信息_{conn_id}"[:31]
                udf_df = pd.DataFrame(overall_data["udf_infos"][conn_id])
                udf_field_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
                export_field_explanations(writer, sheet_name, udf_df, udf_field_explanations)

            # 导出各数据表及分析结果
            db_structures = overall_data["db_structures"].get(conn_id, {})
            for db, tables in db_structures.items():
                for table, content in tables.items():
                    data = content['samples']
                    columns = content['columns']
                    df = pd.DataFrame(data, columns=columns)
                    sensitive_cols = analysis_result.get('sensitive_fields', {}).get(conn_id, {}).get(db, {}).get(table, [])
                    sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
                    mark_row = pd.DataFrame([sensitive_marks], columns=columns)
                    df = pd.concat([mark_row, df], ignore_index=True)
                    field_explanations = {col: f"{col} 的中文解释" for col in columns}
                    sheet_name = f"{conn_id}_{db}_{table}"[:31]
                    export_field_explanations(writer, sheet_name, df, field_explanations)
    logging.info(f"数据导出完成:{OUTPUT_FILE}")


# ====== 主流程 ======
def main():
    overall_data = {
        "db_structures": {},
        "server_infos": {},
        "access_logs": {},
        "log_configs": {},
        "file_privileges": {},
        "udf_infos": {}
    }
    # 遍历每个 MySQL 配置
    for config in DB_CONFIGS:
        conn_id = config.get("name", config.get("host", "mysql_default"))
        conn = connect_db(config)
        if not conn:
            continue
        db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
        overall_data["db_structures"][conn_id] = db_structure
        overall_data["server_infos"][conn_id] = server_info
        overall_data["access_logs"][conn_id] = access_logs
        overall_data["log_configs"][conn_id] = log_configs
        overall_data["file_privileges"][conn_id] = file_privileges
        overall_data["udf_infos"][conn_id] = udf_info
        conn.close()

    analysis_result = analyze_with_openai(overall_data)
    export_to_excel_multiple(overall_data, analysis_result)
    logging.info("整体流程完成!")


if __name__ == "__main__":
    main()
import os
import pymysql
import openai
import json
import pandas as pd
import time
import logging
from dotenv import load_dotenv

# 加载 .env 文件
load_dotenv()

# ============ 配置区 ============
DB_CONFIG = {
    'host': os.getenv("DB_HOST", "localhost"),
    'user': os.getenv("DB_USER", "root"),
    'password': os.getenv("DB_PASSWORD", "yourpassword"),
    'port': int(os.getenv("DB_PORT", 3306)),
    'charset': os.getenv("DB_CHARSET", "utf8mb4")
}
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "sensitive_data_analysis.xlsx")
MAX_RETRY = int(os.getenv("MAX_RETRY", 3))  # OpenAI API 重试次数
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1))  # 每个请求延迟 1 秒

# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

# ====== 数据库连接 ======
def connect_db():
    """建立数据库连接"""
    try:
        conn = pymysql.connect(**DB_CONFIG)
        logging.info("数据库连接成功")
        return conn
    except Exception as e:
        logging.error(f"数据库连接失败: {e}")
        return None


# ====== 数据采集 ======
def collect_db_info(conn):
    """
    枚举所有数据库、数据表、字段、样本记录、服务器信息、访问记录、日志配置、文件权限、UDF 信息
    """
    logging.info("正在采集数据库信息...")
    db_structure = {}
    server_info = {}
    access_logs = []
    log_configs = {}
    file_privileges = []
    udf_info = []

    with conn.cursor() as cursor:
        try:
            # 获取服务器信息
            cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
            version, hostname, port, timezone, datadir = cursor.fetchone()
            server_info = {
                '版本': version,
                '主机名': hostname,
                '端口': port,
                '时区': timezone,
                '数据目录': datadir
            }
        except Exception as e:
            logging.error(f"采集服务器信息失败: {e}")

        try:
            # 获取访问记录(需要权限)
            cursor.execute("SHOW PROCESSLIST;")
            access_logs = cursor.fetchall()
        except Exception as e:
            logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))

        try:
            # 日志配置
            cursor.execute("SHOW VARIABLES LIKE '%log%'")
            log_configs = {row[0]: row[1] for row in cursor.fetchall()}
        except Exception as e:
            logging.warning("无法查看日志配置: " + str(e))

        try:
            # 文件权限(此查询仅为示例,实际环境中可能需要调整)
            cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
            file_privileges = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看文件权限: " + str(e))

        try:
            # UDF 提权信息
            cursor.execute("SELECT * FROM mysql.func")
            udf_info = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看 UDF 信息: " + str(e))

        try:
            # 获取数据库结构和样本数据
            cursor.execute("SHOW DATABASES")
            databases = [db[0] for db in cursor.fetchall()]
        except Exception as e:
            logging.error("获取数据库列表失败: " + str(e))
            databases = []

        for db in databases:
            if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
                continue  # 跳过系统库

            try:
                cursor.execute(f"USE {db}")
                cursor.execute("SHOW TABLES")
                tables = [table[0] for table in cursor.fetchall()]
            except Exception as e:
                logging.warning(f"跳过数据库 {db},原因:{e}")
                continue

            db_structure[db] = {}

            for table in tables:
                try:
                    # 获取字段信息
                    cursor.execute(f"DESCRIBE {table}")
                    columns = [col[0] for col in cursor.fetchall()]
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
                    continue
try:
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM {table} LIMIT 5")
samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
samples = []

db_structure[db][table] = {
"columns": columns,
"samples": samples
}

return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info


# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析,并增加重试机制,且每个请求间隔 1 秒"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(REQUEST_DELAY)
return ""


def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构、服务器信息及其作为 Web 服务器的功能和内容"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"请基于下面提供的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
"识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
"视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
"请结合字段名和样本数据双重判断敏感信息。"
"\n另外,请分析这些数据是否用于支撑某个 Web 服务器的功能或内容,并说明该 Web 服务器的可能用途(如电商、内容发布、监控等)。"
"\n请用中文输出分析结果,格式如下:\n"
"{\n 'sensitive_fields': {数据库: {表: [敏感字段, ...], ...}, ...},\n"
" 'server_analysis': {服务器相关风险描述及 Web 服务器功能说明},\n"
" 'access_analysis': {访问记录相关风险描述}\n}\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)

response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}


# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
"""
导出总体报告工作表,内容详略得当,并增加每个字段的中文解释
"""
explanations = {
"sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,表明数据中存在可能泄露个人隐私的信息。",
"server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,以及该服务器支持的 Web 服务器的功能或内容。",
"access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
}

report_rows = []
for key, explanation in explanations.items():
value = analysis_result.get(key, {})
report_rows.append({
"分析项": key,
"结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
"中文解释": explanation
})

report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
report_df.to_excel(writer, sheet_name="总体报告", index=False)


def export_field_explanations(writer, sheet_name, df, field_explanations):
"""
在导出的每个工作表中,增加表头下方的字段中文解释行
"""
explanation_row = [field_explanations.get(col, "") for col in df.columns]
explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
combined_df = pd.concat([explanation_df, df], ignore_index=True)
combined_df.to_excel(writer, sheet_name=sheet_name, index=False)


def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告及每个表字段的中文解释"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
        # 导出总体报告
        export_overall_report(writer, analysis_result)

        # 通用字段解释
        common_field_explanations = {
            "版本": "数据库版本号",
            "主机名": "数据库所在主机名称",
            "端口": "数据库服务端口",
            "时区": "服务器时区设置",
            "数据目录": "数据库数据存放目录"
        }

        # 导出服务器信息
        sheet_name = '服务器信息'
        server_df = pd.DataFrame([server_info])
        export_field_explanations(writer, sheet_name, server_df, common_field_explanations)

        # 导出日志配置(转换为两列格式)
        sheet_name = '日志配置'
        log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
        log_explanations = {
            "配置项": "日志相关的配置项",
            "值": "对应配置项的值"
        }
        export_field_explanations(writer, sheet_name, log_df, log_explanations)

        # 导出访问记录
        if access_logs:
            sheet_name = '访问记录'
            access_df = pd.DataFrame(access_logs)
            access_field_explanations = {col: "访问记录字段" for col in access_df.columns}
            export_field_explanations(writer, sheet_name, access_df, access_field_explanations)

        # 导出文件权限
        if file_privileges:
            sheet_name = '文件权限'
            file_df = pd.DataFrame(file_privileges)
            file_field_explanations = {col: "文件权限相关字段" for col in file_df.columns}
            export_field_explanations(writer, sheet_name, file_df, file_field_explanations)

        # 导出 UDF 信息
        if udf_info:
            sheet_name = 'UDF信息'
            udf_df = pd.DataFrame(udf_info)
            udf_field_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
            export_field_explanations(writer, sheet_name, udf_df, udf_field_explanations)

        # 导出各数据表及分析结果
        for db, tables in db_structure.items():
            for table, content in tables.items():
                data = content['samples']
                columns = content['columns']
                df = pd.DataFrame(data, columns=columns)
                sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
                sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
                mark_row = pd.DataFrame([sensitive_marks], columns=columns)
                df = pd.concat([mark_row, df], ignore_index=True)
                field_explanations = {col: f"{col} 的中文解释" for col in columns}
                sheet_name = f"{db}_{table}"[:31]
                export_field_explanations(writer, sheet_name, df, field_explanations)
    logging.info(f"数据导出完成:{OUTPUT_FILE}")


# ====== 主流程 ======
def main():
    conn = connect_db()
    if not conn:
        return

    db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
    analysis_data = {
        "db_structure": db_structure,
        "server_info": server_info,
        "access_logs": access_logs,
        "log_configs": log_configs,
        "file_privileges": file_privileges,
        "udf_info": udf_info
    }
    analysis_result = analyze_with_openai(analysis_data)
    export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)

    conn.close()
    logging.info("整体流程完成!")


if __name__ == "__main__":
    main()
Usage guide:

1. Configure multi-database connections
   - Create a multi_db_config.json file
   - Encrypt sensitive credentials with Fernet:

```python
from cryptography.fernet import Fernet
key = Fernet.generate_key()
cipher = Fernet(key)
encrypted_pwd = cipher.encrypt(b"real_password").decode()
```

2. Run the audit scan

```bash
export DECRYPTION_KEY="your_key"
python audit_tool.py
```

3. Review the output report
   - A report_<timestamp>.md file is generated automatically
   - It contains the field-encryption analysis and GPT-4 generated penetration-testing suggestions

Through its modular design, this solution supports enterprise-grade database auditing, combining a rule engine with AI analysis for in-depth security checks.
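Both the guide and the excerpt below refer to a `DBConnectionManager("multi_db_config.json")` whose definition is not included. A minimal sketch of what its constructor might look like, assuming entries flagged `"encrypted": true` carry a Fernet token decrypted with the `DECRYPTION_KEY` environment variable (the class name and config layout come from this document; the implementation is illustrative):

```python
import json
import os

from cryptography.fernet import Fernet

class DBConnectionManager:
    """Hypothetical loader for multi_db_config.json with Fernet-encrypted passwords."""

    def __init__(self, config_path):
        with open(config_path, "r", encoding="utf-8") as f:
            self.configs = json.load(f)
        cipher = Fernet(os.environ["DECRYPTION_KEY"].encode())
        for cfg in self.configs:
            # Entries flagged "encrypted" store a Fernet token instead of plaintext.
            if cfg.pop("encrypted", False):
                cfg["password"] = cipher.decrypt(cfg["password"].encode()).decode()
```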
# 节选:以下 connect_all 为 DBConnectionManager 类的方法;
# 本段依赖 re / logging / pymysql / openai 等导入以及 CryptoDetector 等辅助类。
    def connect_all(self):
        """建立所有数据库连接"""
        connections = []
        for cfg in self.configs:
            try:
                # "name" 等非连接参数需在传给 pymysql 前剔除
                conn = pymysql.connect(**{k: v for k, v in cfg.items() if k != "name"})
                connections.append(conn)
                logging.info(f"成功连接数据库:{cfg['host']}:{cfg['port']}")
            except Exception as e:
                logging.error(f"连接失败 {cfg['host']}:{str(e)}")
        return connections


# ====== 增强型数据采集 ======
def enhanced_collect(conn) -> Dict:
    """增强型数据采集(包含密码字段检测)"""
    data = {}
    with conn.cursor() as cursor:
        # 获取所有数据库
        cursor.execute("SHOW DATABASES")
        databases = [db[0] for db in cursor.fetchall()]

        for db in databases:
            if db in ('mysql', 'sys'):
                continue

            data[db] = {}
            try:
                cursor.execute(f"USE {db}")
                cursor.execute("SHOW TABLES")
                tables = [tbl[0] for tbl in cursor.fetchall()]

                for tbl in tables:
                    # 获取表结构
                    cursor.execute(f"DESCRIBE {tbl}")
                    columns = [col[0] for col in cursor.fetchall()]

                    # 检测敏感字段
                    sensitive_cols = []
                    for col in columns:
                        if re.search(r'passw(or)?d|pwd|hash|salt', col, re.I):
                            sensitive_cols.append(col)

                    # 获取样本并分析加密
                    crypto_analysis = {}
                    if sensitive_cols:
                        cursor.execute(f"SELECT {','.join(sensitive_cols)} FROM {tbl} LIMIT 5")
                        samples = cursor.fetchall()

                        for idx, col in enumerate(sensitive_cols):
                            sample = samples[0][idx] if samples else ''
                            crypto_types = CryptoDetector.detect_crypto_type(sample)
                            crypto_analysis[col] = {
                                'types': crypto_types,
                                'advice': CryptoDetector.get_decrypt_advice(crypto_types)
                            }

                    data[db][tbl] = {
                        'columns': columns,
                        'sensitive': crypto_analysis
                    }

            except Exception as e:
                logging.error(f"采集失败 {db}:{str(e)}")

    return data


# ====== 改进后的主流程 ======
def main():
    # 初始化多数据库连接
    db_mgr = DBConnectionManager("multi_db_config.json")
    connections = db_mgr.connect_all()

    all_reports = []
    for conn in connections:
        try:
            # 执行增强采集
            report = enhanced_collect(conn)
            all_reports.append(report)

            # 生成渗透建议报告
            analysis_prompt = generate_analysis_prompt(report)
            ai_advice = openai.ChatCompletion.create(
                model="gpt-4",
                messages=[{"role": "user", "content": analysis_prompt}]
            )

            # 保存结果到 Markdown
            save_markdown_report(report, ai_advice)

        finally:
            conn.close()

    # 合并所有报告
    merge_reports(all_reports)
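`CryptoDetector` is likewise not shown in the excerpt. A minimal sketch of the interface called above, assuming simple pattern heuristics for MD5 and bcrypt (the confidence value and advice strings are taken from the examples in this document; everything else is illustrative):

```python
import re

class CryptoDetector:
    """Hypothetical detector matching the calls in enhanced_collect."""

    @staticmethod
    def detect_crypto_type(sample):
        """Return {type: confidence} guesses for one sample value."""
        s = str(sample or "")
        guesses = {}
        if re.fullmatch(r"[0-9a-fA-F]{32}", s):
            guesses["md5"] = 0.9
        if s.startswith(("$2a$", "$2b$", "$2y$")):
            guesses["bcrypt"] = 0.9
        return guesses

    @staticmethod
    def get_decrypt_advice(crypto_types):
        advice_map = {
            "md5": "MD5哈希:使用彩虹表碰撞(如hashcat -m 0)或在线解密网站",
            "bcrypt": "需GPU集群暴力破解(约$1/10亿次)",
        }
        return ";".join(advice_map[t] for t in crypto_types if t in advice_map)
```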

Key improvements:

1. Multi-database support

```json
// multi_db_config.json
[
  {
    "host": "db1.example.com",
    "user": "audit_user",
    "password": "gAAAAABk...(encrypted credential)",
    "encrypted": true,
    "port": 3306
  },
  {
    "host": "db2.example.com",
    "user": "readonly_user",
    "password": "plaintext_pass",
    "port": 3307
  }
]
```

2. Crypto-type detection logic

```python
# 输入样本检测示例
sample = "5f4dcc3b5aa765d61d8327deb882cf99"  # MD5
CryptoDetector.detect_crypto_type(sample)
# 输出: {'md5': 0.9}

# 解密建议生成
advice = CryptoDetector.get_decrypt_advice({'md5': 0.9})
print(advice)
# 输出: "MD5哈希:使用彩虹表碰撞(如hashcat -m 0)或在线解密网站"
```

3. Sample penetration-advice report

```markdown
# 安全审计报告 - 192.168.1.100

## 敏感字段分析

| 数据库 | 表 | 字段 | 加密类型 | 渗透建议 |
| --- | --- | --- | --- | --- |
| user_db | accounts | password | bcrypt | 需GPU集群暴力破解(约$1/10亿次) |
| order_db | transactions | card_token | AES | 查找配置文件中的AES密钥 |

## AI建议

尝试以下渗透路径:
1. 从web应用的/config目录查找aes_key.txt
2. 使用JohnTheRipper进行bcrypt爆破
3. 检查数据库备份文件中的测试账号
```