# -*- coding: utf-8 -*-
"""Flask REST endpoints for the edge server: health checks, result-image
download, single-image detection and batch image analysis."""
import base64
import json
import os

from flask import Flask, request, Response

from image_detect import ImageDetect
from ibm_ai import ai_detect

app = Flask(__name__)

# Location of result images, relative to the working directory.  The
# backslash form is kept exactly as the original wrote it.
_IMAGE_PATH_TEMPLATE = r'.\web\images\{0}'

# Drive letter that replaces the client's drive letter in batch requests.
# NOTE(review): presumably the share where client images are mounted on the
# server -- confirm against the deployment.
_SERVER_DRIVE = 'X'


@app.route('/api/test', methods=['GET'])
def apitest():
    """Health check: return a fixed status message with HTTP 200."""
    return Response(response='Rest server is running...', status=200,
                    mimetype="application/json")


@app.route('/', methods=['GET'])
def test():
    """Health check on the root URL: same fixed message as ``/api/test``."""
    return Response(response='Rest server is running...', status=200,
                    mimetype="application/json")


# get the result image
@app.route("/api/get_jpg", methods=['POST'])
def get_jpg():
    """Return a stored result image as base64 text.

    Expects a JSON body with the key ``jpg_name``.

    Returns:
        A JSON string.  ``error`` is an empty string on success and a
        message otherwise; ``image`` (base64 text of the file content) is
        present only on success.
    """
    result_json = {"error": ""}

    # Tolerate a missing/invalid body or a missing key so that the
    # "jpg_name is null!" reply below is actually reachable.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    jpg_name = payload.get('jpg_name')

    if jpg_name is None or jpg_name == "":
        result_json["error"] = "jpg_name is null!"
        return json.dumps(result_json)

    jpg_path = _IMAGE_PATH_TEMPLATE.format(jpg_name)
    # The name comes from the client: refuse any ".." component (with either
    # separator) so the request cannot escape the image directory.
    if '..' in jpg_path.replace('\\', '/').split('/'):
        result_json["error"] = "invalid jpg_name"
    elif not os.path.isfile(jpg_path):
        result_json["error"] = "%s not found" % jpg_name
    else:
        with open(jpg_path, 'rb') as f:
            # b64encode returns bytes on Python 3, which json.dumps cannot
            # serialize; base64 output is pure ASCII, so decoding is safe.
            result_json["image"] = base64.b64encode(f.read()).decode('ascii')
    return json.dumps(result_json)


@app.route('/api/detect', methods=['POST'])
def detect():
    """Initialise an ``ImageDetect`` and run a prediction on posted data.

    Expects a JSON body with the keys ``args`` (serialized to JSON and passed
    to ``ImageDetect.init``), ``data`` and ``params`` (both passed to
    ``ImageDetect.do_pred_by_data``).

    Returns:
        A response whose body is whatever ``do_pred_by_data`` returned, with
        status 200 when ``init`` reported success and 400 otherwise.
    """
    body = request.json
    in_args = {
        'args': body['args'],
        'data': body['data'],
        'params': body['params'],
    }

    detector = ImageDetect()
    init_result = json.loads(detector.init(json.dumps(in_args['args'])))
    status = 200 if init_result['success'] else 400

    # NOTE(review): the prediction still runs when init failed and its output
    # is returned with the 400 status -- confirm this is intended rather than
    # returning the init result.
    result = detector.do_pred_by_data(in_args['data'], in_args['params'])
    return Response(response=result, status=status,
                    mimetype="application/json")


# AI analysis of jpgs:
# receives the image names in order to analyze local images;
# the results are saved in the AI directory of the local directory.
@app.route('/api/detect_jpgs', methods=['POST'])
def jzw_detect():
    """Run ``ai_detect`` over a set of images described by the request.

    Expects a JSON body with the keys ``json1`` and ``json2``.  ``json1``
    must contain ``func`` and may contain ``path``.  When ``path`` is
    present, the original value is kept under ``path_client`` and everything
    before its first ``:`` is replaced by the server drive letter.

    Returns:
        JSON ``{"success": bool, "count": int}``.  The status is 200 on
        success and 400 when ``ai_detect`` raised.
    """
    json1 = request.json['json1']
    json2 = request.json['json2']

    # The 'zzsb' function runs single-threaded; everything else uses three.
    thread_count = 1 if json1['func'] == 'zzsb' else 3

    # json2.update({'zoom_key':200,'um_per_pix':0.2744})
    print('json1:', json1)
    print('json2:', json2)
    print('thread_count:', thread_count)

    if 'path' in json1:  # dict.has_key() no longer exists on Python 3
        path = json1['path']
        json1['path_client'] = path
        pos = path.find(':')
        if pos > 0:
            path = _SERVER_DRIVE + path[pos:]
        json1['path'] = path

    status = 200
    result = {'success': True, 'count': 0}
    try:
        result['count'] = ai_detect(thread_count, json1, json2)
    except Exception as e:
        # Exceptions have no ``.message`` on Python 3; print the exception
        # itself so this handler cannot fail while reporting the error.
        print("exception:", e)
        status = 400
        result['success'] = False

    print('response:', result)
    return Response(response=json.dumps(result), status=status,
                    mimetype="application/json")


if __name__ == "__main__":
    print("OPTON VI Edge Server be started...")
    # NOTE(review): the launch call below is commented out, so running this
    # file directly only prints the banner above.
    # app.run(host="0.0.0.0", port=3000, debug=True)
    # print("OPTON VI Edge Server be shutdown")