- Implemented Tautulli information retrieval in `tautulli_informations.py` to fetch movie, anime, TV show, music amounts, and more. - Created a weather forecast tool in `weather_forecast.py` that retrieves and formats a 7-day weather forecast in German. - Developed a YouTube transcript provider in `youtube_summarizer.py` to fetch video transcripts and titles using Langchain Community's YoutubeLoader.
618 lines
22 KiB
Python
618 lines
22 KiB
Python
"""
title: Gitea API Access
author: Pakobbix
author_url: https://gitea.zephyre.one/Pakobbix/
version: 0.1.0
"""
|
|
|
|
#!/usr/bin/env python3
|
|
|
|
import requests
|
|
import asyncio
|
|
import base64
|
|
from typing import Callable, Any
|
|
from pydantic import BaseModel, Field
|
|
|
|
|
|
class EventEmitter:
    """Forward status events to an optional async callback.

    Every event has the shape
    ``{"type": "status", "data": {"status", "description", "done"}}``.
    When no callback was supplied, all methods are silent no-ops.
    """

    def __init__(self, event_emitter: Callable[[dict], Any] = None):
        # Awaitable callback receiving each event dict; None disables emission.
        self.event_emitter = event_emitter

    async def emit(self, description="Unknown State", status="in_progress", done=False):
        """Build one status event and await the callback with it."""
        if not self.event_emitter:
            return

        event = {
            "type": "status",
            "data": {
                "status": status,
                "description": description,
                "done": done,
            },
        }
        await self.event_emitter(event)

    async def progress_update(self, description):
        """Report an intermediate step (status "in_progress", not done)."""
        await self.emit(description=description)

    async def error_update(self, description):
        """Report a failure (status "error") and mark the operation done."""
        await self.emit(description=description, status="error", done=True)

    async def success_update(self, description):
        """Report completion (status "success") and mark the operation done."""
        await self.emit(description=description, status="success", done=True)
