import networkx as nx
import os
import tempfile
import logging
import uuid
log = logging.getLogger(__name__)
[docs]
def load_graph(path: str) -> nx.DiGraph:
"""Load graph from file.
Note: Concurrency safety is provided by the server's single-threaded queue worker
(see workforce/server/queue.py), which serializes all graph mutations. Direct file
access is safe because the singleton server architecture prevents concurrent writes.
"""
if not os.path.exists(path):
# Create new empty graph
G = nx.DiGraph()
nx.write_graphml(G, path)
return G
# Read existing graph
G = nx.read_graphml(path)
return nx.DiGraph(G)
[docs]
def save_graph(G: nx.DiGraph, path: str):
"""Save graph to file atomically.
Uses temporary file + os.replace for atomic write, which handles crash safety.
Concurrency safety is provided by the server's single-threaded queue worker.
"""
dirpath = os.path.dirname(path) if os.path.dirname(path) else '.'
with tempfile.NamedTemporaryFile(dir=dirpath, delete=False, mode='wb') as tmp:
tmppath = tmp.name
# Close the file handle so it can be written to on Windows
nx.write_graphml(G, tmppath)
os.replace(tmppath, path)
[docs]
def add_node_to_graph(path, label, x=0.0, y=0.0, status=""):
G = load_graph(path)
node_id = str(uuid.uuid4())
G.add_node(node_id, label=label, x=str(x), y=str(y), status=status)
save_graph(G, path)
log.info(f"Added node {node_id} to graph: {path}")
return {"node_id": node_id}
[docs]
def remove_node_from_graph(path, node_id):
G = load_graph(path)
if node_id in G:
G.remove_node(node_id)
save_graph(G, path)
log.info(f"Removed node {node_id} from graph: {path}")
return {"status": "removed"}
return {"error": "Node not found"}
def _normalize_edge_type(edge_type: str) -> str:
if not edge_type:
return "blocking"
if edge_type == "non_blocking":
return "non-blocking"
return edge_type
[docs]
def add_edge_to_graph(path, source, target, edge_type="blocking"):
G = load_graph(path)
if source not in G or target not in G:
return {"error": "Both source and target must exist"}
edge_id = str(uuid.uuid4())
G.add_edge(source, target, id=edge_id, edge_type=_normalize_edge_type(edge_type))
save_graph(G, path)
log.info(f"Added edge {edge_id} ({source} -> {target}) to graph: {path}")
return {"edge_id": edge_id}
[docs]
def edit_edge_type_in_graph(path, source, target, edge_type):
G = load_graph(path)
if not G.has_edge(source, target):
return {"error": "Edge not found"}
G[source][target]["edge_type"] = _normalize_edge_type(edge_type)
save_graph(G, path)
log.info(f"Set edge_type for {source}->{target} to {edge_type}")
return {"status": "updated"}
[docs]
def remove_edge_from_graph(path, source, target):
G = load_graph(path)
if G.has_edge(source, target):
G.remove_edge(source, target)
save_graph(G, path)
log.info(f"Removed edge ({source} -> {target}) from graph: {path}")
return {"status": "removed"}
return {"error": "Edge not found"}
[docs]
def edit_status_in_graph(path, element_type, element_id, value):
log.info(f"Updating status of {element_id} to {value}")
G = load_graph(path)
if element_type == "node":
if element_id in G:
G.nodes[element_id]["status"] = value
save_graph(G, path)
log.info(f"Set node {element_id} status to '{value}' in graph: {path}")
return {"status": "updated"}
return {"error": "Node not found"}
if element_type == "edge":
for u, v, data in G.edges(data=True):
if str(data.get("id")) == str(element_id):
data["status"] = value
save_graph(G, path)
log.info(f"Set edge {element_id} status to '{value}' in graph: {path}")
return {"status": "updated"}
return {"error": "Edge not found"}
return {"error": "element_type must be node or edge"}
[docs]
def edit_node_position_in_graph(path, node_id, x, y):
G = load_graph(path)
if node_id not in G:
return {"error": "Node not found"}
G.nodes[node_id]["x"] = str(x)
G.nodes[node_id]["y"] = str(y)
save_graph(G, path)
return {"status": "updated"}
[docs]
def edit_node_positions_in_graph(path, positions):
"""Batch update positions for multiple nodes.
Args:
path: Path to graph file
positions: List of dicts with keys: node_id, x, y
Returns:
Dict with status and count of updated nodes
"""
G = load_graph(path)
updated = 0
missing = []
for pos in positions:
node_id = pos.get("node_id")
x = pos.get("x")
y = pos.get("y")
if node_id in G:
G.nodes[node_id]["x"] = str(x)
G.nodes[node_id]["y"] = str(y)
updated += 1
else:
missing.append(node_id)
save_graph(G, path)
log.info(f"Batch updated positions for {updated} nodes in graph: {path}")
result = {"status": "updated", "count": updated}
if missing:
result["missing_nodes"] = missing
return result
[docs]
def edit_wrapper_in_graph(path, wrapper):
G = load_graph(path)
if wrapper is not None:
G.graph['wrapper'] = wrapper
save_graph(G, path)
return {"status": "updated"}
[docs]
def edit_node_label_in_graph(path, node_id, label):
G = load_graph(path)
if node_id not in G:
return {"error": "Node not found"}
G.nodes[node_id]["label"] = label
save_graph(G, path)
return {"status": "updated"}
[docs]
def save_node_log_in_graph(path, node_id, log):
"""DEPRECATED: Use save_node_execution_data_in_graph instead."""
G = load_graph(path)
if node_id not in G:
return {"error": "Node not found"}
G.nodes[node_id]["log"] = log
save_graph(G, path)
return {"status": "updated"}
[docs]
def save_node_execution_data_in_graph(path, node_id, command, stdout, stderr, pid, error_code):
"""Save execution data as separate node attributes (all as strings).
Args:
path: Path to graph file
node_id: Node ID to update
command: The command that was executed (string)
stdout: Standard output from command (string)
stderr: Standard error from command (string)
pid: Process ID (string representation of int)
error_code: Exit code (string representation of int)
Returns:
Dict with status
"""
G = load_graph(path)
if node_id not in G:
return {"error": "Node not found"}
G.nodes[node_id]["command"] = str(command) if command else ""
G.nodes[node_id]["stdout"] = str(stdout) if stdout else ""
G.nodes[node_id]["stderr"] = str(stderr) if stderr else ""
G.nodes[node_id]["pid"] = str(pid) if pid else ""
G.nodes[node_id]["error_code"] = str(error_code) if error_code else ""
save_graph(G, path)
log.info(f"Saved execution data for node {node_id} in graph: {path}")
return {"status": "updated"}
[docs]
def has_blocking_cycle(path):
"""Return True if blocking edges contain a directed cycle."""
G = load_graph(path)
blocking_edges = [
(u, v, data)
for u, v, data in G.edges(data=True)
if data.get("edge_type", "blocking") == "blocking"
]
if not blocking_edges:
return False
blocking_subgraph = nx.DiGraph()
blocking_subgraph.add_nodes_from(G.nodes(data=True))
blocking_subgraph.add_edges_from(blocking_edges)
return not nx.is_directed_acyclic_graph(blocking_subgraph)
[docs]
def edit_statuses_in_graph(path, updates):
"""Batch update statuses for multiple elements (nodes/edges).
Args:
path: Path to graph file
updates: List of dicts with keys: element_type, element_id, value
Returns:
Dict with status and count of updated elements
Raises:
Returns error if any element is not found (fail-fast)
"""
G = load_graph(path)
# Validate all elements exist first (fail-fast)
for update in updates:
element_type = update.get("element_type")
element_id = update.get("element_id")
if element_type == "node":
if element_id not in G:
return {"error": f"Node not found: {element_id}"}
elif element_type == "edge":
found = False
for u, v, data in G.edges(data=True):
if str(data.get("id")) == str(element_id):
found = True
break
if not found:
return {"error": f"Edge not found: {element_id}"}
else:
return {"error": f"Invalid element_type: {element_type}"}
# All elements exist, perform updates
updated = 0
for update in updates:
element_type = update.get("element_type")
element_id = update.get("element_id")
value = update.get("value", "")
if element_type == "node":
G.nodes[element_id]["status"] = value
updated += 1
elif element_type == "edge":
for u, v, data in G.edges(data=True):
if str(data.get("id")) == str(element_id):
data["status"] = value
updated += 1
break
save_graph(G, path)
log.info(f"Batch updated statuses for {updated} elements in graph: {path}")
return {"status": "updated", "count": updated}
[docs]
def remove_node_logs_in_graph(path, node_ids):
"""Remove execution logs from multiple nodes.
Args:
path: Path to graph file
node_ids: List of node IDs to clear logs from
Returns:
Dict with status and count of cleared nodes
Raises:
Returns error if any node is not found (fail-fast)
"""
G = load_graph(path)
# Validate all nodes exist first (fail-fast)
for node_id in node_ids:
if node_id not in G:
return {"error": f"Node not found: {node_id}"}
# All nodes exist, clear logs
log_fields = ["log", "command", "stdout", "stderr", "pid", "error_code"]
cleared = 0
for node_id in node_ids:
for field in log_fields:
if field in G.nodes[node_id]:
del G.nodes[node_id][field]
cleared += 1
save_graph(G, path)
log.info(f"Cleared logs from {cleared} nodes in graph: {path}")
return {"status": "cleared", "count": cleared}