Source code for sparsegraph.classes.sparsegraph

import numpy as np
from typing import Generic, List, Literal, TypeVar
from scipy import sparse

T = TypeVar("T")


[docs] class SparseGraph(Generic[T]): def __init__(self, adjacency: sparse.csr_matrix, labels: List[T]) -> None: self.size: int = adjacency.shape[0] self.adjacency = adjacency self.indegree = adjacency.sum(axis=0) self.outdegree = adjacency.sum(axis=1) if len(labels) != self.size: raise ValueError("Number of labels must match number of nodes in graph.") self.labels = labels self._in_degree = None self._out_degree = None @staticmethod def __delete_from_csr__( mat: sparse.csr_matrix, indices: List[int] ) -> sparse.csr_matrix: """ Parameters --- mat: The csr sparse adjacency matrix of the graph. indices: The indices of the nodes in the graph that will be removed. Returns --- mat: The sparse adjacency matrix of the graph with the nodes removed. """ if mat.shape[0] != mat.shape[1]: raise ValueError("Matrix must be square.") mask = np.ones(mat.shape[0], dtype=bool) mask[indices] = False return mat[mask][:, mask]
[docs] def get_largest_component( self, directed: bool = True, connection: Literal["strong", "weak"] = "strong" ) -> "SparseGraph": """ Parameters --- directed: If ``True`` the graph is treated as directed. connection: If ``"strong"``, the connected components will all be strongly connected together. If ``"weak"`` the connected components will be weakly connected. If ``directed == False`` the parameter is ignored and the graph will be treated as strongly connected. """ _, component_labels = sparse.csgraph.connected_components( self.adjacency, directed=directed, connection=connection ) unique_component_labels, count = np.unique(component_labels, return_counts=True) largest = unique_component_labels[np.argmax(count)] indices = np.where(component_labels == largest)[0] unconnected_indices = list(set(range(self.adjacency.shape[0])) - set(indices)) subgraph = self.remove_indices(unconnected_indices) return subgraph
[docs] def in_degree(self, index: int) -> int: return self.indegree[index]
[docs] def out_degree(self, index: int) -> int: return self.outdegree[index]
[docs] def outgoing_neighbors(self, index: int) -> np.ndarray: nodes = self.adjacency[index, :].toarray()[0] return np.nonzero(nodes)[0]
[docs] def incoming_neighbors(self, index: int) -> np.ndarray: nodes = self.adjacency[:, index].toarray()[0] return np.nonzero(nodes)[0]
[docs] def remove_indices(self, indices: List[int]) -> "SparseGraph": new_adjacency = self.__delete_from_csr__(self.adjacency, indices) new_labels = [label for i, label in enumerate(self.labels) if i not in indices] return SparseGraph(new_adjacency, new_labels)
[docs] def get_label(self, index: int) -> T: return self.labels[index]