6 - Amazon Athena¶
awswrangler has three ways to run queries on Athena and fetch the result as a DataFrame:
ctas_approach=True (Default)
Wraps the query with a CTAS and then reads the table data as Parquet directly from S3.
PROS:
Faster for mid and big result sizes.
Can handle some level of nested types.
CONS:
Requires create/delete table permissions on Glue.
Does not support timestamp with time zone.
Does not support columns with repeated names.
Does not support columns with undefined data types.
A temporary table will be created and then deleted immediately.
Does not support custom data_source/catalog_id.
unload_approach=True and ctas_approach=False
Does an UNLOAD query on Athena and parses the Parquet result on S3.
PROS:
Faster for mid and big result sizes.
Can handle some level of nested types.
Does not modify the Glue Data Catalog.
CONS:
Output S3 path must be empty.
Does not support timestamp with time zone.
Does not support columns with repeated names.
Does not support columns with undefined data types.
ctas_approach=False
Does a regular query on Athena and parses the regular CSV result on S3.
PROS:
Faster for small result sizes (less latency).
Does not require create/delete table permissions on Glue.
Supports timestamp with time zone.
Supports custom data_source/catalog_id.
CONS:
Slower (but still faster than other libraries that use the regular Athena API).
Does not handle nested types at all.
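All three modes are selected through arguments of wr.athena.read_sql_query, as the timed cells below demonstrate. The following is a minimal sketch of the three call patterns, assuming the noaa table, awswrangler_test database, and bucket variable that are set up later in this tutorial:
[ ]:
import awswrangler as wr

# Sketch only: the table, database, and `bucket` used below are created further down in this notebook.

# 1) Default CTAS approach: wraps the query in a CTAS, reads the Parquet output from S3,
#    then drops the temporary Glue table.
df = wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")

# 2) UNLOAD approach: no Glue table is created, but the output prefix must be empty,
#    so it is cleared first here.
wr.s3.delete_objects(f"s3://{bucket}/unload/")
df = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    ctas_approach=False,
    unload_approach=True,
    s3_output=f"s3://{bucket}/unload/",
)

# 3) Regular approach: plain Athena query, CSV result parsed from S3.
df = wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False)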
[1]:
import awswrangler as wr
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
Checking/Creating Glue Catalog Databases¶
[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
wr.catalog.create_database("awswrangler_test")
Creating a Parquet Table from the NOAA’s CSV files¶
[ ]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
df = wr.s3.read_csv(
path="s3://noaa-ghcn-pds/csv/by_year/189",
names=cols,
parse_dates=["dt", "obs_time"]) # Read 10 files from the 1890 decade (~1GB)
df
[ ]:
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
database="awswrangler_test",
table="noaa"
)
[ ]:
wr.catalog.table(database="awswrangler_test", table="noaa")
Reading with ctas_approach=False¶
[ ]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False)
Default with ctas_approach=True - 13x faster¶
[ ]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")
Using categories to speed up and save memory - 24x faster¶
[ ]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", categories=["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"])
Reading with unload_approach=True¶
[ ]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False, unload_approach=True, s3_output=f"s3://{bucket}/unload/")
Batching (Good for restricted memory environments)¶
[ ]:
%%time
dfs = wr.athena.read_sql_query(
"SELECT * FROM noaa",
database="awswrangler_test",
chunksize=True # Chunksize calculated automatically for ctas_approach.
)
for df in dfs: # Batching
print(len(df.index))
[ ]:
%%time
dfs = wr.athena.read_sql_query(
"SELECT * FROM noaa",
database="awswrangler_test",
chunksize=100_000_000
)
for df in dfs: # Batching
print(len(df.index))
Cleaning Up S3¶
[ ]:
wr.s3.delete_objects(path)
Delete Table¶
[ ]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="noaa")
Delete Database¶
[ ]:
wr.catalog.delete_database('awswrangler_test')