Source code for qass.tools.analyzer.buffer_metadata_cache

#
# Copyright (c) 2022 QASS GmbH.
# Website: https://qass.net
# Contact: QASS GmbH <info@qass.net>
#
# This file is part of Qass tools
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import List, Sequence, Tuple, Union
import re
import warnings
from sqlalchemy import (
    Float,
    create_engine,
    Column,
    Integer,
    String,
    BigInteger,
    Identity,
    Index,
    TypeDecorator,
    select,
    text,
)
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.sql.selectable import Select
from pathlib import Path
from enum import Enum
from tqdm.auto import tqdm
from multiprocessing.pool import Pool

from .buffer_parser import Buffer, InvalidFileError


__all__ = ["BufferMetadataCache", "BufferMetadata"]


class BufferEnum(TypeDecorator):
    """Stores Python Enum members by their name in a plain String column and
    restores the Enum member when loading."""

    impl = String

    def __init__(self, enumtype, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._enumtype = enumtype

    def process_bind_param(self, value, dialect):
        if value is None:
            return None
        return value.name

    def process_result_value(self, value, dialect):
        if value is None:
            return None
        return self._enumtype[str(value)]


__Base = declarative_base()


class BufferMetadata(__Base):
    """This class acts as a template for buffer files. Its properties
    represent all available metadata of a buffer file.

    This class is used internally as a database model and can be instantiated
    to provide a template for a buffer file by populating the desired
    properties and passing the object to the cache, which will in turn create
    a query based on this object.
    """

    __tablename__ = "buffer_metadata"

    properties = (
        "id",
        "project_id",
        "directory_path",
        "filename",
        "header_size",
        "process",
        "channel",
        "datamode",
        "datakind",
        "datatype",
        "process_time",
        "process_date_time",
        "db_header_size",
        "bytes_per_sample",
        "db_count",
        "full_blocks",
        "db_size",
        "db_sample_count",
        "frq_bands",
        "db_spec_count",
        "compression_frq",
        "compression_time",
        "avg_time",
        "avg_frq",
        "spec_duration",
        "frq_start",
        "frq_end",
        "frq_per_band",
        "sample_count",
        "spec_count",
        "adc_type",
        "bit_resolution",
        "fft_log_shift",
        "streamno",
        "preamp_gain",
        "analyzer_version",
        "partnumber",
        "header_hash",
    )

    id = Column(Integer, Identity(start=1), primary_key=True)
    project_id = Column(BigInteger, index=True)
    directory_path = Column(String, nullable=False, index=True)
    filename = Column(String, nullable=False)
    header_hash = Column(String(64), index=True)
    machine_id = Column(String, nullable=True)
    header_size = Column(Integer)
    process = Column(Integer, index=True)
    channel = Column(Integer, index=True)
    datamode = Column(BufferEnum(Buffer.DATAMODE), index=True)  # this is an ENUM in buffer_parser
    datakind = Column(BufferEnum(Buffer.DATAKIND))  # this is an ENUM in buffer_parser
    datatype = Column(BufferEnum(Buffer.DATATYPE))  # this is an ENUM in buffer_parser
    process_time = Column(BigInteger)
    process_date_time = Column(String)
    db_header_size = Column(Integer)
    bytes_per_sample = Column(Integer)
    db_count = Column(Integer)
    full_blocks = Column(Integer)
    db_size = Column(Integer)
    db_sample_count = Column(Integer)
    frq_bands = Column(Integer)
    db_spec_count = Column(Integer)
    compression_frq = Column(Integer, index=True)
    compression_time = Column(Integer, index=True)
    avg_time = Column(Integer, index=True)
    avg_frq = Column(Integer, index=True)
    spec_duration = Column(Float)
    frq_start = Column(Integer)
    frq_end = Column(Integer)
    frq_per_band = Column(Float)
    sample_count = Column(BigInteger)
    spec_count = Column(BigInteger)
    adc_type = Column(BufferEnum(Buffer.ADCTYPE))  # TODO this is an ENUM in buffer_parser
    bit_resolution = Column(Integer)
    fft_log_shift = Column(Integer)
    streamno = Column(Integer)
    preamp_gain = Column(Integer)
    analyzer_version = Column(String)
    partnumber = Column(String)
    opening_error = Column(String, nullable=True)

    @hybrid_property
    def filepath(self):
        return str(Path(str(self.directory_path)) / self.filename)
    @staticmethod
    def buffer_to_metadata(buffer):
        """Converts a Buffer object to a BufferMetadata database object by
        copying all the @property values from the Buffer object and storing
        them in the BufferMetadata object.

        :param buffer: Buffer object
        :type buffer: buffer_parser.Buffer
        """
        file = Path(buffer.filepath)
        directory_path = str(file.parent)
        buffer_metadata = BufferMetadata(
            filename=file.name, directory_path=directory_path
        )
        for prop in BufferMetadata.properties:
            try:
                # get the @property method of the Buffer, execute it and map
                # the result onto the metadata object; skip on error
                setattr(buffer_metadata, prop, getattr(buffer, prop))
            except Exception:
                continue
        return buffer_metadata
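
# Illustrative sketch (not part of the module): converting an opened buffer
# file into a BufferMetadata row. The filepath is a hypothetical example
# value.
#
#     with Buffer("/data/proc_p123c1b00") as buf:
#         meta = BufferMetadata.buffer_to_metadata(buf)
#         print(meta.process, meta.channel, meta.filepath)
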
Index("project_id_process_index", BufferMetadata.project_id, BufferMetadata.process) Index( "project_id_process_channel_index", BufferMetadata.project_id, BufferMetadata.process, BufferMetadata.channel, ) Index( "compression_time_frq_index", BufferMetadata.compression_time, BufferMetadata.compression_frq, ) Index( "project_id_compression_time_frq_index", BufferMetadata.project_id, BufferMetadata.compression_time, BufferMetadata.compression_frq, ) def _create_metadata(args): """Create a metadata object from a Buffer object by trying to open the buffer file contained in the args tuple""" Buffer_cls, file = args try: with Buffer_cls(file.resolve()) as buffer: buffer_metadata = BufferMetadata.buffer_to_metadata(buffer) return buffer_metadata except InvalidFileError: pass except Exception as e: directory_path, filename = str(file.parent), file.name warnings.warn(f"One or more Buffers couldn't be opened {file}", UserWarning) return BufferMetadata( directory_path=directory_path, filename=filename, opening_error=str(e) )
class BufferMetadataCache:
    """This class acts as a cache for buffer metadata. It uses a database
    session with a buffer_metadata table to map metadata to files on the
    disk. Querying the cache is much faster than manually opening a large
    number of buffer files.
    """

    BufferMetadata = BufferMetadata

    def __init__(self, db_url="sqlite:///:memory:", Buffer_cls=Buffer):
        self.engine = create_engine(db_url)
        BufferMetadata.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)
        self.Buffer_cls = Buffer_cls
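
    # Usage sketch (illustrative, not part of the class): by default the
    # cache lives in an in-memory SQLite database; passing a file-backed URL
    # makes it persistent between runs. The database path below is a
    # hypothetical example value.
    #
    #     cache = BufferMetadataCache("sqlite:////var/cache/buffer_metadata.db")
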
    def synchronize_directory(
        self,
        *paths,
        sync_subdirectories=True,
        regex_pattern="^.*[p][0-9]*[c][0-9]{1}[b][0-9]{2}",
        verbose=1,
        delete_stale_entries=False,
        machine_id=None,
        glob_pattern="*p*c?b*",
    ):
        """Synchronize the buffer files in the given paths with the database,
        matching the regex pattern.

        :param paths: The absolute paths to the directories
        :type paths: str
        :param sync_subdirectories: When True, synchronize all of the
            subdirectories recursively, defaults to True
        :type sync_subdirectories: bool, optional
        :param regex_pattern: The regex pattern validating the buffer naming
            format (matched on file.name)
        :type regex_pattern: str, optional
        :param verbose: verbosity level. 0 = no feedback, 1 = progress bar
        :type verbose: int, optional
        :param delete_stale_entries: When True, remove cache entries whose
            files no longer exist on disk, defaults to False
        :type delete_stale_entries: bool, optional
        :param machine_id: An optional identifier for a certain machine to
            enable synchronization of different platforms
        :type machine_id: str, optional
        :param glob_pattern: The pattern forwarded to Path.glob. This pattern
            acts as a preselection for the files checked against the regex
            pattern
        :type glob_pattern: str, optional
        """
        pattern = re.compile(regex_pattern)
        for path in paths:
            if sync_subdirectories:
                files = [
                    file
                    for file in Path(path).resolve().rglob(glob_pattern)
                    if file.is_file() and pattern.match(file.name)
                ]
            else:
                files = [
                    file
                    for file in Path(path).resolve().glob(glob_pattern)
                    if file.is_file() and pattern.match(file.name)
                ]
            unsynchronized_files, synchronized_missing_buffers = (
                self.get_non_synchronized_files(files, machine_id)
            )
            if delete_stale_entries:
                self.remove_files_from_cache(
                    synchronized_missing_buffers, verbose=verbose
                )
            self.add_files_to_cache(
                unsynchronized_files,
                verbose=verbose,
                machine_id=machine_id,
                check_synced=False,
            )
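
    # Usage sketch (illustrative): recursively synchronizing a measurement
    # directory and pruning entries whose files were deleted. The directory
    # path is a hypothetical example value.
    #
    #     cache.synchronize_directory(
    #         "/data/measurements",
    #         sync_subdirectories=True,
    #         delete_stale_entries=True,
    #     )
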
    def get_matching_metadata(self, query: Select) -> List[BufferMetadata]:
        """Query the cache for all BufferMetadata database entries matching
        the query.

        :param query: A sqlalchemy select statement specifying the properties
            of the BufferMetadata objects
        :type query: Select
        :return: A list with the matching BufferMetadata objects
        :rtype: list[BufferMetadata]
        """
        with self.Session() as session:
            matching_metadata = session.scalars(query).all()
        return matching_metadata
    def get_matching_files(self, query: Select) -> List[str]:
        """Query the cache for all files matching the properties selected by
        the query object.

        The usage of the buffer_metadata, filter_functions and sort_key
        parameters is deprecated and will be removed in two minor versions.
        Use the sqlalchemy query parameter instead.

        .. code-block:: python
            :linenos:

            BM = BufferMetadataCache.BufferMetadata
            cache.get_matching_files(
                select(BM).filter(BM.channel == 1, BM.compression_frq == 4, BM.process > 100)
            )
            # Returns all buffer filepaths with channel = 1, a frequency
            # compression of 4 and process numbers above 100

        :param query: A sqlalchemy select statement specifying the properties
            of the BufferMetadata objects
        :type query: Select
        :return: A list with the paths to the buffer files that match the
            query
        :rtype: list[str]
        """
        matching_metadata = self.get_matching_metadata(query)
        return [str(m.filepath) for m in matching_metadata]
    def get_matching_buffers(self, query: Select) -> List[Buffer]:
        """Calls get_matching_files and converts the result to Buffer
        objects.

        :return: List of Buffer objects
        :rtype: list
        """
        files = self.get_matching_files(query)
        buffers = []
        for file in files:
            try:
                with self.Buffer_cls(file) as buffer:
                    buffers.append(buffer)
            except Exception:
                pass
        return buffers
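
    # Usage sketch (illustrative): querying with a sqlalchemy select
    # statement; "cache" stands for a BufferMetadataCache instance.
    #
    #     from sqlalchemy import select
    #     BM = BufferMetadataCache.BufferMetadata
    #     stmt = select(BM).filter(BM.channel == 1, BM.process > 100)
    #     files = cache.get_matching_files(stmt)
    #     buffers = cache.get_matching_buffers(stmt)
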
    def get_non_synchronized_files(
        self,
        files: Sequence[Path],
        machine_id: Union[str, None] = None,
    ) -> Tuple[List[Path], List[Path]]:
        """Calculate the difference between the given set of files and the
        set of synchronized files.

        :param files: filenames
        :type files: Sequence[Path]
        :param machine_id: machine identifier
        :type machine_id: Union[str, None]
        :return: The files that are not synchronized yet, and the database
            entries whose files are no longer present on disk
        """
        file_set = set(files)
        with self.Session() as session:
            synchronized_buffers = set(
                Path(str(buffer.filepath))
                for buffer in session.query(self.BufferMetadata)
                .filter(BufferMetadata.machine_id == machine_id)
                .all()
            )
        unsynchronized_files = [
            Path(p) for p in file_set.difference(synchronized_buffers)
        ]
        synchronized_missing_buffers = [
            Path(p) for p in synchronized_buffers.difference(file_set)
        ]
        return unsynchronized_files, synchronized_missing_buffers
    def add_files_to_cache(
        self,
        files: Sequence[Path],
        verbose: int = 0,
        batch_size: int = 1000,
        machine_id: Union[str, None] = None,
        check_synced: bool = True,
    ):
        """Add buffer files to the cache by providing the complete filepaths.
        If a file (determined by filename and directory) is already
        synchronized, it is skipped.

        :param files: complete filepaths that are added to the cache. Each
            filepath is used with the Buffer class to open a buffer and
            extract the header information.
        :type files: Sequence[Path]
        :param verbose: verbosity level. 0 = no feedback, 1 = progress bar
        :type verbose: int, optional
        :param batch_size: The number of files after which a batch is
            committed to the database
        :type batch_size: int, optional
        :param machine_id: A unique identifier for a different machine
        :type machine_id: str, optional
        :param check_synced: Whether to check if the files that are about to
            be added are already synced to the cache
        :type check_synced: bool, optional
        """
        if check_synced:
            files, _ = self.get_non_synchronized_files(files, machine_id)
        with self.Session() as session:
            with Pool() as pool:
                # parse the buffer headers in parallel worker processes
                params = [(self.Buffer_cls, f) for f in files]
                f_gen = pool.imap_unordered(_create_metadata, params)
                f_gen = (
                    tqdm(f_gen, desc="Adding Buffers", total=len(params))
                    if verbose > 0 and len(params) > 0
                    else f_gen
                )
                # _create_metadata returns None for invalid files; filter
                # those out before inserting
                for i, metadata in enumerate(filter(lambda m: m is not None, f_gen)):
                    metadata.machine_id = machine_id
                    session.add(metadata)
                    if i % batch_size == 0:
                        session.commit()
            session.commit()
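
    # Usage sketch (illustrative): adding an explicit list of buffer files,
    # e.g. when the file list does not come from a directory scan. The paths
    # are hypothetical example values.
    #
    #     from pathlib import Path
    #     new_files = [Path("/data/m1p7c0b00"), Path("/data/m1p7c1b00")]
    #     cache.add_files_to_cache(new_files, verbose=1)
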
    def remove_files_from_cache(self, files: List[Path], verbose=0):
        """Remove synchronized files from the cache.

        :param files: complete filepaths that are present in the cache
        :type files: List[Path]
        :param verbose: verbosity level. 0 = no feedback, 1 = progress bar
        :type verbose: int, optional
        """
        with self.Session() as session:
            file_iter = (
                tqdm(files, desc="Removing File Entries")
                if verbose > 0 and len(files) > 0
                else files
            )
            for file in file_iter:
                try:
                    # open the buffer only to obtain its header hash
                    with self.Buffer_cls(file) as b:
                        pass
                    entry = (
                        session.query(self.BufferMetadata)
                        .filter(self.BufferMetadata.header_hash == b.header_hash)
                        .one_or_none()  # .one() would raise when no entry exists
                    )
                    if entry is None:
                        continue
                    session.delete(entry)
                except Exception as e:
                    session.rollback()
                    raise e
            session.commit()
    def get_buffer_metadata_query(self, buffer_metadata):
        """Converts a :py:class:`BufferMetadata` object to a complete query.
        Every populated property of the object is converted into SQL and
        returned as a :py:class:`sqlalchemy.orm.query.FromStatement` object.

        :param buffer_metadata: The template BufferMetadata object.
        :type buffer_metadata: BufferMetadata
        :return: The sqlalchemy query object
        :rtype: sqlalchemy.orm.query.FromStatement
        """
        q = "SELECT * FROM buffer_metadata WHERE opening_error IS NULL AND "
        for prop in self.BufferMetadata.properties:
            prop_value = getattr(buffer_metadata, prop)
            if prop_value is not None:
                if isinstance(prop_value, str):
                    prop_value = f"'{prop_value}'"
                elif isinstance(prop_value, Enum):
                    prop_value = f"'{prop_value.name}'"
                q += f"{prop} = {prop_value} AND "
        q = q[:-4]  # prune the trailing AND
        return select(BufferMetadata).from_statement(text(q))
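
    # Usage sketch (illustrative): using a BufferMetadata object as a query
    # template instead of writing a select statement by hand.
    #
    #     template = BufferMetadata(channel=1, compression_frq=4)
    #     query = cache.get_buffer_metadata_query(template)
    #     files = cache.get_matching_files(query)
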
def get_declarative_base():
    """Getter for the declarative base that is used by the
    :py:class:`BufferMetadataCache`.

    :return: declarative base class
    """
    return __Base
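
# Illustrative sketch (not part of the module): the shared declarative base
# allows defining additional tables in the same cache database. The table and
# column names below are hypothetical.
#
#     Base = get_declarative_base()
#
#     class ProcessResult(Base):
#         __tablename__ = "process_result"
#         id = Column(Integer, primary_key=True)
#         process = Column(Integer, index=True)
#         score = Column(Float)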