'''
A module for basic interaction of custom scripts with the portal.
'''
__author__ = 'Jonas Jäger'
from lost.db import access
from lost.db import dtype, state
from lost.logic.file_access import UserFileAccess
from lost.logic import log
from lost.pyapi import inout
import argparse
import datetime
import traceback
import os
from lostconfig import LOSTConfig
import json
from lost.pyapi import pe_base
from lost.logic.label import LabelTree
from lost.pyapi import pipe_elements
from lost.logic.db_access import UserDbAccess
import ast
def report_script_err(pipe_element, task, dbm, msg):
    '''Attach an error message to a script element and flag it as failed.

    The message is appended to any error text the pipe element already
    carries, followed by a line naming the element id. Afterwards the
    element state is set to SCRIPT_ERROR, the task state to ERROR, and both
    objects are added to ``dbm`` and committed.

    Args:
        pipe_element: Pipe element of the script that failed.
        task: Pipe (task) object the element belongs to.
        dbm: Database manager used to store the changes.
        msg (str): The error message that should be reported.

    Note:
        You can call this method multiple times if you like. All messages
        will be concatenated and sent to the portal.
    '''
    previous = pipe_element.error_msg
    text = str(msg)
    pipe_element.error_msg = text if previous is None else previous + text
    # Every reported message is followed by the id of the failing element.
    pipe_element.error_msg += "\nPipeElementID = {}".format(pipe_element.idx)

    pipe_element.state = state.PipeElement.SCRIPT_ERROR
    task.state = state.Pipe.ERROR
    for changed in (task, pipe_element):
        dbm.add(changed)
    dbm.commit()
[docs]class Script(pe_base.Element):
'''Superclass for a user defined Script.
Custom scripts need to inherit from Script and implement the main method.
Attributes:
pe_id (int): Pipe element id. Assign the pe id of a pipeline script
in order to emulate this script in a jupyter notebook for example.
'''
def __init__(self, pe_id=None):
    '''Connect the script to its pipe element, file system and log file.

    Args:
        pe_id (int): Id of the pipe element to attach to. If omitted, the
            id is taken from the ``--idx`` command line option and the
            script is executed right away via ``_run``.
    '''
    started_from_cli = pe_id is None
    cli_args = None
    if started_from_cli:
        arg_parser = argparse.ArgumentParser(description='A user defined script.')
        arg_parser.add_argument('--idx', nargs='?', action='store',
                                help='Id of related pipeline element.')
        cli_args = arg_parser.parse_args()

    dbm = access.DBMan(LOSTConfig())
    self._dbm = dbm #type: lost.db.access.DBMan
    element_id = int(cli_args.idx) if started_from_cli else pe_id
    pe = dbm.get_pipe_element(element_id)
    super().__init__(pe, dbm)

    # Per-user access helpers for files and database.
    owner = self.pipe_info.user
    self.user_id = owner.idx
    default_fs = dbm.get_user_default_fs(self.user_id)
    self.ufa = UserFileAccess(dbm, self.pipe_info.user, default_fs)
    self.dba = UserDbAccess(dbm, self.user_id)

    # The logger appends to the log file of the pipe.
    logfile_path = self.ufa.get_pipe_log_path(self._pipe.idx)
    self._log_stream = self.ufa.fs.open(logfile_path, 'a')
    logger_name = os.path.basename(pe.script.path)
    self._logger = log.get_stream_logger(logger_name, self._log_stream)
    if not self.pipe_info.logfile_path:
        self.pipe_info.logfile_path = logfile_path

    self._inp = inout.Input(self)
    self._outp = inout.ScriptOutput(self)
    self.rejected_execution = False

    # Without an explicit pe_id this is a normal script run; with one, the
    # script is only emulated (e.g. from a Jupyter notebook) and not executed.
    if started_from_cli:
        return self._run()
