Add tests for sign/verify commands
authorKirill Isakov <bootctl@gmail.com>
Fri, 27 May 2022 11:22:05 +0000 (17:22 +0600)
committerKirill Isakov <bootctl@gmail.com>
Fri, 27 May 2022 17:56:50 +0000 (23:56 +0600)
test/integration/cmd_fsck.py [moved from test/integration/command_fsck.py with 100% similarity]
test/integration/cmd_sign_verify.py [new file with mode: 0755]
test/integration/meson.build
test/integration/testlib/proc.py

diff --git a/test/integration/cmd_sign_verify.py b/test/integration/cmd_sign_verify.py
new file mode 100755 (executable)
index 0000000..eefa3ee
--- /dev/null
@@ -0,0 +1,171 @@
+#!/usr/bin/env python3
+
+"""Test sign/verify commands."""
+
+import os
+import tempfile
+
+from testlib import util, cmd, check
+from testlib.proc import Tinc
+from testlib.test import Test
+
+PRIV_KEY = """
+-----BEGIN ED25519 PRIVATE KEY-----
+4Q8bJqfN60s0tOiZdAhAWLgB9+o947cta2WMXmQIz8mCdBdcphzhp23Wt2vUzfQ6
+XHt9+5IqidIw/lLXG61Nbc6IZ+4Fy1XOO1uJ6j4hqIKjdSytD2Vb7MPlNJfPdCDu
+-----END ED25519 PRIVATE KEY-----
+"""
+
+HOST = """
+Ed25519PublicKey = nOSmPehc9ljTtbi+IeoKiyYnkc7gd12OzTZTy3TnwgL
+Port = 17879
+"""
+
+# Do not replace \n or this will break on Windows if cloned with native line endings
+SIGNED_BYTES = """Signature = foo 1653397516 \
+T8Bjg7dc7IjsCrZQC/20qLRsWPlrbthnjyDHQM0BMLoTeAHbLt0fxP5CbTy7Cifgg7P0K179GeahBFsnaIr4MA\n\
+fake testing data\n\
+hello there\n\
+""".encode(
+    "utf-8"
+)
+
+RAW_DATA = tempfile.mktemp()
+
+with open(RAW_DATA, "wb") as raw_file:
+    raw_file.write(util.random_string(64).encode("utf-8"))
+
+
+def init(ctx: Test) -> Tinc:
+    """Initialize a node."""
+
+    foo = ctx.node()
+    stdin = f"init {foo}"
+    foo.cmd(stdin=stdin)
+    return foo
+
+
+def test_sign_errors(foo: Tinc) -> None:
+    """Test `sign` error conditions."""
+
+    _, err = foo.cmd("sign", "foo", "bar", code=1)
+    check.is_in("Too many arguments", err)
+
+    _, err = foo.cmd("sign", "/nonexistent", code=1)
+    check.is_in("Could not open", err)
+
+    os.truncate(foo.sub("ed25519_key.priv"), 0)
+    _, err = foo.cmd("sign", RAW_DATA, code=1)
+    check.is_in("Could not read private key from", err)
+
+    os.remove(foo.sub("ed25519_key.priv"))
+    _, err = foo.cmd("sign", RAW_DATA, code=1)
+    check.is_in("Could not open", err)
+
+
+def test_verify(foo: Tinc) -> None:
+    """Test `verify` of data known to work."""
+
+    signed_file = tempfile.mktemp()
+    with open(signed_file, "wb") as f:
+        f.write(SIGNED_BYTES)
+
+    foo.name = "foo"
+    util.write_text(foo.sub("tinc.conf"), f"Name = {foo}")
+    util.write_text(foo.sub(f"hosts/{foo}"), HOST)
+    util.write_text(foo.sub("ed25519_key.priv"), PRIV_KEY)
+
+    for name in ".", foo.name:
+        foo.cmd("verify", name, stdin=SIGNED_BYTES)
+        foo.cmd("verify", name, signed_file)
+
+    if os.name != "nt":
+        foo.cmd("verify", "*", stdin=SIGNED_BYTES)
+        foo.cmd("verify", "*", signed_file)
+
+    os.remove(signed_file)
+
+
+def test_verify_errors(foo: Tinc) -> None:
+    """Test `verify` error conditions."""
+
+    _, err = foo.cmd("verify", code=1)
+    check.is_in("Not enough arguments", err)
+
+    _, err = foo.cmd("verify", foo.name, "bar", "baz", code=1)
+    check.is_in("Too many arguments", err)
+
+    _, err = foo.cmd("verify", "foo@", code=1)
+    check.is_in("Invalid node name", err)
+
+    _, err = foo.cmd("verify", foo.name, "/nonexistent", code=1)
+    check.is_in("Could not open", err)
+
+    _, err = foo.cmd("verify", foo.name, stdin="", code=1)
+    check.is_in("Invalid input", err)
+
+    _, err = foo.cmd("verify", foo.name, stdin="Signature = foo bar baz", code=1)
+    check.is_in("Invalid input", err)
+
+    sig = (
+        "Signature = dog "
+        "1653395565 "
+        "D25ACFD89jaV9+6g9TNMDTDxH8JGd3wLMv/YNMwXbrj9Bos9q6IW/tuFPxGxYNQ6qAc93XFzkH5u7Gw+Z86GDA\n"
+    )
+    _, err = foo.cmd("verify", foo.name, stdin=sig, code=1)
+    check.is_in(f"Signature is not made by {foo}", err)
+
+    sig = (
+        f"Signature = {foo} "
+        "1653395565 "
+        "D25ACFD89jaV9+6g9TNMDTDxH8JGd3wLMv/YNMwXbrj9Bos9q6IW/tuFPxGxYNQ6qAc93XFzkH5u7Gw+Z86GDA\n"
+    )
+    _, err = foo.cmd("verify", foo.name, stdin=sig, code=1)
+    check.is_in("Invalid signature", err)
+
+    util.write_text(foo.sub(f"hosts/{foo}"), "foobar")
+    _, err = foo.cmd("verify", foo.name, stdin=sig, code=1)
+    check.is_in("Could not read public key from", err)
+
+
+def test_sign_verify(foo: Tinc, bar: Tinc) -> None:
+    """Test `sign` and pass its result to `verify`."""
+
+    signed, _ = foo.cmd("sign", RAW_DATA, stdin=b"")
+    assert isinstance(signed, bytes)
+
+    signed_file = tempfile.mktemp()
+    with open(signed_file, "wb") as f:
+        f.write(signed)
+
+    for name in ".", foo.name:
+        foo.cmd("verify", name, signed_file)
+        foo.cmd("verify", name, stdin=signed)
+
+    if os.name != "nt":
+        foo.cmd("verify", "*", signed_file)
+        foo.cmd("verify", "*", stdin=signed)
+
+    os.remove(signed_file)
+
+    cmd.exchange(foo, bar)
+
+    if os.name != "nt":
+        signed, _ = foo.cmd("sign", RAW_DATA)
+        bar.cmd("verify", "*", stdin=signed)
+
+    signed, _ = bar.cmd("sign", RAW_DATA)
+    foo.cmd("verify", bar.name, stdin=signed)
+
+
+with Test("test errors in `sign`") as context:
+    test_sign_errors(init(context))
+
+with Test("test errors in `verify`") as context:
+    test_verify_errors(init(context))
+
+with Test("test successful `verify`") as context:
+    test_verify(init(context))
+
+with Test("test `sign` and `verify`") as context:
+    test_sign_verify(init(context), init(context))
index cf09cac..4d2bf99 100644 (file)
@@ -1,6 +1,7 @@
 tests = [
   'basic.py',
-  'command_fsck.py',
+  'cmd_fsck.py',
+  'cmd_sign_verify.py',
   'commandline.py',
   'executables.py',
   'import_export.py',
index a0e6c45..d5cbee4 100755 (executable)
@@ -214,12 +214,12 @@ class Tinc:
         return self._port
 
     def cmd(
-        self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[str] = None
+        self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[T.AnyStr] = None
     ) -> T.Tuple[str, str]:
         """Run command through tinc, writes `stdin` to it (if the argument is not None),
         check its return code (if the argument is not None), and return (stdout, stderr).
         """
-        proc = self.tinc(*args)
+        proc = self.tinc(*args, binary=isinstance(stdin, bytes))
         log.debug('tinc %s: PID %d, in "%s", want code %s', self, proc.pid, stdin, code)
 
         out, err = proc.communicate(stdin, timeout=60)
@@ -232,7 +232,7 @@ class Tinc:
 
         return out if out else "", err if err else ""
 
-    def tinc(self, *args: str) -> subp.Popen:
+    def tinc(self, *args: str, binary=False) -> subp.Popen:
         """Start tinc with the specified arguments."""
         args = tuple(filter(bool, args))
         cmd = [path.TINC_PATH, *self._common_args, *args]
@@ -244,7 +244,7 @@ class Tinc:
             stdin=subp.PIPE,
             stdout=subp.PIPE,
             stderr=subp.PIPE,
-            encoding="utf-8",
+            encoding=None if binary else "utf-8",
         )
         self._procs.append(proc)
         return proc