Skip to content

feast_exporter

Expand source code
import os
from datetime import datetime

import isort
from black import FileMode, format_str
from jinja2 import Template
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit

# Name handed to the templates as the file source's "source_name" and as the
# feature view's "source".
ANOVOS_SOURCE = "anovos_source"

# Lookup from a dataframe column type name to the Feast dtype name that is
# written into the generated Field(...) definitions (used by generate_fields).
# NOTE(review): the keys look like Spark SQL simple type names — confirm
# against the caller that builds the `types` list.
dataframe_to_feast_type_mapping = {
    "string": "String",
    "int": "Int64",
    "float": "Float32",
    "timestamp": "String"
    # TODO: default type — a column type that is missing from this mapping
    # currently raises KeyError when generate_fields looks it up.
}


def check_feast_configuration(feast_config: dict, repartition_count: int):
    """Validate the Feast export settings before anything is generated.

    Args:
        feast_config: The Feast configuration block; it must contain the keys
            ``file_path``, ``entity``, ``file_source`` and ``feature_view``.
        repartition_count: The configured repartition value; only ``1`` is
            accepted.

    Raises:
        ValueError: If ``repartition_count`` is not 1 or a required key is
            missing. The repartition check runs first, then the keys are
            checked in the order listed above, so only the first problem is
            reported.
    """
    if repartition_count != 1:
        raise ValueError(
            "Please, set repartition parameter to 1 in write_main block in your config yml!"
        )

    # (required key, message raised when it is absent), in check order.
    required_entries = (
        (
            "file_path",
            "Please, provide a path to the anovos feature_store repository!",
        ),
        ("entity", "Please, provide an entity definition in your config yml!"),
        (
            "file_source",
            "Please, provide a file source definition in your config yml!",
        ),
        (
            "feature_view",
            "Please, provide a feature view definition in your config yml!",
        ),
    )
    for required_key, complaint in required_entries:
        if required_key not in feast_config:
            raise ValueError(complaint)


