# 导出总体报告
export_overall_report(writer, analysis_result)

# 通用字段解释
common_field_explanations = {
"版本": "数据库版本号",
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}

# 导出服务器信息
sheet_name = '服务器信息'
server_df = pd.DataFrame([server_info])
export_field_explanations(writer, sheet_name, server_df, common_field_explanations)

# 导出日志配置(转换为两列格式)
sheet_name = '日志配置'
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_explanations = {
"配置项": "日志相关的配置项",
"值": "对应配置项的值"
}
export_field_explanations(writer, sheet_name, log_df, log_explanations)

# 导出访问记录
if access_logs:
sheet_name = '访问记录'
access_df = pd.DataFrame(access_logs)
access_field_explanations = {col: "访问记录字段" for col in access_df.columns}
export_field_explanations(writer, sheet_name, access_df, access_field_explanations)

# 导出文件权限
if file_privileges:
sheet_name = '文件权限'
file_df = pd.DataFrame(file_privileges)
file_field_explanations = {col: "文件权限相关字段" for col in file_df.columns}
export_field_explanations(writer, sheet_name, file_df, file_field_explanations)

# 导出 UDF 信息
if udf_info:
sheet_name = 'UDF信息'
udf_df = pd.DataFrame(udf_info)
udf_field_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
export_field_explanations(writer, sheet_name, udf_df, udf_field_explanations)

# 导出各数据表及分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
field_explanations = {col: f"{col} 的中文解释" for col in columns}
sheet_name = f"{db}_{table}"[:31]
export_field_explanations(writer, sheet_name, df, field_explanations)
logging.info(f"数据导出完成:{OUTPUT_FILE}")


# ====== 主流程 ======
def main():
    """Entry point: collect DB info, run the OpenAI analysis, export to Excel."""
    conn = connect_db()
    if not conn:
        # connect_db already logged the failure; nothing more to do.
        return

    db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
    # Bundle every collected section into one payload for the analysis step.
    analysis_data = {
        "db_structure": db_structure,
        "server_info": server_info,
        "access_logs": access_logs,
        "log_configs": log_configs,
        "file_privileges": file_privileges,
        "udf_info": udf_info
    }
    analysis_result = analyze_with_openai(analysis_data)
    export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)

    conn.close()
    logging.info("整体流程完成!")


# Bug fix: `name`/"main" raised NameError; the standard guard uses __name__.
if __name__ == "__main__":
    main()
try:
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM {table} LIMIT 5")
samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
samples = []

db_structure[db][table] = {
"columns": columns,
"samples": samples
}

return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info


# ====== OpenAI 分析 ======
def call_openai_api(prompt):
    """Call the OpenAI chat API with retries, pausing REQUEST_DELAY seconds per request.

    Returns the model's reply text, or an empty string after MAX_RETRY failed attempts.
    """
    openai.api_key = OPENAI_API_KEY
    for attempt in range(MAX_RETRY):
        try:
            response = openai.ChatCompletion.create(
                model=MODEL_NAME,
                messages=[
                    {"role": "system", "content": "你是一个数据库安全分析专家。"},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=2000
            )
            # Throttle even on success so back-to-back calls stay spaced out.
            time.sleep(REQUEST_DELAY)
            return response['choices'][0]['message']['content'].strip()
        except Exception as e:
            logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
            time.sleep(REQUEST_DELAY)
    # All retries exhausted.
    return ""


def analyze_with_openai(data):
    """Ask OpenAI to analyze the collected DB/server data and return the parsed dict.

    Returns {} when the API call fails or the response is not valid JSON.
    """
    logging.info("正在通过 OpenAI 分析...")
    # Bug fix: the response is parsed with json.loads below, so the prompt must
    # request strict JSON (double quotes), not a Python-dict-style literal with
    # single quotes — the old format guaranteed a JSONDecodeError.
    prompt = (
        "请基于下面提供的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
        "识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
        "视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
        "请结合字段名和样本数据双重判断敏感信息。"
        "\n另外,请分析这些数据是否用于支撑某个 Web 服务器的功能或内容,并说明该 Web 服务器的可能用途(如电商、内容发布、监控等)。"
        "\n请只输出严格的 JSON(键和字符串使用双引号,不要附加其他文字),格式如下:\n"
        '{\n  "sensitive_fields": {"数据库": {"表": ["敏感字段"]}},\n'
        '  "server_analysis": "服务器相关风险描述及 Web 服务器功能说明",\n'
        '  "access_analysis": "访问记录相关风险描述"\n}\n\n'
        f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
    )

    response = call_openai_api(prompt)
    try:
        analysis_result = json.loads(response)
        logging.info("OpenAI 分析完成!")
        return analysis_result
    except json.JSONDecodeError:
        logging.error("OpenAI 响应解析失败,原始响应:" + response)
        return {}


# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
    """Write the '总体报告' (overall report) sheet.

    One row per top-level analysis key, holding a JSON summary of that key's
    value plus a fixed Chinese explanation of what the key means.

    :param writer: an open pandas ExcelWriter
    :param analysis_result: dict returned by analyze_with_openai (may be empty)
    """
    explanations = {
        "sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,表明数据中存在可能泄露个人隐私的信息。",
        "server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,以及该服务器支持的 Web 服务器的功能或内容。",
        "access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
    }

    report_rows = []
    for key, explanation in explanations.items():
        # Missing keys degrade gracefully to an empty summary.
        value = analysis_result.get(key, {})
        report_rows.append({
            "分析项": key,
            "结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
            "中文解释": explanation
        })

    report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
    report_df.to_excel(writer, sheet_name="总体报告", index=False)


def export_field_explanations(writer, sheet_name, df, field_explanations):
    """Write *df* to *sheet_name* with a row of Chinese field explanations
    inserted directly under the header.

    :param field_explanations: mapping of column name -> explanation text;
        columns without an entry get an empty cell.
    """
    explanation_row = [field_explanations.get(col, "") for col in df.columns]
    explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
    # Explanation row first, then the data rows.
    combined_df = pd.concat([explanation_df, df], ignore_index=True)
    combined_df.to_excel(writer, sheet_name=sheet_name, index=False)


def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告及每个表字段的中文解释"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
import os
import pymysql
import openai
import json
import pandas as pd
import time
import logging
from dotenv import load_dotenv

# 加载 .env 文件
load_dotenv()

# ============ 配置区 ============
DB_CONFIG = {
'host': os.getenv("DB_HOST", "localhost"),
'user': os.getenv("DB_USER", "root"),
'password': os.getenv("DB_PASSWORD", "yourpassword"),
'port': int(os.getenv("DB_PORT", 3306)),
'charset': os.getenv("DB_CHARSET", "utf8mb4")
}
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "sensitive_data_analysis.xlsx")
MAX_RETRY = int(os.getenv("MAX_RETRY", 3)) # OpenAI API 重试次数
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1)) # 每个请求延迟 1 秒

# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")


# ====== 数据库连接 ======
def connect_db():
    """Open a MySQL connection using DB_CONFIG; return None on failure."""
    try:
        conn = pymysql.connect(**DB_CONFIG)
        logging.info("数据库连接成功")
        return conn
    except Exception as e:
        logging.error(f"数据库连接失败: {e}")
        return None


