Source code for Orange.clustering.hierarchical

import warnings

from collections import namedtuple, deque, defaultdict
from operator import attrgetter
from itertools import count

import heapq
import numpy

import scipy.cluster.hierarchy
import scipy.spatial.distance

from Orange.distance import Euclidean, PearsonR

__all__ = ['HierarchicalClustering']

_undef = object()  # 'no value' sentinel

SINGLE = "single"
AVERAGE = "average"
COMPLETE = "complete"
WEIGHTED = "weighted"
WARD = "ward"


def condensedform(X, mode="upper"):
    X = numpy.asarray(X)
    assert len(X.shape) == 2
    assert X.shape[0] == X.shape[1]

    N = X.shape[0]

    if mode == "upper":
        i, j = numpy.triu_indices(N, k=1)
    elif mode == "lower":
        i, j = numpy.tril_indices(N, k=-1)
    else:
        raise ValueError("invalid mode")
    return X[i, j]


def squareform(X, mode="upper"):
    X = numpy.asarray(X)
    k = X.shape[0]
    N = int(numpy.ceil(numpy.sqrt(k * 2)))
    assert N * (N - 1) // 2 == k
    matrix = numpy.zeros((N, N), dtype=X.dtype)
    if mode == "upper":
        i, j = numpy.triu_indices(N, k=1)
        matrix[i, j] = X
        m, n = numpy.tril_indices(N, k=-1)
        matrix[m, n] = matrix.T[m, n]
    elif mode == "lower":
        i, j = numpy.tril_indices(N, k=-1)
        matrix[i, j] = X
        m, n = numpy.triu_indices(N, k=1)
        matrix[m, n] = matrix.T[m, n]
    return matrix


def data_clustering(data, distance=Euclidean,
                    linkage=AVERAGE):
    """
    Return the hierarchical clustering of the dataset's rows.

    :param Orange.data.Table data: Dataset to cluster.
    :param Orange.distance.Distance distance: A distance measure.
    :param str linkage:
    """
    matrix = distance(data)
    return dist_matrix_clustering(matrix, linkage=linkage)


def feature_clustering(data, distance=PearsonR,
                       linkage=AVERAGE):
    """
    Return the hierarchical clustering of the dataset's columns.

    :param Orange.data.Table data: Dataset to cluster.
    :param Orange.distance.Distance distance: A distance measure.
    :param str linkage:
    """
    matrix = distance(data, axis=0)
    return dist_matrix_clustering(matrix, linkage=linkage)


def dist_matrix_linkage(matrix, linkage=AVERAGE):
    """
    Return linkage using a precomputed distance matrix.

    :param Orange.misc.DistMatrix matrix:
    :param str linkage:
    """
    # Extract compressed upper triangular distance matrix.
    distances = condensedform(matrix)
    return scipy.cluster.hierarchy.linkage(distances, method=linkage)


def dist_matrix_clustering(matrix, linkage=AVERAGE):
    """
    Return the hierarchical clustering using a precomputed distance matrix.

    :param Orange.misc.DistMatrix matrix:
    :param str linkage:
    """
    Z = dist_matrix_linkage(matrix, linkage=linkage)
    return tree_from_linkage(Z)


def sample_clustering(X, linkage=AVERAGE, metric="euclidean"):
    assert len(X.shape) == 2
    Z = scipy.cluster.hierarchy.linkage(X, method=linkage, metric=metric)
    return tree_from_linkage(Z)


class Tree(object):
    __slots__ = ("__value", "__branches", "__hash")

    def __init__(self, value, branches=()):
        if not isinstance(branches, tuple):
            raise TypeError()
        self.__value = value
        self.__branches = branches
        # preemptively cache the hash value
        self.__hash = hash((value, branches))

    def __hash__(self):
        return self.__hash

    def __eq__(self, other):
        return isinstance(other, Tree) and tuple(self) == tuple(other)

    def __lt__(self, other):
        if not isinstance(other, Tree):
            return NotImplemented
        return tuple(self) < tuple(other)

    def __le__(self, other):
        if not isinstance(other, Tree):
            return NotImplemented
        return tuple(self) <= tuple(other)

    def __getnewargs__(self):
        return tuple(self)

    def __iter__(self):
        return iter((self.__value, self.__branches))

    def __repr__(self):
        return ("{0.__name__}(value={1!r}, branches={2!r})"
                .format(type(self), self.value, self.branches))

    @property
    def is_leaf(self):
        return not bool(self.branches)

    @property
    def left(self):
        return self.branches[0]

    @property
    def right(self):
        return self.branches[-1]

    value = property(attrgetter("_Tree__value"))
    branches = property(attrgetter("_Tree__branches"))


