#!/usr/bin/env python3
# ──────────────────────────────────────────────────────────────
# Memory Toolkit – async SQLite + simple EventEmitter
# Only the methods of `class Tools` are intended for public use.
# ──────────────────────────────────────────────────────────────
import asyncio
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple

import aiosqlite


# ----------------------------------------------------------------------
# 1️⃣  Simple event emitter – can be wired to a UI or logging function
# ----------------------------------------------------------------------
class EventEmitter:
    """Very small helper that forwards status updates to an async callback."""

    def __init__(
        self,
        cb: Optional[Callable[[Dict[str, Any]], Awaitable[Any]]] = None,
    ):
        self._cb = cb

    async def emit(self, status: str, description: str, done: bool = False) -> None:
        """Send one ``{"type": "status", "data": {...}}`` event to the callback.

        Does nothing when no callback was supplied.
        """
        if self._cb:
            await self._cb(
                {
                    "type": "status",
                    "data": {
                        "status": status,
                        "description": description,
                        "done": done,
                    },
                }
            )


# ----------------------------------------------------------------------
# 2️⃣  Data model – used only internally
# ----------------------------------------------------------------------
@dataclass
class Memory:
    """In-memory representation of one row of the ``memories`` table."""

    id: Optional[int] = None
    name: str = ""
    content: str = ""
    type: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    rel_id: Optional[int] = None
    vector: Optional[bytes] = None


def _escape_like(text: str) -> str:
    """Escape ``\\``, ``%`` and ``_`` so *text* is matched literally by LIKE.

    The resulting pattern must be used together with ``ESCAPE '\\'``.
    """
    return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


