import socket
from pathlib import Path
from functools import wraps
import json
import numpy as np
from enum import Enum
from collections import defaultdict
from typing import Any, Literal, Dict, List, Union, Optional, Callable
import logging
import re
import threading
import queue
from concurrent import futures
import copy
import struct
import functools
import time
from qass.tools.networking.errors import ConnectionError, AnalyzerError, AnalyzerVersionError
from qass.tools.networking.constants import (
Amplitudes,
AnalyzerRunStatus,
ChannelPorts,
Channels,
FFTLogarithmic,
FFTOversampling,
FFTWindowing,
MultiPreampInput,
PreampPorts,
PreampType,
Samplerates16Bit,
SysAmplitudesType,
run_status_mapping
)
from packaging import version
def required_version(min_: Union[str, None] = None, max_: Union[str, None] = None):
    """Build a decorator that guards a method behind an Analyzer4D version range.

    The decorated callable must be a method of the AnalyzerRemote class: its
    first positional argument is used to query the connected analyzer.

    :param min_: The minimum version as a string in the format "01.01.01.01", None disables the lower bound
    :param max_: The maximum version as a string in the format "01.01.01.01", None disables the upper bound
    :raises AnalyzerVersionError: When the wrapped method is called and ``check_version`` rejects the analyzer version
    """
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            remote = args[0]
            assert isinstance(remote, AnalyzerRemote), "First argument was not an AnalyzerRemote object"
            if remote.check_version(min_, max_):
                return f(*args, **kwargs)
            found_version = remote.get_analyzer_version()
            # Each configured bound contributes "<operator><version> " to the message.
            bounds = ""
            for operator, limit in ((">=", min_), ("<=", max_)):
                if limit is not None:
                    bounds += operator + limit + " "
            raise AnalyzerVersionError("For this command the Analyzer4D version must be "
                                       f"{bounds}but was {found_version}")
        return wrapper
    return decorator
[docs]
class AnalyzerRemote():
""" Class provides methods for external analyzer control (system operator independant) over a TCP socket. Every method that gets a response is able to set a custom timeout for analyzer reponse. Should any kind of bugs happen without TCP socket crashing, Queue timeout will run into failstate. """
def __init__(self, ip: str, port: int = 17000, timeout: int = 1, auto_stop: Optional[List] = None):
""" Constructor provides helper and creates logger module.
:param ip: Analyzer IP in network.
:type ip: str
:param port: Required Analyzer port, by the default always 17000.
:type port: int
:param debug_mode: Logs debug messages into sys.stdout
:type debug_mode: bool
:param timeout: Sets global timeout for queue object in seconds, default is 4
:type timeout: (pos) int
:param suppress_cb_exceptions: Flag to supress raised exceptions in callback functions, default True
:type suppress_cb_exceptions: bool
:param auto_stop: Parse list with commands to auto stop certain services started in Analyzer4D Software, by exit with statement, defaults to None
:type auto_stop: List
================== ==========================================================================================================================================
auto_stop commands services
================== ==========================================================================================================================================
all Activate service to send command for stopping beforehand remote startet service: measuring, monitoring, sine generator, operator functions
measuring Activate service to stop remote started measuring
sineGen Activate service to stop remote started sine generator
monitoring Activate service to stop remote started monitoring
operatorFunctions Activate service to stop remote started operator function output
analyzer_version Version number of connected Analyzer
================== ==========================================================================================================================================
"""
# helper
self.ip = socket.gethostbyname(ip)
self.port = port
self.timeout = timeout # seconds
# message ID to assign command to analyzer and specific response
self.translator = {True: "true", "start": "true", "true": "true",
"beginn": "true", "enabled": "true", "enable": "true", "on": "true",
False: "false", "stop": "false", "end": "false", "disabled": "false",
"false": "false", "disable": "false", "off": "false", "monitor": "monitor"}
# flags for exit method of context manager
self._io_report_count = 0
self.auto_stop = auto_stop
self._proc_report_count = 0
self._appvar_report_count = 0
self._measuring_active = False
self._sine_gen_active = False
self._monitoring_active = False
self._operator_functions_active = False
self._analyzer_version = None
self._sine_gen_active = False
self._callback_queue = queue.Queue()
self._msg_id :int = 0
self._socket : Optional[socket.socket] = None
self._requests : Dict[int,futures.Future] = {}
self._processing_data = threading.Event()
self._processing_callbacks = threading.Event()
self._receiver_thread : Optional[threading.Thread] = None
self._callback_thread : Optional[threading.Thread] = None
self._msg_id_lock = threading.Lock()
self._socket_lock = threading.Lock()
self._requests_lock = threading.Lock()
self._io_callbacks = []
self._all_appvar_callbacks = []
self._single_appvar_callbacks = defaultdict(list)
self._processnumber_callbacks = []
self._fieldbus_input_callbacks = []
self._fieldbus_output_callbacks = []
self._callback_registered_processnumber = False
self._callback_registered_io = False
self._callback_registered_operator = False
self._callback_registered_appvar = False
self._callback_registered_fieldbus_input = False
self._callback_registered_fieldbus_output = False
self.logger = logging.getLogger("qass.tools.networking")
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.disconnect()
def connect(self):
    """ Open the TCP connection to the analyzer and start the worker threads.

    After the socket is connected, the receiver and the callback thread are
    started (unless their events are already set) and every report
    subscription for which callbacks are stored on this object is requested
    again.

    :raises ConnectionError: If a socket is already stored on this object.
    :raises OSError: If binding or connecting the socket fails. The socket is closed before the error propagates.
    :raises ConnectionRefusedError: If one of the report subscription requests fails.
    """
    with self._socket_lock:
        if self._socket:
            raise ConnectionError(f'Already connected with: {self._socket.getpeername()}')
        tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            tcp_socket.bind(('0.0.0.0', 0))
            tcp_socket.connect((self.ip, self.port))
        except BaseException:
            # do not leak the descriptor when the analyzer cannot be reached
            tcp_socket.close()
            raise
        self._socket = tcp_socket
        self.logger.info(f'Connect to {self._socket.getpeername()}')
    # reset callback queue in case that the connection is reused
    self._callback_queue = queue.Queue()
    if not self._processing_data.is_set():
        self._receiver_thread = threading.Thread(name='AnalyzerRemoteDataReceiver', target=self._receive_data, daemon=True)
        self._receiver_thread.start()
    if not self._processing_callbacks.is_set():
        self._callback_thread = threading.Thread(name='AnalyzerRemoteCallbackExecutor', target=self._execute_callbacks, daemon=True)
        self._callback_thread.start()
    # presumably gives the freshly started threads time to come up before requests are sent
    time.sleep(0.1)
    # a new connection has no active subscriptions yet
    self._callback_registered_processnumber = False
    self._callback_registered_io = False
    self._callback_registered_operator = False
    self._callback_registered_appvar = False
    self._callback_registered_fieldbus_input = False
    self._callback_registered_fieldbus_output = False
    try:
        if len(self._processnumber_callbacks) > 0:
            self._send_request(cmd="reportprocessnumber", p1="true")
            self._callback_registered_processnumber = True
        if len(self._io_callbacks) > 0:
            self._send_request(cmd="reportio", p1="true")
            self._callback_registered_io = True
        if len(self._all_appvar_callbacks) > 0:
            self._send_request(cmd="reportappvars", p1="true")
            self._callback_registered_appvar = True
        if len(self._fieldbus_input_callbacks) > 0:
            self._send_request(cmd="reportprofibus", p1="true", p2="rx")
            self._callback_registered_fieldbus_input = True
        if len(self._fieldbus_output_callbacks) > 0:
            self._send_request(cmd="reportprofibus", p1="true", p2="tx")
            self._callback_registered_fieldbus_output = True
        # Independent check: this used to be an ``elif`` of the fieldbus output
        # branch, so single AppVar reporting was skipped whenever fieldbus
        # output callbacks existed.
        if len(self._single_appvar_callbacks) > 0:
            self._send_request(cmd='reportappvar', p1='add', p2=';'.join(self._single_appvar_callbacks.keys()))
    except Exception as e:
        raise ConnectionRefusedError from e
def disconnect(self):
    """ Stop the services selected in ``auto_stop`` and shut the connection down.

    The entries "sineGen", "measuring" and "monitoring" of ``self.auto_stop``
    are evaluated; "all" selects all three. The socket shutdown and the
    thread joins always run, even if one of the stop requests raises. Such
    an exception is propagated afterwards.
    """
    try:
        services = self.auto_stop or ()
        stop_all = "all" in services
        if (stop_all or "sineGen" in services) and self._sine_gen_active:
            self._send_request(cmd="AppCmd", p1="StopSineGen")
            # keep the flag in sync with the analyzer
            self._sine_gen_active = False
        if (stop_all or "measuring" in services) and self.is_measuring():
            self.stop_measuring()
        if (stop_all or "monitoring" in services) and self.is_monitoring():
            self.stop_monitoring()
    finally:
        self._processing_data.clear()
        with self._socket_lock:
            if self._socket:
                try:
                    self._socket.shutdown(socket.SHUT_RDWR)
                except OSError:
                    self.logger.warning('Socket was not closed regulary')
        # joined outside the lock so a worker that needs the lock can still finish
        for worker, wait_seconds in ((self._receiver_thread, 1), (self._callback_thread, 3)):
            if worker and worker.is_alive():
                worker.join(wait_seconds)
                if worker.is_alive():
                    self.logger.error(f'Failed to stop Thread: {worker.name}')
@property
def connected(self):
    """ Tell whether a socket object is currently stored on this instance.

    :rtype: bool
    """
    has_socket = self._socket is not None
    return has_socket
@property
def host_address(self):
    """ Remote endpoint of the socket as "ip:port", or "" when no socket is stored.

    :rtype: str
    """
    with self._socket_lock:
        if not self._socket:
            return ""
        peer_ip, peer_port = self._socket.getpeername()
    return f"{peer_ip}:{peer_port}"
@property
def local_address(self):
    """ Local endpoint of the socket as "ip:port", or "" when no socket is stored.

    :rtype: str
    """
    with self._socket_lock:
        if not self._socket:
            return ""
        own_ip, own_port = self._socket.getsockname()
    return f"{own_ip}:{own_port}"
@property
def get_sine_gen_state(self):
    """ Flag that records whether the sine generator was started through this object.

    :rtype: boolean
    """
    sine_gen_running = self._sine_gen_active
    return sine_gen_running
def get_run_status(self) -> AnalyzerRunStatus:
    """ Request the run status and return it as an AnalyzerRunStatus member.

    The 'result' entry of the response is translated through ``run_status_mapping``.

    :rtype: AnalyzerRunStatus
    """
    reply = self._send_request(cmd='AppFunc', p1='Status', p2='run')
    status_key = reply['result']
    return AnalyzerRunStatus(run_status_mapping[status_key])
[docs]
def is_measuring(self) -> bool:
    """
    Ask the Analyzer4D software for its run status and compare it with MEASURE.

    :returns: True, if the reported status equals ``AnalyzerRunStatus.MEASURE.name``. False, otherwise.
    :rtype: boolean
    """
    reported_status = self._send_request(cmd='AppFunc', p1='Status', p2='run')['result']
    return reported_status == AnalyzerRunStatus.MEASURE.name
[docs]
def is_monitoring(self) -> bool:
    """
    Ask the Analyzer4D software for its run status and compare it with MONITOR.

    :returns: True, if the reported status equals ``AnalyzerRunStatus.MONITOR.name``. False, otherwise.
    :rtype: boolean
    """
    reported_status = self._send_request(cmd='AppFunc', p1='Status', p2='run')['result']
    return reported_status == AnalyzerRunStatus.MONITOR.name