ClusterData = namedtuple("Cluster", ["range", "height"])
SingletonData = namedtuple("Singleton", ["range", "height", "index"])


class _Ranged:

    @property
    def first(self):
        return self.range[0]

    @property
    def last(self):
        return self.range[-1]


class ClusterData(ClusterData, _Ranged):
    __slots__ = ()


class SingletonData(SingletonData, _Ranged):
    __slots__ = ()


def tree_from_linkage(linkage):
    """
    Return a Tree representation of a clustering encoded in a linkage matrix.

    .. seealso:: scipy.cluster.hierarchy.linkage

    """
    scipy.cluster.hierarchy.is_valid_linkage(
        linkage, throw=True, name="linkage")
    T = {}
    N, _ = linkage.shape
    N = N + 1
    order = []
    for i, (c1, c2, d, _) in enumerate(linkage):
        if c1 < N:
            left = Tree(SingletonData(range=(len(order), len(order) + 1),
                                      height=0.0, index=int(c1)),
                        ())
            order.append(c1)
        else:
            left = T[c1]

        if c2 < N:
            right = Tree(SingletonData(range=(len(order), len(order) + 1),
                                       height=0.0, index=int(c2)),
                         ())
            order.append(c2)
        else:
            right = T[c2]

        t = Tree(ClusterData(range=(left.value.first, right.value.last),
                             height=d),
                 (left, right))
        T[N + i] = t

    root = T[N + N - 2]
    T = {}

    leaf_idx = 0
    for node in postorder(root):
        if node.is_leaf:
            T[node] = Tree(
                node.value._replace(range=(leaf_idx, leaf_idx + 1)), ())
            leaf_idx += 1
        else:
            left, right = T[node.left].value, T[node.right].value
            assert left.first < right.first

            t = Tree(
                node.value._replace(range=(left.range[0], right.range[1])),
                tuple(T[ch] for ch in node.branches)
            )
            assert t.value.range[0] <= t.value.range[-1]
            assert left.first == t.value.first and right.last == t.value.last
            assert t.value.first < right.first
            assert t.value.last > left.last
            T[node] = t

    return T[root]


def linkage_from_tree(tree: Tree) -> numpy.ndarray:
    leafs = [n for n in preorder(tree) if n.is_leaf]

    Z = numpy.zeros((len(leafs) - 1, 4), float)
    i = 0
    node_to_i = defaultdict(count(len(leafs)).__next__)
    for node in postorder(tree):
        if node.is_leaf:
            node_to_i[node] = node.value.index
        else:
            assert len(node.branches) == 2
            assert node.left in node_to_i
            assert node.right in node_to_i
            Z[i] = [node_to_i[node.left], node_to_i[node.right],
                    node.value.height, 0]
            _ni = node_to_i[node]
            assert _ni == Z.shape[0] + i + 1
            i += 1
    assert i == Z.shape[0]
    return Z


def postorder(tree, branches=attrgetter("branches")):
    stack = deque([tree])
    visited = set()

    while stack:
        current = stack.popleft()
        children = branches(current)
        if children:
            # yield the item on the way up
            if current in visited:
                yield current
            else:
                # stack = children + [current] + stack
                stack.extendleft([current])
                stack.extendleft(reversed(children))
                visited.add(current)

        else:
            yield current
            visited.add(current)


def preorder(tree, branches=attrgetter("branches")):
    stack = deque([tree])
    while stack:
        current = stack.popleft()
        yield current
        children = branches(current)
        if children:
            stack.extendleft(reversed(children))


