# Source code for qass.tools.analyzer.buffer_metadata_cache
#
# Copyright (c) 2022 QASS GmbH.
# Website: https://qass.net
# Contact: QASS GmbH <info@qass.net>
#
# This file is part of Qass tools
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import List, Sequence, Tuple, Union
import re
import warnings
from sqlalchemy import (
Float,
create_engine,
Column,
Integer,
String,
BigInteger,
Identity,
Index,
Enum,
TypeDecorator,
select,
text,
)
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.sql.selectable import Select
from pathlib import Path
from enum import Enum
from tqdm.auto import tqdm
from multiprocessing.pool import Pool as Pool
from .buffer_parser import Buffer, InvalidFileError
__all__ = ["BufferMetadataCache", "BufferMetadata"]
class BufferEnum(TypeDecorator):
    """Column type that persists a Python :class:`enum.Enum` by member *name*.

    Values are stored as strings (the member's ``name``) and converted back
    to members of ``enumtype`` when rows are read.
    """

    impl = String
    # The bind/result processing depends only on the enum type, which is
    # fixed per column, so the produced SQL is safe to cache.  Without this
    # flag SQLAlchemy 1.4+ warns and disables statement caching for every
    # column using this type.
    cache_ok = True

    def __init__(self, enumtype, *args, **kwargs):
        """:param enumtype: the :class:`enum.Enum` subclass handled by this column"""
        super().__init__(*args, **kwargs)
        self._enumtype = enumtype

    def process_bind_param(self, value, dialect):
        # Persist NULL for None, otherwise the enum member's name.
        if value is None:
            return None
        return value.name

    def process_result_value(self, value, dialect):
        # Map the stored name back to an enum member; NULL stays None.
        if value is None:
            return None
        return self._enumtype[str(value)]
# Module-private declarative base shared by all ORM models defined here;
# exposed read-only to callers through get_declarative_base().
__Base = declarative_base()
class BufferMetadata(__Base):
    """This class acts as a template for buffer files. Its properties represent all available metadata of a buffer file.
    This class is used internally as a database model and can be instantiated to provide a template for a buffer file by
    populating desired properties and passing the object to the cache which will in turn create a query based on this object.
    """

    __tablename__ = "buffer_metadata"

    # Every Buffer attribute that is mirrored into a column.  Used by
    # buffer_to_metadata() to copy values from an opened Buffer, and by
    # BufferMetadataCache.get_buffer_metadata_query() to turn non-None
    # attributes into WHERE clauses.  Note: machine_id and opening_error
    # are columns but deliberately not listed here.
    properties = (
        "id",
        "project_id",
        "directory_path",
        "filename",
        "header_size",
        "process",
        "channel",
        "datamode",
        "datakind",
        "datatype",
        "process_time",
        "process_date_time",
        "db_header_size",
        "bytes_per_sample",
        "db_count",
        "full_blocks",
        "db_size",
        "db_sample_count",
        "frq_bands",
        "db_spec_count",
        "compression_frq",
        "compression_time",
        "avg_time",
        "avg_frq",
        "spec_duration",
        "frq_start",
        "frq_end",
        "frq_per_band",
        "sample_count",
        "spec_count",
        "adc_type",
        "bit_resolution",
        "fft_log_shift",
        "streamno",
        "preamp_gain",
        "analyzer_version",
        "partnumber",
        "header_hash",
    )

    id = Column(Integer, Identity(start=1), primary_key=True)
    project_id = Column(BigInteger, index=True)
    directory_path = Column(String, nullable=False, index=True)
    filename = Column(String, nullable=False)
    header_hash = Column(String(64), index=True)
    machine_id = Column(String, nullable=True)
    header_size = Column(Integer)
    process = Column(Integer, index=True)
    channel = Column(Integer, index=True)
    datamode = Column(
        BufferEnum(Buffer.DATAMODE), index=True
    )  # this is an ENUM in buffer_parser
    datakind = Column(BufferEnum(Buffer.DATAKIND))  # this is an ENUM in buffer_parser
    datatype = Column(BufferEnum(Buffer.DATATYPE))  # this is an ENUM in buffer_parser
    process_time = Column(BigInteger)
    process_date_time = Column(String)
    db_header_size = Column(Integer)
    bytes_per_sample = Column(Integer)
    db_count = Column(Integer)
    full_blocks = Column(Integer)
    db_size = Column(Integer)
    db_sample_count = Column(Integer)
    frq_bands = Column(Integer)
    db_spec_count = Column(Integer)
    compression_frq = Column(Integer, index=True)
    compression_time = Column(Integer, index=True)
    avg_time = Column(Integer, index=True)
    avg_frq = Column(Integer, index=True)
    spec_duration = Column(Float)
    frq_start = Column(Integer)
    frq_end = Column(Integer)
    frq_per_band = Column(Float)
    sample_count = Column(BigInteger)
    spec_count = Column(BigInteger)
    adc_type = Column(
        BufferEnum(Buffer.ADCTYPE)
    )  # TODO this is an ENUM in buffer_parser
    bit_resolution = Column(Integer)
    fft_log_shift = Column(Integer)
    streamno = Column(Integer)
    preamp_gain = Column(Integer)
    analyzer_version = Column(String)
    partnumber = Column(String)
    # Populated by _create_metadata() when a file could not be opened; rows
    # with a non-NULL opening_error are excluded from template queries.
    opening_error = Column(String, nullable=True)

    @hybrid_property
    def filepath(self):
        # Full path reassembled from the two stored components.
        return str(Path(str(self.directory_path)) / self.filename)

    @staticmethod
    def buffer_to_metadata(buffer):
        """Converts a Buffer object to a BufferMetadata database object by copying all the @properties from the Buffer
        object putting them in the BufferMetadata object

        :param buffer: Buffer object
        :type buffer: buffer_parser.Buffer
        :return: a new, not-yet-persisted BufferMetadata row
        :rtype: BufferMetadata
        """
        file = Path(buffer.filepath)
        directory_path = str(file.parent)
        buffer_metadata = BufferMetadata(
            filename=file.name, directory_path=directory_path
        )
        for prop in BufferMetadata.properties:
            try:  # try to map all the buffer properties and skip on error
                # Properties a given buffer/file version does not support
                # simply stay None on the row.
                setattr(
                    buffer_metadata, prop, getattr(buffer, prop)
                )  # get the @property method and execute it
            except Exception:
                continue
        return buffer_metadata
