Turkish Food Prices Dataset Example

This example assumes you have the test database up and running locally. From the root of the project, run: make infra-up.

[1]:
from datetime import datetime
import logging
from typing import Any, List

import polars as pl

from polars_hist_db.config import Config
from polars_hist_db.core import (
    AuditOps,
    DataframeOps,
    TableConfigOps,
    TimeHint,
    make_engine,
)
from polars_hist_db.dataset import run_workflows
from polars_hist_db.loaders import FunctionRegistry

Initialise Configuration and Logging

This example uses the configuration and data from the test suite.

[2]:
config_path = "../tests/_testdata_dataset_configs/foodprices_dataset.yaml"
data_path = "../tests/_testdata_dataset_configs/foodprices_data.csv"

logging.basicConfig(level=logging.INFO)

config = Config.from_yaml(config_path)
loading Config from: ../tests/_testdata_dataset_configs/foodprices_dataset.yaml

(Optionally) Print the config file to the console

[3]:
with open(config_path, "r") as f:
    print(f.read())
db:
  backend: mariadb
  hostname: 127.0.0.1
  port: 3306
  username: root
  password: admin

table_configs:
  - name: unit_info
    schema: test
    is_temporal: false
    delta_config:
      drop_unchanged_rows: false
      # pre_prune_idential_rows: false
      # truncate_table_first: false
      on_duplicate_key: take_last
    primary_keys:
      - id
    columns:
      - id<um_id
      - name<um_name

  - name: product_info
    schema: test
    is_temporal: false
    delta_config:
      drop_unchanged_rows: false
      # pre_prune_idential_rows: false
      # truncate_table_first: false
      on_duplicate_key: take_last
    primary_keys:
      - id
    columns:
      - id<product_id
      - name<product_name

  - name: food_prices
    schema: test
    is_temporal: true
    delta_config:
      drop_unchanged_rows: true
      # pre_prune_idential_rows: true
      # truncate_table_first: true
      time_partition:
        column: time
        truncate: 5d
        unique_strategy: last
    primary_keys:
      - product_id
      - um_id
    foreign_keys:
      - { name: product_id, references: { table: product_info, column: id } }
      - { name: um_id, references: { table: unit_info, column: id } }
    columns:
        - product_id
        - um_id
        - price
        - price_usd

column_definitions:
  - { name: region, data_type: VARCHAR(32), header: Place, nullable: false }
  - { name: product_id, data_type: INT, header: ProductId, nullable: false }
  - { name: product_name, data_type: VARCHAR(64), header: ProductName }
  - { name: um_id, data_type: INT, header: UmId, nullable: false }
  - { name: um_name, data_type: VARCHAR(16), header: UmName }
  - {
      name: price,
      data_type: 'DECIMAL(10,4)',
      header: Price
    }
  - {
      name: price_usd,
      data_type: 'DECIMAL(10,4)',
      transforms: {
        try_to_usd: [ 'Price', 'Year' ]
      }
    }
  - { name: month, data_type: VARCHAR(2), header: Month }
  - { name: year, data_type: INT, header: Year }
  - {
      name: time,
      data_type: DATETIME,
      transforms: { combine_columns: ['${Year}', '-', '${Month}', '-01' ] }
    }

datasets:
  - name: turkey_food_prices
    delta_table_schema: test
    scrape_limit: ~
    search_paths:
      - root_path: ../_testdata_dataset_data
        file_include: ['turkey_food_prices.csv']
        is_enabled: true
        timestamp:
          source_tz: Europe/London
          target_tz: UTC
          method: mtime

    pipeline:
      - table: unit_info
        columns:
          - id<um_id!
          - name<um_name!

      - table: product_info
        columns:
          - id<product_id!
          - name<product_name!

      - table: food_prices
        type: primary
        columns:
        - product_id!
        - um_id!
        - price!
        - price_usd!

Create a SQLAlchemy Engine to connect to the database

[4]:
engine = make_engine(
    backend="mariadb",
    hostname="127.0.0.1",
    port=3306,
    username="root",
    password="admin",
)

