a1240adbb9
- Generalized OpenAPI spec reference in client.py - Removed personal IP addresses and user IDs from README - Added MEM0_PREFETCH_LIMIT and MEM0_PREFETCH_SCORE_THRESHOLD to documentation - Updated example configurations with generic values
130 lines
3.8 KiB
Python
130 lines
3.8 KiB
Python
"""Local Mem0 server HTTP client.
|
|
|
|
Based on Mem0 server OpenAPI specification.
|
|
|
|
Endpoints:
|
|
- POST /add - Add memory
|
|
- POST /search - Search memories
|
|
- GET /memories - Get all memories
|
|
- DELETE /delete/{memory_id} - Delete memory
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class LocalMem0Client:
|
|
"""HTTP client for self-hosted Mem0 server."""
|
|
|
|
def __init__(self, base_url: str, timeout: float = 10.0):
|
|
self.base_url = base_url.rstrip("/")
|
|
self.timeout = timeout
|
|
self.session = requests.Session()
|
|
self.session.headers.update(
|
|
{
|
|
"Content-Type": "application/json",
|
|
"User-Agent": "hermes-agent-mem0-local-plugin/1.0.0",
|
|
}
|
|
)
|
|
|
|
def _request(
|
|
self,
|
|
method: str,
|
|
endpoint: str,
|
|
json: Optional[Dict] = None,
|
|
params: Optional[Dict] = None,
|
|
) -> Dict:
|
|
"""Make HTTP request with error handling."""
|
|
url = f"{self.base_url}{endpoint}"
|
|
try:
|
|
resp = self.session.request(
|
|
method, url, json=json, params=params, timeout=self.timeout
|
|
)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
except requests.exceptions.Timeout:
|
|
logger.error("Mem0 request timed out after %ss", self.timeout)
|
|
raise
|
|
except requests.exceptions.ConnectionError as e:
|
|
logger.error("Failed to connect to Mem0 server at %s: %s", self.base_url, e)
|
|
raise
|
|
except requests.exceptions.HTTPError as e:
|
|
logger.error(
|
|
"Mem0 API error: %s - %s", e.response.status_code, e.response.text
|
|
)
|
|
raise
|
|
|
|
def search(
|
|
self,
|
|
query: str,
|
|
user_id: Optional[str] = None,
|
|
limit: int = 5,
|
|
) -> List[Dict]:
|
|
"""Search memories by semantic similarity.
|
|
|
|
API: POST /search
|
|
Request: {query, user_id, limit}
|
|
Response: {results: [{id, text, user_id, score, metadata}]}
|
|
"""
|
|
payload = {"query": query, "limit": limit}
|
|
if user_id:
|
|
payload["user_id"] = user_id
|
|
result = self._request("POST", "/search", json=payload)
|
|
return result.get("results", [])
|
|
|
|
def get_all(self, user_id: Optional[str] = None) -> List[Dict]:
|
|
"""Get all memories for a user.
|
|
|
|
API: GET /memories?user_id=...
|
|
Response: {memories: [{id, text, user_id, metadata}]}
|
|
"""
|
|
params = {}
|
|
if user_id:
|
|
params["user_id"] = user_id
|
|
result = self._request("GET", "/memories", params=params)
|
|
return result.get("memories", [])
|
|
|
|
def add(
|
|
self,
|
|
message: str,
|
|
user_id: Optional[str] = None,
|
|
agent_id: Optional[str] = None,
|
|
metadata: Optional[Dict] = None,
|
|
) -> Dict:
|
|
"""Add a new memory.
|
|
|
|
API: POST /add
|
|
Request: {message, user_id, agent_id, metadata}
|
|
Response: {success, memory_id, message}
|
|
"""
|
|
payload = {"message": message}
|
|
if user_id:
|
|
payload["user_id"] = user_id
|
|
if agent_id:
|
|
payload["agent_id"] = agent_id
|
|
if metadata:
|
|
payload["metadata"] = metadata
|
|
return self._request("POST", "/add", json=payload)
|
|
|
|
def delete(self, memory_id: str) -> Dict:
|
|
"""Delete a memory by ID.
|
|
|
|
API: DELETE /delete/{memory_id}
|
|
Response: {success, memory_id, message}
|
|
"""
|
|
return self._request("DELETE", f"/delete/{memory_id}")
|
|
|
|
def health(self) -> bool:
|
|
"""Check if server is reachable."""
|
|
try:
|
|
resp = self.session.get(f"{self.base_url}/health", timeout=5.0)
|
|
return resp.status_code == 200
|
|
except requests.exceptions.RequestException:
|
|
return False
|