""" title: Gitea API Access author: Pakobbix author_url: https://gitea.zephyre.one/Pakobbix/ version: 0.1.0 """ #!/usr/bin/env python3 import requests import asyncio import base64 from typing import Callable, Any from pydantic import BaseModel, Field class EventEmitter: def __init__(self, event_emitter: Callable[[dict], Any] = None): self.event_emitter = event_emitter async def progress_update(self, description): await self.emit(description) async def error_update(self, description): await self.emit(description, "error", True) async def success_update(self, description): await self.emit(description, "success", True) async def emit(self, description="Unknown State", status="in_progress", done=False): if self.event_emitter: await self.event_emitter( { "type": "status", "data": { "status": status, "description": description, "done": done, }, } ) class Helper: """Helper class for common utility functions""" @staticmethod def get_repo_url(base_url, user_name, repo_name: str) -> str: return f"{base_url}/api/v1/repos/{user_name}/{repo_name}/contents/" @staticmethod def get_issue_url(base_url, user_name, repo_name: str, issue_number: int) -> str: return f"{base_url}/api/v1/repos/{user_name}/{repo_name}/issues/{issue_number}" @staticmethod def get_pr_url(base_url, user_name, repo_name: str, pr_number: int) -> str: return f"{base_url}/api/v1/repos/{user_name}/{repo_name}/pulls/{pr_number}" async def _fetch_contents( self, url: str, path: str = "", __event_emitter__: Callable[[dict], Any] = None ) -> str: emitter = EventEmitter(__event_emitter__) await emitter.progress_update(f"Fetching contents from {path}") # Make the request to the Gitea API to fetch the contents of the repository response = requests.get(url, headers=Tools().headers) if response.status_code != 200: await emitter.error_update( f"Failed to fetch repository: {response.status_code}" ) raise Exception(f"Failed to fetch repository: {response.status_code}") all_contents = "" # Iterate through the items in the response for item in response.json(): item_path = f"{path}/{item['name']}" if path else item["name"] if ".gitignore" in item_path: continue # Check if the item is a file or a directory if item["type"] == "file": # Fetch the file content using the download URL file_response = requests.get( item["download_url"], headers=Tools().headers ) if file_response.status_code != 200: print( f"Failed to fetch file {item['download_url']}: {file_response.status_code}" ) continue # Check MIME type to ignore binary files and images if "text" in file_response.headers["Content-Type"]: # Add file header and content with separator all_contents += f"Content of {item_path}:\n{file_response.text}\n" all_contents += "-" * 40 + "\n" else: # Note binary files in the output print(f"Ignored binary file or image: {item['download_url']}") elif item["type"] == "dir": # Recursively fetch contents of the directory dir_contents = await self._fetch_contents( item["url"], item_path, __event_emitter__ ) all_contents += dir_contents await emitter.progress_update(f"Fetching contents from {path} completed") # Return the formatted contents as string return str(all_contents) class Tools: class Valves(BaseModel): access_token: str = Field( "", description="Gitea access token", ) gitea_url: str = Field( "https://your.gitea.instance", description="Base URL of the Gitea instance", ) gitea_username: str = Field( "", description="Gitea username", ) def __init__(self): self.valves = self.Valves() @property def headers(self): return {"Authorization": f"token {self.valves.access_token}"} async def get_repository_files_content( self, repo_name: str, ) -> str: if "/" in repo_name: self.valves.gitea_username, repo_name = repo_name.split("/", 1) if not self.valves.access_token: raise Exception( "Gitea access token is not set in the environment variables." ) repo_url = Helper.get_repo_url( self.valves.gitea_url, self.valves.gitea_username, repo_name ) content = ( await Helper()._fetch_contents(repo_url, "", None) + "\nAll files fetched successfully" ) return str(content) async def create_issue( self, repo_name: str, short_title: str, body: str, assignee: str = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """Create a new issue in the repository""" emitter = EventEmitter(__event_emitter__) await emitter.progress_update("Creating issue in " + repo_name) if not self.valves.access_token: await emitter.error_update( "Gitea access token is not set in the environment variables." ) raise Exception( "Gitea access token is not set in the environment variables." ) url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/issues" payload = {"title": short_title, "body": body} if assignee: payload["assignee"] = assignee response = requests.post(url, json=payload, headers=self.headers) if response.status_code != 201: await emitter.error_update( f"Failed to create issue: {response.status_code} - {response.text}" ) raise Exception(f"Failed to create issue: {response.status_code}") await emitter.success_update("Issue created successfully") return str(response.json()) async def read_issue( self, repo_name: str, issue_number: int, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """Read a specific issue from the repository""" emitter = EventEmitter(__event_emitter__) await emitter.progress_update( "Reading issue " + str(issue_number) + " in " + repo_name ) url = Helper.get_issue_url( self.valves.gitea_url, self.valves.gitea_username, repo_name, issue_number ) response = requests.get(url, headers=self.headers) if response.status_code != 200: await emitter.error_update( f"Failed to read issue {issue_number}: {response.status_code} - {response.text}" ) raise Exception( f"Failed to read issue {issue_number}: {response.status_code}" ) await emitter.success_update("Issue read successfully") return str(response.json()) async def edit_issue( self, repo_name: str, issue_number: int, title: str = None, body: str = None, labels: list = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """Edit an existing issue in the repository. All parameters are optional.""" emitter = EventEmitter(__event_emitter__) await emitter.progress_update( "Editing issue " + str(issue_number) + " in " + repo_name ) if "/" in repo_name: self.valves.gitea_username, repo_name = repo_name.split("/", 1) url = Helper.get_issue_url( self.valves.gitea_url, self.valves.gitea_username, repo_name, issue_number ) payload = {} if title is not None: payload["title"] = title if body is not None: payload["body"] = body if labels is not None: payload["labels"] = labels response = requests.patch(url, json=payload, headers=self.headers) if response.status_code != 200: await emitter.error_update( f"Failed to edit issue {issue_number}: {response.status_code} - {response.text}" ) raise Exception( f"Failed to edit issue {issue_number}: {response.status_code} - {response.text}" ) await emitter.success_update("Issue edited successfully") return str(response.json()) async def add_commentary_to_issue( self, repo_name: str, issue_number: int, commentary: str, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """Add a commentary to a specific issue in the repository""" if "/" in repo_name: self.valves.gitea_username, repo_name = repo_name.split("/", 1) emitter = EventEmitter(__event_emitter__) await emitter.progress_update( "Adding commentary to issue " + str(issue_number) + " in " + repo_name ) url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/issues/{issue_number}/comments" payload = {"body": commentary} response = requests.post(url, json=payload, headers=self.headers) if response.status_code != 201: await emitter.error_update( f"Failed to add commentary to issue {issue_number}: {response.status_code} - {response.text}" ) raise Exception( f"Failed to add commentary to issue {issue_number}: {response.status_code}" ) await emitter.success_update("Commentary added successfully") return str(response.json()) async def close_issue( self, repo_name: str, issue_number: int, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """Close a specific issue in the repository""" if "/" in repo_name: self.valves.gitea_username, repo_name = repo_name.split("/", 1) emitter = EventEmitter(__event_emitter__) await emitter.progress_update( "Closing issue " + str(issue_number) + " in " + repo_name ) if "/" in repo_name: self.valves.gitea_username, repo_name = repo_name.split("/", 1) url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/issues/{issue_number}" payload = {"state": "closed"} response = requests.patch(url, json=payload, headers=self.headers) if response.status_code != 200: await emitter.error_update( f"Failed to close issue {issue_number}: {response.status_code} - {response.text}" ) raise Exception( f"Failed to close issue {issue_number}: {response.status_code} - {response.text}" ) await emitter.success_update("Issue closed successfully") return "Issue " + str(issue_number) + " closed successfully" async def delete_issue( self, repo_name: str, issue_number: int, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """Delete a specific issue in the repository""" if "/" in repo_name: self.valves.gitea_username, repo_name = repo_name.split("/", 1) emitter = EventEmitter(__event_emitter__) await emitter.progress_update( "Deleting issue " + str(issue_number) + " in " + repo_name ) if "/" in repo_name: self.valves.gitea_username, repo_name = repo_name.split("/", 1) url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/issues/{issue_number}" response = requests.delete(url, headers=self.headers) if response.status_code != 204: await emitter.error_update( f"Failed to delete issue {issue_number}: {response.status_code} - {response.text}" ) raise Exception( f"Failed to delete issue {issue_number}: {response.status_code} - {response.text}" ) await emitter.success_update("Issue deleted successfully") return "Issue " + str(issue_number) + " deleted successfully" async def create_pull_request( self, repo_name: str, title: str, body: str = "", head: str = "", base: str = "", __event_emitter__: Callable[[dict], Any] = None, ) -> str: """Create a new pull request in the repository""" emitter = EventEmitter(__event_emitter__) await emitter.progress_update("Creating pull request in " + repo_name) url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/pulls" payload = {"title": title, "body": body, "head": head, "base": base} response = requests.post(url, json=payload, headers=self.headers) if response.status_code != 201: await emitter.error_update( f"Failed to create pull request: {response.status_code} - {response.text}" ) raise Exception(f"Failed to create pull request: {response.status_code}") await emitter.success_update("Pull request created successfully") return str(response.json()) async def list_issues( self, repo_name: str, __event_emitter__: Callable[[dict], Any] = None ) -> str: """List all issues in the repository""" emitter = EventEmitter(__event_emitter__) await emitter.progress_update("Listing issues in " + repo_name) url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/issues" response = requests.get(url, headers=self.headers) if response.status_code != 200: await emitter.error_update( f"Failed to list issues: {response.status_code} - {response.text}" ) raise Exception(f"Failed to list issues: {response.status_code}") await emitter.success_update("Issues listed successfully") return str(response.json()) async def list_pull_requests( self, repo_name: str, __event_emitter__: Callable[[dict], Any] = None ) -> str: """List all pull requests in the repository""" emitter = EventEmitter(__event_emitter__) await emitter.progress_update("Listing pull requests in " + repo_name) url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/pulls" response = requests.get(url, headers=self.headers) if response.status_code != 200: await emitter.error_update( f"Failed to list pull requests: {response.status_code} - {response.text}" ) raise Exception(f"Failed to list pull requests: {response.status_code}") await emitter.success_update("Pull requests listed successfully") return str(response.json()) async def list_repositories( self, __event_emitter__: Callable[[dict], Any] = None ) -> str: """List all available repositories""" emitter = EventEmitter(__event_emitter__) await emitter.progress_update("Listing all repositories") url = f"{self.valves.gitea_url}/api/v1/user/repos" response = requests.get(url, headers=self.headers) if response.status_code != 200: await emitter.error_update( f"Failed to list repositories: {response.status_code} - {response.text}" ) raise Exception(f"Failed to list repositories: {response.status_code}") await emitter.success_update("Repositories listed successfully") return str(response.json()) async def create_repository( self, name: str, description: str = "", private: bool = False, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """Create a new repository""" emitter = EventEmitter(__event_emitter__) await emitter.progress_update("Creating repository " + name) if " " in name: name = name.replace(" ", "-") url = f"{self.valves.gitea_url}/api/v1/user/repos" payload = {"name": name, "description": description, "private": private} response = requests.post(url, json=payload, headers=self.headers) if response.status_code != 201: await emitter.error_update( f"Failed to create repository: {response.status_code} - {response.text}" ) raise Exception(f"Failed to create repository: {response.status_code}") await emitter.success_update("Repository created successfully") return str(response.json()) async def add_file_to_repository( self, repo_name: str, file_path: str, content: str, commit_message: str, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """Add a new file to the repository""" if "/" in repo_name: self.valves.gitea_username, repo_name = repo_name.split("/", 1) emitter = EventEmitter(__event_emitter__) await emitter.progress_update("Adding file to repository " + repo_name) content_bytes = content.encode("utf-8") base64_content = base64.b64encode(content_bytes).decode("utf-8") url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/contents/{file_path}" payload = { "content": base64_content, "message": commit_message, } response = requests.post(url, json=payload, headers=self.headers) if response.status_code != 201: await emitter.error_update( f"Failed to add file: {response.status_code} - {response.text}" ) raise Exception(f"Failed to add file: {response.status_code}") await emitter.success_update("File added successfully") return str(response.json()) async def delete_file_from_repository( self, repo_name: str, file_path: str, commit_message: str, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """Delete a file from the repository""" if "/" in repo_name: self.valves.gitea_username, repo_name = repo_name.split("/", 1) emitter = EventEmitter(__event_emitter__) await emitter.progress_update("Deleting file from repository " + repo_name) url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/contents/{file_path}" # First, get the file to retrieve its SHA get_response = requests.get(url, headers=self.headers) if get_response.status_code != 200: await emitter.error_update( f"Failed to fetch file for deletion: {get_response.status_code} - {get_response.text}" ) raise Exception( f"Failed to fetch file for deletion: {get_response.status_code}" ) file_info = get_response.json() sha = file_info.get("sha") if not sha: await emitter.error_update("File SHA not found, cannot delete file.") raise Exception("File SHA not found, cannot delete file.") payload = { "sha": sha, "message": commit_message, } response = requests.delete(url, json=payload, headers=self.headers) if response.status_code != 200: await emitter.error_update( f"Failed to delete file: {response.status_code} - {response.text}" ) raise Exception(f"Failed to delete file: {response.status_code}") await emitter.success_update("File deleted successfully") return str(response.json()) async def edit_file_in_repository( self, repo_name: str, file_path: str, new_content: str, commit_message: str, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """Edit an existing file in the repository""" if "/" in repo_name: self.valves.gitea_username, repo_name = repo_name.split("/", 1) emitter = EventEmitter(__event_emitter__) await emitter.progress_update("Editing file in repository " + repo_name) content_bytes = new_content.encode("utf-8") base64_content = base64.b64encode(content_bytes).decode("utf-8") url = f"{self.valves.gitea_url}/api/v1/repos/{self.valves.gitea_username}/{repo_name}/contents/{file_path}" # First, get the file to retrieve its SHA get_response = requests.get(url, headers=self.headers) if get_response.status_code != 200: await emitter.error_update( f"Failed to fetch file for editing: {get_response.status_code} - {get_response.text}" ) raise Exception( f"Failed to fetch file for editing: {get_response.status_code}" ) file_info = get_response.json() sha = file_info.get("sha") if not sha: await emitter.error_update("File SHA not found, cannot edit file.") raise Exception("File SHA not found, cannot edit file.") payload = { "content": base64_content, "message": commit_message, "sha": sha, } response = requests.put(url, json=payload, headers=self.headers) if response.status_code != 200: await emitter.error_update( f"Failed to edit file: {response.status_code} - {response.text}" ) raise Exception(f"Failed to edit file: {response.status_code}") await emitter.success_update("File edited successfully") return str(response.json()) if __name__ == "__main__": tools = Tools() asyncio.run(tools.get_repository_files_content("nicible"))