From charlesreid1

Leveraging Unstructured Data with Cloud Dataproc

Module 1: Introduction to Cloud Dataproc

Overview of Unstructured Data

In industry:

  • Unstructured data is often the hardest to deal with
  • Four sources of data: (data you have, and analyze) (data you don't have, but wish you had) (data you have, but don't analyze) (data that you could easily acquire)
  • Focus on data you have, but don't analyze
  • Several reasons why you wouldn't analyze it - volume, quality, velocity, and lack of structure
  • Most common difficulty is lack of structure - we don't have the kinds of tools to analyze unstructured data that we have to analyze structured data
  • Example: emails from customers. Newsgroups. Photographs taken in the field (inspections).
  • If we don't have tools for dealing with images, or with free-form text, we will ignore it ("too hard")

Example:

  • Google Street View imagery collected
  • Huge collection of unstructured imagery
  • Collected, used, and shown to users of Google Maps
  • Initially, no technology in place to utilize it any further
  • Once deep learning came along, went back and looked at that imagery
  • Could analyze the imagery, extract information, use it to enhance the maps

Extrapolating:

  • You may have a lot of unstructured data laying around in the company, serving one specific purpose
  • Typically, it is given to a human user to analyze, and that's the end - because there is no automated system, no automated tools, for dealing with that unstructured data
  • Google Cloud provides machine learning tools to analyze unstructured data (freeform text, images, etc.)
  • You don't have to start from scratch - using the ML APIs, can take advantage of pre-built machine learning models to extract information that is useful

Counting Problems

Consider MapReduce tasks: these can usually be boiled down to counting problems

Example: delays in payment processing

  • Not just counting, but essentially, lots of counting problems
  • Counting how long it took you, counting number of occurrences, counting a mean
  • MapReduce is full of easy counting problems - large set of data, apply specific operations, count occurrences

Harder counting problems:

  • How often are programmers checking in low-quality code?
  • This is still a counting problem, but now it's hard to actually determine what to count
  • How do you determine what "low quality code" means?
  • How can you extract information like "low quality" or "high quality" from code?
  • Many analytical tools are about counting problems - but some counting problems are easier than others

Why Dataproc

What is a petabyte?

  • 27 years to download a petabyte over 4G
  • 100 libraries of congress
  • Every tweet ever tweeted......... TIMES 50
  • 2 micrograms of DNA
  • 1 day of videos uploaded to YouTube

Scaling up: bigger machine

Scaling out: more machines

MapReduce approach - split the data, compute nodes process data local to it

Big data open source stack:

  • Hadoop HDFS
  • On top of that, Pig/Hive/HBase/Spark

Challenge with clusters:

  • If you're using it 24/7, you need a bigger cluster
  • If you're not using it 24/7, you are using your cluster inefficiently
  • Need a shared resource that can be provisioned when you need it, not used when you don't

YARN = Yet Another Resource Negotiator (MapReduce 2.0)

Dataproc cluster allocated through menu:

  • We don't have to do anything (like connecting to worker nodes) except specify how many nodes we need
  • Bucket allocated for the cluster - this is how the worker nodes interact with the master nodes

Creating Dataproc Clusters

Philosophy: one cluster for one job

Select zone closest to your data (e.g., Cloud SQL)

  • Data coming into the data center will not cost you
  • Egress data (data leaving the data center) does cost you
  • Having Dataproc cluster in a different zone from your data will lead to more costs

Note: HDFS by default creates 3 copies of the data

We want our Hadoop jobs to directly use Cloud Storage, in place of HDFS - don't want to use HDFS to store input/output data

Preemptible worker nodes - we can provision worker nodes that can, potentially, get kicked off our cluster if a higher-priority job comes along

Want a core of nodes that are high-priority, but can also allocate a large number of pre-emptible worker nodes that may potentially "go away" (Hadoop is resilient to machines going down, so no problemo)

If we allocate preemptible compute nodes, these will match the types (CPU and memory) of regular worker nodes