# ----------------------------------------------------------------------
# 3️⃣  Database helper – one persistent async connection per process
# ----------------------------------------------------------------------
class _MemoryDB:
    """Thin async wrapper around a single aiosqlite connection.

    The connection is opened and the schema is created lazily on first use,
    so constructing this object never requires a running event loop.
    """

    _SCHEMA = """
        CREATE TABLE IF NOT EXISTS memories (
            id          INTEGER PRIMARY KEY AUTOINCREMENT,
            name        TEXT NOT NULL,
            content     TEXT NOT NULL,
            type        TEXT,
            tags        JSON DEFAULT '[]',
            created_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at  TIMESTAMP,
            rel_id      INTEGER,
            vector      BLOB
        );
        CREATE INDEX IF NOT EXISTS idx_mem_name ON memories(name);
        CREATE INDEX IF NOT EXISTS idx_mem_type ON memories(type);
    """

    def __init__(self, db_path: Path):
        self.db_path = db_path
        self._conn: Optional[aiosqlite.Connection] = None
        self._schema_ready = False
        # Serialises connection/schema setup so that concurrent first calls
        # neither open two connections nor query before the table exists.
        self._lock = asyncio.Lock()

    async def _open_locked(self) -> aiosqlite.Connection:
        """Open the connection if needed. Caller must hold ``self._lock``."""
        if self._conn is None:
            conn = await aiosqlite.connect(str(self.db_path))
            try:
                # Enable foreign‑key constraints just in case we add them later
                await conn.execute("PRAGMA foreign_keys=ON;")
                await conn.commit()
            except Exception:
                # Do not leak a half-configured connection.
                await conn.close()
                raise
            self._conn = conn
        return self._conn

    async def _init_locked(self) -> aiosqlite.Connection:
        """Open the connection and create the schema once. Caller holds the lock."""
        conn = await self._open_locked()
        if not self._schema_ready:
            await conn.executescript(self._SCHEMA)
            await conn.commit()
            self._schema_ready = True
        return conn

    async def _ready(self) -> aiosqlite.Connection:
        """Return a connection whose schema is guaranteed to exist."""
        if self._conn is not None and self._schema_ready:
            return self._conn  # fast path, no lock needed
        async with self._lock:
            return await self._init_locked()

    async def connect(self) -> aiosqlite.Connection:
        """Return the shared connection, opening it on first call."""
        async with self._lock:
            return await self._open_locked()

    async def close(self) -> None:
        """Close the shared connection, if any. Safe to call repeatedly."""
        async with self._lock:
            if self._conn is not None:
                await self._conn.close()
                self._conn = None
                # A fresh connection (e.g. to ":memory:") needs the schema again.
                self._schema_ready = False

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------
    async def init_db(self) -> None:
        """Create the ``memories`` table and its indexes if they are missing."""
        async with self._lock:
            await self._init_locked()

    # ------------------------------------------------------------------
    # CRUD helpers (used only by `Tools`)
    # ------------------------------------------------------------------
    async def add(self, mem: Memory) -> int:
        """Insert *mem* and return the id of the new row."""
        conn = await self._ready()
        cur = await conn.execute(
            """
            INSERT INTO memories (name, content, type, tags, rel_id, vector)
            VALUES (?, ?, ?, ?, ?, ?);
            """,
            (
                mem.name,
                mem.content,
                mem.type,
                json.dumps(mem.tags),
                mem.rel_id,
                mem.vector,
            ),
        )
        await conn.commit()
        return cur.lastrowid

    async def get(self, memory_id: int) -> Optional[Memory]:
        """Return the memory with the given id, or ``None`` if it does not exist."""
        conn = await self._ready()
        cur = await conn.execute(
            "SELECT * FROM memories WHERE id = ?", (memory_id,)
        )
        row = await cur.fetchone()
        return self._row_to_memory(row) if row else None

    async def update(self, memory_id: int, content: str) -> bool:
        """Replace the content of a memory.

        Returns ``True`` when a row was updated, ``False`` when the id is unknown.
        """
        conn = await self._ready()
        cur = await conn.execute(
            """
            UPDATE memories
            SET content = ?, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?;
            """,
            (content, memory_id),
        )
        await conn.commit()
        return cur.rowcount > 0

    async def delete(self, memory_id: int) -> bool:
        """Delete a memory.

        Returns ``True`` when a row was deleted, ``False`` when the id is unknown.
        """
        conn = await self._ready()
        cur = await conn.execute(
            "DELETE FROM memories WHERE id = ?", (memory_id,)
        )
        await conn.commit()
        return cur.rowcount > 0

    async def list_all(self) -> List[Memory]:
        """Return every memory, newest first."""
        conn = await self._ready()
        cur = await conn.execute(
            "SELECT * FROM memories ORDER BY created_at DESC;"
        )
        rows = await cur.fetchall()
        return [self._row_to_memory(r) for r in rows]

    async def search(
        self,
        query: Optional[str] = "",
        tags: Optional[List[str]] = None,
        type_: Optional[str] = None,
    ) -> List[Memory]:
        """
        Simple SQL‑based filtering. For production you probably want FTS5 or
        a vector index.

        * ``query`` – substring matched against name and content (``%`` and
          ``_`` keep their LIKE wildcard meaning); empty/``None`` disables it.
        * ``tags``  – every listed tag must be an element of the row's tags.
        * ``type_`` – exact match on the ``type`` column.
        """
        conn = await self._ready()
        where_clauses: List[str] = ["1=1"]
        params: List[Any] = []

        if query:
            like_q = f"%{query}%"
            where_clauses.append("(name LIKE ? OR content LIKE ?)")
            params.extend([like_q, like_q])

        for tag in tags or []:
            # Tags are stored as a JSON array written by json.dumps, so look
            # for the JSON-encoded element (including its quotes). This stops
            # "wifi" from matching "wifi2", and escaping keeps "%" / "_"
            # inside a tag from acting as wildcards.
            where_clauses.append("tags LIKE ? ESCAPE '\\'")
            params.append(f"%{_escape_like(json.dumps(tag))}%")

        if type_:
            where_clauses.append("type = ?")
            params.append(type_)

        sql = (
            "SELECT * FROM memories WHERE "
            + " AND ".join(where_clauses)
            + " ORDER BY created_at DESC;"
        )
        cur = await conn.execute(sql, tuple(params))
        rows = await cur.fetchall()
        return [self._row_to_memory(r) for r in rows]

    # ------------------------------------------------------------------
    # Helper – convert a row to Memory
    # ------------------------------------------------------------------
    @staticmethod
    def _row_to_memory(row: Tuple[Any, ...]) -> Memory:
        """Build a ``Memory`` from a ``SELECT *`` row (columns in schema order).

        NULL ``tags`` become an empty list and NULL timestamps become ``None``.
        """
        return Memory(
            id=row[0],
            name=row[1],
            content=row[2],
            type=row[3],
            tags=json.loads(row[4]) if row[4] else [],
            created_at=datetime.fromisoformat(row[5]) if row[5] else None,
            updated_at=datetime.fromisoformat(row[6]) if row[6] else None,
            rel_id=row[7],
            vector=row[8],
        )


