If multicast is supported in principle, but multicast packets are blocked
by the firewall, then the multicast test will time out and reports an
error. Detect this case and don't fail.
"""Test multicast device."""
"""Test multicast device."""
import socket
import struct
import time
import socket
import struct
import time
-def multicast_works() -> bool:
+class MulticastSupport(enum.Enum):
+ NO = 0
+ BLOCKED = 1
+ YES = 2
+
+
+def multicast_works() -> MulticastSupport:
"""Check if multicast is supported and works."""
msg = b"foobar"
"""Check if multicast is supported and works."""
msg = b"foobar"
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
client.sendto(msg, (MCAST_ADDR, PORT))
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
client.sendto(msg, (MCAST_ADDR, PORT))
- return msg == server.recv(16)
+ try:
+ select.select([server], [], [], 1)
+ if msg == server.recv(16, socket.MSG_DONTWAIT):
+ return MulticastSupport.YES
+ except OSError:
+ pass
+
+ return MulticastSupport.BLOCKED
+ log.info("no")
+ return MulticastSupport.NO
def test_no_mcast_support(foo: Tinc) -> None:
def test_no_mcast_support(foo: Tinc) -> None:
foo.cmd("set", "Device", f"{MCAST_ADDR} {PORT}")
foo.cmd("set", "LogLevel", "10")
foo.cmd("set", "Device", f"{MCAST_ADDR} {PORT}")
foo.cmd("set", "LogLevel", "10")
+ multicast_support = multicast_works()
+ if multicast_support == MulticastSupport.YES:
+ log.info("multicast supported")
+ elif multicast_support == MulticastSupport.BLOCKED:
+ log.info("multicast blocked")
+ pass
+ log.info("multicast not supported")
test_no_mcast_support(foo)
test_no_mcast_support(foo)