Skip to content

DeployedGraph

The DeployedGraph class orchestrates distributed execution of Falcon graphs using Ray.

Overview

DeployedGraph wraps a Graph and handles:

  • Ray actor initialization for each node
  • Distributed sample generation and training
  • Coordination between nodes during inference

Class Reference

DeployedGraph

DeployedGraph(graph, model_path=None, log_config=None)

Initialize a DeployedGraph with the given conceptual graph of nodes.

Note: This class uses falcon.info(), falcon.warning() etc. for logging. These functions use the module-level logger set by cli.py via set_logger().

Source code in falcon/core/deployed_graph.py
def __init__(self, graph, model_path=None, log_config=None):
    """Store the conceptual graph and settings, then deploy the graph's nodes.

    Note: This class logs through falcon.info(), falcon.warning() etc. Those
    functions use the module-level logger set by cli.py via set_logger().

    Args:
        graph: Conceptual graph of nodes to deploy.
        model_path: Stored on the instance unchanged. Defaults to None.
        log_config: Stored on the instance; any falsy value (e.g. None) is
            replaced by a fresh empty dict.
    """
    # Plain state first, so both setup calls below run with every attribute
    # already present on the instance.
    self.graph, self.model_path = graph, model_path
    self.log_config = log_config if log_config else {}
    self.monitor_bridge = None
    self.wrapped_nodes_dict = {}

    # The monitor bridge is created before the nodes are deployed.
    self._create_monitor_bridge()
    self.deploy_nodes()

deploy_nodes

deploy_nodes()

Deploy all nodes in the graph as Ray actors.

Source code in falcon/core/deployed_graph.py
def deploy_nodes(self, ready_timeout=60.0):
    """Deploy all nodes in the graph as Ray actors.

    Actors for every node are created first (non-blocking). Each actor is
    then awaited in turn and, when a monitor bridge exists, registered
    with it.

    Args:
        ready_timeout: Seconds to wait for each actor's readiness call
            before giving up. Defaults to 60.0, the value that used to be
            hard-coded here.

    Raises:
        RuntimeError: If an actor is not ready within ``ready_timeout``
            seconds, or if Ray raises ``RayActorError`` while the actor is
            being awaited or registered.
    """
    info("Spinning up graph...")
    self._check_resource_budget()

    # Create all actors (non-blocking)
    for node in self.graph.node_list:
        if node.num_actors > 1:
            self.wrapped_nodes_dict[node.name] = MultiplexNodeWrapper.remote(
                node.actor_config,
                node,
                self.graph,
                node.num_actors,
                self.model_path,
                self.log_config,
            )
        else:
            self.wrapped_nodes_dict[node.name] = NodeWrapper.options(
                **_ray_options(node.actor_config)
            ).remote(node, self.graph, self.model_path, self.log_config)

    # Wait for all actors to initialize and register with monitor bridge
    for name, actor in self.wrapped_nodes_dict.items():
        try:
            # MultiplexNodeWrapper has wait_ready(), NodeWrapper uses __ray_ready__
            if hasattr(actor, 'wait_ready'):
                ready_ref = actor.wait_ready.remote()
            else:
                ready_ref = actor.__ray_ready__.remote()
            done, _ = ray.wait([ready_ref], timeout=ready_timeout)
            if not done:
                # The message is derived from the same value passed to
                # ray.wait so the two can never drift apart.
                raise RuntimeError(
                    f"Node '{name}' did not initialize within {ready_timeout:g} s. "
                    "This usually means Ray cannot schedule the actor — "
                    "check that ray.num_gpus/num_cpus in your config do not "
                    "exceed available cluster resources."
                )
            ray.get(done[0])  # re-raise any actor-side exception
            info(f"  ✓ {name}")

            # Register node with monitor bridge
            if self.monitor_bridge:
                ray.get(self.monitor_bridge.register_node.remote(name, actor))
        except ray.exceptions.RayActorError as e:
            raise RuntimeError(f"Failed to initialize node '{name}': {e}") from e

sample

sample(num_samples, conditions=None)

Run forward sampling through the graph.

Parameters:

Name Type Description Default
num_samples

Number of samples to generate

