Source code for dendromatics.stripe

import math
import timeit

import numpy as np
import pgeof
from pgeof import EFeatureID

from .primitives.clustering import DBSCAN_clustering
from .primitives.voxel import voxelate

# -----------------------------------------------------------------------------
# verticality_clustering_iteration
# -----------------------------------------------------------------------------


[docs] def verticality_clustering_iteration( stripe, vert_scale, vert_threshold, n_points, resolution_xy, resolution_z, n_digits, ): """This function is to be used internally by verticality_clustering. The intended use of this function is to accept a stripe as an input, defined this as a subset of the original cloud delimited by a lower height and an upper height, which will narrow down a region where it is expected to only be stems. Then it will voxelate those points and compute the verticality via compute_features() from pgeof It will filter points based on their verticality value, voxelate again and then cluster the remaining points. Those are expected to belong to stems. Parameters ---------- stripe : numpy.ndarray The point cloud containing the stripe. It is expected to have X, Y, Z0 fields. 3D or higher array containing data with `float` type. vert_scale : float Scale to be used during verticality computation to define a neighborhood around a given point. Verticality will be computed from the structure tensor of said neighborhood via eigen-decomposition. vert_threshold : float Minimum verticality value associated to a point to consider it as part of a stem. n_points : int Minimum number of points in a cluster for it to be considered as a potential stem. resolution_xy : float (x, y) voxel resolution. resolution_z : float (z) voxel resolution. n_digits : int Number of digits dedicated to each coordinate ((x), (y) or (z)) during the generation of each point code. Returns ------- clust_stripe: numpy.ndarray Point cloud containing the points from the stripe that are considered as stems. It consists of 4 columns: (x), (y) and (z) coordinates, and a 4th column containing the cluster ID of the cluster that each point belongs to. t1 : float Time spent. """ t = timeit.default_timer() print(" -Computing verticality...") # Call to 'voxelate' function to voxelate the cloud. voxelated_stripe, vox_to_stripe_ind, _ = voxelate( stripe, resolution_xy, resolution_z, n_digits, with_n_points=False ) # Computation of verticality values associated to voxels using # 'compute_features' function. It needs a vicinity radius, provided by # 'vert_scale'. # use a large max_knn like the one used by jakteristics (it could be lowered) vert_values = pgeof.compute_features_selected(voxelated_stripe, vert_scale, 50000, [EFeatureID.Verticality]) elapsed = timeit.default_timer() - t print(" %.2f" % elapsed, "s") t1 = elapsed # Verticality values are appended to the ORIGINAL cloud, using voxel-to- # original-cloud indexes. vert_stripe = np.hstack((stripe, vert_values[vox_to_stripe_ind])) # Filtering of points that were in voxels whose verticality value is under # the threshold. Output is a filtered cloud. filt_stripe = vert_stripe[vert_stripe[:, -1] > vert_threshold] # Check there are enough points to continue if filt_stripe.shape[0] == 0: raise ValueError( "No vertical clusters where found with these parameters." "Suggestion: decrease n_points/voxel size or verticality " "threshold." ) t = timeit.default_timer() print(" -Clustering...") # The filtered cloud is voxelated. vox_filt_stripe, vox_to_filt_stripe_ind, _ = voxelate( filt_stripe, resolution_xy, resolution_z, n_digits, with_n_points=False ) eps = resolution_xy * math.sqrt(3) + 1e-6 # Clusterization of the voxelated cloud obtained from the filtered cloud. cluster_labels = DBSCAN_clustering(vox_filt_stripe, eps=eps, min_samples=2) # Set of all cluster labels and their cardinality: cluster_id = {1,...,K}, # K = 'number of clusters'. cluster_id, K = np.unique(cluster_labels, return_counts=True) # Raise error if there's only one cluster id (-1) if len(cluster_id) == 1 and cluster_id[0] == -1: raise ValueError( "No stems were found with the current configuration. Suggestion: decrease n_points/voxel size." ) elapsed = timeit.default_timer() - t print(" %.2f" % elapsed, "s") t1 = elapsed + t1 t = timeit.default_timer() print(" -Extracting 'candidate' stems...") # Cluster labels are appended to the FILTERED cloud. They map each point to # the cluster they belong to, according to the clustering algorithm. vox_filt_lab_stripe = np.append( filt_stripe, np.expand_dims(cluster_labels[vox_to_filt_stripe_ind], axis=1), axis=1, ) # Filtering of labels associated only to clusters that contain a minimum # number of points. # Moreover, ID = -1 is always created by DBSCAN to include points # that were not included in any cluster. large_clusters = cluster_id[(K > n_points) & (cluster_id != -1)] # Raise error if there are no large clusters. if large_clusters.size == 0: raise ValueError( "Clusters were found, but they are too small to be considered potential " "stems using current settings. Suggestion: decrease n_points." ) # Removing the points that are not in valid clusters. clust_stripe = vox_filt_lab_stripe[np.isin(vox_filt_lab_stripe[:, -1], large_clusters)] n_clusters = large_clusters.shape[0] elapsed = timeit.default_timer() - t print(" %.2f" % elapsed, "s") t1 = elapsed + t1 print(" %.2f" % t1, "s per iteration") print(" ", n_clusters, " clusters") return clust_stripe, t1
# ----------------------------------------------------------------------------------------------------------------------------------- # verticality_clustering # ----------------------------------------------------------------------------------------------------------------------------------------
[docs] def verticality_clustering( stripe, scale=0.1, vert_threshold=0.7, n_points=1000, n_iter=2, resolution_xy=0.02, resolution_z=0.02, n_digits=5, ): """This function implements a for loop that iteratively calls verticality_clustering_iteration, 'peeling off' the stems. Parameters ---------- stripe : numpy.ndarray The point cloud containing the stripe. It is expected to have X, Y, Z0 fields. 3D or higher array containing data with `float` type. scale : float Scale to be used during verticality computation to define a neighborhood around a given point. Verticality will be computed from the structure tensor of said neighborhood via Eigendecomposition. Defaults to 0.1. vert_threshold : float Minimum verticality value associated to a point to consider it as part of a stem. Defaults to 0.7. n_points : int Minimum number of points in a cluster for it to be considered as a potential stem. Defaults to 1000. n_iter : int Number of iterations of 'peeling'. Defaults to 2. resolution_xy : float (x, y) voxel resolution. Defaults to 0.02. resolution_z : float (z) voxel resolution. Defaults to 0.02. n_digits : int Number of digits dedicated to each coordinate ((x), (y) or (z)) during the generation of each point code. Defaults to 5. Returns ------- clust_stripe : numpy.ndarray Point cloud containing the points from the stripe that are considered as stems. It consists of 4 columns: (x), (y) and (z) coordinates, and a 4th column containing the cluster ID of the cluster that each point belongs to. """ # This first if loop is just a fix that allows to compute everything # ignoring verticality. It should be addressed as it currently computes # verticality when n_iter = 0 and that should not happen (although, in # practice, n_iter should never be 0). It does not provide wrong results # but it slows down the process needlessly. if n_iter == 0: n_iter = 1 vert_threshold = 0 # Basically, use verticality_clustering as many times as defined by n_iter aux_stripe = stripe total_t = 0 for i in np.arange(n_iter): print("Iteration number", i + 1, "out of", n_iter) clust_stripe, t = verticality_clustering_iteration( aux_stripe, scale, vert_threshold, n_points, resolution_xy, resolution_z, n_digits ) aux_stripe = clust_stripe total_t = total_t + t print("Final:") print("%.2f" % total_t, "s in total (whole process)") return clust_stripe