Real-world Python workloads on Spark: EMR clusters

--

There are countless articles and forum posts about running Python on Spark, but most assume that the work to be submitted is contained in a single .py file: spark-submit wordcount.py — done!

What if your Python program is more than just a script? Perhaps it generates dynamic SQL for Spark to execute, or refreshes models using Spark’s output. As your Python code becomes more of an app (with a directory structure, configuration files, and library dependencies), submitting it to Spark requires a bit more consideration.

Below are the alternatives I considered when taking one such Python application to production using Spark 2.3.

This is the second article of a series. The first article covered Spark standalone clusters.

Trending AI Articles:

1. Predicting buying behavior using Machine Learning

2. Understanding and building Generative Adversarial Networks(GANs)

3. Building a Django POST face-detection API using OpenCV and Haar Cascades

4. Learning from mistakes with Hindsight Experience Replay

Sample Python application

To simulate a complete application, the scenarios below assume a Python 3 application with the following structure:

project.py
data/
data_source.py
data_source.ini

data_source.ini contains various configuration parameters:

[spark]
app_name = My PySpark App
master_url = yarn

Note: yarn is the only valid value for master URL in YARN-managed clusters.

data_source.py is a module responsible for sourcing and processing data in Spark, making math transformations with NumPy, and returning a Pandas dataframe to the client. Dependencies:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType
import pandas as pd
import numpy as np
import configparser

It defines a DataSource class that creates a SparkContext and SparkSession on initialization…

class DataSource:
def __init__(self):
config = configparser.ConfigParser()
config.read('./data/data_source.ini')
master_url = config['spark']['master_url']
app_name = config['spark']['app_name']
conf = SparkConf().setAppName(app_name) \
.setMaster(master_url)
self.sc = SparkContext(conf=conf)
self.spark = SparkSession.builder \
.config(conf=conf) \
.getOrCreate()

…and a get_data() method that:

  1. Creates an RDD from a NumPy normal distribution.
  2. Applies a function to double the value of every element.
  3. Converts the RDD into a Spark dataframe and defines a temp view on top.
  4. Applies a Python UDF that squares the contents of every dataframe element using SQL.
  5. Returns the results to the client as a Pandas dataframe.
def get_data(self, num_elements=1000) -> pd.DataFrame:    mu, sigma = 2, 0.5
v = np.random.normal(mu, sigma, num_elements)
rdd1 = self.sc.parallelize(v)
def mult(x): return x * np.array([2])
rdd2 = rdd1.map(mult).map(lambda x: (float(x),))
schema = StructType([StructField("value", FloatType(), True)])
df1 = self.spark.createDataFrame(rdd2, schema)
df1.registerTempTable("test")
def square(x): return x ** 2
self.spark.udf.register("squared", square)
df2 = self.spark.sql("SELECT squared(value) squared FROM test")
return df2.toPandas()

project.py is our main program, acting as a client of the above module:

from data.data_source import DataSourcedef main():
src = DataSource()
df = src.get_data(num_elements=100000)
print(f"Got Pandas dataframe with {df.size} elements")
print(df.head(10))
main()

Clone the repo: https://bitbucket.org/calidoteam/pyspark.git

Before we begin, let’s review the options available when submitting work to Spark.

spark-submit, client and cluster modes

  • Spark supports various cluster managers: Standalone (i.e. built into Spark), Hadoop’s YARN, Mesos, Kubernetes, all of which control how your workload runs on a set of resources.
  • spark-submit is the only interface that works consistently with all cluster managers. For Python applications, spark-submit can upload and stage all dependencies you provide as .py, .zip or .egg files when needed.
  • In client mode, your Python program (i.e. driver) will run on the same host where spark-submit runs. It is in your best interest to make sure such host is close to your worker nodes to reduce network latency.
  • In cluster mode, your Python program (i.e. driver) and dependencies will be uploaded to and run from some worker node. This is useful when submitting jobs from a remote host. As of Spark 2.4.0 cluster mode is not an option when running on Spark standalone.
  • Alternatively, it is possible to bypass spark-submit by configuring the SparkSession in your Python app to connect to the cluster. This requires the right configuration and matching PySpark binaries. Your Python app will effectively be running in client mode: It will run from wherever host you launched it.