They also won't have any primary disk associated with them, since they can't be part of HDFS (we don't want them to hold any data permanently, since they may disappear)

Versions of Dataproc:

  • Spark, Hadoop, Pig, Hive versions
  • Google Cloud Storage-Hadoop connector
  • BigQuery-Hadoop connector

To create a Dataproc cluster: can use web console, or can use gcloud SDK (command line)

To create these resources from a command line:

gcloud dataproc clusters create my-creative-cluster-name \
--zone us-central1-a \
--master-machine-type n1-standard-1 \
--master-boot-disk-size 50 \
--num-workers 2 \
--worker-machine-type n1-standard-1 \
--worker-boot-disk-size 50

To get some help on this:

$ gcloud dataproc --help
$ gcloud dataproc clusters --help 
$ gcloud dataproc clusters create --help

Custom machine types:

  • Control the types of machines in the cluster
  • Cloud Network for networking
  • Cloud IAM for security
  • Master node running on compute engine
  • Persistent workers running on compute engine
  • HDFS and Google Cloud Storage as a shared filesystem between nodes

Machine types:

  • Standard-4 means 4 cpus, 1 GB per cpu
  • Highmem instances double the amount of memory per core
  • Custom type: first dash number specifies number of CPUs, second dash number specifies amount of memory in MB

Example:

gcloud dataproc clusters create test-cluster \
--worker-machine-type custom-6-30720 \
--master-machine-type custom-6-23040 

Custom instance: 6 CPUs, 5 GB per core:

  • That's 30 GB total
  • 30 * 1024 = 30720 MB
  • Hence, the flag custom-6-30720

Note that you can also set up a custom cluster, and get the REST command that has the parameters you want, to set it up on the command line.


Creating Dataproc Clusters Lab

Lab goals:

  • Create console
  • SSH to cluster
  • Get to the browser management for Hadoop (this requires a firewall rule change to allow your local machine to reach the page)
  • Create, manage, delete Dataproc clusters from CLI

Create a Dataproc cluster:

  • Standard size: 1 cpu, 2 nodes
  • Disk space: 10 GB each
  • Once cluster is up, go to Compute Engine and find the master cluster. Click SSH.

If you want to SSH in, you can just use a browser window.

To enable SSH to the cluster from an arbitrary client:

  • Edit the master node in the Compute Engine section of the console
  • Scroll down to the SSH Keys section
  • Add the public key from the machine that will be connecting
  • Click save
  • Take note of the username assigned, just to the left of where you copy and paste your key
  • Now you can SSH in

Once you're in:

$ python --version
Python 2.7.9

$ java -version
openjdk version "1.8.0_131"
OpenJDK Runtime Environment (build 1.8.0_131-8u131-b11-1~bpo8+1-b11)
OpenJDK 64-Bit Server VM (build 25.131-b11, mixed mode)

$ scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

$ pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_131
Branch dataproc-branch-1.2
Compiled by user  on 2017-08-09T20:35:29Z
Revision 002ee20ea40d88d051e0a7c5bd4a7c07721dbddc
Url https://bigdataoss-internal.googlesource.com/third_party/apache/bigtop
Type --help for more information.

$ pig --version
Apache Pig version 0.16.0 (r: unknown)
compiled Aug 09 2017, 20:25:20

$ hive --version
Hive 2.1.1
Subversion git://build-dataproc-1-2/mnt/ram/bigtop/bigtop/output/hive/hive-2.1.1 -r 002ee20ea40d88d051e0a7c5bd4a7c07721dbddc
Compiled by bigtop on Wed Aug 9 19:46:33 UTC 2017
From source with checksum a7e70a0bdc3bd77d45c8738c41ac59eb

$ sudo su

# apt-get install inetutils-tools

# ifconfig
eth0      Link encap:Ethernet  HWaddr 42:01:0a:8a:00:03
          inet addr:10.138.0.3  Bcast:10.138.0.3  Mask:255.255.255.255
          UP BROADCAST RUNNING MULTICAST  MTU:1460  Metric:1
          RX packets:9088 errors:0 dropped:0 overruns:0 frame:0
          TX packets:6308 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:1000
          RX bytes:2222892 (2.1 MiB)  TX bytes:1654499 (1.5 MiB)

lo        Link encap:Local Loopback
          inet addr:127.0.0.1  Mask:255.0.0.0
          inet6 addr: ::1/128 Scope:Host
          UP LOOPBACK RUNNING  MTU:65536  Metric:1
          RX packets:4322 errors:0 dropped:0 overruns:0 frame:0
          TX packets:4322 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:0
          RX bytes:521771 (509.5 KiB)  TX bytes:521771 (509.5 KiB)

To restrict access via SSH to your machine and your machine only:

  • Side menu > Networking > VPC Networking > Firewall Rules
  • Create a new firewall rule
  • Targets: all instances in network
  • Source IP ranges: enter your IP address, followed by /32
  • Protocols and ports: allow specific protocols: tcp:8088;tcp:50070;tcp:8080
  • Create the rule

The purpose of this was to allow us to access port 8088/8080/5007 via browser

Copy IP address of master node from the Compute Engine console, and go to http://35.197.115.38:8088/

Port 50070: no such luck. Not open.

$ nmap -Pn -p 8088 35.197.115.38/32

Starting Nmap 7.60 ( https://nmap.org ) at 2017-09-24 14:54 PDT
Nmap scan report for 38.115.197.35.bc.googleusercontent.com (35.197.115.38)
Host is up (0.075s latency).

PORT     STATE SERVICE
8088/tcp open  radan-http

Nmap done: 1 IP address (1 host up) scanned in 0.26 seconds

$ nmap -Pn -p 50070 35.197.115.38/32

Starting Nmap 7.60 ( https://nmap.org ) at 2017-09-24 14:54 PDT
Nmap scan report for 38.115.197.35.bc.googleusercontent.com (35.197.115.38)
Host is up (0.070s latency).

PORT      STATE  SERVICE
50070/tcp closed unknown

Nmap done: 1 IP address (1 host up) scanned in 0.16 seconds

Side note: a bit confused about SSH firewall rules. Once I have my public key incorporated into the Compute Engine instance, there was no need for a firewall rule. But the "Connecting to a Linux Instance" documentation states:

"Note: Your Google Cloud Platform VPC network must have one or more firewall rules that allow SSH connections on port 22. The firewall rules must allow SSH connections for the IP ranges or specific IP addresses from which you want to connect."

Link: https://cloud.google.com/compute/docs/instances/connecting-to-instance#standardssh

Also looks like you can have a VPN network that connects to the Google Cloud Platform VPC. That makes the Google Compute nodes into internal IP addresses on the network.

Bastion instances: these are special nodes that have an internal and an external IP address, so that you can use them as a gateway for connecting to a Google Compute Engine network

Link: https://cloud.google.com/solutions/connecting-securely#bastion

Creating Dataproc Clusters from Command Line Lab

Start by installing gcloud command line tool:

$ brew install caskroom/cask/google-cloud-sdk

Add this to bashrc:

source '/usr/local/Caskroom/google-cloud-sdk/latest/google-cloud-sdk/path.bash.inc'
source '/usr/local/Caskroom/google-cloud-sdk/latest/google-cloud-sdk/completion.bash.inc'

Now authorize google cloud. If you try and allocate compute nodes before authenticating, you'll see:

$ gcloud dataproc clusters create my-fancy-cluster \
 --zone us-west1-a \
 --master-machine-type n1-standard-1 \
 --master-boot-disk-size 20 \
 --num-workers 2 \
 --worker-machine-type n1-standard-1 \
 --worker-boot-disk-size 20

ERROR: (gcloud.dataproc.clusters.create) PERMISSION_DENIED: Permission denied on resource project quiet-era-180418 (#0)
- '@type': type.googleapis.com/google.rpc.Help
  links:
  - description: Google developers console
    url: https://console.developers.google.com

To authenticate:

$ gcloud init

Link: https://cloud.google.com/sdk/docs/authorizing

Cloud Dataproc Remote Conections

Secure connections to VM: https://cloud.google.com/solutions/connecting-securely#bastion

  • How to protect services on machines with external IPs
  • How to connect to machines that do not have external IPs
  • Firewall rules

Connecting to Linux instances: https://cloud.google.com/compute/docs/instances/connecting-to-instance#standardssh

  • How to connect to compute engine instances

Module 2: Running Dataproc jobs

Running Pig and Spark Programs

Two objectives in the lab:

  • Submit Pig/Hive/Spark/Hadoop jobs using the cloud console's job submission form
  • SSH into the cluster master and execute jobs using PySpark's Read-Evaluate-Process-Loop (REPL) interpreter

Procedure:

  • Create firewall rule to ssh into cluster
  • Create dataproc cluster
  • Copy scripts being run into a bucket
  • Run PySpark REPL
  • Run Pig job reading from HDFS

Firewall rule:

  • Left menu > Networking > VPC Networking > Firewall rules
  • Create new firewall rule (or modify existing)
  • Set target to all instanes on network
  • IP ranges should be based on your IP address
  • Protocol should be "tcp:8088;tcp:9870;tcp:8080"

Dataproc cluster creation:

  • 2-worker cluster
  • Use the command line:
$ gcloud dataproc clusters create my-spark-cluster \
 --zone us-west1-a \
 --master-machine-type n1-standard-1 \
 --master-boot-disk-size 50 \
 --num-workers 2 \
 --worker-machine-type n1-standard-1 \
 --worker-boot-disk-size 50 \
 --network=default

Bucket creation:

  • Standard bucket, same region as cluster
  • Use the command line
$ gsutil mb -c regional -l us-west1 gs://my-spark-cluster

Put scripts into a bucket:

  • Clone the data analyst training github repo in a cloud shell
  • Run the script, pass it your bucket name (basically does some fancy stuff to put bucket name into notebook, and performs a gsutil copy of python and png files...)


Run PySpark REPL:

  • We'll use a simple PySpark code (mapping an array of input values to a simple mathematical function) to test out running REPL script in PySpark
  • Copy the SSH key from the local computer into the SSH key list for the master node
  • Run PySpark by running the command "pyspark"
  • This may take a minute...
data = range(100)
distData = sc.parallelize(data)
squares = distData.map(lambda x : x*x*x*x*x )
res = squares.reduce(lambda a, b : a + b)
print res

Sums up first 100 powers of 5:

$ pyspark
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 2.7.9 (default, Jun 29 2016 13:08:31)
SparkSession available as 'spark'.
>>> data = range(100)
>>> distData = sc.parallelize(data)
>>> squares = distData.map(lambda x : x*x*x*x*x )
>>> res = squares.reduce(lambda a, b : a + b)
>>> print res
161708332500
>>>

Another equation, this one shows how reduce() is not, strictly speaking, necessary for simple operations like sum():

>>> import numpy as np
>>> data = range(1000)
>>> distData = sc.parallelize(data)
>>> terms = distData.map(lambda x : 8.0/((2*x+1)*(2*x+1)))
>>> sum = terms.reduce(lambda a,b: a+b)
>>> print sum
9.86760440126
>>> print terms.sum()
9.86760440126
>>> 
>>> print np.sqrt(terms.sum())
3.1412743276027379
>>> print np.sqrt(terms.reduce(lambda a,b : a+b))
3.1412743276027379

Comparing the two approaches shows that the two methods are basically comparable:

>>> start_time = timeit.default_timer(); np.sqrt(terms.sum()); stop_time = timeit.default_timer() - start_time; print stop_time
0.174875020981
>>> start_time = timeit.default_timer(); np.sqrt(terms.reduce(lambda a,b : a+b)); stop_time = timeit.default_timer() - start_time; print stop_time
0.174139976501


Prepare to run the Pig job from HDFS:

  • SSH onto the master node of the cluster
  • First, copy from GCS to local disk (will then copy from local disk to HDFS)
  • Bucket is my-spark-cluster
  • gsutil -m cp gs://my-spark-cluster/unstructured/pet-details* .
  • Note: the -m flag passed to gsutil means multithreaded (copy in parallel)
  • This consists of a data file (.txt) and a pig file (.pig)
  • Then copy from local disk to HDFS
  • hadoop fs -mkdir /pet-details
  • hadoop fs -put pet-details.txt /pet-details
  • http://35.203.180.167:9870/ opens the web control panel for HDFS
  • Pick "Utilities" menu on the right, and "Browse file system" menu option
  • Verify the files you copied are in the HDFS system

Run the Pig job:

  • Feed the pig file to pig (from the local disk, of course)
  • Watch it go!

Once job is finished:

  • Go back to HDFS browse file system
  • Look for GroupedByType directory
  • Look for file named part-r-00000
  • This file contains the results of the MapReduce job

Examine results:

  • hadoop fs -get /GroupedByTypepart* .
  • The script grouped pets by type, and dumped out a set of tuples consisting of name, type, color, wt (lb), wt (kg)

Clean up:

  • Copy results over to the bucket
  • Shut down the cluster

Motivating Serverless Operations

MapReduce - splits big data so each compute node can access a portion of data, locally

GFS (2002) and MapReduce (2004) created the early ideas of a distributed file system

HDFS is a YARN (yet another resource negotiator)

Link: https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html

Locally-built cluster vs. cloud cluster:

  • With a locally-built cluster, your Hadoop nodes all read the input files, do their processing, and compile their output together at the end
  • At the end of the job, the output files are on the HDFS, so you have to go and collect these
  • Input and output data - persistent data (needed before and after job is run)
  • Locally-built cluster: no problem, you have the cluster forever, it is always persistent
  • Cloud model: you have to treat the input and output data differently, since we're going to spin up and shut down clusters at will

Colossus:

  • Google Cloud Storage buckets provide an elastic disk space usage solution
  • Colossus is a Google file system product (internal technology)
  • Sits on a petabit network (i.e., same network that pushes around the petabytes of data being uploaded to YouTube every day)
  • Colossus doesn't show up - because it's an "implicit product"

Storage options:

  • Can incorporate buckets with input-output part of Hadoop cluster/HDFS
  • Can use any other serverless option:
  • App Engine is a serverless platform
  • BigQuery (used for data warehousing) also serverless, can be queried like an SQL database
  • PubSub is for messaging oriented middleware
  • Dataflow is fore pure elastic pipelines (Apache Beam)
  • Machine Learning APIs are serverless

Hadoop setup:

  • Create a cluster specific to a job
  • Monitor it, submit jobs, etc.
  • Destroy cluster when finished
  • Long-term goal: move to a pure serverless platform that allows you to JUST FOCUS ON THE JOB
  • Example: writing Java or Python program for Apache Beam, letting Google handle the scaling
  • Example: feed a batch of data into Google Cloud Storage, stream it in via PubSub, process it with Dataflow, and store the results back in Cloud Storage, or in BigTable, or in BigQuery
  • These approaches eliminate the actual maintenance of clusters, move you away from having to deal with Hadoop, HDFS, Pig, Hive, etc.
  • That's moving from the initial use case (transforming where you compute) to the other use cases (scaling/reliability, and changing how you compute)

The heart of this problem: compute and storage are closely tied

Hadoop model: storage and compute nodes are too tied together

HBase database: same problem, database spread across entire filesystem

This is the reason for the push to serverless data solutions...

  • Faster
  • Cheaper
  • Flexible

Processing: Dataflow for processing (create pipelines, stack operations together to process streams)

Storage: Cloud Storage for files, BigQuery Storage for tables, Cloud BigTable for NoSQL (replaces Hadoop HBase)

Note that these sources of data provide either input, or output, or both to Dataflow

Note on BigQuery for both processing and storage:

  • BigQuery Analytics: analogous to Dataflow or to distributed compute nodes doing work
  • BigQuery Storage: stores lots of unstructured data in tables

Cloud Dataproc allows Spark jobs to separate compute and storage

  • Slight change from migrating to the cloud; initial step is using HDFS as-is
  • Can use Google Cloud Storage in place of HDFS for the input/output files
  • Just need to install (or use, since already installed) libraries on worker nodes to interact with cloud storage
  • Should be as simple as replacing hdfs:// with gfs://
  • Cloud Storage has faster read/write than traditional network hardware

Yet another advantage:

  • Pre-emptible nodes are not made a part of HDFS (since they might go away at any time)
  • That's why we can make, e.g., 10 core compute nodes, plus 40 pre-emptible nodes, without violating the factor-of-three replication

Serverless Architecture=

Architecture for Dataproc cluster:

  • Set up networking and accounts to connect to the Dataproc cluster
  • Provision the cluster with worker nodes/master node
  • Dataproc Agent manages the Dataproc cluster
  • Master/worker nodes will read input from a bucket in Cloud Storage
  • During the compute phase, master node and worker nodes can share data using HDFS
  • Pre-emptible workers are not part of HDFS, but can still do work
  • Master/worker nodes will write output to a bucket in Cloud Storage
  • This is really fast, so no bottlenecks

Using Dataproc and Cloud Storage:

  • Copy data to GCS via manual copy or via connector
  • Replace hdfs:// with gfs://
  • Write output to GCS, and delete cluster when done

Notes on storage formats:

  • If using HBase previously, can use BigTable as drop-in replacement (same API)
  • If using SQL table previously, can use BigQuery Storage (and later, query it using BigQuery Analytics)

HDFS becomes temporary storage, cluster becomes stateless

Next major step: rewrite your jobs so that they don't use Hadoop, they use Dataflow and Apache Beam

Using Cloud Storage Lab

Lab 3:

  • Create cloud storage bucket and deposit input and application files
  • Submit job using web console
  • Submit job using CLI
  • Monitor job progress/view results
  • Collect output in cloud storage bucket

Setup for the lab:

  • Create a dataproc cluster with 2 workers
  • Already copied input files from the training-data-analyst repository (folder unstructured) into bucket
  • This contains input files and input data

Submitting job via google cloud console:

  • Use a PySpark job, defined in a .py file, along with an input file, defined in a .txt file, all in a cloud storage bucket
  • Click Dataproc > Jobs
  • Click +Submit Job
  • Job type: PySpark
  • Main python file: gs://my-spark-cluster/unstructured/lab2.py
  • No other information needed, click Submit

Submitting job via command line:

$ gcloud dataproc jobs submit pyspark --cluster my-spark-cluster gs://my-spark-cluster/unstructured/lab2.py

Once done, delete the clusters created.

To list the clusters:

$ gcloud dataproc clusters list
NAME              WORKER_COUNT  STATUS   ZONE
my-spark-cluster  2             RUNNING  us-west1-a

To delete the clusters:

$ gcloud dataproc clusters delete my-spark-cluster
The cluster 'my-spark-cluster' and all attached disks will be deleted.

Do you want to continue (Y/n)?  y

Waiting on operation [projects/not-all-broken/regions/global/operations/c74eb958-17d3-48cd-9566-5b2d44b28656].
Waiting for cluster deletion operation...done.
Deleted [https://dataproc.googleapis.com/v1/projects/not-all-broken/regions/global/clusters/my-spark-cluster].

BigQuery Analysis of StackOverflow Dataset

https://codelabs.developers.google.com/codelabs/gcp-aws-bigquery/index.html?index=..%2F..%2Findex#2

#standardSQL
SELECT badge_name AS First_Gold_Badge, 
       COUNT(1) AS Num_Users,
       ROUND(AVG(tenure_in_days)) AS Avg_Num_Days
FROM
(
  SELECT 
    badges.user_id AS user_id,
    badges.name AS badge_name,
    TIMESTAMP_DIFF(badges.date, users.creation_date, DAY) AS tenure_in_days,
    ROW_NUMBER() OVER (PARTITION BY badges.user_id
                       ORDER BY badges.date) AS row_number
  FROM 
    `bigquery-public-data.stackoverflow.badges` badges
  JOIN
    `bigquery-public-data.stackoverflow.users` users
  ON badges.user_id = users.id
  WHERE badges.class = 1 
) 
WHERE row_number = 1
GROUP BY First_Gold_Badge
ORDER BY Num_Users DESC
LIMIT 10

The query generates the top 10 gold badges, ranked by how many users got them as their first gold badges. The query also determines how many days it took for these gold badges to be obtained on average.

Row
First_Gold_Badge
Num_Users
Avg_Num_Days
1
Famous Question
176982
1169.0
2
Fanatic
15085
618.0
3
Unsung Hero
12875
595.0
4
Great Answer
10641
1363.0
5
Electorate
5769
829.0
6
Populist
5491
1227.0
7
Steward
1005
993.0
8
Great Question
582
717.0
9
Copy Editor
253
595.0

You can also query sets of tables using wildcard table names. For more information, see Querying sets of tables using wildcard tables.


Module 3: Leveraging GCP

Customizing Clusters with Initialization

Advantages to moving to cloud storage:

  • Can make clusters ephemeral, no longer competing for single resource
  • High throughput (sustained) file system (GFS)

Examining how to combine Spark and Hadoop with other capabilities of GCP


Overview of what we have covered:

  • Stateless clusters in < 90 seconds
  • Using Hadoop, Pig, Spark
  • High-level APIs for submitting jobs

What we'll cover next:

  • Using scripts to set up a Dataproc cluster with software packages
  • Using software to connect Dataproc to BigTable/BigQuery/Cloud Storage


Dataproc Inputs:

  • Cloud Dataproc
  • BigQuery
  • Cloud Storage
  • BigTable
  • Compute Engine

Dataproc software:

  • Hadoop
  • HCatalog
  • Hive
  • Hue
  • Kafka (if not ready to replace with PubSub)
  • Mahout
  • Oozie
  • Pig
  • Spark
  • Sqoop
  • Tez
  • Zookeeper

Dataproc outputs:

  • Cloud Dataproc
  • BigQuery
  • Cloud Storage
  • Cloud BigTable
  • Compute Engine

Dataproc uses Bigtop to handle consistent packaging (not interacting with it directly)

Example software:

  • Datalab docker image
  • Install and run on master node - utilize cluster for compute, and interact using iPython notebook
  • This requires installing some additional libraries on the master node

To install software:

  • Write your script (bash, python, etc.)
  • Upload script to Cloud Storage
  • Specify location in cloud storage when initializing the Dataproc cluster

Example script: install py4sci

  • apt-get update || true
  • apt-get install -y
  • numpy, scipy, matplotlib, pandas

To limit installation script to master only:

  • Only one script for both master and worker
  • Can do this using the ${ROLE} env variable (metadata)

if [[ "${ROLE}" == 'Master' ]]; then

 apt-get install -y vim

fi

Storing the script:

  • Once script is finished, put it in a cloud storage bucket
  • To initialize standard stuff - git repository with Dataproc initialization actions
  • Link: https://github.com/GoogleCloudPlatform/dataproc-initialization-actions
  • Initialization scripts for whole bunch of software: cloud-sql-proxy, conda, datalab, drill, flink, ganglia, hive, hue, ipython, jupyter, kafka, oozie, presto, stackdriver, tez, zeppelin, zookeeper
  • Note: stackdriver is for monitoring cluster

Note: we can also obtain this through google cloud storage

gsutil ls gs://dataproc-initialization-actions/

This sets up and runs a bash script to obtain the Docker container and spin it up

From command line:

gcloud dataproc clusters create mycluster \

--initialization-actions gs://mybucket/init-actions/my_init.sh \
--initialization-action-timeout 3m

From web console:

Specify location of script in "Initialization actions" text box

If you have to change site configuration for cluster:

  • Changing core-site.xml
  • Use gcloud command line or gcloud SDK like so:
  • "file_prefix:property=value"

Dataproc + Datalab Initialization Lab

Lab tasks:

  • Create Dataproc cluster with initialization action to install Datalab
  • Run Jupyter Notebook on Dataproc cluster via Datalab
  • Create Python and PySpark jobs utilizing Cloud Storage, BigQuery, and Spark

Initialization script:

  • In training-data-analyst repo, unstructured folder, we have an example init script
  • This prints out a hello world message from the master node
  • Use this, or use the ready-to-go dataproc-initializaiton-actions repo, to initialize Datalab

Here was the final command:

$ gcloud dataproc clusters create my-spark-cluster \
--zone us-west1-a \
--master-machine-type n1-standard-1 --master-boot-disk-size 50 \
--num-workers 2 \
--worker-machine-type n1-standard-1 --worker-boot-disk-size 50 \
--network=default \
--initialization-actions gs://dataproc-initialization-actions/datalab/datalab.sh,gs://my-spark-cluster/unstructured/init-script.sh \
--initialization-action-timeout 8m

(Apparently the operation timed out, and experienced an error.)

$ gcloud dataproc clusters create my-spark-cluster \
  --zone us-west1-a --master-machine-type n1-standard-1 --master-boot-disk-size 50 \
  --num-workers 2 \
  --worker-machine-type n1-standard-1 --worker-boot-disk-size 50 \
  --network=default --initialization-actions gs://dataproc-initialization-actions/datalab/datalab.sh,gs://my-spark-cluster/unstructured/init-script.sh \
  --initialization-action-timeout 3m

Waiting on operation [projects/not-all-broken/regions/global/operations/0a5fca0a-8978-4250-9c0d-9566c5ba8ba6].
Waiting for cluster creation operation...done.
ERROR: (gcloud.dataproc.clusters.create) Operation [projects/not-all-broken/regions/global/operations/0a5fca0a-8978-4250-9c0d-9566c5ba8ba6] failed: Initialization action timed out. Failed action 'gs://dataproc-initialization-actions/datalab/datalab.sh', see output in: gs://dataproc-46a56292-2a64-4744-b3c4-d030544b5bb9-us/google-cloud-dataproc-metainfo/bbdf0e10-aeba-4768-ba2c-5495a1de0cbd/my-spark-cluster-m/dataproc-initialization-script-0_output.

We wait for the init script to spin up.

Now we need to set up firewall rules so that we can access the Datalab instance

Select VPC Network > Firewall and add a firewall rule

Add your IP address, add target "All instances in network," and for protocols and ports add tcp:8088;tcp:50070;tcp:8080

(Why not 8081?)

Note: if you create a firewall rule, and you add your specific IP address, then the remote instance will only open the Datalab instance to YOUR machine. If you don't add an IP address, or if you open it more broadly, you could allow more machines (attackers) to access Datalab.

Firewall rule:

  • Opening ports, as usual:
  • "tcp:8088; tcp:50070; tcp:8080; tcp:9870; tcp:8081"
  • (It's actually running on port 8080, weirdly.)

Enabling APIs:

  • The APIs are not working.
  • To enable an API, you have to shut down an entire machine.
  • But... this... I don't...
  • It goddamn lists BigQuery API as ENABLED

If your browser is on a different machine then exit and re-run this application with the command-line parameter

  --noauth_local_webserver

Ooookay, trying this again, but creating the cluster from the web console this time.

Checked the box to allow API access for entire project.

(And of course, the prior notebook disappeared to... who the fuck knows where...)

Mistake from prior not-working attempts: incorrectly specified project ID.

>>> data = gbq.read_gbq(sql, project_id=projectId)

Requesting query... ok.
Query running...
Query done.
Processed: 2.1 Gb

Retrieving results...
Got 40 rows.

Total time taken 3.21 s.
Finished at 2017-09-26 01:56:18.

Spark and BigQuery: Why and Workflow

If BigQuery is serverless, and scales to thousands of nodes, why would you run a BigQuery command from Spark?

Answer: Machine learning.

You may want to feed data from a BigQuery statement into Spark so that you can perform machine learning on the results.

Who does what work depends on what parts of the Jupyter notebook we're looking at:

  • BigQuery statements are serverless and automatically scaled out - so when we run the BigQuery statement from Spark, it isn't the Spark nodes running the query, it's BigQuery and the BigQuery infrastructure
  • When the results are returned, the data can be distributed among the Spark nodes to do ML training
  • BigQuery runs its search, and returns (potentially) LOTS of data
  • BigQuery can shard the data and export it to Google Cloud Storage (previously, this would be HDFS)
  • Spark (or Hadoop) then pulls the sharded data from GCS (previously, HDFS)
  • From there, we can perform computations on the data using Spark

What does this look like:

sc = pyspark.SparkContext()
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = {
    # Specify parameters for BigQuery input
    'mapred.bq.project.id' : project,
    'mapred.bq.gcs.bucket' : bucket,
    'mapred.bq.temp.gcs.path' : input_directory,
    'mapred.bq.input.project.id' : 'publicdata',
    'mapred.bq.input.dataset.id' : 'samples',
    'mapred.bq.input.table.id' : 'shakespeare'
}

This identifies the bucket/project where the BigQuery results will go

It then creates a connector between BigQuery and Google Cloud storage

(We then run the BigQuery)

Recall RDD = resilient distributed dataset

# Load data from BigQuery
# This exports the BigQuery table as JSON into Google Cloud Storage,
# then reads it
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf = conf)

Now, we run the actual Spark code: this is what operates on the distributed data set:

# Perform a word count
word_counts = (
    table_data
        .map(lambda (_, record): json.loads(record))
        .map(lambda x : (x['word'].lower(), int( x['word_count'] )))
        .reduceByKey(lambda x, y: x + y)
    )

# Now display 10 results
pprint(word_counts.take(10))

Where do the results of the Spark computation go? The results are sharded (distributed among lots of different files) on each of the different worker nodes. Each worker node writes its results to the same bucket.

Here's how we output the sharded results files to GCS:

# Stage data formatted as 
# newline-delimited JSON in GCS
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)

# Now use the partitions to index the output files
partitions = range(word_counts.getNumPartitions())
output_files = [output_directory + '/part-{:05}'.format(j) for j in partitions]

(word_counts
    .map( lambda (w, c) : json.dumps({'word' : w, 'word_count' : c}) )
    .saveAsTextFile(output_directory)
)

Reasoning: GCS is immutable blob storage, so if all nodes writing to same file, you're going to have each process overwriting the data from the prior node. Instead, each process writes to its own file on GCS.

Now close the other end of the loop: IMPORT the sharded results from PySpark into BigQuery:

# Output parameters
output_dataset = 'wordcount_dataset'
output_table = 'wordcount_table'

# Use subprocess to make the BigQuery call
# to put the JSON files into BigQuery
subprocess.check_call(
    'bq load --source_format NEWLINE_DELIMITED_JSON '
    '--schema word:STRING,word_count:INTEGER '
    '{dataset}.{table} {files}'.format(
        dataset=output_dataset, table=output_table, files=','.join(output_files)
    ).split()
)

Last step is to clean up the PySpark result files in GCS:

input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(output_path,True)

If you can't do everything you want in BigQuery (if UDF are too limited, for example), you can use the workflow:

(BigQuery data) --> (PySpark) --> (BigQuery table)

This raises another problem: sometimes your BigQuery statement will return TOO MUCH data to fit into a Pandas dataframe (too much data to fit in memory)

References

Flags