Initialize a DeployedGraph with the given conceptual graph of nodes.
Note: This class uses falcon.info(), falcon.warning() etc. for logging.
These functions use the module-level logger set by cli.py via set_logger().
Source code in falcon/core/deployed_graph.py
def __init__(self, graph, model_path=None, log_config=None):
    """Set up a DeployedGraph from a conceptual graph and deploy its nodes.

    Construction stores the arguments, creates the monitor bridge via
    ``_create_monitor_bridge()`` and then calls ``deploy_nodes()``.

    Note: This class uses falcon.info(), falcon.warning() etc. for logging.
    These functions use the module-level logger set by cli.py via set_logger().

    Args:
        graph: The conceptual graph whose nodes are deployed.
        model_path: Optional value stored and forwarded to node wrappers.
        log_config: Optional logging configuration; any falsy value is
            replaced by an empty dict.
    """
    self.graph = graph
    self.model_path = model_path
    # Falsy values (None, empty dict, ...) all collapse to a fresh dict.
    self.log_config = log_config if log_config else {}
    # Filled by deploy_nodes(): node name -> actor handle.
    self.wrapped_nodes_dict = dict()
    # Starts as None; _create_monitor_bridge() is called right after.
    self.monitor_bridge = None

    self._create_monitor_bridge()
    self.deploy_nodes()
|
deploy_nodes
Deploy all nodes in the graph as Ray actors.
Source code in falcon/core/deployed_graph.py
def deploy_nodes(self):
    """Deploy all nodes in the graph as Ray actors.

    Works in two passes: first every actor is launched without waiting,
    then each actor is awaited in turn and, if a monitor bridge exists,
    registered with it.

    Raises:
        RuntimeError: If waiting on (or registering) an actor raises
            ``ray.exceptions.RayActorError``; the original error is chained.
    """
    info("Spinning up graph...")

    # Pass 1: launch every actor; .remote() returns without blocking.
    for graph_node in self.graph.node_list:
        if graph_node.num_actors > 1:
            # Multi-actor nodes receive actor_config as an argument rather
            # than through .options().
            handle = MultiplexNodeWrapper.remote(
                graph_node.actor_config,
                graph_node,
                self.graph,
                graph_node.num_actors,
                self.model_path,
                self.log_config,
            )
        else:
            configured = NodeWrapper.options(**graph_node.actor_config)
            handle = configured.remote(
                graph_node, self.graph, self.model_path, self.log_config
            )
        self.wrapped_nodes_dict[graph_node.name] = handle

    # Pass 2: block on each actor's readiness, then register it.
    for node_name, handle in self.wrapped_nodes_dict.items():
        try:
            # MultiplexNodeWrapper has wait_ready(), NodeWrapper uses __ray_ready__
            ready_method = (
                handle.wait_ready
                if hasattr(handle, 'wait_ready')
                else handle.__ray_ready__
            )
            ray.get(ready_method.remote())
            info(f" ✓ {node_name}")

            if self.monitor_bridge:
                registration = self.monitor_bridge.register_node.remote(
                    node_name, handle
                )
                ray.get(registration)
        except ray.exceptions.RayActorError as err:
            raise RuntimeError(
                f"Failed to initialize node '{node_name}': {err}"
            ) from err
|
sample
sample(num_samples, conditions=None)
Run forward sampling through the graph.
Source code in falcon/core/deployed_graph.py
def sample(self, num_samples, conditions=None):
    """Run forward sampling through the graph.

    Args:
        num_samples: Number of samples, forwarded to ``_execute_graph``.
        conditions: Optional conditions; a falsy value becomes ``{}``.

    Returns:
        Whatever ``_execute_graph`` returns for the "sample" method over
        ``self.graph.sorted_node_names``.
    """
    node_order = self.graph.sorted_node_names
    given = conditions if conditions else {}
    return self._execute_graph(num_samples, node_order, given, "sample")
|
sample_posterior
sample_posterior(num_samples, conditions=None)
Run posterior sampling through the inference graph.
Source code in falcon/core/deployed_graph.py
def sample_posterior(self, num_samples, conditions=None):
    """Run posterior sampling through the inference graph.

    Args:
        num_samples: Number of samples, forwarded to ``_execute_graph``.
        conditions: Optional conditions; a falsy value becomes ``{}``.

    Returns:
        Whatever ``_execute_graph`` returns for the "sample_posterior"
        method over ``self.graph.sorted_inference_node_names``.
    """
    node_order = self.graph.sorted_inference_node_names
    given = conditions if conditions else {}
    return self._execute_graph(num_samples, node_order, given, "sample_posterior")
