# Source code: hyperqueue/output.py

import dataclasses
import uuid
from typing import List, Literal, Optional, Union

from .common import GenericPath
from .validation import ValidationException

# Closed set of values accepted by `StdioDef.on_close`.
# NOTE(review): "rm-if-finished" presumably means the stream file is removed
# once the task finishes successfully — confirm against the consumer of this value.
FileOnCloseBehavior = Literal["none", "rm-if-finished"]


@dataclasses.dataclass
class StdioDef:
    """
    Definition of a stdio stream target: an optional path together with the
    behavior that should be applied to the file when it is closed.
    """

    # Path of the stream file; `None` when no explicit path was provided.
    path: Optional[GenericPath]
    # One of the `FileOnCloseBehavior` literals.
    on_close: FileOnCloseBehavior

    @staticmethod
    def from_path(path: GenericPath) -> "StdioDef":
        """Create a definition for `path` with the `"none"` on-close behavior."""
        return StdioDef(path, "none")

    @staticmethod
    def remove_if_finished(path: Optional[GenericPath] = None) -> "StdioDef":
        """
        Create a definition with the `"rm-if-finished"` on-close behavior.

        `path` is optional and defaults to `None`.
        """
        return StdioDef(path, "rm-if-finished")


# Type alias: a stdio target is either a plain path or a full `StdioDef`.
Stdio = Union[GenericPath, StdioDef]


# Keep in sync with `DEFAULT_STDOUT_PATH`
def default_output(ext: str) -> str:
    """
    Return the default output path template ending with the extension `ext`.

    The `%{CWD}` and `%{TASK_ID}` placeholders are returned verbatim; they are
    not expanded here.
    """
    template_prefix = "%{CWD}/%{TASK_ID}."
    return template_prefix + ext


def default_stdout() -> str:
    """Return the default path template for a standard output file."""
    stdout_extension = "stdout"
    return default_output(stdout_extension)


def default_stderr() -> str:
    """Return the default path template for a standard error file."""
    stderr_extension = "stderr"
    return default_output(stderr_extension)


# TODO: how to resolve TASK_ID in the context of some other task?
class Output:
    """
    Named output described either by an explicit file path or by a file extension.

    Attributes:
        name: Name of the output.
        filepath: Explicit path of the output, or `None`.
        extension: Extension without leading dots, or `None` when not given.
    """

    def __init__(self, name: str, filepath: Optional[str] = None, extension: Optional[str] = None):
        """
        Args:
            name: Name of the output.
            filepath: Optional explicit path of the output.
            extension: Optional file extension; leading dots are stripped,
                so `".txt"` and `"txt"` are equivalent.

        Raises:
            ValidationException: If both `filepath` and `extension` are provided.
        """
        if filepath and extension:
            raise ValidationException("Parameters `filepath` and `extension` are mutually exclusive")

        self.name = name
        self.filepath = filepath
        # `extension` defaults to `None`; normalize it only when a value was passed,
        # otherwise `None.lstrip` would raise `AttributeError`.
        self.extension = extension.lstrip(".") if extension is not None else None


def generate_name(output: Output) -> str:
    """
    Return the file name to use for `output`.

    When `output.filepath` is set (not `None`), it is returned unchanged.
    Otherwise a random name is generated and, if the output has a non-empty
    extension, `.<extension>` is appended to it.
    """
    if output.filepath is not None:
        return output.filepath
    # Test the extension itself: a formatted string such as ".None" or "." is
    # always truthy, so `f"..." or ""` could never fall back to the empty suffix.
    suffix = f".{output.extension}" if output.extension else ""
    return f"{generate_random_name()}{suffix}"


def generate_random_name() -> str:
    """Return a random name made of 16 lowercase hexadecimal characters."""
    # `uuid.UUID` objects are not subscriptable; slice the 32-character hex form.
    return uuid.uuid4().hex[:16]


def materialize_outputs(collection) -> List[Output]:
    """
    Return the `Output` instances found directly in `collection`.

    A list or tuple is scanned element by element, a dict is scanned by its
    values, and a single `Output` is wrapped into a one-element list. Any other
    input yields an empty list. Nested containers are not traversed.
    """
    if isinstance(collection, (list, tuple)):
        candidates = collection
    elif isinstance(collection, dict):
        candidates = collection.values()
    elif isinstance(collection, Output):
        candidates = (collection,)
    else:
        candidates = ()

    outputs = []
    for candidate in candidates:
        if isinstance(candidate, Output):
            outputs.append(candidate)
    return outputs


def gather_outputs(collection) -> List[Output]:
    """
    Gather the `Output` instances contained at the top level of `collection`.

    Lists and tuples contribute their elements, dicts contribute their values,
    and a bare `Output` contributes itself. Anything else produces an empty
    list; nested containers are not searched.
    """
    if isinstance(collection, (list, tuple)):
        members = collection
    elif isinstance(collection, dict):
        members = collection.values()
    else:
        members = [collection] if isinstance(collection, Output) else []
    return list(filter(lambda member: isinstance(member, Output), members))