关于Storm IEventLogger在自定义Java应用中使用的技术问询
关于Storm IEventLogger的使用经验分享
我之前在Storm项目里用过IEventLogger做数据库事件记录,刚好能解答你的几个疑问~
一、怎么在自定义Java应用里用IEventLogger?
其实步骤很清晰,核心就是自定义实现这个接口,然后配置到拓扑里:
- 首先你得写一个类实现
IEventLogger接口,重写三个关键方法:prepare():用来初始化资源(比如数据库连接池),这里就能拿到Storm的配置和拓扑上下文log():写具体的日志写入逻辑,比如把事件数据插入数据库close():任务结束时释放资源,比如关闭数据库连接池
- 然后在提交拓扑的时候,把自定义的日志类配置到Storm的
Config里:Config conf = new Config(); // 指定你的自定义事件日志实现类 conf.put(Config.TOPOLOGY_EVENT_LOGGER_CLASS, YourCustomDbEventLogger.class); // 提交拓扑 StormSubmitter.submitTopology("your-topology-name", conf, yourTopologyBuilder.createTopology());
二、线程模型:是单独开线程还是复用业务线程?
这点可以放心,Storm会给每个worker单独启动一个专属的事件日志线程,完全和业务线程(bolt/spout的处理线程)隔离。也就是说你在log()里写的数据库操作,不会阻塞你的业务逻辑处理,避免日志拖慢拓扑性能。
三、怎么获取/设置Storm配置和TopologyContext?
这俩东西在prepare()方法里直接就能拿到,不用额外折腾:
- 看
prepare()的方法签名,参数就是Map<String, Object> conf(Storm的配置集合)和TopologyContext context(拓扑上下文):@Override public void prepare(Map<String, Object> conf, TopologyContext context) { // 从配置里拿自定义的参数,比如数据库地址 String dbUrl = (String) conf.get("custom.db.url"); // 从TopologyContext拿拓扑相关信息,比如当前worker的ID String workerId = context.getWorkerId(); // 这里初始化数据库连接池之类的资源就好 } - 如果要加自定义配置项,直接在提交拓扑的
Config里put键值对就行,比如上面代码里的custom.db.url,提前在conf里设置好就行。
四、额外提醒几个注意点
- 因为是独立线程处理日志,要注意
log()方法里的线程安全问题,比如数据库连接的复用要保证线程安全,别出现连接泄露或者并发冲突 - Storm默认不会重试失败的日志写入,你可以在自定义实现里加重试逻辑,或者用消息队列做一层缓冲,避免日志数据丢失
- 官方文档里提到的一些配置项也可以用上,比如
TOPOLOGY_EVENT_LOGGER_EXECUTOR_BUFFER_SIZE,可以调整日志缓冲区的大小,优化批量写入的性能
内容的提问来源于stack exchange,提问作者user5730669




