# association_evaluator

This submodule focuses on understanding the interaction between different attributes and/or the relationship between an attribute & the binary target variable.

Association between attributes is measured by: - correlation_matrix - variable_clustering

Association between an attribute and binary target is measured by: - IV_calculation - IG_calculation

Expand source code
# coding=utf-8
"""
This submodule focuses on understanding the interaction between different attributes and/or the relationship
between an attribute & the binary target variable.

Association between attributes is measured by:
- correlation_matrix
- variable_clustering

Association between an attribute and binary target is measured by:
- IV_calculation
- IG_calculation

"""
import math
import warnings

import pandas as pd

import pyspark
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.sql import Window
from pyspark.sql import functions as F

from anovos.data_analyzer.stats_generator import uniqueCount_computation
from anovos.data_analyzer.association_eval_varclus import VarClusHiSpark
from anovos.data_ingest.data_sampling import data_sample
from anovos.data_transformer.transformers import (
attribute_binning,
cat_to_num_unsupervised,
imputation_MMM,
monotonic_binning,
)
from anovos.shared.utils import attributeType_segregation

def correlation_matrix(
spark,
idf,
list_of_cols="all",
drop_cols=[],
use_sampling=False,
sample_size=1000000,
print_impact=False,
):
"""
This function calculates correlation coefficient statistical, which measures the strength of the relationship
between the relative movements of two attributes. Pearson’s correlation coefficient is a standard approach of
measuring correlation between two variables.
This function supports numerical columns only. If Dataframe contains categorical columns also then those columns
must be first converted to numerical columns. Anovos has multiple functions to help convert categorical columns
into numerical columns. Functions cat_to_num_supervised and cat_to_num_unsupervised can be used for this. Some data
cleaning treatment can also be done on categorical columns before converting them to numerical columns.
Few functions to help in columns treatment are outlier_categories, measure_of_cardinality, IDness_detection etc.
This correlation_matrix function returns a correlation matrix dataframe of schema –
attribute, <attribute_names>. Correlation between attribute X and Y can be found at intersection of a) row with
value X in ‘attribute’ column and b) column‘Y’ (or row with value Y in ‘attribute’ column and column ‘X’).
Parameters
----------
spark
Spark Session
idf
Input Dataframe
list_of_cols
List of numerical columns to analyse e.g., ["col1","col2"].
Alternatively, columns can be specified in a string format,
where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
"all" can be passed to include numerical columns for analysis. This is super useful instead of specifying all column names manually.
Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument
is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"].
Alternatively, columns can be specified in a string format,
where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
a few handful of them. (Default value = [])
use_sampling
True, False
This argument is to tell function whether to compute correlation matrix on full dataframe or only on small sample
of dataframe, sample size is decided by another argument called sample_size.(Default value = False)
sample_size
int
If use_sampling is True then sample size is decided by this argument.(Default value = 1000000)
print_impact
True, False
This argument is to print out the statistics.(Default value = False)
Returns
-------
DataFrame
[attribute,*attribute_names]
"""
num_cols, cat_cols, other_cols = attributeType_segregation(idf)

if list_of_cols == "all":
list_of_cols = num_cols
if isinstance(list_of_cols, str):
list_of_cols = [x.strip() for x in list_of_cols.split("|")]
if isinstance(drop_cols, str):
drop_cols = [x.strip() for x in drop_cols.split("|")]

list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

if any(x not in num_cols for x in list_of_cols) | (len(list_of_cols) == 0):
raise TypeError("Invalid input for Column(s)")

if use_sampling:
if idf.count() > sample_size:
warnings.warn(
"Using sampling. Only "
+ str(sample_size)
+ " random sampled rows are considered."
)
idf = data_sample(
idf, fraction=float(sample_size) / idf.count(), method_type="random"
)

assembler = VectorAssembler(
inputCols=list_of_cols, outputCol="features", handleInvalid="skip"
)
idf_vector = assembler.transform(idf).select("features")
matrix = Correlation.corr(idf_vector, "features", "pearson")
result = matrix.collect()[0]["pearson(features)"].values

odf_pd = pd.DataFrame(
result.reshape(-1, len(list_of_cols)), columns=list_of_cols, index=list_of_cols
)
odf_pd["attribute"] = odf_pd.index
list_of_cols.sort()
odf = (
spark.createDataFrame(odf_pd)
.select(["attribute"] + list_of_cols)
.orderBy("attribute")
)

if print_impact:
odf.show(odf.count())

return odf

def variable_clustering(
spark,
idf,
list_of_cols="all",
drop_cols=[],
stats_mode={},
persist=True,
print_impact=False,
):
"""
This function performs Variable Clustering with necessary pre-processing techniques, including low-cardinality
columns removal, categorical-to-numerical transformation and null values imputation. It works as a wrapper of VarClusHiSpark
class which groups correlated attributes within a cluster and assign uncorrelated attributes into other clusters.
For more details on the algorithm, please check anovos.data_analyzer.association_eval_varclus

It returns a Spark Dataframe with schema – Cluster, Attribute, RS_Ratio. Attributes similar to each other are grouped
together with the same cluster id. The attribute with the lowest (1 — RS_Ratio) can be chosen as a representative of the cluster
while discarding the other attributes from that cluster. This can also help in achieving the dimension reduction, if required.

Parameters
----------
spark
Spark Session
idf
Input Dataframe
list_of_cols
List of columns to analyse e.g., ["col1","col2"].
Alternatively, columns can be specified in a string format,
where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
"all" can be passed to include all columns for analysis. This is super useful instead of specifying all column names manually.
Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument
is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"].
Alternatively, columns can be specified in a string format,
where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
a few handful of them. (Default value = [])
stats_mode
Takes arguments for read_dataset (data_ingest module) function in a dictionary format
to read pre-saved statistics on most frequently seen values i.e. if measures_of_centralTendency or
mode_computation (data_analyzer.stats_generator module) has been computed & saved before.
This is used for MMM imputation as Variable Clustering doesn’t work with missing values. (Default value = {})
persist
Boolean argument - True or False. This argument is used to determine whether to persist on pre-processing (low-cardinality
columns removal, categorical-to-numerical transformation and null values imputation) results of input dataset.
persist=True will enable the use of persist, otherwise False.
It is recommended to set this as True for large datasets. (Default value = True)
print_impact
True, False
This argument is to print out the statistics.(Default value = False)

Returns
-------
DataFrame
[Cluster, Attribute, RS_Ratio]

"""

