Skip to content

basic_report_generation

Expand source code
import subprocess
from pathlib import Path

import datapane as dp
import pandas as pd
import plotly.express as px

from anovos.data_analyzer.association_evaluator import (
    correlation_matrix,
    variable_clustering,
    IV_calculation,
    IG_calculation,
)
from anovos.data_analyzer.quality_checker import (
    duplicate_detection,
    nullRows_detection,
    nullColumns_detection,
    outlier_detection,
    IDness_detection,
    biasedness_detection,
    invalidEntries_detection,
)
from anovos.data_analyzer.stats_generator import (
    global_summary,
    measures_of_counts,
    measures_of_centralTendency,
    measures_of_cardinality,
    measures_of_dispersion,
    measures_of_percentiles,
    measures_of_shape,
)
from anovos.shared.utils import ends_with

# Plotly "Plasma" sequential colour scale used by the charts in this module;
# global_theme_r is the reversed variant of the same scale.
global_theme = px.colors.sequential.Plasma
global_theme_r = px.colors.sequential.Plasma_r
# Fully transparent RGBA values applied to each figure's plot and paper background.
global_plot_bg_color = "rgba(0,0,0,0)"
global_paper_bg_color = "rgba(0,0,0,0)"

# Header blocks placed at the top of the report: element 0 is the logo image
# (raw HTML), element 1 is the report title.
default_template = (
    dp.HTML(
        """
        <html>
            <img src="https://mobilewalla-anovos.s3.amazonaws.com/anovos.png"
                 style="height:100px;display:flex;margin:auto;float:right"/>
        </html>
        """
    ),
    dp.Text("# ML-Anovos Report"),
)


def stats_args(path, func):
    """
    Build the keyword arguments that point a function at pre-saved statistics files.

    Parameters
    ----------
    path
        Path to pre-saved statistics
    func
        Name of the function (e.g. "outlier_detection") the arguments are built for


    Returns
    -------
    Dictionary
        Each key is a statistics argument name (e.g. "stats_unique") and each value is
        a file descriptor with "file_path", "file_type" and "file_configs" entries.
        Empty when the function name has no statistics arguments registered.

    """
    # Which statistics file (named after the generating function) backs each argument.
    stats_file_by_arg = {
        "stats_unique": "measures_of_cardinality",
        "stats_mode": "measures_of_centralTendency",
        "stats_missing": "measures_of_counts",
    }
    # Which statistics arguments each supported function accepts.
    args_by_func = {
        "biasedness_detection": ["stats_mode"],
        "IDness_detection": ["stats_unique"],
        "outlier_detection": ["stats_unique"],
        "correlation_matrix": ["stats_unique"],
        "nullColumns_detection": ["stats_unique", "stats_mode", "stats_missing"],
        "variable_clustering": ["stats_unique", "stats_mode"],
    }

    return {
        arg_name: {
            "file_path": (ends_with(path) + stats_file_by_arg[arg_name] + ".csv"),
            "file_type": "csv",
            "file_configs": {"header": True, "inferSchema": True},
        }
        for arg_name in args_by_func.get(func, [])
    }


