123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586 |
- # -*- coding: utf-8 -*-
- """
- This is an implementation of SEM API Wrapper
- How to use:
-
- The SEM_API class provides a context manager. It is highly recommended to use
- it through _with_ expression
- with SEM_API(state='remote') as sem:
- do_something(sem)
- The SEM_API class must be initialized by specifying if the EM Server is running
- remotely or locally (state = 'remote' or 'local'). This will influence the
- behavior of how the real time image interface behaves.
- The interface provides following functions:
-
- 1. Functions derived from SmartSEM API:
-
- GetValue(AP_name, style='float')
- SetValue(AP_name, value)
- GetValueMin(AP_name)
- GetValueMax(AP_name)
- GetState(DP_name, style='string')
- SetState(DP_name, value)
- Execute(CMD_name)
- GetStagePosition()
- MoveStage(coord)
- Grab(fname, X = 0, Y = 0, W = 1024, H = 768, overlay = False)
- 2. Interface to get real time image. The real time image will be updated in
- following properties:
- img_array
- If running remotely, it provides uint8 array of current image using the current
- pixel store. If running locally, it provides uint16 array instead.
- img_array_reduced
- Only present if running locally. It provides uint8 array of current image in
- 1024x768 pixel store.
-
- The following commands control the start, pause and terminate of continuous
- image acquisition:
- UpdateImage_Start()
- UpdateImage_Pause()
- UpdateImage_Terminate()
- The update rate can be set through the following property:
- update_rate
- possible values from 0.01 to 10
-
- 3. Often used high level function:
- wait_for_stage_idle()
- Wait till the stage arrives in destination.
- grab_full_image(fname)
- Freeze and wait till scan finished to capture a full image.
- set_scan_speed(scan_speed)
- Directly set certain scan speed.
- set_scan_mode(scan_mode)
- Directly set the noise reduction mode of scan.
- do_autofocus()
- Perform autofocus, block the thread till it finishes.
-
- 4. Event interface:
- The event interface is implemented as "bag of handler function". The handler function is
- called as:
- handler(ParameterName, Reason, ParameterID, LastKnownValue)
- The following function and attributes are used to control the event interface:
- Add_Event(func)
- Add func in the bag of handler.
- Remove_Event(func)
- Remove func from the bag of handler.
- Set_Notify(PARAM)
- Start monitoring PARAM, trigger the event when PARAM is changed.
- Unset_Notify(PARAM)
- Stop monitoring PARAM.
- event_time
- Set the time interval in second to check event trigger. Default is 0.1 S.
- Version: 0.3
- Author: Luyang Han
- ChangeLog:
- v0.3
- 04.09.2019 : Add event interface
- """
- import numpy as np
- import sys
- sys.coinit_flags = 0
- from win32com import client
- import pythoncom
- import threading, time, tempfile, mmap, warnings
- try:
- from imageio import imread
- except:
- try:
- from scipy.misc import imread
- except:
- try:
- from skimage.io import imread
- except:
- print("Cannot find the module with imread. Please install one of the following package:\n imageio, skimage or scipy.")
# The following is necessary for the event interface to work: run makepy for
# the EM API control at import time.  makepy.main() takes its arguments from
# sys.argv, so argv is swapped in temporarily and the caller's command line is
# restored afterwards instead of being left overwritten.
from win32com.client import makepy
_saved_argv = sys.argv
sys.argv = ["makepy", r"CZ.EmApiCtrl.1"]
try:
    makepy.main()
finally:
    sys.argv = _saved_argv
    del _saved_argv
