awswrangler.timestream.write

awswrangler.timestream.write(df: DataFrame, database: str, table: str, time_col: str | None = None, measure_col: str | list[str | None] | None = None, dimensions_cols: list[str | None] | None = None, version: int = 1, time_unit: Literal['MILLISECONDS', 'SECONDS', 'MICROSECONDS', 'NANOSECONDS'] = 'MILLISECONDS', use_threads: bool | int = True, measure_name: str | None = None, common_attributes: dict[str, Any] | None = None, boto3_session: Session | None = None) → list[dict[str, str]]

Store a Pandas DataFrame into an Amazon Timestream table.

Note

When use_threads=True, the number of threads is taken from os.cpu_count().

If the Timestream service rejects one or more records, this function does not raise a Python exception. Instead, it returns the rejection information.

Note

If time_col is supplied, the column must be of type timestamp. time_unit defaults to MILLISECONDS. NANOSECONDS is not supported because Python datetime objects are limited to microsecond precision.
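Because of the timestamp requirement above, a column that holds timestamps as strings needs converting before the write. The following is a minimal sketch, assuming a hypothetical DataFrame whose "time" column arrives as ISO 8601 strings; the column names and values are illustrative only.

```python
import pandas as pd

# Hypothetical input: timestamps arrive as ISO 8601 strings, not datetimes.
df = pd.DataFrame(
    {
        "time": ["2024-01-01T00:00:00", "2024-01-01T00:00:01"],
        "measure": [1.0, 1.1],
    }
)

# Convert to a pandas datetime column so it can be passed as time_col.
df["time"] = pd.to_datetime(df["time"])

# Sanity check before handing the frame to wr.timestream.write.
is_timestamp = pd.api.types.is_datetime64_any_dtype(df["time"])
print(is_timestamp)
```

After the conversion, the frame can be passed to wr.timestream.write with time_col="time".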

Note

The following arguments are not supported in distributed mode with engine EngineEnum.RAY:

  • boto3_session

Parameters:
  • df (pandas.DataFrame) – Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html

  • database (str) – Amazon Timestream database name.

  • table (str) – Amazon Timestream table name.

  • time_col (str, optional) – DataFrame column name to be used as time. MUST be a timestamp column.

  • measure_col (str | List[str] | None) – DataFrame column name(s) to be used as measure.

  • dimensions_cols (list[str]) – List of DataFrame column names to be used as dimensions.

  • version (int) – Version number used for upserts. See https://docs.aws.amazon.com/timestream/latest/developerguide/API_WriteRecords.html

  • time_unit (str, optional) – Time unit for the time column. MILLISECONDS by default.

  • use_threads (bool | int) – True to enable concurrent writing, False to disable multiple threads. If True, os.cpu_count() is used as the number of threads. If an integer is provided, that number of threads is used.

  • measure_name (str, optional) – Name that represents the data attribute of the time series. Overrides measure_col if specified.

  • common_attributes (dict[str, Any], optional) – Dictionary of attributes shared across all records in the request. Using common attributes can optimize the cost of writes by reducing the size of request payloads. Values in common_attributes take precedence over all other arguments and data frame values. Dimension attributes are merged with attributes in record objects. Example: {"Dimensions": [{"Name": "device_id", "Value": "12345"}], "MeasureValueType": "DOUBLE"}.

  • boto3_session (boto3.Session(), optional) – Boto3 Session. If None, the default boto3 Session is used.
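The common_attributes dictionary described above can be assembled programmatically. The following is a minimal sketch: build_common_attributes is a hypothetical helper, not part of awswrangler, and it only reproduces the shape given in the parameter description.

```python
from typing import Any


def build_common_attributes(
    shared_dimensions: dict[str, str],
    measure_value_type: str = "DOUBLE",
) -> dict[str, Any]:
    """Build a common_attributes dict in the shape shown in the parameter docs.

    Hypothetical helper for illustration; not an awswrangler function.
    """
    return {
        # One {"Name": ..., "Value": ...} entry per shared dimension.
        "Dimensions": [
            {"Name": name, "Value": value}
            for name, value in shared_dimensions.items()
        ],
        "MeasureValueType": measure_value_type,
    }


# Reproduces the example dictionary from the parameter description.
common_attributes = build_common_attributes({"device_id": "12345"})
print(common_attributes)
```

The resulting dictionary can be passed as common_attributes=common_attributes in the call to wr.timestream.write.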

Returns:

Rejected records. Possible reasons for rejection are described here: https://docs.aws.amazon.com/timestream/latest/developerguide/API_RejectedRecord.html

Return type:

List[Dict[str, str]]

Examples

Store a Pandas DataFrame into an Amazon Timestream table.

>>> from datetime import datetime
>>> import awswrangler as wr
>>> import pandas as pd
>>> df = pd.DataFrame(
>>>     {
>>>         "time": [datetime.now(), datetime.now(), datetime.now()],
>>>         "dim0": ["foo", "boo", "bar"],
>>>         "dim1": [1, 2, 3],
>>>         "measure": [1.0, 1.1, 1.2],
>>>     }
>>> )
>>> rejected_records = wr.timestream.write(
>>>     df=df,
>>>     database="sampleDB",
>>>     table="sampleTable",
>>>     time_col="time",
>>>     measure_col="measure",
>>>     dimensions_cols=["dim0", "dim1"],
>>> )
>>> assert len(rejected_records) == 0

Return value if some records are rejected.

>>> [
>>>     {
>>>         'ExistingVersion': 2,
>>>         'Reason': 'The record version 1 is lower than the existing version 2. A '
>>>                   'higher version is required to update the measure value.',
>>>         'RecordIndex': 0
>>>     }
>>> ]
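Since rejections are returned rather than raised, the caller has to inspect the list explicitly. The following is a minimal sketch of one way to do that: summarize_rejections is a hypothetical helper, not part of awswrangler, and the sample list is copied from the return value shown above rather than produced by a real write.

```python
from collections import Counter


def summarize_rejections(rejected_records):
    """Count rejected records per rejection reason.

    Hypothetical helper for illustration; not an awswrangler function.
    """
    return Counter(
        record.get("Reason", "unknown") for record in rejected_records
    )


# Sample data copied from the documented return value, not from a live call.
rejected_records = [
    {
        "ExistingVersion": 2,
        "Reason": "The record version 1 is lower than the existing version 2. A "
                  "higher version is required to update the measure value.",
        "RecordIndex": 0,
    }
]

summary = summarize_rejections(rejected_records)
for reason, count in summary.items():
    print(f"{count} record(s) rejected: {reason}")
```

An empty list yields an empty Counter, so the same loop runs safely after a fully successful write.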