
geospatial

The geospatial module supports transformation & calculation functions for geospatial fields, such as transforming between different formats, controlling the precision of geohash values, checking whether a location is inside a country, calculating centroids and the Radius of Gyration, and reverse-geocoding latitude-longitude pairs into addresses. Functions supported through this module are listed below:

- geo_format_latlon
- geo_format_cartesian
- geo_format_geohash
- location_distance
- geohash_precision_control
- location_in_polygon
- location_in_country
- centroid
- weighted_centroid
- rog_calculation
- reverse_geocoding

"""
The geospatial module supports transformation & calculation functions for geospatial fields, such as transforming
between different formats, controlling the precision of geohash values, checking whether a location is inside a country,
calculating centroids and the Radius of Gyration, and reverse-geocoding latitude-longitude pairs into addresses.
Functions supported through this module are listed below:
- geo_format_latlon
- geo_format_cartesian
- geo_format_geohash
- location_distance
- geohash_precision_control
- location_in_polygon
- location_in_country
- centroid
- weighted_centroid
- rog_calculation
- reverse_geocoding
"""


from math import sin, cos, sqrt, atan2, pi, radians
from loguru import logger
from pyspark.sql import functions as F
from pyspark.sql import types as T
import reverse_geocoder as rg
import warnings
from anovos.data_ingest.data_ingest import recast_column
from anovos.data_transformer.geo_utils import (
    EARTH_RADIUS,
    from_latlon_decimal_degrees,
    to_latlon_decimal_degrees,
    haversine_distance,
    vincenty_distance,
    euclidean_distance,
    f_point_in_polygons,
    point_in_country_approx,
)


def geo_format_latlon(
    idf,
    list_of_lat,
    list_of_lon,
    input_format,
    output_format,
    result_prefix=[],
    optional_configs={"geohash_precision": 8, "radius": EARTH_RADIUS},
    output_mode="append",
):
    """
    This function converts the input data's location columns from lat-lon format to the desired format
    specified by output_format. If output_mode is set to "replace", the original location columns will be dropped.
    For each lat-lon pair, "to_latlon_decimal_degrees" is called to convert it to decimal degrees, and then
    "from_latlon_decimal_degrees" is called to transform the decimal degrees to output_format.
    If output_format is "dd", "dms" or "radian", 2 new columns containing "_lat_" and "_lon_" in the column names will be created.
    If output_format is "cartesian", 3 new columns ending with "_x", "_y" and "_z" will be created.
    If output_format is "geohash", 1 new column ending with "_geohash" will be created.

    Parameters
    ----------
    idf
        Input Dataframe.
    list_of_lat
        List of columns representing latitude e.g., ["lat1","lat2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "lat1|lat2".
    list_of_lon
        List of columns representing longitude e.g., ["lon1","lon2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "lon1|lon2".
        list_of_lon must have the same length as list_of_lat such that i-th element of
        list_of_lat and i-th element of list_of_lon form a lat-lon pair to format.
    input_format
        "dd", "dms", "radian".
        "dd" represents latitude and longitude in decimal degrees.
        "dms" represents latitude and longitude in degrees minutes second.
        "radian" represents latitude and longitude in radians.
    output_format
        "dd", "dms", "radian", "cartesian", "geohash".
        "cartesian" represents the Cartesian coordinates of the point in three-dimensional space.
        "geohash" represents geocoded locations.
    result_prefix
        List of prefixes for the newly generated column names.
        Alternatively, prefixes can be specified in a string format,
        where different prefixes are separated by pipe delimiter "|" e.g., "pf1|pf2".
        result_prefix must have the same length as list_of_lat and list_of_lon.
        If it is empty, <lat>_<lon> will be used for each lat-lon pair.
        For example, list_of_lat is "lat1|lat2", list_of_lon is "lon1|lon2".
        Case 1: result_prefix = "L1|L2".
            If output_format is "dd", "dms" or "radian", new columns will be named as
            L1_lat_<output_format>, L1_lon_<output_format>, L2_lat_<output_format>, L2_lon_<output_format>.
            If output_format is "cartesian", new columns will be named as
            L1_x, L1_y, L1_z, L2_x, L2_y, L2_z.
            If output_format is "geohash", new columns will be named as
            L1_geohash and L2_geohash.
        Case 2: result_prefix = [].
            Prefixes "L1" and "L2" in above column names will be replaced by "lat1_lon1" and "lat2_lon2".
        (Default value = [])
    optional_configs
        The following keys can be used:
        - geohash_precision: precision of the resultant geohash. This key is only used when output_format
          is "geohash". (Default value = 8)
        - radius: radius of Earth. Necessary only when output_format is "cartesian".
          (Default value = EARTH_RADIUS)
    output_mode
        "replace", "append".
        "replace" option appends transformed column to the input dataset and removes the original ones.
        "append" option appends transformed column to the input dataset.
        (Default value = "append")

    Returns
    -------
    DataFrame
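
    Examples
    --------
    A minimal usage sketch; the column names "lat1" and "lon1" and the sample
    coordinates below are hypothetical:

    >>> from pyspark.sql import SparkSession
    >>> spark = SparkSession.builder.getOrCreate()
    >>> idf = spark.createDataFrame([(1.290, 103.852)], ["lat1", "lon1"])
    >>> odf = geo_format_latlon(
    ...     idf,
    ...     list_of_lat="lat1",
    ...     list_of_lon="lon1",
    ...     input_format="dd",
    ...     output_format="geohash",
    ...     optional_configs={"geohash_precision": 6},
    ... )

    With an empty result_prefix, this appends a new column "lat1_lon1_geohash"
    holding 6-character geohashes to the output DataFrame.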
    """
    geohash_precision = int(optional_configs.get("geohash_precision", 8))
    radius = optional_configs.get("radius", EARTH_RADIUS)

    if isinstance(list_of_lat, str):
        list_of_lat = [x.strip() for x in list_of_lat.split("|")]
    if isinstance(list_of_lon, str):
        list_of_lon = [x.strip() for x in list_of_lon.split("|")]
    if isinstance(result_prefix, str):
        result_prefix = [x.strip() for x in result_prefix.split("|")]

    if any(x not in idf.columns for x in list_of_lat + list_of_lon):
        raise TypeError("Invalid input for list_of_lat or list_of_lon")

    format_list = ["dd", "dms", "radian", "cartesian", "geohash"]
    if (input_format not in format_list[:3]) or (output_format not in format_list):
        raise TypeError("Invalid input for input_format or output_format")

    if len(list_of_lat) != len(list_of_lon):
        raise TypeError("list_of_lat and list_of_lon must have the same length")
    if result_prefix and (len(result_prefix) != len(list_of_lat)):
        raise TypeError(
            "result_prefix must have the same length as list_of_lat and list_of_lon if it is not empty"
        )

    f_to_latlon_dd = F.udf(
        lambda loc: to_latlon_decimal_degrees(loc, input_format, radius),
        T.ArrayType(T.FloatType()),
    )

    from_latlon_dd_ = lambda loc: from_latlon_decimal_degrees(
        loc, output_format, radius, geohash_precision
    )
    if output_format in ["dd", "radian", "cartesian"]:
        f_from_latlon_dd = F.udf(from_latlon_dd_, T.ArrayType(T.FloatType()))
    elif output_format == "dms":
        f_from_latlon_dd = F.udf(
            from_latlon_dd_, T.ArrayType(T.ArrayType(T.FloatType()))
        )
    elif output_format == "geohash":
        f_from_latlon_dd = F.udf(from_latlon_dd_, T.StringType())

    odf = idf
    for i, (lat, lon) in enumerate(zip(list_of_lat, list_of_lon)):
        col = result_prefix[i] if result_prefix else (lat + "_" + lon)

        odf = odf.withColumn(col + "_temp", F.array(lat, lon)).withColumn(
            col + "_" + output_format, f_from_latlon_dd(f_to_latlon_dd(col + "_temp"))
        )

        if output_format in ["dd", "dms", "radian"]:
            odf = (
                odf.withColumn(
                    col + "_lat_" + output_format, F.col(col + "_" + output_format)[0]
                )
                .withColumn(
                    col + "_lon_" + output_format, F.col(col + "_" + output_format)[1]
                )
                .drop(col + "_" + output_format)
            )

        if output_format == "cartesian":
            odf = (
                odf.withColumn(col + "_x", F.col(col + "_" + output_format)[0])
                .withColumn(col + "_y", F.col(col + "_" + output_format)[1])
                .withColumn(col + "_z", F.col(col + "_" + output_format)[2])
                .drop(col + "_" + output_format)
            )

        odf = odf.drop(col + "_temp")

        if output_mode == "replace":
            odf = odf.drop(lat, lon)

    return odf


def geo_format_cartesian(
    idf,
    list_of_x,
    list_of_y,
    list_of_z,
    output_format,
    result_prefix=[],
    optional_configs={"geohash_precision": 8, "radius": EARTH_RADIUS},
    output_mode="append",
):
    """
    This function converts the input data's location columns from cartesian format to the desired format specified by
    output_format. If output_mode is set to "replace", the original location columns will be dropped.
    For each x-y-z triplet, "to_latlon_decimal_degrees" is called to convert it to decimal degrees, and then
    "from_latlon_decimal_degrees" is called to transform the decimal degrees to output_format.
    If output_format is "dd", "dms" or "radian", 2 new columns containing "_lat_" and "_lon_" in the column names will be created.
    If output_format is "geohash", 1 new column ending with "_geohash" will be created.

    Parameters
    ----------
    idf
        Input Dataframe.
    list_of_x
        List of columns representing x axis values e.g., ["x1","x2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "x1|x2".
    list_of_y
        List of columns representing y axis values e.g., ["y1","y2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "y1|y2".
    list_of_z
        List of columns representing z axis values e.g., ["z1","z2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "z1|z2".
        list_of_x, list_of_y and list_of_z must have the same length such that the
        i-th element of 3 lists form an x-y-z pair to format.
    output_format
        "dd", "dms", "radian", "geohash"
        "dd" represents latitude and longitude in decimal degrees.
        "dms" represents latitude and longitude in degrees minutes second.
        "radian" represents latitude and longitude in radians.
        "geohash" represents geocoded locations.
    result_prefix
        List of prefixes for the newly generated column names.
        Alternatively, prefixes can be specified in a string format,
        where different prefixes are separated by pipe delimiter "|" e.g., "pf1|pf2".
        result_prefix must have the same length as list_of_x, list_of_y and list_of_z.
        If it is empty, <x>_<y>_<z> will be used for each x-y-z pair.
        For example, list_of_x is "x1|x2", list_of_y is "y1|y2" and list_of_z is "z1|z2"
        Case 1: result_prefix = "L1|L2".
            If output_format is "dd", "dms" or "radian", new columns will be named as
            L1_lat_<output_format>, L1_lon_<output_format>, L2_lat_<output_format>, L2_lon_<output_format>.
            If output_format is "geohash", new columns will be named as
            L1_geohash and L2_geohash.
        Case 2: result_prefix = [].
            Prefixes "L1" and "L2" in above column names will be replaced by "x1_y1_z1" and "x2_y2_z2".
        (Default value = [])
    optional_configs
        The following keys can be used:
        - geohash_precision: precision of the resultant geohash. This key is only used when output_format
          is "geohash". (Default value = 8)
        - radius: radius of Earth. (Default value = EARTH_RADIUS)
    output_mode
        "replace", "append".
        "replace" option appends transformed column to the input dataset and removes the original ones.
        "append" option appends transformed column to the input dataset.
        (Default value = "append")

    Returns
    -------
    DataFrame
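
    Examples
    --------
    A minimal usage sketch, assuming `idf` is a Spark DataFrame with hypothetical
    Cartesian coordinate columns "x1", "y1" and "z1":

    >>> odf = geo_format_cartesian(
    ...     idf,
    ...     list_of_x="x1",
    ...     list_of_y="y1",
    ...     list_of_z="z1",
    ...     output_format="dd",
    ... )

    With an empty result_prefix, this appends the columns "x1_y1_z1_lat_dd" and
    "x1_y1_z1_lon_dd" to the output DataFrame.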
    """

    geohash_precision = int(optional_configs.get("geohash_precision", 8))
    radius = optional_configs.get("radius", EARTH_RADIUS)

    if isinstance(list_of_x, str):
        list_of_x = [x.strip() for x in list_of_x.split("|")]
    if isinstance(list_of_y, str):
        list_of_y = [x.strip() for x in list_of_y.split("|")]
    if isinstance(list_of_z, str):
        list_of_z = [x.strip() for x in list_of_z.split("|")]
    if isinstance(result_prefix, str):
        result_prefix = [x.strip() for x in result_prefix.split("|")]

    if any(x not in idf.columns for x in list_of_x + list_of_y + list_of_z):
        raise TypeError("Invalid input for list_of_x or list_of_y or list_of_z")

    format_list = ["dd", "dms", "radian", "geohash"]
    if output_format not in format_list:
        raise TypeError("Invalid input for output_format")

    if len({len(list_of_x), len(list_of_y), len(list_of_z)}) != 1:
        raise TypeError("list_of_x, list_of_y and list_of_z must have the same length")
    if result_prefix and (len(result_prefix) != len(list_of_x)):
        raise TypeError(
            "result_prefix must have the same length as list_of_x, list_of_y and list_of_y if it is not empty"
        )

    f_to_latlon_dd = F.udf(
        lambda loc: to_latlon_decimal_degrees(loc, "cartesian", radius),
        T.ArrayType(T.FloatType()),
    )

    from_latlon_dd_ = lambda loc: from_latlon_decimal_degrees(
        loc, output_format, radius, geohash_precision
    )
    if output_format in ["dd", "radian"]:
        f_from_latlon_dd = F.udf(from_latlon_dd_, T.ArrayType(T.FloatType()))
    elif output_format == "dms":
        f_from_latlon_dd = F.udf(
            from_latlon_dd_, T.ArrayType(T.ArrayType(T.FloatType()))
        )
    elif output_format == "geohash":
        f_from_latlon_dd = F.udf(from_latlon_dd_, T.StringType())

    odf = idf
    for i, (x, y, z) in enumerate(zip(list_of_x, list_of_y, list_of_z)):
        col = result_prefix[i] if result_prefix else (x + "_" + y + "_" + z)

        odf = odf.withColumn(col + "_temp", F.array(x, y, z)).withColumn(
            col + "_" + output_format, f_from_latlon_dd(f_to_latlon_dd(col + "_temp"))
        )

        if output_format in ["dd", "dms", "radian"]:
            odf = (
                odf.withColumn(
                    col + "_lat_" + output_format, F.col(col + "_" + output_format)[0]
                )
                .withColumn(
                    col + "_lon_" + output_format, F.col(col + "_" + output_format)[1]
                )
                .drop(col + "_" + output_format)
            )

        odf = odf.drop(col + "_temp")

        if output_mode == "replace":
            odf = odf.drop(x, y, z)

    return odf


def geo_format_geohash(
    idf,
    list_of_geohash,
    output_format,
    result_prefix=[],
    optional_configs={"radius": EARTH_RADIUS},
    output_mode="append",
):
    """
    This function converts the input data's location columns from geohash format to the desired format specified by
    output_format. If output_mode is set to "replace", the original location columns will be dropped.
    For each geohash column, "to_latlon_decimal_degrees" is called to convert it to decimal degrees, and then
    "from_latlon_decimal_degrees" is called to transform the decimal degrees to output_format.
    If output_format is "dd", "dms" or "radian", 2 new columns containing "_lat_" and "_lon_" in the column names will be created.
    If output_format is "cartesian", 3 new columns ending with "_x", "_y" and "_z" will be created.

    Parameters
    ----------
    idf
        Input Dataframe.
    list_of_geohash
        List of columns representing geohash e.g., ["gh1","gh2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "gh1|gh2".
    output_format
        "dd", "dms", "radian", "cartesian"
        "dd" represents latitude and longitude in decimal degrees.
        "dms" represents latitude and longitude in degrees minutes second.
        "radian" represents latitude and longitude in radians.
        "cartesian" represents the Cartesian coordinates of the point in three-dimensional space.
    result_prefix
        List of prefixes for the newly generated column names.
        Alternatively, prefixes can be specified in a string format,
        where different prefixes are separated by pipe delimiter "|" e.g., "pf1|pf2".
        result_prefix must have the same length as list_of_geohash.
        If it is empty, <geohash column name> will be used as the prefix for transformed columns.
        For example, list_of_geohash is "gh1|gh2".
        Case 1: result_prefix = "L1|L2".
            If output_format is "dd", "dms" or "radian", new columns will be named as
            L1_lat_<output_format>, L1_lon_<output_format>, L2_lat_<output_format>, L2_lon_<output_format>.
            If output_format is "cartesian", new columns will be named as
            L1_x, L1_y, L1_z, L2_x, L2_y, L2_z.
        Case 2: result_prefix = [].
            Prefixes "L1" and "L2" in above column names will be replaced by "gh1" and "gh2".
        (Default value = [])
    optional_configs
        The following key can be used:
        - radius: radius of Earth. Necessary only when output_format is "cartesian".
          (Default value = EARTH_RADIUS)
    output_mode
        "replace", "append".
        "replace" option appends transformed column to the input dataset and removes the original ones.
        "append" option appends transformed column to the input dataset.
        (Default value = "append")

    Returns
    -------
    DataFrame
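
    Examples
    --------
    A minimal usage sketch, assuming `idf` is a Spark DataFrame with a hypothetical
    geohash column "gh1":

    >>> odf = geo_format_geohash(idf, list_of_geohash="gh1", output_format="dd")

    With an empty result_prefix, this appends the columns "gh1_lat_dd" and
    "gh1_lon_dd" to the output DataFrame.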
    """

    radius = optional_configs.get("radius", EARTH_RADIUS)

    if isinstance(list_of_geohash, str):
        list_of_geohash = [x.strip() for x in list_of_geohash.split("|")]
    if isinstance(result_prefix, str):
        result_prefix = [x.strip() for x in result_prefix.split("|")]

    if any(x not in idf.columns for x in list_of_geohash):
        raise TypeError("Invalid input for list_of_geohash")

    format_list = ["dd", "dms", "radian", "cartesian"]
    if output_format not in format_list:
        raise TypeError("Invalid input for output_format")

    if result_prefix and (len(result_prefix) != len(list_of_geohash)):
        raise TypeError(
            "result_prefix must have the same length as list_of_geohash if it is not empty"
        )

    f_to_latlon_dd = F.udf(
        lambda loc: to_latlon_decimal_degrees(loc, "geohash", radius),
        T.ArrayType(T.FloatType()),
    )

    from_latlon_dd_ = lambda loc: from_latlon_decimal_degrees(
        loc, output_format, radius
    )
    if output_format in ["dd", "radian", "cartesian"]:
        f_from_latlon_dd = F.udf(from_latlon_dd_, T.ArrayType(T.FloatType()))
    elif output_format == "dms":
        f_from_latlon_dd = F.udf(
            from_latlon_dd_, T.ArrayType(T.ArrayType(T.FloatType()))
        )

    odf = idf
    for i, geohash in enumerate(list_of_geohash):
        col = result_prefix[i] if result_prefix else geohash

        odf = odf.withColumn(
            col + "_" + output_format, f_from_latlon_dd(f_to_latlon_dd(geohash))
        )

        if output_format in ["dd", "dms", "radian"]:
            odf = (
                odf.withColumn(
                    col + "_lat_" + output_format, F.col(col + "_" + output_format)[0]
                )
                .withColumn(
                    col + "_lon_" + output_format, F.col(col + "_" + output_format)[1]
                )
                .drop(col + "_" + output_format)
            )

        if output_format == "cartesian":
            odf = (
                odf.withColumn(col + "_x", F.col(col + "_" + output_format)[0])
                .withColumn(col + "_y", F.col(col + "_" + output_format)[1])
                .withColumn(col + "_z", F.col(col + "_" + output_format)[2])
                .drop(col + "_" + output_format)
            )

        if output_mode == "replace":
            odf = odf.drop(geohash)

    return odf


def location_distance(
    idf,
    list_of_cols_loc1,
    list_of_cols_loc2,
    loc_format="dd",
    result_prefix="",
    distance_type="haversine",
    unit="m",
    optional_configs={"radius": EARTH_RADIUS, "vincenty_model": "WGS-84"},
    output_mode="append",
):
    """
    This function calculates the distance between 2 locations; the distance formula is determined by distance_type.
    If distance_type = "vincenty", then loc_format should be "dd", and list_of_cols_loc1, list_of_cols_loc2
    should be in [lat1, lon1] and [lat2, lon2] format respectively. The "vincenty_distance" function will be called to
    calculate the distance.
    If distance_type = "haversine", then loc_format should be "radian", and list_of_cols_loc1, list_of_cols_loc2
    should be in [lat1, lon1] and [lat2, lon2] format respectively. The "haversine_distance" function will be called to
    calculate the distance.
    If distance_type = "euclidean", then loc_format should be "cartesian", and list_of_cols_loc1, list_of_cols_loc2
    should be in [x1, y1, z1] and [x2, y2, z2] format respectively. The "euclidean_distance" function will be called to
    calculate the distance.
    If loc_format does not match the format required by distance_type, the necessary conversion of location columns is performed
    with the help of the "geo_format_latlon", "geo_format_cartesian" and "geo_format_geohash" functions.

    Parameters
    ----------
    idf
        Input Dataframe.
    list_of_cols_loc1
        List of columns to express the first location e.g., ["lat1","lon1"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "lat1|lon1".
    list_of_cols_loc2
        List of columns to express the second location e.g., ["lat2","lon2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "lat2|lon2".
    loc_format
        "dd", "dms", "radian", "cartesian", "geohash". (Default value = "dd")
    result_prefix
        Prefix for the newly generated column. It must be a string or a list with one element.
        If it is empty, <list_of_cols_loc1 joined by '_'>_<list_of_cols_loc2 joined by '_'>
        will be used as the prefix.
        For example, list_of_cols_loc1 is "lat1|lon1", list_of_cols_loc2 is "lat2|lon2".
        Case 1: result_prefix = "L1_L2": the new column will be named as L1_L2_distance.
        Case 2: result_prefix = []: the new column will be named as lat1_lon1_lat2_lon2_distance.
        (Default value = '')
    distance_type
        "vincenty", "haversine", "euclidean". (Default value = "haversine")
        "vincenty" option calculates the distance between two points on the surface of a spheroid.
        "haversine" option calculates the great-circle distance between two points on a sphere.
        "euclidean" option calculates the length of the line segment between two points.
    unit
        "m", "km".
        Unit of the result. (Default value = "m")
    optional_configs
        The following keys can be used:
        - radius: radius of Earth. Used when converting loc_format to the required format and when
          distance_type is "haversine". (Default value = EARTH_RADIUS)
        - vincenty_model: The ellipsoidal model to use. Supported values: "WGS-84", "GRS-80", "Airy (1830)",
          "Intl 1924", "Clarke (1880)", "GRS-67". For more information, please refer to geopy.distance.ELLIPSOIDS.
          (Default value = "WGS-84")
    output_mode
        "replace", "append".
        "replace" option replaces original columns with transformed column.
        "append" option appends the transformed column to the input dataset with name "<loc1>_<loc2>_distance".
        (Default value = "append")

    Returns
    -------
    DataFrame
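
    Examples
    --------
    A minimal usage sketch, assuming `idf` is a Spark DataFrame with hypothetical
    columns "lat1", "lon1", "lat2" and "lon2" in decimal degrees:

    >>> odf = location_distance(
    ...     idf,
    ...     list_of_cols_loc1="lat1|lon1",
    ...     list_of_cols_loc2="lat2|lon2",
    ...     loc_format="dd",
    ...     distance_type="haversine",
    ...     unit="km",
    ... )

    Because loc_format ("dd") differs from the format required by "haversine"
    ("radian"), the coordinates are converted internally; the result is appended
    as a new column "lat1_lon1_lat2_lon2_distance" expressed in kilometers.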
    """

    radius = optional_configs.get("radius", EARTH_RADIUS)
    vincenty_model = optional_configs.get("vincenty_model", "WGS-84")

    if isinstance(list_of_cols_loc1, str):
        list_of_cols_loc1 = [x.strip() for x in list_of_cols_loc1.split("|")]
    if isinstance(list_of_cols_loc2, str):
        list_of_cols_loc2 = [x.strip() for x in list_of_cols_loc2.split("|")]

    if isinstance(result_prefix, list):
        if len(result_prefix) > 1:
            raise TypeError(
                "If result_prefix is a list, it can contain maximally 1 element"
            )
        elif len(result_prefix) == 1:
            result_prefix = result_prefix[0]

    if any(i not in idf.columns for i in list_of_cols_loc1 + list_of_cols_loc2):
        raise TypeError("Invalid input for list_of_cols_loc1 or list_of_cols_loc2")

    if distance_type not in ["vincenty", "haversine", "euclidean"]:
        raise TypeError("Invalid input for distance_type")

    if loc_format not in ["dd", "dms", "radian", "cartesian", "geohash"]:
        raise TypeError("Invalid input for loc_format")

    format_mapping = {"vincenty": "dd", "haversine": "radian", "euclidean": "cartesian"}
    format_required = format_mapping[distance_type]

    if loc_format != format_required:
        if loc_format in ["dd", "dms", "radian"]:
            idf = geo_format_latlon(
                idf,
                list_of_lat=[list_of_cols_loc1[0], list_of_cols_loc2[0]],
                list_of_lon=[list_of_cols_loc1[1], list_of_cols_loc2[1]],
                input_format=loc_format,
                output_format=format_required,
                result_prefix=["temp_loc1", "temp_loc2"],
                optional_configs={"radius": radius},
                output_mode="append",
            )

        elif loc_format == "cartesian":
            idf = geo_format_cartesian(
                idf,
                list_of_x=[list_of_cols_loc1[0], list_of_cols_loc2[0]],
                list_of_y=[list_of_cols_loc1[1], list_of_cols_loc2[1]],
                list_of_z=[list_of_cols_loc1[2], list_of_cols_loc2[2]],
                output_format=format_required,
                result_prefix=["temp_loc1", "temp_loc2"],
                optional_configs={"radius": radius},
                output_mode="append",
            )

        elif loc_format == "geohash":
            idf = geo_format_geohash(
                idf,
                list_of_geohash=[list_of_cols_loc1[0], list_of_cols_loc2[0]],
                output_format=format_required,
                result_prefix=["temp_loc1", "temp_loc2"],
                optional_configs={"radius": radius},
                output_mode="append",
            )

        if format_required == "dd":
            loc1, loc2 = ["temp_loc1_lat_dd", "temp_loc1_lon_dd"], [
                "temp_loc2_lat_dd",
                "temp_loc2_lon_dd",
            ]

        elif format_required == "radian":
            loc1, loc2 = ["temp_loc1_lat_radian", "temp_loc1_lon_radian"], [
                "temp_loc2_lat_radian",
                "temp_loc2_lon_radian",
            ]

        elif format_required == "cartesian":
            loc1, loc2 = ["temp_loc1_x", "temp_loc1_y", "temp_loc1_z"], [
                "temp_loc2_x",
                "temp_loc2_y",
                "temp_loc2_z",
            ]

        idf = (
            idf.withColumn("temp_loc1", F.array(*loc1))
            .withColumn("temp_loc2", F.array(*loc2))
            .drop(*(loc1 + loc2))
        )
    else:
        idf = idf.withColumn("temp_loc1", F.array(*list_of_cols_loc1)).withColumn(
            "temp_loc2", F.array(*list_of_cols_loc2)
        )

    if distance_type == "vincenty":
        compute_distance = lambda x1, x2: vincenty_distance(
            x1, x2, unit, vincenty_model
        )
    elif distance_type == "haversine":
        compute_distance = lambda x1, x2: haversine_distance(
            x1, x2, "radian", unit, radius
        )
    else:
        compute_distance = lambda x1, x2: euclidean_distance(x1, x2, unit)

    f_compute_distance = F.udf(compute_distance, T.FloatType())

    col_prefix = (
        result_prefix
        if result_prefix
        else "_".join(list_of_cols_loc1) + "_" + "_".join(list_of_cols_loc2)
    )
    odf = idf.withColumn(
        col_prefix + "_distance", f_compute_distance("temp_loc1", "temp_loc2")
    ).drop("temp_loc1", "temp_loc2")

    if output_mode == "replace":
        odf = odf.drop(*(list_of_cols_loc1 + list_of_cols_loc2))

    return odf


def geohash_precision_control(
    idf, list_of_geohash, output_precision=8, km_max_error=None, output_mode="append"
):
    """
    This function controls the precision of input data's geohash columns.

    Parameters
    ----------
    idf
        Input Dataframe.
    list_of_geohash
        List of columns in geohash format e.g., ["gh1","gh2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "gh1|gh2".
    output_precision
        Precision of the transformed geohash in the output dataframe. (Default value = 8)
    km_max_error
        Maximum permissible error in kilometers. If km_max_error is specified, output_precision
        will be ignored and km_max_error will be mapped to an output_precision according to the
        following dictionary: {2500: 1, 630: 2, 78: 3, 20: 4, 2.4: 5, 0.61: 6, 0.076: 7,
        0.019: 8, 0.0024: 9, 0.00060: 10, 0.000074: 11}. (Default value = None)
    output_mode
        "replace", "append".
        "replace" option replaces original columns with transformed column.
        "append" option appends the transformed column to the input dataset
        with postfix "_precision_<output_precision>". (Default value = "append")

    Returns
    -------
    DataFrame
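
    Examples
    --------
    A minimal usage sketch, assuming `idf` is a Spark DataFrame with a hypothetical
    geohash column "gh1" (e.g. of precision 8):

    >>> odf = geohash_precision_control(idf, list_of_geohash="gh1", output_precision=6)

    This appends a column "gh1_precision_6" containing the first 6 characters of
    each geohash. Passing km_max_error=1.0 instead would also map to an output
    precision of 6 (1.0 >= 0.61).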
    """

    if isinstance(list_of_geohash, str):
        list_of_geohash = [x.strip() for x in list_of_geohash.split("|")]

    if any(x not in idf.columns for x in list_of_geohash):
        raise TypeError("Invalid input for list_of_geohash")

    error_precision_mapping = {
        2500: 1,
        630: 2,
        78: 3,
        20: 4,
        2.4: 5,
        0.61: 6,
        0.076: 7,
        0.019: 8,
        0.0024: 9,
        0.00060: 10,
        0.000074: 11,
    }
    if km_max_error is not None:
        output_precision = 12
        for key, val in error_precision_mapping.items():
            if km_max_error >= key:
                output_precision = val
                break
    output_precision = int(output_precision)
    logger.info(
        "Precision of the output geohashes will be capped at "
        + str(output_precision)
        + "."
    )
    odf = idf
    for i, geohash in enumerate(list_of_geohash):
        if output_mode == "replace":
            col_name = geohash
        else:
            col_name = geohash + "_precision_" + str(output_precision)
        odf = odf.withColumn(col_name, F.substring(geohash, 1, output_precision))

    return odf


def location_in_polygon(
    idf, list_of_lat, list_of_lon, polygon, result_prefix=[], output_mode="append"
):
    """
    This function checks whether each lat-lon pair is inside a GeoJSON object. The following types of GeoJSON objects
    are supported by this function: Polygon, MultiPolygon, Feature or FeatureCollection.

    Parameters
    ----------
    idf
        Input Dataframe.
    list_of_lat
        List of columns representing latitude e.g., ["lat1","lat2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "lat1|lat2".
    list_of_lon
        List of columns representing longitude e.g., ["lon1","lon2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "lon1|lon2".
        list_of_lon must have the same length as list_of_lat such that i-th element of
        list_of_lat and i-th element of list_of_lon form a lat-lon pair to check.
    polygon
        The following types of GeoJSON objects are supported: Polygon, MultiPolygon, Feature or FeatureCollection
    result_prefix
        List of prefixes for the newly generated column names.
        Alternatively, prefixes can be specified in a string format,
        where different prefixes are separated by pipe delimiter "|" e.g., "pf1|pf2".
        result_prefix must have the same length as list_of_lat and list_of_lon.
        If it is empty, <lat>_<lon> will be used for each lat-lon pair.
        For example, list_of_lat is "lat1|lat2", list_of_lon is "lon1|lon2".
        Case 1: result_prefix = "L1|L2".
            New columns will be named as L1_in_poly and L2_in_poly.
        Case 2: result_prefix = [].
            New columns will be named as lat1_lon1_in_poly and lat2_lon2_in_poly.
        (Default value = [])
    output_mode
        "replace", "append".
        "replace" option appends transformed column to the input dataset and removes the original ones.
        "append" option appends transformed column to the input dataset.
        (Default value = "append")

    Returns
    -------
    DataFrame
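
    Examples
    --------
    A minimal usage sketch, assuming `idf` is a Spark DataFrame with hypothetical
    columns "lat1" and "lon1" in decimal degrees, checked against a small GeoJSON
    Polygon (coordinates are listed as [lon, lat] pairs):

    >>> polygon = {
    ...     "type": "Polygon",
    ...     "coordinates": [
    ...         [[100.0, 0.0], [101.0, 0.0], [101.0, 1.0], [100.0, 1.0], [100.0, 0.0]]
    ...     ],
    ... }
    >>> odf = location_in_polygon(idf, "lat1", "lon1", polygon)

    With an empty result_prefix, this appends a column "lat1_lon1_in_poly"
    indicating whether each point falls inside the polygon.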
    """

    if isinstance(list_of_lat, str):
        list_of_lat = [x.strip() for x in list_of_lat.split("|")]
    if isinstance(list_of_lon, str):
        list_of_lon = [x.strip() for x in list_of_lon.split("|")]
    if isinstance(result_prefix, str):
        result_prefix = [x.strip() for x in result_prefix.split("|")]

    if any(x not in idf.columns for x in list_of_lat + list_of_lon):
        raise TypeError("Invalid input for list_of_lat or list_of_lon")

    if len(list_of_lat) != len(list_of_lon):
        raise TypeError("list_of_lat and list_of_lon must have the same length")
    if result_prefix and (len(result_prefix) != len(list_of_lat)):
        raise TypeError(
            "result_prefix must have the same length as list_of_lat and list_of_lon if it is not empty"
        )

    if "coordinates" in polygon.keys():
        polygon_list = polygon["coordinates"]
        if polygon["type"] == "Polygon":
            polygon_list = [polygon_list]
    elif "geometry" in polygon.keys():
        polygon_list = [polygon["geometry"]["coordinates"]]
    elif "features" in polygon.keys():
        polygon_list = []
        for poly in polygon["features"]:
            polygon_list.append(poly["geometry"]["coordinates"])

    odf = idf
    for i, (lat, lon) in enumerate(zip(list_of_lat, list_of_lon)):
        col = result_prefix[i] if result_prefix else (lat + "_" + lon)
        odf = odf.withColumn(
            col + "_in_poly", f_point_in_polygons(polygon_list)(F.col(lon), F.col(lat))
        )

        if output_mode == "replace":
            odf = odf.drop(lat, lon)

    return odf


def location_in_country(
    spark,
    idf,
    list_of_lat,
    list_of_lon,
    country,
    country_shapefile_path="",
    method_type="approx",
    result_prefix=[],
    output_mode="append",
):
    """
    This function checks whether each lat-lon pair is inside a country. Two ways of checking are supported: "approx" (using the
    bounding box of a country) and "exact" (using the shapefile of a country).

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe.
    list_of_lat
        List of columns representing latitude e.g., ["lat1","lat2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "lat1|lat2".
    list_of_lon
        List of columns representing longitude e.g., ["lon1","lon2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "lon1|lon2".
        list_of_lon must have the same length as list_of_lat such that i-th element of
        list_of_lat and i-th element of list_of_lon form a lat-lon pair to check.
    country
        The Alpha-2 country code.
    country_shapefile_path
        Path to a GeoJSON file with a FeatureCollection object containing polygons for each country.
        Required only when method_type is "exact". One example file, country_polygons.geojson, can be
        downloaded from the Anovos GitHub repository: https://github.com/anovos/anovos/tree/main/data/
        (Default value = "")
    method_type
        "approx", "exact".
        "approx" uses the bounding box of a country to estimate whether a location is inside the country
        "exact" uses the shapefile of a country to calculate whether a location is inside the country
    result_prefix
        List of prefixes for the newly generated column names.
        Alternatively, prefixes can be specified in a string format,
        where different prefixes are separated by pipe delimiter "|" e.g., "pf1|pf2".
        result_prefix must have the same length as list_of_lat and list_of_lon.
        If it is empty, <lat>_<lon> will be used for each lat-lon pair.
        For example, list_of_lat is "lat1|lat2", list_of_lon is "lon1|lon2".
        Case 1: result_prefix = "L1|L2", country="US".
            New columns will be named as L1_in_US_<method_type> and L2_in_US_<method_type>.
        Case 2: result_prefix = [], country="US".
            New columns will be named as lat1_lon1_in_US_<method_type> and lat2_lon2_in_US_<method_type>.
        (Default value = [])
    output_mode
        "replace", "append".
        "replace" option appends transformed column to the input dataset and removes the original ones.
        "append" option appends transformed column to the input dataset.
        (Default value = "append")

    Returns
    -------
    DataFrame
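
    Examples
    --------
    A minimal usage sketch, assuming `spark` is an active SparkSession and `idf` is
    a Spark DataFrame with hypothetical columns "lat1" and "lon1" in decimal degrees:

    >>> odf = location_in_country(
    ...     spark, idf, list_of_lat="lat1", list_of_lon="lon1",
    ...     country="US", method_type="approx",
    ... )

    With an empty result_prefix, this appends a column "lat1_lon1_in_US_approx".
    Using method_type="exact" additionally requires country_shapefile_path to point
    to a GeoJSON file such as country_polygons.geojson.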
    """

    if isinstance(list_of_lat, str):
        list_of_lat = [x.strip() for x in list_of_lat.split("|")]
    if isinstance(list_of_lon, str):
        list_of_lon = [x.strip() for x in list_of_lon.split("|")]
    if isinstance(result_prefix, str):
        result_prefix = [x.strip() for x in result_prefix.split("|")]

    if any(x not in idf.columns for x in list_of_lat + list_of_lon):
        raise TypeError("Invalid input for list_of_lat or list_of_lon")

    if len(list_of_lat) != len(list_of_lon):
        raise TypeError("list_of_lat and list_of_lon must have the same length")
    if result_prefix and (len(result_prefix) != len(list_of_lat)):
        raise TypeError(
            "result_prefix must have the same length as list_of_lat and list_of_lon if it is not empty"
        )
    if method_type not in ("approx", "exact"):
        raise TypeError("Invalid input for method_type.")

    f_point_in_country_approx = F.udf(point_in_country_approx, T.IntegerType())

    if method_type == "exact":

        def zip_feats(x, y):
            """zipping two features (in list form) elementwise"""
            return zip(x, y)

        f_zip_feats = F.udf(
            zip_feats,
            T.ArrayType(
                T.StructType(
                    [
                        T.StructField("first", T.StringType()),
                        T.StructField(
                            "second",
                            T.ArrayType(
                                T.ArrayType(T.ArrayType(T.ArrayType(T.DoubleType())))
                            ),
                        ),
                    ]
                )
            ),
        )

        geo_data = spark.read.json(country_shapefile_path, multiLine=True).withColumn(
            "tmp",
            f_zip_feats("features.properties.ISO_A2", "features.geometry.coordinates"),
        )
        polygon_list = (
            geo_data.select(F.explode(F.col("tmp")).alias("country_coord"))
            .withColumn("country_code", F.col("country_coord").getItem("first"))
            .withColumn("coordinates", F.col("country_coord").getItem("second"))
            .where(F.col("country_code") == country)
            .select("coordinates")
            .rdd.map(lambda x: x[0])
            .collect()[0]
        )
        print("No. of polygon: " + str(len(polygon_list)))

        min_lon, min_lat = polygon_list[0][0][0]
        max_lon, max_lat = polygon_list[0][0][0]
        for polygon in polygon_list:
            exterior = polygon[0]
            for loc in exterior:
                if loc[0] < min_lon:
                    min_lon = loc[0]
                elif loc[0] > max_lon:
                    max_lon = loc[0]

                if loc[1] < min_lat:
                    min_lat = loc[1]
                elif loc[1] > max_lat:
                    max_lat = loc[1]

    odf = idf
    for i, (lat, lon) in enumerate(zip(list_of_lat, list_of_lon)):
        col = result_prefix[i] if result_prefix else (lat + "_" + lon)

        if method_type == "exact":
            odf = odf.withColumn(
                col + "_in_" + country + "_exact",
                f_point_in_polygons(
                    polygon_list, [min_lon, min_lat], [max_lon, max_lat]
                )(F.col(lon), F.col(lat)),
            )
        else:
            odf = odf.withColumn(
                col + "_in_" + country + "_approx",
                f_point_in_country_approx(F.col(lat), F.col(lon), F.lit(country)),
            )

        if output_mode == "replace":
            odf = odf.drop(lat, lon)

    return odf


def centroid(idf, lat_col, long_col, id_col=None):
    """
    This function calculates the centroid of a given DataFrame using lat-long pairs based on its identifier column (if applicable)

    Parameters
    ----------
    idf
        Input Dataframe
    lat_col
        Column in the input DataFrame that contains latitude data
    long_col
        Column in the input DataFrame that contains longitude data
    id_col
        Column in the input DataFrame that contains identifier for the DataFrame (Default is None)
        If id_col=None, the function will calculate the centroid of latitude and longitude for the whole dataset.
    Returns
    -------
    odf : DataFrame
        Output DataFrame, which contains lat_centroid and long_centroid and identifier (if applicable)
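
    Examples
    --------
    A minimal usage sketch, assuming `idf` is a Spark DataFrame with hypothetical
    columns "id", "lat" and "lon" in decimal degrees:

    >>> odf = centroid(idf, lat_col="lat", long_col="lon", id_col="id")

    The output contains one row per "id" with the columns "lat_centroid" and
    "lon_centroid".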
    """
    if id_col not in idf.columns and id_col:
        raise TypeError("Invalid input for id_col")
    if lat_col not in idf.columns:
        raise TypeError("Invalid input for lat_col")
    if long_col not in idf.columns:
        raise TypeError("Invalid input for long_col")

    idf = recast_column(
        idf, list_of_cols=[lat_col, long_col], list_of_dtypes=["double", "double"]
    )

    # Warn (and drop) only if rows actually contain null latitude/longitude values
    idf_nonnull = idf.dropna(subset=(lat_col, long_col))
    if idf_nonnull.count() < idf.count():
        warnings.warn(
            "Rows dropped due to null value in longitude and/or latitude values"
        )
    idf = idf_nonnull

    if not idf.where(
        (F.col(lat_col) > 90)
        | (F.col(lat_col) < -90)
        | (F.col(long_col) > 180)
        | (F.col(long_col) < -180)
    ).rdd.isEmpty():
        warnings.warn(
            "Rows dropped due to longitude and/or latitude values being out of the valid range"
        )
        idf = idf.where(
            (F.col(lat_col) <= 90)
            & (F.col(lat_col) >= -90)
            & (F.col(long_col) <= 180)
            & (F.col(long_col) >= -180)
        )

    if idf.rdd.isEmpty():
        warnings.warn(
            "No reverse_geocoding Computation - No valid latitude/longitude row(s) to compute"
        )
        return idf

    def degree_to_radian(deg):
        return deg * pi / 180

    f_degree_to_radian = F.udf(degree_to_radian, T.FloatType())

    idf_rad = (
        idf.withColumn("lat_rad", f_degree_to_radian(lat_col))
        .withColumn("long_rad", f_degree_to_radian(long_col))
        .withColumn("x", F.cos("lat_rad") * F.cos("long_rad"))
        .withColumn("y", F.cos("lat_rad") * F.sin("long_rad"))
        .withColumn("z", F.sin("lat_rad"))
    )
    if id_col:
        idf_groupby = idf_rad.groupby(id_col).agg(
            F.sum("x").alias("x_group"),
            F.sum("y").alias("y_group"),
            F.sum("z").alias("z_group"),
        )

        odf = (
            idf_groupby.withColumn(
                "hyp",
                F.sqrt(
                    F.col("x_group") * F.col("x_group")
                    + F.col("y_group") * F.col("y_group")
                ),
            )
            .withColumn(
                lat_col + "_centroid",
                F.atan2(F.col("z_group"), F.col("hyp")) * 180 / pi,
            )
            .withColumn(
                long_col + "_centroid",
                F.atan2(F.col("y_group"), F.col("x_group")) * 180 / pi,
            )
            .select(id_col, lat_col + "_centroid", long_col + "_centroid")
        )
    else:
        idf_groupby = idf_rad.groupby().agg(
            F.sum("x").alias("x_group"),
            F.sum("y").alias("y_group"),
            F.sum("z").alias("z_group"),
        )

        odf = (
            idf_groupby.withColumn(
                "hyp",
                F.sqrt(
                    F.col("x_group") * F.col("x_group")
                    + F.col("y_group") * F.col("y_group")
                ),
            )
            .withColumn(
                lat_col + "_centroid",
                F.atan2(F.col("z_group"), F.col("hyp")) * 180 / pi,
            )
            .withColumn(
                long_col + "_centroid",
                F.atan2(F.col("y_group"), F.col("x_group")) * 180 / pi,
            )
            .select(lat_col + "_centroid", long_col + "_centroid")
        )
    return odf


def weighted_centroid(idf, id_col, lat_col, long_col):
    """
    This function calculates the weighted centroid of a given DataFrame using lat-long pairs based on its identifier column

    Parameters
    ----------
    idf
        Input Dataframe
    lat_col
        Column in the input DataFrame that contains latitude data
    long_col
        Column in the input DataFrame that contains longitude data
    id_col
        Column in the input DataFrame that contains identifier for the DataFrame
    Returns
    -------
    odf : DataFrame
        Output DataFrame, which contains weighted lat_centroid and long_centroid and identifier
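
    Examples
    --------
    A minimal usage sketch, assuming `idf` is a Spark DataFrame with hypothetical
    columns "id", "lat" and "lon" in decimal degrees:

    >>> odf = weighted_centroid(idf, id_col="id", lat_col="lat", long_col="lon")

    The output contains the "id" column together with "lat_centroid" and
    "lon_centroid" columns holding the weighted centroid.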
    """
    if id_col not in idf.columns:
        raise TypeError("Invalid input for id_col")
    if lat_col not in idf.columns:
        raise TypeError("Invalid input for lat_col")
    if long_col not in idf.columns:
        raise TypeError("Invalid input for long_col")

    idf = recast_column(
        idf, list_of_cols=[lat_col, long_col], list_of_dtypes=["double", "double"]
    )

    # Warn (and drop) only if rows actually contain null latitude/longitude values
    idf_nonnull = idf.dropna(subset=(lat_col, long_col))
    if idf_nonnull.count() < idf.count():
        warnings.warn(
            "Rows dropped due to null value in longitude and/or latitude values"
        )
    idf = idf_nonnull

    if not idf.where(
        (F.col(lat_col) > 90)
        | (F.col(lat_col) < -90)
        | (F.col(long_col) > 180)
        | (F.col(long_col) < -180)
    ).rdd.isEmpty():
        warnings.warn(
            "Rows dropped due to longitude and/or latitude values being out of the valid range"
        )
        idf = idf.where(
            (F.col(lat_col) <= 90)
            & (F.col(lat_col) >= -90)
            & (F.col(long_col) <= 180)
            & (F.col(long_col) >= -180)
        )

    if idf.rdd.isEmpty():
        warnings.warn(
            "No reverse_geocoding Computation - No valid latitude/longitude row(s) to compute"
        )
        return idf

    def degree_to_radian(deg):
        return deg * pi / 180

    f_degree_to_radian = F.udf(degree_to_radian, T.FloatType())

    idf_rad = (
        idf.withColumn("lat_rad", f_degree_to_radian(lat_col))
        .withColumn("long_rad", f_degree_to_radian(long_col))
        .withColumn("x", F.cos("lat_rad") * F.cos("long_rad"))
        .withColumn("y", F.cos("lat_rad") * F.sin("long_rad"))
        .withColumn("z", F.sin("lat_rad"))
    )

    idf_groupby = (
        idf_rad.groupby(id_col)
        .agg(
            F.sum("x").alias("x_group"),
            F.sum("y").alias("y_group"),
            F.sum("z").alias("z_group"),
            F.count(id_col).alias("weight_group"),
        )
        .withColumn("weighted_x", F.col("x_group") * F.col("weight_group"))
        .withColumn("weighted_y", F.col("y_group") * F.col("weight_group"))
        .withColumn("weighted_z", F.col("z_group") * F.col("weight_group"))
    )

    total_weight = (
        idf_groupby.groupBy()
        .agg(F.sum("weight_group"))
        .rdd.map(lambda x: x[0])
        .collect()[0]
    )
    total_x = (
        idf_groupby.groupBy()
        .agg(F.sum("weighted_x"))
        .rdd.map(lambda x: x[0])
        .collect()[0]
    )
    total_y = (
        idf_groupby.groupBy()
        .agg(F.sum("weighted_y"))
        .rdd.map(lambda x: x[0])
        .collect()[0]
    )
    total_z = (
        idf_groupby.groupBy()
        .agg(F.sum("weighted_z"))
        .rdd.map(lambda x: x[0])
        .collect()[0]
    )

    x = total_x / total_weight
    y = total_y / total_weight
    z = total_z / total_weight
    hyp = sqrt(x * x + y * y)
    lat_centroid, long_centroid = atan2(z, hyp) * 180 / pi, atan2(y, x) * 180 / pi

    odf = (
        idf_groupby.select(id_col)
        .withColumn(lat_col + "_centroid", F.lit(lat_centroid))
        .withColumn(long_col + "_centroid", F.lit(long_centroid))
    )

    return odf


def rog_calculation(idf, lat_col, long_col, id_col=None):
    """
    This function calculates the Radius of Gyration (in meters) of a given DataFrame, based on its identifier column (if applicable).

    Parameters
    ----------
    idf
        Input Dataframe
    lat_col
        Column in the input DataFrame that contains latitude data
    long_col
        Column in the input DataFrame that contains longitude data
    id_col
        Column in the input DataFrame that contains identifier for the DataFrame. (Default value = None)

    Returns
    -------
    odf : DataFrame
        Output DataFrame, which contains the Radius of Gyration (in meters) and identifier (if applicable)
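
    Examples
    --------
    A minimal usage sketch, assuming `idf` is a Spark DataFrame with hypothetical
    columns "id", "lat" and "lon" in decimal degrees:

    >>> odf = rog_calculation(idf, lat_col="lat", long_col="lon", id_col="id")

    The output contains one row per "id" with a "radius_of_gyration" column
    expressed in meters.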
    """
    if id_col not in idf.columns and id_col:
        raise TypeError("Invalid input for id_col")
    if lat_col not in idf.columns:
        raise TypeError("Invalid input for lat_col")
    if long_col not in idf.columns:
        raise TypeError("Invalid input for long_col")

    idf = recast_column(
        idf, list_of_cols=[lat_col, long_col], list_of_dtypes=["double", "double"]
    )

    # Warn (and drop) only if rows actually contain null latitude/longitude values
    idf_nonnull = idf.dropna(subset=(lat_col, long_col))
    if idf_nonnull.count() < idf.count():
        warnings.warn(
            "Rows dropped due to null value in longitude and/or latitude values"
        )
    idf = idf_nonnull

    if not idf.where(
        (F.col(lat_col) > 90)
        | (F.col(lat_col) < -90)
        | (F.col(long_col) > 180)
        | (F.col(long_col) < -180)
    ).rdd.isEmpty():
        warnings.warn(
            "Rows dropped due to longitude and/or latitude values being out of the valid range"
        )
        idf = idf.where(
            (F.col(lat_col) <= 90)
            & (F.col(lat_col) >= -90)
            & (F.col(long_col) <= 180)
            & (F.col(long_col) >= -180)
        )

    if idf.rdd.isEmpty():
        warnings.warn(
            "No reverse_geocoding Computation - No valid latitude/longitude row(s) to compute"
        )
        return idf

    def getHaversineDist(lat1, lon1, lat2, lon2):

        R = 6378126  # approximate radius of earth in m

        lat1 = radians(float(lat1))
        lon1 = radians(float(lon1))
        lat2 = radians(float(lat2))
        lon2 = radians(float(lon2))

        dlon = lon2 - lon1
        dlat = lat2 - lat1
        a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
        c = 2 * atan2(sqrt(a), sqrt(1 - a))
        distance = R * c
        return distance

    udf_harver_dist = F.udf(getHaversineDist, T.FloatType())

    if id_col:
        idf_centroid = centroid(idf, lat_col, long_col, id_col)
        idf_join = idf_centroid.join(idf, id_col, "inner")
        idf_calc = idf_join.withColumn(
            "distance",
            udf_harver_dist(
                F.col(lat_col),
                F.col(long_col),
                F.col(lat_col + "_centroid"),
                F.col(long_col + "_centroid"),
            ),
        )

        odf = idf_calc.groupby(id_col).agg(
            F.mean("distance").alias("radius_of_gyration")
        )
    else:
        centroid_info = centroid(idf, lat_col, long_col, id_col).rdd.collect()
        lat_centroid = centroid_info[0][0]
        long_centroid = centroid_info[0][1]
        idf_join = idf.withColumn(
            lat_col + "_centroid", F.lit(lat_centroid)
        ).withColumn(long_col + "_centroid", F.lit(long_centroid))
        idf_calc = idf_join.withColumn(
            "distance",
            udf_harver_dist(
                F.col(lat_col),
                F.col(long_col),
                F.col(lat_col + "_centroid"),
                F.col(long_col + "_centroid"),
            ),
        )

        odf = idf_calc.groupby().agg(F.mean("distance").alias("radius_of_gyration"))
    return odf


def reverse_geocoding(idf, lat_col, long_col):
    """
    This function reverse-geocodes the latitude and longitude pairs of a given DataFrame into addresses.

    Parameters
    ----------
    idf
        Input Dataframe
    lat_col
        Column in the input DataFrame that contains latitude data
    long_col
        Column in the input DataFrame that contains longitude data

    Returns
    -------
    odf : DataFrame
        Output DataFrame, which contains the latitude, longitude and derived address columns
        (name_of_place, region, country_code)
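
    Examples
    --------
    A minimal usage sketch, assuming `idf` is a Spark DataFrame with hypothetical
    columns "lat" and "lon" in decimal degrees (reverse_geocoder performs an
    offline lookup):

    >>> odf = reverse_geocoding(idf, lat_col="lat", long_col="lon")

    The output contains the columns "lat", "lon", "name_of_place", "region" and
    "country_code".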
    """
    if lat_col not in idf.columns:
        raise TypeError("Invalid input for lat_col")
    if long_col not in idf.columns:
        raise TypeError("Invalid input for long_col")

    idf = recast_column(
        idf, list_of_cols=[lat_col, long_col], list_of_dtypes=["double", "double"]
    )

    # Warn (and drop) only if rows actually contain null latitude/longitude values
    idf_nonnull = idf.dropna(subset=(lat_col, long_col))
    if idf_nonnull.count() < idf.count():
        warnings.warn(
            "Rows dropped due to null value in longitude and/or latitude values"
        )
    idf = idf_nonnull

    if not idf.where(
        (F.col(lat_col) > 90)
        | (F.col(lat_col) < -90)
        | (F.col(long_col) > 180)
        | (F.col(long_col) < -180)
    ).rdd.isEmpty():
        warnings.warn(
            "Rows dropped due to longitude and/or latitude values being out of the valid range"
        )
        idf = idf.where(
            (F.col(lat_col) <= 90)
            & (F.col(lat_col) >= -90)
            & (F.col(long_col) <= 180)
            & (F.col(long_col) >= -180)
        )

    if idf.rdd.isEmpty():
        warnings.warn(
            "No reverse_geocoding Computation - No valid latitude/longitude row(s) to compute"
        )
        return idf

    def reverse_geocode(lat, long):
        coordinates = (float(lat), float(long))
        location = rg.search(coordinates, mode=1)
        if location:
            return (
                str(location[0]["name"])
                + ","
                + str(location[0]["admin1"])
                + ","
                + str(location[0]["cc"])
            )
        else:
            return "N/A"

    udf_reverse_geocode = F.udf(reverse_geocode)
    odf = (
        idf.withColumn("info", udf_reverse_geocode(F.col(lat_col), F.col(long_col)))
        .select(lat_col, long_col, "info")
        .withColumn("name_of_place", F.split(F.col("info"), ",").getItem(0))
        .withColumn("region", F.split(F.col("info"), ",").getItem(1))
        .withColumn("country_code", F.split(F.col("info"), ",").getItem(2))
        .drop("info")
    )
    return odf

Functions

def centroid(idf, lat_col, long_col, id_col=None)

This function calculates the centroid of a given DataFrame using lat-long pairs based on its identifier column (if applicable)

Parameters

idf
Input Dataframe
lat_col
Column in the input DataFrame that contains latitude data
long_col
Column in the input DataFrame that contains longitude data
id_col
Column in the input DataFrame that contains identifier for the DataFrame (Default is None) If id_col=None, the function will calculate centriod of latitude and longitude for the whole dataset.

Returns

odf : DataFrame
Output DataFrame, which contains lat_centroid and long_centroid and identifier (if applicable)
Expand source code
def centroid(idf, lat_col, long_col, id_col=None):
    """
    This function calculates the centroid of a given DataFrame using lat-long pairs based on its identifier column (if applicable)

    Parameters
    ----------
    idf
        Input Dataframe
    lat_col
        Column in the input DataFrame that contains latitude data
    long_col
        Column in the input DataFrame that contains longitude data
    id_col
        Column in the input DataFrame that contains the identifier for the DataFrame (Default is None).
        If id_col=None, the function will calculate the centroid of latitude and longitude for the whole dataset.
    Returns
    -------
    odf : DataFrame
        Output DataFrame, which contains <lat_col>_centroid and <long_col>_centroid columns and the identifier (if applicable)
    """
    if id_col and id_col not in idf.columns:
        raise TypeError("Invalid input for id_col")
    if lat_col not in idf.columns:
        raise TypeError("Invalid input for lat_col")
    if long_col not in idf.columns:
        raise TypeError("Invalid input for long_col")

    idf = recast_column(
        idf, list_of_cols=[lat_col, long_col], list_of_dtypes=["double", "double"]
    )

    # Warn only when rows with null latitude/longitude actually exist.
    if not idf.where(F.col(lat_col).isNull() | F.col(long_col).isNull()).rdd.isEmpty():
        warnings.warn(
            "Rows dropped due to null value in longitude and/or latitude values"
        )
    idf = idf.dropna(subset=(lat_col, long_col))

    if not idf.where(
        (F.col(lat_col) > 90)
        | (F.col(lat_col) < -90)
        | (F.col(long_col) > 180)
        | (F.col(long_col) < -180)
    ).rdd.isEmpty():
        warnings.warn(
            "Rows dropped due to longitude and/or latitude values being out of the valid range"
        )
        idf = idf.where(
            (F.col(lat_col) <= 90)
            & (F.col(lat_col) >= -90)
            & (F.col(long_col) <= 180)
            & (F.col(long_col) >= -180)
        )

    if idf.rdd.isEmpty():
        warnings.warn(
            "No reverse_geocoding Computation - No valid latitude/longitude row(s) to compute"
        )
        return idf

    def degree_to_radian(deg):
        return deg * pi / 180

    f_degree_to_radian = F.udf(degree_to_radian, T.FloatType())

    idf_rad = (
        idf.withColumn("lat_rad", f_degree_to_radian(lat_col))
        .withColumn("long_rad", f_degree_to_radian(long_col))
        .withColumn("x", F.cos("lat_rad") * F.cos("long_rad"))
        .withColumn("y", F.cos("lat_rad") * F.sin("long_rad"))
        .withColumn("z", F.sin("lat_rad"))
    )
    if id_col:
        idf_groupby = idf_rad.groupby(id_col).agg(
            F.sum("x").alias("x_group"),
            F.sum("y").alias("y_group"),
            F.sum("z").alias("z_group"),
        )

        odf = (
            idf_groupby.withColumn(
                "hyp",
                F.sqrt(
                    F.col("x_group") * F.col("x_group")
                    + F.col("y_group") * F.col("y_group")
                ),
            )
            .withColumn(
                lat_col + "_centroid",
                F.atan2(F.col("z_group"), F.col("hyp")) * 180 / pi,
            )
            .withColumn(
                long_col + "_centroid",
                F.atan2(F.col("y_group"), F.col("x_group")) * 180 / pi,
            )
            .select(id_col, lat_col + "_centroid", long_col + "_centroid")
        )
    else:
        idf_groupby = idf_rad.groupby().agg(
            F.sum("x").alias("x_group"),
            F.sum("y").alias("y_group"),
            F.sum("z").alias("z_group"),
        )

        odf = (
            idf_groupby.withColumn(
                "hyp",
                F.sqrt(
                    F.col("x_group") * F.col("x_group")
                    + F.col("y_group") * F.col("y_group")
                ),
            )
            .withColumn(
                lat_col + "_centroid",
                F.atan2(F.col("z_group"), F.col("hyp")) * 180 / pi,
            )
            .withColumn(
                long_col + "_centroid",
                F.atan2(F.col("y_group"), F.col("x_group")) * 180 / pi,
            )
            .select(lat_col + "_centroid", long_col + "_centroid")
        )
    return odf
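
A minimal, hypothetical usage sketch for centroid, assuming a SparkSession spark, this module's functions in scope and illustrative columns "id", "lat", "lon".

# Per-identifier centroid of the lat-lon pairs.
df = spark.createDataFrame(
    [("a", 1.30, 103.80), ("a", 1.35, 103.85), ("b", 48.85, 2.35)],
    ["id", "lat", "lon"],
)
odf = centroid(df, lat_col="lat", long_col="lon", id_col="id")
odf.show()  # columns: id, lat_centroid, lon_centroid
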
def geo_format_cartesian(idf, list_of_x, list_of_y, list_of_z, output_format, result_prefix=[], optional_configs={'geohash_precision': 8, 'radius': 6371009}, output_mode='append')

This function helps to convert the input data's location columns from cartesian format to the desired format based on output_format. If output_mode is set to "replace", the original location columns will be dropped. For each location column, "to_latlon_decimal_degrees" will be called to convert them to decimal degrees, and then "from_latlon_decimal_degrees" will be called to transform decimal degrees to output_format. If output_format is "dd" or "dms" or "radian", 2 new columns containing "_lat_", "_lon_" in the column names will be created. If output_format is "geohash", 1 new column containing "_geohash" in the column name will be created.

Parameters

idf
Input Dataframe.
list_of_x
List of columns representing x axis values e.g., ["x1","x2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter "|" e.g., "x1|x2".
list_of_y
List of columns representing y axis values e.g., ["y1","y2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter "|" e.g., "y1|y2".
list_of_z
List of columns representing z axis values e.g., ["z1","z2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter "|" e.g., "z1|z2". list_of_x, list_of_y and list_of_z must have the same length such that the i-th element of 3 lists form an x-y-z pair to format.
output_format
"dd", "dms", "radian", "geohash" "dd" represents latitude and longitude in decimal degrees. "dms" represents latitude and longitude in degrees minutes second. "radian" represents latitude and longitude in radians. "geohash" represents geocoded locations.
result_prefix
List of prefixes for the newly generated column names. Alternatively, prefixes can be specified in a string format, where different prefixes are separated by pipe delimiter "|" e.g., "pf1|pf2". result_prefix must have the same length as list_of_x, list_of_y and list_of_z. If it is empty, <x>_<y>_<z> will be used for each x-y-z pair. For example, list_of_x is "x1|x2", list_of_y is "y1|y2" and list_of_z is "z1|z2". Case 1: result_prefix = "L1|L2". If output_format is "dd", "dms" or "radian", new columns will be named as L1_lat_<output_format>, L1_lon_<output_format>, L2_lat_<output_format>, L2_lon_<output_format>. If output_format is "geohash", new columns will be named as L1_geohash and L2_geohash. Case 2: result_prefix = []. Prefixes "L1" and "L2" in the above column names will be replaced by "x1_y1_z1" and "x2_y2_z2". (Default value = [])
optional_configs
The following keys can be used: - geohash_precision: precision of the resultant geohash. This key is only used when output_format is "geohash". (Default value = 8) - radius: radius of Earth. (Default value = EARTH_RADIUS)
output_mode
"replace", "append". "replace" option appends transformed column to the input dataset and removes the original ones. "append" option appends transformed column to the input dataset. (Default value = "append")

Returns

DataFrame
 
Expand source code
def geo_format_cartesian(
    idf,
    list_of_x,
    list_of_y,
    list_of_z,
    output_format,
    result_prefix=[],
    optional_configs={"geohash_precision": 8, "radius": EARTH_RADIUS},
    output_mode="append",
):
    """
    This function helps to convert the input data's location columns from cartesian format to desired format based on
    output_format. If output_mode is set to "replace", the original location columns will be dropped.
    For each location column, "to_latlon_decimal_degrees" will be called to convert them to decimal degrees, and then
    "from_latlon_decimal_degrees" will be called to transform decimal degrees to output_format.
    If output_format is "dd" or "dms" or "radian", 2 new columns containing "_lat_", "_lon_" in the column names will be created.
    If output_format is "geohash", 1 new column containing "_geohash" in the column name will be created.

    Parameters
    ----------
    idf
        Input Dataframe.
    list_of_x
        List of columns representing x axis values e.g., ["x1","x2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "x1|x2".
    list_of_y
        List of columns representing y axis values e.g., ["y1","y2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "y1|y2".
    list_of_z
        List of columns representing z axis values e.g., ["z1","z2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "z1|z2".
        list_of_x, list_of_y and list_of_z must have the same length such that the
        i-th element of 3 lists form an x-y-z pair to format.
    output_format
        "dd", "dms", "radian", "geohash"
        "dd" represents latitude and longitude in decimal degrees.
        "dms" represents latitude and longitude in degrees minutes second.
        "radian" represents latitude and longitude in radians.
        "geohash" represents geocoded locations.
    result_prefix
        List of prefixes for the newly generated column names.
        Alternatively, prefixes can be specified in a string format,
        where different prefixes are separated by pipe delimiter "|" e.g., "pf1|pf2".
        result_prefix must have the same length as list_of_x, list_of_y and list_of_z.
        If it is empty, <x>_<y>_<z> will be used for each x-y-z pair.
        For example, list_of_x is "x1|x2", list_of_y is "y1|y2" and list_of_z is "z1|z2"
        Case 1: result_prefix = "L1|L2".
            If output_format is "dd", "dms" or "radian", new columns will be named as
            L1_lat_<output_format>, L1_lon_<output_format>, L2_lat_<output_format>, L2_lon_<output_format>.
            If output_format is "geohash", new columns will be named as
            L1_geohash and L2_geohash.
        Case 2: result_prefix = [].
            Prefixes "L1" and "L2" in above column names will be replaced by "x1_y1_z1" and "x2_y2_z2".
        (Default value = [])
    optional_configs
        The following keys can be used:
        - geohash_precision: precision of the resultant geohash. This key is only used when output_format
          is "geohash". (Default value = 8)
        - radius: radius of Earth. (Default value = EARTH_RADIUS)
    output_mode
        "replace", "append".
        "replace" option appends transformed column to the input dataset and removes the original ones.
        "append" option appends transformed column to the input dataset.
        (Default value = "append")

    Returns
    -------
    DataFrame
    """

    geohash_precision = int(optional_configs.get("geohash_precision", 8))
    radius = optional_configs.get("radius", EARTH_RADIUS)

    if isinstance(list_of_x, str):
        list_of_x = [x.strip() for x in list_of_x.split("|")]
    if isinstance(list_of_y, str):
        list_of_y = [x.strip() for x in list_of_y.split("|")]
    if isinstance(list_of_z, str):
        list_of_z = [x.strip() for x in list_of_z.split("|")]
    if isinstance(result_prefix, str):
        result_prefix = [x.strip() for x in result_prefix.split("|")]

    if any(x not in idf.columns for x in list_of_x + list_of_y + list_of_z):
        raise TypeError("Invalid input for list_of_x or list_of_y or list_of_z")

    format_list = ["dd", "dms", "radian", "geohash"]
    if output_format not in format_list:
        raise TypeError("Invalid input for output_format")

    if len({len(list_of_x), len(list_of_y), len(list_of_z)}) != 1:
        raise TypeError("list_of_x, list_of_y and list_of_z must have the same length")
    if result_prefix and (len(result_prefix) != len(list_of_x)):
        raise TypeError(
            "result_prefix must have the same length as list_of_x, list_of_y and list_of_y if it is not empty"
        )

    f_to_latlon_dd = F.udf(
        lambda loc: to_latlon_decimal_degrees(loc, "cartesian", radius),
        T.ArrayType(T.FloatType()),
    )

    from_latlon_dd_ = lambda loc: from_latlon_decimal_degrees(
        loc, output_format, radius, geohash_precision
    )
    if output_format in ["dd", "radian"]:
        f_from_latlon_dd = F.udf(from_latlon_dd_, T.ArrayType(T.FloatType()))
    elif output_format == "dms":
        f_from_latlon_dd = F.udf(
            from_latlon_dd_, T.ArrayType(T.ArrayType(T.FloatType()))
        )
    elif output_format == "geohash":
        f_from_latlon_dd = F.udf(from_latlon_dd_, T.StringType())

    odf = idf
    for i, (x, y, z) in enumerate(zip(list_of_x, list_of_y, list_of_z)):
        col = result_prefix[i] if result_prefix else (x + "_" + y + "_" + z)

        odf = odf.withColumn(col + "_temp", F.array(x, y, z)).withColumn(
            col + "_" + output_format, f_from_latlon_dd(f_to_latlon_dd(col + "_temp"))
        )

        if output_format in ["dd", "dms", "radian"]:
            odf = (
                odf.withColumn(
                    col + "_lat_" + output_format, F.col(col + "_" + output_format)[0]
                )
                .withColumn(
                    col + "_lon_" + output_format, F.col(col + "_" + output_format)[1]
                )
                .drop(col + "_" + output_format)
            )

        odf = odf.drop(col + "_temp")

        if output_mode == "replace":
            odf = odf.drop(x, y, z)

    return odf
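
A minimal, hypothetical usage sketch for geo_format_cartesian, assuming a SparkSession spark and this module's functions in scope; the x-y-z values and column names are illustrative.

# Convert one cartesian triple (metres) back to decimal degrees.
df = spark.createDataFrame([(4504977.0, 4504977.0, 0.0)], ["x1", "y1", "z1"])
odf = geo_format_cartesian(
    df, list_of_x="x1", list_of_y="y1", list_of_z="z1",
    output_format="dd", result_prefix="L1",
)
odf.show()  # appends L1_lat_dd and L1_lon_dd
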
def geo_format_geohash(idf, list_of_geohash, output_format, result_prefix=[], optional_configs={'radius': 6371009}, output_mode='append')

This function is the main function to convert the input data's location columns from geohash format to the desired format based on output_format. If output_mode is set to "replace", the original location columns will be dropped. For each location column, "to_latlon_decimal_degrees" will be called to convert them to decimal degrees, and then "from_latlon_decimal_degrees" will be called to transform decimal degrees to output_format. If output_format is "dd" or "dms" or "radian", 2 new columns containing "_lat_", "_lon_" in the column names will be created. If output_format is "cartesian", 3 new columns containing "_x", "_y", "_z" in the column names will be created.

Parameters

idf
Input Dataframe.
list_of_geohash
List of columns representing geohash e.g., ["gh1","gh2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter "|" e.g., "gh1|gh2".
output_format
"dd", "dms", "radian", "cartesian" "dd" represents latitude and longitude in decimal degrees. "dms" represents latitude and longitude in degrees minutes second. "radian" represents latitude and longitude in radians. "cartesian" represents the Cartesian coordinates of the point in three-dimensional space.
result_prefix
List of prefixes for the newly generated column names. Alternatively, prefixes can be specified in a string format, where different prefixes are separated by pipe delimiter "|" e.g., "pf1|pf2". result_prefix must have the same length as list_of_geohash. If it is empty, <geohash column name> will be used as the prefix for transformed columns. For example, list_of_geohash is "gh1|gh2". Case 1: result_prefix = "L1|L2". If output_format is "dd", "dms" or "radian", new columns will be named as L1_lat_<output_format>, L1_lon_<output_format>, L2_lat_<output_format>, L2_lon_<output_format>. If output_format is "cartesian", new columns will be named as L1_x, L1_y, L1_z, L2_x, L2_y, L2_z. Case 2: result_prefix = []. Prefixes "L1" and "L2" in the above column names will be replaced by "gh1" and "gh2". (Default value = [])
optional_configs
The following key can be used: - radius: radius of Earth. Necessary only when output_format is "cartesian". (Default value = EARTH_RADIUS)
output_mode
"replace", "append". "replace" option appends transformed column to the input dataset and removes the original ones. "append" option appends transformed column to the input dataset. (Default value = "append")

Returns

DataFrame
 
Expand source code
def geo_format_geohash(
    idf,
    list_of_geohash,
    output_format,
    result_prefix=[],
    optional_configs={"radius": EARTH_RADIUS},
    output_mode="append",
):
    """
    This function is the main function to convert the input data's location columns from geohash format to the desired
    format based on output_format. If output_mode is set to "replace", the original location columns will be dropped.
    For each location column, "to_latlon_decimal_degrees" will be called to convert them to decimal degrees, and then
    "from_latlon_decimal_degrees" will be called to transform decimal degrees to output_format.
    If output_format is "dd" or "dms" or "radian", 2 new columns containing "_lat_", "_lon_" in the column names will be created.
    If output_format is "cartesian", 3 new columns containing "_x", "_y", "_z" in the column names will be created.

    Parameters
    ----------
    idf
        Input Dataframe.
    list_of_geohash
        List of columns representing geohash e.g., ["gh1","gh2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "gh1|gh2".
    output_format
        "dd", "dms", "radian", "cartesian"
        "dd" represents latitude and longitude in decimal degrees.
        "dms" represents latitude and longitude in degrees minutes second.
        "radian" represents latitude and longitude in radians.
        "cartesian" represents the Cartesian coordinates of the point in three-dimensional space.
    result_prefix
        List of prefixes for the newly generated column names.
        Alternatively, prefixes can be specified in a string format,
        where different prefixes are separated by pipe delimiter "|" e.g., "pf1|pf2".
        result_prefix must have the same length as list_of_geohash.
        If it is empty, <geohash column name> will be used as the prefix for transformed columns.
        For example, list_of_geohash is "gh1|gh2".
        Case 1: result_prefix = "L1|L2".
            If output_format is "dd", "dms" or "radian", new columns will be named as
            L1_lat_<output_format>, L1_lon_<output_format>, L2_lat_<output_format>, L2_lon_<output_format>.
            If output_format is "cartesian", new columns will be named as
            L1_x, L1_y, L1_z, L2_x, L2_y, L2_z.
        Case 2: result_prefix = [].
            Prefixes "L1" and "L2" in above column names will be replaced by "gh1" and "gh2".
        (Default value = [])
    optional_configs
        The following key can be used:
        - radius: radius of Earth. Necessary only when output_format is "cartesian".
          (Default value = EARTH_RADIUS)
    output_mode
        "replace", "append".
        "replace" option appends transformed column to the input dataset and removes the original ones.
        "append" option appends transformed column to the input dataset.
        (Default value = "append")

    Returns
    -------
    DataFrame
    """

    radius = optional_configs.get("radius", EARTH_RADIUS)

    if isinstance(list_of_geohash, str):
        list_of_geohash = [x.strip() for x in list_of_geohash.split("|")]
    if isinstance(result_prefix, str):
        result_prefix = [x.strip() for x in result_prefix.split("|")]

    if any(x not in idf.columns for x in list_of_geohash):
        raise TypeError("Invalid input for list_of_geohash")

    format_list = ["dd", "dms", "radian", "cartesian"]
    if output_format not in format_list:
        raise TypeError("Invalid input for output_format")

    if result_prefix and (len(result_prefix) != len(list_of_geohash)):
        raise TypeError(
            "result_prefix must have the same length as list_of_geohash if it is not empty"
        )

    f_to_latlon_dd = F.udf(
        lambda loc: to_latlon_decimal_degrees(loc, "geohash", radius),
        T.ArrayType(T.FloatType()),
    )

    from_latlon_dd_ = lambda loc: from_latlon_decimal_degrees(
        loc, output_format, radius
    )
    if output_format in ["dd", "radian", "cartesian"]:
        f_from_latlon_dd = F.udf(from_latlon_dd_, T.ArrayType(T.FloatType()))
    elif output_format == "dms":
        f_from_latlon_dd = F.udf(
            from_latlon_dd_, T.ArrayType(T.ArrayType(T.FloatType()))
        )

    odf = idf
    for i, geohash in enumerate(list_of_geohash):
        col = result_prefix[i] if result_prefix else geohash

        odf = odf.withColumn(
            col + "_" + output_format, f_from_latlon_dd(f_to_latlon_dd(geohash))
        )

        if output_format in ["dd", "dms", "radian"]:
            odf = (
                odf.withColumn(
                    col + "_lat_" + output_format, F.col(col + "_" + output_format)[0]
                )
                .withColumn(
                    col + "_lon_" + output_format, F.col(col + "_" + output_format)[1]
                )
                .drop(col + "_" + output_format)
            )

        if output_format == "cartesian":
            odf = (
                odf.withColumn(col + "_x", F.col(col + "_" + output_format)[0])
                .withColumn(col + "_y", F.col(col + "_" + output_format)[1])
                .withColumn(col + "_z", F.col(col + "_" + output_format)[2])
                .drop(col + "_" + output_format)
            )

        if output_mode == "replace":
            odf = odf.drop(geohash)

    return odf
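
A minimal, hypothetical usage sketch for geo_format_geohash, assuming a SparkSession spark, this module's functions in scope and an illustrative geohash column "gh1".

# Decode a geohash column into decimal-degree latitude and longitude.
df = spark.createDataFrame([("w21zd2mk",)], ["gh1"])
odf = geo_format_geohash(df, list_of_geohash="gh1", output_format="dd")
odf.show()  # appends gh1_lat_dd and gh1_lon_dd (prefix defaults to the geohash column name)
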
def geo_format_latlon(idf, list_of_lat, list_of_lon, input_format, output_format, result_prefix=[], optional_configs={'geohash_precision': 8, 'radius': 6371009}, output_mode='append')

This function is the main function to convert the input data's location columns from lat,lon format to the desired format based on output_format. If output_mode is set to "replace", the original location columns will be dropped. For each location column, "to_latlon_decimal_degrees" will be called to convert them to decimal degrees, and then "from_latlon_decimal_degrees" will be called to transform decimal degrees to output_format. If output_format is "dd" or "dms" or "radian", 2 new columns containing "_lat_", "_lon_" in the column names will be created. If output_format is "cartesian", 3 new columns containing "_x", "_y", "_z" in the column names will be created. If output_format is "geohash", 1 new column containing "_geohash" in the column name will be created.

Parameters

idf
Input Dataframe.
list_of_lat
List of columns representing latitude e.g., ["lat1","lat2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter "|" e.g., "lat1|lat2".
list_of_lon
List of columns representing longitude e.g., ["lon1","lon2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter "|" e.g., "lon1|lon2". list_of_lon must have the same length as list_of_lat such that i-th element of list_of_lat and i-th element of list_of_lon form a lat-lon pair to format.
input_format
"dd", "dms", "radian". "dd" represents latitude and longitude in decimal degrees. "dms" represents latitude and longitude in degrees minutes second. "radian" represents latitude and longitude in radians.
output_format
"dd", "dms", "radian", "cartesian", "geohash". "cartesian" represents the Cartesian coordinates of the point in three-dimensional space. "geohash" represents geocoded locations.
result_prefix
List of prefixes for the newly generated column names. Alternatively, prefixes can be specified in a string format, where different prefixes are separated by pipe delimiter "|" e.g., "pf1|pf2". result_prefix must have the same length as list_of_lat and list_of_lon. If it is empty, <lat>_<lon> will be used for each lat-lon pair. For example, list_of_lat is "lat1|lat2", list_of_lon is "lon1|lon2". Case 1: result_prefix = "L1|L2". If output_format is "dd", "dms" or "radian", new columns will be named as L1_lat_<output_format>, L1_lon_<output_format>, L2_lat_<output_format>, L2_lon_<output_format>. If output_format is "cartesian", new columns will be named as L1_x, L1_y, L1_z, L2_x, L2_y, L2_z. If output_format is "geohash", new columns will be named as L1_geohash and L2_geohash. Case 2: result_prefix = []. Prefixes "L1" and "L2" in the above column names will be replaced by "lat1_lon1" and "lat2_lon2". (Default value = [])
optional_configs
The following keys can be used: - geohash_precision: precision of the resultant geohash. This key is only used when output_format is "geohash". (Default value = 8) - radius: radius of Earth. Necessary only when output_format is "cartesian". (Default value = EARTH_RADIUS)
output_mode
"replace", "append". "replace" option appends transformed column to the input dataset and removes the original ones. "append" option appends transformed column to the input dataset. (Default value = "append")

Returns

DataFrame
 
Expand source code
def geo_format_latlon(
    idf,
    list_of_lat,
    list_of_lon,
    input_format,
    output_format,
    result_prefix=[],
    optional_configs={"geohash_precision": 8, "radius": EARTH_RADIUS},
    output_mode="append",
):
    """
    This function is the main function to convert the input data's location columns from lat,lon format to desired format
    based on output_format. If output_mode is set to "replace", the original location columns will be dropped.
    For each location column, "to_latlon_decimal_degrees" will be called to convert them to decimal degrees, and then
    "from_latlon_decimal_degrees" will be called to transform decimal degrees to output_format.
    If output_format is "dd" or "dms" or "radian", 2 new columns containing "_lat_", "_lon_" in the column names will be created.
    If output_format is "cartesian", 3 new columns containing "_x", "_y", "_z" in the column names will be created.
    If output_format is "geohash", 1 new column containing "_geohash" in the column name will be created.

    Parameters
    ----------
    idf
        Input Dataframe.
    list_of_lat
        List of columns representing latitude e.g., ["lat1","lat2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "lat1|lat2".
    list_of_lon
        List of columns representing longitude e.g., ["lon1","lon2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "lon1|lon2".
        list_of_lon must have the same length as list_of_lat such that i-th element of
        list_of_lat and i-th element of list_of_lon form a lat-lon pair to format.
    input_format
        "dd", "dms", "radian".
        "dd" represents latitude and longitude in decimal degrees.
        "dms" represents latitude and longitude in degrees minutes second.
        "radian" represents latitude and longitude in radians.
    output_format
        "dd", "dms", "radian", "cartesian", "geohash".
        "cartesian" represents the Cartesian coordinates of the point in three-dimensional space.
        "geohash" represents geocoded locations.
    result_prefix
        List of prefixes for the newly generated column names.
        Alternatively, prefixes can be specified in a string format,
        where different prefixes are separated by pipe delimiter "|" e.g., "pf1|pf2".
        result_prefix must have the same length as list_of_lat and list_of_lon.
        If it is empty, <lat>_<lon> will be used for each lat-lon pair.
        For example, list_of_lat is "lat1|lat2", list_of_lon is "lon1|lon2".
        Case 1: result_prefix = "L1|L2".
            If output_format is "dd", "dms" or "radian", new columns will be named as
            L1_lat_<output_format>, L1_lon_<output_format>, L2_lat_<output_format>, L2_lon_<output_format>.
            If output_format is "cartesian", new columns will be named as
            L1_x, L1_y, L1_z, L2_x, L2_y, L2_z.
            If output_format is "geohash", new columns will be named as
            L1_geohash and L2_geohash.
        Case 2: result_prefix = [].
            Prefixes "L1" and "L2" in above column names will be replaced by "lat1_lon1" and "lat2_lon2".
        (Default value = [])
    optional_configs
        The following keys can be used:
        - geohash_precision: precision of the resultant geohash. This key is only used when output_format
          is "geohash". (Default value = 8)
        - radius: radius of Earth. Necessary only when output_format is "cartesian".
          (Default value = EARTH_RADIUS)
    output_mode
        "replace", "append".
        "replace" option appends transformed column to the input dataset and removes the original ones.
        "append" option appends transformed column to the input dataset.
        (Default value = "append")

    Returns
    -------
    DataFrame
    """
    geohash_precision = int(optional_configs.get("geohash_precision", 8))
    radius = optional_configs.get("radius", EARTH_RADIUS)

    if isinstance(list_of_lat, str):
        list_of_lat = [x.strip() for x in list_of_lat.split("|")]
    if isinstance(list_of_lon, str):
        list_of_lon = [x.strip() for x in list_of_lon.split("|")]
    if isinstance(result_prefix, str):
        result_prefix = [x.strip() for x in result_prefix.split("|")]

    if any(x not in idf.columns for x in list_of_lat + list_of_lon):
        raise TypeError("Invalid input for list_of_lat or list_of_lon")

    format_list = ["dd", "dms", "radian", "cartesian", "geohash"]
    if (input_format not in format_list[:3]) or (output_format not in format_list):
        raise TypeError("Invalid input for input_format or output_format")

    if len(list_of_lat) != len(list_of_lon):
        raise TypeError("list_of_lat and list_of_lon must have the same length")
    if result_prefix and (len(result_prefix) != len(list_of_lat)):
        raise TypeError(
            "result_prefix must have the same length as list_of_lat and list_of_lon if it is not empty"
        )

    f_to_latlon_dd = F.udf(
        lambda loc: to_latlon_decimal_degrees(loc, input_format, radius),
        T.ArrayType(T.FloatType()),
    )

    from_latlon_dd_ = lambda loc: from_latlon_decimal_degrees(
        loc, output_format, radius, geohash_precision
    )
    if output_format in ["dd", "radian", "cartesian"]:
        f_from_latlon_dd = F.udf(from_latlon_dd_, T.ArrayType(T.FloatType()))
    elif output_format == "dms":
        f_from_latlon_dd = F.udf(
            from_latlon_dd_, T.ArrayType(T.ArrayType(T.FloatType()))
        )
    elif output_format == "geohash":
        f_from_latlon_dd = F.udf(from_latlon_dd_, T.StringType())

    odf = idf
    for i, (lat, lon) in enumerate(zip(list_of_lat, list_of_lon)):
        col = result_prefix[i] if result_prefix else (lat + "_" + lon)

        odf = odf.withColumn(col + "_temp", F.array(lat, lon)).withColumn(
            col + "_" + output_format, f_from_latlon_dd(f_to_latlon_dd(col + "_temp"))
        )

        if output_format in ["dd", "dms", "radian"]:
            odf = (
                odf.withColumn(
                    col + "_lat_" + output_format, F.col(col + "_" + output_format)[0]
                )
                .withColumn(
                    col + "_lon_" + output_format, F.col(col + "_" + output_format)[1]
                )
                .drop(col + "_" + output_format)
            )

        if output_format == "cartesian":
            odf = (
                odf.withColumn(col + "_x", F.col(col + "_" + output_format)[0])
                .withColumn(col + "_y", F.col(col + "_" + output_format)[1])
                .withColumn(col + "_z", F.col(col + "_" + output_format)[2])
                .drop(col + "_" + output_format)
            )

        odf = odf.drop(col + "_temp")

        if output_mode == "replace":
            odf = odf.drop(lat, lon)

    return odf
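
A minimal, hypothetical usage sketch for geo_format_latlon, assuming a SparkSession spark, this module's functions in scope and illustrative columns "lat1", "lon1".

# Encode a decimal-degree pair as a geohash of precision 6.
df = spark.createDataFrame([(1.352, 103.820)], ["lat1", "lon1"])
odf = geo_format_latlon(
    df, list_of_lat="lat1", list_of_lon="lon1",
    input_format="dd", output_format="geohash",
    result_prefix="L1", optional_configs={"geohash_precision": 6},
)
odf.show()  # appends a single column L1_geohash
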
def geohash_precision_control(idf, list_of_geohash, output_precision=8, km_max_error=None, output_mode='append')

This function controls the precision of input data's geohash columns.

Parameters

idf
Input Dataframe.
list_of_geohash
List of columns in geohash format e.g., ["gh1","gh2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter "|" e.g., "gh1|gh2".
output_precision
Precision of the transformed geohash in the output dataframe. (Default value = 8)
km_max_error
Maximum permissible error in kilometers. If km_max_error is specified, output_precision will be ignored and km_max_error will be mapped to an output_precision according to the following dictionary: {2500: 1, 630: 2, 78: 3, 20: 4, 2.4: 5, 0.61: 6, 0.076: 7, 0.019: 8, 0.0024: 9, 0.00060: 10, 0.000074: 11}. (Default value = None)
output_mode
"replace", "append". "replace" option replaces original columns with transformed column. "append" option appends the transformed column to the input dataset with postfix "precision". (Default value = "append")

Returns

DataFrame
 
Expand source code
def geohash_precision_control(
    idf, list_of_geohash, output_precision=8, km_max_error=None, output_mode="append"
):
    """
    This function controls the precision of input data's geohash columns.

    Parameters
    ----------
    idf
        Input Dataframe.
    list_of_geohash
        List of columns in geohash format e.g., ["gh1","gh2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "gh1|gh2".
    output_precision
        Precision of the transformed geohash in the output dataframe. (Default value = 8)
    km_max_error
        Maximum permissible error in kilometers. If km_max_error is specified, output_precision
        will be ignored and km_max_error will be mapped to an output_precision according to the
        following dictionary: {2500: 1, 630: 2, 78: 3, 20: 4, 2.4: 5, 0.61: 6, 0.076: 7,
        0.019: 8, 0.0024: 9, 0.00060: 10, 0.000074: 11}. (Default value = None)
    output_mode
        "replace", "append".
        "replace" option replaces original columns with transformed column.
        "append" option appends the transformed column to the input dataset
        with postfix "_precision_<output_precision>". (Default value = "append")

    Returns
    -------
    DataFrame
    """

    if isinstance(list_of_geohash, str):
        list_of_geohash = [x.strip() for x in list_of_geohash.split("|")]

    if any(x not in idf.columns for x in list_of_geohash):
        raise TypeError("Invalid input for list_of_geohash")

    error_precision_mapping = {
        2500: 1,
        630: 2,
        78: 3,
        20: 4,
        2.4: 5,
        0.61: 6,
        0.076: 7,
        0.019: 8,
        0.0024: 9,
        0.00060: 10,
        0.000074: 11,
    }
    if km_max_error is not None:
        output_precision = 12
        for key, val in error_precision_mapping.items():
            if km_max_error >= key:
                output_precision = val
                break
    output_precision = int(output_precision)
    logger.info(
        "Precision of the output geohashes will be capped at "
        + str(output_precision)
        + "."
    )
    odf = idf
    for i, geohash in enumerate(list_of_geohash):
        if output_mode == "replace":
            col_name = geohash
        else:
            col_name = geohash + "_precision_" + str(output_precision)
        odf = odf.withColumn(col_name, F.substring(geohash, 1, output_precision))

    return odf
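
A minimal, hypothetical usage sketch for geohash_precision_control, assuming a SparkSession spark, this module's functions in scope and an illustrative geohash column "gh1".

# Truncate geohashes so that the positional error stays within roughly 2.4 km.
df = spark.createDataFrame([("w21zd2mkt3n1",)], ["gh1"])
odf = geohash_precision_control(df, list_of_geohash="gh1", km_max_error=2.4)
odf.show()  # km_max_error=2.4 maps to precision 5, so gh1_precision_5 is appended
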
def location_distance(idf, list_of_cols_loc1, list_of_cols_loc2, loc_format='dd', result_prefix='', distance_type='haversine', unit='m', optional_configs={'radius': 6371009, 'vincenty_model': 'WGS-84'}, output_mode='append')

This function calculates the distance between 2 locations, and the distance formula is determined by distance_type. If distance_type = "vincenty", then loc_format should be "dd", and list_of_cols_loc1, list_of_cols_loc2 should be in [lat1, lon1] and [lat2, lon2] format respectively. "vincenty_distance" function will be called to calculate the distance. If distance_type = "haversine", then loc_format should be "radian", and list_of_cols_loc1, list_of_cols_loc2 should be in [lat1, lon1] and [lat2, lon2] format respectively. "haversine_distance" function will be called to calculate the distance. If distance_type = "euclidean", then loc_format should be "cartesian", and list_of_cols_loc1, list_of_cols_loc2 should be in [x1, y1, z1] and [x2, y2, z2] format respectively. "euclidean_distance" function will be called to calculate the distance. If loc_format does not match distance_type's desired format, the necessary conversion of location columns will be performed with the help of the "geo_format_latlon", "geo_format_cartesian" and "geo_format_geohash" functions.

Parameters

idf
Input Dataframe.
list_of_cols_loc1
List of columns to express the first location e.g., ["lat1","lon1"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter "|" e.g., "lat1|lon1".
list_of_cols_loc2
List of columns to express the second location e.g., ["lat2","lon2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter "|" e.g., "lat2|lon2".
loc_format
"dd", "dms", "radian", "cartesian", "geohash". (Default value = "dd")
result_prefix
Prefix for the newly generated column. It must be a string or a list with one element. If it is empty, <list_of_cols_loc1 joined by '_'>_<list_of_cols_loc2 joined by '_'> will be used as the prefix. For example, list_of_cols_loc1 is "lat1|lon1", list_of_cols_loc2 is "lat2|lon2". Case 1: result_prefix = "L1_L2": the new column will be named as L1_L2_distance. Case 2: result_prefix = []: the new column will be named as lat1_lon1_lat2_lon2_distance. (Default value = '')
distance_type
"vincenty", "haversine", "euclidean". (Default value = "haversine") "vincenty" option calculates the distance between two points on the surface of a spheroid. "haversine" option calculates the great-circle distance between two points on a sphere. "euclidean" option calculates the length of the line segment between two points.
unit
"m", "km". Unit of the result. (Default value = "m")
optional_configs
The following keys can be used: - radius: radius of Earth, used for the haversine distance and for any coordinate format conversion. (Default value = EARTH_RADIUS) - vincenty_model: The ellipsoidal model to use. Supported values: "WGS-84", "GRS-80", "Airy (1830)", "Intl 1924", "Clarke (1880)", "GRS-67". For more information, please refer to geopy.distance.ELLIPSOIDS. (Default value = "WGS-84")
output_mode
"replace", "append". "replace" option replaces original columns with transformed column. "append" option appends the transformed column to the input dataset with name "__distance". (Default value = "append")

Returns

DataFrame
 
Expand source code
def location_distance(
    idf,
    list_of_cols_loc1,
    list_of_cols_loc2,
    loc_format="dd",
    result_prefix="",
    distance_type="haversine",
    unit="m",
    optional_configs={"radius": EARTH_RADIUS, "vincenty_model": "WGS-84"},
    output_mode="append",
):
    """
    This function calculates the distance between 2 locations, and the distance formula is determined by distance_type.
    If distance_type = "vincenty", thed loc_format should be "dd", and list_of_cols_loc1, list_of_cols_loc2
    should be in [lat1, lon1] and [lat2, lon2] format respectively. "vincenty_distance" function will be called to
    calculate the distance.
    If distance_type = "haversine", then loc_format should be "radian", and list_of_cols_loc1, list_of_cols_loc2
    should be in [lat1, lon1] and [lat2, lon2] format respectively. "haversine_distance" function will be called to
    calculate the distance.
     If distance_type = "euclidean", then loc_format should be "cartesian", and list_of_cols_loc1, list_of_cols_loc2
    should be in  [x1, y1, z1] and [x2, y2, z2] format respectively. "euclidean_distance" function will be called to
    calculate the distance.
    If loc_format does not match with distance_type's desired format, necessary conversion of location columns will be performed
    with the help of "geo_format_latlon", "geo_format_cartesian" and "geo_format_geohash" functions.

    Parameters
    ----------
    idf
        Input Dataframe.
    list_of_cols_loc1
        List of columns to express the first location e.g., ["lat1","lon1"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "lat1|lon1".
    list_of_cols_loc2
        List of columns to express the second location e.g., ["lat2","lon2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "lat2|lon2".
    loc_format
        "dd", "dms", "radian", "cartesian", "geohash". (Default value = "dd")
    result_prefix
        Prefix for the newly generated column. It must be a string or a list with one element.
        If it is empty, <list_of_cols_loc1 joined by '_'>_<list_of_cols_loc2 joined by '_'>
        will be used as the prefix.
        For example, list_of_cols_loc1 is "lat1|lon1", list_of_cols_loc2 is "lat2|lon2".
        Case 1: result_prefix = "L1_L2": the new column will be named as L1_L2_distance.
        Case 2: result_prefix = []: the new column will be named as lat1_lon1_lat2_lon2_distance.
        (Default value = '')
    distance_type
        "vincenty", "haversine", "euclidean". (Default value = "haversine")
        "vincenty" option calculates the distance between two points on the surface of a spheroid.
        "haversine" option calculates the great-circle distance between two points on a sphere.
        "euclidean" option calculates the length of the line segment between two points.
    unit
        "m", "km".
        Unit of the result. (Default value = "m")
    optional_configs
        The following keys can be used:
        - radius: radius of Earth, used for the haversine distance and for any coordinate format conversion.
          (Default value = EARTH_RADIUS)
        - vincenty_model: The ellipsoidal model to use. Supported values: "WGS-84", "GRS-80", "Airy (1830)",
          "Intl 1924", "Clarke (1880)", "GRS-67". For more information, please refer to geopy.distance.ELLIPSOIDS.
          (Default value = "WGS-84")
    output_mode
        "replace", "append".
        "replace" option replaces original columns with transformed column.
        "append" option appends the transformed column to the input dataset with name "<loc1>_<loc2>_distance".
        (Default value = "append")

    Returns
    -------
    DataFrame
    """

    radius = optional_configs.get("radius", EARTH_RADIUS)
    vincenty_model = optional_configs.get("vincenty_model", "WGS-84")

    if isinstance(list_of_cols_loc1, str):
        list_of_cols_loc1 = [x.strip() for x in list_of_cols_loc1.split("|")]
    if isinstance(list_of_cols_loc2, str):
        list_of_cols_loc2 = [x.strip() for x in list_of_cols_loc2.split("|")]

    if isinstance(result_prefix, list):
        if len(result_prefix) > 1:
            raise TypeError(
                "If result_prefix is a list, it can contain maximally 1 element"
            )
        elif len(result_prefix) == 1:
            result_prefix = result_prefix[0]

    if any(i not in idf.columns for i in list_of_cols_loc1 + list_of_cols_loc2):
        raise TypeError("Invalid input for list_of_cols_loc1 or list_of_cols_loc2")

    if distance_type not in ["vincenty", "haversine", "euclidean"]:
        raise TypeError("Invalid input for distance_type")

    if loc_format not in ["dd", "dms", "radian", "cartesian", "geohash"]:
        raise TypeError("Invalid input for loc_format")

    format_mapping = {"vincenty": "dd", "haversine": "radian", "euclidean": "cartesian"}
    format_required = format_mapping[distance_type]

    if loc_format != format_required:
        if loc_format in ["dd", "dms", "radian"]:
            idf = geo_format_latlon(
                idf,
                list_of_lat=[list_of_cols_loc1[0], list_of_cols_loc2[0]],
                list_of_lon=[list_of_cols_loc1[1], list_of_cols_loc2[1]],
                input_format=loc_format,
                output_format=format_required,
                result_prefix=["temp_loc1", "temp_loc2"],
                optional_configs={"radius": radius},
                output_mode="append",
            )

        elif loc_format == "cartesian":
            idf = geo_format_cartesian(
                idf,
                list_of_x=[list_of_cols_loc1[0], list_of_cols_loc2[0]],
                list_of_y=[list_of_cols_loc1[1], list_of_cols_loc2[1]],
                list_of_z=[list_of_cols_loc1[2], list_of_cols_loc2[2]],
                output_format=format_required,
                result_prefix=["temp_loc1", "temp_loc2"],
                optional_configs={"radius": radius},
                output_mode="append",
            )

        elif loc_format == "geohash":
            idf = geo_format_geohash(
                idf,
                list_of_geohash=[list_of_cols_loc1[0], list_of_cols_loc2[0]],
                output_format=format_required,
                result_prefix=["temp_loc1", "temp_loc2"],
                optional_configs={"radius": radius},
                output_mode="append",
            )

        if format_required == "dd":
            loc1, loc2 = ["temp_loc1_lat_dd", "temp_loc1_lon_dd"], [
                "temp_loc2_lat_dd",
                "temp_loc2_lon_dd",
            ]

        elif format_required == "radian":
            loc1, loc2 = ["temp_loc1_lat_radian", "temp_loc1_lon_radian"], [
                "temp_loc2_lat_radian",
                "temp_loc2_lon_radian",
            ]

        elif format_required == "cartesian":
            loc1, loc2 = ["temp_loc1_x", "temp_loc1_y", "temp_loc1_z"], [
                "temp_loc2_x",
                "temp_loc2_y",
                "temp_loc2_z",
            ]

        idf = (
            idf.withColumn("temp_loc1", F.array(*loc1))
            .withColumn("temp_loc2", F.array(*loc2))
            .drop(*(loc1 + loc2))
        )
    else:
        idf = idf.withColumn("temp_loc1", F.array(*list_of_cols_loc1)).withColumn(
            "temp_loc2", F.array(*list_of_cols_loc2)
        )

    if distance_type == "vincenty":
        compute_distance = lambda x1, x2: vincenty_distance(
            x1, x2, unit, vincenty_model
        )
    elif distance_type == "haversine":
        compute_distance = lambda x1, x2: haversine_distance(
            x1, x2, "radian", unit, radius
        )
    else:
        compute_distance = lambda x1, x2: euclidean_distance(x1, x2, unit)

    f_compute_distance = F.udf(compute_distance, T.FloatType())

    col_prefix = (
        result_prefix
        if result_prefix
        else "_".join(list_of_cols_loc1) + "_" + "_".join(list_of_cols_loc2)
    )
    odf = idf.withColumn(
        col_prefix + "_distance", f_compute_distance("temp_loc1", "temp_loc2")
    ).drop("temp_loc1", "temp_loc2")

    if output_mode == "replace":
        odf = odf.drop(*(list_of_cols_loc1 + list_of_cols_loc2))

    return odf
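
A minimal, hypothetical usage sketch for location_distance, assuming a SparkSession spark, this module's functions in scope and illustrative columns for two decimal-degree locations.

# Haversine distance in kilometres between two lat-lon pairs given in decimal degrees.
df = spark.createDataFrame(
    [(1.352, 103.820, 48.8566, 2.3522)], ["lat1", "lon1", "lat2", "lon2"]
)
odf = location_distance(
    df, list_of_cols_loc1="lat1|lon1", list_of_cols_loc2="lat2|lon2",
    loc_format="dd", distance_type="haversine", unit="km",
)
odf.show()  # appends lat1_lon1_lat2_lon2_distance
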
def location_in_country(spark, idf, list_of_lat, list_of_lon, country, country_shapefile_path='', method_type='approx', result_prefix=[], output_mode='append')

This function checks whether each lat-lon pair is inside a country. Two ways of checking are supported: "approx" (using the bounding box of a country) and "exact" (using the shapefile of a country).

Parameters

spark
Spark Session
idf
Input Dataframe.
list_of_lat
List of columns representing latitude e.g., ["lat1","lat2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter "|" e.g., "lat1|lat2".
list_of_lon
List of columns representing longitude e.g., ["lon1","lon2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter "|" e.g., "lon1|lon2". list_of_lon must have the same length as list_of_lat such that i-th element of list_of_lat and i-th element of list_of_lon form a lat-lon pair to format.
country
The Alpha-2 country code.
country_shapefile_path
The geojson file with a FeatureCollection object containing polygons for each country. One example file country_polygons.geojson can be downloaded from Anovos GitHub repository: https://github.com/anovos/anovos/tree/main/data/
method_type
"approx", "exact". "approx" uses the bounding box of a country to estimate whether a location is inside the country "exact" uses the shapefile of a country to calculate whether a location is inside the country
result_prefix
List of prefixes for the newly generated column names. Alternatively, prefixes can be specified in a string format, where different prefixes are separated by pipe delimiter "|" e.g., "pf1|pf2". result_prefix must have the same length as list_of_lat and list_of_lon. If it is empty, <lat>_<lon> will be used for each lat-lon pair. For example, list_of_lat is "lat1|lat2", list_of_lon is "lon1|lon2". Case 1: result_prefix = "L1|L2", country="US". New columns will be named as L1_in_US_<method_type> and L2_in_US_<method_type>. Case 2: result_prefix = [], country="US". New columns will be named as lat1_lon1_in_US_<method_type> and lat2_lon2_in_US_<method_type>. (Default value = [])
output_mode
"replace", "append". "replace" option appends transformed column to the input dataset and removes the original ones. "append" option appends transformed column to the input dataset. (Default value = "append")

Returns

DataFrame
 
Expand source code
def location_in_country(
    spark,
    idf,
    list_of_lat,
    list_of_lon,
    country,
    country_shapefile_path="",
    method_type="approx",
    result_prefix=[],
    output_mode="append",
):
    """
    This function checks whether each lat-lon pair is inside a country. Two ways of checking are supported: "approx" (using the
    bounding box of a country) and "exact" (using the shapefile of a country).

    Parameters
    ----------
    spark
        Spark Session
    idf
        Input Dataframe.
    list_of_lat
        List of columns representing latitude e.g., ["lat1","lat2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "lat1|lat2".
    list_of_lon
        List of columns representing longitude e.g., ["lon1","lon2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "lon1|lon2".
        list_of_lon must have the same length as list_of_lat such that i-th element of
        list_of_lat and i-th element of list_of_lon form a lat-lon pair to format.
    country
        The Alpha-2 country code.
    country_shapefile_path
        The geojson file with a FeatureCollection object containing polygons for each country. One example
        file country_polygons.geojson can be downloaded from Anovos GitHub repository:
        https://github.com/anovos/anovos/tree/main/data/
    method_type
        "approx", "exact".
        "approx" uses the bounding box of a country to estimate whether a location is inside the country
        "exact" uses the shapefile of a country to calculate whether a location is inside the country
    result_prefix
        List of prefixes for the newly generated column names.
        Alternatively, prefixes can be specified in a string format,
        where different prefixes are separated by pipe delimiter "|" e.g., "pf1|pf2".
        result_prefix must have the same length as list_of_lat and list_of_lon.
        If it is empty, <lat>_<lon> will be used for each lat-lon pair.
        For example, list_of_lat is "lat1|lat2", list_of_lon is "lon1|lon2".
        Case 1: result_prefix = "L1|L2", country="US"
            New columns will be named as L1_in_US and L2_in_US.
        Calse 2: result_prefix = [], country="US"
            New columns will be named as lat1_lon1_in_US and lat2_lon2_in_US.
        (Default value = [])
    output_mode
        "replace", "append".
        "replace" option appends transformed column to the input dataset and removes the original ones.
        "append" option appends transformed column to the input dataset.
        (Default value = "append")

    Returns
    -------
    DataFrame
    """

    if isinstance(list_of_lat, str):
        list_of_lat = [x.strip() for x in list_of_lat.split("|")]
    if isinstance(list_of_lon, str):
        list_of_lon = [x.strip() for x in list_of_lon.split("|")]
    if isinstance(result_prefix, str):
        result_prefix = [x.strip() for x in result_prefix.split("|")]

    if any(x not in idf.columns for x in list_of_lat + list_of_lon):
        raise TypeError("Invalid input for list_of_lat or list_of_lon")

    if len(list_of_lat) != len(list_of_lon):
        raise TypeError("list_of_lat and list_of_lon must have the same length")
    if result_prefix and (len(result_prefix) != len(list_of_lat)):
        raise TypeError(
            "result_prefix must have the same length as list_of_lat and list_of_lon if it is not empty"
        )
    if method_type not in ("approx", "exact"):
        raise TypeError("Invalid input for method_type.")

    f_point_in_country_approx = F.udf(point_in_country_approx, T.IntegerType())

    if method_type == "exact":

        def zip_feats(x, y):
            """zipping two features (in list form) elementwise"""
            return zip(x, y)

        f_zip_feats = F.udf(
            zip_feats,
            T.ArrayType(
                T.StructType(
                    [
                        T.StructField("first", T.StringType()),
                        T.StructField(
                            "second",
                            T.ArrayType(
                                T.ArrayType(T.ArrayType(T.ArrayType(T.DoubleType())))
                            ),
                        ),
                    ]
                )
            ),
        )

        geo_data = spark.read.json(country_shapefile_path, multiLine=True).withColumn(
            "tmp",
            f_zip_feats("features.properties.ISO_A2", "features.geometry.coordinates"),
        )
        polygon_list = (
            geo_data.select(F.explode(F.col("tmp")).alias("country_coord"))
            .withColumn("country_code", F.col("country_coord").getItem("first"))
            .withColumn("coordinates", F.col("country_coord").getItem("second"))
            .where(F.col("country_code") == country)
            .select("coordinates")
            .rdd.map(lambda x: x[0])
            .collect()[0]
        )
        print("No. of polygon: " + str(len(polygon_list)))

        min_lon, min_lat = polygon_list[0][0][0]
        max_lon, max_lat = polygon_list[0][0][0]
        for polygon in polygon_list:
            exterior = polygon[0]
            for loc in exterior:
                if loc[0] < min_lon:
                    min_lon = loc[0]
                elif loc[0] > max_lon:
                    max_lon = loc[0]

                if loc[1] < min_lat:
                    min_lat = loc[1]
                elif loc[1] > max_lat:
                    max_lat = loc[1]

    odf = idf
    for i, (lat, lon) in enumerate(zip(list_of_lat, list_of_lon)):
        col = result_prefix[i] if result_prefix else (lat + "_" + lon)

        if method_type == "exact":
            odf = odf.withColumn(
                col + "_in_" + country + "_exact",
                f_point_in_polygons(
                    polygon_list, [min_lon, min_lat], [max_lon, max_lat]
                )(F.col(lon), F.col(lat)),
            )
        else:
            odf = odf.withColumn(
                col + "_in_" + country + "_approx",
                f_point_in_country_approx(F.col(lat), F.col(lon), F.lit(country)),
            )

        if output_mode == "replace":
            odf = odf.drop(lat, lon)

    return odf
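
A minimal, hypothetical usage sketch for location_in_country using the "approx" method, assuming a SparkSession spark and this module's functions in scope; the country code and sample coordinates are illustrative.

# Flag whether each point lies inside Singapore's bounding box.
df = spark.createDataFrame([(1.352, 103.820), (48.8566, 2.3522)], ["lat1", "lon1"])
odf = location_in_country(
    spark, df, list_of_lat="lat1", list_of_lon="lon1",
    country="SG", method_type="approx",
)
odf.show()  # appends an integer flag column lat1_lon1_in_SG_approx
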
def location_in_polygon(idf, list_of_lat, list_of_lon, polygon, result_prefix=[], output_mode='append')

This function checks whether each lat-lon pair is inside a GeoJSON object. The following types of GeoJSON objects are supported by this function: Polygon, MultiPolygon, Feature or FeatureCollection.

Parameters

idf
Input Dataframe.
list_of_lat
List of columns representing latitude e.g., ["lat1","lat2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter "|" e.g., "lat1|lat2".
list_of_lon
List of columns representing longitude e.g., ["lon1","lon2"]. Alternatively, columns can be specified in a string format, where different column names are separated by pipe delimiter "|" e.g., "lon1|lon2". list_of_lon must have the same length as list_of_lat such that i-th element of list_of_lat and i-th element of list_of_lon form a lat-lon pair to format.
polygon
The following types of GeoJSON objects are supported: Polygon, MultiPolygon, Feature or FeatureCollection
result_prefix
List of prefixes for the newly generated column names. Alternatively, prefixes can be specified in a string format, where different prefixes are separated by pipe delimiter "|" e.g., "pf1|pf2". result_prefix must have the same length as list_of_lat and list_of_lon. If it is empty, <lat>_<lon> will be used for each lat-lon pair. For example, list_of_lat is "lat1|lat2", list_of_lon is "lon1|lon2". Case 1: result_prefix = "L1|L2". New columns will be named as L1_in_poly and L2_in_poly. Case 2: result_prefix = []. New columns will be named as lat1_lon1_in_poly and lat2_lon2_in_poly. (Default value = [])
output_mode
"replace", "append". "replace" option appends transformed column to the input dataset and removes the original ones. "append" option appends transformed column to the input dataset. (Default value = "append")

Returns

DataFrame
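
For example, a minimal usage sketch (the square polygon and the column names lat1/lon1 are illustrative):

# a 1-degree square around the origin, expressed as a GeoJSON Polygon
polygon = {
    "type": "Polygon",
    "coordinates": [[[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0], [0.0, 0.0]]],
}
odf = location_in_polygon(
    idf,
    list_of_lat="lat1",
    list_of_lon="lon1",
    polygon=polygon,
    result_prefix="home",
    output_mode="append",
)
# appends a column "home_in_poly" flagging whether each (lat1, lon1) pair lies inside the square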
 
Expand source code
def location_in_polygon(
    idf, list_of_lat, list_of_lon, polygon, result_prefix=[], output_mode="append"
):
    """
    This function checks whether each lat-lon pair is inside a GeoJSON object. The following types of GeoJSON objects
    are supported by this function: Polygon, MultiPolygon, Feature or FeatureCollection

    Parameters
    ----------
    idf
        Input Dataframe.
    list_of_lat
        List of columns representing latitude e.g., ["lat1","lat2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "lat1|lat2".
    list_of_lon
        List of columns representing longitude e.g., ["lon1","lon2"].
        Alternatively, columns can be specified in a string format,
        where different column names are separated by pipe delimiter "|" e.g., "lon1|lon2".
        list_of_lon must have the same length as list_of_lat such that i-th element of
        list_of_lat and i-th element of list_of_lon form a lat-lon pair to format.
    polygon
        The following types of GeoJSON objects are supported: Polygon, MultiPolygon, Feature or FeatureCollection
    result_prefix
        List of prefixes for the newly generated column names.
        Alternatively, prefixes can be specified in a string format,
        where different prefixes are separated by pipe delimiter "|" e.g., "pf1|pf2".
        result_prefix must have the same length as list_of_lat and list_of_lon.
        If it is empty, <lat>_<lon> will be used for each lat-lon pair.
        For example, list_of_lat is "lat1|lat2", list_of_lon is "lon1|lon2".
        Case 1: result_prefix = "L1|L2".
            New columns will be named as L1_in_poly and L2_in_poly.
        Case 2: result_prefix = [].
            New columns will be named as lat1_lon1_in_poly and lat2_lon2_in_poly.
        (Default value = [])
    output_mode
        "replace", "append".
        "replace" option appends transformed column to the input dataset and removes the original ones.
        "append" option appends transformed column to the input dataset.
        (Default value = "append")

    Returns
    -------
    DataFrame
    """

    if isinstance(list_of_lat, str):
        list_of_lat = [x.strip() for x in list_of_lat.split("|")]
    if isinstance(list_of_lon, str):
        list_of_lon = [x.strip() for x in list_of_lon.split("|")]
    if isinstance(result_prefix, str):
        result_prefix = [x.strip() for x in result_prefix.split("|")]

    if any(x not in idf.columns for x in list_of_lat + list_of_lon):
        raise TypeError("Invalid input for list_of_lat or list_of_lon")

    if len(list_of_lat) != len(list_of_lon):
        raise TypeError("list_of_lat and list_of_lon must have the same length")
    if result_prefix and (len(result_prefix) != len(list_of_lat)):
        raise TypeError(
            "result_prefix must have the same length as list_of_lat and list_of_lon if it is not empty"
        )

    if "coordinates" in polygon.keys():
        polygon_list = polygon["coordinates"]
        if polygon["type"] == "Polygon":
            polygon_list = [polygon_list]
    elif "geometry" in polygon.keys():
        polygon_list = [polygon["geometry"]["coordinates"]]
    elif "features" in polygon.keys():
        polygon_list = []
        for poly in polygon["features"]:
            polygon_list.append(poly["geometry"]["coordinates"])

    odf = idf
    for i, (lat, lon) in enumerate(zip(list_of_lat, list_of_lon)):
        col = result_prefix[i] if result_prefix else (lat + "_" + lon)
        odf = odf.withColumn(
            col + "_in_poly", f_point_in_polygons(polygon_list)(F.col(lon), F.col(lat))
        )

        if output_mode == "replace":
            odf = odf.drop(lat, lon)

    return odf
def reverse_geocoding(idf, lat_col, long_col)

This function reverse-geocodes each latitude-longitude pair of a given DataFrame into an address (place name, region and country code).

Parameters

idf
Input Dataframe
lat_col
Column in the input DataFrame that contains latitude data
long_col
Column in the input DataFrame that contains longitude data

Returns

odf : DataFrame
Output DataFrame, which contains the latitude, longitude and the corresponding place name, region and country code
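
A minimal usage sketch (the coordinates and the Spark session are illustrative; the underlying reverse_geocoder package performs an offline nearest-city lookup):

df = spark.createDataFrame(
    [(1.3521, 103.8198), (48.8566, 2.3522)], ["lat", "lon"]
)
odf = reverse_geocoding(df, lat_col="lat", long_col="lon")
# odf columns: lat, lon, name_of_place, region, country_code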
Expand source code
def reverse_geocoding(idf, lat_col, long_col):
    """
    This function reverse-geocodes each latitude-longitude pair of the input DataFrame into an address.

    Parameters
    ----------
    idf
        Input Dataframe
    lat_col
        Column in the input DataFrame that contains latitude data
    long_col
        Column in the input DataFrame that contains longitude data
    Returns
    -------
    odf : DataFrame
        Output DataFrame, which contains the latitude, longitude and the corresponding place name, region and country code
    """
    if lat_col not in idf.columns:
        raise TypeError("Invalid input for lat_col")
    if long_col not in idf.columns:
        raise TypeError("Invalid input for long_col")

    idf = recast_column(
        idf, list_of_cols=[lat_col, long_col], list_of_dtypes=["double", "double"]
    )

    # Warn and drop rows where latitude and/or longitude is null.
    if not idf.where(
        F.col(lat_col).isNull() | F.col(long_col).isNull()
    ).rdd.isEmpty():
        warnings.warn(
            "Rows dropped due to null value in longitude and/or latitude values"
        )
        idf = idf.dropna(subset=(lat_col, long_col))

    if not idf.where(
        (F.col(lat_col) > 90)
        | (F.col(lat_col) < -90)
        | (F.col(long_col) > 180)
        | (F.col(long_col) < -180)
    ).rdd.isEmpty():
        warnings.warn(
            "Rows dropped due to longitude and/or latitude values being out of the valid range"
        )
        idf = idf.where(
            (F.col(lat_col) <= 90)
            & (F.col(lat_col) >= -90)
            & (F.col(long_col) <= 180)
            & (F.col(long_col) >= -180)
        )

    if idf.rdd.isEmpty():
        warnings.warn(
            "No reverse_geocoding Computation - No valid latitude/longitude row(s) to compute"
        )
        return idf

    def reverse_geocode(lat, long):
        coordinates = (float(lat), float(long))
        location = rg.search(coordinates, mode=1)
        if location:
            return (
                str(location[0]["name"])
                + ","
                + str(location[0]["admin1"])
                + ","
                + str(location[0]["cc"])
            )
        else:
            return "N/A"

    udf_reverse_geocode = F.udf(reverse_geocode)
    odf = (
        idf.withColumn("info", udf_reverse_geocode(F.col(lat_col), F.col(long_col)))
        .select(lat_col, long_col, "info")
        .withColumn("name_of_place", F.split(F.col("info"), ",").getItem(0))
        .withColumn("region", F.split(F.col("info"), ",").getItem(1))
        .withColumn("country_code", F.split(F.col("info"), ",").getItem(2))
        .drop("info")
    )
    return odf
def rog_calculation(idf, lat_col, long_col, id_col=None)

This function calculates the Radius of Gyration (in meters) of a given DataFrame, based on its identifier column (if applicable).

Parameters

idf
Input Dataframe
lat_col
Column in the input DataFrame that contains latitude data
long_col
Column in the input DataFrame that contains longitude data
id_col
Column in the input DataFrame that contains identifier for the DataFrame (Default is None)

Returns

odf : DataFrame
Output DataFrame, which contains the Radius of Gyration (in meters) and identifier (if applicable)
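
A minimal usage sketch (the column names are illustrative). As the source below shows, the radius of gyration is computed as the mean haversine distance of each point to its (per-identifier) centroid:

odf = rog_calculation(df, lat_col="lat", long_col="lon", id_col="user_id")
# odf columns: user_id, radius_of_gyration (in meters)

# without an identifier column, a single overall value is returned
odf_all = rog_calculation(df, lat_col="lat", long_col="lon")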
Expand source code
def rog_calculation(idf, lat_col, long_col, id_col=None):
    """
    This function calculates the Radius of Gyration (in meters) of a given DataFrame, based on its identifier column (if applicable).

    Parameters
    ----------
    idf
        Input Dataframe
    lat_col
        Column in the input DataFrame that contains latitude data
    long_col
        Column in the input DataFrame that contains longitude data
    id_col
        Column in the input DataFrame that contains identifier for the DataFrame (Default is None)
    Returns
    -------
    odf : DataFrame
        Output DataFrame, which contains Radius of Gyration (in meters) and identifier (if applicable)
    """
    if id_col and id_col not in idf.columns:
        raise TypeError("Invalid input for id_col")
    if lat_col not in idf.columns:
        raise TypeError("Invalid input for lat_col")
    if long_col not in idf.columns:
        raise TypeError("Invalid input for long_col")

    idf = recast_column(
        idf, list_of_cols=[lat_col, long_col], list_of_dtypes=["double", "double"]
    )

    # Warn and drop rows where latitude and/or longitude is null.
    if not idf.where(
        F.col(lat_col).isNull() | F.col(long_col).isNull()
    ).rdd.isEmpty():
        warnings.warn(
            "Rows dropped due to null value in longitude and/or latitude values"
        )
        idf = idf.dropna(subset=(lat_col, long_col))

    if not idf.where(
        (F.col(lat_col) > 90)
        | (F.col(lat_col) < -90)
        | (F.col(long_col) > 180)
        | (F.col(long_col) < -180)
    ).rdd.isEmpty():
        warnings.warn(
            "Rows dropped due to longitude and/or latitude values being out of the valid range"
        )
        idf = idf.where(
            (F.col(lat_col) <= 90)
            & (F.col(lat_col) >= -90)
            & (F.col(long_col) <= 180)
            & (F.col(long_col) >= -180)
        )

    if idf.rdd.isEmpty():
        warnings.warn(
            "No reverse_geocoding Computation - No valid latitude/longitude row(s) to compute"
        )
        return idf

    def getHaversineDist(lat1, lon1, lat2, lon2):

        R = 6378126  # approximate radius of earth in m

        lat1 = radians(float(lat1))
        lon1 = radians(float(lon1))
        lat2 = radians(float(lat2))
        lon2 = radians(float(lon2))

        dlon = lon2 - lon1
        dlat = lat2 - lat1
        a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
        c = 2 * atan2(sqrt(a), sqrt(1 - a))
        distance = R * c
        return distance

    udf_harver_dist = F.udf(getHaversineDist, T.FloatType())

    if id_col:
        # with an identifier column: compute each group's centroid, join it back
        # to the points, and average each point's distance to its group centroid
        idf_centroid = centroid(idf, lat_col, long_col, id_col)
        idf_join = idf_centroid.join(idf, id_col, "inner")
        idf_calc = idf_join.withColumn(
            "distance",
            udf_harver_dist(
                F.col(lat_col),
                F.col(long_col),
                F.col(lat_col + "_centroid"),
                F.col(long_col + "_centroid"),
            ),
        )

        odf = idf_calc.groupby(id_col).agg(
            F.mean("distance").alias("radius_of_gyration")
        )
    else:
        # without an identifier column: compute a single global centroid and
        # broadcast it to every row before computing distances
        centroid_info = centroid(idf, lat_col, long_col, id_col).rdd.collect()
        lat_centroid = centroid_info[0][0]
        long_centroid = centroid_info[0][1]
        idf_join = idf.withColumn(
            lat_col + "_centroid", F.lit(lat_centroid)
        ).withColumn(long_col + "_centroid", F.lit(long_centroid))
        idf_calc = idf_join.withColumn(
            "distance",
            udf_harver_dist(
                F.col(lat_col),
                F.col(long_col),
                F.col(lat_col + "_centroid"),
                F.col(long_col + "_centroid"),
            ),
        )

        odf = idf_calc.groupby().agg(F.mean("distance").alias("radius_of_gyration"))
    return odf
def weighted_centroid(idf, id_col, lat_col, long_col)

This function calculates the weighted centroid of a given DataFrame using lat-long pairs based on its identifier column

Parameters

idf
Input Dataframe
lat_col
Column in the input DataFrame that contains latitude data
long_col
Column in the input DataFrame that contains longitude data
id_col
Column in the input DataFrame that contains identifier for the DataFrame

Returns

odf : DataFrame
Output DataFrame, which contains the identifier along with the weighted lat_centroid and long_centroid columns
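
A minimal usage sketch (the column names are illustrative). As the source below shows, a single weighted centroid is computed across all groups and attached to every identifier:

odf = weighted_centroid(df, id_col="user_id", lat_col="lat", long_col="lon")
# odf columns: user_id, lat_centroid, lon_centroid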
Expand source code
def weighted_centroid(idf, id_col, lat_col, long_col):
    """
    This function calculates the weighted centroid of a given DataFrame using lat-long pairs based on its identifier column

    Parameters
    ----------
    idf
        Input Dataframe
    lat_col
        Column in the input DataFrame that contains latitude data
    long_col
        Column in the input DataFrame that contains longitude data
    id_col
        Column in the input DataFrame that contains identifier for the DataFrame
    Returns
    -------
    odf : DataFrame
        Output DataFrame, which contains weighted lat_centroid and long_centroid and identifier
    """
    if id_col not in idf.columns:
        raise TypeError("Invalid input for id_col")
    if lat_col not in idf.columns:
        raise TypeError("Invalid input for lat_col")
    if long_col not in idf.columns:
        raise TypeError("Invalid input for long_col")

    idf = recast_column(
        idf, list_of_cols=[lat_col, long_col], list_of_dtypes=["double", "double"]
    )

    # Warn and drop rows where latitude and/or longitude is null.
    if not idf.where(
        F.col(lat_col).isNull() | F.col(long_col).isNull()
    ).rdd.isEmpty():
        warnings.warn(
            "Rows dropped due to null value in longitude and/or latitude values"
        )
        idf = idf.dropna(subset=(lat_col, long_col))

    if not idf.where(
        (F.col(lat_col) > 90)
        | (F.col(lat_col) < -90)
        | (F.col(long_col) > 180)
        | (F.col(long_col) < -180)
    ).rdd.isEmpty():
        warnings.warn(
            "Rows dropped due to longitude and/or latitude values being out of the valid range"
        )
        idf = idf.where(
            (F.col(lat_col) <= 90)
            & (F.col(lat_col) >= -90)
            & (F.col(long_col) <= 180)
            & (F.col(long_col) >= -180)
        )

    if idf.rdd.isEmpty():
        warnings.warn(
            "No reverse_geocoding Computation - No valid latitude/longitude row(s) to compute"
        )
        return idf

    def degree_to_radian(deg):
        return deg * pi / 180

    f_degree_to_radian = F.udf(degree_to_radian, T.FloatType())

    # convert degrees to radians and project each point onto the unit sphere
    # (x, y, z) so that averaging can be done in Cartesian space
    idf_rad = (
        idf.withColumn("lat_rad", f_degree_to_radian(lat_col))
        .withColumn("long_rad", f_degree_to_radian(long_col))
        .withColumn("x", F.cos("lat_rad") * F.cos("long_rad"))
        .withColumn("y", F.cos("lat_rad") * F.sin("long_rad"))
        .withColumn("z", F.sin("lat_rad"))
    )

    # aggregate per identifier; each group's summed vector is further scaled by
    # the group's row count (its weight) before the global average below
    idf_groupby = (
        idf_rad.groupby(id_col)
        .agg(
            F.sum("x").alias("x_group"),
            F.sum("y").alias("y_group"),
            F.sum("z").alias("z_group"),
            F.count(id_col).alias("weight_group"),
        )
        .withColumn("weighted_x", F.col("x_group") * F.col("weight_group"))
        .withColumn("weighted_y", F.col("y_group") * F.col("weight_group"))
        .withColumn("weighted_z", F.col("z_group") * F.col("weight_group"))
    )

    total_weight = (
        idf_groupby.groupBy()
        .agg(F.sum("weight_group"))
        .rdd.map(lambda x: x[0])
        .collect()[0]
    )
    total_x = (
        idf_groupby.groupBy()
        .agg(F.sum("weighted_x"))
        .rdd.map(lambda x: x[0])
        .collect()[0]
    )
    total_y = (
        idf_groupby.groupBy()
        .agg(F.sum("weighted_y"))
        .rdd.map(lambda x: x[0])
        .collect()[0]
    )
    total_z = (
        idf_groupby.groupBy()
        .agg(F.sum("weighted_z"))
        .rdd.map(lambda x: x[0])
        .collect()[0]
    )

    # normalise by the total weight and convert the averaged Cartesian vector
    # back to latitude/longitude in degrees
    x = total_x / total_weight
    y = total_y / total_weight
    z = total_z / total_weight
    hyp = sqrt(x * x + y * y)
    lat_centroid, long_centroid = atan2(z, hyp) * 180 / pi, atan2(y, x) * 180 / pi

    odf = (
        idf_groupby.select(id_col)
        .withColumn(lat_col + "_centroid", F.lit(lat_centroid))
        .withColumn(long_col + "_centroid", F.lit(long_centroid))
    )

    return odf