Skip to content

basic_report_generation

Expand source code
import subprocess
from pathlib import Path

import datapane as dp
import mlflow
import pandas as pd
import plotly.express as px

from anovos.data_analyzer.association_evaluator import (
    IG_calculation,
    IV_calculation,
    correlation_matrix,
    variable_clustering,
)
from anovos.data_analyzer.quality_checker import (
    IDness_detection,
    biasedness_detection,
    duplicate_detection,
    invalidEntries_detection,
    nullColumns_detection,
    nullRows_detection,
    outlier_detection,
)
from anovos.data_analyzer.stats_generator import (
    global_summary,
    measures_of_cardinality,
    measures_of_centralTendency,
    measures_of_counts,
    measures_of_dispersion,
    measures_of_percentiles,
    measures_of_shape,
)
from anovos.shared.utils import ends_with, output_to_local, path_ak8s_modify

# Plotly sequential colour palettes (Plasma and its reversed variant) used as
# the colour scale / colour sequence of the report figures.
global_theme = px.colors.sequential.Plasma
global_theme_r = px.colors.sequential.Plasma_r
# Background colours assigned to each figure's layout; an alpha of 0 makes
# both the plot area and the surrounding paper fully transparent.
global_plot_bg_color = "rgba(0,0,0,0)"
global_paper_bg_color = "rgba(0,0,0,0)"

# Report header blocks: element 0 is an HTML snippet with the Anovos logo,
# element 1 is the report title. Both are passed to dp.Report by index.
default_template = (
    dp.HTML(
        """
        <html>
            <img src="https://mobilewalla-anovos.s3.amazonaws.com/anovos.png"
                 style="height:100px;display:flex;margin:auto;float:right"/>
        </html>
        """
    ),
    dp.Text("# ML-Anovos Report"),
)

# Placeholder table (one empty row, a single blank-named column). It is appended
# to the association blocks when only one block exists -- presumably because
# the selector block needs more than one entry; TODO confirm against datapane.
blank_df = dp.DataTable(pd.DataFrame(columns=[" "], index=range(1)), label=" ")


def stats_args(path, func):
    """
    Build the keyword arguments that point an analyzer function at the
    statistics files saved earlier by the stats generator.

    Parameters
    ----------
    path
        Path to pre-saved statistics
    func
        Name (string) of the function that will receive the arguments,
        e.g. "IDness_detection"

    Returns
    -------
    Dictionary
        One entry per pre-saved-statistics argument accepted by the function;
        each value describes the CSV file to read (path, type and read
        options). Empty when the function takes no such argument.

    """
    # Which stats-generator output backs each "stats_*" argument.
    source_of_arg = {
        "stats_unique": "measures_of_cardinality",
        "stats_mode": "measures_of_centralTendency",
        "stats_missing": "measures_of_counts",
    }
    # Which "stats_*" arguments each supported function accepts.
    args_of_func = {
        "biasedness_detection": ["stats_mode"],
        "IDness_detection": ["stats_unique"],
        "nullColumns_detection": ["stats_unique", "stats_mode", "stats_missing"],
        "variable_clustering": ["stats_mode"],
    }

    wanted = args_of_func[func] if func in args_of_func else []
    return {
        arg_name: {
            "file_path": (ends_with(path) + source_of_arg[arg_name] + ".csv"),
            "file_type": "csv",
            "file_configs": {"header": True, "inferSchema": True},
        }
        for arg_name in wanted
    }


