How to Implement Brute-Force Login and Port Scan Detection Patterns with Flink CEP
Hey there, let's break down how to implement brute-force login and port scan detection using Apache Flink CEP for your log ingestion scenario. Unlike simple constant matches you might have seen before, these rely on grouped, time-windowed pattern counting—which is exactly where Flink CEP shines for behavioral threat detection.
Part 1: Brute-Force Login Detection
Requirements
Trigger an alert when the same username generates 5 or more "login failure" events within a 5-minute window.
Implementation Steps & Code
First, define a POJO to model your login events (adjust fields to match your actual log structure):
public class LoginEvent {
    private String username;
    private String eventType; // Expected values: "login failure" / "login success"
    private long timestamp;
    // Public no-arg constructor, getters, setters (required for Flink's POJO serializer)
}
Next, build the CEP pattern to track repeated failures for a single user:
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.ArrayList;
import java.util.List;

// Build the brute-force detection pattern.
// where() expects an IterativeCondition (an abstract class), so a bare lambda does not compile;
// SimpleCondition.of(...) wraps one (Flink 1.16+; older versions need an anonymous SimpleCondition).
Pattern<LoginEvent, LoginEvent> bruteForcePattern = Pattern
    .<LoginEvent>begin("first_failure", AfterMatchSkipStrategy.skipPastLastEvent()) // One alert per burst
    .where(SimpleCondition.of(evt -> "login failure".equals(evt.getEventType())))
    .next("subsequent_failures") // Strict: the 2nd failure must directly follow the 1st
    .where(SimpleCondition.of(evt -> "login failure".equals(evt.getEventType())))
    .timesOrMore(4) // Total failures = 1 (from begin) + 4+ (from next) = 5+
    .consecutive()  // Optional: failures must be consecutive (no success events in between)
    .within(Time.minutes(5));

// Apply pattern to your log stream
DataStream<LoginEvent> loginLogStream = ...; // Replace with your source (Kafka, etc.); CEP uses event time by default, so assign timestamps/watermarks
PatternStream<LoginEvent> bruteForcePatternStream = CEP.pattern(
    loginLogStream.keyBy(LoginEvent::getUsername), // Group events by username
    bruteForcePattern
);

// Generate alerts from matched patterns
DataStream<String> bruteForceAlerts = bruteForcePatternStream.select(
    (PatternSelectFunction<LoginEvent, String>) pattern -> {
        List<LoginEvent> allFailures = new ArrayList<>(pattern.get("first_failure"));
        allFailures.addAll(pattern.get("subsequent_failures"));
        String username = allFailures.get(0).getUsername();
        long start = allFailures.get(0).getTimestamp();
        long end = allFailures.get(allFailures.size() - 1).getTimestamp();
        return String.format("[BRUTE-FORCE ALERT] Username '%s' failed to login %d times between %d and %d",
            username, allFailures.size(), start, end);
    }
);
Key Notes
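- keyBy(LoginEvent::getUsername) partitions the stream by user, so each username gets its own pattern state and failures are never counted across users.
- timesOrMore(4) accounts for the initial "first_failure" event, so the total number of failures reaches 5 or more.
- Consider passing an after-match skip strategy such as AfterMatchSkipStrategy.skipPastLastEvent() to begin(). With the default (no skip), a burst of 7 failures yields several overlapping matches, and therefore several alerts.
- .consecutive() makes the failures strictly contiguous per user, so a successful login in between breaks the run. To count failures even with successful logins in between, remove .consecutive() and also change .next("subsequent_failures") to .followedBy("subsequent_failures"), because next() still requires the second failure to directly follow the first. Choose based on your threat model.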
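To sanity-check the threshold and the effect of strict contiguity before deploying, it can help to model the rule in plain Java. The sketch below is a hypothetical, stdlib-only approximation (BruteForceSimulator and onEvent are illustrative names, not Flink API). It assumes events arrive in timestamp order per user, treats any event type other than "login failure" as a success, and clears the count after each alert, roughly like a skip-past-last-event strategy. It does not reproduce every detail of Flink's NFA.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, stdlib-only model of the brute-force rule; not Flink API.
final class BruteForceSimulator {
    static final long WINDOW_MS = 5 * 60 * 1000L; // 5-minute window
    static final int THRESHOLD = 5;               // failures needed for an alert

    private final boolean consecutive;
    // Per username: timestamps of failures in the current candidate run, oldest first
    private final Map<String, Deque<Long>> failures = new HashMap<>();

    BruteForceSimulator(boolean consecutive) {
        this.consecutive = consecutive;
    }

    /** Feeds one event (assumed in timestamp order per user); returns true if it triggers an alert. */
    boolean onEvent(String username, String eventType, long timestamp) {
        Deque<Long> run = failures.computeIfAbsent(username, u -> new ArrayDeque<>());
        if (!"login failure".equals(eventType)) {
            if (consecutive) {
                run.clear(); // strict mode: any non-failure event breaks the run
            }
            return false;
        }
        run.addLast(timestamp);
        // Evict failures that can no longer share a 5-minute span with this one
        while (timestamp - run.peekFirst() >= WINDOW_MS) {
            run.pollFirst();
        }
        if (run.size() >= THRESHOLD) {
            run.clear(); // start afresh after an alert, like skipPastLastEvent()
            return true;
        }
        return false;
    }
}
```

In this model, four failures, a success, and four more failures produce no alert in strict mode and one alert in relaxed mode, which mirrors the choice of whether to keep .consecutive().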
Part 2: Port Scan Detection
Requirements
Trigger an alert when the same source IP accesses 10 or more unique destination ports within a 1-minute window.
Implementation Options
Flink offers two solid approaches here—CEP for pattern tracking, or window aggregation for simpler counting. Let's cover both:
Option 1: Using Flink CEP
First, define your port event POJO:
public class PortAccessEvent {
    private String sourceIp;
    private int destPort;
    private long timestamp;
    // Public no-arg constructor, getters, setters (required for Flink's POJO serializer)
}
Build the CEP pattern to track unique ports per IP:
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// first_scan accepts any event; subsequent_scans then takes 9 more events with NEW ports,
// so every match contains exactly 10 distinct destination ports.
Pattern<PortAccessEvent, PortAccessEvent> portScanPattern = Pattern
    .<PortAccessEvent>begin("first_scan", AfterMatchSkipStrategy.skipPastLastEvent()) // One alert per scan burst
    .followedBy("subsequent_scans") // Relaxed contiguity: repeated ports are skipped, not fatal
    .where(new IterativeCondition<PortAccessEvent>() {
        @Override
        public boolean filter(PortAccessEvent currentEvent, Context<PortAccessEvent> ctx) throws Exception {
            // Accept the event only if its port is not already part of this partial match.
            // getEventsForPattern() returns an Iterable, not a Collection.
            for (PortAccessEvent e : ctx.getEventsForPattern("first_scan")) {
                if (e.getDestPort() == currentEvent.getDestPort()) return false;
            }
            for (PortAccessEvent e : ctx.getEventsForPattern("subsequent_scans")) {
                if (e.getDestPort() == currentEvent.getDestPort()) return false;
            }
            return true;
        }
    })
    .times(9) // 1 (first_scan) + 9 = 10 unique ports
    .within(Time.minutes(1));

// Apply pattern to your port log stream
DataStream<PortAccessEvent> portLogStream = ...; // Replace with your source
PatternStream<PortAccessEvent> portScanPatternStream = CEP.pattern(
    portLogStream.keyBy(PortAccessEvent::getSourceIp), // Group by source IP
    portScanPattern
);

// Generate alerts
DataStream<String> portScanAlerts = portScanPatternStream.select(
    (PatternSelectFunction<PortAccessEvent, String>) pattern -> {
        List<PortAccessEvent> allScans = new ArrayList<>(pattern.get("first_scan"));
        allScans.addAll(pattern.get("subsequent_scans"));
        String sourceIp = allScans.get(0).getSourceIp();
        Set<Integer> uniquePorts = allScans.stream()
            .map(PortAccessEvent::getDestPort)
            .collect(Collectors.toSet());
        long windowStart = allScans.stream().mapToLong(PortAccessEvent::getTimestamp).min().getAsLong();
        return String.format("[PORT SCAN ALERT] IP '%s' scanned %d unique ports starting at %d",
            sourceIp, uniquePorts.size(), windowStart);
    }
);
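The CEP condition above is easy to get subtly wrong, so a plain-Java model of the intended rule can serve as a reference in unit tests. This is a hypothetical, stdlib-only sketch (PortScanSimulator and onEvent are illustrative names, not Flink API). It assumes events arrive in timestamp order per source IP, counts distinct ports among that IP's accesses from the last minute, and clears its state after an alert. It approximates, rather than reproduces, how CEP's within() bounds a match.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, stdlib-only model of the port-scan rule; not Flink API.
final class PortScanSimulator {
    static final long WINDOW_MS = 60_000L; // 1-minute window
    static final int THRESHOLD = 10;       // distinct destination ports

    private static final class Access {
        final long timestamp;
        final int port;

        Access(long timestamp, int port) {
            this.timestamp = timestamp;
            this.port = port;
        }
    }

    // Per source IP: accesses still inside the window, oldest first
    private final Map<String, Deque<Access>> recent = new HashMap<>();

    /** Feeds one event (assumed in timestamp order per IP); returns true if it triggers an alert. */
    boolean onEvent(String sourceIp, int destPort, long timestamp) {
        Deque<Access> window = recent.computeIfAbsent(sourceIp, ip -> new ArrayDeque<>());
        window.addLast(new Access(timestamp, destPort));
        // Evict accesses that can no longer share a 1-minute span with this one
        while (timestamp - window.peekFirst().timestamp >= WINDOW_MS) {
            window.pollFirst();
        }
        Set<Integer> ports = new HashSet<>();
        for (Access a : window) {
            ports.add(a.port);
        }
        if (ports.size() >= THRESHOLD) {
            window.clear(); // start afresh after an alert, like skipPastLastEvent()
            return true;
        }
        return false;
    }
}
```

Repeated hits on one port never raise an alert here, and neither do ten ports spread over more than a minute; both are cases worth covering in tests of the real Flink job.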
Option 2: Using Window Aggregation (More Efficient for Counting)
For this scenario, window aggregation is often simpler and more performant since we're just counting unique values:
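import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashSet;
import java.util.Set;

DataStream<String> portScanAlertsViaWindow = portLogStream
    .keyBy(PortAccessEvent::getSourceIp)
    .window(TumblingEventTimeWindows.of(Time.minutes(1))) // 1-minute tumbling window (needs watermarks)
    .aggregate(
        // Incrementally collect distinct ports; only the set is kept in window state
        new AggregateFunction<PortAccessEvent, Set<Integer>, Integer>() {
            @Override
            public Set<Integer> createAccumulator() { return new HashSet<>(); }

            @Override
            public Set<Integer> add(PortAccessEvent event, Set<Integer> accumulator) {
                accumulator.add(event.getDestPort());
                return accumulator;
            }

            @Override
            public Integer getResult(Set<Integer> accumulator) { return accumulator.size(); }

            @Override
            public Set<Integer> merge(Set<Integer> a, Set<Integer> b) {
                a.addAll(b);
                return a;
            }
        },
        // Gets the pre-aggregated count plus the key (source IP) and window,
        // and emits only real alerts, so no null records flow downstream
        new ProcessWindowFunction<Integer, String, String, TimeWindow>() {
            @Override
            public void process(String sourceIp, Context context, Iterable<Integer> counts, Collector<String> out) {
                int uniquePorts = counts.iterator().next();
                if (uniquePorts >= 10) {
                    out.collect(String.format("[PORT SCAN ALERT] IP '%s' scanned %d unique ports in window [%d, %d)",
                        sourceIp, uniquePorts, context.window().getStart(), context.window().getEnd()));
                }
            }
        });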
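One trade-off of the tumbling window: windows are aligned to fixed minute boundaries, so a scan that straddles a boundary can be split across two windows and stay below the threshold in both. Flink's SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)) reduces this, at the cost of assigning each event to size/slide = 6 windows (and possibly repeating alerts for overlapping windows). The stdlib-only sketch below (WindowBoundaryDemo is a hypothetical name, not Flink API) mirrors the window-start arithmetic of Flink's assigners, assuming non-negative timestamps and a zero offset, and applies it to a 10-port burst spread from 55 s to 64 s.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, stdlib-only illustration; not Flink API.
// Assumes non-negative timestamps and a window offset of 0.
final class WindowBoundaryDemo {
    /** Start of the slide-aligned bucket that contains timestamp. */
    static long alignedStart(long timestamp, long slide) {
        return timestamp - (timestamp % slide);
    }

    /** Starts of all windows of length size, advancing by slide, that contain timestamp.
     *  With slide == size (tumbling) this yields exactly one window. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        for (long start = alignedStart(timestamp, slide); start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** Largest number of distinct ports that fall into any single window. */
    static int maxDistinctPorts(long[] timestamps, int[] ports, long size, long slide) {
        Map<Long, Set<Integer>> portsPerWindow = new HashMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            for (long start : windowStarts(timestamps[i], size, slide)) {
                portsPerWindow.computeIfAbsent(start, s -> new HashSet<>()).add(ports[i]);
            }
        }
        int max = 0;
        for (Set<Integer> windowPorts : portsPerWindow.values()) {
            max = Math.max(max, windowPorts.size());
        }
        return max;
    }
}
```

For that burst, the tumbling layout (size = slide = 60 s) peaks at 5 distinct ports per window, while the sliding layout (size 60 s, slide 10 s) has a window holding all 10. A sliding window only guarantees catching bursts no longer than size minus slide, so it narrows the gap rather than closing it.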
Key Notes
- Use CEP if you need to add more complex logic later (e.g., specific port sequences).
- Use window aggregation for a lightweight, efficient solution focused solely on unique port counting.
The question was originally posted on Stack Exchange by JDForLife.




