From 6350c2a4287c17bd9a1446a068e3abf4903b0b15 Mon Sep 17 00:00:00 2001 From: Wenqiang Wei Date: Tue, 27 Jan 2026 22:11:56 +0800 Subject: [PATCH 1/6] fix: embedding service timeout occasionally --- src/memos/embedders/universal_api.py | 8 +- src/memos/mem_reader/multi_modal_struct.py | 85 +++++++++++++++---- .../read_multi_modal/assistant_parser.py | 3 +- .../read_multi_modal/multi_modal_parser.py | 1 + .../read_multi_modal/user_parser.py | 3 +- src/memos/mem_reader/simple_struct.py | 3 +- src/memos/multi_mem_cube/single_cube.py | 6 +- 7 files changed, 87 insertions(+), 22 deletions(-) diff --git a/src/memos/embedders/universal_api.py b/src/memos/embedders/universal_api.py index 60bae15a5..7e4d9b8d1 100644 --- a/src/memos/embedders/universal_api.py +++ b/src/memos/embedders/universal_api.py @@ -32,9 +32,15 @@ def __init__(self, config: UniversalAPIEmbedderConfig): @timed_with_status( log_prefix="model_timed_embedding", - log_extra_args={"model_name_or_path": "text-embedding-3-large"}, + log_extra_args=lambda self, texts: { + "model_name_or_path": "text-embedding-3-large", + "text_len": len(texts), + "text_content": texts, + }, ) def embed(self, texts: list[str]) -> list[list[float]]: + if isinstance(texts, str): + texts = [texts] # Truncate texts if max_tokens is configured texts = self._truncate_texts(texts) diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py index 9edcd0a55..5fcfd3c91 100644 --- a/src/memos/mem_reader/multi_modal_struct.py +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -79,12 +79,11 @@ def _split_large_memory_item( chunks = self.chunker.chunk(item_text) split_items = [] - for chunk in chunks: + def _create_chunk_item(chunk): # Chunk objects have a 'text' attribute chunk_text = chunk.text if not chunk_text or not chunk_text.strip(): - continue - + return None # Create a new memory item for each chunk, preserving original metadata split_item = self._make_memory_item( value=chunk_text, @@ -98,8 +97,17 @@ def _split_large_memory_item( key=item.metadata.key, sources=item.metadata.sources or [], background=item.metadata.background or "", + need_embed=False, ) - split_items.append(split_item) + return split_item + + # Use thread pool to parallel process chunks + with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: + futures = [executor.submit(_create_chunk_item, chunk) for chunk in chunks] + for future in concurrent.futures.as_completed(futures): + split_item = future.result() + if split_item is not None: + split_items.append(split_item) return split_items if split_items else [item] except Exception as e: @@ -127,15 +135,42 @@ def _concat_multi_modal_memories( # Split large memory items before processing processed_items = [] - for item in all_memory_items: - item_text = item.memory or "" - item_tokens = self._count_tokens(item_text) - if item_tokens > max_tokens: - # Split the large item into multiple chunks - split_items = self._split_large_memory_item(item, max_tokens) - processed_items.extend(split_items) - else: - processed_items.append(item) + # control whether to parallel chunk large memory items + parallel_chunking = True + + if parallel_chunking: + # parallel chunk large memory items + with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: + future_to_item = { + executor.submit(self._split_large_memory_item, item, max_tokens): item + for item in all_memory_items + if (item.memory or "") and self._count_tokens(item.memory) > max_tokens + } + # 先收集无需切分的小文本 + processed_items.extend( + [ 
+ item + for item in all_memory_items + if not ( + (item.memory or "") and self._count_tokens(item.memory) > max_tokens + ) + ] + ) + # collect split items from futures + for future in concurrent.futures.as_completed(future_to_item): + split_items = future.result() + processed_items.extend(split_items) + else: + # serial chunk large memory items + for item in all_memory_items: + item_text = item.memory or "" + item_tokens = self._count_tokens(item_text) + if item_tokens > max_tokens: + # Split the large item into multiple chunks + split_items = self._split_large_memory_item(item, max_tokens) + processed_items.extend(split_items) + else: + processed_items.append(item) # If only one item after processing, return as-is if len(processed_items) == 1: @@ -797,13 +832,29 @@ def _process_multi_modal_data( if isinstance(scene_data_info, list): # Parse each message in the list all_memory_items = [] - for msg in scene_data_info: - items = self.multi_modal_parser.parse(msg, info, mode="fast", **kwargs) - all_memory_items.extend(items) + # Use thread pool to parse each message in parallel + with ContextThreadPoolExecutor(max_workers=30) as executor: + futures = [ + executor.submit( + self.multi_modal_parser.parse, + msg, + info, + mode="fast", + need_emb=False, + **kwargs, + ) + for msg in scene_data_info + ] + for future in concurrent.futures.as_completed(futures): + try: + items = future.result() + all_memory_items.extend(items) + except Exception as e: + logger.error(f"[MultiModalFine] Error in parallel parsing: {e}") else: # Parse as single message all_memory_items = self.multi_modal_parser.parse( - scene_data_info, info, mode="fast", **kwargs + scene_data_info, info, mode="fast", need_emb=False, **kwargs ) fast_memory_items = self._concat_multi_modal_memories(all_memory_items) if mode == "fast": diff --git a/src/memos/mem_reader/read_multi_modal/assistant_parser.py b/src/memos/mem_reader/read_multi_modal/assistant_parser.py index 3519216d2..89d4fec7f 100644 --- a/src/memos/mem_reader/read_multi_modal/assistant_parser.py +++ b/src/memos/mem_reader/read_multi_modal/assistant_parser.py @@ -210,6 +210,7 @@ def parse_fast( info: dict[str, Any], **kwargs, ) -> list[TextualMemoryItem]: + need_emb = kwargs.get("need_emb", True) if not isinstance(message, dict): logger.warning(f"[AssistantParser] Expected dict, got {type(message)}") return [] @@ -290,7 +291,7 @@ def parse_fast( status="activated", tags=["mode:fast"], key=_derive_key(line), - embedding=self.embedder.embed([line])[0], + embedding=self.embedder.embed([line])[0] if need_emb else None, usage=[], sources=sources, background="", diff --git a/src/memos/mem_reader/read_multi_modal/multi_modal_parser.py b/src/memos/mem_reader/read_multi_modal/multi_modal_parser.py index 2c8140419..808410e65 100644 --- a/src/memos/mem_reader/read_multi_modal/multi_modal_parser.py +++ b/src/memos/mem_reader/read_multi_modal/multi_modal_parser.py @@ -149,6 +149,7 @@ def parse( logger.warning(f"[MultiModalParser] No parser found for message: {message}") return [] + logger.info(f"[{parser.__class__.__name__}] Parsing message in {mode} mode: {message}") # Parse using the appropriate parser try: return parser.parse(message, info, mode=mode, **kwargs) diff --git a/src/memos/mem_reader/read_multi_modal/user_parser.py b/src/memos/mem_reader/read_multi_modal/user_parser.py index 1c9afab65..1ab48c82e 100644 --- a/src/memos/mem_reader/read_multi_modal/user_parser.py +++ b/src/memos/mem_reader/read_multi_modal/user_parser.py @@ -151,6 +151,7 @@ def parse_fast( info: dict[str, 
Any], **kwargs, ) -> list[TextualMemoryItem]: + need_emb = kwargs.get("need_emb", True) if not isinstance(message, dict): logger.warning(f"[UserParser] Expected dict, got {type(message)}") return [] @@ -192,7 +193,7 @@ def parse_fast( status="activated", tags=["mode:fast"], key=_derive_key(line), - embedding=self.embedder.embed([line])[0], + embedding=self.embedder.embed([line])[0] if need_emb else None, usage=[], sources=sources, background="", diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index 783da763e..6f8bff4ad 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -198,6 +198,7 @@ def _make_memory_item( background: str = "", type_: str = "fact", confidence: float = 0.99, + need_embed: bool = True, **kwargs, ) -> TextualMemoryItem: """construct memory item""" @@ -213,7 +214,7 @@ def _make_memory_item( status="activated", tags=tags or [], key=key if key is not None else derive_key(value), - embedding=self.embedder.embed([value])[0], + embedding=self.embedder.embed([value])[0] if need_embed else None, usage=[], sources=sources or [], background=background, diff --git a/src/memos/multi_mem_cube/single_cube.py b/src/memos/multi_mem_cube/single_cube.py index 426cf32be..f12ff1a1d 100644 --- a/src/memos/multi_mem_cube/single_cube.py +++ b/src/memos/multi_mem_cube/single_cube.py @@ -2,6 +2,7 @@ import json import os +import time import traceback from dataclasses import dataclass @@ -790,7 +791,7 @@ def _process_text_mem( extract_mode, add_req.mode, ) - + init_time = time.time() # Extract memories memories_local = self.mem_reader.get_memory( [add_req.messages], @@ -804,6 +805,9 @@ def _process_text_mem( mode=extract_mode, user_name=user_context.mem_cube_id, ) + self.logger.info( + f"Time for get_memory in extract mode {extract_mode}: {time.time() - init_time}" + ) flattened_local = [mm for m in memories_local for mm in m] # Explicitly set source_doc_id to metadata if present in info From c19f87d06a28cb470d6cd7a011989bd9befa2c0b Mon Sep 17 00:00:00 2001 From: Wenqiang Wei Date: Tue, 27 Jan 2026 22:14:59 +0800 Subject: [PATCH 2/6] style: remove useless comment --- src/memos/mem_reader/multi_modal_struct.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py index 5fcfd3c91..5579e3a9e 100644 --- a/src/memos/mem_reader/multi_modal_struct.py +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -146,7 +146,6 @@ def _concat_multi_modal_memories( for item in all_memory_items if (item.memory or "") and self._count_tokens(item.memory) > max_tokens } - # 先收集无需切分的小文本 processed_items.extend( [ item From 7ed19840d49569a3b2756bd1c9fa85bfc7bc55f5 Mon Sep 17 00:00:00 2001 From: Wenqiang Wei Date: Wed, 28 Jan 2026 11:21:46 +0800 Subject: [PATCH 3/6] fix: add standby link for embedding --- src/memos/embedders/universal_api.py | 51 ++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 6 deletions(-) diff --git a/src/memos/embedders/universal_api.py b/src/memos/embedders/universal_api.py index 7e4d9b8d1..bf27950bd 100644 --- a/src/memos/embedders/universal_api.py +++ b/src/memos/embedders/universal_api.py @@ -1,3 +1,7 @@ +import asyncio +import os +import time + from openai import AzureOpenAI as AzureClient from openai import OpenAI as OpenAIClient @@ -43,14 +47,49 @@ def embed(self, texts: list[str]) -> list[list[float]]: texts = [texts] # Truncate texts if max_tokens is configured texts = self._truncate_texts(texts) - + 
logger.info(f"Embeddings request with input: {texts}") if self.provider == "openai" or self.provider == "azure": try: - response = self.client.embeddings.create( - model=getattr(self.config, "model_name_or_path", "text-embedding-3-large"), - input=texts, - ) - return [r.embedding for r in response.data] + # use asyncio.wait_for to implement 3 seconds timeout, fallback to default client if timeout + async def _create_embeddings(): + return self.client.embeddings.create( + model=getattr(self.config, "model_name_or_path", "text-embedding-3-large"), + input=texts, + timeout=self.config.timeout, + ) + + try: + # wait for environment variable specified timeout (5 seconds), trigger asyncio.TimeoutError if timeout + init_time = time.time() + response = asyncio.run( + asyncio.wait_for( + _create_embeddings(), timeout=os.getenv("MOS_EMBEDDER_TIMEOUT", 5) + ) + ) + logger.info( + f"Embeddings request succeeded with {time.time() - init_time} seconds" + ) + return [r.embedding for r in response.data] + except asyncio.TimeoutError: + logger.warning( + f"Embeddings request timed out after {os.getenv('MOS_EMBEDDER_TIMEOUT', 5)} seconds, fallback to default client" + ) + client = OpenAIClient( + api_key=os.getenv("OPENAI_API_KEY", "sk-xxxx"), + base_url=os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"), + default_headers=self.config.headers_extra + if self.config.headers_extra + else None, + ) + init_time = time.time() + response = client.embeddings.create( + model=getattr(self.config, "model_name_or_path", "text-embedding-3-large"), + input=texts, + ) + logger.info( + f"Embeddings request using default client succeeded with {time.time() - init_time} seconds" + ) + return [r.embedding for r in response.data] except Exception as e: raise Exception(f"Embeddings request ended with error: {e}") from e else: From d3166bc3a2df6fb63fb05f2b2dd76fa921e7c27b Mon Sep 17 00:00:00 2001 From: Wenqiang Wei Date: Wed, 28 Jan 2026 12:00:52 +0800 Subject: [PATCH 4/6] fix: remove timeout attribute from UniversalEmbedderConfig --- src/memos/embedders/universal_api.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/memos/embedders/universal_api.py b/src/memos/embedders/universal_api.py index bf27950bd..d786bc1ed 100644 --- a/src/memos/embedders/universal_api.py +++ b/src/memos/embedders/universal_api.py @@ -55,7 +55,6 @@ async def _create_embeddings(): return self.client.embeddings.create( model=getattr(self.config, "model_name_or_path", "text-embedding-3-large"), input=texts, - timeout=self.config.timeout, ) try: From 44536a49b0f0489c26a250af1533c8e6a60d215c Mon Sep 17 00:00:00 2001 From: Wenqiang Wei Date: Wed, 28 Jan 2026 15:37:01 +0800 Subject: [PATCH 5/6] fix: timeout parameter format error --- src/memos/embedders/universal_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/memos/embedders/universal_api.py b/src/memos/embedders/universal_api.py index d786bc1ed..5747bbdc3 100644 --- a/src/memos/embedders/universal_api.py +++ b/src/memos/embedders/universal_api.py @@ -62,7 +62,7 @@ async def _create_embeddings(): init_time = time.time() response = asyncio.run( asyncio.wait_for( - _create_embeddings(), timeout=os.getenv("MOS_EMBEDDER_TIMEOUT", 5) + _create_embeddings(), timeout=int(os.getenv("MOS_EMBEDDER_TIMEOUT", 5)) ) ) logger.info( From bccb12734cad31461634a4adb63fe24da22fb3eb Mon Sep 17 00:00:00 2001 From: Wenqiang Wei Date: Wed, 28 Jan 2026 19:47:32 +0800 Subject: [PATCH 6/6] fix: add backup client when creating embedder --- src/memos/api/config.py | 12 ++++ 
src/memos/configs/embedder.py | 17 ++++++ src/memos/embedders/universal_api.py | 83 +++++++++++++++++----------- 3 files changed, 79 insertions(+), 33 deletions(-) diff --git a/src/memos/api/config.py b/src/memos/api/config.py index a3bf25be0..024de4af5 100644 --- a/src/memos/api/config.py +++ b/src/memos/api/config.py @@ -440,6 +440,18 @@ def get_embedder_config() -> dict[str, Any]: "model_name_or_path": os.getenv("MOS_EMBEDDER_MODEL", "text-embedding-3-large"), "headers_extra": json.loads(os.getenv("MOS_EMBEDDER_HEADERS_EXTRA", "{}")), "base_url": os.getenv("MOS_EMBEDDER_API_BASE", "http://openai.com"), + "backup_client": os.getenv("MOS_EMBEDDER_BACKUP_CLIENT", "false").lower() + == "true", + "backup_base_url": os.getenv( + "MOS_EMBEDDER_BACKUP_API_BASE", "http://openai.com" + ), + "backup_api_key": os.getenv("MOS_EMBEDDER_BACKUP_API_KEY", "sk-xxxx"), + "backup_headers_extra": json.loads( + os.getenv("MOS_EMBEDDER_BACKUP_HEADERS_EXTRA", "{}") + ), + "backup_model_name_or_path": os.getenv( + "MOS_EMBEDDER_BACKUP_MODEL", "text-embedding-3-large" + ), }, } else: # ollama diff --git a/src/memos/configs/embedder.py b/src/memos/configs/embedder.py index c2e648247..050043ab0 100644 --- a/src/memos/configs/embedder.py +++ b/src/memos/configs/embedder.py @@ -58,6 +58,23 @@ class UniversalAPIEmbedderConfig(BaseEmbedderConfig): base_url: str | None = Field( default=None, description="Optional base URL for custom or proxied endpoint" ) + backup_client: bool = Field( + default=False, + description="Whether to use backup client", + ) + backup_base_url: str | None = Field( + default=None, description="Optional backup base URL for custom or proxied endpoint" + ) + backup_api_key: str | None = Field( + default=None, description="Optional backup API key for the embedding provider" + ) + backup_headers_extra: dict[str, Any] | None = Field( + default=None, + description="Extra headers for the backup embedding model", + ) + backup_model_name_or_path: str | None = Field( + default=None, description="Optional backup model name or path" + ) class EmbedderConfigFactory(BaseConfig): diff --git a/src/memos/embedders/universal_api.py b/src/memos/embedders/universal_api.py index 5747bbdc3..d2bdf9318 100644 --- a/src/memos/embedders/universal_api.py +++ b/src/memos/embedders/universal_api.py @@ -33,6 +33,15 @@ def __init__(self, config: UniversalAPIEmbedderConfig): ) else: raise ValueError(f"Embeddings unsupported provider: {self.provider}") + self.use_backup_client = config.backup_client + if self.use_backup_client: + self.backup_client = OpenAIClient( + api_key=config.backup_api_key, + base_url=config.backup_base_url, + default_headers=config.backup_headers_extra + if config.backup_headers_extra + else None, + ) @timed_with_status( log_prefix="model_timed_embedding", @@ -50,46 +59,54 @@ def embed(self, texts: list[str]) -> list[list[float]]: logger.info(f"Embeddings request with input: {texts}") if self.provider == "openai" or self.provider == "azure": try: - # use asyncio.wait_for to implement 3 seconds timeout, fallback to default client if timeout + async def _create_embeddings(): return self.client.embeddings.create( model=getattr(self.config, "model_name_or_path", "text-embedding-3-large"), input=texts, ) - try: - # wait for environment variable specified timeout (5 seconds), trigger asyncio.TimeoutError if timeout - init_time = time.time() - response = asyncio.run( - asyncio.wait_for( - _create_embeddings(), timeout=int(os.getenv("MOS_EMBEDDER_TIMEOUT", 5)) - ) - ) - logger.info( - f"Embeddings request 
succeeded with {time.time() - init_time} seconds" - ) - return [r.embedding for r in response.data] - except asyncio.TimeoutError: - logger.warning( - f"Embeddings request timed out after {os.getenv('MOS_EMBEDDER_TIMEOUT', 5)} seconds, fallback to default client" - ) - client = OpenAIClient( - api_key=os.getenv("OPENAI_API_KEY", "sk-xxxx"), - base_url=os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"), - default_headers=self.config.headers_extra - if self.config.headers_extra - else None, - ) - init_time = time.time() - response = client.embeddings.create( - model=getattr(self.config, "model_name_or_path", "text-embedding-3-large"), - input=texts, + init_time = time.time() + response = asyncio.run( + asyncio.wait_for( + _create_embeddings(), timeout=int(os.getenv("MOS_EMBEDDER_TIMEOUT", 5)) ) - logger.info( - f"Embeddings request using default client succeeded with {time.time() - init_time} seconds" - ) - return [r.embedding for r in response.data] + ) + logger.info(f"Embeddings request succeeded with {time.time() - init_time} seconds") + logger.info(f"Embeddings request response: {response}") + return [r.embedding for r in response.data] except Exception as e: - raise Exception(f"Embeddings request ended with error: {e}") from e + logger.warning( + f"Embeddings request ended with {type(e).__name__} error: {e}, try backup client" + ) + if self.use_backup_client: + try: + + async def _create_embeddings_backup(): + return self.backup_client.embeddings.create( + model=getattr( + self.config, + "backup_model_name_or_path", + "text-embedding-3-large", + ), + input=texts, + ) + + init_time = time.time() + response = asyncio.run( + asyncio.wait_for( + _create_embeddings_backup(), + timeout=int(os.getenv("MOS_EMBEDDER_TIMEOUT", 5)), + ) + ) + logger.info( + f"Backup embeddings request succeeded with {time.time() - init_time} seconds" + ) + logger.info(f"Backup embeddings request response: {response}") + return [r.embedding for r in response.data] + except Exception as e: + raise ValueError(f"Backup embeddings request ended with error: {e}") from e + else: + raise ValueError(f"Embeddings request ended with error: {e}") from e else: raise ValueError(f"Embeddings unsupported provider: {self.provider}")
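
Note: a minimal, hypothetical usage sketch of the backup embedding endpoint introduced in PATCH 6/6. The environment variable names below come from get_embedder_config() in src/memos/api/config.py; the values are placeholders and the snippet only illustrates how the fallback client is switched on, not how the service is deployed.

    # Python sketch (placeholder values, not part of the patch)
    import os

    os.environ["MOS_EMBEDDER_TIMEOUT"] = "5"                    # primary request timeout, seconds
    os.environ["MOS_EMBEDDER_BACKUP_CLIENT"] = "true"           # enable the backup client
    os.environ["MOS_EMBEDDER_BACKUP_API_BASE"] = "https://backup.example.com/v1"
    os.environ["MOS_EMBEDDER_BACKUP_API_KEY"] = "sk-backup-xxxx"
    os.environ["MOS_EMBEDDER_BACKUP_MODEL"] = "text-embedding-3-large"
    os.environ["MOS_EMBEDDER_BACKUP_HEADERS_EXTRA"] = "{}"      # JSON string of extra headers

With these set, UniversalAPIEmbedder.embed() bounds the primary embeddings request by MOS_EMBEDDER_TIMEOUT and, on timeout or error, retries once against the backup endpoint configured by the MOS_EMBEDDER_BACKUP_* variables.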