Skip to content
Snippets Groups Projects
Commit 8bf1a889 authored by Christoph Schmidt's avatar Christoph Schmidt
Browse files

UI now runs again - Added new files

parent 14fcf032
No related branches found
No related tags found
No related merge requests found
Showing
with 31 additions and 1363 deletions
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="JupyterPersistentConnectionParameters">
<option name="moduleParameters">
<map>
<entry key="G:/Andere Computer/Arbeits-Laptop/Documents/2_mmWave/2_workspace/mmWaveProject/.idea/mmWaveProject.iml">
<value>
<JupyterConnectionParameters>
<option name="managed" value="true" />
</JupyterConnectionParameters>
</value>
</entry>
<entry key="G:/Meine Ablage/Dokumente/TU Graz/PhD Elektrotechnik/1_workspace/2d-to-3d/.idea/2d-to-3d.iml">
<value>
<JupyterConnectionParameters>
<option name="managed" value="true" />
</JupyterConnectionParameters>
</value>
</entry>
<entry key="G:/Meine Ablage/Dokumente/TU Graz/PhD Elektrotechnik/1_workspace/mmWave_lense_optimization/.idea/mmWaveProject.iml">
<value>
<JupyterConnectionParameters>
<option name="managed" value="true" />
</JupyterConnectionParameters>
</value>
</entry>
</map>
</option>
</component>
</project>
\ No newline at end of file
# - Configuration file stored 2023-11-09 17:09:18.468374 - # - Configuration file stored 2023-11-11 17:00:09.246929 -
CaptDeviceConfig: #!!python/object:controller.CaptDeviceConfig CaptDeviceConfig: #!!python/object:controller.CaptDeviceConfig
sample_rate: 50000 # Sample rate: Sample rate of the device sample_rate: 50000 # Sample rate: Sample rate of the device
streaming_rate: 500 # Streaming rate: Streaming rate in Hz (should be below 1kHz) streaming_rate: 500 # Streaming rate: Streaming rate in Hz (should be below 1kHz)
......
# -*- coding: utf-8 -*-
"""
Author(s): Christoph Schmidt <christoph.schmidt@tugraz.at>
Created: 2023-10-19 12:35
Package Version:
"""
import confighandler as cfg
class CaptDeviceConfig(cfg.ConfigNode):
def __init__(self) -> None:
super().__init__()
self.sample_rate = cfg.Field(50000, friendly_name="Sample rate",
description="Sample rate of the device")
self.streaming_rate = cfg.Field(500, friendly_name="Streaming rate",
description="Streaming rate in Hz (should be below 1kHz)")
self.ain_channel = cfg.Field(0, friendly_name="Analog In Channel",
description="Analog in channel. Defines which channel is used for capturing.")
self.show_simulator = cfg.Field(True, friendly_name="Show Simulators",
description="Show available simulators in the device list "
"provided by the DreamWaves API.")
self.streaming_history = cfg.Field(2000, friendly_name="Streaming history (ms)",
description="Defines the range of the stream in ms")
# TODO: Old configs (not used and will probably removed in future)
self.total_samples = cfg.Field(200000)
self.sample_time = cfg.Field(45)
self.ad2_raw_out_file = cfg.Field("{output_directory}/measurement/ad2_raw/ad2_out_{wafer_nr}_{date}.csv")
self.register()
# -*- coding: utf-8 -*-
"""
Author(s): Christoph Schmidt <christoph.schmidt@tugraz.at>
Created: 2023-10-19 12:35
Package Version:
"""
from .CaptDeviceConfig import CaptDeviceConfig as Config
from .controller.AD2CaptDeviceController import AD2CaptDeviceController as Controller
from .model.AD2CaptDeviceModel import AD2CaptDeviceModel as Model
from .view.AD2CaptDeviceView import ControlWindow as View
\ No newline at end of file
## MATLAB ##
# Windows default autosave extension
*.asv
# OSX / *nix default autosave extension
*.m~
# Compiled MEX binaries (all platforms)
*.mex*
# Packaged app and toolbox files
*.mlappinstall
*.mltbx
# Generated helpsearch folders
helpsearch*/
# Simulink code generation folders
slprj/
sccprj/
# Matlab code generation folders
codegen/
# Simulink autosave extension
*.autosave
# Simulink cache files
*.slxc
# Octave session info
octave-workspace
## PYTHON ##
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintainted in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
Sacher_Lasertechnik/
measurements/
flexsensorpy/measurements/
.venv-waffel-untersucher/
\ No newline at end of file
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="Eslint" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="3">
<item index="0" class="java.lang.String" itemvalue="scipy" />
<item index="1" class="java.lang.String" itemvalue="pyMKL" />
<item index="2" class="java.lang.String" itemvalue="opt_einsum" />
</list>
</value>
</option>
</inspection_tool>
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N803" />
<option value="N802" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredIdentifiers">
<list>
<option value="dict.is_set" />
<option value="libs.LaserLib" />
<option value="Laser.LaserControl.controller.LaserSLLIB.laserSacher" />
</list>
</option>
</inspection_tool>
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (adcaptdevicecontrol)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/adcaptdevicecontrol.iml" filepath="$PROJECT_DIR$/.idea/adcaptdevicecontrol.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>
\ No newline at end of file
# ADCaptDeviceControl
## Getting started
This is the module which allows to control an Analog Discovery device from Digilent. It is based on the [Analog Discovery SDK](https://reference.digilentinc.com/reference/software/waveforms/waveforms-3/reference-manual) and the [Waveforms SDK](https://reference.digilentinc.com/reference/software/waveforms/waveforms-3/reference-manual).
The module is made to stream and capture the data.
### Prerequisites
The Analog Discovery only works with the correct SDK installed.
The SDK can be found [here](https://reference.digilentinc.com/reference/software/waveforms/waveforms-3/start).
The SDK is only available for Windows, Mac and Linux. The module is only tested on Windows.
## Using the module
### Logging setup
This step is not mandatory, however helps you to see what is happening in the background.
Jus implement the following code in your main file and run the function
```python
import logging
from rich.logging import RichHandler
def setup_logging(window: ConsoleWindow = None):
for log_name, log_obj in logging.Logger.manager.loggerDict.items():
if log_name != '<module name>':
log_obj.disabled = True
# Format the Rich logger
FORMAT = "%(message)s"
if window is not None:
logging.basicConfig(
level="DEBUG", format=FORMAT, datefmt="[%X]", handlers=[
RichHandler(rich_tracebacks=True), window.handler
]
)
else:
logging.basicConfig(
level="DEBUG", format=FORMAT, datefmt="[%X]", handlers=[
RichHandler(rich_tracebacks=True)
]
)
# Setup the logging formatter.
setup_logging()
```
THis makes sure to have proper logging formatting.
### Connecting the model, controller and view
The module can be run with
````python
# This path is not included in this module. It is only included in flexsensorpy
# See the git repo under ./flexsensorpy/configs/init_config.yaml
vaut_config = VAutomatorConfig.load_config("../../configs/init_config.yaml")
# Init the AD model, controller and view
ad2_model = AD2CaptDeviceModel(vaut_config.ad2_device_config)
ad2_controller = AD2CaptDeviceController(ad2_model)
ad2_window = ControlWindow(ad2_model, ad2_controller)
ad2_window.show()
sys.exit(app.exec())
````
\ No newline at end of file
import os
from controller.AD2CaptDeviceController import AD2CaptDeviceController
from model.AD2CaptDeviceModel import AD2CaptDeviceModel
from view.AD2CaptDeviceView import ControlWindow
if os.environ.get('ADC_SIM') == "TRUE":
pass
else:
pass
# Init the AD model, controller and view
ad2_model = AD2CaptDeviceModel(vaut_config.ad2_device_config)
ad2_controller = AD2CaptDeviceController(ad2_model)
ad2_window = ControlWindow(ad2_model, ad2_controller)
"""
Main File for testing the module
(c) Christoph Schmidt, 2023
christoph.schmidt@tugraz.at
"""
import logging
import sys
#from generics.logger import setup_logging
from PySide6.QtWidgets import QApplication
from rich.logging import RichHandler
sys.path.append('../adcaptdevicecontrol')
import adcaptdevicecontrol
from ConfigHandler.controller.VAutomatorConfig import VAutomatorConfig
if __name__ == "__main__":
app = QApplication()
#setup_logging()
logging.warning("AD2CaptDeviceController.py is not meant to be run as a script.")
# This path is not included in this module. It is only included in flexsensorpy
# See the git repo under ./flexsensorpy/configs/init_config.yaml
vaut_config = VAutomatorConfig.load_config("../../configs/init_config.yaml")
ad2_window.show()
sys.exit(app.exec())
\ No newline at end of file
"""
DWFConstants (definitions file for DWF library)
Author: Digilent, Inc.
Revision: 2019-10-15
Must install:
Python 2.7 or 3
"""
from ctypes import *
# device handle
#HDWF
hdwfNone = c_int(0)
# device enumeration filters
enumfilterAll = c_int(0)
enumfilterType = c_int(0x8000000)
enumfilterUSB = c_int(0x0000001)
enumfilterNetwork = c_int(0x0000002)
enumfilterAXI = c_int(0x0000004)
enumfilterRemote = c_int(0x1000000)
enumfilterAudio = c_int(0x2000000)
enumfilterDemo = c_int(0x4000000)
# device ID
devidEExplorer = c_int(1)
devidDiscovery = c_int(2)
devidDiscovery2 = c_int(3)
devidDDiscovery = c_int(4)
devidADP3X50 = c_int(6)
# device version
devverEExplorerC = c_int(2)
devverEExplorerE = c_int(4)
devverEExplorerF = c_int(5)
devverDiscoveryA = c_int(1)
devverDiscoveryB = c_int(2)
devverDiscoveryC = c_int(3)
# trigger source
trigsrcNone = c_ubyte(0)
trigsrcPC = c_ubyte(1)
trigsrcDetectorAnalogIn = c_ubyte(2)
trigsrcDetectorDigitalIn = c_ubyte(3)
trigsrcAnalogIn = c_ubyte(4)
trigsrcDigitalIn = c_ubyte(5)
trigsrcDigitalOut = c_ubyte(6)
trigsrcAnalogOut1 = c_ubyte(7)
trigsrcAnalogOut2 = c_ubyte(8)
trigsrcAnalogOut3 = c_ubyte(9)
trigsrcAnalogOut4 = c_ubyte(10)
trigsrcExternal1 = c_ubyte(11)
trigsrcExternal2 = c_ubyte(12)
trigsrcExternal3 = c_ubyte(13)
trigsrcExternal4 = c_ubyte(14)
trigsrcHigh = c_ubyte(15)
trigsrcLow = c_ubyte(16)
trigsrcClock = c_ubyte(17)
# instrument states
DwfStateReady = c_ubyte(0)
DwfStateConfig = c_ubyte(4)
DwfStatePrefill = c_ubyte(5)
DwfStateArmed = c_ubyte(1)
DwfStateWait = c_ubyte(7)
DwfStateTriggered = c_ubyte(3)
DwfStateRunning = c_ubyte(3)
DwfStateDone = c_ubyte(2)
# DwfEnumConfigInfo
DECIAnalogInChannelCount = c_int(1)
DECIAnalogOutChannelCount = c_int(2)
DECIAnalogIOChannelCount = c_int(3)
DECIDigitalInChannelCount = c_int(4)
DECIDigitalOutChannelCount = c_int(5)
DECIDigitalIOChannelCount = c_int(6)
DECIAnalogInBufferSize = c_int(7)
DECIAnalogOutBufferSize = c_int(8)
DECIDigitalInBufferSize = c_int(9)
DECIDigitalOutBufferSize = c_int(10)
# acquisition modes:
acqmodeSingle = c_int(0)
acqmodeScanShift = c_int(1)
acqmodeScanScreen = c_int(2)
acqmodeRecord = c_int(3)
acqmodeOvers = c_int(4)
acqmodeSingle1 = c_int(5)
# analog acquisition filter:
filterDecimate = c_int(0)
filterAverage = c_int(1)
filterMinMax = c_int(2)
# analog in trigger mode:
trigtypeEdge = c_int(0)
trigtypePulse = c_int(1)
trigtypeTransition = c_int(2)
trigtypeWindow = c_int(3)
# trigger slope:
DwfTriggerSlopeRise = c_int(0)
DwfTriggerSlopeFall = c_int(1)
DwfTriggerSlopeEither = c_int(2)
# trigger length condition
triglenLess = c_int(0)
triglenTimeout = c_int(1)
triglenMore = c_int(2)
# error codes for the functions:
dwfercNoErc = c_int(0) # No error occurred
dwfercUnknownError = c_int(1) # API waiting on pending API timed out
dwfercApiLockTimeout = c_int(2) # API waiting on pending API timed out
dwfercAlreadyOpened = c_int(3) # Device already opened
dwfercNotSupported = c_int(4) # Device not supported
dwfercInvalidParameter0 = c_int(16) # Invalid parameter sent in API call
dwfercInvalidParameter1 = c_int(17) # Invalid parameter sent in API call
dwfercInvalidParameter2 = c_int(18) # Invalid parameter sent in API call
dwfercInvalidParameter3 = c_int(19) # Invalid parameter sent in API call
dwfercInvalidParameter4 = c_int(20) # Invalid parameter sent in API call
# analog out signal types
funcDC = c_ubyte(0)
funcSine = c_ubyte(1)
funcSquare = c_ubyte(2)
funcTriangle = c_ubyte(3)
funcRampUp = c_ubyte(4)
funcRampDown = c_ubyte(5)
funcNoise = c_ubyte(6)
funcPulse = c_ubyte(7)
funcTrapezium= c_ubyte(8)
funcSinePower= c_ubyte(9)
funcCustom = c_ubyte(30)
funcPlay = c_ubyte(31)
# analog io channel node types
analogioEnable = c_ubyte(1)
analogioVoltage = c_ubyte(2)
analogioCurrent = c_ubyte(3)
analogioPower = c_ubyte(4)
analogioTemperature = c_ubyte(5)
analogioDmm = c_ubyte(6)
analogioRange = c_ubyte(7)
analogioMeasure = c_ubyte(8)
analogioTime = c_ubyte(9)
analogioFrequency = c_ubyte(10)
analogioResistance = c_ubyte(11)
DwfDmmResistance = c_double(1)
DwfDmmContinuity = c_double(2)
DwfDmmDiode = c_double(3)
DwfDmmDCVoltage = c_double(4)
DwfDmmACVoltage = c_double(5)
DwfDmmDCCurrent = c_double(6)
DwfDmmACCurrent = c_double(7)
DwfDmmDCLowCurrent = c_double(8)
DwfDmmACLowCurrent = c_double(9)
DwfDmmTemperature = c_double(10)
AnalogOutNodeCarrier = c_int(0)
AnalogOutNodeFM = c_int(1)
AnalogOutNodeAM = c_int(2)
DwfAnalogOutIdleDisable = c_int(0)
DwfAnalogOutIdleOffset = c_int(1)
DwfAnalogOutIdleInitial = c_int(2)
DwfDigitalInClockSourceInternal = c_int(0)
DwfDigitalInClockSourceExternal = c_int(1)
DwfDigitalInSampleModeSimple = c_int(0)
# alternate samples: noise|sample|noise|sample|...
# where noise is more than 1 transition between 2 samples
DwfDigitalInSampleModeNoise = c_int(1)
DwfDigitalOutOutputPushPull = c_int(0)
DwfDigitalOutOutputOpenDrain = c_int(1)
DwfDigitalOutOutputOpenSource = c_int(2)
DwfDigitalOutOutputThreeState = c_int(3)
DwfDigitalOutTypePulse = c_int(0)
DwfDigitalOutTypeCustom = c_int(1)
DwfDigitalOutTypeRandom = c_int(2)
DwfDigitalOutTypeROM = c_int(3)
DwfDigitalOutTypeState = c_int(4)
DwfDigitalOutTypePlay = c_int(5)
DwfDigitalOutIdleInit = c_int(0)
DwfDigitalOutIdleLow = c_int(1)
DwfDigitalOutIdleHigh = c_int(2)
DwfDigitalOutIdleZet = c_int(3)
DwfAnalogImpedanceImpedance = c_int(0)
DwfAnalogImpedanceImpedancePhase = c_int(1)
DwfAnalogImpedanceResistance = c_int(2)
DwfAnalogImpedanceReactance = c_int(3)
DwfAnalogImpedanceAdmittance = c_int(4)
DwfAnalogImpedanceAdmittancePhase = c_int(5)
DwfAnalogImpedanceConductance = c_int(6)
DwfAnalogImpedanceSusceptance = c_int(7)
DwfAnalogImpedanceSeriesCapacitance = c_int(8)
DwfAnalogImpedanceParallelCapacitance = c_int(9)
DwfAnalogImpedanceSeriesInductance = c_int(10)
DwfAnalogImpedanceParallelInductance = c_int(11)
DwfAnalogImpedanceDissipation = c_int(12)
DwfAnalogImpedanceQuality = c_int(13)
DwfAnalogImpedanceVrms = c_int(14)
DwfAnalogImpedanceVreal = c_int(15)
DwfAnalogImpedanceVimag = c_int(16)
DwfAnalogImpedanceIrms = c_int(17)
DwfAnalogImpedanceIreal = c_int(18)
DwfAnalogImpedanceIimag = c_int(19)
DwfParamUsbPower = c_int(2) # 1 keep the USB power enabled even when AUX is connected, Analog Discovery 2
DwfParamLedBrightness = c_int(3) # LED brightness 0 ... 100%, Digital Discovery
DwfParamOnClose = c_int(4) # 0 continue, 1 stop, 2 shutdown
DwfParamAudioOut = c_int(5) # 0 disable / 1 enable audio output, Analog Discovery 1, 2
DwfParamUsbLimit = c_int(6) # 0..1000 mA USB power limit, -1 no limit, Analog Discovery 1, 2
DwfParamAnalogOut = c_int(7) # 0 disable / 1 enable
DwfParamFrequency = c_int(8) # Hz
DwfParamExtFreq = c_int(9) # Hz
DwfParamClockMode = c_int(10) # 0 internal, 1 output, 2 input, 3 IO
# obsolate
#STS
stsRdy = c_ubyte(0)
stsArm = c_ubyte(1)
stsDone = c_ubyte(2)
stsTrig = c_ubyte(3)
stsCfg = c_ubyte(4)
stsPrefill = c_ubyte(5)
stsNotDone = c_ubyte(6)
stsTrigDly = c_ubyte(7)
stsError = c_ubyte(8)
stsBusy = c_ubyte(9)
stsStop = c_ubyte(10)
#TRIGCOND
trigcondRisingPositive = c_int(0)
trigcondFallingNegative = c_int(1)
#use deiceid
enumfilterEExplorer = c_int(1)
enumfilterDiscovery = c_int(2)
enumfilterDiscovery2 = c_int(3)
enumfilterDDiscovery = c_int(4)
#!/.venv/Scripts/python
from ctypes import c_int, byref, create_string_buffer, cdll, c_int32, c_uint, c_double
from controller.BaseAD2CaptDevice import BaseAD2CaptDevice
from model.AD2CaptDeviceModel import AD2CaptDeviceModel
from constants.dwfconstants import enumfilterUSB, enumfilterType, enumfilterDemo
class AD2CaptDeviceController(BaseAD2CaptDevice):
def __init__(self, ad2capt_model: AD2CaptDeviceModel):
self.dwf = cdll.dwf
super().__init__(ad2capt_model)
# This is required for acquiring the data
def connect_device(self, device_id):
self.start_device_process(device_id)
return True
def read_hardware_config(self, iDevice):
hw_info_dict = {}
hdwf = c_int()
int0 = c_int()
int1 = c_int()
uint0 = c_uint()
dbl0 = c_double()
dbl1 = c_double()
dbl2 = c_double()
self.dwf.FDwfDeviceConfigOpen(c_int(iDevice), c_int(0), byref(hdwf))
if hdwf.value == 0:
szerr = create_string_buffer(512)
self.dwf.FDwfGetLastErrorMsg(szerr)
raise Exception(str(szerr.value))
self.dwf.FDwfAnalogInChannelCount(hdwf, byref(int0))
hw_info_dict["analog_in_channels"] = int(int0.value)
self.dwf.FDwfAnalogIOChannelCount(hdwf, byref(int0))
hw_info_dict["analog_io_channels"] = int(int0.value)
self.dwf.FDwfAnalogInBufferSizeInfo(hdwf, 0, byref(int0))
hw_info_dict["buffer_size"] = int(int0.value)
self.dwf.FDwfAnalogInBitsInfo(hdwf, byref(int0))
hw_info_dict["adc_bits"] = int(int0.value)
self.dwf.FDwfAnalogInChannelRangeInfo(hdwf, byref(dbl0), byref(dbl1), byref(dbl2))
hw_info_dict["range"] = (int(dbl0.value), int(dbl1.value), int(dbl2.value))
self.dwf.FDwfAnalogInChannelOffsetInfo(hdwf, byref(dbl0), byref(dbl1), byref(dbl2))
hw_info_dict["offset"] = (int(dbl0.value), int(dbl1.value), int(dbl2.value))
return hw_info_dict
def discover_connected_devices(self):
# enumerate connected devices
connected_devices = []
# for filter_type in [(c_int32(enumfilterType.value | enumfilterUSB.value), 'USB'),
# (c_int32(enumfilterType.value | enumfilterNetwork.value), 'Network'),
# (c_int32(enumfilterType.value | enumfilterAXI.value), 'AXI'),
# (c_int32(enumfilterType.value | enumfilterRemote.value), 'Remote'),
# (c_int32(enumfilterType.value | enumfilterAudio.value), 'Audio'),
# (c_int32(enumfilterType.value | enumfilterDemo.value), 'Demo')]:
cDevice = c_int()
# filter, type = (c_int32(enumfilterType.value | enumfilterUSB.value), 'USB')
filter, type = (c_int32(enumfilterType.value | enumfilterUSB.value | enumfilterDemo.value), 'USB')
self.dwf.FDwfEnum(filter, byref(cDevice))
self.model.num_of_connected_devices = cDevice
devicename = create_string_buffer(64)
serialnum = create_string_buffer(16)
for iDevice in range(0, cDevice.value):
self.dwf.FDwfEnumDeviceName(c_int(iDevice), devicename)
self.dwf.FDwfEnumSN(c_int(iDevice), serialnum)
hw_info = self.read_hardware_config(iDevice)
srn = str(serialnum.value.decode('UTF-8'))
if "demo" in srn.lower():
type = "Simulator "
con_dev_dict = {
'type': type,
'device_id': int(iDevice),
'device_name': str(devicename.value.decode('UTF-8')),
'serial_number': srn
}
con_dev_dict = dict(con_dev_dict, **hw_info)
connected_devices.append(con_dev_dict)
self.logger.info(connected_devices)
self.model.connected_devices = connected_devices
self.logger.info(f"Discovered {len(self.model.connected_devices)} devices.")
return self.model.connected_devices
def close_device(self):
self.end_process_flag.value = 1
# def _open_device(self, device_index):
# devicename = create_string_buffer(64)
# serialnum = create_string_buffer(16)
#
# self.dwf.FDwfEnumDeviceName(c_int(device_index), devicename)
# self.dwf.FDwfEnumSN(c_int(device_index), serialnum)
#
# self.model.device_name = devicename
# self.model.device_serial_number = serialnum
# # open device
# self.logger.info(f"[{self.pref} Task] Opening device #{device_index}...")
#
# # Opens a device identified by the enumeration index and retrieves a handle. To automatically
# # enumerate all connected devices and open the first discovered device, use index -1.
# self.dwf.FDwfDeviceOpen(c_int(device_index), byref(self.model.hdwf))
#
# if self.model.hdwf.value == hdwfNone.value:
# szerr = create_string_buffer(512)
# self.dwf.FDwfGetLastErrorMsg(szerr)
# # print(str(szerr.value))
# self.model.connected = False
# raise Exception(f"Failed to open device: {szerr.value}")
# else:
# self.model.connected = True
# self.get_analog_in_status()
# self.logger.info(f"[{self.pref} Task] Device connected!")
#
# def _setup_acquisition(self):
# # set up acquisition
# self.get_analog_in_status()
# self.logger.debug(f"[{self.pref} Task] Setup for acquisition. Wait 2 seconds for the offset to stabilize.")
# self.dwf.FDwfAnalogInChannelEnableSet(self.model.hdwf, c_int(self.model.analog_in_channel), c_int(1))
# self.dwf.FDwfAnalogInChannelRangeSet(self.model.hdwf, c_int(self.model.analog_in_channel), c_double(5))
# self.dwf.FDwfAnalogInAcquisitionModeSet(self.model.hdwf, acqmodeRecord)
# self.dwf.FDwfAnalogInFrequencySet(self.model.hdwf, c_double(self.model.hz_acquisition))
# self.dwf.FDwfAnalogInRecordLengthSet(self.model.hdwf, 0) # -1 infinite record length
# self.get_analog_in_status()
# # wait at least 2 seconds for the offset to stabilize
# time.sleep(2)
# self.logger.info(f"[{self.pref} Task] Setup for acquisition done.")
# return True
#
# # # ==================================================================================================================
# # # Acquisition
# # # ==================================================================================================================
# # @Slot()
# # def start_capture(self, capture):
# # if not self.model.connected:
# # self.logger.warning(f"[{self.pref} Task] No device connected. Connecting to first device.")
# # self.connect_device(0)
# # return False
# #
# # if capture:
# # self.logger.info(f"[{self.pref} Task] Setting up device for capturing.")
# # self.gen_sine()
# # self._setup_acquisition()
# # # if self._setup_acquisition():
# # # self.logger.info(f"[{self.pref} Task] Started capturing thread")
# # self.set_ad2_acq_status(True)
# # return self.thread_manager.start(self._capture)
# # else:
# # self.set_ad2_acq_status(False)
# # # return self._capture()
# #
# # def _capture(self):
# # self.model.capturing_finished = False
# # self.model.device_capturing = False
# # cAvailable = c_int()
# # cLost = c_int()
# # cCorrupted = c_int()
# #
# # self.logger.info(f"[{self.pref} Report] Capturing started. "
# # f"Waiting for start command: "
# # f"{self.model.start_recording}<->{self.model.stop_recording}")
# #
# # # Configures the instrument and start or stop the acquisition. To reset the Auto trigger timeout, set
# # # fReconfigure to TRUE.
# # # print("Starting oscilloscope")
# # self.dwf.FDwfAnalogInConfigure(self.model.hdwf, c_int(0), c_int(1))
# #
# # t0 = -1
# #
# # cSamples = 0
# # self.model.device_ready = True
# # self.logger.info(f"[{self.pref} Report] Capturing device is ready.")
# # while True:
# #
# # if self.model.start_recording and not self.model.stop_recording:
# # if t0 < 0:
# # self.logger.info(f"[{self.pref} Report] Start command received.")
# # self.model.capturing_finished = False
# # self.model.device_capturing = True
# # self.model.current_recorded_samples = []
# # timestamp = datetime.now()
# # t0 = time.time()
# # # print(f"Start ({cSamples})")
# # sts = self.get_analog_in_status()
# #
# # if cSamples == 0 and (
# # sts == DwfStateConfig or
# # sts == DwfStatePrefill or
# # sts == DwfStateArmed):
# # print('idle')
# # continue # Acquisition not yet started.
# #
# # # Retrieves information about the recording process. The data loss occurs when the device acquisition
# # # is faster than the read process to PC. In this case, the device recording buffer is filled and data
# # # samples are overwritten. Corrupt samples indicate that the samples have been overwritten by the
# # # acquisition process during the previous read. In this case, try optimizing the loop process for faster
# # # execution or reduce the acquisition frequency or record length to be less than or equal to the device
# # # buffer size (record length <= buffer size/frequency).
# # self.dwf.FDwfAnalogInStatusRecord(self.model.hdwf, # Interface handle
# # byref(cAvailable),
# # byref(cLost),
# # byref(cCorrupted))
# #
# # cSamples += cLost.value
# #
# # if cLost.value:
# # self.logger.warning(f"[{self.pref} Report] - Sample(s) lost ({cLost.value})")
# # self.model.fLost += int(cLost.value)
# # if cCorrupted.value:
# # self.logger.warning(f"[{self.pref} Report] - Samples(s) corrupted ({cCorrupted.value})")
# # self.model.fCorrupted += int(cCorrupted.value)
# #
# # # self.dwf.FDwfAnalogInStatusSamplesValid(self.hdwf, byref(self.cValid))
# # if cAvailable.value == 0:
# # # print(f"Nothing available {cAvailable.value}")
# # continue
# # else:
# # # print(f"Available: {cAvailable.value}")
# #
# # # if cSamples + cAvailable.value > self.ad2capt_model.n_samples:
# # # cAvailable = c_int(self.ad2capt_model.n_samples - cSamples)
# # rgdSamples = (c_double * cAvailable.value)()
# # # Retrieves the acquired data samples from the specified idxChannel on the AnalogIn instrument. It
# # # copies the data samples to the provided buffer.
# # self.dwf.FDwfAnalogInStatusData(self.model.hdwf,
# # c_int(self.model.analog_in_channel),
# # byref(rgdSamples),
# # cAvailable) # get channel 1 data
# # for s in rgdSamples:
# # self.model.current_recorded_samples.append(float(s))
# #
# # cSamples += cAvailable.value
# #
# # elif not self.model.start_recording and self.model.stop_recording:
# # t1 = time.time()
# # self.model.measurement_time = t1 - t0
# # self.get_analog_in_status()
# # self.model.capturing_finished = True
# # self.model.device_capturing = False
# # self.logger.info(f"Finished Thread. Acquisition took {self.model.measurement_time} s. "
# # f"Process captured {len(self.model.current_recorded_samples)} samples.")
# # # 1. Assign the current captured samples to a dict
# # self.model.all_recorded_samples.append({'timestamp': timestamp,
# # 'measurement_time': self.model.measurement_time,
# # 'num_samples': len(
# # self.model.current_recorded_samples),
# # 'acqRate': self.model.hz_acquisition,
# # 'samples': self.model.current_recorded_samples})
# # # Reset status bits
# # try:
# # time.sleep(1)
# # self.close_device()
# # except Exception as e:
# # print(e)
# # return
# # # return self.model.current_recorded_samples, self.model.measurement_time
# # else:
# # self.model.device_capturing = False
#
# # # ==================================================================================================================
# # #
# # # ==================================================================================================================
# # def gen_sine(self, channel=0, frequency=1):
# # self.logger.debug(f"[{self.pref} Task] Generating sine wave on output 0...")
# # self.dwf.FDwfAnalogOutNodeEnableSet(self.model.hdwf, c_int(channel), AnalogOutNodeCarrier, c_int(1))
# # self.dwf.FDwfAnalogOutNodeFunctionSet(self.model.hdwf, c_int(channel), AnalogOutNodeCarrier,
# # funcTrapezium) # sine
# # self.dwf.FDwfAnalogOutNodeFrequencySet(self.model.hdwf, c_int(channel), AnalogOutNodeCarrier,
# # c_double(frequency)) # 1Hz
# # self.dwf.FDwfAnalogOutNodeAmplitudeSet(self.model.hdwf, c_int(channel), AnalogOutNodeCarrier, c_double(2))
# # self.dwf.FDwfAnalogOutConfigure(self.model.hdwf, c_int(channel), c_int(1))
# # return self.model.hdwf
#
# # ==================================================================================================================
# #
# # ==================================================================================================================
# def close_device(self):
# # Resets and configures (by default, having auto configure enabled) all AnalogOut instrument
# # parameters to default values for the specified channel. To reset instrument parameters across all
# # channels, set idxChannel to -1.
# self.model.fLost = 0
# self.model.fCorrupted = 0
# self.model.start_recording = True
# self.model.stop_recording = False
# self.model.capturing_finished = False
# self.model.device_capturing = False
# self.model.connected = False
# self.model.device_state = 0
# self.model.dwf_version = "Unknown"
# self.model.device_serial_number = "Unknown"
# self.model.device_name = "Unknown"
# self.model.analog_in_channel = -1
#
# self.dwf.FDwfAnalogOutReset(self.model.hdwf, c_int(self.model.analog_in_channel))
# self.dwf.FDwfDeviceCloseAll()
# self.logger.info(f"[{self.pref} Task] Device closed.")
# def get_analog_in_status(self):
# sts: c_byte = c_byte()
# # Checks the state of the acquisition. To read the data from the device, set fReadData to TRUE. For
# # single acquisition mode, the data will be read only when the acquisition is finished.
# self.dwf.FDwfAnalogInStatus(self.model.hdwf, # Interface handle.
# c_int(1), # True, if data should be read
# byref(sts)) # Variable to receive the acquisition state
# self.model.device_state = sts.value
# return sts
# ==================================================================================================================
#
# ==================================================================================================================
import time
from datetime import datetime
import scipy
from PySide6.QtCore import Slot
from controller.BaseAD2CaptDevice import BaseAD2CaptDevice
from model.AD2CaptDeviceModel import AD2CaptDeviceModel
class AD2CaptDeviceSimulator(BaseAD2CaptDevice):
def __init__(self, ad2capt_model: AD2CaptDeviceModel):
super().__init__(ad2capt_model)
def connect_device(self, device_id):
self.logger.info("Connecting to simulator")
self._init_device_parameters()
self.model.dwf_version = "DWF SIM 1.0"
self.logger.info(f"[{self.pref} Report] DWF Version: {self.model.dwf_version}")
self.logger.info(f"[{self.pref} Task] Opening device Simulator...")
self.model.device_name = "ADC Simulator"
self.model.device_serial_number = "Simulator"
#
self.model.ain_device_state = 0
self.model.connected = True
self.update_device_information()
self.logger.info(f"[{self.pref} Task] Device connected!")
return True
def discover_connected_devices(self):
self.model.connected_devices = {
'type': "Simulator",
'device_id': 0,
'device_name': "Simulator AD2",
'serial_number': "0000"
}
@Slot()
def start_capture(self, capture):
if capture:
self.logger.info(f"[{self.pref} Task] Setting up device for capturing.")
# if self._setup_acquisition():
# self.logger.info(f"[{self.pref} Task] Started capturing thread")
self.set_ad2_acq_status(True)
return self.thread_manager.start(self._capture)
else:
self.set_ad2_acq_status(False)
# return self._capture()
def _capture(self):
self.model.capturing_finished = False
self.model.device_capturing_state = False
# Read the mat file from ./support
matf = scipy.io.loadmat("./support/simulator_support.mat")
self.model.device_ready = True
t0 = -1
curr_sample = 0
while True:
if self.model.start_recording and not self.model.stop_recording:
if t0 < 0:
self.logger.info(f"[{self.pref} Report] Start command received.")
self.model.capturing_finished = False
self.model.device_capturing_state = True
self.model.recorded_samples = []
timestamp = datetime.now()
t0 = time.time()
else:
curr_sample += 1
try:
# print(matf['amplitude'].flatten()[curr_sample])
if curr_sample % 200 == 0:
t2 = time.time()
# Append 10 samples at a time
self.model.recorded_samples.extend(
matf['amplitude'].flatten()[curr_sample-99:curr_sample]
)
t3 = time.time()
#print(f"{curr_sample}: Time took {t3-t2}")
except Exception as e:
print(e)
elif not self.model.start_recording and self.model.stop_recording:
t1 = time.time()
self.model.measurement_time = t1 - t0
# self.get_analog_in_status()
self.model.capturing_finished = True
self.model.device_capturing_state = False
self.logger.info(f"Finished Thread. Acquisition took {self.model.measurement_time} s. "
f"Process captured {len(self.model.recorded_samples)} samples.")
# 1. Assign the current captured samples to a dict
self.model.all_recorded_samples.append({'timestamp': timestamp,
'measurement_time': self.model.measurement_time,
'num_samples': len(
self.model.recorded_samples),
'acqRate': self.model.sample_rate,
'samples': self.model.recorded_samples})
try:
time.sleep(1)
self.close_device()
except Exception as e:
print(e)
return
def update_device_information(self):
self.model.ain_channels = [0]
self.model.ain_buffer_size = 1
self.model.aout_channels = [0]
self.model.selected_ain_channel = 1
def close_device(self):
# Resets and configures (by default, having auto configure enabled) all AnalogOut instrument
# parameters to default values for the specified channel. To reset instrument parameters across all
# channels, set idxChannel to -1.
self.model.fLost = 0
self.model.fCorrupted = 0
self.model.start_recording = True
self.model.stop_recording = False
self.model.capturing_finished = False
self.model.device_capturing_state = False
self.model.connected = False
self.model.ain_device_state = 0
self.model.dwf_version = "Unknown"
self.model.device_serial_number = "Unknown"
self.model.device_name = "Unknown"
self.model.selected_ain_channel = -1
self.logger.info(f"[{self.pref} Task] Device closed.")
import logging
import time
from abc import abstractmethod
from collections import deque
from PySide6.QtCore import QObject, QThreadPool
from controller.mp_AD2Capture.AD2StateMPSetter import AD2State
from controller.mp_AD2Capture.MPDeviceControl import mp_capture
from model.AD2CaptDeviceModel import AD2CaptDeviceModel, AD2CaptDeviceSignals
from model.AD2Constants import AD2Constants
from multiprocessing import Process, Queue, Value, Lock
class BaseAD2CaptDevice(QObject):
def __init__(self, ad2capt_model: AD2CaptDeviceModel):
super().__init__()
self.model = ad2capt_model
self.pref = "AD2CaptDev"
self.logger = logging.getLogger(f"AD2 Device")
self.signals = AD2CaptDeviceSignals()
self.thread_manager = QThreadPool()
self.kill_thread = False
# self.thread_manager.setMaxThreadCount(3)
# self.thread_manager.se
# self.thread_manager.setThreadPriority(QThread.HighestPriority)
self.lock = Lock()
self.proc = None
self.stream_data_queue = Queue()
self.capture_data_queue = Queue()
self.state_queue = Queue()
self.start_capture_flag = Value('i', 0, lock=self.lock)
self.end_process_flag = Value('i', False, lock=self.lock)
# Number of sa
self.streaming_data_dqueue: deque = None # a dqueue, initialize later
self.status_dqueue = deque(maxlen=int(1))
self.unconsumed_capture_data = 0
self.discover_connected_devices()
@abstractmethod
def connect_device(self, device_id):
raise NotImplementedError
@abstractmethod
def discover_connected_devices(self):
raise NotImplementedError
@abstractmethod
def update_device_information(self):
raise NotImplementedError
@abstractmethod
def _capture(self):
raise NotImplementedError
@abstractmethod
def close_device(self):
raise NotImplementedError
def set_ad2_acq_status(self, record):
if record:
self.model.start_recording = True
self.model.stop_recording = False
self.logger.info(f"[{self.pref} Task] >>>>>>>>>> Started acquisition!")
elif record == False:
self.model.start_recording = False
self.model.stop_recording = True
self.logger.info(f"[{self.pref} Task] >>>>>>>>>>> Stopped acquisition!")
else:
self.model.start_recording = False
self.model.stop_recording = False
self.logger.info(f"[{self.pref} Task] >>>>>>>>>>> Reset acquisition!")
def _init_device_parameters(self):
pass
#sample_rate = int(self.model.ad2captdev_config.get_sample_rate())
#total_samples = int(self.model.ad2captdev_config.get_total_samples())
#channel = 0 # TODO Read channel from input
#self.model.sample_rate = int(sample_rate)
#self.model.n_samples = int(total_samples)
#self.model.selected_ain_channel = int(channel)
#self.logger.info(f"AD2 device initialized {self.model.selected_ain_channel} with "
# f"acquisition rate {self.model.sample_rate} Hz and "
# f"samples {self.model.n_samples}")
# ==================================================================================================================
#
# ==================================================================================================================
def clear_data(self):
self.model.recorded_samples = []
self.model.recorded_sample_stream = []
def start_capture(self, clear=True):
print(f"Start capture. Clear {clear}")
self.start_capture_flag.value = 1
if clear:
self.model.recorded_samples = []
self.model.recorded_sample_stream = []
self.model.start_recording = True
self.model.stop_recording = False
self.model.device_capturing_state = AD2Constants.CapturingState.RUNNING()
def stop_capture(self):
print("Stop capture")
self.start_capture_flag.value = 0
self.model.start_recording = False
if self.model.reset_recording:
self.model.device_capturing_state = AD2Constants.CapturingState.STOPPED()
self.model.stop_recording = True
else:
self.model.device_capturing_state = AD2Constants.CapturingState.PAUSED()
def capture(self, start, clear=True):
if start:
self.start_capture(clear)
else:
self.stop_capture()
def reset_capture(self):
self.logger.info("Resetting captured samples for new measurement.")
self.model.recorded_samples = []
self.model.measurement_time = 0
self.model.capturing_finished = False
# ==================================================================================================================
def start_device_process(self, device_id):
self.logger.info(f"[{self.pref} Task] Starting capturing process...")
#self.logger.debug(f"Dataqueue maxlen={int(self.model.duration_streaming_history * self.model.sample_rate)}")
self.streaming_data_dqueue = deque(maxlen=int(self.model.duration_streaming_history * self.model.sample_rate))
#print(self.model.duration_streaming_history * self.model.sample_rate)
self.stream_data_queue.maxsize = int(self.model.duration_streaming_history * self.model.sample_rate)
self.proc = Process(target=mp_capture,
args=(
self.stream_data_queue, self.capture_data_queue, self.state_queue,
self.start_capture_flag, self.end_process_flag,
device_id, self.model.selected_ain_channel, self.model.sample_rate)
)
self.proc.start()
# self.thread_manager.moveToThread(())
self.thread_manager.start(self.qt_consume_data)
self.thread_manager.start(self.qt_stream_data)
self.thread_manager.start(self.qt_get_state)
def qt_consume_data(self):
"""Consume data from the queue and plot it. This is a QThread."""
while not self.kill_thread and not bool(self.end_process_flag.value):
while self.capture_data_queue.qsize() > 0:
self.model.unconsumed_capture_data = self.capture_data_queue.qsize()
d, s = self.capture_data_queue.get()
[self.model.recorded_samples.append(e) for e in d]
# self.model.samples_captured = len(self.model.recorded_samples)
self.status_dqueue.append(s)
#time.sleep(0.01)
self.logger.info("Capture Data consume thread ended")
def qt_stream_data(self):
nth_cnt = 1
nth = 2
while not self.kill_thread and not bool(self.end_process_flag.value):
while self.stream_data_queue.qsize() > 0:
self.model.unconsumed_stream_samples = self.stream_data_queue.qsize()
for d in self.stream_data_queue.get()[0]:
#if nth_cnt == nth:
self.streaming_data_dqueue.append(d)
# nth_cnt = 0
#nth_cnt += 1
#time.sleep(0.01)
self.logger.info("Streaming data consume thread ended")
def qt_get_state(self):
while not self.kill_thread and not bool(self.end_process_flag.value):
while self.state_queue.qsize() > 0:
self._set_ad2state_from_process(self.state_queue.get())
#time.sleep(0.1)
self.logger.info("Status data consume thread ended")
def _set_ad2state_from_process(self, ad2state: AD2State):
# print(ad2state.__dict__)
self.model.pid = ad2state.pid
self.model.dwf_version = ad2state.dwf_version
self.model.connected = ad2state.connected
self.model.device_name = ad2state.device_name
self.model.device_serial_number = ad2state.device_serial_number
self.model.device_index = ad2state.device_index
if ad2state.acquisition_state == AD2Constants.CapturingState.RUNNING():
self.logger.info("[START ACQ] Started acquisition")
self.model.capturing_finished = False
self.model.start_recording = True
self.model.stop_recording = False
elif ad2state.acquisition_state == AD2Constants.CapturingState.STOPPED():
if self.model.start_recording:
self.model.capturing_finished = True
self.logger.info(f"[STOP ACQ] Finished acquisition {self.model.capturing_finished}.")
# Emit a signal, that the Capturing has finished
self.model.start_recording = False
self.model.stop_recording = True
self.model.device_capturing_state = ad2state.acquisition_state
self.model.sample_rate = ad2state.sample_rate
self.model.selected_ain_channel = ad2state.selected_ain_channel
self.model.ain_channels = ad2state.ain_channels
self.model.ain_buffer_size = ad2state.ain_buffer_size
self.model.ain_bits = ad2state.ain_bits
self.model.ain_device_state = ad2state.ain_device_state
self.model.recording_time = ad2state.recording_time
# ==================================================================================================================
# Destructor
# ==================================================================================================================
def stop_process(self):
self.end_process_flag.value = True
time_start = time.time()
while self.proc.is_alive():
time.sleep(0.1)
self.logger.warning(f"AD2 process exited after {time.time()-time_start}s")
self.kill_thread = True
def __del__(self):
self.logger.info("Exiting AD2 controller")
self.stop_process()
self.logger.warning("AD2 controller exited")
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment