import os
import random
+import tempfile
import typing as T
import subprocess as subp
from enum import Enum
# Does the OS support all addresses in 127.0.0.0/8 without additional configuration?
_FULL_LOCALHOST_SUBNET = system() in ("Linux", "Windows")
+# Path to the system temporary directory.
+_TEMPDIR = tempfile.gettempdir()
+
def _make_wd(name: str) -> str:
work_dir = os.path.join(path.TEST_WD, "data", name)
MINIUPNPC = "miniupnpc"
OPENSSL = "openssl"
READLINE = "readline"
- TUNEMU = "tunemu"
SANDBOX = "sandbox"
+ TUNEMU = "tunemu"
UML = "uml"
VDE = "vde"
+ WATCHDOG = "watchdog"
class Tinc:
name: str
address: str
_work_dir: str
+ _pid: T.Optional[int]
_port: T.Optional[int]
_scripts: T.Dict[str, TincScript]
_procs: T.List[subp.Popen]
self.name = name if name else random_string(10)
self.address = addr if addr else _rand_localhost()
self._work_dir = _make_wd(self.name)
+ os.makedirs(self._work_dir, exist_ok=True)
self._port = None
self._scripts = {}
self._procs = []
self._port = random_port()
return self._port
+ @property
+ def pid_file(self) -> str:
+ """Get the path to the pid file."""
+ return os.path.join(_TEMPDIR, f"tinc_{self.name}")
+
def read_port(self) -> int:
"""Read port used by tincd from its pidfile and update the _port field."""
- pidfile = self.sub("pid")
- log.debug("reading pidfile at %s", pidfile)
+ log.debug("reading pidfile at %s", self.pid_file)
- with open(pidfile, "r", encoding="utf-8") as f:
+ with open(self.pid_file, "r", encoding="utf-8") as f:
content = f.read()
log.debug("found data %s", content)
- _, _, _, token, port = content.split()
+ pid, _, _, token, port = content.split()
check.equals("port", token)
self._port = int(port)
+ self._pid = int(pid)
return self._port
@property
assert self._port is not None
return self._port
+ @property
+ def pid(self) -> int:
+ """pid of the main tincd process."""
+ assert self._pid is not None
+ return self._pid
+
def __str__(self) -> str:
return self.name
"--net",
self.name,
"--config",
- self._work_dir,
+ self.work_dir,
"--pidfile",
- self.sub("pid"),
+ self.pid_file,
]
def sub(self, *paths: str) -> str:
self.add_script(Script.TINC_UP)
tinc_up = self[Script.TINC_UP]
- self.cmd(*args, "start")
+ self.cmd(*args, "start", "--logfile", self.sub("log"))
tinc_up.wait()
if new_script:
return self._port
def cmd(
- self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[T.AnyStr] = None
+ self,
+ *args: str,
+ code: T.Optional[int] = 0,
+ stdin: T.Optional[T.AnyStr] = None,
+ timeout: T.Optional[int] = None,
) -> T.Tuple[str, str]:
"""Run command through tinc, writes `stdin` to it (if the argument is not None),
check its return code (if the argument is not None), and return (stdout, stderr).
proc = self.tinc(*args, binary=isinstance(stdin, bytes))
log.debug('tinc %s: PID %d, in "%s", want code %s', self, proc.pid, stdin, code)
- out, err = proc.communicate(stdin, timeout=60)
+ out, err = proc.communicate(stdin, timeout=60 if timeout is None else timeout)
res = proc.returncode
self._procs.remove(proc)
log.debug('tinc %s: code %d, out "%s", err "%s"', self, res, out, err)