AWS Data Wrangler

6 - Amazon Athena

Wrangler has two ways to run queries on Athena and fetch the result as a DataFrame (a minimal comparison sketch follows this list):

  • ctas_approach=True (Default)

    Wraps the query with a CTAS and then reads the table data as Parquet directly from S3.

    • PROS:

      • Faster for medium and large result sizes.

      • Can handle some level of nested types.

    • CONS:

      • Requires create/delete table permissions on Glue.

      • Does not support timestamp with time zone.

      • Does not support columns with repeated names.

      • Does not support columns with undefined data types.

      • A temporary table will be created and then deleted immediately.

      • Does not support custom data_source/catalog_id.

  • ctas_approach=False

    Does a regular query on Athena and parses the regular CSV result on S3.

    • PROS:

      • Faster for small result sizes (lower latency).

      • Does not require create/delete table permissions on Glue.

      • Supports timestamp with time zone.

      • Supports custom data_source/catalog_id.

    • CONS:

      • Slower (but still faster than other libraries that use the regular Athena API).

      • Does not handle nested types at all.
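
A minimal comparison sketch (not executed in this notebook; "my_db" and "my_table" are placeholder names for an existing Glue database and table):

import awswrangler as wr

sql = "SELECT * FROM my_table"

# Default: CTAS approach (temporary table + Parquet reads; faster for larger results).
df_ctas = wr.athena.read_sql_query(sql, database="my_db")

# Regular approach (plain Athena query + CSV parsing; lower latency for small results).
df_regular = wr.athena.read_sql_query(sql, database="my_db", ctas_approach=False)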

[1]:
import awswrangler as wr

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"

Checking/Creating Glue Catalog Databases

[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")

Creating a Parquet Table from NOAA's CSV files

Reference

[4]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]

df = wr.s3.read_csv(
    path="s3://noaa-ghcn-pds/csv/189",
    names=cols,
    parse_dates=["dt", "obs_time"])  # Read 10 files from the 1890 decade (~1GB)

df
[4]:
id dt element value m_flag q_flag s_flag obs_time
0 AGE00135039 1890-01-01 TMAX 160 NaN NaN E NaN
1 AGE00135039 1890-01-01 TMIN 30 NaN NaN E NaN
2 AGE00135039 1890-01-01 PRCP 45 NaN NaN E NaN
3 AGE00147705 1890-01-01 TMAX 140 NaN NaN E NaN
4 AGE00147705 1890-01-01 TMIN 74 NaN NaN E NaN
... ... ... ... ... ... ... ... ...
29240014 UZM00038457 1899-12-31 PRCP 16 NaN NaN r NaN
29240015 UZM00038457 1899-12-31 TAVG -73 NaN NaN r NaN
29240016 UZM00038618 1899-12-31 TMIN -76 NaN NaN r NaN
29240017 UZM00038618 1899-12-31 PRCP 0 NaN NaN r NaN
29240018 UZM00038618 1899-12-31 TAVG -60 NaN NaN r NaN

29240019 rows × 8 columns

[5]:
wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="noaa"
);
[6]:
wr.catalog.table(database="awswrangler_test", table="noaa")
[6]:
Column Name Type Partition Comment
0 id string False
1 dt timestamp False
2 element string False
3 value bigint False
4 m_flag string False
5 q_flag string False
6 s_flag string False
7 obs_time string False

Reading with ctas_approach=False

[7]:
%%time

wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False)
CPU times: user 8min 45s, sys: 6.52 s, total: 8min 51s
Wall time: 11min 3s
[7]:
id dt element value m_flag q_flag s_flag obs_time
0 AGE00135039 1890-01-01 TMAX 160 <NA> <NA> E <NA>
1 AGE00135039 1890-01-01 TMIN 30 <NA> <NA> E <NA>
2 AGE00135039 1890-01-01 PRCP 45 <NA> <NA> E <NA>
3 AGE00147705 1890-01-01 TMAX 140 <NA> <NA> E <NA>
4 AGE00147705 1890-01-01 TMIN 74 <NA> <NA> E <NA>
... ... ... ... ... ... ... ... ...
29240014 UZM00038457 1899-12-31 PRCP 16 <NA> <NA> r <NA>
29240015 UZM00038457 1899-12-31 TAVG -73 <NA> <NA> r <NA>
29240016 UZM00038618 1899-12-31 TMIN -76 <NA> <NA> r <NA>
29240017 UZM00038618 1899-12-31 PRCP 0 <NA> <NA> r <NA>
29240018 UZM00038618 1899-12-31 TAVG -60 <NA> <NA> r <NA>

29240019 rows × 8 columns

Reading with ctas_approach=True (default) - 13x faster

[8]:
%%time

wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")
CPU times: user 28 s, sys: 6.07 s, total: 34.1 s
Wall time: 50.5 s
[8]:
id dt element value m_flag q_flag s_flag obs_time
0 ASN00017088 1890-06-11 PRCP 0 <NA> <NA> a <NA>
1 ASN00017087 1890-06-11 PRCP 0 <NA> <NA> a <NA>
2 ASN00017089 1890-06-11 PRCP 71 <NA> <NA> a <NA>
3 ASN00017095 1890-06-11 PRCP 0 <NA> <NA> a <NA>
4 ASN00017094 1890-06-11 PRCP 0 <NA> <NA> a <NA>
... ... ... ... ... ... ... ... ...
29240014 USC00461260 1899-12-31 SNOW 0 <NA> <NA> 6 <NA>
29240015 USC00461515 1899-12-31 TMAX -89 <NA> <NA> 6 <NA>
29240016 USC00461515 1899-12-31 TMIN -189 <NA> <NA> 6 <NA>
29240017 USC00461515 1899-12-31 PRCP 0 <NA> <NA> 6 <NA>
29240018 USC00461515 1899-12-31 SNOW 0 <NA> <NA> 6 <NA>

29240019 rows × 8 columns

Using categories to speed up and save memory - 24x faster

[9]:
%%time

wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", categories=["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"])
CPU times: user 6.89 s, sys: 2.27 s, total: 9.16 s
Wall time: 27.3 s
[9]:
id dt element value m_flag q_flag s_flag obs_time
0 GME00102348 1890-08-03 TMAX 172 NaN NaN E NaN
1 GME00102348 1890-08-03 TMIN 117 NaN NaN E NaN
2 GME00102348 1890-08-03 PRCP 63 NaN NaN E NaN
3 GME00102348 1890-08-03 SNWD 0 NaN NaN E NaN
4 GME00121126 1890-08-03 PRCP 32 NaN NaN E NaN
... ... ... ... ... ... ... ... ...
29240014 USC00461260 1899-12-31 SNOW 0 NaN NaN 6 NaN
29240015 USC00461515 1899-12-31 TMAX -89 NaN NaN 6 NaN
29240016 USC00461515 1899-12-31 TMIN -189 NaN NaN 6 NaN
29240017 USC00461515 1899-12-31 PRCP 0 NaN NaN 6 NaN
29240018 USC00461515 1899-12-31 SNOW 0 NaN NaN 6 NaN

29240019 rows × 8 columns
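
The gain comes from reading the Parquet result directly into pandas Categorical columns. As a rough follow-up (assuming the result is assigned to a variable, which the cell above does not do), the dtypes and memory footprint could be inspected like this:

df_cat = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    categories=["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"],
)
print(df_cat.dtypes)                         # the listed columns come back as 'category'
print(df_cat.memory_usage(deep=True).sum())  # expected to be well below the non-categorical read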

Batching (Good for restricted memory environments)

[10]:
%%time

dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    chunksize=True  # Chunksize calculated automatically for ctas_approach.
)

for df in dfs:  # Batching
    print(len(df.index))
1024
8086528
1024
1024
1024
1024
1024
15360
1024
10090496
2153472
8886995
CPU times: user 22.7 s, sys: 5.41 s, total: 28.1 s
Wall time: 48 s
[11]:
%%time

dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    chunksize=100_000_000
)

for df in dfs:  # Batching
    print(len(df.index))
29240019
CPU times: user 34.8 s, sys: 8.54 s, total: 43.4 s
Wall time: 1min 1s
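
With chunksize=True the chunk lengths are whatever Wrangler can emit most efficiently (note the uneven sizes above), while an integer chunksize caps each chunk at roughly that many rows; 100_000_000 exceeds the table's 29,240,019 rows, so everything arrives as a single chunk. A minimal sketch of memory-friendly processing (the per-element counting is illustrative, not part of the tutorial):

counts = {}
for chunk in wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    chunksize=1_000_000,  # hold at most ~1M rows in memory at a time
):
    # Aggregate per chunk instead of materializing the full 29M-row DataFrame.
    for element, n in chunk["element"].value_counts().items():
        counts[element] = counts.get(element, 0) + n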

Cleaning Up S3

[12]:
wr.s3.delete_objects(path)

Delete table

[13]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="noaa");

Delete Database

[14]:
wr.catalog.delete_database('awswrangler_test')