Turkish Food Prices Dataset Example

This example assumes the test database is up and running locally. To start it, run the following command from the root of the project: make infra-up.

[1]:
from datetime import datetime
import logging
from typing import Any, List

import polars as pl

from polars_hist_db.config import Config
from polars_hist_db.core import (
    AuditOps,
    DataframeOps,
    TableConfigOps,
    TimeHint,
    make_engine,
)
from polars_hist_db.dataset import run_datasets
from polars_hist_db.loaders import FunctionRegistry

Initialise Configuration and Logging

This example reuses the configuration and data from the test suite.

[ ]:
config_path = "../tests/_testdata_dataset_configs/dataset_foodprices.yaml"
data_path = "../tests/_testdata_dataset_configs/foodprices_data.csv"

logging.basicConfig(level=logging.INFO)

config = Config.from_yaml(config_path)

(Optionally) Print the config file to the console

[ ]:
with open(config_path, "r") as f:
    print(f.read())

Create a SQLAlchemy Engine to connect to the database

[4]:
engine = make_engine(
    backend="mariadb",
    hostname="127.0.0.1",
    port=3306,
    username="root",
    password="admin",
)

Register a custom parser function

The food prices from the data source are given in Turkish lira (TRY).

For some reason, the team only cares about the USD price. As per the dataset config, a yearly TRYUSD FX rate is applied to the price column at scrape time, creating a derived price_usd column of type DECIMAL(10,4) in the database.

[ ]:
registry = FunctionRegistry()


def custom_try_to_usd(df: pl.DataFrame, args: List[Any]) -> pl.DataFrame:
    usdtry_fx_rates = pl.from_dict(
        {
            "Year": [
                2010,
                2011,
                2012,
                2013,
                2014,
                2015,
                2016,
                2017,
                2018,
                2019,
                2020,
                2021,
                2022,
                2023,
            ],
            "fx_usdtry": [
                1.507,
                1.674,
                1.802,
                1.915,
                2.188,
                2.724,
                3.020,
                3.646,
                4.830,
                5.680,
                7.004,
                8.886,
                16.566,
                23.085,
            ],
        }
    )

    col_result = args[0]
    col_try = args[1]
    col_year = args[2]
    df = (
        df.join(usdtry_fx_rates, left_on=col_year, right_on="Year", how="left")
        .with_columns((pl.col(col_try) / pl.col("fx_usdtry")).alias(col_result))
        .drop("fx_usdtry")
    )

    return df


registry.delete_function("try_to_usd")
registry.register_function("try_to_usd", custom_try_to_usd)

print("loaded functions", registry.list_functions())
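
The conversion performed by the custom function can be checked by hand. The following is a minimal sketch in plain Python (no polars), using a subset of the rates from the cell above. The helper name try_to_usd_scalar, the sample prices, and the half-up rounding to 4 decimal places are assumptions made for illustration; the rounding is only meant to mimic what a DECIMAL(10,4) column would hold.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional

# Subset of the yearly USDTRY rates from the cell above (TRY per 1 USD).
USDTRY_BY_YEAR = {2014: 2.188, 2015: 2.724, 2016: 3.020}


def try_to_usd_scalar(price_try: float, year: int) -> Optional[Decimal]:
    """Hypothetical scalar version of the TRY -> USD conversion."""
    rate = USDTRY_BY_YEAR.get(year)
    if rate is None:
        # Mirrors the left join: a year without a rate yields a null price.
        return None
    usd = Decimal(str(price_try)) / Decimal(str(rate))
    # Assumption: round half-up to 4 places, as a DECIMAL(10,4) column might.
    return usd.quantize(Decimal("0.0001"), rounding=ROUND_HALF_UP)


print(try_to_usd_scalar(10.0, 2015))  # 10 / 2.724, rounded to 4 places
print(try_to_usd_scalar(10.0, 2009))  # None: no rate available for 2009
```

Dividing by the USDTRY rate (rather than multiplying) is what turns a TRY amount into USD, because the table stores how many lira one dollar buys.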

Run the dataset

This scrapes any new files into the database.

(Try running the function a second time…)

[ ]:
run_datasets(config, engine)

Querying the temporal tables

Query the latest food prices.

[7]:
with engine.begin() as connection:
    latest_food_prices_df = DataframeOps(connection).from_table("test", "food_prices")

Query all the food prices, including historical rows.

[ ]:
with engine.begin() as connection:
    time_hint = TimeHint(mode="all")
    all_food_prices = DataframeOps(connection).from_table(
        "test", "food_prices", time_hint
    )


all_food_prices

Query the food prices as of a specific point in time, in this case 1 January 2015.

[ ]:
with engine.begin() as connection:
    time_hint = TimeHint(mode="asof", asof_utc=datetime(2015, 1, 1))
    food_prices_at_2015_date = DataframeOps(connection).from_table(
        "test", "food_prices", time_hint
    )


food_prices_at_2015_date
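
To build intuition for how the three queries above differ, here is a toy model of a temporal table in plain Python. It is a conceptual sketch only, not how polars_hist_db or the database implements history: the RowVersion class, the select_versions helper, the validity-interval layout, and the sample prices are all made up for illustration.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class RowVersion:
    product: str
    price: float
    valid_from: datetime
    valid_to: Optional[datetime]  # None means the version is still current


# Made-up history: one product whose price changed on 1 January 2016.
HISTORY = [
    RowVersion("bread", 1.0, datetime(2014, 1, 1), datetime(2016, 1, 1)),
    RowVersion("bread", 1.5, datetime(2016, 1, 1), None),
]


def select_versions(
    history: List[RowVersion],
    mode: Optional[str] = None,
    asof_utc: Optional[datetime] = None,
) -> List[RowVersion]:
    """Hypothetical selector: no mode -> latest rows, 'all', or 'asof'."""
    if mode == "all":
        # Every version ever recorded.
        return list(history)
    if mode == "asof":
        if asof_utc is None:
            raise ValueError("asof mode needs asof_utc")
        # Versions whose validity interval [valid_from, valid_to) covers the instant.
        return [
            r
            for r in history
            if r.valid_from <= asof_utc
            and (r.valid_to is None or asof_utc < r.valid_to)
        ]
    # Default: only the versions that are still current.
    return [r for r in history if r.valid_to is None]


print([r.price for r in select_versions(HISTORY)])
print([r.price for r in select_versions(HISTORY, "all")])
print([r.price for r in select_versions(HISTORY, "asof", datetime(2015, 1, 1))])
```

In this toy model the default query returns only the current price, the "all" query returns both versions, and the "asof" query for 1 January 2015 returns the earlier price that was valid on that date.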

Delete the data associated with the dataset

Reset the example by dropping the dataset's tables and its audit log. Without this reset, subsequent attempts to upload the same data (or past data) into the database will fail.

[ ]:
with engine.begin() as connection:
    TableConfigOps(connection).drop_all(config.tables)
    AuditOps(config.tables.schemas()[0]).drop(connection)