# ====== 数据采集 ======
def collect_db_info(conn):
"""
枚举所有数据库、数据表、字段、样本记录、服务器信息、访问记录、日志配置、文件权限、UDF 信息
"""
logging.info("正在采集数据库信息...")
db_structure = {}
server_info = {}
access_logs = []
log_configs = {}
file_privileges = []
udf_info = []

with conn.cursor() as cursor:
try:
# 获取服务器信息
cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
version, hostname, port, timezone, datadir = cursor.fetchone()
server_info = {
'版本': version,
'主机名': hostname,
'端口': port,
'时区': timezone,
'数据目录': datadir
}
except Exception as e:
logging.error(f"采集服务器信息失败: {e}")

try:
# 获取访问记录(需要权限)
cursor.execute("SHOW PROCESSLIST;")
access_logs = cursor.fetchall()
except Exception as e:
logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))

try:
# 日志配置
cursor.execute("SHOW VARIABLES LIKE '%log%'")
log_configs = {row[0]: row[1] for row in cursor.fetchall()}
except Exception as e:
logging.warning("无法查看日志配置: " + str(e))

try:
# 文件权限(此查询仅为示例,实际环境中可能需要调整)
cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
file_privileges = cursor.fetchall()
except Exception as e:
logging.warning("无法查看文件权限: " + str(e))

try:
# UDF 提权信息
cursor.execute("SELECT * FROM mysql.func")
udf_info = cursor.fetchall()
except Exception as e:
logging.warning("无法查看 UDF 信息: " + str(e))

try:
# 获取数据库结构和样本数据
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
except Exception as e:
logging.error("获取数据库列表失败: " + str(e))
databases = []

for db in databases:
if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
continue # 跳过系统库

try:
cursor.execute(f"USE {db}")
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
except Exception as e:
logging.warning(f"跳过数据库 {db},原因:{e}")
continue

db_structure[db] = {}

for table in tables:
try:
# 获取字段信息
cursor.execute(f"DESCRIBE {table}")
columns = [col[0] for col in cursor.fetchall()]
except Exception as e:
logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
continue
"版本": "数据库版本号",
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}

# 导出服务器信息
sheet_name = '服务器信息'
server_df = pd.DataFrame([server_info])
export_field_explanations(writer, sheet_name, server_df, common_field_explanations)

# 导出日志配置(转换为两列格式)
sheet_name = '日志配置'
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_explanations = {
"配置项": "日志相关的配置项",
"值": "对应配置项的值"
}
export_field_explanations(writer, sheet_name, log_df, log_explanations)

# 导出访问记录
if access_logs:
sheet_name = '访问记录'
access_df = pd.DataFrame(access_logs)
access_field_explanations = {col: "访问记录字段" for col in access_df.columns}
export_field_explanations(writer, sheet_name, access_df, access_field_explanations)

# 导出文件权限
if file_privileges:
sheet_name = '文件权限'
file_df = pd.DataFrame(file_privileges)
file_field_explanations = {col: "文件权限相关字段" for col in file_df.columns}
export_field_explanations(writer, sheet_name, file_df, file_field_explanations)

# 导出 UDF 信息
if udf_info:
sheet_name = 'UDF信息'
udf_df = pd.DataFrame(udf_info)
udf_field_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
export_field_explanations(writer, sheet_name, udf_df, udf_field_explanations)

# 导出各数据表及分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
field_explanations = {col: f"{col} 的中文解释" for col in columns}
sheet_name = f"{db}_{table}"[:31]
export_field_explanations(writer, sheet_name, df, field_explanations)
logging.info(f"数据导出完成:{OUTPUT_FILE}")


# ====== 主流程 ======
def main():
conn = connect_db()
if not conn:
return

db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
analysis_data = {
"db_structure": db_structure,
"server_info": server_info,
"access_logs": access_logs,
"log_configs": log_configs,
"file_privileges": file_privileges,
"udf_info": udf_info
}
analysis_result = analyze_with_openai(analysis_data)
export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)

conn.close()
logging.info("整体流程完成!")


# Bug fix: `name`/"main" raised NameError; the standard guard uses __name__.
if __name__ == "__main__":
    main()
try:
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM {table} LIMIT 5")
samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
samples = []

db_structure[db][table] = {
"columns": columns,
"samples": samples
}

return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info


# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析,并增加重试机制,且每个请求间隔 1 秒"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(REQUEST_DELAY)
return ""


def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构和渗透风险"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"请基于下面提供的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
"识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
"视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
"请结合字段名和样本数据双重判断敏感信息。请用中文输出分析结果,格式如下:\n"
"{\n 'sensitive_fields': {数据库: {表: [敏感字段, ...], ...}, ...},\n"
" 'server_analysis': {服务器相关风险描述},\n"
" 'access_analysis': {访问记录相关风险描述}\n}\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)

response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}


# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
"""
导出总体报告工作表,内容详略得当,并增加每个字段的中文解释
"""
explanations = {
"sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,表明数据中存在可能泄露个人隐私的信息。",
"server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,表明服务器安全态势。",
"access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
}

report_rows = []
for key, explanation in explanations.items():
value = analysis_result.get(key, {})
report_rows.append({
"分析项": key,
"结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
"中文解释": explanation
})

report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
report_df.to_excel(writer, sheet_name="总体报告", index=False)


def export_field_explanations(writer, sheet_name, df, field_explanations):
"""
在导出的每个工作表中,增加表头下方的字段中文解释行
"""
explanation_row = [field_explanations.get(col, "") for col in df.columns]
explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
combined_df = pd.concat([explanation_df, df], ignore_index=True)
combined_df.to_excel(writer, sheet_name=sheet_name, index=False)


def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告及每个表字段的中文解释"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 导出总体报告
export_overall_report(writer, analysis_result)

# 通用字段解释
common_field_explanations = {
import os
import pymysql
import openai
import json
import pandas as pd
import time
import logging
from dotenv import load_dotenv

# 加载 .env 文件
load_dotenv()

# ============ 配置区 ============
DB_CONFIG = {
'host': os.getenv("DB_HOST", "localhost"),
'user': os.getenv("DB_USER", "root"),
'password': os.getenv("DB_PASSWORD", "yourpassword"),
'port': int(os.getenv("DB_PORT", 3306)),
'charset': os.getenv("DB_CHARSET", "utf8mb4")
}
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "sensitive_data_analysis.xlsx")
MAX_RETRY = int(os.getenv("MAX_RETRY", 3)) # OpenAI API 重试次数
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1)) # 每个请求延迟 1 秒

# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")


# ====== 数据库连接 ======
def connect_db():
"""建立数据库连接"""
try:
conn = pymysql.connect(**DB_CONFIG)
logging.info("数据库连接成功")
return conn
except Exception as e:
logging.error(f"数据库连接失败: {e}")
return None