[docs]
def is_trigger_loop_activated(self) -> bool:
    """
    Request the trigger status and translate it into a boolean.

    :returns: True for the result 'running', False for the result 'deactivated'.
    :rtype: bool
    :raises ValueError: If the analyzer reports any other result.
    """
    reply = self._send_request(cmd='AppFunc', p1='Status', p2='trigger')
    state = reply['result']
    if state == 'running':
        return True
    if state == 'deactivated':
        return False
    raise ValueError(f'Unexpected result {state}. Expected "running" or "deactivated"!')
def enable_trigger_loop(self):
    """ Send the AppCmd ``sysTriggerLoop on`` to the analyzer. """
    request = {"cmd": "AppCmd", "p1": "sysTriggerLoop", "p2": "on"}
    self._send_request(**request)
def disable_trigger_loop(self):
    """ Send the AppCmd ``sysTriggerLoop off`` to the analyzer. """
    request = {"cmd": "AppCmd", "p1": "sysTriggerLoop", "p2": "off"}
    self._send_request(**request)
def enable_automation(self):
    """ Send the AppCmd ``sysAutomation on`` to the analyzer. """
    request = {'cmd': 'AppCmd', 'p1': 'sysAutomation', 'p2': 'on'}
    self._send_request(**request)
def disable_automation(self):
    """ Send the AppCmd ``sysAutomation off`` to the analyzer. """
    request = {'cmd': 'AppCmd', 'p1': 'sysAutomation', 'p2': 'off'}
    self._send_request(**request)
[docs]
def set_global_function_timeout(self, timeout:int) -> None:
    """ Replace the global timeout stored in ``self.timeout``.

    Individual methods can still be given their own ``custom_timeout``.

    :param timeout: New global timeout.
    :type timeout: int
    """
    new_timeout = timeout
    self.timeout = new_timeout
[docs]
def get_analyzer_version(self):
    """
    Return the analyzer version, using the value cached in self._analyzer_version when present.
    Otherwise the 'analyzerversion' entry of get_project_info() is read and cached.

    :raises ConnectionError: Raises if the project info holds no 'analyzerversion' entry
    :rtype: str
    """
    cached_version = self._analyzer_version
    if cached_version is not None:
        return cached_version
    info = self.get_project_info()
    if 'analyzerversion' not in info:
        raise ConnectionError("Analyzer Version determination failed!")
    self._analyzer_version = info['analyzerversion']
    return self._analyzer_version
[docs]
def check_version(self, minimum_needed_version:Union[str,None]=None, maximum_supported_version:Union[str,None]=None):
    """ Compare the version of the connected Analyzer against an optional lower and upper bound.

    The analyzer version comes from get_analyzer_version(). All versions are parsed with
    ``packaging.version.parse``. A bound that is None is ignored. Both bounds are inclusive.

    :param minimum_needed_version: minimum Analyzer Version needed to use feature. Can have a different structure than the actual version (more or less numbers).
    :type minimum_needed_version: str, optional
    :param maximum_supported_version: maximum Analyzer Version that supports feature. Can have a different structure than the actual version (more or less numbers).
    :type maximum_supported_version: str, optional
    :rtype: bool
    """
    current = version.parse(self.get_analyzer_version())
    below_minimum = (minimum_needed_version is not None
                     and current < version.parse(minimum_needed_version))
    if below_minimum:
        return False
    above_maximum = (maximum_supported_version is not None
                     and current > version.parse(maximum_supported_version))
    return not above_maximum
[docs]
def start_measuring(self, custom_timeout=None) -> None:
    """ Send the AppCmd ``startMeasuring`` to the connected analyzer.

    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    """
    request = {"cmd": "AppCmd", "p1": "startMeasuring", "user_timeout": custom_timeout}
    self._send_request(**request)
def start_monitoring(self, custom_timeout=None):
    """ Send the AppCmd ``startMonitoring`` to the connected analyzer.

    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    """
    request = {"cmd": "AppCmd", "p1": "startMonitoring", "user_timeout": custom_timeout}
    self._send_request(**request)
[docs]
def start_sineGenerator(self, frequency: int, amplitude: Union[int, str, Amplitudes], expert:bool=False, custom_timeout=None) -> None:
    """ Method to start sine wave generation with custom frequency and amplitude settings.

    .. warning:: Sine generator has to be already switched on!
    .. note:: Note that you should consider that the sine generator needs a couple of µs to start.

    :param int frequency: Used frequency to generate sine wave with in Hz. Outside expert mode it must lie between 50Hz and 1200Hz (both included).
    :param amplitude: Used amplitude to generate sine wave in mV (e.g. 955, 'AMP_955_mV' or Amplitudes.AMP_955_mV). Outside expert mode only values of Amplitudes are valid.
    :type amplitude: int, str, Amplitudes
    :param bool expert: Expert mode disables the frequency and amplitude checks. The values are sent as given.
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    :raises ValueError: Outside expert mode, if the frequency is out of range or the amplitude is not an int, string or enum member that matches Amplitudes.
    """
    if not expert:
        min_frequency = 50
        max_frequency = 1200
        if not min_frequency <= frequency <= max_frequency:
            message = f'SineGenerator will not be started! Frequency of {frequency}Hz is not in the range of {min_frequency}Hz...{max_frequency}Hz.'
            self.logger.error(message)
            raise ValueError(message)
        # built before ``amplitude`` is normalised so the message shows the caller's value
        unsupported = f'SineGenerator will not be started! Amplitude {amplitude} is not supported.'
        if isinstance(amplitude, int):
            supported = any(member.value == amplitude for member in Amplitudes)
        elif isinstance(amplitude, str):
            supported = any(member.name == amplitude for member in Amplitudes)
            if supported:
                amplitude = Amplitudes[amplitude].value
        elif isinstance(amplitude, Enum):
            supported = any(member.name == str(amplitude.name) for member in Amplitudes)
            if supported:
                amplitude = amplitude.value
        else:
            supported = False
        if not supported:
            self.logger.error(unsupported)
            raise ValueError(unsupported)
    # custom_timeout was accepted but never forwarded before
    self._send_request(cmd="AppCmd", p1="StartSineGen", p2=f"{frequency} {amplitude}", user_timeout=custom_timeout)
    self.logger.info(f"Sine generator started with f={frequency}Hz and {amplitude}mV amplitude.")
    self._sine_gen_active = True
[docs]
def stop_sineGenerator(self, custom_timeout=None) -> None:
    """ Send the AppCmd ``StopSineGen`` and clear the local sine generator flag.

    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    """
    request = {"cmd": "AppCmd", "p1": "StopSineGen", "user_timeout": custom_timeout}
    self._send_request(**request)
    self._sine_gen_active = False
[docs]
def stop_measuring(self, custom_timeout=None) -> None:
    """ Send the AppCmd ``stopMeasuring`` to the connected analyzer.

    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    """
    request = {"cmd": "AppCmd", "p1": "stopMeasuring", "user_timeout": custom_timeout}
    self._send_request(**request)
def stop_monitoring(self, custom_timeout=None):
    """ Send the AppCmd ``stopMonitoring`` to the connected analyzer.

    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    """
    request = {"cmd": "AppCmd", "p1": "stopMonitoring", "user_timeout": custom_timeout}
    self._send_request(**request)
[docs]
def set_area_view(self, split: int, custom_timeout=None) -> None:
    """ Switch the analyzer view to ``split`` area views (AppCmd ``SetAreaViews``).

    :param split: Amount of area views, accepted values are 1 to 4.
    :type split: int
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    :raises ValueError: Raises if split lies out of bounds
    """
    if not 0 < split <= 4:
        out_of_bounds = "Split amount for view is out of bounds."
        self.logger.error(out_of_bounds)
        raise ValueError(out_of_bounds)
    self._send_request(user_timeout=custom_timeout, cmd="AppCmd", p1="SetAreaViews", p2=split)
[docs]
def save_area_view(self, template_num: int, custom_timeout=None) -> None:
    """ Save the current area view settings under a template number (AppCmd ``SaveAreaView``).

    :param template_num: Storage number to save.
    :type template_num: int
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    """
    request = {"cmd": "AppCmd", "p1": "SaveAreaView", "p2": template_num, "user_timeout": custom_timeout}
    self._send_request(**request)
[docs]
def load_area_view(self, template_num: int, custom_timeout=None) -> None:
    """ Load an area view template by its number (AppCmd ``LoadAreaView``).

    :param template_num: Storage number to load.
    :type template_num: int
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    """
    request = {"cmd": "AppCmd", "p1": "LoadAreaView", "p2": template_num, "user_timeout": custom_timeout}
    self._send_request(**request)
[docs]
def load_simulation_buffer(self, file_path: str, channel: int, do_not_copy_meta_data=False, custom_timeout=None) -> None:
    """ Load a simulation buffer file for a specific channel (AppCmd ``SimulationBuffer``).

    .. warning:: The channel is given zero based and is sent one based, because AppCmds are user functions.

    :param file_path: File path of the buffer, sent to the analyzer as given.
    :type file_path: str
    :param channel: Zero based channel where the simulation buffer gets loaded.
    :type channel: int
    :param do_not_copy_meta_data: Adds the ``nometa`` keyword to the command, defaults to False
    :type do_not_copy_meta_data: bool, optional
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    """
    one_based_channel = channel + 1
    meta_keyword = " nometa" if do_not_copy_meta_data else ""
    p2_string = f"channel {one_based_channel}{meta_keyword} path {file_path}"
    self._send_request(cmd="AppCmd", p1="SimulationBuffer", p2=p2_string, user_timeout=custom_timeout)
[docs]
def set_simulation_buffer(self, channel_number: Union[str, int, ChannelPorts], mode: str, custom_timeout=None) -> None:
    """ Enable or disable the simulation buffer of a channel (AppCmd ``SimulationBuffer``).

    :param channel_number: Zero based channel, sent one based. The key "all" sends ``path <mode>`` instead of a channel.
    :type channel_number: str or int or ChannelPorts
    :param mode: Keyword that is looked up in ``self.translator`` (e.g. "enabled" or "disabled").
    :type mode: str
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    :raises KeyError: If ``mode`` is not a key of ``self.translator``.
    """
    if channel_number == "all":
        target = "path"
    else:
        target = f"channel {int(channel_number) + 1}"
    state = self.translator[mode]
    self._send_request(cmd="AppCmd", p1="SimulationBuffer", p2=f"{target} {state}", user_timeout=custom_timeout)
[docs]
def start_pulsetest_channel(self, channel_number: Union[int, Channels], gain: int = 800, count: int = 1, delay: int = 0, custom_timeout=None) -> None:
    """ Start a pulse test on a channel (AppCmd ``Preamp``). Only available for existing ports and sensors.

    .. warning:: The channel is given zero based and is sent one based, because AppCmds are user functions.

    :param channel_number: Zero based channel where the pulse test gets executed.
    :type channel_number: int or Channels
    :param gain: Used gain for pulsetest, 0 <= gain < 4096, defaults to 800
    :type gain: int, optional
    :param count: Used count for pulsetest, 0 <= count <= 200, defaults to 1
    :type count: int, optional
    :param delay: Used delay for pulsetest, must not be negative, defaults to 0
    :type delay: int, optional
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    :raises ValueError: If gain, count or delay is out of bounds
    """
    # AppCmds are user functions and due to that not null based. Implemented IntEnums are code based and have to be added by one each.
    channel_number += 1
    # Reject the call if ANY parameter is out of bounds. The former check joined
    # the negated conditions with ``and`` and only fired when all three were invalid.
    if not (0 <= gain < 4096 and 0 <= count < 201 and 0 <= delay):
        self.logger.error("Params out of bounds")
        raise ValueError("Params out of bounds")
    p2_string = f"channel {channel_number} pulsetest {gain} {count} {delay}"
    self._send_request(cmd="AppCmd", p1="Preamp", p2=p2_string, user_timeout=custom_timeout)
