Visualizing DAGs in Spark

Written on 2019-02-27

This post provides a general introduction to the RDD API, with each example pairing a snippet of PySpark code with an explanation. It also provides a general introduction to Spark's web UI: certain examples include DAG visualizations for their jobs and stages, and Spark starts a web UI for each SparkContext that is initialized.

This post will only include rudimentary examples of methods in the RDD API. For more detailed illustrations and explanations of these concepts, refer to this post and this article. In upcoming posts, we'll dive deeper into the more general objects in the Spark API, and then explore low-level concepts, including Spark internals.

Table of Contents

- Setting up a SparkSession
- Using an RDD with Python
- Counting Words from Files
- Visualizing the DAG

Setting up a SparkSession

Before walking through any code examples, we need to create a SparkSession. A SparkSession acts as the entry point into Spark programming, and its underlying SparkContext is what we'll use to create RDDs. After executing the setup code, we'll be able to use the session and context in the examples below.

The SparkSession builder object lets us set the associated application name, master URL, and configuration options. It also provides a getOrCreate method, which returns an existing SparkSession or initializes a new one with the options set on the builder.

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession \
...         .builder \
...         .appName('TxtRdr') \
...         .getOrCreate()
>>> sc = spark.sparkContext
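
For reference, the builder can also set the master URL and configuration values explicitly. The snippet below is only a sketch, shown as if starting a fresh session; the local master URL and config key are purely illustrative, and getOrCreate will simply return an existing session if one is already running, in which case some options may not take effect.

>>> spark = SparkSession \
...         .builder \
...         .appName('TxtRdr') \
...         .master('local[*]') \
...         .config('spark.default.parallelism', '2') \
...         .getOrCreate()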

Some examples use a pair of files named hello.txt and order.txt. The SparkContext will read these files using the textFile method. After creating an RDD, we'll call methods to transform and filter the data in the files, whose contents are listed below.

hello.txt:

Hello world
This is a file

order.txt:

Hello friend
This file is my order
Burger, fries, soda

Notice that two words appear in both files: Hello and file. In the upcoming examples, we'll run some standard Python code and some Spark code to find these words. Then, we'll count the occurrences of each word using Spark.

Using an RDD with Python

Before performing any transformations, the data needs to be read using the textFile method. Specifically, the SparkContext will read the hello.txt and order.txt files using this method, which returns an RDD object. After calling this method, the returned RDD essentially contains a list of strings, where each string represents a line from the file.

The flatMap method maps a function, defined here using the lambda keyword, over our RDD. In this example, the function splits each line into individual strings wherever there is a space. Although it isn't used here, the map method is very similar to flatMap: map returns exactly one output element per input element, so mapping split over the lines would produce one list of words per line, whereas flatMap flattens those lists into a single RDD of words (see the sketch after the next snippet).

Ultimately, the collect method is called, which returns all the elements of the RDD as a list to the driver program. This method is an action: since transformations are lazy, none of the operations before collect are actually computed until collect is called. The code snippet below creates an RDD and performs the described operations on the hello.txt file.

>>> hello = sc.textFile('hello.txt') \
...           .flatMap(lambda c: c.split(' ')) \
...           .collect()
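
To make the difference between map and flatMap concrete, here is a small sketch of what each method returns for the two-line hello.txt shown above:

>>> sc.textFile('hello.txt').map(lambda c: c.split(' ')).collect()
[['Hello', 'world'], ['This', 'is', 'a', 'file']]
>>> sc.textFile('hello.txt').flatMap(lambda c: c.split(' ')).collect()
['Hello', 'world', 'This', 'is', 'a', 'file']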

For this example, there's still a second file whose words should be counted by the same Spark program. Following the same process as before, the textFile method reads in the order.txt file, and the same flatMap and collect calls handle the function mapping and data collection.

>>> order = sc.textFile('order.txt') \
...           .flatMap(lambda c: c.split(' ')) \
...           .collect()

After executing these two code snippets, hello and order each hold a list of every word in their respective files. Since the collect method returns a plain Python list, we can apply standard Python operations to the results. As an example, we can use Python's set operations to find the common words between the two lists.

>>> list(set(hello) & set(order))
['Hello', 'file']

Counting Words from Files

Now, we may prefer to compute some of these operations in Spark instead. By performing the filtering in plain Python on the driver, we lose the benefit of Spark's distributed computation. If hello.txt and order.txt were much larger files, or if we instead counted the words of thousands of files, filtering in Spark would be much faster than filtering in ordinary Python on a single machine.

To illustrate this point, let's go through an example that is very similar to the previous one, but ultimately performs counting and filtering.

>>> from operator import add
>>> hello = sc.textFile('hello.txt')
>>> order = sc.textFile('order.txt')
>>> counts = hello.union(order) \
...               .flatMap(lambda w: w.split(' ')) \
...               .map(lambda c: (c, 1)) \
...               .reduceByKey(add) \
...               .filter(lambda t: t[1] > 1) \
...               .collect()

Again, calling the flatMap method returns an RDD containing individual words as strings. The map method converts each word into a tuple of the word and an initial count of one. Then, the reduceByKey operation adds up the counts for each repeated key. Lastly, the filter method keeps only the key-value pairs whose value is greater than one.
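
To see what each step produces, here is a rough sketch that breaks the same pipeline into intermediate RDDs. The comments show example elements; their exact ordering depends on partitioning.

>>> words = hello.union(order).flatMap(lambda w: w.split(' '))
>>> pairs = words.map(lambda c: (c, 1))        # e.g. ('Hello', 1), ('world', 1), ...
>>> totals = pairs.reduceByKey(add)            # e.g. ('Hello', 2), ('world', 1), ...
>>> totals.filter(lambda t: t[1] > 1).collect()
[('Hello', 2), ('file', 2)]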

>>> counts
[('Hello', 2), ('file', 2)]

As expected, the program outputs two key-value pairs. The first key-value pair is ('Hello', 2), since Hello appears once in each file. Similarly, the second key-value pair is ('file', 2), since file also appears once in each file.

Visualizing the DAG

The number of jobs for an application equals the number of RDD actions. In our example, there is a single job because there is only one action, which is collect. Instead of a single job, we would have two jobs with this code:

>>> hello = sc.textFile('hello.txt').collect()
>>> order = sc.textFile('order.txt').collect()
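
As a rough programmatic check, the SparkContext also exposes a status tracker. Note that this is only a sketch: it counts every job submitted so far in the session (outside any named job group), not just the two jobs above.

>>> tracker = sc.statusTracker()
>>> len(tracker.getJobIdsForGroup())   # number of jobs submitted so far outside any job group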

The number of additional stages equals the number of wide transformations in an application. In our example, there are two stages in total, but only a single additional stage, because there is only one wide transformation, which is reduceByKey. Notice that Spark's web UI provides a nice visualization of the DAG.

[DAG visualization of the word-count job from the Spark web UI]
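
The same lineage can also be inspected from the shell: toDebugString describes an RDD's dependency chain, with deeper indentation marking the shuffle boundary introduced by reduceByKey. This is only a sketch, and the exact formatting of the output varies by Spark version.

>>> counts_rdd = hello.union(order) \
...                   .flatMap(lambda w: w.split(' ')) \
...                   .map(lambda c: (c, 1)) \
...                   .reduceByKey(add) \
...                   .filter(lambda t: t[1] > 1)
>>> print(counts_rdd.toDebugString().decode())   # toDebugString returns bytes in PySpark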

Recall, the number of tasks within a stage equals the number of partitions in an RDD. By default, Spark assigns the number of partitions to be two, meaning the default number of tasks per stage is two. The number of partitions is assigned to an RDD when it is initialized, so this parameter can be adjusted when calling textFile(file, num_partitions). Therefore, our stage will have six tasks (instead of four) if we change this line of code:

>>> hello = sc.textFile('hello.txt', 6)
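
We can verify the partition count, and therefore the task count for that stage, directly on the RDD. Keep in mind that the second argument to textFile is a minimum, so for very small files the actual number of partitions may differ slightly from the requested value.

>>> hello.getNumPartitions()   # partition count for this RDD; the request above is treated as a minimum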

Since Spark assigns the number of partitions of an RDD when it is initialized, the number of tasks after shuffling is determined the same way. This is because Spark creates a new RDD after shuffling, called a ShuffledRDD, whose partition count can also be requested explicitly, as sketched below. For a more detailed description of how tasks are separated and organized in Spark, refer to this post.
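
For example, reduceByKey accepts an optional numPartitions argument that controls the number of post-shuffle partitions, and therefore the number of tasks in the stage after the shuffle. A minimal sketch:

>>> pairs = hello.union(order) \
...              .flatMap(lambda w: w.split(' ')) \
...              .map(lambda c: (c, 1))
>>> counts = pairs.reduceByKey(add, 4).collect()   # request 4 post-shuffle partitions -> 4 tasks in the reduce stage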