# ====== 数据采集 ======
def collect_db_info(conn):
"""
枚举所有数据库、数据表、字段、样本记录、服务器信息、访问记录、日志配置、文件权限、UDF 信息
"""
logging.info("正在采集数据库信息...")
db_structure = {}
server_info = {}
access_logs = []
log_configs = {}
file_privileges = []
udf_info = []

with conn.cursor() as cursor:
try:
# 获取服务器信息
cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
version, hostname, port, timezone, datadir = cursor.fetchone()
server_info = {
'版本': version,
'主机名': hostname,
'端口': port,
'时区': timezone,
'数据目录': datadir
}
except Exception as e:
logging.error(f"采集服务器信息失败: {e}")

try:
# 获取访问记录(需要权限)
cursor.execute("SHOW PROCESSLIST;")
access_logs = cursor.fetchall()
except Exception as e:
logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))

try:
# 日志配置
cursor.execute("SHOW VARIABLES LIKE '%log%'")
log_configs = {row[0]: row[1] for row in cursor.fetchall()}
except Exception as e:
logging.warning("无法查看日志配置: " + str(e))

try:
# 文件权限(此查询仅为示例,实际环境中可能需要调整)
cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
file_privileges = cursor.fetchall()
except Exception as e:
logging.warning("无法查看文件权限: " + str(e))

try:
# UDF 提权信息
cursor.execute("SELECT * FROM mysql.func")
udf_info = cursor.fetchall()
except Exception as e:
logging.warning("无法查看 UDF 信息: " + str(e))

try:
# 获取数据库结构和样本数据
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
except Exception as e:
logging.error("获取数据库列表失败: " + str(e))
databases = []

for db in databases:
if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
continue # 跳过系统库

try:
cursor.execute(f"USE {db}")
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
except Exception as e:
logging.warning(f"跳过数据库 {db},原因:{e}")
continue

db_structure[db] = {}

for table in tables:
try:
# 获取字段信息
cursor.execute(f"DESCRIBE {table}")
columns = [col[0] for col in cursor.fetchall()]
except Exception as e:
logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
continue
# 导出服务器信息
sheet_name = '服务器信息'
server_df = pd.DataFrame([server_info])
export_field_explanations(writer, sheet_name, server_df, common_field_explanations)

# 导出日志配置(转换为两列格式)
sheet_name = '日志配置'
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_explanations = {
"配置项": "日志相关的配置项",
"值": "对应配置项的值"
}
export_field_explanations(writer, sheet_name, log_df, log_explanations)

# 导出访问记录
if access_logs:
sheet_name = '访问记录'
access_df = pd.DataFrame(access_logs)
access_field_explanations = {col: "访问记录字段" for col in access_df.columns}
export_field_explanations(writer, sheet_name, access_df, access_field_explanations)

# 导出文件权限
if file_privileges:
sheet_name = '文件权限'
file_df = pd.DataFrame(file_privileges)
file_field_explanations = {col: "文件权限相关字段" for col in file_df.columns}
export_field_explanations(writer, sheet_name, file_df, file_field_explanations)

# 导出 UDF 信息
if udf_info:
sheet_name = 'UDF信息'
udf_df = pd.DataFrame(udf_info)
udf_field_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
export_field_explanations(writer, sheet_name, udf_df, udf_field_explanations)

# 导出各数据表及分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
# 标注敏感字段
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
# 定义每个数据表字段的中文解释(根据实际情况调整)
field_explanations = {col: f"{col} 的中文解释" for col in columns}
sheet_name = f"{db}_{table}"[:31]
export_field_explanations(writer, sheet_name, df, field_explanations)
logging.info(f"数据导出完成:{OUTPUT_FILE}")


# ====== 主流程 ======
def main():
conn = connect_db()
if not conn:
return

db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
analysis_data = {
"db_structure": db_structure,
"server_info": server_info,
"access_logs": access_logs,
"log_configs": log_configs,
"file_privileges": file_privileges,
"udf_info": udf_info
}
analysis_result = analyze_with_openai(analysis_data)
export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)

conn.close()
logging.info("整体流程完成!")


# Bug fix: `name`/"main" raised NameError; the standard guard uses __name__.
if __name__ == "__main__":
    main()
db_structure[db][table] = {
"columns": columns,
"samples": samples
}

return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info


# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析,并增加重试机制,且每个请求间隔 1 秒"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY) # 请求后延迟 1 秒
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(REQUEST_DELAY)
return ""


def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构和渗透风险"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"请基于下面提供的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
"识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
"视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
"请结合字段名和样本数据双重判断敏感信息。请用中文输出分析结果,格式如下:\n"
"{\n 'sensitive_fields': {数据库: {表: [敏感字段, ...], ...}, ...},\n"
" 'server_analysis': {服务器相关风险描述},\n"
" 'access_analysis': {访问记录相关风险描述}\n}\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)

response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}


# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
"""
导出总体报告工作表,内容详略得当,并增加每个字段的中文解释
"""
# 定义总体报告字段解释
explanations = {
"sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,表明数据中存在可能泄露个人隐私的信息。",
"server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,表明服务器安全态势。",
"access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
}

report_rows = []
for key, explanation in explanations.items():
value = analysis_result.get(key, {})
report_rows.append({
"分析项": key,
"结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
"中文解释": explanation
})

report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
report_df.to_excel(writer, sheet_name="总体报告", index=False)


def export_field_explanations(writer, sheet_name, df, field_explanations):
"""
在导出的每个工作表中,增加表头下方的字段中文解释行
"""
# field_explanations: dict, 键为字段名,值为中文解释
explanation_row = []
for col in df.columns:
explanation_row.append(field_explanations.get(col, ""))
# 在df上方插入解释行
explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
combined_df = pd.concat([explanation_df, df], ignore_index=True)
combined_df.to_excel(writer, sheet_name=sheet_name, index=False)


def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告及每个表字段的中文解释"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 导出总体报告
export_overall_report(writer, analysis_result)

# 定义通用字段解释(可根据实际情况扩展)
common_field_explanations = {
"版本": "数据库版本号",
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}
import pymysql
import openai
import json
import pandas as pd
import time
import logging

# ============ 配置区 ============
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'yourpassword',
'port': 3306,
'charset': 'utf8mb4'
}
OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'
MODEL_NAME = 'gpt-4'
OUTPUT_FILE = 'sensitive_data_analysis.xlsx'
MAX_RETRY = 3 # OpenAI API 重试次数
REQUEST_DELAY = 1 # 每个请求延迟 1 秒,防止被滥用阻止

# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")


# ====== 数据库连接 ======
def connect_db():
"""建立数据库连接"""
try:
conn = pymysql.connect(**DB_CONFIG)
logging.info("数据库连接成功")
return conn
except Exception as e:
logging.error(f"数据库连接失败: {e}")
return None