|
sample_proposal
sample_proposal(num_samples, conditions=None)
Run proposal sampling through the inference graph.
Source code in falcon/core/deployed_graph.py
def sample_proposal(self, num_samples, conditions=None):
    """Run proposal sampling through the inference graph.

    Args:
        num_samples: Number of samples, forwarded to ``_execute_graph``.
        conditions: Optional conditions; a falsy value becomes ``{}``.

    Returns:
        Whatever ``_execute_graph`` returns for the "sample_proposal"
        method over ``self.graph.sorted_inference_node_names``.
    """
    node_order = self.graph.sorted_inference_node_names
    given = conditions if conditions else {}
    return self._execute_graph(num_samples, node_order, given, "sample_proposal")
|
shutdown
Shut down the deployed graph and release resources.
Source code in falcon/core/deployed_graph.py
def shutdown(self):
    """Shut down the deployed graph and release resources.

    Issues a remote ``shutdown`` call to every wrapped node, then blocks
    until all of those calls have completed.
    """
    pending = []
    for wrapped in self.wrapped_nodes_dict.values():
        pending.append(wrapped.shutdown.remote())
    ray.get(pending)
|
save
Save the deployed graph node status.
Source code in falcon/core/deployed_graph.py
def save(self, graph_dir):
    """Save the deployed graph node status.

    Each wrapped node is asked (remotely) to save into
    ``graph_dir / <node name>``; the call blocks until every save finishes.

    Args:
        graph_dir: Target directory as a ``str`` or path-like object.
            ``~`` is expanded and the path is made absolute. The directory
            is created, including parents, if it does not exist.
    """
    # Coerce to Path first so plain string paths work, as they do in load().
    graph_dir = Path(graph_dir).expanduser().resolve()
    graph_dir.mkdir(parents=True, exist_ok=True)

    # Fire all saves before waiting so the nodes save concurrently.
    save_futures = [
        node.save.remote(graph_dir / name)
        for name, node in self.wrapped_nodes_dict.items()
    ]
    ray.get(save_futures)
|
load
Load the status of the deployed graph's nodes.
Source code in falcon/core/deployed_graph.py
def load(self, graph_dir):
    """Load the status of every deployed graph node.

    Each wrapped node is asked (remotely) to load from
    ``graph_dir / <node name>``; the call blocks until every load finishes.

    Args:
        graph_dir: Source directory as a ``str`` or path-like object.
            ``~`` is expanded and the path is made absolute.
    """
    info(f"Loading deployed graph from: {graph_dir}")

    # Normalize exactly as save() does (expanduser + resolve); otherwise a
    # "~/..." or relative path that saved fine would be looked up under a
    # different, literal path by the remote nodes.
    graph_dir = Path(graph_dir).expanduser().resolve()

    # Fire all loads before waiting so the nodes load concurrently.
    load_futures = [
        node.load.remote(graph_dir / name)
        for name, node in self.wrapped_nodes_dict.items()
    ]
    ray.get(load_futures)
|
pause
Pause all nodes in the deployed graph.
Source code in falcon/core/deployed_graph.py
def pause(self):
    """Pause all nodes in the deployed graph.

    Sends a remote ``pause`` call to every wrapped node and blocks until
    all of the calls have completed.
    """
    pending = [
        wrapped.pause.remote() for wrapped in self.wrapped_nodes_dict.values()
    ]
    ray.get(pending)
|
resume
Resume all nodes in the deployed graph.
Source code in falcon/core/deployed_graph.py
def resume(self):
    """Resume all nodes in the deployed graph.

    Sends a remote ``resume`` call to every wrapped node and blocks until
    all of the calls have completed.
    """
    pending = [
        wrapped.resume.remote() for wrapped in self.wrapped_nodes_dict.values()
    ]
    ray.get(pending)
|
interrupt
Interrupt all nodes in the deployed graph.
Source code in falcon/core/deployed_graph.py
def interrupt(self):
    """Interrupt all nodes in the deployed graph.

    Sends a remote ``interrupt`` call to every wrapped node and blocks
    until all of the calls have completed.
    """
    pending = [
        wrapped.interrupt.remote() for wrapped in self.wrapped_nodes_dict.values()
    ]
    ray.get(pending)
|