Note

Due to the new major version 1.0.0 with breaking changes, please make sure that all your old projects have their dependencies frozen on the desired version (e.g. pip install awswrangler==0.3.2).

Quick Start

>>> pip install awswrangler
import awswrangler as wr
import pandas as pd

df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})

# Storing data on Data Lake
wr.s3.to_parquet(
    df=df,
    path="s3://bucket/dataset/",
    dataset=True,
    database="my_db",
    table="my_table"
)

# Retrieving the data directly from Amazon S3
df = wr.s3.read_parquet("s3://bucket/dataset/", dataset=True)

# Retrieving the data from Amazon Athena
df = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db")

# Getting Redshift connection (SQLAlchemy) from Glue Catalog Connections
engine = wr.catalog.get_engine("my-redshift-connection")

# Retrieving the data from Amazon Redshift Spectrum
df = wr.db.read_sql_query("SELECT * FROM external_schema.my_table", con=engine)

Read The Docs

Note

We just released a new major version 1.0 with breaking changes. Please make sure that all your old projects have their dependencies frozen on the desired version (e.g. pip install awswrangler==0.3.2).

What is AWS Data Wrangler?

An open-source Python package that extends the power of the Pandas library to AWS, connecting DataFrames and AWS data-related services (Amazon Redshift, AWS Glue, Amazon Athena, Amazon EMR, etc.).

Built on top of other open-source projects like Pandas, Apache Arrow, Boto3, s3fs, SQLAlchemy, Psycopg2 and PyMySQL, it offers abstracted functions to execute the usual ETL tasks, such as loading and unloading data from Data Lakes, Data Warehouses and Databases.

Check our tutorials or the list of functionalities.

Install

AWS Data Wrangler runs with Python 3.6, 3.7 and 3.8 and on several platforms (AWS Lambda, AWS Glue Python Shell, EMR, EC2, on-premises, Amazon SageMaker, local, etc).

Some good practices for most of the methods below are:
  • Use a new and individual virtual environment for each project (venv).

  • On Notebooks, always restart your kernel after installations.

PyPI (pip)

>>> pip install awswrangler

Conda

>>> conda install -c conda-forge awswrangler

AWS Lambda Layer

1 - Go to GitHub’s release section and download the layer zip related to the desired version.

2 - Go to the AWS Lambda Panel, open the layer section (left side) and click create layer.

3 - Set the name and the Python version, upload the freshly downloaded zip file and press create to create the layer.

4 - Go to your Lambda and select your new layer!
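
If you prefer to script steps 2 and 3, the layer can also be published with Boto3. The snippet below is only a sketch: the bucket, key, layer name and runtimes are placeholders that must match your own account and Lambda runtime.

import boto3

lambda_client = boto3.client("lambda")

# Assumes the zip downloaded in step 1 was uploaded to S3 first (placeholder bucket/key)
response = lambda_client.publish_layer_version(
    LayerName="aws-data-wrangler",
    Description="AWS Data Wrangler",
    Content={"S3Bucket": "my-bucket", "S3Key": "layers/awswrangler-layer.zip"},
    CompatibleRuntimes=["python3.6", "python3.7", "python3.8"],
)
print(response["LayerVersionArn"])  # attach this ARN to your Lambda function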

AWS Glue Wheel

Note

AWS Data Wrangler has compiled dependencies (C/C++) so there is only support for Glue Python Shell, not for Glue PySpark.

1 - Go to GitHub’s release page and download the wheel file (.whl) related to the desired version.

2 - Upload the wheel file to any Amazon S3 location.

3 - Go to your Glue Python Shell job and point to the new wheel file on Amazon S3 (a scripted sketch follows below).
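
For reference, a hypothetical Boto3 sketch of step 3: a Glue Python Shell job picks up the wheel through the --extra-py-files default argument. The job name, role and S3 paths are placeholders.

import boto3

glue = boto3.client("glue")

# The wheel uploaded in step 2 is injected via --extra-py-files (placeholder paths)
glue.create_job(
    Name="my-python-shell-job",
    Role="arn:aws:iam::123456789012:role/my-glue-role",
    Command={
        "Name": "pythonshell",
        "ScriptLocation": "s3://bucket/scripts/my_script.py",
        "PythonVersion": "3",
    },
    DefaultArguments={
        "--extra-py-files": "s3://bucket/wheels/awswrangler-1.0.0-py3-none-any.whl"
    },
    MaxCapacity=1.0,
)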

Amazon SageMaker Notebook

Run this command in any Python 3 notebook cell and then make sure to restart the kernel before importing the awswrangler package.

>>> !pip install awswrangler

Amazon SageMaker Notebook Lifecycle

Open the SageMaker console, go to the lifecycle configuration section and use the following snippet to configure AWS Data Wrangler for all compatible SageMaker kernels (Reference).

#!/bin/bash

set -e

# OVERVIEW
# This script installs a single pip package in all SageMaker conda environments, apart from the JupyterSystemEnv which
# is a system environment reserved for Jupyter.
# Note this may timeout if the package installations in all environments take longer than 5 mins, consider using
# "nohup" to run this as a background process in that case.

sudo -u ec2-user -i <<'EOF'

# PARAMETERS
PACKAGE=awswrangler

# Note that "base" is a special environment name; include it as well.
for env in base /home/ec2-user/anaconda3/envs/*; do
    env_name=$(basename "$env")
    # Skip the system environment reserved for Jupyter itself
    if [ "$env_name" = 'JupyterSystemEnv' ]; then
        continue
    fi
    source /home/ec2-user/anaconda3/bin/activate "$env_name"
    nohup pip install --upgrade "$PACKAGE" &
    source /home/ec2-user/anaconda3/bin/deactivate
done
EOF

EMR Cluster

Despite not being a distributed library, AWS Data Wrangler can be a good helper to complement Big Data pipelines.

  • Configure Python 3 as the default interpreter for PySpark under your cluster configuration

    [
      {
         "Classification": "spark-env",
         "Configurations": [
           {
             "Classification": "export",
             "Properties": {
                "PYSPARK_PYTHON": "/usr/bin/python3"
              }
           }
        ]
      }
    ]
    
  • Keep the bootstrap script below on Amazon S3 and reference it in your cluster configuration.

    #!/usr/bin/env bash
    set -ex
    
    sudo pip-3.6 install awswrangler
    

Note

Make sure to freeze the Wrangler version in the bootstrap script for production environments (e.g. awswrangler==1.0.0).

From Source

>>> git clone https://github.com/awslabs/aws-data-wrangler.git
>>> cd aws-data-wrangler
>>> pip install .

API Reference

Amazon S3

delete_objects(path[, use_threads, …])

Delete Amazon S3 objects from a received S3 prefix or list of S3 object paths.

describe_objects(path[, wait_time, …])

Describe Amazon S3 objects from a received S3 prefix or list of S3 object paths.

does_object_exist(path[, boto3_session])

Check if object exists on S3.

get_bucket_region(bucket[, boto3_session])

Get bucket region name.

list_objects(path[, suffix, boto3_session])

List Amazon S3 objects from a prefix.

list_directories(path[, boto3_session])

List Amazon S3 directories from a prefix.

read_csv(path[, use_threads, boto3_session, …])

Read CSV file(s) from a received S3 prefix or list of S3 object paths.

read_fwf(path[, use_threads, boto3_session, …])

Read fixed-width formatted file(s) from a received S3 prefix or list of S3 object paths.

read_json(path[, use_threads, …])

Read JSON file(s) from a received S3 prefix or list of S3 object paths.

read_parquet(path[, filters, columns, …])

Read Apache Parquet file(s) from a received S3 prefix or list of S3 object paths.

read_parquet_table(table, database[, …])

Read Apache Parquet table registered on AWS Glue Catalog.

read_parquet_metadata(path[, filters, …])

Read Apache Parquet file(s) metadata from a received S3 prefix or list of S3 object paths.

size_objects(path[, wait_time, use_threads, …])

Get the size (ContentLength) in bytes of Amazon S3 objects from a received S3 prefix or list of S3 object paths.

store_parquet_metadata(path, database, table)

Infer and store parquet metadata on AWS Glue Catalog.

to_csv(df, path[, sep, index, columns, …])

Write CSV file or dataset on Amazon S3.

to_json(df, path[, boto3_session, …])

Write JSON file on Amazon S3.

to_parquet(df, path[, index, compression, …])

Write Parquet file or dataset on Amazon S3.

wait_objects_exist(paths[, delay, …])

Wait for Amazon S3 objects to exist.

wait_objects_not_exist(paths[, delay, …])

Wait for Amazon S3 objects to not exist.

copy_objects(paths, source_path, target_path)

Copy a list of S3 objects to another S3 directory.

merge_datasets(source_path, target_path[, …])

Merge a source dataset into a target dataset.
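
A minimal sketch combining a few of the calls above; the bucket and object keys are placeholders.

import awswrangler as wr

# Check a single object before cleaning up the whole prefix
if wr.s3.does_object_exist("s3://bucket/dataset/file0.parquet"):
    wr.s3.delete_objects("s3://bucket/dataset/")

# Block until the listed objects are gone
wr.s3.wait_objects_not_exist(["s3://bucket/dataset/file0.parquet"])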

AWS Glue Catalog

add_parquet_partitions(database, table, …)

Add partitions (metadata) to a Parquet Table in the AWS Glue Catalog.

create_parquet_table(database, table, path, …)

Create a Parquet Table (Metadata Only) in the AWS Glue Catalog.

add_csv_partitions(database, table, …[, …])

Add partitions (metadata) to a CSV Table in the AWS Glue Catalog.

create_csv_table(database, table, path, …)

Create a CSV Table (Metadata Only) in the AWS Glue Catalog.

databases([limit, catalog_id, boto3_session])

Get a Pandas DataFrame with all listed databases.

delete_table_if_exists(database, table[, …])

Delete a Glue table if it exists.

does_table_exist(database, table[, …])

Check if the table exists.

get_databases([catalog_id, boto3_session])

Get an iterator of databases.

get_parquet_partitions(database, table[, …])

Get all partitions from a Table in the AWS Glue Catalog.

get_csv_partitions(database, table[, …])

Get all partitions from a Table in the AWS Glue Catalog.

get_table_location(database, table[, …])

Get table’s location on Glue catalog.

get_table_types(database, table[, boto3_session])

Get all columns and types from a table.

get_tables([catalog_id, database, …])

Get an iterator of tables.

search_tables(text[, catalog_id, boto3_session])

Get Pandas DataFrame of tables filtered by a search string.

table(database, table[, catalog_id, …])

Get table details as Pandas DataFrame.

tables([limit, catalog_id, database, …])

Get a DataFrame with tables filtered by a search term, prefix, suffix.

sanitize_column_name(column)

Convert the column name to be compatible with Amazon Athena.

sanitize_dataframe_columns_names(df)

Normalize all column names to be compatible with Amazon Athena.

sanitize_table_name(table)

Convert the table name to be compatible with Amazon Athena.

drop_duplicated_columns(df)

Drop all repeated columns (duplicated names).

get_engine(connection[, catalog_id, …])

Return a SQLAlchemy Engine from a Glue Catalog Connection.

extract_athena_types(df[, index, …])

Extract columns and partitions types (Amazon Athena) from Pandas DataFrame.

get_table_parameters(database, table[, …])

Get all parameters of a given table.

upsert_table_parameters(parameters, …[, …])

Insert or Update the received parameters.
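
A small sketch of the catalog helpers above; the database name and DataFrame are placeholders.

import awswrangler as wr
import pandas as pd

# Browse the catalog as Pandas DataFrames
all_databases = wr.catalog.databases()
my_tables = wr.catalog.tables(database="my_db")

# Normalize column names before creating Athena-compatible tables
df = pd.DataFrame({"My Column": [1, 2]})
df = wr.catalog.sanitize_dataframe_columns_names(df)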

Amazon Athena

read_sql_query(sql, database[, …])

Execute any SQL query on AWS Athena and return the results as a Pandas DataFrame.

read_sql_table(table, database[, …])

Extract the full table from AWS Athena and return the results as a Pandas DataFrame.

repair_table(table[, database, s3_output, …])

Run Hive's metastore consistency check: ‘MSCK REPAIR TABLE table;’.

start_query_execution(sql[, database, …])

Start a SQL Query against AWS Athena.

stop_query_execution(query_execution_id[, …])

Stop a query execution.

wait_query(query_execution_id[, boto3_session])

Wait for the query to end.

create_athena_bucket([boto3_session])

Create the default Athena bucket if it doesn’t exist.

get_query_columns_types(query_execution_id)

Get the data type of all columns queried.

get_work_group(workgroup[, boto3_session])

Return information about the workgroup with the specified name.
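
A sketch of the lower-level Athena flow (read_sql_query wraps all of this in a single call); the database and table names are placeholders.

import awswrangler as wr

# Start a query, wait for it and inspect the resulting column types
query_id = wr.athena.start_query_execution("SELECT * FROM my_table LIMIT 10", database="my_db")
wr.athena.wait_query(query_id)
column_types = wr.athena.get_query_columns_types(query_id)

# Or let the library handle everything and return a DataFrame
df = wr.athena.read_sql_table(table="my_table", database="my_db")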

Databases (Redshift, PostgreSQL, MySQL)

to_sql(df, con, **pandas_kwargs)

Write records stored in a DataFrame to a SQL database.

read_sql_query(sql, con[, index_col, …])

Return a DataFrame corresponding to the result set of the query string.

read_sql_table(table, con[, schema, …])

Return a DataFrame corresponding to the result set of the query string.

get_engine(db_type, host, port, database, …)

Return a SQLAlchemy Engine from the given arguments.

get_redshift_temp_engine(cluster_identifier, …)

Get a SQLAlchemy Engine for a Redshift cluster using temporary credentials.

copy_to_redshift(df, path, con, table, …)

Load a Pandas DataFrame as a table on Amazon Redshift using Parquet files on S3 as a stage.

copy_files_to_redshift(path, …[, mode, …])

Load Parquet files from S3 into a table on Amazon Redshift (through the COPY command).

unload_redshift(sql, path, con, iam_role[, …])

Load a Pandas DataFrame from an Amazon Redshift query result using Parquet files on S3 as a stage.

unload_redshift_to_files(sql, path, con, …)

Unload an Amazon Redshift query result to Parquet files on S3 (through the UNLOAD command).

write_redshift_copy_manifest(manifest_path, …)

Write Redshift copy manifest and return its structure.
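
A sketch of the two loading strategies for Amazon Redshift; the connection name, S3 path and IAM role are placeholders, and the schema/iam_role keyword names are assumptions beyond the abbreviated signatures above.

import awswrangler as wr
import pandas as pd

df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})

# Engine resolved from a Glue Catalog connection (see wr.catalog.get_engine)
engine = wr.catalog.get_engine("my-redshift-connection")

# Small frames: plain INSERTs through SQLAlchemy
wr.db.to_sql(df, engine, name="my_table", index=False)

# Large frames: stage Parquet files on S3 and load them with COPY
wr.db.copy_to_redshift(
    df=df,
    path="s3://bucket/stage/",
    con=engine,
    table="my_table",
    schema="public",  # assumed keyword
    iam_role="arn:aws:iam::123456789012:role/my-redshift-role",  # assumed keyword
)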

EMR

create_cluster(subnet_id[, cluster_name, …])

Create an EMR cluster with an instance fleets configuration.

get_cluster_state(cluster_id[, boto3_session])

Get the EMR cluster state.

terminate_cluster(cluster_id[, boto3_session])

Terminate EMR cluster.

submit_step(cluster_id, command[, name, …])

Submit a new job to the EMR cluster.

submit_spark_step(cluster_id, path[, …])

Submit Spark Step.

submit_ecr_credentials_refresh(cluster_id, path)

Update internal ECR credentials.

submit_steps(cluster_id, steps[, boto3_session])

Submit a list of steps.

build_step(command[, name, …])

Build the Step structure (dictionary).

build_spark_step(path[, deploy_mode, …])

Build the Step structure (dictionary).

get_step_state(cluster_id, step_id[, …])

Get EMR step state.
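
A sketch of the EMR helpers above; the subnet and the Spark script path are placeholders, and create_cluster accepts many more optional arguments than shown.

import awswrangler as wr

# Create a cluster, submit one step and poll its state
cluster_id = wr.emr.create_cluster(subnet_id="subnet-0123456789abcdef0")
step_id = wr.emr.submit_step(cluster_id, command="spark-submit --deploy-mode cluster s3://bucket/jobs/my_job.py")
step_state = wr.emr.get_step_state(cluster_id, step_id)

# Tear the cluster down once the work is done
wr.emr.terminate_cluster(cluster_id)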

CloudWatch Logs

read_logs(query, log_group_names[, …])

Run a query against AWS CloudWatch Logs Insights and convert the results to a Pandas DataFrame.

run_query(query, log_group_names[, …])

Run a query against AWS CloudWatch Logs Insights and wait for the results.

start_query(query, log_group_names[, …])

Start a query against AWS CloudWatch Logs Insights.

wait_query(query_id[, boto3_session])

Wait for the query to end.
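
A sketch of the CloudWatch Logs Insights helpers; the log group name is a placeholder.

import awswrangler as wr

# Run an Insights query and get the matches back as a Pandas DataFrame
df = wr.cloudwatch.read_logs(
    query="fields @timestamp, @message | sort @timestamp desc | limit 25",
    log_group_names=["/aws/lambda/my-function"],
)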