Advanced topics

Using the Snowflake deserializer

diepvries ships with a deserializer that extracts metadata from Snowflake to instantiate Data Vault entities automatically. As long as your tables follow the naming conventions, Hub, Satellite and Link objects are created for you.

When you use the deserializer, diepvries connects to your database and queries its metadata tables. This gives diepvries all the information it needs to create the Data Vault entity objects.

Here’s how to use the deserializer:

from diepvries.deserializers.snowflake_deserializer import (
    DatabaseConfiguration,
    SnowflakeDeserializer,
)


def deserialize():
    database_configuration = DatabaseConfiguration(
        database="<DB>",
        user="<USER>",
        password="<PASSWORD>",
        warehouse="<WAREHOUSE>",
        account="<ACCOUNT>",
    )

    deserializer = SnowflakeDeserializer(
        target_schema="dv",
        target_tables=["h_customer", "hs_customer"],
        database_configuration=database_configuration,
    )

    print(deserializer.deserialized_target_tables)
    print([x.name for x in deserializer.deserialized_target_tables])


if __name__ == "__main__":
    deserialize()
% python3 deserializer.py
[
  <diepvries.hub.Hub object at 0x7fbe3cc5baf0>,
  <diepvries.satellite.Satellite object at 0x7fbe3cc5be20>
]
['h_customer', 'hs_customer']
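The example above hardcodes the credentials. As a minimal sketch, assuming you keep them in environment variables instead, the keyword arguments for DatabaseConfiguration could be collected as follows. The variable names (SNOWFLAKE_DATABASE and so on) and the helper database_kwargs_from_env are hypothetical; only the keyword names match the DatabaseConfiguration call shown above.

```python
import os
from typing import Dict, Mapping

# Hypothetical environment variable names, keyed by the DatabaseConfiguration
# keyword argument each one feeds.
ENV_VARS = {
    "database": "SNOWFLAKE_DATABASE",
    "user": "SNOWFLAKE_USER",
    "password": "SNOWFLAKE_PASSWORD",
    "warehouse": "SNOWFLAKE_WAREHOUSE",
    "account": "SNOWFLAKE_ACCOUNT",
}


def database_kwargs_from_env(environ: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Collect DatabaseConfiguration keyword arguments from environment variables.

    Raises KeyError listing every variable that is unset or empty.
    """
    missing = [var for var in ENV_VARS.values() if not environ.get(var)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {param: environ[var] for param, var in ENV_VARS.items()}
```

You would then call DatabaseConfiguration(**database_kwargs_from_env()) in place of the literal values, so no password ends up in the script.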

The main advantage of the deserializer is that it keeps things DRY (Don’t Repeat Yourself): your Data Vault DDLs can live anywhere, in your favourite format, and diepvries infers the table structures from the database, so you don’t have to describe the tables again in Python.
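The deserializer depends on tables following the naming conventions. The sketch below checks table names against the prefixes used on this page (h_ for hubs, l_ for links, hs_ and ls_ for hub and link satellites). These prefixes are inferred from the examples here rather than taken from diepvries itself, and classify_table and hashkey_name are hypothetical helpers.

```python
from typing import Optional

# Table-name prefixes as used in the examples on this page. This mapping is an
# assumption for illustration, not diepvries' own list of conventions.
PREFIXES = {
    "h_": "hub",
    "hs_": "hub satellite",
    "l_": "link",
    "ls_": "link satellite",
}


def classify_table(name: str) -> Optional[str]:
    """Return the entity type implied by a table name's prefix, or None."""
    for prefix, kind in PREFIXES.items():
        if name.startswith(prefix):
            return kind
    return None


def hashkey_name(name: str) -> str:
    """Hashkey column of a hub or link, e.g. h_foo -> h_foo_hashkey."""
    if classify_table(name) not in ("hub", "link"):
        raise ValueError(f"{name!r} is not a hub or link table name")
    return f"{name}_hashkey"
```

For instance, [t for t in target_tables if classify_table(t) is None] lists the tables whose names would not match these prefixes before you hand them to the deserializer.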

Not using the deserializer

It is also possible to use diepvries without the deserializer if you prefer to configure it “manually”. For example, your Data Vault table configuration might be stored in JSON or YAML files, which you could use to instantiate Hubs, Links and Satellites. Another reason is that your database may not be reachable when diepvries runs: you might only want to generate the SQL and forward the queries to another component of your system.

In that case, you can instantiate everything manually:

from datetime import datetime, timezone

from diepvries import FieldDataType
from diepvries.data_vault_load import DataVaultLoad
from diepvries.field import Field
from diepvries.hub import Hub
from diepvries.satellite import Satellite


def prepare_sql():
    # Instantiate structures.
    hub_foo = Hub(
        schema="dv",
        name="h_foo",
        fields=[
            Field(
                parent_table_name="h_foo",
                name="h_foo_hashkey",
                data_type=FieldDataType.TEXT,
                position=1,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_foo",
                name="r_source",
                data_type=FieldDataType.TEXT,
                position=2,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_foo",
                name="r_timestamp",
                data_type=FieldDataType.TIMESTAMP_NTZ,
                position=3,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_foo",
                name="foo_id",
                data_type=FieldDataType.TEXT,
                position=4,
                is_mandatory=True,
            ),
        ],
    )

    hub_satellite_foo = Satellite(
        schema="dv",
        name="hs_foo",
        fields=[
            Field(
                parent_table_name="hs_foo",
                name="h_foo_hashkey",
                data_type=FieldDataType.TEXT,
                position=1,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="hs_foo",
                name="s_hashdiff",
                data_type=FieldDataType.TEXT,
                position=2,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="hs_foo",
                name="r_source",
                data_type=FieldDataType.TEXT,
                position=3,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="hs_foo",
                name="r_timestamp",
                data_type=FieldDataType.TIMESTAMP_NTZ,
                position=4,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="hs_foo",
                name="r_timestamp_end",
                data_type=FieldDataType.TIMESTAMP_NTZ,
                position=5,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="hs_foo",
                name="some_foo_property",
                data_type=FieldDataType.TEXT,
                position=6,
                is_mandatory=True,
            ),
        ],
    )

    # Prepare data load.
    dv_load = DataVaultLoad(
        extract_schema="dv_extract",
        extract_table="foobar",
        staging_schema="dv_staging",
        staging_table="foobar",
        extract_start_timestamp=datetime.now(timezone.utc),
        target_tables=[hub_foo, hub_satellite_foo],
        source="some_source",
    )

    # Show generated SQL statements.
    for statement in dv_load.sql_load_script:
        print(statement)


if __name__ == "__main__":
    prepare_sql()
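As mentioned above, the table definitions could come from JSON or YAML instead of being written out in Python. A minimal sketch, assuming a hypothetical JSON layout with one object per table: field positions follow the order of the list and is_mandatory defaults to True. The helper field_kwargs_from_config is illustrative; it only builds keyword arguments matching the Field calls above, keeps data_type as a string, and assumes those strings match FieldDataType member names.

```python
import json
from typing import Any, Dict, List

# Hypothetical JSON layout for one table; a YAML file parsed into the same
# structure would work identically.
EXAMPLE_CONFIG = """
{
  "schema": "dv",
  "name": "h_foo",
  "fields": [
    {"name": "h_foo_hashkey", "data_type": "TEXT"},
    {"name": "r_source", "data_type": "TEXT"},
    {"name": "r_timestamp", "data_type": "TIMESTAMP_NTZ"},
    {"name": "foo_id", "data_type": "TEXT"}
  ]
}
"""
TABLE_CONFIG = json.loads(EXAMPLE_CONFIG)


def field_kwargs_from_config(config: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Build keyword arguments for Field from one table definition.

    Positions follow the order of the "fields" list (starting at 1) and
    is_mandatory defaults to True. data_type stays a string, to be converted
    with FieldDataType[...] when the Field objects are created.
    """
    table_name = config["name"]
    return [
        {
            "parent_table_name": table_name,
            "name": field["name"],
            "data_type": field["data_type"],
            "position": position,
            "is_mandatory": field.get("is_mandatory", True),
        }
        for position, field in enumerate(config["fields"], start=1)
    ]


# With diepvries installed, the result could feed the classes used above, e.g.:
#   fields = [
#       Field(**{**kwargs, "data_type": FieldDataType[kwargs["data_type"]]})
#       for kwargs in field_kwargs_from_config(TABLE_CONFIG)
#   ]
#   hub_foo = Hub(schema=TABLE_CONFIG["schema"], name=TABLE_CONFIG["name"], fields=fields)
```

Deriving positions from list order means they cannot collide or leave gaps.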

Role-playing hubs

Role-playing hubs are hubs that are not materialized as tables, but as views pointing to a main hub. They are useful for modelling different roles of the same concept. For example, on top of the account hub h_account, you could define the role-playing hubs h_account_supplier and h_account_transporter, implemented as views on h_account.

diepvries supports role-playing hubs. When using the deserializer, pass a role_playing_hubs mapping from each role-playing hub to its main hub:

from diepvries.deserializers.snowflake_deserializer import (
    DatabaseConfiguration,
    SnowflakeDeserializer,
)


def deserialize():
    database_configuration = DatabaseConfiguration(
        database="<DB>",
        user="<USER>",
        password="<PASSWORD>",
        warehouse="<WAREHOUSE>",
        account="<ACCOUNT>",
    )

    deserializer = SnowflakeDeserializer(
        target_schema="dv",
        target_tables=["h_account", "h_account_supplier", "h_account_transporter"],
        database_configuration=database_configuration,
        role_playing_hubs={
            "h_account_supplier": "h_account",
            "h_account_transporter": "h_account",
        },
    )

    print(deserializer.deserialized_target_tables)
    print([x.name for x in deserializer.deserialized_target_tables])


if __name__ == "__main__":
    deserialize()
% python3 rph_deserializer.py
[
  <diepvries.hub.Hub object at 0x7fb83c455250>,
  <diepvries.role_playing_hub.RolePlayingHub object at 0x7fb83e7ae700>,
  <diepvries.role_playing_hub.RolePlayingHub object at 0x7fb83e7ae640>
]
['h_account', 'h_account_supplier', 'h_account_transporter']
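With many role-playing hubs, writing the role_playing_hubs mapping by hand gets repetitive. As a sketch, assuming each role-playing hub name extends its main hub name followed by an underscore (as h_account_supplier extends h_account), the mapping could be derived from the table names. build_role_playing_hubs is a hypothetical helper, and every real hub must be listed in main_hubs; otherwise a genuine hub such as a hypothetical h_account_manager would be mistaken for a role-playing hub of h_account.

```python
from typing import Dict, Iterable


def build_role_playing_hubs(
    target_tables: Iterable[str], main_hubs: Iterable[str]
) -> Dict[str, str]:
    """Map each role-playing hub to the main hub whose name it extends.

    A table counts as a role-playing hub of main hub X when its name starts
    with f"{X}_". The longest matching main hub wins, so h_account_group_x
    maps to h_account_group rather than h_account.
    """
    # Longest names first, so the most specific prefix is tried first.
    candidates = sorted(set(main_hubs), key=len, reverse=True)
    mapping: Dict[str, str] = {}
    for table in target_tables:
        if table in candidates:
            continue  # main hubs are real tables, not role-playing hubs
        for hub in candidates:
            if table.startswith(f"{hub}_"):
                mapping[table] = hub
                break
    return mapping
```

The result can be passed straight to the deserializer, e.g. role_playing_hubs=build_role_playing_hubs(target_tables, ["h_account"]).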

Role-playing hubs can also be instantiated directly. In that case, set the parent_table attribute of each role-playing hub to its main hub:

from datetime import datetime, timezone

from diepvries import FieldDataType
from diepvries.data_vault_load import DataVaultLoad
from diepvries.field import Field
from diepvries.hub import Hub
from diepvries.role_playing_hub import RolePlayingHub


def prepare_sql():
    # Instantiate structures.
    hub_account = Hub(
        schema="dv",
        name="h_account",
        fields=[
            Field(
                parent_table_name="h_account",
                name="h_account_hashkey",
                data_type=FieldDataType.TEXT,
                position=1,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_account",
                name="r_source",
                data_type=FieldDataType.TEXT,
                position=2,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_account",
                name="r_timestamp",
                data_type=FieldDataType.TIMESTAMP_NTZ,
                position=3,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_account",
                name="account_id",
                data_type=FieldDataType.TEXT,
                position=4,
                is_mandatory=True,
            ),
        ],
    )

    role_playing_hub_account_supplier = RolePlayingHub(
        schema="dv",
        name="h_account_supplier",
        fields=[
            Field(
                parent_table_name="h_account_supplier",
                name="h_account_supplier_hashkey",
                data_type=FieldDataType.TEXT,
                position=1,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_account_supplier",
                name="r_source",
                data_type=FieldDataType.TEXT,
                position=2,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_account_supplier",
                name="r_timestamp",
                data_type=FieldDataType.TIMESTAMP_NTZ,
                position=3,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="h_account_supplier",
                name="account_id",
                data_type=FieldDataType.TEXT,
                position=4,
                is_mandatory=True,
            ),
        ],
    )
    role_playing_hub_account_supplier.parent_table = hub_account

    # Prepare data load.
    dv_load = DataVaultLoad(
        extract_schema="dv_extract",
        extract_table="foobar",
        staging_schema="dv_staging",
        staging_table="foobar",
        extract_start_timestamp=datetime.now(timezone.utc),
        target_tables=[hub_account, role_playing_hub_account_supplier],
        source="some_source",
    )

    # Show generated SQL statements.
    for statement in dv_load.sql_load_script:
        print(statement)


if __name__ == "__main__":
    prepare_sql()
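This page does not cover how the role-playing hub views themselves are created. As a sketch, assuming the view renames the main hub's hashkey and keeps the other columns (mirroring the h_account_supplier fields declared above), the DDL could be generated as follows. role_playing_hub_view_ddl is a hypothetical helper, not part of diepvries.

```python
from typing import Sequence


def role_playing_hub_view_ddl(
    schema: str, main_hub: str, role_playing_hub: str, business_keys: Sequence[str]
) -> str:
    """Build a CREATE VIEW statement exposing a main hub under a role-playing name.

    The hashkey is renamed from <main_hub>_hashkey to <role_playing_hub>_hashkey;
    the metadata columns and business keys keep their names.
    """
    columns = [
        f"{main_hub}_hashkey AS {role_playing_hub}_hashkey",
        "r_source",
        "r_timestamp",
        *business_keys,
    ]
    select_list = ",\n  ".join(columns)
    return (
        f"CREATE OR REPLACE VIEW {schema}.{role_playing_hub} AS\n"
        f"SELECT\n  {select_list}\n"
        f"FROM {schema}.{main_hub};"
    )
```

For h_account_supplier this yields a view selecting h_account_hashkey AS h_account_supplier_hashkey, r_source, r_timestamp and account_id from dv.h_account.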

Effectivity satellites

Effectivity satellites track link relationships that can change over time. For example, imagine a link between a Customer and a Contact entry. A Customer can have multiple Contact entries, but you may need to keep only one Contact entry per Customer open at any given point in time. If a Customer changes its Contact entry, only the latest relationship between the Customer and a Contact is kept open.

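diepvries supports this as well. The link column that determines which relationship stays open (the driving key, here h_foo_hashkey in the link l_foo_bar) is declared with DrivingKeyField: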

from diepvries.deserializers.snowflake_deserializer import (
    DatabaseConfiguration,
    SnowflakeDeserializer,
)
from diepvries.driving_key_field import DrivingKeyField


def deserialize():
    database_configuration = DatabaseConfiguration(
        database="<DB>",
        user="<USER>",
        password="<PASSWORD>",
        warehouse="<WAREHOUSE>",
        account="<ACCOUNT>",
    )

    deserializer = SnowflakeDeserializer(
        target_schema="dv",
        target_tables=["l_foo_bar", "ls_foo_bar_eff"],
        database_configuration=database_configuration,
        driving_keys=[
            DrivingKeyField(
                name="h_foo_hashkey",
                parent_table_name="l_foo_bar",
                satellite_name="ls_foo_bar_eff",
            )
        ],
    )

    print(deserializer.deserialized_target_tables)
    print([x.name for x in deserializer.deserialized_target_tables])


if __name__ == "__main__":
    deserialize()
% python3 effsat_deserializer.py
[
  <diepvries.link.Link object at 0x7fd64db22520>,
  <diepvries.effectivity_satellite.EffectivitySatellite object at 0x7fd64db22ac0>
]
['l_foo_bar', 'ls_foo_bar_eff']

Effectivity satellites can also be directly instantiated:

from datetime import datetime, timezone

from diepvries import FieldDataType
from diepvries.data_vault_load import DataVaultLoad
from diepvries.driving_key_field import DrivingKeyField
from diepvries.effectivity_satellite import EffectivitySatellite
from diepvries.field import Field
from diepvries.hub import Hub
from diepvries.link import Link
from diepvries.satellite import Satellite


def prepare_sql():
    # Instantiate structures. The hub, link and satellite definitions are
    # elided; declare their fields as in the previous examples.
    hub_foo = Hub(...)
    hub_bar = Hub(...)
    link_foo_bar = Link(...)
    satellite_foo_bar = Satellite(...)

    eff_satellite_foo_bar = EffectivitySatellite(
        schema="dv",
        name="ls_foo_bar_eff",
        # h_foo_hashkey is the link column that decides which relationship stays open.
        driving_keys=[
            DrivingKeyField(
                name="h_foo_hashkey",
                parent_table_name="l_foo_bar",
                satellite_name="ls_foo_bar_eff",
            )
        ],
        fields=[
            Field(
                parent_table_name="ls_foo_bar_eff",
                name="l_foo_bar_hashkey",
                data_type=FieldDataType.TEXT,
                position=1,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="ls_foo_bar_eff",
                name="s_hashdiff",
                data_type=FieldDataType.TEXT,
                position=2,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="ls_foo_bar_eff",
                name="r_source",
                data_type=FieldDataType.TEXT,
                position=3,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="ls_foo_bar_eff",
                name="r_timestamp",
                data_type=FieldDataType.TIMESTAMP_NTZ,
                position=4,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="ls_foo_bar_eff",
                name="r_timestamp_end",
                data_type=FieldDataType.TIMESTAMP_NTZ,
                position=5,
                is_mandatory=True,
            ),
            Field(
                parent_table_name="ls_foo_bar_eff",
                name="some_property",
                data_type=FieldDataType.TEXT,
                position=6,
                is_mandatory=True,
            ),
        ],
    )

    # Prepare data load.
    dv_load = DataVaultLoad(
        extract_schema="dv_extract",
        extract_table="foobar",
        staging_schema="dv_staging",
        staging_table="foobar",
        extract_start_timestamp=datetime.now(timezone.utc),
        target_tables=[
            hub_foo,
            hub_bar,
            link_foo_bar,
            satellite_foo_bar,
            eff_satellite_foo_bar,
        ],
        source="some_source",
    )

    # Show generated SQL statements.
    for statement in dv_load.sql_load_script:
        print(statement)


if __name__ == "__main__":
    prepare_sql()
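To make the driving-key behaviour of the Customer/Contact example concrete, here is a plain-Python sketch of the rule: the customer acts as the driving key, so a new contact closes the customer's currently open relationship. In diepvries this happens in the generated SQL load script; the Relationship class and apply_with_driving_key function below are illustrative only, and use r_timestamp_end = None to mark an open relationship.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import Dict, List, Optional, Tuple


@dataclass
class Relationship:
    """One row of a hypothetical customer/contact effectivity history."""

    customer: str
    contact: str
    r_timestamp: datetime
    r_timestamp_end: Optional[datetime] = None  # None marks an open relationship


def apply_with_driving_key(
    history: List[Relationship], events: List[Tuple[str, str, datetime]]
) -> List[Relationship]:
    """Apply (customer, contact, timestamp) events, keeping one open row per customer.

    The customer is the driving key: when it gets a new contact, its currently
    open relationship is closed at the new relationship's timestamp.
    Returns a new list; the input rows are not modified.
    """
    rows = [replace(row) for row in history]
    open_rows: Dict[str, Relationship] = {
        row.customer: row for row in rows if row.r_timestamp_end is None
    }
    for customer, contact, timestamp in sorted(events, key=lambda event: event[2]):
        current = open_rows.get(customer)
        if current is not None and current.contact == contact:
            continue  # the same relationship is already open: nothing to do
        if current is not None:
            current.r_timestamp_end = timestamp  # close the previous relationship
        new_row = Relationship(customer, contact, timestamp)
        rows.append(new_row)
        open_rows[customer] = new_row
    return rows
```

Re-sending an unchanged relationship leaves the history untouched, while switching contacts always leaves exactly one open row per customer.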