Register a custom parser function

The food prices from the data source are quoted in Turkish lira (TRY).

For some reason, the team only cares about the USD price. As per the dataset config, a yearly TRYUSD fx rate is applied to the price column at scrape time, creating a derived price_usd column of type DECIMAL(10,4) in the database.

[5]:
registry = FunctionRegistry()


def custom_try_to_usd(df: pl.DataFrame, args: List[Any]) -> pl.DataFrame:
    usdtry_fx_rates = pl.from_dict(
        {
            "Year": [
                2010,
                2011,
                2012,
                2013,
                2014,
                2015,
                2016,
                2017,
                2018,
                2019,
                2020,
                2021,
                2022,
                2023,
            ],
            "fx_usdtry": [
                1.507,
                1.674,
                1.802,
                1.915,
                2.188,
                2.724,
                3.020,
                3.646,
                4.830,
                5.680,
                7.004,
                8.886,
                16.566,
                23.085,
            ],
        }
    )

    col_result = args[0]
    col_try = args[1]
    col_year = args[2]
    df = (
        df.join(usdtry_fx_rates, left_on=col_year, right_on="Year", how="left")
        .with_columns((pl.col(col_try) * 1 / pl.col("fx_usdtry")).alias(col_result))
        .drop("fx_usdtry")
    )

    return df


# replace any previously registered implementation with the custom one
registry.delete_function("try_to_usd")
registry.register_function("try_to_usd", custom_try_to_usd)

print("loaded functions", registry.list_functions())
loaded functions ['null_if_gte', 'apply_type_casts', 'combine_columns', 'try_to_usd']

Run the workflow

This scrapes any new files into the database.

(Try running the function a second time…)

[6]:
run_workflows(config, engine)
INFO:polars_hist_db.dataset.workflow:scraping dataset turkey_food_prices
INFO:polars_hist_db.dataset.workflow:starting ingest for food_prices
INFO:polars_hist_db.loaders.file_search:searching files ['turkey_food_prices.csv'] in ../tests/_testdata_dataset_data
INFO:polars_hist_db.loaders.file_search:found 1 files
INFO:polars_hist_db.loaders.file_search:found total 1 files
INFO:polars_hist_db.core.table_config:creating table test.__audit_log
INFO:polars_hist_db.core.table_config:creating table test.unit_info
INFO:polars_hist_db.core.table_config:creating table test.product_info
INFO:polars_hist_db.core.table_config:creating table test.food_prices
INFO:polars_hist_db.core.table_config:creating table test.turkey_food_prices
INFO:polars_hist_db.dataset.workflow:[1/1] processing file mtime=2025-01-01 00:00:01
INFO:polars_hist_db.loaders.dsv_loader:loading csv ../tests/_testdata_dataset_data/turkey_food_prices.csv
INFO:polars_hist_db.loaders.fn_registry:applying fn try_to_usd to dataframe (7381, 8)
INFO:polars_hist_db.loaders.fn_registry:applying fn combine_columns to dataframe (7381, 9)
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (1/75) 2013-04-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (2/75) 2013-05-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (3/75) 2013-11-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (4/75) 2013-12-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (5/75) 2013-12-31T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (6/75) 2014-01-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (7/75) 2014-03-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (8/75) 2014-04-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (9/75) 2014-05-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (10/75) 2014-06-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (11/75) 2014-07-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (12/75) 2014-08-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (13/75) 2014-09-27T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (14/75) 2014-11-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (15/75) 2014-12-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (16/75) 2014-12-31T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (17/75) 2015-01-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (18/75) 2015-03-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (19/75) 2015-03-31T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (20/75) 2015-04-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (21/75) 2015-05-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (22/75) 2015-06-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (23/75) 2015-07-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (24/75) 2015-08-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (25/75) 2015-09-27T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (26/75) 2015-11-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (27/75) 2015-12-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (28/75) 2015-12-31T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (29/75) 2016-01-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (30/75) 2016-02-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (31/75) 2016-03-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (32/75) 2016-04-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (33/75) 2016-05-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (34/75) 2016-06-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (35/75) 2016-07-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (36/75) 2016-09-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (37/75) 2016-10-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (38/75) 2016-10-31T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (39/75) 2016-11-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (40/75) 2016-12-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (41/75) 2017-01-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (42/75) 2017-02-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (43/75) 2017-03-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (44/75) 2017-04-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (45/75) 2017-05-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (46/75) 2017-06-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (47/75) 2017-07-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (48/75) 2017-09-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (49/75) 2017-10-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (50/75) 2017-10-31T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (51/75) 2017-11-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (52/75) 2017-12-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (53/75) 2018-01-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (54/75) 2018-02-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (55/75) 2018-03-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (56/75) 2018-04-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (57/75) 2018-05-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (58/75) 2018-06-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (59/75) 2018-07-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (60/75) 2018-09-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (61/75) 2018-10-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (62/75) 2018-10-31T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (63/75) 2018-11-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (64/75) 2018-12-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (65/75) 2019-01-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (66/75) 2019-02-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (67/75) 2019-03-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (68/75) 2019-04-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (69/75) 2019-05-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (70/75) 2019-06-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (71/75) 2019-07-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (72/75) 2019-09-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (73/75) 2019-10-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (74/75) 2019-10-31T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (75/75) 2019-11-30T00:00:00
INFO:polars_hist_db.dataset.workflow:stopped scrape - food_prices

