#!/usr/bin/env python
# -*- coding=utf-8 -*-
import os
import numpy as np
import base64
import cv2
import json
import logging
import logging.config
import importlib
import time
import sys
import traceback
from os import path
import uuid

reload(sys)
sys.setdefaultencoding('utf8')

cdir = os.path.dirname(__file__)

# logging.basicConfig(filename=os.path.join(cdir, 'AIServer.log'), filemode='a',
#                     format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
#                     level=logging.DEBUG)
if sys.platform.find('linux') >= 0:
    logging.basicConfig(
        format='%(asctime)s [%(levelname)s] - %(message)s',
        level=logging.INFO)
else:
    with open(os.path.join(cdir, 'log.config'), 'rt') as f:
        lcnf = json.load(f)
        logging.config.dictConfig(lcnf)

vi_lib_dir = os.getenv('VI_LIB')
vi_opton_dir = os.getenv('VI_OPTON')
opton_pred_dir = os.path.join(vi_opton_dir, 'opton_pred_v2')
sys.path.append(vi_lib_dir)
sys.path.append(vi_opton_dir)
sys.path.append(opton_pred_dir)
logging.debug('vi_lib_dir: ' + vi_lib_dir)
logging.debug('vi_opton_dir: ' + vi_opton_dir)
logging.debug('opton_pred_dir: ' + opton_pred_dir)
logging.debug('sys.path: ' + str(sys.path))

idir = os.path.join(cdir, 'web', 'images')
if not os.path.exists(idir):
    os.makedirs(idir)
hdir = 'http://127.0.0.1:7999/images/'

from opton_core.const import *
from opton_core.font_util import get_font_scale_and_thick
from opton_core.read_image_scale import read_scale
import opton_core.translate_zzsb as translate_zzsb
import opton_core.translate_bzpj as translate_bzpj
from opton_pred_v2.config import get_task_models
from opton_pred_v2.pred import pred_zzsb_image
from opton_pred_v2.pred import pred_bzpj_image


class NumpyEncoder(json.JSONEncoder):
    """ Special json encoder for numpy types """
    def default(self, obj):
        if isinstance(obj, (np.int_, np.intc, np.intp, np.int8,
                            np.int16, np.int32, np.int64, np.uint8,
                            np.uint16, np.uint32, np.uint64)):
            return int(obj)
        elif isinstance(obj, (np.float_, np.float16, np.float32, np.float64)):
            return float(obj)
        elif isinstance(obj, (np.ndarray,)):
            return obj.tolist()
        return json.JSONEncoder.default(self, obj)


def get_image_scale(img_base64):
    im_str = base64.b64decode(img_base64)
    im_inp = np.fromstring(im_str, dtype=np.uint8)
    im_inp = cv2.imdecode(im_inp, 1)
    try:
        # read_scale may raise on images without a readable scale bar,
        # so call it inside the try block.
        zoom_key, um_per_pix, _ = read_scale(im_inp)
        return parseResult(True, {"zoom_key": zoom_key, "um_per_pix": round(um_per_pix, 4)})
    except Exception:
        logging.error(traceback.format_exc())
        return parseResult(False, {"msg": "Failed to get the scale, please check the image"})


def parseResult(success, raw_msg):
    smsg = ""
    try:
        json_msg = {"success": success, "content": raw_msg}
        logging.debug(json_msg)
        smsg = json.dumps(json_msg, ensure_ascii=False, cls=NumpyEncoder)
    except Exception as e:
        logging.error(e)
        json_msg = {"success": False, "content": "Failed to serialize the result."}
        smsg = json.dumps(json_msg, ensure_ascii=False)
    return smsg.decode("utf-8")


def check_json_format(raw_msg):
    try:
        logging.debug(raw_msg.encode("utf-8"))
        json_msg = json.loads(raw_msg.encode("utf-8"))
        return True, json_msg
    except ValueError:
        try:
            json_msg = json.loads(raw_msg)
            return True, json_msg
        except ValueError:
            return False, {}

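# A minimal sketch of the JSON envelope produced by parseResult() (assuming the
# caller passes a plain dict as raw_msg); every public entry point below returns
# a unicode string of this shape:
#
#   parseResult(True, {"msg": "ok"})
#   -> u'{"success": true, "content": {"msg": "ok"}}'
#
# If serialization fails, the envelope degrades to
#   u'{"success": false, "content": "Failed to serialize the result."}'
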
class ImageDetect:
    def __init__(self):
        logging.debug('ImageDetect class is initialised.')
        self.sfunc = ''
        self.scatagory = -1
        self.sstatus = -1
        self.smaginifiation = -1
        self.sitems = -1
        self.task = None

    def init(self, args):
        logging.debug('init function is called:' + args)
        args_ok, jargs = check_json_format(args)
        if not args_ok:
            logging.error("Invalid input parameter:" + args)
            return parseResult(False, {"msg": "Invalid input parameter"})
        try:
            sfunc = jargs['func']
            self.sfunc = sfunc
            if sfunc == 'zzsb':
                scatagory = jargs['catagory']
                sstatus = jargs['status']
                smaginifiation = jargs['magnification']
                logging.debug('zzsb init: ' + scatagory + ":" + sstatus + ":" + smaginifiation)
                self.scatagory = int(scatagory)
                self.sstatus = int(sstatus)
                self.smaginifiation = int(smaginifiation)
            elif sfunc == 'bzpj':
                scatagory = jargs['catagory']
                sitems = jargs['items']
                logging.debug('bzpj init: ' + scatagory + ":" + sitems)
                self.scatagory = int(scatagory)
                self.sitems = int(sitems)
                jargs.pop('catagory')
                jargs.pop('items')
                jargs.pop('func')
            else:
                logging.error('unsupported function: ' + sfunc)
                return parseResult(False, {"msg": "unsupported function"})
        except:
            logging.error(traceback.format_exc())
            return parseResult(False, {"msg": "Missing parameters"})
        self.times = 0
        return parseResult(True, {"msg": "Call init method success."})

    def get_vp(self, im_inp, args):
        h, w = im_inp.shape[:2]
        # default viewport: the whole rectangular image
        vp = {}
        vp['type'] = 'R'
        vp['x'] = 0
        vp['y'] = 0
        vp['w'] = w
        vp['h'] = h
        # a viewport passed by the caller overrides the default
        if 'viewport' in args:
            vp = args['viewport']
        return vp

    def overlay_im_out_on_im_inp(self, im_out, im_inp, args):
        if im_out is None:
            # nothing to overlay; keep the input image unchanged
            return im_inp
        h, w = im_inp.shape[:2]
        vp = self.get_vp(im_inp, args)
        x1 = vp['x']
        y1 = vp['y']
        x2 = vp['x'] + vp['w']
        y2 = vp['y'] + vp['h']
        color = (0, 0, 255)
        font = cv2.FONT_HERSHEY_SIMPLEX
        scale, thick = get_font_scale_and_thick(im_inp)
        # blur the whole input; the viewport region is replaced below
        im_inp = cv2.blur(im_inp, (30, 30))
        if vp['type'] == 'R':
            im_inp[y1:y2, x1:x2, :] = im_out
            # draw vp border
            # if vp['w'] < w or vp['h'] < h:
            #     cv2.rectangle(im_inp[y1:y2, x1:x2, :], (x1, y1), (x2, y2), color, thick)
        else:
            # circular viewport: keep only the inscribed circle of im_out
            im_out_fg_mask = np.zeros((vp['h'], vp['w']), np.uint8)
            cx = int(vp['w'] / 2)
            cy = int(vp['h'] / 2)
            r = int(min(vp['w'], vp['h']) / 2)
            cv2.circle(im_out_fg_mask, (cx, cy), r, (255, 255, 255), -1)
            im_out_fg = cv2.bitwise_and(im_out, im_out, mask=im_out_fg_mask)
            im_vp = im_inp[y1:y2, x1:x2, :]
            im_out_bg_mask = cv2.bitwise_not(im_out_fg_mask)
            im_out_bg = cv2.bitwise_and(im_vp, im_vp, mask=im_out_bg_mask)
            im_inp[y1:y2, x1:x2, :] = cv2.add(im_out_bg, im_out_fg)
            # draw vp border
            # cv2.circle(im_inp[y1:y2, x1:x2, :], (cx, cy), r, color, thick)
        return im_inp

    def draw_text(self, im_inp, text, x, y, color, shade=False):
        scale, thick = get_font_scale_and_thick(im_inp)
        if scale > 3:
            scale = 3
        font = cv2.FONT_HERSHEY_SIMPLEX
        offset = thick
        if shade:
            # draw a thick white layer first so the text stays readable
            # cv2.putText(im_inp, text, (x+offset, y+offset), font, scale, (255, 255, 255), thick)
            cv2.putText(im_inp, text, (x, y), font, scale, (255, 255, 255), thick * 4)
        cv2.putText(im_inp, text, (x, y), font, scale, color, thick)

    def draw_bbox(self, im_inp, bbox, color, shade=False):
        (x1, y1, x2, y2) = bbox
        scale, thick = get_font_scale_and_thick(im_inp)
        font = cv2.FONT_HERSHEY_SIMPLEX
        offset = thick
        if shade:
            # draw a thick white rectangle first so the box stays readable
            # cv2.rectangle(im_inp, (x1+offset, y1+offset), (x2+offset, y2+offset), (255, 255, 255), thick)
            cv2.rectangle(im_inp, (x1, y1), (x2, y2), (255, 255, 255), thick * 4)
        cv2.rectangle(im_inp, (x1, y1), (x2, y2), color, thick)

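    # A hedged summary of the optional `args` keys that this file itself reads
    # or forwards (the prediction backends may accept more); these are the same
    # keys exercised by the self-test in __main__:
    #   'viewport'      {'type': 'R'|'C', 'x': .., 'y': .., 'w': .., 'h': ..};
    #                   'R' keeps the rectangle, 'C' keeps the inscribed circle
    #   'draw_color_zz' list of class names whose area percentages are drawn (zzsb)
    #   'zoom_key' / 'um_per_pix'   override the scale read from the image
    #   'input'         list of manual annotations; see the examples in __main__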
    def do_pred_by_data(self, img_base64, args={}):
        logging.debug("call start to do_pred_by_data---->")
        im_str = base64.b64decode(img_base64)
        im_inp = np.fromstring(im_str, dtype=np.uint8)
        im_inp = cv2.imdecode(im_inp, 1)
        h, w = im_inp.shape[:2]
        vp = self.get_vp(im_inp, args)
        title = ''
        details = {}
        im_out = None
        zoom_key = ''
        um_per_pix = -1
        image_http = ''
        image_path = ''
        task_name = None
        cls_name = None
        font = cv2.FONT_HERSHEY_SIMPLEX
        scale, thick = get_font_scale_and_thick(im_inp)
        is_success = True
        try:
            if self.sfunc == 'zzsb':
                logging.debug("pred_zzsb_image ---->")
                title, details, im_out, (self.task, task_name, cls_name, zoom_key, um_per_pix) = \
                    pred_zzsb_image(im_inp, args)
                # translate names
                title = translate_zzsb.translate_title(title)
                details = translate_zzsb.translate_details(details)
                logging.debug("pred_zzsb_image title: " + title)
                # overlay im_out on im_inp
                im_inp = self.overlay_im_out_on_im_inp(im_out, im_inp, args)
                if 'draw_color_zz' in args and len(args['draw_color_zz']) > 0:
                    # draw percentage text
                    offset = scale * 40
                    x = 20
                    y = 20
                    # details: {'class_name': (percent, sample_bbox, dets, color), ...}
                    for k, v in details.items():
                        color = v[3]
                        if color is None:
                            # this class_name is not in the draw_color_zz list
                            continue
                        y = y + offset
                        if v[0] == 1.0:
                            text = '{}: 100%'.format(k)
                        else:
                            text = '{}: {:.2%}'.format(k, v[0])
                        self.draw_text(im_inp, text, x, y, color, True)
                else:
                    # draw sample bboxes on im_inp
                    color = (0, 0, 255)
                    for key, value in details.items():
                        # draw sample bbox
                        (x1, y1, x2, y2) = value[1]
                        bg = value[3]
                        if bg is None:
                            margin = 20
                            x1 = max(vp['x'] + x1 - margin, 0)
                            y1 = max(vp['y'] + y1 - margin, 0)
                            x2 = min(vp['x'] + x2 + margin, w)
                            y2 = min(vp['y'] + y2 + margin, h)
                            self.draw_bbox(im_inp, (x1, y1, x2, y2), color, True)
                            # draw bbox text
                            margin = 10
                            x = x1
                            y = y1 - margin
                            if y < margin:
                                y = y2 + margin * 3
                            self.draw_text(im_inp, key, x, y, color, True)
                        else:
                            self.draw_text(im_inp, key, vp['x'] + 30, vp['y'] + 100, color, True)
            elif self.sfunc == 'bzpj':
                logging.debug("pred_bzpj_image ---->")
                title, details, im_out, (self.task, task_name, cls_name, zoom_key, um_per_pix) = \
                    pred_bzpj_image(im_inp.copy(), self.scatagory, self.sitems, args)
                title = str(title)
                logging.debug("pred_bzpj_image title: " + title)
                # overlay im_out on im_inp
                im_inp = self.overlay_im_out_on_im_inp(im_out, im_inp, args)
                # details: {'results': [(text, color), ...], ...}
                if details is not None and 'results' in details:
                    offset = scale * 40
                    x = 20
                    y = 40 * scale
                    for (text, color) in details['results']:
                        size = cv2.getTextSize(text, cv2.FONT_HERSHEY_COMPLEX, 3, 2)
                        logging.debug("text size: %s" % str(size))
                        self.draw_text(im_inp, text, x, y, color, True)
                        x += size[0][0] + 20
            title_base64 = base64.b64encode(title)
            image_name = str(time.strftime('%Y%m%d_%H%M%S_', time.localtime())) + str(uuid.uuid1())[:8] + ".jpg"
            image_path = os.path.join(idir, image_name).decode("utf-8")
            # imencode expects the file extension, not the full path
            cv2.imencode('.jpg', im_inp)[1].tofile(image_path)
            image_http = (hdir + image_name).decode("utf-8")
        except:
            logging.error(traceback.format_exc())
            is_success = False
        detect = {
            "title": title,
            "details": details,
            "detect_type": self.sfunc,
            "viewport": vp,
            "additional": {
                "zoom_key": zoom_key,
                "um_per_pix": um_per_pix,
                "task_name": task_name,
                "cls_name": cls_name,
                "pred_image": image_http,
                "image_path": image_path
            }
        }
        jresult = parseResult(is_success, detect)
        logging.debug(jresult)
        logging.debug("call end of do_pred_by_data<----")
        return jresult

    # NOT USED for now
    def do_pred_by_file(self, param):
        logging.debug("call start to do_pred_by_file---->")
        logging.debug(param.encode("utf-8"))
        im_inp = self.task.load_image_file(param.encode("utf-8"))
        title, details = self.task.pred_img_data(im_inp)
        zoom_key, um_per_pix = self.task.read_image_scale(im_inp)
        title = translate_zzsb.translate_title(title)
        details = translate_zzsb.translate_details(details)
        detect = {"title": title, "details": details,
                  "additional": {"zoom_key": zoom_key, "um_per_pix": um_per_pix}}
        jresult = parseResult(True, detect)
        logging.debug(jresult)
        logging.debug("call end of do_pred_by_file<----")
        return jresult

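# A minimal usage sketch (mirroring the self-test below, and assuming a valid
# model environment): the caller configures the detector once with a JSON
# string, then posts base64-encoded JPEG data.
#
#   ai = ImageDetect()
#   ai.init('{"func": "zzsb", "catagory": "-1", "status": "-1", "magnification": "-1"}')
#   result = ai.do_pred_by_data(img_base64, {'draw_color_zz': ['p']})
#   # `result` is the JSON envelope described next to parseResult()
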
if __name__ == '__main__':
    jpg_name = sys.argv[1]
    cur_dir = os.getcwd()
    test_dir = os.path.join(cur_dir, 'test')
    jpg_file = os.path.join(test_dir, jpg_name + '.jpg')
    img = cv2.imread(jpg_file)
    x = 0
    y = 0
    h, w = img.shape[:2]
    r = 800
    cx = int(w / 2) - r
    cy = int(h / 2) - r
    cw = r * 2
    ch = r * 2
    img = cv2.imencode('.jpg', img)[1]
    img = base64.b64encode(img)

    ai = ImageDetect()
    params = {}

    # =================================================================
    # Used for demonstrating all supported custom parameters
    # =================================================================
    if jpg_name == 'z1_1':
        # ZZSB-a-alpha_f
        ai.init('{"func": "zzsb", "catagory": "-1", "status": "-1", "magnification": "-1"}')
        params = {
            # 'zoom_key': 500,
            # 'um_per_pix': 0.049,
            'draw_color_zz': ['alphaf'],
            # 'input': [
            #     {'shape_type': 'rect', 'shape_data': [(x1, y1), (x2, y2)], 'value': 'alphaf', 'action': 'add'},
            #     {'shape_type': 'rect', 'shape_data': [(x1, y1), (x2, y2)], 'value': '', 'action': 'delete'}
            # ]
        }
    if jpg_name == 'z1_2':
        # ZZSB-a-alpha_f
        ai.init('{"func": "zzsb", "catagory": "-1", "status": "-1", "magnification": "-1"}')
        params = {
            # 'zoom_key': 500,
            # 'um_per_pix': 0.049,
            'draw_color_zz': ['alphaf'],
            # 'input': [
            #     {'shape_type': 'rect', 'shape_data': [(x1, y1), (x2, y2)], 'value': 'alphaf', 'action': 'add'},
            #     {'shape_type': 'rect', 'shape_data': [(x1, y1), (x2, y2)], 'value': '', 'action': 'delete'}
            # ]
        }
    if jpg_name == 'z2':
        # ZZSB-a-f
        ai.init('{"func": "zzsb", "catagory": "-1", "status": "-1", "magnification": "-1"}')
        params = {'draw_color_zz': ['a', 'f']}
    if jpg_name == 'z3':
        # ZZSB-b-p-f
        ai.init('{"func": "zzsb", "catagory": "-1", "status": "-1", "magnification": "-1"}')
        params = {'draw_color_zz': ['f', 'p']}
    if jpg_name == 'z4':
        # ZZSB-f-p
        ai.init('{"func": "zzsb", "catagory": "-1", "status": "-1", "magnification": "-1"}')
        params = {'draw_color_zz': ['p']}
    if jpg_name == 'z5':
        # ZZSB-g_p-ps_c
        ai.init('{"func": "zzsb", "catagory": "-1", "status": "-1", "magnification": "-1"}')
        params = {'viewport': {'type': 'C', 'x': cx, 'y': cy, 'w': cw, 'h': ch}}
    if jpg_name == 'z6':
        # ZZSB-m-cem-retain_a
        ai.init('{"func": "zzsb", "catagory": "-1", "status": "-1", "magnification": "-1"}')
    if jpg_name == 'z7':
        # ZZSB-m
        ai.init('{"func": "zzsb", "catagory": "-1", "status": "-1", "magnification": "-1"}')
    if jpg_name == 'z8':
        # ZZSB-p-f
        ai.init('{"func": "zzsb", "catagory": "-1", "status": "-1", "magnification": "-1"}')
        params = {'draw_color_zz': ['p']}
    if jpg_name == 'z9':
        # ZZSB-p
        ai.init('{"func": "zzsb", "catagory": "-1", "status": "-1", "magnification": "-1"}')
        params = {'draw_color_zz': ['p']}
    if jpg_name == 'z10':
        # ZZSB-tempm-cem-retain_a
        ai.init('{"func": "zzsb", "catagory": "-1", "status": "-1", "magnification": "-1"}')
    if jpg_name == 'z11':
        # ZZSB-temps
        ai.init('{"func": "zzsb", "catagory": "-1", "status": "-1", "magnification": "-1"}')
        params = {'draw_color_zz': ['deltaf']}
    # ------------------------------------------------------------------------------
    if jpg_name == '14_1':
        # ZT_H-G_XZ
        ai.init('{"func":"bzpj", "catagory":"14", "items":"1"}')
        params = {
            # 'input': [
            #     {'shape_type': 'rect', 'shape_data': [(x1, y1), (x2, y2)], 'value': 'A', 'action': 'add'}
            # ]
        }
    if jpg_name == '14_2':
        # ZT_H-G_CD
        ai.init('{"func":"bzpj", "catagory":"14", "items":"2"}')
        params = {
            'zoom_key': 100,
            'um_per_pix': 0.407,
            'input': [
                {'shape_type': 'line', 'shape_data': [[50, 228], [478, 600]], 'action': 'add', 'value': ''},
                {'shape_type': 'line', 'shape_data': [(280, 1062), (746, 704)], 'action': 'add', 'value': ''},
                {'shape_type': 'line', 'shape_data': [(2144, 906), (1784, 1112)], 'action': 'add', 'value': ''}
            ]
        }
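    # The 'input' list above shows the manual-annotation format used by the
    # commented and live examples in this file: each entry carries a
    # 'shape_type' ('line' or 'rect'), 'shape_data' with two (x, y) points,
    # a 'value' label (possibly empty), and an 'action' of 'add' or 'delete'.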
    if jpg_name == '14_3':
        # ZT_H-P_SL
        ai.init('{"func":"bzpj", "catagory":"14", "items":"3"}')
        params = {'viewport': {'type': 'C', 'x': cx, 'y': cy, 'w': cw, 'h': ch}}
    if jpg_name == '14_5':
        # ZT_H-LGJ_SL
        ai.init('{"func":"bzpj", "catagory":"14", "items":"5"}')
        params = {'viewport': {'type': 'C', 'x': 496, 'y': 224, 'w': 1759, 'h': 1759}}
    if jpg_name == '14_6':
        # ZT_H-GJT_SL_OD
        ai.init('{"func":"bzpj", "catagory":"14", "items":"6"}')
        params = {'viewport': {'type': 'C', 'x': 377, 'y': 105, 'w': 1997, 'h': 1997}}
    if jpg_name == '14_10':
        # ZT_H-GJT_SL_CLS
        ai.init('{"func":"bzpj", "catagory":"14", "items":"10"}')
        params = {'viewport': {'type': 'C', 'x': 377, 'y': 105, 'w': 1997, 'h': 1997}}
    # ------------------------------------------------------------------------------
    if jpg_name == '15_1':
        # ZT_QM-G_QH
        ai.init('{"func":"bzpj", "catagory":"15", "items":"1"}')
        params = {'viewport': {'type': 'C', 'x': cx, 'y': cy, 'w': cw, 'h': ch}}
    if jpg_name == '15_2':
        # ZT_QM-G_DX
        ai.init('{"func":"bzpj", "catagory":"15", "items":"2"}')
        params = {'viewport': {'type': 'C', 'x': cx, 'y': cy, 'w': cw, 'h': ch}}
    if jpg_name == '15_3':
        # ZT_QM-P_SL
        ai.init('{"func":"bzpj", "catagory":"15", "items":"3"}')
        params = {'viewport': {'type': 'C', 'x': cx, 'y': cy, 'w': cw, 'h': ch}}
    if jpg_name == '15_4':
        # ZT_QM-F_SL
        ai.init('{"func":"bzpj", "catagory":"15", "items":"4"}')
        params = {'viewport': {'type': 'C', 'x': cx, 'y': cy, 'w': cw, 'h': ch}}
    if jpg_name == '15_5':
        # ZT_QM-LGJ_SL
        ai.init('{"func":"bzpj", "catagory":"15", "items":"5"}')
        params = {'viewport': {'type': 'C', 'x': cx, 'y': cy, 'w': cw, 'h': ch}}
    # ------------------------------------------------------------------------------
    if jpg_name == '16_1':
        # ZCG_GTG-QH
        ai.init('{"func":"bzpj", "catagory":"16", "items":"1"}')
        params = {'viewport': {'type': 'C', 'x': cx, 'y': cy, 'w': cw, 'h': ch}}
    if jpg_name == '16_2':
        # ZCG_GTG-WZ_T
        ai.init('{"func":"bzpj", "catagory":"16", "items":"2"}')
        params = {'viewport': {'type': 'C', 'x': cx, 'y': cy, 'w': cw, 'h': ch}}
    if jpg_name == '16_4':
        # ZCG_GTG-DZ
        ai.init('{"func":"bzpj", "catagory":"16", "items":"4"}')
        params = {
            'viewport': {'type': 'C', 'x': cx, 'y': cy, 'w': cw, 'h': ch},
            'ZCG_GTG-DZ': {'direction': 'v'}
        }
    # ------------------------------------------------------------------------------
    if jpg_name == '17_1':
        # YGXG-DZ_A
        ai.init('{"func":"bzpj", "catagory":"17", "items":"1"}')
        params = {'YGXG-DZ': {'direction': 'h'}}
    if jpg_name == '17_4':
        # YGXG-DZ_D
        ai.init('{"func":"bzpj", "catagory":"17", "items":"4"}')
        params = {'YGXG-DZ': {'direction': 'h'}}
    if jpg_name == '17_5':
        # YGXG-DZ_E
        ai.init('{"func":"bzpj", "catagory":"17", "items":"5"}')
        params = {
            'viewport': {'type': 'C', 'x': cx, 'y': cy, 'w': cw, 'h': ch},
            'YGXG-DZ': {'direction': 'v'}
        }
    # ------------------------------------------------------------------------------
    if jpg_name == '18_1':
        # YGB-JZW
        ai.init('{"func":"bzpj", "catagory":"18", "items":"1"}')
        params = {
            'YGB-JZW': {'output_cls_names': ['a', 'b', 'c', 'd', 'ds'], 'direction': 'h'},
            # 'input': [
            #     {'shape_type': 'line', 'shape_data': [(x1, y1), (x2, y2)], 'value': 'A', 'action': 'add'},
            #     {'shape_type': 'rect', 'shape_data': [(x1, y1), (x2, y2)], 'value': 'D', 'action': 'add'},
            #     {'shape_type': 'rect', 'shape_data': [(x1, y1), (x2, y2)], 'value': '', 'action': 'delete'}
            # ]
        }
    if jpg_name == '18_2':
        # YGB-JLD_OD
        ai.init('{"func":"bzpj", "catagory":"18", "items":"2"}')
    if jpg_name == '18_10':
        # YGB-JLD_CLS
        ai.init('{"func":"bzpj", "catagory":"18", "items":"10"}')
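    # Several bzpj demos pass a per-task options dict keyed by the task name
    # (e.g. 'ZCG_GTG-DZ', 'YGXG-DZ', 'YGB-JZW' above) with backend tuning
    # options such as 'direction' or 'output_cls_names'; these are forwarded
    # untouched to pred_bzpj_image via `args`.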
    if jpg_name == '18_3':
        # YGB-W
        ai.init('{"func":"bzpj", "catagory":"18", "items":"3"}')
        params = {
            'YBG-W': {'w_cls': 'wa'},
            'viewport': {'type': 'C', 'x': cx, 'y': cy, 'w': cw, 'h': ch}
        }

    raw_msg = ai.do_pred_by_data(img, params)
    json_msg = json.loads(raw_msg.encode("utf-8"))
    detect_type = json_msg['content']['detect_type']
    title = json_msg['content']['title']
    details = json_msg['content']['details']
    print('')
    print('{}: {}'.format(jpg_name, json_msg['success']))
    if detect_type == 'zzsb':
        print('{}'.format(title))
    elif detect_type == 'bzpj':
        print('{}'.format(details))
    else:
        print('invalid detect_type={}'.format(detect_type))
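    # Expected console output, roughly (values depend on the model and test image):
    #   z2: True
    #   <translated title string for zzsb, or the details dict for bzpj>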