MLOps管道中的模型自动调整
在本系列文章中,我们将引导您完成将CI/CD应用于AI任务的过程。最后,您将获得一个满足GoogleMLOps成熟度模型第2级要求的功能管道。我们假设您对Python,深度学习,Docker,DevOps和Flask有所了解。
在上一篇文章中,我们为此项目设置了一个云环境。在这一部分中,我们将引导您完成持续集成,模型自动训练,自动调整和持续交付所需的代码。下图显示了我们在项目流程中的位置。

我们将显示代码的精简版本。有关完整版本,请参见此存储库。我们将在该项目中使用GCRDocker映像(由TensorFlow支持)–但是随时可以使用其他映像。
首先,我们将讨论在本地运行这些解决方案的代码。稍后,我们将看到如何为云部署做好准备。
下图显示了我们项目的文件结构。

data_utils.py
该data_utils.py文件句柄的数据加载,转换和模型保存到GCS。此文件可能因项目而异。本质上,它在模型训练之前执行所有数据处理任务。让我们看一下代码:
import datetime
from google.cloud import storage
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
import gc
from sklearn import preprocessing
import os
import zipfile
import cv2
import sys
def dataset_transformation(path):
images = []
for dirname, _, filenames in os.walk(path):
for filename in filenames:
if filename.endswith('.png'):
image = cv2.imread(os.path.join(dirname, filename))
image = cv2.resize(image, (128, 128))
images.append(image)
return images
def load_data(args):
file_1 = '/root/AutomaticTraining-Dataset/COVID_RX/normal_images.zip'
file_2 = '/root/AutomaticTraining-Dataset/COVID_RX/covid_images.zip'
file_3 = '/root/AutomaticTraining-Dataset/COVID_RX/viral_images.zip'
extract_to = '/root/AutomaticTraining-Dataset/COVID_RX/'
with zipfile.ZipFile(file_1, 'r') as zip_ref:
zip_ref.extractall(extract_to)
with zipfile.ZipFile(file_2, 'r') as zip_ref:
zip_ref.extractall(extract_to)
with zipfile.ZipFile(file_3, 'r') as zip_ref:
zip_ref.extractall(extract_to)
normal = dataset_transformation('/root/AutomaticTraining-Dataset/COVID_RX/normal_images')
covid = dataset_transformation('/root/AutomaticTraining-Dataset/COVID_RX/covid_images')
viral = dataset_transformation('/root/AutomaticTraining-Dataset/COVID_RX/viral_images')
#Train and test - dataset combination
X = normal + viral + covid
#Transforming from list to numpy array.
X = np.array(X)
#Creating labels.
y = []
for i in range(len(normal)):
y.append(0)
for i in range(len(covid)):
y.append(1)
for i in range(len(viral)):
y.append(2)
y = np.array(y)
#Dataset splitting
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0, shuffle = True)
return X_train, X_test, y_train, y_test
def save_model(bucket_name, best_model):
try:
storage_client = storage.Client() #if running on GCP
bucket = storage_client.bucket(bucket_name)
blob1 = bucket.blob('{}/{}'.format('testing',best_model))
blob1.upload_from_filename(best_model)
return True,None
except Exception as e:
return False,e123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869复制代码类型:[html]
model_assembly.py
该model_assembly.py文件包含模型的创建和自动的调整的代码。我们希望从一个非常基本的模型开始–对其进行训练和评估。如果初始模型不能达到理想的性能,我们将进行改进直到达到目标。让我们看一下代码:
from tensorflow.keras.models import load_model from tensorflow.keras import layers import tensorflow as tf import numpy as np def get_base_model(): input_img = layers.Input(shape=(128, 128, 3)) x = layers.Conv2D(64,(3, 3), activation='relu')(input_img) return input_img,x def get_additional_layer(filters,x): x = layers.MaxPooling2D((2, 2))(x) x = layers.Conv2D(filters, (3, 3), activation='relu')(x) return x def get_final_layers(neurons,x): x = layers.SpatialDropout2D(0.2)(x) x = layers.Flatten()(x) x = layers.Dense(neurons)(x) x = layers.Dense(3)(x) return x123456789101112131415161718192021复制代码类型:[html]
这些函数将在循环中调用,并且在第一次迭代中,我们将获得base_model,final_layers并将它们堆叠起来以构建一个非常简单的模型。如果训练我们发现该模型不执行不够好之后,然后我们将再次得到base_model,加additional_layers,堆栈final_layers,然后训练和一次评估。如果我们仍然无法达到良好的性能,则将在循环中重复最后一个过程,并增加更多的过程,additional_layers直到达到预定义的良好指标为止。
email_notifications.py
该email_notifications.py文件是负责通过本地SMTP服务器产品所有者提供的电子邮件。这些电子邮件会告诉所有者是否一切正常,如果不正常,那是什么问题。
import smtplib
import os
# Email variables definition
sender = 'example@gmail.com’
receiver = ['svirahonda@gmail.com'] #replace this by the owner's email address
smtp_provider = 'smtp.gmail.com' #replace this by your STMP provider
smtp_port = 587
smtp_account = 'example@gmail.com’
smtp_password = 'your_password’
def training_result(result,model_acc):
if result == 'ok':
message = 'The model reached '+str(model_acc)+', It has been saved to GCS.'
if result == 'failed':
message = 'None of the models reached an acceptable accuracy, training execution had to be forcefully ended.’
message = 'Subject: {}\n\n{}'.format('An automatic training job has ended recently', message)
try:
server = smtplib.SMTP(smtp_provider,smtp_port)
server.starttls()
server.login(smtp_account,smtp_password)
server.sendmail(sender, receiver, message)
return
except Exception as e:
print('Something went wrong. Unable to send email: '+str(e),flush=True)
return
def exception(e_message):
try:
message = 'Subject: {}\n\n{}'.format('An automatic training job has failed.', e_message)
server = smtplib.SMTP(smtp_provider,smtp_port)
server.starttls()
server.login(smtp_account,smtp_password)
server.sendmail(sender, receiver, message)
return
except Exception as e:
print('Something went wrong. Unable to send email: '+str(e),flush=True)
return1234567891011121314151617181920212223242526272829303132333435363738复制代码类型:[html]
task.py
该task.py文件编排节目。它会初始化GPU(如果有),以开始模型训练,并在需要时调整模型。它还接收传递给应用程序的参数。这是代码:
import tensorflow as tf
from tensorflow.keras import Model, layers, optimizers
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras import Model
from tensorflow.keras.models import load_model
import argparse
import model_assembly, data_utils, email_notifications
import sys
import os
import gc
from google.cloud import storage
import datetime
import math
# general variables declaration
model_name = 'best_model.hdf5'
def initialize_gpu():
if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
tf.config.set_soft_device_placement(True)
tf.debugging.set_log_device_placement(True)
return
def start_training(args):
# Loading splitted data
X_train, X_test, y_train, y_test = data_utils.load_data(args)
# Initializing GPU if available
initialize_gpu()
train_model(X_train, X_test, y_train, y_test, args)
def train_model(X_train, X_test, y_train, y_test,args):
try:
model_loss, model_acc = [0,0]
counter = 0
while model_acc <= 0.85:
input_img,x = model_assembly.get_base_model()
if counter == 0:
x = model_assembly.get_final_layers(64,x)
else:
for i in range(counter):
x = model_assembly.get_additional_layer(int(64*(math.pow(2,counter))),x)
x = model_assembly.get_final_layers(int(64*(math.pow(2,counter))),x)
cnn = Model(input_img, x,name="CNN_COVID_"+str(counter))
cnn.summary()
cnn.compile(optimizer='adam', loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])
checkpoint = ModelCheckpoint(model_name, monitor='val_loss', verbose=1,save_best_only=True, mode='auto', save_freq="epoch")
cnn.fit(X_train, y_train, epochs=args.epochs, validation_data=(X_test, y_test),callbacks=[checkpoint])
cnn = load_model(model_name)
model_loss, model_acc = cnn.evaluate(X_test, y_test,verbose=2)
if model_acc > 0.85:
saved_ok = data_utils.save_model(args.bucket_name,model_name)
if saved_ok[0] == True:
email_notifications.training_result('ok',model_acc)
sys.exit(0)
else:
email_notifications.exception(saved_ok[1])
sys.exit(1)
else:
pass
if counter >= 5:
email_notifications.training_result('failed',None)
sys.exit(1)
counter += 1
except Exception as e:
email_notifications.exception('An exception when training the model has occurred: '+str(e))
sys.exit(1)
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument('--bucket-name', type=str, default = 'automatictrainingcicd-aiplatform', help = 'GCP bucket name')
parser.add_argument('--epochs', type=int, default=3, help='Epochs number')
args = parser.parse_args()
return args
def main():
args = get_args()
start_training(args)
if __name__ == '__main__':
main()1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980复制代码类型:[html]
Docker文件
我们的Dockerfile负责将指令传递给Docker守护程序以构建适当的容器。看起来是这样的:
FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-0 WORKDIR /root RUN pip install pandas numpy google-cloud-storage scikit-learn opencv-python RUN apt-get update; apt-get install git -y; apt-get install -y libgl1-mesa-dev ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-Dataset.git ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-CodeCommit.git RUN mv /root/AutomaticTraining-CodeCommit/model_assembly.py /root RUN mv /root/AutomaticTraining-CodeCommit/task.py /root RUN mv /root/AutomaticTraining-CodeCommit/data_utils.py /root RUN mv /root/AutomaticTraining-CodeCommit/email_notifications.py /root ENTRYPOINT ["python","task.py"]123456789101112131415161718复制代码类型:[html]
上面的文件使用该gcr.io/deeplearning-platform-release/tf2-cpu.2-0映像,安装依赖项,克隆所需的存储库,将文件移至主目录,并设置容器执行的入口点。
