DeployedGraph

The DeployedGraph class orchestrates distributed execution of Falcon graphs using Ray.

Overview

DeployedGraph wraps a Graph and handles:

  • Ray actor initialization for each node
  • Distributed sample generation and training
  • Coordination between nodes during inference
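A minimal end-to-end sketch, assuming an already-constructed Graph instance and that DeployedGraph is importable from falcon.core.deployed_graph (the module shown in the source snippets below); build_graph() is a hypothetical stand-in for however your project assembles the graph:

import ray
from falcon.core.deployed_graph import DeployedGraph

# Hypothetical helper: build the conceptual Graph however your project does.
graph = build_graph()

ray.init(ignore_reinit_error=True)

# Constructing a DeployedGraph deploys every node as a Ray actor.
deployed = DeployedGraph(graph, model_path=None, log_config={})

# Forward sampling through the full graph.
samples = deployed.sample(num_samples=1000)

# Release the actors and the Ray runtime when finished.
deployed.shutdown()
ray.shutdown()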

Class Reference

DeployedGraph

DeployedGraph(graph, model_path=None, log_config=None)

Initialize a DeployedGraph with the given conceptual graph of nodes.

Note: This class uses falcon.info(), falcon.warning() etc. for logging. These functions use the module-level logger set by cli.py via set_logger().

Source code in falcon/core/deployed_graph.py
def __init__(self, graph, model_path=None, log_config=None):
    """Initialize a DeployedGraph with the given conceptual graph of nodes.

    Note: This class uses falcon.info(), falcon.warning() etc. for logging.
    These functions use the module-level logger set by cli.py via set_logger().
    """
    self.graph = graph
    self.model_path = model_path
    self.log_config = log_config or {}
    self.wrapped_nodes_dict = {}
    self.monitor_bridge = None

    self._create_monitor_bridge()
    self.deploy_nodes()

deploy_nodes

deploy_nodes()

Deploy all nodes in the graph as Ray actors. Nodes with num_actors > 1 are deployed behind a MultiplexNodeWrapper; all other nodes get a single NodeWrapper actor. This method is called automatically by __init__.

Source code in falcon/core/deployed_graph.py
def deploy_nodes(self):
    """Deploy all nodes in the graph as Ray actors."""
    info("Spinning up graph...")

    # Create all actors (non-blocking)
    for node in self.graph.node_list:
        if node.num_actors > 1:
            self.wrapped_nodes_dict[node.name] = MultiplexNodeWrapper.remote(
                node.actor_config,
                node,
                self.graph,
                node.num_actors,
                self.model_path,
                self.log_config,
            )
        else:
            self.wrapped_nodes_dict[node.name] = NodeWrapper.options(
                **node.actor_config
            ).remote(node, self.graph, self.model_path, self.log_config)

    # Wait for all actors to initialize and register with monitor bridge
    for name, actor in self.wrapped_nodes_dict.items():
        try:
            # MultiplexNodeWrapper has wait_ready(), NodeWrapper uses __ray_ready__
            if hasattr(actor, 'wait_ready'):
                ray.get(actor.wait_ready.remote())
            else:
                ray.get(actor.__ray_ready__.remote())
            info(f"  ✓ {name}")

            # Register node with monitor bridge
            if self.monitor_bridge:
                ray.get(self.monitor_bridge.register_node.remote(name, actor))
        except ray.exceptions.RayActorError as e:
            raise RuntimeError(f"Failed to initialize node '{name}': {e}") from e

sample

sample(num_samples, conditions=None)

Run forward sampling through the graph.

Source code in falcon/core/deployed_graph.py
def sample(self, num_samples, conditions=None):
    """Run forward sampling through the graph."""
    return self._execute_graph(
        num_samples,
        self.graph.sorted_node_names,
        conditions or {},
        "sample",
    )
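The layout of conditions is not documented here; the sketch below assumes a mapping from node names to observed values, purely to illustrate the call signature:

# Hypothetical node name and values; keys and value types depend on your graph's nodes.
conditions = {"observation_node": [0.7, 1.3, 2.1]}
samples = deployed.sample(num_samples=500, conditions=conditions)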

sample_posterior

sample_posterior(num_samples, conditions=None)

Run posterior sampling through the inference graph.

Source code in falcon/core/deployed_graph.py
def sample_posterior(self, num_samples, conditions=None):
    """Run posterior sampling through the inference graph."""
    return self._execute_graph(
        num_samples,
        self.graph.sorted_inference_node_names,
        conditions or {},
        "sample_posterior",
    )

sample_proposal

sample_proposal(num_samples, conditions=None)

Run proposal sampling through the inference graph.

Source code in falcon/core/deployed_graph.py
def sample_proposal(self, num_samples, conditions=None):
    """Run proposal sampling through the inference graph."""
    return self._execute_graph(
        num_samples,
        self.graph.sorted_inference_node_names,
        conditions or {},
        "sample_proposal",
    )
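Posterior and proposal sampling both traverse graph.sorted_inference_node_names and differ only in the method dispatched on each node; continuing the sketch above:

# Both calls walk the inference subgraph; only the per-node method differs.
posterior_samples = deployed.sample_posterior(num_samples=200, conditions=conditions)
proposal_samples = deployed.sample_proposal(num_samples=200, conditions=conditions)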

shutdown

shutdown()

Shut down the deployed graph and release resources.

Source code in falcon/core/deployed_graph.py
def shutdown(self):
    """Shut down the deployed graph and release resources."""
    ray.get([node.shutdown.remote() for node in self.wrapped_nodes_dict.values()])

save

save(graph_dir)

Save the deployed graph node status.

Source code in falcon/core/deployed_graph.py
def save(self, graph_dir):
    """Save the deployed graph node status."""
    graph_dir = graph_dir.expanduser().resolve()
    graph_dir.mkdir(parents=True, exist_ok=True)
    save_futures = []
    for name, node in self.wrapped_nodes_dict.items():
        node_dir = graph_dir / name
        save_future = node.save.remote(node_dir)
        save_futures.append(save_future)
    ray.get(save_futures)

load

load(graph_dir)

Load the deployed graph node status.

Source code in falcon/core/deployed_graph.py
def load(self, graph_dir):
    """Load the deployed graph nodes status."""
    info(f"Loading deployed graph from: {graph_dir}")
    load_futures = []
    for name, node in self.wrapped_nodes_dict.items():
        node_dir = Path(graph_dir) / name
        load_future = node.load.remote(node_dir)
        load_futures.append(load_future)
    ray.get(load_futures)
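A save/load round trip, continuing the sketch above. Note that save() calls expanduser()/resolve() on its argument, so it expects a pathlib.Path, while load() wraps its argument in Path() and therefore also accepts a plain string:

from pathlib import Path

ckpt_dir = Path("~/falcon_runs/experiment_01")  # hypothetical checkpoint location

# Creates the directory if needed and asks every node actor to save its state.
deployed.save(ckpt_dir)

# Later, after redeploying the same graph, restore each node's state.
deployed.load(ckpt_dir)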

pause

pause()

Pause all nodes in the deployed graph.

Source code in falcon/core/deployed_graph.py
def pause(self):
    """Pause all nodes in the deployed graph."""
    pause_futures = []
    for _, node in self.wrapped_nodes_dict.items():
        pause_future = node.pause.remote()
        pause_futures.append(pause_future)
    ray.get(pause_futures)

resume

resume()

Resume all nodes in the deployed graph.

Source code in falcon/core/deployed_graph.py
def resume(self):
    """Resume all nodes in the deployed graph."""
    resume_futures = []
    for _, node in self.wrapped_nodes_dict.items():
        resume_future = node.resume.remote()
        resume_futures.append(resume_future)
    ray.get(resume_futures)

interrupt

interrupt()

Interrupt all nodes in the deployed graph.

Source code in falcon/core/deployed_graph.py
def interrupt(self):
    """Interrupt all nodes in the deployed graph."""
    interrupt_futures = []
    for _, node in self.wrapped_nodes_dict.items():
        interrupt_future = node.interrupt.remote()
        interrupt_futures.append(interrupt_future)
    ray.get(interrupt_futures)
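A control-flow sketch combining the methods above; the exact effect of pausing or interrupting a node is defined by the node wrappers, not shown here:

# Temporarily halt all node actors, for example around a checkpoint.
deployed.pause()
deployed.save(ckpt_dir)

# Let the nodes continue.
deployed.resume()

# Signal every node to interrupt its current work.
deployed.interrupt()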