if list_of_cols == "all":
num_cols, cat_cols, other_cols = attributeType_segregation(idf)
list_of_cols = num_cols + cat_cols
if isinstance(list_of_cols, str):
list_of_cols = [x.strip() for x in list_of_cols.split("|")]
if isinstance(drop_cols, str):
drop_cols = [x.strip() for x in drop_cols.split("|")]

list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

if any(x not in idf.columns for x in list_of_cols) | (len(list_of_cols) == 0):
raise TypeError("Invalid input for Column(s)")
if persist:
idf.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count()
remove_cols = (
uniqueCount_computation(spark, idf, list_of_cols)
.where(F.col("unique_values") < 2)
.select("attribute")
.rdd.flatMap(lambda x: x)
.collect()
)
list_of_cols = [e for e in list_of_cols if e not in remove_cols]
idf = idf.select(list_of_cols)
cat_cols = attributeType_segregation(idf)[1]

for i in idf.dtypes:
if i[1].startswith("decimal"):
idf = idf.withColumn(i[0], F.col(i[0]).cast("double"))
idf_encoded = cat_to_num_unsupervised(
spark, idf, list_of_cols=cat_cols, method_type="label_encoding"
)
num_cols = attributeType_segregation(idf_encoded)[0]
idf_encoded = idf_encoded.select(num_cols)
idf_imputed = imputation_MMM(spark, idf_encoded, stats_mode=stats_mode)
if persist:
idf_imputed.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count()
idf.unpersist()
vc = VarClusHiSpark(idf_imputed, maxeigval2=1, maxclus=None)
vc._varclusspark(spark)
odf_pd = vc._rsquarespark()
odf = spark.createDataFrame(odf_pd).select(
"Cluster",
F.col("Variable").alias("Attribute"),
F.round(F.col("RS_Ratio"), 4).alias("RS_Ratio"),
)
if print_impact:
odf.show(odf.count())
if persist:
idf_imputed.unpersist()
return odf

def IV_calculation(
spark,
idf,
list_of_cols="all",
drop_cols=[],
label_col="label",
event_label=1,
encoding_configs={
"bin_method": "equal_frequency",
"bin_size": 10,
"monotonicity_check": 0,
},
print_impact=False,
):
"""
Information Value (IV) is simple and powerful technique to conduct attribute relevance analysis. It measures
how well an attribute is able to distinguish between a binary target variable i.e. label 0 from label 1,
and hence helps in ranking attributes on the basis of their importance. In the heart of IV methodology are groups
(bins) of observations. For categorical attributes, usually each category is a bin while numerical attributes
need to be split into categories.

IV = ∑ (% of non-events - % of events) * WOE
<br>where:
<br>WOE = In(% of non-events ➗ % of events)
<br>% of event = % label 1 in a bin
<br>% of non-event = % label 0 in a bin

General rule of thumb while creating the bins are that a) each bin should have at least 5% of the observations,
b) the WOE should be monotonic, i.e. either growing or decreasing with the bins, and c) missing values should be
binned separately. An article  from listendata.com can be referred for good understanding of IV & WOE concepts.

Parameters
----------
spark
Spark Session
idf
Input Dataframe
list_of_cols
List of columns to analyse e.g., ["col1","col2"].
Alternatively, columns can be specified in a string format,
where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
"all" can be passed to include all columns for analysis. This is super useful instead of specifying all column names manually.
Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument
is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"].
Alternatively, columns can be specified in a string format,
where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
a few handful of them. (Default value = [])
label_col
Label/Target column (Default value = "label")
event_label
Value of (positive) event (i.e label 1) (Default value = 1)
encoding_configs
Takes input in dictionary format. {} i.e. empty dict means no encoding is required.
In case numerical columns are present and encoding is required, following keys shall be
provided - "bin_size" (Default value = 10) i.e. no. of bins for converting the numerical columns to categorical,
"bin_method" i.e. method of binning - "equal_frequency" or "equal_range" (Default value = "equal_frequency") and
"monotonicity_check" 1 for monotonic binning else 0. monotonicity_check of 1 will
dynamically calculate the bin_size ensuring monotonic nature but can be expensive operation (Default value = 0).
print_impact
True, False
This argument is to print out the statistics.(Default value = False)

Returns
-------
DataFrame
[attribute, iv]

"""

if label_col not in idf.columns:
raise TypeError("Invalid input for Label Column")

if list_of_cols == "all":
num_cols, cat_cols, other_cols = attributeType_segregation(idf)
list_of_cols = num_cols + cat_cols

if isinstance(list_of_cols, str):
list_of_cols = [x.strip() for x in list_of_cols.split("|")]