def anovos_basic_report(
    spark,
    idf,
    id_col="",
    label_col="",
    event_label="",
    output_path=".",
    run_type="local",
    print_impact=True,
):
    """
    Compute descriptive statistics, quality checks and attribute associations for a
    dataframe, save every result as a CSV file and assemble them into
    "basic_report.html".

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    id_col
        ID column, passed as drop_cols to the association functions (Default value = "")
    label_col
        Label/Target column. IV_calculation and IG_calculation are only computed and
        reported when this is non-empty (Default value = "")
    event_label
        Value of (positive) event (i.e label 1) (Default value = "")
    output_path
        File Path for saving metrics and basic report (Default value = ".")
    run_type
        "local", "emr" or "databricks"
        "emr" if the files are read from or written in AWS s3
        "databricks" if the files are read from or written in dbfs in azure databricks (Default value = "local")
    print_impact
        True, False.
        This argument is to print out the data analyzer statistics. (Default value = True)

    Raises
    ------
    ValueError
        If run_type is not "local", "emr" or "databricks".

    """
    SG_funcs = [
        global_summary,
        measures_of_counts,
        measures_of_centralTendency,
        measures_of_cardinality,
        measures_of_dispersion,
        measures_of_percentiles,
        measures_of_shape,
    ]
    QC_rows_funcs = [duplicate_detection, nullRows_detection]
    QC_cols_funcs = [
        nullColumns_detection,
        outlier_detection,
        IDness_detection,
        biasedness_detection,
        invalidEntries_detection,
    ]
    AA_funcs = [correlation_matrix, variable_clustering]
    AT_funcs = [IV_calculation, IG_calculation]
    all_funcs = SG_funcs + QC_rows_funcs + QC_cols_funcs + AA_funcs + AT_funcs

    def output_to_local(output_path):
        # "scheme:/some/dir" -> "/scheme/some/dir". A path without a colon is
        # returned unchanged instead of failing on an unassigned variable.
        if ":" not in output_path:
            return output_path
        return "/" + output_path.replace(":", "")

    if run_type == "local":
        local_path = output_path
    elif run_type == "databricks":
        local_path = output_to_local(output_path)
    elif run_type == "emr":
        # Files are staged in a local directory and copied to output_path afterwards.
        local_path = "report_stats"
    else:
        raise ValueError("Invalid run_type")

    Path(local_path).mkdir(parents=True, exist_ok=True)

    def stats_csv(name):
        # Local CSV file holding the output of the function called `name`.
        return ends_with(local_path) + name + ".csv"

    def copy_to_output(local_file):
        # An argument list (no shell) keeps paths with spaces or shell
        # metacharacters from being re-interpreted.
        subprocess.check_output(
            ["aws", "s3", "cp", local_file, ends_with(output_path)]
        )

    for func in all_funcs:
        name = func.__name__
        if func in SG_funcs:
            stats = func(spark, idf)
        elif func in QC_rows_funcs or func in QC_cols_funcs:
            extra_args = stats_args(output_path, name)
            # Quality checkers return a pair; element 1 is the statistics dataframe.
            stats = func(spark, idf, **extra_args)[1]
        elif func in AA_funcs:
            extra_args = stats_args(output_path, name)
            stats = func(spark, idf, drop_cols=id_col, **extra_args)
        elif label_col:
            # Only AT_funcs remain at this point; they need a label column.
            stats = func(spark, idf, label_col=label_col, event_label=event_label)
        else:
            continue

        stats.toPandas().to_csv(stats_csv(name), index=False)

        if run_type == "emr":
            copy_to_output(stats_csv(name))

        if print_impact:
            print(name, ":\n")
            spark.read.csv(
                ends_with(output_path) + name + ".csv",
                header=True,
                inferSchema=True,
            ).show()

    def remove_u_score(col):
        # "measures_of_counts" -> "Measures Of Counts"; null* prefixes become "Null".
        words = []
        for part in col.split("_"):
            if part in ("nullColumns", "nullRows"):
                words.append("Null")
            else:
                # Slicing keeps an empty segment from raising IndexError.
                words.append(part[:1].upper() + part[1:])
        return " ".join(words)

    def read_stats(name):
        # Saved statistics of `name`, rounded to 3 decimals for display.
        return pd.read_csv(stats_csv(name)).round(3)

    def metric_values(df, metric):
        # Array of "value" entries for the rows whose "metric" column equals `metric`.
        return df[df["metric"] == metric].value.values

    def association_block(name, table, fig, autosize=True):
        # Apply the shared backgrounds and wrap a table and its figure in a labelled group.
        fig.layout.plot_bgcolor = global_plot_bg_color
        fig.layout.paper_bgcolor = global_paper_bg_color
        if autosize:
            fig.layout.autosize = True
        return dp.Group(
            dp.Text("##"),
            dp.DataTable(table),
            dp.Plot(fig),
            rows=3,
            label=remove_u_score(name),
        )

    # ---- Tab 1: descriptive statistics -------------------------------------
    global_summary_df = pd.read_csv(stats_csv("global_summary"))
    rows_count = int(metric_values(global_summary_df, "rows_count")[0])
    catcols_count = int(metric_values(global_summary_df, "catcols_count")[0])
    numcols_count = int(metric_values(global_summary_df, "numcols_count")[0])
    columns_count = int(metric_values(global_summary_df, "columns_count")[0])
    catcols_name = ",".join(list(metric_values(global_summary_df, "catcols_name")))
    numcols_name = ",".join(list(metric_values(global_summary_df, "numcols_name")))

    summary_group = dp.Group(
        dp.Text("# "),
        dp.Text("*This section summarizes the dataset with key statistical metrics.*"),
        dp.Text("# "),
        dp.Text("# "),
        dp.Text("### Global Summary"),
        dp.Group(
            dp.Text(f" Total Number of Records: **{rows_count:,d}**"),
            dp.Text(f" Total Number of Attributes: **{columns_count}**"),
            dp.Text(f" Number of Numerical Attributes : **{numcols_count}**"),
            dp.Text(f" Numerical Attributes Name : **{numcols_name}**"),
            dp.Text(f" Number of Categorical Attributes : **{catcols_count}**"),
            dp.Text(f" Categorical Attributes Name : **{catcols_name}**"),
            rows=6,
        ),
        rows=8,
    )

    metric_heading = dp.Text("### Statistics by Metric Type")

    SG_content = [
        dp.DataTable(read_stats(f.__name__), label=remove_u_score(f.__name__))
        for f in SG_funcs
        if f.__name__ != "global_summary"
    ]
    metric_tabs = dp.Group(
        dp.Select(blocks=SG_content, type=dp.SelectType.TABS), dp.Text("# ")
    )

    tab1 = dp.Group(
        summary_group,
        dp.Text("# "),
        metric_heading,
        metric_tabs,
        dp.Text("# "),
        dp.Text("# "),
        dp.Text("# "),
        label="Descriptive Statistics",
    )

    # ---- Tab 2: quality checks ---------------------------------------------
    QCcol_content = []
    for f in QC_cols_funcs:
        QCcol_content.extend(
            [
                dp.Text("### " + remove_u_score(f.__name__)),
                dp.DataTable(read_stats(f.__name__)),
                dp.Text("#"),
                dp.Text("#"),
            ]
        )

    QCrow_content = []
    for f in QC_rows_funcs:
        name = f.__name__
        if name == "duplicate_detection":
            stats = read_stats(name)
            # Index [0] yields a scalar; converting a one-element array directly
            # with int()/float() is deprecated in recent NumPy releases.
            unique_rows = int(metric_values(stats, "unique_rows_count")[0])
            total_rows = int(metric_values(stats, "rows_count")[0])
            duplicate_rows = int(metric_values(stats, "duplicate_rows")[0])
            duplicate_pct = float(metric_values(stats, "duplicate_pct")[0] * 100.0)
            body = dp.Group(
                dp.Text(f" No. of Rows: **{total_rows:,}**"),
                dp.Text(f" No. Of Unique Rows: **{unique_rows:,}**"),
                dp.Text(f" No. of Duplicate Rows: **{duplicate_rows:,}**"),
                dp.Text(f" Percentage of Duplicate Rows: **{duplicate_pct} %**"),
                rows=4,
            )
        else:
            body = dp.DataTable(read_stats(name))
        QCrow_content.extend(
            [
                dp.Text("### " + remove_u_score(name)),
                body,
                dp.Text("#"),
                dp.Text("#"),
            ]
        )

    tab2 = dp.Group(
        dp.Text("# "),
        dp.Text(
            "*This section identifies the data quality issues at both row and column level.*"
        ),
        dp.Text("# "),
        dp.Text("# "),
        dp.Select(
            blocks=[
                dp.Group(
                    dp.Text("# "),
                    dp.Group(*QCcol_content),
                    rows=2,
                    label="Column Level",
                ),
                dp.Group(
                    dp.Text("# "), dp.Group(*QCrow_content), rows=2, label="Row Level"
                ),
            ],
            type=dp.SelectType.TABS,
        ),
        dp.Text("# "),
        dp.Text("# "),
        label="Quality Check",
    )

    # ---- Tab 3: attribute associations -------------------------------------
    AA_content = []
    for f in AA_funcs + AT_funcs:
        name = f.__name__
        if name == "correlation_matrix":
            stats = read_stats(name)
            feats_order = list(stats["attribute"].values)
            fig = px.imshow(
                stats[feats_order],
                y=feats_order,
                color_continuous_scale=global_theme,
                aspect="auto",
            )
            AA_content.append(
                association_block(
                    name, stats[["attribute"] + feats_order], fig, autosize=False
                )
            )
        elif name == "variable_clustering":
            stats = read_stats(name).sort_values(by=["Cluster"], ascending=True)
            fig = px.sunburst(
                stats,
                path=["Cluster", "Attribute"],
                values="RS_Ratio",
                color_discrete_sequence=global_theme,
            )
            AA_content.append(association_block(name, stats, fig))
        elif label_col:
            # IV / IG results exist only when a label column was supplied.
            stats = read_stats(name)
            col_nm = [x for x in list(stats.columns) if "attribute" not in x]
            stats = stats.sort_values(col_nm[0], ascending=True)
            fig = px.bar(
                stats,
                x=col_nm[0],
                y="attribute",
                orientation="h",
                color_discrete_sequence=global_theme,
            )
            AA_content.append(association_block(name, stats, fig))

    # @TODO: is there better templating approach such as jinja
    tab3 = dp.Group(
        dp.Text("# "),
        dp.Text(
            """
            *This section analyzes the interaction between different attributes and/or the relationship
            between an attribute & the binary target variable.*
            """
        ),
        dp.Text("# "),
        dp.Text("# "),
        dp.Text("### Association Matrix & Plot"),
        dp.Select(blocks=AA_content, type=dp.SelectType.DROPDOWN),
        dp.Text("### "),
        dp.Text("## "),
        dp.Text("## "),
        dp.Text("## "),
        label="Attribute Associations",
    )

    report_file = ends_with(local_path) + "basic_report.html"
    dp.Report(
        default_template[0],
        default_template[1],
        dp.Select(blocks=[tab1, tab2, tab3], type=dp.SelectType.TABS),
    ).save(report_file, open=True)

    if run_type == "emr":
        copy_to_output(report_file)

