GCDEC/BigQuery/Notes
From charlesreid1
Serverless Data Analysis with BigQuery and Dataflow
Module 1: Serverless Data Analysis with BigQuery
Data Engineers
Data engineers enable decision-making
They do this by:
- Building data pipelines
- Ingesting data
- Processing data
- Building analysis tools
- Building dashboards
- Building machine-learning models
Enabling decision-making in a systematic way
Data engineers must know both programming and statistics in depth
Advantage of cloud services:
- The programming you need to know has gotten simpler
- Statistics realm has also gotten simpler
- These have enabled better analysis - programming with data, building statistical machine learning models, etc.
- Enables end-to-end data engineers: building data pipelines, all the way through building statistical machine learning models
Serverless Data Pipelines
Building data pipelines using BigQuery and Cloud Dataflow
What you need:
- Python or Java (for Dataflow)
- SQL (for BigQuery)
BigQuery:
- No-ops data warehouse solution - you ask it a question, you get an answer, and that's it (scales to petabytes)
- Topics: queries, functions, load/export, nested repeated fields, windows, UDFs
- Labs: queries and functions, loading and exporting data, demos
Cloud Dataflow:
- No-ops data pipeline for scalable data processing - you ask it to process data in a certain way, and you get the result (scales to large streams)
- Writing programs to process data (batch or streaming, code can work on both)
- Topics: pipeline concepts, using MapReduce, side inputs, streaming
- Labs: Simple pipelines, MapReduce, side inputs, demos
BigQuery
BigQuery:
- How it works
- Constructing queries and using functions
- Loading data into BigQuery
- Exporting data from BigQuery
- Advanced capabilities (nesting, etc.)
- Performance and pricing
Labs: very structured; recommended you try different tasks with different data sets
History:
- GFS/MapReduce papers (2002-2004), Dataproc, BigTable
- Part of Google Big Data Stack 1.0
- Dremel/BigQuery (2006), Dataflow (2014), TensorFlow (2015)
- Part of Google Big Data Stack 2.0
Big Data Stack 2.0 serverless approach is a result of problems with the MapReduce framework
MapReduce framework:
- Parallelize your data storage (sharded datasets)
- Parallelize the map operations, so each map operation works on whatever shards of data the compute node owns
- These results are then recombined on a different set of machines in the reduce operation/step
- The MapReduce framework requires a prior step: you have to take your data, and you have to shard it.
- This presents a scaling problem, because now you are mixing up storage and compute
- The number of machines you need drives how you split your data
- Each time you do a compute job, you first need to figure out where the data are stored
The TLDR: The serverless approach scales your infrastructure to fit your data; the MapReduce approach scales your data to fit your infrastructure.
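The shard, map, and reduce steps described above can be sketched in a few lines of plain Python. This is only a toy illustration: the `shards` data and the helper names `map_shard` and `merge_counts` are made up for this example, and everything runs in one process rather than on separate machines.

```python
from functools import reduce

# Hypothetical pre-sharded dataset: each inner list is the shard one node owns.
shards = [
    ["OKC", "DFW", "OKC"],
    ["LGA", "OKC"],
    ["DFW", "LGA", "LGA"],
]

def map_shard(shard):
    """Map step: count airports within a single shard."""
    counts = {}
    for airport in shard:
        counts[airport] = counts.get(airport, 0) + 1
    return counts

def merge_counts(left, right):
    """Reduce step: combine two partial count tables into one."""
    merged = dict(left)
    for key, value in right.items():
        merged[key] = merged.get(key, 0) + value
    return merged

# One partial result per shard, then a single recombination pass.
partials = [map_shard(shard) for shard in shards]
totals = reduce(merge_counts, partials, {})
print(totals)
```

Note that the sharding had to be decided before any computation could start, which is the coupling of storage and compute that the notes describe.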
bigquery.cloud.google.com
Two examples of BigQuery queries:
SELECT
nppes_provider_state AS state,
ROUND(SUM(total_claim_count) / 1e6) AS total_claim_count_millions
FROM
`bigquery-public-data.medicare.part_d_prescriber_2014`
GROUP BY
state
ORDER BY
total_claim_count_millions DESC
LIMIT
5;
This can easily be switched to listing claims by drug, instead of by state: just change the two state field names to drug.
SELECT
drug,
ROUND(SUM(total_claim_count) / 1e6) AS total_claim_count_millions
FROM
`bigquery-public-data.medicare.part_d_prescriber_2014`
GROUP BY
drug
ORDER BY
total_claim_count_millions DESC
LIMIT
5;
Features/components:
- Interact with petabyte scale datasets
- Uses the SQL:2011 standard query language and SQL functions
- Multiple methods for importing/exporting (incl. third party tools)
- Nested and repeated fields (JSON logs) are ok - although this goes beyond the regular SQL language
- User-defined functions in JavaScript
- Data storage inexpensive
- Queries are charged based on amount of data processed (also fixed-cost plans)
- Immutable audit logs
- Can use access control to share data with others (without hassles of ops), so can enable data collaboration
- Can also share queries, so e.g., "How many people are taking course X" - respond with a query that they can open anytime
BigQuery Project Architecture
Pattern for analytics architecture:
- Mobile gaming - things are happening continuously in the game, generating a stream of events
- Events are passed (through game server authentication) to PubSub for asynchronous messaging
- Messaging is fed to Dataflow for parallel data processing
- Dataflow outputs to BigQuery
- BigQuery data can then be analyzed using Datalab, or BI tools like Tableau, or spreadsheets, etc.
There is also a batch component (e.g., historical data)
- Batch load sent to cloud storage (historical data, raw log storage)
- Storage and logs are sent to Dataflow for parallel processing (batch pipeline)
- Dataflow outputs to BigQuery
- etc.
Access control:
- Project level - top level, where the billing occurs
- Dataset level - organization and access control level
- Tables - data with schema (not where access control occurs)
- Access control happens at the dataset level, because you want to be able to join tables together
Tables:
- Table views are virtual tables, defined by SQL queries
- Tables can also be external (e.g., live on Cloud Storage)
Columns vs Rows:
- Relational databases are row-based (record-oriented storage) to support updates to existing records
- BigQuery storage is column-based (more compact/compressed, easier to replicate data)
- BigQuery does not utilize keys
- Designed for massive and immutable datasets
Consequences for cost: running a query on fewer columns means lower cost
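A toy model can show why selecting fewer columns is cheaper on column-based storage. This is a sketch, not BigQuery's actual storage or billing logic: the sample rows and the two `values_scanned_*` helpers are invented for illustration, and "cost" here is just a count of values touched.

```python
# Hypothetical three-row table.
rows = [
    {"airline": "AA", "arrival_delay": 12, "departure_airport": "DFW"},
    {"airline": "OO", "arrival_delay": 0, "departure_airport": "ATL"},
    {"airline": "AA", "arrival_delay": 45, "departure_airport": "DFW"},
]

# Column-oriented layout of the same data: one list per column.
columns = {name: [row[name] for row in rows] for name in rows[0]}

def values_scanned_row_store(rows, wanted):
    """A row store reads whole records, so `wanted` does not reduce the scan."""
    return sum(len(row) for row in rows)

def values_scanned_column_store(columns, wanted):
    """A column store reads only the columns the query selects."""
    return sum(len(columns[name]) for name in wanted)

wanted = ["arrival_delay"]
row_cost = values_scanned_row_store(rows, wanted)
col_cost = values_scanned_column_store(columns, wanted)
print(row_cost, col_cost)
```

In this model, adding a second column to `wanted` doubles the columnar scan but leaves the row-store scan unchanged.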
Queries and Functions
Note: to get all of the data sets, open bigquery.cloud.google.com, click the triangle next to the project name, and pick Switch to project > Display project. Then enter each of the following:
- bigquery-samples
- publicdata
- bigquery-public-data
Run queries from web console (bigquery.cloud.google.com)
When calling BigQuery from a client program (Java/Python), you need to specify the project in the FROM clause (project.dataset.table). In the web console the project can be omitted, since it defaults to the current project.
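A minimal sketch of what this looks like from Python, assuming the `google-cloud-bigquery` package and working credentials. The helper names `qualified_table`, `build_query`, and `run_query` are made up for this example, and `run_query` is defined but not called here because it needs network access and a billing project.

```python
def qualified_table(project, dataset, table):
    """Build the backtick-quoted project.dataset.table name for a FROM clause."""
    return f"`{project}.{dataset}.{table}`"

def build_query(project, dataset, table, limit=5):
    """Assemble a small query against a fully qualified table."""
    return f"SELECT airline FROM {qualified_table(project, dataset, table)} LIMIT {limit}"

def run_query(sql, billing_project):
    """Run sql and return the rows as dicts (requires credentials; not called below)."""
    # Imported lazily so the string helpers above work without the package installed.
    from google.cloud import bigquery
    client = bigquery.Client(project=billing_project)
    return [dict(row.items()) for row in client.query(sql).result()]

sql = build_query("bigquery-samples", "airline_ontime_data", "flights")
print(sql)
```

The project passed to `bigquery.Client` is the one that gets billed, while the project in the table name is where the data lives, so the two can differ when querying public data sets.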
Consider example query:
SELECT
airline,
SUM(IF(arrival_delay > 0, 1, 0)) AS num_delayed,
COUNT(arrival_delay) AS total_flights
FROM
`bigquery-samples.airline_ontime_data.flights`
WHERE
arrival_airport='OKC'
AND departure_airport='DFW'
GROUP BY
airline
SQL features to point out:
- Can rename variables "on the fly" and return them with more convenient handles
- SELECT some-inconveniently-long-name AS result1
- Can perform aggregate operations (or even conditional aggregate operations) on the fields being selected
- SELECT airline, SUM(IF(arrival_delay > 0, 1, 0)) AS num_delayed
- Can perform counts of non-null fields
- SELECT COUNT(arrival_delay) AS total_flights
- (This arrival_delay field would be null if a flight were canceled, otherwise it is a number)
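- Can perform counts of fields that meet a condition
- SELECT COUNT(IF(arrival_delay > 0, 1, NULL)) AS total_delayed_flights
- (COUNT only skips NULLs, so the IF must return NULL rather than 0 for rows that should not be counted)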
- Can set conditions using the WHERE clause
- WHERE arrival_airport='OKC'
- WHERE departure_airport='DFW'
- Can also combine these with logical operators:
- WHERE arrival_airport='OKC' AND departure_airport='DFW'
- We can group records (think of this as tossing them all into bags)
- GROUP BY airline, departure_airport
- This creates a bag for each combination of airline and departure airport
- Each bag contains many flights - we are performing aggregation operations on each bag (count, sum, etc.)
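The aggregation features listed above can be tried locally with Python's built-in `sqlite3` module. This is a stand-in, not BigQuery: the five sample rows are invented, and SQLite has no `IF()` function, so the sketch uses the equivalent `CASE WHEN` expression.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (airline TEXT, arrival_delay INTEGER)")
conn.executemany(
    "INSERT INTO flights VALUES (?, ?)",
    [
        ("AA", 10),
        ("AA", -3),
        ("AA", None),  # canceled flight: arrival_delay is NULL
        ("OO", 25),
        ("OO", 40),
    ],
)

rows = conn.execute(
    """
    SELECT
      airline,
      SUM(CASE WHEN arrival_delay > 0 THEN 1 ELSE 0 END) AS num_delayed,
      COUNT(arrival_delay) AS total_flights,  -- skips the NULL row
      COUNT(*) AS total_rows                  -- counts every row in the bag
    FROM flights
    GROUP BY airline
    ORDER BY airline
    """
).fetchall()
print(rows)
```

Comparing `total_flights` with `total_rows` for AA shows the canceled flight dropping out of `COUNT(arrival_delay)`.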
Performing subqueries
Subqueries use one query to feed another query
Using the query from before, we can feed those results into a new query - we are now performing a query on the TEMPORARY RESULTS TABLE
SELECT
airline,
departure_airport,
num_delayed,
total_flights,
num_delayed/total_flights AS delayed_frac
FROM
(
SELECT
airline, departure_airport,
SUM(IF(arrival_delay > 0, 1, 0)) AS num_delayed,
COUNT(arrival_delay) AS total_flights
FROM
`bigquery-samples.airline_ontime_data.flights`
WHERE
arrival_airport='OKC'
GROUP BY
airline, departure_airport
)
WHERE
total_flights > 5
ORDER BY delayed_frac DESC
LIMIT 5
The first query returns a table with four columns: airline, departure_airport, num_delayed, total_flights
From those results, we perform a second query that further filters results (and performs a calculation)
This tells you that flights on airline OO leaving Atlanta arrive late at OKC 72% of the time (the most-delayed combination of departure airport and airline)
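The inner-query-feeds-outer-query pattern can be reproduced on toy data with `sqlite3`. The rows below are invented, the threshold is lowered to 3 to suit the small sample, and `1.0 *` is added because SQLite would otherwise do integer division.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE flights (airline TEXT, departure_airport TEXT, arrival_delay INTEGER)"
)
data = (
    [("OO", "ATL", 15)] * 3 + [("OO", "ATL", 0)]       # 4 flights, 3 delayed
    + [("AA", "DFW", 5)] + [("AA", "DFW", -2)] * 3     # 4 flights, 1 delayed
    + [("WN", "DAL", 30)] * 2                          # 2 flights, too few to keep
)
conn.executemany("INSERT INTO flights VALUES (?, ?, ?)", data)

rows = conn.execute(
    """
    SELECT airline, departure_airport,
           1.0 * num_delayed / total_flights AS delayed_frac
    FROM (
      -- Inner query: one row per (airline, departure_airport) bag.
      SELECT airline, departure_airport,
             SUM(CASE WHEN arrival_delay > 0 THEN 1 ELSE 0 END) AS num_delayed,
             COUNT(arrival_delay) AS total_flights
      FROM flights
      GROUP BY airline, departure_airport
    )
    WHERE total_flights > 3
    ORDER BY delayed_frac DESC
    """
).fetchall()
print(rows)
```

The outer WHERE can filter on `total_flights` only because the inner query has already produced it as a column.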
Querying from multiple tables
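To query from multiple tables, we list them in the FROM clause, separated by commas. (The bracketed table names below are legacy SQL, where a comma between tables means UNION ALL; in standard SQL a comma between tables is a cross join.)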
This query examines non-internal root actions over the course of three days:
SELECT
FORMAT_UTC_USEC(event.timestamp_in_usec) AS time,
request_url
FROM
[myproject.applogs.events_20120501],
[myproject.applogs.events_20120502],
[myproject.applogs.events_20120503]
WHERE
event.username='root'
AND NOT event.source_ip.is_internal
If we want to match any tables within a date range, can use a "wildcard" FROM:
FROM
TABLE_DATE_RANGE(myproject:applogs.events_,
TIMESTAMP('2012-05-01'),
TIMESTAMP('2012-05-03'))
This makes it easier to select larger date ranges without the complications of wildcards not selecting the right set of tables (May-July, for example)
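(How these work together: TIMESTAMP converts each date string into a timestamp, and TABLE_DATE_RANGE then matches every daily table whose YYYYMMDD suffix falls between the two timestamps, inclusive. Both are legacy SQL functions.)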
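To make the date-range matching concrete, here is a small Python sketch that lists the table names such a range would cover. The function `table_date_range` is a hypothetical stand-in written for this example; the real TABLE_DATE_RANGE only matches tables that actually exist, while this sketch simply enumerates candidate names.

```python
from datetime import date, timedelta

def table_date_range(prefix, start, end):
    """List daily table names prefixYYYYMMDD for every day from start to end, inclusive."""
    names = []
    day = start
    while day <= end:
        names.append(prefix + day.strftime("%Y%m%d"))
        day += timedelta(days=1)
    return names

tables = table_date_range("events_", date(2012, 5, 1), date(2012, 5, 3))
print(tables)
```

Enumerating by calendar day is what keeps a range like May through July from picking up the wrong tables, which a plain text wildcard on the suffix could do.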
Joining tables
Nice explanation of inner vs outer joins: http://www.programmerinterview.com/index.php/database-sql/inner-vs-outer-joins/
Join predicate - the condition that tells the join which fields to match records on
Inner joins only pick results in common to both
Outer joins fill in records missing from one or the other with NULL
Left outer join leaves in all records that are in the left table, filling in NULL for fields missing in the right table. Any records that exist in the right table but not in the left table are dropped.
Right outer join leaves in all records that are in the right table, filling in NULL for fields missing in the left table. Any records that exist in the left table but not the right table are dropped.
Full outer join will include every record in both left and right, joining records that can be joined, and filling in NULL for missing fields of records that only exist in left or right tables. All records are kept/included in the join.
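The four join types can be compared side by side with a small pure-Python sketch. It assumes each key appears at most once per table, which real joins do not require; the `join` helper and the sample `flights` and `weather` tables are invented for illustration, with `None` standing in for NULL.

```python
def join(left, right, how="inner"):
    """Join two {key: value} tables on their keys; a missing side becomes None."""
    if how == "inner":
        keys = [k for k in left if k in right]
    elif how == "left":
        keys = list(left)
    elif how == "right":
        keys = list(right)
    elif how == "full":
        keys = list(left) + [k for k in right if k not in left]
    else:
        raise ValueError(f"unknown join type: {how}")
    return [(k, left.get(k), right.get(k)) for k in keys]

flights = {"2008-05-13": "AA123", "2008-05-14": "OO456"}
weather = {"2008-05-14": "rain", "2008-05-15": "rain"}

for how in ("inner", "left", "right", "full"):
    print(how, join(flights, weather, how))
```

Only 2008-05-14 appears in both tables, so the inner join returns one row while the full outer join returns three.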
To join tables, use the JOIN ON statement (the join predicate comes after the ON)
(Note: this uses the fact that we can also create "handles" for tables, like this:)
FROM `bigquery-samples.airline_ontime_data.flights` AS f
Here's the example, which we'll break down:
SELECT
f.airline,
SUM(IF(f.arrival_delay > 0, 1, 0)) AS num_delayed,
COUNT(f.arrival_delay) AS total_flights
FROM
`bigquery-samples.airline_ontime_data.flights` as f
JOIN (
SELECT
CONCAT(
CAST(year AS STRING),'-',
LPAD(CAST(month AS STRING),2,'0'),'-',
LPAD(CAST(day AS STRING),2,'0')
) AS rainyday
FROM
`bigquery-samples.weather_geo.gsod`
WHERE
station_number = 725030
AND total_precipitation > 0
) as w
ON
w.rainyday = f.date
WHERE
f.arrival_airport = 'LGA'
GROUP BY
f.airline
Start with the subquery:
SELECT
CONCAT(
CAST(year AS STRING),'-',
LPAD(CAST(month AS STRING),2,'0'),'-',
LPAD(CAST(day AS STRING),2,'0')
) AS rainyday
FROM
`bigquery-samples.weather_geo.gsod`
WHERE
station_number = 725030
AND total_precipitation > 0
This is selecting dates where there was precipitation at a particular weather station, and combining separate year/month/day fields into a sensible string that is comparable to the strings in the airline data set. (Left pad means, pad this number into a 2-digit number, using 0s to pad, so 3 is turned into 03)
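The CONCAT/CAST/LPAD combination has a direct Python analogue, shown here as a sketch. The function name `rainyday` just mirrors the column alias; `str.rjust(2, "0")` plays the role of LPAD for the month and day values used here.

```python
def rainyday(year, month, day):
    """Build a YYYY-MM-DD string, left-padding month and day to two digits with zeros."""
    return "-".join([
        str(year),                   # CAST(year AS STRING)
        str(month).rjust(2, "0"),    # LPAD(CAST(month AS STRING), 2, '0')
        str(day).rjust(2, "0"),      # LPAD(CAST(day AS STRING), 2, '0')
    ])

print(rainyday(2008, 5, 3))
```

Without the padding, the same date would come out as 2008-5-3 and would never equal a date string such as 2008-05-03 in the flights table.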
We are then using a field that we haven't used yet - the date field of the airline delays database - to FILTER the airline delays we are looking at
So the logic here is:
- Construct a subquery that selects rainy days from the weather data set
- Do an inner join on the rainy days dates and the airline delays dates, tossing out the rest of the weather data set (this will also discard any flight delay info that occurs on non-rainy days, and any rainy days that we don't have flight delays for)
- Perform the same aggregation as before, counting number of delays and number of total flights
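The three steps above can be run end to end on toy data with `sqlite3`. This is a stand-in for BigQuery with invented rows: the table names `flights` and `gsod` echo the originals, `printf` replaces CONCAT/LPAD, and `CASE WHEN` replaces IF, since SQLite lacks those functions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (airline TEXT, date TEXT, arrival_delay INTEGER)")
conn.execute(
    "CREATE TABLE gsod (year INTEGER, month INTEGER, day INTEGER, total_precipitation REAL)"
)
conn.executemany("INSERT INTO flights VALUES (?, ?, ?)", [
    ("AA", "2008-05-13", 20),   # rainy day, delayed
    ("AA", "2008-05-13", -5),   # rainy day, on time
    ("AA", "2008-05-14", 60),   # dry day: dropped by the inner join
    ("OO", "2008-05-13", 35),   # rainy day, delayed
])
conn.executemany("INSERT INTO gsod VALUES (?, ?, ?, ?)", [
    (2008, 5, 13, 0.4),   # rain
    (2008, 5, 14, 0.0),   # no rain: filtered out by the subquery
    (2008, 5, 15, 1.2),   # rain, but no flights recorded that day
])

rows = conn.execute(
    """
    SELECT f.airline,
           SUM(CASE WHEN f.arrival_delay > 0 THEN 1 ELSE 0 END) AS num_delayed,
           COUNT(f.arrival_delay) AS total_flights
    FROM flights AS f
    JOIN (
      SELECT printf('%04d-%02d-%02d', year, month, day) AS rainyday
      FROM gsod
      WHERE total_precipitation > 0
    ) AS w
    ON w.rainyday = f.date
    GROUP BY f.airline
    ORDER BY f.airline
    """
).fetchall()
print(rows)
```

The 60-minute delay on the dry day never reaches the aggregation, which shows the join acting purely as a filter on the flights.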
NOTE: if we want to first explore the sub-query, can run this sub-query; important to add LIMIT 5!
SELECT
year, month, day, total_precipitation
FROM
`bigquery-samples.weather_geo.gsod`
WHERE
station_number = 725030
AND total_precipitation > 0
LIMIT 5
Building Queries: Lab
https://codelabs.developers.google.com/codelabs/cpb101-bigquery-query/#0
Goal of lab:
- Run some queries of data
- Modify queries to explore different features (functions, booleans, string processing, subqueries, etc.)
Running a basic query
SELECT airline, date, departure_delay FROM `bigquery-samples.airline_ontime_data.flights` WHERE departure_delay > 0 AND departure_airport = 'LGA' LIMIT 100
Aggregate and boolean functions
SELECT airline, COUNT(departure_delay) FROM `bigquery-samples.airline_ontime_data.flights` WHERE departure_airport = 'LGA' AND date = '2008-05-13' GROUP BY airline ORDER BY airline
Change to:
SELECT airline, COUNT(departure_delay) FROM `bigquery-samples.airline_ontime_data.flights` WHERE departure_delay > 0 AND departure_airport = 'LGA' AND date = '2008-05-13' GROUP BY airline ORDER BY airline
Now run:
SELECT f.airline, COUNT(f.departure_delay) AS total_flights, SUM(IF(f.departure_delay > 0, 1, 0)) AS num_delayed FROM `bigquery-samples.airline_ontime_data.flights` AS f WHERE f.departure_airport = 'LGA' AND f.date = '2008-05-13' GROUP BY f.airline
String operations
Run the query:
SELECT CONCAT(CAST(year AS STRING), '-', LPAD(CAST(month AS STRING),2,'0'), '-', LPAD(CAST(day AS STRING),2,'0')) AS rainyday FROM `bigquery-samples.weather_geo.gsod` WHERE station_number = 725030 AND total_precipitation > 0
Joining queries on field
Join SQL statement:
SELECT
f.airline,
SUM(IF(f.arrival_delay > 0, 1, 0)) AS num_delayed,
COUNT(f.arrival_delay) AS total_flights
FROM
`bigquery-samples.airline_ontime_data.flights` AS f
JOIN (
SELECT
CONCAT(CAST(year AS STRING), '-', LPAD(CAST(month AS STRING),2,'0'), '-', LPAD(CAST(day AS STRING),2,'0')) AS rainyday
FROM
`bigquery-samples.weather_geo.gsod`
WHERE
station_number = 725030
AND total_precipitation > 0) AS w
ON
w.rainyday = f.date
WHERE f.arrival_airport = 'LGA'
GROUP BY f.airline
Using subqueries
Example query that uses a subquery: runs one query, feeds the results on to another query
SELECT
airline,
num_delayed,
total_flights,
num_delayed / total_flights AS frac_delayed
FROM (
SELECT
f.airline AS airline,
SUM(IF(f.arrival_delay > 0, 1, 0)) AS num_delayed,
COUNT(f.arrival_delay) AS total_flights
FROM
`bigquery-samples.airline_ontime_data.flights` AS f
JOIN (
SELECT
CONCAT(CAST(year AS STRING), '-', LPAD(CAST(month AS STRING),2,'0'), '-', LPAD(CAST(day AS STRING),2,'0')) AS rainyday
FROM
`bigquery-samples.weather_geo.gsod`
WHERE
station_number = 725030
AND total_precipitation > 0) AS w
ON
w.rainyday = f.date
WHERE f.arrival_airport = 'LGA'
GROUP BY f.airline
)
ORDER BY
frac_delayed ASC
Make Your Own BigQuery
Baseball data set: https://cloud.google.com/bigquery/public-data/baseball
games_wide: Every pitch, steal, or lineup event for each at-bat in the 2016 regular season.
games_post_wide: Every pitch, steal, or lineup event for each at-bat in the 2016 post season.
(Same schema for both)
Example queries:
"What types of pitches were thrown, how many, top speeds?"
"What was the at bat and pitching sequence for the fastest pitch(es) in the 2016 season?"
Github repo: https://github.com/charlesreid1/sabermetrics-bigquery