Turkish Food Prices Dataset Example
This example assumes you have the test database up and running locally. From the root of the project, run the command: make infra-up.
[1]:
from datetime import datetime
import logging
from typing import Any, List
import polars as pl
from polars_hist_db.config import Config
from polars_hist_db.core import (
    AuditOps,
    DataframeOps,
    TableConfigOps,
    TimeHint,
    make_engine,
)
from polars_hist_db.dataset import run_datasets
from polars_hist_db.loaders import FunctionRegistry
Initialise Configuration and Logging
This example uses configuration and data used in the test suite.
[ ]:
config_path = "../tests/_testdata_dataset_configs/dataset_foodprices.yaml"
data_path = "../tests/_testdata_dataset_configs/foodprices_data.csv"
logging.basicConfig(level=logging.INFO)
config = Config.from_yaml(config_path)
(Optionally) Print the config file to the console
[ ]:
with open(config_path, "r") as f:
    print(f.read())
Create a SQLAlchemy Engine to connect to the database
[4]:
engine = make_engine(
    backend="mariadb",
    hostname="127.0.0.1",
    port=3306,
    username="root",
    password="admin",
)
Register a custom parser function
The food prices from the data source are given in Turkish lira (TRY).
For some reason, the team only cares about the USD price. As per the dataset config, a yearly TRYUSD fx-rate is applied to the price column at scrape time, creating a derived price_usd column of type DECIMAL(10,4) in the database.
[ ]:
registry = FunctionRegistry()
def custom_try_to_usd(df: pl.DataFrame, args: List[Any]) -> pl.DataFrame:
    usdtry_fx_rates = pl.from_dict(
        {
            "Year": [
                2010,
                2011,
                2012,
                2013,
                2014,
                2015,
                2016,
                2017,
                2018,
                2019,
                2020,
                2021,
                2022,
                2023,
            ],
            "fx_usdtry": [
                1.507,
                1.674,
                1.802,
                1.915,
                2.188,
                2.724,
                3.020,
                3.646,
                4.830,
                5.680,
                7.004,
                8.886,
                16.566,
                23.085,
            ],
        }
    )
    col_result = args[0]
    col_try = args[1]
    col_year = args[2]
    df = (
        df.join(usdtry_fx_rates, left_on=col_year, right_on="Year", how="left")
        .with_columns((pl.col(col_try) * 1 / pl.col("fx_usdtry")).alias(col_result))
        .drop("fx_usdtry")
    )
    return df
registry.delete_function("try_to_usd")
registry.register_function("try_to_usd", custom_try_to_usd)
print("loaded functions", registry.list_functions())
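To make the arithmetic of the conversion concrete, here is a minimal standard-library sketch of the same idea for a single price. It is not part of polars_hist_db: the helper name try_to_usd_scalar and the dictionary USDTRY_BY_YEAR are hypothetical, only two of the rates above are copied in, and the half-up rounding to four decimal places is an assumption about how a DECIMAL(10,4) column might store the value. A year with no rate returns None, mirroring the null that the left join above produces.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional

# Two of the yearly USDTRY rates from the cell above (TRY per 1 USD).
USDTRY_BY_YEAR = {2015: Decimal("2.724"), 2020: Decimal("7.004")}


def try_to_usd_scalar(price_try: Decimal, year: int) -> Optional[Decimal]:
    """Convert one TRY price to USD; return None if the year has no rate."""
    rate = USDTRY_BY_YEAR.get(year)
    if rate is None:
        # Mirrors the left join: an unmatched year yields a null result.
        return None
    # Assumption: round half-up to 4 decimal places, like a DECIMAL(10,4) column.
    return (price_try / rate).quantize(Decimal("0.0001"), rounding=ROUND_HALF_UP)


print(try_to_usd_scalar(Decimal("10"), 2015))  # 3.6711
print(try_to_usd_scalar(Decimal("10"), 2030))  # None
```

Dividing by the USDTRY rate is equivalent to multiplying by the TRYUSD rate, which is why the registered function divides the TRY price by fx_usdtry.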
Run the dataset
This scrapes any new files into the database.
(Try running the function a second time…)
[ ]:
run_datasets(config, engine)
Querying the temporal tables
Query the latest food prices.
[7]:
with engine.begin() as connection:
    latest_food_prices_df = DataframeOps(connection).from_table("test", "food_prices")
Query all the food prices.
[ ]:
with engine.begin() as connection:
    time_hint = TimeHint(mode="all")
    all_food_prices = DataframeOps(connection).from_table(
        "test", "food_prices", time_hint
    )
all_food_prices
Query the food prices at a specific point in time, in this case 1 January 2015.
[ ]:
with engine.begin() as connection:
    time_hint = TimeHint(mode="asof", asof_utc=datetime(2015, 1, 1))
    food_prices_at_2015_date = DataframeOps(connection).from_table(
        "test", "food_prices", time_hint
    )
food_prices_at_2015_date
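As a conceptual illustration of what an "as of" query means, the sketch below filters a small in-memory history by validity interval. It does not show how polars_hist_db or the database implements temporal queries: the PriceVersion class, its valid_from and valid_to fields, the rows_asof helper, and the sample rows are all hypothetical and invented for this illustration.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class PriceVersion:
    product: str
    price_usd: float
    valid_from: datetime
    valid_to: Optional[datetime]  # None means the version is still current


def rows_asof(history: List[PriceVersion], asof: datetime) -> List[PriceVersion]:
    """Return the versions whose validity interval [valid_from, valid_to) contains asof."""
    return [
        row
        for row in history
        if row.valid_from <= asof and (row.valid_to is None or asof < row.valid_to)
    ]


# Made-up sample history, for illustration only.
history = [
    PriceVersion("bread", 1.10, datetime(2014, 6, 1), datetime(2015, 3, 1)),
    PriceVersion("bread", 1.25, datetime(2015, 3, 1), None),
    PriceVersion("milk", 0.80, datetime(2015, 2, 1), None),
]

for row in rows_asof(history, datetime(2015, 1, 1)):
    print(row.product, row.price_usd)  # bread 1.1
```

In this picture, a query for the latest data corresponds to keeping only the rows that are still current, and a query for all data corresponds to returning every version without filtering.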
Delete the data associated with the dataset
Reset the example. Until the data is deleted, subsequent attempts to upload the same data (or past data) into the database will fail.
[ ]:
with engine.begin() as connection:
    TableConfigOps(connection).drop_all(config.tables)
    AuditOps(config.tables.schemas()[0]).drop(connection)