Querying the temporal tables

Query the latest food prices.

[7]:
with engine.begin() as connection:
    latest_food_prices_df = DataframeOps(connection).from_table("test", "food_prices")

Query all the food prices.

[8]:
with engine.begin() as connection:
    time_hint = TimeHint(mode="all")
    all_food_prices = DataframeOps(connection).from_table(
        "test", "food_prices", time_hint
    )


all_food_prices
[8]:
shape: (2_847, 6)
┌────────────┬───────┬───────────────┬───────────────┬─────────────────────┬────────────────────────────┐
│ product_id ┆ um_id ┆ price         ┆ price_usd     ┆ __valid_from        ┆ __valid_to                 │
│ ---        ┆ ---   ┆ ---           ┆ ---           ┆ ---                 ┆ ---                        │
│ i32        ┆ i32   ┆ decimal[10,4] ┆ decimal[10,4] ┆ datetime[μs]        ┆ datetime[μs]               │
╞════════════╪═══════╪═══════════════╪═══════════════╪═════════════════════╪════════════════════════════╡
│ 52         ┆ 5     ┆ 4.4920        ┆ 2.3456        ┆ 2013-04-30 00:00:00 ┆ 2013-05-30 00:00:00        │
│ 52         ┆ 5     ┆ 4.5786        ┆ 2.3909        ┆ 2013-05-30 00:00:00 ┆ 2013-11-01 00:00:00        │
│ 52         ┆ 5     ┆ 4.7865        ┆ 2.4994        ┆ 2013-11-01 00:00:00 ┆ 2013-12-01 00:00:00        │
│ 52         ┆ 5     ┆ 5.1337        ┆ 2.6807        ┆ 2013-12-01 00:00:00 ┆ 2013-12-31 00:00:00        │
│ 52         ┆ 5     ┆ 5.5099        ┆ 2.5182        ┆ 2013-12-31 00:00:00 ┆ 2014-01-30 00:00:00        │
│ …          ┆ …     ┆ …             ┆ …             ┆ …                   ┆ …                          │
│ 50         ┆ 25    ┆ 47.9040       ┆ 8.4338        ┆ 2019-07-28 00:00:00 ┆ 2019-09-01 00:00:00        │
│ 50         ┆ 25    ┆ 49.1176       ┆ 8.6474        ┆ 2019-09-01 00:00:00 ┆ 2019-10-01 00:00:00        │
│ 50         ┆ 25    ┆ 50.8347       ┆ 8.9497        ┆ 2019-10-01 00:00:00 ┆ 2019-10-31 00:00:00        │
│ 50         ┆ 25    ┆ 51.7985       ┆ 9.1194        ┆ 2019-10-31 00:00:00 ┆ 2019-11-30 00:00:00        │
│ 50         ┆ 25    ┆ 51.6071       ┆ 9.0857        ┆ 2019-11-30 00:00:00 ┆ 2106-02-07 06:28:15.999999 │
└────────────┴───────┴───────────────┴───────────────┴─────────────────────┴────────────────────────────┘

