API Reference

This section provides detailed API documentation for all Workforce modules.

REST API Overview

Workforce exposes a Flask-based REST API for workflow management and execution. All endpoints are scoped to a specific workspace using the workspace ID (deterministic SHA256 hash of the workflow file path).

Edge Type Parameters

Many REST endpoints accept an edge_type parameter to specify the type of edge:

Adding Edges with Type Specification

To add a blocking edge (default):

curl -X POST http://localhost:5000/workspace/abc123def456/add-edge \
  -H "Content-Type: application/json" \
  -d '{
    "source_id": "node-uuid-1",
    "target_id": "node-uuid-2",
    "edge_type": "blocking"
  }'

To add a non-blocking edge:

curl -X POST http://localhost:5049/workspace/abc123def456/add-edge \
  -H "Content-Type: application/json" \
  -d '{
    "source_id": "node-uuid-1",
    "target_id": "node-uuid-2",
    "edge_type": "non-blocking"
  }'

Python client example (blocking edge):

import requests

response = requests.post(
    "http://localhost:5049/workspace/abc123def456/add-edge",
    json={
        "source_id": "node-uuid-1",
        "target_id": "node-uuid-2",
        "edge_type": "blocking"
    }
)
print(response.json())

Python client example (non-blocking edge):

import requests

response = requests.post(
    "http://localhost:5049/workspace/abc123def456/add-edge",
    json={
        "source_id": "node-uuid-1",
        "target_id": "node-uuid-2",
        "edge_type": "non-blocking"
    }
)
print(response.json())

Updating Edge Types

To change an existing edge’s type from blocking to non-blocking:

curl -X POST http://localhost:5049/workspace/abc123def456/edit-edge-type \
  -H "Content-Type: application/json" \
  -d '{
    "source_id": "node-uuid-1",
    "target_id": "node-uuid-2",
    "edge_type": "non-blocking"
  }'

Python client example:

import requests

response = requests.post(
    "http://localhost:5049/workspace/abc123def456/edit-edge-type",
    json={
        "source_id": "node-uuid-1",
        "target_id": "node-uuid-2",
        "edge_type": "non-blocking"
    }
)
print(response.json())

Graph Queries

When retrieving the workflow graph, the edge_type attribute is included in edge data:

curl http://localhost:5049/workspace/abc123def456/get-graph

Response includes edges with type information:

{
  "nodes": [
    {"id": "node-1", "label": "wget data.csv", "status": ""},
    {"id": "node-2", "label": "python process.py", "status": ""}
  ],
  "edges": [
    {
      "id": "edge-1",
      "source": "node-1",
      "target": "node-2",
      "edge_type": "blocking",
      "status": ""
    }
  ]
}

See Glossary for detailed definitions of edge type semantics and Dependency Resolution in the architecture documentation for execution behavior.

Core Module

Top-level package for workforce.

Utils

utils.py — Shared Workforce utilities for workspace identification, networking, and interacting with the fixed-port multi-tenant server.

workforce.utils.compute_workspace_id(workfile_path: str) str[source]

Compute deterministic workspace ID from absolute Workfile path.

Normalizes paths to handle cross-OS and network share compatibility: - Converts to absolute path - Normalizes separators to forward slashes - On Windows, strips drive letters for network paths (//server/share) - Resolves symlinks to canonical path

Parameters:

workfile_path – Path to the workflow file

Returns:

Workspace ID in format ws_<8-char-hash>

workforce.utils.default_workfile() str | None[source]

Return ./Workfile if it exists, else None.

workforce.utils.ensure_workfile(path: str | None = None) str[source]

Resolve a workfile path or create a temporary one when absent.

Order of precedence: 1) Explicit path provided 2) ./Workfile if it exists 3) New temp file path in the system temp directory (not pre-created)

Returns an absolute path suitable for compute_workspace_id.

workforce.utils.get_absolute_path(path: str) str[source]

Convert relative or absolute path to absolute path.

