Source code for workforce.utils

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
utils.py — Shared Workforce utilities for workspace identification,
networking, and interacting with the fixed-port multi-tenant server.
"""

import hashlib
import json
import os
import socket
import sys
import tempfile
import urllib.error
import urllib.parse
import urllib.request
import signal
import subprocess
import time

# -----------------------------------------------------------------------------
# Workspace identification
# -----------------------------------------------------------------------------

[docs] def compute_workspace_id(workfile_path: str) -> str: """Compute deterministic workspace ID from absolute Workfile path. Normalizes paths to handle cross-OS and network share compatibility: - Converts to absolute path - Normalizes separators to forward slashes - On Windows, strips drive letters for network paths (//server/share) - Resolves symlinks to canonical path Args: workfile_path: Path to the workflow file Returns: Workspace ID in format ws_<8-char-hash> """ # Get absolute path abs_path = os.path.abspath(workfile_path) # Resolve symlinks to get canonical path try: abs_path = os.path.realpath(abs_path) except (OSError, ValueError): # If realpath fails (e.g., broken symlink), use abspath pass # Normalize path separators to forward slashes for cross-platform consistency normalized_path = abs_path.replace(os.sep, '/') # On Windows, handle network paths by stripping drive letters from UNC paths # \\server\share becomes //server/share for consistency if sys.platform == 'win32' and normalized_path.startswith('//'): # Already a UNC path, keep as-is pass elif sys.platform == 'win32' and len(normalized_path) > 1 and normalized_path[1] == ':': # Local drive path like C:/path - keep as-is for now # Could be enhanced to detect if it's a mapped network drive pass # Compute hash of normalized path path_bytes = normalized_path.encode("utf-8") hash_hex = hashlib.sha256(path_bytes).hexdigest()[:8] return f"ws_{hash_hex}"
[docs] def parse_workspace_url(url: str) -> tuple[str, str] | None: """ Parse a workspace URL to extract server URL and workspace ID. Supports formats: - http://host:port/workspace/ws_abc123 - http://host:port/workspace/ws_abc123/get-graph - host:port/workspace/ws_abc123 Returns: (server_url, workspace_id) tuple or None if not a valid workspace URL """ import re # Add http:// if missing if not url.startswith(('http://', 'https://')): url = 'http://' + url # Match pattern: http://host:port/workspace/ws_XXXXXXXX[/endpoint] # Workspace ID format: ws_ followed by 8+ alphanumeric characters pattern = r'^(https?://[^/]+)/workspace/(ws_[a-zA-Z0-9]{8,})(?:/.*)?$' match = re.match(pattern, url) if match: server_url = match.group(1) workspace_id = match.group(2) return (server_url, workspace_id) return None
[docs] def is_workspace_url(text: str) -> bool: """Check if text looks like a workspace URL.""" return parse_workspace_url(text) is not None
[docs] def looks_like_url(text: str) -> bool: r"""Check if text looks like a URL (but may not be valid). Distinguishes URLs from Windows file paths (C:\...). """ if not text: return False # Check for explicit URL schemes first if text.startswith(('http://', 'https://')): return True # Check for /workspace/ endpoint if '/workspace/' in text: return True # Check for colon, but exclude Windows drive letters (C:, D:, etc.) # Windows drive letters are single letters followed by colon at position 1 if ':' in text: # If colon is at position 1 and text[0] is a single letter, it's a drive letter colon_pos = text.find(':') if colon_pos == 1 and text[0].isalpha(): # This is a Windows drive letter path (C:\...) or relative path with colon # In URL context, we need host:port, so position 1 colon isn't a URL return False # Otherwise, it might be a URL with host:port return True return False
[docs] def get_workspace_url(workspace_id: str, endpoint: str = "") -> str: """Build absolute URL for a workspace endpoint. Discovers or starts the server via resolve_server(). """ server_url = resolve_server() base = f"{server_url}/workspace/{workspace_id}" if endpoint: if not endpoint.startswith("/"): endpoint = "/" + endpoint return base + endpoint return base
# ----------------------------------------------------------------------------- # HTTP POST helper # ----------------------------------------------------------------------------- def _post(base_url: str, endpoint: str, payload: dict | None = None, retry_on_connect_error: bool = False) -> dict: """POST JSON payload to an endpoint. Used by edit, run, and GUI clients. If retry_on_connect_error=True, retry up to 30 times with 0.5s delay between attempts (15s total). This allows time for background server startup, especially on Windows frozen executables. """ if not endpoint.startswith("/"): endpoint = "/" + endpoint url = f"{base_url.rstrip('/')}{endpoint}" data = json.dumps(payload or {}).encode("utf-8") max_retries = 30 if retry_on_connect_error else 1 last_error = None for attempt in range(max_retries): try: req = urllib.request.Request( url, data=data, headers={"Content-Type": "application/json"}, method="POST", ) with urllib.request.urlopen(req) as resp: resp_data = resp.read().decode("utf-8") try: return json.loads(resp_data) except json.JSONDecodeError: raise RuntimeError(f"Server returned non-JSON response: {resp_data}") except urllib.error.HTTPError as e: # HTTP errors (404, 409, 500, etc.) - try to read error response try: error_body = e.read().decode("utf-8") error_data = json.loads(error_body) error_msg = error_data.get("error", str(e)) except: error_msg = str(e) raise RuntimeError(f"Failed to POST to {url}: HTTP Error {e.code} {e.reason}. {error_msg}") except (urllib.error.URLError, ConnectionRefusedError, OSError) as e: # Connection errors - retry if enabled last_error = e if retry_on_connect_error and attempt < max_retries - 1: import time time.sleep(0.5) continue raise RuntimeError(f"Failed to POST to {url}: {e}") except Exception as e: raise RuntimeError(f"Unexpected error POSTing to {url}: {e}") raise RuntimeError(f"Failed to POST to {url} after {max_retries} retries: {last_error}")
[docs] def shell_quote_multiline(script: str) -> str: """Escape single quotes in shell scripts for safe execution.""" return script.replace("'", "'\\''")
[docs] def default_workfile() -> str | None: """Return ./Workfile if it exists, else None.""" path = os.path.join(os.getcwd(), "Workfile") return path if os.path.exists(path) else None
[docs] def get_absolute_path(path: str) -> str: """Convert relative or absolute path to absolute path.""" return os.path.abspath(path)
[docs] def ensure_workfile(path: str | None = None) -> str: """Resolve a workfile path or create a temporary one when absent. Order of precedence: 1) Explicit path provided 2) ./Workfile if it exists 3) New temp file path in the system temp directory (not pre-created) Returns an absolute path suitable for compute_workspace_id. """ if path: return os.path.abspath(path) existing = default_workfile() if existing: return os.path.abspath(existing) fd, temp_path = tempfile.mkstemp(prefix="workforce_tmp_", suffix=".wf.graphml") os.close(fd) # Remove the empty file so the first load can create a valid GraphML try: os.unlink(temp_path) except FileNotFoundError: pass return temp_path
[docs] def is_port_in_use(port: int, host: str = "127.0.0.1") -> bool: """Check if a port is in use. Args: port: Port number to check host: Host to bind to (default: 127.0.0.1) Returns: True if port is in use, False if available """ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: try: s.bind((host, port)) return False # Port is free except OSError: return True # Port is in use
[docs] def runtime_dir() -> str: """Return the directory used for runtime artifacts (~/.workforce).""" path = os.path.join(os.path.expanduser("~"), ".workforce") os.makedirs(path, exist_ok=True) return path
[docs] def pid_file_path() -> str: return os.path.join(runtime_dir(), "server.pid")
[docs] def lock_file_path() -> str: return os.path.join(runtime_dir(), "server.lock")
[docs] def log_file_path(log_dir: str | None = None) -> str: if log_dir: os.makedirs(log_dir, exist_ok=True) return os.path.join(log_dir, "server.log") return os.path.join(runtime_dir(), "server.log")
def _read_pid_file() -> tuple[str, int, int] | None: """Return (host, port, pid) if pid file exists and is parseable.""" path = pid_file_path() if not os.path.exists(path): return None try: with open(path, "r", encoding="utf-8") as f: lines = [line.strip() for line in f.readlines() if line.strip()] if len(lines) < 2: return None host_port = lines[0] if ":" not in host_port: return None host, port_str = host_port.split(":", 1) pid = int(lines[1]) port = int(port_str) return host, port, pid except Exception: return None def _pid_alive(pid: int) -> bool: """Check if a process with given PID is still alive. Handles both Unix and Windows platforms correctly: - Returns False if pid is 0 (invalid/unknown) - Unix: Uses os.kill(pid, 0) signal check - Windows: Uses tasklist with very aggressive timeout """ if pid == 0: # Invalid or unknown PID return False try: if sys.platform != "win32": # Unix: os.kill(pid, 0) checks liveness without killing os.kill(pid, 0) return True else: # Windows: Use tasklist with extremely short timeout # tasklist can hang on some systems, so fail fast try: result = subprocess.run( ["tasklist", "/FI", f"PID eq {pid}"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=0.1 ) # tasklist returns 0 if process found, 1 if not return result.returncode == 0 except subprocess.TimeoutExpired: # Timeout is likely due to tasklist hang on bad PID # Assume dead to speed up discovery return False except ProcessLookupError: # Process doesn't exist return False except PermissionError: # Process exists but we don't have permission to send signals # On Unix, treat as alive since process clearly exists (we got permission error, not "not found") return True except Exception: # Unknown error - assume dead to avoid false positives return False def _server_responding(host: str, port: int, timeout: float = 0.1) -> bool: try: with urllib.request.urlopen(f"http://{host}:{port}/workspaces", timeout=timeout) as resp: return resp.status == 200 except Exception: return False def _normalize_server_url(url: str) -> tuple[str, int, str]: """Normalize URL, preserving scheme, returning (host, port, normalized_url). Defaults to http when the scheme is missing. When no port is provided, uses 443 for https and 5049 otherwise. """ parsed = urllib.parse.urlparse(url) if not parsed.scheme: parsed = urllib.parse.urlparse("http://" + url) scheme = (parsed.scheme or "http").lower() host = parsed.hostname or "127.0.0.1" port = parsed.port or (443 if scheme == "https" else 5049) normalized = f"{scheme}://{host}:{port}" return host, port, normalized
[docs] def register_workspace(server_url: str, workfile_path: str) -> dict: """Register a workspace path with the server and return metadata. Retries up to 30 times with 0.5s delay (15s total) to wait for server startup. """ return _post(server_url, "/workspace/register", {"path": workfile_path}, retry_on_connect_error=True)
[docs] def remove_workspace(server_url: str, workspace_id: str) -> dict: """Remove a workspace from the server.""" import urllib.request url = f"{server_url.rstrip('/')}/workspace/{workspace_id}" req = urllib.request.Request(url, method="DELETE") with urllib.request.urlopen(req) as resp: data = resp.read().decode("utf-8") try: return json.loads(data) except json.JSONDecodeError: return {"status": data}
[docs] def resolve_server(server_url: str | None = None, start_if_missing: bool = True, log_dir: str | None = None) -> str: """Return server URL, starting the server if needed. Discovery strategy: 1. Try the candidate port (explicit URL, env var, or default) 2. Try PID file if different from candidate 3. Start new server on candidate port if not found Priority: explicit server_url argument > WORKFORCE_SERVER_URL env > http://127.0.0.1:5049. Note: If server needs to be started, this function returns immediately. Callers should poll/wait for server availability if needed. """ candidate = server_url or os.environ.get("WORKFORCE_SERVER_URL") or "http://127.0.0.1:5049" host, port, normalized = _normalize_server_url(candidate) # Step 1: Quick check on the candidate URL (the one we're supposed to use) try: with urllib.request.urlopen(f"{candidate}/workspaces", timeout=0.1) as resp: if resp.status == 200: return candidate except Exception: pass # Step 2: Try PID file lookup (registry) pid_info = _read_pid_file() if pid_info: pid_host, pid_port, pid = pid_info pid_url = f"http://{pid_host}:{pid_port}" if _pid_alive(pid) and _server_responding(pid_host, pid_port): return pid_url try: os.remove(pid_file_path()) except OSError: pass if not start_if_missing: raise RuntimeError("Workforce server is not running") from workforce.server import start_server # Step 3: Exponential backoff before starting new server to prevent concurrent starts # This allows a second wf command to find the server started by the first one for backoff_delay in [0.5, 1.0]: time.sleep(backoff_delay) # Quick recheck on candidate try: with urllib.request.urlopen(f"{candidate}/workspaces", timeout=0.1) as resp: if resp.status == 200: return candidate except Exception: pass # Step 4: If port already in use, fail fast with guidance if is_port_in_use(port, host=host): raise RuntimeError( f"Port {port} on {host} is already in use. " "Set WORKFORCE_SERVER_URL to an available host:port or run 'wf server start --port <port>'." ) # Step 5: Start new server on candidate port start_server(background=True, host=host, port=port, log_dir=log_dir) # Return expected URL immediately - server will start in background return normalized
[docs] def get_running_server() -> tuple[str, int, int] | None: """Return (host, port, pid) if a server is running, else None (registry only).""" info = _read_pid_file() if info: host, port, pid = info if _pid_alive(pid) and _server_responding(host, port): return info # PID file points to dead process, clean it up try: os.remove(pid_file_path()) except OSError: pass return None