How to Implement a PostgreSQL Upsert with psycopg2.extras.execute_values
Fixing Your Efficient PostgreSQL Upsert with psycopg2's execute_values
Great call using `execute_values` for bulk operations; it's way more efficient than individual `execute` calls. Let's walk through fixing your syntax errors and optimizing the upsert logic:
First, the Core Syntax Issues
Your error comes from two main problems:
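- Misusing the `%s` placeholder with a `WITH` clause: `execute_values` replaces the single `%s` with a comma-separated list of row tuples, `(...), (...), ...`, so the query itself must supply the surrounding syntax (normally `VALUES %s`). Wrapping the placeholder in a `WITH` block the way you did breaks PostgreSQL's syntax.
- Incomplete `ON CONFLICT` syntax: you need to specify which unique constraint or columns trigger the conflict, and your `UPDATE` clause was malformed (for example, the `SET` keyword was missing). Note that the assignments after `SET` are separated by commas, not `AND`.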
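To make the first point concrete, here is a toy sketch of the substitution. It is not psycopg2's implementation: the real `execute_values` adapts every value safely through the cursor and also splits long lists into pages. The helper name `expand_values_placeholder` and its naive literal quoting are illustrative assumptions only, so never build real SQL this way.

```python
def expand_values_placeholder(query: str, rows: list) -> str:
    """Toy illustration (hypothetical helper): fill the single %s in `query`
    with comma-separated row tuples, roughly what execute_values sends.
    Naive quoting only; do not use this to build real SQL."""

    def literal(value) -> str:
        # Render a Python value as a naively quoted SQL literal
        if value is None:
            return "NULL"
        if isinstance(value, bool):
            return "TRUE" if value else "FALSE"
        if isinstance(value, str):
            return "'" + value.replace("'", "''") + "'"
        return str(value)

    row_sql = ", ".join(
        "(" + ", ".join(literal(v) for v in row) + ")" for row in rows
    )
    # Only the placeholder is replaced; the VALUES keyword must already be in the query
    return query.replace("%s", row_sql, 1)


sql = expand_values_placeholder(
    'INSERT INTO "LTSF"."Prices" ("Asset_ID", "Price_Type") VALUES %s',
    [(1, "Close"), (2, "Open")],
)
print(sql)
# INSERT INTO "LTSF"."Prices" ("Asset_ID", "Price_Type") VALUES (1, 'Close'), (2, 'Open')
```

Because the placeholder expands to bare tuples, the query has to contain the `VALUES` keyword itself, which is why the corrected query uses `VALUES %s`.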
Assumptions to Note
I'm assuming your `"LTSF"."Prices"` table has a unique constraint (or unique index) on `("Asset_ID", "Price_Date", "Price_Type")`. This is required for `ON CONFLICT` to work, because PostgreSQL needs to know what constitutes a duplicate row. If you don't have this constraint, add it first (the statement fails if the table already contains duplicate rows for those columns):
```sql
ALTER TABLE "LTSF"."Prices" ADD CONSTRAINT unique_asset_date_type UNIQUE ("Asset_ID", "Price_Date", "Price_Type");
```
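The same key also defines what counts as a duplicate inside your own batch: PostgreSQL rejects a single `INSERT ... ON CONFLICT DO UPDATE` that proposes two rows with the same key ("ON CONFLICT DO UPDATE command cannot affect row a second time"). A minimal sketch for catching that before sending, assuming the key is the first three fields of each tuple (the order used when building `prices_list`); `dedupe_by_key` is a hypothetical helper, and keeping the last occurrence is an arbitrary choice you may want to reverse.

```python
def dedupe_by_key(rows, key_len=3):
    """Hypothetical helper: keep the last row seen for each key (the first
    `key_len` fields of each tuple) and report which keys were repeated."""
    latest = {}
    repeated = set()
    for row in rows:
        key = tuple(row[:key_len])
        if key in latest:
            repeated.add(key)
        latest[key] = row  # a later row overwrites an earlier one with the same key
    # dicts keep first-insertion order, so output follows first appearance of each key
    return list(latest.values()), sorted(repeated)


rows = [
    (1, "2024-01-02", "Close", 10.0),
    (1, "2024-01-02", "Open", 9.5),
    (1, "2024-01-02", "Close", 10.5),  # same key as the first row
]
unique_rows, repeated_keys = dedupe_by_key(rows)
print(unique_rows)    # [(1, '2024-01-02', 'Close', 10.5), (1, '2024-01-02', 'Open', 9.5)]
print(repeated_keys)  # [(1, '2024-01-02', 'Close')]
```

In the loop you could call `prices_list, dupes = dedupe_by_key(prices_list)` right before `execute_values` and log `dupes` when it is non-empty.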
Corrected Code
Here's the fixed version of your upsert logic, plus some other small optimizations (like reusing the zip file handle instead of reopening it):
```python
import os
import zipfile
import datetime as dt
import logging
import xml.etree.ElementTree as ET

import psycopg2
from psycopg2.extras import execute_values

# Configure logger (you probably have this already)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

temp = "./temp"  # Define your temp path properly
# Ensure temp directory exists
os.makedirs(temp, exist_ok=True)

# Database connection (replace with your connection params)
conn = psycopg2.connect(
    dbname="your_db",
    user="your_user",
    password="your_pass",
    host="your_host",
)

for zip_file in os.listdir():
    if zipfile.is_zipfile(zip_file):
        logger.info("Processing %s", zip_file)
        # Reuse the zip file handle instead of reopening multiple times
        with zipfile.ZipFile(zip_file) as zf:
            for member in zf.namelist():
                now = dt.datetime.now()
                prices_list = []
                local_xml = os.path.join(temp, member)
                zf.extract(member, path=temp)
                try:
                    tree = ET.parse(local_xml)
                    root = tree.getroot()
                    ns = root.tag[:-4]
                    for finhist in root.findall(f"./{ns}FinancialHistory"):
                        asset_id = int(finhist.get("Id"))
                        logger.debug("Processing %s", asset_id)
                        for prices in finhist.findall(f"./{ns}Prices"):
                            price_currency = prices.get("Currency")
                            for priceset in prices.findall(f"./{ns}PriceSet"):
                                price_date = priceset.get("Date")
                                for price in priceset.findall(f"./{ns}Price"):
                                    price_value = float(price.text)
                                    price_type = price.get("Type")
                                    prices_list.append(
                                        (asset_id, price_date, price_type, price_value,
                                         price_currency, now, zip_file)
                                    )
                finally:
                    # Clean up temp file even if parsing fails
                    try:
                        os.remove(local_xml)
                    except Exception:
                        logger.error("File cannot be deleted", exc_info=True)

                # Bulk upsert the collected data
                if prices_list:  # Only run if we have data to insert/update
                    cur = conn.cursor()
                    try:
                        # Correct upsert query using execute_values
                        upsert_query = """
                            INSERT INTO "LTSF"."Prices" (
                                "Asset_ID", "Price_Date", "Price_Type", "Price_Value",
                                "Price_Currency", "Mod_Date", "Zip_File"
                            )
                            VALUES %s
                            ON CONFLICT ("Asset_ID", "Price_Date", "Price_Type")
                            DO UPDATE SET
                                "Price_Value" = EXCLUDED."Price_Value",
                                "Price_Currency" = EXCLUDED."Price_Currency",
                                "Mod_Date" = EXCLUDED."Mod_Date",
                                "Zip_File" = EXCLUDED."Zip_File"
                        """
                        # Execute the bulk upsert
                        execute_values(cur, upsert_query, prices_list)
                        conn.commit()
                        logger.info("Successfully upserted %d rows from %s",
                                    len(prices_list), zip_file)
                    except Exception:
                        logger.error("Problem upserting Prices", exc_info=True)
                        conn.rollback()  # Rollback on error
                    finally:
                        cur.close()

# Close the database connection
conn.close()
```
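To check the parsing half of the loop without a database, here is a sketch that runs the same nested loops on an in-memory XML string and returns tuples in the `INSERT` column order. The element and attribute names come from your code; the root element name `Root`, the namespace URI and the sample values are invented for illustration. The sketch also takes the namespace prefix from the `{...}` part of the root tag instead of `root.tag[:-4]`, which only yields the prefix when the root element's local name is exactly four characters long.

```python
import datetime as dt
import xml.etree.ElementTree as ET


def parse_prices(xml_text, zip_name, now):
    """Run the same nested loops as the corrected code on an XML string and
    return tuples in the column order of the INSERT statement."""
    root = ET.fromstring(xml_text)
    # "{uri}" prefix taken from the root tag; empty if there is no namespace
    ns = root.tag[: root.tag.index("}") + 1] if root.tag.startswith("{") else ""
    rows = []
    for finhist in root.findall(f"./{ns}FinancialHistory"):
        asset_id = int(finhist.get("Id"))
        for prices in finhist.findall(f"./{ns}Prices"):
            currency = prices.get("Currency")
            for priceset in prices.findall(f"./{ns}PriceSet"):
                price_date = priceset.get("Date")
                for price in priceset.findall(f"./{ns}Price"):
                    rows.append((asset_id, price_date, price.get("Type"),
                                 float(price.text), currency, now, zip_name))
    return rows


# Invented sample document: root name, namespace URI and values are assumptions
SAMPLE = """<Root xmlns="urn:example:prices">
  <FinancialHistory Id="42">
    <Prices Currency="USD">
      <PriceSet Date="2024-01-02">
        <Price Type="Close">101.5</Price>
        <Price Type="Open">100.0</Price>
      </PriceSet>
    </Prices>
  </FinancialHistory>
</Root>"""

now = dt.datetime(2024, 1, 3, 12, 0)
for row in parse_prices(SAMPLE, "prices.zip", now):
    print(row)
```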
Key Improvements Explained
- Removed the `WITH` clause: `execute_values` substitutes the row list directly where `%s` is placed, which fits the standard `INSERT ... VALUES %s ON CONFLICT ...` pattern.
- Used the `EXCLUDED` alias: inside `ON CONFLICT DO UPDATE`, PostgreSQL exposes the row that was proposed for insertion as the special `EXCLUDED` table, which is cleaner than trying to alias a CTE.
- Proper transaction handling: added `conn.rollback()` on error, because after a failed statement the connection stays in an aborted transaction and every later statement fails until you roll back. The zip file is also opened in a `with` statement so it is always closed.
- Skipped empty upserts: the query only runs if `prices_list` has data, avoiding unnecessary database calls.
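If you upsert into several tables, the `EXCLUDED` pattern can be generated from column lists instead of typed out by hand. A minimal sketch, assuming every non-key column should take the incoming value; `build_upsert_query` and `quote_ident` are hypothetical helpers, and psycopg2's own `psycopg2.sql` module (`sql.SQL`, `sql.Identifier`) is the more robust way to compose identifiers in real code.

```python
def quote_ident(name):
    """Double-quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_upsert_query(schema, table, columns, conflict_columns):
    """Hypothetical helper: build an INSERT ... VALUES %s ON CONFLICT query for
    execute_values, updating every non-key column from EXCLUDED."""
    cols = ", ".join(quote_ident(c) for c in columns)
    keys = ", ".join(quote_ident(c) for c in conflict_columns)
    update_cols = [c for c in columns if c not in conflict_columns]
    if update_cols:
        action = "DO UPDATE SET " + ", ".join(
            f"{quote_ident(c)} = EXCLUDED.{quote_ident(c)}" for c in update_cols
        )
    else:
        action = "DO NOTHING"  # nothing left to update besides the key itself
    return (
        f"INSERT INTO {quote_ident(schema)}.{quote_ident(table)} ({cols}) "
        f"VALUES %s ON CONFLICT ({keys}) {action}"
    )


query = build_upsert_query(
    "LTSF", "Prices",
    ["Asset_ID", "Price_Date", "Price_Type", "Price_Value",
     "Price_Currency", "Mod_Date", "Zip_File"],
    ["Asset_ID", "Price_Date", "Price_Type"],
)
print(query)
```

The key columns are left out of the `SET` list on purpose: they are what matched the conflicting row, so there is nothing to change.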
Why This Is Efficient
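- `execute_values` minimizes round-trips by packing many rows into each multi-row `INSERT`: it sends one statement per `page_size` rows (100 by default) instead of one per row, and you can raise `page_size` to send fewer, larger statements.
- The `ON CONFLICT DO UPDATE` is resolved entirely on the database side, so you don't have to split the logic into separate `SELECT` and `INSERT`/`UPDATE` calls.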
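The paging behind `execute_values` is easy to picture with a small sketch: N rows become ceil(N / page_size) statements, with `page_size` defaulting to 100. The `pages` generator below is a hypothetical stand-in written for illustration, not psycopg2's internal code; in your script the real knob is the keyword argument, e.g. `execute_values(cur, upsert_query, prices_list, page_size=1000)`.

```python
from itertools import islice


def pages(rows, page_size=100):
    """Yield consecutive lists of at most `page_size` rows, mirroring how
    execute_values sends one INSERT statement per page of arguments."""
    it = iter(rows)
    while True:
        page = list(islice(it, page_size))
        if not page:
            return
        yield page


rows = [(i, "2024-01-02", "Close", 1.0) for i in range(250)]
print([len(p) for p in pages(rows)])                # [100, 100, 50]
print(len(list(pages(rows, page_size=len(rows)))))  # 1
```

Larger pages mean fewer round-trips but bigger statements for the server to parse, so a page size in the hundreds or low thousands is a common middle ground.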
The question was originally posted on Stack Exchange by Lloyd Jackman.




