Turkish Food Prices Dataset Example
This example assumes you have the test database up and running locally. From the root of the project, run the command: make infra-up.
[1]:
from datetime import datetime
import logging
from typing import Any, List

import polars as pl

from polars_hist_db.config import Config
from polars_hist_db.core import (
    AuditOps,
    DataframeOps,
    TableConfigOps,
    TimeHint,
    make_engine,
)
from polars_hist_db.dataset import run_workflows
from polars_hist_db.loaders import FunctionRegistry
Initialise Configuration and Logging
This example uses the configuration and data from the test suite.
[2]:
config_path = "../tests/_testdata_dataset_configs/foodprices_dataset.yaml"
data_path = "../tests/_testdata_dataset_configs/foodprices_data.csv"
logging.basicConfig(level=logging.INFO)
config = Config.from_yaml(config_path)
loading Config from: ../tests/_testdata_dataset_configs/foodprices_dataset.yaml
(Optionally) Print the config file to the console
[3]:
with open(config_path, "r") as f:
    print(f.read())
db:
  backend: mariadb
  hostname: 127.0.0.1
  port: 3306
  username: root
  password: admin

table_configs:
  - name: unit_info
    schema: test
    is_temporal: false
    delta_config:
      drop_unchanged_rows: false
      # pre_prune_idential_rows: false
      # truncate_table_first: false
      on_duplicate_key: take_last
    primary_keys:
      - id
    columns:
      - id<um_id
      - name<um_name
  - name: product_info
    schema: test
    is_temporal: false
    delta_config:
      drop_unchanged_rows: false
      # pre_prune_idential_rows: false
      # truncate_table_first: false
      on_duplicate_key: take_last
    primary_keys:
      - id
    columns:
      - id<product_id
      - name<product_name
  - name: food_prices
    schema: test
    is_temporal: true
    delta_config:
      drop_unchanged_rows: true
      # pre_prune_idential_rows: true
      # truncate_table_first: true
    time_partition:
      column: time
      truncate: 5d
      unique_strategy: last
    primary_keys:
      - product_id
      - um_id
    foreign_keys:
      - { name: product_id, references: { table: product_info, column: id } }
      - { name: um_id, references: { table: unit_info, column: id } }
    columns:
      - product_id
      - um_id
      - price
      - price_usd

column_definitions:
  - { name: region, data_type: VARCHAR(32), header: Place, nullable: false }
  - { name: product_id, data_type: INT, header: ProductId, nullable: false }
  - { name: product_name, data_type: VARCHAR(64), header: ProductName }
  - { name: um_id, data_type: INT, header: UmId, nullable: false }
  - { name: um_name, data_type: VARCHAR(16), header: UmName }
  - {
      name: price,
      data_type: 'DECIMAL(10,4)',
      header: Price
    }
  - {
      name: price_usd,
      data_type: 'DECIMAL(10,4)',
      transforms: {
        try_to_usd: [ 'Price', 'Year' ]
      }
    }
  - { name: month, data_type: VARCHAR(2), header: Month }
  - { name: year, data_type: INT, header: Year }
  - {
      name: time,
      data_type: DATETIME,
      transforms: { combine_columns: ['${Year}', '-', '${Month}', '-01' ] }
    }

datasets:
  - name: turkey_food_prices
    delta_table_schema: test
    scrape_limit: ~
    search_paths:
      - root_path: ../_testdata_dataset_data
        file_include: ['turkey_food_prices.csv']
        is_enabled: true
    timestamp:
      source_tz: Europe/London
      target_tz: UTC
      method: mtime
    pipeline:
      - table: unit_info
        columns:
          - id<um_id!
          - name<um_name!
      - table: product_info
        columns:
          - id<product_id!
          - name<product_name!
      - table: food_prices
        type: primary
        columns:
          - product_id!
          - um_id!
          - price!
          - price_usd!
