An AWS Professional Service open source initiative | aws-proserve-opensource@amazon.com
Quick Start¶
>>> pip install awswrangler
import awswrangler as wr
import pandas as pd
from datetime import datetime
df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})
# Storing data on Data Lake
wr.s3.to_parquet(
df=df,
path="s3://bucket/dataset/",
dataset=True,
database="my_db",
table="my_table"
)
# Retrieving the data directly from Amazon S3
df = wr.s3.read_parquet("s3://bucket/dataset/", dataset=True)
# Retrieving the data from Amazon Athena
df = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db")
# Getting a Redshift connection from the Glue Catalog and retrieving data from Redshift Spectrum
con = wr.redshift.connect("my-glue-connection")
df = wr.redshift.read_sql_query("SELECT * FROM external_schema.my_table", con=con)
con.close()
# Amazon Timestream Write
df = pd.DataFrame({
    "time": [datetime.now(), datetime.now()],
    "my_dimension": ["foo", "boo"],
    "measure": [1.0, 1.1],
})
rejected_records = wr.timestream.write(
    df=df,
    database="sampleDB",
    table="sampleTable",
    time_col="time",
    measure_col="measure",
    dimensions_cols=["my_dimension"],
)
# Amazon Timestream Query
wr.timestream.query("""
    SELECT time, measure_value::double, my_dimension
    FROM "sampleDB"."sampleTable" ORDER BY time DESC LIMIT 3
""")
Read The Docs¶
What is AWS Data Wrangler?¶
An AWS Professional Service open-source Python initiative that extends the power of the Pandas library to AWS, connecting DataFrames and AWS data-related services.
Easy integration with Athena, Glue, Redshift, Timestream, QuickSight, Chime, CloudWatch Logs, DynamoDB, EMR, Secrets Manager, PostgreSQL, MySQL, SQL Server and S3 (Parquet, CSV, JSON and Excel).
Built on top of other open-source projects like Pandas, Apache Arrow and Boto3, it offers abstracted functions to execute common ETL tasks such as loading and unloading data from data lakes, data warehouses and databases.
Check our tutorials or the list of functionalities.
Install¶
AWS Data Wrangler runs with Python 3.6, 3.7 and 3.8 and on several platforms (AWS Lambda, AWS Glue Python Shell, EMR, EC2, on-premises, Amazon SageMaker, local, etc.).
Some good practices for most of the methods below are:
- Use new and individual virtual environments for each project (venv).
- On notebooks, always restart your kernel after installations.
Note
If you want to use awswrangler for connecting to Microsoft SQL Server, some additional configuration is needed. Please have a look at the corresponding section below.
PyPI (pip)¶
>>> pip install awswrangler
Conda¶
>>> conda install -c conda-forge awswrangler
AWS Lambda Layer¶
1 - Go to GitHub’s release section and download the layer zip related to the desired version.
2 - Go to the AWS Lambda Panel, open the layer section (left side) and click create layer.
3 - Set the name and Python version, upload the zip file you just downloaded and press create to create the layer.
4 - Go to your Lambda and select your new layer!
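If you prefer to script step 3, here is a minimal sketch using boto3 (the AWS SDK for Python); the zip file name and the layer name below are hypothetical placeholders.
import boto3
lambda_client = boto3.client("lambda")
with open("awswrangler-layer.zip", "rb") as f:  # zip downloaded from the GitHub release page (hypothetical name)
    response = lambda_client.publish_layer_version(
        LayerName="aws-data-wrangler",          # hypothetical layer name
        Content={"ZipFile": f.read()},
        CompatibleRuntimes=["python3.8"],       # match the Python version of the downloaded zip
    )
print(response["LayerVersionArn"])              # attach this ARN to your Lambda function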
AWS Glue Python Shell Jobs¶
1 - Go to GitHub’s release page and download the wheel file (.whl) related to the desired version.
2 - Upload the wheel file to any Amazon S3 location.
3 - Go to your Glue Python Shell job and point to the wheel file on S3 in the Python library path field.
AWS Glue PySpark Jobs¶
Note
AWS Data Wrangler has compiled dependencies (C/C++), so there is only support for Glue PySpark Jobs >= 2.0.
Go to your Glue PySpark job and create a new Job parameters key/value:
Key: --additional-python-modules
Value: awswrangler
To install a specific version, set the value for the above Job parameter as follows:
Value: awswrangler==2.3.0
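If you prefer to set this Job parameter programmatically, here is a minimal boto3 sketch; the job name, IAM role and script location are hypothetical placeholders.
import boto3
glue = boto3.client("glue")
glue.create_job(
    Name="my-wrangler-job",                                    # hypothetical job name
    Role="arn:aws:iam::111111111111:role/MyGlueRole",          # hypothetical IAM role
    Command={
        "Name": "glueetl",                                     # PySpark job
        "ScriptLocation": "s3://bucket/scripts/my_script.py",  # hypothetical script location
        "PythonVersion": "3",
    },
    GlueVersion="2.0",                                         # Wrangler requires Glue >= 2.0
    DefaultArguments={"--additional-python-modules": "awswrangler==2.3.0"},
)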
Amazon SageMaker Notebook¶
Run this command in any Python 3 notebook cell and then make sure to restart the kernel before importing the awswrangler package.
>>> !pip install awswrangler
Amazon SageMaker Notebook Lifecycle¶
Open the SageMaker console, go to the lifecycle section and use the following snippet to configure AWS Data Wrangler for all compatible SageMaker kernels (Reference).
#!/bin/bash
set -e

# OVERVIEW
# This script installs a single pip package in all SageMaker conda environments, apart from the JupyterSystemEnv,
# which is a system environment reserved for Jupyter.
# Note this may time out if the package installations in all environments take longer than 5 minutes;
# consider using "nohup" to run this as a background process in that case.

sudo -u ec2-user -i <<'EOF'

# PARAMETERS
PACKAGE=awswrangler

# Note that "base" is a special environment name; include it here as well.
for env in base /home/ec2-user/anaconda3/envs/*; do
    source /home/ec2-user/anaconda3/bin/activate $(basename "$env")
    # Compare against the environment name (not the full path) so the Jupyter system environment is skipped.
    if [ "$(basename "$env")" = 'JupyterSystemEnv' ]; then
        continue
    fi
    nohup pip install --upgrade "$PACKAGE" &
    source /home/ec2-user/anaconda3/bin/deactivate
done
EOF
EMR Cluster¶
Although it is not a distributed library, AWS Data Wrangler can be a good helper to complement Big Data pipelines.
Configure Python 3 as the default interpreter for PySpark under your cluster configuration:
[
  {
    "Classification": "spark-env",
    "Configurations": [
      {
        "Classification": "export",
        "Properties": {
          "PYSPARK_PYTHON": "/usr/bin/python3"
        }
      }
    ]
  }
]
Keep the bootstrap script below on S3 and reference it on your cluster:
#!/usr/bin/env bash
set -ex
sudo pip-3.6 install awswrangler
Note
Make sure to pin the Wrangler version in the bootstrap script for production environments (e.g. awswrangler==1.8.1).
From Source¶
>>> git clone https://github.com/awslabs/aws-data-wrangler.git
>>> cd aws-data-wrangler
>>> pip install .
Notes for Microsoft SQL Server¶
awswrangler uses pyodbc to interact with Microsoft SQL Server. Installing this package requires the ODBC header files, which can be installed, for example, with the following commands:
>>> sudo apt install unixodbc-dev
>>> yum install unixODBC-devel
After installing these header files you can either just install pyodbc, or install awswrangler with the sqlserver extra, which will also install pyodbc:
>>> pip install pyodbc
>>> pip install awswrangler[sqlserver]
Finally you also need the correct ODBC Driver for SQL Server. You can have a look at the documentation from Microsoft to see how it can be installed in your environment.
If you want to connect to Microsoft SQL Server from AWS Lambda, you can build a separate Layer including the needed ODBC drivers and pyodbc.
If you maintain your own environment, you need to take care of the above steps yourself. Because of this limitation, usage in combination with Glue jobs is limited and you need to rely on the functionality provided inside Glue itself.
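Once the driver and pyodbc are in place, a minimal sketch to verify the setup, assuming a Glue Catalog connection named "my-sqlserver-connection" (a hypothetical name) pointing at your SQL Server instance:
import awswrangler as wr
con = wr.sqlserver.connect("my-sqlserver-connection")            # hypothetical Glue connection name
df = wr.sqlserver.read_sql_query("SELECT 1 AS sanity_check", con=con)
con.close()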
API Reference¶
Amazon S3¶
- Copy a list of S3 objects to another S3 directory.
- Delete Amazon S3 objects from a received S3 prefix or list of S3 object paths.
- Describe Amazon S3 objects from a received S3 prefix or list of S3 object paths.
- Check if an object exists on S3.
- Get the bucket region name.
- List Amazon S3 objects from a prefix.
- List Amazon S3 objects from a prefix.
- Merge a source dataset into a target dataset.
- Read CSV file(s) from a received S3 prefix or list of S3 object paths.
- Read Excel file(s) from a received S3 path.
- Read fixed-width formatted file(s) from a received S3 prefix or list of S3 object paths.
- Read JSON file(s) from a received S3 prefix or list of S3 object paths.
- Read Apache Parquet file(s) from a received S3 prefix or list of S3 object paths.
- Read Apache Parquet file(s) metadata from a received S3 prefix or list of S3 object paths.
- Read an Apache Parquet table registered in the AWS Glue Catalog.
- Get the size (ContentLength) in bytes of Amazon S3 objects from a received S3 prefix or list of S3 object paths.
- Infer and store Parquet metadata in the AWS Glue Catalog.
- Write a CSV file or dataset on Amazon S3.
- Write an Excel file on Amazon S3.
- Write a JSON file on Amazon S3.
- Write a Parquet file or dataset on Amazon S3.
- Wait for Amazon S3 objects to exist.
- Wait for Amazon S3 objects to not exist.
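A minimal usage sketch for some of the functions above, assuming a hypothetical bucket and prefix:
import awswrangler as wr
import pandas as pd
df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})
wr.s3.to_csv(df=df, path="s3://bucket/prefix/file.csv", index=False)  # write a single CSV file
print(wr.s3.list_objects("s3://bucket/prefix/"))                      # list objects under the prefix
print(wr.s3.does_object_exist("s3://bucket/prefix/file.csv"))         # check a single object
print(wr.s3.size_objects("s3://bucket/prefix/"))                      # sizes in bytes per object
wr.s3.delete_objects("s3://bucket/prefix/")                           # clean up the prefix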
AWS Glue Catalog¶
- Add a column to an AWS Glue Catalog table.
- Add partitions (metadata) to a CSV Table in the AWS Glue Catalog.
- Add partitions (metadata) to a Parquet Table in the AWS Glue Catalog.
- Create a CSV Table (metadata only) in the AWS Glue Catalog.
- Create a database in the AWS Glue Catalog.
- Create a Parquet Table (metadata only) in the AWS Glue Catalog.
- Get a Pandas DataFrame with all listed databases.
- Delete a column from an AWS Glue Catalog table.
- Delete a database in the AWS Glue Catalog.
- Delete specified partitions in an AWS Glue Catalog table.
- Delete all partitions in an AWS Glue Catalog table.
- Delete a Glue table if it exists.
- Check if the table exists.
- Drop all repeated columns (duplicated names).
- Extract columns and partitions types (Amazon Athena) from a Pandas DataFrame.
- Get all columns comments.
- Get all partitions from a Table in the AWS Glue Catalog.
- Get an iterator of databases.
- Get all partitions from a Table in the AWS Glue Catalog.
- Get all partitions from a Table in the AWS Glue Catalog.
- Get the table description.
- Get a table's location in the Glue Catalog.
- Get the total number of versions.
- Get all parameters.
- Get all columns and types from a table.
- Get all versions.
- Get an iterator of tables.
- Overwrite all existing parameters.
- Convert a column name to be compatible with Amazon Athena.
- Normalize all column names to be compatible with Amazon Athena.
- Convert a table name to be compatible with Amazon Athena.
- Get a Pandas DataFrame of tables filtered by a search string.
- Get table details as a Pandas DataFrame.
- Get a DataFrame with tables filtered by a search term, prefix, suffix.
- Insert or update the received parameters.
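A minimal sketch of browsing the Glue Catalog, assuming the "my_db"/"my_table" objects created in the Quick Start above:
import awswrangler as wr
print(wr.catalog.databases())                                   # DataFrame of databases
print(wr.catalog.tables(database="my_db"))                      # DataFrame of tables in my_db
print(wr.catalog.table(database="my_db", table="my_table"))     # column-level details
print(wr.catalog.does_table_exist(database="my_db", table="my_table"))
print(wr.catalog.get_table_location(database="my_db", table="my_table"))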
Amazon Athena¶
- Create the default Athena bucket if it doesn't exist.
- Get the data type of all columns queried.
- Fetch query execution details.
- Return information about the workgroup with the specified name.
- Execute any SQL query on AWS Athena and return the results as a Pandas DataFrame.
- Extract the full table from AWS Athena and return the results as a Pandas DataFrame.
- Run Hive's metastore consistency check: 'MSCK REPAIR TABLE table;'.
- Start a SQL Query against AWS Athena.
- Stop a query execution.
- Wait for the query to end.
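A minimal sketch of these helpers, assuming the "my_db"/"my_table" objects from the Quick Start:
import awswrangler as wr
df = wr.athena.read_sql_table(table="my_table", database="my_db")   # extract the full table
wr.athena.repair_table(table="my_table", database="my_db")          # MSCK REPAIR TABLE
# Asynchronous style: start the query, wait for it, then inspect the result column types.
query_id = wr.athena.start_query_execution(sql="SELECT * FROM my_table", database="my_db")
wr.athena.wait_query(query_execution_id=query_id)
print(wr.athena.get_query_columns_types(query_execution_id=query_id))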
Amazon Redshift¶
- Return a redshift_connector connection from the Glue Catalog or Secrets Manager.
- Return a redshift_connector temporary connection (no password required).
- Load a Pandas DataFrame as a table on Amazon Redshift using Parquet files on S3 as a stage.
- Load Parquet files from S3 into a table on Amazon Redshift (through the COPY command).
- Return a DataFrame corresponding to the result set of the query string.
- Return a DataFrame corresponding to the table.
- Write records stored in a DataFrame into Redshift.
- Load a Pandas DataFrame from an Amazon Redshift query result using Parquet files on S3 as a stage.
- Unload Parquet files to S3 from a Redshift query result (through the UNLOAD command).
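A minimal sketch of the COPY/UNLOAD helpers, assuming a Glue connection named "my-glue-connection" and hypothetical staging/unload paths on S3:
import awswrangler as wr
import pandas as pd
con = wr.redshift.connect("my-glue-connection")
df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})
wr.redshift.copy(df=df, path="s3://bucket/stage/", con=con, table="my_table", schema="public")
df2 = wr.redshift.unload(sql="SELECT * FROM public.my_table", con=con, path="s3://bucket/unload/")
con.close()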
PostgreSQL¶
- Return a pg8000 connection from a Glue Catalog Connection.
- Return a DataFrame corresponding to the result set of the query string.
- Return a DataFrame corresponding to the table.
- Write records stored in a DataFrame into PostgreSQL.
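A minimal sketch, assuming a Glue Catalog connection named "my-postgres-connection" (a hypothetical name); the wr.mysql and wr.sqlserver modules follow the same pattern:
import awswrangler as wr
import pandas as pd
con = wr.postgresql.connect("my-postgres-connection")
df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})
wr.postgresql.to_sql(df=df, con=con, schema="public", table="my_table")
df2 = wr.postgresql.read_sql_query("SELECT * FROM public.my_table", con=con)
con.close()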
MySQL¶
- Return a pymysql connection from a Glue Catalog Connection.
- Return a DataFrame corresponding to the result set of the query string.
- Return a DataFrame corresponding to the table.
- Write records stored in a DataFrame into MySQL.
Microsoft SQL Server¶
- Return a pyodbc connection from a Glue Catalog Connection.
- Return a DataFrame corresponding to the result set of the query string.
- Return a DataFrame corresponding to the table.
- Write records stored in a DataFrame into Microsoft SQL Server.
DynamoDB¶
- Delete all items in the specified DynamoDB table.
- Get the DynamoDB table object for the specified table name.
- Write all items from a CSV file to a DynamoDB table.
- Write all items from a DataFrame to a DynamoDB table.
- Insert all items into the specified DynamoDB table.
- Write all items from a JSON file to a DynamoDB table.
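A minimal sketch, assuming an existing DynamoDB table named "my-table" (a hypothetical name) whose partition key is "id":
import awswrangler as wr
import pandas as pd
df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})
wr.dynamodb.put_df(df=df, table_name="my-table")                       # write the DataFrame as items
table = wr.dynamodb.get_table(table_name="my-table")                   # underlying boto3 Table resource
wr.dynamodb.delete_items(items=[{"id": 1}, {"id": 2}], table_name="my-table")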
Amazon Timestream¶
- Create a new Timestream database.
- Create a new Timestream table.
- Delete a given Timestream database.
- Delete a given Timestream table.
- Run a query and retrieve the result as a Pandas DataFrame.
- Store a Pandas DataFrame into an Amazon Timestream table.
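A minimal sketch of creating and dropping the Timestream objects used in the Quick Start above; the retention values are illustrative assumptions only:
import awswrangler as wr
wr.timestream.create_database("sampleDB")
wr.timestream.create_table("sampleDB", "sampleTable", memory_retention_hours=1, magnetic_retention_days=1)
# ... write and query as shown in the Quick Start ...
wr.timestream.delete_table("sampleDB", "sampleTable")
wr.timestream.delete_database("sampleDB")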
Amazon EMR¶
- Build the Step structure (dictionary).
- Build the Step structure (dictionary).
- Create an EMR cluster with instance fleets configuration.
- Get the EMR cluster state.
- Get the EMR step state.
- Update internal ECR credentials.
- Submit a Spark Step.
- Submit a new job in the EMR Cluster.
- Submit a list of steps.
- Terminate an EMR cluster.
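A minimal sketch, assuming an already-running EMR cluster; the cluster ID and script location are hypothetical placeholders:
import awswrangler as wr
cluster_id = "j-XXXXXXXXXXXXX"                                         # hypothetical cluster ID
print(wr.emr.get_cluster_state(cluster_id=cluster_id))
step_id = wr.emr.submit_step(cluster_id=cluster_id, command="spark-submit s3://bucket/scripts/my_script.py")
print(wr.emr.get_step_state(cluster_id=cluster_id, step_id=step_id))
wr.emr.terminate_cluster(cluster_id=cluster_id)                        # shuts the cluster down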
Amazon CloudWatch Logs¶
- Run a query against AWS CloudWatch Logs Insights and convert the results to a Pandas DataFrame.
- Run a query against AWS CloudWatch Logs Insights and wait for the results.
- Run a query against AWS CloudWatch Logs Insights.
- Wait for the query to end.
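A minimal sketch, assuming a hypothetical log group name:
import awswrangler as wr
from datetime import datetime, timedelta
df = wr.cloudwatch.read_logs(
    query="fields @timestamp, @message | sort @timestamp desc | limit 5",
    log_group_names=["/aws/lambda/my-function"],            # hypothetical log group
    start_time=datetime.utcnow() - timedelta(hours=1),
    end_time=datetime.utcnow(),
)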
Amazon QuickSight¶
- Cancel an ongoing ingestion of data into SPICE.
- Create a QuickSight data source pointing to an Athena/Workgroup.
- Create a QuickSight dataset.
- Create and start a new SPICE ingestion on a dataset.
- Delete all dashboards.
- Delete all data sources.
- Delete all datasets.
- Delete all templates.
- Delete a dashboard.
- Delete a data source.
- Delete a dataset.
- Delete a template.
- Describe a QuickSight dashboard by name or ID.
- Describe a QuickSight data source by name or ID.
- Describe a QuickSight data source's permissions by name or ID.
- Describe a QuickSight dataset by name or ID.
- Describe a QuickSight ingestion by ID.
- Get a QuickSight dashboard ID given a name; fails if there is more than one ID associated with this name.
- Get QuickSight dashboard IDs given a name.
- Get a QuickSight data source ARN given a name; fails if there is more than one ARN associated with this name.
- Get QuickSight data source ARNs given a name.
- Get a QuickSight data source ID given a name; fails if there is more than one ID associated with this name.
- Get QuickSight data source IDs given a name.
- Get a QuickSight dataset ID given a name; fails if there is more than one ID associated with this name.
- Get QuickSight dataset IDs given a name.
- Get a QuickSight template ID given a name; fails if there is more than one ID associated with this name.
- Get QuickSight template IDs given a name.
- List dashboards in an AWS account.
- List all QuickSight data source summaries.
- List all QuickSight dataset summaries.
- List all QuickSight Groups.
- List all QuickSight Group memberships.
- List IAM policy assignments in the current Amazon QuickSight account.
- List all the IAM policy assignments.
- List the history of SPICE ingestions for a dataset.
- List all QuickSight templates.
- Return a list of all of the Amazon QuickSight users belonging to this account.
- List the Amazon QuickSight groups that an Amazon QuickSight user is a member of.
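A minimal sketch of the read-only listing helpers above; it assumes QuickSight is already enabled in the account and the caller has the required permissions:
import awswrangler as wr
print(wr.quicksight.list_data_sources())   # summaries of registered data sources
print(wr.quicksight.list_datasets())       # summaries of datasets
print(wr.quicksight.list_dashboards())     # dashboards in the account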
AWS STS¶
- Get the Account ID.
- Get the current user/role ARN.
- Get the current user/role name.
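Quick identity checks for the credentials awswrangler is using:
import awswrangler as wr
print(wr.sts.get_account_id())
print(wr.sts.get_current_identity_arn())
print(wr.sts.get_current_identity_name())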
AWS Secrets Manager¶
- Get a secret value.
- Get a JSON secret value.
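A minimal sketch, assuming a hypothetical secret named "my-secret" that stores a JSON document:
import awswrangler as wr
value = wr.secretsmanager.get_secret(name="my-secret")        # raw secret value
parsed = wr.secretsmanager.get_secret_json(name="my-secret")  # parsed into a dict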
Amazon Chime¶
- Send a message to an existing Chime chat room.
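A minimal sketch, assuming a hypothetical incoming-webhook URL for an existing Chime chat room:
import awswrangler as wr
wr.chime.post_message(
    webhook="https://hooks.chime.aws/incomingwebhooks/...",  # hypothetical webhook URL (elided)
    message="Data pipeline finished!",
)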