Source code for rrgcn.random_rgcn_embedder

from copy import deepcopy
from typing import Dict, Optional, Tuple, Union

import pandas as pd
import sklearn
import sklearn.base
import sklearn.preprocessing
import torch
import torch.nn.functional as F
import torch_sparse
from torch_geometric.utils.subgraph import k_hop_subgraph
from torch_sparse import SparseTensor
from tqdm import tqdm

from .node_encoder import NodeEncoder
from .random_rgcn_conv import RandomRGCNConv
from .util import calc_ppv


[docs]class RRGCNEmbedder(torch.nn.Module): def __init__( self, num_nodes: int, num_layers: int, num_relations: int, emb_size: int, device: Union[torch.device, str] = "cuda", ppv: bool = True, seed: int = 42, min_node_degree: int = 0, ): """Random Relational Graph Convolutional Network Knowledge Graph Embedder. Args: num_nodes (int): Number of nodes in the KG. num_layers (int): Number of random graph convolutions. num_relations (int): Number of relations in the KG. emb_size (int): Desired embedding width. device (torch.device or str, optional): PyTorch device to calculate embeddings on. Defaults to "cuda". ppv (bool, optional): If True, concatenate PPV features to embeddings (this effectively doubles the embedding width). Defaults to True. seed (int, optional): Seed used to generate random transformations (fully characterizes the embedder). Defaults to 42. min_node_degree (int, optional): If set, embedder first remove all nodes with a degree lower than the given argument from the graph before taking subgraph. Defaults to 0. """ super().__init__() self.device = device self.ppv = ppv self.num_nodes = num_nodes self.emb_size = emb_size self.seed = seed self.num_layers = num_layers self.min_node_degree = min_node_degree self.layers = [ RandomRGCNConv(emb_size, emb_size, num_relations, seed=seed) for _ in range(num_layers) ] self.ne = NodeEncoder( emb_size=emb_size, num_nodes=num_nodes, seed=self.seed, device=device )
[docs] def forward( self, edge_index: Union[torch.Tensor, torch_sparse.SparseTensor], edge_type: Optional[torch.Tensor] = None, node_features: Optional[Dict[int, Tuple[torch.Tensor, torch.Tensor]]] = None, node_idx: Optional[torch.Tensor] = None, ) -> torch.Tensor: """Calculates node embeddings for a (sub)graph specified by a typed adjacency matrix Args: edge_index (torch.Tensor or torch_sparse.SparseTensor): Adjacency matrix. Either in 2-row head/tail format or using a SparseTensor. edge_type (torch.Tensor, optional): Types for each edge in `edge_index`. Can be omitted if `edge_index` is a SparseTensor where types are included as values. Defaults to None. node_features (Dict[int, Tuple[torch.Tensor, torch.Tensor]], optional): Dictionary with featured node type identifiers as keys, and tuples of node indices and initial features as values. For example, if nodes `[3, 5, 7]` are literals of type `5` with numeric values `[0.7, 0.1, 0.5]`, `node_features` should be: `{5: (torch.tensor([3, 5, 7]), torch.tensor([0.7], [0.1], [0.5]))}` Featured nodes are not limited to numeric literals, e.g. word embeddings can also be passed for string literals. The node indices used to specify the locations of literal nodes should be included in `node_idx` (if supplied). node_idx (torch.Tensor, optional): Useful for batched embedding calculation. Mapping from node indices used in the given (sub)graph's adjancency matrix to node indices in the original graph. Defaults to None. Returns: torch.Tensor: Node embeddings for given (sub)graph. """ if node_idx is None: node_idx = torch.arange(edge_index.max() + 1) # Use kwargs to support both torch_sparse adjacency tensors # and edge_index, edge_type kwargs = {"edge_index": edge_index} if edge_type is not None: kwargs = {**kwargs, "edge_type": edge_type} x = self.ne(node_features, node_idx) x = self.layers[0](x, **kwargs) if self.ppv: # Calculate proportion of positive values in 1-hop neighbourhood # after first convolution ppv = calc_ppv(x, edge_index) # Free GPU memory for next conv layer ppv = ppv.cpu() for conv in self.layers[1:]: x = F.relu(x) x = conv(x, **kwargs) if self.ppv: # Free GPU memory for PPV calculation x = x.cpu() # Return PPV to GPU ppv = ppv.to(self.device) # Random message passing with previous PPV as features ppv = conv(ppv, **kwargs) # Calculate new PPV ppv = calc_ppv(ppv, edge_index) # Free GPU memory for next conv layer ppv = ppv.cpu() # Return conv activations to GPU x = x.to(self.device) if self.ppv: # Return conv activations to GPU x = x.cpu() # Concatenate final conv activations and PPV features x = torch.hstack((x, ppv)) return x
[docs] def get_last_fit_scalers(self) -> Dict[int, sklearn.base.TransformerMixin]: """If during the last call to `embeddings()`, scalers were fit, returns the per featured node fitted sklearn scalers. Returns: Dict[int, sklearn.base.TransformerMixin]: the fitted scalers """ return self.node_features_scalers
[docs] def estimated_peak_memory_usage( self, edge_index: Union[torch.Tensor, torch_sparse.SparseTensor], batch_size: int = 0, idx: Optional[torch.Tensor] = None, subgraph: bool = True, **kwargs ): """Calculates the theoretical peak memory usage for a set of arguments given to `RRGCNEmbedder.embeddings()` Args: edge_index (torch.Tensor or torch_sparse.SparseTensor): Adjacency matrix. Either in 2-row head/tail format or using a SparseTensor. edge_type (torch.Tensor, optional): Types for each edge in `edge_index`. Can be omitted if `edge_index` is a SparseTensor where types are included as values. Defaults to None. batch_size (int, optional): Number of nodes in a single batch. For every batch, a subgraph with number of hops equal to the number of graph convolutions around the included nodes is extracted and used for message passing. If `batch_size` is 0, all nodes of interest are contained in a single batch. Defaults to 0. idx (torch.Tensor, optional): Node indices to extract embeddings for (e.g. indices for train- and test entities). If None, extracts embeddings for all nodes in the graph. Defaults to None. subgraph (bool, optional): If False, the function does not take a k-hop subgraph before executing message passing. This is useful for small graphs where embeddings can be extracted full-batch and calculating the subgraph comes with a significant overhead. Defaults to True. Returns: int: Theoretical peak memory usage in number of bytes """ if subgraph: if idx is None: # Generate embeddings for all nodes all_nodes = torch.arange(edge_index.max() + 1) else: # Only generate embeddings for subset of nodes # (e.g. labelled train + test nodes) all_nodes = idx if batch_size < 1: # Full-batch batches = [all_nodes] else: # Split nodes to generate embeddings for into batches batches = all_nodes.split(batch_size) num_nodes_per_batch = [] for batch in batches: # Calculate batch subgraph with smaller number of nodes and edges nodes, _, _, _ = k_hop_subgraph( batch, self.num_layers, edge_index, relabel_nodes=True ) num_nodes_per_batch.append(len(nodes)) max_num_nodes = max(num_nodes_per_batch) else: max_num_nodes = edge_index.max() + 1 factor = 4 # 4 if self.ppv else 3 return factor * max_num_nodes * self.emb_size * 4 # in number of bytes
[docs] @torch.no_grad() def embeddings( self, edge_index: Union[torch.Tensor, torch_sparse.SparseTensor], edge_type: Optional[torch.Tensor] = None, batch_size: int = 0, node_features: Optional[Dict[int, Tuple[torch.Tensor, torch.Tensor]]] = None, node_features_scalers: Optional[ Union[Dict[int, sklearn.base.TransformerMixin], str] ] = "standard", idx: Optional[torch.Tensor] = None, subgraph: bool = True, ) -> torch.Tensor: """Generate embeddings for a given set of nodes of interest. Args: edge_index (torch.Tensor or torch_sparse.SparseTensor): Adjacency matrix. Either in 2-row head/tail format or using a SparseTensor. edge_type (torch.Tensor, optional): Types for each edge in `edge_index`. Can be omitted if `edge_index` is a SparseTensor where types are included as values. Defaults to None. batch_size (int, optional): Number of nodes in a single batch. For every batch, a subgraph with number of hops equal to the number of graph convolutions around the included nodes is extracted and used for message passing. If `batch_size` is 0, all nodes of interest are contained in a single batch. Defaults to 0. node_features (Dict[int, Tuple[torch.Tensor, torch.Tensor]], optional): Dictionary with featured node type identifiers as keys, and tuples of node indices and initial features as values. For example, if nodes `[3, 5, 7]` are literals of type `5` with numeric values `[0.7, 0.1, 0.5]`, `node_features` should be: `{5: (torch.tensor([3, 5, 7]), torch.tensor([0.7], [0.1], [0.5]))}` Featured nodes are not limited to numeric literals, e.g. word embeddings can also be passed for string literals. The node indices used to specify the locations of literal nodes should be included in `idx` (if supplied). node_features_scalers (Dict[int, TransformerMixin] or str, optional): Dictionary with featured node type identifiers as keys, and sklearn scalers as values. If scalers are not fit, they will be fit on the data. The fit scalers can be retrieved using `.get_last_fit_scalers()`. Can also be "standard", "robust", "power", "quantile" as shorthands for an unfitted StandardScaler, RobustScaler, PowerTransformer and QuantileTransformer respectively. If None, no scaling is applied. Defaults to "standard". idx (torch.Tensor, optional): Node indices to extract embeddings for (e.g. indices for train- and test entities). If None, extracts embeddings for all nodes in the graph. Defaults to None. subgraph (bool, optional): If False, the function does not take a k-hop subgraph before executing message passing. This is useful for small graphs where embeddings can be extracted full-batch and calculating the subgraph comes with a significant overhead. Defaults to True. Returns: torch.Tensor: Node embeddings for given nodes of interest """ self.eval() if idx is None: # Generate embeddings for all nodes all_nodes = torch.arange(edge_index.max() + 1) else: # Only generate embeddings for subset of nodes # (e.g. labelled train + test nodes) all_nodes = idx if batch_size < 1: # Full-batch batches = [all_nodes] else: # Split nodes to generate embeddings for into batches batches = all_nodes.split(batch_size) normalized_node_features = node_features if node_features is not None and node_features_scalers is not None: normalized_node_features = deepcopy(node_features) if isinstance(node_features_scalers, str): assert node_features_scalers in [ "standard", "robust", "power", "quantile", ] kwargs = {} if node_features_scalers == "standard": scaler = sklearn.preprocessing.StandardScaler elif node_features_scalers == "robust": scaler = sklearn.preprocessing.RobustScaler elif node_features_scalers == "power": scaler = sklearn.preprocessing.PowerTransformer elif node_features_scalers == "quantile": scaler = sklearn.preprocessing.QuantileTransformer kwargs = {**kwargs, "output_distribution": "normal"} self.node_features_scalers = { k: scaler(**kwargs) for k, _ in node_features.items() } else: self.node_features_scalers = deepcopy(node_features_scalers) for type_id, (typed_idx, feat) in node_features.items(): if ( type_id not in self.node_features_scalers or self.node_features_scalers[type_id] is None ): continue if not hasattr(self.node_features_scalers[type_id], "n_features_in_"): self.node_features_scalers[type_id] = self.node_features_scalers[ type_id ].fit(feat) normalized_node_features[type_id] = ( typed_idx, torch.tensor( self.node_features_scalers[type_id].transform(feat), device=self.device, dtype=torch.float32, ), ) embs = [] for batch in tqdm(batches): if self.min_node_degree > 0: degrees = pd.Series(edge_index[0].cpu().numpy()).value_counts() low_degree = set( degrees[degrees < self.min_node_degree].index.to_numpy() ) low_degree = torch.tensor( list(low_degree.difference(set(all_nodes.cpu().numpy()))) ).to(edge_index.device) degree_mask = ~( torch.isin(edge_index[0], low_degree) | torch.isin(edge_index[1], low_degree) ).to(edge_index.device) deg_edge_index = edge_index[:, degree_mask] deg_edge_type = edge_type[degree_mask] else: deg_edge_index = edge_index deg_edge_type = edge_type if subgraph: # Calculate batch subgraph with smaller number of nodes and edges nodes, sub_edge_index, mapping, edge_preserved = k_hop_subgraph( batch, self.num_layers, deg_edge_index, relabel_nodes=True ) sub_edge_type = deg_edge_type[edge_preserved] else: nodes, sub_edge_index, mapping, sub_edge_type = ( torch.arange(edge_index.max() + 1), deg_edge_index, batch, deg_edge_type, ) adj_t = ( SparseTensor( row=sub_edge_index[0], col=sub_edge_index[1], value=sub_edge_type, ) .to(self.device) .t() ) sub_node_features = None if node_features is not None: sub_node_features = {} for type_id, (typed_idx, feat) in normalized_node_features.items(): mask = torch.isin(typed_idx.to(nodes.device), nodes) selected_idx = typed_idx[mask] if selected_idx.numel() == 0: continue sub_node_features[type_id] = (selected_idx, feat[mask]) # Calculate embeddings for all nodes participating in batch and then select # the queried nodes emb = ( self(adj_t, node_idx=nodes, node_features=sub_node_features)[mapping] .detach() .cpu() ) embs.append(emb) return torch.concat(embs)