Source code for dendromatics.ground

import math
import warnings

import numpy as np
from CSF_3DFin import CSF
from scipy.interpolate import griddata
from scipy.spatial import KDTree

from .primitives.clustering import DBSCAN_clustering
from .primitives.voxel import voxelate

# -----------------------------------------------------------------------------
# clean_ground
# -----------------------------------------------------------------------------


[docs] def clean_ground(cloud, res_ground=0.15, min_points=2): """This function takes a point cloud and denoises it via DBSCAN clustering. It first voxelates the point cloud, then it clusters the voxel cloud and excludes clusters of size less than the value determined by min_points. Parameters ---------- cloud : numpy.ndarray The point cloud to be denoised. Matrix containing (x, y, z) coordinates of the points. res_ground : float (x, y, z) voxel resolution in meters. Defaults to 0.15. min_points : int Clusters with size smaller than this value will be regarded as noise and thus eliminated from the point cloud. Defaults to 2. Returns ------- clust_cloud : numpy.ndarray The denoised point cloud. Matrix containing (x, y, z) coordinates of the denoised points. """ vox_cloud, vox_to_cloud_ind, _ = voxelate(cloud, res_ground, res_ground, with_n_points=False) # Cluster labels are appended to the FILTERED cloud. They map each point to # the cluster they belong to, according to the clustering algorithm. eps = res_ground * math.sqrt(3) + 1e-6 cluster_labels = DBSCAN_clustering(vox_cloud, eps=eps, min_samples=min_points) cloud_labs = np.append( cloud, np.expand_dims(cluster_labels[vox_to_cloud_ind], axis=1), axis=1, ) # Set of all cluster labels and their cardinality: cluster_id = {1,...,K}, # K = 'number of points'. cluster_id, K = np.unique(cluster_labels, return_counts=True) # Filtering of labels associated only to clusters that contain a minimum # number of points. # ID = -1 is always created by DBSCAN to include points that were not # included in any cluster. large_clusters = cluster_id[(K > min_points) & (cluster_id != -1)] # Removing the points that are not in valid clusters. clust_cloud = cloud_labs[np.isin(cloud_labs[:, -1], large_clusters), :3] return clust_cloud
# ----------------------------------------------------------------------------- # classify_ground # -----------------------------------------------------------------------------
[docs] def generate_dtm( cloud, bSloopSmooth=True, cloth_resolution=0.5, ): """This function takes a point cloud and generates a Digital Terrain Model (DTM) based on its ground. It's based on 'Cloth Simulation Filter' by W. Zhang et al., 2016 (http://www.mdpi.com/2072-4292/8/6/501/htm), which is implemented in CSF package. This function just implements it in a convenient way for this use-case. Parameters ---------- cloud : numpy.ndarray The point cloud. Matrix containing (x, y, z) coordinates of the points. bSloopSmooth : Boolean The resulting DTM will be smoothed. Refer to CSF documentation. Defaults to True. cloth_resolution : float The resolution of the cloth grid. Refer to CSF documentation. Defaults to 0.5. Returns ------- cloth_nodes : numpy.ndarray Matrix containing (x, y, z) coordinates of the DTM points. """ ### Cloth simulation filter ### csf = CSF() # initialize the csf ### parameter settings ### csf.params.smooth_slope = bSloopSmooth csf.params.cloth_resolution = cloth_resolution csf.params.verbose = True # csf.params.rigidness # 1, 2 or 3 csf.set_point_cloud(cloud) # pass the (x), (y), (z) list to csf raw_nodes = csf.run_cloth_simulation() # do actual filtering and export cloth cloth_nodes = np.reshape(np.array(raw_nodes), (-1, 3)) return cloth_nodes
# ----------------------------------------------------------------------------- # clean_cloth # -----------------------------------------------------------------------------
[docs] def clean_cloth(dtm_points): """This function takes a Digital Terrain Model (DTM) and denoises it. This denoising is done via a 2 MADs criterion from the median height value of a neighborhood of size 15. Parameters ---------- dtm_points : numpy.ndarray Matrix containing (x, y, z) coordinates of the DTM points. Returns ------- clean_points : numpy.ndarray Matrix containing (x, y, z) coordinates of the denoised DTM points. """ if dtm_points.shape[0] < 15: raise ValueError("input DTM is too small (less than 15 points). Denoising cannot be done.") if dtm_points.shape[0] == 15: warnings.warn( "input DTM contains exactly 15 points, which is the minimum input size accepted by clean_cloth().", stacklevel=2, ) tree = KDTree(dtm_points[:, :2]) _, indexes = tree.query(dtm_points[:, :2], 15, workers=-1) abs_devs = np.abs(dtm_points[:, 2] - np.median(dtm_points[:, 2][indexes], axis=1)) mads = np.median(abs_devs) clean_points = dtm_points[abs_devs <= 2 * mads] return clean_points
# ----------------------------------------------------------------------------- # complete_dtm # -----------------------------------------------------------------------------
[docs] def complete_dtm(dtm_points): """This function uses scipy.interpolate.griddata to interpolate the missing values in a Digital Terrain Model (DTM). Parameters ---------- dtm_points : numpy array Matrix containing (x, y, z) coordinates of the DTM points. Returns ------- completed_dtm : numpy.ndarray Matrix containing (x, y, z) coordinates of the completed DTM points. """ # Separate x, y, z coordinates x = dtm_points[:, 0] y = dtm_points[:, 1] z = dtm_points[:, 2] # Generate a grid of points based on min x, y values xi = np.linspace(min(x), max(x), 100) yi = np.linspace(min(y), max(y), 100) xi, yi = np.meshgrid(xi, yi) # Interpolate missing values using griddata zi = griddata((x, y), z, (xi, yi), method="cubic") # Combine interpolated points with existing points completed_dtm = np.hstack((xi.reshape(-1, 1), yi.reshape(-1, 1), zi.reshape(-1, 1))) # Remove nan values which may arise from interpolation completed_dtm = completed_dtm[~np.isnan(completed_dtm).any(axis=1)] return completed_dtm
# ----------------------------------------------------------------------------- # normalize_heights # -----------------------------------------------------------------------------
[docs] def normalize_heights(cloud, dtm_points): """This function takes a point cloud and a Digital Terrain Model (DTM) and normalizes the heights of the first based on the second. Parameters ---------- cloud : numpy.ndarray Matrix containing (x, y, z) coordinates of the point cloud. dtm_points : numpy array Matrix containing (x, y, z) coordinates of the DTM points. Returns ------- zs_diff_triples : numpy.ndarray Vector containing the normalized height values for the cloud points. """ tree = KDTree(dtm_points[:, :2]) d, idx_pt_mesh = tree.query(cloud[:, :2], 3, workers=-1) # Z point cloud - Z dtm (Weighted average, based on distance) zs_diff_triples = cloud[:, 2] - np.average(dtm_points[:, 2][idx_pt_mesh], weights=d, axis=1) return zs_diff_triples
# ----------------------------------------------------------------------------- # check_normalization() # -----------------------------------------------------------------------------
[docs] def check_normalization_discrepancy(cloud, original_area, res_xy=1.0, z_min=-0.1, z_max=0.15, warning_thresh=0.1): """Compare the area of a slice of points from a point cloud to another area and return a warning indicator if difference is greater than a certain threshold. The percentage of discrepancy between the too area is also returned. Area of the slice will be approximated from a voxelated version of it. Parameters ---------- cloud : numpy.ndarray A 2D numpy array storing the point cloud. It must be a normalized point cloud. original_area : float Area to compare with. res_xy : float (x, y) voxel resolution. Defaults to 1.0 m. z_min: float The minimum Z value that defines the slice. Defaults to -0.10 m. z_max: float The maximum Z value that defines the slice. Defaults to 0.15 m. warning_thresh: float Threshold area difference. Defaults to 0.1 (10 % difference in area). Returns ------- area_warning : bool True if area difference is greater than threshold, False if not. difference_percentage: The percentage of discrepancy between the original area and the slice area. """ # (z) voxel resolution. if z_min > z_max: raise ValueError("z_min must be smaller than z_max") if z_min == z_max: raise ValueError("z_min and z_max must be different") # original_area if original_area <= 0: raise ValueError("Original area to compare with must be positive") # warning_threshold if not 0 < warning_thresh < 1: raise ValueError("warning_thresh must be larger than 0 and smaller than 1") # Compute the z resolution as a function of z_max - z_min res_z = (z_max - z_min) * 1.01 # Select a slice of points from the cloud where Z value is within (z_min, z_max) ground_slice = cloud[(cloud[:, 2] >= z_min) & (cloud[:, 2] <= z_max)] # Voxelate the slice and store only cloud_to_vox_ind output for efficiency _, _, voxelated_slice = voxelate(ground_slice, res_xy, res_z, with_n_points=False, verbose=True) # Area of the voxelated ground slice (n of voxels * area of voxel base) slice_area = voxelated_slice.shape[0] * res_xy**2 # Calculate difference in area that breaks the threshold threshold_difference = warning_thresh * original_area # Calculate the absolute difference between the two areas area_difference = abs(original_area - slice_area) # TODO: In very rare occasions, the slice area could be larger than the original # area. The function should account for that, and return a different kind of # warning for those situations (and its threshold could be different). # For instance, if the original area has been computed through a grid of voxels # (as this function does to compute slice_area) using a smaller voxel size, # this could happen. We haven't tested it yet as we do not have access # to any point clouds where this situation happens. # Check if the difference is greater than 10 % of the first number area_warning = area_difference >= threshold_difference return area_warning, area_difference * 100 / original_area
[docs] def check_normalization(cloud, original_area, res_xy=1.0, z_min=-0.1, z_max=0.15, warning_thresh=0.1): """Compare the area of a slice of points from a point cloud to another area and store a warning indicator if difference is greater than a certain threshold. Area of the slice will be approximated from a voxelated version of it. This function is kept for backward compatibility and call check_normalization_discrepancy under the hood. Parameters ---------- cloud : numpy.ndarray A 2D numpy array storing the point cloud. It must be a normalized point cloud. original_area : float Area to compare with. res_xy : float (x, y) voxel resolution. Defaults to 1.0 m. z_min: float The minimum Z value that defines the slice. Defaults to -0.10 m. z_max: float The maximum Z value that defines the slice. Defaults to 0.15 m. warning_thresh: float Threshold area difference. Defaults to 0.1 (10 % difference in area). Returns ------- area_warning : bool True if area difference is greater than threshold, False if not. """ indicator, _ = check_normalization_discrepancy(cloud, original_area, res_xy, z_min, z_max, warning_thresh) return indicator