def anovos_basic_report(
    spark,
    idf,
    id_col="",
    label_col="",
    event_label="",
    skip_corr_matrix=True,
    output_path=".",
    run_type="local",
    auth_key="NA",
    print_impact=True,
    mlflow_config=None,
):
    """
    Generate the basic Anovos report for a dataframe.

    Runs the stats-generator, quality-checker and association functions on
    ``idf``, saves each result as ``<function name>.csv`` in a local working
    directory, copies every file to ``output_path`` for the "emr" and "ak8s"
    run types, and assembles the saved statistics into ``basic_report.html``
    (tabs: Descriptive Statistics, Quality Check, Attribute Associations).

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    id_col
        ID column (Default value = "")
    label_col
        Label/Target column. When empty, the IV/IG calculations are skipped. (Default value = "")
    event_label
        Value of (positive) event (i.e label 1) (Default value = "")
    skip_corr_matrix
        True, False.
        This argument is to skip correlation matrix generation in basic_report. (Default value = True)
    output_path
        File Path for saving metrics and basic report (Default value = ".")
    run_type
        "local", "emr", "databricks" or "ak8s".
        "emr" if the files are read from or written in AWS s3
        "databricks" if the files are read from or written in dbfs in azure databricks
        "ak8s" if the files are read from or written to in wasbs:// container in azure environment (Default value = "local")
    auth_key
        Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type.
        (Default value = "NA")
    print_impact
        True, False.
        This argument is to print out the data analyzer statistics. (Default value = True)
    mlflow_config
        MLflow configuration. If None, all MLflow features are disabled.
        Otherwise its "run_id" is appended to output_path and the local
        working directory is logged as MLflow artifacts.

    Raises
    ------
    ValueError
        If run_type is not one of the supported values.
    """
    SG_funcs = [
        global_summary,
        measures_of_counts,
        measures_of_centralTendency,
        measures_of_cardinality,
        measures_of_dispersion,
        measures_of_percentiles,
        measures_of_shape,
    ]
    QC_rows_funcs = [duplicate_detection, nullRows_detection]
    QC_cols_funcs = [
        nullColumns_detection,
        outlier_detection,
        IDness_detection,
        biasedness_detection,
        invalidEntries_detection,
    ]
    AA_funcs = (
        [variable_clustering]
        if skip_corr_matrix
        else [correlation_matrix, variable_clustering]
    )
    AT_funcs = [IV_calculation, IG_calculation]
    all_funcs = SG_funcs + QC_rows_funcs + QC_cols_funcs + AA_funcs + AT_funcs

    if mlflow_config is not None:
        output_path = output_path + "/" + mlflow_config.get("run_id", "")

    if run_type == "local":
        local_path = output_path
    elif run_type == "databricks":
        local_path = output_to_local(output_path)
    elif run_type in ("emr", "ak8s"):
        # Remote targets: stage everything locally, then copy file by file.
        local_path = "report_stats"
    else:
        raise ValueError("Invalid run_type")

    Path(local_path).mkdir(parents=True, exist_ok=True)

    def local_csv(name):
        # Location of a function's saved statistics in the local working directory.
        return ends_with(local_path) + name + ".csv"

    def load_stats(name):
        # Saved statistics of a function, rounded to 3 decimals for display.
        return pd.read_csv(local_csv(name)).round(3)

    def metric_value(df, metric):
        # First value recorded for `metric` in a metric/value table. Indexing
        # with [0] yields a scalar; int()/float() on a whole one-element
        # array is deprecated by NumPy.
        return df[df["metric"] == metric]["value"].values[0]

    def metric_names(df, metric):
        # Comma-joined names stored for `metric`. An empty name list is read
        # back from CSV as NaN, which str.join cannot handle, so it is dropped.
        values = df[df["metric"] == metric]["value"].values
        return ",".join(str(v) for v in values if pd.notna(v))

    def push_to_output(local_file, recursive=False):
        # Copy a local file to output_path for the remote run types; no-op
        # otherwise. Commands are passed as argument lists (no shell) so that
        # paths and the auth key reach the tool verbatim.
        if run_type == "emr":
            subprocess.check_output(
                ["aws", "s3", "cp", local_file, ends_with(output_path)]
            )
        elif run_type == "ak8s":
            command = [
                "azcopy",
                "cp",
                local_file,
                ends_with(path_ak8s_modify(output_path)) + str(auth_key),
            ]
            if recursive:
                command.append("--recursive=true")
            subprocess.check_output(command)

    def remove_u_score(col):
        # Turn a function name into a display title, e.g.
        # "nullColumns_detection" -> "Null Detection".
        words = []
        for part in col.split("_"):
            if part in ("nullColumns", "nullRows"):
                words.append("Null")
            else:
                # Slicing (not indexing) keeps empty segments from raising.
                words.append(part[:1].upper() + part[1:])
        return " ".join(words)

    def association_block(table, fig, name, autosize=True):
        # One entry of the association dropdown: table plus its figure.
        fig.layout.plot_bgcolor = global_plot_bg_color
        fig.layout.paper_bgcolor = global_paper_bg_color
        if autosize:
            fig.layout.autosize = True
        return dp.Group(
            dp.Text("##"),
            dp.DataTable(table),
            dp.Plot(fig),
            label=remove_u_score(name),
        )

    # ---- Compute and persist every statistic -------------------------------
    for func in all_funcs:
        name = func.__name__
        if func in SG_funcs:
            stats = func(spark, idf)
        elif func in QC_rows_funcs or func in QC_cols_funcs:
            extra_args = stats_args(output_path, name)
            if name in ("outlier_detection", "duplicate_detection"):
                extra_args["print_impact"] = True
            # Quality checkers return a pair; the statistics are element 1.
            stats = func(spark, idf, **extra_args)[1]
        elif func in AA_funcs:
            extra_args = stats_args(output_path, name)
            stats = func(spark, idf, drop_cols=id_col, **extra_args)
        elif label_col:
            # Only the IV/IG functions reach this branch; they need a label.
            stats = func(spark, idf, label_col=label_col, event_label=event_label)
        else:
            continue

        stats.toPandas().to_csv(local_csv(name), index=False)
        push_to_output(local_csv(name), recursive=True)

        if print_impact:
            print(name, ":\n")
            # Read the file back from output_path so the printed table is
            # exactly what was saved.
            spark.read.csv(
                ends_with(output_path) + name + ".csv",
                header=True,
                inferSchema=True,
            ).show()

    # ---- Tab 1: descriptive statistics -------------------------------------
    global_summary_df = pd.read_csv(local_csv("global_summary"))
    rows_count = int(metric_value(global_summary_df, "rows_count"))
    catcols_count = int(metric_value(global_summary_df, "catcols_count"))
    numcols_count = int(metric_value(global_summary_df, "numcols_count"))
    columns_count = int(metric_value(global_summary_df, "columns_count"))
    catcols_name = metric_names(global_summary_df, "catcols_name")
    numcols_name = metric_names(global_summary_df, "numcols_name")

    l1 = dp.Group(
        dp.Text("# "),
        dp.Text("*This section summarizes the dataset with key statistical metrics.*"),
        dp.Text("# "),
        dp.Text("# "),
        dp.Text("### Global Summary"),
        dp.Group(
            dp.Text(" Total Number of Records: **" + f"{rows_count:,d}" + "**"),
            dp.Text(" Total Number of Attributes: **" + str(columns_count) + "**"),
            dp.Text(" Number of Numerical Attributes : **" + str(numcols_count) + "**"),
            dp.Text(" Numerical Attributes Name : **" + str(numcols_name) + "**"),
            dp.Text(
                " Number of Categorical Attributes : **" + str(catcols_count) + "**"
            ),
            dp.Text(" Categorical Attributes Name : **" + str(catcols_name) + "**"),
        ),
    )

    l2 = dp.Text("### Statistics by Metric Type")

    SG_content = [
        dp.DataTable(
            load_stats(sg_func.__name__), label=remove_u_score(sg_func.__name__)
        )
        for sg_func in SG_funcs
        if sg_func.__name__ != "global_summary"
    ]
    l3 = dp.Group(dp.Select(blocks=SG_content, type=dp.SelectType.TABS), dp.Text("# "))

    tab1 = dp.Group(
        l1,
        dp.Text("# "),
        l2,
        l3,
        dp.Text("# "),
        dp.Text("# "),
        dp.Text("# "),
        label="Descriptive Statistics",
    )

    # ---- Tab 2: quality checks ---------------------------------------------
    QCcol_content = []
    for qc_func in QC_cols_funcs:
        QCcol_content.extend(
            [
                dp.Text("### " + str(remove_u_score(qc_func.__name__))),
                dp.DataTable(load_stats(qc_func.__name__)),
                dp.Text("#"),
                dp.Text("#"),
            ]
        )

    QCrow_content = []
    for qc_func in QC_rows_funcs:
        if qc_func.__name__ == "duplicate_detection":
            # Duplicate statistics are shown as text lines rather than a table.
            stats = load_stats(qc_func.__name__)
            unique_rows_count = (
                " No. Of Unique Rows: **"
                + format(int(metric_value(stats, "unique_rows_count")), ",")
                + "**"
            )
            total_rows_count = (
                " No. of Rows: **"
                + format(int(metric_value(stats, "rows_count")), ",")
                + "**"
            )
            duplicate_rows_count = (
                " No. of Duplicate Rows: **"
                + format(int(metric_value(stats, "duplicate_rows")), ",")
                + "**"
            )
            duplicate_rows_pct = (
                " Percentage of Duplicate Rows: **"
                + str(float(metric_value(stats, "duplicate_pct") * 100.0))
                + " %"
                + "**"
            )
            body = dp.Group(
                dp.Text(total_rows_count),
                dp.Text(unique_rows_count),
                dp.Text(duplicate_rows_count),
                dp.Text(duplicate_rows_pct),
            )
        else:
            body = dp.DataTable(load_stats(qc_func.__name__))

        QCrow_content.extend(
            [
                dp.Text("### " + str(remove_u_score(qc_func.__name__))),
                body,
                dp.Text("#"),
                dp.Text("#"),
            ]
        )

    tab2 = dp.Group(
        dp.Text("# "),
        dp.Text(
            "*This section identifies the data quality issues at both row and column level.*"
        ),
        dp.Text("# "),
        dp.Text("# "),
        dp.Select(
            blocks=[
                dp.Group(dp.Text("# "), dp.Group(*QCcol_content), label="Column Level"),
                dp.Group(dp.Text("# "), dp.Group(*QCrow_content), label="Row Level"),
            ],
            type=dp.SelectType.TABS,
        ),
        dp.Text("# "),
        dp.Text("# "),
        label="Quality Check",
    )

    # ---- Tab 3: attribute associations -------------------------------------
    AA_content = []
    for assoc_func in AA_funcs + AT_funcs:
        name = assoc_func.__name__
        if name == "correlation_matrix":
            stats = load_stats(name)
            feats_order = list(stats["attribute"].values)
            fig = px.imshow(
                stats[feats_order],
                y=feats_order,
                color_continuous_scale=global_theme,
                aspect="auto",
            )
            AA_content.append(
                association_block(
                    stats[["attribute"] + feats_order], fig, name, autosize=False
                )
            )
        elif name == "variable_clustering":
            stats = load_stats(name).sort_values(by=["Cluster"], ascending=True)
            fig = px.sunburst(
                stats,
                path=["Cluster", "Attribute"],
                values="RS_Ratio",
                color_discrete_sequence=global_theme,
            )
            AA_content.append(association_block(stats, fig, name))
        elif label_col:
            # IV / IG: files exist only when a label column was supplied.
            stats = load_stats(name)
            col_nm = [x for x in list(stats.columns) if "attribute" not in x]
            stats = stats.sort_values(col_nm[0], ascending=True)
            fig = px.bar(
                stats,
                x=col_nm[0],
                y="attribute",
                orientation="h",
                color_discrete_sequence=global_theme,
            )
            AA_content.append(association_block(stats, fig, name))

    # Pad a lone block with the blank placeholder table before building the
    # dropdown selector.
    if len(AA_content) == 1:
        AA_content.append(blank_df)

    # @TODO: is there better templating approach such as jinja
    tab3 = dp.Group(
        dp.Text("# "),
        dp.Text(
            """
            *This section analyzes the interaction between different attributes and/or the relationship
            between an attribute & the binary target variable.*
            """
        ),
        dp.Text("# "),
        dp.Text("# "),
        dp.Text("### Association Matrix & Plot"),
        dp.Select(blocks=AA_content, type=dp.SelectType.DROPDOWN),
        dp.Text("### "),
        dp.Text("## "),
        dp.Text("## "),
        dp.Text("## "),
        label="Attribute Associations",
    )

    # ---- Save, log and publish the report ----------------------------------
    report_file = ends_with(local_path) + "basic_report.html"
    dp.Report(
        default_template[0],
        default_template[1],
        dp.Select(blocks=[tab1, tab2, tab3], type=dp.SelectType.TABS),
    ).save(report_file, open=True)

    if mlflow_config is not None:
        mlflow.log_artifacts(local_dir=local_path, artifact_path=output_path)

    push_to_output(report_file)