if isinstance(drop_cols, str):
drop_cols = [x.strip() for x in drop_cols.split("|")]

list_of_cols = list(
set([e for e in list_of_cols if e not in (drop_cols + [label_col])])
)

if any(x not in idf.columns for x in list_of_cols) | (len(list_of_cols) == 0):
raise TypeError("Invalid input for Column(s)")

if idf.where(F.col(label_col) == event_label).count() == 0:
raise TypeError("Invalid input for Event Label Value")

num_cols, cat_cols, other_cols = attributeType_segregation(idf.select(list_of_cols))

if (len(num_cols) > 0) & bool(encoding_configs):
bin_size = encoding_configs["bin_size"]
bin_method = encoding_configs["bin_method"]
monotonicity_check = encoding_configs["monotonicity_check"]
if monotonicity_check == 1:
idf_encoded = monotonic_binning(
spark, idf, num_cols, [], label_col, event_label, bin_method, bin_size
)
else:
idf_encoded = attribute_binning(
spark, idf, num_cols, label_col, bin_method, bin_size
)
else:
idf_encoded = idf

list_df = []
idf_encoded = idf_encoded.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
for col in list_of_cols:
df_agg = (
idf_encoded.select(col, label_col)
.groupby(col)
.agg(
F.count(
F.when(F.col(label_col) != event_label, F.col(label_col))
).alias("label_0"),
F.count(
F.when(F.col(label_col) == event_label, F.col(label_col))
).alias("label_1"),
)
.withColumn(
"label_0_total", F.sum(F.col("label_0")).over(Window.partitionBy())
)
.withColumn(
"label_1_total", F.sum(F.col("label_1")).over(Window.partitionBy())
)
)

out_df = (
df_agg.withColumn("event_pcr", F.col("label_1") / F.col("label_1_total"))
.withColumn("nonevent_pcr", F.col("label_0") / F.col("label_0_total"))
.withColumn("diff_event", F.col("nonevent_pcr") - F.col("event_pcr"))
.withColumn("const", F.lit(0.5))
.withColumn(
"woe",
F.when(
(F.col("nonevent_pcr") != 0) & (F.col("event_pcr") != 0),
F.log(F.col("nonevent_pcr") / F.col("event_pcr")),
).otherwise(
F.log(
((F.col("label_0") + F.col("const")) / F.col("label_0_total"))
/ ((F.col("label_1") + F.col("const")) / F.col("label_1_total"))
)
),
)
.withColumn("iv_single", F.col("woe") * F.col("diff_event"))
.withColumn("iv", F.sum(F.col("iv_single")).over(Window.partitionBy()))
.withColumn("attribute", F.lit(str(col)))
.select("attribute", "iv")
.distinct()
)

list_df.append(out_df)

def unionAll(dfs):
first, *_ = dfs
return first.sql_ctx.createDataFrame(
first.sql_ctx._sc.union([df.rdd for df in dfs]), first.schema
)

odf = unionAll(list_df)
if print_impact:
odf.show(odf.count())
idf_encoded.unpersist()

return odf

def IG_calculation(
spark,
idf,
list_of_cols="all",
drop_cols=[],
label_col="label",
event_label=1,
encoding_configs={
"bin_method": "equal_frequency",
"bin_size": 10,
"monotonicity_check": 0,
},
print_impact=False,
):
"""
Information Gain (IG) is another powerful technique for feature selection analysis. Information gain is
calculated by comparing the entropy of the dataset before and after a transformation (introduction of attribute
in this particular case). Similar to IV calculation, each category is a bin for categorical attributes,
while numerical attributes need to be split into categories.

IG = Total Entropy – Entropy

Total Entropy= -%event*log⁡(%event)-(1-%event)*log⁡(1-%event)

Entropy = ∑(-%〖event〗_i*log⁡(%〖event〗_i )-(1-%〖event〗_i )*log⁡(1-%〖event〗_i)

Parameters
----------
spark
Spark Session
idf
Input Dataframe
list_of_cols
List of columns to analyse e.g., ["col1","col2"].
Alternatively, columns can be specified in a string format,
where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
"all" can be passed to include all columns for analysis. This is super useful instead of specifying all column names manually.
Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument
is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"].
Alternatively, columns can be specified in a string format,
where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
a few handful of them. (Default value = [])
label_col
Label/Target column (Default value = "label")
event_label
Value of (positive) event (i.e label 1) (Default value = 1)
encoding_configs
Takes input in dictionary format. {} i.e. empty dict means no encoding is required.
In case numerical columns are present and encoding is required, following keys shall be
provided - "bin_size" (Default value = 10) i.e. no. of bins for converting the numerical columns to categorical,
"bin_method" i.e. method of binning - "equal_frequency" or "equal_range" (Default value = "equal_frequency") and
"monotonicity_check" 1 for monotonic binning else 0. monotonicity_check of 1 will
dynamically calculate the bin_size ensuring monotonic nature but can be expensive operation (Default value = 0).
print_impact
True, False
This argument is to print out the statistics.(Default value = False)

Returns
-------
DataFrame
[attribute, id]

"""

if label_col not in idf.columns:
raise TypeError("Invalid input for Label Column")

if list_of_cols == "all":
num_cols, cat_cols, other_cols = attributeType_segregation(idf)
list_of_cols = num_cols + cat_cols
if isinstance(list_of_cols, str):
list_of_cols = [x.strip() for x in list_of_cols.split("|")]
if isinstance(drop_cols, str):
drop_cols = [x.strip() for x in drop_cols.split("|")]

