"""Module implementing Tool and Invocation."""
import hashlib
from .cache import Cached, Hashable
from .graph import Node
class Invocation(Hashable, Cached, Node):
    """
    Called tool connecting input data to output data.

    This class represents a :class:`~.tool.Tool` with fully or
    partially specified inputs, ready for computation and caching. It is
    responsible for providing cache identifiers for all its outputs.
    """

    def __init__(self, tool, args):
        """
        Construct invocation object.

        Args:
            tool: the :class:`~.tool.Tool` object being invoked. Its
                ``INPUTS`` and ``OUTPUTS`` sequences describe the slots.
            args: sequence of arguments, one per entry of ``tool.INPUTS``.
                These are expected to be :class:`~.data.Data` objects.
                NOTE(review): earlier documentation said ``None`` entries
                are replaced by new placeholders; this constructor stores
                the arguments exactly as given, so that substitution
                presumably happens in the caller -- confirm.

        Raises:
            ValueError: if ``args`` does not contain exactly one entry per
                declared tool input.
        """
        self.tool = tool
        if len(self.tool.INPUTS) != len(args):
            raise ValueError(
                f"Wrong number of inputs to {self.__class__.__name__}:"
                f" Expected {len(self.tool.INPUTS)},"
                f" received {len(args)}."
            )
        self._args = args

        # Map input name -> argument. An input literally named "self" is
        # left out of the mapping (it is still kept in ``self._args``).
        self.i = {}
        for inp, v in zip(self.tool.INPUTS, args):
            k = inp.name
            if k == "self":
                continue
            self.i[k] = v

        self._outputs = []
        # Memo of invocations created by :meth:`invoke`, keyed by the ids
        # of the arguments, plus the arguments themselves (see invoke()).
        self._invocations = {}
        self._invocation_args = {}

        # Each declared output gets a fresh object of the same class that
        # points back at this invocation and records its slot index.
        for ii, outp in enumerate(self.tool.OUTPUTS):
            o = outp.__class__(name=outp.name)
            o.provider = self
            o.provider_slot = ii
            self._outputs.append(o)

        # Expose every named input as an ``in_<name>`` attribute.
        for k, v in self.i.items():
            setattr(self, "in_" + k, v)

    def missing_args(self):
        """Count number of missing arguments, summed over all inputs."""
        return sum(i.missing_args() for i in self.i.values())

    def get_hash(self):
        """
        Compute a hash of this invocation.

        The hash is the MD5 hex digest of the tool hash combined with the
        hashes of all named inputs, taken in sorted order of input name.
        """
        tool_hash = self.tool.get_hash()
        input_hashes = [self.i[k].get_hash() for k in sorted(self.i)]
        # The exact string layout is kept as-is: changing it would change
        # every cache identifier derived from this hash.
        return hashlib.md5(("invokedtool." +
                            tool_hash +
                            ".".join(input_hashes)).encode("ascii")).hexdigest()

    def cache_identifier(self, o):
        """Return identifier for cache which combines name of output with hash."""
        return self.tool.__class__.__name__ + "." + self.get_hash() + "." + o.name

    def cache_outputs(self):
        """Write outputs to cache by calling ``cache_write()`` on each one."""
        for o in self._outputs:
            o.cache_write()

    def get_outputs(self):
        """
        Implement a getter for evaluating outputs on demand.

        Returns:
            The single output object if there is exactly one output,
            otherwise the list of all output objects.
        """
        if len(self._outputs) == 1:
            return self._outputs[0]
        return self._outputs

    # Read-only shorthand: ``invocation.o`` is ``invocation.get_outputs()``.
    o = property(get_outputs, None)

    def set_output(self, name, value, cache=True):
        """
        Set output, caching if requested.

        Args:
            name: name of the output to set. Outputs with a different name
                are left untouched; an unknown name sets nothing.
            value: data passed to the matching output's ``set_data()``.
            cache: when true, all outputs are written to the cache
                afterwards via :meth:`cache_outputs`.
        """
        for o in self._outputs:
            if o.name == name:
                o.set_data(value)
        if cache:
            self.cache_outputs()

    def populate(self):
        """
        Compute the outputs by calling a tool's '.run()' method.

        This method organizes all of the input :class:`.data.Data` and
        :class:`.config.Option` parameters and passes them to the static
        :meth:`.tool.Tool.run` method. The tool is only run when at least
        one output reports a failed ``cache_request()``.

        Raises:
            ValueError: if ``run()`` returns a different number of values
                than the invocation has outputs.
        """
        needs_run = False
        for o in self._outputs:
            # Deliberately no short-circuit: cache_request() is called on
            # every output, exactly as many times as there are outputs.
            if not o.cache_request():
                needs_run = True
        if not needs_run:
            return

        # Positional data arguments, in the order the inputs were declared.
        dataargs = tuple(self.i[k].data for k in self.i)
        # Options must be passed as kwonly args
        optionargs = self.tool.config.to_dict()
        rets = self.tool.__class__.run(*dataargs, **optionargs)
        if not isinstance(rets, tuple):
            rets = (rets,)  # single output tools don't need to return a 1-tuple
        if len(rets) != len(self._outputs):
            raise ValueError("Tool's run() method returns incorrect number of outputs")
        for o, r in zip(self._outputs, rets):
            o.data = r

    def invoke(self, *args):
        """
        Handle partial evaluation by invoking with more arguments.

        The result of this method is another :class:`~.invocation.Invocation`.
        If the same arguments (meaning the same objects, identified by python
        id) are provided, the same invocation object is returned.
        """
        argids = tuple(id(a) for a in args)
        # Return the memoized invocation if these exact objects were seen.
        if argids in self._invocations:
            return self._invocations[argids]

        # Invoke inputs with arguments as needed.
        # NOTE(review): zipping with ``args`` limits the number of invoked
        # inputs to ``len(args)``, and the constructor below raises
        # ValueError unless that equals ``len(self.tool.INPUTS)``. An input
        # named "self" is also absent from ``self.i``. Kept as-is; confirm
        # whether every input should be invoked regardless of ``len(args)``.
        invokedinputs = [
            self.i[inp.name](*args)
            for inp, _ in zip(self.tool.INPUTS, args)
        ]

        # Create a new invocation and memoize it.
        inv = Invocation(self.tool, invokedinputs)
        self._invocations[argids] = inv
        # An id() is only unique while its object is alive. Holding on to
        # the arguments stops a later, unrelated object from reusing one of
        # these ids and wrongly hitting this cache entry.
        self._invocation_args[argids] = args
        return inv

    def parents(self):
        """Invocations depend on tool and all arguments."""
        return [self.tool, *self._args]