AWS SDK for pandas

23 - Flexible Partitions Filter (PUSH-DOWN)

  • partition_filter argument:

    - Callback function applied to PARTITION columns (PUSH-DOWN filter).
    - The function MUST receive a single argument (Dict[str, str]) where keys are partition names and values are partition values.
    - The function MUST return a bool: True to read the partition, False to skip it.
    - Ignored if `dataset=False`.
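
The contract above can be sketched in isolation: the callback receives one dict per partition (keys and values are strings) and returns a bool. The sample dicts below are illustrative, not produced by awswrangler.

```python
from typing import Dict


def keep_partition(partitions: Dict[str, str]) -> bool:
    # Keep only partitions whose "value" column ends with "oo".
    return partitions["value"].endswith("oo")


# Partition dicts as the callback would receive them (all values are strings):
print(keep_partition({"value": "foo"}))  # True
print(keep_partition({"value": "bar"}))  # False
```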
    

P.S. Check the function API doc to see whether it has arguments that can be configured through Global configurations.

[1]:
import pandas as pd

import awswrangler as wr

Enter your bucket name:

[2]:
import getpass

bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"

Creating the Dataset (Parquet)

[3]:
df = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "value": ["foo", "boo", "bar"],
    }
)

wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite", partition_cols=["value"])

wr.s3.read_parquet(path, dataset=True)
[3]:
id value
0 3 bar
1 2 boo
2 1 foo

Parquet Example 1

[4]:
def my_filter(x):
    return x["value"].endswith("oo")


wr.s3.read_parquet(path, dataset=True, partition_filter=my_filter)
[4]:
id value
0 2 boo
1 1 foo
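Because the callback receives a Dict[str, str], partition values arrive as strings even for numeric partition columns, so any cast belongs inside the filter. A sketch with a hypothetical "year" partition column (not part of the dataset above):

```python
def my_filter(partitions):
    # Partition values are strings; cast before comparing numerically.
    return int(partitions["year"]) >= 2021


print(my_filter({"year": "2022"}))  # True
print(my_filter({"year": "2019"}))  # False
```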

Parquet Example 2

[5]:
from Levenshtein import distance


def my_filter(partitions):
    return distance("boo", partitions["value"]) <= 1


wr.s3.read_parquet(path, dataset=True, partition_filter=my_filter)
[5]:
id value
0 2 boo
1 1 foo
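The `Levenshtein` package used above is a third-party dependency. If it is not installed, an equivalent filter can be written with a small dynamic-programming edit distance; this is only a sketch, not part of awswrangler:

```python
def edit_distance(a: str, b: str) -> int:
    # Classic Wagner-Fischer DP, keeping a single rolling row.
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        curr = [i]
        for j, cb in enumerate(b, start=1):
            curr.append(
                min(
                    prev[j] + 1,  # deletion
                    curr[j - 1] + 1,  # insertion
                    prev[j - 1] + (ca != cb),  # substitution
                )
            )
        prev = curr
    return prev[-1]


def my_filter(partitions):
    return edit_distance("boo", partitions["value"]) <= 1


print(my_filter({"value": "foo"}))  # True ("boo" -> "foo" is one substitution)
print(my_filter({"value": "bar"}))  # False (two substitutions away)
```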

Creating the Dataset (CSV)

[6]:
df = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "value": ["foo", "boo", "bar"],
    }
)

wr.s3.to_csv(
    df=df, path=path, dataset=True, mode="overwrite", partition_cols=["value"], compression="gzip", index=False
)

wr.s3.read_csv(path, dataset=True)
[6]:
id value
0 3 bar
1 2 boo
2 1 foo

CSV Example 1

[7]:
def my_filter(x):
    return x["value"].endswith("oo")


wr.s3.read_csv(path, dataset=True, partition_filter=my_filter)
[7]:
id value
0 2 boo
1 1 foo

CSV Example 2

[8]:
from Levenshtein import distance


def my_filter(partitions):
    return distance("boo", partitions["value"]) <= 1


wr.s3.read_csv(path, dataset=True, partition_filter=my_filter)
[8]:
id value
0 2 boo
1 1 foo