mirror of
https://chromium.googlesource.com/crosvm/crosvm
synced 2024-11-25 05:03:05 +00:00
c53b40e455
Change-Id: Ie539304c85d0dc54bc1e17892db8bbbfb15b69ef Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/4417934 Reviewed-by: Daniel Verkamp <dverkamp@chromium.org> Commit-Queue: Daniel Verkamp <dverkamp@chromium.org> Reviewed-by: Dennis Kempin <denniskempin@google.com>
1180 lines
36 KiB
Python
1180 lines
36 KiB
Python
#!/usr/bin/env python3
|
|
# Copyright 2022 The ChromiumOS Authors
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
"""
|
|
Provides helpers for writing shell-like scripts in Python.
|
|
|
|
It provides tools to execute commands with similar flexibility to shell scripts and simplifies
|
|
command line arguments using `argh` and provides common flags (e.g. -v and -vv) for all of
|
|
our command line tools.
|
|
|
|
Refer to the scripts in ./tools for example usage.
|
|
"""
|
|
|
|
# Import preamble before anything else
|
|
from . import preamble # type: ignore
|
|
|
|
import argparse
|
|
import contextlib
|
|
import datetime
|
|
import functools
|
|
import getpass
|
|
import json
|
|
import os
|
|
import re
|
|
import shlex
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import traceback
|
|
import urllib
|
|
import urllib.request
|
|
import urllib.error
|
|
from copy import deepcopy
|
|
from math import ceil
|
|
from multiprocessing.pool import ThreadPool
|
|
from pathlib import Path
|
|
from subprocess import DEVNULL, PIPE, STDOUT # type: ignore
|
|
from tempfile import gettempdir
|
|
from typing import (
|
|
Any,
|
|
Callable,
|
|
Dict,
|
|
Iterable,
|
|
List,
|
|
NamedTuple,
|
|
Optional,
|
|
Tuple,
|
|
TypeVar,
|
|
Union,
|
|
cast,
|
|
)
|
|
|
|
import argh # type: ignore
|
|
import rich
|
|
import rich.console
|
|
import rich.live
|
|
import rich.spinner
|
|
import rich.text
|
|
|
|
# Hack: argh does not support type annotations. This prevents type errors.
|
|
argh: Any # type: ignore
|
|
|
|
# File where to store http headers for gcloud authentication
|
|
AUTH_HEADERS_FILE = Path(gettempdir()) / f"crosvm_gcloud_auth_headers_{getpass.getuser()}"
|
|
|
|
PathLike = Union[Path, str]
|
|
|
|
|
|
def find_crosvm_root():
|
|
"Walk up from CWD until we find the crosvm root dir."
|
|
path = Path("").resolve()
|
|
while True:
|
|
if (path / "tools/impl/common.py").is_file():
|
|
return path
|
|
if path.parent:
|
|
path = path.parent
|
|
else:
|
|
raise Exception("Cannot find crosvm root dir.")
|
|
|
|
|
|
"Root directory of crosvm derived from CWD."
|
|
CROSVM_ROOT = find_crosvm_root()
|
|
|
|
"Cargo.toml file of crosvm"
|
|
CROSVM_TOML = CROSVM_ROOT / "Cargo.toml"
|
|
|
|
"""
|
|
Root directory of crosvm devtools.
|
|
|
|
May be different from `CROSVM_ROOT/tools`, which is allows you to run the crosvm dev
|
|
tools from this directory on another crosvm repo.
|
|
|
|
Use this if you want to call crosvm dev tools, which will use the scripts relative
|
|
to this file.
|
|
"""
|
|
TOOLS_ROOT = Path(__file__).parent.parent.resolve()
|
|
|
|
"Cache directory that is preserved between builds in CI."
|
|
CACHE_DIR = Path(os.environ.get("CROSVM_CACHE_DIR", os.environ.get("TMPDIR", "/tmp")))
|
|
|
|
"Url of crosvm's gerrit review host"
|
|
GERRIT_URL = "https://chromium-review.googlesource.com"
|
|
|
|
# Ensure that we really found the crosvm root directory
|
|
assert 'name = "crosvm"' in CROSVM_TOML.read_text()
|
|
|
|
# List of times recorded by `record_time` which will be printed if --timing-info is provided.
|
|
global_time_records: List[Tuple[str, datetime.timedelta]] = []
|
|
|
|
# Regex that matches ANSI escape sequences
|
|
ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
|
|
|
|
|
|
def crosvm_target_dir():
|
|
crosvm_target = os.environ.get("CROSVM_TARGET_DIR")
|
|
cargo_target = os.environ.get("CARGO_TARGET_DIR")
|
|
if crosvm_target:
|
|
return Path(crosvm_target)
|
|
elif cargo_target:
|
|
return Path(cargo_target) / "crosvm"
|
|
else:
|
|
return CROSVM_ROOT / "target/crosvm"
|
|
|
|
|
|
class CommandResult(NamedTuple):
|
|
"""Results of a command execution as returned by Command.run()"""
|
|
|
|
stdout: str
|
|
stderr: str
|
|
returncode: int
|
|
|
|
|
|
class Command(object):
|
|
"""
|
|
Simplified subprocess handling for shell-like scripts.
|
|
|
|
## Example Usage
|
|
|
|
To run a program on behalf of the user:
|
|
|
|
>> cmd("cargo build").fg()
|
|
|
|
This will run the program with stdio passed to the user. Developer tools usually run a set of
|
|
actions on behalf of the user. These should be executed with fg().
|
|
|
|
To make calls in the background to gather information use success/stdout/lines:
|
|
|
|
>> cmd("git branch").lines()
|
|
>> cmd("git rev-parse foo").success()
|
|
|
|
These will capture all program output. Try to avoid using these to run mutating commands,
|
|
as they will remain hidden to the user even when using --verbose.
|
|
|
|
## Arguments
|
|
|
|
Arguments are provided as a list similar to subprocess.run():
|
|
|
|
>>> Command('cargo', 'build', '--workspace')
|
|
Command('cargo', 'build', '--workspace')
|
|
|
|
In contrast to subprocess.run, all strings are split by whitespaces similar to bash:
|
|
|
|
>>> Command('cargo build --workspace', '--features foo')
|
|
Command('cargo', 'build', '--workspace', '--features', 'foo')
|
|
|
|
In contrast to bash, globs are *not* evaluated, but can easily be provided using Path:
|
|
|
|
>>> Command('ls -l', *Path(CROSVM_ROOT).glob('*.toml'))
|
|
Command('ls', '-l', ...)
|
|
|
|
None or False are ignored to make it easy to include conditional arguments:
|
|
|
|
>>> all = False
|
|
>>> Command('cargo build', '--workspace' if all else None)
|
|
Command('cargo', 'build')
|
|
|
|
## Nesting
|
|
|
|
Commands can be nested, similar to $() subshells in bash. The sub-commands will be executed
|
|
right away and their output will undergo the usual splitting:
|
|
|
|
>>> Command('printf "(%s)"', Command('echo foo bar')).stdout()
|
|
'(foo)(bar)'
|
|
|
|
Arguments can be explicitly quoted to prevent splitting, it applies to both sub-commands
|
|
as well as strings:
|
|
|
|
>>> Command('printf "(%s)"', quoted(Command('echo foo bar'))).stdout()
|
|
'(foo bar)'
|
|
|
|
Commands can also be piped into one another:
|
|
|
|
>>> wc = Command('wc')
|
|
>>> Command('echo "abcd"').pipe(wc('-c')).stdout()
|
|
'5'
|
|
|
|
## Verbosity
|
|
|
|
The --verbose flag is intended for users and will show all command lines executed in the
|
|
foreground with fg(), it'll also include output of programs run with fg(quiet=True). Commands
|
|
executed in the background are not shown.
|
|
|
|
For script developers, the --very-verbose flag will print full details and output of all
|
|
executed command lines, including those run hidden from the user.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
*args: Any,
|
|
stdin_cmd: Optional["Command"] = None,
|
|
env_vars: Dict[str, str] = {},
|
|
cwd: Optional[Path] = None,
|
|
):
|
|
self.args = Command.__parse_cmd(args)
|
|
self.stdin_cmd = stdin_cmd
|
|
self.env_vars = env_vars
|
|
self.cwd = cwd
|
|
|
|
### Builder API to construct commands
|
|
|
|
def with_args(self, *args: Any):
|
|
"""Returns a new Command with added arguments.
|
|
|
|
>>> cargo = Command('cargo')
|
|
>>> cargo.with_args('clippy')
|
|
Command('cargo', 'clippy')
|
|
"""
|
|
cmd = deepcopy(self)
|
|
cmd.args = [*self.args, *Command.__parse_cmd(args)]
|
|
return cmd
|
|
|
|
def with_cwd(self, cwd: Optional[Path]):
|
|
"""Changes the working directory the command is executed in.
|
|
|
|
>>> cargo = Command('pwd')
|
|
>>> cargo.with_cwd('/tmp').stdout()
|
|
'/tmp'
|
|
"""
|
|
cmd = deepcopy(self)
|
|
cmd.cwd = cwd
|
|
return cmd
|
|
|
|
def __call__(self, *args: Any):
|
|
"""Shorthand for Command.with_args"""
|
|
return self.with_args(*args)
|
|
|
|
def with_env(self, key: str, value: Optional[str]):
|
|
"""
|
|
Returns a command with an added env variable.
|
|
|
|
The variable is removed if value is None.
|
|
"""
|
|
return self.with_envs({key: value})
|
|
|
|
def with_envs(self, envs: Union[Dict[str, Optional[str]], Dict[str, str]]):
|
|
"""
|
|
Returns a command with an added env variable.
|
|
|
|
The variable is removed if value is None.
|
|
"""
|
|
cmd = deepcopy(self)
|
|
for key, value in envs.items():
|
|
if value is not None:
|
|
cmd.env_vars[key] = value
|
|
else:
|
|
if key in cmd.env_vars:
|
|
del cmd.env_vars[key]
|
|
return cmd
|
|
|
|
def with_path_env(self, new_path: str):
|
|
"""Returns a command with a path added to the PATH variable."""
|
|
path_var = self.env_vars.get("PATH", os.environ.get("PATH", ""))
|
|
return self.with_env("PATH", f"{path_var}:{new_path}")
|
|
|
|
def with_color_arg(
|
|
self,
|
|
always: Optional[str] = None,
|
|
never: Optional[str] = None,
|
|
):
|
|
"""Returns a command with an argument added to pass through enabled/disabled colors."""
|
|
new_cmd = self
|
|
if color_enabled():
|
|
if always:
|
|
new_cmd = new_cmd(always)
|
|
else:
|
|
if never:
|
|
new_cmd = new_cmd(never)
|
|
return new_cmd
|
|
|
|
def with_color_env(self, var_name: str):
|
|
"""Returns a command with an env var added to pass through enabled/disabled colors."""
|
|
return self.with_env(var_name, "1" if color_enabled() else "0")
|
|
|
|
def with_color_flag(self, flag: str = "--color"):
|
|
"""Returns a command with an added --color=always/never/auto flag."""
|
|
return self.with_color_arg(always=f"{flag}=always", never=f"{flag}=never")
|
|
|
|
def foreach(self, arguments: Iterable[Any], batch_size: int = 1):
|
|
"""
|
|
Yields a new command for each entry in `arguments`.
|
|
|
|
The argument is appended to each command and is intended to be used in
|
|
conjunction with `parallel()` to execute a command on a list of arguments in
|
|
parallel.
|
|
|
|
>>> parallel(*cmd('echo').foreach((1, 2, 3))).stdout()
|
|
['1', '2', '3']
|
|
|
|
Arguments can also be batched by setting batch_size > 1, which will append multiple
|
|
arguments to each command.
|
|
|
|
>>> parallel(*cmd('echo').foreach((1, 2, 3), batch_size=2)).stdout()
|
|
['1 2', '3']
|
|
|
|
"""
|
|
for batch in batched(arguments, batch_size):
|
|
yield self(*batch)
|
|
|
|
def pipe(self, *args: Any):
|
|
"""
|
|
Pipes the output of this command into another process.
|
|
|
|
The target can either be another Command or the argument list to build a new command.
|
|
"""
|
|
if len(args) == 1 and isinstance(args[0], Command):
|
|
cmd = Command(stdin_cmd=self)
|
|
cmd.args = args[0].args
|
|
cmd.env_vars = self.env_vars.copy()
|
|
return cmd
|
|
else:
|
|
return Command(*args, stdin_cmd=self, env_vars=self.env_vars)
|
|
|
|
### Executing programs in the foreground
|
|
|
|
def run_foreground(
|
|
self,
|
|
quiet: bool = False,
|
|
check: bool = True,
|
|
dry_run: bool = False,
|
|
style: Optional[Callable[["subprocess.Popen[str]"], None]] = None,
|
|
):
|
|
"""
|
|
Runs a program in the foreground with output streamed to the user.
|
|
|
|
>>> Command('true').fg()
|
|
0
|
|
|
|
Non-zero exit codes will trigger an Exception
|
|
|
|
>>> Command('false').fg()
|
|
Traceback (most recent call last):
|
|
...
|
|
subprocess.CalledProcessError...
|
|
|
|
But can be disabled:
|
|
|
|
>>> Command('false').fg(check=False)
|
|
1
|
|
|
|
Output can be hidden by setting quiet=True:
|
|
|
|
>>> Command("echo foo").fg(quiet=True)
|
|
0
|
|
|
|
This will hide the programs stdout and stderr unless the program fails.
|
|
|
|
More sophisticated means of outputting stdout/err are available via `Styles`:
|
|
|
|
>>> Command("echo foo").fg(style=Styles.live_truncated())
|
|
…
|
|
foo
|
|
0
|
|
|
|
Will output the results of the command but truncate output after a few lines. See `Styles`
|
|
for more options.
|
|
|
|
Arguments:
|
|
quiet: Do not show stdout/stderr unless the program failed.
|
|
check: Raise an exception if the program returned an error code.
|
|
style: A function to present the output of the program. See `Styles`
|
|
|
|
Returns: The return code of the program.
|
|
"""
|
|
if dry_run:
|
|
print(f"Not running: {self}")
|
|
return 0
|
|
|
|
if quiet:
|
|
style = Styles.quiet
|
|
|
|
if verbose():
|
|
print(f"$ {self}")
|
|
|
|
if style is None or verbose():
|
|
return self.__run(stdout=None, stderr=None, check=check).returncode
|
|
else:
|
|
process = self.popen(stdout=PIPE, stderr=STDOUT)
|
|
style(process)
|
|
returncode = process.wait()
|
|
if returncode != 0 and check:
|
|
assert process.stdout
|
|
raise subprocess.CalledProcessError(returncode, process.args)
|
|
return returncode
|
|
|
|
def fg(
|
|
self,
|
|
quiet: bool = False,
|
|
check: bool = True,
|
|
dry_run: bool = False,
|
|
style: Optional[Callable[["subprocess.Popen[str]"], None]] = None,
|
|
):
|
|
"""
|
|
Shorthand for Command.run_foreground()
|
|
"""
|
|
return self.run_foreground(quiet, check, dry_run, style)
|
|
|
|
def write_to(self, filename: Path):
|
|
"""
|
|
Writes stdout to the provided file.
|
|
"""
|
|
if verbose():
|
|
print(f"$ {self} > {filename}")
|
|
with open(filename, "w") as file:
|
|
file.write(self.__run(stdout=PIPE, stderr=PIPE).stdout)
|
|
|
|
def append_to(self, filename: Path):
|
|
"""
|
|
Appends stdout to the provided file.
|
|
"""
|
|
if verbose():
|
|
print(f"$ {self} >> {filename}")
|
|
with open(filename, "a") as file:
|
|
file.write(self.__run(stdout=PIPE, stderr=PIPE).stdout)
|
|
|
|
### API for executing commands hidden from the user
|
|
|
|
def success(self):
|
|
"""
|
|
Returns True if the program succeeded (i.e. returned 0).
|
|
|
|
The program will not be visible to the user unless --very-verbose is specified.
|
|
"""
|
|
if very_verbose():
|
|
print(f"$ {self}")
|
|
return self.__run(stdout=PIPE, stderr=PIPE, check=False).returncode == 0
|
|
|
|
def stdout(self, check: bool = True, stderr: int = PIPE):
|
|
"""
|
|
Runs a program and returns stdout.
|
|
|
|
The program will not be visible to the user unless --very-verbose is specified.
|
|
"""
|
|
if very_verbose():
|
|
print(f"$ {self}")
|
|
return self.__run(stdout=PIPE, stderr=stderr, check=check).stdout.strip()
|
|
|
|
def json(self, check: bool = True) -> Any:
|
|
"""
|
|
Runs a program and returns stdout parsed as json.
|
|
|
|
The program will not be visible to the user unless --very-verbose is specified.
|
|
"""
|
|
stdout = self.stdout(check=check)
|
|
if stdout:
|
|
return json.loads(stdout)
|
|
else:
|
|
return None
|
|
|
|
def lines(self, check: bool = True, stderr: int = PIPE):
|
|
"""
|
|
Runs a program and returns stdout line by line.
|
|
|
|
The program will not be visible to the user unless --very-verbose is specified.
|
|
"""
|
|
return self.stdout(check=check, stderr=stderr).splitlines()
|
|
|
|
### Utilities
|
|
|
|
def __str__(self):
|
|
stdin = ""
|
|
if self.stdin_cmd:
|
|
stdin = str(self.stdin_cmd) + " | "
|
|
return stdin + shlex.join(self.args)
|
|
|
|
def __repr__(self):
|
|
stdin = ""
|
|
if self.stdin_cmd:
|
|
stdin = ", stdin_cmd=" + repr(self.stdin_cmd)
|
|
return f"Command({', '.join(repr(a) for a in self.args)}{stdin})"
|
|
|
|
### Private implementation details
|
|
|
|
def __run(
|
|
self,
|
|
stdout: Optional[int],
|
|
stderr: Optional[int],
|
|
check: bool = True,
|
|
) -> CommandResult:
|
|
"Run this command in subprocess.run()"
|
|
if very_verbose():
|
|
print(f"cwd: {Path().resolve()}")
|
|
for k, v in self.env_vars.items():
|
|
print(f"env: {k}={v}")
|
|
result = subprocess.run(
|
|
self.args,
|
|
cwd=self.cwd,
|
|
stdout=stdout,
|
|
stderr=stderr,
|
|
stdin=self.__stdin_stream(),
|
|
env={**os.environ, **self.env_vars},
|
|
check=check,
|
|
text=True,
|
|
)
|
|
if very_verbose():
|
|
if result.stdout:
|
|
for line in result.stdout.splitlines():
|
|
print("stdout:", line)
|
|
if result.stderr:
|
|
for line in result.stderr.splitlines():
|
|
print("stderr:", line)
|
|
print("returncode:", result.returncode)
|
|
if check and result.returncode != 0:
|
|
raise subprocess.CalledProcessError(result.returncode, str(self), result.stdout)
|
|
return CommandResult(result.stdout, result.stderr, result.returncode)
|
|
|
|
def __stdin_stream(self):
|
|
if self.stdin_cmd:
|
|
return self.stdin_cmd.popen(stdout=PIPE, stderr=PIPE).stdout
|
|
return None
|
|
|
|
def popen(self, **kwargs: Any) -> "subprocess.Popen[str]":
|
|
"""
|
|
Runs a program and returns the Popen object of the running process.
|
|
"""
|
|
return subprocess.Popen(
|
|
self.args,
|
|
cwd=self.cwd,
|
|
stdin=self.__stdin_stream(),
|
|
env={**os.environ, **self.env_vars},
|
|
text=True,
|
|
**kwargs,
|
|
)
|
|
|
|
@staticmethod
|
|
def __parse_cmd(args: Iterable[Any]) -> List[str]:
|
|
"""Parses command line arguments for Command."""
|
|
res = [parsed for arg in args for parsed in Command.__parse_cmd_args(arg)]
|
|
return res
|
|
|
|
@staticmethod
|
|
def __parse_cmd_args(arg: Any) -> List[str]:
|
|
"""Parses a mixed type command line argument into a list of strings."""
|
|
if isinstance(arg, Path):
|
|
return [str(arg)]
|
|
elif isinstance(arg, QuotedString):
|
|
return [arg.value]
|
|
elif isinstance(arg, Command):
|
|
return [*shlex.split(arg.stdout())]
|
|
elif arg is None or arg is False:
|
|
return []
|
|
else:
|
|
return [*shlex.split(str(arg))]
|
|
|
|
|
|
class Styles(object):
|
|
"A collection of methods that can be passed to `Command.fg(style=)`"
|
|
|
|
@staticmethod
|
|
def quiet(process: "subprocess.Popen[str]"):
|
|
"Won't print anything unless the command failed."
|
|
assert process.stdout
|
|
stdout = process.stdout.read()
|
|
if process.wait() != 0:
|
|
print(stdout, end="")
|
|
|
|
@staticmethod
|
|
def live_truncated(num_lines: int = 8):
|
|
"Prints only the last `num_lines` of output while the program is running and successful."
|
|
|
|
def output(process: "subprocess.Popen[str]"):
|
|
assert process.stdout
|
|
spinner = rich.spinner.Spinner("dots")
|
|
lines: List[rich.text.Text] = []
|
|
stdout: List[str] = []
|
|
with rich.live.Live(refresh_per_second=30, transient=True) as live:
|
|
for line in iter(process.stdout.readline, ""):
|
|
stdout.append(line.strip())
|
|
lines.append(rich.text.Text.from_ansi(line.strip(), no_wrap=True))
|
|
while len(lines) > num_lines:
|
|
lines.pop(0)
|
|
live.update(rich.console.Group(rich.text.Text("…"), *lines, spinner))
|
|
if process.wait() == 0:
|
|
console.print(rich.console.Group(rich.text.Text("…"), *lines))
|
|
else:
|
|
for line in stdout:
|
|
print(line)
|
|
|
|
return output
|
|
|
|
@staticmethod
|
|
def quiet_with_progress(title: str):
|
|
"Prints only the last `num_lines` of output while the program is running and successful."
|
|
|
|
def output(process: "subprocess.Popen[str]"):
|
|
assert process.stdout
|
|
with rich.live.Live(
|
|
rich.spinner.Spinner("dots", title), refresh_per_second=30, transient=True
|
|
):
|
|
stdout = process.stdout.read()
|
|
|
|
if process.wait() == 0:
|
|
console.print(f"[green]OK[/green] {title}")
|
|
else:
|
|
print(stdout)
|
|
console.print(f"[red]ERR[/red] {title}")
|
|
|
|
return output
|
|
|
|
|
|
class ParallelCommands(object):
|
|
"""
|
|
Allows commands to be run in parallel.
|
|
|
|
>>> parallel(cmd('true'), cmd('false')).fg(check=False)
|
|
[0, 1]
|
|
|
|
>>> parallel(cmd('echo a'), cmd('echo b')).stdout()
|
|
['a', 'b']
|
|
"""
|
|
|
|
def __init__(self, *commands: Command):
|
|
self.commands = commands
|
|
|
|
def fg(self, quiet: bool = False, check: bool = True):
|
|
with ThreadPool(1 if very_verbose() else os.cpu_count()) as pool:
|
|
return pool.map(lambda command: command.fg(quiet=quiet, check=check), self.commands)
|
|
|
|
def stdout(self):
|
|
with ThreadPool(1 if very_verbose() else os.cpu_count()) as pool:
|
|
return pool.map(lambda command: command.stdout(), self.commands)
|
|
|
|
def success(self):
|
|
results = self.fg(check=False, quiet=True)
|
|
return all(result == 0 for result in results)
|
|
|
|
|
|
class Remote(object):
|
|
"""
|
|
Wrapper around the cmd() API and allow execution of commands via SSH."
|
|
"""
|
|
|
|
def __init__(self, host: str, opts: Dict[str, str]):
|
|
self.host = host
|
|
ssh_opts = [f"-o{k}={v}" for k, v in opts.items()]
|
|
self.ssh_cmd = cmd("ssh", host, "-T", *ssh_opts)
|
|
self.scp_cmd = cmd("scp", *ssh_opts)
|
|
|
|
def ssh(self, cmd: Command, remote_cwd: Optional[Path] = None):
|
|
# Use huponexit to ensure the process is killed if the connection is lost.
|
|
# Use shlex to properly quote the command.
|
|
wrapped_cmd = f"bash -O huponexit -c {shlex.quote(str(cmd))}"
|
|
if remote_cwd is not None:
|
|
wrapped_cmd = f"cd {remote_cwd} && {wrapped_cmd}"
|
|
# The whole command to pass it to SSH for remote execution.
|
|
return self.ssh_cmd.with_args(quoted(wrapped_cmd))
|
|
|
|
def scp(self, sources: List[Path], target: str, quiet: bool = False):
|
|
return self.scp_cmd.with_args(*sources, f"{self.host}:{target}").fg(quiet=quiet)
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def record_time(title: str):
|
|
"""
|
|
Records wall-time of how long this context lasts.
|
|
|
|
The results will be printed at the end of script executation if --timing-info is specified.
|
|
"""
|
|
start_time = datetime.datetime.now()
|
|
try:
|
|
yield
|
|
finally:
|
|
global_time_records.append((title, datetime.datetime.now() - start_time))
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def cwd_context(path: PathLike):
|
|
"""Context for temporarily changing the cwd.
|
|
|
|
>>> with cwd('/tmp'):
|
|
... os.getcwd()
|
|
'/tmp'
|
|
|
|
"""
|
|
cwd = os.getcwd()
|
|
try:
|
|
chdir(path)
|
|
yield
|
|
finally:
|
|
chdir(cwd)
|
|
|
|
|
|
def chdir(path: PathLike):
|
|
if very_verbose():
|
|
print("cd", path)
|
|
os.chdir(path)
|
|
|
|
|
|
class QuotedString(object):
|
|
"""
|
|
Prevents the provided string from being split.
|
|
|
|
Commands will be executed and their stdout is quoted.
|
|
"""
|
|
|
|
def __init__(self, value: Any):
|
|
if isinstance(value, Command):
|
|
self.value = value.stdout()
|
|
else:
|
|
self.value = str(value)
|
|
|
|
def __str__(self):
|
|
return f'"{self.value}"'
|
|
|
|
|
|
T = TypeVar("T")
|
|
|
|
|
|
def batched(source: Iterable[T], max_batch_size: int) -> Iterable[List[T]]:
|
|
"""
|
|
Returns an iterator over batches of elements from source_list.
|
|
|
|
>>> list(batched([1, 2, 3, 4, 5], 2))
|
|
[[1, 2], [3, 4], [5]]
|
|
"""
|
|
source_list = list(source)
|
|
# Calculate batch size that spreads elements evenly across all batches
|
|
batch_count = ceil(len(source_list) / max_batch_size)
|
|
batch_size = ceil(len(source_list) / batch_count)
|
|
for index in range(0, len(source_list), batch_size):
|
|
yield source_list[index : min(index + batch_size, len(source_list))]
|
|
|
|
|
|
# Shorthands
|
|
quoted = QuotedString
|
|
cmd = Command
|
|
cwd = cwd_context
|
|
parallel = ParallelCommands
|
|
|
|
|
|
def run_main(main_fn: Callable[..., Any], usage: Optional[str] = None):
|
|
run_commands(default_fn=main_fn, usage=usage)
|
|
|
|
|
|
def run_commands(
|
|
*functions: Callable[..., Any],
|
|
default_fn: Optional[Callable[..., Any]] = None,
|
|
usage: Optional[str] = None,
|
|
):
|
|
"""
|
|
Allow the user to call the provided functions with command line arguments translated to
|
|
function arguments via argh: https://pythonhosted.org/argh
|
|
"""
|
|
exit_code = 0
|
|
try:
|
|
parser = argparse.ArgumentParser(
|
|
description=usage,
|
|
# Docstrings are used as the description in argparse, preserve their formatting.
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
# Do not allow implied abbreviations. Abbreviations should be manually specified.
|
|
allow_abbrev=False,
|
|
)
|
|
add_common_args(parser)
|
|
|
|
# Add provided commands to parser. Do not use sub-commands if we just got one function.
|
|
if functions:
|
|
argh.add_commands(parser, functions) # type: ignore
|
|
if default_fn:
|
|
argh.set_default_command(parser, default_fn) # type: ignore
|
|
|
|
with record_time("Total Time"):
|
|
# Call main method
|
|
argh.dispatch(parser) # type: ignore
|
|
|
|
except Exception as e:
|
|
if verbose():
|
|
traceback.print_exc()
|
|
else:
|
|
print(e)
|
|
exit_code = 1
|
|
|
|
if parse_common_args().timing_info:
|
|
print_timing_info()
|
|
|
|
sys.exit(exit_code)
|
|
|
|
|
|
def print_timing_info():
|
|
console.print()
|
|
console.print("Timing info:")
|
|
console.print()
|
|
for title, delta in global_time_records:
|
|
console.print(f" {title:20} {delta.total_seconds():.2f}s")
|
|
|
|
|
|
@functools.lru_cache(None)
|
|
def parse_common_args():
|
|
"""
|
|
Parse args common to all scripts
|
|
|
|
These args are parsed separately of the run_main/run_commands method so we can access
|
|
verbose/etc before the commands arguments are parsed.
|
|
"""
|
|
parser = argparse.ArgumentParser(add_help=False)
|
|
add_common_args(parser)
|
|
return parser.parse_known_args()[0]
|
|
|
|
|
|
def add_common_args(parser: argparse.ArgumentParser):
|
|
"These args are added to all commands."
|
|
parser.add_argument(
|
|
"--color",
|
|
default="auto",
|
|
choices=("always", "never", "auto"),
|
|
help="Force enable or disable colors. Defaults to automatic detection.",
|
|
)
|
|
parser.add_argument(
|
|
"--verbose",
|
|
"-v",
|
|
action="store_true",
|
|
default=False,
|
|
help="Print more details about the commands this script is running.",
|
|
)
|
|
parser.add_argument(
|
|
"--very-verbose",
|
|
"-vv",
|
|
action="store_true",
|
|
default=False,
|
|
help="Print more debug output",
|
|
)
|
|
parser.add_argument(
|
|
"--timing-info",
|
|
action="store_true",
|
|
default=False,
|
|
help="Print info on how long which parts of the command take",
|
|
)
|
|
|
|
|
|
def verbose():
|
|
return very_verbose() or parse_common_args().verbose
|
|
|
|
|
|
def very_verbose():
|
|
return parse_common_args().very_verbose
|
|
|
|
|
|
def color_enabled():
|
|
color_arg = parse_common_args().color
|
|
if color_arg == "never":
|
|
return False
|
|
if color_arg == "always":
|
|
return True
|
|
return sys.stdout.isatty()
|
|
|
|
|
|
def all_tracked_files():
|
|
for line in cmd("git ls-files").lines():
|
|
file = Path(line)
|
|
if file.is_file():
|
|
yield file
|
|
|
|
|
|
def find_source_files(extension: str, ignore: List[str] = []):
|
|
for file in all_tracked_files():
|
|
if file.suffix != f".{extension}":
|
|
continue
|
|
if file.is_relative_to("third_party"):
|
|
continue
|
|
if str(file) in ignore:
|
|
continue
|
|
yield file
|
|
|
|
|
|
def find_scripts(path: Path, shebang: str):
|
|
for file in path.glob("*"):
|
|
if file.is_file() and file.open(errors="ignore").read(512).startswith(f"#!{shebang}"):
|
|
yield file
|
|
|
|
|
|
def confirm(message: str, default: bool = False):
|
|
print(message, "[y/N]" if default == False else "[Y/n]", end=" ", flush=True)
|
|
response = sys.stdin.readline().strip()
|
|
if response in ("y", "Y"):
|
|
return True
|
|
if response in ("n", "N"):
|
|
return False
|
|
return default
|
|
|
|
|
|
def get_cookie_file():
|
|
path = cmd("git config http.cookiefile").stdout(check=False)
|
|
return Path(path) if path else None
|
|
|
|
|
|
def get_gcloud_access_token():
|
|
if not shutil.which("gcloud"):
|
|
return None
|
|
return cmd("gcloud auth print-access-token").stdout(check=False)
|
|
|
|
|
|
@functools.lru_cache(maxsize=None)
|
|
def curl_with_git_auth():
|
|
"""
|
|
Returns a curl `Command` instance set up to use the same HTTP credentials as git.
|
|
|
|
This currently supports two methods:
|
|
- git cookies (the default)
|
|
- gcloud
|
|
|
|
Most developers will use git cookies, which are passed to curl.
|
|
|
|
glloud for authorization can be enabled in git via `git config credential.helper gcloud.sh`.
|
|
If enabled in git, this command will also return a curl command using a gloud access token.
|
|
"""
|
|
helper = cmd("git config credential.helper").stdout(check=False)
|
|
|
|
if not helper:
|
|
cookie_file = get_cookie_file()
|
|
if not cookie_file or not cookie_file.is_file():
|
|
raise Exception("git http cookiefile is not available.")
|
|
return cmd("curl --cookie", cookie_file)
|
|
|
|
if helper.endswith("gcloud.sh"):
|
|
token = get_gcloud_access_token()
|
|
if not token:
|
|
raise Exception("Cannot get gcloud access token.")
|
|
# File where to store http headers for gcloud authentication
|
|
AUTH_HEADERS_FILE = Path(gettempdir()) / f"crosvm_gcloud_auth_headers_{getpass.getuser()}"
|
|
|
|
# Write token to a header file so it will not appear in logs or error messages.
|
|
AUTH_HEADERS_FILE.write_text(f"Authorization: Bearer {token}")
|
|
return cmd(f"curl -H @{AUTH_HEADERS_FILE}")
|
|
|
|
raise Exception(f"Unsupported git credentials.helper: {helper}")
|
|
|
|
|
|
def strip_xssi(response: str):
|
|
# See https://gerrit-review.googlesource.com/Documentation/rest-api.html#output
|
|
assert response.startswith(")]}'\n")
|
|
return response[5:]
|
|
|
|
|
|
def gerrit_api_get(path: str):
|
|
response = cmd(f"curl --silent --fail {GERRIT_URL}/{path}").stdout()
|
|
return json.loads(strip_xssi(response))
|
|
|
|
|
|
def gerrit_api_post(path: str, body: Any):
|
|
response = curl_with_git_auth()(
|
|
"--silent --fail",
|
|
"-X POST",
|
|
"-H",
|
|
quoted("Content-Type: application/json"),
|
|
"-d",
|
|
quoted(json.dumps(body)),
|
|
f"{GERRIT_URL}/a/{path}",
|
|
).stdout()
|
|
if very_verbose():
|
|
print("Response:", response)
|
|
return json.loads(strip_xssi(response))
|
|
|
|
|
|
class GerritChange(object):
|
|
"""
|
|
Class to interact with the gerrit /changes/ API.
|
|
|
|
For information on the data format returned by the API, see:
|
|
https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#change-info
|
|
"""
|
|
|
|
id: str
|
|
_data: Any
|
|
|
|
def __init__(self, data: Any):
|
|
self._data = data
|
|
self.id = data["id"]
|
|
|
|
@functools.cached_property
|
|
def _details(self) -> Any:
|
|
return gerrit_api_get(f"changes/{self.id}/detail")
|
|
|
|
@functools.cached_property
|
|
def _messages(self) -> List[Any]:
|
|
return gerrit_api_get(f"changes/{self.id}/messages")
|
|
|
|
@property
|
|
def status(self):
|
|
return cast(str, self._data["status"])
|
|
|
|
def get_votes(self, label_name: str) -> List[int]:
|
|
"Returns the list of votes on `label_name`"
|
|
label_info = self._details.get("labels", {}).get(label_name)
|
|
votes = label_info.get("all", [])
|
|
return [cast(int, v.get("value")) for v in votes]
|
|
|
|
def get_messages_by(self, email: str) -> List[str]:
|
|
"Returns all messages posted by the user with the specified `email`."
|
|
return [m["message"] for m in self._messages if m["author"].get("email") == email]
|
|
|
|
def review(self, message: str, labels: Dict[str, int]):
|
|
"Post review `message` and set the specified review `labels`"
|
|
print("Posting on", self, ":", message, labels)
|
|
gerrit_api_post(
|
|
f"changes/{self.id}/revisions/current/review",
|
|
{"message": message, "labels": labels},
|
|
)
|
|
|
|
def abandon(self, message: str):
|
|
print("Abandoning", self, ":", message)
|
|
gerrit_api_post(f"changes/{self.id}/abandon", {"message": message})
|
|
|
|
@classmethod
|
|
def query(cls, *queries: str):
|
|
"Returns a list of gerrit changes matching the provided list of queries."
|
|
return [cls(c) for c in gerrit_api_get(f"changes/?q={'+'.join(queries)}")]
|
|
|
|
def short_url(self):
|
|
return f"http://crrev.com/c/{self._data['_number']}"
|
|
|
|
def __str__(self):
|
|
return self.short_url()
|
|
|
|
def pretty_info(self):
|
|
return f"{self} - {self._data['subject']}"
|
|
|
|
|
|
def is_cros_repo():
|
|
"Returns true if the crosvm repo is a symlink or worktree to a CrOS repo checkout."
|
|
dot_git = CROSVM_ROOT / ".git"
|
|
if not dot_git.is_symlink() and dot_git.is_dir():
|
|
return False
|
|
return (cros_repo_root() / ".repo").exists()
|
|
|
|
|
|
def cros_repo_root():
|
|
"Root directory of the CrOS repo checkout."
|
|
return (CROSVM_ROOT / "../../..").resolve()
|
|
|
|
|
|
def is_kiwi_repo():
|
|
"Returns true if the crosvm repo contains .kiwi_repo file."
|
|
dot_kiwi_repo = CROSVM_ROOT / ".kiwi_repo"
|
|
return dot_kiwi_repo.exists()
|
|
|
|
|
|
def kiwi_repo_root():
|
|
"Root directory of the kiwi repo checkout."
|
|
return (CROSVM_ROOT / "../..").resolve()
|
|
|
|
|
|
def sudo_is_passwordless():
|
|
# Run with --askpass but no askpass set, succeeds only if passwordless sudo
|
|
# is available.
|
|
(ret, _) = subprocess.getstatusoutput("SUDO_ASKPASS=false sudo --askpass true")
|
|
return ret == 0
|
|
|
|
|
|
SHORTHANDS = {
|
|
"mingw64": "x86_64-pc-windows-gnu",
|
|
"msvc64": "x86_64-pc-windows-msvc",
|
|
"armhf": "armv7-unknown-linux-gnueabihf",
|
|
"aarch64": "aarch64-unknown-linux-gnu",
|
|
"x86_64": "x86_64-unknown-linux-gnu",
|
|
}
|
|
|
|
|
|
class Triple(NamedTuple):
|
|
"""
|
|
Build triple in cargo format.
|
|
|
|
The format is: <arch><sub>-<vendor>-<sys>-<abi>, However, we will treat <arch><sub> as a single
|
|
arch to simplify things.
|
|
"""
|
|
|
|
arch: str
|
|
vendor: str
|
|
sys: Optional[str]
|
|
abi: Optional[str]
|
|
|
|
@classmethod
|
|
def from_shorthand(cls, shorthand: str):
|
|
"These shorthands make it easier to specify triples on the command line."
|
|
if "-" in shorthand:
|
|
triple = shorthand
|
|
elif shorthand in SHORTHANDS:
|
|
triple = SHORTHANDS[shorthand]
|
|
else:
|
|
raise Exception(f"Not a valid build triple shorthand: {shorthand}")
|
|
return cls.from_str(triple)
|
|
|
|
@classmethod
|
|
def from_str(cls, triple: str):
|
|
parts = triple.split("-")
|
|
if len(parts) < 2:
|
|
raise Exception(f"Unsupported triple {triple}")
|
|
return cls(
|
|
parts[0],
|
|
parts[1],
|
|
parts[2] if len(parts) > 2 else None,
|
|
parts[3] if len(parts) > 3 else None,
|
|
)
|
|
|
|
@classmethod
|
|
def from_linux_arch(cls, arch: str):
|
|
"Rough logic to convert the output of `arch` into a corresponding linux build triple."
|
|
if arch == "armhf":
|
|
return cls.from_str("armv7-unknown-linux-gnueabihf")
|
|
else:
|
|
return cls.from_str(f"{arch}-unknown-linux-gnu")
|
|
|
|
@classmethod
|
|
def host_default(cls):
|
|
"Returns the default build triple of the host."
|
|
rustc_info = subprocess.check_output(["rustc", "-vV"], text=True)
|
|
match = re.search(r"host: (\S+)", rustc_info)
|
|
if not match:
|
|
raise Exception(f"Cannot parse rustc info: {rustc_info}")
|
|
return cls.from_str(match.group(1))
|
|
|
|
@property
|
|
def feature_flag(self):
|
|
triple_to_shorthand = {v: k for k, v in SHORTHANDS.items()}
|
|
shorthand = triple_to_shorthand.get(str(self))
|
|
if not shorthand:
|
|
raise Exception(f"No feature set for triple {self}")
|
|
return f"all-{shorthand}"
|
|
|
|
@property
|
|
def target_dir(self):
|
|
return crosvm_target_dir() / str(self)
|
|
|
|
def get_cargo_env(self):
|
|
"""Environment variables to make cargo use the test target."""
|
|
env: Dict[str, str] = {}
|
|
cargo_target = str(self)
|
|
env["CARGO_BUILD_TARGET"] = cargo_target
|
|
env["CARGO_TARGET_DIR"] = str(self.target_dir)
|
|
env["CROSVM_TARGET_DIR"] = str(crosvm_target_dir())
|
|
return env
|
|
|
|
def __str__(self):
|
|
return f"{self.arch}-{self.vendor}-{self.sys}-{self.abi}"
|
|
|
|
|
|
def download_file(url: str, filename: Path, attempts: int = 3):
|
|
assert attempts > 0
|
|
while True:
|
|
attempts -= 1
|
|
try:
|
|
urllib.request.urlretrieve(url, filename)
|
|
return
|
|
except Exception as e:
|
|
if attempts == 0:
|
|
raise e
|
|
else:
|
|
console.print("Download failed:", e)
|
|
|
|
|
|
def strip_ansi_escape_sequences(line: str) -> str:
|
|
return ANSI_ESCAPE.sub("", line)
|
|
|
|
|
|
console = rich.console.Console()
|
|
|
|
if __name__ == "__main__":
|
|
import doctest
|
|
|
|
(failures, num_tests) = doctest.testmod(optionflags=doctest.ELLIPSIS)
|
|
sys.exit(1 if failures > 0 else 0)
|