workforce.utils.get_running_server() tuple[str, int, int] | None[source]

Return (host, port, pid) if a server is running, else None (registry only).

workforce.utils.get_workspace_url(workspace_id: str, endpoint: str = '') str[source]

Build absolute URL for a workspace endpoint.

Discovers or starts the server via resolve_server().

workforce.utils.is_port_in_use(port: int, host: str = '127.0.0.1') bool[source]

Check if a port is in use.

Parameters:
  • port – Port number to check

  • host – Host to bind to (default: 127.0.0.1)

Returns:

True if port is in use, False if available

workforce.utils.is_workspace_url(text: str) bool[source]

Check if text looks like a workspace URL.

workforce.utils.lock_file_path() str[source]
workforce.utils.log_file_path(log_dir: str | None = None) str[source]
workforce.utils.looks_like_url(text: str) bool[source]

Check if text looks like a URL (but may not be valid).

Distinguishes URLs from Windows file paths (C:...).

workforce.utils.parse_workspace_url(url: str) tuple[str, str] | None[source]

Parse a workspace URL to extract server URL and workspace ID.

Supports formats: - http://host:port/workspace/ws_abc123 - http://host:port/workspace/ws_abc123/get-graph - host:port/workspace/ws_abc123

Returns:

(server_url, workspace_id) tuple or None if not a valid workspace URL

workforce.utils.pid_file_path() str[source]
workforce.utils.register_workspace(server_url: str, workfile_path: str) dict[source]

Register a workspace path with the server and return metadata.

Retries up to 30 times with 0.5s delay (15s total) to wait for server startup.

workforce.utils.remove_workspace(server_url: str, workspace_id: str) dict[source]

Remove a workspace from the server.

workforce.utils.resolve_server(server_url: str | None = None, start_if_missing: bool = True, log_dir: str | None = None) str[source]

Return server URL, starting the server if needed.

Discovery strategy: 1. Try the candidate port (explicit URL, env var, or default) 2. Try PID file if different from candidate 3. Start new server on candidate port if not found

Priority: explicit server_url argument > WORKFORCE_SERVER_URL env > http://127.0.0.1:5049.

Note: If server needs to be started, this function returns immediately. Callers should poll/wait for server availability if needed.

workforce.utils.runtime_dir() str[source]

Return the directory used for runtime artifacts (~/.workforce).

workforce.utils.shell_quote_multiline(script: str) str[source]

Escape single quotes in shell scripts for safe execution.

Edit Module

The edit module provides functions for manipulating workflow graphs.

Graph Functions

workforce.edit.graph.add_edge_to_graph(path, source, target, edge_type='blocking')[source]
workforce.edit.graph.add_node_to_graph(path, label, x=0.0, y=0.0, status='')[source]
workforce.edit.graph.edit_edge_type_in_graph(path, source, target, edge_type)[source]
workforce.edit.graph.edit_node_label_in_graph(path, node_id, label)[source]
workforce.edit.graph.edit_node_position_in_graph(path, node_id, x, y)[source]
workforce.edit.graph.edit_node_positions_in_graph(path, positions)[source]

Batch update positions for multiple nodes.

Parameters:
  • path – Path to graph file

  • positions – List of dicts with keys: node_id, x, y

Returns:

Dict with status and count of updated nodes

workforce.edit.graph.edit_status_in_graph(path, element_type, element_id, value)[source]
workforce.edit.graph.edit_statuses_in_graph(path, updates)[source]

Batch update statuses for multiple elements (nodes/edges).

Parameters:
  • path – Path to graph file

  • updates – List of dicts with keys: element_type, element_id, value

Returns:

Dict with status and count of updated elements

Raises:

Returns error if any element is not found (fail-fast)

workforce.edit.graph.edit_wrapper_in_graph(path, wrapper)[source]
workforce.edit.graph.has_blocking_cycle(path)[source]

Return True if blocking edges contain a directed cycle.

