An AWS Professional Service open source initiative | aws-proserve-opensource@amazon.com

Quick Start

>>> pip install awswrangler
import awswrangler as wr
import pandas as pd
from datetime import datetime

df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})

# Storing data on Data Lake
wr.s3.to_parquet(
    df=df,
    path="s3://bucket/dataset/",
    dataset=True,
    database="my_db",
    table="my_table"
)

# Retrieving the data directly from Amazon S3
df = wr.s3.read_parquet("s3://bucket/dataset/", dataset=True)

# Retrieving the data from Amazon Athena
df = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db")

# Get a Redshift connection from the Glue Catalog and retrieve data from Redshift Spectrum
con = wr.redshift.connect("my-glue-connection")
df = wr.redshift.read_sql_query("SELECT * FROM external_schema.my_table", con=con)
con.close()

# Amazon Timestream Write
df = pd.DataFrame({
    "time": [datetime.now(), datetime.now()],
    "my_dimension": ["foo", "boo"],
    "measure": [1.0, 1.1],
})
rejected_records = wr.timestream.write(
    df=df,
    database="sampleDB",
    table="sampleTable",
    time_col="time",
    measure_col="measure",
    dimensions_cols=["my_dimension"],
)

# Amazon Timestream Query
wr.timestream.query("""
SELECT time, measure_value::double, my_dimension
FROM "sampleDB"."sampleTable" ORDER BY time DESC LIMIT 3
""")

Read The Docs

What is AWS Data Wrangler?

An AWS Professional Service open source Python initiative that extends the power of the Pandas library to AWS, connecting DataFrames and AWS data-related services (Amazon Redshift, AWS Glue, Amazon Athena, Amazon Timestream, Amazon EMR, Amazon QuickSight, etc.).

Built on top of other open-source projects like Pandas, Apache Arrow and Boto3, it offers abstracted functions to run typical ETL tasks such as loading and unloading data from data lakes, data warehouses and databases.

Check our tutorials or the list of functionalities.

Install

AWS Data Wrangler runs on Python 3.6, 3.7 and 3.8 and on several platforms (AWS Lambda, AWS Glue Python Shell, EMR, EC2, on-premises, Amazon SageMaker, local, etc.).

Some good practices for most of the methods below are:
  • Use a new, dedicated virtual environment for each project (venv).

  • In notebooks, always restart your kernel after installations.

PyPI (pip)

>>> pip install awswrangler

Conda

>>> conda install -c conda-forge awswrangler

AWS Lambda Layer

1 - Go to GitHub’s release section and download the layer zip related to the desired version.

2 - Go to the AWS Lambda Panel, open the layer section (left side) and click create layer.

3 - Set the name and Python version, upload the zip file you just downloaded and press create to create the layer.

4 - Go to your Lambda and select your new layer!
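
If you prefer to script these steps, the layer can also be published with Boto3. The sketch below is illustrative only: the layer name, zip file name and runtime are placeholders, not values prescribed by the project.

import boto3

# Publish the zip downloaded from the release page as a Lambda layer.
# Names and the runtime below are placeholders - adjust them to your setup.
lambda_client = boto3.client("lambda")

with open("awswrangler-layer.zip", "rb") as f:
    response = lambda_client.publish_layer_version(
        LayerName="aws-data-wrangler",
        Content={"ZipFile": f.read()},
        CompatibleRuntimes=["python3.8"],
    )

print(response["LayerVersionArn"])  # attach this ARN to your Lambda function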

AWS Glue Python Shell Jobs

1 - Go to GitHub’s release page and download the wheel file (.whl) related to the desired version.

2 - Upload the wheel file to any Amazon S3 location.

3 - Go to your Glue Python Shell job and point to the wheel file on S3 in the Python library path field.

Official Glue Python Shell Reference
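
The same setup can be scripted with Boto3. In the sketch below the job name, IAM role and S3 paths are placeholders, and we assume the console's Python library path field corresponds to the --extra-py-files default argument.

import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="my-python-shell-job",                        # placeholder
    Role="arn:aws:iam::123456789012:role/MyGlueRole",  # placeholder
    Command={
        "Name": "pythonshell",
        "ScriptLocation": "s3://bucket/scripts/my_script.py",
        "PythonVersion": "3",
    },
    # Point the job to the uploaded wheel file (assumed equivalent of the
    # "Python library path" console field).
    DefaultArguments={
        "--extra-py-files": "s3://bucket/wheels/awswrangler-1.10.0-py3-none-any.whl",
    },
)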

AWS Glue PySpark Jobs

Note

AWS Data Wrangler has compiled dependencies (C/C++), so it is only supported on Glue PySpark Jobs >= 2.0.

Go to your Glue PySpark job and create a new Job parameters key/value:

  • Key: --additional-python-modules

  • Value: awswrangler

To install a specific version, set the value of the above job parameter as follows:

  • Value: awswrangler==1.10.0

Official Glue PySpark Reference
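
The same parameter can also be passed programmatically, either as a default argument when the job is created or per run. A minimal sketch with Boto3 (the job name is a placeholder):

import boto3

glue = boto3.client("glue")

glue.start_job_run(
    JobName="my-pyspark-job",  # placeholder
    Arguments={"--additional-python-modules": "awswrangler==1.10.0"},
)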

Amazon SageMaker Notebook

Run this command in any Python 3 notebook cell and then make sure to restart the kernel before importing the awswrangler package.

>>> !pip install awswrangler

Amazon SageMaker Notebook Lifecycle

Open the SageMaker console, go to the lifecycle section and use the following snippet to configure AWS Data Wrangler for all compatible SageMaker kernels (Reference).

#!/bin/bash

set -e

# OVERVIEW
# This script installs a single pip package in all SageMaker conda environments, apart from the JupyterSystemEnv which
# is a system environment reserved for Jupyter.
# Note this may timeout if the package installations in all environments take longer than 5 mins, consider using
# "nohup" to run this as a background process in that case.

sudo -u ec2-user -i <<'EOF'

# PARAMETERS
PACKAGE=awswrangler

# Note that "base" is a special environment name; include it as well.
for env in base /home/ec2-user/anaconda3/envs/*; do
    env_name=$(basename "$env")
    # Skip JupyterSystemEnv, the system environment reserved for Jupyter.
    if [ "$env_name" = 'JupyterSystemEnv' ]; then
        continue
    fi
    source /home/ec2-user/anaconda3/bin/activate "$env_name"
    nohup pip install --upgrade "$PACKAGE" &
    source /home/ec2-user/anaconda3/bin/deactivate
done
EOF

EMR Cluster

Although it is not a distributed library, AWS Data Wrangler can be a good helper to complement big data pipelines.

  • Configure Python 3 as the default interpreter for PySpark under your cluster configuration

    [
      {
         "Classification": "spark-env",
         "Configurations": [
           {
             "Classification": "export",
             "Properties": {
                "PYSPARK_PYTHON": "/usr/bin/python3"
              }
           }
        ]
      }
    ]
    
  • Keep the bootstrap script below on S3 and reference it on your cluster.

    #!/usr/bin/env bash
    set -ex
    
    sudo pip-3.6 install awswrangler
    

Note

Make sure to freeze the Wrangler version in the bootstrap script for production environments (e.g. awswrangler==1.8.1).

From Source

>>> git clone https://github.com/awslabs/aws-data-wrangler.git
>>> cd aws-data-wrangler
>>> pip install .

API Reference

Amazon S3

copy_objects(paths, source_path, target_path)

Copy a list of S3 objects to another S3 directory.

delete_objects(path[, use_threads, …])

Delete Amazon S3 objects from a received S3 prefix or list of S3 objects paths.

describe_objects(path[, use_threads, …])

Describe Amazon S3 objects from a received S3 prefix or list of S3 objects paths.

does_object_exist(path[, boto3_session])

Check if object exists on S3.

get_bucket_region(bucket[, boto3_session])

Get bucket region name.

list_directories(path[, boto3_session])

List Amazon S3 objects from a prefix.

list_objects(path[, suffix, ignore_suffix, …])

List Amazon S3 objects from a prefix.

merge_datasets(source_path, target_path[, …])

Merge a source dataset into a target dataset.

read_csv(path[, path_suffix, …])

Read CSV file(s) from a received S3 prefix or list of S3 objects paths.

read_fwf(path[, path_suffix, …])

Read fixed-width formatted file(s) from a received S3 prefix or list of S3 objects paths.

read_json(path[, path_suffix, …])

Read JSON file(s) from a received S3 prefix or list of S3 objects paths.

read_parquet(path[, path_suffix, …])

Read Apache Parquet file(s) from a received S3 prefix or list of S3 objects paths.

read_parquet_metadata(path[, path_suffix, …])

Read Apache Parquet file(s) metadata from a received S3 prefix or list of S3 objects paths.

read_parquet_table(table, database[, …])

Read Apache Parquet table registered on AWS Glue Catalog.

size_objects(path[, use_threads, boto3_session])

Get the size (ContentLength) in bytes of Amazon S3 objects from a received S3 prefix or list of S3 objects paths.

store_parquet_metadata(path, database, table)

Infer and store parquet metadata on AWS Glue Catalog.

to_csv(df, path[, sep, index, columns, …])

Write CSV file or dataset on Amazon S3.

to_json(df, path[, boto3_session, …])

Write JSON file on Amazon S3.

to_parquet(df, path[, index, compression, …])

Write Parquet file or dataset on Amazon S3.

wait_objects_exist(paths[, delay, …])

Wait until Amazon S3 objects exist.

wait_objects_not_exist(paths[, delay, …])

Wait until Amazon S3 objects no longer exist.
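
A minimal sketch tying a few of these functions together (bucket and prefix are placeholders):

import awswrangler as wr
import pandas as pd

df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})

wr.s3.to_csv(df, "s3://bucket/prefix/file.csv", index=False)   # write a single CSV file
print(wr.s3.does_object_exist("s3://bucket/prefix/file.csv"))  # True
print(wr.s3.list_objects("s3://bucket/prefix/"))               # all keys under the prefix
print(wr.s3.size_objects("s3://bucket/prefix/"))               # {path: size in bytes}
df2 = wr.s3.read_csv("s3://bucket/prefix/file.csv")            # read it back
wr.s3.delete_objects("s3://bucket/prefix/")                    # clean up the prefix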

AWS Glue Catalog

add_column(database, table, column_name[, …])

Add a column to an AWS Glue Catalog table.

add_csv_partitions(database, table, …[, …])

Add partitions (metadata) to a CSV Table in the AWS Glue Catalog.

add_parquet_partitions(database, table, …)

Add partitions (metadata) to a Parquet Table in the AWS Glue Catalog.

create_csv_table(database, table, path, …)

Create a CSV Table (Metadata Only) in the AWS Glue Catalog.

create_database(name[, description, …])

Create a database in AWS Glue Catalog.

create_parquet_table(database, table, path, …)

Create a Parquet Table (Metadata Only) in the AWS Glue Catalog.

databases([limit, catalog_id, boto3_session])

Get a Pandas DataFrame with all listed databases.

delete_column(database, table, column_name)

Delete a column from an AWS Glue Catalog table.

delete_database(name[, catalog_id, …])

Delete a database from the AWS Glue Catalog.

delete_partitions(table, database, …[, …])

Delete the specified partitions in an AWS Glue Catalog table.

delete_all_partitions(table, database[, …])

Delete all partitions in an AWS Glue Catalog table.

delete_table_if_exists(database, table[, …])

Delete a Glue table if it exists.

does_table_exist(database, table[, …])

Check if the table exists.

drop_duplicated_columns(df)

Drop all repeated columns (duplicated names).

extract_athena_types(df[, index, …])

Extract columns and partitions types (Amazon Athena) from Pandas DataFrame.

get_columns_comments(database, table[, …])

Get all column comments.

get_csv_partitions(database, table[, …])

Get all partitions from a Table in the AWS Glue Catalog.

get_databases([catalog_id, boto3_session])

Get an iterator of databases.

get_parquet_partitions(database, table[, …])

Get all partitions from a Table in the AWS Glue Catalog.

get_partitions(database, table[, …])

Get all partitions from a Table in the AWS Glue Catalog.

get_table_description(database, table[, …])

Get table description.

get_table_location(database, table[, …])

Get table’s location on Glue catalog.

get_table_number_of_versions(database, table)

Get the total number of versions.

get_table_parameters(database, table[, …])

Get all parameters.

get_table_types(database, table[, boto3_session])

Get all columns and types from a table.

get_table_versions(database, table[, …])

Get all versions.

get_tables([catalog_id, database, …])

Get an iterator of tables.

overwrite_table_parameters(parameters, …)

Overwrite all existing parameters.

sanitize_column_name(column)

Convert the column name to be compatible with Amazon Athena.

sanitize_dataframe_columns_names(df)

Normalize all column names to be compatible with Amazon Athena.

sanitize_table_name(table)

Convert the table name to be compatible with Amazon Athena.

search_tables(text[, catalog_id, boto3_session])

Get Pandas DataFrame of tables filtered by a search string.

table(database, table[, catalog_id, …])

Get table details as Pandas DataFrame.

tables([limit, catalog_id, database, …])

Get a DataFrame with tables filtered by a search term, prefix, suffix.

upsert_table_parameters(parameters, …[, …])

Insert or Update the received parameters.
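
A short sketch of the catalog helpers (database and table names are placeholders):

import awswrangler as wr

wr.catalog.create_database(name="my_db")            # create a new Glue database
print(wr.catalog.databases())                       # Pandas DataFrame of all databases
print(wr.catalog.tables(database="my_db"))          # Pandas DataFrame of tables in my_db

if wr.catalog.does_table_exist(database="my_db", table="my_table"):
    print(wr.catalog.get_table_location(database="my_db", table="my_table"))
    print(wr.catalog.get_table_types(database="my_db", table="my_table"))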

Amazon Athena

create_athena_bucket([boto3_session])

Create the default Athena bucket if it doesn’t exist.

get_query_columns_types(query_execution_id)

Get the data type of all columns queried.

get_query_execution(query_execution_id[, …])

Fetch query execution details.

get_work_group(workgroup[, boto3_session])

Return information about the workgroup with the specified name.

read_sql_query(sql, database[, …])

Execute any SQL query on AWS Athena and return the results as a Pandas DataFrame.

read_sql_table(table, database[, …])

Extract a full table from AWS Athena and return the results as a Pandas DataFrame.

repair_table(table[, database, s3_output, …])

Run Hive's metastore consistency check: ‘MSCK REPAIR TABLE table;’.

start_query_execution(sql[, database, …])

Start a SQL Query against AWS Athena.

stop_query_execution(query_execution_id[, …])

Stop a query execution.

wait_query(query_execution_id[, boto3_session])

Wait for the query to end.
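
Beyond read_sql_query (shown in the Quick Start), a common pattern is to fire a query asynchronously and wait for it, or to pull a whole table. A sketch (database and table names are placeholders):

import awswrangler as wr

query_id = wr.athena.start_query_execution("SELECT COUNT(*) FROM my_table", database="my_db")
wr.athena.wait_query(query_execution_id=query_id)           # block until the query finishes

wr.athena.repair_table(table="my_table", database="my_db")  # MSCK REPAIR TABLE my_table
df = wr.athena.read_sql_table(table="my_table", database="my_db")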

Amazon Redshift

connect([connection, secret_id, catalog_id, …])

Return a redshift_connector connection from a Glue Catalog connection or Secrets Manager secret.

connect_temp(cluster_identifier, user[, …])

Return a redshift_connector temporary connection (No password required).

copy(df, path, con, table, schema[, …])

Load a Pandas DataFrame as a table on Amazon Redshift, using Parquet files on S3 as a stage.

copy_from_files(path, con, table, schema[, …])

Load Parquet files from S3 into a table on Amazon Redshift (through the COPY command).

read_sql_query(sql, con[, index_col, …])

Return a DataFrame corresponding to the result set of the query string.

read_sql_table(table, con[, schema, …])

Return a DataFrame corresponding to the table.

to_sql(df, con, table, schema[, mode, …])

Write records stored in a DataFrame into Redshift.

unload(sql, path, con[, iam_role, …])

Load a Pandas DataFrame from an Amazon Redshift query result, using Parquet files on S3 as a stage.

unload_to_files(sql, path, con[, iam_role, …])

Unload Parquet files to S3 from a Redshift query result (through the UNLOAD command).
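
A sketch of the stage-based load/unload helpers. The Glue connection name, S3 paths and IAM role are placeholders:

import awswrangler as wr
import pandas as pd

df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})
con = wr.redshift.connect("my-glue-connection")

wr.redshift.copy(            # DataFrame -> Parquet stage on S3 -> COPY into Redshift
    df=df,
    path="s3://bucket/stage/",
    con=con,
    table="my_table",
    schema="public",
)

df2 = wr.redshift.unload(    # UNLOAD to Parquet on S3, then read back into Pandas
    sql="SELECT * FROM public.my_table",
    path="s3://bucket/unload/",
    con=con,
    iam_role="arn:aws:iam::123456789012:role/MyRedshiftRole",  # placeholder
)
con.close()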

PostgreSQL

connect([connection, secret_id, catalog_id, …])

Return a pg8000 connection from a Glue Catalog Connection.

read_sql_query(sql, con[, index_col, …])

Return a DataFrame corresponding to the result set of the query string.

read_sql_table(table, con[, schema, …])

Return a DataFrame corresponding to the table.

to_sql(df, con, table, schema[, mode, …])

Write records stored in a DataFrame into PostgreSQL.

MySQL

connect([connection, secret_id, catalog_id, …])

Return a pymysql connection from a Glue Catalog Connection.

read_sql_query(sql, con[, index_col, …])

Return a DataFrame corresponding to the result set of the query string.

read_sql_table(table, con[, schema, …])

Return a DataFrame corresponding to the table.

to_sql(df, con, table, schema[, mode, …])

Write records stored in a DataFrame into MySQL.
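
The PostgreSQL and MySQL modules share the same four functions and usage pattern. A sketch with wr.postgresql (the Glue connection name, schema and table are placeholders); wr.mysql works the same way:

import awswrangler as wr
import pandas as pd

df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})

con = wr.postgresql.connect("my-glue-connection")
wr.postgresql.to_sql(df=df, con=con, table="my_table", schema="public", mode="overwrite")
df2 = wr.postgresql.read_sql_table(table="my_table", con=con, schema="public")
con.close()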

Amazon Timestream

create_database(database[, kms_key_id, …])

Create a new Timestream database.

create_table(database, table, …[, tags, …])

Create a new Timestream table.

delete_database(database[, boto3_session])

Delete a given Timestream database.

delete_table(database, table[, boto3_session])

Delete a given Timestream table.

query(sql[, boto3_session])

Run a query and retrieve the result as a Pandas DataFrame.

write(df, database, table, time_col, …[, …])

Store a Pandas DataFrame into an Amazon Timestream table.
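
Creating and dropping the resources used in the Quick Start might look like the sketch below; the retention parameter names (memory_retention_hours, magnetic_retention_days) are assumptions to double-check against the create_table signature.

import awswrangler as wr

wr.timestream.create_database("sampleDB")
wr.timestream.create_table(
    database="sampleDB",
    table="sampleTable",
    memory_retention_hours=1,    # assumed parameter name
    magnetic_retention_days=1,   # assumed parameter name
)

# ... write and query as shown in the Quick Start ...

wr.timestream.delete_table("sampleDB", "sampleTable")
wr.timestream.delete_database("sampleDB")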

Amazon EMR

build_spark_step(path[, deploy_mode, …])

Build the Step structure (dictionary).

build_step(command[, name, …])

Build the Step structure (dictionary).

create_cluster(subnet_id[, cluster_name, …])

Create an EMR cluster with an instance fleets configuration.

get_cluster_state(cluster_id[, boto3_session])

Get the EMR cluster state.

get_step_state(cluster_id, step_id[, …])

Get EMR step state.

submit_ecr_credentials_refresh(cluster_id, path)

Update internal ECR credentials.

submit_spark_step(cluster_id, path[, …])

Submit Spark Step.

submit_step(cluster_id, command[, name, …])

Submit a new job to the EMR cluster.

submit_steps(cluster_id, steps[, boto3_session])

Submit a list of steps.

terminate_cluster(cluster_id[, boto3_session])

Terminate EMR cluster.
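
A minimal sketch of the EMR helpers; the subnet ID and S3 path are placeholders, and create_cluster accepts many more optional arguments than shown here.

import awswrangler as wr

cluster_id = wr.emr.create_cluster(subnet_id="subnet-00000000000000000")  # placeholder subnet

step_id = wr.emr.submit_step(
    cluster_id=cluster_id,
    command="spark-submit --deploy-mode cluster s3://bucket/jobs/my_job.py",  # placeholder path
)

print(wr.emr.get_step_state(cluster_id, step_id))  # e.g. PENDING / RUNNING / COMPLETED
wr.emr.terminate_cluster(cluster_id)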

Amazon CloudWatch Logs

read_logs(query, log_group_names[, …])

Run a query against Amazon CloudWatch Logs Insights and convert the results to a Pandas DataFrame.

run_query(query, log_group_names[, …])

Run a query against Amazon CloudWatch Logs Insights and wait for the results.

start_query(query, log_group_names[, …])

Start a query against Amazon CloudWatch Logs Insights.

wait_query(query_id[, boto3_session])

Wait until the query ends.
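
A sketch using the CloudWatch Logs Insights query syntax (the log group name is a placeholder):

import awswrangler as wr

df = wr.cloudwatch.read_logs(
    query="fields @timestamp, @message | sort @timestamp desc | limit 5",
    log_group_names=["/aws/lambda/my-function"],  # placeholder log group
)
print(df)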

Amazon QuickSight

cancel_ingestion(ingestion_id[, …])

Cancel an ongoing ingestion of data into SPICE.

create_athena_data_source(name[, workgroup, …])

Create a QuickSight data source pointing to an Athena/Workgroup.

create_athena_dataset(name[, database, …])

Create a QuickSight dataset.

create_ingestion([dataset_name, dataset_id, …])

Create and start a new SPICE ingestion on a dataset.

delete_all_dashboards([account_id, …])

Delete all dashboards.

delete_all_data_sources([account_id, …])

Delete all data sources.

delete_all_datasets([account_id, boto3_session])

Delete all datasets.

delete_all_templates([account_id, boto3_session])

Delete all templates.

delete_dashboard([name, dashboard_id, …])

Delete a dashboard.

delete_data_source([name, data_source_id, …])

Delete a data source.

delete_dataset([name, dataset_id, …])

Delete a dataset.

delete_template([name, template_id, …])

Delete a template.

describe_dashboard([name, dashboard_id, …])

Describe a QuickSight dashboard by name or ID.

describe_data_source([name, data_source_id, …])

Describe a QuickSight data source by name or ID.

describe_data_source_permissions([name, …])

Describe a QuickSight data source permissions by name or ID.

describe_dataset([name, dataset_id, …])

Describe a QuickSight dataset by name or ID.

describe_ingestion(ingestion_id[, …])

Describe a QuickSight ingestion by ID.

get_dashboard_id(name[, account_id, …])

Get the QuickSight dashboard ID given a name, failing if there is more than one ID associated with this name.

get_dashboard_ids(name[, account_id, …])

Get QuickSight dashboard IDs given a name.

get_data_source_arn(name[, account_id, …])

Get the QuickSight data source ARN given a name, failing if there is more than one ARN associated with this name.

get_data_source_arns(name[, account_id, …])

Get QuickSight Data source ARNs given a name.

get_data_source_id(name[, account_id, …])

Get the QuickSight data source ID given a name, failing if there is more than one ID associated with this name.

get_data_source_ids(name[, account_id, …])

Get QuickSight data source IDs given a name.

get_dataset_id(name[, account_id, boto3_session])

Get the QuickSight dataset ID given a name, failing if there is more than one ID associated with this name.

get_dataset_ids(name[, account_id, …])

Get QuickSight dataset IDs given a name.

get_template_id(name[, account_id, …])

Get the QuickSight template ID given a name, failing if there is more than one ID associated with this name.

get_template_ids(name[, account_id, …])

Get QuickSight template IDs given a name.

list_dashboards([account_id, boto3_session])

List dashboards in an AWS account.

list_data_sources([account_id, boto3_session])

List summaries of all QuickSight data sources.

list_datasets([account_id, boto3_session])

List summaries of all QuickSight datasets.

list_groups([namespace, account_id, …])

List all QuickSight Groups.

list_group_memberships(group_name[, …])

List all QuickSight Group memberships.

list_iam_policy_assignments([status, …])

List IAM policy assignments in the current Amazon QuickSight account.

list_iam_policy_assignments_for_user(user_name)

List all the IAM policy assignments.

list_ingestions([dataset_name, dataset_id, …])

List the history of SPICE ingestions for a dataset.

list_templates([account_id, boto3_session])

List all QuickSight templates.

list_users([namespace, account_id, …])

Return a list of all of the Amazon QuickSight users belonging to this account.

list_user_groups(user_name[, namespace, …])

List the Amazon QuickSight groups that an Amazon QuickSight user is a member of.
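
A sketch of a typical flow: register an Athena data source, build a dataset on top of it and refresh SPICE. Names are placeholders, and the table/data_source_name arguments of create_athena_dataset are assumptions to verify against the function signatures.

import awswrangler as wr

wr.quicksight.create_athena_data_source(name="wrangler-source", workgroup="primary")

wr.quicksight.create_athena_dataset(
    name="wrangler-dataset",
    database="my_db",
    table="my_table",                    # assumed parameter
    data_source_name="wrangler-source",  # assumed parameter
)

wr.quicksight.create_ingestion(dataset_name="wrangler-dataset")  # refresh SPICE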

AWS STS

get_account_id([boto3_session])

Get Account ID.

get_current_identity_arn([boto3_session])

Get current user/role ARN.

get_current_identity_name([boto3_session])

Get current user/role name.
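
These are handy sanity checks before running anything else:

import awswrangler as wr

print(wr.sts.get_account_id())             # e.g. "123456789012"
print(wr.sts.get_current_identity_arn())   # ARN of the current user/role
print(wr.sts.get_current_identity_name())  # name of the current user/role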

AWS Secrets Manager

get_secret(name[, boto3_session])

Get secret value.

get_secret_json(name[, boto3_session])

Get JSON secret value.
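
A sketch (secret names are placeholders); get_secret returns the raw secret value, while get_secret_json parses a JSON secret into a dict:

import awswrangler as wr

token = wr.secretsmanager.get_secret("my-secret")
creds = wr.secretsmanager.get_secret_json("my-json-secret")
print(creds["username"])  # assumes the secret stores a "username" key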

Global Configurations

reset([item])

Reset one or all (if None is received) configuration values.

to_pandas()

Load all configurations on a Pandas DataFrame.
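
A sketch of how the global configuration is typically inspected and reset; setting values as attributes on wr.config (e.g. ctas_approach) is our assumption here and should be checked against the configuration docs.

import awswrangler as wr

wr.config.ctas_approach = False   # assumed attribute-style setter
print(wr.config.to_pandas())      # inspect all current configuration values
wr.config.reset()                 # reset everything back to the defaults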