Initial release of scripts and knowledge base

This commit is contained in:
2025-08-15 11:04:40 +02:00
parent 3c5b7c63ab
commit 027de4192c
24 changed files with 3307 additions and 1 deletions

108
llm_tools/fleetyard.py Normal file
View File

@@ -0,0 +1,108 @@
#!/usr/bin/env python3
import requests
import sqlite3
import configparser
from collections import defaultdict
import json
# Testing out the fleetyard API
class FleetyardAPI:
def __init__(self, base_url):
self.base_url = base_url
self.session = requests.Session()
def create_session(self, ini_path="fleetyard_login.ini"):
"""Create a new session using credentials from an INI file."""
config = configparser.ConfigParser()
config.read(ini_path)
login = config.get('login', 'username', fallback=None)
password = config.get('login', 'password', fallback=None)
remember_me = config.getboolean('login', 'rememberMe', fallback=True)
if not login or not password:
raise ValueError("Missing login or password in fleetyard_login.ini")
payload = {
"login": login,
"password": password,
"rememberMe": remember_me,
}
response = self.session.post(f"{self.base_url}/sessions", json=payload)
response.raise_for_status()
return response.json()
def get_igns_fleet(self):
"""Get the IGNs fleet."""
fleet_url = f"{self.base_url}/fleets/igns/vehicles/export"
response = self.session.get(fleet_url)
response.raise_for_status()
return response.json()
def process_fleet_data(fleet_data):
"""Groups fleet data by ship and aggregates owners."""
ship_owners = defaultdict(list)
for ship in fleet_data:
if "username" in ship:
key = (ship["manufacturerName"], ship["name"])
ship_owners[key].append(ship["username"])
return ship_owners
def store_in_database(ship_owners):
"""Stores the processed fleet data in a SQLite database."""
db_file = "fleet.db"
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
# Create table
cursor.execute('''
CREATE TABLE IF NOT EXISTS ship_owner_list (
manufacturerName TEXT,
name TEXT,
usernames TEXT,
PRIMARY KEY (manufacturerName, name)
)
''')
# Clear existing data to prevent duplicates on re-runs
cursor.execute('DELETE FROM ship_owner_list')
# Insert new data
for (manufacturer, ship_name), owners in ship_owners.items():
usernames_str = ", ".join(sorted(owners))
cursor.execute(
"INSERT INTO ship_owner_list (manufacturerName, name, usernames) VALUES (?, ?, ?)",
(manufacturer, ship_name, usernames_str)
)
conn.commit()
conn.close()
print(f"Data successfully stored in {db_file}")
if __name__ == "__main__":
base_url = "https://api.fleetyards.net/v1"
api = FleetyardAPI(base_url)
try:
# Create a session
session_response = api.create_session()
print("Session created.")
# Get the IGNs fleet
if session_response.get("code") == "success":
fleet_json = api.get_igns_fleet()
print("IGNs fleet data retrieved.")
# Process the data
processed_data = process_fleet_data(fleet_json)
# Store in database
store_in_database(processed_data)
else:
print("Failed to create session, cannot retrieve fleet.")
except requests.exceptions.HTTPError as e:
print(f"An HTTP error occurred: {e}")
print(f"Response body: {e.response.text}")
except requests.exceptions.RequestException as e:
print(f"A request error occurred: {e}")

View File

