You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

关于Storm IEventLogger在自定义Java应用中使用的技术问询

关于Storm IEventLogger的使用经验分享

我之前在Storm项目里用过IEventLogger做数据库事件记录,刚好能解答你的几个疑问~

一、怎么在自定义Java应用里用IEventLogger?

其实步骤很清晰,核心就是自定义实现这个接口,然后配置到拓扑里:

  • 首先你得写一个类实现IEventLogger接口,重写三个关键方法:
    • prepare():用来初始化资源(比如数据库连接池),这里就能拿到Storm的配置和拓扑上下文
    • log():写具体的日志写入逻辑,比如把事件数据插入数据库
    • close():任务结束时释放资源,比如关闭数据库连接池
  • 然后在提交拓扑的时候,把自定义的日志类配置到Storm的Config里:
    Config conf = new Config();
    // 指定你的自定义事件日志实现类
    conf.put(Config.TOPOLOGY_EVENT_LOGGER_CLASS, YourCustomDbEventLogger.class);
    // 提交拓扑
    StormSubmitter.submitTopology("your-topology-name", conf, yourTopologyBuilder.createTopology());
    

二、线程模型:是单独开线程还是复用业务线程?

这点可以放心,Storm会给每个worker单独启动一个专属的事件日志线程,完全和业务线程(bolt/spout的处理线程)隔离。也就是说你在log()里写的数据库操作,不会阻塞你的业务逻辑处理,避免日志拖慢拓扑性能。

三、怎么获取/设置Storm配置和TopologyContext?

这俩东西在prepare()方法里直接就能拿到,不用额外折腾:

  • prepare()的方法签名,参数就是Map<String, Object> conf(Storm的配置集合)和TopologyContext context(拓扑上下文):
    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context) {
        // 从配置里拿自定义的参数,比如数据库地址
        String dbUrl = (String) conf.get("custom.db.url");
        // 从TopologyContext拿拓扑相关信息,比如当前worker的ID
        String workerId = context.getWorkerId();
        // 这里初始化数据库连接池之类的资源就好
    }
    
  • 如果要加自定义配置项,直接在提交拓扑的Config里put键值对就行,比如上面代码里的custom.db.url,提前在conf里设置好就行。

四、额外提醒几个注意点

  • 因为是独立线程处理日志,要注意log()方法里的线程安全问题,比如数据库连接的复用要保证线程安全,别出现连接泄露或者并发冲突
  • Storm默认不会重试失败的日志写入,你可以在自定义实现里加重试逻辑,或者用消息队列做一层缓冲,避免日志数据丢失
  • 官方文档里提到的一些配置项也可以用上,比如TOPOLOGY_EVENT_LOGGER_EXECUTOR_BUFFER_SIZE,可以调整日志缓冲区的大小,优化批量写入的性能

内容的提问来源于stack exchange,提问作者user5730669

火山引擎 最新活动