AWS SDK for pandas

27 - Amazon Timestream - Example 2

Reading test data

[1]:
from datetime import datetime

import pandas as pd

import awswrangler as wr

# Column layout of the sample CSV; the "ignore*" columns are read positionally
# but dropped via `usecols`.
sample_columns = [
    "ignore0",
    "region",
    "ignore1",
    "az",
    "ignore2",
    "hostname",
    "measure_kind",
    "measure",
    "ignore3",
    "ignore4",
    "ignore5",
]
kept_columns = ["region", "az", "hostname", "measure_kind", "measure"]

# Build the frame in one chain: every row gets the same `time` value, and the
# original row number is kept as an explicit "index" column (used later as a dimension).
df = (
    pd.read_csv(
        "https://raw.githubusercontent.com/aws/amazon-timestream-tools/master/sample_apps/data/sample.csv",
        names=sample_columns,
        usecols=kept_columns,
    )
    .assign(time=datetime.now())
    .reset_index(drop=False)
)

df
[1]:
index region az hostname measure_kind measure time
0 0 us-east-1 us-east-1a host-fj2hx cpu_utilization 21.394363 2020-12-08 16:18:47.599597
1 1 us-east-1 us-east-1a host-fj2hx memory_utilization 68.563420 2020-12-08 16:18:47.599597
2 2 us-east-1 us-east-1a host-6kMPE cpu_utilization 17.144579 2020-12-08 16:18:47.599597
3 3 us-east-1 us-east-1a host-6kMPE memory_utilization 73.507870 2020-12-08 16:18:47.599597
4 4 us-east-1 us-east-1a host-sxj7X cpu_utilization 26.584865 2020-12-08 16:18:47.599597
... ... ... ... ... ... ... ...
125995 125995 eu-north-1 eu-north-1c host-De8RB memory_utilization 68.063468 2020-12-08 16:18:47.599597
125996 125996 eu-north-1 eu-north-1c host-2z8tn memory_utilization 72.203680 2020-12-08 16:18:47.599597
125997 125997 eu-north-1 eu-north-1c host-2z8tn cpu_utilization 29.212219 2020-12-08 16:18:47.599597
125998 125998 eu-north-1 eu-north-1c host-9FczW memory_utilization 71.746134 2020-12-08 16:18:47.599597
125999 125999 eu-north-1 eu-north-1c host-9FczW cpu_utilization 1.677793 2020-12-08 16:18:47.599597

126000 rows × 7 columns

Creating resources

[2]:
# Create the target database and a table with deliberately short retention
# (1 hour in the memory store, 1 day in the magnetic store) for this demo.
wr.timestream.create_database(database="sampleDB")
wr.timestream.create_table(
    database="sampleDB",
    table="sampleTable",
    memory_retention_hours=1,
    magnetic_retention_days=1,
)

Write CPU_UTILIZATION records

[3]:
# Keep only CPU rows and name the value column after the measure, since
# `wr.timestream.write` below reads it via `measure_col="cpu_utilization"`.
df_cpu = df.loc[df["measure_kind"] == "cpu_utilization"].rename(
    columns={"measure": "cpu_utilization"}
)
df_cpu
[3]:
index region az hostname measure_kind cpu_utilization time
0 0 us-east-1 us-east-1a host-fj2hx cpu_utilization 21.394363 2020-12-08 16:18:47.599597
2 2 us-east-1 us-east-1a host-6kMPE cpu_utilization 17.144579 2020-12-08 16:18:47.599597
4 4 us-east-1 us-east-1a host-sxj7X cpu_utilization 26.584865 2020-12-08 16:18:47.599597
6 6 us-east-1 us-east-1a host-ExOui cpu_utilization 52.930970 2020-12-08 16:18:47.599597
8 8 us-east-1 us-east-1a host-Bwb3j cpu_utilization 99.134110 2020-12-08 16:18:47.599597
... ... ... ... ... ... ... ...
125990 125990 eu-north-1 eu-north-1c host-aPtc6 cpu_utilization 89.566125 2020-12-08 16:18:47.599597
125992 125992 eu-north-1 eu-north-1c host-7ZF9L cpu_utilization 75.510598 2020-12-08 16:18:47.599597
125994 125994 eu-north-1 eu-north-1c host-De8RB cpu_utilization 2.771261 2020-12-08 16:18:47.599597
125997 125997 eu-north-1 eu-north-1c host-2z8tn cpu_utilization 29.212219 2020-12-08 16:18:47.599597
125999 125999 eu-north-1 eu-north-1c host-9FczW cpu_utilization 1.677793 2020-12-08 16:18:47.599597

63000 rows × 7 columns

[4]:
# "index" is presumably included so each record gets a distinct dimension set,
# since every row shares the single timestamp assigned when the data was loaded.
cpu_dimensions = ["index", "region", "az", "hostname"]

rejected_records = wr.timestream.write(
    df=df_cpu,
    database="sampleDB",
    table="sampleTable",
    time_col="time",
    measure_col="cpu_utilization",
    dimensions_cols=cpu_dimensions,
)

# `write` returns the rejected records; none are expected here.
assert len(rejected_records) == 0

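Batch load MEMORY_UTILIZATION records

Batch loading stages the data in Amazon S3, so replace the placeholder S3 locations in the next cell with buckets you own before running it.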

[5]:
# Keep only memory rows and rename the value column to match
# `measure_cols=["memory_utilization"]` in the batch load below.
df_memory = df.loc[df["measure_kind"] == "memory_utilization"].rename(
    columns={"measure": "memory_utilization"}
)

df_memory
[5]:
index region az hostname measure_kind memory_utilization time
1 1 us-east-1 us-east-1a host-fj2hx memory_utilization 68.563420 2020-12-08 16:18:47.599597
3 3 us-east-1 us-east-1a host-6kMPE memory_utilization 73.507870 2020-12-08 16:18:47.599597
5 5 us-east-1 us-east-1a host-sxj7X memory_utilization 22.401424 2020-12-08 16:18:47.599597
7 7 us-east-1 us-east-1a host-ExOui memory_utilization 45.440135 2020-12-08 16:18:47.599597
9 9 us-east-1 us-east-1a host-Bwb3j memory_utilization 15.042701 2020-12-08 16:18:47.599597
... ... ... ... ... ... ... ...
125991 125991 eu-north-1 eu-north-1c host-aPtc6 memory_utilization 75.686739 2020-12-08 16:18:47.599597
125993 125993 eu-north-1 eu-north-1c host-7ZF9L memory_utilization 18.386152 2020-12-08 16:18:47.599597
125995 125995 eu-north-1 eu-north-1c host-De8RB memory_utilization 68.063468 2020-12-08 16:18:47.599597
125996 125996 eu-north-1 eu-north-1c host-2z8tn memory_utilization 72.203680 2020-12-08 16:18:47.599597
125998 125998 eu-north-1 eu-north-1c host-9FczW memory_utilization 71.746134 2020-12-08 16:18:47.599597

63000 rows × 7 columns

[6]:
# Batch-load the memory records through S3.
# NOTE: both S3 locations below are placeholders; replace them with buckets you own.
#   - `path` is the S3 prefix awswrangler uses to stage `df_memory` for the load task.
#   - `report_s3_configuration` is where the load task writes its error report.
response = wr.timestream.batch_load(
    df=df_memory,
    path="s3://bucket/prefix/",
    database="sampleDB",
    table="sampleTable",
    time_col="time",
    measure_cols=["memory_utilization"],
    dimensions_cols=["index", "region", "az", "hostname"],
    # The "measure_kind" column holds "memory_utilization" for every row here,
    # so that becomes the measure name queried later.
    measure_name_col="measure_kind",
    # S3 bucket names may not contain underscores, so the placeholder uses a hyphen.
    report_s3_configuration={"BucketName": "error-bucket", "ObjectKeyPrefix": "error_prefix"},
)

# Check every failure counter in the task's progress report, not only record
# ingestion: parse and file failures would otherwise pass silently.
progress_report = response["BatchLoadTaskDescription"]["ProgressReport"]
failures = {
    key: progress_report.get(key, 0)
    for key in ("ParseFailures", "RecordIngestionFailures", "FileFailures")
}
assert not any(failures.values()), f"Batch load reported failures: {failures}"

Querying CPU_UTILIZATION

[7]:
# Ten most recent CPU measurements written by `wr.timestream.write` above.
cpu_query = """
    SELECT
        hostname, region, az, measure_name, measure_value::double, time
    FROM "sampleDB"."sampleTable"
    WHERE measure_name = 'cpu_utilization'
    ORDER BY time DESC
    LIMIT 10
"""
wr.timestream.query(cpu_query)
[7]:
hostname region az measure_name measure_value::double time
0 host-OgvFx us-west-1 us-west-1a cpu_utilization 39.617911 2020-12-08 19:18:47.600
1 host-rZUNx eu-north-1 eu-north-1a cpu_utilization 30.793332 2020-12-08 19:18:47.600
2 host-t1kAB us-east-2 us-east-2b cpu_utilization 74.453239 2020-12-08 19:18:47.600
3 host-RdQRf us-east-1 us-east-1c cpu_utilization 76.984448 2020-12-08 19:18:47.600
4 host-4Llhu us-east-1 us-east-1c cpu_utilization 41.862733 2020-12-08 19:18:47.600
5 host-2plqa us-west-1 us-west-1a cpu_utilization 34.864762 2020-12-08 19:18:47.600
6 host-J3Q4z us-east-1 us-east-1b cpu_utilization 71.574266 2020-12-08 19:18:47.600
7 host-VIR5T ap-east-1 ap-east-1a cpu_utilization 14.017491 2020-12-08 19:18:47.600
8 host-G042D us-east-1 us-east-1c cpu_utilization 60.199068 2020-12-08 19:18:47.600
9 host-8EBHm us-west-2 us-west-2c cpu_utilization 96.631624 2020-12-08 19:18:47.600

Querying MEMORY_UTILIZATION

[8]:
# Ten most recent memory measurements ingested by the batch load above.
memory_query = """
    SELECT
        hostname, region, az, measure_name, measure_value::double, time
    FROM "sampleDB"."sampleTable"
    WHERE measure_name = 'memory_utilization'
    ORDER BY time DESC
    LIMIT 10
"""
wr.timestream.query(memory_query)
[8]:
hostname region az measure_name measure_value::double time
0 host-7c897 us-west-2 us-west-2b memory_utilization 63.427726 2020-12-08 19:18:47.600
1 host-2z8tn eu-north-1 eu-north-1c memory_utilization 41.071368 2020-12-08 19:18:47.600
2 host-J3Q4z us-east-1 us-east-1b memory_utilization 23.944388 2020-12-08 19:18:47.600
3 host-mjrQb us-east-1 us-east-1b memory_utilization 69.173431 2020-12-08 19:18:47.600
4 host-AyWSI us-east-1 us-east-1c memory_utilization 75.591467 2020-12-08 19:18:47.600
5 host-Axf0g us-west-2 us-west-2a memory_utilization 29.720739 2020-12-08 19:18:47.600
6 host-ilMBa us-east-2 us-east-2b memory_utilization 71.544134 2020-12-08 19:18:47.600
7 host-CWdXX us-west-2 us-west-2c memory_utilization 79.792799 2020-12-08 19:18:47.600
8 host-8EBHm us-west-2 us-west-2c memory_utilization 66.082554 2020-12-08 19:18:47.600
9 host-dRIJj us-east-1 us-east-1c memory_utilization 86.748960 2020-12-08 19:18:47.600

Deleting resources

[9]:
# Tear down in dependency order: Timestream only deletes a database once its tables are gone.
wr.timestream.delete_table(database="sampleDB", table="sampleTable")
wr.timestream.delete_database(database="sampleDB")