AWS Data Wrangler

23 - Flexible Partitions Filter (PUSH-DOWN)

  • partition_filter argument:

    - Callback function used to filter on PARTITION columns (PUSH-DOWN filter).
    - This function MUST receive a single argument (Dict[str, str]) where keys are partition names and values are partition values.
    - This function MUST return a bool: True to read the partition or False to ignore it.
    - Ignored if `dataset=False`.
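
To make the callback contract above concrete, here is a minimal local sketch that needs no AWS access. It assumes the dataset uses Hive-style `name=value` prefixes in its object keys; `parse_partitions` and `select_keys` are hypothetical helpers written for illustration and are not part of awswrangler, and the keys are made up.

```python
from typing import Callable, Dict, List


def parse_partitions(key: str) -> Dict[str, str]:
    """Collect Hive-style name=value path segments into a dict of strings."""
    partitions: Dict[str, str] = {}
    for segment in key.split("/")[:-1]:  # skip the file name itself
        if "=" in segment:
            name, value = segment.split("=", 1)
            partitions[name] = value
    return partitions


def select_keys(
    keys: List[str],
    partition_filter: Callable[[Dict[str, str]], bool],
) -> List[str]:
    """Keep only the keys whose partition dict the callback accepts."""
    return [key for key in keys if partition_filter(parse_partitions(key))]


# Hypothetical object keys (assumption: one prefix per partition value).
keys = [
    "dataset/value=foo/part-0.parquet",
    "dataset/value=boo/part-0.parquet",
    "dataset/value=bar/part-0.parquet",
]

selected = select_keys(keys, lambda x: x["value"].endswith("oo"))
print(selected)
# ['dataset/value=foo/part-0.parquet', 'dataset/value=boo/part-0.parquet']
```

The point of the sketch is that the decision is made from the partition names and values alone, so a rejected partition never has to be opened.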
    

P.S. Check the function API doc to see if it has any arguments that can be configured through Global configurations.

[1]:
import awswrangler as wr
import pandas as pd

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"
 ············

Creating the Dataset (PARQUET)

[3]:
df = pd.DataFrame({
    "id": [1, 2, 3],
    "value": ["foo", "boo", "bar"],
})

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    partition_cols=["value"]
)

wr.s3.read_parquet(path, dataset=True)
[3]:
   id value
0   3   bar
1   2   boo
2   1   foo

Example 1

[4]:
my_filter = lambda x: x["value"].endswith("oo")

wr.s3.read_parquet(path, dataset=True, partition_filter=my_filter)
[4]:
   id value
0   2   boo
1   1   foo
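
Because the callback receives every partition value as a string (Dict[str, str]), numeric partitions need an explicit cast before they are compared. The sketch below uses hypothetical partition columns `year` and `month`, which are not part of the dataset in this tutorial, and exercises the filter on plain dictionaries rather than on S3.

```python
from typing import Dict


def recent_only(partitions: Dict[str, str]) -> bool:
    """Accept partitions from July onward in 2021 or any later year."""
    year = int(partitions["year"])    # values arrive as str, so cast first
    month = int(partitions["month"])
    return year >= 2021 and month >= 7


# Hypothetical partition dicts, shaped like the callback's single argument.
candidates = [
    {"year": "2020", "month": "12"},
    {"year": "2021", "month": "3"},
    {"year": "2021", "month": "10"},
]

kept = [p for p in candidates if recent_only(p)]
print(kept)
# [{'year': '2021', 'month': '10'}]

# Without the cast the comparison is lexicographic and gives the wrong answer.
print("10" >= "7")
# False
```

A function like this could be passed as `partition_filter=recent_only` when reading a dataset that is partitioned by those columns.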

Example 2

[5]:
from Levenshtein import distance


def my_filter(partitions):
    return distance("boo", partitions["value"]) <= 1


wr.s3.read_parquet(path, dataset=True, partition_filter=my_filter)
[5]:
   id value
0   2   boo
1   1   foo
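
To see why this filter keeps `boo` and `foo` but drops `bar`, the edit distances can be checked locally. The sketch below is a plain-Python stand-in for `Levenshtein.distance`, assuming the standard unit-cost definition (insert, delete, substitute); `edit_distance` is a hypothetical helper, not the library's implementation.

```python
def edit_distance(a: str, b: str) -> int:
    """Levenshtein distance via the classic row-by-row dynamic program."""
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            cost = 0 if char_a == char_b else 1
            current.append(min(
                previous[j] + 1,         # deletion
                current[j - 1] + 1,      # insertion
                previous[j - 1] + cost,  # substitution (or match)
            ))
        previous = current
    return previous[-1]


distances = {value: edit_distance("boo", value) for value in ["foo", "boo", "bar"]}
print(distances)
# {'foo': 1, 'boo': 0, 'bar': 2}
```

Only `bar` is more than one edit away from `boo`, so it is the one partition the filter rejects.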

Creating the Dataset (CSV)

[6]:
df = pd.DataFrame({
    "id": [1, 2, 3],
    "value": ["foo", "boo", "bar"],
})

wr.s3.to_csv(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    partition_cols=["value"],
    compression="gzip",
    index=False
)

wr.s3.read_csv(path, dataset=True)
[6]:
   id value
0   3   bar
1   2   boo
2   1   foo

Example 1

[7]:
my_filter = lambda x: x["value"].endswith("oo")

wr.s3.read_csv(path, dataset=True, partition_filter=my_filter)
[7]:
   id value
0   2   boo
1   1   foo

Example 2

[8]:
from Levenshtein import distance


def my_filter(partitions):
    return distance("boo", partitions["value"]) <= 1


wr.s3.read_csv(path, dataset=True, partition_filter=my_filter)
[8]:
   id value
0   2   boo
1   1   foo