3 """Test systemd integration."""
10 from testlib import check, path
11 from testlib.log import log
12 from testlib.feature import Feature
13 from testlib.const import MAXSOCKETS
14 from testlib.proc import Tinc, Script
15 from testlib.test import Test
18 def tincd_start_socket(foo: Tinc, pass_pid: bool) -> int:
19 """Start tincd as systemd socket activation does it."""
# Runs the tincd binary with systemd-style LISTEN_* variables in its
# environment and returns the exit code of the process that is waited on.
#
# NOTE(review): the embedded line numbers in this listing are not contiguous
# (19 -> 23, 25 -> 35, 35 -> 39), so some statements are not visible here.
# In particular `pid` and `args` are used below but never assigned in the
# visible lines. Confirm against the complete file before editing.
#
# LISTEN_FDS is deliberately set to one more than MAXSOCKETS; the caller
# test_listen_fds expects tincd to report "Too many listening sockets".
23 env = {**os.environ, "LISTEN_FDS": str(MAXSOCKETS + 1)}
# LISTEN_PID is set to the pid of the process executing this line.
# NOTE(review): presumably guarded by `pass_pid` and executed in a forked
# child on lines not shown here -- TODO confirm.
25 env["LISTEN_PID"] = str(os.getpid())
# os.execve() replaces the current process image and does not return on
# success, so anything evaluated after it means the exec did not happen.
35 assert not os.execve(path.TINCD_PATH, args, env)
# Block until process `pid` terminates, require that it exited normally
# (not killed by a signal), and hand its exit code back to the caller.
39 _, status = os.waitpid(pid, 0)
40 assert os.WIFEXITED(status)
41 return os.WEXITSTATUS(status)
44 def test_listen_fds(foo: Tinc) -> None:
45 """Test systemd socket activation."""
# Two scenarios are exercised through tincd_start_socket():
#   1. pass_pid=True  -> the log must contain "Too many listening sockets";
#   2. pass_pid=False -> the log must not contain that message.
#
# NOTE(review): the embedded line numbers skip (50 -> 52, 55 -> 58, 60 -> 62,
# 62 -> 64), so some statements are not visible in this listing. `status` is
# assigned twice but never inspected in the visible lines -- TODO confirm the
# checks exist in the complete file.

# Location of the log file that both scenarios inspect.
47 foo_log = foo.sub("log")
49 log.info("foreground tincd fails with too high LISTEN_FDS")
50 status = tincd_start_socket(foo, pass_pid=True)
52 check.in_file(foo_log, "Too many listening sockets")
# Register the tinc-up / tinc-down scripts so the second scenario can wait
# on them below.
54 foo.add_script(Script.TINC_UP)
55 foo.add_script(Script.TINC_DOWN)
58 log.info("LISTEN_FDS is ignored without LISTEN_PID")
59 status = tincd_start_socket(foo, pass_pid=False)
# Wait for the tinc-up script, then for the tinc-down script.
# NOTE(review): whatever triggers tinc-down between these two waits is
# presumably on embedded line 61, which is not shown -- confirm.
60 foo[Script.TINC_UP].wait()
62 foo[Script.TINC_DOWN].wait()
# NOTE(review): scenario 1 asserts this same message IS present in the same
# file, so the log is presumably reset on a line not shown -- confirm.
64 check.not_in_file(foo_log, "Too many listening sockets")
67 def recv_until(sock: socket.socket, want: bytes) -> None:
68 """Receive from a datagram socket until a specific value is found."""
# NOTE(review): the visible lines perform a single receive and never read
# `want`; the embedded numbering skips 68 -> 71 and ends at 72, so the loop
# and the comparison against `want` are presumably on lines not shown here.
# Confirm against the complete file.
#
# Read one message (at most 65000 bytes) and log it for debugging.
71 msg = sock.recv(65000)
72 log.info("received %s", msg)
77 def test_watchdog(foo: Tinc) -> None:
78 """Test systemd watchdog."""
# Two scenarios:
#   1. tincd started without notification variables must log
#      "Watchdog is disabled";
#   2. tincd started with NOTIFY_SOCKET / WATCHDOG_USEC must send READY=1,
#      WATCHDOG=1 and STOPPING=1 datagrams to the socket at `address`.
#
# NOTE(review): the embedded line numbers skip (84 -> 86, 92 -> 96, 98 -> 101,
# 104 -> 107), so some statements are not visible in this listing. `watchdog`
# is used but never assigned in the visible lines, and no bind of `sock` to
# `address` is visible -- confirm against the complete file.

# NOTE(review): tempfile.mktemp() only returns a path name and is documented
# as deprecated because another process can claim the name first. It is used
# here as the address passed in NOTIFY_SOCKET.
80 address = tempfile.mktemp()
81 foo_log = foo.sub("log")
83 log.info("watchdog is disabled if no env vars are passed")
84 foo.cmd("start", "--logfile", foo_log)
86 check.in_file(foo_log, "Watchdog is disabled")
88 log.info("watchdog is enabled by systemd env vars")
89 foo.add_script(Script.TINC_UP)
90 foo.add_script(Script.TINC_DOWN)
# Unix datagram socket on which the notification messages are received.
92 with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
# `watchdog` is multiplied by 1e6 to produce WATCHDOG_USEC and is compared
# with a time.monotonic() difference below, so it is a duration in seconds.
96 env = {"NOTIFY_SOCKET": address, "WATCHDOG_USEC": str(int(watchdog * 1e6))}
97 proc = foo.tincd("-D", env=env)
98 recv_until(sock, b"READY=1")
# Measure how long the WATCHDOG=1 notification takes to arrive; it must come
# in under the configured watchdog interval.
# NOTE(review): embedded lines 99-100 are not shown; this measurement is
# possibly repeated in a loop -- confirm.
101 before = time.monotonic()
102 recv_until(sock, b"WATCHDOG=1")
103 spent = time.monotonic() - before
104 assert spent < watchdog
# NOTE(review): whatever makes tincd shut down is presumably on embedded
# lines 105-106, which are not shown.
107 recv_until(sock, b"STOPPING=1")
# The tincd process started above must exit successfully.
109 check.success(proc.wait())
# Run the socket-activation scenario inside a Test context named
# "socket activation", on a node obtained from context.node(init=True).
112 with Test("socket activation") as context:
113 test_listen_fds(context.node(init=True))
115 with Test("watchdog") as context:
116 node = context.node(init=True)
117 if Feature.WATCHDOG in node.features: