Source code for pysubsea.oos_tools

# pylint: disable=too-many-lines
'''
This module provides classes and functions for processing and analyzing Out-Of-Straightness (OOS)
pipeline survey and route data.

**Features:**

- Designed for use in subsea pipeline, but general enough for any OOS survey
  data processing and analysis tasks.
- The `OOSAnonymisation` class processes OOS survey and route data, providing methods for
  cleaning, sectioning, coordinate normalization, and anonymization of pipeline survey datasets.
- The `OOSDespiker` class implements rolling window Hampel filter despiking for OOS survey data,
  removing outliers while preserving data alignment and group structure.
- The `FFTSmoother` and `GaussianSmoother` class implements group-wise signal processing and
  smoothing (FFT, Gaussian, etc.) for OOS survey data, supporting robust, efficient, and
  reproducible analysis.

All calculations are vectorized using NumPy and SciPy for efficiency and flexibility.

.. raw:: html

   <hr style="height:6px; background-color:#888; border:none; margin:1.5em 0;" />
'''

import numpy as np
from scipy.optimize import leastsq
from scipy.fft import fft, ifft, fftfreq
from scipy.signal import welch
from scipy.ndimage import generic_filter

[docs] class OOSAnonymisation: # pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-locals, too-few-public-methods """ Processor for Out-Of-Straightness (OOS) pipeline survey and route data. Provides methods for cleaning, sectioning, and coordinate normalization. All input arrays (parameters) are converted to NumPy arrays and stored as attributes. Additional attributes are initialized as NumPy arrays and will be populated by processing methods. Parameters ---------- route_development : np.ndarray Development identifiers for each route section. route_pipeline_group : np.ndarray Pipeline group identifiers for each route section. route_pipeline : np.ndarray Pipeline identifiers for each route section. route_kp_from : np.ndarray Start KP (kilometer point) for each route section. route_kp_to : np.ndarray End KP (kilometer point) for each route section. route_pipeline_section_type : np.ndarray Section type (e.g., 'Straight', 'Curve', etc.) for each route section. route_curve_radius : np.ndarray Design curve radius for each route section. survey_development : np.ndarray Development identifiers for each survey point. survey_type : np.ndarray Survey type for each survey point. survey_pipeline_group : np.ndarray Pipeline group identifiers for each survey point. survey_pipeline : np.ndarray Pipeline identifiers for each survey point. survey_kp : np.ndarray KP (kilometer point) for each survey point. survey_easting : np.ndarray Easting coordinate for each survey point. survey_northing : np.ndarray Northing coordinate for each survey point. """ def __init__( self, *, route_development, route_pipeline_group, route_pipeline, route_kp_from, route_kp_to, route_pipeline_section_type, route_curve_radius, survey_development, survey_type, survey_pipeline_group, survey_pipeline, survey_kp, survey_easting, survey_northing ): """ Initialize an OOSAnonymisation object with route and survey data. """ # Initialise parameter arrays for the route data self.route_development = np.asarray(route_development, dtype=object) self.route_pipeline_group = np.asarray(route_pipeline_group, dtype=object) self.route_pipeline = np.asarray(route_pipeline, dtype=object) self.route_kp_from = np.asarray(route_kp_from, dtype=float) self.route_kp_to = np.asarray(route_kp_to, dtype=float) self.route_pipeline_section_type = np.asarray(route_pipeline_section_type, dtype=object) self.route_curve_radius = np.asarray(route_curve_radius, dtype=float) # Initialise parameter arrays for the survey data self.survey_development = np.asarray(survey_development, dtype=object) self.survey_type = np.asarray(survey_type, dtype=object) self.survey_pipeline_group = np.asarray(survey_pipeline_group, dtype=object) self.survey_pipeline = np.asarray(survey_pipeline, dtype=object) self.survey_kp = np.asarray(survey_kp, dtype=float) self.survey_easting = np.asarray(survey_easting, dtype=float) self.survey_northing = np.asarray(survey_northing, dtype=float) # Initialise returns arrays for the survey data, section grouped self.survey_section_type = np.full( self.survey_kp.shape[0], '', dtype=object ) self.survey_section_no = np.full( self.survey_kp.shape[0], np.nan, dtype=float ) self.survey_section_kp_mod = np.full( self.survey_kp.shape[0], np.nan, dtype=float ) self.survey_section_easting_mod = np.full( self.survey_kp.shape[0], np.nan, dtype=float ) self.survey_section_northing_mod = np.full( self.survey_kp.shape[0], np.nan, dtype=float ) self.survey_feature = np.full( self.survey_kp.shape[0], '', dtype=object ) self.survey_design_route_curve_radius = np.full( self.survey_kp.shape[0], np.nan, dtype=float ) self.survey_actual_route_curve_radius = np.full( self.survey_kp.shape[0], np.nan, dtype=float ) # Initialise returns arrays for the survey data, pipeline group grouped self.survey_group_section_no = np.full( self.survey_kp.shape[0], np.nan, dtype=float ) self.survey_group_section_type = np.full( self.survey_kp.shape[0], '', dtype=object ) self.survey_group_section_kp_mod = np.full( self.survey_kp.shape[0], np.nan, dtype=float ) self.survey_group_section_easting_mod = np.full( self.survey_kp.shape[0], np.nan, dtype=float ) self.survey_group_section_northing_mod = np.full( self.survey_kp.shape[0], np.nan, dtype=float )
[docs] def process(self, anonymise: bool = False): """ Run the OOS processing pipeline. Parameters ---------- anonymise : bool, default False If True, runs the full anonymisation pipeline (with cleaning and grouping). If False, runs only the basic section processing. """ self._add_pipeline_section() self._add_feature_labels() self._add_design_route_curve_radius() self._add_actual_route_curve_radius() self._transform_pipeline_section_coordinates() if anonymise: self._trim_feature_kps() self._trim_straight_section_ends() self._add_pipeline_group_section() self._transform_pipeline_group_section_coordinates()
def _add_pipeline_section(self): """ Add section type ('Straight' or 'Curve') and section number to each survey point. For each survey point, determines if it falls within a non-straight route section (based on matching pipeline group, pipeline, and KP range) and adds 'Curve' or 'Straight' to self.survey_section_type. Then, adds a unique section number (self.survey_section_no) to each continuous section of the same type within each (pipeline group, pipeline, survey type) group. Updates ------- self.survey_section_type : np.ndarray Section type ('Straight' or 'Curve') for each survey point. self.survey_section_no : np.ndarray Section number for each survey point, incremented for each new section. Returns ------- None """ # Build mask for non-straight route sections mask = ~np.isin(self.route_pipeline_section_type, ['LBMD', 'PWMD', 'ILT', 'Straight']) # Indices of route sections that are not straight idxs = np.where(mask)[0] mask_curve = np.full(self.survey_kp.shape[0], False) for i in idxs: cond = ( (self.survey_development == self.route_development[i]) & (self.survey_pipeline_group == self.route_pipeline_group[i]) & (self.survey_pipeline == self.route_pipeline[i]) & (self.survey_kp >= self.route_kp_from[i]) & (self.survey_kp <= self.route_kp_to[i]) ) mask_curve = mask_curve | cond # Create or update section type array section_type = np.full(self.survey_kp.shape[0], 'Straight', dtype=object) section_type[mask_curve] = 'Curve' self.survey_section_type = section_type # Add Section No for each continuous section group_keys = [ self.survey_development, self.survey_type, self.survey_pipeline_group, self.survey_pipeline ] section_no = np.zeros(self.survey_kp.shape[0], dtype=int) prev_keys = None prev_type = None current_section = 0 for i in range(self.survey_kp.shape[0]): keys = tuple(key[i] for key in group_keys) curr_type = section_type[i] if keys != prev_keys: current_section = 1 elif curr_type != prev_type: current_section += 1 section_no[i] = current_section prev_keys = keys prev_type = curr_type self.survey_section_no = section_no def _add_feature_labels(self): """ Adds a new attribute 'survey_feature' to the survey points. For each feature (PWMD, LBMD, ILT) in the route, finds the closest survey point (matching development, pipeline group, pipeline) in KP and adds the feature name at that position. All other survey points have an empty string. """ feature_types = ['PWMD', 'LBMD', 'ILT'] for i, section_type in enumerate(self.route_pipeline_section_type): if section_type in feature_types: dev = self.route_development[i] group = self.route_pipeline_group[i] pipe = self.route_pipeline[i] kp = self.route_kp_from[i] # Find matching survey points mask = ( (self.survey_development == dev) & (self.survey_pipeline_group == group) & (self.survey_pipeline == pipe) ) if not np.any(mask): continue kp_survey = self.survey_kp[mask] idx_in_mask = np.argmin(np.abs(kp_survey - kp)) idx = np.where(mask)[0][idx_in_mask] # Only assign if not last index, or if kp matches exactly if idx_in_mask != len(kp_survey) - 1 or np.isclose(kp, kp_survey[idx_in_mask]): self.survey_feature[idx] = section_type def _add_design_route_curve_radius(self): """ Add the 'Design Route Curve Radius' from route data to each survey point where the section type is 'Curve'. For each survey point with section type 'Curve', finds the matching route section (by development, pipeline group, pipeline, and KP within [KP From, KP To]) and adds the corresponding route curve radius to self.survey_design_route_curve_radius. Updates ------- self.survey_design_route_curve_radius : np.ndarray Design route curve radii for each survey point (NaN for non-curve points). Returns ------- None """ mask_curve = self.survey_section_type == 'Curve' for i, _ in enumerate(self.route_development): cond = ( (self.survey_development == self.route_development[i]) & (self.survey_pipeline_group == self.route_pipeline_group[i]) & (self.survey_pipeline == self.route_pipeline[i]) & (self.survey_kp >= self.route_kp_from[i]) & (self.survey_kp <= self.route_kp_to[i]) & mask_curve ) self.survey_design_route_curve_radius[cond] = self.route_curve_radius[i] def _add_actual_route_curve_radius(self): """ Fit a circle to each group of survey points where the section type is 'Curve' and add the fitted radius. For each group of survey points (grouped by development, pipeline group, pipeline, survey type, and section number) where all points are of type 'Curve' and there are more than 3 points, fit a circle to the (easting, northing) coordinates and add the resulting radius to self.survey_actual_route_curve_radius for those points. Non-curve points or groups with insufficient points remain NaN. Updates ------- self.survey_actual_route_curve_radius : np.ndarray Fitted curve radii for each survey point (NaN for non-curve points or insufficient data). Returns ------- None """ def calc_radius(x, y, xc, yc): return np.sqrt((x - xc) ** 2 + (y - yc) ** 2) def fit_circle(x, y): x_m = np.mean(x) y_m = np.mean(y) def f(c): radius_i = calc_radius(x, y, *c) return radius_i - radius_i.mean() center_estimate = x_m, y_m center, _ = leastsq(f, center_estimate) xc, yc = center radius_i = calc_radius(x, y, xc, yc) radius = radius_i.mean() return radius # Group by (development, survey_type, survey_pipeline_group, # survey_pipeline, survey_section_no) group_tuples = list(zip( self.survey_development, self.survey_type, self.survey_pipeline_group, self.survey_pipeline, self.survey_section_no )) unique_groups = set(group_tuples) for group in unique_groups: mask = ( (self.survey_development == group[0]) & (self.survey_type == group[1]) & (self.survey_pipeline_group == group[2]) & (self.survey_pipeline == group[3]) & (self.survey_section_no == group[4]) ) if np.all(self.survey_section_type[mask] == 'Curve') and np.sum(mask) > 3: x = self.survey_easting[mask] y = self.survey_northing[mask] radius = fit_circle(x, y) self.survey_actual_route_curve_radius[mask] = radius def _transform_pipeline_section_coordinates(self): """ Compute and add section-modified coordinates and KP for each pipeline section. For each group of survey points (grouped by development, pipeline group, pipeline, survey type, and section number): - Shift Easting/Northing so the section starts at (0,0) - Rotate the section so it ends at (d, 0) (horizontal line) - Compute a section-modified KP (distance along the section, starting at 0) The results are stored in self.survey_section_easting_mod, self.survey_section_northing_mod, and self.survey_section_kp_mod for each survey point. Updates ------- self.survey_section_easting_mod : np.ndarray Section-modified easting coordinates for each survey point. self.survey_section_northing_mod : np.ndarray Section-modified northing coordinates for each survey point. self.survey_section_kp_mod : np.ndarray Section-modified KP (distance along section, starting at 0) for each survey point. Returns ------- None """ group_tuples = list(zip( self.survey_development, self.survey_type, self.survey_pipeline_group, self.survey_pipeline, self.survey_section_no )) unique_groups = set(group_tuples) for group in unique_groups: mask = ( (self.survey_development == group[0]) & (self.survey_type == group[1]) & (self.survey_pipeline_group == group[2]) & (self.survey_pipeline == group[3]) & (self.survey_section_no == group[4]) ) idx = np.where(mask)[0] if len(idx) == 0: continue x = self.survey_easting[idx] y = self.survey_northing[idx] x0, y0 = x[0], y[0] x_shift = x - x0 y_shift = y - y0 dx = x_shift[-1] dy = y_shift[-1] theta = -np.arctan2(dy, dx) x_mod = x_shift * np.cos(theta) - y_shift * np.sin(theta) y_mod = x_shift * np.sin(theta) + y_shift * np.cos(theta) kp_mod = np.append(0, np.cumsum(np.diff(self.survey_kp[idx]))) self.survey_section_easting_mod[idx] = x_mod self.survey_section_northing_mod[idx] = y_mod self.survey_section_kp_mod[idx] = kp_mod def _trim_feature_kps(self): """ Remove survey points where KP is within ±0.2 km of any KP From in the route data for matching development, pipeline group, and pipeline. Applies a mask to all survey arrays, shortening them in place. This is typically used to remove points near mitigation features (e.g., LBMD, PWMD, ILT) from the survey data. Updates ------- All survey arrays are filtered in place, reducing their length. Returns ------- None """ mitigation_mask = np.isin(self.route_pipeline_section_type, ['LBMD', 'PWMD', 'ILT']) keep_mask = np.ones(self.survey_kp.shape[0], dtype=bool) for i in np.where(mitigation_mask)[0]: cond = ( (self.survey_development == self.route_development[i]) & (self.survey_pipeline_group == self.route_pipeline_group[i]) & (self.survey_pipeline == self.route_pipeline[i]) & (np.abs(self.survey_kp - self.route_kp_from[i]) <= 0.2) ) keep_mask = keep_mask & ~cond # Apply mask to all survey arrays self.survey_development = self.survey_development[keep_mask] self.survey_type = self.survey_type[keep_mask] self.survey_pipeline_group = self.survey_pipeline_group[keep_mask] self.survey_pipeline = self.survey_pipeline[keep_mask] self.survey_kp = self.survey_kp[keep_mask] self.survey_easting = self.survey_easting[keep_mask] self.survey_northing = self.survey_northing[keep_mask] self.survey_section_type = self.survey_section_type[keep_mask] self.survey_section_no = self.survey_section_no[keep_mask] self.survey_section_kp_mod = self.survey_section_kp_mod[keep_mask] self.survey_section_easting_mod = self.survey_section_easting_mod[keep_mask] self.survey_section_northing_mod = self.survey_section_northing_mod[keep_mask] self.survey_design_route_curve_radius = self.survey_design_route_curve_radius[keep_mask] self.survey_actual_route_curve_radius = self.survey_actual_route_curve_radius[keep_mask] self.survey_feature = self.survey_feature[keep_mask] def _trim_straight_section_ends(self): """ Trim 50 meters from both ends of all Straight sections, but only when the section changes from Curve to Straight or from Straight to Curve. Applies a mask to all survey arrays, shortening them in place. Only points at the ends of straight sections adjacent to curves are removed. Updates ------- All survey arrays are filtered in place, reducing their length. Returns ------- None """ n = self.survey_kp.shape[0] keep_mask = np.ones(n, dtype=bool) # Group by (Development, Pipeline Group, Pipeline, Survey Type, Section No) group_tuples = list(zip( self.survey_development, self.survey_type, self.survey_pipeline_group, self.survey_pipeline, self.survey_section_no )) unique_groups = set(group_tuples) for group in unique_groups: mask = ( (self.survey_development == group[0]) & (self.survey_type == group[1]) & (self.survey_pipeline_group == group[2]) & (self.survey_pipeline == group[3]) & (self.survey_section_no == group[4]) ) idx = np.where(mask)[0] if len(idx) == 0: continue if not np.all(self.survey_section_type[idx] == 'Straight'): continue # Find previous and next section types prev_idx = idx[0] - 1 if idx[0] > 0 else None next_idx = idx[-1] + 1 if idx[-1] < n - 1 else None prev_type = self.survey_section_type[prev_idx] if prev_idx is not None else None next_type = self.survey_section_type[next_idx] if next_idx is not None else None # Only trim if adjacent to a Curve if prev_type == 'Curve': kp0 = self.survey_kp[idx].min() trim_mask = (self.survey_kp[idx] - kp0) < 0.05 keep_mask[idx[trim_mask]] = False if next_type == 'Curve': kp1 = self.survey_kp[idx].max() trim_mask = (kp1 - self.survey_kp[idx]) < 0.05 keep_mask[idx[trim_mask]] = False # Apply mask to all survey arrays self.survey_development = self.survey_development[keep_mask] self.survey_type = self.survey_type[keep_mask] self.survey_pipeline_group = self.survey_pipeline_group[keep_mask] self.survey_pipeline = self.survey_pipeline[keep_mask] self.survey_kp = self.survey_kp[keep_mask] self.survey_easting = self.survey_easting[keep_mask] self.survey_northing = self.survey_northing[keep_mask] self.survey_section_type = self.survey_section_type[keep_mask] self.survey_section_no = self.survey_section_no[keep_mask] self.survey_section_kp_mod = self.survey_section_kp_mod[keep_mask] self.survey_section_easting_mod = self.survey_section_easting_mod[keep_mask] self.survey_section_northing_mod = self.survey_section_northing_mod[keep_mask] self.survey_design_route_curve_radius = self.survey_design_route_curve_radius[keep_mask] self.survey_actual_route_curve_radius = self.survey_actual_route_curve_radius[keep_mask] self.survey_feature = self.survey_feature[keep_mask] def _add_pipeline_group_section(self): """ Adds group-level section type ('Straight' or 'Curve') and section number to each survey point. For each survey point, adds the group section type (same as section type) and a unique group section number within each (development, pipeline group, survey type) group. The section number increments for each new continuous section of the same type. Updates ------- self.survey_group_section_type : np.ndarray Group section type for each survey point. self.survey_group_section_no : np.ndarray Group section number for each survey point. Returns ------- None """ # Add group section number current_section = 0 group_keys1 = [self.survey_development, self.survey_type, self.survey_pipeline_group] group_keys2 = group_keys1 + [self.survey_pipeline, self.survey_section_no] prev_keys1 = None prev_keys2 = None prev_kp = None # Group no. loop n = self.survey_kp.shape[0] group_section_no = np.zeros(n, dtype=int) for i in range(n): keys1 = tuple(key[i] for key in group_keys1) keys2 = tuple(key[i] for key in group_keys2) curr_kp = self.survey_kp[i] if keys1 != prev_keys1: current_section = 1 # Reset to 1 when group_keys changes elif keys2 != prev_keys2: current_section += 1 # Increase if section type changes elif prev_kp is not None and curr_kp - prev_kp > 0.350: # +/-200m removed at features current_section += 1 group_section_no[i] = current_section prev_keys1 = keys1 prev_keys2 = keys2 prev_kp = curr_kp self.survey_group_section_no = group_section_no self.survey_group_section_type = self.survey_section_type.copy() def _transform_pipeline_group_section_coordinates(self): # pylint: disable=too-many-statements """ Compute and add group section-modified coordinates and KP for each pipeline group section. For each group of survey points (grouped by development, pipeline group, survey type, group section type, and group section number): - Shift Easting/Northing so the section starts at (0,0) - Rotate the section so it ends at (d, 0) (horizontal line) - Compute a group section-modified KP (distance along the section, starting at 0) The results are stored in self.survey_group_section_easting_mod, self.survey_group_section_northing_mod, and self.survey_group_section_kp_mod for each survey point. Updates ------- self.survey_group_section_easting_mod : np.ndarray Group section-modified easting coordinates for each survey point. self.survey_group_section_northing_mod : np.ndarray Group section-modified northing coordinates for each survey point. self.survey_group_section_kp_mod : np.ndarray Group section-modified KP (distance along section, in km) for each survey point. Returns ------- None """ n = self.survey_kp.shape[0] self.survey_group_section_easting_mod = np.full(n, np.nan, dtype=float) self.survey_group_section_northing_mod = np.full(n, np.nan, dtype=float) self.survey_group_section_kp_mod = np.full(n, np.nan, dtype=float) # Group by (Development, Pipeline Group, Survey Type, Group Section Type) group_tuples = list(zip( self.survey_development, self.survey_type, self.survey_pipeline_group, self.survey_group_section_type )) unique_groups = set(group_tuples) for group in unique_groups: mask = ( (self.survey_development == group[0]) & (self.survey_type == group[1]) & (self.survey_pipeline_group == group[2]) & (self.survey_group_section_type == group[3]) ) idx = np.where(mask)[0] # Sort idx by group section no to ensure correct order section_nos = self.survey_group_section_no[idx] unique_section_nos = np.unique(section_nos) shift_x = 0.0 shift_y = 0.0 for index, sec_no in enumerate(unique_section_nos): sec_mask = idx[section_nos == sec_no] x = self.survey_section_easting_mod[sec_mask] y = self.survey_section_northing_mod[sec_mask] # Compute average slope (angle) for the first 20 and last 20 points n_points = len(x) n_avg = min(20, n_points // 2) # Use up to 20, but not more than half the section # If not the first section, align to average direction between last N points of # previous section and first N of current using robust vector method if index == 0: theta = 0.0 else: # Last n_avg points of previous section x_prev = x_prev_all[-prev_n_avg:] y_prev = y_prev_all[-prev_n_avg:] # First n_avg points of current section x_curr = x[:n_avg] y_curr = y[:n_avg] # Best linear fit for previous section if ( len(x_prev) < 2 or np.any(np.isnan(x_prev)) or np.any(np.isnan(y_prev)) or np.any(np.isinf(x_prev)) or np.any(np.isinf(y_prev)) or np.all(x_prev == x_prev[0]) ): # Fallback: set coeffs_prev to [0, 0] or skip this section coeffs_prev = [0, 0] else: coeffs_prev = np.polyfit(x_prev, y_prev, 1) slope_prev = coeffs_prev[0] v1 = np.array([1.0, slope_prev]) # direction vector from slope v1_u = v1 / np.linalg.norm(v1) # Best linear fit for current section if ( len(x_curr) < 2 or np.any(np.isnan(x_curr)) or np.any(np.isnan(y_curr)) or np.any(np.isinf(x_curr)) or np.any(np.isinf(y_curr)) or np.all(x_curr == x_curr[0]) ): coeffs_curr = [0, 0] else: coeffs_curr = np.polyfit(x_curr, y_curr, 1) slope_curr = coeffs_curr[0] v2 = np.array([1.0, slope_curr]) v2_u = v2 / np.linalg.norm(v2) # Angle between the two vectors dot = np.dot(v1_u, v2_u) det = v1_u[0]*v2_u[1] - v1_u[1]*v2_u[0] theta = -np.arctan2(det, dot) # Previous section prev_n_avg = n_avg # Shift to origin (0,0) x_to_origin = x - x[0] y_to_origin = y - y[0] # Apply rotation x_rot = x_to_origin * np.cos(theta) - y_to_origin * np.sin(theta) y_rot = x_to_origin * np.sin(theta) + y_to_origin * np.cos(theta) # Save previous x_prev_all = x_rot y_prev_all = y_rot # Apply shift for continuity x_mod = x_rot + shift_x y_mod = y_rot + shift_y # Define variables kp_mod = np.insert(np.cumsum(np.diff(self.survey_kp[sec_mask])), 0, 0) self.survey_group_section_easting_mod[sec_mask] = x_mod self.survey_group_section_northing_mod[sec_mask] = y_mod self.survey_group_section_kp_mod[sec_mask] = kp_mod # Prepare shift for next section: last point of this section shift_x = x_mod[-1] shift_y = y_mod[-1] # Align all sections to the x-axis for group in unique_groups: mask = ( (self.survey_development == group[0]) & (self.survey_type == group[1]) & (self.survey_pipeline_group == group[2]) & (self.survey_group_section_type == group[3]) ) idx = np.where(mask)[0] # Use the group section modified coordinates x = self.survey_group_section_easting_mod[idx] y = self.survey_group_section_northing_mod[idx] # Compute angle to rotate so last point is on the x-axis dx = x[-2] - x[1] dy = y[-2] - y[1] theta = -np.arctan2(dy, dx) # Shift so the second point is at the origin x_shift = x - x[1] y_shift = y - y[1] # Rotate all points x_aligned = x_shift * np.cos(theta) - y_shift * np.sin(theta) + x[1] y_aligned = x_shift * np.sin(theta) + y_shift * np.cos(theta) # Store the aligned coordinates back self.survey_group_section_easting_mod[idx] = x_aligned self.survey_group_section_northing_mod[idx] = y_aligned # Ensure the max magnitude of northing is positive max_north = np.max(self.survey_group_section_northing_mod[idx]) min_north = np.min(self.survey_group_section_northing_mod[idx]) if abs(max_north) < abs(min_north): # Flip the sign of northing self.survey_group_section_northing_mod[idx] = ( -self.survey_group_section_northing_mod[idx] )
[docs] def get_route_development(self): """ Get development identifiers for each route section. Returns ------- np.ndarray Development identifiers for each route section. """ return self.route_development
[docs] def get_route_pipeline_group(self): """ Get pipeline group identifiers for each route section. Returns ------- np.ndarray Pipeline group identifiers for each route section. """ return self.route_pipeline_group
[docs] def get_route_pipeline(self): """ Get pipeline identifiers for each route section. Returns ------- np.ndarray Pipeline identifiers for each route section. """ return self.route_pipeline
[docs] def get_route_kp_from(self): """ Get start KP (kilometer point) for each route section. Returns ------- np.ndarray Start KP (kilometer point) for each route section. """ return self.route_kp_from
[docs] def get_route_kp_to(self): """ Get end KP (kilometer point) for each route section. Returns ------- np.ndarray End KP (kilometer point) for each route section. """ return self.route_kp_to
[docs] def get_route_pipeline_section_type(self): """ Get section type (e.g., 'Straight', 'Curve', etc.) for each route section. Returns ------- np.ndarray Section type for each route section. """ return self.route_pipeline_section_type
[docs] def get_route_curve_radius(self): """ Get design curve radius for each route section. Returns ------- np.ndarray Design curve radius for each route section. """ return self.route_curve_radius
[docs] def get_survey_development(self): """ Get development identifiers for each survey point. Returns ------- np.ndarray Development identifiers for each survey point. """ return self.survey_development
[docs] def get_survey_type(self): """ Get survey type for each survey point. Returns ------- np.ndarray Survey type for each survey point. """ return self.survey_type
[docs] def get_survey_pipeline_group(self): """ Get pipeline group identifiers for each survey point. Returns ------- np.ndarray Pipeline group identifiers for each survey point. """ return self.survey_pipeline_group
[docs] def get_survey_pipeline(self): """ Get pipeline identifiers for each survey point. Returns ------- np.ndarray Pipeline identifiers for each survey point. """ return self.survey_pipeline
[docs] def get_survey_kp(self): """ Get KP (kilometer point) for each survey point. Returns ------- np.ndarray KP (kilometer point) for each survey point. """ return self.survey_kp
[docs] def get_survey_easting(self): """ Get easting coordinate for each survey point. Returns ------- np.ndarray Easting coordinate for each survey point. """ return self.survey_easting
[docs] def get_survey_northing(self): """ Get northing coordinate for each survey point. Returns ------- np.ndarray Northing coordinate for each survey point. """ return self.survey_northing
[docs] def get_survey_section_type(self): """ Get section type ('Straight', 'Curve', etc.) for each survey point. Returns ------- np.ndarray Section type for each survey point. """ return self.survey_section_type
[docs] def get_survey_section_no(self): """ Get section number for each survey point. Returns ------- np.ndarray Section number for each survey point. """ return self.survey_section_no
[docs] def get_survey_section_kp_mod(self): """ Get section-modified KP for each survey point. Returns ------- np.ndarray Section-modified KP for each survey point. """ return self.survey_section_kp_mod
[docs] def get_survey_section_easting_mod(self): """ Get section-modified easting coordinates for each survey point. Returns ------- np.ndarray Section-modified easting coordinates for each survey point. """ return self.survey_section_easting_mod
[docs] def get_survey_section_northing_mod(self): """ Get section-modified northing coordinates for each survey point. Returns ------- np.ndarray Section-modified northing coordinates for each survey point. """ return self.survey_section_northing_mod
[docs] def get_survey_feature(self): """ Get feature label (e.g., 'PWMD', 'LBMD', 'ILT') for each survey point. Returns ------- np.ndarray Feature label for each survey point. """ return self.survey_feature
[docs] def get_survey_design_route_curve_radius(self): """ Get design route curve radius for each survey point. Returns ------- np.ndarray Design route curve radius for each survey point. """ return self.survey_design_route_curve_radius
[docs] def get_survey_actual_route_curve_radius(self): """ Get fitted curve radius for each survey point. Returns ------- np.ndarray Fitted curve radius for each survey point. """ return self.survey_actual_route_curve_radius
[docs] def get_survey_group_section_no(self): """ Get group section number for each survey point. Returns ------- np.ndarray Group section number for each survey point. """ return self.survey_group_section_no
[docs] def get_survey_group_section_type(self): """ Get group section type for each survey point. Returns ------- np.ndarray Group section type for each survey point. """ return self.survey_group_section_type
[docs] def get_survey_group_section_kp_mod(self): """ Get group section-modified KP for each survey point. Returns ------- np.ndarray Group section-modified KP for each survey point. """ return self.survey_group_section_kp_mod
[docs] def get_survey_group_section_easting_mod(self): """ Get group section-modified easting coordinates for each survey point. Returns ------- np.ndarray Group section-modified easting coordinates for each survey point. """ return self.survey_group_section_easting_mod
[docs] def get_survey_group_section_northing_mod(self): """ Get group section-modified northing coordinates for each survey point. Returns ------- np.ndarray Group section-modified northing coordinates for each survey point. """ return self.survey_group_section_northing_mod
[docs] class OOSDespiker: # pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-locals """ This class identifies and removes outlier values from survey data using a rolling window Hampel filter algorithm. Outliers are replaced with the median or NaN, preserving the original data alignment and group structure. Parameters ---------- development : array-like, optional Development identifier for each survey point. If not provided, a default value is used. survey_type : array-like, optional Survey type for each survey point. If not provided, a default value is used. pipeline_group : array-like, optional Pipeline group for each survey point. If not provided, a default value is used. group_section_type : array-like, optional Group section type for each survey point. If not provided, a default value is used. x : array-like Coordinates associated with the signal for each survey point. These can be either easting values or arc lengths, depending on context. y : array-like Signal values for each survey point. These can be either northing values or curvatures, depending on context. window : int, optional Size of the rolling window for sigma-clipping (default: 11). """ def __init__( self, *, development=None, survey_type=None, pipeline_group=None, group_section_type=None, x, y, window=11 ): """ Initialize an OOSAnonymisation object with route and survey data. """ # Convert to arrays self.x = np.asarray(x) self.y = np.asarray(y) self.window = window if development is None: self.development = np.full(self.y.size, 'Predefined', dtype=object) else: self.development = np.asarray(development, dtype=object) if survey_type is None: self.survey_type = np.full(self.y.size, 'Predefined', dtype=object) else: self.survey_type = np.asarray(survey_type, dtype=object) if pipeline_group is None: self.pipeline_group = np.full(self.y.size, 'Predefined', dtype=object) else: self.pipeline_group = np.asarray(pipeline_group, dtype=object) if group_section_type is None: self.group_section_type = np.full(self.y.size, 'Predefined', dtype=object) else: self.group_section_type = np.asarray(group_section_type, dtype=object) # Initialize group-related attributes self.group_tuples = list(zip( self.development, self.survey_type, self.pipeline_group, self.group_section_type, )) self.unique_groups = sorted(set(self.group_tuples), key=self.group_tuples.index) # Call the processing function self._process() def _process(self): """ Despike outliers in using a rolling window Hampel filter algorithm. Outliers are replaced with median or NaN. """ def hampel_filter_median_aligned(x_g, y_g, window): def backward_gradient(y, x): grad = np.empty_like(y) grad[0] = (y[1] - y[0]) / (x[1] - x[0]) # forward difference at start grad[1:] = (y[1:] - y[:-1]) / (x[1:] - x[:-1]) # backward difference for others return grad n = len(y_g) x_g_despiked = np.full_like(x_g, np.nan, dtype=float) y_g_despiked = np.full_like(y_g, np.nan, dtype=float) half_win = window // 2 for i in range(n): i_start = max(0, i - half_win) i_end = min(n, i + half_win + 1) x_window = x_g[i_start:i_end] y_window = y_g[i_start:i_end] if len(x_window) < window: pad_left = half_win - (i - i_start) pad_right = half_win - (i_end - i - 1) x_window = np.pad(x_window, (pad_left, pad_right), mode='edge') y_window = np.pad(y_window, (pad_left, pad_right), mode='edge') # Align the window x_shift = x_window - x_window[0] y_shift = y_window - y_window[0] dx = x_shift[-1] dy = y_shift[-1] theta = -np.arctan2(dy, dx) x_aligned = x_shift * np.cos(theta) - y_shift * np.sin(theta) y_aligned = x_shift * np.sin(theta) + y_shift * np.cos(theta) center_idx = len(y_aligned) // 2 # Hampel on signal m = np.nanmedian(y_aligned) mad = 1.4826 * np.nanmedian(np.abs(y_aligned - m)) r = np.abs(y_aligned[center_idx] - m) / np.maximum(mad, 1e-12) spike_sig = r > 3.5 # Hampel on gradient if np.allclose(x_aligned, x_aligned[0]) or np.any(np.diff(x_aligned) == 0): g = np.zeros_like(y_aligned) else: g = backward_gradient(y_aligned, x_aligned) mg = np.nanmedian(g) madg = 1.4826 * np.nanmedian(np.abs(g - mg)) rg = np.abs(g[center_idx] - mg) / np.maximum(madg, 1e-12) spike_grad = rg > 3.0 spikes = spike_sig & spike_grad # Replace center value if it's a spike if spikes: # Replace with local median in aligned frame interp_x = x_aligned[center_idx] interp_y = m # Inverse rotation and translation x_inv = interp_x * np.cos(-theta) - interp_y * np.sin(-theta) y_inv = interp_x * np.sin(-theta) + interp_y * np.cos(-theta) x_orig = x_inv + x_window[0] y_orig = y_inv + y_window[0] x_g_despiked[i] = x_orig y_g_despiked[i] = y_orig else: x_g_despiked[i] = x_window[center_idx] y_g_despiked[i] = y_window[center_idx] return x_g_despiked, y_g_despiked # Initialise despiked arrays self.x_despike_mean_aligned = np.full_like(self.x, np.nan, dtype=float) self.y_despike_mean_aligned = np.full_like(self.y, np.nan, dtype=float) for group in self.unique_groups: # Create mask for current group mask = ( (self.development == group[0]) & (self.survey_type == group[1]) & (self.pipeline_group == group[2]) & (self.group_section_type == group[3]) ) # Get indices and values for the current group idx = np.where(mask)[0] x_g = self.x[idx] y_g = self.y[idx] n = len(y_g) if n < self.window: self.y_despike_mean_aligned[idx] = y_g continue # Apply Hampel filter using a rolling window x_g_despiked_mean_aligned, y_g_despiked_mean_aligned = ( hampel_filter_median_aligned( x_g, y_g, self.window ) ) self.x_despike_mean_aligned[idx] = x_g_despiked_mean_aligned self.y_despike_mean_aligned[idx] = y_g_despiked_mean_aligned
[docs] def get_x_y_despike_median_aligned(self): """ Get the despiked y values (i.e., northings or curvatures) for each survey point. Within the rolling window, the starting point is translated to the origin, and the line joining the starting and ending points is rotated to be horizontal. Returns the median of the x and y values within the window. Returns ------- np.ndarray Despiked x values for each survey point. np.ndarray Despiked y values for each survey point. """ return self.x_despike_mean_aligned, self.y_despike_mean_aligned
[docs] class OOSCurvature: # pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-locals """ Computes geometric curvature, arc length, and tangent angle for grouped pipeline survey data. This class calculates the arc length, tangent angle, and curvature of the pipeline defined by easting and northing coordinates, for each group section. The results are aligned with the original data and are useful for geometric analysis and further signal processing. Parameters ---------- development : array-like, optional Development identifier for each survey point. If not provided, a default value is used. survey_type : array-like, optional Survey type for each survey point. If not provided, a default value is used. pipeline_group : array-like, optional Pipeline group for each survey point. If not provided, a default value is used. group_section_type : array-like, optional Group section type for each survey point. If not provided, a default value is used. x : array-like These can be either eastings or anonymised eastings, depending on context. y : array-like These can be either northings or anonymised northings, depending on context. """ def __init__( self, *, development=None, survey_type=None, pipeline_group=None, group_section_type=None, x, y ): """ Initialize an OOSAnonymisation object with route and survey data. """ # Convert to arrays self.x = np.asarray(x) self.y = np.asarray(y) if development is None: self.development = np.full(self.y.size, 'Predefined', dtype=object) else: self.development = np.asarray(development, dtype=object) if survey_type is None: self.survey_type = np.full(self.y.size, 'Predefined', dtype=object) else: self.survey_type = np.asarray(survey_type, dtype=object) if pipeline_group is None: self.pipeline_group = np.full(self.y.size, 'Predefined', dtype=object) else: self.pipeline_group = np.asarray(pipeline_group, dtype=object) if group_section_type is None: self.group_section_type = np.full(self.y.size, 'Predefined', dtype=object) else: self.group_section_type = np.asarray(group_section_type, dtype=object) # Initialize group-related attributes self.group_tuples = list(zip( self.development, self.survey_type, self.pipeline_group, self.group_section_type, )) self.unique_groups = sorted(set(self.group_tuples), key=self.group_tuples.index) # Initialize attributes self.arc_length = np.full(self.x.shape[0], np.nan, dtype=float) self.angle = np.full(self.x.shape[0], np.nan, dtype=float) self.curvature = np.full(self.x.shape[0], np.nan, dtype=float) # Call the processing function self._process() def _process(self): """ Calculate the arc length, angle and curvature of the pipeline defined by easting and northing coordinates, for each group section. The result is aligned with the original data. Central differences are used within the central points, and forward and backward differences are used at the first and last point of each group. It also calculates the arc length and angle. """ # Re-initialise arc length and curvature array self.arc_length = np.full(self.x.shape[0], np.nan, dtype=float) self.angle = np.full(self.x.shape[0], np.nan, dtype=float) self.curvature = np.full(self.x.shape[0], np.nan, dtype=float) # Create group tuples group_tuples = list(zip( self.development, self.survey_type, self.pipeline_group, self.group_section_type )) unique_groups = set(group_tuples) # Iterate over each unique group for group in unique_groups: mask = ( (self.development == group[0]) & (self.survey_type == group[1]) & (self.pipeline_group == group[2]) & (self.group_section_type == group[3]) ) idx = np.where(mask)[0] # Coordinate vector if len(idx) < 3: continue # Not enough points to compute curvature x = self.x[idx] y = self.y[idx] coords = np.column_stack((x ,y)) # Remove consecutive duplicate points diffs = np.diff(coords, axis=0) nonzero = np.any(diffs != 0, axis=1) keep = np.insert(nonzero, 0, True) # Always keep the first point coords_dedup = coords[keep] x_dedup = coords_dedup[:, 0] n = len(x_dedup) curvature_full_dedup = np.full(n, np.nan, dtype=float) angle_full_dedup = np.full(n, np.nan, dtype=float) arc_length_full_dedup = np.full(n, np.nan, dtype=float) if n < 3: # Not enough points after deduplication curvature_full = np.full(len(idx), np.nan, dtype=float) angle_full = np.full(len(idx), np.nan, dtype=float) arc_length_full = np.full(len(idx), np.nan, dtype=float) self.arc_length[idx] = arc_length_full self.angle[idx] = angle_full self.curvature[idx] = curvature_full continue # Forward difference for the first point v1_f = coords_dedup[1] - coords_dedup[0] v2_f = coords_dedup[2] - coords_dedup[1] norm_v1_f = np.linalg.norm(v1_f) norm_v2_f = np.linalg.norm(v2_f) ds_f = norm_v1_f tan1_f = v1_f / norm_v1_f if norm_v1_f != 0 else np.zeros_like(v1_f) tan2_f = v2_f / norm_v2_f if norm_v2_f != 0 else np.zeros_like(v2_f) cross_f = tan1_f[0] * tan2_f[1] - tan1_f[1] * tan2_f[0] arc_length_full_dedup[0] = 0.0 # ds_f angle_full_dedup[0] = 0.0 # np.arctan2(v1_f[1], v1_f[0]) curvature_full_dedup[0] = 0.0 # cross_f / ds_f if ds_f != 0 else 0.0 # Central differences for internal points p_prev = coords_dedup[:-2] p_curr = coords_dedup[1:-1] p_next = coords_dedup[2:] v1 = p_curr - p_prev v2 = p_next - p_curr norm_v1 = np.linalg.norm(v1, axis=1) norm_v2 = np.linalg.norm(v2, axis=1) with np.errstate(divide='ignore', invalid='ignore'): tan1 = np.divide(v1, norm_v1[:, None], out=np.zeros_like(v1), where=norm_v1[:, None] != 0) tan2 = np.divide(v2, norm_v2[:, None], out=np.zeros_like(v2), where=norm_v2[:, None] != 0) ds = 0.5 * (norm_v1 + norm_v2) cross = tan1[:, 0] * tan2[:, 1] - tan1[:, 1] * tan2[:, 0] curvature = np.divide(cross, ds, out=np.zeros_like(cross), where=ds != 0) arc_length_full_dedup[1:-1] = ds angle_full_dedup[1:-1] = np.arctan2(v2[:, 1], v2[:, 0]) curvature_full_dedup[1:-1] = curvature # Backward difference for the last point v1_b = coords_dedup[-2] - coords_dedup[-3] v2_b = coords_dedup[-1] - coords_dedup[-2] norm_v1_b = np.linalg.norm(v1_b) norm_v2_b = np.linalg.norm(v2_b) ds_b = norm_v2_b tan1_b = v1_b / norm_v1_b if norm_v1_b != 0 else np.zeros_like(v1_b) tan2_b = v2_b / norm_v2_b if norm_v2_b != 0 else np.zeros_like(v2_b) cross_b = tan1_b[0] * tan2_b[1] - tan1_b[1] * tan2_b[0] arc_length_full_dedup[-1] = 0.0 # ds_b angle_full_dedup[-1] = 0.0 # np.arctan2(v2_b[1], v2_b[0]) curvature_full_dedup[-1] = 0.0 # cross_b / ds_b if ds_b != 0 else 0.0 # Cumulative arc length arc_length_cumsum = np.cumsum(arc_length_full_dedup) # Map de-duplicated results back to original indices curvature_full = np.full(len(idx), np.nan, dtype=float) angle_full = np.full(len(idx), np.nan, dtype=float) arc_length_full = np.full(len(idx), np.nan, dtype=float) orig_idx_dedup = np.where(keep)[0] curvature_full[orig_idx_dedup] = curvature_full_dedup angle_full[orig_idx_dedup] = angle_full_dedup arc_length_full[orig_idx_dedup] = arc_length_cumsum # Update survey group attributes self.arc_length[idx] = arc_length_full self.angle[idx] = angle_full self.curvature[idx] = curvature_full
[docs] def get_arc_length(self): """ Get the arc length of the pipeline. Returns ------- np.ndarray Arc length for each survey point. """ return self.arc_length
[docs] def get_angle(self): """ Get the tangent angle of the pipeline. Returns ------- np.ndarray Tangent angle for each survey point. """ return self.angle
[docs] def get_curvature(self): """ Get the curvature of the pipeline. Returns ------- np.ndarray Curvature for each survey point. """ return self.curvature
[docs] class FFTSmoother: # pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-locals """ Class to perform FFT smoothing and spectral analysis on the pipeline survey data, grouped by (development, survey type, pipeline group, group section type). Parameters ---------- development : array-like, optional Development identifier for each survey point. If not provided, a default value is used. survey_type : array-like, optional Survey type for each survey point. If not provided, a default value is used. pipeline_group : array-like, optional Pipeline group for each survey point. If not provided, a default value is used. group_section_type : array-like, optional Group section type for each survey point. If not provided, a default value is used. x : array_like, mandatory Coordinates associated with the signal for each survey point. These can be either anonymised easting values or arc lengths, depending on context. y : array_like, mandatory Signal values for each survey point. These can be either anonymised northing values or curvatures, depending on context. cutoff : float or None, mandatory Wavelength or curvature cutoff for FFT filtering. If None, no filtering is applied. """ def __init__( self, *, development=None, survey_type=None, pipeline_group=None, group_section_type=None, x, y, cutoff ): """ Initialize an OOSAnonymisation object with route and survey data. """ # Convert to arrays self.x = np.asarray(x) self.y = np.asarray(y) if development is None: self.development = np.full(self.y.size, 'Predefined', dtype=object) else: self.development = np.asarray(development, dtype=object) if survey_type is None: self.survey_type = np.full(self.y.size, 'Predefined', dtype=object) else: self.survey_type = np.asarray(survey_type, dtype=object) if pipeline_group is None: self.pipeline_group = np.full(self.y.size, 'Predefined', dtype=object) else: self.pipeline_group = np.asarray(pipeline_group, dtype=object) if group_section_type is None: self.group_section_type = np.full(self.y.size, 'Predefined', dtype=object) else: self.group_section_type = np.asarray(group_section_type, dtype=object) # Initialize group-related attributes self.group_tuples = list(zip( self.development, self.survey_type, self.pipeline_group, self.group_section_type, )) self.unique_groups = sorted(set(self.group_tuples), key=self.group_tuples.index) self.cutoff = cutoff # Initialize _filter attributes self.y_smooth = np.full_like(self.y, np.nan, dtype=float) self.freqs = np.full_like(self.y, np.nan, dtype=float) self.fft = np.full_like(self.y, np.nan, dtype=complex) self.freqs_raw = np.full_like(self.y, np.nan, dtype=float) self.fft_raw = np.full_like(self.y, np.nan, dtype=complex) # Initialize _welch attributes self.psd_development = [] self.psd_survey_type = [] self.psd_pipeline_group = [] self.psd_group_section_type = [] self.psd_freqs = [] self.psd_vals = [] # Initialize _reconstruct_coordinates_from_curvature attributes self.x_recon = np.full_like(self.x, np.nan, dtype=float) self.y_recon = np.full_like(self.x, np.nan, dtype=float) # Call the processing functions self._filter() self._welch() self._reconstruct_coordinates_from_curvature() def _filter(self): """ Apply FFT low-pass filtering to each group, deduplicating (x_g, y_g) pairs within a group before filtering. The smoothed value is mapped back to all original indices for each unique pair, ensuring identical pairs get identical y_smooth values. This method also stores the raw (unfiltered) FFT and frequency values for each group in the attributes `fft_raw` and `freqs_raw`. Sets the following attributes: ------------------------------ y_smooth : ndarray Array of filtered y-values, aligned with input y (NaN for points not in a group). freqs : ndarray Array of filtered frequency values, aligned with input y (NaN for points not in a group). fft : ndarray Array of filtered FFT values, aligned with input y (NaN for points not in a group). freqs_raw : ndarray Array of raw (unfiltered) frequency values, aligned with input y (NaN for points not in a group). fft_raw : ndarray Array of raw (unfiltered) FFT values, aligned with input y (NaN for points not in a group). """ for group in self.unique_groups: # Get the mask for the current group mask = ( (self.development == group[0]) & (self.survey_type == group[1]) & (self.pipeline_group == group[2]) & (self.group_section_type == group[3]) ) # Get the data for the current group x_g = self.x[mask] y_g = self.y[mask] # Deduplicate (x_g, y_g) pairs pairs = np.column_stack((x_g, y_g)) # Discard rows with NaN in either column not_nan_mask = ~np.isnan(pairs).any(axis=1) pairs_clean = pairs[not_nan_mask] x_g_clean = x_g[not_nan_mask] y_g_clean = y_g[not_nan_mask] if len(x_g_clean) == 0: continue _, unique_idx, inverse_idx = np.unique( pairs_clean, axis=0, return_index=True, return_inverse=True ) x_g_unique = x_g_clean[unique_idx] y_g_unique = y_g_clean[unique_idx] # FFT filtering on unique pairs if len(x_g_unique) < 2: # Not enough points to filter y_smooth_unique = y_g_unique.copy() freqs_g = np.full_like(x_g_unique, np.nan, dtype=float) fft_g = np.full_like(x_g_unique, np.nan, dtype=complex) fft_vals_g = np.full_like(x_g_unique, np.nan, dtype=complex) else: dx_g = np.mean(np.diff(x_g_unique)) if dx_g == 0: continue fs_g = 1 / dx_g fft_vals_g = fft(y_g_unique) freqs_g = fftfreq(len(x_g_unique), 1 / fs_g) fft_g = np.copy(fft_vals_g) if self.cutoff is not None: fft_g[np.abs(freqs_g) > (1 / self.cutoff)] = 0 y_smooth_unique = np.real(ifft(fft_g)) # Map smoothed values back to all original indices y_smooth_g = np.full_like(y_g, np.nan, dtype=float) y_smooth_g[not_nan_mask] = y_smooth_unique[inverse_idx] freqs_g_full = np.full_like(x_g, np.nan, dtype=float) fft_g_full = np.full_like(x_g, np.nan, dtype=complex) fft_raw_full = np.full_like(x_g, np.nan, dtype=complex) freqs_raw_full = np.full_like(x_g, np.nan, dtype=float) # For each original index, assign the frequency and fft value from the unique pair for i, idx in enumerate(inverse_idx): if len(freqs_g) > idx: freqs_g_full[i] = freqs_g[idx] fft_g_full[i] = fft_g[idx] freqs_raw_full[i] = freqs_g[idx] fft_raw_full[i] = fft_vals_g[idx] # Sort by ascending frequency sort_idx = np.argsort(freqs_g_full) freqs_g_full = freqs_g_full[sort_idx] fft_g_full = fft_g_full[sort_idx] freqs_raw_full = freqs_raw_full[sort_idx] fft_raw_full = fft_raw_full[sort_idx] # Assign values self.y_smooth[mask] = y_smooth_g self.freqs[mask] = freqs_g_full self.fft[mask] = fft_g_full self.freqs_raw[mask] = freqs_raw_full self.fft_raw[mask] = fft_raw_full def _welch(self, nperseg=256): """ Compute the Power Spectral Density (PSD) using Welch's method for each group. This method estimates the distribution of signal power as a function of wavelength. By plotting the PSD against wavelength (1/frequency), you can visually identify which wavelength bands contain significant noise or signal energy. Typically, noise appears at shorter wavelengths (higher frequencies), while meaningful signal features are often found at longer wavelengths. Use this information to select an appropriate cutoff wavelength for filtering out noise. Sets the following attributes: ------------------------------ psd_development : list of ndarray For each group, an array filled with the development value, same length as the PSD frequency array. psd_survey_type : list of ndarray For each group, an array filled with the survey type value, same length as the PSD frequency array. psd_pipeline_group : list of ndarray For each group, an array filled with the pipeline group value, same length as the PSD frequency array. psd_group_section_type : list of ndarray For each group, an array filled with the group section type value, same length as the PSD frequency array. psd_freqs : list of ndarray List of frequency arrays for each group. psd_vals : list of ndarray List of PSD arrays for each group. """ for group in self.unique_groups: # Get the mask for the current group mask = ( (self.development == group[0]) & (self.survey_type == group[1]) & (self.pipeline_group == group[2]) & (self.group_section_type == group[3]) ) # Get the data for the current group x_g = self.x[mask] y_g = self.y[mask] if len(x_g) < 2 or len(y_g) < 2: continue # Remove NaN values x_g_valid = x_g[~np.isnan(x_g)] y_g_valid = y_g[~np.isnan(y_g)] # Only keep indices where both x_g and y_g are valid (not NaN) valid_mask = ~np.isnan(x_g) & ~np.isnan(y_g) x_g_valid = x_g[valid_mask] y_g_valid = y_g[valid_mask] if len(x_g_valid) < 2 or len(y_g_valid) < 2: continue dx_g = np.mean(np.diff(x_g_valid)) if dx_g == 0 or np.isnan(dx_g): continue fs_g = 1 / dx_g # Compute Welch's method nperseg_eff = min(nperseg, len(y_g_valid)) f_welch, pxx_welch = welch(y_g_valid, fs=fs_g, nperseg=nperseg_eff) # Store the results self.psd_development.append(group[0]) self.psd_survey_type.append(group[1]) self.psd_pipeline_group.append(group[2]) self.psd_group_section_type.append(group[3]) self.psd_freqs.append(f_welch) self.psd_vals.append(pxx_welch) def _reconstruct_coordinates_from_curvature(self): """ Reconstruct eastings and northings from arc length and smoothed curvature for each group. The result is aligned with self.x and self.y_smooth. Sets: self.x_recon : ndarray Reconstructed eastings (aligned with self.x, NaN for points not in a group) self.y_recon : ndarray Reconstructed northings (aligned with self.x, NaN for points not in a group) """ for group in self.unique_groups: # Get the mask for the current group mask = ( (self.development == group[0]) & (self.survey_type == group[1]) & (self.pipeline_group == group[2]) & (self.group_section_type == group[3]) ) # Get the arc length and curvature s = self.x[mask] kappa = self.y_smooth[mask] # Remove NaN from arc length and curvature valid = ~np.isnan(s) & ~np.isnan(kappa) if np.sum(valid) < 2: continue s = s[valid] kappa = kappa[valid] # Ensure s is sorted idx_sort = np.argsort(s) s = s[idx_sort] ds = np.diff(s, prepend=s[0]) kappa = kappa[idx_sort] # Shift the position of ds and kappa +1 position for dataframe alignment purposes ds_angle = np.roll(ds, 1) ds_angle[0] = 0.0 kappa[0] = 0.0 kappa = np.roll(kappa, 1) kappa[0] = 0.0 # Integrate curvature to get tangent angle theta = np.cumsum(kappa * ds_angle) # Integrate cos(theta), sin(theta) to get coordinates x_rec = np.cumsum(np.cos(theta) * ds) y_rec = np.cumsum(np.sin(theta) * ds) # Align so the line from the second point to the one before last is horizontal dx = x_rec[-2] - x_rec[1] dy = y_rec[-2] - y_rec[1] angle = -np.arctan2(dy, dx) cos_a = np.cos(angle) sin_a = np.sin(angle) x_rot = (x_rec - x_rec[1]) * cos_a - (y_rec - y_rec[1]) * sin_a + x_rec[1] y_rot = (x_rec - x_rec[1]) * sin_a + (y_rec - y_rec[1]) * cos_a # Place back into full arrays group_indices = np.where(mask)[0][valid][idx_sort] self.x_recon[group_indices] = x_rot self.y_recon[group_indices] = y_rot
[docs] def get_y_smooth(self): """ Get the FFT smoothed y values. Returns ------- np.ndarray FFT smoothed y values for each survey point. """ return self.y_smooth
[docs] def get_freqs(self): """ Get the filtered frequency values. Returns ------- np.ndarray Filtered frequency values for each survey point. """ return self.freqs
[docs] def get_fft(self): """ Get the filtered FFT values. Returns ------- np.ndarray Filtered FFT values for each survey point. """ return self.fft
[docs] def get_freqs_raw(self): """ Get the raw (unfiltered) frequency values. Returns ------- np.ndarray Raw frequency values for each survey point. """ return self.freqs_raw
[docs] def get_fft_raw(self): """ Get the raw (unfiltered) FFT values. Returns ------- np.ndarray Raw FFT values for each survey point. """ return self.fft_raw
[docs] def get_psd_development(self): """ Get the PSD development arrays. Returns ------- list PSD development arrays for each group. """ return self.psd_development
[docs] def get_psd_survey_type(self): """ Get the PSD survey type arrays. Returns ------- list PSD survey type arrays for each group. """ return self.psd_survey_type
[docs] def get_psd_pipeline_group(self): """ Get the PSD pipeline group arrays. Returns ------- list PSD pipeline group arrays for each group. """ return self.psd_pipeline_group
[docs] def get_psd_group_section_type(self): """ Get the PSD group section type arrays. Returns ------- list PSD group section type arrays for each group. """ return self.psd_group_section_type
[docs] def get_psd_freqs(self): """ Get the PSD frequency arrays. Returns ------- list PSD frequency arrays for each group. """ return self.psd_freqs
[docs] def get_psd_vals(self): """ Get the PSD values arrays. Returns ------- list PSD values arrays for each group. """ return self.psd_vals
[docs] def get_x_recon(self): """ Get the reconstructed x (easting) coordinates. Returns ------- np.ndarray Reconstructed x coordinates for each survey point. """ return self.x_recon
[docs] def get_y_recon(self): """ Get the reconstructed y (northing) coordinates. Returns ------- np.ndarray Reconstructed y coordinates for each survey point. """ return self.y_recon
[docs] class GaussianSmoother: """ Class to perform Gaussian kernel smoothing on the pipeline survey, grouped by (development, survey type, pipeline group, group section type). Parameters ---------- development : array-like, optional Development identifier for each survey point. If not provided, a default value is used. survey_type : array-like, optional Survey type for each survey point. If not provided, a default value is used. pipeline_group : array-like, optional Pipeline group for each survey point. If not provided, a default value is used. group_section_type : array-like, optional Group section type for each survey point. If not provided, a default value is used. x : array-like Anonymised eastings for each survey point. y : array-like Anonymised northings for each survey point. bandwidth : float Bandwidth for the Gaussian kernel. """ def __init__( self, *, development=None, survey_type=None, pipeline_group=None, group_section_type=None, x, y, bandwidth ): # Initialize GaussianSmoother attributes self.x = np.asarray(x) self.y = np.asarray(y) self.bandwidth = bandwidth if development is None: self.development = np.full(self.y.size, 'Predefined', dtype=object) else: self.development = np.asarray(development, dtype=object) if survey_type is None: self.survey_type = np.full(self.y.size, 'Predefined', dtype=object) else: self.survey_type = np.asarray(survey_type, dtype=object) if pipeline_group is None: self.pipeline_group = np.full(self.y.size, 'Predefined', dtype=object) else: self.pipeline_group = np.asarray(pipeline_group, dtype=object) if group_section_type is None: self.group_section_type = np.full(self.y.size, 'Predefined', dtype=object) else: self.group_section_type = np.asarray(group_section_type, dtype=object) # Initialize group-related attributes self.group_tuples = list(zip( self.development, self.survey_type, self.pipeline_group, self.group_section_type, )) self.unique_groups = sorted(set(self.group_tuples), key=self.group_tuples.index) self.y_smooth = np.full_like(self.y, np.nan, dtype=float) # Call the processing function self._process() def _process(self): """ Apply Gaussian kernel smoothing to each group, using bandwidth. The smoothing is performed group-wise, using the same grouping as fft_filter. The result is stored in self.y_smooth, aligned with input y. """ # Compute Gaussian kernel std = 0.37 b_scaled = self.bandwidth / std for group in self.unique_groups: # Create mask for current group mask = ( (self.development == group[0]) & (self.survey_type == group[1]) & (self.pipeline_group == group[2]) & (self.group_section_type == group[3]) ) # Extract group data x_g = self.x[mask] y_g = self.y[mask] arc_length_g = np.zeros_like(x_g) arc_length_g[1:] = np.cumsum(np.sqrt(np.diff(x_g)**2 + np.diff(y_g)**2)) # Initialize smoothed output y_smooth_g = np.empty_like(y_g, dtype=float) for i in range(x_g.size): arc_left = max(arc_length_g[0], arc_length_g[i] - 2 * b_scaled) arc_right = min(arc_length_g[i] + 2 * b_scaled, arc_length_g[-1]) idx = np.where((arc_length_g >= arc_left) & (arc_length_g <= arc_right))[0] xgk = x_g[idx] ygk = y_g[idx] weight = ( 1 / ((2 * np.pi)**0.5 * std) * np.exp(-0.5 * (np.abs(xgk - x_g[i]) / b_scaled / std)**2) ) sumygk = np.dot(weight, ygk) sumweight = np.sum(weight) if sumweight != 0: y_smooth_g[i] = sumygk / sumweight elif ygk.size > 0: y_smooth_g[i] = ygk[0] else: y_smooth_g[i] = np.nan # or 0, or skip, depending on your needs self.y_smooth[mask] = y_smooth_g
[docs] def get_y_smooth(self): """ Get the smoothed y values (i.e., northings) for each survey point. Returns ------- np.ndarray Smoothed y values for each survey point. """ return self.y_smooth