def generate_entity_definition(config: dict) -> str:
    """Render the entity definition from ``templates/entity.txt``.

    The template file is looked up in the ``templates`` directory that sits
    next to this module.

    Args:
        config: Entity settings; the keys ``name``, ``id_col`` and
            ``description`` are read.

    Returns:
        The rendered template text.

    Raises:
        KeyError: If ``config`` lacks one of the keys listed above.
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    template_file = os.path.join(module_dir, "templates", "entity.txt")

    with open(template_file, "r") as handle:
        template = Template(handle.read())

    # The value type is always passed as "STRING"; it is not derived from the
    # id column.
    return template.render(
        {
            "entity_name": config["name"],
            "join_keys": config["id_col"],
            "value_type": "STRING",
            "description": config["description"],
        }
    )


def generate_feature_view(
    types: list, exclude_list: list, config: dict, entity_name: str
) -> str:
    """Render the feature view definition from ``templates/feature_view.txt``.

    Args:
        types: ``(field_name, field_type)`` pairs, forwarded to
            ``generate_fields``.
        exclude_list: Field names that must not appear in the view, forwarded
            to ``generate_fields``.
        config: Feature view settings; the keys ``name``, ``ttl_in_seconds``
            and ``owner`` are read.
        entity_name: Name of the entity the view refers to.

    Returns:
        The rendered template text.
    """
    template_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "templates", "feature_view.txt"
    )
    with open(template_file, "r") as handle:
        raw_template = handle.read()

    field_definitions = generate_fields(types, exclude_list)
    template = Template(raw_template)

    # The configured name is supplied under two template variables.
    view_name = config["name"]
    context = {
        "feature_view_name": view_name,
        "source": ANOVOS_SOURCE,
        "view_name": view_name,
        "entity": entity_name,
        "fields": field_definitions,
        "ttl_in_seconds": config["ttl_in_seconds"],
        "owner": config["owner"],
    }
    return template.render(context)


def generate_fields(types: list, exclude_list: list) -> str:
    """Build the concatenated ``Field(...)`` entries for a feature view.

    Args:
        types: ``(field_name, field_type)`` pairs; ``field_type`` must be a
            key of ``dataframe_to_feast_type_mapping``.
        exclude_list: Field names to skip. Skipped fields are never looked up
            in the type mapping, so their type does not need to be supported.

    Returns:
        The output of ``generate_field`` for every non-excluded field, joined
        in input order (an empty string when nothing is left).

    Raises:
        KeyError: If a non-excluded field has a type without a Feast mapping.
            The message names the offending column and the supported types.
    """
    rendered_fields = []
    for field_name, field_type in types:
        if field_name in exclude_list:
            continue
        try:
            feast_type = dataframe_to_feast_type_mapping[field_type]
        except KeyError:
            # Still a KeyError (as before), but say which column is to blame
            # instead of surfacing only the bare type name.
            raise KeyError(
                f"No Feast type mapping for column '{field_name}' "
                f"of type '{field_type}'; supported types: "
                f"{sorted(dataframe_to_feast_type_mapping)}"
            ) from None
        rendered_fields.append(generate_field(field_name, feast_type))

    return "".join(rendered_fields)


def generate_field(field_name: str, field_type: str) -> str:
    """Render a single ``Field(name="...", dtype=...),`` entry.

    Args:
        field_name: Inserted between the double quotes of ``name=``.
        field_type: Inserted verbatim (unquoted) after ``dtype=``.

    Returns:
        The rendered entry.
    """
    # NOTE(review): with Jinja2's default keep_trailing_newline=False the
    # trailing "\n" of this template is presumably dropped on render — confirm.
    field_template = Template(""" Field(name="{{name}}", dtype={{type}}),\n""")
    return field_template.render(name=field_name, type=field_type)


def generate_file_source(config: dict, file_name="Test") -> str:
    """Render the file source definition from ``templates/file_source.txt``.

    Args:
        config: File source settings; the keys ``timestamp_col``,
            ``create_timestamp_col``, ``description`` and ``owner`` are read.
        file_name: Value passed to the template as ``filename``.

    Returns:
        The rendered template text.
    """
    template_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "templates", "file_source.txt"
    )
    with open(template_file, "r") as handle:
        template = Template(handle.read())

    context = {
        "source_name": ANOVOS_SOURCE,
        "filename": file_name,
        "ts_column": config["timestamp_col"],
        "create_ts_column": config["create_timestamp_col"],
        "source_description": config["description"],
        "owner": config["owner"],
    }
    return template.render(context)


def generate_prefix():
    """Return the unmodified contents of ``templates/prefix.txt``.

    The file is read from the ``templates`` directory next to this module and
    is not passed through the template engine.
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(module_dir, "templates", "prefix.txt"), "r") as handle:
        return handle.read()


def generate_feature_service(service_name: str, view_name: str):
    """Render the feature service definition from ``templates/feature_service.txt``.

    Args:
        service_name: Passed to the template as ``feature_service_name``.
        view_name: Passed to the template as ``view_name``.

    Returns:
        The rendered template text.
    """
    template_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "templates", "feature_service.txt"
    )
    with open(template_file, "r") as handle:
        template = Template(handle.read())

    return template.render(
        {
            "feature_service_name": service_name,
            "view_name": view_name,
        }
    )


