ai_collections/self_created/Tools/gitea_management.py
Pakobbix 82b8b2d122 Add Tautulli information retrieval, weather forecast, and YouTube transcript tools
- Implemented Tautulli information retrieval in `tautulli_informations.py` to fetch movie, anime, TV show, music amounts, and more.
- Created a weather forecast tool in `weather_forecast.py` that retrieves and formats a 7-day weather forecast in German.
- Developed a YouTube transcript provider in `youtube_summarizer.py` to fetch video transcripts and titles using Langchain Community's YoutubeLoader.
2025-09-26 13:15:10 +02:00

618 lines
22 KiB
Python

"""
title: Gitea API Access
author: Pakobbix
author_url: https://gitea.zephyre.one/Pakobbix/
version: 0.1.0
"""
#!/usr/bin/env python3
import requests
import asyncio
import base64
from typing import Callable, Any
from pydantic import BaseModel, Field
class EventEmitter:
    """Send status events to an optional host-supplied callback.

    Every event is a dict shaped like::

        {"type": "status",
         "data": {"status": <str>, "description": <str>, "done": <bool>}}

    When no callback was supplied, every method is a no-op.
    """

    def __init__(self, event_emitter: Callable[[dict], Any] = None):
        # May be None; emit() then silently does nothing.
        self.event_emitter = event_emitter

    async def progress_update(self, description):
        """Emit *description* with status ``"in_progress"`` and ``done=False``."""
        await self.emit(description)

    async def error_update(self, description):
        """Emit *description* with status ``"error"`` and ``done=True``."""
        await self.emit(description, "error", True)

    async def success_update(self, description):
        """Emit *description* with status ``"success"`` and ``done=True``."""
        await self.emit(description, "success", True)

    async def emit(self, description="Unknown State", status="in_progress", done=False):
        """Build a status event and hand it to the callback, if there is one.

        :param description: Human-readable text for the event.
        :param status: Status label placed in the event payload.
        :param done: Whether this event marks the end of the operation.
        """
        if not self.event_emitter:
            return

        outcome = self.event_emitter(
            {
                "type": "status",
                "data": {
                    "status": status,
                    "description": description,
                    "done": done,
                },
            }
        )
        # The callback is typed as returning Any, so it may be a plain
        # function. Only await results that are actually awaitable; awaiting
        # e.g. None would raise TypeError.
        if hasattr(outcome, "__await__"):
            await outcome