list_of_cols = list(
set([e for e in list_of_cols if e not in (drop_cols + [label_col])])
)

if any(x not in idf.columns for x in list_of_cols) | (len(list_of_cols) == 0):
raise TypeError("Invalid input for Column(s)")
if idf.where(F.col(label_col) == event_label).count() == 0:
raise TypeError("Invalid input for Event Label Value")

num_cols, cat_cols, other_cols = attributeType_segregation(idf.select(list_of_cols))

if (len(num_cols) > 0) & bool(encoding_configs):
bin_size = encoding_configs["bin_size"]
bin_method = encoding_configs["bin_method"]
monotonicity_check = encoding_configs["monotonicity_check"]
if monotonicity_check == 1:
idf_encoded = monotonic_binning(
spark, idf, num_cols, [], label_col, event_label, bin_method, bin_size
)
else:
idf_encoded = attribute_binning(
spark, idf, num_cols, label_col, bin_method, bin_size
)
else:
idf_encoded = idf

output = []
total_event = idf.where(F.col(label_col) == event_label).count() / idf.count()
total_entropy = -(
total_event * math.log2(total_event)
+ ((1 - total_event) * math.log2((1 - total_event)))
)
idf_encoded = idf_encoded.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
for col in list_of_cols:
idf_entropy = (
(
idf_encoded.withColumn(
label_col, F.when(F.col(label_col) == event_label, 1).otherwise(0)
)
.groupBy(col)
.agg(
F.sum(F.col(label_col)).alias("event_count"),
F.count(F.col(label_col)).alias("total_count"),
)
.withColumn("event_pct", F.col("event_count") / F.col("total_count"))
.withColumn(
"segment_pct",
F.col("total_count")
/ F.sum("total_count").over(Window.partitionBy()),
)
.withColumn(
"entropy",
-F.col("segment_pct")
* (
(F.col("event_pct") * F.log2(F.col("event_pct")))
+ ((1 - F.col("event_pct")) * F.log2((1 - F.col("event_pct"))))
),
)
)
.groupBy()
.agg(F.sum(F.col("entropy")).alias("entropy_sum"))
.withColumn("attribute", F.lit(str(col)))
.withColumn("entropy_total", F.lit(float(total_entropy)))
.withColumn("ig", F.col("entropy_total") - F.col("entropy_sum"))
.select("attribute", "ig")
)
output.append(idf_entropy)

def unionAll(dfs):
first, *_ = dfs
return first.sql_ctx.createDataFrame(
first.sql_ctx._sc.union([df.rdd for df in dfs]), first.schema
)

odf = unionAll(output)
if print_impact:
odf.show(odf.count())
idf_encoded.unpersist()

return odf



## Functions

 def IG_calculation(spark, idf, list_of_cols='all', drop_cols=[], label_col='label', event_label=1, encoding_configs={'bin_method': 'equal_frequency', 'bin_size': 10, 'monotonicity_check': 0}, print_impact=False) 

Information Gain (IG) is another powerful technique for feature selection analysis. Information gain is calculated by comparing the entropy of the dataset before and after a transformation (introduction of attribute in this particular case). Similar to IV calculation, each category is a bin for categorical attributes, while numerical attributes need to be split into categories.

IG = Total Entropy – Entropy

Total Entropy= -%eventlog⁡(%event)-(1-%event)log⁡(1-%event)

