commit 78628084d9fa30bc26b45f8fcca61fbf4d9c88fa
parent 02d5f992b24639a2fa9ade4d49cc4f1cfd7c5531
Author: gracefu <81774659+gracefuu@users.noreply.github.com>
Date: Wed, 16 Apr 2025 07:52:38 +0800
Implement async shell and open (the open wrapper is commented out because it reads poorly); avoid crashing on task exceptions (handling is still incomplete); ensure typing works with Python 3.10
Diffstat:
| A | README.md | | | 1 | + |
| A | bench.py | | | 85 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
| M | make.py | | | 238 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----- |
3 files changed, 311 insertions(+), 13 deletions(-)
diff --git a/README.md b/README.md
@@ -0,0 +1 @@
+Compatible with Python 3.10+.
diff --git a/bench.py b/bench.py
@@ -0,0 +1,85 @@
+import time
+import functools
+
+
+def f1(i):
+ def g():
+ return i + 1
+
+ return g
+
+
+def g_expanded(i):
+ return i + 1
+
+
+def f2(i):
+ return functools.partial(g_expanded, i)
+
+
+class f3:
+ __slots__ = "i"
+
+ def __init__(self, i):
+ self.i = i
+
+ def __call__(self):
+ return self.i + 1
+
+
+class f4(tuple):
+ def __call__(self):
+ return self[0] + 1
+
+
+def main(N=2000000, WARM=200000):
+ for _ in range(5):
+ x1 = [f1(i) for i in range(WARM)]
+ x2 = [f2(i) for i in range(WARM)]
+ x3 = [f3(i) for i in range(WARM)]
+ x4 = [f4((i,)) for i in range(WARM)]
+
+ [f() for f in x1]
+ [f() for f in x2]
+ [f() for f in x3]
+ [f() for f in x4]
+
+ start = time.time()
+ x1 = [f1(i) for i in range(N)]
+ print(time.time() - start)
+
+ start = time.time()
+ x2 = [f2(i) for i in range(N)]
+ print(time.time() - start)
+
+ start = time.time()
+ x3 = [f3(i) for i in range(N)]
+ print(time.time() - start)
+
+ start = time.time()
+ x4 = [f4((i,)) for i in range(N)]
+ print(time.time() - start)
+
+ print("----")
+
+ start = time.time()
+ [f() for f in x1]
+ print(time.time() - start)
+
+ start = time.time()
+ [f() for f in x2]
+ print(time.time() - start)
+
+ start = time.time()
+ [f() for f in x3]
+ print(time.time() - start)
+
+ start = time.time()
+ [f() for f in x4]
+ print(time.time() - start)
+
+ print("====")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/make.py b/make.py
@@ -47,21 +47,23 @@ metadata updaters are defined at a global level, unlike the per-task caching pol
"""
import asyncio
-import functools
-import pickle
-import inspect
import collections
+import functools
import hashlib
+import inspect
+import pickle
+import subprocess
+import sys
+import traceback
from typing import (
Any,
Awaitable,
+ Callable,
Concatenate,
Optional,
- Callable,
+ ParamSpec,
Protocol,
- Tuple,
- List,
)
@@ -74,10 +76,10 @@ class Fetch(Protocol):
RuleKey = bytes
TaskKey = tuple
ValueHash = bytes
-TaskInputs = List[Tuple[TaskKey, ValueHash]]
-RuleFn = Callable[Concatenate[Fetch, TaskKey, "Store", ...], Awaitable[Any]]
-NiceRuleFn = Callable[Concatenate[Fetch, ...], Awaitable[Any]]
+P = ParamSpec("P")
+RuleFn = Callable[Concatenate[Fetch, TaskKey, "Store", P], Awaitable[Any]]
+NiceRuleFn = Callable[Concatenate[Fetch, P], Awaitable[Any]]
def make_hash(o: Any) -> bytes:
@@ -207,6 +209,16 @@ class Rules:
)
)
+ def rule_no_cache(self, rule_fn: NiceRuleFn) -> Rule:
+ return self.register(
+ Rule.new(
+ functools.update_wrapper(
+ functools.partial(Rules.nice_rule_fn_to_rule_fn, rule_fn),
+ rule_fn,
+ )
+ )
+ )
+
def register(self, rule: Rule) -> Rule:
self.rules[rule.rule_key] = rule
return rule
@@ -276,6 +288,7 @@ class Rules:
_rules = Rules()
rule = _rules.rule
+rule_no_cache = _rules.rule_no_cache
register = _rules.register
hash_cache = _rules.hash_cache
@@ -360,9 +373,16 @@ class SuspendingScheduler:
return self.store.key_value[task_key]
event = self.waits[task_key] = asyncio.Event()
- self.store.key_value[task_key] = result = await task(
- self.fetch_once, self.store
- )
+ try:
+ self.store.key_value[task_key] = result = await task(
+ self.fetch_once, self.store
+ )
+ except:
+ print(traceback.format_exc())
+ event.set()
+ self.store.key_value[task_key] = None
+ return None
+
self.done.add(task_key)
event.set()
@@ -377,8 +397,9 @@ class Build:
self._scheduler = SuspendingScheduler(self._store)
async def __call__(self, task: Task):
- await self.build(task)
+ result = await self.build(task)
await self.wait()
+ return result
def wait(self):
return self._scheduler.wait()
@@ -392,3 +413,194 @@ class Build:
def __exit__(self, exc_type, exc_val, exc_tb):
self._store.__exit__(exc_type, exc_val, exc_tb)
+
+
+class ShellResult(collections.namedtuple("ShellResult", "stdout stderr returncode")):
+ __slots__ = ()
+
+ @property
+ def utf8stdout(self):
+ return self.stdout.decode("utf-8")
+
+ @property
+ def utf8stderr(self):
+ return self.stderr.decode("utf-8")
+
+
+EchoNothing = 0
+EchoStdout = 1
+EchoStderr = 2
+EchoAll = 3
+
+
+async def shell(
+ cmd,
+ input=None,
+ echo=EchoNothing,
+) -> ShellResult:
+ proc = await asyncio.create_subprocess_shell(
+ cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
+ )
+ stdout, stderr = await proc.communicate(input)
+ if echo & EchoStdout:
+ sys.stdout.buffer.write(stdout)
+ sys.stdout.buffer.flush()
+ if echo & EchoStderr:
+ sys.stderr.buffer.write(stderr)
+ sys.stderr.buffer.flush()
+ return ShellResult(stdout, stderr, proc.returncode)
+
+
+def run_in_executor(f, *args, executor=None):
+ return asyncio.get_running_loop().run_in_executor(executor, f, *args)
+
+
+# class AsyncWrapperSpec:
+# __slots__ = "async_methods", "async_subobjects"
+
+# def __init__(
+# self,
+# async_methods=set(),
+# async_subobjects=dict(),
+# ):
+# self.async_methods = set(async_methods)
+# self.async_subobjects = dict(async_subobjects)
+
+
+# class AsyncWrapper:
+# __slots__ = "_obj", "_spec", "_executor"
+
+# def __init__(self, obj, spec=AsyncWrapperSpec(), executor=None):
+# self._obj = obj
+# self._spec = spec
+# self._executor = executor
+
+# @staticmethod
+# def wrapper(f, executor, *args):
+# return run_in_executor(f, *args, executor=executor)
+
+# def __getattr__(self, attr):
+# if attr in self._spec.async_methods:
+# return functools.partial(
+# self.wrapper, getattr(self._obj, attr), self._executor
+# )
+# if attr in self._spec.async_subobjects:
+# return AsyncWrapper(
+# getattr(self._obj, attr),
+# spec=self._spec.async_subobjects[attr],
+# executor=self._executor,
+# )
+# return getattr(self._obj, attr)
+
+# async def __aenter__(self):
+# return AsyncWrapper(
+# await run_in_executor(self._obj.__enter__, executor=self._executor),
+# spec=self._spec,
+# )
+
+# async def __aexit__(self, exc_type, exc_val, exc_tb):
+# return await run_in_executor(
+# self._obj.__exit__, exc_type, exc_val, exc_tb, executor=self._executor
+# )
+
+# def __aiter__(self):
+# return AsyncWrapper(self._obj.__iter__(), spec=self._spec)
+
+# @staticmethod
+# def wrapped_next(obj):
+# try:
+# return True, next(obj)
+# except StopIteration:
+# return False, None
+
+# async def __anext__(self):
+# ok, res = await run_in_executor(
+# functools.partial(self.wrapped_next, self._obj), executor=self._executor
+# )
+# if not ok:
+# raise StopAsyncIteration
+# return res
+
+# @staticmethod
+# def wrapped_foreach(f, obj):
+# for chunk in obj:
+# f(chunk)
+
+# async def foreach(self, f):
+# await run_in_executor(
+# functools.partial(self.wrapped_foreach, f, self._obj),
+# executor=self._executor,
+# )
+
+
+# class AsyncIO(Protocol):
+# async def __aenter__(self) -> "AsyncIO": ...
+# async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
+
+# async def close(self) -> None: ...
+# def fileno(self) -> int: ...
+# async def flush(self) -> None: ...
+# def isatty(self) -> bool: ...
+# def readable(self) -> bool: ...
+# async def readlines(self, hint: int = -1, /) -> list[bytes]: ...
+# async def seek(self, offset: int, whence: int = 0, /) -> int: ...
+# def seekable(self) -> bool: ...
+# async def tell(self) -> int: ...
+# async def truncate(self, size: int | None = None, /) -> int: ...
+# def writable(self) -> bool: ...
+# async def writelines(self, lines, /) -> None: ...
+# async def readline(self, size: int | None = -1, /) -> bytes: ...
+# @property
+# def closed(self) -> bool: ...
+# async def readall(self) -> bytes: ...
+# async def readinto(self, buffer, /) -> Any: ...
+# async def write(self, b, /) -> Any: ...
+# async def read(self, size: int = -1, /) -> Any: ...
+# def detach(self) -> "AsyncIO": ...
+# async def readinto1(self, buffer, /) -> int: ...
+# async def read1(self, size: int = -1, /) -> bytes: ...
+
+# mode: str
+# name: Any
+
+# @property
+# def closefd(self) -> bool: ...
+
+# raw: "AsyncIO"
+
+# async def peek(self, size: int = 0, /) -> bytes: ...
+
+# encoding: str
+# errors: str | None
+# newlines: str | tuple[str, ...] | None
+
+# def __aiter__(self) -> AsyncIterator[Any]: ...
+# async def __anext__(self) -> Any: ...
+
+# async def foreach(self, f) -> Any: ...
+
+
+# async def open_async(*args, executor=None) -> AsyncIO:
+# # List of methods: https://docs.python.org/3/library/io.html
+# async_methods = (
+# "close",
+# "detach",
+# "flush",
+# "peek",
+# "read",
+# "read1",
+# "readall",
+# "readinto",
+# "readinto1",
+# "readline",
+# "readlines",
+# "seek",
+# "tell",
+# "truncate",
+# "write",
+# "writelines",
+# )
+# return AsyncWrapper(
+# await run_in_executor(open, *args, executor=executor),
+# AsyncWrapperSpec(async_methods, {"buffer": AsyncWrapperSpec(async_methods)}),
+# ) # type: ignore