Functions

def anovos_basic_report(spark, idf, id_col='', label_col='', event_label='', skip_corr_matrix=True, output_path='.', run_type='local', auth_key='NA', print_impact=True, mlflow_config=None)

Parameters

spark
Spark Session
idf
Input Dataframe
id_col
ID column (Default value = "")
label_col
Label/Target column (Default value = "")
event_label
Value of (positive) event (i.e label 1) (Default value = "")
skip_corr_matrix
True, False. This argument is to skip correlation matrix generation in basic_report. (Default value = True)
output_path
File Path for saving metrics and basic report (Default value = ".")
run_type
"local", "emr", "databricks" or "ak8s". "emr" if the files are read from or written to AWS S3; "databricks" if the files are read from or written to DBFS in Azure Databricks; "ak8s" if the files are read from or written to a wasbs:// container in an Azure environment. (Default value = "local")
auth_key
Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type.
print_impact
True, False. This argument is to print out the data analyzer statistics. (Default value = True)
mlflow_config
MLflow configuration. If None, all MLflow features are disabled.
Expand source code
def anovos_basic_report(
    spark,
    idf,
    id_col="",
    label_col="",
    event_label="",
    skip_corr_matrix=True,
    output_path=".",
    run_type="local",
    auth_key="NA",
    print_impact=True,
    mlflow_config=None,
):
    """
    Generate the basic Anovos report for a dataframe.

    Runs the stats-generator, quality-checker and association functions on
    ``idf``, saves each result as ``<function name>.csv`` in a local working
    directory, copies every file to ``output_path`` for the "emr" and "ak8s"
    run types, and assembles the saved statistics into ``basic_report.html``
    (tabs: Descriptive Statistics, Quality Check, Attribute Associations).

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    id_col
        ID column (Default value = "")
    label_col
        Label/Target column. When empty, the IV/IG calculations are skipped. (Default value = "")
    event_label
        Value of (positive) event (i.e label 1) (Default value = "")
    skip_corr_matrix
        True, False.
        This argument is to skip correlation matrix generation in basic_report. (Default value = True)
    output_path
        File Path for saving metrics and basic report (Default value = ".")
    run_type
        "local", "emr", "databricks" or "ak8s".
        "emr" if the files are read from or written in AWS s3
        "databricks" if the files are read from or written in dbfs in azure databricks
        "ak8s" if the files are read from or written to in wasbs:// container in azure environment (Default value = "local")
    auth_key
        Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type.
        (Default value = "NA")
    print_impact
        True, False.
        This argument is to print out the data analyzer statistics. (Default value = True)
    mlflow_config
        MLflow configuration. If None, all MLflow features are disabled.
        Otherwise its "run_id" is appended to output_path and the local
        working directory is logged as MLflow artifacts.

    Raises
    ------
    ValueError
        If run_type is not one of the supported values.
    """
    SG_funcs = [
        global_summary,
        measures_of_counts,
        measures_of_centralTendency,
        measures_of_cardinality,
        measures_of_dispersion,
        measures_of_percentiles,
        measures_of_shape,
    ]
    QC_rows_funcs = [duplicate_detection, nullRows_detection]
    QC_cols_funcs = [
        nullColumns_detection,
        outlier_detection,
        IDness_detection,
        biasedness_detection,
        invalidEntries_detection,
    ]
    AA_funcs = (
        [variable_clustering]
        if skip_corr_matrix
        else [correlation_matrix, variable_clustering]
    )
    AT_funcs = [IV_calculation, IG_calculation]
    all_funcs = SG_funcs + QC_rows_funcs + QC_cols_funcs + AA_funcs + AT_funcs

    if mlflow_config is not None:
        output_path = output_path + "/" + mlflow_config.get("run_id", "")

    if run_type == "local":
        local_path = output_path
    elif run_type == "databricks":
        local_path = output_to_local(output_path)
    elif run_type in ("emr", "ak8s"):
        # Remote targets: stage everything locally, then copy file by file.
        local_path = "report_stats"
    else:
        raise ValueError("Invalid run_type")

    Path(local_path).mkdir(parents=True, exist_ok=True)

    def local_csv(name):
        # Location of a function's saved statistics in the local working directory.
        return ends_with(local_path) + name + ".csv"

    def load_stats(name):
        # Saved statistics of a function, rounded to 3 decimals for display.
        return pd.read_csv(local_csv(name)).round(3)

    def metric_value(df, metric):
        # First value recorded for `metric` in a metric/value table. Indexing
        # with [0] yields a scalar; int()/float() on a whole one-element
        # array is deprecated by NumPy.
        return df[df["metric"] == metric]["value"].values[0]

    def metric_names(df, metric):
        # Comma-joined names stored for `metric`. An empty name list is read
        # back from CSV as NaN, which str.join cannot handle, so it is dropped.
        values = df[df["metric"] == metric]["value"].values
        return ",".join(str(v) for v in values if pd.notna(v))

    def push_to_output(local_file, recursive=False):
        # Copy a local file to output_path for the remote run types; no-op
        # otherwise. Commands are passed as argument lists (no shell) so that
        # paths and the auth key reach the tool verbatim.
        if run_type == "emr":
            subprocess.check_output(
                ["aws", "s3", "cp", local_file, ends_with(output_path)]
            )
        elif run_type == "ak8s":
            command = [
                "azcopy",
                "cp",
                local_file,
                ends_with(path_ak8s_modify(output_path)) + str(auth_key),
            ]
            if recursive:
                command.append("--recursive=true")
            subprocess.check_output(command)

    def remove_u_score(col):
        # Turn a function name into a display title, e.g.
        # "nullColumns_detection" -> "Null Detection".
        words = []
        for part in col.split("_"):
            if part in ("nullColumns", "nullRows"):
                words.append("Null")
            else:
                # Slicing (not indexing) keeps empty segments from raising.
                words.append(part[:1].upper() + part[1:])
        return " ".join(words)

    def association_block(table, fig, name, autosize=True):
        # One entry of the association dropdown: table plus its figure.
        fig.layout.plot_bgcolor = global_plot_bg_color
        fig.layout.paper_bgcolor = global_paper_bg_color
        if autosize:
            fig.layout.autosize = True
        return dp.Group(
            dp.Text("##"),
            dp.DataTable(table),
            dp.Plot(fig),
            label=remove_u_score(name),
        )

    # ---- Compute and persist every statistic -------------------------------
    for func in all_funcs:
        name = func.__name__
        if func in SG_funcs:
            stats = func(spark, idf)
        elif func in QC_rows_funcs or func in QC_cols_funcs:
            extra_args = stats_args(output_path, name)
            if name in ("outlier_detection", "duplicate_detection"):
                extra_args["print_impact"] = True
            # Quality checkers return a pair; the statistics are element 1.
            stats = func(spark, idf, **extra_args)[1]
        elif func in AA_funcs:
            extra_args = stats_args(output_path, name)
            stats = func(spark, idf, drop_cols=id_col, **extra_args)
        elif label_col:
            # Only the IV/IG functions reach this branch; they need a label.
            stats = func(spark, idf, label_col=label_col, event_label=event_label)
        else:
            continue

        stats.toPandas().to_csv(local_csv(name), index=False)
        push_to_output(local_csv(name), recursive=True)

        if print_impact:
            print(name, ":\n")
            # Read the file back from output_path so the printed table is
            # exactly what was saved.
            spark.read.csv(
                ends_with(output_path) + name + ".csv",
                header=True,
                inferSchema=True,
            ).show()

    # ---- Tab 1: descriptive statistics -------------------------------------
    global_summary_df = pd.read_csv(local_csv("global_summary"))
    rows_count = int(metric_value(global_summary_df, "rows_count"))
    catcols_count = int(metric_value(global_summary_df, "catcols_count"))
    numcols_count = int(metric_value(global_summary_df, "numcols_count"))
    columns_count = int(metric_value(global_summary_df, "columns_count"))
    catcols_name = metric_names(global_summary_df, "catcols_name")
    numcols_name = metric_names(global_summary_df, "numcols_name")

    l1 = dp.Group(
        dp.Text("# "),
        dp.Text("*This section summarizes the dataset with key statistical metrics.*"),
        dp.Text("# "),
        dp.Text("# "),
        dp.Text("### Global Summary"),
        dp.Group(
            dp.Text(" Total Number of Records: **" + f"{rows_count:,d}" + "**"),
            dp.Text(" Total Number of Attributes: **" + str(columns_count) + "**"),
            dp.Text(" Number of Numerical Attributes : **" + str(numcols_count) + "**"),
            dp.Text(" Numerical Attributes Name : **" + str(numcols_name) + "**"),
            dp.Text(
                " Number of Categorical Attributes : **" + str(catcols_count) + "**"
            ),
            dp.Text(" Categorical Attributes Name : **" + str(catcols_name) + "**"),
        ),
    )

    l2 = dp.Text("### Statistics by Metric Type")

    SG_content = [
        dp.DataTable(
            load_stats(sg_func.__name__), label=remove_u_score(sg_func.__name__)
        )
        for sg_func in SG_funcs
        if sg_func.__name__ != "global_summary"
    ]
    l3 = dp.Group(dp.Select(blocks=SG_content, type=dp.SelectType.TABS), dp.Text("# "))

    tab1 = dp.Group(
        l1,
        dp.Text("# "),
        l2,
        l3,
        dp.Text("# "),
        dp.Text("# "),
        dp.Text("# "),
        label="Descriptive Statistics",
    )

    # ---- Tab 2: quality checks ---------------------------------------------
    QCcol_content = []
    for qc_func in QC_cols_funcs:
        QCcol_content.extend(
            [
                dp.Text("### " + str(remove_u_score(qc_func.__name__))),
                dp.DataTable(load_stats(qc_func.__name__)),
                dp.Text("#"),
                dp.Text("#"),
            ]
        )

    QCrow_content = []
    for qc_func in QC_rows_funcs:
        if qc_func.__name__ == "duplicate_detection":
            # Duplicate statistics are shown as text lines rather than a table.
            stats = load_stats(qc_func.__name__)
            unique_rows_count = (
                " No. Of Unique Rows: **"
                + format(int(metric_value(stats, "unique_rows_count")), ",")
                + "**"
            )
            total_rows_count = (
                " No. of Rows: **"
                + format(int(metric_value(stats, "rows_count")), ",")
                + "**"
            )
            duplicate_rows_count = (
                " No. of Duplicate Rows: **"
                + format(int(metric_value(stats, "duplicate_rows")), ",")
                + "**"
            )
            duplicate_rows_pct = (
                " Percentage of Duplicate Rows: **"
                + str(float(metric_value(stats, "duplicate_pct") * 100.0))
                + " %"
                + "**"
            )
            body = dp.Group(
                dp.Text(total_rows_count),
                dp.Text(unique_rows_count),
                dp.Text(duplicate_rows_count),
                dp.Text(duplicate_rows_pct),
            )
        else:
            body = dp.DataTable(load_stats(qc_func.__name__))

        QCrow_content.extend(
            [
                dp.Text("### " + str(remove_u_score(qc_func.__name__))),
                body,
                dp.Text("#"),
                dp.Text("#"),
            ]
        )

    tab2 = dp.Group(
        dp.Text("# "),
        dp.Text(
            "*This section identifies the data quality issues at both row and column level.*"
        ),
        dp.Text("# "),
        dp.Text("# "),
        dp.Select(
            blocks=[
                dp.Group(dp.Text("# "), dp.Group(*QCcol_content), label="Column Level"),
                dp.Group(dp.Text("# "), dp.Group(*QCrow_content), label="Row Level"),
            ],
            type=dp.SelectType.TABS,
        ),
        dp.Text("# "),
        dp.Text("# "),
        label="Quality Check",
    )

    # ---- Tab 3: attribute associations -------------------------------------
    AA_content = []
    for assoc_func in AA_funcs + AT_funcs:
        name = assoc_func.__name__
        if name == "correlation_matrix":
            stats = load_stats(name)
            feats_order = list(stats["attribute"].values)
            fig = px.imshow(
                stats[feats_order],
                y=feats_order,
                color_continuous_scale=global_theme,
                aspect="auto",
            )
            AA_content.append(
                association_block(
                    stats[["attribute"] + feats_order], fig, name, autosize=False
                )
            )
        elif name == "variable_clustering":
            stats = load_stats(name).sort_values(by=["Cluster"], ascending=True)
            fig = px.sunburst(
                stats,
                path=["Cluster", "Attribute"],
                values="RS_Ratio",
                color_discrete_sequence=global_theme,
            )
            AA_content.append(association_block(stats, fig, name))
        elif label_col:
            # IV / IG: files exist only when a label column was supplied.
            stats = load_stats(name)
            col_nm = [x for x in list(stats.columns) if "attribute" not in x]
            stats = stats.sort_values(col_nm[0], ascending=True)
            fig = px.bar(
                stats,
                x=col_nm[0],
                y="attribute",
                orientation="h",
                color_discrete_sequence=global_theme,
            )
            AA_content.append(association_block(stats, fig, name))

    # Pad a lone block with the blank placeholder table before building the
    # dropdown selector.
    if len(AA_content) == 1:
        AA_content.append(blank_df)

    # @TODO: is there better templating approach such as jinja
    tab3 = dp.Group(
        dp.Text("# "),
        dp.Text(
            """
            *This section analyzes the interaction between different attributes and/or the relationship
            between an attribute & the binary target variable.*
            """
        ),
        dp.Text("# "),
        dp.Text("# "),
        dp.Text("### Association Matrix & Plot"),
        dp.Select(blocks=AA_content, type=dp.SelectType.DROPDOWN),
        dp.Text("### "),
        dp.Text("## "),
        dp.Text("## "),
        dp.Text("## "),
        label="Attribute Associations",
    )

    # ---- Save, log and publish the report ----------------------------------
    report_file = ends_with(local_path) + "basic_report.html"
    dp.Report(
        default_template[0],
        default_template[1],
        dp.Select(blocks=[tab1, tab2, tab3], type=dp.SelectType.TABS),
    ).save(report_file, open=True)

    if mlflow_config is not None:
        mlflow.log_artifacts(local_dir=local_path, artifact_path=output_path)

    push_to_output(report_file)
