def call_openai_api(prompt):
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全与业务分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败 (尝试 {attempt+1}/{MAX_RETRY}):{e}")
time.sleep(REQUEST_DELAY)
return ""

def analyze_table(db, table, table_data, server_info):
prompt = (
f"请基于下面提供的 MySQL 数据库【{db}】中表【{table}】的结构和样本数据,"
"识别可能的敏感信息(如身份证号、手机号、邮箱、密码等)和潜在安全风险,"
"同时判断该表是否支撑某个 Web 服务,并说明该服务可能的用途(例如电商、内容发布、监控等)。\n"
"请用中文输出分析结果,格式为:\n"
"{ 'sensitive_fields': [敏感字段, ...], 'table_usage': '可能用途说明' }\n"
"数据如下:\n" + json.dumps(table_data, ensure_ascii=False, indent=2) +
"\n服务器信息如下:\n" + json.dumps(server_info, ensure_ascii=False, indent=2)
)
response = call_openai_api(prompt)
try:
result = json.loads(response)
except json.JSONDecodeError:
logging.error(f"分析 {db}.{table} 失败,响应:{response}")
result = {}
return result

def analyze_tables(db_structure, server_info):
table_analysis = {}
for db, tables in db_structure.items():
table_analysis[db] = {}
for table, data in tables.items():
logging.info(f"正在分析表 {db}.{table} ...")
result = analyze_table(db, table, data, server_info)
table_analysis[db][table] = result
return table_analysis

def summarize_database(db, table_analysis):
prompt = (
f"请基于下面提供的数据库【{db}】中各数据表的分析结果,"
"汇总出该数据库中所有被识别出的敏感字段,并判断该数据库支撑的 Web 服务可能的用途。\n"
"请用中文输出分析结果,格式为:\n"
"{ 'all_sensitive_fields': [敏感字段, ...], 'database_usage': '总体用途说明' }\n"
"数据如下:\n" + json.dumps(table_analysis, ensure_ascii=False, indent=2)
)
response = call_openai_api(prompt)
try:
result = json.loads(response)
except json.JSONDecodeError:
logging.error(f"数据库 {db} 汇总分析失败,响应:{response}")
result = {}
return result

def summarize_all_databases(db_structure, table_analysis):
summary = {}
for db in db_structure.keys():
logging.info(f"正在汇总分析数据库 {db} ...")
summary[db] = summarize_database(db, table_analysis.get(db, {}))
summary[db]["tables"] = table_analysis.get(db, {})
return summary

def analyze_server_overall(server_info, db_summary, access_logs, log_configs, file_privileges, udf_info):
prompt = (
"请基于下面提供的 MySQL 服务器信息、各数据库汇总分析、访问记录、日志配置、文件权限、UDF 信息,"
"生成该 MySQL 服务器的整体安全和业务功能分析报告,描述可能存在的风险及支持的 Web 服务类型。\n"
"请用中文输出分析结果,格式为:\n"
"{ 'server_overall': '整体描述信息' }\n"
"服务器信息:\n" + json.dumps(server_info, ensure_ascii=False, indent=2) +
"\n数据库汇总:\n" + json.dumps(db_summary, ensure_ascii=False, indent=2) +
"\n访问记录:\n" + json.dumps(access_logs, ensure_ascii=False, indent=2) +
"\n日志配置:\n" + json.dumps(log_configs, ensure_ascii=False, indent=2) +
"\n文件权限:\n" + json.dumps(file_privileges, ensure_ascii=False, indent=2) +
"\nUDF 信息:\n" + json.dumps(udf_info, ensure_ascii=False, indent=2)
)
response = call_openai_api(prompt)
try:
result = json.loads(response)
except json.JSONDecodeError:
logging.error(f"MySQL 服务器整体分析失败,响应:{response}")
result = {}
return result

def export_overall_report(writer, db_summary, server_overall):
rows = []
for db, info in db_summary.items():
rows.append({
 
 
Back to Top