# Original source: https://github.com/mvlearn/mvlearn
# License: MIT
import numpy as np
from scipy.stats import ortho_group
from sklearn.utils import check_random_state
[docs]
def make_gaussian_mixture(
centers,
covariances,
n_samples=100,
transform="linear",
noise=None,
noise_dims=None,
class_probs=None,
random_state=None,
shuffle=False,
return_latents=False,
add_latent_noise=False,
):
r"""Two-view Gaussian mixture model dataset generator.
This creates a two-view dataset from a Gaussian mixture model and
a (possibly nonlinear) transformation.
Parameters
----------
centers : 1D array-like or list of 1D array-likes
The mean(s) of the Gaussian(s) from which the latent
points are sampled. If is a list of 1D array-likes, each is the
mean of a distinct Gaussian, sampled from with
probability given by `class_probs`. Otherwise is the mean of a
single Gaussian from which all are sampled.
covariances : 2D array-like or list of 2D array-likes
The covariance matrix(s) of the Gaussian(s), matched
to the specified centers.
n_samples : int
The number of points in each view, divided across Gaussians per
`class_probs`.
transform : 'linear' | 'sin' | 'poly' | callable, (default 'linear')
Transformation to perform on the latent variable. If a function,
applies it to the latent. Otherwise uses an implemented function.
noise : float or None (default=None)
Variance of mean zero Gaussian noise added to the first view.
noise_dims : int or None (default=None)
Number of additional dimensions of standard normal noise to add.
class_probs : array-like, default=None
A list of probabilities specifying the probability of a latent
point being sampled from each of the Gaussians. Must sum to 1. If
None, then is taken to be uniform over the Gaussians.
random_state : int, default=None
If set, can be used to reproduce the data generated.
shuffle : bool, default=False
If ``True``, data is shuffled so the labels are not ordered.
return_latents : bool (default False)
If true, returns the non-noisy latent variables.
add_latent_noise : bool (default False)
If true, adds noise to the latent variables before applying the
transformation.
Returns
-------
Xs : list of np.ndarray, of shape (n_samples, n_features)
The latent data and its noisy transformation.
y : np.ndarray, shape (n_samples,)
The integer labels for each sample's Gaussian membership.
latents : np.ndarray, shape (n_samples, n_features)
The non-noisy latent variables. Only returned if
``return_latents=True``.
Notes
-----
For each class :math:`i` with prior probability :math:`p_i`,
center and covariance matrix :math:`\mu_i` and :math:`\Sigma_i`, and
:math:`n` total samples, the latent data is sampled such that:
.. math::
(X_1, y_1), \dots, (X_{np_i}, Y_{np_i}) \overset{i.i.d.}{\sim}
\mathcal{N}(\mu_i, \Sigma_i)
Two views of data are returned, the first being the latent samples and
the second being a specified transformation of the latent samples.
Additional noise may be added to the first view or added as noise
dimensions to both views.
Examples
--------
>>> from sktree.datasets.multiview import make_gaussian_mixture
>>> import numpy as np
>>> n_samples = 10
>>> centers = [[0,1], [0,-1]]
>>> covariances = [np.eye(2), np.eye(2)]
>>> Xs, y = make_gaussian_mixture(n_samples, centers, covariances,
... shuffle=True, shuffle_random_state=42)
>>> print(y)
[1. 0. 1. 0. 1. 0. 1. 0. 0. 1.]
"""
centers = np.asarray(centers)
covariances = np.asarray(covariances)
if centers.ndim == 1:
centers = centers[np.newaxis, :]
if covariances.ndim == 2:
covariances = covariances[np.newaxis, :]
if not centers.ndim == 2:
msg = "centers is of the incorrect shape"
raise ValueError(msg)
if not covariances.ndim == 3:
msg = "covariance matrix is of the incorrect shape"
raise ValueError(msg)
if centers.shape[0] != covariances.shape[0]:
msg = "The first dimensions of 2D centers and 3D covariances" + " must be equal"
raise ValueError(msg)
if class_probs is None:
class_probs = np.ones(centers.shape[0])
class_probs /= centers.shape[0]
elif sum(class_probs) != 1.0:
msg = "elements of `class_probs` must sum to 1"
raise ValueError(msg)
if len(centers) != len(class_probs) or len(covariances) != len(class_probs):
msg = "centers, covariances, and class_probs must be of equal length"
raise ValueError(msg)
rng = np.random.default_rng(seed=random_state)
latent = np.concatenate(
[
rng.multivariate_normal(
centers[i],
covariances[i],
size=int(class_probs[i] * n_samples),
)
for i in range(len(class_probs))
]
)
y = np.concatenate(
[i * np.ones(int(class_probs[i] * n_samples)) for i in range(len(class_probs))]
)
if add_latent_noise:
latent += rng.standard_normal(size=latent.shape) * 0.1
# shuffle latent samples and labels
if shuffle:
indices = np.arange(latent.shape[0]).squeeze()
rng.shuffle(indices)
latent = latent[indices, :]
y = y[indices]
if callable(transform):
X = np.asarray([transform(x) for x in latent])
elif not isinstance(transform, str):
raise TypeError(
"'transform' must be of type string or a callable function," + f"not {type(transform)}"
)
elif transform == "linear":
X = _linear2view(latent, rng)
elif transform == "poly":
X = _poly2view(latent)
elif transform == "sin":
X = _sin2view(latent)
else:
raise ValueError(
"Transform type must be one of {'linear', 'poly', 'sin'}"
+ f" or a callable function. Not {transform}"
)
if noise is not None:
Xs = [latent + np.sqrt(noise) * rng.standard_normal(size=latent.shape), X]
else:
Xs = [latent, X]
# if random_state is not None, make sure both views are independent
# but reproducible
if noise_dims is not None:
Xs = [_add_noise(X, noise_dims, rng) for X in Xs]
if return_latents:
return Xs, y, latent
else:
return Xs, y
def _add_noise(X, n_noise, rng):
"""Appends dimensions of standard normal noise to X"""
noise_vars = rng.standard_normal(size=(X.shape[0], n_noise))
return np.hstack((X, noise_vars))
def _linear2view(X, rng):
"""Rotates the data, a linear transformation"""
if X.shape[1] == 1:
X = -X
else:
X = X @ ortho_group.rvs(X.shape[1], random_state=rng)
return X
def _poly2view(X):
"""Applies a degree 2 polynomial transform to the data"""
X = np.asarray([np.power(x, 2) for x in X])
return X
def _sin2view(X):
"""Applies a sinusoidal transformation to the data"""
X = np.asarray([np.sin(x) for x in X])
return X
def _rand_orthog(n, K, random_state=None):
"""
Samples a random orthonormal matrix.
Parameters
----------
n : int, positive
Number of rows in the matrix
K : int, positive
Number of columns in the matrix
random_state : None | int | instance of RandomState, optional
Seed to set randomization for reproducible results
Returns
-------
A: array-like, (n, K)
A random, column orthonormal matrix.
Notes
-----
See Section A.1.1 of :footcite:`perry2009crossvalidation`.
References
----------
.. footbibliography::
"""
rng = check_random_state(random_state)
Z = rng.normal(size=(n, K))
Q, R = np.linalg.qr(Z)
s = np.ones(K)
neg_mask = rng.uniform(size=K) > 0.5
s[neg_mask] = -1
return Q * s
[docs]
def make_joint_factor_model(
n_views,
n_features,
n_samples=100,
joint_rank=1,
noise_std=1,
m=1.5,
random_state=None,
return_decomp=False,
):
"""Joint factor model data generator.
Samples from a low rank, joint factor model where there is one set of
shared scores.
Parameters
----------
n_views : int
Number of views to sample. This corresponds to ``B`` in the notes.
n_features : int, or list of int
Number of features in each view. A list specifies a different number
of features for each view.
n_samples : int
Number of samples in each view
joint_rank : int (default 1)
Rank of the common signal across views.
noise_std : float (default 1)
Scale of noise distribution.
m : float (default 1.5)
Signal strength.
random_state : int or RandomState instance, optional (default=None)
Controls random orthonormal matrix sampling and random noise
generation. Set for reproducible results.
return_decomp : bool, default=False
If ``True``, returns the ``view_loadings`` as well.
Returns
-------
Xs : list of array-likes
List of samples data matrices with the following attributes.
- Xs length: n_views
- Xs[i] shape: (n_samples, n_features_i).
U: (n_samples, joint_rank)
The true orthonormal joint scores matrix. Returned if
``return_decomp`` is True.
view_loadings: list of numpy.ndarray
The true view loadings matrices. Returned if
``return_decomp`` is True.
Notes
-----
The data is generated as follows, where:
- :math:`b` are the different views
- :math:`U` is is a (n_samples, joint_rank) matrix of rotation matrices.
- ``svals`` are the singular values sampled.
- :math:`W_b` are (n_features_b, joint_rank) view loadings matrices, which are
orthonormal matrices to linearly transform the data, while preserving inner
products (i.e. a unitary transformation).
For b = 1, .., B
X_b = U @ diag(svals) @ W_b^T + noise_std * E_b
where U and each W_b are orthonormal matrices. The singular values are
linearly increasing following :footcite:`choi2017selectingpca` section 2.2.3.
References
----------
.. footbibliography::
"""
rng = check_random_state(random_state)
if isinstance(n_features, int):
n_features = [n_features] * n_views
# generate W_b orthonormal matrices
view_loadings = [_rand_orthog(d, joint_rank, random_state=rng) for d in n_features]
# sample monotonically increasing singular values
# the signal increases linearly and ``m`` determines the strength of the signal
svals = np.arange(1, 1 + joint_rank).astype(float)
svals *= m * noise_std * (n_samples * max(n_features)) ** (1 / 4)
# rotation operators that are generated via standard random normal
U = rng.standard_normal(size=(n_samples, joint_rank))
U = np.linalg.qr(U)[0]
# random noise for each view
Es = [noise_std * rng.standard_normal(size=(n_samples, d)) for d in n_features]
Xs = [(U * svals) @ view_loadings[b].T + Es[b] for b in range(n_views)]
if return_decomp:
return Xs, U, view_loadings
else:
return Xs