data_ingest
This module consists of functions to read a dataset as a Spark DataFrame, concatenate/join it with other datasets (if required), and perform basic ETL actions such as selecting, deleting, renaming and/or recasting columns. The functions included in this module are:
- read_dataset
- write_dataset
- concatenate_dataset
- join_dataset
- delete_column
- select_column
- rename_column
- recast_column
- recommend_type
A short end-to-end usage sketch follows.
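The sketch below shows how these functions are typically chained. It is a minimal, illustrative example only: the import path anovos.data_ingest.data_ingest, the file paths, and the column names are assumptions, and an existing SparkSession named spark is assumed.

from anovos.data_ingest.data_ingest import (
    read_dataset,
    delete_column,
    recast_column,
    write_dataset,
)

# read a CSV file into a Spark DataFrame (path is illustrative)
df = read_dataset(
    spark,
    file_path="data/input_data/csv",
    file_type="csv",
    file_configs={"header": "True", "delimiter": ",", "inferSchema": "True"},
)

# basic ETL on illustrative columns
df = delete_column(df, ["unused_col"])
df = recast_column(df, ["age"], ["integer"])

# write the result as parquet (path is illustrative)
write_dataset(df, "output/cleaned_data", "parquet", {"mode": "overwrite"})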
Source code
# coding=utf-8
"""
This module consists of functions to read the dataset as Spark DataFrame, concatenate/join with other
functions (if required), and perform some basic ETL actions such as selecting, deleting, renaming and/or
recasting columns. List of functions included in this module are:

- read_dataset
- write_dataset
- concatenate_dataset
- join_dataset
- delete_column
- select_column
- rename_column
- recast_column

"""
import warnings

import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql import types as T

from anovos.shared.utils import attributeType_segregation, pairwise_reduce


def read_dataset(spark, file_path, file_type, file_configs={}):
    """
    This function reads the input data path and return a Spark DataFrame. Under the hood, this function is
    based on generic Load functionality of Spark SQL.

    Parameters
    ----------
    spark
        Spark Session
    file_path
        Path to input data (directory or filename).
        Compatible with local path and s3 path (when running in AWS environment).
    file_type
        "csv", "parquet", "avro", "json".
        Avro data source requires an external package to run, which can be configured with
        spark-submit (--packages org.apache.spark:spark-avro_2.11:2.4.0).
    file_configs
        This optional argument is passed in a dictionary format as key/value pairs
        e.g. {"header": "True","delimiter": "|","inferSchema": "True"} for csv files.
        All the key/value pairs in this argument are passed as options to DataFrameReader,
        which is created using SparkSession.read. (Default value = {})

    Returns
    -------
    DataFrame
    """
    odf = spark.read.format(file_type).options(**file_configs).load(file_path)
    return odf


def write_dataset(idf, file_path, file_type, file_configs={}, column_order=[]):
    """
    This function saves the Spark DataFrame in the user-provided output path. Like read_dataset, this function is
    based on the generic Save functionality of Spark SQL.

    Parameters
    ----------
    idf
        Input Dataframe i.e. Spark DataFrame to be saved
    file_path
        Path to output data (directory or filename).
        Compatible with local path and s3 path (when running in AWS environment).
    file_type
        "csv", "parquet", "avro", "json".
        Avro data source requires an external package to run, which can be configured with
        spark-submit (--packages org.apache.spark:spark-avro_2.11:2.4.0).
    file_configs
        This argument is passed in dictionary format as key/value pairs. Some of the potential keys are
        header, delimiter, mode, compression, repartition.
        compression options - uncompressed, gzip (doesn't work with avro), snappy (only valid for parquet)
        mode options - error (default), overwrite, append
        repartition - None (automatic partitioning) or an integer value ()
        e.g. {"header":"True", "delimiter":",",'compression':'snappy','mode':'overwrite','repartition':'10'}.
        All the key/value pairs (except repartition, mode) written in this argument are passed as options to
        DataFrameWriter is available using Dataset.write operator. If the number of repartitions mentioned
        through this argument is less than the existing DataFrame partitions, then the coalesce operation is
        used instead of the repartition operation to make the execution work. This is because the coalesce
        operation doesn't require any shuffling like repartition which is known to be an expensive step.
    column_order
        list of columns in the order in which Dataframe is to be written. If None or [] is specified,
        then the default order is applied.
    """
    if not column_order:
        column_order = idf.columns
    else:
        if len(column_order) != len(idf.columns):
            raise ValueError(
                "Count of column(s) specified in column_order argument do not match Dataframe"
            )
        diff_cols = [x for x in column_order if x not in set(idf.columns)]
        if diff_cols:
            raise ValueError(
                "Column(s) specified in column_order argument not found in Dataframe: "
                + str(diff_cols)
            )

    mode = file_configs["mode"] if "mode" in file_configs else "error"
    repartition = (
        int(file_configs["repartition"]) if "repartition" in file_configs else None
    )

    if repartition is None:
        idf.select(column_order).write.format(file_type).options(**file_configs).save(
            file_path, mode=mode
        )
    else:
        exist_parts = idf.rdd.getNumPartitions()
        req_parts = int(repartition)
        if req_parts > exist_parts:
            idf.select(column_order).repartition(req_parts).write.format(
                file_type
            ).options(**file_configs).save(file_path, mode=mode)
        else:
            idf.select(column_order).coalesce(req_parts).write.format(
                file_type
            ).options(**file_configs).save(file_path, mode=mode)


def concatenate_dataset(*idfs, method_type="name"):
    """
    This function combines multiple dataframes into a single dataframe. A pairwise concatenation is performed
    on the dataframes, instead of adding one dataframe at a time to the bigger dataframe. This function
    leverages union functionality of Spark SQL.

    Parameters
    ----------
    *idfs
        All dataframes to be concatenated (with the first dataframe columns)
    method_type
        "index", "name". This argument needs to be passed as a keyword argument.
        The "index" method concatenates the dataframes by the column index (without shuffling columns).
        If the sequence of column is not fixed among the dataframe, this method should be avoided.
        The "name" method concatenates after shuffling and arranging columns as per the first dataframe order.
        First dataframe passed under idfs will define the final columns in the concatenated dataframe,
        and will throw error if any column in first dataframe is not available in any of other dataframes.
        (Default value = "name")

    Returns
    -------
    DataFrame
        Concatenated dataframe
    """
    if method_type not in ["index", "name"]:
        raise TypeError("Invalid input for concatenate_dataset method")
    if method_type == "name":
        odf = pairwise_reduce(
            lambda idf1, idf2: idf1.union(idf2.select(idf1.columns)), idfs
        )  # odf = reduce(DataFrame.unionByName, idfs) # only if exact no. of columns
    else:
        odf = pairwise_reduce(DataFrame.union, idfs)
    return odf


def join_dataset(*idfs, join_cols, join_type):
    """
    This function joins multiple dataframes into a single dataframe by joining key column(s). For optimization,
    Pairwise joining is done on the dataframes, instead of joining individual dataframes to the bigger dataframe.
    This function leverages join functionality of Spark SQL.

    Parameters
    ----------
    idfs
        All dataframes to be joined
    join_cols
        Key column(s) to join all dataframes together.
        In case of multiple key columns to join, they can be passed in a list format or a string format
        where different column names are separated by pipe delimiter "|" e.g. "col1|col2".
    join_type
        "inner", "full", "left", "right", "left_semi", "left_anti"

    Returns
    -------
    DataFrame
        Joined dataframe
    """
    if isinstance(join_cols, str):
        join_cols = [x.strip() for x in join_cols.split("|")]
    list_of_df_cols = [x.columns for x in idfs]
    list_of_all_cols = [x for sublist in list_of_df_cols for x in sublist]
    list_of_nonjoin_cols = [x for x in list_of_all_cols if x not in join_cols]

    if len(list_of_nonjoin_cols) != (
        len(list_of_all_cols) - (len(list_of_df_cols) * len(join_cols))
    ):
        raise ValueError("Specified join_cols do not match all the Input Dataframe(s)")

    if len(list_of_nonjoin_cols) != len(set(list_of_nonjoin_cols)):
        raise ValueError(
            "Duplicate column(s) present in non joining column(s) in Input Dataframe(s)"
        )

    odf = pairwise_reduce(
        lambda idf1, idf2: idf1.join(idf2, join_cols, join_type), idfs
    )
    return odf


def delete_column(idf, list_of_cols, print_impact=False):
    """
    This function is used to delete specific columns from the input data. It is executed using drop functionality
    of Spark SQL. It is advisable to use this function if the number of columns to delete is lesser than the
    number of columns to select; otherwise, it is recommended to use select_column.

    Parameters
    ----------
    idf
        Input Dataframe
    list_of_cols
        List of columns to delete e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "col1|col2".
    print_impact
        True, False
        This argument is to compare number of columns before and after the operation. (Default value = False)

    Returns
    -------
    DataFrame
        Dataframe after dropping columns
    """
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    list_of_cols = list(set(list_of_cols))

    odf = idf.drop(*list_of_cols)

    if print_impact:
        print("Before: \nNo. of Columns- ", len(idf.columns))
        print(idf.columns)
        print("After: \nNo. of Columns- ", len(odf.columns))
        print(odf.columns)
    return odf


def select_column(idf, list_of_cols, print_impact=False):
    """
    This function is used to select specific columns from the input data. It is executed using select operation
    of spark dataframe. It is advisable to use this function if the number of columns to select is lesser than
    the number of columns to drop; otherwise, it is recommended to use delete_column.

    Parameters
    ----------
    idf
        Input Dataframe
    list_of_cols
        List of columns to select e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "col1|col2".
    print_impact
        True, False
        This argument is to compare number of columns before and after the operation. (Default value = False)

    Returns
    -------
    DataFrame
        Dataframe with the selected columns
    """
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    list_of_cols = list(set(list_of_cols))

    odf = idf.select(list_of_cols)

    if print_impact:
        print("Before: \nNo. of Columns-", len(idf.columns))
        print(idf.columns)
        print("\nAfter: \nNo. of Columns-", len(odf.columns))
        print(odf.columns)
    return odf


def rename_column(idf, list_of_cols, list_of_newcols, print_impact=False):
    """
    This function is used to rename the columns of the input data. Multiple columns can be renamed; however,
    the sequence they passed as an argument is critical and must be consistent between list_of_cols and
    list_of_newcols.

    Parameters
    ----------
    idf
        Input Dataframe
    list_of_cols
        List of old column names e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "col1|col2".
    list_of_newcols
        List of corresponding new column names e.g., ["newcol1","newcol2"].
        Alternatively, new column names can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "newcol1|newcol2".
        First element in list_of_cols will be original column name,
        and corresponding first column in list_of_newcols will be new column name.
    print_impact
        True, False
        This argument is to compare column names before and after the operation. (Default value = False)

    Returns
    -------
    DataFrame
        Dataframe with revised column names
    """
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(list_of_newcols, str):
        list_of_newcols = [x.strip() for x in list_of_newcols.split("|")]

    mapping = dict(zip(list_of_cols, list_of_newcols))
    odf = idf.select([F.col(i).alias(mapping.get(i, i)) for i in idf.columns])

    if print_impact:
        print("Before: \nNo. of Columns- ", len(idf.columns))
        print(idf.columns)
        print("After: \nNo. of Columns- ", len(odf.columns))
        print(odf.columns)
    return odf


def recast_column(idf, list_of_cols, list_of_dtypes, print_impact=False):
    """
    This function is used to modify the datatype of columns. Multiple columns can be cast; however,
    the sequence they passed as argument is critical and needs to be consistent between list_of_cols
    and list_of_dtypes.

    Parameters
    ----------
    idf
        Input Dataframe
    list_of_cols
        List of columns to cast e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "col1|col2".
    list_of_dtypes
        List of corresponding datatypes e.g., ["type1","type2"].
        Alternatively, datatypes can be specified in a string format,
        where they are separated by pipe delimiter "|" e.g., "type1|type2".
        First element in list_of_cols will column name and corresponding element in list_of_dtypes
        will be new datatypes such as "float", "integer", "long", "string", "double", decimal" etc.
        Datatypes are case insensitive e.g. float or Float are treated as same.
    print_impact
        True, False
        This argument is to compare schema before and after the operation. (Default value = False)

    Returns
    -------
    DataFrame
        Dataframe with revised datatypes
    """
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(list_of_dtypes, str):
        list_of_dtypes = [x.strip() for x in list_of_dtypes.split("|")]

    odf = idf
    for i, j in zip(list_of_cols, list_of_dtypes):
        odf = odf.withColumn(i, F.col(i).cast(j))

    if print_impact:
        print("Before: ")
        idf.printSchema()
        print("After: ")
        odf.printSchema()
    return odf


def recommend_type(
    spark,
    idf,
    list_of_cols="all",
    drop_cols=[],
    dynamic_threshold=0.01,
    static_threshold=100,
):
    """
    This function is to recommend the form and datatype of columns. Cardinality of each column will be measured,
    then both dynamic_threshold and static_threshold will be used to determine the recommended form and datatype
    for each column.

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe
    list_of_cols
        List of columns to cast e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "col1|col2".
        (Default value = 'all')
    drop_cols
        List of columns to be dropped e.g., ["col1","col2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "col1|col2".
        It is most useful when coupled with the "all" value of strata_cols, when we need to consider
        all columns except a few handful of them. (Default value = [])
    dynamic_threshold
        Cardinality threshold to determine columns recommended form and datatype.
        If the column's unique values < column total records * dynamic_threshold,
        column will be recommended as categorical, and in string datatype.
        Else, column will be recommended as numerical, and in double datatype.
        In recommend_type, we will use the general threshold equals to the minimum of
        dynamic_threshold and static_threshold. (Default value = 0.01)
    static_threshold
        Cardinality threshold to determine columns recommended form and datatype.
        If the column's unique values < static_threshold,
        column will be recommended as categorical, and in string datatype.
        Else, column will be recommended as numerical, and in double datatype.
        In recommend_type, we will use the general threshold equals to the minimum of
        dynamic_threshold and static_threshold. (Default value = 100)

    Returns
    -------
    DataFrame
        Dataframe with attributes and their original/recommended form and datatype
    """
    if list_of_cols == "all":
        list_of_cols = idf.columns
    if isinstance(list_of_cols, str):
        list_of_cols = [x.strip() for x in list_of_cols.split("|")]
    if isinstance(drop_cols, str):
        drop_cols = [x.strip() for x in drop_cols.split("|")]
    list_of_cols = list(set([e for e in list_of_cols if e not in drop_cols]))

    if any(x not in idf.columns for x in list_of_cols):
        raise TypeError("Invalid input for Column(s)")
    if len(list_of_cols) == 0:
        warnings.warn("No recommend_attributeType analysis - No column(s) to analyze")
        schema = T.StructType(
            [
                T.StructField("attribute", T.StringType(), True),
                T.StructField("original_form", T.StringType(), True),
                T.StructField("original_dataType", T.StringType(), True),
                T.StructField("recommended_form", T.StringType(), True),
                T.StructField("recommended_dataType", T.StringType(), True),
                T.StructField("distinct_value_count", T.StringType(), True),
            ]
        )
        odf = spark.sparkContext.emptyRDD().toDF(schema)
        return odf
    if type(dynamic_threshold) != float:
        raise TypeError("Invalid input for dynamic_threshold: float type only")
    if dynamic_threshold <= 0 or dynamic_threshold > 1:
        raise TypeError(
            "Invalid input for dynamic_threshold: Value need to be between 0 and 1"
        )
    if type(static_threshold) != int:
        raise TypeError("Invalid input for static_threshold: int type only")

    def min_val(val1, val2):
        if val1 > val2:
            return val2
        else:
            return val1

    num_cols, cat_cols, other_cols = attributeType_segregation(idf)
    rec_num_cols = []
    rec_cat_cols = []

    for col in num_cols:
        if idf.select(col).distinct().na.drop().count() < min_val(
            (dynamic_threshold * idf.select(col).na.drop().count()), static_threshold
        ):
            rec_cat_cols.append(col)

    for col in cat_cols:
        idf_inter = (
            idf.na.drop(subset=col).withColumn(col, idf[col].cast("double")).select(col)
        )
        if (
            idf_inter.distinct().na.drop().count() == idf_inter.distinct().count()
            and idf_inter.distinct().na.drop().count() != 0
        ):
            if idf.select(col).distinct().na.drop().count() >= min_val(
                (dynamic_threshold * idf.select(col).na.drop().count()),
                static_threshold,
            ):
                rec_num_cols.append(col)

    rec_cols = rec_num_cols + rec_cat_cols
    ori_form = []
    ori_type = []
    rec_form = []
    rec_type = []
    num_dist_val = []

    if len(rec_cols) > 0:
        for col in rec_cols:
            if col in rec_num_cols:
                ori_form.append("categorical")
                ori_type.append(idf.select(col).dtypes[0][1])
                rec_form.append("numerical")
                rec_type.append("double")
                num_dist_val.append(idf.select(col).distinct().count())
            else:
                ori_form.append("numerical")
                ori_type.append(idf.select(col).dtypes[0][1])
                rec_form.append("categorical")
                rec_type.append("string")
                num_dist_val.append(idf.select(col).distinct().count())

        odf_rec = spark.createDataFrame(
            zip(rec_cols, ori_form, ori_type, rec_form, rec_type, num_dist_val),
            schema=(
                "attribute",
                "original_form",
                "original_dataType",
                "recommended_form",
                "recommended_dataType",
                "distinct_value_count",
            ),
        )
        return odf_rec
    else:
        warnings.warn("No column type change recommendation is made")
        schema = T.StructType(
            [
                T.StructField("attribute", T.StringType(), True),
                T.StructField("original_form", T.StringType(), True),
                T.StructField("original_dataType", T.StringType(), True),
                T.StructField("recommended_form", T.StringType(), True),
                T.StructField("recommended_dataType", T.StringType(), True),
                T.StructField("distinct_value_count", T.StringType(), True),
            ]
        )
        odf = spark.sparkContext.emptyRDD().toDF(schema)
        return odf
Functions
def concatenate_dataset(*idfs, method_type='name')
-
This function combines multiple dataframes into a single dataframe. A pairwise concatenation is performed on the dataframes instead of adding one dataframe at a time to the growing result. This function leverages the union functionality of Spark SQL.
Parameters
*idfs
- All dataframes to be concatenated. The first dataframe's columns define the final column set.
method_type
- "index", "name". This argument needs to be passed as a keyword argument. The “index” method concatenates the dataframes by the column index (without shuffling columns). If the sequence of column is not fixed among the dataframe, this method should be avoided. The “name” method concatenates after shuffling and arranging columns as per the first dataframe order. First dataframe passed under idfs will define the final columns in the concatenated dataframe, and will throw error if any column in first dataframe is not available in any of other dataframes. (Default value = "name")
Returns
DataFrame
- Concatenated dataframe
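A minimal usage sketch, assuming df1, df2 and df3 are illustrative Spark DataFrames that all contain df1's columns and that concatenate_dataset has been imported from this module:

# stack the three dataframes, aligning columns to df1's order
df_all = concatenate_dataset(df1, df2, df3, method_type="name")
print(df_all.count())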
def delete_column(idf, list_of_cols, print_impact=False)
-
This function is used to delete specific columns from the input data. It is executed using the drop functionality of Spark SQL. It is advisable to use this function when the number of columns to delete is smaller than the number of columns to keep; otherwise, select_column is recommended.
Parameters
idf
- Input Dataframe
list_of_cols
- List of columns to delete e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
print_impact
- True, False. When True, prints the number of columns before and after the operation for comparison. (Default value = False)
Returns
DataFrame
- Dataframe after dropping columns
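A minimal usage sketch; df and the column names are illustrative:

# list form
odf = delete_column(df, ["tmp_col", "debug_flag"], print_impact=True)
# equivalent pipe-delimited string form
odf = delete_column(df, "tmp_col|debug_flag")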
def join_dataset(*idfs, join_cols, join_type)
-
This function joins multiple dataframes into a single dataframe on key column(s). For optimization, pairwise joining is performed on the dataframes instead of joining individual dataframes to the growing result. This function leverages the join functionality of Spark SQL.
Parameters
idfs
- All dataframes to be joined
join_cols
- Key column(s) on which all dataframes are joined. Multiple key columns can be passed either as a list or as a string where column names are separated by the pipe delimiter "|", e.g. "col1|col2".
join_type
- "inner", “full”, “left”, “right”, “left_semi”, “left_anti”
Returns
DataFrame
- Joined dataframe
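A minimal usage sketch, assuming customers, orders and payments are illustrative Spark DataFrames that share an illustrative key column "customer_id" and have no other column names in common:

df_joined = join_dataset(
    customers, orders, payments, join_cols="customer_id", join_type="inner"
)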
def read_dataset(spark, file_path, file_type, file_configs={})
-
This function reads data from the input path and returns a Spark DataFrame. Under the hood, it is based on the generic load functionality of Spark SQL.
Parameters
spark
- Spark Session
file_path
- Path to input data (directory or filename). Compatible with local path and s3 path (when running in AWS environment).
file_type
- "csv", "parquet", "avro", "json". Avro data source requires an external package to run, which can be configured with spark-submit (–packages org.apache.spark:spark-avro_2.11:2.4.0).
file_configs
- This optional argument is passed in a dictionary format as key/value pairs e.g. {"header": "True","delimiter": "|","inferSchema": "True"} for csv files. All the key/value pairs in this argument are passed as options to DataFrameReader, which is created using SparkSession.read. (Default value = {})
Returns
DataFrame
- Spark DataFrame loaded from the input path
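A minimal usage sketch for a pipe-delimited CSV; the file path is illustrative and an existing SparkSession named spark is assumed:

df = read_dataset(
    spark,
    file_path="data/input_data.csv",
    file_type="csv",
    file_configs={"header": "True", "delimiter": "|", "inferSchema": "True"},
)
df.printSchema()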
def recast_column(idf, list_of_cols, list_of_dtypes, print_impact=False)
-
This function is used to modify the datatype of columns. Multiple columns can be cast; however, the order in which they are passed is critical and must be consistent between list_of_cols and list_of_dtypes.
Parameters
idf
- Input Dataframe
list_of_cols
- List of columns to cast e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
list_of_dtypes
- List of corresponding datatypes e.g., ["type1","type2"]. Alternatively, datatypes can be specified in a string format, where they are separated by pipe delimiter "|" e.g., "type1|type2". The first element in list_of_cols is a column name, and the corresponding element in list_of_dtypes is its new datatype, such as "float", "integer", "long", "string", "double", "decimal", etc. Datatypes are case insensitive, e.g. "float" and "Float" are treated as the same.
print_impact
- True, False. When True, prints the schema before and after the operation for comparison. (Default value = False)
Returns
DataFrame
- Dataframe with revised datatypes
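A minimal usage sketch; df, the column names and target datatypes are illustrative:

odf = recast_column(df, ["age", "income"], ["integer", "double"], print_impact=True)
# equivalent pipe-delimited string form
odf = recast_column(df, "age|income", "integer|double")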
def recommend_type(spark, idf, list_of_cols='all', drop_cols=[], dynamic_threshold=0.01, static_threshold=100)
-
This function recommends the form (categorical or numerical) and datatype of columns. The cardinality of each column is measured, and then both dynamic_threshold and static_threshold are used to determine the recommended form and datatype for each column.
Parameters
spark
- Spark Session
idf
- Input Dataframe
list_of_cols
- List of columns to analyze e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter "|" e.g., "col1|col2". (Default value = 'all')
drop_cols
- List of columns to be dropped e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter "|" e.g., "col1|col2". It is most useful when coupled with the "all" value of list_of_cols, when all columns except a handful of them need to be considered. (Default value = [])
dynamic_threshold
- Cardinality threshold used to determine a column's recommended form and datatype. If a column's count of distinct (non-null) values is below its total (non-null) record count multiplied by dynamic_threshold, the column is recommended as categorical with string datatype; otherwise it is recommended as numerical with double datatype. recommend_type uses the minimum of this dynamic threshold and static_threshold as the effective cutoff. (Default value = 0.01)
static_threshold
- Cardinality threshold used to determine a column's recommended form and datatype. If a column's count of distinct (non-null) values is below static_threshold, the column is recommended as categorical with string datatype; otherwise it is recommended as numerical with double datatype. recommend_type uses the minimum of the dynamic threshold and static_threshold as the effective cutoff. (Default value = 100)
Returns
DataFrame
- Dataframe with attributes and their original/recommended form and datatype
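A minimal usage sketch, assuming a SparkSession named spark, an illustrative DataFrame df, and an illustrative identifier column "id" to exclude. Note that dynamic_threshold must be a float between 0 and 1 and static_threshold an int:

rec_df = recommend_type(
    spark, df, list_of_cols="all", drop_cols=["id"],
    dynamic_threshold=0.01, static_threshold=100,
)
rec_df.show(truncate=False)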
def rename_column(idf, list_of_cols, list_of_newcols, print_impact=False)
-
This function is used to rename columns of the input data. Multiple columns can be renamed; however, the order in which they are passed is critical and must be consistent between list_of_cols and list_of_newcols.
Parameters
idf
- Input Dataframe
list_of_cols
- List of old column names e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
list_of_newcols
- List of corresponding new column names e.g., ["newcol1","newcol2"]. Alternatively, new column names can be specified in a string format, where different column names are separated by pipe delimiter "|" e.g., "newcol1|newcol2". The first element in list_of_cols is an original column name, and the corresponding element in list_of_newcols is its new name.
print_impact
- True, False. When True, prints the column names before and after the operation for comparison. (Default value = False)
Returns
DataFrame
- Dataframe with revised column names
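A minimal usage sketch; df and the old/new column names are illustrative (the two lists must line up position by position):

odf = rename_column(df, ["cust_id", "dob"], ["customer_id", "date_of_birth"])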
def select_column(idf, list_of_cols, print_impact=False)
-
This function is used to select specific columns from the input data. It is executed using the select operation of the Spark DataFrame. It is advisable to use this function when the number of columns to select is smaller than the number of columns to drop; otherwise, delete_column is recommended.
Parameters
idf
- Input Dataframe
list_of_cols
- List of columns to select e.g., ["col1","col2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter “|” e.g., "col1|col2".
print_impact
- True, False. When True, prints the number of columns before and after the operation for comparison. (Default value = False)
Returns
DataFrame
- Dataframe with the selected columns
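A minimal usage sketch; df and the column names are illustrative:

odf = select_column(df, ["id", "age", "income"], print_impact=True)
# equivalent pipe-delimited string form
odf = select_column(df, "id|age|income")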
def write_dataset(idf, file_path, file_type, file_configs={}, column_order=[])
-
This function saves the Spark DataFrame to the user-provided output path. Like read_dataset, it is based on the generic save functionality of Spark SQL.
Parameters
idf
- Input Dataframe i.e. Spark DataFrame to be saved
file_path
- Path to output data (directory or filename). Compatible with local path and s3 path (when running in AWS environment).
file_type
- "csv", "parquet", "avro", "json". Avro data source requires an external package to run, which can be configured with spark-submit (–packages org.apache.spark:spark-avro_2.11:2.4.0).
file_configs
- This argument is passed in dictionary format as key/value pairs. Some of the potential keys are header, delimiter, mode, compression, repartition.
compression options - uncompressed, gzip (doesn't work with avro), snappy (only valid for parquet)
mode options - error (default), overwrite, append
repartition - None (automatic partitioning) or an integer value
e.g. {"header": "True", "delimiter": ",", "compression": "snappy", "mode": "overwrite", "repartition": "10"}.
All the key/value pairs except repartition and mode are passed as options to the DataFrameWriter, which is available through the Dataset.write operator. If the number of partitions requested through repartition is less than the existing number of DataFrame partitions, the coalesce operation is used instead of repartition, because coalesce avoids the full shuffle that makes repartition an expensive step.
column_order
- List of columns in the order in which the Dataframe is to be written. If None or [] is specified, the existing column order is applied. (Default value = [])
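A minimal usage sketch writing an illustrative DataFrame df as snappy-compressed parquet; the output path is illustrative:

write_dataset(
    df,
    file_path="output/curated_data",
    file_type="parquet",
    file_configs={"compression": "snappy", "mode": "overwrite", "repartition": "10"},
    column_order=df.columns,  # keep the existing column order explicitly
)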