SQL注入攻击通过构造恶意SQL语句绕过业务系统的参数校验,直接操作数据库获取敏感数据或破坏数据完整性。使用数据库代理层在请求到达数据库之前进行过滤,是实时拦截SQL注入的有效方案,不需要修改业务代码即可实现全局安全防护。

数据库代理层拦截原理
数据库代理层位于业务应用和数据库之间,所有发往数据库的SQL请求都会先经过代理层处理。代理层会先对请求内容进行解析和检测,只有符合安全规则的请求才会被转发到数据库,存在注入风险的请求会被直接拦截并返回错误提示。
核心处理流程
- 接收业务应用发送的SQL请求及连接参数
- 解析SQL语句结构,提取关键字、参数值、操作类型等信息
- 匹配预设的安全规则库,检测是否存在注入特征
- 安全请求转发至数据库,返回结果给业务应用
- 危险请求直接拦截,记录攻击日志并向上层返回拦截提示
拦截规则设计
危险关键字匹配
SQL注入通常会包含union、select、drop、update等危险关键字,且会搭配'、--、;等特殊字符。我们可以维护一个危险关键字列表,对SQL语句中的关键字和特殊字符进行匹配检测。
参数合法性校验
业务请求的参数通常都有固定的格式,比如用户ID是纯数字、用户名只包含字母和数字。代理层可以对参数值进行格式校验,不符合预期格式的参数直接判定为异常请求。
SQL语法结构检测
正常的业务SQL语句结构相对固定,注入语句往往会出现语法结构异常,比如where条件后拼接额外的查询逻辑、出现嵌套的select语句等。可以通过SQL语法解析器判断语句结构是否符合正常业务特征。
实现示例
以下是一个简单的数据库代理层过滤实现示例,使用Python开发,核心逻辑包含请求接收、规则检测、请求转发三个部分:
import re
import socket
import pymysql
# 危险关键字列表
DANGER_KEYWORDS = ['union', 'select', 'drop', 'update', 'delete', 'insert', '--', ';', "'"]
# 代理层监听配置
PROXY_HOST = '0.0.0.0'
PROXY_PORT = 3307
# 真实数据库配置
DB_HOST = '127.0.0.1'
DB_PORT = 3306
DB_USER = 'root'
DB_PASS = '123456'
DB_NAME = 'test_db'
def check_sql_injection(sql):
"""检测SQL是否存在注入风险"""
sql_lower = sql.lower()
# 匹配危险关键字
for keyword in DANGER_KEYWORDS:
if keyword in sql_lower:
return True
# 检测异常参数格式,比如数字参数包含非数字字符
num_param_pattern = r'ids*=s*D+'
if re.search(num_param_pattern, sql_lower):
return True
return False
def handle_client(client_socket):
"""处理客户端请求"""
try:
# 接收客户端发送的SQL请求
sql_request = client_socket.recv(4096).decode('utf-8')
print(f"收到请求: {sql_request}")
# 检测是否存在注入
if check_sql_injection(sql_request):
response = "SQL请求存在安全风险,已被拦截"
client_socket.send(response.encode('utf-8'))
print("拦截危险请求")
return
# 转发请求到真实数据库
db_conn = pymysql.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASS,
database=DB_NAME
)
cursor = db_conn.cursor()
cursor.execute(sql_request)
result = cursor.fetchall()
cursor.close()
db_conn.close()
# 返回结果给客户端
client_socket.send(str(result).encode('utf-8'))
except Exception as e:
error_msg = f"处理请求失败: {str(e)}"
client_socket.send(error_msg.encode('utf-8'))
finally:
client_socket.close()
def start_proxy():
"""启动代理层服务"""
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((PROXY_HOST, PROXY_PORT))
server_socket.listen(5)
print(f"数据库代理层启动,监听端口: {PROXY_PORT}")
while True:
client_socket, addr = server_socket.accept()
print(f"收到来自 {addr} 的连接")
handle_client(client_socket)
if __name__ == '__main__':
start_proxy()
优化与注意事项
实际落地时需要注意规则库的动态更新,定期补充新的注入特征,避免被新型攻击绕过。同时要控制检测的性能开销,避免代理层成为系统的性能瓶颈。对于误拦截的正常请求,要及时调整规则,平衡安全性和业务可用性。另外建议开启攻击日志记录,方便后续安全审计和攻击溯源。