Spark Solution to the Secondary Sort Problem

To solve a secondary sorting problem in Spark, we have at least two options:

Option #1

Read and buffer all of the values for a given key in an Array or List data structure and then do an in-reducer sort on the values. This solution works if you have a small set of values (which will fit in memory) per reducer key.

Option #2

Use the Spark framework for sorting the reducer values (this option does not require in-reducer sorting of the values passed to the reducer). This approach involves “creating a composite key by adding a part of, or the entire value to, the natural key to achieve your sorting objectives.” This option always scales, because you are not limited by the memory of a commodity server.

Time Series as Input

To demonstrate secondary sorting, let’s use the following time series data:

name time value
  x    2    9
  y    2    5
  x    1    3
  y    1    7
  y    3    1
  x    3    6
  z    1    4
  z    2    8
  z    3    7
  z    4    0
  p    2    6
  p    4    7
  p    1    9
  p    6    0
  p    7    3

Expected Output

Our expected output is as follows. Note that the reducer values are grouped by name and sorted by time:

name   t1   t2  t3   t4   t5 ...
x =>  [3,   9,  6]
y =>  [7,   5,  1]
z =>  [4,   8,  7,   0]
p =>  [9,   6,  7,   0,   3]

Option #1: Secondary Sorting in Memory

Since Spark has a very powerful and high-level API, I will present the entire solution in a single Java class. The Spark API is built upon the basic abstraction concept of the RDD (resilient distributed dataset). To fully utilize Spark’s API, we have to understand RDDs. An RDD<T> (i.e., an RDD of type T) object represents an immutable, partitioned collection of elements (of type T) that can be operated on in parallel. The RDD<T> class contains the basic MapReduce operations available on all RDDs, such as map(), filter(), and persist(), while the JavaPairRDD<K,V> class contains MapReduce operations such as mapToPair(), flatMapToPair(), and groupByKey(). In addition, Spark’s PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as reduceByKey(), groupByKey(), and join(). (For details on RDDs, see Spark’s API documentation and Appendix B of this book.) Therefore, a JavaRDD<T> can be thought of as a distributed list of objects of type T, and a JavaPairRDD<K,V> as a distributed list of objects of type Tuple2<K,V> (where each tuple represents a key-value pair).
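
To make this distinction concrete, here is a minimal sketch (assuming a JavaSparkContext named ctx, the imports shown later in Example 1-8, and java.util.Arrays); it builds a JavaRDD<String> and then a JavaPairRDD<String, Integer> whose elements are Tuple2 objects:

// a JavaRDD<T>: a distributed collection of String elements
JavaRDD<String> words = ctx.parallelize(Arrays.asList("a", "b", "a"));
// a JavaPairRDD<K, V>: each element is a Tuple2<String, Integer>
JavaPairRDD<String, Integer> ones =
    words.mapToPair(new PairFunction<String, String, Integer>() {
      public Tuple2<String, Integer> call(String w) {
        return new Tuple2<String, Integer>(w, 1);
      }
    });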

The Spark-based algorithm is listed next. Although there are 10 steps, most of them are trivial and some are provided for debugging purposes only:

  1. We import the required Java/Spark classes. The main Java classes for MapReduce are given in the org.apache.spark.api.java package. This package includes the following classes and interfaces:

    • JavaRDDLike (interface)

    • JavaDoubleRDD

    • JavaPairRDD

    • JavaRDD

    • JavaSparkContext

    • StorageLevels

  2. We pass input data as arguments and validate.

  3. We connect to the Spark master by creating a JavaSparkContext object, which is used to create new RDDs.

  4. Using the context object (created in step 3), we create an RDD for the input file; the resulting RDD will be a JavaRDD<String>. Each element of this RDD will be a record of time series data: <name><,><time><,><value>.

  5. Next we want to create key-value pairs from a JavaRDD<String>, where the key is the name and the value is a pair of (time, value). The resulting RDD will be a JavaPairRDD<String, Tuple2<Integer, Integer>>.

  6. To validate step 5, we collect all values from the JavaPairRDD<> and print them.

  7. We group JavaPairRDD<> elements by the key (name). To accomplish this, we use the groupByKey() method.

The result will be the RDD:

JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>>

Note that the resulting list (Iterable<Tuple2<Integer, Integer>>) is unsorted. In general, Spark’s reduceByKey() is preferred over groupByKey() for performance reasons, but here we have no other option than groupByKey() (since reduceByKey() does not allow us to sort the values in place for a given key).

  8. To validate step 7, we collect all values from the JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> and print them.

  9. We sort the reducer’s values to get the final output. We accomplish this by passing a custom sort function to mapValues(); we just sort the values (the key remains the same).

  10. To validate the final result, we collect all values from the sorted JavaPairRDD<> and print them.

A solution for option #1 is implemented by a single driver class: SecondarySort (see Example 1-7). All 10 steps are outlined inside the class definition and are presented in detail in the following sections. Typically, a Spark application consists of a driver program that runs the user’s main() function and executes various parallel operations on a cluster. The parallel operations are achieved through the extensive use of RDDs. For further details on RDDs, see Appendix B.

Example 1-7. SecondarySort class overall structure

// Step 1: import required Java/Spark classes
public class SecondarySort {
  public static void main(String[] args) throws Exception {
    // Step 2: read input parameters and validate them
    // Step 3: connect to the Spark master by creating a JavaSparkContext object (ctx)
    // Step 4: use ctx to create JavaRDD<String>
    // Step 5: create key-value pairs from JavaRDD<String>, where
    // key is the {name} and value is a pair of (time, value)
    // Step 6: validate step 5 - collect all values from JavaPairRDD<> and print them
    // Step 7: group JavaPairRDD<> elements by the key ({name})
    // Step 8: validate step 7 - collect all values from JavaPairRDD<> and print them
    // Step 9: sort the reducer's values; this will give us the final output
    // Step 10: validate step 9 - collect all values from JavaPairRDD<> and print them

    // done
    ctx.close();
    System.exit(0);
  }
}

Step 1: Import required classes

As shown in Example 1-8, the main Spark package for the Java API is org.apache.spark.api.java, which includes the JavaRDD, JavaPairRDD, and JavaSparkContext classes. JavaSparkContext is a factory class for creating new RDDs (such as JavaRDD and JavaPairRDD objects).

Example 1-8. Step 1: Import required classes

// Step 1: import required Java/Spark classes
import scala.Tuple2;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Collections;
import java.util.Comparator;

Step 2: Read input parameters

This step, demonstrated in Example 1-9, reads the path of the input file (Spark can read data from HDFS as well as other persistent stores, such as the Linux filesystem), which might look like /dir1/dir2/myfile.txt.

Example 1-9. Step 2: Read input parameters

// Step 2: read input parameters and validate them
if (args.length < 1) {
   System.err.println("Usage: SecondarySort <file>");
   System.exit(1);
}
String inputPath = args[0];
System.out.println("args[0]: <file>=" + args[0]);

Step 3: Connect to the Spark master

To work with RDDs, first you need to create a JavaSparkContext object (as shown in Example 1-10), which is a factory for creating JavaRDD and JavaPairRDD objects. It is also possible to create a JavaSparkContext object by passing a SparkConf object to the JavaSparkContext constructor (a brief sketch follows Example 1-10). This approach is useful when you read your cluster configuration from an XML file. In a nutshell, the JavaSparkContext object has the following responsibilities:

  • Initializes the application driver.

  • Registers the application driver with the cluster manager. (If you are using a Spark standalone cluster, this will be the Spark master; if you are using YARN, it will be YARN’s ResourceManager.)

  • Obtains a list of executors for executing your application driver.

Example 1-10. Step 3: Connect to the Spark master

// Step 3: connect to the Spark master by creating a JavaSparkContext object
final JavaSparkContext ctx = new JavaSparkContext();
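
If you prefer to configure the context in code rather than through spark-submit, a minimal sketch (the application name is illustrative; this also requires importing org.apache.spark.SparkConf) might look like this:

// build a configuration object and pass it to the JavaSparkContext constructor
SparkConf sparkConf = new SparkConf().setAppName("SecondarySort");
final JavaSparkContext ctx = new JavaSparkContext(sparkConf);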

Step 4: Use the JavaSparkContext to create a JavaRDD

This step, illustrated in Example 1-11, reads an HDFS file and creates a JavaRDD<String> (which represents a set of records where each record is a String object). By definition, Spark’s RDDs are immutable (i.e., they cannot be altered or modified). Note that Spark’s RDDs are the basic abstraction for parallel execution. Note also that you may use textFile() to read HDFS or non-HDFS files.

Example 1-11. Step 4: Create JavaRDD

// Step 4: use ctx to create JavaRDD<String>
// input record format: <name><,><time><,><value>
JavaRDD<String> lines = ctx.textFile(inputPath, 1);
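
As noted earlier, textFile() is not limited to HDFS. For example, to read the same data from the local filesystem instead (a sketch; the path is illustrative), you could write:

JavaRDD<String> lines = ctx.textFile("file:///home/hadoop/testspark/timeseries.txt", 1);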

Step 5: Create key-value pairs from the JavaRDD

This step, shown in Example 1-12, implements a mapper. Each record (from the JavaRDD<String> and consisting of <name><,><time><,><value>) is converted to a key-value pair, where the key is a name and the value is a Tuple2(time, value).

Example 1-12. Step 5: Create key-value pairs from JavaRDD

// Step 5: create key-value pairs from JavaRDD<String>, where the
// key is the {name} and the value is a pair of (time, value).
// The resulting RDD will be a JavaPairRDD<String, Tuple2<Integer, Integer>>.
// Each record is converted into a Tuple2(name, Tuple2(time, value)).
// PairFunction<T, K, V>
//     T => Tuple2(K, V) where T is the input (as String),
//     K = String
//     V = Tuple2<Integer, Integer>
JavaPairRDD<String, Tuple2<Integer, Integer>> pairs =
      lines.mapToPair(new PairFunction<
                                       String,                  // T
                                       String,                  // K
                                       Tuple2<Integer, Integer> // V
                                      >() {
  public Tuple2<String, Tuple2<Integer, Integer>> call(String s) {
    String[] tokens = s.split(","); // e.g., x,2,5
    System.out.println(tokens[0] + "," + tokens[1] + "," + tokens[2]);
    Integer time = Integer.valueOf(tokens[1]);
    Integer value = Integer.valueOf(tokens[2]);
    Tuple2<Integer, Integer> timevalue =
        new Tuple2<Integer, Integer>(time, value);
    return new Tuple2<String, Tuple2<Integer, Integer>>(tokens[0], timevalue);
  }
});
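
If you compile against Java 8, the same mapper can be written more compactly with a lambda expression (a sketch with behavior identical to Example 1-12; the Spark Java function interfaces have a single call() method, so a lambda is accepted):

JavaPairRDD<String, Tuple2<Integer, Integer>> pairs =
    lines.mapToPair(s -> {
      // input record format: <name><,><time><,><value>
      String[] tokens = s.split(",");
      Tuple2<Integer, Integer> timevalue =
          new Tuple2<Integer, Integer>(Integer.valueOf(tokens[1]),
                                       Integer.valueOf(tokens[2]));
      return new Tuple2<String, Tuple2<Integer, Integer>>(tokens[0], timevalue);
    });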

Step 6: Validate step 5

To debug and validate your steps in Spark (as shown in Example 1-13), you may use JavaRDD.collect() and JavaPairRDD.collect(). Here, collect() is used for debugging and educational purposes only; avoid using it in production clusters, because it copies the entire RDD to the driver program and will impact performance. You may also use JavaRDD.saveAsTextFile() for debugging, as well as for creating your desired outputs.

Example 1-13. Step 6: Validate step 5

// Step 6: validate step 5 - collect all values from JavaPairRDD<>
// and print them
List<Tuple2<String, Tuple2<Integer, Integer>>> output = pairs.collect();
for (Tuple2<String, Tuple2<Integer, Integer>> t : output) {
   Tuple2<Integer, Integer> timevalue = t._2;
   System.out.println(t._1 + "," + timevalue._1 + "," + timevalue._2);
}
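
As mentioned above, JavaRDD.saveAsTextFile() is an alternative way to inspect intermediate results without copying them to the driver. A minimal sketch (the debug output directory is illustrative and must not already exist):

// write the (name, (time, value)) pairs to a debug directory instead of collecting them
pairs.saveAsTextFile("/mp/debug/step5");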

Step 7: Group JavaPairRDD elements by the key (name)

We implement the reducer operation using groupByKey(). As you can see in Example 1-14, it is much easier to implement the reducer in Spark than in MapReduce/Hadoop. Note that in Spark, in general, reduceByKey() is more efficient than groupByKey(); here, however, we cannot use reduceByKey(), because we need the complete list of values per key in order to sort them.

Example 1-14. Step 7: Group JavaPairRDD elements

// Step 7: group JavaPairRDD<> elements by the key ({name})
JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> groups =
      pairs.groupByKey();
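
By contrast, if all we needed were a per-key aggregate (say, the sum of the values for each name), reduceByKey() would be the better choice, because it combines values on the map side before the shuffle. The following sketch is not part of the secondary sort solution; it simply illustrates the distinction, reusing the pairs RDD from Example 1-12:

// per-key sum of the value component; no per-key list of values is ever materialized
JavaPairRDD<String, Integer> sums =
    pairs.mapValues(new Function<Tuple2<Integer, Integer>, Integer>() {
      public Integer call(Tuple2<Integer, Integer> timevalue) {
        return timevalue._2;   // keep only the value component
      }
    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    });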

Step 8: Validate step 7

This step, shown in Example 1-15, validates the previous step by using the collect() function, which gets all values from the groups RDD.

Example 1-15. Step 8: Validate step 7

// Step 8: validate step 7 - collect all values from JavaPairRDD<>
// and print them
System.out.println("===DEBUG1===");
List<Tuple2<String, Iterable<Tuple2<Integer, Integer>>>> output2 =
      groups.collect();
for (Tuple2<String, Iterable<Tuple2<Integer, Integer>>> t : output2) {
   Iterable<Tuple2<Integer, Integer>> list = t._2;
   System.out.println(t._1);
   for (Tuple2<Integer, Integer> t2 : list) {
      System.out.println(t2._1 + "," + t2._2);
   }
   System.out.println("=====");
}

The following shows the output of this step. As you can see, the reducer values are not sorted:

y
2,5
1,7
3,1
=====
x
2,9
1,3
3,6
=====
z
1,4
2,8
3,7
4,0
=====
p
2,6
4,7
6,0
7,3
1,9
=====

Step 9: Sort the reducer’s values in memory

This step, shown in Example 1-16, uses another powerful Spark method, mapValues(), to just sort the values generated by reducers. The mapValues() method enables us to convert (K, V1) into (K, V2), where V2 is a sorted V1. One important note about Spark’s RDD is that it is immutable and cannot be altered/updated by any means. For example, in this step, to sort our values, we have to copy them into another list first. Immutability applies to the RDD itself and its elements.

Example 1-16. Step 9: Sort the reducer’s values in memory

// Step 9: sort the reducer's values; this will give us the final output.
// Option #1: sort the values in memory.
// mapValues[U](f: (V) => U): JavaPairRDD[K, U]
// Pass each value in the key-value pair RDD through a map function
// without changing the keys;
// this also retains the original RDD's partitioning.
JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> sorted =
      groups.mapValues(
        new Function<Iterable<Tuple2<Integer, Integer>>,  // input
                     Iterable<Tuple2<Integer, Integer>>   // output
                    >() {
  public Iterable<Tuple2<Integer, Integer>> call(
      Iterable<Tuple2<Integer, Integer>> s) {
    // RDDs are immutable, so copy the values into a new list before sorting
    List<Tuple2<Integer, Integer>> newList = new ArrayList<Tuple2<Integer, Integer>>();
    for (Tuple2<Integer, Integer> timevalue : s) {
       newList.add(timevalue);
    }
    Collections.sort(newList, new TupleComparator());
    return newList;
  }
});
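
Note that the TupleComparator used in Example 1-16 is not shown in this section. A minimal sketch consistent with how it is used here (ordering the (time, value) tuples by their time component) might look like the following; implementing Serializable is a precaution in case the comparator is ever referenced from a closure that Spark ships to executors:

import java.io.Serializable;
import java.util.Comparator;
import scala.Tuple2;

public class TupleComparator
   implements Comparator<Tuple2<Integer, Integer>>, Serializable {
   @Override
   public int compare(Tuple2<Integer, Integer> t1, Tuple2<Integer, Integer> t2) {
      // sort (time, value) pairs by time, in ascending order
      return t1._1.compareTo(t2._1);
   }
}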

Step 10: Output final result

The collect() method collects all of the RDD’s elements into a java.util.List object. Then we iterate through the List to get all the final elements (see Example 1-17).

Example 1-17. Step 10: Output final result

// Step 10: validate step 9 - collect all values from JavaPairRDD<>
// and print them
System.out.println("===DEBUG2===");
List<Tuple2<String, Iterable<Tuple2<Integer, Integer>>>> output3 =
      sorted.collect();
for (Tuple2<String, Iterable<Tuple2<Integer, Integer>>> t : output3) {
   Iterable<Tuple2<Integer, Integer>> list = t._2;
   System.out.println(t._1);
   for (Tuple2<Integer, Integer> t2 : list) {
      System.out.println(t2._1 + "," + t2._2);
   }
   System.out.println("=====");
}

Spark Sample Run

As far as Spark/Hadoop is concerned, you can run a Spark application in three different modes:

Standalone mode

This is the default setup. You start the Spark master on a master node and a “worker” on every slave node, and submit your Spark application to the Spark master.

YARN client mode

In this mode, you do not start a Spark master or worker nodes. Instead, you submit the Spark application to YARN, which runs the Spark driver inside the client process that submits the application.

YARN cluster mode

In this mode, you do not start a Spark master or worker nodes. Instead, you submit the Spark application to YARN, which runs the Spark driver inside YARN’s ApplicationMaster process.

Next, we will cover how to submit the secondary sort application in the standalone and YARN cluster modes.

Running Spark in standalone mode

The following subsections provide the input, script, and log output of a sample run of our secondary sort application in Spark’s standalone mode.

HDFS input

# hadoop fs -cat /mp/timeseries.txt
x,2,9
y,2,5
x,1,3
y,1,7
y,3,1
x,3,6
z,1,4
z,2,8
z,3,7
z,4,0
p,2,6
p,4,7
p,1,9
p,6,0
p,7,3

The script

# cat run_secondarysorting.sh
#!/bin/bash
export JAVA_HOME=/usr/java/jdk7
export SPARK_HOME=/home/hadoop/spark-1.1.0
export SPARK_MASTER=spark://myserver100:7077
BOOK_HOME=/home/mp/data-algorithms-book
APP_JAR=$BOOK_HOME/dist/data_algorithms_book.jar
INPUT=/mp/timeseries.txt
# Run on a Spark standalone cluster
prog=org.dataalgorithms.chap01.spark.SparkSecondarySort
$SPARK_HOME/bin/spark-submit \
  --class $prog \
  --master $SPARK_MASTER \
  --executor-memory 2G \
  --total-executor-cores 20 \
  $APP_JAR \
  $INPUT

Log of the run

# ./run_secondarysorting.sh
args[0]: <file>=/mp/timeseries.txt
...
===  DEBUG STEP 5 ===
...
x,2,9
y,2,5
x,1,3
y,1,7
y,3,1
x,3,6
z,1,4
z,2,8
z,3,7
z,4,0
p,2,6
p,4,7
p,1,9
p,6,0
p,7,3
===  DEBUG STEP 7 ===
14/06/04 08:42:54 INFO spark.SparkContext: Starting job: collect
  at SecondarySort.java:96
14/06/04 08:42:54 INFO scheduler.DAGScheduler: Registering RDD 2
  (mapToPair at SecondarySort.java:75)
...
14/06/04 08:42:55 INFO scheduler.DAGScheduler: Stage 1
  (collect at SecondarySort.java:96) finished in 0.273 s
14/06/04 08:42:55 INFO spark.SparkContext: Job finished:
  collect at SecondarySort.java:96, took 1.587001929 s
z
1,4
2,8
3,7
4,0
=====
p
2,6
4,7
1,9
6,0
7,3
=====
x
2,9
1,3
3,6
=====
y
2,5
1,7
3,1
=====
===  DEBUG STEP 9 ===
14/06/04 08:42:55 INFO spark.SparkContext: Starting job: collect
  at SecondarySort.java:158
...
14/06/04 08:42:55 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0,
  whose tasks have all completed, from pool
14/06/04 08:42:55 INFO spark.SparkContext: Job finished: collect at
  SecondarySort.java:158, took 0.074271723 s
z
1,4
2,8
3,7
4,0
=====
p
1,9
2,6
4,7
6,0
7,3
=====
x
1,3
2,9
3,6
=====
y
1,7
2,5
3,1
=====

Typically, you save the final result to HDFS. You can accomplish this by adding the following line of code after creating your sorted RDD:

sorted.saveAsTextFile("/mp/output");

Then you may view the output as follows:

# hadoop fs -ls /mp/output/
Found 2 items
-rw-r--r--   3 hadoop root,hadoop      0 2014-06-04 10:49 /mp/output/_SUCCESS
-rw-r--r--   3 hadoop root,hadoop    125 2014-06-04 10:49 /mp/output/part-00000

# hadoop fs -cat /mp/output/part-00000
(z,[(1,4), (2,8), (3,7), (4,0)])
(p,[(1,9), (2,6), (4,7), (6,0), (7,3)])
(x,[(1,3), (2,9), (3,6)])
(y,[(1,7), (2,5), (3,1)])

Running Spark in YARN cluster mode

The script to submit our Spark application in YARN cluster mode is as follows:

# cat run_secondarysorting_yarn.sh
#!/bin/bash
export JAVA_HOME=/usr/java/jdk7
export HADOOP_HOME=/usr/local/hadoop-2.5.0
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_HOME=/home/hadoop/spark-1.1.0
BOOK_HOME=/home/mp/data-algorithms-book
APP_JAR=$BOOK_HOME/dist/data_algorithms_book.jar
INPUT=/mp/timeseries.txt
prog=org.dataalgorithms.chap01.spark.SparkSecondarySort
$SPARK_HOME/bin/spark-submit \
  --class $prog \
  --master yarn-cluster \
  --executor-memory 2G \
  --num-executors 10 \
  $APP_JAR \
  $INPUT

Option #2: Secondary Sorting Using the Spark Framework

In the solution for option #1, we sorted the reducer values in memory (using Java’s Collections.sort() method), which might not scale if the reducer values do not fit in a commodity server’s memory. Option #2, the approach used with the MapReduce/Hadoop framework, lets the framework itself sort the reducer values. We cannot achieve this in the current Spark framework (Spark 1.1.0), because Spark’s shuffle is currently hash-based, unlike MapReduce’s sort-based shuffle; therefore, the sorting would have to be implemented explicitly with an RDD operator. If we had a partitioner on the natural key (name) that preserved the order of the RDD, that would be a viable solution. For example, if we sorted by (name, time), we would get:

(p,1),(1,9)
(p,4),(4,7)
(p,6),(6,0)
(p,7),(7,3)

(x,1),(1,3)
(x,2),(2,9)
(x,3),(3,6)

(y,1),(1,7)
(y,2),(2,5)
(y,3),(3,1)

(z,1),(1,4)
(z,2),(2,8)
(z,3),(3,7)
(z,4),(4,0)

There is a partitioner (represented as an abstract class, org.apache.spark.Partitioner), but it does not preserve the order of the original RDD elements. Therefore, option #2 cannot be implemented by the current version of Spark (1.1.0).
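
For illustration only, here is a sketch of how the composite key described above could be built from the pairs RDD of Example 1-12; what Spark 1.1.0 lacks is a shuffle that partitions on the natural key (name) while sorting by this composite (name, time) key:

// map (name, (time, value)) to ((name, time), (time, value));
// a framework-level sort on the composite key would then deliver
// each reducer's values in time order
JavaPairRDD<Tuple2<String, Integer>, Tuple2<Integer, Integer>> compositeKeyed =
    pairs.mapToPair(new PairFunction<Tuple2<String, Tuple2<Integer, Integer>>,
                                     Tuple2<String, Integer>,
                                     Tuple2<Integer, Integer>>() {
      public Tuple2<Tuple2<String, Integer>, Tuple2<Integer, Integer>>
          call(Tuple2<String, Tuple2<Integer, Integer>> t) {
        Tuple2<String, Integer> compositeKey =
            new Tuple2<String, Integer>(t._1, t._2._1);
        return new Tuple2<Tuple2<String, Integer>, Tuple2<Integer, Integer>>(
            compositeKey, t._2);
      }
    });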

Further Reading on Secondary Sorting

To support secondary sorting in Spark, you may extend the JavaPairRDD class and add additional methods such as groupByKeyAndSortValues(). For further work on this topic, you may refer to the following:

  • Chapter 2 provides a detailed implementation of the Secondary Sort design pattern using the MapReduce and Spark frameworks.

  • For details, see the Spark documentation.