required
conditions

Optional dict of pre-set conditions (arrays/tensors)

None

Returns:

Type Description

List[Dict[str, ObjectRef]]: One dict per sample with refs to all node values

Source code in falcon/core/deployed_graph.py
def sample(self, num_samples, conditions=None):
    """Draw forward samples by executing the graph in forward order.

    Args:
        num_samples: Number of samples to generate
        conditions: Optional dict of pre-set conditions (arrays/tensors);
            a falsy value means no conditions are applied

    Returns:
        List[Dict[str, ObjectRef]]: One dict per sample with refs to all node values
    """
    # Only convert when there is something to convert; otherwise start
    # from an empty mapping.
    if conditions:
        preset_refs = self._arrays_to_condition_refs(conditions, num_samples)
    else:
        preset_refs = {}

    node_order = self.graph.forward_order
    return self._execute_graph(num_samples, node_order, preset_refs, "sample")

sample_posterior

sample_posterior(num_samples, conditions=None)

Run posterior sampling through the inference graph.

Parameters:

Name Type Description Default
num_samples

Number of samples to generate

required
conditions

Optional dict of pre-set conditions (arrays/tensors)

None

Returns:

Type Description

List[Dict[str, ObjectRef]]: One dict per sample with refs to all node values

Source code in falcon/core/deployed_graph.py
def sample_posterior(self, num_samples, conditions=None):
    """Draw posterior samples by executing the graph in backward order.

    Args:
        num_samples: Number of samples to generate
        conditions: Optional dict of pre-set conditions (arrays/tensors);
            a falsy value means no conditions are applied

    Returns:
        List[Dict[str, ObjectRef]]: One dict per sample with refs to all node values
    """
    # Only convert when there is something to convert; otherwise start
    # from an empty mapping.
    if conditions:
        preset_refs = self._arrays_to_condition_refs(conditions, num_samples)
    else:
        preset_refs = {}

    node_order = self.graph.backward_order
    return self._execute_graph(num_samples, node_order, preset_refs, "sample_posterior")

sample_proposal

sample_proposal(num_samples, conditions=None)

Run proposal sampling through the inference graph.

Parameters:

Name Type Description Default
num_samples

Number of samples to generate

required
conditions

Optional dict of pre-set conditions (arrays/tensors)

None

Returns:

Type Description

List[Dict[str, ObjectRef]]: One dict per sample with refs to all node values

Source code in falcon/core/deployed_graph.py
def sample_proposal(self, num_samples, conditions=None):
    """Draw proposal samples by executing the graph in backward order.

    Args:
        num_samples: Number of samples to generate
        conditions: Optional dict of pre-set conditions (arrays/tensors);
            a falsy value means no conditions are applied

    Returns:
        List[Dict[str, ObjectRef]]: One dict per sample with refs to all node values
    """
    # Only convert when there is something to convert; otherwise start
    # from an empty mapping.
    if conditions:
        preset_refs = self._arrays_to_condition_refs(conditions, num_samples)
    else:
        preset_refs = {}

    node_order = self.graph.backward_order
    return self._execute_graph(num_samples, node_order, preset_refs, "sample_proposal")

sample_ppd

sample_ppd(num_samples, conditions=None)

Run posterior predictive distribution (PPD) sampling.

Two-phase: sample latent variables from the posterior, then forward-simulate observables from those posterior samples.

Parameters:

Name Type Description Default
num_samples

Number of samples to generate

required
conditions

Observations dict (same as passed to sample_posterior)

None

Returns:

Type Description

List[Dict[str, ObjectRef]]: One dict per sample with refs to all node values (both posterior latents and forward-simulated observables)

