Source code for clustering

#!/usr/bin/env python
import numpy as np
from numpy.random import default_rng
import matplotlib.pyplot as plt
import matplotlib.animation as ani
import copy

# random number generator used for data creation and initial cluster guesses
random = default_rng()
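
# The clustering routine below calls print_progress(), which is not defined
# in this module and presumably comes from the course's helper code.
# The following is a minimal stand-in (an assumption, not the original helper)
# so that the module runs on its own.
def print_progress(done, total):
    """
    Prints a simple progress message.

    A minimal placeholder for a helper routine of the same name.

    Args:
        done (int): number of completed runs
        total (int): total number of runs
    """
    print("finished run " + str(done) + " / " + str(total))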


def draw_profile(frame, history, center_history, k_clusters):
    """
    Draws a profile plot of the data. Used for animation.

    Args:
        frame (int): index of the frame to draw
        history (list): list of systems at different times
        center_history (list): list of centers at different times
        k_clusters (int): number of clusters
    """
    data = history[frame]
    centers = center_history[frame]
    styles = ['tab:orange', 'tab:green', 'tab:blue',
              'tab:red', 'tab:purple', 'tab:brown', 'tab:cyan']
    plt.clf()

    for point in data:
        plt.plot(point.coordinates, '-',
                 color=styles[point.cluster % len(styles)], alpha=0.2)

    if centers is not None:
        i = 0
        for c in centers:
            plt.plot(c.coordinates, '-', lw=3, color='k')
            plt.plot(c.coordinates, '-', lw=2, color=styles[i % len(styles)])
            i += 1
def draw_scatter(frame, history, center_history, k_clusters):
    """
    Draws the data as a scatter plot. Used for animation.

    Args:
        frame (int): index of the frame to draw
        history (list): list of systems at different times
        center_history (list): list of centers at different times
        k_clusters (int): number of clusters
    """
    data = history[frame]
    centers = center_history[frame]
    axes = [0, 1]
    styles = ['tab:orange', 'tab:green', 'tab:blue',
              'tab:red', 'tab:purple', 'tab:brown', 'tab:cyan']
    plt.clf()
    ax = plt.axes()
    ax.set_aspect('equal')

    for c in range(k_clusters):
        matrix = data_to_matrix(data, c)
        plt.plot(matrix[axes[0], :], matrix[axes[1], :], 'o',
                 color=styles[c % len(styles)])

    for c in range(k_clusters):
        x = centers[c].coordinates[axes[0]]
        y = centers[c].coordinates[axes[1]]
        plt.plot(x, y, 'o', markersize=3,
                 color=styles[c % len(styles)], markeredgecolor='k')
def animate_scatter(history, center_history, k_clusters):
    """
    Animates the simulation as a scatter plot.

    Args:
        history (list): list of systems at different times
        center_history (list): list of centers at different times
        k_clusters (int): number of clusters
    """
    nframes = len(history)
    print("animating " + str(nframes) + " frames")
    fig = plt.figure()
    # keep a reference to the animation so it is not garbage collected
    motion = ani.FuncAnimation(fig, draw_scatter, nframes,
                               fargs=(history, center_history, k_clusters))
    plt.show()
def animate_profile(history, center_history, k_clusters):
    """
    Animates the simulation as a profile plot.

    Args:
        history (list): list of systems at different times
        center_history (list): list of centers at different times
        k_clusters (int): number of clusters
    """
    nframes = len(history)
    print("animating " + str(nframes) + " frames")
    fig = plt.figure()
    # keep a reference to the animation so it is not garbage collected
    motion = ani.FuncAnimation(fig, draw_profile, nframes,
                               fargs=(history, center_history, k_clusters))
    plt.show()
class Point:
    """
    Class representing a data point.

    Each point has coordinates. It also stores the index of the cluster
    it belongs to. In test cases we may know how the points should be
    divided into different classes. The index of the true class may
    also be recorded.

    Args:
        coordinates (array): coordinate vector
        true_cluster (int): index of the class the point really belongs to
    """

    def __init__(self, coordinates, true_cluster=-1):
        self.coordinates = np.array(coordinates)
        self.cluster = 0
        self.true_cluster = true_cluster

    def __str__(self):
        if self.true_cluster >= 0:
            return "r = " + str(self.coordinates) + ", c = " + str(self.true_cluster)
        return "r = " + str(self.coordinates)
    def sq_distance(self, another_point):
        """
        Calculates the squared distance between this point and another.

        Args:
            another_point (Point): the other point

        Returns:
            float: squared distance between the points
        """
        sq_sum = 0.0
        for x1, x2 in zip(self.coordinates, another_point.coordinates):
            sq_sum += (x1 - x2)**2
        return sq_sum
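
# A quick usage sketch of Point (hypothetical values): coordinates are stored
# as a numpy array and sq_distance sums the squared coordinate differences.
#
#   a = Point([0.0, 0.0])
#   b = Point([3.0, 4.0])
#   a.sq_distance(b)    # 3**2 + 4**2 = 25.0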
def find_clusters(data, centers):
    """
    Allocates each data point to a cluster.

    The algorithm loops over all data points and checks, for each one,
    which cluster center is the closest. The index of the closest center
    is saved in point.cluster.

    Args:
        data (list): list of data points as :class:`Point` objects
        centers (list): list of cluster centers as :class:`Point` objects

    Returns:
        bool, float: True if any changes were made, sum of squares of point-center distances
    """
    changed = False
    sq_sum = 0.0

    # for each data point, find the closest center
    for point in data:

        # The point should already be assigned to some cluster.
        # We calculate the distance (squared) to this center as a reference.
        min_dist = point.sq_distance(centers[point.cluster])

        # check all cluster centers in case one of them
        # is even closer
        for cluster_index in range(len(centers)):
            sq_dist = point.sq_distance(centers[cluster_index])

            # If a shorter distance is found, the point
            # becomes part of this new cluster instead.
            if sq_dist < min_dist:
                min_dist = sq_dist

                # save a reference to the new cluster
                point.cluster = cluster_index

                # record the fact that a change has been made
                changed = True

        # also accumulate the sum of squares of all
        # point - cluster center distances
        sq_sum += min_dist

    return changed, sq_sum
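
# Example with concrete (hypothetical) numbers: with centers at (0, 0) and
# (10, 10), a point at (1, 1) is assigned to cluster 0 and contributes
# (1-0)**2 + (1-0)**2 = 2.0 to the returned squared sum.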
def calculate_cluster_centers(data, k_clusters):
    """
    Calculates cluster center coordinates.

    The center of a cluster is calculated simply as the average of all
    the coordinate vectors of the points that belong to the said cluster.

    Mathematically, if we denote the set of points in cluster :math:`j`
    as :math:`C_j`, the center is calculated as the sum

    .. math::

        \\vec{c}_j = \\frac{1}{n_j} \\sum_{\\vec{p} \\in C_j} \\vec{p}

    where :math:`n_j` is the number of points in cluster :math:`j`.

    Args:
        data (list): list of data points as :class:`Point` objects
        k_clusters (int): number of clusters

    Returns:
        list: list of cluster centers as :class:`Point` objects
    """
    centers = [None] * k_clusters
    dimension = len(data[0].coordinates)

    # loop over all clusters
    for cluster_index in range(k_clusters):

        # to calculate the average of all coordinates,
        # we first calculate the component-wise sum
        # over the positions of all data points in the cluster
        cluster_sum = np.zeros(dimension)
        cluster_size = 0

        # loop over all data points and search for those
        # belonging to the current cluster
        for point in data:
            if point.cluster == cluster_index:
                # the point belongs in this cluster
                cluster_size += 1

                # add the coordinates of this point to the sum
                cluster_sum += point.coordinates

        # the average of the coordinates is obtained by dividing
        # the sum by the number of points
        # (note: this assumes every cluster holds at least one point)
        cluster_avr = cluster_sum / cluster_size

        # save the new center coordinates as a Point object
        centers[cluster_index] = Point(cluster_avr)

    return centers
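
# Worked example (hypothetical values): if cluster 0 contains exactly the
# points (1, 2) and (3, 4), its new center is ((1+3)/2, (2+4)/2) = (2, 3).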
def k_means_clustering_algorithm(data, k_clusters, n_repeats=10):
    """
    Divides the data points into k clusters.

    In k means clustering, data points are divided into k different
    categories, i.e., clusters. The algorithm assumes all data points can
    be represented as numeric vectors in (possibly very high dimensional)
    data space, :math:`\\vec{p} = [x_0, x_1, x_2, \\ldots, x_N]`.

    All the coordinates should also have similar scales, because the
    similarity of any two data points is measured by their Euclidean distance

    .. math ::

        d_{i,j} = |\\vec{p}_j - \\vec{p}_i| = \\sqrt{ \\sum_{n=0}^N (x_{n,j} - x_{n,i})^2 }.

    (If this is not the case, it can always be achieved by appropriate scaling.)

    No prior information about the categories is needed except their
    total number, :math:`k`.

    Initially, :math:`k` random points are chosen and their coordinates are
    used as the initial coordinates of the cluster centers, :math:`\\vec{c}`.
    Then these steps are repeated:

    * Each point :math:`\\vec{p}_i` is assigned to the cluster whose center
      :math:`\\vec{c}_j` is the closest. This is done with :meth:`find_clusters`.
    * If every point remains in the same cluster where it was before,
      we have reached a stable solution and the run ends.
      If any changes have been made, we continue.
    * New center coordinates are calculated for each cluster as the average
      of the coordinates of the points that belong to the cluster.
      This is done with :meth:`calculate_cluster_centers`.

    The result is a self-consistent solution, where each point belongs to
    its nearest cluster and each cluster is centered in the middle of its
    points. The solution is typically not unique, though. Therefore the
    algorithm is run several times from different, randomly chosen initial
    clusters, and the best solution is picked. Ranking is done according
    to the sum of squared distances between data points and cluster centers.

    Args:
        data (list): list of data points as :class:`Point` objects
        k_clusters (int): number of clusters
        n_repeats (int): number of tries

    Returns:
        list, list, list: cluster centers as :class:`Point` objects,
        evolution of data points, evolution of centers
    """
    best_centers = []
    best_distance = 0
    sq_sum = 0
    best_history = []
    best_center_history = []

    # the cluster division is not unique,
    # so we run the algorithm several times and
    # pick the best solution in the end
    for tried in range(n_repeats):
        centers = []
        history = []
        center_history = []

        # create an initial guess for cluster centers by
        # randomly choosing k data points
        for ind in range(k_clusters):
            random_point = random.choice(data)
            centers.append(Point(random_point.coordinates))

        # on the first run, use the initial assignment as the baseline
        if best_distance == 0:
            changed, best_distance = find_clusters(data, centers)

        # repeatedly search for clusters until convergence to equilibrium
        converged = False
        while not converged:

            # assign each point to the cluster whose
            # center is nearest to the point
            changed, sq_sum = find_clusters(data, centers)

            if changed:
                # if any point was moved from one cluster to another,
                # all cluster centers need to be recalculated
                centers = calculate_cluster_centers(data, k_clusters)
            else:
                # no point changed its cluster, so a stable
                # solution has been found and the run ends
                converged = True

            history.append(copy.deepcopy(data))
            center_history.append(copy.copy(centers))

        # report progress - uncomment the lower line if you want more info
        print_progress(tried + 1, n_repeats)
        # print("run done, square sum = {val:.3f}, best = {best:.3f}".format(val=sq_sum, best=best_distance))

        # check if this is the best solution so far and save it if it is
        # (<= so that the first run is always saved)
        if sq_sum <= best_distance:
            best_distance = sq_sum
            best_centers = centers
            best_history = copy.deepcopy(history)
            best_center_history = copy.deepcopy(center_history)

    # reset cluster indices according to the best solution
    changed, sq_sum = find_clusters(data, best_centers)

    print("cluster analysis done, final square sum = {val:.3f}".format(val=best_distance))

    return best_centers, best_history, best_center_history
def data_to_matrix(data, cluster=None):
    """
    Creates an array containing data point coordinates.

    The coordinates are saved in a matrix where matrix[i,j] is
    the ith coordinate of the jth point.

    Args:
        data (list): list of data points as :class:`Point` objects
        cluster (int): If None, all points are included. If a number,
            only the points in that cluster are included.

    Returns:
        array: coordinates as a matrix
    """
    dimension = len(data[0].coordinates)
    matrix = []
    for i in range(dimension):
        matrix.append([])

    for point in data:
        if cluster is None or cluster == point.cluster:
            for i in range(dimension):
                matrix[i].append(point.coordinates[i])

    return np.array(matrix)
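
# Example: for three 2D points the result has shape (2, 3), so matrix[0, :]
# holds all x coordinates and matrix[1, :] all y coordinates. This is the
# layout the plotting routines below rely on.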
def read_data(filename, data_cols=0, categories=0, delimiter=None):
    """
    Reads data from a file.

    Each line in the file should define one data point by listing its
    coordinates. By default, the coordinates should be separated by spaces.
    If a delimiter is given, it is used as the separator instead.

    Also by default, each column is assumed to represent a coordinate.
    If data_cols is given, each line is assumed to hold that many columns
    of coordinate data followed by columns specifying the true class of
    the point. The classification is given as columns of zeros and ones
    so that a one marks the class of the point. The file could thus read, e.g.,

    .. code-block::

        1.2 3.4 5.6 1 0 0
        1.3 5.7 9.1 0 0 1
        ...

    where the first point would be of the first class (index 0)
    and the second of the third class (index 2).

    Args:
        filename (str): name of file to read
        data_cols (int): Number of data columns, i.e., dimensionality of the data.
            Only give this argument if the file also contains category data.
        categories (int): number of known categories
        delimiter (str): delimiter, if something else than whitespace

    Returns:
        list: generated data as :class:`Point` objects
    """
    try:
        if delimiter is None:
            matrix = np.genfromtxt(filename)
        else:
            matrix = np.genfromtxt(filename, delimiter=delimiter)
    except OSError:
        # without a readable file there is no data to return
        print("could not read " + filename)
        return []

    data = []
    if data_cols == 0:
        data_cols = matrix.shape[1]
        categories = 0

    for i in range(matrix.shape[0]):
        coords = matrix[i, :data_cols]
        if categories > 0:
            category = -1
            for j in range(categories):
                if matrix[i, data_cols + j] == 1:
                    category = j
            data.append(Point(coords, category))
        else:
            data.append(Point(coords))

    return data
def create_data(n_centers, dimensions=2):
    """
    Creates a random set of data points.

    The method randomly draws :math:`k` center points from :math:`[0,10]^d`,
    and then 100 - 300 normally distributed data points around each of them.

    Args:
        n_centers (int): number of clusters to create, :math:`k`
        dimensions (int): dimensionality of the data to create, :math:`d`

    Returns:
        list: generated data as :class:`Point` objects
    """
    data = []
    centers = []

    print("centers in the generated data: ")
    for i in range(n_centers):
        c = 10.0 * random.random(size=dimensions)
        centers.append(c)
        print("  center ", i, ": ", c)

    # label each generated point with the index of its true cluster
    category = -1
    for center in centers:
        category += 1
        for i in range(random.integers(100, 300)):
            p = center + random.standard_normal(size=dimensions)
            data.append(Point(p, category))

    return data
def check_results(data, k):
    """
    Prints a performance report for a test case where the correct groups are known.

    Performance is evaluated in two ways.

    First, the method checks each cluster found by the k means algorithm
    and counts the number of points from each group in that cluster.
    The group with the most points is declared to be the dominant group of
    that cluster, and all the points belonging to that group are evaluated
    as correctly grouped while the rest are incorrectly grouped.
    The homogeneity score is calculated as the number of correctly grouped
    points divided by the total number of points. This score is 100 %,
    if each cluster contains points from only one group.

    Secondly, the method checks each known group and counts the number of
    points of that group in each cluster found by k means. The cluster with
    the most points is declared to be the dominant cluster of that group,
    and points in that cluster are defined to be correctly clustered.
    The connectedness score is calculated as the number of correctly
    clustered points divided by the total number. This score is 100 %,
    if the points of each group are contained in a single cluster.

    For a perfect score, each cluster should contain exactly all the points
    from a single group. This is possible only if the number of clusters k
    is equal to the true number of groups in the data.

    Decreasing the number of clusters k tends to make the connectedness
    score better and the homogeneity score worse. (For :math:`k=1`,
    connectedness would be trivially 100 % because all points would be in
    the same cluster.) Increasing k tends to make homogeneity better and
    connectedness worse. (If k equaled the number of points, each point
    would be its own cluster, making homogeneity 100 % and connectedness
    close to 0 %.)

    Args:
        data (list): list of data points as :class:`Point` objects
        k (int): number of clusters assumed by the algorithm
    """
    n_ok = 0
    n_tot = len(data)
    n_cat = -1

    # check the number of different true categories
    for p in data:
        if p.true_cluster > n_cat:
            n_cat = p.true_cluster

    # if categories are not known, just report the cluster sizes
    if n_cat < 0:
        for i in range(k):
            n = 0
            for p in data:
                if p.cluster == i:
                    n += 1
            print("Points in cluster " + str(i) + ": " + str(n) + " / " + str(n_tot)
                  + " = " + str(int(100 * n / n_tot)) + "%")
        return

    n_cat += 1

    # check the found clusters
    print("")
    print("Comparing results against known classes.")
    print("")
    print("Checking homogeneity of each found cluster...")
    for i in range(k):
        print("Cluster " + str(i) + ": ")
        i_ok = 0
        i_tot = 0
        categories = np.zeros(n_cat)
        for p in data:
            if p.cluster == i:
                i_tot += 1
                categories[p.true_cluster] += 1

        print("  Distribution of classes:", categories)

        # find the dominant class of this cluster
        max_count = 0
        max_i = 0
        for j in range(n_cat):
            if categories[j] > max_count:
                max_i = j
                max_count = categories[j]

        for p in data:
            if p.cluster == i:
                if p.true_cluster == max_i:
                    n_ok += 1
                    i_ok += 1

        print("  Dominant class: " + str(max_i) + ".")
        print("  Points from this class: " + str(i_ok) + " / " + str(i_tot)
              + " = " + str(int(100 * i_ok / i_tot)) + "%")

    n2_ok = 0

    # check the true categories
    print("")
    print("Checking connectedness of each known class...")
    for i in range(n_cat):
        print("Class " + str(i) + ": ")
        i_ok = 0
        i_tot = 0
        clusters = np.zeros(k)
        for p in data:
            if p.true_cluster == i:
                i_tot += 1
                clusters[p.cluster] += 1

        print("  Distribution of clusters:", clusters)

        # find the dominant cluster of this class
        max_count = 0
        max_i = 0
        for j in range(k):
            if clusters[j] > max_count:
                max_i = j
                max_count = clusters[j]

        for p in data:
            if p.true_cluster == i:
                if p.cluster == max_i:
                    i_ok += 1
                    n2_ok += 1

        print("  Dominant cluster: " + str(max_i) + ".")
        print("  Points in this cluster: " + str(i_ok) + " / " + str(i_tot)
              + " = " + str(int(100 * i_ok / i_tot)) + "%")

    print("")
    print("Clusters vs. classes: " + str(k) + " vs. " + str(n_cat))
    print("Total homogeneity score: " + str(n_ok) + "/" + str(n_tot)
          + " = " + str(int(100 * n_ok / n_tot)) + "%")
    print("Total connectedness score: " + str(n2_ok) + "/" + str(n_tot)
          + " = " + str(int(100 * n2_ok / n_tot)) + "%")
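
# Worked scoring example (hypothetical values): if a cluster of 100 points
# contains 90 points of class 1 and 10 of class 2, class 1 is its dominant
# class, so the cluster contributes 90 correctly grouped points (90 %) to
# the homogeneity score while the 10 class-2 points count as misgrouped.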
def scatter_plot(data, k_clusters, axes=[0, 1], centers=None):
    """
    Plots the data as a 2D scatter plot.

    The function assumes the data is sorted into k_clusters categories
    and plots each cluster in a different color.

    Only two coordinates are plotted even if the data is high dimensional.
    The coordinates to plot can be chosen with the axes parameter, which
    must contain the indices of the coordinates to plot.

    If cluster centers are given, they are also plotted
    as dots with a black outline.

    Args:
        data (list): list of data points as :class:`Point` objects
        k_clusters (int): number of clusters
        axes (array): indices of the coordinates to plot
        centers (list): cluster center coordinates
    """
    styles = ['tab:orange', 'tab:green', 'tab:blue',
              'tab:red', 'tab:purple', 'tab:brown', 'tab:cyan']
    plt.clf()
    ax = plt.axes()
    ax.set_aspect('equal')

    for c in range(k_clusters):
        matrix = data_to_matrix(data, c)
        plt.plot(matrix[axes[0], :], matrix[axes[1], :], 'o',
                 color=styles[c % len(styles)])

    if centers is not None:
        for c in range(k_clusters):
            x = centers[c].coordinates[axes[0]]
            y = centers[c].coordinates[axes[1]]
            plt.plot(x, y, 'o', markersize=3,
                     color=styles[c % len(styles)], markeredgecolor='k')

    plt.show()
def profile_plot(data, k_clusters, centers=None):
    """
    Plots the data as a series of line plots.

    Each line is drawn so that the x-coordinates are evenly spaced
    and the y-coordinates are the coordinates of the data point.

    If cluster centers are given, they are also plotted
    as lines with a black outline.

    Args:
        data (list): list of data points as :class:`Point` objects
        k_clusters (int): number of clusters
        centers (list): cluster center coordinates
    """
    styles = ['tab:orange', 'tab:green', 'tab:blue',
              'tab:red', 'tab:purple', 'tab:brown', 'tab:cyan']

    for point in data:
        plt.plot(point.coordinates, '-',
                 color=styles[point.cluster % len(styles)], alpha=0.2)

    if centers is not None:
        i = 0
        for c in centers:
            plt.plot(c.coordinates, '-', lw=3, color='k')
            plt.plot(c.coordinates, '-', lw=2, color=styles[i % len(styles)])
            i += 1

    plt.show()
if __name__ == "__main__":

    # how many clusters to create?
    k_clusters = 4

    # how many dimensions in the data?
    dimensions = 2

    # create random data
    data = create_data(k_clusters, dimensions)

    # or try categorizing real data
    #data = read_data("questions.txt")
    #data = read_data("iris-fulldata.csv", 4, 3, ",")

    # Run k means cluster analysis.
    k = 2
    print("assumed number of centers: ", k)

    # run the analysis
    cluster_centers, history, center_history = k_means_clustering_algorithm(data, k)

    # print solution
    print("final center coordinates: ")
    for point in cluster_centers:
        print(point.coordinates)

    # if the classification is known, check results against it
    check_results(data, k)

    # plot solution
    scatter_plot(data, k, centers=cluster_centers)
    profile_plot(data, k, centers=cluster_centers)

    # animate solutions
    animate_scatter(history, center_history, k)
    animate_profile(history, center_history, k)