Source code for qass.tools.analyzer.buffer_metadata_cache

#
# Copyright (c) 2022 QASS GmbH.
# Website: https://qass.net
# Contact: QASS GmbH <info@qass.net>
#
# This file is part of Qass tools 
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os, re, warnings
from typing import Callable
from sqlalchemy import Float, create_engine, Column, Integer, String, BigInteger, Identity, Index, TypeDecorator, select, text
from sqlalchemy.orm import declarative_base, sessionmaker, Session
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.sql.selectable import Select
from pathlib import Path
from enum import Enum
from tqdm.auto import tqdm
from multiprocessing.pool import Pool

from .buffer_parser import Buffer


__all__ = ["BufferMetadataCache", "BufferMetadata"]

class BufferEnum(TypeDecorator):
    """Column type that persists a Python enum member as its name string."""
    impl = String

    def __init__(self, enumtype, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._enumtype = enumtype

    def process_bind_param(self, value, dialect):
        if value is None:
            return None
        return value.name

    def process_result_value(self, value, dialect):
        if value is None:
            return None
        return self._enumtype[str(value)]
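
# Illustration of the round trip this TypeDecorator performs (a minimal sketch;
# "SOME_MEMBER" stands for any member of the wrapped enum and is hypothetical):
#
#     col = BufferEnum(Buffer.DATAMODE)
#     stored = col.process_bind_param(Buffer.DATAMODE.SOME_MEMBER, None)  # -> "SOME_MEMBER"
#     loaded = col.process_result_value(stored, None)  # -> Buffer.DATAMODE.SOME_MEMBER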

__Base = declarative_base()
class BufferMetadata(__Base):
    """This class acts as a template for buffer files. Its properties represent all
    available metadata of a buffer file. The class is used internally as the database
    model and can also be instantiated as a template for a buffer file: populate the
    desired properties and pass the object to the cache, which will in turn create a
    query based on this object.
    """
    __tablename__ = "buffer_metadata"
    properties = ("id", "project_id", "directory_path", "filename", "header_size", "process",
                  "channel", "datamode", "datakind", "datatype", "process_time",
                  "process_date_time", "db_header_size", "bytes_per_sample", "db_count",
                  "full_blocks", "db_size", "db_sample_count", "frq_bands", "db_spec_count",
                  "compression_frq", "compression_time", "avg_time", "avg_frq",
                  "spec_duration", "frq_start", "frq_end", "frq_per_band", "sample_count",
                  "spec_count", "adc_type", "bit_resolution", "fft_log_shift", "streamno",
                  "preamp_gain", "analyzer_version", "partnumber", "header_hash")

    id = Column(Integer, Identity(start=1), primary_key=True)
    project_id = Column(BigInteger, index=True)
    directory_path = Column(String, nullable=False, index=True)
    filename = Column(String, nullable=False)
    header_hash = Column(String(64), index=True)
    machine_id = Column(String)
    header_size = Column(Integer)
    process = Column(Integer, index=True)
    channel = Column(Integer, index=True)
    datamode = Column(BufferEnum(Buffer.DATAMODE), index=True)  # enum in buffer_parser
    datakind = Column(BufferEnum(Buffer.DATAKIND))  # enum in buffer_parser
    datatype = Column(BufferEnum(Buffer.DATATYPE))  # enum in buffer_parser
    process_time = Column(BigInteger)
    process_date_time = Column(String)
    db_header_size = Column(Integer)
    bytes_per_sample = Column(Integer)
    db_count = Column(Integer)
    full_blocks = Column(Integer)
    db_size = Column(Integer)
    db_sample_count = Column(Integer)
    frq_bands = Column(Integer)
    db_spec_count = Column(Integer)
    compression_frq = Column(Integer, index=True)
    compression_time = Column(Integer, index=True)
    avg_time = Column(Integer, index=True)
    avg_frq = Column(Integer, index=True)
    spec_duration = Column(Float)
    frq_start = Column(Integer)
    frq_end = Column(Integer)
    frq_per_band = Column(Float)
    sample_count = Column(BigInteger)
    spec_count = Column(BigInteger)
    adc_type = Column(BufferEnum(Buffer.ADCTYPE))  # enum in buffer_parser
    bit_resolution = Column(Integer)
    fft_log_shift = Column(Integer)
    streamno = Column(Integer)
    preamp_gain = Column(Integer)
    analyzer_version = Column(String)
    partnumber = Column(String)
    opening_error = Column(String, nullable=True)

    @hybrid_property
    def filepath(self):
        return self.directory_path + self.filename
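
    # Usage as a query template (sketch; the property values and the `cache`
    # instance, a BufferMetadataCache as defined below, are hypothetical):
    #
    #     template = BufferMetadata(process=42, channel=1)
    #     query = cache.get_buffer_metadata_query(template)  # matches process 42, channel 1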
    @staticmethod
    def buffer_to_metadata(buffer):
        """Convert a Buffer object to a BufferMetadata database object by copying
        all the @property values from the Buffer object into the BufferMetadata
        object.

        :param buffer: Buffer object
        :type buffer: buffer_parser.Buffer
        """
        directory_path, filename = BufferMetadataCache.split_filepath(buffer.filepath)
        buffer_metadata = BufferMetadata(filename=filename, directory_path=directory_path)
        for prop in BufferMetadata.properties:
            try:  # try to map all the buffer properties and skip on error
                setattr(buffer_metadata, prop, getattr(buffer, prop))  # read the @property and copy it
            except Exception:
                continue
        return buffer_metadata
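
    # Conversion sketch (the buffer file path below is hypothetical):
    #
    #     with Buffer("/data/buffers/procdata_p42c1b00") as buf:
    #         meta = BufferMetadata.buffer_to_metadata(buf)
    #     meta.filepath  # == the original path, reassembled from directory_path + filename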
Index("project_id_process_index", BufferMetadata.project_id, BufferMetadata.process) Index("project_id_process_channel_index", BufferMetadata.project_id, BufferMetadata.process, BufferMetadata.channel) Index("compression_time_frq_index", BufferMetadata.compression_time, BufferMetadata.compression_frq) Index("project_id_compression_time_frq_index", BufferMetadata.project_id, BufferMetadata.compression_time, BufferMetadata.compression_frq) def _create_metadata(args): Buffer_cls, file = args try: with Buffer_cls(file) as buffer: buffer_metadata = BufferMetadata.buffer_to_metadata(buffer) return buffer_metadata except Exception as e: directory_path, filename = BufferMetadataCache.split_filepath(file) warnings.warn(f"One or more Buffers couldn't be opened {file}", UserWarning) return BufferMetadata(directory_path = directory_path, filename = filename, opening_error = str(e))
class BufferMetadataCache:
    """This class acts as a cache for buffer metadata. It uses a database session
    with a buffer_metadata table to map metadata to files on disk. Querying the
    cache is much faster than opening and parsing every buffer file manually.
    """
    BufferMetadata = BufferMetadata

    def __init__(self, session=None, Buffer_cls=Buffer, db_url="sqlite:///:memory:"):
        if session is not None:
            warnings.warn("The use of the session parameter is deprecated since version 2.3 "
                          "and will be removed in two minor versions. Use the db_url keyword instead",
                          DeprecationWarning, stacklevel=2)
            self.engine = session.get_bind()
        else:
            self.engine = create_engine(db_url)
        BufferMetadata.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)
        self.Buffer_cls = Buffer_cls
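
    # Construction sketch: a file-backed SQLite database (path hypothetical) keeps
    # the cache persistent between runs; the default db_url is in-memory only.
    #
    #     cache = BufferMetadataCache(db_url="sqlite:///buffer_cache.db")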
    def synchronize_directory(self, *paths, sync_subdirectories=True,
                              regex_pattern="^.*[p][0-9]*[c][0-9]{1}[b][0-9]{2}",
                              verbose=1, delete_stale_entries=False, machine_id=None):
        """Synchronize the buffer files in the given paths that match the regex
        pattern with the database.

        :param paths: The absolute paths to the directories
        :type paths: str
        :param sync_subdirectories: When True, synchronize all subdirectories recursively, defaults to True
        :type sync_subdirectories: bool, optional
        :param regex_pattern: The regex pattern validating the buffer naming format
        :type regex_pattern: str, optional
        :param verbose: verbosity level. 0 = no feedback, 1 = progress bar
        :type verbose: int, optional
        :param delete_stale_entries: When True, remove cache entries whose files no longer exist on disk, defaults to False
        :type delete_stale_entries: bool, optional
        :param machine_id: An optional identifier for a certain machine to enable synchronization of different platforms
        :type machine_id: str, optional
        """
        pattern = re.compile(regex_pattern)
        for path in paths:
            glob = Path(path).rglob if sync_subdirectories else Path(path).glob
            files = (str(file) for file in glob("*p*c?b*")
                     if os.path.isfile(file) and pattern.match(str(file)))
            unsynchronized_files, synchronized_missing_buffers = self.get_non_synchronized_files(files, machine_id)
            if delete_stale_entries:
                self.remove_files_from_cache(synchronized_missing_buffers, verbose=verbose)
            self.add_files_to_cache(unsynchronized_files, verbose=verbose, machine_id=machine_id)
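
    # Synchronization sketch (directory and machine_id hypothetical): scan a tree,
    # add new buffer files and drop entries whose files were deleted.
    #
    #     cache.synchronize_directory("/data/buffers/", sync_subdirectories=True,
    #                                 delete_stale_entries=True, machine_id="machine_a")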
    def synchronize_database(self, *sync_connections):
        # TODO
        pass
    def get_non_synchronized_files(self, files, machine_id):
        """Calculate the difference between the set of files on disk and the set of
        synchronized files in the cache.

        :param files: complete filepaths
        :type files: iterable of str
        :param machine_id: identifier of the machine whose entries are compared
        :type machine_id: str or None
        :return: The set of files that are not yet synchronized, and the set of
            database entries whose file is no longer present on disk
        """
        file_set = set(files)
        with self.Session() as session:
            synchronized_buffers = set(buffer.filepath for buffer in
                                       session.query(self.BufferMetadata)
                                              .filter(BufferMetadata.machine_id == machine_id).all())
        unsynchronized_files = file_set.difference(synchronized_buffers)
        synchronized_missing_buffers = synchronized_buffers.difference(file_set)
        return unsynchronized_files, synchronized_missing_buffers
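
    # Set-difference sketch (path hypothetical): the first returned set is
    # "on disk but not cached", the second is "cached but missing on disk".
    #
    #     new_files, stale = cache.get_non_synchronized_files(
    #         ["/data/buffers/procdata_p1c1b00"], machine_id=None)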
    def add_files_to_cache(self, files, verbose=0, batch_size=1000, machine_id=None):
        """Add buffer files to the cache by providing the complete filepaths.

        :param files: complete filepaths that are added to the cache. Each filepath is
            opened with the Buffer class to extract the header information.
        :type files: list, tuple of str
        :param verbose: verbosity level. 0 = no feedback, 1 = progress bar
        :type verbose: int, optional
        :param batch_size: number of files per database commit, defaults to 1000
        :type batch_size: int, optional
        :param machine_id: optional machine identifier stored with every entry
        :type machine_id: str, optional
        """
        with self.Session() as session:
            with Pool() as pool:
                params = [(self.Buffer_cls, f) for f in files]
                f_gen = enumerate(pool.imap_unordered(_create_metadata, params))
                if verbose > 0 and len(params) > 0:
                    f_gen = tqdm(f_gen, desc="Adding Buffers", total=len(params))
                for i, metadata in f_gen:
                    metadata.machine_id = machine_id
                    session.add(metadata)
                    if i % batch_size == 0:
                        session.commit()
                session.commit()
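
    # Manual registration sketch (paths hypothetical); a commit is issued every
    # batch_size files so large imports stay bounded in memory.
    #
    #     cache.add_files_to_cache(["/data/buffers/procdata_p1c1b00",
    #                               "/data/buffers/procdata_p2c1b00"], verbose=1)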
    def remove_files_from_cache(self, files, verbose=0):
        """Remove synchronized files from the cache.

        :param files: complete filepaths that are present in the cache
        :type files: list, tuple or set of str
        :param verbose: verbosity level. 0 = no feedback, 1 = progress bar
        :type verbose: int, optional
        """
        with self.Session() as session:
            if verbose > 0 and len(files) > 0:
                files = tqdm(files, desc="Removing File Entries")
            for file in files:
                try:
                    entry = session.query(BufferMetadata).filter_by(filepath=file).one_or_none()
                    if entry is None:
                        continue
                    session.delete(entry)
                except Exception:
                    session.rollback()
                    raise
            session.commit()
    def _deprecated_get_matching_metadata(self, buffer_metadata: BufferMetadata = None,
                                          filter_function: Callable = None,
                                          sort_key: Callable = None):
        if buffer_metadata is not None:
            q = self.get_buffer_metadata_query(buffer_metadata)
        elif filter_function is not None:
            q = select(BufferMetadata).from_statement(text("SELECT * FROM buffer_metadata"))
        else:
            raise ValueError("You need to provide either a BufferMetadata object or a filter function, or both")
        with self.Session() as session:
            metadata = list(session.execute(q).scalars())
        if filter_function is not None:
            metadata = [m for m in metadata if filter_function(m)]
        if sort_key is not None:
            metadata.sort(key=sort_key)
        return metadata

    def _get_matching_metadata(self, query: Select = None):
        with self.Session() as session:
            return session.scalars(query).all()
    def get_matching_metadata(self, buffer_metadata: BufferMetadata = None,
                              filter_function: Callable = None, sort_key: Callable = None,
                              query: Select = None):
        """Query the cache for all BufferMetadata database entries matching the given query.

        :param query: A sqlalchemy select statement specifying the properties of the
            BufferMetadata objects
        :type query: Select
        :return: A list of the BufferMetadata entries that match the query
        :rtype: list[BufferMetadata]
        """
        if query is not None:
            return self._get_matching_metadata(query)
        warnings.warn("The usage of the parameters buffer_metadata, filter_function, sort_key "
                      "is deprecated since version 2.3 and will be removed in two minor versions. "
                      "Use the query parameter instead.", DeprecationWarning, stacklevel=2)
        return self._deprecated_get_matching_metadata(buffer_metadata, filter_function, sort_key)
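
    # Query sketch using the non-deprecated query parameter (filter value hypothetical):
    #
    #     stmt = select(BufferMetadata).filter(BufferMetadata.process > 100)
    #     entries = cache.get_matching_metadata(query=stmt)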
    def get_matching_files(self, buffer_metadata: BufferMetadata = None,
                           filter_function: Callable = None, sort_key: Callable = None,
                           query: Select = None):
        """Query the cache for all files matching the properties selected by the query
        object. The usage of buffer_metadata, filter_function and sort_key is deprecated
        and will be removed in two minor versions; use the sqlalchemy query parameter
        instead.

        .. code-block:: python
            :linenos:

            cache.get_matching_files(query=select(BM).filter(
                BM.channel == 1, BM.compression_frq == 4, BM.process > 100
            ).order_by(BM.process))
            # Returns all buffer filepaths with channel = 1, a frequency compression
            # of 4 and processes above 100, sorted by the process number
            # (BM is shorthand for BufferMetadata)

        .. code-block:: python
            :linenos:

            ### DEPRECATED ###
            cache.get_matching_files(
                buffer_metadata=BufferMetadata(channel=1, compression_frq=4),
                filter_function=lambda bm: bm.process > 100,
                sort_key=lambda bm: bm.process)
            # Returns all buffer filepaths with channel = 1, a frequency compression
            # of 4 and processes above 100, sorted by the process number

        :param buffer_metadata: A metadata object acting as the filter. Only buffers
            matching the attributes of the provided BufferMetadata object are selected.
            This operation is done on the database
        :type buffer_metadata: BufferMetadata
        :param filter_function: A function taking a BufferMetadata object as a parameter
            and returning a boolean, i.e. a conjunction of BufferMetadata attributes
        :type filter_function: function
        :param sort_key: A function taking a BufferMetadata object as a parameter and
            returning an attribute the objects can be sorted by
        :type sort_key: function
        :param query: A sqlalchemy select statement specifying the properties of the
            BufferMetadata objects
        :type query: Select
        :return: A list with the paths to the buffer files that match the query
        :rtype: list[str]
        """
        if any(p is not None for p in (buffer_metadata, filter_function, sort_key)):
            warnings.warn("The usage of the parameters buffer_metadata, filter_function, sort_key "
                          "is deprecated since version 2.3 and will be removed in two minor versions. "
                          "Use the query parameter instead.", DeprecationWarning, stacklevel=2)
        matching_metadata = self.get_matching_metadata(buffer_metadata, filter_function, sort_key, query)
        return [m.filepath for m in matching_metadata]
    def get_matching_buffers(self, buffer_metadata: BufferMetadata = None,
                             filter_function: Callable = None, sort_key: Callable = None,
                             query: Select = None):
        """Calls get_matching_files and converts the result to Buffer objects.

        :return: List of Buffer objects
        :rtype: list
        """
        if any(p is not None for p in (buffer_metadata, filter_function, sort_key)):
            warnings.warn("The usage of the parameters buffer_metadata, filter_function, sort_key "
                          "is deprecated since version 2.3 and will be removed in two minor versions. "
                          "Use the query parameter instead.", DeprecationWarning, stacklevel=2)
        files = self.get_matching_files(buffer_metadata, filter_function, sort_key, query)
        buffers = []
        for file in files:
            try:
                with self.Buffer_cls(file) as buffer:
                    buffers.append(buffer)
            except Exception:
                warnings.warn(f"An error occurred while parsing a file header, this file will be skipped: {file}")
        return buffers
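
    # Sketch: open all buffers of one channel (filter value hypothetical):
    #
    #     stmt = select(BufferMetadata).filter(BufferMetadata.channel == 1)
    #     for buf in cache.get_matching_buffers(query=stmt):
    #         print(buf.filepath)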
    def get_buffer_metadata_query(self, buffer_metadata):
        """Convert a :py:class:`BufferMetadata` object into a complete query. Every
        populated property of the object is converted into SQL and returned as a
        :py:class:`sqlalchemy.orm.query.FromStatement` object.

        :param buffer_metadata: The template BufferMetadata object
        :type buffer_metadata: BufferMetadata
        :return: The sqlalchemy query object
        :rtype: sqlalchemy.orm.query.FromStatement
        """
        q = "SELECT * FROM buffer_metadata WHERE opening_error IS NULL AND "
        for prop in self.BufferMetadata.properties:
            prop_value = getattr(buffer_metadata, prop)
            if prop_value is not None:
                if isinstance(prop_value, str):
                    prop_value = f"'{prop_value}'"
                elif isinstance(prop_value, Enum):
                    prop_value = f"'{prop_value.name}'"
                q += f"{prop} = {prop_value} AND "
        q = q[:-4]  # prune the trailing "AND "
        return select(BufferMetadata).from_statement(text(q))
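
    # Template-to-SQL sketch (property values hypothetical): every populated
    # property becomes one equality predicate.
    #
    #     cache.get_buffer_metadata_query(BufferMetadata(process=42, channel=1))
    #     # roughly: SELECT * FROM buffer_metadata
    #     #          WHERE opening_error IS NULL AND process = 42 AND channel = 1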
    @staticmethod
    def create_session(engine=None, db_url="sqlite:///:memory:"):
        """Create a session and initialize the schema for the BufferMetadataCache.
        If an engine is provided, its schema is extended by the buffer_metadata table.

        :param engine: An instance of a sqlalchemy engine, typically created with
            sqlalchemy.create_engine()
        :type engine: sqlalchemy.engine.Engine, optional
        :param db_url: The string used to create the engine. This can be a psycopg2,
            mysql or sqlite3 connection string. The default creates the database in
            main memory.
        :type db_url: str
        :return: A sqlalchemy session instance
        :rtype: sqlalchemy.orm.Session
        """
        if engine is None:
            engine = create_engine(db_url)
        session = Session(engine)
        BufferMetadata.metadata.create_all(engine)
        return session
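
    # Session setup sketch: reusing an existing engine only adds the
    # buffer_metadata table to its schema.
    #
    #     engine = create_engine("sqlite:///:memory:")
    #     session = BufferMetadataCache.create_session(engine=engine)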
    @staticmethod
    def split_filepath(filepath):
        """Split a filepath into directory path and filename and return them as a tuple.

        :param filepath: the complete path to a file, using either "/" or "\\" as separator
        :type filepath: str
        :return: A tuple containing (directory_path, filename) as strings
        :rtype: tuple(str)
        """
        if "/" in filepath:
            filename = filepath.split("/")[-1]
        elif "\\" in filepath:
            filename = filepath.split("\\")[-1]
        else:  # no separator present: the whole path is the filename
            return "", filepath
        directory_path = filepath[:-len(filename)]
        return directory_path, filename
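
    # Splitting sketch (path hypothetical):
    #
    #     BufferMetadataCache.split_filepath("/data/buffers/procdata_p1c1b00")
    #     # -> ("/data/buffers/", "procdata_p1c1b00")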
def get_declarative_base():
    """Getter for the declarative base class that is used by the
    :py:class:`BufferMetadataCache`.

    :return: declarative base class
    """
    return __Base