workforce.edit.graph.load_graph(path: str) DiGraph[source]

Load graph from file.

Note: Concurrency safety is provided by the server’s single-threaded queue worker (see workforce/server/queue.py), which serializes all graph mutations. Direct file access is safe because the singleton server architecture prevents concurrent writes.

workforce.edit.graph.remove_edge_from_graph(path, source, target)[source]
workforce.edit.graph.remove_node_from_graph(path, node_id)[source]
workforce.edit.graph.remove_node_logs_in_graph(path, node_ids)[source]

Remove execution logs from multiple nodes.

Parameters:
  • path – Path to graph file

  • node_ids – List of node IDs to clear logs from

Returns:

Dict with status and count of cleared nodes

Raises:

Returns error if any node is not found (fail-fast)

workforce.edit.graph.save_graph(G: DiGraph, path: str)[source]

Save graph to file atomically.

Uses temporary file + os.replace for atomic write, which handles crash safety. Concurrency safety is provided by the server’s single-threaded queue worker.

workforce.edit.graph.save_node_execution_data_in_graph(path, node_id, command, stdout, stderr, pid, error_code)[source]

Save execution data as separate node attributes (all as strings).

Parameters:
  • path – Path to graph file

  • node_id – Node ID to update

  • command – The command that was executed (string)

  • stdout – Standard output from command (string)

  • stderr – Standard error from command (string)

  • pid – Process ID (string representation of int)

  • error_code – Exit code (string representation of int)

Returns:

Dict with status

workforce.edit.graph.save_node_log_in_graph(path, node_id, log)[source]

DEPRECATED: Use save_node_execution_data_in_graph instead.

Edit CLI

workforce.edit.cli.main()[source]

Edit Client

workforce.edit.client.cmd_add_edge(args, base_url, workspace_id)[source]
workforce.edit.client.cmd_add_node(args, base_url, workspace_id)[source]
workforce.edit.client.cmd_edit_edge_type(args, base_url, workspace_id)[source]
workforce.edit.client.cmd_edit_node_label(args, base_url, workspace_id)[source]
workforce.edit.client.cmd_edit_position(args, base_url, workspace_id)[source]
workforce.edit.client.cmd_edit_status(args, base_url, workspace_id)[source]
workforce.edit.client.cmd_edit_wrapper(args, base_url, workspace_id)[source]
workforce.edit.client.cmd_remove_edge(args, base_url, workspace_id)[source]
workforce.edit.client.cmd_remove_node(args, base_url, workspace_id)[source]
workforce.edit.client.cmd_save_node_log(args, base_url, workspace_id)[source]

Run Module

The run module handles workflow execution.

Run Client

Run CLI

Server Module

The server module manages the Flask API and workflow execution engine.

Server Context

Events System

Queue Management

API Routes

WebSocket Handlers

GUI Module

The GUI module provides the Tkinter-based visual workflow editor.

Main Application

Canvas

GUI Client

Core GUI

State Management

class workforce.gui.state.GUIState(graph: Dict[str, Any]=<factory>, selected_nodes: List[str] = <factory>, scale: float = 1.0, pan_x: float = 0.0, pan_y: float = 0.0, base_font_size: int = 10, base_edge_width: int = 2, wrapper: str = '{}', dragging_node: str | None = None, edge_start: str | None = None, _press_x: int = 0, _press_y: int = 0, _potential_deselect: bool = False, _panning: bool = False, _select_rect_active: bool = False, _select_rect_id: int | None = None, _select_rect_start: tuple | None = None, _multi_drag_initial: Dict[str, tuple]=<factory>)[source]

Bases: object

base_edge_width: int = 2
base_font_size: int = 10
dragging_node: str | None = None
edge_start: str | None = None
graph: Dict[str, Any]
pan_x: float = 0.0
pan_y: float = 0.0
scale: float = 1.0
selected_nodes: List[str]
wrapper: str = '{}'