You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Redisearch聚合查询:按标签分组取Top5产品的实现问询

问题分析

你当前的存储结构(每个产品+标签组合存为单文档)能很好地计算标签总分,但Redisearch的FT.AGGREGATE目前没有内置的分组内TopN聚合函数,所以单条聚合查询确实没法直接获取每个标签对应的Top5产品——REDUCE TOLIST会返回该标签下所有未排序的产品,FIRST_VALUE只能取到排序后的第一个结果,都满足不了需求。

下面从现有结构优化和调整存储结构两个方向,给出可行的解决方案:


方案一:基于现有存储结构优化查询流程

虽然没法单查询完成,但可以通过批量查询+Pipeline减少网络开销,高效实现需求:

  1. 第一步:获取标签总分排序
    先执行你现有的聚合查询,拿到按总分降序排列的所有标签列表:
    from redisearch import Client, AggregateRequest, reducers, SortBy
    
    client = Client("product_tags")
    req = AggregateRequest("*") \
        .group_by("@tag", reducers.sum("@score").alias("total_score")) \
        .sort_by(SortBy("@total_score", desc=True))
    tag_scores = client.aggregate(req).rows
    # 得到格式类似 [["tag3", "110"], ["tag2", "110"], ["tag4", "100"], ["tag1", "10"]] 的结果
    
  2. 第二步:批量获取每个标签的Top5产品
    用Redis的Pipeline批量执行查询,避免逐个请求的网络延迟:
    pipe = client.redis.pipeline()
    for tag, _ in tag_scores:
        # 对每个标签执行排序查询,取Top5产品
        pipe.execute_command(
            "FT.SEARCH", "product_tags", f"@tag:{tag}",
            "SORTBY", "score", "DESC", "LIMIT", 0, 5, "RETURN", 1, "product_name"
        )
    top_products_results = pipe.execute()
    
    # 合并结果
    final_result = []
    for (tag, total_score), products_res in zip(tag_scores, top_products_results):
        # 解析FT.SEARCH的返回结果,提取产品名
        product_names = [res[1] for res in products_res[1::2]]
        final_result.append([tag, int(total_score), product_names])
    
    这种方式的优势是不需要修改现有存储,只需要优化查询逻辑,用Pipeline把多次请求合并成一次网络往返,性能提升明显。

方案二:调整存储结构,实现单查询获取结果

如果想要彻底实现单查询满足需求,可以调整存储方式,把每个标签作为独立文档,同时维护标签的总分和Top5产品列表:

  1. 新存储结构
    创建一个新的索引(比如tag_summary),文档ID用tag:{tag_name},字段定义:
    from redisearch import TextField, NumericField
    
    summary_client = Client("tag_summary")
    summary_client.create_index([
        TextField("tag"),
        NumericField("total_score"),
        TextField("top_products")  # 用逗号分隔的产品名字符串,或用JSON类型存有序数组
    ])
    
  2. 数据维护逻辑
    当新增/更新/删除产品时,需要遍历产品的所有标签,更新对应标签的文档:
    • 新增产品:对每个标签,累加total_score,并将产品插入到top_products的正确位置(按score降序),只保留前5个。
    • 更新产品:先减去旧score,再加上新score,然后调整top_products列表。
    • 删除产品:减去对应score,并从top_products中移除该产品。
      推荐用Redis的JSON类型存储top_products,这样维护有序列表会更方便。
  3. 查询逻辑
    直接执行排序查询即可得到所有标签的总分和Top5产品:
    req = AggregateRequest("*") \
        .sort_by(SortBy("@total_score", desc=True)) \
        .return_fields("@tag", "@total_score", "@top_products")
    final_result = summary_client.aggregate(req).rows
    

方案三:用Redis Sorted Set辅助(推荐)

这是一种折中的方案,结合Redisearch和Sorted Set的优势,既方便维护,又能高效查询:

  1. 存储结构
    • 保留你现有的product_tags索引,用于计算标签总分。
    • 对每个标签,创建一个Sorted Set,key为tag_top_products:{tag_name},成员是产品名,score为产品的score值。
  2. 数据维护
    新增/更新产品时,对每个标签执行:
    # 示例:产品product1,标签tag1,score10
    client.redis.zadd(f"tag_top_products:tag1", {"product1": 10})
    
    删除产品时,执行:
    client.redis.zrem(f"tag_top_products:tag1", "product1")
    
  3. 查询流程
    1. 用你现有的聚合查询获取标签总分排序。
    2. 用Pipeline批量执行ZREVRANGE获取每个标签的Top5产品:
      pipe = client.redis.pipeline()
      for tag, _ in tag_scores:
          pipe.zrevrange(f"tag_top_products:{tag}", 0, 4)
      top_products_list = pipe.execute()
      
      # 合并结果
      final_result = [
          [tag, int(total_score), products]
          for (tag, total_score), products in zip(tag_scores, top_products_list)
      ]
      
    这种方式的优势是Sorted Set会自动维护产品的排序,ZREVRANGE可以直接取TopN,性能非常高,而且数据维护逻辑比手动维护列表简单得多。

内容的提问来源于stack exchange,提问作者user12177990

火山引擎 最新活动