def _run(self, ret_success=False):
try:
self.main()
self.i_am_done()
success = 'PipeElementID: {}, Successfully executed script: {}'.format(
self._pipe_element.idx, self._pipe_element.script.name)
self._dbm.close_session()
if ret_success:
return success
except:
err_msg = str(datetime.datetime.now()) + '\n'
err_msg += traceback.format_exc()
self.report_err(err_msg)
self._dbm.close_session()
def __str__(self):
my_str = 'I am a Script.\nMy name is: {}\nPipeElementID: {}'.format(self._pipe_element.script.name,
self._pipe_element.idx)
return my_str
def main(self):
#raise NotImplementedError("You need to implement a main method to get your Script running.")
pass
@property
def logger(self):
''':class:`logging.Logger`: A standard python logger for this script.
It will log to the pipline log file.
'''
return self._logger
@property
def inp(self):
''':class:`lost.pyapi.inout.Input`
'''
return self._inp #type: inout.Input
@property
def outp(self):
''':class:`lost.pyapi.inout.ScriptOutput`
'''
return self._outp #type: inout.ScriptOutput
[docs] def get_label_tree(self, name):
'''Get a LabelTree by name.
Args:
name (str): Name of the desired LabelTree.
Retruns:
:class:`lost.logic.label.LabelTree` or None:
If a label tree with the given name exists
it will be returned. Otherwise None
will be returned'''
group_id = self._pipe.group_id
root_list = self._dbm.get_all_label_trees(group_id, add_global=True)
root = next(filter(lambda x: x.name==name, root_list), None)
if root is None:
return None
else:
return LabelTree(self._dbm, root_leaf=root)
def create_label_tree(self, name, external_id=None):
    '''Create a new LabelTree with a fresh root leaf.

    Args:
        name (str): Name of the tree / name of the root leaf.
        external_id (str): An external id for the root leaf.

    Returns:
        :class:`lost.logic.label.LabelTree`:
            The created LabelTree.
    '''
    new_tree = LabelTree(self._dbm)
    new_tree.create_root(name, external_id=external_id)
    return new_tree
[docs] def break_loop(self):
'''Break next loop in pipeline.
'''
loop_e = self._pipe_man.get_next_loop(self._pipe_element)
if loop_e is not None:
loop_e.loop.break_loop = True
self._dbm.add(loop_e)
self._dbm.commit()
[docs] def loop_is_broken(self):
'''Check if the current loop is broken'''
loop_e = self._pipe_man.get_next_loop(self._pipe_element)
if loop_e is not None:
return loop_e.loop.break_loop
else:
self.logger.warning('loop_is_broken method was used, but no loop seems to be in this pipeline!')
return False
[docs] def get_arg(self, arg_name):
'''Get argument value by name for this script.
Args:
arg_name (str): Name of the argument.
Returns:
Value of the given argument.
'''
if self._pipe_element.arguments:
args = json.loads(self._pipe_element.arguments)
# args = ast.literal_eval(self._pipe_element.arguments)
my_arg = args[arg_name]['value']
if my_arg in ['t', 'true', 'yes']:
return True
if my_arg in ['f', 'false', 'no']:
return False
if my_arg in ['-', '', '[]']:
return None
try:
return ast.literal_eval(my_arg)
except:
return my_arg
else:
return None
[docs] def get_fs(self, name=None):
'''Get default lost filesystem or a specific filesystem by name.
Returns:
fsspec.spec.AbstractFileSystem: See https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem
'''
if name is None:
return self.ufa.fs
return self.ufa.get_fs(name)
[docs] def get_path(self, file_name, context='instance'):
'''Get path for the filename in a specific context in filesystem.
Args:
file_name (str): Name or relative path for a file.
context (str): Options: *instance*, *pipe*
Returns:
str: Absolute path to the file in the specified context.
'''
if context == 'instance':
path = os.path.join(self.ufa.get_instance_path(self._pe), file_name)
elif context == 'pipe':
path = os.path.join(self.ufa.get_pipe_context_path(self._pe), file_name)
else:
raise Exception('Unknown context: {}. Should be *instance* or *pipe*!'.format(context))
return path
@property
def iteration(self):
'''int: Get the current iteration.
Number of times this script has been executed.
'''
return self._pipe_element.iteration
@property
def progress(self):
'''float: Get current progress that is displayed in the progress bar of this script.
Current progress in percent 0...100
'''
return self._pipe_element.progress
[docs] def update_progress(self, value):
'''Update the progress for this script.
Args:
value (float): Progress in percent 0...100
'''
self._pipe_element.progress = value
self._dbm.commit()
[docs] def reject_execution(self):
'''Reject execution of this script and set it to PENDING again.
Note:
This method is useful if you want to execute this script only
when some condition based on previous pipeline elements is
meet.
'''
self.rejected_execution = True
def get_alien_element(self, pe_id):
    '''Get a pipeline element by id from somewhere in the LOST system.

    It is an alien element since it is most likely not part of the
    pipeline instance this script belongs to.

    Args:
        pe_id (int): PipeElementID of the alien element.

    Returns:
        * :class:`lost.pyapi.script.Script`
        * :class:`lost.pyapi.pipe_elements.AnnoTask`
        * :class:`lost.pyapi.pipe_elements.Datasource`
        * :class:`lost.pyapi.pipe_elements.VisualOutput`
        * :class:`lost.pyapi.pipe_elements.DataExport`
        * :class:`lost.pyapi.pipe_elements.Loop`

    Raises:
        Exception: If the element has a type that is not handled here.
    '''
    pe = self.dba.get_alien(pe_id)
    # Scripts are rebuilt from their id; all other types wrap the element.
    if pe.dtype == dtype.PipeElement.SCRIPT:
        return Script(pe_id=pe_id)

    # Checked in this order with ``==``, like an if/elif chain.
    wrappers = (
        (dtype.PipeElement.ANNO_TASK, pipe_elements.AnnoTask),
        (dtype.PipeElement.DATASOURCE, pipe_elements.Datasource),
        (dtype.PipeElement.VISUALIZATION, pipe_elements.VisualOutput),
        (dtype.PipeElement.DATA_EXPORT, pipe_elements.DataExport),
        (dtype.PipeElement.LOOP, pipe_elements.Loop),
    )
    for element_type, wrapper in wrappers:
        if pe.dtype == element_type:
            return wrapper(pe, self._dbm)
    raise Exception('Unknown pipe element type!')