The following sections describe several deployment alternatives and what configuration was required in each.

PySpark on EMR clusters

The next sections focus on Spark on AWS EMR, in which YARN is the only cluster manager available.

Just like with standalone clusters, the following additional configuration must be applied during cluster bootstrap to support our sample app:

  • Install Python 3.6 in all nodes by means of a custom bootstrap action (i.e. shell script available from S3).
  • Configure Spark to use Python 3.6.
  • Enable Apache Arrow for Java-Python serialization performance.
  • Maximize resource allocation if the cluster is going to be dedicated to a single workload at a time.

Sample aws-cli / CloudFormation configuration section for the last three items above:

[
{
"Classification": "spark-env",
"Configurations": [
{
"Classification": "export",
"ConfigurationProperties": {
"PYSPARK_PYTHON": "/usr/bin/python3",
"PYSPARK_DRIVER_PYTHON": "/usr/bin/python3"

}
}
]
},
{
"Classification": "spark-defaults",
"ConfigurationProperties": {
"spark.sql.execution.arrow.enabled": "true"
}
},
{
"Classification": "spark",
"Properties": {
"maximizeResourceAllocation": "true"
}
}
]

Note: EMR Release 5.19.0 was used for this writeup.

#1: Cluster mode using the Step API

Assuming a running EMR Spark cluster, the first deployment scenario is the recommended one: Submit a job using the Step API in cluster mode.

Python app launched within the EMR master node in cluster mode

In order to upload all dependencies, it is necessary to package the application either in .zip or .egg format. I chose zip files for simplicity. While at the project root directory, create a zip file of all subdirectories (or just everything in the current directory, recursing into subdirectories).

project.py
data/
data_source.py
data_source.ini
$ zip -r project.zip *project.py
project.zip
data/
data_source.py
data_source.ini

Stage all files to an S3 bucket:

Python app staged to S3

Using EMR’s Step framework, we spark-submit the application by passing the path to our main program (project.py), Python module dependencies (project.zip), and configuration files (data/data_source.ini). The latter is required because config can’t be resolved inside the supplied zip file:

Add step dialog in the EMR console

The above is equivalent to issuing the following from the master node:

$ spark-submit --master yarn 
--deploy-mode cluster
--py-files project.zip
--files data/data_source.ini
project.py

The above requires a minor change to the application to avoid using a relative path when reading the configuration file:

class DataSource:
def __init__(self):
config = configparser.ConfigParser()
# config.read('./data/data_source.ini')
config.read('data_source.ini')
master_url = config['spark']['master_url']
app_name = config['spark']['app_name']
conf = SparkConf().setAppName(app_name) \
.setMaster(master_url)
self.sc = SparkContext(conf=conf)
self.spark = SparkSession.builder \
.config(conf=conf) \
.getOrCreate()

All files are uploaded to the cluster’s local HDFS filesystem, staged for execution and deleted immediately upon completion. From log output:

Uploading resource file:/home/hadoop/pyspark/project.py -> hdfs://<ip>/user/hadoop/.sparkStaging/application_<id>/project.pyUploading resource file:/home/hadoop/pyspark/project.zip -> hdfs://<ip>/user/hadoop/.sparkStaging/application_<id>/project.zipUploading resource file:/home/hadoop/pyspark/data/data_source.ini -> hdfs://<ip>/user/hadoop/.sparkStaging/application_<id>/data_source.ini

Once an application context has been created, the Environment page of the Spark web console confirms that our application is running in cluster mode:

Spark UI’s environment page when running in cluster mode

At runtime, it’s possible to observe YARN slaves busy running multiple python3 processes:

Python processes on EMR core/YARN slave nodes

Log output is available from a number of sources: Directly from the EMR console, YARN and Spark job history web interfaces, and from yarn logs while logged into the master node:

yarn logs -applicationId application_1551148204560_0338

