AWS Data Wrangler

32 - AWS Lake Formation - Glue Governed tables

This tutorial assumes that your IAM user/role has the required Lake Formation permissions to create and read AWS Glue Governed tables.

Table of Contents

1. Read Governed table

1.1 Read PartiQL query

[ ]:
import awswrangler as wr

# The values below are placeholders for resources assumed to exist in the account.
database = "gov_db"  # Glue database registered with Lake Formation
table = "gov_table"  # Governed table
catalog_id = "111111111111"  # AWS Account Id

# No transaction_id is passed, so a new transaction is started for this query.
df = wr.lakeformation.read_sql_query(
    catalog_id=catalog_id,
    database=database,
    sql=f"SELECT * FROM {table};",
)

1.1.1 Read within transaction

[ ]:
# Start a read-only transaction explicitly, then hand its id to the query.
transaction_id = wr.lakeformation.start_transaction(read_only=True)
df = wr.lakeformation.read_sql_query(
    transaction_id=transaction_id,
    database=database,
    sql=f"SELECT * FROM {table};",
)

1.1.2 Read within query as of time

[ ]:
import calendar
import time

# Current UTC time expressed as whole seconds since the epoch.
query_as_of_time = calendar.timegm(time.gmtime())

# The `:id;` and `:name;` markers in the SQL are named parameters supplied via `params`.
# NOTE(review): depending on the awswrangler version, string values may need to carry
# their own quotes (e.g. "'Ayoub'") - confirm against the installed release.
df = wr.lakeformation.read_sql_query(
    sql=f"SELECT * FROM {table} WHERE id=:id; AND name=:name;",
    database=database,
    query_as_of_time=query_as_of_time,
    params={"id": 1, "name": "Ayoub"},
)

1.2 Read full table

[ ]:
df = wr.lakeformation.read_sql_table(
    table=table,
    database=database
)

2. Write Governed table

2.1 Create a new Governed table

Enter your bucket name:

[ ]:
import getpass

# Pass an explicit prompt: getpass's default prompt text is "Password: ",
# which is misleading when the value being requested is a bucket name.
bucket = getpass.getpass("Enter your bucket name: ")

If a governed table does not exist, it can be created by passing an S3 path argument. Make sure your IAM user/role has sufficient permissions on the Lake Formation database.

2.1.1 CSV table

[ ]:
import pandas as pd

table = "gov_table_csv"

df = pd.DataFrame(
    {
        "col": [1, 2, 3],
        "col2": ["A", "A", "B"],
        "col3": [None, "test", None],
    }
)

# No transaction_id is passed, so a new transaction is started for this write.
# Creating a new Governed table requires table_type="GOVERNED"; without it the
# default is to create an EXTERNAL_TABLE.
wr.s3.to_csv(
    df=df,
    dataset=True,
    path=f"s3://{bucket}/{database}/{table}/",  # S3 path
    database=database,
    table=table,
    table_type="GOVERNED",
)

2.1.2 Parquet table

[ ]:
table = "gov_table_parquet"

# One-column frame; the table parameters below are computed from its shape.
df = pd.DataFrame({"c0": [0, None]}, dtype="Int64")
wr.s3.to_parquet(
    df=df,
    dataset=True,
    path=f"s3://{bucket}/{database}/{table}/",
    database=database,
    table=table,
    table_type="GOVERNED",
    description="c0",
    columns_comments={"c0": "0"},
    parameters={
        "num_cols": str(len(df.columns)),
        "num_rows": str(len(df.index)),
    },
)

2.2 Overwrite operations

2.2.1 Overwrite

[ ]:
df = pd.DataFrame({"c1": [None, 1, None]}, dtype="Int16")

# No path is passed here: the target is identified by database and table name.
wr.s3.to_parquet(
    df=df,
    dataset=True,
    database=database,
    table=table,
    mode="overwrite",
    description="c1",
    columns_comments={"c1": "1"},
    parameters={
        "num_cols": str(len(df.columns)),
        "num_rows": str(len(df.index)),
    },
)

2.2.2 Append

[ ]:
df = pd.DataFrame({"c1": [None, 2, None]}, dtype="Int8")

# num_rows is recorded as twice this frame's length.
# NOTE(review): presumably the rows written by the overwrite step plus these - confirm.
wr.s3.to_parquet(
    df=df,
    dataset=True,
    database=database,
    table=table,
    mode="append",
    description="c1",
    columns_comments={"c1": "1"},
    parameters={
        "num_cols": str(len(df.columns)),
        "num_rows": str(len(df.index) * 2),
    },
)

2.2.3 Create partitioned Governed table

[ ]:
table = "gov_table_parquet_partitioned"

df = pd.DataFrame({"c0": ["foo", None], "c1": [0, 1]})

# c1 is used as the partition column of the new Governed table.
wr.s3.to_parquet(
    df=df,
    dataset=True,
    path=f"s3://{bucket}/{database}/{table}/",
    database=database,
    table=table,
    table_type="GOVERNED",
    partition_cols=["c1"],
    description="c0+c1",
    columns_comments={"c0": "zero", "c1": "one"},
    parameters={"num_cols": "2", "num_rows": "2"},
)

2.2.4 Overwrite partitions

[ ]:
df = pd.DataFrame({"c0": [None, None], "c1": [0, 2]})

# Same partition column as at creation time; the write mode is "overwrite_partitions".
wr.s3.to_parquet(
    df=df,
    dataset=True,
    database=database,
    table=table,
    mode="overwrite_partitions",
    partition_cols=["c1"],
    description="c0+c1",
    columns_comments={"c0": "zero", "c1": "one"},
    parameters={"num_cols": "2", "num_rows": "3"},
)

3. Multiple read/write operations within a transaction

[ ]:
read_table = "gov_table_parquet"
write_table = "gov_table_multi_parquet"

# A single read-write transaction is shared by every operation below: each call
# receives the same transaction_id, and the commit happens once at the end.
transaction_id = wr.lakeformation.start_transaction(read_only=False)

try:
    # First write: create "<write_table>_1" inside the transaction.
    df = pd.DataFrame({"c0": [0, None]}, dtype="Int64")
    wr.s3.to_parquet(
        df=df,
        path=f"s3://{bucket}/{database}/{write_table}_1/",
        dataset=True,
        database=database,
        table=f"{write_table}_1",
        table_type="GOVERNED",
        transaction_id=transaction_id,
    )

    # Read an existing Governed table using the same transaction id.
    df2 = wr.lakeformation.read_sql_table(
        table=read_table,
        database=database,
        transaction_id=transaction_id,
        use_threads=True,
    )

    # Second write: store the rows just read into "<write_table>_2".
    wr.s3.to_parquet(
        df=df2,
        path=f"s3://{bucket}/{database}/{write_table}_2/",
        dataset=True,
        mode="append",
        database=database,
        table=f"{write_table}_2",
        table_type="GOVERNED",
        transaction_id=transaction_id,
    )
except Exception:
    # A step failed before the commit: cancel the transaction so it is not
    # left open, then re-raise the original error.
    wr.lakeformation.cancel_transaction(transaction_id=transaction_id)
    raise
else:
    # Every step succeeded: commit all of them together.
    wr.lakeformation.commit_transaction(transaction_id=transaction_id)