def i_am_done(self):
    '''Store the final state of this script run and close the log stream.

    * If execution was rejected, the pipe element is set back to PENDING.
    * Otherwise the element is set to FINISHED with a progress of 100 and
      the pipe to IN_PROGRESS. In debug mode the user is asked on stdin
      first; any answer not starting with ``y`` (including an empty one)
      cleans up the script output instead of finishing the element.

    The log stream is closed on every path that completes normally.
    '''
    element = self._pipe_element
    if self.rejected_execution:
        element.state = state.PipeElement.PENDING
        self._dbm.add(self._pipe)
        self._dbm.add(element)
        self._dbm.commit()
        # The early return must not skip closing the log stream.
        self._log_stream.close()
        return

    # Save all changes to database.
    # ``== False`` is kept on purpose: a value such as None is still
    # treated as debug mode here.
    if element.is_debug_mode == False:
        element.state = state.PipeElement.FINISHED
        element.progress = 100.0
        self._pipe.state = state.Pipe.IN_PROGRESS
        self._dbm.add(self._pipe)
        self._dbm.add(element)
        self._dbm.commit()
    else:
        answer = input("Have you finished debugging? [y/n]: ")
        # Slicing instead of indexing: an empty answer counts as "no"
        # rather than raising IndexError.
        if answer[:1].lower() == 'y':
            element.state = state.PipeElement.FINISHED
            element.progress = 100.0
            self._pipe.state = state.Pipe.IN_PROGRESS
            self._dbm.add(self._pipe)
            self._dbm.add(element)
        else:
            self.outp.clean_up()
        # NOTE(review): assumed to apply to both answers - confirm against
        # the original indentation.
        self._pipe_man.pipe.state = state.Pipe.IN_PROGRESS
        self._dbm.commit()
    self._log_stream.close()
def report_err(self, msg):
    '''Report an error for this user script to portal.

    The message is written to the script logger and then stored on the
    pipe element via ``report_script_err``.

    Args:
        msg: The error message that should be reported.

    Note:
        You can call this method multiple times if you like. All messages
        will be concatenated and sent to the portal.
    '''
    self.logger.error(msg)
    report_script_err(
        self._pipe_element,
        self._pipe,
        self._dbm,
        msg,
    )