
transformers

The data transformer module supports selected pre-processing and transformation functions, such as binning, encoding, scaling, and imputation, which are required for statistics generation and quality checks. The functions supported through this module are listed below:

  • attribute_binning
  • monotonic_binning
  • cat_to_num_unsupervised
  • cat_to_num_supervised
  • z_standardization
  • IQR_standardization
  • normalization
  • imputation_MMM
  • imputation_sklearn
  • imputation_matrixFactorization
  • auto_imputation
  • autoencoder_latentFeatures
  • PCA_latentFeatures
  • feature_transformation
  • boxcox_transformation
  • outlier_categories
  • expression_parser
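
For example, a typical call imports a transformer function and applies it to a Spark DataFrame. The snippet below is an illustrative sketch only: it assumes the module is importable as anovos.data_transformer.transformers and that a SparkSession spark and an input DataFrame df already exist.

    from anovos.data_transformer.transformers import attribute_binning

    df_binned = attribute_binning(
        spark, df, list_of_cols="all", method_type="equal_range", bin_size=10
    )
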
Source code
# coding=utf-8
"""
The data transformer module supports selected pre-processing and transformation functions, such as binning, encoding,
scaling, and imputation, which are required for statistics generation and quality checks. The functions supported
through this module are listed below:

- attribute_binning
- monotonic_binning
- cat_to_num_unsupervised
- cat_to_num_supervised
- z_standardization
- IQR_standardization
- normalization
- imputation_MMM
- imputation_sklearn
- imputation_matrixFactorization
- auto_imputation
- autoencoder_latentFeatures
- PCA_latentFeatures
- feature_transformation
- boxcox_transformation
- outlier_categories
- expression_parser

"""
import copy
import os
import pickle
import random
import subprocess
import tempfile
import warnings
from itertools import chain
import numpy as np
import pandas as pd
import pyspark
from packaging import version
from scipy import stats

if version.parse(pyspark.__version__) < version.parse("3.0.0"):
    from pyspark.ml.feature import OneHotEncoderEstimator as OneHotEncoder
else:
    from pyspark.ml.feature import OneHotEncoder

from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window
from pyspark.mllib.stat import Statistics
from pyspark.ml.feature import StringIndexerModel
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import Imputer, ImputerModel, StringIndexer, IndexToString
from pyspark.ml.feature import (
    VectorAssembler,
    MinMaxScaler,
    MinMaxScalerModel,
    PCA,
    PCAModel,
)
from pyspark.ml.linalg import DenseVector

from anovos.data_analyzer.stats_generator import (
    missingCount_computation,
    uniqueCount_computation,
)
from anovos.data_ingest.data_ingest import read_dataset, recast_column
from anovos.shared.utils import attributeType_segregation, get_dtype
from ..shared.utils import platform_root_path

# enable_iterative_imputer is a prerequisite for importing IterativeImputer
# check the following issue for more details https://github.com/scikit-learn/scikit-learn/issues/16833
from sklearn.experimental import enable_iterative_imputer  # noqa
from sklearn.impute import KNNImputer, IterativeImputer

import tensorflow
from tensorflow.keras.models import load_model, Model
from tensorflow.keras.layers import Dense, Input, BatchNormalization, LeakyReLU


def attribute_binning(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    method_type="equal_range",
    bin_size=10,
    bin_dtype="numerical",
    pre_existing_model=False,
    model_path="NA",
    output_mode="replace",
    print_impact=False,
):
    """
    Attribute binning (or discretization) is a method of converting a numerical attribute into a discrete (integer
    or categorical valued) attribute using a pre-defined number of bins. This data pre-processing technique is used
    to reduce the effects of minor observation errors. Binning also introduces non-linearity and tends to improve
    the performance of a model. This function focuses on unsupervised ways of binning, i.e. without taking the
    target variable into account: Equal Range Binning and Equal Frequency Binning. In the Equal Range method, each
    bin is of equal size/width, computed as:

    w = (max - min) / no. of bins

    *bins cutoff = [min, min+w, min+2w, ..., max-w, max]*

    For example, with min = 0, max = 100 and 10 bins, w = 10 and the cutoffs are [0, 10, 20, ..., 90, 100].

    In the Equal Frequency method, bins are created in such a way that each bin has an equal number of rows, though
    the widths of the bins may vary:

    w = 1 / no. of bins

    *bins cutoff = [min, w-th pctile, 2w-th pctile, ..., max]*


    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of numerical columns to transform e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a few handful of them. (Default value = [])
    method_type
        "equal_frequency", "equal_range".
        In "equal_range" method, each bin is of equal size/width and in "equal_frequency", each bin has
        equal no. of rows, though the width of bins may vary. (Default value = "equal_range")
    bin_size
        Number of bins. (Default value = 10)
    bin_dtype
        "numerical", "categorical".
        With "numerical" option, original value is replaced with an Integer (1,2,…) and
        with "categorical" option, original replaced with a string describing min and max value allowed
        in the bin ("minval-maxval"). (Default value = "numerical")
    pre_existing_model
        Boolean argument – True or False. True if binning model exists already, False Otherwise. (Default value = False)
    model_path
        If pre_existing_model is True, this argument is path for referring the pre-saved model.
        If pre_existing_model is False, this argument can be used for saving the model.
        Default "NA" means there is neither pre-existing model nor there is a need to save one.
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed column. “append” option append transformed
        column to the input dataset with a postfix "_binned" e.g. column X is appended as X_binned. (Default value = "replace")
    print_impact
        True, False (Default value = False)
        This argument is to print the number of categories generated for each attribute (may or may not be the same as bin_size)

    Returns
    -------
    DataFrame
        Binned Dataframe
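
    Examples
    --------
    A minimal illustrative call (a sketch, not part of the module's tested examples; it assumes
    an active SparkSession named spark and uses a small toy dataframe created on the fly):

    >>> df = spark.createDataFrame(
    ...     [(1.0, 10.0), (2.0, 200.0), (3.0, 30.0), (4.0, 45.0)], ["col1", "col2"]
    ... )
    >>> df_binned = attribute_binning(
    ...     spark, df, list_of_cols=["col1", "col2"], method_type="equal_range",
    ...     bin_size=2, output_mode="append"
    ... )  # appends "col1_binned" and "col2_binned" to the dataframe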
    """

    num_cols = attributeType_segregation(idf)[0]
    if list_of_cols == "all":
        list_of_cols = num_cols

    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]

    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

    if any(x not in num_cols for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")

    if len(list_of_cols) == 0:
        warnings.warn("No Binning Performed - No numerical column(s) to transform")
        return idf

    if method_type not in ("equal_frequency", "equal_range"):
        raise TypeError("Invalid input for method_type")

    if bin_size < 2:
        raise TypeError("Invalid input for bin_size")

    if output_mode not in ("replace", "append"):
        raise TypeError("Invalid input for output_mode")

    if pre_existing_model:
        df_model = spark.read.parquet(model_path + "/attribute_binning")
        bin_cutoffs = []
        for i in list_of_cols:
            mapped_value = (
                df_model.where(F.col("attribute") == i)
                .select("parameters")
                .rdd.flatMap(lambda x: x)
                .collect()[0]
            )
            bin_cutoffs.append(mapped_value)
    else:
        if method_type == "equal_frequency":
            pctile_width = 1 / bin_size
            pctile_cutoff = []
            for j in range(1, bin_size):
                pctile_cutoff.append(j * pctile_width)
            bin_cutoffs = idf.approxQuantile(list_of_cols, pctile_cutoff, 0.01)

        else:
            bin_cutoffs = []
            for i in list_of_cols:
                max_val = (
                    idf.select(F.col(i))
                    .groupBy()
                    .max()
                    .rdd.flatMap(lambda x: x)
                    .collect()
                    + [None]
                )[0]

                min_val = (
                    idf.select(F.col(i))
                    .groupBy()
                    .min()
                    .rdd.flatMap(lambda x: x)
                    .collect()
                    + [None]
                )[0]

                bin_cutoff = []

                if max_val is not None:
                    bin_width = (max_val - min_val) / bin_size
                    for j in range(1, bin_size):
                        bin_cutoff.append(min_val + j * bin_width)
                bin_cutoffs.append(bin_cutoff)

        if model_path != "NA":
            df_model = spark.createDataFrame(
                zip(list_of_cols, bin_cutoffs), schema=["attribute", "parameters"]
            )

            df_model.write.parquet(model_path + "/attribute_binning", mode="overwrite")

    def bucket_label(value, index):
        if value is None:
            return None

        for i in range(0, len(bin_cutoffs[index])):
            if value <= bin_cutoffs[index][i]:
                if bin_dtype == "numerical":
                    return i + 1
                else:
                    if i == 0:
                        return "<= " + str(round(bin_cutoffs[index][i], 4))
                    else:
                        return (
                            str(round(bin_cutoffs[index][i - 1], 4))
                            + "-"
                            + str(round(bin_cutoffs[index][i], 4))
                        )
            else:
                continue

        if bin_dtype == "numerical":
            return len(bin_cutoffs[0]) + 1
        else:
            return "> " + str(round(bin_cutoffs[index][len(bin_cutoffs[0]) - 1], 4))

    if bin_dtype == "numerical":
        f_bucket_label = F.udf(bucket_label, T.IntegerType())
    else:
        f_bucket_label = F.udf(bucket_label, T.StringType())

    odf = idf
    for idx, i in enumerate(list_of_cols):
        odf = odf.withColumn(i + "_binned", f_bucket_label(F.col(i), F.lit(idx)))

        if idx % 5 == 0:
            odf.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count()

    if output_mode == "replace":
        for col in list_of_cols:
            odf = odf.drop(col).withColumnRenamed(col + "_binned", col)

    if print_impact:
        if output_mode == "replace":
            output_cols = list_of_cols
        else:
            output_cols = [(i + "_binned") for i in list_of_cols]
        uniqueCount_computation(spark, odf, output_cols).show(len(output_cols), False)
    return odf


def monotonic_binning(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    label_col="label",
    event_label=1,
    bin_method="equal_range",
    bin_size=10,
    bin_dtype="numerical",
    output_mode="replace",
):
    """
    This function constitutes a supervised way of binning a numerical attribute into a discrete (integer or
    categorical valued) attribute. Instead of a pre-defined, fixed number of bins, the number of bins is computed
    dynamically to ensure the monotonic nature of the bins, i.e. the event rate (%) should consistently increase or
    decrease with the bin index. Monotonicity is evaluated using the Spearman rank correlation between the bin index
    and the event rate, which should be either +1 or -1. If monotonicity cannot be attained, the user-defined fixed
    number of bins (bin_size) is used for the binning.

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of numerical columns to transform e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a few handful of them. (Default value = [])
    label_col
        Label/Target column (Default value = "label")
    event_label
        Value of (positive) event (i.e label 1) (Default value = 1)
    bin_method
        "equal_frequency", "equal_range".
        In "equal_range" method, each bin is of equal size/width and in "equal_frequency", each bin has
        equal no. of rows, though the width of bins may vary. (Default value = "equal_range")
    bin_size
        Default number of bins in case monotonicity is not achieved.
    bin_dtype
        "numerical", "categorical".
        With "numerical" option, original value is replaced with an Integer (1,2,…) and
        with "categorical" option, original replaced with a string describing min and max value allowed
        in the bin ("minval-maxval"). (Default value = "numerical")
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed column. “append” option append transformed
        column to the input dataset with a postfix "_binned" e.g. column X is appended as X_binned. (Default value = "replace")

    Returns
    -------
    DataFrame
        Binned Dataframe
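
    Examples
    --------
    An illustrative sketch (assumes an active SparkSession named spark; the toy data below is
    only for demonstration):

    >>> df = spark.createDataFrame(
    ...     [(float(i), 1 if i > 50 else 0) for i in range(100)], ["col1", "label"]
    ... )
    >>> df_binned = monotonic_binning(
    ...     spark, df, list_of_cols=["col1"], label_col="label", event_label=1
    ... )  # "col1" is replaced by its monotonically binned version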
    """
    num_cols = attributeType_segregation(idf)[0]
    if list_of_cols == "all":
        list_of_cols = num_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    list_of_cols = list(
        set([e for e in list_of_cols if e not in (drop_cols + [label_col])])
    )

    if any(x not in num_cols for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")

    attribute_binning(
        spark,
        idf,
        list_of_cols="all",
        drop_cols=[],
        method_type="equal_range",
        bin_size=10,
        pre_existing_model=False,
        model_path="NA",
        output_mode="replace",
        print_impact=False,
    )

    odf = idf
    for col in list_of_cols:
        n = 20
        r = 0
        while n > 2:
            tmp = (
                attribute_binning(
                    spark,
                    idf,
                    [col],
                    drop_cols=[],
                    method_type=bin_method,
                    bin_size=n,
                    output_mode="append",
                )
                .select(label_col, col, col + "_binned")
                .withColumn(
                    label_col, F.when(F.col(label_col) == event_label, 1).otherwise(0)
                )
                .groupBy(col + "_binned")
                .agg(F.avg(col).alias("mean_val"), F.avg(label_col).alias("mean_label"))
                .dropna()
            )
            r, p = stats.spearmanr(
                tmp.toPandas()[["mean_val"]], tmp.toPandas()[["mean_label"]]
            )
            if r == 1.0:
                odf = attribute_binning(
                    spark,
                    odf,
                    [col],
                    drop_cols=[],
                    method_type=bin_method,
                    bin_size=n,
                    bin_dtype=bin_dtype,
                    output_mode=output_mode,
                )
                break
            n = n - 1
            r = 0
        if r < 1.0:
            odf = attribute_binning(
                spark,
                odf,
                [col],
                drop_cols=[],
                method_type=bin_method,
                bin_size=bin_size,
                bin_dtype=bin_dtype,
                output_mode=output_mode,
            )

    return odf


def cat_to_num_unsupervised(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    method_type=1,
    index_order="frequencyDesc",
    cardinality_threshold=100,
    pre_existing_model=False,
    model_path="NA",
    stats_unique={},
    output_mode="replace",
    print_impact=False,
):
    """
    This is an unsupervised method of converting a categorical attribute into numerical attribute(s). It is among
    the most important transformations required for any modelling exercise, as most machine learning algorithms
    cannot process categorical values. It covers two popular encoding techniques: label encoding and one-hot
    encoding.

    In label encoding, each categorical value is assigned a unique integer based on alphabetical or frequency
    ordering (both ascending and descending options are available and can be selected via the index_order argument).
    One pitfall of this technique is that the model may learn a spurious relationship that does not exist or make
    any logical sense in real-world settings. In one-hot encoding, every unique value in the attribute is added as
    a feature in the form of a dummy/binary attribute. However, using this method on high-cardinality attributes
    can further aggravate the dimensionality issue.

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of categorical columns to transform e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all categorical columns for analysis. This is super useful instead of specifying all column names manually.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a few handful of them. (Default value = [])
    method_type
        1 for Label Encoding or 0 for One hot encoding.
        In label encoding, each categorical value is assigned a unique integer based on alphabetical
        or frequency ordering (both ascending & descending options are available that can be selected by index_order argument).
        In one-hot encoding, every unique value in the column will be added in a form of dummy/binary column. (Default value = 1)
    index_order
        "frequencyDesc", "frequencyAsc", "alphabetDesc", "alphabetAsc".
        Valid only for Label Encoding method_type. (Default value = "frequencyDesc")
    cardinality_threshold
        Columns with cardinality above this threshold are skipped from encoding and a warning is issued. (Default value = 100)
    pre_existing_model
        Boolean argument – True or False. True if encoding model exists already, False Otherwise. (Default value = False)
    model_path
        If pre_existing_model is True, this argument is path for referring the pre-saved model.
        If pre_existing_model is False, this argument can be used for saving the model.
        Default "NA" means there is neither pre existing model nor there is a need to save one.
    stats_unique
        Takes arguments for read_dataset (data_ingest module) function in a dictionary format
        to read pre-saved statistics on unique value count i.e. if measures_of_cardinality or
        uniqueCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed column. “append” option append transformed
        column to the input dataset with a postfix "_index" for label encoding e.g. column X is appended as X_index, or
        a postfix "_{n}" for one hot encoding, n varies from 0 to unique value count e.g. column X is appended as X_0,
        X_1, X_2 (n = 3 i.e. no. of unique values for X). (Default value = "replace")
    print_impact
        True, False (Default value = False)
        This argument is to print out the change in schema (one hot encoding) or descriptive statistics (label encoding)

    Returns
    -------
    DataFrame
        Encoded Dataframe
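
    Examples
    --------
    An illustrative sketch (assumes an active SparkSession named spark; the toy dataframe is
    only for demonstration):

    >>> df = spark.createDataFrame(
    ...     [("A", "X"), ("B", "Y"), ("A", "Z")], ["col1", "col2"]
    ... )
    >>> df_label = cat_to_num_unsupervised(
    ...     spark, df, list_of_cols=["col1", "col2"], method_type=1, output_mode="append"
    ... )  # label encoding; appends "col1_index" and "col2_index"
    >>> df_onehot = cat_to_num_unsupervised(
    ...     spark, df, list_of_cols=["col1"], method_type=0
    ... )  # one-hot encoding; "col1" is replaced by dummy columns "col1_0", "col1_1", ...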

    """

    cat_cols = attributeType_segregation(idf)[1]
    if list_of_cols == "all":
        list_of_cols = cat_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    if any(x not in cat_cols for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")

    if method_type not in (0, 1):
        raise TypeError("Invalid input for method_type")
    if index_order not in (
        "frequencyDesc",
        "frequencyAsc",
        "alphabetDesc",
        "alphabetAsc",
    ):
        raise TypeError("Invalid input for Encoding Index Order")
    if output_mode not in ("replace", "append"):
        raise TypeError("Invalid input for output_mode")

    skip_cols = []
    if method_type == 0:
        if stats_unique == {}:
            skip_cols = (
                uniqueCount_computation(spark, idf, list_of_cols)
                .where(F.col("unique_values") > cardinality_threshold)
                .select("attribute")
                .rdd.flatMap(lambda x: x)
                .collect()
            )
        else:
            skip_cols = (
                read_dataset(spark, **stats_unique)
                .where(F.col("unique_values") > cardinality_threshold)
                .select("attribute")
                .rdd.flatMap(lambda x: x)
                .collect()
            )
        skip_cols = list(
            set([e for e in skip_cols if e in list_of_cols and e not in drop_cols])
        )

        if skip_cols:
            warnings.warn(
                "Columns dropped from one-hot encoding due to high cardinality: "
                + ",".join(skip_cols)
            )

    list_of_cols = list(
        set([e for e in list_of_cols if e not in drop_cols + skip_cols])
    )

    if len(list_of_cols) == 0:
        warnings.warn("No Encoding Computation - No categorical column(s) to transform")
        return idf

    list_of_cols_vec = []
    list_of_cols_idx = []
    for i in list_of_cols:
        list_of_cols_vec.append(i + "_vec")
        list_of_cols_idx.append(i + "_index")
    idf_id = idf.withColumn("tempID", F.monotonically_increasing_id())
    idf_indexed = idf_id.select(["tempID"] + list_of_cols)
    if version.parse(pyspark.__version__) < version.parse("3.0.0"):
        for idx, i in enumerate(list_of_cols):
            if pre_existing_model:
                indexerModel = StringIndexerModel.load(
                    model_path + "/cat_to_num_unsupervised/indexer-model/" + i
                )
            else:
                stringIndexer = StringIndexer(
                    inputCol=i,
                    outputCol=i + "_index",
                    stringOrderType=index_order,
                    handleInvalid="keep",
                )
                indexerModel = stringIndexer.fit(idf.select(i))

                if model_path != "NA":
                    indexerModel.write().overwrite().save(
                        model_path + "/cat_to_num_unsupervised/indexer-model/" + i
                    )

            idf_indexed = indexerModel.transform(idf_indexed)
            if idx % 5 == 0:
                idf_indexed.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count()

    else:
        if pre_existing_model:
            indexerModel = StringIndexerModel.load(
                model_path + "/cat_to_num_unsupervised/indexer"
            )
        else:
            stringIndexer = StringIndexer(
                inputCols=list_of_cols,
                outputCols=list_of_cols_idx,
                stringOrderType=index_order,
                handleInvalid="keep",
            )
            indexerModel = stringIndexer.fit(idf_indexed)
            if model_path != "NA":
                indexerModel.write().overwrite().save(
                    model_path + "/cat_to_num_unsupervised/indexer"
                )
        idf_indexed = indexerModel.transform(idf_indexed)

    odf_indexed = idf_id.join(
        idf_indexed.drop(*list_of_cols), "tempID", "left_outer"
    ).drop("tempID")
    odf_indexed.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count()

    if method_type == 0:
        if pre_existing_model:
            encoder = OneHotEncoder.load(
                model_path + "/cat_to_num_unsupervised/encoder"
            )
        else:
            encoder = OneHotEncoder(
                inputCols=list_of_cols_idx,
                outputCols=list_of_cols_vec,
                handleInvalid="keep",
            )
            if model_path != "NA":
                encoder.write().overwrite().save(
                    model_path + "/cat_to_num_unsupervised/encoder"
                )

        odf_encoded = encoder.fit(odf_indexed).transform(odf_indexed)

        odf = odf_encoded

        def vector_to_array(v):
            v = DenseVector(v)
            new_array = list([int(x) for x in v])
            return new_array

        f_vector_to_array = F.udf(vector_to_array, T.ArrayType(T.IntegerType()))

        odf_sample = odf.take(1)
        for i in list_of_cols:
            uniq_cats = odf_sample[0].asDict()[i + "_vec"].size
            odf_schema = odf.schema.add(
                T.StructField("tmp", T.ArrayType(T.IntegerType()))
            )

            for j in range(0, uniq_cats):
                odf_schema = odf_schema.add(
                    T.StructField(i + "_" + str(j), T.IntegerType())
                )

            odf = (
                odf.withColumn("tmp", f_vector_to_array(i + "_vec"))
                .rdd.map(lambda x: (*x, *x["tmp"]))
                .toDF(schema=odf_schema)
            )

            if output_mode == "replace":
                odf = odf.drop(i, i + "_vec", i + "_index", "tmp")
            else:
                odf = odf.drop(i + "_vec", i + "_index", "tmp")

    else:
        odf = odf_indexed
        for i in list_of_cols:
            odf = odf.withColumn(
                i + "_index",
                F.when(F.col(i).isNull(), None).otherwise(
                    F.col(i + "_index").cast(T.IntegerType())
                ),
            )
        if output_mode == "replace":
            for i in list_of_cols:
                odf = odf.drop(i).withColumnRenamed(i + "_index", i)
            odf = odf.select(idf.columns)

    if print_impact and method_type == 1:
        print("Before")
        idf.describe().where(F.col("summary").isin("count", "min", "max")).show(
            3, False
        )
        print("After")
        odf.describe().where(F.col("summary").isin("count", "min", "max")).show(
            3, False
        )
    if print_impact and method_type == 0:
        print("Before")
        idf.printSchema()
        print("After")
        odf.printSchema()

    return odf


def cat_to_num_supervised(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    label_col="label",
    event_label=1,
    pre_existing_model=False,
    model_path="NA",
    output_mode="replace",
    run_type="local",
    print_impact=False,
):
    """
    This is a supervised method to convert a categorical attribute into a numerical attribute. It takes a
    label/target column to indicate whether the event is positive or negative. For each column, the positive event
    rate for each categorical value is used as the encoded numerical value.

    For example, there are 3 distinct values in a categorical attribute X: X1, X2 and X3. Within the input dataframe, there are
    - 15 positive events and 5 negative events with X==X1;
    - 10 positive events and 40 negative events with X==X2;
    - 20 positive events and 20 negative events with X==X3.

    Thus, value X1 is mapped to 15/(15+5) = 0.75, value X2 is mapped to 10/(10+40) = 0.2 and value X3 is mapped to
    20/(20+20) = 0.5. This mapping will be applied to all values in attribute X. This encoding method can be helpful
    in avoiding the creation of too many dummy variables (which may cause dimensionality issues), and it also works
    with categorical attributes that have no inherent order or rank.

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of categorical columns to transform e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all categorical columns for analysis. This is super useful instead of specifying all column names manually.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a few handful of them. (Default value = [])
    label_col
        Label/Target column (Default value = "label")
    event_label
        Value of (positive) event (i.e label 1) (Default value = 1)
    pre_existing_model
        Boolean argument – True or False. True if model (original and mapped numerical value
        for each column) exists already, False Otherwise. (Default value = False)
    model_path
        If pre_existing_model is True, this argument is path for referring the pre-saved model.
        If pre_existing_model is False, this argument can be used for saving the model.
        Default "NA" is used to save the model in "intermediate_data/" folder for optimization purpose.
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed column. “append” option append transformed
        column to the input dataset with a postfix "_encoded" e.g. column X is appended as X_encoded. (Default value = "replace")
    run_type
        "local", "emr", "databricks" (Default value = "local")
    print_impact
        True, False (Default value = False)
        This argument is to print out the descriptive statistics of encoded columns.

    Returns
    -------
    DataFrame
        Encoded Dataframe
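
    Examples
    --------
    An illustrative sketch of the X1/X2/X3 example above, shrunk to a few rows (assumes an
    active SparkSession named spark). Note that by default the learned category-to-rate
    mapping is persisted under an "intermediate_data/" folder (see model_path):

    >>> df = spark.createDataFrame(
    ...     [("X1", 1), ("X1", 0), ("X2", 0), ("X3", 1), ("X3", 0)], ["X", "label"]
    ... )
    >>> df_encoded = cat_to_num_supervised(
    ...     spark, df, list_of_cols=["X"], label_col="label", event_label=1
    ... )  # replaces "X" with the positive event rate of each category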
    """

    cat_cols = attributeType_segregation(idf)[1]
    if list_of_cols == "all":
        list_of_cols = cat_cols
    elif isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]
    list_of_cols = list(
        set([e for e in list_of_cols if (e not in drop_cols) & (e != label_col)])
    )

    if any(x not in cat_cols for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")
    if len(list_of_cols) == 0:
        warnings.warn("No Categorical Encoding - No categorical column(s) to transform")
        return idf
    if label_col not in idf.columns:
        raise TypeError("Invalid input for Label Column")

    idf_id = idf.withColumn("tempID", F.monotonically_increasing_id())
    odf_partial = idf_id.select(["tempID"] + list_of_cols)

    for index, i in enumerate(list_of_cols):
        if pre_existing_model:
            df_tmp = spark.read.csv(
                model_path + "/cat_to_num_supervised/" + i,
                header=True,
                inferSchema=True,
            )
        else:
            df_tmp = (
                idf.withColumn(
                    label_col,
                    F.when(F.col(label_col) == event_label, "1").otherwise("0"),
                )
                .groupBy(i)
                .pivot(label_col)
                .count()
                .fillna(0)
                .withColumn(
                    i + "_encoded", F.round(F.col("1") / (F.col("1") + F.col("0")), 4)
                )
                .drop(*["1", "0"])
            )

            if run_type in list(platform_root_path.keys()):
                root_path = platform_root_path[run_type]
            else:
                root_path = ""

            if model_path == "NA":
                model_path = root_path + "intermediate_data"

            df_tmp.coalesce(1).write.csv(
                model_path + "/cat_to_num_supervised/" + i,
                header=True,
                mode="overwrite",
            )
            df_tmp = spark.read.csv(
                model_path + "/cat_to_num_supervised/" + i,
                header=True,
                inferSchema=True,
            )

        if df_tmp.count() > 1:
            odf_partial = odf_partial.join(df_tmp, i, "left_outer")
        else:
            odf_partial = odf_partial.crossJoin(df_tmp)

    odf = idf_id.join(odf_partial.drop(*list_of_cols), "tempID", "left_outer").drop(
        "tempID"
    )

    if output_mode == "replace":
        for i in list_of_cols:
            odf = odf.drop(i).withColumnRenamed(i + "_encoded", i)
        odf = odf.select(idf.columns)

    if print_impact:
        if output_mode == "replace":
            output_cols = list_of_cols
        else:
            output_cols = [(i + "_encoded") for i in list_of_cols]
        print("Before: ")
        idf.select(list_of_cols).describe().where(
            F.col("summary").isin("count", "min", "max")
        ).show(3, False)
        print("After: ")
        odf.select(output_cols).describe().where(
            F.col("summary").isin("count", "min", "max")
        ).show(3, False)

    return odf


def z_standardization(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    pre_existing_model=False,
    model_path="NA",
    output_mode="replace",
    print_impact=False,
):
    """
    Standardization is a commonly used data pre-processing step. z_standardization standardizes the selected
    attributes of an input dataframe by normalizing each attribute to have a standard deviation of 1 and a mean of 0.
    For each attribute, the standard deviation (s) and mean (u) are calculated and a sample x is standardized into
    (x - u) / s. If the standard deviation of an attribute is 0, it is excluded from standardization and a warning is
    shown. None values are kept as None in the output dataframe.

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of numerical columns to transform e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a few handful of them. (Default value = [])
    pre_existing_model
        Boolean argument – True or False. True if model files (Mean/stddev for each feature) exists already, False Otherwise (Default value = False)
    model_path
        If pre_existing_model is True, this argument is path for referring the pre-saved model.
        If pre_existing_model is False, this argument can be used for saving the model.
        Default "NA" means there is neither pre-existing model nor there is a need to save one.
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed column. “append” option append transformed
        column to the input dataset with a postfix "_scaled" e.g. column X is appended as X_scaled. (Default value = "replace")
    print_impact
        True, False (Default value = False)
        This argument is to print out the before and after descriptive statistics of rescaled columns.

    Returns
    -------
    DataFrame
        Rescaled Dataframe
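
    Examples
    --------
    An illustrative sketch (assumes an active SparkSession named spark):

    >>> df = spark.createDataFrame([(1.0,), (2.0,), (3.0,), (4.0,)], ["col1"])
    >>> df_scaled = z_standardization(
    ...     spark, df, list_of_cols=["col1"], output_mode="append"
    ... )  # appends "col1_scaled" with mean 0 and standard deviation 1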

    """
    num_cols = attributeType_segregation(idf)[0]
    if list_of_cols == "all":
        list_of_cols = num_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

    if any(x not in num_cols for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")
    if len(list_of_cols) == 0:
        warnings.warn(
            "No Standardization Performed - No numerical column(s) to transform"
        )
        return idf

    if output_mode not in ("replace", "append"):
        raise TypeError("Invalid input for output_mode")

    parameters = []
    excluded_cols = []
    if pre_existing_model:
        df_model = spark.read.parquet(model_path + "/z_standardization")
        for i in list_of_cols:
            mapped_value = (
                df_model.where(F.col("feature") == i)
                .select("parameters")
                .rdd.flatMap(lambda x: x)
                .collect()[0]
            )
            parameters.append(mapped_value)
    else:
        for i in list_of_cols:
            mean, stddev = idf.select(F.mean(i), F.stddev(i)).first()
            parameters.append(
                [float(mean) if mean else None, float(stddev) if stddev else None]
            )
            if stddev:
                if round(stddev, 5) == 0.0:
                    excluded_cols.append(i)
            else:
                excluded_cols.append(i)
    if len(excluded_cols) > 0:
        warnings.warn(
            "The following column(s) are excluded from standardization because the standard deviation is zero:"
            + str(excluded_cols)
        )

    odf = idf
    for index, i in enumerate(list_of_cols):
        if i not in excluded_cols:
            modify_col = (i + "_scaled") if (output_mode == "append") else i
            odf = odf.withColumn(
                modify_col, (F.col(i) - parameters[index][0]) / parameters[index][1]
            )

    if (not pre_existing_model) & (model_path != "NA"):
        df_model = spark.createDataFrame(
            zip(list_of_cols, parameters), schema=["feature", "parameters"]
        )
        df_model.coalesce(1).write.parquet(
            model_path + "/z_standardization", mode="overwrite"
        )

    if print_impact:
        if output_mode == "replace":
            output_cols = list_of_cols
        else:
            output_cols = [
                (i + "_scaled") for i in list_of_cols if i not in excluded_cols
            ]
        print("Before: ")
        idf.select(list_of_cols).describe().show(5, False)
        print("After: ")
        odf.select(output_cols).describe().show(5, False)

    return odf


def IQR_standardization(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    pre_existing_model=False,
    model_path="NA",
    output_mode="replace",
    print_impact=False,
):
    """

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of numerical columns to transform e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a few handful of them. (Default value = [])
    pre_existing_model
        Boolean argument – True or False. True if model files (25/50/75 percentile for each feature) exists already, False Otherwise (Default value = False)
    model_path
        If pre_existing_model is True, this argument is path for referring the pre-saved model.
        If pre_existing_model is False, this argument can be used for saving the model.
        Default "NA" means there is neither pre-existing model nor there is a need to save one.
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed column. “append” option append transformed
        column to the input dataset with a postfix "_scaled" e.g. column X is appended as X_scaled. (Default value = "replace")
    print_impact
        True, False (Default value = False)
        This argument is to print out the before and after descriptive statistics of rescaled columns.

    Returns
    -------
    DataFrame
        Rescaled Dataframe
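
    Examples
    --------
    An illustrative sketch (assumes an active SparkSession named spark):

    >>> df = spark.createDataFrame([(float(i),) for i in range(1, 101)], ["col1"])
    >>> df_scaled = IQR_standardization(
    ...     spark, df, list_of_cols=["col1"], output_mode="append"
    ... )  # appends "col1_scaled" = (col1 - P50) / (P75 - P25)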
    """
    num_cols = attributeType_segregation(idf)[0]
    if list_of_cols == "all":
        list_of_cols = num_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

    if any(x not in num_cols for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")
    if len(list_of_cols) == 0:
        warnings.warn(
            "No Standardization Performed - No numerical column(s) to transform"
        )
        return idf

    if output_mode not in ("replace", "append"):
        raise TypeError("Invalid input for output_mode")

    if pre_existing_model:
        df_model = spark.read.parquet(model_path + "/IQR_standardization")
        parameters = []
        for i in list_of_cols:
            mapped_value = (
                df_model.where(F.col("feature") == i)
                .select("parameters")
                .rdd.flatMap(lambda x: x)
                .collect()[0]
            )
            parameters.append(mapped_value)
    else:
        parameters = idf.approxQuantile(list_of_cols, [0.25, 0.5, 0.75], 0.01)

    excluded_cols = []
    for i, param in zip(list_of_cols, parameters):
        if len(param) > 0:
            if round(param[0], 5) == round(param[2], 5):
                excluded_cols.append(i)
        else:
            excluded_cols.append(i)
    if len(excluded_cols) > 0:
        warnings.warn(
            "The following column(s) are excluded from standardization because the 75th and 25th percentiles are the same:"
            + str(excluded_cols)
        )

    odf = idf
    for index, i in enumerate(list_of_cols):
        if i not in excluded_cols:
            modify_col = (i + "_scaled") if (output_mode == "append") else i
            odf = odf.withColumn(
                modify_col,
                (F.col(i) - parameters[index][1])
                / (parameters[index][2] - parameters[index][0]),
            )

    if (not pre_existing_model) & (model_path != "NA"):
        df_model = spark.createDataFrame(
            zip(list_of_cols, parameters), schema=["feature", "parameters"]
        )
        df_model.coalesce(1).write.parquet(
            model_path + "/IQR_standardization", mode="overwrite"
        )

    if print_impact:
        if output_mode == "replace":
            output_cols = list_of_cols
        else:
            output_cols = [
                (i + "_scaled") for i in list_of_cols if i not in excluded_cols
            ]
        print("Before: ")
        idf.select(list_of_cols).describe().show(5, False)
        print("After: ")
        odf.select(output_cols).describe().show(5, False)

    return odf


def normalization(
    idf,
    list_of_cols="all",
    drop_cols=[],
    pre_existing_model=False,
    model_path="NA",
    output_mode="replace",
    print_impact=False,
):
    """

    Parameters
    ----------
    idf
        Input Dataframe
    list_of_cols
        List of numerical columns to transform e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a few handful of them. (Default value = [])
    pre_existing_model
        Boolean argument – True or False. True if normalization/scalar model exists already, False Otherwise (Default value = False)
    model_path
        If pre_existing_model is True, this argument is path for referring the pre-saved model.
        If pre_existing_model is False, this argument can be used for saving the model.
        Default "NA" means there is neither pre-existing model nor there is a need to save one.
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed column. “append” option append transformed
        column to the input dataset with a postfix "_scaled" e.g. column X is appended as X_scaled. (Default value = "replace")
    print_impact
        True, False (Default value = False)
        This argument is to print out before and after descriptive statistics of rescaled columns.

    Returns
    -------
    DataFrame
        Rescaled Dataframe
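
    Examples
    --------
    An illustrative sketch (assumes an active SparkSession named spark for creating the toy
    dataframe; note that normalization itself does not take a spark argument):

    >>> df = spark.createDataFrame([(1.0,), (5.0,), (10.0,)], ["col1"])
    >>> df_scaled = normalization(
    ...     df, list_of_cols=["col1"], output_mode="append"
    ... )  # appends "col1_scaled" rescaled to the [0, 1] range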

    """
    num_cols = attributeType_segregation(idf)[0]
    if list_of_cols == "all":
        list_of_cols = num_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

    if any(x not in num_cols for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")
    if len(list_of_cols) == 0:
        warnings.warn(
            "No Normalization Performed - No numerical column(s) to transform"
        )
        return idf

    if output_mode not in ("replace", "append"):
        raise TypeError("Invalid input for output_mode")

    idf_id = idf.withColumn("tempID", F.monotonically_increasing_id())
    idf_partial = idf_id.select(["tempID"] + list_of_cols)

    assembler = VectorAssembler(
        inputCols=list_of_cols, outputCol="list_of_cols_vector", handleInvalid="keep"
    )
    assembled_data = assembler.transform(idf_partial)
    if pre_existing_model:
        scalerModel = MinMaxScalerModel.load(model_path + "/normalization")
    else:
        scaler = MinMaxScaler(
            inputCol="list_of_cols_vector", outputCol="list_of_cols_scaled"
        )
        scalerModel = scaler.fit(assembled_data)

        if model_path != "NA":
            scalerModel.write().overwrite().save(model_path + "/normalization")

    scaledData = scalerModel.transform(assembled_data)

    def vector_to_array(v):
        return v.toArray().tolist()

    f_vector_to_array = F.udf(vector_to_array, T.ArrayType(T.FloatType()))

    odf_partial = scaledData.withColumn(
        "list_of_cols_array", f_vector_to_array("list_of_cols_scaled")
    ).drop(*["list_of_cols_scaled", "list_of_cols_vector"])

    odf_schema = odf_partial.schema
    for i in list_of_cols:
        odf_schema = odf_schema.add(T.StructField(i + "_scaled", T.FloatType()))
    odf_partial = (
        odf_partial.rdd.map(lambda x: (*x, *x["list_of_cols_array"]))
        .toDF(schema=odf_schema)
        .drop("list_of_cols_array")
    )

    odf = idf_id.join(odf_partial.drop(*list_of_cols), "tempID", "left_outer").select(
        idf.columns
        + [
            (
                F.when(F.isnan(F.col(i + "_scaled")), None).otherwise(
                    F.col(i + "_scaled")
                )
            ).alias(i + "_scaled")
            for i in list_of_cols
        ]
    )
    if output_mode == "replace":
        for i in list_of_cols:
            odf = odf.drop(i).withColumnRenamed(i + "_scaled", i)
        odf = odf.select(idf.columns)

    if print_impact:
        if output_mode == "replace":
            output_cols = list_of_cols
        else:
            output_cols = [(i + "_scaled") for i in list_of_cols]
        print("Before: ")
        idf.select(list_of_cols).describe().show(5, False)
        print("After: ")
        odf.select(output_cols).describe().show(5, False)

    return odf


def imputation_MMM(
    spark,
    idf,
    list_of_cols="missing",
    drop_cols=[],
    method_type="median",
    pre_existing_model=False,
    model_path="NA",
    output_mode="replace",
    stats_missing={},
    stats_mode={},
    print_impact=False,
):
    """
    This function handles missing values by substituting null values with a measure of central tendency (mode for
    categorical features and mean/median for numerical features). For numerical attributes, it leverages the
    [Imputer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Imputer.html)
    functionality of Spark MLlib. Though Imputer can also be used for categorical attributes, this is only available
    in Spark 3.x; therefore, for categorical features, the mode is computed directly or read from the pre-saved
    Measures of Central Tendency statistics.


    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of columns to impute e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all (non-array) columns for analysis.
        This is super useful instead of specifying all column names manually.
        "missing" (default) can be passed to include only those columns with missing values.
        One of the usecases where "all" may be preferable over "missing" is when the user wants to save
        the imputation model for the future use e.g. a column may not have missing value in the training
        dataset but missing values may possibly appear in the prediction dataset.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols.
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a few handful of them. (Default value = [])
    method_type
        "median", "mean" (valid only for for numerical columns attributes).
        Mode is only option for categorical columns. (Default value = "median")
    pre_existing_model
        Boolean argument – True or False. True if imputation model exists already, False otherwise. (Default value = False)
    model_path
        If pre_existing_model is True, this argument is path for referring the pre-saved model.
        If pre_existing_model is False, this argument can be used for saving the model.
        Default "NA" means there is neither pre-existing model nor there is a need to save one.
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed column. “append” option append transformed
        column to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. (Default value = "replace")
    stats_missing
        Takes arguments for read_dataset (data_ingest module) function in a dictionary format
        to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or
        missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
    stats_mode
        Takes arguments for read_dataset (data_ingest module) function in a dictionary format
        to read pre-saved statistics on most frequently seen values i.e. if measures_of_centralTendency or
        mode_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
    print_impact
        True, False (Default value = False)
        This argument is to print out before and after missing counts of imputed columns.

    Returns
    -------
    DataFrame
        Imputed Dataframe
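
    Examples
    --------
    An illustrative sketch (assumes an active SparkSession named spark; the toy dataframe has
    missing values in both a numerical and a categorical column):

    >>> df = spark.createDataFrame(
    ...     [(1.0, "A"), (None, "B"), (3.0, None), (4.0, "A")], ["col1", "col2"]
    ... )
    >>> df_imputed = imputation_MMM(
    ...     spark, df, list_of_cols="missing", method_type="median"
    ... )  # nulls in "col1" are replaced by its median, nulls in "col2" by its mode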

    """
    if stats_missing == {}:
        missing_df = missingCount_computation(spark, idf)
    else:
        missing_df = read_dataset(spark, **stats_missing).select(
            "attribute", "missing_count", "missing_pct"
        )

    missing_cols = (
        missing_df.where(F.col("missing_count") > 0)
        .select("attribute")
        .rdd.flatMap(lambda x: x)
        .collect()
    )

    if str(pre_existing_model).lower() == "true":
        pre_existing_model = True
    elif str(pre_existing_model).lower() == "false":
        pre_existing_model = False
    else:
        raise TypeError("Non-Boolean input for pre_existing_model")

    if (len(missing_cols) == 0) & (not pre_existing_model) & (model_path == "NA"):
        return idf

    num_cols, cat_cols, other_cols = attributeType_segregation(idf)
    if list_of_cols == "all":
        list_of_cols = num_cols + cat_cols
    if list_of_cols == "missing":
        list_of_cols = [x for x in missing_cols if x in num_cols + cat_cols]
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

    if len(list_of_cols) == 0:
        warnings.warn("No Imputation performed - No column(s) to impute")
        return idf
    if any(x not in num_cols + cat_cols for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")
    if method_type not in ("mode", "mean", "median"):
        raise TypeError("Invalid input for method_type")
    if output_mode not in ("replace", "append"):
        raise TypeError("Invalid input for output_mode")

    num_cols, cat_cols, other_cols = attributeType_segregation(idf.select(list_of_cols))

    odf = idf
    if len(num_cols) > 0:
        recast_cols = []
        recast_type = []
        for i in num_cols:
            if get_dtype(idf, i) not in ("float", "double"):
                odf = odf.withColumn(i, F.col(i).cast(T.DoubleType()))
                recast_cols.append(i + "_imputed")
                recast_type.append(get_dtype(idf, i))

        # For mode imputation
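        # Mode imputation: the most frequent non-null value of each numerical column is
        # used as its imputation parameter (stringified, with "None" as the fallback for
        # all-null columns); pre-computed modes from stats_mode are reused when available.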
        if method_type == "mode":
            if stats_mode == {}:
                parameters = [
                    str(
                        (
                            idf.select(i)
                            .dropna()
                            .groupby(i)
                            .count()
                            .orderBy("count", ascending=False)
                            .first()
                            or [None]
                        )[0]
                    )
                    for i in num_cols
                ]
            else:
                mode_df = read_dataset(spark, **stats_mode).replace("None", None)
                mode_df_cols = list(mode_df.select("attribute").toPandas()["attribute"])
                parameters = []
                for i in num_cols:
                    if i not in mode_df_cols:
                        parameters.append(
                            str(
                                (
                                    idf.select(i)
                                    .dropna()
                                    .groupby(i)
                                    .count()
                                    .orderBy("count", ascending=False)
                                    .first()
                                    or [None]
                                )[0]
                            )
                        )
                    else:
                        parameters.append(
                            mode_df.where(F.col("attribute") == i)
                            .select("mode")
                            .rdd.flatMap(list)
                            .collect()[0]
                        )

            for index, i in enumerate(num_cols):
                odf = odf.withColumn(
                    i + "_imputed",
                    F.when(F.col(i).isNull(), parameters[index]).otherwise(F.col(i)),
                )

        else:  # For mean, median imputation
            # Building new imputer model or uploading the existing model
            if pre_existing_model:
                imputerModel = ImputerModel.load(
                    model_path + "/imputation_MMM/num_imputer-model"
                )
            else:
                imputer = Imputer(
                    strategy=method_type,
                    inputCols=num_cols,
                    outputCols=[(e + "_imputed") for e in num_cols],
                )
                imputerModel = imputer.fit(odf)

            # Applying model
            # odf = recast_column(imputerModel.transform(odf), recast_cols, recast_type)
            odf = imputerModel.transform(odf)
            for i, j in zip(recast_cols, recast_type):
                odf = odf.withColumn(i, F.col(i).cast(j))

            # Saving model if required
            if (not pre_existing_model) & (model_path != "NA"):
                imputerModel.write().overwrite().save(
                    model_path + "/imputation_MMM/num_imputer-model"
                )

    if len(cat_cols) > 0:
        if pre_existing_model:
            df_model = spark.read.csv(
                model_path + "/imputation_MMM/cat_imputer",
                header=True,
                inferSchema=True,
            )
            parameters = []
            for i in cat_cols:
                mapped_value = (
                    df_model.where(F.col("attribute") == i)
                    .select("parameters")
                    .rdd.flatMap(lambda x: x)
                    .collect()[0]
                )
                parameters.append(mapped_value)
        else:
            if stats_mode == {}:
                parameters = [
                    str(
                        (
                            idf.select(i)
                            .dropna()
                            .groupby(i)
                            .count()
                            .orderBy("count", ascending=False)
                            .first()
                            or [None]
                        )[0]
                    )
                    for i in cat_cols
                ]
            else:
                mode_df = read_dataset(spark, **stats_mode).replace("None", None)
                parameters = [
                    mode_df.where(F.col("attribute") == i)
                    .select("mode")
                    .rdd.flatMap(list)
                    .collect()[0]
                    for i in cat_cols
                ]

        for index, i in enumerate(cat_cols):
            odf = odf.withColumn(
                i + "_imputed",
                F.when(F.col(i).isNull(), parameters[index]).otherwise(F.col(i)),
            )

        if (not pre_existing_model) & (model_path != "NA"):
            df_model = spark.createDataFrame(
                zip(cat_cols, parameters), schema=["attribute", "parameters"]
            )
            df_model.repartition(1).write.csv(
                model_path + "/imputation_MMM/cat_imputer",
                header=True,
                mode="overwrite",
            )

    for i in num_cols + cat_cols:
        if i not in missing_cols:
            odf = odf.drop(i + "_imputed")
        elif output_mode == "replace":
            odf = odf.drop(i).withColumnRenamed(i + "_imputed", i)

    if print_impact:
        if output_mode == "replace":
            odf_print = missing_df.select(
                "attribute", F.col("missing_count").alias("missingCount_before")
            ).join(
                missingCount_computation(spark, odf, list_of_cols).select(
                    "attribute", F.col("missing_count").alias("missingCount_after")
                ),
                "attribute",
                "inner",
            )
        else:
            output_cols = [
                (i + "_imputed")
                for i in [e for e in (num_cols + cat_cols) if e in missing_cols]
            ]
            odf_print = missing_df.select(
                "attribute", F.col("missing_count").alias("missingCount_before")
            ).join(
                missingCount_computation(spark, odf, output_cols)
                .withColumnRenamed("attribute", "attribute_after")
                .withColumn(
                    "attribute",
                    F.expr("substring(attribute_after, 1, length(attribute_after)-8)"),
                )
                .drop("missing_pct"),
                "attribute",
                "inner",
            )
        odf_print.show(len(list_of_cols), False)
    return odf
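
# A hedged usage sketch (illustrative only, not part of the library API): impute two
# hypothetical numerical columns, "age" and "income", with their medians and append
# "<col>_imputed" columns instead of replacing the originals. Assumes `spark` is an
# active SparkSession and `df` is a Spark DataFrame containing these columns.
def _example_imputation_MMM(spark, df):
    return imputation_MMM(
        spark,
        df,
        list_of_cols=["age", "income"],
        method_type="median",
        output_mode="append",
        print_impact=True,
    )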


def imputation_sklearn(
    spark,
    idf,
    list_of_cols="missing",
    drop_cols=[],
    method_type="KNN",
    sample_size=500000,
    pre_existing_model=False,
    model_path="NA",
    output_mode="replace",
    stats_missing={},
    run_type="local",
    print_impact=False,
):
    """
    The function "imputation_sklearn" leverages sklearn imputer algorithms. Two methods are supported via this function:
    “KNN” and “regression”.

    “KNN” option trains a sklearn.impute.KNNImputer, which is based on the k-Nearest Neighbors algorithm. The missing
    values of a sample are imputed using the mean of its 5 nearest neighbors in the training set. “regression” option
    trains a sklearn.impute.IterativeImputer, which models each attribute to impute as a function of the rest of the
    attributes and imputes using the estimation. Imputation is performed iteratively, from the attribute with the fewest
    missing values to the attribute with the most. All hyperparameters of the above-mentioned imputers are kept at their default values.

    However, sklearn imputers are not scalable and can be slow if the input dataframe is large.
    Thus, sample_size (the default value is 500,000) can be set to control the number of samples used
    to train the imputer. If the number of rows in the input dataset exceeds sample_size, the remaining samples are imputed
    using the trained imputer in a scalable manner. This is one of the ways to demonstrate how Anovos has been
    designed as a scalable feature engineering library.


    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of columns to impute e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all (non-array) columns for analysis.
        This is super useful instead of specifying all column names manually.
        "missing" (default) can be passed to include only those columns with missing values.
        One of the usecases where "all" may be preferable over "missing" is when the user wants to save
        the imputation model for the future use e.g. a column may not have missing value in the training
        dataset but missing values may possibly appear in the prediction dataset.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols.
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a handful of them. (Default value = [])
    method_type
        "KNN", "regression".
        "KNN" option trains a sklearn.impute.KNNImputer. "regression" option trains a sklearn.impute.IterativeImputer (Default value = "KNN")
    sample_size
        Maximum rows for training the sklearn imputer (Default value = 500000)
    pre_existing_model
        Boolean argument – True or False. True if imputation model exists already, False otherwise. (Default value = False)
    model_path
        If pre_existing_model is True, this argument is the path for referring to the pre-saved model.
        If pre_existing_model is False, this argument can be used for saving the model.
        Default "NA" means there is neither a pre-existing model nor a need to save one.
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed columns. “append” option appends transformed
        columns to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. (Default value = "replace")
    stats_missing
        Takes arguments for read_dataset (data_ingest module) function in a dictionary format
        to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or
        missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
    run_type
        "local", "emr", "databricks" (Default value = "local")
    print_impact
        True, False (Default value = False)
        This argument is to print out before and after missing counts of imputed columns.

    Returns
    -------
    DataFrame
        Imputed Dataframe

    """

    num_cols = attributeType_segregation(idf)[0]
    if stats_missing == {}:
        missing_df = missingCount_computation(spark, idf, num_cols)
    else:
        missing_df = (
            read_dataset(spark, **stats_missing)
            .select("attribute", "missing_count", "missing_pct")
            .where(F.col("attribute").isin(num_cols))
        )
    empty_cols = (
        missing_df.where(F.col("missing_pct") == 1.0)
        .select("attribute")
        .rdd.flatMap(lambda x: x)
        .collect()
    )
    if len(empty_cols) > 0:
        warnings.warn(
            "Following columns dropped from the imputation as all values are null: "
            + ",".join(empty_cols)
        )

    missing_cols = (
        missing_df.where(F.col("missing_count") > 0)
        .where(F.col("missing_pct") < 1.0)
        .select("attribute")
        .rdd.flatMap(lambda x: x)
        .collect()
    )

    if list_of_cols == "all":
        list_of_cols = num_cols
    if list_of_cols == "missing":
        list_of_cols = missing_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    list_of_cols = list(
        set([e for e in list_of_cols if (e not in drop_cols) & (e not in empty_cols)])
    )

    if len(list_of_cols) <= 1:
        warnings.warn(
            "No Imputation Performed - No Column(s) or Insufficient Column(s) to Impute"
        )
        return idf

    if str(pre_existing_model).lower() == "true":
        pre_existing_model = True
    elif str(pre_existing_model).lower() == "false":
        pre_existing_model = False
    else:
        raise TypeError("Non-Boolean input for pre_existing_model")
    if (
        (len([e for e in list_of_cols if e in missing_cols]) == 0)
        & (not pre_existing_model)
        & (model_path == "NA")
    ):
        warnings.warn(
            "No Imputation Performed - No Column(s) to Impute and No Imputation Model to be saved"
        )
        return idf

    if any(x not in num_cols for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")
    if method_type not in ("KNN", "regression"):
        raise TypeError("Invalid input for method_type")
    if output_mode not in ("replace", "append"):
        raise TypeError("Invalid input for output_mode")

    if pre_existing_model:
        if run_type == "emr":
            bash_cmd = "aws s3 cp " + model_path + "/imputation_sklearn.sav ."
            output = subprocess.check_output(["bash", "-c", bash_cmd])
            imputer = pickle.load(open("imputation_sklearn.sav", "rb"))
        else:
            imputer = pickle.load(open(model_path + "/imputation_sklearn.sav", "rb"))
        idf_rest = idf
    else:
        sample_ratio = min(1.0, float(sample_size) / idf.count())
        idf_model = idf.sample(False, sample_ratio, 0)
        idf_pd = idf_model.select(list_of_cols).toPandas()

        if method_type == "KNN":
            imputer = KNNImputer(
                n_neighbors=5, weights="uniform", metric="nan_euclidean"
            )
        if method_type == "regression":
            imputer = IterativeImputer()
        imputer.fit(idf_pd)

        if (not pre_existing_model) & (model_path != "NA"):
            if run_type == "emr":
                pickle.dump(imputer, open("imputation_sklearn.sav", "wb"))
                bash_cmd = (
                    "aws s3 cp imputation_sklearn.sav "
                    + model_path
                    + "/imputation_sklearn.sav"
                )
                output = subprocess.check_output(["bash", "-c", bash_cmd])
                imputer = pickle.load(open("imputation_sklearn.sav", "rb"))
            else:
                local_path = model_path + "/imputation_sklearn.sav"
                os.makedirs(os.path.dirname(local_path), exist_ok=True)
                pickle.dump(imputer, open(local_path, "wb"))
                imputer = pickle.load(
                    open(model_path + "/imputation_sklearn.sav", "rb")
                )

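    # Apply the fitted sklearn imputer at scale: the pandas UDF receives the input
    # columns as batches of pandas Series, assembles them into a feature matrix and
    # returns each imputed row as an array of doubles.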
    @F.pandas_udf(returnType=T.ArrayType(T.DoubleType()))
    def prediction(*cols):
        X = pd.concat(cols, axis=1)
        return pd.Series(row.tolist() for row in imputer.transform(X))

    odf = idf.withColumn("features", prediction(*list_of_cols))
    odf.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count()

    odf_schema = odf.schema
    for i in list_of_cols:
        odf_schema = odf_schema.add(T.StructField(i + "_imputed", T.FloatType()))
    odf = (
        odf.rdd.map(lambda x: (*x, *x["features"]))
        .toDF(schema=odf_schema)
        .drop("features")
    )

    output_cols = []
    for i in list_of_cols:
        if output_mode == "append":
            if i not in missing_cols:
                odf = odf.drop(i + "_imputed")
            else:
                output_cols.append(i + "_imputed")
        else:
            odf = odf.drop(i).withColumnRenamed(i + "_imputed", i)
    odf = odf.select(idf.columns + output_cols)

    if print_impact:
        if output_mode == "replace":
            odf_print = missing_df.select(
                "attribute", F.col("missing_count").alias("missingCount_before")
            ).join(
                missingCount_computation(spark, odf, list_of_cols).select(
                    "attribute", F.col("missing_count").alias("missingCount_after")
                ),
                "attribute",
                "inner",
            )
        else:
            odf_print = missing_df.select(
                "attribute", F.col("missing_count").alias("missingCount_before")
            ).join(
                missingCount_computation(spark, odf, output_cols)
                .withColumnRenamed("attribute", "attribute_after")
                .withColumn(
                    "attribute",
                    F.expr("substring(attribute_after, 1, length(attribute_after)-8)"),
                )
                .drop("missing_pct"),
                "attribute",
                "inner",
            )
        odf_print.show(len(list_of_cols), False)
    return odf
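
# A hedged usage sketch (illustrative only): KNN-impute all numerical columns that
# contain missing values, fitting the sklearn imputer on a sample of roughly 100,000
# rows and saving it for reuse. "/tmp/anovos_models" is a hypothetical local path;
# `spark` and `df` are an active SparkSession and an input Spark DataFrame.
def _example_imputation_sklearn(spark, df):
    return imputation_sklearn(
        spark,
        df,
        list_of_cols="missing",
        method_type="KNN",
        sample_size=100000,
        model_path="/tmp/anovos_models",
        output_mode="replace",
    )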


def imputation_matrixFactorization(
    spark,
    idf,
    list_of_cols="missing",
    drop_cols=[],
    id_col="",
    output_mode="replace",
    stats_missing={},
    print_impact=False,
):
    """
    imputation_matrixFactorization uses collaborative filtering technique to impute missing values. Collaborative
    filtering is commonly used in recommender systems to fill the missing user-item entries and PySpark provides an
    implementation using alternating least squares (ALS) algorithm, which is used in this function. To fit our
    problem into the ALS model, each attribute is treated as an item and an ID column needs to be specified by the
    user to generate the user-item pairs. In case an ID column doesn't exist in the dataset, a proxy ID column is
    implicitly generated by the function. Subsequently, all user-item pairs with known values are used to train
    the ALS model and the trained model is used to predict the user-item pairs with missing values.


    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of columns to impute e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all (non-array) columns for analysis.
        This is super useful instead of specifying all column names manually.
        "missing" (default) can be passed to include only those columns with missing values.
        One of the usecases where "all" may be preferable over "missing" is when the user wants to save
        the imputation model for the future use e.g. a column may not have missing value in the training
        dataset but missing values may possibly appear in the prediction dataset.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols.
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a handful of them. (Default value = [])
    id_col
        ID column (Default value = "")
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed columns. “append” option appends transformed
        columns to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. (Default value = "replace")
    stats_missing
        Takes arguments for read_dataset (data_ingest module) function in a dictionary format
        to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or
        missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
    print_impact
        True, False (Default value = False)
        This argument is to print out before and after missing counts of imputed columns.

    Returns
    -------
    DataFrame
        Imputed Dataframe

    """

    num_cols = attributeType_segregation(idf)[0]
    if stats_missing == {}:
        missing_df = missingCount_computation(spark, idf, num_cols)
    else:
        missing_df = (
            read_dataset(spark, **stats_missing)
            .select("attribute", "missing_count", "missing_pct")
            .where(F.col("attribute").isin(num_cols))
        )

    empty_cols = (
        missing_df.where(F.col("missing_pct") == 1.0)
        .select("attribute")
        .rdd.flatMap(lambda x: x)
        .collect()
    )
    if len(empty_cols) > 0:
        warnings.warn(
            "Following columns dropped from the imputation as all values are null: "
            + ",".join(empty_cols)
        )

    missing_cols = (
        missing_df.where(F.col("missing_count") > 0)
        .where(F.col("missing_pct") < 1.0)
        .select("attribute")
        .rdd.flatMap(lambda x: x)
        .collect()
    )

    if list_of_cols == "all":
        list_of_cols = num_cols
    if list_of_cols == "missing":
        list_of_cols = missing_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    list_of_cols = list(
        set(
            [
                e
                for e in list_of_cols
                if (e not in drop_cols) & (e != id_col) & (e not in empty_cols)
            ]
        )
    )

    if (len(list_of_cols) == 0) | (
        len([e for e in list_of_cols if e in missing_cols]) == 0
    ):
        warnings.warn("No Imputation Performed - No Column(s) to Impute")
        return idf
    if len(list_of_cols) == 1:
        warnings.warn(
            "No Imputation Performed - Needs more than 1 column for matrix factorization"
        )
        return idf
    if any(x not in num_cols for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")
    if output_mode not in ("replace", "append"):
        raise TypeError("Invalid input for output_mode")

    remove_id = False
    if id_col == "":
        idf = idf.withColumn("id", F.monotonically_increasing_id()).withColumn(
            "id", F.row_number().over(Window.orderBy("id"))
        )
        id_col = "id"
        remove_id = True

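    # Reshape from wide to long format: explode each row into (id, attribute, value)
    # entries so that every attribute can be treated as an ALS "item" rated by the ID.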
    key_and_val = F.create_map(
        list(chain.from_iterable([[F.lit(c), F.col(c)] for c in list_of_cols]))
    )
    df_flatten = idf.select(id_col, F.explode(key_and_val)).withColumn(
        "key", F.concat(F.col("key"), F.lit("_imputed"))
    )

    id_type = get_dtype(idf, id_col)
    if id_type == "string":
        id_indexer = StringIndexer().setInputCol(id_col).setOutputCol("IDLabel")
        id_indexer_model = id_indexer.fit(df_flatten)
        df_flatten = id_indexer_model.transform(df_flatten).drop(id_col)
    else:
        df_flatten = df_flatten.withColumnRenamed(id_col, "IDLabel")

    indexer = StringIndexer().setInputCol("key").setOutputCol("keyLabel")
    indexer_model = indexer.fit(df_flatten)
    df_encoded = indexer_model.transform(df_flatten).drop("key")
    df_model = df_encoded.where(F.col("value").isNotNull())
    df_test = df_encoded.where(F.col("value").isNull())
    if (
        df_model.select("IDLabel").distinct().count()
        < df_encoded.select("IDLabel").distinct().count()
    ):
        warnings.warn(
            "The returned odf may not be fully imputed because values for all list_of_cols are null for some IDs"
        )
    als = ALS(
        maxIter=20,
        regParam=0.01,
        userCol="IDLabel",
        itemCol="keyLabel",
        ratingCol="value",
        coldStartStrategy="drop",
    )
    model = als.fit(df_model)
    df_pred = (
        model.transform(df_test).drop("value").withColumnRenamed("prediction", "value")
    )
    df_encoded_pred = df_model.union(df_pred.select(df_model.columns))

    if id_type == "string":
        IDlabelReverse = IndexToString().setInputCol("IDLabel").setOutputCol(id_col)
        df_encoded_pred = IDlabelReverse.transform(df_encoded_pred)
    else:
        df_encoded_pred = df_encoded_pred.withColumnRenamed("IDLabel", id_col)

    keylabelReverse = IndexToString().setInputCol("keyLabel").setOutputCol("key")
    odf_imputed = (
        keylabelReverse.transform(df_encoded_pred)
        .groupBy(id_col)
        .pivot("key")
        .agg(F.first("value"))
        .select(
            [id_col] + [(i + "_imputed") for i in list_of_cols if i in missing_cols]
        )
    )

    odf = idf.join(odf_imputed, id_col, "left_outer")

    for i in list_of_cols:
        if i not in missing_cols:
            odf = odf.drop(i + "_imputed")
        elif output_mode == "replace":
            odf = odf.drop(i).withColumnRenamed(i + "_imputed", i)

    if remove_id:
        odf = odf.drop("id")

    if print_impact:
        if output_mode == "replace":
            odf_print = missing_df.select(
                "attribute", F.col("missing_count").alias("missingCount_before")
            ).join(
                missingCount_computation(spark, odf, list_of_cols).select(
                    "attribute", F.col("missing_count").alias("missingCount_after")
                ),
                "attribute",
                "inner",
            )
        else:
            output_cols = [
                (i + "_imputed") for i in [e for e in list_of_cols if e in missing_cols]
            ]
            odf_print = missing_df.select(
                "attribute", F.col("missing_count").alias("missingCount_before")
            ).join(
                missingCount_computation(spark, odf, output_cols)
                .withColumnRenamed("attribute", "attribute_after")
                .withColumn(
                    "attribute",
                    F.expr("substring(attribute_after, 1, length(attribute_after)-8)"),
                )
                .drop("missing_pct"),
                "attribute",
                "inner",
            )
        odf_print.show(len(list_of_cols), False)
    return odf
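
# A hedged usage sketch (illustrative only): impute missing numerical values via ALS
# matrix factorization, using a hypothetical "customer_id" column as the user key for
# the user-item pairs. `spark` and `df` are an active SparkSession and an input
# Spark DataFrame.
def _example_imputation_matrixFactorization(spark, df):
    return imputation_matrixFactorization(
        spark,
        df,
        list_of_cols="missing",
        id_col="customer_id",
        output_mode="replace",
    )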


def auto_imputation(
    spark,
    idf,
    list_of_cols="missing",
    drop_cols=[],
    id_col="",
    null_pct=0.1,
    stats_missing={},
    output_mode="replace",
    run_type="local",
    print_impact=True,
):
    """
    auto_imputation tests 5 imputation methods using the other imputation functions provided in this module
    and returns the output of the best performing one. The 5 methods are: (1) imputation_MMM with method_type="mean" (2)
    imputation_MMM with method_type="median" (3) imputation_sklearn with method_type="KNN" (4) imputation_sklearn
    with method_type="regression" (5) imputation_matrixFactorization

    Samples without missing values in the attributes to impute are used for testing: some percentage of their values is
    removed and imputed again using the above 5 methods. RMSE/attribute_mean is used as the evaluation metric for each
    attribute to reduce the effect of unit differences among attributes. The final error of a method is calculated as the
    sum of (RMSE/attribute_mean) over all numerical attributes to impute, and the method with the least error is selected.

    The above testing is only applicable to numerical attributes. If categorical attributes are included,
    they will be automatically imputed using imputation_MMM. In addition, if there is only one numerical attribute to
    impute, only methods (1) and (2) will be tested because the rest of the methods require more than one column.


    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of columns to impute e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all (non-array) columns for analysis.
        This is super useful instead of specifying all column names manually.
        "missing" (default) can be passed to include only those columns with missing values.
        One of the usecases where "all" may be preferable over "missing" is when the user wants to save
        the imputation model for the future use e.g. a column may not have missing value in the training
        dataset but missing values may possibly appear in the prediction dataset.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols.
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a handful of them. (Default value = [])
    id_col
        ID column (Default value = "")
    null_pct
        proportion of the valid input data to be replaced by None to form the test data (Default value = 0.1)
    stats_missing
        Takes arguments for read_dataset (data_ingest module) function in a dictionary format
        to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or
        missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed columns. “append” option appends transformed
        columns to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. (Default value = "replace")
    run_type
        "local", "emr", "databricks" (Default value = "local")
    print_impact
        True, False (Default value = True)
        This argument is to print out before and after missing counts of imputed columns. It also prints the name of the best
        performing imputation method along with the RMSE details.

    Returns
    -------
    DataFrame
        Imputed Dataframe

    """

    if run_type in list(platform_root_path.keys()):
        root_path = platform_root_path[run_type]
    else:
        root_path = ""

    if stats_missing == {}:
        missing_df = missingCount_computation(spark, idf)
        missing_df.write.parquet(
            root_path
            + "intermediate_data/imputation_comparison/missingCount_computation",
            mode="overwrite",
        )
        stats_missing = {
            "file_path": root_path
            + "intermediate_data/imputation_comparison/missingCount_computation",
            "file_type": "parquet",
        }
    else:
        missing_df = read_dataset(spark, **stats_missing).select(
            "attribute", "missing_count", "missing_pct"
        )

    empty_cols = (
        missing_df.where(F.col("missing_pct") == 1.0)
        .select("attribute")
        .rdd.flatMap(lambda x: x)
        .collect()
    )
    if len(empty_cols) > 0:
        warnings.warn("Following columns have all null values: " + ",".join(empty_cols))

    missing_cols = (
        missing_df.where(F.col("missing_count") > 0)
        .select("attribute")
        .rdd.flatMap(lambda x: x)
        .collect()
    )

    if list_of_cols == "all":
        list_of_cols = idf.columns
    if list_of_cols == "missing":
        list_of_cols = missing_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    list_of_cols = list(
        set([e for e in list_of_cols if (e not in drop_cols) & (e != id_col)])
    )
    if any(x not in idf.columns for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")

    del_cols = [e for e in list_of_cols if e in empty_cols]
    odf_del = idf.drop(*del_cols)

    list_of_cols = [e for e in list_of_cols if e not in empty_cols]
    num_cols, cat_cols, other_cols = attributeType_segregation(
        odf_del.select(list_of_cols)
    )
    missing_catcols = [e for e in cat_cols if e in missing_cols]
    missing_numcols = [e for e in num_cols if e in missing_cols]

    if missing_catcols:
        odf_imputed_cat = imputation_MMM(
            spark, odf_del, list_of_cols=missing_catcols, stats_missing=stats_missing
        )
    else:
        odf_imputed_cat = odf_del

    if len(missing_numcols) == 0:
        warnings.warn(
            "No Imputation Performed for numerical columns - No Column(s) to Impute"
        )
        return odf_imputed_cat

    idf_test = (
        odf_imputed_cat.dropna(subset=missing_numcols)
        .withColumn("index", F.monotonically_increasing_id())
        .withColumn("index", F.row_number().over(Window.orderBy("index")))
    )
    null_count = int(null_pct * idf_test.count())
    idf_null = idf_test
    for i in missing_numcols:
        null_index = random.sample(range(idf_test.count()), null_count)
        idf_null = idf_null.withColumn(
            i, F.when(F.col("index").isin(null_index), None).otherwise(F.col(i))
        )

    idf_null.write.parquet(
        root_path + "intermediate_data/imputation_comparison/test_dataset",
        mode="overwrite",
    )
    idf_null = spark.read.parquet(
        root_path + "intermediate_data/imputation_comparison/test_dataset"
    )

    method1 = imputation_MMM(
        spark,
        idf_null,
        list_of_cols=missing_numcols,
        method_type="mean",
        stats_missing=stats_missing,
        output_mode=output_mode,
    )
    method2 = imputation_MMM(
        spark,
        idf_null,
        list_of_cols=missing_numcols,
        method_type="median",
        stats_missing=stats_missing,
        output_mode=output_mode,
    )
    valid_methods = [method1, method2]
    if len(num_cols) > 1:
        method3 = imputation_sklearn(
            spark,
            idf_null,
            list_of_cols=num_cols,
            method_type="KNN",
            stats_missing=stats_missing,
            output_mode=output_mode,
        )
        method4 = imputation_sklearn(
            spark,
            idf_null,
            list_of_cols=num_cols,
            method_type="regression",
            stats_missing=stats_missing,
            output_mode=output_mode,
        )
        method5 = imputation_matrixFactorization(
            spark,
            idf_null,
            list_of_cols=num_cols,
            id_col=id_col,
            stats_missing=stats_missing,
            output_mode=output_mode,
        )
        valid_methods = [method1, method2, method3, method4, method5]

    nrmse_all = []
    method_all = ["MMM-mean", "MMM-median", "KNN", "regression", "matrix_factorization"]

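    # Evaluate each candidate method: for every numerical attribute with missing values,
    # compute the RMSE between the original values in idf_test and the method's output on
    # the artificially nulled copy, normalize it by the absolute mean of the imputed
    # column, and sum these normalized errors; the method with the smallest total wins.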
    for index, method in enumerate(valid_methods):
        nrmse = 0
        for i in missing_numcols:
            pred_col = (i + "_imputed") if output_mode == "append" else i
            idf_joined = (
                idf_test.select("index", F.col(i).alias("val"))
                .join(
                    method.select("index", F.col(pred_col).alias("pred")),
                    "index",
                    "left_outer",
                )
                .dropna()
            )
            idf_joined = recast_column(
                idf=idf_joined,
                list_of_cols=["val", "pred"],
                list_of_dtypes=["double", "double"],
            )
            pred_mean = float(
                method.select(F.mean(pred_col)).rdd.flatMap(lambda x: x).collect()[0]
            )
            i_nrmse = (
                RegressionEvaluator(
                    metricName="rmse", labelCol="val", predictionCol="pred"
                ).evaluate(idf_joined)
            ) / abs(pred_mean)
            nrmse += i_nrmse
        nrmse_all.append(nrmse)

    min_index = nrmse_all.index(np.min(nrmse_all))
    best_method = method_all[min_index]
    odf = valid_methods[min_index]

    if print_impact:
        print(list(zip(method_all, nrmse_all)))
        print("Best Imputation Method: ", best_method)
    return odf
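
# A hedged usage sketch (illustrative only): benchmark the five candidate imputation
# methods on the columns with missing values and return the best imputed dataframe.
# "customer_id" is a hypothetical ID column; `spark` and `df` are an active
# SparkSession and an input Spark DataFrame.
def _example_auto_imputation(spark, df):
    return auto_imputation(
        spark,
        df,
        list_of_cols="missing",
        id_col="customer_id",
        null_pct=0.1,
        output_mode="replace",
    )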


def autoencoder_latentFeatures(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    reduction_params=0.5,
    sample_size=500000,
    epochs=100,
    batch_size=256,
    pre_existing_model=False,
    model_path="NA",
    standardization=True,
    standardization_configs={"pre_existing_model": False, "model_path": "NA"},
    imputation=False,
    imputation_configs={"imputation_function": "imputation_MMM"},
    stats_missing={},
    output_mode="replace",
    run_type="local",
    print_impact=False,
):
    """
    Many machine learning models suffer from "the curse of dimensionality" when the number of features is too
    large. autoencoder_latentFeatures is able to reduce the dimensionality by compressing input attributes to a
    smaller number of latent features.

    To be more specific, it trains a neural network model using the TensorFlow library. The neural network contains an
    encoder and a decoder, where the encoder learns to represent the input using a smaller number of latent features
    controlled by the input *reduction_params* and the decoder learns to reproduce the input from the latent
    features. In the end, only the encoder is kept and the latent features generated by the encoder are
    added to the output dataframe.

    However, the neural network model is not trained in a scalable manner, so it might not be able to handle a large
    input dataframe. Thus, an input *sample_size* (the default value is 500,000) can be set to control the number of
    samples to be used to train the model. If the total number of samples exceeds *sample_size*, the rest of the
    samples will be predicted using the fitted encoder.

    Standardization is highly recommended if the input attributes are not of the same scale. Otherwise, the model
    might not converge smoothly. Inputs *standardization* and *standardization_configs* can be set accordingly to
    perform standardization within the function. In addition, if a sample contains missing values in the model input,
    the output values for its latent features will all be None. Thus data imputation is also recommended if missing
    values exist, which can be done within the function by setting inputs *imputation* and *imputation_configs*.


    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of numerical columns to encode e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a handful of them. (Default value = [])
    reduction_params
        Determines the number of encoded features in the result.
        If reduction_params < 1, int(reduction_params * <number of columns>)
        columns will be generated. Else, reduction_params columns will be generated. (Default value = 0.5)
    sample_size
        Maximum rows for training the autoencoder model using tensorflow. (Default value = 500000)
    epochs
        Number of epochs to train the tensorflow model. (Default value = 100)
    batch_size
        Number of samples per gradient update when fitting the tensorflow model. (Default value = 256)
    pre_existing_model
        Boolean argument – True or False. True if model exists already, False Otherwise (Default value = False)
    model_path
        If pre_existing_model is True, this argument is the path for referring to the pre-saved model.
        If pre_existing_model is False, this argument can be used for saving the model.
        Default "NA" means there is neither a pre-existing model nor a need to save one.
    standardization
        Boolean argument – True or False. True if standardization is required. (Default value = True)
    standardization_configs
        z_standardization function arguments in dictionary format. (Default value = {"pre_existing_model": False, "model_path": "NA"})
    imputation
        Boolean argument – True or False. True if imputation is required. (Default value = False)
    imputation_configs
        Takes input in dictionary format.
        The imputation function name is provided with the key "imputation_function".
        Optional arguments pertaining to that imputation function can be provided with the argument name as key. (Default value = {"imputation_function": "imputation_MMM"})
    stats_missing
        Takes arguments for read_dataset (data_ingest module) function in a dictionary format
        to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or
        missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed columns: latent_<col_index>.
        “append” option appends transformed columns with the format latent_<col_index> to the input dataset,
        e.g. latent_0, latent_1 will be appended if reduction_params=2. (Default value = "replace")
    run_type
        "local", "emr", "databricks" (Default value = "local")
    print_impact
        True, False
        This argument is to print descriptive statistics of the latent features. (Default value = False)

    Returns
    -------
    DataFrame
        Dataframe with Latent Features

    """

    num_cols = attributeType_segregation(idf)[0]
    if list_of_cols == "all":
        list_of_cols = num_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]
    list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

    if any(x not in num_cols for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")
    if len(list_of_cols) == 0:
        warnings.warn("No Latent Features Generated - No Column(s) to Transform")
        return idf

    if run_type in list(platform_root_path.keys()):
        root_path = platform_root_path[run_type]
    else:
        root_path = ""

    if stats_missing == {}:
        missing_df = missingCount_computation(spark, idf, list_of_cols)
        missing_df.write.parquet(
            root_path
            + "intermediate_data/autoencoder_latentFeatures/missingCount_computation",
            mode="overwrite",
        )
        stats_missing = {
            "file_path": root_path
            + "intermediate_data/autoencoder_latentFeatures/missingCount_computation",
            "file_type": "parquet",
        }
    else:
        missing_df = (
            read_dataset(spark, **stats_missing)
            .select("attribute", "missing_count", "missing_pct")
            .where(F.col("attribute").isin(list_of_cols))
        )

    empty_cols = (
        missing_df.where(F.col("missing_pct") == 1.0)
        .select("attribute")
        .rdd.flatMap(lambda x: x)
        .collect()
    )
    if len(empty_cols) > 0:
        warnings.warn(
            "The following column(s) are excluded from dimensionality reduction as all values are null: "
            + ",".join(empty_cols)
        )
        list_of_cols = [e for e in list_of_cols if e not in empty_cols]

    if standardization:
        idf_standardized = z_standardization(
            spark,
            idf,
            list_of_cols=list_of_cols,
            output_mode="append",
            **standardization_configs,
        )
        list_of_cols_scaled = [
            i + "_scaled"
            for i in list_of_cols
            if (i + "_scaled") in idf_standardized.columns
        ]
    else:
        idf_standardized = idf
        for i in list_of_cols:
            idf_standardized = idf_standardized.withColumn(i + "_scaled", F.col(i))
        list_of_cols_scaled = [i + "_scaled" for i in list_of_cols]

    if imputation:
        all_functions = globals().copy()
        all_functions.update(locals())
        f = all_functions.get(imputation_configs["imputation_function"])
        args = copy.deepcopy(imputation_configs)
        args.pop("imputation_function", None)
        missing_df_scaled = (
            read_dataset(spark, **stats_missing)
            .select("attribute", "missing_count", "missing_pct")
            .withColumn("attribute", F.concat(F.col("attribute"), F.lit("_scaled")))
        )
        missing_df_scaled.write.parquet(
            root_path
            + "intermediate_data/autoencoder_latentFeatures/missingCount_computation_scaled",
            mode="overwrite",
        )
        stats_missing_scaled = {
            "file_path": root_path
            + "intermediate_data/autoencoder_latentFeatures/missingCount_computation_scaled",
            "file_type": "parquet",
        }
        idf_imputed = f(
            spark,
            idf_standardized,
            list_of_cols_scaled,
            stats_missing=stats_missing_scaled,
            **args,
        )
    else:
        idf_imputed = idf_standardized.dropna(subset=list_of_cols_scaled)

    n_inputs = len(list_of_cols_scaled)
    if reduction_params < 1:
        n_bottleneck = int(reduction_params * n_inputs)
    else:
        n_bottleneck = int(reduction_params)

    if pre_existing_model:
        if run_type == "emr":
            bash_cmd = (
                "aws s3 cp " + model_path + "/autoencoders_latentFeatures/encoder.h5 ."
            )
            output = subprocess.check_output(["bash", "-c", bash_cmd])
            bash_cmd = (
                "aws s3 cp " + model_path + "/autoencoders_latentFeatures/model.h5 ."
            )
            output = subprocess.check_output(["bash", "-c", bash_cmd])
            encoder = load_model("encoder.h5")
            model = load_model("model.h5")
        else:
            encoder = load_model(model_path + "/autoencoders_latentFeatures/encoder.h5")
            model = load_model(model_path + "/autoencoders_latentFeatures/model.h5")
    else:
        idf_valid = idf_imputed.select(list_of_cols_scaled)
        idf_model = idf_valid.sample(
            False, min(1.0, float(sample_size) / idf_valid.count()), 0
        )

        idf_train = idf_model.sample(False, 0.8, 0)
        idf_test = idf_model.subtract(idf_train)
        X_train = idf_train.toPandas()
        X_test = idf_test.toPandas()

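        # Symmetric autoencoder: the encoder expands to 2*n_inputs units, narrows back to
        # n_inputs and compresses to n_bottleneck latent features; the decoder mirrors
        # these layers to reconstruct the standardized inputs.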
        visible = Input(shape=(n_inputs,))
        e = Dense(n_inputs * 2)(visible)
        e = BatchNormalization()(e)
        e = LeakyReLU()(e)
        e = Dense(n_inputs)(e)
        e = BatchNormalization()(e)
        e = LeakyReLU()(e)
        bottleneck = Dense(n_bottleneck)(e)
        d = Dense(n_inputs)(bottleneck)
        d = BatchNormalization()(d)
        d = LeakyReLU()(d)
        d = Dense(n_inputs * 2)(d)
        d = BatchNormalization()(d)
        d = LeakyReLU()(d)
        output = Dense(n_inputs, activation="linear")(d)

        model = Model(inputs=visible, outputs=output)
        encoder = Model(inputs=visible, outputs=bottleneck)
        model.compile(optimizer="adam", loss="mse")
        history = model.fit(
            X_train,
            X_train,
            epochs=int(epochs),
            batch_size=int(batch_size),
            verbose=2,
            validation_data=(X_test, X_test),
        )

        if (not pre_existing_model) & (model_path != "NA"):
            if run_type == "emr":
                encoder.save("encoder.h5")
                model.save("model.h5")
                bash_cmd = (
                    "aws s3 cp encoder.h5 "
                    + model_path
                    + "/autoencoders_latentFeatures/encoder.h5"
                )
                output = subprocess.check_output(["bash", "-c", bash_cmd])
                bash_cmd = (
                    "aws s3 cp model.h5 "
                    + model_path
                    + "/autoencoders_latentFeatures/model.h5"
                )
                output = subprocess.check_output(["bash", "-c", bash_cmd])
            else:
                if not os.path.exists(model_path + "/autoencoders_latentFeatures/"):
                    os.makedirs(model_path + "/autoencoders_latentFeatures/")
                encoder.save(model_path + "/autoencoders_latentFeatures/encoder.h5")
                model.save(model_path + "/autoencoders_latentFeatures/model.h5")

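    # Keras models are not picklable by default, but the encoder must be shipped to the
    # Spark executors inside the pandas UDF closure below. This wrapper makes the model
    # picklable by serializing/deserializing it through a temporary HDF5 file.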
    class ModelWrapperPickable:
        def __init__(self, model):
            self.model = model

        def __getstate__(self):
            model_str = ""
            with tempfile.NamedTemporaryFile(suffix=".hdf5", delete=True) as fd:
                tensorflow.keras.models.save_model(self.model, fd.name, overwrite=True)
                model_str = fd.read()
            d = {"model_str": model_str}
            return d

        def __setstate__(self, state):
            with tempfile.NamedTemporaryFile(suffix=".hdf5", delete=True) as fd:
                fd.write(state["model_str"])
                fd.flush()
                self.model = tensorflow.keras.models.load_model(fd.name)

    model_wrapper = ModelWrapperPickable(encoder)

    def compute_output_pandas_udf(model_wrapper):
        @F.pandas_udf(returnType=T.ArrayType(T.DoubleType()))
        def predict_pandas_udf(*cols):
            X = pd.concat(cols, axis=1)
            return pd.Series(row.tolist() for row in model_wrapper.model.predict(X))

        return predict_pandas_udf

    odf = idf_imputed.withColumn(
        "predicted_output",
        compute_output_pandas_udf(model_wrapper)(*list_of_cols_scaled),
    )
    odf_schema = odf.schema
    for i in range(0, n_bottleneck):
        odf_schema = odf_schema.add(T.StructField("latent_" + str(i), T.FloatType()))
    odf = (
        odf.rdd.map(lambda x: (*x, *x["predicted_output"]))
        .toDF(schema=odf_schema)
        .drop("predicted_output")
    )
    if output_mode == "replace":
        odf = odf.drop(*list_of_cols)

    if print_impact:
        output_cols = ["latent_" + str(j) for j in range(0, n_bottleneck)]
        odf.select(output_cols).describe().show(5, False)

    return odf
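
# A hedged usage sketch (illustrative only): compress all numerical columns into
# roughly half as many latent features, standardizing and imputing missing values
# (via imputation_MMM) inside the function before training the autoencoder. `spark`
# and `df` are an active SparkSession and an input Spark DataFrame.
def _example_autoencoder_latentFeatures(spark, df):
    return autoencoder_latentFeatures(
        spark,
        df,
        list_of_cols="all",
        reduction_params=0.5,
        epochs=50,
        batch_size=256,
        imputation=True,
        imputation_configs={"imputation_function": "imputation_MMM"},
        output_mode="replace",
    )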


def PCA_latentFeatures(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    explained_variance_cutoff=0.95,
    pre_existing_model=False,
    model_path="NA",
    standardization=True,
    standardization_configs={"pre_existing_model": False, "model_path": "NA"},
    imputation=False,
    imputation_configs={"imputation_function": "imputation_MMM"},
    stats_missing={},
    output_mode="replace",
    run_type="local",
    print_impact=False,
):
    """
    Similar to autoencoder_latentFeatures, PCA_latentFeatures also generates latent features that reduce the
    dimensionality of the input dataframe, but through a different technique: Principal Component Analysis (PCA). The PCA
    algorithm produces principal components such that each successive component captures the maximum of the remaining
    variance, and all the principal components are orthogonal to each other. The final number of generated principal
    components is controlled by the input *explained_variance_cutoff*. In other words, the number of selected principal
    components, k, is the minimum value such that the top k components (latent features) can explain at least
    *explained_variance_cutoff* of the total variance.

    Standardization is highly recommended if the input attributes are not of the same scale. Otherwise the generated
    latent features might be dominated by the attributes with larger variance. Inputs *standardization* and
    *standardization_configs* can be set accordingly to perform standardization within the function. In addition,
    data imputation is also recommended if missing values exist, which can be done within the function by setting
    inputs *imputation* and *imputation_configs*.


    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of numerical columns to encode e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a handful of them. (Default value = [])
    explained_variance_cutoff
        Determines the number of latent columns in the output. If N is the smallest
        integer such that top N latent columns explain more than explained_variance_cutoff
        variance, these N columns will be selected. (Default value = 0.95)
    pre_existing_model
        Boolean argument – True or False. True if model exists already, False Otherwise (Default value = False)
    model_path
        If pre_existing_model is True, this argument is the path for referring to the pre-saved model.
        If pre_existing_model is False, this argument can be used for saving the model.
        Default "NA" means there is neither a pre-existing model nor a need to save one.
    standardization
        Boolean argument – True or False. True if standardization is required. (Default value = True)
    standardization_configs
        z_standardization function arguments in dictionary format. (Default value = {"pre_existing_model": False, "model_path": "NA"})
    imputation
        Boolean argument – True or False. True if imputation is required. (Default value = False)
    imputation_configs
        Takes input in dictionary format.
        The imputation function name is provided with the key "imputation_function".
        Optional arguments pertaining to that imputation function can be provided with the argument name as key. (Default value = {"imputation_function": "imputation_MMM"})
    stats_missing
        Takes arguments for read_dataset (data_ingest module) function in a dictionary format
        to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or
        missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed columns: latent_<col_index>.
        “append” option appends transformed columns with format latent_<col_index> to the input dataset,
        e.g. latent_0, latent_1. (Default value = "replace")
    run_type
        "local", "emr", "databricks" (Default value = "local")
    print_impact
        True, False
        This argument is to print descriptive statistics of the latent features (Default value = False)

    Returns
    -------
    DataFrame
        Dataframe with Latent Features

    """

    num_cols = attributeType_segregation(idf)[0]
    if list_of_cols == "all":
        list_of_cols = num_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]
    list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

    if any(x not in num_cols for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")
    if len(list_of_cols) == 0:
        warnings.warn("No Latent Features Generated - No Column(s) to Transform")
        return idf

    if run_type in list(platform_root_path.keys()):
        root_path = platform_root_path[run_type]
    else:
        root_path = ""

    if stats_missing == {}:
        missing_df = missingCount_computation(spark, idf, list_of_cols)
        missing_df.write.parquet(
            root_path + "intermediate_data/PCA_latentFeatures/missingCount_computation",
            mode="overwrite",
        )
        stats_missing = {
            "file_path": root_path
            + "intermediate_data/PCA_latentFeatures/missingCount_computation",
            "file_type": "parquet",
        }
    else:
        missing_df = (
            read_dataset(spark, **stats_missing)
            .select("attribute", "missing_count", "missing_pct")
            .where(F.col("attribute").isin(list_of_cols))
        )

    empty_cols = (
        missing_df.where(F.col("missing_pct") == 1.0)
        .select("attribute")
        .rdd.flatMap(lambda x: x)
        .collect()
    )
    if len(empty_cols) > 0:
        warnings.warn(
            "The following column(s) are excluded from dimensionality reduction as all values are null: "
            + ",".join(empty_cols)
        )
        list_of_cols = [e for e in list_of_cols if e not in empty_cols]

    if standardization:
        idf_standardized = z_standardization(
            spark,
            idf,
            list_of_cols=list_of_cols,
            output_mode="append",
            **standardization_configs,
        )
        list_of_cols_scaled = [
            i + "_scaled"
            for i in list_of_cols
            if (i + "_scaled") in idf_standardized.columns
        ]
    else:
        idf_standardized = idf
        for i in list_of_cols:
            idf_standardized = idf_standardized.withColumn(i + "_scaled", F.col(i))
        list_of_cols_scaled = [i + "_scaled" for i in list_of_cols]

    if imputation:
        all_functions = globals().copy()
        all_functions.update(locals())
        f = all_functions.get(imputation_configs["imputation_function"])
        args = copy.deepcopy(imputation_configs)
        args.pop("imputation_function", None)
        missing_df_scaled = (
            read_dataset(spark, **stats_missing)
            .select("attribute", "missing_count", "missing_pct")
            .withColumn("attribute", F.concat(F.col("attribute"), F.lit("_scaled")))
        )
        missing_df_scaled.write.parquet(
            root_path
            + "intermediate_data/PCA_latentFeatures/missingCount_computation_scaled",
            mode="overwrite",
        )
        stats_missing_scaled = {
            "file_path": root_path
            + "intermediate_data/PCA_latentFeatures/missingCount_computation_scaled",
            "file_type": "parquet",
        }
        idf_imputed = f(
            spark,
            idf_standardized,
            list_of_cols_scaled,
            stats_missing=stats_missing_scaled,
            **args,
        )
    else:
        idf_imputed = idf_standardized.dropna(subset=list_of_cols_scaled)

    idf_imputed.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count()

    assembler = VectorAssembler(inputCols=list_of_cols_scaled, outputCol="features")
    assembled_data = assembler.transform(idf_imputed).drop(*list_of_cols_scaled)

    if pre_existing_model:
        pca = PCA.load(model_path + "/PCA_latentFeatures/pca_path")
        pcaModel = PCAModel.load(model_path + "/PCA_latentFeatures/pcaModel_path")
        n = pca.getK()
    else:
        pca = PCA(
            k=len(list_of_cols_scaled), inputCol="features", outputCol="features_pca"
        )
        pcaModel = pca.fit(assembled_data)
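        # Select the smallest number of components n such that the cumulative
        # explained variance of the top n components exceeds explained_variance_cutoff.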
        explained_variance = 0
        for n in range(1, len(list_of_cols) + 1):
            explained_variance += pcaModel.explainedVariance[n - 1]
            if explained_variance > explained_variance_cutoff:
                break

        pca = PCA(k=n, inputCol="features", outputCol="features_pca")
        pcaModel = pca.fit(assembled_data)
        if (not pre_existing_model) & (model_path != "NA"):
            pcaModel.write().overwrite().save(
                model_path + "/PCA_latentFeatures/pcaModel_path"
            )
            pca.write().overwrite().save(model_path + "/PCA_latentFeatures/pca_path")

    def vector_to_array(v):
        return v.toArray().tolist()

    f_vector_to_array = F.udf(vector_to_array, T.ArrayType(T.FloatType()))

    odf = (
        pcaModel.transform(assembled_data)
        .withColumn("features_pca_array", f_vector_to_array("features_pca"))
        .drop(*["features", "features_pca"])
    )

    odf_schema = odf.schema
    for i in range(0, n):
        odf_schema = odf_schema.add(T.StructField("latent_" + str(i), T.FloatType()))
    odf = (
        odf.rdd.map(lambda x: (*x, *x["features_pca_array"]))
        .toDF(schema=odf_schema)
        .drop("features_pca_array")
        .replace(float("nan"), None, subset=["latent_" + str(j) for j in range(0, n)])
    )

    if output_mode == "replace":
        odf = odf.drop(*list_of_cols)

    if print_impact:
        print("Explained Variance: ", round(np.sum(pcaModel.explainedVariance[0:n]), 4))
        output_cols = ["latent_" + str(j) for j in range(0, n)]
        odf.select(output_cols).describe().show(5, False)

    return odf


def feature_transformation(
    idf,
    list_of_cols="all",
    drop_cols=[],
    method_type="sqrt",
    N=None,
    output_mode="replace",
    print_impact=False,
):
    """
    As the name indicates, feature_transformation performs mathematical transformation over selected attributes.
    The following methods are supported for an input attribute x: ln(x), log10(x), log2(x), e^x, 2^x, 10^x, N^x,
    square and cube root of x, x^2, x^3, x^N, trigonometric transformations of x (sin, cos, tan, asin, acos, atan),
    radians, x%N, x!, 1/x, floor and ceiling of x, and x rounded to N decimal places. Some transformations, such as
    log and square root, only work with positive or non-negative input values, and an error will be returned if this is violated.


    Parameters
    ----------
    idf
        Input Dataframe
    list_of_cols
        List of numerical columns to transform e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a few handful of them. (Default value = [])
    method_type
        "ln", "log10", "log2", "exp", "powOf2" (2^x), "powOf10" (10^x), "powOfN" (N^x),
        "sqrt" (square root), "cbrt" (cube root), "sq" (square), "cb" (cube), "toPowerN" (x^N),
        "sin", "cos", "tan", "asin", "acos", "atan", "radians",
        "remainderDivByN" (x%N), "factorial" (x!), "mul_inv" (1/x),
        "floor", "ceil", "roundN" (round to N decimal places) (Default value = "sqrt")
    N
        None by default. If method_type is "powOfN", "toPowerN", "remainderDivByN" or "roundN", N will
        be used as the required constant.
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed columns.
        “append” option appends transformed columns with a postfix (e.g. "_ln", "_powOf<N>")
        to the input dataset. (Default value = "replace")
    print_impact
        True, False
        This argument is to print before and after descriptive statistics of the transformed features (Default value = False)

    Returns
    -------
    DataFrame
        Transformed Dataframe

    """

    num_cols = attributeType_segregation(idf)[0]
    if list_of_cols == "all":
        list_of_cols = num_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]
    list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

    if (len(list_of_cols) == 0) | (any(x not in num_cols for x in list_of_cols)):
        raise TypeError("Invalid input for Column(s)")

    if method_type not in [
        "ln",
        "log10",
        "log2",
        "exp",
        "powOf2",
        "powOf10",
        "powOfN",
        "sqrt",
        "cbrt",
        "sq",
        "cb",
        "toPowerN",
        "sin",
        "cos",
        "tan",
        "asin",
        "acos",
        "atan",
        "radians",
        "remainderDivByN",
        "factorial",
        "mul_inv",
        "floor",
        "ceil",
        "roundN",
    ]:
        raise TypeError("Invalid input method_type")

    num_cols = attributeType_segregation(idf.select(list_of_cols))[0]
    list_of_cols = num_cols
    odf = idf

    transformation_function = {
        "ln": F.log,
        "log10": F.log10,
        "log2": F.log2,
        "exp": F.exp,
        "powOf2": (lambda x: F.pow(2.0, x)),
        "powOf10": (lambda x: F.pow(10.0, x)),
        "powOfN": (lambda x: F.pow(N, x)),
        "sqrt": F.sqrt,
        "cbrt": F.cbrt,
        "sq": (lambda x: x**2),
        "cb": (lambda x: x**3),
        "toPowerN": (lambda x: x**N),
        "sin": F.sin,
        "cos": F.cos,
        "tan": F.tan,
        "asin": F.asin,
        "acos": F.acos,
        "atan": F.atan,
        "radians": F.radians,
        "remainderDivByN": (lambda x: x % F.lit(N)),
        "factorial": F.factorial,
        "mul_inv": (lambda x: F.lit(1) / x),
        "floor": F.floor,
        "ceil": F.ceil,
        "roundN": (lambda x: F.round(x, N)),
    }

    def get_col_name(i):
        if output_mode == "replace":
            return i
        else:
            if method_type in ["powOfN", "toPowerN", "remainderDivByN", "roundN"]:
                return i + "_" + method_type[:-1] + str(N)
            else:
                return i + "_" + method_type

    output_cols = []
    for i in list_of_cols:
        modify_col = get_col_name(i)
        odf = odf.withColumn(modify_col, transformation_function[method_type](F.col(i)))
        output_cols.append(modify_col)

    if print_impact:
        print("Before:")
        idf.select(list_of_cols).describe().show(5, False)
        print("After:")
        odf.select(output_cols).describe().show(5, False)

    return odf
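

# A minimal usage sketch (hypothetical data; assumes a Spark DataFrame `df` with a
# non-negative numerical column "income"):
#
#   df_out = feature_transformation(
#       df,
#       list_of_cols=["income"],
#       method_type="log10",
#       output_mode="append",
#   )
#
# This appends a new column "income_log10" instead of replacing the original.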


def boxcox_transformation(
    idf,
    list_of_cols="all",
    drop_cols=[],
    boxcox_lambda=None,
    output_mode="replace",
    print_impact=False,
):
    """
    Some machine learning algorithms require the input data to follow normal distributions. Thus, when the input
    data is too skewed, boxcox_transformation can be used to transform it into a more normal-like distribution. The
    transformed value of a sample x depends on a coefficient lambda:
    (1) if lambda = 0, x is transformed into log(x);
    (2) if lambda !=0, x is transformed into (x^lambda-1)/lambda.

    The value of lambda can be either specified by the user or automatically selected within the function.
    If lambda needs to be selected within the function, a range of values (0, 1, -1, 0.5, -0.5, 2, -2, 0.25, -0.25, 3, -3, 4, -4, 5, -5)
    will be tested, and the lambda that maximizes the p-value of PySpark's Kolmogorov-Smirnov test against a normal
    distribution will be used to perform the transformation. Different lambda values can be assigned to different
    attributes, but one attribute can only be assigned one lambda value.


    Parameters
    ----------
    idf
        Input Dataframe
    list_of_cols
        List of numerical columns to transform e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a few handful of them. (Default value = [])
    boxcox_lambda
        Lambda value for the Box-Cox transformation.
        If boxcox_lambda is not None, it will be directly used for the transformation. It can be a
        (1) list: each element represents a lambda value for an attribute and the length of the list
        must be the same as the number of columns to transform.
        (2) int/float: all attributes will be assigned the same lambda value.
        Else, search for the best lambda among [1,-1,0.5,-0.5,2,-2,0.25,-0.25,3,-3,4,-4,5,-5]
        for each column and apply the transformation (Default value = None)
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed columns.
        “append” option appends transformed columns with a postfix "_bxcx_<lambda>"
        to the input dataset. (Default value = "replace")
    print_impact
        True, False
        This argument is to print before and after descriptive statistics of the transformed features (Default value = False)

    Returns
    -------
    DataFrame
        Transformed Dataframe

    """

    num_cols = attributeType_segregation(idf)[0]
    if list_of_cols == "all":
        list_of_cols = num_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]
    list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

    if (len(list_of_cols) == 0) | (any(x not in num_cols for x in list_of_cols)):
        raise TypeError("Invalid input for Column(s)")

    num_cols = attributeType_segregation(idf.select(list_of_cols))[0]
    list_of_cols = num_cols
    odf = idf
    col_mins = idf.select([F.min(i) for i in list_of_cols])
    if any([i <= 0 for i in col_mins.rdd.flatMap(lambda x: x).collect()]):
        col_mins.show(1, False)
        raise ValueError("Data must be positive")

    if boxcox_lambda is not None:
        if isinstance(boxcox_lambda, (list, tuple)):
            if len(boxcox_lambda) != len(list_of_cols):
                raise TypeError("Invalid input for boxcox_lambda")
            elif not all([isinstance(l, (float, int)) for l in boxcox_lambda]):
                raise TypeError("Invalid input for boxcox_lambda")
            else:
                boxcox_lambda_list = list(boxcox_lambda)

        elif isinstance(boxcox_lambda, (float, int)):
            boxcox_lambda_list = [boxcox_lambda] * len(list_of_cols)
        else:
            raise TypeError("Invalid input for boxcox_lambda")

    else:
        boxcox_lambda_list = []
        for i in list_of_cols:
            lambdaVal = [1, -1, 0.5, -0.5, 2, -2, 0.25, -0.25, 3, -3, 4, -4, 5, -5]
            best_pVal = 0
            for j in lambdaVal:
                pVal = Statistics.kolmogorovSmirnovTest(
                    odf.select(F.pow(F.col(i), j)).rdd.flatMap(lambda x: x), "norm"
                ).pValue
                if pVal > best_pVal:
                    best_pVal = pVal
                    best_lambdaVal = j

            pVal = Statistics.kolmogorovSmirnovTest(
                odf.select(F.log(F.col(i))).rdd.flatMap(lambda x: x), "norm"
            ).pValue
            if pVal > best_pVal:
                best_pVal = pVal
                best_lambdaVal = 0
            boxcox_lambda_list.append(best_lambdaVal)

    output_cols = []
    for i, curr_lambdaVal in zip(list_of_cols, boxcox_lambda_list):
        if curr_lambdaVal != 1:
            modify_col = (
                (i + "_bxcx_" + str(curr_lambdaVal)) if output_mode == "append" else i
            )
            output_cols.append(modify_col)
            if curr_lambdaVal == 0:
                odf = odf.withColumn(modify_col, F.log(F.col(i)))
            else:
                odf = odf.withColumn(modify_col, F.pow(F.col(i), curr_lambdaVal))
    if len(output_cols) == 0:
        warnings.warn(
            "lambdaVal for all columns are 1 so no transformation is performed and idf is returned"
        )
        return idf

    if print_impact:
        print("Transformed Columns: ", list_of_cols)
        print("Best BoxCox Parameter(s): ", boxcox_lambda_list)
        print("Before:")
        idf.select(list_of_cols).describe().unionByName(
            idf.select([F.skewness(i).alias(i) for i in list_of_cols]).withColumn(
                "summary", F.lit("skewness")
            )
        ).show(6, False)
        print("After:")
        if output_mode == "replace":
            odf.select(list_of_cols).describe().unionByName(
                odf.select([F.skewness(i).alias(i) for i in list_of_cols]).withColumn(
                    "summary", F.lit("skewness")
                )
            ).show(6, False)
        else:
            output_cols = [("`" + i + "`") for i in output_cols]
            odf.select(output_cols).describe().unionByName(
                odf.select(
                    [F.skewness(i).alias(i[1:-1]) for i in output_cols]
                ).withColumn("summary", F.lit("skewness"))
            ).show(6, False)

    return odf
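

# A minimal usage sketch (hypothetical data; assumes a Spark DataFrame `df` with
# strictly positive numerical columns "price" and "sales"): let the function search
# for the best lambda per column and append the transformed columns.
#
#   df_out = boxcox_transformation(
#       df,
#       list_of_cols=["price", "sales"],
#       boxcox_lambda=None,
#       output_mode="append",
#   )
#
# For example, a column transformed with lambda 0.5 is appended as "price_bxcx_0.5".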


def outlier_categories(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    coverage=1.0,
    max_category=50,
    pre_existing_model=False,
    model_path="NA",
    output_mode="replace",
    print_impact=False,
):
    """
    This function replaces less frequently seen values (called outlier values in the current context) in a
    categorical column by 'others'. Outlier values can be defined in two ways –
    a) Max N categories, where N is a user-defined value. In this method, the top N-1 most frequently seen categories
    are considered and the rest are clubbed under a single category 'others'. Alternatively,
    b) Coverage – the top most frequently seen categories are considered until they cover a minimum of N% of rows, and
    the remaining lesser seen values are mapped to 'others'. Even if the coverage is lower, the maximum category
    constraint is given priority. One caveat is that when multiple categories have the same rank, the number of
    categorical values can be more than the max_category defined by the user.


    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of categorical columns to transform e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all categorical columns for analysis. This is super useful instead of specifying all column names manually.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a few handful of them. (Default value = [])
    coverage
        Defines the minimum % of rows that will be mapped to the actual category name, with the rest mapped
        to others; takes a value between 0 and 1. A coverage of 0.8 can be interpreted as: the top most frequently seen
        categories are considered until they cover a minimum of 80% of rows, and the remaining lesser seen values are mapped to others. (Default value = 1.0)
    max_category
        Even if coverage is lower, only (max_category - 1) categories will be mapped to their actual names and the rest to others.
        The caveat is that when multiple categories have the same rank, the number of categories can be more than max_category. (Default value = 50)
    pre_existing_model
        Boolean argument – True or False. True if the model with the outlier/other values
        for each attribute exists already to be used, False Otherwise. (Default value = False)
    model_path
        If pre_existing_model is True, this argument is path for the pre-saved model.
        If pre_existing_model is False, this field can be used for saving the model.
        Default "NA" means there is neither pre-existing model nor there is a need to save one.
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed column. “append” option appends the transformed
        column to the input dataset with a postfix "_outliered" e.g. column X is appended as X_outliered. (Default value = "replace")
    print_impact
        True, False
        This argument is to print before and after unique count of the transformed features (Default value = False)

    Returns
    -------
    DataFrame
        Transformed Dataframe

    """

    cat_cols = attributeType_segregation(idf)[1]
    if list_of_cols == "all":
        list_of_cols = cat_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

    if any(x not in cat_cols for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")

    if len(list_of_cols) == 0:
        warnings.warn(
            "No Outlier Categories Computation - No categorical column(s) to transform"
        )
        return idf
    if (coverage <= 0) | (coverage > 1):
        raise TypeError("Invalid input for Coverage Value")
    if max_category < 2:
        raise TypeError("Invalid input for Maximum No. of Categories Allowed")
    if output_mode not in ("replace", "append"):
        raise TypeError("Invalid input for output_mode")

    if pre_existing_model:
        df_model = spark.read.csv(
            model_path + "/outlier_categories", header=True, inferSchema=True
        )
    else:
        for index, i in enumerate(list_of_cols):
            window = Window.partitionBy().orderBy(F.desc("count_pct"))
            df_cats = (
                idf.groupBy(i)
                .count()
                .dropna()
                .withColumn(
                    "count_pct",
                    F.col("count") / F.sum("count").over(Window.partitionBy()),
                )
                .withColumn("rank", F.rank().over(window))
                .withColumn(
                    "cumu",
                    F.sum("count_pct").over(
                        window.rowsBetween(Window.unboundedPreceding, 0)
                    ),
                )
                .withColumn("lag_cumu", F.lag("cumu").over(window))
                .fillna(0)
                .where(~((F.col("cumu") >= coverage) & (F.col("lag_cumu") >= coverage)))
                .where(F.col("rank") <= (max_category - 1))
                .select(F.lit(i).alias("attribute"), F.col(i).alias("parameters"))
            )
            if index == 0:
                df_model = df_cats
            else:
                df_model = df_model.union(df_cats)

    odf = idf
    for i in list_of_cols:
        parameters = (
            df_model.where(F.col("attribute") == i)
            .select("parameters")
            .rdd.flatMap(lambda x: x)
            .collect()
        )
        if output_mode == "replace":
            odf = odf.withColumn(
                i,
                F.when(
                    (F.col(i).isin(parameters)) | (F.col(i).isNull()), F.col(i)
                ).otherwise("others"),
            )
        else:
            odf = odf.withColumn(
                i + "_outliered",
                F.when(
                    (F.col(i).isin(parameters)) | (F.col(i).isNull()), F.col(i)
                ).otherwise("others"),
            )

    if (not pre_existing_model) & (model_path != "NA"):
        df_model.repartition(1).write.csv(
            model_path + "/outlier_categories", header=True, mode="overwrite"
        )

    if print_impact:
        if output_mode == "replace":
            output_cols = list_of_cols
        else:
            output_cols = [(i + "_outliered") for i in list_of_cols]
        uniqueCount_computation(spark, idf, list_of_cols).select(
            "attribute", F.col("unique_values").alias("uniqueValues_before")
        ).show(len(list_of_cols), False)
        uniqueCount_computation(spark, odf, output_cols).select(
            "attribute", F.col("unique_values").alias("uniqueValues_after")
        ).show(len(list_of_cols), False)

    return odf
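

# A minimal usage sketch (hypothetical data; assumes a Spark session `spark` and a
# DataFrame `df` with a high-cardinality categorical column "city"): keep the most
# frequent cities covering at least 90% of rows (capped at 20 distinct values) and
# map the rest to "others".
#
#   df_out = outlier_categories(
#       spark,
#       df,
#       list_of_cols=["city"],
#       coverage=0.9,
#       max_category=20,
#       output_mode="append",
#   )
#
# This appends a column "city_outliered" with rare cities replaced by "others".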


def expression_parser(idf, list_of_expr, postfix="", print_impact=False):
    """
    expression_parser can be used to evaluate a list of SQL expressions and output the result as new features. It
    is able to handle column names containing special characters such as “.”, “-”, “@”, “^”, etc., by converting them
    to “_” first before the evaluation and converting them back to the original names before returning the output
    dataframe.


    Parameters
    ----------
    idf
        Input Dataframe
    list_of_expr
        List of expressions to evaluate as new features e.g., ["expr1","expr2"].
        Alternatively, expressions can be specified in a string format,
        where different expressions are separated by pipe delimiter “|” e.g., "expr1|expr2".
    postfix
        Postfix for the new feature name. Naming convention: "f" + expression_index + postfix
        e.g. with a postfix of "new", newly added features are named f0new, f1new, etc. (Default value = "")
    print_impact
        True, False
        This argument is to print the descriptive statistics of the parsed features (Default value = False)

    Returns
    -------
    DataFrame
        Parsed Dataframe

    """
    if isinstance(list_of_expr, str):
        list_of_expr = [x.strip() for x in list_of_expr.split("|")]

    special_chars = [
        "&",
        "$",
        ";",
        ":",
        ",",
        "*",
        "#",
        "@",
        "?",
        "%",
        "!",
        "^",
        "(",
        ")",
        "-",
        "/",
        "'",
        ".",
        '"',
    ]
    rename_cols = []
    replace_chars = {}
    for char in special_chars:
        for col in idf.columns:
            if char in col:
                rename_cols.append(col)
                if col in replace_chars.keys():
                    (replace_chars[col]).append(char)
                else:
                    replace_chars[col] = [char]

    rename_mapping_to_new, rename_mapping_to_old = {}, {}
    idf_renamed = idf
    for col in rename_cols:
        new_col = col
        for char in replace_chars[col]:
            new_col = new_col.replace(char, "_")
        rename_mapping_to_old[new_col] = col
        rename_mapping_to_new[col] = new_col

        idf_renamed = idf_renamed.withColumnRenamed(col, new_col)

    list_of_expr_ = []
    for expr in list_of_expr:
        new_expr = expr
        for col in rename_cols:
            if col in expr:
                new_expr = new_expr.replace(col, rename_mapping_to_new[col])
        list_of_expr_.append(new_expr)

    list_of_expr = list_of_expr_

    odf = idf_renamed
    new_cols = []
    for index, exp in enumerate(list_of_expr):
        odf = odf.withColumn("f" + str(index) + postfix, F.expr(exp))
        new_cols.append("f" + str(index) + postfix)

    for new_col, col in rename_mapping_to_old.items():
        odf = odf.withColumnRenamed(new_col, col)

    if print_impact:
        print("Columns Added: ", new_cols)
        odf.select(new_cols).describe().show(5, False)

    return odf
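

# A minimal usage sketch (hypothetical data; assumes a Spark DataFrame `df` with
# columns "amt.credit" and "amt.debit", whose names contain the special character "."):
# evaluate two SQL expressions and append them as new columns.
#
#   df_out = expression_parser(
#       df,
#       list_of_expr=["amt.credit - amt.debit", "log(amt.credit + 1)"],
#       postfix="_new",
#   )
#
# The results are appended as columns "f0_new" and "f1_new"; the original column
# names are restored in the output.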

Functions

def IQR_standardization(spark, idf, list_of_cols='all', drop_cols=[], pre_existing_model=False, model_path='NA', output_mode='replace', print_impact=False)

IQR_standardization rescales each numerical attribute using robust statistics: the median is subtracted and the result is divided by the interquartile range, i.e. x_scaled = (x - median) / (Q3 - Q1), where Q1 and Q3 are the 25th and 75th percentiles of the attribute respectively.

Parameters

spark
Spark Session
idf
Input Dataframe
list_of_cols
List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
pre_existing_model
Boolean argument – True or False. True if model files (25/50/75 percentile for each feature) exist already, False otherwise (Default value = False)
model_path
If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one.
output_mode
"replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_scaled" e.g. column X is appended as X_scaled. (Default value = "replace")
print_impact
True, False (Default value = False) This argument is to print out the before and after descriptive statistics of rescaled columns.

Returns

DataFrame
Rescaled Dataframe
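
As a usage sketch (hypothetical column names; assumes an active Spark session `spark` and a DataFrame `df` with numerical columns "age" and "income"), the function can be called as follows to append robustly scaled copies of the two columns:

    df_scaled = IQR_standardization(
        spark,
        df,
        list_of_cols=["age", "income"],
        output_mode="append",
    )
    # each retained column X is rescaled as (X - median) / (Q3 - Q1) and appended as X_scaled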
Expand source code
def IQR_standardization(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    pre_existing_model=False,
    model_path="NA",
    output_mode="replace",
    print_impact=False,
):
    """

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of numerical columns to transform e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a few handful of them. (Default value = [])
    pre_existing_model
        Boolean argument – True or False. True if model files (25/50/75 percentile for each feature) exist already, False otherwise (Default value = False)
    model_path
        If pre_existing_model is True, this argument is path for referring the pre-saved model.
        If pre_existing_model is False, this argument can be used for saving the model.
        Default "NA" means there is neither pre-existing model nor there is a need to save one.
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed column. “append” option appends the transformed
        column to the input dataset with a postfix "_scaled" e.g. column X is appended as X_scaled. (Default value = "replace")
    print_impact
        True, False (Default value = False)
        This argument is to print out the before and after descriptive statistics of rescaled columns.

    Returns
    -------
    DataFrame
        Rescaled Dataframe
    """
    num_cols = attributeType_segregation(idf)[0]
    if list_of_cols == "all":
        list_of_cols = num_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

    if any(x not in num_cols for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")
    if len(list_of_cols) == 0:
        warnings.warn(
            "No Standardization Performed - No numerical column(s) to transform"
        )
        return idf

    if output_mode not in ("replace", "append"):
        raise TypeError("Invalid input for output_mode")

    if pre_existing_model:
        df_model = spark.read.parquet(model_path + "/IQR_standardization")
        parameters = []
        for i in list_of_cols:
            mapped_value = (
                df_model.where(F.col("feature") == i)
                .select("parameters")
                .rdd.flatMap(lambda x: x)
                .collect()[0]
            )
            parameters.append(mapped_value)
    else:
        parameters = idf.approxQuantile(list_of_cols, [0.25, 0.5, 0.75], 0.01)

    excluded_cols = []
    for i, param in zip(list_of_cols, parameters):
        if len(param) > 0:
            if round(param[0], 5) == round(param[2], 5):
                excluded_cols.append(i)
        else:
            excluded_cols.append(i)
    if len(excluded_cols) > 0:
        warnings.warn(
            "The following column(s) are excluded from standardization because the 75th and 25th percentiles are the same:"
            + str(excluded_cols)
        )

    odf = idf
    for index, i in enumerate(list_of_cols):
        if i not in excluded_cols:
            modify_col = (i + "_scaled") if (output_mode == "append") else i
            odf = odf.withColumn(
                modify_col,
                (F.col(i) - parameters[index][1])
                / (parameters[index][2] - parameters[index][0]),
            )

    if (not pre_existing_model) & (model_path != "NA"):
        df_model = spark.createDataFrame(
            zip(list_of_cols, parameters), schema=["feature", "parameters"]
        )
        df_model.coalesce(1).write.parquet(
            model_path + "/IQR_standardization", mode="overwrite"
        )

    if print_impact:
        if output_mode == "replace":
            output_cols = list_of_cols
        else:
            output_cols = [
                (i + "_scaled") for i in list_of_cols if i not in excluded_cols
            ]
        print("Before: ")
        idf.select(list_of_cols).describe().show(5, False)
        print("After: ")
        odf.select(output_cols).describe().show(5, False)

    return odf
def PCA_latentFeatures(spark, idf, list_of_cols='all', drop_cols=[], explained_variance_cutoff=0.95, pre_existing_model=False, model_path='NA', standardization=True, standardization_configs={'pre_existing_model': False, 'model_path': 'NA'}, imputation=False, imputation_configs={'imputation_function': 'imputation_MMM'}, stats_missing={}, output_mode='replace', run_type='local', print_impact=False)

Similar to autoencoder_latentFeatures, PCA_latentFeatures also generates latent features which reduce the dimensionality of the input dataframe but through a different technique: Principal Component Analysis (PCA). The PCA algorithm produces principal components such that each component captures as much of the remaining variance as possible, and all principal components are orthogonal to each other. The final number of generated principal components is controlled by the input explained_variance_cutoff. In other words, the number of selected principal components, k, is the minimum value such that the top k components (latent features) can explain at least explained_variance_cutoff of the total variance.

Standardization is highly recommended if the input attributes are not of the same scale. Otherwise the generated latent features might be dominated by the attributes with larger variance. Inputs standardization and standardization_configs can be set accordingly to perform standardization within the function. In addition, data imputation is also recommended if missing values exist, which can be done within the function by setting inputs imputation and imputation_configs.

Parameters

spark
Spark Session
idf
Input Dataframe
list_of_cols
List of numerical columns to encode e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
explained_variance_cutoff
Determines the number of latent columns in the output. If N is the smallest integer such that top N latent columns explain more than explained_variance_cutoff variance, these N columns will be selected. (Default value = 0.95)
pre_existing_model
Boolean argument – True or False. True if model exists already, False Otherwise (Default value = False)
model_path
If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one.
standardization
Boolean argument – True or False. True, if the standardization required. (Default value = True)
standardization_configs
z_standardization function arguments in dictionary format. (Default value = {"pre_existing_model": False, "model_path": "NA"})
imputation
Boolean argument – True or False. True, if the imputation required. (Default value = False)
imputation_configs
Takes input in dictionary format. The imputation function name is provided with the key "imputation_function". Optional arguments pertaining to that imputation function can be provided with the argument name as key. (Default value = {"imputation_function": "imputation_MMM"})
stats_missing
Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
output_mode
"replace", "append". “replace” option replaces original columns with transformed columns: latent_. “append” option append transformed columns with format latent_ to the input dataset, e.g. latent_0, latent_1. (Default value = "replace")
run_type
"local", "emr", "databricks" (Default value = "local")
print_impact
True, False This argument is to print descriptive statistics of the latent features (Default value = False)

Returns

DataFrame
Dataframe with Latent Features
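
As a usage sketch (hypothetical column names; assumes an active Spark session `spark` and a DataFrame `df` with numerical columns on different scales), the function can be called with in-function standardization and MMM imputation so that the retained principal components explain at least 90% of the total variance:

    df_latent = PCA_latentFeatures(
        spark,
        df,
        list_of_cols="all",
        explained_variance_cutoff=0.90,
        standardization=True,
        imputation=True,
        imputation_configs={"imputation_function": "imputation_MMM"},
        output_mode="replace",
    )
    # the original numerical columns are replaced by latent_0, latent_1, ...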
Expand source code
def PCA_latentFeatures(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    explained_variance_cutoff=0.95,
    pre_existing_model=False,
    model_path="NA",
    standardization=True,
    standardization_configs={"pre_existing_model": False, "model_path": "NA"},
    imputation=False,
    imputation_configs={"imputation_function": "imputation_MMM"},
    stats_missing={},
    output_mode="replace",
    run_type="local",
    print_impact=False,
):
    """
    Similar to autoencoder_latentFeatures, PCA_latentFeatures also generates latent features which reduce the
    dimensionality of the input dataframe but through a different technique: Principal Component Analysis (PCA). The
    PCA algorithm produces principal components such that each component captures as much of the remaining variance
    as possible, and all principal components are orthogonal to each other. The final number of generated principal
    components is controlled by the input *explained_variance_cutoff*. In other words, the number of selected
    principal components, k, is the minimum value such that the top k components (latent features) can explain at
    least *explained_variance_cutoff* of the total variance.

    Standardization is highly recommended if the input attributes are not of the same scale. Otherwise the generated
    latent features might be dominated by the attributes with larger variance. Inputs *standardization* and
    *standardization_configs* can be set accordingly to perform standardization within the function. In addition,
    data imputation is also recommended if missing values exist, which can be done within the function by setting
    inputs *imputation* and *imputation_configs*.


    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of numerical columns to encode e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a few handful of them. (Default value = [])
    explained_variance_cutoff
        Determines the number of latent columns in the output. If N is the smallest
        integer such that top N latent columns explain more than explained_variance_cutoff
        variance, these N columns will be selected. (Default value = 0.95)
    pre_existing_model
        Boolean argument – True or False. True if model exists already, False Otherwise (Default value = False)
    model_path
        If pre_existing_model is True, this argument is path for referring the pre-saved model.
        If pre_existing_model is False, this argument can be used for saving the model.
        Default "NA" means there is neither pre-existing model nor there is a need to save one.
    standardization
        Boolean argument – True or False. True, if the standardization required. (Default value = True)
    standardization_configs
        z_standardization function arguments in dictionary format. (Default value = {"pre_existing_model": False, "model_path": "NA"})
    imputation
        Boolean argument – True or False. True, if the imputation required. (Default value = False)
    imputation_configs
        Takes input in dictionary format.
        The imputation function name is provided with the key "imputation_function".
        Optional arguments pertaining to that imputation function can be provided with the argument name as key. (Default value = {"imputation_function": "imputation_MMM"})
    stats_missing
        Takes arguments for read_dataset (data_ingest module) function in a dictionary format
        to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or
        missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
    output_mode
        "replace", "append".
        “replace” option replaces original columns with transformed columns: latent_<col_index>.
        “append” option appends transformed columns with format latent_<col_index> to the input dataset,
        e.g. latent_0, latent_1. (Default value = "replace")
    run_type
        "local", "emr", "databricks" (Default value = "local")
    print_impact
        True, False
        This argument is to print descriptive statistics of the latent features (Default value = False)

    Returns
    -------
    DataFrame
        Dataframe with Latent Features

    """

    num_cols = attributeType_segregation(idf)[0]
    if list_of_cols == "all":
        list_of_cols = num_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]
    list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

    if any(x not in num_cols for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")
    if len(list_of_cols) == 0:
        warnings.warn("No Latent Features Generated - No Column(s) to Transform")
        return idf

    if run_type in list(platform_root_path.keys()):
        root_path = platform_root_path[run_type]
    else:
        root_path = ""

    if stats_missing == {}:
        missing_df = missingCount_computation(spark, idf, list_of_cols)
        missing_df.write.parquet(
            root_path + "intermediate_data/PCA_latentFeatures/missingCount_computation",
            mode="overwrite",
        )
        stats_missing = {
            "file_path": root_path
            + "intermediate_data/PCA_latentFeatures/missingCount_computation",
            "file_type": "parquet",
        }
    else:
        missing_df = (
            read_dataset(spark, **stats_missing)
            .select("attribute", "missing_count", "missing_pct")
            .where(F.col("attribute").isin(list_of_cols))
        )

    empty_cols = (
        missing_df.where(F.col("missing_pct") == 1.0)
        .select("attribute")
        .rdd.flatMap(lambda x: x)
        .collect()
    )
    if len(empty_cols) > 0:
        warnings.warn(
            "The following column(s) are excluded from dimensionality reduction as all values are null: "
            + ",".join(empty_cols)
        )
        list_of_cols = [e for e in list_of_cols if e not in empty_cols]

    if standardization:
        idf_standardized = z_standardization(
            spark,
            idf,
            list_of_cols=list_of_cols,
            output_mode="append",
            **standardization_configs,
        )
        list_of_cols_scaled = [
            i + "_scaled"
            for i in list_of_cols
            if (i + "_scaled") in idf_standardized.columns
        ]
    else:
        idf_standardized = idf
        for i in list_of_cols:
            idf_standardized = idf_standardized.withColumn(i + "_scaled", F.col(i))
        list_of_cols_scaled = [i + "_scaled" for i in list_of_cols]

    if imputation:
        all_functions = globals().copy()
        all_functions.update(locals())
        f = all_functions.get(imputation_configs["imputation_function"])
        args = copy.deepcopy(imputation_configs)
        args.pop("imputation_function", None)
        missing_df_scaled = (
            read_dataset(spark, **stats_missing)
            .select("attribute", "missing_count", "missing_pct")
            .withColumn("attribute", F.concat(F.col("attribute"), F.lit("_scaled")))
        )
        missing_df_scaled.write.parquet(
            root_path
            + "intermediate_data/PCA_latentFeatures/missingCount_computation_scaled",
            mode="overwrite",
        )
        stats_missing_scaled = {
            "file_path": root_path
            + "intermediate_data/PCA_latentFeatures/missingCount_computation_scaled",
            "file_type": "parquet",
        }
        idf_imputed = f(
            spark,
            idf_standardized,
            list_of_cols_scaled,
            stats_missing=stats_missing_scaled,
            **args,
        )
    else:
        idf_imputed = idf_standardized.dropna(subset=list_of_cols_scaled)

    idf_imputed.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count()

    assembler = VectorAssembler(inputCols=list_of_cols_scaled, outputCol="features")
    assembled_data = assembler.transform(idf_imputed).drop(*list_of_cols_scaled)

    if pre_existing_model:
        pca = PCA.load(model_path + "/PCA_latentFeatures/pca_path")
        pcaModel = PCAModel.load(model_path + "/PCA_latentFeatures/pcaModel_path")
        n = pca.getK()
    else:
        pca = PCA(
            k=len(list_of_cols_scaled), inputCol="features", outputCol="features_pca"
        )
        pcaModel = pca.fit(assembled_data)
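        # Select the smallest number of components n such that the cumulative
        # explained variance of the top n components exceeds explained_variance_cutoff.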
        explained_variance = 0
        for n in range(1, len(list_of_cols) + 1):
            explained_variance += pcaModel.explainedVariance[n - 1]
            if explained_variance > explained_variance_cutoff:
                break

        pca = PCA(k=n, inputCol="features", outputCol="features_pca")
        pcaModel = pca.fit(assembled_data)
        if (not pre_existing_model) & (model_path != "NA"):
            pcaModel.write().overwrite().save(
                model_path + "/PCA_latentFeatures/pcaModel_path"
            )
            pca.write().overwrite().save(model_path + "/PCA_latentFeatures/pca_path")

    def vector_to_array(v):
        return v.toArray().tolist()

    f_vector_to_array = F.udf(vector_to_array, T.ArrayType(T.FloatType()))

    odf = (
        pcaModel.transform(assembled_data)
        .withColumn("features_pca_array", f_vector_to_array("features_pca"))
        .drop(*["features", "features_pca"])
    )

    odf_schema = odf.schema
    for i in range(0, n):
        odf_schema = odf_schema.add(T.StructField("latent_" + str(i), T.FloatType()))
    odf = (
        odf.rdd.map(lambda x: (*x, *x["features_pca_array"]))
        .toDF(schema=odf_schema)
        .drop("features_pca_array")
        .replace(float("nan"), None, subset=["latent_" + str(j) for j in range(0, n)])
    )

    if output_mode == "replace":
        odf = odf.drop(*list_of_cols)

    if print_impact:
        print("Explained Variance: ", round(np.sum(pcaModel.explainedVariance[0:n]), 4))
        output_cols = ["latent_" + str(j) for j in range(0, n)]
        odf.select(output_cols).describe().show(5, False)

    return odf
def attribute_binning(spark, idf, list_of_cols='all', drop_cols=[], method_type='equal_range', bin_size=10, bin_dtype='numerical', pre_existing_model=False, model_path='NA', output_mode='replace', print_impact=False)

Attribute binning (or discretization) is a method of converting a numerical attribute into discrete (integer or categorical) values using a pre-defined number of bins. This data pre-processing technique is used to reduce the effects of minor observation errors. Also, binning introduces non-linearity and tends to improve the performance of the model. In this function, we are focusing on unsupervised ways of binning, i.e. without taking the target variable into account - Equal Range Binning and Equal Frequency Binning. In the Equal Range method, each bin is of equal size/width, computed as:

w = (max - min) / no. of bins

bin cutoffs = [min, min+w, min+2w, …, max-w, max]

whereas in the Equal Frequency binning method, bins are created in such a way that each bin has an equal no. of rows, though the width of the bins may vary from each other.

w = 1 / no. of bins

bin cutoffs = [min, w-th pctile, 2w-th pctile, …, max]

Parameters

spark
Spark Session
idf
Input Dataframe
list_of_cols
List of numerical columns to transform e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
method_type
"equal_frequency", "equal_range". In "equal_range" method, each bin is of equal size/width and in "equal_frequency", each bin has equal no. of rows, though the width of bins may vary. (Default value = "equal_range")
bin_size
Number of bins. (Default value = 10)
bin_dtype
"numerical", "categorical". With "numerical" option, original value is replaced with an Integer (1,2,…) and with "categorical" option, original replaced with a string describing min and max value allowed in the bin ("minval-maxval"). (Default value = "numerical")
pre_existing_model
Boolean argument – True or False. True if binning model exists already, False Otherwise. (Default value = False)
model_path
If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one.
output_mode
"replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_binned" e.g. column X is appended as X_binned. (Default value = "replace")
print_impact
True, False (Default value = False) This argument is to print the number of categories generated for each attribute (may or may not be the same as bin_size)

Returns

DataFrame
Binned Dataframe
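
As a usage sketch (hypothetical column names; assumes an active Spark session `spark` and a DataFrame `df` with a numerical column "age"), equal-range binning into 5 bins can be applied as follows; with min = 20 and max = 70 the bin width would be w = (70 - 20) / 5 = 10 and the cutoffs [20, 30, 40, 50, 60, 70]:

    df_binned = attribute_binning(
        spark,
        df,
        list_of_cols=["age"],
        method_type="equal_range",
        bin_size=5,
        bin_dtype="numerical",
        output_mode="append",
    )
    # appends a column "age_binned" holding integer bin indices 1..5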
Expand source code
def attribute_binning(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    method_type="equal_range",
    bin_size=10,
    bin_dtype="numerical",
    pre_existing_model=False,
    model_path="NA",
    output_mode="replace",
    print_impact=False,
):
    """
    Attribute binning (or discretization) is a method of converting a numerical attribute into discrete (integer
    or categorical) values using a pre-defined number of bins. This data pre-processing technique is used to reduce
    the effects of minor observation errors. Binning also introduces non-linearity and tends to improve the
    performance of the model. This function focuses on unsupervised ways of binning, i.e. without taking the target
    variable into account: Equal Range Binning and Equal Frequency Binning. In the Equal Range method, each bin is
    of equal size/width, computed as:

    w = (max - min) / no. of bins

    *bin cutoffs = [min, min + w, min + 2w, …, max - w, max]*

    whereas in the Equal Frequency binning method, bins are created such that each bin has an equal number of rows,
    though the width of the bins may vary.

    w = 1 / no. of bins

    *bin cutoffs = [min, w-th percentile, 2w-th percentile, …, max]*


    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of numerical columns to transform e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a few handful of them. (Default value = [])
    method_type
        "equal_frequency", "equal_range".
        In "equal_range" method, each bin is of equal size/width and in "equal_frequency", each bin has
        equal no. of rows, though the width of bins may vary. (Default value = "equal_range")
    bin_size
        Number of bins. (Default value = 10)
    bin_dtype
        "numerical", "categorical".
        With "numerical" option, original value is replaced with an Integer (1,2,…) and
        with "categorical" option, original replaced with a string describing min and max value allowed
        in the bin ("minval-maxval"). (Default value = "numerical")
    pre_existing_model
        Boolean argument – True or False. True if binning model exists already, False Otherwise. (Default value = False)
    model_path
        If pre_existing_model is True, this argument is path for referring the pre-saved model.
        If pre_existing_model is False, this argument can be used for saving the model.
        Default "NA" means there is neither pre-existing model nor there is a need to save one.
    output_mode
        "replace", "append".
        The “replace” option replaces original columns with the transformed columns, while the “append” option appends
        the transformed column to the input dataset with a postfix "_binned", e.g. column X is appended as X_binned. (Default value = "replace")
    print_impact
        True, False (Default value = False)
        If True, prints the number of categories generated for each attribute (may or may not be the same as bin_size)

    Returns
    -------
    DataFrame
        Binned Dataframe
    """

    num_cols = attributeType_segregation(idf)[0]
    if list_of_cols == "all":
        list_of_cols = num_cols

    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]

    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

    if any(x not in num_cols for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")

    if len(list_of_cols) == 0:
        warnings.warn("No Binning Performed - No numerical column(s) to transform")
        return idf

    if method_type not in ("equal_frequency", "equal_range"):
        raise TypeError("Invalid input for method_type")

    if bin_size < 2:
        raise TypeError("Invalid input for bin_size")

    if output_mode not in ("replace", "append"):
        raise TypeError("Invalid input for output_mode")

    if pre_existing_model:
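        # load previously saved bin cutoffs (one list of cutoff values per column) from the model parquet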
        df_model = spark.read.parquet(model_path + "/attribute_binning")
        bin_cutoffs = []
        for i in list_of_cols:
            mapped_value = (
                df_model.where(F.col("attribute") == i)
                .select("parameters")
                .rdd.flatMap(lambda x: x)
                .collect()[0]
            )
            bin_cutoffs.append(mapped_value)
    else:
        if method_type == "equal_frequency":
            pctile_width = 1 / bin_size
            pctile_cutoff = []
            for j in range(1, bin_size):
                pctile_cutoff.append(j * pctile_width)
            bin_cutoffs = idf.approxQuantile(list_of_cols, pctile_cutoff, 0.01)

        else:
            bin_cutoffs = []
            for i in list_of_cols:
                max_val = (
                    idf.select(F.col(i))
                    .groupBy()
                    .max()
                    .rdd.flatMap(lambda x: x)
                    .collect()
                    + [None]
                )[0]

                min_val = (
                    idf.select(F.col(i))
                    .groupBy()
                    .min()
                    .rdd.flatMap(lambda x: x)
                    .collect()
                    + [None]
                )[0]

                bin_cutoff = []

                if max_val is not None:
                    bin_width = (max_val - min_val) / bin_size
                    for j in range(1, bin_size):
                        bin_cutoff.append(min_val + j * bin_width)
                bin_cutoffs.append(bin_cutoff)

        if model_path != "NA":
            df_model = spark.createDataFrame(
                zip(list_of_cols, bin_cutoffs), schema=["attribute", "parameters"]
            )

            df_model.write.parquet(model_path + "/attribute_binning", mode="overwrite")

    def bucket_label(value, index):
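        # map a raw value to its bin label: an integer (1..bin_size) when bin_dtype is
        # "numerical", or a "min-max" range string when bin_dtype is "categorical"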
        if value is None:
            return None

        for i in range(0, len(bin_cutoffs[index])):
            if value <= bin_cutoffs[index][i]:
                if bin_dtype == "numerical":
                    return i + 1
                else:
                    if i == 0:
                        return "<= " + str(round(bin_cutoffs[index][i], 4))
                    else:
                        return (
                            str(round(bin_cutoffs[index][i - 1], 4))
                            + "-"
                            + str(round(bin_cutoffs[index][i], 4))
                        )

        # value exceeds every cutoff: assign it to the last (overflow) bin
        if bin_dtype == "numerical":
            return len(bin_cutoffs[index]) + 1
        else:
            return "> " + str(round(bin_cutoffs[index][-1], 4))

    if bin_dtype == "numerical":
        f_bucket_label = F.udf(bucket_label, T.IntegerType())
    else:
        f_bucket_label = F.udf(bucket_label, T.StringType())

    odf = idf
    for idx, i in enumerate(list_of_cols):
        odf = odf.withColumn(i + "_binned", f_bucket_label(F.col(i), F.lit(idx)))

        if idx % 5 == 0:
            odf.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count()

    if output_mode == "replace":
        for col in list_of_cols:
            odf = odf.drop(col).withColumnRenamed(col + "_binned", col)

    if print_impact:
        if output_mode == "replace":
            output_cols = list_of_cols
        else:
            output_cols = [(i + "_binned") for i in list_of_cols]
        uniqueCount_computation(spark, odf, output_cols).show(len(output_cols), False)
    return odf
def auto_imputation(spark, idf, list_of_cols='missing', drop_cols=[], id_col='', null_pct=0.1, stats_missing={}, output_mode='replace', run_type='local', print_impact=True)

auto_imputation tests 5 imputation methods using the other imputation functions provided in this module and returns the output of the one with the best performance. The 5 methods are: (1) imputation_MMM with method_type="mean", (2) imputation_MMM with method_type="median", (3) imputation_sklearn with method_type="KNN", (4) imputation_sklearn with method_type="regression", (5) imputation_matrixFactorization.

Samples without missing values in the attributes to impute are used for testing: some percentage of their values is removed and then imputed again using the above 5 methods. RMSE/attribute_mean is used as the evaluation metric for each attribute to reduce the effect of unit differences among attributes. The final error of a method is calculated as the sum of (RMSE/attribute_mean) over all numerical attributes to impute, and the method with the least error is selected.
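
A rough sketch of this selection metric in plain Python, with hypothetical RMSE and attribute-mean values (not the function's internal code):

# per-attribute normalised error: RMSE divided by the attribute mean (hypothetical numbers)
rmse = {"age": 3.2, "income": 1500.0}
mean = {"age": 40.0, "income": 52000.0}
method_error = sum(rmse[c] / abs(mean[c]) for c in rmse)  # ~0.109; the method with the lowest total wins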

The above testing is only applicable to numerical attributes. If categorical attributes are included, they will be automatically imputed using imputation_MMM. In addition, if there is only one numerical attribute to impute, only methods (1) and (2) will be tested because the rest of the methods require more than one column.

Parameters

spark
Spark Session
idf
Input Dataframe
list_of_cols
List of columns to impute e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all (non-array) columns for analysis. This is super useful instead of specifying all column names manually. "missing" (default) can be passed to include only those columns with missing values. One of the usecases where "all" may be preferable over "missing" is when the user wants to save the imputation model for the future use e.g. a column may not have missing value in the training dataset but missing values may possibly appear in the prediction dataset. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols.
drop_cols
List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
id_col
ID column (Default value = "")
null_pct
proportion of the valid input data to be replaced by None to form the test data (Default value = 0.1)
stats_missing
Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
output_mode
"replace", "append". “replace” option replaces original columns with transformed column. “append” option append transformed column to the input dataset with a postfix "_imputed" e.g. column X is appended as X_imputed. (Default value = "replace")
run_type
"local", "emr", "databricks" (Default value = "local")
print_impact
True, False (Default value = True). If True, prints the before and after missing counts of the imputed columns, as well as the name of the best-performing imputation method along with its RMSE details.

Returns

DataFrame
Imputed Dataframe
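
A minimal usage sketch (not from the library docs), assuming an active SparkSession spark, a DataFrame df with a hypothetical ID column "id", and the module path anovos.data_transformer.transformers:

from anovos.data_transformer.transformers import auto_imputation

df_imputed = auto_imputation(
    spark, df,
    list_of_cols="missing",   # impute only columns that actually contain nulls
    id_col="id",              # hypothetical ID column
    null_pct=0.1,             # mask 10% of valid values to score each candidate method
    run_type="local",
    print_impact=True,
)
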
Expand source code
def auto_imputation(
    spark,
    idf,
    list_of_cols="missing",
    drop_cols=[],
    id_col="",
    null_pct=0.1,
    stats_missing={},
    output_mode="replace",
    run_type="local",
    print_impact=True,
):
    """
    auto_imputation tests 5 imputation methods using the other imputation functions provided in this module
    and returns the output of the one with the best performance. The 5 methods are: (1) imputation_MMM with
    method_type="mean", (2) imputation_MMM with method_type="median", (3) imputation_sklearn with method_type="KNN",
    (4) imputation_sklearn with method_type="regression", (5) imputation_matrixFactorization.

    Samples without missing values in the attributes to impute are used for testing: some percentage of their values
    is removed and then imputed again using the above 5 methods. RMSE/attribute_mean is used as the evaluation metric
    for each attribute to reduce the effect of unit differences among attributes. The final error of a method is
    calculated as the sum of (RMSE/attribute_mean) over all numerical attributes to impute, and the method with the
    least error is selected.

    The above testing is only applicable to numerical attributes. If categorical attributes are included,
    they will be automatically imputed using imputation_MMM. In addition, if there is only one numerical attribute to
    impute, only methods (1) and (2) will be tested because the rest of the methods require more than one column.


    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of columns to impute e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all (non-array) columns for analysis.
        This is super useful instead of specifying all column names manually.
        "missing" (default) can be passed to include only those columns with missing values.
        One of the usecases where "all" may be preferable over "missing" is when the user wants to save
        the imputation model for the future use e.g. a column may not have missing value in the training
        dataset but missing values may possibly appear in the prediction dataset.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols.
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except
        a few handful of them. (Default value = [])
    id_col
        ID column (Default value = "")
    null_pct
        proportion of the valid input data to be replaced by None to form the test data (Default value = 0.1)
    stats_missing
        Takes arguments for read_dataset (data_ingest module) function in a dictionary format
        to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or
        missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
    output_mode
        "replace", "append".
        The “replace” option replaces original columns with the transformed columns, while the “append” option appends
        the transformed column to the input dataset with a postfix "_imputed", e.g. column X is appended as X_imputed. (Default value = "replace")
    run_type
        "local", "emr", "databricks" (Default value = "local")
    print_impact
        True, False (Default value = True)
        If True, prints the before and after missing counts of the imputed columns, as well as the name of the best
        performing imputation method along with its RMSE details.

    Returns
    -------
    DataFrame
        Imputed Dataframe

    """

    if run_type in list(platform_root_path.keys()):
        root_path = platform_root_path[run_type]
    else:
        root_path = ""

    if stats_missing == {}:
        missing_df = missingCount_computation(spark, idf)
        missing_df.write.parquet(
            root_path
            + "intermediate_data/imputation_comparison/missingCount_computation",
            mode="overwrite",
        )
        stats_missing = {
            "file_path": root_path
            + "intermediate_data/imputation_comparison/missingCount_computation",
            "file_type": "parquet",
        }
    else:
        missing_df = read_dataset(spark, **stats_missing).select(
            "attribute", "missing_count", "missing_pct"
        )

    empty_cols = (
        missing_df.where(F.col("missing_pct") == 1.0)
        .select("attribute")
        .rdd.flatMap(lambda x: x)
        .collect()
    )
    if len(empty_cols) > 0:
        warnings.warn("Following columns have all null values: " + ",".join(empty_cols))

    missing_cols = (
        missing_df.where(F.col("missing_count") > 0)
        .select("attribute")
        .rdd.flatMap(lambda x: x)
        .collect()
    )

    if list_of_cols == "all":
        list_of_cols = idf.columns
    if list_of_cols == "missing":
        list_of_cols = missing_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    list_of_cols = list(
        set([e for e in list_of_cols if (e not in drop_cols) & (e != id_col)])
    )
    if any(x not in idf.columns for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")

    del_cols = [e for e in list_of_cols if e in empty_cols]
    odf_del = idf.drop(*del_cols)

    list_of_cols = [e for e in list_of_cols if e not in empty_cols]
    num_cols, cat_cols, other_cols = attributeType_segregation(
        odf_del.select(list_of_cols)
    )
    missing_catcols = [e for e in cat_cols if e in missing_cols]
    missing_numcols = [e for e in num_cols if e in missing_cols]

    if missing_catcols:
        odf_imputed_cat = imputation_MMM(
            spark, odf_del, list_of_cols=missing_catcols, stats_missing=stats_missing
        )
    else:
        odf_imputed_cat = odf_del

    if len(missing_numcols) == 0:
        warnings.warn(
            "No Imputation Performed for numerical columns - No Column(s) to Impute"
        )
        return odf_imputed_cat

    idf_test = (
        odf_imputed_cat.dropna(subset=missing_numcols)
        .withColumn("index", F.monotonically_increasing_id())
        .withColumn("index", F.row_number().over(Window.orderBy("index")))
    )
    null_count = int(null_pct * idf_test.count())
    idf_null = idf_test
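    # randomly mask null_count known values in each numerical column so the imputed
    # output can later be compared against the original (held-out) values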
    for i in missing_numcols:
        null_index = random.sample(range(idf_test.count()), null_count)
        idf_null = idf_null.withColumn(
            i, F.when(F.col("index").isin(null_index), None).otherwise(F.col(i))
        )

    idf_null.write.parquet(
        root_path + "intermediate_data/imputation_comparison/test_dataset",
        mode="overwrite",
    )
    idf_null = spark.read.parquet(
        root_path + "intermediate_data/imputation_comparison/test_dataset"
    )

    method1 = imputation_MMM(
        spark,
        idf_null,
        list_of_cols=missing_numcols,
        method_type="mean",
        stats_missing=stats_missing,
        output_mode=output_mode,
    )
    method2 = imputation_MMM(
        spark,
        idf_null,
        list_of_cols=missing_numcols,
        method_type="median",
        stats_missing=stats_missing,
        output_mode=output_mode,
    )
    valid_methods = [method1, method2]
    if len(num_cols) > 1:
        method3 = imputation_sklearn(
            spark,
            idf_null,
            list_of_cols=num_cols,
            method_type="KNN",
            stats_missing=stats_missing,
            output_mode=output_mode,
        )
        method4 = imputation_sklearn(
            spark,
            idf_null,
            list_of_cols=num_cols,
            method_type="regression",
            stats_missing=stats_missing,
            output_mode=output_mode,
        )
        method5 = imputation_matrixFactorization(
            spark,
            idf_null,
            list_of_cols=num_cols,
            id_col=id_col,
            stats_missing=stats_missing,
            output_mode=output_mode,
        )
        valid_methods = [method1, method2, method3, method4, method5]

    nrmse_all = []
    method_all = ["MMM-mean", "MMM-median", "KNN", "regression", "matrix_factorization"]

    for index, method in enumerate(valid_methods):
        nrmse = 0
        for i in missing_numcols:
            pred_col = (i + "_imputed") if output_mode == "append" else i
            idf_joined = (
                idf_test.select("index", F.col(i).alias("val"))
                .join(
                    method.select("index", F.col(pred_col).alias("pred")),
                    "index",
                    "left_outer",
                )
                .dropna()
            )
            idf_joined = recast_column(
                idf=idf_joined,
                list_of_cols=["val", "pred"],
                list_of_dtypes=["double", "double"],
            )
            pred_mean = float(
                method.select(F.mean(pred_col)).rdd.flatMap(lambda x: x).collect()[0]
            )
            i_nrmse = (
                RegressionEvaluator(
                    metricName="rmse", labelCol="val", predictionCol="pred"
                ).evaluate(idf_joined)
            ) / abs(pred_mean)
            nrmse += i_nrmse
        nrmse_all.append(nrmse)

    min_index = nrmse_all.index(np.min(nrmse_all))
    best_method = method_all[min_index]
    odf = valid_methods[min_index]

    if print_impact:
        print(list(zip(method_all, nrmse_all)))
        print("Best Imputation Method: ", best_method)
    return odf
def autoencoder_latentFeatures(spark, idf, list_of_cols='all', drop_cols=[], reduction_params=0.5, sample_size=500000, epochs=100, batch_size=256, pre_existing_model=False, model_path='NA', standardization=True, standardization_configs={'pre_existing_model': False, 'model_path': 'NA'}, imputation=False, imputation_configs={'imputation_function': 'imputation_MMM'}, stats_missing={}, output_mode='replace', run_type='local', print_impact=False)

Many machine learning models suffer from "the curse of dimensionality" when the number of features is too large. autoencoder_latentFeatures is able to reduce the dimensionality by compressing input attributes to a smaller number of latent features.

To be more specific, it trains a neural network model using the TensorFlow library. The network contains an encoder and a decoder: the encoder learns to represent the input using a smaller number of latent features (controlled by the reduction_params input), and the decoder learns to reproduce the input from those latent features. In the end, only the encoder is kept, and the latent features it generates are added to the output dataframe.

However, the neural network model is not trained in a scalable manner and may not be able to handle a large input dataframe. Thus, sample_size (default value 500,000) can be set to control the number of samples used to train the model. If the total number of samples exceeds sample_size, the remaining samples are encoded using the fitted model.

Standardization is highly recommended if the input attributes are not on the same scale; otherwise, the model might not converge smoothly. The standardization and standardization_configs arguments can be set accordingly to perform standardization within the function. In addition, if a sample contains missing values in the model input, the output values for all of its latent features will be None. Thus, data imputation is also recommended if missing values exist, which can be done within the function by setting the imputation and imputation_configs arguments.
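
A minimal usage sketch (not from the library docs), assuming an active SparkSession spark, a DataFrame df whose feature columns are numerical, and the module path anovos.data_transformer.transformers:

from anovos.data_transformer.transformers import autoencoder_latentFeatures

df_latent = autoencoder_latentFeatures(
    spark, df,
    list_of_cols="all",
    reduction_params=0.5,      # keep roughly half as many latent features as input columns
    epochs=50,
    batch_size=256,
    standardization=True,      # recommended when attributes are on different scales
    imputation=True,           # recommended when missing values exist
    output_mode="append",      # appends latent_0, latent_1, ... to df
)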

Parameters

spark
Spark Session
idf
Input Dataframe
list_of_cols
List of numerical columns to encode e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all numerical columns for analysis. This is super useful instead of specifying all column names manually. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". It is most useful when coupled with the “all” value of list_of_cols, when we need to consider all columns except a few handful of them. (Default value = [])
reduction_params
Determines the number of encoded features in the result. If reduction_params < 1, int(reduction_params * number of input columns) latent columns will be generated; otherwise, reduction_params columns will be generated. (Default value = 0.5)
sample_size
Maximum rows for training the autoencoder model using tensorflow. (Default value = 500000)
epochs
Number of epochs to train the tensorflow model. (Default value = 100)
batch_size
Number of samples per gradient update when fitting the tensorflow model. (Default value = 256)
pre_existing_model
Boolean argument – True or False. True if model exists already, False Otherwise (Default value = False)
model_path
If pre_existing_model is True, this argument is path for referring the pre-saved model. If pre_existing_model is False, this argument can be used for saving the model. Default "NA" means there is neither pre-existing model nor there is a need to save one.
standardization
Boolean argument – True or False. True if standardization is required. (Default value = True)
standardization_configs
z_standardization function arguments in dictionary format. (Default value = {"pre_existing_model": False, "model_path": "NA"})
imputation
Boolean argument – True or False. True if imputation is required. (Default value = False)
imputation_configs
Takes input in dictionary format. The imputation function name is provided with the key "imputation_function". Optional arguments pertaining to that imputation function can be provided with the argument name as the key. (Default value = {"imputation_function": "imputation_MMM"})
stats_missing
Takes arguments for read_dataset (data_ingest module) function in a dictionary format to read pre-saved statistics on missing count/pct i.e. if measures_of_counts or missingCount_computation (data_analyzer.stats_generator module) has been computed & saved before. (Default value = {})
output_mode
"replace", "append". “replace” option replaces original columns with transformed columns: latent_. “append” option append transformed columns with format latent_ to the input dataset, e.g. latent_0, latent_1 will be appende