Request for help converting SAS code to a PySpark DataFrame and fixing a date calculation
Hey there! Let's work through your PySpark date code issues and convert that SAS query step by step, with plain fixes and explanations.
First, let's sort out your date calculation snippet. There are a couple of small mistakes throwing it off:
- You don't need F.lit() here: date_sub and date_add operate directly on date columns, not literal values.
- The way you reference the FDOS_LN column is off; you can use F.col("FDOS_LN"), or pass the column name as a string, since both functions accept either.
- You had a missing closing parenthesis at the end of your code.
Here's the corrected version to add your date columns to nuclear_df:
    from pyspark.sql import functions as F

    # Proper date column calculations
    nuclear_df = nuclear_df.withColumn(
        "two_before", F.date_sub(F.col("FDOS_LN"), 2)
    ).withColumn(
        "two_after", F.date_add(F.col("FDOS_LN"), 2)
    )
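To sanity-check what two_before and two_after should hold for a given row, the same arithmetic can be sketched in plain Python with datetime. The helper name window and the sample date are made up for illustration; this mirrors the intent of date_sub and date_add and is not Spark code.

```python
from datetime import date, timedelta
from typing import Tuple


def window(fdos_ln: date, days: int = 2) -> Tuple[date, date]:
    """Return (two_before, two_after) for one date (hypothetical helper)."""
    delta = timedelta(days=days)
    return fdos_ln - delta, fdos_ln + delta


# Sample date chosen to cross a month boundary (2021 is not a leap year).
two_before, two_after = window(date(2021, 3, 1))
print(two_before, two_after)  # 2021-02-27 2021-03-03
```

Comparing a few rows of nuclear_df against values computed this way is a quick way to confirm the Spark columns are right.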
Next, let's convert your SAS query to PySpark. The core logic is: select all records from radio_df where the MBR_ID doesn't appear in a subquery that matches members between radio_df and nuclear_df, with radio_df's date falling within ±2 days of nuclear_df's date.
Quick note: your SAS code uses a.MBRID in one place and b.MBR_ID in another. Double-check that these column names are consistent in your actual data (I'll assume they're both MBR_ID for the examples below).
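The exclusion logic described above can be sketched in plain Python on a handful of rows, which gives a small reference result to compare the PySpark output against. This is only an illustration under the same assumptions (both tables have MBR_ID and FDOS_LN); the function name and the sample rows are hypothetical.

```python
from datetime import date, timedelta


def radio_without_nearby_nuclear(radio_rows, nuclear_rows, days=2):
    """Keep radio rows whose MBR_ID has no radio date within +/- days of a nuclear date."""
    delta = timedelta(days=days)
    # Subquery equivalent: member IDs with at least one radio/nuclear pair in the window.
    excluded = {
        a["MBR_ID"]
        for a in radio_rows
        for b in nuclear_rows
        if a["MBR_ID"] == b["MBR_ID"]
        and b["FDOS_LN"] - delta <= a["FDOS_LN"] <= b["FDOS_LN"] + delta
    }
    # Outer query equivalent: NOT IN the excluded set.
    return [a for a in radio_rows if a["MBR_ID"] not in excluded]


# Hypothetical sample data
radio = [
    {"MBR_ID": 1, "FDOS_LN": date(2021, 3, 1)},   # 1 day from nuclear: member 1 excluded
    {"MBR_ID": 1, "FDOS_LN": date(2021, 6, 1)},   # same member, so dropped as well
    {"MBR_ID": 2, "FDOS_LN": date(2021, 3, 10)},  # 9 days from nuclear: kept
    {"MBR_ID": 3, "FDOS_LN": date(2021, 3, 1)},   # no nuclear record: kept
]
nuclear = [
    {"MBR_ID": 1, "FDOS_LN": date(2021, 3, 2)},
    {"MBR_ID": 2, "FDOS_LN": date(2021, 3, 1)},
]

kept = radio_without_nearby_nuclear(radio, nuclear)
print([row["MBR_ID"] for row in kept])  # [2, 3]
```

Note that the exclusion is per member, not per row: once any radio record for a member falls inside the window, every radio record for that member is removed.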
Option 1: Mirror the SAS NOT IN logic (subquery approach)
This directly matches how your SAS code is structured:
    # First get the distinct MBR_IDs we need to exclude
    excluded_mbrs = radio_df.alias("a").join(
        nuclear_df.alias("b"),
        F.col("a.MBR_ID") == F.col("b.MBR_ID"),
        "inner"
    ).filter(
        F.col("a.FDOS_LN").between(F.col("b.two_before"), F.col("b.two_after"))
    ).select(F.col("a.MBR_ID")).distinct()

    # Filter radio_df to remove those excluded IDs
    result_df = radio_df.filter(
        ~F.col("MBR_ID").isin(excluded_mbrs.rdd.flatMap(lambda x: x).collect())
    )
Option 2: Use a left_anti join (recommended for performance)
Option 1 calls collect(), which pulls every excluded ID onto the driver and can run out of memory on large data. A left_anti join keeps the work distributed across the cluster, so it is the more efficient approach:
    # Create a dataset of IDs that meet the exclusion condition
    excluded_conditions = radio_df.alias("a").join(
        nuclear_df.alias("b"),
        (F.col("a.MBR_ID") == F.col("b.MBR_ID"))
        & (F.col("a.FDOS_LN").between(F.col("b.two_before"), F.col("b.two_after"))),
        "inner"
    ).select(F.col("a.MBR_ID")).distinct()

    # Left anti join keeps only records in radio_df that don't match the excluded IDs
    result_df = radio_df.join(excluded_conditions, on="MBR_ID", how="left_anti")
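Both approaches should return the same rows as your SAS query, provided MBR_ID is never null: rows with a null MBR_ID are dropped by the isin filter in Option 1 but kept by the left_anti join in Option 2. The left_anti join is the better practice for big data workloads.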
The question comes from Stack Exchange; it was asked by Deepa.




