AWS Data Wrangler

6 - Amazon Athena

Wrangler has three ways to run queries on Athena and fetch the result as a DataFrame:

  • ctas_approach=True (Default)

    Wraps the query with a CTAS and then reads the table data as parquet directly from s3.

    • PROS:

      • Faster for mid and big result sizes.

      • Can handle some level of nested types.

    • CONS:

      • Requires create/delete table permissions on Glue.

      • Does not support timestamp with time zone

      • Does not support columns with repeated names.

      • Does not support columns with undefined data types.

      • A temporary table will be created and then deleted immediately.

      • Does not support custom data_source/catalog_id.

  • unload_approach=True and ctas_approach=False

    Does an UNLOAD query on Athena and parse the Parquet result on s3.

    • PROS:

      • Faster for mid and big result sizes.

      • Can handle some level of nested types.

      • Does not modify Glue Data Catalog.

    • CONS:

      • Output S3 path must be empty.

      • Does not support timestamp with time zone

      • Does not support columns with repeated names.

      • Does not support columns with undefined data types.

      • Does not support custom data_source/catalog_id.

  • ctas_approach=False

    Does a regular query on Athena and parse the regular CSV result on s3.

    • PROS:

      • Faster for small result sizes (less latency).

      • Does not require create/delete table permissions on Glue

      • Supports timestamp with time zone.

      • Support custom data_source/catalog_id.

    • CONS:

      • Slower (But stills faster than other libraries that uses the regular Athena API)

      • Does not handle nested types at all.

[1]:
import awswrangler as wr

Enter your bucket name:

[2]:
import getpass
# bucket = getpass.getpass()
bucket = "antonkuk-us-east-1"
path = f"s3://{bucket}/data/"

Checking/Creating Glue Catalog Databases

[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")

Creating a Parquet Table from the NOAA’s CSV files

Reference

[ ]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]

df = wr.s3.read_csv(
    path="s3://noaa-ghcn-pds/csv/189",
    names=cols,
    parse_dates=["dt", "obs_time"])  # Read 10 files from the 1890 decade (~1GB)

df
[ ]:
wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="noaa"
);
[ ]:
wr.catalog.table(database="awswrangler_test", table="noaa")

Reading with ctas_approach=False

[ ]:
%%time

wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False)

Default with ctas_approach=True - 13x faster (default)

[ ]:
%%time

wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")

Using categories to speed up and save memory - 24x faster

[ ]:
%%time

wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", categories=["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"])

Reading with unload_approach=True

[ ]:
%%time

wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False, unload_approach=True, s3_output=f"s3://{bucket}/unload/")

Batching (Good for restricted memory environments)

[ ]:
%%time

dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    chunksize=True  # Chunksize calculated automatically for ctas_approach.
)

for df in dfs:  # Batching
    print(len(df.index))
[ ]:
%%time

dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    chunksize=100_000_000
)

for df in dfs:  # Batching
    print(len(df.index))

Cleaning Up S3

[ ]:
wr.s3.delete_objects(path)

Delete table

[ ]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="noaa");

Delete Database

[ ]:
wr.catalog.delete_database('awswrangler_test')