Source code for pymepix.clustering.cluster_stream

import numpy as np
[docs]class ClusterStream(): def __init__(self, dim=256, max_dist_tof=1e-8, min_cluster_size=3, tot_offset=0.5, *args, **kwargs): self.dim = dim self.max_dist_tof = max_dist_tof self.min_cluster_size = min_cluster_size self.tot_offset = tot_offset super(ClusterStream, self).__init__(*args, **kwargs)
[docs] def perform(self, data): temp_data = np.full((data.shape[0], data.shape[1] + 1), -1.0) temp_data[:, :-1] = data data = temp_data image = np.full((self.dim + 6, self.dim + 6), -1) # Auf allen vier Seiten wird ein Rand von 3 hinzugefügt, damit Nachbarn immer abfragbar sind deltaPos = [ [(-1, 0), (1, 0), (0, -1), (0, 1)], [(-1, -1), (1, -1), (-1, 1), (1, 1)], [(-2, 0), (2, 0), (0, -2), (0, 2)], [(-2, 1), (-1, 2), (1, 2), (2, 1), (2, -1), (1, -2), (-1, -2), (-2, -1)], [(-2, 2), (2, 2), (2, -2), (-2, -2)], [(-3, 0), (0, 3), (3, 0), (0, -3)], [(-3, 1), (-1, 3), (1, 3), (3, 1), (3, -1), (1, -3), (-1, -3), (-3, -1)], [(-3, -2), (3, -2), (-3, 2), (3, 2), (-2, -3), (2, -3), (-2, 3), (2, 3)] ] # Gruppen von Nachbarschaften für Abstand <4 """Hauptalgorithmus: Die Datenpunkte werden nacheinander mit nahen Nachbarpunkten verglichen (effizient über map abfragbar). Dabei wird ein hinreichend naher Nachbarpunkt (Abstand kleiner als 4 Pixel) als Vorgänger markiert, wenn er der nächstliegende Nachbarpunkt ist, der zeitlich vor dem aktuellen Punkt und mit höherer Intensität gemessen wurde. Wenn es mehrere derartige Nachbarpunkte gibt, wird der zeitlich früheste gewählt. Ein Punkt, zu dem es keinen derartigen Vorgänger gibt, wird als Clusterentrum markiert (es wird der Punkt selbst als eigener Vorgänger eingetragen). Liegen zwei Clusterzentren sehr dicht beieinander (Abstand kleiner als 4 Pixel), werden diese zu einem Clusterzentrum zusammengefasst.""" for i in range(0, data.shape[0]): data[i, 4] = i curr = data[i] x, y, tof, tot, _ = curr x, y = int(x), int(y) image[x + 3, y + 3] = i prev = i # neighbClusters = [i] for grp in deltaPos: for pos in grp: i2 = image[x + pos[0] + 3, y + pos[1] + 3] if i2 >= 0: nb = data[i2] if (tof - nb[2] < self.max_dist_tof) and (tof - nb[2] >= 0) and (nb[3] > tot * self.tot_offset): # Der Nachbarpunkt muss 0 bis < max_dist_tof Einheiten jünger sein und eine höhere Intensität aufweisen # if (tof - nb[2] < self.max_dist_tof) and (tof - nb[2] >= 0): # Der Nachbarpunkt muss 0 bis < max_dist_tof Einheiten jünger sein und eine höhere Intensität aufweisen # if (tof - nb[2] < self.max_dist_tof) and (tof - nb[2] >= 0) and (nb[3] - tot >= 0): # Der Nachbarpunkt muss 0 bis < max_dist_tof Einheiten jünger sein und eine höhere Intensität aufweisen prev = min(prev, i2) # übernehmen, wenn bisheriger Vorgänger in der Liste später auftauchte # if (data[i2, 4] == i2) and (tof - nb[2] < self.max_dist_tof): # Merken von Clusterzentren in der Nähe # neighbClusters.append(i2) if prev < i: # while data[prev, 4] != data[int(data[prev, 4]), 4]: # data[prev, 4] = data[int(data[prev, 4]), 4] data[i, 4] = data[prev, 4] break # if prev == i: # neighbClusters.append[prev] # mn = min(data[neighbClusters][:,4]) # data[neighbClusters][:,4] = mn """Aufbereitung: Zu jedem Clusterzentrum wird ermittelt, wie viele Punkte das Cluster umfasst. Sind dies mindestens min_cluster_size viele, wird das Cluster einer neuen Liste "cleanedFinal" von hinreichend großen Clustern hinzugefügt.""" labels = data[:, 4] un, counts = np.unique(labels, return_counts=True) labels_with_cluster_size = np.column_stack((un, counts)) labels[np.isin(labels, labels_with_cluster_size[labels_with_cluster_size[:, 1] < self.min_cluster_size][:, 0].astype(int))] = 0 return labels