pi@raspberrypihole:~ $ cat celestrak-slow.py #!/usr/bin/env python3 import requests import re import time import paramiko import logging import csv from datetime import datetime # --- CONFIGURATION --- INPUT_FILE = '/home/pi/satellites.txt' OUTPUT_FILE = '/home/pi/Celestrak_downloaded_satellites.txt' # SFTP CONFIGURATION SFTP_HOST = 'YourHostNameGoesHere' SFTP_USER = 'YourUserNameGoesHere' SFTP_PASS = 'YourPasswrdNotMineGoesHere' REMOTE_PATH = 'www/david/satnogs' REMOTE_FILENAME = 'jdt_tle.txt' # Setup logging logging.basicConfig( filename='/home/pi/celestrak.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) BASE_QUERY_URL = 'https://celestrak.org/NORAD/elements/gp.php?CATNR={cat_id}&FORMAT=csv' HEADERS = {'User-Agent': 'SkyNet-Trolinger-Station/1.0 (Electronics Engineer Build)'} def format_bstar(bstar_str): try: val = float(bstar_str) if val == 0: return " 00000-0" sci_str = f"{val:.5e}" mantissa, exponent = sci_str.split('e') m_digits = mantissa.replace('.', '').replace('-', '') exponent = int(exponent) + 1 m_fixed = m_digits[:5].ljust(5, '0') sign_char = "-" if val < 0 else " " exp_sign = "-" if exponent < 0 else "+" return f"{sign_char}{m_fixed}{exp_sign}{abs(exponent):1d}" except: return " 00000-0" def format_mean_motion_dot(dot_str): try: val = float(dot_str) sign_char = "-" if val < 0 else " " fraction = f"{abs(val):.8f}".split('.')[1] return f"{sign_char}.{fraction[:8]}" except: return " .00000000" def csv_to_tle_block(custom_name, row): # Core variables mapped from CelesTrak's array indices obj_id = row[1] epoch_str = row[2] mean_motion = float(row[3]) eccentricity = float(row[4]) inclination = float(row[5]) ra_of_asc_node = float(row[6]) arg_of_pericenter = float(row[7]) mean_anomaly = float(row[8]) classification = row[10] norad_cat_id = row[11].zfill(5) el_set_num = row[12].strip().zfill(3) rev_at_epoch = int(row[13]) bstar_raw = row[14] mm_dot_raw = row[15] # Time calculations dt = datetime.fromisoformat(epoch_str) year_short = f"{dt.year % 100:02d}" day_of_year = dt.timetuple().tm_yday fraction = (dt.hour * 3600 + dt.minute * 60 + dt.second + dt.microsecond / 1e6) / 86400.0 epoch_fraction = day_of_year + fraction # Line 1 layout clean_id = obj_id.replace('-', '') if len(clean_id) >= 7 and clean_id.startswith('20'): clean_id = clean_id[2:] int_desig = clean_id.ljust(8)[:8] formatted_mm_dot = format_mean_motion_dot(mm_dot_raw) formatted_bstar = format_bstar(bstar_raw) l1_base = f"1 {norad_cat_id}{classification} {int_desig} {year_short}{epoch_fraction:012.8f} {formatted_mm_dot} 00000-0 {formatted_bstar} 0 {el_set_num.rjust(3)}" csum1 = sum(int(c) for c in l1_base if c.isdigit()) + l1_base.count('-') l1 = f"{l1_base}{(csum1 % 10):1d}" # Line 2 layout ecc_fixed = f"{eccentricity:.7f}".split('.')[1][:7] inc_str = f"{inclination:8.4f}"[-8:] raan_str = f"{ra_of_asc_node:8.4f}"[-8:] argp_str = f"{arg_of_pericenter:8.4f}"[-8:] anom_str = f"{mean_anomaly:8.4f}"[-8:] mm_str = f"{mean_motion:11.8f}"[-11:] rev_str = f"{rev_at_epoch:5d}"[-5:] l2_base = f"2 {norad_cat_id} {inc_str} {raan_str} {ecc_fixed} {argp_str} {anom_str} {mm_str}{rev_str}" csum2 = sum(int(c) for c in l2_base if c.isdigit()) + l2_base.count('-') l2 = f"{l2_base}{(csum2 % 10):1d}" return f"{custom_name}\n{l1}\n{l2}\n" def check_maintenance_window(): while True: now = datetime.now() m = now.minute is_restricted = (m >= 55 or m <= 5) or (14 <= m <= 16) or (25 <= m <= 35) or (44 <= m <= 46) if is_restricted: print(f"--- [!] CelesTrak Pause: Maintenance Window ({now.strftime('%H:%M')}) ---", end='\r') time.sleep(30) else: break def download_and_upload(): logging.info("Starting CelesTrak GP/CSV polite sync with native 3LE parsing.") found_count = 0 try: with open(INPUT_FILE, 'r') as f: lines = [line.strip() for line in f if line.strip()] except FileNotFoundError: logging.error(f"Input file {INPUT_FILE} not found.") return # Reset output file buffer open(OUTPUT_FILE, 'w').close() total_sats = len(lines) for index, line in enumerate(lines): match = re.match(r'^(\d+).*?#\s*(.*)', line) if match: cat_id = match.group(1) custom_name = match.group(2).strip() check_maintenance_window() try: response = requests.get(BASE_QUERY_URL.format(cat_id=cat_id), headers=HEADERS, timeout=20) if response.status_code == 200 and response.text.strip(): raw_lines = response.text.strip().splitlines() reader = csv.reader(raw_lines) rows = list(reader) if len(rows) > 0: # Extract data row skipping headers if present data_row = rows[0] if rows[0][0] != "OBJECT_NAME" else (rows[1] if len(rows) > 1 else None) if data_row and len(data_row) > 15: # Reconstruct standard, fixed-width tracking elements clean_3le_output = csv_to_tle_block(custom_name, data_row) with open(OUTPUT_FILE, 'a') as f: f.write(clean_3le_output) logging.info(f"[{index+1}/{total_sats}] Rebuilt 3LE block for {cat_id} ({custom_name})") found_count += 1 else: logging.warning(f"CelesTrak: ID {cat_id} had no valid tracking row.") else: logging.warning(f"CelesTrak: ID {cat_id} failed. Status: {response.status_code}") except Exception as e: logging.error(f"CelesTrak Error on {cat_id}: {e}") if index < total_sats - 1: time.sleep(2) if __name__ == "__main__": download_and_upload()