Automatic Model Tuning in an MLOps Pipeline
In this series of articles, we walk you through the process of applying CI/CD to AI tasks. You'll end up with a functional pipeline that meets the requirements of level 2 of the Google MLOps Maturity Model. We assume that you have some familiarity with Python, Deep Learning, Docker, DevOps, and Flask.
In the previous article, we set up a cloud environment for this project. In this part, we walk you through the code required for continuous integration, automatic model training, automatic tuning, and continuous delivery. The diagram below shows where we are in the project process.
We'll show a condensed version of the code. For the full version, see this repository. We'll use GCR Docker images (TensorFlow-supported) throughout this project, but feel free to use other images.
First, we'll discuss the code for running these solutions locally. Later, we'll see how to get it ready for cloud deployment.
The diagram below shows our project's file structure.
data_utils.py
The data_utils.py file handles data loading, transformation, and saving the model to GCS. This file may vary from project to project. In essence, it performs all the data processing tasks that precede model training. Let's take a look at the code:
import datetime
from google.cloud import storage
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
import gc
from sklearn import preprocessing
import os
import zipfile
import cv2
import sys

def dataset_transformation(path):
    images = []
    for dirname, _, filenames in os.walk(path):
        for filename in filenames:
            if filename.endswith('.png'):
                image = cv2.imread(os.path.join(dirname, filename))
                image = cv2.resize(image, (128, 128))
                images.append(image)
    return images

def load_data(args):
    file_1 = '/root/AutomaticTraining-Dataset/COVID_RX/normal_images.zip'
    file_2 = '/root/AutomaticTraining-Dataset/COVID_RX/covid_images.zip'
    file_3 = '/root/AutomaticTraining-Dataset/COVID_RX/viral_images.zip'
    extract_to = '/root/AutomaticTraining-Dataset/COVID_RX/'

    with zipfile.ZipFile(file_1, 'r') as zip_ref:
        zip_ref.extractall(extract_to)
    with zipfile.ZipFile(file_2, 'r') as zip_ref:
        zip_ref.extractall(extract_to)
    with zipfile.ZipFile(file_3, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

    normal = dataset_transformation('/root/AutomaticTraining-Dataset/COVID_RX/normal_images')
    covid = dataset_transformation('/root/AutomaticTraining-Dataset/COVID_RX/covid_images')
    viral = dataset_transformation('/root/AutomaticTraining-Dataset/COVID_RX/viral_images')

    # Train and test - dataset combination.
    # Note: the classes are concatenated in the same order the labels are assigned below
    # (0 = normal, 1 = covid, 2 = viral), otherwise the labels would not match the images.
    X = normal + covid + viral
    # Transforming from list to numpy array.
    X = np.array(X)

    # Creating labels.
    y = []
    for i in range(len(normal)):
        y.append(0)
    for i in range(len(covid)):
        y.append(1)
    for i in range(len(viral)):
        y.append(2)
    y = np.array(y)

    # Dataset splitting
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0, shuffle=True)
    return X_train, X_test, y_train, y_test

def save_model(bucket_name, best_model):
    try:
        storage_client = storage.Client()  # if running on GCP
        bucket = storage_client.bucket(bucket_name)
        blob1 = bucket.blob('{}/{}'.format('testing', best_model))
        blob1.upload_from_filename(best_model)
        return True, None
    except Exception as e:
        return False, e
model_assembly.py
The model_assembly.py file contains the code for model creation and automatic tuning. We want to start with a very basic model, train it, and evaluate it. If the initial model doesn't reach the desired performance, we'll keep improving it until it does. Let's look at the code:
from tensorflow.keras.models import load_model
from tensorflow.keras import layers
import tensorflow as tf
import numpy as np

def get_base_model():
    input_img = layers.Input(shape=(128, 128, 3))
    x = layers.Conv2D(64, (3, 3), activation='relu')(input_img)
    return input_img, x

def get_additional_layer(filters, x):
    x = layers.MaxPooling2D((2, 2))(x)
    x = layers.Conv2D(filters, (3, 3), activation='relu')(x)
    return x

def get_final_layers(neurons, x):
    x = layers.SpatialDropout2D(0.2)(x)
    x = layers.Flatten()(x)
    x = layers.Dense(neurons)(x)
    x = layers.Dense(3)(x)
    return x
These functions are called in a loop. In the first iteration, we get the base_model and the final_layers and stack them to build a very simple model. If, after training, we find that the model doesn't perform well enough, we get the base_model again, add additional_layers, stack the final_layers on top, then train and evaluate once more. If we still can't reach good performance, this last step is repeated in a loop, adding more additional_layers each time, until the model reaches a predefined acceptable metric.
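To make the stacking concrete, here's a minimal sketch of how a single iteration might assemble a model from these functions. The counter value and filter width are purely illustrative; the actual loop, including the stopping condition, lives in task.py shown later.

# Illustrative only: assemble the architecture for one hypothetical iteration (counter = 2).
from tensorflow.keras import Model
import model_assembly

counter = 2
input_img, x = model_assembly.get_base_model()
for i in range(counter):
    # Each extra block adds MaxPooling2D plus a wider Conv2D.
    x = model_assembly.get_additional_layer(64 * (2 ** counter), x)
x = model_assembly.get_final_layers(64 * (2 ** counter), x)

cnn = Model(input_img, x, name="CNN_COVID_" + str(counter))
cnn.summary()  # inspect how the architecture grows from one iteration to the next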
email_notifications.py
The email_notifications.py file is in charge of delivering emails to the product owner through a local SMTP server. These emails tell the owner whether everything is fine and, if not, what went wrong.
import smtplib
import os

# Email variables definition
sender = 'example@gmail.com'
receiver = ['svirahonda@gmail.com']  # replace this with the owner's email address
smtp_provider = 'smtp.gmail.com'  # replace this with your SMTP provider
smtp_port = 587
smtp_account = 'example@gmail.com'
smtp_password = 'your_password'

def training_result(result, model_acc):
    if result == 'ok':
        message = 'The model reached '+str(model_acc)+', It has been saved to GCS.'
    if result == 'failed':
        message = 'None of the models reached an acceptable accuracy, training execution had to be forcefully ended.'
    message = 'Subject: {}\n\n{}'.format('An automatic training job has ended recently', message)
    try:
        server = smtplib.SMTP(smtp_provider, smtp_port)
        server.starttls()
        server.login(smtp_account, smtp_password)
        server.sendmail(sender, receiver, message)
        return
    except Exception as e:
        print('Something went wrong. Unable to send email: '+str(e), flush=True)
        return

def exception(e_message):
    try:
        message = 'Subject: {}\n\n{}'.format('An automatic training job has failed.', e_message)
        server = smtplib.SMTP(smtp_provider, smtp_port)
        server.starttls()
        server.login(smtp_account, smtp_password)
        server.sendmail(sender, receiver, message)
        return
    except Exception as e:
        print('Something went wrong. Unable to send email: '+str(e), flush=True)
        return
task.py
The task.py file orchestrates the whole program. It initializes the GPU (if one is available), starts model training, and tunes the model when required. It also receives the arguments passed to the application. Here's the code:
import tensorflow as tf
from tensorflow.keras import Model, layers, optimizers
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model
import argparse
import model_assembly, data_utils, email_notifications
import sys
import os
import gc
from google.cloud import storage
import datetime
import math

# general variables declaration
model_name = 'best_model.hdf5'

def initialize_gpu():
    if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
        tf.config.set_soft_device_placement(True)
        tf.debugging.set_log_device_placement(True)
    return

def start_training(args):
    # Loading splitted data
    X_train, X_test, y_train, y_test = data_utils.load_data(args)
    # Initializing GPU if available
    initialize_gpu()
    train_model(X_train, X_test, y_train, y_test, args)

def train_model(X_train, X_test, y_train, y_test, args):
    try:
        model_loss, model_acc = [0, 0]
        counter = 0
        while model_acc <= 0.85:
            input_img, x = model_assembly.get_base_model()
            if counter == 0:
                x = model_assembly.get_final_layers(64, x)
            else:
                for i in range(counter):
                    x = model_assembly.get_additional_layer(int(64*(math.pow(2, counter))), x)
                x = model_assembly.get_final_layers(int(64*(math.pow(2, counter))), x)
            cnn = Model(input_img, x, name="CNN_COVID_"+str(counter))
            cnn.summary()
            cnn.compile(optimizer='adam',
                        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                        metrics=['accuracy'])
            checkpoint = ModelCheckpoint(model_name, monitor='val_loss', verbose=1,
                                         save_best_only=True, mode='auto', save_freq="epoch")
            cnn.fit(X_train, y_train, epochs=args.epochs,
                    validation_data=(X_test, y_test), callbacks=[checkpoint])
            cnn = load_model(model_name)
            model_loss, model_acc = cnn.evaluate(X_test, y_test, verbose=2)
            if model_acc > 0.85:
                saved_ok = data_utils.save_model(args.bucket_name, model_name)
                if saved_ok[0] == True:
                    email_notifications.training_result('ok', model_acc)
                    sys.exit(0)
                else:
                    email_notifications.exception(saved_ok[1])
                    sys.exit(1)
            else:
                pass
            if counter >= 5:
                email_notifications.training_result('failed', None)
                sys.exit(1)
            counter += 1
    except Exception as e:
        email_notifications.exception('An exception when training the model has occurred: '+str(e))
        sys.exit(1)

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket-name', type=str, default='automatictrainingcicd-aiplatform', help='GCP bucket name')
    parser.add_argument('--epochs', type=int, default=3, help='Epochs number')
    args = parser.parse_args()
    return args

def main():
    args = get_args()
    start_training(args)

if __name__ == '__main__':
    main()
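If you want to try the trainer locally before containerizing it, you can invoke task.py directly. The bucket name and epoch count below are just the script's defaults; substitute your own. This assumes the dataset archives are already at the paths expected by data_utils.py and that GCP credentials are configured for the upload step.

python task.py --bucket-name automatictrainingcicd-aiplatform --epochs 3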
Dockerfile
Our Dockerfile is in charge of passing the instructions to the Docker daemon so it builds the appropriate container. This is what it looks like:
FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-0
WORKDIR /root

RUN pip install pandas numpy google-cloud-storage scikit-learn opencv-python
RUN apt-get update; apt-get install git -y; apt-get install -y libgl1-mesa-dev

ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-Dataset.git
ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-CodeCommit.git

RUN mv /root/AutomaticTraining-CodeCommit/model_assembly.py /root
RUN mv /root/AutomaticTraining-CodeCommit/task.py /root
RUN mv /root/AutomaticTraining-CodeCommit/data_utils.py /root
RUN mv /root/AutomaticTraining-CodeCommit/email_notifications.py /root

ENTRYPOINT ["python","task.py"]
The above file uses the gcr.io/deeplearning-platform-release/tf2-cpu.2-0 image, installs the dependencies, clones the required repositories, moves the files to the home directory, and sets the entry point for the container execution.
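To sanity-check the container locally before wiring it into the pipeline, you could build and run it with standard Docker commands. The image tag below is just an example; any arguments after the image name are forwarded to task.py through the ENTRYPOINT.

# Build the image from the directory containing the Dockerfile
docker build -t automatic-training:local .
# Run it; the trailing arguments are passed straight to task.py
docker run --rm automatic-training:local --bucket-name automatictrainingcicd-aiplatform --epochs 3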