OPENSSL = "openssl"
READLINE = "readline"
TUNEMU = "tunemu"
+ SANDBOX = "sandbox"
UML = "uml"
VDE = "vde"
"""Return path to a subdirectory within the working dir for this node."""
return os.path.join(self._work_dir, *paths)
+ @property
+ def work_dir(self):
+ """Node's working directory."""
+ return self._work_dir
+
@property
def script_up(self) -> str:
"""Name of the hosts/XXX-up script for this node."""
return self._port
def cmd(
- self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[str] = None
+ self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[T.AnyStr] = None
) -> T.Tuple[str, str]:
"""Run command through tinc, writes `stdin` to it (if the argument is not None),
check its return code (if the argument is not None), and return (stdout, stderr).
"""
- proc = self.tinc(*args)
+ proc = self.tinc(*args, binary=isinstance(stdin, bytes))
log.debug('tinc %s: PID %d, in "%s", want code %s', self, proc.pid, stdin, code)
out, err = proc.communicate(stdin, timeout=60)
return out if out else "", err if err else ""
- def tinc(self, *args: str) -> subp.Popen:
+ def tinc(self, *args: str, binary=False) -> subp.Popen:
"""Start tinc with the specified arguments."""
args = tuple(filter(bool, args))
cmd = [path.TINC_PATH, *self._common_args, *args]
stdin=subp.PIPE,
stdout=subp.PIPE,
stderr=subp.PIPE,
- encoding="utf-8",
+ encoding=None if binary else "utf-8",
)
self._procs.append(proc)
return proc