from functools import partial
import numpy as np
import torch as T
import torch.nn as nn

from .abstract import wrapper, Module, Sequential
from .convolution import Conv2d, DepthwiseSepConv2D, FC
from .. import utils
from ..utils import _pointset_shape

__all__ = ['XConv', 'GraphConv', 'BatchGraphConv', 'GraphXConv']

class XConv(Module):
    Performs X-Convolution on unordered set as in `this paper`_.

    .. _this paper:

        shape of the input tensor.
        If an integer is passed, it is treated as the size of each input sample.
    feature_dim : int
        dimension of the input features.
    out_channels : int
        number of channels produced by the convolution.
    out_features : int
        size of each output sample.
    num_neighbors : int
        size of the convolving kernel.
        depth multiplier for intermediate result of depthwise convolution
        non-linear function to activate the linear result.
        It accepts any callable function
        as well as a recognizable ``str``.
        A list of possible ``str`` is in :func:`~neuralnet_pytorch.utils.function`.
    dropout : bool
        whether to use dropout.
    bn : bool
        whether to use batch normalization.
        extra keyword arguments to pass to activation.
    def __init__(self, input_shape, feature_dim, out_channels, out_features, num_neighbors, depth_mul,
                 activation='relu', dropout=None, bn=True, **kwargs):
        input_shape = _pointset_shape(input_shape)

        self.feature_dim = feature_dim
        self.out_channels = out_channels
        self.num_neighbors = num_neighbors
        self.out_features = out_features
        self.depth_mul = depth_mul
        self.activation = utils.function(activation, **kwargs)
        self.dropout = dropout = bn

        self.fcs = Sequential(input_shape=input_shape)
        self.fcs.add_module('fc1', FC(self.fcs.output_shape, out_features, activation=activation))
        if dropout:
            self.fcs.add_module('dropout1', wrapper(self.output_shape, nn.Dropout2d, p=dropout))
        self.fcs.add_module('fc2', FC(self.fcs.output_shape, out_features, activation=activation))
        if dropout:
            self.fcs.add_module('dropout2', wrapper(self.output_shape, nn.Dropout2d, p=dropout))

        from neuralnet_pytorch.layers.resizing import DimShuffle
        from neuralnet_pytorch.layers.normalization import BatchNorm2d

        self.x_trans = Sequential(input_shape=input_shape[:2] + (num_neighbors, input_shape[-1]))
        self.x_trans.add_module('dimshuffle1', DimShuffle(self.x_trans.output_shape, (0, 3, 1, 2)))
        self.x_trans.add_module('conv', Conv2d(self.x_trans.output_shape, num_neighbors ** 2, (1, num_neighbors),
                                               activation=activation, padding='valid', **kwargs))
        self.x_trans.add_module('dimshuffle2', DimShuffle(self.x_trans.output_shape, (0, 2, 3, 1)))
        self.x_trans.add_module('fc1', FC(self.x_trans.output_shape, num_neighbors ** 2, activation='relu', **kwargs))
        self.x_trans.add_module('fc2', FC(self.x_trans.output_shape, num_neighbors ** 2, **kwargs))

        self.end_conv = Sequential(input_shape=input_shape[:2] + (num_neighbors, feature_dim + out_features))
        self.end_conv.add_module('dimshuffle1', DimShuffle(self.end_conv.output_shape, (0, 3, 1, 2)))
                                 DepthwiseSepConv2D(self.end_conv.output_shape, out_channels, (1, num_neighbors),
                                                    depth_mul=depth_mul, activation=None if bn else activation,
                                                    padding='valid', **kwargs))
        if bn:
            self.end_conv.add_module('bn', BatchNorm2d(self.end_conv.output_shape, momentum=.9, activation=activation))
        self.end_conv.add_module('dimshuffle2', DimShuffle(self.end_conv.output_shape, (0, 2, 3, 1)))

    def forward(self, *input, **kwargs):
        rep_pt, pts, fts = input

        if fts is not None:
            assert rep_pt.size()[0] == pts.size()[0] == fts.size()[0]  # Check N is equal.
            assert rep_pt.size()[1] == pts.size()[1] == fts.size()[1]  # Check P is equal.
            assert pts.size()[2] == fts.size()[2] == self.num_neighbors  # Check K is equal.
            assert fts.size()[3] == self.feature_dim  # Check C_in is equal.
            assert rep_pt.size()[0] == pts.size()[0]  # Check N is equal.
            assert rep_pt.size()[1] == pts.size()[1]  # Check P is equal.
            assert pts.size()[2] == self.num_neighbors  # Check K is equal.
        assert rep_pt.size()[2] == pts.size()[3] == self.input_shape[-1]  # Check dims is equal.

        N = len(pts)
        P = rep_pt.size()[1]  # (N, P, K, dims)
        p_center = T.unsqueeze(rep_pt, dim=2)  # (N, P, 1, dims)

        # Move pts to local coordinate system of rep_pt.
        pts_local = pts - p_center  # (N, P, K, dims)

        # Individually lift each point into C_mid space.
        fts_lifted = self.fcs(pts_local)  # (N, P, K, C_mid)

        if fts is None:
            fts_cat = fts_lifted
            fts_cat =, fts), -1)  # (N, P, K, C_mid + C_in)

        # Learn the (N, K, K) X-transformation matrix.
        X_shape = (N, P, self.num_neighbors, self.num_neighbors)
        X = self.x_trans(pts_local)
        X = X.view(*X_shape)

        # Weight and permute fts_cat with the learned X.
        fts_X = T.matmul(X, fts_cat)
        fts_p = self.end_conv(fts_X).squeeze(dim=2)
        return fts_p

    def output_shape(self):
        return self.input_shape[:2] + (self.out_channels,)

    def extra_repr(self):
        s = ('{input_shape}, feature_dim={feature_dim}, out_channels={out_channels}, out_features={out_features}, '
             'num_neighbors={num_neighbors}, depth_mul={depth_mul}, dropout={dropout}, bn={bn}')

        s = s.format(**self.__dict__)
        s += ', activation={}'.format(self.activation.__name__)
        return s