# Map of API return code -> (symbolic name, human readable description).
# NOTE: the (misspelled) name is kept as-is because other code may refer to it.
_excpetion_dict = \
{1000: ('API_E_GET_TRANSLATE_FAIL',
        'Failed to translate parameter into an id'),
 1001: ('API_E_GET_AP_FAIL', 'Failed to get analogue value'),
 1002: ('API_E_GET_DP_FAIL', 'Failed to get digital value'),
 1003: ('API_E_GET_BAD_PARAMETER',
        'Parameter supplied is not analogue nor digital'),
 1004: ('API_E_SET_TRANSLATE_FAIL',
        'Failed to translate parameter into an id'),
 1005: ('API_E_SET_STATE_FAIL', 'Failed to set a digital state'),
 1006: ('API_E_SET_FLOAT_FAIL', 'Failed to set a float value'),
 1007: ('API_E_SET_FLOAT_LIMIT_LOW', 'Value supplied is too low'),
 1008: ('API_E_SET_FLOAT_LIMIT_HIGH', 'Value supplied is too high'),
 1009: ('API_E_SET_BAD_VALUE', 'Value supplied is is of wrong type'),
 1010: ('API_E_SET_BAD_PARAMETER',
        'Parameter supplied is not analogue nor digital'),
 1011: ('API_E_EXEC_TRANSLATE_FAIL', 'Failed to translate command into an id'),
 1012: ('API_E_EXEC_CMD_FAIL', 'Failed to execute command'),
 1013: ('API_E_EXEC_MCF_FAIL', 'Failed to execute file macro'),
 1014: ('API_E_EXEC_MCL_FAIL', 'Failed to execute library macro'),
 1015: ('API_E_EXEC_BAD_COMMAND', 'Command supplied is not implemented'),
 1016: ('API_E_GRAB_FAIL', 'Grab command failed'),
 1017: ('API_E_GET_STAGE_FAIL', 'Get Stage position failed'),
 1018: ('API_E_MOVE_STAGE_FAIL', 'Move Stage position failed'),
 1019: ('API_E_NOT_INITIALISED', 'API not initialised'),
 1020: ('API_E_NOTIFY_TRANSLATE_FAIL',
        'Failed to translate parameter to an id'),
 1021: ('API_E_NOTIFY_SET_FAIL', 'Set notification failed'),
 1022: ('API_E_GET_LIMITS_FAIL', 'Get limits failed'),
 1023: ('API_E_GET_MULTI_FAIL', 'Get multiple parameters failed'),
 1024: ('API_E_SET_MULTI_FAIL', 'Set multiple parameters failed'),
 1025: ('API_E_NOT_LICENSED', 'Missing API license'),
 1026: ('API_E_NOT_IMPLEMENTED', 'Reserved or not implemented'),
 1027: ('API_E_GET_USER_NAME_FAIL',
        'Failed to get user name (Remoting Interface only)'),
 1028: ('API_E_GET_USER_IDLE_FAIL',
        'Failed to get user idle state (Remoting Interface only)'),
 1029: ('API_E_GET_LAST_REMOTING_CONNECT_ERROR_FAIL',
        'Failed to get the last remoting connection error string (Remoting Interface Only)'),
 1030: ('API_E_EMSERVER_LOGON_FAILED',
        'Failed to remotely logon to the EM Server (username and password may be incorrect or EM Server is not running or User is already logged on, Remoting only)'),
 1031: ('API_E_EMSERVER_START_FAILED',
        'Failed to start the EM Server. This may be because the Server is already running or has an internal error (Remoting Interface only)'),
 1032: ('API_E_PARAMETER_IS_DISABLED',
        'The command or parameter is currently disabled (you cannot execute or set it. Remoting Interface only)'),
 2027: ('API_E_REMOTING_NOT_CONFIGURED',
        'Remoting incorrectly configured, use RConfigure to correct'),
 2028: ('API_E_REMOTING_FAILED_TO_CONNECT',
        'Remoting did not connect to the server'),
 2029: ('API_E_REMOTING_COULD_NOT_CREATE_INTERFACE',
        'Remoting could not start (unknown reason)'),
 2030: ('API_E_REMOTING_EMSERVER_NOT_RUNNING',
        'Remoting: Remote server is not running currently'),
 2031: ('API_E_REMOTING_NO_USER_LOGGED_IN',
        'Remoting: Remote server has no user logged in')}