Takeaways:

  • Pros: Robust support since it is the recommended approach. Ideal when submitting workloads from a remote host. No network bandwidth concerns since driver app runs inside the cluster.
  • Cons: Need to break application into pieces in order to submit it, which requires some experimentation to get right. Need to stage application files in S3. Minor code changes likely needed to load file resources. Driver competes for resources with other cluster processes, can lead to out-of-memory errors. Negates the possibility of containerizing the application.

#2: Client mode using spark-submit from the master node

This alternative takes advantage of the resources, proximity and software already configured in the master node.

Python app launched within the EMR master node in client mode

Assuming that the Spark is already configured as described at the beginning of this article, and that SSH access to the master node is available, submitting the application is straightforward:

Stage application files into the master node: This may require installing git into the node. Logged in as the hadoop user:

$ sudo yum install git
$ git clone https://bitbucket.org/calidoteam/pyspark.git

Set environment variables in preparation for runningspark-submit:

export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
export PYTHONPATH=$SPARK_HOME/python
export PYSPARK_PYTHON=/usr/bin/python3
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3

In particular, HADOOP_CONF_DIR tells Spark about the YARN cluster configuration needed to submit the application.

Submit the workload from the project’s root directory:

$ cd my-project/
$ spark-submit --master yarn --deploy-mode client project.py

Note that, unlike cluster mode, it isn’t necessary to specify Python dependencies or configuration files when submitting applications in client mode, since the application is run in place.

Spark UI’s environment page when running in client mode

Note: Jobs on EMR may error out due to incorrect paths for stdout and stderr:

log4j:ERROR setFile(null,true) call failed.
java.io.FileNotFoundException: /stderr (Permission denied)
log4j:ERROR setFile(null,true) call failed.
java.io.FileNotFoundException: /stdout (Permission denied)

Correct them by providing a writable path for both files:

$ vi /etc/spark/conf/log4j.propertieslog4j.appender.DRFA-stderr.file=<path_to>/stdout
log4j.appender.DRFA-stderr.file=<path_to>/stderr

Takeaways:

  • Pros: Ease of deployment, since the Python app runs unchanged from wherever it was staged, and most configuration is already in place in the master node.
  • Cons: Needs git to stage application files, possibly automated through a custom bootstrap action. Needs SSH access to master node, or creating a custom EMR step to run a triggering shell script.

#3: Client mode on EMR Spark from a Docker container

Finally, if containerizing the Python app is a design goal, it is possible to submit the workload from a container running on the master node, thus taking advantage of the configuration and software already present.

Containerized Python app launch from the EMR master node (cluster mode)

As of EMR release 5.19.0, Docker will be preconfigured in the master node whenever JupyterHub is chosen when provisioning the cluster. And just like in scenario #3 of my previous post, it’s necessary to:

Build the container to include all dependencies: Start from an image that includes Python 3 and/or the Java 8 OpenJDK, then pip-install PySpark, PyArrow and all other libraries required by the application.

Mount configuration directories from the master: When running on YARN, Spark needs to have YARN configuration files available. Since cluster configuration files change with every EMR cluster, it’s both convenient and necessary to mount into the container key configuration and library directories.

Configure host networking: An alternative to mapping ports for driver-executor communication (as described in scenario #3 of the previous post) consists in using Docker’s host networking, whereby the container and its host share the same network, making outside communication possible.

Running it

Putting the above configuration requirements together, the docker command would be as follows:

$ docker run -v /etc/hadoop/conf:/etc/hadoop/conf 
-v /etc/spark/conf:/etc/spark/conf
-v /usr/lib/spark:/usr/lib/spark
-v /usr/share/aws:/usr/share/aws
--network host
<docker_image_url>

The application will run inside the container, and use YARN and Spark configuration mounted into it to create an application context. From that point on, the job will run in client mode as in scenario #2.

Takeaways:

  • Pros: Support for containers, making it possible to run transparently against multiple environments. Python app runs unchanged from the container. Takes advantage of existing configuration in the master node.
  • Cons: Container build requires a separate CI pipeline. Needs SSH access to master node, or creating a custom EMR step to run a triggering shell script.

Don’t forget to give us your 👏 !

--

--