import numpy as np
[docs]
def expcos(x, e): return np.sign(np.cos(x)) * np.abs(np.cos(x))**e
[docs]
def expsin(x, e): return np.sign(np.sin(x)) * np.abs(np.sin(x))**e
[docs]
def project_points_to_plane(point0, point1, points):
"""
Projects points onto a plane perpendicular to the line segment from point0 to point1.
Args:
point0, point1 (np.ndarray): Define the line segment direction
points (np.ndarray): Points to project (shape: (n_points, num_dims))
Returns:
np.ndarray: Projected points on the orthogonal plane
"""
# Get normalized direction vector (with zero-vector check)
direction = point1 - point0
norm = np.linalg.norm(direction)
if norm < 1e-10: return points
direction = direction / norm
# Project: subtract the component along the direction
components = np.dot(points - point1, direction)
projected_points = points - np.outer(components, direction)
return projected_points
[docs]
def compute_eigen_2d(points):
# given the points projected to the 2D plan, compute the eigenvalues and eigenvectors
cov = np.cov(points, rowvar=False)
eigvals, eigvecs = np.linalg.eigh(cov)
# Sort eigenvalues in descending order
order = np.argsort(eigvals)[::-1]
eigvals = np.clip(eigvals[order], 1e-8, np.inf) # Prevent numerical issues
eigvecs = eigvecs[:, order]
return eigvals, eigvecs
[docs]
def build_2d_superellipse(eigvals, resolution=16, e_proj=1, sym=False):
"""
Build a 2D superellipse given the eigenvalues.
Args:
eigvals (np.ndarray): Eigenvalues for the 2D projection.
resolution (int, optional): Number of points to sample on the superellipse boundary.
Defaults to 16.
e_proj (float, optional): Exponent controlling the superellipse shape.
e_proj=1 creates a standard ellipse, higher values approach a rectangle.
Defaults to 1.
sym (bool, optional): If True, forces the superellipse to be symmetric by
setting both axes to the same length. Defaults to False.
Returns:
np.ndarray: A 2D array (shape: resolution × 2) representing the boundary points
of the superellipse.
"""
# Calculate the radii for the superellipse axes
radii = 2.0 * np.sqrt(eigvals[:2]) # 2-sigma ellipse
if sym:
radii[1] = radii[0] # Force symmetric shape if requested
# Calculate the boundary points of the superellipse
theta = np.linspace(0, 2 * np.pi, resolution+1)
x = radii[0] * expcos(theta, e_proj)
y = radii[1] * expsin(theta, e_proj)
superellipse_2d = np.stack([x[:-1], y[:-1]], axis=1) # shape (resolution, 2)
return superellipse_2d
[docs]
def uncertainty_cross_section(point0, point1, points, resolution=16, e_proj=1, sym=False):
"""
Calculate a cross-sectional uncertainty profile using start, end, and point cloud.
This function generates a 2D superellipse embedded in 3D space that represents the
uncertainty distribution around a trajectory segment. It projects points onto a plane
perpendicular to the segment direction, performs principal component analysis to
determine the main uncertainty directions, and creates a superellipse boundary.
Args:
point0 (np.ndarray): A 3D point representing the start of the segment.
point1 (np.ndarray): A 3D point representing the end of the segment.
points (np.ndarray): A 2D array of points (shape: n_samples × num_dims)
representing the point cloud.
resolution (int, optional): Number of points to sample on the superellipse boundary.
Defaults to 16.
e_proj (float, optional): Exponent controlling the superellipse shape.
e_proj=1 creates a standard ellipse, higher values approach a rectangle.
Defaults to 1.
sym (bool, optional): If True, forces the superellipse to be symmetric by
setting both axes to the same length. Defaults to False.
Returns:
tuple:
- cross_section_3d (np.ndarray): A 2D array (shape: resolution × num_dims)
representing the boundary points of the 3D cross-sectional profile.
- eigen_values (np.ndarray): A 2D array (shape: resolution × 2) containing
the two principal eigenvalues for each boundary point, representing
the magnitudes of uncertainty in the primary directions.
Notes:
- The function creates a 2D superellipse in 3D space (not a true 3D ellipsoid)
- The cross-section is oriented perpendicular to the trajectory direction
- The function uses a 2-sigma confidence region (covering ~95% of variance)
- The superellipse shape can be controlled via the e_proj parameter
"""
# Project onto orthogonal plane
projected_points = project_points_to_plane(point0, point1, points)
mean_projected = np.mean(projected_points, axis=0)
# Calculate covariance and eigendecomposition
eigvals_proj, eigvecs_proj = compute_eigen_2d(projected_points)
cross_section_2d = build_2d_superellipse(eigvals_proj, resolution=resolution, e_proj=e_proj, sym=sym)
# Primary and secondary axes
primary_axis = eigvecs_proj[:, 0]
secondary_axis = eigvecs_proj[:, 1]
# Transform 2D cross-section to 3D space
cross_section_3d = mean_projected + cross_section_2d[:, 0, np.newaxis] * primary_axis + \
cross_section_2d[:, 1, np.newaxis] * secondary_axis
# Return eigenvalues repeated for each point in the cross-section
eigen_values = np.tile(eigvals_proj[:2], (cross_section_3d.shape[0], 1))
return cross_section_3d, eigen_values
[docs]
def generate_cross_sections(trajectories, major_trajectory=None, resolution=20, e_proj=1, sym=False, n_jobs=1):
"""
Compute 3D uncertainty cross-sections along trajectory paths.
For each time step and starting location, this function calculates a cross-sectional
uncertainty profile by projecting the ensemble trajectories onto a plane perpendicular
to the major trajectory direction, performing principal component analysis, and
constructing a superellipse boundary representing the uncertainty distribution.
Args:
trajectories (np.ndarray): Ensemble trajectories with shape
(n_steps, n_starting_locations, n_samples, num_dims).
major_trajectory (np.ndarray, optional): The main trajectory to use for cross-section orientation,
with shape (n_steps, n_starting_locations, num_dims). If None, uses the mean of trajectories.
resolution (int, optional): Number of points to sample on each cross-section boundary.
Defaults to 20.
e_proj (float, optional): Exponent controlling the superellipse shape.
e_proj=1 creates a standard ellipse, higher values approach a rectangle.
Defaults to 1.
sym (bool, optional): If True, forces the superellipse to be symmetric by
setting both axes to the same length. Defaults to False.
n_jobs (int, optional): Number of parallel jobs to use. If n_jobs=1, uses sequential processing.
Defaults to 1.
Returns:
tuple:
- uncertainty_tube (np.ndarray): Cross-section boundary points with shape
(n_steps, n_starting_locations, resolution, num_dims).
- eigen_values (np.ndarray): Eigenvalues for each cross-section boundary point with shape
(n_steps, n_starting_locations, resolution, 2).
Notes:
- The function computes a 2D superellipse embedded in 3D space for each trajectory segment.
- The cross-section is oriented perpendicular to the trajectory direction at each time step.
- The initial positions (t=0) are set to the starting points of the major trajectory.
"""
# Compute major trajectory if not provided (mean over samples)
if major_trajectory is None:
major_trajectory = np.mean(trajectories, axis=2)
n_steps, n_starting_locations, n_samples, num_dims = np.shape(trajectories)
# Initialize output arrays for cross-section boundary points and eigenvalues
uncertainty_tube = np.zeros((n_steps, n_starting_locations, resolution, num_dims))
eigen_values = np.zeros((n_steps, n_starting_locations, resolution, 2))
# Set initial positions (t=0) to major trajectory starting points
for location_index in range(n_starting_locations):
for res_index in range(resolution):
uncertainty_tube[0, location_index, res_index, :] = major_trajectory[0, location_index, :]
use_parallel = n_jobs != 1
if use_parallel:
try:
import multiprocessing as mp
mp.get_context('fork')
except (ImportError, ValueError):
use_parallel = False
print("Warning: multiprocessing with 'fork' context not available. Using sequential processing instead.")
# Calculate uncertainty cross-sections for each time step and starting location
for step_index in range(1, n_steps):
if use_parallel:
pool_context = mp.get_context('fork')
with pool_context.Pool(processes=n_jobs) as pool:
results = pool.starmap(
uncertainty_cross_section,
[
(
major_trajectory[step_index-1, location_index, :],
major_trajectory[step_index, location_index, :],
trajectories[step_index, location_index, :, :],
resolution,
e_proj,
sym
) for location_index in range(n_starting_locations)
]
)
# Unpack results
for location_index, (cs, ev) in enumerate(results):
uncertainty_tube[step_index, location_index, :, :] = cs
eigen_values[step_index, location_index, :, :] = ev
else:
for location_index in range(n_starting_locations):
# Calculate cross-section boundary and eigenvalues
uncertainty_tube[step_index, location_index, :, :], eigen_values[step_index, location_index, :, :] = uncertainty_cross_section(
major_trajectory[step_index-1, location_index, :], # previous major trajectory point
major_trajectory[step_index, location_index, :], # current major trajectory point
trajectories[step_index, location_index, :, :], # ensemble samples at current time
resolution=resolution,
e_proj=e_proj,
sym=sym
)
return uncertainty_tube, eigen_values
[docs]
def project_points_onto_line(point0, point1, points):
"""
Project points onto a line defined by two points.
Parameters
----------
point0 : np.ndarray
Array of shape (2,) representing the first point on the line.
point1 : np.ndarray
Array of shape (2,) representing the second point on the line.
points : np.ndarray
Array of shape (n_points, 2) representing the points to be projected.
Returns
-------
projections : np.ndarray
Array of shape (n_points, 2) representing the projected points on the line.
"""
line_dir = point1 - point0
line_dir = line_dir / np.linalg.norm(line_dir) # Normalize direction
perp_line_dir = np.array([-line_dir[1], line_dir[0]]) # Perpendicular direction
# project points onto the line passing through point1 and perpendicular to line_dir
projections = point1 + np.dot(points - point1, perp_line_dir)[:, np.newaxis] * perp_line_dir
return projections
[docs]
def generate_cross_sections_2D(trajectories):
"""
Compute cross-sections of 2D trajectories.
Parameters
----------
trajectories : np.ndarray
Array of shape (n_trajectories, n_time_steps, n_ensemble_members, 2) representing the 2D trajectories.
Returns
-------
cross_sections : list of np.ndarray
mean_trajectory : np.ndarray
Array of shape (n_trajectories, n_time_steps, 2) representing the mean trajectory.
cross_sections : np.ndarray
Array of shape (n_trajectories, n_time_steps)
"""
n_trajectories, n_time_steps, n_ensemble_members, _ = trajectories.shape
cross_sections = np.zeros((n_trajectories, n_time_steps, 2, 2)) # Each cross-section is represented by a line segment (x1, y1, x2, y2)
mean_trajectories = np.mean(trajectories, axis=2)
# Compute cross-sections at each time step
for i_t in range(1, n_time_steps):
for i_traj in range(n_trajectories):
# project points onto the onto the passing through mean_trajectories[:, i_t] and
# perpendicular to direction_unit
projected_points = project_points_onto_line(mean_trajectories[i_traj, i_t - 1], mean_trajectories[i_traj, i_t], trajectories[i_traj, i_t])
# Compute covariance matrix of the projected points
cov_matrix = np.cov(projected_points.T)
# Eigen decomposition
eigenvalues, eigenvectors = np.linalg.eig(cov_matrix)
# Sort eigenvalues and eigenvectors
order = np.argsort(eigenvalues)[::-1]
eigenvalues = eigenvalues[order]
eigenvectors = eigenvectors[:, order]
cross_sections[i_traj, i_t] = eigenvalues[0]
return mean_trajectories, cross_sections