Entropy = ∑(-%〖event〗_ilog⁡(%〖event〗_i )-(1-%〖event〗_i )log⁡(1-%〖event〗_i)

## Parameters

spark
Spark Session
idf
Input Dataframe
list_of_cols
List of columns to analyse e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
label_col
Label/Target column (Default value = "label")
event_label
Value of (positive) event (i.e label 1) (Default value = 1)
encoding_configs
Takes input in dictionary format. {} i.e. empty dict means no encoding is required. In case numerical columns are present and encoding is required, following keys shall be provided - "bin_size" (Default value = 10) i.e. no. of bins for converting the numerical columns to categorical, "bin_method" i.e. method of binning - "equal_frequency" or "equal_range" (Default value = "equal_frequency") and "monotonicity_check" 1 for monotonic binning else 0. monotonicity_check of 1 will dynamically calculate the bin_size ensuring monotonic nature but can be expensive operation (Default value = 0).
print_impact
True, False This argument is to print out the statistics.(Default value = False)

## Returns

DataFrame
[attribute, id]
Expand source code
def IG_calculation(
spark,
idf,
list_of_cols="all",
drop_cols=[],
label_col="label",
event_label=1,
encoding_configs={
"bin_method": "equal_frequency",
"bin_size": 10,
"monotonicity_check": 0,
},
print_impact=False,
):
"""
Information Gain (IG) is another powerful technique for feature selection analysis. Information gain is
calculated by comparing the entropy of the dataset before and after a transformation (introduction of attribute
in this particular case). Similar to IV calculation, each category is a bin for categorical attributes,
while numerical attributes need to be split into categories.

IG = Total Entropy – Entropy

Total Entropy= -%event*log⁡(%event)-(1-%event)*log⁡(1-%event)

Entropy = ∑(-%〖event〗_i*log⁡(%〖event〗_i )-(1-%〖event〗_i )*log⁡(1-%〖event〗_i)

Parameters
----------
spark
Spark Session
idf
Input Dataframe
list_of_cols
List of columns to analyse e.g., ["col1","col2"].
Alternatively, columns can be specified in a string format,
where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
"all" can be passed to include all columns for analysis. This is super useful instead of specifying all column names manually.
Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument
is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"].
Alternatively, columns can be specified in a string format,
where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
a few handful of them. (Default value = [])
label_col
Label/Target column (Default value = "label")
event_label
Value of (positive) event (i.e label 1) (Default value = 1)
encoding_configs
Takes input in dictionary format. {} i.e. empty dict means no encoding is required.
In case numerical columns are present and encoding is required, following keys shall be
provided - "bin_size" (Default value = 10) i.e. no. of bins for converting the numerical columns to categorical,
"bin_method" i.e. method of binning - "equal_frequency" or "equal_range" (Default value = "equal_frequency") and
"monotonicity_check" 1 for monotonic binning else 0. monotonicity_check of 1 will
dynamically calculate the bin_size ensuring monotonic nature but can be expensive operation (Default value = 0).
print_impact
True, False
This argument is to print out the statistics.(Default value = False)

Returns
-------
DataFrame
[attribute, id]

"""

if label_col not in idf.columns:
raise TypeError("Invalid input for Label Column")

if list_of_cols == "all":
num_cols, cat_cols, other_cols = attributeType_segregation(idf)
list_of_cols = num_cols + cat_cols
if isinstance(list_of_cols, str):
list_of_cols = [x.strip() for x in list_of_cols.split("|")]
if isinstance(drop_cols, str):
drop_cols = [x.strip() for x in drop_cols.split("|")]

list_of_cols = list(
set([e for e in list_of_cols if e not in (drop_cols + [label_col])])
)

if any(x not in idf.columns for x in list_of_cols) | (len(list_of_cols) == 0):
raise TypeError("Invalid input for Column(s)")
if idf.where(F.col(label_col) == event_label).count() == 0:
raise TypeError("Invalid input for Event Label Value")

num_cols, cat_cols, other_cols = attributeType_segregation(idf.select(list_of_cols))

if (len(num_cols) > 0) & bool(encoding_configs):
bin_size = encoding_configs["bin_size"]
bin_method = encoding_configs["bin_method"]
monotonicity_check = encoding_configs["monotonicity_check"]
if monotonicity_check == 1:
idf_encoded = monotonic_binning(
spark, idf, num_cols, [], label_col, event_label, bin_method, bin_size
)
else:
idf_encoded = attribute_binning(
spark, idf, num_cols, label_col, bin_method, bin_size
)
else:
idf_encoded = idf

output = []
total_event = idf.where(F.col(label_col) == event_label).count() / idf.count()
total_entropy = -(
total_event * math.log2(total_event)
+ ((1 - total_event) * math.log2((1 - total_event)))
)
idf_encoded = idf_encoded.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
for col in list_of_cols:
idf_entropy = (
(
idf_encoded.withColumn(
label_col, F.when(F.col(label_col) == event_label, 1).otherwise(0)
)
.groupBy(col)
.agg(
F.sum(F.col(label_col)).alias("event_count"),
F.count(F.col(label_col)).alias("total_count"),
)
.withColumn("event_pct", F.col("event_count") / F.col("total_count"))
.withColumn(
"segment_pct",
F.col("total_count")
/ F.sum("total_count").over(Window.partitionBy()),
)
.withColumn(
"entropy",
-F.col("segment_pct")
* (
(F.col("event_pct") * F.log2(F.col("event_pct")))
+ ((1 - F.col("event_pct")) * F.log2((1 - F.col("event_pct"))))
),
)
)
.groupBy()
.agg(F.sum(F.col("entropy")).alias("entropy_sum"))
.withColumn("attribute", F.lit(str(col)))
.withColumn("entropy_total", F.lit(float(total_entropy)))
.withColumn("ig", F.col("entropy_total") - F.col("entropy_sum"))
.select("attribute", "ig")
)
output.append(idf_entropy)

def unionAll(dfs):
first, *_ = dfs
return first.sql_ctx.createDataFrame(
first.sql_ctx._sc.union([df.rdd for df in dfs]), first.schema
)

odf = unionAll(output)
if print_impact:
odf.show(odf.count())
idf_encoded.unpersist()

return odf


 def IV_calculation(spark, idf, list_of_cols='all', drop_cols=[], label_col='label', event_label=1, encoding_configs={'bin_method': 'equal_frequency', 'bin_size': 10, 'monotonicity_check': 0}, print_impact=False) 

Information Value (IV) is simple and powerful technique to conduct attribute relevance analysis. It measures how well an attribute is able to distinguish between a binary target variable i.e. label 0 from label 1, and hence helps in ranking attributes on the basis of their importance. In the heart of IV methodology are groups (bins) of observations. For categorical attributes, usually each category is a bin while numerical attributes need to be split into categories.

IV = ∑ (% of non-events - % of events) * WOE
where:
WOE = In(% of non-events ➗ % of events)
% of event = % label 1 in a bin
% of non-event = % label 0 in a bin

General rule of thumb while creating the bins are that a) each bin should have at least 5% of the observations, b) the WOE should be monotonic, i.e. either growing or decreasing with the bins, and c) missing values should be binned separately. An article from listendata.com can be referred for good understanding of IV & WOE concepts.

## Parameters

spark
Spark Session
idf
Input Dataframe
list_of_cols
List of columns to analyse e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
label_col
Label/Target column (Default value = "label")
event_label
Value of (positive) event (i.e label 1) (Default value = 1)
encoding_configs
Takes input in dictionary format. {} i.e. empty dict means no encoding is required. In case numerical columns are present and encoding is required, following keys shall be provided - "bin_size" (Default value = 10) i.e. no. of bins for converting the numerical columns to categorical, "bin_method" i.e. method of binning - "equal_frequency" or "equal_range" (Default value = "equal_frequency") and "monotonicity_check" 1 for monotonic binning else 0. monotonicity_check of 1 will dynamically calculate the bin_size ensuring monotonic nature but can be expensive operation (Default value = 0).
print_impact
True, False This argument is to print out the statistics.(Default value = False)