def generate_feature_description(types: list, feast_config: dict, file_name: str):
    """Generate the feature definition file ``anovos.py``.

    The prefix, file source, entity, feature view and (optionally) feature
    service snippets are rendered, combined through
    ``templates/complete_file.txt``, formatted with black's ``format_str``,
    passed through ``isort.code`` and written to
    ``<feast_config["file_path"]>/anovos.py``.

    Args:
        types: ``(field_name, field_type)`` pairs describing the columns.
        feast_config: Feast configuration; the keys ``file_source``,
            ``entity``, ``feature_view`` and ``file_path`` are read. A feature
            service is generated only when ``service_name`` is present.
        file_name: Forwarded to ``generate_file_source``.
    """
    print("Building feature definitions for feature_store")
    prefix_text = generate_prefix()

    source_cfg = feast_config["file_source"]
    source_text = generate_file_source(source_cfg, file_name)

    entity_cfg = feast_config["entity"]
    entity_text = generate_entity_definition(entity_cfg)

    view_cfg = feast_config["feature_view"]
    # The id and the two timestamp columns are kept out of the view's fields.
    excluded_columns = [
        entity_cfg["id_col"],
        source_cfg["timestamp_col"],
        source_cfg["create_timestamp_col"],
    ]
    view_text = generate_feature_view(
        types, excluded_columns, view_cfg, entity_cfg["name"]
    )

    if "service_name" in feast_config:
        service_text = generate_feature_service(
            feast_config["service_name"], view_cfg["name"]
        )
    else:
        service_text = ""

    template_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "templates", "complete_file.txt"
    )
    with open(template_file, "r") as handle:
        template = Template(handle.read())

    rendered = template.render(
        {
            "prefix": prefix_text,
            "file_source": source_text,
            "entity": entity_text,
            "feature_view": view_text,
            "feature_service": service_text,
        }
    )
    formatted = format_str(rendered, mode=FileMode())
    sorted_code = isort.code(formatted)

    target_file = os.path.join(feast_config["file_path"], "anovos.py")
    with open(target_file, "w") as out:
        out.write(sorted_code)


def add_timestamp_columns(idf: DataFrame, feast_file_source__config: dict):
    """Add the two timestamp columns named in the file source configuration.

    Args:
        idf: Input dataframe.
        feast_file_source__config: File source settings; the keys
            ``timestamp_col`` and ``create_timestamp_col`` give the column
            names.

    Returns:
        The dataframe with both columns set to a literal of the current time.
        Each column gets its own ``datetime.now()`` call, so the two values
        can differ slightly.
    """
    print("Adding timestamp columns")
    event_column = feast_file_source__config["timestamp_col"]
    with_event_ts = idf.withColumn(event_column, lit(datetime.now()))

    created_column = feast_file_source__config["create_timestamp_col"]
    return with_event_ts.withColumn(created_column, lit(datetime.now()))

Functions

def add_timestamp_columns(idf: pyspark.sql.dataframe.DataFrame, feast_file_source__config: dict)
Expand source code
def add_timestamp_columns(idf: DataFrame, feast_file_source__config: dict):
    """Add the two timestamp columns named in the file source configuration.

    Args:
        idf: Input dataframe.
        feast_file_source__config: File source settings; the keys
            ``timestamp_col`` and ``create_timestamp_col`` give the column
            names.

    Returns:
        The dataframe with both columns set to a literal of the current time.
        Each column gets its own ``datetime.now()`` call, so the two values
        can differ slightly.
    """
    print("Adding timestamp columns")
    event_column = feast_file_source__config["timestamp_col"]
    with_event_ts = idf.withColumn(event_column, lit(datetime.now()))

    created_column = feast_file_source__config["create_timestamp_col"]
    return with_event_ts.withColumn(created_column, lit(datetime.now()))
def check_feast_configuration(feast_config: dict, repartition_count: int)
Expand source code
def check_feast_configuration(feast_config: dict, repartition_count: int):
    """Validate the Feast export settings before anything is generated.

    Args:
        feast_config: The Feast configuration block; it must contain the keys
            ``file_path``, ``entity``, ``file_source`` and ``feature_view``.
        repartition_count: The configured repartition value; only ``1`` is
            accepted.

    Raises:
        ValueError: If ``repartition_count`` is not 1 or a required key is
            missing. The repartition check runs first, then the keys are
            checked in the order listed above, so only the first problem is
            reported.
    """
    if repartition_count != 1:
        raise ValueError(
            "Please, set repartition parameter to 1 in write_main block in your config yml!"
        )

    # (required key, message raised when it is absent), in check order.
    required_entries = (
        (
            "file_path",
            "Please, provide a path to the anovos feature_store repository!",
        ),
        ("entity", "Please, provide an entity definition in your config yml!"),
        (
            "file_source",
            "Please, provide a file source definition in your config yml!",
        ),
        (
            "feature_view",
            "Please, provide a feature view definition in your config yml!",
        ),
    )
    for required_key, complaint in required_entries:
        if required_key not in feast_config:
            raise ValueError(complaint)
