association_eval_varclus
Expand source code
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.mllib.linalg.distributed import RowMatrix, DenseMatrix
import math
import collections
from factor_analyzer import Rotator
import pandas as pd
import numpy as np
import random


class VarClusHiSpark(object):
    """
    This class is a scalable version of the [VarClusHi][2] library that performs variable
    clustering on PySpark dataframes with the necessary optimizations; sampling is not required.

    [2]: https://github.com/jingtt/varclushi "VarClusHi"

    Variable Clustering groups attributes that are as correlated as possible among themselves
    within a cluster and as uncorrelated as possible with attributes in other clusters.
    By default, it begins with all variables in a single cluster. It then repeats the
    following steps:

    1. A cluster is chosen for splitting. This cluster has the largest eigenvalue
       associated with the 2nd PC.
    2. Find the first 2 PCs of the chosen cluster, split it into 2 clusters, then perform
       an orthoblique rotation (raw quartimax rotation on the eigenvectors).
    3. Variables are iteratively re-assigned to clusters to maximize the variance explained:
       (a) Nearest Component Sorting (NCS) Phase: In each iteration, the cluster components
           are computed and each variable is assigned to the rotated component with which
           it has the highest squared correlation.
       (b) Search Phase: Each variable is tested in the other cluster to see if the move
           increases the variance explained. If a variable is re-assigned here, the
           components of the clusters involved are recomputed before the next variable
           is tested.

    The procedure stops when the 2nd eigenvalue of each cluster is smaller than the
    maxeigval2 parameter.
    """

    def __init__(self, df, feat_list=None, maxeigval2=1, maxclus=None, n_rs=0):
        """
        Parameters
        ----------
        df
            PySpark Dataframe
        feat_list
            List of features to perform variable clustering on, e.g., ["col1", "col2"].
            If feat_list is None, all columns of the input dataframe will be used.
            If feat_list is specified, only columns inside feat_list will be used.
            To run the algorithm successfully, df[feat_list] should only contain
            numerical values. (Default value = None)
        maxeigval2
            Maximum value of the second-largest eigenvalue, e.g., 1 (Default value = 1)
        maxclus
            Maximum number of clusters, e.g., 20.
            If maxclus is None, there is no restriction on the total number of clusters.
            If maxclus is specified, the algorithm stops splitting when the number of
            clusters reaches maxclus. (Default value = None)
        n_rs
            Number of random shuffles, e.g., 0.
            This parameter controls the number of random shuffle iterations after
            re-assignment. If n_rs is 0, re-assignment of each feature is conducted
            with no extra random shuffling. If n_rs is not 0, extra rounds of random
            shuffling and re-assignment are conducted. (Default value = 0)
        """
        if feat_list is None:
            self.df = df
            self.feat_list = df.columns
        else:
            self.df = df.select(feat_list)
            self.feat_list = feat_list
        self.maxeigval2 = maxeigval2
        self.maxclus = maxclus
        self.n_rs = n_rs
        if len(self.feat_list) <= 1:
            all_corr = [len(self.feat_list)]
        else:
            # Standardize the features so that the covariance matrix of the scaled
            # columns equals the correlation matrix of the original columns.
            vecAssembler = VectorAssembler(
                inputCols=self.feat_list, outputCol="features"
            )
            stream_df = vecAssembler.transform(df)
            scaler = StandardScaler(
                inputCol="features",
                outputCol="scaledFeatures",
                withStd=True,
                withMean=True,
            )
            scalerModel = scaler.fit(stream_df)
            scaled_df = scalerModel.transform(stream_df)
            rm = RowMatrix(scaled_df.select("scaledFeatures").rdd.map(list))
            all_corr = rm.computeCovariance().toArray()
        # Cache the full correlation matrix once; all later eigen-analyses reuse it.
        self.corr_df = pd.DataFrame(
            all_corr, columns=self.feat_list, index=self.feat_list
        )

    def correig(self, df, feat_list=None, n_pcs=2):
        """
        This function finds the correlation matrix between the feat_list columns, calculates
        the top n_pcs eigenvalues and eigenvectors, and gets the variance proportions.

        Parameters
        ----------
        df
            Spark dataframe
        feat_list
            List of feature columns, e.g., ["col1", "col2"].
            If feat_list is None, all columns of the input dataframe will be used.
            If feat_list is specified, only columns inside feat_list will be used.
            To run the algorithm successfully, df[feat_list] should only contain
            numerical values. (Default value = None)
        n_pcs
            Number of PCs, e.g., 2.
            This parameter controls the number of principal components extracted,
            e.g., if n_pcs=2, only the first 2 eigenvalues and eigenvectors of the
            correlation matrix are extracted. (Default value = 2)

        Returns
        -------
        The top n_pcs eigenvalues, the associated eigenvectors, the correlation matrix
        as a Pandas dataframe, and the variance proportions:

        eigvals
            Top n_pcs eigenvalues in a Numpy array, e.g., [eigval1, eigval2]
        eigvecs
            The associated eigenvectors of the top n_pcs eigenvalues in a Numpy array,
            e.g., [eigvec1, eigvec2]. Each eigenvector is an array of size D, where D
            is the number of columns of df[feat_list].
        corr_df
            Correlation matrix stored in a Pandas dataframe. The size of this output is
            DxD, where D is the number of columns of df[feat_list]. The value at index i
            and column j is the Pearson correlation between feat_list[i] and feat_list[j].
        varprops
            The variance proportions of the top n_pcs eigenvalues in a Numpy array,
            e.g., [varprop1, varprop2]. The order of this array is the same as eigvals
            and eigvecs, with the first value varprop1 indicating the variance proportion
            of the largest eigenvalue.
        """
        if feat_list is None:
            feat_list = df.columns
        if len(feat_list) <= 1:
            # Degenerate case: a single (or empty) feature list.
            corr = [len(feat_list)]
            eigvals = [len(feat_list)] + [0] * (n_pcs - 1)
            eigvecs = np.array([[len(feat_list)]])
            varprops = [sum(eigvals)]
        else:
            corr = self.corr_df.loc[feat_list, feat_list].values
            raw_eigvals, raw_eigvecs = np.linalg.eigh(corr)
            # eigh returns eigenvalues in ascending order; sort them descending.
            idx = np.argsort(raw_eigvals)[::-1]
            eigvals, eigvecs = raw_eigvals[idx], raw_eigvecs[:, idx]
            eigvals, eigvecs = eigvals[:n_pcs], eigvecs[:, :n_pcs]
            varprops = eigvals / sum(raw_eigvals)
        corr_df = pd.DataFrame(corr, columns=feat_list, index=feat_list)
        return eigvals, eigvecs, corr_df, varprops

    def _calc_tot_var(self, df, *clusters):
        """
        This function calculates the total variance explained by the given clusters.

        Parameters
        ----------
        df
            Spark dataframe
        clusters
            List of clusters, e.g., [clus1, clus2]. Each cluster is a list of the feature
            columns classified into this cluster, e.g., clus1 = ["col1", "col2"].

        Returns
        -------
        tot_var
            Sum of the first eigenvalues over all clusters passed in, e.g., 0.5
        tot_prop
            Weighted average of the variance explained by the 1st PC over the clusters
            passed in, e.g., 0.4
        """
        tot_len, tot_var, tot_prop = (0,) * 3
        for clus in clusters:
            if clus == []:
                continue
            c_len = len(clus)
            c_eigvals, _, _, c_varprops = self.correig(df.select(clus))
            tot_var += c_eigvals[0]
            # Running weighted average of the first-PC variance proportion.
            tot_prop = (tot_prop * tot_len + c_varprops[0] * c_len) / (tot_len + c_len)
            tot_len += c_len
        return tot_var, tot_prop

    def _reassign(self, df, clus1, clus2, feat_list=None):
        """
        This function performs the re-assignment of each variable to the other cluster.
        If the re-assignment increases the variance explained, the components of the
        clusters involved are recomputed before the next variable is tested.

        Parameters
        ----------
        df
            Spark dataframe
        clus1
            List of feature columns in the first cluster, e.g., ["col1", "col2"]
        clus2
            List of feature columns in the second cluster, e.g., ["col3", "col4"]
        feat_list
            List of features to re-assign, e.g., ["feat1", "feat2"].
            If feat_list is None, all features inside clus1 and clus2 will be re-assigned.
            If feat_list is specified, it should only contain columns in clus1 and/or
            clus2, and only the specified columns will be re-assigned.
            (Default value = None)

        Returns
        -------
        fin_clus1
            Final cluster 1's list of feature columns, e.g., ["col1", "col2"]
        fin_clus2
            Final cluster 2's list of feature columns, e.g., ["col3", "col4"]
        max_var
            The maximum variance achieved, e.g., 0.9
        """
        if feat_list is None:
            feat_list = clus1 + clus2
        init_var = self._calc_tot_var(df, clus1, clus2)[0]
        fin_clus1, fin_clus2 = clus1[:], clus2[:]
        check_var, max_var = (init_var,) * 2
        while True:
            for feat in feat_list:
                new_clus1, new_clus2 = fin_clus1[:], fin_clus2[:]
                # Tentatively move the feature to the other cluster.
                if feat in new_clus1:
                    new_clus1.remove(feat)
                    new_clus2.append(feat)
                elif feat in new_clus2:
                    new_clus1.append(feat)
                    new_clus2.remove(feat)
                else:
                    continue
                new_var = self._calc_tot_var(df, new_clus1, new_clus2)[0]
                if new_var > check_var:
                    # Keep the move only if it improves the variance explained.
                    check_var = new_var
                    fin_clus1, fin_clus2 = new_clus1[:], new_clus2[:]
            if max_var == check_var:
                # A full pass over feat_list brought no improvement: stop.
                break
            else:
                max_var = check_var
        return fin_clus1, fin_clus2, max_var

    def _reassign_rs(self, df, clus1, clus2, n_rs=0):
        """
        This function should be called after _reassign; it performs n_rs rounds of random
        shuffling of the features and reruns the re-assignment.

        Parameters
        ----------
        df
            Spark dataframe
        clus1
            List of feature columns in the first cluster, e.g., ["col1", "col2"]
        clus2
            List of feature columns in the second cluster, e.g., ["col3", "col4"]
        n_rs
            Number of random shuffles, e.g., 2.
            If n_rs is 0, random shuffling of the features after re-assignment is not
            conducted. If n_rs is > 0, n_rs rounds of random shuffling are conducted to
            perform re-assignment. (Default value = 0)

        Returns
        -------
        fin_rs_clus1
            Final cluster 1's list of feature columns after random shuffling,
            e.g., ["col1", "col2"]
        fin_rs_clus2
            Final cluster 2's list of feature columns after random shuffling,
            e.g., ["col3", "col4"]
        max_rs_var
            The maximum variance achieved after random shuffling, e.g., 0.9
        """
        feat_list = clus1 + clus2
        fin_rs_clus1, fin_rs_clus2, max_rs_var = self._reassign(df, clus1, clus2)
        for _ in range(n_rs):
            # Re-run the search phase over a randomly shuffled feature order and keep
            # the best split found so far.
            random.shuffle(feat_list)
            rs_clus1, rs_clus2, rs_var = self._reassign(df, clus1, clus2, feat_list)
            if rs_var > max_rs_var:
                max_rs_var = rs_var
                fin_rs_clus1, fin_rs_clus2 = rs_clus1, rs_clus2
        return fin_rs_clus1, fin_rs_clus2, max_rs_var

    def _varclusspark(self, spark):
        """
        This is the main function which performs variable clustering. By default, it
        begins with all variables in a single cluster. It then repeats the following steps:

        1. A cluster is chosen for splitting. This cluster has the largest eigenvalue
           associated with the 2nd PC.
        2. Find the first 2 PCs of the chosen cluster, split it into 2 clusters, then
           perform an orthoblique rotation (raw quartimax rotation on the eigenvectors).
        3. Variables are iteratively re-assigned to clusters to maximize the variance
           explained:
           (a) Nearest Component Sorting (NCS) Phase: In each iteration, the cluster
               components are computed and each variable is assigned to the rotated
               component with which it has the highest squared correlation.
           (b) Search Phase: Each variable is tested in the other cluster to see if the
               move increases the variance explained. If a variable is re-assigned here,
               the components of the clusters involved are recomputed before the next
               variable is tested.

        The procedure stops when the 2nd eigenvalue of each cluster is smaller than the
        maxeigval2 parameter.

        Parameters
        ----------
        spark
            Spark Session

        Returns
        -------
        self
        """
        ClusInfo = collections.namedtuple(
            "ClusInfo", ["clus", "eigval1", "eigval2", "eigvecs", "varprop"]
        )
        c_eigvals, c_eigvecs, c_corrs, c_varprops = self.correig(
            self.df.select(self.feat_list)
        )
        self.corrs = c_corrs
        # Start with all variables in a single cluster.
        clus0 = ClusInfo(
            clus=self.feat_list,
            eigval1=c_eigvals[0],
            eigval2=c_eigvals[1],
            eigvecs=c_eigvecs,
            varprop=c_varprops[0],
        )
        self.clusters = collections.OrderedDict([(0, clus0)])
        while True:
            if self.maxclus is not None and len(self.clusters) >= self.maxclus:
                break
            # Choose the cluster with the largest second eigenvalue for splitting.
            idx = max(self.clusters, key=lambda x: self.clusters.get(x).eigval2)
            if self.clusters[idx].eigval2 > self.maxeigval2:
                split_clus = self.clusters[idx].clus
                c_eigvals, c_eigvecs, split_corrs, _ = self.correig(
                    self.df.select(split_clus)
                )
            else:
                break
            if c_eigvals[1] > self.maxeigval2:
                clus1, clus2 = [], []
                # Raw quartimax rotation of the first two eigenvectors.
                rotator = Rotator(method="quartimax")
                r_eigvecs = rotator.fit_transform(pd.DataFrame(c_eigvecs))
                num_rows, num_cols = r_eigvecs.shape
                r_eigvecs_dm = DenseMatrix(
                    num_rows, num_cols, r_eigvecs.ravel(order="F")
                )
                r_eigvecs_rm = RowMatrix(spark.sparkContext.parallelize(r_eigvecs.T))
                dm = DenseMatrix(num_rows, num_rows, split_corrs.values.ravel())
                # Standard deviations of the two rotated components:
                # sqrt(diag(R' * Sigma * R)), computed as a distributed matrix product.
                comb_sigmas = r_eigvecs_rm.multiply(dm).multiply(r_eigvecs_dm)
                comb_sigmas = np.sqrt(
                    np.diag(comb_sigmas.rows.map(lambda row: np.array(row)).collect())
                )
                comb_sigma1 = comb_sigmas[0]
                comb_sigma2 = comb_sigmas[1]
                # NCS phase: assign each variable to the rotated component with which
                # it has the highest squared correlation.
                for feat in split_clus:
                    comb_cov1 = np.dot(r_eigvecs[:, 0], split_corrs[feat].values.T)
                    comb_cov2 = np.dot(r_eigvecs[:, 1], split_corrs[feat].values.T)
                    corr_pc1 = comb_cov1 / comb_sigma1
                    corr_pc2 = comb_cov2 / comb_sigma2
                    if abs(corr_pc1) > abs(corr_pc2):
                        clus1.append(feat)
                    else:
                        clus2.append(feat)
                # Search phase (with optional random shuffles).
                fin_clus1, fin_clus2, _ = self._reassign_rs(
                    self.df, clus1, clus2, self.n_rs
                )
                c1_eigvals, c1_eigvecs, _, c1_varprops = self.correig(
                    self.df.select(fin_clus1)
                )
                c2_eigvals, c2_eigvecs, _, c2_varprops = self.correig(
                    self.df.select(fin_clus2)
                )
                self.clusters[idx] = ClusInfo(
                    clus=fin_clus1,
                    eigval1=c1_eigvals[0],
                    eigval2=c1_eigvals[1],
                    eigvecs=c1_eigvecs,
                    varprop=c1_varprops[0],
                )
                self.clusters[len(self.clusters)] = ClusInfo(
                    clus=fin_clus2,
                    eigval1=c2_eigvals[0],
                    eigval2=c2_eigvals[1],
                    eigvecs=c2_eigvecs,
                    varprop=c2_varprops[0],
                )
            else:
                break
        return self

    def _rsquarespark(self):
        """
        After variable clustering is done, this function calculates the squared correlation
        of each feature with (1) its own cluster, (2) the "nearest cluster", and (3) the RS
        ratio computed from the own cluster and the "nearest cluster".

        RS_Own: Squared correlation between the variable and its own cluster
        RS_NC: The largest squared correlation among correlations between the variable
        and all other clusters
        RS_Ratio: (1-RS_Own)/(1-RS_NC)

        Returns
        -------
        rs_table
            Pandas dataframe [Cluster, Variable, RS_Own, RS_NC, RS_Ratio]
            Cluster: integer-type column ranging from 0 to the maximum number of clusters
            Variable: string-type column. Each row represents a feature name
            RS_Own: float-type column indicating the squared correlation between Variable
            and its Cluster
            RS_NC: float-type column indicating the squared correlation between Variable
            and its "nearest cluster"
            RS_Ratio: float-type column, (1-RS_Own)/(1-RS_NC)
        """
        cols = ["Cluster", "Variable", "RS_Own", "RS_NC", "RS_Ratio"]
        rs_table = pd.DataFrame(columns=cols)
        # Standard deviation of each cluster's first principal component.
        sigmas = []
        for _, clusinfo in self.clusters.items():
            c_eigvec = clusinfo.eigvecs[:, 0]
            c_sigma = math.sqrt(
                np.dot(
                    np.dot(
                        c_eigvec, self.corr_df.loc[clusinfo.clus, clusinfo.clus].values
                    ),
                    c_eigvec.T,
                )
            )
            sigmas.append(c_sigma)
        n_row = 0
        for i, clus_own in self.clusters.items():
            for feat in clus_own.clus:
                row = [i, feat]
                cov_own = np.dot(
                    clus_own.eigvecs[:, 0],
                    self.corr_df.loc[feat, clus_own.clus].values.T,
                )
                if len(clus_own.clus) == 1 and feat == clus_own.clus[0]:
                    rs_own = 1
                else:
                    rs_own = (cov_own / sigmas[i]) ** 2
                rs_others = []
                for j, clus_other in self.clusters.items():
                    if j == i:
                        continue
                    cov_other = np.dot(
                        clus_other.eigvecs[:, 0],
                        self.corr_df.loc[feat, clus_other.clus].values.T,
                    )
                    rs = (cov_other / sigmas[j]) ** 2
                    rs_others.append(rs)
                # Nearest cluster = the other cluster with the highest squared correlation.
                rs_nc = max(rs_others) if len(rs_others) > 0 else 0
                row += [rs_own, rs_nc, (1 - rs_own) / (1 - rs_nc)]
                rs_table.loc[n_row] = row
                n_row += 1
        return rs_table
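The sketch below shows one way this module might be driven end to end. It is a minimal example, not part of the library: the SparkSession, the synthetic data, and the column names (a1, a2, b1, b2) are assumptions, and the private driver methods _varclusspark and _rsquarespark are called directly, mirroring the source listing above.

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

# Assumption: a local SparkSession; the app name is arbitrary.
spark = SparkSession.builder.appName("varclus-demo").getOrCreate()

# Build a toy dataframe with two correlated blocks of columns (hypothetical data).
rng = np.random.default_rng(0)
base1, base2 = rng.normal(size=500), rng.normal(size=500)
pdf = pd.DataFrame({
    "a1": base1, "a2": base1 + 0.1 * rng.normal(size=500),
    "b1": base2, "b2": base2 + 0.1 * rng.normal(size=500),
})
sdf = spark.createDataFrame(pdf)

vc = VarClusHiSpark(sdf, feat_list=["a1", "a2", "b1", "b2"], maxeigval2=1, maxclus=10)
vc._varclusspark(spark)      # run the splitting / re-assignment loop
rs = vc._rsquarespark()      # per-variable RS_Own / RS_NC / RS_Ratio table

print({i: info.clus for i, info in vc.clusters.items()})  # cluster memberships
print(rs)

With two nearly duplicated column pairs, the expected outcome is two clusters, one per correlated block, and an RS table with high RS_Own and low RS_NC for every variable.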
Classes
class VarClusHiSpark (df, feat_list=None, maxeigval2=1, maxclus=None, n_rs=0)
This class is a scalable version of the VarClusHi library that performs variable clustering on PySpark dataframes with the necessary optimizations; sampling of the data is not required.
Variable Clustering groups attributes that are as correlated as possible among themselves within a cluster and as uncorrelated as possible with attributes in other clusters. By default, it begins with all variables in a single cluster and then repeats the following steps (a sketch of the rotation-and-assignment step follows the parameter list below):
1. A cluster is chosen for splitting: the cluster with the largest eigenvalue associated with its 2nd PC.
2. The first 2 PCs of the chosen cluster are found, the cluster is split into 2 clusters, and an orthoblique rotation (raw quartimax rotation on the eigenvectors) is performed.
3. Variables are iteratively re-assigned to clusters to maximize the variance explained:
(a) Nearest Component Sorting (NCS) Phase: In each iteration, the cluster components are computed and each variable is assigned to the rotated component with which it has the highest squared correlation.
(b) Search Phase: Each variable is tested in the other cluster to see if the move increases the variance explained. If a variable is re-assigned here, the components of the clusters involved are recomputed before the next variable is tested.
The procedure stops when the 2nd eigenvalue of each cluster is smaller than the maxeigval2 parameter.
Parameters
df
- PySpark Dataframe
feat_list
- List of features to perform variable clustering on, e.g., ["col1", "col2"]. If feat_list is None, all columns of the input dataframe will be used. If feat_list is specified, only columns inside feat_list will be used. To run the algorithm successfully, df[feat_list] should only contain numerical values. (Default value = None)
maxeigval2
- Maximum value of the second-largest eigenvalue, e.g., 1. A cluster is split only while its second eigenvalue exceeds this threshold. (Default value = 1)
maxclus
- Maximum number of clusters, e.g., 20. If maxclus is None, there is no restriction on the total number of clusters. If maxclus is specified, the algorithm stops splitting when the number of clusters reaches maxclus. (Default value = None)
n_rs
- Number of random shuffles, e.g., 0. This parameter controls the number of random shuffle iterations after re-assignment. If n_rs is 0, re-assignment of each feature is conducted with no extra random shuffling. If n_rs is not 0, extra rounds of random shuffling and re-assignment are conducted. (Default value = 0)
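The rotation-and-assignment step referenced in the class description (steps 2 and 3a) can be illustrated in plain NumPy. This is a minimal sketch, not the class's code path: the correlation matrix and feature names are hypothetical, and factor_analyzer's Rotator is used for the raw quartimax rotation, as in the source.

import numpy as np
from factor_analyzer import Rotator

# Hypothetical correlation matrix of the cluster being split (two correlated blocks).
feats = ["a1", "a2", "b1", "b2"]
corr = np.array([
    [1.0, 0.9, 0.1, 0.1],
    [0.9, 1.0, 0.1, 0.1],
    [0.1, 0.1, 1.0, 0.9],
    [0.1, 0.1, 0.9, 1.0],
])

# Top-2 eigenvectors of the correlation matrix (eigh returns ascending eigenvalues).
eigvals, eigvecs = np.linalg.eigh(corr)
top2 = eigvecs[:, np.argsort(eigvals)[::-1][:2]]

# Raw quartimax rotation of the two eigenvectors (the orthoblique rotation step).
r = Rotator(method="quartimax").fit_transform(top2)

# Standard deviations of the two rotated components: sqrt(diag(R' * corr * R)).
sigmas = np.sqrt(np.diag(r.T @ corr @ r))

# NCS phase: each variable goes to the rotated component with which it has the
# highest squared correlation.
clus1, clus2 = [], []
for k, feat in enumerate(feats):
    corr_pc = (r.T @ corr[:, k]) / sigmas  # correlation of variable k with each component
    (clus1 if abs(corr_pc[0]) > abs(corr_pc[1]) else clus2).append(feat)

print(clus1, clus2)  # the two blocks should land in different clusters (order may swap)

In the class itself the same products are computed as distributed matrix multiplications on a RowMatrix, which is what allows the split to run without sampling.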
Methods
def correig(self, df, feat_list=None, n_pcs=2)
This function finds the correlation matrix between the feat_list columns, calculates the top n_pcs eigenvalues and eigenvectors, and gets the variance proportions.
Parameters
df
- Spark dataframe
feat_list
- List of feature columns, e.g., ["col1", "col2"]. If feat_list is None, all columns of the input dataframe will be used. If feat_list is specified, only columns inside feat_list will be used. To run the algorithm successfully, df[feat_list] should only contain numerical values. (Default value = None)
n_pcs
- Number of PCs, e.g., 2. This parameter controls the number of principal components extracted, e.g., if n_pcs=2, only the first 2 eigenvalues and eigenvectors of the correlation matrix are extracted. (Default value = 2)
Returns
- The top n_pcs eigenvalues, the associated eigenvectors, the correlation matrix as a Pandas dataframe, and the variance proportions (a NumPy sketch of this eigendecomposition follows the return values below)
eigvals
- Top n_pcs eigenvalues in a Numpy array e.g. [eigval1, eigval2]
eigvecs
- The associated eigenvectors of the top n_pcs eigenvalues in a Numpy array, e.g., [eigvec1, eigvec2]. Each eigenvector is an array of size D, where D is the number of columns of df[feat_list]
corr_df
- Correlation matrix stored in a Pandas dataframe. The size of this output is DxD, where D is the number of columns of df[feat_list]. The value at index i and column j is the Pearson correlation between feat_list[i] and feat_list[j]
varprops
- The variance proportions of the top n_pcs eigenvalues in a Numpy array, e.g., [varprop1, varprop2]. The order of this array is the same as eigvals and eigvecs, with the first value varprop1 indicating the variance proportion of the largest eigenvalue.
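A minimal NumPy sketch of the eigendecomposition behind these return values, using a small hypothetical correlation matrix (the method itself reads the cached self.corr_df rather than recomputing correlations):

import numpy as np
import pandas as pd

# Hypothetical 3x3 correlation matrix for columns ["x", "y", "z"].
feat_list = ["x", "y", "z"]
corr = np.array([
    [1.0, 0.8, 0.2],
    [0.8, 1.0, 0.3],
    [0.2, 0.3, 1.0],
])

n_pcs = 2
raw_eigvals, raw_eigvecs = np.linalg.eigh(corr)   # ascending eigenvalues
order = np.argsort(raw_eigvals)[::-1]             # descending order of eigenvalues
eigvals = raw_eigvals[order][:n_pcs]              # top n_pcs eigenvalues
eigvecs = raw_eigvecs[:, order][:, :n_pcs]        # associated eigenvectors (D x n_pcs)
varprops = eigvals / raw_eigvals.sum()            # variance proportions
corr_df = pd.DataFrame(corr, index=feat_list, columns=feat_list)

print(eigvals)    # [eigval1, eigval2]
print(varprops)   # [varprop1, varprop2], varprop1 belongs to the largest eigenvalue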