MLOps: Continuous Delivery with Model Unit Testing
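In this series of articles, we walk you through applying CI/CD to AI tasks. By the end, you will have a working pipeline that meets the requirements of level 2 in the Google MLOps Maturity Model. We assume that you have some familiarity with Python, deep learning, Docker, DevOps, and Flask.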

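In the previous articles of this series, we explained how to continuously integrate model changes and continuously train our model as new data is gathered. In this article, we will test the trained model in an environment that simulates production. We will load the model saved in the testing registry, expose it through a clone of the model API, and run tests against it. You are welcome to add your own tests at this stage. The diagram below shows where we are in the project process.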

The code file structure is as follows:

Get the original code from its repository.
data_utils.py
The data_utils.py file contains functions that check whether a model exists in the testing model registry and, if so, load it:
    import datetime
    from google.cloud import storage
    import pandas as pd
    import numpy as np
    import os
    import cv2
    import sys


    def previous_model(bucket_name, model_filename):
        """Check whether a model exists under testing/ in the GCS bucket.

        Returns (True, None) or (False, None) when the check succeeds,
        and (None, exception) when the check itself fails.
        """
        try:
            storage_client = storage.Client()  # if running on GCP
            bucket = storage_client.bucket(bucket_name)
            status = storage.Blob(bucket=bucket, name='{}/{}'.format('testing', model_filename)).exists(storage_client)
            return status, None
        except Exception as e:
            # str(e) is needed: concatenating a str with an exception raises TypeError
            print('Something went wrong when trying to check if previous model exists in GCS bucket. Exception: ' + str(e), flush=True)
            return None, e


    def load_model(bucket_name, model_filename):
        """Download the model from testing/ in the GCS bucket to /root."""
        try:
            storage_client = storage.Client()  # if running on GCP
            bucket = storage_client.bucket(bucket_name)
            blob1 = bucket.blob('{}/{}'.format('testing', model_filename))
            blob1.download_to_filename('/root/' + str(model_filename))
            return True, None
        except Exception as e:
            print('Something went wrong when trying to load previous model from GCS bucket. Exception: ' + str(e), flush=True)
            return False, e
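Both functions return a two-element tuple instead of raising, so the caller has to tell three outcomes apart: the model exists, the model is missing, or the check itself failed. A minimal sketch of that dispatch, with no GCS access involved; `describe_registry_check` is a hypothetical helper that is not part of the original repository:

```python
def describe_registry_check(status, error):
    """Map a (status, error) tuple, as returned by previous_model(), to an action.

    Hypothetical helper for illustration only.
    """
    if status is None:
        # The check itself failed (for example, missing credentials).
        return 'abort: could not query the registry ({})'.format(error)
    if status:
        return 'load: a model is available in the testing registry'
    return 'notify: no model found in the testing registry'


if __name__ == '__main__':
    print(describe_registry_check(True, None))
    print(describe_registry_check(False, None))
    print(describe_registry_check(None, RuntimeError('permission denied')))
```

The `None` case is tested first because `None` is falsy and would otherwise be mistaken for "no model found".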
email_notifications.py
The email_notifications.py file handles sending notifications to the product owner about whether the code executed successfully or ran into problems:
    import smtplib
    import os

    # Email variables definition
    sender = 'example@gmail.com'
    receiver = ['svirahonda@gmail.com']  # replace this with the owner's email address
    smtp_provider = 'smtp.gmail.com'  # replace this with your SMTP provider
    smtp_port = 587
    smtp_account = 'example@gmail.com'
    smtp_password = 'your_password'


    def send_update(message):
        """Email the outcome of a unit-testing run."""
        message = 'Subject: {}\n\n{}'.format('An automatic unit testing has ended recently.', message)
        try:
            server = smtplib.SMTP(smtp_provider, smtp_port)
            server.starttls()
            server.login(smtp_account, smtp_password)
            server.sendmail(sender, receiver, message)
            return
        except Exception as e:
            print('Something went wrong. Unable to send email.', flush=True)
            print('Exception: ', e)
            return


    def exception(e_message):
        """Email an error raised during the testing stage."""
        try:
            message = 'Subject: {}\n\n{}'.format('Something went wrong with the testing API.', e_message)
            server = smtplib.SMTP(smtp_provider, smtp_port)
            server.starttls()
            server.login(smtp_account, smtp_password)
            server.sendmail(sender, receiver, message)
            return
        except Exception as e:
            print('Something went wrong. Unable to send email.', flush=True)
            print('Exception: ', e)
            return
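Both functions build the message by hand as a `Subject:` line, a blank line, and the body. As an alternative sketch, the standard library's `email.message.EmailMessage` can assemble the headers. `build_notification` is a hypothetical name, the addresses are placeholders, and nothing is sent here:

```python
from email.message import EmailMessage


def build_notification(subject, body, sender, receivers):
    """Assemble a plain-text notification without sending it."""
    msg = EmailMessage()
    msg['Subject'] = subject
    msg['From'] = sender
    msg['To'] = ', '.join(receivers)
    msg.set_content(body)
    return msg


if __name__ == '__main__':
    msg = build_notification(
        'An automatic unit testing has ended recently.',
        'Testing stage has ended successfully.',
        'example@gmail.com',          # placeholder sender
        ['owner@example.com'],        # placeholder receiver
    )
    print(msg['Subject'])
    print(msg.get_content())
```

A message built this way could be handed to `smtplib.SMTP.send_message()` in place of the `sendmail()` call.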
task.py
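The task.py file handles container execution. It orchestrates starting and stopping the Flask application, model loading, model testing, and email notifications: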
    import tensorflow as tf
    from tensorflow.keras.models import load_model
    import jsonpickle
    import data_utils, email_notifications
    import sys
    import os
    from google.cloud import storage
    import datetime
    import numpy as np
    import cv2
    from flask import flash, Flask, Response, request, jsonify
    import threading
    import requests
    import time

    # IMPORTANT
    # If you're running this container locally and you want to access the API via local browser, use http://172.17.0.2:5000/

    # Starting flask app
    app = Flask(__name__)

    # general variables declaration
    model_name = 'best_model.hdf5'
    bucket_name = 'automatictrainingcicd-aiplatform'
    class_names = ['Normal', 'Viral Pneumonia', 'COVID-19']
    headers = {'content-type': 'image/png'}
    api = 'http://127.0.0.1:5000/'  # self app
    model = None  # set by initialize_job() once the model has been loaded


    # Note: before_first_request was deprecated in Flask 2.2 and removed in Flask 2.3.
    @app.before_first_request
    def before_first_request():
        def initialize_job():
            if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
                tf.config.set_soft_device_placement(True)
            tf.debugging.set_log_device_placement(True)
            global model
            # Checking if there's any model saved at testing on GCS
            model_gcs = data_utils.previous_model(bucket_name, model_name)
            # If any model exists at testing, load it, test it on data and use it on the API
            if model_gcs[0] is True:
                model_gcs = data_utils.load_model(bucket_name, model_name)
                if model_gcs[0] is True:
                    try:
                        model = load_model(model_name)
                    except Exception as e:
                        email_notifications.exception('Something went wrong trying to test old /testing model. Exception: ' + str(e))
                        sys.exit(1)
                else:
                    email_notifications.exception('Something went wrong when trying to load old /testing model. Exception: ' + str(model_gcs[1]))
                    sys.exit(1)
            if model_gcs[0] is False:
                email_notifications.send_update('There are no artifacts at model registry. Check GCP for more information.')
                sys.exit(1)
            if model_gcs[0] is None:
                # str() is needed because model_gcs[1] is an exception object
                email_notifications.exception('Something went wrong when trying to check if old testing model exists. Exception: ' + str(model_gcs[1]) + '. Aborting automatic testing.')
                sys.exit(1)
            api_test()

        thread = threading.Thread(target=initialize_job)
        thread.start()


    @app.route('/init', methods=['GET', 'POST'])
    def init():
        message = {'message': 'API initialized.'}
        response = jsonpickle.encode(message)
        return Response(response=response, status=200, mimetype="application/json")


    @app.route('/', methods=['POST'])
    def index():
        if request.method == 'POST':
            try:
                # Converting the raw request bytes that contain the image to uint8
                # (np.frombuffer replaces the deprecated binary mode of np.fromstring)
                image = np.frombuffer(request.data, np.uint8)
                image = image.reshape((128, 128, 3))
                image = [image]
                image = np.array(image)
                image = image.astype(np.float16)
                result = model.predict(image)
                result = np.argmax(result)
                message = {'message': '{}'.format(str(result))}
                json_response = jsonify(message)
                return json_response
            except Exception as e:
                message = {'message': 'Error: ' + str(e)}
                json_response = jsonify(message)
                email_notifications.exception('Something went wrong when trying to make prediction via testing API. Exception: ' + str(e) + '. Aborting automatic testing.')
                return json_response
        else:
            message = {'message': 'Error. Please use this API in a proper manner.'}
            json_response = jsonify(message)
            return json_response


    def self_initialize():
        def initialization():
            # Poll /init until the server answers; the first request triggers before_first_request
            global started
            started = False
            while started == False:
                try:
                    server_response = requests.get('http://127.0.0.1:5000/init')
                    if server_response.status_code == 200:
                        started = True
                except requests.exceptions.RequestException:
                    pass
                time.sleep(3)

        thread = threading.Thread(target=initialization)
        thread.start()


    def api_test():
        # Note: this runs in a background thread, where sys.exit() ends only that thread.
        try:
            image = cv2.imread('TEST_IMAGE.jpg')
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image = cv2.resize(image, (128, 128))
            # tobytes() replaces the deprecated ndarray.tostring()
            result = requests.post(api, data=image.tobytes(), headers=headers)
            result = result.json()
            prediction = int(result['message'])
            if prediction == 1:
                email_notifications.send_update('Testing stage has ended successfully. Shutting down container. Check the GCP logs for more information.')
                sys.exit(0)
            else:
                email_notifications.send_update('Testing stage has crashed. Check the GCP logs for more information.')
                sys.exit(1)
        except Exception as e:
            email_notifications.exception('Testing stage crashed with an exception: ' + str(e) + '. Check the GCP logs for more information.')
            sys.exit(1)


    if __name__ == '__main__':
        self_initialize()
        app.run(host='0.0.0.0', debug=True, threaded=True)
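The API receives the image as raw bytes, so the client and the server have to agree on dtype and shape. A small sketch of that round trip, assuming a 128×128 RGB `uint8` image as in `task.py`. The random array stands in for `TEST_IMAGE.jpg`, and `encode_image` and `decode_image` are hypothetical names:

```python
import numpy as np

IMAGE_SHAPE = (128, 128, 3)  # height, width, channels, as used by the API


def encode_image(image):
    """Serialize a uint8 image to the raw bytes sent in the POST body."""
    return image.astype(np.uint8).tobytes()


def decode_image(payload):
    """Rebuild a batch holding one float16 image from the raw request bytes."""
    image = np.frombuffer(payload, dtype=np.uint8).reshape(IMAGE_SHAPE)
    return image[np.newaxis, ...].astype(np.float16)


if __name__ == '__main__':
    rng = np.random.default_rng(0)
    original = rng.integers(0, 256, size=IMAGE_SHAPE, dtype=np.uint8)
    batch = decode_image(encode_image(original))
    print(batch.shape, batch.dtype)
```

A payload of any other size makes `reshape` raise `ValueError`, which a handler like `index()` would catch and report as an error response.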
Dockerfile
Our Dockerfile provides the rules for building the container:
    FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-0
    WORKDIR /root

    RUN pip install pandas numpy google-cloud-storage scikit-learn opencv-python Flask jsonpickle
    RUN apt-get update; apt-get install git -y; apt-get install -y libgl1-mesa-dev

    ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
    RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-UnitTesting.git

    RUN mv /root/AutomaticTraining-UnitTesting/data_utils.py /root
    RUN mv /root/AutomaticTraining-UnitTesting/task.py /root
    RUN mv /root/AutomaticTraining-UnitTesting/email_notifications.py /root
    RUN mv /root/AutomaticTraining-UnitTesting/TEST_IMAGE.jpg /root

    EXPOSE 5000

    ENTRYPOINT ["python","task.py"]
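Once you have built and run the container locally, you will have a functional model unit tester. It lets you verify that the model about to be deployed to production outputs the expected results without errors or failures.
Feel free to include additional tests in this job. Such tests usually depend on the business case.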
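As one example of an additional test, the job could check accuracy over a handful of labelled samples rather than a single image. A minimal sketch, assuming predictions and labels are class indices like those the API returns. `accuracy_gate` and the threshold values are hypothetical choices, not part of the original project:

```python
def accuracy_gate(predictions, labels, threshold=0.9):
    """Return (passed, accuracy) for predicted vs. expected class indices."""
    if len(predictions) != len(labels) or not labels:
        raise ValueError('predictions and labels must be non-empty and equal in length')
    correct = sum(1 for p, y in zip(predictions, labels) if p == y)
    accuracy = correct / len(labels)
    return accuracy >= threshold, accuracy


if __name__ == '__main__':
    # Indices follow class_names = ['Normal', 'Viral Pneumonia', 'COVID-19'].
    passed, accuracy = accuracy_gate([1, 0, 2, 1], [1, 0, 2, 2], threshold=0.7)
    print(passed, accuracy)
```

The right threshold and sample set depend on the business case, so treat the numbers here as placeholders.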