AWS SDK for pandas

12 - CSV Crawler

awswrangler can extract only the metadata from a Pandas DataFrame and then add it to the Glue Catalog as a table.
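
At a glance, the flow is: infer Athena-compatible column types from the DataFrame with wr.catalog.extract_athena_types, then register them in the Glue Catalog with wr.catalog.create_csv_table. A minimal sketch of that flow, using placeholder bucket, database, and table names:

import pandas as pd

import awswrangler as wr

df = pd.DataFrame({"id": [1, 2], "value": ["a", "b"]})

# Infer Athena/Glue column types from the DataFrame (no data is written)
columns_types, partitions_types = wr.catalog.extract_athena_types(
    df=df, file_format="csv", index=False
)

# Register the schema in the Glue Catalog as a CSV table
wr.catalog.create_csv_table(
    database="my_database",  # placeholder
    table="my_table",  # placeholder
    path="s3://my-bucket/my_table/",  # placeholder
    columns_types=columns_types,
    partitions_types=partitions_types,
)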

[1]:
from datetime import datetime

import pandas as pd

import awswrangler as wr

Enter your bucket name:

[2]:
import getpass

bucket = getpass.getpass()
path = f"s3://{bucket}/csv_crawler/"

Creating a Pandas DataFrame

[3]:
ts = lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f")  # noqa
dt = lambda x: datetime.strptime(x, "%Y-%m-%d").date()  # noqa

df = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "string": ["foo", None, "boo"],
        "float": [1.0, None, 2.0],
        "date": [dt("2020-01-01"), None, dt("2020-01-02")],
        "timestamp": [ts("2020-01-01 00:00:00.0"), None, ts("2020-01-02 00:00:01.0")],
        "bool": [True, None, False],
        "par0": [1, 1, 2],
        "par1": ["a", "b", "b"],
    }
)

df
[3]:
   id string  float        date            timestamp   bool  par0 par1
0   1    foo    1.0  2020-01-01  2020-01-01 00:00:00   True     1    a
1   2   None    NaN        None                  NaT   None     1    b
2   3    boo    2.0  2020-01-02  2020-01-02 00:00:01  False     2    b

Extracting the metadata

[4]:
columns_types, partitions_types = wr.catalog.extract_athena_types(
    df=df, file_format="csv", index=False, partition_cols=["par0", "par1"]
)
[5]:
columns_types
[5]:
{'id': 'bigint',
 'string': 'string',
 'float': 'double',
 'date': 'date',
 'timestamp': 'timestamp',
 'bool': 'boolean'}
[6]:
partitions_types
[6]:
{'par0': 'bigint', 'par1': 'string'}

Creating the table

[7]:
wr.catalog.create_csv_table(
    table="csv_crawler",
    database="awswrangler_test",
    path=path,
    partitions_types=partitions_types,
    columns_types=columns_types,
)

Checking

[8]:
wr.catalog.table(database="awswrangler_test", table="csv_crawler")
[8]:
  Column Name       Type  Partition Comment
0          id     bigint      False
1      string     string      False
2       float     double      False
3        date       date      False
4   timestamp  timestamp      False
5        bool    boolean      False
6        par0     bigint       True
7        par1     string       True
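
To confirm programmatically that the registered schema matches what was extracted, you can compare the Catalog types against the merged metadata from the extraction step. A quick sketch using functions already shown in this notebook:

# All column types registered in the Glue Catalog, including partition columns
registered = wr.catalog.get_table_types(database="awswrangler_test", table="csv_crawler")

# extract_athena_types split partition columns out, so merge them back before comparing
assert registered == {**columns_types, **partitions_types}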

We can still use the extracted metadata to enforce consistent data types when writing new data. The DataFrame below deliberately mixes up types; passing dtype=columns_types makes wr.s3.to_csv cast the values back to the table schema.

[9]:
df = pd.DataFrame(
    {
        "id": [1],
        "string": ["1"],
        "float": [1],
        "date": [ts("2020-01-01 00:00:00.0")],
        "timestamp": [dt("2020-01-02")],
        "bool": [1],
        "par0": [1],
        "par1": ["a"],
    }
)

df
[9]:
   id string  float        date   timestamp  bool  par0 par1
0   1      1      1  2020-01-01  2020-01-02     1     1    a
[10]:
res = wr.s3.to_csv(
    df=df,
    path=path,
    index=False,
    dataset=True,
    database="awswrangler_test",
    table="csv_crawler",
    partition_cols=["par0", "par1"],
    dtype=columns_types,
)

You can also extract the metadata directly from the Glue Catalog if you want:

[11]:
dtype = wr.catalog.get_table_types(database="awswrangler_test", table="csv_crawler")
[12]:
res = wr.s3.to_csv(
    df=df,
    path=path,
    index=False,
    dataset=True,
    database="awswrangler_test",
    table="csv_crawler",
    partition_cols=["par0", "par1"],
    dtype=dtype,
)

Checking out

[13]:
df = wr.athena.read_sql_table(database="awswrangler_test", table="csv_crawler")

df
[13]:
   id string  float  date   timestamp  bool  par0 par1
0   1      1    1.0  None  2020-01-02  True     1    a
1   1      1    1.0  None  2020-01-02  True     1    a
[14]:
df.dtypes
[14]:
id                    Int64
string               string
float               float64
date                 object
timestamp    datetime64[ns]
bool                boolean
par0                  Int64
par1                 string
dtype: object

Cleaning Up S3

[15]:
wr.s3.delete_objects(path)

Cleaning Up the Database

[16]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="csv_crawler")
[16]:
True