You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

请求提供Flink-SQL流处理示例(含Kafka与数据库数据源)

Hey there! Let me share some practical, ready-to-use Flink SQL examples for stream processing, covering both Kafka and database data sources. I'll break each scenario down with clear code and explanations so you can easily adapt them to your projects.

Kafka is the go-to for streaming data ingestion, so let's start with how to consume, process, and output data from Kafka using Flink SQL.

1.1 Create a Kafka Source Table

First, define a table that maps to your Kafka topic. This example uses JSON-formatted messages:

CREATE TABLE kafka_order_stream (
    order_id STRING,
    user_id STRING,
    amount DECIMAL(10,2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND -- Define watermark for event-time processing
) WITH (
    'connector' = 'kafka',
    'topic' = 'order_topic',
    'properties.bootstrap.servers' = 'kafka-broker:9092',
    'properties.group.id' = 'flink-sql-order-consumer',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset' -- Start consuming from latest offset; use 'earliest-offset' for full history
);

Key Notes:

  • The WATERMARK clause is crucial for event-time window processing—it tells Flink how to handle out-of-order events.
  • Adjust scan.startup.mode based on whether you need to process historical data or just new incoming messages.

1.2 Process Kafka Stream Data

Let's run a real-time aggregation to calculate total sales per user every 1 minute:

SELECT
    user_id,
    TUMBLE_START(order_time, INTERVAL '1' MINUTE) AS window_start,
    SUM(amount) AS total_spent
FROM kafka_order_stream
GROUP BY
    user_id,
    TUMBLE(order_time, INTERVAL '1' MINUTE);

You can also filter data on the fly—for example, only keep orders over $100:

SELECT * FROM kafka_order_stream WHERE amount > 100;

1.3 Write Processed Data to Kafka Sink

Let's output the aggregated results to another Kafka topic:

CREATE TABLE kafka_user_sales_sink (
    user_id STRING,
    window_start TIMESTAMP(3),
    total_spent DECIMAL(10,2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_sales_summary',
    'properties.bootstrap.servers' = 'kafka-broker:9092',
    'format' = 'json'
);

INSERT INTO kafka_user_sales_sink
SELECT
    user_id,
    TUMBLE_START(order_time, INTERVAL '1' MINUTE) AS window_start,
    SUM(amount) AS total_spent
FROM kafka_order_stream
GROUP BY
    user_id,
    TUMBLE(order_time, INTERVAL '1' MINUTE);

For database sources, we use Change Data Capture (CDC) to capture real-time inserts, updates, and deletes from databases like MySQL. Flink has native support for CDC via the flink-connector-mysql-cdc plugin.

2.1 Create MySQL CDC Source Table

This table will capture all changes from a users table in MySQL:

CREATE TABLE mysql_users_cdc (
    user_id STRING PRIMARY KEY NOT ENFORCED, -- Primary key is required for CDC
    username STRING,
    email STRING,
    registration_time TIMESTAMP(3),
    _op STRING -- CDC operation type: 'INSERT', 'UPDATE', 'DELETE'
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql-host',
    'port' = '3306',
    'username' = 'root',
    'password' = 'your-password',
    'database-name' = 'user_db',
    'table-name' = 'users',
    'scan.incremental.snapshot.enabled' = 'true' -- Enable initial snapshot + incremental changes
);

Key Notes:

  • The _op column is automatically added by the CDC connector to indicate the type of change.
  • scan.incremental.snapshot.enabled lets you first read the full existing table data, then continue capturing real-time changes.

2.2 Process CDC Data

Let's capture only new user registrations (INSERT operations) and calculate daily sign-ups:

SELECT
    DATE(registration_time) AS signup_date,
    COUNT(user_id) AS new_users
FROM mysql_users_cdc
WHERE _op = 'INSERT'
GROUP BY DATE(registration_time);

Or track updates to user emails:

SELECT
    user_id,
    email,
    CURRENT_TIMESTAMP() AS update_time
FROM mysql_users_cdc
WHERE _op = 'UPDATE';
3. Combine Kafka and Database Data (Real-Time Enrichment)

A common use case is enriching streaming data from Kafka with dimension data from a database. For example, enriching order data from Kafka with user details from MySQL.

We'll use a Lookup Join—this lets Flink query the MySQL table in real-time to get user info for each order:

-- First, create a lookup table for MySQL users (non-CDC, for point-in-time queries)
CREATE TABLE mysql_users_lookup (
    user_id STRING PRIMARY KEY NOT ENFORCED,
    username STRING,
    email STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql-host:3306/user_db',
    'table-name' = 'users',
    'username' = 'root',
    'password' = 'your-password',
    'lookup.cache.max-rows' = '1000', -- Cache frequent queries to reduce DB load
    'lookup.cache.ttl' = '10min'
);

-- Enrich Kafka order stream with user details
SELECT
    o.order_id,
    o.user_id,
    u.username,
    u.email,
    o.amount,
    o.order_time
FROM kafka_order_stream o
LEFT JOIN mysql_users_lookup u FOR SYSTEM_TIME AS OF o.order_time
ON o.user_id = u.user_id;

Key Notes:

  • FOR SYSTEM_TIME AS OF ensures we get the user data as it existed at the time of the order (avoiding stale data from updates).
  • The lookup cache reduces the number of direct queries to MySQL, which is critical for performance in high-throughput streams.

内容的提问来源于stack exchange,提问作者Avinash Tripathy

火山引擎 最新活动