
How to Use the Apache Flink DataSet Python API in PyCharm on Windows 10

Hey there! I totally get how frustrating it is to spend hours searching for a complete guide to set up Apache Flink for Python, especially when you want to avoid Java/Scala entirely. Let me break this down for you step by step, tailored for PyCharm.

Step 1: Install Core Dependencies

First, let’s make sure you have all the basics covered:

  • Python Version: Apache Flink’s Python API supports Python 3.8 to 3.11 (double-check the version match for your Flink release—newer versions might extend this range).
  • Install Flink Python Package: Open PyCharm’s terminal (or your system terminal) and run:
    pip install apache-flink
    
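    This installs PyFlink together with a bundled Flink runtime, which is enough to run jobs locally from PyCharm. PyFlink starts a JVM under the hood, so you also need a Java runtime (a JDK such as Java 11) installed. The separate Flink binary distribution is only needed if you want to run a standalone cluster or use the flink command-line tool.
  • Download Flink Binary (for standalone clusters): Grab the latest stable Flink version (stick to releases 1.17+ for best Python support) and extract it to a directory on your machine. Let’s call this path FLINK_HOME (e.g., /Users/you/flink-1.18.0 on macOS or C:\flink-1.18.0 on Windows).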
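
The prerequisites above can be checked from Python before going further. This is only a minimal sketch using the standard library: the function name check_prerequisites is hypothetical, the version bounds are the 3.8 to 3.11 range quoted above (adjust them for your Flink release), and the Java check is included because PyFlink launches a JVM.

```python
import importlib.util
import shutil
import sys


def check_prerequisites(min_py=(3, 8), max_py=(3, 11)):
    """Return a dict of basic checks for a local PyFlink setup.

    The version bounds are assumptions taken from the text above,
    not values read from Flink itself.
    """
    py = sys.version_info[:2]
    return {
        "python_version": "%d.%d" % py,
        "python_in_range": min_py <= py <= max_py,
        # True if `pip install apache-flink` landed in this interpreter
        "pyflink_installed": importlib.util.find_spec("pyflink") is not None,
        # True if a `java` executable can be found on PATH
        "java_on_path": shutil.which("java") is not None,
    }


if __name__ == "__main__":
    for name, value in check_prerequisites().items():
        print(f"{name}: {value}")
```

Running this with the same interpreter PyCharm uses for the project shows at a glance which piece is missing.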
Step 2: Configure PyCharm Environment

To make PyCharm recognize Flink properly:

  • Set Environment Variables: Go to Run > Edit Configurations in PyCharm. For your Python script’s run configuration, add two environment variables:
    • FLINK_HOME: Path to your extracted Flink binary folder
    • PATH: Append $FLINK_HOME/bin (macOS/Linux) or %FLINK_HOME%\bin (Windows) to the existing PATH
  • Verify Interpreter: Ensure your PyCharm project is using the same Python interpreter where you installed apache-flink (check in File > Settings > Project > Python Interpreter).
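
To confirm that the run configuration really passes these variables to your script, a small check like the following may help. It is a sketch, not part of Flink: flink_bin_dir and flink_bin_on_path are hypothetical helper names, and the functions take the environment as a plain mapping so they can be tried with sample values as well as with os.environ.

```python
import os


def flink_bin_dir(env):
    """Return the expected FLINK_HOME/bin directory, or None if FLINK_HOME is unset."""
    home = env.get("FLINK_HOME")
    return os.path.join(home, "bin") if home else None


def flink_bin_on_path(env):
    """Return True if FLINK_HOME/bin appears as an entry in PATH."""
    bin_dir = flink_bin_dir(env)
    if bin_dir is None:
        return False

    def normalize(path):
        # Ignore case (on Windows) and trailing separators when comparing
        return os.path.normcase(os.path.normpath(path))

    entries = env.get("PATH", "").split(os.pathsep)
    return normalize(bin_dir) in {normalize(e) for e in entries if e}


if __name__ == "__main__":
    print("FLINK_HOME/bin:", flink_bin_dir(os.environ))
    print("on PATH:", flink_bin_on_path(os.environ))
```

Run it once from the PyCharm run configuration and once from the terminal; differing output usually means the variables were set in only one of the two places.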

Step 3: Write a Test Job

Let’s test with a classic WordCount example. Create a new Python file in your project (e.g., word_count.py) with this code:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

def main():
    # Set up the execution environment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)  # Use 1 parallelism for local testing

    # Create table environment
    settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(env, environment_settings=settings)

    # Create a simple data source (generates random words)
    t_env.execute_sql("""
        CREATE TABLE words (
            word STRING
        ) WITH (
            'connector' = 'datagen',
            'rows-per-second' = '5',
            'fields.word.length' = '5'
        )
    """)

    # Process data: count word occurrences.
    # COUNT is a reserved keyword in Flink SQL, so the column is named cnt.
    result_table = t_env.sql_query("""
        SELECT word, COUNT(*) AS cnt
        FROM words
        GROUP BY word
    """)

    # Print results to console
    t_env.execute_sql("""
        CREATE TABLE print_result (
            word STRING,
            cnt BIGINT
        ) WITH (
            'connector' = 'print'
        )
    """)

    # The datagen source is unbounded, so this blocks until the job is stopped
    result_table.execute_insert("print_result").wait()

if __name__ == "__main__":
    main()
Step 4: Run the Job in PyCharm

You have two options to run this job:

  • Local Standalone Mode:
    1. First, start the Flink local cluster: open a terminal, navigate to FLINK_HOME/bin, and run start-cluster.sh (macOS/Linux). Recent Flink releases no longer ship Windows batch scripts, so on Windows 10 run the script from WSL or Cygwin.
    2. Submit the job to the cluster with the Flink CLI from the same terminal: flink run --python word_count.py. You can monitor the job via the Flink Web UI at http://localhost:8081.
  • Embedded Mode:
    If you don’t want to start a separate cluster, run word_count.py directly in PyCharm. No code change is needed: when the script is not submitted through the Flink CLI, get_execution_environment() starts a local mini cluster for the job. This is perfect for quick testing without extra cluster setup.
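
If you want to send the script to a running standalone cluster from Python, one option is to call the Flink CLI, whose run command accepts a --python argument. The sketch below only builds the argument list; build_submit_command is a hypothetical helper, and the bin/flink location under FLINK_HOME is assumed from the standard binary layout. On Windows the flink launcher is a shell script, so the command would have to be executed inside WSL or a similar environment.

```python
import os


def build_submit_command(flink_home, script_path):
    """Build the argument list for `flink run --python <script>`.

    Assumes the CLI launcher lives at <flink_home>/bin/flink.
    """
    flink_cli = os.path.join(flink_home, "bin", "flink")
    return [flink_cli, "run", "--python", script_path]


if __name__ == "__main__":
    # Example paths only; replace them with your own FLINK_HOME and script
    print(" ".join(build_submit_command("/opt/flink-1.18.0", "word_count.py")))
```

The resulting list can be passed to subprocess.run(cmd, check=True) once the cluster from the standalone option is up.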
Step 5: Troubleshooting Common Hiccups
  • Version Mismatch: If you get errors about missing classes or incompatible APIs, make sure your apache-flink pip version exactly matches the Flink binary version (e.g., if you downloaded Flink 1.18.0, run pip install apache-flink==1.18.0).
  • Flink Runtime Not Found: Double-check that FLINK_HOME is set correctly and that the Flink binary path is added to your PATH. Verify by running flink --version in PyCharm’s terminal—if it returns the version number, you’re good to go.
  • Python Interpreter Issues: Ensure you’re using a CPython interpreter (PyPy isn’t fully supported by Flink right now).
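
The version-mismatch check from the first bullet can be automated roughly as follows. This is a sketch under an assumption: it expects the distribution jar in FLINK_HOME/lib to be named like flink-dist-1.18.0.jar (older releases used names like flink-dist_2.12-1.14.0.jar), and all function names here are hypothetical.

```python
import os
import re
from importlib import metadata

# Assumed jar naming: flink-dist-<version>.jar or flink-dist_<scala>-<version>.jar
_DIST_JAR = re.compile(r"^flink-dist(?:_[\d.]+)?-(\d+\.\d+\.\d+)\.jar$")


def version_from_dist_jar(jar_name):
    """Extract the Flink version from a flink-dist jar file name, or None."""
    match = _DIST_JAR.match(jar_name)
    return match.group(1) if match else None


def versions_match(pip_version, jar_name):
    """True if the pip package version equals the version in the jar name."""
    return version_from_dist_jar(jar_name) == pip_version


def installed_versions_match(flink_home):
    """Compare the installed apache-flink package against FLINK_HOME/lib.

    Raises importlib.metadata.PackageNotFoundError if apache-flink
    is not installed in the current interpreter.
    """
    pip_version = metadata.version("apache-flink")
    jars = os.listdir(os.path.join(flink_home, "lib"))
    return any(versions_match(pip_version, jar) for jar in jars)
```

Calling installed_versions_match(os.environ["FLINK_HOME"]) from the project interpreter returns False when the two versions differ, which is the case the first bullet describes.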

I hope this clears up the setup process for you—let me know if you hit any specific roadblocks!

The question comes from Stack Exchange; it was asked by Daniyal.
