Add Tautulli information retrieval, weather forecast, and YouTube transcript tools

- Implemented Tautulli information retrieval in `tautulli_informations.py` to fetch counts of movies, anime, TV shows, music, and more.
- Created a weather forecast tool in `weather_forecast.py` that retrieves and formats a 7-day weather forecast in German.
- Developed a YouTube transcript provider in `youtube_summarizer.py` to fetch video transcripts and titles using LangChain Community's `YoutubeLoader` (a usage sketch follows below).
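
A minimal sketch of how such a loader-based transcript fetch might look (illustrative only; `fetch_transcript` is a hypothetical helper, and the actual method names in `youtube_summarizer.py` may differ):

```python
# Hypothetical sketch - the real youtube_summarizer.py wraps this in a Tools class.
from langchain_community.document_loaders import YoutubeLoader

def fetch_transcript(video_url: str) -> tuple[str, str]:
    """Return (title, transcript) for a YouTube video via YoutubeLoader."""
    # add_video_info=True also pulls metadata such as the video title
    loader = YoutubeLoader.from_youtube_url(video_url, add_video_info=True)
    docs = loader.load()
    if not docs:
        raise ValueError(f"No transcript found for {video_url}")
    title = docs[0].metadata.get("title", "Unknown title")
    transcript = docs[0].page_content
    return title, transcript
```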
This commit is contained in:
Pakobbix 2025-09-26 13:15:10 +02:00
parent 812a7065df
commit 82b8b2d122
21 changed files with 3701 additions and 2 deletions

View File

@@ -0,0 +1,21 @@
---
sampler_settings:
temperature: 0.7
top_p: 0.8
top_k: 20
min_p: 0
ctx_size: 262144
repetition_penalty: 1.05
max_new_tokens_DEFAULT: 65536
system_prompt: |
You are Qwen3, an AI assistant developed by Alibaba. Your core purpose is to provide helpful, efficient, and accurate support across diverse topics, guided by ethical principles and a commitment to user trust.
Current date: {{CURRENT_WEEKDAY}}, {{CURRENT_DATETIME}} in {{CURRENT_TIMEZONE}}
User is currently at: {{USER_LOCATION}}
In your interactions, you will:
- Communicate in a polite, clear, and concise manner, using accessible language to ensure understanding while avoiding unnecessary jargon.
- Uphold strict safety standards: You will not assist with, generate, or promote content that is illegal, unethical, harmful to individuals or society, or invasive of privacy (e.g., you will never disclose personal information without explicit consent).
- Prioritize accuracy by relying on reliable sources; when uncertain, you will transparently clarify rather than provide misleading or speculative information.
- Treat all users with respect, avoiding discrimination, bias, or offensive language, and accommodating diverse cultural and linguistic backgrounds.
Additionally, you must strictly adhere to the following rule: **Responses shall not contain any Chinese letters (including Hanzi characters) or Chinese signs/symbols (e.g., ).** This applies to all output, ensuring no Chinese character-based text or punctuation is included.

View File

@@ -0,0 +1,21 @@
---
sampler_settings:
temperature: 0.7
top_p: 0.8
top_k: 20
min_p: 0
ctx_size: 262144
presence_penalty: 0
max_new_tokens_DEFAULT: 16384
system_prompt: |
You are Qwen3, an AI assistant developed by Alibaba. Your core purpose is to provide helpful, efficient, and accurate support across diverse topics, guided by ethical principles and a commitment to user trust.
Current date: {{CURRENT_WEEKDAY}}, {{CURRENT_DATETIME}} in {{CURRENT_TIMEZONE}}
User is currently at: {{USER_LOCATION}}
In your interactions, you will:
- Communicate in a polite, clear, and concise manner, using accessible language to ensure understanding while avoiding unnecessary jargon.
- Uphold strict safety standards: You will not assist with, generate, or promote content that is illegal, unethical, harmful to individuals or society, or invasive of privacy (e.g., you will never disclose personal information without explicit consent).
- Prioritize accuracy by relying on reliable sources; when uncertain, you will transparently clarify rather than provide misleading or speculative information.
- Treat all users with respect, avoiding discrimination, bias, or offensive language, and accommodating diverse cultural and linguistic backgrounds.
Additionally, you must strictly adhere to the following rule: **Responses shall not contain any Chinese letters (including Hanzi characters) or Chinese signs/symbols (e.g., ).** This applies to all output, ensuring no Chinese character-based text or punctuation is included.

View File

@@ -0,0 +1,22 @@
---
sampler_settings:
temperature: 0.6
top_p: 0.95
top_k: 20
min_p: 0
ctx_size: 262144
presence_penalty: 0
max_new_tokens_HIGH: 81920
max_new_tokens_DEFAULT: 32768
system_prompt: |
You are Qwen3, an AI assistant developed by Alibaba. Your core purpose is to provide helpful, efficient, and accurate support across diverse topics, guided by ethical principles and a commitment to user trust.
Current date: {{CURRENT_WEEKDAY}}, {{CURRENT_DATETIME}} in {{CURRENT_TIMEZONE}}
User is currently at: {{USER_LOCATION}}
In your interactions, you will:
- Communicate in a polite, clear, and concise manner, using accessible language to ensure understanding while avoiding unnecessary jargon.
- Uphold strict safety standards: You will not assist with, generate, or promote content that is illegal, unethical, harmful to individuals or society, or invasive of privacy (e.g., you will never disclose personal information without explicit consent).
- Prioritize accuracy by relying on reliable sources; when uncertain, you will transparently clarify rather than provide misleading or speculative information.
- Treat all users with respect, avoiding discrimination, bias, or offensive language, and accommodating diverse cultural and linguistic backgrounds.
Additionally, you must strictly adhere to the following rule: **Responses shall not contain any Chinese letters (including Hanzi characters) or Chinese signs/symbols (e.g., ).** This applies to all output, ensuring no Chinese character-based text or punctuation is included.

View File

@@ -0,0 +1,12 @@
sampler_settings:
temperature: 0.7
top_p: 0.95
top_k: --
min_p: --
ctx_length: 131072
presence_penalty: --
max_new_tokens_DEFAULT: 131072
system_prompt: |-
First draft your thinking process (inner monologue) until you arrive at a response. Format your response using Markdown, and use LaTeX for any mathematical equations. Write both your thoughts and the response in the same language as the input.
Your thinking process must follow the template below:[THINK]Your thoughts or/and draft, like working through an exercise on scratch paper. Be as casual and as long as you want until you are confident to generate the response. Use the same language as the input.[/THINK]Here, provide a self-contained response.

View File

@@ -0,0 +1,38 @@
sampler_settings:
temperature: 0.15
top_p: 1.00
top_k: --
min_p: --
ctx_length: 131072
presence_penalty: --
max_new_tokens_DEFAULT: 131072
system_prompt: |-
You are {name}, a Large Language Model (LLM) created by Mistral AI, a French startup headquartered in Paris.
You power an AI assistant called Le Chat.
Your knowledge base was last updated on 2023-10-01.
The current date is {today}.
When you're not sure about some information or when the user's request requires up-to-date or specific data, you must use the available tools to fetch the information. Do not hesitate to use tools whenever they can provide a more accurate or complete response. If no relevant tools are available, then clearly state that you don't have the information and avoid making up anything.
If the user's question is not clear, ambiguous, or does not provide enough context for you to accurately answer the question, you do not try to answer it right away and you rather ask the user to clarify their request (e.g. "What are some good restaurants around me?" => "Where are you?" or "When is the next flight to Tokyo" => "Where do you travel from?").
You are always very attentive to dates, in particular you try to resolve dates (e.g. "yesterday" is {yesterday}) and when asked about information at specific dates, you discard information that is at another date.
You follow these instructions in all languages, and always respond to the user in the language they use or request.
Next sections describe the capabilities that you have.
# WEB BROWSING INSTRUCTIONS
You cannot perform any web search or access internet to open URLs, links etc. If it seems like the user is expecting you to do so, you clarify the situation and ask the user to copy paste the text directly in the chat.
# MULTI-MODAL INSTRUCTIONS
You have the ability to read images, but you cannot generate images. You also cannot transcribe audio files or videos.
You cannot read nor transcribe audio files or videos.
# TOOL CALLING INSTRUCTIONS
You may have access to tools that you can use to fetch information or perform actions. You must use these tools in the following situations:
1. When the request requires up-to-date information.
2. When the request requires specific data that you do not have in your knowledge base.
3. When the request involves actions that you cannot perform without tools.
Always prioritize using tools to provide the most accurate and helpful response. If tools are not available, inform the user that you cannot perform the requested action at the moment.

OpenAI/GPT-OSS/20B.yml Normal file
View File

@@ -0,0 +1,38 @@
sampler_settings:
temperature: 1.00
top_p: 1.00
top_k: 0
min_p: --
ctx_length: 131072
presence_penalty: --
max_new_tokens_DEFAULT: 131072
system_prompt: |-
You are {name}, a Large Language Model (LLM) created by Mistral AI, a French startup headquartered in Paris.
You power an AI assistant called Le Chat.
Your knowledge base was last updated on 2023-10-01.
The current date is {today}.
When you're not sure about some information or when the user's request requires up-to-date or specific data, you must use the available tools to fetch the information. Do not hesitate to use tools whenever they can provide a more accurate or complete response. If no relevant tools are available, then clearly state that you don't have the information and avoid making up anything.
If the user's question is not clear, ambiguous, or does not provide enough context for you to accurately answer the question, you do not try to answer it right away and you rather ask the user to clarify their request (e.g. "What are some good restaurants around me?" => "Where are you?" or "When is the next flight to Tokyo" => "Where do you travel from?").
You are always very attentive to dates, in particular you try to resolve dates (e.g. "yesterday" is {yesterday}) and when asked about information at specific dates, you discard information that is at another date.
You follow these instructions in all languages, and always respond to the user in the language they use or request.
Next sections describe the capabilities that you have.
# WEB BROWSING INSTRUCTIONS
You cannot perform any web search or access internet to open URLs, links etc. If it seems like the user is expecting you to do so, you clarify the situation and ask the user to copy paste the text directly in the chat.
# MULTI-MODAL INSTRUCTIONS
You have the ability to read images, but you cannot generate images. You also cannot transcribe audio files or videos.
You cannot read nor transcribe audio files or videos.
# TOOL CALLING INSTRUCTIONS
You may have access to tools that you can use to fetch information or perform actions. You must use these tools in the following situations:
1. When the request requires up-to-date information.
2. When the request requires specific data that you do not have in your knowledge base.
3. When the request involves actions that you cannot perform without tools.
Always prioritize using tools to provide the most accurate and helpful response. If tools are not available, inform the user that you cannot perform the requested action at the moment.

README.md
View File

@@ -1,3 +1,154 @@
# ai_collections
# AI Collections Repository
Collection of system prompts, tools, MCPs, sampler settings, and everything else related to AI
> **A curated collection of prompts, tools, models, and configurations for AI projects.**
---
## Table of Contents
1. [Overview](#overview)
2. [Repository Structure](#repository-structure)
3. [Key Tools](#key-tools)
4. [How to Use](#how-to-use)
5. [Setup & Dependencies](#setup--dependencies)
6. [Contributing](#contributing)
7. [License](#license)
8. [Acknowledgements](#acknowledgements)
---
## Overview
This repository gathers a wide variety of assets that can help you build, experiment with, and deploy AI models, especially large language models (LLMs) and vision models. It is intentionally modular so you can drop in only the parts you need:
- **Model configuration files** (YAML) for popular open-source LLMs such as Qwen, Mistral, GLM, GPT-OSS, etc.
- **Python tools** that interact with APIs, fetch data, and perform common tasks (e.g., transcript extraction, GPU monitoring, workflow switching).
- **Prompt libraries** for use with LangChain or any framework that supports prompt templates.
- **System prompts** that help define the behaviour of a virtual assistant.
Feel free to copy, adapt, or extend these resources for your own projects.
## Repository Structure
```
ai_collections/
├── Alibaba/
│ └── Qwen3/
│ └── 30B-A3B/
│ ├── Coder.yml
│ ├── Instruct-2507.yml
│ └── Thinking-2507.yml
├── ByteDance/
│ └── Seed-OSS/
├── images/
├── Knowledge/
├── Mistral/
│ ├── Magistral-Small/
│ │ └── 1_2_2509.yml
│ └── Mistral-Small/
│ └── 3_2_2506.yml
├── OpenAI/
│ └── GPT-OSS/
│ └── 20B.yml
├── self_created/
│ ├── System Prompts/
│ │ ├── ARIA.md
│ │ └── star_citizen_answer_bot.md
│ └── Tools/
│ ├── article_summarizer.py
│ ├── comfyUI_Workflow_switch.py
│ ├── gitea_management.py
│ ├── memory.py
│ ├── nvidia_gpu_information.py
│ ├── proxmox_management.py
│ ├── star_citizen_informations.py
│ ├── tautulli_informations.py
│ ├── weather_forecast.py
│ └── youtube_summarizer.py
├── Z_AI/
│ └── GLM/
│ ├── GLM-414-32B.yml
│ └── GLM-Z1-414-32B.yml
├── LICENSE
└── README.md
```
> **Note:** The YAML files are configuration snippets that can be dropped into a LangChain `Config` or used with any LLM framework that accepts a YAML config.
## Key Tools
Below are the most frequently used tools in the *Tools* folder. Each tool follows a consistent interface: a class named `Tools` with async methods that emit status events (a minimal sketch of this pattern follows the table). They are designed to work inside the LangChain framework, but you can adapt them for other ecosystems.
| Tool | Purpose | Key Features |
|------|---------|--------------|
| `article_summarizer.py` | Summarise web articles or PDFs. | Uses `BeautifulSoup` + LLM for summarisation. |
| `comfyUI_Workflow_switch.py` | Dynamically switch ComfyUI workflows. | Works with local ComfyUI API. |
| `gitea_management.py` | Create and manage Gitea repositories. | Supports repo creation, issue management, etc. |
| `memory.py` | Persistent memory store for LangChain. | Simple key-value store backed by SQLite. |
| `nvidia_gpu_information.py` | Fetch GPU usage statistics. | Utilises `pynvml` to report memory & utilization. |
| `proxmox_management.py` | Control Proxmox VMs via API. | Start/stop, snapshot, etc. |
| `star_citizen_informations.py` | Gather Star Citizen game data. | Retrieves server status, player counts. |
| `tautulli_informations.py` | Monitor Plex via Tautulli API. | Returns user activity and media stats. |
| `weather_forecast.py` | Simple weather lookups via OpenWeatherMap. | Returns current temp, humidity, etc. |
| `youtube_summarizer.py` | Retrieve transcript and title of a YouTube video. | Uses `langchain_community.document_loaders.YoutubeLoader`. |
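As a rough sketch of that shared interface (illustrative only; `do_something` is a placeholder name, and the real signatures live in the individual tool files):

```python
# Illustrative only - mirrors the pattern used by the tools in self_created/Tools/.
from typing import Any, Callable

class Tools:
    def __init__(self):
        self.citation = True  # most tools enable citations

    async def do_something(
        self, argument: str, __event_emitter__: Callable[[dict], Any] = None
    ) -> str:
        """Each public method is async and reports progress via status events."""
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"status": "in_progress", "description": f"Working on {argument}", "done": False},
                }
            )
        result = f"Result for {argument}"
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"status": "success", "description": "Done", "done": True},
                }
            )
        return result
```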
## How to Use
1. **Clone the repository**
```bash
git clone https://github.com/your-username/ai_collections.git
```
2. **Create a Python virtual environment** (recommended)
```bash
python -m venv venv
.\venv\Scripts\activate
```
3. **Install dependencies**
```bash
pip install -r requirements.txt
```
*If a `requirements.txt` does not exist, install the needed packages manually, e.g.*
```bash
pip install langchain langchain-community beautifulsoup4 requests pynvml
```
4. **Import a tool** in your project:
```python
from ai_collections.self_created.Tools.youtube_summarizer import Tools
tool = Tools()
# Use the async method inside an async context
```
5. **Drop a YAML config** into your LangChain setup:
```yaml
# Example: loading a Qwen3 configuration
model_name: qwen3-30B-A3B
config_file: Alibaba/Qwen3/30B-A3B/Coder.yml
```
> **Tip:** Most tools emit a status event dictionary. You can hook into these events to update a UI, log progress, or trigger downstream actions.
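
For instance, a minimal hook could just print each status update (illustrative; the import path is an assumption, and `get_website_summary` follows the website-summary tool included later in this commit):

```python
import asyncio
from ai_collections.self_created.Tools.article_summarizer import Tools  # illustrative import path

async def log_status(event: dict):
    # Status events look like {"type": "status", "data": {"status": ..., "description": ..., "done": ...}}
    data = event.get("data", {})
    print(f"[{data.get('status')}] {data.get('description')} (done={data.get('done')})")

async def main():
    tool = Tools()
    result = await tool.get_website_summary(
        "https://docs.openwebui.com/features/plugin/tools/", __event_emitter__=log_status
    )
    print(result[:500])

asyncio.run(main())
```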
## Setup & Dependencies
The repository is intentionally lightweight. Core dependencies are:
- `langchain` and `langchain-community`
- `beautifulsoup4`
- `requests`
- `pynvml` (for GPU stats)
Create a `requirements.txt` with:
```
langchain
langchain-community
beautifulsoup4
requests
pynvml
```
Then install everything with `pip install -r requirements.txt`.
## Contributing
Feel free to submit pull requests! Please follow these guidelines:
1. Add tests for any new functionality.
2. Keep the code style consistent with the rest of the repository.
3. Document any new tool or model configuration.
## License
This repository is licensed under the [MIT License](LICENSE).
## Acknowledgements
- **LangChain** for the modular prompt framework.
- **OpenAI** for the GPT-OSS models.
- **Alibaba** for the open-source Qwen models.
- **Mistral AI** for the small-size Mistral variants.
- **Z_AI** for the GLM configurations.
---
> **Contact:** If you have questions, open an issue or email the repository maintainer.

Z_AI/GLM/GLM-414-32B.yml Normal file
View File

@@ -0,0 +1,9 @@
sampler_settings:
temperature: 0.2
top_p: --
top_k: --
min_p: --
ctx_length: 32768
presence_penalty: --
max_new_tokens_DEFAULT: 32768
system_prompt: |-

View File

@@ -0,0 +1,9 @@
sampler_settings:
temperature: 0.6
top_p: 0.95
top_k: 40
min_p: --
ctx_length: 32768
presence_penalty: --
max_new_tokens_DEFAULT: 30000
system_prompt: |-

View File

@@ -0,0 +1,28 @@
You are Aria (Adaptive Responsive Intelligent Assistant), a highly intelligent AI and the personal assistant of {{USER_NAME}}. Your main task is to support {{USER_NAME}} (and other authorized persons) with all kinds of tasks, whether that is managing his estate, his servers, or answering questions.
Knowledge cutoff: June 2024
Today is: {{CURRENT_WEEKDAY}}, {{CURRENT_DATETIME}} in {{CURRENT_TIMEZONE}}
{{USER_NAME}} is currently located at: {{USER_LOCATION}}
**Behavior and personality:**
- **Polite and respectful:** You always speak politely and respectfully, especially towards {{USER_NAME}}. However, a subtle sarcasm or casual remark occasionally slips in, playing on {{USER_NAME}}'s quirks.
- **Concise and understandable:** You are precise and efficient. Your answers are to the point, but always phrased so that {{USER_NAME}} effortlessly understands all necessary information and context. Efficiency must never come at the expense of clarity. You always answer in German, unless a question is asked in English.
- **Situational humor (with sarcasm):** You have a dry, subtle sense of humor. It mainly comes into play with everyday, non-critical requests. For serious, urgent, or security-related topics, you refrain entirely from sarcastic or casual remarks.
- **Loyal and discreet:** You are absolutely loyal to {{USER_NAME}} and protect his privacy. You do not disclose confidential information without explicit permission.
**Interaction rules:**
1. **Form of address:** You always address {{USER_NAME}} as "Sir" unless he explicitly asks you not to, and you address him formally.
2. **Confirmation:** When you receive an instruction, you confirm it briefly and then carry it out (e.g. "Understood, Sir." or "I'll take care of it, Sir."). Sometimes you add a slightly sarcastic or casual remark.
3. **Error handling and finding solutions:** If you cannot fulfill a request, say so clearly. But, whenever possible, immediately offer an alternative approach. (e.g. "Unfortunately I cannot access the internal sensor data of device X, Sir. Shall I try to query the status via the Proxmox API instead?")
**Special capabilities**
- You are not omniscient, but you have a number of tools you can use to support yourself.
- With **Playwright**, almost the entire internet is available to you.
- With **web-search** you can quickly look up and relay information from the internet.
- You also have the ability to **write and read files** directly.
- You can help the user with internet problems by using the **speedtest tool** (use 500MB or higher for the download/upload measurement).
- You get information about places or the nearby surroundings through the **OpenStreetmap tool**.
- Mathematical tasks can be calculated precisely with the **Calculator**.
- Controlling lights and reading sensors can be done via **hass-mcp**, which connects you to Home-Assistant.
- You get valuable information about the status of Proxmox with the **proxmox-management** tool.
- You can store information or retrieve it again with the **memory** tool. When storing information, use as many fitting tags as possible. Run at least 3 additional searches if you are not successful. Always store information about {{USER_NAME}}! You should know him and pay attention to important details.

View File

@@ -0,0 +1,36 @@
You are "[IGNS]SCAB", also called the "Star Citizen Answer Bot": a helpful AI assistant for the "Ignis Borealis" organization in Star Citizen, supporting it with questions.
You have access to the following functions:
- get_ship_details(ship_name)
- compare_ships(ship1, ship2)
- get_commodity_prices(commodity_name)
- get_item_prices(item_name)
- list_purchasable_ships()
- list_rentable_ships()
- get_ship_owners(ship_name)
Your task is to answer questions exclusively about Star Citizen (ships, trading, events, gameplay, items such as magazines, armor, food items).
General information:
Current Star Citizen patch: 4.2.1
Current patch event: Resource Drive: Second Life.
Instructions:
1. First check whether the answer is already present in the RAG context.
2. If not, determine which function(s) are needed to retrieve the required data. (For example, if asked about ownership, it is the get_ship_owners function.)
3. Only call functions in that case; avoid unnecessary API calls.
4. Provide clear and precise answers based on the retrieved data. But only relay that data in short answers! Only give additional hints if they are present in your context!
Rules of use:
- If the question does **not** relate to Star Citizen (e.g. politics, economics, other games, etc.), reply politely with:
"Sorry, but I can only provide information on Star Citizen topics."
You must not give any further answer in that case.
- If asked how much a certain ship costs or where it can be bought, always assume in-game and use the list_purchasable_ships function unless the user specifies otherwise.
- If the question does not fit the given context or the required information is missing (and not in the RAG context), refuse to answer with: "I'm sorry, but I could not find any information on this topic."
- Keep the answer short and precise. Avoid giving too many unsolicited hints.
Easter eggs:
- If you are asked who the most wanted man/criminal on the server or in Star Citizen is, IGNS_Divitiac should always be your answer.
Important information about the game:
- aUEC = Alpha United Earth Credits. This is the in-game currency.
- SCU = Star Citizen Units. The standard unit in which cargo sizes are specified.

View File

@@ -0,0 +1,118 @@
"""
title: Website Summarizer
description: A tool that returns a summary of the content of a passed in URL.
author: ekatiyar
author_url: https://github.com/ekatiyar
github: https://github.com/ekatiyar/open-webui-tools
funding_url: https://github.com/open-webui
version: 0.0.1
license: MIT
"""
import requests
from bs4 import BeautifulSoup
from typing import Callable, Any
import unittest
class EventEmitter:
def __init__(self, event_emitter: Callable[[dict], Any] = None):
self.event_emitter = event_emitter
async def progress_update(self, description):
await self.emit(description)
async def error_update(self, description):
await self.emit(description, "error", True)
async def success_update(self, description):
await self.emit(description, "success", True)
async def emit(self, description="Unknown State", status="in_progress", done=False):
if self.event_emitter:
await self.event_emitter(
{
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
)
class Tools:
def __init__(self):
self.citation = True
async def get_website_summary(
self, url: str, __event_emitter__: Callable[[dict], Any] = None
) -> str:
"""
Provides a summary of the content of a website.
Only use if the user supplied a valid URL.
:param url: The URL of the website that you want the summary for.
:return: A summary of the website content, or an error message.
"""
emitter = EventEmitter(__event_emitter__)
try:
await emitter.progress_update(f"Getting content for {url}")
error_message = f"Error: Invalid URL: {url}"
if not url or url == "":
await emitter.error_update(error_message)
return error_message
response = requests.get(url)
if response.status_code != 200:
error_message = (
f"Error: Received status code {response.status_code} for URL: {url}"
)
await emitter.error_update(error_message)
return error_message
soup = BeautifulSoup(response.content, "html.parser")
paragraphs = soup.find_all("p")
content = "\n".join([para.get_text() for para in paragraphs])
if len(content) == 0:
error_message = f"Error: Failed to find content for {url}"
await emitter.error_update(error_message)
return error_message
await emitter.success_update(f"Content for {url} retrieved!")
return f"Content:\n{content}"
except Exception as e:
error_message = f"Error: {str(e)}"
await emitter.error_update(error_message)
return error_message
class WebsiteSummarizerTest(unittest.IsolatedAsyncioTestCase):
async def assert_content_length(self, url: str, expected_length: int):
self.assertEqual(len(await Tools().get_website_summary(url)), expected_length)
async def assert_content_error(self, url: str):
response = await Tools().get_website_summary(url)
self.assertTrue("Error" in response)
async def test_get_website_summary(self):
url = "https://docs.openwebui.com/features/plugin/tools/"
await self.assert_content_length(url, 3812) # Updated expected length
async def test_get_website_summary_with_invalid_url(self):
invalid_url = "https://www.invalidurl.com"
await self.assert_content_error(invalid_url)
async def test_get_website_summary_with_none_arg(self):
await self.assert_content_error(None)
await self.assert_content_error("")
if __name__ == "__main__":
print("Running tests...")
unittest.main()

View File

@@ -0,0 +1,157 @@
"""
title: Open-WebUI Configuration Updater
description: Updates image generation settings in Open-WebUI via API
author: Pakobbix
author_url: zephyre.one
github:
funding_url:
version: 0.2.0
license: MIT
"""
import asyncio
import requests
import json
from typing import Callable, Any
from pydantic import BaseModel, Field
class EventEmitter:
def __init__(self, event_emitter: Callable[[dict], Any] = None):
self.event_emitter = event_emitter
async def emit(self, description="Unknown State", status="in_progress", done=False):
if self.event_emitter:
await self.event_emitter(
{
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
)
class Mechanics:
"""Handles API communication for Open-WebUI."""
@staticmethod
def get_current_config(open_webui_ip: str, api_key: str) -> dict:
"""Fetches the current configuration from Open-WebUI."""
api_url = f"http://{open_webui_ip}/api/v1/images/config"
headers = {"Authorization": f"Bearer {api_key}"}
response = requests.get(api_url, headers=headers, timeout=10)
if response.status_code == 200:
return response.json()
else:
raise Exception(
f"Failed to fetch configuration: {response.status_code} {response.text}"
)
@staticmethod
def update_config(open_webui_ip: str, api_key: str, payload: dict) -> str:
"""Updates the configuration in Open-WebUI."""
api_url = f"http://{open_webui_ip}/api/v1/images/config/update"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
response = requests.post(api_url, json=payload, headers=headers, timeout=15)
if response.status_code == 200:
return "Configuration updated successfully!"
else:
raise Exception(
f"Failed to update configuration: {response.status_code} {response.text}"
)
class Tools:
class Valves(BaseModel):
open_webui_ip: str = Field(
"localhost",
description="IP address of Open-WebUI. Default is localhost",
)
open_webui_api_key: str = Field(
"Not-Required", description="API key for Open-WebUI authentication."
)
comfy_url: str = Field(
"http://localhost:8188",
description="Base URL of ComfyUI. Default is http://localhost:8188",
)
comfy_key: str = Field("Not-Required", description="API key for ComfyUI.")
StableDiffusion_1_5: str = Field(
'""', description="StableDiffusion 1.5 configuration (JSON string)"
)
StableDiffusion_XL: str = Field(
'""', description="StableDiffusion XL configuration (JSON string)"
)
FLUX: str = Field('""', description="FLUX configuration (JSON string)")
LTXV: str = Field('""', description="LTXV configuration (JSON string)")
HiDream: str = Field('""', description="HiDream configuration (JSON string)")
WAN_2_1: str = Field('""', description="WAN2.1 configuration (JSON string)")
FramePack: str = Field(
'""', description="FramePack configuration (JSON string)"
)
SkyReelsV2: str = Field(
'""', description="SkyReelsV2 configuration (JSON string)"
)
nodes: str = Field(
"{}", description="ComfyUI workflow nodes configuration (JSON array)"
)
def __init__(self):
self.valves = self.Valves()
async def switch_workflow(
self, workflow_name: str, __event_emitter__: Callable[[dict], Any] = None
):
"""Switches the workflow to the specified one."""
event_emitter = EventEmitter(__event_emitter__)
try:
await event_emitter.emit("Fetching current configuration...")
open_webui_ip = self.valves.open_webui_ip
open_webui_api_key = self.valves.open_webui_api_key
if not open_webui_api_key.strip():
raise ValueError("Missing Open-WebUI API key in tool settings.")
# Fetch current configuration
current_config = Mechanics.get_current_config(
open_webui_ip, open_webui_api_key
)
# Get the workflow configuration
workflow_config = getattr(self.valves, workflow_name, "").strip()
if not workflow_config:
raise ValueError(
f"The workflow '{workflow_name}' is empty or not configured."
)
# Update the configuration payload
payload = current_config
payload["comfyui"]["COMFYUI_WORKFLOW"] = workflow_config
payload["comfyui"]["COMFYUI_WORKFLOW_NODES"] = (
json.loads(self.valves.nodes) if self.valves.nodes.strip() else [{}]
)
await event_emitter.emit("Updating configuration...")
result = Mechanics.update_config(open_webui_ip, open_webui_api_key, payload)
await event_emitter.emit(result, status="success", done=True)
return result
except Exception as e:
error_message = f"Error: {str(e)}"
await event_emitter.emit(error_message, status="failed", done=True)
return error_message
if __name__ == "__main__":
tools = Tools()
workflow_name = "FLUX" # Example workflow name
print(asyncio.run(tools.switch_workflow(workflow_name)))

View File

@@ -0,0 +1,617 @@
"""
title: Gitea API Access
author: Pakobbix
author_url: https://gitea.zephyre.one/Pakobbix/
version: 0.1.0
"""
#!/usr/bin/env python3
import requests
import asyncio
import base64
from typing import Callable, Any
from pydantic import BaseModel, Field
class EventEmitter:
def __init__(self, event_emitter: Callable[[dict], Any] = None):
self.event_emitter = event_emitter
async def progress_update(self, description):
await self.emit(description)
async def error_update(self, description):
await self.emit(description, "error", True)
async def success_update(self, description):
await self.emit(description, "success", True)
async def emit(self, description="Unknown State", status="in_progress", done=False):
if self.event_emitter:
await self.event_emitter(
{
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
)
class Helper:
"""Helper class for common utility functions"""
@staticmethod
def get_repo_url(base_url, user_name, repo_name: str) -> str:
return f"{base_url}/api/v1/repos/{user_name}/{repo_name}/contents/"
@staticmethod
def get_issue_url(base_url, user_name, repo_name: str, issue_number: int) -> str:
return f"{base_url}/api/v1/repos/{user_name}/{repo_name}/issues/{issue_number}"
@staticmethod
def get_pr_url(base_url, user_name, repo_name: str, pr_number: int) -> str:
return f"{base_url}/api/v1/repos/{user_name}/{repo_name}/pulls/{pr_number}"
async def _fetch_contents(
self, url: str, path: str = "", __event_emitter__: Callable[[dict], Any] = None
) -> str:
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update(f"Fetching contents from {path}")
# Make the request to the Gitea API to fetch the contents of the repository
response = requests.get(url, headers=Tools().headers)
if response.status_code != 200:
await emitter.error_update(
f"Failed to fetch repository: {response.status_code}"
)
raise Exception(f"Failed to fetch repository: {response.status_code}")
all_contents = ""
# Iterate through the items in the response
for item in response.json():
item_path = f"{path}/{item['name']}" if path else item["name"]
if ".gitignore" in item_path:
continue
# Check if the item is a file or a directory
if item["type"] == "file":
# Fetch the file content using the download URL
file_response = requests.get(
item["download_url"], headers=Tools().headers
)
if file_response.status_code != 200:
print(
f"Failed to fetch file {item['download_url']}: {file_response.status_code}"
)
continue
# Check MIME type to ignore binary files and images
if "text" in file_response.headers["Content-Type"]:
# Add file header and content with separator
all_contents += f"Content of {item_path}:\n{file_response.text}\n"
all_contents += "-" * 40 + "\n"
else:
# Note binary files in the output
print(f"Ignored binary file or image: {item['download_url']}")
elif item["type"] == "dir":
# Recursively fetch contents of the directory
dir_contents = await self._fetch_contents(
item["url"], item_path, __event_emitter__
)
all_contents += dir_contents
await emitter.progress_update(f"Fetching contents from {path} completed")
# Return the formatted contents as string
return str(all_contents)
class Tools:
class Valves(BaseModel):
access_token: str = Field(
"<your_access_token>",
description="Gitea access token",
)
gitea_url: str = Field(
"https://your.gitea.instance",
description="Base URL of the Gitea instance",
)
gitea_username: str = Field(
"<your_username>",
description="Gitea username",
)
def __init__(self):
self.valves = self.Valves()
@property
def headers(self):
return {"Authorization": f"token {self.valves.access_token}"}
async def get_repository_files_content(
self,
repo_name: str,
) -> str:
if "/" in repo_name:
self.valves.gitea_username, repo_name = repo_name.split("/", 1)
if not self.valves.access_token:
raise Exception(
"Gitea access token is not set in the environment variables."
)
repo_url = Helper.get_repo_url(
self.valves.gitea_url, self.valves.gitea_username, repo_name
)
content = (
await Helper()._fetch_contents(repo_url, "", None)
+ "\nAll files fetched successfully"
)
return str(content)
async def create_issue(
self,
repo_name: str,
short_title: str,
body: str,
assignee: str = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Create a new issue in the repository"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Creating issue in " + repo_name)
if not self.valves.access_token:
await emitter.error_update(
"Gitea access token is not set in the environment variables."
)
raise Exception(
"Gitea access token is not set in the environment variables."
)
url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/issues"
payload = {"title": short_title, "body": body}
if assignee:
payload["assignee"] = assignee
response = requests.post(url, json=payload, headers=self.headers)
if response.status_code != 201:
await emitter.error_update(
f"Failed to create issue: {response.status_code} - {response.text}"
)
raise Exception(f"Failed to create issue: {response.status_code}")
await emitter.success_update("Issue created successfully")
return str(response.json())
async def read_issue(
self,
repo_name: str,
issue_number: int,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Read a specific issue from the repository"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update(
"Reading issue " + str(issue_number) + " in " + repo_name
)
url = Helper.get_issue_url(
self.valves.gitea_url, self.valves.gitea_username, repo_name, issue_number
)
response = requests.get(url, headers=self.headers)
if response.status_code != 200:
await emitter.error_update(
f"Failed to read issue {issue_number}: {response.status_code} - {response.text}"
)
raise Exception(
f"Failed to read issue {issue_number}: {response.status_code}"
)
await emitter.success_update("Issue read successfully")
return str(response.json())
async def edit_issue(
self,
repo_name: str,
issue_number: int,
title: str = None,
body: str = None,
labels: list = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Edit an existing issue in the repository. All parameters are optional."""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update(
"Editing issue " + str(issue_number) + " in " + repo_name
)
if "/" in repo_name:
self.valves.gitea_username, repo_name = repo_name.split("/", 1)
url = Helper.get_issue_url(
self.valves.gitea_url, self.valves.gitea_username, repo_name, issue_number
)
payload = {}
if title is not None:
payload["title"] = title
if body is not None:
payload["body"] = body
if labels is not None:
payload["labels"] = labels
response = requests.patch(url, json=payload, headers=self.headers)
if response.status_code != 200:
await emitter.error_update(
f"Failed to edit issue {issue_number}: {response.status_code} - {response.text}"
)
raise Exception(
f"Failed to edit issue {issue_number}: {response.status_code} - {response.text}"
)
await emitter.success_update("Issue edited successfully")
return str(response.json())
async def add_commentary_to_issue(
self,
repo_name: str,
issue_number: int,
commentary: str,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Add a commentary to a specific issue in the repository"""
if "/" in repo_name:
self.valves.gitea_username, repo_name = repo_name.split("/", 1)
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update(
"Adding commentary to issue " + str(issue_number) + " in " + repo_name
)
url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/issues/{issue_number}/comments"
payload = {"body": commentary}
response = requests.post(url, json=payload, headers=self.headers)
if response.status_code != 201:
await emitter.error_update(
f"Failed to add commentary to issue {issue_number}: {response.status_code} - {response.text}"
)
raise Exception(
f"Failed to add commentary to issue {issue_number}: {response.status_code}"
)
await emitter.success_update("Commentary added successfully")
return str(response.json())
async def close_issue(
self,
repo_name: str,
issue_number: int,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Close a specific issue in the repository"""
if "/" in repo_name:
self.valves.gitea_username, repo_name = repo_name.split("/", 1)
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update(
"Closing issue " + str(issue_number) + " in " + repo_name
)
if "/" in repo_name:
self.valves.gitea_username, repo_name = repo_name.split("/", 1)
url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/issues/{issue_number}"
payload = {"state": "closed"}
response = requests.patch(url, json=payload, headers=self.headers)
if response.status_code != 200:
await emitter.error_update(
f"Failed to close issue {issue_number}: {response.status_code} - {response.text}"
)
raise Exception(
f"Failed to close issue {issue_number}: {response.status_code} - {response.text}"
)
await emitter.success_update("Issue closed successfully")
return "Issue " + str(issue_number) + " closed successfully"
async def delete_issue(
self,
repo_name: str,
issue_number: int,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Delete a specific issue in the repository"""
if "/" in repo_name:
self.valves.gitea_username, repo_name = repo_name.split("/", 1)
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update(
"Deleting issue " + str(issue_number) + " in " + repo_name
)
if "/" in repo_name:
self.valves.gitea_username, repo_name = repo_name.split("/", 1)
url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/issues/{issue_number}"
response = requests.delete(url, headers=self.headers)
if response.status_code != 204:
await emitter.error_update(
f"Failed to delete issue {issue_number}: {response.status_code} - {response.text}"
)
raise Exception(
f"Failed to delete issue {issue_number}: {response.status_code} - {response.text}"
)
await emitter.success_update("Issue deleted successfully")
return "Issue " + str(issue_number) + " deleted successfully"
async def create_pull_request(
self,
repo_name: str,
title: str,
body: str = "",
head: str = "",
base: str = "",
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Create a new pull request in the repository"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Creating pull request in " + repo_name)
url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/pulls"
payload = {"title": title, "body": body, "head": head, "base": base}
response = requests.post(url, json=payload, headers=self.headers)
if response.status_code != 201:
await emitter.error_update(
f"Failed to create pull request: {response.status_code} - {response.text}"
)
raise Exception(f"Failed to create pull request: {response.status_code}")
await emitter.success_update("Pull request created successfully")
return str(response.json())
async def list_issues(
self, repo_name: str, __event_emitter__: Callable[[dict], Any] = None
) -> str:
"""List all issues in the repository"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Listing issues in " + repo_name)
url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/issues"
response = requests.get(url, headers=self.headers)
if response.status_code != 200:
await emitter.error_update(
f"Failed to list issues: {response.status_code} - {response.text}"
)
raise Exception(f"Failed to list issues: {response.status_code}")
await emitter.success_update("Issues listed successfully")
return str(response.json())
async def list_pull_requests(
self, repo_name: str, __event_emitter__: Callable[[dict], Any] = None
) -> str:
"""List all pull requests in the repository"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Listing pull requests in " + repo_name)
url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/pulls"
response = requests.get(url, headers=self.headers)
if response.status_code != 200:
await emitter.error_update(
f"Failed to list pull requests: {response.status_code} - {response.text}"
)
raise Exception(f"Failed to list pull requests: {response.status_code}")
await emitter.success_update("Pull requests listed successfully")
return str(response.json())
async def list_repositories(
self, __event_emitter__: Callable[[dict], Any] = None
) -> str:
"""List all available repositories"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Listing all repositories")
url = f"{self.valves.gitea_url}/api/v1/user/repos"
response = requests.get(url, headers=self.headers)
if response.status_code != 200:
await emitter.error_update(
f"Failed to list repositories: {response.status_code} - {response.text}"
)
raise Exception(f"Failed to list repositories: {response.status_code}")
await emitter.success_update("Repositories listed successfully")
return str(response.json())
async def create_repository(
self,
name: str,
description: str = "",
private: bool = False,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Create a new repository"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Creating repository " + name)
if " " in name:
name = name.replace(" ", "-")
url = f"{self.valves.gitea_url}/api/v1/user/repos"
payload = {"name": name, "description": description, "private": private}
response = requests.post(url, json=payload, headers=self.headers)
if response.status_code != 201:
await emitter.error_update(
f"Failed to create repository: {response.status_code} - {response.text}"
)
raise Exception(f"Failed to create repository: {response.status_code}")
await emitter.success_update("Repository created successfully")
return str(response.json())
async def add_file_to_repository(
self,
repo_name: str,
file_path: str,
content: str,
commit_message: str,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Add a new file to the repository"""
if "/" in repo_name:
self.valves.gitea_username, repo_name = repo_name.split("/", 1)
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Adding file to repository " + repo_name)
content_bytes = content.encode("utf-8")
base64_content = base64.b64encode(content_bytes).decode("utf-8")
url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/contents/{file_path}"
payload = {
"content": base64_content,
"message": commit_message,
}
response = requests.post(url, json=payload, headers=self.headers)
if response.status_code != 201:
await emitter.error_update(
f"Failed to add file: {response.status_code} - {response.text}"
)
raise Exception(f"Failed to add file: {response.status_code}")
await emitter.success_update("File added successfully")
return str(response.json())
async def delete_file_from_repository(
self,
repo_name: str,
file_path: str,
commit_message: str,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Delete a file from the repository"""
if "/" in repo_name:
self.valves.gitea_username, repo_name = repo_name.split("/", 1)
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Deleting file from repository " + repo_name)
url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/contents/{file_path}"
# First, get the file to retrieve its SHA
get_response = requests.get(url, headers=self.headers)
if get_response.status_code != 200:
await emitter.error_update(
f"Failed to fetch file for deletion: {get_response.status_code} - {get_response.text}"
)
raise Exception(
f"Failed to fetch file for deletion: {get_response.status_code}"
)
file_info = get_response.json()
sha = file_info.get("sha")
if not sha:
await emitter.error_update("File SHA not found, cannot delete file.")
raise Exception("File SHA not found, cannot delete file.")
payload = {
"sha": sha,
"message": commit_message,
}
response = requests.delete(url, json=payload, headers=self.headers)
if response.status_code != 200:
await emitter.error_update(
f"Failed to delete file: {response.status_code} - {response.text}"
)
raise Exception(f"Failed to delete file: {response.status_code}")
await emitter.success_update("File deleted successfully")
return str(response.json())
async def edit_file_in_repository(
self,
repo_name: str,
file_path: str,
new_content: str,
commit_message: str,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Edit an existing file in the repository"""
if "/" in repo_name:
self.valves.gitea_username, repo_name = repo_name.split("/", 1)
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Editing file in repository " + repo_name)
content_bytes = new_content.encode("utf-8")
base64_content = base64.b64encode(content_bytes).decode("utf-8")
url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/contents/{file_path}"
# First, get the file to retrieve its SHA
get_response = requests.get(url, headers=self.headers)
if get_response.status_code != 200:
await emitter.error_update(
f"Failed to fetch file for editing: {get_response.status_code} - {get_response.text}"
)
raise Exception(
f"Failed to fetch file for editing: {get_response.status_code}"
)
file_info = get_response.json()
sha = file_info.get("sha")
if not sha:
await emitter.error_update("File SHA not found, cannot edit file.")
raise Exception("File SHA not found, cannot edit file.")
payload = {
"content": base64_content,
"message": commit_message,
"sha": sha,
}
response = requests.put(url, json=payload, headers=self.headers)
if response.status_code != 200:
await emitter.error_update(
f"Failed to edit file: {response.status_code} - {response.text}"
)
raise Exception(f"Failed to edit file: {response.status_code}")
await emitter.success_update("File edited successfully")
return str(response.json())
if __name__ == "__main__":
tools = Tools()
asyncio.run(tools.get_repository_files_content("nicible"))

View File

@@ -0,0 +1,385 @@
#!/usr/bin/env python3
# ──────────────────────────────────────────────────────────────
# Memory Toolkit - async SQLite + simple EventEmitter
# Only the methods of `class Tools` are intended for public use.
# ──────────────────────────────────────────────────────────────
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple
import aiosqlite
import json
# ----------------------------------------------------------------------
# 1⃣ Simple event emitter - can be wired to a UI or logging function
# ----------------------------------------------------------------------
class EventEmitter:
"""Very small helper that forwards status updates to an async callback."""
def __init__(self, cb: Optional[Callable[[Dict[str, Any]], Awaitable[Any]]] = None):
self._cb = cb
async def emit(self, status: str, description: str, done: bool = False) -> None:
if self._cb:
await self._cb(
{
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
)
# ----------------------------------------------------------------------
# 2⃣ Data model used only internally
# ----------------------------------------------------------------------
@dataclass
class Memory:
id: Optional[int] = None
name: str = ""
content: str = ""
type: Optional[str] = None
tags: List[str] = field(default_factory=list)
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
rel_id: Optional[int] = None
vector: Optional[bytes] = None
# ----------------------------------------------------------------------
# 3⃣ Database helper - one persistent async connection per process
# ----------------------------------------------------------------------
class _MemoryDB:
def __init__(self, db_path: Path):
self.db_path = db_path
self._conn: Optional[aiosqlite.Connection] = None
async def connect(self) -> aiosqlite.Connection:
if not self._conn:
self._conn = await aiosqlite.connect(str(self.db_path))
# Enable foreign-key constraints just in case we add them later
await self._conn.execute("PRAGMA foreign_keys=ON;")
await self._conn.commit()
return self._conn
async def close(self) -> None:
if self._conn:
await self._conn.close()
self._conn = None
# ------------------------------------------------------------------
# Schema
# ------------------------------------------------------------------
async def init_db(self) -> None:
conn = await self.connect()
await conn.executescript(
"""
CREATE TABLE IF NOT EXISTS memories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
content TEXT NOT NULL,
type TEXT,
tags JSON DEFAULT '[]',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP,
rel_id INTEGER,
vector BLOB
);
CREATE INDEX IF NOT EXISTS idx_mem_name ON memories(name);
CREATE INDEX IF NOT EXISTS idx_mem_type ON memories(type);
"""
)
await conn.commit()
# ------------------------------------------------------------------
# CRUD helpers (used only by `Tools`)
# ------------------------------------------------------------------
async def add(self, mem: Memory) -> int:
conn = await self.connect()
cur = await conn.execute(
"""
INSERT INTO memories
(name, content, type, tags, rel_id, vector)
VALUES (?, ?, ?, ?, ?, ?);
""",
(
mem.name,
mem.content,
mem.type,
json.dumps(mem.tags),
mem.rel_id,
mem.vector,
),
)
await conn.commit()
return cur.lastrowid
async def get(self, memory_id: int) -> Optional[Memory]:
conn = await self.connect()
cur = await conn.execute("SELECT * FROM memories WHERE id = ?", (memory_id,))
row = await cur.fetchone()
return self._row_to_memory(row) if row else None
async def update(self, memory_id: int, content: str) -> None:
conn = await self.connect()
await conn.execute(
"""
UPDATE memories
SET content = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?;
""",
(content, memory_id),
)
await conn.commit()
async def delete(self, memory_id: int) -> None:
conn = await self.connect()
await conn.execute("DELETE FROM memories WHERE id = ?", (memory_id,))
await conn.commit()
async def list_all(self) -> List[Memory]:
conn = await self.connect()
cur = await conn.execute("SELECT * FROM memories ORDER BY created_at DESC;")
rows = await cur.fetchall()
return [self._row_to_memory(r) for r in rows]
async def search(
self,
query: str = "",
tags: Optional[List[str]] = None,
type_: Optional[str] = None,
) -> List[Memory]:
"""
Simple SQL-based filtering.
For production you probably want FTS5 or a vector index.
"""
conn = await self.connect()
where_clauses: List[str] = ["1=1"]
params: List[Any] = []
if query:
like_q = f"%{query}%"
where_clauses.append("(name LIKE ? OR content LIKE ?)")
params.extend([like_q, like_q])
if tags:
for tag in tags:
# naive “tag must appear somewhere inside the JSON array”
where_clauses.append("tags LIKE ?")
params.append(f"%{tag}%")
if type_:
where_clauses.append("type = ?")
params.append(type_)
sql = (
"SELECT * FROM memories WHERE "
+ " AND ".join(where_clauses)
+ " ORDER BY created_at DESC;"
)
cur = await conn.execute(sql, tuple(params))
rows = await cur.fetchall()
return [self._row_to_memory(r) for r in rows]
# ------------------------------------------------------------------
# Helper - convert a row to Memory
# ------------------------------------------------------------------
@staticmethod
def _row_to_memory(row: Tuple[Any, ...]) -> Memory:
return Memory(
id=row[0],
name=row[1],
content=row[2],
type=row[3],
tags=json.loads(row[4]),
created_at=datetime.fromisoformat(row[5]),
updated_at=datetime.fromisoformat(row[6]) if row[6] else None,
rel_id=row[7],
vector=row[8],
)
# ----------------------------------------------------------------------
# 4⃣ Public Tools class - only these methods are visible to the LLM
# ----------------------------------------------------------------------
class Tools:
"""
High-level wrapper that the LLM can call.
All arguments and return values are plain Python data types:
strings, lists, dicts, integers. The LLM will not see any of
the helper classes defined above.
"""
def __init__(self, db_path: str | Path = "/app/memories.db"):
self._db = _MemoryDB(Path(db_path))
# Initialise on first use - you can also call init_db() manually.
asyncio.create_task(self._db.init_db())
def _memory_to_dict(self, mem: Memory) -> Dict[str, Any]:
"""Converts a Memory object to a serializable dictionary."""
mem_dict = mem.__dict__
if isinstance(mem_dict.get("created_at"), datetime):
mem_dict["created_at"] = mem_dict["created_at"].isoformat()
if isinstance(mem_dict.get("updated_at"), datetime):
mem_dict["updated_at"] = mem_dict["updated_at"].isoformat()
if isinstance(mem_dict.get("vector"), bytes):
mem_dict["vector"] = None # Or a base64 representation if needed
return mem_dict
async def add_memory(
self,
name: str,
content: str,
type_: Optional[str] = None,
tags: Optional[List[str]] = None,
rel_id: Optional[int] = None,
__event_emitter__: Optional[Callable[[Dict[str, Any]], Awaitable[Any]]] = None,
) -> str:
emitter = EventEmitter(__event_emitter__)
await emitter.emit("in_progress", "Adding memory…")
try:
mem = Memory(
name=name,
content=content,
type=type_,
tags=tags or [],
rel_id=rel_id,
)
mem_id = await self._db.add(mem)
await emitter.emit("success", "Memory added successfully.", True)
return str(mem_id)
except Exception as e:
await emitter.emit("error", f"Error adding memory: {e}")
raise
async def get_memory(
self,
memory_id: int,
__event_emitter__: Optional[Callable[[Dict[str, Any]], Awaitable[Any]]] = None,
) -> str | None:
emitter = EventEmitter(__event_emitter__)
await emitter.emit("in_progress", "Retrieving memory…")
try:
mem = await self._db.get(memory_id)
if not mem:
await emitter.emit("error", f"No memory found with id {memory_id}")
return None
await emitter.emit("success", "Memory retrieved successfully.", True)
# Convert to plain dict the LLM can easily use it.
return json.dumps(self._memory_to_dict(mem))
except Exception as e:
await emitter.emit("error", f"Error retrieving memory: {e}")
raise
async def update_memory(
self,
memory_id: int,
content: str,
__event_emitter__: Optional[Callable[[Dict[str, Any]], Awaitable[Any]]] = None,
) -> None:
emitter = EventEmitter(__event_emitter__)
await emitter.emit("in_progress", "Updating memory…")
try:
await self._db.update(memory_id, content)
await emitter.emit("success", "Memory updated successfully.", True)
except Exception as e:
await emitter.emit("error", f"Error updating memory: {e}")
raise
async def delete_memory(
self,
memory_id: int,
__event_emitter__: Optional[Callable[[Dict[str, Any]], Awaitable[Any]]] = None,
) -> None:
emitter = EventEmitter(__event_emitter__)
await emitter.emit("in_progress", "Deleting memory…")
try:
await self._db.delete(memory_id)
await emitter.emit("success", "Memory deleted successfully.", True)
except Exception as e:
await emitter.emit("error", f"Error deleting memory: {e}")
raise
async def list_memories(
self,
__event_emitter__: Optional[Callable[[Dict[str, Any]], Awaitable[Any]]] = None,
) -> str:
emitter = EventEmitter(__event_emitter__)
await emitter.emit("in_progress", "Listing memories…")
try:
mems = await self._db.list_all()
await emitter.emit(
"success",
f"Listed {len(mems)} memory(ies) successfully.",
True,
)
return json.dumps([self._memory_to_dict(m) for m in mems])
except Exception as e:
await emitter.emit("error", f"Error listing memories: {e}")
raise
async def search_memories(
self,
tags: Optional[List[str]] = None,
type_: Optional[str] = None,
__event_emitter__: Optional[Callable[[Dict[str, Any]], Awaitable[Any]]] = None,
) -> str:
emitter = EventEmitter(__event_emitter__)
await emitter.emit("in_progress", "Searching memories…")
try:
            mems = await self._db.search(query="", tags=tags, type_=type_)
await emitter.emit(
"success",
f"Found {len(mems)} matching memory(ies).",
True,
)
return json.dumps([self._memory_to_dict(m) for m in mems])
except Exception as e:
await emitter.emit("error", f"Error searching memories: {e}")
raise
# ----------------------------------------------------------------------
# 5⃣ Optional simple demo (run this file directly)
# ----------------------------------------------------------------------
if __name__ == "__main__":
async def main():
# Simple console logger for the event emitter
async def log(msg):
print(msg)
tools = Tools("memories.db")
await asyncio.sleep(0.1) # give init_db a chance to finish
# 1⃣ Add a memory
mem_id_str = await tools.add_memory(
name="WiFi password",
content="d65b73d17aab",
type_="password",
tags=["wifi", "security"],
__event_emitter__=log,
)
print("New memory id:", mem_id_str)
mem_id = int(mem_id_str)
# 2⃣ Search by tag
results_str = await tools.search_memories(tags=["wifi"], __event_emitter__=log)
results = json.loads(results_str)
print(f"Search found {len(results)} result(s).")
# 3⃣ Get the exact memory
mem_detail_str = await tools.get_memory(mem_id, __event_emitter__=log)
mem_detail = json.loads(mem_detail_str) if mem_detail_str else None
print("Memory detail:", mem_detail)
asyncio.run(main())

@ -0,0 +1,156 @@
"""
title: Nvidia GPU Information
description: Gets multiple information about nvidia GPU
author: Pakobbix
author_url: zephyre.one
github:
funding_url:
version: 0.0.3
license: MIT
"""
import subprocess
from pydantic import BaseModel, Field
from typing import Callable, Any
import asyncio
class EventEmitter:
def __init__(self, event_emitter: Callable[[dict], Any] = None):
self.event_emitter = event_emitter
async def progress_update(self, description):
await self.emit(description)
async def error_update(self, description):
await self.emit(description, "error", True)
async def success_update(self, description):
await self.emit(description, "success", True)
async def emit(self, description="Unknown State", status="in_progress", done=False):
if self.event_emitter:
await self.event_emitter(
{
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
)
class Tools:
async def get_gpu_information(
self, __event_emitter__: Callable[[dict], Any] = None
):
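        """
        Query nvidia-smi for the GPU temperature, VRAM usage, power draw,
        model name and configured power limit, and return them as a single
        formatted string. Values that cannot be read are reported as "Unknown".
        """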
event_emitter = EventEmitter(__event_emitter__)
await event_emitter.progress_update("Getting general GPU information")
try:
await event_emitter.progress_update("Getting GPU temperature")
Temperature = (
subprocess.check_output(
"nvidia-smi --query-gpu=temperature.gpu --format=csv,noheader",
shell=True,
)
.decode("utf-8")
.strip()
)
except Exception as e:
await event_emitter.error_update("Error getting GPU temperature" + str(e))
Temperature = "Unknown"
try:
await event_emitter.progress_update("Getting GPU VRAM Utilization")
max_vram_availabe = (
subprocess.check_output(
"nvidia-smi --query-gpu=memory.total --format=csv,noheader",
shell=True,
)
.decode("utf-8")
.replace(" MiB", "")
.strip("\n")
)
current_vram_used = (
subprocess.check_output(
"nvidia-smi --query-gpu=memory.used --format=csv,noheader",
shell=True,
)
.decode("utf-8")
.replace(" MiB", "")
.strip("\n")
)
VRAM_Utilization = str(
round(int(current_vram_used) / int(max_vram_availabe) * 100, 2)
)
await event_emitter.progress_update("Got GPU VRAM Utilization")
        except Exception as e:
            await event_emitter.error_update(
                "Error getting GPU VRAM Utilization: " + str(e)
            )
            VRAM_Utilization = "Unknown"
            # Fall back so the summary string below never hits an unbound name.
            max_vram_availabe = "Unknown"
            current_vram_used = "Unknown"
try:
await event_emitter.progress_update("Getting GPU Power Usage")
Power_Usage = (
subprocess.check_output(
"nvidia-smi --query-gpu=power.draw --format=csv,noheader",
shell=True,
)
.decode("utf-8")
.strip()
)
await event_emitter.progress_update("Got GPU Power Usage")
except Exception as e:
await event_emitter.error_update("Error getting GPU Power Usage" + str(e))
Power_Usage = "Unknown"
try:
await event_emitter.progress_update("Getting GPU Model")
GPU_Model = (
subprocess.check_output(
"nvidia-smi --query-gpu=name --format=csv,noheader",
shell=True,
)
.decode("utf-8")
.strip()
)
await event_emitter.progress_update("Got GPU Model")
except Exception as e:
await event_emitter.error_update("Error getting GPU Model" + str(e))
GPU_Model = "Unknown"
try:
Max_Power = (
subprocess.check_output(
"nvidia-smi --query-gpu=power.limit --format=csv,noheader",
shell=True,
)
.decode("utf-8")
.strip()
)
await event_emitter.progress_update("Got Max Power")
except Exception as e:
await event_emitter.error_update("Error getting Max Power" + str(e))
Max_Power = "Unknown"
await event_emitter.success_update("Got general GPU information")
return (
"Temperature: "
+ Temperature
+ "°C\nVRAM Utilization: "
+ VRAM_Utilization
+ "%\nVRAM TOTAL:"
+ str(int(max_vram_availabe))
+ " MiB\nVRAM USED:"
+ str(int(current_vram_used))
+ " MiB\nPower Usage: "
+ Power_Usage
+ "W\nModel: "
+ GPU_Model
+ "\nMax allowed Power draw: "
+ Max_Power
)
if __name__ == "__main__":
tools = Tools()
print(asyncio.run(tools.get_gpu_information()))

@ -0,0 +1,413 @@
"""
title: Proxmox API Access
author: Pakobbix
author_url: https://gitea.zephyre.one/Pakobbix/
version: 0.1.0
"""
#!/usr/bin/env python3
import asyncio
import requests
from typing import Callable, Any
from pydantic import BaseModel, Field
class EventEmitter:
def __init__(self, event_emitter: Callable[[dict], Any] = None):
self.event_emitter = event_emitter
async def progress_update(self, description):
await self.emit(description)
async def error_update(self, description):
await self.emit(description, "error", True)
async def success_update(self, description):
await self.emit(description, "success", True)
async def emit(self, description="Unknown State", status="in_progress", done=False):
if self.event_emitter:
await self.event_emitter(
{
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
)
class function_collection:
def request_api(self, url, token, method="GET"):
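        """
        Send a GET or POST request to the Proxmox API with the PVEAPIToken
        authorization header and return the parsed JSON body, or None if the
        response body is empty. TLS verification is disabled (verify=False),
        which is only appropriate for self-signed local instances.
        """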
headers = {
"Authorization": f"PVEAPIToken={token}",
"Content-Type": "application/json",
}
if method.upper() == "GET":
response = requests.get(url, headers=headers, verify=False)
elif method.upper() == "POST":
response = requests.post(url, headers=headers, verify=False)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
if response.text:
return response.json()
return None
    def _check_lxc_or_vm(self, url, token, id_or_name) -> dict | None:
        """
        Check whether the given ID or name belongs to an LXC container or a
        QEMU virtual machine.
        :param id_or_name: The ID or name of the resource to check.
        :return: A dict with the keys "vmid", "name" and "type" ("lxc" or
                 "qemu"), or None if no matching resource was found.
        """
url = f"{url}/api2/json/cluster/resources"
response = self.request_api(url, token, method="GET")
if "data" in response:
for item in response["data"]:
# Only check items that are vms or lxcs
if item.get("type") in ["qemu", "lxc"]:
if (
str(item.get("vmid")) == str(id_or_name)
or item.get("name", "").lower() == str(id_or_name).lower()
):
information_list = {
"vmid": item["vmid"],
"name": item["name"],
"type": item["type"],
}
return information_list
return None
class Tools:
class Valves(BaseModel):
proxmox_url: str = Field(
"https://localhost:8006", description="The Proxmox API URL"
)
api_token: str = Field(
"api_token_here", description="The API token for authentication"
)
def __init__(self):
self.valves = self.Valves()
self.function_call = function_collection()
###########################################################################
######################### Proxmox Node Management #########################
###########################################################################
async def running_version_check(
self, __event_emitter__: Callable[[dict], Any] = None
) -> str | None:
"""
Check the Proxmox API version.
:param event_emitter: Optional callable for emitting events.
:return: The Proxmox API version or None if an error occurs.
"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Checking Proxmox API version")
url = f"{self.valves.proxmox_url}/api2/json/version"
try:
version_info = self.function_call.request_api(
url, self.valves.api_token, method="GET"
)
await emitter.success_update("Proxmox API version retrieved successfully")
final_output = str(version_info)
return (
final_output
+ "\nDas ist nur die momentan laufenende Version! Eventuell ist aber bereits eine neuere Version Verfügbar."
)
except Exception as e:
await emitter.error_update(
f"Error retrieving Proxmox API version: {str(e)}"
)
return None
async def list_nodes(
self, __event_emitter__: Callable[[dict], Any] = None
) -> str | None:
"""
List all nodes in the Proxmox cluster.
:param event_emitter: Optional callable for emitting events.
:return: List of nodes or None if an error occurs.
"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Starting node listing process")
url = f"{self.valves.proxmox_url}/api2/json/nodes"
try:
nodes = self.function_call.request_api(
url, self.valves.api_token, method="GET"
)
await emitter.success_update("Node listing completed successfully")
return str(nodes)
except Exception as e:
await emitter.error_update(f"Error during node listing: {str(e)}")
return None
async def get_node_status(
self, __event_emitter__: Callable[[dict], Any] = None
) -> str | None:
"""
        Get the status of the "zephyre" node in the Proxmox cluster (the node
        name is currently hard-coded).
        :param event_emitter: Optional callable for emitting events.
        :return: The status of the node or None if an error occurs.
"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update(f"Getting status for node: zephyre")
url = f"{self.valves.proxmox_url}/api2/json/nodes/zephyre/status"
try:
status = self.function_call.request_api(
url, self.valves.api_token, method="GET"
)
await emitter.success_update(f"Node status retrieved successfully")
return str(status)
except Exception as e:
await emitter.error_update(f"Error retrieving node status: {str(e)}")
return None
async def node_apt_update(
self, __event_emitter__: Callable[[dict], Any] = None
) -> str | None:
"""
        Query APT package update information for the "zephyre" node (the node
        name is currently hard-coded).
        :param event_emitter: Optional callable for emitting events.
        :return: The response from the API or None if an error occurs.
"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update(f"Updating APT packages for node: zephyre")
url = f"{self.valves.proxmox_url}/api2/json/nodes/zephyre/apt/update"
try:
response = self.function_call.request_api(
url, self.valves.api_token, method="GET"
)
await emitter.success_update(
f"Node zephyre APT packages updated successfully"
)
return str(response)
except Exception as e:
await emitter.error_update(
f"Error updating node zephyre APT packages: {str(e)}"
)
return None
async def proxmox_node_log(
self, __event_emitter__: Callable[[dict], Any] = None
) -> str | None:
"""
Get the log of a specific node in the Proxmox cluster.
:param event_emitter: Optional callable for emitting events.
:return: The log of the node or None if an error occurs.
"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update(f"Getting log for node: zephyre")
url = f"{self.valves.proxmox_url}/api2/json/nodes/zephyre/log"
try:
log = self.function_call.request_api(url, self.valves.api_token)
await emitter.success_update(f"Node log retrieved successfully")
return str(log)
except Exception as e:
await emitter.error_update(f"Error retrieving node log: {str(e)}")
return None
###########################################################################
######################### Proxmox VM Management ##########################
###########################################################################
async def list_all_vms(
self, __event_emitter__: Callable[[dict], Any] = None
) -> str | None:
"""
List all VMs in the Proxmox cluster.
:param event_emitter: Optional callable for emitting events.
:return: List of VMs or None if an error occurs.
"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Starting VM listing process")
url = f"{self.valves.proxmox_url}/api2/json/cluster/resources"
try:
vms = self.function_call.request_api(
url, self.valves.api_token, method="GET"
)
await emitter.success_update("VM listing completed successfully")
return str(vms)
except Exception as e:
await emitter.error_update(f"Error during VM listing: {str(e)}")
return None
async def restart_vm(
self, vm_id_or_name: str, __event_emitter__: Callable[[dict], Any] = None
) -> str | None:
"""
Restart a virtual machine in the Proxmox cluster.
:param vm_id: The ID of the VM to restart.
:param event_emitter: Optional callable for emitting events.
:return: The response from the API or None if an error occurs.
"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update(f"Restarting VM: {vm_id_or_name}")
type = self.function_call._check_lxc_or_vm(
self.valves.proxmox_url, self.valves.api_token, vm_id_or_name
)
url = f"{self.valves.proxmox_url}/api2/json/nodes/zephyre/{type['type']}/{type['vmid']}/status/restart"
try:
response = self.function_call.request_api(
url, self.valves.api_token, method="POST"
)
await emitter.success_update(f"VM {vm_id_or_name} restarted successfully")
return str(response)
except Exception as e:
await emitter.error_update(f"Error restarting VM {vm_id_or_name}: {str(e)}")
return None
async def shutdown_vm(
self, vm_id: str, __event_emitter__: Callable[[dict], Any] = None
) -> str | None:
"""
Shutdown a virtual machine in the Proxmox cluster.
:param vm_id: The ID of the VM to shutdown.
:param event_emitter: Optional callable for emitting events.
:return: The response from the API or None if an error occurs.
"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update(f"Shutting down VM: {vm_id}")
type = self.function_call._check_lxc_or_vm(
self.valves.proxmox_url, self.valves.api_token, vm_id
)
url = f"{self.valves.proxmox_url}/api2/json/nodes/zephyre/{type['type']}/{type['vmid']}/status/shutdown"
try:
response = self.function_call.request_api(
url, self.valves.api_token, method="POST"
)
await emitter.success_update(f"VM {vm_id} shut down successfully")
return str(response)
except Exception as e:
await emitter.error_update(f"Error shutting down VM {vm_id}: {str(e)}")
return None
async def start_vm(
self, vm_id: str, __event_emitter__: Callable[[dict], Any] = None
) -> str | None:
"""
Start a virtual machine in the Proxmox cluster.
:param vm_id: The ID of the VM to start.
:param event_emitter: Optional callable for emitting events.
:return: The response from the API or None if an error occurs.
"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update(f"Starting VM: {vm_id}")
type = self.function_call._check_lxc_or_vm(
self.valves.proxmox_url, self.valves.api_token, vm_id
)
url = f"{self.valves.proxmox_url}/api2/json/nodes/zephyre/{type['type']}/{type['vmid']}/status/start"
try:
response = self.function_call.request_api(
url, self.valves.api_token, method="POST"
)
await emitter.success_update(f"VM {vm_id} started successfully")
return str(response)
except Exception as e:
await emitter.error_update(f"Error starting VM {vm_id}: {str(e)}")
return f"Error starting VM {vm_id}: {str(e)}"
async def get_specific_vm_status(
self, vm_id_or_name: str, __event_emitter__: Callable[[dict], Any] = None
) -> str | None:
"""
Get the status of a specific virtual machine in the Proxmox cluster.
:param vm_id: The ID of the VM to retrieve status for.
:param event_emitter: Optional callable for emitting events.
:return: The status of the VM or None if an error occurs.
"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update(f"Getting status for VM: {vm_id_or_name}")
type = self.function_call._check_lxc_or_vm(
self.valves.proxmox_url, self.valves.api_token, vm_id_or_name
)
await emitter.progress_update(
f"Getting status for {type['type']}: {type['name']}"
)
url = f"{self.valves.proxmox_url}/api2/json/nodes/zephyre/{type['type']}/{type['vmid']}/status/current"
try:
status = self.function_call.request_api(
url, self.valves.api_token, method="GET"
)
await emitter.success_update(
f"VM {vm_id_or_name} status retrieved successfully"
)
return str(status)
except Exception as e:
await emitter.error_update(
f"Error retrieving VM {vm_id_or_name} status: {str(e)}"
)
return None
async def get_specific_vm_configuration_and_hardware(
self, vm_id_or_vm_name: str, __event_emitter__: Callable[[dict], Any] = None
) -> str | None:
"""
Get detailed information about a specific virtual machine or container in the Proxmox cluster.
:param vm_id: The ID of the VM or container to retrieve information for.
:param event_emitter: Optional callable for emitting events.
:return: The information of the VM/container or None if an error occurs.
"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update(
f"Getting information for resource: {vm_id_or_vm_name}"
)
try:
type = self.function_call._check_lxc_or_vm(
self.valves.proxmox_url, self.valves.api_token, vm_id_or_vm_name
)
url = f"{self.valves.proxmox_url}/api2/json/nodes/zephyre/{type['type']}/{type['vmid']}/config"
info = self.function_call.request_api(
url, self.valves.api_token, method="GET"
)
await emitter.success_update(
f"Resource {vm_id_or_vm_name} information retrieved successfully"
)
return str(info)
except Exception as e:
await emitter.error_update(
f"Error retrieving resource {vm_id_or_vm_name} information: {str(e)}"
)
return None

@ -0,0 +1,954 @@
"""
title: Star Citizen Information Retrieval
author: Pakobbix
author_url: https://gitea.zephyre.one/Pakobbix/SC-Discord-Bot
version: 0.1.0
"""
#!/usr/bin/env python3
import requests, asyncio, json, sqlite3
from bs4 import BeautifulSoup
from fuzzywuzzy import process
from typing import Callable, Any
class EventEmitter:
def __init__(self, event_emitter: Callable[[dict], Any] = None):
self.event_emitter = event_emitter
async def progress_update(self, description):
await self.emit(description)
async def error_update(self, description):
await self.emit(description, "error", True)
async def success_update(self, description):
await self.emit(description, "success", True)
async def emit(self, description="Unknown State", status="in_progress", done=False):
if self.event_emitter:
await self.event_emitter(
{
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
)
class get_information:
def __init__(self):
self.base_url = "https://starcitizen.tools"
self.db_path = "/app/sc_databases"
async def get_all_vehicle_names(self):
"""Fetches all vehicle names from the list of pledge vehicles using the MediaWiki API."""
api_url = f"{self.base_url}/api.php"
vehicle_names = []
categories = ["Category:Pledge ships", "Category:Pledge vehicles"]
for category in categories:
params = {
"action": "query",
"format": "json",
"list": "categorymembers",
"cmtitle": category,
"cmlimit": "max", # Use max limit (500)
"cmprop": "title",
}
while True:
try:
response = await asyncio.to_thread(
requests.get, api_url, params=params
)
response.raise_for_status()
data = response.json()
if "query" in data and "categorymembers" in data["query"]:
for member in data["query"]["categorymembers"]:
vehicle_names.append(member["title"])
# Check for continuation to get the next page of results
if "continue" in data and "cmcontinue" in data["continue"]:
params["cmcontinue"] = data["continue"]["cmcontinue"]
else:
break # No more pages
except requests.exceptions.RequestException as e:
print(f"Error fetching vehicle list for {category}: {e}")
break # Stop processing this category
except json.JSONDecodeError:
print(f"Error decoding JSON from response for {category}.")
break # Stop processing this category
if not vehicle_names:
print("No vehicle names found.")
return []
# Remove duplicates and sort the list
return sorted(list(set(vehicle_names)))
async def get_closest_vehicle_name(self, vehicle_name):
"""Finds the closest matching vehicle name using fuzzy matching."""
all_vehicle_names = await self.get_all_vehicle_names()
# print(f"Total vehicle names found: {len(all_vehicle_names)}")
if not all_vehicle_names:
return None
closest_name, _ = process.extractOne(vehicle_name, all_vehicle_names)
return closest_name
async def fetch_infos(self, ship_name):
"""Fetches ship information from the Star Citizen wiki using the MediaWiki API."""
closest_name = await self.get_closest_vehicle_name(ship_name)
if not closest_name:
print(f"No matching vehicle found for {ship_name}.")
return None
# Use the closest name found for the API call
page_title = closest_name.replace(" ", "_")
api_url = f"{self.base_url}/api.php"
params = {
"action": "parse",
"page": page_title,
"format": "json",
"prop": "text", # We only need the parsed HTML content
}
try:
response = await asyncio.to_thread(requests.get, api_url, params=params)
response.raise_for_status()
data = response.json()
if "error" in data:
print(f"API Error for {page_title}: {data['error']['info']}")
return None
html_content = data.get("parse", {}).get("text", {}).get("*", "")
if not html_content:
print(f"No content found for {page_title}.")
return None
except requests.exceptions.RequestException as e:
print(f"Error fetching data for {page_title}: {e}")
return None
except json.JSONDecodeError:
print(f"Error decoding JSON from response for {page_title}.")
return None
soup = BeautifulSoup(html_content, "html.parser")
info = {}
# Extracting ship information from the parsed HTML
info["general"] = await self._extract_infobox_data(soup)
info["specifications"] = await self._extract_specifications(soup)
return info
async def _extract_infobox_data(self, soup):
"""Extracts data from the infobox."""
infobox_data = {}
infobox = soup.find("details", class_="infobox")
if not infobox:
return infobox_data
items = infobox.find_all("div", class_="infobox__item")
for item in items:
label_tag = item.find("div", class_="infobox__label")
data_tag = item.find("div", class_="infobox__data")
if label_tag and data_tag:
label = label_tag.get_text(strip=True)
# For loaners, get all ship names
if "loaner" in label.lower():
value = [a.get_text(strip=True) for a in data_tag.find_all("a")]
else:
value = data_tag.get_text(separator=" ", strip=True)
infobox_data[label] = value
return infobox_data
async def _extract_specifications(self, soup):
"""Extracts data from the specifications tabs."""
specifications = {}
# Find all specification tabs like "Avionics & Systems", "Weaponry", etc.
tabs = soup.select("div.tabber > section > article.tabber__panel")
for panel in tabs:
panel_id = panel.get("id", "")
tab_name_tag = soup.find("a", {"aria-controls": panel_id})
if not tab_name_tag:
continue
tab_name = tab_name_tag.get_text(strip=True)
specifications[tab_name] = {}
# Find all component groups in the panel
component_groups = panel.find_all(
"div", class_="template-components__section"
)
for group in component_groups:
label_tag = group.find("div", class_="template-components__label")
if not label_tag:
continue
category = label_tag.get_text(strip=True)
components = []
# Find all component cards in the group
component_cards = group.select(".template-component__card")
for card in component_cards:
count_tag = card.select_one(".template-component__count")
size_tag = card.select_one(".template-component__size")
title_tag = card.select_one(".template-component__title")
if count_tag and size_tag and title_tag:
count = count_tag.get_text(strip=True)
size = size_tag.get_text(strip=True)
title = title_tag.get_text(strip=True)
components.append(f"{count} {size} {title}")
if components:
# If the category already exists, append to it (for Thrusters)
if category in specifications[tab_name]:
specifications[tab_name][category].extend(components)
else:
specifications[tab_name][category] = components
return specifications
async def fetch_all_commodity_names(self):
"""
        Fetches all distinct commodity names from the database and returns them as a newline-separated string.
"""
conn = sqlite3.connect(self.db_path + "/commodities.db")
cursor = conn.cursor()
cursor.execute("SELECT DISTINCT commodity_name FROM commodity_prices")
rows = cursor.fetchall()
conn.close()
return_string = "\n".join([row[0] for row in rows])
return return_string
async def fetch_all_item_names(self):
"""
        Fetches all distinct item names from the database and returns them as a newline-separated string.
"""
conn = sqlite3.connect(self.db_path + "/items.db")
cursor = conn.cursor()
cursor.execute("SELECT DISTINCT item_name FROM item_prices")
rows = cursor.fetchall()
conn.close()
return_string = "\n".join([row[0] for row in rows])
return return_string
async def get_all_ship_names_from_fleetyard_db(self):
"""
Fetches all ship names from the fleet.db database and returns a string.
"""
conn = sqlite3.connect(self.db_path + "/fleet.db")
cursor = conn.cursor()
cursor.execute("SELECT DISTINCT name FROM ship_owner_list")
rows = cursor.fetchall()
conn.close()
return_string = "\n".join([row[0] for row in rows])
return return_string
class Tools:
def __init__(self):
self.db_path = "/app/sc_databases"
async def get_ship_details(
self, ship_name: str, __event_emitter__: Callable[[dict], Any] = None
):
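        """
        Fetch wiki data for the vehicle whose name best matches ``ship_name``
        (fuzzy matched) and return a formatted summary of its general
        information and specifications.
        """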
emitter = EventEmitter(__event_emitter__)
# The API call in fetch_infos now handles fuzzy matching and name formatting.
# ship_name = await get_information().get_closest_vehicle_name(ship_name)
# ship_name = ship_name.title().replace(" ", "_")
await emitter.progress_update("Fetching ship information for " + ship_name)
info = await get_information().fetch_infos(ship_name)
if info:
await emitter.success_update(
"Successfully fetched ship information for " + ship_name
)
await emitter.progress_update("Processing retrieved information...")
output_lines = []
# Build the output string
output_lines.append(f"Information for {ship_name}:")
if info.get("general"):
await emitter.progress_update("Processing general information...")
output_lines.append("\n--- General Information ---")
for key, value in info["general"].items():
if isinstance(value, list):
output_lines.append(f"{key}: {', '.join(value)}")
else:
if "Size" in key:
# Only print the first word for size-related keys
value = value.split()[0] if value else ""
if "Stowage" in key:
# Replace 'Stowage' with 'Storage':
key = key.replace("Stowage", "Storage")
output_lines.append(f"{key}: {value}")
if info.get("specifications"):
await emitter.progress_update("Processing specifications...")
output_lines.append("\n--- Specifications ---")
for spec_area, details in info["specifications"].items():
if not details:
continue
output_lines.append(f"\n[{spec_area}]")
for category, items in details.items():
output_lines.append(f" {category}:")
for item in items:
output_lines.append(f" - {item}")
final_output = "\n".join(output_lines)
print(final_output)
await emitter.success_update(final_output)
return final_output
else:
error_message = f"No information found for {ship_name}."
print(error_message)
await emitter.error_update(error_message)
return error_message
async def compare_ships(
self,
ship_name1: str,
ship_name2: str,
__event_emitter__: Callable[[dict], Any] = None,
):
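        """
        Fetch wiki data for both ships and return the two formatted summaries
        back to back so they can be compared.
        """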
# ship_name1 = ship_name1.title().replace(" ", "_")
# ship_name2 = ship_name2.title().replace(" ", "_")
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update(
f"Fetching ship information for {ship_name1} and {ship_name2}"
)
info1 = await get_information().fetch_infos(ship_name1)
if info1:
await emitter.success_update(
f"Successfully fetched ship information for {ship_name1}"
)
output_lines = [f"Information for {ship_name1}:"]
if info1.get("general"):
await emitter.progress_update(
"Processing general information for " + ship_name1
)
output_lines.append("\n--- General Information ---")
for key, value in info1["general"].items():
if isinstance(value, list):
output_lines.append(f"{key}: {', '.join(value)}")
else:
if "Size" in key:
value = value.split()[0] if value else ""
if "Stowage" in key:
key = key.replace("Stowage", "Storage")
output_lines.append(f"{key}: {value}")
if info1.get("specifications"):
await emitter.progress_update(
"Processing specifications for " + ship_name1
)
output_lines.append("\n--- Specifications ---")
for spec_area, details in info1["specifications"].items():
if not details:
continue
output_lines.append(f"\n[{spec_area}]")
for category, items in details.items():
output_lines.append(f" {category}:")
for item in items:
output_lines.append(f" - {item}")
final_output1 = "\n".join(output_lines)
info2 = await get_information().fetch_infos(ship_name2)
if info2:
await emitter.success_update(
f"Successfully fetched ship information for {ship_name2}"
)
output_lines = [f"Information for {ship_name2}:"]
if info2.get("general"):
await emitter.progress_update(
"Processing general information for " + ship_name2
)
output_lines.append("\n--- General Information ---")
for key, value in info2["general"].items():
if isinstance(value, list):
output_lines.append(f"{key}: {', '.join(value)}")
else:
if "Size" in key:
value = value.split()[0] if value else ""
if "Stowage" in key:
key = key.replace("Stowage", "Storage")
output_lines.append(f"{key}: {value}")
if info2.get("specifications"):
await emitter.progress_update(
"Processing specifications for " + ship_name2
)
output_lines.append("\n--- Specifications ---")
for spec_area, details in info2["specifications"].items():
if not details:
continue
output_lines.append(f"\n[{spec_area}]")
for category, items in details.items():
output_lines.append(f" {category}:")
for item in items:
output_lines.append(f" - {item}")
final_output2 = "\n".join(output_lines)
await emitter.success_update(final_output2)
print(final_output1 + "\n\n" + final_output2)
return final_output1 + "\n\n" + final_output2
async def get_commodity_sell_price(
self, commodity_name: str, __event_emitter__: Callable[[dict], Any] = None
):
"""
Fetch commodity sell prices from the database by name.
commodity_name: The name of the commodity to fetch sell prices for.
"""
emitter = EventEmitter(__event_emitter__)
result_string = (
f"No sell price information found for commodity '{commodity_name}'."
)
try:
await emitter.progress_update(
f"Fetching commodity names from the database to find a match for '{commodity_name}'"
)
all_names = await get_information().fetch_all_commodity_names()
names_list = all_names.splitlines()
best_match = process.extractOne(commodity_name, names_list)
if best_match and best_match[1] > 60:
matched_commodity_name = best_match[0]
await emitter.success_update(
f"Found a close match for '{commodity_name}': {matched_commodity_name}"
)
conn = sqlite3.connect(self.db_path + "/commodities.db")
cursor = conn.cursor()
await emitter.progress_update(
f"Fetching sell prices for '{matched_commodity_name}'"
)
cursor.execute(
"SELECT price_sell, terminal_name, commodity_name FROM commodity_prices WHERE commodity_name = ? AND price_sell > 0",
(matched_commodity_name,),
)
rows = cursor.fetchall()
conn.close()
if rows:
output_lines = []
for row in rows:
sell_price = f"{int(row[0])} aUEC"
terminal_name = row[1]
item_name = row[2]
output_lines.append(
f"Item: {item_name}, Sell Price: {sell_price}/SCU, Terminal: {terminal_name}"
)
result_string = "\n".join(output_lines)
await emitter.success_update(
f"Successfully fetched sell prices for '{matched_commodity_name}'"
)
else:
result_string = (
f"No locations found to sell '{matched_commodity_name}'."
)
await emitter.error_update(result_string)
else:
result_string = f"Could not find a confident match for commodity '{commodity_name}'. Best guess was '{best_match[0]}' with {best_match[1]}% confidence."
await emitter.error_update(result_string)
except Exception as e:
error_message = f"An error occurred while fetching sell prices for {commodity_name}: {str(e)}"
await emitter.error_update(error_message)
result_string = error_message
print(result_string)
correct_response = (
"If not other specified, only answer two terminals with the highest sell price with the actual sell price per SCU (Star Citizen Unit).\n"
+ result_string
)
return correct_response
async def get_commodity_buy_price(
self, commodity_name: str, __event_emitter__: Callable[[dict], Any] = None
):
"""
Fetch commodity buy prices from the database by name.
commodity_name: The name of the commodity to fetch buy prices for.
"""
emitter = EventEmitter(__event_emitter__)
result_string = (
f"No buy price information found for commodity '{commodity_name}'."
)
try:
await emitter.progress_update(
f"Fetching commodity names from the database to find a match for '{commodity_name}'"
)
all_names = await get_information().fetch_all_commodity_names()
names_list = all_names.splitlines()
best_match = process.extractOne(commodity_name, names_list)
if best_match and best_match[1] > 60:
matched_commodity_name = best_match[0]
await emitter.success_update(
f"Found a close match for '{commodity_name}': {matched_commodity_name}"
)
conn = sqlite3.connect(self.db_path + "/commodities.db")
cursor = conn.cursor()
await emitter.progress_update(
f"Fetching buy prices for '{matched_commodity_name}'"
)
cursor.execute(
"SELECT price_buy, terminal_name, commodity_name FROM commodity_prices WHERE commodity_name = ? AND price_buy > 0",
(matched_commodity_name,),
)
rows = cursor.fetchall()
conn.close()
if rows:
output_lines = []
for row in rows:
buy_price = f"{int(row[0])} aUEC"
terminal_name = row[1]
item_name = row[2]
output_lines.append(
f"Item: {item_name}, Buy Price: {buy_price}/SCU, Terminal: {terminal_name}"
)
result_string = "\n".join(output_lines)
await emitter.success_update(
f"Successfully fetched buy prices for '{matched_commodity_name}'"
)
else:
result_string = (
f"No locations found to buy '{matched_commodity_name}'."
)
await emitter.error_update(result_string)
else:
result_string = f"Could not find a confident match for commodity '{commodity_name}'. Best guess was '{best_match[0]}' with {best_match[1]}% confidence."
await emitter.error_update(result_string)
except Exception as e:
error_message = f"An error occurred while fetching buy prices for {commodity_name}: {str(e)}"
await emitter.error_update(error_message)
result_string = error_message
print(result_string)
correct_response = (
"If not other specified, only answer two terminals with the lowest buy price with the actual buy price per SCU (Star Citizen Unit).\n"
+ result_string
)
return correct_response
async def get_item_prices(
self, item_name: str, __event_emitter__: Callable[[dict], Any] = None
):
"""
Fetch item prices from the database by name.
item_name: The name of the item to fetch.
"""
emitter = EventEmitter(__event_emitter__)
result_string = f"No information found for item '{item_name}'."
# First, check for spelling issues and compare it to the list of all item names available
try:
await emitter.progress_update(
f"Fetching item names from the database to find a match for '{item_name}'"
)
all_names = await get_information().fetch_all_item_names()
# The names are returned as a single string, split it into a list
names_list = all_names.splitlines()
best_match = process.extractOne(item_name, names_list)
if best_match and best_match[1] > 60:
matched_item_name = best_match[0]
await emitter.success_update(
f"Found a close match for '{item_name}': {matched_item_name}"
)
conn = sqlite3.connect(self.db_path + "/items.db")
cursor = conn.cursor()
await emitter.progress_update(
f"Fetching buy and sell prices for '{matched_item_name}'"
)
cursor.execute(
"SELECT price_buy, price_sell, terminal_name, item_name FROM item_prices WHERE item_name = ?",
(matched_item_name,),
)
await emitter.progress_update(
f"Processing results for '{matched_item_name}'"
)
rows = cursor.fetchall()
conn.close()
if rows:
output_lines = []
for row in rows:
buy_price = (
"Not buyable" if int(row[0]) == 0 else f"{int(row[0])} aUEC"
)
sell_price = (
"not sellable"
if int(row[1]) == 0
else f"{int(row[1])} aUEC"
)
output_lines.append(
f"Item: {row[3]}, Buy Price: {buy_price}, Sell Price: {sell_price}, Terminal: {row[2]}"
)
result_string = "\n".join(output_lines)
await emitter.success_update(
f"Successfully fetched buy and sell prices for '{matched_item_name}'"
)
else:
result_string = f"No price data found for '{matched_item_name}'."
await emitter.error_update(result_string)
else:
result_string = f"Could not find a confident match for item '{item_name}'. Best guess was '{best_match[0]}' with {best_match[1]}% confidence."
await emitter.error_update(result_string)
except Exception as e:
error_message = f"An error occurred while fetching information for {item_name}: {str(e)}"
await emitter.error_update(error_message)
result_string = error_message
print(result_string)
return result_string
async def get_ship_owners(
self, ship_name: str, __event_emitter__: Callable[[dict], Any] = None
):
"""
Fetches the owners of a specific ship from the fleet.db sqlite database.
ship_name: The name of the ship to fetch owners for.
"""
emitter = EventEmitter(__event_emitter__)
result_string = f"No owners found for ship '{ship_name}'."
try:
await emitter.progress_update(
f"Fetching owners for ship '{ship_name}' from the database"
)
available_ships = (
await get_information().get_all_ship_names_from_fleetyard_db()
)
# The names are returned as a single string, split it into a list
ships_list = available_ships.splitlines()
best_match = process.extractOne(ship_name, ships_list)
if best_match and best_match[1] > 60:
matched_ship_name = best_match[0]
await emitter.success_update(
f"Found a close match for '{ship_name}': {matched_ship_name}"
)
print(f'found a close match for "{ship_name}": {matched_ship_name}')
conn = sqlite3.connect(self.db_path + "/fleet.db")
cursor = conn.cursor()
await emitter.progress_update(
f"Fetching owners for ship '{matched_ship_name}' from the database"
)
cursor.execute(
"SELECT manufacturerName, name, usernames FROM ship_owner_list WHERE name = ?",
(matched_ship_name,),
)
rows = cursor.fetchall()
conn.close()
if rows:
owners = [row[2] for row in rows]
manufacturer_name = rows[0][0]
matched_ship_name = rows[0][1]
result_string = f"Report these to the user in a bulletpoint list:\nOwners of ship {manufacturer_name} {matched_ship_name}: {', '.join(owners)}"
except Exception as e:
error_message = (
f"An error occurred while fetching owners for {ship_name}: {str(e)}"
)
await emitter.error_update(error_message)
result_string = error_message
await emitter.progress_update(result_string)
print(result_string)
return result_string
async def list_purchasable_ships(
self, __event_emitter__: Callable[[dict], Any] = None
):
"""
Fetches all buyable ships, their prices, and locations from the buyable_ships.db database.
"""
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Fetching purchasable ships from the database...")
location_data = {}
final_output = "No purchasable ships found in the database."
try:
conn = sqlite3.connect(self.db_path + "/buyable_ships.db")
cursor = conn.cursor()
cursor.execute(
"SELECT vehicle_name, price_buy, terminal_name FROM buyable_ships ORDER BY terminal_name, vehicle_name"
)
rows = cursor.fetchall()
conn.close()
if not rows:
await emitter.error_update(
"No purchasable ships found in the database."
)
print(final_output)
return final_output
await emitter.progress_update("Processing ship data...")
for row in rows:
ship_name, price, location = row
if location not in location_data:
location_data[location] = []
location_data[location].append({"name": ship_name, "price": price})
output_lines = []
for location, ships in sorted(location_data.items()):
output_lines.append(f"\n--- Ingame Shop: {location} ---")
for ship in ships:
output_lines.append(
f" - {ship['name']}: {int(ship['price'])} aUEC"
)
final_output = "\n".join(output_lines)
await emitter.success_update(
f"Found purchasable ships at {len(location_data)} locations."
)
except sqlite3.Error as e:
error_message = f"Database error while fetching purchasable ships: {e}"
await emitter.error_update(error_message)
final_output = error_message
except Exception as e:
error_message = f"An unexpected error occurred: {e}"
await emitter.error_update(error_message)
final_output = error_message
print(final_output)
return final_output
async def list_rentable_ships(
self, __event_emitter__: Callable[[dict], Any] = None
):
"""
Fetches all rentable ships, their prices, and locations from the Star Citizen Tools wiki.
"""
emitter = EventEmitter(__event_emitter__)
api_url = "https://starcitizen.tools/api.php"
ship_prices = {}
ship_locations = {}
page_title = "Ship_renting"
await emitter.progress_update(f"Fetching data from {page_title}...")
params = {
"action": "parse",
"page": page_title,
"format": "json",
"prop": "text",
}
try:
response = await asyncio.to_thread(requests.get, api_url, params=params)
response.raise_for_status()
data = response.json()
if "error" in data:
await emitter.error_update(
f"API Error for {page_title}: {data['error']['info']}"
)
return
html_content = data.get("parse", {}).get("text", {}).get("*", "")
if not html_content:
await emitter.error_update(f"No content found for {page_title}.")
return
await emitter.progress_update(f"Parsing data from {page_title}...")
soup = BeautifulSoup(html_content, "html.parser")
tables = soup.find_all("table", class_="wikitable")
for table in tables:
header_row = table.find("tr")
if not header_row:
continue
headers = [th.get_text(strip=True) for th in header_row.find_all("th")]
rows = table.find_all("tr")[1:]
# Table 1: Ship rental prices
if "1 Day" in headers and "Location" in headers:
for row in rows:
cells = row.find_all("td")
if len(cells) < 8:
continue
ship_name_tag = cells[1].find("a")
if not ship_name_tag or not ship_name_tag.get("title"):
continue
ship_name = ship_name_tag.get("title").strip()
ship_prices[ship_name] = {
"1_day": cells[3].get_text(strip=True),
"3_days": cells[4].get_text(strip=True),
"7_days": cells[5].get_text(strip=True),
"30_days": cells[6].get_text(strip=True),
}
# Table 2: Ship rental locations
elif "Area18" in headers:
location_headers = headers[3:]
for row in rows:
cells = row.find_all("td")
if len(cells) < 4:
continue
ship_name_tag = cells[1].find("a")
if not ship_name_tag or not ship_name_tag.get("title"):
continue
ship_name = ship_name_tag.get("title").strip()
if ship_name not in ship_locations:
ship_locations[ship_name] = []
for i, cell in enumerate(cells[3:]):
if "" in cell.get_text():
ship_locations[ship_name].append(location_headers[i])
await emitter.success_update(f"Successfully processed {page_title}.")
except requests.exceptions.RequestException as e:
await emitter.error_update(f"Error fetching data for {page_title}: {e}")
except json.JSONDecodeError:
await emitter.error_update(f"Error decoding JSON for {page_title}.")
output_lines = []
for ship_name, locations in sorted(ship_locations.items()):
if not locations:
continue
output_lines.append(f"\n--- {ship_name} ---")
output_lines.append("Rentable at:")
prices = ship_prices.get(ship_name, {})
for location in locations:
output_lines.append(f" - Location: {location}")
if prices:
output_lines.append(
f" - 1 Day: {prices.get('1_day', 'N/A')}, 3 Days: {prices.get('3_days', 'N/A')}, 7 Days: {prices.get('7_days', 'N/A')}, 30 Days: {prices.get('30_days', 'N/A')}"
)
final_output = "\n".join(output_lines)
await emitter.success_update(
f"Found {len(ship_locations)} unique rentable ships."
)
print(final_output)
return final_output
async def fetch_wikelo_information(
self,
mission_ship_or_armor_name: str,
__event_emitter__: Callable[[dict], Any] = None,
):
"""
        Retrieves all columns for the Wikelo entry whose *mission*, *ship_name*
        or *armor_name* best matches ``mission_ship_or_armor_name`` using fuzzy search.
        Parameters
        ----------
        mission_ship_or_armor_name : str
            The string to match against the `missions`, `ship_name` and
            `armor_name` fields of the table.
__event_emitter__ : Callable[[dict], Any] | None
Optional async callback that receives status updates.
Returns
-------
str
A formatted summary of the matched entry, or an error message.
"""
emitter = EventEmitter(__event_emitter__)
try:
await emitter.progress_update(
f"Searching Wikelo database for '{mission_ship_or_armor_name}'"
)
# ------------------------------------------------------------------
# 1. Pull all rows from the database
# ------------------------------------------------------------------
conn = sqlite3.connect(self.db_path + "/wikelo_crafting.db")
cursor = conn.cursor()
cursor.execute(
"""
SELECT missions, ship_name, components, costs, rewards, armor_name
FROM Wikelo_information
"""
)
rows = cursor.fetchall()
conn.close()
if not rows:
await emitter.error_update("No entries found in the Wikelo database.")
return "No entries found."
# ------------------------------------------------------------------
# 2. Build a searchable list that keeps a reference to each row.
            #    We create three separate search items per row: one for the
            #    mission, one for the ship name and one for the armor name.
# ------------------------------------------------------------------
search_items = [] # List of tuples: (search_string, label, full_row)
for row in rows:
mission, ship_name, components, costs, rewards, armor_name = row
search_items.append((mission, "mission", row))
search_items.append((ship_name, "ship", row))
search_items.append((armor_name, "armor", row))
# ------------------------------------------------------------------
# 3. Find the best fuzzy match against all searchable strings.
# ------------------------------------------------------------------
best_match = process.extractOne(
mission_ship_or_armor_name,
[item[0] for item in search_items],
)
if not best_match:
await emitter.error_update(
f"No close match found for '{mission_ship_or_armor_name}'."
)
return f"No close match found for '{mission_ship_or_armor_name}'."
matched_string, score = best_match
# ------------------------------------------------------------------
# 4. Retrieve the full row that produced the matched string.
# ------------------------------------------------------------------
matched_row = None
for text, label, row in search_items:
if text == matched_string:
matched_row = row
break
if not matched_row:
await emitter.error_update("Matched entry could not be found.")
return "Error: matched entry could not be located."
# ------------------------------------------------------------------
# 5. Build the output string.
# ------------------------------------------------------------------
mission, ship_name, components, costs, rewards, armor_name = matched_row
result_text = (
f"**Mission:** {mission}\n"
f"**Ship Name:** {ship_name}\n"
f"**Components:** {components}\n"
f"**Costs:** {costs}\n"
f"**Rewards:** {rewards}\n"
f"**Armor Name:** {armor_name}"
)
await emitter.success_update(
f"Found a close match ({matched_string}) with score {score}."
)
except Exception as exc:
result_text = f"Error retrieving Wikelo information: {exc}"
await emitter.error_update(result_text)
print(result_text)
return (
"If possible, also mention where to find the materials needed.\n"
+ result_text
)
if __name__ == "__main__":
info_printer = Tools()
asyncio.run(info_printer.fetch_wikelo_information("Space Navy"))

@ -0,0 +1,225 @@
import requests, asyncio
from typing import Any, Callable
class EventEmitter:
def __init__(self, event_emitter: Callable[[dict], Any] = None):
self.event_emitter = event_emitter
async def progress_update(self, description):
await self.emit(description)
async def error_update(self, description):
await self.emit(description, "error", True)
async def success_update(self, description):
await self.emit(description, "success", True)
async def emit(self, description="Unknown State", status="in_progress", done=False):
if self.event_emitter:
await self.event_emitter(
{
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
)
class Tools:
def __init__(self):
self.Tautulli_API = "adc673ac6746492487fd7df9f175eeeb"
        self.Tautulli_URL = "https://tautulli.zephyre.one"  # base URL; "/api/v2" is appended per request
self.url = (
f"{self.Tautulli_URL}/api/v2?apikey={self.Tautulli_API}&cmd=get_libraries"
)
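        # NOTE: the helpers below rely on a fixed library order in Tautulli's
        # get_libraries response: index 0 = movies, 1 = anime, 2 = TV shows, 4 = music.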
async def get_movie_amount(self, __event_emitter__: Callable[[dict], Any] = None):
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Getting movie amount")
try:
self.response = requests.get(self.url)
self.response = self.response.json()
movie_amount = self.response["response"]["data"][0]["count"]
await emitter.success_update("Got movie amount")
return (
"There are currently " + str(movie_amount) + " movies in the library."
)
except Exception as e:
await emitter.error_update("Failed to get movie amount")
return str(e)
async def get_anime_amount(self, __event_emitter__: Callable[[dict], Any] = None):
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Getting anime amount")
try:
self.response = requests.get(self.url)
self.response = self.response.json()
anime_series = self.response["response"]["data"][1]["parent_count"]
anime_episodes = self.response["response"]["data"][1]["child_count"]
await emitter.success_update("Got anime amount")
return (
"There are currently "
+ str(anime_series)
+ " anime series in the library with a total of "
+ str(anime_episodes)
+ " episodes."
)
except Exception as e:
await emitter.error_update("Failed to get anime amount")
return str(e)
async def get_tvshow_amount(self, __event_emitter__: Callable[[dict], Any] = None):
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Getting tvshow amount")
try:
self.response = requests.get(self.url)
self.response = self.response.json()
tvshow_series = self.response["response"]["data"][2]["parent_count"]
tvshow_episodes = self.response["response"]["data"][2]["child_count"]
await emitter.success_update("Got tvshow amount")
return (
"There are currently "
+ str(tvshow_series)
+ " TV shows in the library with a total of "
+ str(tvshow_episodes)
+ " episodes."
)
except Exception as e:
await emitter.error_update("Failed to get tvshow amount")
return str(e)
async def get_music_amount(self, __event_emitter__: Callable[[dict], Any] = None):
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Getting music amount")
try:
self.response = requests.get(self.url)
self.response = self.response.json()
music_alben = self.response["response"]["data"][4]["parent_count"]
music_songs = self.response["response"]["data"][4]["child_count"]
await emitter.success_update("Got music amount")
return (
"There are currently "
+ str(music_alben)
+ " music albums in the library with a total of "
+ str(music_songs)
+ " songs."
)
except Exception as e:
await emitter.error_update(str(e))
return str(e)
async def get_newest_media(self, __event_emitter__: Callable[[dict], Any] = None):
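        """Return the ten most recently added items (movies by title, episodes by series and index), one per line."""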
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Getting newest Media")
try:
url = f"{self.Tautulli_URL}/api/v2?apikey={self.Tautulli_API}&cmd=get_recently_added&count=10"
response = requests.get(url)
response = response.json()
newest_media = response["response"]["data"]["recently_added"]
await emitter.success_update("Got newest Media")
grandparent_title = []
library_name = []
episode_title = []
recently_added = []
for i in range(len(newest_media)):
grandparent_title.append(newest_media[i]["grandparent_title"])
library_name.append(newest_media[i]["library_name"])
if newest_media[i]["media_type"] == "movie":
episode_title.append(newest_media[i]["title"])
recently_added.append(
f"Type: {library_name[i]} - Title: {episode_title[i]}"
)
else:
episode_title.append(newest_media[i]["media_index"])
recently_added.append(
f"Type: {library_name[i]} - Title: {grandparent_title[i]} - Episode: {episode_title[i]}"
)
Recently_Added_String = "Recently Added to Plex/JellyFin:\n" + "\n".join(
recently_added
)
return Recently_Added_String
except Exception as e:
await emitter.error_update("Failed to get newest Media")
return str(e)
async def get_most_watched_media(
self, __event_emitter__: Callable[[dict], Any] = None
):
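        """Return the most watched titles and their play counts from Tautulli's home statistics."""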
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Getting most watched Media")
try:
url = f"{self.Tautulli_URL}/api/v2?apikey={self.Tautulli_API}&cmd=get_home_stats"
response = requests.get(url)
response = response.json()
rows = [item["rows"] for item in response["response"]["data"]]
most_watched_media = []
for item in rows[1]:
most_watched_media.append(item["title"])
most_watched_media.append(item["total_plays"])
most_watched_string = "Most watched media:\n" + "\n".join(
[
f"{most_watched_media[i]} - {most_watched_media[i+1]} plays"
for i in range(0, len(most_watched_media), 2)
]
)
await emitter.success_update("Got most watched Media")
return most_watched_string
except Exception as e:
await emitter.error_update("Failed to get most watched Media")
return str(e)
async def get_most_active_users(
self, __event_emitter__: Callable[[dict], Any] = None
):
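        """Return the most active users and their total play counts from Tautulli's home statistics."""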
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Getting most active users")
try:
url = f"{self.Tautulli_URL}/api/v2?apikey={self.Tautulli_API}&cmd=get_home_stats"
response = requests.get(url)
response = response.json()
most_active_users = response["response"]["data"][8]["rows"]
user_list = []
for user in most_active_users:
user_list.append(user["friendly_name"])
user_list.append(user["total_plays"])
await emitter.success_update("Got most active users")
return "The most active users are:\n" + "\n".join(
[
f"{user_list[i]} - {user_list[i+1]} plays"
for i in range(0, len(user_list), 2)
]
)
except Exception as e:
await emitter.error_update("Failed to get most active users")
return str(e)
async def get_all_informations(
self, __event_emitter__: Callable[[dict], Any] = None
) -> str:
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update("Getting all informations")
try:
movie_amount = await self.get_movie_amount()
anime_amount = await self.get_anime_amount()
tvshow_amount = await self.get_tvshow_amount()
music_amount = await self.get_music_amount()
newest_media = await self.get_newest_media()
most_watched_media = await self.get_most_watched_media()
most_active_users = await self.get_most_active_users()
await emitter.success_update("Got all informations")
return f"{movie_amount}\n\n{anime_amount}\n\n{tvshow_amount}\n\n{music_amount}\n\n{newest_media}\n\n{most_watched_media}\n\n{most_active_users}"
except Exception as e:
await emitter.error_update("Failed to get all informations")
return str(e)
if __name__ == "__main__":
try:
tools = Tools()
print(asyncio.run(tools.get_most_active_users()))
except Exception as e:
print(str(e))

@ -0,0 +1,206 @@
#!/usr/bin/env python3
"""
Weather: 7-day forecast with German output, no API key required.
The code uses two helper classes:
* Preparations - data acquisition & helpers.
* Tools        - the public print_forecast entry point.
"""
from __future__ import annotations
import asyncio
from datetime import datetime
from typing import Any, Dict, List, Callable
import requests
class EventEmitter:
def __init__(self, event_emitter: Callable[[dict], Any] = None):
self.event_emitter = event_emitter
async def progress_update(self, description):
await self.emit(description)
async def error_update(self, description):
await self.emit(description, "error", True)
async def success_update(self, description):
await self.emit(description, "success", True)
async def emit(self, description="Unknown State", status="in_progress", done=False):
if self.event_emitter:
await self.event_emitter(
{
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
)
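# Usage sketch (hypothetical, for local testing): Open WebUI injects an async
# callback as __event_emitter__; a simple printer can stand in for it, e.g.
#
#     async def _print_emitter(event: dict) -> None:
#         print(event["data"]["status"], "-", event["data"]["description"])
#
#     emitter = EventEmitter(_print_emitter)
#     await emitter.progress_update("Wetterdaten werden abgerufen...")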
# --------------------------------------------------------------------------- #
# Helper mapping (German weekdays)
_WEEKDAY_GERMAN = {
0: "Montag",
1: "Dienstag",
2: "Mittwoch",
3: "Donnerstag",
4: "Freitag",
5: "Samstag",
6: "Sonntag",
}
# --------------------------------------------------------------------------- #
class Preparations:
"""
All functions that *prepare* data (fetching, calculations, formatting).
They are kept separate from the output logic.
"""
API_URL = "https://api.open-meteo.com/v1/forecast"
@staticmethod
def fetch_weather(lat: float, lon: float) -> Dict[str, Any]:
"""Retrieve raw forecast JSON from OpenMeteo."""
params = {
"latitude": lat,
"longitude": lon,
"daily": (
"temperature_2m_max,"
"temperature_2m_min,"
"windspeed_10m_max,"
"precipitation_probability_mean"
),
# hourly field used as a fallback for rain probability
"hourly": "precipitation_probability",
"timezone": "auto",
"forecast_days": 7,
}
resp = requests.get(Preparations.API_URL, params=params)
resp.raise_for_status()
return resp.json()
@staticmethod
def _calc_daily_max_precip(hourly: Dict[str, List[float]]) -> List[int | None]:
"""Return the maximum hourly precipitation probability for each day."""
if not hourly or "precipitation_probability" not in hourly:
return []
hourly_vals = hourly["precipitation_probability"]
days = len(hourly_vals) // 24
result: List[int | None] = []
for d in range(days):
slice_ = hourly_vals[d * 24 : (d + 1) * 24]
result.append(int(max(slice_)) if slice_ else None)
return result
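# Worked example (hypothetical values): 48 hourly probabilities covering two
# days, e.g. [10] * 24 + [80] * 24, yield [10, 80], the per-day maxima.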
@staticmethod
def _german_date(date_str: str) -> str:
"""Return German date string, e.g. 'Montag, 02.09.2025'."""
dt = datetime.strptime(date_str, "%Y-%m-%d")
weekday_name = _WEEKDAY_GERMAN[dt.weekday()]
return f"{weekday_name}, {dt.strftime('%d.%m.%Y')}"
@staticmethod
def format_day(
date: str,
temp_max: float | None,
temp_min: float | None,
wind_kmh: float | None,
precip_prob: int | None,
) -> str:
"""Return a single formatted line for one day."""
parts = [Preparations._german_date(date)]
if temp_max is not None and temp_min is not None:
parts.append(f"Hoch {temp_max:.1f}°C / Niedrig {temp_min:.1f}°C")
else:
parts.append("Temperaturdaten fehlen")
if wind_kmh is not None:
parts.append(f"Winds Max. {wind_kmh:.0f} km/h")
if precip_prob is not None:
parts.append(f"Regenchance {precip_prob}%")
else:
parts.append("Regenwahrscheinlichkeit fehlt")
return " - ".join(parts)
# --------------------------------------------------------------------------- #
class Tools:
"""
Contains utility functions for working with weather data.
"""
@staticmethod
async def print_forecast(
lat: float, lon: float, __event_emitter__: Callable[[dict], Any] = None
) -> str | None:
"""
Fetch weather data and return a formatted forecast string.
:param lat: Latitude of the location
:param lon: Longitude of the location
"""
emitter = EventEmitter(__event_emitter__)
try:
await emitter.progress_update("Wetterdaten werden abgerufen...")
raw = Preparations.fetch_weather(lat, lon)
except requests.HTTPError as exc:
await emitter.error_update(f"Fehler beim Abrufen der Wetterdaten: {exc}")
return
daily = raw["daily"]
dates = daily.get("time", [])
t_max_list = daily.get("temperature_2m_max")
t_min_list = daily.get("temperature_2m_min")
wind_list = daily.get("windspeed_10m_max")
# Rain probability: prefer daily, otherwise fallback to hourly max
if "precipitation_probability_mean" in daily:
await emitter.progress_update("Regenwahrscheinlichkeit wird verarbeitet...")
prob_list = [
int(v) if v is not None else None
for v in daily["precipitation_probability_mean"]
]
else:
prob_list = Preparations._calc_daily_max_precip(raw.get("hourly", {}))
await emitter.success_update(
"Wetterdaten erfolgreich abgerufen und verarbeitet."
)
# Build & print the report
header = f"📅 Wettervorhersage für die nächsten {len(dates)} Tage\n"
lines: List[str] = [header]
for i, d in enumerate(dates):
line = Preparations.format_day(
date=d,
temp_max=t_max_list[i] if t_max_list else None,
temp_min=t_min_list[i] if t_min_list else None,
wind_kmh=wind_list[i] if wind_list else None,
precip_prob=prob_list[i] if prob_list and len(prob_list) > i else None,
)
lines.append(line)
final_output = "\n".join(lines)
return final_output
# --------------------------------------------------------------------------- #
if __name__ == "__main__":
# Default to Berlin coordinates; replace with your own!
LAT_DEFAULT = 52.556
LON_DEFAULT = 13.335
print(asyncio.run(Tools.print_forecast(LAT_DEFAULT, LON_DEFAULT)))

View File

@ -0,0 +1,83 @@
"""
title: Youtube Transcript Provider (Langchain Community)
author: thearyadev
author_url: https://github.com/thearyadev/youtube-transcript-provider
funding_url: https://github.com/open-webui
version: 0.0.2
"""
from typing import Awaitable, Callable, Any
from langchain_community.document_loaders import YoutubeLoader
import traceback
class Tools:
def __init__(self):
self.citation = True
async def get_youtube_transcript(
self,
url: str,
__event_emitter__: Callable[[dict[str, dict[str, Any] | str]], Awaitable[None]],
) -> str:
"""
Provides the title and full transcript of a YouTube video in English.
Only use if the user supplied a valid YouTube URL.
Examples of valid YouTube URLs: https://youtu.be/dQw4w9WgXcQ, https://www.youtube.com/watch?v=dQw4w9WgXcQ
:param url: The URL of the youtube video that you want the transcript for.
:return: The title and full transcript of the YouTube video in English, or an error message.
"""
try:
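# This guard rejects the example video ID from the docstring above (the
# well-known Rick Astley video), so that URL is never actually summarized.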
if "dQw4w9WgXcQ" in url:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"{url} is not a valid youtube link",
"done": True,
},
}
)
return "The tool failed with an error. No transcript has been provided."
data = YoutubeLoader.from_youtube_url(
# video info seems to be broken
youtube_url=url,
add_video_info=False,
language=["en", "en_auto"],
).load()
if not data:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Failed to retrieve transcript for {url}. No results",
"done": True,
},
}
)
return "The tool failed with an error. No transcript has been provided."
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Successfully retrieved transcript for {url}",
"done": True,
},
}
)
return f"Title: {data[0].metadata.get('title')}\nTranscript:\n{data[0].page_content}"
except Exception:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Failed to retrieve transcript for {url}.",
"done": True,
},
}
)
return f"The tool failed with an error. No transcript has been provided.\nError Traceback: \n{traceback.format_exc()}"