An AWS Professional Service open source initiative | aws-proserve-opensource@amazon.com
Quick Start¶
>>> pip install awswrangler
import awswrangler as wr
import pandas as pd
from datetime import datetime
df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})
# Storing data on Data Lake
wr.s3.to_parquet(
    df=df,
    path="s3://bucket/dataset/",
    dataset=True,
    database="my_db",
    table="my_table"
)
# Retrieving the data directly from Amazon S3
df = wr.s3.read_parquet("s3://bucket/dataset/", dataset=True)
# Retrieving the data from Amazon Athena
df = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db")
# Getting a Redshift connection from the Glue Catalog and retrieving data from Redshift Spectrum
con = wr.redshift.connect("my-glue-connection")
df = wr.redshift.read_sql_query("SELECT * FROM external_schema.my_table", con=con)
con.close()
# Amazon Timestream Write
df = pd.DataFrame({
    "time": [datetime.now(), datetime.now()],
    "my_dimension": ["foo", "boo"],
    "measure": [1.0, 1.1],
})
rejected_records = wr.timestream.write(df,
    database="sampleDB",
    table="sampleTable",
    time_col="time",
    measure_col="measure",
    dimensions_cols=["my_dimension"],
)
# Amazon Timestream Query
wr.timestream.query("""
SELECT time, measure_value::double, my_dimension
FROM "sampleDB"."sampleTable" ORDER BY time DESC LIMIT 3
""")
Read The Docs¶
What is AWS Data Wrangler?¶
An AWS Professional Service open source Python initiative that extends the power of the Pandas library to AWS, connecting DataFrames and AWS data related services.
Easy integration with Athena, Glue, Redshift, Timestream, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretManager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and EXCEL).
Built on top of other open-source projects like Pandas, Apache Arrow and Boto3, it offers abstracted functions to execute usual ETL tasks like load/unload data from Data Lakes, Data Warehouses and Databases.
Check our tutorials or the list of functionalities.
Install¶
AWS Data Wrangler runs with Python 3.6, 3.7, 3.8 and 3.9 and on several platforms (AWS Lambda, AWS Glue Python Shell, EMR, EC2, on-premises, Amazon SageMaker, local, etc).
Some good practices for most of the methods below are:
- Use new and individual Virtual Environments for each project (venv).
- On Notebooks, always restart your kernel after installations.
Note
If you want to use awswrangler for connecting to Microsoft SQL Server, some additional configuration is needed. Please have a look at the corresponding section below.
PyPI (pip)¶
>>> pip install awswrangler
Conda¶
>>> conda install -c conda-forge awswrangler
AWS Lambda Layer¶
1 - Go to GitHub’s release section and download the layer zip related to the desired version.
2 - Go to the AWS Lambda Panel, open the layer section (left side) and click create layer.
3 - Set the name and the Python version, upload your freshly downloaded zip file and press create to create the layer.
4 - Go to your Lambda and select your new layer!
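The same flow can also be scripted; below is a minimal boto3 sketch (the layer name, zip file name, runtime and function name are placeholders, not values from this guide):
import boto3

lambda_client = boto3.client("lambda")

# Publish the downloaded release zip as a new layer version (hypothetical names/paths).
with open("awswrangler-layer.zip", "rb") as f:
    layer = lambda_client.publish_layer_version(
        LayerName="aws-data-wrangler",  # placeholder layer name
        Content={"ZipFile": f.read()},
        CompatibleRuntimes=["python3.8"],  # match your Lambda runtime
    )

# Attach the new layer version to an existing function (replaces the function's current layer list).
lambda_client.update_function_configuration(
    FunctionName="my-function",  # placeholder function name
    Layers=[layer["LayerVersionArn"]],
)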
AWS Glue Python Shell Jobs¶
1 - Go to GitHub’s release page and download the wheel file (.whl) related to the desired version.
2 - Upload the wheel file to any Amazon S3 location.
3 - Go to your Glue Python Shell job and point to the wheel file on S3 in the Python library path field.
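If you prefer to script the job setup, the boto3 sketch below creates a hypothetical Python Shell job, assuming the Python library path field corresponds to the --extra-py-files job argument (job name, role, script and wheel locations are placeholders):
import boto3

glue = boto3.client("glue")

# Hypothetical Python Shell job that pulls the awswrangler wheel from S3.
glue.create_job(
    Name="my-python-shell-job",  # placeholder job name
    Role="arn:aws:iam::111111111111:role/MyGlueRole",  # placeholder role
    Command={
        "Name": "pythonshell",
        "ScriptLocation": "s3://my-bucket/scripts/job.py",  # placeholder script
        "PythonVersion": "3",
    },
    DefaultArguments={
        # Assumed equivalent of the "Python library path" field (placeholder wheel path).
        "--extra-py-files": "s3://my-bucket/wheels/awswrangler-2.7.0-py3-none-any.whl",
    },
)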
AWS Glue PySpark Jobs¶
Note
AWS Data Wrangler has compiled dependencies (C/C++) so there is only support for Glue PySpark Jobs >= 2.0.
Go to your Glue PySpark job and create a new Job parameters key/value:
- Key: --additional-python-modules
- Value: pyarrow==2,awswrangler
To install a specific version, set the value for the above Job parameter as follows:
- Value: pyarrow==2,awswrangler==2.7.0
Note
Pyarrow 3 is not currently supported in Glue PySpark Jobs, which is why a previous installation of pyarrow 2 is required.
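As a minimal sketch of setting this parameter outside the console, the same key/value can be passed when starting a run of an existing job with boto3 (the job name is a placeholder, and it is assumed the parameter is honored as a run argument):
import boto3

# Install pyarrow 2 and a pinned Wrangler version for a single run of an existing job.
boto3.client("glue").start_job_run(
    JobName="my-pyspark-job",  # placeholder job name
    Arguments={"--additional-python-modules": "pyarrow==2,awswrangler==2.7.0"},
)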
Amazon SageMaker Notebook¶
Run this command in any Python 3 notebook paragraph and then make sure to restart the kernel before importing the awswrangler package.
>>> !pip install awswrangler
Amazon SageMaker Notebook Lifecycle¶
Open the SageMaker console, go to the lifecycle section and use the following snippet to configure AWS Data Wrangler for all compatible SageMaker kernels (Reference).
#!/bin/bash
set -e
# OVERVIEW
# This script installs a single pip package in all SageMaker conda environments, apart from the JupyterSystemEnv which
# is a system environment reserved for Jupyter.
# Note this may timeout if the package installations in all environments take longer than 5 mins, consider using
# "nohup" to run this as a background process in that case.
sudo -u ec2-user -i <<'EOF'
# PARAMETERS
PACKAGE=awswrangler
# Note that "base" is special environment name, include it there as well.
for env in base /home/ec2-user/anaconda3/envs/*; do
    source /home/ec2-user/anaconda3/bin/activate $(basename "$env")
    if [ $env = 'JupyterSystemEnv' ]; then
        continue
    fi
    nohup pip install --upgrade "$PACKAGE" &
    source /home/ec2-user/anaconda3/bin/deactivate
done
EOF
EMR Cluster¶
Despite not being a distributed library, AWS Data Wrangler can be a good helper to complement Big Data pipelines.
Configure Python 3 as the default interpreter for PySpark on your cluster configuration [ONLY REQUIRED FOR EMR < 6]
[ { "Classification": "spark-env", "Configurations": [ { "Classification": "export", "Properties": { "PYSPARK_PYTHON": "/usr/bin/python3" } } ] } ]
Keep the bootstrap script below on S3 and reference it on your cluster.
For EMR Release < 6
#!/usr/bin/env bash
set -ex
sudo pip-3.6 install pyarrow==2 awswrangler
For EMR Release >= 6
#!/usr/bin/env bash
set -ex
sudo pip install pyarrow==2 awswrangler
Note
Make sure to freeze the Wrangler version in the bootstrap for production environments (e.g. awswrangler==2.7.0).
Note
Pyarrow 3 is not currently supported in the default EMR image, which is why a previous installation of pyarrow 2 is required.
From Source¶
>>> git clone https://github.com/awslabs/aws-data-wrangler.git
>>> cd aws-data-wrangler
>>> pip install .
Notes for Microsoft SQL Server¶
awswrangler uses pyodbc for interacting with Microsoft SQL Server. To install this package you need the ODBC header files, which can be installed, for example, with the following commands:
>>> sudo apt install unixodbc-dev
>>> yum install unixODBC-devel
After installing these header files you can either just install pyodbc or awswrangler with the sqlserver extra, which will also install pyodbc:
>>> pip install pyodbc
>>> pip install awswrangler[sqlserver]
Finally you also need the correct ODBC Driver for SQL Server. You can have a look at the documentation from Microsoft to see how they can be installed in your environment.
If you want to connect to Microsoft SQL Server from AWS Lambda, you can build a separate Layer including the needed ODBC drivers and pyodbc.
If you maintain your own environment, you need to take care of the above steps. Because of this limitation, usage in combination with Glue jobs is limited and you need to rely on the functionality provided inside Glue itself.
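Once the ODBC driver and pyodbc are in place, usage follows the same pattern as the other databases; a minimal sketch, assuming a Glue Catalog connection with a placeholder name:
import awswrangler as wr

# Connect through a Glue Catalog connection (placeholder name) and run a test query.
con = wr.sqlserver.connect("my-sqlserver-glue-connection")
df = wr.sqlserver.read_sql_query("SELECT 1 AS col", con=con)
con.close()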
Tutorials¶
Note
You can also find all Tutorial Notebooks on GitHub.
1 - Introduction¶
What is AWS Data Wrangler?¶
An open-source Python package that extends the power of the Pandas library to AWS, connecting DataFrames and AWS data related services (Amazon Redshift, AWS Glue, Amazon Athena, Amazon Timestream, Amazon EMR, etc).
Built on top of other open-source projects like Pandas, Apache Arrow and Boto3, it offers abstracted functions to execute usual ETL tasks like load/unload data from Data Lakes, Data Warehouses and Databases.
Check our list of functionalities.
How to install?¶
The Wrangler runs almost anywhere over Python 3.6, 3.7, 3.8 and 3.9, so there are several different ways to install it in the desired environment.
Some good practices for most of the above methods are:
- Use new and individual Virtual Environments for each project (venv).
- On Notebooks, always restart your kernel after installations.
Let’s Install it!¶
[ ]:
!pip install awswrangler
Restart your kernel after the installation!
[1]:
import awswrangler as wr
wr.__version__
[1]:
'2.0.0'
2 - Sessions¶
How does Wrangler handle Sessions and AWS credentials?¶
After version 1.0.0 Wrangler absolutely relies on Boto3.Session() to manage AWS credentials and configurations.
Wrangler will not store any kind of state internally. Users are in charge of managing Sessions.
Most Wrangler functions receive the optional boto3_session argument. If None is received, the default boto3 Session will be used.
[1]:
import awswrangler as wr
import boto3
Customizing and using the default Boto3 Session¶
[3]:
boto3.setup_default_session(region_name="us-east-2")
wr.s3.does_object_exist("s3://noaa-ghcn-pds/fake")
[3]:
False
Using a new custom Boto3 Session¶
[4]:
my_session = boto3.Session(region_name="us-east-2")
wr.s3.does_object_exist("s3://noaa-ghcn-pds/fake", boto3_session=my_session)
[4]:
False
3 - Amazon S3¶
[1]:
import awswrangler as wr
import pandas as pd
import boto3
import pytz
from datetime import datetime
df1 = pd.DataFrame({
"id": [1, 2],
"name": ["foo", "boo"]
})
df2 = pd.DataFrame({
"id": [3],
"name": ["bar"]
})
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
1. CSV files¶
1.1 Writing CSV files¶
[3]:
path1 = f"s3://{bucket}/csv/file1.csv"
path2 = f"s3://{bucket}/csv/file2.csv"
wr.s3.to_csv(df1, path1, index=False)
wr.s3.to_csv(df2, path2, index=False);
2. JSON files¶
2.1 Writing JSON files¶
[7]:
path1 = f"s3://{bucket}/json/file1.json"
path2 = f"s3://{bucket}/json/file2.json"
wr.s3.to_json(df1, path1)
wr.s3.to_json(df2, path2)
[7]:
['s3://woodadw-test/json/file2.json']
3. Parquet files¶
For more complex features related to Parquet Datasets, check tutorial number 4.
3.1 Writing Parquet files¶
[11]:
path1 = f"s3://{bucket}/parquet/file1.parquet"
path2 = f"s3://{bucket}/parquet/file2.parquet"
wr.s3.to_parquet(df1, path1)
wr.s3.to_parquet(df2, path2);
4. Fixed-width formatted files (only read)¶
As of today, Pandas doesn’t implement a to_fwf functionality, so let’s manually write two files:
[15]:
content = "1 Herfelingen 27-12-18\n"\
"2 Lambusart 14-06-18\n"\
"3 Spormaggiore 15-04-18"
boto3.client("s3").put_object(Body=content, Bucket=bucket, Key="fwf/file1.txt")
content = "4 Buizingen 05-09-19\n"\
"5 San Rafael 04-09-19"
boto3.client("s3").put_object(Body=content, Bucket=bucket, Key="fwf/file2.txt")
path1 = f"s3://{bucket}/fwf/file1.txt"
path2 = f"s3://{bucket}/fwf/file2.txt"
4.1 Reading single FWF file¶
[16]:
wr.s3.read_fwf([path1], names=["id", "name", "date"])
[16]:
id | name | date | |
---|---|---|---|
0 | 1 | Herfelingen | 27-12-18 |
1 | 2 | Lambusart | 14-06-18 |
2 | 3 | Spormaggiore | 15-04-18 |
4.2 Reading multiple FWF files¶
4.2.1 Reading FWF by list¶
[17]:
wr.s3.read_fwf([path1, path2], names=["id", "name", "date"])
[17]:
id | name | date | |
---|---|---|---|
0 | 1 | Herfelingen | 27-12-18 |
1 | 2 | Lambusart | 14-06-18 |
2 | 3 | Spormaggiore | 15-04-18 |
3 | 4 | Buizingen | 05-09-19 |
4 | 5 | San Rafael | 04-09-19 |
4.2.2 Reading FWF by prefix¶
[18]:
wr.s3.read_fwf(f"s3://{bucket}/fwf/", names=["id", "name", "date"])
[18]:
id | name | date | |
---|---|---|---|
0 | 1 | Herfelingen | 27-12-18 |
1 | 2 | Lambusart | 14-06-18 |
2 | 3 | Spormaggiore | 15-04-18 |
3 | 4 | Buizingen | 05-09-19 |
4 | 5 | San Rafael | 04-09-19 |
5. Excel files¶
5.1 Writing Excel file¶
[19]:
path = f"s3://{bucket}/file0.xlsx"
wr.s3.to_excel(df1, path, index=False)
[19]:
's3://woodadw-test/file0.xlsx'
6. Reading with lastModified filter¶
Specify the filter by LastModified Date.
- The filter needs to be specified as a datetime with a time zone.
- Internally the path needs to be listed, and after that the filter is applied.
- The filter compares the S3 content with the variables last_modified_begin and last_modified_end.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html
6.1 Define the Date time with UTC Timezone¶
[21]:
begin = datetime.strptime("20-07-31 20:30", "%y-%m-%d %H:%M")
end = datetime.strptime("21-07-31 20:30", "%y-%m-%d %H:%M")
begin_utc = pytz.utc.localize(begin)
end_utc = pytz.utc.localize(end)
6.2 Define the Date time and specify the Timezone¶
[22]:
begin = datetime.strptime("20-07-31 20:30", "%y-%m-%d %H:%M")
end = datetime.strptime("21-07-31 20:30", "%y-%m-%d %H:%M")
timezone = pytz.timezone("America/Los_Angeles")
begin_Los_Angeles = timezone.localize(begin)
end_Los_Angeles = timezone.localize(end)
6.3 Read json using the LastModified filters¶
[23]:
wr.s3.read_fwf(f"s3://{bucket}/fwf/", names=["id", "name", "date"], last_modified_begin=begin_utc, last_modified_end=end_utc)
wr.s3.read_json(f"s3://{bucket}/json/", last_modified_begin=begin_utc, last_modified_end=end_utc)
wr.s3.read_csv(f"s3://{bucket}/csv/", last_modified_begin=begin_utc, last_modified_end=end_utc)
wr.s3.read_parquet(f"s3://{bucket}/parquet/", last_modified_begin=begin_utc, last_modified_end=end_utc);
7. Download objects¶
Objects can be downloaded from S3 using either a path to a local file or a file-like object in binary mode.
7.1 Download object to a file path¶
[24]:
local_file_dir = getpass.getpass()
[25]:
import os
path1 = f"s3://{bucket}/csv/file1.csv"
local_file = os.path.join(local_file_dir, "file1.csv")
wr.s3.download(path=path1, local_file=local_file)
pd.read_csv(local_file)
[25]:
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
7.2 Download object to a file-like object in binary mode¶
[26]:
path2 = f"s3://{bucket}/csv/file2.csv"
local_file = os.path.join(local_file_dir, "file2.csv")
with open(local_file, mode="wb") as local_f:
wr.s3.download(path=path2, local_file=local_f)
pd.read_csv(local_file)
[26]:
id | name | |
---|---|---|
0 | 3 | bar |
8. Upload objects¶
Objects can be uploaded to S3 using either a path to a local file or a file-like object in binary mode.
8.1 Upload object from a file path¶
[27]:
local_file = os.path.join(local_file_dir, "file1.csv")
wr.s3.upload(local_file=local_file, path=path1)
wr.s3.read_csv(path1)
[27]:
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
8.2 Upload object from a file-like object in binary mode¶
[28]:
local_file = os.path.join(local_file_dir, "file2.csv")
with open(local_file, "rb") as local_f:
wr.s3.upload(local_file=local_f, path=path2)
wr.s3.read_csv(path2)
[28]:
id | name | |
---|---|---|
0 | 3 | bar |
9. Delete objects¶
[29]:
wr.s3.delete_objects(f"s3://{bucket}/")
4 - Parquet Datasets¶
Wrangler has 3 different write modes to store Parquet Datasets on Amazon S3.
append (Default)
Only adds new files without any delete.
overwrite
Deletes everything in the target directory and then adds new files.
overwrite_partitions (Partition Upsert)
Only deletes the paths of the partitions that should be updated and then writes the new partition files. It’s like a “partition Upsert”.
[1]:
from datetime import date
import awswrangler as wr
import pandas as pd
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"
············
Creating the Dataset¶
[3]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
"date": [date(2020, 1, 1), date(2020, 1, 2)]
})
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite"
)
wr.s3.read_parquet(path, dataset=True)
[3]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | boo | 2020-01-02 |
Appending¶
[4]:
df = pd.DataFrame({
"id": [3],
"value": ["bar"],
"date": [date(2020, 1, 3)]
})
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="append"
)
wr.s3.read_parquet(path, dataset=True)
[4]:
id | value | date | |
---|---|---|---|
0 | 3 | bar | 2020-01-03 |
1 | 1 | foo | 2020-01-01 |
2 | 2 | boo | 2020-01-02 |
Overwriting¶
[5]:
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite"
)
wr.s3.read_parquet(path, dataset=True)
[5]:
id | value | date | |
---|---|---|---|
0 | 3 | bar | 2020-01-03 |
Creating a Partitioned Dataset¶
[6]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
"date": [date(2020, 1, 1), date(2020, 1, 2)]
})
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
partition_cols=["date"]
)
wr.s3.read_parquet(path, dataset=True)
[6]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | boo | 2020-01-02 |
Upserting partitions (overwrite_partitions)¶
[7]:
df = pd.DataFrame({
"id": [2, 3],
"value": ["xoo", "bar"],
"date": [date(2020, 1, 2), date(2020, 1, 3)]
})
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite_partitions",
partition_cols=["date"]
)
wr.s3.read_parquet(path, dataset=True)
[7]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | xoo | 2020-01-02 |
2 | 3 | bar | 2020-01-03 |
BONUS - Glue/Athena integration¶
[8]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
"date": [date(2020, 1, 1), date(2020, 1, 2)]
})
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
database="aws_data_wrangler",
table="my_table"
)
wr.athena.read_sql_query("SELECT * FROM my_table", database="aws_data_wrangler")
[8]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | boo | 2020-01-02 |
5 - Glue Catalog¶
Wrangler makes heavy use of Glue Catalog to store metadata of tables and connections.
[1]:
import awswrangler as wr
import pandas as pd
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
············
Creating a Pandas DataFrame¶
[3]:
df = pd.DataFrame({
"id": [1, 2, 3],
"name": ["shoes", "tshirt", "ball"],
"price": [50.3, 10.5, 20.0],
"in_stock": [True, True, False]
})
df
[3]:
id | name | price | in_stock | |
---|---|---|---|---|
0 | 1 | shoes | 50.3 | True |
1 | 2 | tshirt | 10.5 | True |
2 | 3 | ball | 20.0 | False |
Checking Glue Catalog Databases¶
[4]:
databases = wr.catalog.databases()
print(databases)
Database Description
0 aws_data_wrangler AWS Data Wrangler Test Arena - Glue Database
1 default Default Hive database
Create the database awswrangler_test if not exists¶
[5]:
if "awswrangler_test" not in databases.values:
wr.catalog.create_database("awswrangler_test")
print(wr.catalog.databases())
else:
print("Database awswrangler_test already exists")
Database Description
0 aws_data_wrangler AWS Data Wrangler Test Arena - Glue Database
1 awswrangler_test
2 default Default Hive database
Checking the empty database¶
[6]:
wr.catalog.tables(database="awswrangler_test")
[6]:
Database | Table | Description | Columns | Partitions |
---|
Writing DataFrames to Data Lake (S3 + Parquet + Glue Catalog)¶
[7]:
desc = "This is my product table."
param = {
"source": "Product Web Service",
"class": "e-commerce"
}
comments = {
"id": "Unique product ID.",
"name": "Product name",
"price": "Product price (dollar)",
"in_stock": "Is this product availaible in the stock?"
}
res = wr.s3.to_parquet(
df=df,
path=f"s3://{bucket}/products/",
dataset=True,
database="awswrangler_test",
table="products",
mode="overwrite",
description=desc,
parameters=param,
columns_comments=comments
)
Checking Glue Catalog (AWS Console)¶
Looking up the new table!¶
[8]:
wr.catalog.tables(name_contains="roduc")
[8]:
Database | Table | Description | Columns | Partitions | |
---|---|---|---|---|---|
0 | awswrangler_test | products | This is my product table. | id, name, price, in_stock |
[9]:
wr.catalog.tables(name_prefix="pro")
[9]:
Database | Table | Description | Columns | Partitions | |
---|---|---|---|---|---|
0 | awswrangler_test | products | This is my product table. | id, name, price, in_stock |
[10]:
wr.catalog.tables(name_suffix="ts")
[10]:
Database | Table | Description | Columns | Partitions | |
---|---|---|---|---|---|
0 | awswrangler_test | products | This is my product table. | id, name, price, in_stock |
[11]:
wr.catalog.tables(search_text="This is my")
[11]:
Database | Table | Description | Columns | Partitions | |
---|---|---|---|---|---|
0 | awswrangler_test | products | This is my product table. | id, name, price, in_stock |
Getting table details¶
[12]:
wr.catalog.table(database="awswrangler_test", table="products")
[12]:
Column Name | Type | Partition | Comment | |
---|---|---|---|---|
0 | id | bigint | False | Unique product ID. |
1 | name | string | False | Product name |
2 | price | double | False | Product price (dollar) |
3 | in_stock | boolean | False | Is this product available in the stock? |
6 - Amazon Athena¶
Wrangler has two ways to run queries on Athena and fetch the result as a DataFrame:
ctas_approach=True (Default)
Wraps the query with a CTAS and then reads the table data as Parquet directly from S3.
PROS:
- Faster for mid and big result sizes.
- Can handle some level of nested types.
CONS:
- Requires create/delete table permissions on Glue.
- Does not support timestamp with time zone.
- Does not support columns with repeated names.
- Does not support columns with undefined data types.
- A temporary table will be created and then deleted immediately.
- Does not support custom data_source/catalog_id.
ctas_approach=False
Does a regular query on Athena and parses the regular CSV result on S3.
PROS:
- Faster for small result sizes (less latency).
- Does not require create/delete table permissions on Glue.
- Supports timestamp with time zone.
- Supports custom data_source/catalog_id.
CONS:
- Slower (but still faster than other libraries that use the regular Athena API).
- Does not handle nested types at all.
[1]:
import awswrangler as wr
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
···········································
Checking/Creating Glue Catalog Databases¶
[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
wr.catalog.create_database("awswrangler_test")
Creating a Parquet Table from the NOAA’s CSV files¶
[4]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
df = wr.s3.read_csv(
path="s3://noaa-ghcn-pds/csv/189",
names=cols,
parse_dates=["dt", "obs_time"]) # Read 10 files from the 1890 decade (~1GB)
df
[4]:
id | dt | element | value | m_flag | q_flag | s_flag | obs_time | |
---|---|---|---|---|---|---|---|---|
0 | AGE00135039 | 1890-01-01 | TMAX | 160 | NaN | NaN | E | NaN |
1 | AGE00135039 | 1890-01-01 | TMIN | 30 | NaN | NaN | E | NaN |
2 | AGE00135039 | 1890-01-01 | PRCP | 45 | NaN | NaN | E | NaN |
3 | AGE00147705 | 1890-01-01 | TMAX | 140 | NaN | NaN | E | NaN |
4 | AGE00147705 | 1890-01-01 | TMIN | 74 | NaN | NaN | E | NaN |
... | ... | ... | ... | ... | ... | ... | ... | ... |
29240014 | UZM00038457 | 1899-12-31 | PRCP | 16 | NaN | NaN | r | NaN |
29240015 | UZM00038457 | 1899-12-31 | TAVG | -73 | NaN | NaN | r | NaN |
29240016 | UZM00038618 | 1899-12-31 | TMIN | -76 | NaN | NaN | r | NaN |
29240017 | UZM00038618 | 1899-12-31 | PRCP | 0 | NaN | NaN | r | NaN |
29240018 | UZM00038618 | 1899-12-31 | TAVG | -60 | NaN | NaN | r | NaN |
29240019 rows × 8 columns
[5]:
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
database="awswrangler_test",
table="noaa"
);
[6]:
wr.catalog.table(database="awswrangler_test", table="noaa")
[6]:
Column Name | Type | Partition | Comment | |
---|---|---|---|---|
0 | id | string | False | |
1 | dt | timestamp | False | |
2 | element | string | False | |
3 | value | bigint | False | |
4 | m_flag | string | False | |
5 | q_flag | string | False | |
6 | s_flag | string | False | |
7 | obs_time | string | False |
Reading with ctas_approach=False¶
[7]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False)
CPU times: user 8min 45s, sys: 6.52 s, total: 8min 51s
Wall time: 11min 3s
[7]:
id | dt | element | value | m_flag | q_flag | s_flag | obs_time | |
---|---|---|---|---|---|---|---|---|
0 | AGE00135039 | 1890-01-01 | TMAX | 160 | <NA> | <NA> | E | <NA> |
1 | AGE00135039 | 1890-01-01 | TMIN | 30 | <NA> | <NA> | E | <NA> |
2 | AGE00135039 | 1890-01-01 | PRCP | 45 | <NA> | <NA> | E | <NA> |
3 | AGE00147705 | 1890-01-01 | TMAX | 140 | <NA> | <NA> | E | <NA> |
4 | AGE00147705 | 1890-01-01 | TMIN | 74 | <NA> | <NA> | E | <NA> |
... | ... | ... | ... | ... | ... | ... | ... | ... |
29240014 | UZM00038457 | 1899-12-31 | PRCP | 16 | <NA> | <NA> | r | <NA> |
29240015 | UZM00038457 | 1899-12-31 | TAVG | -73 | <NA> | <NA> | r | <NA> |
29240016 | UZM00038618 | 1899-12-31 | TMIN | -76 | <NA> | <NA> | r | <NA> |
29240017 | UZM00038618 | 1899-12-31 | PRCP | 0 | <NA> | <NA> | r | <NA> |
29240018 | UZM00038618 | 1899-12-31 | TAVG | -60 | <NA> | <NA> | r | <NA> |
29240019 rows × 8 columns
Default with ctas_approach=True - 13x faster (default)¶
[8]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")
CPU times: user 28 s, sys: 6.07 s, total: 34.1 s
Wall time: 50.5 s
[8]:
id | dt | element | value | m_flag | q_flag | s_flag | obs_time | |
---|---|---|---|---|---|---|---|---|
0 | ASN00017088 | 1890-06-11 | PRCP | 0 | <NA> | <NA> | a | <NA> |
1 | ASN00017087 | 1890-06-11 | PRCP | 0 | <NA> | <NA> | a | <NA> |
2 | ASN00017089 | 1890-06-11 | PRCP | 71 | <NA> | <NA> | a | <NA> |
3 | ASN00017095 | 1890-06-11 | PRCP | 0 | <NA> | <NA> | a | <NA> |
4 | ASN00017094 | 1890-06-11 | PRCP | 0 | <NA> | <NA> | a | <NA> |
... | ... | ... | ... | ... | ... | ... | ... | ... |
29240014 | USC00461260 | 1899-12-31 | SNOW | 0 | <NA> | <NA> | 6 | <NA> |
29240015 | USC00461515 | 1899-12-31 | TMAX | -89 | <NA> | <NA> | 6 | <NA> |
29240016 | USC00461515 | 1899-12-31 | TMIN | -189 | <NA> | <NA> | 6 | <NA> |
29240017 | USC00461515 | 1899-12-31 | PRCP | 0 | <NA> | <NA> | 6 | <NA> |
29240018 | USC00461515 | 1899-12-31 | SNOW | 0 | <NA> | <NA> | 6 | <NA> |
29240019 rows × 8 columns
Using categories to speed up and save memory - 24x faster¶
[9]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", categories=["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"])
CPU times: user 6.89 s, sys: 2.27 s, total: 9.16 s
Wall time: 27.3 s
[9]:
id | dt | element | value | m_flag | q_flag | s_flag | obs_time | |
---|---|---|---|---|---|---|---|---|
0 | GME00102348 | 1890-08-03 | TMAX | 172 | NaN | NaN | E | NaN |
1 | GME00102348 | 1890-08-03 | TMIN | 117 | NaN | NaN | E | NaN |
2 | GME00102348 | 1890-08-03 | PRCP | 63 | NaN | NaN | E | NaN |
3 | GME00102348 | 1890-08-03 | SNWD | 0 | NaN | NaN | E | NaN |
4 | GME00121126 | 1890-08-03 | PRCP | 32 | NaN | NaN | E | NaN |
... | ... | ... | ... | ... | ... | ... | ... | ... |
29240014 | USC00461260 | 1899-12-31 | SNOW | 0 | NaN | NaN | 6 | NaN |
29240015 | USC00461515 | 1899-12-31 | TMAX | -89 | NaN | NaN | 6 | NaN |
29240016 | USC00461515 | 1899-12-31 | TMIN | -189 | NaN | NaN | 6 | NaN |
29240017 | USC00461515 | 1899-12-31 | PRCP | 0 | NaN | NaN | 6 | NaN |
29240018 | USC00461515 | 1899-12-31 | SNOW | 0 | NaN | NaN | 6 | NaN |
29240019 rows × 8 columns
Batching (Good for restricted memory environments)¶
[10]:
%%time
dfs = wr.athena.read_sql_query(
"SELECT * FROM noaa",
database="awswrangler_test",
chunksize=True # Chunksize calculated automatically for ctas_approach.
)
for df in dfs:  # Batching
    print(len(df.index))
1024
8086528
1024
1024
1024
1024
1024
15360
1024
10090496
2153472
8886995
CPU times: user 22.7 s, sys: 5.41 s, total: 28.1 s
Wall time: 48 s
[11]:
%%time
dfs = wr.athena.read_sql_query(
"SELECT * FROM noaa",
database="awswrangler_test",
chunksize=100_000_000
)
for df in dfs:  # Batching
    print(len(df.index))
29240019
CPU times: user 34.8 s, sys: 8.54 s, total: 43.4 s
Wall time: 1min 1s
Cleaning Up S3¶
[12]:
wr.s3.delete_objects(path)
Delete table¶
[13]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="noaa");
Delete Database¶
[14]:
wr.catalog.delete_database('awswrangler_test')
7 - Redshift, MySQL, PostgreSQL and SQL Server¶
Wrangler’s Redshift, MySQL, PostgreSQL and SQL Server modules have two basic functions in common that try to follow the Pandas conventions, but add more data type consistency.
[1]:
import awswrangler as wr
import pandas as pd
df = pd.DataFrame({
"id": [1, 2],
"name": ["foo", "boo"]
})
Connect using the Glue Catalog Connections¶
[2]:
con_redshift = wr.redshift.connect("aws-data-wrangler-redshift")
con_mysql = wr.mysql.connect("aws-data-wrangler-mysql")
con_postgresql = wr.postgresql.connect("aws-data-wrangler-postgresql")
con_sqlserver = wr.sqlserver.connect("aws-data-wrangler-sqlserver")
Raw SQL queries (No Pandas)¶
[3]:
with con_redshift.cursor() as cursor:
    for row in cursor.execute("SELECT 1"):
        print(row)
[1]
Loading data to Database¶
[4]:
wr.redshift.to_sql(df, con_redshift, schema="public", table="tutorial", mode="overwrite")
wr.mysql.to_sql(df, con_mysql, schema="test", table="tutorial", mode="overwrite")
wr.postgresql.to_sql(df, con_postgresql, schema="public", table="tutorial", mode="overwrite")
wr.sqlserver.to_sql(df, con_sqlserver, schema="dbo", table="tutorial", mode="overwrite")
Unloading data from Database¶
[5]:
wr.redshift.read_sql_query("SELECT * FROM public.tutorial", con=con_redshift)
wr.mysql.read_sql_query("SELECT * FROM test.tutorial", con=con_mysql)
wr.postgresql.read_sql_query("SELECT * FROM public.tutorial", con=con_postgresql)
wr.sqlserver.read_sql_query("SELECT * FROM dbo.tutorial", con=con_sqlserver)
[5]:
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
[6]:
con_redshift.close()
con_mysql.close()
con_postgresql.close()
con_sqlserver.close()
8 - Redshift - COPY & UNLOAD¶
Amazon Redshift has two SQL commands that help to load and unload large amounts of data, staging it on Amazon S3:
1 - COPY
2 - UNLOAD
Let’s take a look at how Wrangler can use them.
[1]:
import awswrangler as wr
con = wr.redshift.connect("aws-data-wrangler-redshift")
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/stage/"
···········································
Enter your IAM ROLE ARN:¶
[3]:
iam_role = getpass.getpass()
····················································································
Creating a Dataframe from the NOAA’s CSV files¶
[4]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
df = wr.s3.read_csv(
path="s3://noaa-ghcn-pds/csv/1897.csv",
names=cols,
parse_dates=["dt", "obs_time"]) # ~127MB, ~4MM rows
df
[4]:
id | dt | element | value | m_flag | q_flag | s_flag | obs_time | |
---|---|---|---|---|---|---|---|---|
0 | AG000060590 | 1897-01-01 | TMAX | 170 | NaN | NaN | E | NaN |
1 | AG000060590 | 1897-01-01 | TMIN | -14 | NaN | NaN | E | NaN |
2 | AG000060590 | 1897-01-01 | PRCP | 0 | NaN | NaN | E | NaN |
3 | AGE00135039 | 1897-01-01 | TMAX | 140 | NaN | NaN | E | NaN |
4 | AGE00135039 | 1897-01-01 | TMIN | 40 | NaN | NaN | E | NaN |
... | ... | ... | ... | ... | ... | ... | ... | ... |
3923594 | UZM00038457 | 1897-12-31 | TMIN | -145 | NaN | NaN | r | NaN |
3923595 | UZM00038457 | 1897-12-31 | PRCP | 4 | NaN | NaN | r | NaN |
3923596 | UZM00038457 | 1897-12-31 | TAVG | -95 | NaN | NaN | r | NaN |
3923597 | UZM00038618 | 1897-12-31 | PRCP | 66 | NaN | NaN | r | NaN |
3923598 | UZM00038618 | 1897-12-31 | TAVG | -45 | NaN | NaN | r | NaN |
3923599 rows × 8 columns
Load and Unload with COPY and UNLOAD commands¶
Note: Please use an empty S3 path for the COPY command.
[5]:
%%time
wr.redshift.copy(
df=df,
path=path,
con=con,
schema="public",
table="commands",
mode="overwrite",
iam_role=iam_role,
)
CPU times: user 2.78 s, sys: 293 ms, total: 3.08 s
Wall time: 20.7 s
[6]:
%%time
wr.redshift.unload(
sql="SELECT * FROM public.commands",
con=con,
iam_role=iam_role,
path=path,
keep_files=True,
)
CPU times: user 10 s, sys: 1.14 s, total: 11.2 s
Wall time: 27.5 s
[6]:
id | dt | element | value | m_flag | q_flag | s_flag | obs_time | |
---|---|---|---|---|---|---|---|---|
0 | AG000060590 | 1897-01-01 | TMAX | 170 | <NA> | <NA> | E | <NA> |
1 | AG000060590 | 1897-01-01 | PRCP | 0 | <NA> | <NA> | E | <NA> |
2 | AGE00135039 | 1897-01-01 | TMIN | 40 | <NA> | <NA> | E | <NA> |
3 | AGE00147705 | 1897-01-01 | TMAX | 164 | <NA> | <NA> | E | <NA> |
4 | AGE00147705 | 1897-01-01 | PRCP | 0 | <NA> | <NA> | E | <NA> |
... | ... | ... | ... | ... | ... | ... | ... | ... |
3923594 | USW00094967 | 1897-12-31 | TMAX | -144 | <NA> | <NA> | 6 | <NA> |
3923595 | USW00094967 | 1897-12-31 | PRCP | 0 | P | <NA> | 6 | <NA> |
3923596 | UZM00038457 | 1897-12-31 | TMAX | -49 | <NA> | <NA> | r | <NA> |
3923597 | UZM00038457 | 1897-12-31 | PRCP | 4 | <NA> | <NA> | r | <NA> |
3923598 | UZM00038618 | 1897-12-31 | PRCP | 66 | <NA> | <NA> | r | <NA> |
7847198 rows × 8 columns
[7]:
con.close()
9 - Redshift - Append, Overwrite and Upsert¶
Wrangler’s copy/to_sql function has three different mode options for Redshift.
1 - append
2 - overwrite
3 - upsert
[2]:
import awswrangler as wr
import pandas as pd
from datetime import date
con = wr.redshift.connect("aws-data-wrangler-redshift")
Enter your bucket name:¶
[3]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/stage/"
···········································
Enter your IAM ROLE ARN:¶
[4]:
iam_role = getpass.getpass()
····················································································
Creating the table (Overwriting if it exists)¶
[10]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
"date": [date(2020, 1, 1), date(2020, 1, 2)]
})
wr.redshift.copy(
df=df,
path=path,
con=con,
schema="public",
table="my_table",
mode="overwrite",
iam_role=iam_role,
primary_keys=["id"]
)
wr.redshift.read_sql_table(table="my_table", schema="public", con=con)
[10]:
id | value | date | |
---|---|---|---|
0 | 2 | boo | 2020-01-02 |
1 | 1 | foo | 2020-01-01 |
Appending¶
[11]:
df = pd.DataFrame({
"id": [3],
"value": ["bar"],
"date": [date(2020, 1, 3)]
})
wr.redshift.copy(
df=df,
path=path,
con=con,
schema="public",
table="my_table",
mode="append",
iam_role=iam_role,
primary_keys=["id"]
)
wr.redshift.read_sql_table(table="my_table", schema="public", con=con)
[11]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | boo | 2020-01-02 |
2 | 3 | bar | 2020-01-03 |
Upserting¶
[12]:
df = pd.DataFrame({
"id": [2, 3],
"value": ["xoo", "bar"],
"date": [date(2020, 1, 2), date(2020, 1, 3)]
})
wr.redshift.copy(
df=df,
path=path,
con=con,
schema="public",
table="my_table",
mode="upsert",
iam_role=iam_role,
primary_keys=["id"]
)
wr.redshift.read_sql_table(table="my_table", schema="public", con=con)
[12]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | xoo | 2020-01-02 |
2 | 3 | bar | 2020-01-03 |
Cleaning Up¶
[13]:
with con.cursor() as cursor:
    cursor.execute("DROP TABLE public.my_table")
con.close()
10 - Parquet Crawler¶
Wrangler can extract only the metadata from Parquet files and Partitions and then add it to the Glue Catalog.
[1]:
import awswrangler as wr
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
············
Creating a Parquet Table from the NOAA’s CSV files¶
[3]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
df = wr.s3.read_csv(
path="s3://noaa-ghcn-pds/csv/189",
names=cols,
parse_dates=["dt", "obs_time"]) # Read 10 files from the 1890 decade (~1GB)
df
[3]:
id | dt | element | value | m_flag | q_flag | s_flag | obs_time | |
---|---|---|---|---|---|---|---|---|
0 | AGE00135039 | 1890-01-01 | TMAX | 160 | NaN | NaN | E | NaN |
1 | AGE00135039 | 1890-01-01 | TMIN | 30 | NaN | NaN | E | NaN |
2 | AGE00135039 | 1890-01-01 | PRCP | 45 | NaN | NaN | E | NaN |
3 | AGE00147705 | 1890-01-01 | TMAX | 140 | NaN | NaN | E | NaN |
4 | AGE00147705 | 1890-01-01 | TMIN | 74 | NaN | NaN | E | NaN |
... | ... | ... | ... | ... | ... | ... | ... | ... |
29249753 | UZM00038457 | 1899-12-31 | PRCP | 16 | NaN | NaN | r | NaN |
29249754 | UZM00038457 | 1899-12-31 | TAVG | -73 | NaN | NaN | r | NaN |
29249755 | UZM00038618 | 1899-12-31 | TMIN | -76 | NaN | NaN | r | NaN |
29249756 | UZM00038618 | 1899-12-31 | PRCP | 0 | NaN | NaN | r | NaN |
29249757 | UZM00038618 | 1899-12-31 | TAVG | -60 | NaN | NaN | r | NaN |
29249758 rows × 8 columns
[4]:
df["year"] = df["dt"].dt.year
df.head(3)
[4]:
id | dt | element | value | m_flag | q_flag | s_flag | obs_time | year | |
---|---|---|---|---|---|---|---|---|---|
0 | AGE00135039 | 1890-01-01 | TMAX | 160 | NaN | NaN | E | NaN | 1890 |
1 | AGE00135039 | 1890-01-01 | TMIN | 30 | NaN | NaN | E | NaN | 1890 |
2 | AGE00135039 | 1890-01-01 | PRCP | 45 | NaN | NaN | E | NaN | 1890 |
[5]:
res = wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
partition_cols=["year"],
)
[6]:
[ x.split("data/", 1)[1] for x in wr.s3.list_objects(path)]
[6]:
['year=1890/06a519afcf8e48c9b08c8908f30adcfe.snappy.parquet',
'year=1891/5a99c28dbef54008bfc770c946099e02.snappy.parquet',
'year=1892/9b1ea5d1cfad40f78c920f93540ca8ec.snappy.parquet',
'year=1893/92259b49c134401eaf772506ee802af6.snappy.parquet',
'year=1894/c734469ffff944f69dc277c630064a16.snappy.parquet',
'year=1895/cf7ccde86aaf4d138f86c379c0817aa6.snappy.parquet',
'year=1896/ce02f4c2c554438786b766b33db451b6.snappy.parquet',
'year=1897/e04de04ad3c444deadcc9c410ab97ca1.snappy.parquet',
'year=1898/acb0e02878f04b56a6200f4b5a97be0e.snappy.parquet',
'year=1899/a269bdbb0f6a48faac55f3bcfef7df7a.snappy.parquet']
Crawling!¶
[7]:
%%time
res = wr.s3.store_parquet_metadata(
path=path,
database="awswrangler_test",
table="crawler",
dataset=True,
mode="overwrite",
dtype={"year": "int"}
)
CPU times: user 1.81 s, sys: 528 ms, total: 2.33 s
Wall time: 3.21 s
Checking¶
[8]:
wr.catalog.table(database="awswrangler_test", table="crawler")
[8]:
Column Name | Type | Partition | Comment | |
---|---|---|---|---|
0 | id | string | False | |
1 | dt | timestamp | False | |
2 | element | string | False | |
3 | value | bigint | False | |
4 | m_flag | string | False | |
5 | q_flag | string | False | |
6 | s_flag | string | False | |
7 | obs_time | string | False | |
8 | year | int | True |
[9]:
%%time
wr.athena.read_sql_query("SELECT * FROM crawler WHERE year=1890", database="awswrangler_test")
CPU times: user 3.52 s, sys: 811 ms, total: 4.33 s
Wall time: 9.6 s
[9]:
id | dt | element | value | m_flag | q_flag | s_flag | obs_time | year | |
---|---|---|---|---|---|---|---|---|---|
0 | USC00195145 | 1890-01-01 | TMIN | -28 | <NA> | <NA> | 6 | <NA> | 1890 |
1 | USC00196770 | 1890-01-01 | PRCP | 0 | P | <NA> | 6 | <NA> | 1890 |
2 | USC00196770 | 1890-01-01 | SNOW | 0 | <NA> | <NA> | 6 | <NA> | 1890 |
3 | USC00196915 | 1890-01-01 | PRCP | 0 | P | <NA> | 6 | <NA> | 1890 |
4 | USC00196915 | 1890-01-01 | SNOW | 0 | <NA> | <NA> | 6 | <NA> | 1890 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
6139 | ASN00022006 | 1890-12-03 | PRCP | 0 | <NA> | <NA> | a | <NA> | 1890 |
6140 | ASN00022007 | 1890-12-03 | PRCP | 0 | <NA> | <NA> | a | <NA> | 1890 |
6141 | ASN00022008 | 1890-12-03 | PRCP | 0 | <NA> | <NA> | a | <NA> | 1890 |
6142 | ASN00022009 | 1890-12-03 | PRCP | 0 | <NA> | <NA> | a | <NA> | 1890 |
6143 | ASN00022011 | 1890-12-03 | PRCP | 0 | <NA> | <NA> | a | <NA> | 1890 |
1276246 rows × 9 columns
Cleaning Up S3¶
[10]:
wr.s3.delete_objects(path)
Cleaning Up the Database¶
[11]:
for table in wr.catalog.get_tables(database="awswrangler_test"):
    wr.catalog.delete_table_if_exists(database="awswrangler_test", table=table["Name"])
11 - CSV Datasets¶
Wrangler has 3 different write modes to store CSV Datasets on Amazon S3.
append (Default)
Only adds new files without any delete.
overwrite
Deletes everything in the target directory and then adds new files.
overwrite_partitions (Partition Upsert)
Only deletes the paths of the partitions that should be updated and then writes the new partition files. It’s like a “partition Upsert”.
[1]:
from datetime import date
import awswrangler as wr
import pandas as pd
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"
············
Checking/Creating Glue Catalog Databases¶
[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
wr.catalog.create_database("awswrangler_test")
Creating the Dataset¶
[4]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
"date": [date(2020, 1, 1), date(2020, 1, 2)]
})
wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
mode="overwrite",
database="awswrangler_test",
table="csv_dataset"
)
wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[4]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | boo | 2020-01-02 |
Appending¶
[5]:
df = pd.DataFrame({
"id": [3],
"value": ["bar"],
"date": [date(2020, 1, 3)]
})
wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
mode="append",
database="awswrangler_test",
table="csv_dataset"
)
wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[5]:
id | value | date | |
---|---|---|---|
0 | 3 | bar | 2020-01-03 |
1 | 1 | foo | 2020-01-01 |
2 | 2 | boo | 2020-01-02 |
Overwriting¶
[6]:
wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
mode="overwrite",
database="awswrangler_test",
table="csv_dataset"
)
wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[6]:
id | value | date | |
---|---|---|---|
0 | 3 | bar | 2020-01-03 |
Creating a Partitioned Dataset¶
[7]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
"date": [date(2020, 1, 1), date(2020, 1, 2)]
})
wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
mode="overwrite",
database="awswrangler_test",
table="csv_dataset",
partition_cols=["date"]
)
wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[7]:
id | value | date | |
---|---|---|---|
0 | 2 | boo | 2020-01-02 |
1 | 1 | foo | 2020-01-01 |
Upserting partitions (overwrite_partitions)¶
[8]:
df = pd.DataFrame({
"id": [2, 3],
"value": ["xoo", "bar"],
"date": [date(2020, 1, 2), date(2020, 1, 3)]
})
wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
mode="overwrite_partitions",
database="awswrangler_test",
table="csv_dataset",
partition_cols=["date"]
)
wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[8]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | xoo | 2020-01-02 |
0 | 3 | bar | 2020-01-03 |
BONUS - Glue/Athena integration¶
[9]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
"date": [date(2020, 1, 1), date(2020, 1, 2)]
})
wr.s3.to_csv(
df=df,
path=path,
dataset=True,
index=False,
mode="overwrite",
database="aws_data_wrangler",
table="my_table",
compression="gzip"
)
wr.athena.read_sql_query("SELECT * FROM my_table", database="aws_data_wrangler")
[9]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | boo | 2020-01-02 |
12 - CSV Crawler¶
Wrangler can extract only the metadata from a Pandas DataFrame and then add it to the Glue Catalog as a table.
[1]:
import awswrangler as wr
from datetime import datetime
import pandas as pd
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/csv_crawler/"
············
Creating a Pandas DataFrame¶
[3]:
ts = lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f") # noqa
dt = lambda x: datetime.strptime(x, "%Y-%m-%d").date() # noqa
df = pd.DataFrame(
{
"id": [1, 2, 3],
"string": ["foo", None, "boo"],
"float": [1.0, None, 2.0],
"date": [dt("2020-01-01"), None, dt("2020-01-02")],
"timestamp": [ts("2020-01-01 00:00:00.0"), None, ts("2020-01-02 00:00:01.0")],
"bool": [True, None, False],
"par0": [1, 1, 2],
"par1": ["a", "b", "b"],
}
)
df
[3]:
id | string | float | date | timestamp | bool | par0 | par1 | |
---|---|---|---|---|---|---|---|---|
0 | 1 | foo | 1.0 | 2020-01-01 | 2020-01-01 00:00:00 | True | 1 | a |
1 | 2 | None | NaN | None | NaT | None | 1 | b |
2 | 3 | boo | 2.0 | 2020-01-02 | 2020-01-02 00:00:01 | False | 2 | b |
Extracting the metadata¶
[4]:
columns_types, partitions_types = wr.catalog.extract_athena_types(
df=df,
file_format="csv",
index=False,
partition_cols=["par0", "par1"]
)
[5]:
columns_types
[5]:
{'id': 'bigint',
'string': 'string',
'float': 'double',
'date': 'date',
'timestamp': 'timestamp',
'bool': 'boolean'}
[6]:
partitions_types
[6]:
{'par0': 'bigint', 'par1': 'string'}
Creating the table¶
[7]:
wr.catalog.create_csv_table(
table="csv_crawler",
database="awswrangler_test",
path=path,
partitions_types=partitions_types,
columns_types=columns_types,
)
Checking¶
[8]:
wr.catalog.table(database="awswrangler_test", table="csv_crawler")
[8]:
Column Name | Type | Partition | Comment | |
---|---|---|---|---|
0 | id | bigint | False | |
1 | string | string | False | |
2 | float | double | False | |
3 | date | date | False | |
4 | timestamp | timestamp | False | |
5 | bool | boolean | False | |
6 | par0 | bigint | True | |
7 | par1 | string | True |
We can still use the extracted metadata to ensure data type consistency for new data¶
[9]:
df = pd.DataFrame(
{
"id": [1],
"string": ["1"],
"float": [1],
"date": [ts("2020-01-01 00:00:00.0")],
"timestamp": [dt("2020-01-02")],
"bool": [1],
"par0": [1],
"par1": ["a"],
}
)
df
[9]:
id | string | float | date | timestamp | bool | par0 | par1 | |
---|---|---|---|---|---|---|---|---|
0 | 1 | 1 | 1 | 2020-01-01 | 2020-01-02 | 1 | 1 | a |
[10]:
res = wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
database="awswrangler_test",
table="csv_crawler",
partition_cols=["par0", "par1"],
dtype=columns_types
)
You can also extract the metadata directly from the Catalog if you want¶
[11]:
dtype = wr.catalog.get_table_types(database="awswrangler_test", table="csv_crawler")
[12]:
res = wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
database="awswrangler_test",
table="csv_crawler",
partition_cols=["par0", "par1"],
dtype=dtype
)
Checking out¶
[13]:
df = wr.athena.read_sql_table(database="awswrangler_test", table="csv_crawler")
df
[13]:
id | string | float | date | timestamp | bool | par0 | par1 | |
---|---|---|---|---|---|---|---|---|
0 | 1 | 1 | 1.0 | None | 2020-01-02 | True | 1 | a |
1 | 1 | 1 | 1.0 | None | 2020-01-02 | True | 1 | a |
[14]:
df.dtypes
[14]:
id Int64
string string
float float64
date object
timestamp datetime64[ns]
bool boolean
par0 Int64
par1 string
dtype: object
Cleaning Up S3¶
[15]:
wr.s3.delete_objects(path)
Cleaning Up the Database¶
[16]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="csv_crawler")
[16]:
True
13 - Merging Datasets on S3¶
Wrangler has 3 different copy modes to store Parquet Datasets on Amazon S3.
append (Default)
Only adds new files without any delete.
overwrite
Deletes everything in the target directory and then adds new files.
overwrite_partitions (Partition Upsert)
Only deletes the paths of the partitions that should be updated and then writes the new partition files. It’s like a “partition Upsert”.
[1]:
from datetime import date
import awswrangler as wr
import pandas as pd
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path1 = f"s3://{bucket}/dataset1/"
path2 = f"s3://{bucket}/dataset2/"
············
Creating Dataset 1¶
[3]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
"date": [date(2020, 1, 1), date(2020, 1, 2)]
})
wr.s3.to_parquet(
df=df,
path=path1,
dataset=True,
mode="overwrite",
partition_cols=["date"]
)
wr.s3.read_parquet(path1, dataset=True)
[3]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | boo | 2020-01-02 |
Creating Dataset 2¶
[4]:
df = pd.DataFrame({
"id": [2, 3],
"value": ["xoo", "bar"],
"date": [date(2020, 1, 2), date(2020, 1, 3)]
})
dataset2_files = wr.s3.to_parquet(
df=df,
path=path2,
dataset=True,
mode="overwrite",
partition_cols=["date"]
)["paths"]
wr.s3.read_parquet(path2, dataset=True)
[4]:
id | value | date | |
---|---|---|---|
0 | 2 | xoo | 2020-01-02 |
1 | 3 | bar | 2020-01-03 |
Merging (Dataset 2 -> Dataset 1) (APPEND)¶
[5]:
wr.s3.merge_datasets(
source_path=path2,
target_path=path1,
mode="append"
)
wr.s3.read_parquet(path1, dataset=True)
[5]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | xoo | 2020-01-02 |
2 | 2 | boo | 2020-01-02 |
3 | 3 | bar | 2020-01-03 |
Merging (Dataset 2 -> Dataset 1) (OVERWRITE_PARTITIONS)¶
[6]:
wr.s3.merge_datasets(
source_path=path2,
target_path=path1,
mode="overwrite_partitions"
)
wr.s3.read_parquet(path1, dataset=True)
[6]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | xoo | 2020-01-02 |
2 | 3 | bar | 2020-01-03 |
Merging (Dataset 2 -> Dataset 1) (OVERWRITE)¶
[7]:
wr.s3.merge_datasets(
source_path=path2,
target_path=path1,
mode="overwrite"
)
wr.s3.read_parquet(path1, dataset=True)
[7]:
id | value | date | |
---|---|---|---|
0 | 2 | xoo | 2020-01-02 |
1 | 3 | bar | 2020-01-03 |
Cleaning Up¶
[8]:
wr.s3.delete_objects(path1)
wr.s3.delete_objects(path2)
14 - Schema Evolution¶
Wrangler supports new columns on Parquet Datasets through:
- wr.s3.to_parquet()
- wr.s3.store_parquet_metadata() i.e. “Crawler”
[1]:
from datetime import date
import awswrangler as wr
import pandas as pd
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"
···········································
Creating the Dataset¶
[3]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
})
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
database="aws_data_wrangler",
table="my_table"
)
wr.s3.read_parquet(path, dataset=True)
[3]:
id | value | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
Schema Version 0 on Glue Catalog (AWS Console)¶
Appending with NEW COLUMNS¶
[4]:
df = pd.DataFrame({
"id": [3, 4],
"value": ["bar", None],
"date": [date(2020, 1, 3), date(2020, 1, 4)],
"flag": [True, False]
})
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="append",
database="aws_data_wrangler",
table="my_table",
catalog_versioning=True # Optional
)
wr.s3.read_parquet(path, dataset=True, validate_schema=False)
[4]:
id | value | date | flag | |
---|---|---|---|---|
0 | 3 | bar | 2020-01-03 | True |
1 | 4 | None | 2020-01-04 | False |
2 | 1 | foo | NaN | NaN |
3 | 2 | boo | NaN | NaN |
Schema Version 1 on Glue Catalog (AWS Console)¶
Reading from Athena¶
[5]:
wr.athena.read_sql_table(table="my_table", database="aws_data_wrangler")
[5]:
id | value | date | flag | |
---|---|---|---|---|
0 | 3 | bar | 2020-01-03 | True |
1 | 4 | None | 2020-01-04 | False |
2 | 1 | foo | None | <NA> |
3 | 2 | boo | None | <NA> |
Cleaning Up¶
[6]:
wr.s3.delete_objects(path)
wr.catalog.delete_table_if_exists(table="my_table", database="aws_data_wrangler")
[6]:
True
15 - EMR¶
[1]:
import awswrangler as wr
import boto3
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
··········································
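Enter your Subnet ID:¶
The cluster creation below uses a subnet variable; this extra cell is a minimal sketch, assuming the Subnet ID is entered interactively like the bucket name above.
[ ]:
# Hypothetical cell: prompt for the Subnet ID passed to wr.emr.create_cluster() below.
subnet = getpass.getpass()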
Creating EMR Cluster¶
[9]:
cluster_id = wr.emr.create_cluster(subnet)
Uploading our PySpark script to Amazon S3¶
[10]:
script = """
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("docker-awswrangler").getOrCreate()
sc = spark.sparkContext
print("Spark Initialized")
"""
_ = boto3.client("s3").put_object(
Body=script,
Bucket=bucket,
Key="test.py"
)
Submit PySpark step¶
[11]:
step_id = wr.emr.submit_step(cluster_id, command=f"spark-submit s3://{bucket}/test.py")
Wait Step¶
[12]:
while wr.emr.get_step_state(cluster_id, step_id) != "COMPLETED":
    pass
16 - EMR & Docker¶
[ ]:
import awswrangler as wr
import boto3
import getpass
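Enter your bucket name and Subnet ID:¶
The cells below reference bucket and subnet variables; this extra cell is a minimal sketch, assuming both are entered interactively as in the previous tutorial.
[ ]:
# Hypothetical cell: prompt for the S3 bucket and the Subnet ID used later in this tutorial.
bucket = getpass.getpass()
subnet = getpass.getpass()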
Build and Upload Docker Image to ECR repository¶
Replace the {ACCOUNT_ID}
placeholder.
[ ]:
%%writefile Dockerfile
FROM amazoncorretto:8
RUN yum -y update
RUN yum -y install yum-utils
RUN yum -y groupinstall development
RUN yum list python3*
RUN yum -y install python3 python3-dev python3-pip python3-virtualenv
RUN python -V
RUN python3 -V
ENV PYSPARK_DRIVER_PYTHON python3
ENV PYSPARK_PYTHON python3
RUN pip3 install --upgrade pip
RUN pip3 install awswrangler
RUN python3 -c "import awswrangler as wr"
[ ]:
%%bash
docker build -t 'local/emr-wrangler' .
aws ecr create-repository --repository-name emr-wrangler
docker tag local/emr-wrangler {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler
eval $(aws ecr get-login --region us-east-1 --no-include-email)
docker push {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler
Creating EMR Cluster¶
[4]:
cluster_id = wr.emr.create_cluster(subnet, docker=True)
Refresh ECR credentials in the cluster (expiration time: 12h)¶
[5]:
wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f"s3://{bucket}/")
[5]:
's-1B0O45RWJL8CL'
Uploading application script to Amazon S3 (PySpark)¶
[7]:
script = """
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("docker-awswrangler").getOrCreate()
sc = spark.sparkContext
print("Spark Initialized")
import awswrangler as wr
print(f"Wrangler version: {wr.__version__}")
"""
boto3.client("s3").put_object(Body=script, Bucket=bucket, Key="test_docker.py");
Submit PySpark step¶
[8]:
DOCKER_IMAGE = f"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler"
step_id = wr.emr.submit_spark_step(
cluster_id,
f"s3://{bucket}/test_docker.py",
docker_image=DOCKER_IMAGE
)
Wait Step¶
[ ]:
while wr.emr.get_step_state(cluster_id, step_id) != "COMPLETED":
    pass
Terminate Cluster¶
[ ]:
wr.emr.terminate_cluster(cluster_id)
Another example with custom configurations¶
[9]:
cluster_id = wr.emr.create_cluster(
cluster_name="my-demo-cluster-v2",
logging_s3_path=f"s3://{bucket}/emr-logs/",
emr_release="emr-6.0.0",
subnet_id=subnet,
emr_ec2_role="EMR_EC2_DefaultRole",
emr_role="EMR_DefaultRole",
instance_type_master="m5.2xlarge",
instance_type_core="m5.2xlarge",
instance_ebs_size_master=50,
instance_ebs_size_core=50,
instance_num_on_demand_master=0,
instance_num_on_demand_core=0,
instance_num_spot_master=1,
instance_num_spot_core=2,
spot_bid_percentage_of_on_demand_master=100,
spot_bid_percentage_of_on_demand_core=100,
spot_provisioning_timeout_master=5,
spot_provisioning_timeout_core=5,
spot_timeout_to_on_demand_master=False,
spot_timeout_to_on_demand_core=False,
python3=True,
docker=True,
spark_glue_catalog=True,
hive_glue_catalog=True,
presto_glue_catalog=True,
debugging=True,
applications=["Hadoop", "Spark", "Hive", "Zeppelin", "Livy"],
visible_to_all_users=True,
maximize_resource_allocation=True,
keep_cluster_alive_when_no_steps=True,
termination_protected=False,
spark_pyarrow=True
)
wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f"s3://{bucket}/emr/")
DOCKER_IMAGE = f"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler"
step_id = wr.emr.submit_spark_step(
cluster_id,
f"s3://{bucket}/test_docker.py",
docker_image=DOCKER_IMAGE
)
[ ]:
while wr.emr.get_step_state(cluster_id, step_id) != "COMPLETED":
    pass
wr.emr.terminate_cluster(cluster_id)
17 - Partition Projection¶
https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html
[1]:
import awswrangler as wr
import pandas as pd
from datetime import datetime
import getpass
Enter your bucket name:¶
[2]:
bucket = getpass.getpass()
···········································
Integer projection¶
[3]:
df = pd.DataFrame({
"value": [1, 2, 3],
"year": [2019, 2020, 2021],
"month": [10, 11, 12],
"day": [25, 26, 27]
})
df
[3]:
value | year | month | day | |
---|---|---|---|---|
0 | 1 | 2019 | 10 | 25 |
1 | 2 | 2020 | 11 | 26 |
2 | 3 | 2021 | 12 | 27 |
[4]:
wr.s3.to_parquet(
df=df,
path=f"s3://{bucket}/table_integer/",
dataset=True,
partition_cols=["year", "month", "day"],
database="default",
table="table_integer",
projection_enabled=True,
projection_types={
"year": "integer",
"month": "integer",
"day": "integer"
},
projection_ranges={
"year": "2000,2025",
"month": "1,12",
"day": "1,31"
},
);
[5]:
wr.athena.read_sql_query(f"SELECT * FROM table_integer", database="default")
[5]:
value | year | month | day | |
---|---|---|---|---|
0 | 3 | 2021 | 12 | 27 |
1 | 2 | 2020 | 11 | 26 |
2 | 1 | 2019 | 10 | 25 |
Enum projection¶
[6]:
df = pd.DataFrame({
"value": [1, 2, 3],
"city": ["São Paulo", "Tokio", "Seattle"],
})
df
[6]:
value | city | |
---|---|---|
0 | 1 | São Paulo |
1 | 2 | Tokio |
2 | 3 | Seattle |
[7]:
wr.s3.to_parquet(
df=df,
path=f"s3://{bucket}/table_enum/",
dataset=True,
partition_cols=["city"],
database="default",
table="table_enum",
projection_enabled=True,
projection_types={
"city": "enum",
},
projection_values={
"city": "São Paulo,Tokio,Seattle"
},
);
[8]:
wr.athena.read_sql_query(f"SELECT * FROM table_enum", database="default")
[8]:
value | city | |
---|---|---|
0 | 1 | São Paulo |
1 | 3 | Seattle |
2 | 2 | Tokio |
Date projection¶
[9]:
ts = lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S")
dt = lambda x: datetime.strptime(x, "%Y-%m-%d").date()
df = pd.DataFrame({
"value": [1, 2, 3],
"dt": [dt("2020-01-01"), dt("2020-01-02"), dt("2020-01-03")],
"ts": [ts("2020-01-01 00:00:00"), ts("2020-01-01 00:00:01"), ts("2020-01-01 00:00:02")],
})
df
[9]:
value | dt | ts | |
---|---|---|---|
0 | 1 | 2020-01-01 | 2020-01-01 00:00:00 |
1 | 2 | 2020-01-02 | 2020-01-01 00:00:01 |
2 | 3 | 2020-01-03 | 2020-01-01 00:00:02 |
[10]:
wr.s3.to_parquet(
df=df,
path=f"s3://{bucket}/table_date/",
dataset=True,
partition_cols=["dt", "ts"],
database="default",
table="table_date",
projection_enabled=True,
projection_types={
"dt": "date",
"ts": "date",
},
projection_ranges={
"dt": "2020-01-01,2020-01-03",
"ts": "2020-01-01 00:00:00,2020-01-01 00:00:02"
},
);
[11]:
wr.athena.read_sql_query(f"SELECT * FROM table_date", database="default")
[11]:
value | dt | ts | |
---|---|---|---|
0 | 1 | 2020-01-01 | 2020-01-01 00:00:00 |
1 | 2 | 2020-01-02 | 2020-01-01 00:00:01 |
2 | 3 | 2020-01-03 | 2020-01-01 00:00:02 |
Injected projection¶
[12]:
df = pd.DataFrame({
"value": [1, 2, 3],
"uuid": ["761e2488-a078-11ea-bb37-0242ac130002", "b89ed095-8179-4635-9537-88592c0f6bc3", "87adc586-ce88-4f0a-b1c8-bf8e00d32249"],
})
df
[12]:
value | uuid | |
---|---|---|
0 | 1 | 761e2488-a078-11ea-bb37-0242ac130002 |
1 | 2 | b89ed095-8179-4635-9537-88592c0f6bc3 |
2 | 3 | 87adc586-ce88-4f0a-b1c8-bf8e00d32249 |
[13]:
wr.s3.to_parquet(
df=df,
path=f"s3://{bucket}/table_injected/",
dataset=True,
partition_cols=["uuid"],
database="default",
table="table_injected",
projection_enabled=True,
projection_types={
"uuid": "injected",
}
);
[14]:
wr.athena.read_sql_query(
sql=f"SELECT * FROM table_injected WHERE uuid='b89ed095-8179-4635-9537-88592c0f6bc3'",
database="default"
)
[14]:
value | uuid | |
---|---|---|
0 | 2 | b89ed095-8179-4635-9537-88592c0f6bc3 |
Cleaning Up¶
[15]:
wr.s3.delete_objects(f"s3://{bucket}/table_integer/")
wr.s3.delete_objects(f"s3://{bucket}/table_enum/")
wr.s3.delete_objects(f"s3://{bucket}/table_date/")
wr.s3.delete_objects(f"s3://{bucket}/table_injected/")
[16]:
wr.catalog.delete_table_if_exists(table="table_integer", database="default")
wr.catalog.delete_table_if_exists(table="table_enum", database="default")
wr.catalog.delete_table_if_exists(table="table_date", database="default")
wr.catalog.delete_table_if_exists(table="table_injected", database="default");
[ ]:
18 - QuickSight¶
For this tutorial we will use the public AWS COVID-19 data lake.
Please install the CloudFormation template from the AWS COVID-19 data lake documentation to get access to the public data lake.
P.S. To be able to access the public data lake, you must explicitly allow QuickSight to access the related external S3 bucket.
[1]:
import awswrangler as wr
from time import sleep
List users of QuickSight account
[2]:
[{"username": user["UserName"], "role": user["Role"]} for user in wr.quicksight.list_users('default')]
[2]:
[{'username': 'dev', 'role': 'ADMIN'}]
[3]:
wr.catalog.databases()
[3]:
Database | Description | |
---|---|---|
0 | aws_data_wrangler | AWS Data Wrangler Test Arena - Glue Database |
1 | awswrangler_test | |
2 | covid-19 | |
3 | default | Default Hive database |
[4]:
wr.catalog.tables(database="covid-19")
[4]:
Database | Table | Description | Columns | Partitions | |
---|---|---|---|---|---|
0 | covid-19 | alleninstitute_comprehend_medical | Comprehend Medical results run against Allen I... | paper_id, date, dx_name, test_name, procedure_... | |
1 | covid-19 | alleninstitute_metadata | Metadata on papers pulled from the Allen Insti... | cord_uid, sha, source_x, title, doi, pmcid, pu... | |
2 | covid-19 | country_codes | Lookup table for country codes | country, alpha-2 code, alpha-3 code, numeric c... | |
3 | covid-19 | county_populations | Lookup table for population for each county ba... | id, id2, county, state, population estimate 2018 | |
4 | covid-19 | covid_knowledge_graph_edges | AWS Knowledge Graph for COVID-19 data | id, label, from, to, score | |
5 | covid-19 | covid_knowledge_graph_nodes_author | AWS Knowledge Graph for COVID-19 data | id, label, first, last, full_name | |
6 | covid-19 | covid_knowledge_graph_nodes_concept | AWS Knowledge Graph for COVID-19 data | id, label, entity, concept | |
7 | covid-19 | covid_knowledge_graph_nodes_institution | AWS Knowledge Graph for COVID-19 data | id, label, institution, country, settlement | |
8 | covid-19 | covid_knowledge_graph_nodes_paper | AWS Knowledge Graph for COVID-19 data | id, label, doi, sha_code, publish_time, source... | |
9 | covid-19 | covid_knowledge_graph_nodes_topic | AWS Knowledge Graph for COVID-19 data | id, label, topic, topic_num | |
10 | covid-19 | covid_testing_states_daily | USA total test daily trend by state. Sourced ... | date, state, positive, negative, pending, hosp... | |
11 | covid-19 | covid_testing_us_daily | USA total test daily trend. Sourced from covi... | date, states, positive, negative, posneg, pend... | |
12 | covid-19 | covid_testing_us_total | USA total tests. Sourced from covidtracking.c... | positive, negative, posneg, hospitalized, deat... | |
13 | covid-19 | covidcast_data | CMU Delphi's COVID-19 Surveillance Data | data_source, signal, geo_type, time_value, geo... | |
14 | covid-19 | covidcast_metadata | CMU Delphi's COVID-19 Surveillance Metadata | data_source, signal, time_type, geo_type, min_... | |
15 | covid-19 | enigma_jhu | Johns Hopkins University Consolidated data on ... | fips, admin2, province_state, country_region, ... | |
16 | covid-19 | enigma_jhu_timeseries | Johns Hopkins University data on COVID-19 case... | uid, fips, iso2, iso3, code3, admin2, latitude... | |
17 | covid-19 | hospital_beds | Data on hospital beds and their utilization in... | objectid, hospital_name, hospital_type, hq_add... | |
18 | covid-19 | nytimes_counties | Data on COVID-19 cases from NY Times at US cou... | date, county, state, fips, cases, deaths | |
19 | covid-19 | nytimes_states | Data on COVID-19 cases from NY Times at US sta... | date, state, fips, cases, deaths | |
20 | covid-19 | prediction_models_county_predictions | County-level Predictions Data. Sourced from Yu... | countyfips, countyname, statename, severity_co... | |
21 | covid-19 | prediction_models_severity_index | Severity Index models. Sourced from Yu Group a... | severity_1-day, severity_2-day, severity_3-day... | |
22 | covid-19 | tableau_covid_datahub | COVID-19 data that has been gathered and unifi... | country_short_name, country_alpha_3_code, coun... | |
23 | covid-19 | tableau_jhu | Johns Hopkins University data on COVID-19 case... | case_type, cases, difference, date, country_re... | |
24 | covid-19 | us_state_abbreviations | Lookup table for US state abbreviations | state, abbreviation | |
25 | covid-19 | world_cases_deaths_testing | Data on confirmed cases, deaths, and testing. ... | iso_code, location, date, total_cases, new_cas... |
Create a QuickSight data source. Note: the data source stores the connection information.
[5]:
wr.quicksight.create_athena_data_source(
name="covid-19",
workgroup="primary",
allowed_to_manage=["dev"]
)
[6]:
wr.catalog.tables(database="covid-19", name_contains="nyt")
[6]:
Database | Table | Description | Columns | Partitions | |
---|---|---|---|---|---|
0 | covid-19 | nytimes_counties | Data on COVID-19 cases from NY Times at US cou... | date, county, state, fips, cases, deaths | |
1 | covid-19 | nytimes_states | Data on COVID-19 cases from NY Times at US sta... | date, state, fips, cases, deaths |
[7]:
wr.athena.read_sql_query("SELECT * FROM nytimes_counties limit 10", database="covid-19", ctas_approach=False)
[7]:
date | county | state | fips | cases | deaths | |
---|---|---|---|---|---|---|
0 | 2020-01-21 | Snohomish | Washington | 53061 | 1 | 0 |
1 | 2020-01-22 | Snohomish | Washington | 53061 | 1 | 0 |
2 | 2020-01-23 | Snohomish | Washington | 53061 | 1 | 0 |
3 | 2020-01-24 | Cook | Illinois | 17031 | 1 | 0 |
4 | 2020-01-24 | Snohomish | Washington | 53061 | 1 | 0 |
5 | 2020-01-25 | Orange | California | 06059 | 1 | 0 |
6 | 2020-01-25 | Cook | Illinois | 17031 | 1 | 0 |
7 | 2020-01-25 | Snohomish | Washington | 53061 | 1 | 0 |
8 | 2020-01-26 | Maricopa | Arizona | 04013 | 1 | 0 |
9 | 2020-01-26 | Los Angeles | California | 06037 | 1 | 0 |
[8]:
sql = """
SELECT
j.*,
co.Population,
co.county AS county2,
hb.*
FROM
(
SELECT
date,
county,
state,
fips,
cases as confirmed,
deaths
FROM "covid-19".nytimes_counties
) j
LEFT OUTER JOIN (
SELECT
DISTINCT county,
state,
"population estimate 2018" AS Population
FROM
"covid-19".county_populations
WHERE
state IN (
SELECT
DISTINCT state
FROM
"covid-19".nytimes_counties
)
AND county IN (
SELECT
DISTINCT county as county
FROM "covid-19".nytimes_counties
)
) co ON co.county = j.county
AND co.state = j.state
LEFT OUTER JOIN (
SELECT
count(objectid) as Hospital,
fips as hospital_fips,
sum(num_licensed_beds) as licensed_beds,
sum(num_staffed_beds) as staffed_beds,
sum(num_icu_beds) as icu_beds,
avg(bed_utilization) as bed_utilization,
sum(
potential_increase_in_bed_capac
) as potential_increase_bed_capacity
FROM "covid-19".hospital_beds
WHERE
fips in (
SELECT
DISTINCT fips
FROM
"covid-19".nytimes_counties
)
GROUP BY
2
) hb ON hb.hospital_fips = j.fips
"""
wr.athena.read_sql_query(sql, database="covid-19", ctas_approach=False)
[8]:
date | county | state | fips | confirmed | deaths | population | county2 | Hospital | hospital_fips | licensed_beds | staffed_beds | icu_beds | bed_utilization | potential_increase_bed_capacity | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2020-04-12 | Park | Montana | 30067 | 7 | 0 | 16736 | Park | 0 | 30067 | 25 | 25 | 4 | 0.432548 | 0 |
1 | 2020-04-12 | Ravalli | Montana | 30081 | 3 | 0 | 43172 | Ravalli | 0 | 30081 | 25 | 25 | 5 | 0.567781 | 0 |
2 | 2020-04-12 | Silver Bow | Montana | 30093 | 11 | 0 | 34993 | Silver Bow | 0 | 30093 | 98 | 71 | 11 | 0.551457 | 27 |
3 | 2020-04-12 | Clay | Nebraska | 31035 | 2 | 0 | 6214 | Clay | <NA> | <NA> | <NA> | <NA> | <NA> | NaN | <NA> |
4 | 2020-04-12 | Cuming | Nebraska | 31039 | 2 | 0 | 8940 | Cuming | 0 | 31039 | 25 | 25 | 4 | 0.204493 | 0 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
227684 | 2020-06-11 | Hockley | Texas | 48219 | 28 | 1 | 22980 | Hockley | 0 | 48219 | 48 | 48 | 8 | 0.120605 | 0 |
227685 | 2020-06-11 | Hudspeth | Texas | 48229 | 11 | 0 | 4795 | Hudspeth | <NA> | <NA> | <NA> | <NA> | <NA> | NaN | <NA> |
227686 | 2020-06-11 | Jones | Texas | 48253 | 633 | 0 | 19817 | Jones | 0 | 48253 | 45 | 7 | 1 | 0.718591 | 38 |
227687 | 2020-06-11 | La Salle | Texas | 48283 | 4 | 0 | 7531 | La Salle | <NA> | <NA> | <NA> | <NA> | <NA> | NaN | <NA> |
227688 | 2020-06-11 | Limestone | Texas | 48293 | 36 | 1 | 23519 | Limestone | 0 | 48293 | 78 | 69 | 9 | 0.163940 | 9 |
227689 rows × 15 columns
Create Dataset with custom SQL option
[9]:
wr.quicksight.create_athena_dataset(
name="covid19-nytimes-usa",
sql=sql,
sql_name='CustomSQL',
data_source_name="covid-19",
import_mode='SPICE',
allowed_to_manage=["dev"]
)
[10]:
ingestion_id = wr.quicksight.create_ingestion("covid19-nytimes-usa")
Wait for the ingestion to complete
[11]:
while wr.quicksight.describe_ingestion(ingestion_id=ingestion_id, dataset_name="covid19-nytimes-usa")["IngestionStatus"] not in ["COMPLETED", "FAILED"]:
sleep(1)
Describe last ingestion
[12]:
wr.quicksight.describe_ingestion(ingestion_id=ingestion_id, dataset_name="covid19-nytimes-usa")["RowInfo"]
[12]:
{'RowsIngested': 227689, 'RowsDropped': 0}
List all ingestions
[13]:
[{"time": user["CreatedTime"], "source": user["RequestSource"]} for user in wr.quicksight.list_ingestions("covid19-nytimes-usa")]
[13]:
[{'time': datetime.datetime(2020, 6, 12, 15, 13, 46, 996000, tzinfo=tzlocal()),
'source': 'MANUAL'},
{'time': datetime.datetime(2020, 6, 12, 15, 13, 42, 344000, tzinfo=tzlocal()),
'source': 'MANUAL'}]
Create new dataset from a table directly
[14]:
wr.quicksight.create_athena_dataset(
name="covid-19-tableau_jhu",
table="tableau_jhu",
data_source_name="covid-19",
database="covid-19",
import_mode='DIRECT_QUERY',
rename_columns={
"cases": "Count_of_Cases",
"combined_key": "County"
},
cast_columns_types={
"Count_of_Cases": "INTEGER"
},
allowed_to_manage=["dev"]
)
Cleaning up
[15]:
wr.quicksight.delete_data_source("covid-19")
wr.quicksight.delete_dataset("covid19-nytimes-usa")
wr.quicksight.delete_dataset("covid-19-tableau_jhu")
19 - Amazon Athena Cache¶
Wrangler has a cache strategy that is disabled by default and can be enabled by passing a max_cache_seconds value bigger than 0. This cache strategy for Amazon Athena can help you decrease query times and costs.
When calling read_sql_query, instead of just running the query, we can first verify whether the query has been run before. If so, and if that last run happened within max_cache_seconds (a new parameter to read_sql_query), we return the same results as last time, provided they are still available in S3. We have seen this improve performance by more than 100x, but the potential gain is essentially unbounded.
The detailed approach is:
- When read_sql_query is called with max_cache_seconds > 0 (it defaults to 0), we check the last queries run by the same workgroup (the most we can get without pagination).
- By default it will check the last 50 queries, but you can customize this through the max_cache_query_inspections argument.
- We then sort those queries by CompletionDateTime, descending.
- For each of those queries, we check whether its CompletionDateTime is still within the max_cache_seconds window. If so, we check whether the query string is the same as the current one (with some smart heuristics to guarantee coverage over both ctas_approach modes). If they match, we check whether that run's results are still on S3, and if so we return them instead of re-running the query.
- During the whole cache resolution phase, if anything goes wrong, the logic falls back to the usual read_sql_query path.
P.S. The ``cache scope is bounded to the current workgroup``, so you will be able to reuse query results from colleagues running in the same environment.
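For intuition, here is a rough sketch of that cache resolution using the Boto3 Athena client directly. It is illustrative only: the helper name find_cached_results is hypothetical and this is not Wrangler's actual implementation (which, among other things, uses smarter query-matching heuristics).
[ ]:
import boto3
from datetime import datetime, timedelta, timezone

def find_cached_results(sql, workgroup="primary", max_cache_seconds=900, max_inspections=50):
    """Hypothetical helper: return the S3 output of a recent identical query, or None."""
    athena = boto3.client("athena")
    # Last query executions run by the same workgroup (single page, no pagination)
    ids = athena.list_query_executions(WorkGroup=workgroup)["QueryExecutionIds"][:max_inspections]
    executions = [athena.get_query_execution(QueryExecutionId=i)["QueryExecution"] for i in ids]
    # Keep only successful runs, newest first
    executions = [e for e in executions if e["Status"]["State"] == "SUCCEEDED"]
    executions.sort(key=lambda e: e["Status"]["CompletionDateTime"], reverse=True)
    oldest_allowed = datetime.now(timezone.utc) - timedelta(seconds=max_cache_seconds)
    for e in executions:
        if e["Status"]["CompletionDateTime"] < oldest_allowed:
            break  # everything from here on is outside the cache window
        if e["Query"].strip() == sql.strip():
            return e["ResultConfiguration"]["OutputLocation"]  # results to reuse
    return None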
[1]:
import awswrangler as wr
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
···········································
Checking/Creating Glue Catalog Databases¶
[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
wr.catalog.create_database("awswrangler_test")
Creating a Parquet Table from the NOAA’s CSV files¶
[4]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
df = wr.s3.read_csv(
path="s3://noaa-ghcn-pds/csv/189",
names=cols,
parse_dates=["dt", "obs_time"]) # Read 10 files from the 1890 decade (~1GB)
df
[4]:
id | dt | element | value | m_flag | q_flag | s_flag | obs_time | |
---|---|---|---|---|---|---|---|---|
0 | AGE00135039 | 1890-01-01 | TMAX | 160 | NaN | NaN | E | NaN |
1 | AGE00135039 | 1890-01-01 | TMIN | 30 | NaN | NaN | E | NaN |
2 | AGE00135039 | 1890-01-01 | PRCP | 45 | NaN | NaN | E | NaN |
3 | AGE00147705 | 1890-01-01 | TMAX | 140 | NaN | NaN | E | NaN |
4 | AGE00147705 | 1890-01-01 | TMIN | 74 | NaN | NaN | E | NaN |
... | ... | ... | ... | ... | ... | ... | ... | ... |
29240014 | UZM00038457 | 1899-12-31 | PRCP | 16 | NaN | NaN | r | NaN |
29240015 | UZM00038457 | 1899-12-31 | TAVG | -73 | NaN | NaN | r | NaN |
29240016 | UZM00038618 | 1899-12-31 | TMIN | -76 | NaN | NaN | r | NaN |
29240017 | UZM00038618 | 1899-12-31 | PRCP | 0 | NaN | NaN | r | NaN |
29240018 | UZM00038618 | 1899-12-31 | TAVG | -60 | NaN | NaN | r | NaN |
29240019 rows × 8 columns
[5]:
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
database="awswrangler_test",
table="noaa"
);
[6]:
wr.catalog.table(database="awswrangler_test", table="noaa")
[6]:
Column Name | Type | Partition | Comment | |
---|---|---|---|---|
0 | id | string | False | |
1 | dt | timestamp | False | |
2 | element | string | False | |
3 | value | bigint | False | |
4 | m_flag | string | False | |
5 | q_flag | string | False | |
6 | s_flag | string | False | |
7 | obs_time | string | False |
The test query¶
The more computational resources the query needs, the more the cache will help you. That's why we're using this long-running query.
[7]:
query = """
SELECT
n1.element,
count(1) as cnt
FROM
noaa n1
JOIN
noaa n2
ON
n1.id = n2.id
GROUP BY
n1.element
"""
First execution…¶
[8]:
%%time
wr.athena.read_sql_query(query, database="awswrangler_test")
CPU times: user 5.31 s, sys: 232 ms, total: 5.54 s
Wall time: 6min 42s
[8]:
element | cnt | |
---|---|---|
0 | WDMV | 49755046 |
1 | SNWD | 5089486328 |
2 | DATN | 10817510 |
3 | DAPR | 102579666 |
4 | MDTN | 10817510 |
5 | WT03 | 71184687 |
6 | WT09 | 584412 |
7 | TOBS | 146984266 |
8 | DASF | 7764526 |
9 | WT04 | 9648963 |
10 | WT18 | 92635444 |
11 | WT01 | 87526136 |
12 | WT16 | 323354156 |
13 | PRCP | 71238907298 |
14 | SNOW | 21950890838 |
15 | WT06 | 307339 |
16 | TAVG | 2340863803 |
17 | TMIN | 41450979633 |
18 | MDTX | 11210687 |
19 | WT07 | 4486872 |
20 | WT10 | 137873 |
21 | EVAP | 970404 |
22 | WT14 | 8073701 |
23 | DATX | 11210687 |
24 | WT08 | 33933005 |
25 | WT05 | 8211491 |
26 | TMAX | 39876132467 |
27 | MDPR | 114320989 |
28 | WT11 | 22212890 |
29 | DWPR | 69005655 |
30 | MDSF | 12004843 |
Second execution with CACHE (400x faster)¶
[9]:
%%time
wr.athena.read_sql_query(query, database="awswrangler_test", max_cache_seconds=900)
CPU times: user 493 ms, sys: 34.9 ms, total: 528 ms
Wall time: 975 ms
[9]:
element | cnt | |
---|---|---|
0 | WDMV | 49755046 |
1 | SNWD | 5089486328 |
2 | DATN | 10817510 |
3 | DAPR | 102579666 |
4 | MDTN | 10817510 |
5 | WT03 | 71184687 |
6 | WT09 | 584412 |
7 | TOBS | 146984266 |
8 | DASF | 7764526 |
9 | WT04 | 9648963 |
10 | WT18 | 92635444 |
11 | WT01 | 87526136 |
12 | WT16 | 323354156 |
13 | PRCP | 71238907298 |
14 | SNOW | 21950890838 |
15 | WT06 | 307339 |
16 | TAVG | 2340863803 |
17 | TMIN | 41450979633 |
18 | MDTX | 11210687 |
19 | WT07 | 4486872 |
20 | WT10 | 137873 |
21 | EVAP | 970404 |
22 | WT14 | 8073701 |
23 | DATX | 11210687 |
24 | WT08 | 33933005 |
25 | WT05 | 8211491 |
26 | TMAX | 39876132467 |
27 | MDPR | 114320989 |
28 | WT11 | 22212890 |
29 | DWPR | 69005655 |
30 | MDSF | 12004843 |
Allowing Wrangler to inspect up to 500 historical queries to find the same result to reuse¶
[10]:
%%time
wr.athena.read_sql_query(query, database="awswrangler_test", max_cache_seconds=900, max_cache_query_inspections=500)
CPU times: user 504 ms, sys: 44 ms, total: 548 ms
Wall time: 1.19 s
[10]:
element | cnt | |
---|---|---|
0 | WDMV | 49755046 |
1 | SNWD | 5089486328 |
2 | DATN | 10817510 |
3 | DAPR | 102579666 |
4 | MDTN | 10817510 |
5 | WT03 | 71184687 |
6 | WT09 | 584412 |
7 | TOBS | 146984266 |
8 | DASF | 7764526 |
9 | WT04 | 9648963 |
10 | WT18 | 92635444 |
11 | WT01 | 87526136 |
12 | WT16 | 323354156 |
13 | PRCP | 71238907298 |
14 | SNOW | 21950890838 |
15 | WT06 | 307339 |
16 | TAVG | 2340863803 |
17 | TMIN | 41450979633 |
18 | MDTX | 11210687 |
19 | WT07 | 4486872 |
20 | WT10 | 137873 |
21 | EVAP | 970404 |
22 | WT14 | 8073701 |
23 | DATX | 11210687 |
24 | WT08 | 33933005 |
25 | WT05 | 8211491 |
26 | TMAX | 39876132467 |
27 | MDPR | 114320989 |
28 | WT11 | 22212890 |
29 | DWPR | 69005655 |
30 | MDSF | 12004843 |
Cleaning Up S3¶
[11]:
wr.s3.delete_objects(path)
Delete table¶
[12]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="noaa")
[12]:
True
Delete Database¶
[13]:
wr.catalog.delete_database('awswrangler_test')
20 - Spark Table Interoperability¶
Wrangler has no difficulty inserting into, overwriting, or otherwise interacting with a table created by Apache Spark.
But if you want to do the opposite (Spark interacting with a table created by Wrangler), you should be aware that Wrangler follows the Hive format, so you must be explicit when using Spark's saveAsTable
method:
[ ]:
spark_df.write.format("hive").saveAsTable("database.table")
Or just move forward using the insertInto
alternative:
[ ]:
spark_df.write.insertInto("database.table")
21 - Global Configurations¶
Wrangler has two ways to set global configurations that will override the regular default arguments configured in function signatures.
Environment variables
wr.config
P.S. Check the function API doc to see whether your function has an argument that can be configured through global configurations.
P.P.S. One exception to the above-mentioned rules is the ``botocore_config`` property. It cannot be set through environment variables, only via ``wr.config``. It will be used as the ``botocore.config.Config`` for all underlying ``boto3`` calls. The default config is ``botocore.config.Config(retries={"max_attempts": 5}, connect_timeout=10, max_pool_connections=10)``. If you only want to change the retry behavior, you can use the environment variables ``AWS_MAX_ATTEMPTS`` and ``AWS_RETRY_MODE`` (see the Boto3 documentation).
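For example, to tweak only the retry behaviour through environment variables (the values below are illustrative; AWS_RETRY_MODE accepts legacy, standard or adaptive):
[ ]:
%env AWS_MAX_ATTEMPTS=5
%env AWS_RETRY_MODE=adaptive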
Environment Variables¶
[1]:
%env WR_DATABASE=default
%env WR_CTAS_APPROACH=False
%env WR_MAX_CACHE_SECONDS=900
%env WR_MAX_CACHE_QUERY_INSPECTIONS=500
%env WR_MAX_REMOTE_CACHE_ENTRIES=50
%env WR_MAX_LOCAL_CACHE_ENTRIES=100
env: WR_DATABASE=default
env: WR_CTAS_APPROACH=False
env: WR_MAX_CACHE_SECONDS=900
env: WR_MAX_CACHE_QUERY_INSPECTIONS=500
env: WR_MAX_REMOTE_CACHE_ENTRIES=50
env: WR_MAX_LOCAL_CACHE_ENTRIES=100
[2]:
import awswrangler as wr
import botocore
[3]:
wr.athena.read_sql_query("SELECT 1 AS FOO")
[3]:
foo | |
---|---|
0 | 1 |
Resetting¶
[4]:
# Specific
wr.config.reset("database")
# All
wr.config.reset()
wr.config¶
[5]:
wr.config.database = "default"
wr.config.ctas_approach = False
wr.config.max_cache_seconds = 900
wr.config.max_cache_query_inspections = 500
wr.config.max_remote_cache_entries = 50
wr.config.max_local_cache_entries = 100
# Set botocore.config.Config that will be used for all boto3 calls
wr.config.botocore_config = botocore.config.Config(
retries={"max_attempts": 10},
connect_timeout=20,
max_pool_connections=20
)
[6]:
wr.athena.read_sql_query("SELECT 1 AS FOO")
[6]:
foo | |
---|---|
0 | 1 |
Visualizing¶
[7]:
wr.config
[7]:
name | Env. Variable | type | nullable | enforced | configured | value | |
---|---|---|---|---|---|---|---|
0 | catalog_id | WR_CATALOG_ID | <class 'str'> | True | False | False | None |
1 | concurrent_partitioning | WR_CONCURRENT_PARTITIONING | <class 'bool'> | False | False | False | None |
2 | ctas_approach | WR_CTAS_APPROACH | <class 'bool'> | False | False | True | False |
3 | database | WR_DATABASE | <class 'str'> | True | False | True | default |
4 | max_cache_query_inspections | WR_MAX_CACHE_QUERY_INSPECTIONS | <class 'int'> | False | False | True | 500 |
5 | max_cache_seconds | WR_MAX_CACHE_SECONDS | <class 'int'> | False | False | True | 900 |
6 | max_remote_cache_entries | WR_MAX_REMOTE_CACHE_ENTRIES | <class 'int'> | False | False | True | 50 |
7 | max_local_cache_entries | WR_MAX_LOCAL_CACHE_ENTRIES | <class 'int'> | False | False | True | 100 |
8 | s3_block_size | WR_S3_BLOCK_SIZE | <class 'int'> | False | True | False | None |
9 | workgroup | WR_WORKGROUP | <class 'str'> | False | True | False | None |
10 | s3_endpoint_url | WR_S3_ENDPOINT_URL | <class 'str'> | True | True | True | None |
11 | athena_endpoint_url | WR_ATHENA_ENDPOINT_URL | <class 'str'> | True | True | True | None |
12 | sts_endpoint_url | WR_STS_ENDPOINT_URL | <class 'str'> | True | True | True | None |
13 | glue_endpoint_url | WR_GLUE_ENDPOINT_URL | <class 'str'> | True | True | True | None |
14 | redshift_endpoint_url | WR_REDSHIFT_ENDPOINT_URL | <class 'str'> | True | True | True | None |
15 | kms_endpoint_url | WR_KMS_ENDPOINT_URL | <class 'str'> | True | True | True | None |
16 | emr_endpoint_url | WR_EMR_ENDPOINT_URL | <class 'str'> | True | True | True | None |
17 | botocore_config | WR_BOTOCORE_CONFIG | <class 'botocore.config.Config'> | True | False | True | <botocore.config.Config object at 0x000002D3A362E760> |
22 - Writing Partitions Concurrently¶
concurrent_partitioning argument:
If True, it will increase the parallelism level during the writing of partitions. It will decrease the writing time and increase the memory usage.
P.S. Check the function API doc to see whether your function has an argument that can be configured through global configurations.
[1]:
%reload_ext memory_profiler
import awswrangler as wr
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
············
Reading 4 GB of CSV from NOAA’s historical data and creating a year column¶
[3]:
noaa_path = "s3://noaa-ghcn-pds/csv/193"
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
dates = ["dt", "obs_time"]
dtype = {x: "category" for x in ["element", "m_flag", "q_flag", "s_flag"]}
df = wr.s3.read_csv(noaa_path, names=cols, parse_dates=dates, dtype=dtype)
df["year"] = df["dt"].dt.year
print(f"Number of rows: {len(df.index)}")
print(f"Number of columns: {len(df.columns)}")
Number of rows: 125407761
Number of columns: 9
Default Writing¶
[4]:
%%time
%%memit
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
partition_cols=["year"],
);
peak memory: 22169.04 MiB, increment: 11119.68 MiB
CPU times: user 49 s, sys: 12.5 s, total: 1min 1s
Wall time: 1min 11s
Concurrent Partitioning (Decreasing writing time, but increasing memory usage)¶
[5]:
%%time
%%memit
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
partition_cols=["year"],
concurrent_partitioning=True # <-----
);
peak memory: 27819.48 MiB, increment: 15743.30 MiB
CPU times: user 52.3 s, sys: 13.6 s, total: 1min 5s
Wall time: 41.6 s
[ ]:
23 - Flexible Partitions Filter (PUSH-DOWN)¶
partition_filter argument:
- Callback function filter to apply on PARTITION columns (PUSH-DOWN filter).
- This function MUST receive a single argument (Dict[str, str]) where keys are partition names and values are partition values.
- This function MUST return a bool, True to read the partition or False to ignore it.
- Ignored if `dataset=False`.
P.S. Check the function API doc to see whether your function has an argument that can be configured through global configurations.
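Keep in mind that the callback always receives partition values as strings, so numeric partitions need an explicit cast inside the filter. A minimal sketch, assuming a hypothetical dataset partitioned by a year column:
[ ]:
# Partition values arrive as strings, e.g. {"year": "2021"}
my_filter = lambda x: int(x["year"]) >= 2020
wr.s3.read_parquet(path, dataset=True, partition_filter=my_filter)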
[1]:
import awswrangler as wr
import pandas as pd
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"
············
Creating the Dataset (PARQUET)¶
[3]:
df = pd.DataFrame({
"id": [1, 2, 3],
"value": ["foo", "boo", "bar"],
})
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
partition_cols=["value"]
)
wr.s3.read_parquet(path, dataset=True)
[3]:
id | value | |
---|---|---|
0 | 3 | bar |
1 | 2 | boo |
2 | 1 | foo |
Example 1¶
[4]:
my_filter = lambda x: x["value"].endswith("oo")
wr.s3.read_parquet(path, dataset=True, partition_filter=my_filter)
[4]:
id | value | |
---|---|---|
0 | 2 | boo |
1 | 1 | foo |
Example 2¶
[5]:
from Levenshtein import distance
def my_filter(partitions):
return distance("boo", partitions["value"]) <= 1
wr.s3.read_parquet(path, dataset=True, partition_filter=my_filter)
[5]:
id | value | |
---|---|---|
0 | 2 | boo |
1 | 1 | foo |
Creating the Dataset (CSV)¶
[6]:
df = pd.DataFrame({
"id": [1, 2, 3],
"value": ["foo", "boo", "bar"],
})
wr.s3.to_csv(
df=df,
path=path,
dataset=True,
mode="overwrite",
partition_cols=["value"],
compression="gzip",
index=False
)
wr.s3.read_csv(path, dataset=True)
[6]:
id | value | |
---|---|---|
0 | 3 | bar |
1 | 2 | boo |
2 | 1 | foo |
Example 1¶
[7]:
my_filter = lambda x: x["value"].endswith("oo")
wr.s3.read_csv(path, dataset=True, partition_filter=my_filter)
[7]:
id | value | |
---|---|---|
0 | 2 | boo |
1 | 1 | foo |
Example 2¶
[8]:
from Levenshtein import distance
def my_filter(partitions):
return distance("boo", partitions["value"]) <= 1
wr.s3.read_csv(path, dataset=True, partition_filter=my_filter)
[8]:
id | value | |
---|---|---|
0 | 2 | boo |
1 | 1 | foo |
24 - Athena Query Metadata¶
For wr.athena.read_sql_query() and wr.athena.read_sql_table(), the resulting DataFrame (or every DataFrame in the returned Iterator for chunked queries) has a query_metadata attribute, which carries the query result metadata returned by Boto3/Athena.
The expected query_metadata format is the same as that returned by the Boto3 Athena client's get_query_execution call.
Environment Variables¶
[1]:
%env WR_DATABASE=default
env: WR_DATABASE=default
[2]:
import awswrangler as wr
[5]:
df = wr.athena.read_sql_query("SELECT 1 AS foo")
df
[5]:
foo | |
---|---|
0 | 1 |
Getting statistics from query metadata¶
[6]:
print(f'DataScannedInBytes: {df.query_metadata["Statistics"]["DataScannedInBytes"]}')
print(f'TotalExecutionTimeInMillis: {df.query_metadata["Statistics"]["TotalExecutionTimeInMillis"]}')
print(f'QueryQueueTimeInMillis: {df.query_metadata["Statistics"]["QueryQueueTimeInMillis"]}')
print(f'QueryPlanningTimeInMillis: {df.query_metadata["Statistics"]["QueryPlanningTimeInMillis"]}')
print(f'ServiceProcessingTimeInMillis: {df.query_metadata["Statistics"]["ServiceProcessingTimeInMillis"]}')
DataScannedInBytes: 0
TotalExecutionTimeInMillis: 2311
QueryQueueTimeInMillis: 121
QueryPlanningTimeInMillis: 250
ServiceProcessingTimeInMillis: 37
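These statistics are handy for quick cost estimates, for example (the $5 per TB of data scanned figure is the common on-demand Athena price and varies by region):
[ ]:
scanned_tb = df.query_metadata["Statistics"]["DataScannedInBytes"] / 1024 ** 4
print(f"Approximate query cost: ${scanned_tb * 5.0:.6f}")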
25 - Redshift - Loading Parquet files with Spectrum¶
Enter your bucket name:¶
[1]:
import getpass
bucket = getpass.getpass()
PATH = f"s3://{bucket}/files/"
···········································
Mocking some Parquet Files on S3¶
[2]:
import awswrangler as wr
import pandas as pd
df = pd.DataFrame({
"col0": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
"col1": ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"],
})
df
[2]:
col0 | col1 | |
---|---|---|
0 | 0 | a |
1 | 1 | b |
2 | 2 | c |
3 | 3 | d |
4 | 4 | e |
5 | 5 | f |
6 | 6 | g |
7 | 7 | h |
8 | 8 | i |
9 | 9 | j |
[3]:
wr.s3.to_parquet(df, PATH, max_rows_by_file=2, dataset=True, mode="overwrite");
Crawling the metadata and adding it to the Glue Catalog¶
[4]:
wr.s3.store_parquet_metadata(
path=PATH,
database="aws_data_wrangler",
table="test",
dataset=True,
mode="overwrite"
)
[4]:
({'col0': 'bigint', 'col1': 'string'}, None, None)
Running the CTAS query to load the data into Redshift storage¶
[5]:
con = wr.redshift.connect(connection="aws-data-wrangler-redshift")
[6]:
query = "CREATE TABLE public.test AS (SELECT * FROM aws_data_wrangler_external.test)"
[7]:
with con.cursor() as cursor:
cursor.execute(query)
Running an INSERT INTO query to load MORE data into Redshift storage¶
[8]:
df = pd.DataFrame({
"col0": [10, 11],
"col1": ["k", "l"],
})
wr.s3.to_parquet(df, PATH, dataset=True, mode="overwrite");
[9]:
query = "INSERT INTO public.test (SELECT * FROM aws_data_wrangler_external.test)"
[10]:
with con.cursor() as cursor:
cursor.execute(query)
Checking the result¶
[11]:
query = "SELECT * FROM public.test"
[13]:
wr.redshift.read_sql_table(con=con, schema="public", table="test")
[13]:
col0 | col1 | |
---|---|---|
0 | 5 | f |
1 | 1 | b |
2 | 3 | d |
3 | 6 | g |
4 | 8 | i |
5 | 10 | k |
6 | 4 | e |
7 | 0 | a |
8 | 2 | c |
9 | 7 | h |
10 | 9 | j |
11 | 11 | l |
[14]:
con.close()
26 - Amazon Timestream¶
Creating resources¶
[10]:
import awswrangler as wr
import pandas as pd
from datetime import datetime
wr.timestream.create_database("sampleDB")
wr.timestream.create_table("sampleDB", "sampleTable", memory_retention_hours=1, magnetic_retention_days=1);
Write¶
[11]:
df = pd.DataFrame(
{
"time": [datetime.now(), datetime.now(), datetime.now()],
"dim0": ["foo", "boo", "bar"],
"dim1": [1, 2, 3],
"measure": [1.0, 1.1, 1.2],
}
)
rejected_records = wr.timestream.write(
df=df,
database="sampleDB",
table="sampleTable",
time_col="time",
measure_col="measure",
dimensions_cols=["dim0", "dim1"],
)
print(f"Number of rejected records: {len(rejected_records)}")
Number of rejected records: 0
Query¶
[12]:
wr.timestream.query(
'SELECT time, measure_value::double, dim0, dim1 FROM "sampleDB"."sampleTable" ORDER BY time DESC LIMIT 3'
)
[12]:
time | measure_value::double | dim0 | dim1 | |
---|---|---|---|---|
0 | 2020-12-08 19:15:32.468 | 1.0 | foo | 1 |
1 | 2020-12-08 19:15:32.468 | 1.2 | bar | 3 |
2 | 2020-12-08 19:15:32.468 | 1.1 | boo | 2 |
Deleting resources¶
[13]:
wr.timestream.delete_table("sampleDB", "sampleTable")
wr.timestream.delete_database("sampleDB")
27 - Amazon Timestream - Example 2¶
Reading test data¶
[1]:
import awswrangler as wr
import pandas as pd
from datetime import datetime
df = pd.read_csv(
"https://raw.githubusercontent.com/awslabs/amazon-timestream-tools/master/sample_apps/data/sample.csv",
names=[
"ignore0",
"region",
"ignore1",
"az",
"ignore2",
"hostname",
"measure_kind",
"measure",
"ignore3",
"ignore4",
"ignore5",
],
usecols=["region", "az", "hostname", "measure_kind", "measure"],
)
df["time"] = datetime.now()
df.reset_index(inplace=True, drop=False)
df
[1]:
index | region | az | hostname | measure_kind | measure | time | |
---|---|---|---|---|---|---|---|
0 | 0 | us-east-1 | us-east-1a | host-fj2hx | cpu_utilization | 21.394363 | 2020-12-08 16:18:47.599597 |
1 | 1 | us-east-1 | us-east-1a | host-fj2hx | memory_utilization | 68.563420 | 2020-12-08 16:18:47.599597 |
2 | 2 | us-east-1 | us-east-1a | host-6kMPE | cpu_utilization | 17.144579 | 2020-12-08 16:18:47.599597 |
3 | 3 | us-east-1 | us-east-1a | host-6kMPE | memory_utilization | 73.507870 | 2020-12-08 16:18:47.599597 |
4 | 4 | us-east-1 | us-east-1a | host-sxj7X | cpu_utilization | 26.584865 | 2020-12-08 16:18:47.599597 |
... | ... | ... | ... | ... | ... | ... | ... |
125995 | 125995 | eu-north-1 | eu-north-1c | host-De8RB | memory_utilization | 68.063468 | 2020-12-08 16:18:47.599597 |
125996 | 125996 | eu-north-1 | eu-north-1c | host-2z8tn | memory_utilization | 72.203680 | 2020-12-08 16:18:47.599597 |
125997 | 125997 | eu-north-1 | eu-north-1c | host-2z8tn | cpu_utilization | 29.212219 | 2020-12-08 16:18:47.599597 |
125998 | 125998 | eu-north-1 | eu-north-1c | host-9FczW | memory_utilization | 71.746134 | 2020-12-08 16:18:47.599597 |
125999 | 125999 | eu-north-1 | eu-north-1c | host-9FczW | cpu_utilization | 1.677793 | 2020-12-08 16:18:47.599597 |
126000 rows × 7 columns
Creating resources¶
[2]:
wr.timestream.create_database("sampleDB")
wr.timestream.create_table("sampleDB", "sampleTable", memory_retention_hours=1, magnetic_retention_days=1);
Write CPU_UTILIZATION records¶
[3]:
df_cpu = df[df.measure_kind == "cpu_utilization"].copy()
df_cpu.rename(columns={"measure": "cpu_utilization"}, inplace=True)
df_cpu
[3]:
index | region | az | hostname | measure_kind | cpu_utilization | time | |
---|---|---|---|---|---|---|---|
0 | 0 | us-east-1 | us-east-1a | host-fj2hx | cpu_utilization | 21.394363 | 2020-12-08 16:18:47.599597 |
2 | 2 | us-east-1 | us-east-1a | host-6kMPE | cpu_utilization | 17.144579 | 2020-12-08 16:18:47.599597 |
4 | 4 | us-east-1 | us-east-1a | host-sxj7X | cpu_utilization | 26.584865 | 2020-12-08 16:18:47.599597 |
6 | 6 | us-east-1 | us-east-1a | host-ExOui | cpu_utilization | 52.930970 | 2020-12-08 16:18:47.599597 |
8 | 8 | us-east-1 | us-east-1a | host-Bwb3j | cpu_utilization | 99.134110 | 2020-12-08 16:18:47.599597 |
... | ... | ... | ... | ... | ... | ... | ... |
125990 | 125990 | eu-north-1 | eu-north-1c | host-aPtc6 | cpu_utilization | 89.566125 | 2020-12-08 16:18:47.599597 |
125992 | 125992 | eu-north-1 | eu-north-1c | host-7ZF9L | cpu_utilization | 75.510598 | 2020-12-08 16:18:47.599597 |
125994 | 125994 | eu-north-1 | eu-north-1c | host-De8RB | cpu_utilization | 2.771261 | 2020-12-08 16:18:47.599597 |
125997 | 125997 | eu-north-1 | eu-north-1c | host-2z8tn | cpu_utilization | 29.212219 | 2020-12-08 16:18:47.599597 |
125999 | 125999 | eu-north-1 | eu-north-1c | host-9FczW | cpu_utilization | 1.677793 | 2020-12-08 16:18:47.599597 |
63000 rows × 7 columns
[4]:
rejected_records = wr.timestream.write(
df=df_cpu,
database="sampleDB",
table="sampleTable",
time_col="time",
measure_col="cpu_utilization",
dimensions_cols=["index", "region", "az", "hostname"],
)
assert len(rejected_records) == 0
Write MEMORY_UTILIZATION records¶
[5]:
df_memory = df[df.measure_kind == "memory_utilization"].copy()
df_memory.rename(columns={"measure": "memory_utilization"}, inplace=True)
df_memory
[5]:
index | region | az | hostname | measure_kind | memory_utilization | time | |
---|---|---|---|---|---|---|---|
1 | 1 | us-east-1 | us-east-1a | host-fj2hx | memory_utilization | 68.563420 | 2020-12-08 16:18:47.599597 |
3 | 3 | us-east-1 | us-east-1a | host-6kMPE | memory_utilization | 73.507870 | 2020-12-08 16:18:47.599597 |
5 | 5 | us-east-1 | us-east-1a | host-sxj7X | memory_utilization | 22.401424 | 2020-12-08 16:18:47.599597 |
7 | 7 | us-east-1 | us-east-1a | host-ExOui | memory_utilization | 45.440135 | 2020-12-08 16:18:47.599597 |
9 | 9 | us-east-1 | us-east-1a | host-Bwb3j | memory_utilization | 15.042701 | 2020-12-08 16:18:47.599597 |
... | ... | ... | ... | ... | ... | ... | ... |
125991 | 125991 | eu-north-1 | eu-north-1c | host-aPtc6 | memory_utilization | 75.686739 | 2020-12-08 16:18:47.599597 |
125993 | 125993 | eu-north-1 | eu-north-1c | host-7ZF9L | memory_utilization | 18.386152 | 2020-12-08 16:18:47.599597 |
125995 | 125995 | eu-north-1 | eu-north-1c | host-De8RB | memory_utilization | 68.063468 | 2020-12-08 16:18:47.599597 |
125996 | 125996 | eu-north-1 | eu-north-1c | host-2z8tn | memory_utilization | 72.203680 | 2020-12-08 16:18:47.599597 |
125998 | 125998 | eu-north-1 | eu-north-1c | host-9FczW | memory_utilization | 71.746134 | 2020-12-08 16:18:47.599597 |
63000 rows × 7 columns
[6]:
rejected_records = wr.timestream.write(
df=df_memory,
database="sampleDB",
table="sampleTable",
time_col="time",
measure_col="memory_utilization",
dimensions_cols=["index", "region", "az", "hostname"],
)
assert len(rejected_records) == 0
Querying CPU_UTILIZATION¶
[7]:
wr.timestream.query("""
SELECT
hostname, region, az, measure_name, measure_value::double, time
FROM "sampleDB"."sampleTable"
WHERE measure_name = 'cpu_utilization'
ORDER BY time DESC
LIMIT 10
""")
[7]:
hostname | region | az | measure_name | measure_value::double | time | |
---|---|---|---|---|---|---|
0 | host-OgvFx | us-west-1 | us-west-1a | cpu_utilization | 39.617911 | 2020-12-08 19:18:47.600 |
1 | host-rZUNx | eu-north-1 | eu-north-1a | cpu_utilization | 30.793332 | 2020-12-08 19:18:47.600 |
2 | host-t1kAB | us-east-2 | us-east-2b | cpu_utilization | 74.453239 | 2020-12-08 19:18:47.600 |
3 | host-RdQRf | us-east-1 | us-east-1c | cpu_utilization | 76.984448 | 2020-12-08 19:18:47.600 |
4 | host-4Llhu | us-east-1 | us-east-1c | cpu_utilization | 41.862733 | 2020-12-08 19:18:47.600 |
5 | host-2plqa | us-west-1 | us-west-1a | cpu_utilization | 34.864762 | 2020-12-08 19:18:47.600 |
6 | host-J3Q4z | us-east-1 | us-east-1b | cpu_utilization | 71.574266 | 2020-12-08 19:18:47.600 |
7 | host-VIR5T | ap-east-1 | ap-east-1a | cpu_utilization | 14.017491 | 2020-12-08 19:18:47.600 |
8 | host-G042D | us-east-1 | us-east-1c | cpu_utilization | 60.199068 | 2020-12-08 19:18:47.600 |
9 | host-8EBHm | us-west-2 | us-west-2c | cpu_utilization | 96.631624 | 2020-12-08 19:18:47.600 |
Querying MEMORY_UTILIZATION¶
[8]:
wr.timestream.query("""
SELECT
hostname, region, az, measure_name, measure_value::double, time
FROM "sampleDB"."sampleTable"
WHERE measure_name = 'memory_utilization'
ORDER BY time DESC
LIMIT 10
""")
[8]:
hostname | region | az | measure_name | measure_value::double | time | |
---|---|---|---|---|---|---|
0 | host-7c897 | us-west-2 | us-west-2b | memory_utilization | 63.427726 | 2020-12-08 19:18:47.600 |
1 | host-2z8tn | eu-north-1 | eu-north-1c | memory_utilization | 41.071368 | 2020-12-08 19:18:47.600 |
2 | host-J3Q4z | us-east-1 | us-east-1b | memory_utilization | 23.944388 | 2020-12-08 19:18:47.600 |
3 | host-mjrQb | us-east-1 | us-east-1b | memory_utilization | 69.173431 | 2020-12-08 19:18:47.600 |
4 | host-AyWSI | us-east-1 | us-east-1c | memory_utilization | 75.591467 | 2020-12-08 19:18:47.600 |
5 | host-Axf0g | us-west-2 | us-west-2a | memory_utilization | 29.720739 | 2020-12-08 19:18:47.600 |
6 | host-ilMBa | us-east-2 | us-east-2b | memory_utilization | 71.544134 | 2020-12-08 19:18:47.600 |
7 | host-CWdXX | us-west-2 | us-west-2c | memory_utilization | 79.792799 | 2020-12-08 19:18:47.600 |
8 | host-8EBHm | us-west-2 | us-west-2c | memory_utilization | 66.082554 | 2020-12-08 19:18:47.600 |
9 | host-dRIJj | us-east-1 | us-east-1c | memory_utilization | 86.748960 | 2020-12-08 19:18:47.600 |
Deleting resources¶
[9]:
wr.timestream.delete_table("sampleDB", "sampleTable")
wr.timestream.delete_database("sampleDB")
28 - Amazon DynamoDB¶
Writing Data¶
[1]:
import awswrangler as wr
import pandas as pd
from pathlib import Path
Writing DataFrame¶
[2]:
df = pd.DataFrame({
"key": [1, 2],
"value": ["foo", "boo"]
})
wr.dynamodb.put_df(df=df, table_name="table")
Writing CSV file¶
[3]:
filepath = Path("items.csv")
df.to_csv(filepath, index=False)
wr.dynamodb.put_csv(path=filepath, table_name="table")
filepath.unlink()
Writing JSON files¶
[4]:
filepath = Path("items.json")
df.to_json(filepath, orient="records")
wr.dynamodb.put_json(path="items.json", table_name="table")
filepath.unlink()
Writing list of items¶
[5]:
items = df.to_dict(orient="records")
wr.dynamodb.put_items(items=items, table_name="table")
Deleting items¶
[6]:
wr.dynamodb.delete_items(items=items, table_name="table")
API Reference¶
Amazon S3¶
Copy a list of S3 objects to another S3 directory.
Delete Amazon S3 objects from a received S3 prefix or list of S3 object paths.
Describe Amazon S3 objects from a received S3 prefix or list of S3 object paths.
Check if an object exists on S3.
Download a file from a received S3 path to a local file.
Get the bucket region name.
List Amazon S3 objects from a prefix.
List Amazon S3 objects from a prefix.
Merge a source dataset into a target dataset.
Perform Upsert (Update else Insert) onto an existing Glue table.
Read CSV file(s) from a received S3 prefix or list of S3 object paths.
Read EXCEL file(s) from a received S3 path.
Read fixed-width formatted file(s) from a received S3 prefix or list of S3 object paths.
Read JSON file(s) from a received S3 prefix or list of S3 object paths.
Read Apache Parquet file(s) from a received S3 prefix or list of S3 object paths.
Read Apache Parquet file(s) metadata from a received S3 prefix or list of S3 object paths.
Read an Apache Parquet table registered on the AWS Glue Catalog.
Get the size (ContentLength) in bytes of Amazon S3 objects from a received S3 prefix or list of S3 object paths.
Infer and store Parquet metadata on the AWS Glue Catalog.
Write a CSV file or dataset on Amazon S3.
Write an EXCEL file on Amazon S3.
Write a JSON file on Amazon S3.
Write a Parquet file or dataset on Amazon S3.
Upload a local file to a received S3 path.
Wait for Amazon S3 objects to exist.
Wait for Amazon S3 objects to not exist.
AWS Glue Catalog¶
Add a column in an AWS Glue Catalog table.
Add partitions (metadata) to a CSV Table in the AWS Glue Catalog.
Add partitions (metadata) to a Parquet Table in the AWS Glue Catalog.
Create a CSV Table (Metadata Only) in the AWS Glue Catalog.
Create a database in the AWS Glue Catalog.
Create a Parquet Table (Metadata Only) in the AWS Glue Catalog.
Get a Pandas DataFrame with all listed databases.
Delete a column in an AWS Glue Catalog table.
Delete a database in the AWS Glue Catalog.
Delete specified partitions in an AWS Glue Catalog table.
Delete all partitions in an AWS Glue Catalog table.
Delete a Glue table if it exists.
Check if the table exists.
Drop all repeated columns (duplicated names).
Extract column and partition types (Amazon Athena) from a Pandas DataFrame.
Get all column comments.
Get all partitions from a Table in the AWS Glue Catalog.
Get an iterator of databases.
Get all partitions from a Table in the AWS Glue Catalog.
Get all partitions from a Table in the AWS Glue Catalog.
Get the table description.
Get the table's location on the Glue Catalog.
Get the total number of versions.
Get all parameters.
Get all columns and types from a table.
Get all versions.
Get an iterator of tables.
Overwrite all existing parameters.
Convert the column name to be compatible with Amazon Athena.
Normalize all column names to be compatible with Amazon Athena.
Convert the table name to be compatible with Amazon Athena.
Get a Pandas DataFrame of tables filtered by a search string.
Get table details as a Pandas DataFrame.
Get a DataFrame with tables filtered by a search term, prefix, or suffix.
Insert or update the received parameters.
Amazon Athena¶
Create the default Athena bucket if it doesn't exist.
Get the data type of all columns queried.
Fetch query execution details.
Return information about the workgroup with the specified name.
Execute any SQL query on AWS Athena and return the results as a Pandas DataFrame.
Extract the full table from AWS Athena and return the results as a Pandas DataFrame.
Run Hive's metastore consistency check: 'MSCK REPAIR TABLE table;'.
Start a SQL Query against AWS Athena.
Stop a query execution.
Wait for the query to end.
Amazon Redshift¶
Return a redshift_connector connection from a Glue Catalog or Secret Manager.
Return a redshift_connector temporary connection (no password required).
Load a Pandas DataFrame as a table on Amazon Redshift using Parquet files on S3 as a stage.
Load Parquet files from S3 to a table on Amazon Redshift (through the COPY command).
Return a DataFrame corresponding to the result set of the query string.
Return a DataFrame corresponding to the table.
Write records stored in a DataFrame into Redshift.
Load a Pandas DataFrame from an Amazon Redshift query result using Parquet files on S3 as a stage.
Unload Parquet files to S3 from a Redshift query result (through the UNLOAD command).
PostgreSQL¶
Return a pg8000 connection from a Glue Catalog Connection.
Return a DataFrame corresponding to the result set of the query string.
Return a DataFrame corresponding to the table.
Write records stored in a DataFrame into PostgreSQL.
MySQL¶
Return a pymysql connection from a Glue Catalog Connection.
Return a DataFrame corresponding to the result set of the query string.
Return a DataFrame corresponding to the table.
Write records stored in a DataFrame into MySQL.
Microsoft SQL Server¶
Return a pyodbc connection from a Glue Catalog Connection.
Return a DataFrame corresponding to the result set of the query string.
Return a DataFrame corresponding to the table.
Write records stored in a DataFrame into Microsoft SQL Server.
DynamoDB¶
Delete all items in the specified DynamoDB table.
Get the DynamoDB table object for the specified table name.
Write all items from a CSV file to a DynamoDB table.
Write all items from a DataFrame to a DynamoDB table.
Insert all items into the specified DynamoDB table.
Write all items from a JSON file to a DynamoDB table.
Amazon Timestream¶
Create a new Timestream database.
Create a new Timestream table.
Delete a given Timestream database.
Delete a given Timestream table.
Run a query and retrieve the result as a Pandas DataFrame.
Store a Pandas DataFrame into an Amazon Timestream table.
Amazon EMR¶
Build the Step structure (dictionary).
Build the Step structure (dictionary).
Create an EMR cluster with instance fleets configuration.
Get the EMR cluster state.
Get the EMR step state.
Update internal ECR credentials.
Submit a Spark Step.
Submit a new job in the EMR cluster.
Submit a list of steps.
Terminate an EMR cluster.
Amazon CloudWatch Logs¶
Run a query against AWS CloudWatchLogs Insights and convert the results to a Pandas DataFrame.
Run a query against AWS CloudWatchLogs Insights and wait for the results.
Run a query against AWS CloudWatchLogs Insights.
Wait until the query ends.
Amazon QuickSight¶
Cancel an ongoing ingestion of data into SPICE.
Create a QuickSight data source pointing to an Athena/Workgroup.
Create a QuickSight dataset.
Create and start a new SPICE ingestion on a dataset.
Delete all dashboards.
Delete all data sources.
Delete all datasets.
Delete all templates.
Delete a dashboard.
Delete a data source.
Delete a dataset.
Delete a template.
Describe a QuickSight dashboard by name or ID.
Describe a QuickSight data source by name or ID.
Describe a QuickSight data source's permissions by name or ID.
Describe a QuickSight dataset by name or ID.
Describe a QuickSight ingestion by ID.
Get a QuickSight dashboard ID given a name and fail if there is more than 1 ID associated with this name.
Get QuickSight dashboard IDs given a name.
Get a QuickSight data source ARN given a name and fail if there is more than 1 ARN associated with this name.
Get QuickSight data source ARNs given a name.
Get a QuickSight data source ID given a name and fail if there is more than 1 ID associated with this name.
Get QuickSight data source IDs given a name.
Get a QuickSight dataset ID given a name and fail if there is more than 1 ID associated with this name.
Get QuickSight dataset IDs given a name.
Get a QuickSight template ID given a name and fail if there is more than 1 ID associated with this name.
Get QuickSight template IDs given a name.
List dashboards in an AWS account.
List all QuickSight data source summaries.
List all QuickSight dataset summaries.
List all QuickSight Groups.
List all QuickSight Group memberships.
List IAM policy assignments in the current Amazon QuickSight account.
List all the IAM policy assignments.
List the history of SPICE ingestions for a dataset.
List all QuickSight templates.
Return a list of all of the Amazon QuickSight users belonging to this account.
List the Amazon QuickSight groups that an Amazon QuickSight user is a member of.
AWS STS¶
Get Account ID.
Get current user/role ARN.
Get current user/role name.
AWS Secrets Manager¶
Get secret value.
Get JSON secret value.
Amazon Chime¶
Send a message to an existing Chime chat room.