[docs]
def start_pulsetest_port(self, port_number: Union[int, PreampPorts], gain: int = 800, count: int = 1, delay: int = 0, multi_preamp_input: Union[int, MultiPreampInput] = MultiPreampInput.NONE_MULTI_INPUT, custom_timeout=None) -> None:
    """ Start a pulse test on a preamp port (AppCmd ``Preamp``). Only available for existing ports and sensors.

    .. warning:: Port and input are given zero based and are sent one based, because AppCmds are user functions.

    :param port_number: Zero based port where the pulse test gets executed.
    :type port_number: int or PreampPorts
    :param gain: Pulsetest gain, 0 <= gain < 4096, defaults to 800
    :type gain: int
    :param count: Pulsetest count, 0 <= count <= 200, defaults to 1
    :type count: int
    :param delay: Pulsetest delay in ms, must not be negative, defaults to 0
    :type delay: int
    :param multi_preamp_input: Input number for multi input preamps, defaults to NONE_MULTI_INPUT, which is the case for preamps without multiple inputs.
    :type multi_preamp_input: int or MultiPreampInput
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    :raises ValueError: If gain, count or delay is out of bounds
    """
    # AppCmds are user functions and due to that not null based. Implemented IntEnums are code based and have to be added by one each.
    port_number += 1
    # Reject the call if ANY parameter is out of bounds. The former check joined
    # the negated conditions with ``and`` and only fired when all three were invalid.
    if not (0 <= gain < 4096 and 0 <= count < 201 and 0 <= delay):
        self.logger.error("Params out of bounds")
        raise ValueError("Params out of bounds")
    if multi_preamp_input == MultiPreampInput.NONE_MULTI_INPUT:
        p2_string = f"port {port_number} pulsetest {gain} {count} {delay}"
    else:
        # the input number is sent one based as well
        multi_preamp_input += 1
        # NOTE(review): start_frequency_test_port sends "input <n>" here - confirm that the keyword is not needed for pulsetest
        p2_string = f"port {port_number} {multi_preamp_input} pulsetest {gain} {count} {delay}"
    self._send_request(cmd="AppCmd", p1="Preamp", p2=p2_string, user_timeout=custom_timeout)
[docs]
def start_frequency_test_port(self, port_number: Union[int, PreampPorts], multi_preamp_input: Union[int, MultiPreampInput] = MultiPreampInput.NONE_MULTI_INPUT, custom_timeout=None) -> None:
    """ Execute a frequency test for a specific port (AppCmd ``Preamp``).

    .. warning:: Port and input are given zero based and are sent one based, because AppCmds are user functions.

    :param port_number: Zero based port number for frequency test
    :type port_number: int or PreampPorts
    :param multi_preamp_input: Used input, defaults to NONE_MULTI_INPUT for preamps without multiple inputs
    :type multi_preamp_input: int or MultiPreampInput
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    """
    one_based_port = port_number + 1
    if multi_preamp_input == MultiPreampInput.NONE_MULTI_INPUT:
        p2_string = f"port {one_based_port} frqtest"
    else:
        one_based_input = multi_preamp_input + 1
        p2_string = f"port {one_based_port} input {one_based_input} frqtest"
    self._send_request(cmd="AppCmd", p1="Preamp", p2=p2_string, user_timeout=custom_timeout)
[docs]
def start_frequency_test_channel(self, channel_number: Union[int, Channels], custom_timeout=None) -> None:
    """ Execute a frequency test for a specific channel (AppCmd ``Preamp``). The analyzer does not respond in any way (not in a visual, acoustic or information way).

    .. warning:: The channel is given zero based and is sent one based, because AppCmds are user functions.

    :param channel_number: Zero based channel number for frequency test
    :type channel_number: int or Channels
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    """
    one_based_channel = channel_number + 1
    p2_string = f"channel {one_based_channel} frqtest"
    self._send_request(cmd="AppCmd", p1="Preamp", p2=p2_string, user_timeout=custom_timeout)
[docs]
def set_area_scale(self, area_number: int, scale: int = 500, custom_timeout=None) -> None:
    """ Set the scale of a view area (AppCmd ``SetAreaScale``).

    Scale should be in range(10,1001) | Area number should be in range(1,5).

    :param area_number: Which area should be addressed
    :type area_number: int
    :param scale: which scale should be used, defaults to 500
    :type scale: int, optional
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    :raises ValueError: If scale or area_number is out of bounds.
    """
    if scale not in range(10, 1001) or not 0 < area_number <= 4:
        out_of_bounds = "Choosen key is out of bounds. Scale should be in range(10,1001) and area number should be in range(1,5)."
        self.logger.error(out_of_bounds)
        raise ValueError(out_of_bounds)
    self._send_request(cmd="AppCmd", p1="SetAreaScale", p2=f"{area_number} {scale}", user_timeout=custom_timeout)
[docs]
def set_area_colour(self, area_number: int, colour_scale: int = 200, custom_timeout=None) -> None:
    """ Set the colour scale of a view area (AppCmd ``SetAreaColor``).

    Colour scale should be in range(10,401) | Area should be in range(1,5)

    :param area_number: Which area should be addressed
    :type area_number: int
    :param colour_scale: which scale should be used, defaults to 200
    :type colour_scale: int, optional
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    :raises ValueError: If colour_scale or area_number is out of bounds.
    """
    if colour_scale not in range(10, 401) or not 0 < area_number <= 4:
        out_of_bounds = "Choosen key is out of bounds. Scale should be in range(10,401) and area number should be in range(1,5)."
        self.logger.error(out_of_bounds)
        raise ValueError(out_of_bounds)
    self._send_request(cmd="AppCmd", p1="SetAreaColor", p2=f"{area_number} {colour_scale}", user_timeout=custom_timeout)
[docs]
def set_area_time_range(self, area_number: int, start_time: int, time_range: int, custom_timeout=None) -> None:
    """ Set the shown time range of an area (AppCmd ``SetAreaPosition``).

    :param area_number: Area which should be addressed
    :type area_number: int
    :param start_time: Start point of time range in ms.
    :type start_time: int
    :param time_range: Range that will be shown from start_time in ms.
    :type time_range: int
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    """
    position = f"{area_number} {start_time} {time_range}"
    self._send_request(cmd="AppCmd", p1="SetAreaPosition", p2=position, user_timeout=custom_timeout)
[docs]
def load_process(self, process_number: int, start_time=0, custom_timeout=None) -> None:
    """ Load and display a process by its number (AppCmd ``LoadProcess``).

    :param process_number: Process that will be loaded
    :type process_number: int
    :param start_time: Start time in ms, defaults to 0
    :type start_time: int, optional
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    """
    selection = f"{process_number} {start_time}"
    self._send_request(cmd="AppCmd", p1="LoadProcess", p2=selection, user_timeout=custom_timeout)
[docs]
def get_service_parameter(self, param_setting: str, custom_timeout=None) -> Union[str, None]:
    """ Read a value from the Service Parameter (Configuration->Settings->Parameter)

    .. note:: Only available for user level 8 or higher!

    :param param_setting: Service parameter that should be read
    :type param_setting: str
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    :return: The 'result' entry of the response converted to a string, or None if the response has none
    :rtype: str
    """
    reply = self._send_request(cmd="appfunc", p1="GetServiceParameter", p2=param_setting, user_timeout=custom_timeout)
    value = reply.get("result")
    return None if value is None else str(value)
[docs]
def set_service_parameter(self, param_setting: str, param_value: Any, custom_timeout=None) -> None:
    """ Write a value into the Service Parameter (Configuration->Settings->Parameter)

    .. note:: Only available for user level 8 or higher!

    :param param_setting: Service parameter that should be set
    :type param_setting: str
    :param param_value: New value of the chosen service parameter
    :type param_value: Any
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    """
    assignment = f"{param_setting} {param_value}"
    self._send_request(cmd="AppCmd", p1="setServiceParameter", p2=assignment, user_timeout=custom_timeout)
    self.logger.info(f"Service parameter {param_setting} is changed to {param_value}")
[docs]
def send_analyzer_to_sleep(self, time=2000, custom_timeout=None) -> None:
    """ Only testing purpose. Sends the AppCmd ``sysSleep`` to the analyzer.

    :param time: Time to sleep in ms, defaults to 2000
    :type time: int, optional
    :param custom_timeout: Forwarded to the request as ``user_timeout``, defaults to None.
    :type custom_timeout: int, optional
    """
    request = {"cmd": "AppCmd", "p1": "sysSleep", "p2": time, "user_timeout": custom_timeout}
    self._send_request(**request)
    self.logger.info("Analyzer tired. Analyzer sleep.")
[docs]
def set_appvar(self, appvar_name: str, appvar_value: Any, custom_timeout=None, storeevent: bool = False) -> None:
    """ Set the value of an AppVar addressed by its name.

    The prefix `pro_` will result in the AppVar being saved in the project and persist between restarts.
    The prefix `sys_` will result in the AppVar being saved globally and made available over all projects.
    If the AppVar doesn't exist yet it will be created.

    :param appvar_name: Name of the AppVar.
    :type appvar_name: str
    :param appvar_value: Value of the AppVar. The type can be every datatype supported by python (e.g. float, int, str, json, ...).
    :type appvar_value: Any
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :param storeevent: Whether to save this update as a process event in the Analyzer4D database (only available for Analyzer4D > V2.07.14.00).
        The eventtype will be the name of the AppVar and the eventdata the value. Every write to the AppVar will result in a
        process event, even if the value did not change.
    :type storeevent: bool, optional
    """
    request = {
        "cmd": "setappvar",
        "p1": appvar_name,
        "p2": appvar_value,
        "storeevent": storeevent,
        "user_timeout": custom_timeout,
    }
    self._send_request(**request)
[docs]
def get_appvar(self, appvar_name: str, custom_timeout=None) -> Union[str, None]:
    """ Get the value of an AppVar by name.

    .. note:: If the requested AppVar is a json, the parsed value will be changed due to string escape.

    :param appvar_name: Name of the AppVar to address.
    :type appvar_name: str
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :return: The ``result`` field of the response converted to ``str``, or None if the response has no result
    :rtype: str or None
    """
    response = self._send_request(cmd="getappvar", p1=appvar_name, user_timeout=custom_timeout)
    value = response.get('result')
    if value is None:
        return None
    return str(value)