# ====== 数据采集 ======
def collect_db_info(conn):
    """Enumerate databases, tables, columns and sample rows, plus server info,
    access records (processlist), log configuration, file privileges and UDFs.

    Every section degrades to an empty value on permission errors instead of
    aborting the whole collection.

    :return: (db_structure, server_info, access_logs, log_configs,
              file_privileges, udf_info)
    """
    logging.info("正在采集数据库信息...")
    db_structure = {}
    server_info = {}
    access_logs = []
    log_configs = {}
    file_privileges = []
    udf_info = []

    with conn.cursor() as cursor:
        try:
            # Basic server facts in one round trip.
            cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
            version, hostname, port, timezone, datadir = cursor.fetchone()
            server_info = {
                '版本': version,
                '主机名': hostname,
                '端口': port,
                '时区': timezone,
                '数据目录': datadir
            }
        except Exception as e:
            logging.error(f"采集服务器信息失败: {e}")

        try:
            # Requires the PROCESS privilege.
            cursor.execute("SHOW PROCESSLIST;")
            access_logs = cursor.fetchall()
        except Exception as e:
            logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))

        try:
            # All log-related server variables.
            cursor.execute("SHOW VARIABLES LIKE '%log%'")
            log_configs = {row[0]: row[1] for row in cursor.fetchall()}
        except Exception as e:
            logging.warning("无法查看日志配置: " + str(e))

        try:
            # Example query only; adjust for the target environment.
            cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
            file_privileges = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看文件权限: " + str(e))

        try:
            # Registered UDFs (potential privilege-escalation vector).
            cursor.execute("SELECT * FROM mysql.func")
            udf_info = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看 UDF 信息: " + str(e))

        try:
            cursor.execute("SHOW DATABASES")
            databases = [db[0] for db in cursor.fetchall()]
        except Exception as e:
            logging.error("获取数据库列表失败: " + str(e))
            databases = []

        for db in databases:
            if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
                continue  # skip system schemas

            try:
                # Identifiers come from the server itself; backtick-quote them
                # so names containing spaces or dashes don't break the statement.
                cursor.execute(f"USE `{db}`")
                cursor.execute("SHOW TABLES")
                tables = [table[0] for table in cursor.fetchall()]
            except Exception as e:
                logging.warning(f"跳过数据库 {db},原因:{e}")
                continue

            db_structure[db] = {}

            for table in tables:
                try:
                    cursor.execute(f"DESCRIBE `{table}`")
                    columns = [col[0] for col in cursor.fetchall()]
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
                    continue

                try:
                    # First 5 rows as a sample for sensitivity analysis.
                    cursor.execute(f"SELECT * FROM `{table}` LIMIT 5")
                    samples = cursor.fetchall()
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
                    samples = []

                db_structure[db][table] = {
                    "columns": columns,
                    "samples": samples
                }

    return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
# 处理 sheet 名称长度和重复问题
sheet_name = f"{db}_{table}"[:31]
df.to_excel(writer, sheet_name=sheet_name, index=False)
logging.info(f"数据导出完成:{OUTPUT_FILE}")


# ====== 主流程 ======
def main():
conn = connect_db()
if not conn:
return

db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
analysis_data = {
"db_structure": db_structure,
"server_info": server_info,
"access_logs": access_logs,
"log_configs": log_configs,
"file_privileges": file_privileges,
"udf_info": udf_info
}
analysis_result = analyze_with_openai(analysis_data)
export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)

conn.close()
logging.info("整体流程完成!")


# Bug fix: `name`/"main" raised NameError; the standard guard uses __name__.
if __name__ == "__main__":
    main()
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info


# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析,并增加重试机制"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析助手。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(2)
return ""


def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构和渗透风险"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"以下是数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,请识别可能的敏感信息和潜在的渗透风险,"
"包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、视频监控流地址、日志配置、文件读写权限、UDF 提权风险等,"
"字段名可能为中文、拼音或缩写,请结合字段名和样本数据双重判断敏感信息,"
"请用中文输出分析结果,输出格式为:{'sensitive_fields': {...}, 'server_analysis': {...}, 'access_analysis': {...}}。\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)

response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}


# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
"""
导出总体报告工作表,每个字段增加中文解释
"""
# 定义字段中文解释
explanations = {
"sensitive_fields": "数据库中被识别出的敏感字段,如身份证号、手机号、密码等。",
"server_analysis": "服务器相关分析结果,包括版本、日志配置、文件权限、UDF 提权风险等信息。",
"access_analysis": "访问记录分析结果,展示访问数据库的服务器信息及潜在风险。"
}
# 构造总体报告数据
report_rows = []
for key, explanation in explanations.items():
value = analysis_result.get(key, {})
# 将 value 转换为字符串格式,便于展示
report_rows.append({
"分析项": key,
"结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
"中文解释": explanation
})

report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
report_df.to_excel(writer, sheet_name="总体报告", index=False)


def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
    """Export all collected data and the analysis result to OUTPUT_FILE.

    Writes the overall report, server info, log config, access records, file
    privileges, UDF info, and one sheet per table with sensitive columns
    marked in an extra row under the header.
    """
    logging.info("正在导出数据到 Excel...")
    with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
        # Overall report sheet first.
        export_overall_report(writer, analysis_result)

        # Server info.
        pd.DataFrame([server_info]).to_excel(writer, sheet_name='服务器信息', index=False)

        # Log configuration as two columns (key, value).
        log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
        log_df.to_excel(writer, sheet_name='日志配置', index=False)

        # Optional sections: only written when data was collected.
        if access_logs:
            access_df = pd.DataFrame(access_logs)
            access_df.to_excel(writer, sheet_name='访问记录', index=False)

        if file_privileges:
            file_df = pd.DataFrame(file_privileges)
            file_df.to_excel(writer, sheet_name='文件权限', index=False)

        if udf_info:
            udf_df = pd.DataFrame(udf_info)
            udf_df.to_excel(writer, sheet_name='UDF信息', index=False)

        # One sheet per table: a marker row flags sensitive columns, then samples.
        for db, tables in db_structure.items():
            for table, content in tables.items():
                data = content['samples']
                columns = content['columns']
                df = pd.DataFrame(data, columns=columns)
                sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
                sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
                mark_row = pd.DataFrame([sensitive_marks], columns=columns)
                df = pd.concat([mark_row, df], ignore_index=True)
                # Excel caps sheet names at 31 characters.
                sheet_name = f"{db}_{table}"[:31]
                df.to_excel(writer, sheet_name=sheet_name, index=False)
    logging.info(f"数据导出完成:{OUTPUT_FILE}")
import pymysql
import openai
import json
import pandas as pd
import time
import logging

# ============ 配置区 ============
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'yourpassword',
'port': 3306,
'charset': 'utf8mb4'
}
OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'
MODEL_NAME = 'gpt-4'
OUTPUT_FILE = 'sensitive_data_analysis.xlsx'
MAX_RETRY = 3 # OpenAI API 重试次数

# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")


# ====== 数据库连接 ======
def connect_db():
"""建立数据库连接"""
try:
conn = pymysql.connect(**DB_CONFIG)
logging.info("数据库连接成功")
return conn
except Exception as e:
logging.error(f"数据库连接失败: {e}")
return None


