Building the MLOps Model API
In this series of articles, we walk you through the process of applying CI/CD to AI tasks. By the end, you will have a functional pipeline that meets the requirements of level 2 of the Google MLOps Maturity Model. We assume some familiarity with Python, deep learning, Docker, DevOps, and Flask.
In the previous article, we covered the unit testing step of the ML CI/CD pipeline. In this one, we build the model API that backs the prediction service.
The diagram below shows where we are in the project process flow.
The structure of the code files is as follows:
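Judging from the imports in task.py and the repository linked below, the project most likely consists of the following files; treat this as an assumed layout rather than an exact listing:

AutomaticTraining-PredictionAPI/
├── Dockerfile
├── task.py
├── data_utils.py
└── email_notifications.py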
Most of the code in this article is nearly identical to that of the previous one, so we will focus only on the differences.
You can find the complete code in this repository; the excerpts shown below are condensed versions.
task.py
The task.py file, which orchestrates program execution inside the container, looks as follows:
import tensorflow as tf
from tensorflow.keras.models import load_model
import jsonpickle
import data_utils, email_notifications
import sys
import os
from google.cloud import storage
import datetime
import numpy as np
import cv2
from flask import flash, Flask, Response, request, jsonify
import threading
import requests
import time

# IMPORTANT
# If you're running this container locally and you want to access the API via local browser, use http://172.17.0.2:5000/

# Starting flask app
app = Flask(__name__)

# general variables declaration
model_name = 'best_model.hdf5'
bucket_name = 'automatictrainingcicd-aiplatform'
global model

@app.before_first_request
def before_first_request():
    def initialize_job():
        if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
            tf.config.set_soft_device_placement(True)
            tf.debugging.set_log_device_placement(True)
        global model
        # Checking if there's any model saved at production on GCS
        model_gcs = data_utils.previous_model(bucket_name, model_name)
        # If a model exists at production, load it and use it for the API
        if model_gcs[0] == True:
            model_gcs = data_utils.load_model(bucket_name, model_name)
            if model_gcs[0] == True:
                try:
                    model = load_model(model_name)
                except Exception as e:
                    email_notifications.exception('Something went wrong when trying to load production model. Exception: ' + str(e))
                    sys.exit(1)
            else:
                email_notifications.exception('Something went wrong when trying to load production model. Exception: ' + str(model_gcs[1]))
                sys.exit(1)
        if model_gcs[0] == False:
            email_notifications.send_update('There are no artifacts at model registry. Check GCP for more information.')
            sys.exit(1)
        if model_gcs[0] == None:
            email_notifications.exception('Something went wrong when trying to check if production model exists. Exception: ' + model_gcs[1] + '. Aborting execution.')
            sys.exit(1)
    thread = threading.Thread(target=initialize_job)
    thread.start()

@app.route('/init', methods=['GET', 'POST'])
def init():
    message = {'message': 'API initialized.'}
    response = jsonpickle.encode(message)
    return Response(response=response, status=200, mimetype="application/json")

@app.route('/', methods=['POST'])
def index():
    if request.method == 'POST':
        try:
            # Converting string that contains image to uint8
            image = np.fromstring(request.data, np.uint8)
            image = image.reshape((128, 128, 3))
            image = [image]
            image = np.array(image)
            image = image.astype(np.float16)
            result = model.predict(image)
            result = np.argmax(result)
            message = {'message': '{}'.format(str(result))}
            json_response = jsonify(message)
            return json_response
        except Exception as e:
            message = {'message': 'Error'}
            json_response = jsonify(message)
            email_notifications.exception('Something went wrong when trying to make prediction via Production API. Exception: ' + str(e) + '. Aborting execution.')
            return json_response
    else:
        message = {'message': 'Error. Please use this API in a proper manner.'}
        json_response = jsonify(message)
        return json_response

def self_initialize():
    def initialization():
        global started
        started = False
        while started == False:
            try:
                server_response = requests.get('http://127.0.0.1:5000/init')
                if server_response.status_code == 200:
                    print('API has started successfully, quitting initialization job.')
                    started = True
            except:
                print('API has not started. Still attempting to initialize it.')
            time.sleep(3)
    thread = threading.Thread(target=initialization)
    thread.start()

if __name__ == '__main__':
    self_initialize()
    app.run(host='0.0.0.0', debug=True, threaded=True)
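A note on the initialization flow: self_initialize() launches a background thread that keeps polling the container's own /init endpoint until it answers with HTTP 200. That first internal request is what fires the before_first_request hook, so the production model is downloaded from GCS and loaded into memory as soon as the container starts, rather than when the first real prediction request arrives.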
data_utils.py
The data_utils.py file differs from its previous version only in the part that loads the model from the production registry. The differences are:
status = storage.Blob(bucket=bucket, name='{}/{}'.format('testing', model_filename)).exists(storage_client)

becomes

status = storage.Blob(bucket=bucket, name='{}/{}'.format('production', model_filename)).exists(storage_client)

and

blob1 = bucket.blob('{}/{}'.format('testing', model_filename))

becomes

blob1 = bucket.blob('{}/{}'.format('production', model_filename))
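For reference, here is a minimal sketch of what the two model-loading helpers in data_utils.py might look like after this change. The names previous_model and load_model and their (status, error) return tuples come from how task.py calls them; everything else, such as downloading the artifact to the working directory, is an assumption.

from google.cloud import storage

def previous_model(bucket_name, model_filename):
    # Check whether a model artifact exists under the 'production' prefix of the bucket.
    # Returns (True/False, None) on success, or (None, error_message) on failure.
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        status = storage.Blob(
            bucket=bucket,
            name='{}/{}'.format('production', model_filename)).exists(storage_client)
        return status, None
    except Exception as e:
        return None, str(e)

def load_model(bucket_name, model_filename):
    # Download the production model into the container's working directory.
    # Returns (True, None) on success, or (False, error_message) on failure.
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob1 = bucket.blob('{}/{}'.format('production', model_filename))
        blob1.download_to_filename(model_filename)
        return True, None
    except Exception as e:
        return False, str(e)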
Dockerfile
In our Dockerfile, replace

RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-UnitTesting.git

with

RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-PredictionAPI.git
Once the container is built and run locally, you should have a fully functional prediction service that accepts POST requests at http://172.17.0.2:5000/.
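As a quick sanity check, a small client like the following can exercise the endpoint. The image path cat.jpg is just a placeholder, and the container address is assumed to be the one above; the script resizes the image to the 128x128x3 shape the API expects and sends the raw uint8 bytes as the POST body.

import cv2
import numpy as np
import requests

API_URL = 'http://172.17.0.2:5000/'  # address of the locally running container

# Load a test image (placeholder path) and resize it to the shape the API expects.
image = cv2.imread('cat.jpg')
image = cv2.resize(image, (128, 128)).astype(np.uint8)

# The API rebuilds the array with np.fromstring(request.data, np.uint8),
# so we send the raw uint8 bytes as the request body.
response = requests.post(API_URL, data=image.tobytes())
print(response.json())  # e.g. {'message': '3'} -- the predicted class index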