@@ -0,0 +1,168 @@
import requests
import sqlite3
import time
import schedule
from datetime import datetime
# --- Configuration ---
API_URL = "https://api.uexcorp.space/2.0/commodities_prices_all"
with open("uex_api_key", "r") as f:
BEARER_TOKEN = f.read().strip()
DB_NAME = "commodities.db"
TABLE_NAME = "commodity_prices"
def setup_database():
"""
Sets up the SQLite database and creates the table if it doesn't exist.
The table uses a composite primary key (id_commodity, id_terminal)
to ensure each commodity at each terminal has only one latest entry.
"""
conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()
# Using "IF NOT EXISTS" prevents errors on subsequent runs.
# The schema is derived from your provided image.
# We use INSERT OR REPLACE later, so a primary key is important.
# (id_commodity, id_terminal) is a good candidate for a unique key.
cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
id INTEGER,
id_commodity INTEGER,
id_terminal INTEGER,
price_buy REAL,
price_buy_avg REAL,
price_sell REAL,
price_sell_avg REAL,
scu_buy REAL,
scu_buy_avg REAL,
scu_sell_stock REAL,
scu_sell_stock_avg REAL,
scu_sell REAL,
scu_sell_avg REAL,
status_buy INTEGER,
status_sell INTEGER,
date_added INTEGER,
date_modified INTEGER,
commodity_name TEXT,
commodity_code TEXT,
commodity_slug TEXT,
terminal_name TEXT,
terminal_code TEXT,
terminal_slug TEXT,
PRIMARY KEY (id_commodity, id_terminal)
)
''')
conn.commit()
conn.close()
print("Database setup complete. Table 'commodity_prices' is ready.")
def fetch_data_from_api():
"""
Fetches the latest commodity data from the UAX Corp API.
Returns the data as a list of dictionaries or None if an error occurs.
"""
headers = {
"Authorization": f"Bearer {BEARER_TOKEN}"
}
try:
response = requests.get(API_URL, headers=headers)
# Raise an exception for bad status codes (4xx or 5xx)
response.raise_for_status()
data = response.json()
if 'data' in data:
return data['data']
else:
# Handle cases where the structure might be flat
return data
except requests.exceptions.RequestException as e:
print(f"Error fetching data from API: {e}")
return None
def save_data_to_db(data):
"""
Saves the fetched data into the SQLite database.
Uses 'INSERT OR REPLACE' to update existing records for a
commodity/terminal pair or insert new ones.
"""
if not data:
print("No data to save.")
return
conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()
# Prepare data for insertion
records_to_insert = []
for item in data:
# The order of values must match the table schema
records_to_insert.append((
item.get('id'),
item.get('id_commodity'),
item.get('id_terminal'),
item.get('price_buy'),
item.get('price_buy_avg'),
item.get('price_sell'),
item.get('price_sell_avg'),
item.get('scu_buy'),
item.get('scu_buy_avg'),
item.get('scu_sell_stock'),
item.get('scu_sell_stock_avg'),
item.get('scu_sell'),
item.get('scu_sell_avg'),
item.get('status_buy'),
item.get('status_sell'),
item.get('date_added'),
item.get('date_modified'),
item.get('commodity_name'),
item.get('commodity_code'),
item.get('commodity_slug'),
item.get('terminal_name'),
item.get('terminal_code'),
item.get('terminal_slug')
))
# Using executemany is much more efficient than one by one
sql_statement = f'''
INSERT OR REPLACE INTO {TABLE_NAME} (
id, id_commodity, id_terminal, price_buy, price_buy_avg, price_sell,
price_sell_avg, scu_buy, scu_buy_avg, scu_sell_stock, scu_sell_stock_avg,
scu_sell, scu_sell_avg, status_buy, status_sell, date_added, date_modified,
commodity_name, commodity_code, commodity_slug, terminal_name, terminal_code,
terminal_slug
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
'''
cursor.executemany(sql_statement, records_to_insert)
conn.commit()
conn.close()
print(f"Successfully saved/updated {len(records_to_insert)} records to the database.")
def job():
"""The main job function to be scheduled."""
print(f"--- Running job at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ---")
api_data = fetch_data_from_api()
if api_data:
save_data_to_db(api_data)
print("--- Job finished ---")
if __name__ == "__main__":
# 1. Set up the database and table on the first run.
setup_database()
# 2. Run the job immediately once when the script starts.
job()
# 3. Schedule the job to run every hour.
print(f"Scheduling job to run every hour. Press Ctrl+C to exit.")
schedule.every().hour.do(job)
# 4. Run the scheduler loop.
while True:
schedule.run_pending()
time.sleep(1)

162
llm_tools/get_items.py Normal file
View File

@@ -0,0 +1,162 @@
import requests
import sqlite3
import time
import schedule
from datetime import datetime
# --- Configuration for Item Prices ---
API_URL = "https://api.uexcorp.space/2.0/items_prices_all"
with open("uex_api_key", "r") as f:
BEARER_TOKEN = f.read().strip()
DB_NAME = "items.db" # Using a dedicated DB file
TABLE_NAME = "item_prices"
def setup_item_database():
"""
Sets up the SQLite database and creates the item_prices table if it doesn't exist.
"""
conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()
# Schema is derived from the new API documentation.
cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
id INTEGER,
id_item INTEGER,
id_terminal INTEGER,
id_category INTEGER,
price_buy REAL,
price_sell REAL,
date_added INTEGER,
date_modified INTEGER,
item_name TEXT,
item_uuid TEXT,
terminal_name TEXT,
PRIMARY KEY (id_item, id_terminal)
)
''')
conn.commit()
conn.close()
print(f"Database setup complete. Table '{TABLE_NAME}' is ready.")
def fetch_item_data_from_api():
"""
Fetches the latest item price data from the UAX Corp API.
Returns the data as a list of dictionaries or None if an error occurs.
"""
headers = {
"Authorization": f"Bearer {BEARER_TOKEN}"
}
try:
response = requests.get(API_URL, headers=headers)
response.raise_for_status() # Check for HTTP errors
data = response.json()
if 'data' in data:
return data['data']
return data
except requests.exceptions.RequestException as e:
print(f"Error fetching item data from API: {e}")
return None
def sync_data_with_db(data):
"""
Synchronizes the database with the fetched API data.
It de-duplicates the source data, then deletes all old entries and
inserts the new ones in a single transaction.
"""
if not data:
print("No data received from API. Database will not be changed.")
return
# --- De-duplication Step ---
# The API is returning duplicates for (id_item, id_terminal).
# We will process the list and keep only the one with the latest 'date_modified'.
unique_items = {}
for item in data:
key = (item.get('id_item'), item.get('id_terminal'))
if key not in unique_items or item.get('date_modified') > unique_items[key].get('date_modified'):
unique_items[key] = item
# The final list of records to insert is the values of our de-duplicated dictionary.
clean_data = list(unique_items.values())
print(f"Received {len(data)} records from API. After de-duplication, {len(clean_data)} unique records will be processed.")
conn = None
try:
conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()
# --- Start Transaction ---
# 1. Delete all existing records from the table.
cursor.execute(f"DELETE FROM {TABLE_NAME}")
print(f"Cleared all old records from '{TABLE_NAME}'.")
# 2. Prepare the new, clean records for insertion.
records_to_insert = []
for item in clean_data: # Use the clean_data list now
records_to_insert.append((
item.get('id'),
item.get('id_item'),
item.get('id_terminal'),
item.get('id_category'),
item.get('price_buy'),
item.get('price_sell'),
item.get('date_added'),
item.get('date_modified'),
item.get('item_name'),
item.get('item_uuid'),
item.get('terminal_name')
))
# 3. Insert all new records.
sql_statement = f'''
INSERT INTO {TABLE_NAME} (
id, id_item, id_terminal, id_category, price_buy, price_sell,
date_added, date_modified, item_name, item_uuid, terminal_name
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
'''
cursor.executemany(sql_statement, records_to_insert)
# --- Commit Transaction ---
conn.commit()
print(f"Successfully synchronized {len(records_to_insert)} records into the database.")
except sqlite3.Error as e:
print(f"Database error: {e}")
if conn:
print("Rolling back changes.")
conn.rollback()
finally:
if conn:
conn.close()
def item_sync_job():
"""The main job function to be scheduled for syncing item prices."""
print(f"--- Running item sync job at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ---")
api_data = fetch_item_data_from_api()
sync_data_with_db(api_data)
print("--- Item sync job finished ---")
if __name__ == "__main__":
# 1. Set up the database and table on the first run.
setup_item_database()
# 2. Run the job immediately once when the script starts.
item_sync_job()
# 3. Schedule the job to run every hour.
print(f"Scheduling item sync job to run every hour. Press Ctrl+C to exit.")
schedule.every().hour.do(item_sync_job)
# 4. Run the scheduler loop.
while True:
try:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("\nExiting scheduler.")
break

