association_evaluator
This submodule focuses on understanding the interaction between different attributes and/or the relationship between an attribute & the binary target variable.
Association between attributes is measured by:
- correlation_matrix
- variable_clustering
Association between an attribute and the binary target is measured by:
- IV_calculation
- IG_calculation
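The four evaluators share a similar calling pattern. Below is a minimal usage sketch, assuming an active SparkSession spark, an input DataFrame idf, and (for the target-based measures) a binary target column named "label"; the column names ("id", "label") are illustrative, and the import path assumes the usual Anovos package layout.

from anovos.data_analyzer.association_evaluator import (
    correlation_matrix,
    variable_clustering,
    IV_calculation,
    IG_calculation,
)

# Attribute-to-attribute association
corr_df = correlation_matrix(spark, idf, list_of_cols="all", drop_cols=["id"])
clusters_df = variable_clustering(spark, idf, drop_cols=["id", "label"])

# Attribute-to-target association (binary target assumed in "label")
iv_df = IV_calculation(spark, idf, label_col="label", event_label=1)
ig_df = IG_calculation(spark, idf, label_col="label", event_label=1)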
Expand source code
# coding=utf-8
"""
This submodule focuses on understanding the interaction between different attributes and/or
the relationship between an attribute & the binary target variable.

Association between attributes is measured by:
- correlation_matrix
- variable_clustering

Association between an attribute and binary target is measured by:
- IV_calculation
- IG_calculation
"""
import itertools
import math

import pyspark
from phik.phik import spark_phik_matrix_from_hist2d_dict
from popmon.analysis.hist_numpy import get_2dgrid
from pyspark.sql import Window
from pyspark.sql import functions as F
from varclushi import VarClusHi

from anovos.data_analyzer.stats_generator import uniqueCount_computation
from anovos.data_ingest.data_ingest import read_dataset
from anovos.data_transformer.transformers import (
    attribute_binning,
    monotonic_binning,
    cat_to_num_unsupervised,
    imputation_MMM,
)
from anovos.shared.utils import attributeType_segregation
Functions
def IG_calculation(spark, idf, list_of_cols='all', drop_cols=[], label_col='label', event_label=1, encoding_configs={'bin_method': 'equal_frequency', 'bin_size': 10, 'monotonicity_check': 0}, print_impact=False)
- Information Gain (IG) is another powerful technique for feature selection analysis. Information gain is calculated by comparing the entropy of the dataset before and after a transformation (in this case, the introduction of an attribute). As in the IV calculation, each category is a bin for categorical attributes, while numerical attributes need to be split into bins first.
IG = Total Entropy – Entropy
Total Entropy = -%event * log2(%event) - (1 - %event) * log2(1 - %event)
Entropy = ∑ %segment_i * ( -%event_i * log2(%event_i) - (1 - %event_i) * log2(1 - %event_i) )
where %event is the overall event rate, and %event_i and %segment_i are the event rate and the share of observations in bin i.
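A small worked example may help make the entropy bookkeeping concrete. The numbers below are made up for illustration, and the helper is a plain-Python sketch of the same arithmetic, not the library routine itself:

import math

def binary_entropy(p):
    """Entropy (base 2) of a binary target with event rate p."""
    if p in (0.0, 1.0):
        return 0.0
    return -p * math.log2(p) - (1 - p) * math.log2(1 - p)

# Hypothetical dataset with a 20% overall event rate
total_entropy = binary_entropy(0.20)

# Hypothetical attribute binned into three segments:
# (share of rows in the bin, event rate within the bin)
bins = [(0.5, 0.10), (0.3, 0.25), (0.2, 0.375)]

weighted_entropy = sum(share * binary_entropy(rate) for share, rate in bins)
information_gain = total_entropy - weighted_entropy

print(round(total_entropy, 4), round(weighted_entropy, 4), round(information_gain, 4))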
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of columns to analyse, e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by the pipe delimiter “|”, e.g., "col1|col2". "all" can be passed to include all columns for analysis; this is useful when all columns should be analysed without listing them manually. Please note that this argument is used in conjunction with drop_cols, i.e. a column mentioned in drop_cols is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
- List of columns to be dropped, e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by the pipe delimiter “|”, e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when all columns except a handful need to be considered. (Default value = [])
label_col
- Label/Target column (Default value = "label")
event_label
- Value of the (positive) event, i.e. label 1 (Default value = 1)
encoding_configs
- Takes input in dictionary format. {} i.e. an empty dictionary means no encoding is required. If numerical columns are present and encoding is required, the following keys shall be provided: "bin_size", the number of bins for converting numerical columns to categorical (Default value = 10); "bin_method", the method of binning, either "equal_frequency" or "equal_range" (Default value = "equal_frequency"); and "monotonicity_check", 1 for monotonic binning, else 0. A monotonicity_check of 1 will dynamically calculate the bin_size to ensure monotonic bins, but this can be an expensive operation (Default value = 0).
print_impact
- True or False; prints the output statistics if True. (Default value = False)
Returns
DataFrame
- [attribute, ig]
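A minimal usage sketch, assuming IG_calculation has been imported as shown at the top of this page and idf carries a binary "label" column; "id" is a hypothetical identifier column, and the monotonic-binning variant is shown because it is the costlier option to be aware of:

ig_df = IG_calculation(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=["id"],            # hypothetical identifier column, excluded from analysis
    label_col="label",
    event_label=1,
    encoding_configs={"bin_method": "equal_frequency", "bin_size": 10, "monotonicity_check": 1},
    print_impact=True,
)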
Expand source code
def IG_calculation(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    label_col="label",
    event_label=1,
    encoding_configs={
        "bin_method": "equal_frequency",
        "bin_size": 10,
        "monotonicity_check": 0,
    },
    print_impact=False,
):
    """
    Information Gain (IG) is another powerful technique for feature selection analysis.
    Information gain is calculated by comparing the entropy of the dataset before and after a
    transformation (in this case, the introduction of an attribute). As in the IV calculation,
    each category is a bin for categorical attributes, while numerical attributes need to be
    split into bins first.

    IG = Total Entropy - Entropy
    Total Entropy = -%event * log2(%event) - (1 - %event) * log2(1 - %event)
    Entropy = sum over bins i of %segment_i * (-%event_i * log2(%event_i) - (1 - %event_i) * log2(1 - %event_i))

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of columns to analyse, e.g., ["col1","col2"]. Alternatively, columns can be specified
        in a string format, where different column names are separated by the pipe delimiter "|",
        e.g., "col1|col2". "all" can be passed to include all columns for analysis. Please note
        that this argument is used in conjunction with drop_cols, i.e. a column mentioned in
        drop_cols is not considered for analysis even if it is mentioned in list_of_cols.
        (Default value = "all")
    drop_cols
        List of columns to be dropped, e.g., ["col1","col2"]. Alternatively, columns can be
        specified in a string format, where different column names are separated by the pipe
        delimiter "|", e.g., "col1|col2". It is most useful when coupled with the "all" value of
        list_of_cols, when all columns except a handful need to be considered. (Default value = [])
    label_col
        Label/Target column (Default value = "label")
    event_label
        Value of the (positive) event, i.e. label 1 (Default value = 1)
    encoding_configs
        Takes input in dictionary format. {} i.e. an empty dictionary means no encoding is
        required. If numerical columns are present and encoding is required, the following keys
        shall be provided: "bin_size", the number of bins for converting numerical columns to
        categorical (Default value = 10); "bin_method", the method of binning, either
        "equal_frequency" or "equal_range" (Default value = "equal_frequency"); and
        "monotonicity_check", 1 for monotonic binning, else 0. A monotonicity_check of 1 will
        dynamically calculate the bin_size to ensure monotonic bins, but this can be an expensive
        operation (Default value = 0).
    print_impact
        True or False; prints the output statistics if True. (Default value = False)

    Returns
    -------
    DataFrame
        [attribute, ig]
    """
    if label_col not in idf.columns:
        raise TypeError("Invalid input for Label Column")
    if list_of_cols == "all":
        num_cols, cat_cols, other_cols = attributeType_segregation(idf)
        list_of_cols = num_cols + cat_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    list_of_cols = list(
        set([e for e in list_of_cols if e not in (drop_cols + [label_col])])
    )

    if any(x not in idf.columns for x in list_of_cols) | (len(list_of_cols) == 0):
        raise TypeError("Invalid input for Column(s)")
    if idf.where(F.col(label_col) == event_label).count() == 0:
        raise TypeError("Invalid input for Event Label Value")

    num_cols, cat_cols, other_cols = attributeType_segregation(idf.select(list_of_cols))

    # Bin numerical columns (if any) so every analysed attribute is categorical
    if (len(num_cols) > 0) & bool(encoding_configs):
        bin_size = encoding_configs["bin_size"]
        bin_method = encoding_configs["bin_method"]
        monotonicity_check = encoding_configs["monotonicity_check"]
        if monotonicity_check == 1:
            idf_encoded = monotonic_binning(
                spark, idf, num_cols, [], label_col, event_label, bin_method, bin_size
            )
        else:
            idf_encoded = attribute_binning(
                spark, idf, num_cols, [], bin_method, bin_size
            )
        idf_encoded.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count()
    else:
        idf_encoded = idf

    output = []
    # Entropy of the target before any split
    total_event = idf.where(F.col(label_col) == event_label).count() / idf.count()
    total_entropy = -(
        total_event * math.log2(total_event)
        + ((1 - total_event) * math.log2((1 - total_event)))
    )

    for col in list_of_cols:
        # Entropy of the target within each bin, weighted by the bin's share of observations
        idf_entropy = (
            idf_encoded.withColumn(
                label_col, F.when(F.col(label_col) == event_label, 1).otherwise(0)
            )
            .groupBy(col)
            .agg(
                F.sum(F.col(label_col)).alias("event_count"),
                F.count(F.col(label_col)).alias("total_count"),
            )
            .dropna()
            .withColumn("event_pct", F.col("event_count") / F.col("total_count"))
            .withColumn(
                "segment_pct",
                F.col("total_count") / F.sum("total_count").over(Window.partitionBy()),
            )
            .withColumn(
                "entropy",
                -F.col("segment_pct")
                * (
                    (F.col("event_pct") * F.log2(F.col("event_pct")))
                    + ((1 - F.col("event_pct")) * F.log2((1 - F.col("event_pct"))))
                ),
            )
        )
        entropy = (
            idf_entropy.groupBy().sum("entropy").rdd.flatMap(lambda x: x).collect()[0]
        )
        ig_value = total_entropy - entropy if entropy else None
        output.append([col, ig_value])

    odf = (
        spark.createDataFrame(output, ["attribute", "ig"])
        .withColumn("ig", F.round(F.col("ig"), 4))
        .orderBy(F.desc("ig"))
    )

    if print_impact:
        odf.show(odf.count())

    return odf
def IV_calculation(spark, idf, list_of_cols='all', drop_cols=[], label_col='label', event_label=1, encoding_configs={'bin_method': 'equal_frequency', 'bin_size': 10, 'monotonicity_check': 0}, print_impact=False)
- Information Value (IV) is a simple and powerful technique to conduct attribute relevance analysis. It measures how well an attribute is able to separate the two classes of a binary target variable, i.e. label 0 from label 1, and hence helps in ranking attributes on the basis of their importance. At the heart of the IV methodology are groups (bins) of observations. For categorical attributes, usually each category is a bin, while numerical attributes need to be split into bins first.
IV = ∑ (% of non-events - % of events) * WOE
where:
WOE = ln(% of non-events / % of events)
% of events = % of label 1 in a bin
% of non-events = % of label 0 in a bin
General rules of thumb while creating the bins are that a) each bin should have at least 5% of the observations, b) the WOE should be monotonic, i.e. either growing or decreasing with the bins, and c) missing values should be binned separately. An article from listendata.com can be referred to for a good understanding of the IV & WOE concepts.
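The arithmetic per bin is straightforward; the sketch below reproduces it in plain Python for a hypothetical attribute with three bins (all counts are made up for illustration and are not tied to this API):

import math

# Hypothetical binned attribute: (events, non-events) per bin
bins = [(50, 450), (80, 320), (70, 30)]

total_events = sum(e for e, ne in bins)      # 200
total_nonevents = sum(ne for e, ne in bins)  # 800

iv = 0.0
for events, nonevents in bins:
    event_pct = events / total_events            # share of all events captured by the bin
    nonevent_pct = nonevents / total_nonevents   # share of all non-events captured by the bin
    woe = math.log(nonevent_pct / event_pct)     # natural log, as in the WOE formula above
    iv += (nonevent_pct - event_pct) * woe

print(round(iv, 4))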
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of columns to analyse, e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by the pipe delimiter “|”, e.g., "col1|col2". "all" can be passed to include all columns for analysis; this is useful when all columns should be analysed without listing them manually. Please note that this argument is used in conjunction with drop_cols, i.e. a column mentioned in drop_cols is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
- List of columns to be dropped, e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by the pipe delimiter “|”, e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when all columns except a handful need to be considered. (Default value = [])
label_col
- Label/Target column (Default value = "label")
event_label
- Value of the (positive) event, i.e. label 1 (Default value = 1)
encoding_configs
- Takes input in dictionary format. {} i.e. an empty dictionary means no encoding is required. If numerical columns are present and encoding is required, the following keys shall be provided: "bin_size", the number of bins for converting numerical columns to categorical (Default value = 10); "bin_method", the method of binning, either "equal_frequency" or "equal_range" (Default value = "equal_frequency"); and "monotonicity_check", 1 for monotonic binning, else 0. A monotonicity_check of 1 will dynamically calculate the bin_size to ensure monotonic bins, but this can be an expensive operation (Default value = 0).
print_impact
- True or False; prints the output statistics if True. (Default value = False)
Returns
DataFrame
- [attribute, iv]
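A minimal usage sketch, assuming IV_calculation has been imported as shown at the top of this page and idf carries a binary "label" column; "id" is a hypothetical identifier column, and the 0.02 cut-off is a commonly quoted, informal rule of thumb rather than part of this API:

from pyspark.sql import functions as F

iv_df = IV_calculation(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=["id"],            # hypothetical identifier column, excluded from analysis
    label_col="label",
    event_label=1,
    encoding_configs={"bin_method": "equal_frequency", "bin_size": 10, "monotonicity_check": 0},
)

# Keep attributes with a non-negligible IV (0.02 is a commonly used, informal cut-off)
selected = (
    iv_df.where(F.col("iv") > 0.02).select("attribute").rdd.flatMap(lambda x: x).collect()
)
print(selected)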
Expand source code
def IV_calculation(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    label_col="label",
    event_label=1,
    encoding_configs={
        "bin_method": "equal_frequency",
        "bin_size": 10,
        "monotonicity_check": 0,
    },
    print_impact=False,
):
    """
    Information Value (IV) is a simple and powerful technique to conduct attribute relevance
    analysis. It measures how well an attribute is able to separate the two classes of a binary
    target variable, i.e. label 0 from label 1, and hence helps in ranking attributes on the
    basis of their importance. At the heart of the IV methodology are groups (bins) of
    observations. For categorical attributes, usually each category is a bin, while numerical
    attributes need to be split into bins first.

    IV = sum over bins of (% of non-events - % of events) * WOE
    where:
    WOE = ln(% of non-events / % of events)
    % of events = % of label 1 in a bin
    % of non-events = % of label 0 in a bin

    General rules of thumb while creating the bins are that a) each bin should have at least 5%
    of the observations, b) the WOE should be monotonic, i.e. either growing or decreasing with
    the bins, and c) missing values should be binned separately. An article from listendata.com
    can be referred to for a good understanding of the IV & WOE concepts.

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of columns to analyse, e.g., ["col1","col2"]. Alternatively, columns can be specified
        in a string format, where different column names are separated by the pipe delimiter "|",
        e.g., "col1|col2". "all" can be passed to include all columns for analysis. Please note
        that this argument is used in conjunction with drop_cols, i.e. a column mentioned in
        drop_cols is not considered for analysis even if it is mentioned in list_of_cols.
        (Default value = "all")
    drop_cols
        List of columns to be dropped, e.g., ["col1","col2"]. Alternatively, columns can be
        specified in a string format, where different column names are separated by the pipe
        delimiter "|", e.g., "col1|col2". It is most useful when coupled with the "all" value of
        list_of_cols, when all columns except a handful need to be considered. (Default value = [])
    label_col
        Label/Target column (Default value = "label")
    event_label
        Value of the (positive) event, i.e. label 1 (Default value = 1)
    encoding_configs
        Takes input in dictionary format. {} i.e. an empty dictionary means no encoding is
        required. If numerical columns are present and encoding is required, the following keys
        shall be provided: "bin_size", the number of bins for converting numerical columns to
        categorical (Default value = 10); "bin_method", the method of binning, either
        "equal_frequency" or "equal_range" (Default value = "equal_frequency"); and
        "monotonicity_check", 1 for monotonic binning, else 0. A monotonicity_check of 1 will
        dynamically calculate the bin_size to ensure monotonic bins, but this can be an expensive
        operation (Default value = 0).
    print_impact
        True or False; prints the output statistics if True. (Default value = False)

    Returns
    -------
    DataFrame
        [attribute, iv]
    """
    if label_col not in idf.columns:
        raise TypeError("Invalid input for Label Column")
    if list_of_cols == "all":
        num_cols, cat_cols, other_cols = attributeType_segregation(idf)
        list_of_cols = num_cols + cat_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    list_of_cols = list(
        set([e for e in list_of_cols if e not in (drop_cols + [label_col])])
    )

    if any(x not in idf.columns for x in list_of_cols) | (len(list_of_cols) == 0):
        raise TypeError("Invalid input for Column(s)")
    if idf.where(F.col(label_col) == event_label).count() == 0:
        raise TypeError("Invalid input for Event Label Value")

    num_cols, cat_cols, other_cols = attributeType_segregation(idf.select(list_of_cols))

    # Bin numerical columns (if any) so every analysed attribute is categorical
    if (len(num_cols) > 0) & bool(encoding_configs):
        bin_size = encoding_configs["bin_size"]
        bin_method = encoding_configs["bin_method"]
        monotonicity_check = encoding_configs["monotonicity_check"]
        if monotonicity_check == 1:
            idf_encoded = monotonic_binning(
                spark, idf, num_cols, [], label_col, event_label, bin_method, bin_size
            )
        else:
            idf_encoded = attribute_binning(
                spark, idf, num_cols, [], bin_method, bin_size
            )
        idf_encoded.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count()
    else:
        idf_encoded = idf

    output = []
    for col in list_of_cols:
        # Pivot to label-0 / label-1 counts per bin; empty cells are filled with 0.5
        # to avoid division by zero in the WOE term
        df_iv = (
            idf_encoded.groupBy(col, label_col)
            .count()
            .withColumn(
                label_col, F.when(F.col(label_col) == event_label, 1).otherwise(0)
            )
            .groupBy(col)
            .pivot(label_col)
            .sum("count")
            .fillna(0.5)
            .withColumn("event_pct", F.col("1") / F.sum("1").over(Window.partitionBy()))
            .withColumn(
                "nonevent_pct", F.col("0") / F.sum("0").over(Window.partitionBy())
            )
            .withColumn(
                "iv",
                (F.col("nonevent_pct") - F.col("event_pct"))
                * F.log(F.col("nonevent_pct") / F.col("event_pct")),
            )
        )
        iv_value = df_iv.select(F.sum("iv")).collect()[0][0]
        output.append([col, iv_value])

    odf = (
        spark.createDataFrame(output, ["attribute", "iv"])
        .withColumn("iv", F.round(F.col("iv"), 4))
        .orderBy(F.desc("iv"))
    )

    if print_impact:
        odf.show(odf.count())

    return odf
def correlation_matrix(spark, idf, list_of_cols='all', drop_cols=[], stats_unique={}, print_impact=False)
- This function calculates a correlation coefficient, which measures the strength of the relationship between the relative movements of two attributes. Pearson’s correlation coefficient is the standard approach to measuring correlation between two variables. However, it has some drawbacks: a) it works only with continuous variables, b) it only accounts for a linear relationship between variables, and c) it is sensitive to outliers. To avoid these issues, we compute Phik (𝜙k), a practical correlation coefficient that works consistently between categorical, ordinal and interval variables, captures non-linear dependency and reverts to the Pearson correlation coefficient in case of a bivariate normal input distribution. The correlation coefficient is calculated for every pair of attributes and its value lies between 0 and 1, where 0 means there is no correlation between the two attributes and 1 means a strong correlation. However, this methodology has drawbacks of its own: it is computationally expensive, especially when the number of columns in the input dataset is large (the number of pairs to analyse grows quadratically with the number of columns), and it gives no indication of the direction of the relationship. More detail can be found in the source paper: https://arxiv.org/abs/1811.11440
This function returns a correlation matrix dataframe of schema – attribute, <attribute_names>. The correlation between attributes X and Y can be found at the intersection of a) the row with value X in the ‘attribute’ column and b) column ‘Y’ (or, equivalently, the row with value Y in the ‘attribute’ column and column ‘X’).
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of columns to analyse, e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by the pipe delimiter “|”, e.g., "col1|col2". "all" can be passed to include all columns for analysis; this is useful when all columns should be analysed without listing them manually. Please note that this argument is used in conjunction with drop_cols, i.e. a column mentioned in drop_cols is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
- List of columns to be dropped, e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by the pipe delimiter “|”, e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when all columns except a handful need to be considered. (Default value = [])
stats_unique
- Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on unique value count i.e. if measures_of_cardinality or uniqueCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
print_impact
- True or False; prints the output statistics if True. (Default value = False)
Returns
DataFrame
- [attribute,*attribute_names]
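A minimal usage sketch, assuming correlation_matrix has been imported as shown at the top of this page and an input DataFrame idf; the attribute names "id", "age" and "income" are illustrative:

from pyspark.sql import functions as F

corr_df = correlation_matrix(spark, idf, list_of_cols="all", drop_cols=["id"])

# Phik correlation between two specific attributes, e.g. "age" and "income"
value = (
    corr_df.where(F.col("attribute") == "age")
    .select("income")
    .collect()[0][0]
)
print(value)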
Expand source code
def correlation_matrix(
    spark, idf, list_of_cols="all", drop_cols=[], stats_unique={}, print_impact=False
):
    """
    This function calculates a correlation coefficient, which measures the strength of the
    relationship between the relative movements of two attributes. Pearson's correlation
    coefficient is the standard approach to measuring correlation between two variables.
    However, it has some drawbacks: a) it works only with continuous variables, b) it only
    accounts for a linear relationship between variables, and c) it is sensitive to outliers.
    To avoid these issues, we compute Phik (𝜙k), a practical correlation coefficient that works
    consistently between categorical, ordinal and interval variables, captures non-linear
    dependency and reverts to the Pearson correlation coefficient in case of a bivariate normal
    input distribution. The correlation coefficient is calculated for every pair of attributes
    and its value lies between 0 and 1, where 0 means there is no correlation between the two
    attributes and 1 means a strong correlation. However, this methodology has drawbacks of its
    own: it is computationally expensive, especially when the number of columns in the input
    dataset is large (the number of pairs to analyse grows quadratically with the number of
    columns), and it gives no indication of the direction of the relationship. More detail can
    be found in the source paper: https://arxiv.org/abs/1811.11440

    This function returns a correlation matrix dataframe of schema - attribute, <attribute_names>.
    The correlation between attributes X and Y can be found at the intersection of a) the row
    with value X in the 'attribute' column and b) column 'Y' (or, equivalently, the row with
    value Y in the 'attribute' column and column 'X').

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of columns to analyse, e.g., ["col1","col2"]. Alternatively, columns can be specified
        in a string format, where different column names are separated by the pipe delimiter "|",
        e.g., "col1|col2". "all" can be passed to include all columns for analysis. Please note
        that this argument is used in conjunction with drop_cols, i.e. a column mentioned in
        drop_cols is not considered for analysis even if it is mentioned in list_of_cols.
        (Default value = "all")
    drop_cols
        List of columns to be dropped, e.g., ["col1","col2"]. Alternatively, columns can be
        specified in a string format, where different column names are separated by the pipe
        delimiter "|", e.g., "col1|col2". It is most useful when coupled with the "all" value of
        list_of_cols, when all columns except a handful need to be considered. (Default value = [])
    stats_unique
        Takes arguments for the read_dataset (data_ingest module) function in a dictionary format
        to read pre-saved statistics on unique value counts, i.e. if measures_of_cardinality or
        uniqueCount_computation (data_analyzer.stats_generator module) output has been computed
        & saved before. (Default value = {})
    print_impact
        True or False; prints the output statistics if True. (Default value = False)

    Returns
    -------
    DataFrame
        [attribute, *attribute_names]
    """
    if list_of_cols == "all":
        num_cols, cat_cols, other_cols = attributeType_segregation(idf)
        list_of_cols = num_cols + cat_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    # Columns with fewer than 2 distinct values carry no association signal and are removed
    if stats_unique == {}:
        remove_cols = (
            uniqueCount_computation(spark, idf, list_of_cols)
            .where(F.col("unique_values") < 2)
            .select("attribute")
            .rdd.flatMap(lambda x: x)
            .collect()
        )
    else:
        remove_cols = (
            read_dataset(spark, **stats_unique)
            .where(F.col("unique_values") < 2)
            .select("attribute")
            .rdd.flatMap(lambda x: x)
            .collect()
        )

    list_of_cols = list(
        set([e for e in list_of_cols if e not in (drop_cols + remove_cols)])
    )

    if any(x not in idf.columns for x in list_of_cols) | (len(list_of_cols) == 0):
        raise TypeError("Invalid input for Column(s)")

    # Build 2-dimensional histograms for every attribute pair and derive the phik matrix from them
    combis = [
        list(c) for c in itertools.combinations_with_replacement(list_of_cols, 2)
    ]
    hists = idf.select(list_of_cols).pm_make_histograms(combis)
    grids = {k: get_2dgrid(h) for k, h in hists.items()}
    odf_pd = spark_phik_matrix_from_hist2d_dict(spark.sparkContext, grids)
    odf_pd["attribute"] = odf_pd.index
    list_of_cols.sort()
    odf = (
        spark.createDataFrame(odf_pd)
        .select(["attribute"] + list_of_cols)
        .orderBy("attribute")
    )

    if print_impact:
        odf.show(odf.count())

    return odf
def variable_clustering(spark, idf, list_of_cols='all', drop_cols=[], sample_size=100000, stats_unique={}, stats_mode={}, print_impact=False)
- Variable Clustering groups attributes that are as correlated as possible among themselves within a cluster and as uncorrelated as possible with attributes in other clusters. The function leverages the VarClusHi library (https://github.com/jingtt/varclushi) to do the variable clustering; however, this library is not implemented in a scalable manner, so the analysis is done on a sampled dataset. Further, it is found to be somewhat computationally expensive, especially when the number of columns in the input dataset is large (the number of pairs to analyse grows quadratically with the number of columns).
It returns a Spark Dataframe with schema – Cluster, Attribute, RS_Ratio. Attributes similar to each other are grouped together with the same cluster id. The attribute with the lowest RS_Ratio within a cluster can be chosen as its representative while discarding the other attributes from that cluster. This can also help achieve dimensionality reduction, if required.
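One attribute per cluster can be selected directly from the returned DataFrame for downstream dimension reduction. A sketch of that selection, assuming variable_clustering has been imported as shown at the top of this page and the representative is taken as the attribute with the lowest RS_Ratio in each cluster:

from pyspark.sql import Window
from pyspark.sql import functions as F

clusters_df = variable_clustering(spark, idf)

# Rank attributes within each cluster by RS_Ratio and keep the best-ranked one
w = Window.partitionBy("Cluster").orderBy(F.col("RS_Ratio").asc())
representatives = (
    clusters_df.withColumn("rank", F.row_number().over(w))
    .where(F.col("rank") == 1)
    .select("Cluster", "Attribute")
)
representatives.show()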
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of columns to analyse, e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by the pipe delimiter “|”, e.g., "col1|col2". "all" can be passed to include all columns for analysis; this is useful when all columns should be analysed without listing them manually. Please note that this argument is used in conjunction with drop_cols, i.e. a column mentioned in drop_cols is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
- List of columns to be dropped, e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by the pipe delimiter “|”, e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when all columns except a handful need to be considered. (Default value = [])
sample_size
- Maximum sample size (in terms of number of rows) taken for the computation. Sample dataset is extracted using random sampling. (Default value = 100000)
stats_unique
- Takes arguments for the read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on unique value counts, i.e. if measures_of_cardinality or uniqueCount_computation (data_analyzer.stats_generator module) output has been computed & saved before. This is used to remove single-valued columns from the analysis. (Default value = {})
stats_mode
- Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on most frequently seen values i.e. if measures_of_centralTendency or mode_computation (data_analyzer.stats_generator module) has been computed & saved before. This is used for MMM imputation as Variable Clustering doesn’t work with missing values. (Default value = {})
print_impact
- True or False; prints the output statistics if True. (Default value = False)
Returns
DataFrame
- [Cluster, Attribute, RS_Ratio]
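A usage sketch exercising the sampling and column-selection parameters; the column names and the sample size are illustrative:

clusters_df = variable_clustering(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=["id", "label"],   # hypothetical identifier and target columns
    sample_size=50000,
)
clusters_df.show(20)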
Expand source code
def variable_clustering(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    sample_size=100000,
    stats_unique={},
    stats_mode={},
    print_impact=False,
):
    """
    Variable Clustering groups attributes that are as correlated as possible among themselves
    within a cluster and as uncorrelated as possible with attributes in other clusters. The
    function leverages the VarClusHi library (https://github.com/jingtt/varclushi) to do the
    variable clustering; however, this library is not implemented in a scalable manner, so the
    analysis is done on a sampled dataset. Further, it is found to be somewhat computationally
    expensive, especially when the number of columns in the input dataset is large (the number
    of pairs to analyse grows quadratically with the number of columns).

    It returns a Spark Dataframe with schema - Cluster, Attribute, RS_Ratio. Attributes similar
    to each other are grouped together with the same cluster id. The attribute with the lowest
    RS_Ratio within a cluster can be chosen as its representative while discarding the other
    attributes from that cluster. This can also help achieve dimensionality reduction, if
    required.

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of columns to analyse, e.g., ["col1","col2"]. Alternatively, columns can be specified
        in a string format, where different column names are separated by the pipe delimiter "|",
        e.g., "col1|col2". "all" can be passed to include all columns for analysis. Please note
        that this argument is used in conjunction with drop_cols, i.e. a column mentioned in
        drop_cols is not considered for analysis even if it is mentioned in list_of_cols.
        (Default value = "all")
    drop_cols
        List of columns to be dropped, e.g., ["col1","col2"]. Alternatively, columns can be
        specified in a string format, where different column names are separated by the pipe
        delimiter "|", e.g., "col1|col2". It is most useful when coupled with the "all" value of
        list_of_cols, when all columns except a handful need to be considered. (Default value = [])
    sample_size
        Maximum sample size (in terms of number of rows) taken for the computation. The sample
        dataset is extracted using random sampling. (Default value = 100000)
    stats_unique
        Takes arguments for the read_dataset (data_ingest module) function in a dictionary format
        to read pre-saved statistics on unique value counts, i.e. if measures_of_cardinality or
        uniqueCount_computation (data_analyzer.stats_generator module) output has been computed
        & saved before. This is used to remove single-valued columns from the analysis.
        (Default value = {})
    stats_mode
        Takes arguments for the read_dataset (data_ingest module) function in a dictionary format
        to read pre-saved statistics on most frequently seen values, i.e. if
        measures_of_centralTendency or mode_computation (data_analyzer.stats_generator module)
        output has been computed & saved before. This is used for MMM imputation as Variable
        Clustering doesn't work with missing values. (Default value = {})
    print_impact
        True or False; prints the output statistics if True. (Default value = False)

    Returns
    -------
    DataFrame
        [Cluster, Attribute, RS_Ratio]
    """
    if list_of_cols == "all":
        num_cols, cat_cols, other_cols = attributeType_segregation(idf)
        list_of_cols = num_cols + cat_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

    if any(x not in idf.columns for x in list_of_cols) | (len(list_of_cols) == 0):
        raise TypeError("Invalid input for Column(s)")

    # Work on a random sample capped at sample_size rows
    idf_sample = idf.sample(False, min(1.0, float(sample_size) / idf.count()), 0)
    idf_sample.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count()

    # Columns with fewer than 2 distinct values are removed before clustering
    if stats_unique == {}:
        remove_cols = (
            uniqueCount_computation(spark, idf_sample, list_of_cols)
            .where(F.col("unique_values") < 2)
            .select("attribute")
            .rdd.flatMap(lambda x: x)
            .collect()
        )
    else:
        remove_cols = (
            read_dataset(spark, **stats_unique)
            .where(F.col("unique_values") < 2)
            .select("attribute")
            .rdd.flatMap(lambda x: x)
            .collect()
        )

    list_of_cols = [e for e in list_of_cols if e not in remove_cols]
    idf_sample = idf_sample.select(list_of_cols)
    num_cols, cat_cols, other_cols = attributeType_segregation(idf_sample)

    for i in idf_sample.dtypes:
        if i[1].startswith("decimal"):
            idf_sample = idf_sample.withColumn(i[0], F.col(i[0]).cast("double"))

    # Encode categorical columns numerically and impute missing values, since
    # VarClusHi requires complete numerical data
    idf_encoded = cat_to_num_unsupervised(
        spark, idf_sample, list_of_cols=cat_cols, method_type=1
    )
    idf_imputed = imputation_MMM(spark, idf_encoded, stats_mode=stats_mode)
    idf_imputed.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count()
    idf_sample.unpersist()

    # Run the hierarchical variable clustering on the pandas sample
    idf_pd = idf_imputed.toPandas()
    vc = VarClusHi(idf_pd, maxeigval2=1, maxclus=None)
    vc.varclus()
    odf_pd = vc.rsquare
    odf = spark.createDataFrame(odf_pd).select(
        "Cluster",
        F.col("Variable").alias("Attribute"),
        F.round(F.col("RS_Ratio"), 4).alias("RS_Ratio"),
    )

    if print_impact:
        odf.show(odf.count())

    return odf