[docs]
def remove_appvar(self, appvar_name: str, custom_timeout=None) -> None:
    """ Clear and remove an AppVar by name.

    :param appvar_name: Name of the AppVar to remove.
    :type appvar_name: str
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    request = {"cmd": "clearappvar", "p1": appvar_name, "user_timeout": custom_timeout}
    self._send_request(**request)
[docs]
def add_appvar_report_callback(self, callback, custom_timeout=None, check_msg_id=True, appvar: Optional[str] = None) -> None:
    """ Add a callback function to the AppVar report.

    Without ``appvar`` the callback is added to the global AppVar report; with ``appvar`` it is added to the
    report of that single AppVar only. Both modes are mutually exclusive: requesting one while callbacks of
    the other kind are registered raises a ValueError. When connected, the matching report request
    (``reportappvars`` or ``reportappvar add``) is sent to the analyzer before the callback is stored.
    See networking_example.py for an example.

    .. warning:: All callbacks need as first param "result" to catch analyzer response, if used or not.

    :param callback: Callback function that is stored for the report.
    :type callback: function
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :param check_msg_id: Check if msgid of request matches the resid of response. Analyzer4D version lower than
        "2.04.06.02 extended" must set this to False. Only forwarded for the global report request.
    :type check_msg_id: bool, optional
    :param appvar: Name of a single AppVar to report; None (default) selects the global report of all AppVars.
    :type appvar: str, optional
    :raises ValueError: If single-AppVar reporting is requested while global callbacks exist, or vice versa.
    """
    # functools.partial objects and callable instances carry no __name__;
    # fall back to repr() so that logging cannot fail after the callback
    # has already been registered.
    callback_name = getattr(callback, "__name__", repr(callback))

    if appvar is not None:
        if len(self._all_appvar_callbacks) > 0:
            raise ValueError(f'AppVar report for single AppVar "{appvar}" requested, but global AppVar reporting is already activated!')
        if self.connected:
            # honour the caller's timeout here as well (it used to be dropped)
            self._send_request(cmd='reportappvar', p1='add', p2=appvar, user_timeout=custom_timeout)
        self._single_appvar_callbacks[appvar].append(callback)
        self.logger.info(f"Callback {callback_name} for AppVar \"{appvar}\" report added.")
        return

    if len(self._single_appvar_callbacks) > 0:
        raise ValueError('Global AppVar reporting requested, but AppVar reporting for single AppVars is already activated!')
    # NOTE(review): _callback_registered_appvar is only read here, never set;
    # presumably it is set elsewhere (e.g. on connect) - confirm.
    if self.connected and not self._callback_registered_appvar:
        self._send_request(cmd="reportappvars", p1="true", user_timeout=custom_timeout, check_msg_id=check_msg_id)
    self._all_appvar_callbacks.append(callback)
    self.logger.info(f"Callback {callback_name} for AppVar report added.")
[docs]
def remove_appvar_report_callback(self, callback, custom_timeout=None, appvar: Optional[str] = None) -> None:
    """ Remove a specific callback function from the AppVar report callbacks.

    With ``appvar`` the callback is removed from the callbacks of that single AppVar, otherwise from the
    global AppVar callbacks. When the last callback of a report has been removed, the report is stopped
    on the analyzer (``reportappvar remove`` when connected, or ``reportappvars false``).

    :param callback: Callback function that should be removed from the AppVar report.
    :type callback: function
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :param appvar: Name of the single AppVar whose callback should be removed; None (default) addresses the global report.
    :type appvar: str, optional
    :raises ValueError: If callbacks are registered but ``callback`` is not among them (raised by ``list.remove``).
    """
    if appvar is not None:
        # Use .get() rather than indexing: indexing a defaultdict inserts an
        # empty entry, and any leftover entry makes a later *global*
        # registration fail its "single AppVar reporting active" guard.
        callbacks = self._single_appvar_callbacks.get(appvar)
        if not callbacks:
            # nothing registered for this AppVar; drop a stale empty entry
            self._single_appvar_callbacks.pop(appvar, None)
            return
        callbacks.remove(callback)
        if callbacks:
            return
        # last callback gone: forget the AppVar and stop its report
        del self._single_appvar_callbacks[appvar]
        if self.connected:
            self._send_request(cmd='reportappvar', p1='remove', p2=appvar, user_timeout=custom_timeout)
            self.logger.info(f"Report of AppVar {appvar} stopped.")
    elif len(self._all_appvar_callbacks) > 0:
        self._all_appvar_callbacks.remove(callback)
        if len(self._all_appvar_callbacks) == 0:
            self._send_request(cmd="reportappvars", p1="false", user_timeout=custom_timeout)
            self._callback_registered_appvar = False
            self.logger.info("Report of AppVars stopped.")
[docs]
def get_process_number(self, custom_timeout=None) -> Union[int, None]:
    """ Return the current process number (active buffer).

    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :return: The ``processnumber`` field of the response as ``int``, or None if the response has no such field
    :rtype: int or None
    """
    response = self._send_request(cmd="getprocessnumber", user_timeout=custom_timeout)
    number = response.get("processnumber")
    return None if number is None else int(number)
[docs]
def create_project(self, project_name: str, custom_timeout=None) -> None:
    """ Create a new project from the used template with a custom name.

    .. note:: Name size has to be at least 4. Avoid spaces or other typical forbidden characters in the project name.

    :param project_name: Name of the new project
    :type project_name: str
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    request = {"cmd": "createloadproject", "p1": project_name, "user_timeout": custom_timeout}
    self._send_request(**request)
[docs]
def send_appcmd(self, param_one: str, param_two=None, custom_timeout=None) -> None:
    """ General method to send an arbitrary AppCmd to the analyzer.

    .. warning:: Developer function. Do not use without prior knowledge about AppCmds!

    :param param_one: AppCmd
    :type param_one: str
    :param param_two: If needed, second parameter to specify params used in the AppCmd, defaults to None.
        None and the empty string both send the command without a second parameter.
    :type param_two: str, optional
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :raises TypeError: If ``param_two`` is given and is not a string.
    """
    # Only "not given" (None) and the empty string omit p2. A plain
    # truthiness test would also let falsy non-strings such as 0 or False
    # slip past the documented type check.
    if param_two is None or (isinstance(param_two, str) and not param_two):
        self._send_request(cmd="AppCmd", p1=param_one, user_timeout=custom_timeout)
        return
    if not isinstance(param_two, str):
        raise TypeError("Second parameter has to be a string.")
    self._send_request(cmd="AppCmd", p1=param_one, p2=param_two, user_timeout=custom_timeout)
[docs]
def set_multiplexer(self, channel=Channels.CHANNEL_1, chp=ChannelPorts.CHANNEL_PORT_1, preampport=PreampPorts.PREAMP_PORT_1,
                    fft=True, signal=False, samplerate=Samplerates16Bit.SAMPLERATE_1600_kHz, fftoversampling=FFTOversampling.FFT_OVERSAMPLING_8_TIMES,
                    fftwindowing=FFTWindowing.FFT_WINDOWING_HANNING, fftlogarithmic=FFTLogarithmic.FFT_LOGARITHMIC_BASE_14, filter=True, gain=800, subport=-1) -> None:
    """ Set the preamplifier and multiplexer settings.

    All arguments are forwarded unchanged in one ``setpreamp`` request (sent with ``check_msg_id=False``).

    .. warning:: Range of params will not be checked.

    :param channel: Desired channel (dropdown), defaults to Channels.CHANNEL_1
    :type channel: int or Channels, optional
    :param chp: Desired channel port (dropdown), defaults to ChannelPorts.CHANNEL_PORT_1
    :type chp: int or ChannelPorts, optional
    :param preampport: Which preamp port should be used, defaults to PreampPorts.PREAMP_PORT_1
    :type preampport: int or PreampPorts, optional
    :param fft: Checkbox if the fft buffer should be recorded, defaults to True
    :type fft: bool, optional
    :param signal: Checkbox if the signal buffer should be recorded, defaults to False
    :type signal: bool, optional
    :param samplerate: Desired samplerate (dropdown), defaults to Samplerates16Bit.SAMPLERATE_1600_kHz
    :type samplerate: int or Samplerates16Bit, optional
    :param fftoversampling: Desired FFT oversampling (dropdown), defaults to FFTOversampling.FFT_OVERSAMPLING_8_TIMES
    :type fftoversampling: int or FFTOversampling, optional
    :param fftwindowing: Desired FFT windowing (dropdown), defaults to FFTWindowing.FFT_WINDOWING_HANNING
    :type fftwindowing: int or FFTWindowing, optional
    :param fftlogarithmic: Desired FFT logarithmic (dropdown), defaults to FFTLogarithmic.FFT_LOGARITHMIC_BASE_14
    :type fftlogarithmic: int or FFTLogarithmic, optional
    :param filter: If the frequency filter should be set, defaults to True
    :type filter: bool, optional
    :param gain: Gain of the preamp, defaults to 800
    :type gain: int, optional
    :param subport: Desired subport (dropdown). Should only be used with multi-input preamps!, defaults to -1
    :type subport: int, optional
    """
    preamp_settings = {
        "channel": channel,
        "chp": chp,
        "preampport": preampport,
        "fft": fft,
        "signal": signal,
        "samplerate": samplerate,
        "fftoversampling": fftoversampling,
        "fftwindowing": fftwindowing,
        "fftlogarithmic": fftlogarithmic,
        "filter": filter,
        "gain": gain,
        "subport": subport,
    }
    self._send_request(cmd="setpreamp", check_msg_id=False, **preamp_settings)
[docs]
def get_analyzer_versions(self, custom_timeout=None) -> str:
    """ Read the analyzer version information and return it as a string.

    The information is identical to the in-software "about" button. Escaped line breaks
    (a backslash followed by ``n``) in the response are turned into real newlines.

    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :raises AnalyzerVersionError: If the response has no ``v`` field.
    :return: Information out of the info window in the analyzer.
    :rtype: str
    """
    response = self._send_request(cmd="getversions", user_timeout=custom_timeout, can_fail=False)
    version_text = response.get("v")
    if version_text is None:
        raise AnalyzerVersionError('Cannot get analyzer version!')
    # One str.replace() call substitutes every occurrence, and the inserted
    # newline cannot form a new backslash-n pair, so no loop is needed.
    if "\\n" in version_text:
        version_text = version_text.replace("\\n", "\n")
    return version_text
[docs]
def get_project_info(self, custom_timeout=None) -> Dict:
    """ Read analyzer project information such as the currently used project ID/name or the analyzer version.

    The fields ``v``, ``cmd``, ``resid`` and ``millisecondpart`` are removed from the response
    before it is returned.

    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :raises KeyError: If one of the removed fields is missing in the response.
    :return: Information about the current project.
    :rtype: dict [with dict.keys() = ['analyzerbcdversion', 'analyzerversion', 'projectid', 'projectname', 'pronameprojectid', 'unixtime']]
    """
    info = self._send_request(cmd="getinfo", user_timeout=custom_timeout, can_fail=False)
    # strip the fields that are not part of the project description
    for transport_key in ("v", "cmd", "resid", "millisecondpart"):
        info.pop(transport_key)
    return info
[docs]
def get_heartbeat(self, custom_timeout=None) -> bool:
    """ Send a heartbeat request to check that the analyzer still answers.

    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :return: True if a (non-empty) response comes back, False otherwise.
    :rtype: bool
    """
    alive = bool(self._send_request(cmd="heartbeat", user_timeout=custom_timeout))
    if alive:
        self.logger.info("No worries. I'm still alive.")
    return alive
[docs]
def get_max_amp_per_band(self, channel=Channels.CHANNEL_1, create_plot_buffer: bool = True, save_plot_buffer: bool = False, amplitude_type=SysAmplitudesType.AMPLITUDE_DEFAULT, custom_timeout=None) -> np.ndarray:
    """ Return the maximum amplitude per band of the current active buffer.

    :param channel: Datastream channel, defaults to Channels.CHANNEL_1
    :type channel: int or Channels, optional
    :param create_plot_buffer: Creates a temporary plot buffer in the Analyzer software, defaults to True
    :type create_plot_buffer: bool, optional
    :param save_plot_buffer: Option to save the plot buffer, defaults to False
    :type save_plot_buffer: bool, optional
    :param amplitude_type: Amplitude unit, defaults to SysAmplitudesType.AMPLITUDE_DEFAULT
    :type amplitude_type: int or SysAmplitudesType, optional
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :raises AnalyzerError: If the response has no ``p1`` field.
    :return: Calculated maximum amplitude values per band
    :rtype: np.ndarray
    """
    request = {
        "cmd": "calcmaxamplitude",
        "channel": channel,
        "plot": create_plot_buffer,
        "save": save_plot_buffer,
        "amplitudetype": amplitude_type,
        "user_timeout": custom_timeout,
    }
    response = self._send_request(**request)
    amplitudes_csv = response.get("p1")
    if amplitudes_csv is None:
        raise AnalyzerError('Unable to get maximum amplitude per band!')
    # the ``p1`` field is parsed as comma separated text
    return np.fromstring(amplitudes_csv, sep=',')
[docs]
def load_test_project(self, custom_timeout=None) -> None:
    """ Load the configured test project.

    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    request = {"cmd": "loadtestproject", "user_timeout": custom_timeout}
    self._send_request(**request)
[docs]
def load_last_user_project(self, custom_timeout=None) -> None:
    """ Load the last user project that was active before a test project was loaded.

    .. warning:: To use this a test project must be loaded before!!!
    .. note:: If no test project was loaded beforehand, <name_variable> in the analyzer software will not be addressed and a new
        project without name (="") is going to be created. Once a project like this exists, the analyzer cannot perform this again
        and, without loading a test project beforehand, the function will do nothing (but pass any check).

    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    request = {"cmd": "loaduserproject", "user_timeout": custom_timeout}
    self._send_request(**request)
