How to use the Estimator API with TensorFlow: a practical example.

TensorFlow is an open-source library developed by Google for building Machine Learning models. It is known for its flexibility and offers a wide range of tools, such as the Estimator API.

This API reduces the boilerplate code needed to prepare a TensorFlow model. But the best way to understand how it works is through a practical example, so we have chosen a project we carried out for one of our clients to walk you through it.
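
To see the appeal before diving in, here is a minimal, self-contained sketch (assuming a TensorFlow 1.x environment; the feature name, sizes and synthetic data are illustrative only) of how little code a premade Estimator needs for training and evaluation:

import numpy as np
import tensorflow as tf

# Synthetic data; in practice this would be a tf.data pipeline over CSV files.
features = {'x': np.random.rand(320, 4).astype(np.float32)}
labels = np.random.randint(0, 3, size=320)

def input_fn():
    dataset = tf.data.Dataset.from_tensor_slices((features, labels))
    return dataset.shuffle(320).repeat().batch(32)

# The premade Estimator wires up training, evaluation, checkpointing and
# export from just feature columns and an input function.
estimator = tf.estimator.DNNClassifier(
    feature_columns=[tf.feature_column.numeric_column('x', shape=[4])],
    hidden_units=[16, 8],  # two hidden layers
    n_classes=3)

estimator.train(input_fn=input_fn, steps=100)
print(estimator.evaluate(input_fn=input_fn, steps=10))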

This client, a world leader in security solutions, offers a wide range of services aimed at numerous industries and customer segments, from small businesses to large industrial complexes. One of its most prominent services is the Operations Centre (SOC), which aims to provide cutting-edge, differentiating solutions through the most innovative technologies and continuous monitoring of its customers.

Among other things, the SOC is responsible for receiving and managing fault messages from its monitoring devices. Within the notification management process, an operator in the SOC has to decide where to route each notification: a technician in the control room can try to resolve the problem remotely, or a field technician can be dispatched to travel physically to the damaged device.

This solution aims to build a reliable predictive model that could later power a tool to assist the SOC operator in deciding where to route each fault, improving success rates and thereby generating savings for our customers.

Data pipeline: data analysis, processing and model training

One of the most important and difficult steps in a data project is data acquisition, especially when the source is .csv files: the data must go through an ETL process before it can be used correctly. At a high level, the data pipeline flow is:

[Figure: high-level overview of the data pipeline]

  • The original dataset is loaded into a bucket as .csv files.
  • Once the data is stored in Cloud Storage, a cloud function is triggered as part of the ETL process. The cloud function (available here) serves several purposes, but its most important steps are shown in the snippet below:
# Preprocessing phase of the dataset.
df = preprocess_dataset(df)

# Analysis and processing phase of the dataset.
df = analyze_and_process_dataset(df, bucket, local_tmp_path, column_codes_path, dataset_training_path, dataset_original_name)

# Model training on ML Engine.
train_model_in_ml_engine(
    project_name,
    'gs://' + bucket_name + '/' + ml_engine_job_info_path,
    ml_engine_package_uris.split(','),
    'gs://' + bucket_name + '/' + dataset_training_path + 'data-' + dataset_original_name,
    'gs://' + bucket_name + '/' + dataset_training_path + 'test-' + dataset_original_name)
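
For orientation, the entry point of such a function might look like the following sketch (a hypothetical GCS-triggered background function; the names are assumptions and the real code lives in the linked repository):

import pandas as pd
from google.cloud import storage

def etl_trigger(event, context):
    # Fired by Cloud Storage when an object is finalized in the bucket.
    bucket_name, file_name = event['bucket'], event['name']
    if not file_name.endswith('.csv'):
        return  # ignore non-dataset uploads

    # Download the raw dataset to the function's temporary storage.
    bucket = storage.Client().bucket(bucket_name)
    local_path = '/tmp/' + file_name.split('/')[-1]
    bucket.blob(file_name).download_to_filename(local_path)
    df = pd.read_csv(local_path)

    # From here, the pipeline shown above takes over: preprocess_dataset,
    # analyze_and_process_dataset and train_model_in_ml_engine.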

As can be seen, data preprocessing is performed first, generating pandas DataFrames that are stored in Cloud Storage (step 3). The pre-processed dataset is then analyzed and processed, storing the column-encoding files and the resulting datasets in Cloud Storage (step 4) so that model training and evaluation can be performed (step 5).

All cloud functions are based on previous work done in local notebooks via Cloud Datalab.

The first important step in the cloud function is the initial data preprocessing, where:

  • Some unnecessary fields are removed:

# Drop columns with very scattered initial values.
df = df.drop(['alarm_incident_full_clear_opactdisp_id', 'alarm_incident_full_clear_emp_no', 'event_history_test_seqno', 'site_install_servco_no'], axis=1)
df = df.reset_index(drop=True)

# Drop columns whose content does not contribute.
df = df.drop(['event_history_event_id', 'alarm_incident_status', 'comment'], axis=1)
  • Some other fields are pre-processed (missing values are filled in with defaults, and identifier fields are reformatted):

# Replace empty strings with NaN values.
df['alarm_incident_delay_seconds'].replace('', np.nan, inplace=True)
df['system_systype_id'].replace('', np.nan, inplace=True)
df['site_site_no'].replace('', np.nan, inplace=True)
df['site_sitetype_id'].replace('', np.nan, inplace=True)
df['site_sitestat_id'].replace('', np.nan, inplace=True)
df['system_Nzonas'].replace('', np.nan, inplace=True)
df['site_Nvias'].replace('', np.nan, inplace=True)
df['site_cspart_no'].replace('', np.nan, inplace=True)
df['site_siteloc_id'].replace('', np.nan, inplace=True)
df['alarm_incident_alarminc_no'].replace('', np.nan, inplace=True)

# Strip whitespace from identifier columns.
df['system_systype_id'] = df['system_systype_id'].str.strip()
df['site_sitetype_id'] = df['site_sitetype_id'].str.strip()
df['site_sitestat_id'] = df['site_sitestat_id'].str.strip()
df['site_siteloc_id'] = df['site_siteloc_id'].str.strip()

df = df.reset_index(drop=True)
  • In the analysis of the dataset, new variables are derived from the original data:
# Process the date to obtain the day of the week, month and season of the year.
df['event_history_event_date'] = pd.to_datetime(df['event_history_event_date'])
df['day_of_week'] = df['event_history_event_date'].dt.weekday_name
df['month'] = df['event_history_event_date'].dt.month

df = df.drop(list(df.filter(regex='date')), axis=1)

# Season encoding: 0 = spring, 1 = summer, 2 = autumn, 3 = winter (default).
df['season'] = 3
df.loc[(df['month']==3) | (df['month']==4) | (df['month']==5), 'season'] = 0
df.loc[(df['month']==6) | (df['month']==7) | (df['month']==8), 'season'] = 1
df.loc[(df['month']==9) | (df['month']==10) | (df['month']==11), 'season'] = 2
  • During dataset processing, the column-encoding files and datasets are stored in Cloud Storage so that model training and evaluation can be performed:
# Encode text variables to unique identifiers and store the mappings in Cloud Storage for use by other cloud functions.
df = encode_values_to_pickle_and_upload_to_cloud_storage(df, bucket, local_tmp_path, 'system_systype_id', column_codes_path, 'systypeId_cod.pkl')
df = encode_values_to_pickle_and_upload_to_cloud_storage(df, bucket, local_tmp_path, 'site_sitetype_id', column_codes_path, 'sitetypeId_cod.pkl')
df = encode_values_to_pickle_and_upload_to_cloud_storage(df, bucket, local_tmp_path, 'site_sitestat_id', column_codes_path, 'sitestatId_cod.pkl')
df = encode_values_to_pickle_and_upload_to_cloud_storage(df, bucket, local_tmp_path, 'site_siteloc_id', column_codes_path, 'sitelocId_cod.pkl')
df = encode_values_to_pickle_and_upload_to_cloud_storage(df, bucket, local_tmp_path, 'day_of_week', column_codes_path, 'dayofweek_cod.pkl')
df = encode_values_to_pickle_and_upload_to_cloud_storage(df, bucket, local_tmp_path, 'site_site_no', column_codes_path, 'siteNo_cod.pkl')
df = encode_values_to_pickle_and_upload_to_cloud_storage(df, bucket, local_tmp_path, 'site_cspart_no', column_codes_path, 'cspartNo_cod.pkl')
df = encode_values_to_pickle_and_upload_to_cloud_storage(df, bucket, local_tmp_path, 'system_system_no', column_codes_path, 'systemNo_cod.pkl')

# Replace each original column with its encoded version and drop the temporary '_coded' columns.
for col in ['system_systype_id', 'site_sitetype_id', 'site_sitestat_id', 'site_siteloc_id',
            'day_of_week', 'site_site_no', 'site_cspart_no', 'system_system_no']:
    df[col] = df[col + '_coded']
    df = df.drop(col + '_coded', axis=1)
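
For reference, here is a minimal sketch of what such an encoding helper could look like (the actual implementation lives in the cloud function linked above; this version only illustrates the idea, and its names mirror the calls above):

import pandas as pd

def encode_values_to_pickle_and_upload_to_cloud_storage(
        df, bucket, local_tmp_path, column, codes_path, pickle_name):
    # Map each distinct value of `column` to a unique integer code.
    codes = pd.DataFrame({column: df[column].unique()})
    codes[column + '_coded'] = range(len(codes))

    # Persist the mapping locally and upload it so other cloud functions
    # can decode user input with the same table.
    local_file = local_tmp_path + pickle_name
    codes.to_pickle(local_file)
    bucket.blob(codes_path + pickle_name).upload_from_filename(local_file)

    # Join the codes back onto the dataframe as a new '<column>_coded' column.
    return df.merge(codes, on=column, how='left')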

Once the data is analyzed, 85% of the dataset is sent to Cloud Storage to be used as training data, and the remaining 15% as evaluation data.

# Generate the training and test sets for the model.
cut = int(np.round(0.85 * df.shape[0]))

upload_in_csv_to_cloud_storage(df.iloc[0:cut], bucket, local_tmp_path, dataset_training_path, 'data-' + dataset_original_name)
upload_in_csv_to_cloud_storage(df.iloc[cut:], bucket, local_tmp_path, dataset_training_path, 'test-' + dataset_original_name)

Finally, to verify that the current model is still valid for the new dataset and that no variables need to be revised, a basic training job is created and submitted to ML Engine with the generated training and test datasets:

import logging
import time

from googleapiclient import discovery
from googleapiclient.errors import HttpError

project_id = 'projects/{}'.format(project_name)

job_name = 'origen_ticketing_tf_pipeline_trigger_' + time.strftime('%Y%m%d_%H%M%S')

training_inputs = {
    'runtimeVersion': '1.10',
    'jobDir': job_dir + job_name,
    'packageUris': package_uris,
    'pythonModule': 'trainer.task',
    'region': 'europe-west1',
    'args': [
        '--train-file', train_file,
        '--eval-file', eval_file
    ]}

job_spec = {'jobId': job_name, 'trainingInput': training_inputs}

cloudml = discovery.build('ml', 'v1')

request = cloudml.projects().jobs().create(body=job_spec, parent=project_id)

try:
    response = request.execute()
except HttpError as err:
    # Handle the error as appropriate for your application;
    # here we simply write the reason to the logs.
    logging.error('There was an error creating the training job. Check the details:')
    logging.error(err._get_reason())

Model in TensorFlow

To generate a learning model capable of representing the aforementioned data and making predictions accordingly, we decided to use TensorFlow's pre-built estimators. In particular, we used the DNNClassifier class to create a deep neural network classifier. This estimator was chosen because previous tests on local machines showed that neural networks gave better results than boosted-tree algorithms, among others. Among other DNN-based estimators, we also tested DNNRegressor, which models deep neural networks as well but gave worse results in our case.

Therefore, the first step was to develop a complete library or application containing all the necessary functions. For this, we used the Google Cloud template for ML applications. These functions deal with defining the data structure, some data preprocessing (selecting data types, transforming categorical variables...), creating the corresponding deep neural network schema, designing the hyperparameter-tuning evaluation and, finally, training the neural network and evaluating it on the test set. Our code has the following functions and directory structure:

[Figure: directory structure of the TensorFlow model application]

  • trainer: subdirectory that holds the main module of the application.
  • __init__.py: file used by the setup tools to identify directories with code to package.
  • featurizer.py: treats the different features of the data, converting them to the appropriate types and structures and applying some transformations.
  • input.py: collects the input files for the training and evaluation sets from the corresponding storage repositories and transforms them into TensorFlow format.
  • metadata.py: encodes some parameters necessary for the execution of the code structure.
  • model.py: chooses the machine learning algorithm and designs its architecture; the training and evaluation experiments are also run here.
  • task.py: parses the arguments from the console; the main function resides here.
  • config.yaml: contains the hyperparameter settings to try when training the model in order to maximize its predictive accuracy.
  • sample.py: script to generate sample JSON instances to test predictions online.
  • setup.py: file with module information.

Regarding the neural network architecture (based on DNNClassifier), several decisions were made. For the optimizer, Adagrad was chosen due to the sparsity of the data and because of its improved robustness compared to traditional Stochastic Gradient Descent.

The activation functions are rectified linear units (ReLU) for the hidden layers, the most widely accepted choice in the literature, and the logistic function in the last layer (to perform the classification).

In addition, dropout is used to regularize the network. Finally, some hyperparameters are tuned with Google's implementation of a Bayesian optimization algorithm: the number of layers in the network, the size scaling factor between layers and the learning rate.
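
The construct_hidden_units() helper used below derives the layer sizes from those tuned hyperparameters. Roughly, following the Google Cloud template (the first-layer size is assumed here to be a fixed argument; exact details may differ):

def construct_hidden_units():
    # Each successive layer shrinks by the tuned layer-sizes-scale-factor,
    # starting from a fixed first layer size, for num-layers layers.
    first_layer_size = task.HYPER_PARAMS.first_layer_size
    scale_factor = task.HYPER_PARAMS.layer_sizes_scale_factor
    return [
        max(2, int(first_layer_size * scale_factor ** i))
        for i in range(task.HYPER_PARAMS.num_layers)
    ]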

dnn_optimizer = tf.train.AdagradOptimizer(learning_rate = task.HYPER_PARAMS.learning_rate)

estimator = tf.estimator.DNNClassifier(
    n_classes = len(metadata.TARGET_LABELS),
    label_vocabulary = metadata.TARGET_LABELS,

    feature_columns = deep_columns,
    optimizer = dnn_optimizer,

    weight_column = metadata.WEIGHT_COLUMN_NAME,

    hidden_units = construct_hidden_units(),
    activation_fn = tf.nn.relu,
    dropout = task.HYPER_PARAMS.dropout_prob,

    config = config,
)

Tuning the model with hyperparameters

The data pipeline produced a final dataset with 173,679 rows and 49 columns, including the label. It is good practice to split the original dataset into training, evaluation and test sets, so that training, hyperparameter tuning and final evaluation each work with independent, unseen data. Therefore, the original data was split into 85% for training and evaluation (for hyperparameter tuning) and 15% for testing.

To perform hyperparameter tuning, a custom machine configuration was used, optimized step by step to reduce the total processing time of the task. The goal of the algorithm is to maximize the accuracy of the model, using early stopping to avoid over-fitting and running at most 5 trials, since we verified that more trials bring no significant increase in accuracy. The search ranges for the variables to be optimized were chosen from previous experience and knowledge of their behavior, resulting in the following configuration:

trainingInput:
  scaleTier: CUSTOM
  masterType: large_model
  workerType: complex_model_m
  parameterServerType: large_model
  workerCount: 10
  parameterServerCount: 5
  hyperparameters:
    goal: MAXIMIZE
    hyperparameterMetricTag: accuracy
    enableTrialEarlyStopping: True
    maxTrials: 5
    maxParallelTrials: 2
    params:
      - parameterName: num-layers
        type: DISCRETE
        discreteValues:
          - 2
          - 3
          - 4
        scaleType: UNIT_LINEAR_SCALE
      - parameterName: layer-sizes-scale-factor
        type: DOUBLE
        minValue: 0.2
        maxValue: 0.8
        scaleType: UNIT_LINEAR_SCALE
      - parameterName: learning-rate
        type: DOUBLE
        minValue: 0.0001
        maxValue: 0.01
        scaleType: UNIT_LOG_SCALE

As for execution with "--scale-tier STANDARD_1" and "--scale-tier BASIC", these took a long time to complete the task and, in some cases, returned permission and memory errors in the workers. So we decided to customize the worker setup with:

  • Master of type large_model, a machine with a lot of memory, especially suitable for parameter servers when the model is large.
  • Workers of type complex_model_m, a machine with roughly twice the cores and twice the memory of complex_model_s.
  • workerCount, the number of worker replicas used for the training job. We configured 10 workers to support the maximum number of trials (maxTrials) under the configured parallelism (maxParallelTrials) without deadlocks.
  • parameterServerCount, the number of parameter server replicas used for the training job. We configured 5 servers to support the maximum number of trials (maxTrials).
  • The objective (goal) of the algorithm is to maximize the model's accuracy (hyperparameterMetricTag: accuracy), using early stopping (enableTrialEarlyStopping) to avoid overfitting, with at most 5 trials (maxTrials).

The search range for the different variables to be optimized (layer number, layer size scaling factor, learning rate) has been chosen according to previous experiences and knowledge of their behavior.

After completing the maximum number of trials (5), an optimized model with the following final hyperparameters was obtained:

"hyperparameters": {
    "num-layers": "4",
    "learning-rate": "0.0014479715888709816",
    "layer-sizes-scale-factor": "0.75728686112411281"
},
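
These values can be read from the job details once tuning finishes, for example with gcloud ml-engine jobs describe $JOB_NAME.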

And these are the final results (with the maximum of 30 trials that we ran, the objective only improved by 1.5 percent):

"finalMetric": {
    "trainingStep": "1006",
    "objectiveValue": 0.625320017338
}

Model training using Cloud ML Engine

For the training and hyperparameter-tuning jobs to take effect in Google Cloud, they must be submitted to ML Engine through console commands. First, the main variables are defined:

DATE=`date '+%Y%m%d_%H%M%S'`
export JOB_NAME=origen_ticketing_tf_$DATE
export GCS_JOB_DIR=gs://com-soc-projectdata/OrigenTicketingTF/jobinfo/$JOB_NAME

export TRAIN_FILE=gs://com-soc-projectdata/OrigenTicketingTF/dataset-training/data-mmm.csv
export EVAL_FILE=gs://com-soc-projectdata/OrigenTicketingTF/dataset-training/test-mmm.csv

export HPTUNING_CONFIG=config.yaml

export TRAIN_STEPS=2000
export EVAL_STEPS=500

export REGION=europe-west1

The gcloud command submitted for hyperparameter tuning jobs is:

gcloud ml-engine jobs submit training $JOB_NAME \
    --stream-logs \
    --runtime-version 1.10 \
    --config $HPTUNING_CONFIG \
    --job-dir $GCS_JOB_DIR \
    --module-name trainer.task \
    --package-path trainer/ \
    --region $REGION \
    -- \
    --train-file $TRAIN_FILE \
    --eval-file $EVAL_FILE \
    --train-steps $TRAIN_STEPS \
    --eval-steps $EVAL_STEPS 

The gcloud command used for training without hyperparameter tuning is the same, minus the --config flag:

gcloud ml-engine jobs submit training $JOB_NAME \
    --stream-logs \
    --runtime-version 1.10 \
    --job-dir $GCS_JOB_DIR \
    --module-name trainer.task \
    --package-path trainer/ \
    --region $REGION \
    -- \
    --train-file $TRAIN_FILE \
    --eval-file $EVAL_FILE \
    --train-steps $TRAIN_STEPS \
    --eval-steps $EVAL_STEPS 

Model evaluation

Once the optimized classification model was trained, its performance had to be tested and validated. For this task, the model was evaluated on the test set (15% of the full dataset), reaching a final accuracy of 62.53%.

Considering the results achieved in this study, together with other tests performed on a local machine using different parameter values, learning methods and input features, the results seem quite good, but there is room for improvement. More feature engineering could be done, making use of TensorFlow's built-in capabilities for dealing directly with categorical variables. On the other hand, the DNNLinearCombinedClassifier looks quite promising according to the literature, so testing it could be a good option.

Moreover, the results compare well with the current operation: in our real case, decisions about the origin of a problem are currently made with an accuracy of about 50%.

To serve new predictions, the model must be deployed to ML Engine with the necessary console commands. The prediction service is set up as follows:

export MODEL_BINARIES=$GCS_JOB_DIR/export/estimator/<timestamp>

gcloud ml-engine versions create <version> --model origen_ticketing_tf --origin $MODEL_BINARIES --runtime-version 1.10
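
If the origen_ticketing_tf model resource does not exist yet, it must be created once beforehand, e.g. with gcloud ml-engine models create origen_ticketing_tf --regions $REGION.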

To inspect binary models, TensorFlow ships with the SavedModel CLI, which allows you to inspect the signature of exported binaries. To do this, run:

saved_model_cli show --dir $MODEL_BINARIES --tag serve --signature_def predict

Online prediction with Cloud Functions and a web client

The final stage covered in the current demo is deploying the learning model to make it available for online predictions.

First, create a processed sample of the test data:

python sample.py $EVAL_FILE sample.json
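
As a rough idea of what sample.py does, here is a simplified sketch (the real script follows the model's input schema; this version just emits the first rows of the evaluation CSV as JSON instances):

import json
import sys

import pandas as pd

# Read the evaluation CSV and write a few rows as JSON instances,
# one JSON object per line, as expected by gcloud ml-engine predict.
eval_file, out_file = sys.argv[1], sys.argv[2]
df = pd.read_csv(eval_file, nrows=5)
with open(out_file, 'w') as f:
    for _, row in df.iterrows():
        f.write(json.dumps(row.to_dict()) + '\n')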

Then send the prediction requests to the API. To test this, you can use the gcloud prediction tool:

gcloud ml-engine predict --model origen_ticketing_tf --version <version> --json-instances sample.json

Finally, you should see a response with the predicted classes for the examples:

CLASS_IDS  CLASSES   LOGISTIC              LOGITS                PROBABILITIES
[0]        [u'0.0']  [0.3971346318721771]  [-0.417418509721756]  [0.6028653383255005, 0.3971346318721771]

It is important to note that the model used in ML Engine, which is the one exposed to the user to support scalability, is directly linked to the one in Cloud Storage.

As we said earlier, the input variables used by the model are derived from those entered by the user through the user interface, which means we needed to translate the latter into the former.

In one of the previous steps, we kept some encoding tables precisely to be able to perform that translation, taking into account the full context of the dataset. We therefore defined a custom Cloud Function responsible for this task. Below, we explain its most representative parts:

a) We read the request parameters: the inputs provided by the user through the user interface.

request_json = request.get_json()
alarm_delay = request_json['alarm_incident_delay_seconds']
system_no = request_json['system_system_no']
site_no = request_json['site_site_no']
n_zonas = request_json['system_Nzonas']
n_vias = request_json['site_Nvias']
cspart_no = request_json['site_cspart_no']
alarminc_no = request_json['alarm_incident_alarminc_no']
rec_date = request_json['event_history_event_date']
systype_id = request_json['system_systype_id']
sitetype_id = request_json['site_sitetype_id']
sitestat_id = request_json['site_sitestat_id']
siteloc_id = request_json['site_siteloc_id']

b) We load the encoding tables, which contain the equivalences between the original values and the coded values.

blob = bucket.blob('newModel_fromV3/dayofweek_cod.pkl')
blob.download_to_filename('/tmp/dayofweek_cod.pkl')
dow_cod = pandas.read_pickle('/tmp/dayofweek_cod.pkl')

blob = bucket.blob('newModel_fromV3/sitelocId_cod.pkl')
blob.download_to_filename('/tmp/sitelocId_cod.pkl')
siteloc_cod = pandas.read_pickle('/tmp/sitelocId_cod.pkl')

blob = bucket.blob('newModel_fromV3/sitestatId_cod.pkl')
blob.download_to_filename('/tmp/sitestatId_cod.pkl')
sitestat_cod = pandas.read_pickle('/tmp/sitestatId_cod.pkl')

blob = bucket.blob('newModel_fromV3/sitetypeId_cod.pkl')
blob.download_to_filename('/tmp/sitetypeId_cod.pkl')
sitetype_cod = pandas.read_pickle('/tmp/sitetypeId_cod.pkl')

blob = bucket.blob('newModel_fromV3/systypeId_cod.pkl')
blob.download_to_filename('/tmp/systypeId_cod.pkl')
systype_cod = pandas.read_pickle('/tmp/systypeId_cod.pkl')

c) We obtain and store the encoded values and generate new variables when necessary, as with the values inferred from the date variable.

# Get the "system_systype_id" codification    
	syst_c = systype_cod[systype_cod['system_systype_id'] == str(systype_id)]
	new_syst = -1 # Default
	if syst_c.size > 0:
    		new_syst = syst_c.iloc[0]['system_systype_id_coded']
    
	# Get the "site_sitetype_id" codification    
	sitet_c = sitetype_cod[sitetype_cod['site_sitetype_id'] == str(sitetype_id)]
	new_site = -1 # Default
	if sitet_c.size > 0:
    		new_site = sitet_c.iloc[0]['site_sitetype_id_coded']
    
	# Get the "site_sitestat_id" codification    
	sitest_c = sitestat_cod[sitestat_cod['site_sitestat_id'] == str(sitestat_id)]
	new_sitest = -1 # Default
	if sitest_c.size > 0:
    		new_sitest = sitest_c.iloc[0]['site_sitestat_id_coded']
    
	# Get the "site_siteloc_id" codification    
	sitel_c = siteloc_cod[siteloc_cod['site_siteloc_id'] == str(siteloc_id)]
	new_sitel = -1 # Default
	if sitel_c.size > 0:
    		new_sitel = sitel_c.iloc[0]['site_siteloc_id_coded']
# Get the "event_history_event_date" codification
    
	dateDF = pandas.DataFrame(columns=['date'])
	dateDF.loc[0] = [rec_date]
	day = pandas.to_datetime(dateDF['date']).dt.weekday_name.iloc[0]
	month = pandas.to_datetime(dateDF['date']).dt.month.iloc[0]
    
	day_c = dow_cod[dow_cod['day_of_week'] == day]
	new_day = day_c.iloc[0]['day_of_week_coded']
    
	season=-1
	if ((month==3) or (month==4) or (month==5)):
    		season = 0
	else:
    		if ((month==6) or (month==7) or (month==8)):
        			season = 1
    		else:
        			if ((month==9) or (month==10) or (month==11)):
            				season = 2
        			else:
            				season = 3

d) We create the new instance that will be used to obtain a prediction from the model.

instance = [str(alarm_delay), str(system_no), str(site_no), str(n_zonas), str(n_vias), str(cspart_no), str(alarminc_no), str(month), str(season), str(new_syst), str(new_site), str(new_sitest), str(new_sitel), str(new_day)]

e) We send the prediction request.

prediction = send_to_predict_in_ml_engine('securitas-ml-starter', 'origen_ticketing_sl', instance, 'v3')
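
The send_to_predict_in_ml_engine helper wraps the standard ML Engine online-prediction call. A minimal sketch (assuming the google-api-python-client library and default credentials; the real helper may differ) could be:

from googleapiclient import discovery

def send_to_predict_in_ml_engine(project, model, instance, version=None):
    # Build the ML Engine service and address the deployed model/version.
    service = discovery.build('ml', 'v1')
    name = 'projects/{}/models/{}'.format(project, model)
    if version is not None:
        name += '/versions/{}'.format(version)

    # Send a single instance for online prediction.
    response = service.projects().predict(
        name=name,
        body={'instances': [instance]}
    ).execute()
    return response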

f) We return the result given by the model.

# Return the result of the predictions.
if 'error' in prediction:
    return (flask.jsonify(prediction['error']), 500, headers)

return (flask.jsonify(prediction['predictions']), 200, headers)

 

We want to help you achieve your digital objectives. Let's talk!
