Source code hyperqueue/task/task.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from pathlib import Path
from typing import Dict, Optional, Sequence, Union

from ..common import GenericPath
from ..ffi import TaskId
from ..ffi.protocol import ResourceRequest
from ..output import Stdio, StdioDef
from ..validation import ValidationException

EnvType = Dict[str, str]


def _make_ffi_requests(
    resources: Optional[Union[ResourceRequest, Sequence[ResourceRequest]]],
):
    """
    Normalize the `resources` argument into a sequence of resource requests.

    - `None` is turned into an empty tuple.
    - A single `ResourceRequest` is wrapped into a one-element tuple.
    - Any other value is returned unchanged (it is not copied or converted).
    """
    if isinstance(resources, ResourceRequest):
        return (resources,)
    # Everything that is neither `None` nor a single request is passed
    # through as-is.
    return () if resources is None else resources


class Task:
    """
    Description of a single task and of the settings it should run with.

    This class only stores the task configuration. `_build` is not implemented
    here and raises `NotImplementedError`.
    """

    def __init__(
        self,
        task_id: TaskId,
        dependencies: Sequence["Task"] = (),
        priority: int = 0,
        resources: Optional[ResourceRequest] = None,
        env: Optional[EnvType] = None,
        cwd: Optional[GenericPath] = None,
        stdout: Optional[Stdio] = None,
        stderr: Optional[Stdio] = None,
        name: Optional[str] = None,
    ):
        """
        Store the configuration of the task.

        :param task_id: ID of the task.
        :param dependencies: Tasks stored as dependencies of this task.
            Must not be `None`.
        :param priority: Priority value stored for the task.
        :param resources: Resource request stored for the task.
        :param env: Environment variables; a missing or empty value is stored
            as an empty dictionary.
        :param cwd: Working directory; stored as a string, or as `None` when
            the value is missing or falsy.
        :param stdout: Standard output definition, normalized by `build_stdio`.
        :param stderr: Standard error definition, normalized by `build_stdio`.
        :param name: Optional name of the task, also used as its label.
        """
        assert dependencies is not None

        # Identity of the task.
        self.task_id = task_id
        self.name = name

        # Scheduling-related attributes are stored exactly as they were passed.
        # NOTE(review): `_make_ffi_requests` in this module also accepts
        # a sequence of requests - confirm whether the `resources` annotation
        # above should be widened accordingly.
        self.dependencies = dependencies
        self.priority = priority
        self.resources = resources

        # Execution environment.
        self.env = env if env else {}
        self.cwd = None if not cwd else str(cwd)

        # Output streams; `build_stdio` rejects unsupported values.
        self.stdout = build_stdio(stdout, "stdout")
        self.stderr = build_stdio(stderr, "stderr")

    @property
    def label(self) -> str:
        """
        Returns a label identifying the task.
        The name of the task is used when one was assigned, otherwise
        the label is the string form of the task ID.
        """
        if self.name is None:
            return str(self.task_id)
        return self.name

    def _build(self, client):
        """
        Not implemented in this class; always raises `NotImplementedError`.
        """
        raise NotImplementedError


def build_stdio(stdio: Optional[Stdio], stream: str) -> Optional[StdioDef]:
    """
    Normalize a user-provided stdio value into a `StdioDef` (or `None`).

    :param stdio: `None`, a path given as `str` or `Path`, or a `StdioDef`.
    :param stream: Name of the stream, used only in the error message.
    :return: `None` when `stdio` is `None`, `StdioDef.from_path(stdio)` for
        `str`/`Path` values, or `stdio` itself when it already is a `StdioDef`.
    :raises ValidationException: If `stdio` has any other type.
    """
    if stdio is None:
        return None

    if isinstance(stdio, (str, Path)):
        return StdioDef.from_path(stdio)

    if isinstance(stdio, StdioDef):
        return stdio

    raise ValidationException(
        f"Invalid value provided for `{stream}`: {type(stdio)}. Expected str, Path or `StdioDef` or `None`."
    )