How do I pull data from the OpenFIGI API and store it in AWS/Snowflake? (Plus a question about input forms)
Hey PJ, let's walk through a practical, quick-start solution for pulling data from OpenFIGI and storing it in AWS or Snowflake, with all the pieces you need for input forms and cross-table joins. Here's how to break it down:
First, you need a way to submit the structured requests OpenFIGI's mapping endpoint expects (a JSON array of job objects with fields like idType, idValue, exchCode). Here are two low-friction options:
Option 1: No-Code/Low-Code Quick Win (Google Sheets + Apps Script)
Use a spreadsheet to batch input your request parameters, then a simple script to convert rows to OpenFIGI's format and fire off the API call:
- Create a Google Sheet with headers: idType, idValue, exchCode (add other optional fields like micCode if needed)
- Paste this script into the Apps Script editor:

```javascript
function fetchOpenFIGIData() {
  const sheet = SpreadsheetApp.getActiveSpreadsheet().getActiveSheet();
  const [headers, ...rows] = sheet.getDataRange().getValues();

  // Convert rows to mapping jobs, skipping empty cells
  const figiRequests = rows.map(row => {
    const req = {};
    headers.forEach((col, idx) => {
      if (row[idx] !== '') req[col] = String(row[idx]);
    });
    return req;
  });

  // Call the OpenFIGI mapping endpoint, which accepts an array of jobs
  const apiResponse = UrlFetchApp.fetch('https://api.openfigi.com/v3/mapping', {
    method: 'post',
    contentType: 'application/json',
    payload: JSON.stringify(figiRequests),
    muteHttpExceptions: true
  });
  if (apiResponse.getResponseCode() !== 200) {
    throw new Error('OpenFIGI returned HTTP ' + apiResponse.getResponseCode());
  }

  // The response has one entry per job, in order; matches sit under "data"
  const figiData = JSON.parse(apiResponse.getContentText())
    .flatMap((res, i) => (res.data || []).map(item => ({ ...figiRequests[i], ...item })));
  if (figiData.length === 0) return;

  // Write a header row plus one row per match to a new sheet
  const columns = Object.keys(figiData[0]);
  const table = [columns, ...figiData.map(item => columns.map(c => item[c] ?? ''))];
  const resultsSheet = sheet.getParent().insertSheet('FIGI Results');
  resultsSheet.getRange(1, 1, table.length, columns.length).setValues(table);
}
```

- Run the script to fetch data. You can even add a custom button to the sheet for one-click execution.
Option 2: Python/Streamlit Form (For More Control)
Build a simple web form to upload CSVs or input parameters manually, then handle the API call:
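```python
import pandas as pd
import requests
import streamlit as st

st.title("OpenFIGI Bulk Search Tool")

# Upload a CSV or enter a single job by hand
uploaded_file = st.file_uploader("Upload CSV (columns: idType, idValue, exchCode)", type="csv")
if uploaded_file is None:
    id_type = st.selectbox("ID Type", ["TICKER", "ID_ISIN", "ID_BB_UNIQUE"])
    id_value = st.text_input("ID Value")
    exch_code = st.text_input("Exchange Code (e.g., US)")
    figi_requests = [{"idType": id_type, "idValue": id_value, "exchCode": exch_code}]
else:
    # Read everything as text so identifiers keep their leading zeros
    df = pd.read_csv(uploaded_file, dtype=str)
    figi_requests = df.to_dict("records")

# Drop empty fields; blank CSV cells arrive as NaN, which is not valid JSON
figi_requests = [
    {k: v for k, v in job.items() if pd.notna(v) and v != ""}
    for job in figi_requests
]

if st.button("Fetch FIGI Data"):
    # The mapping endpoint accepts an array of jobs
    response = requests.post("https://api.openfigi.com/v3/mapping", json=figi_requests, timeout=30)
    response.raise_for_status()
    # One result per job, in order; matches sit under "data"
    rows = [
        {**job, **match}
        for job, result in zip(figi_requests, response.json())
        for match in result.get("data", [])
    ]
    figi_results = pd.DataFrame(rows)
    st.dataframe(figi_results)
    # Add logic here to send results to AWS/Snowflake (covered below)
```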
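A CSV upload can easily produce more jobs than a single request is allowed to carry, so it may help to clean and split the job list before posting it. Here's a minimal sketch, assuming a cap of 10 jobs per request (treat that number as an assumption and confirm it against the current OpenFIGI documentation for your plan). `clean_jobs` and `chunk_jobs` are hypothetical helper names, not part of any library.

```python
from typing import Dict, Iterator, List

Job = Dict[str, str]

# Assumed cap on jobs per request; adjust to the documented limit for your plan.
MAX_JOBS_PER_REQUEST = 10


def clean_jobs(rows: List[dict]) -> List[Job]:
    """Drop empty or missing fields and skip rows that lack idType or idValue."""
    jobs = []
    for row in rows:
        job = {}
        for key, value in row.items():
            # NaN is the only value not equal to itself, so this also
            # drops blank CSV cells that pandas reads as NaN.
            if value is None or value != value:
                continue
            text = str(value).strip()
            if text:
                job[key] = text
        if "idType" in job and "idValue" in job:
            jobs.append(job)
    return jobs


def chunk_jobs(jobs: List[Job], size: int = MAX_JOBS_PER_REQUEST) -> Iterator[List[Job]]:
    """Yield consecutive slices of at most `size` jobs."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(jobs), size):
        yield jobs[start:start + size]


if __name__ == "__main__":
    rows = [{"idType": "TICKER", "idValue": "IBM", "exchCode": "US"}] * 23
    for batch in chunk_jobs(clean_jobs(rows)):
        print(len(batch))
```

Each batch can then be posted as its own request, with the results concatenated afterwards.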
Storing the Results in AWS
Step 1: Temporary Storage (S3)
Start with S3 for low-cost, scalable staging:
- Create an S3 bucket (e.g., my-openfigi-data)
- Modify your input layer script to save JSON results to S3 (using boto3 for Python):

```python
import boto3
import pandas as pd

s3 = boto3.client("s3")
# Write one JSON object per line, the layout Athena's JSON SerDe expects
s3.put_object(
    Bucket="my-openfigi-data",
    Key=f"results/{pd.Timestamp.now().strftime('%Y-%m-%d_%H-%M')}_batch.json",
    Body=figi_results.to_json(orient="records", lines=True).encode("utf-8"),
)
```
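If you'd rather not depend on pandas for the upload step, the serialization and key naming can be handled with the standard library alone. This is a sketch under assumptions: `to_json_lines` and `batch_key` are hypothetical helpers, the `results/` prefix simply mirrors the key used in this step, and the FIGI in the sample is a placeholder rather than a real identifier.

```python
import json
from datetime import datetime, timezone
from typing import Iterable, Optional


def to_json_lines(records: Iterable[dict]) -> bytes:
    """Serialize records as newline-delimited JSON, one object per line."""
    lines = [json.dumps(record, sort_keys=True) for record in records]
    return ("\n".join(lines) + "\n").encode("utf-8") if lines else b""


def batch_key(now: Optional[datetime] = None, prefix: str = "results") -> str:
    """Build a timestamped object key under the given prefix."""
    now = now or datetime.now(timezone.utc)
    return f"{prefix}/{now.strftime('%Y-%m-%d_%H-%M')}_batch.json"


if __name__ == "__main__":
    # Placeholder record for illustration only
    sample = [{"figi": "BBG00000TEST", "ticker": "IBM", "exchCode": "US"}]
    print(batch_key())
    print(to_json_lines(sample).decode("utf-8"), end="")
```

The returned bytes can be passed as `Body` and the key as `Key` in the `put_object` call.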
Step 2: Prepare for Cross-Table Joins
Use AWS Glue and Athena to turn raw JSON into queryable tables:
- Run a Glue Crawler on your S3 bucket to auto-detect schema and create a Glue Table
- Query and join with other tables in Athena:
```sql
SELECT
    bt.transaction_id,
    bt.ticker,
    fg.figi,
    fg.securityType
FROM business_transactions bt
JOIN openfigi_results fg
    ON bt.ticker = fg.ticker
    AND bt.exchange_code = fg.exchCode
```
Step 3: Direct Load to Redshift (For Production)
If you need to load directly into a data warehouse:
- Create a Redshift table matching the fields you keep from OpenFIGI's response:

```sql
CREATE TABLE openfigi_data (
    figi VARCHAR(20) PRIMARY KEY,  -- informational only; Redshift does not enforce it
    idType VARCHAR(50),
    idValue VARCHAR(100),
    securityType VARCHAR(50),
    ticker VARCHAR(50),
    exchCode VARCHAR(10),
    metadata SUPER  -- Redshift has no JSON type; SUPER holds semi-structured data
);
```

- Use the redshift_connector driver to insert data from your input script:

```python
import redshift_connector

conn = redshift_connector.connect(...)
cursor = conn.cursor()
for _, row in figi_results.iterrows():
    # JSON_PARSE turns the serialized row into a SUPER value
    cursor.execute(
        """
        INSERT INTO openfigi_data
        VALUES (%s, %s, %s, %s, %s, %s, JSON_PARSE(%s))
        """,
        (row.figi, row.idType, row.idValue, row.securityType,
         row.ticker, row.exchCode, row.to_json()),
    )
conn.commit()
```
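Row-by-row inserts get slow as batches grow, so one option is to prepare all the parameter tuples up front and pass them to the cursor's DB-API `executemany` method in one go. The sketch below only builds the tuples. `COLUMNS` and `to_insert_rows` are hypothetical names, and the column order is assumed to match the `openfigi_data` table from this step.

```python
import json
from typing import Dict, List, Tuple

# Assumed column order of the target table; the full record is appended as JSON text.
COLUMNS = ("figi", "idType", "idValue", "securityType", "ticker", "exchCode")


def to_insert_rows(records: List[Dict[str, object]]) -> List[Tuple]:
    """Turn result dicts into parameter tuples, skipping missing or repeated FIGIs."""
    rows = []
    seen = set()
    for record in records:
        figi = record.get("figi")
        if not figi or figi in seen:
            continue  # keep the key column populated and unique within the batch
        seen.add(figi)
        values = tuple(record.get(col) for col in COLUMNS)
        rows.append(values + (json.dumps(record, sort_keys=True),))
    return rows


if __name__ == "__main__":
    # Placeholder record for illustration only
    sample = [{"figi": "BBG00000TEST", "idType": "TICKER", "idValue": "IBM",
               "securityType": "Common Stock", "ticker": "IBM", "exchCode": "US"}]
    for row in to_insert_rows(sample):
        print(row)
```

The resulting list can be handed to `cursor.executemany(...)` with the same INSERT statement. For large volumes, loading from S3 with Redshift's COPY command is usually the faster route.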
Storing the Results in Snowflake
Step 1: Temporary Storage (Snowflake Stage)
Use a Snowflake internal stage to hold raw JSON data:
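```sql
CREATE STAGE openfigi_temp_stage FILE_FORMAT = (TYPE = JSON);
```

Upload data to the stage from your input script:

```python
import snowflake.connector

conn = snowflake.connector.connect(...)
cursor = conn.cursor()
# Write one JSON object per line so each line loads as one row
figi_results.to_json("/tmp/openfigi_results.json", orient="records", lines=True)
cursor.execute("PUT file:///tmp/openfigi_results.json @openfigi_temp_stage")
```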
Step 2: Load to Table & Join with Other Data
- Create a Snowflake table to store structured OpenFIGI data:

```sql
CREATE TABLE openfigi_data (
    figi STRING PRIMARY KEY,  -- informational only; Snowflake does not enforce it
    id_type STRING,
    id_value STRING,
    security_type STRING,
    ticker STRING,
    exch_code STRING,
    metadata VARIANT
);
```

- Load data from the stage and parse JSON:

```sql
COPY INTO openfigi_data
FROM (
    SELECT
        $1:figi::STRING,
        $1:idType::STRING,
        $1:idValue::STRING,
        $1:securityType::STRING,
        $1:ticker::STRING,
        $1:exchCode::STRING,
        $1
    FROM @openfigi_temp_stage/openfigi_results.json
);
```

- Join with your existing tables:

```sql
SELECT
    cp.customer_id,
    cp.holding_ticker,
    fg.figi,
    fg.security_type
FROM customer_portfolio cp
JOIN openfigi_data fg
    ON cp.holding_ticker = fg.ticker
    AND cp.exchange = fg.exch_code
```
Quick Note on Rate Limits
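Don't forget OpenFIGI's rate limits. At the time of writing, the mapping endpoint allows 25 requests per minute with up to 10 jobs each when you call it without an API key, and 25 requests per 6 seconds with up to 100 jobs each when you send a key in the X-OPENFIGI-APIKEY header; check the current API documentation for the exact figures. Add retry logic and throttling to your scripts to avoid HTTP 429 errors.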
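Here's one way the throttling and retry logic could look, as a sketch rather than a definitive implementation. `Throttle` and `post_with_retry` are hypothetical names, `send` is assumed to be a zero-argument callable that performs one request and returns `(status_code, parsed_body)`, and the call rate is a parameter you should set from the documented limit for your plan.

```python
import time
from typing import Callable, Optional, Tuple


class Throttle:
    """Space calls at least `period / max_calls` seconds apart."""

    def __init__(self, max_calls: int, period: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self.min_interval = period / max_calls
        self.clock = clock
        self.sleep = sleep
        self.next_allowed: Optional[float] = None  # earliest start of the next call

    def wait(self) -> None:
        now = self.clock()
        if self.next_allowed is not None and now < self.next_allowed:
            self.sleep(self.next_allowed - now)
            now = self.next_allowed
        self.next_allowed = now + self.min_interval


def post_with_retry(send: Callable[[], Tuple[int, object]],
                    throttle: Throttle,
                    max_attempts: int = 5,
                    base_delay: float = 1.0,
                    sleep: Callable[[float], None] = time.sleep) -> Tuple[int, object]:
    """Call `send` until it stops returning HTTP 429, backing off exponentially."""
    for attempt in range(max_attempts):
        throttle.wait()
        status, body = send()
        if status != 429:
            return status, body
        if attempt < max_attempts - 1:
            # Wait base_delay, then 2x, 4x, ... before trying again
            sleep(base_delay * 2 ** attempt)
    raise RuntimeError(f"still rate limited after {max_attempts} attempts")


if __name__ == "__main__":
    # Stand-in for a real request; replace with a function that posts one batch
    def fake_send() -> Tuple[int, object]:
        return 200, [{"data": []}]

    throttle = Throttle(max_calls=25, period=60.0)  # assumed limit
    print(post_with_retry(fake_send, throttle))
```

In a real script, `send` would wrap the `requests.post` call for one batch of jobs and return `response.status_code` together with `response.json()`. Injecting `clock` and `sleep` keeps the helpers easy to test without actually waiting.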
The question behind this content comes from Stack Exchange; it was asked by Johnson36912.




