This notebook requires the Apache Toree kernel to be installed in Jupyter and Spark to be installed on the machine. Install instructions are at the Apache Toree site. Be sure to read both the quick start and the installation guide.

Download SparkFirstExample.zip and unpack it to get a copy of this notebook that you can run.

You can verify the version of Spark by executing:

In [5]:
sc.version
Out[5]:
2.2.0

The following returns the master URL, which shows how Spark is being run. local[*] means Spark runs locally on this machine, with one worker thread per logical core, rather than on a cluster.

In [6]:
sc.getConf.getOption("spark.master")
Out[6]:
Some(local[*])
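To see how many tasks Spark runs in parallel by default, ask the SparkContext for defaultParallelism. In local mode it equals the number of worker threads unless spark.default.parallelism is set. The sketch below assumes a standalone Scala script with Spark on the classpath and uses local[2] (two threads, an arbitrary choice) so the result is predictable. Inside the notebook, getOrCreate() would instead return the existing local[*] session.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: standalone run with two local worker threads instead of local[*].
val spark = SparkSession.builder()
  .appName("MasterCheck")
  .master("local[2]")
  .getOrCreate()
val sc = spark.sparkContext

// The master URL, exactly as configured.
val master: String = sc.master
// With spark.default.parallelism unset, local mode uses the number of threads.
val parallelism: Int = sc.defaultParallelism

println(s"master = $master, defaultParallelism = $parallelism")
```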

First, an example that creates an RDD.

In [7]:
sc.parallelize( 1 to 100)
Out[7]:
ParallelCollectionRDD[1] at parallelize at <console>:20
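parallelize only defines the RDD. The output above is the RDD's description, and nothing has been computed yet. Its optional second argument, numSlices, sets how many partitions the collection is split into (by default, sc.defaultParallelism). This sketch assumes a standalone local session and picks 4 partitions purely for illustration; glom() gathers each partition into an array so the split is visible.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: standalone local run; in the notebook, use the existing sc instead.
val spark = SparkSession.builder().appName("Partitions").master("local[2]").getOrCreate()
val sc = spark.sparkContext

// Split 1..100 into 4 partitions (an arbitrary choice for this example).
val rdd = sc.parallelize(1 to 100, numSlices = 4)
val numPartitions: Int = rdd.getNumPartitions

// glom() turns each partition into one array; collect() is the action that runs it.
val partitionSizes: Array[Int] = rdd.glom().map(_.length).collect()

println(s"$numPartitions partitions with sizes ${partitionSizes.mkString(", ")}")
```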

count is an action, so it triggers the actual computation and returns the result to the driver program (here, the notebook).

In [8]:
sc.parallelize(1 to 100).count
Out[8]:
100
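Only actions such as count start a job. Transformations such as filter and map just describe a new RDD and are evaluated lazily when an action needs them. A minimal sketch, assuming a standalone local session:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: standalone local run; in the notebook, use the existing sc instead.
val spark = SparkSession.builder().appName("Actions").master("local[2]").getOrCreate()
val sc = spark.sparkContext

val numbers = sc.parallelize(1 to 100)

// Transformations: recorded in the RDD lineage, nothing runs yet.
val evensTimesTen = numbers.filter(_ % 2 == 0).map(_ * 10)

// Actions: each one launches a job and returns a value to the driver.
val howMany: Long = evensTimesTen.count()
val total: Int = evensTimesTen.reduce(_ + _)
val firstThree: Array[Int] = evensTimesTen.take(3)

println(s"count = $howMany, sum = $total, first three = ${firstThree.mkString(", ")}")
```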
Now for some examples using DataFrames. First, create a SparkSession.
In [10]:
import org.apache.spark.sql._
val spark = SparkSession.builder().appName("Sample").getOrCreate()
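getOrCreate() returns the session that is already running, if there is one, instead of building a second one; so if the kernel has already started a session, the cell above simply reuses it. The sketch below, assuming a standalone local run, checks that behavior and then builds a small DataFrame from a local Scala collection. The two sample rows are the ones take(2) returns later in this notebook.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: standalone local run.
val spark = SparkSession.builder().appName("Sample").master("local[2]").getOrCreate()

// A second getOrCreate() hands back the very same session object.
val sameSession: Boolean = SparkSession.builder().getOrCreate() eq spark

// Implicit encoders that enable .toDF on local Scala collections.
import spark.implicits._

val sample = Seq(
  ("United States", "Romania", 15L),
  ("United States", "Croatia", 1L)
).toDF("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count")

val columns: List[String] = sample.columns.toList
val rows: Long = sample.count()
sample.show()
```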

The data file read below is included in the .zip file. Change the path below to match where you unpacked it on your machine.

In [11]:
val jsonFlightFile = "/Users/whitney/Courses/696/Fall17/SparkBookData/flight-data/json/2015-summary.json"

val flightData2015 = spark.read.json(jsonFlightFile)
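Without a schema, spark.read.json scans the input once to infer one. When the schema is known in advance, passing it with schema(...) avoids that extra scan. The sketch below assumes a standalone local session and writes a hypothetical two-record sample to a temp file instead of using 2015-summary.json. The sample uses JSON Lines (one object per line), which is what spark.read.json expects by default. The column names and types match the ReadSchema that explain reports for this data (bigint corresponds to LongType).

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Assumption: standalone local run.
val spark = SparkSession.builder().appName("ExplicitSchema").master("local[2]").getOrCreate()

// Hypothetical stand-in for the real data file, in JSON Lines format.
val lines = Seq(
  """{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Romania","count":15}""",
  """{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Croatia","count":1}"""
)
val jsonFile = Files.createTempFile("2015-summary-sample", ".json")
Files.write(jsonFile, lines.mkString("\n").getBytes(StandardCharsets.UTF_8))

// Declaring the schema up front skips the inference scan.
val flightSchema = StructType(Seq(
  StructField("DEST_COUNTRY_NAME", StringType),
  StructField("ORIGIN_COUNTRY_NAME", StringType),
  StructField("count", LongType)
))

val flights = spark.read.schema(flightSchema).json(jsonFile.toString)
flights.printSchema()

val fieldNames: List[String] = flights.schema.fieldNames.toList
val rowCount: Long = flights.count()
```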
In [12]:
flightData2015.take(2)
Out[12]:
Array([United States,Romania,15], [United States,Croatia,1])
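take(2) is an action that returns an Array[Row] to the driver; each Row prints in brackets, as in the output above. Fields can then be read by name or by position. A sketch assuming a standalone local session, with a small in-memory DataFrame built from those two rows instead of the JSON file:

```scala
import org.apache.spark.sql.{Row, SparkSession}

// Assumption: standalone local run.
val spark = SparkSession.builder().appName("TakeRows").master("local[2]").getOrCreate()
import spark.implicits._

val flights = Seq(
  ("United States", "Romania", 15L),
  ("United States", "Croatia", 1L)
).toDF("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count")

val firstTwo: Array[Row] = flights.take(2)

// Read fields by column name (typed) ...
val dest: String = firstTwo(0).getAs[String]("DEST_COUNTRY_NAME")
val cnt: Long = firstTwo(0).getAs[Long]("count")
// ... or by position; first() is shorthand for take(1).head.
val origin: String = flights.first().getString(1)

println(s"$dest <- $origin: $cnt")
```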
In [13]:
flightData2015.explain()
== Physical Plan ==
*FileScan json [DEST_COUNTRY_NAME#8,ORIGIN_COUNTRY_NAME#9,count#10L] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/Users/whitney/Courses/696/Fall17/SparkBookData/flight-data/json/2015-summ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:bigint>
In [14]:
flightData2015.explain(true)
== Parsed Logical Plan ==
Relation[DEST_COUNTRY_NAME#8,ORIGIN_COUNTRY_NAME#9,count#10L] json

== Analyzed Logical Plan ==
DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint
Relation[DEST_COUNTRY_NAME#8,ORIGIN_COUNTRY_NAME#9,count#10L] json

== Optimized Logical Plan ==
Relation[DEST_COUNTRY_NAME#8,ORIGIN_COUNTRY_NAME#9,count#10L] json

== Physical Plan ==
*FileScan json [DEST_COUNTRY_NAME#8,ORIGIN_COUNTRY_NAME#9,count#10L] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/Users/whitney/Courses/696/Fall17/SparkBookData/flight-data/json/2015-summ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:bigint>
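In the plans above, PushedFilters is empty because nothing filters the data. Adding a filter puts a Filter node in the plan, and Spark also hands the predicate to the file scan, where it shows up under PushedFilters. Whether the JSON reader actually uses it to skip records depends on the Spark version; Spark still applies the Filter itself either way. A sketch assuming a standalone local session and a hypothetical three-record temp file built from routes shown in this notebook:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: standalone local run.
val spark = SparkSession.builder().appName("PushdownDemo").master("local[2]").getOrCreate()

// Hypothetical stand-in for the real data file, in JSON Lines format.
val records = Seq(
  """{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Romania","count":15}""",
  """{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Croatia","count":1}""",
  """{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Estonia","count":1}"""
)
val jsonFile = Files.createTempFile("flights-sample", ".json")
Files.write(jsonFile, records.mkString("\n").getBytes(StandardCharsets.UTF_8))

val flights = spark.read.json(jsonFile.toString)

// A transformation: it only adds a Filter node to the logical plan.
val busyRoutes = flights.filter(col("count") > 10)

// Prints all four plans; the FileScan line should now list the predicate.
busyRoutes.explain(true)

// The physical plan as text, plus an action that actually runs the query.
val physicalPlan: String = busyRoutes.queryExecution.executedPlan.toString
val busyCount: Long = busyRoutes.count()
```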
In [15]:
val sortedFlightData2015 = flightData2015.sort("count")
sortedFlightData2015.show
+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Estonia|    1|
|              Kosovo|      United States|    1|
|              Zambia|      United States|    1|
|       United States|   Papua New Guinea|    1|
|               Malta|      United States|    1|
|       United States|          Gibraltar|    1|
|            Suriname|      United States|    1|
|       United States|            Croatia|    1|
|            Djibouti|      United States|    1|
|        Burkina Faso|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|             Cyprus|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
|              Cyprus|      United States|    1|
|       United States|          Lithuania|    1|
|       United States|           Bulgaria|    1|
|       United States|            Georgia|    1|
|       United States|            Bahrain|    1|
|       Cote d'Ivoire|      United States|    1|
+--------------------+-------------------+-----+
only showing top 20 rows

In [16]:
sortedFlightData2015.show(5)
+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows
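The two show calls above list different rows even though they display the same sorted DataFrame. Many routes have a count of 1, and sorting by count alone leaves the order among those ties unspecified, so it can change from one run to the next. Adding tie-breaker columns gives a total order. A sketch assuming a standalone local session and a small in-memory sample of routes from the output above:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{asc, desc}

// Assumption: standalone local run.
val spark = SparkSession.builder().appName("StableSort").master("local[2]").getOrCreate()
import spark.implicits._

val flights = Seq(
  ("United States", "Romania", 15L),
  ("United States", "Croatia", 1L),
  ("Malta", "United States", 1L),
  ("Kosovo", "United States", 1L)
).toDF("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count")

// count first, then both name columns, so ties are always broken the same way.
val ordered = flights.orderBy(asc("count"), asc("DEST_COUNTRY_NAME"), asc("ORIGIN_COUNTRY_NAME"))
ordered.show()
val destOrder: List[String] = ordered.collect().map(_.getString(0)).toList

// Busiest route first.
val topOrigin: String = flights.orderBy(desc("count")).first().getString(1)
```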

In [17]:
sortedFlightData2015.explain(true)
== Parsed Logical Plan ==
Sort [count#10L ASC NULLS FIRST], true
+- Relation[DEST_COUNTRY_NAME#8,ORIGIN_COUNTRY_NAME#9,count#10L] json

== Analyzed Logical Plan ==
DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint
Sort [count#10L ASC NULLS FIRST], true
+- Relation[DEST_COUNTRY_NAME#8,ORIGIN_COUNTRY_NAME#9,count#10L] json

== Optimized Logical Plan ==
Sort [count#10L ASC NULLS FIRST], true
+- Relation[DEST_COUNTRY_NAME#8,ORIGIN_COUNTRY_NAME#9,count#10L] json

== Physical Plan ==
*Sort [count#10L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#10L ASC NULLS FIRST, 200)
   +- *FileScan json [DEST_COUNTRY_NAME#8,ORIGIN_COUNTRY_NAME#9,count#10L] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/Users/whitney/Courses/696/Fall17/SparkBookData/flight-data/json/2015-summ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:bigint>
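A global sort needs a shuffle, which appears as the Exchange rangepartitioning step. Its 200 partitions come from spark.sql.shuffle.partitions, whose default is 200; for a small local dataset a lower value avoids scheduling many nearly empty tasks. A sketch assuming a standalone local session, with 5 as an arbitrary illustrative value:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: standalone local run.
val spark = SparkSession.builder().appName("ShufflePartitions").master("local[2]").getOrCreate()
import spark.implicits._

// The default behind the 200 in "Exchange rangepartitioning(..., 200)".
val before: String = spark.conf.get("spark.sql.shuffle.partitions")

// Assumption: 5 is an arbitrary smaller value for a tiny local dataset.
spark.conf.set("spark.sql.shuffle.partitions", "5")
val after: String = spark.conf.get("spark.sql.shuffle.partitions")

val flights = Seq(
  ("United States", "Romania", 15L),
  ("United States", "Croatia", 1L),
  ("Malta", "United States", 1L)
).toDF("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count")

// The Exchange in this plan should now request 5 partitions instead of 200.
flights.sort("count").explain()
```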
In [18]:
val grouped = sortedFlightData2015.groupBy("DEST_COUNTRY_NAME")
In [19]:
grouped.mean("count").show
+--------------------+----------+
|   DEST_COUNTRY_NAME|avg(count)|
+--------------------+----------+
|            Anguilla|      41.0|
|            Paraguay|      60.0|
|              Russia|     176.0|
|             Senegal|      40.0|
|              Sweden|     118.0|
|            Kiribati|      26.0|
|              Guyana|      64.0|
|         Philippines|     134.0|
|            Djibouti|       1.0|
|            Malaysia|       2.0|
|           Singapore|       3.0|
|                Fiji|      24.0|
|              Turkey|     138.0|
|                Iraq|       1.0|
|             Germany|    1468.0|
|              Jordan|      44.0|
|               Palau|      30.0|
|Turks and Caicos ...|     230.0|
|              France|     935.0|
|              Greece|      30.0|
+--------------------+----------+
only showing top 20 rows
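groupBy returns a RelationalGroupedDataset, and mean is an alias of avg. To compute several aggregates at once, use agg. Grouping shuffles the data, so the earlier sort by count does not carry over and the rows above come back in no particular order; add orderBy when the order matters. A sketch assuming a standalone local session and a three-row in-memory sample of routes from this notebook:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, sum}

// Assumption: standalone local run.
val spark = SparkSession.builder().appName("GroupAgg").master("local[2]").getOrCreate()
import spark.implicits._

val flights = Seq(
  ("United States", "Romania", 15L),
  ("United States", "Croatia", 1L),
  ("Malta", "United States", 1L)
).toDF("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count")

// Three aggregates per destination, then an explicit order for display.
val perDestination = flights
  .groupBy("DEST_COUNTRY_NAME")
  .agg(avg("count").as("avg_count"), sum("count").as("total"), count("*").as("routes"))
  .orderBy("DEST_COUNTRY_NAME")

perDestination.show()

// destination -> (average, total, number of routes)
val result: Map[String, (Double, Long, Long)] = perDestination.collect()
  .map(r => r.getString(0) -> ((r.getDouble(1), r.getLong(2), r.getLong(3))))
  .toMap
```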