如何使用Event Stream Rules计算3事件滑动窗口平均值并生成事实?
Drools 实现滑动窗口计算3次事件平均值示例
没问题,我给你整理一个完整的示例,完全匹配你的需求——监听Integer类型的事件流,用滑动窗口捕获最近3个事件,计算平均值后通过insertLogical插入为逻辑事实,这样当窗口内的事件变化时,旧的平均值会自动被移除,新的平均值会实时更新。
第一步:定义承载平均值的事实类
首先我们需要一个简单的类来存储计算出的平均值:
public class AverageFact { private double average; public AverageFact(double average) { this.average = average; } // getter和toString方法 public double getAverage() { return average; } @Override public String toString() { return "AverageFact{average=" + average + "}"; } }
第二步:编写Drools规则文件(.drl)
这个规则会监听名为Stream的入口点,使用滑动窗口捕获最近3个Integer事件,计算平均值后插入逻辑事实:
package com.example.drools; import com.example.drools.AverageFact; // 声明我们要监听的入口点 declare entry-point Stream end rule "Calculate 3-event sliding window average" // 监听Stream入口点的Integer事件,取最近3个(滑动窗口) when accumulate( Integer($num: this) from entry-point "Stream" window:size(3), $sum: sum($num), $count: count($num), // 确保窗口里有至少1个事件才计算 eval($count > 0) ) // 计算平均值 $average: double(doubleValue($sum / $count)) then // 插入逻辑事实:当窗口内的事件变化时,旧的AverageFact会自动被移除 insertLogical(new AverageFact($average)); // 打印日志验证结果 System.out.println("计算出的平均值:" + $average); end // 可选:添加一个规则来验证AverageFact的存在和更新 rule "Log AverageFact changes" when $avgFact: AverageFact() then System.out.println("当前有效的平均值事实:" + $avgFact); end
第三步:编写测试代码
接下来写一个简单的测试类,模拟向入口点插入Integer事件,触发规则:
import org.kie.api.KieServices; import org.kie.api.runtime.KieContainer; import org.kie.api.runtime.KieSession; import org.kie.api.runtime.rule.EntryPoint; public class DemoAppTest { public static void main(String[] args) { // 初始化Kie容器和会话 KieServices kieServices = KieServices.Factory.get(); KieContainer kieContainer = kieServices.getKieClasspathContainer(); KieSession kieSession = kieContainer.newKieSession(); // 获取我们定义的Stream入口点 EntryPoint streamEntryPoint = kieSession.getEntryPoint("Stream"); // 插入测试事件,观察平均值变化 streamEntryPoint.insert(10); kieSession.fireAllRules(); // 窗口有1个事件,平均值10.0 streamEntryPoint.insert(20); kieSession.fireAllRules(); // 窗口有2个事件,平均值15.0 streamEntryPoint.insert(30); kieSession.fireAllRules(); // 窗口有3个事件,平均值20.0 streamEntryPoint.insert(40); kieSession.fireAllRules(); // 窗口滑动,移除10,加入40,平均值30.0 // 关闭会话 kieSession.dispose(); } }
关键部分解释
- 滑动窗口
window:size(3):只会保留最近的3个Integer事件,当第4个事件插入时,最早的那个事件会被移出窗口。 accumulate函数:用来聚合窗口内的事件,这里计算了总和(sum)和事件数量(count),进而得到平均值。insertLogical:插入的是逻辑事实,当触发它的原始事实(窗口内的Integer事件)被移除时,这个AverageFact会自动从工作内存中删除,完美适配滑动窗口的动态变化场景。
运行测试代码后,你会看到控制台输出每次事件插入后的平均值,以及对应的AverageFact更新。
内容的提问来源于stack exchange,提问作者Quellist