# class to handle exceptions
class API_ERROR(Exception):
    """
    Exception raised when an API call reports a non-zero return code.

    The message is the symbolic name and the description from
    _excpetion_dict joined by a newline.  The pieces are also available as
    attributes:
    error_code
        The return code the exception was created with.
    error_name
        Symbolic name of the code ('API_E_UNKNOWN' for codes not in the table).
    error_description
        Human readable description of the code.
    """

    def __init__(self, error_code):
        # Codes missing from the table must still produce an API_ERROR rather
        # than a KeyError from inside the constructor.
        name, description = _excpetion_dict.get(
            error_code,
            ('API_E_UNKNOWN', 'Unknown API error code %r' % (error_code,)))
        self.error_code = error_code
        self.error_name = name
        self.error_description = description
        # Initialise the exception itself; raising from within __init__ would
        # replace this exception type by a plain Exception.
        super().__init__('\n'.join((name, description)))
- # periodically run function
class SwitchThread(threading.Thread):
    """
    Thread that runs a function repeatedly in the background.

    target, args, kwargs
        The callable and the arguments it is called with on every cycle.
    delay
        Waiting time in seconds between two cycles.
    The thread can be paused (pause / resume) and ended (terminate).
    The flags are also exposed as the attributes enable and stop.
    """

    def __init__(self, target=None, name=None, args=(), kwargs=None, delay = 1):
        super(SwitchThread, self).__init__(group=None, target=target, name=name)
        self.target = target
        self.args = args
        # Use a fresh dict per instance instead of a shared mutable default.
        self.kwargs = {} if kwargs is None else kwargs
        self.delay = delay
        self.enable = True
        self.stop = False
        # Set by terminate() so that a waiting thread wakes up immediately
        # instead of sitting out the rest of the delay.
        self._wake = threading.Event()

    def run(self):
        """Call target every delay seconds until the thread is terminated."""
        while not self.stop:
            if self.enable:
                self.target(*self.args, **self.kwargs)
            # Interruptible replacement for time.sleep(self.delay).
            self._wake.wait(self.delay)

    def pause(self):
        """Keep the thread alive but skip calling target."""
        self.enable = False

    def resume(self):
        """Call target again on the following cycles."""
        self.enable = True

    def terminate(self):
        """Leave the loop; the thread ends after the cycle in progress."""
        self.stop = True
        self._wake.set()
# Event base class obtained from win32com for the EM API control.
EventBase = client.getevents("CZ.EMApiCtrl.1")


class SEM_Handle(EventBase):
    """
    Event sink that forwards notifications to a list of handler functions.

    subscribers
        List of callables; each one is called as
        handler(ParameterName, Reason, ParameterID, LastKnownValue).
    """

    def __init__(self, sem):
        # Create the list before the base class is initialised, as the
        # original code does.
        self.subscribers = []
        super().__init__(sem)

    def OnNotifyWithCurrentValue(self, lpszParameter, Reason, paramid, dLastKnownValue):
        # Iterate over a snapshot so that a handler which unsubscribes (or a
        # concurrent change of the list) does not make the loop skip entries.
        for sub in list(self.subscribers):
            sub(lpszParameter, Reason, paramid, dLastKnownValue)
