| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586 | # -*- coding: utf-8 -*-"""This is an implementation of SEM API WrapperHow to use:    The SEM_API class provides a context manager. It is highly recommended to useit through _with_ expressionwith SEM_API(state='remote') as sem:    do_something(sem)The SEM_API class must be initialized by specifying if the EM Server is runningremotely or locally (state = 'remote' or 'local'). This will influence the behavior of how the real time image interface behaves.The interface provides following functions:    1. Functions derived from SmartSEM API:        GetValue(AP_name, style='float')    SetValue(AP_name, value)    GetValueMin(AP_name)    GetValueMax(AP_name)    GetState(DP_name, style='string')    SetState(DP_name, value)    Execute(CMD_name)    GetStagePosition()    MoveStage(coord)    Grab(self, fname, left = 0.0, right = 1.0, top = 0.0, bottom = 1.0, overlay = False)2.  Interface to get real time image. The real time image will be updated in    following properties:    img_array           If running remotely, it provides uint8 array of current image using the current         pixel store. If running locally, it provides uint16 array instead.    img_array_reduced           Only present if running locally. It provides uint8 array of current image in         1024x768 pixel store.                            The following commands control the start, pause and terminate of continuous    image acquisition:        UpdateImage_Start()        UpdateImage_Pause()        UpdateImage_Terminate()    The update rate can be set through the following property:        update_rate              possible values from 0.01 to 10    3.  Often used high level function:    wait_for_stage_idle()        Wait till the stage arrives in destination.    grab_full_image(fname)        Freeze and wait till scan finished to capture a full image.    set_scan_speed(scan_speed)        Directly set certain scan speed.    set_scan_mode(scan_mode)        Directly set the noise reduction mode of scan.    do_autofocus()        Peform autofocus, block the thread till it finishes.    4.  Event interface:    The event interface is implemented as "bag of handler function". The handler function is    called as:    handler(ParameterName, Reason, ParameterID, LastKnownValue)    The following function and attributes are used to control the event interface:    Add_Event(func)        Add func in the bag of handler.    Remove_Event(func)        Remove func from the bag of handler.    Set_Notify(PARAM)        Start monitoring PARAM, trigger the event when PARAM is changed.    Unset_Notify(PARAM)        Stop monitoring PARAM.    event_time        Set the time interval in second to check event trigger. Default is 0.1 S.Version: 0.3Author: Luyang HanChangLog:v0.304.09.2019 : Add event interface """import numpy as npimport syssys.coinit_flags = 0from win32com import clientimport pythoncomimport threading, time, tempfile, mmap, warningstry:    from imageio import imreadexcept:    try:        from scipy.misc import imread    except:        try:            from skimage.io import imread        except:            print("Cannot find the module with imread. Please install one of the following package:\n imageio, skimage or scipy.")# following changes are necessary for event to workfrom win32com.client import makepysys.argv = ["makepy", r"CZ.EmApiCtrl.1"]makepy.main()_excpetion_dict = \{1000: ('API_E_GET_TRANSLATE_FAIL',  'Failed to translate parameter into an id'), 1001: ('API_E_GET_AP_FAIL', 'Failed to get analogue value'), 1002: ('API_E_GET_DP_FAIL', 'Failed to get digital value'), 1003: ('API_E_GET_BAD_PARAMETER',  'Parameter supplied is not analogue nor digital'), 1004: ('API_E_SET_TRANSLATE_FAIL',  'Failed to translate parameter into an id'), 1005: ('API_E_SET_STATE_FAIL', 'Failed to set a digital state'), 1006: ('API_E_SET_FLOAT_FAIL', 'Failed to set a float value'), 1007: ('API_E_SET_FLOAT_LIMIT_LOW', 'Value supplied is too low'), 1008: ('API_E_SET_FLOAT_LIMIT_HIGH', 'Value supplied is too high'), 1009: ('API_E_SET_BAD_VALUE', 'Value supplied is is of wrong type'), 1010: ('API_E_SET_BAD_PARAMETER',  'Parameter supplied is not analogue nor digital'), 1011: ('API_E_EXEC_TRANSLATE_FAIL', 'Failed to translate command into an id'), 1012: ('API_E_EXEC_CMD_FAIL', 'Failed to execute command'), 1013: ('API_E_EXEC_MCF_FAIL', 'Failed to execute file macro'), 1014: ('API_E_EXEC_MCL_FAIL', 'Failed to execute library macro'), 1015: ('API_E_EXEC_BAD_COMMAND', 'Command supplied is not implemented'), 1016: ('API_E_GRAB_FAIL', 'Grab command failed'), 1017: ('API_E_GET_STAGE_FAIL', 'Get Stage position failed'), 1018: ('API_E_MOVE_STAGE_FAIL', 'Move Stage position failed'), 1019: ('API_E_NOT_INITIALISED', 'API not initialised'), 1020: ('API_E_NOTIFY_TRANSLATE_FAIL',  'Failed to translate parameter to an id'), 1021: ('API_E_NOTIFY_SET_FAIL', 'Set notification failed'), 1022: ('API_E_GET_LIMITS_FAIL', 'Get limits failed'), 1023: ('API_E_GET_MULTI_FAIL', 'Get multiple parameters failed'), 1024: ('API_E_SET_MULTI_FAIL', 'Set multiple parameters failed'), 1025: ('API_E_NOT_LICENSED', 'Missing API license'), 1026: ('API_E_NOT_IMPLEMENTED', 'Reserved or not implemented'), 1027: ('API_E_GET_USER_NAME_FAIL',  'Failed to get user name (Remoting Interface only)'), 1028: ('API_E_GET_USER_IDLE_FAIL',  'Failed to get user idle state (Remoting Interface only)'), 1029: ('API_E_GET_LAST_REMOTING_CONNECT_ERROR_FAIL',  'Failed to get the last remoting connection error string (Remoting Interface Only)'), 1030: ('API_E_EMSERVER_LOGON_FAILED',  'Failed to remotely logon to the EM Server (username and password may be incorrect or EM Server is not running or User is already logged on, Remoting only)'), 1031: ('API_E_EMSERVER_START_FAILED',  'Failed to start the EM Server. This may be because the Server is already running or has an internal error (Remoting Interface only)'), 1032: ('API_E_PARAMETER_IS_DISABLED',  'The command or parameter is currently disabled (you cannot execute or set it. Remoting Interface only)'), 2027: ('API_E_REMOTING_NOT_CONFIGURED',  'Remoting incorrectly configured, use RConfigure to correct'), 2028: ('API_E_REMOTING_FAILED_TO_CONNECT',  'Remoting did not connect to the server'), 2029: ('API_E_REMOTING_COULD_NOT_CREATE_INTERFACE',  'Remoting could not start (unknown reason)'), 2030: ('API_E_REMOTING_EMSERVER_NOT_RUNNING',  'Remoting: Remote server is not running currently'), 2031: ('API_E_REMOTING_NO_USER_LOGGED_IN',  'Remoting: Remote server has no user logged in')} # class to handle exceptionsclass API_ERROR(Exception):    def __init__(self, error_code):        global _excpetion_dict        error_text = ('\n').join(_excpetion_dict[error_code])        raise Exception(error_text)# periodically run functionclass SwitchThread(threading.Thread):    """    class to run a function repeatedly in the background.    The delay between each run is given as delay in s.    The thread can be paused and also ended.    """    def __init__(self, target=None, name=None, args=(), kwargs={}, delay = 1):        super(SwitchThread, self).__init__(group=None, target=target, name=name, args=(), kwargs={})        self.target = target        self.args = args        self.kwargs = kwargs        self.delay = delay        self.enable = True        self.stop = False        return    def run(self):        while not self.stop:            if self.enable:                self.target(*self.args, **self.kwargs)            time.sleep(self.delay)        def pause(self):        self.enable = False      def resume(self):        self.enable = True    def terminate(self):        self.stop = TrueEventBase = client.getevents("CZ.EMApiCtrl.1")class SEM_Handle(EventBase):    def __init__(self, sem):        self.subscribers = []        super().__init__(sem)    def OnNotifyWithCurrentValue(self, lpszParameter, Reason, paramid, dLastKnownValue):        for sub in self.subscribers:            sub (lpszParameter, Reason, paramid, dLastKnownValue)# EventBase = client.getevents("CZ.EMApiCtrl.1")# class SEM_Handle(EventBase):#     def OnNotifyWithCurrentValue(self, lpszParameter, Reason, paramid, dLastKnownValue):#         print("Reason:%d. The parameter %s changed to %d." %(Reason, lpszParameter, dLastKnownValue))class SEM_API:    """    Python Wrapper to SEM API interface        The class provide a context manager to do some house keeping, it is advised    to use with statement.        with SEM_API(state='remote') as sem:        do_something(sem)        """        def __init__(self, state='local'):        """        Function to initialize the API interface.        This function uses the long InitialiseRemote(void) command.                """        self.mic = client.Dispatch('CZ.EMApiCtrl.1')        if not (state == "remote" or state == "local"):            raise ValueError("state is either remote or local")        self.__state = state        # first initialize        res = self.mic.InitialiseRemoting()        if res == 0:            # prepare the background working            # prepare the MMF for local operation            if self.__state == "local":                self.__pymap = mmap.mmap(-1, 1024*768+1064,"Capture0.MMF",mmap.ACCESS_READ)            time.sleep(0.1)            self.__background_worker = SwitchThread(target=self.__update_image, delay = 1.0)            self.__background_worker.start()            self.__background_worker.pause()            time.sleep(0.1)            # self.subs = []            # self.event = SEM_Handle(self.mic, self.subs)            self.event_time = 0.1            self.event = SEM_Handle(self.mic)            self.__event_stop = False            self.__event_thread = threading.Thread(target = self.__pump)            self.__event_thread.start()        else:            raise API_ERROR(res)        # decorator for the wrapper function    def __error_handling(func):        def func_wrapper(*arg, **kwargs):            res = func(*arg, **kwargs)            if type(res) == int:                return_code = res                if return_code != 0:                    raise API_ERROR(return_code)            elif type(res) == tuple:                return_code = res[0]                result = res[1]                if return_code == 0:                    return result                else:                    raise API_ERROR(return_code)        return func_wrapper    # function to handle the event        def __pump(self):        pythoncom.CoInitializeEx(pythoncom.COINIT_MULTITHREADED)        while self.__event_stop is not True:            pythoncom.PumpWaitingMessages()            time.sleep(self.event_time)        pythoncom.CoUninitialize()    def Add_Event(self, func):        self.event.subscribers.append(func)    def Remove_Event(self, func):        self.event.subscribers.remove(func)    @__error_handling    def Set_Notify(self, PARAM):        res = self.mic.SetNotify(PARAM, True)        return res    @__error_handling    def Unset_Notify(self, PARAM):        res = self.mic.SetNotify(PARAM, False)        return res    # some variables to hold the data    __fname = tempfile.TemporaryFile(suffix='.bmp').name        # function to update the image file    def __update_image(self):        if self.__state == 'remote':            # when working remote, write the image to a bmp file and read back.            self.mic.Grab(0,0,1024,768,0,self.__fname)            self.img_array = imread(self.__fname)        elif self.__state == 'local':            # when working local, write the image to mmap file and read back.            # as each time the size of the frame can change, we create a mmap            # every time on the fly to grab the image            pixel_density = tuple(map(int,self.GetState("DP_IMAGE_STORE").split("*")))            self.mic.Grab(0,0,1024,768,0,"CZ.MMF")            py_map = mmap.mmap(-1, pixel_density[0]*pixel_density[1]*2,"CZ.MMF",mmap.ACCESS_READ)            self.img_array = np.copy(np.frombuffer(py_map, dtype="uint16").reshape((pixel_density[1],pixel_density[0])))            py_map.close()            # update local image            self.img_array_reduce = np.flip(np.frombuffer(self.__pymap, dtype="uint8", offset=1064).reshape((768,1024)), axis=0)        return        @property    def update_rate(self):        return self.__background_worker.delay        @update_rate.setter    def update_rate(self, value):        if value < 0.01:            value = 0.01            warnings.warn("Update delay cannot be smaller than 10 ms.")        elif value > 10:            value = 10            warnings.warn("Update delay cannot be larger than 10 s.")        self.__background_worker.delay = value       # function for AP    @__error_handling    def GetValue(self, AP_name, style='float'):        """        function to get an analog value.        style: float or string        float returns a floating number, string returns a string representation with unit.        """        if style == 'float':            buffer = 0.0            res = self.mic.Get(AP_name, buffer)            return res        elif style == 'string':            buffer = ''            res = self.mic.Get(AP_name, buffer)            return res        else:            raise AttributeError("style = float or string")                @__error_handling    def GetValueMin(self, AP_name):        buffer = 0.0        res = self.mic.GetLimits(AP_name, buffer, buffer)        return (res[0], res[1])        @__error_handling    def GetValueMax(self, AP_name):        buffer = 0.0        res = self.mic.GetLimits(AP_name, buffer, buffer)        return (res[0], res[2])            @__error_handling    def SetValue(self, AP_name, value):        """        function to set an analog value.        The function can take either floating point or a formatted string.        """        if type(value) == str:            pass        else:            value = float(value)        res = self.mic.Set(AP_name, value)        return res        # function for DP        @__error_handling    def GetState(self, DP_name, style='string'):        """        function to get an digital value.        style: float or string        int returns a integer number, string returns a string representation.        """        if style == 'int':            buffer = 0            res = self.mic.Get(DP_name, buffer)            return res        elif style == 'string':            buffer = ''            res = self.mic.Get(DP_name, buffer)            return res        else:            raise AttributeError("style = int or string")                @__error_handling    def SetState(self, DP_name, value):        """        function to set a digital state.        The function can take either integer or a formatted string.        """        if type(value) == str:            pass        else:            value = int(value)        res = self.mic.Set(DP_name, value)        return res        # function for CMD    @__error_handling    def Execute(self, CMD_name):        res = self.mic.Execute(CMD_name)        return res        # function about stage    @__error_handling    def GetStagePosition(self):        buffer = 0.0        res = self.mic.GetStagePosition(buffer, buffer, buffer, buffer, buffer, buffer)        return (res[0], res[1:])        @__error_handling    def MoveStage(self, coord):        if len(coord) == 6:            res = self.mic.MoveStage(*coord)            return res        else:            raise IndexError("coord must be a list or tuple of 6 floating number")                # function about grab image    @__error_handling        def Grab(self, fname, X = 0, Y = 0, W = 1024, H = 768, overlay = False):        """        The function to grab the current image.        left, right, top, button defines the desired size of the imaged, express as the         coordinate relative to the full image.        For example, if 10% of the edge should be excluded, the coordinate should be            left = 0.1, right = 0.9, top = 0,1, button = 0.9        overlay determines whether the image overlay such as datazone should be included.        """        #X = int(left * 1024)        #Y = int(top * 768)        #W = int((right-left)*1024)        #H = int((bottom-top)*768)        if overlay:            res = self.mic.Grab(X,Y,W,H,-1,fname)        else:            res = self.mic.Grab(X,Y,W,H,0,fname)        return res        # parameter and functions dealing with the memory map file and real time imaging.    # implementation pending    # The real time image access only works with local client.        # 2 different interface are supplied, the img_array gives the real time image with    # current pixel density in 16 bit grey. the image_array_reduced gives 8 bit grey    # image of image reduced to 1024x768    # the value should be updated regularly by a seperate thread in the background.    img_array = np.zeros((768,1024))    img_array_reduce = np.zeros((768,1024))    # img_array = np.zeros((768,1024)).astype('uint16')    def UpdateImage_Start(self):        """        function to start continuous updating the imaging array        """        self.__background_worker.resume()        def UpdateImage_Pause(self):        """        function to pause continuous updating the imaging array        """        self.__background_worker.pause()        # some typically used higher level functions    # implementation pending    def move_stage_relative_xy(self, dx, dy):        pos = self.GetStagePosition()        self.MoveStage((pos[0] + dx, pos[1]+dy, pos[2],pos[3],pos[4],pos[5]))        def wait_for_stage_idle(self):        """        wait for the stage movement to finish.        """        while not self.GetState("DP_STAGE_IS") == "Idle":            time.sleep(0.1)                def set_scan_speed(self, DP_Param):        c_dict = {            "0":"CMD_SCANRATE0",            "1":"CMD_SCANRATE1",            "2":"CMD_SCANRATE2",            "3":"CMD_SCANRATE3",            "4":"CMD_SCANRATE4",            "5":"CMD_SCANRATE5",            "6":"CMD_SCANRATE6",            "7":"CMD_SCANRATE7",            "8":"CMD_SCANRATE8",            "9":"CMD_SCANRATE9",            "10":"CMD_SCANRATE10",            "11":"CMD_SCANRATE11",            "12":"CMD_SCANRATE12",            "13":"CMD_SCANRATE13",            "14":"CMD_SCANRATE14",            "15":"CMD_SCANRATE15"        }        self.Execute(c_dict[str(DP_Param)])            def set_scan_mode(self, DP_Param):        c_dict = {            "Pixel Avg.":"CMD_PIXNR",            "Frame Avg":"CMD_FRAME_AVERAGE",            "Frame Int. Busy":"CMD_FRAME_INT",            "Frame Int. Done":"CMD_FRAME_INT",            "Line Avg":"CMD_LINE_AVG",            "Line Int. Busy":"CMD_LINE_INT",            "Line Int. Done":"CMD_LINE_INT",            "Continuous Avg.":"CMD_CONTINUOUS_AVG",            "Drift Comp. Frame Int. Busy":"CMD_DC_FRAME_INT",            "Drift Comp. Frame Int. Done":"CMD_DC_FRAME_INT",            "Drift Comp. Frame Avg.":"CMD_DC_FRAME_AVG"        }        self.Execute(c_dict[DP_Param])        def do_autofocus(self):        self.SetState("DP_AUTOFOCUS_VERSION", "Line Scan based Autofocus Version")        time.sleep(0.5)        self.Execute("CMD_AUTO_FOCUS_FINE")        time.sleep(2)        while self.GetState("DP_AUTO_FN_STATUS") == "Busy":            time.sleep(0.1)            def grab_full_image(self, fname, overlay = False):        """        grab the current full image by restarting the scan and till the complete        frame is finished, and then save the image.        """        self.SetState("DP_FREEZE_ON", "End Frame")        self.Execute("CMD_UNFREEZE_ALL")        time.sleep(0.1)        #self.Execute("CMD_MODE_NORMAL")        #time.sleep(0.1)        if self.GetState("DP_NOISE_REDUCTION") in ("Frame Avg","Line Avg",\                        "Pixel Avg.", "Continuous Avg.", "Drift Comp. Frame Avg."):            time.sleep(0.1)            self.Execute("CMD_FREEZE_ALL")        while self.GetState("DP_FROZEN") == "Live":            time.sleep(0.1)        self.Grab(fname, overlay = overlay)        # context interface    # it is important to use this class through with statement    # the exit function will properly clean up the wrapper.    def __enter__(self):        return self        def __exit__(self, *arg):        self.__background_worker.terminate()        self.__background_worker.join()        # stop event interface        self.__event_stop = True        self.__event_thread.join()        self.event.close()        del self.event        # clear the MMF        if self.__state == 'local':            self.__pymap.close()            self.mic.Grab(0,0,0,0,0,"CZ.MMF")        self.mic.ClosingControl()    
 |