import os
import json
import pymysql
import openai
import pandas as pd
import time
import logging
from dotenv import load_dotenv
# 加载 .env 文件
load_dotenv()
# ============ 配置区 ============
# 如果未提供 MYSQL_CONFIGS 环境变量,则使用单一默认配置
DEFAULT_DB_CONFIG = {
"host": os.getenv("DB_HOST", "localhost"),
"user": os.getenv("DB_USER", "root"),
"password": os.getenv("DB_PASSWORD", "yourpassword"),
"port": int(os.getenv("DB_PORT", 3306)),
"charset": os.getenv("DB_CHARSET", "utf8mb4")
}
# MYSQL_CONFIGS 应为 JSON 格式的数组,形如:
# [{"name": "mysql1", "host": "...", "user": "...", "password": "...", "port": 3306, "charset": "utf8mb4"}, {...}]
MYSQL_CONFIGS_JSON = os.getenv("MYSQL_CONFIGS", "")
if MYSQL_CONFIGS_JSON:
try:
DB_CONFIGS = json.loads(MYSQL_CONFIGS_JSON)
except Exception as e:
logging.error("解析 MYSQL_CONFIGS 失败,使用默认配置。错误:" + str(e))
DB_CONFIGS = [DEFAULT_DB_CONFIG]
else:
DB_CONFIGS = [DEFAULT_DB_CONFIG]
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "sensitive_data_analysis.xlsx")
MAX_RETRY = int(os.getenv("MAX_RETRY", 3)) # OpenAI API 重试次数
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1)) # 每个请求延迟 1 秒
# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# ====== 数据库连接 ======
def connect_db(config):
"""建立单个数据库连接"""
try:
conn = pymysql.connect(**config)
logging.info(f"数据库连接成功: {config.get('name', config['host'])}")
return conn
except Exception as e:
logging.error(f"数据库连接失败 {config.get('name', config['host'])}: {e}")
return None
# ====== 数据采集 ======
def collect_db_info(conn):
"""
枚举数据库信息:包括数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息
"""
logging.info("正在采集数据库信息...")
db_structure = {}
server_info = {}
access_logs = []
log_configs = {}
file_privileges = []
udf_info = []
with conn.cursor() as cursor:
try:
# 获取服务器信息
cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
version, hostname, port, timezone, datadir = cursor.fetchone()
server_info = {
'版本': version,
'主机名': hostname,
'端口': port,
'时区': timezone,
'数据目录': datadir
}
except Exception as e:
logging.error(f"采集服务器信息失败: {e}")
try:
# 获取访问记录(需要权限)
cursor.execute("SHOW PROCESSLIST;")
access_logs = cursor.fetchall()
except Exception as e:
logging.warning("无法查看访问记录 (SHOW PROCESSLIST): " + str(e))
try:
# 日志配置
cursor.execute("SHOW VARIABLES LIKE '%log%'")
log_configs = {row[0]: row[1] for row in cursor.fetchall()}
except Exception as e:
logging.warning("无法查看日志配置: " + str(e))
try:
# 文件权限(示例查询,实际可能需要调整)
cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
file_privileges = cursor.fetchall()
except Exception as e:
logging.warning("无法查看文件权限: " + str(e))
try:
# UDF 提权信息
cursor.execute("SELECT * FROM mysql.func")
udf_info = cursor.fetchall()
except Exception as e:
logging.warning("无法查看 UDF 信息: " + str(e))
try:
# 获取数据库结构和样本数据
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
except Exception as e:
logging.error("获取数据库列表失败: " + str(e))
databases = []
for db in databases:
if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
continue # 跳过系统库
import json
import pymysql
import openai
import pandas as pd
import time
import logging
from dotenv import load_dotenv
# 加载 .env 文件
load_dotenv()
# ============ 配置区 ============
# 如果未提供 MYSQL_CONFIGS 环境变量,则使用单一默认配置
DEFAULT_DB_CONFIG = {
"host": os.getenv("DB_HOST", "localhost"),
"user": os.getenv("DB_USER", "root"),
"password": os.getenv("DB_PASSWORD", "yourpassword"),
"port": int(os.getenv("DB_PORT", 3306)),
"charset": os.getenv("DB_CHARSET", "utf8mb4")
}
# MYSQL_CONFIGS 应为 JSON 格式的数组,形如:
# [{"name": "mysql1", "host": "...", "user": "...", "password": "...", "port": 3306, "charset": "utf8mb4"}, {...}]
MYSQL_CONFIGS_JSON = os.getenv("MYSQL_CONFIGS", "")
if MYSQL_CONFIGS_JSON:
try:
DB_CONFIGS = json.loads(MYSQL_CONFIGS_JSON)
except Exception as e:
logging.error("解析 MYSQL_CONFIGS 失败,使用默认配置。错误:" + str(e))
DB_CONFIGS = [DEFAULT_DB_CONFIG]
else:
DB_CONFIGS = [DEFAULT_DB_CONFIG]
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "sensitive_data_analysis.xlsx")
MAX_RETRY = int(os.getenv("MAX_RETRY", 3)) # OpenAI API 重试次数
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1)) # 每个请求延迟 1 秒
# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# ====== 数据库连接 ======
def connect_db(config):
"""建立单个数据库连接"""
try:
conn = pymysql.connect(**config)
logging.info(f"数据库连接成功: {config.get('name', config['host'])}")
return conn
except Exception as e:
logging.error(f"数据库连接失败 {config.get('name', config['host'])}: {e}")
return None
# ====== 数据采集 ======
def collect_db_info(conn):
"""
枚举数据库信息:包括数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息
"""
logging.info("正在采集数据库信息...")
db_structure = {}
server_info = {}
access_logs = []
log_configs = {}
file_privileges = []
udf_info = []
with conn.cursor() as cursor:
try:
# 获取服务器信息
cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
version, hostname, port, timezone, datadir = cursor.fetchone()
server_info = {
'版本': version,
'主机名': hostname,
'端口': port,
'时区': timezone,
'数据目录': datadir
}
except Exception as e:
logging.error(f"采集服务器信息失败: {e}")
try:
# 获取访问记录(需要权限)
cursor.execute("SHOW PROCESSLIST;")
access_logs = cursor.fetchall()
except Exception as e:
logging.warning("无法查看访问记录 (SHOW PROCESSLIST): " + str(e))
try:
# 日志配置
cursor.execute("SHOW VARIABLES LIKE '%log%'")
log_configs = {row[0]: row[1] for row in cursor.fetchall()}
except Exception as e:
logging.warning("无法查看日志配置: " + str(e))
try:
# 文件权限(示例查询,实际可能需要调整)
cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
file_privileges = cursor.fetchall()
except Exception as e:
logging.warning("无法查看文件权限: " + str(e))
try:
# UDF 提权信息
cursor.execute("SELECT * FROM mysql.func")
udf_info = cursor.fetchall()
except Exception as e:
logging.warning("无法查看 UDF 信息: " + str(e))
try:
# 获取数据库结构和样本数据
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
except Exception as e:
logging.error("获取数据库列表失败: " + str(e))
databases = []
for db in databases:
if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
continue # 跳过系统库