report_preprocessing
Expand source code
import subprocess
import warnings
from pathlib import Path

import mlflow
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import pyspark
from loguru import logger
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window

from anovos.data_analyzer.stats_generator import uniqueCount_computation
from anovos.data_ingest.data_ingest import read_dataset
from anovos.data_transformer.transformers import (
    attribute_binning,
    imputation_MMM,
    outlier_categories,
)
from anovos.shared.utils import (
    attributeType_segregation,
    ends_with,
    output_to_local,
    path_ak8s_modify,
)

warnings.filterwarnings("ignore")

global_theme = px.colors.sequential.Plasma
global_theme_r = px.colors.sequential.Plasma_r
global_plot_bg_color = "rgba(0,0,0,0)"
global_paper_bg_color = "rgba(0,0,0,0)"

num_cols = []
cat_cols = []


def save_stats(
    spark,
    idf,
    master_path,
    function_name,
    reread=False,
    run_type="local",
    mlflow_config=None,
    auth_key="NA",
):
    """
    Parameters
    ----------
    spark
        Spark Session
    idf
        input dataframe
    master_path
        Path to master folder under which all statistics will be saved in a csv file format.
    function_name
        Function Name for which statistics need to be saved. file name will be saved as csv
    reread
        option to reread. Default value is kept as False
    run_type
        local or emr or databricks or ak8s based on the mode of execution. Default value is kept as local
    mlflow_config
        MLflow configuration. If None, all MLflow features are disabled.
    auth_key
        Option to pass an authorization key to write to filesystems. Currently applicable only for
        ak8s run_type. Default value is kept as "NA"

    Returns
    -------
    """
    if run_type == "local":
        local_path = master_path
    elif run_type == "databricks":
        local_path = output_to_local(master_path)
    elif run_type in ("emr", "ak8s"):
        local_path = "report_stats"
    else:
        raise ValueError("Invalid run_type")

    local_path = (
        local_path + "/" + mlflow_config["run_id"]
        if mlflow_config is not None and mlflow_config.get("track_reports", False)
        else local_path
    )

    Path(local_path).mkdir(parents=True, exist_ok=True)

    idf.toPandas().to_csv(ends_with(local_path) + function_name + ".csv", index=False)

    if mlflow_config is not None:
        mlflow.log_artifact(local_path)

    if run_type == "emr":
        bash_cmd = (
            "aws s3 cp "
            + ends_with(local_path)
            + function_name
            + ".csv "
            + ends_with(master_path)
        )
        subprocess.check_output(["bash", "-c", bash_cmd])

    if run_type == "ak8s":
        output_path_mod = path_ak8s_modify(master_path)
        bash_cmd = (
            'azcopy cp "'
            + ends_with(local_path)
            + function_name
            + '.csv" "'
            + ends_with(output_path_mod)
            + str(auth_key)
            + '"'
        )
        subprocess.check_output(["bash", "-c", bash_cmd])

    if reread:
        odf = spark.read.csv(
            ends_with(master_path) + function_name + ".csv",
            header=True,
            inferSchema=True,
        )
        return odf


def edit_binRange(col):
    """
    Parameters
    ----------
    col
        The column which is passed as input and needs to be treated. The generated output will not
        contain any range whose value at either side is the same.

    Returns
    -------
    """
    try:
        list_col = col.split("-")
        deduped_col = list(set(list_col))
        if len(list_col) != len(deduped_col):
            return deduped_col[0]
        else:
            return col
    except Exception as e:
        logger.error(f"processing failed during edit_binRange, error {e}")
        pass


f_edit_binRange = F.udf(edit_binRange, T.StringType())


def binRange_to_binIdx(spark, col, cutoffs_path):
    """
    Parameters
    ----------
    spark
        Spark Session
    col
        The input column which is needed to be mapped with respective index
    cutoffs_path
        path containing the range cutoffs applicable for each index

    Returns
    -------
    """
    bin_cutoffs = (
        spark.read.parquet(cutoffs_path)
        .where(F.col("attribute") == col)
        .select("parameters")
        .rdd.flatMap(lambda x: x)
        .collect()[0]
    )
    bin_ranges = []
    max_cat = len(bin_cutoffs) + 1
    for idx in range(0, max_cat):
        if idx == 0:
            bin_ranges.append("<= " + str(round(bin_cutoffs[idx], 4)))
        elif idx < (max_cat - 1):
            bin_ranges.append(
                str(round(bin_cutoffs[idx - 1], 4))
                + "-"
                + str(round(bin_cutoffs[idx], 4))
            )
        else:
            bin_ranges.append("> " + str(round(bin_cutoffs[idx - 1], 4)))
    mapping = spark.createDataFrame(
        zip(range(1, max_cat + 1), bin_ranges), schema=["bin_idx", col]
    )
    return mapping


def plot_frequency(spark, idf, col, cutoffs_path):
    """
    Parameters
    ----------
    spark
        Spark Session
    idf
        Input dataframe which would be referred for producing the frequency charts in form of
        bar plots / histograms
    col
        Analysis column
    cutoffs_path
        Path containing the range cut offs details for the analysis column

    Returns
    -------
    """
    odf = (
        idf.groupBy(col)
        .count()
        .withColumn(
            "count_%",
            100 * (F.col("count") / F.sum("count").over(Window.partitionBy())),
        )
        .withColumn(col, f_edit_binRange(col))
    )
    if col in cat_cols:
        odf_pd = odf.orderBy("count", ascending=False).toPandas().fillna("Missing")
        odf_pd.loc[odf_pd[col] == "others", col] = "others*"
    if col in num_cols:
        mapping = binRange_to_binIdx(spark, col, cutoffs_path)
        odf_pd = (
            odf.join(mapping, col, "left_outer")
            .orderBy("bin_idx")
            .toPandas()
            .fillna("Missing")
        )
    fig = px.bar(
        odf_pd,
        x=col,
        y="count",
        text=odf_pd["count_%"].apply(lambda x: "{0:1.2f}%".format(x)),
        color_discrete_sequence=global_theme,
    )
    fig.update_traces(textposition="outside")
    fig.update_layout(title_text=str("Frequency Distribution for " + str(col.upper())))
    fig.update_xaxes(type="category")
    # fig.update_layout(barmode='stack', xaxis={'categoryorder':'total descending'})
    fig.layout.plot_bgcolor = global_plot_bg_color
    fig.layout.paper_bgcolor = global_paper_bg_color
    # plotly.offline.plot(fig, auto_open=False, validate=False, filename=f"{base_loc}/{file_name_}bar_graph.html")
    return fig


def plot_outlier(spark, idf, col, split_var=None, sample_size=500000):
    """
    Parameters
    ----------
    spark
        Spark Session
    idf
        Input dataframe which would be referred for capturing the outliers in form of violin charts
    col
        Analysis column
    split_var
        Column which is needed. Default value is kept as None
    sample_size
        Maximum Sample size. Default value is kept as 500000

    Returns
    -------
    """
    idf_sample = idf.select(col).sample(
        False, min(1.0, float(sample_size) / idf.count()), 0
    )
    idf_sample.persist(pyspark.StorageLevel.MEMORY_AND_DISK).count()
    idf_imputed = imputation_MMM(spark, idf_sample)
    idf_pd = idf_imputed.toPandas()
    fig = px.violin(
        idf_pd,
        y=col,
        color=split_var,
        box=True,
        points="outliers",
        color_discrete_sequence=[global_theme_r[8], global_theme_r[4]],
    )
    fig.layout.plot_bgcolor = global_plot_bg_color
    fig.layout.paper_bgcolor = global_paper_bg_color
    fig.update_layout(
        legend=dict(orientation="h", x=0.5, yanchor="bottom", xanchor="center")
    )
    return fig


def plot_eventRate(spark, idf, col, label_col, event_label, cutoffs_path):
    """
    Parameters
    ----------
    spark
        Spark Session
    idf
        Input dataframe which would be referred for producing the frequency charts in form of
        bar plots / histogram
    col
        Analysis column
    label_col
        Label column
    event_label
        Event label
    cutoffs_path
        Path containing the range cut offs details for the analysis column

    Returns
    -------
    """
    odf = (
        idf.withColumn(
            label_col, F.when(F.col(label_col) == event_label, 1).otherwise(0)
        )
        .groupBy(col)
        .pivot(label_col)
        .count()
        .fillna(0, subset=["0", "1"])
        .withColumn("event_rate", 100 * (F.col("1") / (F.col("0") + F.col("1"))))
        .withColumn("attribute_name", F.lit(col))
        .withColumn(col, f_edit_binRange(col))
    )
    if col in cat_cols:
        odf_pd = odf.orderBy("event_rate", ascending=False).toPandas()
        odf_pd.loc[odf_pd[col] == "others", col] = "others*"
    if col in num_cols:
        mapping = binRange_to_binIdx(spark, col, cutoffs_path)
        odf_pd = odf.join(mapping, col, "left_outer").orderBy("bin_idx").toPandas()
    fig = px.bar(
        odf_pd,
        x=col,
        y="event_rate",
        text=odf_pd["event_rate"].apply(lambda x: "{0:1.2f}%".format(x)),
        color_discrete_sequence=global_theme,
    )
    fig.update_traces(textposition="outside")
    fig.update_layout(
        title_text=str(
            "Event Rate Distribution for "
            + str(col.upper())
            + str(" [Target Variable : " + str(event_label) + str("]"))
        )
    )
    fig.update_xaxes(type="category")
    fig.layout.plot_bgcolor = global_plot_bg_color
    fig.layout.paper_bgcolor = global_paper_bg_color
    # plotly.offline.plot(fig, auto_open=False, validate=False, filename=f"{base_loc}/{file_name_}feat_analysis_label.html")
    return fig


def plot_comparative_drift(spark, idf, source, col, cutoffs_path):
    """
    Parameters
    ----------
    spark
        Spark Session
    idf
        Target dataframe which would be referred for producing the frequency charts in form of
        bar plots / histogram
    source
        Source dataframe of comparison
    col
        Analysis column
    cutoffs_path
        Path containing the range cut offs details for the analysis column

    Returns
    -------
    """
    odf = (
        idf.groupBy(col)
        .agg((F.count(col) / idf.count()).alias("countpct_target"))
        .fillna(np.nan, subset=[col])
    )
    if col in cat_cols:
        odf_pd = (
            odf.join(
                source.withColumnRenamed("p", "countpct_source").fillna(
                    np.nan, subset=[col]
                ),
                col,
                "full_outer",
            )
            .orderBy("countpct_target", ascending=False)
            .toPandas()
        )
    if col in num_cols:
        mapping = binRange_to_binIdx(spark, col, cutoffs_path)
        odf_pd = (
            odf.join(mapping, col, "left_outer")
            .fillna(np.nan, subset=["bin_idx"])
            .join(
                source.fillna(np.nan, subset=[col]).select(
                    F.col(col).alias("bin_idx"), F.col("p").alias("countpct_source")
                ),
                "bin_idx",
                "full_outer",
            )
            .orderBy("bin_idx")
            .toPandas()
        )
    odf_pd.fillna(
        {col: "Missing", "countpct_source": 0, "countpct_target": 0}, inplace=True
    )
    odf_pd["%_diff"] = (
        (odf_pd["countpct_target"] / odf_pd["countpct_source"]) - 1
    ) * 100
    fig = go.Figure()
    fig.add_bar(
        y=list(odf_pd.countpct_source.values),
        x=odf_pd[col],
        name="source",
        marker=dict(color=global_theme),
    )
    fig.update_traces(overwrite=True, marker={"opacity": 0.7})
    fig.add_bar(
        y=list(odf_pd.countpct_target.values),
        x=odf_pd[col],
        name="target",
        text=odf_pd["%_diff"].apply(lambda x: "{0:0.2f}%".format(x)),
        marker=dict(color=global_theme),
    )
    fig.update_traces(textposition="outside")
    fig.update_layout(
        paper_bgcolor=global_paper_bg_color,
        plot_bgcolor=global_plot_bg_color,
        showlegend=False,
    )
    fig.update_layout(
        title_text=str(
            "Drift Comparison for " + col + "<br><sup>(L->R : Source->Target)</sup>"
        )
    )
    fig.update_traces(marker=dict(color=global_theme))
    fig.update_xaxes(type="category")
    # fig.add_trace(go.Scatter(x=odf_pd[col], y=odf_pd.countpct_target.values, mode='lines+markers',
    #               line=dict(color=px.colors.qualitative.Antique[10], width=3, dash='dot')))
    fig.update_layout(
        xaxis_tickfont_size=14,
        yaxis=dict(title="frequency", titlefont_size=16, tickfont_size=14),
    )
    return fig


def charts_to_objects(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    label_col=None,
    event_label=1,
    bin_method="equal_range",
    bin_size=10,
    coverage=1.0,
    drift_detector=False,
    outlier_charts=False,
    source_path="NA",
    master_path=".",
    stats_unique={},
    run_type="local",
    auth_key="NA",
):
    """
    Parameters
    ----------
    spark
        Spark Session
    idf
        Input dataframe
    list_of_cols
        List of columns passed for analysis (Default value = "all")
    drop_cols
        List of columns dropped from analysis (Default value = [])
    label_col
        Label column (Default value = None)
    event_label
        Event label (Default value = 1)
    bin_method
        Binning method equal_range or equal_frequency (Default value = "equal_range")
    bin_size
        Maximum bin size categories. Default value is kept as 10
    coverage
        Maximum coverage of categories. Default value is kept as 1.0 (which is 100%)
    drift_detector
        True or False as per the availability. Default value is kept as False
    source_path
        Source data path. Default value is kept as "NA" to save intermediate data in
        "intermediate_data/" folder.
    master_path
        Path where the output needs to be saved, ideally the same path where the analyzed data
        output is also saved (Default value = ".")
    stats_unique
        Takes arguments for read_dataset (data_ingest module) function in a dictionary format
        to read pre-saved statistics on unique value count i.e. if measures_of_cardinality or
        uniqueCount_computation (data_analyzer.stats_generator module) has been computed & saved
        before. (Default value = {})
    run_type
        local or emr or databricks or ak8s run type. Default value is kept as local
    auth_key
        Option to pass an authorization key to write to filesystems. Currently applicable only for
        ak8s run_type. Default value is kept as "NA"

    Returns
    -------
    """
    global num_cols
    global cat_cols

    if list_of_cols == "all":
        num_cols, cat_cols, other_cols = attributeType_segregation(idf)
        list_of_cols = num_cols + cat_cols
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]

    if stats_unique == {}:
        remove_cols = (
            uniqueCount_computation(spark, idf, list_of_cols)
            .where(F.col("unique_values") < 2)
            .select("attribute")
            .rdd.flatMap(lambda x: x)
            .collect()
        )
    else:
        remove_cols = (
            read_dataset(spark, **stats_unique)
            .where(F.col("unique_values") < 2)
            .select("attribute")
            .rdd.flatMap(lambda x: x)
            .collect()
        )

    list_of_cols = list(
        set([e for e in list_of_cols if e not in (drop_cols + remove_cols)])
    )

    if any(x not in idf.columns for x in list_of_cols) | (len(list_of_cols) == 0):
        raise TypeError("Invalid input for Column(s)")

    num_cols, cat_cols, other_cols = attributeType_segregation(idf.select(list_of_cols))

    if cat_cols:
        idf_cleaned = outlier_categories(
            spark, idf, list_of_cols=cat_cols, coverage=coverage, max_category=bin_size
        )
    else:
        idf_cleaned = idf

    if source_path == "NA":
        source_path = "intermediate_data"

    if drift_detector:
        encoding_model_exists = True
        binned_cols = (
            spark.read.parquet(source_path + "/drift_statistics/attribute_binning")
            .select("attribute")
            .rdd.flatMap(lambda x: x)
            .collect()
        )
        to_be_binned = [e for e in num_cols if e not in binned_cols]
    else:
        encoding_model_exists = False
        binned_cols = []
        to_be_binned = num_cols

    if to_be_binned:
        idf_encoded = attribute_binning(
            spark,
            idf_cleaned,
            list_of_cols=to_be_binned,
            method_type=bin_method,
            bin_size=bin_size,
            bin_dtype="categorical",
            pre_existing_model=False,
            model_path=source_path + "/charts_to_objects",
            output_mode="append",
        )
    else:
        idf_encoded = idf_cleaned

    if binned_cols:
        idf_encoded = attribute_binning(
            spark,
            idf_encoded,
            list_of_cols=binned_cols,
            method_type=bin_method,
            bin_size=bin_size,
            bin_dtype="categorical",
            pre_existing_model=True,
            model_path=source_path + "/drift_statistics",
            output_mode="append",
        )

    cutoffs_path1 = source_path + "/charts_to_objects/attribute_binning"
    cutoffs_path2 = source_path + "/drift_statistics/attribute_binning"

    idf_encoded.persist(pyspark.StorageLevel.MEMORY_AND_DISK)

    if run_type == "local":
        local_path = master_path
    elif run_type == "databricks":
        local_path = output_to_local(master_path)
    elif run_type in ("emr", "ak8s"):
        local_path = "report_stats"
    else:
        raise ValueError("Invalid run_type")

    Path(local_path).mkdir(parents=True, exist_ok=True)

    for idx, col in enumerate(list_of_cols):
        if col in binned_cols:
            cutoffs_path = cutoffs_path2
        else:
            cutoffs_path = cutoffs_path1

        if col in cat_cols:
            f = plot_frequency(spark, idf_encoded, col, cutoffs_path)
            f.write_json(ends_with(local_path) + "freqDist_" + col)

            if label_col:
                if col != label_col:
                    f = plot_eventRate(
                        spark, idf_encoded, col, label_col, event_label, cutoffs_path
                    )
                    f.write_json(ends_with(local_path) + "eventDist_" + col)

            if drift_detector:
                try:
                    frequency_path = (
                        source_path + "/drift_statistics/frequency_counts/" + col
                    )
                    idf_source = spark.read.csv(
                        frequency_path, header=True, inferSchema=True
                    )
                    f = plot_comparative_drift(
                        spark, idf_encoded, idf_source, col, cutoffs_path
                    )
                    f.write_json(ends_with(local_path) + "drift_" + col)
                except Exception as e:
                    logger.error(f"processing failed during drift detection, error {e}")
                    pass

        if col in num_cols:
            if outlier_charts:
                f = plot_outlier(spark, idf, col, split_var=None)
                f.write_json(ends_with(local_path) + "outlier_" + col)

            f = plot_frequency(
                spark,
                idf_encoded.drop(col).withColumnRenamed(col + "_binned", col),
                col,
                cutoffs_path,
            )
            f.write_json(ends_with(local_path) + "freqDist_" + col)

            if label_col:
                if col != label_col:
                    f = plot_eventRate(
                        spark,
                        idf_encoded.drop(col).withColumnRenamed(col + "_binned", col),
                        col,
                        label_col,
                        event_label,
                        cutoffs_path,
                    )
                    f.write_json(ends_with(local_path) + "eventDist_" + col)

            if drift_detector:
                try:
                    frequency_path = (
                        source_path + "/drift_statistics/frequency_counts/" + col
                    )
                    idf_source = spark.read.csv(
                        frequency_path, header=True, inferSchema=True
                    )
                    f = plot_comparative_drift(
                        spark,
                        idf_encoded.drop(col).withColumnRenamed(col + "_binned", col),
                        idf_source,
                        col,
                        cutoffs_path,
                    )
                    f.write_json(ends_with(local_path) + "drift_" + col)
                except Exception as e:
                    logger.error(f"processing failed during drift detection, error {e}")
                    pass

    pd.DataFrame(idf.dtypes, columns=["attribute", "data_type"]).to_csv(
        ends_with(local_path) + "data_type.csv", index=False
    )

    if run_type == "emr":
        bash_cmd = (
            "aws s3 cp --recursive "
            + ends_with(local_path)
            + " "
            + ends_with(master_path)
        )
        subprocess.check_output(["bash", "-c", bash_cmd])

    if run_type == "ak8s":
        output_path_mod = path_ak8s_modify(master_path)
        bash_cmd = (
            'azcopy cp "'
            + ends_with(local_path)
            + '" "'
            + ends_with(output_path_mod)
            + str(auth_key)
            + '" --recursive=true'
        )
        subprocess.check_output(["bash", "-c", bash_cmd])
Functions
def binRange_to_binIdx(spark, col, cutoffs_path)
- Maps the bin ranges of a binned numerical attribute to ordered bin indices, using the binning cutoffs saved at cutoffs_path.
Parameters
spark
- Spark Session
col
- The input column which needs to be mapped to its respective bin index
cutoffs_path
- Path containing the range cutoffs applicable for each bin index
Returns
- A Spark DataFrame with two columns, bin_idx and the analysis column, mapping each bin index to its bin-range label.
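A minimal usage sketch, assuming binning cutoffs were previously persisted by attribute_binning; the path and column name below are illustrative:

# Hypothetical path and column; point cutoffs_path at your own saved binning model.
mapping = binRange_to_binIdx(
    spark, "age", "intermediate_data/charts_to_objects/attribute_binning"
)
mapping.show()  # bin_idx 1..n paired with labels such as "<= 25.0", "25.0-40.0", "> 60.0"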
def charts_to_objects(spark, idf, list_of_cols='all', drop_cols=[], label_col=None, event_label=1, bin_method='equal_range', bin_size=10, coverage=1.0, drift_detector=False, outlier_charts=False, source_path='NA', master_path='.', stats_unique={}, run_type='local', auth_key='NA')
- Generates Plotly chart objects (frequency, event-rate, outlier and drift charts, serialized as JSON) and a data-type summary for the selected attributes, and saves them under the output path for downstream report generation.
Parameters
spark
- Spark Session
idf
- Input dataframe
list_of_cols
- List of columns passed for analysis (Default value = "all")
drop_cols
- List of columns dropped from analysis (Default value = [])
label_col
- Label column (Default value = None)
event_label
- Event label (Default value = 1)
bin_method
- Binning method equal_range or equal_frequency (Default value = "equal_range")
bin_size
- Number of bins used when binning numerical attributes; also passed as the maximum category count when treating categorical attributes. Default value is kept as 10
coverage
- Maximum coverage of categories. Default value is kept as 1.0 (which is 100%)
drift_detector
- True or False, depending on whether pre-computed drift statistics (binning model and frequency counts) are available under source_path. Default value is kept as False
outlier_charts
- True or False, whether outlier (violin) charts should be generated for numerical columns. Default value is kept as False
source_path
- Source data path. Default value is kept as "NA" to save intermediate data in "intermediate_data/" folder.
master_path
- Path where the output needs to be saved, ideally the same path where the analyzed data output is also saved (Default value = ".")
stats_unique
- Takes arguments for the read_dataset function (data_ingest module) in dictionary format, to read pre-saved statistics on unique value counts, i.e. if measures_of_cardinality or uniqueCount_computation (data_analyzer.stats_generator module) has been computed and saved before. (Default value = {})
run_type
- local or emr or databricks or ak8s run type. Default value is kept as local
auth_key
- Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type. Default value is kept as "NA"
Returns
- None. Chart objects are written as JSON files (freqDist_*, eventDist_*, outlier_*, drift_*) together with data_type.csv under the output folder.
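A minimal local-run sketch; the dataset path, label column and event label are illustrative and should be replaced with your own:

# Hypothetical input data and label column.
df = spark.read.csv("data/income_dataset", header=True, inferSchema=True)

charts_to_objects(
    spark,
    df,
    list_of_cols="all",
    label_col="income",
    event_label=">50K",
    bin_method="equal_range",
    bin_size=10,
    outlier_charts=True,
    master_path="report_stats",  # freqDist_*, eventDist_*, outlier_* JSON objects and data_type.csv land here
    run_type="local",
)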
def edit_binRange(col)
- Treats a bin-range string so that the output does not contain a range whose value is the same on both sides (e.g. a degenerate range such as "5-5" is collapsed to "5").
Parameters
col
- The bin-range value passed as input to be treated. The generated output will not contain any range whose value is the same on both sides.
Returns
- The single deduplicated value if both sides of the range are identical, otherwise the original value.
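The plain-Python behaviour of this helper, with illustrative values:

edit_binRange("3.5-7.2")  # -> "3.5-7.2" (both sides differ, range kept as-is)
edit_binRange("5-5")      # -> "5" (degenerate range collapsed to a single value)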
def f_edit_binRange(col)
- PySpark UDF (StringType) wrapper around edit_binRange, for use inside DataFrame expressions.
Parameters
col
- The bin-range column passed as input to be treated. The generated output will not contain any range whose value is the same on both sides.
Returns
- A Column expression holding the treated bin-range values (StringType).
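An illustrative use of the UDF inside a DataFrame transformation; idf and the column name age_binned are hypothetical:

# Collapse degenerate bin ranges (e.g. "5-5" -> "5") in a hypothetical binned column.
odf = idf.withColumn("age_binned", f_edit_binRange("age_binned"))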
def plot_comparative_drift(spark, idf, source, col, cutoffs_path)
- Builds a source-vs-target frequency comparison (drift) chart for an attribute as overlaid bar traces.
Parameters
spark
- Spark Session
idf
- Target dataframe used to produce the frequency comparison chart (bar plot / histogram)
source
- Source dataframe used for comparison, carrying pre-saved frequency counts for the attribute (with the frequency percentage in a column named "p")
col
- Analysis column
cutoffs_path
- Path containing the range cut offs details for the analysis column
Returns
- Plotly figure object with the source and target frequency distributions overlaid.
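A usage sketch under the assumption that drift statistics (binning cutoffs and per-attribute frequency counts, including a frequency-percentage column "p") were saved by an earlier drift-detector run; note that the plotting helpers consult the module-level num_cols / cat_cols lists, which charts_to_objects populates before calling them. All paths and names below are illustrative:

# Hypothetical paths; idf_target_binned is the binned target dataframe for the attribute.
idf_source = spark.read.csv(
    "intermediate_data/drift_statistics/frequency_counts/age",
    header=True,
    inferSchema=True,
)
fig = plot_comparative_drift(
    spark,
    idf_target_binned,
    idf_source,
    "age",
    "intermediate_data/drift_statistics/attribute_binning",
)
fig.write_json("report_stats/drift_age")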
def plot_eventRate(spark, idf, col, label_col, event_label, cutoffs_path)
- Plots the event rate (percentage of records carrying the event label) per category or bin of an attribute.
Parameters
spark
- Spark Session
idf
- Input dataframe used to produce the event-rate chart (bar plot / histogram)
col
- Analysis column
label_col
- Label column
event_label
- Event label
cutoffs_path
- Path containing the range cut offs details for the analysis column
Returns
- Plotly bar chart figure of the event rate per category/bin of the analysis column.
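An illustrative call for a hypothetical binned column "age" against a binary target "income" with event label ">50K":

fig = plot_eventRate(
    spark,
    idf_encoded,  # hypothetical dataframe in which "age" already holds bin-range values
    "age",
    "income",
    ">50K",
    "intermediate_data/charts_to_objects/attribute_binning",
)
fig.write_json("report_stats/eventDist_age")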
def plot_frequency(spark, idf, col, cutoffs_path)
- Plots the frequency distribution (count and count %) per category or bin of an attribute.
Parameters
spark
- Spark Session
idf
- Input dataframe used to produce the frequency chart (bar plot / histogram)
col
- Analysis column
cutoffs_path
- Path containing the range cut offs details for the analysis column
Returns
- Plotly bar chart figure of counts per category/bin, annotated with count percentages.
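An illustrative call for a hypothetical categorical column "education"; cutoffs_path is only consulted for numerical (binned) columns:

fig = plot_frequency(
    spark,
    idf_encoded,  # hypothetical dataframe
    "education",
    "intermediate_data/charts_to_objects/attribute_binning",
)
fig.write_json("report_stats/freqDist_education")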
def plot_outlier(spark, idf, col, split_var=None, sample_size=500000)
- Draws a violin chart of an attribute on a capped random sample (after MMM imputation) to visualize outliers.
Parameters
spark
- Spark Session
idf
- Input dataframe used to capture outliers in the form of a violin chart
col
- Analysis column
split_var
- Column used to split (color) the violin chart. Default value is kept as None
sample_size
- Maximum Sample size. Default value is kept as 500000
Returns
- Plotly violin chart figure for the analysis column.
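An illustrative call for a hypothetical numerical column, drawing the violin chart on a capped random sample:

fig = plot_outlier(spark, idf, "capital-gain", split_var=None, sample_size=100000)
fig.write_json("report_stats/outlier_capital-gain")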
def save_stats(spark, idf, master_path, function_name, reread=False, run_type='local', mlflow_config=None, auth_key='NA')
- Saves a statistics dataframe as a CSV under the output path (copying to S3/Azure for emr/ak8s run types and optionally logging it as an MLflow artifact), and can re-read it back as a Spark DataFrame.
Parameters
spark
- Spark Session
idf
- input dataframe
master_path
- Path to master folder under which all statistics will be saved in a csv file format.
function_name
- Function name for which statistics need to be saved; the output file is saved as <function_name>.csv
reread
- Option to re-read the saved CSV and return it as a Spark DataFrame. Default value is kept as False
run_type
- local or emr or databricks or ak8s based on the mode of execution. Default value is kept as local
mlflow_config
- MLflow configuration. If None, all MLflow features are disabled.
auth_key
- Option to pass an authorization key to write to filesystems. Currently applicable only for ak8s run_type. Default value is kept as "NA"
Returns
- A Spark DataFrame re-read from the saved CSV if reread is True; otherwise None.
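A minimal sketch of persisting a statistics dataframe and re-reading it, assuming a local run; the column names are illustrative:

# Hypothetical columns; uniqueCount_computation is imported from anovos.data_analyzer.stats_generator.
stats_df = uniqueCount_computation(spark, idf, ["age", "education"])
odf = save_stats(
    spark,
    stats_df,
    master_path="report_stats",
    function_name="uniqueCount_computation",
    reread=True,
    run_type="local",
)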