# drift_detector

Expand source code
import sys
import operator
import functools
import pyspark
from loguru import logger
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window

from anovos.data_ingest.data_sampling import data_sample
from anovos.data_transformer.transformers import attribute_binning
from anovos.shared.utils import attributeType_segregation
from .validations import check_distance_method, check_list_of_columns

@check_distance_method
@check_list_of_columns
def statistics(
    spark,
    idf_target,
    idf_source,
    list_of_cols="all",
    drop_cols=None,
    method_type="PSI",
    bin_method="equal_range",
    bin_size=10,
    threshold=0.1,
    use_sampling=True,
    sample_method="random",
    strata_cols="all",
    stratified_type="population",
    sample_size=100000,
    sample_seed=42,
    persist=True,
    persist_option=pyspark.StorageLevel.MEMORY_AND_DISK,
    pre_existing_source=False,
    source_save=True,
    source_path="NA",
    model_directory="drift_statistics",
    print_impact=False,
):
    """
    Measure the drift of each attribute from a source (baseline) dataset to a target dataset.

    When the performance of a deployed machine learning model degrades in production, one potential reason is that
    the data used in training and prediction do not follow the same distribution.

    Data drift mainly includes the following manifestations:

    - Covariate shift: training and test data follow different distributions. For example, an algorithm predicting
      income that is trained on a younger population but tested on an older population.
    - Prior probability shift: change of prior probability. For example, in a spam classification problem,
      the proportion of spam emails changes from 0.2 in training data to 0.6 in testing data.
    - Concept shift: the distribution of the target variable changes given fixed input values. For example, in
      the same spam classification problem, emails tagged as spam in training data are more likely to be tagged
      as non-spam in testing data.

    This function focuses on covariate shift detection.

    Given a source and a target dataset, the drift of each attribute is quantified in two steps: (1) convert each
    attribute of interest in the source and target datasets into source and target probability distributions;
    (2) calculate the statistical distance between the source and target distributions of each attribute.

    In the first step, attribute_binning bins the numerical attributes of the source dataset, which requires two
    input variables: bin_method and bin_size. The binning model saved for the source is then applied to the target
    dataset so that both datasets share the same bins. Attributes of list_of_cols that are not numerical are not
    passed to attribute_binning and are grouped by their values. The probability of a bin is its count of non-null
    values divided by the total number of rows. A bin that is missing on one side, or whose probability is 0,
    gets a floor probability of 0.0001 so that the logarithms stay finite.

    In the second step, four statistical metrics are available to measure the drift of an attribute from the
    source to the target distribution: Population Stability Index (PSI), Jensen-Shannon Divergence (JSD),
    Hellinger Distance (HD) and Kolmogorov-Smirnov Distance (KS).

    For two discrete probability distributions *P=(p_1,…,p_k)* and *Q=(q_1,…,q_k)*, they are calculated as:

    ![https://raw.githubusercontent.com/anovos/anovos-docs/main/docs/assets/drift_stats_formulae.png](https://raw.githubusercontent.com/anovos/anovos-docs/main/docs/assets/drift_stats_formulae.png)

    - PSI = sum_i (p_i - q_i) * ln(p_i / q_i)
    - JSD = (sum_i p_i * ln(p_i / m_i) + sum_i q_i * ln(q_i / m_i)) / 2, where m_i = (p_i + q_i) / 2
    - HD = sqrt(sum_i (sqrt(p_i) - sqrt(q_i))^2 / 2)
    - KS = max_j |sum_{i<=j} p_i - sum_{i<=j} q_i|, with the bins taken in ascending order of their value

    A threshold can be set to flag drifted attributes. If multiple statistical metrics are calculated,
    an attribute is flagged as drifted if any of its metrics is larger than the threshold.

    This function can be used in many scenarios. For example:

    1. Attribute-level data drift can be analysed together with the attribute importance of a machine learning
    model. The more important an attribute is, the more attention it needs if drift is present.
    2. To analyse data drift over time, one can treat one dataset as the source / baseline dataset and multiple
    datasets as the target datasets. Drift analysis can be performed between the source dataset and each target
    dataset to quantify the drift over time.

    Parameters
    ----------
    spark
        Spark Session
    idf_target
        Input Dataframe
    idf_source
        Baseline/Source Dataframe. It is not read when pre_existing_source is True (it may then be None).
    list_of_cols
        List of columns to check drift e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all (non-array) columns for analysis.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2". (Default value = None)
    method_type
        "PSI", "JSD", "HD", "KS","all".
        "all" can be passed to calculate all drift metrics.
        One or more methods can be passed in a form of list or string where different metrics are separated
        by pipe delimiter “|” e.g. ["PSI", "JSD"] or "PSI|JSD". (Default value = "PSI")
    bin_method
        String argument - "equal_frequency" or "equal_range".
        In "equal_range" method, each bin is of equal size/width and in "equal_frequency", each bin
        has equal no. of rows, though the width of bins may vary. (Default value = "equal_range")
    bin_size
        Integer argument - Number of bins for creating histogram. (Default value = 10)
    threshold
        Float argument - A column is flagged if any drift metric is above the threshold. (Default value = 0.1)
    use_sampling
        Boolean argument - True or False. Whether to sample the source and target datasets down to about
        sample_size rows when they are larger than that. It is recommended to set this as True for large
        datasets. (Default value = True)
    sample_method
        String argument - "random" or "stratified".
        If use_sampling is True, this argument is used to determine the sampling method.
        "stratified" for Stratified sampling, "random" for Random Sampling.
        For more details, please refer to https://docs.anovos.ai/api/data_ingest/data_sampling.html.
        (Default value = "random")
    strata_cols
        If use_sampling is True and sample_method is "stratified", this argument is used to determine the list
        of columns used to be treated as strata. For more details, please refer to
        https://docs.anovos.ai/api/data_ingest/data_sampling.html. (Default value = "all")
    stratified_type
        String argument - "population" or "balanced". If use_sampling is True and sample_method is "stratified",
        this string argument is used to determine the stratified sampling method. "population" stands for
        Proportionate Stratified Sampling, "balanced" stands for Optimum Stratified Sampling.
        For more details, please refer to
        https://docs.anovos.ai/api/data_ingest/data_sampling.html. (Default value = "population")
    sample_size
        Integer argument - If use_sampling is True, this argument is used to determine the sample size of sampling method.
        (Default value = 100000)
    sample_seed
        Integer argument - If use_sampling is True, this argument is used to determine the seed of sampling method.
        (Default value = 42)
    persist
        Boolean argument - True or False. Whether to persist the sampled source/target datasets and their
        binned versions while the statistics are computed. Only DataFrames persisted by this function are
        unpersisted before it returns. It is recommended to set this as True for large datasets. (Default value = True)
    persist_option
        If persist is True, this argument is used to determine the type of persist.
        (Default value = pyspark.StorageLevel.MEMORY_AND_DISK)
    pre_existing_source
        Boolean argument – True or False. True if the drift_statistics folder (binning model &
        frequency counts for each attribute) exists already, False otherwise. (Default value = False)
    source_save
        Boolean argument - True or False. Whether to save the frequency counts of the source dataset under
        source_path/model_directory/frequency_counts when pre_existing_source is False. (Default value = True)
    source_path
        If pre_existing_source is False, this argument can be used for saving the drift_statistics folder.
        The drift_statistics folder will have attribute_binning (binning model) & frequency_counts sub-folders.
        If pre_existing_source is True, this argument is path for referring the drift_statistics folder.
        Default "NA" for temporarily saving data in "intermediate_data/" folder. (Default value = "NA")
    model_directory
        Name of the sub-directory of source_path that holds the drift statistics (the attribute_binning model
        and the frequency_counts sub-folder). If pre_existing_source is False, the source statistics are saved
        there; if pre_existing_source is True, they are read from there. (Default value = "drift_statistics")
    print_impact
        Boolean argument - True or False. This argument is to print out the drift statistics of all attributes
        and attributes meeting the threshold. (Default value = False)

    Returns
    -------
    DataFrame
        [attribute, *metric, flagged]
        One row per attribute and one column per selected metric, in the order PSI, HD, JSD, KS.
        flagged is 1 when any metric is above threshold, else 0.

    Raises
    ------
    ValueError
        If list_of_cols is empty or method_type selects none of PSI, JSD, HD, KS.
    """
    # drop_cols is not read in this body; it is presumably applied to
    # list_of_cols by @check_list_of_columns. NOTE(review): confirm in .validations.
    if not list_of_cols:
        raise ValueError("list_of_cols must contain at least one column")

    # Metric columns are always emitted in this fixed order.
    methods = [name for name in ("PSI", "HD", "JSD", "KS") if name in method_type]
    if not methods:
        raise ValueError("method_type must select at least one of PSI, JSD, HD or KS")

    num_cols = attributeType_segregation(idf_target.select(list_of_cols))[0]

    # DataFrames persisted by this call. Only these are unpersisted at the end,
    # so DataFrames the caller cached themselves stay cached.
    persisted = []

    def _persist(df):
        """Persist ``df`` with ``persist_option`` when ``persist`` is set."""
        if persist:
            df = df.persist(persist_option)
            persisted.append(df)
        return df

    def _sample_if_large(idf):
        """Return ``(df, row_count)``; ``idf`` is sampled down to about
        ``sample_size`` rows when sampling is enabled and it is larger."""
        count = idf.count()
        if not use_sampling or count <= sample_size:
            return idf, count
        sampled = _persist(
            data_sample(
                idf,
                strata_cols=strata_cols,
                fraction=sample_size / count,
                method_type=sample_method,
                stratified_type=stratified_type,
                seed_value=sample_seed,
            )
        )
        return sampled, sampled.count()

    idf_target, count_target = _sample_if_large(idf_target)

    if source_path == "NA":
        source_path = "intermediate_data"
    model_path = source_path + "/" + model_directory

    if not pre_existing_source:
        # idf_source is only needed when the source statistics are built here.
        idf_source, count_source = _sample_if_large(idf_source)
        source_bin = _persist(
            attribute_binning(
                spark,
                idf_source,
                list_of_cols=num_cols,
                method_type=bin_method,
                bin_size=bin_size,
                pre_existing_model=False,
                model_path=model_path,
            )
        )

    # The target reuses the binning model saved under model_path for the source.
    target_bin = _persist(
        attribute_binning(
            spark,
            idf_target,
            list_of_cols=num_cols,
            method_type=bin_method,
            bin_size=bin_size,
            pre_existing_model=True,
            model_path=model_path,
        )
    )

    p, q = F.col("p"), F.col("q")
    metric_frames = []
    for i in list_of_cols:
        freq_path = model_path + "/frequency_counts/" + i
        if pre_existing_source:
            # Source distribution saved by an earlier run, with columns (i, p).
            x = spark.read.csv(freq_path, header=True, inferSchema=True)
        else:
            # F.count(i) skips nulls, so the null group gets p = 0 here;
            # fillna(-1) labels a null numeric bin as -1.
            x = (
                source_bin.groupBy(i)
                .agg((F.count(i) / count_source).alias("p"))
                .fillna(-1)
            )
            if source_save:
                # Keep the header so the file can be read back by column name.
                x.coalesce(1).write.csv(freq_path, header=True, mode="overwrite")

        y = (
            target_bin.groupBy(i)
            .agg((F.count(i) / count_target).alias("q"))
            .fillna(-1)
        )

        # Bins seen on only one side, or with zero probability, get a floor of
        # 0.0001 so that the logarithms below stay finite.
        xy = (
            x.join(y, i, "full_outer")
            .fillna(0.0001, subset=["p", "q"])
            .replace(0, 0.0001, subset=["p", "q"])
        )

        aggregations = []
        if "PSI" in methods:
            aggregations.append(F.sum((p - q) * F.log(p / q)).alias("PSI"))
        if "HD" in methods:
            aggregations.append(
                F.sqrt(F.sum(F.pow(F.sqrt(p) - F.sqrt(q), 2)) / 2).alias("HD")
            )
        if "JSD" in methods:
            m = (p + q) / 2
            aggregations.append(
                (
                    (F.sum(p * F.log(p / m)) + F.sum(q * F.log(q / m))) / 2
                ).alias("JSD")
            )
        if "KS" in methods:
            # Cumulative sums must follow the bin order; a window without an
            # orderBy gives no ordering guarantee after its shuffle.
            cumulative = Window.orderBy(i).rowsBetween(
                Window.unboundedPreceding, Window.currentRow
            )
            xy = xy.withColumn("cum_sum_p", F.sum(p).over(cumulative)).withColumn(
                "cum_sum_q", F.sum(q).over(cumulative)
            )
            aggregations.append(
                F.max(F.abs(F.col("cum_sum_p") - F.col("cum_sum_q"))).alias("KS")
            )

        # One aggregation yields all selected metrics as a single row.
        metrics = xy.agg(*aggregations)
        metric_frames.append(
            metrics.select(F.lit(str(i)).alias("attribute"), *metrics.columns)
        )

    # Every per-attribute frame has the same column order, so a positional
    # union is safe.
    odf_union = functools.reduce(lambda left, right: left.union(right), metric_frames)
    cond_expr = functools.reduce(
        operator.or_, [F.col(c) > threshold for c in odf_union.columns[1:]]
    )
    odf = odf_union.withColumn("flagged", F.when(cond_expr, 1).otherwise(0))

    if print_impact:
        logger.info("All Attributes:")
        odf.show(len(list_of_cols))
        logger.info("Attributes meeting Data Drift threshold:")
        drift = odf.where(F.col("flagged") == 1)
        drift.show(drift.count())

    for df in persisted:
        df.unpersist()
    return odf



## Functions

 def statistics(spark, idf_target, idf_source, list_of_cols='all', drop_cols=None, method_type='PSI', bin_method='equal_range', bin_size=10, threshold=0.1, use_sampling=True, sample_method='random', strata_cols='all', stratified_type='population', sample_size=100000, sample_seed=42, persist=True, persist_option=StorageLevel(True, True, False, False, 1), pre_existing_source=False, source_save=True, source_path='NA', model_directory='drift_statistics', print_impact=False) 

When the performance of a deployed machine learning model degrades in production, one potential reason is that the data used in training and prediction are not following the same distribution.

Data drift mainly includes the following manifestations:

• Covariate shift: training and test data follow different distributions. For example, an algorithm predicting income that is trained on a younger population but tested on an older population.
• Prior probability shift: change of prior probability. For example, in a spam classification problem, the proportion of spam emails changes from 0.2 in training data to 0.6 in testing data.
• Concept shift: the distribution of the target variable changes given fixed input values. For example, in the same spam classification problem, emails tagged as spam in training data are more likely to be tagged as non-spam in testing data.

In our module, we mainly focus on covariate shift detection.

In summary, given 2 datasets, source and target datasets, we would like to quantify the drift of some numerical attributes from source to target datasets. The whole process can be broken down into 2 steps: (1) convert each attribute of interest in source and target datasets into source and target probability distributions. (2) calculate the statistical distance between source and target distributions for each attribute.

In the first step, attribute_binning is performed to bin the numerical attributes of the source dataset, which requires two input variables: bin_method and bin_size. The same binning model is then applied to the target dataset so that the two results are aligned. The probability distributions are computed by dividing the frequency of each bin by the total frequency.

In the second step, 4 choices of statistical metrics are provided to measure the data drift of an attribute from source to target distribution: Population Stability Index (PSI), Jensen-Shannon Divergence (JSD), Hellinger Distance (HD) and Kolmogorov-Smirnov Distance (KS).

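They are calculated as below. For two discrete probability distributions $P=(p_1,\dots,p_k)$ and $Q=(q_1,\dots,q_k)$:

$$\mathrm{PSI}=\sum_{i=1}^{k}(p_i-q_i)\ln\frac{p_i}{q_i}$$

$$\mathrm{JSD}=\frac{1}{2}\sum_{i=1}^{k}p_i\ln\frac{p_i}{m_i}+\frac{1}{2}\sum_{i=1}^{k}q_i\ln\frac{q_i}{m_i},\qquad m_i=\frac{p_i+q_i}{2}$$

$$\mathrm{HD}=\sqrt{\frac{1}{2}\sum_{i=1}^{k}\left(\sqrt{p_i}-\sqrt{q_i}\right)^2}$$

$$\mathrm{KS}=\max_{1\le j\le k}\left|\sum_{i=1}^{j}p_i-\sum_{i=1}^{j}q_i\right|$$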

A threshold can be set to flag drifted attributes. If multiple statistical metrics have been calculated, an attribute will be marked as drifted if any of its statistical metrics is larger than the threshold.

This function can be used in many scenarios. For example:

1. Attribute-level data drift can be analysed together with the attribute importance of a machine learning model. The more important an attribute is, the more attention it needs if drift is present.
2. To analyse data drift over time, one can treat one dataset as the source / baseline dataset and multiple datasets as the target datasets. Drift analysis can be performed between the source dataset and each target dataset to quantify the drift over time.

## Parameters

spark
Spark Session
idf_target
Input Dataframe
idf_source
Baseline/Source Dataframe. This argument is ignored if pre_existing_source is True.
list_of_cols
List of columns to check drift e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". "all" can be passed to include all (non-array) columns for analysis. Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
drop_cols
List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2". (Default value = None)
method_type
"PSI", "JSD", "HD", "KS","all". "all" can be passed to calculate all drift metrics. One or more methods can be passed in a form of list or string where different metrics are separated by pipe delimiter “|” e.g. ["PSI", "JSD"] or "PSI|JSD". (Default value = "PSI")
bin_method
String argument - "equal_frequency" or "equal_range". In "equal_range" method, each bin is of equal size/width and in "equal_frequency", each bin has equal no. of rows, though the width of bins may vary. (Default value = "equal_range")
bin_size
Integer argument - Number of bins for creating histogram. (Default value = 10)
threshold
Float argument - A column is flagged if any drift metric is above the threshold. (Default value = 0.1)
use_sampling
Boolean argument - True or False. This argument determines whether to sample the source and target datasets (with the method chosen by sample_method) when they are larger than sample_size. True enables sampling, False disables it. It is recommended to set this as True for large datasets. (Default value = True)
sample_method
String argument - "random" or "stratified". If use_sampling is True, this argument is used to determine the sampling method. "stratified" for Stratified sampling, "random" for Random Sampling. For more details, please refer to https://docs.anovos.ai/api/data_ingest/data_sampling.html. (Default value = "random")
strata_cols
If use_sampling is True and sample_method is "stratified", this argument is used to determine the list of columns used to be treated as strata. For more details, please refer to https://docs.anovos.ai/api/data_ingest/data_sampling.html. (Default value = "all")
stratified_type
String argument - "population" or "balanced". If use_sampling is True and sample_method is "stratified", this string argument is used to determine the stratified sampling method. "population" stands for Proportionate Stratified Sampling, "balanced" stands for Optimum Stratified Sampling. For more details, please refer to https://docs.anovos.ai/api/data_ingest/data_sampling.html. (Default value = "population")
sample_size
Integer argument - If use_sampling is True, this argument is used to determine the sample size of sampling method. (Default value = 100000)
sample_seed
Integer argument - If use_sampling is True, this argument is used to determine the seed of sampling method. (Default value = 42)
persist
Boolean argument - True or False. This argument is used to determine whether to persist on binning result of source and target dataset, True will enable the use of persist, otherwise False. It is recommended to set this as True for large datasets. (Default value = True)
persist_option
If persist is True, this argument is used to determine the type of persist. (Default value = pyspark.StorageLevel.MEMORY_AND_DISK)
pre_existing_source
Boolean argument – True or False. True if the drift_statistics folder (binning model & frequency counts for each attribute) exists already, False otherwise. (Default value = False)
source_save
Boolean argument - True or False. This argument determines whether or not to save the frequency counts of the source dataset under source_path. (Default value = True)
source_path
If pre_existing_source is False, this argument can be used for saving the drift_statistics folder. The drift_statistics folder will have attribute_binning (binning model) & frequency_counts sub-folders. If pre_existing_source is True, this argument is path for referring the drift_statistics folder. Default "NA" for temporarily saving data in "intermediate_data/" folder. (Default value = "NA")
model_directory
Name of the sub-directory of source_path that holds the drift statistics (the attribute_binning model and the frequency_counts sub-folder). If pre_existing_source is False, the statistics of the source dataset are saved there; if pre_existing_source is True, they are read from there. (Default value = "drift_statistics")
print_impact
Boolean argument - True or False. This argument is to print out the drift statistics of all attributes and attributes meeting the threshold. (Default value = False)

## Returns

DataFrame
[attribute, *metric, flagged]. The number of columns depends on the method_type argument: there is one column for each drift method/metric.
Expand source code
@check_distance_method
@check_list_of_columns
def statistics(
    spark,
    idf_target,
    idf_source,
    list_of_cols="all",
    drop_cols=None,
    method_type="PSI",
    bin_method="equal_range",
    bin_size=10,
    threshold=0.1,
    use_sampling=True,
    sample_method="random",
    strata_cols="all",
    stratified_type="population",
    sample_size=100000,
    sample_seed=42,
    persist=True,
    persist_option=pyspark.StorageLevel.MEMORY_AND_DISK,
    pre_existing_source=False,
    source_save=True,
    source_path="NA",
    model_directory="drift_statistics",
    print_impact=False,
):
    """
    Measure the drift of each attribute from a source (baseline) dataset to a target dataset.

    When the performance of a deployed machine learning model degrades in production, one potential reason is that
    the data used in training and prediction do not follow the same distribution.

    Data drift mainly includes the following manifestations:

    - Covariate shift: training and test data follow different distributions. For example, an algorithm predicting
      income that is trained on a younger population but tested on an older population.
    - Prior probability shift: change of prior probability. For example, in a spam classification problem,
      the proportion of spam emails changes from 0.2 in training data to 0.6 in testing data.
    - Concept shift: the distribution of the target variable changes given fixed input values. For example, in
      the same spam classification problem, emails tagged as spam in training data are more likely to be tagged
      as non-spam in testing data.

    This function focuses on covariate shift detection.

    Given a source and a target dataset, the drift of each attribute is quantified in two steps: (1) convert each
    attribute of interest in the source and target datasets into source and target probability distributions;
    (2) calculate the statistical distance between the source and target distributions of each attribute.

    In the first step, attribute_binning bins the numerical attributes of the source dataset, which requires two
    input variables: bin_method and bin_size. The binning model saved for the source is then applied to the target
    dataset so that both datasets share the same bins. Attributes of list_of_cols that are not numerical are not
    passed to attribute_binning and are grouped by their values. The probability of a bin is its count of non-null
    values divided by the total number of rows. A bin that is missing on one side, or whose probability is 0,
    gets a floor probability of 0.0001 so that the logarithms stay finite.

    In the second step, four statistical metrics are available to measure the drift of an attribute from the
    source to the target distribution: Population Stability Index (PSI), Jensen-Shannon Divergence (JSD),
    Hellinger Distance (HD) and Kolmogorov-Smirnov Distance (KS).

    For two discrete probability distributions *P=(p_1,…,p_k)* and *Q=(q_1,…,q_k)*, they are calculated as:

    ![https://raw.githubusercontent.com/anovos/anovos-docs/main/docs/assets/drift_stats_formulae.png](https://raw.githubusercontent.com/anovos/anovos-docs/main/docs/assets/drift_stats_formulae.png)

    - PSI = sum_i (p_i - q_i) * ln(p_i / q_i)
    - JSD = (sum_i p_i * ln(p_i / m_i) + sum_i q_i * ln(q_i / m_i)) / 2, where m_i = (p_i + q_i) / 2
    - HD = sqrt(sum_i (sqrt(p_i) - sqrt(q_i))^2 / 2)
    - KS = max_j |sum_{i<=j} p_i - sum_{i<=j} q_i|, with the bins taken in ascending order of their value

    A threshold can be set to flag drifted attributes. If multiple statistical metrics are calculated,
    an attribute is flagged as drifted if any of its metrics is larger than the threshold.

    This function can be used in many scenarios. For example:

    1. Attribute-level data drift can be analysed together with the attribute importance of a machine learning
    model. The more important an attribute is, the more attention it needs if drift is present.
    2. To analyse data drift over time, one can treat one dataset as the source / baseline dataset and multiple
    datasets as the target datasets. Drift analysis can be performed between the source dataset and each target
    dataset to quantify the drift over time.

    Parameters
    ----------
    spark
        Spark Session
    idf_target
        Input Dataframe
    idf_source
        Baseline/Source Dataframe. It is not read when pre_existing_source is True (it may then be None).
    list_of_cols
        List of columns to check drift e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
        "all" can be passed to include all (non-array) columns for analysis.
        Please note that this argument is used in conjunction with drop_cols i.e. a column mentioned in
        drop_cols argument is not considered for analysis even if it is mentioned in list_of_cols. (Default value = "all")
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter “|” e.g., "col1|col2". (Default value = None)
    method_type
        "PSI", "JSD", "HD", "KS","all".
        "all" can be passed to calculate all drift metrics.
        One or more methods can be passed in a form of list or string where different metrics are separated
        by pipe delimiter “|” e.g. ["PSI", "JSD"] or "PSI|JSD". (Default value = "PSI")
    bin_method
        String argument - "equal_frequency" or "equal_range".
        In "equal_range" method, each bin is of equal size/width and in "equal_frequency", each bin
        has equal no. of rows, though the width of bins may vary. (Default value = "equal_range")
    bin_size
        Integer argument - Number of bins for creating histogram. (Default value = 10)
    threshold
        Float argument - A column is flagged if any drift metric is above the threshold. (Default value = 0.1)
    use_sampling
        Boolean argument - True or False. Whether to sample the source and target datasets down to about
        sample_size rows when they are larger than that. It is recommended to set this as True for large
        datasets. (Default value = True)
    sample_method
        String argument - "random" or "stratified".
        If use_sampling is True, this argument is used to determine the sampling method.
        "stratified" for Stratified sampling, "random" for Random Sampling.
        For more details, please refer to https://docs.anovos.ai/api/data_ingest/data_sampling.html.
        (Default value = "random")
    strata_cols
        If use_sampling is True and sample_method is "stratified", this argument is used to determine the list
        of columns used to be treated as strata. For more details, please refer to
        https://docs.anovos.ai/api/data_ingest/data_sampling.html. (Default value = "all")
    stratified_type
        String argument - "population" or "balanced". If use_sampling is True and sample_method is "stratified",
        this string argument is used to determine the stratified sampling method. "population" stands for
        Proportionate Stratified Sampling, "balanced" stands for Optimum Stratified Sampling.
        For more details, please refer to
        https://docs.anovos.ai/api/data_ingest/data_sampling.html. (Default value = "population")
    sample_size
        Integer argument - If use_sampling is True, this argument is used to determine the sample size of sampling method.
        (Default value = 100000)
    sample_seed
        Integer argument - If use_sampling is True, this argument is used to determine the seed of sampling method.
        (Default value = 42)
    persist
        Boolean argument - True or False. Whether to persist the sampled source/target datasets and their
        binned versions while the statistics are computed. Only DataFrames persisted by this function are
        unpersisted before it returns. It is recommended to set this as True for large datasets. (Default value = True)
    persist_option
        If persist is True, this argument is used to determine the type of persist.
        (Default value = pyspark.StorageLevel.MEMORY_AND_DISK)
    pre_existing_source
        Boolean argument – True or False. True if the drift_statistics folder (binning model &
        frequency counts for each attribute) exists already, False otherwise. (Default value = False)
    source_save
        Boolean argument - True or False. Whether to save the frequency counts of the source dataset under
        source_path/model_directory/frequency_counts when pre_existing_source is False. (Default value = True)
    source_path
        If pre_existing_source is False, this argument can be used for saving the drift_statistics folder.
        The drift_statistics folder will have attribute_binning (binning model) & frequency_counts sub-folders.
        If pre_existing_source is True, this argument is path for referring the drift_statistics folder.
        Default "NA" for temporarily saving data in "intermediate_data/" folder. (Default value = "NA")
    model_directory
        Name of the sub-directory of source_path that holds the drift statistics (the attribute_binning model
        and the frequency_counts sub-folder). If pre_existing_source is False, the source statistics are saved
        there; if pre_existing_source is True, they are read from there. (Default value = "drift_statistics")
    print_impact
        Boolean argument - True or False. This argument is to print out the drift statistics of all attributes
        and attributes meeting the threshold. (Default value = False)

    Returns
    -------
    DataFrame
        [attribute, *metric, flagged]
        One row per attribute and one column per selected metric, in the order PSI, HD, JSD, KS.
        flagged is 1 when any metric is above threshold, else 0.

    Raises
    ------
    ValueError
        If list_of_cols is empty or method_type selects none of PSI, JSD, HD, KS.
    """
    # drop_cols is not read in this body; it is presumably applied to
    # list_of_cols by @check_list_of_columns. NOTE(review): confirm in .validations.
    if not list_of_cols:
        raise ValueError("list_of_cols must contain at least one column")

    # Metric columns are always emitted in this fixed order.
    methods = [name for name in ("PSI", "HD", "JSD", "KS") if name in method_type]
    if not methods:
        raise ValueError("method_type must select at least one of PSI, JSD, HD or KS")

    num_cols = attributeType_segregation(idf_target.select(list_of_cols))[0]

    # DataFrames persisted by this call. Only these are unpersisted at the end,
    # so DataFrames the caller cached themselves stay cached.
    persisted = []

    def _persist(df):
        """Persist ``df`` with ``persist_option`` when ``persist`` is set."""
        if persist:
            df = df.persist(persist_option)
            persisted.append(df)
        return df

    def _sample_if_large(idf):
        """Return ``(df, row_count)``; ``idf`` is sampled down to about
        ``sample_size`` rows when sampling is enabled and it is larger."""
        count = idf.count()
        if not use_sampling or count <= sample_size:
            return idf, count
        sampled = _persist(
            data_sample(
                idf,
                strata_cols=strata_cols,
                fraction=sample_size / count,
                method_type=sample_method,
                stratified_type=stratified_type,
                seed_value=sample_seed,
            )
        )
        return sampled, sampled.count()

    idf_target, count_target = _sample_if_large(idf_target)

    if source_path == "NA":
        source_path = "intermediate_data"
    model_path = source_path + "/" + model_directory

    if not pre_existing_source:
        # idf_source is only needed when the source statistics are built here.
        idf_source, count_source = _sample_if_large(idf_source)
        source_bin = _persist(
            attribute_binning(
                spark,
                idf_source,
                list_of_cols=num_cols,
                method_type=bin_method,
                bin_size=bin_size,
                pre_existing_model=False,
                model_path=model_path,
            )
        )

    # The target reuses the binning model saved under model_path for the source.
    target_bin = _persist(
        attribute_binning(
            spark,
            idf_target,
            list_of_cols=num_cols,
            method_type=bin_method,
            bin_size=bin_size,
            pre_existing_model=True,
            model_path=model_path,
        )
    )

    p, q = F.col("p"), F.col("q")
    metric_frames = []
    for i in list_of_cols:
        freq_path = model_path + "/frequency_counts/" + i
        if pre_existing_source:
            # Source distribution saved by an earlier run, with columns (i, p).
            x = spark.read.csv(freq_path, header=True, inferSchema=True)
        else:
            # F.count(i) skips nulls, so the null group gets p = 0 here;
            # fillna(-1) labels a null numeric bin as -1.
            x = (
                source_bin.groupBy(i)
                .agg((F.count(i) / count_source).alias("p"))
                .fillna(-1)
            )
            if source_save:
                # Keep the header so the file can be read back by column name.
                x.coalesce(1).write.csv(freq_path, header=True, mode="overwrite")

        y = (
            target_bin.groupBy(i)
            .agg((F.count(i) / count_target).alias("q"))
            .fillna(-1)
        )

        # Bins seen on only one side, or with zero probability, get a floor of
        # 0.0001 so that the logarithms below stay finite.
        xy = (
            x.join(y, i, "full_outer")
            .fillna(0.0001, subset=["p", "q"])
            .replace(0, 0.0001, subset=["p", "q"])
        )

        aggregations = []
        if "PSI" in methods:
            aggregations.append(F.sum((p - q) * F.log(p / q)).alias("PSI"))
        if "HD" in methods:
            aggregations.append(
                F.sqrt(F.sum(F.pow(F.sqrt(p) - F.sqrt(q), 2)) / 2).alias("HD")
            )
        if "JSD" in methods:
            m = (p + q) / 2
            aggregations.append(
                (
                    (F.sum(p * F.log(p / m)) + F.sum(q * F.log(q / m))) / 2
                ).alias("JSD")
            )
        if "KS" in methods:
            # Cumulative sums must follow the bin order; a window without an
            # orderBy gives no ordering guarantee after its shuffle.
            cumulative = Window.orderBy(i).rowsBetween(
                Window.unboundedPreceding, Window.currentRow
            )
            xy = xy.withColumn("cum_sum_p", F.sum(p).over(cumulative)).withColumn(
                "cum_sum_q", F.sum(q).over(cumulative)
            )
            aggregations.append(
                F.max(F.abs(F.col("cum_sum_p") - F.col("cum_sum_q"))).alias("KS")
            )

        # One aggregation yields all selected metrics as a single row.
        metrics = xy.agg(*aggregations)
        metric_frames.append(
            metrics.select(F.lit(str(i)).alias("attribute"), *metrics.columns)
        )

    # Every per-attribute frame has the same column order, so a positional
    # union is safe.
    odf_union = functools.reduce(lambda left, right: left.union(right), metric_frames)
    cond_expr = functools.reduce(
        operator.or_, [F.col(c) > threshold for c in odf_union.columns[1:]]
    )
    odf = odf_union.withColumn("flagged", F.when(cond_expr, 1).otherwise(0))

    if print_impact:
        logger.info("All Attributes:")
        odf.show(len(list_of_cols))
        logger.info("Attributes meeting Data Drift threshold:")
        drift = odf.where(F.col("flagged") == 1)
        drift.show(drift.count())

    for df in persisted:
        df.unpersist()
    return odf