# 调用大模型
response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
print("✅ 分析完成!")
return analysis_result
except json.JSONDecodeError:
print("❌ OpenAI 响应解析失败,原始响应:", response)
return {}
# ====== 导出 Excel ======
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel"""
print("📤 正在导出数据到 Excel...")
writer = pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl')
# 导出服务器信息、日志配置、访问记录、文件权限、UDF 信息
pd.DataFrame([server_info]).to_excel(writer, sheet_name='服务器信息', index=False)
pd.DataFrame.from_dict(log_configs, orient='index').to_excel(writer, sheet_name='日志配置')
pd.DataFrame(access_logs).to_excel(writer, sheet_name='访问记录', index=False)
pd.DataFrame(file_privileges).to_excel(writer, sheet_name='文件权限', index=False)
pd.DataFrame(udf_info).to_excel(writer, sheet_name='UDF信息', index=False)
# 导出数据表和分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
# 标注敏感字段
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
# 写入 Excel
sheet_name = f"{db}_{table}"[:31]
df.to_excel(writer, sheet_name=sheet_name, index=False)
writer.close()
print(f"✅ 数据导出完成:{OUTPUT_FILE}")
# ====== 主流程 ======
def main():
# 连接数据库
conn = connect_db()
if not conn:
return
# 步骤 1:采集数据库信息
db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
# 步骤 2:利用 OpenAI 分析敏感信息和渗透风险
analysis_result = analyze_with_openai({
"db_structure": db_structure,
"server_info": server_info,
"access_logs": access_logs,
"log_configs": log_configs,
"file_privileges": file_privileges,
"udf_info": udf_info
})
# 步骤 3:导出数据和分析结果到 Excel
export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)
# 关闭连接
conn.close()
print("\n✅ 整体流程完成!")
if name == "main":
main()
如果还有需要优化的地方,随时告诉我!
response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
print("✅ 分析完成!")
return analysis_result
except json.JSONDecodeError:
print("❌ OpenAI 响应解析失败,原始响应:", response)
return {}
# ====== 导出 Excel ======
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel"""
print("📤 正在导出数据到 Excel...")
writer = pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl')
# 导出服务器信息、日志配置、访问记录、文件权限、UDF 信息
pd.DataFrame([server_info]).to_excel(writer, sheet_name='服务器信息', index=False)
pd.DataFrame.from_dict(log_configs, orient='index').to_excel(writer, sheet_name='日志配置')
pd.DataFrame(access_logs).to_excel(writer, sheet_name='访问记录', index=False)
pd.DataFrame(file_privileges).to_excel(writer, sheet_name='文件权限', index=False)
pd.DataFrame(udf_info).to_excel(writer, sheet_name='UDF信息', index=False)
# 导出数据表和分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
# 标注敏感字段
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
# 写入 Excel
sheet_name = f"{db}_{table}"[:31]
df.to_excel(writer, sheet_name=sheet_name, index=False)
writer.close()
print(f"✅ 数据导出完成:{OUTPUT_FILE}")
# ====== 主流程 ======
def main():
# 连接数据库
conn = connect_db()
if not conn:
return
# 步骤 1:采集数据库信息
db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
# 步骤 2:利用 OpenAI 分析敏感信息和渗透风险
analysis_result = analyze_with_openai({
"db_structure": db_structure,
"server_info": server_info,
"access_logs": access_logs,
"log_configs": log_configs,
"file_privileges": file_privileges,
"udf_info": udf_info
})
# 步骤 3:导出数据和分析结果到 Excel
export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)
# 关闭连接
conn.close()
print("\n✅ 整体流程完成!")
if name == "main":
main()
如果还有需要优化的地方,随时告诉我!