Conversation


@hijzy hijzy commented Jan 26, 2026

Description

Currently, memory retrieval may recall the same fact or topic multiple times. Typical sources include:

1. Redundant records caused by repeated synchronization
2. Snapshot writes of the same event from multiple sources
3. Users mentioning the same content multiple times, leading to the same memory being summarized repeatedly

Under queries with no explicit constraints, these sources create systemic issues: a redundant candidate set, reduced information density, and downstream generation that is more easily skewed by repeated information.

This optimization focuses on deduplicating similar memories, reducing semantic redundancy within the candidate set and improving diversity and effective information density.
Given a query, the retrieval system returns a set of candidate memories (including relevance scores, embeddings, etc.). The goal is to select the Top-K results while maintaining overall relevance, minimizing semantic duplication within Top-K, and preserving coverage of useful information.

After search results are returned, an internal MMR (Maximal Marginal Relevance) deduplication function is invoked to perform subset selection and re-ranking. It balances relevance and diversity, while preventing the diversity penalty term from overwhelming the relevance term when relevance is in the long tail (close to zero), which could otherwise cause the selected set to drift away from the query intent.
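For illustration only, here is a minimal sketch of such an MMR scoring rule. This is not the code in this PR; the λ value, cosine similarity, and the relevance-scaled penalty guard are assumed choices:

```python
import numpy as np


def _cos(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity between two embedding vectors."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-12))


def mmr_select(
    candidates: list[dict],          # each: {"id": str, "score": float, "emb": np.ndarray}
    k: int,
    lam: float = 0.7,                # illustrative relevance/diversity trade-off
    seed: list[dict] | None = None,  # already-fixed items; similarity to them is also penalized
) -> list[dict]:
    """Greedy MMR: repeatedly pick the candidate that maximizes
    lam * relevance - (1 - lam) * (max cosine similarity to what is already chosen)."""
    anchors = list(seed or [])
    remaining = list(candidates)
    picked: list[dict] = []
    while remaining and len(picked) < k:
        best_i, best_val = 0, float("-inf")
        for i, c in enumerate(remaining):
            rel = c["score"]
            sim = max((_cos(c["emb"], s["emb"]) for s in anchors + picked), default=0.0)
            # Scale the diversity penalty by relevance so that in the long tail
            # (rel close to 0) a candidate cannot win on diversity alone.
            # This guard is an assumption about the PR's behavior, not its exact code.
            val = lam * rel - (1.0 - lam) * sim * max(rel, 0.0)
            if val > best_val:
                best_i, best_val = i, val
        picked.append(remaining.pop(best_i))
    return picked
```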
The overall implementation follows a “retrieve more first, then deduplicate” approach, with three key strategies (see the sketch after this list):

1. Expand the recall candidate pool before deduplication to avoid Top-K being dominated early by the same cluster
2. Preselect several items primarily by relevance to prevent diversity penalties from suppressing highly relevant results at the start
3. MMR is responsible only for subset selection; the final output order is re-sorted by the original relevance scores so that downstream components see the most relevant evidence first
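A hedged end-to-end sketch of these three strategies, reusing the `mmr_select` helper above (`pool_factor` and `preselect` are illustrative values, not the PR's actual parameters):

```python
def dedup_rerank(
    candidates: list[dict], top_k: int, pool_factor: int = 3, preselect: int = 2
) -> list[dict]:
    """Retrieve-more-then-deduplicate: expand the pool, preselect by relevance,
    let MMR fill the remaining slots, then re-sort by original relevance."""
    # Strategy 1: deduplicate over an expanded pool, not just the top_k hits.
    pool = sorted(candidates, key=lambda c: c["score"], reverse=True)[: top_k * pool_factor]
    # Strategy 2: fix the most relevant items up front so the diversity
    # penalty cannot suppress them.
    n_head = min(preselect, top_k)
    head, tail = pool[:n_head], pool[n_head:]
    # Strategy 3: MMR does subset selection only for the remaining slots ...
    chosen = head + mmr_select(tail, k=top_k - n_head, seed=head)
    # ... and the final output order follows the original relevance scores.
    return sorted(chosen, key=lambda c: c["score"], reverse=True)
```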

Related Issue (Required):

New feature #978

How Has This Been Tested?

Test Script Or Test Steps:

from __future__ import annotations
import warnings
warnings.filterwarnings("ignore")
import json

from typing import Any

from memos.api.handlers.base_handler import HandlerDependencies
from memos.api.handlers.config_builders import (
    build_embedder_config,
    build_feedback_reranker_config,
    build_llm_config,
    build_pref_adder_config,
    build_pref_extractor_config,
    build_pref_retriever_config,
    build_vec_db_config,
)
from memos.api.handlers.search_handler import SearchHandler
from memos.configs.memory import TreeTextMemoryConfig
from memos.embedders.factory import EmbedderFactory
from memos.llms.factory import LLMFactory
from memos.mem_cube.navie import NaiveMemCube
from memos.memories.textual.prefer_text_memory.factory import (
    AdderFactory,
    ExtractorFactory,
    RetrieverFactory,
)
from memos.memories.textual.simple_preference import SimplePreferenceTextMemory
from memos.memories.textual.tree import TreeTextMemory
from memos.reranker.factory import RerankerFactory
from memos.vec_dbs.factory import VecDBFactory


CONFIG_PATH = "./configs/config.json"  # path to the TreeTextMemory config file
SESSION_ID = "search_test_session"  # session id used for info and search_priority
DEFAULT_USER_ID = "search_test_user"  # user_id for this local script; placeholder only
PREFERENCE_ENABLED = False  # whether to enable preference memory retrieval
PREF_TOP_K = 15  # number of preference memories to retrieve

MEM_CUBE_ID = ""
QUERY = ""

DEDUP_MODE = "both"  # dedup mode: "no", "mmr", "both", or None (search_handler treats None as "mmr")
TOP_K = 10  # final number of results to return
MAX_PRINT_CHARS = 200  # max characters when printing a memory snippet


_tree_text_memory: TreeTextMemory | None = None
_pref_memory: SimplePreferenceTextMemory | None = None
_search_handler: SearchHandler | None = None


def _safe_str(x: Any) -> str:
    return x if isinstance(x, str) else "" if x is None else str(x)


def _truncate_text(s: str, max_chars: int) -> str:
    s = (s or "").replace("\r\n", "\n").replace("\r", "\n").strip()
    if max_chars <= 0:
        return ""
    if len(s) <= max_chars:
        return s
    return s[: max_chars - 1] + "…"


def _get_tree_text_memory() -> TreeTextMemory:
    global _tree_text_memory
    if _tree_text_memory is not None:
        return _tree_text_memory
    tree_config = TreeTextMemoryConfig.from_json_file(CONFIG_PATH)
    _tree_text_memory = TreeTextMemory(tree_config)
    return _tree_text_memory


class DummyScheduler:
    def __init__(self) -> None:
        self.extra: dict[str, Any] = {}


class DummyDeepSearchAgent:
    def run(self, query: str, user_id: str | None = None) -> list[dict[str, Any]]:
        return []


class InternalSearchRequest:
    def __init__(
        self,
        *,
        query: str,
        user_id: str,
        readable_cube_ids: list[str],
        top_k: int,
        dedup: str,
    ) -> None:
        self.query = query
        self.user_id = user_id
        self.readable_cube_ids = readable_cube_ids
        self.mode = "fast"
        self.top_k = top_k
        self.dedup = dedup
        self.pref_top_k = PREF_TOP_K if PREFERENCE_ENABLED else 0
        self.include_preference = PREFERENCE_ENABLED
        self.search_tool_memory = False
        self.tool_mem_top_k = 0
        self.filter: dict[str, Any] | None = None
        self.internet_search = False
        self.threshold = None
        self.search_memory_type = "All"
        self.chat_history = None
        self.session_id = SESSION_ID
        self.mem_cube_id = None
        self.moscube = False
        self.operation = None
        self.source = None


def _get_search_handler() -> SearchHandler:
    global _search_handler, _pref_memory, PREFERENCE_ENABLED
    if _search_handler is not None:
        return _search_handler

    text_mem = _get_tree_text_memory()

    # Initialize preference memory if enabled
    pref_mem = None
    if PREFERENCE_ENABLED:
        try:
            llm = LLMFactory.from_config(build_llm_config())
            embedder = EmbedderFactory.from_config(build_embedder_config())
            vector_db = VecDBFactory.from_config(build_vec_db_config())
            feedback_reranker = RerankerFactory.from_config(build_feedback_reranker_config())

            pref_extractor_cfg = build_pref_extractor_config()
            pref_adder_cfg = build_pref_adder_config()
            pref_retriever_cfg = build_pref_retriever_config()

            pref_extractor = ExtractorFactory.from_config(
                config_factory=pref_extractor_cfg,
                llm_provider=llm,
                embedder=embedder,
                vector_db=vector_db,
            )
            pref_adder = AdderFactory.from_config(
                config_factory=pref_adder_cfg,
                llm_provider=llm,
                embedder=embedder,
                vector_db=vector_db,
                text_mem=text_mem,
            )
            pref_retriever = RetrieverFactory.from_config(
                config_factory=pref_retriever_cfg,
                llm_provider=llm,
                embedder=embedder,
                reranker=feedback_reranker,
                vector_db=vector_db,
            )

            pref_mem = SimplePreferenceTextMemory(
                extractor_llm=llm,
                vector_db=vector_db,
                embedder=embedder,
                reranker=feedback_reranker,
                extractor=pref_extractor,
                adder=pref_adder,
                retriever=pref_retriever,
            )
            _pref_memory = pref_mem
        except Exception as exc:
            print(f"初始化 preference 记忆失败, 将仅检索 text_mem, 原因: {exc}")
            pref_mem = None
            PREFERENCE_ENABLED = False

    naive_mem_cube = NaiveMemCube(text_mem=text_mem, pref_mem=pref_mem, act_mem=None, para_mem=None)

    searcher = text_mem.get_searcher(
        manual_close_internet=True,
        moscube=False,
        process_llm=text_mem.extractor_llm,
    )
    dependencies = HandlerDependencies(
        naive_mem_cube=naive_mem_cube,
        mem_scheduler=DummyScheduler(),
        searcher=searcher,
        deepsearch_agent=DummyDeepSearchAgent(),
    )
    _search_handler = SearchHandler(dependencies)
    return _search_handler


def _extract_text_memories(search_data: dict[str, Any]) -> list[dict[str, Any]]:
    buckets = search_data.get("text_mem") or []
    if not isinstance(buckets, list):
        return []
    out: list[dict[str, Any]] = []
    for b in buckets:
        if not isinstance(b, dict):
            continue
        mems = b.get("memories") or []
        if not isinstance(mems, list):
            continue
        for m in mems:
            if isinstance(m, dict):
                out.append(m)
    return out


def _extract_preference_memories(search_data: dict[str, Any]) -> list[dict[str, Any]]:
    """Extract preference memories from search response data."""
    buckets = search_data.get("preference") or []
    if not isinstance(buckets, list):
        return []
    out: list[dict[str, Any]] = []
    for b in buckets:
        if not isinstance(b, dict):
            continue
        mems = b.get("memories") or []
        if not isinstance(mems, list):
            continue
        for m in mems:
            if isinstance(m, dict):
                out.append(m)
    return out


def search_memories(*, mem_cube_id: str, query: str, dedup: str, top_k: int) -> dict[str, Any]:
    """Search both text and preference memories, return dict with both types."""
    mem_cube_id_s = _safe_str(mem_cube_id).strip()
    query_s = _safe_str(query).strip()

    if not mem_cube_id_s:
        raise ValueError("mem_cube_id 不能为空")
    if not query_s:
        raise ValueError("query 不能为空")
    if top_k <= 0:
        raise ValueError("top_k 必须大于 0")

    handler = _get_search_handler()
    req = InternalSearchRequest(
        query=query_s,
        user_id=DEFAULT_USER_ID,
        readable_cube_ids=[mem_cube_id_s],
        top_k=top_k,
        dedup=dedup,
    )
    resp = handler.handle_search_memories(req)
    data = resp.data or {}

    return {
        "text_mem": _extract_text_memories(data),
        "preference": _extract_preference_memories(data),
    }


def _to_print_item(mem: dict[str, Any]) -> dict[str, Any]:
    meta = mem.get("metadata") or {}
    score = meta.get("relativity", 0.0)
    return {
        "id": _safe_str(mem.get("id")),
        "score": float(score) if isinstance(score, (int, float)) else 0.0,
        "memory": _truncate_text(_safe_str(mem.get("memory")), MAX_PRINT_CHARS),
    }


def _print_results(title: str, memories: list[dict[str, Any]]) -> None:
    print("\n" + "=" * 80)
    print(title)
    print("=" * 80)
    items = [_to_print_item(m) for m in memories]
    for i, it in enumerate(items, start=1):
        print(f"{i:02d}. id={it['id']} score={it['score']:.6f}")
        print(it["memory"])
    print(f"count={len(items)}")


def _diff_summary(no_ids: list[str], mmr_ids: list[str]) -> dict[str, Any]:
    set_no = set(no_ids)
    set_mmr = set(mmr_ids)
    only_no = [x for x in no_ids if x not in set_mmr]
    only_mmr = [x for x in mmr_ids if x not in set_no]
    same_order = no_ids == mmr_ids
    jaccard = (len(set_no & set_mmr) / len(set_no | set_mmr)) if (set_no | set_mmr) else 1.0
    return {
        "same_order": same_order,
        "jaccard": jaccard,
        "only_in_no": only_no,
        "only_in_mmr": only_mmr,
    }


def main(*, mem_cube_id: str, query: str, dedup_mode: str, topk: int) -> None:
    if dedup_mode in ("no", "both"):
        no_results = search_memories(mem_cube_id=mem_cube_id, query=query, dedup="no", top_k=topk)
        no_text_mems = no_results["text_mem"]
        no_pref_mems = no_results["preference"]
        _print_results("TEXT_MEM dedup=no", no_text_mems)
        if PREFERENCE_ENABLED and no_pref_mems:
            _print_results("PREFERENCE dedup=no", no_pref_mems)
    else:
        no_text_mems = []
        no_pref_mems = []

    if dedup_mode in ("mmr", "both"):
        mmr_results = search_memories(mem_cube_id=mem_cube_id, query=query, dedup="mmr", top_k=topk)
        mmr_text_mems = mmr_results["text_mem"]
        mmr_pref_mems = mmr_results["preference"]
        _print_results("TEXT_MEM dedup=mmr", mmr_text_mems)
        if PREFERENCE_ENABLED and mmr_pref_mems:
            _print_results("PREFERENCE dedup=mmr", mmr_pref_mems)
    else:
        results = search_memories(mem_cube_id=mem_cube_id, query=query, dedup=dedup_mode, top_k=topk)
        mmr_text_mems = results["text_mem"]
        mmr_pref_mems = results["preference"]
        _print_results(f"TEXT_MEM dedup={dedup_mode}", mmr_text_mems)
        if PREFERENCE_ENABLED and mmr_pref_mems:
            _print_results(f"PREFERENCE dedup={dedup_mode}", mmr_pref_mems)

    if dedup_mode == "both":
        # Text memory comparison
        no_text_ids = [_safe_str(m.get("id")) for m in no_text_mems]
        mmr_text_ids = [_safe_str(m.get("id")) for m in mmr_text_mems]
        text_diff = _diff_summary(no_text_ids, mmr_text_ids)
        print("\n" + "=" * 80)
        print("TEXT_MEM diff_summary")
        print("=" * 80)
        print(json.dumps(text_diff, ensure_ascii=False, indent=2))

        # Preference memory comparison
        if PREFERENCE_ENABLED and no_pref_mems and mmr_pref_mems:
            no_pref_ids = [_safe_str(m.get("id")) for m in no_pref_mems]
            mmr_pref_ids = [_safe_str(m.get("id")) for m in mmr_pref_mems]
            pref_diff = _diff_summary(no_pref_ids, mmr_pref_ids)
            print("\n" + "=" * 80)
            print("PREFERENCE diff_summary")
            print("=" * 80)
            print(json.dumps(pref_diff, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main(mem_cube_id=MEM_CUBE_ID, query=QUERY, dedup_mode=DEDUP_MODE, topk=TOP_K)
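
To reproduce, fill in MEM_CUBE_ID and QUERY, point CONFIG_PATH at a valid TreeTextMemory config, and run the script directly. With DEDUP_MODE = "both" it prints the result lists for dedup=no and dedup=mmr, followed by a diff summary: order equality, Jaccard overlap of the returned ids, and the ids unique to each side.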

Checklist

I have performed a self-review of my own code | 我已自行检查了自己的代码
I have commented my code in hard-to-understand areas | 我已在难以理解的地方对代码进行了注释
I have added tests that prove my fix is effective or that my feature works | 我已添加测试以证明我的修复有效或功能正常
I have created related documentation issue/PR in MemOS-Docs (if applicable) | 我已在 MemOS-Docs 中创建了相关的文档 issue/PR(如果适用)
I have linked the issue to this PR (if applicable) | 我已将 issue 链接到此 PR(如果适用)
I have mentioned the person who will review this PR | 我已提及将审查此 PR 的人

Reviewer Checklist

closes #xxxx (Replace xxxx with the GitHub issue number)
Made sure Checks passed
Tests have been provided

harvey_xiang and others added 5 commits January 28, 2026 20:05
@hijzy hijzy changed the base branch from main to dev-20260126-v2.0.4 January 28, 2026 13:03
@hijzy hijzy marked this pull request as ready for review January 28, 2026 13:04