06 — Backend Architecture
Vibrante-Node v2.2.1 — Technical Reference
This document describes the execution engine, graph model, node registry, and all supporting subsystems that make up the Vibrante-Node backend. It is written for contributors, plugin authors, and advanced users who need to understand what happens between the moment the user clicks Run and the moment the last node finishes.
Table of Contents
- Execution Engine Overview
- Event Loop Integration
- Execution Flow Diagram
- Entry Node Detection
- Data-Pull Recursion
- Execution Order Guarantees
- Loop Execution
- GroupNode Sub-Executor
- Bypass Handling
- init_priority System
- Error Propagation
- Cancellation
- node_results Dictionary
- Signal Catalogue
- State Management
- GraphManager
- NodeRegistry
- BaseNode Contract
- SafeRuntime and Error Isolation
- Performance Considerations
1. Execution Engine Overview
NetworkExecutor (src/core/engine.py) is the central execution engine. It inherits from QObject so it can emit Qt signals from within async code. The engine operates entirely on the Qt main thread — there are no worker threads for node execution. Async I/O is achieved by running an asyncio event loop that is manually stepped by a zero-delay QTimer.
What NetworkExecutor does
- Receives a
GraphManagerinstance containing the full node graph. - Instantiates one
BaseNodesubclass perNodeInstanceModel, restoring its dynamic ports and parameter values from the saved workflow. - Determines which nodes are entry nodes and which are data-pull nodes.
- Starts all entry nodes as concurrent asyncio tasks.
- After each node's
execute()returns, propagates outputs to downstream nodes' inputs reactively. - Triggers downstream execution via exec-port connections.
- Emits Qt signals after every state transition so the UI stays in sync.
Why async
Node execute() coroutines may perform I/O: reading files, making HTTP requests, talking to the Houdini command server over TCP, or calling subprocess-based DCCs. If these were synchronous, they would block the Qt event loop and freeze the entire UI. By using asyncio, multiple nodes can be waiting on I/O simultaneously without any threading overhead. A single OS thread handles everything; the event loop interleaves coroutines cooperatively.
The practical rule is: anything that blocks for more than a few milliseconds must use await. CPU-bound work that cannot be split into coroutines should use asyncio.to_thread() to push it to a thread pool without blocking the event loop (see Performance Considerations).
2. Event Loop Integration
Qt and asyncio each expect to own the thread's event loop. Vibrante-Node bridges them with _EventLoopRunner (src/ui/window.py), a lightweight adapter that drives the asyncio loop from inside Qt's event loop.
How _EventLoopRunner works
QTimer(interval=0) ──► _EventLoopRunner._step()
│
├─ loop.call_soon(loop.stop)
│ schedules a stop callback at end of ready queue
│
└─ loop.run_forever()
runs until the stop callback fires
(processes exactly one round of ready callbacks)
_step() is called by Qt every time the event loop is idle (interval = 0). Each call to _step():
- Queues a
loop.stopat the back of the asyncio ready queue. - Calls
loop.run_forever(). Becauseloop.stopwas queued last,run_foreverreturns after draining all currently-ready callbacks — typically in microseconds. - Checks whether the main task is done, and if so calls
_cleanup().
This scheme means:
- All
awaitsuspensions resume on the Qt main thread. - All Qt signals emitted from within
asynccode are queued connections that deliver on the main thread — no cross-thread marshalling is ever needed. QBasicTimerwarnings ("timer can only be used with threads started with QThread") cannot occur because the asyncio loop never runs on a background thread.
Lifecycle
runner = _EventLoopRunner(executor)
runner.start() # creates asyncio task, starts QTimer
# ... Qt main loop runs ...
runner.stop() # calls executor.stop(); loop keeps pumping until task finishes
_EventLoopRunner creates a brand-new asyncio.new_event_loop() for each run, so there are no stale tasks or leftover state between executions.
3. Execution Flow Diagram
User clicks Run
│
▼
MainWindow.execute_pipeline()
│
├─ Build GraphManager from WorkflowModel
├─ Create NetworkExecutor(graph_manager)
├─ Create _EventLoopRunner(executor)
└─ runner.start()
│
▼
executor.run() [async coroutine]
│
├─ 1. PRE-CALCULATE LOOKUP MAPS
│ _incoming_data_conns: {node_id → [ConnectionModel]}
│ _driven_by_flow: set of node_ids with exec_in connected
│
├─ 2. PREPARE ALL NODES
│ for each NodeInstanceModel:
│ instantiate node_class()
│ restore_from_parameters() (dynamic ports)
│ sync parameters (skip ports with live data connections)
│ clear_outputs()
│ install _on_log, _on_output, _check_stopped hooks
│
├─ 3. PRISM BOOTSTRAP (if prism_core_init present)
│
├─ 4. INIT NODES (init_priority > 0, descending order)
│ each init node's full exec chain completes before next
│
├─ 5. ENTRY NODE DETECTION
│ exec_conns present?
│ YES → flow mode:
│ entry = nodes with exec_out but no connected exec_in
│ OR nodes with no exec pins at all
│ NO → data-flow mode:
│ layers = graph_manager.get_topological_sort()
│ execute layer by layer with asyncio.gather()
│
└─ 6. FLOW EXECUTION [per entry node, concurrent tasks]
_execute_flow(node_id)
│
├─ BYPASS CHECK → short-circuit if bypassed
│
├─ PULL UPSTREAM DATA NODES
│ for each input port (in port definition order):
│ if upstream node not in _driven_by_flow:
│ _run_single_node_impl(upstream, is_data_pull=True)
│
├─ node_started signal
│
├─ reset exec output parameters
│
├─ sync all inputs from node_results cache
│
├─ SafeRuntime.run_node_safe(instance.execute, inputs)
│
├─ SUCCESS:
│ node_results[node_id].update(result)
│ node_output signal
│ node_finished("success") signal
│
└─ FAILURE:
node_error signal
node_finished("failed") signal
[exec chain stops here]
set_output("exec_out", True) called inside execute()
│
├─ stores value in node_results
├─ node_output signal (for live wire inspector)
├─ REACTIVE DATA PROPAGATION:
│ for each data connection from this port:
│ target.parameters[to_port] = value
│ node_results[to_node][to_port] = value
│ if target not in _driven_by_flow:
│ discard from _executed_nodes (force re-run in loops)
│ await target.on_parameter_changed(port, value)
└─ FLOW ROUTING (if port type == 'exec' and value is truthy):
for each exec connection from this port:
await _execute_flow(to_node)
4. Entry Node Detection
Entry node detection depends on whether the graph contains any exec connections.
Flow Mode (exec connections present)
A node is an entry node if either of these conditions is true:
- No exec_in pin — the node has only data ports. Examples:
GetVariable, constant value nodes, theprism_core_initnode. These never need to be triggered by an upstream exec; they run when data-pulled. - Has exec_in pin but no connection to it — the node is the start of an exec chain. This is the primary category: the first node the user wires up (e.g. a
Sequenceror a Houdini node withexec_inconnected to nothing).
Both categories apply even within a single graph. A graph might have two independent exec chains, each starting their own entry node. Both are detected and tracked as concurrent tasks.
Nodes already executed by the init phase (_executed_nodes) are excluded from entry detection to avoid double-execution.
# From engine.py (simplified)
entry_nodes = []
for node_id, instance in self.node_instances.items():
if node_id not in connected_nodes: continue
if node_id in self._executed_nodes: continue
has_exec_in_pin = any(p.data_type == 'exec' for p in instance.inputs.values())
if not has_exec_in_pin:
entry_nodes.append(node_id)
elif node_id not in has_exec_in_conn:
entry_nodes.append(node_id)
Data-Flow Mode (no exec connections)
When the graph has no exec connections at all, the engine falls back to classic topological sort execution. Every node in the graph is placed in a dependency layer, and layers are executed in order using asyncio.gather(). This mode is provided for backward compatibility and for simple graphs that do not need flow control.
5. Data-Pull Recursion
Before executing any node, the engine recursively evaluates all upstream data-only nodes. This is called a data pull. Data-only nodes are those not in _driven_by_flow (i.e. no exec connection feeds into them).
The data-pull algorithm
# Pull in port-definition order so values arrive in a predictable sequence
for port_name in instance.inputs:
for conn in incoming:
if conn.to_port == port_name:
from_id = conn.from_node
is_driven_by_flow = from_id in self._driven_by_flow
if not is_driven_by_flow:
await self._run_single_node_impl(from_id, is_data_pull=True)
Guards against double-execution
_run_single_node_impl has two guards:
- Data-pull guard — if
is_data_pull=Trueand the node is already in_executed_nodes, it is skipped immediately. A data node never needs to run twice within one chain of data pulls. - Re-entrancy guard — if the node is in
_currently_executing, the call returns immediately. This prevents infinite recursion if a cycle somehow survived DAG validation.
Cache invalidation in loops
When a node calls set_output() with a new value during loop iteration, the engine discards all downstream data-only nodes from _executed_nodes. This forces them to re-run and pick up the new value on the next iteration rather than serving a stale cached result. This is essential for patterns like ForEach → GetListItem → SomeDataTransform → HoudiniNode.
6. Execution Order Guarantees
Exec-chain order
Within a single exec chain, nodes execute strictly sequentially. Node B does not start until node A's execute() coroutine returns and all reactive data propagation has completed. This is because _execute_flow is a single await chain: A calls set_output("exec_out", True), which awaits _execute_flow(B), and so on.
Parallel chains
Multiple independent exec chains (multiple entry nodes) are launched as concurrent asyncio tasks. They interleave at await points, which means I/O in one chain does not block progress in another. Shared state (e.g. BaseNode.memory) is not protected by a lock; if two chains write to the same variable key simultaneously, the result is non-deterministic. Design parallel chains to be independent or to use different variable namespaces.
Data node order
In data-flow mode (no exec connections), nodes in the same topological layer run concurrently via asyncio.gather(). Nodes in later layers are guaranteed to run after all nodes in earlier layers have completed.
In flow mode, data-only upstream nodes are pulled in port-definition order. The order in which ports were added to the node definition determines the order upstream data nodes are evaluated.
7. Loop Execution
Two builtin nodes implement looping: ForEachNode and WhileLoopNode. Both use set_output to trigger exec flow from within a single execute() coroutine.
ForEach
ForEachNode.execute() iterates over a list. For each element it:
- Calls
await self.set_output("current_item", element)andawait self.set_output("current_index", i). This propagates the values to downstream data nodes and marks those nodes as stale in_executed_nodes(via the reactive propagation logic in the engine). - Calls
await self.set_output("exec_step", True). The engine sees an exec output become truthy and follows all exec connections fromexec_step, executing the downstream chain to completion. - Repeats for the next element.
After all elements are processed, ForEachNode calls await self.set_output("exec_out", True) to signal completion.
Because each set_output("exec_step", True) call awaits the full downstream chain before returning, iterations are strictly sequential — there is no concurrency between loop bodies.
WhileLoop
WhileLoopNode.execute() reads a break_condition input at the start of each iteration. The break_condition port is excluded from cycle detection in GraphManager.get_topological_sort() (via ignore_ports=["break_condition"]), which is what allows the feedback wire without triggering a CircularDependencyError.
On each iteration, WhileLoopNode:
- Checks
inputs.get("break_condition"). If truthy, firesexec_outand returns. - Fires
exec_loop_bodyto execute the loop body chain. - After the loop body completes, pulls the
break_conditionagain (it was discarded from_executed_nodesby reactive propagation). - Repeats.
The is_stopped() check should be called inside long-running while loops to respect cancellation:
while not self.is_stopped():
if inputs.get("break_condition"):
break
await self.set_output("exec_loop_body", True)
inputs = self.parameters.copy() # re-read inputs after body
8. GroupNode Sub-Executor
GroupNode (src/nodes/builtins/group_node.py) encapsulates a complete WorkflowModel and runs it with a nested NetworkExecutor.
Memory preservation
The outer executor sets BaseNode.memory to the current shared variable state before the sub-executor runs. The sub-executor's run() clears BaseNode.memory at the start (as it does for every run). To prevent this from erasing the outer execution's variables, GroupNode.execute() saves and restores memory around the sub-execution:
saved_memory = dict(_BaseNode.memory)
sub_executor = NetworkExecutor(gm)
# ... connect signals ...
try:
await sub_executor.run()
finally:
_BaseNode.memory.clear()
_BaseNode.memory.update(saved_memory)
Input injection
External input values are injected into GroupInNode instances via parameters["_injected_value"] rather than parameters["value"]. The value key is an output port; the engine's clear_outputs() resets it to None during node preparation, before execute() is called. Using a distinct internal key (_injected_value) avoids the wipe.
Output collection
After the sub-executor finishes, GroupNode collects outputs by reading from sub_executor.node_results of the source nodes connected to each GroupOutNode's value input, rather than from GroupOutNode itself. This avoids a timing issue where GroupOutNode (a data-entry node) might have executed before the exec chain populated its input.
Error propagation
GroupNode has two exec output ports:
exec_out— fires when the inner graph completes with no unhandled exceptions.exec_fail— fires when the inner graph has at least one unhandled node exception (anode_errorsignal was emitted inside).
Semantic failures (e.g. a Houdini node returning success=False) are not considered unhandled exceptions; they are the inner graph's responsibility to route via its own exec pins.
9. Bypass Handling
A node can be bypassed by right-clicking it and selecting Bypass (or pressing Ctrl+B). The bypassed flag is stored on NodeInstanceModel.bypassed.
When the engine encounters a bypassed node during exec-chain traversal:
node_startedis emitted immediately.- The node's
execute()is not called. node_finished("success")is emitted.- Every exec connection from this node is followed (all exec outputs fire), so the downstream chain continues uninterrupted.
Data inputs are not propagated through bypassed nodes. If a downstream node depended on this node's data output, it receives whatever default value the port was initialized with. This is intentional — bypass is a flow control shortcut, not a data pass-through.
10. init_priority System
NodeInstanceModel.init_priority is an integer field (default 0). Any value greater than 0 marks the node as an initialisation node.
Execution order
At the start of every run, before entry-node detection, the engine collects all nodes with init_priority > 0, sorts them in descending order (higher number runs first), and executes each one's full exec chain to completion before starting the next:
init_node_ids = sorted(
[nid for nid, m in self.graph_manager.nodes.items() if m.init_priority > 0],
key=lambda nid: self.graph_manager.nodes[nid].init_priority,
reverse=True,
)
for node_id in init_node_ids:
self._track_task(self._execute_flow(node_id))
# wait for all active tasks to finish before next init node
Use cases
prism_core_init— must run before anyprism_*node soPrismCoreis available in shared memory.- Authentication nodes — log in to an API before the main pipeline starts.
- Data-initialisation nodes — pre-load a large dataset so it is ready when fast processing nodes need it.
init_only mode
NetworkExecutor.run(init_only=True) runs only the init phase and then emits execution_finished(True) without proceeding to the main graph. This mode is used immediately after a workflow is loaded, giving init nodes a chance to set up environment state without requiring the user to click Run.
11. Error Propagation
When SafeRuntime.run_node_safe() catches an exception from execute():
node_error(node_id, error_message)is emitted with the full traceback string.node_finished(node_id, "failed")is emitted.- The function returns
False. The caller (_run_single_node_impl) discards control — it does not follow any exec connections from the failed node. - The exec chain stops at the failed node. All downstream nodes in that chain are never executed.
The node_error signal is connected in MainWindow to the log panel, which displays the error in red with the node name.
GroupNode error propagation
When a node inside a GroupNode fails, the sub-executor emits node_error. GroupNode.execute() catches this via a connected slot that appends to inner_errors. After the sub-executor finishes, if inner_errors is non-empty, GroupNode fires exec_fail instead of exec_out and calls self.log_error() to forward the messages to the parent executor's log.
Manual error handling in nodes
Nodes can signal errors without raising exceptions by calling self.log_error(msg). This writes to the log panel at "error" level but does not stop the exec chain. The node should still return a valid dict (with exec_out: True if it wants execution to continue) or exec_out: False to halt the chain deliberately.
12. Cancellation
The user can stop a running pipeline by clicking the Stop button, which calls _EventLoopRunner.stop(), which calls executor.stop().
NetworkExecutor.stop():
- Sets
self._is_stopped = True. - Calls
task.cancel()on every task inself._active_tasks. This injectsCancelledErrorinto any awaiting coroutine. - Sets
self._finished_eventso the wait loop inrun()exits.
Cooperative cancellation inside long nodes
A node whose execute() runs a long synchronous loop should periodically check self.is_stopped():
async def execute(self, inputs):
for frame in range(start, end):
if self.is_stopped():
self.log_info("Stopped by user.")
return {"exec_out": False}
await self.set_output("current_frame", frame)
await self.set_output("exec_step", True)
return {"exec_out": True}
For CPU-bound work offloaded with asyncio.to_thread(), cancellation of the outer task will cancel the to_thread call but the thread itself will continue running until Python's GIL allows it to be interrupted. Design threads to check a shared stop event if immediate termination is required.
13. node_results Dictionary
NetworkExecutor.node_results is a Dict[UUID, Dict[str, Any]] that stores the most recent output values of every node in the current run.
# Structure
node_results = {
node_instance_id_1: {"output_port_a": value, "output_port_b": value},
node_instance_id_2: {"result": value, "exec_out": True},
# ... one entry per executed node ...
}
Population
Values enter node_results in two ways:
- From
execute()return value — afterSafeRuntime.run_node_safe()returns successfully, the engine callsself.node_results[node_id].update(result). - From
set_output()streaming — when a node callsawait self.set_output(name, value), the engine's output handler storesself.node_results[node_id][name] = valueimmediately, before the exec chain fires.
Use by the wire value inspector
MainWindow._on_node_output is connected to node_output. When it fires, the handler calls scene.update_edge_value(node_widget, port_name, value) for every key in the result dict. NodeScene.update_edge_value() finds all Edge items whose from_port matches and calls edge.set_live_value(value). This makes the value visible as a tooltip when hovering over the wire.
node_results persists after execution completes, so users can inspect final wire values without re-running.
14. Signal Catalogue
All signals are defined on NetworkExecutor(QObject). They are all thread-safe because they originate from the Qt main thread (via _EventLoopRunner).
| Signal | Parameters | When it fires |
|---|---|---|
node_started |
UUID node_id |
Immediately before execute() is called (or immediately for bypassed nodes) |
node_finished |
UUID node_id, str status |
After execute() returns; status is "success" or "failed" |
node_error |
UUID node_id, str error_message |
When execute() raises an unhandled exception; message includes full traceback |
node_output |
UUID node_id, dict output_data |
After execute() returns (with the full result dict), and also after each set_output() call (with a single-key dict) |
node_log |
UUID node_id, str message, str level |
When a node calls log_info(), log_success(), or log_error() |
execution_finished |
bool success |
After all tasks complete or after stop() is called; success=False if stopped |
Connecting to signals from outside the executor
executor = NetworkExecutor(graph_manager)
executor.node_started.connect(lambda nid: print(f"Starting {nid}"))
executor.node_log.connect(lambda nid, msg, lvl: print(f"[{lvl}] {msg}"))
executor.execution_finished.connect(lambda ok: print(f"Done: {ok}"))
15. State Management
What is reset on each run
BaseNode.memory— cleared at the start ofrun(). This is a class-level dict shared by all nodes.node_results— reset to{}at the start ofrun().node_instances— re-instantiated from scratch each run._executed_nodes,_currently_executing— cleared._is_stopped— set toFalse.- All node output parameters — reset to their port defaults by
clear_outputs().
What persists between runs
NodeInstanceModel.parameters— widget values entered by the user persist across runs. Only ports with an incoming data connection are reset to the port's default at run start (to prevent stale widget values from overriding live data).NodeRegistry._classes/_definitions— the node library is not re-scanned between runs.- File handles, external connections, and any Python-level state held by node instances are discarded because
node_instancesis rebuilt each run.
BaseNode.memory
BaseNode.memory is a class-level Dict[str, Any] that acts as a simple shared scratchpad. Nodes access it directly:
# SetVariable node
BaseNode.memory["my_var"] = value
# GetVariable node
value = BaseNode.memory.get("my_var")
Memory is shared across all nodes in the current run, including nodes inside GroupNode sub-executors (with the save/restore mechanism described in section 8). It is not thread-safe for concurrent access from multiple parallel exec chains.
16. GraphManager
GraphManager (src/core/graph.py) is a lightweight container for the graph's node and connection data. It does not execute anything; it is purely a data structure.
Key attributes
| Attribute | Type | Description |
|---|---|---|
nodes |
Dict[UUID, NodeInstanceModel] |
All nodes, keyed by instance UUID |
connections |
List[ConnectionModel] |
All wires between ports |
Cycle detection
add_connection() calls is_dag() after tentatively appending the new connection. is_dag() runs get_topological_sort() which uses the toposort library. If a CircularDependencyError is raised, the connection is removed and add_connection() returns False.
The break_condition port is excluded from dependency calculation by default, allowing while-loop feedback wires without being flagged as cycles:
def get_topological_sort(self, ignore_ports=None):
if ignore_ports is None:
ignore_ports = ["break_condition"]
# ...
for conn in self.connections:
if conn.to_port in ignore_ports:
continue
data[conn.to_node].add(conn.from_node)
Serialization
to_model() returns a WorkflowModel (a Pydantic model). from_model(model) reconstructs the graph from a WorkflowModel. Both operations are O(n) in the number of nodes and connections.
17. NodeRegistry
NodeRegistry (src/core/registry.py) is a class-level singleton that stores all node definitions and their compiled Python classes.
Three storage maps
| Map | Key | Value |
|---|---|---|
_classes |
node_id (str) |
Type[BaseNode] subclass |
_definitions |
node_id (str) |
NodeDefinitionJSON Pydantic model |
_source_paths |
node_id (str) |
Absolute path to the .json file on disk |
Loading order on startup
NodeRegistry.load_all_with_extras(bundled_nodes)
│
├─ register_builtins()
│ Registers Python-class nodes (ForEach, WhileLoop, etc.)
│ GroupInNode, GroupOutNode, GroupNode go into _classes only
│ (hidden from search popup, still executable)
│
├─ _load_directory(bundled_nodes)
│ walks nodes/ recursively, calls load_node() for each .json
│
└─ extra_dirs from v_nodes_dir env var (OS path separator)
_load_directory(extra_dir) for each
JSON node registration
register_definition(definition) does two things:
- Normalises the definition: inserts
exec_in/exec_outports ifuse_exec=Trueand they are missing; applies the Prism auto-rewrite ifnode_idstarts withprism_. - Compiles the
python_codewithexec()and extracts the node class viaregister_node()(preferred) or a bareexecute()function (simplified format).
Hot-reload
reload_node_definition(node_id) re-reads the JSON from _source_paths[node_id] and re-registers. After calling this, any live NodeWidget instances bound to this node_id must be refreshed to pick up new ports or code. The Node Builder dialog calls this automatically after saving a node edit.
Prism auto-rewrite
When node_id.startswith("prism_") and the code does not already import resolve_prism_core, the registry automatically prepends the import and rewrites:
core = inputs.get('core')
# becomes:
core = resolve_prism_core(inputs)
This means every prism node automatically resolves PrismCore from shared memory without the author needing to wire a core output between nodes.
18. BaseNode Contract
Every node class must inherit from BaseNode and implement execute(). The full contract:
Required
class MyNode(BaseNode):
name = "my_node" # must match node_id; used as class identifier
def __init__(self):
super().__init__() # adds exec_in, exec_out automatically
self.add_input("some_input", "string", widget_type="text")
self.add_output("some_output", "string")
async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
value = inputs.get("some_input", "")
return {"some_output": result, "exec_out": True}
def register_node():
return MyNode
Optional hooks
| Method | When called | Notes |
|---|---|---|
restore_from_parameters(params) |
By engine before each run | Restore dynamic ports from saved workflow |
on_plug_sync(port, is_input, other, other_port) |
Synchronously when a wire is connected | Called on the Qt main thread |
on_unplug_sync(port, is_input) |
Synchronously when a wire is removed | Called on the Qt main thread |
async on_plug(port, is_input, other, other_port) |
Asynchronously when a wire is connected | Use for async initialization |
async on_unplug(port, is_input) |
Asynchronously when a wire is removed | |
async on_parameter_changed(name, value) |
When reactive data propagation updates an input | Re-run reactive nodes (TwoWaySwitch, GetVariable) |
Streaming outputs
set_output(name, value) is an async method that pushes a single output value during execution without waiting for execute() to return:
async def execute(self, inputs):
for i in range(10):
await self.set_output("progress", i / 10.0)
await asyncio.sleep(0.1) # simulate work
return {"result": "done", "exec_out": True}
Every call to set_output() triggers reactive data propagation and, if the port is an exec port with a truthy value, triggers downstream execution.
19. SafeRuntime and Error Isolation
SafeRuntime.run_node_safe() wraps every execute() call in a try/except:
@staticmethod
async def run_node_safe(fn, inputs):
try:
result = await fn(inputs)
return True, result, None
except Exception as e:
error_msg = f"Node execution failed: {str(e)}\n{traceback.format_exc()}"
return False, None, error_msg
This means a bug in one node's execute() never crashes the engine process. The error is captured as a string, emitted via node_error, and execution of that branch halts cleanly.
SafeRuntime does not suppress asyncio.CancelledError — cancellation from runner.stop() propagates normally through await chains.
20. Performance Considerations
Blocking calls are dangerous
The asyncio event loop and the Qt event loop share the same OS thread. Any blocking call inside execute() — a synchronous time.sleep(), a blocking socket receive without timeout, a subprocess.run() call without async — will freeze the Qt UI for the duration of the block. No mouse events, no repaints, no signal deliveries.
Rules:
- Use await asyncio.sleep(n) instead of time.sleep(n).
- Use asyncio.open_connection() instead of blocking sockets.
- Use asyncio.create_subprocess_exec() for subprocesses.
- For blocking Python code that cannot be made async, use asyncio.to_thread().
Offloading CPU work
import asyncio
async def execute(self, inputs):
heavy_data = inputs.get("data")
result = await asyncio.to_thread(self._cpu_intensive, heavy_data)
return {"output": result, "exec_out": True}
def _cpu_intensive(self, data):
# This runs in a thread pool — does not block the event loop
return expensive_computation(data)
asyncio.to_thread() is available in Python 3.9+. For older versions use loop.run_in_executor(None, fn, *args).
Houdini bridge latency
Each HouBridge RPC call involves: serialising JSON, a TCP round-trip to the local Houdini server, hdefereval.executeDeferred() scheduling on Houdini's main thread, execution, serialisation, and a TCP reply. On a local machine this is typically 1–5 ms per call (with TCP_NODELAY set). A node that makes 100 bridge calls will take ~100–500 ms. Batch multiple parameter sets with bridge.set_parms() and batch child queries with a single bridge.run_code() call where possible.
Graph size
For graphs with 100+ nodes, the topological sort (toposort library) and connection lookup become measurable. The engine pre-calculates _incoming_data_conns and _driven_by_flow once per run to avoid per-node O(n) scans. The main loop is O(n + e) where n = node count and e = connection count. Execution time is dominated by execute() coroutine time, not engine overhead.
Memory
node_results stores every output value of every node. For nodes that output large data structures (big lists, numpy arrays, large strings), this can accumulate significant memory during a run. Values are discarded when node_results is reset at the next run's start. If memory is a concern, avoid storing large objects in node_results across the run — produce them, consume them, and release references.