AWS Data Wrangler

22 - Writing Partitions Concurrently

  • concurrent_partitioning argument:

    If True will increase the parallelism level during the partitions writing. It will decrease the
    writing time and increase the memory usage.
    

P.S. Check thefunction API docto see it has some argument that can be configured through Global configurations.

[1]:
%reload_ext memory_profiler

import awswrangler as wr

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
 ············

Reading 4 GB of CSV from NOAA’s historical data and creating a year column

[3]:
noaa_path = "s3://noaa-ghcn-pds/csv/193"

cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
dates = ["dt", "obs_time"]
dtype = {x: "category" for x in ["element", "m_flag", "q_flag", "s_flag"]}

df = wr.s3.read_csv(noaa_path, names=cols, parse_dates=dates, dtype=dtype)

df["year"] = df["dt"].dt.year

print(f"Number of rows: {len(df.index)}")
print(f"Number of columns: {len(df.columns)}")
Number of rows: 125407761
Number of columns: 9

Default Writing

[4]:
%%time
%%memit

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    partition_cols=["year"],
);
peak memory: 22169.04 MiB, increment: 11119.68 MiB
CPU times: user 49 s, sys: 12.5 s, total: 1min 1s
Wall time: 1min 11s

Concurrent Partitioning (Decreasing writing time, but increasing memory usage)

[5]:
%%time
%%memit

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    partition_cols=["year"],
    concurrent_partitioning=True  # <-----
);
peak memory: 27819.48 MiB, increment: 15743.30 MiB
CPU times: user 52.3 s, sys: 13.6 s, total: 1min 5s
Wall time: 41.6 s
[ ]: