34 - Distributing Calls Using Ray¶
AWS SDK for pandas supports distributing specific calls using Ray and Modin.
When enabled, data loading methods return modin dataframes instead of pandas dataframes. Modin provides seamless integration and compatibility with existing pandas code, with the benefit of distributing operations across your Ray instance and operating at a much larger scale.
[1]:
!pip install "awswrangler[modin,ray,redshift]"
Importing awswrangler when ray and modin are installed will automatically initialize a local Ray instance.
[2]:
import awswrangler as wr
import modin.pandas as pd
print(f"Execution Engine: {wr.engine.get()}")
print(f"Memory Format: {wr.memory_format.get()}")
2022-10-24 14:59:36,287 INFO worker.py:1518 -- Started a local Ray instance.
Execution Engine: EngineEnum.RAY
Memory Format: MemoryFormatEnum.MODIN
Read data at scale¶
Data is read using all cores on a single machine, or across multiple nodes on a cluster.
[3]:
df = wr.s3.read_parquet(path="s3://amazon-reviews-pds/parquet/product_category=Furniture/")
df.head(5)
Read progress: 100%|██████████| 10/10 [01:10<00:00, 7.03s/it]
UserWarning: When using a pre-initialized Ray cluster, please ensure that the runtime env sets environment variable __MODIN_AUTOIMPORT_PANDAS__ to 1
[3]:
| | marketplace | customer_id | review_id | product_id | product_parent | product_title | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_headline | review_body | review_date | year |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | US | 35680291 | R34O1VWWYVAU9A | B000MWFEV6 | 406798096 | Baxton Studio Full Leather Storage Bench Ottom... | 5 | 1 | 1 | N | Y | High quality and roomy | I bought this bench as a storage necessity as ... | 2009-05-17 | 2009 |
| 1 | US | 21000590 | RU1I9NHALXPW5 | B004C1RULU | 239421036 | Alera Fraze Series Leather High-Back Swivel/Ti... | 3 | 8 | 9 | N | Y | Do not judge the chair on the first day alone. | Received this chair really fast because I had ... | 2012-06-29 | 2012 |
| 2 | US | 12140069 | R2O8R9CLCUQTB8 | B000GFWQDI | 297104356 | Matching Cherry Printer Stand with Casters and... | 5 | 4 | 4 | N | Y | Printer stand made into printer / PC stand | I wanted to get my pc's off the floor and off ... | 2009-05-17 | 2009 |
| 3 | US | 23755701 | R12FOIKUUXPHBZ | B0055DOI50 | 39731200 | Marquette Bed | 5 | 6 | 6 | N | Y | Excellent Value!! | Great quality for the price. This bed is easy ... | 2012-06-29 | 2012 |
| 4 | US | 50735969 | RK0XUO7P40TK9 | B0026RH3X2 | 751769063 | Cape Craftsman Shutter 2-Door Cabinet | 3 | 12 | 12 | N | N | Nice, but not best quality | I love the design of this cabinet! It's a very... | 2009-05-17 | 2009 |
The data type is a Modin DataFrame:
[4]:
type(df)
[4]:
modin.pandas.dataframe.DataFrame
However, this type is interoperable with standard pandas calls:
[5]:
filtered_df = df[df.helpful_votes > 10]
excluded_columns = ["product_title", "review_headline", "review_body"]
filtered_df = filtered_df.loc[:, ~filtered_df.columns.isin(excluded_columns)]
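Because Modin mirrors the pandas API, the same filtering idiom can be tried out on a plain pandas frame. A minimal sketch with made-up data (column names borrowed from the review dataset above):

```python
import pandas as pd

# Tiny stand-in frame with a subset of the review columns
df = pd.DataFrame(
    {
        "product_title": ["Bench", "Chair", "Bed"],
        "review_headline": ["Great", "OK", "Excellent"],
        "review_body": ["...", "...", "..."],
        "helpful_votes": [1, 8, 25],
    }
)

# Same pattern as above: row filter, then drop the free-text columns
filtered = df[df.helpful_votes > 10]
excluded_columns = ["product_title", "review_headline", "review_body"]
filtered = filtered.loc[:, ~filtered.columns.isin(excluded_columns)]

print(filtered.columns.tolist())  # ['helpful_votes']
print(len(filtered))              # 1
```

Swapping `import pandas as pd` for `import modin.pandas as pd` leaves the snippet unchanged; only the execution backend differs.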
Enter your bucket name:
[6]:
bucket = "BUCKET_NAME"
Write data at scale¶
The write operation is parallelized, leading to significant speed-ups.
[7]:
result = wr.s3.to_parquet(
    filtered_df,
    path=f"s3://{bucket}/amazon-reviews/",
    dataset=True,
    dtype={"review_date": "timestamp"},
)
print(f"Data has been written to {len(result['paths'])} files")
Write Progress: 100%|██████████| 10/10 [00:21<00:00, 2.14s/it]
Data has been written to 10 files
Copy to Redshift at scale…¶
Data is first staged in S3, then a COPY command is executed against the Redshift cluster to load it. Both operations are distributed: the S3 write with Ray, and the COPY within the Redshift cluster.
[8]:
# Connect to the Redshift instance
con = wr.redshift.connect("aws-sdk-pandas-redshift")
path = f"s3://{bucket}/stage/"
iam_role = "IAM_ROLE"
schema = "public"
table = "amazon_reviews"
wr.redshift.copy(
    df=filtered_df,
    path=path,
    con=con,
    schema=schema,
    table=table,
    mode="overwrite",
    iam_role=iam_role,
    max_rows_by_file=None,
)
Repartition: 100%|██████████| 1/1 [00:00<00:00, 1.42it/s]
Write Progress: 100%|██████████| 1/1 [00:06<00:00, 6.19s/it]
… and UNLOAD it back¶
Parallel calls can also be leveraged when reading from the cluster. The UNLOAD command distributes query processing in Redshift to dump files in S3, which are then read in parallel into a dataframe.
[9]:
wr.redshift.unload(
    sql=f"SELECT * FROM {schema}.{table} where star_rating = 5",
    con=con,
    iam_role=iam_role,
    path=path,
    keep_files=True,
)
2022-10-20 11:20:02,369 WARNING read_api.py:291 -- ⚠️ The number of blocks in this dataset (2) limits its parallelism to 2 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.
Read progress: 100%|██████████| 2/2 [00:01<00:00, 1.41it/s]
[9]:
| | marketplace | customer_id | review_id | product_id | product_parent | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_date | year |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | US | 23875938 | RC5BC3HYUV324 | B000EPKLFA | 878266274 | 5 | 15 | 17 | N | Y | 2009-07-12 | 2009 |
| 1 | US | 22174246 | R3MFRIKP6HMH0W | B001NJ4J6I | 394928248 | 5 | 20 | 23 | N | Y | 2009-07-19 | 2009 |
| 2 | US | 52886745 | R1T9C0QELFI939 | B0012ZNNR4 | 364197484 | 5 | 32 | 33 | N | N | 2009-07-24 | 2009 |
| 3 | US | 14527742 | R2CIP31EO2GXDK | B000M5Z98G | 199037166 | 5 | 12 | 12 | N | Y | 2009-08-23 | 2009 |
| 4 | US | 41393002 | R29IOXB832QR6L | B0071HBVYE | 956030824 | 5 | 16 | 16 | N | Y | 2012-07-12 | 2012 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 16022 | US | 20481704 | R2KV325KBKDKL8 | B00G701H5E | 703622282 | 5 | 16 | 16 | N | N | 2014-11-06 | 2014 |
| 16023 | US | 37023256 | R1FJT6UF7KM8GV | B005VY8U8Y | 220718418 | 5 | 23 | 25 | N | Y | 2014-11-08 | 2014 |
| 16024 | US | 24286944 | R1RSIZBY4Z3PF2 | B00LNCDGKU | 934098561 | 5 | 47 | 49 | N | Y | 2014-11-14 | 2014 |
| 16025 | US | 15276457 | R31YFDIUQ2HI2X | B005KFHWPG | 310427061 | 5 | 19 | 20 | N | Y | 2014-11-15 | 2014 |
| 16026 | US | 52215985 | R11U6K1OIDEUKH | B00NEJ4Y4M | 22567782 | 5 | 62 | 67 | Y | N | 2014-11-16 | 2014 |

16027 rows × 12 columns
Find a needle in a haystack with S3 Select¶
[10]:
# Run S3 Select query against all objects in the category for a given customer ID
wr.s3.select_query(
    sql="SELECT * FROM s3object s where s.\"customer_id\" = '51624146'",
    path="s3://amazon-reviews-pds/parquet/product_category=Office_Products/*.parquet",
    input_serialization="Parquet",
    input_serialization_params={},
    scan_range_chunk_size=32 * 1024 * 1024,
)
UserWarning: When using a pre-initialized Ray cluster, please ensure that the runtime env sets environment variable __MODIN_AUTOIMPORT_PANDAS__ to 1
[10]:
marketplace | customer_id | review_id | product_id | product_parent | product_title | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_headline | review_body | review_date | year | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | US | 51624146 | RU9SWH8SHOBBS | B001ERDENS | 658861629 | LINKYO Compatible Toner Cartridge Replacement ... | 5 | 0 | 0 | N | Y | Perfect fit for my HP LaserJet M1522 nf | I will never buy "official" toner cart... | 2013-07-12 | 2013 |
1 | US | 51624146 | RAO9QADXC9TUH | B00GJQA4TG | 184072656 | SuperChalks White Liquid Chalk Marker Pens 4-P... | 4 | 0 | 0 | N | Y | Smooth flowing "ink, " but these markers left ... | Smooth flowing "ink," but these marker... | 2014-10-06 | 2014 |
2 | US | 51624146 | R1D94CA7TKY9DU | B000MK647G | 396184528 | Fax Toner Cartridge for Brother IntelliFax 575... | 5 | 0 | 0 | N | Y | Came quickly, works great | I bought four of these for my office. Just kno... | 2014-03-26 | 2014 |