Add tests for some device & address variables
[tinc] / test / integration / testlib / proc.py
index ffa0a5f..970c80c 100755 (executable)
@@ -2,6 +2,7 @@
 
 import os
 import random
+import tempfile
 import typing as T
 import subprocess as subp
 from enum import Enum
@@ -16,6 +17,9 @@ from .util import random_string, random_port
 # Does the OS support all addresses in 127.0.0.0/8 without additional configuration?
 _FULL_LOCALHOST_SUBNET = system() in ("Linux", "Windows")
 
+# Path to the system temporary directory.
+_TEMPDIR = tempfile.gettempdir()
+
 
 def _make_wd(name: str) -> str:
     work_dir = os.path.join(path.TEST_WD, "data", name)
@@ -49,10 +53,11 @@ class Feature(Enum):
     MINIUPNPC = "miniupnpc"
     OPENSSL = "openssl"
     READLINE = "readline"
-    TUNEMU = "tunemu"
     SANDBOX = "sandbox"
+    TUNEMU = "tunemu"
     UML = "uml"
     VDE = "vde"
+    WATCHDOG = "watchdog"
 
 
 class Tinc:
@@ -63,6 +68,7 @@ class Tinc:
     name: str
     address: str
     _work_dir: str
+    _pid: T.Optional[int]
     _port: T.Optional[int]
     _scripts: T.Dict[str, TincScript]
     _procs: T.List[subp.Popen]
@@ -71,6 +77,7 @@ class Tinc:
         self.name = name if name else random_string(10)
         self.address = addr if addr else _rand_localhost()
         self._work_dir = _make_wd(self.name)
+        os.makedirs(self._work_dir, exist_ok=True)
         self._port = None
         self._scripts = {}
         self._procs = []
@@ -80,19 +87,24 @@ class Tinc:
         self._port = random_port()
         return self._port
 
+    @property
+    def pid_file(self) -> str:
+        """Get the path to the pid file."""
+        return os.path.join(_TEMPDIR, f"tinc_{self.name}")
+
     def read_port(self) -> int:
         """Read port used by tincd from its pidfile and update the _port field."""
-        pidfile = self.sub("pid")
-        log.debug("reading pidfile at %s", pidfile)
+        log.debug("reading pidfile at %s", self.pid_file)
 
-        with open(pidfile, "r", encoding="utf-8") as f:
+        with open(self.pid_file, "r", encoding="utf-8") as f:
             content = f.read()
         log.debug("found data %s", content)
 
-        _, _, _, token, port = content.split()
+        pid, _, _, token, port = content.split()
         check.equals("port", token)
 
         self._port = int(port)
+        self._pid = int(pid)
         return self._port
 
     @property
@@ -101,6 +113,12 @@ class Tinc:
         assert self._port is not None
         return self._port
 
+    @property
+    def pid(self) -> int:
+        """pid of the main tincd process."""
+        assert self._pid is not None
+        return self._pid
+
     def __str__(self) -> str:
         return self.name
 
@@ -140,9 +158,9 @@ class Tinc:
             "--net",
             self.name,
             "--config",
-            self._work_dir,
+            self.work_dir,
             "--pidfile",
-            self.sub("pid"),
+            self.pid_file,
         ]
 
     def sub(self, *paths: str) -> str:
@@ -207,7 +225,7 @@ class Tinc:
             self.add_script(Script.TINC_UP)
 
         tinc_up = self[Script.TINC_UP]
-        self.cmd(*args, "start")
+        self.cmd(*args, "start", "--logfile", self.sub("log"))
         tinc_up.wait()
 
         if new_script:
@@ -219,7 +237,11 @@ class Tinc:
         return self._port
 
     def cmd(
-        self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[T.AnyStr] = None
+        self,
+        *args: str,
+        code: T.Optional[int] = 0,
+        stdin: T.Optional[T.AnyStr] = None,
+        timeout: T.Optional[int] = None,
     ) -> T.Tuple[str, str]:
         """Run command through tinc, writes `stdin` to it (if the argument is not None),
         check its return code (if the argument is not None), and return (stdout, stderr).
@@ -227,7 +249,7 @@ class Tinc:
         proc = self.tinc(*args, binary=isinstance(stdin, bytes))
         log.debug('tinc %s: PID %d, in "%s", want code %s', self, proc.pid, stdin, code)
 
-        out, err = proc.communicate(stdin, timeout=60)
+        out, err = proc.communicate(stdin, timeout=60 if timeout is None else timeout)
         res = proc.returncode
         self._procs.remove(proc)
         log.debug('tinc %s: code %d, out "%s", err "%s"', self, res, out, err)