Continuous Delivery for MLOps with Model Unit Testing

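In this series of articles, we walk you through the process of applying CI/CD to AI tasks. By the end, you'll have a functional pipeline that meets the requirements of level 2 of Google's MLOps maturity model. We assume you have some familiarity with Python, deep learning, Docker, DevOps, and Flask.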

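In the previous articles of this series, we explained how to continuously integrate model changes and continuously train our model as new data is collected. In this article, we'll test the trained model in an environment that simulates production. We'll load the model saved in the testing registry, expose it through a clone of the model API, and then test it. You're welcome to add your own tests at this stage. The diagram below shows where we are in the project flow.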

The code files are structured as follows:

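You can get the original code from its repository (https://github.com/sergiovirahonda/AutomaticTraining-UnitTesting).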

data_utils.py

The data_utils.py file contains the functions that check whether a model exists in the testing model registry and, if it does, load that model:

from google.cloud import storage

def previous_model(bucket_name, model_filename):
    # Returns (True/False, None) if the check ran, or (None, exception) if it failed
    try:
        storage_client = storage.Client()  # uses the default credentials when running on GCP
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob('{}/{}'.format('testing', model_filename))
        return blob.exists(), None
    except Exception as e:
        print('Something went wrong when trying to check if a previous model exists in the GCS bucket. Exception: ' + str(e), flush=True)
        return None, e

def load_model(bucket_name, model_filename):
    # Downloads testing/<model_filename> to /root/<model_filename>, the container's working directory
    try:
        storage_client = storage.Client()  # uses the default credentials when running on GCP
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob('{}/{}'.format('testing', model_filename))
        blob.download_to_filename('/root/' + str(model_filename))
        return True, None
    except Exception as e:
        print('Something went wrong when trying to load the previous model from the GCS bucket. Exception: ' + str(e), flush=True)
        return False, e
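Both functions create their own storage.Client(), so they are hard to exercise without GCP credentials. One option is to pass the client in, so that a test can substitute an in-memory fake that mimics the Client.bucket() → Bucket.blob() → Blob.exists() chain of google-cloud-storage. This is a sketch under our own assumptions. The `client` parameter and the FakeClient/FakeBucket/FakeBlob classes are hypothetical and not part of the repository.

```python
from typing import Optional, Tuple


def previous_model(bucket_name: str, model_filename: str,
                   client) -> Tuple[Optional[bool], Optional[Exception]]:
    """Return (True/False, None) if the check ran, or (None, exception) if it failed.

    `client` is a hypothetical injection point: any object offering
    bucket(name).blob(path).exists(), like google-cloud-storage's Client.
    """
    try:
        blob = client.bucket(bucket_name).blob('testing/{}'.format(model_filename))
        return blob.exists(), None
    except Exception as e:
        print('Could not check for a previous model. Exception: ' + str(e), flush=True)
        return None, e


# --- Hypothetical in-memory fakes, for tests only ---
class FakeBlob:
    def __init__(self, store, path):
        self.store, self.path = store, path

    def exists(self):
        return self.path in self.store


class FakeBucket:
    def __init__(self, store):
        self.store = store

    def blob(self, path):
        return FakeBlob(self.store, path)


class FakeClient:
    def __init__(self, objects_by_bucket):
        self.objects_by_bucket = objects_by_bucket

    def bucket(self, name):
        # The real client does not raise here; this only simulates a failure path
        if name not in self.objects_by_bucket:
            raise ValueError('unknown bucket: ' + name)
        return FakeBucket(self.objects_by_bucket[name])


if __name__ == '__main__':
    client = FakeClient({'my-bucket': {'testing/best_model.hdf5'}})
    print(previous_model('my-bucket', 'best_model.hdf5', client))  # (True, None)
    print(previous_model('my-bucket', 'other.hdf5', client))       # (False, None)
```

The same injection idea applies to load_model(), where a fake blob's download_to_filename() could write a small placeholder file.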

email_notifications.py

The email_notifications.py file handles sending notifications to the product owner about whether the code ran successfully or hit a problem:

import smtplib

# Email variables definition (replace the placeholders with your own values)
sender = 'example@gmail.com'
receiver = ['svirahonda@gmail.com']  # replace this with the owner's email address
smtp_provider = 'smtp.gmail.com'  # replace this with your SMTP provider
smtp_port = 587
smtp_account = 'example@gmail.com'
smtp_password = 'your_password'

def _send(subject, body):
    # Shared helper: opens a STARTTLS session, sends the message, and closes the connection on exit
    message = 'Subject: {}\n\n{}'.format(subject, body)
    try:
        with smtplib.SMTP(smtp_provider, smtp_port) as server:
            server.starttls()
            server.login(smtp_account, smtp_password)
            server.sendmail(sender, receiver, message)
    except Exception as e:
        print('Something went wrong. Unable to send email.', flush=True)
        print('Exception: ', e, flush=True)

def send_update(message):
    # Status report: the testing stage has finished (successfully or not)
    _send('An automatic unit testing has ended recently.', message)

def exception(e_message):
    # Error report: something failed inside the testing API
    _send('Something went wrong with the testing API.', e_message)
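The notification functions build the message by hand with a "Subject: ..." prefix. As an alternative, the standard library's email.message.EmailMessage can assemble the headers, and smtplib.SMTP.send_message() can send the result. This is a sketch, not the repository's code, and the helper names build_message and send are our own. Separating message construction from sending also makes the content testable without an SMTP server.

```python
import smtplib
from email.message import EmailMessage


def build_message(subject: str, body: str, sender: str, receivers: list) -> EmailMessage:
    """Assemble a plain-text notification (hypothetical helper)."""
    msg = EmailMessage()
    msg['Subject'] = subject
    msg['From'] = sender
    msg['To'] = ', '.join(receivers)
    msg.set_content(body)
    return msg


def send(msg: EmailMessage, host: str, port: int, account: str, password: str) -> bool:
    """Send msg over STARTTLS; return False instead of raising so the job can continue."""
    try:
        with smtplib.SMTP(host, port) as server:
            server.starttls()
            server.login(account, password)
            server.send_message(msg)
        return True
    except OSError as e:  # smtplib.SMTPException and socket errors both derive from OSError
        print('Unable to send email. Exception:', e, flush=True)
        return False


if __name__ == '__main__':
    msg = build_message('An automatic unit testing has ended recently.',
                        'Testing stage has ended successfully.',
                        'example@gmail.com', ['owner@example.com'])
    print(msg['Subject'])
```

send_update() and exception() would then become one-line calls to build_message() followed by send().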

task.py

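The task.py file handles the container's execution. It coordinates the startup and shutdown of the Flask app, model loading, model testing, and email notifications: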

import os
import threading
import time

import cv2
import jsonpickle
import numpy as np
import requests
import tensorflow as tf
from flask import Flask, Response, jsonify, request
from tensorflow.keras.models import load_model

import data_utils
import email_notifications

# IMPORTANT
# If you're running this container locally and want to access the API from a local browser, use http://172.17.0.2:5000/

# Starting the Flask app
app = Flask(__name__)

# General variable declarations
model_name = 'best_model.hdf5'
bucket_name = 'automatictrainingcicd-aiplatform'
class_names = ['Normal', 'Viral Pneumonia', 'COVID-19']
headers = {'content-type': 'application/octet-stream'}  # the body is raw RGB pixel bytes, not a PNG file
api = 'http://127.0.0.1:5000/'  # this same app
model = None  # assigned by initialize_job() once the model is loaded

def shutdown(exit_code):
    # sys.exit() in a worker thread only ends that thread; os._exit() stops the whole process (and the container)
    os._exit(exit_code)

# before_first_request was removed in Flask 2.3, so this file requires Flask < 2.3
@app.before_first_request
def before_first_request():
    def initialize_job():
        global model
        if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
            tf.config.set_soft_device_placement(True)
            tf.debugging.set_log_device_placement(True)
        # Check whether there's any model saved under testing/ on GCS
        exists, error = data_utils.previous_model(bucket_name, model_name)
        if exists is None:
            email_notifications.exception('Something went wrong when trying to check if old testing model exists. Exception: ' + str(error) + '. Aborting automatic testing.')
            shutdown(1)
        if not exists:
            email_notifications.send_update('There are no artifacts at model registry. Check GCP for more information.')
            shutdown(1)
        # A model exists under testing/: download it, load it, and serve it through the API
        downloaded, error = data_utils.load_model(bucket_name, model_name)
        if not downloaded:
            email_notifications.exception('Something went wrong when trying to load old /testing model. Exception: ' + str(error))
            shutdown(1)
        try:
            model = load_model(model_name)
        except Exception as e:
            email_notifications.exception('Something went wrong when trying to load old /testing model into Keras. Exception: ' + str(e))
            shutdown(1)
        api_test()

    thread = threading.Thread(target=initialize_job)
    thread.start()

@app.route('/init', methods=['GET', 'POST'])
def init():
    message = {'message': 'API initialized.'}
    response = jsonpickle.encode(message)
    return Response(response=response, status=200, mimetype='application/json')

@app.route('/', methods=['POST'])
def index():
    try:
        # Rebuild the 128x128 RGB image from the raw uint8 bytes sent by api_test()
        image = np.frombuffer(request.data, np.uint8)
        image = image.reshape((1, 128, 128, 3))  # add the batch dimension expected by Keras
        image = image.astype(np.float16)
        result = model.predict(image)
        result = np.argmax(result)  # index into class_names
        return jsonify({'message': str(result)})
    except Exception as e:
        email_notifications.exception('Something went wrong when trying to make prediction via testing API. Exception: ' + str(e) + '. Aborting automatic testing.')
        return jsonify({'message': 'Error: ' + str(e)})

def self_initialize():
    def initialization():
        # Poll /init until the server answers; that first request fires before_first_request()
        started = False
        while not started:
            try:
                server_response = requests.get(api + 'init')
                started = server_response.status_code == 200
            except requests.exceptions.RequestException:
                pass  # the server is not up yet
            time.sleep(3)
    thread = threading.Thread(target=initialization)
    thread.start()

def api_test():
    try:
        image = cv2.imread('TEST_IMAGE.jpg')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (128, 128))
        result = requests.post(api, data=image.tobytes(), headers=headers)
        prediction = int(result.json()['message'])
        # TEST_IMAGE.jpg is expected to be classified as class 1 ('Viral Pneumonia')
        if prediction == 1:
            email_notifications.send_update('Testing stage has ended successfully. Shutting down container. Check the GCP logs for more information.')
            shutdown(0)
        else:
            email_notifications.send_update('Testing stage has failed: unexpected prediction ' + str(prediction) + '. Check the GCP logs for more information.')
            shutdown(1)
    except Exception as e:
        email_notifications.exception('Testing stage crashed with an exception: ' + str(e) + '. Check the GCP logs for more information.')
        shutdown(1)

if __name__ == '__main__':
    self_initialize()
    # No debug reloader: it would start a second process and run self_initialize() twice
    app.run(host='0.0.0.0', port=5000, threaded=True)
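api_test() and index() agree on a very simple wire format. The request body is the raw bytes of a 128×128×3 uint8 RGB array, with no header or compression, so both sides must know the shape. Separately, self_initialize() polls /init forever, so a server that never comes up would hang the job. The sketch below isolates the encoding and decoding steps and shows a readiness poll with a timeout. The probe is passed in as a function so that the poll can be tested without a running server. The helper names encode_image, decode_batch, and wait_until_ready are hypothetical and not part of the repository.

```python
import time
from typing import Callable

import numpy as np

IMAGE_SHAPE = (128, 128, 3)  # must match the model input and cv2.resize() in api_test()


def encode_image(image: np.ndarray) -> bytes:
    """Serialize an RGB uint8 image the same way api_test() does."""
    if image.shape != IMAGE_SHAPE or image.dtype != np.uint8:
        raise ValueError('expected a uint8 array of shape {}'.format(IMAGE_SHAPE))
    return image.tobytes()


def decode_batch(payload: bytes) -> np.ndarray:
    """Rebuild the (1, 128, 128, 3) float16 batch that index() feeds to the model."""
    image = np.frombuffer(payload, dtype=np.uint8).reshape(IMAGE_SHAPE)  # ValueError on wrong size
    return image[np.newaxis].astype(np.float16)


def wait_until_ready(probe: Callable[[], bool], timeout: float = 60.0,
                     interval: float = 3.0) -> bool:
    """Call probe() until it returns True or `timeout` seconds elapse.

    Any exception raised by probe() is treated as "not ready yet".
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            if probe():
                return True
        except Exception:
            pass
        time.sleep(interval)
    return False
```

In task.py, the probe could be something like `lambda: requests.get(api + 'init', timeout=5).status_code == 200`. A False return would then be a reason to send an exception email and exit with a non-zero code, instead of waiting indefinitely.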

Dockerfile

Our Dockerfile provides the rules for building the container:

FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-0
WORKDIR /root

# Flask is pinned below 2.3 because task.py relies on @app.before_first_request, which Flask 2.3 removed
RUN pip install pandas numpy google-cloud-storage scikit-learn opencv-python "Flask<2.3" jsonpickle requests

# git is needed to clone the repository; libgl1-mesa-dev provides the libGL library that opencv-python loads
RUN apt-get update && apt-get install -y git libgl1-mesa-dev

# Downloading random bytes invalidates Docker's build cache at this step, so the clone below always fetches the latest code
ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-UnitTesting.git

# Move the job's files into the working directory
RUN cd /root/AutomaticTraining-UnitTesting && \
    mv data_utils.py task.py email_notifications.py TEST_IMAGE.jpg /root

EXPOSE 5000

ENTRYPOINT ["python","task.py"]

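Once you've built the container and run it locally, you'll have a working model unit tester. It verifies that the model about to be deployed to production can be loaded, can be served through the API, and returns the expected class for a known test image, all without errors or failures.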

Feel free to add more tests to this job. Which tests make sense usually depends on the business case.
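A single test image only proves that the pipeline runs end to end. A common next step is to score the model on a small labeled hold-out set and fail the stage when accuracy drops below a threshold chosen for the business case. This is an assumption on our part, not part of the original job. The function name accuracy_gate and the 0.9 default threshold are hypothetical.

```python
from typing import Sequence, Tuple


def accuracy_gate(predictions: Sequence[int], labels: Sequence[int],
                  threshold: float = 0.9) -> Tuple[bool, float]:
    """Return (passed, accuracy) for predicted class indices against the true labels.

    `threshold` is a hypothetical business-defined minimum accuracy.
    """
    if len(predictions) != len(labels) or len(labels) == 0:
        raise ValueError('predictions and labels must be non-empty and of equal length')
    correct = sum(int(p == y) for p, y in zip(predictions, labels))
    accuracy = correct / len(labels)
    return accuracy >= threshold, accuracy
```

In task.py, the predictions would come from posting each hold-out image to the testing API, as api_test() does for TEST_IMAGE.jpg. The returned boolean would then choose between send_update() with exit code 0 and a failure report with exit code 1.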
