- Implemented Tautulli information retrieval in `tautulli_informations.py` to fetch movie, anime, TV show, music amounts, and more. - Created a weather forecast tool in `weather_forecast.py` that retrieves and formats a 7-day weather forecast in German. - Developed a YouTube transcript provider in `youtube_summarizer.py` to fetch video transcripts and titles using Langchain Community's YoutubeLoader.
226 lines
9.5 KiB
Python
226 lines
9.5 KiB
Python
import requests, asyncio
|
|
from typing import Any, Callable
|
|
|
|
|
|
class EventEmitter:
|
|
def __init__(self, event_emitter: Callable[[dict], Any] = None):
|
|
self.event_emitter = event_emitter
|
|
|
|
async def progress_update(self, description):
|
|
await self.emit(description)
|
|
|
|
async def error_update(self, description):
|
|
await self.emit(description, "error", True)
|
|
|
|
async def success_update(self, description):
|
|
await self.emit(description, "success", True)
|
|
|
|
async def emit(self, description="Unknown State", status="in_progress", done=False):
|
|
if self.event_emitter:
|
|
await self.event_emitter(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"status": status,
|
|
"description": description,
|
|
"done": done,
|
|
},
|
|
}
|
|
)
|
|
|
|
|
|
class Tools:
|
|
def __init__(self):
|
|
self.Tautulli_API = "adc673ac6746492487fd7df9f175eeeb"
|
|
self.Tautulli_URL = "https://tautulli.zephyre.one/api/v2"
|
|
self.url = (
|
|
f"{self.Tautulli_URL}/api/v2?apikey={self.Tautulli_API}&cmd=get_libraries"
|
|
)
|
|
|
|
async def get_movie_amount(self, __event_emitter__: Callable[[dict], Any] = None):
|
|
emitter = EventEmitter(__event_emitter__)
|
|
await emitter.progress_update("Getting movie amount")
|
|
try:
|
|
self.response = requests.get(self.url)
|
|
self.response = self.response.json()
|
|
movie_amount = self.response["response"]["data"][0]["count"]
|
|
await emitter.success_update("Got movie amount")
|
|
return (
|
|
"There are currently " + str(movie_amount) + " movies in the library."
|
|
)
|
|
except Exception as e:
|
|
await emitter.error_update("Failed to get movie amount")
|
|
return str(e)
|
|
|
|
async def get_anime_amount(self, __event_emitter__: Callable[[dict], Any] = None):
|
|
emitter = EventEmitter(__event_emitter__)
|
|
await emitter.progress_update("Getting anime amount")
|
|
try:
|
|
self.response = requests.get(self.url)
|
|
self.response = self.response.json()
|
|
anime_series = self.response["response"]["data"][1]["parent_count"]
|
|
anime_episodes = self.response["response"]["data"][1]["child_count"]
|
|
await emitter.success_update("Got anime amount")
|
|
return (
|
|
"There are currently "
|
|
+ str(anime_series)
|
|
+ " anime series in the library with a total of "
|
|
+ str(anime_episodes)
|
|
+ " episodes."
|
|
)
|
|
except Exception as e:
|
|
await emitter.error_update("Failed to get anime amount")
|
|
return str(e)
|
|
|
|
async def get_tvshow_amount(self, __event_emitter__: Callable[[dict], Any] = None):
|
|
emitter = EventEmitter(__event_emitter__)
|
|
await emitter.progress_update("Getting tvshow amount")
|
|
try:
|
|
self.response = requests.get(self.url)
|
|
self.response = self.response.json()
|
|
tvshow_series = self.response["response"]["data"][2]["parent_count"]
|
|
tvshow_episodes = self.response["response"]["data"][2]["child_count"]
|
|
await emitter.success_update("Got tvshow amount")
|
|
return (
|
|
"There are currently "
|
|
+ str(tvshow_series)
|
|
+ " TV shows in the library with a total of "
|
|
+ str(tvshow_episodes)
|
|
+ " episodes."
|
|
)
|
|
except Exception as e:
|
|
await emitter.error_update("Failed to get tvshow amount")
|
|
return str(e)
|
|
|
|
async def get_music_amount(self, __event_emitter__: Callable[[dict], Any] = None):
|
|
emitter = EventEmitter(__event_emitter__)
|
|
await emitter.progress_update("Getting music amount")
|
|
try:
|
|
self.response = requests.get(self.url)
|
|
self.response = self.response.json()
|
|
music_alben = self.response["response"]["data"][4]["parent_count"]
|
|
music_songs = self.response["response"]["data"][4]["child_count"]
|
|
await emitter.success_update("Got music amount")
|
|
return (
|
|
"There are currently "
|
|
+ str(music_alben)
|
|
+ " music albums in the library with a total of "
|
|
+ str(music_songs)
|
|
+ " songs."
|
|
)
|
|
except Exception as e:
|
|
await emitter.error_update(str(e))
|
|
return str(e)
|
|
|
|
async def get_newest_media(self, __event_emitter__: Callable[[dict], Any] = None):
|
|
emitter = EventEmitter(__event_emitter__)
|
|
await emitter.progress_update("Getting newest Media")
|
|
try:
|
|
url = f"{self.Tautulli_URL}/api/v2?apikey={self.Tautulli_API}&cmd=get_recently_added&count=10"
|
|
response = requests.get(url)
|
|
response = response.json()
|
|
newest_media = response["response"]["data"]["recently_added"]
|
|
await emitter.success_update("Got newest Media")
|
|
grandparent_title = []
|
|
library_name = []
|
|
episode_title = []
|
|
recently_added = []
|
|
for i in range(len(newest_media)):
|
|
grandparent_title.append(newest_media[i]["grandparent_title"])
|
|
library_name.append(newest_media[i]["library_name"])
|
|
if newest_media[i]["media_type"] == "movie":
|
|
episode_title.append(newest_media[i]["title"])
|
|
recently_added.append(
|
|
f"Type: {library_name[i]} - Title: {episode_title[i]}"
|
|
)
|
|
else:
|
|
episode_title.append(newest_media[i]["media_index"])
|
|
recently_added.append(
|
|
f"Type: {library_name[i]} - Title: {grandparent_title[i]} - Episode: {episode_title[i]}"
|
|
)
|
|
Recently_Added_String = "Recently Added to Plex/JellyFin:\n" + "\n".join(
|
|
recently_added
|
|
)
|
|
return Recently_Added_String
|
|
except Exception as e:
|
|
await emitter.error_update("Failed to get newest Media")
|
|
return str(e)
|
|
|
|
async def get_most_watched_media(
|
|
self, __event_emitter__: Callable[[dict], Any] = None
|
|
):
|
|
emitter = EventEmitter(__event_emitter__)
|
|
await emitter.progress_update("Getting most watched Media")
|
|
try:
|
|
url = f"{self.Tautulli_URL}/api/v2?apikey={self.Tautulli_API}&cmd=get_home_stats"
|
|
response = requests.get(url)
|
|
response = response.json()
|
|
rows = [item["rows"] for item in response["response"]["data"]]
|
|
most_watched_media = []
|
|
for item in rows[1]:
|
|
most_watched_media.append(item["title"])
|
|
most_watched_media.append(item["total_plays"])
|
|
most_watched_string = "Most watched media:\n" + "\n".join(
|
|
[
|
|
f"{most_watched_media[i]} - {most_watched_media[i+1]} plays"
|
|
for i in range(0, len(most_watched_media), 2)
|
|
]
|
|
)
|
|
await emitter.success_update("Got most watched Media")
|
|
return most_watched_string
|
|
except Exception as e:
|
|
await emitter.error_update("Failed to get most watched Media")
|
|
return str(e)
|
|
|
|
async def get_most_active_users(
|
|
self, __event_emitter__: Callable[[dict], Any] = None
|
|
):
|
|
emitter = EventEmitter(__event_emitter__)
|
|
await emitter.progress_update("Getting most active users")
|
|
try:
|
|
url = f"{self.Tautulli_URL}/api/v2?apikey={self.Tautulli_API}&cmd=get_home_stats"
|
|
response = requests.get(url)
|
|
response = response.json()
|
|
most_active_users = response["response"]["data"][8]["rows"]
|
|
user_list = []
|
|
for user in most_active_users:
|
|
user_list.append(user["friendly_name"])
|
|
user_list.append(user["total_plays"])
|
|
await emitter.success_update("Got most active users")
|
|
return "The most active users are:\n" + "\n".join(
|
|
[
|
|
f"{user_list[i]} - {user_list[i+1]} plays"
|
|
for i in range(0, len(user_list), 2)
|
|
]
|
|
)
|
|
except Exception as e:
|
|
await emitter.error_update("Failed to get most active users")
|
|
return str(e)
|
|
|
|
async def get_all_informations(
|
|
self, __event_emitter__: Callable[[dict], Any] = None
|
|
) -> str:
|
|
emitter = EventEmitter(__event_emitter__)
|
|
await emitter.progress_update("Getting all informations")
|
|
try:
|
|
movie_amount = await self.get_movie_amount()
|
|
anime_amount = await self.get_anime_amount()
|
|
tvshow_amount = await self.get_tvshow_amount()
|
|
music_amount = await self.get_music_amount()
|
|
newest_media = await self.get_newest_media()
|
|
most_watched_media = await self.get_most_watched_media()
|
|
most_active_users = await self.get_most_active_users()
|
|
await emitter.success_update("Got all informations")
|
|
return f"{movie_amount}\n\n{anime_amount}\n\n{tvshow_amount}\n\n{music_amount}\n\n{newest_media}\n\n{most_watched_media}\n\n{most_active_users}"
|
|
except Exception as e:
|
|
await emitter.error_update("Failed to get all informations")
|
|
return str(e)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
tools = Tools()
|
|
print(asyncio.run(tools.get_most_active_users()))
|
|
except Exception as e:
|
|
print(str(e))
|