Skip to content

Commit

Permalink
add upload to attic cache
Browse files Browse the repository at this point in the history
  • Loading branch information
techknowlogick committed Oct 25, 2024
1 parent 1775c73 commit 6101c80
Showing 1 changed file with 67 additions and 0 deletions.
67 changes: 67 additions & 0 deletions nix_fast_build/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class Options:

cachix_cache: str | None = None

attic_cache: str | None = None

@property
def remote_url(self) -> None | str:
if self.remote is None:
Expand All @@ -97,6 +99,7 @@ class ResultType(enum.Enum):
UPLOAD = enum.auto()
DOWNLOAD = enum.auto()
CACHIX = enum.auto()
ATTIC = enum.auto()


@dataclass
Expand Down Expand Up @@ -183,6 +186,11 @@ async def parse_args(args: list[str]) -> Options:
help="Cachix cache to upload to",
default=None,
)
parser.add_argument(
"--attic-cache",
help="Attic cache to upload to",
default=None,
)
parser.add_argument(
"--no-nom",
help="Don't use nix-output-monitor to print build output (default: false)",
Expand Down Expand Up @@ -318,6 +326,7 @@ async def parse_args(args: list[str]) -> Options:
eval_workers=a.eval_workers,
copy_to=a.copy_to,
cachix_cache=a.cachix_cache,
attic_cache=a.attic_cache,
no_link=a.no_link,
out_link=a.out_link,
result_format=ResultFormat[a.result_format.upper()],
Expand Down Expand Up @@ -633,6 +642,22 @@ async def upload_cachix(
proc = await asyncio.create_subprocess_exec(*cmd)
return await proc.wait()

async def upload_attic(self, opts: Options) -> int:
if opts.attic_cache is None:
return 0
cmd = maybe_remote(
[
*nix_shell("nixpkgs#attic-client", "attic"),
"push",
opts.attic_cache,
*list(self.outputs.values()),
],
opts,
)
logger.debug("run %s", shlex.join(cmd))
proc = await asyncio.create_subprocess_exec(*cmd)
return await proc.wait()

async def download(self, exit_stack: AsyncExitStack, opts: Options) -> int:
if not opts.remote_url or not opts.download:
return 0
Expand Down Expand Up @@ -757,6 +782,7 @@ async def run_builds(
build_queue: QueueWithContext[Job | StopTask],
upload_queue: QueueWithContext[Build | StopTask],
cachix_queue: QueueWithContext[Build | StopTask],
attic_queue: QueueWithContext[Build | StopTask],
download_queue: QueueWithContext[Build | StopTask],
results: list[Result],
opts: Options,
Expand Down Expand Up @@ -791,6 +817,7 @@ async def run_builds(
upload_queue.put_nowait(build)
download_queue.put_nowait(build)
cachix_queue.put_nowait(build)
attic_queue.put_nowait(build)


async def run_uploads(
Expand Down Expand Up @@ -842,6 +869,29 @@ async def run_cachix_upload(
)


async def run_attic_upload(
attic_queue: QueueWithContext[Build | StopTask],
results: list[Result],
opts: Options,
) -> int:
while True:
async with attic_queue.get_context() as build:
if isinstance(build, StopTask):
logger.debug("finish attic upload task")
return 0
start_time = timeit.default_timer()
rc = await build.upload_attic(opts)
results.append(
Result(
result_type=ResultType.ATTIC,
attr=build.attr,
success=rc == 0,
duration=start_time - timeit.default_timer(),
error=f"attic upload exited with {rc}" if rc != 0 else None,
)
)


async def run_downloads(
stack: AsyncExitStack,
download_queue: QueueWithContext[Build | StopTask],
Expand Down Expand Up @@ -915,6 +965,7 @@ async def run(stack: AsyncExitStack, opts: Options) -> int:
results: list[Result] = []
build_queue: QueueWithContext[Job | StopTask] = QueueWithContext()
cachix_queue: QueueWithContext[Build | StopTask] = QueueWithContext()
attic_queue: QueueWithContext[Build | StopTask] = QueueWithContext()
upload_queue: QueueWithContext[Build | StopTask] = QueueWithContext()
download_queue: QueueWithContext[Build | StopTask] = QueueWithContext()

Expand All @@ -937,6 +988,7 @@ async def run(stack: AsyncExitStack, opts: Options) -> int:
build_queue,
upload_queue,
cachix_queue,
attic_queue,
download_queue,
results,
opts,
Expand All @@ -961,6 +1013,16 @@ async def run(stack: AsyncExitStack, opts: Options) -> int:
name=f"cachix-{i}",
)
)
tasks.append(
tg.create_task(
run_attic_upload(
attic_queue,
results,
opts,
),
name=f"attic-{i}",
)
)
tasks.append(
tg.create_task(
run_downloads(stack, download_queue, results, opts),
Expand Down Expand Up @@ -993,6 +1055,11 @@ async def run(stack: AsyncExitStack, opts: Options) -> int:
cachix_queue.put_nowait(StopTask())
await cachix_queue.join()

logger.debug("Uploads finished, waiting for attic uploads to finish...")
for _ in range(opts.max_jobs):
attic_queue.put_nowait(StopTask())
await attic_queue.join()

logger.debug("Uploads finished, waiting for downloads to finish...")
for _ in range(opts.max_jobs):
download_queue.put_nowait(StopTask())
Expand Down

0 comments on commit 6101c80

Please sign in to comment.