Source code in falcon/core/deployed_graph.py
def sample_ppd(self, num_samples, conditions=None):
    """Draw posterior predictive distribution (PPD) samples in two passes.

    The first pass samples latent variables from the posterior; the second
    forward-simulates observables conditioned on those posterior samples.

    Args:
        num_samples: Number of samples to generate
        conditions: Observations dict (same as passed to sample_posterior);
            a falsy value means no observations are applied

    Returns:
        List[Dict[str, ObjectRef]]: One dict per sample with refs to all node values
            (both posterior latents and forward-simulated observables)
    """
    if conditions:
        observed = self._arrays_to_condition_refs(conditions, num_samples)
    else:
        observed = {}

    # Pass 1: theta ~ p(theta | x_obs)
    latent_refs = self._execute_graph(
        num_samples, self.graph.backward_order, observed, "sample_posterior"
    )

    # Pass 2: x_ppd ~ p(x | theta). Only the values extracted from the
    # posterior pass are used as conditions; the original observations are
    # deliberately not passed again, so observed nodes get re-simulated
    # instead of echoing the input data.
    theta_conditions = self._extract_value_refs(latent_refs)
    simulated_refs = self._execute_graph(
        num_samples, self.graph.forward_order, theta_conditions, "sample"
    )

    # Start from a shallow copy of each posterior dict, then overlay the
    # forward-pass dict at the same index. The original author's note says
    # _execute_graph skips conditioned (theta) nodes, so no key collisions
    # are expected; if one did occur, the forward value would win.
    combined = list(map(dict, latent_refs))
    for idx, simulated in enumerate(simulated_refs):
        combined[idx].update(simulated)
    return combined

shutdown

shutdown()

Shut down the deployed graph and release resources.

Source code in falcon/core/deployed_graph.py
def shutdown(self):
    """Ask every deployed node to shut down and wait for all of them.

    The shutdown calls are issued to all node wrappers first; the method
    then blocks until every call has completed.
    """
    pending = [
        wrapper.shutdown.remote()
        for wrapper in self.wrapped_nodes_dict.values()
    ]
    ray.get(pending)

launch

launch(dataset_manager, observations, graph_path=None, stop_check=None)

Launch training.

Parameters:

Name Type Description Default
dataset_manager

Dataset manager for samples

required
observations

Observation data

required
graph_path

Path to save/load graph

None
stop_check

Optional callable that returns True when graceful stop is requested

None
Source code in falcon/core/deployed_graph.py
def launch(self, dataset_manager, observations, graph_path=None, stop_check=None):
    """Run training to completion (blocking).

    Synchronous entry point: builds the ``_launch`` coroutine and drives it
    with ``asyncio.run``.

    Args:
        dataset_manager: Dataset manager for samples
        observations: Observation data
        graph_path: Path to save/load graph
        stop_check: Optional callable that returns True when graceful stop is requested
    """
    training = self._launch(
        dataset_manager,
        observations,
        graph_path=graph_path,
        stop_check=stop_check,
    )
    asyncio.run(training)

save

save(graph_dir)

Save the status of each node in the deployed graph.

Source code in falcon/core/deployed_graph.py
def save(self, graph_dir):
    """Save the status of each deployed node under ``graph_dir``.

    Every node is asked to save into ``graph_dir / <node name>``. All save
    calls are issued first; the method then blocks until they have all
    completed.

    Args:
        graph_dir: Target directory, given as a ``str`` or a ``Path``.
            ``~`` is expanded, the path is resolved, and the directory is
            created (including parents) if it does not exist.
    """
    # Coerce to Path so plain string paths work too, matching load(), which
    # already wraps its argument in Path().
    graph_dir = Path(graph_dir).expanduser().resolve()
    graph_dir.mkdir(parents=True, exist_ok=True)

    save_futures = [
        node.save.remote(graph_dir / name)
        for name, node in self.wrapped_nodes_dict.items()
    ]
    ray.get(save_futures)

load

load(graph_dir)

Load the status of each node in the deployed graph.

Source code in falcon/core/deployed_graph.py
def load(self, graph_dir):
    """Load the status of each deployed node from ``graph_dir``.

    Every node is asked to load from ``graph_dir / <node name>``. All load
    calls are issued first; the method then blocks until they have all
    completed.

    Args:
        graph_dir: Directory containing one sub-directory per node name;
            anything accepted by ``Path()``.
    """
    info(f"Loading deployed graph from: {graph_dir}")
    pending = [
        wrapper.load.remote(Path(graph_dir) / node_name)
        for node_name, wrapper in self.wrapped_nodes_dict.items()
    ]
    ray.get(pending)