Flume MySQL-to-HDFS sync: duplicate data and storage mechanism questions
Hey there! Let's tackle your three questions about Flume's MySQL-to-HDFS pipeline. They're all relevant to real-time data sync scenarios, so let's break them down one by one.
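Short answer: Yes, duplicates can occur. Flume's channel transactions give at-least-once delivery, so a failure at the wrong moment replays events instead of losing them. How often that happens depends on your source setup and how you handle failures. Here's why: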
- If you're using the Taildir Source to monitor MySQL's binlog files: Taildir tracks its read offsets in a JSON position file (default path: ~/.flume/taildir_position.json). A graceful shutdown saves the latest offset, but otherwise the file is only rewritten periodically (every writePosInterval, 3000 ms by default). After an unexpected crash, Taildir resumes from the last saved offset and re-sends every event it had already put into the channel since that save, leading to duplicates.
- If you're using a custom JDBC Source (pulling incremental data via queries): Duplicates happen if your query logic doesn't properly track the last fetched record. For example, if you pull data with WHERE updated_at > last_fetched_time and a record's updated_at gets modified during the pull, that record will be picked up again in the next run as a second version. Likewise, if you forget to update last_fetched_time after a successful pull, you'll re-pull the same batch repeatedly.
Flume writes raw data files to HDFS, but to map those files to a "table" (like a Hive table), you’ll pair Flume with a metastore (e.g., Hive Metastore). Here’s a step-by-step breakdown:
Step 1: Configure your Flume Agent
You’ll need three core components: a source (to pull from MySQL), a channel (to buffer events), and an HDFS sink (to write to HDFS). Here’s a practical example config:
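```properties
# Define components
agent.sources = mysqlBinlogSource
agent.channels = fileChannel
agent.sinks = hdfsSink

# Source: Monitor MySQL binlog (using Taildir)
agent.sources.mysqlBinlogSource.type = TAILDIR
agent.sources.mysqlBinlogSource.channels = fileChannel
agent.sources.mysqlBinlogSource.positionFile = /opt/flume/taildir_pos.json
agent.sources.mysqlBinlogSource.filegroups = mysqlBinlogs
# The file-name part is a regular expression, not a shell glob
agent.sources.mysqlBinlogSource.filegroups.mysqlBinlogs = /var/lib/mysql/mysql-bin.[0-9]+

# Add interceptor to parse binlog into structured events.
# Taildir reads line-oriented text and binlog files are binary; Apache Flume
# ships no binlog parser, so this class is a placeholder for a custom one.
agent.sources.mysqlBinlogSource.interceptors = binlogParser
agent.sources.mysqlBinlogSource.interceptors.binlogParser.type = com.cloudera.flume.interceptor.BinlogInterceptor$Builder
agent.sources.mysqlBinlogSource.interceptors.binlogParser.database = my_target_db
agent.sources.mysqlBinlogSource.interceptors.binlogParser.table = my_target_table

# Channel: Use File Channel for persistence (better for production)
agent.channels.fileChannel.type = file
agent.channels.fileChannel.checkpointDir = /opt/flume/checkpoint
agent.channels.fileChannel.dataDirs = /opt/flume/data

# Sink: Write to HDFS
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.channel = fileChannel
agent.sinks.hdfsSink.hdfs.path = hdfs://namenode:8020/user/hive/warehouse/my_target_db.db/my_target_table/dt=%Y%m%d
# %Y%m%d needs a "timestamp" event header; without one, use the agent's clock
# (ingestion time, not the row's created_at)
agent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
agent.sinks.hdfsSink.hdfs.filePrefix = my_target_table
agent.sinks.hdfsSink.hdfs.fileSuffix = .parquet
# Prefix in-progress files with "_" so Hive ignores them until they are closed
agent.sinks.hdfsSink.hdfs.inUsePrefix = _
# Roll file every hour (properties files do not support trailing comments)
agent.sinks.hdfsSink.hdfs.rollInterval = 3600
# Disable size- and count-based rolling, whose defaults would roll far more often
agent.sinks.hdfsSink.hdfs.rollSize = 0
agent.sinks.hdfsSink.hdfs.rollCount = 0
# Write 1000 events at a time
agent.sinks.hdfsSink.hdfs.batchSize = 1000
# Write the serializer's output directly instead of a SequenceFile
agent.sinks.hdfsSink.hdfs.fileType = DataStream
# The stock HDFS sink has no Parquet serializer (built-ins: TEXT, HEADER_AND_TEXT,
# AVRO_EVENT); this class and its schema property are hypothetical placeholders
agent.sinks.hdfsSink.serializer = com.example.flume.ParquetSerializer$Builder
agent.sinks.hdfsSink.serializer.schema = id INT, username STRING, created_at TIMESTAMP
```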
Step 2: Create a Hive External Table
Once Flume starts writing files to the HDFS path, create a Hive external table that points to that location. This lets you query the data like a regular SQL table:
```sql
CREATE EXTERNAL TABLE my_target_db.my_target_table (
  id INT,
  username STRING,
  created_at TIMESTAMP
)
PARTITIONED BY (dt STRING)  -- Match the dt=%Y%m%d partition in Flume's HDFS path
STORED AS PARQUET
LOCATION 'hdfs://namenode:8020/user/hive/warehouse/my_target_db.db/my_target_table/';
```
Hive does not discover new partition directories on its own. After Flume creates a new dt= directory, register it by running:
```sql
SET hive.msck.path.validation=ignore;
MSCK REPAIR TABLE my_target_db.my_target_table;
```
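MSCK REPAIR TABLE rescans the whole table location. When you already know which day's directory Flume just created, a targeted ALTER TABLE ... ADD IF NOT EXISTS PARTITION is cheaper. The minimal Python sketch below builds that statement from a timestamp using the same %Y%m%d layout as hdfs.path; the partition_value and add_partition_sql helpers are hypothetical, and the base path and table name are copied from the config above. It assumes you pass a timestamp in the same time zone Flume uses to resolve the escape (hdfs.timeZone).

```python
from datetime import datetime

# Hypothetical helper; path and table name mirror the Flume config and Hive DDL.
BASE = "hdfs://namenode:8020/user/hive/warehouse/my_target_db.db/my_target_table"
TABLE = "my_target_db.my_target_table"


def partition_value(ts: datetime) -> str:
    """Same layout as Flume's %Y%m%d escape in hdfs.path."""
    return ts.strftime("%Y%m%d")


def add_partition_sql(ts: datetime) -> str:
    """HiveQL that registers exactly one day's partition directory."""
    dt = partition_value(ts)
    return (
        f"ALTER TABLE {TABLE} ADD IF NOT EXISTS "
        f"PARTITION (dt='{dt}') LOCATION '{BASE}/dt={dt}';"
    )


if __name__ == "__main__":
    print(add_partition_sql(datetime(2024, 1, 15, 8, 30)))
```

IF NOT EXISTS makes the statement safe to re-run, so a scheduler can fire it for "today" on every tick without tracking state.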
This ties back to the first question but focuses specifically on sync pipelines. Here are the most common ways duplicates creep in, plus mitigation tips:
Common Duplicate Scenarios
- Agent crashes/restarts: The sink removes events from the File Channel only when its transaction commits. If the agent dies after a batch has been written to HDFS but before that commit, the File Channel still holds those events and re-sends them on restart, producing duplicates.
- Corrupted position file: For Taildir Source, if the position file (tracking binlog progress) gets corrupted or deleted, Flume re-reads the binlog files from the start and re-sends all old data (unless skipToEnd = true, which instead jumps to the end of each file and can silently skip data).
- Flawed JDBC Source logic: If your custom JDBC Source doesn’t update its checkpoint (e.g., last fetched ID/timestamp) after a successful pull, it’ll re-pull the same dataset every run.
- Sink retry failures: If an HDFS write or flush reports an error (e.g., a timeout during a network blip) even though the data actually landed, the sink rolls back its transaction and writes the whole batch again, resulting in duplicate writes.
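Most of these scenarios share one shape: data reaches HDFS (step 1) before the progress marker, whether a channel commit or a position-file write, becomes durable (step 2). The toy Python model below is not Flume's API (the Pipeline class and its fields are hypothetical); it only shows how a crash between the two steps turns into duplicates.

```python
from dataclasses import dataclass, field


@dataclass
class Pipeline:
    """Toy agent: progress is made durable only after the sink write."""
    log: list                                  # upstream events (e.g. binlog rows)
    committed: int = 0                         # durable offset (position file / channel commit)
    hdfs: list = field(default_factory=list)   # what has landed in HDFS

    def run_batch(self, size: int, crash_before_commit: bool = False) -> None:
        batch = self.log[self.committed:self.committed + size]
        self.hdfs.extend(batch)                # step 1: data reaches HDFS
        if crash_before_commit:
            raise RuntimeError("agent killed before commit")
        self.committed += len(batch)           # step 2: progress becomes durable


if __name__ == "__main__":
    p = Pipeline(log=["e1", "e2", "e3", "e4"])
    p.run_batch(2)
    try:
        p.run_batch(2, crash_before_commit=True)
    except RuntimeError:
        pass
    p.run_batch(2)  # the restart resumes from the last committed offset
    print(p.hdfs)   # ['e1', 'e2', 'e3', 'e4', 'e3', 'e4']
```

Swapping the two steps (commit, then write) would turn the duplicate into a lost batch instead. Flume sits on the at-least-once side of that trade-off, which is why deduplication has to happen downstream.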
How to Reduce/Handle Duplicates
- Use File Channel (not Memory Channel): Memory Channel loses buffered events when the agent crashes, while File Channel persists them to disk and survives restarts. The trade-off is that replayed events can become duplicates, but it's still the safer production choice.
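- Implement data-level deduplication: Carry a unique key (like MySQL's primary key) into your Hive table. Hive does not enforce primary keys, so deduplicate when rewriting the data: use MERGE (which requires a transactional ACID table, not the external Parquet table above) or INSERT OVERWRITE with a ROW_NUMBER() window to keep only the latest record per key. Or use a stream processor like Flink to deduplicate events before writing to HDFS.
- Secure the position file: Keep the Taildir position file on a durable, backed-up volume rather than a scratch disk, so it survives a node failure. Taildir writes it through the local filesystem, so an hdfs:// path won't work directly; shared storage has to be mounted (e.g., via NFS).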
- Add a checkpoint table for JDBC Sources: Create a small MySQL table to track the last fetched ID/timestamp. Read this value before each pull, and update it after success to avoid re-pulling old data.
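- Use a middle layer like Kafka: Route Flume events to Kafka first (via the Kafka Sink or Kafka Channel), then use Flink to consume Kafka and write to HDFS. With checkpointing enabled, Flink's exactly-once mode keeps the Kafka-to-HDFS leg free of duplicates, but events that Flume wrote into Kafka twice still need key-based deduplication (e.g., on the MySQL primary key) inside the Flink job.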
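The key-based deduplication described above usually relies on a ROW_NUMBER() window that keeps the newest row per primary key. This minimal Python sketch runs that query on an in-memory sqlite3 database as a stand-in for Hive (it needs SQLite 3.25+ for window functions); the raw_users table and latest_per_key function are hypothetical. In Hive you would wrap the same SELECT in INSERT OVERWRITE TABLE ... to write the deduplicated result.

```python
import sqlite3

# Same ROW_NUMBER() pattern you would use in HiveQL. If two versions of a key
# can share updated_at, add a tie-breaker column (e.g. a binlog position) to ORDER BY.
DEDUP_SQL = """
SELECT id, username, updated_at
FROM (
    SELECT id, username, updated_at,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) AS rn
    FROM raw_users
) t
WHERE rn = 1
ORDER BY id
"""


def latest_per_key(rows: list) -> list:
    """Load raw (possibly duplicated) rows and return the newest row per id."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE raw_users (id INTEGER, username TEXT, updated_at TEXT)")
    conn.executemany("INSERT INTO raw_users VALUES (?, ?, ?)", rows)
    result = conn.execute(DEDUP_SQL).fetchall()
    conn.close()
    return result


if __name__ == "__main__":
    raw = [
        (1, "alice", "2024-01-01 10:00:00"),
        (1, "alice", "2024-01-01 10:00:00"),  # exact replay duplicate
        (2, "bob", "2024-01-01 10:00:00"),
        (2, "bobby", "2024-01-01 10:10:00"),  # later version of id 2
    ]
    print(latest_per_key(raw))
```

This handles both kinds of duplicates from the sections above: exact replays collapse to one row, and multiple versions of the same key resolve to the latest one.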
The question comes from Stack Exchange; asked by Kshitiz Katiyar.




