How to Implement a PostgreSQL Upsert with psycopg2.extras.execute_values

Fixing Your Efficient PostgreSQL Upsert with psycopg2's execute_values

Good call using execute_values for bulk operations: it is much more efficient than issuing one execute() call per row. Let's walk through the syntax errors first and then tighten up the upsert logic:

First, the Core Syntax Issues

Your error comes from two main problems:

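  1. Misplacing the %s placeholder inside a WITH clause. execute_values replaces the single %s with a comma-separated list of row tuples, such as (1, 'a'), (2, 'b'), so the placeholder has to sit where PostgreSQL expects that list, normally right after VALUES in the INSERT. Wrapping it in a WITH block the way your query did produces invalid SQL.
  2. Incomplete ON CONFLICT syntax. DO UPDATE needs a conflict target naming the unique constraint or columns that define a duplicate, and the update itself must be written as DO UPDATE SET col = value, col = value, with the SET keyword present and the assignments separated by commas (not AND).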
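To see why the placement matters, here is a purely illustrative sketch of the substitution. expand_values_placeholder is a hypothetical helper, not part of psycopg2, and it only renders numbers; psycopg2 itself quotes every value safely and, by default, splits large row lists across several statements.

```python
from typing import Sequence


def expand_values_placeholder(query: str, rows: Sequence[Sequence[float]]) -> str:
    """Illustrative only: show the shape of SQL that execute_values builds.

    Renders int/float values only; real code must let psycopg2 quote values.
    """
    # execute_values also insists on exactly one %s placeholder
    if query.count("%s") != 1:
        raise ValueError("query must contain exactly one %s placeholder")

    def literal(value) -> str:
        # Reject anything that would need real quoting or escaping
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise TypeError("this sketch only renders int/float values")
        return repr(value)

    # One "(v1,v2,...)" group per row, groups separated by commas
    row_list = ",".join(
        "(" + ",".join(literal(v) for v in row) + ")" for row in rows
    )
    return query.replace("%s", row_list)


print(expand_values_placeholder("INSERT INTO t (a, b) VALUES %s", [(1, 2.5), (3, 4.0)]))
# INSERT INTO t (a, b) VALUES (1,2.5),(3,4.0)
```

The expanded text is only valid SQL when %s sits in a position where a list of row tuples is grammatical, which is why "VALUES %s" inside a plain INSERT is the standard pattern.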

Assumptions to Note

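I'm assuming your "LTSF"."Prices" table has a unique constraint on ("Asset_ID", "Price_Date", "Price_Type"). ON CONFLICT needs a unique index or constraint covering exactly the conflict-target columns; that is how PostgreSQL knows what counts as a duplicate row, and without one the statement fails with "there is no unique or exclusion constraint matching the ON CONFLICT specification". If you don't have this constraint, add it first: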

ALTER TABLE "LTSF"."Prices"
ADD CONSTRAINT unique_asset_date_type UNIQUE ("Asset_ID", "Price_Date", "Price_Type");
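The same key also matters inside a single batch: if one INSERT ... ON CONFLICT DO UPDATE statement proposes two rows with the same ("Asset_ID", "Price_Date", "Price_Type"), PostgreSQL rejects it with "ON CONFLICT DO UPDATE command cannot affect row a second time". Because prices_list gathers rows from several XML files, duplicates are possible. A minimal sketch that keeps the last row per key; dedupe_by_key is a hypothetical helper, and "last one wins" is an assumption you may want to replace with your own rule.

```python
from typing import Any, Iterable, List, Tuple

Row = Tuple[Any, ...]


def dedupe_by_key(rows: Iterable[Row], key_len: int = 3) -> List[Row]:
    """Keep one row per conflict key; a later row replaces an earlier one.

    Assumes the first key_len fields are the unique-constraint columns,
    i.e. (asset_id, price_date, price_type) as built in prices_list.
    """
    latest = {}
    for row in rows:
        # A dict key keeps its first-seen position; only the value is overwritten
        latest[row[:key_len]] = row
    return list(latest.values())
```

In the loop, calling prices_list = dedupe_by_key(prices_list) just before execute_values would guarantee each key appears at most once per statement.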

Corrected Code

Here's the fixed version of your upsert logic, plus some other small optimizations (like reusing the zip file handle instead of reopening it):

import os
import zipfile
import datetime as dt
import logging
import xml.etree.ElementTree as ET
import psycopg2
from psycopg2.extras import execute_values

# Configure logger (you probably have this already)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
temp = "./temp"  # Define your temp path properly

# Ensure temp directory exists
os.makedirs(temp, exist_ok=True)

# Database connection (replace with your connection params)
conn = psycopg2.connect(
    dbname="your_db", user="your_user", password="your_pass", host="your_host"
)

for zip_file in os.listdir():
    if zipfile.is_zipfile(zip_file):
        logger.info("Processing %s", zip_file)
        # Collect rows from every XML member in this zip, then upsert once
        prices_list = []
        # Reuse the zip file handle instead of reopening multiple times
        with zipfile.ZipFile(zip_file) as zf:
            for member in zf.namelist():
                now = dt.datetime.now()
                # extract() returns the path it actually wrote to
                local_xml = zf.extract(member, path=temp)
                
                try:
                    tree = ET.parse(local_xml)
                    root = tree.getroot()
                    ns = root.tag[:-4]
                    
                    for finhist in root.findall(f"./{ns}FinancialHistory"):
                        asset_id = int(finhist.get("Id"))
                        logger.debug("Processing %s", asset_id)
                        
                        for prices in finhist.findall(f"./{ns}Prices"):
                            price_currency = prices.get("Currency")
                            
                            for priceset in prices.findall(f"./{ns}PriceSet"):
                                price_date = priceset.get("Date")
                                
                                for price in priceset.findall(f"./{ns}Price"):
                                    price_value = float(price.text)
                                    price_type = price.get("Type")
                                    prices_list.append(
                                        (asset_id, price_date, price_type, price_value, price_currency, now, zip_file)
                                    )
                finally:
                    # Clean up temp file even if parsing fails
                    try:
                        os.remove(local_xml)
                    except Exception:
                        logger.error("File cannot be deleted", exc_info=True)
        
        # Bulk upsert the collected data
        if prices_list:  # Only run if we have data to insert/update
            cur = conn.cursor()
            try:
                # Correct upsert query using execute_values
                upsert_query = """
                    INSERT INTO "LTSF"."Prices" (
                        "Asset_ID", "Price_Date", "Price_Type", 
                        "Price_Value", "Price_Currency", "Mod_Date", "Zip_File"
                    )
                    VALUES %s
                    ON CONFLICT ("Asset_ID", "Price_Date", "Price_Type")
                    DO UPDATE SET
                        "Price_Value" = EXCLUDED."Price_Value",
                        "Price_Currency" = EXCLUDED."Price_Currency",
                        "Mod_Date" = EXCLUDED."Mod_Date",
                        "Zip_File" = EXCLUDED."Zip_File"
                """
                # Execute the bulk upsert
                execute_values(cur, upsert_query, prices_list)
                conn.commit()
                logger.info("Successfully upserted %d rows from %s", len(prices_list), zip_file)
            except Exception:
                logger.error("Problem upserting Prices", exc_info=True)
                conn.rollback()  # Rollback on error
            finally:
                cur.close()

# Close the database connection
conn.close()
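If the extracted copies are not needed for anything else, the temp directory can be skipped entirely: zipfile.ZipFile.open returns a file-like object that ET.parse reads directly. A sketch under that assumption; rows_from_zip is a hypothetical name, the element names come from the code above, the root element name in the demo XML is made up, and the namespace prefix is taken from the root tag's "{...}" part instead of root.tag[:-4], which only works when the root element's local name is four characters long.

```python
import io
import zipfile
import xml.etree.ElementTree as ET


def rows_from_zip(zip_source):
    """Yield (asset_id, price_date, price_type, price_value, currency) tuples
    from every XML member, parsing straight from the archive (no temp files)."""
    with zipfile.ZipFile(zip_source) as zf:
        for member in zf.namelist():
            if member.endswith("/"):  # skip directory entries
                continue
            with zf.open(member) as fh:
                root = ET.parse(fh).getroot()
            # "{namespace}" prefix, or "" if the document has no namespace
            ns = root.tag[: root.tag.index("}") + 1] if root.tag.startswith("{") else ""
            for finhist in root.findall(f"./{ns}FinancialHistory"):
                asset_id = int(finhist.get("Id"))
                for prices in finhist.findall(f"./{ns}Prices"):
                    currency = prices.get("Currency")
                    for priceset in prices.findall(f"./{ns}PriceSet"):
                        price_date = priceset.get("Date")
                        for price in priceset.findall(f"./{ns}Price"):
                            yield (asset_id, price_date, price.get("Type"),
                                   float(price.text), currency)


# Demo with an in-memory zip holding one hypothetical XML file
demo = io.BytesIO()
with zipfile.ZipFile(demo, "w") as out:
    out.writestr(
        "a.xml",
        '<Root xmlns="urn:x"><FinancialHistory Id="7"><Prices Currency="USD">'
        '<PriceSet Date="2020-01-02"><Price Type="Close">1.5</Price></PriceSet>'
        "</Prices></FinancialHistory></Root>",
    )
print(list(rows_from_zip(demo)))  # [(7, '2020-01-02', 'Close', 1.5, 'USD')]
```

The caller would still append now and zip_file to each tuple to match the column order of the upsert query.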

Key Improvements Explained

  1. Removed the WITH Clause: execute_values directly injects the VALUES list where %s is placed, which fits perfectly with the standard INSERT ... ON CONFLICT pattern.
  2. Used the EXCLUDED Alias: inside ON CONFLICT DO UPDATE, PostgreSQL exposes the row that was proposed for insertion as the special table EXCLUDED, which is cleaner than trying to alias a CTE.
  3. Proper Transaction Handling: conn.rollback() on error ends the failed transaction, so later statements on the same connection are not rejected with "current transaction is aborted, commands ignored until end of transaction block"; the zip file is opened in a with statement so it is always closed.
  4. Skipped Empty Upserts: Only runs the query if prices_list has data to avoid unnecessary database calls.
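The comma-separated SET list and the EXCLUDED references follow a fixed pattern, so they can be generated from the column names rather than typed by hand. A minimal sketch; build_upsert_sql and quote_ident are hypothetical helpers written for illustration, and in production code psycopg2.sql.Identifier is the library's own tool for quoting identifiers. When every column is part of the key, it falls back to DO NOTHING, because DO UPDATE SET with an empty list is not valid SQL.

```python
from typing import Sequence


def quote_ident(name: str) -> str:
    # PostgreSQL identifier quoting: wrap in double quotes, double any inner quote
    return '"' + name.replace('"', '""') + '"'


def build_upsert_sql(schema: str, table: str,
                     columns: Sequence[str], conflict_cols: Sequence[str]) -> str:
    """Build an INSERT ... VALUES %s ON CONFLICT statement for execute_values."""
    target = f"{quote_ident(schema)}.{quote_ident(table)}"
    col_list = ", ".join(quote_ident(c) for c in columns)
    key_list = ", ".join(quote_ident(c) for c in conflict_cols)
    # Every non-key column is refreshed from the proposed row (EXCLUDED)
    assignments = ",\n    ".join(
        f"{quote_ident(c)} = EXCLUDED.{quote_ident(c)}"
        for c in columns if c not in conflict_cols
    )
    action = f"DO UPDATE SET\n    {assignments}" if assignments else "DO NOTHING"
    return (
        f"INSERT INTO {target} ({col_list})\n"
        f"VALUES %s\n"
        f"ON CONFLICT ({key_list})\n"
        f"{action}"
    )


columns = ["Asset_ID", "Price_Date", "Price_Type", "Price_Value",
           "Price_Currency", "Mod_Date", "Zip_File"]
print(build_upsert_sql("LTSF", "Prices", columns, columns[:3]))
```

The result can be passed to execute_values(cur, query, prices_list) in place of the hand-written upsert_query.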

Why This Is Efficient

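  • execute_values cuts round trips by packing many rows into each statement: it groups the rows into pages of page_size rows (default 100) and runs one INSERT per page, so raising page_size reduces the number of statements further.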
  • The ON CONFLICT DO UPDATE is handled entirely on the database side, so you don't have to split logic into separate SELECT and INSERT/UPDATE calls.
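To make the round-trip count concrete: execute_values' page_size parameter (default 100) decides how many rows go into each generated statement. The hypothetical pages helper below mimics that grouping so the statement count is easy to reason about; it is a sketch, not psycopg2's internal code.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def pages(rows: Iterable[T], page_size: int = 100) -> Iterator[List[T]]:
    """Yield consecutive chunks of at most page_size rows (one per statement)."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, page_size))
        if not chunk:
            return
        yield chunk


# 250 rows with the default page_size -> 3 INSERT statements
print([len(p) for p in pages(range(250))])  # [100, 100, 50]
```

For a batch like prices_list, passing a larger value, e.g. execute_values(cur, upsert_query, prices_list, page_size=1000), trades fewer round trips for larger individual statements.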

The question was sourced from Stack Exchange; question author: Lloyd Jackman.
