# /usr/bin/env python
import numpy as np
from numpy.random import default_rng
import scipy.optimize as opt
import matplotlib.pyplot as plt
import matplotlib.animation as ani
from math import *
import copy
[docs]def print_progress(step, total):
"""
Prints a progress bar.
Args:
step (int): progress counter
total (int): counter at completion
"""
message = "simulation progress ["
total_bar_length = 60
percentage = int(step / total * 100)
bar_fill = int(step / total * total_bar_length)
for i in range(total_bar_length):
if i < bar_fill:
message += "|"
else:
message += " "
message += "] "+str(percentage)+" %"
if step < total:
print(message, end="\r")
else:
print(message)
[docs]def draw_profile(frame, history, center_history, k_clusters):
"""
Draws a profile plot of the data.
Used for animation.
Args:
frame (int): index of the frame to draw
history (list): list of systems at different times
center_history (list): list of centers at different times
k_clusters (int): number of clusters
"""
data = history[frame]
centers = center_history[frame]
styles = ['tab:orange','tab:green','tab:blue',\
'tab:red','tab:purple','tab:brown','tab:cyan']
plt.clf()
for point in data:
plt.plot(point.coordinates,'-',
color = styles[point.cluster%len(styles)],
alpha=0.2)
if centers != None:
i = 0
for c in centers:
plt.plot(c.coordinates,'-',lw=3,color='k')
plt.plot(c.coordinates,'-',lw=2,color=styles[i%len(styles)])
i += 1
[docs]def draw_scatter(frame, history, center_history, k_clusters):
"""
Draws the data as a scatter plot.
Used for animation.
Args:
frame (int): index of the frame to draw
history (list): list of systems at different times
center_history (list): list of centers at different times
k_clusters (int): number of clusters
"""
data = history[frame]
centers = center_history[frame]
axes=[0,1]
styles = ['tab:orange','tab:green','tab:blue',\
'tab:red','tab:purple','tab:brown','tab:cyan']
plt.clf()
ax = plt.axes()
ax.set_aspect('equal')
for c in range(k_clusters):
matrix = data_to_matrix(data, c)
plt.plot(matrix[axes[0],:], matrix[axes[1],:], 'o', \
color = styles[c%len(styles)])
for c in range(k_clusters):
x = centers[c].coordinates[axes[0]]
y = centers[c].coordinates[axes[1]]
plt.plot(x,y,'o',markersize=3, \
color=styles[c%len(styles)], markeredgecolor='k')
[docs]def animate_scatter(history, center_history, k_clusters):
"""
Animates the simulation as a scatter plot.
Args:
history (list): list of systems at different times
center_history (list): list of centers at different times
k_clusters (int): number of clusters
"""
nframes = len(history)
print("animating "+str(nframes)+" frames")
fig = plt.figure()
motion = ani.FuncAnimation(fig, draw_scatter, nframes, fargs=( history, center_history, k_clusters ) )
plt.show()
[docs]def animate_profile(history, center_history, k_clusters):
"""
Animates the simulation as a profile plot.
Args:
history (list): list of systems at different times
center_history (list): list of centers at different times
k_clusters (int): number of clusters
"""
nframes = len(history)
print("animating "+str(nframes)+" frames")
fig = plt.figure()
motion = ani.FuncAnimation(fig, draw_profile, nframes, fargs=( history, center_history, k_clusters ) )
plt.show()
[docs]class Point:
"""
Class representing a data point.
Each point has coordinates. It also stores the index of the cluster
it belongs to.
In test cases we may know, how the points should be divided to different classes.
The index of the true class may also be recorded.
Args:
coordinates (array): coordinate vector
true_cluster (int): index of the class the point really belongs to
"""
def __init__(self, coordinates, true_cluster=-1):
self.coordinates = np.array( coordinates )
self.cluster = 0
self.true_cluster = true_cluster
def __str__(self):
if self.true_cluster >= 0:
return "r = "+str(self.coordinates)+", c = "+str(self.true_cluster)
else:
return "r = "+str(self.coordinates)
[docs] def sq_distance(self, another_point):
"""
Calculates the squared distance between this point and another.
Args:
another_point (Point): the other point
Returns:
float: squared distance between the points
"""
sq_sum = 0.0
for x1, x2 in zip(self.coordinates, another_point.coordinates):
sq_sum += (x1-x2)**2
return sq_sum
[docs]def find_clusters(data, centers):
"""
Allocates each data point to a cluster.
The algorithm loops over all data points and checks for each one,
which cluster center is the closest one. The index of the closest center
is saved in point.cluster.
Args:
data (list): list of data points as :class:`Point` objects
centers (list): list of cluster centers as :class:`Point` objects
Returns:
bool, float: True if any changes were made, sum of squares of point-center distances
"""
changed = False
sq_sum = 0.0
# for each data point, find the closest center
for point in data:
# The point should already be assigned to some cluster.
# We calculate the distance (squared) to this center as a reference.
min_dist = point.sq_distance(centers[point.cluster])
# check all cluster centers in case one of them
# is even closer
for cluster_index in range(len(centers)):
sq_dist = point.sq_distance(centers[cluster_index])
# If a shorter distance is found, the point
# becomes part of this new cluster instead.
if sq_dist < min_dist:
min_dist = sq_dist
# save a reference to the new cluster
point.cluster = cluster_index
# We save the fact a change has been made.
changed = True
# also calculate the sum of squares of all
# point - cluster center distances
sq_sum += min_dist
return changed, sq_sum
[docs]def calculate_cluster_centers(data, k_clusters):
"""
Calculates cluster center coordinates.
The center of a cluster is calculated simply as the average of
all the coordinate vectors of the points that belong to the
said cluster.
Mathematically, if we denote the set of points in cluster :math:`j` as :math:`C_j`,
the center is calculated as the sum
.. math::
\\vec{c}_j = \\frac{1}{n_j} \\sum_{\\vec{p} \\in C_j} \\vec{p}
where :math:`n_j` is the number of points in cluster :math:`j`.
Args:
data (list): list of data points as :class:`Point` objects
k_clusters (int): number of clusters
Returns:
list: list of cluster centers as :class:`Point` objects
"""
centers = [None]*k_clusters
dimension = len(data[0].coordinates)
# loop over all clusters
for cluster_index in range(k_clusters):
# to calculate the average of all coordinates,
# we first calculate the component-wise sum
# over the positions of all data points in the cluster
cluster_sum = np.zeros(dimension)
cluster_size = 0
# loop over all data points and search for those
# belonging to the current cluster
for point in data:
if point.cluster == cluster_index:
# the point belongs in this cluster
cluster_size += 1
# add the coordinates of this point to the sum
cluster_sum += point.coordinates
# average of the coordinates is obtained
# by dividing the sum by the number of points
cluster_avr = cluster_sum / cluster_size
# save the new center coordinates as a Point object
centers[cluster_index] = Point(cluster_avr)
return centers
[docs]def k_means_clustering_algorithm(data, k_clusters, n_repeats=10):
"""
Divides the data points in k clusters.
In k means clustering, data points are divided in k different
categories, i.e., clusters.
The algorithm assumes all data points can be represented as numeric vectors
in (possibly very high dimensional) data space.
:math:`\\vec{p} = [x_0, x_1, x_2, \\ldots, x_N]`.
All the coordinates should also have similar scales, because the similarity
of any two data points is measured by their Euclidian distance
.. math ::
d_{i,j} = |\\vec{p}_j - \\vec{p}_i| = \\sqrt{ \\sum_n^N (x_{n,j} - x_{n,i})^2 }.
(If this is not the case, it can always be achieved by appropriate scaling.)
No prior information about the categories is needed except their total number, :math:`k`.
Initially, :math:`k` random points are chosen and their coordinates
are used as the initial coordinates of the cluster centers, :math:`\\vec{c}`.
Then these steps are repeated:
* Each point :math:`\\vec{p}_i` is assigned to the cluster
whose center :math:`\\vec{c}_j` is the closest.
This is done with :meth:`find_clusters`.
* If every point remains in the same cluster where it was before,
we have reached a stable solution and the run ends.
If any changes have been made, we continue.
* New center coordinates are calculated for each cluster as the
average of the coordinates of the points that belong to the cluster,
This is done with :meth:`calculate_cluster_centers`.
The result is a self-consistent solution, where each point belongs to its
nearest cluster and each cluster is centered in the center of its points.
The solution is typically not unique though. Therefore the algorithm is
run several times from different, randomly chosen initial clusters, and the
best solution is picked. Ranking is done according to the sum of squared
distances between data points and cluster centers.
.. note ::
This function is incomplete!
Args:
data (list): list of data points as :class:`Point` objects
k_clusters (int): number of clusters
n_repeats (int): number of tries
Returns:
list, list, list: cluster centers as :class:`Point` objects, evolution of data points, evolution of centers
"""
best_centers = []
best_distance = 0
sq_sum = 0
best_history = []
best_center_history = []
# the cluster division is not unique
# so we run the algorithm several times and
# pick the best solution in the end
for tried in range(n_repeats):
centers = []
history = []
center_history = []
# create an initial guess for cluster centers by
# randomly choosing k data points
for ind in range(k_clusters):
random_point = random.choice(data)
centers.append(Point(random_point.coordinates))
if best_distance == 0:
changed, best_distance = find_clusters(data, centers)
# repeatedly search for clusters until convergence to equilibrium
converged = False
while (not converged):
# assign each point to the cluster whose
# center is nearest to the point
#
# use function find_clusters() here
# save the squared sum of distances that function calculates
# in the variable sq_sum
# todo
if(changed):
# if any point was moved from one cluster to another,
# all cluster centers need to be recalculated
#
# use the function calculate_cluster_centers()
# todo
history.append( copy.deepcopy(data) )
center_history.append( copy.copy(centers) )
# report progress - uncomment lower line if you want more info
print_progress(tried+1, n_repeats)
# print( "run done, square sum = {val:.3f}, best = {best:.3f}".format(val=sq_sum,best=best_distance) )
# check if this is the best solution and save it if it was
if sq_sum < best_distance:
best_distance = sq_sum
best_centers = centers
best_history = copy.deepcopy( history )
best_center_history = copy.deepcopy( center_history )
# reset cluster indices according to the best solution
changed, sq_sum = find_clusters(data, best_centers)
print( "cluster analysis done, final square sum = {val:.3f}".format(val=best_distance) )
return best_centers, best_history, best_center_history
[docs]def data_to_matrix(data, cluster = None):
"""
Creates an array containing data point coordinates.
The coordinates are saved in a matrix where
matrix[i,j] is the jth coordinate of the ith point.
Args:
data (list): list of data points as :class:`Point` objects
cluster (int): If None, all points are included. If a number,
only the points in that cluster are included.
Returns:
array: coordinates as a matrix
"""
dimension = len(data[0].coordinates)
matrix = []
for i in range(dimension):
matrix.append([])
for point in data:
if cluster is None or cluster == point.cluster:
for i in range(dimension):
matrix[i].append(point.coordinates[i])
return np.array(matrix)
[docs]def read_data(filename, data_cols=0, categories=0, delimiter=None):
"""
Reads data from a file.
Each line in the file should define one data point by listing its coordinates.
By default, the coordinates should be separated by spaces.
If a delimiter is given, it is used as the separator instead.
Also by default, each column is assumed to represent a coordinate.
If data_cols is given, each line is assumed to hold that many columns of
coordinate data followed by columns specifying the true class of the point.
The classification is given as columns of zeros and ones so that a
one marks the class of the point.
The file could thus read, e.g.
.. code-block::
1.2 3.4 5.6 1 0 0
1.3 5.7 9.1 0 0 1
...
Where the first point would be of of the first class (index 0)
and the second of the third class (index 2).
Args:
filename (str): name of file to read
data_cols (int): Number of data columns, i.e., dimensionality of the data.
Only give this argument if the file also contains category data.
categories (int): number of known categories
delimiter (str): delimiter, if something else than whitespace.
Returns:
list: generated data as :class:`Point` objects
"""
try:
if delimiter is None:
matrix = np.genfromtxt(filename)
else:
matrix = np.genfromtxt(filename,delimiter=delimiter)
except:
print("could not read "+filename)
data = []
if data_cols == 0:
data_cols = matrix.shape[1]
categories = 0
for i in range(matrix.shape[0]):
coords = matrix[i,:data_cols]
if categories > 0:
category = -1
for j in range(categories):
if matrix[i,data_cols+j] == 1:
category = j
data.append(Point( coords, category ))
else:
data.append(Point( coords ))
return data
[docs]def create_data(n_centers, dimensions=2):
"""
Creates a random set of data points.
The method randomly draws :math:`k` center points
from :math:`[0,10]^d`, and then 100 - 300 normally distributed
data points around them.
Args:
n_centers (int): number of clusters to create, :math:`k`
dimensions (int): dimensionality of the data to create, :math:`d`
Returns:
list: generated data as :class:`Point` objects
"""
data = []
centers = []
print( "centers in the generated data: " )
for i in range(n_centers):
c = 10.0*random.random(size=dimensions)
centers.append(c)
print( " center ",i,": ", c )
type = -1
for center in centers:
type += 1
for i in range(random.integers(100,300)):
p = center + random.standard_normal(size = dimensions)
data.append(Point(p, type))
return data
[docs]def check_results(data, k):
"""
Prints performance report for a test case where the correct groups are known.
Performance is evaluated in two ways.
First, the method checks each cluster found by the k means algorithm
and counts the number of points from each group in that cluster.
The group with most points is declared to be the dominant group of that
cluster and all the points belonging to that group are evaluated as correctly
grouped while the rest are incorrectly grouped. The homogeneity score
is calculated as the number of correctly grouped points divided by the total
number of points. This score is 100 %, if each cluster contains points
only from one group.
Secondly, the method checks each known group and counts the number of the
points of that group in each cluster found by k means.
The cluster with the most points is declared to be the dominant cluster of that
group, and points in that clsuter are defined to be correctly clustered.
The connectedness score is calculated as the number of correctly clustered points
divided by the total number. This score is 100 %, if the points of each group
is contained in a single cluster.
For a perfect score, each cluster should contain exactly all the points from a single
group. This is possible only if number of clusters k is equal to the true number of
groups in the data.
Decreasing the number of clusters k tends to make the connectedness score better
and the homogeneity score worse. (For :math:`k=1`, connectedness would be trivially 100 %
because all points would be in the same cluster.)
Increasing k tends to make homogeneity better and connectedness worse.
(If k would equal the number of points, each point would be its own cluster making
homogeneity 100 % and connectedness close to 0 %.)
Args:
data (list): list of data points as :class:`Point` objects
k (int): number of clusters assumed by the algorithm
"""
n_ok = 0
n_tot = len(data)
n_cat = -1
# check the number of different true categories
for p in data:
if p.true_cluster > n_cat:
n_cat = p.true_cluster
# if categories are not known, just report the cluster sizes
if n_cat < 0:
for i in range(k):
n = 0
for p in data:
if p.cluster == i:
n += 1
print("Points in cluster "+str(i)+": "+str(n)+" / "+str(n_tot)+" = "+str( int( 100*n/n_tot ))+"%")
return
n_cat += 1
# check the found clusters
print("")
print("Comparing results against known classes.")
print("")
print("Checking homogeneity of each found cluster...")
for i in range(k):
print("Cluster "+str(i)+": ")
i_ok = 0
i_tot = 0
categories = np.zeros(n_cat)
for p in data:
if p.cluster == i:
i_tot += 1
categories[ p.true_cluster ] += 1
print( " Distribution of classes:", categories )
max = 0
max_i = 0
for j in range(n_cat):
if categories[j] > max:
max_i = j
max = categories[j]
for p in data:
if p.cluster == i:
if p.true_cluster == max_i:
n_ok += 1
i_ok += 1
print(" Dominant class: "+str(max_i)+".")
print(" Points from this class: "+str(i_ok)+" / "+str(i_tot)+" = "+str( int( 100*i_ok/i_tot ))+"%" )
n2_ok = 0
# check the true categories
print("")
print("Checking connectedness of each known class...")
for i in range(n_cat):
print("Class "+str(i)+": ")
i_ok = 0
i_tot = 0
clusters = np.zeros(k)
for p in data:
if p.true_cluster == i:
i_tot += 1
clusters[ p.cluster ] += 1
print( " Distribution of clusters:", clusters )
max = 0
max_i = 0
for j in range(k):
if clusters[j] > max:
max_i = j
max = clusters[j]
for p in data:
if p.true_cluster == i:
if p.cluster == max_i:
i_ok += 1
n2_ok += 1
print(" Dominant cluster: "+str(max_i)+".")
print(" Points in this cluster: "+str(i_ok)+" / "+str(i_tot)+" = "+str( int( 100*i_ok/i_tot ))+"%" )
print("")
print("Clusters vs. classes: "+str(k)+" vs. "+str(n_cat))
print("Total homogeneity score: "+str(n_ok)+"/"+str(n_tot)+" = "+str( int( 100*n_ok/n_tot ))+"%" )
print("Total connectedness score: "+str(n2_ok)+"/"+str(n_tot)+" = "+str( int( 100*n2_ok/n_tot ))+"%" )
[docs]def scatter_plot(data, k_clusters, axes=[0,1], centers=None):
"""
Plots the data as a 2D scatter plot.
The function assumes the data is sorted in k_clusters categories
and it plots each cluster in a different color.
Only two coordinates are plotted even if the data is high dimensional.
The coordinates to plot can be chosen with the axis parameter,
which must contain the indices of the coordinates to plot.
If cluster centers are given, they are also plotted as dots
with a black outline.
Args:
data (list): list of data points as :class:`Point` objects
k_clusters (int): number of clusters
axes (array): indices of the coordinates to plot
centers (list): cluster center coordinates
"""
styles = ['tab:orange','tab:green','tab:blue',\
'tab:red','tab:purple','tab:brown','tab:cyan']
plt.clf()
ax = plt.axes()
ax.set_aspect('equal')
for c in range(k_clusters):
matrix = data_to_matrix(data, c)
plt.plot(matrix[axes[0],:], matrix[axes[1],:], 'o', \
color = styles[c%len(styles)])
if centers != None:
for c in range(k_clusters):
x = centers[c].coordinates[axes[0]]
y = centers[c].coordinates[axes[1]]
plt.plot(x,y,'o',markersize=3, \
color=styles[c%len(styles)], markeredgecolor='k')
plt.show()
[docs]def profile_plot(data, k_clusters, centers=None):
"""
Plots the data as a series of line plots.
Each line is drawn so that the x-coordinates are
evenly spaced and the y-coordinates are the
coordinates of the data point.
If cluster centers are given, they are also plotted as lines
with a black outline.
Args:
data (list): list of data points as :class:`Point` objects
k_clusters (int): number of clusters
centers (list): cluster center coordinates
"""
styles = ['tab:orange','tab:green','tab:blue',\
'tab:red','tab:purple','tab:brown','tab:cyan']
for point in data:
plt.plot(point.coordinates,'-',
color = styles[point.cluster%len(styles)],
alpha=0.2)
if centers != None:
i = 0
for c in centers:
plt.plot(c.coordinates,'-',lw=3,color='k')
plt.plot(c.coordinates,'-',lw=2,color=styles[i%len(styles)])
i += 1
plt.show()
if __name__ == "__main__":
random = default_rng()
# how many clusters to create?
k_clusters = 4
# how many dimensions in the data?
dimensions = 2
# create random data
data = create_data(k_clusters, dimensions)
# or try categorizing real data
#data = read_data("questions.txt")
#data = read_data("iris-fulldata.csv",4,3,",")
# Run k means cluster analysis.
k = 2
print( "assumed number of centers: ", k )
# run the analysis
cluster_centers, history, center_history = k_means_clustering_algorithm(data, k)
# print solution
print( "final center coordinates: ")
for point in cluster_centers:
print( point.coordinates )
# if the classification is known, check results against it
check_results(data, k)
# plot solution
scatter_plot(data, k, centers=cluster_centers)
profile_plot(data, k, centers=cluster_centers)
# animate solutions
animate_scatter( history, center_history, k )
animate_profile( history, center_history, k )