Functions

def anovos_basic_report(spark, idf, id_col='', label_col='', event_label='', output_path='.', run_type='local', print_impact=True)

Parameters

spark
Spark Session
idf
Input Dataframe
id_col
ID column (Default value = "")
label_col
Label/Target column (Default value = "")
event_label
Value of (positive) event (i.e label 1) (Default value = "")
output_path
File Path for saving metrics and basic report (Default value = ".")
run_type
"local", "emr" or "databricks" "emr" if the files are read from or written in AWS s3 "databricks" if the files are read from or written in dbfs in azure databricks (Default value = "local")
print_impact
True or False. Whether to print out the data analyzer statistics. (Default value = True)
Expand source code
def anovos_basic_report(
    spark,
    idf,
    id_col="",
    label_col="",
    event_label="",
    output_path=".",
    run_type="local",
    print_impact=True,
):
    """
    Compute descriptive statistics, quality checks and attribute associations for a
    dataframe, save every result as a CSV file and assemble them into
    "basic_report.html".

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    id_col
        ID column, passed as drop_cols to the association functions (Default value = "")
    label_col
        Label/Target column. IV_calculation and IG_calculation are only computed and
        reported when this is non-empty (Default value = "")
    event_label
        Value of (positive) event (i.e label 1) (Default value = "")
    output_path
        File Path for saving metrics and basic report (Default value = ".")
    run_type
        "local", "emr" or "databricks"
        "emr" if the files are read from or written in AWS s3
        "databricks" if the files are read from or written in dbfs in azure databricks (Default value = "local")
    print_impact
        True, False.
        This argument is to print out the data analyzer statistics. (Default value = True)

    Raises
    ------
    ValueError
        If run_type is not "local", "emr" or "databricks".

    """
    SG_funcs = [
        global_summary,
        measures_of_counts,
        measures_of_centralTendency,
        measures_of_cardinality,
        measures_of_dispersion,
        measures_of_percentiles,
        measures_of_shape,
    ]
    QC_rows_funcs = [duplicate_detection, nullRows_detection]
    QC_cols_funcs = [
        nullColumns_detection,
        outlier_detection,
        IDness_detection,
        biasedness_detection,
        invalidEntries_detection,
    ]
    AA_funcs = [correlation_matrix, variable_clustering]
    AT_funcs = [IV_calculation, IG_calculation]
    all_funcs = SG_funcs + QC_rows_funcs + QC_cols_funcs + AA_funcs + AT_funcs

    def output_to_local(output_path):
        # "scheme:/some/dir" -> "/scheme/some/dir". A path without a colon is
        # returned unchanged instead of failing on an unassigned variable.
        if ":" not in output_path:
            return output_path
        return "/" + output_path.replace(":", "")

    if run_type == "local":
        local_path = output_path
    elif run_type == "databricks":
        local_path = output_to_local(output_path)
    elif run_type == "emr":
        # Files are staged in a local directory and copied to output_path afterwards.
        local_path = "report_stats"
    else:
        raise ValueError("Invalid run_type")

    Path(local_path).mkdir(parents=True, exist_ok=True)

    def stats_csv(name):
        # Local CSV file holding the output of the function called `name`.
        return ends_with(local_path) + name + ".csv"

    def copy_to_output(local_file):
        # An argument list (no shell) keeps paths with spaces or shell
        # metacharacters from being re-interpreted.
        subprocess.check_output(
            ["aws", "s3", "cp", local_file, ends_with(output_path)]
        )

    for func in all_funcs:
        name = func.__name__
        if func in SG_funcs:
            stats = func(spark, idf)
        elif func in QC_rows_funcs or func in QC_cols_funcs:
            extra_args = stats_args(output_path, name)
            # Quality checkers return a pair; element 1 is the statistics dataframe.
            stats = func(spark, idf, **extra_args)[1]
        elif func in AA_funcs:
            extra_args = stats_args(output_path, name)
            stats = func(spark, idf, drop_cols=id_col, **extra_args)
        elif label_col:
            # Only AT_funcs remain at this point; they need a label column.
            stats = func(spark, idf, label_col=label_col, event_label=event_label)
        else:
            continue

        stats.toPandas().to_csv(stats_csv(name), index=False)

        if run_type == "emr":
            copy_to_output(stats_csv(name))

        if print_impact:
            print(name, ":\n")
            spark.read.csv(
                ends_with(output_path) + name + ".csv",
                header=True,
                inferSchema=True,
            ).show()

    def remove_u_score(col):
        # "measures_of_counts" -> "Measures Of Counts"; null* prefixes become "Null".
        words = []
        for part in col.split("_"):
            if part in ("nullColumns", "nullRows"):
                words.append("Null")
            else:
                # Slicing keeps an empty segment from raising IndexError.
                words.append(part[:1].upper() + part[1:])
        return " ".join(words)

    def read_stats(name):
        # Saved statistics of `name`, rounded to 3 decimals for display.
        return pd.read_csv(stats_csv(name)).round(3)

    def metric_values(df, metric):
        # Array of "value" entries for the rows whose "metric" column equals `metric`.
        return df[df["metric"] == metric].value.values

    def association_block(name, table, fig, autosize=True):
        # Apply the shared backgrounds and wrap a table and its figure in a labelled group.
        fig.layout.plot_bgcolor = global_plot_bg_color
        fig.layout.paper_bgcolor = global_paper_bg_color
        if autosize:
            fig.layout.autosize = True
        return dp.Group(
            dp.Text("##"),
            dp.DataTable(table),
            dp.Plot(fig),
            rows=3,
            label=remove_u_score(name),
        )

    # ---- Tab 1: descriptive statistics -------------------------------------
    global_summary_df = pd.read_csv(stats_csv("global_summary"))
    rows_count = int(metric_values(global_summary_df, "rows_count")[0])
    catcols_count = int(metric_values(global_summary_df, "catcols_count")[0])
    numcols_count = int(metric_values(global_summary_df, "numcols_count")[0])
    columns_count = int(metric_values(global_summary_df, "columns_count")[0])
    catcols_name = ",".join(list(metric_values(global_summary_df, "catcols_name")))
    numcols_name = ",".join(list(metric_values(global_summary_df, "numcols_name")))

    summary_group = dp.Group(
        dp.Text("# "),
        dp.Text("*This section summarizes the dataset with key statistical metrics.*"),
        dp.Text("# "),
        dp.Text("# "),
        dp.Text("### Global Summary"),
        dp.Group(
            dp.Text(f" Total Number of Records: **{rows_count:,d}**"),
            dp.Text(f" Total Number of Attributes: **{columns_count}**"),
            dp.Text(f" Number of Numerical Attributes : **{numcols_count}**"),
            dp.Text(f" Numerical Attributes Name : **{numcols_name}**"),
            dp.Text(f" Number of Categorical Attributes : **{catcols_count}**"),
            dp.Text(f" Categorical Attributes Name : **{catcols_name}**"),
            rows=6,
        ),
        rows=8,
    )

    metric_heading = dp.Text("### Statistics by Metric Type")

    SG_content = [
        dp.DataTable(read_stats(f.__name__), label=remove_u_score(f.__name__))
        for f in SG_funcs
        if f.__name__ != "global_summary"
    ]
    metric_tabs = dp.Group(
        dp.Select(blocks=SG_content, type=dp.SelectType.TABS), dp.Text("# ")
    )

    tab1 = dp.Group(
        summary_group,
        dp.Text("# "),
        metric_heading,
        metric_tabs,
        dp.Text("# "),
        dp.Text("# "),
        dp.Text("# "),
        label="Descriptive Statistics",
    )

    # ---- Tab 2: quality checks ---------------------------------------------
    QCcol_content = []
    for f in QC_cols_funcs:
        QCcol_content.extend(
            [
                dp.Text("### " + remove_u_score(f.__name__)),
                dp.DataTable(read_stats(f.__name__)),
                dp.Text("#"),
                dp.Text("#"),
            ]
        )

    QCrow_content = []
    for f in QC_rows_funcs:
        name = f.__name__
        if name == "duplicate_detection":
            stats = read_stats(name)
            # Index [0] yields a scalar; converting a one-element array directly
            # with int()/float() is deprecated in recent NumPy releases.
            unique_rows = int(metric_values(stats, "unique_rows_count")[0])
            total_rows = int(metric_values(stats, "rows_count")[0])
            duplicate_rows = int(metric_values(stats, "duplicate_rows")[0])
            duplicate_pct = float(metric_values(stats, "duplicate_pct")[0] * 100.0)
            body = dp.Group(
                dp.Text(f" No. of Rows: **{total_rows:,}**"),
                dp.Text(f" No. Of Unique Rows: **{unique_rows:,}**"),
                dp.Text(f" No. of Duplicate Rows: **{duplicate_rows:,}**"),
                dp.Text(f" Percentage of Duplicate Rows: **{duplicate_pct} %**"),
                rows=4,
            )
        else:
            body = dp.DataTable(read_stats(name))
        QCrow_content.extend(
            [
                dp.Text("### " + remove_u_score(name)),
                body,
                dp.Text("#"),
                dp.Text("#"),
            ]
        )

    tab2 = dp.Group(
        dp.Text("# "),
        dp.Text(
            "*This section identifies the data quality issues at both row and column level.*"
        ),
        dp.Text("# "),
        dp.Text("# "),
        dp.Select(
            blocks=[
                dp.Group(
                    dp.Text("# "),
                    dp.Group(*QCcol_content),
                    rows=2,
                    label="Column Level",
                ),
                dp.Group(
                    dp.Text("# "), dp.Group(*QCrow_content), rows=2, label="Row Level"
                ),
            ],
            type=dp.SelectType.TABS,
        ),
        dp.Text("# "),
        dp.Text("# "),
        label="Quality Check",
    )

    # ---- Tab 3: attribute associations -------------------------------------
    AA_content = []
    for f in AA_funcs + AT_funcs:
        name = f.__name__
        if name == "correlation_matrix":
            stats = read_stats(name)
            feats_order = list(stats["attribute"].values)
            fig = px.imshow(
                stats[feats_order],
                y=feats_order,
                color_continuous_scale=global_theme,
                aspect="auto",
            )
            AA_content.append(
                association_block(
                    name, stats[["attribute"] + feats_order], fig, autosize=False
                )
            )
        elif name == "variable_clustering":
            stats = read_stats(name).sort_values(by=["Cluster"], ascending=True)
            fig = px.sunburst(
                stats,
                path=["Cluster", "Attribute"],
                values="RS_Ratio",
                color_discrete_sequence=global_theme,
            )
            AA_content.append(association_block(name, stats, fig))
        elif label_col:
            # IV / IG results exist only when a label column was supplied.
            stats = read_stats(name)
            col_nm = [x for x in list(stats.columns) if "attribute" not in x]
            stats = stats.sort_values(col_nm[0], ascending=True)
            fig = px.bar(
                stats,
                x=col_nm[0],
                y="attribute",
                orientation="h",
                color_discrete_sequence=global_theme,
            )
            AA_content.append(association_block(name, stats, fig))

    # @TODO: is there better templating approach such as jinja
    tab3 = dp.Group(
        dp.Text("# "),
        dp.Text(
            """
            *This section analyzes the interaction between different attributes and/or the relationship
            between an attribute & the binary target variable.*
            """
        ),
        dp.Text("# "),
        dp.Text("# "),
        dp.Text("### Association Matrix & Plot"),
        dp.Select(blocks=AA_content, type=dp.SelectType.DROPDOWN),
        dp.Text("### "),
        dp.Text("## "),
        dp.Text("## "),
        dp.Text("## "),
        label="Attribute Associations",
    )

    report_file = ends_with(local_path) + "basic_report.html"
    dp.Report(
        default_template[0],
        default_template[1],
        dp.Select(blocks=[tab1, tab2, tab3], type=dp.SelectType.TABS),
    ).save(report_file, open=True)

    if run_type == "emr":
        copy_to_output(report_file)
