SC-Discord-Bot/llm_tools/star_citizen_info_retrieval.py

937 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
title: Star Citizen Information Retrieval
author: Pakobbix
author_url: https://gitea.zephyre.one/Pakobbix/SC-Discord-Bot
version: 0.1.0
"""
#!/usr/bin/env python3
import asyncio
import json
import sqlite3
from contextlib import closing
from typing import Any, Callable

import requests
from bs4 import BeautifulSoup
from fuzzywuzzy import process
class EventEmitter:
    """Forward status events to an optional awaitable callback.

    Each event is a dict of the form
    ``{"type": "status", "data": {"status", "description", "done"}}``.
    When no callback was supplied, every method is a silent no-op.
    """

    def __init__(self, event_emitter: Callable[[dict], Any] = None):
        # The callback is awaited in ``emit``, so it must return an awaitable.
        self.event_emitter = event_emitter

    async def emit(self, description="Unknown State", status="in_progress", done=False):
        """Build one status event and hand it to the callback, if there is one."""
        if not self.event_emitter:
            return
        payload = {
            "type": "status",
            "data": {
                "status": status,
                "description": description,
                "done": done,
            },
        }
        await self.event_emitter(payload)

    async def progress_update(self, description):
        """Emit an unfinished ``in_progress`` event."""
        await self.emit(description)

    async def error_update(self, description):
        """Emit a finished ``error`` event."""
        await self.emit(description, "error", True)

    async def success_update(self, description):
        """Emit a finished ``success`` event."""
        await self.emit(description, "success", True)
class get_information:
    """Read Star Citizen data from the starcitizen.tools wiki and local SQLite files.

    Wiki access goes through the MediaWiki ``api.php`` endpoint under
    ``base_url``; database access reads ``*.db`` files under ``db_path``.
    """

    # Seconds to wait for the wiki before giving up. Without a timeout,
    # ``requests.get`` can block forever on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        self.base_url = "https://starcitizen.tools"
        self.db_path = "/app/sc_databases"

    async def _get_json(self, params):
        """Call ``api.php`` with ``params`` and return the decoded JSON body.

        The blocking ``requests.get`` runs in a worker thread.

        :raises requests.exceptions.RequestException: on network/HTTP errors.
        :raises json.JSONDecodeError: if the body is not valid JSON.
        """
        response = await asyncio.to_thread(
            requests.get,
            f"{self.base_url}/api.php",
            params=params,
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()

    def _fetch_distinct_names(self, db_file, query):
        """Run ``query`` against ``db_file`` and join column 0 with newlines.

        NULL and empty names are skipped so the join cannot fail and callers
        that split the result never see blank candidates. The connection is
        closed even when the query raises.
        """
        with closing(sqlite3.connect(self.db_path + "/" + db_file)) as conn:
            rows = conn.execute(query).fetchall()
        return "\n".join(str(row[0]) for row in rows if row[0] not in (None, ""))

    async def get_all_vehicle_names(self):
        """Return the sorted, de-duplicated titles of all pledge ships and vehicles.

        Walks both wiki categories page by page. A failure while reading one
        category is printed and ends that category only; titles collected so
        far are kept. Returns ``[]`` when nothing could be fetched.
        """
        vehicle_names = set()
        for category in ("Category:Pledge ships", "Category:Pledge vehicles"):
            params = {
                "action": "query",
                "format": "json",
                "list": "categorymembers",
                "cmtitle": category,
                "cmlimit": "max",  # Use max limit (500)
                "cmprop": "title",
            }
            while True:
                try:
                    data = await self._get_json(params)
                except requests.exceptions.RequestException as e:
                    print(f"Error fetching vehicle list for {category}: {e}")
                    break  # Stop processing this category
                except json.JSONDecodeError:
                    print(f"Error decoding JSON from response for {category}.")
                    break  # Stop processing this category
                members = data.get("query", {}).get("categorymembers", [])
                vehicle_names.update(member["title"] for member in members)
                # The API returns a continuation token while more pages remain.
                continuation = data.get("continue", {})
                if "cmcontinue" not in continuation:
                    break  # No more pages
                params["cmcontinue"] = continuation["cmcontinue"]
        if not vehicle_names:
            print("No vehicle names found.")
            return []
        return sorted(vehicle_names)

    async def get_closest_vehicle_name(self, vehicle_name):
        """Return the wiki vehicle title that best fuzzy-matches ``vehicle_name``.

        Returns ``None`` when the vehicle list could not be fetched. No minimum
        score is applied: the best candidate is returned however weak it is.
        """
        all_vehicle_names = await self.get_all_vehicle_names()
        if not all_vehicle_names:
            return None
        closest_name, _ = process.extractOne(vehicle_name, all_vehicle_names)
        return closest_name

    async def fetch_infos(self, ship_name):
        """Fetch and parse the wiki page of the vehicle closest to ``ship_name``.

        :return: ``{"general": {...}, "specifications": {...}}`` or ``None``
            when no vehicle matched or the page could not be retrieved.
        """
        closest_name = await self.get_closest_vehicle_name(ship_name)
        if not closest_name:
            print(f"No matching vehicle found for {ship_name}.")
            return None
        # Use the closest name found for the API call
        page_title = closest_name.replace(" ", "_")
        params = {
            "action": "parse",
            "page": page_title,
            "format": "json",
            "prop": "text",  # We only need the parsed HTML content
        }
        try:
            data = await self._get_json(params)
        except requests.exceptions.RequestException as e:
            print(f"Error fetching data for {page_title}: {e}")
            return None
        except json.JSONDecodeError:
            print(f"Error decoding JSON from response for {page_title}.")
            return None
        if "error" in data:
            print(f"API Error for {page_title}: {data['error']['info']}")
            return None
        html_content = data.get("parse", {}).get("text", {}).get("*", "")
        if not html_content:
            print(f"No content found for {page_title}.")
            return None
        soup = BeautifulSoup(html_content, "html.parser")
        return {
            "general": await self._extract_infobox_data(soup),
            "specifications": await self._extract_specifications(soup),
        }

    async def _extract_infobox_data(self, soup):
        """Map each infobox label to its text (or, for loaners, a list of link texts)."""
        infobox_data = {}
        infobox = soup.find("details", class_="infobox")
        if not infobox:
            return infobox_data
        for item in infobox.find_all("div", class_="infobox__item"):
            label_tag = item.find("div", class_="infobox__label")
            data_tag = item.find("div", class_="infobox__data")
            if not (label_tag and data_tag):
                continue
            label = label_tag.get_text(strip=True)
            if "loaner" in label.lower():
                # For loaners, get all ship names
                value = [a.get_text(strip=True) for a in data_tag.find_all("a")]
            else:
                value = data_tag.get_text(separator=" ", strip=True)
            infobox_data[label] = value
        return infobox_data

    async def _extract_specifications(self, soup):
        """Collect component lists per specification tab.

        :return: ``{tab name: {component category: ["count size title", ...]}}``.
            Panels without a matching tab link are skipped, as are cards
            missing any of count, size or title.
        """
        specifications = {}
        # Find all specification tabs like "Avionics & Systems", "Weaponry", etc.
        for panel in soup.select("div.tabber > section > article.tabber__panel"):
            panel_id = panel.get("id", "")
            tab_name_tag = soup.find("a", {"aria-controls": panel_id})
            if not tab_name_tag:
                continue
            tab_name = tab_name_tag.get_text(strip=True)
            specifications[tab_name] = {}
            for group in panel.find_all("div", class_="template-components__section"):
                label_tag = group.find("div", class_="template-components__label")
                if not label_tag:
                    continue
                category = label_tag.get_text(strip=True)
                components = []
                for card in group.select(".template-component__card"):
                    count_tag = card.select_one(".template-component__count")
                    size_tag = card.select_one(".template-component__size")
                    title_tag = card.select_one(".template-component__title")
                    if count_tag and size_tag and title_tag:
                        count = count_tag.get_text(strip=True)
                        size = size_tag.get_text(strip=True)
                        title = title_tag.get_text(strip=True)
                        components.append(f"{count} {size} {title}")
                if components:
                    # A category can occur more than once (e.g. Thrusters); merge them.
                    specifications[tab_name].setdefault(category, []).extend(components)
        return specifications

    async def fetch_all_commodity_names(self):
        """Return the distinct commodity names from ``commodities.db``, one per line.

        The order is whatever SQLite returns; it is not sorted.
        """
        return self._fetch_distinct_names(
            "commodities.db", "SELECT DISTINCT commodity_name FROM commodity_prices"
        )

    async def fetch_all_item_names(self):
        """Return the distinct item names from ``items.db``, one per line.

        The order is whatever SQLite returns; it is not sorted.
        """
        return self._fetch_distinct_names(
            "items.db", "SELECT DISTINCT item_name FROM item_prices"
        )

    async def get_all_ship_names_from_fleetyard_db(self):
        """Return the distinct ship names from ``fleet.db``, one per line."""
        return self._fetch_distinct_names(
            "fleet.db", "SELECT DISTINCT name FROM ship_owner_list"
        )
# The helpers below are module-level functions rather than Tools methods.
# NOTE(review): presumably the host framework exposes Tools methods as
# callable tools; keeping helpers out of the class avoids exposing them.
# Verify against the loader.

# Minimum fuzzy-match score (0-100) a candidate must exceed to be accepted.
_MATCH_THRESHOLD = 60

# Seconds to wait for the wiki API; without it requests.get may block forever.
_REQUEST_TIMEOUT = 30

# Glyphs that mark "rentable here" in the wiki's location table.
# NOTE(review): the original test was `"" in cell.get_text()`, which is always
# true; the literal was most likely a check mark lost in transit. Confirm which
# glyph the Ship_renting page really uses. Escapes are used so the characters
# cannot be stripped again.
_AVAILABLE_MARKS = (
    "\u2714",  # heavy check mark
    "\u2713",  # check mark
    "\u2611",  # ballot box with check
    "\u2705",  # white heavy check mark
)

_COMMODITY_PRICE_QUERIES = {
    "sell": "SELECT price_sell, terminal_name, commodity_name FROM commodity_prices WHERE commodity_name = ? AND price_sell > 0",
    "buy": "SELECT price_buy, terminal_name, commodity_name FROM commodity_prices WHERE commodity_name = ? AND price_buy > 0",
}


def _format_ship_info(ship_name, info):
    """Render the dict produced by ``get_information.fetch_infos`` as plain text."""
    lines = [f"Information for {ship_name}:"]
    general = info.get("general")
    if general:
        lines.append("\n--- General Information ---")
        for key, value in general.items():
            if isinstance(value, list):
                lines.append(f"{key}: {', '.join(value)}")
                continue
            if "Size" in key:
                # Only keep the first word for size-related keys
                value = value.split()[0] if value else ""
            if "Stowage" in key:
                key = key.replace("Stowage", "Storage")
            lines.append(f"{key}: {value}")
    specifications = info.get("specifications")
    if specifications:
        lines.append("\n--- Specifications ---")
        for spec_area, details in specifications.items():
            if not details:
                continue
            lines.append(f"\n[{spec_area}]")
            for category, items in details.items():
                lines.append(f" {category}:")
                lines.extend(f" - {item}" for item in items)
    return "\n".join(lines)


def _best_fuzzy_match(query, candidates):
    """Return ``(candidate, score)`` for the best match, or ``None`` if there are no usable candidates.

    ``None`` and empty candidates are dropped first so they never reach the matcher.
    """
    choices = [candidate for candidate in candidates if candidate]
    if not choices:
        return None
    return process.extractOne(query, choices)


def _no_confident_match_message(kind, query, best_match):
    """Explain why ``query`` was rejected, also when there was nothing to match against."""
    if best_match is None:
        return f"Could not find a match for {kind} '{query}': no {kind} names are available in the database."
    return f"Could not find a confident match for {kind} '{query}'. Best guess was '{best_match[0]}' with {best_match[1]}% confidence."


def _format_price(value, zero_label):
    """Format a price as ``'<int> aUEC'``; NULL and 0 both yield ``zero_label``."""
    amount = int(value or 0)
    return zero_label if amount == 0 else f"{amount} aUEC"


def _cell_marks_available(cell_text):
    """Tell whether a location-table cell contains one of the known check marks."""
    return any(mark in cell_text for mark in _AVAILABLE_MARKS)


def _linked_ship_name(cell):
    """Return the stripped ``title`` of the first link in ``cell``, or ``None``."""
    link = cell.find("a")
    title = link.get("title") if link else None
    return title.strip() if title else None


def _parse_rental_tables(soup):
    """Extract ``(ship_prices, ship_locations)`` from the page's wikitables.

    A table whose header row contains "1 Day" and "Location" is read as the
    price table; one containing "Area18" as the location table.
    """
    ship_prices = {}
    ship_locations = {}
    for table in soup.find_all("table", class_="wikitable"):
        header_row = table.find("tr")
        if not header_row:
            continue
        headers = [th.get_text(strip=True) for th in header_row.find_all("th")]
        rows = table.find_all("tr")[1:]
        if "1 Day" in headers and "Location" in headers:
            for row in rows:
                cells = row.find_all("td")
                if len(cells) < 8:
                    continue
                ship_name = _linked_ship_name(cells[1])
                if not ship_name:
                    continue
                ship_prices[ship_name] = {
                    "1_day": cells[3].get_text(strip=True),
                    "3_days": cells[4].get_text(strip=True),
                    "7_days": cells[5].get_text(strip=True),
                    "30_days": cells[6].get_text(strip=True),
                }
        elif "Area18" in headers:
            location_headers = headers[3:]
            for row in rows:
                cells = row.find_all("td")
                if len(cells) < 4:
                    continue
                ship_name = _linked_ship_name(cells[1])
                if not ship_name:
                    continue
                # zip() stops at the shorter side, so a row with more cells
                # than headers cannot raise IndexError.
                ship_locations.setdefault(ship_name, []).extend(
                    location
                    for location, cell in zip(location_headers, cells[3:])
                    if _cell_marks_available(cell.get_text())
                )
    return ship_prices, ship_locations


async def _lookup_commodity_prices(db_path, commodity_name, side, emitter):
    """Shared body of the commodity sell/buy lookups.

    :param side: ``"sell"`` or ``"buy"``; selects the query and the wording.
    :return: one line per terminal, or a message explaining why there is none.
    """
    result_string = f"No {side} price information found for commodity '{commodity_name}'."
    try:
        await emitter.progress_update(
            f"Fetching commodity names from the database to find a match for '{commodity_name}'"
        )
        all_names = await get_information().fetch_all_commodity_names()
        best_match = _best_fuzzy_match(commodity_name, all_names.splitlines())
        if best_match is None or best_match[1] <= _MATCH_THRESHOLD:
            result_string = _no_confident_match_message("commodity", commodity_name, best_match)
            await emitter.error_update(result_string)
        else:
            matched_name = best_match[0]
            await emitter.success_update(
                f"Found a close match for '{commodity_name}': {matched_name}"
            )
            await emitter.progress_update(f"Fetching {side} prices for '{matched_name}'")
            with closing(sqlite3.connect(db_path + "/commodities.db")) as conn:
                rows = conn.execute(
                    _COMMODITY_PRICE_QUERIES[side], (matched_name,)
                ).fetchall()
            if rows:
                result_string = "\n".join(
                    f"Item: {item_name}, {side.capitalize()} Price: {int(price)} aUEC/SCU, Terminal: {terminal_name}"
                    for price, terminal_name, item_name in rows
                )
                await emitter.success_update(
                    f"Successfully fetched {side} prices for '{matched_name}'"
                )
            else:
                result_string = f"No locations found to {side} '{matched_name}'."
                await emitter.error_update(result_string)
    except Exception as e:
        # Catch-all on purpose: failures are returned as text, not raised.
        result_string = f"An error occurred while fetching {side} prices for {commodity_name}: {str(e)}"
        await emitter.error_update(result_string)
    print(result_string)
    return result_string


class Tools:
    """Star Citizen lookup tools backed by the wiki and local SQLite databases.

    Every tool returns plain text and reports progress through the optional
    ``__event_emitter__`` callback.
    """

    def __init__(self):
        self.db_path = "/app/sc_databases"

    async def get_ship_details(
        self, ship_name: str, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetch general information and specifications for a ship from the wiki.
        :param ship_name: The (possibly misspelled) name of the ship.
        :return: The formatted information, or a not-found message.
        """
        emitter = EventEmitter(__event_emitter__)
        # fetch_infos handles fuzzy matching and name formatting itself.
        await emitter.progress_update("Fetching ship information for " + ship_name)
        info = await get_information().fetch_infos(ship_name)
        if not info:
            error_message = f"No information found for {ship_name}."
            print(error_message)
            await emitter.error_update(error_message)
            return error_message
        await emitter.success_update(
            "Successfully fetched ship information for " + ship_name
        )
        await emitter.progress_update("Processing retrieved information...")
        if info.get("general"):
            await emitter.progress_update("Processing general information...")
        if info.get("specifications"):
            await emitter.progress_update("Processing specifications...")
        final_output = _format_ship_info(ship_name, info)
        print(final_output)
        await emitter.success_update(final_output)
        return final_output

    async def compare_ships(
        self,
        ship_name1: str,
        ship_name2: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ):
        """
        Fetch the wiki information of two ships so they can be compared.
        :param ship_name1: The name of the first ship.
        :param ship_name2: The name of the second ship.
        :return: Both ships' information separated by a blank line; a ship
            that cannot be found contributes a not-found message instead.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update(
            f"Fetching ship information for {ship_name1} and {ship_name2}"
        )
        sections = []
        found_any = False
        for name in (ship_name1, ship_name2):
            info = await get_information().fetch_infos(name)
            if not info:
                # Previously a missing ship left its output variable unbound
                # and the method crashed with UnboundLocalError.
                missing = f"No information found for {name}."
                await emitter.error_update(missing)
                sections.append(missing)
                continue
            found_any = True
            await emitter.success_update(
                f"Successfully fetched ship information for {name}"
            )
            if info.get("general"):
                await emitter.progress_update(
                    "Processing general information for " + name
                )
            if info.get("specifications"):
                await emitter.progress_update("Processing specifications for " + name)
            sections.append(_format_ship_info(name, info))
        final_output = "\n\n".join(sections)
        if found_any:
            await emitter.success_update(final_output)
        print(final_output)
        return final_output

    async def get_commodity_sell_price(
        self, commodity_name: str, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetch commodity sell prices from the database by name.
        :param commodity_name: The name of the commodity to fetch sell prices for.
        :return: An answering hint followed by one line per terminal.
        """
        emitter = EventEmitter(__event_emitter__)
        result_string = await _lookup_commodity_prices(
            self.db_path, commodity_name, "sell", emitter
        )
        return "If not other specified, only answer two terminals with the highest sell price with the actual sell price per SCU (Star Citizen Unit).\n" + result_string

    async def get_commodity_buy_price(
        self, commodity_name: str, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetch commodity buy prices from the database by name.
        :param commodity_name: The name of the commodity to fetch buy prices for.
        :return: An answering hint followed by one line per terminal.
        """
        emitter = EventEmitter(__event_emitter__)
        result_string = await _lookup_commodity_prices(
            self.db_path, commodity_name, "buy", emitter
        )
        return "If not other specified, only answer two terminals with the lowest buy price with the actual buy price per SCU (Star Citizen Unit).\n" + result_string

    async def get_item_prices(
        self, item_name: str, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetch item buy and sell prices from the database by name.
        :param item_name: The name of the item to fetch.
        :return: One line per terminal, or a message explaining the failure.
        """
        emitter = EventEmitter(__event_emitter__)
        result_string = f"No information found for item '{item_name}'."
        try:
            await emitter.progress_update(
                f"Fetching item names from the database to find a match for '{item_name}'"
            )
            # Guard against misspellings by matching against all known names.
            all_names = await get_information().fetch_all_item_names()
            best_match = _best_fuzzy_match(item_name, all_names.splitlines())
            if best_match is None or best_match[1] <= _MATCH_THRESHOLD:
                result_string = _no_confident_match_message("item", item_name, best_match)
                await emitter.error_update(result_string)
            else:
                matched_item_name = best_match[0]
                await emitter.success_update(
                    f"Found a close match for '{item_name}': {matched_item_name}"
                )
                await emitter.progress_update(
                    f"Fetching buy and sell prices for '{matched_item_name}'"
                )
                with closing(sqlite3.connect(self.db_path + "/items.db")) as conn:
                    rows = conn.execute(
                        "SELECT price_buy, price_sell, terminal_name, item_name FROM item_prices WHERE item_name = ?",
                        (matched_item_name,),
                    ).fetchall()
                await emitter.progress_update(
                    f"Processing results for '{matched_item_name}'"
                )
                if rows:
                    result_string = "\n".join(
                        f"Item: {name}, Buy Price: {_format_price(buy, 'Not buyable')}, Sell Price: {_format_price(sell, 'not sellable')}, Terminal: {terminal}"
                        for buy, sell, terminal, name in rows
                    )
                    await emitter.success_update(
                        f"Successfully fetched buy and sell prices for '{matched_item_name}'"
                    )
                else:
                    result_string = f"No price data found for '{matched_item_name}'."
                    await emitter.error_update(result_string)
        except Exception as e:
            # Catch-all on purpose: failures are returned as text, not raised.
            result_string = f"An error occurred while fetching information for {item_name}: {str(e)}"
            await emitter.error_update(result_string)
        print(result_string)
        return result_string

    async def get_ship_owners(
        self, ship_name: str, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetch the owners of a specific ship from the fleet.db sqlite database.
        :param ship_name: The name of the ship to fetch owners for.
        :return: The owner list with a formatting hint, or a message
            explaining why no owners are reported.
        """
        emitter = EventEmitter(__event_emitter__)
        result_string = f"No owners found for ship '{ship_name}'."
        found = False
        try:
            await emitter.progress_update(
                f"Fetching owners for ship '{ship_name}' from the database"
            )
            available_ships = (
                await get_information().get_all_ship_names_from_fleetyard_db()
            )
            best_match = _best_fuzzy_match(ship_name, available_ships.splitlines())
            if best_match is None or best_match[1] <= _MATCH_THRESHOLD:
                result_string = _no_confident_match_message("ship", ship_name, best_match)
            else:
                matched_ship_name = best_match[0]
                await emitter.success_update(
                    f"Found a close match for '{ship_name}': {matched_ship_name}"
                )
                print(f'found a close match for "{ship_name}": {matched_ship_name}')
                await emitter.progress_update(
                    f"Fetching owners for ship '{matched_ship_name}' from the database"
                )
                with closing(sqlite3.connect(self.db_path + "/fleet.db")) as conn:
                    rows = conn.execute(
                        "SELECT manufacturerName, name, usernames FROM ship_owner_list WHERE name = ?",
                        (matched_ship_name,),
                    ).fetchall()
                if rows:
                    # Skip NULL/empty usernames so the join cannot fail.
                    owners = [str(row[2]) for row in rows if row[2]]
                    manufacturer_name, matched_ship_name = rows[0][0], rows[0][1]
                    result_string = f"Report these to the user in a bulletpoint list:\nOwners of ship {manufacturer_name} {matched_ship_name}: {', '.join(owners)}"
                    found = True
        except Exception as e:
            # Catch-all on purpose: failures are returned as text, not raised.
            result_string = (
                f"An error occurred while fetching owners for {ship_name}: {str(e)}"
            )
        # Finish with a terminal status so the caller is not left "in progress".
        if found:
            await emitter.success_update(result_string)
        else:
            await emitter.error_update(result_string)
        print(result_string)
        return result_string

    async def list_purchasable_ships(
        self, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetch all buyable ships, their prices, and locations from the buyable_ships.db database.
        :return: Ships grouped by in-game shop, or an error message.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update("Fetching purchasable ships from the database...")
        final_output = "No purchasable ships found in the database."
        try:
            with closing(sqlite3.connect(self.db_path + "/buyable_ships.db")) as conn:
                rows = conn.execute(
                    "SELECT vehicle_name, price_buy, terminal_name FROM buyable_ships ORDER BY terminal_name, vehicle_name"
                ).fetchall()
            if not rows:
                await emitter.error_update(
                    "No purchasable ships found in the database."
                )
                print(final_output)
                return final_output
            await emitter.progress_update("Processing ship data...")
            # Rows arrive ordered by terminal, so grouping in insertion order
            # keeps the shops sorted without a second sort in Python.
            location_data = {}
            for ship_name, price, location in rows:
                location_data.setdefault(location, []).append(
                    {"name": ship_name, "price": price}
                )
            output_lines = []
            for location, ships in location_data.items():
                output_lines.append(f"\n--- Ingame Shop: {location} ---")
                output_lines.extend(
                    f" - {ship['name']}: {int(ship['price'])} aUEC" for ship in ships
                )
            final_output = "\n".join(output_lines)
            await emitter.success_update(
                f"Found purchasable ships at {len(location_data)} locations."
            )
        except sqlite3.Error as e:
            final_output = f"Database error while fetching purchasable ships: {e}"
            await emitter.error_update(final_output)
        except Exception as e:
            final_output = f"An unexpected error occurred: {e}"
            await emitter.error_update(final_output)
        print(final_output)
        return final_output

    async def list_rentable_ships(
        self, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetch all rentable ships, their prices, and locations from the Star Citizen Tools wiki.
        :return: One section per rentable ship, or an error message when the
            wiki page could not be retrieved.
        """
        emitter = EventEmitter(__event_emitter__)
        api_url = "https://starcitizen.tools/api.php"
        page_title = "Ship_renting"
        await emitter.progress_update(f"Fetching data from {page_title}...")
        params = {
            "action": "parse",
            "page": page_title,
            "format": "json",
            "prop": "text",
        }
        error_message = None
        data = {}
        try:
            response = await asyncio.to_thread(
                requests.get, api_url, params=params, timeout=_REQUEST_TIMEOUT
            )
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as e:
            error_message = f"Error fetching data for {page_title}: {e}"
        except json.JSONDecodeError:
            error_message = f"Error decoding JSON for {page_title}."
        html_content = ""
        if error_message is None:
            if "error" in data:
                error_message = f"API Error for {page_title}: {data['error']['info']}"
            else:
                html_content = data.get("parse", {}).get("text", {}).get("*", "")
                if not html_content:
                    error_message = f"No content found for {page_title}."
        if error_message is not None:
            # Return the message (not None) so the caller always gets text.
            await emitter.error_update(error_message)
            print(error_message)
            return error_message
        await emitter.progress_update(f"Parsing data from {page_title}...")
        ship_prices, ship_locations = _parse_rental_tables(
            BeautifulSoup(html_content, "html.parser")
        )
        await emitter.success_update(f"Successfully processed {page_title}.")
        output_lines = []
        rentable_count = 0
        for ship_name, locations in sorted(ship_locations.items()):
            if not locations:
                continue
            rentable_count += 1
            output_lines.append(f"\n--- {ship_name} ---")
            output_lines.append("Rentable at:")
            output_lines.extend(f" - Location: {location}" for location in locations)
            # NOTE(review): prices are emitted once per ship after its
            # locations; the flattened source does not show whether this line
            # originally sat inside the location loop. Confirm.
            prices = ship_prices.get(ship_name, {})
            if prices:
                output_lines.append(
                    f" - 1 Day: {prices.get('1_day', 'N/A')}, 3 Days: {prices.get('3_days', 'N/A')}, 7 Days: {prices.get('7_days', 'N/A')}, 30 Days: {prices.get('30_days', 'N/A')}"
                )
        final_output = "\n".join(output_lines)
        await emitter.success_update(f"Found {rentable_count} unique rentable ships.")
        print(final_output)
        return final_output

    async def fetch_wikelo_information(
        self,
        mission_ship_or_armor_name: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ):
        """
        Retrieve the Wikelo entry whose mission, ship name or armor name best
        fuzzy-matches the given text.
        :param mission_ship_or_armor_name: The text to match against the
            ``missions``, ``ship_name`` and ``armor_name`` columns.
        :return: A formatted summary of the matched entry, or an error message.
        """
        emitter = EventEmitter(__event_emitter__)
        try:
            await emitter.progress_update(
                f"Searching Wikelo database for '{mission_ship_or_armor_name}'"
            )
            # 1. Pull all rows from the database.
            with closing(sqlite3.connect(self.db_path + "/wikelo_crafting.db")) as conn:
                rows = conn.execute(
                    "SELECT missions, ship_name, components, costs, rewards, armor_name FROM Wikelo_information"
                ).fetchall()
            if not rows:
                await emitter.error_update("No entries found in the Wikelo database.")
                return "No entries found."
            # 2. Map every searchable text to the row it came from. NULL/empty
            #    columns are skipped; the first row wins on duplicates.
            candidates = {}
            for row in rows:
                mission, ship_name, _components, _costs, _rewards, armor_name = row
                for text in (mission, ship_name, armor_name):
                    if text:
                        candidates.setdefault(text, row)
            # 3. Find the best fuzzy match against all searchable strings.
            best_match = _best_fuzzy_match(mission_ship_or_armor_name, candidates)
            if not best_match:
                await emitter.error_update(f"No close match found for '{mission_ship_or_armor_name}'.")
                return f"No close match found for '{mission_ship_or_armor_name}'."
            matched_string, score = best_match
            # 4. Build the output from the row that produced the match.
            mission, ship_name, components, costs, rewards, armor_name = candidates[matched_string]
            result_text = (
                f"**Mission:** {mission}\n"
                f"**Ship Name:** {ship_name}\n"
                f"**Components:** {components}\n"
                f"**Costs:** {costs}\n"
                f"**Rewards:** {rewards}\n"
                f"**Armor Name:** {armor_name}"
            )
            await emitter.success_update(
                f"Found a close match ({matched_string}) with score {score}."
            )
        except Exception as exc:
            # Catch-all on purpose: failures are returned as text, not raised.
            result_text = f"Error retrieving Wikelo information: {exc}"
            await emitter.error_update(result_text)
        print(result_text)
        return "If possible, also mention where to find the materials needed.\n" + result_text
if __name__ == "__main__":
    # Manual smoke test: run one Wikelo lookup; the tool prints its result.
    asyncio.run(Tools().fetch_wikelo_information("Space Navy"))