## Returns

DataFrame
[attribute, iv]
Expand source code
def IV_calculation(
spark,
idf,
list_of_cols="all",
drop_cols=[],
label_col="label",
event_label=1,
encoding_configs={
"bin_method": "equal_frequency",
"bin_size": 10,
"monotonicity_check": 0,
},
print_impact=False,
):
"""
Information Value (IV) is simple and powerful technique to conduct attribute relevance analysis. It measures
how well an attribute is able to distinguish between a binary target variable i.e. label 0 from label 1,
and hence helps in ranking attributes on the basis of their importance. In the heart of IV methodology are groups
(bins) of observations. For categorical attributes, usually each category is a bin while numerical attributes
need to be split into categories.

IV = ∑ (% of non-events - % of events) * WOE
<br>where:
<br>WOE = In(% of non-events ➗ % of events)
<br>% of event = % label 1 in a bin
<br>% of non-event = % label 0 in a bin

General rule of thumb while creating the bins are that a) each bin should have at least 5% of the observations,
b) the WOE should be monotonic, i.e. either growing or decreasing with the bins, and c) missing values should be
binned separately. An article  from listendata.com can be referred for good understanding of IV & WOE concepts.

Parameters
----------
spark
Spark Session
idf
Input Dataframe
list_of_cols
List of columns to analyse e.g., ["col1","col2"].
Alternatively, columns can be specified in a string format,
where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
"all" can be passed to include all columns for analysis. This is super useful instead of specifying all column names manually.
Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument
is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"].
Alternatively, columns can be specified in a string format,
where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
a few handful of them. (Default value = [])
label_col
Label/Target column (Default value = "label")
event_label
Value of (positive) event (i.e label 1) (Default value = 1)
encoding_configs
Takes input in dictionary format. {} i.e. empty dict means no encoding is required.
In case numerical columns are present and encoding is required, following keys shall be
provided - "bin_size" (Default value = 10) i.e. no. of bins for converting the numerical columns to categorical,
"bin_method" i.e. method of binning - "equal_frequency" or "equal_range" (Default value = "equal_frequency") and
"monotonicity_check" 1 for monotonic binning else 0. monotonicity_check of 1 will
dynamically calculate the bin_size ensuring monotonic nature but can be expensive operation (Default value = 0).
print_impact
True, False
This argument is to print out the statistics.(Default value = False)

Returns
-------
DataFrame
[attribute, iv]

"""

if label_col not in idf.columns:
raise TypeError("Invalid input for Label Column")

if list_of_cols == "all":
num_cols, cat_cols, other_cols = attributeType_segregation(idf)
list_of_cols = num_cols + cat_cols

if isinstance(list_of_cols, str):
list_of_cols = [x.strip() for x in list_of_cols.split("|")]

if isinstance(drop_cols, str):
drop_cols = [x.strip() for x in drop_cols.split("|")]

list_of_cols = list(
set([e for e in list_of_cols if e not in (drop_cols + [label_col])])
)

if any(x not in idf.columns for x in list_of_cols) | (len(list_of_cols) == 0):
raise TypeError("Invalid input for Column(s)")

if idf.where(F.col(label_col) == event_label).count() == 0:
raise TypeError("Invalid input for Event Label Value")

num_cols, cat_cols, other_cols = attributeType_segregation(idf.select(list_of_cols))

if (len(num_cols) > 0) & bool(encoding_configs):
bin_size = encoding_configs["bin_size"]
bin_method = encoding_configs["bin_method"]
monotonicity_check = encoding_configs["monotonicity_check"]
if monotonicity_check == 1:
idf_encoded = monotonic_binning(
spark, idf, num_cols, [], label_col, event_label, bin_method, bin_size
)
else:
idf_encoded = attribute_binning(
spark, idf, num_cols, label_col, bin_method, bin_size
)
else:
idf_encoded = idf

list_df = []
idf_encoded = idf_encoded.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
for col in list_of_cols:
df_agg = (
idf_encoded.select(col, label_col)
.groupby(col)
.agg(
F.count(
F.when(F.col(label_col) != event_label, F.col(label_col))
).alias("label_0"),
F.count(
F.when(F.col(label_col) == event_label, F.col(label_col))
).alias("label_1"),
)
.withColumn(
"label_0_total", F.sum(F.col("label_0")).over(Window.partitionBy())
)
.withColumn(
"label_1_total", F.sum(F.col("label_1")).over(Window.partitionBy())
)
)

out_df = (
df_agg.withColumn("event_pcr", F.col("label_1") / F.col("label_1_total"))
.withColumn("nonevent_pcr", F.col("label_0") / F.col("label_0_total"))
.withColumn("diff_event", F.col("nonevent_pcr") - F.col("event_pcr"))
.withColumn("const", F.lit(0.5))
.withColumn(
"woe",
F.when(
(F.col("nonevent_pcr") != 0) & (F.col("event_pcr") != 0),
F.log(F.col("nonevent_pcr") / F.col("event_pcr")),
).otherwise(
F.log(
((F.col("label_0") + F.col("const")) / F.col("label_0_total"))
/ ((F.col("label_1") + F.col("const")) / F.col("label_1_total"))
)
),
)
.withColumn("iv_single", F.col("woe") * F.col("diff_event"))
.withColumn("iv", F.sum(F.col("iv_single")).over(Window.partitionBy()))
.withColumn("attribute", F.lit(str(col)))
.select("attribute", "iv")
.distinct()
)

