AWS Data Wrangler

11 - CSV Datasets

Wrangler has three different write modes for storing CSV datasets on Amazon S3.

  • append (Default)

    Only adds new files, never deleting existing ones.

  • overwrite

    Deletes everything in the target directory and then adds the new files.

  • overwrite_partitions (Partition Upsert)

    Deletes only the paths of the partitions being updated, then writes the new partition files. It works like a "partition upsert".
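The three modes differ only in which existing files are deleted before the new ones are written. As a rough sketch of those semantics in plain Python (no AWS calls; the partition prefixes and file names are made up):

```python
# Illustrative sketch of the three write modes (no S3 involved).
# `existing` and `new_files` map a partition prefix -> list of file names.
def write_dataset(existing, new_files, mode):
    if mode == "append":
        result = {p: files[:] for p, files in existing.items()}
    elif mode == "overwrite":
        result = {}  # everything under the target prefix is deleted first
    elif mode == "overwrite_partitions":
        # only the partitions being rewritten are deleted first
        result = {p: files[:] for p, files in existing.items() if p not in new_files}
    else:
        raise ValueError(f"unknown mode: {mode}")
    for p, files in new_files.items():
        result.setdefault(p, []).extend(files)
    return result

existing = {"date=2020-01-01/": ["a.csv"], "date=2020-01-02/": ["b.csv"]}
new = {"date=2020-01-02/": ["c.csv"], "date=2020-01-03/": ["d.csv"]}

print(write_dataset(existing, new, "append"))
print(write_dataset(existing, new, "overwrite"))
print(write_dataset(existing, new, "overwrite_partitions"))
```

Note how `overwrite_partitions` keeps `date=2020-01-01/` untouched while replacing the files under `date=2020-01-02/`.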

[1]:
from datetime import date
import awswrangler as wr
import pandas as pd

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"

Checking/Creating Glue Catalog Databases

[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")

Creating the Dataset

[4]:
df = pd.DataFrame({
    "id": [1, 2],
    "value": ["foo", "boo"],
    "date": [date(2020, 1, 1), date(2020, 1, 2)]
})

wr.s3.to_csv(
    df=df,
    path=path,
    index=False,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="csv_dataset"
)

wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[4]:
id value date
0 1 foo 2020-01-01
1 2 boo 2020-01-02

Appending

[5]:
df = pd.DataFrame({
    "id": [3],
    "value": ["bar"],
    "date": [date(2020, 1, 3)]
})

wr.s3.to_csv(
    df=df,
    path=path,
    index=False,
    dataset=True,
    mode="append",
    database="awswrangler_test",
    table="csv_dataset"
)

wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[5]:
id value date
0 3 bar 2020-01-03
1 1 foo 2020-01-01
2 2 boo 2020-01-02

Overwriting

[6]:
wr.s3.to_csv(
    df=df,
    path=path,
    index=False,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="csv_dataset"
)

wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[6]:
id value date
0 3 bar 2020-01-03

Creating a Partitioned Dataset

[7]:
df = pd.DataFrame({
    "id": [1, 2],
    "value": ["foo", "boo"],
    "date": [date(2020, 1, 1), date(2020, 1, 2)]
})

wr.s3.to_csv(
    df=df,
    path=path,
    index=False,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="csv_dataset",
    partition_cols=["date"]
)

wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[7]:
id value date
0 2 boo 2020-01-02
1 1 foo 2020-01-01
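With `partition_cols=["date"]`, the `date` column is dropped from the CSV payload and encoded in the S3 key instead, using Hive-style prefixes such as `date=2020-01-01/`. A local sketch of that layout with a pandas `groupby` (the `dataset/` prefix is made up):

```python
from datetime import date
import pandas as pd

df = pd.DataFrame({
    "id": [1, 2],
    "value": ["foo", "boo"],
    "date": [date(2020, 1, 1), date(2020, 1, 2)],
})

# Hive-style layout: one prefix per partition value; the partition
# column itself is removed from the file contents.
layout = {
    f"dataset/date={d}/": part.drop(columns="date").to_csv(index=False)
    for d, part in df.groupby("date")
}
for prefix, payload in layout.items():
    print(prefix, payload.splitlines())
```

Athena reads the partition value back from the key, which is why the query result above still shows the `date` column.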

Upserting partitions (overwrite_partitions)

[8]:
df = pd.DataFrame({
    "id": [2, 3],
    "value": ["xoo", "bar"],
    "date": [date(2020, 1, 2), date(2020, 1, 3)]
})

wr.s3.to_csv(
    df=df,
    path=path,
    index=False,
    dataset=True,
    mode="overwrite_partitions",
    database="awswrangler_test",
    table="csv_dataset",
    partition_cols=["date"]
)

wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[8]:
id value date
0 1 foo 2020-01-01
1 2 xoo 2020-01-02
0 3 bar 2020-01-03
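In terms of the resulting rows, the partition upsert above behaves like dropping the incoming partitions from the old data and concatenating the new ones, which can be sketched with pandas:

```python
from datetime import date
import pandas as pd

old = pd.DataFrame({
    "id": [1, 2],
    "value": ["foo", "boo"],
    "date": [date(2020, 1, 1), date(2020, 1, 2)],
})
new = pd.DataFrame({
    "id": [2, 3],
    "value": ["xoo", "bar"],
    "date": [date(2020, 1, 2), date(2020, 1, 3)],
})

# Keep only the old partitions that are not being rewritten,
# then add the incoming partitions.
kept = old[~old["date"].isin(new["date"])]
result = pd.concat([kept, new]).sort_values("date").reset_index(drop=True)
print(result)
```

The `2020-01-02` partition ends up with the new value (`xoo`), while `2020-01-01` survives untouched, matching the query result above.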

BONUS - Glue/Athena integration

[9]:
df = pd.DataFrame({
    "id": [1, 2],
    "value": ["foo", "boo"],
    "date": [date(2020, 1, 1), date(2020, 1, 2)]
})

wr.s3.to_csv(
    df=df,
    path=path,
    dataset=True,
    index=False,
    mode="overwrite",
    database="aws_data_wrangler",
    table="my_table",
    compression="gzip"
)

wr.athena.read_sql_query("SELECT * FROM my_table", database="aws_data_wrangler")
[9]:
id value date
0 1 foo 2020-01-01
1 2 boo 2020-01-02
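With `compression="gzip"`, each object written to S3 is a gzipped CSV file, which Athena decompresses transparently at query time. Pandas alone can produce the same file format; a local sketch writing to an in-memory buffer instead of S3:

```python
import gzip
import io
import pandas as pd

df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})

# pandas gzips the payload when compression="gzip" (works with binary
# file objects in pandas >= 1.2).
buf = io.BytesIO()
df.to_csv(buf, index=False, compression="gzip")

# Decompressing recovers the plain CSV text.
text = gzip.decompress(buf.getvalue()).decode()
print(text)
```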