awswrangler.s3.to_csv

awswrangler.s3.to_csv(df: pandas.core.frame.DataFrame, path: str, sep: str = ',', index: bool = True, columns: Optional[List[str]] = None, use_threads: bool = True, boto3_session: Optional[boto3.session.Session] = None, s3_additional_kwargs: Optional[Dict[str, Any]] = None, sanitize_columns: bool = False, dataset: bool = False, partition_cols: Optional[List[str]] = None, concurrent_partitioning: bool = False, mode: Optional[str] = None, catalog_versioning: bool = False, database: Optional[str] = None, table: Optional[str] = None, dtype: Optional[Dict[str, str]] = None, description: Optional[str] = None, parameters: Optional[Dict[str, str]] = None, columns_comments: Optional[Dict[str, str]] = None, regular_partitions: bool = True, projection_enabled: bool = False, projection_types: Optional[Dict[str, str]] = None, projection_ranges: Optional[Dict[str, str]] = None, projection_values: Optional[Dict[str, str]] = None, projection_intervals: Optional[Dict[str, str]] = None, projection_digits: Optional[Dict[str, str]] = None, catalog_id: Optional[str] = None, **pandas_kwargs: Any) → Any

Write CSV file or dataset on Amazon S3.

The concept of Dataset goes beyond the simple idea of ordinary files and enable more complex features like partitioning and catalog integration (Amazon Athena/AWS Glue Catalog).

Note

