Advanced topics¶
Using the Snowflake deserializer¶
`diepvries` is bundled with a deserializer: it is able to extract
metadata from Snowflake in order to automatically instantiate Data
Vault entities. As long as tables follow the naming conventions,
`Hub`, `Satellite` and `Link` objects can be created automatically.
When using the deserializer, a connection to your database will be
initiated, in order to query the metadata tables. This will provide
`diepvries` with all the information required to automatically
create Data Vault entity objects.
Here’s how to use the deserializer:
from diepvries.deserializers.snowflake_deserializer import (
DatabaseConfiguration,
SnowflakeDeserializer,
)
def deserialize():
    """Build Data Vault entities from Snowflake metadata and print them.

    Deserializes the ``h_customer`` and ``hs_customer`` tables of the ``dv``
    schema, then prints the resulting entity objects followed by their names.
    """
    # Replace the placeholders with your own Snowflake connection settings.
    database_configuration = DatabaseConfiguration(
        database="<DB>",
        user="<USER>",
        password="<PASSWORD>",
        warehouse="<WAREHOUSE>",
        account="<ACCOUNT>",
    )

    # The entity type of each table is derived from the table naming
    # conventions, so only the table names have to be listed here.
    deserializer = SnowflakeDeserializer(
        target_schema="dv",
        target_tables=["h_customer", "hs_customer"],
        database_configuration=database_configuration,
    )

    print(deserializer.deserialized_target_tables)
    print([x.name for x in deserializer.deserialized_target_tables])


if __name__ == "__main__":
    deserialize()
% python3 deserializer.py
[
<diepvries.hub.Hub object at 0x7fbe3cc5baf0>,
<diepvries.satellite.Satellite object at 0x7fbe3cc5be20>
]
['h_customer', 'hs_customer']
The main advantage of the deserializer is DRY (“don’t repeat
yourself”): your Data Vault DDLs can live anywhere, stored in your
favourite format, and `diepvries` will extract this information from
the database metadata, so you don’t have to describe the tables again
in Python.
Not using the deserializer¶
It is also possible to use `diepvries` without the deserializer, if
you prefer to “manually” configure it. For example, you might have
your Data Vault table configuration stored in JSON or YAML, and you
could use those files to instantiate Hubs, Links and Satellites.
Another potential reason is that you don’t have access to your
database instance when `diepvries` is running: maybe you only want to
generate SQL and then forward the queries to another component of
your system.
In that case, you can instantiate everything manually:
from datetime import datetime, timezone
from diepvries import FieldDataType
from diepvries.data_vault_load import DataVaultLoad
from diepvries.field import Field
from diepvries.hub import Hub
from diepvries.satellite import Satellite
def prepare_sql():
    """Describe a hub and its satellite by hand and print the load SQL.

    No deserializer is involved: every table and field is declared
    explicitly, then each statement of the generated load script is printed.
    """
    # Instantiate structures.
    hub_foo = Hub(
        schema="dv",
        name="h_foo",
        fields=[
            Field(
                parent_table_name="h_foo",
                name="h_foo_hashkey",
                data_type=FieldDataType.NUMBER,
                position=1,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_foo",
                name="r_source",
                data_type=FieldDataType.NUMBER,
                position=2,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_foo",
                name="r_timestamp",
                data_type=FieldDataType.NUMBER,
                position=3,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_foo",
                name="foo_id",
                data_type=FieldDataType.TEXT,
                position=4,
                is_mandatory=True,
            ),
        ],
    )
    hub_satellite_foo = Satellite(
        schema="dv",
        name="hs_foo",
        fields=[
            Field(
                parent_table_name="hs_foo",
                name="h_foo_hashkey",
                data_type=FieldDataType.NUMBER,
                position=1,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="hs_foo",
                name="s_hashdiff",
                data_type=FieldDataType.NUMBER,
                position=2,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="hs_foo",
                name="r_timestamp",
                data_type=FieldDataType.NUMBER,
                position=4,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="hs_foo",
                name="r_timestamp_end",
                data_type=FieldDataType.NUMBER,
                position=5,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="hs_foo",
                name="r_source",
                data_type=FieldDataType.NUMBER,
                position=3,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="hs_foo",
                name="some_foo_property",
                data_type=FieldDataType.TEXT,
                position=6,
                is_mandatory=True,
            ),
        ],
    )

    # Prepare data load.
    dv_load = DataVaultLoad(
        extract_schema="dv_extract",
        extract_table="foobar",
        staging_schema="dv_staging",
        staging_table="foobar",
        # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated
        # since Python 3.12.
        extract_start_timestamp=datetime.now(timezone.utc),
        target_tables=[hub_foo, hub_satellite_foo],
        source="some_source",
    )

    # Show generated SQL statements.
    for statement in dv_load.sql_load_script:
        print(statement)


