
How to Implement Brute-Force Login and Port Scan Detection Patterns with Flink CEP

Let's break down how to implement brute-force login and port scan detection using Apache Flink CEP for your log ingestion scenario. Unlike simple constant matches you might have seen before, these detections count events per key (username or source IP) within a time window, which is where Flink CEP is well suited to behavioral threat detection.

1. Brute-Force Login Detection

Requirements

Trigger an alert when the same username generates 5 or more "login failure" events within a 5-minute window.

Implementation Steps & Code

First, define a POJO to model your login events (adjust fields to match your actual log structure):

public class LoginEvent {
    private String username;
    private String eventType; // Expected values: "login failure" / "login success"
    private long timestamp;

    // Constructors, getters, setters
}

Next, build the CEP pattern to track repeated failures for a single user:

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.ArrayList;
import java.util.List;

// where() expects a condition object (SimpleCondition is an abstract class, not a
// functional interface), so define the failure filter once and reuse it
SimpleCondition<LoginEvent> isLoginFailure = new SimpleCondition<LoginEvent>() {
    @Override
    public boolean filter(LoginEvent evt) {
        return "login failure".equals(evt.getEventType());
    }
};

// Build the brute-force detection pattern
Pattern<LoginEvent, LoginEvent> bruteForcePattern = Pattern
    .<LoginEvent>begin("first_failure")
    .where(isLoginFailure)
    .next("subsequent_failures")
    .where(isLoginFailure)
    .timesOrMore(4) // Total failures = 1 (from begin) + 4+ (from next) = 5+
    .consecutive() // Optional: Ensures failures are consecutive (no success events in between)
    .within(Time.minutes(5));

// Apply pattern to your log stream
DataStream<LoginEvent> loginLogStream = ...; // Replace with your source (Kafka, etc.)
PatternStream<LoginEvent> bruteForcePatternStream = CEP.pattern(
    loginLogStream.keyBy(LoginEvent::getUsername), // Group events by username
    bruteForcePattern
);

// Generate alerts from matched patterns
DataStream<String> bruteForceAlerts = bruteForcePatternStream.select(
    (PatternSelectFunction<LoginEvent, String>) pattern -> {
        List<LoginEvent> allFailures = new ArrayList<>(pattern.get("first_failure"));
        allFailures.addAll(pattern.get("subsequent_failures"));
        
        String username = allFailures.get(0).getUsername();
        long start = allFailures.get(0).getTimestamp();
        long end = allFailures.get(allFailures.size() - 1).getTimestamp();
        
        return String.format("[BRUTE-FORCE ALERT] Username '%s' failed to login %d times between %d and %d",
            username, allFailures.size(), start, end);
    }
);

Key Notes

  • keyBy(LoginEvent::getUsername) ensures we only track failures for individual users.
  • timesOrMore(4) accounts for the initial "first_failure" event, so total failures hit 5+.
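  • Remove .consecutive() if you want to count failures even if there are successful logins in between (adjust based on your threat model).
  • Expect more than one alert per burst: with timesOrMore(4), each additional failure can produce another match. An AfterMatchSkipStrategy passed to Pattern.begin(...) can reduce the duplicates.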
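
When testing the job, it can help to have a plain-Java reference for the rule itself to compare alerts against. The sketch below is not Flink code and is not part of the original answer; the class `FailureWindowCounter` and its method are hypothetical names. It assumes timestamps are epoch milliseconds that arrive in non-decreasing order per user, and it counts failures regardless of any successes in between (the behavior you get without .consecutive()).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical reference model of the rule "5 or more failures per user within 5 minutes".
final class FailureWindowCounter {
    private static final long WINDOW_MS = 5 * 60 * 1000L;
    private static final int THRESHOLD = 5;

    // Timestamps of recent failures per user, oldest first.
    private final Map<String, Deque<Long>> failuresByUser = new HashMap<>();

    // Records one failure and returns true if the user now has THRESHOLD or more
    // failures whose timestamps span less than WINDOW_MS.
    boolean recordFailure(String username, long timestampMs) {
        Deque<Long> recent = failuresByUser.computeIfAbsent(username, u -> new ArrayDeque<>());
        recent.addLast(timestampMs);
        // Evict failures that have fallen out of the window ending at this event.
        while (timestampMs - recent.peekFirst() >= WINDOW_MS) {
            recent.removeFirst();
        }
        return recent.size() >= THRESHOLD;
    }
}
```

Feeding five failures for one user one second apart returns true on the fifth call, while failures spaced 100 seconds apart never trip the threshold because at most three fit in any 5-minute span.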

2. Port Scan Detection

Requirements

Trigger an alert when the same source IP accesses 10 or more unique destination ports within a 1-minute window.

Implementation Options

Flink offers two solid approaches here: CEP for pattern tracking, or window aggregation for simpler counting. Let's cover both:

Option 1: Using Flink CEP

First, define your port event POJO:

public class PortAccessEvent {
    private String sourceIp;
    private int destPort;
    private long timestamp;

    // Constructors, getters, setters
}

Build the CEP pattern to track unique ports per IP:

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

Pattern<PortAccessEvent, PortAccessEvent> portScanPattern = Pattern
    .<PortAccessEvent>begin("first_scan")
    .followedBy("subsequent_scans") // Relaxed contiguity: events that fail the condition are skipped
    .where(new IterativeCondition<PortAccessEvent>() {
        @Override
        public boolean filter(PortAccessEvent currentEvent, Context<PortAccessEvent> ctx) throws Exception {
            // Accept the event only if its destination port is new for this partial match
            int port = currentEvent.getDestPort();
            for (PortAccessEvent seen : ctx.getEventsForPattern("first_scan")) {
                if (seen.getDestPort() == port) {
                    return false;
                }
            }
            for (PortAccessEvent seen : ctx.getEventsForPattern("subsequent_scans")) {
                if (seen.getDestPort() == port) {
                    return false;
                }
            }
            return true;
        }
    })
    .timesOrMore(9) // 1 (first_scan) + 9 or more new ports = 10+ unique ports
    .within(Time.minutes(1));

// Apply pattern to your port log stream
DataStream<PortAccessEvent> portLogStream = ...; // Replace with your source
PatternStream<PortAccessEvent> portScanPatternStream = CEP.pattern(
    portLogStream.keyBy(PortAccessEvent::getSourceIp), // Group by source IP
    portScanPattern
);

// Generate alerts
DataStream<String> portScanAlerts = portScanPatternStream.select(
    (PatternSelectFunction<PortAccessEvent, String>) pattern -> {
        List<PortAccessEvent> allScans = new ArrayList<>(pattern.get("first_scan"));
        allScans.addAll(pattern.get("subsequent_scans"));
        
        String sourceIp = allScans.get(0).getSourceIp();
        Set<Integer> uniquePorts = allScans.stream()
            .map(PortAccessEvent::getDestPort)
            .collect(Collectors.toSet());
        long windowStart = allScans.stream().mapToLong(PortAccessEvent::getTimestamp).min().getAsLong();
        
        return String.format("[PORT SCAN ALERT] IP '%s' scanned %d unique ports starting at %d",
            sourceIp, uniquePorts.size(), windowStart);
    }
);

Option 2: Using Window Aggregation (More Efficient for Counting)

For this scenario, window aggregation is often simpler and more performant since we're just counting unique values:

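import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashSet;
import java.util.Set;

// Event-time windows require timestamps and watermarks to be assigned on portLogStream
DataStream<String> portScanAlertsViaWindow = portLogStream
    .keyBy(PortAccessEvent::getSourceIp)
    .window(TumblingEventTimeWindows.of(Time.minutes(1))) // 1-minute tumbling window
    .aggregate(
        // Incrementally collect unique destination ports and emit their count
        new AggregateFunction<PortAccessEvent, Set<Integer>, Integer>() {
            @Override
            public Set<Integer> createAccumulator() {
                return new HashSet<>();
            }

            @Override
            public Set<Integer> add(PortAccessEvent event, Set<Integer> accumulator) {
                accumulator.add(event.getDestPort());
                return accumulator;
            }

            @Override
            public Integer getResult(Set<Integer> accumulator) {
                return accumulator.size();
            }

            @Override
            public Set<Integer> merge(Set<Integer> a, Set<Integer> b) {
                a.addAll(b);
                return a;
            }
        },
        // Emit an alert only when the threshold is reached, instead of emitting null
        // records and filtering them out; this also gives access to the source IP (the key)
        new ProcessWindowFunction<Integer, String, String, TimeWindow>() {
            @Override
            public void process(String sourceIp, Context ctx, Iterable<Integer> counts, Collector<String> out) {
                int uniquePorts = counts.iterator().next();
                if (uniquePorts >= 10) {
                    out.collect(String.format(
                        "[PORT SCAN ALERT] IP '%s' scanned %d unique ports in the 1-minute window starting at %d",
                        sourceIp, uniquePorts, ctx.window().getStart()));
                }
            }
        });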

Key Notes

  • Use CEP if you need to add more complex logic later (e.g., specific port sequences).
  • Use window aggregation for a lightweight, efficient solution focused solely on unique port counting.
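
One caveat with the tumbling window in Option 2: windows are aligned to fixed boundaries, so a scan that straddles a boundary is split across two windows and can stay under the threshold in each. The sketch below is a plain-Java illustration of that effect, not Flink code; `TumblingWindowDemo` and its methods are hypothetical names, and it assumes non-negative millisecond timestamps with the default alignment (no window offset).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of how 1-minute tumbling windows split a burst of events.
final class TumblingWindowDemo {
    static final long WINDOW_MS = 60_000L;

    // Start of the tumbling window that contains a non-negative timestamp (no offset).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Counts unique destination ports per window start for a single source IP.
    static Map<Long, Integer> uniquePortsPerWindow(long[] timestampsMs, int[] ports) {
        Map<Long, Set<Integer>> portsByWindow = new HashMap<>();
        for (int i = 0; i < timestampsMs.length; i++) {
            portsByWindow
                .computeIfAbsent(windowStart(timestampsMs[i]), w -> new HashSet<>())
                .add(ports[i]);
        }
        Map<Long, Integer> counts = new HashMap<>();
        portsByWindow.forEach((start, set) -> counts.put(start, set.size()));
        return counts;
    }
}
```

For example, twelve distinct ports probed one second apart from t = 54 s to t = 65 s land six in the window starting at 0 ms and six in the window starting at 60000 ms, so neither window reaches 10 even though the scan covers 12 ports in 12 seconds. If that gap matters for your threat model, a sliding window (for example SlidingEventTimeWindows) or the CEP pattern from Option 1 narrows it, at the cost of more state.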

The question in this article originally comes from Stack Exchange; the asker is JDForLife.
