Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 131 additions & 3 deletions Lucy.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,66 @@
#!/usr/bin/env python3

import curses
import json
import os
import subprocess
import sys
import threading
import urllib.request

MIN_TERM_HEIGHT = 15
MIN_TERM_WIDTH = 65

# Latest published release, compared against version.txt to flag updates.
GITHUB_LATEST_RELEASE_URL = "https://api.github.com/repos/Sentience-Robotics/lucy_ws/releases/latest"

def is_installed():
"""True when the workspace has been built (mirrors launch_lucy.sh's check)."""
return os.path.isfile("install/setup.bash")

def get_local_version():
try:
with open("version.txt", "r") as f:
return f.read().strip()
except OSError:
return None

def _version_tuple(v):
"""Parse 'vX.Y.Z' into a comparable tuple of ints, or None if unparseable."""
if not v:
return None
parts = []
for part in v.strip().lstrip("vV").split("."):
num = "".join(ch for ch in part if ch.isdigit())
if num == "":
break
parts.append(int(num))
return tuple(parts) or None

def fetch_update_info(update_info, local_version):
"""Best-effort GitHub check run in a background thread: flag update_info when a
newer release exists. Always sets 'done' so the TUI stops polling, even offline."""
try:
req = urllib.request.Request(
GITHUB_LATEST_RELEASE_URL,
headers={"Accept": "application/vnd.github+json", "User-Agent": "lucy-launcher"},
)
with urllib.request.urlopen(req, timeout=4) as resp:
data = json.load(resp)
latest = (data.get("tag_name") or "").strip()
lt, ct = _version_tuple(latest), _version_tuple(local_version)
if lt and ct:
n = max(len(lt), len(ct))
lt += (0,) * (n - len(lt))
ct += (0,) * (n - len(ct))
if lt > ct:
update_info["latest"] = latest
update_info["update_available"] = True
except Exception:
pass
finally:
update_info["done"] = True

def get_dev_mode():
if not os.path.exists(".env"):
return False
Expand Down Expand Up @@ -62,8 +115,43 @@ def run_command(command, interactive=False):
print(f"An error occurred: {e}")
return -1

def main_tui(stdscr):
"""The main curses TUI function. Returns the command to run."""
def not_installed_screen(stdscr):
"""First-run screen shown when the workspace isn't built yet.
Returns True to install, False to close. (Terminal size is guaranteed by the
pre-check in __main__.)"""
curses.curs_set(0)
stdscr.nodelay(0)
stdscr.timeout(-1)

while True:
stdscr.clear()
h, w = stdscr.getmaxyx()
title = "Lucy Workspace Manager"
stdscr.addstr(0, max(0, (w - len(title)) // 2), title, curses.A_BOLD)

lines = [
("Lucy is not installed on this machine.", curses.A_BOLD),
("", curses.A_NORMAL),
("Press ENTER to install it.", curses.A_NORMAL),
("Press Q or ESC to close.", curses.A_DIM),
]
start = max(2, h // 2 - len(lines) // 2)
for i, (text, attr) in enumerate(lines):
stdscr.addstr(start + i, max(0, (w - len(text)) // 2), text, attr)
stdscr.refresh()

key = stdscr.getch()
if key in (ord('\n'), ord('\r')):
return True
if key in (ord('q'), ord('Q'), 27): # q / ESC
return False

def main_tui(stdscr, update_info):
"""The main curses TUI function. Returns the command to run.

update_info is a shared dict the background release check writes into; while it
is still running ('done' unset) we poll so the update highlight can appear
without a keypress."""
h, w = stdscr.getmaxyx()
if h < MIN_TERM_HEIGHT or w < MIN_TERM_WIDTH:
return "TerminalTooSmall"
Expand All @@ -74,6 +162,7 @@ def main_tui(stdscr):
curses.start_color()
curses.use_default_colors()
curses.init_pair(1, curses.COLOR_CYAN, -1)
curses.init_pair(2, curses.COLOR_YELLOW, -1)

is_dev_mode = get_dev_mode()
current_idx = 0
Expand All @@ -95,13 +184,22 @@ def main_tui(stdscr):
if option == "Developer Mode":
checkbox = "[x]" if is_dev_mode else "[ ]"
stdscr.addstr(2 + i, 4, f"{prefix}{checkbox} {option}")
elif option == "Install/Update" and update_info.get("update_available"):
# Highlight in yellow when a newer release than version.txt exists.
text = f"{prefix}{option} (new release {update_info.get('latest', '')} available)"
stdscr.addstr(2 + i, 4, text, curses.color_pair(2) | curses.A_BOLD)
else:
stdscr.addstr(2 + i, 4, f"{prefix}{option}")

stdscr.addstr(h - 2, 2, "Enter/Space: Select/Toggle | Up/Down: Navigate", curses.A_DIM)
stdscr.refresh()

# Block once the release check has finished; until then poll so the
# update highlight shows up on its own.
stdscr.timeout(-1 if update_info.get("done") else 200)
key = stdscr.getch()
if key == -1:
continue

if key == curses.KEY_UP:
current_idx = (current_idx - 1 + len(options)) % len(options)
Expand Down Expand Up @@ -140,10 +238,40 @@ def check_initial_size():
print(f"Please increase the terminal size to at least {MIN_TERM_WIDTH}x{MIN_TERM_HEIGHT} characters.", file=sys.stderr)
sys.exit(1)

# First run: nothing built yet — offer to install before showing the menu.
if not is_installed():
try:
wants_install = curses.wrapper(not_installed_screen)
except KeyboardInterrupt:
print("\nExiting.")
sys.exit(0)
except curses.error as e:
print(f"A terminal error occurred: {e}", file=sys.stderr)
print("This might be due to resizing the window. Please restart.", file=sys.stderr)
sys.exit(1)
if not wants_install:
sys.exit(0)
rc = run_command(["./install.sh"], interactive=False)
if rc != 0:
print(f"\n--- Install finished with exit code {rc} ---")
print("Press Enter to exit.")
input()
sys.exit(rc)
print("\n--- Install finished successfully. ---")
print("Press Enter to continue to the menu.")
input()

# Kick off the release check in the background so the menu opens instantly;
# the menu polls update_info until the check finishes.
update_info = {"done": False}
threading.Thread(
target=fetch_update_info, args=(update_info, get_local_version()), daemon=True
).start()

while True:
task = None
try:
task = curses.wrapper(main_tui)
task = curses.wrapper(main_tui, update_info)
except KeyboardInterrupt:
print("\nExiting.")
sys.exit(0)
Expand Down
Loading