        # Look up the columns the AI analysis flagged as sensitive for this table.
        # Assumed shape of analysis_result (inferred from this .get() chain):
        #   {"sensitive_fields": {"<db>": {"<table>": ["col", ...]}}}
        sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
        # Prepend a marker row: 'sensitive field' under each flagged column
        sensitive_marks = ['sensitive field' if col in sensitive_cols else '' for col in columns]
        mark_row = pd.DataFrame([sensitive_marks], columns=columns)
        df = pd.concat([mark_row, df], ignore_index=True)
        # Excel caps sheet names at 31 characters, so truncate; note that
        # truncation alone does not prevent duplicate names (see the
        # unique_sheet_name sketch after this function)
        sheet_name = f"{db}_{table}"[:31]
        df.to_excel(writer, sheet_name=sheet_name, index=False)
    logging.info(f"Data export complete: {OUTPUT_FILE}")
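# A minimal sketch (not in the original script) of how truncated sheet names
# could be de-duplicated: two different "{db}_{table}" strings can collide
# once cut to 31 characters, and Excel forbids duplicate sheet names. The
# helper name and the used_names set are assumptions for illustration.
def unique_sheet_name(db, table, used_names):
    """Return a sheet name <= 31 chars that is unique within used_names."""
    base = f"{db}_{table}"[:31]
    name = base
    suffix = 1
    while name in used_names:
        tail = f"_{suffix}"
        # Shorten the base so base + tail still fits in 31 characters
        name = base[:31 - len(tail)] + tail
        suffix += 1
    used_names.add(name)
    return name

# Usage inside the export loop (replacing the bare truncation above):
#   sheet_name = unique_sheet_name(db, table, used_names)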
# ====== Main workflow ======
def main():
    conn = connect_db()
    if not conn:
        return
    try:
        db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
        analysis_data = {
            "db_structure": db_structure,
            "server_info": server_info,
            "access_logs": access_logs,
            "log_configs": log_configs,
            "file_privileges": file_privileges,
            "udf_info": udf_info
        }
        analysis_result = analyze_with_openai(analysis_data)
        export_to_excel(db_structure, server_info, access_logs, log_configs,
                        file_privileges, udf_info, analysis_result)
    finally:
        # Close the connection even if analysis or export fails
        conn.close()
    logging.info("Overall workflow complete!")

if __name__ == "__main__":
    main()