def leaves(tree, branches=attrgetter("branches")):
    """
    Return an iterator over the leaf nodes in a tree structure.
    """
    return (node for node in postorder(tree, branches)
            if node.is_leaf)


def prune(cluster, level=None, height=None, condition=None):
    """
    Prune the clustering instance ``cluster``.

    :param Tree cluster: Cluster root node to prune.
    :param int level: If not `None` prune all clusters deeper then `level`.
    :param float height:
        If not `None` prune all clusters with height lower then `height`.
    :param function condition:
        If not `None condition must be a `Tree -> bool` function
        evaluating to `True` if the cluster should be pruned.

    .. note::
        At least one `level`, `height` or `condition` argument needs to
        be supplied.

    """
    if not any(arg is not None for arg in [level, height, condition]):
        raise ValueError("At least one pruning argument must be supplied")

    level_check = height_check = condition_check = lambda cl: False

    if level is not None:
        cluster_depth = cluster_depths(cluster)
        level_check = lambda cl: cluster_depth[cl] >= level

    if height is not None:
        height_check = lambda cl: cl.value.height <= height

    if condition is not None:
        condition_check = condition

    def check_all(cl):
        return level_check(cl) or height_check(cl) or condition_check(cl)

    T = {}

    for node in postorder(cluster):
        if check_all(node):
            if node.is_leaf:
                T[node] = node
            else:
                T[node] = Tree(node.value, ())
        else:
            T[node] = Tree(node.value,
                           tuple(T[ch] for ch in node.branches))
    return T[cluster]


def cluster_depths(cluster):
    """
    Return a dictionary mapping :class:`Tree` instances to their depth.

    :param Tree cluster: Root cluster
    :rtype: class:`dict`

    """
    depths = {}
    depths[cluster] = 0
    for cluster in preorder(cluster):
        cl_depth = depths[cluster]
        depths.update(dict.fromkeys(cluster.branches, cl_depth + 1))
    return depths


def top_clusters(tree, k):
    """
    Return `k` topmost clusters from hierarchical clustering.

    :param Tree root: Root cluster.
    :param int k: Number of top clusters.

    :rtype: list of :class:`Tree` instances
    """
    def item(node):
        return ((node.is_leaf, -node.value.height), node)

    heap = [item(tree)]

    while len(heap) < k:
        _, cl = heap[0]  # peek
        if cl.is_leaf:
            assert all(n.is_leaf for _, n in heap)
            break
        key, cl = heapq.heappop(heap)
        left, right = cl.left, cl.right
        heapq.heappush(heap, item(left))
        heapq.heappush(heap, item(right))

    return [n for _, n in heap]


def optimal_leaf_ordering(
        tree: Tree, distances: numpy.ndarray, progress_callback=_undef
) -> Tree:
    """
    Order the leaves in the clustering tree.

    :param Tree tree:
        Binary hierarchical clustering tree.
    :param numpy.ndarray distances:
        A (N, N) numpy.ndarray of distances that were used to compute
        the clustering.

    .. seealso:: scipy.cluster.hierarchy.optimal_leaf_ordering
    """
    if progress_callback is not _undef:
        warnings.warn(
            "'progress_callback' parameter is deprecated and ignored. "
            "Passing it will raise an error in the future.",
            FutureWarning, stacklevel=2
        )
    Z = linkage_from_tree(tree)
    y = condensedform(numpy.asarray(distances))
    Zopt = scipy.cluster.hierarchy.optimal_leaf_ordering(Z, y)
    return tree_from_linkage(Zopt)


[docs] class HierarchicalClustering: def __init__(self, n_clusters=2, linkage=AVERAGE): self.n_clusters = n_clusters self.linkage = linkage def fit(self, X): self.tree = dist_matrix_clustering(X, linkage=self.linkage) cut = top_clusters(self.tree, self.n_clusters) labels = numpy.zeros(self.tree.value.last) for i, cl in enumerate(cut): indices = [leaf.value.index for leaf in leaves(cl)] labels[indices] = i self.labels = labels def fit_predict(self, X, y=None): self.fit(X) return self.labels