[docs]
def load_project(self, project_name:str, part_number:str="", custom_timeout=None) ->None:
    """ Load a project by its project name.

    :param project_name: Name of the project
    :type project_name: str
    :param part_number: Part number to set, most of the time should be empty, defaults to ""
    :type part_number: str, optional
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    request = {
        "cmd": "loadprojectbyname",
        "p1": project_name,
        "p2": part_number,
        "user_timeout": custom_timeout,
    }
    self._send_request(**request)
[docs]
def load_project_by_IOid(self, project_IOid:Union[str, int], part_number:str="", custom_timeout=None) ->None:
    """ Load a project by its IO id.

    :param project_IOid: The project's unique IO id; it is sent in its string form
    :type project_IOid: Union[str, int]
    :param part_number: Part number to set, most of the time should be empty, defaults to ""
    :type part_number: str, optional
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    io_id_text = f"{project_IOid}"
    self._send_request(
        cmd="loadprojectbyioid",
        p1=io_id_text,
        p2=part_number,
        user_timeout=custom_timeout,
    )
[docs]
def get_measure_positions(self, custom_timeout=None) -> Dict:
    """ Get a dictionary with all measure positions and, if used, an energy value.

    The fields ``v`` and ``cmd`` are removed from the response before it is returned.

    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :raises KeyError: If ``v`` or ``cmd`` is missing in the response.
    :return: Measure positions and their calculated energy value.
    :rtype: dict [with dict.keys() = ['mp0','mp1','mp2','mp3','mp4','mp5','mp6','mp7','mp8','mp9','mp10','mp11','mp12','mp13','mp14','mp15']]
    """
    positions = self._send_request(cmd="getmaxmeasurepositions", user_timeout=custom_timeout)
    for transport_key in ("v", "cmd"):
        positions.pop(transport_key)
    return positions
[docs]
def get_preamp_info(self, preamp_port: Union[PreampPorts, int], convert:bool=True, custom_timeout=None) -> Union[Dict,str, None]:
    """ Read the preamp information of the preamp connected to a preamp port.

    By default the information is returned as a dictionary with serial type, serial number and the set
    s-value. If ``convert`` is False, the raw string from the response is returned instead.

    :param preamp_port: Preamp port with connected preamp (a PreampPorts value or an integer 0-7).
    :type preamp_port: int or PreampPorts
    :param convert: Flag to convert the incoming information into a more readable form, defaults to True.
    :type convert: bool
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :raises KeyError: If ``preamp_port`` is no supported preamp port.
    :raises AnalyzerError: If the response has no ``p1`` field.
    :return: Serial type, serial number and s-value in a dictionary; the raw string if ``convert`` is False;
        None if the string does not consist of exactly four ';'-separated fields.
    :rtype: dict or str or None [with dict.keys() = ['serial_type','serial_number','S-value']]
    """
    # Check the plain integer range first. On Python 3.8-3.11 the test
    # `<int> in <Enum class>` raises TypeError for non-member operands, so
    # it is only used as a fallback and a TypeError counts as "not a member".
    is_valid_port = preamp_port in range(0, 8)
    if not is_valid_port:
        try:
            is_valid_port = preamp_port in PreampPorts
        except TypeError:
            is_valid_port = False
    if not is_valid_port:
        self.logger.error(
            "Choosen preampport is not an analyzer system preamp port.")
        raise KeyError(
            "Choosen preampport is not an analyzer system preamp port.")

    response = self._send_request(cmd="getpreampinfo", user_timeout=custom_timeout, p1=preamp_port)
    preamp_info = response.get('p1')
    if preamp_info is None:
        raise AnalyzerError('Unable to get preamp information!')
    if not convert:
        return preamp_info

    try:
        # expected layout: three "key:value" fields, each terminated by ';'
        # (the trailing ';' yields the fourth, ignored element)
        serial_ring, serial_num, s_value, __ = preamp_info.split(";")
    except ValueError:
        self.logger.warning("The provided Preamp is not configurated properly. Please contact a QASS Service Technician to solve that.")
        return None

    def value_of(field: str) -> str:
        # text after the first ':' (the whole field if there is no ':')
        return field[field.find(":") + 1:]

    return {
        "serial_type": value_of(serial_ring),
        "serial_number": value_of(serial_num),
        "S-value": value_of(s_value),
    }
[docs]
def write_preamp_s_value(self, s_value:int, preampport:Union[PreampPorts, int]=PreampPorts.PREAMP_PORT_1):
    """ Set the preamp s-value in the preamp EEPROM text.

    The current EEPROM text is read with :meth:`get_preamp_info` (``convert=False``), every
    ``s:<number>;`` field in it is replaced with the new s-value and the text is written back with a
    ``writepreampinfo`` request. Both requests pass ``"never"`` as timeout value.

    :param s_value: s-value which should be written to the preamp EEPROM text
    :type s_value: int
    :param preampport: Preamp port where the preamp is connected, defaults to PreampPorts.PREAMP_PORT_1
    :type preampport: Union[PreampPorts, int], optional
    """
    preamp_eeprom = self.get_preamp_info(preamp_port=preampport, convert=False, custom_timeout="never")
    replacement = f"s:{s_value};"
    # Raw string: "\d" in a normal literal is an invalid escape sequence
    # (SyntaxWarning on Python 3.12+). The pattern itself is unchanged.
    preamp_eeprom = re.sub(r"s:-*\d\d*;", replacement, preamp_eeprom)
    self._send_request(cmd="writepreampinfo", p1=preampport, p2=preamp_eeprom, user_timeout="never")
def _write_preamp_eeprom(self, preamp_type:Union[PreampType, int], serial_number:int, s_value:int, preampport:Union[PreampPorts, int]=PreampPorts.PREAMP_PORT_1):
    """ Private method to set the complete preamp EEPROM text.

    The text is assembled as ``t:<type>;s/n:<serial>;s:<s-value>;`` and sent with a ``writepreampinfo``
    request (``check_msg_id=False``).

    :param preamp_type: Type of preamp
    :type preamp_type: PreampType or int
    :param serial_number: Serial number
    :type serial_number: int
    :param s_value: s-value for the preamp
    :type s_value: int
    :param preampport: Preamp port to which the preamp is connected, defaults to PreampPorts.PREAMP_PORT_1
    :type preampport: Union[PreampPorts, int], optional
    """
    eeprom_fields = (f"t:{preamp_type}", f"s/n:{serial_number}", f"s:{s_value}")
    eeprom_text = "".join(f"{field};" for field in eeprom_fields)
    self._send_request(cmd="writepreampinfo", p1=preampport, p2=eeprom_text, check_msg_id=False)
[docs]
def set_serial_number(self, serial_number: int, process_number: int, custom_timeout=None) -> None:
    """ Set the serial number for an arbitrary process.

    Serial number is stored in the database using the process.serial attribute.

    :param serial_number: Serial number that should be set.
    :type serial_number: int
    :param process_number: Process which should be connected to the serial number.
    :type process_number: int
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    # the AppCmd expects "<process> <serial>" in this order
    payload = f"{process_number} {serial_number}"
    self._send_request(cmd="AppCmd", p1="SetProcessSerial", p2=payload, user_timeout=custom_timeout)
[docs]
def set_serial_number_pending_process(self, serial_number: int, custom_timeout=None) -> None:
    """ Set the serial number for the next process.

    Serial number is stored in the database using the process.serial attribute.

    :param serial_number: Serial number for the next process
    :type serial_number: int
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    request = {"cmd": "setpendingserial", "p1": serial_number, "user_timeout": custom_timeout}
    self._send_request(**request)
[docs]
def start_operator(self, operator_name: str, operator_setting: str, user_callback: Optional[Callable]=None) -> None:
    """ Manually start an existing operator by name.

    :param operator_name: Name of the operator that should start
    :type operator_name: str
    :param operator_setting: Operator settings like "loop from 0 to -1 simulation 2"
    :type operator_setting: str
    :param user_callback: Not implemented yet! The argument is accepted but not used.
    :type user_callback: Optional[Callable]
    """
    request = {
        "cmd": "startoperator",
        "check_msg_id": True,
        "p1": operator_name,
        "p2": operator_setting,
    }
    self._send_request(**request)
[docs]
def import_patterns(self, directory_path: str, ) -> None:
    """ Import all pattern files from an optimizer local directory.

    The path is sent wrapped in double quotes.

    :param directory_path: Directory path to the patterns that will be imported.
    :type directory_path: str
    """
    quoted_path = f'"{directory_path}"'
    self._send_request(cmd="importpatterns", p1=quoted_path, check_msg_id=False)
[docs]
def import_trigger_list(self, filepath: str, append: bool = False, custom_timeout=None) -> None:
    """ Import a trigger list file from a local path.

    The append option decides whether already existing triggers will be set active or not.

    :param filepath: Local filepath
    :type filepath: str
    :param append: If True, the ``-a`` option is added to the import command, defaults to False.
    :type append: bool, optional
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    import_args = f'triggerlist "{filepath}"' + (" -a" if append else "")
    self._send_request(cmd="AppCmd", user_timeout=custom_timeout, p1="import", p2=import_args)
[docs]
def import_operator_network(self, filepath: str, custom_timeout=None) -> None:
    """ Import a local operator network file. The command runs as root import.

    .. warning:: The current operator network will be replaced.

    :param filepath: Local filepath to the operator network file
    :type filepath: str
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    import_args = f'opnet "{filepath}"'
    self._send_request(cmd="AppCmd", user_timeout=custom_timeout, p1="import", p2=import_args)
[docs]
def import_project_archive(self, filepath: str, project_name: str, keep_original_process_nums: bool = False, overwrite: bool = False) -> None:
    """ Import a complete project archive file (tar.gz).

    .. warning:: If keep_original_process_nums is set, the process structure will be kept like before the import. As an example,
        if process 17000 has been exported, this flag will create 16999 empty processes before.
    .. warning:: If overwrite is activated this will overwrite and delete the currently activated project.

    :param filepath: Local filepath to the archive file
    :type filepath: str
    :param project_name: Name of the imported project
    :type project_name: str
    :param keep_original_process_nums: Keeps the original process numbers (adds ``--originalnums``), defaults to False
    :type keep_original_process_nums: bool, optional
    :param overwrite: Overwrites the current active project (adds ``--overwrite``), defaults to False
    :type overwrite: bool, optional
    """
    # the argument string intentionally starts with a blank, as before
    arguments = [f' "{filepath}" {project_name}']
    if keep_original_process_nums:
        arguments.append("--originalnums")
    if overwrite:
        arguments.append("--overwrite")
    self._send_request(cmd="AppCmd", check_msg_id=False, p1="importprojectarchive", p2=" ".join(arguments))
[docs]
def export_operator_network(self, target_filepath: str, export: str = "root", custom_timeout=None) -> None:
    """ Export the operator network as a JSON file. When in doubt, check the documentation.

    .. list-table:: Supported 'export' keys
        :widths: 15 10 25
        :header-rows: 1

        * - Key
          - Value datatype
          - Definition
        * - root
          - str
          - Exports the current active operator network
        * - all
          - str
          - Exports all available operator networks
        * - template
          - str
          - Exports the project specific operator network template

    :param target_filepath: Target file path
    :type target_filepath: str
    :param export: Decides what will be exported: the currently activated network ("root"), all networks ("all")
        or the template ("template"), defaults to "root"
    :type export: str, optional
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :raises KeyError: If ``export`` is none of the supported keys.
    """
    # translate the export key into the command line option of the AppCmd
    option = {"root": "-r", "all": "-a", "template": "-t"}[export]
    export_args = f'opnet "{target_filepath}" {option}'
    self._send_request(cmd="AppCmd", p1="export", p2=export_args, user_timeout=custom_timeout)