# ====== 数据采集 ======
def collect_db_info(conn):
    """
    Enumerate all databases, tables, columns, sample rows, server info,
    access records, log configuration, file privileges and UDF info.

    Parameters:
        conn: an open PyMySQL-style connection (cursor() usable as a
              context manager).

    Returns:
        A 6-tuple (db_structure, server_info, access_logs, log_configs,
        file_privileges, udf_info) where db_structure maps
        db -> table -> {"columns": [...], "samples": [...]}.
    """
    logging.info("正在采集数据库信息...")
    db_structure = {}
    server_info = {}
    access_logs = []
    log_configs = {}
    file_privileges = []
    udf_info = []

    with conn.cursor() as cursor:
        try:
            # Server information (version, host, port, timezone, datadir).
            cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
            version, hostname, port, timezone, datadir = cursor.fetchone()
            server_info = {
                '版本': version,
                '主机名': hostname,
                '端口': port,
                '时区': timezone,
                '数据目录': datadir
            }
        except Exception as e:
            logging.error(f"采集服务器信息失败: {e}")

        try:
            # Access records (requires PROCESS privilege).
            cursor.execute("SHOW PROCESSLIST;")
            access_logs = cursor.fetchall()
        except Exception as e:
            logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))

        try:
            # Log-related server variables.
            cursor.execute("SHOW VARIABLES LIKE '%log%'")
            log_configs = {row[0]: row[1] for row in cursor.fetchall()}
        except Exception as e:
            logging.warning("无法查看日志配置: " + str(e))

        try:
            # File privileges (sample query; may need adjustment per environment).
            cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
            file_privileges = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看文件权限: " + str(e))

        try:
            # UDF privilege-escalation indicators.
            cursor.execute("SELECT * FROM mysql.func")
            udf_info = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看 UDF 信息: " + str(e))

        try:
            # Database list for structure/sample collection.
            cursor.execute("SHOW DATABASES")
            databases = [db[0] for db in cursor.fetchall()]
        except Exception as e:
            logging.error("获取数据库列表失败: " + str(e))
            databases = []

        for db in databases:
            if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
                continue  # skip system schemas

            try:
                cursor.execute(f"USE {db}")
                cursor.execute("SHOW TABLES")
                tables = [table[0] for table in cursor.fetchall()]
            except Exception as e:
                logging.warning(f"跳过数据库 {db},原因:{e}")
                continue

            db_structure[db] = {}

            for table in tables:
                try:
                    # Column names for this table.
                    cursor.execute(f"DESCRIBE {table}")
                    columns = [col[0] for col in cursor.fetchall()]
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
                    continue

                try:
                    # First 5 sample rows.
                    cursor.execute(f"SELECT * FROM {table} LIMIT 5")
                    samples = cursor.fetchall()
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
                    samples = []

                db_structure[db][table] = {
                    "columns": columns,
                    "samples": samples
                }

    # BUG FIX: the return statement was missing (lost in a paste splice), so
    # callers unpacking six values crashed with "cannot unpack NoneType".
    return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# NOTE(review): displaced fragment of main() — these statements call
# collect_db_info/analyze_with_openai/export_to_excel but have lost their
# enclosing `def main():` in a paste splice. An intact main() with the same
# body appears further down; confirm and remove this orphan.
db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
# Bundle everything for the OpenAI analysis step.
analysis_data = {
    "db_structure": db_structure,
    "server_info": server_info,
    "access_logs": access_logs,
    "log_configs": log_configs,
    "file_privileges": file_privileges,
    "udf_info": udf_info
}
analysis_result = analyze_with_openai(analysis_data)
export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)

conn.close()
logging.info("整体流程完成!")


# BUG FIX: the guard read `if name == "main":` — the dunder underscores were
# stripped (likely by markdown rendering), which raises NameError at import.
if __name__ == "__main__":
    main()


---

以上代码在原有基础上增加了异常处理、日志记录和部分数据格式优化,使脚本在面对较大数据量或异常情况时更加健壮和可维护。如果还有其他需求或改进意见,请随时反馈!
# NOTE(review): orphaned tail of another collect_db_info revision (paste
# splice) — the inner sample-fetch loop body plus the function's return.
# Invalid at module level (`return` outside a function); kept verbatim so a
# human can reconcile it against the complete definitions elsewhere.
try:
    # First 5 sample rows.
    cursor.execute(f"SELECT * FROM {table} LIMIT 5")
    samples = cursor.fetchall()
except Exception as e:
    logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
    samples = []

db_structure[db][table] = {
    "columns": columns,
    "samples": samples
}

return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info


# ====== OpenAI 分析 ======
def call_openai_api(prompt):
    """Call the OpenAI chat API with retries.

    Returns the stripped reply text, or "" after MAX_RETRY failed attempts.
    """
    openai.api_key = OPENAI_API_KEY
    messages = [
        {"role": "system", "content": "你是一个数据库安全分析助手。"},
        {"role": "user", "content": prompt}
    ]
    attempt = 0
    while attempt < MAX_RETRY:
        try:
            reply = openai.ChatCompletion.create(
                model=MODEL_NAME,
                messages=messages,
                max_tokens=2000
            )
        except Exception as err:
            logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{err}")
            time.sleep(2)  # brief pause before the next try
            attempt += 1
        else:
            return reply['choices'][0]['message']['content'].strip()
    return ""


def analyze_with_openai(data):
    """Ask OpenAI to analyse the collected data for sensitive fields and risks.

    Returns the parsed JSON dict, or {} when the response is not valid JSON.
    """
    logging.info("正在通过 OpenAI 分析...")
    payload = json.dumps(data, ensure_ascii=False, indent=2)
    prompt = (
        "以下是数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,请识别可能的敏感信息和潜在的渗透风险,"
        "包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、视频监控流地址、日志配置、文件读写权限、UDF 提权风险等,"
        "字段名可能为中文、拼音或缩写,请结合字段名和样本数据双重判断敏感信息,"
        "输出格式:{'sensitive_fields': {...}, 'server_analysis': {...}, 'access_analysis': {...}}。\n\n"
        f"数据如下:\n{payload}"
    )

    reply = call_openai_api(prompt)
    try:
        parsed = json.loads(reply)
    except json.JSONDecodeError:
        logging.error("OpenAI 响应解析失败,原始响应:" + reply)
        return {}
    logging.info("OpenAI 分析完成!")
    return parsed


