#!/usr/bin/env python3
import asyncio
import json
import sqlite3
from contextlib import closing
from typing import Any, Callable

import requests
from bs4 import BeautifulSoup
from fuzzywuzzy import process

# Seconds to wait for the wiki before giving up. Without a timeout,
# requests.get can block its worker thread indefinitely.
_REQUEST_TIMEOUT_SECONDS = 30

# A fuzzy-match score must be strictly greater than this to be accepted when
# resolving user-supplied names against the local databases.
_MATCH_THRESHOLD = 60

# NOTE(review): the `Tools` class name and the `__event_emitter__` parameters
# suggest this module is loaded by a tool-hosting framework that may inspect
# the public methods of `Tools` -- confirm. For that reason the shared helpers
# below are module-level functions rather than extra methods on `Tools`.


class _WikiPageError(Exception):
    """Raised when the wiki API answers but cannot supply the page's HTML."""


async def _fetch_page_html(api_url, page_title):
    """Return the parsed HTML of a wiki page via the MediaWiki ``parse`` action.

    api_url: Full URL of the wiki's ``api.php`` endpoint.
    page_title: Title of the page to parse (spaces already replaced by "_").

    Raises ``_WikiPageError`` when the API reports an error or returns no
    content. ``requests.exceptions.RequestException`` and
    ``json.JSONDecodeError`` from the HTTP call are left to the caller so each
    call site can keep its own wording for those failures.
    """
    params = {
        "action": "parse",
        "page": page_title,
        "format": "json",
        "prop": "text",  # We only need the parsed HTML content
    }
    # requests is blocking, so run it in a worker thread.
    response = await asyncio.to_thread(
        requests.get, api_url, params=params, timeout=_REQUEST_TIMEOUT_SECONDS
    )
    response.raise_for_status()
    data = response.json()

    if "error" in data:
        raise _WikiPageError(f"API Error for {page_title}: {data['error']['info']}")

    html_content = data.get("parse", {}).get("text", {}).get("*", "")
    if not html_content:
        raise _WikiPageError(f"No content found for {page_title}.")
    return html_content


async def _fetch_page_html_for_tool(emitter, api_url, page_title):
    """Fetch a wiki page for a tool call, reporting progress and failures.

    Returns ``(html, None)`` on success and ``(None, error_message)`` on
    failure. On failure the error has already been sent through ``emitter``.
    """
    await emitter.progress_update(f"Fetching data from {page_title}...")
    try:
        return await _fetch_page_html(api_url, page_title), None
    except _WikiPageError as e:
        error_message = str(e)
    except requests.exceptions.RequestException as e:
        error_message = f"Error fetching data for {page_title}: {e}"
    except json.JSONDecodeError:
        error_message = f"Error decoding JSON for {page_title}."
    await emitter.error_update(error_message)
    return None, error_message


def _query_rows(db_file, query, params=()):
    """Run ``query`` against the SQLite file ``db_file`` and return all rows.

    ``closing`` guarantees the connection is released even when the query
    raises; a bare ``with sqlite3.connect(...)`` only manages the transaction
    and does not close the connection.
    """
    with closing(sqlite3.connect(db_file)) as conn:
        return conn.execute(query, params).fetchall()


def _format_price(raw_price, zero_label):
    """Render a price as "<int> aUEC", or ``zero_label`` when it is zero."""
    amount = int(raw_price)
    return zero_label if amount == 0 else f"{amount} aUEC"


def _iter_ship_rows(rows, min_cells):
    """Yield ``(ship_name, cells)`` for each usable wiki table row.

    A row is skipped when it has fewer than ``min_cells`` ``<td>`` cells or
    when its second cell has no link carrying a ``title`` attribute, which is
    where the ship name is read from.
    """
    for row in rows:
        cells = row.find_all("td")
        if len(cells) < min_cells:
            continue
        ship_link = cells[1].find("a")
        if not ship_link or not ship_link.get("title"):
            continue
        yield ship_link.get("title").strip(), cells