[docs]
def export_trigger_list(self, target_filepath: str, custom_timeout=None) -> None:
    """ Export the current trigger list to a path. The target filepath should contain the new file name.

    :param target_filepath: Target file path
    :type target_filepath: str
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    export_args = f'triggerlist "{target_filepath}"'
    self._send_request(cmd="AppCmd", p1="export", p2=export_args, user_timeout=custom_timeout)
[docs]
def export_project_archive(self, target_filepath: str, export_name: str, export_process: Optional[int] = None, export_pengui: bool = True, keep_folder: bool = True) -> None:
    """ Export the current active project to a path as tar.gz file. This includes all patterns, trigger list and projects.

    :param target_filepath: Target folder path
    :type target_filepath: str
    :param export_name: Name of the export file
    :type export_name: str
    :param export_process: Exports an example process with measurement data (adds ``--process <n>``), defaults to None
    :type export_process: int, optional
    :param export_pengui: Exports PenGUI (adds ``--pengui``), defaults to True
    :type export_pengui: bool, optional
    :param keep_folder: Preserves the folder structure and exports this structure to the target (adds ``--keepfolder``), defaults to True
    :type keep_folder: bool, optional
    """
    arguments = [f'"{target_filepath}" {export_name}']
    if export_process is not None:
        arguments.append(f"--process {export_process}")
    if export_pengui:
        arguments.append("--pengui")
    if keep_folder:
        arguments.append("--keepfolder")
    self._send_request(cmd="AppCmd", check_msg_id=False, p1="exportprojectarchive", p2=" ".join(arguments))
# TODO: Test in newest analyzer version
[docs]
def flash_preamp_firmware(self, preampport: Union[int, PreampPorts], filepath: str) -> None:
    """ Flash the preamp firmware from a downloaded hex file. The path should be an absolute path.

    :param preampport: Port of the connected preamp; it is sent incremented by one
    :type preampport: int or PreampPorts
    :param filepath: Absolute (!) path to the hex file
    :type filepath: str
    """
    port_number = preampport + 1
    flash_args = f'flash {port_number} "{filepath}"'
    self._send_request(cmd="appfunc", check_msg_id=False, p1="PreampTool", p2=flash_args)
[docs]
def detect_preamps(self) -> str:
    """ Let the Analyzer check for connected preamps (will not automatically activate them!).

    The request is sent with ``"never"`` as timeout value.

    :raises AnalyzerError: If the response has no ``result`` field.
    :return: String saying how many preamps are detected
    :rtype: str
    """
    response = self._send_request(cmd="appfunc", p1="PreampTool", p2="detect", user_timeout="never")
    detection_result = response.get('result')
    if detection_result is not None:
        return detection_result
    raise AnalyzerError('Unable to detect preamps!')
[docs]
def get_preamp_firmware(self, preampport: Union[int, PreampPorts], custom_timeout=None) -> str:
    """ Return the preamp firmware version.

    :param preampport: Port where the preamp is connected
    :type preampport: Union[int, PreampPorts]
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :raises AnalyzerError: If the response has no ``result`` field.
    :return: O-PA firmware
    :rtype: str
    """
    # the c++ analyzer source code handles preamp ports between 1 and 8 here
    port_number = preampport + 1
    response = self._send_request(
        cmd="appfunc",
        p1="PreampTool",
        p2=f"version {port_number}",
        user_timeout=custom_timeout,
    )
    firmware = response.get("result")
    if firmware is not None:
        return firmware
    raise AnalyzerError('Unable to get firmware version!')
[docs]
def reboot_preamp(self, preampport: Union[int, PreampPorts]) ->None:
    """ Reboot the preamp for one second.

    The request is sent with ``"never"`` as timeout value.

    :param preampport: Port where the preamp is connected
    :type preampport: Union[int, PreampPorts]
    """
    # the c++ analyzer source code handles preamp ports between 1 and 8 here
    port_number = preampport + 1
    self._send_request(cmd="AppCmd", p1="Preamp", p2=f"port {port_number} reboot", user_timeout="never")
[docs]
def set_default_project(self, comment: Optional[str] = None, custom_timeout=None) -> None:
    """ Set the current active project as new default template.

    :param comment: Comment to describe the template (sent as ``-c <comment>``), defaults to None
    :type comment: str, optional
    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    request = {"cmd": "AppCmd", "p1": "SaveProjectasDefault", "user_timeout": custom_timeout}
    if comment:
        # only a non-empty comment adds the second parameter
        request["p2"] = f"-c {comment}"
    self._send_request(**request)
[docs]
def remove_default_project(self, custom_timeout=None) -> None:
    """ Remove the current project template.

    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    request = {
        "cmd": "AppCmd",
        "p1": "SaveProjectasDefault",
        "p2": "-e",
        "user_timeout": custom_timeout,
    }
    self._send_request(**request)
[docs]
def get_io_output(self, custom_timeout=None) -> int:
    """ Return the I/O output register as integer.

    :param custom_timeout: Custom timeout for the analyzer response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :raises AnalyzerError: If the response has no ``result`` field.
    :return: The ``result`` field of the response converted with ``int()``
    :rtype: int
    """
    response = self._send_request(cmd="readioout", user_timeout=custom_timeout)
    register = response.get('result')
    if register is not None:
        return int(register)
    raise AnalyzerError('Unable to get IO outputs!')
def _shift_binary(self, original_bin: str) -> str:
    """ Helper method that reverses a binary string so that the least significant digit ends up on the right side.

    :param str original_bin: Incoming binary
    :return str: The characters of ``original_bin`` in reversed order
    """
    return "".join(reversed(original_bin))
def _binary_to_hexa(self, binary_str: str) -> str:
    """ Format a binary string as lowercase hex, zero-padded to at least four digits and prefixed with "0xf".

    :param binary_str: Binary that should be converted to hexa representation.
    :type binary_str: str
    :return: hexa representation
    :rtype: str
    """
    register_value = int(binary_str, 2)
    return f"0xf{register_value:0>4x}"
[docs]
def add_io_report_callback(self, callback, custom_timeout=None) -> None:
    """ Register a callback for I/O register reports.

    If the report is not yet active and the remote is connected, the report is switched on
    (``reportio true``) before the callback is stored. Incoming ``responsereportio`` messages
    are then handed to every stored callback. See networking_example.py for an example.
    Removing all callbacks via :meth:`remove_io_report_callback` stops the report again.

    .. warning:: All callbacks need as first param "result" to catch analyzer response, if used or not.

    :param callback: Added callback function when report happens.
    :type callback: function
    :param custom_timeout: Custom timeout flag to get a response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    report_inactive = not self._callback_registered_io
    if report_inactive and self.connected:
        self._send_request(cmd="reportio", p1="true", user_timeout=custom_timeout)
        self._callback_registered_io = True
    # The callback is stored even when the report could not be started yet.
    self._io_callbacks.append(callback)
    self.logger.info(f"Callback {callback} for I/O report added")
[docs]
def remove_io_report_callback(self, callback, custom_timeout=None) -> None:
    """ Remove a callback from the I/O report callback list.

    When the last callback is removed, the report is switched off (``reportio false``).

    :param callback: Callback function that should be removed from I/O report functionalities.
    :type callback: function
    :param custom_timeout: Custom timeout flag to get a response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :raises ValueError: If callbacks are registered but ``callback`` is not among them
    """
    registered = self._io_callbacks
    if not registered:
        return
    registered.remove(callback)
    if registered:
        # Other callbacks still rely on the report.
        return
    self._send_request(cmd="reportio", p1="false", user_timeout=custom_timeout)
    self._callback_registered_io = False
    self.logger.info("I/O report stopped")
def write_fieldbus_input(self, data: bytearray, custom_timeout=None) -> None:
    """ Send raw bytes to the Analyzer with the ``profibusmsg`` command.

    The bytes are transmitted as a hex string in ``p1``.

    :param bytearray data: Payload; must be a ``bytearray`` (``bytes`` is rejected)
    :param custom_timeout: Custom timeout flag to get a response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :raises ValueError: If ``data`` is not a ``bytearray``
    """
    if isinstance(data, bytearray):
        hex_payload = data.hex()
        self._send_request(cmd="profibusmsg", p1=hex_payload, user_timeout=custom_timeout)
        return
    raise ValueError('data must be of type bytearray')
def write_fieldbus_output(self, addr: int, size: int, value: int, custom_timeout=None) -> None:
    """ Send the ``pbSendDWord`` application command.

    The arguments are transmitted in ``p2`` in the order ``value size addr`` (with a trailing space).
    NOTE(review): units and valid ranges of ``addr`` and ``size`` are not visible here; check the Analyzer documentation.

    :param int addr: Address argument of the command
    :param int size: Size argument of the command
    :param int value: Value argument of the command
    :param custom_timeout: Custom timeout flag to get a response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    arguments = f"{value} {size} {addr} "
    self._send_request(cmd="AppCmd", p1="pbSendDWord", p2=arguments, user_timeout=custom_timeout)
def read_fieldbus_input(self, addr: int, size: int, custom_timeout=None):
    """ Call the ``pbReadDWord`` application function and return its result.

    The arguments are transmitted in ``p2`` in the order ``size addr``.

    :param int addr: Address argument of the function
    :param int size: Size argument of the function
    :param custom_timeout: Custom timeout flag to get a response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :return: The ``result`` field of the response, or None if it is missing
    """
    arguments = f"{size} {addr}"
    answer = self._send_request(cmd="AppFunc", p1="pbReadDWord", p2=arguments, user_timeout=custom_timeout)
    return answer.get("result")
def register_fieldbus_input_callback(self, callback, custom_timeout=None) -> None:
    """ Register a callback for fieldbus input (``rx``) reports.

    If the report is not yet active and the remote is connected, it is switched on with
    ``reportprofibus true rx``. The callback is stored in either case.

    :param callback: Function that is called with the reported data
    :type callback: function
    :param custom_timeout: Custom timeout flag to get a response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    report_inactive = not self._callback_registered_fieldbus_input
    if report_inactive and self.connected:
        self._send_request(cmd="reportprofibus", p1="true", p2="rx", user_timeout=custom_timeout)
        self._callback_registered_fieldbus_input = True
    self._fieldbus_input_callbacks.append(callback)
    self.logger.info("Callback for fieldbus input report added")
def register_fieldbus_output_callback(self, callback, custom_timeout=None) -> None:
    """ Register a callback for fieldbus output (``tx``) reports.

    If the report is not yet active and the remote is connected, it is switched on with
    ``reportprofibus true tx``. The callback is stored in either case.

    :param callback: Function that is called with the reported data
    :type callback: function
    :param custom_timeout: Custom timeout flag to get a response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    if not self._callback_registered_fieldbus_output and self.connected:
        self._send_request(cmd="reportprofibus", p1="true", p2="tx", user_timeout=custom_timeout)
        self._callback_registered_fieldbus_output = True
    self._fieldbus_output_callbacks.append(callback)
    # The message used to say "input", copied from the rx variant.
    self.logger.info("Callback for fieldbus output report added")
[docs]
def add_process_number_report_callback(self, callback, custom_timeout=None) -> None:
    """ Register a callback for process number reports.

    If the report is not yet active and the remote is connected, the report is switched on
    (``reportprocessnumber true``) before the callback is stored. Incoming
    ``responsereportprocessnumber`` messages are then handed to every stored callback.
    See networking_example.py for an example. Removing all callbacks via
    :meth:`remove_process_number_report_callback` stops the report again.

    .. warning:: All callbacks need as first param "result" to catch analyzer response, if used or not.

    :param callback: Added callback function when report happens.
    :type callback: function
    :param custom_timeout: Custom timeout flag to get a response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    if not self._callback_registered_processnumber and self.connected:
        self._send_request(cmd="reportprocessnumber", p1="true", user_timeout=custom_timeout)
        self._callback_registered_processnumber = True
    self._processnumber_callbacks.append(callback)
    # The message used to name the AppVar report, copied from another method.
    self.logger.info(f"Callback {callback} for process number report added")
