Monday, July 16, 2018

Analyzing Sensor Data with Spark Streaming

By the end of this activity, you will be able to:
  1. Read streaming data into Spark
  2. Create and apply computations over a sliding window of data

Step 1. Open Jupyter Python Notebook for Spark Streaming. Open a web browser by clicking on the web browser icon at the top of the toolbar:
Navigate to localhost:8889/tree/Downloads/big-data-3/spark-streaming:
Open the Spark Streaming Notebook by clicking on Spark-Streaming.ipynb:
Step 2. Look at sensor format and measurement types. The first cell in the notebook gives an example of the streaming measurements coming from the weather station:
Each line contains a timestamp and a set of measurements. Each measurement has an abbreviation, and for this exercise, we are interested in the average wind direction, which is Dm. The next cell lists the abbreviations used for each type of measurement:
The third cell defines a function that parses each line and returns the average wind direction (Dm). Run this cell:
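The cell itself appears in the notebook; as a rough sketch, assuming each measurement shows up in the line as an abbreviation=value pair (for example, Dm=163), the function might look like this:

    import re

    def parse(line):
        # Look for the average wind direction measurement, e.g. "Dm=163"
        match = re.search(r"Dm=(\d+)", line)
        if match:
            return [int(match.group(1))]   # return the value in a list so flatMap() can flatten it
        return []                          # no match: contribute nothing to the stream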

Step 3. Import and create streaming context. Next, we will import and create a new instance of Spark's StreamingContext:
Similar to the SparkContext, the StreamingContext provides an interface to Spark's streaming capabilities. The argument sc is the SparkContext, and 1 specifies a batch interval of one second.
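In code, the cell amounts to something like the following (the variable name ssc is illustrative):

    from pyspark.streaming import StreamingContext

    # sc is the SparkContext provided by the notebook; 1 is the batch interval in seconds
    ssc = StreamingContext(sc, 1)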

Step 4. Create DStream of weather data. Let's open a connection to the streaming weather data:
If port 12028 does not respond, you may find that port 12020 works instead. This creates a new variable, lines, which is a Spark DStream that streams the lines of output from the weather station.
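A sketch of this cell, assuming the weather-station host used in the course materials (rtd.hpwren.ucsd.edu):

    # Open a TCP connection to the streaming weather-station data
    lines = ssc.socketTextStream("rtd.hpwren.ucsd.edu", 12028)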

Step 5. Read measurement. Next, let's read the average wind direction from each line and store it in a new DStream called vals:
This line uses flatMap() to iterate over the lines DStream and calls the parse() function we defined above to pull out the average wind direction.
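The cell is roughly:

    # Apply parse() to every line; flatMap() flattens the results and drops lines with no Dm measurement
    vals = lines.flatMap(parse)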

Step 6. Create sliding window of data. We can create a sliding window over the measurements by calling the window() method:
This creates a new DStream called window that combines ten seconds' worth of data and slides forward by five seconds.
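The call is roughly:

    # Window length of 10 seconds, sliding interval of 5 seconds
    window = vals.window(10, 5)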

Step 7. Define and call analysis function. We would like to find the minimum and maximum values in our window. Let's define a function that prints these values for an RDD:
This function first prints the entire contents of the RDD by calling the collect() method. This is done to demonstrate the sliding window and would not be practical if the RDD contained a large amount of data. Next, we check whether the size of the RDD is greater than zero before printing the maximum and minimum values.
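A sketch of such a stats() function:

    def stats(rdd):
        print(rdd.collect())               # show the window contents (only practical for small RDDs)
        if rdd.count() > 0:
            print("max = {}, min = {}".format(rdd.max(), rdd.min()))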
Next, we call the stats() function for each RDD in our sliding window:
This line calls the stats() function defined above for each RDD in the DStream window.
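Roughly:

    # Run stats() on each RDD produced by the sliding window
    window.foreachRDD(lambda rdd: stats(rdd))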

Step 8. Start the stream processing. We call start() on the StreamingContext to begin the processing:
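That is:

    # Begin receiving data and processing the sliding windows
    ssc.start()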
The sliding window contains ten seconds' worth of data and slides every five seconds. In the beginning, the number of values in the window increases as data accumulates; after Window 3, the size stays (approximately) the same. Because the window slides by half its length, the second half of each window becomes the first half of the next. For example, the second half of Window 5 is 310, 321, 323, 325, 326, which becomes the first half of Window 6.
When we are done, call stop() on the StreamingContext:
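That is:

    # Stop the streaming computation (by default this also stops the underlying SparkContext)
    ssc.stop()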

Instructions for Configuring VirtualBox for Spark Streaming

Spark Streaming requires more than one executor. This Reading describes how to configure VirtualBox so that the Cloudera VM has more than one virtual processor.

Step 1. Stop the Cloudera VM. Before we can change the settings for the Cloudera VM, the VM needs to be powered off. If the VM is running, click on System in the top toolbar, and then click on Shutdown:
Next, click on Shut down:

Step 2. Change number of processors. Once the Cloudera VM is powered off, select the Cloudera VM in the list of virtual machines in the VirtualBox Manager:
Next, click on Settings:
Next, click on System:
Next, click on Processor:
The default number of processors is one. Change this to two or more:

Finally, click on OK and start the Cloudera VM.

Exploring Spark SQL and Spark DataFrames

By the end of this activity, you will be able to:
  1. Access Postgres database tables with SparkSQL
  2. Filter rows and columns of a Spark DataFrame
  3. Group and perform aggregate functions on columns in a Spark DataFrame
  4. Join two Spark DataFrames on a single column

Step 1. Open Jupyter Python Notebook for SparkSQL. First, open a terminal and enter the command "pyspark" to set up the server. Next, open a web browser by clicking on the web browser icon at the top of the toolbar:
Navigate to localhost:8889/tree/Downloads/big-data-3/spark-sql:
Open the SparkSQL Notebook by clicking on SparkSQL.ipynb:

Step 2. Connect to Postgres Table. This notebook already contains three lines of code, so you do not have to enter them. Run these three lines. The first line imports the SQLContext module, which is needed to access SQL databases in Spark:
The second line creates a new SQLContext from the SparkContext sc:
The third line creates a new Spark DataFrame in the variable df for the Postgres table gameclicks:
The format("jdbc") call says that the source of the DataFrame will be a JDBC (Java Database Connectivity) connection, the url option is the connection string used to access the Postgres database, and the dbtable option specifies the gameclicks table.
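Taken together, the three cells look roughly like the following; the variable name sqlsc and the connection URL are assumptions based on the Postgres setup on the Cloudera VM (database and user both named cloudera) and may differ in your environment:

    from pyspark.sql import SQLContext

    # sc is the SparkContext provided by pyspark
    sqlsc = SQLContext(sc)

    df = sqlsc.read.format("jdbc") \
        .option("url", "jdbc:postgresql://localhost/cloudera?user=cloudera") \
        .option("dbtable", "gameclicks") \
        .load()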

Step 3. View Spark DataFrame schema and count rows. We can call the printSchema() method to view the schema of the DataFrame:
The description lists the name and data type of each column.
We can also call the count() method to count the number of rows in the DataFrame:
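For example:

    df.printSchema()   # list each column name and its data type
    df.count()         # number of rows in the table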

Step 4. View contents of DataFrame. We can call the show() method to view the contents of the DataFrame. The argument specifies how many rows to display:
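For example, to display the first five rows:

    df.show(5)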

Step 5. Filter columns in DataFrame. We can filter for one or more columns by calling the select() method:
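For example, to keep only the userid and teamlevel columns:

    df.select("userid", "teamlevel").show(5)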

Step 6. Filter rows based on criteria. We can also filter for rows that match a specific criteria using filter():
The arguments to filter() are a Column, in this case specified as df["teamlevel"], and the condition, which is greater than 1. The remainder of the command selects only the userid and teamlevel columns and shows the first five rows.
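A sketch of the full command:

    # Keep only rows with teamlevel > 1, then show userid and teamlevel for the first five
    df.filter(df["teamlevel"] > 1).select("userid", "teamlevel").show(5)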

Step 7. Group by a column and count. The groupBy() method groups the values of column(s). The ishit column only has values 0 and 1. We can calculate how many times each occurs by grouping the ishit column and counting the result:
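Roughly:

    # Count how many rows have ishit = 0 and how many have ishit = 1
    df.groupBy("ishit").count().show()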

Step 8. Calculate average and sum. Aggregate operations can be performed on columns of DataFrames. First, let's import the Python libraries for the aggregate operations. Next, we can calculate the average and total values by calling the mean() and sum() methods, respectively:
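A sketch of these two cells:

    from pyspark.sql.functions import mean, sum

    # Average and total of the ishit column
    df.select(mean("ishit"), sum("ishit")).show()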
Step 9. Join two DataFrames. We can merge or join two DataFrames on a single column. First, let's create a DataFrame for the adclicks table in the Postgres database by copying the third cell in this notebook, changing gameclicks to adclicks, and storing the result in a new variable called df2:
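A sketch, reusing the same (assumed) connection URL as before:

    df2 = sqlsc.read.format("jdbc") \
        .option("url", "jdbc:postgresql://localhost/cloudera?user=cloudera") \
        .option("dbtable", "adclicks") \
        .load()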
Let's view the columns in df2 by calling printSchema():
We can see that the adclicks df2 DataFrame also has a column called userid. Next, we will combine the gameclicks and adclicks DataFrames by calling the join() method and saving the resulting DataFrame in a variable called merge:
We are calling the join() method on the gameclicks DataFrame; the first argument is the DataFrame to join with, i.e., the adclicks DataFrame, and the second argument is the column name in both DataFrames to join on.
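Roughly:

    # Inner join of gameclicks and adclicks on the shared userid column
    merge = df.join(df2, "userid")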
Let's view the schema of merge:
We can see that the merged DataFrame has all the columns of both gameclicks and adclicks.
Finally, let's look at the contents of merge:
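For example:

    merge.printSchema()   # columns from both gameclicks and adclicks
    merge.show(5)         # first five rows of the joined DataFrame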