- # EventBase = client.getevents("CZ.EMApiCtrl.1")
- # class SEM_Handle(EventBase):
- # def OnNotifyWithCurrentValue(self, lpszParameter, Reason, paramid, dLastKnownValue):
- # print("Reason:%d. The parameter %s changed to %d." %(Reason, lpszParameter, dLastKnownValue))
class SEM_API:
    """
    Python Wrapper to SEM API interface

    The class provide a context manager to do some house keeping, it is advised
    to use with statement.

    with SEM_API(state='remote') as sem:
        do_something(sem)

    """

    # Real time image buffers, replaced by the background worker on every
    # update.  img_array holds the image grabbed with the current pixel store
    # (uint16 when local, whatever imread returns when remote);
    # img_array_reduce holds the 1024x768 uint8 image (local only).
    img_array = np.zeros((768,1024))
    img_array_reduce = np.zeros((768,1024))

    # Name of the bmp file used to transfer the image when running remotely.
    # NOTE(review): only the name of the temporary file is kept here; the
    # file object itself is discarded straight away.
    __fname = tempfile.TemporaryFile(suffix='.bmp').name

    def __init__(self, state='local'):
        """
        Function to initialize the API interface.
        This function uses the InitialiseRemoting() call of the control.

        state: 'remote' or 'local', tells how the real time image is fetched.
        Raises ValueError for any other state and API_ERROR when the
        initialisation returns a non-zero code.
        """
        # Validate first so that a wrong argument does not create the COM object.
        if state not in ("remote", "local"):
            raise ValueError("state is either remote or local")
        self.__state = state
        self.mic = client.Dispatch('CZ.EMApiCtrl.1')
        # first initialize
        res = self.mic.InitialiseRemoting()
        if res != 0:
            raise API_ERROR(res)
        # prepare the MMF for local operation
        if self.__state == "local":
            self.__pymap = mmap.mmap(-1, 1024*768+1064, "Capture0.MMF", mmap.ACCESS_READ)
        time.sleep(0.1)
        # prepare the background worker, it is started and paused right away
        self.__background_worker = SwitchThread(target=self.__update_image, delay = 1.0)
        self.__background_worker.start()
        self.__background_worker.pause()
        time.sleep(0.1)
        # event interface: the sink object plus a thread pumping the messages
        self.event_time = 0.1
        self.event = SEM_Handle(self.mic)
        self.__event_stop = False
        self.__event_thread = threading.Thread(target = self.__pump)
        self.__event_thread.start()

    # decorator for the wrapper function
    def __error_handling(func):
        """
        Turn API return codes into exceptions.

        The wrapped function returns either an int return code or a tuple
        (return code, result).  A non-zero code raises API_ERROR.  On success
        the result is returned for tuples and None otherwise.
        """
        from functools import wraps

        @wraps(func)  # keep name and docstring of the wrapped method
        def func_wrapper(*arg, **kwargs):
            res = func(*arg, **kwargs)
            if isinstance(res, tuple):
                return_code = res[0]
                if return_code != 0:
                    raise API_ERROR(return_code)
                return res[1]
            if isinstance(res, int) and res != 0:
                raise API_ERROR(res)
            return None
        return func_wrapper

    # function to handle the event
    def __pump(self):
        """Pump waiting COM messages every event_time seconds until stopped."""
        pythoncom.CoInitializeEx(pythoncom.COINIT_MULTITHREADED)
        try:
            while self.__event_stop is not True:
                pythoncom.PumpWaitingMessages()
                time.sleep(self.event_time)
        finally:
            # Always balance the CoInitializeEx call, also when pumping fails.
            pythoncom.CoUninitialize()

    def Add_Event(self, func):
        """Add func in the bag of handler."""
        self.event.subscribers.append(func)

    def Remove_Event(self, func):
        """Remove func from the bag of handler (ValueError if not present)."""
        self.event.subscribers.remove(func)

    @__error_handling
    def Set_Notify(self, PARAM):
        """Start monitoring PARAM, the event is triggered when it changes."""
        return self.mic.SetNotify(PARAM, True)

    @__error_handling
    def Unset_Notify(self, PARAM):
        """Stop monitoring PARAM."""
        return self.mic.SetNotify(PARAM, False)

    # function to update the image file
    def __update_image(self):
        """Grab the current image and store it in the image attributes."""
        if self.__state == 'remote':
            # when working remote, write the image to a bmp file and read back.
            self.mic.Grab(0,0,1024,768,0,self.__fname)
            self.img_array = imread(self.__fname)
        elif self.__state == 'local':
            # when working local, write the image to mmap file and read back.
            # as each time the size of the frame can change, we create a mmap
            # every time on the fly to grab the image
            pixel_density = tuple(map(int, self.GetState("DP_IMAGE_STORE").split("*")))
            self.mic.Grab(0,0,1024,768,0,"CZ.MMF")
            py_map = mmap.mmap(-1, pixel_density[0]*pixel_density[1]*2, "CZ.MMF", mmap.ACCESS_READ)
            try:
                # Copy in one expression so no array refers to py_map any more
                # when it is closed.
                self.img_array = np.copy(np.frombuffer(py_map, dtype="uint16").reshape((pixel_density[1],pixel_density[0])))
            finally:
                py_map.close()
            # update local image; copied so that the stored array does not
            # keep referring to the shared mapping that __exit__ closes.
            self.img_array_reduce = np.copy(np.flip(np.frombuffer(self.__pymap, dtype="uint8", offset=1064).reshape((768,1024)), axis=0))

    @property
    def img_array_reduced(self):
        """Read only alias of img_array_reduce (the name used in the module doc)."""
        return self.img_array_reduce

    @property
    def update_rate(self):
        """Delay in seconds between two image updates (0.01 to 10)."""
        return self.__background_worker.delay

    @update_rate.setter
    def update_rate(self, value):
        # Out of range values are clamped and reported by a warning.
        if value < 0.01:
            value = 0.01
            warnings.warn("Update delay cannot be smaller than 10 ms.")
        elif value > 10:
            value = 10
            warnings.warn("Update delay cannot be larger than 10 s.")
        self.__background_worker.delay = value

    # function for AP
    @__error_handling
    def GetValue(self, AP_name, style='float'):
        """
        function to get an analog value.
        style: float or string
        float returns a floating number, string returns a string representation with unit.
        Raises AttributeError for any other style.
        """
        # The type of the buffer handed to Get selects the type of the answer.
        if style == 'float':
            buffer = 0.0
        elif style == 'string':
            buffer = ''
        else:
            raise AttributeError("style = float or string")
        return self.mic.Get(AP_name, buffer)

    @__error_handling
    def GetValueMin(self, AP_name):
        """Return the second element of the GetLimits answer (the lower limit)."""
        buffer = 0.0
        res = self.mic.GetLimits(AP_name, buffer, buffer)
        return (res[0], res[1])

    @__error_handling
    def GetValueMax(self, AP_name):
        """Return the third element of the GetLimits answer (the upper limit)."""
        buffer = 0.0
        res = self.mic.GetLimits(AP_name, buffer, buffer)
        return (res[0], res[2])

    @__error_handling
    def SetValue(self, AP_name, value):
        """
        function to set an analog value.
        The function can take either floating point or a formatted string.
        """
        if not isinstance(value, str):
            value = float(value)
        return self.mic.Set(AP_name, value)

    # function for DP
    @__error_handling
    def GetState(self, DP_name, style='string'):
        """
        function to get an digital value.
        style: int or string
        int returns a integer number, string returns a string representation.
        Raises AttributeError for any other style.
        """
        # The type of the buffer handed to Get selects the type of the answer.
        if style == 'int':
            buffer = 0
        elif style == 'string':
            buffer = ''
        else:
            raise AttributeError("style = int or string")
        return self.mic.Get(DP_name, buffer)

    @__error_handling
    def SetState(self, DP_name, value):
        """
        function to set a digital state.
        The function can take either integer or a formatted string.
        """
        if not isinstance(value, str):
            value = int(value)
        return self.mic.Set(DP_name, value)

    # function for CMD
    @__error_handling
    def Execute(self, CMD_name):
        """Execute the command CMD_name."""
        return self.mic.Execute(CMD_name)

    # function about stage
    @__error_handling
    def GetStagePosition(self):
        """Return the stage position as the tuple of values following the return code."""
        buffer = 0.0
        res = self.mic.GetStagePosition(buffer, buffer, buffer, buffer, buffer, buffer)
        return (res[0], res[1:])

    @__error_handling
    def MoveStage(self, coord):
        """
        Move the stage to coord, a list or tuple of 6 floating number.
        Raises IndexError when coord does not have 6 elements.
        """
        if len(coord) != 6:
            raise IndexError("coord must be a list or tuple of 6 floating number")
        return self.mic.MoveStage(*coord)

    # function about grab image
    @__error_handling
    def Grab(self, fname, X = 0, Y = 0, W = 1024, H = 768, overlay = False):
        """
        The function to grab the current image into the file fname.
        X, Y, W, H are handed over unchanged to the Grab call of the control,
        the defaults are 0, 0, 1024, 768.
        overlay determines whether the image overlay such as datazone should be included.
        """
        # The fifth argument of the control's Grab call is -1 with overlay, 0 without.
        return self.mic.Grab(X, Y, W, H, -1 if overlay else 0, fname)

    # The following commands control the continuous image acquisition done by
    # a separate thread in the background.
    def UpdateImage_Start(self):
        """
        function to start continuous updating the imaging array
        """
        self.__background_worker.resume()

    def UpdateImage_Pause(self):
        """
        function to pause continuous updating the imaging array
        """
        self.__background_worker.pause()

    def UpdateImage_Terminate(self):
        """
        function to terminate continuous updating the imaging array.
        The background thread is ended and the update cannot be started again.
        """
        self.__background_worker.terminate()

    # some typically used higher level functions
    def move_stage_relative_xy(self, dx, dy):
        """Move the stage by dx, dy relative to its current position."""
        pos = self.GetStagePosition()
        self.MoveStage((pos[0] + dx, pos[1] + dy, pos[2], pos[3], pos[4], pos[5]))

    def wait_for_stage_idle(self):
        """
        wait for the stage movement to finish.
        """
        while self.GetState("DP_STAGE_IS") != "Idle":
            time.sleep(0.1)

    def set_scan_speed(self, DP_Param):
        """
        Directly set certain scan speed.
        str(DP_Param) must be one of "0" to "15", otherwise KeyError is raised.
        """
        c_dict = {str(rate): "CMD_SCANRATE%d" % rate for rate in range(16)}
        self.Execute(c_dict[str(DP_Param)])

    def set_scan_mode(self, DP_Param):
        """
        Directly set the noise reduction mode of scan.
        DP_Param must be one of the keys below, otherwise KeyError is raised.
        """
        c_dict = {
            "Pixel Avg.":"CMD_PIXNR",
            "Frame Avg":"CMD_FRAME_AVERAGE",
            "Frame Int. Busy":"CMD_FRAME_INT",
            "Frame Int. Done":"CMD_FRAME_INT",
            "Line Avg":"CMD_LINE_AVG",
            "Line Int. Busy":"CMD_LINE_INT",
            "Line Int. Done":"CMD_LINE_INT",
            "Continuous Avg.":"CMD_CONTINUOUS_AVG",
            "Drift Comp. Frame Int. Busy":"CMD_DC_FRAME_INT",
            "Drift Comp. Frame Int. Done":"CMD_DC_FRAME_INT",
            "Drift Comp. Frame Avg.":"CMD_DC_FRAME_AVG"
        }
        self.Execute(c_dict[DP_Param])

    def do_autofocus(self):
        """Perform autofocus, block the thread till it finishes."""
        self.SetState("DP_AUTOFOCUS_VERSION", "Line Scan based Autofocus Version")
        time.sleep(0.5)
        self.Execute("CMD_AUTO_FOCUS_FINE")
        time.sleep(2)
        while self.GetState("DP_AUTO_FN_STATUS") == "Busy":
            time.sleep(0.1)

    def grab_full_image(self, fname, overlay = False):
        """
        grab the current full image by restarting the scan and till the complete
        frame is finished, and then save the image.
        """
        self.SetState("DP_FREEZE_ON", "End Frame")
        self.Execute("CMD_UNFREEZE_ALL")
        time.sleep(0.1)
        # NOTE(review): the freeze command is only sent for the averaging
        # modes listed here - confirm against the original indentation.
        if self.GetState("DP_NOISE_REDUCTION") in ("Frame Avg", "Line Avg",
                "Pixel Avg.", "Continuous Avg.", "Drift Comp. Frame Avg."):
            time.sleep(0.1)
            self.Execute("CMD_FREEZE_ALL")
        while self.GetState("DP_FROZEN") == "Live":
            time.sleep(0.1)
        self.Grab(fname, overlay = overlay)

    # context interface
    # it is important to use this class through with statement
    # the exit function will properly clean up the wrapper.
    def __enter__(self):
        return self

    def __exit__(self, *arg):
        """Stop the background threads and close the control."""
        self.__background_worker.terminate()
        self.__background_worker.join()
        # stop event interface
        self.__event_stop = True
        self.__event_thread.join()
        self.event.close()
        del self.event
        # clear the MMF
        if self.__state == 'local':
            self.__pymap.close()
            self.mic.Grab(0,0,0,0,0,"CZ.MMF")
        self.mic.ClosingControl()
-
|