|
|
|
|
|
|
class Helper:
    """Helper class for common utility functions.

    NOTE(review): these helpers live outside ``Tools``, presumably so that
    only real tool operations appear as methods of ``Tools`` - confirm against
    the host application's tool discovery rules before moving them.
    """

    @staticmethod
    def get_repo_url(base_url, user_name, repo_name: str) -> str:
        """Return the contents-API URL for the root of a repository."""
        return f"{base_url}/api/v1/repos/{user_name}/{repo_name}/contents/"

    @staticmethod
    def get_issue_url(base_url, user_name, repo_name: str, issue_number: int) -> str:
        """Return the API URL of a single issue."""
        return f"{base_url}/api/v1/repos/{user_name}/{repo_name}/issues/{issue_number}"

    @staticmethod
    def get_pr_url(base_url, user_name, repo_name: str, pr_number: int) -> str:
        """Return the API URL of a single pull request."""
        return f"{base_url}/api/v1/repos/{user_name}/{repo_name}/pulls/{pr_number}"

    @staticmethod
    def get_repo_api_url(base_url, user_name, repo_name: str) -> str:
        """Return the API base URL of a repository (no trailing slash)."""
        return f"{base_url}/api/v1/repos/{user_name}/{repo_name}"

    @staticmethod
    def split_repo_name(default_owner: str, repo_name: str) -> tuple:
        """Resolve ``repo_name`` into an ``(owner, repo)`` pair.

        ``repo_name`` may be a bare repository name, in which case
        ``default_owner`` is used, or ``"owner/repo"``. The result is returned
        instead of being written back to shared configuration, so one call
        cannot change the owner used by later calls.
        """
        if "/" in repo_name:
            owner, repo = repo_name.split("/", 1)
            return owner, repo
        return default_owner, repo_name

    @staticmethod
    async def ensure_status(
        response, expected_status, emitter, failure, detailed_exception=False
    ):
        """Raise if ``response`` does not carry ``expected_status``.

        On mismatch an error event ``"<failure>: <code> - <body>"`` is emitted
        and an ``Exception`` is raised. The exception text contains the
        response body only when ``detailed_exception`` is true; otherwise it
        is ``"<failure>: <code>"``.
        """
        if response.status_code == expected_status:
            return
        detailed = f"{failure}: {response.status_code} - {response.text}"
        await emitter.error_update(detailed)
        if detailed_exception:
            raise Exception(detailed)
        raise Exception(f"{failure}: {response.status_code}")

    async def _fetch_contents(
        self,
        url: str,
        path: str = "",
        __event_emitter__: Callable[[dict], Any] = None,
        headers: dict = None,
    ) -> str:
        """Recursively collect the text files below a contents-API URL.

        :param url: Contents-API URL of the directory to list.
        :param path: Repository-relative path of that directory ("" for root).
        :param __event_emitter__: Optional async callback for progress events.
        :param headers: HTTP headers (including authorization) for every
            request. When omitted, headers of a freshly constructed ``Tools``
            are used, which only carry that class's default valve values.
        :return: Concatenated ``"Content of <path>:"`` sections, each followed
            by a 40-dash separator line.
        :raises Exception: If the directory listing does not return HTTP 200.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update(f"Fetching contents from {path}")

        if headers is None:
            # Backward-compatible fallback for callers that pass no headers.
            headers = Tools().headers

        # Make the request to the Gitea API to fetch the directory listing.
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            await emitter.error_update(
                f"Failed to fetch repository: {response.status_code}"
            )
            raise Exception(f"Failed to fetch repository: {response.status_code}")

        # Collected pieces are joined once at the end instead of repeated +=.
        sections = []
        for item in response.json():
            item_path = f"{path}/{item['name']}" if path else item["name"]
            if ".gitignore" in item_path:
                continue

            if item["type"] == "file":
                # Fetch the file content using the download URL.
                file_response = requests.get(item["download_url"], headers=headers)
                if file_response.status_code != 200:
                    print(
                        f"Failed to fetch file {item['download_url']}: {file_response.status_code}"
                    )
                    continue

                # Only textual MIME types are kept; a missing Content-Type
                # header is treated as non-text rather than raising KeyError.
                if "text" in file_response.headers.get("Content-Type", ""):
                    sections.append(
                        f"Content of {item_path}:\n{file_response.text}\n"
                    )
                    sections.append("-" * 40 + "\n")
                else:
                    print(f"Ignored binary file or image: {item['download_url']}")
            elif item["type"] == "dir":
                # Recurse with the same headers so sub-directories are fetched
                # with the same credentials as the root listing.
                sections.append(
                    await self._fetch_contents(
                        item["url"], item_path, __event_emitter__, headers
                    )
                )

        await emitter.progress_update(f"Fetching contents from {path} completed")
        return "".join(sections)


class Tools:
    """Operations against the Gitea REST API (``/api/v1``).

    Every ``repo_name`` argument accepts either a bare repository name, owned
    by the configured ``gitea_username``, or ``"owner/repo"``. All requests
    are authenticated with the configured access token. Failures raise
    ``Exception`` after emitting an error status event.
    """

    class Valves(BaseModel):
        # Personal access token sent in the Authorization header.
        access_token: str = Field(
            "<your_access_token>",
            description="Gitea access token",
        )
        # Scheme and host of the Gitea instance, without a trailing slash.
        gitea_url: str = Field(
            "https://your.gitea.instance",
            description="Base URL of the Gitea instance",
        )
        # Default repository owner used when repo_name has no "owner/" prefix.
        gitea_username: str = Field(
            "<your_username>",
            description="Gitea username",
        )

    def __init__(self):
        self.valves = self.Valves()

    @property
    def headers(self):
        """Authorization headers built from the current access token."""
        return {"Authorization": f"token {self.valves.access_token}"}

    async def get_repository_files_content(
        self,
        repo_name: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Get the content of all text files in a repository as one string

        :param repo_name: Repository name, optionally as "owner/repo".
        :return: The concatenated file contents followed by a success note.
        """
        emitter = EventEmitter(__event_emitter__)
        owner, repo = Helper.split_repo_name(self.valves.gitea_username, repo_name)

        if not self.valves.access_token:
            message = "Gitea access token is not set in the environment variables."
            await emitter.error_update(message)
            raise Exception(message)

        repo_url = Helper.get_repo_url(self.valves.gitea_url, owner, repo)
        # Pass this instance's headers so the configured token is used.
        content = await Helper()._fetch_contents(
            repo_url, "", __event_emitter__, self.headers
        )
        await emitter.success_update("All files fetched successfully")
        return content + "\nAll files fetched successfully"

    async def create_issue(
        self,
        repo_name: str,
        short_title: str,
        body: str,
        assignee: str = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Create a new issue in the repository

        :param repo_name: Repository name, optionally as "owner/repo".
        :param short_title: Title of the issue.
        :param body: Description text of the issue.
        :param assignee: Optional user name to assign the issue to.
        :return: The created issue as returned by the API, as a string.
        """
        emitter = EventEmitter(__event_emitter__)
        owner, repo = Helper.split_repo_name(self.valves.gitea_username, repo_name)
        await emitter.progress_update("Creating issue in " + repo)

        if not self.valves.access_token:
            message = "Gitea access token is not set in the environment variables."
            await emitter.error_update(message)
            raise Exception(message)

        url = f"{Helper.get_repo_api_url(self.valves.gitea_url, owner, repo)}/issues"
        payload = {"title": short_title, "body": body}
        if assignee:
            payload["assignee"] = assignee

        response = requests.post(url, json=payload, headers=self.headers)
        await Helper.ensure_status(response, 201, emitter, "Failed to create issue")

        await emitter.success_update("Issue created successfully")
        return str(response.json())

    async def read_issue(
        self,
        repo_name: str,
        issue_number: int,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Read a specific issue from the repository

        :param repo_name: Repository name, optionally as "owner/repo".
        :param issue_number: Number of the issue to read.
        :return: The issue as returned by the API, as a string.
        """
        emitter = EventEmitter(__event_emitter__)
        owner, repo = Helper.split_repo_name(self.valves.gitea_username, repo_name)
        await emitter.progress_update(
            "Reading issue " + str(issue_number) + " in " + repo
        )

        url = Helper.get_issue_url(self.valves.gitea_url, owner, repo, issue_number)
        response = requests.get(url, headers=self.headers)
        await Helper.ensure_status(
            response, 200, emitter, f"Failed to read issue {issue_number}"
        )

        await emitter.success_update("Issue read successfully")
        return str(response.json())

    async def edit_issue(
        self,
        repo_name: str,
        issue_number: int,
        title: str = None,
        body: str = None,
        labels: list = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Edit an existing issue in the repository. All parameters are optional.

        :param repo_name: Repository name, optionally as "owner/repo".
        :param issue_number: Number of the issue to edit.
        :param title: New title, if it should change.
        :param body: New description text, if it should change.
        :param labels: New labels, if they should change.
        :return: The updated issue as returned by the API, as a string.
        """
        emitter = EventEmitter(__event_emitter__)
        owner, repo = Helper.split_repo_name(self.valves.gitea_username, repo_name)
        await emitter.progress_update(
            "Editing issue " + str(issue_number) + " in " + repo
        )

        url = Helper.get_issue_url(self.valves.gitea_url, owner, repo, issue_number)

        # Only fields that were actually supplied are sent.
        # NOTE(review): confirm the issue PATCH endpoint honours "labels";
        # Gitea also exposes a dedicated .../issues/{index}/labels endpoint.
        candidates = {"title": title, "body": body, "labels": labels}
        payload = {
            field: value for field, value in candidates.items() if value is not None
        }

        response = requests.patch(url, json=payload, headers=self.headers)
        await Helper.ensure_status(
            response,
            200,
            emitter,
            f"Failed to edit issue {issue_number}",
            detailed_exception=True,
        )

        await emitter.success_update("Issue edited successfully")
        return str(response.json())

    async def add_commentary_to_issue(
        self,
        repo_name: str,
        issue_number: int,
        commentary: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Add a commentary to a specific issue in the repository

        :param repo_name: Repository name, optionally as "owner/repo".
        :param issue_number: Number of the issue to comment on.
        :param commentary: Text of the comment.
        :return: The created comment as returned by the API, as a string.
        """
        emitter = EventEmitter(__event_emitter__)
        owner, repo = Helper.split_repo_name(self.valves.gitea_username, repo_name)
        await emitter.progress_update(
            "Adding commentary to issue " + str(issue_number) + " in " + repo
        )

        issue_url = Helper.get_issue_url(
            self.valves.gitea_url, owner, repo, issue_number
        )
        response = requests.post(
            f"{issue_url}/comments", json={"body": commentary}, headers=self.headers
        )
        await Helper.ensure_status(
            response,
            201,
            emitter,
            f"Failed to add commentary to issue {issue_number}",
        )

        await emitter.success_update("Commentary added successfully")
        return str(response.json())

    async def close_issue(
        self,
        repo_name: str,
        issue_number: int,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Close a specific issue in the repository

        :param repo_name: Repository name, optionally as "owner/repo".
        :param issue_number: Number of the issue to close.
        :return: A confirmation message.
        """
        emitter = EventEmitter(__event_emitter__)
        owner, repo = Helper.split_repo_name(self.valves.gitea_username, repo_name)
        await emitter.progress_update(
            "Closing issue " + str(issue_number) + " in " + repo
        )

        url = Helper.get_issue_url(self.valves.gitea_url, owner, repo, issue_number)
        response = requests.patch(
            url, json={"state": "closed"}, headers=self.headers
        )
        await Helper.ensure_status(
            response,
            200,
            emitter,
            f"Failed to close issue {issue_number}",
            detailed_exception=True,
        )

        await emitter.success_update("Issue closed successfully")
        return "Issue " + str(issue_number) + " closed successfully"

    async def delete_issue(
        self,
        repo_name: str,
        issue_number: int,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Delete a specific issue in the repository

        :param repo_name: Repository name, optionally as "owner/repo".
        :param issue_number: Number of the issue to delete.
        :return: A confirmation message.
        """
        emitter = EventEmitter(__event_emitter__)
        owner, repo = Helper.split_repo_name(self.valves.gitea_username, repo_name)
        await emitter.progress_update(
            "Deleting issue " + str(issue_number) + " in " + repo
        )

        url = Helper.get_issue_url(self.valves.gitea_url, owner, repo, issue_number)
        response = requests.delete(url, headers=self.headers)
        await Helper.ensure_status(
            response,
            204,
            emitter,
            f"Failed to delete issue {issue_number}",
            detailed_exception=True,
        )

        await emitter.success_update("Issue deleted successfully")
        return "Issue " + str(issue_number) + " deleted successfully"

    async def create_pull_request(
        self,
        repo_name: str,
        title: str,
        body: str = "",
        head: str = "",
        base: str = "",
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Create a new pull request in the repository

        :param repo_name: Repository name, optionally as "owner/repo".
        :param title: Title of the pull request.
        :param body: Description text of the pull request.
        :param head: Branch that contains the changes.
        :param base: Branch the changes should be merged into.
        :return: The created pull request as returned by the API, as a string.
        """
        emitter = EventEmitter(__event_emitter__)
        owner, repo = Helper.split_repo_name(self.valves.gitea_username, repo_name)
        await emitter.progress_update("Creating pull request in " + repo)

        url = f"{Helper.get_repo_api_url(self.valves.gitea_url, owner, repo)}/pulls"
        payload = {"title": title, "body": body, "head": head, "base": base}

        response = requests.post(url, json=payload, headers=self.headers)
        await Helper.ensure_status(
            response, 201, emitter, "Failed to create pull request"
        )

        await emitter.success_update("Pull request created successfully")
        return str(response.json())

    async def list_issues(
        self, repo_name: str, __event_emitter__: Callable[[dict], Any] = None
    ) -> str:
        """List all issues in the repository

        :param repo_name: Repository name, optionally as "owner/repo".
        :return: The issues as returned by the API, as a string.
        """
        emitter = EventEmitter(__event_emitter__)
        owner, repo = Helper.split_repo_name(self.valves.gitea_username, repo_name)
        await emitter.progress_update("Listing issues in " + repo)

        url = f"{Helper.get_repo_api_url(self.valves.gitea_url, owner, repo)}/issues"
        response = requests.get(url, headers=self.headers)
        await Helper.ensure_status(response, 200, emitter, "Failed to list issues")

        await emitter.success_update("Issues listed successfully")
        return str(response.json())

    async def list_pull_requests(
        self, repo_name: str, __event_emitter__: Callable[[dict], Any] = None
    ) -> str:
        """List all pull requests in the repository

        :param repo_name: Repository name, optionally as "owner/repo".
        :return: The pull requests as returned by the API, as a string.
        """
        emitter = EventEmitter(__event_emitter__)
        owner, repo = Helper.split_repo_name(self.valves.gitea_username, repo_name)
        await emitter.progress_update("Listing pull requests in " + repo)

        url = f"{Helper.get_repo_api_url(self.valves.gitea_url, owner, repo)}/pulls"
        response = requests.get(url, headers=self.headers)
        await Helper.ensure_status(
            response, 200, emitter, "Failed to list pull requests"
        )

        await emitter.success_update("Pull requests listed successfully")
        return str(response.json())

    async def list_repositories(
        self, __event_emitter__: Callable[[dict], Any] = None
    ) -> str:
        """List all available repositories

        :return: The repositories as returned by the API, as a string.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update("Listing all repositories")

        url = f"{self.valves.gitea_url}/api/v1/user/repos"
        response = requests.get(url, headers=self.headers)
        await Helper.ensure_status(
            response, 200, emitter, "Failed to list repositories"
        )

        await emitter.success_update("Repositories listed successfully")
        return str(response.json())

    async def create_repository(
        self,
        name: str,
        description: str = "",
        private: bool = False,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Create a new repository

        :param name: Name of the repository; spaces are replaced by dashes.
        :param description: Optional description of the repository.
        :param private: Whether the repository should be private.
        :return: The created repository as returned by the API, as a string.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update("Creating repository " + name)

        # Spaces in the requested name are replaced by dashes before sending.
        name = name.replace(" ", "-")

        url = f"{self.valves.gitea_url}/api/v1/user/repos"
        payload = {"name": name, "description": description, "private": private}

        response = requests.post(url, json=payload, headers=self.headers)
        await Helper.ensure_status(
            response, 201, emitter, "Failed to create repository"
        )

        await emitter.success_update("Repository created successfully")
        return str(response.json())

    async def add_file_to_repository(
        self,
        repo_name: str,
        file_path: str,
        content: str,
        commit_message: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Add a new file to the repository

        :param repo_name: Repository name, optionally as "owner/repo".
        :param file_path: Path of the new file inside the repository.
        :param content: Text content of the new file.
        :param commit_message: Commit message for the change.
        :return: The API response describing the commit, as a string.
        """
        emitter = EventEmitter(__event_emitter__)
        owner, repo = Helper.split_repo_name(self.valves.gitea_username, repo_name)
        await emitter.progress_update("Adding file to repository " + repo)

        # The contents API expects the file body base64-encoded.
        base64_content = base64.b64encode(content.encode("utf-8")).decode("utf-8")

        repo_api = Helper.get_repo_api_url(self.valves.gitea_url, owner, repo)
        url = f"{repo_api}/contents/{file_path}"
        payload = {
            "content": base64_content,
            "message": commit_message,
        }

        response = requests.post(url, json=payload, headers=self.headers)
        await Helper.ensure_status(response, 201, emitter, "Failed to add file")

        await emitter.success_update("File added successfully")
        return str(response.json())

    async def delete_file_from_repository(
        self,
        repo_name: str,
        file_path: str,
        commit_message: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Delete a file from the repository

        :param repo_name: Repository name, optionally as "owner/repo".
        :param file_path: Path of the file inside the repository.
        :param commit_message: Commit message for the change.
        :return: The API response describing the commit, as a string.
        """
        emitter = EventEmitter(__event_emitter__)
        owner, repo = Helper.split_repo_name(self.valves.gitea_username, repo_name)
        await emitter.progress_update("Deleting file from repository " + repo)

        repo_api = Helper.get_repo_api_url(self.valves.gitea_url, owner, repo)
        url = f"{repo_api}/contents/{file_path}"

        # First, get the file to retrieve its SHA, which the delete call needs.
        get_response = requests.get(url, headers=self.headers)
        await Helper.ensure_status(
            get_response, 200, emitter, "Failed to fetch file for deletion"
        )

        sha = get_response.json().get("sha")
        if not sha:
            await emitter.error_update("File SHA not found, cannot delete file.")
            raise Exception("File SHA not found, cannot delete file.")

        payload = {
            "sha": sha,
            "message": commit_message,
        }

        response = requests.delete(url, json=payload, headers=self.headers)
        await Helper.ensure_status(response, 200, emitter, "Failed to delete file")

        await emitter.success_update("File deleted successfully")
        return str(response.json())

    async def edit_file_in_repository(
        self,
        repo_name: str,
        file_path: str,
        new_content: str,
        commit_message: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """Edit an existing file in the repository

        :param repo_name: Repository name, optionally as "owner/repo".
        :param file_path: Path of the file inside the repository.
        :param new_content: Complete new text content of the file.
        :param commit_message: Commit message for the change.
        :return: The API response describing the commit, as a string.
        """
        emitter = EventEmitter(__event_emitter__)
        owner, repo = Helper.split_repo_name(self.valves.gitea_username, repo_name)
        await emitter.progress_update("Editing file in repository " + repo)

        # The contents API expects the file body base64-encoded.
        base64_content = base64.b64encode(new_content.encode("utf-8")).decode(
            "utf-8"
        )

        repo_api = Helper.get_repo_api_url(self.valves.gitea_url, owner, repo)
        url = f"{repo_api}/contents/{file_path}"

        # First, get the file to retrieve its SHA, which the update call needs.
        get_response = requests.get(url, headers=self.headers)
        await Helper.ensure_status(
            get_response, 200, emitter, "Failed to fetch file for editing"
        )

        sha = get_response.json().get("sha")
        if not sha:
            await emitter.error_update("File SHA not found, cannot edit file.")
            raise Exception("File SHA not found, cannot edit file.")

        payload = {
            "content": base64_content,
            "message": commit_message,
            "sha": sha,
        }

        response = requests.put(url, json=payload, headers=self.headers)
        await Helper.ensure_status(response, 200, emitter, "Failed to edit file")

        await emitter.success_update("File edited successfully")
        return str(response.json())
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke run: fetch the file contents of one repository using the
    # default valve values; the returned string is discarded.
    asyncio.run(Tools().get_repository_files_content("nicible"))
|