6 - Amazon Athena
Wrangler has two ways to run queries on Athena and fetch the result as a DataFrame:
ctas_approach=True (default)
Wraps the query in a CTAS (CREATE TABLE AS SELECT) statement and then reads the table data as Parquet directly from S3.
PROS:
- Faster for mid-size and large result sets.
- Can handle some level of nested types.
CONS:
- Requires create/delete table permissions on Glue.
- Does not support timestamp with time zone.
- Does not support columns with repeated names.
- Does not support columns with undefined data types.
- A temporary table is created and then deleted immediately.
- Does not support custom data_source/catalog_id.
ctas_approach=False
Runs a regular query on Athena and parses the regular CSV result from S3.
PROS:
- Faster for small result sets (less latency).
- Does not require create/delete table permissions on Glue.
- Supports timestamp with time zone.
- Supports custom data_source/catalog_id.
CONS:
- Slower for mid-size and large result sets (but still faster than other libraries that use the regular Athena API).
- Does not handle nested types at all.
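The trade-offs above can be condensed into a small decision helper. This is only a sketch. `QueryNeeds` and `choose_ctas_approach` are hypothetical names and are not part of Wrangler. The rule simply encodes the PROS/CONS lists, and its result would be passed as the `ctas_approach` argument of `wr.athena.read_sql_query`.

```python
from dataclasses import dataclass


@dataclass
class QueryNeeds:
    """Hypothetical summary of a query's requirements (not a Wrangler type)."""

    has_glue_table_permissions: bool  # may create/delete Glue tables
    needs_timestamp_with_time_zone: bool
    has_repeated_column_names: bool
    has_undefined_column_types: bool
    uses_custom_data_source: bool  # custom data_source/catalog_id
    result_is_small: bool


def choose_ctas_approach(needs: QueryNeeds) -> bool:
    """Suggest a value for ctas_approach from the PROS/CONS lists above."""
    # Any limitation of the CTAS path rules it out.
    if (
        not needs.has_glue_table_permissions
        or needs.needs_timestamp_with_time_zone
        or needs.has_repeated_column_names
        or needs.has_undefined_column_types
        or needs.uses_custom_data_source
    ):
        return False
    # Otherwise: CTAS for mid-size/large results, the regular query
    # (lower latency) for small ones.
    return not needs.result_is_small


needs = QueryNeeds(
    has_glue_table_permissions=True,
    needs_timestamp_with_time_zone=False,
    has_repeated_column_names=False,
    has_undefined_column_types=False,
    uses_custom_data_source=False,
    result_is_small=False,
)
print(choose_ctas_approach(needs))  # True
```

Nested types are left out of the rule on purpose. The lists above say CTAS handles only some nested types, so they do not reduce to a simple yes/no.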
[1]:
import awswrangler as wr
Enter your bucket name:
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
Checking/Creating Glue Catalog Databases
[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")
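The check-then-create pattern above can be wrapped in a reusable helper. This is a minimal sketch. It assumes `wr.catalog.databases()` returns a DataFrame with a `Database` column. Checking only that column, rather than all of `.values`, avoids accidentally matching a database description. `ensure_database` and `FakeCatalog` are hypothetical names, and the fake lets the logic run without AWS.

```python
import pandas as pd


def ensure_database(catalog, name: str) -> bool:
    """Create the Glue database `name` unless it exists; return True if created.

    `catalog` is assumed to expose databases() -> DataFrame with a "Database"
    column and create_database(name), as wr.catalog is assumed to do.
    """
    if name in set(catalog.databases()["Database"]):
        return False
    catalog.create_database(name)
    return True


class FakeCatalog:
    """Hypothetical in-memory stand-in so the helper can run without AWS."""

    def __init__(self) -> None:
        self.names = ["default"]

    def databases(self) -> pd.DataFrame:
        return pd.DataFrame(
            {"Database": self.names, "Description": [""] * len(self.names)}
        )

    def create_database(self, name: str) -> None:
        self.names.append(name)


catalog = FakeCatalog()
print(ensure_database(catalog, "awswrangler_test"))  # True: created
print(ensure_database(catalog, "awswrangler_test"))  # False: already there
```

Against the real catalog, the call would be `ensure_database(wr.catalog, "awswrangler_test")`, under the same assumption about the returned columns.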
Creating a Parquet Table from NOAA’s CSV Files
[4]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
df = wr.s3.read_csv(
    path="s3://noaa-ghcn-pds/csv/189",
    names=cols,
    parse_dates=["dt", "obs_time"],
)  # Read 10 files from the 1890 decade (~1GB)
df
[4]:
| | id | dt | element | value | m_flag | q_flag | s_flag | obs_time |
---|---|---|---|---|---|---|---|---|
0 | AGE00135039 | 1890-01-01 | TMAX | 160 | NaN | NaN | E | NaN |
1 | AGE00135039 | 1890-01-01 | TMIN | 30 | NaN | NaN | E | NaN |
2 | AGE00135039 | 1890-01-01 | PRCP | 45 | NaN | NaN | E | NaN |
3 | AGE00147705 | 1890-01-01 | TMAX | 140 | NaN | NaN | E | NaN |
4 | AGE00147705 | 1890-01-01 | TMIN | 74 | NaN | NaN | E | NaN |
... | ... | ... | ... | ... | ... | ... | ... | ... |
29240014 | UZM00038457 | 1899-12-31 | PRCP | 16 | NaN | NaN | r | NaN |
29240015 | UZM00038457 | 1899-12-31 | TAVG | -73 | NaN | NaN | r | NaN |
29240016 | UZM00038618 | 1899-12-31 | TMIN | -76 | NaN | NaN | r | NaN |
29240017 | UZM00038618 | 1899-12-31 | PRCP | 0 | NaN | NaN | r | NaN |
29240018 | UZM00038618 | 1899-12-31 | TAVG | -60 | NaN | NaN | r | NaN |
29240019 rows × 8 columns
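`wr.s3.read_csv` forwards pandas keyword arguments such as `names` and `parse_dates` to `pandas.read_csv`, so you can try their effect locally before touching S3. The sketch assumes the NOAA files have no header row and store dates as `YYYYMMDD`, which is an assumption about the file layout. `SAMPLE_CSV` is a hypothetical stand-in built from the first rows of the output above. The sketch also swaps `parse_dates` for an explicit `pd.to_datetime(..., format="%Y%m%d")`, so parsing does not depend on format inference. It uses the name `sample` so the notebook's `df` is left alone.

```python
import io

import pandas as pd

# Hypothetical sample in the assumed NOAA GHCN-D CSV layout: no header row,
# dates as YYYYMMDD, empty fields for missing flags and observation times.
SAMPLE_CSV = """AGE00135039,18900101,TMAX,160,,,E,
AGE00135039,18900101,TMIN,30,,,E,
AGE00135039,18900101,PRCP,45,,,E,
"""

cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]

# names= supplies the column labels because the files carry no header row.
sample = pd.read_csv(io.StringIO(SAMPLE_CSV), names=cols)
# Parse the compact dates with an explicit format instead of relying on inference.
sample["dt"] = pd.to_datetime(sample["dt"].astype(str), format="%Y%m%d")
print(sample[["id", "dt", "element", "value"]])
```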
[5]:
wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="noaa"
);
[6]:
wr.catalog.table(database="awswrangler_test", table="noaa")
[6]:
| | Column Name | Type | Partition | Comment |
---|---|---|---|---|
0 | id | string | False | |
1 | dt | timestamp | False | |
2 | element | string | False | |
3 | value | bigint | False | |
4 | m_flag | string | False | |
5 | q_flag | string | False | |
6 | s_flag | string | False | |
7 | obs_time | string | False | |
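When `wr.s3.to_parquet` is given `database` and `table`, it registers the table in the Glue Catalog. The column types are derived from the DataFrame dtypes, which is where the `bigint`, `timestamp` and `string` entries above come from. The sketch below shows the idea for common pandas dtypes. It is a deliberately simplified, hypothetical mapping: `athena_type` is not a Wrangler function, and Wrangler's real mapping covers many more types.

```python
import pandas as pd
from pandas.api import types as ptypes


def athena_type(series: pd.Series) -> str:
    """Hypothetical, simplified mapping from a pandas dtype to an Athena type name."""
    if ptypes.is_bool_dtype(series):
        return "boolean"
    if ptypes.is_integer_dtype(series):
        return "bigint"  # 64-bit integers
    if ptypes.is_float_dtype(series):
        return "double"
    if ptypes.is_datetime64_any_dtype(series):
        return "timestamp"
    return "string"  # fallback for text and anything unhandled


noaa_like = pd.DataFrame(
    {
        "id": ["AGE00135039"],
        "dt": pd.to_datetime(["1890-01-01"]),
        "element": ["TMAX"],
        "value": [160],
    }
)
print({col: athena_type(noaa_like[col]) for col in noaa_like.columns})
# {'id': 'string', 'dt': 'timestamp', 'element': 'string', 'value': 'bigint'}
```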
Reading with ctas_approach=False
[7]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False)
CPU times: user 8min 45s, sys: 6.52 s, total: 8min 51s
Wall time: 11min 3s
[7]:
| | id | dt | element | value | m_flag | q_flag | s_flag | obs_time |
---|---|---|---|---|---|---|---|---|
0 | AGE00135039 | 1890-01-01 | TMAX | 160 | <NA> | <NA> | E | <NA> |
1 | AGE00135039 | 1890-01-01 | TMIN | 30 | <NA> | <NA> | E | <NA> |
2 | AGE00135039 | 1890-01-01 | PRCP | 45 | <NA> | <NA> | E | <NA> |
3 | AGE00147705 | 1890-01-01 | TMAX | 140 | <NA> | <NA> | E | <NA> |
4 | AGE00147705 | 1890-01-01 | TMIN | 74 | <NA> | <NA> | E | <NA> |
... | ... | ... | ... | ... | ... | ... | ... | ... |
29240014 | UZM00038457 | 1899-12-31 | PRCP | 16 | <NA> | <NA> | r | <NA> |
29240015 | UZM00038457 | 1899-12-31 | TAVG | -73 | <NA> | <NA> | r | <NA> |
29240016 | UZM00038618 | 1899-12-31 | TMIN | -76 | <NA> | <NA> | r | <NA> |
29240017 | UZM00038618 | 1899-12-31 | PRCP | 0 | <NA> | <NA> | r | <NA> |
29240018 | UZM00038618 | 1899-12-31 | TAVG | -60 | <NA> | <NA> | r | <NA> |
29240019 rows × 8 columns
Reading with ctas_approach=True (default): 13x faster
[8]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")
CPU times: user 28 s, sys: 6.07 s, total: 34.1 s
Wall time: 50.5 s
[8]:
| | id | dt | element | value | m_flag | q_flag | s_flag | obs_time |
---|---|---|---|---|---|---|---|---|
0 | ASN00017088 | 1890-06-11 | PRCP | 0 | <NA> | <NA> | a | <NA> |
1 | ASN00017087 | 1890-06-11 | PRCP | 0 | <NA> | <NA> | a | <NA> |
2 | ASN00017089 | 1890-06-11 | PRCP | 71 | <NA> | <NA> | a | <NA> |
3 | ASN00017095 | 1890-06-11 | PRCP | 0 | <NA> | <NA> | a | <NA> |
4 | ASN00017094 | 1890-06-11 | PRCP | 0 | <NA> | <NA> | a | <NA> |
... | ... | ... | ... | ... | ... | ... | ... | ... |
29240014 | USC00461260 | 1899-12-31 | SNOW | 0 | <NA> | <NA> | 6 | <NA> |
29240015 | USC00461515 | 1899-12-31 | TMAX | -89 | <NA> | <NA> | 6 | <NA> |
29240016 | USC00461515 | 1899-12-31 | TMIN | -189 | <NA> | <NA> | 6 | <NA> |
29240017 | USC00461515 | 1899-12-31 | PRCP | 0 | <NA> | <NA> | 6 | <NA> |
29240018 | USC00461515 | 1899-12-31 | SNOW | 0 | <NA> | <NA> | 6 | <NA> |
29240019 rows × 8 columns
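The speed-up in the heading is the ratio of the two wall times. With `ctas_approach=False` the query took 11min 3s (663 s); with the default it took 50.5 s. That is roughly 13x. The small sketch below does the conversion. It assumes wall-time strings in the form `%%time` printed above, and `wall_seconds` is a hypothetical helper. These are single timings, so treat the ratio as indicative rather than as a benchmark.

```python
import re

# Seconds per unit, for the unit suffixes IPython's %%time prints.
_UNITS = {"h": 3600.0, "min": 60.0, "s": 1.0, "ms": 0.001}


def wall_seconds(text: str) -> float:
    """Convert a %%time wall-time string such as '11min 3s' or '50.5 s' to seconds."""
    # Each match is a (number, unit) pair, e.g. ('11', 'min'), ('3', 's').
    pairs = re.findall(r"([\d.]+)\s*(min|ms|h|s)\b", text)
    return sum(float(amount) * _UNITS[unit] for amount, unit in pairs)


baseline = wall_seconds("11min 3s")  # ctas_approach=False
ctas = wall_seconds("50.5 s")  # ctas_approach=True (default)
print(f"speed-up: {baseline / ctas:.1f}x")  # speed-up: 13.1x
```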
Using categories to speed up and save memory: 24x faster
[9]:
%%time
wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    categories=["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"],
)
CPU times: user 6.89 s, sys: 2.27 s, total: 9.16 s
Wall time: 27.3 s
[9]:
| | id | dt | element | value | m_flag | q_flag | s_flag | obs_time |
---|---|---|---|---|---|---|---|---|
0 | GME00102348 | 1890-08-03 | TMAX | 172 | NaN | NaN | E | NaN |
1 | GME00102348 | 1890-08-03 | TMIN | 117 | NaN | NaN | E | NaN |
2 | GME00102348 | 1890-08-03 | PRCP | 63 | NaN | NaN | E | NaN |
3 | GME00102348 | 1890-08-03 | SNWD | 0 | NaN | NaN | E | NaN |
4 | GME00121126 | 1890-08-03 | PRCP | 32 | NaN | NaN | E | NaN |
... | ... | ... | ... | ... | ... | ... | ... | ... |
29240014 | USC00461260 | 1899-12-31 | SNOW | 0 | NaN | NaN | 6 | NaN |
29240015 | USC00461515 | 1899-12-31 | TMAX | -89 | NaN | NaN | 6 | NaN |
29240016 | USC00461515 | 1899-12-31 | TMIN | -189 | NaN | NaN | 6 | NaN |
29240017 | USC00461515 | 1899-12-31 | PRCP | 0 | NaN | NaN | 6 | NaN |
29240018 | USC00461515 | 1899-12-31 | SNOW | 0 | NaN | NaN | 6 | NaN |
29240019 rows × 8 columns
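Passing `categories` makes Wrangler return those columns as `pandas.Categorical`. A categorical stores each distinct value once, plus a compact integer code per row. The local sketch below uses pandas only, with no AWS. It compares the memory of the same column held as Python objects and as a category. The synthetic column mimics the repetitive `element` column and is not real NOAA data.

```python
import pandas as pd

# Synthetic column: a few distinct codes repeated many times, like the
# "element" column (TMAX, TMIN, PRCP, ...). Not real NOAA data.
elements = pd.Series(["TMAX", "TMIN", "PRCP", "SNOW"] * 250_000, dtype=object)

as_object = elements.memory_usage(deep=True)
as_category = elements.astype("category").memory_usage(deep=True)

print(f"object:   {as_object / 1e6:.1f} MB")
print(f"category: {as_category / 1e6:.1f} MB")
```

The gain depends on how repetitive a column is. A column with mostly unique values, such as a free-text field, would save little or nothing.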
Batching (good for memory-constrained environments)
[10]:
%%time
dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    chunksize=True  # Chunksize calculated automatically for ctas_approach.
)
for df in dfs:  # Batching
    print(len(df.index))
1024
8086528
1024
1024
1024
1024
1024
15360
1024
10090496
2153472
8886995
CPU times: user 22.7 s, sys: 5.41 s, total: 28.1 s
Wall time: 48 s
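With `chunksize=True`, `read_sql_query` returns an iterator of DataFrames instead of a single DataFrame, so the result can be processed one chunk at a time. The sketch below shows that consumption pattern. `fake_chunks` is a hypothetical generator standing in for the Athena iterator. The sketch also checks that the chunk sizes printed above add up to the 29240019 rows of the full result.

```python
from typing import Iterable, Iterator

import pandas as pd


def count_rows(chunks: Iterable[pd.DataFrame]) -> int:
    """Consume DataFrame chunks one at a time and return the total row count.

    Each chunk can be released once the loop moves on, so peak memory roughly
    follows the largest chunk rather than the whole result.
    """
    total = 0
    for chunk in chunks:
        total += len(chunk.index)
    return total


def fake_chunks(sizes: Iterable[int]) -> Iterator[pd.DataFrame]:
    """Hypothetical stand-in for the iterator returned with chunksize=True."""
    for n in sizes:
        yield pd.DataFrame({"value": range(n)})


print(count_rows(fake_chunks([1024, 15360, 1024])))  # 17408

# The chunk sizes printed above add up to the full 29240019-row result.
sizes_printed_above = [1024, 8086528, 1024, 1024, 1024, 1024, 1024,
                       15360, 1024, 10090496, 2153472, 8886995]
print(sum(sizes_printed_above))  # 29240019
```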
[11]:
%%time
dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    chunksize=100_000_000
)
for df in dfs:  # Batching
    print(len(df.index))
29240019
CPU times: user 34.8 s, sys: 8.54 s, total: 43.4 s
Wall time: 1min 1s
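With an integer `chunksize`, Wrangler iterates over the data in DataFrames of that many rows. Because 100_000_000 exceeds the 29240019 rows in the table, a single DataFrame came back. The hypothetical `rechunk` generator below sketches those fixed-size semantics on a stream of DataFrames. It is an illustration, not Wrangler's implementation.

```python
from typing import Iterable, Iterator, List

import pandas as pd


def rechunk(frames: Iterable[pd.DataFrame], size: int) -> Iterator[pd.DataFrame]:
    """Re-slice a stream of DataFrames into chunks of `size` rows (the last may be shorter)."""
    if size <= 0:
        raise ValueError("size must be positive")
    buffer: List[pd.DataFrame] = []
    buffered = 0
    for frame in frames:
        buffer.append(frame)
        buffered += len(frame.index)
        # Emit full chunks as soon as enough rows are buffered.
        while buffered >= size:
            merged = pd.concat(buffer, ignore_index=True)
            yield merged.iloc[:size]
            rest = merged.iloc[size:]
            buffer = [rest] if len(rest.index) else []
            buffered = len(rest.index)
    if buffered:
        yield pd.concat(buffer, ignore_index=True)  # final, shorter chunk


parts = [pd.DataFrame({"value": range(a, b)}) for a, b in [(0, 3), (3, 8), (8, 10)]]
print([len(c) for c in rechunk(parts, 4)])    # [4, 4, 2]
print([len(c) for c in rechunk(parts, 100)])  # [10]
```

The second call mirrors the cell above: when the chunk size is larger than the whole result, everything arrives as one DataFrame.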
Cleaning Up S3
[12]:
wr.s3.delete_objects(path)
Delete Table
[13]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="noaa");
Delete Database
[14]:
wr.catalog.delete_database('awswrangler_test')