def stats_args(path, func)

Parameters

path
Path to pre-saved statistics
func
Name (string) of the quality checker or association function, e.g. "IDness_detection"

Returns

Dictionary
Each key/value is argument (related to pre-saved statistics) to be passed for the quality checker function.
Expand source code
def stats_args(path, func):
    """
    Build the keyword arguments that point an analyzer function at the
    statistics files saved earlier by the stats generator.

    Parameters
    ----------
    path
        Path to pre-saved statistics
    func
        Name (string) of the function that will receive the arguments,
        e.g. "IDness_detection"

    Returns
    -------
    Dictionary
        One entry per pre-saved-statistics argument accepted by the function;
        each value describes the CSV file to read (path, type and read
        options). Empty when the function takes no such argument.

    """
    # Which stats-generator output backs each "stats_*" argument.
    source_of_arg = {
        "stats_unique": "measures_of_cardinality",
        "stats_mode": "measures_of_centralTendency",
        "stats_missing": "measures_of_counts",
    }
    # Which "stats_*" arguments each supported function accepts.
    args_of_func = {
        "biasedness_detection": ["stats_mode"],
        "IDness_detection": ["stats_unique"],
        "nullColumns_detection": ["stats_unique", "stats_mode", "stats_missing"],
        "variable_clustering": ["stats_mode"],
    }

    wanted = args_of_func[func] if func in args_of_func else []
    return {
        arg_name: {
            "file_path": (ends_with(path) + source_of_arg[arg_name] + ".csv"),
            "file_type": "csv",
            "file_configs": {"header": True, "inferSchema": True},
        }
        for arg_name in wanted
    }