4 Abril, 2019 Implementación de un estimador personalizado en TensorFlow Nuestro cliente es una empresa internacional de ingeniería aeroespacial que brinda servicios de adquisición, pruebas y distribución de componentes y sistemas electrónicos avanzados. Nuestro cliente es una empresa internacional de ingeniería aeroespacial que brinda servicios de adquisición, pruebas y distribución de componentes y sistemas electrónicos avanzados. El reto de nuestro cliente es el de ofrecer un sistema usable de clasificación y búsqueda entre los más de 15 millones de componentes que tiene en su base de datos. Así que implementamos un sistema de recomendación que, aplicando técnicas de factorización matricial en Tensorflow, hace sugerencias al usuario basadas en la navegación de otros usuarios similares. Básicamente, el sistema puede aprender de la navegación de usuarios pasados para facilitar la navegación de los futuros usuarios. Pipeline, análisis y preprocesamiento de datos El primer reto fue trabajar con datos agregados, ya que la información del cliente se proporcionó originalmente a través de archivos .json que contenían campos anidados. Por lo tanto, tuvimos que convertir todos estos datos en un formato que nos permitiese trabajar de una manera fácil y flexible. El mayor problema inicial fue que recibíamos datos de diferentes servidores por día. La compañía tiene más de un servidor en producción en una solución interna de autoescalado, por lo que podríamos obtener N archivos .json cada día. Con esos datos, la empresa quería obtener dos cosas: Un cuadro de mando con análisis y visualizaciones sobre las tendencias de búsqueda realizadas Un recomendador de componentes para los usuarios Uno de los pasos más importantes y difíciles en un proyecto de machine learning son los procesos de adquisición de datos. En este caso por dos motivos: porque vienen en archivos .json de diferentes servidores, pero también porque contienen algunos datos confidenciales. Por lo tanto, los datos tenían que procesarse primero para anonimizarlos, y luego transformarse para poder utilizarlos más tarde. 1. Desde cada servidor, cada día se carga un archivo .json con toda la información. Esta información es confidencial por lo que el administrador de la consola GCP solo puede acceder a este grupo siguiendo las consideraciones de seguridad. 2. Luego, se activa una cloud function para anonimizar los datos y garantizar que no se manejen los datos confidenciales. Una vez que los archivos .json se desidentifican, se almacenan en otro almacenamiento en la nube (paso 3). Durante este proceso, los datos no se transforman, el único objetivo de la función de nube es desidentificar los datos que realizan la transformación primitiva en algunos campos (por ejemplo, IP, correo electrónico) mediante el uso de la API de prevención de pérdida de datos de la nube. # Construct deidentify configuration dictionary deidentify_config = { 'info_type_transformations': { 'transformations': [ { "primitive_transformation": { "crypto_hash_config": { "crypto_key": { "unwrapped": { "key": crypto_hash_key } } } } } ] } } # Construct item item = {'value': content} # Call the API response = dlp.deidentify_content( parent, inspect_config=inspect_config, deidentify_config=deidentify_config, item=item)3. En este paso, hay 2 cloud functions porque, una vez que se de-identifican los datos, la empresa tenía dos necesidades diferentes. Por lo tanto, cada función de la nube realiza el proceso ETL necesario para cada objetivo, y los datos procesados se almacenan posteriormente en BigQuery (Paso 5) porque debemos seguir agregando datos y también uno de ellos se está utilizando desde Data Studio (estadísticas de búsqueda) y el otro se usa desde el Modelo de datos. a) La primera función de nube transforma y almacena los datos relacionados con las búsquedas. Así que Data studio se puede utilizar con esa información. client = bigquery.Client() rows_to_insert = [ (instance['jsessionId'], instance['sessionId'], instance['date'], instance['time'], instance['refererURL'], instance['origin'], instance['searchPosition'], instance['searchId'], instance['ip'], instance['application_quality'], instance['application_radiationMinimum'], instance['application_radiationMaximum'], instance['application_typeOfSatellite'], instance['geoIp_regionCode'], instance['geoIp_countryName'], instance['geoIp_status'], instance['geoIp_continentCode'], instance['geoIp_regionName'], instance['geoIp_countryCode'], instance['geoIp_longitude'], instance['geoIp_latitude'], instance['geoIp_city'], instance['device_os'], instance['device_manufacturer'], instance['device_group'], instance['device_type'], instance['device_browser_renderingEngine'], instance['device_browser_manufacturer'], instance['device_browser_name'], instance['device_browser_majorVersion'], instance['device_browser_minorVersion'], instance['device_browser_group'], instance['device_browser_type'], instance['device_browser_version'], instance['originalComponent_componentNumber'], instance['originalComponent_style'], instance['originalComponent_family'], instance['originalComponent_maker'], instance['originalComponent_familyPath'], instance['originalComponent_familyId'], instance['originalComponent_componentId'], instance['originalComponent_nasaGroup'], instance['originalComponent_nasaSection'], instance['originalComponent_qualified'], instance['relatedComponent_componentNumber'], instance['relatedComponent_style'], instance['relatedComponent_family'], instance['relatedComponent_maker'], instance['relatedComponent_familyPath'], instance['relatedComponent_familyId'], instance['relatedComponent_componentId'], instance['relatedComponent_nasaGroup'], instance['relatedComponent_nasaSection'], instance['relatedComponent_qualified'], instance['user_userId'], instance['user_email']) ] # API request table_ref = client.dataset(bq_dataset).table(bq_table) table = client.get_table(table_ref) client.insert_rows(table, rows_to_insert)b) La segunda función de la nube transforma y almacena los datos relacionados con cada componente para poder usarse como un conjunto de datos del modelo. client = bigquery.Client() rows_to_insert = [ (instance['jsessionId'], instance['sessionId'], instance['date'], instance['time'], instance['refererURL'], instance['origin'], instance['searchPosition'], instance['searchId'], instance['ip'], instance['application_quality'], instance['application_radiationMinimum'], instance['application_radiationMaximum'], instance['application_typeOfSatellite'], instance['geoIp_regionCode'], instance['geoIp_countryName'], instance['geoIp_status'], instance['geoIp_continentCode'], instance['geoIp_regionName'], instance['geoIp_countryCode'], instance['geoIp_longitude'], instance['geoIp_latitude'], instance['geoIp_city'], instance['device_os'], instance['device_manufacturer'], instance['device_group'], instance['device_type'], instance['device_browser_renderingEngine'], instance['device_browser_manufacturer'], instance['device_browser_name'], instance['device_browser_majorVersion'], instance['device_browser_minorVersion'], instance['device_browser_group'], instance['device_browser_type'], instance['device_browser_version'], instance['originalComponent_componentNumber'], instance['originalComponent_style'], instance['originalComponent_family'], instance['originalComponent_maker'], instance['originalComponent_familyPath'], instance['originalComponent_familyId'], instance['originalComponent_componentId'], instance['originalComponent_nasaGroup'], instance['originalComponent_nasaSection'], instance['originalComponent_qualified'], instance['relatedComponent_componentNumber'], instance['relatedComponent_style'], instance['relatedComponent_family'], instance['relatedComponent_maker'], instance['relatedComponent_familyPath'], instance['relatedComponent_familyId'], instance['relatedComponent_componentId'], instance['relatedComponent_nasaGroup'], instance['relatedComponent_nasaSection'], instance['relatedComponent_qualified'], instance['user_userId'], instance['user_email']) ] # API request table_ref = client.dataset(bq_dataset).table(bq_table) table = client.get_table(table_ref) client.insert_rows(table, rows_to_insert)4. Como mencionamos anteriormente, la razón principal para almacenar los datos procesados en BigQuery es porque: a) El conjunto de datos se utiliza en Data Studio, y necesitábamos realizar consultas de tipo SQL. b) Para el modelo, Cloud Storage era una opción, pero la razón principal que movió la decisión a BigQuery es que necesitábamos agregar datos y BigQuery lo permite. Selección y exploración de datos Una vez que los datos se almacenan, se activa una cloud function como parte del proceso ETL. La cloud function está estructurada con diferentes objetivos, pero el proceso más importante se puede describir en la siguiente captura de pantalla: # Load dataset from BigQuery df = pd.io.gbq.read_gbq("""SELECT * FROM dataset.component""", project_id=project_name) # Phase of analysis of the data set. df = preprocess_dataset(df,threshold) # Processing phase of the data set. df = analyze_and_process_dataset(df, bucket, local_tmp_path, column_codes_path, dataset_training_path, dataset_original_name, percenttrain) # Model training on ML Engine train_model_in_ml_engine( project_name, 'gs://' + bucket_name + '/' + ml_engine_job_info_path, ml_engine_package_uris.split(','), 'gs://' + bucket_name + '/' + dataset_training_path + 'data-' + dataset_original_name, 'gs://' + bucket_name + '/' + dataset_training_path + 'test-' + dataset_original_name)Como puede observar, en primer lugar los datos se importan y se procesan previamente, eliminando columnas con muchos valores nulos. Luego, se analiza y prepara el nuevo conjunto de datos preprocesados, que codifica los identificadores del usuario y los componentes en números ordenados y almacena los archivos de marcos de datos para la capacitación y evaluación en el almacenamiento en la nube. Finalmente, el sistema de recomendación está entrenado en ML Engine Preprocesamiento de datos con cloud function El primer paso importante de la función de nube es el primer preprocesamiento de datos donde: Las cadenas vacías se convierten en valores perdidos # Convert empty strings to missing values df_initial = df.replace('',np.nan, regex=True) Las columnas con más valores perdidos que un umbral deseado se eliminan # Obtain number of missing values tab_info=pd.DataFrame(df_initial.dtypes).T.rename(index={0:'column type'}) tab_info=tab_info.append(pd.DataFrame(df_initial.isnull().sum()).T.rename(index={0:'null values (number)'})) tab_info=tab_info.append(pd.DataFrame(df_initial.isnull().sum()/df_initial.shape[0]*100).T.rename(index={0:'null values (%)'})) # Remove columns with more missing values than a pre-defined percentage threshold data_colOut = df_initial.copy() for x in range(0, len(df_initial.columns.values)): if tab_info.values[2][x]>float(threshold): data_colOut = data_colOut.drop(df_initial.columns.values[x], axis=1) Se eliminan otras columnas no deseadas. # Remove undesired features toRemove = ['device_browser_type', 'device_browser_group', 'device_browser_minorVersion', 'device_browser_name', 'device_browser_manufacturer', 'device_browser_majorVersion', 'device_browser_version','device_browser_renderingEngine'] data4Query = data_colOut.drop(toRemove, axis=1) Si quieres conocer un ejemplo práctico, te recomendamos leer nuestro post “Un ejemplo práctico de cómo usar la api del estimador con TensorFlow”.