# Composite indexes covering the most common query patterns: filtering by
# project, by project + process (+ channel), and by compression settings
# (optionally scoped to a project).
Index("project_id_process_index", BufferMetadata.project_id, BufferMetadata.process)
Index(
    "project_id_process_channel_index",
    BufferMetadata.project_id,
    BufferMetadata.process,
    BufferMetadata.channel,
)
Index(
    "compression_time_frq_index",
    BufferMetadata.compression_time,
    BufferMetadata.compression_frq,
)
Index(
    "project_id_compression_time_frq_index",
    BufferMetadata.project_id,
    BufferMetadata.compression_time,
    BufferMetadata.compression_frq,
)
def _create_metadata(args):
    """Multiprocessing worker: open one buffer file and extract its metadata.

    :param args: a ``(Buffer_cls, file)`` tuple, where ``file`` is a Path
    :return: the extracted BufferMetadata; a placeholder row carrying the
        error message if opening failed unexpectedly; or None for files
        that are not valid buffers at all
    """
    buffer_class, path = args
    try:
        with buffer_class(path.resolve()) as buf:
            return BufferMetadata.buffer_to_metadata(buf)
    except InvalidFileError:
        # Not a buffer file — silently skipped (callers filter out None).
        return None
    except Exception as exc:
        warnings.warn(f"One or more Buffers couldn't be opened {path}", UserWarning)
        # Record the failure so the file is not retried on every sync.
        return BufferMetadata(
            directory_path=str(path.parent),
            filename=path.name,
            opening_error=str(exc),
        )
