cac4d2ba1d751cd4067e9e5eebe005d6c5c6bf3f
[tinc] / test / integration / compression.py
1 #!/usr/bin/env python3
2
3 """Test supported and unsupported compression levels."""
4
5 import os
6 import signal
7 import sys
8 import multiprocessing.connection as mpc
9 import subprocess as subp
10 import time
11 import typing as T
12
13 from testlib import external as ext, cmd, path, check, util
14 from testlib.log import log
15 from testlib.proc import Script, Tinc, Feature
16 from testlib.test import Test
17 from testlib.template import make_netns_config
18
19 IP_FOO = "192.168.1.1"
20 IP_BAR = "192.168.1.2"
21 MASK = 24
22
23 CONTENT = "zHgfHEzRsKPU41rWoTzmcxxxUGvjfOtTZ0ZT2S1GezL7QbAcMGiLa8i6JOgn59Dq5BtlfbZj"
24
25
26 def run_receiver() -> None:
27     """Run server that receives data it prints it to stdout."""
28     with mpc.Listener((IP_FOO, 0), family="AF_INET") as listener:
29         port = listener.address[1]
30         sys.stdout.write(f"{port}\n")
31         sys.stdout.flush()
32
33         with listener.accept() as conn:
34             data = conn.recv()
35             print(data, sep="", flush=True)
36
37
38 def run_sender() -> None:
39     """Start client that reads data from stdin and sends it to server."""
40     port = int(os.environ["PORT"])
41
42     for _ in range(5):
43         try:
44             with mpc.Client((IP_FOO, port)) as client:
45                 client.send(CONTENT)
46             return
47         except OSError as ex:
48             log.warning("could not connect to receiver", exc_info=ex)
49             time.sleep(1)
50
51     log.error("failed to send data, terminating")
52     os.kill(0, signal.SIGTERM)
53
54
55 def get_levels(features: T.Container[Feature]) -> T.Tuple[T.List[int], T.List[int]]:
56     """Get supported compression levels."""
57     log.info("getting supported compression levels")
58
59     levels: T.List[int] = []
60     bogus: T.List[int] = []
61
62     for comp, lvl_min, lvl_max in (
63         (Feature.COMP_ZLIB, 1, 9),
64         (Feature.COMP_LZO, 10, 11),
65         (Feature.COMP_LZ4, 12, 12),
66     ):
67         lvls = range(lvl_min, lvl_max + 1)
68         if comp in features:
69             levels += lvls
70         else:
71             bogus += lvls
72
73     log.info("supported compression levels: %s", levels)
74     log.info("unsupported compression levels: %s", bogus)
75
76     return levels, bogus
77
78
79 def init(ctx: Test) -> T.Tuple[Tinc, Tinc]:
80     """Initialize new test nodes."""
81     foo, bar = ctx.node(addr=IP_FOO), ctx.node(addr=IP_BAR)
82
83     stdin = f"""
84         init {foo}
85         set Port 0
86         set Address {foo.address}
87         set Subnet {foo.address}
88         set Interface {foo}
89         set Address localhost
90     """
91     foo.cmd(stdin=stdin)
92     assert ext.netns_add(foo.name)
93     foo.add_script(Script.TINC_UP, make_netns_config(foo.name, foo.address, MASK))
94
95     stdin = f"""
96         init {bar}
97         set Port 0
98         set Address {bar.address}
99         set Subnet {bar.address}
100         set Interface {bar}
101         set ConnectTo {foo}
102     """
103     bar.cmd(stdin=stdin)
104     assert ext.netns_add(bar.name)
105     bar.add_script(Script.TINC_UP, make_netns_config(bar.name, bar.address, MASK))
106     foo.add_script(Script.SUBNET_UP)
107
108     log.info("start %s and exchange configuration", foo)
109     foo.start()
110     cmd.exchange(foo, bar)
111
112     return foo, bar
113
114
115 def test_valid_level(foo: Tinc, bar: Tinc) -> None:
116     """Test that supported compression level works correctly."""
117     while True:
118         env = foo[Script.SUBNET_UP].wait().env
119         if env.get("SUBNET") == bar.address:
120             break
121
122     log.info("start receiver in netns")
123     with subp.Popen(
124         ["ip", "netns", "exec", foo.name, path.PYTHON_PATH, __file__, "--recv"],
125         stdout=subp.PIPE,
126         encoding="utf-8",
127     ) as receiver:
128         assert receiver.stdout
129         port = receiver.stdout.readline().strip()
130
131         log.info("start sender in netns")
132         with subp.Popen(
133             ["ip", "netns", "exec", bar.name, path.PYTHON_PATH, __file__, "--send"],
134             env={**dict(os.environ), "PORT": port},
135         ):
136             recv = receiver.stdout.read()
137             log.info('received %d bytes: "%s"', len(recv), recv)
138
139     check.equals(0, receiver.wait())
140     check.equals(CONTENT, recv.rstrip())
141
142
143 def test_bogus_level(node: Tinc) -> None:
144     """Test that unsupported compression level fails to start."""
145     tincd = node.tincd()
146     _, stderr = tincd.communicate()
147     check.equals(1, tincd.returncode)
148     check.is_in("Bogus compression level", stderr)
149
150
151 def run_tests() -> None:
152     """Run all tests."""
153     with Test("get supported levels") as ctx:
154         node = ctx.node()
155         levels, bogus = get_levels(node.features)
156
157     with Test("valid levels") as ctx:
158         foo, bar = init(ctx)
159         for level in levels:
160             for node in foo, bar:
161                 node.cmd("set", "Compression", str(level))
162             bar.cmd("start")
163             test_valid_level(foo, bar)
164             bar.cmd("stop")
165
166     with Test("test bogus levels") as ctx:
167         node = ctx.node()
168         for level in bogus:
169             node.cmd("set", "Compression", str(level))
170             test_bogus_level(node)
171
172
173 last = sys.argv[-1]
174
175 if last == "--recv":
176     run_receiver()
177 elif last == "--send":
178     run_sender()
179 else:
180     util.require_root()
181     util.require_command("ip", "netns", "list")
182     util.require_path("/dev/net/tun")
183     run_tests()