Create a SQLAlchemy Engine to connect to the database
[4]:
engine = make_engine(
    backend="mariadb",
    hostname="127.0.0.1",
    port=3306,
    username="root",
    password="admin",
)
Register a custom parser function
The food prices from the data source are quoted in Turkish lira (TRY). The team only cares about the USD price, so, as specified in the dataset config, a yearly TRY/USD fx rate is applied to the price column at scrape time, creating a derived price_usd column of type DECIMAL(10,4) in the database.
[5]:
registry = FunctionRegistry()
def custom_try_to_usd(df: pl.DataFrame, args: List[Any]) -> pl.DataFrame:
usdtry_fx_rates = pl.from_dict(
{
"Year": [
2010,
2011,
2012,
2013,
2014,
2015,
2016,
2017,
2018,
2019,
2020,
2021,
2022,
2023,
],
"fx_usdtry": [
1.507,
1.674,
1.802,
1.915,
2.188,
2.724,
3.020,
3.646,
4.830,
5.680,
7.004,
8.886,
16.566,
23.085,
],
}
)
col_result = args[0]
col_try = args[1]
col_year = args[2]
df = (
df.join(usdtry_fx_rates, left_on=col_year, right_on="Year", how="left")
.with_columns((pl.col(col_try) * 1 / pl.col("fx_usdtry")).alias(col_result))
.drop("fx_usdtry")
)
return df
registry.delete_function("try_to_usd")
registry.register_function("try_to_usd", custom_try_to_usd)
print("loaded functions", registry.list_functions())
loaded functions ['null_if_gte', 'apply_type_casts', 'combine_columns', 'try_to_usd']
Run the workflow
This scrapes any new files into the database.
(Try running the function a second time…)
[6]:
run_workflows(config, engine)
INFO:polars_hist_db.dataset.workflow:scraping dataset turkey_food_prices
INFO:polars_hist_db.dataset.workflow:starting ingest for food_prices
INFO:polars_hist_db.loaders.file_search:searching files ['turkey_food_prices.csv'] in ../tests/_testdata_dataset_data
INFO:polars_hist_db.loaders.file_search:found 1 files
INFO:polars_hist_db.loaders.file_search:found total 1 files
INFO:polars_hist_db.core.table_config:creating table test.__audit_log
INFO:polars_hist_db.core.table_config:creating table test.unit_info
INFO:polars_hist_db.core.table_config:creating table test.product_info
INFO:polars_hist_db.core.table_config:creating table test.food_prices
INFO:polars_hist_db.core.table_config:creating table test.turkey_food_prices
INFO:polars_hist_db.dataset.workflow:[1/1] processing file mtime=2025-01-01 00:00:01
INFO:polars_hist_db.loaders.dsv_loader:loading csv ../tests/_testdata_dataset_data/turkey_food_prices.csv
INFO:polars_hist_db.loaders.fn_registry:applying fn try_to_usd to dataframe (7381, 8)
INFO:polars_hist_db.loaders.fn_registry:applying fn combine_columns to dataframe (7381, 9)
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (1/75) 2013-04-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (2/75) 2013-05-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (3/75) 2013-11-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (4/75) 2013-12-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (5/75) 2013-12-31T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (6/75) 2014-01-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (7/75) 2014-03-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (8/75) 2014-04-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (9/75) 2014-05-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (10/75) 2014-06-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (11/75) 2014-07-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (12/75) 2014-08-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (13/75) 2014-09-27T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (14/75) 2014-11-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (15/75) 2014-12-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (16/75) 2014-12-31T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (17/75) 2015-01-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (18/75) 2015-03-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (19/75) 2015-03-31T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (20/75) 2015-04-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (21/75) 2015-05-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (22/75) 2015-06-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (23/75) 2015-07-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (24/75) 2015-08-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (25/75) 2015-09-27T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (26/75) 2015-11-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (27/75) 2015-12-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (28/75) 2015-12-31T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (29/75) 2016-01-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (30/75) 2016-02-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (31/75) 2016-03-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (32/75) 2016-04-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (33/75) 2016-05-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (34/75) 2016-06-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (35/75) 2016-07-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (36/75) 2016-09-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (37/75) 2016-10-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (38/75) 2016-10-31T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (39/75) 2016-11-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (40/75) 2016-12-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (41/75) 2017-01-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (42/75) 2017-02-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (43/75) 2017-03-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (44/75) 2017-04-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (45/75) 2017-05-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (46/75) 2017-06-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (47/75) 2017-07-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (48/75) 2017-09-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (49/75) 2017-10-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (50/75) 2017-10-31T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (51/75) 2017-11-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (52/75) 2017-12-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (53/75) 2018-01-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (54/75) 2018-02-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (55/75) 2018-03-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (56/75) 2018-04-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (57/75) 2018-05-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (58/75) 2018-06-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (59/75) 2018-07-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (60/75) 2018-09-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (61/75) 2018-10-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (62/75) 2018-10-31T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (63/75) 2018-11-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (64/75) 2018-12-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (65/75) 2019-01-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (66/75) 2019-02-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (67/75) 2019-03-30T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (68/75) 2019-04-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (69/75) 2019-05-29T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (70/75) 2019-06-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (71/75) 2019-07-28T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (72/75) 2019-09-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (73/75) 2019-10-01T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (74/75) 2019-10-31T00:00:00
INFO:polars_hist_db.dataset.scrape:-- processing time_partition (75/75) 2019-11-30T00:00:00
INFO:polars_hist_db.dataset.workflow:stopped scrape - food_prices
Querying the temporal tables
Query the latest food prices.
[7]:
with engine.begin() as connection:
    latest_food_prices_df = DataframeOps(connection).from_table("test", "food_prices")