def generate_entity_definition(config: dict) ‑> str
Expand source code
def generate_entity_definition(config: dict) -> str:
    """Render the entity definition from ``templates/entity.txt``.

    The template file is looked up in the ``templates`` directory that sits
    next to this module.

    Args:
        config: Entity settings; the keys ``name``, ``id_col`` and
            ``description`` are read.

    Returns:
        The rendered template text.

    Raises:
        KeyError: If ``config`` lacks one of the keys listed above.
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    template_file = os.path.join(module_dir, "templates", "entity.txt")

    with open(template_file, "r") as handle:
        template = Template(handle.read())

    # The value type is always passed as "STRING"; it is not derived from the
    # id column.
    return template.render(
        {
            "entity_name": config["name"],
            "join_keys": config["id_col"],
            "value_type": "STRING",
            "description": config["description"],
        }
    )
def generate_feature_description(types: list, feast_config: dict, file_name: str)
Expand source code
def generate_feature_description(types: list, feast_config: dict, file_name: str):
    """Generate the feature definition file ``anovos.py``.

    The prefix, file source, entity, feature view and (optionally) feature
    service snippets are rendered, combined through
    ``templates/complete_file.txt``, formatted with black's ``format_str``,
    passed through ``isort.code`` and written to
    ``<feast_config["file_path"]>/anovos.py``.

    Args:
        types: ``(field_name, field_type)`` pairs describing the columns.
        feast_config: Feast configuration; the keys ``file_source``,
            ``entity``, ``feature_view`` and ``file_path`` are read. A feature
            service is generated only when ``service_name`` is present.
        file_name: Forwarded to ``generate_file_source``.
    """
    print("Building feature definitions for feature_store")
    prefix_text = generate_prefix()

    source_cfg = feast_config["file_source"]
    source_text = generate_file_source(source_cfg, file_name)

    entity_cfg = feast_config["entity"]
    entity_text = generate_entity_definition(entity_cfg)

    view_cfg = feast_config["feature_view"]
    # The id and the two timestamp columns are kept out of the view's fields.
    excluded_columns = [
        entity_cfg["id_col"],
        source_cfg["timestamp_col"],
        source_cfg["create_timestamp_col"],
    ]
    view_text = generate_feature_view(
        types, excluded_columns, view_cfg, entity_cfg["name"]
    )

    if "service_name" in feast_config:
        service_text = generate_feature_service(
            feast_config["service_name"], view_cfg["name"]
        )
    else:
        service_text = ""

    template_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "templates", "complete_file.txt"
    )
    with open(template_file, "r") as handle:
        template = Template(handle.read())

    rendered = template.render(
        {
            "prefix": prefix_text,
            "file_source": source_text,
            "entity": entity_text,
            "feature_view": view_text,
            "feature_service": service_text,
        }
    )
    formatted = format_str(rendered, mode=FileMode())
    sorted_code = isort.code(formatted)

    target_file = os.path.join(feast_config["file_path"], "anovos.py")
    with open(target_file, "w") as out:
        out.write(sorted_code)
def generate_feature_service(service_name: str, view_name: str)
Expand source code
def generate_feature_service(service_name: str, view_name: str):
    """Render the feature service definition from ``templates/feature_service.txt``.

    Args:
        service_name: Passed to the template as ``feature_service_name``.
        view_name: Passed to the template as ``view_name``.

    Returns:
        The rendered template text.
    """
    template_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "templates", "feature_service.txt"
    )
    with open(template_file, "r") as handle:
        template = Template(handle.read())

    return template.render(
        {
            "feature_service_name": service_name,
            "view_name": view_name,
        }
    )
def generate_feature_view(types: list, exclude_list: list, config: dict, entity_name: str) ‑> str
Expand source code
def generate_feature_view(
    types: list, exclude_list: list, config: dict, entity_name: str
) -> str:
    """Render the feature view definition from ``templates/feature_view.txt``.

    Args:
        types: ``(field_name, field_type)`` pairs, forwarded to
            ``generate_fields``.
        exclude_list: Field names that must not appear in the view, forwarded
            to ``generate_fields``.
        config: Feature view settings; the keys ``name``, ``ttl_in_seconds``
            and ``owner`` are read.
        entity_name: Name of the entity the view refers to.

    Returns:
        The rendered template text.
    """
    template_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "templates", "feature_view.txt"
    )
    with open(template_file, "r") as handle:
        raw_template = handle.read()

    field_definitions = generate_fields(types, exclude_list)
    template = Template(raw_template)

    # The configured name is supplied under two template variables.
    view_name = config["name"]
    context = {
        "feature_view_name": view_name,
        "source": ANOVOS_SOURCE,
        "view_name": view_name,
        "entity": entity_name,
        "fields": field_definitions,
        "ttl_in_seconds": config["ttl_in_seconds"],
        "owner": config["owner"],
    }
    return template.render(context)
def generate_field(field_name: str, field_type: str) ‑> str
Expand source code
def generate_field(field_name: str, field_type: str) -> str:
    """Render a single ``Field(name="...", dtype=...),`` entry.

    Args:
        field_name: Inserted between the double quotes of ``name=``.
        field_type: Inserted verbatim (unquoted) after ``dtype=``.

    Returns:
        The rendered entry.
    """
    # NOTE(review): with Jinja2's default keep_trailing_newline=False the
    # trailing "\n" of this template is presumably dropped on render — confirm.
    field_template = Template(""" Field(name="{{name}}", dtype={{type}}),\n""")
    return field_template.render(name=field_name, type=field_type)
def generate_fields(types: list, exclude_list: list) ‑> str
Expand source code
def generate_fields(types: list, exclude_list: list) -> str:
    """Build the concatenated ``Field(...)`` entries for a feature view.

    Args:
        types: ``(field_name, field_type)`` pairs; ``field_type`` must be a
            key of ``dataframe_to_feast_type_mapping``.
        exclude_list: Field names to skip. Skipped fields are never looked up
            in the type mapping, so their type does not need to be supported.

    Returns:
        The output of ``generate_field`` for every non-excluded field, joined
        in input order (an empty string when nothing is left).

    Raises:
        KeyError: If a non-excluded field has a type without a Feast mapping.
            The message names the offending column and the supported types.
    """
    rendered_fields = []
    for field_name, field_type in types:
        if field_name in exclude_list:
            continue
        try:
            feast_type = dataframe_to_feast_type_mapping[field_type]
        except KeyError:
            # Still a KeyError (as before), but say which column is to blame
            # instead of surfacing only the bare type name.
            raise KeyError(
                f"No Feast type mapping for column '{field_name}' "
                f"of type '{field_type}'; supported types: "
                f"{sorted(dataframe_to_feast_type_mapping)}"
            ) from None
        rendered_fields.append(generate_field(field_name, feast_type))

    return "".join(rendered_fields)
def generate_file_source(config: dict, file_name='Test') ‑> str
Expand source code
def generate_file_source(config: dict, file_name="Test") -> str:
    """Render the file source definition from ``templates/file_source.txt``.

    Args:
        config: File source settings; the keys ``timestamp_col``,
            ``create_timestamp_col``, ``description`` and ``owner`` are read.
        file_name: Value passed to the template as ``filename``.

    Returns:
        The rendered template text.
    """
    template_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "templates", "file_source.txt"
    )
    with open(template_file, "r") as handle:
        template = Template(handle.read())

    context = {
        "source_name": ANOVOS_SOURCE,
        "filename": file_name,
        "ts_column": config["timestamp_col"],
        "create_ts_column": config["create_timestamp_col"],
        "source_description": config["description"],
        "owner": config["owner"],
    }
    return template.render(context)
def generate_prefix()
Expand source code
def generate_prefix():
    """Return the unmodified contents of ``templates/prefix.txt``.

    The file is read from the ``templates`` directory next to this module and
    is not passed through the template engine.
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(module_dir, "templates", "prefix.txt"), "r") as handle:
        return handle.read()