pymake

A build system based on Build Systems à la Carte
git clone https://git.grace.moe/pymake

__init__.py (20151B)


      1 """
      2 make.py
      3 -------
      4 
      5 Design inspired by the paper "Build Systems à la Carte"
      6 
      7 - https://github.com/snowleopard/build
      8 - https://www.microsoft.com/en-us/research/wp-content/uploads/2018/03/build-systems.pdf
      9 
     10 Key concepts:
     11 
     12 - The goal is to maintain an up-to-date *store* mapping *tasks* to *values*.
     13 - Tasks are described using rules, functions from parameters to tasks.
     14 - Each rule can choose its own caching policy, the default is a persistent cache keyed by hashes.
     15 - The current scheduler is a top-down (suspending) scheduler.
     16 
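A minimal usage sketch (the file name and rule bodies are illustrative, and the
package is assumed to be importable as pymake):

    from pymake import rule, build

    @rule
    async def read_text(fetch, path):
        with open(path) as f:
            return f.read()

    @rule
    async def line_count(fetch, path):
        text = await fetch(read_text(path))  # fetch another task's value
        return text.count("\n")

    @rule
    async def all(fetch):
        print(await fetch(line_count("notes.txt")))

    build(all())  # runs the task graph, persisting results in .makedb
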
make.py improves upon the paper's design in a few ways:

- Task keys (for book-keeping purposes) are automatically derived from the rule functions.
- Tasks are executed concurrently.
- We split the paper's two concepts, Rebuilder and Scheduler, into three
  (see the example after this list):

  - (Per-task) Caching policies.
  - (Global) Updating strategy.
  - (Global) Metadata updaters.

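For example, the caching policy is chosen per rule at definition time:

    @rule            # hash-cached (the default)
    async def expensive(fetch): ...

    @rule_no_cache   # never replayed from earlier runs
    async def timestamp(fetch): ...
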
# Why we re-interpret the concepts Rebuilder and Scheduler

The paper merges the concept of "metadata updaters" into the Rebuilder and Scheduler.
This sort of makes sense, as different rebuilders and schedulers require different metadata.

However, it means that a rebuilder may need to override the `fetch` function it passes
along in order to ensure the metadata it requires is created, and it encourages
building metadata in a local, ad-hoc way. Furthermore, a rebuilder may require the
same metadata as a scheduler's fetch function: for instance, tracking dependency
relationships is required both by the topological-sort scheduler and by trace-based
rebuilders (e.g. the constructive-traces rebuilder).

So, we instead factor the metadata-updating portion out of both rebuilders and schedulers
into a global metadata updater, which can be viewed as yet another wrapper around rules.
However, as it must apply globally to support the chosen scheduling strategy,
metadata updaters are defined at a global level, unlike the per-task caching policies.

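As an illustration, a dependency-tracking metadata updater can be written as yet
another rule wrapper. The following is a hypothetical sketch, not part of this
module:

    def track_deps(deps: dict, rule_fn):
        async def wrapped(fetch, task_key, store, *args):
            async def tracking_fetch(task_or_rule):
                task = ensure_task(task_or_rule)
                deps.setdefault(task_key, set()).add(task.task_key)
                return await fetch(task)
            return await rule_fn(tracking_fetch, task_key, store, *args)
        return wrapped
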
# TODO

- Make files on the filesystem a core concept as opposed to merely something you can do.
"""

import asyncio
import collections
import functools
import hashlib
import inspect
import pickle
import subprocess
import sys
import traceback

from typing import (
    Any,
    Awaitable,
    Callable,
    Concatenate,
    Optional,
    ParamSpec,
    Protocol,
)


class Fetch(Protocol):
    """Protocol defining the fetch operation used by tasks."""

    async def __call__(self, task_or_rule: "Task | Rule") -> Any: ...


RuleKey = bytes  # derived from a rule function's name and source hash
TaskKey = tuple  # (rule_key, *argument_keys), nested for task-valued arguments
ValueHash = bytes

P = ParamSpec("P")
# The "raw" rule signature: (fetch, task_key, store, *args) -> value.
RuleFn = Callable[Concatenate[Fetch, TaskKey, "Store", P], Awaitable[Any]]
# The user-facing signature accepted by @rule: (fetch, *args) -> value.
NiceRuleFn = Callable[Concatenate[Fetch, P], Awaitable[Any]]


def make_hash(o: Any) -> bytes:
    """Hash a value by its repr (or raw bytes), with a domain-separating prefix."""
    if isinstance(o, bytes):
        h = hashlib.sha256(b"s")
        h.update(o)
    else:
        h = hashlib.sha256(b"r")
        h.update(repr(o).encode("utf-8"))
    return h.digest()

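# Note on the prefixes above: without domain separation, the bytes b"'abc'" and
# the string "abc" (whose repr is "'abc'") would hash identically; the b"s"/b"r"
# prefixes keep raw bytes and repr-based hashes in disjoint domains.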

def rule_fn_to_key(fn: Callable) -> RuleKey:
    """Derive a stable key from a rule's name and source, so edits invalidate caches."""
    name = fn.__name__
    source = inspect.getsource(fn)
    h = hashlib.sha256(source.encode("utf-8")).hexdigest()[:16]
    key = f"{name}-{len(source)}-{h}".encode("utf-8")
    return key


class Task:
    """A computation of a value."""

    __slots__ = "task_key", "rule_fn", "args", "hash"

    task_key: TaskKey
    rule_fn: RuleFn
    args: tuple
    hash: int

    def __init__(self, task_key: TaskKey, rule_fn: RuleFn, *args):
        self.task_key = task_key
        self.rule_fn = rule_fn
        self.args = args
        self.hash = hash(self.task_key)

    def __call__(self, fetch: Fetch, store: "Store"):
        return self.rule_fn(fetch, self.task_key, store, *self.args)

    def __repr__(self) -> str:
        return repr(self.task_key)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Task):
            return NotImplemented
        return self.task_key == other.task_key

    def __hash__(self) -> int:
        return self.hash


class Rule:
    """A function that returns tasks."""

    __slots__ = "rule_key", "rule_fn", "hash"

    rule_key: RuleKey
    rule_fn: RuleFn
    hash: int

    @staticmethod
    def new(rule_fn: RuleFn):
        return Rule(
            rule_fn_to_key(rule_fn),
            rule_fn,
        )

    def __init__(self, rule_key: RuleKey, rule_fn: RuleFn):
        self.rule_key = rule_key
        self.rule_fn = rule_fn
        self.hash = hash(self.rule_key)

    def __call__(self, *args):
        # Task- and Rule-valued arguments contribute their own task keys,
        # producing nested tuple keys; other arguments are used directly.
        return Task(
            (
                self.rule_key,
                *(
                    ensure_task(arg).task_key
                    if isinstance(arg, (Task, Rule))
                    else arg
                    for arg in args
                ),
            ),
            self.rule_fn,
            *args,
        )

    def __eq__(self, other):
        if not isinstance(other, Rule):
            return NotImplemented
        return self.rule_key == other.rule_key

    def __hash__(self):
        return self.hash


def ensure_task(task_or_rule: Task | Rule) -> Task:
    if isinstance(task_or_rule, Rule):
        return task_or_rule()
    return task_or_rule


def singleton(cls):
    cls.main = cls()
    return cls


@singleton
class Rules:
    """The registry of all rules created."""

    # Main registry
    main: "Rules"

    __slots__ = "rules"

    rules: dict[RuleKey, Rule]

    def __init__(self):
        self.rules = dict()

    def eval_task_key(self, task_key: TaskKey) -> Optional[Task]:
        """Reconstruct a Task from its key, or None if any rule is unknown."""
        rule_key, *arg_keys = task_key
        if rule_key not in self.rules:
            return None
        rule = self.rules[rule_key]
        args = []
        for arg in arg_keys:
            if isinstance(arg, tuple):
                # Tuple arguments are the task keys of task-valued arguments.
                sub_task = self.eval_task_key(arg)
                if sub_task is None:
                    return None
                args.append(sub_task)
            else:
                args.append(arg)
        return rule(*args)

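    # For illustration (hypothetical rules), task keys nest like this:
    #   compile("main.c").task_key       == (b"compile-...", "main.c")
    #   link(compile("main.c")).task_key == (b"link-...", (b"compile-...", "main.c"))
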
    @staticmethod
    def nice_rule_fn_to_rule_fn(nice_rule_fn, fetch, task_key, store, *args):
        # Adapt the user-facing (fetch, *args) signature to the raw RuleFn one.
        return nice_rule_fn(fetch, *args)

    def rule(self, rule_fn: NiceRuleFn) -> Rule:
        """Register a rule with the default hash-based caching policy."""
        return self.register(
            self.hash_cache(
                Rule.new(
                    functools.update_wrapper(
                        functools.partial(Rules.nice_rule_fn_to_rule_fn, rule_fn),
                        rule_fn,
                    )
                )
            )
        )

    def rule_no_cache(self, rule_fn: NiceRuleFn) -> Rule:
        """Register a rule with no caching policy; it is re-run each build."""
        return self.register(
            Rule.new(
                functools.update_wrapper(
                    functools.partial(Rules.nice_rule_fn_to_rule_fn, rule_fn),
                    rule_fn,
                )
            )
        )

    def register(self, rule: Rule) -> Rule:
        self.rules[rule.rule_key] = rule
        return rule

    def hash_cache(self, rule: Rule) -> Rule:
        """Adds hash-based caching to a rule.

        Attempts to replay the rule by checking whether the hash of each input,
        as fetched now, matches the hash recorded on a previous run.

        Currently, there is no cache eviction policy (all previous runs are stored forever).

        TODO: Implement some cache eviction.
        """
        rule.rule_fn = functools.update_wrapper(
            functools.partial(Rules.hash_cache_fn, self, rule.rule_fn),
            rule.rule_fn,
        )
        return rule

    @staticmethod
    async def track_fetch(fetch: Fetch, new_inputs: list, task_or_rule: Task | Rule):
        # A fetch wrapper that records (task_key, value_hash) for every dependency.
        task = ensure_task(task_or_rule)
        result = await fetch(task)
        new_inputs.append((task.task_key, make_hash(result)))
        return result

    async def hash_cache_fn(
        self,
        inner_rule_fn: RuleFn,
        fetch: Fetch,
        task_key: TaskKey,
        store: "Store",
        *args,
    ):
        """Actual implementation of hash_cache"""
        if task_key in store.key_info:
            past_runs = store.key_info[task_key]
            output_value = store.key_value[task_key]
            possible_values = []
            for past_inputs, past_value in past_runs:
                for past_input_key, past_input_hash in past_inputs:
                    input_task = self.eval_task_key(past_input_key)
                    if input_task is None:
                        break
                    current_input_value = await fetch(input_task)
                    if make_hash(current_input_value) != past_input_hash:
                        break
                else:
                    # Every recorded input matched its current hash: this run
                    # is replayable. Prefer the trace matching the stored value.
                    if output_value == past_value:
                        return past_value
                    possible_values.append(past_value)

            if possible_values:
                return possible_values[0]

        # No replayable trace: run the rule for real, recording the hash of
        # every input it fetches so this run can be replayed later.
        new_inputs = []

        new_value = await inner_rule_fn(
            functools.partial(Rules.track_fetch, fetch, new_inputs),
            task_key,
            store,
            *args,
        )
        store.key_info[task_key].append((new_inputs, new_value))
        return new_value

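# For illustration (hypothetical keys and hashes): after two runs of a task,
# its entry in store.key_info holds one (inputs, value) trace per run:
#
#   store.key_info[(b"line_count-...", "notes.txt")] == [
#       ([((b"read_text-...", "notes.txt"), b"<hash-v1>")], 3),
#       ([((b"read_text-...", "notes.txt"), b"<hash-v2>")], 5),
#   ]
#
# hash_cache_fn replays whichever trace's recorded input hashes all match the
# inputs as fetched now, and runs the rule only when no trace matches.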

# Convenience aliases bound to the main registry.
rule = Rules.main.rule
rule_no_cache = Rules.main.rule_no_cache
register = Rules.main.register
hash_cache = Rules.main.hash_cache


class Store:
    """Stores a mapping from tasks to their values."""

    __slots__ = "filename", "rules", "key_value", "key_info"

    @staticmethod
    def _fNone():
        return None

    def __init__(self, filename, rules):
        self.filename = filename
        self.rules = rules

        self.key_value = collections.defaultdict(Store._fNone)
        self.key_info = collections.defaultdict(list)

        try:
            with open(filename, "rb") as f:
                self.key_value, self.key_info = pickle.load(f)
        except Exception:
            # Missing or corrupt store file: start from an empty store.
            pass

    def save(self):
        with open(self.filename, "wb") as f:
            pickle.dump((self.key_value, self.key_info), f)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.save()


class Detach:
    """Runs awaitables in the background; wait() drains all of them."""

    __slots__ = "_background_tasks"

    def __init__(self):
        self._background_tasks = set()

    def __call__(self, awaitable):
        if asyncio.coroutines.iscoroutine(awaitable):
            task = asyncio.create_task(awaitable)
            # Keep a strong reference until done, so the task is not GC'd.
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)
            return task
        return awaitable

    async def wait(self):
        while self._background_tasks:
            t = self._background_tasks.pop()
            if not t.done():
                await t


detach = Detach()

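# Example (illustrative): fire-and-forget a side effect from inside a rule,
# without blocking the build on it. Build.__call__ awaits detach.wait(), so
# detached work still finishes before the build returns:
#
#   detach(shell("notify-send 'build finished'"))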

class SuspendingScheduler:
    """Top-down scheduler: fetching a task suspends the caller until it is built."""

    __slots__ = "store", "done", "waits"
    store: Store
    done: set[TaskKey]
    waits: dict[TaskKey, asyncio.Event]

    def __init__(self, store: Store):
        self.store = store
        self.done = set()
        self.waits = dict()

    def build(self, *tasks: Task | Rule):
        return asyncio.gather(*(self.fetch_once(task) for task in tasks))

    async def fetch_once(self, task_or_rule: Task | Rule):
        task = ensure_task(task_or_rule)
        task_key = task.task_key
        if task_key in self.done:
            return self.store.key_value[task_key]
        if task_key in self.waits:
            # Another fetch is already building this task: suspend until it finishes.
            await self.waits[task_key].wait()
            return self.store.key_value[task_key]

        event = self.waits[task_key] = asyncio.Event()
        try:
            self.store.key_value[task_key] = result = await task(
                self.fetch_once, self.store
            )
        except Exception:
            print(traceback.format_exc())
            event.set()
            self.store.key_value[task_key] = None
            return None

        self.done.add(task_key)
        event.set()

        return result

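# For illustration (hypothetical rules): given a diamond dependency
#
#   top -> left -> base
#   top -> right -> base
#
# left and right are built concurrently, and whichever fetches base second
# suspends on the Event instead of running base twice.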

class Build:
    __slots__ = "_store", "_scheduler"

    def __init__(self, filename, rules=Rules.main):
        self._store = Store(filename, rules)
        self._scheduler = SuspendingScheduler(self._store)

    async def __call__(self, *tasks: Task):
        result = await self.build(*tasks)
        await detach.wait()
        return result

    def build(self, *tasks: Task):
        return self._scheduler.build(*tasks)

    def __enter__(self):
        self._store.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._store.__exit__(exc_type, exc_val, exc_tb)


def build(*tasks, filename=".makedb", rules=Rules.main):
    with Build(filename, rules) as build:
        return asyncio.run(build(*tasks))


class ShellResult(collections.namedtuple("ShellResult", "stdout stderr returncode")):
    __slots__ = ()

    @property
    def utf8stdout(self):
        return self.stdout.decode("utf-8")

    @property
    def utf8stderr(self):
        return self.stderr.decode("utf-8")


# Bit flags selecting which subprocess streams exec()/shell() echo to ours.
EchoNothing = 0
EchoStdout = 1
EchoStderr = 2
EchoAll = EchoStdout | EchoStderr


async def _exec_reader(istream, ostream, echo: int = EchoNothing):
    # Accumulate the stream, optionally echoing each chunk as it arrives.
    contents = b""
    async for chunk in istream:
        contents += chunk
        if echo:
            ostream.write(chunk)
            ostream.flush()
    return contents


async def exec(
    program,
    *args,
    input: bytes | bytearray | memoryview | None = None,
    echo: int = EchoNothing,
) -> ShellResult:

    proc = await asyncio.create_subprocess_exec(
        program,
        *args,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    async def write_stdin():
        # Close stdin once written (or immediately) so the child sees EOF.
        if input is not None:
            proc.stdin.write(input)  # type: ignore
            await proc.stdin.drain()  # type: ignore
        proc.stdin.close()  # type: ignore

    _, stdout, stderr, returncode = await asyncio.gather(
        write_stdin(),
        _exec_reader(proc.stdout, sys.stdout.buffer, echo=echo & EchoStdout),
        _exec_reader(proc.stderr, sys.stderr.buffer, echo=echo & EchoStderr),
        proc.wait(),
    )

    return ShellResult(stdout, stderr, returncode)


async def shell(
    cmd,
    input: bytes | bytearray | memoryview | None = None,
    echo: int = EchoNothing,
) -> ShellResult:
    proc = await asyncio.create_subprocess_shell(
        cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    stdout, stderr = await proc.communicate(input)
    if echo & EchoStdout:
        sys.stdout.buffer.write(stdout)
        sys.stdout.buffer.flush()
    if echo & EchoStderr:
        sys.stderr.buffer.write(stderr)
        sys.stderr.buffer.flush()
    return ShellResult(stdout, stderr, proc.returncode)

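# Example (illustrative) of shell() inside a rule; the "sources" rule is
# hypothetical:
#
#   @rule
#   async def objects(fetch):
#       srcs = await fetch(sources())
#       return await asyncio.gather(
#           *(shell(f"cc -c {s}", echo=EchoStderr) for s in srcs)
#       )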

def run_in_executor(f, *args, executor=None):
    """Run blocking f(*args) on an executor, awaitable from async code."""
    return asyncio.get_running_loop().run_in_executor(executor, f, *args)


async def async_main(globals, filename=".makedb", default_target="all"):
    # Each CLI argument is evaluated as a Python expression (usually a task).
    targets = sys.argv[1:]
    if not targets:
        targets.append(default_target)

    with Build(filename) as build:
        await build(*(eval(target, globals) for target in targets))
        return 0


def main(globals, filename=".makedb", default_target="all"):
    targets = sys.argv[1:]
    if not targets:
        targets.append(default_target)

    with Build(filename) as build:
        asyncio.run(build(*(eval(target, globals) for target in targets)))
        return 0


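# Example make.py build script (illustrative; assumes this package is
# importable as pymake):
#
#   from pymake import rule, shell, main, EchoAll
#
#   @rule
#   async def all(fetch):
#       return await shell("cc -o app main.c", echo=EchoAll)
#
#   if __name__ == "__main__":
#       main(globals())
#
# Then `python make.py` builds the default "all" target, and
# `python make.py 'all()'` evaluates an explicit target expression.
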
# class AsyncWrapperSpec:
#     __slots__ = "async_methods", "async_subobjects"

#     def __init__(
#         self,
#         async_methods=set(),
#         async_subobjects=dict(),
#     ):
#         self.async_methods = set(async_methods)
#         self.async_subobjects = dict(async_subobjects)


# class AsyncWrapper:
#     __slots__ = "_obj", "_spec", "_executor"

#     def __init__(self, obj, spec=AsyncWrapperSpec(), executor=None):
#         self._obj = obj
#         self._spec = spec
#         self._executor = executor

#     @staticmethod
#     def wrapper(f, executor, *args):
#         return run_in_executor(f, *args, executor=executor)

#     def __getattr__(self, attr):
#         if attr in self._spec.async_methods:
#             return functools.partial(
#                 self.wrapper, getattr(self._obj, attr), self._executor
#             )
#         if attr in self._spec.async_subobjects:
#             return AsyncWrapper(
#                 getattr(self._obj, attr),
#                 spec=self._spec.async_subobjects[attr],
#                 executor=self._executor,
#             )
#         return getattr(self._obj, attr)

#     async def __aenter__(self):
#         return AsyncWrapper(
#             await run_in_executor(self._obj.__enter__, executor=self._executor),
#             spec=self._spec,
#         )

#     async def __aexit__(self, exc_type, exc_val, exc_tb):
#         return await run_in_executor(
#             self._obj.__exit__, exc_type, exc_val, exc_tb, executor=self._executor
#         )

#     def __aiter__(self):
#         return AsyncWrapper(self._obj.__iter__(), spec=self._spec)

#     @staticmethod
#     def wrapped_next(obj):
#         try:
#             return True, next(obj)
#         except StopIteration:
#             return False, None

#     async def __anext__(self):
#         ok, res = await run_in_executor(
#             functools.partial(self.wrapped_next, self._obj), executor=self._executor
#         )
#         if not ok:
#             raise StopAsyncIteration
#         return res

#     @staticmethod
#     def wrapped_foreach(f, obj):
#         for chunk in obj:
#             f(chunk)

#     async def foreach(self, f):
#         await run_in_executor(
#             functools.partial(self.wrapped_foreach, f, self._obj),
#             executor=self._executor,
#         )


# class AsyncIO(Protocol):
#     async def __aenter__(self) -> "AsyncIO": ...
#     async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...

#     async def close(self) -> None: ...
#     def fileno(self) -> int: ...
#     async def flush(self) -> None: ...
#     def isatty(self) -> bool: ...
#     def readable(self) -> bool: ...
#     async def readlines(self, hint: int = -1, /) -> list[bytes]: ...
#     async def seek(self, offset: int, whence: int = 0, /) -> int: ...
#     def seekable(self) -> bool: ...
#     async def tell(self) -> int: ...
#     async def truncate(self, size: int | None = None, /) -> int: ...
#     def writable(self) -> bool: ...
#     async def writelines(self, lines, /) -> None: ...
#     async def readline(self, size: int | None = -1, /) -> bytes: ...
#     @property
#     def closed(self) -> bool: ...
#     async def readall(self) -> bytes: ...
#     async def readinto(self, buffer, /) -> Any: ...
#     async def write(self, b, /) -> Any: ...
#     async def read(self, size: int = -1, /) -> Any: ...
#     def detach(self) -> "AsyncIO": ...
#     async def readinto1(self, buffer, /) -> int: ...
#     async def read1(self, size: int = -1, /) -> bytes: ...

#     mode: str
#     name: Any

#     @property
#     def closefd(self) -> bool: ...

#     raw: "AsyncIO"

#     async def peek(self, size: int = 0, /) -> bytes: ...

#     encoding: str
#     errors: str | None
#     newlines: str | tuple[str, ...] | None

#     def __aiter__(self) -> AsyncIterator[Any]: ...
#     async def __anext__(self) -> Any: ...

#     async def foreach(self, f) -> Any: ...


# async def open_async(*args, executor=None) -> AsyncIO:
#     # List of methods: https://docs.python.org/3/library/io.html
#     async_methods = (
#         "close",
#         "detach",
#         "flush",
#         "peek",
#         "read",
#         "read1",
#         "readall",
#         "readinto",
#         "readinto1",
#         "readline",
#         "readlines",
#         "seek",
#         "tell",
#         "truncate",
#         "write",
#         "writelines",
#     )
#     return AsyncWrapper(
#         await run_in_executor(open, *args, executor=executor),
#         AsyncWrapperSpec(async_methods, {"buffer": AsyncWrapperSpec(async_methods)}),
#     )  # type: ignore