May 04, 2023
Scale From Local to Distributed Cloud Compute with Ibis, PySpark, and Amazon EMR
Philip Moore
Ibis is quickly gaining steam as the definitive Python data analytics library due to its ability to leverage different data platforms as execution backends. This differs from pandas, which also provides a data API, but executes workloads locally. Ibis is built with the principle of separating the data API from execution. This lets developers write their data workflows once, and port execution to different backends as needed.
Apache Spark is a ubiquitous open source distributed computing platform used in data workloads everywhere. Amazon Elastic MapReduce (EMR) is a popular platform for provisioning ephemeral Spark clusters in the cloud.
Ibis has a PySpark backend that can run on Spark clusters (including EMR), bringing cloud-distributed computing to workloads that process large datasets.
Data Workload Development Lifecycle
Many data workloads begin the development phase on a data scientist’s laptop, working with small data samples that reflect a small portion of the overall dataset.
However, even with a subset that reflects the production schema, transitioning from the sample size to the full dataset can result in time-consuming tuning and code rewrites.
Ibis makes developing these types of workloads much easier. Developers can begin with the local DuckDB backend and then almost seamlessly “promote” the workload to a larger computing framework such as Spark with minimal code changes. This lets them use distributed computing to divide and conquer a production dataset that is too large to process locally.
Let’s look at an example workload that processes the publicly available New York City Taxi Cab dataset. First, we will see the workload as it would run on a developer’s workstation, then we will see it as it would run in PySpark with distributed compute.
Setup
The code examples shared in this article are available in this repo:
https://github.com/voltrondata/ibis-taxi-demo/
To run the examples in this article you must have an AWS account and have installed the AWS command line client (AWS CLI). Follow these steps:
1- Open a terminal with the bash shell and run the following commands:
# Clone the repo
git clone https://github.com/voltrondata/ibis-taxi-demo
# Go to the repo root directory
cd ibis-taxi-demo
# Create a Python 3.8+ virtual environment
python3 -m venv ./venv
# Activate the virtual environment
. ./venv/bin/activate
# Upgrade pip
pip install --upgrade pip
# Install requirements (ibis-framework, etc.)
pip install --requirement requirements.txt
2- Create a file called .env in your local repo’s scripts directory. You can do that with:
touch scripts/.env
3- Authenticate to AWS, and copy the “AWS_” environment variable credentials from the portal.
4- Paste the environment variable credentials, along with your preference for “AWS_DEFAULT_REGION”, into the .env file. Example contents:
export AWS_ACCESS_KEY_ID="put value from AWS here"
export AWS_SECRET_ACCESS_KEY="put value from AWS here"
export AWS_SESSION_TOKEN="put value from AWS here"
export AWS_DEFAULT_REGION="us-east-1"
Note: You’ll have to refresh the contents of the .env file when your token expires.
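As a quick sanity check before running the AWS scripts, the .env contents can be parsed in Python to confirm that all four variables are present. This is a minimal sketch using only the standard library; parse_env_text and missing_aws_names are hypothetical helper names and are not part of the repo.

```python
import shlex

# The four variable names shown in the example .env contents above
REQUIRED_NAMES = (
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_SESSION_TOKEN",
    "AWS_DEFAULT_REGION",
)


def parse_env_text(text):
    """Parse lines of the form: export NAME="value" into a dict."""
    values = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        if line.startswith("export "):
            line = line[len("export "):]
        # shlex applies shell-style quoting rules, so NAME="a b" is one token
        tokens = shlex.split(line)
        if len(tokens) != 1 or "=" not in tokens[0]:
            continue  # not a simple NAME=value assignment
        name, _sep, value = tokens[0].partition("=")
        values[name] = value
    return values


def missing_aws_names(values):
    """Return the required names that are absent or empty, in a fixed order."""
    return [name for name in REQUIRED_NAMES if not values.get(name)]


example = 'export AWS_DEFAULT_REGION="us-east-1"\n'
print(missing_aws_names(parse_env_text(example)))
```

To check a real file, pass the text of scripts/.env (for example, read with pathlib.Path.read_text) to parse_env_text. An empty list from missing_aws_names means every expected variable has a value; it does not verify that the token is still valid.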
Running Locally
Let’s start with a typical step in the data workflow development lifecycle - local development.
First, download a sample of the taxi dataset to the local ./data folder (it is about a 443MB download):
pushd scripts
./get_data.sh
popd
Now that you have the sample data, you can run the Ibis workload locally using the local DuckDB backend.
The file ibis_demo_duckdb.py contains the following code:
import ibis
from ibis import _
import pandas as pd
from pathlib import Path
# Setup pandas
pd.set_option("display.width", 0)
pd.set_option("display.max_columns", 99)
pd.set_option("display.max_colwidth", None)
pd.set_option("display.float_format", '{:,.2f}'.format)
# Setup some Path constants
SCRIPT_DIR = Path(__file__).parent.resolve()
LOCAL_DATA_DIR = (SCRIPT_DIR / "data").resolve()
# Get our DuckDB connection (to an ephemeral in-memory database)
con = ibis.duckdb.connect(threads=4, memory_limit="1GB")
# This assumes you've run the "get_data.sh" shell script to download the parquet file locally
trip_data = con.read_parquet((LOCAL_DATA_DIR / "fhvhv_tripdata_2022-11.parquet").as_posix())
# Create new metric for total cost of a trip
trip_data = trip_data.mutate(total_amount=_.base_passenger_fare + _.tolls + _.sales_tax + _.congestion_surcharge + _.tips)
# Aggregate the trip data, grouping by license number
trip_summary = (
    trip_data.group_by([_.hvfhs_license_num])
    .aggregate(
        trip_count=_.count(),
        trip_miles_total=_.trip_miles.sum(),
        trip_miles_avg=_.trip_miles.mean(),
        cost_total=_.total_amount.sum(),
        cost_avg=_.total_amount.mean(),
    )
)
# Display the generated SQL
ibis.show_sql(trip_summary)
print(trip_summary.execute())
That Ibis code reads the “For-hire vehicle (FHV) trips” data for November 2022 and aggregates it, grouping by the hvfhs_license_num column, to get a row count, the sum and average of the trip miles, and the sum and average of the trip cost.
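To make the aggregation concrete, here is a plain-Python sketch of the same computation on three made-up rows. The values are invented for illustration and are not taken from the taxi dataset; the summarize function is a hypothetical stand-in for what the backend computes, and it ignores details such as null handling.

```python
from collections import defaultdict

# Columns that are added together to form total_amount, as in the mutate() call
COST_COLUMNS = ("base_passenger_fare", "tolls", "sales_tax", "congestion_surcharge", "tips")

# Made-up sample rows for illustration only (not real trip data)
SAMPLE_TRIPS = [
    {"hvfhs_license_num": "HV0003", "trip_miles": 2.0, "base_passenger_fare": 10.0,
     "tolls": 0.0, "sales_tax": 1.0, "congestion_surcharge": 2.5, "tips": 1.5},
    {"hvfhs_license_num": "HV0003", "trip_miles": 4.0, "base_passenger_fare": 20.0,
     "tolls": 5.0, "sales_tax": 2.0, "congestion_surcharge": 0.0, "tips": 3.0},
    {"hvfhs_license_num": "HV0005", "trip_miles": 3.0, "base_passenger_fare": 12.0,
     "tolls": 0.0, "sales_tax": 1.0, "congestion_surcharge": 0.0, "tips": 0.0},
]


def summarize(trips):
    """Group trips by license number and compute count, sum, and mean."""
    groups = defaultdict(list)
    for trip in trips:
        total_amount = sum(trip[column] for column in COST_COLUMNS)
        groups[trip["hvfhs_license_num"]].append((trip["trip_miles"], total_amount))

    summary = {}
    for license_num, rows in groups.items():
        miles = [row[0] for row in rows]
        costs = [row[1] for row in rows]
        summary[license_num] = {
            "trip_count": len(rows),
            "trip_miles_total": sum(miles),
            "trip_miles_avg": sum(miles) / len(miles),
            "cost_total": sum(costs),
            "cost_avg": sum(costs) / len(costs),
        }
    return summary


for license_num, metrics in summarize(SAMPLE_TRIPS).items():
    print(license_num, metrics)
```

The Ibis expression describes this same grouping declaratively and leaves the actual computation to the backend.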
Run the code with:
python ibis_demo_duckdb.py
Once it completes, we see the results:
   hvfhs_license_num  trip_count  trip_miles_total  trip_miles_avg      cost_total  cost_avg
0             HV0003    12968005     65,717,423.00            5.07  390,289,500.33     30.10
1             HV0005     5117891     25,352,368.39            4.95  137,708,332.74     26.91
That code just processed 18,085,896 rows of data, a decent chunk for local execution. To process the entire dataset, however, we would first need to download nearly 20GB of parquet files and then process them with a local backend. Luckily, transitioning to distributed compute (Spark / EMR) is easy with Ibis!
Running with PySpark via EMR
The overall Taxi dataset represents much more data than just one month from 2022. To process the entire “For-hire vehicle (FHV) trips” dataset requires more resources than most developers have available on their personal workstations. Typically, very large datasets require the use of distributed compute systems such as EMR Spark.
Important: to run with EMR, you’ll need an AWS account with privileges to create S3 buckets and provision EMR clusters.
EMR Setup steps
We provide convenient shell scripts in our repository that simplify EMR setup.
1- Run the script which will create an EMR logging bucket (to hold the log files from your EMR cluster):
pushd scripts
# Create an EMR logging bucket
./create_emr_log_bucket.sh
2- Run the script that creates a bootstrap script bucket and uploads our bootstrap shell script, which upgrades the default EMR cluster Python version from 3.7 to 3.10. The Python version must be 3.8 or above in order to use the latest Ibis features:
# Create an EMR bootstrap bucket, and upload the bootstrap shell script for upgrading Python and installing Ibis requirements in EMR
./create_bucket_and_upload_bootstrap_script.sh
Note: The bootstrapping is needed because the latest EMR version at the time of writing, 6.10.0, ships with Python 3.7. Future versions of EMR may make this step unnecessary.
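Once a cluster is up, one way to confirm that the upgrade took effect is to check the interpreter version from Python itself. The snippet below is a small sketch, not part of the repo; meets_minimum is a hypothetical helper, and the (3, 8) threshold is the minimum stated above.

```python
import sys

# Minimum Python version stated in this article for the latest Ibis features
MINIMUM_PYTHON = (3, 8)


def meets_minimum(version_info=sys.version_info, minimum=MINIMUM_PYTHON):
    """Return True if the (major, minor) version is at least the minimum."""
    return tuple(version_info[:2]) >= tuple(minimum)


print("Running Python", ".".join(str(part) for part in sys.version_info[:3]))
print("Meets minimum:", meets_minimum())
```

Running it on the cluster should report that the minimum is met if the bootstrap script worked as intended.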
Provisioning an interactive, ephemeral EMR Cluster
With the EMR setup steps completed, we will now provision an interactive EMR Cluster that we can connect to via ssh. This will let us run an interactive PySpark shell for running our Ibis workload with the much larger dataset.
Note: The EMR cluster we provision here consists of 1 master node and 4 worker nodes, all of machine type m5.xlarge. Each node costs $0.1920 per hour on-demand. The EMR cluster will self-terminate after you have left it idle for one hour.
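For a rough sense of the hourly spend, the per-node price can be multiplied by the node count. This sketch uses only the figures quoted in the note above (1 master node, 4 worker nodes, $0.1920 per node per hour); it is an estimate under those assumptions and does not account for any other charges that may apply to your account. The cluster_cost function is a hypothetical helper.

```python
from decimal import Decimal

# Figures quoted in this article
NODE_PRICE_PER_HOUR = Decimal("0.1920")  # m5.xlarge, on-demand, per node
MASTER_NODES = 1
WORKER_NODES = 4


def cluster_cost(hours):
    """Estimate the cost in dollars of running the whole cluster for `hours`."""
    node_count = MASTER_NODES + WORKER_NODES
    # Convert via str so that float inputs such as 1.5 stay exact in Decimal
    return NODE_PRICE_PER_HOUR * node_count * Decimal(str(hours))


for hours in (1, 3):
    print(f"{hours} hour(s): ${cluster_cost(hours):.2f}")
```

Using Decimal rather than float keeps the dollar amounts exact, which avoids small rounding surprises in the printed totals.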
1- Run this command to create the EMR cluster:
./provision_emr_spark_cluster.sh
The script will first create an SSH key and place it into the scripts/.ssh directory. This key is needed to connect to the EMR master node. The SSH key file is git-ignored for security reasons.
Then, the script will run for several minutes while it provisions the EMR cluster, then upgrades Python and installs Ibis.
Once it completes, it should have an output like this:
...
Cluster status: BOOTSTRAPPING
Cluster status: WAITING
Cluster is ready!
Use this SSH command to connect to the EMR cluster:
ssh -i ./.ssh/keypair.pem hadoop@ec2-xx-xxx-xx-xxx.compute-1.amazonaws.com
2- Now connect with the ssh command output by the script (not the example one above).
When you connect for the first time, ssh will ask: `Are you sure you want to continue connecting (yes/no/[fingerprint])?`
Type `yes`, then hit Enter.
3- Run the command pyspark to start the interactive PySpark shell.
4- Once the pyspark shell has initialized, go ahead and paste this script into the shell to run our Ibis code with Spark distributed compute. We also show the code below with some annotations:
import ibis
from ibis import _
import pandas as pd
from pyspark.sql import SparkSession
# Setup pandas
pd.set_option("display.width", 0)
pd.set_option("display.max_columns", 99)
pd.set_option("display.max_colwidth", None)
pd.set_option("display.float_format", '{:,.2f}'.format)
# A. Get a Spark Session
spark = SparkSession \
    .builder \
    .appName(name="Ibis-Rocks!") \
    .getOrCreate()
# B. Connect the Ibis PySpark back-end to the Spark Session
con = ibis.pyspark.connect(spark)
# C. Read the parquet data into an ibis table
trip_data = con.read_parquet("s3://nyc-tlc/trip data/fhvhv_tripdata_*.parquet")
trip_data = trip_data.mutate(total_amount=_.base_passenger_fare + _.tolls + _.sales_tax + _.congestion_surcharge + _.tips)
trip_summary = (
    trip_data.group_by([_.hvfhs_license_num])
    .aggregate(
        trip_count=_.count(),
        trip_miles_total=_.trip_miles.sum(),
        trip_miles_avg=_.trip_miles.mean(),
        cost_total=_.total_amount.sum(),
        cost_avg=_.total_amount.mean(),
    )
)
print(trip_summary.execute())
Please note, the script above differs from the local version in just a few places:
A. We get a Spark session.
B. We connect the Ibis PySpark backend to the Spark session.
C. We use Ibis 5.1’s new PySpark “read_parquet” method to read the parquet data into an Ibis table expression.
Note: One more difference from the local version of the code is that we can no longer use ibis.show_sql - this is because the Ibis PySpark backend is not SQL-based.
Once our code completes execution, we see the following output:
...
>>> print(trip_summary.execute())
   hvfhs_license_num  trip_count  trip_miles_total  trip_miles_avg         cost_total  cost_avg
0             HV0005   201571786  1,008,864,430.87            5.00   5,121,759,734.73     25.41
1             HV0003   561586224  2,684,517,922.50            4.78  13,824,901,291.98     24.62
2             HV0002     6388934     27,129,772.50            4.25     122,262,575.55     20.81
3             HV0004    13884957     53,652,525.62            3.86     201,049,319.02     14.48
This time, we processed 783,431,901 rows of data, over 43x more than our local example’s 18 million rows. This was made possible by the power of Spark’s distributed compute capabilities, and, thanks to Ibis, we barely had to change any code.
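The row totals and the ratio quoted above can be reproduced from the trip_count columns of the two result tables. The counts below are copied from the outputs shown in this article; the variable names are just for illustration.

```python
# trip_count values copied from the local (DuckDB) result table
local_counts = {"HV0003": 12968005, "HV0005": 5117891}

# trip_count values copied from the Spark (EMR) result table
spark_counts = {
    "HV0005": 201571786,
    "HV0003": 561586224,
    "HV0002": 6388934,
    "HV0004": 13884957,
}

local_total = sum(local_counts.values())
spark_total = sum(spark_counts.values())
ratio = spark_total / local_total

print(f"Local rows: {local_total:,}")
print(f"Spark rows: {spark_total:,}")
print(f"Ratio: {ratio:.1f}x")
```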
Summary
In this article, we showed how we can use Ibis for the local and production development lifecycles with nearly identical code. This lets developers write, execute, and test data workflows locally, and then seamlessly scale up to production workloads with distributed computing backends such as PySpark.
We also showed how to provision an EMR cluster with an upgraded Python environment capable of running Ibis’ latest features.
If you’re interested in using Ibis to effectively scale your workloads with backends like PySpark, Snowflake, pandas, Druid, and more, see how Voltron Data Enterprise Support can help.