import csv
import io
import operator
import re
import typing as t
from collections import Counter
from datetime import datetime
from pathlib import Path
import numpy as np
from logfile_parser import Parser
from omero.gateway import BlitzGateway, TagAnnotationWrapper, _DatasetWrapper
from tqdm import tqdm
class OmeroExplorer:
    """Browse and filter OMERO datasets using their attached annotation files."""
    def __init__(
self,
host: str,
user: str,
password: str,
min_date: t.Tuple[int, int, int] = (2020, 6, 1),
):
self.conn = BlitzGateway(user, password, host=host)
self.conn.connect()
self.min_date = min_date
self.backups = {}
self.removed = []
@property
def cache(self):
if not hasattr(self, "_cache"):
self._cache = {v.id: get_annotsets(v) for v in self.dsets}
return self._cache
@property
def raw_log(self):
return {k: v["log"] for k, v in self.cache.items()}
@property
def raw_log_end(self):
if not hasattr(self, "_raw_log_end"):
self._raw_log_end = {d.id: get_logfile(d) for d in self.dsets}
return self._raw_log_end
@property
def log(self):
return {k: parse_annot(v, "log") for k, v in self.raw_log.items()}
@property
def raw_acq(self):
return {k: v["acq"] for k, v in self.cache.items()}
@property
def acq(self):
return {k: parse_annot(v, "acq") for k, v in self.raw_acq.items()}
    def load(self, min_id=18000, min_date=None):
        """
        Fetch datasets from the server, sorted by date.

        :min_id: int, keep only datasets whose id is above this value
        :min_date: tuple (year, month, day); defaults to self.min_date
        """
if min_date is None:
min_date = self.min_date
self._dsets_bak = [
d for d in self.conn.getObjects("Dataset") if d.getId() > min_id
]
if min_date:
            if len(min_date) < 3:
                min_date = min_date + (1,) * (3 - len(min_date))
min_date = datetime(*min_date)
            # sort datasets by date; sort on the date alone so ties do not
            # try to compare dataset wrappers
            dates = [d.getDate() for d in self._dsets_bak]
            self._dsets_bak[:] = [
                a
                for _, a in sorted(
                    zip(dates, self._dsets_bak), key=lambda x: x[0]
                )
            ]
self._dsets_bak = [
d for d in self._dsets_bak if d.getDate() >= min_date
]
self.dsets = self._dsets_bak
self.n_dsets
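    # A minimal usage sketch (not part of the class): assumes `explorer` is a
    # connected OmeroExplorer and the server holds datasets newer than the
    # chosen id/date; `host`, `user` and `password` are placeholders.
    #     explorer = OmeroExplorer(host, user, password)
    #     explorer.load(min_id=19000, min_date=(2021, 1, 1))
    #     print(explorer.ids)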
def image_ids(self):
return {
dset.getId(): [im.getId() for im in dset.listChildren()]
for dset in self.dsets
}
    def dset(self, n):
        try:
            return [x for x in self.dsets if x.id == n][0]
        except Exception as e:
            print(f"Could not fetch dataset {n}: {e}")
    def channels(self, setkey, present=True):
        """
        :setkey: str indicating a set of channels
        :present: bool indicating whether the channels should or should not be present in the dset
        """
        self.dsets = [
            d for d in self.dsets if present == has_channels(d, setkey)
        ]
self.n_dsets
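    # Sketch of channel-based filtering (assumes datasets are already loaded);
    # valid set keys are those defined in get_chs below, e.g. "ph" or "dual".
    #     explorer.channels("ph")  # keep pHluorin-style experiments
    #     explorer.channels("dual", present=False)  # drop dual-channel ones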
def update_cache(self):
if not hasattr(self, "acq") or not hasattr(self, "log"):
for attr in ["acq", "log"]:
print("Updating raw ", attr)
setattr(
self,
"raw_" + attr,
{v.id: get_annotsets(v)[attr] for v in self.dsets},
)
setattr(
self,
attr,
{
v.id: parse_annot(
getattr(self, "raw_" + attr)[v.id], attr
)
for v in self.dsets
},
)
else:
for attr in ["acq", "log", "raw_acq", "raw_log"]:
setattr(
self,
attr,
{i.id: getattr(self, attr)[i.id] for i in self.dsets},
)
@property
def dsets(self):
        if not hasattr(self, "_dsets"):
            # load() populates self.dsets via the setter; it returns nothing
            self.load()
return self._dsets
@dsets.setter
def dsets(self, dsets):
if hasattr(self, "_dsets"):
if self._dsets is None:
self._dsets = []
self.removed += [
x for x in self._dsets if x.id not in [y.id for y in dsets]
]
self._dsets = dsets
@property
def all_tags(self):
if not hasattr(self, "_tags"):
self._tags = {
d.id: [
x.getValue()
for x in d.listAnnotations()
if isinstance(x, TagAnnotationWrapper)
]
for d in self.dsets
}
return self._tags
def get_timepoints(self):
self.image_wrappers = {
d.id: list(d.listChildren())[0] for d in self.dsets
}
return {k: i.getSizeT() for k, i in self.image_wrappers.items()}
    def timepoints(self, n, op="greater"):
"Filter experiments using the number of timepoints"
op = operator.gt if op == "greater" else operator.le
self._timepoints = self.get_timepoints()
self.dsets = [
v for v in tqdm(self.dsets) if op(self._timepoints[v.id], n)
]
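    # Sketch: keep only experiments with more than 100 time points (assumes
    # datasets are loaded and each contains at least one image).
    #     explorer.timepoints(100, op="greater")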
    def microscope(self, microscope):
        self.microscopes = {
            dset.id: self.get_microscope(self.log[dset.id])
            for dset in self.dsets
        }
        self.dsets = [
            dset
            for dset in self.dsets
            if self.microscopes[dset.id] == microscope
        ]
        self.n_dsets
def get_microscope(self, parsed_log):
return parsed_log["microscope"]
def reset(self, backup_id=None):
self.dsets = self.backups.get(backup_id, self._dsets_bak)
self.n_dsets
def backup(self, name):
self.backups[name] = self.dsets
def reset_backup(self, name):
self.dsets = self.backups[name]
@staticmethod
def is_complete(logfile: str):
return logfile.endswith("Experiment completed\r\r\n")
@staticmethod
def count_errors(logfile: str):
return re.findall("ERROR CAUGHT", logfile)
@staticmethod
def count_drift_alert(logfile: str):
return re.findall("High drift alert!", logfile)
@staticmethod
def is_interrupted(logfile: str):
return "Experiment stopped by user" in logfile
@property
def n_dsets(self):
print("{} datasets.".format(len(self.dsets)))
@property
def desc(self):
for d in self.dsets:
print(
"{}\t{}\t{}\t{}".format(
d.getDate().strftime("%x"),
d.getId(),
d.getName(),
d.getDescription(),
)
)
@property
def ids(self):
return [d.getId() for d in self.dsets]
    def get_ph_params(self):
        ph_channels = [
            {
                ch: [exp, v]
                for ch, exp, v in zip(
                    j["channel"], j["exposure"], j["voltage"]
                )
                if ch in {"GFPFast", "pHluorin405"}
            }
            for j in [i["channels"] for i in self.acq.values()]
        ]
        ph_param_pairs = [
            tuple(x.values()) for x in ph_channels if np.all(list(x.values()))
        ]
        return Counter([str(x) for x in ph_param_pairs])
def find_duplicate_candidates(self, days_tol=2):
# Find experiments with the same name or Aim and from similar upload dates
# and group them for cleaning
pass
def return_completed(self, kind="complete"):
return {
k: getattr(self, f"is_{kind}")(v.get("log", ""))
for k, v in self.cache.items()
}
    def dset_count(
        self,
        dset: t.Union[int, _DatasetWrapper],
        kind: str = "errors",
        norm: bool = True,
    ):
        if isinstance(dset, int):
            dset = self.conn.getObject("Dataset", dset)
        total_tps = sum(im.getSizeT() for im in dset.listChildren())
        count = len(
            getattr(self, f"count_{kind}")(
                self.cache[dset.getId()].get("log", "")
            )
        )
        return count / total_tps if norm else count
    def count_in_log(self, kind="errors", norm: bool = True):
        return {
            k: self.dset_count(k, kind=kind, norm=norm) for k in self.cache
        }
@property
def complete(self):
self.completed = {
k: self.is_complete(v.get("log", ""))
for k, v in self.cache.items()
}
self.dsets = [dset for dset in self.dsets if self.completed[dset.id]]
return self.n_dsets
def save(self, fname):
with open(fname + ".tsv", "w") as f:
writer = csv.writer(f, delimiter="\t")
for d in self.dsets:
writer.writerow(
[
d.getDate().strftime("%x"),
d.getId(),
d.getName(),
d.getDescription(),
]
)
@property
def positions(self):
return {x.id: len(list(x.listChildren())) for x in self.dsets}
    def has_tags(self, d, tags):
        return bool(set(tags).intersection(self.all_tags[d.id]))
def convert_to_hours(delta):
total_seconds = delta.total_seconds()
hours = int(total_seconds // 3600)
return hours
class Argo(OmeroExplorer):
    def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def annot_from_dset(dset, kind):
v = [x for x in dset.listAnnotations() if hasattr(x, "getFileName")]
infname = kind if kind == "log" else kind.title()
try:
acqfile = [x for x in v if x.getFileName().endswith(infname + ".txt")][
0
]
decoded = list(acqfile.getFileInChunks())[0].decode("utf-8")
acq = parse_annot(decoded, kind)
    except Exception as e:
        print(f"Failed to load {kind} annotation from dataset: {e}")
        return {}
return acq
def check_channels(acq, channels, _all=True):
    """Check whether an acquisition contains the given channels (all of them if _all, else at least one)."""
    shared_channels = set(acq["channels"]["channel"]).intersection(channels)
condition = False
if _all:
if len(shared_channels) == len(channels):
condition = True
else:
if len(shared_channels):
condition = True
return condition
def get_chs(exptype):
    """Return the channel names expected for a given experiment type."""
exptypes = {
"dual_ph": ("GFP", "pHluorin405", "mCherry"),
"ph": ("GFP", "pHluorin405"),
"dual": ("GFP", "mCherry"),
"mCherry": ("mCherry"),
}
return exptypes[exptype]
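# Sketch combining get_chs and check_channels on a parsed acquisition
# (assumes `dset` is an OMERO dataset wrapper with an Acq.txt annotation):
#     acq = annot_from_dset(dset, kind="acq")
#     if acq:
#         is_ph = check_channels(acq, get_chs("ph"), _all=True)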
def load_annot_from_cache(exp_id, cache_dir="cache/"):
    """Load raw annotation text files for an experiment from a local cache directory."""
    if not isinstance(cache_dir, Path):
        cache_dir = Path(cache_dir)
    annot_sets = {}
    for fname in cache_dir.joinpath(str(exp_id)).rglob("*.txt"):
fmt = fname.name[:3]
with open(fname, "r") as f:
annot_sets[fmt] = f.read()
return annot_sets
def get_annot(annot_sets, fmt):
"""
Get parsed annotation file
"""
str_io = annot_sets.get(fmt, None)
return parse_annot(str_io, fmt)
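# Sketch of reading annotations from a local cache (assumes a cache directory
# holding one folder per experiment id with *.txt files whose first three
# characters identify the kind, e.g. "log.txt"):
#     annot_sets = load_annot_from_cache("19310", cache_dir="cache/")
#     log = get_annot(annot_sets, "log")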
def parse_annot(str_io, fmt):
    """Parse an annotation string with the logfile_parser grammar for the given kind."""
    parser = Parser("multiDGUI_" + fmt + "_format")
    return parser.parse(io.StringIO(str_io))
def get_annotsets(dset):
    """Return the decoded first chunks of the log, Acq and Pos files attached to a dataset, keyed by kind."""
annot_files = [
annot.getFile()
for annot in dset.listAnnotations()
if hasattr(annot, "getFile")
]
annot_sets = {
ftype[:-4].lower(): annot
for ftype in ("log.txt", "Acq.txt", "Pos.txt")
for annot in annot_files
if ftype in annot.getName()
}
annot_sets = {
key: list(a.getFileInChunks())[0].decode("utf-8")
for key, a in annot_sets.items()
}
return annot_sets
def load_acq(dset):
    """Load and parse the acquisition (Acq) annotation file of a dataset."""
    try:
        acq = annot_from_dset(dset, kind="acq")
        return acq
    except Exception as e:
        print(f"Dataset {dset.getId()} failed acq loading: {e}")
        return False
def has_channels(dset, exptype):
    """Return whether a dataset's acquisition includes the channels of a given experiment type."""
    acq = load_acq(dset)
    if acq:
        return check_channels(acq, get_chs(exptype))
    else:
        return None
def get_logfile(dset):
    """Return the decoded last chunk of the log file annotation attached to a dataset."""
annot_file = [
annot.getFile()
for annot in dset.listAnnotations()
if hasattr(annot, "getFile")
and annot.getFileName().endswith("log.txt")
][0]
return list(annot_file.getFileInChunks())[-1].decode("utf-8")
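# Sketch of a quick health check on one dataset's log (assumes a live OMERO
# connection and a *log.txt annotation attached to `dset`):
#     logfile = get_logfile(dset)
#     print(OmeroExplorer.is_complete(logfile))
#     print(len(OmeroExplorer.count_errors(logfile)))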