This commit is contained in:
2025-12-04 17:52:40 +08:00
parent 29217cea2e
commit bc9925903f

View File

@@ -38,6 +38,7 @@ from typing import Optional, Any, List
# Third Party Imports
import numpy as np
from pylablib.devices.Andor import AndorSDK3Camera, get_cameras_number_SDK3
# Local Imports
from navigate.model.devices.camera.base import CameraBase
@@ -67,7 +68,7 @@ class AndorCamera(CameraBase):
microscope_name : str
Name of microscope in configuration
device_connection : Any
Hardware device to connect to
Hardware device to connect to (camera index)
configuration : Dict[str, Any]
Global configuration of the microscope
"""
@@ -76,12 +77,24 @@ class AndorCamera(CameraBase):
#: str: Name of the microscope
self.microscope_name = microscope_name
#: Any: Device connection
self.device_connection = device_connection
#: int: Camera index
self.camera_index = device_connection if isinstance(device_connection, int) else 0
#: Dict[str, Any]: Configuration settings
self.configuration = configuration
#: AndorSDK3Camera: Actual camera object
self.camera = AndorSDK3Camera(idx=self.camera_index)
self.camera.open()
# Get device info
device_info = self.camera.get_device_info()
logger.info(f"Connected to Andor camera: {device_info.camera_model}")
logger.info(f"Serial number: {device_info.serial_number}")
#: str: serial number
self.serial_number = device_info.serial_number
#: bool: Whether the camera is currently acquiring
self.is_acquiring = False
@@ -97,17 +110,10 @@ class AndorCamera(CameraBase):
#: int: previous image id
self.pre_frame_idx = None
#: int: serial number
self.serial_number = "andor_test"
#: float: exposure time
self.camera_exposure_time = 0.2
#: int: x binning
self.x_binning = 1
#: int: y binning
self.y_binning = 1
# Get detector size
detector_size = self.camera.get_detector_size()
self.camera_parameters["x_pixels"] = detector_size[0]
self.camera_parameters["y_pixels"] = detector_size[1]
#: int: width
self.x_pixels = self.camera_parameters["x_pixels"]
@@ -121,7 +127,18 @@ class AndorCamera(CameraBase):
#: int: center y
self.center_y = self.y_pixels // 2
self.camera_parameters["supported_trigger_sources"] = ["External", "Internal"]
#: float: exposure time
self.camera_exposure_time = self.camera.get_exposure()
#: int: x binning
self.x_binning = 1
#: int: y binning
self.y_binning = 1
self.camera_parameters["supported_trigger_sources"] = ["External", "Internal", "Software"]
logger.info(f"Andor camera initialized: {self.x_pixels}x{self.y_pixels}")
def __str__(self) -> str:
"""String representation of AndorCamera class.
@@ -135,15 +152,32 @@ class AndorCamera(CameraBase):
def __del__(self) -> None:
"""Delete AndorCamera class."""
pass
try:
if hasattr(self, 'camera') and self.camera is not None:
self.close_camera()
except Exception as e:
logger.error(f"Error during camera deletion: {e}")
def report_settings(self) -> None:
"""Print Camera Settings."""
pass
logger.info("=== Andor Camera Settings ===")
logger.info(f"Serial Number: {self.serial_number}")
logger.info(f"Sensor Size: {self.x_pixels}x{self.y_pixels}")
logger.info(f"Exposure Time: {self.camera_exposure_time}s")
logger.info(f"Binning: {self.x_binning}x{self.y_binning}")
logger.info(f"ROI Center: ({self.center_x}, {self.center_y})")
logger.info(f"Trigger Mode: {self.camera.get_trigger_mode()}")
def close_camera(self) -> None:
"""Close AndorCamera Camera"""
pass
try:
if self.is_acquiring:
self.close_image_series()
if hasattr(self, 'camera') and self.camera is not None:
self.camera.close()
logger.info("Andor camera closed")
except Exception as e:
logger.error(f"Error closing camera: {e}")
def set_sensor_mode(self, mode: str) -> None:
"""Set AndorCamera sensor mode.
@@ -153,7 +187,9 @@ class AndorCamera(CameraBase):
mode : str
'Normal' or 'Light-Sheet'
"""
pass
logger.info(f"Sensor mode set to: {mode}")
# Andor SDK3 cameras typically don't have explicit sensor modes
# This can be implemented based on specific camera features if needed
def set_exposure_time(self, exposure_time: float) -> None:
"""Set AndorCamera exposure time.
@@ -165,7 +201,13 @@ class AndorCamera(CameraBase):
exposure_time : float
Exposure time in seconds.
"""
self.camera_exposure_time = exposure_time
try:
self.camera.set_exposure(exposure_time)
self.camera_exposure_time = self.camera.get_exposure()
logger.debug(f"Exposure time set to: {self.camera_exposure_time}s")
except Exception as e:
logger.error(f"Error setting exposure time: {e}")
self.camera_exposure_time = exposure_time
def set_line_interval(self, line_interval_time: float) -> bool:
"""Set AndorCamera line interval.
@@ -197,14 +239,35 @@ class AndorCamera(CameraBase):
}
if binning_string not in binning_dict.keys():
logger.debug(f"can't set binning to {binning_string}")
print(f"can't set binning to {binning_string}")
return False
self.x_binning = int(binning_string[0])
self.y_binning = int(binning_string[2])
self.x_pixels = int(self.x_pixels / self.x_binning)
self.y_pixels = int(self.y_pixels / self.y_binning)
return True
try:
binning = binning_dict[binning_string]
# Get current ROI
current_roi = self.camera.get_roi()
# Set new ROI with binning
self.camera.set_roi(
hstart=current_roi[0],
hend=current_roi[1],
vstart=current_roi[2],
vend=current_roi[3],
hbin=binning,
vbin=binning
)
self.x_binning = binning
self.y_binning = binning
# Update pixel counts based on actual ROI
new_roi = self.camera.get_roi()
self.x_pixels = (new_roi[1] - new_roi[0]) // new_roi[4]
self.y_pixels = (new_roi[3] - new_roi[2]) // new_roi[5]
logger.info(f"Binning set to {binning_string}, new size: {self.x_pixels}x{self.y_pixels}")
return True
except Exception as e:
logger.error(f"Error setting binning: {e}")
return False
def initialize_image_series(
self,
@@ -220,89 +283,90 @@ class AndorCamera(CameraBase):
number_of_frames : int
Number of frames. Default is 100.
"""
self.data_buffer = data_buffer
self.num_of_frame = number_of_frames
self.current_frame_idx = 0
self.pre_frame_idx = 0
self.is_acquiring = True
try:
self.data_buffer = data_buffer
self.num_of_frame = number_of_frames
self.current_frame_idx = 0
self.pre_frame_idx = 0
# Setup and start acquisition
self.camera.setup_acquisition(mode="sequence", nframes=number_of_frames)
self.camera.start_acquisition()
self.is_acquiring = True
logger.info(f"Image series initialized: {number_of_frames} frames")
except Exception as e:
logger.error(f"Error initializing image series: {e}")
self.is_acquiring = False
raise
def close_image_series(self) -> None:
"""Close image series.
Stops the acquisition and sets is_acquiring flag to False.
"""
self.pre_frame_idx = 0
self.current_frame_idx = 0
self.is_acquiring = False
def generate_new_frame(self) -> None:
"""Generate an image with letter 'A' in the center."""
if not self.is_acquiring:
return
# Create a blank image with background level
image = np.full((self.y_pixels, self.x_pixels), 100, dtype=np.uint16)
# Define the letter 'A' pattern
# Center coordinates
cy, cx = self.y_pixels // 2, self.x_pixels // 2
# Size of the letter A (adjust as needed)
letter_height = min(200, self.y_pixels // 4)
letter_width = min(150, self.x_pixels // 4)
thickness = max(10, min(letter_height // 10, 20))
# Draw letter 'A'
# Left diagonal line
for i in range(letter_height):
y = cy - letter_height // 2 + i
x_offset = int((letter_height - i) * letter_width / (2 * letter_height))
x = cx - x_offset
if 0 <= y < self.y_pixels and 0 <= x < self.x_pixels:
image[y, max(0, x-thickness//2):min(self.x_pixels, x+thickness//2)] = 3000
# Right diagonal line
for i in range(letter_height):
y = cy - letter_height // 2 + i
x_offset = int((letter_height - i) * letter_width / (2 * letter_height))
x = cx + x_offset
if 0 <= y < self.y_pixels and 0 <= x < self.x_pixels:
image[y, max(0, x-thickness//2):min(self.x_pixels, x+thickness//2)] = 3000
# Horizontal bar in the middle
bar_y = cy + letter_height // 8
bar_x_start = cx - letter_width // 4
bar_x_end = cx + letter_width // 4
if 0 <= bar_y < self.y_pixels:
image[max(0, bar_y-thickness//2):min(self.y_pixels, bar_y+thickness//2),
max(0, bar_x_start):min(self.x_pixels, bar_x_end)] = 3000
# Copy to buffer
ctypes.memmove(
self.data_buffer[self.current_frame_idx].ctypes.data,
image.ctypes.data,
self.x_pixels * self.y_pixels * 2,
)
self.current_frame_idx = (self.current_frame_idx + 1) % self.num_of_frame
try:
if self.is_acquiring:
self.camera.stop_acquisition()
self.pre_frame_idx = 0
self.current_frame_idx = 0
self.is_acquiring = False
logger.info("Image series closed")
except Exception as e:
logger.error(f"Error closing image series: {e}")
def get_new_frame(self) -> List[int]:
"""Get frame from AndorCamera camera."""
"""Get frame from AndorCamera camera.
time.sleep(self.camera_exposure_time)
timeout = 500
while self.pre_frame_idx == self.current_frame_idx and timeout:
time.sleep(0.001)
timeout -= 1
if timeout <= 0:
Returns
-------
List[int]
List of frame indices that have been acquired.
"""
if not self.is_acquiring:
return []
try:
# Read available frames from camera
frames_data = self.camera.read_multiple_images()
if frames_data and len(frames_data) > 0:
# Copy frames to buffer
for frame in frames_data:
if self.data_buffer is not None:
# Ensure frame is correct size and type
if frame.shape != (self.y_pixels, self.x_pixels):
logger.warning(f"Frame size mismatch: {frame.shape} vs ({self.y_pixels}, {self.x_pixels})")
continue
# Convert to uint16 if needed
if frame.dtype != np.uint16:
frame = frame.astype(np.uint16)
# Copy to buffer
ctypes.memmove(
self.data_buffer[self.current_frame_idx].ctypes.data,
frame.ctypes.data,
self.x_pixels * self.y_pixels * 2,
)
self.current_frame_idx = (self.current_frame_idx + 1) % self.num_of_frame
# Return list of frame indices
if self.pre_frame_idx < self.current_frame_idx:
frame_indices = list(range(self.pre_frame_idx, self.current_frame_idx))
else:
frame_indices = list(range(self.pre_frame_idx, self.num_of_frame))
frame_indices += list(range(0, self.current_frame_idx))
self.pre_frame_idx = self.current_frame_idx
return frame_indices
else:
return []
except Exception as e:
logger.error(f"Error getting new frame: {e}")
return []
if self.pre_frame_idx < self.current_frame_idx:
frames = list(range(self.pre_frame_idx, self.current_frame_idx))
else:
frames = list(range(self.pre_frame_idx, self.num_of_frame))
frames += list(range(0, self.current_frame_idx))
self.pre_frame_idx = self.current_frame_idx
return frames
def set_ROI(
self,
@@ -329,11 +393,43 @@ class AndorCamera(CameraBase):
bool
True if successful, False otherwise.
"""
self.x_pixels = roi_width
self.y_pixels = roi_height
self.center_x = center_x
self.center_y = center_y
return True
try:
# Calculate ROI boundaries
hstart = center_x - roi_width // 2
hend = center_x + roi_width // 2
vstart = center_y - roi_height // 2
vend = center_y + roi_height // 2
# Ensure boundaries are within detector limits
detector_size = self.camera.get_detector_size()
hstart = max(0, hstart)
vstart = max(0, vstart)
hend = min(detector_size[0], hend)
vend = min(detector_size[1], vend)
# Set ROI with current binning
self.camera.set_roi(
hstart=hstart,
hend=hend,
vstart=vstart,
vend=vend,
hbin=self.x_binning,
vbin=self.y_binning
)
# Update parameters
actual_roi = self.camera.get_roi()
self.x_pixels = (actual_roi[1] - actual_roi[0]) // actual_roi[4]
self.y_pixels = (actual_roi[3] - actual_roi[2]) // actual_roi[5]
self.center_x = (actual_roi[0] + actual_roi[1]) // 2
self.center_y = (actual_roi[2] + actual_roi[3]) // 2
logger.info(f"ROI set to: {self.x_pixels}x{self.y_pixels} at ({self.center_x}, {self.center_y})")
return True
except Exception as e:
logger.error(f"Error setting ROI: {e}")
return False
@staticmethod
def calculate_readout_time() -> float:
@@ -355,9 +451,26 @@ class AndorCamera(CameraBase):
Parameters
----------
trigger_source : str
Trigger source, either 'External' or 'Internal'.
Trigger source, either 'External', 'Internal', or 'Software'.
"""
logger.debug(f"Set camera trigger mode: {trigger_source}")
try:
# Map trigger source names to Andor SDK3 trigger modes
trigger_map = {
"External": "ext",
"Internal": "int",
"Software": "software"
}
if trigger_source in trigger_map:
andor_trigger = trigger_map[trigger_source]
self.camera.set_trigger_mode(andor_trigger)
logger.info(f"Trigger mode set to: {trigger_source}")
else:
logger.warning(f"Unknown trigger source: {trigger_source}, using Internal")
self.camera.set_trigger_mode("int")
except Exception as e:
logger.error(f"Error setting trigger mode: {e}")
def calculate_light_sheet_exposure_time(
self, full_chip_exposure_time: float, shutter_width: float