# ====== 导出 Excel ======
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
    """Export collected data and analysis results to an Excel workbook.

    One sheet per metadata category, then one sheet per table; sensitive
    columns (per analysis_result['sensitive_fields']) are marked in an extra
    first row. Writes to OUTPUT_FILE.
    """
    logging.info("正在导出数据到 Excel...")
    with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
        # Server information.
        pd.DataFrame([server_info]).to_excel(writer, sheet_name='服务器信息', index=False)

        # Log configuration as two columns.
        log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
        log_df.to_excel(writer, sheet_name='日志配置', index=False)

        # Access records.
        if access_logs:
            access_df = pd.DataFrame(access_logs)
            access_df.to_excel(writer, sheet_name='访问记录', index=False)

        # File privileges.
        if file_privileges:
            file_df = pd.DataFrame(file_privileges)
            file_df.to_excel(writer, sheet_name='文件权限', index=False)

        # UDF information.
        if udf_info:
            udf_df = pd.DataFrame(udf_info)
            udf_df.to_excel(writer, sheet_name='UDF信息', index=False)

        # Per-table sheets with sensitive-field marks.
        used_names = set()
        for db, tables in db_structure.items():
            for table, content in tables.items():
                data = content['samples']
                columns = content['columns']
                df = pd.DataFrame(data, columns=columns)
                # Mark sensitive columns in an extra leading row.
                sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
                sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
                mark_row = pd.DataFrame([sensitive_marks], columns=columns)
                df = pd.concat([mark_row, df], ignore_index=True)
                # BUG FIX: truncating to Excel's 31-char sheet-name limit could
                # collide for long db/table names; append a numeric suffix to
                # keep every sheet name unique (the old code only truncated).
                sheet_name = f"{db}_{table}"[:31]
                base, counter = sheet_name, 1
                while sheet_name in used_names:
                    suffix = f"_{counter}"
                    sheet_name = base[:31 - len(suffix)] + suffix
                    counter += 1
                used_names.add(sheet_name)
                df.to_excel(writer, sheet_name=sheet_name, index=False)
    logging.info(f"数据导出完成:{OUTPUT_FILE}")


# ====== 主流程 ======
def main():
    """Pipeline entry point.

    NOTE(review): truncated by a paste splice — the body stops right after the
    connection guard and the text jumps into prose. Intact main() revisions
    exist elsewhere in this file.
    """
    conn = connect_db()
    if not conn:
        return
下面列出一些可以进一步优化脚本的建议,并提供一个改进后的代码版本供参考:


---

优化建议

1. 异常处理和重试机制

对调用 OpenAI API 增加异常捕获、超时处理及重试机制,保证在网络波动时依然能获得结果。

对数据库查询增加更多异常捕获,避免因单个查询失败导致整个流程中断。



2. 上下文管理和资源释放

使用 with 上下文管理器来管理数据库连接和 Excel Writer,确保资源在异常情况下也能正确释放。



3. 数据格式和列名优化

对导出的日志配置、访问记录等数据进行格式整理,添加列标题,使 Excel 文件更具可读性。

考虑对 sheet 名称冲突或长度进行更严格处理。



4. 参数化查询和安全性

对 SQL 语句使用参数化查询(如果涉及动态参数),以降低 SQL 注入风险(尽管此处主要用于采集数据)。



5. 日志记录

增加日志记录模块,将关键步骤和异常写入日志文件,便于后续排查问题。





---

改进后的代码

import pymysql
import openai
import json
import pandas as pd
import time
import logging

# ============ 配置区 ============
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'yourpassword',
'port': 3306,
'charset': 'utf8mb4'
}
OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'
MODEL_NAME = 'gpt-4'
OUTPUT_FILE = 'sensitive_data_analysis.xlsx'
MAX_RETRY = 3 # OpenAI API 重试次数

# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

# ====== 数据库连接 ======
def connect_db():
    """Create and return a PyMySQL connection; log and return None on error."""
    conn = None
    try:
        conn = pymysql.connect(**DB_CONFIG)
        logging.info("数据库连接成功")
    except Exception as err:
        logging.error(f"数据库连接失败: {err}")
    return conn


# ====== 数据采集 ======
def collect_db_info(conn):
    """Enumerate all databases, tables, columns, sample rows, server info, access records, log configuration, etc.

    NOTE(review): this duplicate of collect_db_info is truncated by a paste
    splice — after the DESCRIBE step the text jumps into the tail of
    analyze_with_openai / export_to_excel from another revision. Kept
    verbatim for reconciliation against the complete definitions.
    """
    logging.info("正在采集数据库信息...")
    db_structure = {}
    server_info = {}
    access_logs = []
    log_configs = {}
    file_privileges = []
    udf_info = []

    with conn.cursor() as cursor:
        try:
            # Server information.
            cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
            version, hostname, port, timezone, datadir = cursor.fetchone()
            server_info = {
                '版本': version,
                '主机名': hostname,
                '端口': port,
                '时区': timezone,
                '数据目录': datadir
            }
        except Exception as e:
            logging.error(f"采集服务器信息失败: {e}")

        try:
            # Access records (requires PROCESS privilege).
            cursor.execute("SHOW PROCESSLIST;")
            access_logs = cursor.fetchall()
        except Exception as e:
            logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))

        try:
            # Log configuration.
            cursor.execute("SHOW VARIABLES LIKE '%log%'")
            log_configs = {row[0]: row[1] for row in cursor.fetchall()}
        except Exception as e:
            logging.warning("无法查看日志配置: " + str(e))

        try:
            # File privileges (sample query; adjust per environment).
            cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
            file_privileges = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看文件权限: " + str(e))

        try:
            # UDF privilege-escalation info.
            cursor.execute("SELECT * FROM mysql.func")
            udf_info = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看 UDF 信息: " + str(e))

        # Database structure and sample data.
        try:
            cursor.execute("SHOW DATABASES")
            databases = [db[0] for db in cursor.fetchall()]
        except Exception as e:
            logging.error("获取数据库列表失败: " + str(e))
            databases = []

        for db in databases:
            if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
                continue  # skip system schemas

            try:
                cursor.execute(f"USE {db}")
                cursor.execute("SHOW TABLES")
                tables = [table[0] for table in cursor.fetchall()]
            except Exception as e:
                logging.warning(f"跳过数据库 {db},原因:{e}")
                continue

            db_structure[db] = {}

            for table in tables:
                try:
                    # Column names for this table.
                    cursor.execute(f"DESCRIBE {table}")
                    columns = [col[0] for col in cursor.fetchall()]
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
                    continue
                # NOTE(review): spliced fragment below — tail of
                # analyze_with_openai from another revision begins here.
                # Call the large model.
                response = call_openai_api(prompt)
                try:
                    analysis_result = json.loads(response)
                    print(" 分析完成!")
                    return analysis_result
                except json.JSONDecodeError:
                    print(" OpenAI 响应解析失败,原始响应:", response)
                    return {}


# ====== 导出 Excel ======
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
    """Export collected data and analysis results to Excel (OUTPUT_FILE).

    FIX: use ExcelWriter as a context manager so the workbook is saved and
    closed even if an exception is raised mid-export (the old code only
    called writer.close() on the happy path).
    """
    print("📤 正在导出数据到 Excel...")
    with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
        # Server info, log config, access records, file privileges, UDF info.
        pd.DataFrame([server_info]).to_excel(writer, sheet_name='服务器信息', index=False)
        pd.DataFrame.from_dict(log_configs, orient='index').to_excel(writer, sheet_name='日志配置')
        pd.DataFrame(access_logs).to_excel(writer, sheet_name='访问记录', index=False)
        pd.DataFrame(file_privileges).to_excel(writer, sheet_name='文件权限', index=False)
        pd.DataFrame(udf_info).to_excel(writer, sheet_name='UDF信息', index=False)

        # Per-table sheets with sensitive-field marks.
        for db, tables in db_structure.items():
            for table, content in tables.items():
                data = content['samples']
                columns = content['columns']

                df = pd.DataFrame(data, columns=columns)

                # Mark sensitive columns in an extra leading row.
                sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
                sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
                mark_row = pd.DataFrame([sensitive_marks], columns=columns)
                df = pd.concat([mark_row, df], ignore_index=True)

                # Excel sheet names are limited to 31 characters.
                sheet_name = f"{db}_{table}"[:31]
                df.to_excel(writer, sheet_name=sheet_name, index=False)

    print(f" 数据导出完成:{OUTPUT_FILE}")


