Skip to content
Snippets Groups Projects
Commit f9bc5480 authored by Christoph Schmidt's avatar Christoph Schmidt
Browse files

New again works with coupled CaptDevice

parent 2fd40d1d
No related branches found
No related tags found
No related merge requests found
Showing
with 746 additions and 224 deletions
# - Configuration file stored 2023-11-24 11:10:33.720518 -
# - Configuration file stored 2023-11-27 14:41:07.592507 -
LaserConfig: #!!python/object:controller.LaserConfig
wl_sweep_start: 840 # wl_sweep_start: None
wl_sweep_stop: 850 # wl_sweep_stop: None
......
import logging
import sys
from rich.logging import RichHandler
from PySide6.QtWidgets import QApplication
sys.path.append('./src')
import LaserControl as Laser
if __name__ == "__main__":
FORMAT = "%(message)s"
logging.basicConfig(
level="DEBUG", format=FORMAT, datefmt="[%X]", handlers=[
RichHandler(rich_tracebacks=True)
]
)
if not QApplication.instance():
app = QApplication(sys.argv)
......
import logging
import sys
from multiprocessing import Value
from rich.logging import RichHandler
from PySide6.QtWidgets import QApplication
import CaptDeviceControl as captdev
sys.path.append('./src')
import LaserControl as Laser
if __name__ == "__main__":
#logging.basicConfig(level=logging.INFO,
# format='%(asctime)s [%(levelname)s] (%(threadName)-10s) %(message)s', )
if not QApplication.instance():
app = QApplication(sys.argv)
else:
app = QApplication.instance()
conf_capt_dev = captdev.Config()
#conf.load("config.yaml")
start_capture_flag = Value('i', 0)
capt_dev_model = captdev.Model(conf_capt_dev)
capt_dev_controller = captdev.Controller(capt_dev_model, start_capture_flag)
capt_dev_window = captdev.View(capt_dev_model, capt_dev_controller)
capt_dev_window.show()
conf = Laser.Config()
model = Laser.Model(conf)
controller = Laser.Controller(model, start_capture_flag)
window = Laser.View(model, controller)
controller.connect_capture_device(capt_dev_controller)
window.show()
sys.exit(app.exec())
\ No newline at end of file
......@@ -5,6 +5,12 @@ PySide6~=6.5.2
scipy~=1.10.1
rich
# add the git repo
git+https://gitlab.tugraz.at/flexsensor-public/confighandler.git@develop
#git+https://gitlab.tugraz.at/flexsensor-public/confighandler.git@develop
../confighandler
git+https://gitlab.tugraz.at/flexsensor-public/fswidgets.git@develop
#git+https://gitlab.tugraz.at/flexsensor-public/captdevicecontrol.git@develop
../captdevicecontrol
../../pyside-multiprocessing
......@@ -5,8 +5,8 @@ class LaserConfig(cfg.ConfigNode):
def __init__(self) -> None:
super().__init__()
self.wl_sweep_start = cfg.Field(840)
self.wl_sweep_stop = cfg.Field(850)
self.wl_sweep_start = cfg.Field(857)
self.wl_sweep_stop = cfg.Field(870)
self.velocity = cfg.Field(2.0)
self.acceleration = cfg.Field(1.0)
self.deceleration = cfg.Field(1.0)
......
import logging
import os
import sys
from rich.logging import RichHandler
# check if a handler is already set
#if not logging.root.handlers:
# FORMAT = "%(name)s %(message)s"
# logging.basicConfig(
# level="DEBUG", format=FORMAT, datefmt="[%X]", handlers=[
# RichHandler(rich_tracebacks=True)
# ]
# )
sys.path.append(os.path.join(os.path.dirname(__file__), '../'))
......
......@@ -6,6 +6,7 @@ import tempfile
from contextlib import contextmanager
from PySide6.QtWidgets import QFileDialog, QApplication
from rich.logging import RichHandler
from LaserControl.controller.ErrorConverter import ErrorConverter
......@@ -63,7 +64,12 @@ def stdout_redirector(stream):
class LaserCon(object):
def __init__(self, port=0):
self.logger = logging.getLogger(self.__class__.__name__)
self.handler = RichHandler(rich_tracebacks=True)
self.logger = logging.getLogger(f"AD2Controller({os.getpid()})")
self.logger.handlers = [self.handler]
self.logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(name)s %(message)s')
self.handler.setFormatter(formatter)
#self.logger.addHandler(logging.StreamHandler())
#self.logger.setLevel(logging.DEBUG)
self.laser_con = LaserLib()
......
......@@ -4,70 +4,131 @@ from multiprocessing import Value, Lock, Process
from PySide6.QtCore import QThreadPool
from LaserControl.controller.multiprocess.MPLaserDeviceControl import MPLaserDeviceControl
from LaserControl.model.LaserControlModel import LaserControlModel
from LaserControl.controller.LaserCon import LaserCon
from LaserControl.controller.multiprocess.LaserStateArray import LaserStateArray
from LaserControl.controller.multiprocess.move_to_wavelength import move_to_wavelength
from LaserControl.controller.multiprocess.wavelength_sweep import wavelength_sweep
import CaptDeviceControl as captdev
class LaserControlController:
def __init__(self, model: LaserControlModel):
def __init__(self, model: LaserControlModel, start_capture_flag: Value):
self.logger = logging.Logger("Laser Driver (Generic)")
self.model = model
# Multiprocess variables
self.proc: Process = None
#self.proc: Process = None
self.lock = Lock()
self._laser_port = Value('i', 0, lock=self.lock)
self._laser_connected_flag = Value('i', False, lock=self.lock)
#self._laser_port = Value('i', 0, lock=self.lock)
#self._laser_connected_flag = Value('i', False, lock=self.lock)
self._laser_moving_flag = Value('i', False, lock=self.lock)
self._laser_finished_flag = Value('i', False, lock=self.lock)
self._current_wavelength = Value('f', 0.0, lock=self.lock)
self._laser_state = Value(LaserStateArray, (False, False, False, 0, 0, 0), lock=self.lock)
#if self.model.capturing_device is None or not self.model.capturing_device_connected:
self._start_capture_flag = start_capture_flag
#else:
# self._start_capture_flag = self.model.capturing_device.start_capture_flag
#self._current_wavelength = Value('f', 0.0, lock=self.lock)
#self._laser_state = Value(LaserStateArray, (False, False, False, 0, 0, 0), lock=self.lock)
# For the sweep
self.laser_at_start_position_flag = Value('i', False, lock=self.lock)
self.laser_at_end_position_flag = Value('i', False, lock=self.lock)
#self.laser_at_start_position_flag = Value('i', False, lock=self.lock)
#self.laser_at_end_position_flag = Value('i', False, lock=self.lock)
# Threads for acquiring data from the process
self.thread_manager = QThreadPool()
self.kill_thread = False
self.thread_manager.start(self._monitor_laser_state)
#self.thread_manager = QThreadPool()
self.mp_laser_controller = MPLaserDeviceControl(None,
self._laser_moving_flag,
self._laser_finished_flag,
self._start_capture_flag,
enable_logging=False)
def connect_device(self):
self.logger.info("Connecting to laser...")
self.mp_laser_controller.get_connected_finished.connect(
lambda x: type(self.model).connected.fset(self.model, bool(x)))
self.mp_laser_controller.get_current_wavelength_finished.connect(
lambda x: type(self.model).current_wavelength.fset(self.model, x))
def _monitor_laser_state(self):
while not self.kill_thread:
self.model.connected = bool(self._laser_connected_flag.value)
self.model.laser_is_moving = bool(self._laser_moving_flag.value)
self.model.laser_at_position = bool(self._laser_finished_flag.value)
self.model.current_wavelength = float(self._current_wavelength.value)
#print(self._laser_state.connected)
#print("Monitor")
time.sleep(0.1)
self.logger.info("Monitor Laser State Thread Ended")
# Reset the flag
self._laser_finished_flag.value = False
self.mp_laser_controller.get_min_wavelength_finished.connect(
lambda x: type(self.model).min_laser_wavelength.fset(self.model, x))
self.mp_laser_controller.get_max_wavelength_finished.connect(
lambda x: type(self.model).max_laser_wavelength.fset(self.model, x))
def read_laser_settings(self):
self.logger.info("Reading laser settings...")
with LaserCon() as con:
self.model.connected = con.connected
self.model.min_laser_wavelength = con.min_wavelength
self.model.max_laser_wavelength = con.max_wavelength
self.model.current_wavelength = con.current_wavelength
self.model.velocity = con.velocity
self.model.acceleration = con.acceleration
self.mp_laser_controller.get_velocity_finished.connect(
lambda x: type(self.model).velocity.fset(self.model, x))
self.mp_laser_controller.get_acceleration_finished.connect(
lambda x: type(self.model).acceleration.fset(self.model, x))
self.model.connected = False
self.mp_laser_controller.get_deceleration_finished.connect(
lambda x: type(self.model).deceleration.fset(self.model, x))
self.mp_laser_controller.move_to_wavelength_finished.connect(self._move_to_wavelength_finished)
self.mp_laser_controller.laser_is_moving_changed.connect(self._laser_is_moving_changed)
self.mp_laser_controller.movement_finished_changed.connect(self._laser_movement_finished)
self.kill_thread = False
def connect_capture_device(self, device: captdev.Controller):
self.logger.info("***********************************************Connecting to capture device..***********************************")
self.mp_laser_controller.mp_read_laser_settings(self.model.port)
if isinstance(device, captdev.Controller):
self.model.capturing_device = device
self.model.capturing_device_connected = True
def connect_device(self):
self.logger.info("Connecting to laser...")
self.mp_laser_controller.mp_read_laser_settings(self.model.port)
def _move_to_wavelength_finished(self, wavelength: float):
self.logger.info(f"Move to wavelength finished. Current wavelength: {wavelength}")
def _laser_is_moving_changed(self, is_moving: bool, to_wavelength: float):
self.logger.info(f"************** Laser is moving: {is_moving}."
f"Moving to {self.model.laser_moving_to_wavelength} nm")
self.model.laser_is_moving = (is_moving, to_wavelength)
self.logger.info(f"************** Laser is moving: {is_moving}."
f"Moving to {self.model.laser_moving_to_wavelength} nm")
def _laser_movement_finished(self, is_finished: bool):
pass
# def _monitor_laser_state(self):
# while not self.kill_thread:
# self.model.connected = bool(self._laser_connected_flag.value)
# self.model.laser_is_moving = bool(self._laser_moving_flag.value)
# self.model.laser_at_position = bool(self._laser_finished_flag.value)
# self.model.current_wavelength = float(self._current_wavelength.value)
# #print(self._laser_state.connected)
# #print("Monitor")
# time.sleep(0.1)
# self.logger.info("Monitor Laser State Thread Ended")
# # Reset the flag
# self._laser_finished_flag.value = False
# def read_laser_settings(self):
# self.logger.info("Reading laser settings...")
# with LaserCon() as con:
# self.model.connected = con.connected
# self.model.min_laser_wavelength = con.min_wavelength
# self.model.max_laser_wavelength = con.max_wavelength
# self.model.current_wavelength = con.current_wavelength
# self.model.velocity = con.velocity
# self.model.acceleration = con.acceleration
#
#
# self.model.connected = False
def start_wavelength_sweep(self, start_wavelength: float = None, stop_wavelength: float = None) -> None:
# self.capt_device.clear_data()
......@@ -78,33 +139,11 @@ class LaserControlController:
start_wavelength = self.model.sweep_start_wavelength
if stop_wavelength is None:
stop_wavelength = self.model.sweep_stop_wavelength
self.proc = Process(target=wavelength_sweep,
args=(self._laser_port,
self._current_wavelength,
self._laser_connected_flag,
self._laser_moving_flag,
self.laser_at_start_position_flag,
self.laser_at_end_position_flag,
# self.capt_device.start_capture_flag,
self.laser_at_end_position_flag,
self._laser_finished_flag,
start_wavelength, stop_wavelength,
self._laser_state))
self.proc.start()
self.logger.info("Sweep Process Started")
# self.thread_manager.start(self._monitor_laser_state)
self.mp_laser_controller.wavelength_sweep(self.model.port, start_wavelength, stop_wavelength)
def move_to_wavelength(self, wavelength: float) -> None:
self.proc = Process(target=move_to_wavelength,
args=(self._laser_port, self._current_wavelength,
self._laser_connected_flag,
self._laser_moving_flag,
self._laser_finished_flag,
wavelength))
self.proc.start()
self.logger.info("Move to Wavelength Process Started")
# self.thread_manager.start(self._monitor_laser_state)
self.logger.info(f"Move to wavelength {wavelength}")
self.mp_laser_controller.move_to_wavelength(self.model.port, wavelength)
def stop_process(self):
time_start = time.time()
......
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '../'))
from ctypes import c_int, c_byte
class LaserState:
def __init__(self):
# Multiprocessing Information
self._pid = None
self._ppid = None
self._wavelength = 0
self._min_wavelength = 0
self._max_wavelength = 0
self._velocity = 0
self._acceleration = 0
self._deceleration = 0
def reinit(self, fields: dict):
for k, v in fields.items():
setattr(self, k, v)
# =========== Multiprocessing Information
@property
def pid(self):
return self._pid
@property
def ppid(self):
return self._ppid
# =========== Laser Information
@property
def wavelength(self):
return self._wavelength
@property
def min_wavelength(self):
return self._min_wavelength
@property
def max_wavelength(self):
return self._max_wavelength
@property
def velocity(self):
return self._velocity
@property
def acceleration(self):
return self._acceleration
@property
def deceleration(self):
return self._deceleration
class LaserStateMPSetter(LaserState):
def __init__(self, state_queue):
super().__init__()
self._state_queue = state_queue
@LaserState.pid.setter
def pid(self, value):
self._pid = value
self._state_queue.put(self.to_simple_class())
@LaserState.ppid.setter
def ppid(self, value):
self._ppid = value
self._state_queue.put(self.to_simple_class())
@LaserState.wavelength.setter
def wavelength(self, value):
self._wavelength = value
self._state_queue.put(self.to_simple_class())
@LaserState.min_wavelength.setter
def min_wavelength(self, value):
self._min_wavelength = value
self._state_queue.put(self.to_simple_class())
@LaserState.max_wavelength.setter
def max_wavelength(self, value):
self._max_wavelength = value
self._state_queue.put(self.to_simple_class())
@LaserState.velocity.setter
def velocity(self, value):
self._velocity = value
self._state_queue.put(self.to_simple_class())
@LaserState.acceleration.setter
def acceleration(self, value):
self._acceleration = value
self._state_queue.put(self.to_simple_class())
@LaserState.deceleration.setter
def deceleration(self, value):
self._deceleration = value
self._state_queue.put(self.to_simple_class())
def to_simple_class(self) -> LaserState:
exclude = ["_state_queue"]
laserstate = LaserState()
to_dict = {}
for item, value in self.__dict__.items():
if item in exclude:
continue
else:
to_dict[item] = value
laserstate.reinit(to_dict)
return laserstate
\ No newline at end of file
import logging
import os
import time
from multiprocessing import Value
import cmp
from LaserControl.controller.LaserCon import LaserCon
class MPLaserDevice(cmp.CProcess):
def __init__(self, state_queue, cmd_queue,
laser_moving_flag: Value,
laser_finished_flag: Value,
start_capture_flag: Value,
enable_internal_logging):
super().__init__(state_queue, cmd_queue, enable_internal_logging=enable_internal_logging)
self.logger, self.ha = None, None
# if not self.logger.handlers:
# self.logger.setLevel(level=logging.DEBUG)
# self.logger.disabled = False
# print(self.logger)
# self.logger.info(f"Created logger for {self.__class__.__name__}({os.getpid()})")
self.laser_moving_flag = laser_moving_flag
self.laser_finished_flag = laser_finished_flag
self.start_capture_flag = start_capture_flag
def init_loggers(self):
self.logger, self.ha = self.create_new_logger(f"{self.__class__.__name__}/({os.getpid()})")
def wrap_func(self, func, con: LaserCon = None, usb_port: str = None):
res = None
if usb_port is not None and con is None:
with LaserCon(usb_port) as con:
self.get_connected(con)
res = func(con)
self.get_connected(con)
elif usb_port is None and con is not None and isinstance(con, LaserCon):
res = func(con)
elif usb_port is None and con is None:
self.logger.error("USB Port or a connection object must be provided!")
raise ValueError("USB Port or a connection object must be provided!")
#self.logger.debug(f"Result of {func.__name__} is {res}")
return res
@cmp.CProcess.register_for_signal()
def get_connected(self, con: LaserCon = None, usb_port: str = None, *args, **kwargs) -> bool:
def func(_con: LaserCon):
return _con.connected
return self.wrap_func(func, con, usb_port)
@cmp.CProcess.register_for_signal()
def get_current_wavelength(self, con: LaserCon = None, usb_port: str = None, *args, **kwargs) -> float:
def func(_con: LaserCon):
return _con.current_wavelength
return self.wrap_func(func, con, usb_port)
@cmp.CProcess.register_for_signal()
def get_min_wavelength(self, con: LaserCon = None, usb_port: str = None, *args, **kwargs) -> float:
def func(_con: LaserCon):
return _con.min_wavelength
return self.wrap_func(func, con, usb_port)
@cmp.CProcess.register_for_signal()
def get_max_wavelength(self, con: LaserCon = None, usb_port: str = None, *args, **kwargs) -> float:
def func(_con: LaserCon):
return _con.max_wavelength
return self.wrap_func(func, con, usb_port)
@cmp.CProcess.register_for_signal()
def get_velocity(self, con: LaserCon = None, usb_port: str = None, *args, **kwargs) -> float:
def func(_con: LaserCon):
return _con.velocity
return self.wrap_func(func, con, usb_port)
@cmp.CProcess.register_for_signal()
def get_acceleration(self, con: LaserCon = None, usb_port: str = None, *args, **kwargs) -> float:
def func(con: LaserCon): return con.acceleration
return self.wrap_func(func, con, usb_port)
@cmp.CProcess.register_for_signal()
def get_deceleration(self, con: LaserCon = None, usb_port: str = None, *args, **kwargs) -> float:
def func(_con: LaserCon):
return _con.deceleration
return self.wrap_func(func, con, usb_port)
def mp_read_laser_settings(self, usb_port: str = None, con: LaserCon = None, *args, **kwargs):
self.logger.info(f"Reading laser settings from process using {usb_port}")
def _read(con: LaserCon):
self.get_connected(con)
self.get_min_wavelength(con)
self.get_max_wavelength(con)
self.get_current_wavelength(con)
self.get_velocity(con)
self.get_acceleration(con)
self.get_deceleration(con)
self.wrap_func(_read, con, usb_port)
@cmp.CProcess.register_for_signal(postfix="_changed")
def laser_is_moving(self, moving: bool, to_wavelength: float = -1):
self.laser_moving_flag.value = moving
return self.laser_moving_flag.value, to_wavelength
@cmp.CProcess.register_for_signal(postfix="_changed")
def movement_finished(self, finished: bool):
self.laser_finished_flag.value = finished
return self.laser_finished_flag.value
@cmp.CProcess.register_for_signal()
def move_to_wavelength(self, usb_port: str = None, wavelength: float = None, capture: bool = False,
con: LaserCon = None, *args, **kwargs):
# laser_moving_flag.value = False
def _move(con: LaserCon):
self.mp_read_laser_settings(usb_port, con)
self.laser_is_moving(False)
self.laser_finished_flag.value = False
# current_wavelength.value = con.current_wavelength
# laser_connected_flag.value = con.connected
# log_debug(f"Current Wavelength: {current_wavelength.value}")
# Move the laser to the new wavelength
# f = io.StringIO()
time_start = 0
# with stdout_redirector(f) as s:
print(self.logger)
self._internal_logger.info(f"**** Go to selected wavelength. Started moving laser to {wavelength}. ****")
# laser_finished_flag.value = False
# laser_moving_flag.value = True
# if capture_device_started_flag is not None:
# capture_device_started_flag.value = True
time_start = time.time()
self.laser_is_moving(True, wavelength)
if capture:
self.start_capture_flag.value = int(True)
self.logger.info(
f"******************** Capture flag set to {self.start_capture_flag.value} **********************")
con.go_to_wvl(wavelength, False)
self.start_capture_flag.value = int(False)
self._internal_logger.info(
f"******************** Capture flag set to {self.start_capture_flag.value} **********************")
self.laser_is_moving(False)
time_end = time.time()
# if capture_device_started_flag is not None:
# capture_device_started_flag.value = False
# laser_finished_flag.value = True
# laser_moving_flag.value = False
self.logger.info(f"Go to selected wavelength finished.")
# current_wavelength.value = con.current_wavelength
self.logger.info(
f"Current Wavelength: {self.get_current_wavelength(con)}. Took {time_end - time_start} seconds to move.")
self.mp_read_laser_settings(usb_port, con)
# laser_connected_flag.value = False # We need to manually set this
return self.get_current_wavelength(con)
return self.wrap_func(_move, con, usb_port)
def wavelength_sweep(self, usb_port: str = None,
wavelength_start: float = None,
wavelength_end: float = None, *args, **kwargs):
# laser_finished_flag.value = False
# laser_state.value.connected = False
self.logger.info(f"Starting sweep from {wavelength_start} to {wavelength_end}.")
self.logger.info(f"Resetting laser to start wavelength {wavelength_start}")
self.move_to_wavelength(usb_port, wavelength_start)
self.logger.info(f"Starting sweep from {wavelength_start} to {wavelength_end}.")
self.move_to_wavelength(usb_port, wavelength_end, capture=True)
self.logger.info(f"Finished sweep from {wavelength_start} to {wavelength_end}.")
# laser_state.connected = False
from multiprocessing import Value
from PySide6.QtCore import Signal
from cmp.CProcessControl import CProcessControl
from LaserControl.controller.multiprocess.MPLaserDevice import MPLaserDevice
class MPLaserDeviceControl(CProcessControl):
get_connected_finished = Signal(bool)
get_current_wavelength_finished = Signal(float)
get_min_wavelength_finished = Signal(float)
get_max_wavelength_finished = Signal(float)
get_velocity_finished = Signal(float)
get_acceleration_finished = Signal(float)
get_deceleration_finished = Signal(float)
mp_read_laser_settings_finished = Signal()
move_to_wavelength_finished = Signal(float)
wavelength_sweep_finished = Signal(float)
laser_is_moving_changed = Signal(bool, float)
movement_finished_changed = Signal(bool)
def __init__(self, parent,
laser_moving_flag: Value,
laser_finished_flag: Value,
start_capture_flag: Value,
enable_logging=False):
super().__init__(parent, enable_internal_logging=enable_logging)
self.register_child_process(MPLaserDevice(self.state_queue, self.cmd_queue,
laser_moving_flag, laser_finished_flag, start_capture_flag,
enable_internal_logging=enable_logging))
def set_start_capture_flag(self, start_capture_flag: Value):
self._start_capture_flag = start_capture_flag
@CProcessControl.register_function
def get_connected(self):
self._internal_logger.info("Reading current connection state from process.")
@CProcessControl.register_function()
def get_current_wavelength(self):
self._internal_logger.info("Reading current wavelength from process.")
@CProcessControl.register_function()
def get_min_wavelength(self):
self._internal_logger.info("Reading minimum wavelength from process.")
@CProcessControl.register_function()
def get_max_wavelength(self):
self._internal_logger.info("Reading maximum wavelength from process.")
@CProcessControl.register_function()
def get_velocity(self):
self._internal_logger.info("Reading velocity from process.")
@CProcessControl.register_function()
def get_acceleration(self):
self._internal_logger.info("Reading acceleration from process.")
@CProcessControl.register_function()
def get_deceleration(self):
self._internal_logger.info("Reading deceleration from process.")
@CProcessControl.register_function()
def mp_read_laser_settings(self, usb_port: str):
self._internal_logger.info(f"Reading laser settings from process using {usb_port}")
@CProcessControl.register_function()
def move_to_wavelength(self, usb_port: str, wavelength: float):
print(f"Moving laser ({usb_port}) to wavelength {wavelength} from process")
@CProcessControl.register_function()
def wavelength_sweep(self, usb_port: str, wavelength_start: float, wavelength_end: float):
print(f"Sweeping laser ({usb_port}): Wavelength {wavelength_start} - {wavelength_end} from process")
from controller.LaserCon import LaserCon
def _get_current_wavelength(con: LaserCon() = None) -> float:
if con is None or not isinstance(con, LaserCon):
raise ValueError("laco must be a LaserConn object")
else:
print(con.current_wavelength)
return con.current_wavelength
import os
def log_debug(msg, prefix="LAS Thread"):
print(f"DBG | [{prefix}/{os.getpid()}]: {msg}")
def log_error(msg, prefix="LAS Thread"):
print(f"ERR | [{prefix}/{os.getpid()}]: {msg}")
def log_info(msg, prefix="LAS Thread"):
print(f"INF | [{prefix}/{os.getpid()}]: {msg}")
def log_warning(msg, prefix="LAS Thread"):
print(f"WARN | [{prefix}/{os.getpid()}]: {msg}")
\ No newline at end of file
......@@ -2,8 +2,8 @@ import io
import time
from multiprocessing import Value
from controller.LaserCon import LaserCon
from controller.multiprocess.logging import log_debug, log_info
from LaserControl.controller.LaserCon import LaserCon
#from LaserControl.controller.multiprocess.logging import log_debug, log_info
def move_to_wavelength(laser_port, current_wavelength: Value,
......
from multiprocessing import Value, Array
from controller.multiprocess.LaserStateArray import LaserStateArray
from controller.multiprocess.logging import log_info
from controller.multiprocess.move_to_wavelength import move_to_wavelength
def wavelength_sweep(laser_port, current_wavelength: Value,
laser_connected_flag: Value, # Flag if the laser is connected
laser_moving_flag: Value, # Flag if the laser is moving
laser_at_start_position_flag: Value, # Flag if the laser is at the starting position
laser_at_end_position_flag: Value, # Flag if the laser is at the end position
capt_device_started_flag: Value, # Flag if the capture device is started
laser_finished_flag: Value, # Flag if the laser is finished
start_wavelength: int, end_wavelength: int,
laser_state: Value):
laser_finished_flag.value = False
#laser_state.value.connected = False
log_info(f"Starting sweep from {start_wavelength} to {end_wavelength}.")
log_info(f"Resetting laser to start wavelength {start_wavelength}")
move_to_wavelength(
laser_port=laser_port,
current_wavelength=current_wavelength,
laser_connected_flag=laser_connected_flag,
laser_moving_flag=laser_moving_flag,
laser_finished_flag=laser_at_start_position_flag,
wavelength=start_wavelength,
vel_fast=True,
capture_device_started_flag=None)
log_info(f"Starting sweep from {start_wavelength} to {end_wavelength}.")
move_to_wavelength(
laser_port=laser_port,
current_wavelength=current_wavelength,
laser_connected_flag=laser_connected_flag,
laser_moving_flag=laser_moving_flag,
laser_finished_flag=laser_at_end_position_flag,
wavelength=end_wavelength,
vel_fast=False,
capture_device_started_flag=capt_device_started_flag)
log_info(f"Finished sweep from {start_wavelength} to {end_wavelength}.")
#laser_state.connected = False
......@@ -8,6 +8,7 @@ from LaserControl.libs.LaserSceleton import LaserScelton
class LaserLibSimulator(LaserScelton):
connected = False
def __init__(self) -> None:
super().__init__()
self.pref = "Laser (Simulator)"
......@@ -103,19 +104,26 @@ class LaserLibSimulator(LaserScelton):
# ==================================================================================================================
def goToWvl(self, wavelength: float, fast: bool) -> None:
num_wavelength = int(wavelength)
#et_acc_dec = self._velocity / self._acceleration
#wl_after_acc = self._current_wavelength + 0.5 * self._acceleration * et_acc_dec ** 2
#wl_bevore_dec = num_wavelength - 0.5 * self._deceleration * et_acc_dec ** 2
#eta = 2 * et_acc_dec + (wl_bevore_dec - wl_after_acc) / self._velocity
#steps_per_second = (eta / 100)*1000
print(f"Moving from {self._current_wavelength} to {num_wavelength}")
if int(self._current_wavelength) > num_wavelength:
print(f"Going down: {self._current_wavelength} -> {num_wavelength}")
for i in range(int(self._current_wavelength), num_wavelength, -1):
for i in range(num_wavelength, int(self._current_wavelength), -1):
print(f"{i} | ", end="")
time.sleep(0.5)
time.sleep(1)
else:
print(f"Going up: {self._current_wavelength} -> {num_wavelength}")
for i in range(num_wavelength, int(self._current_wavelength)):
for i in range(int(self._current_wavelength), num_wavelength):
print(f"{i} | ", end="")
time.sleep(0.5)
print(f"Done!")
time.sleep(1)
self._current_wavelength = num_wavelength
print(f"Done: {self._current_wavelength}")
def startScan(self, wavelength: float, target: float) -> None:
self.goToWvl(wavelength, True)
......
......@@ -2,6 +2,7 @@
from LaserControl.LaserConfig import LaserConfig
from LaserControl.model.LaserControlSignals import LaserControlSignals
import CaptDeviceControl as captdev
# from LaserControl.model.LaserControlSignals import LaserControllerSignals
# from LaserControlOld.LaserConfig import LaserConfig
......@@ -11,37 +12,37 @@ class LaserControlModel(object):
def __init__(self, laser_config: LaserConfig):
self.signals = LaserControlSignals()
self._laser_config: LaserConfig = laser_config
self._connected = False
self._port = None
self._laser_connected = False
self._laser_at_position = False
self._laser_is_moving = False
self._wavelength_sweep_running = False
self._laser_ready_for_sweep = False
# A class which can be used to export the laser Settings
self._laser_moving_to_wavelength: float = 0
self._current_wavelength = -1
self._min_laser_wavelength = -1
self._max_laser_wavelength = -1
self._deceleration = -1
self._min_deceleration = -10
self._max_deceleration = 10
self._acceleration = -1
self._min_acceleration = -10
self._max_acceleration = 10
self._velocity = -1
self._min_velocity = -10
self._max_velocity = 10
self._sweep_start_wavelength = self.laser_config.wl_sweep_start.get()
self._sweep_stop_wavelength = self.laser_config.wl_sweep_stop.get()
# Capturing device settings
self._capturing_device_connected: bool = False
self._capturing_device: captdev.Controller = None
@property
def laser_config(self) -> LaserConfig:
......@@ -61,7 +62,6 @@ class LaserControlModel(object):
# (mcpy.Rectangular(self.sweep_start_wavelength, 0.01, unit='nm', k=2), mcpy.Rectangular(self.sweep_stop_wavelength, 0.01, unit='nm', k=2))
# )
@property
def laser_ready_for_sweep(self):
return self._laser_ready_for_sweep
......@@ -78,11 +78,11 @@ class LaserControlModel(object):
@laser_at_position.setter
def laser_at_position(self, value):
self._laser_at_position = value
self.signals.laser_at_position_changed.emit(self._laser_at_position)
self.signals.laser_at_position_changed.emit(self.laser_at_position)
@property
def sweep_start_wavelength(self):
return self._sweep_start_wavelength
return self.laser_config.wl_sweep_start.get()
@sweep_start_wavelength.setter
def sweep_start_wavelength(self, value):
......@@ -95,13 +95,12 @@ class LaserControlModel(object):
if value > self.sweep_stop_wavelength:
raise Exception(f"Sweep start wavelength ({value}) can not be greater than "
f"sweep stop wavelength ({self.sweep_stop_wavelength})!")
self._sweep_start_wavelength = value
self.signals.sweep_start_wavelength_changed.emit(self._sweep_start_wavelength)
self.laser_config.sweep_start_wavelength = value
self.laser_config.wl_sweep_start.set(value)
self.signals.sweep_start_wavelength_changed.emit(self.sweep_start_wavelength)
@property
def sweep_stop_wavelength(self) -> float:
return self._sweep_stop_wavelength
return self.laser_config.wl_sweep_stop.get()
@sweep_stop_wavelength.setter
def sweep_stop_wavelength(self, value):
......@@ -114,9 +113,8 @@ class LaserControlModel(object):
if value < self.sweep_start_wavelength:
raise Exception(f"Sweep stop wavelength ({value}) can not be smaller than "
f"sweep start wavelength ({self.sweep_start_wavelength})!")
self._sweep_stop_wavelength = value
self.signals.sweep_stop_wavelength_changed.emit(self._sweep_stop_wavelength)
self.laser_config.sweep_start_wavelength = value
self.laser_config.wl_sweep_stop.set(value)
self.signals.sweep_stop_wavelength_changed.emit(self.sweep_stop_wavelength)
@property
def connected(self):
......@@ -125,25 +123,35 @@ class LaserControlModel(object):
@connected.setter
def connected(self, value):
self._connected = value
self.signals.connected_changed.emit(self._connected)
self.signals.connected_changed.emit(self.connected)
@property
def port(self):
return self._port
return self.laser_config.port.get()
@port.setter
def port(self, value):
self._port = value
self.signals.port_changed.emit(self._port)
self.laser_config.port.set(value)
self.signals.port_changed.emit(self.port)
@property
def laser_is_moving(self):
return self._laser_is_moving
@laser_is_moving.setter
def laser_is_moving(self, value):
self._laser_is_moving = value
self.signals.laser_is_moving_changed.emit(self._laser_is_moving)
def laser_is_moving(self, value: tuple):
self._laser_is_moving = value[0]
self.laser_moving_to_wavelength = value[1]
self.signals.laser_is_moving_changed.emit(self.laser_is_moving, self.laser_moving_to_wavelength)
@property
def laser_moving_to_wavelength(self):
return self._laser_moving_to_wavelength
@laser_moving_to_wavelength.setter
def laser_moving_to_wavelength(self, value):
self._laser_moving_to_wavelength = value
self.signals.laser_moving_to_wavelength_changed.emit(self.laser_moving_to_wavelength)
@property
def current_wavelength(self):
......@@ -152,7 +160,7 @@ class LaserControlModel(object):
@current_wavelength.setter
def current_wavelength(self, value):
self._current_wavelength = value
self.signals.current_wavelength_changed.emit(self._current_wavelength)
self.signals.current_wavelength_changed.emit(self.current_wavelength)
@property
def min_laser_wavelength(self):
......@@ -161,8 +169,7 @@ class LaserControlModel(object):
@min_laser_wavelength.setter
def min_laser_wavelength(self, value):
self._min_laser_wavelength = value
#self._sweep_start_wavelength = value
self.signals.min_laser_wavelength_changed.emit(self._min_laser_wavelength)
self.signals.min_laser_wavelength_changed.emit(self.min_laser_wavelength)
@property
def max_laser_wavelength(self):
......@@ -171,17 +178,16 @@ class LaserControlModel(object):
@max_laser_wavelength.setter
def max_laser_wavelength(self, value):
self._max_laser_wavelength = value
#self._sweep_stop_wavelength = value
self.signals.max_laser_wavelength_changed.emit(self._max_laser_wavelength)
self.signals.max_laser_wavelength_changed.emit(self.max_laser_wavelength)
# ==================================================================================================================
@property
def deceleration(self):
return self._deceleration
return self.laser_config.deceleration.get()
@deceleration.setter
def deceleration(self, value):
self._deceleration = value
self.laser_config.deceleration.set(value)
self.signals.deceleration_changed.emit(self.deceleration)
@property
......@@ -191,7 +197,7 @@ class LaserControlModel(object):
@min_deceleration.setter
def min_deceleration(self, value):
self._min_acceleration = value
self.signals.min_deceleration_changed.emit(self._min_deceleration)
self.signals.min_deceleration_changed.emit(self.min_deceleration)
@property
def max_deceleration(self):
......@@ -200,17 +206,17 @@ class LaserControlModel(object):
@max_deceleration.setter
def max_deceleration(self, value):
self._max_deceleration = value
self.signals.max_deceleration_changed.emit(self._max_deceleration)
self.signals.max_deceleration_changed.emit(self.max_deceleration)
# ==================================================================================================================
@property
def acceleration(self):
return self._acceleration
return self.laser_config.acceleration.get()
@acceleration.setter
def acceleration(self, value):
self._acceleration = value
self.signals.acceleration_changed.emit(self._acceleration)
self.laser_config.acceleration.set(value)
self.signals.acceleration_changed.emit(self.acceleration)
@property
def min_acceleration(self):
......@@ -219,7 +225,7 @@ class LaserControlModel(object):
@min_acceleration.setter
def min_acceleration(self, value):
self._min_acceleration = value
self.signals.min_acceleration_changed.emit(self._min_acceleration)
self.signals.min_acceleration_changed.emit(self.min_acceleration)
@property
def max_acceleration(self):
......@@ -228,17 +234,18 @@ class LaserControlModel(object):
@max_acceleration.setter
def max_acceleration(self, value):
self._max_acceleration = value
self.signals.max_acceleration_changed.emit(self._max_acceleration)
self.signals.max_acceleration_changed.emit(self.max_acceleration)
# ==================================================================================================================
@property
def velocity(self):
return self._velocity
return self.laser_config.velocity.get()
@velocity.setter
def velocity(self, value):
self._velocity = value
self.signals.velocity_changed.emit(self._velocity)
self.laser_config.velocity.set(value)
self.signals.velocity_changed.emit(self.velocity)
@property
def min_velocity(self):
......@@ -272,3 +279,22 @@ class LaserControlModel(object):
def wavelength_sweep_running(self, value):
self._wavelength_sweep_running = value
self.signals.wavelength_sweep_running_changed.emit(self._wavelength_sweep_running)
# ==================================================================================================================
@property
def capturing_device_connected(self):
return self._capturing_device_connected
@capturing_device_connected.setter
def capturing_device_connected(self, value):
self._capturing_device_connected = value
self.signals.capturing_device_connected_changed.emit(self._capturing_device_connected)
@property
def capturing_device(self) -> captdev.Controller:
return self._capturing_device
@capturing_device.setter
def capturing_device(self, value: captdev.Controller):
self._capturing_device = value
self.signals.capturing_device_changed.emit(self._capturing_device)
\ No newline at end of file
......@@ -2,6 +2,10 @@ from PySide6.QtCore import QObject, Signal
class LaserControlSignals(QObject):
def __init__(self, parent: QObject = None):
super().__init__(parent)
laser_config_changed = Signal()
# Device status
......@@ -9,6 +13,8 @@ class LaserControlSignals(QObject):
port_changed = Signal(str)
# Device parameter
laser_moving_to_wavelength_changed = Signal(float)
current_wavelength_changed = Signal(float)
min_laser_wavelength_changed = Signal(float)
max_laser_wavelength_changed = Signal(float)
......@@ -27,7 +33,7 @@ class LaserControlSignals(QObject):
max_velocity_changed = Signal(float)
laser_is_moving_changed = Signal(bool)
laser_is_moving_changed = Signal(bool, float)
wavelength_sweep_running_changed = Signal(bool)
laser_at_position_changed = Signal(bool)
laser_ready_for_sweep_changed = Signal(bool)
......@@ -38,6 +44,9 @@ class LaserControlSignals(QObject):
connected = Signal(bool)
capturing_device_changed = Signal(bool)
capturing_device_connected_changed = Signal(object)
# current wavelength, min wavelength, max wavelength
# velocity, acceleration, deceleration
current_speed_settings = Signal(float, float, float)
......
import logging
import os
import time
from PySide6.QtWidgets import QMainWindow
from PySide6.QtCore import QThread, QTimer
from PySide6.QtWidgets import QMainWindow, QProgressDialog
from rich.logging import RichHandler
from LaserControl.controller.LaserControlController import LaserControlController
from LaserControl.view.Ui_LaserControlWindow import Ui_LaserControlWindow
......@@ -12,12 +16,22 @@ class LaserControlView(QMainWindow):
def __init__(self, model: LaserControlModel, controller: LaserControlController):
super().__init__()
self.logger = logging.getLogger("LaserControlWindow")
self.handler = RichHandler(rich_tracebacks=True)
self.logger = logging.getLogger(f"{self.__class__.__name__}({os.getpid()})")
self.logger.handlers = [self.handler]
self.logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(name)s %(message)s')
self.handler.setFormatter(formatter)
self._ui = Ui_LaserControlWindow()
self._ui.setupUi(self)
self.laser_moving_progress_dialog = QProgressDialog(self)
self.laser_moving_progress_dialog.cancel()
self.timer = QTimer()
self.timer.timeout.connect(self.handle_timer)
self.model: LaserControlModel = model
self.controller = controller
......@@ -31,12 +45,14 @@ class LaserControlView(QMainWindow):
self.laser_information = WidgetLaserInformation()
self._ui.grd_system_info.addWidget(self.laser_information, 0, 0, 1, 1)
self._connect_controls()
self._connect_signals()
def _on_uncertainty_changed(self, uncertainty):
self.uncertainty_dist_plot.uncertainty = uncertainty
# def _on_uncertainty_changed(self, uncertainty):
# self.uncertainty_dist_plot.uncertainty = uncertainty
def connect_capturing_device(self, device):
pass
def _connect_controls(self):
# USB Connection List
......@@ -46,8 +62,8 @@ class LaserControlView(QMainWindow):
# Buttons
self._ui.btn_connect.clicked.connect(self._on_btn_connect_clicked)
self._ui.btn_move_to_wavelength.clicked.connect(self._on_btn_move_to_wavelength_clicked)
# Spinboxes
# Spinboxes
self.model.port = self._ui.cb_port_selection.currentText()
# Sweep start
......@@ -63,10 +79,15 @@ class LaserControlView(QMainWindow):
self.model.signals.connected_changed.connect(self._on_laser_connected_changed)
self.model.signals.laser_is_moving_changed.connect(self._on_laser_is_moving_changed)
# Capturing device signals
self.model.signals.capturing_device_connected_changed.connect(self._on_capture_device_connected_changed)
# self.uncertainty_model.signals.uncertainty_changed.connect(self._on_uncertainty_changed)
# Triggerd if the laser port changes
self.model.signals.current_wavelength_changed.connect(self._on_wavelength_changed)
self.model.signals.min_laser_wavelength_changed.connect(self._on_min_wavelength_changed)
self.model.signals.max_laser_wavelength_changed.connect(self._on_max_wavelength_changed)
self.model.signals.velocity_changed.connect(self._on_velocity_changed)
self.model.signals.acceleration_changed.connect(self._on_acceleration_changed)
......@@ -88,29 +109,65 @@ class LaserControlView(QMainWindow):
self._ui.btn_connect.setText("Connect")
self.laser_information.set_connection_state(False)
def _on_laser_is_moving_changed(self, is_moving):
def _on_laser_is_moving_changed(self, is_moving, to_wl):
self._ui.btn_move_to_wavelength.setEnabled(not is_moving)
self._ui.btn_start_sweep.setEnabled(not is_moving)
self.laser_information.set_movement_state(is_moving)
self.laser_information.set_movement_state(is_moving, "Moving" if is_moving else "Stopped")
if is_moving:
self.logger.info(f">>>>> {self.model.current_wavelength}, {to_wl}")
self._display_estimated_progress(
self.model.current_wavelength,
self.model.laser_moving_to_wavelength)
else:
self._exit_display_estimated_progress()
# ==================================================================================================================
# Slots that are triggerd if the laser settings change7
# ==================================================================================================================
def _on_connected_changed(self, connected):
self.laser_information.set_connection_state(connected)
# ==================================================================================================================
# Slots that are triggered if the capturing device changes
# ==================================================================================================================
def _on_capture_device_connected_changed(self, connected):
if connected:
self.laser_information.set_capt_dev_state(connected, self.model.capturing_device.model.device_information.device_name)
self.model.capturing_device.model.device_information.signals.device_name_changed.connect(
self._on_capture_device_name_changed)
def _on_capture_device_name_changed(self):
self.laser_information.set_capt_dev_state(self.model.capturing_device_connected,
f"Connected: {self.model.capturing_device.model.device_information.device_name}")
def _on_wavelength_changed(self, wavelength):
# self.logger.debug(f"Wavelength changed to: {wavelength}. "
# f"Range ({self.model.min_laser_wavelength}-{self.model.max_laser_wavelength})nm")
self._ui.lbl_min_wavelength.setText(str(self.model.min_laser_wavelength))
self._ui.lbl_max_wavelength.setText(str(self.model.max_laser_wavelength))
# self._ui.lbl_min_wavelength.setText(str(self.model.min_laser_wavelength))
# self._ui.lbl_max_wavelength.setText(str(self.model.max_laser_wavelength))
self._ui.slider_wavelength.setMinimum(self.model.min_laser_wavelength)
self._ui.slider_wavelength.setMaximum(self.model.max_laser_wavelength)
# self._ui.slider_wavelength.setMinimum(self.model.min_laser_wavelength)
# self._ui.slider_wavelength.setMaximum(self.model.max_laser_wavelength)
self._ui.sb_set_wavelength.setMinimum(self.model.min_laser_wavelength)
self._ui.sb_set_wavelength.setMaximum(self.model.max_laser_wavelength)
# self._ui.sb_set_wavelength.setMinimum(self.model.min_laser_wavelength)
# self._ui.sb_set_wavelength.setMaximum(self.model.max_laser_wavelength)
# Set the minimum and maximum values for the sweep start and stop
self._ui.sb_set_wavelength.setValue(wavelength)
self._ui.lcd_wavelength.display(wavelength)
self._ui.slider_wavelength.setValue(wavelength)
def _on_min_wavelength_changed(self, min_wavelength):
self._ui.lbl_min_wavelength.setText(str(self.model.min_laser_wavelength))
self._ui.slider_wavelength.setMinimum(self.model.min_laser_wavelength)
self._ui.sb_set_wavelength.setMinimum(self.model.min_laser_wavelength)
def _on_max_wavelength_changed(self, min_wavelength):
self._ui.lbl_max_wavelength.setText(str(self.model.max_laser_wavelength))
self._ui.slider_wavelength.setMaximum(self.model.max_laser_wavelength)
self._ui.sb_set_wavelength.setMaximum(self.model.max_laser_wavelength)
def _on_velocity_changed(self, velocity: float):
self.logger.debug(f"Velocity changed to: {velocity} "
f"[{self.model.min_velocity}, {self.model.max_velocity}]")
......@@ -156,14 +213,14 @@ class LaserControlView(QMainWindow):
def _on_btn_connect_clicked(self):
if self.model.connected:
self.logger.debug("Attempting to disconnect from laser")
self.controller.read_laser_settings()
# self.controller.read_laser_settings()
elif not self.model.connected:
self.logger.debug(f"Attempting to connect to laser on port: {self.model.port}")
self.controller.read_laser_settings()
self.controller.connect_device()
def _on_btn_move_to_wavelength_clicked(self):
self.logger.debug(f"Attempting to set wavelength to: {self._ui.sb_set_wavelength.value()}")
self.controller.move_to_wavelength(False)
self.controller.move_to_wavelength(self._ui.sb_set_wavelength.value())
def _on_sb_sweep_start_changed(self, value):
try:
......@@ -185,3 +242,26 @@ class LaserControlView(QMainWindow):
def _on_btn_start_sweep_clicked(self):
self.logger.warning("Sweep manually started")
self.controller.start_wavelength_sweep(self.model.sweep_start_wavelength, self.model.sweep_stop_wavelength)
def _display_estimated_progress(self, start_wavelength: float, stop_wavelength: float):
# Dialog that displays the estimated time until the sweep is finished
# calulate the time, the laser accelerates using
self.laser_moving_progress_dialog.show()
et_acc_dec = self.model.velocity / self.model.acceleration
wl_after_acc = start_wavelength + 0.5 * self.model.acceleration * et_acc_dec ** 2
wl_bevore_dec = stop_wavelength - 0.5 * self.model.deceleration * et_acc_dec ** 2
self.eta = 2 * et_acc_dec + (wl_bevore_dec - wl_after_acc) / self.model.velocity
self.laser_moving_progress_dialog.setLabelText(
f"Laser is moving {start_wavelength}-{stop_wavelength}."
f"\nEstimated time until sweep is finished: {self.eta}")
steps_per_second = self.eta / 100
self.timer.start(steps_per_second * 1000)
def _exit_display_estimated_progress(self):
self.timer.stop()
self.laser_moving_progress_dialog.close()
def handle_timer(self):
value = self.laser_moving_progress_dialog.value() + 1
self.laser_moving_progress_dialog.setValue(value)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment