
Embeddings

Declarative embedding pipelines for observation processing.

Overview

The falcon.embeddings package provides tools for building observation embedding networks via declarative YAML configuration. Embeddings map raw observations to lower-dimensional summary statistics before they enter the estimator.

Declarative Configuration

Embedding networks are defined in YAML using the _target_ / _input_ system:

  • _target_: Import path of the nn.Module class to instantiate
  • _input_: List of observation node names (or nested sub-embeddings) that feed into this module

Basic Embedding

embedding:
  _target_: model.MyEmbedding
  _input_: [x]
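
The class referenced by _target_ is an ordinary torch.nn.Module. A minimal sketch of what model.MyEmbedding could look like (the class name comes from the example above; layer sizes are illustrative assumptions):

import torch
import torch.nn as nn

class MyEmbedding(nn.Module):
    """Hypothetical embedding: maps a raw observation vector to a
    lower-dimensional summary statistic."""

    def __init__(self, in_features: int = 128, out_features: int = 32):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_features, out_features),
            nn.ReLU(),
            nn.Linear(out_features, out_features),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)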

Multi-Input Embedding

embedding:
  _target_: model.MyEmbedding
  _input_: [x, y]  # Multiple observation nodes
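
With several input nodes, the module's forward is assumed to receive one tensor per listed node, in the listed order. A hypothetical sketch under that assumption (dimensions are illustrative):

import torch
import torch.nn as nn

class MyEmbedding(nn.Module):
    """Hypothetical two-input embedding for the [x, y] example above."""

    def __init__(self, x_dim: int = 64, y_dim: int = 16, out_dim: int = 32):
        super().__init__()
        self.net = nn.Linear(x_dim + y_dim, out_dim)

    def forward(self, x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
        return self.net(torch.cat([x, y], dim=-1))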

Nested Pipeline

Sub-embeddings can be nested arbitrarily deep. Each _input_ entry can itself be a _target_ / _input_ block:

embedding:
  _target_: model.Concatenate
  _input_:
    - _target_: timm.create_model
      model_name: resnet18
      pretrained: true
      num_classes: 0
      _input_:
        _target_: model.Unsqueeze
        _input_: [image]
    - _target_: torch.nn.Linear
      in_features: 64
      out_features: 32
      _input_: [metadata]
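
Here model.Unsqueeze stands in for whatever small adapter module the project defines. A purely hypothetical sketch (it simply inserts an axis so the image tensor matches the layout the backbone expects):

import torch
import torch.nn as nn

class Unsqueeze(nn.Module):
    """Hypothetical adapter: inserts a singleton dimension at `dim`,
    e.g. turning a (B, H, W) tensor into (B, 1, H, W)."""

    def __init__(self, dim: int = 1):
        super().__init__()
        self.dim = dim

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return x.unsqueeze(self.dim)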

Built-in Utilities

Normalization

LazyOnlineNorm

Online normalization with lazy initialization and optional momentum adaptation. Normalizes inputs to zero mean and unit variance using exponential moving averages.

embedding:
  _target_: falcon.embeddings.LazyOnlineNorm
  _input_: [x]
  momentum: 0.01
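
Conceptually, the running statistics are exponential moving averages of the batch statistics, and inputs are standardized against them. A simplified sketch of that idea (not the actual implementation):

import torch

momentum, epsilon = 0.01, 1e-20
x = torch.randn(32, 8)

# statistics of the current batch
batch_mean = x.mean(dim=0)
batch_var = x.var(dim=0, unbiased=False)

# exponential-moving-average update of the running statistics
running_mean, running_var = torch.zeros(8), torch.ones(8)
running_mean = (1 - momentum) * running_mean + momentum * batch_mean
running_var = (1 - momentum) * running_var + momentum * batch_var

# standardize to (approximately) zero mean and unit variance
y = (x - running_mean) / torch.sqrt(running_var + epsilon)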

DiagonalWhitener

Diagonal whitening with optional Hartley transform preprocessing and momentum-based running statistics.

embedding:
  _target_: falcon.embeddings.DiagonalWhitener
  _input_: [x]
  dim: 64  # number of input features (required)
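
The module can also be used directly. A minimal sketch based on the update() and __call__() methods documented below (the feature dimension is illustrative):

import torch
from falcon.embeddings import DiagonalWhitener

whitener = DiagonalWhitener(dim=64)

batch = torch.randn(32, 64)
whitener.update(batch)      # accumulate running mean and variance
whitened = whitener(batch)  # (x - running_mean) / sqrt(running_var + eps)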

hartley_transform

A standalone function for computing the discrete Hartley transform, useful as a preprocessing step.

Dimensionality Reduction

PCAProjector

Streaming dual PCA projector with momentum-based updates. Performs online SVD, projects inputs onto the leading principal components with an optional variance-based prior (ridge-like regularization), and reconstructs them, optionally normalized to unit average variance.

embedding:
  _target_: falcon.embeddings.PCAProjector
  _input_: [x]
  n_components: 32
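
Used directly, the projector needs enough update() calls to fill its buffer before forward() can project. A sketch (the input dimensionality of 128 is illustrative):

import torch
from falcon.embeddings import PCAProjector

proj = PCAProjector(n_components=32, buffer_size=256)

# accumulate samples; the SVD update fires once 256 samples are buffered
for _ in range(4):
    proj.update(torch.randn(64, 128))

filtered = proj(torch.randn(8, 128))  # shape (8, 128) after project / shrink / reconstruct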

Class Reference

instantiate_embedding

instantiate_embedding(embedding_config)

Instantiate embedding pipeline from config.

Source code in falcon/embeddings/builder.py
def instantiate_embedding(embedding_config: Dict[str, Any]) -> EmbeddingWrapper:
    """Instantiate embedding pipeline from config."""
    required_input_keys = _collect_input_keys(embedding_config)
    modules, input_keys_list, output_keys, _ = _flatten_config_to_modules(
        embedding_config
    )
    return EmbeddingWrapper(modules, input_keys_list, output_keys, required_input_keys)
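
The YAML blocks shown earlier are parsed into plain dictionaries before reaching this function, so an equivalent programmatic call looks roughly like the following (the import path is assumed from the source location in falcon/embeddings/builder.py):

from falcon.embeddings.builder import instantiate_embedding

config = {
    "_target_": "falcon.embeddings.LazyOnlineNorm",
    "_input_": ["x"],
    "momentum": 0.01,
}
embedding = instantiate_embedding(config)  # returns an EmbeddingWrapper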

EmbeddingWrapper

EmbeddingWrapper(modules, input_keys_list, output_keys, required_input_keys)

Bases: Module

Sequential execution of modules with shared data dictionary.

Source code in falcon/embeddings/builder.py
def __init__(
    self,
    modules: List[nn.Module],
    input_keys_list: List[List[str]],
    output_keys: List[str],
    required_input_keys: List[str],
):
    super().__init__()
    self.modules_list = nn.ModuleList(modules)
    self.input_keys_list = input_keys_list
    self.output_keys = output_keys
    self.input_keys = required_input_keys
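
The stored attributes suggest the execution pattern: each module reads its inputs from a shared data dictionary and writes its result back under its output key. A conceptual sketch of that pattern with dummy modules (not the actual forward implementation):

import torch
import torch.nn as nn

modules = [nn.Linear(8, 4), nn.Linear(4, 2)]
input_keys_list = [["x"], ["h"]]
output_keys = ["h", "z"]

data = {"x": torch.randn(3, 8)}  # shared data dictionary keyed by node name
for module, in_keys, out_key in zip(modules, input_keys_list, output_keys):
    data[out_key] = module(*[data[k] for k in in_keys])
# data["z"] now holds the final embedding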

LazyOnlineNorm

LazyOnlineNorm(momentum=0.01, epsilon=1e-20, log_prefix=None, adaptive_momentum=False, monotonic_variance=True, use_log_update=False)

Bases: Module

Source code in falcon/embeddings/norms.py
def __init__(
    self,
    momentum=0.01,
    epsilon=1e-20,
    log_prefix=None,
    adaptive_momentum=False,
    monotonic_variance=True,
    use_log_update=False,
):
    super().__init__()
    self.momentum = momentum
    self.epsilon = epsilon
    self.log_prefix = log_prefix
    self.monotonic_variance = monotonic_variance
    self.use_log_update = use_log_update
    self.adaptive_momentum = adaptive_momentum

    self.register_buffer("running_mean", None)
    self.register_buffer("running_var", None)
    self.register_buffer("min_variance", None)

    self.initialized = False

DiagonalWhitener

DiagonalWhitener(dim, momentum=0.1, eps=1e-08, use_fourier=False, track_mean=True)

Bases: Module

  • dim: number of features (last dimension of x)
  • momentum: how much of the new batch stats to use (PyTorch-style)
  • eps: small constant for numerical stability
  • use_fourier: if True, apply Hartley transform before whitening
  • track_mean: if False, the running mean is reset to zero so only variance scaling is applied

Source code in falcon/embeddings/norms.py
def __init__(self, dim, momentum=0.1, eps=1e-8, use_fourier=False, track_mean=True):
    """
    dim: number of features (last dimension of x)
    momentum: how much of the new batch stats to use (PyTorch-style)
    eps: small constant for numerical stability
    use_fourier: if True, apply Hartley transform before whitening
    """
    super().__init__()
    self.dim = dim
    self.momentum = momentum
    self.eps = eps
    self.use_fourier = use_fourier
    self.track_mean = track_mean

    self.register_buffer("running_mean", torch.zeros(dim))
    self.register_buffer("running_var", torch.ones(dim))
    self.initialized = False

update

update(x)

Update running mean and variance from the current batch.
x: Tensor of shape (batch_size, dim)

Source code in falcon/embeddings/norms.py
def update(self, x):
    """
    Update running mean and variance from current batch
    x: Tensor of shape (batch_size, dim)
    """
    if self.use_fourier:
        x = hartley_transform(x)

    batch_mean = x.mean(dim=0)
    batch_var = x.var(dim=0, unbiased=False)

    if not self.initialized:
        self.running_mean = batch_mean.detach()
        self.running_var = batch_var.detach()
        self.initialized = True
    else:
        self.running_mean = (
            1 - self.momentum
        ) * self.running_mean + self.momentum * batch_mean.detach()
        self.running_var = (
            1 - self.momentum
        ) * self.running_var + self.momentum * batch_var.detach()
    if not self.track_mean:
        self.running_mean *= 0

__call__

__call__(x)

Apply whitening: (x - mean) / std. If use_fourier, whitening happens in Hartley space and the result is transformed back.

Source code in falcon/embeddings/norms.py
def __call__(self, x):
    """
    Apply whitening: (x - mean) / std
    If use_fourier, whitening happens in Hartley space and is transformed back.
    """
    if self.use_fourier:
        x = hartley_transform(x)

    std = torch.sqrt(self.running_var + self.eps)
    x_white = (x - self.running_mean) / std

    if self.use_fourier:
        x_white = hartley_transform(x_white)

    return x_white

hartley_transform

hartley_transform(x)

Hartley transform: H(x) = Re(FFT(x)) - Im(FFT(x)). It is its own inverse: H(H(x)) = x.

Source code in falcon/embeddings/norms.py
def hartley_transform(x):
    """
    Hartley transform: H(x) = Re(FFT(x)) - Im(FFT(x))
    It is its own inverse: H(H(x)) = x
    """
    fft = torch.fft.fft(x, dim=-1, norm="ortho")
    return fft.real - fft.imag
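
A quick check of the involution property stated above (the import path is assumed from the source location in falcon/embeddings/norms.py):

import torch
from falcon.embeddings.norms import hartley_transform

x = torch.randn(4, 16)
assert torch.allclose(hartley_transform(hartley_transform(x)), x, atol=1e-5)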

PCAProjector

PCAProjector(n_components=10, oversampling=10, buffer_size=256, momentum=0.1, normalize_output=True, use_prior=True)

Bases: Module

A streaming, dual PCA projector with momentum-based updates.

This class maintains a running mean and a set of principal components (eigenvectors) and eigenvalues (variances) of the input data, computed using a dual PCA approach. It stores incoming data in a buffer until the buffer is full, then performs an eigen-decomposition update. A momentum term blends the new PCA decomposition with the existing one to adapt over time.

Optionally:
  • A "prior" can be applied, which acts like ridge (Tikhonov) regularization: assuming white noise on the inputs, each projected component is scaled by 1 / (1 + 1 / eigenvalue).
  • The output can be normalized so that the expected variance (averaged over all features) is unity.

Parameters:

  • n_components (int, default 10): Number of principal components to retain.
  • oversampling (int, default 10): Extra dimensions to capture a slightly larger subspace (not currently used).
  • buffer_size (int, default 256): Number of samples to accumulate before performing an SVD update.
  • momentum (float, default 0.1): Blend factor for merging the new PCA decomposition with the old one.
  • normalize_output (bool, default True): Whether to normalize the reconstructed outputs so that they have unit average variance.
  • use_prior (bool, default True): Whether to apply a variance-based prior (ridge-like shrinkage).
Source code in falcon/embeddings/svd.py
def __init__(
    self,
    n_components: int = 10,
    # input_dim: int,
    oversampling: int = 10,
    buffer_size: int = 256,
    momentum: float = 0.1,
    normalize_output: bool = True,
    use_prior: bool = True,
    # add_mean: bool = False,
) -> None:
    """
    Args:
        n_components: Number of principal components to retain.
        oversampling: Extra dimensions to capture a slightly larger subspace (not currently used).
        buffer_size: Number of samples to accumulate before performing an SVD update.
        momentum: Blend factor for merging new PCA decomposition with the old one.
        normalize_output: Whether to normalize the reconstructed outputs
                          so that they have unit average variance.
        use_prior: Whether to apply a variance-based prior (ridge-like shrinkage).
    """
    super().__init__()
    self.n_components: int = n_components
    # self.input_dim: int = input_dim
    self.oversampling: int = oversampling
    self.buffer_size: int = buffer_size
    # self.device: str = device
    self.momentum: float = momentum
    self.normalize_output: bool = normalize_output
    self.use_prior: bool = use_prior
    # self.add_mean: bool = add_mean

    # Running mean of the input data, updated incrementally
    # self.mean: Optional[torch.Tensor] = None  # shape: (D,)
    # Number of samples accumulated so far (used for updating the mean)
    # self.n_samples: int = 0

    # Temporary buffer for incoming data points
    self.buffer: List[torch.Tensor] = []
    # Counts how many samples have been appended to the buffer
    self.buffer_counter: int = 0

    # Principal components (top-k right singular vectors) and eigenvalues
    self.components: Optional[torch.Tensor] = None  # shape: (k, D)
    self.eigenvalues: Optional[torch.Tensor] = None  # shape: (k,)

update

update(X)

Accumulate a batch of data in the buffer. If the buffer is full, update the PCA decomposition.

Parameters:

  • X (Tensor, required): A batch of input data with shape (batch_size, D).
Source code in falcon/embeddings/svd.py
def update(self, X: torch.Tensor) -> None:
    """
    Accumulate a batch of data in the buffer. If the buffer is full, update the PCA decomposition.

    Args:
        X: A batch of input data with shape (batch_size, D).
    """
    batch_size = X.shape[0]

    # Store in the buffer
    self.buffer.append(X)
    self.buffer_counter += batch_size

    # If we've accumulated enough samples in the buffer, update the PCA decomposition
    if self.buffer_counter >= self.buffer_size:
        self._compute_svd_update()
        # Clear the buffer and reset counter
        self.buffer = []
        self.buffer_counter = 0

forward

forward(X)

Filter the input data by projecting onto the learned principal components, optionally applying a variance-based prior, then reconstructing and possibly normalizing.

Parameters:

  • X (Tensor, required): A batch of input data with shape (batch_size, D).

Returns:

  • Tensor: A batch of data with shape (batch_size, D) after the PCA transform (and optional prior & normalization).

Source code in falcon/embeddings/svd.py
def forward(self, X: torch.Tensor) -> torch.Tensor:
    """
    Filter the input data by projecting onto the learned principal components,
    optionally applying a variance-based prior, then reconstructing and possibly normalizing.

    Args:
        X: A batch of input data with shape (batch_size, D).

    Returns:
        torch.Tensor: A batch of data with shape (batch_size, D) after PCA
        transform (and optional prior & normalization).
    """
    # If no PCA has been computed yet, we can't project
    if self.components is None:
        raise ValueError(
            "SVD components not computed yet. Call update() enough times first."
        )

    # Shift input by the running mean
    # X_centered = X - self.mean
    # Project onto the principal components
    X_proj = X @ self.components.T  # shape: (batch_size, k)

    # Optionally apply the variance-based prior
    # This is like ridge regularization in a Bayesian sense,
    # assuming white noise on the inputs: X_proj / (1 + 1 / eigenvalues).
    if self.use_prior:
        if self.eigenvalues is None:
            raise ValueError(
                "Eigenvalues not available. PCA must be computed first."
            )
        X_proj = X_proj / (1.0 + (1.0 / self.eigenvalues)).unsqueeze(0)

    # Reconstruct from the principal components
    X_reconstructed = X_proj @ self.components

    # Optionally normalize the output so that the average variance is ~1
    if self.normalize_output:
        if self.eigenvalues is None:
            raise ValueError(
                "Eigenvalues not available. PCA must be computed first."
            )
        # The sum of eigenvalues is the total variance in the top-k subspace
        # We divide by sqrt( average variance per feature ) = sqrt( sum / D )
        input_dim = X_reconstructed.shape[-1]
        scale_factor = (self.eigenvalues.sum() / input_dim) ** 0.5
        X_reconstructed /= scale_factor

    return X_reconstructed