From charlesreid1

Deploying TensorFlow Models

Module 3: Scaling Machine Learning Models with Cloud ML Engine

Effective machine learning requires:

  • Larger data sets
  • More feature engineering
  • More complicated model architectures

Refactor the current taxi cab fare prediction machine learning model:

  • Read out of memory data
  • Make it easy to add new input features
  • Make the model evaluate as part of training

Scaling TensorFlow Models

Once you have a working TensorFlow model, you can scale it up to more machines and more data

Taking the written model and scaling it out to more machines is essentially just scripting via gcloud commands

Scaling the Training Process

Most machine learning frameworks can handle toy problems and in-memory data sets

But if data size becomes much larger, need to be able to split data into batches and run model on many machines (batching and distribution are important)

Also doing transformations:

  • Pre-processing (transformation, cropping, de-colorize, etc.)
  • Feature creation (combine features, eliminate features, transform features)
  • Train model (also, hyper-parameter tuning)

The need for the cloud again - if data set is large, need to do these transformations in the cloud, across many machines. Same with hyperparameter tuning - want to explore different model architectures, at scale.

Scaling the Prediction Process

Scaling still matters once the model is trained. In plain TensorFlow you fit your estimator and then, to predict, call predict() on that estimator from Python. That raises some questions:

  • Can every client ("customer") of the model run Python code?
  • Will they all have access to the directory needed to construct the estimator object?
  • Will they know the feature columns you used to train the model?
  • The answer to all of these is, NO!
  • So: deploy the model as a microservice (a web application) that sits as a layer between your clients and the details of your machine learning model

Microservice architecture:

  • Need to shield clients from the details of how the machine learning prediction is made (including programming language, features used, etc)
  • If clients need a prediction from the model, they bundle everything into a REST API call (with all input variables needed by model)
  • Web service will take all input variables, convert them into tensors, send them to TensorFlow model, get results back, and convert them back to an API response (HTTP)
  • If you have millions of clients, and lots of requests coming in simultaneously, need to have a web service that can support this throughput
  • Weak link is the model evaluation step - this also needs to scale
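
The request/response path described above can be sketched in plain Python. This is only an illustration of the layering, not how Cloud ML Engine is implemented: stub_model, handle_predict, FEATURE_NAMES and the fare formula are all made up for the example, and the "tensors" are ordinary lists.

```python
import json

# Hypothetical feature order the model expects; the client never sees this.
FEATURE_NAMES = ["pickup_longitude", "pickup_latitude",
                 "dropoff_longitude", "dropoff_latitude", "passenger_count"]


def stub_model(rows):
    """Stand-in for the trained model: one fake fare per input row."""
    return [2.5 + 0.5 * row[-1] for row in rows]


def handle_predict(request_body, model=stub_model):
    """Turn a JSON request into model inputs, and model outputs into JSON."""
    payload = json.loads(request_body)
    # Convert each instance (a dict) into an ordered row of floats.
    rows = [[float(inst[name]) for name in FEATURE_NAMES]
            for inst in payload["instances"]]
    fares = model(rows)
    return json.dumps({"predictions": [{"fare_amount": f} for f in fares]})


# What a client would send: only named input values, nothing about the model.
request = json.dumps({"instances": [{
    "pickup_longitude": -73.8, "pickup_latitude": 40.7,
    "dropoff_longitude": -73.98, "dropoff_latitude": 40.73,
    "passenger_count": 2}]})
response = json.loads(handle_predict(request))
```

The client only needs to know the input names and the URL; the feature order, the language, and the model itself stay behind the service.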

Problems in training and problems in prediction are different.

Training problems: scaling out data and training process to more machines.

Prediction problems: scaling up prediction engine to handle high throughput and lots of clients

First generation TPU - primarily around prediction (inference) and doing prediction at scale - predicting/evaluating as fast as possible to handle user requests

Cloud ML Engine Workflow

Cloud ML Engine does both the prediction and training scaling. Focused on helping TensorFlow models scale up.

  • Start with CSV files
  • Explore datasets in Datalab using Pandas, matplotlib, etc.
  • Do transformations (preprocessing, feature creation, etc.) in Apache Beam, which can handle batch or streaming data. The intent is to write everything as a Dataflow pipeline, so that you can switch from batch to streaming without changing the transformation pipeline that feeds ML Engine

Dataflow workflow:

  • Work on transformations using a local Apache Beam runner, ensure everything is working
  • Scale it up to larger data sets by using a Dataflow runner

Cloud ML workflow:

  • Work on neural network locally using TensorFlow/notebooks/etc., ensure everything is working
  • Scale it up to execute TF code on GCP using Cloud ML Engine

Packaging TensorFlow Models as Python Modules for Training

To scale up a TensorFlow model to run on Cloud ML Engine, need to package the model up as a Python module.

We then submit the TensorFlow code by submitting this Python module. The task.py and model.py parts are the key here.

taxifare/
taxifare/PKG-INFO
taxifare/setup.cfg
taxifare/setup.py
taxifare/trainer/
taxifare/trainer/__init__.py
taxifare/trainer/task.py
taxifare/trainer/model.py
taxifare/trainer.egg-info/
taxifare/trainer.egg-info/dependency_links.txt
taxifare/trainer.egg-info/PKG-INFO
taxifare/trainer.egg-info/SOURCES.txt
taxifare/trainer.egg-info/top_level.txt

The TensorFlow code we wrote goes into task.py and model.py (mostly model.py). When we tar up the directory structure above, we get a Python module.

What is in task.py

task.py:

  • contains a main method
  • parses command-line parameters
  • uses command line parameters to run the model

Example task.py:

Experiment(
    model.build_estimator(
        output_dir,
        embedding_size = embedding_size,
        hidden_units = hidden_units
    ),
    train_input_fn = model.generate_csv_input_fn( train_data_paths, ... ),
    eval_input_fn = model.generate_csv_input_fn( eval_data_paths, ... ),
    eval_metrics = model.get_eval_metrics(),
)

(Note that these refer to functions that must be defined in model.py, which we'll cover in a moment)

Then, use argument parsing to get train_data_paths, for example:

parser.add_argument( '--train_data_paths', required=True )
parser.add_argument( '--num_epochs', ... )
# etc...

This makes code executable as a program, and enables passing information into the program via command line arguments.
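
A minimal sketch of the argument-parsing half of task.py, using only argparse. The flag names are the ones used in these notes; the defaults, the parse_args/main split, and returning a dict instead of building the Experiment are assumptions made to keep the example self-contained.

```python
import argparse


def parse_args(argv=None):
    """Parse the command-line flags that task.py hands on to the model code."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--train_data_paths", required=True)
    parser.add_argument("--eval_data_paths", required=True)
    parser.add_argument("--output_dir", required=True)
    parser.add_argument("--num_epochs", type=int, default=None)
    # Accept --job-dir as well, even though this sketch does not use it.
    parser.add_argument("--job-dir", default="/tmp")
    return parser.parse_args(argv)


def main(argv=None):
    args = parse_args(argv)
    # In the real task.py, this is where the Experiment would be built and run.
    return vars(args)


# Passing an explicit list stands in for what the shell would put in sys.argv.
settings = main(["--train_data_paths", "/path/to/dataset/taxi-train*",
                 "--eval_data_paths", "/path/to/dataset/taxi-valid.csv",
                 "--output_dir", "/path/to/outputdir",
                 "--num_epochs", "10"])
```

Note that argparse turns --job-dir into the attribute job_dir, and type=int converts the epoch count from a string.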

What is in model.py

model.py:

  • All code from previous chapter (estimator API, etc.) goes into model.py

We need to have a function that returns a function. The outer function takes a filename (or filename pattern) as an argument, passed in via task.py. The inner function it returns builds the TensorFlow ops that read features and labels out of those files.

def generate_csv_input_fn( filename, num_epochs = None, ... ):

    def _input_fn():
        input_file_names = tf.train.match_filenames_once(filename)
        filename_queue = tf.train.string_input_producer(
                            input_file_names,
                            num_epochs = num_epochs,
                            shuffle = True
                        )
        reader = tf.TextLineReader()
        _, value = reader.read_up_to(filename_queue, num_records = batch_size)
        value_column = tf.expand_dims(value, -1)
        columns = tf.decode_csv( value_column, record_defaults = DEFAULTS)
        features = dict(zip(CSV_COLUMNS, columns))
        label = features.pop(LABEL_COLUMN)
        return features, label

    return _input_fn

Verifying the Package

To verify that the model package runs as expected, you can run the following test:

export PYTHONPATH=${PYTHONPATH}:/path/to/taxifare
python -m trainer.task \
  --train_data_paths="/path/to/dataset/taxi-train*" \
  --eval_data_paths=/path/to/dataset/taxi-valid.csv \
  --output_dir=/path/to/outputdir \
  --num_epochs=10 \
  --job-dir=/tmp 

This simulates the way that the model is run in the cloud.

  • Python path variable tells python where to look for modules
  • The -m flag runs a module called trainer.task
  • The argparse settings pass the path information from the command line on to the program

Now that you know it works, how do you scale it up? Use gcloud command.

Running Packaged Model in the Cloud

Now you can use the gcloud command to submit the model - either locally, or in the cloud.

To run it locally, use "local train":

gcloud ml-engine local train \
    --module-name=trainer.task \
    --package-path=/path/to/taxifare/trainer \
    -- \
    --train_data_paths ... <the rest looks like it did above>

We are running this locally, passing it local directories to the package path, and local directories for the training data, &c.

To run the training task in the cloud, use "jobs submit":

gcloud ml-engine jobs submit training $JOBNAME \
    --region=$REGION \
    --module-name=trainer.task \
    --package-path=/path/to/taxifare/trainer \
    --job-dir=$OUTDIR \
    --staging-bucket=gs://$BUCKET \
    --scale-tier=BASIC \
    -- \
    --train_data_paths ... <the rest looks like it did above>

Does the following:

  • Submits a training job in the cloud
  • Specifies the region (same region as where your data lives)
  • Specify module name for job/model
  • Specify bucket location to put temporary files
  • Scale tier specifies the scale of the resources used (BASIC, STANDARD_1, PREMIUM_1, BASIC_GPU, CUSTOM)

The scale tier determines the cost.

The workflow, again, is:

  • Try out the job locally, and pass it local module name/location
  • Then submit it to the cloud

We covered training, but what about prediction?

Cloud ML Engine for Prediction

For the training task, we had the following task.py:

Experiment(
    model.build_estimator(
        output_dir,
        embedding_size = embedding_size,
        hidden_units = hidden_units
    ),
    train_input_fn = model.generate_csv_input_fn( train_data_paths, ... ),
    eval_input_fn = model.generate_csv_input_fn( eval_data_paths, ... ),
    eval_metrics = model.get_eval_metrics(),
)

For prediction, we want to make slight modifications:

Experiment(
    model.build_estimator(
        output_dir,
        embedding_size = embedding_size,
        hidden_units = hidden_units
    ),
    train_input_fn = model.generate_csv_input_fn( train_data_paths, ... ),
    eval_input_fn = model.generate_csv_input_fn( eval_data_paths, ... ),

    export_strategies = [saved_model_export_utils.make_export_strategy( 
                model.serving_input_fn,
                default_output_alternative_key = None,
                exports_to_keep = 1
    )],

    eval_metrics = model.get_eval_metrics(),
)

With exports_to_keep = 1, only the most recent export is kept. This also requires us to define serving_input_fn() in model.py, which is the function that handles the JSON that the client sends when it requests a prediction from the model. It creates all of the input features that the model expects.

Example: this creates placeholders for each input column, and each column is a float32:

def serving_input_fn():
    feature_placeholders = {
            column.name : tf.placeholder(tf.float32, [None]) for column in INPUT_COLUMNS
    }

(This is just an example, could have virtually any kind of types for your input data.)

Once you've done that, it's time to deploy the trained model to Google Cloud Platform:

  • Can deploy a locally-trained, locally-built model
  • can deploy a trained model that is somewhere on a Google Cloud Storage bucket

Here is an example of submitting a model that is located in a Cloud Storage bucket:

MODEL_NAME="taxifare"
MODEL_VERSION="v1"
MODEL_LOCATION="gs://${BUCKET}/taxifare/smallinput/taxi_trained/export/Servo/..."

# Create a model
gcloud ml-engine models create ${MODEL_NAME} \
        --regions $REGION

# Create a new version of this model and where it lives
gcloud ml-engine versions create ${MODEL_VERSION} \
        --model ${MODEL_NAME} \
        --origin ${MODEL_LOCATION}

Creating multiple versions allows you to do A/B testing... send 80% of your traffic to version 1, 20% of your traffic to version 2, and gradually scale up one model version or the other.
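
A small sketch of what an 80/20 split looks like, assuming the choice of version is made by whatever code sends the request. pick_version and the weights are hypothetical; this is not a Cloud ML Engine feature or API.

```python
import random


def pick_version(weights, rng):
    """Choose a model version with probability proportional to its weight."""
    versions = list(weights)
    return rng.choices(versions, weights=[weights[v] for v in versions], k=1)[0]


rng = random.Random(0)          # fixed seed so the sketch is repeatable
split = {"v1": 0.8, "v2": 0.2}  # 80% of traffic to v1, 20% to v2
counts = {"v1": 0, "v2": 0}
for _ in range(10000):
    counts[pick_version(split, rng)] += 1
```

Gradually scaling one version up is then just a matter of changing the weights.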

Client interaction with model predictions

Now we cover how the client interacts with the model. Recall from above that the client sends JSON requests to the model; these are made as REST calls.

JSON request containing inputs (in a structure called request_data):

from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

# Get credentials for user to make API calls
credentials = GoogleCredentials.get_application_default()
api = discovery.build('ml', 'v1beta1',
                    credentials = credentials,
                    discoveryServiceUrl = 'https://storage.googleapis.com/cloud-ml/discovery/ml_v1beta1_discovery.json'
                    )

# Set the JSON file with model inputs
request_data = [ {  'pickup_longitude' : -73.800001,
                    'pickup_latitude'  :  40.700001,
                    'dropoff_longitude': -73.980001,
                    'dropoff_latitude' :  40.730001,
                    'passenger_count'  : 2
                }]

# Now assemble the URL to which to send the model inputs:
# Set the following information:
# - name of the project
# - name of the model
# - name of the version
parent = 'projects/%s/models/%s/versions/%s' % ( 'cloud-training-demos', 'taxifare', 'v1' )

# Make the API request (call the predict function)
response = api.projects().predict( body = { 'instances' : request_data },
                                   name = parent
                                ).execute()

Recall that we specified the model and version number when we ran gcloud ml-engine models create and gcloud ml-engine versions create:

MODEL_NAME="taxifare"
MODEL_VERSION="v1"
MODEL_LOCATION="gs://${BUCKET}/taxifare/smallinput/taxi_trained/export/Servo/..."
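
To make the link between the model/version names and the request explicit, here is a sketch that only assembles the two pieces a prediction call needs (the resource name and the JSON body) without contacting any service. build_predict_request is a hypothetical helper, not part of the Google API client.

```python
import json


def build_predict_request(project, model, version, instances):
    """Return the resource name and JSON body for a prediction call."""
    name = "projects/%s/models/%s/versions/%s" % (project, model, version)
    body = json.dumps({"instances": instances})
    return name, body


name, body = build_predict_request(
    "cloud-training-demos", "taxifare", "v1",
    [{"pickup_longitude": -73.800001, "pickup_latitude": 40.700001,
      "dropoff_longitude": -73.980001, "dropoff_latitude": 40.730001,
      "passenger_count": 2}])
```

The model and version in the name are the same MODEL_NAME and MODEL_VERSION that were used when the model was deployed.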


Scaling with Cloud Machine Learning Laboratory

Use a single-region bucket for machine learning training inputs and outputs - enables consistency (fast reading/writing from multiple threads)

The lab will accomplish the following:

  • Package a TensorFlow model
  • Run the training locally
  • Run the training on the cloud
  • Deploy the model to the cloud
  • Call the model to make predictions

Pick Up Here

TensorFlow Architecture for Out of Memory Learning

Back to the middle layer:

  • Reminder, these are the components that are useful when building custom NN models
  • tf.layers, tf.losses, tf.metrics


Recap of terminology:

  • We will store our data in multiple files
  • One step = going through single batch of training data once
  • One epoch = going through entire training data once
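
The step/epoch relationship is simple arithmetic. The sketch below assumes each epoch is split into its own batches, with a smaller final batch when the data does not divide evenly; steps_for and the numbers are made up for illustration.

```python
import math


def steps_for(num_examples, batch_size, num_epochs):
    """Number of training steps needed to cover num_epochs passes over the data."""
    steps_per_epoch = math.ceil(num_examples / batch_size)
    return steps_per_epoch * num_epochs


# 10,000 examples in batches of 512 is 20 steps per epoch; 50 epochs is 1000 steps.
total_steps = steps_for(num_examples=10000, batch_size=512, num_epochs=50)
```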

Reading data from out of memory:

  • To go through our data for 50 epochs, we just need to create a filename queue (from randomly shuffled filenames) that contains our file names 50 times each
  • Example: dealing with three files A, B, C: our filename queue should be B B C A ... (enqueue_many function)
  • Then we dequeue each file, one at a time, using a Reader (dequeue function)
  • The reader decodes the file contents and turns them into examples
  • The examples then go into an Example Queue (using the enqueue function)
  • Why shuffle filenames and add them in random order? When doing distributed learning, we don't want to bias our learning process, or have one file cause a slowdown (on exact same machine each time)
  • Each Reader will be on a different machine; each Reader takes filenames from the queue, and creates an example queue (an example is an input plus a label)
  • Then, TensorFlow model reads data from the Example Queue
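
The two-queue idea can be mimicked with plain Python generators. This is a single-machine sketch of the concept only, not TensorFlow's queue runners: filename_queue, example_queue and the in-memory "files" are hypothetical stand-ins.

```python
import random


def filename_queue(filenames, num_epochs, rng):
    """Yield every filename num_epochs times, reshuffling the order each epoch."""
    for _ in range(num_epochs):
        order = list(filenames)
        rng.shuffle(order)
        for name in order:
            yield name


def example_queue(files, names, batch_size):
    """Read lines from each queued file and group them into batches."""
    batch = []
    for name in names:
        for line in files[name]:
            batch.append(line)
            if len(batch) == batch_size:
                yield batch
                batch = []
    if batch:
        yield batch  # final partial batch


# In-memory stand-ins for three CSV shards (hypothetical contents).
files = {"A": ["a1", "a2"], "B": ["b1", "b2"], "C": ["c1", "c2"]}
names = list(filename_queue(["A", "B", "C"], num_epochs=2, rng=random.Random(0)))
batches = list(example_queue(files, names, batch_size=4))
```

Each file appears once per epoch, in a different order each time, and the model only ever sees batches coming off the second queue.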

Reading a CSV file num_epochs times:

Start by setting labels for the columns in the files being read:

CSV_COLUMNS = ['fare_amount', 'pickuplon', 'pickuplat', ...]
LABEL_COLUMN = 'fare_amount'

# Now define default values that each value will take on
# (This keeps the ML model from choking if there are one or two missing pieces of data)
DEFAULTS = [[0.0], [-74.0], [40.0], [-74.0], [40.7], [1.0], ['nokey']]

Next, define an input function that will do a wildcard match, and assemble each filename and put it into the Filename Queue:

def input_fn():
    input_file_names = tf.train.match_filenames_once(filename)
    filename_queue = tf.train.string_input_producer( input_file_names, num_epochs=num_epochs, shuffle=True)

    # now make the Reader
    reader = tf.TextLineReader()
    _, value = reader.read_up_to(filename_queue, num_records = batch_size)

    value_column = tf.expand_dims(value, -1)
    columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)
    features = dict(zip(CSV_COLUMNS, columns))

    # Take the one label item and pop it from features.
    # Assign the result to label, so label is the fare_amount tensor
    # and features no longer contains it.
    label = features.pop(LABEL_COLUMN)

    return features, label


Reading CSV Files num_epochs times

In the input function:

  • Match all filenames (can have a wildcard, like train.*) or sharded files (train-00001-36, train-00002-36, etc)
  • Then, take those input files and repeat them (in a shuffled way) num_epochs times
  • Now, create the reader with TextLineReader() to read CSV files
  • Tell the reader to read a batch of records from the filename queue
  • What comes back is a 1-D tensor of lines (one string per record), so we use expand_dims() to add a second dimension, giving one line per row
  • Then we do decode_csv to decode this as a comma-separated string
  • We need to tell TF what the datatypes are, and what to do if the value of the field is missing
  • We now have our values
  • But our features have to be a dictionary - where each column is an entry in the dict, with the key being the name of the column
  • Associate the field names with the tensor values to make it a dictionary (that's features). One key is fare_amount, next key is pickuplon, next is pickuplat, etc.
  • Each key has a tensor associated with it
  • Those are our features - except that fare_amount is the label column: it is what we are trying to predict, so it should not be an input
  • features.pop(LABEL_COLUMN) removes the quantity we're trying to predict (the output) from the dictionary of inputs, and hands it back as the label
  • We then return the dictionary of features (column name : tensor for each feature) and the label tensor
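
The decode / zip / pop sequence is easier to see without TensorFlow. The sketch below does the same bookkeeping on plain Python lists, using the csv module in place of decode_csv; parse_batch, the shortened column list and the sample lines are assumptions for illustration.

```python
import csv
import io

CSV_COLUMNS = ["fare_amount", "pickuplon", "pickuplat"]   # shortened column list
LABEL_COLUMN = "fare_amount"
DEFAULTS = [0.0, -74.0, 40.0]                             # used when a field is empty


def parse_batch(lines):
    """Turn CSV lines into (features, label), with one list of values per column."""
    rows = list(csv.reader(io.StringIO("\n".join(lines))))
    columns = []
    for i, default in enumerate(DEFAULTS):
        columns.append([float(row[i]) if row[i] != "" else default for row in rows])
    features = dict(zip(CSV_COLUMNS, columns))
    label = features.pop(LABEL_COLUMN)   # remove the target from the inputs
    return features, label


# The second record is missing pickuplon, so the default (-74.0) is filled in.
features, label = parse_batch(["12.5,-73.99,40.75", "7.0,,40.70"])
```

After the pop, features holds only input columns and label holds the fare values.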

TextLineReader() can read from local files, or from GCS

What it is doing is:

  • Decoding CSV
  • Creating a dictionary of features
  • Separating out the label (via features.pop(LABEL_COLUMN))
  • Returning features and labels

This decode_csv can be fed a CSV from a local disk, or from Google Cloud Storage

Next lab:

  • Refactor TensorFlow model
  • Read from a potentially large data set/file in batches
  • Do a wildcard match on filenames and feed them to a filename queue
  • Break up the one-to-one relationships between inputs and features (unclear what this means, exactly)

This will smooth the way to running this TensorFlow model at scale.


Refactoring the ML Model for Big Data

Link to lab: https://codelabs.developers.google.com/codelabs/dataeng-machine-learning/index.html?index=#7

Link to notebook: https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/courses/machine_learning/tensorflow/c_batched.ipynb

First refactoring: reading input data in batch

The first refactoring addresses how the input data are being read. A filename queue is added to the TensorFlow graph, instead of reading the file directly into a Pandas dataframe. We pass the filename, and use tf.train.match_filenames_once() to match it. We use a string producer to generate the (one single) filename over and over. We shuffle the input filename queue. We repeat each file num_epochs times. Here's the whole mess:

def read_dataset(filename, num_epochs=None, batch_size=512, mode=tf.contrib.learn.ModeKeys.TRAIN):
  def _input_fn():
    input_file_names = tf.train.match_filenames_once(filename)
    filename_queue = tf.train.string_input_producer(
        input_file_names, num_epochs=num_epochs, shuffle=True)
    reader = tf.TextLineReader()
    _, value = reader.read_up_to(filename_queue, num_records=batch_size)

    value_column = tf.expand_dims(value, -1)
    columns = tf.decode_csv(value_column, record_defaults=DEFAULTS)
    features = dict(zip(CSV_COLUMNS, columns))
    label = features.pop(LABEL_COLUMN)
    return features, label

  return _input_fn


Second refactoring: treat input data and features as different

The second refactoring addresses the way we turn input data into features. They refactor this so that they are specifically extracting the input variables in one step, then explicitly specifying the model features in another, separate step. What they mean by "break the one-to-one relationship between inputs and features" is, we aren't forced to use the input data and only the input data as our model features. Once we change the way the input data is loaded (i.e., if we don't read data straight from the input file into the model), we can transform input variables, leave certain input variables out of the model, normalize them, combine them together, etc.
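
A sketch of what breaking the one-to-one relationship buys you: features are computed from the inputs in a separate step, so they can be combined, added, or dropped. The names dropofflon, dropofflat, passengers, key, and the derived features are hypothetical, and plain floats stand in for tensors.

```python
import math


def add_engineered_features(inputs):
    """Build model features from raw inputs instead of passing inputs straight through."""
    features = dict(inputs)
    features["latdiff"] = inputs["pickuplat"] - inputs["dropofflat"]
    features["londiff"] = inputs["pickuplon"] - inputs["dropofflon"]
    features["euclidean"] = math.hypot(features["latdiff"], features["londiff"])
    features.pop("key", None)   # an input that is deliberately left out of the model
    return features


raw = {"pickuplon": -73.98, "pickuplat": 40.75,
       "dropofflon": -73.95, "dropofflat": 40.71, "passengers": 1.0, "key": "nokey"}
features = add_engineered_features(raw)
```

The input-reading code does not change when a feature is added or removed here.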

Third refactoring: Move model evaluation into training loop

The problem with the notebook, as is, is that we're specifying a number of epochs. Instead, we want to evaluate the model as we go, and stop when we reach a criterion.

(This will happen in the next lab.)

Also a checkpointing problem - we save checkpoints during the training, and use the final checkpoint as the final model. (Discussion of overfitting - we may not want the last step, because it may be overfit.) This will also be improved by stopping the model training when we reach some error criterion.

Train the model on the training data set, and every few steps, stop and assess RMSE on the validation data set. Stop when the RMSE on the validation data set starts to increase (indicates we're overfitting).
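
The stopping rule can be sketched as a loop over the validation RMSE recorded at each evaluation. best_step, the patience parameter, and the RMSE values are made up for illustration; in a real run each value would come from evaluating the model on the validation set.

```python
def best_step(validation_rmse, patience=1):
    """Return (index, rmse) of the best evaluation seen before stopping.

    Stops once the validation RMSE has failed to improve `patience` times in a row.
    """
    best_index, best_value, bad_evals = 0, float("inf"), 0
    for i, rmse in enumerate(validation_rmse):
        if rmse < best_value:
            best_index, best_value, bad_evals = i, rmse, 0
        else:
            bad_evals += 1
            if bad_evals >= patience:
                break
    return best_index, best_value


# Made-up RMSE values from successive evaluations on the validation set.
history = [11.2, 9.8, 9.1, 9.4, 9.9, 8.0]
stop_at, rmse = best_step(history)
```

With patience=1 the loop stops at the first increase, so the later 8.0 is never seen; a larger patience trades extra training time for robustness to noisy evaluations.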

What to improve further?

Handle machine failure in distributed training - what if something goes wrong? Want to be able to pick up training wherever we left off.
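
Picking up where we left off comes down to saving progress regularly and reading it back on startup. The sketch below uses a counter and a JSON file in place of a model and a TensorFlow checkpoint; train, fail_at and the file name are hypothetical.

```python
import json
import os
import tempfile


def train(total_steps, checkpoint_path, fail_at=None):
    """Run a counting 'training loop' that checkpoints its progress every step."""
    step = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            step = json.load(f)["step"]   # pick up where the last run stopped
    while step < total_steps:
        if fail_at is not None and step == fail_at:
            raise RuntimeError("simulated machine failure")
        step += 1
        with open(checkpoint_path, "w") as f:
            json.dump({"step": step}, f)
    return step


checkpoint = os.path.join(tempfile.mkdtemp(), "checkpoint.json")
try:
    train(10, checkpoint, fail_at=4)      # first run dies partway through
except RuntimeError:
    pass
with open(checkpoint) as f:
    resumed_from = json.load(f)["step"]
final_step = train(10, checkpoint)        # second run resumes from the checkpoint
```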

Monitor training - especially useful if training is expected to take a very long time. Answer questions like, which epoch are we on, what is the current RMSE, etc.

Choose a model based on the validation data set - use a smarter stopping criterion than number of epochs.

How much does a reasonably realistic machine learning model cost?

It will cost a few thousand dollars for a reasonably realistic model

Module 4: Feature Engineering