[docs]
def remove_process_number_report_callback(self, callback, custom_timeout=None) -> None:
    """ Remove a callback from the process number report callback list.

    When the last callback is removed, the report is switched off (``reportprocessnumber false``).

    :param callback: Callback function that should be removed from process number report functionalities.
    :type callback: function
    :param custom_timeout: Custom timeout flag to get a response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :raises ValueError: If callbacks are registered but ``callback`` is not among them
    """
    registered = self._processnumber_callbacks
    if not registered:
        return
    registered.remove(callback)
    if registered:
        # Other callbacks still rely on the report.
        return
    self._send_request(cmd="reportprocessnumber", p1="false", user_timeout=custom_timeout)
    self._callback_registered_processnumber = False
    self.logger.info("Report of process number stopped.")
[docs]
def set_io_output(self, io_line: int, state: bool, custom_timeout=None) -> None:
    """ Set a single I/O output line.

    Only the line number within the third I/O block is required. When in doubt, check documentation.
    The values are sent unvalidated as ``"<io_line> <state>"``.

    .. warning:: Changing output line 3.1 - 3.3 is not possible.

    .. list-table:: I/O Output possibilities
        :widths: 25 25
        :header-rows: 1

        * - I/O line
          - parameter
        * - 3.1
          - 1
        * - 3.2
          - 2
        * - 3.3
          - 3
        * - 3.4
          - 4
        * - 3.5
          - 5
        * - 3.6
          - 6
        * - 3.7
          - 7
        * - 3.8
          - 8

    :param io_line: Line number as listed in the table above
    :type io_line: int
    :param state: Set Line high or low
    :type state: bool
    :param custom_timeout: Custom timeout flag to get a response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    line_and_state = f"{io_line} {state}"
    self._send_request(cmd="appcmd", p1="setioout", p2=line_and_state, user_timeout=custom_timeout)
[docs]
def start_script_function(self, function_name: str, function_param: Any, custom_timeout=None) -> dict:
    """ Call a remote script function via ``AppFunc`` and return the Analyzer response.

    .. warning:: Service function, should not be used without prior knowledge about remote scripts

    :param function_name: Name of script function
    :type function_name: str
    :param function_param: Passed param to script function
    :type function_param: any
    :param custom_timeout: Custom timeout flag to get a response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :return: Standard Analyzer response. Dict contains result of addressed function as str.
    :rtype: dict
    """
    request = {"cmd": "AppFunc", "p1": function_name, "p2": function_param}
    return self._send_request(user_timeout=custom_timeout, **request)
[docs]
def set_human_confirmation(self, process_IO=False, **kwargs) -> None:
    """ Send a human confirmation for the current process.

    Score and comment can be passed as keyword arguments. When in doubt, check documentation.
    The request is sent without waiting for a response.

    .. list-table:: Possible keyword arguments
        :widths: 15 10 25
        :header-rows: 1

        * - Key
          - Value datatype
          - Description
        * - comment
          - str
          - Human comment for confirmation
        * - score
          - int
          - Score value for confirmation

    :param process_IO: Confirmation if current process is IO or NIO, defaults to False
    :type process_IO: bool
    :raises ValueError: If kwargs contains a key other than "comment" or "score"
    """
    # Compare the key set; ``kwargs.keys() not in [...]`` was always true and
    # rejected even the supported keys.
    unsupported_keys = set(kwargs) - {"comment", "score"}
    if unsupported_keys:
        self.logger.error(
            "Key is not supported for operation human_confirmation")
        raise ValueError(
            "Key is not supported for operation human_confirmation")
    translator = {False: "NIO", True: "IO"}
    settings = {"cmd": "confirmation",
                "p1": translator[process_IO]}
    if "comment" in kwargs:
        settings["p2"] = kwargs["comment"]
    if "score" in kwargs:
        settings["score"] = kwargs["score"]
    self._send_request(check_msg_id=False, **settings)
[docs]
def write_to_database(self, result: Any, comment=None) -> None:
    """ Send a ``humanconfirmationresult`` request with the given result and optional comment.

    Per the original description this writes a database entry with current project_id,
    process, process_id, result and comment. The request is sent without waiting for a response.

    :param result: Result which should be saved in database
    :type result: any
    :param comment: Comment for result, defaults to None
    :type comment: str, optional
    """
    request = {"cmd": "humanconfirmationresult", "p1": result}
    # An empty or missing comment is left out of the request entirely.
    if comment:
        request["p2"] = comment
    self._send_request(check_msg_id=False, **request)
[docs]
def write_backup(self, custom_timeout=None) -> None:
    """ Trigger an automatic Analyzer backup (``AppCmd``/``writeBackup``).

    :param custom_timeout: Custom timeout flag to get a response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    request = dict(cmd="AppCmd", p1="writeBackup")
    self._send_request(user_timeout=custom_timeout, **request)