# ----------------------------------------------------------------------
# 4️⃣  Public Tools class – only these methods are visible to the LLM
# ----------------------------------------------------------------------
class Tools:
    """
    High‑level wrapper that the LLM can call.

    All arguments and return values are plain Python data types – strings,
    lists, dicts, integers. The LLM will not see any of the helper classes
    defined above.
    """

    def __init__(self, db_path: str | Path = "/app/memories.db"):
        # The database is opened and its schema created on first use, so this
        # constructor works with or without a running event loop.
        self._db = _MemoryDB(Path(db_path))

    def _memory_to_dict(self, mem: Memory) -> Dict[str, Any]:
        """Converts a Memory object to a serializable dictionary.

        Works on a copy, so *mem* itself is left untouched.
        """
        mem_dict = asdict(mem)
        for key in ("created_at", "updated_at"):
            if isinstance(mem_dict.get(key), datetime):
                mem_dict[key] = mem_dict[key].isoformat()
        if isinstance(mem_dict.get("vector"), bytes):
            mem_dict["vector"] = None  # Or a base64 representation if needed
        return mem_dict

    async def add_memory(
        self,
        name: str,
        content: str,
        type_: Optional[str] = None,
        tags: Optional[List[str]] = None,
        rel_id: Optional[int] = None,
        __event_emitter__: Optional[Callable[[Dict[str, Any]], Awaitable[Any]]] = None,
    ) -> str:
        """Store a new memory and return its id as a string."""
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit("in_progress", "Adding memory…")
        try:
            mem = Memory(
                name=name,
                content=content,
                type=type_,
                tags=tags or [],
                rel_id=rel_id,
            )
            mem_id = await self._db.add(mem)
            await emitter.emit("success", "Memory added successfully.", True)
            return str(mem_id)
        except Exception as e:
            await emitter.emit("error", f"Error adding memory: {e}", True)
            raise

    async def get_memory(
        self,
        memory_id: int,
        __event_emitter__: Optional[Callable[[Dict[str, Any]], Awaitable[Any]]] = None,
    ) -> str | None:
        """Return the memory as a JSON object string, or ``None`` if not found."""
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit("in_progress", "Retrieving memory…")
        try:
            mem = await self._db.get(memory_id)
            if not mem:
                await emitter.emit(
                    "error", f"No memory found with id {memory_id}", True
                )
                return None
            await emitter.emit("success", "Memory retrieved successfully.", True)
            # Convert to plain dict – the LLM can easily use it.
            return json.dumps(self._memory_to_dict(mem))
        except Exception as e:
            await emitter.emit("error", f"Error retrieving memory: {e}", True)
            raise

    async def update_memory(
        self,
        memory_id: int,
        content: str,
        __event_emitter__: Optional[Callable[[Dict[str, Any]], Awaitable[Any]]] = None,
    ) -> None:
        """Replace the content of an existing memory."""
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit("in_progress", "Updating memory…")
        try:
            if await self._db.update(memory_id, content):
                await emitter.emit("success", "Memory updated successfully.", True)
            else:
                await emitter.emit(
                    "error", f"No memory found with id {memory_id}", True
                )
        except Exception as e:
            await emitter.emit("error", f"Error updating memory: {e}", True)
            raise

    async def delete_memory(
        self,
        memory_id: int,
        __event_emitter__: Optional[Callable[[Dict[str, Any]], Awaitable[Any]]] = None,
    ) -> None:
        """Delete an existing memory."""
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit("in_progress", "Deleting memory…")
        try:
            if await self._db.delete(memory_id):
                await emitter.emit("success", "Memory deleted successfully.", True)
            else:
                await emitter.emit(
                    "error", f"No memory found with id {memory_id}", True
                )
        except Exception as e:
            await emitter.emit("error", f"Error deleting memory: {e}", True)
            raise

    async def list_memories(
        self,
        __event_emitter__: Optional[Callable[[Dict[str, Any]], Awaitable[Any]]] = None,
    ) -> str:
        """Return all memories, newest first, as a JSON array string."""
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit("in_progress", "Listing memories…")
        try:
            mems = await self._db.list_all()
            await emitter.emit(
                "success",
                f"Listed {len(mems)} memory(ies) successfully.",
                True,
            )
            return json.dumps([self._memory_to_dict(m) for m in mems])
        except Exception as e:
            await emitter.emit("error", f"Error listing memories: {e}", True)
            raise

    async def search_memories(
        self,
        tags: Optional[List[str]] = None,
        type_: Optional[str] = None,
        __event_emitter__: Optional[Callable[[Dict[str, Any]], Awaitable[Any]]] = None,
    ) -> str:
        """Return memories matching all given tags and/or the type as a JSON array string."""
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit("in_progress", "Searching memories…")
        try:
            mems = await self._db.search(query="", tags=tags, type_=type_)
            await emitter.emit(
                "success",
                f"Found {len(mems)} matching memory(ies).",
                True,
            )
            return json.dumps([self._memory_to_dict(m) for m in mems])
        except Exception as e:
            await emitter.emit("error", f"Error searching memories: {e}", True)
            raise


# ----------------------------------------------------------------------
# 5️⃣  Optional – simple demo (run this file directly)
# ----------------------------------------------------------------------
if __name__ == "__main__":

    async def main():
        # Simple console logger for the event emitter
        async def log(msg):
            print(msg)

        tools = Tools("memories.db")
        try:
            # 1️⃣  Add a memory
            mem_id_str = await tools.add_memory(
                name="WiFi password",
                content="d65b73d17aab",
                type_="password",
                tags=["wifi", "security"],
                __event_emitter__=log,
            )
            print("New memory id:", mem_id_str)
            mem_id = int(mem_id_str)

            # 2️⃣  Search by tag
            results_str = await tools.search_memories(
                tags=["wifi"], __event_emitter__=log
            )
            results = json.loads(results_str)
            print(f"Search found {len(results)} result(s).")

            # 3️⃣  Get the exact memory
            mem_detail_str = await tools.get_memory(mem_id, __event_emitter__=log)
            mem_detail = json.loads(mem_detail_str) if mem_detail_str else None
            print("Memory detail:", mem_detail)
        finally:
            # Close the connection so its worker thread stops and the
            # interpreter can exit cleanly.
            await tools._db.close()

    asyncio.run(main())