list_df.append(out_df)

def unionAll(dfs):
first, *_ = dfs
return first.sql_ctx.createDataFrame(
first.sql_ctx._sc.union([df.rdd for df in dfs]), first.schema
)

odf = unionAll(list_df)
if print_impact:
odf.show(odf.count())
idf_encoded.unpersist()

return odf


 def correlation_matrix(spark, idf, list_of_cols='all', drop_cols=[], use_sampling=False, sample_size=1000000, print_impact=False) 

This function calculates correlation coefficient statistical, which measures the strength of the relationship between the relative movements of two attributes. Pearson’s correlation coefficient is a standard approach of measuring correlation between two variables. This function supports numerical columns only. If Dataframe contains categorical columns also then those columns must be first converted to numerical columns. Anovos has multiple functions to help convert categorical columns into numerical columns. Functions cat_to_num_supervised and cat_to_num_unsupervised can be used for this. Some data cleaning treatment can also be done on categorical columns before converting them to numerical columns. Few functions to help in columns treatment are outlier_categories, measure_of_cardinality, IDness_detection etc. This correlation_matrix function returns a correlation matrix dataframe of schema – attribute, . Correlation between attribute X and Y can be found at intersection of a) row with value X in ‘attribute’ column and b) column‘Y’ (or row with value Y in ‘attribute’ column and column ‘X’). Parameters

spark
Spark Session
idf
Input Dataframe
list_of_cols
List of numerical columns to analyse e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
use_sampling
True, False This argument is to tell function whether to compute correlation matrix on full dataframe or only on small sample of dataframe, sample size is decided by another argument called sample_size.(Default value = False)
sample_size
int If use_sampling is True then sample size is decided by this argument.(Default value = 1000000)
print_impact
True, False This argument is to print out the statistics.(Default value = False)

## Returns

DataFrame
[attribute,*attribute_names]
Expand source code
def correlation_matrix(
spark,
idf,
list_of_cols="all",
drop_cols=[],
use_sampling=False,
sample_size=1000000,
print_impact=False,
):
"""
This function calculates correlation coefficient statistical, which measures the strength of the relationship
between the relative movements of two attributes. Pearson’s correlation coefficient is a standard approach of
measuring correlation between two variables.
This function supports numerical columns only. If Dataframe contains categorical columns also then those columns
must be first converted to numerical columns. Anovos has multiple functions to help convert categorical columns
into numerical columns. Functions cat_to_num_supervised and cat_to_num_unsupervised can be used for this. Some data
cleaning treatment can also be done on categorical columns before converting them to numerical columns.
Few functions to help in columns treatment are outlier_categories, measure_of_cardinality, IDness_detection etc.
This correlation_matrix function returns a correlation matrix dataframe of schema –
attribute, <attribute_names>. Correlation between attribute X and Y can be found at intersection of a) row with
value X in ‘attribute’ column and b) column‘Y’ (or row with value Y in ‘attribute’ column and column ‘X’).
Parameters
----------
spark
Spark Session
idf
Input Dataframe
list_of_cols
List of numerical columns to analyse e.g., ["col1","col2"].
Alternatively, columns can be specified in a string format,
where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
"all" can be passed to include numerical columns for analysis. This is super useful instead of specifying all column names manually.
Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument
is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"].
Alternatively, columns can be specified in a string format,
where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
a few handful of them. (Default value = [])
use_sampling
True, False
This argument is to tell function whether to compute correlation matrix on full dataframe or only on small sample
of dataframe, sample size is decided by another argument called sample_size.(Default value = False)
sample_size
int
If use_sampling is True then sample size is decided by this argument.(Default value = 1000000)
print_impact
True, False
This argument is to print out the statistics.(Default value = False)
Returns
-------
DataFrame
[attribute,*attribute_names]
"""
num_cols, cat_cols, other_cols = attributeType_segregation(idf)

if list_of_cols == "all":
list_of_cols = num_cols
if isinstance(list_of_cols, str):
list_of_cols = [x.strip() for x in list_of_cols.split("|")]
if isinstance(drop_cols, str):
drop_cols = [x.strip() for x in drop_cols.split("|")]

list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

if any(x not in num_cols for x in list_of_cols) | (len(list_of_cols) == 0):
raise TypeError("Invalid input for Column(s)")

if use_sampling:
if idf.count() > sample_size:
warnings.warn(
"Using sampling. Only "
+ str(sample_size)
+ " random sampled rows are considered."
)
idf = data_sample(
idf, fraction=float(sample_size) / idf.count(), method_type="random"
)

assembler = VectorAssembler(
inputCols=list_of_cols, outputCol="features", handleInvalid="skip"
)
idf_vector = assembler.transform(idf).select("features")
matrix = Correlation.corr(idf_vector, "features", "pearson")
result = matrix.collect()[0]["pearson(features)"].values

odf_pd = pd.DataFrame(
result.reshape(-1, len(list_of_cols)), columns=list_of_cols, index=list_of_cols
)
odf_pd["attribute"] = odf_pd.index
list_of_cols.sort()
odf = (
spark.createDataFrame(odf_pd)
.select(["attribute"] + list_of_cols)
.orderBy("attribute")
)

if print_impact:
odf.show(odf.count())

