# Source: SC-Discord-Bot/llm_tools/star_citizen_info_retrieval.py
# (829 lines, 36 KiB, Python)

"""
title: Star Citizen Information Retrieval
author: Pakobbix
author_url: https://gitea.zephyre.one/Pakobbix/SC-Discord-Bot
version: 0.1.0
"""
#!/usr/bin/env python3
import requests, asyncio, json, sqlite3
from bs4 import BeautifulSoup
from fuzzywuzzy import process
from typing import Callable, Any
class EventEmitter:
    """Forward status events to an optional callback.

    Every event is a dict of the form
    ``{"type": "status", "data": {"status", "description", "done"}}``.
    When no callback was supplied, all methods are silent no-ops.
    """

    def __init__(self, event_emitter: Callable[[dict], Any] = None):
        # May be None, a coroutine function, or a plain synchronous callable.
        self.event_emitter = event_emitter

    async def progress_update(self, description):
        """Emit *description* with the default ``in_progress`` / not-done state."""
        await self.emit(description)

    async def error_update(self, description):
        """Emit *description* as a finished event with status ``error``."""
        await self.emit(description, "error", True)

    async def success_update(self, description):
        """Emit *description* as a finished event with status ``success``."""
        await self.emit(description, "success", True)

    async def emit(self, description="Unknown State", status="in_progress", done=False):
        """Build the status event and hand it to the callback, if there is one.

        The callback's return value is awaited only when it is awaitable, so
        both coroutine functions and ordinary functions are accepted.
        """
        if not self.event_emitter:
            return
        outcome = self.event_emitter(
            {
                "type": "status",
                "data": {
                    "status": status,
                    "description": description,
                    "done": done,
                },
            }
        )
        # A synchronous callback returns a plain value (usually None); awaiting
        # that would raise TypeError.
        if hasattr(outcome, "__await__"):
            await outcome