Query all the food prices.
[8]:
with engine.begin() as connection:
    time_hint = TimeHint(mode="all")
    all_food_prices = DataframeOps(connection).from_table(
        "test", "food_prices", time_hint
    )

all_food_prices
[8]:
product_id | um_id | price | price_usd | __valid_from | __valid_to |
---|---|---|---|---|---|
i32 | i32 | decimal[10,4] | decimal[10,4] | datetime[μs] | datetime[μs] |
52 | 5 | 4.4920 | 2.3456 | 2013-04-30 00:00:00 | 2013-05-30 00:00:00 |
52 | 5 | 4.5786 | 2.3909 | 2013-05-30 00:00:00 | 2013-11-01 00:00:00 |
52 | 5 | 4.7865 | 2.4994 | 2013-11-01 00:00:00 | 2013-12-01 00:00:00 |
52 | 5 | 5.1337 | 2.6807 | 2013-12-01 00:00:00 | 2013-12-31 00:00:00 |
52 | 5 | 5.5099 | 2.5182 | 2013-12-31 00:00:00 | 2014-01-30 00:00:00 |
… | … | … | … | … | … |
502 | 5 | 47.9040 | 8.4338 | 2019-07-28 00:00:00 | 2019-09-01 00:00:00 |
502 | 5 | 49.1176 | 8.6474 | 2019-09-01 00:00:00 | 2019-10-01 00:00:00 |
502 | 5 | 50.8347 | 8.9497 | 2019-10-01 00:00:00 | 2019-10-31 00:00:00 |
502 | 5 | 51.7985 | 9.1194 | 2019-10-31 00:00:00 | 2019-11-30 00:00:00 |
502 | 5 | 51.6071 | 9.0857 | 2019-11-30 00:00:00 | 2106-02-07 06:28:15.999999 |
Query the food prices at a specific point in time, in this case 1-Jan-2015.
[9]:
with engine.begin() as connection:
    time_hint = TimeHint(mode="asof", asof_utc=datetime(2015, 1, 1))
    food_prices_at_2015_date = DataframeOps(connection).from_table(
        "test", "food_prices", time_hint
    )

food_prices_at_2015_date
[9]:
product_id | um_id | price | price_usd | __valid_from | __valid_to |
---|---|---|---|---|---|
i32 | i32 | decimal[10,4] | decimal[10,4] | datetime[μs] | datetime[μs] |
52 | 5 | 6.7550 | 2.4798 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
58 | 5 | 2.6840 | 0.9853 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
66 | 5 | 7.6650 | 2.8138 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
92 | 33 | 0.3210 | 0.1178 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
94 | 5 | 6.6720 | 2.4493 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
… | … | … | … | … | … |
360 | 5 | 1.1160 | 0.4096 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
401 | 5 | 13.4840 | 4.9500 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
433 | 5 | 28.2390 | 10.3667 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
463 | 15 | 2.8410 | 1.0429 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
502 | 5 | 37.3370 | 13.7066 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
Delete the data associated with the dataset
This resets the example. Without it, subsequent attempts to upload the same data (or older data) into the database will fail.
[10]:
with engine.begin() as connection:
    TableConfigOps(connection).drop_all(config.tables)
    AuditOps(config.tables.schemas()[0]).drop(connection)
INFO:polars_hist_db.core.table_config:dropped table food_prices
INFO:polars_hist_db.core.table_config:dropped table product_info
INFO:polars_hist_db.core.table_config:dropped table unit_info
INFO:polars_hist_db.core.table_config:dropped table __audit_log