return odf


 def variable_clustering(spark, idf, list_of_cols='all', drop_cols=[], stats_mode={}, persist=True, print_impact=False) 

This function performs Variable Clustering with necessary pre-processing techniques, including low-cardinality columns removal, categorical-to-numerical transformation and null values imputation. It works as a wrapper of VarClusHiSpark class which groups correlated attributes within a cluster and assign uncorrelated attributes into other clusters. For more details on the algorithm, please check anovos.data_analyzer.association_eval_varclus

It returns a Spark Dataframe with schema – Cluster, Attribute, RS_Ratio. Attributes similar to each other are grouped together with the same cluster id. The attribute with the lowest (1 — RS_Ratio) can be chosen as a representative of the cluster while discarding the other attributes from that cluster. This can also help in achieving the dimension reduction, if required.

## Parameters

spark
Spark Session
idf
Input Dataframe
list_of_cols
List of columns to analyse e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
stats_mode
Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on most frequently seen values i.e. if measures_of_centralTendency or mode_computation (data_analyzer.stats_generator module) has been computed & saved before. This is used for MMM imputation as Variable Clustering doesn’t work with missing values. (Default value = {})
persist
Boolean argument - True or False. This argument is used to determine whether to persist on pre-processing (low-cardinality columns removal, categorical-to-numerical transformation and null values imputation) results of input dataset. persist=True will enable the use of persist, otherwise False. It is recommended to set this as True for large datasets. (Default value = True)
print_impact
True, False This argument is to print out the statistics.(Default value = False)

## Returns

DataFrame
[Cluster, Attribute, RS_Ratio]
Expand source code
def variable_clustering(
spark,
idf,
list_of_cols="all",
drop_cols=[],
stats_mode={},
persist=True,
print_impact=False,
):
"""
This function performs Variable Clustering with necessary pre-processing techniques, including low-cardinality
columns removal, categorical-to-numerical transformation and null values imputation. It works as a wrapper of VarClusHiSpark
class which groups correlated attributes within a cluster and assign uncorrelated attributes into other clusters.
For more details on the algorithm, please check anovos.data_analyzer.association_eval_varclus

It returns a Spark Dataframe with schema – Cluster, Attribute, RS_Ratio. Attributes similar to each other are grouped
together with the same cluster id. The attribute with the lowest (1 — RS_Ratio) can be chosen as a representative of the cluster
while discarding the other attributes from that cluster. This can also help in achieving the dimension reduction, if required.

Parameters
----------
spark
Spark Session
idf
Input Dataframe
list_of_cols
List of columns to analyse e.g., ["col1","col2"].
Alternatively, columns can be specified in a string format,
where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
"all" can be passed to include all columns for analysis. This is super useful instead of specifying all column names manually.
Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument
is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"].
Alternatively, columns can be specified in a string format,
where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
a few handful of them. (Default value = [])
stats_mode
Takes arguments for read_dataset (data_ingest module) function in a dictionary format
to read pre-saved statistics on most frequently seen values i.e. if measures_of_centralTendency or
mode_computation (data_analyzer.stats_generator module) has been computed & saved before.
This is used for MMM imputation as Variable Clustering doesn’t work with missing values. (Default value = {})
persist
Boolean argument - True or False. This argument is used to determine whether to persist on pre-processing (low-cardinality
columns removal, categorical-to-numerical transformation and null values imputation) results of input dataset.
persist=True will enable the use of persist, otherwise False.
It is recommended to set this as True for large datasets. (Default value = True)
print_impact
True, False
This argument is to print out the statistics.(Default value = False)

Returns
-------
DataFrame
[Cluster, Attribute, RS_Ratio]

"""

if list_of_cols == "all":
num_cols, cat_cols, other_cols = attributeType_segregation(idf)
list_of_cols = num_cols + cat_cols
if isinstance(list_of_cols, str):
list_of_cols = [x.strip() for x in list_of_cols.split("|")]
if isinstance(drop_cols, str):
drop_cols = [x.strip() for x in drop_cols.split("|")]

list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

if any(x not in idf.columns for x in list_of_cols) | (len(list_of_cols) == 0):
raise TypeError("Invalid input for Column(s)")
if persist:
idf.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count()
remove_cols = (
uniqueCount_computation(spark, idf, list_of_cols)
.where(F.col("unique_values") < 2)
.select("attribute")
.rdd.flatMap(lambda x: x)
.collect()
)
list_of_cols = [e for e in list_of_cols if e not in remove_cols]
idf = idf.select(list_of_cols)
cat_cols = attributeType_segregation(idf)[1]

for i in idf.dtypes:
if i[1].startswith("decimal"):
idf = idf.withColumn(i[0], F.col(i[0]).cast("double"))
idf_encoded = cat_to_num_unsupervised(
spark, idf, list_of_cols=cat_cols, method_type="label_encoding"
)
num_cols = attributeType_segregation(idf_encoded)[0]
idf_encoded = idf_encoded.select(num_cols)
idf_imputed = imputation_MMM(spark, idf_encoded, stats_mode=stats_mode)
if persist:
idf_imputed.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count()
idf.unpersist()
vc = VarClusHiSpark(idf_imputed, maxeigval2=1, maxclus=None)
vc._varclusspark(spark)
odf_pd = vc._rsquarespark()
odf = spark.createDataFrame(odf_pd).select(
"Cluster",
F.col("Variable").alias("Attribute"),
F.round(F.col("RS_Ratio"), 4).alias("RS_Ratio"),
)
if print_impact:
odf.show(odf.count())
if persist:
idf_imputed.unpersist()
return odf