pymake

A build system based on Build Systems à la Carte
git clone https://git.grace.moe/pymake

commit e79043a791765498026ab48c3f7f7de80e1f5c6a
parent 78628084d9fa30bc26b45f8fcca61fbf4d9c88fa
Author: gracefu <81774659+gracefuu@users.noreply.github.com>
Date:   Wed, 16 Apr 2025 17:35:43 +0800

Do packaging stuff

Diffstat:
M .gitignore | 5 +++--
D bench.py | 85 -------------------------------------------------------------------------------
D examples.py | 93 -------------------------------------------------------------------------------
A examples/examples.py | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
D make.py | 606 -------------------------------------------------------------------------------
A make/__init__.py | 679 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
A package.py | 100 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
A pyproject.toml | 8 ++++++++
8 files changed, 886 insertions(+), 786 deletions(-)

diff --git a/.gitignore b/.gitignore @@ -1,2 +1,3 @@ -make.db -__pycache__ +__pycache__/ +dist/ +.makedb diff --git a/bench.py b/bench.py @@ -1,85 +0,0 @@ -import time -import functools - - -def f1(i): - def g(): - return i + 1 - - return g - - -def g_expanded(i): - return i + 1 - - -def f2(i): - return functools.partial(g_expanded, i) - - -class f3: - __slots__ = "i" - - def __init__(self, i): - self.i = i - - def __call__(self): - return self.i + 1 - - -class f4(tuple): - def __call__(self): - return self[0] + 1 - - -def main(N=2000000, WARM=200000): - for _ in range(5): - x1 = [f1(i) for i in range(WARM)] - x2 = [f2(i) for i in range(WARM)] - x3 = [f3(i) for i in range(WARM)] - x4 = [f4((i,)) for i in range(WARM)] - - [f() for f in x1] - [f() for f in x2] - [f() for f in x3] - [f() for f in x4] - - start = time.time() - x1 = [f1(i) for i in range(N)] - print(time.time() - start) - - start = time.time() - x2 = [f2(i) for i in range(N)] - print(time.time() - start) - - start = time.time() - x3 = [f3(i) for i in range(N)] - print(time.time() - start) - - start = time.time() - x4 = [f4((i,)) for i in range(N)] - print(time.time() - start) - - print("----") - - start = time.time() - [f() for f in x1] - print(time.time() - start) - - start = time.time() - [f() for f in x2] - print(time.time() - start) - - start = time.time() - [f() for f in x3] - print(time.time() - start) - - start = time.time() - [f() for f in x4] - print(time.time() - start) - - print("====") - - -if __name__ == "__main__": - main() diff --git a/examples.py b/examples.py @@ -1,93 +0,0 @@ -from make import hash_cache, rule, detach, Fetch, Task, Build -import asyncio - -# Example rules -# Observe the general pattern that every rule is called to get a task, which can then be fetched. -# res = await fetch(rule(task_args...)) - - -@hash_cache -@rule -async def _eg_six(fetch: Fetch): - _ = fetch - six = 6 - print(f"{six=}") - return six - - -@rule -async def _eg_thirtysix(fetch: Fetch): - # Here we await the dependencies serially. - # The second dependency cannot start until the first finishes. - six1 = await fetch(_eg_six()) - six2 = await fetch(_eg_six()) - print(f"{six1*six2=}") - return six1 * six2 - - -@rule -async def _eg_multiply_add(fetch: Fetch, taskA: Task, taskB: Task, num: int): - # Here we await the dependencies in parallel. - a, b = await asyncio.gather(fetch(taskA), fetch(taskB)) - await asyncio.sleep(0.1) - print(f"{a*b+num=}") - return a * b + num - - -# When interfacing with inputs or in general anything outside the build system, -# Do NOT add @hash_cache, as it makes the task only rerun if a dependency was known to be modified. -# In this case, we have no real dependencies, and our output depends on the filesystem. -# So we leave out @hash_cache to ensure we always check that the file has not changed. -@rule -async def _eg_file(fetch: Fetch, filename: str): - _ = fetch - await asyncio.sleep(0.1) - with open(filename, "r") as f: - contents = f.readlines() - print("file", filename, "\n" + "".join(contents[1:5]), end="") - return contents - - -# Semaphores can be used to limit concurrency -_sem = asyncio.Semaphore(4) - - -@hash_cache -@rule -async def _eg_rec(fetch: Fetch, i: int): - if i // 3 - 1 >= 0: - # Instead of awaiting, dependencies can also be detached and run in the background. 
- detach(fetch(_eg_rec(i // 2 - 1))) - detach(fetch(_eg_rec(i // 3 - 1))) - else: - detach(fetch(_eg_file("make.py"))) - - # Use semaphore to limit concurrency easily - async with _sem: - print("+ rec", i) - # Simulate some hard work - await asyncio.sleep(0.1) - print("- rec", i) - - -async def main(): - # To actually run the build system, - # 1) Create the store - # Use context manager to ensure the store is saved automatically when exiting - with Build("make.db") as build: - # 2) Use it to await tasks - await build(_eg_rec(1234)) - await asyncio.gather( - build(_eg_thirtysix()), build(_eg_multiply_add(_eg_six(), _eg_six(), 6)) - ) - - # Note that `build(...)` will wait for all detached jobs to complete before returning. - # You may instead use `build.build(...)`, which does not wait for detached jobs. - # You should ensure `build.wait()` is called eventually so detached jobs can complete. - detach(build.build(_eg_rec(2345))) - await build.build(_eg_rec(3456)) - await build.wait() - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/examples/examples.py b/examples/examples.py @@ -0,0 +1,96 @@ +import asyncio +import sys + +sys.path += ".." +from make import hash_cache, rule, detach, Fetch, Task, Build + +# Example rules +# Observe the general pattern that every rule is called to get a task, which can then be fetched. +# res = await fetch(rule(task_args...)) + + +@hash_cache +@rule +async def _eg_six(fetch: Fetch): + _ = fetch + six = 6 + print(f"{six=}") + return six + + +@rule +async def _eg_thirtysix(fetch: Fetch): + # Here we await the dependencies serially. + # The second dependency cannot start until the first finishes. + six1 = await fetch(_eg_six()) + six2 = await fetch(_eg_six()) + print(f"{six1*six2=}") + return six1 * six2 + + +@rule +async def _eg_multiply_add(fetch: Fetch, taskA: Task, taskB: Task, num: int): + # Here we await the dependencies in parallel. + a, b = await asyncio.gather(fetch(taskA), fetch(taskB)) + await asyncio.sleep(0.1) + print(f"{a*b+num=}") + return a * b + num + + +# When interfacing with inputs or in general anything outside the build system, +# Do NOT add @hash_cache, as it makes the task only rerun if a dependency was known to be modified. +# In this case, we have no real dependencies, and our output depends on the filesystem. +# So we leave out @hash_cache to ensure we always check that the file has not changed. +@rule +async def _eg_file(fetch: Fetch, filename: str): + _ = fetch + await asyncio.sleep(0.1) + with open(filename, "r") as f: + contents = f.readlines() + print("file", filename, "\n" + "".join(contents[1:5]), end="") + return contents + + +# Semaphores can be used to limit concurrency +_sem = asyncio.Semaphore(4) + + +@hash_cache +@rule +async def _eg_rec(fetch: Fetch, i: int): + if i // 3 - 1 >= 0: + # Instead of awaiting, dependencies can also be detached and run in the background. 
+ detach(fetch(_eg_rec(i // 2 - 1))) + detach(fetch(_eg_rec(i // 3 - 1))) + else: + detach(fetch(_eg_file("make/__init__.py"))) + + # Use semaphore to limit concurrency easily + async with _sem: + print("+ rec", i) + # Simulate some hard work + await asyncio.sleep(0.1) + print("- rec", i) + + +async def main(): + # To actually run the build system, + # 1) Create the store + # Use context manager to ensure the store is saved automatically when exiting + with Build(".makedb") as build: + # 2) Use it to await tasks + await build(_eg_rec(1234)) + await asyncio.gather( + build(_eg_thirtysix()), build(_eg_multiply_add(_eg_six(), _eg_six(), 6)) + ) + + # Note that `build(...)` will wait for all detached jobs to complete before returning. + # You may instead use `build.build(...)`, which does not wait for detached jobs. + # You should ensure `build.wait()` is called eventually so detached jobs can complete. + detach(build.build(_eg_rec(2345))) + await build.build(_eg_rec(3456)) + await build.wait() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/make.py b/make.py @@ -1,606 +0,0 @@ -""" -make.py -------- - -Design inspired by the paper "Build Systems à la Carte" - -- https://github.com/snowleopard/build -- https://www.microsoft.com/en-us/research/wp-content/uploads/2018/03/build-systems.pdf - -Key concepts: - -- The goal is to maintain an up-to-date *store* mapping *tasks* to *values*. -- Tasks are described using rules, functions from parameters to tasks. -- Each rule can choose its own caching policy, the default is a persistent cache keyed by hashes. -- The current scheduler is a top-down (suspending) scheduler. - -make.py improves upon the paper's design in a few ways: - -- Task keys (for book-keeping purposes) are automatically derived from the rule functions. -- Tasks are executed concurrently. -- We split the two concepts Rebuilder and Scheduler into three concepts: - - - (Per-task) Caching policies. - - (Global) Updating strategy. - - (Global) Metadata updaters. - -# Why we re-interpret the concepts Rebuilder and Scheduler - -The paper merges the concept of "metadata updaters" in the Rebuilder and Scheduler. -This sort of makes sense as different rebuilders and schedulers require different metadata. - -However, it means that a rebuilder may need to override the `fetch` function in a call -in order to ensure the metadata required for the rebuilder is created, -and it encourages a local way to build metadata information. -Furthermore, a rebuilder may sometimes require the same metadata as a scheduler's fetch function, -for instance tracking dependency relationships is required for both the -topological sort scheduler as well as trace-based rebuilders (e.g. constructive trace rebuilder). - -So, we instead factor out the metadata updating portion of both rebuilders and schedulers -into a global metadata updater, which can be viewed as yet another wrapper around rules. -However, as this must apply on a global level to support the whole scheduling strategy, -metadata updaters are defined at a global level, unlike the per-task caching policies. - -# TODO - -- Make files on the filesystem a core concept as opposed to merely something you can do. 
-""" - -import asyncio -import collections -import functools -import hashlib -import inspect -import pickle -import subprocess -import sys -import traceback - -from typing import ( - Any, - Awaitable, - Callable, - Concatenate, - Optional, - ParamSpec, - Protocol, -) - - -class Fetch(Protocol): - """Protocol defining the fetch operation used by tasks.""" - - async def __call__(self, task: "Task") -> Any: ... - - -RuleKey = bytes -TaskKey = tuple -ValueHash = bytes - -P = ParamSpec("P") -RuleFn = Callable[Concatenate[Fetch, TaskKey, "Store", P], Awaitable[Any]] -NiceRuleFn = Callable[Concatenate[Fetch, P], Awaitable[Any]] - - -def make_hash(o: Any) -> bytes: - if isinstance(o, bytes): - h = hashlib.sha256(b"s") - h.update(o) - else: - h = hashlib.sha256(b"r") - h.update(repr(o).encode("utf-8")) - return h.digest() - - -def rule_fn_to_key(fn: Callable) -> RuleKey: - name = fn.__name__ - source = inspect.getsource(fn) - h = hashlib.sha256(source.encode("utf-8")).hexdigest()[:16] - key = f"{name}-{len(source)}-{h}".encode("utf-8") - return key - - -class Task: - """A computation of a value.""" - - __slots__ = "task_key", "rule_fn", "args", "hash" - - task_key: TaskKey - rule_fn: RuleFn - args: tuple - hash: int - - def __init__(self, task_key: TaskKey, rule_fn: RuleFn, *args): - self.task_key = task_key - self.rule_fn = rule_fn - self.args = args - self.hash = hash(self.task_key) - - def __call__(self, fetch: Fetch, store: "Store"): - return self.rule_fn(fetch, self.task_key, store, *self.args) - - def __repr__(self) -> str: - return repr(self.task_key) - - def __eq__(self, other: object) -> bool: - if not isinstance(other, Task): - return NotImplemented - return self.task_key == other.task_key - - def __hash__(self) -> int: - return self.hash - - -class Rule: - """A function that returns tasks.""" - - __slots__ = "rule_key", "rule_fn", "hash" - - rule_key: RuleKey - rule_fn: RuleFn - hash: int - - @staticmethod - def new(rule_fn: RuleFn): - return Rule( - rule_fn_to_key(rule_fn), - rule_fn, - ) - - def __init__(self, rule_key: RuleKey, rule_fn: RuleFn): - self.rule_key = rule_key - self.rule_fn = rule_fn - self.hash = hash(self.rule_key) - - def __call__(self, *args): - return Task( - ( - self.rule_key, - *(arg.task_key if isinstance(arg, Task) else arg for arg in args), - ), - self.rule_fn, - *args, - ) - - def __eq__(self, other): - if not isinstance(other, Rule): - return NotImplemented - return self.rule_key == other.rule_key - - def __hash__(self): - return self.hash - - -class Rules: - """The registry of all rules created.""" - - __slots__ = "rules" - - rules: dict[RuleKey, Rule] - - def __init__(self): - self.rules = dict() - - def eval_task_key(self, task_key: TaskKey) -> Optional[Task]: - rule_key, *arg_keys = task_key - if rule_key not in self.rules: - return None - rule = self.rules[rule_key] - args = [] - for arg in arg_keys: - if isinstance(arg, tuple) and arg[0] not in self.rules: - return None - args.append(self.eval_task_key(arg) if isinstance(arg, tuple) else arg) - return rule(*args) - - @staticmethod - def nice_rule_fn_to_rule_fn(nice_rule_fn, fetch, task_key, store, *args): - return nice_rule_fn(fetch, *args) - - def rule(self, rule_fn: NiceRuleFn) -> Rule: - return self.register( - self.hash_cache( - Rule.new( - functools.update_wrapper( - functools.partial(Rules.nice_rule_fn_to_rule_fn, rule_fn), - rule_fn, - ) - ) - ) - ) - - def rule_no_cache(self, rule_fn: NiceRuleFn) -> Rule: - return self.register( - Rule.new( - functools.update_wrapper( - 
functools.partial(Rules.nice_rule_fn_to_rule_fn, rule_fn), - rule_fn, - ) - ) - ) - - def register(self, rule: Rule) -> Rule: - self.rules[rule.rule_key] = rule - return rule - - def hash_cache(self, rule: Rule) -> Rule: - """Adds hash based caching to a rule - - Attempts to replay the rule by checking if the hashes of each input - it would have obtained if run now matches up with a previous run. - - Currently, there is no cache eviction policy (all previous runs are stored forever). - - TODO: Implement some cache eviction. - """ - rule.rule_fn = functools.update_wrapper( - functools.partial(Rules.hash_cache_fn, self, rule.rule_fn), - rule.rule_fn, - ) - return rule - - @staticmethod - async def track_fetch(fetch: Fetch, new_inputs: list, task: Task): - result = await fetch(task) - new_inputs.append((task.task_key, make_hash(result))) - return result - - async def hash_cache_fn( - self, - inner_rule_fn: RuleFn, - fetch: Fetch, - task_key: TaskKey, - store: "Store", - *args, - ): - """Actual implementation of hash_cache""" - if task_key in store.key_info: - past_runs = store.key_info[task_key] - output_value = store.key_value[task_key] - possible_values = [] - for past_inputs, past_value in past_runs: - for past_input_key, past_input_hash in past_inputs: - input_task = self.eval_task_key(past_input_key) - if not input_task: - break - current_input_value = await fetch(input_task) - if make_hash(current_input_value) != past_input_hash: - break - else: - if output_value == past_value: - return past_value - possible_values.append(past_value) - - if possible_values: - return possible_values[0] - - new_inputs = [] - - new_value = await inner_rule_fn( - functools.partial(Rules.track_fetch, fetch, new_inputs), - task_key, - store, - *args, - ) - store.key_info[task_key].append((new_inputs, new_value)) - return new_value - - -_rules = Rules() -rule = _rules.rule -rule_no_cache = _rules.rule_no_cache -register = _rules.register -hash_cache = _rules.hash_cache - - -class Store: - """Stores a mapping from tasks to their values.""" - - __slots__ = "filename", "rules", "key_value", "key_info" - - @staticmethod - def _fNone(): - return None - - def __init__(self, filename, rules): - self.filename = filename - self.rules = rules - - self.key_value = collections.defaultdict(Store._fNone) - self.key_info = collections.defaultdict(list) - - try: - with open(filename, "rb") as f: - self.key_value, self.key_info = pickle.load(f) - except: - pass - - def save(self): - with open(self.filename, "wb") as f: - pickle.dump((self.key_value, self.key_info), f) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.save() - - -class Detach: - __slots__ = "_background_tasks" - - def __init__(self): - self._background_tasks = set() - - def __call__(self, *args, **kwargs): - task = asyncio.create_task(*args, **kwargs) - self._background_tasks.add(task) - task.add_done_callback(self._background_tasks.discard) - - -detach = Detach() - - -class SuspendingScheduler: - __slots__ = "store", "done", "waits" - store: Store - done: set[TaskKey] - waits: dict[TaskKey, asyncio.Event] - - def __init__(self, store: Store): - self.store = store - self.done = set() - self.waits = dict() - - async def wait(self): - while detach._background_tasks: - await asyncio.gather(*detach._background_tasks) - - def build(self, task: Task): - return self.fetch_once(task) - - async def fetch_once(self, task: Task): - task_key = task.task_key - wait = None - event = None - if task_key in self.done: - return 
self.store.key_value[task_key] - if task_key in self.waits: - wait = self.waits[task_key] - - if wait: - await wait.wait() - return self.store.key_value[task_key] - - event = self.waits[task_key] = asyncio.Event() - try: - self.store.key_value[task_key] = result = await task( - self.fetch_once, self.store - ) - except: - print(traceback.format_exc()) - event.set() - self.store.key_value[task_key] = None - return None - - self.done.add(task_key) - event.set() - - return result - - -class Build: - __slots__ = "_store", "_scheduler" - - def __init__(self, filename, rules=_rules): - self._store = Store(filename, rules) - self._scheduler = SuspendingScheduler(self._store) - - async def __call__(self, task: Task): - result = await self.build(task) - await self.wait() - return result - - def wait(self): - return self._scheduler.wait() - - def build(self, task: Task): - return self._scheduler.build(task) - - def __enter__(self): - self._store.__enter__() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self._store.__exit__(exc_type, exc_val, exc_tb) - - -class ShellResult(collections.namedtuple("ShellResult", "stdout stderr returncode")): - __slots__ = () - - @property - def utf8stdout(self): - return self.stdout.decode("utf-8") - - @property - def utf8stderr(self): - return self.stderr.decode("utf-8") - - -EchoNothing = 0 -EchoStdout = 1 -EchoStderr = 2 -EchoAll = 3 - - -async def shell( - cmd, - input=None, - echo=EchoNothing, -) -> ShellResult: - proc = await asyncio.create_subprocess_shell( - cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - stdout, stderr = await proc.communicate(input) - if echo & EchoStdout: - sys.stdout.buffer.write(stdout) - sys.stdout.buffer.flush() - if echo & EchoStderr: - sys.stderr.buffer.write(stderr) - sys.stderr.buffer.flush() - return ShellResult(stdout, stderr, proc.returncode) - - -def run_in_executor(f, *args, executor=None): - return asyncio.get_running_loop().run_in_executor(executor, f, *args) - - -# class AsyncWrapperSpec: -# __slots__ = "async_methods", "async_subobjects" - -# def __init__( -# self, -# async_methods=set(), -# async_subobjects=dict(), -# ): -# self.async_methods = set(async_methods) -# self.async_subobjects = dict(async_subobjects) - - -# class AsyncWrapper: -# __slots__ = "_obj", "_spec", "_executor" - -# def __init__(self, obj, spec=AsyncWrapperSpec(), executor=None): -# self._obj = obj -# self._spec = spec -# self._executor = executor - -# @staticmethod -# def wrapper(f, executor, *args): -# return run_in_executor(f, *args, executor=executor) - -# def __getattr__(self, attr): -# if attr in self._spec.async_methods: -# return functools.partial( -# self.wrapper, getattr(self._obj, attr), self._executor -# ) -# if attr in self._spec.async_subobjects: -# return AsyncWrapper( -# getattr(self._obj, attr), -# spec=self._spec.async_subobjects[attr], -# executor=self._executor, -# ) -# return getattr(self._obj, attr) - -# async def __aenter__(self): -# return AsyncWrapper( -# await run_in_executor(self._obj.__enter__, executor=self._executor), -# spec=self._spec, -# ) - -# async def __aexit__(self, exc_type, exc_val, exc_tb): -# return await run_in_executor( -# self._obj.__exit__, exc_type, exc_val, exc_tb, executor=self._executor -# ) - -# def __aiter__(self): -# return AsyncWrapper(self._obj.__iter__(), spec=self._spec) - -# @staticmethod -# def wrapped_next(obj): -# try: -# return True, next(obj) -# except StopIteration: -# return False, None - -# async def __anext__(self): -# ok, res = await 
run_in_executor( -# functools.partial(self.wrapped_next, self._obj), executor=self._executor -# ) -# if not ok: -# raise StopAsyncIteration -# return res - -# @staticmethod -# def wrapped_foreach(f, obj): -# for chunk in obj: -# f(chunk) - -# async def foreach(self, f): -# await run_in_executor( -# functools.partial(self.wrapped_foreach, f, self._obj), -# executor=self._executor, -# ) - - -# class AsyncIO(Protocol): -# async def __aenter__(self) -> "AsyncIO": ... -# async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ... - -# async def close(self) -> None: ... -# def fileno(self) -> int: ... -# async def flush(self) -> None: ... -# def isatty(self) -> bool: ... -# def readable(self) -> bool: ... -# async def readlines(self, hint: int = -1, /) -> list[bytes]: ... -# async def seek(self, offset: int, whence: int = 0, /) -> int: ... -# def seekable(self) -> bool: ... -# async def tell(self) -> int: ... -# async def truncate(self, size: int | None = None, /) -> int: ... -# def writable(self) -> bool: ... -# async def writelines(self, lines, /) -> None: ... -# async def readline(self, size: int | None = -1, /) -> bytes: ... -# @property -# def closed(self) -> bool: ... -# async def readall(self) -> bytes: ... -# async def readinto(self, buffer, /) -> Any: ... -# async def write(self, b, /) -> Any: ... -# async def read(self, size: int = -1, /) -> Any: ... -# def detach(self) -> "AsyncIO": ... -# async def readinto1(self, buffer, /) -> int: ... -# async def read1(self, size: int = -1, /) -> bytes: ... - -# mode: str -# name: Any - -# @property -# def closefd(self) -> bool: ... - -# raw: "AsyncIO" - -# async def peek(self, size: int = 0, /) -> bytes: ... - -# encoding: str -# errors: str | None -# newlines: str | tuple[str, ...] | None - -# def __aiter__(self) -> AsyncIterator[Any]: ... -# async def __anext__(self) -> Any: ... - -# async def foreach(self, f) -> Any: ... - - -# async def open_async(*args, executor=None) -> AsyncIO: -# # List of methods: https://docs.python.org/3/library/io.html -# async_methods = ( -# "close", -# "detach", -# "flush", -# "peek", -# "read", -# "read1", -# "readall", -# "readinto", -# "readinto1", -# "readline", -# "readlines", -# "seek", -# "tell", -# "truncate", -# "write", -# "writelines", -# ) -# return AsyncWrapper( -# await run_in_executor(open, *args, executor=executor), -# AsyncWrapperSpec(async_methods, {"buffer": AsyncWrapperSpec(async_methods)}), -# ) # type: ignore diff --git a/make/__init__.py b/make/__init__.py @@ -0,0 +1,679 @@ +""" +make.py +------- + +Design inspired by the paper "Build Systems à la Carte" + +- https://github.com/snowleopard/build +- https://www.microsoft.com/en-us/research/wp-content/uploads/2018/03/build-systems.pdf + +Key concepts: + +- The goal is to maintain an up-to-date *store* mapping *tasks* to *values*. +- Tasks are described using rules, functions from parameters to tasks. +- Each rule can choose its own caching policy, the default is a persistent cache keyed by hashes. +- The current scheduler is a top-down (suspending) scheduler. + +make.py improves upon the paper's design in a few ways: + +- Task keys (for book-keeping purposes) are automatically derived from the rule functions. +- Tasks are executed concurrently. +- We split the two concepts Rebuilder and Scheduler into three concepts: + + - (Per-task) Caching policies. + - (Global) Updating strategy. + - (Global) Metadata updaters. 
+ +# Why we re-interpret the concepts Rebuilder and Scheduler + +The paper merges the concept of "metadata updaters" in the Rebuilder and Scheduler. +This sort of makes sense as different rebuilders and schedulers require different metadata. + +However, it means that a rebuilder may need to override the `fetch` function in a call +in order to ensure the metadata required for the rebuilder is created, +and it encourages a local way to build metadata information. +Furthermore, a rebuilder may sometimes require the same metadata as a scheduler's fetch function, +for instance tracking dependency relationships is required for both the +topological sort scheduler as well as trace-based rebuilders (e.g. constructive trace rebuilder). + +So, we instead factor out the metadata updating portion of both rebuilders and schedulers +into a global metadata updater, which can be viewed as yet another wrapper around rules. +However, as this must apply on a global level to support the whole scheduling strategy, +metadata updaters are defined at a global level, unlike the per-task caching policies. + +# TODO + +- Make files on the filesystem a core concept as opposed to merely something you can do. +""" + +import asyncio +import collections +import functools +import hashlib +import inspect +import pickle +import subprocess +import sys +import traceback + +from typing import ( + Any, + Awaitable, + Callable, + Concatenate, + Optional, + ParamSpec, + Protocol, +) + + +class Fetch(Protocol): + """Protocol defining the fetch operation used by tasks.""" + + async def __call__(self, task: "Task") -> Any: ... + + +RuleKey = bytes +TaskKey = tuple +ValueHash = bytes + +P = ParamSpec("P") +RuleFn = Callable[Concatenate[Fetch, TaskKey, "Store", P], Awaitable[Any]] +NiceRuleFn = Callable[Concatenate[Fetch, P], Awaitable[Any]] + + +def make_hash(o: Any) -> bytes: + if isinstance(o, bytes): + h = hashlib.sha256(b"s") + h.update(o) + else: + h = hashlib.sha256(b"r") + h.update(repr(o).encode("utf-8")) + return h.digest() + + +def rule_fn_to_key(fn: Callable) -> RuleKey: + name = fn.__name__ + source = inspect.getsource(fn) + h = hashlib.sha256(source.encode("utf-8")).hexdigest()[:16] + key = f"{name}-{len(source)}-{h}".encode("utf-8") + return key + + +class Task: + """A computation of a value.""" + + __slots__ = "task_key", "rule_fn", "args", "hash" + + task_key: TaskKey + rule_fn: RuleFn + args: tuple + hash: int + + def __init__(self, task_key: TaskKey, rule_fn: RuleFn, *args): + self.task_key = task_key + self.rule_fn = rule_fn + self.args = args + self.hash = hash(self.task_key) + + def __call__(self, fetch: Fetch, store: "Store"): + return self.rule_fn(fetch, self.task_key, store, *self.args) + + def __repr__(self) -> str: + return repr(self.task_key) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Task): + return NotImplemented + return self.task_key == other.task_key + + def __hash__(self) -> int: + return self.hash + + +class Rule: + """A function that returns tasks.""" + + __slots__ = "rule_key", "rule_fn", "hash" + + rule_key: RuleKey + rule_fn: RuleFn + hash: int + + @staticmethod + def new(rule_fn: RuleFn): + return Rule( + rule_fn_to_key(rule_fn), + rule_fn, + ) + + def __init__(self, rule_key: RuleKey, rule_fn: RuleFn): + self.rule_key = rule_key + self.rule_fn = rule_fn + self.hash = hash(self.rule_key) + + def __call__(self, *args): + return Task( + ( + self.rule_key, + *(arg.task_key if isinstance(arg, Task) else arg for arg in args), + ), + self.rule_fn, + *args, + ) + + def 
__eq__(self, other): + if not isinstance(other, Rule): + return NotImplemented + return self.rule_key == other.rule_key + + def __hash__(self): + return self.hash + + +def singleton(cls): + cls.main = cls() + return cls + + +@singleton +class Rules: + """The registry of all rules created.""" + + # Main registry + main: "Rules" + + __slots__ = "rules" + + rules: dict[RuleKey, Rule] + + def __init__(self): + self.rules = dict() + + def eval_task_key(self, task_key: TaskKey) -> Optional[Task]: + rule_key, *arg_keys = task_key + if rule_key not in self.rules: + return None + rule = self.rules[rule_key] + args = [] + for arg in arg_keys: + if isinstance(arg, tuple) and arg[0] not in self.rules: + return None + args.append(self.eval_task_key(arg) if isinstance(arg, tuple) else arg) + return rule(*args) + + @staticmethod + def nice_rule_fn_to_rule_fn(nice_rule_fn, fetch, task_key, store, *args): + return nice_rule_fn(fetch, *args) + + def rule(self, rule_fn: NiceRuleFn) -> Rule: + return self.register( + self.hash_cache( + Rule.new( + functools.update_wrapper( + functools.partial(Rules.nice_rule_fn_to_rule_fn, rule_fn), + rule_fn, + ) + ) + ) + ) + + def rule_no_cache(self, rule_fn: NiceRuleFn) -> Rule: + return self.register( + Rule.new( + functools.update_wrapper( + functools.partial(Rules.nice_rule_fn_to_rule_fn, rule_fn), + rule_fn, + ) + ) + ) + + def register(self, rule: Rule) -> Rule: + self.rules[rule.rule_key] = rule + return rule + + def hash_cache(self, rule: Rule) -> Rule: + """Adds hash based caching to a rule + + Attempts to replay the rule by checking if the hashes of each input + it would have obtained if run now matches up with a previous run. + + Currently, there is no cache eviction policy (all previous runs are stored forever). + + TODO: Implement some cache eviction. 
+ """ + rule.rule_fn = functools.update_wrapper( + functools.partial(Rules.hash_cache_fn, self, rule.rule_fn), + rule.rule_fn, + ) + return rule + + @staticmethod + async def track_fetch(fetch: Fetch, new_inputs: list, task: Task): + result = await fetch(task) + new_inputs.append((task.task_key, make_hash(result))) + return result + + async def hash_cache_fn( + self, + inner_rule_fn: RuleFn, + fetch: Fetch, + task_key: TaskKey, + store: "Store", + *args, + ): + """Actual implementation of hash_cache""" + if task_key in store.key_info: + past_runs = store.key_info[task_key] + output_value = store.key_value[task_key] + possible_values = [] + for past_inputs, past_value in past_runs: + for past_input_key, past_input_hash in past_inputs: + input_task = self.eval_task_key(past_input_key) + if not input_task: + break + current_input_value = await fetch(input_task) + if make_hash(current_input_value) != past_input_hash: + break + else: + if output_value == past_value: + return past_value + possible_values.append(past_value) + + if possible_values: + return possible_values[0] + + new_inputs = [] + + new_value = await inner_rule_fn( + functools.partial(Rules.track_fetch, fetch, new_inputs), + task_key, + store, + *args, + ) + store.key_info[task_key].append((new_inputs, new_value)) + return new_value + + +# Rules.main = Rules() +rule = Rules.main.rule +rule_no_cache = Rules.main.rule_no_cache +register = Rules.main.register +hash_cache = Rules.main.hash_cache + + +class Store: + """Stores a mapping from tasks to their values.""" + + __slots__ = "filename", "rules", "key_value", "key_info" + + @staticmethod + def _fNone(): + return None + + def __init__(self, filename, rules): + self.filename = filename + self.rules = rules + + self.key_value = collections.defaultdict(Store._fNone) + self.key_info = collections.defaultdict(list) + + try: + with open(filename, "rb") as f: + self.key_value, self.key_info = pickle.load(f) + except: + pass + + def save(self): + with open(self.filename, "wb") as f: + pickle.dump((self.key_value, self.key_info), f) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.save() + + +class Detach: + __slots__ = "_background_tasks" + + def __init__(self): + self._background_tasks = set() + + def __call__(self, *args, **kwargs): + task = asyncio.create_task(*args, **kwargs) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) + + +detach = Detach() + + +class SuspendingScheduler: + __slots__ = "store", "done", "waits" + store: Store + done: set[TaskKey] + waits: dict[TaskKey, asyncio.Event] + + def __init__(self, store: Store): + self.store = store + self.done = set() + self.waits = dict() + + async def wait(self): + while detach._background_tasks: + await asyncio.gather(*detach._background_tasks) + + async def build(self, *tasks: Task): + return await asyncio.gather(*(self.fetch_once(task) for task in tasks)) + + async def fetch_once(self, task: Task): + task_key = task.task_key + wait = None + event = None + if task_key in self.done: + return self.store.key_value[task_key] + if task_key in self.waits: + wait = self.waits[task_key] + + if wait: + await wait.wait() + return self.store.key_value[task_key] + + event = self.waits[task_key] = asyncio.Event() + try: + self.store.key_value[task_key] = result = await task( + self.fetch_once, self.store + ) + except: + print(traceback.format_exc()) + event.set() + self.store.key_value[task_key] = None + return None + + self.done.add(task_key) + 
event.set() + + return result + + +class Build: + __slots__ = "_store", "_scheduler" + + def __init__(self, filename, rules=Rules.main): + self._store = Store(filename, rules) + self._scheduler = SuspendingScheduler(self._store) + + async def __call__(self, *tasks: Task): + result = await self.build(*tasks) + await self.wait() + return result + + async def wait(self): + return await self._scheduler.wait() + + async def build(self, *tasks: Task): + return await self._scheduler.build(*tasks) + + def __enter__(self): + self._store.__enter__() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._store.__exit__(exc_type, exc_val, exc_tb) + + +def build(*tasks, filename=".makedb", rules=Rules.main): + with Build(filename, rules) as build: + asyncio.run(build(*tasks)) + + +class ShellResult(collections.namedtuple("ShellResult", "stdout stderr returncode")): + __slots__ = () + + @property + def utf8stdout(self): + return self.stdout.decode("utf-8") + + @property + def utf8stderr(self): + return self.stderr.decode("utf-8") + + +EchoNothing = 0 +EchoStdout = 1 +EchoStderr = 2 +EchoAll = 3 + + +async def _exec_reader(istream, ostream, echo: Any = False): + contents = b"" + async for chunk in istream: + contents += chunk + if echo: + ostream.write(chunk) + ostream.flush() + return contents + + +async def exec( + program, + *args, + input: bytes | bytearray | memoryview | None = None, + echo: int = EchoNothing, +) -> ShellResult: + + proc = await asyncio.create_subprocess_exec( + program, + *args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + if input is not None: + proc.stdin.write(input) # type: ignore + _, stdout, stderr = await asyncio.gather( + proc.stdin.drain(), # type: ignore + _exec_reader(proc.stdout, sys.stdout.buffer, echo=echo & EchoStdout), + _exec_reader(proc.stderr, sys.stderr.buffer, echo=echo & EchoStderr), + ) + else: + stdout, stderr = await asyncio.gather( + _exec_reader(proc.stdout, sys.stdout.buffer, echo=echo & EchoStdout), + _exec_reader(proc.stderr, sys.stderr.buffer, echo=echo & EchoStderr), + ) + + return ShellResult(stdout, stderr, proc.returncode) + + +async def shell( + cmd, + input: bytes | bytearray | memoryview | None = None, + echo: int = EchoNothing, +) -> ShellResult: + proc = await asyncio.create_subprocess_shell( + cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + stdout, stderr = await proc.communicate(input) + if echo & EchoStdout: + sys.stdout.buffer.write(stdout) + sys.stdout.buffer.flush() + if echo & EchoStderr: + sys.stderr.buffer.write(stderr) + sys.stderr.buffer.flush() + return ShellResult(stdout, stderr, proc.returncode) + + +def run_in_executor(f, *args, executor=None): + return asyncio.get_running_loop().run_in_executor(executor, f, *args) + + +def main(globals, filename=".makedb", default_target="all"): + targets = sys.argv[1:] + if not targets: + targets.append(default_target) + actual_targets: list[Task] = [] + for target in targets: + if "(" in target: + actual_targets.append(eval(target, globals=globals)) # type: ignore + else: + actual_targets.append(eval(target + "()", globals=globals)) # type: ignore + + async def async_main(actual_targets): + await asyncio.gather(*(build(target) for target in actual_targets)) + + with Build(filename) as build: + return asyncio.run(async_main(actual_targets)) + + +# class AsyncWrapperSpec: +# __slots__ = "async_methods", "async_subobjects" + +# def __init__( +# self, +# async_methods=set(), +# async_subobjects=dict(), +# 
): +# self.async_methods = set(async_methods) +# self.async_subobjects = dict(async_subobjects) + + +# class AsyncWrapper: +# __slots__ = "_obj", "_spec", "_executor" + +# def __init__(self, obj, spec=AsyncWrapperSpec(), executor=None): +# self._obj = obj +# self._spec = spec +# self._executor = executor + +# @staticmethod +# def wrapper(f, executor, *args): +# return run_in_executor(f, *args, executor=executor) + +# def __getattr__(self, attr): +# if attr in self._spec.async_methods: +# return functools.partial( +# self.wrapper, getattr(self._obj, attr), self._executor +# ) +# if attr in self._spec.async_subobjects: +# return AsyncWrapper( +# getattr(self._obj, attr), +# spec=self._spec.async_subobjects[attr], +# executor=self._executor, +# ) +# return getattr(self._obj, attr) + +# async def __aenter__(self): +# return AsyncWrapper( +# await run_in_executor(self._obj.__enter__, executor=self._executor), +# spec=self._spec, +# ) + +# async def __aexit__(self, exc_type, exc_val, exc_tb): +# return await run_in_executor( +# self._obj.__exit__, exc_type, exc_val, exc_tb, executor=self._executor +# ) + +# def __aiter__(self): +# return AsyncWrapper(self._obj.__iter__(), spec=self._spec) + +# @staticmethod +# def wrapped_next(obj): +# try: +# return True, next(obj) +# except StopIteration: +# return False, None + +# async def __anext__(self): +# ok, res = await run_in_executor( +# functools.partial(self.wrapped_next, self._obj), executor=self._executor +# ) +# if not ok: +# raise StopAsyncIteration +# return res + +# @staticmethod +# def wrapped_foreach(f, obj): +# for chunk in obj: +# f(chunk) + +# async def foreach(self, f): +# await run_in_executor( +# functools.partial(self.wrapped_foreach, f, self._obj), +# executor=self._executor, +# ) + + +# class AsyncIO(Protocol): +# async def __aenter__(self) -> "AsyncIO": ... +# async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ... + +# async def close(self) -> None: ... +# def fileno(self) -> int: ... +# async def flush(self) -> None: ... +# def isatty(self) -> bool: ... +# def readable(self) -> bool: ... +# async def readlines(self, hint: int = -1, /) -> list[bytes]: ... +# async def seek(self, offset: int, whence: int = 0, /) -> int: ... +# def seekable(self) -> bool: ... +# async def tell(self) -> int: ... +# async def truncate(self, size: int | None = None, /) -> int: ... +# def writable(self) -> bool: ... +# async def writelines(self, lines, /) -> None: ... +# async def readline(self, size: int | None = -1, /) -> bytes: ... +# @property +# def closed(self) -> bool: ... +# async def readall(self) -> bytes: ... +# async def readinto(self, buffer, /) -> Any: ... +# async def write(self, b, /) -> Any: ... +# async def read(self, size: int = -1, /) -> Any: ... +# def detach(self) -> "AsyncIO": ... +# async def readinto1(self, buffer, /) -> int: ... +# async def read1(self, size: int = -1, /) -> bytes: ... + +# mode: str +# name: Any + +# @property +# def closefd(self) -> bool: ... + +# raw: "AsyncIO" + +# async def peek(self, size: int = 0, /) -> bytes: ... + +# encoding: str +# errors: str | None +# newlines: str | tuple[str, ...] | None + +# def __aiter__(self) -> AsyncIterator[Any]: ... +# async def __anext__(self) -> Any: ... + +# async def foreach(self, f) -> Any: ... 
+ + +# async def open_async(*args, executor=None) -> AsyncIO: +# # List of methods: https://docs.python.org/3/library/io.html +# async_methods = ( +# "close", +# "detach", +# "flush", +# "peek", +# "read", +# "read1", +# "readall", +# "readinto", +# "readinto1", +# "readline", +# "readlines", +# "seek", +# "tell", +# "truncate", +# "write", +# "writelines", +# ) +# return AsyncWrapper( +# await run_in_executor(open, *args, executor=executor), +# AsyncWrapperSpec(async_methods, {"buffer": AsyncWrapperSpec(async_methods)}), +# ) # type: ignore diff --git a/package.py b/package.py @@ -0,0 +1,100 @@ +import asyncio +import tempfile +import venv + +import make + + +@make.rule_no_cache +async def tmp_venv(_): + dir = tempfile.TemporaryDirectory() + venv.create(dir.name) + packages = ( + "build", + "hatchling", + "pyBake", + "aiofiles", + ) + await make.exec( + "pip", + "install", + "--prefix", + dir.name, + "--force-reinstall", + *packages, + echo=make.EchoAll, + ) + return dir + + +@make.rule_no_cache +async def build(f): + dir = (await f(tmp_venv())).name + await make.exec( + f"{dir}/bin/python", "-m", "build", "--no-isolation", echo=make.EchoAll + ) + + +@make.rule_no_cache +async def bake(f): + dir = (await f(tmp_venv())).name + await make.exec( + f"{dir}/bin/python", + "-c", + """\ +#!/usr/bin/env python3 +from pybake import PyBake + +HEADER = \"""\ +## make.py - Single file version ## +\""" + +FOOTER = \"""\ +from make import * ## +import aiofiles ## +\""" + +pb = PyBake(HEADER, FOOTER) +import make +import aiofiles + +pb.add_module(make) +pb.add_module(aiofiles) + +import os + +os.makedirs("dist", exist_ok=True) +pb.write_bake("dist/make.py") +os.chmod("dist/make.py", int("644", 8)) +""", + echo=make.EchoAll, + ) + + +@make.rule_no_cache +async def tag_release(f, *tasks): + if (await make.exec("git", "status", "--porcelain")).stdout: + raise RuntimeError("Working directory not clean") + branch, *_ = await asyncio.gather( + make.exec("git", "branch", "--show-current"), + *(f(t) for t in tasks), + ) + await make.shell( + f"""\ + git switch --detach + git add --force dist + git commit -m 'Build packages' + git tag --force nightly + git switch '{branch.utf8stdout.strip()}' + """, + echo=make.EchoAll, + ) + + +@make.rule_no_cache +def all(f): + return asyncio.gather(f(build()), f(bake())) + + +if __name__ == "__main__": + exit(make.main(globals())) diff --git a/pyproject.toml b/pyproject.toml @@ -0,0 +1,8 @@ +[build-system] +requires = ["hatchling >= 1.26"] +build-backend = "hatchling.build" +[project] +name = "make" +version = "0.0.1" +[tool.hatch.build.targets.wheel] +packages = ["make"]
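For orientation, the new make package is used the way examples/examples.py exercises it: an async function decorated with @rule becomes a rule, calling the rule yields a task, and tasks are built through a Build store that persists to .makedb. A minimal sketch along those lines follows; the rule names and the file being read are illustrative and not part of this commit.

import asyncio

from make import Build, Fetch, rule


@rule
async def source_text(fetch: Fetch, filename: str):
    # A leaf task: it fetches no other tasks, it just reads the file.
    _ = fetch
    with open(filename, "r") as f:
        return f.read()


@rule
async def line_count(fetch: Fetch, filename: str):
    # fetch() suspends this task until the dependency's value is available.
    text = await fetch(source_text(filename))
    return len(text.splitlines())


async def main():
    # Build is a context manager so the persistent store is saved on exit.
    with Build(".makedb") as build:
        # build(*tasks) gathers its arguments, so it returns a list of results.
        (count,) = await build(line_count("pyproject.toml"))
        print(count)


if __name__ == "__main__":
    asyncio.run(main())

@rule registers the function and wraps it in the default hash-based cache; @rule_no_cache registers it without caching, which is what package.py uses for its targets.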
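This commit also adds a small command-line driver, make.main(), which package.py invokes as exit(make.main(globals())): each positional argument names a rule to build (evaluated against the supplied globals), and with no arguments the default target all() is built. A hypothetical build script in that style is sketched below; the target names and the commands they run are made up for illustration.

import asyncio

import make


@make.rule_no_cache
async def docs(f):
    _ = f
    # shell() and exec() are the subprocess helpers defined in make/__init__.py.
    await make.shell("echo building docs", echo=make.EchoAll)


@make.rule_no_cache
async def wheel(f):
    _ = f
    await make.exec("python", "-m", "build", "--wheel", echo=make.EchoAll)


@make.rule_no_cache
async def all(f):
    # Build both targets concurrently, like all() in package.py.
    await asyncio.gather(f(docs()), f(wheel()))


if __name__ == "__main__":
    # `python makefile.py wheel` builds only wheel(); with no arguments,
    # make.main() falls back to the default target, all().
    exit(make.main(globals()))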