Skip to content
Snippets Groups Projects
Select Git revision
  • 00be017de003fff859146035a03215b95ee56d77
  • main default protected
  • renovate/lock-file-maintenance
  • demo protected
  • person-select-custom
  • dbp-translation-component
  • icon-set-mapping
  • port-i18next-parser
  • remove-sentry
  • favorites-and-recent-files
  • revert-6c632dc6
  • lit2
  • advertisement
  • wc-part
  • automagic
  • publish
  • wip-cleanup
  • demo-file-handling
18 results

index.js

Blame
  • AD2CaptDeviceView.py 16.71 KiB
    import logging
    import os
    
    import numpy as np
    import pyqtgraph as pg
    from PySide6.QtCore import QTimer
    from PySide6.QtWidgets import QMainWindow, QStatusBar
    from pyqtgraph.dockarea import DockArea, Dock
    from rich.logging import RichHandler
    
    from CaptDeviceControl.controller.BaseAD2CaptDevice import BaseAD2CaptDevice
    from CaptDeviceControl.model.AD2CaptDeviceModel import AD2CaptDeviceModel
    from CaptDeviceControl.model.AD2Constants import AD2Constants
    from CaptDeviceControl.view.Ui_AD2ControlWindowNew import Ui_AD2ControlWindowNew
    from CaptDeviceControl.view.widget.WidgetCapturingInformation import WidgetCapturingInformation, WidgetDeviceInformation
    
    
    class ControlWindow(QMainWindow):
        """
        Main window for operating an AD2 capture device.

        User actions on the widgets created by ``Ui_AD2ControlWindowNew`` are
        forwarded to the controller, and the state published through the model's
        signals is mirrored in the UI. Two timers periodically redraw the
        live-stream plot and the captured-data plot; they run while the model
        reports that it is ready for recording.
        """

        # Redraw periods of the two plot timers, in milliseconds.
        _CAPTURE_PLOT_INTERVAL_MS = 10
        _STREAM_PLOT_INTERVAL_MS = 40

        # Range applied to the acquisition-rate spin box. Written as integer
        # literals so that no float is handed to the spin box.
        _SAMPLE_RATE_MIN = 1
        _SAMPLE_RATE_MAX = 1_000_000_000

        def __init__(self, model: AD2CaptDeviceModel, controller: BaseAD2CaptDevice):
            """
            Build the UI, create the plot timers and wire model, controller and widgets.

            :param model: Model whose properties and signals are shown in the window.
            :param controller: Controller that receives the user's actions.
            """
            super().__init__()
            self.handler = RichHandler(rich_tracebacks=True)
            self.logger = logging.getLogger(f"AD2Window({os.getpid()})")
            self.logger.handlers = [self.handler]
            self.logger.setLevel(logging.DEBUG)
            formatter = logging.Formatter('%(name)s %(message)s')
            self.handler.setFormatter(formatter)

            self.controller = controller
            self.model = model

            self._ui = Ui_AD2ControlWindowNew()
            self._ui.setupUi(self)

            # The information widgets (capturing state and device details)
            self.capt_info = WidgetCapturingInformation()
            self.dev_info = WidgetDeviceInformation()
            self._ui.grd_information.addWidget(self.capt_info, 0, 0, 1, 1)
            self._ui.grd_information.addWidget(self.dev_info, 0, 1, 1, 1)

            # The live plots
            self._ui.grd_plot.addWidget(self._init_UI_live_plot(), 1, 0, 1, 1)

            self.status_bar = QStatusBar()
            self.setStatusBar(self.status_bar)

            # Timers for periodically updating the plots. They are only started
            # and stopped in _on_ready_for_recording_changed.
            self.capture_update_timer = QTimer()
            self.capture_update_timer.setInterval(self._CAPTURE_PLOT_INTERVAL_MS)
            self.capture_update_timer.timeout.connect(self._on_capture_update_plot)

            self.stream_update_timer = QTimer()
            self.stream_update_timer.setInterval(self._STREAM_PLOT_INTERVAL_MS)
            self.stream_update_timer.timeout.connect(self._on_stream_update_timer_timeout)

            # Connect the signals and controls
            self._connect_config_properties()
            self._connect_controls()
            self._connect_signals()

            # Show the model's current sample rate right away.
            self._ui.sb_acquisition_rate.setValue(self.model.capturing_information.sample_rate)

        # ==================================================================================================================
        # UI construction
        # ==================================================================================================================
        def _init_UI_live_plot(self):
            """
            Create the dock area holding the two plot widgets.

            Stores the plots as ``self.scope_original`` (top dock) and
            ``self.scope_captured`` (bottom dock).

            :return: The populated ``DockArea``.
            """
            area = DockArea()

            d1 = Dock("Analog Discovery 2")
            area.addDock(d1, 'top')

            d2 = Dock("Captured Data")
            area.addDock(d2, 'bottom')

            self.scope_original = pg.PlotWidget(title="AD2 Acquisition")
            self.scope_original.plotItem.showGrid(x=True, y=True, alpha=1)
            d1.addWidget(self.scope_original)
            # Fixed vertical range for the live stream plot.
            self.scope_original.setYRange(-1.5, 1.5, padding=0)

            self.scope_captured = pg.PlotWidget(title="Captured Data")
            self.scope_captured.plotItem.showGrid(x=True, y=True, alpha=1)
            d2.addWidget(self.scope_captured)

            return area

        # ==================================================================================================================
        # Wiring
        # ==================================================================================================================
        def _add_menues(self):
            """Placeholder; no menus are created."""
            pass

        def _connect_config_properties(self):
            """
            Register the widgets that double as configuration settings with the model's config.

            NOTE(review): ``view.add_new_view`` is defined outside this file; it
            presumably keeps the widget and the config value in sync - confirm.
            """
            config = self.model.ad2captdev_config
            config.streaming_history.view.add_new_view(self._ui.cb_streaming_history)
            config.sample_rate.view.add_new_view(self._ui.sb_acquisition_rate)
            # Selected Analog In Channel
            config.ain_channel.view.add_new_view(self._ui.cb_channel_select)

        def _connect_controls(self):
            """Connect the widgets' Qt signals to the UI slots of this window and to the controller."""
            self._ui.cb_device_select.currentIndexChanged.connect(self._on_ui_selected_device_index_changed)

            self._ui.btn_connect.clicked.connect(self._on_ui_btn_connect_clicked)

            self._ui.sb_acquisition_rate.valueChanged.connect(self._on_ui_sample_rate_changed)

            # Connect the buttons. The lambda takes no parameters, so the
            # controller method is always called without arguments.
            self._ui.btn_play.clicked.connect(
                lambda: self.controller.start_capturing_process()
            )
            self._ui.btn_stop.clicked.connect(self.controller.stop_capturing_process)

            self._ui.btn_record.clicked.connect(self._ui_on_btn_recording_clicked)
            self._ui.btn_reset.clicked.connect(self._ui_on_btn_reset_clicked)

        def _connect_signals(self):
            """Connect the model's signals to the slots that update the UI."""
            device_signals = self.model.device_information.signals
            capturing_signals = self.model.capturing_information.signals

            self.model.signals.dwf_version_changed.connect(self._on_dwf_version_changed)
            device_signals.connected_devices_changed.connect(self._on_connected_devices_changed)
            device_signals.device_state_changed.connect(self._on_device_state_changed)
            device_signals.device_connected_changed.connect(self._on_connected_changed)
            capturing_signals.sample_rate_changed.connect(self._on_model_sample_rate_changed)
            capturing_signals.device_capturing_state_changed.connect(
                self._on_capture_process_state_changed)
            self.model.analog_in.signals.selected_ain_channel_changed.connect(self._on_selected_ain_channel_changed)

            # Connected Device Information
            device_signals.device_name_changed.connect(self._on_device_name_changed)
            device_signals.device_serial_number_changed.connect(
                self._on_device_serial_number_changed)
            capturing_signals.ready_for_recording_changed.connect(self._on_ready_for_recording_changed)

        # ==================================================================================================================
        # Helpers shared by several slots
        # ==================================================================================================================
        def _set_capture_buttons_enabled(self, *, pause: bool, stop: bool, record: bool, reset: bool, play: bool):
            """Enable or disable the pause, stop, record, reset and play buttons in one go."""
            self._ui.btn_pause.setEnabled(pause)
            self._ui.btn_stop.setEnabled(stop)
            self._ui.btn_record.setEnabled(record)
            self._ui.btn_reset.setEnabled(reset)
            self._ui.btn_play.setEnabled(play)

        def _show_device_state(self, color: str, text):
            """Set the device-state LED color and the device-state label text."""
            self.capt_info.led_device_state.set_color(color=color)
            self.capt_info.lbl_device_state.setText(text)

        def _show_capturing_state(self, color: str, text):
            """Set the capturing LED color and the capturing label text."""
            self.capt_info.led_is_capt.set_color(color=color)
            self.capt_info.lbl_is_capt.setText(text)

        # ==================================================================================================================
        # Slots
        # ==================================================================================================================
        def _on_dwf_version_changed(self, dwf_version):
            """
            Gets called if the DWF version changes. Updates the UI.

            :param dwf_version: The DWF version
            """
            self.dev_info.dwf_version = dwf_version

        def _on_connected_devices_changed(self, connected_devices: list):
            """
            Gets called if the connected devices changed. Repopulates the device combobox.

            The model's selected device index is read before the combobox is
            cleared and is applied again after the items were added.

            :param connected_devices: Sequence of mappings with at least the keys
                'type', 'device_id' and 'device_name'.
            """
            selection_index = self.model.device_information.selected_device_index
            self._ui.cb_device_select.clear()
            for index, dev in enumerate(connected_devices):
                self._ui.cb_device_select.addItem(
                    f"{index}: {dev['type']}{dev['device_id']} - {dev['device_name']}"
                )
            self._ui.cb_device_select.setCurrentIndex(selection_index)

        def _on_device_state_changed(self, capturing):
            """
            Gets called if the device state changes. Updates the state LED, its label and the buttons.

            :param capturing: New device state, compared against the values
                returned by the ``AD2Constants.DeviceState`` members.
            """
            # NOTE(review): calling a DeviceState member with True is used as the
            # label text below, so it presumably returns a description - confirm.
            state = AD2Constants.DeviceState
            if capturing == state.ACQ_NOT_STARTED():
                self._show_device_state("yellow", state.ACQ_NOT_STARTED(True))
            elif capturing == state.NO_SAMPLES_AVAILABLE():
                # The member has to be called like its siblings; comparing with
                # the un-called member never matches a state value.
                self._show_device_state("red", state.NO_SAMPLES_AVAILABLE(True))
            elif capturing == state.SAMPLES_AVAILABLE():
                self._show_device_state("green", state.SAMPLES_AVAILABLE(True))
            elif capturing == state.DEV_CAPT_SETUP():
                self._show_device_state("yellow", state.DEV_CAPT_SETUP(True))
                self._set_capture_buttons_enabled(
                    pause=True, stop=True, record=False, reset=False, play=False
                )
            elif capturing == state.DEV_CAPT_STREAMING():
                self._show_device_state("green", state.DEV_CAPT_STREAMING(True))
                self._set_capture_buttons_enabled(
                    pause=True, stop=True, record=True, reset=True, play=True
                )
                self._ui.btn_play.setChecked(True)

        # ==================================================================================================================
        # UI Slots
        # ==================================================================================================================
        def _on_ui_selected_device_index_changed(self, index):
            """Gets called if the device combobox selection changes. Forwards the index to the controller."""
            self.controller.set_selected_device(index)

        def _ui_on_selected_ain_changed(self, channel_index):
            """ Gets called if the ui changes the field (should modify the model) """
            self.model.analog_in.selected_ain_channel = channel_index

        def _on_ui_btn_connect_clicked(self):
            """Ask the controller to open the device; failures are logged instead of raised."""
            try:
                self.controller.open_device()
            except Exception as e:
                # Deliberately broad: a failing connect must not take down the UI.
                # logger.exception records the traceback in addition to the message.
                self.logger.exception("Error: %s", e)

        def _on_ui_sample_rate_changed(self, sample_rate: int):
            """Gets called if the acquisition-rate spin box changes. Writes the value to the model."""
            # NOTE(review): this writes model.sample_rate, while __init__ reads
            # model.capturing_information.sample_rate - confirm the model
            # exposes this attribute.
            self.model.sample_rate = sample_rate

        def _on_model_sample_rate_changed(self, sample_rate: int):
            """Gets called if the model's sample rate changes. Updates the acquisition-rate spin box."""
            self._ui.sb_acquisition_rate.setRange(self._SAMPLE_RATE_MIN, self._SAMPLE_RATE_MAX)
            self._ui.sb_acquisition_rate.setValue(sample_rate)

        def _ui_on_btn_recording_clicked(self):
            """Toggle the capture: start it unless the capturing state is RUNNING, otherwise stop it."""
            capturing_state = self.model.capturing_information.device_capturing_state
            if capturing_state != AD2Constants.CapturingState.RUNNING():
                self._ui.btn_record.setChecked(True)
                self.controller.start_capture()
            else:
                self._ui.btn_record.setChecked(False)
                self.controller.stop_capture()

        def _ui_on_btn_reset_clicked(self):
            """Clear the captured-data plot and ask the controller to reset the capture."""
            self.scope_captured.clear()
            self.controller.reset_capture()

        # ==================================================================================================================
        # Device information changed
        # ==================================================================================================================
        def _on_selected_ain_channel_changed(self, channel):
            """Gets called if the model's selected analog-in channel changes. Forwards it to the controller."""
            self.controller.set_selected_ain_channel(channel)

        def _on_selected_index_changed(self, index):
            """Select the given index in the channel combobox."""
            self._ui.cb_channel_select.setCurrentIndex(index)

        def _on_connected_changed(self, connected):
            """
            Gets called if the connection state changes. Updates label, LED and buttons.

            :param connected: Truthy if a device is connected.
            """
            if connected:
                self.capt_info.lbl_conn_state.setText("Connected")
                self.capt_info.led_conn_state.set_color(color="green")
                # Only starting the capturing process is possible right after connecting.
                self._set_capture_buttons_enabled(
                    pause=False, stop=False, record=False, reset=False, play=True
                )
                self._ui.btn_connect.setText("Disconnect")
            else:
                self.capt_info.lbl_conn_state.setText("Not connected")
                self._ui.btn_connect.setText("Connect")
                self._set_capture_buttons_enabled(
                    pause=False, stop=False, record=False, reset=False, play=False
                )
                self.capt_info.led_conn_state.set_color(color="red")

        # ============== Recording Flags (starting, stopping and pausing)
        def _on_capture_process_state_changed(self, capturing):
            """
            Gets called if the capturing state changes. Updates the capturing LED, label and record button.

            :param capturing: New capturing state, compared against the values
                returned by the ``AD2Constants.CapturingState`` members.
            """
            state = AD2Constants.CapturingState
            if capturing == state.RUNNING():
                self._show_capturing_state("green", state.RUNNING(True))
                self._ui.btn_record.setChecked(True)
            elif capturing == state.PAUSED():
                self._show_capturing_state("yellow", state.PAUSED(True))
            elif capturing == state.STOPPED():
                self._show_capturing_state("red", state.STOPPED(True))
                self._ui.btn_record.setChecked(False)

        def _on_ready_for_recording_changed(self, ready):
            """Start both plot timers if ``ready`` is truthy, otherwise stop them."""
            if ready:
                self.capture_update_timer.start()
                self.stream_update_timer.start()
            else:
                self.capture_update_timer.stop()
                self.stream_update_timer.stop()

        def _on_device_name_changed(self, device_name):
            """Gets called if the device name changes. Updates the device information widget."""
            self.dev_info.device_name = device_name

        def _on_device_serial_number_changed(self, serial_number):
            """Gets called if the serial number changes. Updates the device information widget."""
            self.dev_info.serial_number = serial_number

        # ============== Plotting
        def _on_capture_update_plot(self):
            """Redraw the captured-data plot from the model's recorded samples, if there are any."""
            # Read once so the length check and the plot use the same object.
            samples = self.model.capturing_information.recorded_samples
            if len(samples) == 0:
                return
            self.scope_captured.clear()
            self.scope_captured.plot(samples, pen=pg.mkPen(width=1))

        def _on_stream_update_timer_timeout(self):
            """Redraw the live plot from the controller's streaming queue."""
            self.scope_original.clear()
            self.scope_original.plot(
                np.array(self.controller.streaming_dqueue),
                pen=pg.mkPen(width=1))

        # ============== Connected Device Information
        def _on_num_of_connected_devices_changed(self, num_of_connected_devices):
            """Placeholder; the number of connected devices is not displayed."""
            pass

        # ============== Acquisition Settings
        def _model_on_selected_ain_changed(self, channel):
            """ Gets called if the model is changed directly (should modify the UI)"""
            self._on_selected_ain_channel_changed(channel)
            self._ui.cb_channel_select.setCurrentIndex(channel)

        # ============== Analog In Information
        def _on_ain_channels_changed(self, list_of_ad_ins):
            """Repopulate the channel combobox with one "Channel <n>" entry per given channel."""
            self._ui.cb_channel_select.clear()
            for adin in list_of_ad_ins:
                self._ui.cb_channel_select.addItem(f"Channel {adin}")

        def _on_ain_buffer_size_changed(self, buffer_size):
            """Placeholder; the buffer size is not displayed."""
            pass

        def _on_ain_bits_changed(self, bits):
            """Placeholder; the number of bits is not displayed."""
            pass

        # NOTE(review): the three recording slots below use
        # self._ui.btn_start_capture, which no other method of this class touches,
        # and none of them is connected in _connect_signals - verify that the
        # widget still exists in Ui_AD2ControlWindowNew before wiring them up.
        def _on_start_recording_changed(self, start_recording):
            """Switch the start-capture button to its pause appearance if recording started."""
            self.logger.debug("Start Recording: %s", start_recording)
            if start_recording:
                self._ui.btn_stop.setEnabled(True)
                self._ui.btn_start_capture.pause()
                self._ui.btn_start_capture.setText("Pause Capture")

        def _on_stop_recording_changed(self, stop_recording):
            """Switch the start-capture button back to its play appearance if recording stopped."""
            self.logger.debug("Stop Recording: %s", stop_recording)
            if stop_recording:
                self._ui.btn_stop.setEnabled(False)
                self._ui.btn_start_capture.play()
                self._ui.btn_start_capture.setText("Start Capture")

        def _on_pause_recording_changed(self, pause_recording):
            """Enable the stop button and switch the start-capture button to its play appearance."""
            self._ui.btn_stop.setEnabled(True)
            self._ui.btn_start_capture.play()

        def _on_reset_recording_changed(self, reset_recording):
            """Placeholder; nothing is updated when the recording is reset."""
            pass

        def closeEvent(self, args):
            """
            Qt close handler: shut the controller down and emit ``destroyed``.

            :param args: The close event passed in by Qt (unused).
            """
            print("Destroyed")
            self.controller.exit()
            self.destroyed.emit()