class get_information:
    """Read Star Citizen data from the wiki API and from local SQLite files.

    Wiki lookups go to ``{base_url}/api.php``; name lists are read from the
    SQLite databases located under ``db_path``.
    """

    # Upper bound, in seconds, for a single wiki request. Without a timeout a
    # stalled connection would block its worker thread indefinitely.
    REQUEST_TIMEOUT = 15

    def __init__(self):
        self.base_url = "https://starcitizen.tools"
        self.db_path = "/app/sc_databases"

    async def get_all_vehicle_names(self):
        """Return the sorted, de-duplicated page titles of all pledge ships and vehicles.

        Both wiki categories are walked page by page through the MediaWiki
        ``categorymembers`` list. A request or decoding failure is printed and
        ends the walk of that category only; titles collected so far are kept.
        Returns an empty list when nothing could be collected.
        """
        api_url = f"{self.base_url}/api.php"
        vehicle_names = set()
        for category in ("Category:Pledge ships", "Category:Pledge vehicles"):
            params = {
                "action": "query",
                "format": "json",
                "list": "categorymembers",
                "cmtitle": category,
                "cmlimit": "max",  # Use max limit (500)
                "cmprop": "title",
            }
            while True:
                try:
                    response = await asyncio.to_thread(
                        requests.get,
                        api_url,
                        params=params,
                        timeout=self.REQUEST_TIMEOUT,
                    )
                    response.raise_for_status()
                    data = response.json()
                # The JSON handler comes first: recent requests versions raise a
                # decode error that is also a RequestException, which would
                # otherwise shadow this more specific message.
                except json.JSONDecodeError:
                    print(f"Error decoding JSON from response for {category}.")
                    break  # Stop processing this category
                except requests.exceptions.RequestException as e:
                    print(f"Error fetching vehicle list for {category}: {e}")
                    break  # Stop processing this category
                members = data.get("query", {}).get("categorymembers", [])
                vehicle_names.update(
                    member["title"] for member in members if "title" in member
                )
                # A continuation token means there is another page of results.
                next_token = data.get("continue", {}).get("cmcontinue")
                if next_token is None:
                    break  # No more pages
                params["cmcontinue"] = next_token
        if not vehicle_names:
            print("No vehicle names found.")
            return []
        return sorted(vehicle_names)

    async def get_closest_vehicle_name(self, vehicle_name):
        """Return the wiki vehicle title that fuzzy-matches *vehicle_name* best.

        Returns ``None`` when no vehicle titles could be fetched. The match
        score is not checked, so the best candidate is returned however weak.
        """
        all_vehicle_names = await self.get_all_vehicle_names()
        if not all_vehicle_names:
            return None
        closest_name, _ = process.extractOne(vehicle_name, all_vehicle_names)
        return closest_name

    async def fetch_infos(self, ship_name):
        """Fetch and parse the wiki page of the vehicle closest to *ship_name*.

        Returns ``{"general": ..., "specifications": ...}`` built from the
        rendered page HTML, or ``None`` (after printing the reason) when no
        vehicle matches, the request fails, the API reports an error, or the
        page has no content.
        """
        closest_name = await self.get_closest_vehicle_name(ship_name)
        if not closest_name:
            print(f"No matching vehicle found for {ship_name}.")
            return None
        # Use the closest name found for the API call
        page_title = closest_name.replace(" ", "_")
        api_url = f"{self.base_url}/api.php"
        params = {
            "action": "parse",
            "page": page_title,
            "format": "json",
            "prop": "text",  # We only need the parsed HTML content
        }
        try:
            response = await asyncio.to_thread(
                requests.get, api_url, params=params, timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            data = response.json()
        # See get_all_vehicle_names for why the JSON handler is listed first.
        except json.JSONDecodeError:
            print(f"Error decoding JSON from response for {page_title}.")
            return None
        except requests.exceptions.RequestException as e:
            print(f"Error fetching data for {page_title}: {e}")
            return None
        if "error" in data:
            print(f"API Error for {page_title}: {data['error']['info']}")
            return None
        html_content = data.get("parse", {}).get("text", {}).get("*", "")
        if not html_content:
            print(f"No content found for {page_title}.")
            return None
        soup = BeautifulSoup(html_content, "html.parser")
        return {
            "general": await self._extract_infobox_data(soup),
            "specifications": await self._extract_specifications(soup),
        }

    async def _extract_infobox_data(self, soup):
        """Return the label -> value pairs of the page's ``details.infobox`` element.

        Values are plain text, except for labels containing "loaner", whose
        value is the list of link texts found in the data cell. Returns an
        empty dict when the page has no infobox.
        """
        infobox_data = {}
        infobox = soup.find("details", class_="infobox")
        if not infobox:
            return infobox_data
        for item in infobox.find_all("div", class_="infobox__item"):
            label_tag = item.find("div", class_="infobox__label")
            data_tag = item.find("div", class_="infobox__data")
            if not (label_tag and data_tag):
                continue
            label = label_tag.get_text(strip=True)
            if "loaner" in label.lower():
                # For loaners, get all ship names
                value = [a.get_text(strip=True) for a in data_tag.find_all("a")]
            else:
                value = data_tag.get_text(separator=" ", strip=True)
            infobox_data[label] = value
        return infobox_data

    async def _extract_specifications(self, soup):
        """Return ``{tab name: {category: ["<count> <size> <title>", ...]}}``.

        Each tabber panel is named after the link whose ``aria-controls``
        points at it; panels without such a link are skipped. Component cards
        missing a count, size, or title are ignored.
        """
        specifications = {}
        # Find all specification tabs like "Avionics & Systems", "Weaponry", etc.
        for panel in soup.select("div.tabber > section > article.tabber__panel"):
            tab_name_tag = soup.find("a", {"aria-controls": panel.get("id", "")})
            if not tab_name_tag:
                continue
            tab_name = tab_name_tag.get_text(strip=True)
            tab_specs = specifications[tab_name] = {}
            for group in panel.find_all("div", class_="template-components__section"):
                label_tag = group.find("div", class_="template-components__label")
                if not label_tag:
                    continue
                category = label_tag.get_text(strip=True)
                components = []
                for card in group.select(".template-component__card"):
                    count_tag = card.select_one(".template-component__count")
                    size_tag = card.select_one(".template-component__size")
                    title_tag = card.select_one(".template-component__title")
                    if count_tag and size_tag and title_tag:
                        components.append(
                            f"{count_tag.get_text(strip=True)} "
                            f"{size_tag.get_text(strip=True)} "
                            f"{title_tag.get_text(strip=True)}"
                        )
                if components:
                    # A category can occur in several groups (for Thrusters),
                    # so extend any list that is already there.
                    tab_specs.setdefault(category, []).extend(components)
        return specifications

    def _fetch_distinct_names(self, db_file, query):
        """Run *query* against ``db_path/db_file`` and join column 0 of every row with newlines.

        The connection is closed even when the query raises; ``sqlite3.Error``
        propagates to the caller. NULL values are skipped.
        """
        conn = sqlite3.connect(self.db_path + "/" + db_file)
        try:
            rows = conn.execute(query).fetchall()
        finally:
            conn.close()
        return "\n".join(str(row[0]) for row in rows if row[0] is not None)

    async def fetch_all_commodity_names(self):
        """
        Fetches the distinct commodity names from commodities.db and returns them
        as one newline-separated string, in the order the database yields them.
        """
        return self._fetch_distinct_names(
            "commodities.db", "SELECT DISTINCT commodity_name FROM commodity_prices"
        )

    async def fetch_all_item_names(self):
        """
        Fetches the distinct item names from items.db and returns them as one
        newline-separated string, in the order the database yields them.
        """
        return self._fetch_distinct_names(
            "items.db", "SELECT DISTINCT item_name FROM item_prices"
        )

    async def get_all_ship_names_from_fleetyard_db(self):
        """
        Fetches the distinct ship names from fleet.db and returns them as one
        newline-separated string, in the order the database yields them.
        """
        return self._fetch_distinct_names(
            "fleet.db", "SELECT DISTINCT name FROM ship_owner_list"
        )
# NOTE(review): the helpers below are module-level functions rather than Tools
# methods because the tool host is presumed to expose the methods of Tools as
# callable tools - confirm before moving any of them into the class.

# Upper bound, in seconds, for a single wiki request.
_WIKI_REQUEST_TIMEOUT = 15

# A fuzzy match is only trusted when its score (0-100) is above this value.
_MIN_MATCH_SCORE = 60

# Normalised cell texts that mean "not rentable at this location".
_NOT_RENTABLE_MARKERS = frozenset(
    {"", "-", "–", "—", "x", "✗", "✘", "✖", "❌", "no", "n/a"}
)


def _query_rows(db_file, query, params=()):
    """Run *query* on the SQLite file *db_file* and return all rows, always closing the connection."""
    conn = sqlite3.connect(db_file)
    try:
        return conn.execute(query, params).fetchall()
    finally:
        conn.close()


def _format_ship_report(ship_name, info):
    """Render the dict returned by get_information.fetch_infos as a plain-text report."""
    lines = [f"Information for {ship_name}:"]
    general = info.get("general")
    if general:
        lines.append("\n--- General Information ---")
        for key, value in general.items():
            if isinstance(value, list):
                lines.append(f"{key}: {', '.join(value)}")
                continue
            if "Size" in key:
                # Only keep the first word for size-related keys
                value = value.split()[0] if value else ""
            if "Stowage" in key:
                key = key.replace("Stowage", "Storage")
            lines.append(f"{key}: {value}")
    specifications = info.get("specifications")
    if specifications:
        lines.append("\n--- Specifications ---")
        for spec_area, details in specifications.items():
            if not details:
                continue
            lines.append(f"\n[{spec_area}]")
            for category, items in details.items():
                lines.append(f" {category}:")
                lines.extend(f" - {item}" for item in items)
    return "\n".join(lines)


async def _commodity_price_report(db_path, commodity_name, emitter, side):
    """Look up the sell or buy prices (*side* is "sell" or "buy") of a commodity.

    The name is fuzzy-matched against the names stored in the database first.
    Returns the text to hand back to the caller; failures are reported through
    *emitter* and returned as a message instead of being raised.
    """
    column = "price_" + side  # fixed internal value, never user input
    try:
        await emitter.progress_update(
            f"Fetching commodity names from the database to find a match for '{commodity_name}'"
        )
        all_names = await get_information().fetch_all_commodity_names()
        best_match = process.extractOne(commodity_name, all_names.splitlines())
        if best_match is None:
            # No candidates at all: there is no "best guess" to report.
            result = f"No commodity names are available in the database to match '{commodity_name}'."
            await emitter.error_update(result)
        elif best_match[1] > _MIN_MATCH_SCORE:
            matched_name = best_match[0]
            await emitter.success_update(
                f"Found a close match for '{commodity_name}': {matched_name}"
            )
            await emitter.progress_update(f"Fetching {side} prices for '{matched_name}'")
            rows = _query_rows(
                db_path + "/commodities.db",
                f"SELECT {column}, terminal_name, commodity_name FROM commodity_prices WHERE commodity_name = ? AND {column} > 0",
                (matched_name,),
            )
            if rows:
                result = "\n".join(
                    f"Item: {name}, {side.capitalize()} Price: {int(price)} aUEC/SCU, Terminal: {terminal}"
                    for price, terminal, name in rows
                )
                await emitter.success_update(
                    f"Successfully fetched {side} prices for '{matched_name}'"
                )
            else:
                result = f"No locations found to {side} '{matched_name}'."
                await emitter.error_update(result)
        else:
            result = f"Could not find a confident match for commodity '{commodity_name}'. Best guess was '{best_match[0]}' with {best_match[1]}% confidence."
            await emitter.error_update(result)
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        result = f"An error occurred while fetching {side} prices for {commodity_name}: {str(e)}"
        await emitter.error_update(result)
    return result


def _linked_ship_title(cell):
    """Return the stripped ``title`` of the first link in *cell*, or None when there is none."""
    link = cell.find("a")
    if not link or not link.get("title"):
        return None
    return link.get("title").strip()


def _cell_marks_rentable(cell_text):
    """Tell whether a location cell marks the ship as rentable there.

    The previous check was ``"" in text``, which is true for every string, so
    every ship was reported at every location. Any cell whose text is not a
    known "no" marker now counts as rentable.
    NOTE(review): confirm which markers the wiki table actually uses.
    """
    return cell_text.strip().casefold() not in _NOT_RENTABLE_MARKERS


def _parse_rental_tables(html_content):
    """Extract ``(ship_prices, ship_locations)`` from the rendered Ship_renting page.

    ``ship_prices`` maps a ship title to its 1/3/7/30 day price texts;
    ``ship_locations`` maps a ship title to the column headers of the cells
    that mark it as rentable.
    """
    soup = BeautifulSoup(html_content, "html.parser")
    ship_prices = {}
    ship_locations = {}
    for table in soup.find_all("table", class_="wikitable"):
        header_row = table.find("tr")
        if not header_row:
            continue
        headers = [th.get_text(strip=True) for th in header_row.find_all("th")]
        rows = table.find_all("tr")[1:]
        if "1 Day" in headers and "Location" in headers:
            # Table 1: Ship rental prices
            for row in rows:
                cells = row.find_all("td")
                if len(cells) < 8:
                    continue
                ship_name = _linked_ship_title(cells[1])
                if ship_name is None:
                    continue
                ship_prices[ship_name] = {
                    "1_day": cells[3].get_text(strip=True),
                    "3_days": cells[4].get_text(strip=True),
                    "7_days": cells[5].get_text(strip=True),
                    "30_days": cells[6].get_text(strip=True),
                }
        elif "Area18" in headers:
            # Table 2: Ship rental locations
            location_headers = headers[3:]
            for row in rows:
                cells = row.find_all("td")
                if len(cells) < 4:
                    continue
                ship_name = _linked_ship_title(cells[1])
                if ship_name is None:
                    continue
                locations = ship_locations.setdefault(ship_name, [])
                # zip() stops at the shorter side, so a row with more cells
                # than header columns cannot index past the headers.
                for location, cell in zip(location_headers, cells[3:]):
                    if _cell_marks_rentable(cell.get_text()):
                        locations.append(location)
    return ship_prices, ship_locations


class Tools:
    """Tool entry points for Star Citizen ship, price, and fleet lookups."""

    def __init__(self):
        self.db_path = "/app/sc_databases"

    async def get_ship_details(
        self, ship_name: str, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetch general information and specifications of a ship from the Star Citizen wiki.
        ship_name: The name of the ship to look up (fuzzy-matched against the wiki's vehicle list).
        """
        emitter = EventEmitter(__event_emitter__)
        # fetch_infos handles the fuzzy matching and the page-title formatting.
        await emitter.progress_update("Fetching ship information for " + ship_name)
        info = await get_information().fetch_infos(ship_name)
        if not info:
            error_message = f"No information found for {ship_name}."
            print(error_message)
            await emitter.error_update(error_message)
            return error_message
        await emitter.success_update(
            "Successfully fetched ship information for " + ship_name
        )
        await emitter.progress_update("Processing retrieved information...")
        if info.get("general"):
            await emitter.progress_update("Processing general information...")
        if info.get("specifications"):
            await emitter.progress_update("Processing specifications...")
        final_output = _format_ship_report(ship_name, info)
        print(final_output)
        await emitter.success_update(final_output)
        return final_output

    async def compare_ships(
        self,
        ship_name1: str,
        ship_name2: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ):
        """
        Fetch the wiki information of two ships and return both reports, separated by a blank line.
        ship_name1: The name of the first ship.
        ship_name2: The name of the second ship.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update(
            f"Fetching ship information for {ship_name1} and {ship_name2}"
        )
        reports = []
        any_found = False
        for ship_name in (ship_name1, ship_name2):
            info = await get_information().fetch_infos(ship_name)
            if not info:
                # A failed lookup used to leave the report variable unassigned
                # and crash the final concatenation; report it in place instead.
                missing = f"No information found for {ship_name}."
                await emitter.error_update(missing)
                reports.append(missing)
                continue
            any_found = True
            await emitter.success_update(
                f"Successfully fetched ship information for {ship_name}"
            )
            if info.get("general"):
                await emitter.progress_update(
                    "Processing general information for " + ship_name
                )
            if info.get("specifications"):
                await emitter.progress_update(
                    "Processing specifications for " + ship_name
                )
            reports.append(_format_ship_report(ship_name, info))
        combined = "\n\n".join(reports)
        if any_found:
            await emitter.success_update(combined)
        print(combined)
        return combined

    async def get_commodity_sell_price(
        self, commodity_name: str, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetch commodity sell prices from the database by name.
        commodity_name: The name of the commodity to fetch sell prices for.
        """
        emitter = EventEmitter(__event_emitter__)
        result_string = await _commodity_price_report(
            self.db_path, commodity_name, emitter, "sell"
        )
        print(result_string)
        return (
            "If not other specified, only answer two terminals with the highest sell price with the actual sell price per SCU (Star Citizen Unit).\n"
            + result_string
        )

    async def get_commodity_buy_price(
        self, commodity_name: str, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetch commodity buy prices from the database by name.
        commodity_name: The name of the commodity to fetch buy prices for.
        """
        emitter = EventEmitter(__event_emitter__)
        result_string = await _commodity_price_report(
            self.db_path, commodity_name, emitter, "buy"
        )
        print(result_string)
        return (
            "If not other specified, only answer two terminals with the lowest buy price with the actual buy price per SCU (Star Citizen Unit).\n"
            + result_string
        )

    async def get_item_prices(
        self, item_name: str, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetch item prices from the database by name.
        item_name: The name of the item to fetch.
        """
        emitter = EventEmitter(__event_emitter__)
        try:
            # Guard against spelling differences by matching the input against
            # every item name stored in the database.
            await emitter.progress_update(
                f"Fetching item names from the database to find a match for '{item_name}'"
            )
            all_names = await get_information().fetch_all_item_names()
            best_match = process.extractOne(item_name, all_names.splitlines())
            if best_match is None:
                # No candidates at all: there is no "best guess" to report.
                result_string = f"No item names are available in the database to match '{item_name}'."
                await emitter.error_update(result_string)
            elif best_match[1] > _MIN_MATCH_SCORE:
                matched_item_name = best_match[0]
                await emitter.success_update(
                    f"Found a close match for '{item_name}': {matched_item_name}"
                )
                await emitter.progress_update(
                    f"Fetching buy and sell prices for '{matched_item_name}'"
                )
                rows = _query_rows(
                    self.db_path + "/items.db",
                    "SELECT price_buy, price_sell, terminal_name, item_name FROM item_prices WHERE item_name = ?",
                    (matched_item_name,),
                )
                await emitter.progress_update(
                    f"Processing results for '{matched_item_name}'"
                )
                if rows:
                    output_lines = []
                    for price_buy, price_sell, terminal, name in rows:
                        # A NULL price is treated like 0: not available.
                        buy_value = int(price_buy or 0)
                        sell_value = int(price_sell or 0)
                        buy_price = f"{buy_value} aUEC" if buy_value else "Not buyable"
                        sell_price = f"{sell_value} aUEC" if sell_value else "not sellable"
                        output_lines.append(
                            f"Item: {name}, Buy Price: {buy_price}, Sell Price: {sell_price}, Terminal: {terminal}"
                        )
                    result_string = "\n".join(output_lines)
                    await emitter.success_update(
                        f"Successfully fetched buy and sell prices for '{matched_item_name}'"
                    )
                else:
                    result_string = f"No price data found for '{matched_item_name}'."
                    await emitter.error_update(result_string)
            else:
                result_string = f"Could not find a confident match for item '{item_name}'. Best guess was '{best_match[0]}' with {best_match[1]}% confidence."
                await emitter.error_update(result_string)
        except Exception as e:
            # Tool boundary: report the failure as text instead of raising.
            result_string = f"An error occurred while fetching information for {item_name}: {str(e)}"
            await emitter.error_update(result_string)
        print(result_string)
        return result_string

    async def get_ship_owners(
        self, ship_name: str, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetches the owners of a specific ship from the fleet.db sqlite database.
        ship_name: The name of the ship to fetch owners for.
        """
        emitter = EventEmitter(__event_emitter__)
        result_string = f"No owners found for ship '{ship_name}'."
        owners_found = False
        failed = False
        try:
            await emitter.progress_update(
                f"Fetching owners for ship '{ship_name}' from the database"
            )
            available_ships = (
                await get_information().get_all_ship_names_from_fleetyard_db()
            )
            best_match = process.extractOne(ship_name, available_ships.splitlines())
            if best_match and best_match[1] > _MIN_MATCH_SCORE:
                matched_ship_name = best_match[0]
                await emitter.success_update(
                    f"Found a close match for '{ship_name}': {matched_ship_name}"
                )
                print(f'found a close match for "{ship_name}": {matched_ship_name}')
                await emitter.progress_update(
                    f"Fetching owners for ship '{matched_ship_name}' from the database"
                )
                rows = _query_rows(
                    self.db_path + "/fleet.db",
                    "SELECT manufacturerName, name, usernames FROM ship_owner_list WHERE name = ?",
                    (matched_ship_name,),
                )
                if rows:
                    owners = [str(row[2]) for row in rows if row[2]]
                    manufacturer_name, matched_ship_name = rows[0][0], rows[0][1]
                    result_string = f"Report these to the user in a bulletpoint list:\nOwners of ship {manufacturer_name} {matched_ship_name}: {', '.join(owners)}"
                    owners_found = True
        except Exception as e:
            # Tool boundary: report the failure as text instead of raising.
            result_string = (
                f"An error occurred while fetching owners for {ship_name}: {str(e)}"
            )
            await emitter.error_update(result_string)
            failed = True
        # Close the status with a finished event; the former trailing progress
        # update left it "in progress" for good.
        if owners_found:
            await emitter.success_update(result_string)
        elif not failed:
            await emitter.error_update(result_string)
        print(result_string)
        return result_string

    async def list_purchasable_ships(
        self, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetches all buyable ships, their prices, and locations from the buyable_ships.db database.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update("Fetching purchasable ships from the database...")
        final_output = "No purchasable ships found in the database."
        try:
            rows = _query_rows(
                self.db_path + "/buyable_ships.db",
                "SELECT vehicle_name, price_buy, terminal_name FROM buyable_ships ORDER BY terminal_name, vehicle_name",
            )
            if not rows:
                await emitter.error_update(final_output)
                print(final_output)
                return final_output
            await emitter.progress_update("Processing ship data...")
            # Group the ships by the terminal that sells them.
            ships_by_location = {}
            for ship_name, price, location in rows:
                ships_by_location.setdefault(location, []).append((ship_name, price))
            output_lines = []
            for location, ships in sorted(ships_by_location.items()):
                output_lines.append(f"\n--- Ingame Shop: {location} ---")
                output_lines.extend(
                    f" - {name}: {int(price)} aUEC" for name, price in ships
                )
            final_output = "\n".join(output_lines)
            await emitter.success_update(
                f"Found purchasable ships at {len(ships_by_location)} locations."
            )
        except sqlite3.Error as e:
            final_output = f"Database error while fetching purchasable ships: {e}"
            await emitter.error_update(final_output)
        except Exception as e:
            final_output = f"An unexpected error occurred: {e}"
            await emitter.error_update(final_output)
        print(final_output)
        return final_output

    async def list_rentable_ships(
        self, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetches all rentable ships, their prices, and locations from the Star Citizen Tools wiki.

        Always returns a string: the formatted listing, or the error message
        when the page could not be fetched or parsed.
        """
        emitter = EventEmitter(__event_emitter__)
        api_url = "https://starcitizen.tools/api.php"
        page_title = "Ship_renting"
        await emitter.progress_update(f"Fetching data from {page_title}...")
        params = {
            "action": "parse",
            "page": page_title,
            "format": "json",
            "prop": "text",
        }
        error_message = None
        html_content = ""
        try:
            response = await asyncio.to_thread(
                requests.get, api_url, params=params, timeout=_WIKI_REQUEST_TIMEOUT
            )
            response.raise_for_status()
            data = response.json()
        # The JSON handler comes first: recent requests versions raise a decode
        # error that is also a RequestException.
        except json.JSONDecodeError:
            error_message = f"Error decoding JSON for {page_title}."
        except requests.exceptions.RequestException as e:
            error_message = f"Error fetching data for {page_title}: {e}"
        else:
            if "error" in data:
                error_message = f"API Error for {page_title}: {data['error']['info']}"
            else:
                html_content = data.get("parse", {}).get("text", {}).get("*", "")
                if not html_content:
                    error_message = f"No content found for {page_title}."
        if error_message is not None:
            # Hand the reason back to the caller rather than None or an empty
            # listing, and do not follow it with a success status.
            await emitter.error_update(error_message)
            print(error_message)
            return error_message
        await emitter.progress_update(f"Parsing data from {page_title}...")
        ship_prices, ship_locations = _parse_rental_tables(html_content)
        await emitter.success_update(f"Successfully processed {page_title}.")
        output_lines = []
        rentable_count = 0
        for ship_name, locations in sorted(ship_locations.items()):
            if not locations:
                continue
            rentable_count += 1
            output_lines.append(f"\n--- {ship_name} ---")
            output_lines.append("Rentable at:")
            output_lines.extend(f" - Location: {location}" for location in locations)
            prices = ship_prices.get(ship_name, {})
            if prices:
                # Prices are stored per ship, not per location, so they are
                # listed once. NOTE(review): the source's indentation was lost;
                # confirm this line was not meant to repeat under each location.
                output_lines.append(
                    f" - 1 Day: {prices.get('1_day', 'N/A')}, 3 Days: {prices.get('3_days', 'N/A')}, 7 Days: {prices.get('7_days', 'N/A')}, 30 Days: {prices.get('30_days', 'N/A')}"
                )
        final_output = "\n".join(output_lines)
        await emitter.success_update(f"Found {rentable_count} unique rentable ships.")
        print(final_output)
        return final_output
if __name__ == "__main__":
    # Manual smoke run: fetch and print the rentable-ship listing once.
    asyncio.run(Tools().list_rentable_ships())