__init__.py (20151B)
1 """ 2 make.py 3 ------- 4 5 Design inspired by the paper "Build Systems à la Carte" 6 7 - https://github.com/snowleopard/build 8 - https://www.microsoft.com/en-us/research/wp-content/uploads/2018/03/build-systems.pdf 9 10 Key concepts: 11 12 - The goal is to maintain an up-to-date *store* mapping *tasks* to *values*. 13 - Tasks are described using rules, functions from parameters to tasks. 14 - Each rule can choose its own caching policy, the default is a persistent cache keyed by hashes. 15 - The current scheduler is a top-down (suspending) scheduler. 16 17 make.py improves upon the paper's design in a few ways: 18 19 - Task keys (for book-keeping purposes) are automatically derived from the rule functions. 20 - Tasks are executed concurrently. 21 - We split the two concepts Rebuilder and Scheduler into three concepts: 22 23 - (Per-task) Caching policies. 24 - (Global) Updating strategy. 25 - (Global) Metadata updaters. 26 27 # Why we re-interpret the concepts Rebuilder and Scheduler 28 29 The paper merges the concept of "metadata updaters" in the Rebuilder and Scheduler. 30 This sort of makes sense as different rebuilders and schedulers require different metadata. 31 32 However, it means that a rebuilder may need to override the `fetch` function in a call 33 in order to ensure the metadata required for the rebuilder is created, 34 and it encourages a local way to build metadata information. 35 Furthermore, a rebuilder may sometimes require the same metadata as a scheduler's fetch function, 36 for instance tracking dependency relationships is required for both the 37 topological sort scheduler as well as trace-based rebuilders (e.g. constructive trace rebuilder). 38 39 So, we instead factor out the metadata updating portion of both rebuilders and schedulers 40 into a global metadata updater, which can be viewed as yet another wrapper around rules. 41 However, as this must apply on a global level to support the whole scheduling strategy, 42 metadata updaters are defined at a global level, unlike the per-task caching policies. 43 44 # TODO 45 46 - Make files on the filesystem a core concept as opposed to merely something you can do. 47 """ 48 49 import asyncio 50 import collections 51 import functools 52 import hashlib 53 import inspect 54 import pickle 55 import subprocess 56 import sys 57 import traceback 58 59 from typing import ( 60 Any, 61 Awaitable, 62 Callable, 63 Concatenate, 64 Optional, 65 ParamSpec, 66 Protocol, 67 ) 68 69 70 class Fetch(Protocol): 71 """Protocol defining the fetch operation used by tasks.""" 72 73 async def __call__(self, task_or_rule: "Task | Rule") -> Any: ... 74 75 76 RuleKey = bytes 77 TaskKey = tuple 78 ValueHash = bytes 79 80 P = ParamSpec("P") 81 RuleFn = Callable[Concatenate[Fetch, TaskKey, "Store", P], Awaitable[Any]] 82 NiceRuleFn = Callable[Concatenate[Fetch, P], Awaitable[Any]] 83 84 85 def make_hash(o: Any) -> bytes: 86 if isinstance(o, bytes): 87 h = hashlib.sha256(b"s") 88 h.update(o) 89 else: 90 h = hashlib.sha256(b"r") 91 h.update(repr(o).encode("utf-8")) 92 return h.digest() 93 94 95 def rule_fn_to_key(fn: Callable) -> RuleKey: 96 name = fn.__name__ 97 source = inspect.getsource(fn) 98 h = hashlib.sha256(source.encode("utf-8")).hexdigest()[:16] 99 key = f"{name}-{len(source)}-{h}".encode("utf-8") 100 return key 101 102 103 class Task: 104 """A computation of a value.""" 105 106 __slots__ = "task_key", "rule_fn", "args", "hash" 107 108 task_key: TaskKey 109 rule_fn: RuleFn 110 args: tuple 111 hash: int 112 113 def __init__(self, task_key: TaskKey, rule_fn: RuleFn, *args): 114 self.task_key = task_key 115 self.rule_fn = rule_fn 116 self.args = args 117 self.hash = hash(self.task_key) 118 119 def __call__(self, fetch: Fetch, store: "Store"): 120 return self.rule_fn(fetch, self.task_key, store, *self.args) 121 122 def __repr__(self) -> str: 123 return repr(self.task_key) 124 125 def __eq__(self, other: object) -> bool: 126 if not isinstance(other, Task): 127 return NotImplemented 128 return self.task_key == other.task_key 129 130 def __hash__(self) -> int: 131 return self.hash 132 133 134 class Rule: 135 """A function that returns tasks.""" 136 137 __slots__ = "rule_key", "rule_fn", "hash" 138 139 rule_key: RuleKey 140 rule_fn: RuleFn 141 hash: int 142 143 @staticmethod 144 def new(rule_fn: RuleFn): 145 return Rule( 146 rule_fn_to_key(rule_fn), 147 rule_fn, 148 ) 149 150 def __init__(self, rule_key: RuleKey, rule_fn: RuleFn): 151 self.rule_key = rule_key 152 self.rule_fn = rule_fn 153 self.hash = hash(self.rule_key) 154 155 def __call__(self, *args): 156 return Task( 157 ( 158 self.rule_key, 159 *( 160 ( 161 arg.task_key 162 if isinstance(arg, Task) 163 else arg().task_key if isinstance(arg, Rule) else arg 164 ) 165 for arg in args 166 ), 167 ), 168 self.rule_fn, 169 *args, 170 ) 171 172 def __eq__(self, other): 173 if not isinstance(other, Rule): 174 return NotImplemented 175 return self.rule_key == other.rule_key 176 177 def __hash__(self): 178 return self.hash 179 180 181 def ensure_task(task_or_rule: Task | Rule) -> Task: 182 if isinstance(task_or_rule, Rule): 183 return task_or_rule() 184 return task_or_rule 185 186 187 def singleton(cls): 188 cls.main = cls() 189 return cls 190 191 192 @singleton 193 class Rules: 194 """The registry of all rules created.""" 195 196 # Main registry 197 main: "Rules" 198 199 __slots__ = "rules" 200 201 rules: dict[RuleKey, Rule] 202 203 def __init__(self): 204 self.rules = dict() 205 206 def eval_task_key(self, task_key: TaskKey) -> Optional[Task]: 207 rule_key, *arg_keys = task_key 208 if rule_key not in self.rules: 209 return None 210 rule = self.rules[rule_key] 211 args = [] 212 for arg in arg_keys: 213 if isinstance(arg, tuple) and arg[0] not in self.rules: 214 return None 215 args.append(self.eval_task_key(arg) if isinstance(arg, tuple) else arg) 216 return rule(*args) 217 218 @staticmethod 219 def nice_rule_fn_to_rule_fn(nice_rule_fn, fetch, task_key, store, *args): 220 return nice_rule_fn(fetch, *args) 221 222 def rule(self, rule_fn: NiceRuleFn) -> Rule: 223 return self.register( 224 self.hash_cache( 225 Rule.new( 226 functools.update_wrapper( 227 functools.partial(Rules.nice_rule_fn_to_rule_fn, rule_fn), 228 rule_fn, 229 ) 230 ) 231 ) 232 ) 233 234 def rule_no_cache(self, rule_fn: NiceRuleFn) -> Rule: 235 return self.register( 236 Rule.new( 237 functools.update_wrapper( 238 functools.partial(Rules.nice_rule_fn_to_rule_fn, rule_fn), 239 rule_fn, 240 ) 241 ) 242 ) 243 244 def register(self, rule: Rule) -> Rule: 245 self.rules[rule.rule_key] = rule 246 return rule 247 248 def hash_cache(self, rule: Rule) -> Rule: 249 """Adds hash based caching to a rule 250 251 Attempts to replay the rule by checking if the hashes of each input 252 it would have obtained if run now matches up with a previous run. 253 254 Currently, there is no cache eviction policy (all previous runs are stored forever). 255 256 TODO: Implement some cache eviction. 257 """ 258 rule.rule_fn = functools.update_wrapper( 259 functools.partial(Rules.hash_cache_fn, self, rule.rule_fn), 260 rule.rule_fn, 261 ) 262 return rule 263 264 @staticmethod 265 async def track_fetch(fetch: Fetch, new_inputs: list, task_or_rule: Task | Rule): 266 task = ensure_task(task_or_rule) 267 result = await fetch(task) 268 new_inputs.append((task.task_key, make_hash(result))) 269 return result 270 271 async def hash_cache_fn( 272 self, 273 inner_rule_fn: RuleFn, 274 fetch: Fetch, 275 task_key: TaskKey, 276 store: "Store", 277 *args, 278 ): 279 """Actual implementation of hash_cache""" 280 if task_key in store.key_info: 281 past_runs = store.key_info[task_key] 282 output_value = store.key_value[task_key] 283 possible_values = [] 284 for past_inputs, past_value in past_runs: 285 for past_input_key, past_input_hash in past_inputs: 286 input_task = self.eval_task_key(past_input_key) 287 if not input_task: 288 break 289 current_input_value = await fetch(input_task) 290 if make_hash(current_input_value) != past_input_hash: 291 break 292 else: 293 if output_value == past_value: 294 return past_value 295 possible_values.append(past_value) 296 297 if possible_values: 298 return possible_values[0] 299 300 new_inputs = [] 301 302 new_value = await inner_rule_fn( 303 functools.partial(Rules.track_fetch, fetch, new_inputs), 304 task_key, 305 store, 306 *args, 307 ) 308 store.key_info[task_key].append((new_inputs, new_value)) 309 return new_value 310 311 312 # Rules.main = Rules() 313 rule = Rules.main.rule 314 rule_no_cache = Rules.main.rule_no_cache 315 register = Rules.main.register 316 hash_cache = Rules.main.hash_cache 317 318 319 class Store: 320 """Stores a mapping from tasks to their values.""" 321 322 __slots__ = "filename", "rules", "key_value", "key_info" 323 324 @staticmethod 325 def _fNone(): 326 return None 327 328 def __init__(self, filename, rules): 329 self.filename = filename 330 self.rules = rules 331 332 self.key_value = collections.defaultdict(Store._fNone) 333 self.key_info = collections.defaultdict(list) 334 335 try: 336 with open(filename, "rb") as f: 337 self.key_value, self.key_info = pickle.load(f) 338 except: 339 pass 340 341 def save(self): 342 with open(self.filename, "wb") as f: 343 pickle.dump((self.key_value, self.key_info), f) 344 345 def __enter__(self): 346 return self 347 348 def __exit__(self, exc_type, exc_val, exc_tb): 349 self.save() 350 351 352 class Detach: 353 __slots__ = "_background_tasks" 354 355 def __init__(self): 356 self._background_tasks = set() 357 358 def __call__(self, awaitable): 359 if asyncio.coroutines.iscoroutine(awaitable): 360 task = asyncio.create_task(awaitable) 361 self._background_tasks.add(task) 362 task.add_done_callback(self._background_tasks.discard) 363 return task 364 return awaitable 365 366 async def wait(self): 367 while self._background_tasks: 368 t = self._background_tasks.pop() 369 if not t.done(): 370 await t 371 372 373 detach = Detach() 374 375 376 class SuspendingScheduler: 377 __slots__ = "store", "done", "waits" 378 store: Store 379 done: set[TaskKey] 380 waits: dict[TaskKey, asyncio.Event] 381 382 def __init__(self, store: Store): 383 self.store = store 384 self.done = set() 385 self.waits = dict() 386 387 def build(self, *tasks: Task): 388 return asyncio.gather(*(self.fetch_once(task) for task in tasks)) 389 390 async def fetch_once(self, task_or_rule: Task | Rule): 391 task = ensure_task(task_or_rule) 392 task_key = task.task_key 393 wait = None 394 event = None 395 if task_key in self.done: 396 return self.store.key_value[task_key] 397 if task_key in self.waits: 398 wait = self.waits[task_key] 399 400 if wait: 401 await wait.wait() 402 return self.store.key_value[task_key] 403 404 event = self.waits[task_key] = asyncio.Event() 405 try: 406 self.store.key_value[task_key] = result = await task( 407 self.fetch_once, self.store 408 ) 409 except: 410 print(traceback.format_exc()) 411 event.set() 412 self.store.key_value[task_key] = None 413 return None 414 415 self.done.add(task_key) 416 event.set() 417 418 return result 419 420 421 class Build: 422 __slots__ = "_store", "_scheduler" 423 424 def __init__(self, filename, rules=Rules.main): 425 self._store = Store(filename, rules) 426 self._scheduler = SuspendingScheduler(self._store) 427 428 async def __call__(self, *tasks: Task): 429 result = await self.build(*tasks) 430 await detach.wait() 431 return result 432 433 def build(self, *tasks: Task): 434 return self._scheduler.build(*tasks) 435 436 def __enter__(self): 437 self._store.__enter__() 438 return self 439 440 def __exit__(self, exc_type, exc_val, exc_tb): 441 self._store.__exit__(exc_type, exc_val, exc_tb) 442 443 444 def build(*tasks, filename=".makedb", rules=Rules.main): 445 with Build(filename, rules) as build: 446 asyncio.run(build(*tasks)) 447 448 449 class ShellResult(collections.namedtuple("ShellResult", "stdout stderr returncode")): 450 __slots__ = () 451 452 @property 453 def utf8stdout(self): 454 return self.stdout.decode("utf-8") 455 456 @property 457 def utf8stderr(self): 458 return self.stderr.decode("utf-8") 459 460 461 EchoNothing = 0 462 EchoStdout = 1 463 EchoStderr = 2 464 EchoAll = 3 465 466 467 async def _exec_reader(istream, ostream, echo: Any = False): 468 contents = b"" 469 async for chunk in istream: 470 contents += chunk 471 if echo: 472 ostream.write(chunk) 473 ostream.flush() 474 return contents 475 476 477 async def exec( 478 program, 479 *args, 480 input: bytes | bytearray | memoryview | None = None, 481 echo: int = EchoNothing, 482 ) -> ShellResult: 483 484 proc = await asyncio.create_subprocess_exec( 485 program, 486 *args, 487 stdin=subprocess.PIPE, 488 stdout=subprocess.PIPE, 489 stderr=subprocess.PIPE, 490 ) 491 492 if input is not None: 493 proc.stdin.write(input) # type: ignore 494 _, stdout, stderr, returncode = await asyncio.gather( 495 proc.stdin.drain(), # type: ignore 496 _exec_reader(proc.stdout, sys.stdout.buffer, echo=echo & EchoStdout), 497 _exec_reader(proc.stderr, sys.stderr.buffer, echo=echo & EchoStderr), 498 proc.wait(), 499 ) 500 else: 501 stdout, stderr, returncode = await asyncio.gather( 502 _exec_reader(proc.stdout, sys.stdout.buffer, echo=echo & EchoStdout), 503 _exec_reader(proc.stderr, sys.stderr.buffer, echo=echo & EchoStderr), 504 proc.wait(), 505 ) 506 507 return ShellResult(stdout, stderr, returncode) 508 509 510 async def shell( 511 cmd, 512 input: bytes | bytearray | memoryview | None = None, 513 echo: int = EchoNothing, 514 ) -> ShellResult: 515 proc = await asyncio.create_subprocess_shell( 516 cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE 517 ) 518 stdout, stderr = await proc.communicate(input) 519 if echo & EchoStdout: 520 sys.stdout.buffer.write(stdout) 521 sys.stdout.buffer.flush() 522 if echo & EchoStderr: 523 sys.stderr.buffer.write(stderr) 524 sys.stderr.buffer.flush() 525 return ShellResult(stdout, stderr, proc.returncode) 526 527 528 def run_in_executor(f, *args, executor=None): 529 return asyncio.get_running_loop().run_in_executor(executor, f, *args) 530 531 532 async def async_main(globals, filename=".makedb", default_target="all"): 533 targets = sys.argv[1:] 534 if not targets: 535 targets.append(default_target) 536 537 with Build(filename) as build: 538 await build(*(eval(target, globals=globals) for target in targets)) 539 return 0 540 541 542 def main(globals, filename=".makedb", default_target="all"): 543 targets = sys.argv[1:] 544 if not targets: 545 targets.append(default_target) 546 547 with Build(filename) as build: 548 asyncio.run(build(*(eval(target, globals=globals) for target in targets))) 549 return 0 550 551 552 # class AsyncWrapperSpec: 553 # __slots__ = "async_methods", "async_subobjects" 554 555 # def __init__( 556 # self, 557 # async_methods=set(), 558 # async_subobjects=dict(), 559 # ): 560 # self.async_methods = set(async_methods) 561 # self.async_subobjects = dict(async_subobjects) 562 563 564 # class AsyncWrapper: 565 # __slots__ = "_obj", "_spec", "_executor" 566 567 # def __init__(self, obj, spec=AsyncWrapperSpec(), executor=None): 568 # self._obj = obj 569 # self._spec = spec 570 # self._executor = executor 571 572 # @staticmethod 573 # def wrapper(f, executor, *args): 574 # return run_in_executor(f, *args, executor=executor) 575 576 # def __getattr__(self, attr): 577 # if attr in self._spec.async_methods: 578 # return functools.partial( 579 # self.wrapper, getattr(self._obj, attr), self._executor 580 # ) 581 # if attr in self._spec.async_subobjects: 582 # return AsyncWrapper( 583 # getattr(self._obj, attr), 584 # spec=self._spec.async_subobjects[attr], 585 # executor=self._executor, 586 # ) 587 # return getattr(self._obj, attr) 588 589 # async def __aenter__(self): 590 # return AsyncWrapper( 591 # await run_in_executor(self._obj.__enter__, executor=self._executor), 592 # spec=self._spec, 593 # ) 594 595 # async def __aexit__(self, exc_type, exc_val, exc_tb): 596 # return await run_in_executor( 597 # self._obj.__exit__, exc_type, exc_val, exc_tb, executor=self._executor 598 # ) 599 600 # def __aiter__(self): 601 # return AsyncWrapper(self._obj.__iter__(), spec=self._spec) 602 603 # @staticmethod 604 # def wrapped_next(obj): 605 # try: 606 # return True, next(obj) 607 # except StopIteration: 608 # return False, None 609 610 # async def __anext__(self): 611 # ok, res = await run_in_executor( 612 # functools.partial(self.wrapped_next, self._obj), executor=self._executor 613 # ) 614 # if not ok: 615 # raise StopAsyncIteration 616 # return res 617 618 # @staticmethod 619 # def wrapped_foreach(f, obj): 620 # for chunk in obj: 621 # f(chunk) 622 623 # async def foreach(self, f): 624 # await run_in_executor( 625 # functools.partial(self.wrapped_foreach, f, self._obj), 626 # executor=self._executor, 627 # ) 628 629 630 # class AsyncIO(Protocol): 631 # async def __aenter__(self) -> "AsyncIO": ... 632 # async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ... 633 634 # async def close(self) -> None: ... 635 # def fileno(self) -> int: ... 636 # async def flush(self) -> None: ... 637 # def isatty(self) -> bool: ... 638 # def readable(self) -> bool: ... 639 # async def readlines(self, hint: int = -1, /) -> list[bytes]: ... 640 # async def seek(self, offset: int, whence: int = 0, /) -> int: ... 641 # def seekable(self) -> bool: ... 642 # async def tell(self) -> int: ... 643 # async def truncate(self, size: int | None = None, /) -> int: ... 644 # def writable(self) -> bool: ... 645 # async def writelines(self, lines, /) -> None: ... 646 # async def readline(self, size: int | None = -1, /) -> bytes: ... 647 # @property 648 # def closed(self) -> bool: ... 649 # async def readall(self) -> bytes: ... 650 # async def readinto(self, buffer, /) -> Any: ... 651 # async def write(self, b, /) -> Any: ... 652 # async def read(self, size: int = -1, /) -> Any: ... 653 # def detach(self) -> "AsyncIO": ... 654 # async def readinto1(self, buffer, /) -> int: ... 655 # async def read1(self, size: int = -1, /) -> bytes: ... 656 657 # mode: str 658 # name: Any 659 660 # @property 661 # def closefd(self) -> bool: ... 662 663 # raw: "AsyncIO" 664 665 # async def peek(self, size: int = 0, /) -> bytes: ... 666 667 # encoding: str 668 # errors: str | None 669 # newlines: str | tuple[str, ...] | None 670 671 # def __aiter__(self) -> AsyncIterator[Any]: ... 672 # async def __anext__(self) -> Any: ... 673 674 # async def foreach(self, f) -> Any: ... 675 676 677 # async def open_async(*args, executor=None) -> AsyncIO: 678 # # List of methods: https://docs.python.org/3/library/io.html 679 # async_methods = ( 680 # "close", 681 # "detach", 682 # "flush", 683 # "peek", 684 # "read", 685 # "read1", 686 # "readall", 687 # "readinto", 688 # "readinto1", 689 # "readline", 690 # "readlines", 691 # "seek", 692 # "tell", 693 # "truncate", 694 # "write", 695 # "writelines", 696 # ) 697 # return AsyncWrapper( 698 # await run_in_executor(open, *args, executor=executor), 699 # AsyncWrapperSpec(async_methods, {"buffer": AsyncWrapperSpec(async_methods)}), 700 # ) # type: ignore