Source code for Modules.UncertaintyTube.uncertainty_tubes_stats

import numpy as np

[docs] def expcos(x, e): return np.sign(np.cos(x)) * np.abs(np.cos(x))**e
[docs] def expsin(x, e): return np.sign(np.sin(x)) * np.abs(np.sin(x))**e
[docs] def project_points_to_plane(point0, point1, points): """ Projects points onto a plane perpendicular to the line segment from point0 to point1. Args: point0, point1 (np.ndarray): Define the line segment direction points (np.ndarray): Points to project (shape: (n_points, num_dims)) Returns: np.ndarray: Projected points on the orthogonal plane """ # Get normalized direction vector (with zero-vector check) direction = point1 - point0 norm = np.linalg.norm(direction) if norm < 1e-10: return points direction = direction / norm # Project: subtract the component along the direction components = np.dot(points - point1, direction) projected_points = points - np.outer(components, direction) return projected_points
[docs] def compute_eigen_2d(points): # given the points projected to the 2D plan, compute the eigenvalues and eigenvectors cov = np.cov(points, rowvar=False) eigvals, eigvecs = np.linalg.eigh(cov) # Sort eigenvalues in descending order order = np.argsort(eigvals)[::-1] eigvals = np.clip(eigvals[order], 1e-8, np.inf) # Prevent numerical issues eigvecs = eigvecs[:, order] return eigvals, eigvecs
def _compute_eigen_info(point0, point1, points): """ Helper function to compute eigenvalues and eigenvectors for a single cross-section. """ # Project onto orthogonal plane projected_points = project_points_to_plane(point0, point1, points) # Calculate covariance and eigendecomposition eigvals_proj, eigvecs_proj = compute_eigen_2d(projected_points) return eigvals_proj[:2], eigvecs_proj[:, :2] # Return only the top 2 eigvals and their vectors
[docs] def uncertainty_tubes_summary_statistics(trajectories, n_jobs=1): """ Compute 3D uncertainty summary statistics along trajectory paths. Args: trajectories (np.ndarray): Ensemble trajectories with shape (n_steps, n_starting_locations, n_samples, num_dims). n_jobs (int, optional): Number of parallel jobs to use. If n_jobs=1, uses sequential processing. Defaults to 1. Returns: dict: - "mean_trajectory" (np.ndarray): The mean trajectory with shape (n_steps, n_starting_locations, num_dims). - "eigen_values" (np.ndarray): Eigenvalues for each cross-section with shape (n_steps, n_starting_locations, 2). - "eigen_vectors" (np.ndarray): Eigenvectors for each cross-section with shape (n_steps, n_starting_locations, num_dims, 2). """ major_trajectory = np.mean(trajectories, axis=2) n_steps, n_starting_locations, n_samples, num_dims = np.shape(trajectories) eigen_values = np.zeros((n_steps, n_starting_locations, 2)) eigen_vectors = np.zeros((n_steps, n_starting_locations, num_dims, 2)) use_parallel = n_jobs != 1 if use_parallel: try: import multiprocessing as mp # Use 'fork' context if available, otherwise fallback ctx = mp.get_context('fork') Pool = ctx.Pool except (ImportError, ValueError, NotImplementedError): use_parallel = False print("Warning: multiprocessing with 'fork' context not available. Using sequential processing instead.") # Calculate uncertainty cross-sections for each time step and starting location for step_index in range(1, n_steps): if use_parallel: with Pool(processes=n_jobs) as pool: results = pool.starmap( _compute_eigen_info, [ ( major_trajectory[step_index-1, location_index, :], major_trajectory[step_index, location_index, :], trajectories[step_index, location_index, :, :], ) for location_index in range(n_starting_locations) ] ) for location_index, (ev, evecs) in enumerate(results): eigen_values[step_index, location_index, :] = ev eigen_vectors[step_index, location_index, :, :] = evecs else: for location_index in range(n_starting_locations): ev, evecs = _compute_eigen_info( major_trajectory[step_index-1, location_index, :], major_trajectory[step_index, location_index, :], trajectories[step_index, location_index, :, :], ) eigen_values[step_index, location_index, :] = ev eigen_vectors[step_index, location_index, :, :] = evecs return { "mean_trajectory": major_trajectory, "eigen_values": eigen_values, "eigen_vectors": eigen_vectors }