if __name__ == "__main__":
    prepare_sql()
Role-playing hubs¶
Role-playing hubs are hubs not materialized as tables, but rather as
views pointing to a main hub. They are useful to model different roles
around the same concept. For example, on top of the account hub
`h_account`, different role-playing hubs could be
`h_account_supplier` and `h_account_transporter`, implemented as
views pointing to `h_account`.
`diepvries` can work with role-playing hubs. The deserializer
supports them:
from diepvries.deserializers.snowflake_deserializer import (
DatabaseConfiguration,
SnowflakeDeserializer,
)
def deserialize():
    """Deserialize a hub and its role-playing hubs, then print them.

    ``h_account_supplier`` and ``h_account_transporter`` are declared as
    role-playing hubs of ``h_account`` through ``role_playing_hubs``. The
    resulting entity objects are printed, followed by their names.
    """
    # Replace the placeholders with your own Snowflake connection settings.
    database_configuration = DatabaseConfiguration(
        database="<DB>",
        user="<USER>",
        password="<PASSWORD>",
        warehouse="<WAREHOUSE>",
        account="<ACCOUNT>",
    )

    deserializer = SnowflakeDeserializer(
        target_schema="dv",
        target_tables=["h_account", "h_account_supplier", "h_account_transporter"],
        database_configuration=database_configuration,
        # Maps each role-playing hub (key) to the main hub it points to (value).
        role_playing_hubs={
            "h_account_supplier": "h_account",
            "h_account_transporter": "h_account",
        },
    )

    print(deserializer.deserialized_target_tables)
    print([x.name for x in deserializer.deserialized_target_tables])


if __name__ == "__main__":
    deserialize()
% python3 rph_deserializer.py
[
<diepvries.hub.Hub object at 0x7fb83c455250>,
<diepvries.role_playing_hub.RolePlayingHub object at 0x7fb83e7ae700>,
<diepvries.role_playing_hub.RolePlayingHub object at 0x7fb83e7ae640>
]
['h_account', 'h_account_supplier', 'h_account_transporter']
Role-playing hubs can also be directly instantiated:
from datetime import datetime, timezone
from diepvries import FieldDataType
from diepvries.data_vault_load import DataVaultLoad
from diepvries.field import Field
from diepvries.hub import Hub
from diepvries.role_playing_hub import RolePlayingHub
from diepvries.satellite import Satellite
def prepare_sql():
    """Describe a hub and one role-playing hub by hand and print the load SQL.

    The role-playing hub ``h_account_supplier`` is declared with its own
    fields and then attached to the main hub ``h_account`` through its
    ``parent_table`` attribute. Each statement of the generated load script
    is printed.
    """
    # Instantiate structures.
    hub_account = Hub(
        schema="dv",
        name="h_account",
        fields=[
            Field(
                parent_table_name="h_account",
                name="h_account_hashkey",
                data_type=FieldDataType.NUMBER,
                position=1,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_account",
                name="r_source",
                data_type=FieldDataType.NUMBER,
                position=2,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_account",
                name="r_timestamp",
                data_type=FieldDataType.NUMBER,
                position=3,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_account",
                name="account_id",
                data_type=FieldDataType.TEXT,
                position=4,
                is_mandatory=True,
            ),
        ],
    )
    role_playing_hub_account_supplier = RolePlayingHub(
        schema="dv",
        name="h_account_supplier",
        fields=[
            Field(
                parent_table_name="h_account_supplier",
                name="h_account_supplier_hashkey",
                data_type=FieldDataType.NUMBER,
                position=1,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_account_supplier",
                name="r_source",
                data_type=FieldDataType.NUMBER,
                position=2,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_account_supplier",
                name="r_timestamp",
                data_type=FieldDataType.NUMBER,
                position=3,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_account_supplier",
                name="account_id",
                data_type=FieldDataType.TEXT,
                position=4,
                is_mandatory=True,
            ),
        ],
    )
    # Link the role-playing hub to the main hub it points to.
    role_playing_hub_account_supplier.parent_table = hub_account

    # Prepare data load.
    dv_load = DataVaultLoad(
        extract_schema="dv_extract",
        extract_table="foobar",
        staging_schema="dv_staging",
        staging_table="foobar",
        # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated
        # since Python 3.12.
        extract_start_timestamp=datetime.now(timezone.utc),
        target_tables=[hub_account, role_playing_hub_account_supplier],
        source="some_source",
    )

    # Show generated SQL statements.
    for statement in dv_load.sql_load_script:
        print(statement)


if __name__ == "__main__":
    prepare_sql()
