# Source code for lost.pyapi.script

'''
A module for basic interaction of custom scripts with the portal.
'''
__author__ = 'Jonas Jäger'

from lost.db import access
from lost.db import dtype, state
from lost.logic.file_man import FileMan
from lost.logic import log
from lost.pyapi import inout
import argparse
import datetime
import traceback
import os
from lost.logic.config import LOSTConfig
import json
import pickle
from lost.pyapi import pe_base
from lost.logic.label import LabelTree
from lost.pyapi import pipe_elements

def report_script_err(pipe_element, task, dbm, msg):
    '''Record a script error on a pipe element and flag its pipe as failed.

    Args:
        pipe_element: Database record of the pipeline element whose script
            failed. Its ``error_msg`` and ``state`` are updated.
        task: Database record of the pipe the element belongs to. Its
            ``state`` is set to ``state.Pipe.ERROR``.
        dbm: Database manager used to add both records and commit.
        msg (str): The error message that should be reported.

    Note:
        You can call this method multiple times if you like. Every call
        appends ``msg`` followed by a ``PipeElementID`` line to the message
        already stored on the element.
    '''
    text = str(msg)
    previous = pipe_element.error_msg
    if previous is not None:
        # Keep what earlier calls reported and append the new message.
        text = previous + text
    pipe_element.error_msg = text + "\nPipeElementID = {}".format(pipe_element.idx)

    pipe_element.state = state.PipeElement.SCRIPT_ERROR
    task.state = state.Pipe.ERROR

    for record in (task, pipe_element):
        dbm.add(record)
    dbm.commit()

