from .base import AndorError, AndorTimeoutError, AndorNotSupportedError from . import atmcd32d_lib from .atmcd32d_lib import wlib as lib, AndorSDK2LibError from .atmcd32d_lib import DRV_STATUS from .atmcd32d_lib import AC_ACQMODE, AC_READMODE, AC_TRIGGERMODE, AC_EMGAIN, AC_FEATURES, AC_GETFUNC, AC_SETFUNC from .atmcd32d_lib import drAC_CAMERATYPE, AC_PIXELMODE from .atmcd32d_defs import AT_VersionInfoId from ...core.utils import py3 from ...core.devio import interface from ..interface import camera from ..utils import load_lib import numpy as np import collections import functools import threading class LibraryController(load_lib.LibraryController): def _do_uninit(self): try: self.lib.ShutDown() except AndorSDK2LibError as e: if e.code!=DRV_STATUS.DRV_NOT_INITIALIZED: raise libctl=LibraryController(lib) def restart_lib(): libctl.shutdown() def get_SDK_version(): """Get version of Andor SDK2""" libctl.preinit() return py3.as_str(lib.GetVersionInfo(AT_VersionInfoId.AT_SDKVersion)) def get_cameras_number(): """Get number of connected Andor cameras""" libctl.preinit() return lib.GetAvailableCameras() TDeviceInfo=collections.namedtuple("TDeviceInfo",["controller_model","head_model","serial_number"]) TCycleTimings=collections.namedtuple("TCycleTimings",["exposure","accum_cycle_time","kinetic_cycle_time"]) TAcqProgress=collections.namedtuple("TAcqProgress",["frames_done","cycles_done"]) _camsel_lock=threading.RLock() def _camfunc(*args, **kwargs): if len(args)>0: return _camfunc(**kwargs)(args[0]) option=kwargs.get("option",[]) if not isinstance(option,list): option=[option] sel=kwargs.get("sel",True) setpar=kwargs.get("setpar",None) getpar=kwargs.get("getpar",None) defpar=kwargs.get("defpar",None) par_lookup=(getpar is not None) and not option if (setpar is not None) and (getpar is not None): raise ValueError("a method is either a setpar or a getpar") def wrapper(func): @functools.wraps(func) def wrapped(self, *args, **kwargs): if par_lookup: return self._cpar.get(getpar,defpar) with _camsel_lock: if sel: self._select_camera() for opt in option: if not self._check_option(*opt): return defpar if getpar is not None: return self._cpar.get(getpar,defpar) else: res=func(self,*args,**kwargs) if setpar is not None: self._cpar[setpar]=res return res return wrapped return wrapper class AndorSDK2Camera(camera.IBinROICamera, camera.IExposureCamera): """ Andor SDK2 camera. Due to the library features, the camera needs to set up all of the parameters to some default values upon connection. Most of these parameters are chosen as reasonable defaults: full ROI, minimal exposure time, closed shutter, internal trigger, fastest recommended verticals shift speed, no EMCCD gain. However, some should be supplied during the connection: temperature setpoint (where appropriate), fan mode, and amplifier mode; while there is still a possibility to have default values of these parameters, they might not be appropriate in some settings, and frequently need to be changed. Caution: the manufacturer DLL is designed such that if the camera is not closed on the program termination, the allocated resources are never released. If this happens, these resources are blocked until the complete OS restart. Args: idx(int): camera index (use :func:`get_cameras_number` to get the total number of connected cameras) ini_path(str): path to .ini file, if required by the camera temperature: initial temperature setpoint (in C); can also be ``None`` (select the bottom 20% of the whole range), or ``"off"`` (turn the cooler off and set the maximal of the whole range) fan_mode: initial fan mode amp_mode: initial amplifier mode (a tuple like the one returned by :meth:`get_amp_mode`); can also be ``None``, which selects the slowest, smallest gain mode """ _default_image_indexing="rcb" Error=AndorError TimeoutError=AndorTimeoutError def __init__(self, idx=0, ini_path="", temperature=None, fan_mode="off", amp_mode=None): super().__init__() self.idx=idx self.ini_path=ini_path self.handle=None self._opid=None self.capabilities=None self._minh=0 self._minv=0 self._cpar={} self._start_temperature=temperature self._start_fan_mode=fan_mode self._start_amp_mode=amp_mode self.open() self._device_var_ignore_error={"get":(AndorNotSupportedError,),"set":(AndorNotSupportedError,)} self._add_info_variable("device_info",self.get_device_info) self._add_info_variable("capabilities",self.get_capabilities,priority=-5) self._add_info_variable("amp_modes",self.get_all_amp_modes,ignore_error=AndorSDK2LibError,priority=-2) self._add_info_variable("vsspeeds",self.get_all_vsspeeds,ignore_error=AndorSDK2LibError,priority=-2) self._add_info_variable("pixel_size",self.get_pixel_size) self._add_settings_variable("temperature",self.get_temperature_setpoint,self.set_temperature) self._add_status_variable("temperature_monitor",self.get_temperature,ignore_error=AndorSDK2LibError) self._add_status_variable("temperature_status",self.get_temperature_status,ignore_error=AndorSDK2LibError) self._add_info_variable("temperature_range",self.get_temperature_range,ignore_error=AndorSDK2LibError,priority=-2) self._add_settings_variable("cooler",self.is_cooler_on,self.set_cooler,ignore_error=AndorSDK2LibError) self._add_status_variable("amp_mode",self.get_amp_mode,ignore_error=AndorSDK2LibError) self._add_settings_variable("channel",self.get_channel,lambda x:self.set_amp_mode(channel=x)) self._add_settings_variable("oamp",self.get_oamp,lambda x:self.set_amp_mode(oamp=x)) self._add_settings_variable("hsspeed",self.get_hsspeed,lambda x:self.set_amp_mode(hsspeed=x)) self._add_settings_variable("preamp",self.get_preamp,lambda x:self.set_amp_mode(preamp=x),ignore_error=AndorSDK2LibError) self._add_settings_variable("vsspeed",self.get_vsspeed,self.set_vsspeed,ignore_error=AndorSDK2LibError) self._add_settings_variable("EMCCD_gain",self.get_EMCCD_gain,self.set_EMCCD_gain,ignore_error=AndorSDK2LibError) self._add_settings_variable("shutter",self.get_shutter_parameters,self.setup_shutter,ignore_error=AndorSDK2LibError) self._add_settings_variable("fan_mode",self.get_fan_mode,self.set_fan_mode,ignore_error=AndorSDK2LibError) self._add_settings_variable("trigger_mode",self.get_trigger_mode,self.set_trigger_mode) self._add_settings_variable("acq_parameters/accum",self.get_accum_mode_parameters,self.setup_accum_mode) self._add_settings_variable("acq_parameters/kinetic",self.get_kinetic_mode_parameters,self.setup_kinetic_mode) self._add_settings_variable("acq_parameters/fast_kinetic",self.get_fast_kinetic_mode_parameters,self.setup_fast_kinetic_mode) self._add_settings_variable("acq_parameters/cont",self.get_cont_mode_parameters,self.setup_cont_mode) self._add_settings_variable("acq_mode",self.get_acquisition_mode,self.set_acquisition_mode) self._add_status_variable("acq_status",self.get_status) self._add_settings_variable("frame_transfer",self.is_frame_transfer_enabled,self.enable_frame_transfer_mode,ignore_error=AndorSDK2LibError) self._add_status_variable("cycle_timings",self.get_cycle_timings) self._add_status_variable("readout_time",self.get_readout_time) self._add_settings_variable("read_parameters/single_track",self.get_single_track_mode_parameters,self.setup_single_track_mode) self._add_settings_variable("read_parameters/multi_track",self.get_multi_track_mode_parameters,self.setup_multi_track_mode) self._add_settings_variable("read_parameters/random_track",self.get_random_track_mode_parameters,self.setup_random_track_mode) self._add_settings_variable("read_parameters/image",self.get_image_mode_parameters,self.setup_image_mode) self._add_settings_variable("read_mode",self.get_read_mode,self.set_read_mode) self._update_device_variable_order("exposure") self._add_settings_variable("frame_period",self.get_frame_period,self.set_frame_period) def _initial_setup_temperature(self): if self._start_temperature=="off": trng=self.get_temperature_range() self.set_temperature(trng[1] if trng else 0,enable_cooler=False) if self._start_temperature is None: trng=self.get_temperature_range() if trng: self._start_temperature=trng[0]+int((trng[1]-trng[0])*0.2) else: self._start_temperature=0 self.set_temperature(self._start_temperature,enable_cooler=True) def _initial_setup_ext_trigger(self): try: lrng=self.get_trigger_level_limits() level=(lrng[0]+lrng[1])/2 except AndorNotSupportedError: level=None self.setup_ext_trigger(level,False,True) def _setup_default_settings(self): self.capabilities=self._get_capabilities_n() self.device_info=self.get_device_info() try: self._strict_option_check=False self._initial_setup_temperature() self.init_amp_mode(mode=self._start_amp_mode) self.set_fan_mode(self._start_fan_mode) self.setup_shutter("closed") self._initial_setup_ext_trigger() self.set_trigger_mode("int") self.set_exposure(0) self.set_acquisition_mode("cont") self.setup_accum_mode(1) self.setup_kinetic_mode(1) self.setup_fast_kinetic_mode(1) self.setup_cont_mode() self.enable_frame_transfer_mode(False) self.setup_single_track_mode() for (height,offset) in [(1,0),(2,0),(1,1),(2,1),(self.get_detector_size()[1],0),(self.get_detector_size()[1],1)]: try: self.setup_multi_track_mode(height=height,offset=offset) break except AndorSDK2LibError: pass self.setup_random_track_mode() self._minh=self._find_min_roi_end("h") self._minv=self._find_min_roi_end("v") self.setup_image_mode() self.setup_acquisition("single") # flush the buffers self.setup_acquisition("cont") self.clear_acquisition() self._buffer_size=0 finally: self._strict_option_check=True def _select_camera(self): if self.handle is None: raise AndorError("camera is not opened") if lib.GetCurrentCamera()!=self.handle: lib.SetCurrentCamera(self.handle) def _has_option(self, kind, option): if kind=="acq": opt=self.capabilities.ulAcqModes elif kind=="read": opt=self.capabilities.ulReadModes elif kind=="trig": opt=self.capabilities.ulTriggerModes elif kind=="set": opt=self.capabilities.ulSetFunctions elif kind=="get": opt=self.capabilities.ulGetFunctions elif kind=="feat": opt=self.capabilities.ulFeatures else: raise AndorError("unknown option kind: {}".format(kind)) return bool(option&opt) def _check_option(self, kind, option): has_option=self._has_option(kind,option) if (not has_option) and self._strict_option_check: raise AndorNotSupportedError("option {}.{} is not supported by {}".format(kind,getattr(option,"name",option),self.device_info.head_model)) return has_option def _get_connection_parameters(self): return self.idx,self.ini_path def open(self): """Open connection to the camera""" if self.handle is None: ncams=get_cameras_number() if self.idx>=ncams: raise AndorError("camera index {} is not available ({} cameras exist)".format(self.idx,ncams)) self.handle=lib.GetCameraHandle(self.idx) with self._close_on_error(): with _camsel_lock: self._select_camera() lib.Initialize(py3.as_builtin_bytes(self.ini_path)) self._opid=libctl.open().opid self._setup_default_settings() def close(self): """Close connection to the camera""" if self.handle is not None: try: if self._opid is not None and self.capabilities is not None: self.clear_acquisition() try: self._select_camera() except AndorError: pass finally: self.handle=None libctl.close(self._opid) self._opid=None def is_opened(self): """Check if the device is connected""" return self.handle is not None @_camfunc def get_device_info(self): """ Get camera device info. Return tuple ``(controller_mode, head_model, serial_number)``. """ control_model=py3.as_str(lib.GetControllerCardModel()) head_model=py3.as_str(lib.GetHeadModel()) serial_number=lib.GetCameraSerialNumber() return TDeviceInfo(control_model,head_model,serial_number) ### Generic controls ### _p_status=interface.EnumParameterClass("status", {"idle":DRV_STATUS.DRV_IDLE,"acquiring":DRV_STATUS.DRV_ACQUIRING,"temp_cycle":DRV_STATUS.DRV_TEMPCYCLE}) @_camfunc(option=("feat",AC_FEATURES.AC_FEATURES_POLLING)) @interface.use_parameters(_returns="status") def get_status(self): """ Get camera status. Return either ``"idle"`` (no acquisition), ``"acquiring"`` (acquisition in progress) or ``"temp_cycle"`` (temperature cycle in progress). """ status=lib.GetStatus() if status not in {DRV_STATUS.DRV_IDLE,DRV_STATUS.DRV_ACQUIRING,DRV_STATUS.DRV_TEMPCYCLE}: raise AndorSDK2LibError("GetStatus",status) return status def acquisition_in_progress(self): return self.get_status()=="acquiring" def _int_to_enumlst(self, value, enums): lst=[] for k in enums: if value&k: lst.append(enums(k).name) return lst def _parse_capabilities(self, caps): cap_dict={} cap_dict["acq_mode"]=self._int_to_enumlst(caps.ulAcqModes,AC_ACQMODE) cap_dict["read_mode"]=self._int_to_enumlst(caps.ulReadModes,AC_READMODE) cap_dict["ft_read_mode"]=self._int_to_enumlst(caps.ulFTReadModes,AC_READMODE) cap_dict["trig_mode"]=self._int_to_enumlst(caps.ulTriggerModes,AC_TRIGGERMODE) cap_dict["set_func"]=self._int_to_enumlst(caps.ulSetFunctions,AC_SETFUNC) cap_dict["get_func"]=self._int_to_enumlst(caps.ulGetFunctions,AC_GETFUNC) cap_dict["features"]=self._int_to_enumlst(caps.ulFeatures,AC_FEATURES) cap_dict["EM_gain"]=self._int_to_enumlst(caps.ulEMGainCapability,AC_EMGAIN) cap_dict["pci_speed"]=int(caps.ulPCICard) cap_dict["pix_mode"]=self._int_to_enumlst(caps.ulPixelMode&0xFFFF,AC_PIXELMODE),AC_PIXELMODE(caps.ulPixelMode&0xFFFF0000).name cap_dict["cam_type"]=drAC_CAMERATYPE.get(caps.ulCameraType,"UNKNOWN") return cap_dict @_camfunc def _get_capabilities_n(self): """Get camera capabilities as a plain ``AndorCapabilities`` structure""" return lib.GetCapabilities() def get_capabilities(self): """ Get camera capabilities. For description of the structure, see Andor SDK manual. """ return self._parse_capabilities(self._get_capabilities_n()) @_camfunc def get_pixel_size(self): """Get camera pixel size (in m)""" return tuple([s*1E-6 for s in lib.GetPixelSize()]) ### Cooler controls ### @_camfunc(option=("get",AC_SETFUNC.AC_SETFUNCTION_TEMPERATURE)) def is_cooler_on(self): """Check if the cooler is on""" return bool(lib.IsCoolerOn()) @_camfunc(option=("set",AC_GETFUNC.AC_GETFUNCTION_TEMPERATURE)) def set_cooler(self, on=True): """Set the cooler on or off""" if on: lib.CoolerON() else: lib.CoolerOFF() return self.is_cooler_on() _p_temp_status=interface.EnumParameterClass("temp_status",{ "off":DRV_STATUS.DRV_TEMPERATURE_OFF, "not_reached":DRV_STATUS.DRV_TEMPERATURE_NOT_REACHED, "not_stabilized":DRV_STATUS.DRV_TEMPERATURE_NOT_STABILIZED, "drifted":DRV_STATUS.DRV_TEMPERATURE_DRIFT, "stabilized":DRV_STATUS.DRV_TEMPERATURE_STABILIZED }) @_camfunc(option=("get",AC_GETFUNC.AC_GETFUNCTION_TEMPERATURE)) @interface.use_parameters(_returns="temp_status") def get_temperature_status(self): """ Get temperature status. Can return ``"off"`` (cooler off), ``"not_reached"`` (cooling in progress), ``"not_stabilized"`` (reached but not stabilized yet), ``"stabilized"`` (completely stabilized) or ``"drifted"``. """ return lib.GetTemperatureF()[0] @_camfunc(option=("get",AC_GETFUNC.AC_GETFUNCTION_TEMPERATURE)) def get_temperature(self): """Get the current camera temperature (in C)""" return lib.GetTemperatureF()[1] @_camfunc(setpar="temperature_setpoint",option=[("get",AC_GETFUNC.AC_GETFUNCTION_TEMPERATURERANGE),("set",AC_SETFUNC.AC_SETFUNCTION_TEMPERATURE)]) def set_temperature(self, temperature, enable_cooler=True): """ Change the temperature setpoint (in C). If ``enable_cooler==True``, turn the cooler on automatically if present. """ rng=lib.GetTemperatureRange() temperature=max(temperature,rng[0]) temperature=min(temperature,rng[1]) temperature=int(temperature) lib.SetTemperature(temperature) if enable_cooler: try: self.set_cooler(True) except AndorSDK2LibError: pass return temperature @_camfunc(getpar="temperature_setpoint",option=[("set",AC_SETFUNC.AC_SETFUNCTION_TEMPERATURE)]) def get_temperature_setpoint(self): """Get the temperature setpoint (in C)""" @_camfunc def get_temperature_range(self): """Return the available range of temperatures (in C)""" return lib.GetTemperatureRange() ### Amplifiers/shift speeds controls ### @_camfunc def get_all_amp_modes(self): """ Get all available preamp modes. Each preamp mode is characterized by an AD channel index, amplifier index, channel speed (horizontal scan speed) index and preamp gain index. Return list of tuples ``(channel, channel_bitdepth, oamp, oamp_kind, hsspeed, hsspeed_MHz, preamp, preamp_gain)``, where ``channel``, ``oamp``, ``hsspeed`` and ``preamp`` are indices, while ``channel_bitdepth``, ``oamp_kind``, ``hsspeed_MHz`` and ``preamp_gain`` are descriptions. """ return lib.get_all_amp_modes() @_camfunc def get_max_vsspeed(self): """Get maximal recommended vertical scan speed""" return lib.GetFastestRecommendedVSSpeed()[0] @_camfunc def get_all_vsspeeds(self): """ Get all available vertical shift speeds modes. Return list of the vertical shift periods in microseconds for the corresponding indices (starting from 0). """ try: nspeeds=lib.GetNumberVSSpeeds() return [lib.GetVSSpeed(i) for i in range(nspeeds)] except AndorSDK2LibError: return [] def _truncate_amp_mode(self, amp_mode): all_amp_modes=self.get_all_amp_modes() all_amp_modes=[(mode.channel,mode.oamp,mode.hsspeed,mode.preamp) for mode in all_amp_modes] for i,v in enumerate(amp_mode): if v is None: continue avs={mode[i] for mode in all_amp_modes} if v not in avs: lvs=[vv for vv in avs if vv0.1: timeout=0.1 try: try: _camsel_lock.release() lib.WaitForAcquisitionByHandleTimeOut(self.handle,int(timeout*1E3)) finally: _camsel_lock.acquire() except AndorSDK2LibError as e: if e.code!=DRV_STATUS.DRV_NO_NEW_DATA: raise @_camfunc def _read_frames(self, rng, return_info=False): """ Read and return frames given the range. The range is always a tuple with at least a single frame in it, and is guaranteed to already be valid. Always return tuple ``(frames, infos)``; if ``return_info==False``, ``infos`` value is ignored, so it can be anything (e.g., ``None``). """ dim=self._get_data_dimensions_rc() dt=np.dtype(self._default_image_dtype) get_method=lib.GetImages16 if dt.itemsize<=2 else lib.GetImages data,_,_=get_method(rng[0]+1,rng[1],dim[0]*dim[1]*(rng[1]-rng[0])) data=self._convert_indexing(data.reshape((-1,dim[0],dim[1])),"rcb",axes=(1,2)) return list(data),None def _get_grab_acquisition_parameters(self, nframes, buff_size): return {"mode":"cont"}