Python+SSMS环境下删除表数据时SQL注入高危告警的解决及替代方案咨询
Python+SSMS环境下删除表数据时SQL注入高危告警的解决及替代方案咨询
我来帮你拆解下当前的问题,然后一步步给你解决思路和替代方案:
首先,你的代码触发高危告警的核心原因有两个:
- 白名单校验完全失效:你定义的
allowed_tables是空列表,然后不管传入什么table_name都直接把它加进白名单,等于没有任何权限校验,安全扫描工具一眼就能识别出“动态表名未受限制”的风险 - 动态表名的拼接没有做安全转义:虽然你对
date和vendor用了参数化查询,但表名是直接拼接的,而参数化查询无法覆盖表名这类数据库对象,所以必须额外处理
一、修复当前代码的直接方案(快速解决告警)
1. 先修复白名单校验逻辑
白名单必须是预先定义的、固定的、你明确信任的表名列表,绝对不能动态添加传入的表名。比如:
# 提前把所有允许删除数据的表名列在这里,必须是你系统中合法的、需要操作的表 ALLOWED_TABLES = ["sales_data", "vendor_monthly_stats", "your_other_allowed_table"]
然后在函数开头做严格校验,不在白名单里的直接拒绝操作:
def truncate_and_load_data_to_tpp(df: pd.DataFrame, date: str, table_name: str, logger: logging.Logger, vendor: str = None): # 第一步:严格校验表名是否在白名单内 if table_name not in ALLOWED_TABLES: error_msg = f"Attempted to delete data from unauthorized table: {table_name}" logger.error(error_msg) raise ValueError(error_msg) # 后续的数据库操作逻辑...
2. 安全处理动态表名的拼接
因为SQL Server不支持把表名作为参数化查询的参数,所以我们需要用SQL Server内置的QUOTENAME()函数来安全包裹表名,它会自动处理表名中的特殊字符(比如空格、连字符、关键字等),防止注入风险。修改后的删除逻辑如下:
conn_tpp = create_connection("tpp") try: with conn_tpp as conn: logger.info(f"Deleting existing data from {table_name} where month = '{date}'") cursor = conn.cursor() # 用QUOTENAME安全处理表名,再拼接SQL语句 if vendor: # 注意:QUOTENAME是SQL Server的函数,直接写在SQL语句里 delete_query = f"DELETE FROM {QUOTENAME(table_name)} WHERE month = %s AND vendor = %s" cursor.execute(delete_query, (date, vendor)) else: delete_query = f"DELETE FROM {QUOTENAME(table_name)} WHERE month = %s" cursor.execute(delete_query, (date,)) conn.commit() except Exception as e: error_msg = f"Error deleting data from TPP for month {date}: {e}" logger.error(error_msg)
二、更安全的替代方案:用存储过程封装删除逻辑
如果想彻底隔离Python代码和动态SQL的风险,推荐把删除逻辑封装到SQL Server的存储过程里,Python端只需要调用存储过程并传递参数,这样所有的权限校验和安全处理都在数据库端完成。
1. 在SQL Server中创建安全的存储过程
CREATE PROCEDURE SafeDeleteMonthlyData @TableName NVARCHAR(128), @Month NVARCHAR(6), -- 假设你的month字段是YYYYMM格式,可根据实际类型调整 @Vendor NVARCHAR(128) = NULL AS BEGIN SET NOCOUNT ON; -- 存储过程内部做白名单校验,和Python端的白名单保持一致 DECLARE @AllowedTables TABLE (TableName NVARCHAR(128)) INSERT INTO @AllowedTables VALUES ('sales_data'), ('vendor_monthly_stats'), ('your_other_allowed_table') -- 校验表名是否合法 IF NOT EXISTS (SELECT 1 FROM @AllowedTables WHERE TableName = @TableName) BEGIN RAISERROR('Unauthorized access to table: %s', 16, 1, @TableName) RETURN END -- 动态构造SQL,用QUOTENAME安全处理表名 DECLARE @SqlCommand NVARCHAR(MAX) IF @Vendor IS NOT NULL BEGIN SET @SqlCommand = N'DELETE FROM ' + QUOTENAME(@TableName) + N' WHERE month = @Month AND vendor = @Vendor' END ELSE BEGIN SET @SqlCommand = N'DELETE FROM ' + QUOTENAME(@TableName) + N' WHERE month = @Month' END -- 用sp_executesql执行动态SQL,确保其他参数是参数化的 EXEC sp_executesql @SqlCommand, N'@Month NVARCHAR(6), @Vendor NVARCHAR(128)', @Month = @Month, @Vendor = @Vendor END
2. Python端调用存储过程的代码
def truncate_and_load_data_to_tpp(df: pd.DataFrame, date: str, table_name: str, logger: logging.Logger, vendor: str = None): conn_tpp = create_connection("tpp") try: with conn_tpp as conn: logger.info(f"Calling stored procedure to delete data from {table_name} where month = '{date}'") cursor = conn.cursor() # 调用存储过程,传递参数 if vendor: cursor.execute("EXEC SafeDeleteMonthlyData @TableName = ?, @Month = ?, @Vendor = ?", (table_name, date, vendor)) else: cursor.execute("EXEC SafeDeleteMonthlyData @TableName = ?, @Month = ?", (table_name, date)) conn.commit() except Exception as e: error_msg = f"Error deleting data from TPP for month {date}: {e}" logger.error(error_msg)
三、额外的安全建议
- 数据库账号的权限最小化:给连接SQL Server的账号只赋予必要的DELETE权限,不要用sa或超级管理员账号
- 对输入参数做额外校验:比如
date格式是否符合预期(比如YYYYMM),vendor是否是合法的枚举值,进一步降低风险 - 安全扫描工具的误报处理:如果已经做了严格的白名单+QUOTENAME处理,确认没有注入风险后,可以给扫描工具的这个告警标记为误报
内容来源于stack exchange




