Skip to content
Snippets Groups Projects
Commit 6ded61fe authored by Christoph Schmidt's avatar Christoph Schmidt
Browse files

Updated pyproject.toml

parent 0e814eb5
No related branches found
No related tags found
No related merge requests found
......@@ -3,23 +3,15 @@
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/tests" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="jdk" jdkName="Python 3.12 (lasercontrol)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
<component name="PyNamespacePackagesService">
<option name="namespacePackageFolders">
<list>
<option value="$MODULE_DIR$/src/LaserControl" />
<option value="$MODULE_DIR$/src/LaserControl/controller" />
<option value="$MODULE_DIR$/src/LaserControl/model" />
<option value="$MODULE_DIR$/src/LaserControl/view" />
</list>
</option>
</component>
......
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.12 (lasercontrol) (2)" />
<option name="sdkName" value="Python 3.10 (lasercontrol) (2)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 (lasercontrol) (2)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 (lasercontrol)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
[project]
name = "lasercontrol"
version = "0.1.0"
version = "0.1.1"
authors = [
{ name="Christoph Schmidt", email="cschmidt.fs@gmail.com" },
]
......
import logging
import confighandler as cfg
class LaserConfig(cfg.ConfigNode):
def __init__(self) -> None:
super().__init__()
super().__init__(internal_log_level=logging.DEBUG, internal_log=True)
self.wl_sweep_start = cfg.Field(857)
self.wl_sweep_stop = cfg.Field(870)
self.velocity = cfg.Field(2.0)
......
......@@ -11,7 +11,7 @@ import CaptDeviceControl as captdev
class LaserControlController:
def __init__(self, model: LaserControlModel, start_capture_flag: Value):
def __init__(self, model: LaserControlModel, start_capture_flag: Value = None):
self.logger = logging.Logger("Laser Driver (Generic)")
......@@ -27,9 +27,10 @@ class LaserControlController:
self._laser_finished_flag = Value('i', False, lock=self.lock)
#if self.model.capturing_device is None or not self.model.capturing_device_connected:
self._start_capture_flag = start_capture_flag
#else:
# self._start_capture_flag = self.model.capturing_device.start_capture_flag
if start_capture_flag is not None:
self._start_capture_flag = start_capture_flag
else:
self._start_capture_flag = Value('i', False, lock=self.lock)
#self._current_wavelength = Value('f', 0.0, lock=self.lock)
......@@ -147,10 +148,10 @@ class LaserControlController:
def stop_process(self):
time_start = time.time()
if self.proc is not None:
while self.proc.is_alive():
time.sleep(0.1)
self.logger.warning(f"Laser process exited after {time.time() - time_start}s")
#if self.proc is not None:
# while self.proc.is_alive():
# time.sleep(0.1)
# self.logger.warning(f"Laser process exited after {time.time() - time_start}s")
self.kill_thread = True
def __del__(self):
......
......@@ -13,8 +13,12 @@ class MPLaserDevice(cmp.CProcess):
laser_moving_flag: Value,
laser_finished_flag: Value,
start_capture_flag: Value,
enable_internal_logging):
super().__init__(state_queue, cmd_queue, enable_internal_logging=enable_internal_logging)
kill_flag: Value,
internal_log, internal_log_level):
super().__init__(state_queue, cmd_queue,
kill_flag=kill_flag,
internal_log=internal_log,
internal_log_level=internal_log_level)
self.logger, self.ha = None, None
# if not self.logger.handlers:
......@@ -45,48 +49,48 @@ class MPLaserDevice(cmp.CProcess):
#self.logger.debug(f"Result of {func.__name__} is {res}")
return res
@cmp.CProcess.register_for_signal()
@cmp.CProcess.register_signal()
def get_connected(self, con: LaserCon = None, usb_port: str = None, *args, **kwargs) -> bool:
def func(_con: LaserCon):
return _con.connected
return self.wrap_func(func, con, usb_port)
@cmp.CProcess.register_for_signal()
@cmp.CProcess.register_signal()
def get_current_wavelength(self, con: LaserCon = None, usb_port: str = None, *args, **kwargs) -> float:
def func(_con: LaserCon):
return _con.current_wavelength
return self.wrap_func(func, con, usb_port)
@cmp.CProcess.register_for_signal()
@cmp.CProcess.register_signal()
def get_min_wavelength(self, con: LaserCon = None, usb_port: str = None, *args, **kwargs) -> float:
def func(_con: LaserCon):
return _con.min_wavelength
return self.wrap_func(func, con, usb_port)
@cmp.CProcess.register_for_signal()
@cmp.CProcess.register_signal()
def get_max_wavelength(self, con: LaserCon = None, usb_port: str = None, *args, **kwargs) -> float:
def func(_con: LaserCon):
return _con.max_wavelength
return self.wrap_func(func, con, usb_port)
@cmp.CProcess.register_for_signal()
@cmp.CProcess.register_signal()
def get_velocity(self, con: LaserCon = None, usb_port: str = None, *args, **kwargs) -> float:
def func(_con: LaserCon):
return _con.velocity
return self.wrap_func(func, con, usb_port)
@cmp.CProcess.register_for_signal()
@cmp.CProcess.register_signal()
def get_acceleration(self, con: LaserCon = None, usb_port: str = None, *args, **kwargs) -> float:
def func(con: LaserCon): return con.acceleration
return self.wrap_func(func, con, usb_port)
@cmp.CProcess.register_for_signal()
@cmp.CProcess.register_signal()
def get_deceleration(self, con: LaserCon = None, usb_port: str = None, *args, **kwargs) -> float:
def func(_con: LaserCon):
return _con.deceleration
......@@ -107,17 +111,17 @@ class MPLaserDevice(cmp.CProcess):
self.wrap_func(_read, con, usb_port)
@cmp.CProcess.register_for_signal(postfix="_changed")
@cmp.CProcess.register_signal(postfix="_changed")
def laser_is_moving(self, moving: bool, to_wavelength: float = -1):
self.laser_moving_flag.value = moving
return self.laser_moving_flag.value, to_wavelength
@cmp.CProcess.register_for_signal(postfix="_changed")
@cmp.CProcess.register_signal(postfix="_changed")
def movement_finished(self, finished: bool):
self.laser_finished_flag.value = finished
return self.laser_finished_flag.value
@cmp.CProcess.register_for_signal()
@cmp.CProcess.register_signal()
def move_to_wavelength(self, usb_port: str = None, wavelength: float = None, capture: bool = False,
con: LaserCon = None, *args, **kwargs):
# laser_moving_flag.value = False
......
import logging
from multiprocessing import Value
from PySide6.QtCore import Signal
......@@ -7,71 +8,72 @@ from LaserControl.controller.multiprocess.MPLaserDevice import MPLaserDevice
class MPLaserDeviceControl(CProcessControl):
get_connected_finished = Signal(bool)
get_current_wavelength_finished = Signal(float)
get_min_wavelength_finished = Signal(float)
get_max_wavelength_finished = Signal(float)
get_velocity_finished = Signal(float)
get_acceleration_finished = Signal(float)
get_deceleration_finished = Signal(float)
mp_read_laser_settings_finished = Signal()
get_connected_finished = Signal(bool, name='get_connected_finished')
get_current_wavelength_finished = Signal(float, name='get_current_wavelength_finished')
get_min_wavelength_finished = Signal(float, name='get_min_wavelength_finished')
get_max_wavelength_finished = Signal(float, name='get_max_wavelength_finished')
get_velocity_finished = Signal(float, name='get_velocity_finished')
get_acceleration_finished = Signal(float, name='get_acceleration_finished')
get_deceleration_finished = Signal(float, name='get_deceleration_finished')
mp_read_laser_settings_finished = Signal(name='mp_read_laser_settings_finished')
move_to_wavelength_finished = Signal(float)
wavelength_sweep_finished = Signal(float)
move_to_wavelength_finished = Signal(float, name='move_to_wavelength_finished')
wavelength_sweep_finished = Signal(float, float, name='wavelength_sweep_finished')
laser_is_moving_changed = Signal(bool, float)
movement_finished_changed = Signal(bool)
laser_is_moving_changed = Signal(bool, float, name='laser_is_moving_changed')
movement_finished_changed = Signal(bool, name='movement_finished_changed')
def __init__(self, parent,
laser_moving_flag: Value,
laser_finished_flag: Value,
start_capture_flag: Value,
enable_logging=False):
super().__init__(parent, enable_internal_logging=enable_logging)
super().__init__(parent, internal_log_level=logging.DEBUG, internal_log=True)
self.register_child_process(MPLaserDevice(self.state_queue, self.cmd_queue,
laser_moving_flag, laser_finished_flag, start_capture_flag,
enable_internal_logging=enable_logging))
self.register_child_process(MPLaserDevice,
laser_moving_flag,
laser_finished_flag,
start_capture_flag)
def set_start_capture_flag(self, start_capture_flag: Value):
self._start_capture_flag = start_capture_flag
@CProcessControl.register_function
@CProcessControl.register_function(get_connected_finished)
def get_connected(self):
self._internal_logger.info("Reading current connection state from process.")
@CProcessControl.register_function()
@CProcessControl.register_function(get_current_wavelength_finished)
def get_current_wavelength(self):
self._internal_logger.info("Reading current wavelength from process.")
@CProcessControl.register_function()
@CProcessControl.register_function(get_min_wavelength_finished)
def get_min_wavelength(self):
self._internal_logger.info("Reading minimum wavelength from process.")
@CProcessControl.register_function()
@CProcessControl.register_function(get_max_wavelength_finished)
def get_max_wavelength(self):
self._internal_logger.info("Reading maximum wavelength from process.")
@CProcessControl.register_function()
@CProcessControl.register_function(get_velocity_finished)
def get_velocity(self):
self._internal_logger.info("Reading velocity from process.")
@CProcessControl.register_function()
@CProcessControl.register_function(get_acceleration_finished)
def get_acceleration(self):
self._internal_logger.info("Reading acceleration from process.")
@CProcessControl.register_function()
@CProcessControl.register_function(get_deceleration_finished)
def get_deceleration(self):
self._internal_logger.info("Reading deceleration from process.")
@CProcessControl.register_function()
@CProcessControl.register_function(mp_read_laser_settings_finished)
def mp_read_laser_settings(self, usb_port: str):
self._internal_logger.info(f"Reading laser settings from process using {usb_port}")
@CProcessControl.register_function()
@CProcessControl.register_function(move_to_wavelength_finished)
def move_to_wavelength(self, usb_port: str, wavelength: float):
print(f"Moving laser ({usb_port}) to wavelength {wavelength} from process")
@CProcessControl.register_function()
@CProcessControl.register_function(wavelength_sweep_finished)
def wavelength_sweep(self, usb_port: str, wavelength_start: float, wavelength_end: float):
print(f"Sweeping laser ({usb_port}): Wavelength {wavelength_start} - {wavelength_end} from process")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment