本文介绍如何使用 时间压缩算子(TIME_COMPRESS) 实现记忆的时间分层管理,使系统能够根据时间范围自动调整记忆粒度,保持近期记忆清晰、长期记忆简化。
时间范围 | 聚合粒度 | 保留内容 |
|---|---|---|
最近 1 周 | 日级聚合 | 保留每日细节 |
最近 1 个月 | 周级聚合 | 合并每周代表事件 |
最近 1 年 | 月级聚合 | 提取月度摘要 |
超过 1 年 | 年级聚合 | 每年仅留一条代表记忆 |
说明:TIME_COMPRESS 算子会在后台自动执行此聚合逻辑。
您可以通过控制台或数据面 API 创建带有 TIME_COMPRESS 聚合规则的记忆库,该规则用于自动按时间层级压缩事件摘要。
创建一个名为 time_compress_demo 的记忆库。
请参考下图,在记忆抽取配置中编辑事件规则,以确保记忆库能够准确捕获用户交互数据。
在记忆抽取配置中,请参考下图编辑画像规则。添加“event_history”字段时,点击更多配置,将关联事件 event_v1 中的 summary 作为聚合对象,并分别选择 TIME_COMPRESS 算子作为聚合操作符。
配置好事件和画像抽取规则后,点击“创建记忆库”即可完成创建。
通过 AddSession 接口将多时段的用户事件写入记忆库,系统会在后续压缩阶段自动执行时间聚合。
import os from volcengine.base.Request import Request from volcengine.Credentials import Credentials from volcengine.auth.SignerV4 import SignerV4 import requests import json import time AK=os.getenv("VIKINGDB_AK", "your_ak") SK=os.getenv("VIKINGDB_SK", "your_sk") Domain = "api-knowledgebase.mlp.cn-beijing.volces.com" def prepare_request(method, path, ak, sk, data=None): r = Request() r.set_shema("http") r.set_method(method) r.set_host(Domain) r.set_path(path) if data is not None: r.set_body(json.dumps(data)) credentials = Credentials(ak, sk, 'air', 'cn-north-1') SignerV4.sign(r, credentials) return r def internal_request(method, api, payload, params=None): req = prepare_request( method = method, path = api, ak = AK, sk = SK, data = payload) r = requests.request(method=req.method, url="{}://{}{}".format(req.schema, req.host, req.path), headers=req.headers, data=req.body, params=params, ) return r import datetime def ts_ms(y, m, d): return int(datetime.datetime(y, m, d, 10, 0, 0).timestamp() * 1000) # ==== mock 听歌兴趣演进数据 ==== entries = [ { "time": ts_ms(2022, 3, 1), "user": "最近开始喜欢听周杰伦的老歌,比如《晴天》和《七里香》。", "assistant": "怀旧风不错~旋律真的耐听。" }, { "time": ts_ms(2023, 5, 20), "user": "这段时间主要听欧美流行,比如 Taylor Swift 和 Ed Sheeran。", "assistant": "看来你喜欢节奏感更强的流行乐了~" }, { "time": ts_ms(2023, 5, 21), "user": "最近又听了Taylor Swift 和 Ed Sheeran的歌", "assistant": "真不错呀,你很喜欢他们了" }, { "time": ts_ms(2023, 5, 29), "user": "我最近去野营了", "assistant": "野营很刺激,也很有挑战性。" }, { "time": ts_ms(2024, 11, 30), "user": "入坑 City Pop,最喜欢竹内玛莉亚的《Plastic Love》。", "assistant": "City Pop 很抓耳,也很适合通勤。" }, { "time": ts_ms(2025, 2, 10), "user": "迷上独立音乐,比如房东的猫和落日飞车。", "assistant": "独立音乐更有个人风格。" }, { "time": ts_ms(2025, 8, 20), "user": "夏天更喜欢清爽的 Bossa Nova。", "assistant": "轻快又放松~" } ] path = "/api/memory/session/add" success, fail = 0, 0 for i, e in enumerate(entries, 1): # session_id 每次不同:可用时间戳或索引生成 session_id = f"music_timeline_{int(time.time() * 1000)}_{i}" payload = { "collection_name": "time_compress_demo", "session_id": session_id, "messages": [ {"role": "user", "content": e["user"], "role_name": "jiajie", "time": e["time"]}, {"role": "assistant", "content": e["assistant"], "role_name": "robot", "time": e["time"]}, ], "metadata": { "default_user_id": "user", "default_assistant_id": "robot", "time": e["time"], } } try: rsp = internal_request("POST", path, payload) ts_readable = datetime.datetime.fromtimestamp(e["time"] / 1000) print(f"[{i:02d}] {session_id} | time={ts_readable} | status={rsp.status_code}") try: print(" response:", rsp.json()) except Exception: print(" text:", rsp.text[:500]) if 200 <= rsp.status_code < 300: success += 1 else: fail += 1 except Exception as ex: print(f"[{i:02d}] ERROR for {session_id}: {ex}") fail += 1 if i < 10: time.sleep(60) # 防止过快请求 print(f"\n Done. Success={success}, Fail={fail}")
通过 SearchMemory 接口查询压缩后的聚合记忆,系统将返回压缩后记忆的结果。
import os import requests import json from volcengine.base.Request import Request from volcengine.Credentials import Credentials from volcengine.auth.SignerV4 import SignerV4 AK=os.getenv("VIKINGDB_AK", "your_ak") SK=os.getenv("VIKINGDB_SK", "your_sk") Domain = "api-knowledgebase.mlp.cn-beijing.volces.com" def prepare_request(method, path, ak, sk, data=None): r = Request() r.set_shema("http") r.set_method(method) r.set_host(Domain) r.set_path(path) if data is not None: r.set_body(json.dumps(data)) credentials = Credentials(ak, sk, 'air', 'cn-north-1') SignerV4.sign(r, credentials) return r def internal_request(method, api, payload, params=None): req = prepare_request( method = method, path = api, ak = AK, sk = SK, data = payload) r = requests.request(method=req.method, url="{}://{}{}".format(req.schema, req.host, req.path), headers=req.headers, data=req.body, params=params, ) return r path = "/api/memory/search" playload = { "collection_name": "time_compress_demo", "filter": { "user_id": "user", "memory_type": ["sum_entity"] } } rsp = internal_request('POST', path, playload) print(rsp.json())
在控制台中,可以看到事件已按照时间压缩规则自动聚合并生成摘要。