From 7d4cd463d7d52e36e280b9a5b06cbecff3ee969c Mon Sep 17 00:00:00 2001 From: ARIA Date: Sun, 12 Apr 2026 17:12:02 +0200 Subject: [PATCH] Add configurable prefetch limit and score threshold for mem0 - Add prefetch_limit config (default: 3) to control max memories returned - Add prefetch_score_threshold config (default: 60%) to filter low-similarity results - Add error messages when Mem0 API is unavailable - Fix circuit breaker to not trip on successful API calls with no matching memories - Properly type-convert config values from JSON --- __init__.py | 63 ++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 55 insertions(+), 8 deletions(-) diff --git a/__init__.py b/__init__.py index f1b8bdb..3834bba 100644 --- a/__init__.py +++ b/__init__.py @@ -46,6 +46,8 @@ def _load_config() -> dict: "agent_id": os.environ.get("MEM0_AGENT_ID", "hermes"), "rerank": True, "timeout": 10.0, + "prefetch_limit": 3, + "prefetch_score_threshold": 60, } config_path = get_hermes_home() / "mem0-local.json" @@ -146,6 +148,8 @@ class Mem0LocalMemoryProvider(MemoryProvider): self._user_id = "hermes-user" self._agent_id = "hermes" self._rerank = True + self._prefetch_limit = 3 + self._prefetch_score_threshold = 60 self._prefetch_result = "" self._prefetch_lock = threading.Lock() self._prefetch_thread = None @@ -206,6 +210,16 @@ class Mem0LocalMemoryProvider(MemoryProvider): "description": "Request timeout in seconds", "default": "10.0", }, + { + "key": "prefetch_limit", + "description": "Max memories to prefetch (pre-LLM hook)", + "default": "3", + }, + { + "key": "prefetch_score_threshold", + "description": "Min similarity score % to include memory (0-100)", + "default": "60", + }, ] def _get_client(self) -> LocalMem0Client: @@ -219,6 +233,10 @@ class Mem0LocalMemoryProvider(MemoryProvider): self._user_id = self._config.get("user_id", "hermes-user") self._agent_id = self._config.get("agent_id", "hermes") self._rerank = self._config.get("rerank", True) + self._prefetch_limit = int(self._config.get("prefetch_limit", 3)) + self._prefetch_score_threshold = int( + self._config.get("prefetch_score_threshold", 60) + ) base_url = self._config.get("base_url", "http://localhost:8000") timeout = float(self._config.get("timeout", 10.0)) self._client = LocalMem0Client(base_url, timeout=timeout) @@ -267,6 +285,10 @@ class Mem0LocalMemoryProvider(MemoryProvider): ) self._agent_id = self._config.get("agent_id", "hermes") self._rerank = self._config.get("rerank", True) + self._prefetch_limit = int(self._config.get("prefetch_limit", 3)) + self._prefetch_score_threshold = int( + self._config.get("prefetch_score_threshold", 60) + ) def system_prompt_block(self) -> str: return ( @@ -290,27 +312,39 @@ class Mem0LocalMemoryProvider(MemoryProvider): self._prefetch_result = "" if not result: return "" + # Check if it's an error message + if result.startswith("ERROR:"): + return f"## Mem0 Error\n{result[6:]}" return f"## Mem0 Memory\n{result}" def queue_prefetch_and_get(self, query: str) -> str: """Sync prefetch for pre_llm_call hook - returns memory context immediately.""" if self._is_breaker_open(): - return "" + return ( + "ERROR:Memory service temporarily unavailable. Please try again later." + ) try: client = self._get_client() results = client.search( query=query, user_id=self._user_id, - limit=5, + limit=self._prefetch_limit, ) - if results: - formatted = self._format_search_results(results) + # Filter by score threshold + threshold = self._prefetch_score_threshold / 100.0 + filtered = [r for r in results if r.get("score", 0) >= threshold] + if filtered: + formatted = self._format_search_results(filtered) if formatted: self._record_success() return formatted + self._record_success() except Exception as e: self._record_failure() logger.debug("Mem0 prefetch failed: %s", e) + return ( + "ERROR:Memory service temporarily unavailable. Please try again later." + ) return "" def queue_prefetch(self, query: str, *, session_id: str = "") -> None: @@ -321,6 +355,8 @@ class Mem0LocalMemoryProvider(MemoryProvider): session_id: Unused. Kept for API compatibility. """ if self._is_breaker_open(): + with self._prefetch_lock: + self._prefetch_result = "ERROR:Memory service temporarily unavailable. Please try again later." return def _run(): @@ -329,16 +365,24 @@ class Mem0LocalMemoryProvider(MemoryProvider): results = client.search( query=query, user_id=self._user_id, - limit=5, + limit=self._prefetch_limit, ) - if results: - formatted = self._format_search_results(results) + # Filter by score threshold + threshold = self._prefetch_score_threshold / 100.0 + filtered = [r for r in results if r.get("score", 0) >= threshold] + if filtered: + formatted = self._format_search_results(filtered) with self._prefetch_lock: self._prefetch_result = formatted + else: + with self._prefetch_lock: + self._prefetch_result = "" self._record_success() except Exception as e: self._record_failure() logger.debug("Mem0 prefetch failed: %s", e) + with self._prefetch_lock: + self._prefetch_result = "ERROR:Memory service temporarily unavailable. Please try again later." self._prefetch_thread = threading.Thread( target=_run, daemon=True, name="mem0-local-prefetch" @@ -504,7 +548,10 @@ def register(ctx) -> None: try: results = provider.queue_prefetch_and_get(user_message) if results: - return {"context": results} + # Error messages get their own header, memories get standard header + if results.startswith("ERROR:"): + return {"context": f"## Mem0 Error\n{results[6:]}"} + return {"context": f"## Mem0 Memory\n{results}"} except Exception as e: logger.debug("Mem0 pre_llm_call hook failed: %s", e) return {} -- 2.52.0