[docs]class GraphConv(FC): """ Performs graph convolution as described in Adapted from Parameters ---------- input_shape shape of the input tensor. If an integer is passed, it is treated as the size of each input sample. out_features : int size of each output sample. activation non-linear function to activate the linear result. It accepts any callable function as well as a recognizable ``str``. A list of possible ``str`` is in :func:`~neuralnet_pytorch.utils.function`. kwargs extra keyword arguments to pass to activation. """ def __init__(self, input_shape, out_features, bias=True, activation=None, **kwargs): super().__init__(input_shape, out_features, bias, activation=activation, **kwargs) def reset_parameters(self): if self.weights_init is None: stdv = 1. / np.sqrt(self.weight.size(1)), stdv) else: self.weights_init(self.weight) if self.bias is not None: if self.bias_init is None: stdv = 1. / np.sqrt(self.weight.size(1)), stdv) else: self.bias_init(self.bias) def forward(self, input, adj, *args, **kwargs): support =, self.weight.t()) output =, support) if self.bias is not None: output = output + self.bias if self.activation is not None: output = self.activation(output) return output
[docs]class BatchGraphConv(GraphConv): """ Performs graph convolution as described in on a batch of graphs. Adapted from Parameters ---------- input_shape shape of the input tensor. The first dim should be batch size. If an integer is passed, it is treated as the size of each input sample. out_features : int size of each output sample. activation non-linear function to activate the linear result. It accepts any callable function as well as a recognizable ``str``. A list of possible ``str`` is in :func:`~neuralnet_pytorch.utils.function`. kwargs extra keyword arguments to pass to activation. """ def __init__(self, input_shape, out_features, bias=True, activation=None, **kwargs): input_shape = _pointset_shape(input_shape) super().__init__(input_shape, out_features, bias, activation=activation, **kwargs) def forward(self, input, adj, *args, **kwargs): """ Performs graphs convolution. :param input: a ``list``/``tuple`` of 2D matrices. :param adj: a block diagonal matrix whose diagonal consists of adjacency matrices of the input graphs. :return: a batch of embedded graphs. """ shapes = [input_.shape[0] for input_ in input] X =, 0) output = super().forward(X, adj) output = T.split(output, shapes) return output
[docs]@utils.add_custom_repr class GraphXConv(Module): """ Performs GraphX Convolution as described here_. **Disclaimer:** I am the first author of the paper. .. _here: Parameters ---------- input_shape shape of the input tensor. The first dim should be batch size. If an integer is passed, it is treated as the size of each input sample. out_features : int size of each output sample. out_instances : int resolution of the output point clouds. If not specified, output will have the same resolution as input. Default: ``None``. rank if specified and smaller than `num_out_points`, the mixing matrix will be broken into a multiplication of two matrices of sizes ``(num_out_points, rank)`` and ``(rank, input_shape[1])``. bias whether to use bias. Default: ``True``. activation non-linear function to activate the linear result. It accepts any callable function as well as a recognizable ``str``. A list of possible ``str`` is in :func:`~neuralnet_pytorch.utils.function`. weights_init a kernel initialization method from :mod:`torch.nn.init`. bias_init a bias initialization method from :mod:`torch.nn.init`. kwargs extra keyword arguments to pass to activation. """ def __init__(self, input_shape, out_features, out_instances=None, rank=None, bias=True, activation=None, weights_init=None, bias_init=None, **kwargs): input_shape = _pointset_shape(input_shape) super().__init__(input_shape=input_shape) self.out_features = out_features self.out_instances = out_instances if out_instances else input_shape[-2] if rank: assert rank <= self.out_instances // 2, 'rank should be smaller than half of num_out_points' self.rank = rank self.activation = utils.function(activation, **kwargs) pattern = list(range(len(input_shape))) pattern[-1], pattern[-2] = pattern[-2], pattern[-1] self.pattern = pattern self.weights_init = weights_init self.bias_init = bias_init self.weight = nn.Parameter(T.Tensor(out_features, input_shape[-1])) if self.rank is None: self.mixing = nn.Parameter(T.Tensor(self.out_instances, input_shape[-2])) else: self.mixing_u = nn.Parameter(T.Tensor(self.rank, input_shape[-2])) self.mixing_v = nn.Parameter(T.Tensor(self.out_instances, self.rank)) if bias: self.bias = nn.Parameter(T.Tensor(out_features)) self.mixing_bias = nn.Parameter(T.Tensor(self.out_instances)) else: self.register_parameter('bias', None) self.register_parameter('mixing_bias', None) self.reset_parameters() def forward(self, input): output = utils.dimshuffle(input, self.pattern) mixing =, self.mixing_u) if self.rank else self.mixing output = T.matmul(output, mixing.t()) if self.mixing_bias is not None: output = output + self.mixing_bias output = utils.dimshuffle(output, self.pattern) output = T.matmul(output, self.weight.t()) if self.bias is not None: output = output + self.bias return self.activation(output) @property @utils.validate def output_shape(self): return self.input_shape[:-2] + (self.out_instances, self.out_features) def reset_parameters(self): super().reset_parameters() weights_init = partial(nn.init.kaiming_uniform_, a=np.sqrt(5)) if self.weights_init is None \ else self.weights_init weights_init(self.weight) if self.rank: weights_init(self.mixing_u) weights_init(self.mixing_v) else: weights_init(self.mixing) if self.bias is not None: if self.bias_init is None: fan_in, _ = nn.init._calculate_fan_in_and_fan_out(self.weight) bound = 1 / np.sqrt(fan_in) bias_init = partial(nn.init.uniform_, a=-bound, b=bound) else: bias_init = self.bias_init bias_init(self.bias) nn.init.zeros_(self.mixing_bias) def extra_repr(self): s = '{input_shape}, out_features={out_features}, out_instances={out_instances}, rank={rank}' s = s.format(**self.__dict__) s += ', activation={}'.format(self.activation.__name__) return s