Effectivity satellites¶
Effectivity satellites model links which can change over time. For example, imagine a link between a Customer and a Contact entry. A Customer can have multiple Contact entries; however, it could be required to keep only one Contact entry per Customer open at a given point in time. This means that, if a Customer changes its Contact entry, only the latest relationship between the Customer and its Contact is kept as an open relationship.
This is also supported by `diepvries`:
from diepvries.deserializers.snowflake_deserializer import (
DatabaseConfiguration,
SnowflakeDeserializer,
)
from diepvries.driving_key_field import DrivingKeyField
def deserialize():
    """Deserialize a link and its effectivity satellite, then print them.

    ``ls_foo_bar_eff`` is declared as an effectivity satellite of the link
    ``l_foo_bar`` by passing a driving key for it. The resulting entity
    objects are printed, followed by their names.
    """
    # Replace the placeholders with your own Snowflake connection settings.
    database_configuration = DatabaseConfiguration(
        database="<DB>",
        user="<USER>",
        password="<PASSWORD>",
        warehouse="<WAREHOUSE>",
        account="<ACCOUNT>",
    )

    deserializer = SnowflakeDeserializer(
        target_schema="dv",
        target_tables=["l_foo_bar", "ls_foo_bar_eff"],
        database_configuration=database_configuration,
        # The driving key names a column of the link (parent_table_name) and
        # the effectivity satellite it applies to (satellite_name).
        driving_keys=[
            DrivingKeyField(
                name="h_foo_hashkey",
                parent_table_name="l_foo_bar",
                satellite_name="ls_foo_bar_eff",
            )
        ],
    )

    print(deserializer.deserialized_target_tables)
    print([x.name for x in deserializer.deserialized_target_tables])


if __name__ == "__main__":
    deserialize()
% python3 effsat_deserializer.py
[
<diepvries.link.Link object at 0x7fd64db22520>,
<diepvries.effectivity_satellite.EffectivitySatellite object at 0x7fd64db22ac0>
]
['l_foo_bar', 'ls_foo_bar_eff']
Effectivity satellites can also be directly instantiated:
from datetime import datetime, timezone
from diepvries import FieldDataType
from diepvries.data_vault_load import DataVaultLoad
from diepvries.driving_key_field import DrivingKeyField
from diepvries.effectivity_satellite import EffectivitySatellite
from diepvries.field import Field
from diepvries.hub import Hub
from diepvries.satellite import Satellite
def prepare_sql():
    """Describe an effectivity satellite by hand and print the load SQL.

    The effectivity satellite ``ls_foo_bar_eff`` is declared with its driving
    key and fields, added to a data load together with the other (elided)
    structures, and each statement of the generated load script is printed.
    """
    # Instantiate structures.
    # The arguments of these three structures are elided for brevity; fill
    # them in the same way as in the previous examples.
    # NOTE(review): the deserializer example attaches this effectivity
    # satellite to the link l_foo_bar, but no Link is instantiated here --
    # confirm whether one must also be created and added to target_tables.
    hub_foo = Hub(...)
    hub_bar = Hub(...)
    satellite_foo_bar = Satellite(...)
    eff_satellite_foo_bar = EffectivitySatellite(
        schema="dv",
        name="ls_foo_bar_eff",
        # Same driving key as in the deserializer example: a column of the
        # link l_foo_bar, applied to this effectivity satellite.
        driving_keys=[
            DrivingKeyField(
                name="h_foo_hashkey",
                parent_table_name="l_foo_bar",
                satellite_name="ls_foo_bar_eff",
            )
        ],
        fields=[
            Field(
                parent_table_name="ls_foo_bar_eff",
                name="h_foo_hashkey",
                data_type=FieldDataType.NUMBER,
                position=1,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="ls_foo_bar_eff",
                name="s_hashdiff",
                data_type=FieldDataType.NUMBER,
                position=2,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="ls_foo_bar_eff",
                name="r_timestamp",
                data_type=FieldDataType.NUMBER,
                position=4,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="ls_foo_bar_eff",
                name="r_timestamp_end",
                data_type=FieldDataType.NUMBER,
                position=5,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="ls_foo_bar_eff",
                name="r_source",
                data_type=FieldDataType.NUMBER,
                position=3,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="ls_foo_bar_eff",
                name="some_property",
                data_type=FieldDataType.TEXT,
                position=6,
                is_mandatory=True,
            ),
        ],
    )

    # Prepare data load.
    dv_load = DataVaultLoad(
        extract_schema="dv_extract",
        extract_table="foobar",
        staging_schema="dv_staging",
        staging_table="foobar",
        # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated
        # since Python 3.12.
        extract_start_timestamp=datetime.now(timezone.utc),
        target_tables=[hub_foo, hub_bar, satellite_foo_bar, eff_satellite_foo_bar],
        source="some_source",
    )

    # Show generated SQL statements.
    for statement in dv_load.sql_load_script:
        print(statement)


if __name__ == "__main__":
    prepare_sql()