Handle multicast being blocked in the test suite
[tinc] / test / integration / device_multicast.py
1 #!/usr/bin/env python3
2
3 """Test multicast device."""
4
5 import enum
6 import os
7 import select
8 import socket
9 import struct
10 import time
11
12 from testlib import check
13 from testlib.log import log
14 from testlib.proc import Tinc, Script
15 from testlib.test import Test
16
17 MCAST_ADDR = "224.15.98.12"
18 PORT = 38245
19
20
21 class MulticastSupport(enum.Enum):
22     NO = 0
23     BLOCKED = 1
24     YES = 2
25
26
27 def multicast_works() -> MulticastSupport:
28     """Check if multicast is supported and works."""
29
30     msg = b"foobar"
31
32     try:
33         with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server:
34             server.bind((MCAST_ADDR, PORT))
35
36             req = struct.pack("=4sl", socket.inet_aton(MCAST_ADDR), socket.INADDR_ANY)
37             server.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, req)
38
39             with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
40                 client.sendto(msg, (MCAST_ADDR, PORT))
41
42             try:
43                 select.select([server], [], [], 1)
44                 if msg == server.recv(16, socket.MSG_DONTWAIT):
45                     return MulticastSupport.YES
46             except OSError:
47                 pass
48
49             return MulticastSupport.BLOCKED
50     except OSError:
51         log.info("no")
52         return MulticastSupport.NO
53
54
55 def test_no_mcast_support(foo: Tinc) -> None:
56     """Check that startup fails on systems without multicast support."""
57
58     code = foo.tincd("-D").wait()
59     check.failure(code)
60     check.in_file(foo.sub("log"), f"Can't bind to {MCAST_ADDR}")
61
62
63 def test_rx_tx(foo: Tinc) -> None:
64     """Test sending real data to a multicast device."""
65
66     foo.start()
67     packet = os.urandom(137)
68
69     with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as sock:
70         for _ in range(5):
71             sent = sock.sendto(packet, (MCAST_ADDR, PORT))
72             log.info("sent broken packet (%d)", sent)
73             time.sleep(0.1)
74
75     foo.add_script(Script.TINC_DOWN)
76     foo.cmd("stop")
77     foo[Script.TINC_DOWN].wait()
78
79     check.in_file(foo.sub("log"), "Read packet of 137 bytes from multicast socket")
80
81
82 def test_device_multicast(ctx: Test) -> None:
83     """Test multicast device."""
84
85     foo = ctx.node(init=True)
86     foo.cmd("set", "DeviceType", "multicast")
87
88     log.info("check that multicast does not work without Device")
89     _, err = foo.cmd("start", "-D", code=1)
90     check.is_in("Device variable required for multicast socket", err)
91
92     log.info("check that Device requires a port")
93     foo.cmd("set", "Device", "localhost")
94     _, err = foo.cmd("start", "-D", code=1)
95     check.is_in("Port number required", err)
96
97     log.info("check that multicast receives data")
98     foo.cmd("set", "Device", f"{MCAST_ADDR} {PORT}")
99     foo.cmd("set", "LogLevel", "10")
100
101     multicast_support = multicast_works()
102     if multicast_support == MulticastSupport.YES:
103         log.info("multicast supported")
104         test_rx_tx(foo)
105     elif multicast_support == MulticastSupport.BLOCKED:
106         log.info("multicast blocked")
107         pass
108     else:
109         log.info("multicast not supported")
110         test_no_mcast_support(foo)
111
112
113 with Test("test DeviceType = multicast") as context:
114     test_device_multicast(context)