"""Local Mem0 server HTTP client. Based on Mem0 server OpenAPI specification. Endpoints: - POST /add - Add memory - POST /search - Search memories - GET /memories - Get all memories - DELETE /delete/{memory_id} - Delete memory """ from __future__ import annotations import logging from typing import Any, Dict, List, Optional import requests logger = logging.getLogger(__name__) class LocalMem0Client: """HTTP client for self-hosted Mem0 server.""" def __init__(self, base_url: str, timeout: float = 10.0): self.base_url = base_url.rstrip("/") self.timeout = timeout self.session = requests.Session() self.session.headers.update( { "Content-Type": "application/json", "User-Agent": "hermes-agent-mem0-local-plugin/1.0.0", } ) def _request( self, method: str, endpoint: str, json: Optional[Dict] = None, params: Optional[Dict] = None, ) -> Dict: """Make HTTP request with error handling.""" url = f"{self.base_url}{endpoint}" try: resp = self.session.request( method, url, json=json, params=params, timeout=self.timeout ) resp.raise_for_status() return resp.json() except requests.exceptions.Timeout: logger.error("Mem0 request timed out after %ss", self.timeout) raise except requests.exceptions.ConnectionError as e: logger.error("Failed to connect to Mem0 server at %s: %s", self.base_url, e) raise except requests.exceptions.HTTPError as e: logger.error( "Mem0 API error: %s - %s", e.response.status_code, e.response.text ) raise def search( self, query: str, user_id: Optional[str] = None, limit: int = 5, case_insensitive: bool = False, ) -> List[Dict]: """Search memories by semantic similarity. API: POST /search Request: {query, user_id, limit} Response: {results: [{id, text, user_id, score, metadata}]} Args: query: Search query user_id: User identifier limit: Max results case_insensitive: If True, search with both original and lowercase query """ if not case_insensitive: payload = {"query": query, "limit": limit} if user_id: payload["user_id"] = user_id result = self._request("POST", "/search", json=payload) return result.get("results", []) # Case-insensitive mode: search with both original and lowercase # Fetch 2x limit to ensure we get top N after merging results_original = self._search_with_query(query, user_id, limit * 2) results_lower = self._search_with_query(query.lower(), user_id, limit * 2) # Merge and deduplicate, keeping highest score merged = {} for result in results_original + results_lower: mem_id = result.get("id") if mem_id not in merged or result.get("score", 0) > merged[mem_id].get( "score", 0 ): merged[mem_id] = result return sorted(merged.values(), key=lambda x: x.get("score", 0), reverse=True)[ :limit ] def _search_with_query( self, query: str, user_id: Optional[str] = None, limit: int = 5, ) -> List[Dict]: """Internal search helper for case-insensitive mode.""" payload = {"query": query, "limit": limit} if user_id: payload["user_id"] = user_id result = self._request("POST", "/search", json=payload) return result.get("results", []) def get_all(self, user_id: Optional[str] = None) -> List[Dict]: """Get all memories for a user. API: GET /memories?user_id=... Response: {memories: [{id, text, user_id, metadata}]} """ params = {} if user_id: params["user_id"] = user_id result = self._request("GET", "/memories", params=params) return result.get("memories", []) def add( self, message: str, user_id: Optional[str] = None, agent_id: Optional[str] = None, metadata: Optional[Dict] = None, ) -> Dict: """Add a new memory. API: POST /add Request: {message, user_id, agent_id, metadata} Response: {success, memory_id, message} """ payload = {"message": message} if user_id: payload["user_id"] = user_id if agent_id: payload["agent_id"] = agent_id if metadata: payload["metadata"] = metadata return self._request("POST", "/add", json=payload) def delete(self, memory_id: str) -> Dict: """Delete a memory by ID. API: DELETE /delete/{memory_id} Response: {success, memory_id, message} """ return self._request("DELETE", f"/delete/{memory_id}") def health(self) -> bool: """Check if server is reachable.""" try: resp = self.session.get(f"{self.base_url}/health", timeout=5.0) return resp.status_code == 200 except requests.exceptions.RequestException: return False