transformers
The data transformer module supports a selection of pre-processing and transformation functions, such as binning, encoding, scaling, and imputation, that are required for statistics generation and quality checks. The functions supported through this module are listed below; a short usage sketch follows the list:
- attribute_binning
- monotonic_binning
- cat_to_num_transformer
- cat_to_num_unsupervised
- cat_to_num_supervised
- z_standardization
- IQR_standardization
- normalization
- imputation_MMM
- imputation_sklearn
- imputation_matrixFactorization
- auto_imputation
- autoencoder_latentFeatures
- PCA_latentFeatures
- feature_transformation
- boxcox_transformation
- outlier_categories
- expression_parser
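A typical workflow chains several of these functions on a Spark dataframe. The following is a minimal sketch rather than a prescribed pipeline; the import path, the SparkSession spark, the input dataframe df, and the column names income and city are assumptions used only for illustration:

from anovos.data_transformer.transformers import (
    imputation_MMM,
    attribute_binning,
    cat_to_num_unsupervised,
    z_standardization,
)

# impute columns with missing values (median for numerical, mode for categorical)
df_imputed = imputation_MMM(spark, df, list_of_cols="missing", method_type="median")

# bin a numerical column into 10 equal-range buckets, appended as income_binned
df_binned = attribute_binning(
    spark, df_imputed, list_of_cols=["income"],
    method_type="equal_range", bin_size=10, output_mode="append",
)

# label-encode a categorical column (default output_mode="replace" overwrites the column)
df_encoded = cat_to_num_unsupervised(
    spark, df_binned, list_of_cols=["city"], method_type="label_encoding",
)

# rescale all numerical columns to zero mean and unit standard deviation
df_scaled = z_standardization(spark, df_encoded, list_of_cols="all")

Each function returns a new dataframe, so the calls can be composed in whatever order fits the use case.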
Source code
# coding=utf-8 """ The data transformer module supports selected pre-processing & transformation functions, such as binning, encoding, scaling, imputation, to name a few, which are required for statistics generation and quality checks. Functions supported through this module are listed below: - attribute_binning - monotonic_binning - cat_to_num_transformer - cat_to_num_unsupervised - cat_to_num_supervised - z_standardization - IQR_standardization - normalization - imputation_MMM - imputation_sklearn - imputation_matrixFactorization - auto_imputation - autoencoder_latentFeatures - PCA_latentFeatures - feature_transformation - boxcox_transformation - outlier_categories - expression_parser """ import copy import os import pickle import random import platform import subprocess import tempfile import warnings from itertools import chain import numpy as np import pandas as pd import pyspark from packaging import version from scipy import stats if version.parse(pyspark.__version__) < version.parse("3.0.0"): from pyspark.ml.feature import OneHotEncoderEstimator as OneHotEncoder else: from pyspark.ml.feature import OneHotEncoder from pyspark.ml.evaluation import RegressionEvaluator from pyspark.ml.feature import ( PCA, Imputer, ImputerModel, IndexToString, MinMaxScaler, MinMaxScalerModel, PCAModel, StringIndexer, StringIndexerModel, VectorAssembler, ) from pyspark.ml.linalg import DenseVector from pyspark.ml.recommendation import ALS from pyspark.mllib.stat import Statistics from pyspark.sql import functions as F from pyspark.sql import types as T from pyspark.sql.window import Window from anovos.data_analyzer.stats_generator import ( missingCount_computation, uniqueCount_computation, ) from anovos.data_ingest.data_ingest import read_dataset, recast_column from anovos.data_ingest.data_sampling import data_sample from anovos.shared.utils import ends_with, attributeType_segregation, get_dtype # enable_iterative_imputer is prequisite for importing IterativeImputer # check the following issue for more details https://github.com/scikit-learn/scikit-learn/issues/16833 from sklearn.experimental import enable_iterative_imputer # noqa from sklearn.impute import KNNImputer, IterativeImputer if "arm64" not in platform.version().lower(): import tensorflow from tensorflow.keras.models import load_model, Model from tensorflow.keras.layers import Dense, Input, BatchNormalization, LeakyReLU def attribute_binning( spark, idf, list_of_cols="all", drop_cols=[], method_type="equal_range", bin_size=10, bin_dtype="numerical", pre_existing_model=False, model_path="NA", output_mode="replace", print_impact=False, ): """ Attribute binning (or discretization) is a method of numerical attribute into discrete (integer or categorical values) using pre-defined number of bins. This data pre-processing technique is used to reduce the effects of minor observation errors. Also, Binning introduces non-linearity and tends to improve the performance of the model. In this function, we are focussing on unsupervised way of binning i.e. without taking the target variable into account - Equal Range Binning, Equal Frequency Binning. In Equal Range method, each bin is of equal size/width and computed as: w = max- min / no. of bins *bins cutoff=[min, min+w,min+2w…..,max-w,max]* whereas in Equal Frequency binning method, bins are created in such a way that each bin has equal no. of rows, though the width of bins may vary from each other. w = 1 / no. 
of bins *bins cutoff=[min, wthpctile, 2wthpctile….,max ]* Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) method_type "equal_frequency", "equal_range". In "equal_range" method, each bin is of equal size/width and in "equal_frequency", each bin has equal no. of rows, though the width of bins may vary. (Default value = "equal_range") bin_size Number of bins. (Default value = 10) bin_dtype "numerical", "categorical". With "numerical" option, original value is replaced with an Integer (1,2,…) and with "categorical" option, original replaced with a string describing min and max value allowed in the bin ("minval-maxval"). (Default value = "numerical") pre_existing_model Boolean argument – True or False. True if binning model exists already, False Otherwise. (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_binned" e.g. column X is appended as X_binned. 
(Default value = "replace") print_impact True, False (Default value = False) This argument is to print the number of categories generated for each attribute (may or may be not same as bin_size) Returns ------- DataFrame Binned Dataframe """ num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if len(list_of_cols) == 0: warnings.warn("No Binning Performed - No numerical column(s) to transform") return idf if method_type not in ("equal_frequency", "equal_range"): raise TypeError("Invalid input for method_type") if bin_size < 2: raise TypeError("Invalid input for bin_size") if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") if pre_existing_model: df_model = spark.read.parquet(model_path + "/attribute_binning") bin_cutoffs = [] for i in list_of_cols: mapped_value = ( df_model.where(F.col("attribute") == i) .select("parameters") .rdd.flatMap(lambda x: x) .collect()[0] ) bin_cutoffs.append(mapped_value) else: if method_type == "equal_frequency": pctile_width = 1 / bin_size pctile_cutoff = [] for j in range(1, bin_size): pctile_cutoff.append(j * pctile_width) bin_cutoffs = idf.approxQuantile(list_of_cols, pctile_cutoff, 0.01) else: funs = [F.max, F.min] exprs = [f(F.col(c)) for f in funs for c in list_of_cols] list_result = idf.groupby().agg(*exprs).rdd.flatMap(lambda x: x).collect() bin_cutoffs = [] drop_col_process = [] for i in range(int(len(list_result) / 2)): bin_cutoff = [] max_val = list_result[i] min_val = list_result[i + int(len(list_result) / 2)] if not max_val and max_val != 0: drop_col_process.append(list_of_cols[i]) continue bin_width = (max_val - min_val) / bin_size for j in range(1, bin_size): bin_cutoff.append(min_val + j * bin_width) bin_cutoffs.append(bin_cutoff) if drop_col_process: warnings.warn( "Columns contains too much null values. 
Dropping " + ", ".join(drop_col_process) ) list_of_cols = list( set([e for e in list_of_cols if e not in drop_col_process]) ) if model_path != "NA": df_model = spark.createDataFrame( zip(list_of_cols, bin_cutoffs), schema=["attribute", "parameters"] ) df_model.write.parquet(model_path + "/attribute_binning", mode="overwrite") def bucket_label(value, index): if value is None: return None for i in range(0, len(bin_cutoffs[index])): if value <= bin_cutoffs[index][i]: if bin_dtype == "numerical": return i + 1 else: if i == 0: return "<= " + str(round(bin_cutoffs[index][i], 4)) else: return ( str(round(bin_cutoffs[index][i - 1], 4)) + "-" + str(round(bin_cutoffs[index][i], 4)) ) else: next if bin_dtype == "numerical": return len(bin_cutoffs[0]) + 1 else: return "> " + str(round(bin_cutoffs[index][len(bin_cutoffs[0]) - 1], 4)) if bin_dtype == "numerical": f_bucket_label = F.udf(bucket_label, T.IntegerType()) else: f_bucket_label = F.udf(bucket_label, T.StringType()) odf = idf for idx, i in enumerate(list_of_cols): odf = odf.withColumn(i + "_binned", f_bucket_label(F.col(i), F.lit(idx))) if output_mode == "replace": for col in list_of_cols: odf = odf.drop(col).withColumnRenamed(col + "_binned", col) if print_impact: if output_mode == "replace": output_cols = list_of_cols else: output_cols = [(i + "_binned") for i in list_of_cols] uniqueCount_computation(spark, odf, output_cols).show(len(output_cols), False) return odf def monotonic_binning( spark, idf, list_of_cols="all", drop_cols=[], label_col="label", event_label=1, bin_method="equal_range", bin_size=10, bin_dtype="numerical", output_mode="replace", ): """ This function constitutes supervised way of binning the numerical attribute into discrete (integer or categorical values) attribute. Instead of pre-defined fixed number of bins, number of bins are dynamically computed to ensure the monotonic nature of bins i.e. % event should increase or decrease with the bin. Monotonic nature of bins is evaluated by looking at spearman rank correlation, which should be either +1 or -1, between the bin index and % event. In case, the monotonic nature is not attained, user defined fixed number of bins are used for the binning. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) label_col Label/Target column (Default value = "label") event_label Value of (positive) event (i.e label 1) (Default value = 1) bin_method "equal_frequency", "equal_range". In "equal_range" method, each bin is of equal size/width and in "equal_frequency", each bin has equal no. of rows, though the width of bins may vary. 
(Default value = "equal_range") bin_size Default number of bins in case monotonicity is not achieved. bin_dtype "numerical", "categorical". With "numerical" option, original value is replaced with an Integer (1,2,…) and with "categorical" option, original replaced with a string describing min and max value allowed in the bin ("minval-maxval"). (Default value = "numerical") output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_binned" e.g. column X is appended as X_binned. (Default value = "replace") Returns ------- DataFrame Binned Dataframe """ num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list( set([e for e in list_of_cols if e not in (drop_cols + [label_col])]) ) if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") odf = idf for col in list_of_cols: n = 20 r = 0 while n > 2: tmp = ( attribute_binning( spark, idf, [col], drop_cols=[], method_type=bin_method, bin_size=n, output_mode="append", ) .select(label_col, col, col + "_binned") .withColumn( label_col, F.when(F.col(label_col) == event_label, 1).otherwise(0) ) .groupBy(col + "_binned") .agg(F.avg(col).alias("mean_val"), F.avg(label_col).alias("mean_label")) .dropna() ) r, p = stats.spearmanr( tmp.toPandas()[["mean_val"]], tmp.toPandas()[["mean_label"]] ) if r == 1.0: odf = attribute_binning( spark, odf, [col], drop_cols=[], method_type=bin_method, bin_size=n, bin_dtype=bin_dtype, output_mode=output_mode, ) break n = n - 1 r = 0 if r < 1.0: odf = attribute_binning( spark, odf, [col], drop_cols=[], method_type=bin_method, bin_size=bin_size, bin_dtype=bin_dtype, output_mode=output_mode, ) return odf def cat_to_num_transformer( spark, idf, list_of_cols, drop_cols, method_type, encoding, label_col, event_label ): """ This is method which helps converting a categorical attribute into numerical attribute(s) based on the analysis dataset. If there's a presence of label column then the relevant processing would happen through cat_to_num_supervised. However, for unsupervised scenario, the processing would happen through cat_to_num_unsupervised. Computation details can be referred from the respective functions. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of categorical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all categorical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. 
method_type Depending upon the use case the method type can be either Supervised or Unsupervised. For Supervised use case, label_col is mandatory. encoding "label_encoding" or "onehot_encoding" In label encoding, each categorical value is assigned a unique integer based on alphabetical or frequency ordering (both ascending & descending options are available that can be selected by index_order argument). In one-hot encoding, every unique value in the column will be added in a form of dummy/binary column. label_col Label/Target column event_label Value of (positive) event """ num_cols, cat_cols, other_cols = attributeType_segregation(idf) if len(cat_cols) > 0: if list_of_cols == "all": list_of_cols = cat_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] if any(x not in cat_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if (method_type == "supervised") & (label_col is not None): odf = cat_to_num_supervised( spark, idf, label_col=label_col, event_label=event_label ) odf = odf.withColumn( label_col, F.when(F.col(label_col) == event_label, F.lit(1)).otherwise(F.lit(0)), ) return odf elif (method_type == "unsupervised") & (label_col is None): odf = cat_to_num_unsupervised( spark, idf, method_type=encoding, index_order="frequencyDesc", ) return odf else: return idf def cat_to_num_unsupervised( spark, idf, list_of_cols="all", drop_cols=[], method_type="label_encoding", index_order="frequencyDesc", cardinality_threshold=50, pre_existing_model=False, model_path="NA", stats_unique={}, output_mode="replace", print_impact=False, ): """ This is unsupervised method of converting a categorical attribute into numerical attribute(s). This is among the most important transformations required for any modelling exercise, as most of the machine learning algorithms cannot process categorical values. It covers two popular encoding techniques – label encoding & one-hot encoding. In label encoding, each categorical value is assigned a unique integer based on alphabetical or frequency ordering (both ascending & descending options are available – can be selected by index_order argument). One of the pitfalls of using this technique is that the model may learn some spurious relationship, which doesn't exist or might not make any logical sense in the real world settings. In one-hot encoding, every unique value in the attribute will be added as a feature in a form of dummy/binary attribute. However, using this method on high cardinality attributes can further aggravate the dimensionality issue. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of categorical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all categorical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". 
It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) method_type "label_encoding" or "onehot_encoding" In label encoding, each categorical value is assigned a unique integer based on alphabetical or frequency ordering (both ascending & descending options are available that can be selected by index_order argument). In one-hot encoding, every unique value in the column will be added in a form of dummy/binary column. (Default value = 1) index_order "frequencyDesc", "frequencyAsc", "alphabetDesc", "alphabetAsc". Valid only for Label Encoding method_type. (Default value = "frequencyDesc") cardinality_threshold Defines threshold to skip columns with higher cardinality values from encoding - a warning is issued. (Default value = 50) pre_existing_model Boolean argument - True or False. True if encoding model exists already, False Otherwise. (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre existing model nor there is a need to save one. stats_unique Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on unique value count i.e. if measures_of_cardinality or uniqueCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {}) output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_index" for label encoding e.g. column X is appended as X_index, or a postfix "_{n}" for one hot encoding, n varies from 0 to unique value count e.g. column X is appended as X_0, X_1, X_2 (n = 3 i.e. no. of unique values for X). 
(Default value = "replace") print_impact True, False (Default value = False) This argument is to print out the change in schema (one hot encoding) or descriptive statistics (label encoding) Returns ------- DataFrame Encoded Dataframe """ cat_cols = attributeType_segregation(idf)[1] if list_of_cols == "all": list_of_cols = cat_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] if any(x not in cat_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if method_type not in ("onehot_encoding", "label_encoding"): raise TypeError("Invalid input for method_type") if index_order not in ( "frequencyDesc", "frequencyAsc", "alphabetDesc", "alphabetAsc", ): raise TypeError("Invalid input for Encoding Index Order") if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") if stats_unique == {}: skip_cols = ( uniqueCount_computation(spark, idf, list_of_cols) .where(F.col("unique_values") > cardinality_threshold) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) else: skip_cols = ( read_dataset(spark, **stats_unique) .where(F.col("unique_values") > cardinality_threshold) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) skip_cols = list( set([e for e in skip_cols if e in list_of_cols and e not in drop_cols]) ) if skip_cols: warnings.warn( "Columns dropped from encoding due to high cardinality: " + ",".join(skip_cols) ) list_of_cols = list( set([e for e in list_of_cols if e not in drop_cols + skip_cols]) ) if len(list_of_cols) == 0: warnings.warn("No Encoding Computation - No categorical column(s) to transform") return idf list_of_cols_vec = [] list_of_cols_idx = [] for i in list_of_cols: list_of_cols_vec.append(i + "_vec") list_of_cols_idx.append(i + "_index") odf_indexed = idf if version.parse(pyspark.__version__) < version.parse("3.0.0"): for idx, i in enumerate(list_of_cols): if pre_existing_model: indexerModel = StringIndexerModel.load( model_path + "/cat_to_num_unsupervised/indexer-model/" + i ) else: stringIndexer = StringIndexer( inputCol=i, outputCol=i + "_index", stringOrderType=index_order, handleInvalid="keep", ) indexerModel = stringIndexer.fit(idf.select(i)) if model_path != "NA": indexerModel.write().overwrite().save( model_path + "/cat_to_num_unsupervised/indexer-model/" + i ) odf_indexed = indexerModel.transform(odf_indexed) else: if pre_existing_model: indexerModel = StringIndexerModel.load( model_path + "/cat_to_num_unsupervised/indexer" ) else: stringIndexer = StringIndexer( inputCols=list_of_cols, outputCols=list_of_cols_idx, stringOrderType=index_order, handleInvalid="keep", ) indexerModel = stringIndexer.fit(odf_indexed) if model_path != "NA": indexerModel.write().overwrite().save( model_path + "/cat_to_num_unsupervised/indexer" ) odf_indexed = indexerModel.transform(odf_indexed) if method_type == "onehot_encoding": if pre_existing_model: encoder = OneHotEncoder.load( model_path + "/cat_to_num_unsupervised/encoder" ) else: encoder = OneHotEncoder( inputCols=list_of_cols_idx, outputCols=list_of_cols_vec, handleInvalid="keep", ) if model_path != "NA": encoder.write().overwrite().save( model_path + "/cat_to_num_unsupervised/encoder" ) odf = encoder.fit(odf_indexed).transform(odf_indexed) new_cols = [] odf_sample = odf.take(1) for i in list_of_cols: odf_schema = odf.schema uniq_cats = odf_sample[0].asDict()[i + "_vec"].size for j in range(0, uniq_cats): odf_schema = 
odf_schema.add( T.StructField(i + "_" + str(j), T.IntegerType()) ) new_cols.append(i + "_" + str(j)) odf = odf.rdd.map( lambda x: ( *x, *(DenseVector(x[i + "_vec"]).toArray().astype(int).tolist()), ) ).toDF(schema=odf_schema) if output_mode == "replace": odf = odf.drop(i, i + "_vec", i + "_index") else: odf = odf.drop(i + "_vec", i + "_index") else: odf = odf_indexed for i in list_of_cols: odf = odf.withColumn( i + "_index", F.when(F.col(i).isNull(), None).otherwise( F.col(i + "_index").cast(T.IntegerType()) ), ) if output_mode == "replace": for i in list_of_cols: odf = odf.drop(i).withColumnRenamed(i + "_index", i) odf = odf.select(idf.columns) if print_impact: if method_type == "label_encoding": if output_mode == "append": new_cols = [i + "_index" for i in list_of_cols] else: new_cols = list_of_cols print("Before") idf.select(list_of_cols).summary("count", "min", "max").show(3, False) print("After") odf.select(new_cols).summary("count", "min", "max").show(3, False) if method_type == "onehot_encoding": print("Before") idf.select(list_of_cols).printSchema() print("After") if output_mode == "append": odf.select(list_of_cols + new_cols).printSchema() else: odf.select(new_cols).printSchema() if skip_cols: print( "Columns dropped from encoding due to high cardinality: " + ",".join(skip_cols) ) return odf def cat_to_num_supervised( spark, idf, list_of_cols="all", drop_cols=[], label_col="label", event_label=1, pre_existing_model=False, model_path="NA", output_mode="replace", persist=False, persist_option=pyspark.StorageLevel.MEMORY_AND_DISK, print_impact=False, ): """ This is a supervised method to convert a categorical attribute into a numerical attribute. It takes a label/target column to indicate whether the event is positive or negative. For each column, the positive event rate for each categorical value is used as the encoded numerical value. For example, there are 3 distinct values in a categorical attribute X: X1, X2 and X3. Within the input dataframe, there are - 15 positive events and 5 negative events with X==X1; - 10 positive events and 40 negative events with X==X2; - 20 positive events and 20 negative events with X==X3. Thus, value X1 is mapped to 15/(15+5) = 0.75, value X2 is mapped to 10/(10+40) = 0.2 and value X3 is mapped to 20/(20+20) = 0.5. This mapping will be applied to all values in attribute X. This encoding method can be helpful in avoiding creating too many dummy variables which may cause dimensionality issue and it also works with categorical attributes without an order or rank. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of categorical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all categorical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. 
(Default value = []) label_col Label/Target column (Default value = "label") event_label Value of (positive) event (i.e label 1) (Default value = 1) pre_existing_model Boolean argument – True or False. True if model (original and mapped numerical value for each column) exists already, False Otherwise. (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" is used to save the model in "intermediate_data/" folder for optimization purpose. output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_encoded" e.g. column X is appended as X_encoded. (Default value = "replace") persist Boolean argument - True or False. This parameter is for optimization purpose. If True, repeatedly used dataframe will be persisted (StorageLevel can be specified in persist_option). We recommend setting this parameter as True if at least one of the following criteria is True: (1) The underlying data source is in csv format (2) The transformation will be applicable to most columns. (Default value = False) persist_option A pyspark.StorageLevel instance. This parameter is useful only when persist is True. (Default value = pyspark.StorageLevel.MEMORY_AND_DISK) print_impact True, False (Default value = False) This argument is to print out the descriptive statistics of encoded columns. Returns ------- DataFrame Encoded Dataframe """ cat_cols = attributeType_segregation(idf)[1] if list_of_cols == "all": list_of_cols = cat_cols elif isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list( set([e for e in list_of_cols if (e not in drop_cols) & (e != label_col)]) ) if any(x not in cat_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if len(list_of_cols) == 0: warnings.warn("No Categorical Encoding - No categorical column(s) to transform") return idf if label_col not in idf.columns: raise TypeError("Invalid input for Label Column") odf = idf label_col_bool = label_col + "_cat_to_num_sup_temp" idf = idf.withColumn( label_col_bool, F.when(F.col(label_col) == event_label, "1").otherwise("0"), ) if model_path == "NA": skip_if_error = True model_path = "intermediate_data" else: skip_if_error = False save_model = True if persist: idf = idf.persist(persist_option) for index, i in enumerate(list_of_cols): if pre_existing_model: df_tmp = spark.read.csv( model_path + "/cat_to_num_supervised/" + i, header=True, inferSchema=True, ) else: df_tmp = ( idf.select(i, label_col_bool) .groupBy(i) .pivot(label_col_bool) .count() .fillna(0) .withColumn( i + "_encoded", F.round(F.col("1") / (F.col("1") + F.col("0")), 4) ) .drop(*["1", "0"]) ) if save_model: try: df_tmp.coalesce(1).write.csv( model_path + "/cat_to_num_supervised/" + i, header=True, mode="overwrite", ignoreLeadingWhiteSpace=False, ignoreTrailingWhiteSpace=False, ) df_tmp = spark.read.csv( model_path + "/cat_to_num_supervised/" + i, header=True, inferSchema=True, ) except Exception as error: if skip_if_error: warnings.warn( "For optimization purpose, we recommend specifying a valid model_path value to save the intermediate data. Saving to the default path - '" + model_path + "/cat_to_num_supervised/" + i + "' faced an error." 
) save_model = False else: raise error if df_tmp.count() > 1: odf = odf.join(df_tmp, i, "left_outer") else: odf = odf.crossJoin(df_tmp) if output_mode == "replace": for i in list_of_cols: odf = odf.drop(i).withColumnRenamed(i + "_encoded", i) odf = odf.select([i for i in idf.columns if i != label_col_bool]) if print_impact: if output_mode == "replace": output_cols = list_of_cols else: output_cols = [(i + "_encoded") for i in list_of_cols] print("Before: ") idf.select(list_of_cols).summary("count", "min", "max").show(3, False) print("After: ") odf.select(output_cols).summary("count", "min", "max").show(3, False) if persist: idf.unpersist() return odf def z_standardization( spark, idf, list_of_cols="all", drop_cols=[], pre_existing_model=False, model_path="NA", output_mode="replace", print_impact=False, ): """ Standardization is commonly used in data pre-processing process. z_standardization standardizes the selected attributes of an input dataframe by normalizing each attribute to have standard deviation of 1 and mean of 0. For each attribute, the standard deviation (s) and mean (u) are calculated and a sample x will be standardized into ( x-u)/s. If the standard deviation of an attribute is 0, it will be excluded in standardization and a warning will be shown. None values will be kept as None in the output dataframe. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) pre_existing_model Boolean argument – True or False. True if model files (Mean/stddev for each feature) exists already, False Otherwise (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_scaled" e.g. column X is appended as X_scaled. (Default value = "replace") print_impact True, False (Default value = False) This argument is to print out the before and after descriptive statistics of rescaled columns. 
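Example
-------
A minimal sketch, assuming a SparkSession spark and a dataframe df with a numerical column "age" (both names are illustrative, not part of this API):

odf = z_standardization(spark, df, list_of_cols=["age"], output_mode="append")

This appends a column "age_scaled" holding (age - mean) / stddev for each row; null values remain null, and a column whose standard deviation is zero is skipped with a warning.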
Returns ------- DataFrame Rescaled Dataframe """ num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if len(list_of_cols) == 0: warnings.warn( "No Standardization Performed - No numerical column(s) to transform" ) return idf if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") parameters = [] excluded_cols = [] if pre_existing_model: df_model = spark.read.parquet(model_path + "/z_standardization") for i in list_of_cols: mapped_value = ( df_model.where(F.col("feature") == i) .select("parameters") .rdd.flatMap(lambda x: x) .collect()[0] ) parameters.append(mapped_value) else: for i in list_of_cols: mean, stddev = idf.select(F.mean(i), F.stddev(i)).first() parameters.append( [float(mean) if mean else None, float(stddev) if stddev else None] ) if stddev: if round(stddev, 5) == 0.0: excluded_cols.append(i) else: excluded_cols.append(i) if len(excluded_cols) > 0: warnings.warn( "The following column(s) are excluded from standardization because the standard deviation is zero:" + str(excluded_cols) ) odf = idf for index, i in enumerate(list_of_cols): if i not in excluded_cols: modify_col = (i + "_scaled") if (output_mode == "append") else i odf = odf.withColumn( modify_col, (F.col(i) - parameters[index][0]) / parameters[index][1] ) if (not pre_existing_model) & (model_path != "NA"): df_model = spark.createDataFrame( zip(list_of_cols, parameters), schema=["feature", "parameters"] ) df_model.coalesce(1).write.parquet( model_path + "/z_standardization", mode="overwrite" ) if print_impact: if output_mode == "replace": output_cols = list_of_cols else: output_cols = [ (i + "_scaled") for i in list_of_cols if i not in excluded_cols ] print("Before: ") idf.select(list_of_cols).describe().show(5, False) print("After: ") odf.select(output_cols).describe().show(5, False) return odf def IQR_standardization( spark, idf, list_of_cols="all", drop_cols=[], pre_existing_model=False, model_path="NA", output_mode="replace", print_impact=False, ): """ Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) pre_existing_model Boolean argument – True or False. 
True if model files (25/50/75 percentile for each feature) exists already, False Otherwise (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_scaled" e.g. column X is appended as X_scaled. (Default value = "replace") print_impact True, False (Default value = False) This argument is to print out the before and after descriptive statistics of rescaled columns. Returns ------- DataFrame Rescaled Dataframe """ num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if len(list_of_cols) == 0: warnings.warn( "No Standardization Performed - No numerical column(s) to transform" ) return idf if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") if pre_existing_model: df_model = spark.read.parquet(model_path + "/IQR_standardization") parameters = [] for i in list_of_cols: mapped_value = ( df_model.where(F.col("feature") == i) .select("parameters") .rdd.flatMap(lambda x: x) .collect()[0] ) parameters.append(mapped_value) else: parameters = idf.approxQuantile(list_of_cols, [0.25, 0.5, 0.75], 0.01) excluded_cols = [] for i, param in zip(list_of_cols, parameters): if len(param) > 0: if round(param[0], 5) == round(param[2], 5): excluded_cols.append(i) else: excluded_cols.append(i) if len(excluded_cols) > 0: warnings.warn( "The following column(s) are excluded from standardization because the 75th and 25th percentiles are the same:" + str(excluded_cols) ) odf = idf for index, i in enumerate(list_of_cols): if i not in excluded_cols: modify_col = (i + "_scaled") if (output_mode == "append") else i odf = odf.withColumn( modify_col, (F.col(i) - parameters[index][1]) / (parameters[index][2] - parameters[index][0]), ) if (not pre_existing_model) & (model_path != "NA"): df_model = spark.createDataFrame( zip(list_of_cols, parameters), schema=["feature", "parameters"] ) df_model.coalesce(1).write.parquet( model_path + "/IQR_standardization", mode="overwrite" ) if print_impact: if output_mode == "replace": output_cols = list_of_cols else: output_cols = [ (i + "_scaled") for i in list_of_cols if i not in excluded_cols ] print("Before: ") idf.select(list_of_cols).describe().show(5, False) print("After: ") odf.select(output_cols).describe().show(5, False) return odf def normalization( idf, list_of_cols="all", drop_cols=[], pre_existing_model=False, model_path="NA", output_mode="replace", print_impact=False, ): """ Parameters ---------- idf Input Dataframe list_of_cols List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. 
This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) pre_existing_model Boolean argument – True or False. True if normalization/scalar model exists already, False Otherwise (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_scaled" e.g. column X is appended as X_scaled. (Default value = "replace") print_impact True, False (Default value = False) This argument is to print out before and after descriptive statistics of rescaled columns. Returns ------- DataFrame Rescaled Dataframe """ num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if len(list_of_cols) == 0: warnings.warn( "No Normalization Performed - No numerical column(s) to transform" ) return idf if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") idf_id = idf.withColumn("tempID", F.monotonically_increasing_id()) idf_partial = idf_id.select(["tempID"] + list_of_cols) assembler = VectorAssembler( inputCols=list_of_cols, outputCol="list_of_cols_vector", handleInvalid="keep" ) assembled_data = assembler.transform(idf_partial) if pre_existing_model: scalerModel = MinMaxScalerModel.load(model_path + "/normalization") else: scaler = MinMaxScaler( inputCol="list_of_cols_vector", outputCol="list_of_cols_scaled" ) scalerModel = scaler.fit(assembled_data) if model_path != "NA": scalerModel.write().overwrite().save(model_path + "/normalization") scaledData = scalerModel.transform(assembled_data) def vector_to_array(v): return v.toArray().tolist() f_vector_to_array = F.udf(vector_to_array, T.ArrayType(T.FloatType())) odf_partial = scaledData.withColumn( "list_of_cols_array", f_vector_to_array("list_of_cols_scaled") ).drop(*["list_of_cols_scaled", "list_of_cols_vector"]) odf_schema = odf_partial.schema for i in list_of_cols: odf_schema = odf_schema.add(T.StructField(i + "_scaled", T.FloatType())) odf_partial = ( odf_partial.rdd.map(lambda x: (*x, *x["list_of_cols_array"])) .toDF(schema=odf_schema) .drop("list_of_cols_array") ) odf = idf_id.join(odf_partial.drop(*list_of_cols), "tempID", "left_outer").select( idf.columns + [ ( F.when(F.isnan(F.col(i + "_scaled")), None).otherwise( F.col(i + "_scaled") ) ).alias(i + "_scaled") 
for i in list_of_cols ] ) if output_mode == "replace": for i in list_of_cols: odf = odf.drop(i).withColumnRenamed(i + "_scaled", i) odf = odf.select(idf.columns) if print_impact: if output_mode == "replace": output_cols = list_of_cols else: output_cols = [(i + "_scaled") for i in list_of_cols] print("Before: ") idf.select(list_of_cols).describe().show(5, False) print("After: ") odf.select(output_cols).describe().show(5, False) return odf def imputation_MMM( spark, idf, list_of_cols="missing", drop_cols=[], method_type="median", pre_existing_model=False, model_path="NA", output_mode="replace", stats_missing={}, stats_mode={}, print_impact=False, ): """ This function handles missing value related issues by substituting null values by the measure of central tendency (mode for categorical features and mean/median for numerical features). For numerical attributes, it leverages [Imputer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Imputer .html) functionality of Spark MLlib. Though, Imputer can be used for categorical attributes but this feature is available only in Spark3.x, therefore for categorical features, we compute mode or leverage mode computation from Measures of Central Tendency. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of columns to impute e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all (non-array) columns for analysis. This is super useful instead of specifying all column names manually. "missing" (default) can be passed to include only those columns with missing values. One of the usecases where "all" may be preferable over "missing" is when the user wants to save the imputation model for the future use e.g. a column may not have missing value in the training dataset but missing values may possibly appear in the prediction dataset. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) method_type "median", "mean" (valid only for for numerical columns attributes). Mode is only option for categorical columns. (Default value = "median") pre_existing_model Boolean argument – True or False. True if imputation model exists already, False otherwise. (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. (Default value = "replace") stats_missing Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. 
if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {}) stats_mode Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on most frequently seen values i.e. if measures_of_centralTendency or mode_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {}) print_impact True, False (Default value = False) This argument is to print out before and after missing counts of imputed columns. Returns ------- DataFrame Imputed Dataframe """ if stats_missing == {}: missing_df = missingCount_computation(spark, idf) else: missing_df = read_dataset(spark, **stats_missing).select( "attribute", "missing_count", "missing_pct" ) missing_cols = ( missing_df.where(F.col("missing_count") > 0) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if str(pre_existing_model).lower() == "true": pre_existing_model = True elif str(pre_existing_model).lower() == "false": pre_existing_model = False else: raise TypeError("Non-Boolean input for pre_existing_model") if (len(missing_cols) == 0) & (not pre_existing_model) & (model_path == "NA"): return idf num_cols, cat_cols, other_cols = attributeType_segregation(idf) if list_of_cols == "all": list_of_cols = num_cols + cat_cols if list_of_cols == "missing": list_of_cols = [x for x in missing_cols if x in num_cols + cat_cols] if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if len(list_of_cols) == 0: warnings.warn("No Imputation performed- No column(s) to impute") return idf if any(x not in num_cols + cat_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if method_type not in ("mode", "mean", "median"): raise TypeError("Invalid input for method_type") if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") num_cols, cat_cols, other_cols = attributeType_segregation(idf.select(list_of_cols)) odf = idf if len(num_cols) > 0: recast_cols = [] recast_type = [] for i in num_cols: if get_dtype(idf, i) not in ("float", "double"): odf = odf.withColumn(i, F.col(i).cast(T.DoubleType())) recast_cols.append(i + "_imputed") recast_type.append(get_dtype(idf, i)) # For mode imputation if method_type == "mode": if stats_mode == {}: parameters = [ str( ( idf.select(i) .dropna() .groupby(i) .count() .orderBy("count", ascending=False) .first() or [None] )[0] ) for i in num_cols ] else: mode_df = read_dataset(spark, **stats_mode).replace("None", None) mode_df_cols = list(mode_df.select("attribute").toPandas()["attribute"]) parameters = [] for i in num_cols: if i not in mode_df_cols: parameters.append( str( ( idf.select(i) .dropna() .groupby(i) .count() .orderBy("count", ascending=False) .first() or [None] )[0] ) ) else: parameters.append( mode_df.where(F.col("attribute") == i) .select("mode") .rdd.flatMap(list) .collect()[0] ) for index, i in enumerate(num_cols): odf = odf.withColumn( i + "_imputed", F.when(F.col(i).isNull(), parameters[index]).otherwise(F.col(i)), ) else: # For mean, median imputation # Building new imputer model or uploading the existing model if pre_existing_model: imputerModel = ImputerModel.load( model_path + "/imputation_MMM/num_imputer-model" ) else: imputer = Imputer( strategy=method_type, inputCols=num_cols, 
outputCols=[(e + "_imputed") for e in num_cols], ) imputerModel = imputer.fit(odf) # Applying model # odf = recast_column(imputerModel.transform(odf), recast_cols, recast_type) odf = imputerModel.transform(odf) for i, j in zip(recast_cols, recast_type): odf = odf.withColumn(i, F.col(i).cast(j)) # Saving model if required if (not pre_existing_model) & (model_path != "NA"): imputerModel.write().overwrite().save( model_path + "/imputation_MMM/num_imputer-model" ) if len(cat_cols) > 0: if pre_existing_model: df_model = spark.read.csv( model_path + "/imputation_MMM/cat_imputer", header=True, inferSchema=True, ) parameters = [] for i in cat_cols: mapped_value = ( df_model.where(F.col("attribute") == i) .select("parameters") .rdd.flatMap(lambda x: x) .collect()[0] ) parameters.append(mapped_value) else: if stats_mode == {}: parameters = [ str( ( idf.select(i) .dropna() .groupby(i) .count() .orderBy("count", ascending=False) .first() or [None] )[0] ) for i in cat_cols ] else: mode_df = read_dataset(spark, **stats_mode).replace("None", None) parameters = [ mode_df.where(F.col("attribute") == i) .select("mode") .rdd.flatMap(list) .collect()[0] for i in cat_cols ] for index, i in enumerate(cat_cols): odf = odf.withColumn( i + "_imputed", F.when(F.col(i).isNull(), parameters[index]).otherwise(F.col(i)), ) if (not pre_existing_model) & (model_path != "NA"): df_model = spark.createDataFrame( zip(cat_cols, parameters), schema=["attribute", "parameters"] ) df_model.repartition(1).write.csv( model_path + "/imputation_MMM/cat_imputer", header=True, mode="overwrite", ) for i in num_cols + cat_cols: if i not in missing_cols: odf = odf.drop(i + "_imputed") elif output_mode == "replace": odf = odf.drop(i).withColumnRenamed(i + "_imputed", i) if print_impact: if output_mode == "replace": odf_print = missing_df.select( "attribute", F.col("missing_count").alias("missingCount_before") ).join( missingCount_computation(spark, odf, list_of_cols).select( "attribute", F.col("missing_count").alias("missingCount_after") ), "attribute", "inner", ) else: output_cols = [ (i + "_imputed") for i in [e for e in (num_cols + cat_cols) if e in missing_cols] ] odf_print = missing_df.select( "attribute", F.col("missing_count").alias("missingCount_before") ).join( missingCount_computation(spark, odf, output_cols) .withColumnRenamed("attribute", "attribute_after") .withColumn( "attribute", F.expr("substring(attribute_after, 1, length(attribute_after)-8)"), ) .drop("missing_pct"), "attribute", "inner", ) odf_print.show(len(list_of_cols), False) return odf def imputation_sklearn( spark, idf, list_of_cols="missing", drop_cols=[], missing_threshold=1.0, method_type="regression", use_sampling=True, sample_method="random", strata_cols="all", stratified_type="population", sample_size=10000, sample_seed=42, persist=True, persist_option=pyspark.StorageLevel.MEMORY_AND_DISK, pre_existing_model=False, model_path="NA", output_mode="replace", stats_missing={}, run_type="local", auth_key="NA", print_impact=False, ): """ The function "imputation_sklearn" leverages sklearn imputer algorithms. Two methods are supported via this function: “KNN” and “regression”. “KNN” option trains a sklearn.impute.KNNImputer which is based on k-Nearest Neighbors algorithm. The missing values of a sample are imputed using the mean of its 5 nearest neighbors in the training set. “regression” option trains a sklearn.impute.IterativeImputer which models attribute to impute as a function of rest of the attributes and imputes using the estimation. 
Imputation is performed in an iterative way from attributes with fewest number of missing values to most. All the hyperparameters used in the above mentioned imputers are their default values. However, sklearn imputers are not scalable, which might be slow if the size of the input dataframe is large. In fact, if the input dataframe size exceeds 10 GigaBytes, the model fitting step powered by sklearn might fail. Thus, an input sample_size (the default value is 10,000) can be set to control the number of samples to be used to train the imputer. If the total number of input dataset exceeds sample_size, the rest of the samples will be imputed using the trained imputer in a scalable manner. This is one of the way to demonstrate how Anovos has been designed as a scalable feature engineering library. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of columns to impute e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all (non-array) columns for analysis. This is super useful instead of specifying all column names manually. "missing" (default) can be passed to include only those columns with missing values. One of the usecases where "all" may be preferable over "missing" is when the user wants to save the imputation model for the future use e.g. a column may not have missing value in the training dataset but missing values may possibly appear in the prediction dataset. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) missing_threshold Float argument - If list_of_cols is "missing", this argument is used to determined the missing threshold for every column. The column that has more (count of missing value/ count of total value) >= missing_threshold will be excluded from the list of columns to be imputed. (Default value = 1.0) method_type "KNN", "regression". "KNN" option trains a sklearn.impute.KNNImputer. "regression" option trains a sklearn.impute.IterativeImputer (Default value = "regression") use_sampling Boolean argument - True or False. This argument is used to determine whether to use sampling on source and target dataset, True will enable the use of sample method, otherwise False. It is recommended to set this as True for large datasets. (Default value = True) sample_method If use_sampling is True, this argument is used to determine the sampling method. "stratified" for Stratified sampling, "random" for Random Sampling. For more details, please refer to https://docs.anovos.ai/api/data_ingest/data_sampling.html. (Default value = "random") strata_cols If use_sampling is True and sample_method is "stratified", this argument is used to determine the list of columns used to be treated as strata. For more details, please refer to https://docs.anovos.ai/api/data_ingest/data_sampling.html. 
(Default value = "all") stratified_type If use_sampling is True and sample_method is "stratified", this argument is used to determine the stratified sampling method. "population" stands for Proportionate Stratified Sampling, "balanced" stands for Optimum Stratified Sampling. For more details, please refer to https://docs.anovos.ai/api/data_ingest/data_sampling.html. (Default value = "population") sample_size If use_sampling is True, this argument is used to determine maximum rows for training the sklearn imputer (Default value = 10000) sample_seed If use_sampling is True, this argument is used to determine the seed of sampling method. (Default value = 42) persist Boolean argument - True or False. This argument is used to determine whether to persist on binning result of source and target dataset, True will enable the use of persist, otherwise False. It is recommended to set this as True for large datasets. (Default value = True) persist_option If persist is True, this argument is used to determine the type of persist. For all the pyspark.StorageLevel option available in persist, please refer to https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html (Default value = pyspark.StorageLevel.MEMORY_AND_DISK) pre_existing_model Boolean argument – True or False. True if imputation model exists already, False otherwise. (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. (Default value = "replace") stats_missing Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {}) run_type "local", "emr", "databricks", "ak8s" (Default value = "local") auth_key Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type. Default value is kept as "NA" print_impact True, False (Default value = False) This argument is to print out before and after missing counts of imputed columns. 
Returns ------- DataFrame Imputed Dataframe """ if persist: idf = idf.persist(persist_option) num_cols = attributeType_segregation(idf)[0] if stats_missing == {}: missing_df = missingCount_computation(spark, idf, num_cols) else: missing_df = ( read_dataset(spark, **stats_missing) .select("attribute", "missing_count", "missing_pct") .where(F.col("attribute").isin(num_cols)) ) empty_cols = ( missing_df.where(F.col("missing_pct") == 1.0) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if len(empty_cols) > 0: warnings.warn( "Following columns dropped from the imputation as all values are null: " + ",".join(empty_cols) ) missing_cols = ( missing_df.where(F.col("missing_count") > 0) .where(F.col("missing_pct") < missing_threshold) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if list_of_cols == "all": list_of_cols = num_cols if list_of_cols == "missing": list_of_cols = missing_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list( set([e for e in list_of_cols if (e not in drop_cols) & (e not in empty_cols)]) ) if len(list_of_cols) <= 1: warnings.warn( "No Imputation Performed - No Column(s) or Insufficient Column(s) to Impute" ) return idf if str(pre_existing_model).lower() == "true": pre_existing_model = True elif str(pre_existing_model).lower() == "false": pre_existing_model = False else: raise TypeError("Non-Boolean input for pre_existing_model") if ( (len([e for e in list_of_cols if e in missing_cols]) == 0) & (not pre_existing_model) & (model_path == "NA") ): warnings.warn( "No Imputation Performed - No Column(s) to Impute and No Imputation Model to be saved" ) return idf if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if method_type not in ("KNN", "regression"): raise TypeError("Invalid input for method_type") if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") if pre_existing_model: if run_type == "emr": bash_cmd = "aws s3 cp " + model_path + "/imputation_sklearn.sav ." output = subprocess.check_output(["bash", "-c", bash_cmd]) imputer = pickle.load(open("imputation_sklearn.sav", "rb")) elif run_type == "ak8s": bash_cmd = ( 'azcopy cp "' + model_path + "/imputation_sklearn.sav" + str(auth_key) + '" .' 
) output = subprocess.check_output(["bash", "-c", bash_cmd]) imputer = pickle.load(open("imputation_sklearn.sav", "rb")) else: imputer = pickle.load(open(model_path + "/imputation_sklearn.sav", "rb")) else: if use_sampling: count_idf = idf.count() if count_idf > sample_size: idf_model = data_sample( idf, strata_cols=strata_cols, fraction=sample_size / count_idf, method_type=sample_method, stratified_type=stratified_type, seed_value=sample_seed, ) else: idf_model = idf else: idf_model = idf if persist: idf_model = idf_model.persist(persist_option) idf_pd = idf_model.select(list_of_cols).toPandas() if method_type == "KNN": imputer = KNNImputer( n_neighbors=5, weights="uniform", metric="nan_euclidean" ) if method_type == "regression": imputer = IterativeImputer() imputer.fit(idf_pd) if (not pre_existing_model) & (model_path != "NA"): if run_type == "emr": pickle.dump(imputer, open("imputation_sklearn.sav", "wb")) bash_cmd = ( "aws s3 cp imputation_sklearn.sav " + model_path + "/imputation_sklearn.sav" ) output = subprocess.check_output(["bash", "-c", bash_cmd]) imputer = pickle.load(open("imputation_sklearn.sav", "rb")) elif run_type == "ak8s": pickle.dump(imputer, open("imputation_sklearn.sav", "wb")) bash_cmd = ( 'azcopy cp "imputation_sklearn.sav" "' + ends_with(model_path) + "imputation_sklearn.sav" + str(auth_key) + '"' ) output = subprocess.check_output(["bash", "-c", bash_cmd]) imputer = pickle.load(open("imputation_sklearn.sav", "rb")) else: local_path = model_path + "/imputation_sklearn.sav" os.makedirs(os.path.dirname(local_path), exist_ok=True) pickle.dump(imputer, open(local_path, "wb")) imputer = pickle.load( open(model_path + "/imputation_sklearn.sav", "rb") ) @F.pandas_udf(returnType=T.ArrayType(T.DoubleType())) def prediction(*cols): input_pdf = pd.concat(cols, axis=1) return pd.Series(row.tolist() for row in imputer.transform(input_pdf)) result_df = idf.withColumn("features", prediction(*list_of_cols)) if persist: result_df = result_df.persist(persist_option) odf_schema = result_df.schema for i in list_of_cols: odf_schema = odf_schema.add(T.StructField(i + "_imputed", T.FloatType())) odf = ( result_df.rdd.map(lambda x: (*x, *x["features"])) .toDF(schema=odf_schema) .drop("features") ) output_cols = [] for i in list_of_cols: if output_mode == "append": if i not in missing_cols: odf = odf.drop(i + "_imputed") else: output_cols.append(i + "_imputed") else: odf = odf.drop(i).withColumnRenamed(i + "_imputed", i) odf = odf.select(idf.columns + output_cols) if print_impact: if output_mode == "replace": odf_print = missing_df.select( "attribute", F.col("missing_count").alias("missingCount_before") ).join( missingCount_computation(spark, odf, list_of_cols).select( "attribute", F.col("missing_count").alias("missingCount_after") ), "attribute", "inner", ) else: odf_print = missing_df.select( "attribute", F.col("missing_count").alias("missingCount_before") ).join( missingCount_computation(spark, odf, output_cols) .withColumnRenamed("attribute", "attribute_after") .withColumn( "attribute", F.expr("substring(attribute_after, 1, length(attribute_after)-8)"), ) .drop("missing_pct"), "attribute", "inner", ) odf_print.show(len(list_of_cols), False) if persist: idf.unpersist() if not pre_existing_model: idf_model.unpersist() result_df.unpersist() return odf def imputation_matrixFactorization( spark, idf, list_of_cols="missing", drop_cols=[], id_col="", output_mode="replace", stats_missing={}, print_impact=False, ): """ imputation_matrixFactorization uses collaborative filtering technique to 
impute missing values. Collaborative filtering is commonly used in recommender systems to fill the missing user-item entries, and PySpark provides an implementation based on the alternating least squares (ALS) algorithm, which is used in this function. To fit our problem into the ALS model, each attribute is treated as an item and an id column needs to be specified by the user to generate the user-item pairs. In case the ID column doesn't exist in the dataset, a proxy ID column is implicitly generated by the function. Subsequently, all user-item pairs with known values are used to train the ALS model, and the trained model is then used to predict the user-item pairs with missing values. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of columns to impute e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all (non-array) columns for analysis. This is super useful instead of specifying all column names manually. "missing" (default) can be passed to include only those columns with missing values. One of the use cases where "all" may be preferable over "missing" is when the user wants to save the imputation model for future use e.g. a column may not have missing values in the training dataset but missing values may possibly appear in the prediction dataset. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a handful of them. (Default value = []) id_col ID column (Default value = "") output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option appends transformed column to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. (Default value = "replace") stats_missing Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {}) print_impact True, False (Default value = False) This argument is to print out before and after missing counts of imputed columns.
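A minimal call sketch, assuming a SparkSession spark and a dataframe df carrying a unique "customer_id" column (all names here are illustrative):

from anovos.data_transformer.transformers import imputation_matrixFactorization

df_imputed = imputation_matrixFactorization(
    spark,
    df,
    list_of_cols="missing",   # attributes with nulls become the "items" of the ALS model
    id_col="customer_id",     # acts as the "user"; leave "" to let the function add a proxy id
    output_mode="replace",
    print_impact=True,
)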
Returns ------- DataFrame Imputed Dataframe """ num_cols = attributeType_segregation(idf)[0] if stats_missing == {}: missing_df = missingCount_computation(spark, idf, num_cols) else: missing_df = ( read_dataset(spark, **stats_missing) .select("attribute", "missing_count", "missing_pct") .where(F.col("attribute").isin(num_cols)) ) empty_cols = ( missing_df.where(F.col("missing_pct") == 1.0) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if len(empty_cols) > 0: warnings.warn( "Following columns dropped from the imputation as all values are null: " + ",".join(empty_cols) ) missing_cols = ( missing_df.where(F.col("missing_count") > 0) .where(F.col("missing_pct") < 1.0) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if list_of_cols == "all": list_of_cols = num_cols if list_of_cols == "missing": list_of_cols = missing_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list( set( [ e for e in list_of_cols if (e not in drop_cols) & (e != id_col) & (e not in empty_cols) ] ) ) if (len(list_of_cols) == 0) | ( len([e for e in list_of_cols if e in missing_cols]) == 0 ): warnings.warn("No Imputation Performed - No Column(s) to Impute") return idf if len(list_of_cols) == 1: warnings.warn( "No Imputation Performed - Needs more than 1 column for matrix factorization" ) return idf if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") remove_id = False if id_col == "": idf = idf.withColumn("id", F.monotonically_increasing_id()).withColumn( "id", F.row_number().over(Window.orderBy("id")) ) id_col = "id" remove_id = True key_and_val = F.create_map( list(chain.from_iterable([[F.lit(c), F.col(c)] for c in list_of_cols])) ) df_flatten = idf.select(id_col, F.explode(key_and_val)).withColumn( "key", F.concat(F.col("key"), F.lit("_imputed")) ) id_type = get_dtype(idf, id_col) if id_type == "string": id_indexer = StringIndexer().setInputCol(id_col).setOutputCol("IDLabel") id_indexer_model = id_indexer.fit(df_flatten) df_flatten = id_indexer_model.transform(df_flatten).drop(id_col) else: df_flatten = df_flatten.withColumnRenamed(id_col, "IDLabel") indexer = StringIndexer().setInputCol("key").setOutputCol("keyLabel") indexer_model = indexer.fit(df_flatten) df_encoded = indexer_model.transform(df_flatten).drop("key") df_model = df_encoded.where(F.col("value").isNotNull()) df_test = df_encoded.where(F.col("value").isNull()) if ( df_model.select("IDLabel").distinct().count() < df_encoded.select("IDLabel").distinct().count() ): warnings.warn( "The returned odf may not be fully imputed because values for all list_of_cols are null for some IDs" ) als = ALS( maxIter=20, regParam=0.01, userCol="IDLabel", itemCol="keyLabel", ratingCol="value", coldStartStrategy="drop", ) model = als.fit(df_model) df_pred = ( model.transform(df_test).drop("value").withColumnRenamed("prediction", "value") ) df_encoded_pred = df_model.union(df_pred.select(df_model.columns)) if id_type == "string": IDlabelReverse = IndexToString().setInputCol("IDLabel").setOutputCol(id_col) df_encoded_pred = IDlabelReverse.transform(df_encoded_pred) else: df_encoded_pred = df_encoded_pred.withColumnRenamed("IDLabel", id_col) keylabelReverse = IndexToString().setInputCol("keyLabel").setOutputCol("key") odf_imputed = ( keylabelReverse.transform(df_encoded_pred) 
.groupBy(id_col) .pivot("key") .agg(F.first("value")) .select( [id_col] + [(i + "_imputed") for i in list_of_cols if i in missing_cols] ) ) odf = idf.join(odf_imputed, id_col, "left_outer") for i in list_of_cols: if i not in missing_cols: odf = odf.drop(i + "_imputed") elif output_mode == "replace": odf = odf.drop(i).withColumnRenamed(i + "_imputed", i) if remove_id: odf = odf.drop("id") if print_impact: if output_mode == "replace": odf_print = missing_df.select( "attribute", F.col("missing_count").alias("missingCount_before") ).join( missingCount_computation(spark, odf, list_of_cols).select( "attribute", F.col("missing_count").alias("missingCount_after") ), "attribute", "inner", ) else: output_cols = [ (i + "_imputed") for i in [e for e in list_of_cols if e in missing_cols] ] odf_print = missing_df.select( "attribute", F.col("missing_count").alias("missingCount_before") ).join( missingCount_computation(spark, odf, output_cols) .withColumnRenamed("attribute", "attribute_after") .withColumn( "attribute", F.expr("substring(attribute_after, 1, length(attribute_after)-8)"), ) .drop("missing_pct"), "attribute", "inner", ) odf_print.show(len(list_of_cols), False) return odf def auto_imputation( spark, idf, list_of_cols="missing", drop_cols=[], id_col="", null_pct=0.1, stats_missing={}, output_mode="replace", run_type="local", root_path="", auth_key="NA", print_impact=True, ): """ auto_imputation tests for 5 imputation methods using the other imputation functions provided in this module and returns the one with the best performance. The 5 methods are: (1) imputation_MMM with method_type="mean" (2) imputation_MMM with method_type="median" (3) imputation_sklearn with method_type="KNN" (4) imputation_sklearn with method_type="regression" (5) imputation_matrixFactorization Samples without missing values in attributes to impute are used for testing by removing some % of values and impute them again using the above 5 methods. RMSE/attribute_mean is used as the evaluation metric for each attribute to reduce the effect of unit difference among attributes. The final error of a method is calculated by the sum of ( RMSE/attribute_mean) for all numerical attributes to impute and the method with the least error will be selected. The above testing is only applicable for numerical attributes. If categorical attributes are included, they will be automatically imputed using imputation_MMM. In addition, if there is only one numerical attribute to impute, only method (1) and (2) will be tested because the rest of the methods require more than one column. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of columns to impute e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all (non-array) columns for analysis. This is super useful instead of specifying all column names manually. "missing" (default) can be passed to include only those columns with missing values. One of the usecases where "all" may be preferable over "missing" is when the user wants to save the imputation model for the future use e.g. a column may not have missing value in the training dataset but missing values may possibly appear in the prediction dataset. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. 
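To make the selection metric described above concrete, the toy computation below (all numbers are invented for illustration) sums RMSE divided by the absolute attribute mean over the attributes under test and picks the method with the smallest total:

import numpy as np

# Held-out true values per attribute and the values each candidate method imputed for them
true_vals = {
    "age": np.array([30.0, 42.0, 55.0]),
    "income": np.array([48.0, 61.0, 70.0]),
}
imputed = {
    "MMM-mean": {"age": np.array([42.3, 42.3, 42.3]), "income": np.array([59.7, 59.7, 59.7])},
    "regression": {"age": np.array([31.2, 40.8, 57.1]), "income": np.array([47.5, 62.4, 68.9])},
}

def method_error(preds):
    total = 0.0
    for col, truth in true_vals.items():
        rmse = np.sqrt(np.mean((preds[col] - truth) ** 2))
        total += rmse / abs(truth.mean())  # normalise so attributes with large units don't dominate
    return total

best_method = min(imputed, key=lambda m: method_error(imputed[m]))
print(best_method)  # the method with the smallest summed normalised RMSE is selected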
drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) id_col ID column (Default value = "") null_pct proportion of the valid input data to be replaced by None to form the test data (Default value = 0.1) stats_missing Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {}) output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. (Default value = "replace") run_type "local", "emr", "databricks", "ak8s" (Default value = "local") root_path This argument takes in a base folder path for writing out intermediate_data/ folder to. Default value is "" auth_key Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type. Default value is kept as "NA" print_impact True, False (Default value = False) This argument is to print out before and after missing counts of imputed columns. It also print the name of best performing imputation method along with RMSE details. Returns ------- DataFrame Imputed Dataframe """ if stats_missing == {}: missing_df = missingCount_computation(spark, idf) missing_df.write.parquet( root_path + "intermediate_data/imputation_comparison/missingCount_computation", mode="overwrite", ) stats_missing = { "file_path": root_path + "intermediate_data/imputation_comparison/missingCount_computation", "file_type": "parquet", } else: missing_df = read_dataset(spark, **stats_missing).select( "attribute", "missing_count", "missing_pct" ) empty_cols = ( missing_df.where(F.col("missing_pct") == 1.0) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if len(empty_cols) > 0: warnings.warn("Following columns have all null values: " + ",".join(empty_cols)) missing_cols = ( missing_df.where(F.col("missing_count") > 0) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if list_of_cols == "all": list_of_cols = idf.columns if list_of_cols == "missing": list_of_cols = missing_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list( set([e for e in list_of_cols if (e not in drop_cols) & (e != id_col)]) ) if any(x not in idf.columns for x in list_of_cols): raise TypeError("Invalid input for Column(s)") del_cols = [e for e in list_of_cols if e in empty_cols] odf_del = idf.drop(*del_cols) list_of_cols = [e for e in list_of_cols if e not in empty_cols] num_cols, cat_cols, other_cols = attributeType_segregation( odf_del.select(list_of_cols) ) missing_catcols = [e for e in cat_cols if e in missing_cols] missing_numcols = [e for e in num_cols if e in missing_cols] if missing_catcols: odf_imputed_cat = imputation_MMM( spark, odf_del, list_of_cols=missing_catcols, stats_missing=stats_missing ) else: odf_imputed_cat = odf_del if len(missing_numcols) == 0: warnings.warn( "No Imputation Performed for 
numerical columns - No Column(s) to Impute" ) return odf_imputed_cat idf_test = ( odf_imputed_cat.dropna(subset=missing_numcols) .withColumn("index", F.monotonically_increasing_id()) .withColumn("index", F.row_number().over(Window.orderBy("index"))) ) null_count = int(null_pct * idf_test.count()) idf_null = idf_test for i in missing_numcols: null_index = random.sample(range(idf_test.count()), null_count) idf_null = idf_null.withColumn( i, F.when(F.col("index").isin(null_index), None).otherwise(F.col(i)) ) idf_null.write.parquet( root_path + "intermediate_data/imputation_comparison/test_dataset", mode="overwrite", ) idf_null = spark.read.parquet( root_path + "intermediate_data/imputation_comparison/test_dataset" ) method1 = imputation_MMM( spark, idf_null, list_of_cols=missing_numcols, method_type="mean", stats_missing=stats_missing, output_mode=output_mode, ) method2 = imputation_MMM( spark, idf_null, list_of_cols=missing_numcols, method_type="median", stats_missing=stats_missing, output_mode=output_mode, ) valid_methods = [method1, method2] if len(num_cols) > 1: method3 = imputation_sklearn( spark, idf_null, list_of_cols=num_cols, method_type="KNN", stats_missing=stats_missing, output_mode=output_mode, auth_key=auth_key, ) method4 = imputation_sklearn( spark, idf_null, list_of_cols=num_cols, method_type="regression", stats_missing=stats_missing, output_mode=output_mode, auth_key=auth_key, ) method5 = imputation_matrixFactorization( spark, idf_null, list_of_cols=num_cols, id_col=id_col, stats_missing=stats_missing, output_mode=output_mode, ) valid_methods = [method1, method2, method3, method4, method5] nrmse_all = [] method_all = ["MMM-mean", "MMM-median", "KNN", "regression", "matrix_factorization"] for index, method in enumerate(valid_methods): nrmse = 0 for i in missing_numcols: pred_col = (i + "_imputed") if output_mode == "append" else i idf_joined = ( idf_test.select("index", F.col(i).alias("val")) .join( method.select("index", F.col(pred_col).alias("pred")), "index", "left_outer", ) .dropna() ) idf_joined = recast_column( idf=idf_joined, list_of_cols=["val", "pred"], list_of_dtypes=["double", "double"], ) pred_mean = float( method.select(F.mean(pred_col)).rdd.flatMap(lambda x: x).collect()[0] ) i_nrmse = ( RegressionEvaluator( metricName="rmse", labelCol="val", predictionCol="pred" ).evaluate(idf_joined) ) / abs(pred_mean) nrmse += i_nrmse nrmse_all.append(nrmse) min_index = nrmse_all.index(np.min(nrmse_all)) best_method = method_all[min_index] odf = valid_methods[min_index] if print_impact: print(list(zip(method_all, nrmse_all))) print("Best Imputation Method: ", best_method) return odf def autoencoder_latentFeatures( spark, idf, list_of_cols="all", drop_cols=[], reduction_params=0.5, sample_size=500000, epochs=100, batch_size=256, pre_existing_model=False, model_path="NA", standardization=True, standardization_configs={"pre_existing_model": False, "model_path": "NA"}, imputation=False, imputation_configs={"imputation_function": "imputation_MMM"}, stats_missing={}, output_mode="replace", run_type="local", root_path="", auth_key="NA", print_impact=False, ): """ Many machine learning models suffer from "the curse of dimensionality" when the number of features is too large. autoencoder_latentFeatures is able to reduce the dimensionality by compressing input attributes to a smaller number of latent features. To be more specific, it trains a neural network model using TensorFlow library. 
The neural network contains an encoder and a decoder, where the encoder learns to represent the input using smaller number of latent features controlled by the input *reduction_params* and the decoder learns to reproduce the input using the latent features. In the end, only the encoder will be kept and the latent features generated by the encoder will be added to the output dataframe. However, the neural network model is not trained in a scalable manner, which might not be able to handle large input dataframe. Thus, an input *sample_size* (the default value is 500,000) can be set to control the number of samples to be used to train the model. If the total number of samples exceeds *sample_size*, the rest of the samples will be predicted using the fitted encoder. Standardization is highly recommended if the input attributes are not of the same scale. Otherwise, the model might not converge smoothly. Inputs *standardization* and *standardization_configs* can be set accordingly to perform standardization within the function. In addition, if a sample contains missing values in the model input, the output values for its latent features will all be None. Thus data imputation is also recommended if missing values exist, which can be done within the function by setting inputs *imputation* and *imputation_configs*. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of numerical columns to encode e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) reduction_params Determines the number of encoded features in the result. If reduction_params < 1, int(reduction_params * <number of columns>) columns will be generated. Else, reduction_params columns will be generated. (Default value = 0.5) sample_size Maximum rows for training the autoencoder model using tensorflow. (Default value = 500000) epochs Number of epochs to train the tensorflow model. (Default value = 100) batch_size Number of samples per gradient update when fitting the tensorflow model. (Default value = 256) pre_existing_model Boolean argument – True or False. True if model exists already, False Otherwise (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. standardization Boolean argument – True or False. True, if the standardization required. (Default value = True) standardization_configs z_standardization function arguments in dictionary format. (Default value = {"pre_existing_model": False) imputation Boolean argument – True or False. 
True, if the imputation required. (Default value = False) imputation_configs Takes input in dictionary format. Imputation function name is provided with key "imputation_name". optional arguments pertaining to that imputation function can be provided with argument name as key. (Default value = {"imputation_function": "imputation_MMM"}) stats_missing Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {}) output_mode "replace", "append". “replace” option replaces original columns with transformed columns: latent_<col_index>. “append” option append transformed columns with format latent_<col_index> to the input dataset, e.g. latent_0, latent_1 will be appended if reduction_params=2. (Default value = "replace") run_type "local", "emr", "databricks", "ak8s" (Default value = "local") root_path This argument takes in a base folder path for writing out intermediate_data/ folder to. Default value is "" auth_key Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type. Default value is kept as "NA" print_impact True, False This argument is to print descriptive statistics of the latest features (Default value = False) Returns ------- DataFrame Dataframe with Latent Features """ if "arm64" in platform.version().lower(): warnings.warn( "This function is currently not supported for ARM64 - Mac M1 Machine" ) return idf num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if len(list_of_cols) == 0: warnings.warn("No Latent Features Generated - No Column(s) to Transform") return idf if stats_missing == {}: missing_df = missingCount_computation(spark, idf, list_of_cols) missing_df.write.parquet( root_path + "intermediate_data/autoencoder_latentFeatures/missingCount_computation", mode="overwrite", ) stats_missing = { "file_path": root_path + "intermediate_data/autoencoder_latentFeatures/missingCount_computation", "file_type": "parquet", } else: missing_df = ( read_dataset(spark, **stats_missing) .select("attribute", "missing_count", "missing_pct") .where(F.col("attribute").isin(list_of_cols)) ) empty_cols = ( missing_df.where(F.col("missing_pct") == 1.0) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if len(empty_cols) > 0: warnings.warn( "The following column(s) are excluded from dimensionality reduction as all values are null: " + ",".join(empty_cols) ) list_of_cols = [e for e in list_of_cols if e not in empty_cols] if standardization: idf_standardized = z_standardization( spark, idf, list_of_cols=list_of_cols, output_mode="append", **standardization_configs, ) list_of_cols_scaled = [ i + "_scaled" for i in list_of_cols if (i + "_scaled") in idf_standardized.columns ] else: idf_standardized = idf for i in list_of_cols: idf_standardized = idf_standardized.withColumn(i + "_scaled", F.col(i)) list_of_cols_scaled = [i + "_scaled" for i in list_of_cols] if imputation: all_functions = globals().copy() all_functions.update(locals()) f = 
all_functions.get(imputation_configs["imputation_function"]) args = copy.deepcopy(imputation_configs) args.pop("imputation_function", None) missing_df_scaled = ( read_dataset(spark, **stats_missing) .select("attribute", "missing_count", "missing_pct") .withColumn("attribute", F.concat(F.col("attribute"), F.lit("_scaled"))) ) missing_df_scaled.write.parquet( root_path + "intermediate_data/autoencoder_latentFeatures/missingCount_computation_scaled", mode="overwrite", ) stats_missing_scaled = { "file_path": root_path + "intermediate_data/autoencoder_latentFeatures/missingCount_computation_scaled", "file_type": "parquet", } idf_imputed = f( spark, idf_standardized, list_of_cols_scaled, stats_missing=stats_missing_scaled, **args, ) else: idf_imputed = idf_standardized.dropna(subset=list_of_cols_scaled) n_inputs = len(list_of_cols_scaled) if reduction_params < 1: n_bottleneck = int(reduction_params * n_inputs) else: n_bottleneck = int(reduction_params) if pre_existing_model: if run_type == "emr": bash_cmd = ( "aws s3 cp " + model_path + "/autoencoders_latentFeatures/encoder.h5 ." ) output = subprocess.check_output(["bash", "-c", bash_cmd]) bash_cmd = ( "aws s3 cp " + model_path + "/autoencoders_latentFeatures/model.h5 ." ) output = subprocess.check_output(["bash", "-c", bash_cmd]) encoder = load_model("encoder.h5") model = load_model("model.h5") elif run_type == "ak8s": bash_cmd = ( 'azcopy cp "' + model_path + "/autoencoders_latentFeatures/encoder.h5" + str(auth_key) + '" .' ) output = subprocess.check_output(["bash", "-c", bash_cmd]) bash_cmd = ( 'azcopy cp "' + model_path + "/autoencoders_latentFeatures/model.h5" + str(auth_key) + '" .' ) output = subprocess.check_output(["bash", "-c", bash_cmd]) encoder = load_model("encoder.h5") model = load_model("model.h5") else: encoder = load_model(model_path + "/autoencoders_latentFeatures/encoder.h5") model = load_model(model_path + "/autoencoders_latentFeatures/model.h5") else: idf_valid = idf_imputed.select(list_of_cols_scaled) idf_model = idf_valid.sample( False, min(1.0, float(sample_size) / idf_valid.count()), 0 ) idf_train = idf_model.sample(False, 0.8, 0) idf_test = idf_model.subtract(idf_train) X_train = idf_train.toPandas() X_test = idf_test.toPandas() visible = Input(shape=(n_inputs,)) e = Dense(n_inputs * 2)(visible) e = BatchNormalization()(e) e = LeakyReLU()(e) e = Dense(n_inputs)(e) e = BatchNormalization()(e) e = LeakyReLU()(e) bottleneck = Dense(n_bottleneck)(e) d = Dense(n_inputs)(bottleneck) d = BatchNormalization()(d) d = LeakyReLU()(d) d = Dense(n_inputs * 2)(d) d = BatchNormalization()(d) d = LeakyReLU()(d) output = Dense(n_inputs, activation="linear")(d) model = Model(inputs=visible, outputs=output) encoder = Model(inputs=visible, outputs=bottleneck) model.compile(optimizer="adam", loss="mse") history = model.fit( X_train, X_train, epochs=int(epochs), batch_size=int(batch_size), verbose=2, validation_data=(X_test, X_test), ) if (not pre_existing_model) & (model_path != "NA"): if run_type == "emr": encoder.save("encoder.h5") model.save("model.h5") bash_cmd = ( "aws s3 cp encoder.h5 " + model_path + "/autoencoders_latentFeatures/encoder.h5" ) output = subprocess.check_output(["bash", "-c", bash_cmd]) bash_cmd = ( "aws s3 cp model.h5 " + model_path + "/autoencoders_latentFeatures/model.h5" ) output = subprocess.check_output(["bash", "-c", bash_cmd]) elif run_type == "ak8s": encoder.save("encoder.h5") model.save("model.h5") bash_cmd = ( 'azcopy cp "encoder.h5" "' + model_path + "/autoencoders_latentFeatures/encoder.h5" + 
str(auth_key) + '" ' ) output = subprocess.check_output(["bash", "-c", bash_cmd]) bash_cmd = ( 'azcopy cp "model.h5" "' + model_path + "/autoencoders_latentFeatures/model.h5" + str(auth_key) + '" ' ) output = subprocess.check_output(["bash", "-c", bash_cmd]) else: if not os.path.exists(model_path + "/autoencoders_latentFeatures/"): os.makedirs(model_path + "/autoencoders_latentFeatures/") encoder.save(model_path + "/autoencoders_latentFeatures/encoder.h5") model.save(model_path + "/autoencoders_latentFeatures/model.h5") class ModelWrapperPickable: def __init__(self, model): self.model = model def __getstate__(self): model_str = "" with tempfile.NamedTemporaryFile(suffix=".hdf5", delete=True) as fd: tensorflow.keras.models.save_model(self.model, fd.name, overwrite=True) model_str = fd.read() d = {"model_str": model_str} return d def __setstate__(self, state): with tempfile.NamedTemporaryFile(suffix=".hdf5", delete=True) as fd: fd.write(state["model_str"]) fd.flush() self.model = tensorflow.keras.models.load_model(fd.name) model_wrapper = ModelWrapperPickable(encoder) def compute_output_pandas_udf(model_wrapper): @F.pandas_udf(returnType=T.ArrayType(T.DoubleType())) def predict_pandas_udf(*cols): X = pd.concat(cols, axis=1) return pd.Series(row.tolist() for row in model_wrapper.model.predict(X)) return predict_pandas_udf odf = idf_imputed.withColumn( "predicted_output", compute_output_pandas_udf(model_wrapper)(*list_of_cols_scaled), ) odf_schema = odf.schema for i in range(0, n_bottleneck): odf_schema = odf_schema.add(T.StructField("latent_" + str(i), T.FloatType())) odf = ( odf.rdd.map(lambda x: (*x, *x["predicted_output"])) .toDF(schema=odf_schema) .drop("predicted_output") ) odf = odf.drop(*list_of_cols_scaled) if output_mode == "replace": odf = odf.drop(*list_of_cols) if print_impact: output_cols = ["latent_" + str(j) for j in range(0, n_bottleneck)] odf.select(output_cols).describe().show(5, False) return odf def PCA_latentFeatures( spark, idf, list_of_cols="all", drop_cols=[], explained_variance_cutoff=0.95, pre_existing_model=False, model_path="NA", standardization=True, standardization_configs={"pre_existing_model": False, "model_path": "NA"}, imputation=False, imputation_configs={"imputation_function": "imputation_MMM"}, stats_missing={}, output_mode="replace", run_type="local", root_path="", auth_key="NA", print_impact=False, ): """ Similar to autoencoder_latentFeatures, PCA_latentFeatures also generates latent features which reduces the dimensionality of the input dataframe but through a different technique: Principal Component Analysis (PCA). PCA algorithm produces principal components such that it can describe most of the remaining variance and all the principal components are orthogonal to each other. The final number of generated principal components is controlled by the input *explained_variance_cutoff*. In other words, the number of selected principal components, k, is the minimum value such that top k components (latent features) can explain at least *explained_variance_cutoff* of the total variance. Standardization is highly recommended if the input attributes are not of the same scale. Otherwise the generated latent features might be dominated by the attributes with larger variance. Inputs *standardization* and *standardization_configs* can be set accordingly to perform standardization within the function. In addition, data imputation is also recommended if missing values exist, which can be done within the function by setting inputs *imputation* and *imputation_configs*. 
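A minimal call sketch is given below, assuming an active SparkSession spark and a dataframe df with numerical columns; the model path and settings are illustrative only:

from anovos.data_transformer.transformers import PCA_latentFeatures

df_latent = PCA_latentFeatures(
    spark,
    df,
    list_of_cols="all",
    explained_variance_cutoff=0.95,    # keep the smallest k components explaining >= 95% variance
    standardization=True,              # recommended when attributes are on different scales
    imputation=True,                   # rows with nulls would otherwise be dropped before fitting
    imputation_configs={"imputation_function": "imputation_MMM"},
    model_path="/tmp/anovos_models",   # persists the fitted PCA model for reuse
    output_mode="replace",             # original columns are replaced by latent_<i> features
    print_impact=True,
)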
Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of numerical columns to encode e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) explained_variance_cutoff Determines the number of latent columns in the output. If N is the smallest integer such that top N latent columns explain more than explained_variance_cutoff variance, these N columns will be selected. (Default value = 0.95) pre_existing_model Boolean argument – True or False. True if model exists already, False Otherwise (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. standardization Boolean argument – True or False. True, if the standardization required. (Default value = True) standardization_configs z_standardization function arguments in dictionary format. (Default value = {"pre_existing_model": False) imputation Boolean argument – True or False. True, if the imputation required. (Default value = False) imputation_configs Takes input in dictionary format. Imputation function name is provided with key "imputation_name". optional arguments pertaining to that imputation function can be provided with argument name as key. (Default value = {"imputation_function": "imputation_MMM"}) stats_missing Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {}) output_mode "replace", "append". “replace” option replaces original columns with transformed columns: latent_<col_index>. “append” option append transformed columns with format latent_<col_index> to the input dataset, e.g. latent_0, latent_1. (Default value = "replace") run_type "local", "emr", "databricks", "ak8s" (Default value = "local") root_path This argument takes in a base folder path for writing out intermediate_data/ folder to. Default value is "" auth_key Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type. 
Default value is kept as "NA" print_impact True, False This argument is to print descriptive statistics of the latest features (Default value = False) Returns ------- DataFrame Dataframe with Latent Features """ num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if len(list_of_cols) == 0: warnings.warn("No Latent Features Generated - No Column(s) to Transform") return idf if stats_missing == {}: missing_df = missingCount_computation(spark, idf, list_of_cols) missing_df.write.parquet( root_path + "intermediate_data/PCA_latentFeatures/missingCount_computation", mode="overwrite", ) stats_missing = { "file_path": root_path + "intermediate_data/PCA_latentFeatures/missingCount_computation", "file_type": "parquet", } else: missing_df = ( read_dataset(spark, **stats_missing) .select("attribute", "missing_count", "missing_pct") .where(F.col("attribute").isin(list_of_cols)) ) empty_cols = ( missing_df.where(F.col("missing_pct") == 1.0) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if len(empty_cols) > 0: warnings.warn( "The following column(s) are excluded from dimensionality reduction as all values are null: " + ",".join(empty_cols) ) list_of_cols = [e for e in list_of_cols if e not in empty_cols] if standardization: idf_standardized = z_standardization( spark, idf, list_of_cols=list_of_cols, output_mode="append", **standardization_configs, ) list_of_cols_scaled = [ i + "_scaled" for i in list_of_cols if (i + "_scaled") in idf_standardized.columns ] else: idf_standardized = idf for i in list_of_cols: idf_standardized = idf_standardized.withColumn(i + "_scaled", F.col(i)) list_of_cols_scaled = [i + "_scaled" for i in list_of_cols] if imputation: all_functions = globals().copy() all_functions.update(locals()) f = all_functions.get(imputation_configs["imputation_function"]) args = copy.deepcopy(imputation_configs) args.pop("imputation_function", None) missing_df_scaled = ( read_dataset(spark, **stats_missing) .select("attribute", "missing_count", "missing_pct") .withColumn("attribute", F.concat(F.col("attribute"), F.lit("_scaled"))) ) missing_df_scaled.write.parquet( root_path + "intermediate_data/PCA_latentFeatures/missingCount_computation_scaled", mode="overwrite", ) stats_missing_scaled = { "file_path": root_path + "intermediate_data/PCA_latentFeatures/missingCount_computation_scaled", "file_type": "parquet", } idf_imputed = f( spark, idf_standardized, list_of_cols_scaled, stats_missing=stats_missing_scaled, **args, ) else: idf_imputed = idf_standardized.dropna(subset=list_of_cols_scaled) idf_imputed.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count() assembler = VectorAssembler(inputCols=list_of_cols_scaled, outputCol="features") assembled_data = assembler.transform(idf_imputed).drop(*list_of_cols_scaled) if pre_existing_model: pca = PCA.load(model_path + "/PCA_latentFeatures/pca_path") pcaModel = PCAModel.load(model_path + "/PCA_latentFeatures/pcaModel_path") n = pca.getK() else: pca = PCA( k=len(list_of_cols_scaled), inputCol="features", outputCol="features_pca" ) pcaModel = pca.fit(assembled_data) explained_variance = 0 for n in range(1, len(list_of_cols) + 1): explained_variance += 
pcaModel.explainedVariance[n - 1] if explained_variance > explained_variance_cutoff: break pca = PCA(k=n, inputCol="features", outputCol="features_pca") pcaModel = pca.fit(assembled_data) if (not pre_existing_model) & (model_path != "NA"): pcaModel.write().overwrite().save( model_path + "/PCA_latentFeatures/pcaModel_path" ) pca.write().overwrite().save(model_path + "/PCA_latentFeatures/pca_path") def vector_to_array(v): return v.toArray().tolist() f_vector_to_array = F.udf(vector_to_array, T.ArrayType(T.FloatType())) odf = ( pcaModel.transform(assembled_data) .withColumn("features_pca_array", f_vector_to_array("features_pca")) .drop(*["features", "features_pca"]) ) odf_schema = odf.schema for i in range(0, n): odf_schema = odf_schema.add(T.StructField("latent_" + str(i), T.FloatType())) odf = ( odf.rdd.map(lambda x: (*x, *x["features_pca_array"])) .toDF(schema=odf_schema) .drop("features_pca_array") .replace(float("nan"), None, subset=["latent_" + str(j) for j in range(0, n)]) ) if output_mode == "replace": odf = odf.drop(*list_of_cols) if print_impact: print("Explained Variance: ", round(np.sum(pcaModel.explainedVariance[0:n]), 4)) output_cols = ["latent_" + str(j) for j in range(0, n)] odf.select(output_cols).describe().show(5, False) return odf def feature_transformation( idf, list_of_cols="all", drop_cols=[], method_type="sqrt", N=None, output_mode="replace", print_impact=False, ): """ As the name indicates, feature_transformation performs mathematical transformation over selected attributes. The following methods are supported for an input attribute x: ln(x), log10(x), log2(x), e^x, 2^x, 10^x, N^x, square and cube root of x, x^2, x^3, x^N, trigonometric transformations of x (sin, cos, tan, asin, acos, atan), radians, x%N, x!, 1/x, floor and ceiling of x and x rounded to N decimal places. Some transformations only work with positive or non-negative input values such as log and square root and an error will be returned if violated. Parameters ---------- idf Input Dataframe list_of_cols List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) method_type "ln", "log10", "log2", "exp", "powOf2" (2^x), "powOf10" (10^x), "powOfN" (N^x), "sqrt" (square root), "cbrt" (cube root), "sq" (square), "cb" (cube), "toPowerN" (x^N), "sin", "cos", "tan", "asin", "acos", "atan", "radians", "remainderDivByN" (x%N), "factorial" (x!), "mul_inv" (1/x), "floor", "ceil", "roundN" (round to N decimal places) (Default value = "sqrt") N None by default. If method_type is "powOfN", "toPowerN", "remainderDivByN" or "roundN", N will be used as the required constant. output_mode "replace", "append". “replace” option replaces original columns with transformed columns. 
“append” option append transformed columns with a postfix (E.g. "_ln", "_powOf<N>") to the input dataset. (Default value = "replace") print_impact True, False This argument is to print before and after descriptive statistics of the transformed features (Default value = False) Returns ------- DataFrame Transformed Dataframe """ num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if (len(list_of_cols) == 0) | (any(x not in num_cols for x in list_of_cols)): raise TypeError("Invalid input for Column(s)") if method_type not in [ "ln", "log10", "log2", "exp", "powOf2", "powOf10", "powOfN", "sqrt", "cbrt", "sq", "cb", "toPowerN", "sin", "cos", "tan", "asin", "acos", "atan", "radians", "remainderDivByN", "factorial", "mul_inv", "floor", "ceil", "roundN", ]: raise TypeError("Invalid input method_type") num_cols = attributeType_segregation(idf.select(list_of_cols))[0] list_of_cols = num_cols odf = idf transformation_function = { "ln": F.log, "log10": F.log10, "log2": F.log2, "exp": F.exp, "powOf2": (lambda x: F.pow(2.0, x)), "powOf10": (lambda x: F.pow(10.0, x)), "powOfN": (lambda x: F.pow(N, x)), "sqrt": F.sqrt, "cbrt": F.cbrt, "sq": (lambda x: x**2), "cb": (lambda x: x**3), "toPowerN": (lambda x: x**N), "sin": F.sin, "cos": F.cos, "tan": F.tan, "asin": F.asin, "acos": F.acos, "atan": F.atan, "radians": F.radians, "remainderDivByN": (lambda x: x % F.lit(N)), "factorial": F.factorial, "mul_inv": (lambda x: F.lit(1) / x), "floor": F.floor, "ceil": F.ceil, "roundN": (lambda x: F.round(x, N)), } def get_col_name(i): if output_mode == "replace": return i else: if method_type in ["powOfN", "toPowerN", "remainderDivByN", "roundN"]: return i + "_" + method_type[:-1] + str(N) else: return i + "_" + method_type output_cols = [] for i in list_of_cols: modify_col = get_col_name(i) odf = odf.withColumn(modify_col, transformation_function[method_type](F.col(i))) output_cols.append(modify_col) if print_impact: print("Before:") idf.select(list_of_cols).describe().show(5, False) print("After:") odf.select(output_cols).describe().show(5, False) return odf def boxcox_transformation( idf, list_of_cols="all", drop_cols=[], boxcox_lambda=None, output_mode="replace", print_impact=False, ): """ Some machine learning algorithms require the input data to follow normal distributions. Thus, when the input data is too skewed, boxcox_transformation can be used to transform it into a more normal-like distribution. The transformed value of a sample x depends on a coefficient lambda: (1) if lambda = 0, x is transformed into log(x); (2) if lambda !=0, x is transformed into (x^lambda-1)/lambda. The value of lambda can be either specified by the user or automatically selected within the function. If lambda needs to be selected within the function, a range of values (0, 1, -1, 0.5, -0.5, 2, -2, 0.25, -0.25, 3, -3, 4, -4, 5, -5) will be tested and the lambda, which optimizes the KolmogorovSmirnovTest by PySpark with the theoretical distribution being normal, will be used to perform the transformation. Different lambda values can be assigned to different attributes but one attribute can only be assigned one lambda value. Parameters ---------- idf Input Dataframe list_of_cols List of numerical columns to transform e.g., ["col1","col2"]. 
Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) boxcox_lambda Lambda value for box_cox transormation. If boxcox_lambda is not None, it will be directly used for the transformation. It can be a (1) list: each element represents a lambda value for an attribute and the length of the list must be the same as the number of columns to transform. (2) int/float: all attributes will be assigned the same lambda value. Else, search for the best lambda among [1,-1,0.5,-0.5,2,-2,0.25,-0.25,3,-3,4,-4,5,-5] for each column and apply the transformation (Default value = None) output_mode "replace", "append". “replace” option replaces original columns with transformed columns. “append” option append transformed columns with a postfix "_bxcx_<lambda>" to the input dataset. (Default value = "replace") print_impact True, False This argument is to print before and after descriptive statistics of the transformed features (Default value = False) Returns ------- DataFrame Transformed Dataframe """ num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if (len(list_of_cols) == 0) | (any(x not in num_cols for x in list_of_cols)): raise TypeError("Invalid input for Column(s)") num_cols = attributeType_segregation(idf.select(list_of_cols))[0] list_of_cols = num_cols odf = idf col_mins = idf.select([F.min(i) for i in list_of_cols]) if any([i <= 0 for i in col_mins.rdd.flatMap(lambda x: x).collect()]): col_mins.show(1, False) raise ValueError("Data must be positive") if boxcox_lambda is not None: if isinstance(boxcox_lambda, (list, tuple)): if len(boxcox_lambda) != len(list_of_cols): raise TypeError("Invalid input for boxcox_lambda") elif not all([isinstance(l, (float, int)) for l in boxcox_lambda]): raise TypeError("Invalid input for boxcox_lambda") else: boxcox_lambda_list = list(boxcox_lambda) elif isinstance(boxcox_lambda, (float, int)): boxcox_lambda_list = [boxcox_lambda] * len(list_of_cols) else: raise TypeError("Invalid input for boxcox_lambda") else: boxcox_lambda_list = [] for i in list_of_cols: lambdaVal = [1, -1, 0.5, -0.5, 2, -2, 0.25, -0.25, 3, -3, 4, -4, 5, -5] best_pVal = 0 for j in lambdaVal: pVal = Statistics.kolmogorovSmirnovTest( odf.select(F.pow(F.col(i), j)).rdd.flatMap(lambda x: x), "norm" ).pValue if pVal > best_pVal: best_pVal = pVal best_lambdaVal = j pVal = Statistics.kolmogorovSmirnovTest( odf.select(F.log(F.col(i))).rdd.flatMap(lambda x: x), "norm" ).pValue if pVal > best_pVal: best_pVal = pVal best_lambdaVal = 
0 boxcox_lambda_list.append(best_lambdaVal) output_cols = [] for i, curr_lambdaVal in zip(list_of_cols, boxcox_lambda_list): if curr_lambdaVal != 1: modify_col = ( (i + "_bxcx_" + str(curr_lambdaVal)) if output_mode == "append" else i ) output_cols.append(modify_col) if curr_lambdaVal == 0: odf = odf.withColumn(modify_col, F.log(F.col(i))) else: odf = odf.withColumn(modify_col, F.pow(F.col(i), curr_lambdaVal)) if len(output_cols) == 0: warnings.warn( "lambdaVal for all columns are 1 so no transformation is performed and idf is returned" ) return idf if print_impact: print("Transformed Columns: ", list_of_cols) print("Best BoxCox Parameter(s): ", boxcox_lambda_list) print("Before:") idf.select(list_of_cols).describe().unionByName( idf.select([F.skewness(i).alias(i) for i in list_of_cols]).withColumn( "summary", F.lit("skewness") ) ).show(6, False) print("After:") if output_mode == "replace": odf.select(list_of_cols).describe().unionByName( odf.select([F.skewness(i).alias(i) for i in list_of_cols]).withColumn( "summary", F.lit("skewness") ) ).show(6, False) else: output_cols = [("`" + i + "`") for i in output_cols] odf.select(output_cols).describe().unionByName( odf.select( [F.skewness(i).alias(i[1:-1]) for i in output_cols] ).withColumn("summary", F.lit("skewness")) ).show(6, False) return odf def outlier_categories( spark, idf, list_of_cols="all", drop_cols=[], coverage=1.0, max_category=50, pre_existing_model=False, model_path="NA", output_mode="replace", print_impact=False, ): """ This function replaces less frequently seen values (called as outlier values in the current context) in a categorical column by 'outlier_categories'. Outlier values can be defined in two ways – a) Max N categories, where N is user defined value. In this method, top N-1 frequently seen categories are considered and rest are clubbed under single category 'outlier_categories'. or Alternatively, b) Coverage – top frequently seen categories are considered till it covers minimum N% of rows and rest lesser seen values are mapped to 'outlier_categories'. Even if the Coverage is less, maximum category constraint is given priority. Further, there is a caveat that when multiple categories have same rank. Then, number of categorical values can be more than max_category defined by the user. This function performs better when distinct values, in any column, are not more than 100. It is recommended that to drop those columns from the computation which has more than 100 distinct values, to get better performance out of this function. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of categorical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all categorical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. 
(Default value = []) coverage Defines the minimum % of rows that will be mapped to actual category name and the rest to be mapped to 'outlier_categories' and takes value between 0 to 1. Coverage of 0.8 can be interpreted as top frequently seen categories are considered till it covers minimum 80% of rows and rest lesser seen values are mapped to 'outlier_categories'. (Default value = 1.0) max_category Even if coverage is less, only (max_category - 1) categories will be mapped to actual name and rest to 'outlier_categories'. Caveat is when multiple categories have same rank, then #categories can be more than max_category. (Default value = 50) pre_existing_model Boolean argument – True or False. True if the model with the outlier/other values for each attribute exists already to be used, False Otherwise. (Default value = False) model_path If pre_existing_model is True, this argument is path for the pre-saved model. If pre_existing_model is False, this field can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_outliered" e.g. column X is appended as X_outliered. (Default value = "replace") print_impact True, False This argument is to print before and after unique count of the transformed features (Default value = False) Returns ------- DataFrame Transformed Dataframe """ cat_cols = attributeType_segregation(idf)[1] if list_of_cols == "all": list_of_cols = cat_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if any(x not in cat_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if len(list_of_cols) == 0: warnings.warn( "No Outlier Categories Computation - No categorical column(s) to transform" ) return idf if (coverage <= 0) | (coverage > 1): raise TypeError("Invalid input for Coverage Value") if max_category < 2: raise TypeError("Invalid input for Maximum No. 
of Categories Allowed") if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") idf = idf.persist(pyspark.StorageLevel.MEMORY_AND_DISK) if pre_existing_model: df_model = spark.read.csv( model_path + "/outlier_categories", header=True, inferSchema=True ) else: for index, i in enumerate(list_of_cols): window = Window.partitionBy().orderBy(F.desc("count_pct")) df_cats = ( idf.select(i) .groupBy(i) .count() .dropna() .withColumn( "count_pct", F.col("count") / F.sum("count").over(Window.partitionBy()), ) .withColumn("rank", F.rank().over(window)) .withColumn( "cumu", F.sum("count_pct").over( window.rowsBetween(Window.unboundedPreceding, 0) ), ) .withColumn("lag_cumu", F.lag("cumu").over(window)) .fillna(0) .where(~((F.col("cumu") >= coverage) & (F.col("lag_cumu") >= coverage))) .where(F.col("rank") <= (max_category - 1)) .select(F.lit(i).alias("attribute"), F.col(i).alias("parameters")) ) if index == 0: df_model = df_cats else: df_model = df_model.union(df_cats) df_params = df_model.rdd.groupByKey().mapValues(list).collect() dict_params = dict(df_params) broadcast_params = spark.sparkContext.broadcast(dict_params) def get_params(key): parameters = list() dict_params = broadcast_params.value params = dict_params.get(key) if params: parameters = params return parameters odf = idf for i in list_of_cols: parameters = get_params(i) if output_mode == "replace": odf = odf.withColumn( i, F.when( (F.col(i).isin(parameters)) | (F.col(i).isNull()), F.col(i) ).otherwise("outlier_categories"), ) else: odf = odf.withColumn( i + "_outliered", F.when( (F.col(i).isin(parameters)) | (F.col(i).isNull()), F.col(i) ).otherwise("outlier_categories"), ) if (not pre_existing_model) & (model_path != "NA"): df_model.repartition(1).write.csv( model_path + "/outlier_categories", header=True, mode="overwrite" ) if print_impact: if output_mode == "replace": output_cols = list_of_cols else: output_cols = [(i + "_outliered") for i in list_of_cols] uniqueCount_computation(spark, idf, list_of_cols).select( "attribute", F.col("unique_values").alias("uniqueValues_before") ).show(len(list_of_cols), False) uniqueCount_computation(spark, odf, output_cols).select( "attribute", F.col("unique_values").alias("uniqueValues_after") ).show(len(list_of_cols), False) idf.unpersist() return odf def expression_parser(idf, list_of_expr, postfix="", print_impact=False): """ expression_parser can be used to evaluate a list of SQL expressions and output the result as new features. It is able to handle column names containing special characters such as “.”, “-”, “@”, “^”, etc, by converting them to “_” first before the evaluation and convert them back to the original names before returning the output dataframe. Parameters ---------- idf Input Dataframe list_of_expr List of expressions to evaluate as new features e.g., ["expr1","expr2"]. Alternatively, expressions can be specified in a string format, where different expressions are separated by pipe delimiter “|” e.g., "expr1|expr2". postfix postfix for new feature name.Naming convention "f" + expression_index + postfix e.g. with postfix of "new", new added features are named as f0new, f1new etc. 
(Default value = "") print_impact True, False This argument is to print the descriptive statistics of the parsed features (Default value = False) Returns ------- DataFrame Parsed Dataframe """ if isinstance(list_of_expr, str): list_of_expr = [x.strip() for x in list_of_expr.split("|")] special_chars = [ "&", "$", ";", ":", ",", "*", "#", "@", "?", "%", "!", "^", "(", ")", "-", "/", "'", ".", '"', ] rename_cols = [] replace_chars = {} for char in special_chars: for col in idf.columns: if char in col: rename_cols.append(col) if col in replace_chars.keys(): (replace_chars[col]).append(char) else: replace_chars[col] = [char] rename_mapping_to_new, rename_mapping_to_old = {}, {} idf_renamed = idf for col in rename_cols: new_col = col for char in replace_chars[col]: new_col = new_col.replace(char, "_") rename_mapping_to_old[new_col] = col rename_mapping_to_new[col] = new_col idf_renamed = idf_renamed.withColumnRenamed(col, new_col) list_of_expr_ = [] for expr in list_of_expr: new_expr = expr for col in rename_cols: if col in expr: new_expr = new_expr.replace(col, rename_mapping_to_new[col]) list_of_expr_.append(new_expr) list_of_expr = list_of_expr_ odf = idf_renamed new_cols = [] for index, exp in enumerate(list_of_expr): odf = odf.withColumn("f" + str(index) + postfix, F.expr(exp)) new_cols.append("f" + str(index) + postfix) for new_col, col in rename_mapping_to_old.items(): odf = odf.withColumnRenamed(new_col, col) if print_impact: print("Columns Added: ", new_cols) odf.select(new_cols).describe().show(5, False) return odf
Functions
def IQR_standardization(spark, idf, list_of_cols='all', drop_cols=[], pre_existing_model=False, model_path='NA', output_mode='replace', print_impact=False)
-
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
pre_existing_model
- Boolean argument – True or False. True if model files (25/50/75 percentiles for each feature) exist already, False otherwise. (Default value = False)
model_path
- If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one.
output_mode
- "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_scaled" e.g. column X is appended as X_scaled. (Default value = "replace")
print_impact
- True, False (Default value = False) This argument is to print out the before and after descriptive statistics of rescaled columns.
Returns
DataFrame
- Rescaled Dataframe
Expand source code
def IQR_standardization( spark, idf, list_of_cols="all", drop_cols=[], pre_existing_model=False, model_path="NA", output_mode="replace", print_impact=False, ): """ Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) pre_existing_model Boolean argument – True or False. True if model files (25/50/75 percentile for each feature) exists already, False Otherwise (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_scaled" e.g. column X is appended as X_scaled. (Default value = "replace") print_impact True, False (Default value = False) This argument is to print out the before and after descriptive statistics of rescaled columns. 
Returns ------- DataFrame Rescaled Dataframe """ num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if len(list_of_cols) == 0: warnings.warn( "No Standardization Performed - No numerical column(s) to transform" ) return idf if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") if pre_existing_model: df_model = spark.read.parquet(model_path + "/IQR_standardization") parameters = [] for i in list_of_cols: mapped_value = ( df_model.where(F.col("feature") == i) .select("parameters") .rdd.flatMap(lambda x: x) .collect()[0] ) parameters.append(mapped_value) else: parameters = idf.approxQuantile(list_of_cols, [0.25, 0.5, 0.75], 0.01) excluded_cols = [] for i, param in zip(list_of_cols, parameters): if len(param) > 0: if round(param[0], 5) == round(param[2], 5): excluded_cols.append(i) else: excluded_cols.append(i) if len(excluded_cols) > 0: warnings.warn( "The following column(s) are excluded from standardization because the 75th and 25th percentiles are the same:" + str(excluded_cols) ) odf = idf for index, i in enumerate(list_of_cols): if i not in excluded_cols: modify_col = (i + "_scaled") if (output_mode == "append") else i odf = odf.withColumn( modify_col, (F.col(i) - parameters[index][1]) / (parameters[index][2] - parameters[index][0]), ) if (not pre_existing_model) & (model_path != "NA"): df_model = spark.createDataFrame( zip(list_of_cols, parameters), schema=["feature", "parameters"] ) df_model.coalesce(1).write.parquet( model_path + "/IQR_standardization", mode="overwrite" ) if print_impact: if output_mode == "replace": output_cols = list_of_cols else: output_cols = [ (i + "_scaled") for i in list_of_cols if i not in excluded_cols ] print("Before: ") idf.select(list_of_cols).describe().show(5, False) print("After: ") odf.select(output_cols).describe().show(5, False) return odf
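The sketch below shows one way IQR_standardization might be called, assuming the module is importable as anovos.data_transformer.transformers and using a made-up dataframe. Per the source above, each selected column is rescaled as (value - median) / (P75 - P25); the model_path used here is only a placeholder.
from pyspark.sql import SparkSession
from anovos.data_transformer.transformers import IQR_standardization  # assumed import path

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.0, 100.0), (2.0, 250.0), (3.0, 300.0), (4.0, 900.0)], ["x", "y"])

# rescale all numerical columns in place and print before/after statistics
df_scaled = IQR_standardization(spark, df, list_of_cols="all", output_mode="replace", print_impact=True)

# alternatively, append "_scaled" columns and persist the fitted percentiles for later reuse
df_scaled_saved = IQR_standardization(
    spark, df, model_path="/tmp/anovos_models", output_mode="append"  # placeholder path
)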
def PCA_latentFeatures(spark, idf, list_of_cols='all', drop_cols=[], explained_variance_cutoff=0.95, pre_existing_model=False, model_path='NA', standardization=True, standardization_configs={'pre_existing_model': False, 'model_path': 'NA'}, imputation=False, imputation_configs={'imputation_function': 'imputation_MMM'}, stats_missing={}, output_mode='replace', run_type='local', root_path='', auth_key='NA', print_impact=False)
-
Similar to autoencoder_latentFeatures, PCA_latentFeatures also generates latent features that reduce the dimensionality of the input dataframe, but through a different technique: Principal Component Analysis (PCA). The PCA algorithm produces principal components such that each successive component captures as much of the remaining variance as possible, and all principal components are orthogonal to each other. The final number of generated principal components is controlled by the input explained_variance_cutoff. In other words, the number of selected principal components, k, is the minimum value such that the top k components (latent features) explain at least explained_variance_cutoff of the total variance.
Standardization is highly recommended if the input attributes are not of the same scale. Otherwise the generated latent features might be dominated by the attributes with larger variance. Inputs standardization and standardization_configs can be set accordingly to perform standardization within the function. In addition, data imputation is also recommended if missing values exist, which can be done within the function by setting inputs imputation and imputation_configs.
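As a quick illustration of how the number of latent features k follows from explained_variance_cutoff, the snippet below mirrors the cumulative-variance loop in the source further down; the variance ratios are made-up numbers, not output of the function.
explained_variance_ratios = [0.55, 0.25, 0.12, 0.05, 0.03]  # hypothetical pcaModel.explainedVariance values
explained_variance_cutoff = 0.95

cumulative = 0.0
for k, ratio in enumerate(explained_variance_ratios, start=1):
    cumulative += ratio
    if cumulative > explained_variance_cutoff:
        break

# the top 4 components explain 0.97 of the variance, so k = 4 latent features are kept
print(k, round(cumulative, 2))  # 4 0.97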
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of numerical columns to encode e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
explained_variance_cutoff
- Determines the number of latent columns in the output. If N is the smallest integer such that top N latent columns explain more than explained_variance_cutoff variance, these N columns will be selected. (Default value = 0.95)
pre_existing_model
- Boolean argument – True or False. True if model exists already, False Otherwise (Default value = False)
model_path
- If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one.
standardization
- Boolean argument – True or False. True, if standardization is required. (Default value = True)
standardization_configs
- z_standardization function arguments in dictionary format. (Default value = {"pre_existing_model": False, "model_path": "NA"})
imputation
- Boolean argument – True or False. True, if imputation is required. (Default value = False)
imputation_configs
- Takes input in dictionary format. The imputation function name is provided with the key "imputation_function". Optional arguments pertaining to that imputation function can be provided with the argument name as key. (Default value = {"imputation_function": "imputation_MMM"})
stats_missing
- Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
output_mode
- "replace", "append".
“replace” option replaces original columns with transformed columns: latent_
. “append” option append transformed columns with format latent_ to the input dataset, e.g. latent_0, latent_1. (Default value = "replace") run_type
- "local", "emr", "databricks", "ak8s" (Default value = "local")
root_path
- This argument takes in a base folder path for writing out intermediate_data/ folder to. Default value is ""
auth_key
- Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type. Default value is kept as "NA"
print_impact
- True, False. This argument is to print descriptive statistics of the latent features. (Default value = False)
Returns
DataFrame
- Dataframe with Latent Features
Expand source code
def PCA_latentFeatures( spark, idf, list_of_cols="all", drop_cols=[], explained_variance_cutoff=0.95, pre_existing_model=False, model_path="NA", standardization=True, standardization_configs={"pre_existing_model": False, "model_path": "NA"}, imputation=False, imputation_configs={"imputation_function": "imputation_MMM"}, stats_missing={}, output_mode="replace", run_type="local", root_path="", auth_key="NA", print_impact=False, ): """ Similar to autoencoder_latentFeatures, PCA_latentFeatures also generates latent features which reduces the dimensionality of the input dataframe but through a different technique: Principal Component Analysis (PCA). PCA algorithm produces principal components such that it can describe most of the remaining variance and all the principal components are orthogonal to each other. The final number of generated principal components is controlled by the input *explained_variance_cutoff*. In other words, the number of selected principal components, k, is the minimum value such that top k components (latent features) can explain at least *explained_variance_cutoff* of the total variance. Standardization is highly recommended if the input attributes are not of the same scale. Otherwise the generated latent features might be dominated by the attributes with larger variance. Inputs *standardization* and *standardization_configs* can be set accordingly to perform standardization within the function. In addition, data imputation is also recommended if missing values exist, which can be done within the function by setting inputs *imputation* and *imputation_configs*. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of numerical columns to encode e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) explained_variance_cutoff Determines the number of latent columns in the output. If N is the smallest integer such that top N latent columns explain more than explained_variance_cutoff variance, these N columns will be selected. (Default value = 0.95) pre_existing_model Boolean argument – True or False. True if model exists already, False Otherwise (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. standardization Boolean argument – True or False. True, if the standardization required. (Default value = True) standardization_configs z_standardization function arguments in dictionary format. (Default value = {"pre_existing_model": False) imputation Boolean argument – True or False. 
True, if the imputation required. (Default value = False) imputation_configs Takes input in dictionary format. Imputation function name is provided with key "imputation_name". optional arguments pertaining to that imputation function can be provided with argument name as key. (Default value = {"imputation_function": "imputation_MMM"}) stats_missing Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {}) output_mode "replace", "append". “replace” option replaces original columns with transformed columns: latent_<col_index>. “append” option append transformed columns with format latent_<col_index> to the input dataset, e.g. latent_0, latent_1. (Default value = "replace") run_type "local", "emr", "databricks", "ak8s" (Default value = "local") root_path This argument takes in a base folder path for writing out intermediate_data/ folder to. Default value is "" auth_key Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type. Default value is kept as "NA" print_impact True, False This argument is to print descriptive statistics of the latest features (Default value = False) Returns ------- DataFrame Dataframe with Latent Features """ num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if len(list_of_cols) == 0: warnings.warn("No Latent Features Generated - No Column(s) to Transform") return idf if stats_missing == {}: missing_df = missingCount_computation(spark, idf, list_of_cols) missing_df.write.parquet( root_path + "intermediate_data/PCA_latentFeatures/missingCount_computation", mode="overwrite", ) stats_missing = { "file_path": root_path + "intermediate_data/PCA_latentFeatures/missingCount_computation", "file_type": "parquet", } else: missing_df = ( read_dataset(spark, **stats_missing) .select("attribute", "missing_count", "missing_pct") .where(F.col("attribute").isin(list_of_cols)) ) empty_cols = ( missing_df.where(F.col("missing_pct") == 1.0) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if len(empty_cols) > 0: warnings.warn( "The following column(s) are excluded from dimensionality reduction as all values are null: " + ",".join(empty_cols) ) list_of_cols = [e for e in list_of_cols if e not in empty_cols] if standardization: idf_standardized = z_standardization( spark, idf, list_of_cols=list_of_cols, output_mode="append", **standardization_configs, ) list_of_cols_scaled = [ i + "_scaled" for i in list_of_cols if (i + "_scaled") in idf_standardized.columns ] else: idf_standardized = idf for i in list_of_cols: idf_standardized = idf_standardized.withColumn(i + "_scaled", F.col(i)) list_of_cols_scaled = [i + "_scaled" for i in list_of_cols] if imputation: all_functions = globals().copy() all_functions.update(locals()) f = all_functions.get(imputation_configs["imputation_function"]) args = copy.deepcopy(imputation_configs) args.pop("imputation_function", None) missing_df_scaled = ( read_dataset(spark, **stats_missing) 
.select("attribute", "missing_count", "missing_pct") .withColumn("attribute", F.concat(F.col("attribute"), F.lit("_scaled"))) ) missing_df_scaled.write.parquet( root_path + "intermediate_data/PCA_latentFeatures/missingCount_computation_scaled", mode="overwrite", ) stats_missing_scaled = { "file_path": root_path + "intermediate_data/PCA_latentFeatures/missingCount_computation_scaled", "file_type": "parquet", } idf_imputed = f( spark, idf_standardized, list_of_cols_scaled, stats_missing=stats_missing_scaled, **args, ) else: idf_imputed = idf_standardized.dropna(subset=list_of_cols_scaled) idf_imputed.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count() assembler = VectorAssembler(inputCols=list_of_cols_scaled, outputCol="features") assembled_data = assembler.transform(idf_imputed).drop(*list_of_cols_scaled) if pre_existing_model: pca = PCA.load(model_path + "/PCA_latentFeatures/pca_path") pcaModel = PCAModel.load(model_path + "/PCA_latentFeatures/pcaModel_path") n = pca.getK() else: pca = PCA( k=len(list_of_cols_scaled), inputCol="features", outputCol="features_pca" ) pcaModel = pca.fit(assembled_data) explained_variance = 0 for n in range(1, len(list_of_cols) + 1): explained_variance += pcaModel.explainedVariance[n - 1] if explained_variance > explained_variance_cutoff: break pca = PCA(k=n, inputCol="features", outputCol="features_pca") pcaModel = pca.fit(assembled_data) if (not pre_existing_model) & (model_path != "NA"): pcaModel.write().overwrite().save( model_path + "/PCA_latentFeatures/pcaModel_path" ) pca.write().overwrite().save(model_path + "/PCA_latentFeatures/pca_path") def vector_to_array(v): return v.toArray().tolist() f_vector_to_array = F.udf(vector_to_array, T.ArrayType(T.FloatType())) odf = ( pcaModel.transform(assembled_data) .withColumn("features_pca_array", f_vector_to_array("features_pca")) .drop(*["features", "features_pca"]) ) odf_schema = odf.schema for i in range(0, n): odf_schema = odf_schema.add(T.StructField("latent_" + str(i), T.FloatType())) odf = ( odf.rdd.map(lambda x: (*x, *x["features_pca_array"])) .toDF(schema=odf_schema) .drop("features_pca_array") .replace(float("nan"), None, subset=["latent_" + str(j) for j in range(0, n)]) ) if output_mode == "replace": odf = odf.drop(*list_of_cols) if print_impact: print("Explained Variance: ", round(np.sum(pcaModel.explainedVariance[0:n]), 4)) output_cols = ["latent_" + str(j) for j in range(0, n)] odf.select(output_cols).describe().show(5, False) return odf
def attribute_binning(spark, idf, list_of_cols='all', drop_cols=[], method_type='equal_range', bin_size=10, bin_dtype='numerical', pre_existing_model=False, model_path='NA', output_mode='replace', print_impact=False)
-
Attribute binning (or discretization) converts a numerical attribute into discrete values (integer or categorical) using a pre-defined number of bins. This data pre-processing technique is used to reduce the effects of minor observation errors. Binning also introduces non-linearity and tends to improve the performance of the model. This function focuses on unsupervised binning, i.e. without taking the target variable into account - Equal Range Binning and Equal Frequency Binning. In the Equal Range method, each bin is of equal size/width, computed as:
w = (max - min) / no. of bins
bins cutoff = [min, min+w, min+2w, …, max-w, max]
whereas in the Equal Frequency binning method, bins are created in such a way that each bin has an equal no. of rows, though the widths of the bins may vary from each other.
w = 1 / no. of bins
bins cutoff = [min, w-th pctile, 2w-th pctile, …, max]
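A tiny worked example of the equal-range rule above, with made-up numbers: for min = 0, max = 100 and 4 bins, the width is w = 25 and the cutoffs are [0, 25, 50, 75, 100].
# equal-range cutoffs for illustrative values: min = 0, max = 100, 4 bins
min_val, max_val, bin_size = 0.0, 100.0, 4
w = (max_val - min_val) / bin_size
cutoffs = [min_val + j * w for j in range(bin_size + 1)]
print(cutoffs)  # [0.0, 25.0, 50.0, 75.0, 100.0]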
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
method_type
- "equal_frequency", "equal_range". In "equal_range" method, each bin is of equal size/width and in "equal_frequency", each bin has equal no. of rows, though the width of bins may vary. (Default value = "equal_range")
bin_size
- Number of bins. (Default value = 10)
bin_dtype
- "numerical", "categorical". With "numerical" option, original value is replaced with an Integer (1,2,…) and with "categorical" option, original replaced with a string describing min and max value allowed in the bin ("minval-maxval"). (Default value = "numerical")
pre_existing_model
- Boolean argument – True or False. True if binning model exists already, False Otherwise. (Default value = False)
model_path
- If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one.
output_mode
- "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_binned" e.g. column X is appended as X_binned. (Default value = "replace")
print_impact
- True, False. This argument is to print the number of categories generated for each attribute (may or may not be the same as bin_size). (Default value = False)
Returns
DataFrame
- Binned Dataframe
Expand source code
def attribute_binning( spark, idf, list_of_cols="all", drop_cols=[], method_type="equal_range", bin_size=10, bin_dtype="numerical", pre_existing_model=False, model_path="NA", output_mode="replace", print_impact=False, ): """ Attribute binning (or discretization) is a method of numerical attribute into discrete (integer or categorical values) using pre-defined number of bins. This data pre-processing technique is used to reduce the effects of minor observation errors. Also, Binning introduces non-linearity and tends to improve the performance of the model. In this function, we are focussing on unsupervised way of binning i.e. without taking the target variable into account - Equal Range Binning, Equal Frequency Binning. In Equal Range method, each bin is of equal size/width and computed as: w = max- min / no. of bins *bins cutoff=[min, min+w,min+2w…..,max-w,max]* whereas in Equal Frequency binning method, bins are created in such a way that each bin has equal no. of rows, though the width of bins may vary from each other. w = 1 / no. of bins *bins cutoff=[min, wthpctile, 2wthpctile….,max ]* Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) method_type "equal_frequency", "equal_range". In "equal_range" method, each bin is of equal size/width and in "equal_frequency", each bin has equal no. of rows, though the width of bins may vary. (Default value = "equal_range") bin_size Number of bins. (Default value = 10) bin_dtype "numerical", "categorical". With "numerical" option, original value is replaced with an Integer (1,2,…) and with "categorical" option, original replaced with a string describing min and max value allowed in the bin ("minval-maxval"). (Default value = "numerical") pre_existing_model Boolean argument – True or False. True if binning model exists already, False Otherwise. (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_binned" e.g. column X is appended as X_binned. 
(Default value = "replace") print_impact True, False (Default value = False) This argument is to print the number of categories generated for each attribute (may or may be not same as bin_size) Returns ------- DataFrame Binned Dataframe """ num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if len(list_of_cols) == 0: warnings.warn("No Binning Performed - No numerical column(s) to transform") return idf if method_type not in ("equal_frequency", "equal_range"): raise TypeError("Invalid input for method_type") if bin_size < 2: raise TypeError("Invalid input for bin_size") if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") if pre_existing_model: df_model = spark.read.parquet(model_path + "/attribute_binning") bin_cutoffs = [] for i in list_of_cols: mapped_value = ( df_model.where(F.col("attribute") == i) .select("parameters") .rdd.flatMap(lambda x: x) .collect()[0] ) bin_cutoffs.append(mapped_value) else: if method_type == "equal_frequency": pctile_width = 1 / bin_size pctile_cutoff = [] for j in range(1, bin_size): pctile_cutoff.append(j * pctile_width) bin_cutoffs = idf.approxQuantile(list_of_cols, pctile_cutoff, 0.01) else: funs = [F.max, F.min] exprs = [f(F.col(c)) for f in funs for c in list_of_cols] list_result = idf.groupby().agg(*exprs).rdd.flatMap(lambda x: x).collect() bin_cutoffs = [] drop_col_process = [] for i in range(int(len(list_result) / 2)): bin_cutoff = [] max_val = list_result[i] min_val = list_result[i + int(len(list_result) / 2)] if not max_val and max_val != 0: drop_col_process.append(list_of_cols[i]) continue bin_width = (max_val - min_val) / bin_size for j in range(1, bin_size): bin_cutoff.append(min_val + j * bin_width) bin_cutoffs.append(bin_cutoff) if drop_col_process: warnings.warn( "Columns contains too much null values. 
Dropping " + ", ".join(drop_col_process) ) list_of_cols = list( set([e for e in list_of_cols if e not in drop_col_process]) ) if model_path != "NA": df_model = spark.createDataFrame( zip(list_of_cols, bin_cutoffs), schema=["attribute", "parameters"] ) df_model.write.parquet(model_path + "/attribute_binning", mode="overwrite") def bucket_label(value, index): if value is None: return None for i in range(0, len(bin_cutoffs[index])): if value <= bin_cutoffs[index][i]: if bin_dtype == "numerical": return i + 1 else: if i == 0: return "<= " + str(round(bin_cutoffs[index][i], 4)) else: return ( str(round(bin_cutoffs[index][i - 1], 4)) + "-" + str(round(bin_cutoffs[index][i], 4)) ) else: next if bin_dtype == "numerical": return len(bin_cutoffs[0]) + 1 else: return "> " + str(round(bin_cutoffs[index][len(bin_cutoffs[0]) - 1], 4)) if bin_dtype == "numerical": f_bucket_label = F.udf(bucket_label, T.IntegerType()) else: f_bucket_label = F.udf(bucket_label, T.StringType()) odf = idf for idx, i in enumerate(list_of_cols): odf = odf.withColumn(i + "_binned", f_bucket_label(F.col(i), F.lit(idx))) if output_mode == "replace": for col in list_of_cols: odf = odf.drop(col).withColumnRenamed(col + "_binned", col) if print_impact: if output_mode == "replace": output_cols = list_of_cols else: output_cols = [(i + "_binned") for i in list_of_cols] uniqueCount_computation(spark, odf, output_cols).show(len(output_cols), False) return odf
def auto_imputation(spark, idf, list_of_cols='missing', drop_cols=[], id_col='', null_pct=0.1, stats_missing={}, output_mode='replace', run_type='local', root_path='', auth_key='NA', print_impact=True)
-
auto_imputation tests for 5 imputation methods using the other imputation functions provided in this module and returns the one with the best performance. The 5 methods are: (1) imputation_MMM with method_type="mean" (2) imputation_MMM with method_type="median" (3) imputation_sklearn with method_type="KNN" (4) imputation_sklearn with method_type="regression" (5) imputation_matrixFactorization
Samples without missing values in the attributes to impute are used for testing: some % of values are removed and then imputed again using the above 5 methods. RMSE/attribute_mean is used as the evaluation metric for each attribute to reduce the effect of unit differences among attributes. The final error of a method is the sum of (RMSE/attribute_mean) over all numerical attributes to impute, and the method with the least error is selected.
The above testing is only applicable to numerical attributes. If categorical attributes are included, they will be automatically imputed using imputation_MMM. In addition, if there is only one numerical attribute to impute, only methods (1) and (2) will be tested because the remaining methods require more than one column.
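The selection logic described above can be sketched as follows; the per-attribute RMSE/mean numbers below are made up and only illustrate how the totals are compared across methods.
# hypothetical RMSE / attribute_mean values for two candidate methods over columns c1, c2
errors = {
    "MMM-mean":   {"c1": 0.20, "c2": 0.35},
    "regression": {"c1": 0.15, "c2": 0.30},
}
totals = {method: sum(per_col.values()) for method, per_col in errors.items()}
best_method = min(totals, key=totals.get)
print(best_method, totals[best_method])  # regression 0.45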
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of columns to impute e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all (non-array) columns for analysis. This is super useful instead of specifying all column names manually. "missing" (default) can be passed to include only those columns with missing values. One of the usecases where "all" may be preferable over "missing" is when the user wants to save the imputation model for the future use e.g. a column may not have missing value in the training dataset but missing values may possibly appear in the prediction dataset. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols.
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
id_col
- ID column (Default value = "")
null_pct
- proportion of the valid input data to be replaced by None to form the test data (Default value = 0.1)
stats_missing
- Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
output_mode
- "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. (Default value = "replace")
run_type
- "local", "emr", "databricks", "ak8s" (Default value = "local")
root_path
- This argument takes in a base folder path for writing out intermediate_data/ folder to. Default value is ""
auth_key
- Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type. Default value is kept as "NA"
print_impact
- True, False. This argument is to print out before and after missing counts of imputed columns. It also prints the name of the best performing imputation method along with RMSE details. (Default value = True)
Returns
DataFrame
- Imputed Dataframe
Expand source code
def auto_imputation( spark, idf, list_of_cols="missing", drop_cols=[], id_col="", null_pct=0.1, stats_missing={}, output_mode="replace", run_type="local", root_path="", auth_key="NA", print_impact=True, ): """ auto_imputation tests for 5 imputation methods using the other imputation functions provided in this module and returns the one with the best performance. The 5 methods are: (1) imputation_MMM with method_type="mean" (2) imputation_MMM with method_type="median" (3) imputation_sklearn with method_type="KNN" (4) imputation_sklearn with method_type="regression" (5) imputation_matrixFactorization Samples without missing values in attributes to impute are used for testing by removing some % of values and impute them again using the above 5 methods. RMSE/attribute_mean is used as the evaluation metric for each attribute to reduce the effect of unit difference among attributes. The final error of a method is calculated by the sum of ( RMSE/attribute_mean) for all numerical attributes to impute and the method with the least error will be selected. The above testing is only applicable for numerical attributes. If categorical attributes are included, they will be automatically imputed using imputation_MMM. In addition, if there is only one numerical attribute to impute, only method (1) and (2) will be tested because the rest of the methods require more than one column. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of columns to impute e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all (non-array) columns for analysis. This is super useful instead of specifying all column names manually. "missing" (default) can be passed to include only those columns with missing values. One of the usecases where "all" may be preferable over "missing" is when the user wants to save the imputation model for the future use e.g. a column may not have missing value in the training dataset but missing values may possibly appear in the prediction dataset. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) id_col ID column (Default value = "") null_pct proportion of the valid input data to be replaced by None to form the test data (Default value = 0.1) stats_missing Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {}) output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. 
(Default value = "replace") run_type "local", "emr", "databricks", "ak8s" (Default value = "local") root_path This argument takes in a base folder path for writing out intermediate_data/ folder to. Default value is "" auth_key Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type. Default value is kept as "NA" print_impact True, False (Default value = False) This argument is to print out before and after missing counts of imputed columns. It also print the name of best performing imputation method along with RMSE details. Returns ------- DataFrame Imputed Dataframe """ if stats_missing == {}: missing_df = missingCount_computation(spark, idf) missing_df.write.parquet( root_path + "intermediate_data/imputation_comparison/missingCount_computation", mode="overwrite", ) stats_missing = { "file_path": root_path + "intermediate_data/imputation_comparison/missingCount_computation", "file_type": "parquet", } else: missing_df = read_dataset(spark, **stats_missing).select( "attribute", "missing_count", "missing_pct" ) empty_cols = ( missing_df.where(F.col("missing_pct") == 1.0) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if len(empty_cols) > 0: warnings.warn("Following columns have all null values: " + ",".join(empty_cols)) missing_cols = ( missing_df.where(F.col("missing_count") > 0) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if list_of_cols == "all": list_of_cols = idf.columns if list_of_cols == "missing": list_of_cols = missing_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list( set([e for e in list_of_cols if (e not in drop_cols) & (e != id_col)]) ) if any(x not in idf.columns for x in list_of_cols): raise TypeError("Invalid input for Column(s)") del_cols = [e for e in list_of_cols if e in empty_cols] odf_del = idf.drop(*del_cols) list_of_cols = [e for e in list_of_cols if e not in empty_cols] num_cols, cat_cols, other_cols = attributeType_segregation( odf_del.select(list_of_cols) ) missing_catcols = [e for e in cat_cols if e in missing_cols] missing_numcols = [e for e in num_cols if e in missing_cols] if missing_catcols: odf_imputed_cat = imputation_MMM( spark, odf_del, list_of_cols=missing_catcols, stats_missing=stats_missing ) else: odf_imputed_cat = odf_del if len(missing_numcols) == 0: warnings.warn( "No Imputation Performed for numerical columns - No Column(s) to Impute" ) return odf_imputed_cat idf_test = ( odf_imputed_cat.dropna(subset=missing_numcols) .withColumn("index", F.monotonically_increasing_id()) .withColumn("index", F.row_number().over(Window.orderBy("index"))) ) null_count = int(null_pct * idf_test.count()) idf_null = idf_test for i in missing_numcols: null_index = random.sample(range(idf_test.count()), null_count) idf_null = idf_null.withColumn( i, F.when(F.col("index").isin(null_index), None).otherwise(F.col(i)) ) idf_null.write.parquet( root_path + "intermediate_data/imputation_comparison/test_dataset", mode="overwrite", ) idf_null = spark.read.parquet( root_path + "intermediate_data/imputation_comparison/test_dataset" ) method1 = imputation_MMM( spark, idf_null, list_of_cols=missing_numcols, method_type="mean", stats_missing=stats_missing, output_mode=output_mode, ) method2 = imputation_MMM( spark, idf_null, list_of_cols=missing_numcols, method_type="median", stats_missing=stats_missing, output_mode=output_mode, ) valid_methods = [method1, 
method2] if len(num_cols) > 1: method3 = imputation_sklearn( spark, idf_null, list_of_cols=num_cols, method_type="KNN", stats_missing=stats_missing, output_mode=output_mode, auth_key=auth_key, ) method4 = imputation_sklearn( spark, idf_null, list_of_cols=num_cols, method_type="regression", stats_missing=stats_missing, output_mode=output_mode, auth_key=auth_key, ) method5 = imputation_matrixFactorization( spark, idf_null, list_of_cols=num_cols, id_col=id_col, stats_missing=stats_missing, output_mode=output_mode, ) valid_methods = [method1, method2, method3, method4, method5] nrmse_all = [] method_all = ["MMM-mean", "MMM-median", "KNN", "regression", "matrix_factorization"] for index, method in enumerate(valid_methods): nrmse = 0 for i in missing_numcols: pred_col = (i + "_imputed") if output_mode == "append" else i idf_joined = ( idf_test.select("index", F.col(i).alias("val")) .join( method.select("index", F.col(pred_col).alias("pred")), "index", "left_outer", ) .dropna() ) idf_joined = recast_column( idf=idf_joined, list_of_cols=["val", "pred"], list_of_dtypes=["double", "double"], ) pred_mean = float( method.select(F.mean(pred_col)).rdd.flatMap(lambda x: x).collect()[0] ) i_nrmse = ( RegressionEvaluator( metricName="rmse", labelCol="val", predictionCol="pred" ).evaluate(idf_joined) ) / abs(pred_mean) nrmse += i_nrmse nrmse_all.append(nrmse) min_index = nrmse_all.index(np.min(nrmse_all)) best_method = method_all[min_index] odf = valid_methods[min_index] if print_impact: print(list(zip(method_all, nrmse_all))) print("Best Imputation Method: ", best_method) return odf
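A minimal call sketch for auto_imputation, assuming the module is importable as anovos.data_transformer.transformers; the dataset path and the id column name are placeholders.
from pyspark.sql import SparkSession
from anovos.data_transformer.transformers import auto_imputation  # assumed import path

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("path/to/dataset_with_missing_values")  # placeholder path

# benchmark the five candidate imputers on the columns that contain nulls and
# return the dataframe imputed by the best-performing method
df_imputed = auto_imputation(
    spark,
    df,
    list_of_cols="missing",
    id_col="id",  # hypothetical ID column name
    null_pct=0.1,
    output_mode="replace",
    print_impact=True,
)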
def autoencoder_latentFeatures(spark, idf, list_of_cols='all', drop_cols=[], reduction_params=0.5, sample_size=500000, epochs=100, batch_size=256, pre_existing_model=False, model_path='NA', standardization=True, standardization_configs={'pre_existing_model': False, 'model_path': 'NA'}, imputation=False, imputation_configs={'imputation_function': 'imputation_MMM'}, stats_missing={}, output_mode='replace', run_type='local', root_path='', auth_key='NA', print_impact=False)
-
Many machine learning models suffer from "the curse of dimensionality" when the number of features is too large. autoencoder_latentFeatures is able to reduce the dimensionality by compressing input attributes to a smaller number of latent features.
To be more specific, it trains a neural network model using the TensorFlow library. The neural network contains an encoder and a decoder, where the encoder learns to represent the input using a smaller number of latent features controlled by the input reduction_params, and the decoder learns to reproduce the input from the latent features. In the end, only the encoder is kept, and the latent features generated by the encoder are added to the output dataframe.
However, the neural network model is not trained in a scalable manner, so it might not be able to handle a large input dataframe. Thus, an input sample_size (the default value is 500,000) can be set to control the number of samples used to train the model. If the total number of samples exceeds sample_size, the remaining samples are encoded using the fitted encoder.
Standardization is highly recommended if the input attributes are not of the same scale. Otherwise, the model might not converge smoothly. Inputs standardization and standardization_configs can be set accordingly to perform standardization within the function. In addition, if a sample contains missing values in the model input, the output values for its latent features will all be None. Thus data imputation is also recommended if missing values exist, which can be done within the function by setting inputs imputation and imputation_configs.
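For intuition only, the sketch below builds a toy Keras autoencoder whose bottleneck width is derived from reduction_params in the way described above. It is not the exact network used by this function (the real layer sizes, activations and batch normalization are defined in the source further down); the data is random stand-in input.
import numpy as np
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.models import Model

n_inputs, reduction_params = 10, 0.5
n_bottleneck = int(reduction_params * n_inputs) if reduction_params < 1 else int(reduction_params)

inputs = Input(shape=(n_inputs,))
encoded = Dense(n_bottleneck, activation="relu")(inputs)   # latent features
decoded = Dense(n_inputs, activation="linear")(encoded)    # reconstruction
autoencoder = Model(inputs, decoded)
encoder = Model(inputs, encoded)                           # only the encoder is kept afterwards

autoencoder.compile(optimizer="adam", loss="mse")
X = np.random.rand(1000, n_inputs)                         # stand-in for the sampled, standardized data
autoencoder.fit(X, X, epochs=5, batch_size=256, verbose=0)
latent = encoder.predict(X)                                # shape: (1000, n_bottleneck)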
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of numerical columns to encode e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
reduction_params
- Determines the number of encoded features in the result. If reduction_params < 1, int(reduction_params * <number of columns>) columns will be generated. Else, reduction_params columns will be generated. (Default value = 0.5)
sample_size
- Maximum rows for training the autoencoder model using tensorflow. (Default value = 500000)
epochs
- Number of epochs to train the tensorflow model. (Default value = 100)
batch_size
- Number of samples per gradient update when fitting the tensorflow model. (Default value = 256)
pre_existing_model
- Boolean argument – True or False. True if model exists already, False Otherwise (Default value = False)
model_path
- If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one.
standardization
- Boolean argument – True or False. True, if standardization is required. (Default value = True)
standardization_configs
- z_standardization function arguments in dictionary format. (Default value = {"pre_existing_model": False, "model_path": "NA"})
imputation
- Boolean argument – True or False. True, if imputation is required. (Default value = False)
imputation_configs
- Takes input in dictionary format. The imputation function name is provided with the key "imputation_function". Optional arguments pertaining to that imputation function can be provided with the argument name as key. (Default value = {"imputation_function": "imputation_MMM"})
stats_missing
- Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
output_mode
- "replace", "append".
“replace” option replaces original columns with transformed columns: latent_
. “append” option append transformed columns with format latent_ to the input dataset, e.g. latent_0, latent_1 will be appended if reduction_params=2. (Default value = "replace") run_type
- "local", "emr", "databricks", "ak8s" (Default value = "local")
root_path
- This argument takes in a base folder path for writing out intermediate_data/ folder to. Default value is ""
auth_key
- Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type. Default value is kept as "NA"
print_impact
- True, False. This argument is to print descriptive statistics of the latent features. (Default value = False)
Returns
DataFrame
- Dataframe with Latent Features
Expand source code
def autoencoder_latentFeatures( spark, idf, list_of_cols="all", drop_cols=[], reduction_params=0.5, sample_size=500000, epochs=100, batch_size=256, pre_existing_model=False, model_path="NA", standardization=True, standardization_configs={"pre_existing_model": False, "model_path": "NA"}, imputation=False, imputation_configs={"imputation_function": "imputation_MMM"}, stats_missing={}, output_mode="replace", run_type="local", root_path="", auth_key="NA", print_impact=False, ): """ Many machine learning models suffer from "the curse of dimensionality" when the number of features is too large. autoencoder_latentFeatures is able to reduce the dimensionality by compressing input attributes to a smaller number of latent features. To be more specific, it trains a neural network model using TensorFlow library. The neural network contains an encoder and a decoder, where the encoder learns to represent the input using smaller number of latent features controlled by the input *reduction_params* and the decoder learns to reproduce the input using the latent features. In the end, only the encoder will be kept and the latent features generated by the encoder will be added to the output dataframe. However, the neural network model is not trained in a scalable manner, which might not be able to handle large input dataframe. Thus, an input *sample_size* (the default value is 500,000) can be set to control the number of samples to be used to train the model. If the total number of samples exceeds *sample_size*, the rest of the samples will be predicted using the fitted encoder. Standardization is highly recommended if the input attributes are not of the same scale. Otherwise, the model might not converge smoothly. Inputs *standardization* and *standardization_configs* can be set accordingly to perform standardization within the function. In addition, if a sample contains missing values in the model input, the output values for its latent features will all be None. Thus data imputation is also recommended if missing values exist, which can be done within the function by setting inputs *imputation* and *imputation_configs*. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of numerical columns to encode e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) reduction_params Determines the number of encoded features in the result. If reduction_params < 1, int(reduction_params * <number of columns>) columns will be generated. Else, reduction_params columns will be generated. (Default value = 0.5) sample_size Maximum rows for training the autoencoder model using tensorflow. (Default value = 500000) epochs Number of epochs to train the tensorflow model. 
(Default value = 100) batch_size Number of samples per gradient update when fitting the tensorflow model. (Default value = 256) pre_existing_model Boolean argument – True or False. True if model exists already, False Otherwise (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. standardization Boolean argument – True or False. True, if the standardization required. (Default value = True) standardization_configs z_standardization function arguments in dictionary format. (Default value = {"pre_existing_model": False) imputation Boolean argument – True or False. True, if the imputation required. (Default value = False) imputation_configs Takes input in dictionary format. Imputation function name is provided with key "imputation_name". optional arguments pertaining to that imputation function can be provided with argument name as key. (Default value = {"imputation_function": "imputation_MMM"}) stats_missing Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {}) output_mode "replace", "append". “replace” option replaces original columns with transformed columns: latent_<col_index>. “append” option append transformed columns with format latent_<col_index> to the input dataset, e.g. latent_0, latent_1 will be appended if reduction_params=2. (Default value = "replace") run_type "local", "emr", "databricks", "ak8s" (Default value = "local") root_path This argument takes in a base folder path for writing out intermediate_data/ folder to. Default value is "" auth_key Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type. 
Default value is kept as "NA" print_impact True, False This argument is to print descriptive statistics of the latest features (Default value = False) Returns ------- DataFrame Dataframe with Latent Features """ if "arm64" in platform.version().lower(): warnings.warn( "This function is currently not supported for ARM64 - Mac M1 Machine" ) return idf num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if len(list_of_cols) == 0: warnings.warn("No Latent Features Generated - No Column(s) to Transform") return idf if stats_missing == {}: missing_df = missingCount_computation(spark, idf, list_of_cols) missing_df.write.parquet( root_path + "intermediate_data/autoencoder_latentFeatures/missingCount_computation", mode="overwrite", ) stats_missing = { "file_path": root_path + "intermediate_data/autoencoder_latentFeatures/missingCount_computation", "file_type": "parquet", } else: missing_df = ( read_dataset(spark, **stats_missing) .select("attribute", "missing_count", "missing_pct") .where(F.col("attribute").isin(list_of_cols)) ) empty_cols = ( missing_df.where(F.col("missing_pct") == 1.0) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if len(empty_cols) > 0: warnings.warn( "The following column(s) are excluded from dimensionality reduction as all values are null: " + ",".join(empty_cols) ) list_of_cols = [e for e in list_of_cols if e not in empty_cols] if standardization: idf_standardized = z_standardization( spark, idf, list_of_cols=list_of_cols, output_mode="append", **standardization_configs, ) list_of_cols_scaled = [ i + "_scaled" for i in list_of_cols if (i + "_scaled") in idf_standardized.columns ] else: idf_standardized = idf for i in list_of_cols: idf_standardized = idf_standardized.withColumn(i + "_scaled", F.col(i)) list_of_cols_scaled = [i + "_scaled" for i in list_of_cols] if imputation: all_functions = globals().copy() all_functions.update(locals()) f = all_functions.get(imputation_configs["imputation_function"]) args = copy.deepcopy(imputation_configs) args.pop("imputation_function", None) missing_df_scaled = ( read_dataset(spark, **stats_missing) .select("attribute", "missing_count", "missing_pct") .withColumn("attribute", F.concat(F.col("attribute"), F.lit("_scaled"))) ) missing_df_scaled.write.parquet( root_path + "intermediate_data/autoencoder_latentFeatures/missingCount_computation_scaled", mode="overwrite", ) stats_missing_scaled = { "file_path": root_path + "intermediate_data/autoencoder_latentFeatures/missingCount_computation_scaled", "file_type": "parquet", } idf_imputed = f( spark, idf_standardized, list_of_cols_scaled, stats_missing=stats_missing_scaled, **args, ) else: idf_imputed = idf_standardized.dropna(subset=list_of_cols_scaled) n_inputs = len(list_of_cols_scaled) if reduction_params < 1: n_bottleneck = int(reduction_params * n_inputs) else: n_bottleneck = int(reduction_params) if pre_existing_model: if run_type == "emr": bash_cmd = ( "aws s3 cp " + model_path + "/autoencoders_latentFeatures/encoder.h5 ." ) output = subprocess.check_output(["bash", "-c", bash_cmd]) bash_cmd = ( "aws s3 cp " + model_path + "/autoencoders_latentFeatures/model.h5 ." 
) output = subprocess.check_output(["bash", "-c", bash_cmd]) encoder = load_model("encoder.h5") model = load_model("model.h5") elif run_type == "ak8s": bash_cmd = ( 'azcopy cp "' + model_path + "/autoencoders_latentFeatures/encoder.h5" + str(auth_key) + '" .' ) output = subprocess.check_output(["bash", "-c", bash_cmd]) bash_cmd = ( 'azcopy cp "' + model_path + "/autoencoders_latentFeatures/model.h5" + str(auth_key) + '" .' ) output = subprocess.check_output(["bash", "-c", bash_cmd]) encoder = load_model("encoder.h5") model = load_model("model.h5") else: encoder = load_model(model_path + "/autoencoders_latentFeatures/encoder.h5") model = load_model(model_path + "/autoencoders_latentFeatures/model.h5") else: idf_valid = idf_imputed.select(list_of_cols_scaled) idf_model = idf_valid.sample( False, min(1.0, float(sample_size) / idf_valid.count()), 0 ) idf_train = idf_model.sample(False, 0.8, 0) idf_test = idf_model.subtract(idf_train) X_train = idf_train.toPandas() X_test = idf_test.toPandas() visible = Input(shape=(n_inputs,)) e = Dense(n_inputs * 2)(visible) e = BatchNormalization()(e) e = LeakyReLU()(e) e = Dense(n_inputs)(e) e = BatchNormalization()(e) e = LeakyReLU()(e) bottleneck = Dense(n_bottleneck)(e) d = Dense(n_inputs)(bottleneck) d = BatchNormalization()(d) d = LeakyReLU()(d) d = Dense(n_inputs * 2)(d) d = BatchNormalization()(d) d = LeakyReLU()(d) output = Dense(n_inputs, activation="linear")(d) model = Model(inputs=visible, outputs=output) encoder = Model(inputs=visible, outputs=bottleneck) model.compile(optimizer="adam", loss="mse") history = model.fit( X_train, X_train, epochs=int(epochs), batch_size=int(batch_size), verbose=2, validation_data=(X_test, X_test), ) if (not pre_existing_model) & (model_path != "NA"): if run_type == "emr": encoder.save("encoder.h5") model.save("model.h5") bash_cmd = ( "aws s3 cp encoder.h5 " + model_path + "/autoencoders_latentFeatures/encoder.h5" ) output = subprocess.check_output(["bash", "-c", bash_cmd]) bash_cmd = ( "aws s3 cp model.h5 " + model_path + "/autoencoders_latentFeatures/model.h5" ) output = subprocess.check_output(["bash", "-c", bash_cmd]) elif run_type == "ak8s": encoder.save("encoder.h5") model.save("model.h5") bash_cmd = ( 'azcopy cp "encoder.h5" "' + model_path + "/autoencoders_latentFeatures/encoder.h5" + str(auth_key) + '" ' ) output = subprocess.check_output(["bash", "-c", bash_cmd]) bash_cmd = ( 'azcopy cp "model.h5" "' + model_path + "/autoencoders_latentFeatures/model.h5" + str(auth_key) + '" ' ) output = subprocess.check_output(["bash", "-c", bash_cmd]) else: if not os.path.exists(model_path + "/autoencoders_latentFeatures/"): os.makedirs(model_path + "/autoencoders_latentFeatures/") encoder.save(model_path + "/autoencoders_latentFeatures/encoder.h5") model.save(model_path + "/autoencoders_latentFeatures/model.h5") class ModelWrapperPickable: def __init__(self, model): self.model = model def __getstate__(self): model_str = "" with tempfile.NamedTemporaryFile(suffix=".hdf5", delete=True) as fd: tensorflow.keras.models.save_model(self.model, fd.name, overwrite=True) model_str = fd.read() d = {"model_str": model_str} return d def __setstate__(self, state): with tempfile.NamedTemporaryFile(suffix=".hdf5", delete=True) as fd: fd.write(state["model_str"]) fd.flush() self.model = tensorflow.keras.models.load_model(fd.name) model_wrapper = ModelWrapperPickable(encoder) def compute_output_pandas_udf(model_wrapper): @F.pandas_udf(returnType=T.ArrayType(T.DoubleType())) def predict_pandas_udf(*cols): X = pd.concat(cols, axis=1) 
return pd.Series(row.tolist() for row in model_wrapper.model.predict(X)) return predict_pandas_udf odf = idf_imputed.withColumn( "predicted_output", compute_output_pandas_udf(model_wrapper)(*list_of_cols_scaled), ) odf_schema = odf.schema for i in range(0, n_bottleneck): odf_schema = odf_schema.add(T.StructField("latent_" + str(i), T.FloatType())) odf = ( odf.rdd.map(lambda x: (*x, *x["predicted_output"])) .toDF(schema=odf_schema) .drop("predicted_output") ) odf = odf.drop(*list_of_cols_scaled) if output_mode == "replace": odf = odf.drop(*list_of_cols) if print_impact: output_cols = ["latent_" + str(j) for j in range(0, n_bottleneck)] odf.select(output_cols).describe().show(5, False) return odf
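A minimal usage sketch for autoencoder_latentFeatures is shown below. It is not part of the generated reference above: the SparkSession spark, the input dataframe df, and the import path anovos.data_transformer.transformers are assumptions made for illustration.
# Hypothetical usage sketch - spark, df and the import path are assumptions.
from anovos.data_transformer.transformers import autoencoder_latentFeatures

df_latent = autoencoder_latentFeatures(
    spark,
    df,
    list_of_cols="all",      # compress all numerical columns
    reduction_params=0.5,    # keep int(0.5 * number_of_columns) latent features
    sample_size=100000,      # cap on rows used to train the autoencoder
    epochs=50,
    batch_size=256,
    standardization=True,    # recommended when columns are on different scales
    imputation=True,         # avoid None latent values when inputs contain nulls
    output_mode="replace",   # original columns replaced by latent_0, latent_1, ...
    run_type="local",
)
df_latent.printSchema()
With output_mode="append", the latent_<i> columns are added alongside the original attributes instead of replacing them.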
def boxcox_transformation(idf, list_of_cols='all', drop_cols=[], boxcox_lambda=None, output_mode='replace', print_impact=False)
-
Some machine learning algorithms require the input data to follow normal distributions. Thus, when the input data is too skewed, boxcox_transformation can be used to transform it into a more normal-like distribution. The transformed value of a sample x depends on a coefficient lambda: (1) if lambda = 0, x is transformed into log(x); (2) if lambda !=0, x is transformed into (x^lambda-1)/lambda.
The value of lambda can be either specified by the user or automatically selected within the function. If lambda needs to be selected within the function, a range of values (0, 1, -1, 0.5, -0.5, 2, -2, 0.25, -0.25, 3, -3, 4, -4, 5, -5) will be tested, and the lambda that best fits a normal theoretical distribution according to PySpark's Kolmogorov-Smirnov test will be used to perform the transformation. Different lambda values can be assigned to different attributes, but one attribute can only be assigned one lambda value.
Parameters
idf
- Input Dataframe
list_of_cols
- List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
boxcox_lambda
- Lambda value for the Box-Cox transformation. If boxcox_lambda is not None, it will be directly used for the transformation. It can be (1) a list: each element represents a lambda value for an attribute and the length of the list must be the same as the number of columns to transform; or (2) an int/float: all attributes will be assigned the same lambda value. Otherwise, the best lambda among [0, 1, -1, 0.5, -0.5, 2, -2, 0.25, -0.25, 3, -3, 4, -4, 5, -5] is searched for each column and the transformation is applied. (Default value = None)
output_mode
- "replace", "append".
“replace” option replaces original columns with transformed columns.
“append” option appends transformed columns with a postfix "_bxcx_<lambda>" to the input dataset. (Default value = "replace")
print_impact
- True, False. This argument prints before and after descriptive statistics of the transformed features. (Default value = False)
Returns
DataFrame
- Transformed Dataframe
Expand source code
def boxcox_transformation( idf, list_of_cols="all", drop_cols=[], boxcox_lambda=None, output_mode="replace", print_impact=False, ): """ Some machine learning algorithms require the input data to follow normal distributions. Thus, when the input data is too skewed, boxcox_transformation can be used to transform it into a more normal-like distribution. The transformed value of a sample x depends on a coefficient lambda: (1) if lambda = 0, x is transformed into log(x); (2) if lambda !=0, x is transformed into (x^lambda-1)/lambda. The value of lambda can be either specified by the user or automatically selected within the function. If lambda needs to be selected within the function, a range of values (0, 1, -1, 0.5, -0.5, 2, -2, 0.25, -0.25, 3, -3, 4, -4, 5, -5) will be tested and the lambda, which optimizes the KolmogorovSmirnovTest by PySpark with the theoretical distribution being normal, will be used to perform the transformation. Different lambda values can be assigned to different attributes but one attribute can only be assigned one lambda value. Parameters ---------- idf Input Dataframe list_of_cols List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) boxcox_lambda Lambda value for box_cox transormation. If boxcox_lambda is not None, it will be directly used for the transformation. It can be a (1) list: each element represents a lambda value for an attribute and the length of the list must be the same as the number of columns to transform. (2) int/float: all attributes will be assigned the same lambda value. Else, search for the best lambda among [1,-1,0.5,-0.5,2,-2,0.25,-0.25,3,-3,4,-4,5,-5] for each column and apply the transformation (Default value = None) output_mode "replace", "append". “replace” option replaces original columns with transformed columns. “append” option append transformed columns with a postfix "_bxcx_<lambda>" to the input dataset. 
(Default value = "replace") print_impact True, False This argument is to print before and after descriptive statistics of the transformed features (Default value = False) Returns ------- DataFrame Transformed Dataframe """ num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if (len(list_of_cols) == 0) | (any(x not in num_cols for x in list_of_cols)): raise TypeError("Invalid input for Column(s)") num_cols = attributeType_segregation(idf.select(list_of_cols))[0] list_of_cols = num_cols odf = idf col_mins = idf.select([F.min(i) for i in list_of_cols]) if any([i <= 0 for i in col_mins.rdd.flatMap(lambda x: x).collect()]): col_mins.show(1, False) raise ValueError("Data must be positive") if boxcox_lambda is not None: if isinstance(boxcox_lambda, (list, tuple)): if len(boxcox_lambda) != len(list_of_cols): raise TypeError("Invalid input for boxcox_lambda") elif not all([isinstance(l, (float, int)) for l in boxcox_lambda]): raise TypeError("Invalid input for boxcox_lambda") else: boxcox_lambda_list = list(boxcox_lambda) elif isinstance(boxcox_lambda, (float, int)): boxcox_lambda_list = [boxcox_lambda] * len(list_of_cols) else: raise TypeError("Invalid input for boxcox_lambda") else: boxcox_lambda_list = [] for i in list_of_cols: lambdaVal = [1, -1, 0.5, -0.5, 2, -2, 0.25, -0.25, 3, -3, 4, -4, 5, -5] best_pVal = 0 for j in lambdaVal: pVal = Statistics.kolmogorovSmirnovTest( odf.select(F.pow(F.col(i), j)).rdd.flatMap(lambda x: x), "norm" ).pValue if pVal > best_pVal: best_pVal = pVal best_lambdaVal = j pVal = Statistics.kolmogorovSmirnovTest( odf.select(F.log(F.col(i))).rdd.flatMap(lambda x: x), "norm" ).pValue if pVal > best_pVal: best_pVal = pVal best_lambdaVal = 0 boxcox_lambda_list.append(best_lambdaVal) output_cols = [] for i, curr_lambdaVal in zip(list_of_cols, boxcox_lambda_list): if curr_lambdaVal != 1: modify_col = ( (i + "_bxcx_" + str(curr_lambdaVal)) if output_mode == "append" else i ) output_cols.append(modify_col) if curr_lambdaVal == 0: odf = odf.withColumn(modify_col, F.log(F.col(i))) else: odf = odf.withColumn(modify_col, F.pow(F.col(i), curr_lambdaVal)) if len(output_cols) == 0: warnings.warn( "lambdaVal for all columns are 1 so no transformation is performed and idf is returned" ) return idf if print_impact: print("Transformed Columns: ", list_of_cols) print("Best BoxCox Parameter(s): ", boxcox_lambda_list) print("Before:") idf.select(list_of_cols).describe().unionByName( idf.select([F.skewness(i).alias(i) for i in list_of_cols]).withColumn( "summary", F.lit("skewness") ) ).show(6, False) print("After:") if output_mode == "replace": odf.select(list_of_cols).describe().unionByName( odf.select([F.skewness(i).alias(i) for i in list_of_cols]).withColumn( "summary", F.lit("skewness") ) ).show(6, False) else: output_cols = [("`" + i + "`") for i in output_cols] odf.select(output_cols).describe().unionByName( odf.select( [F.skewness(i).alias(i[1:-1]) for i in output_cols] ).withColumn("summary", F.lit("skewness")) ).show(6, False) return odf
def cat_to_num_supervised(spark, idf, list_of_cols='all', drop_cols=[], label_col='label', event_label=1, pre_existing_model=False, model_path='NA', output_mode='replace', persist=False, persist_option=StorageLevel(True, True, False, False, 1), print_impact=False)
-
This is a supervised method to convert a categorical attribute into a numerical attribute. It takes a label/target column to indicate whether the event is positive or negative. For each column, the positive event rate for each categorical value is used as the encoded numerical value.
For example, there are 3 distinct values in a categorical attribute X: X1, X2 and X3. Within the input dataframe, there are - 15 positive events and 5 negative events with X==X1; - 10 positive events and 40 negative events with X==X2; - 20 positive events and 20 negative events with X==X3.
Thus, value X1 is mapped to 15/(15+5) = 0.75, value X2 is mapped to 10/(10+40) = 0.2 and value X3 is mapped to 20/(20+20) = 0.5. This mapping will be applied to all values in attribute X. This encoding method can help avoid creating too many dummy variables, which may cause dimensionality issues, and it also works with categorical attributes without an order or rank.
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of categorical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all categorical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
label_col
- Label/Target column (Default value = "label")
event_label
- Value of (positive) event (i.e label 1) (Default value = 1)
pre_existing_model
- Boolean argument – True or False. True if model (original and mapped numerical value for each column) exists already, False Otherwise. (Default value = False)
model_path
- If pre_existing_model is True, this argument is the path for referring to the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. The default "NA" saves the model in the "intermediate_data/" folder for optimization purposes.
output_mode
- "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_encoded" e.g. column X is appended as X_encoded. (Default value = "replace")
persist
- Boolean argument - True or False. This parameter is for optimization purposes. If True, the repeatedly used dataframe will be persisted (the StorageLevel can be specified in persist_option). We recommend setting this parameter to True if at least one of the following criteria holds: (1) the underlying data source is in csv format; (2) the transformation is applicable to most columns. (Default value = False)
persist_option
- A pyspark.StorageLevel instance. This parameter is useful only when persist is True. (Default value = pyspark.StorageLevel.MEMORY_AND_DISK)
print_impact
- True, False (Default value = False) This argument is to print out the descriptive statistics of encoded columns.
Returns
DataFrame
- Encoded Dataframe
Expand source code
def cat_to_num_supervised( spark, idf, list_of_cols="all", drop_cols=[], label_col="label", event_label=1, pre_existing_model=False, model_path="NA", output_mode="replace", persist=False, persist_option=pyspark.StorageLevel.MEMORY_AND_DISK, print_impact=False, ): """ This is a supervised method to convert a categorical attribute into a numerical attribute. It takes a label/target column to indicate whether the event is positive or negative. For each column, the positive event rate for each categorical value is used as the encoded numerical value. For example, there are 3 distinct values in a categorical attribute X: X1, X2 and X3. Within the input dataframe, there are - 15 positive events and 5 negative events with X==X1; - 10 positive events and 40 negative events with X==X2; - 20 positive events and 20 negative events with X==X3. Thus, value X1 is mapped to 15/(15+5) = 0.75, value X2 is mapped to 10/(10+40) = 0.2 and value X3 is mapped to 20/(20+20) = 0.5. This mapping will be applied to all values in attribute X. This encoding method can be helpful in avoiding creating too many dummy variables which may cause dimensionality issue and it also works with categorical attributes without an order or rank. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of categorical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all categorical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) label_col Label/Target column (Default value = "label") event_label Value of (positive) event (i.e label 1) (Default value = 1) pre_existing_model Boolean argument – True or False. True if model (original and mapped numerical value for each column) exists already, False Otherwise. (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" is used to save the model in "intermediate_data/" folder for optimization purpose. output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_encoded" e.g. column X is appended as X_encoded. (Default value = "replace") persist Boolean argument - True or False. This parameter is for optimization purpose. If True, repeatedly used dataframe will be persisted (StorageLevel can be specified in persist_option). We recommend setting this parameter as True if at least one of the following criteria is True: (1) The underlying data source is in csv format (2) The transformation will be applicable to most columns. (Default value = False) persist_option A pyspark.StorageLevel instance. 
This parameter is useful only when persist is True. (Default value = pyspark.StorageLevel.MEMORY_AND_DISK) print_impact True, False (Default value = False) This argument is to print out the descriptive statistics of encoded columns. Returns ------- DataFrame Encoded Dataframe """ cat_cols = attributeType_segregation(idf)[1] if list_of_cols == "all": list_of_cols = cat_cols elif isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list( set([e for e in list_of_cols if (e not in drop_cols) & (e != label_col)]) ) if any(x not in cat_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if len(list_of_cols) == 0: warnings.warn("No Categorical Encoding - No categorical column(s) to transform") return idf if label_col not in idf.columns: raise TypeError("Invalid input for Label Column") odf = idf label_col_bool = label_col + "_cat_to_num_sup_temp" idf = idf.withColumn( label_col_bool, F.when(F.col(label_col) == event_label, "1").otherwise("0"), ) if model_path == "NA": skip_if_error = True model_path = "intermediate_data" else: skip_if_error = False save_model = True if persist: idf = idf.persist(persist_option) for index, i in enumerate(list_of_cols): if pre_existing_model: df_tmp = spark.read.csv( model_path + "/cat_to_num_supervised/" + i, header=True, inferSchema=True, ) else: df_tmp = ( idf.select(i, label_col_bool) .groupBy(i) .pivot(label_col_bool) .count() .fillna(0) .withColumn( i + "_encoded", F.round(F.col("1") / (F.col("1") + F.col("0")), 4) ) .drop(*["1", "0"]) ) if save_model: try: df_tmp.coalesce(1).write.csv( model_path + "/cat_to_num_supervised/" + i, header=True, mode="overwrite", ignoreLeadingWhiteSpace=False, ignoreTrailingWhiteSpace=False, ) df_tmp = spark.read.csv( model_path + "/cat_to_num_supervised/" + i, header=True, inferSchema=True, ) except Exception as error: if skip_if_error: warnings.warn( "For optimization purpose, we recommend specifying a valid model_path value to save the intermediate data. Saving to the default path - '" + model_path + "/cat_to_num_supervised/" + i + "' faced an error." ) save_model = False else: raise error if df_tmp.count() > 1: odf = odf.join(df_tmp, i, "left_outer") else: odf = odf.crossJoin(df_tmp) if output_mode == "replace": for i in list_of_cols: odf = odf.drop(i).withColumnRenamed(i + "_encoded", i) odf = odf.select([i for i in idf.columns if i != label_col_bool]) if print_impact: if output_mode == "replace": output_cols = list_of_cols else: output_cols = [(i + "_encoded") for i in list_of_cols] print("Before: ") idf.select(list_of_cols).summary("count", "min", "max").show(3, False) print("After: ") odf.select(output_cols).summary("count", "min", "max").show(3, False) if persist: idf.unpersist() return odf
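The hypothetical call below illustrates the positive-event-rate encoding; spark, df, the column names, the model path and the import path are assumptions.
# Hypothetical usage sketch - spark, df, column names and the import path are assumptions.
import pyspark
from anovos.data_transformer.transformers import cat_to_num_supervised

df_encoded = cat_to_num_supervised(
    spark,
    df,
    list_of_cols=["city", "device_type"],   # categorical columns to encode
    label_col="label",                      # binary target column
    event_label=1,                          # value treated as the positive event
    model_path="/models/anovos",            # per-column mappings saved as CSVs under this path
    output_mode="append",                   # adds city_encoded, device_type_encoded
    persist=True,                           # persist the intermediate dataframe
    persist_option=pyspark.StorageLevel.MEMORY_AND_DISK,
)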
def cat_to_num_transformer(spark, idf, list_of_cols, drop_cols, method_type, encoding, label_col, event_label)
-
This method helps convert a categorical attribute into numerical attribute(s) based on the analysis dataset. If a label column is present, the relevant processing happens through cat_to_num_supervised. For the unsupervised scenario, the processing happens through cat_to_num_unsupervised. Computation details can be found in the respective functions.
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of categorical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all categorical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols.
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them.
method_type
- Depending upon the use case, the method type can be either "supervised" or "unsupervised". For the supervised use case, label_col is mandatory.
encoding
- "label_encoding" or "onehot_encoding" In label encoding, each categorical value is assigned a unique integer based on alphabetical or frequency ordering (both ascending & descending options are available that can be selected by index_order argument). In one-hot encoding, every unique value in the column will be added in a form of dummy/binary column.
label_col
- Label/Target column
event_label
- Value of (positive) event
Expand source code
def cat_to_num_transformer( spark, idf, list_of_cols, drop_cols, method_type, encoding, label_col, event_label ): """ This is method which helps converting a categorical attribute into numerical attribute(s) based on the analysis dataset. If there's a presence of label column then the relevant processing would happen through cat_to_num_supervised. However, for unsupervised scenario, the processing would happen through cat_to_num_unsupervised. Computation details can be referred from the respective functions. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of categorical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all categorical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. method_type Depending upon the use case the method type can be either Supervised or Unsupervised. For Supervised use case, label_col is mandatory. encoding "label_encoding" or "onehot_encoding" In label encoding, each categorical value is assigned a unique integer based on alphabetical or frequency ordering (both ascending & descending options are available that can be selected by index_order argument). In one-hot encoding, every unique value in the column will be added in a form of dummy/binary column. label_col Label/Target column event_label Value of (positive) event """ num_cols, cat_cols, other_cols = attributeType_segregation(idf) if len(cat_cols) > 0: if list_of_cols == "all": list_of_cols = cat_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] if any(x not in cat_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if (method_type == "supervised") & (label_col is not None): odf = cat_to_num_supervised( spark, idf, label_col=label_col, event_label=event_label ) odf = odf.withColumn( label_col, F.when(F.col(label_col) == event_label, F.lit(1)).otherwise(F.lit(0)), ) return odf elif (method_type == "unsupervised") & (label_col is None): odf = cat_to_num_unsupervised( spark, idf, method_type=encoding, index_order="frequencyDesc", ) return odf else: return idf
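A hypothetical sketch of both routes through cat_to_num_transformer is shown below; spark, df and the import path are assumptions. Note that all eight arguments are positional (no defaults), so each must be supplied.
# Hypothetical usage sketch - spark, df and the import path are assumptions.
from anovos.data_transformer.transformers import cat_to_num_transformer

# Supervised route: delegates to cat_to_num_supervised (positive-event-rate encoding)
df_sup = cat_to_num_transformer(
    spark, df,
    list_of_cols="all", drop_cols=[],
    method_type="supervised", encoding="label_encoding",
    label_col="label", event_label=1,
)

# Unsupervised route: delegates to cat_to_num_unsupervised with the chosen encoding
df_unsup = cat_to_num_transformer(
    spark, df,
    list_of_cols="all", drop_cols=[],
    method_type="unsupervised", encoding="label_encoding",
    label_col=None, event_label=None,
)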
def cat_to_num_unsupervised(spark, idf, list_of_cols='all', drop_cols=[], method_type='label_encoding', index_order='frequencyDesc', cardinality_threshold=50, pre_existing_model=False, model_path='NA', stats_unique={}, output_mode='replace', print_impact=False)
-
This is an unsupervised method of converting a categorical attribute into numerical attribute(s). This is among the most important transformations required for any modelling exercise, as most machine learning algorithms cannot process categorical values. It covers two popular encoding techniques – label encoding & one-hot encoding.
In label encoding, each categorical value is assigned a unique integer based on alphabetical or frequency ordering (both ascending & descending options are available – can be selected by the index_order argument). One of the pitfalls of using this technique is that the model may learn a spurious relationship which doesn't exist or might not make any logical sense in real-world settings. In one-hot encoding, every unique value in the attribute will be added as a feature in the form of a dummy/binary attribute. However, using this method on high-cardinality attributes can further aggravate the dimensionality issue.
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of categorical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all categorical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
method_type
- "label_encoding" or "onehot_encoding" In label encoding, each categorical value is assigned a unique integer based on alphabetical or frequency ordering (both ascending & descending options are available that can be selected by index_order argument). In one-hot encoding, every unique value in the column will be added in a form of dummy/binary column. (Default value = 1)
index_order
- "frequencyDesc", "frequencyAsc", "alphabetDesc", "alphabetAsc". Valid only for Label Encoding method_type. (Default value = "frequencyDesc")
cardinality_threshold
- Defines the cardinality threshold above which columns are skipped from encoding - a warning is issued for the skipped columns. (Default value = 50)
pre_existing_model
- Boolean argument - True or False. True if encoding model exists already, False Otherwise. (Default value = False)
model_path
- If pre_existing_model is True, this argument is the path for referring to the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. The default "NA" means there is neither a pre-existing model nor a need to save one.
stats_unique
- Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on unique value count i.e. if measures_of_cardinality or uniqueCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
output_mode
- "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "index" for label encoding e.g. column X is appended as X_index, or a postfix "" for one hot encoding, n varies from 0 to unique value count e.g. column X is appended as X_0, X_1, X_2 (n = 3 i.e. no. of unique values for X). (Default value = "replace")
print_impact
- True, False (Default value = False) This argument is to print out the change in schema (one hot encoding) or descriptive statistics (label encoding)
Returns
DataFrame
- Encoded Dataframe
Expand source code
def cat_to_num_unsupervised( spark, idf, list_of_cols="all", drop_cols=[], method_type="label_encoding", index_order="frequencyDesc", cardinality_threshold=50, pre_existing_model=False, model_path="NA", stats_unique={}, output_mode="replace", print_impact=False, ): """ This is unsupervised method of converting a categorical attribute into numerical attribute(s). This is among the most important transformations required for any modelling exercise, as most of the machine learning algorithms cannot process categorical values. It covers two popular encoding techniques – label encoding & one-hot encoding. In label encoding, each categorical value is assigned a unique integer based on alphabetical or frequency ordering (both ascending & descending options are available – can be selected by index_order argument). One of the pitfalls of using this technique is that the model may learn some spurious relationship, which doesn't exist or might not make any logical sense in the real world settings. In one-hot encoding, every unique value in the attribute will be added as a feature in a form of dummy/binary attribute. However, using this method on high cardinality attributes can further aggravate the dimensionality issue. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of categorical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all categorical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) method_type "label_encoding" or "onehot_encoding" In label encoding, each categorical value is assigned a unique integer based on alphabetical or frequency ordering (both ascending & descending options are available that can be selected by index_order argument). In one-hot encoding, every unique value in the column will be added in a form of dummy/binary column. (Default value = 1) index_order "frequencyDesc", "frequencyAsc", "alphabetDesc", "alphabetAsc". Valid only for Label Encoding method_type. (Default value = "frequencyDesc") cardinality_threshold Defines threshold to skip columns with higher cardinality values from encoding - a warning is issued. (Default value = 50) pre_existing_model Boolean argument - True or False. True if encoding model exists already, False Otherwise. (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre existing model nor there is a need to save one. stats_unique Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on unique value count i.e. 
if measures_of_cardinality or uniqueCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {}) output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_index" for label encoding e.g. column X is appended as X_index, or a postfix "_{n}" for one hot encoding, n varies from 0 to unique value count e.g. column X is appended as X_0, X_1, X_2 (n = 3 i.e. no. of unique values for X). (Default value = "replace") print_impact True, False (Default value = False) This argument is to print out the change in schema (one hot encoding) or descriptive statistics (label encoding) Returns ------- DataFrame Encoded Dataframe """ cat_cols = attributeType_segregation(idf)[1] if list_of_cols == "all": list_of_cols = cat_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] if any(x not in cat_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if method_type not in ("onehot_encoding", "label_encoding"): raise TypeError("Invalid input for method_type") if index_order not in ( "frequencyDesc", "frequencyAsc", "alphabetDesc", "alphabetAsc", ): raise TypeError("Invalid input for Encoding Index Order") if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") if stats_unique == {}: skip_cols = ( uniqueCount_computation(spark, idf, list_of_cols) .where(F.col("unique_values") > cardinality_threshold) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) else: skip_cols = ( read_dataset(spark, **stats_unique) .where(F.col("unique_values") > cardinality_threshold) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) skip_cols = list( set([e for e in skip_cols if e in list_of_cols and e not in drop_cols]) ) if skip_cols: warnings.warn( "Columns dropped from encoding due to high cardinality: " + ",".join(skip_cols) ) list_of_cols = list( set([e for e in list_of_cols if e not in drop_cols + skip_cols]) ) if len(list_of_cols) == 0: warnings.warn("No Encoding Computation - No categorical column(s) to transform") return idf list_of_cols_vec = [] list_of_cols_idx = [] for i in list_of_cols: list_of_cols_vec.append(i + "_vec") list_of_cols_idx.append(i + "_index") odf_indexed = idf if version.parse(pyspark.__version__) < version.parse("3.0.0"): for idx, i in enumerate(list_of_cols): if pre_existing_model: indexerModel = StringIndexerModel.load( model_path + "/cat_to_num_unsupervised/indexer-model/" + i ) else: stringIndexer = StringIndexer( inputCol=i, outputCol=i + "_index", stringOrderType=index_order, handleInvalid="keep", ) indexerModel = stringIndexer.fit(idf.select(i)) if model_path != "NA": indexerModel.write().overwrite().save( model_path + "/cat_to_num_unsupervised/indexer-model/" + i ) odf_indexed = indexerModel.transform(odf_indexed) else: if pre_existing_model: indexerModel = StringIndexerModel.load( model_path + "/cat_to_num_unsupervised/indexer" ) else: stringIndexer = StringIndexer( inputCols=list_of_cols, outputCols=list_of_cols_idx, stringOrderType=index_order, handleInvalid="keep", ) indexerModel = stringIndexer.fit(odf_indexed) if model_path != "NA": indexerModel.write().overwrite().save( model_path + "/cat_to_num_unsupervised/indexer" ) odf_indexed = indexerModel.transform(odf_indexed) if method_type == "onehot_encoding": if 
pre_existing_model: encoder = OneHotEncoder.load( model_path + "/cat_to_num_unsupervised/encoder" ) else: encoder = OneHotEncoder( inputCols=list_of_cols_idx, outputCols=list_of_cols_vec, handleInvalid="keep", ) if model_path != "NA": encoder.write().overwrite().save( model_path + "/cat_to_num_unsupervised/encoder" ) odf = encoder.fit(odf_indexed).transform(odf_indexed) new_cols = [] odf_sample = odf.take(1) for i in list_of_cols: odf_schema = odf.schema uniq_cats = odf_sample[0].asDict()[i + "_vec"].size for j in range(0, uniq_cats): odf_schema = odf_schema.add( T.StructField(i + "_" + str(j), T.IntegerType()) ) new_cols.append(i + "_" + str(j)) odf = odf.rdd.map( lambda x: ( *x, *(DenseVector(x[i + "_vec"]).toArray().astype(int).tolist()), ) ).toDF(schema=odf_schema) if output_mode == "replace": odf = odf.drop(i, i + "_vec", i + "_index") else: odf = odf.drop(i + "_vec", i + "_index") else: odf = odf_indexed for i in list_of_cols: odf = odf.withColumn( i + "_index", F.when(F.col(i).isNull(), None).otherwise( F.col(i + "_index").cast(T.IntegerType()) ), ) if output_mode == "replace": for i in list_of_cols: odf = odf.drop(i).withColumnRenamed(i + "_index", i) odf = odf.select(idf.columns) if print_impact: if method_type == "label_encoding": if output_mode == "append": new_cols = [i + "_index" for i in list_of_cols] else: new_cols = list_of_cols print("Before") idf.select(list_of_cols).summary("count", "min", "max").show(3, False) print("After") odf.select(new_cols).summary("count", "min", "max").show(3, False) if method_type == "onehot_encoding": print("Before") idf.select(list_of_cols).printSchema() print("After") if output_mode == "append": odf.select(list_of_cols + new_cols).printSchema() else: odf.select(new_cols).printSchema() if skip_cols: print( "Columns dropped from encoding due to high cardinality: " + ",".join(skip_cols) ) return odf
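Two hypothetical calls are sketched below, one per encoding technique; spark, df, the column name and the import path are assumptions.
# Hypothetical usage sketch - spark, df, column names and the import path are assumptions.
from anovos.data_transformer.transformers import cat_to_num_unsupervised

# Label encoding: each category mapped to an integer by descending frequency
df_label = cat_to_num_unsupervised(
    spark,
    df,
    list_of_cols="all",
    method_type="label_encoding",
    index_order="frequencyDesc",
    cardinality_threshold=50,   # columns with more than 50 unique values are skipped
    output_mode="replace",
)

# One-hot encoding: appends dummy columns X_0, X_1, ... for each categorical column X
df_ohe = cat_to_num_unsupervised(
    spark,
    df,
    list_of_cols=["city"],
    method_type="onehot_encoding",
    output_mode="append",
    print_impact=True,
)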
def expression_parser(idf, list_of_expr, postfix='', print_impact=False)
-
expression_parser can be used to evaluate a list of SQL expressions and output the results as new features. It is able to handle column names containing special characters such as “.”, “-”, “@”, “^”, etc., by converting them to “_” before the evaluation and converting them back to the original names before returning the output dataframe.
Parameters
idf
- Input Dataframe
list_of_expr
- List of expressions to evaluate as new features e.g., ["expr1","expr2"]. Alternatively, expressions can be specified in a string format, where different expressions are separated by pipe delimiter “|” e.g., "expr1|expr2".
postfix
- Postfix for new feature names. Naming convention: "f" + expression_index + postfix, e.g. with a postfix of "new", newly added features are named f0new, f1new, etc. (Default value = "")
print_impact
- True, False This argument is to print the descriptive statistics of the parsed features (Default value = False)
Returns
DataFrame
- Parsed Dataframe
Expand source code
def expression_parser(idf, list_of_expr, postfix="", print_impact=False): """ expression_parser can be used to evaluate a list of SQL expressions and output the result as new features. It is able to handle column names containing special characters such as “.”, “-”, “@”, “^”, etc, by converting them to “_” first before the evaluation and convert them back to the original names before returning the output dataframe. Parameters ---------- idf Input Dataframe list_of_expr List of expressions to evaluate as new features e.g., ["expr1","expr2"]. Alternatively, expressions can be specified in a string format, where different expressions are separated by pipe delimiter “|” e.g., "expr1|expr2". postfix postfix for new feature name.Naming convention "f" + expression_index + postfix e.g. with postfix of "new", new added features are named as f0new, f1new etc. (Default value = "") print_impact True, False This argument is to print the descriptive statistics of the parsed features (Default value = False) Returns ------- DataFrame Parsed Dataframe """ if isinstance(list_of_expr, str): list_of_expr = [x.strip() for x in list_of_expr.split("|")] special_chars = [ "&", "$", ";", ":", ",", "*", "#", "@", "?", "%", "!", "^", "(", ")", "-", "/", "'", ".", '"', ] rename_cols = [] replace_chars = {} for char in special_chars: for col in idf.columns: if char in col: rename_cols.append(col) if col in replace_chars.keys(): (replace_chars[col]).append(char) else: replace_chars[col] = [char] rename_mapping_to_new, rename_mapping_to_old = {}, {} idf_renamed = idf for col in rename_cols: new_col = col for char in replace_chars[col]: new_col = new_col.replace(char, "_") rename_mapping_to_old[new_col] = col rename_mapping_to_new[col] = new_col idf_renamed = idf_renamed.withColumnRenamed(col, new_col) list_of_expr_ = [] for expr in list_of_expr: new_expr = expr for col in rename_cols: if col in expr: new_expr = new_expr.replace(col, rename_mapping_to_new[col]) list_of_expr_.append(new_expr) list_of_expr = list_of_expr_ odf = idf_renamed new_cols = [] for index, exp in enumerate(list_of_expr): odf = odf.withColumn("f" + str(index) + postfix, F.expr(exp)) new_cols.append("f" + str(index) + postfix) for new_col, col in rename_mapping_to_old.items(): odf = odf.withColumnRenamed(new_col, col) if print_impact: print("Columns Added: ", new_cols) odf.select(new_cols).describe().show(5, False) return odf
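The short, hypothetical example below parses two SQL expressions into new features; df, the column names and the import path are assumptions.
# Hypothetical usage sketch - df, column names and the import path are assumptions.
from anovos.data_transformer.transformers import expression_parser

df_parsed = expression_parser(
    df,
    list_of_expr=["income / 12", "log(balance + 1)"],  # or "income / 12|log(balance + 1)"
    postfix="_new",            # new features are named f0_new, f1_new
    print_impact=True,
)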
def feature_transformation(idf, list_of_cols='all', drop_cols=[], method_type='sqrt', N=None, output_mode='replace', print_impact=False)
-
As the name indicates, feature_transformation performs mathematical transformations over selected attributes. The following methods are supported for an input attribute x: ln(x), log10(x), log2(x), e^x, 2^x, 10^x, N^x, square and cube root of x, x^2, x^3, x^N, trigonometric transformations of x (sin, cos, tan, asin, acos, atan), radians, x%N, x!, 1/x, floor and ceiling of x, and x rounded to N decimal places. Some transformations, such as log and square root, only work with positive or non-negative input values; an error will be returned if this is violated.
Parameters
idf
- Input Dataframe
list_of_cols
- List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
method_type
- "ln", "log10", "log2", "exp", "powOf2" (2^x), "powOf10" (10^x), "powOfN" (N^x), "sqrt" (square root), "cbrt" (cube root), "sq" (square), "cb" (cube), "toPowerN" (x^N), "sin", "cos", "tan", "asin", "acos", "atan", "radians", "remainderDivByN" (x%N), "factorial" (x!), "mul_inv" (1/x), "floor", "ceil", "roundN" (round to N decimal places) (Default value = "sqrt")
N
- None by default. If method_type is "powOfN", "toPowerN", "remainderDivByN" or "roundN", N will be used as the required constant.
output_mode
- "replace", "append".
“replace” option replaces original columns with transformed columns.
“append” option appends transformed columns with a postfix (e.g. "_ln", "_powOf<N>") to the input dataset. (Default value = "replace")
print_impact
- True, False. This argument prints before and after descriptive statistics of the transformed features. (Default value = False)
Returns
DataFrame
- Transformed Dataframe
Expand source code
def feature_transformation( idf, list_of_cols="all", drop_cols=[], method_type="sqrt", N=None, output_mode="replace", print_impact=False, ): """ As the name indicates, feature_transformation performs mathematical transformation over selected attributes. The following methods are supported for an input attribute x: ln(x), log10(x), log2(x), e^x, 2^x, 10^x, N^x, square and cube root of x, x^2, x^3, x^N, trigonometric transformations of x (sin, cos, tan, asin, acos, atan), radians, x%N, x!, 1/x, floor and ceiling of x and x rounded to N decimal places. Some transformations only work with positive or non-negative input values such as log and square root and an error will be returned if violated. Parameters ---------- idf Input Dataframe list_of_cols List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) method_type "ln", "log10", "log2", "exp", "powOf2" (2^x), "powOf10" (10^x), "powOfN" (N^x), "sqrt" (square root), "cbrt" (cube root), "sq" (square), "cb" (cube), "toPowerN" (x^N), "sin", "cos", "tan", "asin", "acos", "atan", "radians", "remainderDivByN" (x%N), "factorial" (x!), "mul_inv" (1/x), "floor", "ceil", "roundN" (round to N decimal places) (Default value = "sqrt") N None by default. If method_type is "powOfN", "toPowerN", "remainderDivByN" or "roundN", N will be used as the required constant. output_mode "replace", "append". “replace” option replaces original columns with transformed columns. “append” option append transformed columns with a postfix (E.g. "_ln", "_powOf<N>") to the input dataset. 
(Default value = "replace") print_impact True, False This argument is to print before and after descriptive statistics of the transformed features (Default value = False) Returns ------- DataFrame Transformed Dataframe """ num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if (len(list_of_cols) == 0) | (any(x not in num_cols for x in list_of_cols)): raise TypeError("Invalid input for Column(s)") if method_type not in [ "ln", "log10", "log2", "exp", "powOf2", "powOf10", "powOfN", "sqrt", "cbrt", "sq", "cb", "toPowerN", "sin", "cos", "tan", "asin", "acos", "atan", "radians", "remainderDivByN", "factorial", "mul_inv", "floor", "ceil", "roundN", ]: raise TypeError("Invalid input method_type") num_cols = attributeType_segregation(idf.select(list_of_cols))[0] list_of_cols = num_cols odf = idf transformation_function = { "ln": F.log, "log10": F.log10, "log2": F.log2, "exp": F.exp, "powOf2": (lambda x: F.pow(2.0, x)), "powOf10": (lambda x: F.pow(10.0, x)), "powOfN": (lambda x: F.pow(N, x)), "sqrt": F.sqrt, "cbrt": F.cbrt, "sq": (lambda x: x**2), "cb": (lambda x: x**3), "toPowerN": (lambda x: x**N), "sin": F.sin, "cos": F.cos, "tan": F.tan, "asin": F.asin, "acos": F.acos, "atan": F.atan, "radians": F.radians, "remainderDivByN": (lambda x: x % F.lit(N)), "factorial": F.factorial, "mul_inv": (lambda x: F.lit(1) / x), "floor": F.floor, "ceil": F.ceil, "roundN": (lambda x: F.round(x, N)), } def get_col_name(i): if output_mode == "replace": return i else: if method_type in ["powOfN", "toPowerN", "remainderDivByN", "roundN"]: return i + "_" + method_type[:-1] + str(N) else: return i + "_" + method_type output_cols = [] for i in list_of_cols: modify_col = get_col_name(i) odf = odf.withColumn(modify_col, transformation_function[method_type](F.col(i))) output_cols.append(modify_col) if print_impact: print("Before:") idf.select(list_of_cols).describe().show(5, False) print("After:") odf.select(output_cols).describe().show(5, False) return odf
def imputation_MMM(spark, idf, list_of_cols='missing', drop_cols=[], method_type='median', pre_existing_model=False, model_path='NA', output_mode='replace', stats_missing={}, stats_mode={}, print_impact=False)
-
This function handles missing value related issues by substituting null values with a measure of central tendency (mode for categorical features and mean/median for numerical features). For numerical attributes, it leverages the Imputer functionality of Spark MLlib. Although Imputer can also be used for categorical attributes, that capability is only available in Spark 3.x; therefore, for categorical features, we compute the mode or leverage the mode computation from Measures of Central Tendency.
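For intuition, a minimal sketch of the mode computation applied to a categorical column is shown below (df and the column name "city" are illustrative assumptions; the actual function additionally supports pre-computed statistics and model saving).
# Simplified sketch of mode imputation for one categorical column (illustrative only).
from pyspark.sql import functions as F

mode_row = (
    df.select("city")                      # hypothetical categorical column
    .dropna()
    .groupBy("city")
    .count()
    .orderBy("count", ascending=False)
    .first()
)
mode_value = mode_row[0] if mode_row else None

# Substitute nulls with the most frequent value.
df_imputed = df.withColumn(
    "city", F.when(F.col("city").isNull(), F.lit(mode_value)).otherwise(F.col("city"))
)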
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of columns to impute e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all (non-array) columns for analysis. This is super useful instead of specifying all column names manually. "missing" (default) can be passed to include only those columns with missing values. One of the use cases where "all" may be preferable over "missing" is when the user wants to save the imputation model for future use, e.g. a column may not have missing values in the training dataset but missing values may appear in the prediction dataset. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols.
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
method_type
- "median", "mean" (valid only for for numerical columns attributes). Mode is only option for categorical columns. (Default value = "median")
pre_existing_model
- Boolean argument – True or False. True if imputation model exists already, False otherwise. (Default value = False)
model_path
- If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one.
output_mode
- "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. (Default value = "replace")
stats_missing
- Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
stats_mode
- Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on most frequently seen values i.e. if measures_of_centralTendency or mode_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
print_impact
- True, False (Default value = False) This argument is to print out before and after missing counts of imputed columns.
Returns
DataFrame
- Imputed Dataframe
Expand source code
def imputation_MMM( spark, idf, list_of_cols="missing", drop_cols=[], method_type="median", pre_existing_model=False, model_path="NA", output_mode="replace", stats_missing={}, stats_mode={}, print_impact=False, ): """ This function handles missing value related issues by substituting null values by the measure of central tendency (mode for categorical features and mean/median for numerical features). For numerical attributes, it leverages [Imputer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Imputer .html) functionality of Spark MLlib. Though, Imputer can be used for categorical attributes but this feature is available only in Spark3.x, therefore for categorical features, we compute mode or leverage mode computation from Measures of Central Tendency. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of columns to impute e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all (non-array) columns for analysis. This is super useful instead of specifying all column names manually. "missing" (default) can be passed to include only those columns with missing values. One of the usecases where "all" may be preferable over "missing" is when the user wants to save the imputation model for the future use e.g. a column may not have missing value in the training dataset but missing values may possibly appear in the prediction dataset. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) method_type "median", "mean" (valid only for for numerical columns attributes). Mode is only option for categorical columns. (Default value = "median") pre_existing_model Boolean argument – True or False. True if imputation model exists already, False otherwise. (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. (Default value = "replace") stats_missing Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {}) stats_mode Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on most frequently seen values i.e. if measures_of_centralTendency or mode_computation (data_analyzer.stats_generator module) has been computed & saved before. 
(Default value = {}) print_impact True, False (Default value = False) This argument is to print out before and after missing counts of imputed columns. Returns ------- DataFrame Imputed Dataframe """ if stats_missing == {}: missing_df = missingCount_computation(spark, idf) else: missing_df = read_dataset(spark, **stats_missing).select( "attribute", "missing_count", "missing_pct" ) missing_cols = ( missing_df.where(F.col("missing_count") > 0) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if str(pre_existing_model).lower() == "true": pre_existing_model = True elif str(pre_existing_model).lower() == "false": pre_existing_model = False else: raise TypeError("Non-Boolean input for pre_existing_model") if (len(missing_cols) == 0) & (not pre_existing_model) & (model_path == "NA"): return idf num_cols, cat_cols, other_cols = attributeType_segregation(idf) if list_of_cols == "all": list_of_cols = num_cols + cat_cols if list_of_cols == "missing": list_of_cols = [x for x in missing_cols if x in num_cols + cat_cols] if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if len(list_of_cols) == 0: warnings.warn("No Imputation performed- No column(s) to impute") return idf if any(x not in num_cols + cat_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if method_type not in ("mode", "mean", "median"): raise TypeError("Invalid input for method_type") if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") num_cols, cat_cols, other_cols = attributeType_segregation(idf.select(list_of_cols)) odf = idf if len(num_cols) > 0: recast_cols = [] recast_type = [] for i in num_cols: if get_dtype(idf, i) not in ("float", "double"): odf = odf.withColumn(i, F.col(i).cast(T.DoubleType())) recast_cols.append(i + "_imputed") recast_type.append(get_dtype(idf, i)) # For mode imputation if method_type == "mode": if stats_mode == {}: parameters = [ str( ( idf.select(i) .dropna() .groupby(i) .count() .orderBy("count", ascending=False) .first() or [None] )[0] ) for i in num_cols ] else: mode_df = read_dataset(spark, **stats_mode).replace("None", None) mode_df_cols = list(mode_df.select("attribute").toPandas()["attribute"]) parameters = [] for i in num_cols: if i not in mode_df_cols: parameters.append( str( ( idf.select(i) .dropna() .groupby(i) .count() .orderBy("count", ascending=False) .first() or [None] )[0] ) ) else: parameters.append( mode_df.where(F.col("attribute") == i) .select("mode") .rdd.flatMap(list) .collect()[0] ) for index, i in enumerate(num_cols): odf = odf.withColumn( i + "_imputed", F.when(F.col(i).isNull(), parameters[index]).otherwise(F.col(i)), ) else: # For mean, median imputation # Building new imputer model or uploading the existing model if pre_existing_model: imputerModel = ImputerModel.load( model_path + "/imputation_MMM/num_imputer-model" ) else: imputer = Imputer( strategy=method_type, inputCols=num_cols, outputCols=[(e + "_imputed") for e in num_cols], ) imputerModel = imputer.fit(odf) # Applying model # odf = recast_column(imputerModel.transform(odf), recast_cols, recast_type) odf = imputerModel.transform(odf) for i, j in zip(recast_cols, recast_type): odf = odf.withColumn(i, F.col(i).cast(j)) # Saving model if required if (not pre_existing_model) & (model_path != "NA"): imputerModel.write().overwrite().save( model_path + 
"/imputation_MMM/num_imputer-model" ) if len(cat_cols) > 0: if pre_existing_model: df_model = spark.read.csv( model_path + "/imputation_MMM/cat_imputer", header=True, inferSchema=True, ) parameters = [] for i in cat_cols: mapped_value = ( df_model.where(F.col("attribute") == i) .select("parameters") .rdd.flatMap(lambda x: x) .collect()[0] ) parameters.append(mapped_value) else: if stats_mode == {}: parameters = [ str( ( idf.select(i) .dropna() .groupby(i) .count() .orderBy("count", ascending=False) .first() or [None] )[0] ) for i in cat_cols ] else: mode_df = read_dataset(spark, **stats_mode).replace("None", None) parameters = [ mode_df.where(F.col("attribute") == i) .select("mode") .rdd.flatMap(list) .collect()[0] for i in cat_cols ] for index, i in enumerate(cat_cols): odf = odf.withColumn( i + "_imputed", F.when(F.col(i).isNull(), parameters[index]).otherwise(F.col(i)), ) if (not pre_existing_model) & (model_path != "NA"): df_model = spark.createDataFrame( zip(cat_cols, parameters), schema=["attribute", "parameters"] ) df_model.repartition(1).write.csv( model_path + "/imputation_MMM/cat_imputer", header=True, mode="overwrite", ) for i in num_cols + cat_cols: if i not in missing_cols: odf = odf.drop(i + "_imputed") elif output_mode == "replace": odf = odf.drop(i).withColumnRenamed(i + "_imputed", i) if print_impact: if output_mode == "replace": odf_print = missing_df.select( "attribute", F.col("missing_count").alias("missingCount_before") ).join( missingCount_computation(spark, odf, list_of_cols).select( "attribute", F.col("missing_count").alias("missingCount_after") ), "attribute", "inner", ) else: output_cols = [ (i + "_imputed") for i in [e for e in (num_cols + cat_cols) if e in missing_cols] ] odf_print = missing_df.select( "attribute", F.col("missing_count").alias("missingCount_before") ).join( missingCount_computation(spark, odf, output_cols) .withColumnRenamed("attribute", "attribute_after") .withColumn( "attribute", F.expr("substring(attribute_after, 1, length(attribute_after)-8)"), ) .drop("missing_pct"), "attribute", "inner", ) odf_print.show(len(list_of_cols), False) return odf
def imputation_matrixFactorization(spark, idf, list_of_cols='missing', drop_cols=[], id_col='', output_mode='replace', stats_missing={}, print_impact=False)
-
imputation_matrixFactorization uses a collaborative filtering technique to impute missing values. Collaborative filtering is commonly used in recommender systems to fill in missing user-item entries, and PySpark provides an implementation based on the alternating least squares (ALS) algorithm, which is used in this function. To fit our problem into the ALS model, each attribute is treated as an item, and an ID column needs to be specified by the user to generate the user-item pairs. In case the ID column doesn't exist in the dataset, a proxy ID column is implicitly generated by the function. Subsequently, all user-item pairs with known values are used to train the ALS model, and the trained model is used to predict the user-item pairs with missing values.
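For intuition, a minimal sketch of the melt step that turns a wide numeric dataframe into (id, attribute, value) triples for ALS is shown below (df, the "id" column, and the column names are illustrative assumptions).
# Simplified sketch: melt a wide numeric DataFrame into (id, key, value) triples for ALS.
from itertools import chain
from pyspark.sql import functions as F

cols = ["col1", "col2"]                                  # hypothetical double-typed columns
key_and_val = F.create_map(
    list(chain.from_iterable([[F.lit(c), F.col(c)] for c in cols]))
)
df_long = df.select("id", F.explode(key_and_val))        # resulting columns: id, key, value

# Known entries train the ALS model; null entries are the ones to be predicted.
df_train = df_long.where(F.col("value").isNotNull())
df_missing = df_long.where(F.col("value").isNull())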
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of columns to impute e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all (non-array) columns for analysis. This is super useful instead of specifying all column names manually. "missing" (default) can be passed to include only those columns with missing values. One of the use cases where "all" may be preferable over "missing" is when the user wants to save the imputation model for future use, e.g. a column may not have missing values in the training dataset but missing values may appear in the prediction dataset. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols.
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
id_col
- ID column (Default value = "")
output_mode
- "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. (Default value = "replace")
stats_missing
- Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
print_impact
- True, False (Default value = False) This argument is to print out before and after missing counts of imputed columns.
Returns
DataFrame
- Imputed Dataframe
Expand source code
def imputation_matrixFactorization( spark, idf, list_of_cols="missing", drop_cols=[], id_col="", output_mode="replace", stats_missing={}, print_impact=False, ): """ imputation_matrixFactorization uses collaborative filtering technique to impute missing values. Collaborative filtering is commonly used in recommender systems to fill the missing user-item entries and PySpark provides an implementation using alternating least squares (ALS) algorithm, which is used in this function. To fit our problem into the ALS model, each attribute is treated as an item and an id column needs to be specified by the user to generate the user-item pairs. In the case, ID column doesn't exist in the dataset and proxu ID column is implicitly generated by the function. Subsequently, all user-item pairs with known values will be used to train the ALS model and the trained model can be used to predict the user-item pairs with missing values. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of columns to impute e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all (non-array) columns for analysis. This is super useful instead of specifying all column names manually. "missing" (default) can be passed to include only those columns with missing values. One of the usecases where "all" may be preferable over "missing" is when the user wants to save the imputation model for the future use e.g. a column may not have missing value in the training dataset but missing values may possibly appear in the prediction dataset. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) id_col ID column (Default value = "") output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. (Default value = "replace") stats_missing Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {}) print_impact True, False (Default value = False) This argument is to print out before and after missing counts of imputed columns. 
Returns ------- DataFrame Imputed Dataframe """ num_cols = attributeType_segregation(idf)[0] if stats_missing == {}: missing_df = missingCount_computation(spark, idf, num_cols) else: missing_df = ( read_dataset(spark, **stats_missing) .select("attribute", "missing_count", "missing_pct") .where(F.col("attribute").isin(num_cols)) ) empty_cols = ( missing_df.where(F.col("missing_pct") == 1.0) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if len(empty_cols) > 0: warnings.warn( "Following columns dropped from the imputation as all values are null: " + ",".join(empty_cols) ) missing_cols = ( missing_df.where(F.col("missing_count") > 0) .where(F.col("missing_pct") < 1.0) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if list_of_cols == "all": list_of_cols = num_cols if list_of_cols == "missing": list_of_cols = missing_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list( set( [ e for e in list_of_cols if (e not in drop_cols) & (e != id_col) & (e not in empty_cols) ] ) ) if (len(list_of_cols) == 0) | ( len([e for e in list_of_cols if e in missing_cols]) == 0 ): warnings.warn("No Imputation Performed - No Column(s) to Impute") return idf if len(list_of_cols) == 1: warnings.warn( "No Imputation Performed - Needs more than 1 column for matrix factorization" ) return idf if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") remove_id = False if id_col == "": idf = idf.withColumn("id", F.monotonically_increasing_id()).withColumn( "id", F.row_number().over(Window.orderBy("id")) ) id_col = "id" remove_id = True key_and_val = F.create_map( list(chain.from_iterable([[F.lit(c), F.col(c)] for c in list_of_cols])) ) df_flatten = idf.select(id_col, F.explode(key_and_val)).withColumn( "key", F.concat(F.col("key"), F.lit("_imputed")) ) id_type = get_dtype(idf, id_col) if id_type == "string": id_indexer = StringIndexer().setInputCol(id_col).setOutputCol("IDLabel") id_indexer_model = id_indexer.fit(df_flatten) df_flatten = id_indexer_model.transform(df_flatten).drop(id_col) else: df_flatten = df_flatten.withColumnRenamed(id_col, "IDLabel") indexer = StringIndexer().setInputCol("key").setOutputCol("keyLabel") indexer_model = indexer.fit(df_flatten) df_encoded = indexer_model.transform(df_flatten).drop("key") df_model = df_encoded.where(F.col("value").isNotNull()) df_test = df_encoded.where(F.col("value").isNull()) if ( df_model.select("IDLabel").distinct().count() < df_encoded.select("IDLabel").distinct().count() ): warnings.warn( "The returned odf may not be fully imputed because values for all list_of_cols are null for some IDs" ) als = ALS( maxIter=20, regParam=0.01, userCol="IDLabel", itemCol="keyLabel", ratingCol="value", coldStartStrategy="drop", ) model = als.fit(df_model) df_pred = ( model.transform(df_test).drop("value").withColumnRenamed("prediction", "value") ) df_encoded_pred = df_model.union(df_pred.select(df_model.columns)) if id_type == "string": IDlabelReverse = IndexToString().setInputCol("IDLabel").setOutputCol(id_col) df_encoded_pred = IDlabelReverse.transform(df_encoded_pred) else: df_encoded_pred = df_encoded_pred.withColumnRenamed("IDLabel", id_col) keylabelReverse = IndexToString().setInputCol("keyLabel").setOutputCol("key") odf_imputed = ( keylabelReverse.transform(df_encoded_pred) 
.groupBy(id_col) .pivot("key") .agg(F.first("value")) .select( [id_col] + [(i + "_imputed") for i in list_of_cols if i in missing_cols] ) ) odf = idf.join(odf_imputed, id_col, "left_outer") for i in list_of_cols: if i not in missing_cols: odf = odf.drop(i + "_imputed") elif output_mode == "replace": odf = odf.drop(i).withColumnRenamed(i + "_imputed", i) if remove_id: odf = odf.drop("id") if print_impact: if output_mode == "replace": odf_print = missing_df.select( "attribute", F.col("missing_count").alias("missingCount_before") ).join( missingCount_computation(spark, odf, list_of_cols).select( "attribute", F.col("missing_count").alias("missingCount_after") ), "attribute", "inner", ) else: output_cols = [ (i + "_imputed") for i in [e for e in list_of_cols if e in missing_cols] ] odf_print = missing_df.select( "attribute", F.col("missing_count").alias("missingCount_before") ).join( missingCount_computation(spark, odf, output_cols) .withColumnRenamed("attribute", "attribute_after") .withColumn( "attribute", F.expr("substring(attribute_after, 1, length(attribute_after)-8)"), ) .drop("missing_pct"), "attribute", "inner", ) odf_print.show(len(list_of_cols), False) return odf
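A hypothetical usage sketch of imputation_matrixFactorization follows; the module import path and the id column name are assumptions.
# Hypothetical usage sketch; import path and id column are assumptions.
from anovos.data_transformer.transformers import imputation_matrixFactorization

df_imputed = imputation_matrixFactorization(
    spark,
    df,
    list_of_cols="missing",
    id_col="customer_id",        # if left as "", a proxy id is generated internally
    output_mode="append",        # imputed columns get the "_imputed" postfix
)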
def imputation_sklearn(spark, idf, list_of_cols='missing', drop_cols=[], missing_threshold=1.0, method_type='regression', use_sampling=True, sample_method='random', strata_cols='all', stratified_type='population', sample_size=10000, sample_seed=42, persist=True, persist_option=StorageLevel(True, True, False, False, 1), pre_existing_model=False, model_path='NA', output_mode='replace', stats_missing={}, run_type='local', auth_key='NA', print_impact=False)
-
The function "imputation_sklearn" leverages sklearn imputer algorithms. Two methods are supported via this function: “KNN” and “regression”.
The “KNN” option trains a sklearn.impute.KNNImputer, which is based on the k-Nearest Neighbors algorithm: the missing values of a sample are imputed using the mean of its 5 nearest neighbors in the training set. The “regression” option trains a sklearn.impute.IterativeImputer, which models the attribute to impute as a function of the rest of the attributes and imputes using the estimate. Imputation is performed iteratively, from the attribute with the fewest missing values to the one with the most. All the hyperparameters of the above-mentioned imputers are kept at their default values.
However, sklearn imputers are not scalable and might be slow if the input dataframe is large. In fact, if the input dataframe exceeds 10 GB, the model fitting step powered by sklearn might fail. Thus, sample_size (default 10,000) can be set to control the number of samples used to train the imputer. If the total number of rows in the input dataset exceeds sample_size, the remaining rows are imputed using the trained imputer in a scalable manner. This is one of the ways Anovos has been designed as a scalable feature engineering library.
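A conceptual sketch of this train-on-a-sample, apply-at-scale pattern is shown below; this is not the library implementation, and df, the column names, and the simple limit-based sampling are assumptions.
# Conceptual sketch: fit a sklearn imputer on a driver-side sample, then apply it to the
# full Spark DataFrame through a pandas UDF. Column names and sampling are simplified.
import pandas as pd
from pyspark.sql import functions as F, types as T
from sklearn.impute import KNNImputer

cols = ["x1", "x2"]                                        # hypothetical numeric columns
sample_pd = df.select(cols).limit(10000).toPandas()        # small sample fits in driver memory
imputer = KNNImputer(n_neighbors=5).fit(sample_pd.to_numpy())

@F.pandas_udf(T.ArrayType(T.DoubleType()))
def impute(*series):
    pdf = pd.concat(series, axis=1)
    return pd.Series(row.tolist() for row in imputer.transform(pdf.to_numpy()))

df_out = df.withColumn("imputed_array", impute(*cols))     # array of imputed values per row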
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of columns to impute e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all (non-array) columns for analysis. This is super useful instead of specifying all column names manually. "missing" (default) can be passed to include only those columns with missing values. One of the use cases where "all" may be preferable over "missing" is when the user wants to save the imputation model for future use, e.g. a column may not have missing values in the training dataset but missing values may appear in the prediction dataset. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols.
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
missing_threshold
- Float argument - If list_of_cols is "missing", this argument is used to determine the missing threshold for every column. Any column whose missing ratio (count of missing values / total count) is >= missing_threshold will be excluded from the list of columns to be imputed. (Default value = 1.0)
method_type
- "KNN", "regression". "KNN" option trains a sklearn.impute.KNNImputer. "regression" option trains a sklearn.impute.IterativeImputer (Default value = "regression")
use_sampling
- Boolean argument - True or False. This argument is used to determine whether to use sampling on source and target dataset, True will enable the use of sample method, otherwise False. It is recommended to set this as True for large datasets. (Default value = True)
sample_method
- If use_sampling is True, this argument is used to determine the sampling method. "stratified" for Stratified sampling, "random" for Random Sampling. For more details, please refer to https://docs.anovos.ai/api/data_ingest/data_sampling.html. (Default value = "random")
strata_cols
- If use_sampling is True and sample_method is "stratified", this argument is used to determine the list of columns used to be treated as strata. For more details, please refer to https://docs.anovos.ai/api/data_ingest/data_sampling.html. (Default value = "all")
stratified_type
- If use_sampling is True and sample_method is "stratified", this argument is used to determine the stratified sampling method. "population" stands for Proportionate Stratified Sampling, "balanced" stands for Optimum Stratified Sampling. For more details, please refer to https://docs.anovos.ai/api/data_ingest/data_sampling.html. (Default value = "population")
sample_size
- If use_sampling is True, this argument is used to determine maximum rows for training the sklearn imputer (Default value = 10000)
sample_seed
- If use_sampling is True, this argument is used to determine the seed of sampling method. (Default value = 42)
persist
- Boolean argument - True or False. This argument is used to determine whether to persist intermediate results of the source and target dataset; True enables the use of persist, otherwise False. It is recommended to set this as True for large datasets. (Default value = True)
persist_option
- If persist is True, this argument is used to determine the type of persist. For all the pyspark.StorageLevel option available in persist, please refer to https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html (Default value = pyspark.StorageLevel.MEMORY_AND_DISK)
pre_existing_model
- Boolean argument – True or False. True if imputation model exists already, False otherwise. (Default value = False)
model_path
- If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one.
output_mode
- "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. (Default value = "replace")
stats_missing
- Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
run_type
- "local", "emr", "databricks", "ak8s" (Default value = "local")
auth_key
- Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type. Default value is kept as "NA"
print_impact
- True, False (Default value = False) This argument is to print out before and after missing counts of imputed columns.
Returns
DataFrame
- Imputed Dataframe
Expand source code
def imputation_sklearn( spark, idf, list_of_cols="missing", drop_cols=[], missing_threshold=1.0, method_type="regression", use_sampling=True, sample_method="random", strata_cols="all", stratified_type="population", sample_size=10000, sample_seed=42, persist=True, persist_option=pyspark.StorageLevel.MEMORY_AND_DISK, pre_existing_model=False, model_path="NA", output_mode="replace", stats_missing={}, run_type="local", auth_key="NA", print_impact=False, ): """ The function "imputation_sklearn" leverages sklearn imputer algorithms. Two methods are supported via this function: “KNN” and “regression”. “KNN” option trains a sklearn.impute.KNNImputer which is based on k-Nearest Neighbors algorithm. The missing values of a sample are imputed using the mean of its 5 nearest neighbors in the training set. “regression” option trains a sklearn.impute.IterativeImputer which models attribute to impute as a function of rest of the attributes and imputes using the estimation. Imputation is performed in an iterative way from attributes with fewest number of missing values to most. All the hyperparameters used in the above mentioned imputers are their default values. However, sklearn imputers are not scalable, which might be slow if the size of the input dataframe is large. In fact, if the input dataframe size exceeds 10 GigaBytes, the model fitting step powered by sklearn might fail. Thus, an input sample_size (the default value is 10,000) can be set to control the number of samples to be used to train the imputer. If the total number of input dataset exceeds sample_size, the rest of the samples will be imputed using the trained imputer in a scalable manner. This is one of the way to demonstrate how Anovos has been designed as a scalable feature engineering library. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of columns to impute e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all (non-array) columns for analysis. This is super useful instead of specifying all column names manually. "missing" (default) can be passed to include only those columns with missing values. One of the usecases where "all" may be preferable over "missing" is when the user wants to save the imputation model for the future use e.g. a column may not have missing value in the training dataset but missing values may possibly appear in the prediction dataset. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) missing_threshold Float argument - If list_of_cols is "missing", this argument is used to determined the missing threshold for every column. The column that has more (count of missing value/ count of total value) >= missing_threshold will be excluded from the list of columns to be imputed. (Default value = 1.0) method_type "KNN", "regression". "KNN" option trains a sklearn.impute.KNNImputer. 
"regression" option trains a sklearn.impute.IterativeImputer (Default value = "regression") use_sampling Boolean argument - True or False. This argument is used to determine whether to use sampling on source and target dataset, True will enable the use of sample method, otherwise False. It is recommended to set this as True for large datasets. (Default value = True) sample_method If use_sampling is True, this argument is used to determine the sampling method. "stratified" for Stratified sampling, "random" for Random Sampling. For more details, please refer to https://docs.anovos.ai/api/data_ingest/data_sampling.html. (Default value = "random") strata_cols If use_sampling is True and sample_method is "stratified", this argument is used to determine the list of columns used to be treated as strata. For more details, please refer to https://docs.anovos.ai/api/data_ingest/data_sampling.html. (Default value = "all") stratified_type If use_sampling is True and sample_method is "stratified", this argument is used to determine the stratified sampling method. "population" stands for Proportionate Stratified Sampling, "balanced" stands for Optimum Stratified Sampling. For more details, please refer to https://docs.anovos.ai/api/data_ingest/data_sampling.html. (Default value = "population") sample_size If use_sampling is True, this argument is used to determine maximum rows for training the sklearn imputer (Default value = 10000) sample_seed If use_sampling is True, this argument is used to determine the seed of sampling method. (Default value = 42) persist Boolean argument - True or False. This argument is used to determine whether to persist on binning result of source and target dataset, True will enable the use of persist, otherwise False. It is recommended to set this as True for large datasets. (Default value = True) persist_option If persist is True, this argument is used to determine the type of persist. For all the pyspark.StorageLevel option available in persist, please refer to https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html (Default value = pyspark.StorageLevel.MEMORY_AND_DISK) pre_existing_model Boolean argument – True or False. True if imputation model exists already, False otherwise. (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. (Default value = "replace") stats_missing Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {}) run_type "local", "emr", "databricks", "ak8s" (Default value = "local") auth_key Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type. Default value is kept as "NA" print_impact True, False (Default value = False) This argument is to print out before and after missing counts of imputed columns. 
Returns ------- DataFrame Imputed Dataframe """ if persist: idf = idf.persist(persist_option) num_cols = attributeType_segregation(idf)[0] if stats_missing == {}: missing_df = missingCount_computation(spark, idf, num_cols) else: missing_df = ( read_dataset(spark, **stats_missing) .select("attribute", "missing_count", "missing_pct") .where(F.col("attribute").isin(num_cols)) ) empty_cols = ( missing_df.where(F.col("missing_pct") == 1.0) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if len(empty_cols) > 0: warnings.warn( "Following columns dropped from the imputation as all values are null: " + ",".join(empty_cols) ) missing_cols = ( missing_df.where(F.col("missing_count") > 0) .where(F.col("missing_pct") < missing_threshold) .select("attribute") .rdd.flatMap(lambda x: x) .collect() ) if list_of_cols == "all": list_of_cols = num_cols if list_of_cols == "missing": list_of_cols = missing_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list( set([e for e in list_of_cols if (e not in drop_cols) & (e not in empty_cols)]) ) if len(list_of_cols) <= 1: warnings.warn( "No Imputation Performed - No Column(s) or Insufficient Column(s) to Impute" ) return idf if str(pre_existing_model).lower() == "true": pre_existing_model = True elif str(pre_existing_model).lower() == "false": pre_existing_model = False else: raise TypeError("Non-Boolean input for pre_existing_model") if ( (len([e for e in list_of_cols if e in missing_cols]) == 0) & (not pre_existing_model) & (model_path == "NA") ): warnings.warn( "No Imputation Performed - No Column(s) to Impute and No Imputation Model to be saved" ) return idf if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if method_type not in ("KNN", "regression"): raise TypeError("Invalid input for method_type") if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") if pre_existing_model: if run_type == "emr": bash_cmd = "aws s3 cp " + model_path + "/imputation_sklearn.sav ." output = subprocess.check_output(["bash", "-c", bash_cmd]) imputer = pickle.load(open("imputation_sklearn.sav", "rb")) elif run_type == "ak8s": bash_cmd = ( 'azcopy cp "' + model_path + "/imputation_sklearn.sav" + str(auth_key) + '" .' 
) output = subprocess.check_output(["bash", "-c", bash_cmd]) imputer = pickle.load(open("imputation_sklearn.sav", "rb")) else: imputer = pickle.load(open(model_path + "/imputation_sklearn.sav", "rb")) else: if use_sampling: count_idf = idf.count() if count_idf > sample_size: idf_model = data_sample( idf, strata_cols=strata_cols, fraction=sample_size / count_idf, method_type=sample_method, stratified_type=stratified_type, seed_value=sample_seed, ) else: idf_model = idf else: idf_model = idf if persist: idf_model = idf_model.persist(persist_option) idf_pd = idf_model.select(list_of_cols).toPandas() if method_type == "KNN": imputer = KNNImputer( n_neighbors=5, weights="uniform", metric="nan_euclidean" ) if method_type == "regression": imputer = IterativeImputer() imputer.fit(idf_pd) if (not pre_existing_model) & (model_path != "NA"): if run_type == "emr": pickle.dump(imputer, open("imputation_sklearn.sav", "wb")) bash_cmd = ( "aws s3 cp imputation_sklearn.sav " + model_path + "/imputation_sklearn.sav" ) output = subprocess.check_output(["bash", "-c", bash_cmd]) imputer = pickle.load(open("imputation_sklearn.sav", "rb")) elif run_type == "ak8s": pickle.dump(imputer, open("imputation_sklearn.sav", "wb")) bash_cmd = ( 'azcopy cp "imputation_sklearn.sav" "' + ends_with(model_path) + "imputation_sklearn.sav" + str(auth_key) + '"' ) output = subprocess.check_output(["bash", "-c", bash_cmd]) imputer = pickle.load(open("imputation_sklearn.sav", "rb")) else: local_path = model_path + "/imputation_sklearn.sav" os.makedirs(os.path.dirname(local_path), exist_ok=True) pickle.dump(imputer, open(local_path, "wb")) imputer = pickle.load( open(model_path + "/imputation_sklearn.sav", "rb") ) @F.pandas_udf(returnType=T.ArrayType(T.DoubleType())) def prediction(*cols): input_pdf = pd.concat(cols, axis=1) return pd.Series(row.tolist() for row in imputer.transform(input_pdf)) result_df = idf.withColumn("features", prediction(*list_of_cols)) if persist: result_df = result_df.persist(persist_option) odf_schema = result_df.schema for i in list_of_cols: odf_schema = odf_schema.add(T.StructField(i + "_imputed", T.FloatType())) odf = ( result_df.rdd.map(lambda x: (*x, *x["features"])) .toDF(schema=odf_schema) .drop("features") ) output_cols = [] for i in list_of_cols: if output_mode == "append": if i not in missing_cols: odf = odf.drop(i + "_imputed") else: output_cols.append(i + "_imputed") else: odf = odf.drop(i).withColumnRenamed(i + "_imputed", i) odf = odf.select(idf.columns + output_cols) if print_impact: if output_mode == "replace": odf_print = missing_df.select( "attribute", F.col("missing_count").alias("missingCount_before") ).join( missingCount_computation(spark, odf, list_of_cols).select( "attribute", F.col("missing_count").alias("missingCount_after") ), "attribute", "inner", ) else: odf_print = missing_df.select( "attribute", F.col("missing_count").alias("missingCount_before") ).join( missingCount_computation(spark, odf, output_cols) .withColumnRenamed("attribute", "attribute_after") .withColumn( "attribute", F.expr("substring(attribute_after, 1, length(attribute_after)-8)"), ) .drop("missing_pct"), "attribute", "inner", ) odf_print.show(len(list_of_cols), False) if persist: idf.unpersist() if not pre_existing_model: idf_model.unpersist() result_df.unpersist() return odf
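A hypothetical usage sketch of imputation_sklearn follows; the module import path is an assumption.
# Hypothetical usage sketch; import path is an assumption.
from anovos.data_transformer.transformers import imputation_sklearn

df_imputed = imputation_sklearn(
    spark,
    df,
    list_of_cols="missing",
    method_type="regression",     # IterativeImputer; use "KNN" for KNNImputer
    use_sampling=True,
    sample_size=10000,            # cap on rows used to fit the sklearn model
    output_mode="replace",
)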
def monotonic_binning(spark, idf, list_of_cols='all', drop_cols=[], label_col='label', event_label=1, bin_method='equal_range', bin_size=10, bin_dtype='numerical', output_mode='replace')
-
This function constitutes a supervised way of binning numerical attributes into discrete (integer or categorical valued) attributes. Instead of a pre-defined fixed number of bins, the number of bins is dynamically computed to ensure the monotonic nature of the bins, i.e. the % event should increase or decrease with the bin index. The monotonic nature of the bins is evaluated by the Spearman rank correlation between the bin index and the % event, which should be either +1 or -1. In case monotonicity is not attained, the user-defined fixed number of bins is used for the binning.
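For intuition, the monotonicity check boils down to a Spearman rank correlation between the bin index and the per-bin event rate, as in the minimal sketch below (the numbers are made up for illustration).
# Illustrative monotonicity check via Spearman rank correlation (made-up numbers).
from scipy import stats

bin_index = [1, 2, 3, 4, 5]
event_rate = [0.02, 0.05, 0.09, 0.15, 0.22]   # % event per bin

r, p = stats.spearmanr(bin_index, event_rate)
is_monotonic = abs(r) == 1.0                  # bins are accepted only if r is exactly +1 or -1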
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
label_col
- Label/Target column (Default value = "label")
event_label
- Value of (positive) event (i.e label 1) (Default value = 1)
bin_method
- "equal_frequency", "equal_range". In "equal_range" method, each bin is of equal size/width and in "equal_frequency", each bin has equal no. of rows, though the width of bins may vary. (Default value = "equal_range")
bin_size
- Default number of bins in case monotonicity is not achieved. (Default value = 10)
bin_dtype
- "numerical", "categorical". With "numerical" option, original value is replaced with an Integer (1,2,…) and with "categorical" option, original replaced with a string describing min and max value allowed in the bin ("minval-maxval"). (Default value = "numerical")
output_mode
- "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_binned" e.g. column X is appended as X_binned. (Default value = "replace")
Returns
DataFrame
- Binned Dataframe
Expand source code
def monotonic_binning( spark, idf, list_of_cols="all", drop_cols=[], label_col="label", event_label=1, bin_method="equal_range", bin_size=10, bin_dtype="numerical", output_mode="replace", ): """ This function constitutes supervised way of binning the numerical attribute into discrete (integer or categorical values) attribute. Instead of pre-defined fixed number of bins, number of bins are dynamically computed to ensure the monotonic nature of bins i.e. % event should increase or decrease with the bin. Monotonic nature of bins is evaluated by looking at spearman rank correlation, which should be either +1 or -1, between the bin index and % event. In case, the monotonic nature is not attained, user defined fixed number of bins are used for the binning. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) label_col Label/Target column (Default value = "label") event_label Value of (positive) event (i.e label 1) (Default value = 1) bin_method "equal_frequency", "equal_range". In "equal_range" method, each bin is of equal size/width and in "equal_frequency", each bin has equal no. of rows, though the width of bins may vary. (Default value = "equal_range") bin_size Default number of bins in case monotonicity is not achieved. bin_dtype "numerical", "categorical". With "numerical" option, original value is replaced with an Integer (1,2,…) and with "categorical" option, original replaced with a string describing min and max value allowed in the bin ("minval-maxval"). (Default value = "numerical") output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_binned" e.g. column X is appended as X_binned. 
(Default value = "replace") Returns ------- DataFrame Binned Dataframe """ num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list( set([e for e in list_of_cols if e not in (drop_cols + [label_col])]) ) if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") odf = idf for col in list_of_cols: n = 20 r = 0 while n > 2: tmp = ( attribute_binning( spark, idf, [col], drop_cols=[], method_type=bin_method, bin_size=n, output_mode="append", ) .select(label_col, col, col + "_binned") .withColumn( label_col, F.when(F.col(label_col) == event_label, 1).otherwise(0) ) .groupBy(col + "_binned") .agg(F.avg(col).alias("mean_val"), F.avg(label_col).alias("mean_label")) .dropna() ) r, p = stats.spearmanr( tmp.toPandas()[["mean_val"]], tmp.toPandas()[["mean_label"]] ) if r == 1.0: odf = attribute_binning( spark, odf, [col], drop_cols=[], method_type=bin_method, bin_size=n, bin_dtype=bin_dtype, output_mode=output_mode, ) break n = n - 1 r = 0 if r < 1.0: odf = attribute_binning( spark, odf, [col], drop_cols=[], method_type=bin_method, bin_size=bin_size, bin_dtype=bin_dtype, output_mode=output_mode, ) return odf
def normalization(idf, list_of_cols='all', drop_cols=[], pre_existing_model=False, model_path='NA', output_mode='replace', print_impact=False)
-
This function rescales the selected numerical attributes into the [0, 1] range using min-max scaling (Spark MLlib's MinMaxScaler). The fitted scaler model can optionally be saved to model_path and reused later via pre_existing_model.
Parameters
idf
- Input Dataframe
list_of_cols
- List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
pre_existing_model
- Boolean argument – True or False. True if normalization/scalar model exists already, False Otherwise (Default value = False)
model_path
- If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one.
output_mode
- "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_scaled" e.g. column X is appended as X_scaled. (Default value = "replace")
print_impact
- True, False (Default value = False) This argument is to print out before and after descriptive statistics of rescaled columns.
Returns
DataFrame
- Rescaled Dataframe
Expand source code
def normalization( idf, list_of_cols="all", drop_cols=[], pre_existing_model=False, model_path="NA", output_mode="replace", print_impact=False, ): """ Parameters ---------- idf Input Dataframe list_of_cols List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) pre_existing_model Boolean argument – True or False. True if normalization/scalar model exists already, False Otherwise (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_scaled" e.g. column X is appended as X_scaled. (Default value = "replace") print_impact True, False (Default value = False) This argument is to print out before and after descriptive statistics of rescaled columns. 
Returns ------- DataFrame Rescaled Dataframe """ num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if len(list_of_cols) == 0: warnings.warn( "No Normalization Performed - No numerical column(s) to transform" ) return idf if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") idf_id = idf.withColumn("tempID", F.monotonically_increasing_id()) idf_partial = idf_id.select(["tempID"] + list_of_cols) assembler = VectorAssembler( inputCols=list_of_cols, outputCol="list_of_cols_vector", handleInvalid="keep" ) assembled_data = assembler.transform(idf_partial) if pre_existing_model: scalerModel = MinMaxScalerModel.load(model_path + "/normalization") else: scaler = MinMaxScaler( inputCol="list_of_cols_vector", outputCol="list_of_cols_scaled" ) scalerModel = scaler.fit(assembled_data) if model_path != "NA": scalerModel.write().overwrite().save(model_path + "/normalization") scaledData = scalerModel.transform(assembled_data) def vector_to_array(v): return v.toArray().tolist() f_vector_to_array = F.udf(vector_to_array, T.ArrayType(T.FloatType())) odf_partial = scaledData.withColumn( "list_of_cols_array", f_vector_to_array("list_of_cols_scaled") ).drop(*["list_of_cols_scaled", "list_of_cols_vector"]) odf_schema = odf_partial.schema for i in list_of_cols: odf_schema = odf_schema.add(T.StructField(i + "_scaled", T.FloatType())) odf_partial = ( odf_partial.rdd.map(lambda x: (*x, *x["list_of_cols_array"])) .toDF(schema=odf_schema) .drop("list_of_cols_array") ) odf = idf_id.join(odf_partial.drop(*list_of_cols), "tempID", "left_outer").select( idf.columns + [ ( F.when(F.isnan(F.col(i + "_scaled")), None).otherwise( F.col(i + "_scaled") ) ).alias(i + "_scaled") for i in list_of_cols ] ) if output_mode == "replace": for i in list_of_cols: odf = odf.drop(i).withColumnRenamed(i + "_scaled", i) odf = odf.select(idf.columns) if print_impact: if output_mode == "replace": output_cols = list_of_cols else: output_cols = [(i + "_scaled") for i in list_of_cols] print("Before: ") idf.select(list_of_cols).describe().show(5, False) print("After: ") odf.select(output_cols).describe().show(5, False) return odf
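A hypothetical usage sketch of normalization follows; the module import path and column names are assumptions.
# Hypothetical usage sketch; import path and column names are assumptions.
from anovos.data_transformer.transformers import normalization

# Min-max scale all numerical columns except "id"; scaled values replace the originals.
df_scaled = normalization(
    df,
    list_of_cols="all",
    drop_cols=["id"],
    output_mode="replace",
)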
def outlier_categories(spark, idf, list_of_cols='all', drop_cols=[], coverage=1.0, max_category=50, pre_existing_model=False, model_path='NA', output_mode='replace', print_impact=False)
-
This function replaces less frequently seen values (called outlier values in the current context) in a categorical column by 'outlier_categories'. Outlier values can be defined in two ways: a) Max N categories, where N is a user-defined value. In this method, the top N-1 most frequently seen categories are kept and the rest are clubbed under a single category 'outlier_categories'. Alternatively, b) Coverage: the most frequently seen categories are kept until they cover a minimum of N% of rows, and the remaining, lesser seen values are mapped to 'outlier_categories'. Even if the coverage is lower, the maximum category constraint is given priority. Further, there is a caveat: when multiple categories have the same rank, the number of categorical values can exceed the max_category defined by the user.
This function performs best when no column has more than 100 distinct values. For better performance, it is recommended to drop columns with more than 100 distinct values from the computation. A toy illustration of the coverage/max_category selection rule is sketched below.
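To make the interaction between coverage and max_category concrete, the following plain-Python sketch mimics the selection rule on a hypothetical frequency table; it is only an illustration, not the Spark implementation used by the function.

# Hypothetical category frequencies (share of rows), from most to least frequent
freq = {"A": 0.50, "B": 0.25, "C": 0.15, "D": 0.07, "E": 0.03}
coverage, max_category = 0.8, 4

kept, cumulative = [], 0.0
for cat, pct in sorted(freq.items(), key=lambda kv: -kv[1]):
    # Stop once coverage is already reached or (max_category - 1) categories are kept
    if cumulative >= coverage or len(kept) >= max_category - 1:
        break
    kept.append(cat)
    cumulative += pct

print(kept)  # ['A', 'B', 'C'] -> coverage crossed at 0.90; 'D' and 'E' map to 'outlier_categories'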
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of categorical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all categorical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
coverage
- Defines the minimum % of rows that will be mapped to their actual category name, with the rest mapped to 'outlier_categories'. Takes a value between 0 and 1. A coverage of 0.8 means the most frequently seen categories are retained until they cover at least 80% of the rows, and the remaining, lesser-seen values are mapped to 'outlier_categories'. (Default value = 1.0)
max_category
- Even if the coverage threshold is not yet reached, at most (max_category - 1) categories are mapped to their actual name and the rest to 'outlier_categories'. Caveat: when multiple categories share the same rank, the number of categories can exceed max_category. (Default value = 50)
pre_existing_model
- Boolean argument – True or False. True if a model with the outlier/other values for each attribute already exists and should be used, False otherwise. (Default value = False)
model_path
- If pre_existing_model is True, this argument is the path to the pre-saved model. If pre_existing_model is False, this field can be used for saving the model. Default "NA" means there is neither a pre-existing model nor a need to save one.
output_mode
- "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_outliered" e.g. column X is appended as X_outliered. (Default value = "replace")
print_impact
- True, False. This argument prints the unique count of the transformed columns before and after the transformation. (Default value = False)
Returns
DataFrame
- Transformed Dataframe
Expand source code
def outlier_categories( spark, idf, list_of_cols="all", drop_cols=[], coverage=1.0, max_category=50, pre_existing_model=False, model_path="NA", output_mode="replace", print_impact=False, ): """ This function replaces less frequently seen values (called as outlier values in the current context) in a categorical column by 'outlier_categories'. Outlier values can be defined in two ways – a) Max N categories, where N is user defined value. In this method, top N-1 frequently seen categories are considered and rest are clubbed under single category 'outlier_categories'. or Alternatively, b) Coverage – top frequently seen categories are considered till it covers minimum N% of rows and rest lesser seen values are mapped to 'outlier_categories'. Even if the Coverage is less, maximum category constraint is given priority. Further, there is a caveat that when multiple categories have same rank. Then, number of categorical values can be more than max_category defined by the user. This function performs better when distinct values, in any column, are not more than 100. It is recommended that to drop those columns from the computation which has more than 100 distinct values, to get better performance out of this function. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of categorical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all categorical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) coverage Defines the minimum % of rows that will be mapped to actual category name and the rest to be mapped to 'outlier_categories' and takes value between 0 to 1. Coverage of 0.8 can be interpreted as top frequently seen categories are considered till it covers minimum 80% of rows and rest lesser seen values are mapped to 'outlier_categories'. (Default value = 1.0) max_category Even if coverage is less, only (max_category - 1) categories will be mapped to actual name and rest to 'outlier_categories'. Caveat is when multiple categories have same rank, then #categories can be more than max_category. (Default value = 50) pre_existing_model Boolean argument – True or False. True if the model with the outlier/other values for each attribute exists already to be used, False Otherwise. (Default value = False) model_path If pre_existing_model is True, this argument is path for the pre-saved model. If pre_existing_model is False, this field can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_outliered" e.g. column X is appended as X_outliered. 
(Default value = "replace") print_impact True, False This argument is to print before and after unique count of the transformed features (Default value = False) Returns ------- DataFrame Transformed Dataframe """ cat_cols = attributeType_segregation(idf)[1] if list_of_cols == "all": list_of_cols = cat_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if any(x not in cat_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if len(list_of_cols) == 0: warnings.warn( "No Outlier Categories Computation - No categorical column(s) to transform" ) return idf if (coverage <= 0) | (coverage > 1): raise TypeError("Invalid input for Coverage Value") if max_category < 2: raise TypeError("Invalid input for Maximum No. of Categories Allowed") if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") idf = idf.persist(pyspark.StorageLevel.MEMORY_AND_DISK) if pre_existing_model: df_model = spark.read.csv( model_path + "/outlier_categories", header=True, inferSchema=True ) else: for index, i in enumerate(list_of_cols): window = Window.partitionBy().orderBy(F.desc("count_pct")) df_cats = ( idf.select(i) .groupBy(i) .count() .dropna() .withColumn( "count_pct", F.col("count") / F.sum("count").over(Window.partitionBy()), ) .withColumn("rank", F.rank().over(window)) .withColumn( "cumu", F.sum("count_pct").over( window.rowsBetween(Window.unboundedPreceding, 0) ), ) .withColumn("lag_cumu", F.lag("cumu").over(window)) .fillna(0) .where(~((F.col("cumu") >= coverage) & (F.col("lag_cumu") >= coverage))) .where(F.col("rank") <= (max_category - 1)) .select(F.lit(i).alias("attribute"), F.col(i).alias("parameters")) ) if index == 0: df_model = df_cats else: df_model = df_model.union(df_cats) df_params = df_model.rdd.groupByKey().mapValues(list).collect() dict_params = dict(df_params) broadcast_params = spark.sparkContext.broadcast(dict_params) def get_params(key): parameters = list() dict_params = broadcast_params.value params = dict_params.get(key) if params: parameters = params return parameters odf = idf for i in list_of_cols: parameters = get_params(i) if output_mode == "replace": odf = odf.withColumn( i, F.when( (F.col(i).isin(parameters)) | (F.col(i).isNull()), F.col(i) ).otherwise("outlier_categories"), ) else: odf = odf.withColumn( i + "_outliered", F.when( (F.col(i).isin(parameters)) | (F.col(i).isNull()), F.col(i) ).otherwise("outlier_categories"), ) if (not pre_existing_model) & (model_path != "NA"): df_model.repartition(1).write.csv( model_path + "/outlier_categories", header=True, mode="overwrite" ) if print_impact: if output_mode == "replace": output_cols = list_of_cols else: output_cols = [(i + "_outliered") for i in list_of_cols] uniqueCount_computation(spark, idf, list_of_cols).select( "attribute", F.col("unique_values").alias("uniqueValues_before") ).show(len(list_of_cols), False) uniqueCount_computation(spark, odf, output_cols).select( "attribute", F.col("unique_values").alias("uniqueValues_after") ).show(len(list_of_cols), False) idf.unpersist() return odf
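A minimal usage sketch of outlier_categories follows, assuming the function is importable from anovos.data_transformer.transformers; the column name "city" and the toy data are illustrative assumptions.

from pyspark.sql import SparkSession
from anovos.data_transformer.transformers import outlier_categories

spark = SparkSession.builder.getOrCreate()
idf = spark.createDataFrame(
    [("SG",), ("SG",), ("SG",), ("IN",), ("IN",), ("US",), ("MY",)], ["city"]
)

# Keep the most frequent categories until at least 70% of rows are covered,
# capped at max_category - 1 = 2 categories; "US" and "MY" are expected to
# map to 'outlier_categories'.
odf = outlier_categories(
    spark, idf, list_of_cols="city", coverage=0.7, max_category=3,
    output_mode="append", print_impact=True,
)
odf.show()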
def z_standardization(spark, idf, list_of_cols='all', drop_cols=[], pre_existing_model=False, model_path='NA', output_mode='replace', print_impact=False)
-
Standardization is a common step in data pre-processing. z_standardization standardizes the selected attributes of an input dataframe by rescaling each attribute to have a standard deviation of 1 and a mean of 0. For each attribute, the standard deviation (s) and mean (u) are calculated, and a sample x is standardized as (x - u) / s. If the standard deviation of an attribute is 0, it is excluded from standardization and a warning is shown. None values are kept as None in the output dataframe.
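As a quick sanity check of the (x - u) / s formula described above, the following sketch applies it by hand with Spark SQL functions on a toy column; the column name "income" and the data are illustrative assumptions, and this is not how the function itself is invoked.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
idf = spark.createDataFrame([(40.0,), (55.0,), (70.0,)], ["income"])

# Compute mean (u) and sample standard deviation (s), then apply (x - u) / s
u, s = idf.select(F.mean("income"), F.stddev("income")).first()
check = idf.withColumn("income_scaled", (F.col("income") - u) / s)
check.select(F.mean("income_scaled"), F.stddev("income_scaled")).show()  # ~0.0 and ~1.0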
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
pre_existing_model
- Boolean argument – True or False. True if model files (mean/stddev for each feature) already exist, False otherwise. (Default value = False)
model_path
- If pre_existing_model is True, this argument is the path to the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither a pre-existing model nor a need to save one.
output_mode
- "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_scaled" e.g. column X is appended as X_scaled. (Default value = "replace")
print_impact
- True, False. This argument prints the before and after descriptive statistics of the rescaled columns. (Default value = False)
Returns
DataFrame
- Rescaled Dataframe
Expand source code
def z_standardization( spark, idf, list_of_cols="all", drop_cols=[], pre_existing_model=False, model_path="NA", output_mode="replace", print_impact=False, ): """ Standardization is commonly used in data pre-processing process. z_standardization standardizes the selected attributes of an input dataframe by normalizing each attribute to have standard deviation of 1 and mean of 0. For each attribute, the standard deviation (s) and mean (u) are calculated and a sample x will be standardized into ( x-u)/s. If the standard deviation of an attribute is 0, it will be excluded in standardization and a warning will be shown. None values will be kept as None in the output dataframe. Parameters ---------- spark Spark Session idf Input Dataframe list_of_cols List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all") drop_cols List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = []) pre_existing_model Boolean argument – True or False. True if model files (Mean/stddev for each feature) exists already, False Otherwise (Default value = False) model_path If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one. output_mode "replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_scaled" e.g. column X is appended as X_scaled. (Default value = "replace") print_impact True, False (Default value = False) This argument is to print out the before and after descriptive statistics of rescaled columns. 
Returns ------- DataFrame Rescaled Dataframe """ num_cols = attributeType_segregation(idf)[0] if list_of_cols == "all": list_of_cols = num_cols if isinstance(list_of_cols, str): list_of_cols = [x.strip() for x in list_of_cols.split("|")] if isinstance(drop_cols, str): drop_cols = [x.strip() for x in drop_cols.split("|")] list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols])) if any(x not in num_cols for x in list_of_cols): raise TypeError("Invalid input for Column(s)") if len(list_of_cols) == 0: warnings.warn( "No Standardization Performed - No numerical column(s) to transform" ) return idf if output_mode not in ("replace", "append"): raise TypeError("Invalid input for output_mode") parameters = [] excluded_cols = [] if pre_existing_model: df_model = spark.read.parquet(model_path + "/z_standardization") for i in list_of_cols: mapped_value = ( df_model.where(F.col("feature") == i) .select("parameters") .rdd.flatMap(lambda x: x) .collect()[0] ) parameters.append(mapped_value) else: for i in list_of_cols: mean, stddev = idf.select(F.mean(i), F.stddev(i)).first() parameters.append( [float(mean) if mean else None, float(stddev) if stddev else None] ) if stddev: if round(stddev, 5) == 0.0: excluded_cols.append(i) else: excluded_cols.append(i) if len(excluded_cols) > 0: warnings.warn( "The following column(s) are excluded from standardization because the standard deviation is zero:" + str(excluded_cols) ) odf = idf for index, i in enumerate(list_of_cols): if i not in excluded_cols: modify_col = (i + "_scaled") if (output_mode == "append") else i odf = odf.withColumn( modify_col, (F.col(i) - parameters[index][0]) / parameters[index][1] ) if (not pre_existing_model) & (model_path != "NA"): df_model = spark.createDataFrame( zip(list_of_cols, parameters), schema=["feature", "parameters"] ) df_model.coalesce(1).write.parquet( model_path + "/z_standardization", mode="overwrite" ) if print_impact: if output_mode == "replace": output_cols = list_of_cols else: output_cols = [ (i + "_scaled") for i in list_of_cols if i not in excluded_cols ] print("Before: ") idf.select(list_of_cols).describe().show(5, False) print("After: ") odf.select(output_cols).describe().show(5, False) return odf
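A minimal usage sketch of z_standardization follows, assuming the function is importable from anovos.data_transformer.transformers; the column names and toy data are illustrative assumptions.

from pyspark.sql import SparkSession
from anovos.data_transformer.transformers import z_standardization

spark = SparkSession.builder.getOrCreate()
idf = spark.createDataFrame([(1.0, 10.0), (2.0, 10.0), (3.0, 10.0)], ["age", "constant"])

# "constant" has zero standard deviation, so per the description above it is
# excluded with a warning; "age" is rescaled and appended as "age_scaled".
odf = z_standardization(
    spark, idf, list_of_cols="age|constant", output_mode="append", print_impact=True
)
odf.show()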