You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

在Django应用中使用MySQL的AES_ENCRYPT与AES_DECRYPT加密解密问题

解决Django中Python AES与MySQL AES_ENCRYPT/DECRYPT结果不一致的问题

我之前也踩过这个坑,核心原因是MySQL的AES实现和标准AES库(比如PyCryptodome)的默认行为有差异,主要体现在密钥处理、编码方式和填充规则上。下面是几个可行的解决方案,从手动对齐逻辑到自定义模型字段,让你不用再依赖原生SQL查询:

1. 手动对齐Python AES与MySQL的行为

MySQL的AES_ENCRYPT/DECRYPT有几个关键特性:

  • 默认使用ECB模式(无IV)
  • 密钥会被自动填充到16/24/32字节(用空格填充,优先匹配最长的支持长度:256位→32字节,其次192→24,128→16)
  • 明文和密钥都用数据库连接的默认编码(通常是Latin1,也可能是UTF-8)转换为字节
  • 使用PKCS#7填充规则补全明文到块大小(16字节)

你可以用PyCryptodome模拟这个逻辑,代码如下:

from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
import os

# 建议从环境变量读取密钥,不要硬编码
AES_KEY = os.getenv("MYSQL_AES_KEY", "16-bytes encryption key")

def mysql_aes_encrypt(plaintext: str, key: str) -> bytes:
    # 对齐MySQL的编码(如果你的数据库用UTF-8,就改成utf-8)
    plaintext_bytes = plaintext.encode("latin1")
    key_bytes = key.encode("latin1")
    
    # 按MySQL规则填充/截断密钥
    key_len = len(key_bytes)
    if key_len <= 16:
        padded_key = key_bytes.ljust(16, b" ")
    elif key_len <= 24:
        padded_key = key_bytes.ljust(24, b" ")
    else:
        padded_key = key_bytes[:32]  # 超过32字节就截断
    
    # ECB模式加密,PKCS#7填充
    cipher = AES.new(padded_key, AES.MODE_ECB)
    padded_plaintext = pad(plaintext_bytes, AES.block_size, style="pkcs7")
    return cipher.encrypt(padded_plaintext)

def mysql_aes_decrypt(ciphertext: bytes, key: str) -> str:
    key_bytes = key.encode("latin1")
    key_len = len(key_bytes)
    if key_len <= 16:
        padded_key = key_bytes.ljust(16, b" ")
    elif key_len <= 24:
        padded_key = key_bytes.ljust(24, b" ")
    else:
        padded_key = key_bytes[:32]
    
    cipher = AES.new(padded_key, AES.MODE_ECB)
    decrypted_bytes = cipher.decrypt(ciphertext)
    # 去除PKCS#7填充并解码
    unpadded_bytes = unpad(decrypted_bytes, AES.block_size, style="pkcs7")
    return unpadded_bytes.decode("latin1")

用这个方法加密的结果,和MySQL的AES_ENCRYPT(my_field, 'key')完全一致,解密同理。

2. 自定义Django模型字段(推荐)

如果不想每次手动调用加密解密函数,可以封装成自定义模型字段,让ORM自动处理加密和解密,用法和普通字段完全一样:

from django.db import models
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
import os

AES_KEY = os.getenv("MYSQL_AES_KEY", "16-bytes encryption key")

class MySQLAESField(models.BinaryField):
    def from_db_value(self, value, expression, connection):
        """从数据库读取时自动解密"""
        if value is None:
            return value
        
        # 处理密钥(和MySQL对齐)
        key_bytes = AES_KEY.encode("latin1")
        key_len = len(key_bytes)
        if key_len <= 16:
            padded_key = key_bytes.ljust(16, b" ")
        elif key_len <= 24:
            padded_key = key_bytes.ljust(24, b" ")
        else:
            padded_key = key_bytes[:32]
        
        cipher = AES.new(padded_key, AES.MODE_ECB)
        decrypted = unpad(cipher.decrypt(value), AES.block_size, style="pkcs7")
        return decrypted.decode("latin1")
    
    def to_python(self, value):
        """将数据库的二进制值转换为Python字符串"""
        if isinstance(value, str):
            return value
        if value is None:
            return value
        
        # 同from_db_value的解密逻辑
        key_bytes = AES_KEY.encode("latin1")
        key_len = len(key_bytes)
        if key_len <= 16:
            padded_key = key_bytes.ljust(16, b" ")
        elif key_len <= 24:
            padded_key = key_bytes.ljust(24, b" ")
        else:
            padded_key = key_bytes[:32]
        
        cipher = AES.new(padded_key, AES.MODE_ECB)
        decrypted = unpad(cipher.decrypt(value), AES.block_size, style="pkcs7")
        return decrypted.decode("latin1")
    
    def get_prep_value(self, value):
        """保存到数据库时自动加密"""
        if value is None:
            return value
        
        # 处理密钥
        key_bytes = AES_KEY.encode("latin1")
        key_len = len(key_bytes)
        if key_len <= 16:
            padded_key = key_bytes.ljust(16, b" ")
        elif key_len <= 24:
            padded_key = key_bytes.ljust(24, b" ")
        else:
            padded_key = key_bytes[:32]
        
        cipher = AES.new(padded_key, AES.MODE_ECB)
        plaintext_bytes = value.encode("latin1")
        padded_plaintext = pad(plaintext_bytes, AES.block_size, style="pkcs7")
        return cipher.encrypt(padded_plaintext)

然后在你的模型中使用这个字段:

class MyModel(models.Model):
    my_field = MySQLAESField()
    # 其他字段...

现在你可以像使用普通字段一样操作:

# 保存加密数据
obj = MyModel(my_field="my secret data")
obj.save()

# 查询自动解密
obj = MyModel.objects.get(id=1)
print(obj.my_field)  # 输出 "my secret data"

# 过滤查询(自动加密匹配值)
results = MyModel.objects.filter(my_field="my secret data")

3. 重要注意事项

  • 密钥安全:绝对不要把密钥硬编码到代码里,用环境变量或者专业的密钥管理工具(如HashiCorp Vault)存储。
  • ECB模式风险:ECB模式是不安全的,因为相同的明文块会生成相同的密文块,容易被破解。如果对安全性要求高,建议放弃MySQL的AES函数,完全在Python侧使用更安全的模式(如CBC或GCM),同时存储密文和IV到数据库(IV需要随机生成,不能重复)。
  • 编码适配:如果你的数据库用UTF-8编码,记得把代码中的latin1改成utf-8,避免乱码。

内容的提问来源于stack exchange,提问作者Ahmedn1

火山引擎 最新活动