async def _render_ship_report(ship_name, info, emitter, progress_suffix):
    """Format the dict returned by ``get_information.fetch_infos`` as text.

    ship_name: Name used in the report heading.
    info: Dict with optional "general" and "specifications" entries.
    emitter: ``EventEmitter`` that receives progress updates.
    progress_suffix: Text appended to the progress messages, e.g. "..." or
        " for <ship>".

    Returns the multi-line report string.
    """
    output_lines = [f"Information for {ship_name}:"]

    general = info.get("general")
    if general:
        await emitter.progress_update(
            f"Processing general information{progress_suffix}"
        )
        output_lines.append("\n--- General Information ---")
        for key, value in general.items():
            if isinstance(value, list):
                output_lines.append(f"{key}: {', '.join(value)}")
                continue
            if "Size" in key:
                # Only print the first word for size-related keys
                value = value.split()[0] if value else ""
            if "Stowage" in key:
                # Replace 'Stowage' with 'Storage'
                key = key.replace("Stowage", "Storage")
            output_lines.append(f"{key}: {value}")

    specifications = info.get("specifications")
    if specifications:
        await emitter.progress_update(f"Processing specifications{progress_suffix}")
        output_lines.append("\n--- Specifications ---")
        for spec_area, details in specifications.items():
            if not details:
                continue
            output_lines.append(f"\n[{spec_area}]")
            for category, items in details.items():
                output_lines.append(f" {category}:")
                output_lines.extend(f" - {item}" for item in items)

    return "\n".join(output_lines)


async def _lookup_prices(emitter, kind, requested_name, fetch_names, db_file, query):
    """Fuzzy-match ``requested_name`` and report its buy/sell prices.

    emitter: ``EventEmitter`` that receives status updates.
    kind: Word used in messages, "commodity" or "item".
    requested_name: Name as typed by the caller.
    fetch_names: Async callable returning all known names, one per line.
    db_file: Path of the SQLite file to query.
    query: SQL with one ``?`` placeholder for the matched name; it must select
        buy price, sell price, terminal name and item name, in that order.

    Returns the result text, or a message describing why nothing was found.
    Never raises: any failure is reported as the returned message.
    """
    try:
        await emitter.progress_update(
            f"Fetching {kind} names from the database to find a match for '{requested_name}'"
        )
        # The names are returned as a single string, split it into a list
        candidates = (await fetch_names()).splitlines()
        # Skip the matcher on an empty list so "no candidates" is simply None.
        best_match = (
            process.extractOne(requested_name, candidates) if candidates else None
        )

        if not best_match:
            result_string = (
                f"Could not find a confident match for {kind} '{requested_name}'."
            )
            await emitter.error_update(result_string)
        elif best_match[1] <= _MATCH_THRESHOLD:
            result_string = f"Could not find a confident match for {kind} '{requested_name}'. Best guess was '{best_match[0]}' with {best_match[1]}% confidence."
            await emitter.error_update(result_string)
        else:
            matched_name = best_match[0]
            await emitter.success_update(
                f"Found a close match for '{requested_name}': {matched_name}"
            )
            await emitter.progress_update(
                f"Fetching buy and sell prices for '{matched_name}'"
            )
            rows = _query_rows(db_file, query, (matched_name,))
            await emitter.progress_update(f"Processing results for '{matched_name}'")
            if rows:
                # The formatted prices already carry their unit, so the line
                # itself must not add another " aUEC".
                result_string = "\n".join(
                    f"Item: {row[3]}, "
                    f"Buy Price: {_format_price(row[0], 'Not buyable')}, "
                    f"Sell Price: {_format_price(row[1], 'not sellable')}, "
                    f"Terminal: {row[2]}"
                    for row in rows
                )
                await emitter.success_update(
                    f"Successfully fetched buy and sell prices for '{matched_name}'"
                )
            else:
                result_string = f"No price data found for '{matched_name}'."
                await emitter.error_update(result_string)
    except Exception as e:
        # Tool boundary: always hand back a message instead of raising.
        result_string = (
            f"An error occurred while fetching information for {requested_name}: {e}"
        )
        await emitter.error_update(result_string)

    print(result_string)
    return result_string


