32 - AWS Lake Formation - Glue Governed tables
This tutorial assumes that your IAM user/role has the required Lake Formation permissions to create and read AWS Glue Governed tables.
Table of Contents
1. Read Governed table
1.1 Read PartiQL query
[ ]:
import awswrangler as wr
database = "gov_db" # Assumes a Glue database registered with Lake Formation exists in the account
table = "gov_table" # Assumes a Governed table exists in the account
catalog_id = "111111111111" # AWS Account Id
# Note 1: If a transaction_id is not specified, a new transaction is started
df = wr.lakeformation.read_sql_query(
    sql=f"SELECT * FROM {table};",
    database=database,
    catalog_id=catalog_id
)
1.1.1 Read within transaction
[ ]:
transaction_id = wr.lakeformation.start_transaction(read_only=True)
df = wr.lakeformation.read_sql_query(
    sql=f"SELECT * FROM {table};",
    database=database,
    transaction_id=transaction_id
)
1.1.2 Read within query as of time
[ ]:
import calendar
import time
query_as_of_time = calendar.timegm(time.gmtime())
# Note: `:id;` and `:name;` are named parameters substituted from `params`
df = wr.lakeformation.read_sql_query(
    sql=f"SELECT * FROM {table} WHERE id=:id; AND name=:name;",
    database=database,
    query_as_of_time=query_as_of_time,
    params={"id": 1, "name": "Ayoub"}
)
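`query_as_of_time` is a Unix epoch timestamp in seconds. The cell above queries the table as of "now"; to read it as of a specific past moment instead, the timestamp can be computed from a `datetime`. A minimal sketch (the date below is an arbitrary example, not taken from this tutorial):

```python
import calendar
from datetime import datetime

# Hypothetical point in time to read the table "as of", interpreted as UTC
as_of = datetime(2021, 6, 1, 12, 0, 0)

# calendar.timegm treats the struct_time as UTC and returns epoch seconds
query_as_of_time = calendar.timegm(as_of.timetuple())
print(query_as_of_time)  # 1622548800
```

The resulting integer can be passed directly as the `query_as_of_time` argument above.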
1.2 Read full table
[ ]:
df = wr.lakeformation.read_sql_table(
    table=table,
    database=database
)
2. Write Governed table
2.1 Create a new Governed table
Enter your bucket name:
[ ]:
import getpass
bucket = getpass.getpass()
If a Governed table does not exist, it can be created by passing an S3 `path` argument. Make sure your IAM user/role has sufficient permissions on the Lake Formation database.
2.1.1 CSV table
[ ]:
import pandas as pd
table = "gov_table_csv"
df = pd.DataFrame({
    "col": [1, 2, 3],
    "col2": ["A", "A", "B"],
    "col3": [None, "test", None]
})
# Note 1: If a transaction_id is not specified, a new transaction is started
# Note 2: When creating a new Governed table, `table_type="GOVERNED"` must be specified. Otherwise the default is to create an EXTERNAL_TABLE
wr.s3.to_csv(
    df=df,
    path=f"s3://{bucket}/{database}/{table}/",  # S3 path
    dataset=True,
    database=database,
    table=table,
    table_type="GOVERNED"
)
2.1.2 Parquet table
[ ]:
table = "gov_table_parquet"
df = pd.DataFrame({"c0": [0, None]}, dtype="Int64")
wr.s3.to_parquet(
    df=df,
    path=f"s3://{bucket}/{database}/{table}/",
    dataset=True,
    database=database,
    table=table,
    table_type="GOVERNED",
    description="c0",
    parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index))},
    columns_comments={"c0": "0"}
)
2.2 Overwrite operations
2.2.1 Overwrite
[ ]:
df = pd.DataFrame({"c1": [None, 1, None]}, dtype="Int16")
wr.s3.to_parquet(
    df=df,
    dataset=True,
    mode="overwrite",
    database=database,
    table=table,
    description="c1",
    parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index))},
    columns_comments={"c1": "1"}
)
2.2.2 Append
[ ]:
df = pd.DataFrame({"c1": [None, 2, None]}, dtype="Int8")
wr.s3.to_parquet(
    df=df,
    dataset=True,
    mode="append",
    database=database,
    table=table,
    description="c1",
    parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index) * 2)},
    columns_comments={"c1": "1"}
)
2.2.3 Create partitioned Governed table
[ ]:
table = "gov_table_parquet_partitioned"
df = pd.DataFrame({"c0": ["foo", None], "c1": [0, 1]})
wr.s3.to_parquet(
    df=df,
    path=f"s3://{bucket}/{database}/{table}/",
    dataset=True,
    database=database,
    table=table,
    table_type="GOVERNED",
    partition_cols=["c1"],
    description="c0+c1",
    parameters={"num_cols": "2", "num_rows": "2"},
    columns_comments={"c0": "zero", "c1": "one"}
)
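Partitioning by `c1` writes the rows for each distinct value of `c1` under a separate S3 prefix (e.g. `.../c1=0/`), with the partition column stored in the prefix rather than in the files. A local, pandas-only sketch of how the frame is split:

```python
import pandas as pd

df = pd.DataFrame({"c0": ["foo", None], "c1": [0, 1]})

# One group per distinct c1 value; each group lands under its own partition
# prefix, e.g. s3://bucket/db/table/c1=0/, with c1 dropped from the files
partitions = {value: group.drop(columns="c1") for value, group in df.groupby("c1")}
for value, group in partitions.items():
    print(f"c1={value}: {len(group)} row(s)")
```

This is only an illustration of the layout; `wr.s3.to_parquet` performs the split and the S3 writes itself.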
2.2.4 Overwrite partitions
[ ]:
df = pd.DataFrame({"c0": [None, None], "c1": [0, 2]})
wr.s3.to_parquet(
    df=df,
    dataset=True,
    mode="overwrite_partitions",
    database=database,
    table=table,
    partition_cols=["c1"],
    description="c0+c1",
    parameters={"num_cols": "2", "num_rows": "3"},
    columns_comments={"c0": "zero", "c1": "one"}
)
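With `mode="overwrite_partitions"`, only the partitions present in the new DataFrame are replaced; partitions it does not touch are left intact. Here `c1=0` is overwritten, `c1=1` is kept from the previous write, and `c1=2` is added, which is why `num_rows` becomes 3. A dict-based sketch of these semantics (the partition contents are the hypothetical table state, not read from AWS):

```python
# Existing partitions of the table (partition value -> rows), hypothetical state
existing = {0: [{"c0": "foo"}], 1: [{"c0": None}]}

# Partitions contained in the new DataFrame
incoming = {0: [{"c0": None}], 2: [{"c0": None}]}

# overwrite_partitions: replace matching partitions, keep the rest unchanged
merged = {**existing, **incoming}
total_rows = sum(len(rows) for rows in merged.values())
print(sorted(merged), total_rows)  # [0, 1, 2] 3
```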
3. Multiple read/write operations within a transaction
[ ]:
read_table = "gov_table_parquet"
write_table = "gov_table_multi_parquet"
transaction_id = wr.lakeformation.start_transaction(read_only=False)
df = pd.DataFrame({"c0": [0, None]}, dtype="Int64")
wr.s3.to_parquet(
    df=df,
    path=f"s3://{bucket}/{database}/{write_table}_1",
    dataset=True,
    database=database,
    table=f"{write_table}_1",
    table_type="GOVERNED",
    transaction_id=transaction_id,
)
df2 = wr.lakeformation.read_sql_table(
    table=read_table,
    database=database,
    transaction_id=transaction_id,
    use_threads=True
)
df3 = pd.DataFrame({"c1": [None, 1, None]}, dtype="Int16")
wr.s3.to_parquet(
    df=df3,
    path=f"s3://{bucket}/{database}/{write_table}_2",
    dataset=True,
    mode="append",
    database=database,
    table=f"{write_table}_2",
    table_type="GOVERNED",
    transaction_id=transaction_id,
)
wr.lakeformation.commit_transaction(transaction_id=transaction_id)