class Script(pe_base.Element):
    '''Superclass for a user defined Script.

    Custom scripts need to inherit from Script and implement the main method.

    Attributes:
        pe_id (int): Pipe element id. Assign the pe id of a pipeline script
            in order to emulate this script in a jupyter notebook for example.
    '''

    def __init__(self, pe_id=None):
        '''Connect the script to its pipeline element and, if started as a
        normal script, run it.

        Args:
            pe_id (int): Id of the pipeline element to attach to. When None,
                the id is read from the ``--idx`` command line argument and
                :meth:`main` is executed right away. When given, the script
                is only set up (e.g. for use in a jupyter notebook) and
                :meth:`main` is not called.
        '''
        if pe_id is None:
            parser = argparse.ArgumentParser(description='A user defined script.')
            parser.add_argument('--idx', nargs='?', action='store',
                                help='Id of related pipeline element.')
            args = parser.parse_args()
        lostconfig = LOSTConfig()
        # static_context reads self._lostconfig and nothing else in this class
        # assigns it, so keep the config on the instance. It is set before
        # super().__init__ so that a base class that provides its own value
        # still takes precedence.
        self._lostconfig = lostconfig
        self.file_man = FileMan(lostconfig)
        dbm = access.DBMan(lostconfig)
        self._dbm = dbm #type: lost.db.access.DBMan
        if pe_id is None:
            pe = dbm.get_pipe_element(int(args.idx))
        else:
            pe = dbm.get_pipe_element(pe_id)
        # NOTE(review): the base initialiser is expected to provide
        # self._pipe, self._pipe_element, self._pipe_man and self.pipe_info,
        # which are used below and in the other methods - confirm in pe_base.
        super().__init__(pe, dbm)
        logfile_path = self.file_man.get_pipe_log_path(self._pipe.idx)
        self._logger = log.get_file_logger(os.path.basename(pe.script.path),
                                           logfile_path)
        if not self.pipe_info.logfile_path:
            self.pipe_info.logfile_path = self.get_rel_path(logfile_path)
        self._inp = inout.Input(self)
        self._outp = inout.ScriptOutput(self)
        self.rejected_execution = False
        # If pe_id is None we have a normal script
        # If pe_id is not None a JupyterNotebook uses this script
        if pe_id is None:
            try:
                self.main()
                self.i_am_done()
            except BaseException:
                # Same reach as the former bare except: SystemExit and
                # KeyboardInterrupt raised inside main() are recorded on the
                # pipe element as well instead of leaving it unreported.
                err_msg = str(datetime.datetime.now()) + '\n'
                err_msg += traceback.format_exc()
                self.report_err(err_msg)
            finally:
                # Close the session on every path, including the case where
                # reporting the error fails itself.
                self._dbm.close_session()

    def __str__(self):
        return 'I am a Script.\nMy name is: {}\nPipeElementID: {}'.format(
            self._pipe_element.script.name, self._pipe_element.idx)

    def main(self):
        '''Entry point of a user defined script.

        Subclasses override this method. The default implementation does
        nothing.
        '''
        pass

    @property
    def logger(self):
        ''':class:`logging.Logger`: A standard python logger for this script.

        It will log to the pipeline log file.
        '''
        return self._logger

    @property
    def inp(self):
        ''':class:`lost.pyapi.inout.Input`
        '''
        return self._inp #type: inout.Input

    @property
    def outp(self):
        ''':class:`lost.pyapi.inout.ScriptOutput`
        '''
        return self._outp #type: inout.ScriptOutput

    def get_rel_path(self, path):
        '''Get relative path for current project

        Args:
            path (str): An absolute path

        Returns:
            str : Relative path
        '''
        return self.file_man.get_rel_path(path)

    def get_label_tree(self, name):
        '''Get a LabelTree by name.

        Args:
            name (str): Name of the desired LabelTree.

        Returns:
            :class:`lost.logic.label.LabelTree` or None:
                If a label tree with the given name exists
                it will be returned. Otherwise None will be returned'''
        # The first root whose name matches wins.
        for root in self._dbm.get_all_label_trees():
            if root.name == name:
                return LabelTree(self._dbm, root_leaf=root)
        return None

    def create_label_tree(self, name, external_id=None):
        '''Create a new LabelTree

        Args:
            name (str): Name of the tree / name of the root leaf.
            external_id (str): An external id for the root leaf.

        Returns:
            :class:`lost.logic.label.LabelTree`:
                The created LabelTree.
        '''
        tree = LabelTree(self._dbm)
        tree.create_root(name, external_id=external_id)
        return tree

    def get_abs_path(self, path):
        '''Get absolute path in current file system.

        Args:
            path (str): A relative path.

        Returns:
            str: Absolute path
        '''
        return self.file_man.get_abs_path(path)

    def break_loop(self):
        '''Break next loop in pipeline.

        Sets the ``break_loop`` flag of the next loop element and adds that
        element to the database session. Nothing happens when no loop
        element is found. No commit is issued here.
        '''
        loop_e = self._pipe_man.get_next_loop(self._pipe_element)
        if loop_e is not None:
            loop_e.loop.break_loop = True
            self._dbm.add(loop_e)

    def loop_is_broken(self):
        '''Check if the current loop is broken

        Returns:
            The ``break_loop`` flag of the next loop element. False, together
            with a logged warning, when no loop element is found.
        '''
        loop_e = self._pipe_man.get_next_loop(self._pipe_element)
        if loop_e is not None:
            return loop_e.loop.break_loop
        self.logger.warning('loop_is_broken method was used, but no loop seems to be in this pipeline!')
        return False

    def get_arg(self, arg_name):
        '''Get argument value by name for this script.

        Args:
            arg_name (str): Name of the argument.

        Returns:
            Value of the given argument. None if no arguments are stored for
            this pipeline element.

        Raises:
            KeyError: If arguments are stored but there is no entry (or no
                ``'value'`` field) for ``arg_name``.
        '''
        if not self._pipe_element.arguments:
            return None
        # Arguments are stored as a JSON string on the pipeline element.
        args = json.loads(self._pipe_element.arguments)
        return args[arg_name]['value']

    def get_path(self, file_name, context='instance', ptype='abs'):
        '''Get path for the filename in a specific context in filesystem.

        Args:
            file_name (str): Name or relative path for a file.
            context (str): Options: *instance*, *pipe*, *static*
            ptype (str): Type of this path. Can be relative or absolute
                Options: *abs*, *rel*

        Returns:
            str: Path to the file in the specified context.

        Raises:
            Exception: If ``context`` or ``ptype`` is not one of the listed
                options.
        '''
        if context == 'instance':
            path = os.path.join(self.instance_context, file_name)
        elif context == 'pipe':
            path = os.path.join(self.pipe_context, file_name)
        elif context == 'static':
            path = os.path.join(self.static_context, file_name)
        else:
            raise Exception('Unknown context: {}'.format(context))
        if ptype == 'abs':
            return path
        elif ptype == 'rel':
            return self.get_rel_path(path)
        else:
            raise Exception('Unknown argument ptype: {}'.format(ptype))

    @property
    def iteration(self):
        '''int: Get the current iteration.

        Number of times this script has been executed.
        '''
        return self._pipe_element.iteration

    @property
    def instance_context(self):
        '''str: Get the path to store files that are only valid for this instance.

        Note:
            Reading this property has side effects: the instance path is
            requested from the file manager, its relative form is stored on
            the pipeline element and the element is added to the database
            session.
        '''
        abs_path = self.file_man.create_instance_path(self._pipe_element)
        rel_path = self.file_man.make_path_relative(abs_path)
        self._pipe_element.instance_context = rel_path
        self._dbm.add(self._pipe_element)
        return abs_path

    @property
    def pipe_context(self):
        '''str: Root path to store files that should be visible for all elements
        in the pipeline.
        '''
        return self.file_man.get_pipe_context_path(self._pipe_element)

    @property
    def static_context(self):
        '''str: Get the static path.

        Files that are stored at this path can be accessed by all instances of a
        script. The path is the directory of the script file, joined onto the
        project path of the LOST configuration.
        '''
        return os.path.join(self._lostconfig.project_path,
                            os.path.split(self._pipe_element.script.path)[0])

    @property
    def progress(self):
        '''float: Get current progress that is displayed in the progress bar of this script.

        Current progress in percent 0...100
        '''
        return self._pipe_element.progress

    def update_progress(self, value):
        '''Update the progress for this script.

        The new value is committed to the database immediately.

        Args:
            value (float): Progress in percent 0...100
        '''
        self._pipe_element.progress = value
        self._dbm.commit()

    def reject_execution(self):
        '''Reject execution of this script and set it to PENDING again.

        Note:
            This method is useful if you want to execute this script only
            when some condition based on previous pipeline elements is
            met.
        '''
        self.rejected_execution = True

    def get_alien_element(self, pe_id):
        '''Get a pipeline element by id from somewhere in the LOST system.

        It is an alien element since it is most likely not part of the
        pipeline instance this script belongs to.

        Args:
            pe_id (int): PipeElementID of the alien element.

        Returns:
            * :class:`lost.pyapi.script.Script`
            * :class:`lost.pyapi.pipe_elements.AnnoTask`
            * :class:`lost.pyapi.pipe_elements.Datasource`
            * :class:`lost.pyapi.pipe_elements.VisualOutput`
            * :class:`lost.pyapi.pipe_elements.DataExport`
            * :class:`lost.pyapi.pipe_elements.Loop`

        Raises:
            Exception: If the element has a dtype that is not handled here.
        '''
        pe = self._dbm.get_pipe_element(pe_id)

        if pe.dtype == dtype.PipeElement.SCRIPT:
            # Passing pe_id sets the script up without running its main().
            return Script(pe_id=pe_id)
        elif pe.dtype == dtype.PipeElement.ANNO_TASK:
            return pipe_elements.AnnoTask(pe, self._dbm)
        elif pe.dtype == dtype.PipeElement.DATASOURCE:
            return pipe_elements.Datasource(pe, self._dbm)
        elif pe.dtype == dtype.PipeElement.VISUALIZATION:
            return pipe_elements.VisualOutput(pe, self._dbm)
        elif pe.dtype == dtype.PipeElement.DATA_EXPORT:
            return pipe_elements.DataExport(pe, self._dbm)
        elif pe.dtype == dtype.PipeElement.LOOP:
            return pipe_elements.Loop(pe, self._dbm)
        else:
            raise Exception('Unknown pipe element type!')

    def i_am_done(self):
        '''Write the final state of this script run to the database.

        * If execution was rejected, the element is set back to PENDING.
        * Outside of debug mode, or when the user confirms that debugging is
          finished, the element is set to FINISHED with 100 percent progress
          and the pipe is set to IN_PROGRESS.
        * Otherwise the output is cleaned up and only the pipe state is set
          to IN_PROGRESS.
        '''
        if self.rejected_execution:
            self._pipe_element.state = state.PipeElement.PENDING
            self._dbm.add(self._pipe)
            self._dbm.add(self._pipe_element)
            self._dbm.commit()
            return

        # Kept as an explicit comparison so that a value of None still takes
        # the debug branch, exactly as before.
        finished = self._pipe_element.is_debug_mode == False
        if not finished:
            answer = input("Have you finished debugging? [y/n]: ")
            # Slice instead of index: an empty answer counts as "no" rather
            # than raising IndexError.
            finished = answer[:1].lower() == 'y'

        #Save all changes to database
        if finished:
            self._pipe_element.state = state.PipeElement.FINISHED
            self._pipe_element.progress = 100.0
            self._pipe.state = state.Pipe.IN_PROGRESS
            self._dbm.add(self._pipe)
            self._dbm.add(self._pipe_element)
        else:
            self.outp.clean_up()
            self._pipe_man.pipe.state = state.Pipe.IN_PROGRESS
        self._dbm.commit()

    def report_err(self, msg):
        '''Report an error for this user script to portal

        The message is written to the script logger and stored on the
        pipeline element via :func:`report_script_err`.

        Args:
            msg: The error message that should be reported.

        Note:
            You can call this method multiple times if you like. All messages
            will be concatenated and sent to the portal.
        '''
        self.logger.error(msg)
        report_script_err(self._pipe_element, self._pipe, self._dbm, msg)