"""!
@brief Cluster analysis algorithm: K-Medoids.
@details Implementation based on papers @cite book::algorithms_for_clustering_data, @cite book::finding_groups_in_data.
@authors Andrei Novikov (pyclustering@yandex.ru)
@date 2014-2019
@copyright GNU Public License
@cond GNU_PUBLIC_LICENSE
PyClustering is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PyClustering is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see .
@endcond
"""
import numpy
from pyclustering.cluster.encoder import type_encoding
from pyclustering.utils import medoid
from pyclustering.utils.metric import distance_metric, type_metric
import pyclustering.core.kmedoids_wrapper as wrapper
from pyclustering.core.wrapper import ccore_library
from pyclustering.core.metric_wrapper import metric_wrapper
class kmedoids:
"""!
@brief Class represents clustering algorithm K-Medoids.
@details The algorithm is less sensitive to outliers tham K-Means. The principle difference between K-Medoids and K-Medians is that
K-Medoids uses existed points from input data space as medoids, but median in K-Medians can be unreal object (not from
input data space).
Clustering example:
@code
from pyclustering.cluster.kmedoids import kmedoids
from pyclustering.cluster import cluster_visualizer
from pyclustering.utils import read_sample
from pyclustering.samples.definitions import FCPS_SAMPLES
# Load list of points for cluster analysis.
sample = read_sample(FCPS_SAMPLES.SAMPLE_TWO_DIAMONDS)
# Set random initial medoids.
initial_medoids = [1, 500]
# Create instance of K-Medoids algorithm.
kmedoids_instance = kmedoids(sample, initial_medoids)
# Run cluster analysis and obtain results.
kmedoids_instance.process()
clusters = kmedoids_instance.get_clusters()
# Show allocated clusters.
print(clusters)
# Display clusters.
visualizer = cluster_visualizer()
visualizer.append_clusters(clusters, sample)
visualizer.show()
@endcode
Metric for calculation distance between points can be specified by parameter additional 'metric':
@code
# create Minkowski distance metric with degree equals to '2'
metric = distance_metric(type_metric.MINKOWSKI, degree=2)
# create K-Medoids algorithm with specific distance metric
kmedoids_instance = kmedoids(sample, initial_medoids, metric=metric)
# run cluster analysis and obtain results
kmedoids_instance.process()
clusters = kmedoids_instance.get_clusters()
@endcode
Distance matrix can be used instead of sequence of points to increase performance and for that purpose parameter 'data_type' should be used:
@code
# calculate distance matrix for sample
sample = read_sample(path_to_sample)
matrix = calculate_distance_matrix(sample)
# create K-Medoids algorithm for processing distance matrix instead of points
kmedoids_instance = kmedoids(matrix, initial_medoids, data_type='distance_matrix')
# run cluster analysis and obtain results
kmedoids_instance.process()
clusters = kmedoids_instance.get_clusters()
medoids = kmedoids_instance.get_medoids()
@endcode
"""
def __init__(self, data, initial_index_medoids, tolerance=0.001, ccore=True, **kwargs):
"""!
@brief Constructor of clustering algorithm K-Medoids.
@param[in] data (list): Input data that is presented as list of points (objects), each point should be represented by list or tuple.
@param[in] initial_index_medoids (list): Indexes of intial medoids (indexes of points in input data).
@param[in] tolerance (double): Stop condition: if maximum value of distance change of medoids of clusters is less than tolerance than algorithm will stop processing.
@param[in] ccore (bool): If specified than CCORE library (C++ pyclustering library) is used for clustering instead of Python code.
@param[in] **kwargs: Arbitrary keyword arguments (available arguments: 'metric', 'data_type', 'itermax').
Keyword Args:
- metric (distance_metric): Metric that is used for distance calculation between two points.
- data_type (string): Data type of input sample 'data' that is processed by the algorithm ('points', 'distance_matrix').
- itermax (uint): Maximum number of iteration for cluster analysis.
"""
self.__pointer_data = data
self.__clusters = []
self.__medoid_indexes = initial_index_medoids
self.__tolerance = tolerance
self.__metric = kwargs.get('metric', distance_metric(type_metric.EUCLIDEAN_SQUARE))
self.__data_type = kwargs.get('data_type', 'points')
self.__itermax = kwargs.get('itermax', 200)
self.__distance_calculator = self.__create_distance_calculator()
self.__ccore = ccore and self.__metric.get_type() != type_metric.USER_DEFINED
if self.__ccore:
self.__ccore = ccore_library.workable()
#self.__verify_instance()
def process(self):
"""!
@brief Performs cluster analysis in line with rules of K-Medoids algorithm.
@return (kmedoids) Returns itself (K-Medoids instance).
@remark Results of clustering can be obtained using corresponding get methods.
@see get_clusters()
@see get_medoids()
"""
if self.__ccore is True:
ccore_metric = metric_wrapper.create_instance(self.__metric)
self.__clusters, self.__medoid_indexes = wrapper.kmedoids(self.__pointer_data, self.__medoid_indexes, self.__tolerance, self.__itermax, ccore_metric.get_pointer(), self.__data_type)
else:
changes = float('inf')
iterations = 0
while changes > self.__tolerance and iterations < self.__itermax:
self.__clusters = self.__update_clusters()
update_medoid_indexes = self.__update_medoids()
changes = max([self.__distance_calculator(self.__medoid_indexes[index], update_medoid_indexes[index]) for index in range(len(update_medoid_indexes))])
self.__medoid_indexes = update_medoid_indexes
iterations += 1
return self
def predict(self, points):
"""!
@brief Calculates the closest cluster to each point.
@param[in] points (array_like): Points for which closest clusters are calculated.
@return (list) List of closest clusters for each point. Each cluster is denoted by index. Return empty
collection if 'process()' method was not called.
An example how to calculate (or predict) the closest cluster to specified points.
@code
from pyclustering.cluster.kmedoids import kmedoids
from pyclustering.samples.definitions import SIMPLE_SAMPLES
from pyclustering.utils import read_sample
# Load list of points for cluster analysis.
sample = read_sample(SIMPLE_SAMPLES.SAMPLE_SIMPLE3)
# Initial medoids for sample 'Simple3'.
initial_medoids = [4, 12, 25, 37]
# Create instance of K-Medoids algorithm with prepared centers.
kmedoids_instance = kmedoids(sample, initial_medoids)
# Run cluster analysis.
kmedoids_instance.process()
# Calculate the closest cluster to following two points.
points = [[0.35, 0.5], [2.5, 2.0]]
closest_clusters = kmedoids_instance.predict(points)
print(closest_clusters)
@endcode
"""
if len(self.__clusters) == 0:
return []
medoids = [ self.__pointer_data[index] for index in self.__medoid_indexes ]
differences = numpy.zeros((len(points), len(medoids)))
for index_point in range(len(points)):
differences[index_point] = [ self.__metric(points[index_point], center) for center in medoids ]
return numpy.argmin(differences, axis=1)
def get_clusters(self):
"""!
@brief Returns list of allocated clusters, each cluster contains indexes of objects in list of data.
@see process()
@see get_medoids()
"""
return self.__clusters
def get_medoids(self):
"""!
@brief Returns list of medoids of allocated clusters represented by indexes from the input data.
@see process()
@see get_clusters()
"""
return self.__medoid_indexes
def get_cluster_encoding(self):
"""!
@brief Returns clustering result representation type that indicate how clusters are encoded.
@return (type_encoding) Clustering result representation.
@see get_clusters()
"""
return type_encoding.CLUSTER_INDEX_LIST_SEPARATION
def __verify_instance(self):
pass
def __create_distance_calculator(self):
"""!
@brief Creates distance calculator in line with algorithms parameters.
@return (callable) Distance calculator.
"""
if self.__data_type == 'points':
return lambda index1, index2: self.__metric(self.__pointer_data[index1], self.__pointer_data[index2])
elif self.__data_type == 'distance_matrix':
if isinstance(self.__pointer_data, numpy.matrix):
return lambda index1, index2: self.__pointer_data.item((index1, index2))
return lambda index1, index2: self.__pointer_data[index1][index2]
else:
raise TypeError("Unknown type of data is specified '%s'" % self.__data_type)
def __update_clusters(self):
"""!
@brief Calculate distance to each point from the each cluster.
@details Nearest points are captured by according clusters and as a result clusters are updated.
@return (list) updated clusters as list of clusters where each cluster contains indexes of objects from data.
"""
clusters = [[self.__medoid_indexes[i]] for i in range(len(self.__medoid_indexes))]
for index_point in range(len(self.__pointer_data)):
if index_point in self.__medoid_indexes:
continue
index_optim = -1
dist_optim = float('Inf')
for index in range(len(self.__medoid_indexes)):
dist = self.__distance_calculator(index_point, self.__medoid_indexes[index])
if dist < dist_optim:
index_optim = index
dist_optim = dist
clusters[index_optim].append(index_point)
return clusters
def __update_medoids(self):
"""!
@brief Find medoids of clusters in line with contained objects.
@return (list) list of medoids for current number of clusters.
"""
medoid_indexes = [-1] * len(self.__clusters)
for index in range(len(self.__clusters)):
medoid_index = medoid(self.__pointer_data, self.__clusters[index], metric=self.__metric, data_type=self.__data_type)
medoid_indexes[index] = medoid_index
return medoid_indexes