# ====== 主流程 ======
def main():
    """Drive the full pipeline: connect, collect, analyse, export."""
    conn = connect_db()
    if conn is None:
        return

    # 1. Collect database metadata, samples and server configuration.
    collected = collect_db_info(conn)
    db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collected

    # 2. Sensitive-data / penetration-risk analysis via OpenAI.
    analysis_result = analyze_with_openai({
        "db_structure": db_structure,
        "server_info": server_info,
        "access_logs": access_logs,
        "log_configs": log_configs,
        "file_privileges": file_privileges,
        "udf_info": udf_info
    })

    # 3. Write everything to the Excel report.
    export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)

    conn.close()
    print("\n 整体流程完成!")


# BUG FIX: the guard read `if name == "main":` — the dunder underscores were
# stripped, which raises NameError at import time.
if __name__ == "__main__":
    main()

如果还有需要优化的地方,随时告诉我!
已经优化完毕!下面是新增了对日志配置、文件权限、UDF 提权等渗透途径分析的代码:

import pymysql
import openai
import json
import pandas as pd

# ============ 配置区 ============
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'yourpassword',
'port': 3306,
'charset': 'utf8mb4'
}
OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'
MODEL_NAME = 'gpt-4'
OUTPUT_FILE = 'sensitive_data_analysis.xlsx'

# ====== 数据库连接 ======
def connect_db():
    """Connect to MySQL with DB_CONFIG; print the outcome and return the connection or None."""
    try:
        link = pymysql.connect(**DB_CONFIG)
    except Exception as problem:
        print(f" 数据库连接失败: {problem}")
        return None
    print(" 数据库连接成功")
    return link


# ====== 数据采集 ======
def collect_db_info(conn):
    """Enumerate all databases, tables, columns, sample rows, server info, access records, log configuration, etc.

    Returns a 6-tuple (db_structure, server_info, access_logs, log_configs,
    file_privileges, udf_info).

    BUG FIX: the four bare `except:` clauses are narrowed to
    `except Exception:` so they no longer swallow SystemExit /
    KeyboardInterrupt; the printed messages are unchanged.
    """
    print("🔍 正在采集数据库信息...")
    db_structure = {}
    server_info = {}
    access_logs = []
    log_configs = {}
    file_privileges = []
    udf_info = []

    with conn.cursor() as cursor:
        # Server information (no guard in this revision: a failure aborts).
        cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
        version, hostname, port, timezone, datadir = cursor.fetchone()
        server_info = {
            '版本': version,
            '主机名': hostname,
            '端口': port,
            '时区': timezone,
            '数据目录': datadir
        }

        # Access records (requires PROCESS privilege).
        try:
            cursor.execute("SHOW PROCESSLIST;")
            access_logs = cursor.fetchall()
        except Exception:
            print("⚠️ 当前用户无法查看访问记录 (SHOW PROCESSLIST)")

        # Log configuration.
        try:
            cursor.execute("SHOW VARIABLES LIKE '%log%'")
            log_configs = {row[0]: row[1] for row in cursor.fetchall()}
        except Exception:
            print("⚠️ 无法查看日志配置")

        # File privileges.
        try:
            cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
            file_privileges = cursor.fetchall()
        except Exception:
            print("⚠️ 无法查看文件权限")

        # UDF privilege-escalation info.
        try:
            cursor.execute("SELECT * FROM mysql.func")
            udf_info = cursor.fetchall()
        except Exception:
            print("⚠️ 无法查看 UDF 信息")

        # Database structure and sample data.
        cursor.execute("SHOW DATABASES")
        databases = [db[0] for db in cursor.fetchall()]

        for db in databases:
            if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
                continue  # skip system schemas

            cursor.execute(f"USE {db}")
            cursor.execute("SHOW TABLES")
            tables = [table[0] for table in cursor.fetchall()]

            db_structure[db] = {}

            for table in tables:
                # Column names for this table.
                cursor.execute(f"DESCRIBE {table}")
                columns = [col[0] for col in cursor.fetchall()]

                # First 5 sample rows.
                cursor.execute(f"SELECT * FROM {table} LIMIT 5")
                samples = cursor.fetchall()

                db_structure[db][table] = {
                    "columns": columns,
                    "samples": samples
                }

    return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info


# ====== OpenAI 分析 ======
def call_openai_api(prompt):
    """Send one chat-completion request and return the stripped reply content."""
    openai.api_key = OPENAI_API_KEY
    chat = [
        {"role": "system", "content": "你是一个数据库安全分析助手。"},
        {"role": "user", "content": prompt}
    ]
    response = openai.ChatCompletion.create(
        model=MODEL_NAME,
        messages=chat,
        max_tokens=2000
    )
    return response['choices'][0]['message']['content'].strip()


def analyze_with_openai(data):
    """Use OpenAI to analyse database structure and penetration risk.

    NOTE(review): truncated by a paste splice — after building the prompt the
    text jumps into an export_to_excel loop from another revision; this
    function's call/parse/return section is missing here.
    """
    print("🧠 正在通过 OpenAI 分析...")

    # Build the analysis prompt.
    prompt = (
        "以下是数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,请识别可能的敏感信息和潜在的渗透风险,"
        "包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、视频监控流地址、日志配置、文件读写权限、UDF 提权风险等,"
        "输出格式:{'sensitive_fields': {...}, 'server_analysis': {...}, 'access_analysis': {...}}。\n\n"
        f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
    )
    # NOTE(review): spliced fragment — per-table export loop and
    # writer.close() from an export_to_excel revision; `db_structure`,
    # `analysis_result` and `writer` are undefined in this scope.
    # Export tables and analysis results.
    for db, tables in db_structure.items():
        for table, content in tables.items():
            data = content['samples']
            columns = content['columns']

            df = pd.DataFrame(data, columns=columns)

            # Mark sensitive columns.
            sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
            sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
            mark_row = pd.DataFrame([sensitive_marks], columns=columns)
            df = pd.concat([mark_row, df], ignore_index=True)

            # Write to Excel (sheet names capped at 31 chars).
            sheet_name = f"{db}_{table}"[:31]
            df.to_excel(writer, sheet_name=sheet_name, index=False)

    writer.close()
    print(f" 数据导出完成:{OUTPUT_FILE}")


# ====== 主流程 ======
def main():
    """Pipeline entry point: connect, collect, analyse, export.

    BUG FIXES: this revision unpacked only three values from
    collect_db_info, which returns six (TypeError at runtime), and called the
    seven-parameter export_to_excel with four arguments. Both calls now match
    the signatures defined above.
    """
    # Connect to the database.
    conn = connect_db()
    if not conn:
        return

    # Step 1: collect database information (six values, not three).
    db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)

    # Step 2: analyse sensitive data and penetration risk with OpenAI.
    analysis_result = analyze_with_openai({
        "db_structure": db_structure,
        "server_info": server_info,
        "access_logs": access_logs,
        "log_configs": log_configs,
        "file_privileges": file_privileges,
        "udf_info": udf_info
    })

    # Step 3: export data and analysis results to Excel (all seven arguments).
    export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)

    # Close the connection.
    conn.close()
    print("\n 整体流程完成!")