class EventEmitter:
    """Thin wrapper that sends "status" events to an optional async callback."""

    def __init__(self, event_emitter: Callable[[dict], Any] = None):
        # When no callback is supplied every emit call is a no-op.
        self.event_emitter = event_emitter

    async def progress_update(self, description):
        """Emit an "in_progress" status that is not yet done."""
        await self.emit(description)

    async def error_update(self, description):
        """Emit an "error" status marked as done."""
        await self.emit(description, "error", True)

    async def success_update(self, description):
        """Emit a "success" status marked as done."""
        await self.emit(description, "success", True)

    async def emit(self, description="Unknown State", status="in_progress", done=False):
        """Send one status event to the callback, if there is one."""
        if self.event_emitter:
            await self.event_emitter(
                {
                    "type": "status",
                    "data": {
                        "status": status,
                        "description": description,
                        "done": done,
                    },
                }
            )


class get_information:
    """Data access for the starcitizen.tools wiki and the local SQLite files."""

    def __init__(self):
        self.base_url = "https://starcitizen.tools"
        self.db_path = "/app/sc_databases"

    async def get_all_vehicle_names(self):
        """Fetches all vehicle names from the pledge ship/vehicle categories using the MediaWiki API.

        Returns a sorted list of unique page titles, or an empty list when
        nothing could be fetched. A failed request ends that category only.
        """
        api_url = f"{self.base_url}/api.php"
        vehicle_names = set()

        for category in ("Category:Pledge ships", "Category:Pledge vehicles"):
            params = {
                "action": "query",
                "format": "json",
                "list": "categorymembers",
                "cmtitle": category,
                "cmlimit": "max",  # Use max limit (500)
                "cmprop": "title",
            }
            while True:
                try:
                    response = await asyncio.to_thread(
                        requests.get,
                        api_url,
                        params=params,
                        timeout=_REQUEST_TIMEOUT_SECONDS,
                    )
                    response.raise_for_status()
                    data = response.json()
                except requests.exceptions.RequestException as e:
                    print(f"Error fetching vehicle list for {category}: {e}")
                    break  # Stop processing this category
                except json.JSONDecodeError:
                    print(f"Error decoding JSON from response for {category}.")
                    break  # Stop processing this category

                members = data.get("query", {}).get("categorymembers", [])
                vehicle_names.update(member["title"] for member in members)

                # Check for continuation to get the next page of results
                continuation = data.get("continue", {}).get("cmcontinue")
                if not continuation:
                    break  # No more pages
                params["cmcontinue"] = continuation

        if not vehicle_names:
            print("No vehicle names found.")
            return []

        return sorted(vehicle_names)

    async def get_closest_vehicle_name(self, vehicle_name):
        """Finds the closest matching vehicle name using fuzzy matching.

        Returns the best-matching wiki title, or None when no names are
        available to match against.
        """
        all_vehicle_names = await self.get_all_vehicle_names()
        if not all_vehicle_names:
            return None

        best_match = process.extractOne(vehicle_name, all_vehicle_names)
        return best_match[0] if best_match else None

    async def fetch_infos(self, ship_name):
        """Fetches ship information from the Star Citizen wiki using the MediaWiki API.

        Returns a dict with "general" (infobox data) and "specifications"
        (component tabs), or None when the ship or its page cannot be
        retrieved. Failures are printed rather than raised.
        """
        closest_name = await self.get_closest_vehicle_name(ship_name)
        if not closest_name:
            print(f"No matching vehicle found for {ship_name}.")
            return None

        # Use the closest name found for the API call
        page_title = closest_name.replace(" ", "_")
        try:
            html_content = await _fetch_page_html(
                f"{self.base_url}/api.php", page_title
            )
        except _WikiPageError as e:
            print(e)
            return None
        except requests.exceptions.RequestException as e:
            print(f"Error fetching data for {page_title}: {e}")
            return None
        except json.JSONDecodeError:
            print(f"Error decoding JSON from response for {page_title}.")
            return None

        soup = BeautifulSoup(html_content, "html.parser")
        return {
            "general": await self._extract_infobox_data(soup),
            "specifications": await self._extract_specifications(soup),
        }

    async def _extract_infobox_data(self, soup):
        """Extracts data from the infobox.

        Returns a dict mapping each infobox label to its text. Labels
        containing "loaner" map to a list of the linked ship names instead.
        """
        infobox_data = {}
        infobox = soup.find("details", class_="infobox")
        if not infobox:
            return infobox_data

        for item in infobox.find_all("div", class_="infobox__item"):
            label_tag = item.find("div", class_="infobox__label")
            data_tag = item.find("div", class_="infobox__data")
            if not (label_tag and data_tag):
                continue

            label = label_tag.get_text(strip=True)
            if "loaner" in label.lower():
                # For loaners, get all ship names
                value = [a.get_text(strip=True) for a in data_tag.find_all("a")]
            else:
                value = data_tag.get_text(separator=" ", strip=True)
            infobox_data[label] = value

        return infobox_data

    async def _extract_specifications(self, soup):
        """Extracts data from the specifications tabs.

        Returns ``{tab name: {category: ["<count> <size> <title>", ...]}}``.
        Tabs without a matching label link are skipped.
        """
        specifications = {}

        # Find all specification tabs like "Avionics & Systems", "Weaponry", etc.
        for panel in soup.select("div.tabber > section > article.tabber__panel"):
            panel_id = panel.get("id", "")
            tab_name_tag = soup.find("a", {"aria-controls": panel_id})
            if not tab_name_tag:
                continue

            tab_name = tab_name_tag.get_text(strip=True)
            tab_specs = specifications[tab_name] = {}

            # Find all component groups in the panel
            for group in panel.find_all("div", class_="template-components__section"):
                label_tag = group.find("div", class_="template-components__label")
                if not label_tag:
                    continue
                category = label_tag.get_text(strip=True)

                components = []
                # Find all component cards in the group
                for card in group.select(".template-component__card"):
                    count_tag = card.select_one(".template-component__count")
                    size_tag = card.select_one(".template-component__size")
                    title_tag = card.select_one(".template-component__title")
                    if count_tag and size_tag and title_tag:
                        count = count_tag.get_text(strip=True)
                        size = size_tag.get_text(strip=True)
                        title = title_tag.get_text(strip=True)
                        components.append(f"{count} {size} {title}")

                if components:
                    # A category can appear more than once (e.g. Thrusters),
                    # so extend any existing entry.
                    tab_specs.setdefault(category, []).extend(components)

        return specifications

    def _distinct_names(self, db_file, query):
        """Return the first column of ``query`` as one newline-joined string."""
        rows = _query_rows(f"{self.db_path}/{db_file}", query)
        # NULL names cannot be joined or matched, so they are left out.
        return "\n".join(row[0] for row in rows if row[0] is not None)

    async def fetch_all_commodity_names(self):
        """
        Fetches all distinct commodity names from the database and returns
        them as a single string, one name per line (in database order).
        """
        return self._distinct_names(
            "commodities.db", "SELECT DISTINCT commodity_name FROM commodity_prices"
        )

    async def fetch_all_item_names(self):
        """
        Fetches all distinct item names from the database and returns them as
        a single string, one name per line (in database order).
        """
        return self._distinct_names(
            "items.db", "SELECT DISTINCT item_name FROM item_prices"
        )

    async def get_all_ship_names_from_fleetyard_db(self):
        """
        Fetches all distinct ship names from the fleet.db database and returns
        them as a single string, one name per line (in database order).
        """
        return self._distinct_names(
            "fleet.db", "SELECT DISTINCT name FROM ship_owner_list"
        )