class BufferMetadataCache:
    """This class acts as a Cache for Buffer Metadata. It uses a database session with a buffer_metadata table to map
    metadata to files on the disk. The cache can be queried a lot faster than manually opening a lot of buffer files.
    """

    # Re-exported so callers can build queries without a second import:
    # select(BufferMetadataCache.BufferMetadata).where(...)
    BufferMetadata = BufferMetadata

    def __init__(self, db_url="sqlite:///:memory:", Buffer_cls=Buffer):
        """
        :param db_url: SQLAlchemy database URL; defaults to a non-persistent
            in-memory SQLite database
        :param Buffer_cls: the Buffer implementation used to open files
        """
        self.engine = create_engine(db_url)
        BufferMetadata.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)
        self.Buffer_cls = Buffer_cls

    def synchronize_directory(
        self,
        *paths,
        sync_subdirectories=True,
        regex_pattern="^.*[p][0-9]*[c][0-9]{1}[b][0-9]{2}",
        verbose=1,
        delete_stale_entries=False,
        machine_id=None,
        glob_pattern="*p*c?b*",
    ):
        """synchronize the buffer files in the given paths with the database matching the regex pattern

        :param paths: The absolute paths to the directory
        :type paths: str
        :param sync_subdirectories: When True synchronize all of the subdirectories recursively, defaults to True
        :type sync_subdirectories: bool, optional
        :param regex_pattern: The regex pattern validating the buffer naming format (matched on file.name)
        :type regex_pattern: string, optional
        :param verbose: verbosity level. 0 = no feedback, 1 = progress bar
        :type verbose: int, optional
        :param delete_stale_entries: When True remove cache entries whose file no longer exists on disk
        :type delete_stale_entries: bool, optional
        :param machine_id: An optional identifier for a certain machine to enable synchronization of different platforms
        :type machine_id: string, optional
        :param glob_pattern: The pattern forwarded to Path.glob. This pattern acts as a preselection for the
            files retrieved for the regex pattern
        :type glob_pattern: string, optional
        """
        pattern = re.compile(regex_pattern)
        for path in paths:
            # glob preselects cheaply; the regex then validates the naming format.
            if sync_subdirectories:
                files = [
                    file
                    for file in Path(path).resolve().rglob(glob_pattern)
                    if file.is_file() and pattern.match(file.name)
                ]
            else:
                files = [
                    file
                    for file in Path(path).resolve().glob(glob_pattern)
                    if file.is_file() and pattern.match(file.name)
                ]
            unsynchronized_files, synchronized_missing_buffers = (
                self.get_non_synchronized_files(files, machine_id)
            )
            if delete_stale_entries:
                self.remove_files_from_cache(
                    synchronized_missing_buffers, verbose=verbose
                )
            # check_synced=False: the diff above already excluded synced files.
            self.add_files_to_cache(
                unsynchronized_files,
                verbose=verbose,
                machine_id=machine_id,
                check_synced=False,
            )

    def get_matching_metadata(self, query: Select) -> List[BufferMetadata]:
        """Query the cache for all BufferMetadata database entries matching

        :param query: A sqlalchemy select statement specifying the properties of the BufferMetadata objects
        :type query: Select
        :return: A list with the matching BufferMetadata objects
        :rtype: list[BufferMetadata]
        """
        with self.Session() as session:
            matching_metadata = session.scalars(query).all()
        return matching_metadata

    def get_matching_files(self, query: Select) -> List[str]:
        """Query the Cache for all files matching the properties selected by the query object.

        .. code-block:: python
            :linenos:

            BufferMetadataCache.get_matching_files(
                select(BM).filter(BM.channel==1, BM.compression_freq==4, BM.process > 100)
            )
            # Returns all buffer filepaths with channel = 1, A frequency compression of 4,
            # processes above 100 sorted by the process number

        :param query: A sqlalchemy select statement specifying the properties of the BufferMetadata objects
        :type query: Select
        :return: A list with the paths to the buffer files that match the buffer_metadata
        :rtype: list[str]
        """
        matching_metadata = self.get_matching_metadata(query)
        return [str(m.filepath) for m in matching_metadata]

    def get_matching_buffers(self, query: Select) -> List[Buffer]:
        """Calls get_matching_files and converts the result to Buffer objects

        Files that can no longer be opened are silently skipped (best effort).

        :return: List of Buffer objects
        :rtype: list
        """
        files = self.get_matching_files(query)
        buffers = []
        for file in files:
            try:
                with self.Buffer_cls(file) as buffer:
                    buffers.append(buffer)
            except Exception:
                pass
        return buffers

    def get_non_synchronized_files(
        self,
        files: Sequence[Path],
        machine_id: Union[str, None] = None,
    ) -> Tuple[List[Path], List[Path]]:
        """calculate the difference between the set of files and the set of synchronized files

        :param files: filenames
        :type files: Sequence[Path]
        :param machine_id: machine identifier
        :type machine_id: Union[str, None]
        :return: The set of files that are not synchronized, and the database entries that exist but the file is not present anymore
        """
        file_set = set(files)
        with self.Session() as session:
            synchronized_buffers = set(
                Path(str(buffer.filepath))
                for buffer in session.query(self.BufferMetadata)
                .filter(BufferMetadata.machine_id == machine_id)
                .all()
            )
        unsynchronized_files = [
            Path(p) for p in file_set.difference(synchronized_buffers)
        ]
        synchronized_missing_buffers = [
            Path(p) for p in synchronized_buffers.difference(file_set)
        ]
        return unsynchronized_files, synchronized_missing_buffers

    def add_files_to_cache(
        self,
        files: Sequence[Path],
        verbose: int = 0,
        batch_size: int = 1000,
        machine_id: Union[str, None] = None,
        check_synced: bool = True,
    ):
        """Add buffer files to the cache by providing the complete filepaths
        If a file (determined by filename and directory) is already synchronized it will be skipped

        :param files: complete filepaths that are added to the cache. The filepath is used with the Buffer class to open a buffer and extract the header information.
        :type files: list, tuple of str
        :param verbose: verbosity level. 0 = no feedback, 1 = progress bar
        :type verbose: int, optional
        :param batch_size: The batch size after which the cache will commit a batch to the database
        :type batch_size: int, optional
        :param machine_id: A unique identifier for a different machine
        :type machine_id: str, optional
        :param check_synced: Whether to check if the files that are about to be added are already synced to the cache
        :type check_synced: bool, optional
        """
        if check_synced:
            files, _ = self.get_non_synchronized_files(files, machine_id)
        with self.Session() as session:
            # Header parsing is CPU/IO heavy, so it is fanned out to a
            # process pool; results stream back unordered.
            with Pool() as pool:
                params = [(self.Buffer_cls, f) for f in files]
                f_gen = pool.imap_unordered(_create_metadata, params)
                f_gen = (
                    tqdm(f_gen, desc="Adding Buffers", total=len(params))
                    if verbose > 0 and len(params) > 0
                    else f_gen
                )
                # _create_metadata returns None for invalid buffer files.
                for i, metadata in enumerate(filter(lambda m: m is not None, f_gen)):
                    metadata.machine_id = machine_id
                    session.add(metadata)
                    # Commit in batches to bound memory and transaction size.
                    if i % batch_size == 0:
                        session.commit()
            session.commit()

    def remove_files_from_cache(self, files: List[Path], verbose=0):
        """Remove synchronized files from the cache

        :param files: complete filepaths that are present in the cache
        :type files: list, tuple of Path
        :param verbose: verbosity level. 0 = no feedback, 1 = progress bar
        :type verbose: int, optional
        :raises Exception: re-raised after rollback if a file cannot be
            opened or the database lookup fails
        """
        with self.Session() as session:
            file_iter = (
                tqdm(files, desc="Removing File Entries")
                if verbose > 0 and len(files) > 0
                else files
            )
            for file in file_iter:
                try:
                    # NOTE(review): the buffer is opened only to read its
                    # header hash, which is the cache lookup key — this
                    # requires the file to still be openable; verify this is
                    # intended for stale-entry removal.
                    with self.Buffer_cls(file) as b:
                        pass
                    # one_or_none() instead of one(): a missing entry should
                    # be skipped, not raise NoResultFound (the previous
                    # `if not entry` check was unreachable with .one()).
                    entry = (
                        session.query(self.BufferMetadata)
                        .filter(self.BufferMetadata.header_hash == b.header_hash)
                        .one_or_none()
                    )
                    if entry is None:
                        continue
                    session.delete(entry)
                except Exception as e:
                    session.rollback()
                    raise e
            session.commit()

    def get_buffer_metadata_query(self, buffer_metadata):
        """Converts a .. py:class:: BufferMetadata object to a complete query. Every property of the object will be converted into
        SQL and returned as a ..py:class:: sqlalchemy.orm.query.FromStatement object

        :param buffer_metadata: The template BufferMetadata object.
        :type buffer_metadata: BufferMetadata
        :return: The sqlalchemy query object
        :rtype: sqlalchemy.orm.query.FromStatement
        """
        q = "SELECT * FROM buffer_metadata WHERE opening_error IS NULL AND "
        for prop in self.BufferMetadata.properties:
            prop_value = getattr(buffer_metadata, prop)
            if prop_value is not None:
                if isinstance(prop_value, Enum):
                    # Enums are stored by member name (see BufferEnum).
                    prop_value = prop_value.name
                if isinstance(prop_value, str):
                    # Escape embedded single quotes (SQL doubles them) so
                    # values such as filenames containing an apostrophe
                    # cannot break out of the string literal.
                    escaped = prop_value.replace("'", "''")
                    prop_value = f"'{escaped}'"
                q += f"{prop} = {prop_value} AND "
        q = q[:-4]  # prune the last AND
        return select(BufferMetadata).from_statement(text(q))
def get_declarative_base():
    """Getter for the declarative Base that is used by the :py:class:`BufferMetadataCache`.

    Exposes the module-private base so external code can define additional
    models on the same metadata / table registry.

    :return: declarative base class
    """
    return __Base