Source code for Core.BandDepths.vector_depths

import numpy as np
from itertools import combinations
import multiprocessing as mp


def _compute_depth_for_vector(args):
    """
    Worker function to compute depth for a single vector (for parallelization).
    
    Parameters:
    -----------
    args : tuple
        (vector_idx, angles, magnitudes, pairs)
    
    Returns:
    --------
    int : depth count for this vector
    """
    i, angles, magnitudes, pairs = args
    angle_i = angles[i]
    mag_i = magnitudes[i]
    
    depth = 0
    for j, k in pairs:
        angle_j, angle_k = angles[j], angles[k]
        mag_j, mag_k = magnitudes[j], magnitudes[k]
        
        # Check if vector i is between vectors j and k
        if (angle_j <= angle_i <= angle_k or angle_k <= angle_i <= angle_j) and \
           (min(mag_j, mag_k) <= mag_i <= max(mag_j, mag_k)):
            depth += 1
    
    return depth


[docs] def calculate_spread_2D(vectors, depths, percentile): """ Calculate the spread in 2D polar coordinates. Parameters: ----------- vectors : numpy.ndarray Array of shape (n, 2) in polar coordinates (magnitude, angle) depths : numpy.ndarray Array of shape (n,) representing the depth of each vector percentile : float Percentile of vectors to include based on depth ranking (0-100). Higher values include more vectors. - percentile=50: Include top 50% deepest vectors - percentile=95: Include top 95% deepest vectors (typical) - percentile=100: Include ALL vectors Returns: -------- tuple Indices of vectors with min/max magnitude, angle among those passing depth threshold """ # FIXED: Use proper percentile semantics # percentile=95 means keep top 95% by depth = depth >= 5th percentile depth_threshold = np.percentile(depths, 100.0 - percentile) first_quadrant = False # Set to True if you want to restrict to first quadrant 0 to pi/2 second_quadrant = False # Set to True if you want to restrict to second quadrant pi/2 to pi third_quadrant = False # Set to True if you want to restrict to third quadrant -pi to -pi/2 fourth_quadrant = False # Set to True if you want to restrict to fourth quadrant -pi/2 to 0 indices = np.where(depths >= depth_threshold)[0] median_idx = np.argmax(depths) if indices.size > 0: filtered_vectors = vectors[indices] # Check if any vector is in the first quadrant first_quadrant = np.any((filtered_vectors[:, 1] >= 0) & (filtered_vectors[:, 1] <= np.pi/2)) second_quadrant = np.any((filtered_vectors[:, 1] > np.pi/2) & (filtered_vectors[:, 1] <= np.pi)) third_quadrant = np.any((filtered_vectors[:, 1] < -np.pi/2) & (filtered_vectors[:, 1] >= -np.pi)) fourth_quadrant = np.any((filtered_vectors[:, 1] < 0) & (filtered_vectors[:, 1] >= -np.pi/2)) if ((not first_quadrant and second_quadrant and third_quadrant and not fourth_quadrant ) or (not first_quadrant and second_quadrant and third_quadrant and fourth_quadrant ) or (first_quadrant and second_quadrant and third_quadrant and not fourth_quadrant )): # find smallest positive angle and smallest negative angle pos_angles = filtered_vectors[filtered_vectors[:, 1] >= 0] neg_angles = filtered_vectors[filtered_vectors[:, 1] < 0] min_angle = np.min(pos_angles[:, 1]) max_angle = np.max(neg_angles[:, 1]) min_mag = np.min(filtered_vectors[:, 0]) max_mag = np.max(filtered_vectors[:, 0]) else: min_angle = np.min(filtered_vectors[:, 1]) max_angle = np.max(filtered_vectors[:, 1]) min_mag = np.min(filtered_vectors[:, 0]) max_mag = np.max(filtered_vectors[:, 0]) else: min_angle, max_angle = 0.0, 0.0 min_mag, max_mag = 0.0, 0.0 return median_idx, min_mag, max_mag, min_angle, max_angle
[docs] def calculate_spread_3D(vectors, depths, percentile): """ Calculate the spread in 3D spherical coordinates. Parameters: ----------- vectors : numpy.ndarray Array of shape (n, 3) in spherical coordinates (magnitude, theta, phi) depths : numpy.ndarray Array of shape (n,) representing the depth of each vector percentile : float Percentile of vectors to include based on depth ranking (0-100). Higher values include more vectors. - percentile=50: Include top 50% deepest vectors - percentile=95: Include top 95% deepest vectors (typical) - percentile=100: Include ALL vectors Returns: -------- tuple Indices of vectors with min/max magnitude, theta, phi among those passing depth threshold """ # FIXED: Use proper percentile semantics # percentile=95 means keep top 95% by depth = depth >= 5th percentile depth_threshold = np.percentile(depths, 100.0 - percentile) indices = np.where(depths >= depth_threshold)[0] median_idx = np.argmax(depths) if indices.size > 0: filtered_vectors = vectors[indices] min_phi_idx = np.argmin(filtered_vectors[:, 2]) max_phi_idx = np.argmax(filtered_vectors[:, 2]) min_theta_idx = np.argmin(filtered_vectors[:, 1]) max_theta_idx = np.argmax(filtered_vectors[:, 1]) min_mag_idx = np.argmin(filtered_vectors[:, 0]) max_mag_idx = np.argmax(filtered_vectors[:, 0]) else: min_phi_idx, max_phi_idx = None, None min_theta_idx, max_theta_idx = None, None min_mag_idx, max_mag_idx = None, None return median_idx, min_mag_idx, max_mag_idx, min_theta_idx, max_theta_idx, min_phi_idx, max_phi_idx
[docs] def cartesian_to_polar(vectors): """ Convert 2D Cartesian vectors to polar coordinates (magnitude, angle). Parameters: ----------- vectors : numpy.ndarray Array of shape (n, 2) representing 2D Cartesian vectors. Returns: -------- polar_coords : numpy.ndarray Array of shape (n, 2) with columns [magnitude, angle in radians]. """ magnitudes = np.linalg.norm(vectors, axis=1) valid_indices = magnitudes > 1.0e-8 # Filter vectors with norm greater than 1.0e-8 angles = np.zeros_like(magnitudes) angles[valid_indices] = np.arctan2(vectors[valid_indices, 1], vectors[valid_indices, 0]) angles = np.unwrap(angles) # Unwrap to ensure continuity in angles angles = (angles + np.pi) % (2 * np.pi) - np.pi # Ensure angles are in the range [-pi, pi] polar_coords = np.column_stack((magnitudes, angles)) return polar_coords
[docs] def cartesian_to_spherical(vectors): """ Convert 3D Cartesian vectors to spherical coordinates (magnitude, theta, phi). Parameters: ----------- vectors : numpy.ndarray Array of shape (n, 3) representing 3D Cartesian vectors. Returns: -------- spherical_coords : numpy.ndarray Array of shape (n, 3) with columns [magnitude, theta, phi]. """ magnitudes = np.linalg.norm(vectors, axis=1) valid_indices = magnitudes > 1.0e-8 # Filter vectors with norm greater than 1.0e-8 theta = np.zeros_like(magnitudes) phi = np.zeros_like(magnitudes) theta[valid_indices] = np.arccos(np.clip(vectors[valid_indices, 2] / magnitudes[valid_indices], -1.0, 1.0)) # Clip to handle numerical precision issues phi[valid_indices] = np.arctan2(vectors[valid_indices, 1], vectors[valid_indices, 0]) phi = np.unwrap(phi) # Unwrap to ensure continuity in angles spherical_coords = np.column_stack((magnitudes, theta, phi)) # if np.any(np.isnan(spherical_coords)): # print("Invalid vectors detected:", vectors) # print("Spherical coordinates:", spherical_coords) # raise ValueError("NaN values found in spherical coordinates conversion.") return spherical_coords
[docs] def angular_spread(vectors): """ Calculate the angular spread and magnitude spread of a set of vectors. Parameters ---------- vectors : numpy.ndarray Array of shape (n, 2) representing 2D Cartesian vectors. Returns ------- angular_spread_deg : float The angular spread in degrees. magnitude_spread : float The magnitude spread. min_idx : int Index of the vector with the minimum angle. max_idx : int Index of the vector with the maximum angle. """ # Calculate the angular spread (max-min angle from origin) and return indices angles_from_origin = np.arctan2(vectors[:, 1], vectors[:, 0]) angles_unwrapped = np.unwrap(angles_from_origin) min_idx = np.argmin(angles_unwrapped) max_idx = np.argmax(angles_unwrapped) angular_spread = angles_unwrapped[max_idx] - angles_unwrapped[min_idx] angular_spread_deg = np.degrees(angular_spread) return angular_spread_deg, min_idx, max_idx
[docs] def magnitude_spread(vectors): """ Calculate the magnitude spread of a set of vectors. Parameters ---------- vectors : numpy.ndarray Array of shape (n, 2) representing 2D Cartesian vectors. Returns ------- magnitude_spread : float The magnitude spread. min_mag_idx : int Index of the vector with the minimum magnitude. max_mag_idx : int Index of the vector with the maximum magnitude. """ # Calculate the magnitude spread (max-min magnitude) and return indices magnitudes = np.linalg.norm(vectors, axis=1) min_mag_idx = np.argmin(magnitudes) max_mag_idx = np.argmax(magnitudes) magnitude_spread = magnitudes[max_mag_idx] - magnitudes[min_mag_idx] return magnitude_spread, min_mag_idx, max_mag_idx
[docs] def compute_vector_depths_2D(vectors, workers=None): """ Compute the depth of each vector in the ensemble using optimized vectorized operations. The vector depth calculation is based on T. A. J. Ouermi, J. Li, Z. Morrow, B. Van Bloemen Waanders and C. R. Johnson, "Glyph-Based Uncertainty Visualization and Analysis of Time-Varying Vector Fields," 2024 IEEE Workshop on Uncertainty Visualization: Applications, Techniques, Software, and Decision Frameworks, St Pete Beach, FL, USA, 2024, pp. 73-77, doi: 10.1109/UncertaintyVisualization63963.2024.00014. Parameters: ----------- vectors : numpy.ndarray Array of shape (n, 2) representing 2D Cartesian vectors. workers : int, optional Number of worker processes for parallel computation. Default is None (sequential). Set to a positive integer to enable multiprocessing. Returns: -------- depths : numpy.ndarray Array of shape (n,) with the depth of each vector. Notes: ------ Optimized version using: 1. Vectorized angle/magnitude computation (precomputed once) 2. Algorithm optimization (avoid redundant computations) 3. Optional parallelization for large ensembles """ n = vectors.shape[0] # Fast path for small ensembles if n < 3: return np.zeros(n) # OPTIMIZATION 1: Precompute all angles and magnitudes once (vectorized) angles = np.arctan2(vectors[:, 1], vectors[:, 0]) magnitudes = np.linalg.norm(vectors, axis=1) # Generate pairs once pairs = list(combinations(np.arange(n), 2)) n_pairs = len(pairs) # OPTIMIZATION 2: Vectorized depth computation # For small ensembles, vectorized version is faster than parallelization overhead if workers is None or workers <= 1 or n < 30: depths = np.zeros(n, dtype=np.int32) # Convert pairs to arrays for vectorized operations pairs_arr = np.array(pairs) j_indices = pairs_arr[:, 0] k_indices = pairs_arr[:, 1] # Precompute pair properties (vectorized) angles_j = angles[j_indices] angles_k = angles[k_indices] mags_j = magnitudes[j_indices] mags_k = magnitudes[k_indices] # Precompute min/max for magnitudes min_mags = np.minimum(mags_j, mags_k) max_mags = np.maximum(mags_j, mags_k) # Compute depth for each vector for i in range(n): angle_i = angles[i] mag_i = magnitudes[i] # Vectorized angle check: i is between j and k angle_between = ((angles_j <= angle_i) & (angle_i <= angles_k)) | \ ((angles_k <= angle_i) & (angle_i <= angles_j)) # Vectorized magnitude check: i is between j and k mag_between = (min_mags <= mag_i) & (mag_i <= max_mags) # Count how many pairs satisfy both conditions depths[i] = np.sum(angle_between & mag_between) else: # OPTIMIZATION 3: Parallel computation for large ensembles ctx = mp.get_context('fork') pool = ctx.Pool(processes=workers) try: # Prepare arguments for each vector args_list = [(i, angles, magnitudes, pairs) for i in range(n)] # Compute depths in parallel depths = np.array(pool.map(_compute_depth_for_vector, args_list), dtype=np.int32) finally: pool.close() pool.join() # Scale depths between 0 and 1 depths = depths.astype(np.float64) if depths.max() > 0.0: depths = (depths - depths.min()) / (depths.max() - depths.min()) return depths
def _compute_depth_for_vector_3D(args): """ Worker function to compute depth for a single 3D vector (for parallelization). Parameters: ----------- args : tuple (vector_idx, magnitudes, thetas, phis, pairs) Returns: -------- int : depth count for this vector """ i, magnitudes, thetas, phis, pairs = args mag_i = magnitudes[i] theta_i = thetas[i] phi_i = phis[i] depth = 0 for j, k in pairs: mag_j, mag_k = magnitudes[j], magnitudes[k] theta_j, theta_k = thetas[j], thetas[k] phi_j, phi_k = phis[j], phis[k] # Check if vector i is between vectors j and k in spherical coordinates if (theta_j < theta_i < theta_k or theta_k < theta_i < theta_j) and \ (phi_j < phi_i < phi_k or phi_k < phi_i < phi_j) and \ (min(mag_j, mag_k) < mag_i < max(mag_j, mag_k)): depth += 1 return depth
[docs] def compute_vector_depths_3D(vectors, workers=None): """ Compute the depth of each vector in the ensemble in 3D using optimized vectorized operations. Assumes vectors are in spherical coordinates (magnitude, theta, phi). The vector depth calculation is based on T. A. J. Ouermi, J. Li, Z. Morrow, B. Van Bloemen Waanders and C. R. Johnson, "Glyph-Based Uncertainty Visualization and Analysis of Time-Varying Vector Fields," 2024 IEEE Workshop on Uncertainty Visualization: Applications, Techniques, Software, and Decision Frameworks, St Pete Beach, FL, USA, 2024, pp. 73-77, doi: 10.1109/UncertaintyVisualization63963.2024.00014. Parameters: ----------- vectors : numpy.ndarray Array of shape (n, 3) representing 3D spherical coordinates (magnitude, theta, phi). workers : int, optional Number of worker processes for parallel computation. Default is None (sequential). Set to a positive integer to enable multiprocessing. Returns: -------- depths : numpy.ndarray Array of shape (n,) with the depth of each vector. Notes: ------ Optimized version using: 1. Vectorized magnitude/angle computation (precomputed once) 2. Algorithm optimization (avoid redundant computations) 3. Optional parallelization for large ensembles """ n = vectors.shape[0] # Fast path for small ensembles if n < 3: return np.zeros(n) # OPTIMIZATION 1: Precompute all components once (vectorized) magnitudes = vectors[:, 0] thetas = vectors[:, 1] phis = vectors[:, 2] # Generate pairs once pairs = list(combinations(np.arange(n), 2)) # OPTIMIZATION 2: Vectorized depth computation # For small ensembles, vectorized version is faster than parallelization overhead if workers is None or workers <= 1 or n < 30: depths = np.zeros(n, dtype=np.int32) # Convert pairs to arrays for vectorized operations pairs_arr = np.array(pairs) j_indices = pairs_arr[:, 0] k_indices = pairs_arr[:, 1] # Precompute pair properties (vectorized) mags_j = magnitudes[j_indices] mags_k = magnitudes[k_indices] thetas_j = thetas[j_indices] thetas_k = thetas[k_indices] phis_j = phis[j_indices] phis_k = phis[k_indices] # Precompute min/max for magnitudes min_mags = np.minimum(mags_j, mags_k) max_mags = np.maximum(mags_j, mags_k) # Compute depth for each vector for i in range(n): mag_i = magnitudes[i] theta_i = thetas[i] phi_i = phis[i] # Vectorized theta check: i is between j and k theta_between = ((thetas_j < theta_i) & (theta_i < thetas_k)) | \ ((thetas_k < theta_i) & (theta_i < thetas_j)) # Vectorized phi check: i is between j and k phi_between = ((phis_j < phi_i) & (phi_i < phis_k)) | \ ((phis_k < phi_i) & (phi_i < phis_j)) # Vectorized magnitude check: i is between j and k (strict inequality) mag_between = (min_mags < mag_i) & (mag_i < max_mags) # Count how many pairs satisfy all three conditions depths[i] = np.sum(theta_between & phi_between & mag_between) else: # OPTIMIZATION 3: Parallel computation for large ensembles ctx = mp.get_context('fork') pool = ctx.Pool(processes=workers) try: # Prepare arguments for each vector args_list = [(i, magnitudes, thetas, phis, pairs) for i in range(n)] # Compute depths in parallel depths = np.array(pool.map(_compute_depth_for_vector_3D, args_list), dtype=np.int32) finally: pool.close() pool.join() # Scale depths between 0 and 1 depths = depths.astype(np.float64) if depths.max() > 0.0: depths = (depths - depths.min()) / (depths.max() - depths.min()) return depths