Add tests for some device & address variables
[tinc] / test / integration / device_fd.py
diff --git a/test/integration/device_fd.py b/test/integration/device_fd.py
new file mode 100755 (executable)
index 0000000..0195546
--- /dev/null
@@ -0,0 +1,99 @@
+#!/usr/bin/env python3
+
+"""Test FD device support."""
+
+import array
+import socket
+import tempfile
+import threading
+import time
+
+from testlib import check
+from testlib.log import log
+from testlib.test import Test
+from testlib.proc import Script
+
+JUNK_FRAME = b"\xFF" * 80
+
+
+def start_fd_server(unix: socket.socket, payload: bytes, file_desc: int) -> None:
+    """Start UNIX socket server and then the FD to the first connected client."""
+
+    def send_fd() -> None:
+        conn, _ = unix.accept()
+        with conn:
+            log.info("accepted connection %s", conn)
+            ancillary = array.array("i", [file_desc])
+            conn.sendmsg([payload], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, ancillary)])
+
+    threading.Thread(target=send_fd).start()
+
+
+def test_device_fd(ctx: Test) -> None:
+    """Test some FD device error conditions."""
+
+    foo = ctx.node(init="set DeviceType fd")
+
+    log.info("test with empty Device")
+    _, err = foo.cmd("start", code=1)
+    check.is_in("Could not read device", err)
+
+    log.info("test with too long UNIX socket path")
+    device = "x" * 110
+    _, err = foo.cmd("start", "-o", f"Device={device}", code=1)
+    check.is_in("Unix socket path too long", err)
+
+    foo.cmd("set", "Device", "/dev/null")
+
+    log.info("check that Mode=switch fails")
+    _, err = foo.cmd("start", "-o", "Mode=switch", code=1)
+    check.is_in("Switch mode not supported", err)
+
+    log.info("test with incorrect Device")
+    _, err = foo.cmd("start", code=1)
+    check.is_in("Receiving fd from Unix socket", err)
+    check.is_in("Could not connect to Unix socket", err)
+
+    log.info("test with invalid FD")
+    _, err = foo.cmd("start", "-o", "Device=-1", code=1)
+    check.is_in("Could not open", err)
+
+    log.info("create a UNIX socket to transfer FD")
+    unix = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    unix_path = tempfile.mktemp()
+    unix.bind(unix_path)
+    unix.listen(1)
+
+    foo.cmd("set", "Device", unix_path)
+    myself, him = socket.socketpair(socket.AF_UNIX)
+
+    log.info("start with empty data")
+    start_fd_server(unix, b"", him.fileno())
+    _, err = foo.cmd("start", "-o", f"Device={unix_path}", code=1)
+    check.is_in("Could not read from unix socket", err)
+
+    foo_log = foo.sub("log")
+    foo.add_script(Script.TINC_UP)
+
+    log.info("start with correct amount of data")
+    start_fd_server(unix, b" ", him.fileno())
+
+    log.info("wait for tincd to connect")
+    _, err = foo.cmd("start", "-o", f"Device={unix_path}", "--logfile", foo_log, "-d10")
+    foo[Script.TINC_UP].wait()
+    check.is_in("adapter set up", err)
+
+    log.info("send junk data and make sure tincd receives it")
+    for _ in range(10):
+        myself.send(JUNK_FRAME)
+        time.sleep(0.1)
+
+    foo.add_script(Script.TINC_DOWN)
+    foo.cmd("stop")
+    foo[Script.TINC_DOWN].wait()
+
+    check.in_file(foo_log, "Unknown IP version while reading packet from fd/")
+
+
+with Test("test FD device") as context:
+    test_device_fd(context)