How to Use the Apache Flink DataSet Python API in PyCharm on Windows 10
Hey there! I totally get how frustrating it is to spend hours searching for a complete guide to set up Apache Flink for Python, especially when you want to avoid Java/Scala entirely. Let me break this down for you step by step, tailored for PyCharm.
Step 1: Install Core Dependencies
First, let’s make sure you have all the basics covered:
- Python Version: Recent releases of Apache Flink's Python API (PyFlink) support roughly Python 3.8 to 3.11. The exact range varies by Flink release, and earlier releases top out at an older Python, so check the installation docs for the version you download.
- Install Flink Python Package: Open PyCharm's terminal (or your system terminal) and run:
    pip install apache-flink
  This installs the Python bindings together with the Flink JARs they need, which is enough to run jobs locally straight from PyCharm. PyFlink still starts a JVM under the hood, so a Java runtime must be installed (with JAVA_HOME set) even though you won't write any Java.
- Download Flink Binary: You only need the separate binary distribution if you want to run a standalone cluster. Grab the latest stable Flink version (stick to releases 1.17+ for best Python support) and extract it to a directory on your machine. Let's call this path FLINK_HOME (e.g., /Users/you/flink-1.18.0 on macOS or C:\flink-1.18.0 on Windows).
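Before moving on, a quick sanity check can save time. The sketch below is a hypothetical helper script (python_is_supported and installed_pyflink_version are illustrative names, not part of PyFlink). It assumes the 3.8 to 3.11 range quoted above, which you should adjust to whatever your Flink release documents, checks the interpreter version and implementation, and reports which apache-flink version pip installed.

```python
import platform
import sys
from importlib import metadata

# Assumed supported range (taken from the bullet above); adjust for your Flink release.
MIN_PYTHON = (3, 8)
MAX_PYTHON = (3, 11)


def python_is_supported(version_info=None, implementation=None):
    """Return True if the interpreter is CPython within the assumed version range."""
    version_info = version_info or sys.version_info
    implementation = implementation or platform.python_implementation()
    major_minor = (version_info[0], version_info[1])
    return implementation == "CPython" and MIN_PYTHON <= major_minor <= MAX_PYTHON


def installed_pyflink_version():
    """Return the installed apache-flink version string, or None if it is missing."""
    try:
        return metadata.version("apache-flink")
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    print("Interpreter:", platform.python_implementation(), platform.python_version())
    print("Within assumed range:", python_is_supported())
    print("apache-flink:", installed_pyflink_version() or "not installed")
```

Run it with the same interpreter you select for the project in PyCharm, for example from PyCharm's terminal.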
Step 2: Configure PyCharm Environment
To make PyCharm recognize Flink properly:
- Set Environment Variables: Go to Run > Edit Configurations in PyCharm. For your Python script's run configuration, add two environment variables:
  - FLINK_HOME: path to your extracted Flink binary folder
  - PATH: append $FLINK_HOME/bin (macOS/Linux) or %FLINK_HOME%\bin (Windows) to the existing PATH
- Verify Interpreter: Ensure your PyCharm project uses the same Python interpreter where you installed apache-flink (check in File > Settings > Project: <project name> > Python Interpreter; on macOS the Settings dialog is under the PyCharm menu).
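To confirm that a run configuration actually exposes these variables, you could run a hypothetical check like the one below with that same configuration (check_flink_env is an illustrative name, not a Flink API). It is a minimal sketch that only verifies FLINK_HOME points at an existing directory and that its bin folder appears on PATH; it normalizes case and separators so the comparison also behaves on Windows.

```python
import os


def check_flink_env(environ=None):
    """Return a list of problems with FLINK_HOME/PATH (an empty list means OK)."""
    env = os.environ if environ is None else environ
    flink_home = env.get("FLINK_HOME")
    if not flink_home:
        return ["FLINK_HOME is not set"]

    problems = []
    if not os.path.isdir(flink_home):
        problems.append(f"FLINK_HOME does not exist: {flink_home}")

    # Normalize case and separators so C:\Flink\bin matches c:\flink\bin on Windows.
    def normalize(path):
        return os.path.normcase(os.path.normpath(path))

    bin_dir = normalize(os.path.join(flink_home, "bin"))
    path_entries = {normalize(p) for p in env.get("PATH", "").split(os.pathsep) if p}
    if bin_dir not in path_entries:
        problems.append(f"{bin_dir} is not on PATH")
    return problems


if __name__ == "__main__":
    issues = check_flink_env()
    print("Flink environment looks OK" if not issues else "\n".join(issues))
```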
Step 3: Write a Simple Flink Python Job
Let’s test with a classic WordCount example. Create a new Python file in your project (e.g., word_count.py) with this code:
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings


def main():
    # Set up the execution environment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)  # Use a parallelism of 1 for local testing

    # Create the table environment in streaming mode
    settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(env, environment_settings=settings)

    # Create a simple data source (generates random 5-character words)
    t_env.execute_sql("""
        CREATE TABLE words (
            word STRING
        ) WITH (
            'connector' = 'datagen',
            'rows-per-second' = '5',
            'fields.word.length' = '5'
        )
    """)

    # Process data: count word occurrences.
    # COUNT is a reserved keyword in Flink SQL, so the column is named cnt.
    result_table = t_env.sql_query("""
        SELECT word, COUNT(*) AS cnt
        FROM words
        GROUP BY word
    """)

    # Print results to stdout
    t_env.execute_sql("""
        CREATE TABLE print_result (
            word STRING,
            cnt BIGINT
        ) WITH (
            'connector' = 'print'
        )
    """)

    # The datagen source is unbounded, so wait() blocks until the job is stopped
    result_table.execute_insert("print_result").wait()


if __name__ == "__main__":
    main()
```
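Because the query is a continuous GROUP BY over an unbounded source, the print sink does not emit one final count per word. It emits a changelog instead: an insert (+I) the first time a word appears, then typically a retraction of the old count (-U) followed by the new count (+U) each time the word repeats. The pure-Python sketch below models that behaviour in a simplified way (simulate_changelog is a hypothetical name; it ignores mini-batching and other planner optimizations and does not use PyFlink at all).

```python
def simulate_changelog(words):
    """Model the changelog of a streaming GROUP BY word / COUNT(*).

    Simplified: one record processed at a time, no mini-batch.
    Returns a list of (row_kind, word, count) tuples.
    """
    counts = {}
    changelog = []
    for word in words:
        previous = counts.get(word)
        if previous is None:
            counts[word] = 1
            changelog.append(("+I", word, 1))  # first occurrence: insert
        else:
            counts[word] = previous + 1
            changelog.append(("-U", word, previous))  # retract the old row
            changelog.append(("+U", word, previous + 1))  # emit the updated row
    return changelog


if __name__ == "__main__":
    for kind, word, count in simulate_changelog(["apple", "pear", "apple"]):
        print(f"{kind}[{word}, {count}]")
```

With random 5-character strings, repeats are likely rare at first, so expect mostly +I rows when you run the real job.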
Step 4: Run the Job in PyCharm
You have two options to run this job:
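- Local Standalone Mode:
  - First, start the Flink local cluster: open a terminal, navigate to FLINK_HOME/bin, and run start-cluster.sh (macOS/Linux) or start-cluster.bat (Windows). Recent Flink distributions may not include the .bat scripts; in that case, run start-cluster.sh from WSL or Cygwin.
  - Then submit the job to that cluster from the same terminal with flink run --python followed by the path to word_count.py. Running word_count.py directly in PyCharm does not attach to this cluster; it starts its own embedded one instead (see Embedded Mode below). You can monitor the submitted job via the Flink Web UI at http://localhost:8081.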
- Embedded Mode:
  If you don't want to start a separate cluster, just run word_count.py directly in PyCharm. When the script is launched as a plain Python program, StreamExecutionEnvironment.get_execution_environment() spins up a local mini-cluster in a JVM started from your Python process, so the code needs no changes. This is perfect for quick testing without extra cluster setup.
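For the standalone route, submission and monitoring can also be scripted. The sketch below is illustrative only (submit_command, job_states and fetch_overview are hypothetical helpers): it builds the flink run --python command line and reads job states from the cluster's REST API, assuming the /jobs/overview endpoint on the default port 8081 returns a JSON object with a "jobs" list whose entries carry "name" and "state" fields. It prints the command rather than running it, because the word-count job never finishes on its own. On Windows, bin/flink is a shell script, so the printed command may need to be run from WSL or Cygwin.

```python
import json
import os
import urllib.request


def submit_command(flink_home, script_path):
    """Build the CLI invocation that submits a PyFlink script to a running cluster."""
    flink_cli = os.path.join(flink_home, "bin", "flink")
    return [flink_cli, "run", "--python", os.path.abspath(script_path)]


def job_states(overview):
    """Map job name -> state from a parsed /jobs/overview response."""
    return {job["name"]: job["state"] for job in overview.get("jobs", [])}


def fetch_overview(base_url="http://localhost:8081"):
    """Fetch and parse /jobs/overview from the (assumed) local REST endpoint."""
    with urllib.request.urlopen(base_url + "/jobs/overview", timeout=5) as response:
        return json.load(response)


if __name__ == "__main__":
    flink_home = os.environ.get("FLINK_HOME", ".")
    print("Submit with:", " ".join(submit_command(flink_home, "word_count.py")))
    try:
        for name, state in job_states(fetch_overview()).items():
            print(f"{state:10} {name}")
    except (OSError, ValueError) as exc:  # URLError is an OSError; bad JSON is a ValueError
        print("Could not read the job overview:", exc)
```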
Step 5: Troubleshooting Common Hiccups
- Version Mismatch: If you get errors about missing classes or incompatible APIs, make sure your apache-flink pip version exactly matches the Flink binary version (e.g., if you downloaded Flink 1.18.0, run pip install apache-flink==1.18.0). This matters whenever FLINK_HOME is set, because PyFlink then loads the JARs from that distribution instead of the ones bundled with the pip package.
- Flink Runtime Not Found: Double-check that FLINK_HOME is set correctly and that the Flink bin folder is on your PATH. Verify by running flink --version in PyCharm's terminal; if it prints the version number, you're good to go.
- Python Interpreter Issues: Ensure you're using a CPython interpreter (PyPy isn't fully supported by Flink right now).
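The version checks above can be automated along these lines. This is a hedged sketch with hypothetical helper names. It assumes flink --version prints a line like "Version: 1.18.0, Commit ID: ..." and that the distribution's lib folder contains a JAR named like flink-dist-1.18.0.jar (older releases add a Scala suffix, e.g. flink-dist_2.12-1.14.0.jar). Reading the JAR name works even where the flink shell script cannot run, such as plain Windows without WSL.

```python
import glob
import os
import re
from importlib import metadata

CLI_VERSION_RE = re.compile(r"Version:\s*([0-9][\w.\-]*)")
DIST_JAR_RE = re.compile(r"^flink-dist(?:_\d+\.\d+)?-(.+)\.jar$")


def parse_cli_version(output):
    """Extract the version from `flink --version` output (assumed format)."""
    match = CLI_VERSION_RE.search(output)
    return match.group(1) if match else None


def version_from_dist_jar(filename):
    """Extract the Flink version from a flink-dist JAR file name."""
    match = DIST_JAR_RE.match(os.path.basename(filename))
    return match.group(1) if match else None


def distribution_version(flink_home):
    """Return the version of the distribution in flink_home, or None."""
    for jar in glob.glob(os.path.join(flink_home, "lib", "flink-dist*.jar")):
        version = version_from_dist_jar(jar)
        if version:
            return version
    return None


def versions_match(distribution, package):
    """True only when both versions are known and identical."""
    return distribution is not None and distribution == package


if __name__ == "__main__":
    flink_home = os.environ.get("FLINK_HOME")
    dist = distribution_version(flink_home) if flink_home else None
    try:
        pkg = metadata.version("apache-flink")
    except metadata.PackageNotFoundError:
        pkg = None
    print(f"distribution={dist} apache-flink={pkg} match={versions_match(dist, pkg)}")
```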
I hope this clears up the setup process for you—let me know if you hit any specific roadblocks!
The question originally came from Stack Exchange; it was asked by Daniyal.