[docs]
def set_sys_pengui_config(self, penguifile=None, reload=None, activate_on_load=None, disable_open_gl=None,
                          disable_buffer_boxes=None, keepIfNotChanged=None, custom_timeout=None):
    """ Set the entries under Preferences -> GUI -> Custom User Interface.

    This controls the behaviour of the qml GUI. It is probably no good idea to set reload and keepIfNotChanged both to True.
    Only parameters that are not None are transmitted; if none remain, no request is sent.

    :param str penguifile: The absolute path to the qml file that should be loaded.
    :param bool reload: Whether or not to reload the qml file whenever a project is loaded
    :param bool activate_on_load: Whether to display the qml GUI on program startup.
    :param bool disable_open_gl: Disable the openGL view whenever a qml GUI is actively displayed.
    :param bool disable_buffer_boxes: Disable Buffer bounding boxes.
    :param bool keepIfNotChanged: Do not reload qml GUI when the project is changed and the new project uses the same QML. Only available for Analyzer versions >= "2.04.12.05"; otherwise the attribute is ignored and a warning is logged.
    :param custom_timeout: Custom timeout flag to get a response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    # The Analyzer expects p2 as a sequence of '<subcmd> "<value>" ' fragments.
    options = (
        ("penguifile", penguifile),
        ("reload", reload),
        ("activateOnLoad", activate_on_load),
        ("disableOpenGL", disable_open_gl),
        ("disableBufferBoxes", disable_buffer_boxes),
    )
    fragments = [
        f"{name} \"{self.translator.get(setting,setting)}\" "
        for name, setting in options
        if setting is not None
    ]
    if keepIfNotChanged is not None:
        # This option depends on the Analyzer version, so it is checked first.
        if self.check_version(minimum_needed_version="2.04.12.05"):
            fragments.append(f"keepIfNotChanged \"{self.translator.get(keepIfNotChanged,keepIfNotChanged)}\" ")
        else:
            self.logger.warning(f"keepIfNotChanged is no available attribute for method sysPenguiConfig because AnalyzerVersion {self._analyzer_version} is smaller than 2.04.12.05! Change attribute manually in Settings -> GUI or update Analyzer Version!")
    p2_str = "".join(fragments)
    if p2_str == "":
        self.logger.info("Method 'set_sys_pengui_config' is not executed because of no valid parameters.")
        return
    self._send_request(cmd="AppCmd", p1="sysPenguiConfig", p2=p2_str, user_timeout=custom_timeout)
[docs]
def set_python_init_hook(self, python_init_hook_path: Union[str, Path]):
    """ Set the python init hook path in Preferences -> Python -> Python Init Hook.

    :param python_init_hook_path: The absolute path to the python script that should be executed during
                                  the startup phase of the analyzer software.
    :type python_init_hook_path: Union[str, Path]
    """
    hook_path = str(python_init_hook_path)
    self._send_request(cmd="AppCmd", p1="sysPathConfig", p2=f"pyinithook \"{hook_path}\"")
[docs]
def reset_failstate(self, set_idle_state: bool = True, clear_all_windows: bool = True, custom_timeout=None) -> None:
    """ Reset failure status of optimizer (activates I/O Ready).

    :param bool set_idle_state: Application state is set to IDLE (adds ``-idle``), defaults to True
    :param bool clear_all_windows: Removes all message/notification windows (adds `` -a``), defaults to True
    :param custom_timeout: Custom timeout flag to get a response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    """
    idle_flag = "-idle" if set_idle_state else ""
    # The leading space of " -a" is kept even when it is the only flag.
    windows_flag = " -a" if clear_all_windows else ""
    self._send_request(cmd="AppCmd", p1="ResetFailstate", p2=idle_flag + windows_flag, user_timeout=custom_timeout)
[docs]
def set_failstate(self, **kwargs):
    """ Set Analyzer4d Software into failstate.

    If no duration is provided, system stays in failstate (clear I/O ready).

    .. list-table:: Possible keyword arguments
        :widths: 15 10 25
        :header-rows: 1

        * - Key
          - Value datatype
          - Description
        * - duration
          - int
          - Duration in ms for failstate status

    :raises ValueError: If "duration" is given but is not an integer greater than 0
    """
    duration = kwargs.get("duration", None)
    if duration is None:
        self._send_request(cmd="AppCmd", p1="SetFailstate")
        return
    # Only a missing duration means "stay in failstate". A truthiness test would let
    # duration=0 through as a permanent failstate, and bool counts as int in Python.
    is_valid = isinstance(duration, int) and not isinstance(duration, bool) and duration > 0
    if not is_valid:
        raise ValueError("Only integer greater 0 are supported for failstate duration")
    self._send_request(cmd="AppCmd", p1="SetFailstate", p2=f"{duration}")
[docs]
def free_buffer_datablocks(self):
    """ Free all buffer standby datablocks (``ExpertCmd`` with ``RAM free-standby``).

    .. warning:: Experts method
    """
    request = dict(cmd="AppCmd", p1="ExpertCmd", p2="RAM free-standby")
    self._send_request(**request)
[docs]
def remove_delayed_trigger(self, delay_type: Literal['all', 'busy', 'parameter'], custom_timeout=None):
    """ Remove delayed trigger commands from the queue.

    .. list-table:: Possible delay types
        :widths: 15 10 25
        :header-rows: 1

        * - Key
          - Datatype
          - Definition
        * - 'all'
          - str
          - Remove all delayed trigger commands from queue
        * - 'busy'
          - str
          - Remove trigger commands delayed to busy signal
        * - 'parameter'
          - str
          - Remove trigger commands delayed by parameters from queue

    .. warning:: Experts method

    :param delay_type: Type of delayed signal to remove
    :type delay_type: str
    :param custom_timeout: Custom timeout flag to get a response, defaults to None. For more information see class description.
    :type custom_timeout: int, optional
    :raises KeyError: If ``delay_type`` is not one of the keys listed above
    """
    subcommands = {
        "all": "remove-all",
        "busy": "remove-busy",
        "parameter": "remove-delayed",
    }
    subcommand = subcommands[delay_type]
    self._send_request(cmd="AppCmd", p1="ExpertCmd", p2=f"TRIGGER {subcommand}", user_timeout=custom_timeout)
[docs]
def start_shell_program(self, programm_path: Union[str, Path], detach_from_analyzer: bool = True):
    """ Start an arbitrary system process via shell.

    Detached (``-detach``) the program runs asynchronously to the analyzer context. Otherwise
    (``-noasync``) the analyzer waits for the program to finish (max. 1 sec).

    :param programm_path: Path to program
    :type programm_path: Union[str, Path]
    :param detach_from_analyzer: Flag to decide if process is completed async to analyzer context, defaults to True
    :type detach_from_analyzer: bool, optional
    """
    sync_param = "-detach" if detach_from_analyzer else "-noasync"
    arguments = f"{sync_param} \"{str(programm_path)}\""
    self._send_request(cmd="AppCmd", p1="StartProgram", p2=arguments)
[docs]
def restart_analyzer(self, wait_time: Union[int, str] = 2000, **kwargs):
    """ Restart analyzer4D Software after system stayed a minimum time (= wait_time) in idle state.

    By passing "force_now" as wait_time, a reboot will be executed directly.

    .. list-table:: Keyword arguments
        :widths: 15 10 25
        :header-rows: 1

        * - Key
          - Value datatype
          - Description
        * - last_words
          - str
          - Displayed message from analyzer before restart
        * - last_words_display_time
          - int
          - Time frame in ms for displaying last words (default 2000). Must be > 0; values above wait_time are reduced to wait_time. Only used together with last_words.

    :param wait_time: Minimum time [ms] in idle state before analyzer software is closed, or "force_now", defaults to 2000 ms
    :type wait_time: Union[int,str], optional
    :raises ValueError: If wait_time is neither "force_now" nor an integer
    :raises ValueError: If wait_time is smaller or equal zero
    :raises ValueError: If display_message time is smaller or equal zero
    """
    if isinstance(wait_time, str) and wait_time == "force_now":
        self._send_request(cmd="AppCmd", p1="RestartAnalyzer", p2="FORCE_NOW")
        return
    if not isinstance(wait_time, int):
        # Previously such values sent nothing, so the caller could not tell
        # that no restart was requested.
        raise ValueError("wait_time has to be an integer greater than 0 ms or 'force_now'")
    if not wait_time > 0:
        # This check used to reuse the display-time message.
        raise ValueError("Wait time has to be greater than 0 ms")
    last_words = kwargs.get("last_words", None)
    last_words_display_time = kwargs.get("last_words_display_time", 2000)
    p2 = f"{wait_time}"
    if last_words:
        if not last_words_display_time > 0:
            raise ValueError("Display time has to be greater than 0 ms")
        if last_words_display_time > wait_time:
            # The message cannot be shown longer than the analyzer waits.
            last_words_display_time = wait_time
            self.logger.info("Disaply time for analyzer message is set to maximum time before restart (= 'wait_time')")
        p2 = p2 + f" {last_words_display_time} \"{last_words}\""
    self._send_request(cmd="AppCmd", p1="RestartAnalyzer", p2=f"{p2}")
#TODO: Description
[docs]
def set_frequency_mask(self, mask_id: int, measure_config: int):
    """ Set an already existing frequency mask.

    Sends ``SetFrequencymask`` with ``"<mask_id> <measure_config>"`` as argument.

    :param int mask_id: ID of desired mask
    :param int measure_config: Second value of the command argument. NOTE(review): its meaning is not
                               documented in this file; confirm against the Analyzer documentation.
    """
    arguments = f"{mask_id} {measure_config}"
    self._send_request(cmd="AppCmd", p1="SetFrequencymask", p2=arguments)
[docs]
def use_frequency_mask(self, mask_id: int):
    """ Use an already existing frequency mask on the process.

    :param int mask_id: Use frequency mask with provided ID
    """
    arguments = f"{mask_id}"
    self._send_request(cmd="AppCmd", p1="UseFrequencymask", p2=arguments)
[docs]
def teach_frequency_mask(self, mask_id: int, mask_type: str):
    """ Teach a new frequency mask for the loaded measurement.

    :param int mask_id: Frequency mask ID of new mask
    :param str mask_type: Frequency mask type
    """
    arguments = f"{mask_id} {mask_type}"
    self._send_request(cmd="AppCmd", p1="TeachFrequencymask", p2=arguments)
[docs]
def set_sys_python_path(self, python_sys_path: Union[str, Path]):
    """ Set system python path. [Preferences->Python->sys.path extensions]

    :param Union[str,Path] python_sys_path: Python path
    """
    arguments = f"pysyspaths \"{python_sys_path}\""
    self._send_request(cmd="AppCmd", p1="sysPathConfig", p2=arguments)
[docs]
def set_appvar_container_visible(self, visible: bool = True):
    """ Show or hide the AppVar Container in the Analyzer4D menu.

    :param bool visible: True sends ``APPVARS enable``, False sends ``APPVARS disable``, defaults to True
    """
    state = "enable" if visible else "disable"
    self._send_request(cmd="AppCmd", p1="ShowTool", p2=f"APPVARS {state}")
[docs]
def set_frq_mask_container_visible(self, visible: bool = True):
    """ Show or hide the Frequency mask manager in the Analyzer4D menu.

    :param bool visible: True sends ``FRQMASKS enable``, False sends ``FRQMASKS disable``, defaults to True
    """
    state = "enable" if visible else "disable"
    self._send_request(cmd="AppCmd", p1="ShowTool", p2=f"FRQMASKS {state}")
def _get_next_msg_id(self):
    """ Increment the message counter under ``_msg_id_lock`` and return a copy of the new value.

    :return: The message id to use for the next request
    """
    with self._msg_id_lock:
        next_id = self._msg_id + 1
        self._msg_id = next_id
        return copy.copy(next_id)
def _send_request(self, check_msg_id: bool = True, can_fail: bool = True, user_timeout: Optional[int] = None, **kwargs) -> dict:
    """ Send one command to the Analyzer and, unless disabled, wait for its response.

    The command is a JSON object built from ``kwargs`` plus a fresh ``msgid``, prefixed with a
    2-byte big-endian length header. A future is registered under the message id so the
    receiver side can deliver the matching response.

    :param check_msg_id: If False, return None right after sending without waiting for a response
    :type check_msg_id: bool
    :param can_fail: If True, a response whose ``ok`` field is missing or falsy raises AnalyzerError
    :type can_fail: bool
    :param user_timeout: Seconds to wait for the response; ``"never"`` blocks without timeout;
                         None or 0 falls back to ``self.timeout``
    :type user_timeout: int, optional
    :raises RuntimeError: If the socket is closed, not all bytes were written, or no response arrived in time
    :raises OSError: If sending on the socket fails (the socket is shut down before re-raising)
    :raises AnalyzerError: If ``can_fail`` is set and the response does not report ``ok``
    :return: The response message, or None when ``check_msg_id`` is False
    :rtype: dict
    """
    if not self._socket:
        raise RuntimeError(f'can not send because socket is closed {kwargs}')
    msg_id = self._get_next_msg_id()
    # command ground structure
    command = {'cmd': "",
               "msgid": msg_id}
    command.update(kwargs)
    request = futures.Future()
    with self._requests_lock:
        self._requests[msg_id] = request
    try:
        payload = json.dumps(command).encode()
        header = struct.pack('>H', len(payload))
        data = header + payload
        self.logger.debug(f'Send:\n{json.dumps(command,indent=2)}')
        try:
            send_bytes = self._socket.send(data)
        except OSError as e:
            self.logger.exception(e)
            try:
                if self._socket:
                    # Shutting the socket down lets the receiver side notice the broken connection.
                    self._socket.shutdown(socket.SHUT_RDWR)
            except OSError as shutdown_error:
                self.logger.exception(shutdown_error)
            raise
        if send_bytes != (len(data)):
            raise RuntimeError('Failed to write bytes')
        if user_timeout:
            if isinstance(user_timeout, str) and user_timeout == "never":
                user_timeout = None  # equals block
            timeout = user_timeout
        else:
            timeout = self.timeout
        if not check_msg_id:
            return None
        try:
            response: dict = request.result(timeout)
        except futures.TimeoutError as e:
            raise RuntimeError('Do not get a response from Analyzer4D Software') from e
    finally:
        # Always drop the pending entry. Before, a failed pack or send left the
        # future in self._requests for good.
        with self._requests_lock:
            self._requests.pop(msg_id, None)
    if can_fail and not response.get("ok", None):
        raise AnalyzerError(
            f"Analyzer4D software failed to execute cmd:'{command['cmd']}\n"
            f"Request:{json.dumps(command,indent=2)}\n"
            f"Response:{json.dumps(response,indent=2)}'."
        )
    return response
def _execute_callbacks(self):
    """ Run queued callbacks until a ``None`` sentinel is taken from the queue.

    Sets ``_processing_callbacks`` on entry and clears it when the sentinel arrives.
    An exception raised by a callback is logged and does not stop the loop.
    """
    self._processing_callbacks.set()
    while self._processing_callbacks.is_set():
        task = self._callback_queue.get()
        if task is not None:
            try:
                task()
            except Exception as err:
                self.logger.exception(err)
            continue
        # A None entry is the stop signal.
        self.logger.debug('Stop Callback Processing')
        self._processing_callbacks.clear()
        return
def _receive_data(self):
    """ Receive loop: read length-prefixed JSON messages from the socket and dispatch them.

    Each message consists of a 2-byte big-endian length header followed by a JSON payload of
    that length. Complete payloads are decoded and passed to ``_process_msg``. The loop ends when
    ``_processing_data`` is cleared, the peer closes the connection, or an exception occurs (which
    is logged). On exit the callback queue receives a ``None`` sentinel and the socket is closed
    and reset to None.
    """
    header_size = 2
    try:
        self._processing_data.set()
        pending = bytearray()
        expecting_header = True
        wanted = header_size
        while self._processing_data.is_set():
            # Ask only for what is still missing of the current header or payload.
            chunk = self._socket.recv(wanted - len(pending))
            if not chunk:  # other side close the connection. E.g quit analyzer4D
                break
            pending += chunk
            if len(pending) != wanted:
                continue
            if expecting_header:
                wanted, = struct.unpack('>H', pending)
                expecting_header = False
            else:
                message = json.loads(pending.decode())
                self._process_msg(message)
                wanted = header_size
                expecting_header = True
            pending.clear()
    except Exception as err:
        self.logger.exception(err)
    finally:
        self._processing_data.clear()
        # None is the stop signal for the callback loop.
        self._callback_queue.put(None)
        with self._socket_lock:
            if self._socket:
                try:
                    self._socket.close()
                except OSError:
                    self.logger.warning('socket was not closed regulary')
                finally:
                    self._socket = None
                self.logger.info(f'Disconnect from {self.ip}:{self.port}')
        self.logger.debug('Stop Receiving data')
def _process_msg(self, msg: dict):
    """ Dispatch one decoded Analyzer message.

    * A message with ``resid`` completes the pending request future registered under that id.
    * A message with ``cmd`` is a report: the matching registered callbacks are put on the
      callback queue (bound to the message, or to the decoded bytes for fieldbus reports).
      For AppVar reports the "all AppVar" callbacks take precedence; the per-name callbacks
      are only used when no "all" callback is registered.
    * Anything else is logged as an error.

    Any exception raised while dispatching is logged and not propagated.

    :param dict msg: Decoded JSON message received from the Analyzer
    """
    def enqueue(callbacks, argument):
        # Callbacks are queued rather than called here.
        for cb in callbacks:
            self._callback_queue.put(functools.partial(cb, argument))

    try:
        self.logger.debug(f'Receive:\n{json.dumps(msg,indent=2)}')
        if 'resid' in msg:
            resid = msg['resid']
            with self._requests_lock:
                pending = self._requests
                if resid not in pending:
                    self.logger.error(f'unknown resid ({resid})')
                else:
                    pending[resid].set_result(msg)
            return
        if 'cmd' not in msg:
            self.logger.error('Missing field "cmd" in message')
            return
        cmd = msg['cmd']
        if cmd in ("reportappvars", "responseappvars"):
            if len(self._all_appvar_callbacks) > 0:
                enqueue(self._all_appvar_callbacks, msg)
            else:
                appvar = msg['name']
                enqueue(self._single_appvar_callbacks[appvar], msg)
        elif cmd == 'responsereportio':
            enqueue(self._io_callbacks, msg)
        elif cmd == 'responsereportprocessnumber':
            enqueue(self._processnumber_callbacks, msg)
        elif cmd == 'responsereportprofibus':
            pbrx = msg.get("pbrx", None)
            pbtx = msg.get("pbtx", None)
            # Received data wins when both fields are present.
            if pbrx is not None:
                enqueue(self._fieldbus_input_callbacks, bytes.fromhex(pbrx))
            elif pbtx is not None:
                enqueue(self._fieldbus_output_callbacks, bytes.fromhex(pbtx))
            else:
                self.logger.error(f'Recv unknown package {msg}')
        else:
            self.logger.error(f'Recv unknown package {msg}')
    except Exception as e:
        self.logger.exception(e)