class Tools:
    def __init__(self):
        self.db_path = "/app/sc_databases"

    async def get_ship_details(
        self, ship_name: str, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetch general information and specifications for a ship from the wiki.
        ship_name: The name of the ship to look up.
        """
        emitter = EventEmitter(__event_emitter__)
        # The API call in fetch_infos handles fuzzy matching and name formatting.
        await emitter.progress_update("Fetching ship information for " + ship_name)
        info = await get_information().fetch_infos(ship_name)

        if not info:
            error_message = f"No information found for {ship_name}."
            print(error_message)
            await emitter.error_update(error_message)
            return error_message

        await emitter.success_update(
            "Successfully fetched ship information for " + ship_name
        )
        await emitter.progress_update("Processing retrieved information...")
        final_output = await _render_ship_report(ship_name, info, emitter, "...")
        print(final_output)
        await emitter.success_update(final_output)
        return final_output

    async def compare_ships(
        self,
        ship_name1: str,
        ship_name2: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ):
        """
        Fetch information for two ships and return both reports, one after the other.
        ship_name1: The name of the first ship.
        ship_name2: The name of the second ship.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update(
            f"Fetching ship information for {ship_name1} and {ship_name2}"
        )

        reports = []
        all_found = True
        for ship_name in (ship_name1, ship_name2):
            info = await get_information().fetch_infos(ship_name)
            if not info:
                # A failed lookup still contributes a section, so the combined
                # result is always defined.
                all_found = False
                message = f"No information found for {ship_name}."
                await emitter.error_update(message)
                reports.append(message)
                continue

            await emitter.success_update(
                f"Successfully fetched ship information for {ship_name}"
            )
            reports.append(
                await _render_ship_report(
                    ship_name, info, emitter, f" for {ship_name}"
                )
            )

        final_output = "\n\n".join(reports)
        if all_found:
            await emitter.success_update(final_output)
        else:
            await emitter.error_update(final_output)
        print(final_output)
        return final_output

    async def get_commodity_prices(
        self, commodity_name: str, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetch commodities from the database by name.
        commodity_name: The name of the commodity to fetch.
        """
        return await _lookup_prices(
            EventEmitter(__event_emitter__),
            "commodity",
            commodity_name,
            get_information().fetch_all_commodity_names,
            self.db_path + "/commodities.db",
            "SELECT price_buy, price_sell, terminal_name, commodity_name FROM commodity_prices WHERE commodity_name = ?",
        )

    async def get_item_prices(
        self, item_name: str, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetch item prices from the database by name.
        item_name: The name of the item to fetch.
        """
        return await _lookup_prices(
            EventEmitter(__event_emitter__),
            "item",
            item_name,
            get_information().fetch_all_item_names,
            self.db_path + "/items.db",
            "SELECT price_buy, price_sell, terminal_name, item_name FROM item_prices WHERE item_name = ?",
        )

    async def get_ship_owners(
        self, ship_name: str, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetches the owners of a specific ship from the fleet.db sqlite database.
        ship_name: The name of the ship to fetch owners for.
        """
        emitter = EventEmitter(__event_emitter__)
        result_string = f"No owners found for ship '{ship_name}'."
        owners_found = False

        try:
            await emitter.progress_update(
                f"Fetching owners for ship '{ship_name}' from the database"
            )
            available_ships = (
                await get_information().get_all_ship_names_from_fleetyard_db()
            )
            # The names are returned as a single string, split it into a list
            ships_list = available_ships.splitlines()
            best_match = (
                process.extractOne(ship_name, ships_list) if ships_list else None
            )

            if best_match and best_match[1] > _MATCH_THRESHOLD:
                matched_ship_name = best_match[0]
                await emitter.success_update(
                    f"Found a close match for '{ship_name}': {matched_ship_name}"
                )
                print(f'found a close match for "{ship_name}": {matched_ship_name}')
                await emitter.progress_update(
                    f"Fetching owners for ship '{matched_ship_name}' from the database"
                )
                rows = _query_rows(
                    self.db_path + "/fleet.db",
                    "SELECT manufacturerName, name, usernames FROM ship_owner_list WHERE name = ?",
                    (matched_ship_name,),
                )
                if rows:
                    # Rows without usernames are skipped so the join cannot fail.
                    owners = [row[2] for row in rows if row[2] is not None]
                    manufacturer_name = rows[0][0]
                    matched_ship_name = rows[0][1]
                    result_string = f"Please report these to the user in a bulletpoint list:\nOwners of ship {manufacturer_name} {matched_ship_name}: {', '.join(owners)}"
                    owners_found = True
        except Exception as e:
            # Tool boundary: always hand back a message instead of raising.
            result_string = (
                f"An error occurred while fetching owners for {ship_name}: {e}"
            )

        # Finish with a "done" status; a progress update would leave the
        # status open and overwrite an earlier error.
        if owners_found:
            await emitter.success_update(result_string)
        else:
            await emitter.error_update(result_string)
        print(result_string)
        return result_string

    async def list_purchasable_ships(
        self, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetches all buyable ships, their prices, and locations from the Star Citizen Tools wiki.
        """
        emitter = EventEmitter(__event_emitter__)
        api_url = "https://starcitizen.tools/api.php"
        page_title = "Purchasing_ships"

        html_content, error_message = await _fetch_page_html_for_tool(
            emitter, api_url, page_title
        )
        if error_message:
            # Return the reason rather than None or an empty listing.
            print(error_message)
            return error_message

        await emitter.progress_update(f"Parsing data from {page_title}...")
        soup = BeautifulSoup(html_content, "html.parser")
        ship_data = {}

        for table in soup.find_all("table", class_="wikitable"):
            header_row = table.find("tr")
            if not header_row:
                continue
            headers = [th.get_text(strip=True) for th in header_row.find_all("th")]
            # Columns from the fourth onwards are read as one per location.
            location_headers = headers[3:]

            for ship_name, cells in _iter_ship_rows(table.find_all("tr")[1:], 3):
                price = cells[2].get_text(strip=True)
                offers = ship_data.setdefault(ship_name, [])
                # zip stops at the shorter side, so a row wider than the
                # header cannot index past the known locations.
                for location, cell in zip(location_headers, cells[3:]):
                    if "✔" in cell.get_text():
                        offers.append(
                            {
                                "price": price + " aUEC (alpha United Earth Credits)",
                                "location": location,
                            }
                        )

        await emitter.success_update(f"Successfully processed {page_title}.")

        output_lines = []
        for ship_name, locations in sorted(ship_data.items()):
            output_lines.append(f"\n--- {ship_name} ---")
            output_lines.append("Buyable at:")
            for item in locations:
                output_lines.append(
                    f" - Location: {item['location']}, Price: {item['price']}"
                )

        final_output = "\n".join(output_lines)
        await emitter.success_update(f"Found {len(ship_data)} unique buyable ships.")
        print(final_output)
        return final_output

    async def list_rentable_ships(
        self, __event_emitter__: Callable[[dict], Any] = None
    ):
        """
        Fetches all rentable ships, their prices, and locations from the Star Citizen Tools wiki.
        """
        emitter = EventEmitter(__event_emitter__)
        api_url = "https://starcitizen.tools/api.php"
        page_title = "Ship_renting"

        html_content, error_message = await _fetch_page_html_for_tool(
            emitter, api_url, page_title
        )
        if error_message:
            # Return the reason rather than None or an empty listing.
            print(error_message)
            return error_message

        await emitter.progress_update(f"Parsing data from {page_title}...")
        soup = BeautifulSoup(html_content, "html.parser")
        ship_prices = {}
        ship_locations = {}

        for table in soup.find_all("table", class_="wikitable"):
            header_row = table.find("tr")
            if not header_row:
                continue
            headers = [th.get_text(strip=True) for th in header_row.find_all("th")]
            rows = table.find_all("tr")[1:]

            # Table 1: Ship rental prices
            if "1 Day" in headers and "Location" in headers:
                for ship_name, cells in _iter_ship_rows(rows, 8):
                    ship_prices[ship_name] = {
                        "1_day": cells[3].get_text(strip=True),
                        "3_days": cells[4].get_text(strip=True),
                        "7_days": cells[5].get_text(strip=True),
                        "30_days": cells[6].get_text(strip=True),
                    }
            # Table 2: Ship rental locations
            elif "Area18" in headers:
                location_headers = headers[3:]
                for ship_name, cells in _iter_ship_rows(rows, 4):
                    available_at = ship_locations.setdefault(ship_name, [])
                    # zip stops at the shorter side, so a row wider than the
                    # header cannot index past the known locations.
                    for location, cell in zip(location_headers, cells[3:]):
                        if "✔" in cell.get_text():
                            available_at.append(location)

        await emitter.success_update(f"Successfully processed {page_title}.")

        output_lines = []
        for ship_name, locations in sorted(ship_locations.items()):
            if not locations:
                continue
            output_lines.append(f"\n--- {ship_name} ---")
            output_lines.append("Rentable at:")
            prices = ship_prices.get(ship_name, {})
            for location in locations:
                output_lines.append(f" - Location: {location}")
                if prices:
                    output_lines.append(
                        f" - 1 Day: {prices.get('1_day', 'N/A')}, 3 Days: {prices.get('3_days', 'N/A')}, 7 Days: {prices.get('7_days', 'N/A')}, 30 Days: {prices.get('30_days', 'N/A')}"
                    )

        final_output = "\n".join(output_lines)
        await emitter.success_update(
            f"Found {len(ship_locations)} unique rentable ships."
        )
        print(final_output)
        return final_output


if __name__ == "__main__":
    info_printer = Tools()
    asyncio.run(info_printer.get_ship_owners("Perseus"))