class Helper:
    """Helper class for common utility functions"""

    @staticmethod
    def split_repo_name(repo_name: str, default_owner: str) -> tuple:
        """Resolve a repository reference into ``(owner, repo)``.

        :param repo_name: Either ``"repo"`` or ``"owner/repo"``.
        :param default_owner: Owner used when *repo_name* names no owner.
        :return: Tuple ``(owner, repo)``; nothing outside is modified.
        """
        if "/" in repo_name:
            owner, repo = repo_name.split("/", 1)
            # "/repo" has an empty owner part; use the default in that case.
            return owner or default_owner, repo
        return default_owner, repo_name

    @staticmethod
    def get_repo_api_url(base_url, user_name, repo_name: str) -> str:
        """Return the API URL prefix of a repository (no trailing slash).

        A trailing slash on *base_url* is dropped so that the result never
        contains ``//api``.
        """
        return f"{base_url.rstrip('/')}/api/v1/repos/{user_name}/{repo_name}"

    @staticmethod
    def get_repo_url(base_url, user_name, repo_name: str) -> str:
        """Return the API URL of the repository's root ``contents`` listing."""
        return Helper.get_repo_api_url(base_url, user_name, repo_name) + "/contents/"

    @staticmethod
    def get_issue_url(base_url, user_name, repo_name: str, issue_number: int) -> str:
        """Return the API URL of a single issue."""
        return (
            Helper.get_repo_api_url(base_url, user_name, repo_name)
            + f"/issues/{issue_number}"
        )

    @staticmethod
    def get_pr_url(base_url, user_name, repo_name: str, pr_number: int) -> str:
        """Return the API URL of a single pull request."""
        return (
            Helper.get_repo_api_url(base_url, user_name, repo_name)
            + f"/pulls/{pr_number}"
        )

    async def _fetch_contents(
        self,
        url: str,
        path: str = "",
        __event_emitter__: Callable[[dict], Any] = None,
        headers: dict = None,
    ) -> str:
        """Recursively collect the text files below a ``contents`` API URL.

        :param url: ``contents`` API URL of the directory to list.
        :param path: Repository-relative path of that directory ("" for root);
            only used to label files and progress messages.
        :param __event_emitter__: Optional status callback.
        :param headers: HTTP headers (including authorization) for every
            request. When omitted, the headers of a default-configured
            ``Tools()`` instance are used, which is the legacy behaviour.
        :return: Concatenated ``Content of <path>:`` sections, one per text
            file, each followed by a 40-dash separator line.
        :raises Exception: If the directory listing does not return HTTP 200.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update(f"Fetching contents from {path}")

        if headers is None:
            # Legacy fallback: a fresh Tools() only carries default valves, so
            # callers with configured credentials should pass headers.
            headers = Tools().headers

        # Make the request to the Gitea API to fetch the directory listing
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            await emitter.error_update(
                f"Failed to fetch repository: {response.status_code}"
            )
            raise Exception(f"Failed to fetch repository: {response.status_code}")

        sections = []
        for item in response.json():
            item_path = f"{path}/{item['name']}" if path else item["name"]
            if ".gitignore" in item_path:
                continue

            if item["type"] == "file":
                # Fetch the file content using the download URL
                file_response = requests.get(item["download_url"], headers=headers)
                if file_response.status_code != 200:
                    # Best effort: a single unreadable file does not abort
                    # the whole walk.
                    print(
                        f"Failed to fetch file {item['download_url']}: {file_response.status_code}"
                    )
                    continue
                # Use the MIME type to skip binary files and images; a missing
                # Content-Type header is treated as non-text.
                if "text" in file_response.headers.get("Content-Type", ""):
                    sections.append(
                        f"Content of {item_path}:\n{file_response.text}\n"
                    )
                    sections.append("-" * 40 + "\n")
                else:
                    print(f"Ignored binary file or image: {item['download_url']}")
            elif item["type"] == "dir":
                # Recurse with the same credentials and callback
                sections.append(
                    await self._fetch_contents(
                        item["url"], item_path, __event_emitter__, headers
                    )
                )

        await emitter.progress_update(f"Fetching contents from {path} completed")
        return "".join(sections)


class Tools:
    """Async operations against the Gitea REST API, configured via ``valves``.

    Every ``repo_name`` argument may be a bare repository name, which is
    resolved against ``valves.gitea_username``, or ``owner/repo``. The owner
    is resolved per call and the configured username is never overwritten.
    """

    class Valves(BaseModel):
        # API token sent in the Authorization header of every request.
        access_token: str = Field(
            "<your_access_token>",
            description="Gitea access token",
        )
        # Base URL of the instance; "/api/v1/..." is appended to it.
        gitea_url: str = Field(
            "https://your.gitea.instance",
            description="Base URL of the Gitea instance",
        )
        # Default repository owner for bare repository names.
        gitea_username: str = Field(
            "<your_username>",
            description="Gitea username",
        )

    def __init__(self):
        self.valves = self.Valves()

    @property
    def headers(self):
        """Authorization headers built from the configured access token."""
        return {"Authorization": f"token {self.valves.access_token}"}

    async def get_repository_files_content(
        self,
        repo_name: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Fetch the content of all text files in a repository.

        :param repo_name: Repository name, optionally as ``owner/repo``.
        :return: The concatenated file contents followed by a success line.
        :raises Exception: If no access token is configured or the
            repository listing cannot be fetched.
        """
        owner, repo = Helper.split_repo_name(repo_name, self.valves.gitea_username)
        if not self.valves.access_token:
            raise Exception(
                "Gitea access token is not set in the environment variables."
            )
        repo_url = Helper.get_repo_url(self.valves.gitea_url, owner, repo)
        # Pass this instance's headers so the configured token is used.
        content = await Helper()._fetch_contents(
            repo_url, "", __event_emitter__, headers=self.headers
        )
        return content + "\nAll files fetched successfully"

    async def create_issue(
        self,
        repo_name: str,
        short_title: str,
        body: str,
        assignee: str = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Create a new issue in the repository.

        :param repo_name: Repository name, optionally as ``owner/repo``.
        :param short_title: Title of the issue.
        :param body: Text of the issue.
        :param assignee: Optional user name to assign the issue to.
        :return: The API response describing the created issue, as a string.
        :raises Exception: If no token is configured or the API does not
            answer with HTTP 201.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update("Creating issue in " + repo_name)
        if not self.valves.access_token:
            await emitter.error_update(
                "Gitea access token is not set in the environment variables."
            )
            raise Exception(
                "Gitea access token is not set in the environment variables."
            )
        owner, repo = Helper.split_repo_name(repo_name, self.valves.gitea_username)
        url = Helper.get_repo_api_url(self.valves.gitea_url, owner, repo) + "/issues"
        payload = {"title": short_title, "body": body}
        if assignee:
            payload["assignee"] = assignee
        response = requests.post(url, json=payload, headers=self.headers)
        if response.status_code != 201:
            await emitter.error_update(
                f"Failed to create issue: {response.status_code} - {response.text}"
            )
            raise Exception(f"Failed to create issue: {response.status_code}")
        await emitter.success_update("Issue created successfully")
        return str(response.json())

    async def read_issue(
        self,
        repo_name: str,
        issue_number: int,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Read a specific issue from the repository.

        :param repo_name: Repository name, optionally as ``owner/repo``.
        :param issue_number: Number of the issue to read.
        :return: The API response describing the issue, as a string.
        :raises Exception: If the API does not answer with HTTP 200.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update(
            "Reading issue " + str(issue_number) + " in " + repo_name
        )
        owner, repo = Helper.split_repo_name(repo_name, self.valves.gitea_username)
        url = Helper.get_issue_url(self.valves.gitea_url, owner, repo, issue_number)
        response = requests.get(url, headers=self.headers)
        if response.status_code != 200:
            await emitter.error_update(
                f"Failed to read issue {issue_number}: {response.status_code} - {response.text}"
            )
            raise Exception(
                f"Failed to read issue {issue_number}: {response.status_code}"
            )
        await emitter.success_update("Issue read successfully")
        return str(response.json())

    async def edit_issue(
        self,
        repo_name: str,
        issue_number: int,
        title: str = None,
        body: str = None,
        labels: list = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Edit an existing issue in the repository. All fields are optional.

        :param repo_name: Repository name, optionally as ``owner/repo``.
        :param issue_number: Number of the issue to edit.
        :param title: New title; left unchanged when omitted.
        :param body: New text; left unchanged when omitted.
        :param labels: New labels; left unchanged when omitted.
        :return: The API response describing the edited issue, as a string.
        :raises Exception: If the API does not answer with HTTP 200.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update(
            "Editing issue " + str(issue_number) + " in " + repo_name
        )
        owner, repo = Helper.split_repo_name(repo_name, self.valves.gitea_username)
        url = Helper.get_issue_url(self.valves.gitea_url, owner, repo, issue_number)
        # Only send the fields the caller actually supplied.
        candidates = {"title": title, "body": body, "labels": labels}
        payload = {key: value for key, value in candidates.items() if value is not None}
        response = requests.patch(url, json=payload, headers=self.headers)
        if response.status_code != 200:
            await emitter.error_update(
                f"Failed to edit issue {issue_number}: {response.status_code} - {response.text}"
            )
            raise Exception(
                f"Failed to edit issue {issue_number}: {response.status_code} - {response.text}"
            )
        await emitter.success_update("Issue edited successfully")
        return str(response.json())

    async def add_commentary_to_issue(
        self,
        repo_name: str,
        issue_number: int,
        commentary: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Add a commentary to a specific issue in the repository.

        :param repo_name: Repository name, optionally as ``owner/repo``.
        :param issue_number: Number of the issue to comment on.
        :param commentary: Text of the comment.
        :return: The API response describing the created comment, as a string.
        :raises Exception: If the API does not answer with HTTP 201.
        """
        owner, repo = Helper.split_repo_name(repo_name, self.valves.gitea_username)
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update(
            "Adding commentary to issue " + str(issue_number) + " in " + repo
        )
        url = (
            Helper.get_issue_url(self.valves.gitea_url, owner, repo, issue_number)
            + "/comments"
        )
        payload = {"body": commentary}
        response = requests.post(url, json=payload, headers=self.headers)
        if response.status_code != 201:
            await emitter.error_update(
                f"Failed to add commentary to issue {issue_number}: {response.status_code} - {response.text}"
            )
            raise Exception(
                f"Failed to add commentary to issue {issue_number}: {response.status_code}"
            )
        await emitter.success_update("Commentary added successfully")
        return str(response.json())

    async def close_issue(
        self,
        repo_name: str,
        issue_number: int,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Close a specific issue in the repository.

        :param repo_name: Repository name, optionally as ``owner/repo``.
        :param issue_number: Number of the issue to close.
        :return: A confirmation message.
        :raises Exception: If the API does not answer with HTTP 200.
        """
        owner, repo = Helper.split_repo_name(repo_name, self.valves.gitea_username)
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update(
            "Closing issue " + str(issue_number) + " in " + repo
        )
        url = Helper.get_issue_url(self.valves.gitea_url, owner, repo, issue_number)
        payload = {"state": "closed"}
        response = requests.patch(url, json=payload, headers=self.headers)
        if response.status_code != 200:
            await emitter.error_update(
                f"Failed to close issue {issue_number}: {response.status_code} - {response.text}"
            )
            raise Exception(
                f"Failed to close issue {issue_number}: {response.status_code} - {response.text}"
            )
        await emitter.success_update("Issue closed successfully")
        return "Issue " + str(issue_number) + " closed successfully"

    async def delete_issue(
        self,
        repo_name: str,
        issue_number: int,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Delete a specific issue in the repository.

        :param repo_name: Repository name, optionally as ``owner/repo``.
        :param issue_number: Number of the issue to delete.
        :return: A confirmation message.
        :raises Exception: If the API does not answer with HTTP 204.
        """
        owner, repo = Helper.split_repo_name(repo_name, self.valves.gitea_username)
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update(
            "Deleting issue " + str(issue_number) + " in " + repo
        )
        url = Helper.get_issue_url(self.valves.gitea_url, owner, repo, issue_number)
        response = requests.delete(url, headers=self.headers)
        if response.status_code != 204:
            await emitter.error_update(
                f"Failed to delete issue {issue_number}: {response.status_code} - {response.text}"
            )
            raise Exception(
                f"Failed to delete issue {issue_number}: {response.status_code} - {response.text}"
            )
        await emitter.success_update("Issue deleted successfully")
        return "Issue " + str(issue_number) + " deleted successfully"

    async def create_pull_request(
        self,
        repo_name: str,
        title: str,
        body: str = "",
        head: str = "",
        base: str = "",
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Create a new pull request in the repository.

        :param repo_name: Repository name, optionally as ``owner/repo``.
        :param title: Title of the pull request.
        :param body: Description of the pull request.
        :param head: Value sent as the ``head`` field of the request.
        :param base: Value sent as the ``base`` field of the request.
        :return: The API response describing the pull request, as a string.
        :raises Exception: If the API does not answer with HTTP 201.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update("Creating pull request in " + repo_name)
        owner, repo = Helper.split_repo_name(repo_name, self.valves.gitea_username)
        url = Helper.get_repo_api_url(self.valves.gitea_url, owner, repo) + "/pulls"
        payload = {"title": title, "body": body, "head": head, "base": base}
        response = requests.post(url, json=payload, headers=self.headers)
        if response.status_code != 201:
            await emitter.error_update(
                f"Failed to create pull request: {response.status_code} - {response.text}"
            )
            raise Exception(f"Failed to create pull request: {response.status_code}")
        await emitter.success_update("Pull request created successfully")
        return str(response.json())

    async def list_issues(
        self, repo_name: str, __event_emitter__: Callable[[dict], Any] = None
    ) -> str:
        """List the issues of the repository.

        :param repo_name: Repository name, optionally as ``owner/repo``.
        :return: The API response (one request, no paging), as a string.
        :raises Exception: If the API does not answer with HTTP 200.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update("Listing issues in " + repo_name)
        owner, repo = Helper.split_repo_name(repo_name, self.valves.gitea_username)
        url = Helper.get_repo_api_url(self.valves.gitea_url, owner, repo) + "/issues"
        response = requests.get(url, headers=self.headers)
        if response.status_code != 200:
            await emitter.error_update(
                f"Failed to list issues: {response.status_code} - {response.text}"
            )
            raise Exception(f"Failed to list issues: {response.status_code}")
        await emitter.success_update("Issues listed successfully")
        return str(response.json())

    async def list_pull_requests(
        self, repo_name: str, __event_emitter__: Callable[[dict], Any] = None
    ) -> str:
        """List the pull requests of the repository.

        :param repo_name: Repository name, optionally as ``owner/repo``.
        :return: The API response (one request, no paging), as a string.
        :raises Exception: If the API does not answer with HTTP 200.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update("Listing pull requests in " + repo_name)
        owner, repo = Helper.split_repo_name(repo_name, self.valves.gitea_username)
        url = Helper.get_repo_api_url(self.valves.gitea_url, owner, repo) + "/pulls"
        response = requests.get(url, headers=self.headers)
        if response.status_code != 200:
            await emitter.error_update(
                f"Failed to list pull requests: {response.status_code} - {response.text}"
            )
            raise Exception(f"Failed to list pull requests: {response.status_code}")
        await emitter.success_update("Pull requests listed successfully")
        return str(response.json())

    async def list_repositories(
        self, __event_emitter__: Callable[[dict], Any] = None
    ) -> str:
        """List the repositories returned by the ``/user/repos`` endpoint.

        :return: The API response (one request, no paging), as a string.
        :raises Exception: If the API does not answer with HTTP 200.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update("Listing all repositories")
        url = f"{self.valves.gitea_url.rstrip('/')}/api/v1/user/repos"
        response = requests.get(url, headers=self.headers)
        if response.status_code != 200:
            await emitter.error_update(
                f"Failed to list repositories: {response.status_code} - {response.text}"
            )
            raise Exception(f"Failed to list repositories: {response.status_code}")
        await emitter.success_update("Repositories listed successfully")
        return str(response.json())

    async def create_repository(
        self,
        name: str,
        description: str = "",
        private: bool = False,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Create a new repository via the ``/user/repos`` endpoint.

        :param name: Repository name; spaces are replaced by hyphens.
        :param description: Optional repository description.
        :param private: Whether the repository is created as private.
        :return: The API response describing the repository, as a string.
        :raises Exception: If the API does not answer with HTTP 201.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update("Creating repository " + name)
        name = name.replace(" ", "-")
        url = f"{self.valves.gitea_url.rstrip('/')}/api/v1/user/repos"
        payload = {"name": name, "description": description, "private": private}
        response = requests.post(url, json=payload, headers=self.headers)
        if response.status_code != 201:
            await emitter.error_update(
                f"Failed to create repository: {response.status_code} - {response.text}"
            )
            raise Exception(f"Failed to create repository: {response.status_code}")
        await emitter.success_update("Repository created successfully")
        return str(response.json())

    async def add_file_to_repository(
        self,
        repo_name: str,
        file_path: str,
        content: str,
        commit_message: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Add a new file to the repository.

        :param repo_name: Repository name, optionally as ``owner/repo``.
        :param file_path: Path of the new file inside the repository.
        :param content: Text content; sent UTF-8 encoded as base64.
        :param commit_message: Commit message for the change.
        :return: The API response, as a string.
        :raises Exception: If the API does not answer with HTTP 201.
        """
        owner, repo = Helper.split_repo_name(repo_name, self.valves.gitea_username)
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update("Adding file to repository " + repo)
        base64_content = base64.b64encode(content.encode("utf-8")).decode("utf-8")
        url = Helper.get_repo_url(self.valves.gitea_url, owner, repo) + file_path
        payload = {
            "content": base64_content,
            "message": commit_message,
        }
        response = requests.post(url, json=payload, headers=self.headers)
        if response.status_code != 201:
            await emitter.error_update(
                f"Failed to add file: {response.status_code} - {response.text}"
            )
            raise Exception(f"Failed to add file: {response.status_code}")
        await emitter.success_update("File added successfully")
        return str(response.json())

    async def delete_file_from_repository(
        self,
        repo_name: str,
        file_path: str,
        commit_message: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Delete a file from the repository.

        :param repo_name: Repository name, optionally as ``owner/repo``.
        :param file_path: Path of the file inside the repository.
        :param commit_message: Commit message for the change.
        :return: The API response, as a string.
        :raises Exception: If the file or its SHA cannot be fetched, or the
            delete request does not answer with HTTP 200.
        """
        owner, repo = Helper.split_repo_name(repo_name, self.valves.gitea_username)
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update("Deleting file from repository " + repo)
        url = Helper.get_repo_url(self.valves.gitea_url, owner, repo) + file_path
        # First, get the file to retrieve its SHA
        get_response = requests.get(url, headers=self.headers)
        if get_response.status_code != 200:
            await emitter.error_update(
                f"Failed to fetch file for deletion: {get_response.status_code} - {get_response.text}"
            )
            raise Exception(
                f"Failed to fetch file for deletion: {get_response.status_code}"
            )
        file_info = get_response.json()
        # A directory path yields a list rather than a file object; treat
        # that like a missing SHA instead of failing on .get().
        sha = file_info.get("sha") if isinstance(file_info, dict) else None
        if not sha:
            await emitter.error_update("File SHA not found, cannot delete file.")
            raise Exception("File SHA not found, cannot delete file.")
        payload = {
            "sha": sha,
            "message": commit_message,
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        if response.status_code != 200:
            await emitter.error_update(
                f"Failed to delete file: {response.status_code} - {response.text}"
            )
            raise Exception(f"Failed to delete file: {response.status_code}")
        await emitter.success_update("File deleted successfully")
        return str(response.json())

    async def edit_file_in_repository(
        self,
        repo_name: str,
        file_path: str,
        new_content: str,
        commit_message: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Edit an existing file in the repository.

        :param repo_name: Repository name, optionally as ``owner/repo``.
        :param file_path: Path of the file inside the repository.
        :param new_content: New text content; sent UTF-8 encoded as base64.
        :param commit_message: Commit message for the change.
        :return: The API response, as a string.
        :raises Exception: If the file or its SHA cannot be fetched, or the
            update request does not answer with HTTP 200.
        """
        owner, repo = Helper.split_repo_name(repo_name, self.valves.gitea_username)
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update("Editing file in repository " + repo)
        base64_content = base64.b64encode(new_content.encode("utf-8")).decode("utf-8")
        url = Helper.get_repo_url(self.valves.gitea_url, owner, repo) + file_path
        # First, get the file to retrieve its SHA
        get_response = requests.get(url, headers=self.headers)
        if get_response.status_code != 200:
            await emitter.error_update(
                f"Failed to fetch file for editing: {get_response.status_code} - {get_response.text}"
            )
            raise Exception(
                f"Failed to fetch file for editing: {get_response.status_code}"
            )
        file_info = get_response.json()
        # A directory path yields a list rather than a file object; treat
        # that like a missing SHA instead of failing on .get().
        sha = file_info.get("sha") if isinstance(file_info, dict) else None
        if not sha:
            await emitter.error_update("File SHA not found, cannot edit file.")
            raise Exception("File SHA not found, cannot edit file.")
        payload = {
            "content": base64_content,
            "message": commit_message,
            "sha": sha,
        }
        response = requests.put(url, json=payload, headers=self.headers)
        if response.status_code != 200:
            await emitter.error_update(
                f"Failed to edit file: {response.status_code} - {response.text}"
            )
            raise Exception(f"Failed to edit file: {response.status_code}")
        await emitter.success_update("File edited successfully")
        return str(response.json())
if __name__ == "__main__":
    # Manual smoke run: fetch the files of one hard-coded repository using the
    # default valve values. The returned text is not printed or stored.
    asyncio.run(Tools().get_repository_files_content("nicible"))