#!/usr/bin/env python3
"""
Module for inserting data from YAML forms into a SpecDB database
"""
import hashlib
import json
import os
import sqlite3
import subprocess
import sys

import ruamel.yaml

from specdb.Forms import forms

table_order = [
    'user', 'project', 'target', 'construct', 'expression',
    'purification_batch', 'buffer', 'buffer_components', 'pst',
    'batch_components', 'spectrometer', 'probe', 'pulse_sequence']
param_files = ['acqus', 'audita.txt', 'log', 'procpar']
def check_yaml(file=None):
    """
    Check that the requested YAML file follows the expected structure.

    Parameters
    ----------
    + file    path to YAML file to be checked

    Returns
    -------
    + (True, record) if the YAML file passes the check
    + (False, None) if the YAML file fails the check
    """
    if file is None:
        print('no YAML file provided')
        print('must provide a YAML file to be checked')
        print('Aborting')
        sys.exit()
    # read the yaml file to be checked
    with open(file, 'rt') as fp:
        yaml = ruamel.yaml.YAML()
        record = yaml.load(fp)
    # convert the record to a plain dictionary
    rec = json.loads(json.dumps(record))
    for tbl, dic in rec.items():
        template = forms(table=[tbl])
        temp = ruamel.yaml.safe_load(template)
        if not temp: continue
        if not temp[tbl]: continue
        temp = json.loads(json.dumps(temp))[tbl]["0"]
        rec_keys = list(rec[tbl].keys())
        assert isinstance(rec[tbl][rec_keys[0]], dict)
        for ind, data in rec[tbl].items():
            for key in data:
                if key not in temp:
                    print(f'unknown key {key} in {file}')
                    return False, None
    return True, rec
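
# Example (illustrative; 'session.yaml' is a hypothetical form file):
#   >>> ok, record = check_yaml(file='session.yaml')
#   >>> ok
#   True
# `record` is the form parsed into a plain dict, keyed first by table name
# and then by entry index ("0", "1", ...).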
def empty_form_check(data=None):
    """
    Check whether the data provided from the form is completely empty;
    completely empty forms are not inserted.

    Parameters
    ----------
    + data    dictionary from YAML form

    Returns
    -------
    True if all values are empty
    False otherwise
    """
    all_none = []
    for v in data.values():
        if type(v) == str:
            if len(v) == 0: all_none.append(True)
        elif type(v) == int:
            if v == 0: all_none.append(True)
        elif type(v) == float:
            if v == 0.0: all_none.append(True)
        else:
            assert v is None
            all_none.append(True)
    if len(all_none) == len(data):
        return True
    return False
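
# Example (illustrative): an entry whose fields are all blank strings,
# zeros, or None counts as empty.
#   >>> empty_form_check(data={'name': '', 'age': 0, 'score': 0.0})
#   True
#   >>> empty_form_check(data={'name': 'jdoe', 'age': 0, 'score': 0.0})
#   False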
def find_uniq_constraint(table=None, cursor=None):
    """
    Find the columns in the specified table that carry a UNIQUE constraint.

    Parameters
    ----------
    + table   which table in the database to look at
    + cursor  sqlite3 cursor object to perform the select

    Returns
    -------
    + cols    the columns in `table` that have a unique constraint,
              as a list
    """
    sql = f"SELECT sql FROM sqlite_master WHERE type='table' AND name='{table}'"
    cursor.execute(sql)
    schema = cursor.fetchone()
    entries = [tmp.strip() for tmp in schema[0].splitlines()
               if tmp.find("UNIQUE") >= 0]
    assert len(entries) > 0
    # assumes the schema line is written as "UNIQUE(col1, col2, ...)"
    uniqs = entries[0][7:]
    first = uniqs.split(')')
    cols = first[0].split(', ')
    return cols
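
# Example (illustrative; assumes a 'user' table whose schema contains the
# line "UNIQUE(given_name, family_name)"):
#   >>> find_uniq_constraint(table='user', cursor=c)
#   ['given_name', 'family_name']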
def condition_checker(columns=None, dic=None):
    """
    Build the column list and WHERE condition for a SELECT on `columns`.

    Parameters
    ----------
    + columns  names of SpecDB columns to query on
    + dic      data structure to check for consistency with the SpecDB
               database

    Returns
    -------
    + col_str     columns to select on for the specific table requested
    + condition   string used as the condition of the SQLite SELECT
                  statement
    """
    if len(columns) == 1: col_str = columns[0]
    else: col_str = ', '.join(columns)
    condition = ''
    for i in range(len(columns)):
        condition += f"{columns[i]} == '{dic[columns[i]]}'"
        if i < len(columns) - 1: condition += " AND "
    return col_str, condition
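
# Example (illustrative):
#   >>> condition_checker(columns=['name', 'lab'],
#   ...                   dic={'name': 'jdoe', 'lab': 'nmr'})
#   ('name, lab', "name == 'jdoe' AND lab == 'nmr'")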
def insert_constructor(col_names, row):
    """
    Construct the SQL insert statement for data to be inserted into a
    SpecDB table.

    Parameters
    ----------
    + col_names  names of columns for the specific table we are inserting
                 into; these strings are used as keys into the dictionary
                 `row`
    + row        key/value pairs to be inserted into the SQLite table

    Returns
    -------
    + cols   table column names to be inserted
    + VALS   `?` placeholders for the SQLite INSERT statement
    + vals   the actual values to be inserted into the table
    """
    # 'id' and 'isotopic_labeling' are never inserted directly
    kept = [c for c in col_names if c not in ('id', 'isotopic_labeling')]
    cols = '(' + ', '.join(kept) + ')'
    VALS = '(' + ', '.join('?' * len(kept)) + ')'
    # empty strings are stored as NULL
    vals = [None if (type(row[c]) == str and len(row[c]) == 0) else row[c]
            for c in kept]
    return cols, VALS, vals
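
# Example (illustrative): the 'id' column is skipped and empty strings
# become NULL.
#   >>> insert_constructor(['id', 'name', 'email'],
#   ...                    {'id': 1, 'name': 'jdoe', 'email': ''})
#   ('(name, email)', '(?, ?)', ['jdoe', None])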
def table_inserter(table=None, record=None, cursor=None):
    """
    Insert data into a SQLite table from SpecDB.

    Parameters
    ----------
    + table   name of table to insert into
    + record  key/value pairs to be inserted into table;
              keys are table column names, values are the data to be
              inserted for the respective column
    + cursor  sqlite3 cursor object

    Returns
    -------
    + (True, lastrowid) on a successful insert
    + (False, error message) on a SQLite error
    + (None, None) if there was nothing to insert
    """
    assert table is not None
    assert record is not None
    assert cursor is not None
    table_cols = list(record.keys())
    columns, vals_place, values = insert_constructor(table_cols, record)
    if values == [None] * len(values): return None, None
    sql = f"INSERT INTO {table} {columns} VALUES{vals_place}"
    try:
        cursor.execute(sql, values)
        return True, cursor.lastrowid
    except sqlite3.Error as e:
        return False, e.args[0]
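
# Example (illustrative; the table and column names here are assumptions):
#   >>> table_inserter(table='project',
#   ...                record={'project_id': 'P1', 'description': 'test'},
#   ...                cursor=c)
#   (True, 1)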
def table_updater(table=None, record=None, cursor=None):
    """
    Update data in a SQLite table from SpecDB.

    Parameters
    ----------
    + table   name of table to update
    + record  key/value pairs to be used to update the table
    + cursor  sqlite3 cursor object

    Returns
    -------
    + (True, lastrowid) on a successful update
    + (False, error message) on a SQLite error
    + (None, None) if there was nothing to update
    """
    assert table is not None
    assert record is not None
    assert cursor is not None
    table_cols = list(record.keys())
    columns, vals_place, values = insert_constructor(table_cols, record)
    if values == [None] * len(values): return None, None
    sql = f"INSERT OR REPLACE INTO {table} {columns} VALUES{vals_place}"
    try:
        cursor.execute(sql, values)
        return True, cursor.lastrowid
    except sqlite3.Error as e:
        return False, e.args[0]
def read_acqus(path=None):
    """
    Read a Bruker acquisition status (`acqus`) file to find the
    temperature, probe, and pulse sequence used in the experiment.

    Parameters
    ----------
    + path    file path to acqus file

    Returns
    -------
    + temp    temperature as a float
    + probe   probe description string
    + pulse   pulse sequence name
    or None if `path` is not a file
    """
    if not os.path.isfile(path): return None
    temp = 0.0
    probe = ''
    pulse = ''
    with open(path, 'r') as fp:
        for line in fp.readlines():
            line = line.rstrip()
            if '##$TE= ' in line:
                info = line.split('= ')
                temp = float(info[-1])
            if '##$PULPROG= ' in line:
                info = line.split('= ')
                pulse = info[-1]
                pulse = pulse.replace('<', '')
                pulse = pulse.replace('>', '')
            if '##$PROBHD= ' in line:
                info = line.split('= ')
                probe = info[-1]
                probe = probe.replace(' ', '_')
                probe = probe.replace('<', '')
                probe = probe.replace('>', '')
    return temp, probe, pulse
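
# Example (illustrative): given acqus lines such as
#   ##$TE= 298.15
#   ##$PULPROG= <hsqcetfpf3gpsi>
# read_acqus() would return 298.15 as the temperature and
# 'hsqcetfpf3gpsi' as the pulse sequence, with the angle brackets and
# spaces stripped as shown above.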
def read_audita(path=None):
    """
    Read a Bruker `audita.txt` file to find when the pulse sequence started.

    Parameters
    ----------
    + path    path to `audita.txt` to parse

    Returns
    -------
    + time    string of when the pulse sequence started
    or None if `path` is not a file
    """
    if not os.path.isfile(path): return None
    time = ''
    with open(path, 'r') as fp:
        for line in fp.readlines():
            line = line.rstrip()
            if 'started at ' in line:
                info = line.split()
                time = ' '.join((info[2], info[3]))
    return time
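
# Example (illustrative; assumes the audit trail contains a line of the
# form "started at 2021-06-01 12:00:00.000 +0200"):
#   >>> read_audita(path='audita.txt')
#   '2021-06-01 12:00:00.000'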
def insert_fid(files=None, cursor=None, write=False, dic=None):
    """
    Insert time domain data into the database.

    Parameters
    ----------
    + files   list of files in the FID directory
    + cursor  SQLite cursor object to perform database operations
    + write   whether to write new/updated items to the database
    + dic     dictionary of information to insert into the
              time_domain_dataset table

    Returns
    -------
    + success   True/False depending on whether the insert succeeded
    + str/dic   error message or updated record, depending on the errors
                from table_inserter()
    + last_row  rowid of the inserted row, or None on failure
    """
    found = {}
    for f in files:
        if os.path.isfile(f):
            name = os.path.basename(f)
            if name in param_files: found[name] = f
    if 'acqus' in found and 'audita.txt' in found:
        temp, probe, pulse = read_acqus(path=found['acqus'])
        exp_time = read_audita(path=found['audita.txt'])
        dic['temperature_from_data'] = temp
        dic['probe_info'] = probe
        dic['pulse_sequence'] = pulse
        dic['experiment_date'] = exp_time
    fid_dirs = files[0].split('/')
    zipped_path = '/'.join(fid_dirs[:-1])
    up_path = '/'.join(fid_dirs[:-2])
    cwd = os.getcwd()
    os.chdir(up_path)
    # archive the top-level acquisition files plus pdata/1/proc* and
    # pdata/1/title into specdb.tar.gz
    cmd = f"find ./{dic['subdir_name']} -maxdepth 3 -type f -regex"
    cmd += f" '\\./{dic['subdir_name']}/[^/]*' -print0 -or -regex"
    cmd += f" '\\./{dic['subdir_name']}/pdata/1/proc.*$' -print0 -or -regex"
    cmd += f" '\\./{dic['subdir_name']}/pdata/1/title$' -print0"
    cmd += " | xargs -0 tar -zcvf specdb.tar.gz"
    os.system(cmd)
    with open('specdb.tar.gz', 'rb') as fp: fbytes = fp.read()
    dic['zipped_dir'] = fbytes
    time_domain = [f for f in os.listdir(zipped_path)
                   if os.path.isfile(os.path.join(zipped_path, f))]
    time_domain = [f for f in time_domain if f == 'fid' or f == 'ser']
    assert len(time_domain) == 1
    with open(os.path.join(zipped_path, time_domain[0]), 'rb') as fp:
        fbytes = fp.read()
        readable_hash = hashlib.md5(fbytes).hexdigest()
        dic['md5checksum'] = readable_hash
    success, last_row = insert_logic(  # go ahead and insert
        table='time_domain_dataset',
        dic=dic,
        cursor=cursor,
        write=write)
    os.remove('specdb.tar.gz')
    os.chdir(cwd)  # restore the working directory saved above
    if success is True: return success, dic, last_row
    else: return success, last_row, None
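
# The find/tar pipeline above is expected to produce an archive containing,
# e.g. (illustrative layout for an experiment directory named "10"):
#   ./10/acqus  ./10/audita.txt  ./10/fid
#   ./10/pdata/1/procs  ./10/pdata/1/title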
def find_processing_scripts(files=None):
    """
    Look in the FID directory for shell-based processing scripts
    (e.g. NMRPipe `.com` scripts), identified as `.com` files that
    mention both 'pipe' and 'csh'.

    Returns a list of script paths, or None if none are found.
    """
    fid_dirs = files[0].split('/')
    zipped_path = '/'.join(fid_dirs[:-1])
    cwd = os.getcwd()
    os.chdir(zipped_path)
    processing = []
    p = subprocess.getoutput("grep -li 'pipe' * | xargs grep 'csh'")
    lines = p.split('\n')
    for line in lines:
        info = line.split(':')
        file = info[0]
        if file.endswith('.com'):
            processing.append(os.path.join(zipped_path, file))
    os.chdir(cwd)
    if len(processing) == 0: return None
    return processing
def save_processing_scripts(files=None, fid_id=None, cursor=None):
    """
    Insert any processing scripts found next to the FID into the
    processing_scripts table and pair them with the FID through the
    scripts_to_fids table.

    Returns (True, 0) on success, (None, None) if there are no scripts,
    and (False, error message) otherwise.
    """
    scripts = find_processing_scripts(files=files)
    if scripts is None: return None, None
    for script in scripts:
        data = {
            'name': None,
            'md5checksum': None,
            'script': None
        }
        data['name'] = script.split('/')[-1]
        with open(script, 'rb') as fp:
            fbytes = fp.read()
            readable_hash = hashlib.md5(fbytes).hexdigest()
            data['md5checksum'] = readable_hash
            data['script'] = fbytes
        success, last_row = insert_logic(  # go ahead and insert
            table='processing_scripts',
            dic=data,
            write=True,
            cursor=cursor)
        if success is True:
            assert type(last_row) is int
            paired_data = {
                'fid_id': fid_id,
                'script_id': last_row
            }
            status, row = insert_logic(
                table='scripts_to_fids',
                dic=paired_data,
                write=True,
                cursor=cursor
            )
            if status is True: continue
            return status, row
        elif success is False:
            assert type(last_row) is not dict
            return success, last_row
        else:
            print(f"Unexpected value {success}")
            print("Aborting")
            return False, None
    return True, 0
def insert_logic(table=None, dic=None, write=False, cursor=None):
    """
    Control logic for insertion into each of the tables.

    Parameters
    ----------
    + table   which table to insert into
    + dic     the data structure that was read from the YAML form
    + write   whether to write new/updated items to the database
    + cursor  sqlite3 cursor object

    Returns
    -------
    + status  True if the insertion was successful, False otherwise
    + value   the lastrowid of the table after a successful insert,
              OR a dictionary of the updated record,
              OR an error message to write to the LOG file
    """
    # find the columns that serve as unique constraints
    uniq_cols = find_uniq_constraint(table=table, cursor=cursor)
    # determine whether they are defined in dic
    try:
        uniq_pres = [True for uc in uniq_cols if len(dic[uc]) > 0]
    except (KeyError, TypeError):
        uniq_pres = [True for uc in uniq_cols
                     if type(dic[uc]) is int and dic[uc] > 0]
    if len(uniq_pres) == len(uniq_cols):
        empty = True
        # if the unique columns are defined, check whether the remaining
        # columns are empty; if they are, assume the user wanted to pull
        # in the info for the table entity, otherwise insert as normal
        # build strings for the SELECT command
        col_names, cond = condition_checker(columns=uniq_cols, dic=dic)
        # check whether all but the unique columns are empty
        for k, v in dic.items():
            if k in uniq_cols: continue
            if type(v) == str:
                if len(v) > 0:
                    empty = False
                    break
            elif type(v) == int:
                if v != 0:
                    empty = False
                    break
            elif type(v) == float:
                if v != 0.0:
                    empty = False
                    break
            else: assert v is None
        if empty and len(dic) != len(uniq_pres):
            # attempt to fill the YAML form with database info
            sql = f"SELECT {col_names} FROM {table} WHERE {cond}"
            cursor.execute(sql)
            results = cursor.fetchall()
            if len(results) == 0:  # uniq col values don't exist and empty
                status = False
                msg = f"nothing to pull from database on table {table}"
                msg += "\nwhat was given:\n"
                msg += f"{json.dumps(dic)}"
                msg += "\n"
                return status, msg
            else:  # they are in the database and the rest are empty
                assert len(results) == 1
                sql = f"SELECT * FROM {table} WHERE {cond}"
                cursor.execute(sql)
                results = cursor.fetchall()
                assert len(results) == 1
                # set the form data to what is in the db
                tmp_results = dict(results[0])
                del tmp_results['id']
                dic = tmp_results
                return True, dic
        else:  # not empty, user supplied some information, attempt to insert
            sql = f"SELECT * FROM {table} WHERE {cond}"
            cursor.execute(sql)
            results = cursor.fetchall()
            if len(results) == 0:  # supplied info not in database
                if not write:
                    msg = "information provided is new\n"
                    msg += json.dumps(dic, indent='\t')
                    msg += "\nMust set --write to insert provided information"
                    msg += " to database\nAborting"
                    return False, msg
                else:
                    success, last_row = table_inserter(  # go ahead and insert
                        table=table,
                        record=dic,
                        cursor=cursor)
                    if type(last_row) is str:
                        assert success is False
                        msg = f"SQLite error on insert on table {table}"
                        msg += "\nErr Message:\n"
                        msg += f"{last_row}"
                        msg += "\ncheck the template form for instructions and "
                        msg += "examples\n"
                        msg += "ensure all ids this table relates to (e.g. "
                        msg += "constructs relate to targets) are inserted "
                        msg += "already\n"
                        msg += "Aborting"
                        return success, msg
                    return success, last_row
            else:  # table unique columns already in db, make sure they match
                assert len(results) == 1
                tmp_results = dict(results[0])
                del tmp_results['id']
                for k, v in dic.items():
                    if type(v) == str:
                        if len(v) == 0: dic[k] = None
                if tmp_results != dic:
                    if not write:
                        success = False
                        msg = "requested data is different from the database"
                        msg += "\ninput information:\n"
                        msg += json.dumps(dic, indent='\t')
                        msg += "\ndatabase information:\n"
                        msg += json.dumps(tmp_results, indent='\t')
                        msg += "\nAborting"
                        return success, msg
                    else:
                        success, last_row = table_updater(
                            table=table,
                            record=dic,
                            cursor=cursor
                        )
                        if type(last_row) is str:
                            assert success is False
                            msg = f"SQLite error on insert on table {table}"
                            msg += "\nErr Message:\n"
                            msg += f"{last_row}"
                            msg += "\ncheck the template form for examples and "
                            msg += "instructions\n"
                            msg += "ensure all ids this table relates to (e.g. "
                            msg += "constructs relate to targets) are inserted "
                            msg += "already\n"
                            msg += "Aborting"
                            return success, msg
                        return success, last_row
                else: return True, results[0]['id']
    else:
        msg = f"unique column ids {uniq_cols} cannot be blank"
        msg += "\nmust provide some string for them"
        msg += "\nAborting"
        return False, msg
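
# Summary of insert_logic() outcomes (illustrative):
#   unique columns blank                        -> (False, error message)
#   unique cols set, rest blank, row in db      -> (True, record pulled from db)
#   unique cols set, rest blank, row not in db  -> (False, "nothing to pull ...")
#   new data, write=False                       -> (False, "must set --write ...")
#   new data, write=True                        -> (True, lastrowid)
#   data matches db                             -> (True, existing row id)
#   data differs from db, write=True            -> INSERT OR REPLACE, (True, lastrowid)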
def insert(file=None, db=None, write=False):
    """
    Add a single YAML file into a SpecDB SQLite database.
    Can be used for any data item type in the database.

    Parameters
    ----------
    + file    path to YAML file to be inserted
    + db      path to specdb database file
    + write   whether to write new SpecDB records into the SpecDB
              database, or to update records already in the database

    Returns
    -------
    True if the insertion is successful
    False if the insertion is unsuccessful
    """
    if file is None:
        print('no YAML file provided')
        print('must provide a YAML file to insert for specdb add')
        print('Aborting')
        sys.exit()
    if db is None:
        print('no specdb database file provided')
        print('must provide a .db file for insert to work')
        print('Aborting')
        sys.exit()
    # connect to the database
    conn = sqlite3.connect(os.path.abspath(db))
    conn.row_factory = sqlite3.Row
    c = conn.cursor()
    # enforce foreign key constraints
    sql = "PRAGMA foreign_keys = ON"
    c.execute(sql)
    # check that the yaml file to be inserted has the expected keys
    status, record = check_yaml(file=file)
    if not status:
        print(f"YAML file {file} does not have expected keys")
        print("Aborting")
        sys.exit()
    # insert data from the yaml form in the specific table order
    for table in table_order:
        # not all tables need to be inserted at the same time;
        # for example, user information can be inserted separately
        if table not in record: continue
        table_keys = list(record[table].keys())
        assert isinstance(record[table][table_keys[0]], dict)
        for counter, data in record[table].items():
            if empty_form_check(data=data):
                msg = f"Table: {table} is completely empty"
                msg += "\nAborting"
                print(msg)
                return False
            status, value = insert_logic(
                table=table,
                dic=data,
                write=write,
                cursor=c
            )
            if status is True:
                if type(value) is dict: record[table][counter] = value
                elif type(value) is int:
                    conn.commit()
                    continue
                else:
                    print("Unexpected type")
                    print("Aborting")
                    return False
            elif status is False:
                assert type(value) is not dict
                print(value)
                return False
            else:
                print(f"Unexpected value {status}")
                print("Aborting")
                return False
    conn.commit()
    if 'default_processing_scripts' in record:
        for index, scripts in record['default_processing_scripts'].items():
            try:
                processing_script_path = os.path.abspath(
                    scripts['default_processing'])
            except (KeyError, TypeError):
                print('cannot get default processing script path')
                print(f"given: {scripts['default_processing']}")
                print('Aborting')
                sys.exit()
            with open(processing_script_path, 'rb') as fp:
                fbytes = fp.read()
            scripts['default_processing'] = fbytes
            status, value = insert_logic(
                table='default_processing_scripts',
                dic=scripts,
                cursor=c,
                write=write
            )
            scripts['default_processing'] = processing_script_path
    # now insert sessions if present
    if 'session' in record:
        file_path = os.path.abspath(file)
        dirs = file_path.split('/')
        session_folder = dirs[-2]
        session_path = '/'.join(dirs[:-1])
        rec_keys = list(record['session'].keys())
        assert isinstance(record['session'][rec_keys[0]], dict)
        record['session']['0']['folder_name'] = session_folder
        session_dic = record['session']['0']
        status, value = insert_logic(
            table='session',
            dic=session_dic,
            cursor=c,
            write=write
        )
        if status is True:
            if type(value) is dict:
                record['session']['0'] = value
                conn.commit()
            elif type(value) is int:
                conn.commit()
            else:
                print("Unexpected type")
                print("Aborting")
                return False
        elif status is False:
            assert type(value) is not dict
            print(value, status)
            return False
        else:
            print(f"Unexpected value {status}")
            print("Aborting")
            return False
        session_id = value
        if 'time_domain_dataset' not in record:
            # no fid entries in the form: scan the session folder for
            # subdirectories that hold an fid or ser file
            record['time_domain_dataset'] = dict()
            fcounter = -1
            for enum, sub in enumerate(os.listdir(session_path)):
                if os.path.isdir(os.path.join(session_path, sub)):
                    subpath = os.path.join(session_path, sub)
                    if 'fid' in os.listdir(subpath) or 'ser' in os.listdir(subpath):
                        fcounter += 1
                        new_fid = {
                            'subdir_name': sub,
                            'pulse_sequence_nickname': None,
                            'probe_id': None,
                            'session_id': session_id,
                            'zipped_dir': None,
                            'md5checksum': None,
                            'temperature_from_data': None,
                            'experiment_date': None,
                            'pulse_sequence': None,
                            'probe_info': None,
                            'pst_id': session_dic['pst_id']
                        }
                        files = [
                            os.path.join(subpath, f) for f in os.listdir(
                                subpath)
                        ]
                        status, value, last_row_counter = insert_fid(
                            files=files,
                            dic=new_fid,
                            cursor=c,
                            write=write)
                        if status is True:
                            assert type(value) is dict
                            del value['zipped_dir']
                            record['time_domain_dataset'][str(fcounter)] = value
                            conn.commit()
                        elif status is False:
                            print("Err message")
                            print(value)
                            print("moving to next fid")
                            continue
                        else:
                            print(f"Unexpected value {status}")
                            print("Aborting")
                            return False
                        assert type(last_row_counter) is int
                        status, val = save_processing_scripts(
                            files=files,
                            fid_id=last_row_counter,
                            cursor=c
                        )
                        if status is None: continue
                        if status is True: continue
                        else:
                            print(val)
                            return False
        else:
            # fid entries listed in the form: fill in defaults and insert
            rec_keys = list(record['time_domain_dataset'].keys())
            assert isinstance(record['time_domain_dataset'][rec_keys[0]], dict)
            for ind, data in record['time_domain_dataset'].items():
                new_fid = {
                    'subdir_name': None,
                    'pulse_sequence_nickname': None,
                    'probe_id': None,
                    'session_id': session_id,
                    'zipped_dir': None,
                    'md5checksum': None,
                    'temperature_from_data': None,
                    'experiment_date': None,
                    'pulse_sequence': None,
                    'probe_info': None,
                    'pst_id': session_dic['pst_id']
                }
                for k, v in data.items(): new_fid[k] = v
                files = [
                    os.path.join(
                        session_path,
                        new_fid['subdir_name'],
                        f) for f in os.listdir(
                        os.path.join(
                            session_path,
                            new_fid['subdir_name']))]
                status, value, place = insert_fid(
                    files=files,
                    dic=new_fid,
                    cursor=c,
                    write=write)
                if status is True:
                    assert type(value) is dict
                    del value['zipped_dir']
                    record['time_domain_dataset'][str(ind)] = value
                    conn.commit()
                elif status is False:
                    print("Err message")
                    print(value)
                    print("moving to next fid")
                    continue
                else:
                    print(f"Unexpected value {status}")
                    print("Aborting")
                    return False
                status, val = save_processing_scripts(
                    files=files,
                    fid_id=place,
                    cursor=c
                )
                if status is None: continue
                if status is True: continue
                else:
                    print(val)
                    return False
    conn.commit()
    conn.close()
    print(f"Inserted data from form file at {file}")
    # write the (possibly updated) form back out so that blank fields
    # filled from the database are reflected in the YAML file
    numbers = []
    for t in record: numbers.append(len(record[t]))
    new_form = forms(table=record.keys(), num=numbers, input_dict=record)
    with open(file, 'w') as fp:
        fp.write(new_form)
    return True
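
# Minimal usage sketch (illustrative; the file and database names are
# hypothetical and the import path is an assumption):
#   >>> from specdb.insert import insert
#   >>> insert(file='forms/session.yaml', db='specdb.db', write=True)
#   True
# A dry run with write=False reports what would change without touching
# the database; insert_logic() refuses to add or update rows unless
# write is set.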