Source code for datawork.api.invocation

"""Module implementing Tool and Invocation."""

import hashlib

from .cache import Cached, Hashable
from .graph import Node


class Invocation(Hashable, Cached, Node):
    """
    Called tool connecting input data to output data.

    This class represents a :class:`~.tool.Tool` with fully or partially
    specified inputs, ready for computation and caching. It is responsible
    for providing cache identifiers for all its outputs.
    """
    def __init__(self, tool, args):
        """
        Construct invocation object.

        Args:
            tool: the :class:`~.tool.Tool` object being invoked
            args: tuple of arguments, which are either :class:`~.data.Data`
                objects or `None`, in which case a new placeholder type will
                be instantiated.
        """
        self.tool = tool
        if len(self.tool.INPUTS) != len(args):
            raise ValueError(
                f"Wrong number of inputs to {self.__class__.__name__}:"
                f" expected {len(self.tool.INPUTS)},"
                f" received {len(args)}."
            )
        self._args = args

        # Map each named tool input to the supplied argument.
        self.i = {}
        for inp, v in zip(self.tool.INPUTS, args):
            k = inp.name
            if k == "self":
                continue
            self.i[k] = v

        # Instantiate one output object per tool OUTPUT, recording this
        # invocation as its provider.
        self._outputs = []
        self._invocations = {}
        for ii, outp in enumerate(self.tool.OUTPUTS):
            n = outp.name
            t = outp.__class__
            o = t(name=n)
            o.provider = self
            o.provider_slot = ii
            self._outputs.append(o)

        # Expose each input as an ``in_<name>`` attribute for convenience.
        for k, v in self.i.items():
            setattr(self, "in_" + k, v)
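    # A minimal usage sketch (hypothetical: ``AddTool`` and the Data arguments
    # ``x`` and ``y`` are illustrative, not part of this module):
    #
    #     inv = Invocation(AddTool(), (x, y))
    #     inv.i          # inputs keyed by the tool's input names, e.g. {"a": x, "b": y}
    #     inv.in_a is x  # each input is also exposed as an ``in_<name>`` attribute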
    def missing_args(self):
        """Count number of missing arguments."""
        return sum(i.missing_args() for i in self.i.values())
    def get_hash(self):
        """Compute a hash of this invocation."""
        tool_hash = self.tool.get_hash()
        input_hashes = [self.i[k].get_hash() for k in sorted(self.i.keys())]
        return hashlib.md5(
            ("invokedtool." + tool_hash + ".".join(input_hashes)).encode("ascii")
        ).hexdigest()
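    # Illustration (hypothetical tool and inputs): the hash depends only on the
    # tool's hash and the hashes of the named inputs, so two invocations of the
    # same tool on the same inputs collide on purpose and share cache entries:
    #
    #     inv_a = Invocation(tool, (x, y))
    #     inv_b = Invocation(tool, (x, y))
    #     inv_a.get_hash() == inv_b.get_hash()  # True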
    def cache_identifier(self, o):
        """
        Return a cache identifier for output ``o``.

        The identifier combines the tool class name, the invocation hash,
        and the output name.
        """
        return self.tool.__class__.__name__ + "." + self.get_hash() + "." + o.name
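    # The identifier has the form "<ToolClassName>.<invocation hash>.<output name>",
    # e.g. (hypothetical tool, hash, and output names):
    #
    #     "AddTool.d41d8cd98f00b204e9800998ecf8427e.sum"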
    def cache_outputs(self):
        """Write outputs to cache."""
        for o in self._outputs:
            o.cache_write()
    def get_outputs(self):
        """Return the outputs: the single output object, or a list of them."""
        if len(self._outputs) == 1:
            return self._outputs[0]
        else:
            return self._outputs
    o = property(get_outputs, None)
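    # Usage sketch (hypothetical single- and multi-output invocations):
    #
    #     single_inv.o         # the lone output object
    #     multi_inv.o          # a list of output objects, one per tool OUTPUT
    #     multi_inv.o[1].name  # outputs keep the names declared by the tool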
    def set_output(self, name, value, cache=True):
        """Set output, caching if requested."""
        for o in self._outputs:
            if o.name == name:
                o.set_data(value)
        if cache:
            self.cache_outputs()
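    # Sketch (hypothetical output name "sum"): inject a precomputed value and,
    # by default, write every output to the cache in the same call:
    #
    #     inv.set_output("sum", result)               # caches all outputs
    #     inv.set_output("sum", result, cache=False)  # skip the cache write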
    def populate(self):
        """
        Compute the outputs by calling a tool's '.run()' method.

        This method organizes all of the input :class:`.data.Data` and
        :class:`.config.Option` parameters and passes them to the static
        :meth:`.tool.Tool.run` method.
        """
        # Only run the tool if at least one output is not already cached.
        needs_run = False
        for o in self._outputs:
            if not o.cache_request():
                needs_run = True
        if needs_run:
            dataargs = tuple([self.i[k].data for k in self.i])
            # Options must be passed as keyword-only arguments
            optionargs = self.tool.config.to_dict()
            rets = self.tool.__class__.run(*dataargs, **optionargs)
            if not isinstance(rets, tuple):
                # single-output tools don't need to return a 1-tuple
                rets = (rets,)
            if len(rets) != len(self._outputs):
                raise ValueError(
                    "Tool's run() method returned an incorrect number of outputs"
                )
            for o, r in zip(self._outputs, rets):
                o.data = r
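    # Effectively (hypothetical two-input tool), the call above is equivalent to
    #
    #     ToolClass.run(x.data, y.data, **self.tool.config.to_dict())
    #
    # with the data arguments passed positionally in input order and the
    # configured options passed as keyword arguments.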
    def invoke(self, *args):
        """
        Handle partial evaluation by invoking with more arguments.

        The result of this method is another :class:`~.invocation.Invocation`.
        If the same arguments (meaning the same objects, identified by Python
        id) are provided, the same invocation object is returned.
        """
        argids = tuple(id(a) for a in args)
        # Return the memoized invocation if these exact argument objects have
        # been seen before.
        if argids in self._invocations:
            return self._invocations[argids]
        # Invoke inputs with the new arguments as needed.
        invokedinputs = []
        for inp, a in zip(self.tool.INPUTS, args):
            iname = inp.name
            d = self.i[iname]
            d = d(*args)
            invokedinputs.append(d)
        # Create and memoize a new invocation.
        inv = Invocation(self.tool, invokedinputs)
        self._invocations[argids] = inv
        return inv
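    # Partial-evaluation sketch (hypothetical placeholder argument ``z``):
    # repeating a call with the exact same argument objects returns the
    # memoized invocation rather than building a new one:
    #
    #     inv2 = inv.invoke(z)
    #     inv.invoke(z) is inv2  # True: keyed by the ids of the arguments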
    def parents(self):
        """Invocations depend on the tool and all arguments."""
        return [self.tool] + list(self._args)