If database` and table arguments are passed, the table name and all column names will be automatically sanitized using wr.catalog.sanitize_table_name and wr.catalog.sanitize_column_name. Please, pass sanitize_columns=True to enforce this behaviour always.

Note

If dataset=True, pandas_kwargs will be ignored due restrictive quoting, date_format, escapechar, encoding, etc required by Athena/Glue Catalog.

Note

By now Pandas does not support in-memory CSV compression. https://github.com/pandas-dev/pandas/issues/22555 So the compression will not be supported on Wrangler too.

Note

On append mode, the parameters will be upsert on an existing table.

Note

In case of use_threads=True the number of threads that will be spawned will be gotten from os.cpu_count().

Note

This functions has arguments that can has default values configured globally through wr.config or environment variables:

  • catalog_id

  • concurrent_partitioning

  • database

Check out the Global Configurations Tutorial for details.

Parameters
  • df (pandas.DataFrame) – Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html

  • path (str) – Amazon S3 path (e.g. s3://bucket/filename.csv).

  • sep (str) – String of length 1. Field delimiter for the output file.

  • index (bool) – Write row names (index).

  • columns (List[str], optional) – Columns to write.

  • use_threads (bool) – True to enable concurrent requests, False to disable multiple threads. If enabled os.cpu_count() will be used as the max number of threads.

  • boto3_session (boto3.Session(), optional) – Boto3 Session. The default boto3 Session will be used if boto3_session receive None.

  • s3_additional_kwargs (Optional[Dict[str, Any]]) – Forward to botocore requests. Valid parameters: “ACL”, “Metadata”, “ServerSideEncryption”, “StorageClass”, “SSECustomerAlgorithm”, “SSECustomerKey”, “SSEKMSKeyId”, “SSEKMSEncryptionContext”, “Tagging”. e.g. s3_additional_kwargs={‘ServerSideEncryption’: ‘aws:kms’, ‘SSEKMSKeyId’: ‘YOUR_KMY_KEY_ARN’}

  • sanitize_columns (bool) – True to sanitize columns names or False to keep it as is. True value is forced if dataset=True.

  • dataset (bool) – If True store a parquet dataset instead of a ordinary file(s) If True, enable all follow arguments: partition_cols, mode, database, table, description, parameters, columns_comments, concurrent_partitioning, catalog_versioning, projection_enabled, projection_types, projection_ranges, projection_values, projection_intervals, projection_digits, catalog_id, schema_evolution.

  • partition_cols (List[str], optional) – List of column names that will be used to create partitions. Only takes effect if dataset=True.

  • concurrent_partitioning (bool) – If True will increase the parallelism level during the partitions writing. It will decrease the writing time and increase the memory usage. https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/022%20-%20Writing%20Partitions%20Concurrently.ipynb

  • mode (str, optional) – append (Default), overwrite, overwrite_partitions. Only takes effect if dataset=True. For details check the related tutorial: https://aws-data-wrangler.readthedocs.io/en/stable/stubs/awswrangler.s3.to_parquet.html#awswrangler.s3.to_parquet

  • catalog_versioning (bool) – If True and mode=”overwrite”, creates an archived version of the table catalog before updating it.

  • database (str, optional) – Glue/Athena catalog: Database name.

  • table (str, optional) – Glue/Athena catalog: Table name.

  • dtype (Dict[str, str], optional) – Dictionary of columns names and Athena/Glue types to be casted. Useful when you have columns with undetermined or mixed data types. (e.g. {‘col name’: ‘bigint’, ‘col2 name’: ‘int’})

  • description (str, optional) – Glue/Athena catalog: Table description

  • parameters (Dict[str, str], optional) – Glue/Athena catalog: Key/value pairs to tag the table.

  • columns_comments (Dict[str, str], optional) – Glue/Athena catalog: Columns names and the related comments (e.g. {‘col0’: ‘Column 0.’, ‘col1’: ‘Column 1.’, ‘col2’: ‘Partition.’}).

  • regular_partitions (bool) – Create regular partitions (Non projected partitions) on Glue Catalog. Disable when you will work only with Partition Projection. Keep enabled even when working with projections is useful to keep Redshift Spectrum working with the regular partitions.

  • projection_enabled (bool) – Enable Partition Projection on Athena (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html)

  • projection_types (Optional[Dict[str, str]]) – Dictionary of partitions names and Athena projections types. Valid types: “enum”, “integer”, “date”, “injected” https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html (e.g. {‘col_name’: ‘enum’, ‘col2_name’: ‘integer’})

  • projection_ranges (Optional[Dict[str, str]]) – Dictionary of partitions names and Athena projections ranges. https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html (e.g. {‘col_name’: ‘0,10’, ‘col2_name’: ‘-1,8675309’})

  • projection_values (Optional[Dict[str, str]]) – Dictionary of partitions names and Athena projections values. https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html (e.g. {‘col_name’: ‘A,B,Unknown’, ‘col2_name’: ‘foo,boo,bar’})

  • projection_intervals (Optional[Dict[str, str]]) – Dictionary of partitions names and Athena projections intervals. https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html (e.g. {‘col_name’: ‘1’, ‘col2_name’: ‘5’})

  • projection_digits (Optional[Dict[str, str]]) – Dictionary of partitions names and Athena projections digits. https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html (e.g. {‘col_name’: ‘1’, ‘col2_name’: ‘2’})

  • catalog_id (str, optional) – The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default.

  • pandas_kwargs – KEYWORD arguments forwarded to pandas.DataFrame.to_csv(). You can NOT pass pandas_kwargs explicit, just add valid Pandas arguments in the function call and Wrangler will accept it. e.g. wr.s3.to_csv(df, path, sep=’|’, na_rep=’NULL’, decimal=’,’) https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_csv.html

Returns

Dictionary with: ‘paths’: List of all stored files paths on S3. ‘partitions_values’: Dictionary of partitions added with keys as S3 path locations and values as a list of partitions values as str.

Return type

Dict[str, Union[List[str], Dict[str, List[str]]]]

Examples

Writing single file

>>> import awswrangler as wr
>>> import pandas as pd
>>> wr.s3.to_csv(
...     df=pd.DataFrame({'col': [1, 2, 3]}),
...     path='s3://bucket/prefix/my_file.csv',
... )
{
    'paths': ['s3://bucket/prefix/my_file.csv'],
    'partitions_values': {}
}

Writing single file with pandas_kwargs

>>> import awswrangler as wr
>>> import pandas as pd
>>> wr.s3.to_csv(
...     df=pd.DataFrame({'col': [1, 2, 3]}),
...     path='s3://bucket/prefix/my_file.csv',
...     sep='|',
...     na_rep='NULL',
...     decimal=','
... )
{
    'paths': ['s3://bucket/prefix/my_file.csv'],
    'partitions_values': {}
}

Writing single file encrypted with a KMS key

>>> import awswrangler as wr
>>> import pandas as pd
>>> wr.s3.to_csv(
...     df=pd.DataFrame({'col': [1, 2, 3]}),
...     path='s3://bucket/prefix/my_file.csv',
...     s3_additional_kwargs={
...         'ServerSideEncryption': 'aws:kms',
...         'SSEKMSKeyId': 'YOUR_KMY_KEY_ARN'
...     }
... )
{
    'paths': ['s3://bucket/prefix/my_file.csv'],
    'partitions_values': {}
}

Writing partitioned dataset

>>> import awswrangler as wr
>>> import pandas as pd
>>> wr.s3.to_csv(
...     df=pd.DataFrame({
...         'col': [1, 2, 3],
...         'col2': ['A', 'A', 'B']
...     }),
...     path='s3://bucket/prefix',
...     dataset=True,
...     partition_cols=['col2']
... )
{
    'paths': ['s3://.../col2=A/x.csv', 's3://.../col2=B/y.csv'],
    'partitions_values: {
        's3://.../col2=A/': ['A'],
        's3://.../col2=B/': ['B']
    }
}

Writing dataset to S3 with metadata on Athena/Glue Catalog.

>>> import awswrangler as wr
>>> import pandas as pd
>>> wr.s3.to_csv(
...     df=pd.DataFrame({
...         'col': [1, 2, 3],
...         'col2': ['A', 'A', 'B']
...     }),
...     path='s3://bucket/prefix',
...     dataset=True,
...     partition_cols=['col2'],
...     database='default',  # Athena/Glue database
...     table='my_table'  # Athena/Glue table
... )
{
    'paths': ['s3://.../col2=A/x.csv', 's3://.../col2=B/y.csv'],
    'partitions_values: {
        's3://.../col2=A/': ['A'],
        's3://.../col2=B/': ['B']
    }
}

Writing dataset casting empty column data type

>>> import awswrangler as wr
>>> import pandas as pd
>>> wr.s3.to_csv(
...     df=pd.DataFrame({
...         'col': [1, 2, 3],
...         'col2': ['A', 'A', 'B'],
...         'col3': [None, None, None]
...     }),
...     path='s3://bucket/prefix',
...     dataset=True,
...     database='default',  # Athena/Glue database
...     table='my_table'  # Athena/Glue table
...     dtype={'col3': 'date'}
... )
{
    'paths': ['s3://.../x.csv'],
    'partitions_values: {}
}