From charlesreid1

Installing

Docker

Easiest solution: use Docker.

Use the Jupyter PySpark notebook Docker container: https://hub.docker.com/r/jupyter/pyspark-notebook/

This comes bundled with Apache Mesos, which is a cluster resource management framework. This enables you to connect to a Mesos-managed cluster and use compute resources on that cluster.

This Docker image is provided courtesy of the Jupyter project on Github: https://github.com/jupyter/docker-stacks

Nice explanation of how to set it up with either a standalone (single node) or Mesos cluster in the PySpark notebook image's README: https://github.com/jupyter/docker-stacks/tree/master/pyspark-notebook

Basically, here are the first few lines of a standalone notebook:

import pyspark
sc = pyspark.SparkContext('local[*]')

# do something to prove it works
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)

Mac

Ensure you have the following software installed:

Installing with Homebrew

Install Apache Spark using Homebrew:

$ brew install apache-spark

This should put pyspark on your path:

$ which pyspark
/usr/local/bin/pyspark

I was still having trouble importing pyspark from Python, so I also ran:

$ pip3 install pyspark

Linux

Have the following software installed:

Download Spark from this page: http://spark.apache.org/downloads.html

Next, install the Scala build tool (sbt) with apt (see https://stackoverflow.com/questions/35529913/how-to-install-sbt-on-ubuntu-debian-with-apt-get):

$ echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
$ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823
$ sudo apt-get update
$ sudo apt-get install sbt

Now unzip the Spark source, enter the directory, and run:

$ sbt assembly

Ensure Spark was built correctly by running this command from the same directory:

$ bin/pyspark

Now set the $SPARK_HOME environment variable to wherever your Spark lives:

export SPARK_HOME="/path/to/unzipped/spark-2.2"


Testing Out PySpark

Test it out by running the pyspark command. This starts what looks like an ordinary Python interpreter, preceded by some Spark log messages and a Spark splash banner:

$ pyspark
Python 2.7.10 (default, Feb  7 2017, 00:08:15)
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.34)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/09/26 17:53:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/09/26 17:53:16 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/09/26 17:53:16 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/09/26 17:53:17 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 2.7.10 (default, Feb  7 2017 00:08:15)
SparkSession available as 'spark'.
>>>

Check that the shell created a SparkContext and stored it in the sc variable:

>>> sc
<SparkContext master=local[*] appName=PySparkShell>

Set Up PySpark With Jupyter

To use PySpark through a Jupyter notebook, instead of through the command line, first make sure your Jupyter is up to date:

$ pip3 install --upgrade jupyter

You may also need to install pyspark using pip:

$ pip3 install --upgrade pyspark

Start up Jupyter and create a new notebook:

$ jupyter notebook

Make a cell that tests out pyspark:

import pyspark
sc = pyspark.SparkContext('local[*]')

# do something to prove it works
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)

Error with conflicting Python versions

When I ran the above test, I got the error below, which says the workers were running Python 2 while the driver was running Python 3.

(For background: the python command pointed to Python 2, but I had created a Python 3 notebook. The workers used the default interpreter that python pointed to (version 2), while the notebook, which runs the PySpark driver, used Python 3.)

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-4-0fdbaf1f4e92> in <module>()
      1 # do something to prove it works
      2 rdd = sc.parallelize(range(1000))
----> 3 rdd.takeSample(False, 5)

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in takeSample(self, withReplacement, num, seed)
    477             return []
    478 
--> 479         initialCount = self.count()
    480         if initialCount == 0:
    481             return []

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in count(self)
   1039         3
   1040         """
-> 1041         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1042 
   1043     def stats(self):

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in sum(self)
   1030         6.0
   1031         """
-> 1032         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
   1033 
   1034     def count(self):

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in fold(self, zeroValue, op)
    904         # zeroValue provided to each partition is unique from the one provided
    905         # to the final reduce call
--> 906         vals = self.mapPartitions(func).collect()
    907         return reduce(op, vals, zeroValue)
    908 

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in collect(self)
    807         """
    808         with SCCallSiteSync(self.context) as css:
--> 809             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    810         return list(_load_from_socket(port, self._jrdd_deserializer))
    811 

/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/local/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 5, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 123, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 123, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more

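I fixed it by setting the PYSPARK_PYTHON environment variable, which tells Spark which Python executable the workers should use, and then starting Jupyter from the same shell:

$ export PYSPARK_PYTHON="python3"
$ jupyter notebook

This resolved the issue. Re-running the test cell should now return five randomly sampled values (yours will differ), for example: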

Out[1] : [285, 777, 376, 101, 737]
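If you'd rather not depend on how Jupyter was launched, another option is to set PYSPARK_PYTHON from inside the notebook before creating the SparkContext. This is a sketch that assumes the notebook's own interpreter (sys.executable) is usable by the workers, which holds in local mode; on a real cluster the same Python path would have to exist on every worker node.

```python
import os
import sys

# Launch Spark's Python workers with the same interpreter that runs this
# notebook (the driver), so the driver and workers have matching versions.
# PySpark reads this variable when the SparkContext is created, so set it first.
os.environ["PYSPARK_PYTHON"] = sys.executable
print("Workers will use:", os.environ["PYSPARK_PYTHON"])

# Then create the context as before (requires pyspark to be installed):
# import pyspark
# sc = pyspark.SparkContext('local[*]')
```

If a SparkContext already exists in the notebook, restart the kernel first, since the variable only takes effect when a new context is created.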

Usage

RDD

RDD = Resilient distributed dataset

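RDDs are a central concept in Spark. An RDD splits a larger data set into chunks, called partitions, that can be processed on different machines. (Loosely, this is similar to how Hadoop's HDFS splits a file into blocks spread across a cluster, except that RDD partitions can be kept in memory.)

RDDs keep track of the transformations used to build each partition (the RDD's lineage). If a partition is lost, Spark recomputes it by re-applying those transformations to the source data, which lets it recover from failures without replicating the data.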
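The lineage idea can be illustrated with a toy model in plain Python. This is not how Spark is implemented; the source chunks and the two steps are hypothetical, chosen only to show that storing the recipe (rather than a copy of the data) is enough to rebuild a lost partition.

```python
# Toy model of RDD lineage in plain Python (not Spark's implementation).
source_partitions = [[1, 2, 3], [4, 5, 6]]  # hypothetical input chunks

def times_ten(xs):            # plays the role of .map(lambda x: x * 10)
    return [x * 10 for x in xs]

def keep_over_20(xs):         # plays the role of .filter(lambda x: x > 20)
    return [x for x in xs if x > 20]

lineage = [times_ten, keep_over_20]  # the recorded recipe, not the data

def compute_partition(i):
    """Rebuild partition i from its source chunk by replaying the lineage."""
    data = source_partitions[i]
    for step in lineage:
        data = step(data)
    return data

results = {i: compute_partition(i) for i in range(len(source_partitions))}
del results[1]                     # simulate losing partition 1 (e.g. a failed node)
results[1] = compute_partition(1)  # recover it by replaying the same steps
print(results)  # {0: [30], 1: [40, 50, 60]}
```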

Internal Workings

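RDDs operate in parallel: their partitions are distributed across the cluster, and each transformation is executed on all of the partitions in parallel.

Transformations are lazy: they are recorded but not performed until the user calls an action (such as collect or count). Deferring the work lets Spark plan the whole chain of transformations at once before executing it.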

Example of an inefficient workflow:

  • Count the number of occurrences of each word
  • Select the word counts of all words starting with the letter "T"
  • Print the results

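The inefficiency in this workflow comes from counting all of the words that don't start with the letter T, which will never be returned. Spark's lazy model helps here: because transformations are only recorded until an action is called, Spark can plan the whole chain at once, pipelining the map and filter steps together within each partition. It does not, however, reorder the steps of an RDD job for you, so put the cheap filter before the expensive counting step.

To illustrate, here is the word count written in Spark with the filter placed before the reduce step, so that words not starting with "T" are never counted. Starting from an RDD of words:

First, pair each word with a count of 1:

.map(lambda w : (w,1))

Then, keep only the pairs whose word starts with T (each element is now a (word, 1) tuple, so test its first item):

.filter( lambda pair : pair[0].startswith('T') )

Then, call reduceByKey (this needs import operator at the top of the script):

.reduceByKey(operator.add)

This adds up the 1s for each key, giving the number of occurrences of each word.

Finally, these results are collected using the collect method:

.collect()

collect is an action: up until that point, the map, filter, and reduceByKey transformations have only been recorded. When collect is called, Spark actually executes them and returns the results to the driver.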
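For readers without a Spark install, here is a plain-Python analogue of the same pipeline (not Spark itself). Generator expressions are lazy in the same way Spark transformations are, and a hypothetical reduce_by_key helper stands in for reduceByKey. The word list is made up for illustration.

```python
import operator

# Hypothetical input; in Spark this would be an RDD of words.
words = ["The", "cat", "ate", "the", "Tuna", "Then", "The", "dog"]
inspected = []  # records every word the filter actually looks at

def starts_with_t(pair):
    inspected.append(pair[0])
    return pair[0].startswith("T")

# Generator expressions are lazy, like Spark transformations:
# defining them does not process any data yet.
pairs = ((w, 1) for w in words)                   # like .map(lambda w : (w,1))
t_pairs = (p for p in pairs if starts_with_t(p))  # like .filter(...)
print(len(inspected))  # 0: nothing has run yet

def reduce_by_key(items, op):
    """Hypothetical stand-in for reduceByKey: combine the values of each key with op."""
    totals = {}
    for key, value in items:
        totals[key] = op(totals[key], value) if key in totals else value
    return totals

# Consuming the generators plays the role of the .collect() action.
# (In Spark, reduceByKey is itself lazy; only collect triggers the work.)
result = sorted(reduce_by_key(t_pairs, operator.add).items())
print(result)          # [('The', 2), ('Then', 1), ('Tuna', 1)]
print(len(inspected))  # 8: every word passed through the filter once
```

In PySpark the whole chain would read sc.parallelize(words).map(lambda w : (w,1)).filter(lambda pair : pair[0].startswith('T')).reduceByKey(operator.add).collect().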

Creating RDDs

There are two ways to create an RDD:

  • Parallelize a list or array of elements
  • Load a text file in parallel

Example of parallelizing an array:

data = sc.parallelize( [ ('A',1), ('B',2), ('C',3), ('D',4) ] )

The data object is a ParallelCollectionRDD object.
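To make the idea of partitions concrete, here is one simple way to divide a list into a given number of contiguous, nearly equal chunks. It is a plain-Python sketch similar in spirit to what parallelize does, not Spark's actual slicing code; split_into_partitions is a hypothetical helper.

```python
def split_into_partitions(data, num_slices):
    """Divide data into num_slices contiguous, nearly equal chunks."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

pairs = [('A', 1), ('B', 2), ('C', 3), ('D', 4)]
print(split_into_partitions(pairs, 2))
# [[('A', 1), ('B', 2)], [('C', 3), ('D', 4)]]
print(split_into_partitions(list(range(10)), 3))
# [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```

In PySpark, sc.parallelize also takes the number of partitions as an optional second argument (numSlices), and rdd.glom().collect() returns the contents of each partition as a separate list, which is handy for checking how the data was split.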

Example of loading a text file in parallel:

fileData = sc.textFile('/path/to/file/stuff.txt' , 2)

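(The second argument, minPartitions, specifies the minimum number of partitions to divide the dataset into.)

Spark can also load gzip-compressed files directly. A gzipped file can't be split, so it is read into a single partition regardless of the partition argument: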

compressedFileData = sc.textFile('/path/to/file/stuff.txt.gz' , 2)

These two objects are MapPartitionsRDD objects.

Spark can read from multiple storage systems: local disks, network filesystems, Amazon S3, Google Cloud Storage, Hadoop HDFS, Cassandra, and others.

Spark can also read data in different formats: text, JSON, Hive tables, and SQL query results from relational databases.

RDDs have no schema, and can therefore be heterogeneous. For example, you can parallelize a list that consists of a tuple, and a dict, and a list, and Spark is okay with that. When you collect the results again (which returns all of the data back to the driver, or master, node), the resulting data set will function as any list containing a tuple, a dict, and a list.

Local Mode vs Cluster Mode

PySpark can be run in local mode or in cluster mode.

Local Mode

In local mode, the driver and the executors all run on a single machine (for example, local[*] runs one worker thread per CPU core). There is no cluster to configure and no data moves over a network, so it is the easiest way to develop and test a script before switching it to a cluster, but the job is limited to the memory and cores of that one machine.

Cluster Mode

In cluster mode, the user submits a job to the driver, which coordinates the work. The driver builds a directed acyclic graph (see Graphs/DAGs) of the computation and uses it to decide which workers will perform which tasks. It then prepares each task's "closure" (the data, variables, and functions the worker needs to finish the task) and copies that closure over to the worker.

Because each task receives its own copy of the closure, a worker can modify its copy of a variable without affecting other workers' copies or the driver's copy, and those changes are not sent back to the driver.
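PySpark serializes functions and the variables they reference with a pickling library (cloudpickle) before shipping them to workers. The sketch below mimics that copy with the standard pickle module to show why a worker's change never reaches the driver; the counter variable is hypothetical.

```python
import pickle

# Plain-Python simulation of shipping a closure variable to a worker.
counter = {"total": 0}           # a variable on the driver (hypothetical)
shipped = pickle.dumps(counter)  # serialized, as Spark does before sending a task

worker_copy = pickle.loads(shipped)  # the worker deserializes its own copy
worker_copy["total"] += 100          # ...and modifies only that copy

print(counter["total"])      # 0: the driver's copy is unchanged
print(worker_copy["total"])  # 100
```

To get values back from the workers, return them through an action such as reduce or collect, or use an accumulator created with sc.accumulator.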

Example

An example of a calculation in cluster mode, to illustrate:

Suppose we are numerically integrating a function over an interval using PySpark. We would divide the interval into pieces and assign each piece to a different worker node. Each worker runs the same code with the same variable names (delta_x, start_x, end_x, and running_sum), but each worker has its own separate copy of these variables. When each worker has finished summing the approximating areas over its piece into running_sum, the separate running_sum values are collected and added together into a single return value.

By contrast, in local mode all of these pieces are processed on a single machine: in parallel across its cores with local[*], or one after another with local[1].
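The integration example can be sketched in plain Python. Here the integrand f(x) = x², the interval [0, 1], the 4 pieces, and the midpoint rule are all assumptions for illustration; each call to integrate_piece stands in for one worker, with its own local copies of delta_x, start_x, end_x, and running_sum.

```python
def f(x):
    """Hypothetical integrand: f(x) = x**2, whose exact integral on [0, 1] is 1/3."""
    return x * x

def split_interval(a, b, num_pieces):
    """Divide [a, b] into num_pieces equal sub-intervals (one per worker)."""
    width = (b - a) / num_pieces
    return [(a + i * width, a + (i + 1) * width) for i in range(num_pieces)]

def integrate_piece(piece, steps=250):
    """Midpoint-rule sum over one sub-interval. Every variable here is local,
    so each worker would have its own delta_x, start_x, end_x, running_sum."""
    start_x, end_x = piece
    delta_x = (end_x - start_x) / steps
    running_sum = 0.0
    for k in range(steps):
        running_sum += f(start_x + (k + 0.5) * delta_x) * delta_x
    return running_sum

pieces = split_interval(0.0, 1.0, 4)
partial_sums = [integrate_piece(p) for p in pieces]  # one partial sum per "worker"
total = sum(partial_sums)                            # combine into a single result
print(total)  # close to 1/3
```

In PySpark, the list comprehension and sum would become sc.parallelize(pieces, len(pieces)).map(integrate_piece).sum(), putting each piece in its own partition.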

Transformations

There are a LOT of transformations available. The full list is in the PySpark documentation.

Lists

A list of the most important/common transforms:

  • map
  • flatMap
  • distinct
  • sample
  • leftOuterJoin
  • repartition

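A fairly comprehensive list of RDD methods. Strictly speaking, this list includes actions (such as collect, count, and take) and utility methods (such as cache, persist, and getNumPartitions) as well as transformations: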

  • aggregate
  • aggregateByKey
  • cache
  • cartesian
  • checkpoint
  • coalesce
  • cogroup
  • collect
  • collectAsMap
  • combineByKey
  • context
  • count
  • countApprox
  • countApproxDistinct
  • countByKey
  • countByValue
  • distinct
  • filter
  • first
  • flatMap
  • flatMapValues
  • fold
  • foldByKey
  • foreach
  • foreachPartition
  • fullOuterJoin
  • getCheckpointFile
  • getNumPartitions
  • getStorageLevel
  • glom
  • groupBy
  • groupByKey
  • groupWith
  • histogram
  • id
  • intersection
  • isCheckpointed
  • isEmpty
  • isLocallyCheckpointed
  • join
  • keyBy
  • keys
  • leftOuterJoin
  • localCheckpoint
  • lookup
  • map
  • mapPartitions
  • mapValues
  • max
  • mean
  • name
  • persist
  • pipe
  • randomSplit
  • reduce
  • reduceByKey
  • reduceByKeyLocally
  • repartition
  • rightOuterJoin
  • sample
  • sampleByKey
  • sampleStdev
  • sampleVariance
  • saveAsHadoopDataset
  • saveAsPickleFile
  • saveAsTextFile
  • setName
  • sortBy
  • stats
  • stdev
  • subtract
  • sum
  • sumApprox
  • take
  • takeSample
  • top
  • treeAggregate
  • treeReduce
  • union
  • unpersist
  • values
  • variance
  • zip

Map Transform

The map transform is probably the most common; it applies a function to each element of the RDD.

Example: suppose we have a list of strings, and we want to turn them into integers. Then we can use map(), together with a lambda function, to apply the int() function to each string:

data_str = sc.parallelize(['15','25','38','42','127','384', ..., '1025' ] )
data_int = data_str.map( lambda x : int(x) )

Now if we run data_int.take(5), we'll see the first 5 values as a list of integers:

[15, 25, 38, 42, 127]

We can also build more complex lambda functions that return, e.g., tuples:

data_tuple = data_str.map( lambda x : (x, int(x)) )

which would return tuples with the first element being the string form of the number and the second element being the integer form of the number:

[ ('15', 15),
  ('25', 25),
  ('38', 38),
  ('42', 42),
  ('127', 127)]
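Without a Spark install, the semantics of map can be tried with Python's built-in map and a list comprehension. This is a plain-Python analogue, not PySpark, and the short data_str list is a hypothetical stand-in for the elided list above.

```python
# Hypothetical stand-in for the elided list above
data_str = ['15', '25', '38', '42', '127', '384', '1025']

data_int = list(map(int, data_str))           # like data_str.map(lambda x : int(x))
data_tuple = [(x, int(x)) for x in data_str]  # like data_str.map(lambda x : (x, int(x)))

print(data_int[:5])    # like data_int.take(5): [15, 25, 38, 42, 127]
print(data_tuple[:2])  # [('15', 15), ('25', 25)]
```

In PySpark, map accepts any callable, so data_str.map(int) is equivalent to data_str.map(lambda x : int(x)).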

Filter Transformation

The filter transformation keeps only the elements that satisfy a boolean condition.

Provide a function (usually a lambda) that returns a boolean. filter returns a new RDD containing only the elements for which the function returned True:

data_filt = data_str.map( lambda x : int(x) ).filter( lambda x : (x > 28 and x < 100) )

which would return

[38, 42]
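The same map-then-filter chain as a plain-Python analogue, using a hypothetical input list (a list comprehension with a condition plays the role of filter):

```python
data_str = ['15', '25', '38', '42', '127', '384', '1025']  # hypothetical input

# Like data_str.map(lambda x : int(x)).filter(lambda x : (x > 28 and x < 100))
data_filt = [x for x in map(int, data_str) if 28 < x < 100]
print(data_filt)  # [38, 42]
```

Python's chained comparison 28 < x < 100 also works inside the lambda passed to the Spark filter.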

FlatMap Transformation

Flat map is like map, in that it applies a function to each element of an RDD. The difference is that the function should return an iterable (such as a list or tuple) for each element, and flatMap concatenates those iterables into a single flat RDD, one level deep. Compare:

data_notflat = data_str.map( lambda x : (x, int(x)) )

which produces a list of tuples. Compare with the flatMap call:

data_flat = data_str.flatMap( lambda x : (x, int(x)) )

which returns a single flat list of elements:

['15', 15, '25', 25, '38', 38, '42', 42, '127', 127, ... ]
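In plain Python, itertools.chain.from_iterable does the same one-level flattening that flatMap performs. This analogue uses a short hypothetical input and also shows that nested lists inside the results are not flattened further.

```python
from itertools import chain

data_str = ['15', '25', '38']  # hypothetical short input

# Like data_str.map(lambda x : (x, int(x))): one tuple per element
data_notflat = [(x, int(x)) for x in data_str]

# Like data_str.flatMap(lambda x : (x, int(x))): the tuples are concatenated
data_flat = list(chain.from_iterable((x, int(x)) for x in data_str))

print(data_notflat)  # [('15', 15), ('25', 25), ('38', 38)]
print(data_flat)     # ['15', 15, '25', 25, '38', 38]

# Flattening goes only one level deep: nested lists inside the results survive
nested = list(chain.from_iterable([[1, [2, 3]], [4]]))
print(nested)        # [1, [2, 3], 4]
```

The classic use of flatMap is splitting lines of text into words: lines.flatMap(lambda line : line.split()) turns an RDD of lines into an RDD of individual words.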

Distinct Transformation

The distinct transformation is like the NumPy unique() function: it returns all of the unique values found in an RDD (unlike unique(), the results are not sorted).

Suppose you have a text file that contains information about people, and the fifth column contains an entry for gender. Assuming one comma-separated record per line, split each line into columns first, then obtain all unique values for this column (remembering that lists are zero-indexed):

distinct_gender = file_data.map( lambda line : line.split(',')[4] ).distinct()
distinct_gender.collect()

would return something like:

['O', 'M', 'F']

(other/male/female; the order is not guaranteed).

NOTE: distinct is computationally expensive, since it shuffles data across partitions so that identical values end up together.
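A plain-Python analogue using a set, with a few hypothetical comma-separated records (the column layout is an assumption for illustration):

```python
# Hypothetical comma-separated records: name,age,city,state,gender
lines = [
    "alice,34,Austin,TX,F",
    "bob,29,Reno,NV,M",
    "sam,41,Provo,UT,O",
    "eve,22,Waco,TX,F",
]
# Like file_data.map(lambda line : line.split(',')[4]).distinct()
distinct_gender = {line.split(",")[4] for line in lines}
print(sorted(distinct_gender))  # ['F', 'M', 'O']
```

Like the result of distinct, a set has no guaranteed order, so sort it if you need a stable order.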

Sample Transformation

This transformation samples the full data set and returns the sample. Think of it as picking out roughly M representatives from a population of N things.

The parameters are:

  • Boolean (withReplacement) - whether an element can be sampled more than once
  • Float (fraction) - the expected fraction of the data to sample
  • Integer (seed) - the seed for the random number generator (optional)

Example:

# Sample without replacing the values (remove them from the population)
# Sample 20% of the population
# Seed the random number generator with 1337 b/c we are 1337
fraction = 0.2
data_sample = file_data.sample( False, fraction, 1337)

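data_sample now holds roughly 20% of the elements. The fraction is the expected size of the sample, not an exact one, so the count will vary a little around 20%. (sample only returns the sampled piece; to split the data into both a roughly 20% piece and a roughly 80% piece, use file_data.randomSplit([0.2, 0.8], 1337).)

One more method that's useful in this context is count(), which returns the number of elements in an RDD. Note that count() is an action, not a transformation: calling it triggers the computation.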

print("Population count: %d / Sample count: %d" % (file_data.count(), data_sample.count()) )
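To my understanding, sampling without replacement keeps each element independently with probability equal to fraction (Bernoulli sampling), which is why the sample size is only approximately 20%. Here is a plain-Python sketch of that idea; bernoulli_sample and the 10,000-element population are hypothetical, and this is not Spark's sampler code.

```python
import random

def bernoulli_sample(data, fraction, seed):
    """Keep each element independently with probability `fraction`
    (a plain-Python sketch of sampling without replacement)."""
    rng = random.Random(seed)
    return [x for x in data if rng.random() < fraction]

population = list(range(10000))  # hypothetical data set
data_sample = bernoulli_sample(population, 0.2, 1337)
print("Population count: %d / Sample count: %d" % (len(population), len(data_sample)))
# The sample count is close to 2000 but usually not exactly 2000.
```

Using the same seed gives the same sample each time, which is what makes seeded sampling reproducible.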

Actions

Flags