Query the food prices at a specific point in time, in this case 1-Jan-2015.

[9]:
with engine.begin() as connection:
    time_hint = TimeHint(mode="asof", asof_utc=datetime(2015, 1, 1))
    food_prices_at_2015_date = DataframeOps(connection).from_table(
        "test", "food_prices", time_hint
    )


food_prices_at_2015_date
[9]:
shape: (34, 6)
┌────────────┬───────┬───────────────┬───────────────┬─────────────────────┬─────────────────────┐
│ product_id ┆ um_id ┆ price         ┆ price_usd     ┆ __valid_from        ┆ __valid_to          │
│ ---        ┆ ---   ┆ ---           ┆ ---           ┆ ---                 ┆ ---                 │
│ i32        ┆ i32   ┆ decimal[10,4] ┆ decimal[10,4] ┆ datetime[μs]        ┆ datetime[μs]        │
╞════════════╪═══════╪═══════════════╪═══════════════╪═════════════════════╪═════════════════════╡
│ 52         ┆ 5     ┆ 6.7550        ┆ 2.4798        ┆ 2014-12-31 00:00:00 ┆ 2015-01-30 00:00:00 │
│ 58         ┆ 5     ┆ 2.6840        ┆ 0.9853        ┆ 2014-12-31 00:00:00 ┆ 2015-01-30 00:00:00 │
│ 66         ┆ 5     ┆ 7.6650        ┆ 2.8138        ┆ 2014-12-31 00:00:00 ┆ 2015-01-30 00:00:00 │
│ 92         ┆ 33    ┆ 0.3210        ┆ 0.1178        ┆ 2014-12-31 00:00:00 ┆ 2015-01-30 00:00:00 │
│ 94         ┆ 5     ┆ 6.6720        ┆ 2.4493        ┆ 2014-12-31 00:00:00 ┆ 2015-01-30 00:00:00 │
│ …          ┆ …     ┆ …             ┆ …             ┆ …                   ┆ …                   │
│ 360        ┆ 5     ┆ 1.1160        ┆ 0.4096        ┆ 2014-12-31 00:00:00 ┆ 2015-01-30 00:00:00 │
│ 401        ┆ 5     ┆ 13.4840       ┆ 4.9500        ┆ 2014-12-31 00:00:00 ┆ 2015-01-30 00:00:00 │
│ 433        ┆ 5     ┆ 28.2390       ┆ 10.3667       ┆ 2014-12-31 00:00:00 ┆ 2015-01-30 00:00:00 │
│ 463        ┆ 15    ┆ 2.8410        ┆ 1.0429        ┆ 2014-12-31 00:00:00 ┆ 2015-01-30 00:00:00 │
│ 50         ┆ 25    ┆ 37.3370       ┆ 13.7066       ┆ 2014-12-31 00:00:00 ┆ 2015-01-30 00:00:00 │
└────────────┴───────┴───────────────┴───────────────┴─────────────────────┴─────────────────────┘

Delete the data associated with the dataset

Reset the example by dropping the tables and the audit log. Without this reset, subsequent attempts to upload the same data (or data with earlier timestamps) into the database would fail.

[10]:
with engine.begin() as connection:
    TableConfigOps(connection).drop_all(config.tables)
    AuditOps(config.tables.schemas()[0]).drop(connection)
INFO:polars_hist_db.core.table_config:dropped table food_prices
INFO:polars_hist_db.core.table_config:dropped table product_info
INFO:polars_hist_db.core.table_config:dropped table unit_info
INFO:polars_hist_db.core.table_config:dropped table __audit_log