6 - Amazon Athena¶
Wrangler has three ways to run queries on Athena and fetch the result as a DataFrame:
ctas_approach=True (Default)

Wraps the query with a CTAS and then reads the table data as Parquet directly from S3.

PROS:

- Faster for mid and big result sizes.
- Can handle some level of nested types.

CONS:

- Requires create/delete table permissions on Glue.
- Does not support timestamp with time zone.
- Does not support columns with repeated names.
- Does not support columns with undefined data types.
- A temporary table will be created and then deleted immediately.
- Does not support custom data_source/catalog_id.
unload_approach=True and ctas_approach=False

Does an UNLOAD query on Athena and parses the Parquet result on S3.

PROS:

- Faster for mid and big result sizes.
- Can handle some level of nested types.
- Does not modify the Glue Data Catalog.

CONS:

- Output S3 path must be empty.
- Does not support timestamp with time zone.
- Does not support columns with repeated names.
- Does not support columns with undefined data types.
- Does not support custom data_source/catalog_id.
ctas_approach=False

Does a regular query on Athena and parses the regular CSV result on S3.

PROS:

- Faster for small result sizes (less latency).
- Does not require create/delete table permissions on Glue.
- Supports timestamp with time zone.
- Supports custom data_source/catalog_id.

CONS:

- Slower (but still faster than other libraries that use the regular Athena API).
- Does not handle nested types at all.
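For quick reference, the three modes map onto read_sql_query as follows (a minimal sketch; my_db, my_table, and the bucket name are hypothetical placeholders, and each call shape is demonstrated on real data below):

[ ]:
import awswrangler as wr

# 1. Default CTAS approach (needs Glue create/delete table permissions)
df = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db")

# 2. UNLOAD approach (leaves the Glue Data Catalog untouched; s3_output must be empty)
df = wr.athena.read_sql_query(
    "SELECT * FROM my_table",
    database="my_db",
    ctas_approach=False,
    unload_approach=True,
    s3_output="s3://my-bucket/unload/",
)

# 3. Regular Athena query (parses the CSV result; best for small result sets)
df = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db", ctas_approach=False)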
[1]:
import awswrangler as wr
Enter your bucket name:¶
[2]:
import getpass

bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
Checking/Creating Glue Catalog Databases¶
[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
wr.catalog.create_database("awswrangler_test")
Creating a Parquet Table from the NOAA’s CSV files¶
[ ]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
df = wr.s3.read_csv(
path="s3://noaa-ghcn-pds/csv/189",
names=cols,
parse_dates=["dt", "obs_time"]) # Read 10 files from the 1890 decade (~1GB)
df
[ ]:
wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="noaa",
);
[ ]:
wr.catalog.table(database="awswrangler_test", table="noaa")
Reading with ctas_approach=False¶
[ ]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False)
Default with ctas_approach=True - 13x faster¶
[ ]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")
Using categories to speed up and save memory - 24x faster¶
[ ]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", categories=["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"])
Reading with unload_approach=True¶
[ ]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False, unload_approach=True, s3_output=f"s3://{bucket}/unload/")
Batching (Good for restricted memory environments)¶
[ ]:
%%time
dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    chunksize=True,  # Chunksize calculated automatically for ctas_approach
)

for df in dfs:  # Batching
    print(len(df.index))
[ ]:
%%time
dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    chunksize=100_000_000,
)

for df in dfs:  # Batching
    print(len(df.index))
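Each batch is a regular DataFrame, so chunks can be filtered or reduced while they are small and only the survivors kept; a sketch (the TMAX predicate is just an illustrative filter, and it assumes the combined result fits in memory):

[ ]:
import pandas as pd

dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    chunksize=True,
)

# Filter each chunk as it streams, then combine only the matching rows
df_tmax = pd.concat(
    (chunk[chunk["element"] == "TMAX"] for chunk in dfs),
    ignore_index=True,
)
df_tmax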
Cleaning Up S3¶
[ ]:
wr.s3.delete_objects(path)
Delete table¶
[ ]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="noaa");
Delete Database¶
[ ]:
wr.catalog.delete_database("awswrangler_test")