From charlesreid1

Serverless Data Analysis with BigQuery and Dataflow

Module 1: Serverless Data Analysis with BigQuery

Data Engineers

Data engineers enable decision-making

They do this by:

  • Building data pipelines
  • Ingesting data
  • Processing data
  • Building analysis tools
  • Building dashboards
  • Building machine-learning models

Enabling decision-making in a systematic way

Data engineers must know both programming and statistics in depth

Advantage of cloud services:

  • The programming you need to know has gotten simpler
  • The statistics you need to know have also gotten simpler
  • These have enabled better analysis - programming with data, building statistical machine learning models, etc.
  • Enables end-to-end data engineers: building data pipelines, all the way through building statistical machine learning models

Serverless Data Pipelines

Building data pipelines using BigQuery and Cloud Dataflow

What you need:

  • Python or Java (for Dataflow)
  • SQL (for BigQuery)

BigQuery:

  • No-ops data warehouse solution - you ask it a question, you get an answer, and that's it (scales to petabytes)
  • Topics: queries, functions, load/export, nested repeated fields, windows, UDFs
  • Labs: queries and functions, loading and exporting data, demos

Cloud Dataflow:

  • No-ops data pipeline for scalable data processing - you ask it to process data in a certain way, and you get the result (scales to large streams)
  • Writing programs to process data (batch or streaming, code can work on both)
  • Topics: pipeline concepts, using MapReduce, side inputs, streaming
  • Labs: Simple pipelines, MapReduce, side inputs, demos

BigQuery

BigQuery:

  • How it works
  • Constructing queries and using functions
  • Loading data into BigQuery
  • Exporting data from BigQuery
  • Advanced capabilities (nesting, etc.)
  • Performance and pricing

Labs: very structured; recommended you try different tasks with different data sets

History:

  • GFS/MapReduce papers (2002-2004), Dataproc, BigTable
  • Part of Google Big Data Stack 1.0
  • Dremel/BigQuery (2006), Dataflow (2014), TensorFlow (2015)
  • Part of Google Big Data Stack 2.0

Big Data Stack 2.0 serverless approach is a result of problems with the MapReduce framework

MapReduce framework:

  • Parallelize your data storage (sharded datasets)
  • Parallelize the map operations, so each map operation works on whatever shards of data the compute node owns
  • These results are then recombined on a different set of machines in the reduce operation/step
  • The MapReduce framework requires a prior step: you have to take your data, and you have to shard it.
  • This presents a scaling problem, because now you are mixing up storage and compute
  • The number of machines you need drives how you split your data
  • Each time you do a compute job, you first need to figure out where the data are stored

The TLDR: The serverless approach scales your infrastructure to fit your data; the MapReduce approach scales your data to fit your infrastructure.
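
The shard/map/reduce steps listed above can be sketched in a few lines of plain Python. This is only an illustration of the pattern, not of Google's implementation: the function names, the round-robin sharding, and the airline codes are all assumptions made for the example.

```python
from collections import defaultdict


def shard(records, num_shards):
    """Prior step: split the data into num_shards pieces (round-robin)."""
    shards = [[] for _ in range(num_shards)]
    for i, record in enumerate(records):
        shards[i % num_shards].append(record)
    return shards


def map_shard(shard_records):
    """Map step: a worker emits (key, 1) for each record in the shard it owns."""
    return [(airline, 1) for airline in shard_records]


def reduce_pairs(mapped_shards):
    """Reduce step: recombine the per-shard outputs by key and sum the counts."""
    totals = defaultdict(int)
    for pairs in mapped_shards:
        for key, value in pairs:
            totals[key] += value
    return dict(totals)


flights = ["AA", "OO", "AA", "DL", "OO", "AA"]  # hypothetical airline codes
shards = shard(flights, num_shards=3)           # the number of machines drives the split
counts = reduce_pairs(map_shard(s) for s in shards)
print(counts)
```

Note that num_shards has to be chosen before any computation happens, which is the storage/compute coupling described above.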

bigquery.cloud.google.com

Two examples of BigQuery queries:

SELECT
    nppes_provider_state AS state,
    ROUND(SUM(total_claim_count)/1e6) AS total_claim_count_millions
FROM
    `bigquery-public-data.medicare.part_d_prescriber_2014`
GROUP BY
    state
ORDER BY
    total_claim_count_millions DESC
LIMIT
    5;

This can easily be switched to list claims by drug instead of by state: replace the state column in the SELECT and GROUP BY clauses with drug.

SELECT
    drug,
    ROUND(SUM(total_claim_count)/1e6) AS total_claim_count_millions
FROM
    `bigquery-public-data.medicare.part_d_prescriber_2014`
GROUP BY
    drug
ORDER BY
    total_claim_count_millions DESC
LIMIT
    5;

Features/components:

  • Interact with petabyte scale datasets
  • Uses the SQL:2011 query language and SQL functions
  • Multiple methods for importing/exporting (incl. third party tools)
  • Nested and repeated fields (JSON logs) are ok - although this goes beyond the regular SQL language
  • User-defined functions in JavaScript
  • Data storage inexpensive
  • Queries are charged based on amount of data processed (also fixed-cost plans)
  • Immutable audit logs
  • Can use access control to share data with others (without hassles of ops), so can enable data collaboration
  • Can also share queries, so e.g., "How many people are taking course X" - respond with a query that they can open anytime

BigQuery Project Architecture

Pattern for analytics architecture:

  • Mobile gaming - things are happening continuously in the game, generating a stream of events
  • Events are passed (through game server authentication) to PubSub for asynchronous messaging
  • Messaging is fed to Dataflow for parallel data processing
  • Dataflow outputs to BigQuery
  • BigQuery data can then be analyzed using Datalab, or BI tools like Tableau, or spreadsheets, etc.

There is also a batch component (e.g., historical data)

  • Batch load sent to cloud storage (historical data, raw log storage)
  • Storage and logs are sent to Dataflow for parallel processing (batch pipeline)
  • Dataflow outputs to BigQuery
  • etc.

Access control:

  • Project level - top level, where the billing occurs
  • Dataset level - organization and access control level
  • Tables - data with schema (not where access control occurs)
  • Access control happens at the dataset level, because you want to be able to join tables together

Tables:

  • Table views are virtual tables, defined by SQL queries
  • Tables can also be external (e.g., live on Cloud Storage)

Columns vs Rows:

  • Relational databases are row-based (record-oriented storage) to support updates to existing records
  • BigQuery storage is column-based (more compact/compressed, easier to replicate data)
  • BigQuery does not utilize keys
  • Designed for massive and immutable datasets

Consequences for cost: running a query on fewer columns means lower cost
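
A toy Python sketch of why column-based storage makes narrow queries cheaper. The rows and the "values scanned" measure are assumptions for illustration only; BigQuery actually bills on bytes processed, and its storage format is far more involved than a dict of lists.

```python
# Hypothetical flight records, stored row by row (record-oriented).
rows = [
    {"airline": "AA", "departure_airport": "DFW", "arrival_delay": 12},
    {"airline": "OO", "departure_airport": "ATL", "arrival_delay": -3},
    {"airline": "AA", "departure_airport": "DFW", "arrival_delay": 0},
]


def to_columns(records):
    """Pivot a list of row dicts into a dict of column lists (column-oriented)."""
    return {name: [r[name] for r in records] for name in records[0]}


def values_scanned(columns, selected):
    """Count the stored values read by a query that touches only `selected` columns."""
    return sum(len(columns[name]) for name in selected)


columns = to_columns(rows)
one_col = values_scanned(columns, ["arrival_delay"])  # like SELECT arrival_delay
all_cols = values_scanned(columns, list(columns))     # like SELECT *
print(one_col, all_cols)
```

With a row-based layout the query would have to read every full record to get at one field; with the column layout it reads only the columns it names.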


Queries and Functions

Note: to get all of the data sets, open bigquery.cloud.google.com, click the triangle next to the project name, and pick Switch to project > Display project. Then enter the following:

  • bigquery-samples
  • publicdata
  • bigquery-public-data


Run queries from web console (bigquery.cloud.google.com)

When calling BigQuery from a client program (Java/Python), you need to specify the project in the FROM clause (project.dataset.table). In the web console this is not needed for tables in the current project.

Consider example query:

SELECT
    airline,
    SUM(IF(arrival_delay > 0, 1, 0)) AS num_delayed,
    COUNT(arrival_delay) AS total_flights

FROM
    `bigquery-samples.airline_ontime_data.flights`

WHERE
    arrival_airport='OKC'
    AND departure_airport='DFW'

GROUP BY
    airline


SQL features to point out:

  • Can rename fields "on the fly" and return them with more convenient handles: SELECT some_inconveniently_long_name AS result1
  • Can perform aggregate operations (or even conditional aggregate operations) on the fields being selected: SELECT airline, SUM(IF(arrival_delay > 0, 1, 0)) AS num_delayed
  • Can perform counts of non-null fields: SELECT COUNT(arrival_delay) AS total_flights
  • (This arrival_delay field would be null if a flight were canceled, otherwise it is a number)
  • Can count only the rows that meet a condition, by returning NULL when the condition fails so that COUNT skips the row: SELECT COUNT(IF(arrival_delay > 0, 1, NULL)) AS total_delayed_flights
  • Can set conditions using the WHERE clause: WHERE arrival_airport='OKC' or WHERE departure_airport='DFW'
  • Can also combine conditions with logical operators: WHERE arrival_airport='OKC' AND departure_airport='DFW'
  • Can group records (think of this as tossing them all into bags): GROUP BY airline, departure_airport
  • This creates a bag for each combination of airline and departure airport
  • Each bag contains many flights - we are performing aggregation operations on each bag (count, sum, etc.)
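
To try these features without a BigQuery project, the example query can be approximated locally with Python's built-in sqlite3 module. This is a sketch under stated assumptions: the five flight rows are invented, the table is a tiny stand-in for the real flights table, and CASE WHEN replaces BigQuery's IF() because SQLite's dialect differs.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE flights (airline TEXT, departure_airport TEXT, "
    "arrival_airport TEXT, arrival_delay REAL)"
)
conn.executemany(
    "INSERT INTO flights VALUES (?, ?, ?, ?)",
    [
        ("AA", "DFW", "OKC", 10.0),   # delayed
        ("AA", "DFW", "OKC", -2.0),   # early
        ("AA", "DFW", "OKC", None),   # canceled: arrival_delay is NULL
        ("OO", "DFW", "OKC", 5.0),    # delayed
        ("OO", "ATL", "OKC", 30.0),   # removed by the WHERE clause
    ],
)

query = """
SELECT airline,
       SUM(CASE WHEN arrival_delay > 0 THEN 1 ELSE 0 END) AS num_delayed,
       COUNT(arrival_delay) AS total_flights   -- counts non-NULL values only
FROM flights
WHERE arrival_airport = 'OKC' AND departure_airport = 'DFW'
GROUP BY airline
ORDER BY airline
"""
results = conn.execute(query).fetchall()
for row in results:
    print(row)
```

The canceled AA flight is in the AA "bag" but is not counted in total_flights, because COUNT skips NULL values.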


Performing subqueries

Subqueries use one query to feed another query

Using the query from before, we can feed those results into a new query - we are now performing a query on the TEMPORARY RESULTS TABLE

SELECT
    airline, 
    departure_airport, 
    num_delayed, 
    total_flights, 
    num_delayed/total_flights AS delayed_frac

FROM

    (
    SELECT
        airline, departure_airport,
        SUM(IF(arrival_delay > 0, 1, 0)) AS num_delayed,
        COUNT(arrival_delay) AS total_flights
    FROM
        `bigquery-samples.airline_ontime_data.flights`
    WHERE
        arrival_airport='OKC'
    GROUP BY
        airline, departure_airport
    )

WHERE
    total_flights > 5

ORDER BY delayed_frac DESC

LIMIT 5

The first query returns a table with four columns: airline, departure_airport, num_delayed, total_flights

From those results, we perform a second query that further filters results (and performs a calculation)

This tells you that flights on airline OO leaving Atlanta arrive late at OKC 72% of the time (the most delayed departure location + airline combination)
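
The same two-stage pattern can be run locally as a rough check of the logic. This sketch uses Python's sqlite3 module with invented flight data (the add_flights helper and all delay values are hypothetical), CASE WHEN in place of IF(), and a 1.0 * factor because SQLite, unlike BigQuery, does integer division on integers.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE flights (airline TEXT, departure_airport TEXT, "
    "arrival_airport TEXT, arrival_delay REAL)"
)


def add_flights(airline, departure_airport, delays):
    """Insert one hypothetical flight into OKC per delay value."""
    conn.executemany(
        "INSERT INTO flights VALUES (?, ?, 'OKC', ?)",
        [(airline, departure_airport, d) for d in delays],
    )


add_flights("OO", "ATL", [5, 10, -1, 20, 3, 7])          # 5 of 6 delayed
add_flights("AA", "DFW", [1, -1, -2, -3, 4, -5, 0, -6])  # 2 of 8 delayed
add_flights("DL", "ATL", [15, 30])                       # only 2 flights

query = """
SELECT airline, departure_airport, num_delayed, total_flights,
       1.0 * num_delayed / total_flights AS delayed_frac
FROM (
    SELECT airline, departure_airport,
           SUM(CASE WHEN arrival_delay > 0 THEN 1 ELSE 0 END) AS num_delayed,
           COUNT(arrival_delay) AS total_flights
    FROM flights
    WHERE arrival_airport = 'OKC'
    GROUP BY airline, departure_airport
)
WHERE total_flights > 5
ORDER BY delayed_frac DESC
LIMIT 5
"""
ranked = conn.execute(query).fetchall()
for row in ranked:
    print(row)
```

The outer WHERE can filter on total_flights only because the inner query has already produced it as a column; the DL/ATL pair is dropped for having too few flights.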


Querying from multiple tables

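To query from multiple tables, we specify multiple tables in the FROM clause. The examples in this section use BigQuery legacy SQL, where a comma-separated list of tables in FROM means the union of those tables (not a join).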

This query examines non-internal root actions over the course of three days:

SELECT
    FORMAT_UTC_USEC(event.timestamp_in_usec) AS time,
    request_url

FROM
    [myproject:applogs.events_20120501],
    [myproject:applogs.events_20120502],
    [myproject:applogs.events_20120503]

WHERE
    event.username='root'
    AND NOT event.source_ip.is_internal

If we want to match all tables within a date range, we can use a table wildcard function in the FROM clause:

FROM
    TABLE_DATE_RANGE([myproject:applogs.events_],
                    TIMESTAMP('2012-05-01'),
                    TIMESTAMP('2012-05-03'))

This makes it easier to select larger date ranges without the complications of wildcards not selecting the right set of tables (May-July, for example)

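(TABLE_DATE_RANGE is a legacy SQL table wildcard function: it takes a table name prefix and two timestamps, and matches the daily tables whose names are the prefix followed by a YYYYMMDD date within that range. TIMESTAMP just converts the date strings into timestamps.)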
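
As a rough illustration of which tables a date-range match selects, here is a small Python sketch. It assumes the daily tables are named prefix + YYYYMMDD (as in events_20120501 above) and that both end dates are included; tables_in_date_range is a hypothetical helper, not part of any BigQuery client library.

```python
from datetime import date, timedelta


def tables_in_date_range(prefix, start, end):
    """Return prefix + YYYYMMDD for every day from start to end, inclusive."""
    num_days = (end - start).days
    return [
        prefix + (start + timedelta(days=i)).strftime("%Y%m%d")
        for i in range(num_days + 1)
    ]


names = tables_in_date_range("events_", date(2012, 5, 1), date(2012, 5, 3))
print(names)

# A May-July range, which is awkward to express with a simple name wildcard.
may_to_july = tables_in_date_range("events_", date(2012, 5, 1), date(2012, 7, 31))
print(len(may_to_july))
```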


Joining tables

Nice explanation of inner vs outer joins: http://www.programmerinterview.com/index.php/database-sql/inner-vs-outer-joins/

Join predicate - the condition that tells the join which fields to match between the two tables

Inner joins only pick results in common to both

Outer joins fill in records missing from one or the other with NULL

Left outer join leaves in all records that are in the left table, filling in NULL for fields missing in the right table. Any records that exist in the right table but not in the left table are dropped.

Right outer join leaves in all records that are in the right table, filling in NULL for fields missing in the left table. Any records that exist in the left table but not the right table are dropped.

Full outer join will include every record in both left and right, joining records that can be joined, and filling in NULL for missing fields of records that only exist in left or right tables. All records are kept/included in the join.
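
The difference between an inner join and a left outer join can be seen on two tiny tables. This is a sketch using Python's sqlite3 module; the table names, column names, and rows are invented for the example. Right and full outer joins are left out because older SQLite versions do not support them.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE flights (flight_date TEXT, airline TEXT);
    CREATE TABLE weather (rainy_date TEXT, precipitation REAL);
    INSERT INTO flights VALUES ('2008-05-13', 'AA'), ('2008-05-14', 'OO');
    INSERT INTO weather VALUES ('2008-05-13', 0.4), ('2008-05-20', 1.1);
    """
)

# Inner join: only dates present in BOTH tables survive.
inner = conn.execute(
    """
    SELECT f.flight_date, f.airline, w.precipitation
    FROM flights AS f
    JOIN weather AS w ON w.rainy_date = f.flight_date
    ORDER BY f.flight_date
    """
).fetchall()

# Left outer join: every flight is kept; missing weather fields become NULL (None).
left = conn.execute(
    """
    SELECT f.flight_date, f.airline, w.precipitation
    FROM flights AS f
    LEFT OUTER JOIN weather AS w ON w.rainy_date = f.flight_date
    ORDER BY f.flight_date
    """
).fetchall()

print(inner)
print(left)
```

In both cases the weather row for 2008-05-20 is dropped, since it exists only in the right table.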

To join tables, use the JOIN ON statement (the join predicate comes after the ON)

(Note: this uses the fact that we can also create "handles" for tables, like this:)

FROM `bigquery-samples.airline_ontime_data.flights` AS f

Here's the example, which we'll break down:

SELECT
    f.airline,
    SUM(IF(f.arrival_delay > 0, 1, 0)) AS num_delayed,
    COUNT(f.arrival_delay) AS total_flights

FROM
    `bigquery-samples.airline_ontime_data.flights` as f

JOIN (
    SELECT
        CONCAT(
            CAST(year AS STRING),'-',
            LPAD(CAST(month AS STRING),2,'0'),'-',
            LPAD(CAST(day AS STRING),2,'0')
        ) AS rainyday
    FROM
        `bigquery-samples.weather_geo.gsod`
    WHERE
        station_number = 725030
        AND total_precipitation > 0
    ) as w

ON
    w.rainyday = f.date

WHERE
    f.arrival_airport = 'LGA'

GROUP BY
    f.airline

Start with the subquery:

    SELECT
        CONCAT(
            CAST(year AS STRING),'-',
            LPAD(CAST(month AS STRING),2,'0'),'-',
            LPAD(CAST(day AS STRING),2,'0')
        ) AS rainyday
    FROM
        `bigquery-samples.weather_geo.gsod`
    WHERE
        station_number = 725030
        AND total_precipitation > 0

This is selecting dates where there was precipitation at a particular weather station, and combining separate year/month/day fields into a sensible string that is comparable to the strings in the airline data set. (Left pad means, pad this number into a 2-digit number, using 0s to pad, so 3 is turned into 03)
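
For comparison, the same string construction in plain Python. rainyday_string is a hypothetical helper written only to mirror the CONCAT/LPAD expression above; str.rjust plays the role of LPAD here.

```python
def rainyday_string(year, month, day):
    """Build 'YYYY-MM-DD' from integer parts, left-padding month and day with zeros."""
    return "-".join([
        str(year),
        str(month).rjust(2, "0"),  # like LPAD(CAST(month AS STRING), 2, '0')
        str(day).rjust(2, "0"),    # like LPAD(CAST(day AS STRING), 2, '0')
    ])


print(rainyday_string(2008, 5, 3))
print(rainyday_string(2008, 12, 25))
```

Without the padding, 2008-5-3 would never equal the flight table's 2008-05-03 string, and the join would silently match nothing for single-digit months and days.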

We are then using a field that we haven't used yet - the date field of the airline delays database - to FILTER the airline delays we are looking at

So the logic here is:

  • Construct a subquery that selects rainy days from the weather data set
  • Do an inner join on the rainy days dates and the airline delays dates, tossing out the rest of the weather data set (this will also discard any flight delay info that occurs on non-rainy days, and any rainy days that we don't have flight delays for)
  • Perform the same aggregation as before, counting number of delays and number of total flights
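
The three steps above can be checked end to end on a small local example. This is a sketch using Python's sqlite3 module, not the real BigQuery tables: every row is invented, CASE WHEN stands in for IF(), and SQLite's printf() stands in for the CONCAT/LPAD expression.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE flights (airline TEXT, date TEXT, arrival_airport TEXT, arrival_delay REAL);
    CREATE TABLE gsod (station_number INTEGER, year INTEGER, month INTEGER,
                       day INTEGER, total_precipitation REAL);
    INSERT INTO flights VALUES
        ('AA', '2008-05-13', 'LGA', 25.0),
        ('AA', '2008-05-13', 'LGA', -4.0),
        ('DL', '2008-05-13', 'LGA', 12.0),
        ('AA', '2008-05-14', 'LGA', 40.0),  -- dry day at the station: dropped by the join
        ('DL', '2008-05-13', 'OKC', 9.0);   -- wrong airport: dropped by WHERE
    INSERT INTO gsod VALUES
        (725030, 2008, 5, 13, 0.7),         -- the only rainy day at this station
        (725030, 2008, 5, 14, 0.0),
        (999999, 2008, 5, 14, 2.0);         -- rain, but at a different station
    """
)

query = """
SELECT f.airline,
       SUM(CASE WHEN f.arrival_delay > 0 THEN 1 ELSE 0 END) AS num_delayed,
       COUNT(f.arrival_delay) AS total_flights
FROM flights AS f
JOIN (
    SELECT printf('%04d-%02d-%02d', year, month, day) AS rainyday
    FROM gsod
    WHERE station_number = 725030 AND total_precipitation > 0
) AS w
ON w.rainyday = f.date
WHERE f.arrival_airport = 'LGA'
GROUP BY f.airline
ORDER BY f.airline
"""
rainy_results = conn.execute(query).fetchall()
print(rainy_results)
```

The inner join acts purely as a filter here: no weather columns appear in the output, but only flights on rainy days are aggregated.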

NOTE: if we want to first explore the sub-query, can run this sub-query; important to add LIMIT 5!

SELECT
    year, month, day, total_precipitation

FROM 
    `bigquery-samples.weather_geo.gsod`

WHERE
    station_number = 725030
    AND total_precipitation > 0

LIMIT 5


Building Queries: Lab

https://codelabs.developers.google.com/codelabs/cpb101-bigquery-query/#0

Goal of lab:

  • Run some queries of data
  • Modify queries to explore different features (functions, booleans, string processing, subqueries, etc.)


Running a basic query

SELECT
  airline,
  date,
  departure_delay
FROM
  `bigquery-samples.airline_ontime_data.flights`
WHERE
  departure_delay > 0
  AND departure_airport = 'LGA'
LIMIT
  100


Aggregate and boolean functions

SELECT
  airline,
  COUNT(departure_delay)
FROM
   `bigquery-samples.airline_ontime_data.flights`
WHERE
  departure_airport = 'LGA'
  AND date = '2008-05-13'
GROUP BY
  airline
ORDER BY airline

Change to:

SELECT
  airline,
  COUNT(departure_delay)
FROM
   `bigquery-samples.airline_ontime_data.flights`
WHERE
  departure_delay > 0 AND
  departure_airport = 'LGA'
  AND date = '2008-05-13'
GROUP BY
  airline
ORDER BY airline

Now run:

SELECT
  f.airline,
  COUNT(f.departure_delay) AS total_flights,
  SUM(IF(f.departure_delay > 0, 1, 0)) AS num_delayed
FROM
   `bigquery-samples.airline_ontime_data.flights` AS f
WHERE
  f.departure_airport = 'LGA' AND f.date = '2008-05-13'
GROUP BY
  f.airline


String operations

Run the query:

SELECT
  CONCAT(CAST(year AS STRING), '-', LPAD(CAST(month AS STRING),2,'0'), '-', LPAD(CAST(day AS STRING),2,'0')) AS rainyday
FROM
  `bigquery-samples.weather_geo.gsod`
WHERE
  station_number = 725030
  AND total_precipitation > 0

Joining queries on field

Join sql statement:

SELECT
  f.airline,
  SUM(IF(f.arrival_delay > 0, 1, 0)) AS num_delayed,
  COUNT(f.arrival_delay) AS total_flights
FROM
  `bigquery-samples.airline_ontime_data.flights` AS f
JOIN (
  SELECT
    CONCAT(CAST(year AS STRING), '-', LPAD(CAST(month AS STRING),2,'0'), '-', LPAD(CAST(day AS STRING),2,'0')) AS rainyday
  FROM
    `bigquery-samples.weather_geo.gsod`
  WHERE
    station_number = 725030
    AND total_precipitation > 0) AS w
ON
  w.rainyday = f.date
WHERE f.arrival_airport = 'LGA'
GROUP BY f.airline


Using subqueries

Example query that uses a subquery: runs one query, feeds the results on to another query

SELECT
  airline,
  num_delayed,
  total_flights,
  num_delayed / total_flights AS frac_delayed
FROM (
SELECT
  f.airline AS airline,
  SUM(IF(f.arrival_delay > 0, 1, 0)) AS num_delayed,
  COUNT(f.arrival_delay) AS total_flights
FROM
  `bigquery-samples.airline_ontime_data.flights` AS f
JOIN (
  SELECT
    CONCAT(CAST(year AS STRING), '-', LPAD(CAST(month AS STRING),2,'0'), '-', LPAD(CAST(day AS STRING),2,'0')) AS rainyday
  FROM
    `bigquery-samples.weather_geo.gsod`
  WHERE
    station_number = 725030
    AND total_precipitation > 0) AS w
ON
  w.rainyday = f.date
WHERE f.arrival_airport = 'LGA'
GROUP BY f.airline
  )
ORDER BY
  frac_delayed ASC

Make Your Own BigQuery

Baseball data set: https://cloud.google.com/bigquery/public-data/baseball

games_wide: Every pitch, steal, or lineup event for each at-bat in the 2016 regular season.

games_post_wide: Every pitch, steal, or lineup event for each at-bat in the 2016 post season.

(Same schema for both)


Example queries:

"What types of pitches were thrown, how many, top speeds?"

"What was the at bat and pitching sequence for the fastest pitch(es) in the 2016 season?"


Github repo: https://github.com/charlesreid1/sabermetrics-bigquery

Module 2: Data Processing Pipelines with Dataflow
