4 April, 2019

Implementing a custom estimator in TensorFlow

Our client is an international aerospace engineering company that provides procurement, testing and distribution services for advanced electronic components and systems. Our client's challenge is to provide a usable system for sorting and searching through the more than 15 million components in its database. We therefore implemented a recommendation system that, by applying matrix factorization techniques in TensorFlow, makes suggestions to each user based on the browsing behaviour of other, similar users. In short, the system learns from the navigation of past users to make navigation easier for future users.

Pipeline, analysis and data pre-processing

The first challenge was to work with aggregated data, as the customer information was originally provided as .json files containing nested fields, so we had to convert all of this data into a format we could work with easily and flexibly. The biggest initial problem was that we received data from several servers every day: the company runs more than one production server behind an internal auto-scaling solution, so we could receive N .json files per day.

With that data, the company wanted two things:

- A dashboard with analytics and visualizations of the search trends.
- A component recommender for its users.

Data acquisition is one of the most important and most difficult steps in a Machine Learning project, and here for two reasons: the data arrives as .json files from different servers, and it contains confidential information. The data therefore first had to be anonymized, and only then transformed for later use.

1. Every day, each server uploads a .json file with all of its information. Because this information is confidential, only the GCP console administrator can access this set of files, in line with the agreed security considerations.

2. A Cloud Function is then triggered to anonymize the data and ensure that no confidential data is handled downstream. Once the .json files are de-identified, they are stored in a second Cloud Storage bucket (step 3). The data is not transformed at this point; the only purpose of the Cloud Function is to de-identify it by applying a primitive transformation to certain fields (IP, email, ...) with the Cloud Data Loss Prevention API.

```python
# Construct the de-identify configuration dictionary
deidentify_config = {
    'info_type_transformations': {
        'transformations': [
            {
                'primitive_transformation': {
                    'crypto_hash_config': {
                        'crypto_key': {
                            'unwrapped': {
                                'key': crypto_hash_key
                            }
                        }
                    }
                }
            }
        ]
    }
}

# Construct the item to de-identify
item = {'value': content}

# Call the DLP API
response = dlp.deidentify_content(
    parent,
    inspect_config=inspect_config,
    deidentify_config=deidentify_config,
    item=item)
```
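The post only shows the DLP call itself. For context, a background Cloud Function wrapping this step could look roughly like the sketch below; the bucket names, the inspect_config, the environment variables and the entry point deidentify_gcs_file are illustrative assumptions, and only the deidentify_content call mirrors the snippet above.

```python
# Minimal sketch (assumptions, not the client's actual code): a Cloud Function
# triggered by each .json upload that downloads the file, de-identifies it with
# the DLP API and writes the result to a second bucket.
import os

from google.cloud import dlp_v2, storage

PROJECT = os.environ.get('GCP_PROJECT', 'my-project')            # assumed
DEIDENTIFIED_BUCKET = os.environ.get('DEIDENTIFIED_BUCKET', '')  # assumed bucket name
CRYPTO_HASH_KEY = os.environ.get('CRYPTO_HASH_KEY', 'secret')    # assumed key source

dlp = dlp_v2.DlpServiceClient()
storage_client = storage.Client()

# The real inspect_config is not shown in the post; hash e-mails and IPs as an example.
inspect_config = {'info_types': [{'name': 'EMAIL_ADDRESS'}, {'name': 'IP_ADDRESS'}]}
deidentify_config = {
    'info_type_transformations': {
        'transformations': [{
            'primitive_transformation': {
                'crypto_hash_config': {
                    'crypto_key': {'unwrapped': {'key': CRYPTO_HASH_KEY}}
                }
            }
        }]
    }
}


def deidentify_gcs_file(data, context):
    """Background Cloud Function fired by a google.storage.object.finalize event."""
    raw_bucket = storage_client.bucket(data['bucket'])
    content = raw_bucket.blob(data['name']).download_as_string().decode('utf-8')

    response = dlp.deidentify_content(
        dlp.project_path(PROJECT),
        inspect_config=inspect_config,
        deidentify_config=deidentify_config,
        item={'value': content})

    # Step 3: store the de-identified payload in the second bucket.
    out_bucket = storage_client.bucket(DEIDENTIFIED_BUCKET)
    out_bucket.blob(data['name']).upload_from_string(response.item.value)
```

Deployed with a trigger of this kind on the raw bucket, such a function runs once per uploaded file, so the downstream ETL only ever sees de-identified data.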
4. In this step there are two Cloud Functions because, once the data is de-identified, the company had two different needs. Each Cloud Function therefore performs the ETL process required for its own objective, and the processed data is then stored in BigQuery (step 5), both because we have to keep adding data and because one of the resulting tables is used from Data Studio (search statistics) while the other feeds the data model.

a) The first Cloud Function transforms and stores the data related to the searches, so that Data Studio can work with that information; the kind of query behind the dashboard is sketched below, after point b).

```python
client = bigquery.Client()

rows_to_insert = [
    (instance['jsessionId'], instance['sessionId'], instance['date'],
     instance['time'], instance['refererURL'], instance['origin'],
     instance['searchPosition'], instance['searchId'], instance['ip'],
     instance['application_quality'], instance['application_radiationMinimum'],
     instance['application_radiationMaximum'], instance['application_typeOfSatellite'],
     instance['geoIp_regionCode'], instance['geoIp_countryName'], instance['geoIp_status'],
     instance['geoIp_continentCode'], instance['geoIp_regionName'], instance['geoIp_countryCode'],
     instance['geoIp_longitude'], instance['geoIp_latitude'], instance['geoIp_city'],
     instance['device_os'], instance['device_manufacturer'], instance['device_group'],
     instance['device_type'], instance['device_browser_renderingEngine'],
     instance['device_browser_manufacturer'], instance['device_browser_name'],
     instance['device_browser_majorVersion'], instance['device_browser_minorVersion'],
     instance['device_browser_group'], instance['device_browser_type'],
     instance['device_browser_version'],
     instance['originalComponent_componentNumber'], instance['originalComponent_style'],
     instance['originalComponent_family'], instance['originalComponent_maker'],
     instance['originalComponent_familyPath'], instance['originalComponent_familyId'],
     instance['originalComponent_componentId'], instance['originalComponent_nasaGroup'],
     instance['originalComponent_nasaSection'], instance['originalComponent_qualified'],
     instance['relatedComponent_componentNumber'], instance['relatedComponent_style'],
     instance['relatedComponent_family'], instance['relatedComponent_maker'],
     instance['relatedComponent_familyPath'], instance['relatedComponent_familyId'],
     instance['relatedComponent_componentId'], instance['relatedComponent_nasaGroup'],
     instance['relatedComponent_nasaSection'], instance['relatedComponent_qualified'],
     instance['user_userId'], instance['user_email'])
]

# API request
table_ref = client.dataset(bq_dataset).table(bq_table)
table = client.get_table(table_ref)
client.insert_rows(table, rows_to_insert)
```

b) The second Cloud Function transforms and stores the data related to each component so that it can be used as the model dataset; its BigQuery insert follows the same pattern as the snippet above, writing into the table that the model reads from.
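To give an idea of the kind of aggregation the dashboard is built on, a query over the search table might count searches per component family and day. Below is a minimal sketch with the BigQuery Python client; dataset.searches is an assumed table name, as the real one is not shown in the post.

```python
# Illustrative only: the sort of aggregation Data Studio visualizes for search trends.
from google.cloud import bigquery

client = bigquery.Client()

query = """
    SELECT date,
           originalComponent_family AS family,
           COUNT(*) AS searches
    FROM dataset.searches          -- assumed table name
    GROUP BY date, family
    ORDER BY date, searches DESC
"""

# Print one row per (day, family) with its search count.
for row in client.query(query).result():
    print(row.date, row.family, row.searches)
```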
5. As mentioned above, the main reasons for storing the processed data in BigQuery are: (a) the search dataset is used in Data Studio, and we needed to perform SQL-like queries on it; (b) for the model dataset, Cloud Storage was an option, but we needed to keep aggregating data, and BigQuery allows that.

Data selection and exploration

Once the data is stored, another Cloud Function is triggered as part of the ETL process. This Cloud Function serves several purposes, but its most important steps are shown in the snippet below:

```python
# Load the dataset from BigQuery
df = pd.io.gbq.read_gbq("""SELECT * FROM dataset.component""",
                        project_id=project_name)

# First pre-processing of the dataset (drop columns with too many missing values)
df = preprocess_dataset(df, threshold)

# Analyse and process the dataset (encode ids, write train/test files to Cloud Storage)
df = analyze_and_process_dataset(df, bucket, local_tmp_path, column_codes_path,
                                 dataset_training_path, dataset_original_name,
                                 percenttrain)

# Train the model on ML Engine
train_model_in_ml_engine(
    project_name,
    'gs://' + bucket_name + '/' + ml_engine_job_info_path,
    ml_engine_package_uris.split(','),
    'gs://' + bucket_name + '/' + dataset_training_path + 'data-' + dataset_original_name,
    'gs://' + bucket_name + '/' + dataset_training_path + 'test-' + dataset_original_name)
```

As you can see, the data is first imported and pre-processed, removing columns with many null values. The new pre-processed dataset is then analysed and prepared: the user and component identifiers are encoded as sorted integer ids, and the resulting training and evaluation data frames are stored in Cloud Storage. Finally, the recommender system is trained on ML Engine.
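The body of train_model_in_ml_engine is not shown in the post. As a rough sketch, such a helper could submit the training job through the Cloud ML Engine REST API; the module name trainer.task, the region, the scale tier and the runtime version below are assumptions, and only the argument list matches the call above.

```python
# Sketch under assumptions: submit a Cloud ML Engine training job with the
# google-api-python-client. Only the parameters mirror the call site above.
import time

from googleapiclient import discovery


def train_model_in_ml_engine(project_name, job_dir, package_uris,
                             train_data_path, eval_data_path):
    ml = discovery.build('ml', 'v1')
    job_id = 'component_recommender_{}'.format(int(time.time()))

    job_spec = {
        'jobId': job_id,
        'trainingInput': {
            'scaleTier': 'BASIC',            # assumed scale tier
            'packageUris': package_uris,     # trainer package(s) already in GCS
            'pythonModule': 'trainer.task',  # assumed module name
            'region': 'europe-west1',        # assumed region
            'jobDir': job_dir,
            'runtimeVersion': '1.13',        # assumed TensorFlow runtime
            'args': ['--train-files', train_data_path,
                     '--eval-files', eval_data_path],
        },
    }

    # Equivalent to `gcloud ml-engine jobs submit training ...`
    request = ml.projects().jobs().create(
        parent='projects/{}'.format(project_name), body=job_spec)
    return request.execute()
```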
Data pre-processing with cloud function

The first important step of this Cloud Function is the initial data pre-processing, which works as follows.

Empty strings are converted into missing values:

```python
# Convert empty strings to missing values
df_initial = df.replace('', np.nan, regex=True)
```

Columns with more missing values than the desired threshold are deleted:

```python
# Obtain the number and percentage of missing values per column
tab_info = pd.DataFrame(df_initial.dtypes).T.rename(index={0: 'column type'})
tab_info = tab_info.append(
    pd.DataFrame(df_initial.isnull().sum()).T.rename(index={0: 'null values (number)'}))
tab_info = tab_info.append(
    pd.DataFrame(df_initial.isnull().sum() / df_initial.shape[0] * 100)
    .T.rename(index={0: 'null values (%)'}))

# Remove columns with more missing values than a pre-defined percentage threshold
data_colOut = df_initial.copy()
for x in range(0, len(df_initial.columns.values)):
    if tab_info.values[2][x] > float(threshold):
        data_colOut = data_colOut.drop(df_initial.columns.values[x], axis=1)
```

Other unwanted columns are removed:

```python
# Remove undesired features
toRemove = ['device_browser_type', 'device_browser_group',
            'device_browser_minorVersion', 'device_browser_name',
            'device_browser_manufacturer', 'device_browser_majorVersion',
            'device_browser_version', 'device_browser_renderingEngine']
data4Query = data_colOut.drop(toRemove, axis=1)
```

If you want to see a practical example, we recommend reading our post "A practical example of how to use the estimator api with TensorFlow".
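Before that, to give a flavour of the model side this pipeline feeds, here is a minimal sketch of what a matrix-factorization model_fn for a custom tf.estimator can look like in TensorFlow 1.x. The feature names (user_id, component_id), the vocabulary sizes and the hyperparameters are illustrative assumptions, not the client's actual model.

```python
# Illustrative sketch only: a matrix-factorization model_fn for tf.estimator,
# written for TensorFlow 1.x. All sizes and names below are assumptions.
import tensorflow as tf

NUM_USERS = 100000        # assumed number of encoded user ids
NUM_COMPONENTS = 500000   # assumed number of encoded component ids
EMBEDDING_DIM = 32        # assumed latent dimension


def model_fn(features, labels, mode, params):
    # One latent factor vector per user and per component.
    user_emb = tf.get_variable('user_emb', [NUM_USERS, EMBEDDING_DIM])
    item_emb = tf.get_variable('item_emb', [NUM_COMPONENTS, EMBEDDING_DIM])

    u = tf.nn.embedding_lookup(user_emb, features['user_id'])
    v = tf.nn.embedding_lookup(item_emb, features['component_id'])

    # Predicted affinity is the dot product of the two latent vectors.
    logits = tf.reduce_sum(u * v, axis=1)
    predictions = {'score': logits}

    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode, predictions=predictions)

    # Implicit-feedback label: whether the user browsed the component.
    loss = tf.losses.sigmoid_cross_entropy(
        multi_class_labels=tf.cast(labels, tf.float32), logits=logits)

    if mode == tf.estimator.ModeKeys.TRAIN:
        optimizer = tf.train.AdamOptimizer(params.get('learning_rate', 0.01))
        train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
        return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

    return tf.estimator.EstimatorSpec(mode, loss=loss)


# model_dir is a placeholder; on ML Engine it would point at the job's GCS path.
estimator = tf.estimator.Estimator(model_fn=model_fn,
                                   model_dir='gs://bucket/model',
                                   params={'learning_rate': 0.01})
```

The dot product of the two learned embeddings plays the role of the factorized user-component affinity, and wrapping it in a custom Estimator is what allows the same training code to run locally and as an ML Engine job.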