Source code for dendromatics.ground

import warnings

import CSF
import numpy as np
from scipy.interpolate import griddata
from scipy.spatial import KDTree
from sklearn.cluster import DBSCAN

from .voxel.voxel import *

# -----------------------------------------------------------------------------
# clean_ground
# -----------------------------------------------------------------------------


[docs] def clean_ground(cloud, res_ground=0.15, min_points=2): """This function takes a point cloud and denoises it via DBSCAN clustering. It first voxelates the point cloud, then it clusters the voxel cloud and excludes clusters of size less than the value determined by min_points. Parameters ---------- cloud : numpy.ndarray The point cloud to be denoised. Matrix containing (x, y, z) coordinates of the points. res_ground : float (x, y, z) voxel resolution in meters. Defaults to 0.15. min_points : int Clusters with size smaller than this value will be regarded as noise and thus eliminated from the point cloud. Defaults to 2. Returns ------- clust_cloud : numpy.ndarray The denoised point cloud. Matrix containing (x, y, z) coordinates of the denoised points. """ vox_cloud, vox_to_cloud_ind, _ = voxelate( cloud, res_ground, res_ground, with_n_points=False ) # Cluster labels are appended to the FILTERED cloud. They map each point to # the cluster they belong to, according to the clustering algorithm. eps = res_ground * 1.75 clustering = DBSCAN(eps=eps, min_samples=min_points).fit(vox_cloud) cloud_labs = np.append( cloud, np.expand_dims(clustering.labels_[vox_to_cloud_ind], axis=1), axis=1 ) # Set of all cluster labels and their cardinality: cluster_id = {1,...,K}, # K = 'number of points'. cluster_id, K = np.unique(clustering.labels_, return_counts=True) # Filtering of labels associated only to clusters that contain a minimum # number of points. large_clusters = cluster_id[K > min_points] # ID = -1 is always created by DBSCAN() to include points that were not # included in any cluster. large_clusters = large_clusters[large_clusters != -1] # Removing the points that are not in valid clusters. clust_cloud = cloud_labs[np.isin(cloud_labs[:, -1], large_clusters), :3] return clust_cloud
# ----------------------------------------------------------------------------- # classify_ground # -----------------------------------------------------------------------------
[docs] def generate_dtm( cloud, bSloopSmooth=True, cloth_resolution=0.5, classify_threshold=0.1, ): """This function takes a point cloud and generates a Digital Terrain Model (DTM) based on its ground. It's based on 'Cloth Simulation Filter' by W. Zhang et al., 2016 (http://www.mdpi.com/2072-4292/8/6/501/htm), which is implemented in CSF package. This function just implements it in a convenient way for this use-case. Parameters ---------- cloud : numpy.ndarray The point cloud. Matrix containing (x, y, z) coordinates of the points. bSloopSmooth : Boolean The resulting DTM will be smoothed. Refer to CSF documentation. Defaults to True. cloth_resolution : float The resolution of the cloth grid. Refer to CSF documentation. Defaults to 0.5. classify_threshold : float The height threshold used to classify the point cloud into ground and non-ground parts. Refer to CSF documentation. Defaults to 0.1. Returns ------- cloth_nodes : numpy.ndarray Matrix containing (x, y, z) coordinates of the DTM points. """ ### Cloth simulation filter ### csf = CSF.CSF() # initialize the csf ### parameter settings ### csf.params.bSloopSmooth = bSloopSmooth csf.params.cloth_resolution = cloth_resolution # csf.params.rigidness # 1, 2 or 3 csf.params.classify_threshold = classify_threshold # default is 0.5 m csf.setPointCloud(cloud) # pass the (x), (y), (z) list to csf raw_nodes = csf.do_cloth_export() # do actual filtering and export cloth cloth_nodes = np.reshape(np.array(raw_nodes), (-1, 3)) return cloth_nodes
# ----------------------------------------------------------------------------- # clean_cloth # -----------------------------------------------------------------------------
[docs] def clean_cloth(dtm_points): """This function takes a Digital Terrain Model (DTM) and denoises it. This denoising is done via a 2 MADs criterion from the median height value of a neighbourhood of size 15. Parameters ---------- dtm_points : numpy.ndarray Matrix containing (x, y, z) coordinates of the DTM points. Returns ------- clean_points : numpy.ndarray Matrix containing (x, y, z) coordinates of the denoised DTM points. """ if dtm_points.shape[0] < 15: raise ValueError( "input DTM is too small (less than 15 points). Denoising cannot be done." ) elif dtm_points.shape[0] == 15: warnings.warn( "input DTM contains exactly 15 points, which is the minimum input size" "accepted by clean_cloth().", stacklevel=2, ) tree = KDTree(dtm_points[:, :2]) _, indexes = tree.query(dtm_points[:, :2], 15, workers=-1) abs_devs = np.abs(dtm_points[:, 2] - np.median(dtm_points[:, 2][indexes], axis=1)) mads = np.median(abs_devs) clean_points = dtm_points[abs_devs <= 2 * mads] return clean_points
# ----------------------------------------------------------------------------- # complete_dtm # -----------------------------------------------------------------------------
[docs] def complete_dtm(dtm_points): """This function uses scipy.interpolate.griddata to interpolate the missing values in a Digital Terrain Model (DTM). Parameters ---------- dtm_points : numpy array Matrix containing (x, y, z) coordinates of the DTM points. Returns ------- completed_dtm : numpy.ndarray Matrix containing (x, y, z) coordinates of the completed DTM points. """ # Separate x, y, z coordinates x = dtm_points[:, 0] y = dtm_points[:, 1] z = dtm_points[:, 2] # Generate a grid of points based on min x, y values xi = np.linspace(min(x), max(x), 100) yi = np.linspace(min(y), max(y), 100) xi, yi = np.meshgrid(xi, yi) # Interpolate missing values using griddata zi = griddata((x, y), z, (xi, yi), method="cubic") # Combine interpolated points with existing points completed_dtm = np.hstack((xi.reshape(-1, 1), yi.reshape(-1, 1), zi.reshape(-1, 1))) # Remove nan values which may arise from interpolation completed_dtm = completed_dtm[~np.isnan(completed_dtm).any(axis=1)] return completed_dtm
# ----------------------------------------------------------------------------- # normalize_heights # -----------------------------------------------------------------------------
[docs] def normalize_heights(cloud, dtm_points): """This function takes a point cloud and a Digital Terrain Model (DTM) and normalizes the heights of the first based on the second. Parameters ---------- cloud : numpy.ndarray Matrix containing (x, y, z) coordinates of the point cloud. dtm_points : numpy array Matrix containing (x, y, z) coordinates of the DTM points. Returns ------- zs_diff_triples : numpy.ndarray Vector containing the normalized height values for the cloud points. """ tree = KDTree(dtm_points[:, :2]) d, idx_pt_mesh = tree.query(cloud[:, :2], 3, workers=-1) # Z point cloud - Z dtm (Weighted average, based on distance) zs_diff_triples = cloud[:, 2] - np.average( dtm_points[:, 2][idx_pt_mesh], weights=d, axis=1 ) return zs_diff_triples
# ----------------------------------------------------------------------------- # check_normalization() # -----------------------------------------------------------------------------
[docs] def check_normalization( cloud, original_area, res_xy=1.0, z_min=-0.1, z_max=0.15, warning_thresh=0.1 ): """Compare the area of a slice of points from a point cloud to another area and store a warning indicator if difference is greater than a certain threshold. Area of the slice will be approximated from a voxelated version of it. Parameters ---------- cloud : numpy.ndarray A 2D numpy array storing the point cloud. It must be a normalized point cloud. original_area : float Area to compare with. res_xy : float (x, y) voxel resolution. Defaults to 1.0 m. z_min: float The minimum Z value that defines the slice. Defaults to -0.10 m. z_max: float The maximum Z value that defines the slice. Defaults to 0.15 m. warning_thresh: float Threshold area difference. Defaults to 0.1 (10 % difference in area). Returns ------- area_warning : bool True if area difference is greater than threshold, False if not. """ # (z) voxel resolution. if z_min > z_max: raise ValueError("z_min must be smaller than z_max") elif z_min == z_max: raise ValueError("z_min and z_max must be different") else: res_z = (z_max - z_min) * 1.01 # Select a slice of points from the cloud where Z value is within (z_min, z_max) ground_slice = cloud[(cloud[:, 2] >= z_min) & (cloud[:, 2] <= z_max)] # Voxelate the slice and store only cloud_to_vox_ind output for efficiency _, _, voxelated_slice = voxelate( ground_slice, res_xy, res_z, with_n_points=False, silent=False ) # Area of the voxelated ground slice (n of voxels * area of voxel base) slice_area = voxelated_slice.shape[0] * res_xy**2 if original_area <= 0: raise ValueError("Original area to compare with must be positive") elif not 0 < warning_thresh < 1: raise ValueError("warning_thresh must be larger than 0 and smaller than 1") else: # Calculate difference in area that breaks the threshold threshold_difference = warning_thresh * original_area # Calculate the absolute difference between the two areas difference = original_area - slice_area # Check if the difference is greater than 10 % of the first number if difference >= threshold_difference: area_warning = True else: area_warning = False return area_warning