View File

@@ -0,0 +1,4 @@
[login]
username = YOUR_USERNAME
password = YOUR_PASSWORD
rememberMe = true

View File

@@ -0,0 +1 @@
YOUR_API_KEY_HERE

View File

@@ -0,0 +1,792 @@
#!/usr/bin/env python3
import requests, asyncio, json, sqlite3
from bs4 import BeautifulSoup
from fuzzywuzzy import process
from typing import Callable, Any
class EventEmitter:
def __init__(self, event_emitter: Callable[[dict], Any] = None):
self.event_emitter = event_emitter
async def progress_update(self, description):
await self.emit(description)
async def error_update(self, description):
await self.emit(description, "error", True)
async def success_update(self, description):
await self.emit(description, "success", True)
async def emit(self, description="Unknown State", status="in_progress", done=False):
if self.event_emitter:
await self.event_emitter(
{
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
)
class get_information:
def __init__(self):
self.base_url = "https://starcitizen.tools"
self.db_path = "/app/sc_databases"
async def get_all_vehicle_names(self):
"""Fetches all vehicle names from the list of pledge vehicles using the MediaWiki API."""
api_url = f"{self.base_url}/api.php"
vehicle_names = []
categories = ["Category:Pledge ships", "Category:Pledge vehicles"]
for category in categories:
params = {
"action": "query",
"format": "json",
"list": "categorymembers",
"cmtitle": category,
"cmlimit": "max", # Use max limit (500)
"cmprop": "title",
}
while True:
try:
response = await asyncio.to_thread(
requests.get, api_url, params=params
)
response.raise_for_status()
data = response.json()
if "query" in data and "categorymembers" in data["query"]:
for member in data["query"]["categorymembers"]:
vehicle_names.append(member["title"])
# Check for continuation to get the next page of results
if "continue" in data and "cmcontinue" in data["continue"]:
params["cmcontinue"] = data["continue"]["cmcontinue"]
else:
break # No more pages
except requests.exceptions.RequestException as e:
print(f"Error fetching vehicle list for {category}: {e}")
break # Stop processing this category
except json.JSONDecodeError:
print(f"Error decoding JSON from response for {category}.")
break # Stop processing this category
if not vehicle_names:
print("No vehicle names found.")
return []
# Remove duplicates and sort the list
return sorted(list(set(vehicle_names)))
async def get_closest_vehicle_name(self, vehicle_name):
"""Finds the closest matching vehicle name using fuzzy matching."""
all_vehicle_names = await self.get_all_vehicle_names()
# print(f"Total vehicle names found: {len(all_vehicle_names)}")
if not all_vehicle_names:
return None
closest_name, _ = process.extractOne(vehicle_name, all_vehicle_names)
return closest_name
async def fetch_infos(self, ship_name):
"""Fetches ship information from the Star Citizen wiki using the MediaWiki API."""
closest_name = await self.get_closest_vehicle_name(ship_name)
if not closest_name:
print(f"No matching vehicle found for {ship_name}.")
return None
# Use the closest name found for the API call
page_title = closest_name.replace(" ", "_")
api_url = f"{self.base_url}/api.php"
params = {
"action": "parse",
"page": page_title,
"format": "json",
"prop": "text", # We only need the parsed HTML content
}
try:
response = await asyncio.to_thread(requests.get, api_url, params=params)
response.raise_for_status()
data = response.json()
if "error" in data:
print(f"API Error for {page_title}: {data['error']['info']}")
return None
html_content = data.get("parse", {}).get("text", {}).get("*", "")
if not html_content:
print(f"No content found for {page_title}.")
return None
except requests.exceptions.RequestException as e:
print(f"Error fetching data for {page_title}: {e}")
return None
except json.JSONDecodeError:
print(f"Error decoding JSON from response for {page_title}.")
return None
soup = BeautifulSoup(html_content, "html.parser")
info = {}
# Extracting ship information from the parsed HTML
info["general"] = await self._extract_infobox_data(soup)
info["specifications"] = await self._extract_specifications(soup)
return info
async def _extract_infobox_data(self, soup):
"""Extracts data from the infobox."""
infobox_data = {}
infobox = soup.find("details", class_="infobox")
if not infobox:
return infobox_data
items = infobox.find_all("div", class_="infobox__item")
for item in items:
label_tag = item.find("div", class_="infobox__label")
data_tag = item.find("div", class_="infobox__data")
if label_tag and data_tag:
label = label_tag.get_text(strip=True)
# For loaners, get all ship names
if "loaner" in label.lower():
value = [a.get_text(strip=True) for a in data_tag.find_all("a")]
else:
value = data_tag.get_text(separator=" ", strip=True)
infobox_data[label] = value
return infobox_data
async def _extract_specifications(self, soup):
"""Extracts data from the specifications tabs."""
specifications = {}
# Find all specification tabs like "Avionics & Systems", "Weaponry", etc.
tabs = soup.select("div.tabber > section > article.tabber__panel")
for panel in tabs:
panel_id = panel.get("id", "")
tab_name_tag = soup.find("a", {"aria-controls": panel_id})
if not tab_name_tag:
continue
tab_name = tab_name_tag.get_text(strip=True)
specifications[tab_name] = {}
# Find all component groups in the panel
component_groups = panel.find_all(
"div", class_="template-components__section"
)
for group in component_groups:
label_tag = group.find("div", class_="template-components__label")
if not label_tag:
continue
category = label_tag.get_text(strip=True)
components = []
# Find all component cards in the group
component_cards = group.select(".template-component__card")
for card in component_cards:
count_tag = card.select_one(".template-component__count")
size_tag = card.select_one(".template-component__size")
title_tag = card.select_one(".template-component__title")
if count_tag and size_tag and title_tag:
count = count_tag.get_text(strip=True)
size = size_tag.get_text(strip=True)
title = title_tag.get_text(strip=True)
components.append(f"{count} {size} {title}")
if components:
# If the category already exists, append to it (for Thrusters)
if category in specifications[tab_name]:
specifications[tab_name][category].extend(components)
else:
specifications[tab_name][category] = components
return specifications
async def fetch_all_commodity_names(self):
"""
Fetches all commodity names from the database and sort them uniquely and returns a string.
"""
conn = sqlite3.connect(self.db_path + "/commodities.db")
cursor = conn.cursor()
cursor.execute("SELECT DISTINCT commodity_name FROM commodity_prices")
rows = cursor.fetchall()
conn.close()
return_string = "\n".join([row[0] for row in rows])
return return_string
async def fetch_all_item_names(self):
"""
Fetches all item names from the database and sort them uniquely and returns a string.
"""
conn = sqlite3.connect(self.db_path + "/items.db")
cursor = conn.cursor()
cursor.execute("SELECT DISTINCT item_name FROM item_prices")
rows = cursor.fetchall()
conn.close()
return_string = "\n".join([row[0] for row in rows])
return return_string
async def get_all_ship_names_from_fleetyard_db(self):
"""
Fetches all ship names from the fleet.db database and returns a string.
"""
conn = sqlite3.connect(self.db_path + "/fleet.db")
cursor = conn.cursor()
cursor.execute("SELECT DISTINCT name FROM ship_owner_list")
rows = cursor.fetchall()
conn.close()
return_string = "\n".join([row[0] for row in rows])
return return_string
class Tools:
def __init__(self):
self.db_path = "/app/sc_databases"
async def get_ship_details(
self, ship_name: str, __event_emitter__: Callable[[dict], Any] = None
):
emitter = EventEmitter(__event_emitter__)
# The API call in fetch_infos now handles fuzzy matching and name formatting.
# ship_name = await get_information().get_closest_vehicle_name(ship_name)
# ship_name = ship_name.title().replace(" ", "_")
await emitter.progress_update("Fetching ship information for " + ship_name)
info = await get_information().fetch_infos(ship_name)
if info:
await emitter.success_update(
"Successfully fetched ship information for " + ship_name
)
await emitter.progress_update("Processing retrieved information...")
output_lines = []
# Build the output string
output_lines.append(f"Information for {ship_name}:")
if info.get("general"):
await emitter.progress_update("Processing general information...")
output_lines.append("\n--- General Information ---")
for key, value in info["general"].items():
if isinstance(value, list):
output_lines.append(f"{key}: {', '.join(value)}")
else:
if "Size" in key:
# Only print the first word for size-related keys
value = value.split()[0] if value else ""
if "Stowage" in key:
# Replace 'Stowage' with 'Storage':
key = key.replace("Stowage", "Storage")
output_lines.append(f"{key}: {value}")
if info.get("specifications"):
await emitter.progress_update("Processing specifications...")
output_lines.append("\n--- Specifications ---")
for spec_area, details in info["specifications"].items():
if not details:
continue
output_lines.append(f"\n[{spec_area}]")
for category, items in details.items():
output_lines.append(f" {category}:")
for item in items:
output_lines.append(f" - {item}")
final_output = "\n".join(output_lines)
print(final_output)
await emitter.success_update(final_output)
return final_output
else:
error_message = f"No information found for {ship_name}."
print(error_message)
await emitter.error_update(error_message)
return error_message
async def compare_ships(
self,
ship_name1: str,
ship_name2: str,
__event_emitter__: Callable[[dict], Any] = None,
):
# ship_name1 = ship_name1.title().replace(" ", "_")
# ship_name2 = ship_name2.title().replace(" ", "_")
emitter = EventEmitter(__event_emitter__)
await emitter.progress_update(
f"Fetching ship information for {ship_name1} and {ship_name2}"
)
info1 = await get_information().fetch_infos(ship_name1)
if info1:
await emitter.success_update(
f"Successfully fetched ship information for {ship_name1}"
)
output_lines = [f"Information for {ship_name1}:"]
if info1.get("general"):
await emitter.progress_update(
"Processing general information for " + ship_name1
)
output_lines.append("\n--- General Information ---")
for key, value in info1["general"].items():
if isinstance(value, list):
output_lines.append(f"{key}: {', '.join(value)}")
else:
if "Size" in key:
value = value.split()[0] if value else ""
if "Stowage" in key:
key = key.replace("Stowage", "Storage")
output_lines.append(f"{key}: {value}")
if info1.get("specifications"):
await emitter.progress_update(
"Processing specifications for " + ship_name1
)
output_lines.append("\n--- Specifications ---")
for spec_area, details in info1["specifications"].items():
if not details:
continue
output_lines.append(f"\n[{spec_area}]")
for category, items in details.items():
output_lines.append(f" {category}:")
for item in items:
output_lines.append(f" - {item}")
final_output1 = "\n".join(output_lines)
info2 = await get_information().fetch_infos(ship_name2)
if info2:
await emitter.success_update(
f"Successfully fetched ship information for {ship_name2}"
)
output_lines = [f"Information for {ship_name2}:"]
if info2.get("general"):
await emitter.progress_update(
"Processing general information for " + ship_name2
)
output_lines.append("\n--- General Information ---")
for key, value in info2["general"].items():
if isinstance(value, list):
output_lines.append(f"{key}: {', '.join(value)}")
else:
if "Size" in key:
value = value.split()[0] if value else ""
if "Stowage" in key:
key = key.replace("Stowage", "Storage")
output_lines.append(f"{key}: {value}")
if info2.get("specifications"):
await emitter.progress_update(
"Processing specifications for " + ship_name2
)
output_lines.append("\n--- Specifications ---")
for spec_area, details in info2["specifications"].items():
if not details:
continue
output_lines.append(f"\n[{spec_area}]")
for category, items in details.items():
output_lines.append(f" {category}:")
for item in items:
output_lines.append(f" - {item}")
final_output2 = "\n".join(output_lines)
await emitter.success_update(final_output2)
print(final_output1 + "\n\n" + final_output2)
return final_output1 + "\n\n" + final_output2
async def get_commodity_prices(
self, commodity_name: str, __event_emitter__: Callable[[dict], Any] = None
):
"""
Fetch commodities from the database by name.
commodity_name: The name of the commodity to fetch.
"""
emitter = EventEmitter(__event_emitter__)
result_string = f"No information found for commodity '{commodity_name}'."
# First, check for spelling issues and compare it to the list of all commodity names available
try:
await emitter.progress_update(
f"Fetching commodity names from the database to find a match for '{commodity_name}'"
)
all_names = await get_information().fetch_all_commodity_names()
# The names are returned as a single string, split it into a list
names_list = all_names.splitlines()
best_match = process.extractOne(commodity_name, names_list)
if (
best_match and best_match[1] > 60
): # If the match is above 60% confidence
matched_commodity_name = best_match[0]
await emitter.success_update(
f"Found a close match for '{commodity_name}': {matched_commodity_name}"
)
conn = sqlite3.connect(self.db_path + "/commodities.db")
cursor = conn.cursor()
await emitter.progress_update(
f"Fetching buy and sell prices for '{matched_commodity_name}'"
)
cursor.execute(
"SELECT price_buy, price_sell, terminal_name, commodity_name FROM commodity_prices WHERE commodity_name = ?",
(matched_commodity_name,),
)
await emitter.progress_update(
f"Processing results for '{matched_commodity_name}'"
)
rows = cursor.fetchall()
conn.close()
if rows:
output_lines = []
for row in rows:
buy_price = (
"Not buyable"
if int(row[0]) == 0
else f"{int(row[0])} aUEC"
)
sell_price = (
"not sellable"
if int(row[1]) == 0
else f"{int(row[1])} aUEC"
)
output_lines.append(
f"Item: {row[3]}, Buy Price: {buy_price} aUEC, Sell Price: {sell_price} aUEC, Terminal: {row[2]}"
)
result_string = "\n".join(output_lines)
await emitter.success_update(
f"Successfully fetched buy and sell prices for '{matched_commodity_name}'"
)
else:
result_string = (
f"No price data found for '{matched_commodity_name}'."
)
await emitter.error_update(result_string)
else:
result_string = f"Could not find a confident match for commodity '{commodity_name}'. Best guess was '{best_match[0]}' with {best_match[1]}% confidence."
await emitter.error_update(result_string)
except Exception as e:
error_message = f"An error occurred while fetching information for {commodity_name}: {str(e)}"
await emitter.error_update(error_message)
result_string = error_message
print(result_string)
return result_string
async def get_item_prices(
self, item_name: str, __event_emitter__: Callable[[dict], Any] = None
):
"""
Fetch item prices from the database by name.
item_name: The name of the item to fetch.
"""
emitter = EventEmitter(__event_emitter__)
result_string = f"No information found for item '{item_name}'."
# First, check for spelling issues and compare it to the list of all item names available
try:
await emitter.progress_update(
f"Fetching item names from the database to find a match for '{item_name}'"
)
all_names = await get_information().fetch_all_item_names()
# The names are returned as a single string, split it into a list
names_list = all_names.splitlines()
best_match = process.extractOne(item_name, names_list)
if best_match and best_match[1] > 60:
matched_item_name = best_match[0]
await emitter.success_update(
f"Found a close match for '{item_name}': {matched_item_name}"
)
conn = sqlite3.connect(self.db_path + "/items.db")
cursor = conn.cursor()
await emitter.progress_update(
f"Fetching buy and sell prices for '{matched_item_name}'"
)
cursor.execute(
"SELECT price_buy, price_sell, terminal_name, item_name FROM item_prices WHERE item_name = ?",
(matched_item_name,),
)
await emitter.progress_update(
f"Processing results for '{matched_item_name}'"
)
rows = cursor.fetchall()
conn.close()
if rows:
output_lines = []
for row in rows:
buy_price = (
"Not buyable"
if int(row[0]) == 0
else f"{int(row[0])} aUEC"
)
sell_price = (
"not sellable"
if int(row[1]) == 0
else f"{int(row[1])} aUEC"
)
output_lines.append(
f"Item: {row[3]}, Buy Price: {buy_price}, Sell Price: {sell_price}, Terminal: {row[2]}"
)
result_string = "\n".join(output_lines)
await emitter.success_update(
f"Successfully fetched buy and sell prices for '{matched_item_name}'"
)
else:
result_string = f"No price data found for '{matched_item_name}'."
await emitter.error_update(result_string)
else:
result_string = f"Could not find a confident match for item '{item_name}'. Best guess was '{best_match[0]}' with {best_match[1]}% confidence."
await emitter.error_update(result_string)
except Exception as e:
error_message = f"An error occurred while fetching information for {item_name}: {str(e)}"
await emitter.error_update(error_message)
result_string = error_message
print(result_string)
return result_string
async def get_ship_owners(self, ship_name: str, __event_emitter__: Callable[[dict], Any] = None
):
"""
Fetches the owners of a specific ship from the fleet.db sqlite database.
ship_name: The name of the ship to fetch owners for.
"""
emitter = EventEmitter(__event_emitter__)
result_string = f"No owners found for ship '{ship_name}'."
try:
await emitter.progress_update(
f"Fetching owners for ship '{ship_name}' from the database"
)
available_ships = await get_information().get_all_ship_names_from_fleetyard_db()
# The names are returned as a single string, split it into a list
ships_list = available_ships.splitlines()
best_match = process.extractOne(ship_name, ships_list)
if best_match and best_match[1] > 60:
matched_ship_name = best_match[0]
await emitter.success_update(
f"Found a close match for '{ship_name}': {matched_ship_name}"
)
print(f'found a close match for "{ship_name}": {matched_ship_name}')
conn = sqlite3.connect(self.db_path + "/fleet.db")
cursor = conn.cursor()
await emitter.progress_update(
f"Fetching owners for ship '{matched_ship_name}' from the database"
)
cursor.execute(
"SELECT manufacturerName, name, usernames FROM ship_owner_list WHERE name = ?",
(matched_ship_name,),
)
rows = cursor.fetchall()
conn.close()
if rows:
owners = [row[2] for row in rows]
manufacturer_name = rows[0][0]
matched_ship_name = rows[0][1]
result_string = f"Please report these to the user in a bulletpoint list:\nOwners of ship {manufacturer_name} {matched_ship_name}: {', '.join(owners)}"
except Exception as e:
error_message = f"An error occurred while fetching owners for {ship_name}: {str(e)}"
await emitter.error_update(error_message)
result_string = error_message
await emitter.progress_update(result_string)
print(result_string)
return result_string
async def list_purchasable_ships(
self, __event_emitter__: Callable[[dict], Any] = None
):
"""
Fetches all buyable ships, their prices, and locations from the Star Citizen Tools wiki.
"""
emitter = EventEmitter(__event_emitter__)
api_url = "https://starcitizen.tools/api.php"
ship_data = {}
page_title = "Purchasing_ships"
await emitter.progress_update(f"Fetching data from {page_title}...")
params = {
"action": "parse",
"page": page_title,
"format": "json",
"prop": "text",
}
try:
response = await asyncio.to_thread(requests.get, api_url, params=params)
response.raise_for_status()
data = response.json()
if "error" in data:
await emitter.error_update(
f"API Error for {page_title}: {data['error']['info']}"
)
return
html_content = data.get("parse", {}).get("text", {}).get("*", "")
if not html_content:
await emitter.error_update(f"No content found for {page_title}.")
return
await emitter.progress_update(f"Parsing data from {page_title}...")
soup = BeautifulSoup(html_content, "html.parser")
tables = soup.find_all("table", class_="wikitable")
for table in tables:
header_row = table.find("tr")
if not header_row:
continue
headers = [th.get_text(strip=True) for th in header_row.find_all("th")]
rows = table.find_all("tr")[1:]
for row in rows:
cells = row.find_all("td")
if not cells or len(cells) < 3:
continue
ship_name_tag = cells[1].find("a")
if not ship_name_tag or not ship_name_tag.get("title"):
continue
ship_name = ship_name_tag.get("title").strip()
price = cells[2].get_text(strip=True)
if ship_name not in ship_data:
ship_data[ship_name] = []
location_headers = headers[3:]
for i, cell in enumerate(cells[3:]):
if "" in cell.get_text():
location = location_headers[i]
ship_data[ship_name].append(
{"price": price + " aUEC (alpha United Earth Credits)", "location": location}
)
await emitter.success_update(f"Successfully processed {page_title}.")
except requests.exceptions.RequestException as e:
await emitter.error_update(f"Error fetching data for {page_title}: {e}")
except json.JSONDecodeError:
await emitter.error_update(f"Error decoding JSON for {page_title}.")
output_lines = []
for ship_name, locations in sorted(ship_data.items()):
output_lines.append(f"\n--- {ship_name} ---")
output_lines.append("Buyable at:")
for item in locations:
output_lines.append(
f" - Location: {item['location']}, Price: {item['price']}"
)
final_output = "\n".join(output_lines)
await emitter.success_update(f"Found {len(ship_data)} unique buyable ships.")
print(final_output)
return final_output
async def list_rentable_ships(
self, __event_emitter__: Callable[[dict], Any] = None
):
"""
Fetches all rentable ships, their prices, and locations from the Star Citizen Tools wiki.
"""
emitter = EventEmitter(__event_emitter__)
api_url = "https://starcitizen.tools/api.php"
ship_prices = {}
ship_locations = {}
page_title = "Ship_renting"
await emitter.progress_update(f"Fetching data from {page_title}...")
params = {
"action": "parse",
"page": page_title,
"format": "json",
"prop": "text",
}
try:
response = await asyncio.to_thread(requests.get, api_url, params=params)
response.raise_for_status()
data = response.json()
if "error" in data:
await emitter.error_update(
f"API Error for {page_title}: {data['error']['info']}"
)
return
html_content = data.get("parse", {}).get("text", {}).get("*", "")
if not html_content:
await emitter.error_update(f"No content found for {page_title}.")
return
await emitter.progress_update(f"Parsing data from {page_title}...")
soup = BeautifulSoup(html_content, "html.parser")
tables = soup.find_all("table", class_="wikitable")
for table in tables:
header_row = table.find("tr")
if not header_row:
continue
headers = [th.get_text(strip=True) for th in header_row.find_all("th")]
rows = table.find_all("tr")[1:]
# Table 1: Ship rental prices
if "1 Day" in headers and "Location" in headers:
for row in rows:
cells = row.find_all("td")
if len(cells) < 8:
continue
ship_name_tag = cells[1].find("a")
if not ship_name_tag or not ship_name_tag.get("title"):
continue
ship_name = ship_name_tag.get("title").strip()
ship_prices[ship_name] = {
"1_day": cells[3].get_text(strip=True),
"3_days": cells[4].get_text(strip=True),
"7_days": cells[5].get_text(strip=True),
"30_days": cells[6].get_text(strip=True),
}
# Table 2: Ship rental locations
elif "Area18" in headers:
location_headers = headers[3:]
for row in rows:
cells = row.find_all("td")
if len(cells) < 4:
continue
ship_name_tag = cells[1].find("a")
if not ship_name_tag or not ship_name_tag.get("title"):
continue
ship_name = ship_name_tag.get("title").strip()
if ship_name not in ship_locations:
ship_locations[ship_name] = []
for i, cell in enumerate(cells[3:]):
if "" in cell.get_text():
ship_locations[ship_name].append(location_headers[i])
await emitter.success_update(f"Successfully processed {page_title}.")
except requests.exceptions.RequestException as e:
await emitter.error_update(f"Error fetching data for {page_title}: {e}")
except json.JSONDecodeError:
await emitter.error_update(f"Error decoding JSON for {page_title}.")
output_lines = []
for ship_name, locations in sorted(ship_locations.items()):
if not locations:
continue
output_lines.append(f"\n--- {ship_name} ---")
output_lines.append("Rentable at:")
prices = ship_prices.get(ship_name, {})
for location in locations:
output_lines.append(f" - Location: {location}")
if prices:
output_lines.append(
f" - 1 Day: {prices.get('1_day', 'N/A')}, 3 Days: {prices.get('3_days', 'N/A')}, 7 Days: {prices.get('7_days', 'N/A')}, 30 Days: {prices.get('30_days', 'N/A')}"
)
final_output = "\n".join(output_lines)
await emitter.success_update(
f"Found {len(ship_locations)} unique rentable ships."
)
print(final_output)
return final_output
if __name__ == "__main__":
info_printer = Tools()
asyncio.run(info_printer.get_ship_owners("Perseus"))