"""
对整个 MySQL 服务器进行整体分析(第三阶段),
利用大模型结合服务器信息、各数据库汇总信息以及其他采集数据,
输出总体风险描述和服务器功能说明。
返回格式:{ 'server_overall': '整体描述信息' }
"""
    prompt = (
        "请基于下面提供的 MySQL 服务器信息、各数据库(对应 Web 服务)的汇总分析、访问记录、日志配置、文件权限、UDF 信息,"
        "生成该 MySQL 服务器的整体安全和业务功能分析报告,描述可能存在的风险以及服务器支持哪些类型的 Web 服务(如电商、内容发布、监控等)。\n"
        "请用中文输出分析结果,格式为:\n"
        "{ 'server_overall': '整体描述信息' }\n"
        "服务器信息:\n" + json.dumps(server_info, ensure_ascii=False, indent=2) +
        "\n数据库汇总:\n" + json.dumps(db_summary, ensure_ascii=False, indent=2) +
        "\n访问记录:\n" + json.dumps(access_logs, ensure_ascii=False, indent=2) +
        "\n日志配置:\n" + json.dumps(log_configs, ensure_ascii=False, indent=2) +
        "\n文件权限:\n" + json.dumps(file_privileges, ensure_ascii=False, indent=2) +
        "\nUDF 信息:\n" + json.dumps(udf_info, ensure_ascii=False, indent=2)
    )
    response = call_openai_api(prompt)
    # The reply is expected to be a single JSON object; fall back to an empty dict
    # when it cannot be parsed as strict JSON.
    try:
        result = json.loads(response)
    except json.JSONDecodeError:
        logging.error(f"MySQL server overall analysis failed, response: {response}")
        result = {}
    return result
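
# Illustrative only: the dict returned by the analysis step above is what
# export_to_excel() (defined below) receives as its `server_overall` argument, e.g.
#   server_overall = {"server_overall": "<overall description in Chinese>"}
#   export_to_excel(db_structure, db_summary, table_analysis, server_info,
#                   access_logs, log_configs, file_privileges, udf_info, server_overall)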


# ====== Export to Excel ======
def export_overall_report(writer, db_summary, server_overall):
"""
导出总体报告工作表,包含各数据库汇总和整体服务器分析,并增加中文解释
"""
    rows = []
    for db, info in db_summary.items():
        rows.append({
            "数据库": db,
            "敏感字段汇总": ", ".join(info.get("all_sensitive_fields", [])),
            "总体用途": info.get("database_usage", "")
        })
    overall_df = pd.DataFrame(rows, columns=["数据库", "敏感字段汇总", "总体用途"])
    # Append the whole-server analysis as an extra row
    overall_df.loc[len(overall_df)] = ["MySQL服务器", "", server_overall.get("server_overall", "")]

    explanations = {
        "数据库": "各数据库名称(通常对应不同的 Web 服务)",
        "敏感字段汇总": "各数据库中所有被识别出的敏感字段集合,提示可能存在的隐私风险",
        "总体用途": "对每个数据库支撑的 Web 服务总体用途的描述,及整体服务器的业务支持情况"
    }
    # Insert the explanation row directly beneath the header row
    explanation_row = [explanations.get(col, "") for col in overall_df.columns]
    explanation_df = pd.DataFrame([explanation_row], columns=overall_df.columns)
    combined_df = pd.concat([explanation_df, overall_df], ignore_index=True)
    combined_df.to_excel(writer, sheet_name="总体报告", index=False)
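
# Illustrative only (names and values are assumed, not from the original code):
# the input shapes export_overall_report() relies on, based on the keys it reads.
#   db_summary = {
#       "some_db": {"all_sensitive_fields": ["phone", "email"], "database_usage": "..."},
#   }
#   server_overall = {"server_overall": "..."}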


def export_field_explanations(writer, sheet_name, df, field_explanations):
"""在导出的工作表中,增加字段中文解释行"""
explanation_row = [field_explanations.get(col, "") for col in df.columns]
explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
combined_df = pd.concat([explanation_df, df], ignore_index=True)
combined_df.to_excel(writer, sheet_name=sheet_name, index=False)
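
# Resulting layout (illustrative, using the log-configuration sheet defined below;
# the data row is a made-up example):
#   配置项         | 值
#   日志相关配置项  | 对应配置项的值        <- explanation row added by this helper
#   general_log   | OFF                 <- first actual data row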


def export_to_excel(db_structure, db_summary, table_analysis, server_info, access_logs, log_configs, file_privileges, udf_info, server_overall):
"""
导出 Excel 文件,包含总体报告、服务器信息、日志配置、访问记录、文件权限、UDF 信息,以及各数据表分析结果
"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 导出总体报告
export_overall_report(writer, db_summary, server_overall)

        # Server information sheet
        server_df = pd.DataFrame([server_info])
        common_explanations = {
            "版本": "数据库版本号",
            "主机名": "数据库所在主机名称",
            "端口": "数据库服务端口",
            "时区": "服务器时区设置",
            "数据目录": "数据库数据存放目录"
        }
        export_field_explanations(writer, "服务器信息", server_df, common_explanations)

        # Log configuration sheet
        log_df = pd.DataFrame(list(log_configs.items()), columns=["配置项", "值"])
        log_explanations = {
            "配置项": "日志相关配置项",
            "值": "对应配置项的值"
        }
        export_field_explanations(writer, "日志配置", log_df, log_explanations)

        # Access records sheet (only when any records were collected)
        if access_logs:
            access_df = pd.DataFrame(access_logs)
            access_explanations = {col: "访问记录字段" for col in access_df.columns}
            export_field_explanations(writer, "访问记录", access_df, access_explanations)
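
        # The listing is cut off here in the source; the docstring above also names
        # file-privilege and UDF exports. A minimal sketch following the same pattern
        # as the access-record sheet (sheet names, variable names, and explanation
        # texts below are assumptions, not from the original code):
        if file_privileges:
            priv_df = pd.DataFrame(file_privileges)
            priv_explanations = {col: "文件权限字段" for col in priv_df.columns}
            export_field_explanations(writer, "文件权限", priv_df, priv_explanations)
        if udf_info:
            udf_df = pd.DataFrame(udf_info)
            udf_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
            export_field_explanations(writer, "UDF信息", udf_df, udf_explanations)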
 
 