# Export the overall report
export_overall_report(writer, analysis_result)
# Common field explanations
common_field_explanations = {
"版本": "数据库版本号",
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}
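# The keys above are assumed to match the columns of the server-info sheet exported
# below, so that each exported column gets an explanation (inferred from how
# export_field_explanations is called; not guaranteed by this snippet alone).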
# Export server information
sheet_name = '服务器信息'
server_df = pd.DataFrame([server_info])
export_field_explanations(writer, sheet_name, server_df, common_field_explanations)
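# export_field_explanations is presumably defined earlier in this script; it is
# assumed to write the DataFrame to the named sheet through the shared ExcelWriter
# and to record the field-explanation mapping alongside it.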
# Export log configuration (converted to a two-column layout)
sheet_name = '日志配置'
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_explanations = {
"配置项": "日志相关的配置项",
"值": "对应配置项的值"
}
export_field_explanations(writer, sheet_name, log_df, log_explanations)
# Export access records
if access_logs:
    sheet_name = '访问记录'
    access_df = pd.DataFrame(access_logs)
    access_field_explanations = {col: "访问记录字段" for col in access_df.columns}
    export_field_explanations(writer, sheet_name, access_df, access_field_explanations)
# Export file privileges
if file_privileges:
    sheet_name = '文件权限'
    file_df = pd.DataFrame(file_privileges)
    file_field_explanations = {col: "文件权限相关字段" for col in file_df.columns}
    export_field_explanations(writer, sheet_name, file_df, file_field_explanations)
# Export UDF information
if udf_info:
    sheet_name = 'UDF信息'
    udf_df = pd.DataFrame(udf_info)
    udf_field_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
    export_field_explanations(writer, sheet_name, udf_df, udf_field_explanations)
# Export each table's sample data along with the analysis results
for db, tables in db_structure.items():
    for table, content in tables.items():
        data = content['samples']
        columns = content['columns']
        df = pd.DataFrame(data, columns=columns)
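        # analysis_result['sensitive_fields'] is assumed to be a nested mapping of
        # database -> table -> list of sensitive column names, e.g.
        # {'mydb': {'users': ['phone', 'id_card']}}; the shape is inferred from the
        # .get() chain below and the example keys are illustrative only.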
        sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
        # Prepend a marker row that flags the sensitive columns above the sample rows
        sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
        mark_row = pd.DataFrame([sensitive_marks], columns=columns)
        df = pd.concat([mark_row, df], ignore_index=True)
        field_explanations = {col: f"{col} 的中文解释" for col in columns}
        # Excel limits sheet names to 31 characters, hence the slice
        sheet_name = f"{db}_{table}"[:31]
        export_field_explanations(writer, sheet_name, df, field_explanations)
logging.info(f"数据导出完成:{OUTPUT_FILE}")
# ====== Main flow ======
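# connect_db, collect_db_info, analyze_with_openai and export_to_excel are presumably
# defined earlier in this script: connect_db opens the database connection,
# collect_db_info gathers the structures exported above, and analyze_with_openai
# returns the analysis_result dict (including 'sensitive_fields') consumed during export.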
def main():
    conn = connect_db()
    if not conn:
        return
    db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
    analysis_data = {
        "db_structure": db_structure,
        "server_info": server_info,
        "access_logs": access_logs,
        "log_configs": log_configs,
        "file_privileges": file_privileges,
        "udf_info": udf_info
    }
    analysis_result = analyze_with_openai(analysis_data)
    export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)
    conn.close()
    logging.info("整体流程完成!")

if __name__ == "__main__":
    main()