# BUG FIX: restore the stripped dunder underscores in the entry guard.
if __name__ == "__main__":
    main()

```

```python
import pymysql
import openai
import json
import pandas as pd

# ============ 配置区 ============
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'yourpassword',
'port': 3306,
'charset': 'utf8mb4'
}
OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'
MODEL_NAME = 'gpt-4'
OUTPUT_FILE = 'sensitive_data_analysis.xlsx'
MAX_TABLES_PER_BATCH = 10 # 分批传递表数量,避免超载

# ====== 数据库连接 ======
def connect_db():
    """Return a new pymysql connection built from DB_CONFIG, or None if the attempt fails."""
    result = None
    try:
        result = pymysql.connect(**DB_CONFIG)
        print(" 数据库连接成功")
    except Exception as oops:
        print(f" 数据库连接失败: {oops}")
    return result


# ====== 数据采集 ======
def collect_db_info(conn):
    """Enumerate all databases, tables, columns, sample rows, server info and access records.

    Returns a 3-tuple (db_structure, server_info, access_logs).

    BUG FIX: the bare `except:` around SHOW PROCESSLIST is narrowed to
    `except Exception:` so it no longer swallows SystemExit /
    KeyboardInterrupt; the printed message is unchanged.
    """
    print("🔍 正在采集数据库信息...")
    db_structure = {}
    server_info = {}
    access_logs = []

    with conn.cursor() as cursor:
        # Server information.
        cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
        version, hostname, port, timezone, datadir = cursor.fetchone()
        server_info = {
            '版本': version,
            '主机名': hostname,
            '端口': port,
            '时区': timezone,
            '数据目录': datadir
        }

        # Access records (requires PROCESS privilege).
        try:
            cursor.execute("SHOW PROCESSLIST;")
            access_logs = cursor.fetchall()
        except Exception:
            print("⚠️ 当前用户无法查看访问记录 (SHOW PROCESSLIST)")

        # Database structure and sample data.
        cursor.execute("SHOW DATABASES")
        databases = [db[0] for db in cursor.fetchall()]

        for db in databases:
            if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
                continue  # skip system schemas

            cursor.execute(f"USE `{db}`")
            cursor.execute("SHOW TABLES")
            tables = [table[0] for table in cursor.fetchall()]

            db_structure[db] = {}

            for table in tables:
                # Column names for this table.
                cursor.execute(f"DESCRIBE `{table}`")
                columns = [col[0] for col in cursor.fetchall()]

                # First 5 sample rows.
                cursor.execute(f"SELECT * FROM `{table}` LIMIT 5")
                samples = cursor.fetchall()

                db_structure[db][table] = {
                    "columns": columns,
                    "samples": samples
                }

    return db_structure, server_info, access_logs


# ====== OpenAI 分析 ======
def call_openai_api(prompt):
    """Query the configured OpenAI chat model once and return its answer text."""
    openai.api_key = OPENAI_API_KEY
    result = openai.ChatCompletion.create(
        model=MODEL_NAME,
        max_tokens=2000,
        messages=[
            {"role": "system", "content": "你是一个数据库安全分析助手。"},
            {"role": "user", "content": prompt}
        ]
    )
    return result['choices'][0]['message']['content'].strip()


def analyze_with_openai(data):
    """Run the OpenAI sensitive-data analysis; return the parsed dict, or {} on a bad response."""
    print("🧠 正在通过 OpenAI 分析...")

    # Prompt tuned for Chinese-language field names (pinyin / abbreviations included).
    prompt = (
        "以下是数据库结构、服务器信息和访问记录,请识别可能的敏感信息(如身份证号、手机号、邮箱、密码、IP 地址、端口、视频监控流地址等),"
        "字段名可能为中文、拼音或缩写,请结合字段名和样本数据双重判断敏感信息,"
        "输出格式:{'sensitive_fields': {...}, 'server_analysis': {...}, 'access_analysis': {...}}。\n\n"
        f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
    )

    answer = call_openai_api(prompt)
    try:
        outcome = json.loads(answer)
    except json.JSONDecodeError:
        print(" OpenAI 响应解析失败,原始响应:", answer)
        return {}
    print(" 分析完成!")
    return outcome


# ====== 导出 Excel ======
def export_to_excel(db_structure, server_info, access_logs, analysis_result):
    """Export collected data and analysis results to Excel (OUTPUT_FILE).

    BUG FIXES: the server-info/access-records export block was copy-pasted
    twice, writing the same data a second time; the duplicate is removed.
    The writer is also managed with a context manager so the workbook is
    closed even when an exception interrupts the export.
    """
    print("📤 正在导出数据到 Excel...")
    with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
        # Server information.
        server_df = pd.DataFrame([server_info])
        server_df.to_excel(writer, sheet_name='服务器信息', index=False)

        # Access records (only when any were collected).
        if access_logs:
            access_df = pd.DataFrame(access_logs)
            access_df.to_excel(writer, sheet_name='访问记录', index=False)

        # Per-table sheets with sensitive-field marks.
        for db, tables in db_structure.items():
            for table, content in tables.items():
                data = content['samples']
                columns = content['columns']

                df = pd.DataFrame(data, columns=columns)

                # Mark sensitive columns in an extra leading row.
                sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
                sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
                mark_row = pd.DataFrame([sensitive_marks], columns=columns)
                df = pd.concat([mark_row, df], ignore_index=True)

                # Excel sheet names are limited to 31 characters.
                sheet_name = f"{db}_{table}"[:31]
                df.to_excel(writer, sheet_name=sheet_name, index=False)

    print(f" 数据导出完成:{OUTPUT_FILE}")


# ====== 主流程 ======
def main():
    """Entry point: gather data, analyse it with OpenAI, and export to Excel."""
    conn = connect_db()
    if not conn:
        return

    # Step 1: collect structure, server info and access records.
    db_structure, server_info, access_logs = collect_db_info(conn)

    # Step 2: OpenAI sensitive-data analysis.
    payload = {
        "db_structure": db_structure,
        "server_info": server_info,
        "access_logs": access_logs
    }
    analysis_result = analyze_with_openai(payload)

    # Step 3: write the Excel report.
    export_to_excel(db_structure, server_info, access_logs, analysis_result)

    conn.close()
    print("\n 整体流程完成!")


# BUG FIX: restore the stripped dunder underscores in the entry guard.
if __name__ == "__main__":
    main()


---

这个版本的脚本强化了对中文环境的敏感信息识别能力,尤其是:

字段名分析:能识别拼音、中文、缩写的字段名。

样本数据判断:结合数据内容识别隐私信息。

大模型提示词优化:提升对中文环境的适应性。


如果还有需要优化的地方,随时告诉我!
Back to Top