You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python从DDL文本提取元数据存入JSON失败求助

解决DDL提取数据到JSON的问题:Schema校验与COMPRESS列处理

我来帮你搞定这个DDL数据提取的问题,先梳理下原代码的核心问题,再给出修复后的完整方案:

问题根源分析

你的代码现在没法把数据写入JSON,还存在Schema校验缺失和COMPRESS列处理不了的问题,主要原因有这几点:

  1. 提取函数只打印不返回数据:三个提取函数只是把结果print出来,没有return对应的值,导致后续收集的都是None
  2. 用集合存储数据是错误的{ table_name(i), Schema_name(i), Columns(i)}创建的是无序集合,JSON没法序列化这种无意义的结构,应该用字典来存键值对
  3. 没做Schema存在性校验:如果DDL里的表名没有Schema前缀(比如CREATE TABLE mytable而不是CREATE TABLE myschema.mytable),原代码会直接报索引错误,而且你要求必须有Schema,所以得跳过这类无效DDL
  4. 列提取正则不兼容COMPRESS:原代码的正则逻辑没法匹配带COMPRESS修饰符的列(比如user_id INT COMPRESS),导致这类列提取失败
  5. 冗余的文件关闭操作out_file.close()是多余的,with语句会自动帮你关闭文件

修复后的完整代码

import re
import json

def extract_table_and_schema(ddl):
    # 精准匹配带Schema前缀的表名格式:schema.table
    table_match = re.search(r'CREATE TABLE\s+([a-zA-Z0-9_]+\.[a-zA-Z0-9_]+)', ddl, re.IGNORECASE | re.MULTILINE)
    if not table_match:
        return None, None  # 没有Schema的DDL直接返回空,后续跳过处理
    full_table = table_match.group(1)
    schema_name, table_name = full_table.split('.')
    # 去除可能的空格
    return schema_name.strip(), table_name.strip()

def extract_columns(ddl):
    # 先移除FOREIGN KEY、PARTITION、PRIMARY KEY这些无关的约束行
    cleaned_ddl = re.sub(r'.*(FOREIGN KEY|PARTITION|PRIMARY KEY).*\n?', '', ddl, flags=re.IGNORECASE)
    # 匹配括号内的所有列定义内容
    columns_section = re.search(r'\((.*?)\)', cleaned_ddl, re.DOTALL | re.IGNORECASE)
    if not columns_section:
        return []
    
    columns_str = columns_section.group(1)
    columns = []
    # 分割每个列定义,处理带COMPRESS等修饰符的情况
    for col_def in re.split(r',\s*', columns_str.strip()):
        # 提取列名:取每个定义里的第一个标识符(忽略后面的类型和修饰词)
        col_name_match = re.match(r'^([a-zA-Z0-9_]+)', col_def.strip())
        if col_name_match:
            columns.append(col_name_match.group(1))
    return columns

def main():
    ddl_records = []
    # 读取DDL文件
    with open("DDL_Teradata.txt", "r") as f:
        all_ddl = f.read()
        # 按分号分割DDL语句,过滤掉空的无效语句
        ddl_statements = [stmt.strip() for stmt in all_ddl.split(';') if stmt.strip()]
        
        for stmt in ddl_statements:
            schema, table = extract_table_and_schema(stmt)
            # 严格校验Schema必须存在,否则跳过该DDL
            if not schema or not table:
                print(f"跳过无Schema的无效DDL:{stmt[:50]}...")
                continue
            cols = extract_columns(stmt)
            # 用字典存储每条记录的结构化数据
            record = {
                "schema_name": schema,
                "table_name": table,
                "column_names": cols
            }
            ddl_records.append(record)
    
    # 写入JSON文件,自动处理文件关闭
    with open('ddl.json', 'w') as fp:
        json.dump(ddl_records, fp, indent=4)
    print("处理完成!数据已成功写入ddl.json")

if __name__ == "__main__":
    main()

关键修复细节

  • 合并Schema与表名提取:把原来两个函数合并成一个,统一处理带Schema的表名匹配,直接返回结果,同时自动过滤掉没有Schema的DDL
  • 兼容COMPRESS列:调整列提取逻辑,不管列后面有没有COMPRESS或者其他修饰符,只提取列名部分,确保所有列都能被捕获
  • 用字典存储结构化数据:改用键值对的字典,保证JSON文件里的结构清晰,符合你的需求
  • 函数返回有效数据:所有提取函数都返回对应的数据,不再只是打印,这样才能把数据收集起来写入JSON
  • 处理无效DDL语句:分割DDL后过滤空字符串,避免处理分号结尾的空内容
  • 规范文件操作:全程用with语句管理文件,无需手动关闭,避免资源泄漏

内容的提问来源于stack exchange,提问作者akash

火山引擎 最新活动