def stats_args(path, func)

Parameters

path
Path to pre-saved statistics
func
Name of the quality checker or association function (e.g. "outlier_detection") the arguments are built for

Returns

Dictionary
Each key/value is argument (related to pre-saved statistics) to be passed for the quality checker function.
Expand source code
def stats_args(path, func):
    """
    Build the keyword arguments that point a function at pre-saved statistics files.

    Parameters
    ----------
    path
        Path to pre-saved statistics
    func
        Name of the function (e.g. "outlier_detection") the arguments are built for


    Returns
    -------
    Dictionary
        Each key is a statistics argument name (e.g. "stats_unique") and each value is
        a file descriptor with "file_path", "file_type" and "file_configs" entries.
        Empty when the function name has no statistics arguments registered.

    """
    # Which statistics file (named after the generating function) backs each argument.
    stats_file_by_arg = {
        "stats_unique": "measures_of_cardinality",
        "stats_mode": "measures_of_centralTendency",
        "stats_missing": "measures_of_counts",
    }
    # Which statistics arguments each supported function accepts.
    args_by_func = {
        "biasedness_detection": ["stats_mode"],
        "IDness_detection": ["stats_unique"],
        "outlier_detection": ["stats_unique"],
        "correlation_matrix": ["stats_unique"],
        "nullColumns_detection": ["stats_unique", "stats_mode", "stats_missing"],
        "variable_clustering": ["stats_unique", "stats_mode"],
    }

    return {
        arg_name: {
            "file_path": (ends_with(path) + stats_file_by_arg[arg_name] + ".csv"),
            "file_type": "csv",
            "file_configs": {"header": True, "inferSchema": True},
        }
        for arg_name in args_by_func.get(func, [])
    }