Files
ARIA 32b97bee87 Add configurable case-insensitive search
- Add MEM0_CASE_INSENSITIVE config option (default: false)
- When enabled, searches with both original and lowercase query
- Merges results, keeping highest score for each memory
- Fixes case sensitivity issues with Qdrant embeddings
- Generalize after-install.md with placeholders instead of personal values
2026-04-15 18:19:52 +02:00

169 lines
5.3 KiB
Python

"""Local Mem0 server HTTP client.
Based on Mem0 server OpenAPI specification.
Endpoints:
- POST /add - Add memory
- POST /search - Search memories
- GET /memories - Get all memories
- DELETE /delete/{memory_id} - Delete memory
"""
from __future__ import annotations

import logging
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import requests
logger = logging.getLogger(__name__)
class LocalMem0Client:
    """HTTP client for a self-hosted Mem0 server.

    All calls go through one shared ``requests.Session`` created in
    ``__init__``. The client can be used as a context manager so the session
    is closed deterministically::

        with LocalMem0Client("http://localhost:8000") as client:
            client.search("coffee preferences", user_id="alice")
    """

    # Timeout (seconds) for the health probe; independent of ``self.timeout``
    # so a reachability check stays quick even when requests use a long one.
    HEALTH_TIMEOUT = 5.0

    def __init__(self, base_url: str, timeout: float = 10.0):
        """Create a client.

        Args:
            base_url: Root URL of the Mem0 server; trailing slashes are
                stripped so endpoints can be appended directly.
            timeout: Per-request timeout in seconds.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Content-Type": "application/json",
                "User-Agent": "hermes-agent-mem0-local-plugin/1.0.0",
            }
        )

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self) -> "LocalMem0Client":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def _request(
        self,
        method: str,
        endpoint: str,
        json: Optional[Dict] = None,
        params: Optional[Dict] = None,
    ) -> Dict[str, Any]:
        """Make an HTTP request, log failures, and return the decoded JSON body.

        Args:
            method: HTTP verb, e.g. ``"POST"``.
            endpoint: Path appended to ``base_url`` (must start with ``/``).
            json: Optional JSON request body.
            params: Optional query-string parameters.

        Returns:
            The decoded JSON response, or ``{}`` when the response body is
            empty.

        Raises:
            requests.exceptions.Timeout: The request exceeded ``self.timeout``.
            requests.exceptions.ConnectionError: The server was unreachable.
            requests.exceptions.HTTPError: The server answered with 4xx/5xx.
            ValueError: The response body was not valid JSON.
        """
        url = f"{self.base_url}{endpoint}"
        try:
            resp = self.session.request(
                method, url, json=json, params=params, timeout=self.timeout
            )
            resp.raise_for_status()
            # A successful reply may legitimately carry no body (e.g. a
            # DELETE answered without content); decoding it would raise.
            if not resp.content:
                return {}
            return resp.json()
        except requests.exceptions.Timeout:
            logger.error("Mem0 request timed out after %ss", self.timeout)
            raise
        except requests.exceptions.ConnectionError as e:
            logger.error("Failed to connect to Mem0 server at %s: %s", self.base_url, e)
            raise
        except requests.exceptions.HTTPError as e:
            logger.error(
                "Mem0 API error: %s - %s", e.response.status_code, e.response.text
            )
            raise
        except ValueError:
            # JSON decode errors are ValueError subclasses.
            logger.error("Mem0 server returned a non-JSON response from %s", url)
            raise

    @staticmethod
    def _score(result: Dict[str, Any]) -> float:
        """Return a result's score, treating a missing or null score as 0."""
        return result.get("score") or 0

    def search(
        self,
        query: str,
        user_id: Optional[str] = None,
        limit: int = 5,
        case_insensitive: bool = False,
    ) -> List[Dict]:
        """Search memories by semantic similarity.

        API: POST /search
        Request: {query, user_id, limit}
        Response: {results: [{id, text, user_id, score, metadata}]}

        Args:
            query: Search query.
            user_id: User identifier; omitted from the request when falsy.
            limit: Max results.
            case_insensitive: If True, search with both the original and the
                lowercased query, merge the results by memory id keeping the
                highest score, and return the top ``limit`` by score.

        Returns:
            The list of result dicts. In the default mode this is the
            server's ``results`` list unchanged.
        """
        if not case_insensitive:
            return self._search_with_query(query, user_id, limit)

        # Only query the lowercase variant when it actually differs;
        # otherwise the second request would duplicate the first.
        variants = [query]
        lowered = query.lower()
        if lowered != query:
            variants.append(lowered)

        # Fetch 2x limit per variant so the top N survive the merge.
        fetch_limit = limit * 2
        best: Dict[Any, Dict] = {}
        for variant in variants:
            for result in self._search_with_query(variant, user_id, fetch_limit):
                mem_id = result.get("id")
                current = best.get(mem_id)
                if current is None or self._score(result) > self._score(current):
                    best[mem_id] = result

        ranked = sorted(best.values(), key=self._score, reverse=True)
        return ranked[:limit]

    def _search_with_query(
        self,
        query: str,
        user_id: Optional[str] = None,
        limit: int = 5,
    ) -> List[Dict]:
        """Issue one POST /search request and return its ``results`` list."""
        payload: Dict[str, Any] = {"query": query, "limit": limit}
        if user_id:
            payload["user_id"] = user_id
        result = self._request("POST", "/search", json=payload)
        return result.get("results", [])

    def get_all(self, user_id: Optional[str] = None) -> List[Dict]:
        """Get all memories for a user.

        API: GET /memories?user_id=...
        Response: {memories: [{id, text, user_id, metadata}]}

        Args:
            user_id: User identifier; when falsy no filter is sent.

        Returns:
            The ``memories`` list from the response (empty if absent).
        """
        params: Dict[str, Any] = {}
        if user_id:
            params["user_id"] = user_id
        result = self._request("GET", "/memories", params=params)
        return result.get("memories", [])

    def add(
        self,
        message: str,
        user_id: Optional[str] = None,
        agent_id: Optional[str] = None,
        metadata: Optional[Dict] = None,
    ) -> Dict:
        """Add a new memory.

        API: POST /add
        Request: {message, user_id, agent_id, metadata}
        Response: {success, memory_id, message}

        Args:
            message: Text to store.
            user_id: Optional user identifier.
            agent_id: Optional agent identifier.
            metadata: Optional metadata dict.

        Optional fields are only included in the request when truthy.
        """
        payload: Dict[str, Any] = {"message": message}
        optional_fields = {
            "user_id": user_id,
            "agent_id": agent_id,
            "metadata": metadata,
        }
        payload.update({key: value for key, value in optional_fields.items() if value})
        return self._request("POST", "/add", json=payload)

    def delete(self, memory_id: str) -> Dict:
        """Delete a memory by ID.

        API: DELETE /delete/{memory_id}
        Response: {success, memory_id, message}

        The id is percent-encoded so characters such as ``/`` or ``?`` cannot
        alter the request path.
        """
        return self._request("DELETE", f"/delete/{quote(str(memory_id), safe='')}")

    def health(self) -> bool:
        """Return True if GET /health answers with status 200, else False.

        Any ``requests`` error (connection failure, timeout, ...) is treated
        as "not reachable" rather than raised.
        """
        try:
            resp = self.session.get(
                f"{self.base_url}/health", timeout=self.HEALTH_TIMEOUT
            )
            return resp.status_code == 200
        except requests.exceptions.RequestException:
            return False