aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJack Lloyd <[email protected]>2017-05-19 10:44:48 -0400
committerJack Lloyd <[email protected]>2017-05-19 10:44:48 -0400
commit7873092f84bf61ac932330e0a17449c17897b91b (patch)
treeafa5e6d8f5c9eb74712691a8e3b0fb8457b9049a
parent98e5b30922ada39516a77cd8b7ff87b281ba521e (diff)
parentf53f4db7a231e3d7722b7c9dccf15c777706d1c8 (diff)
Merge GH #359 Add botan cli encryption tool
-rw-r--r--src/cli/encryption.cpp139
-rwxr-xr-xsrc/scripts/ci/travis/build.sh7
-rwxr-xr-xsrc/scripts/cli_tests.py159
-rw-r--r--src/scripts/vecparser.py50
4 files changed, 355 insertions, 0 deletions
diff --git a/src/cli/encryption.cpp b/src/cli/encryption.cpp
new file mode 100644
index 000000000..d3dfdd466
--- /dev/null
+++ b/src/cli/encryption.cpp
@@ -0,0 +1,139 @@
+/*
+* (C) 2015,2017 Simon Warta (Kullo GmbH)
+*
+* Botan is released under the Simplified BSD License (see license.txt)
+*/
+
+#include "cli.h"
+
+#if defined(BOTAN_HAS_AES) && defined(BOTAN_HAS_AEAD_MODES)
+
+#include <botan/aes.h>
+#include <botan/aead.h>
+
+#include <iostream>
+#include <iterator>
+#include <sstream>
+
+using namespace Botan;
+
+namespace Botan_CLI {
+
+namespace {
+
+auto VALID_MODES = std::map<std::string, std::string>{
+ // Don't add algorithms here without extending tests
+ // in `src/scripts/cli_tests.py`
+ { "aes-128-cfb", "AES-128/CFB" },
+ { "aes-192-cfb", "AES-192/CFB" },
+ { "aes-256-cfb", "AES-256/CFB" },
+ { "aes-128-gcm", "AES-128/GCM" },
+ { "aes-192-gcm", "AES-192/GCM" },
+ { "aes-256-gcm", "AES-256/GCM" },
+ { "aes-128-ocb", "AES-128/OCB" },
+ { "aes-128-xts", "AES-128/XTS" },
+ { "aes-256-xts", "AES-256/XTS" },
+};
+
+bool is_aead(const std::string &cipher)
+ {
+ return cipher.find("/GCM") != std::string::npos
+ || cipher.find("/OCB") != std::string::npos;
+ }
+
+secure_vector<byte> do_crypt(const std::string &cipher,
+ const secure_vector<byte> &input,
+ const SymmetricKey &key,
+ const InitializationVector &iv,
+ const OctetString &ad,
+ Cipher_Dir direction)
+ {
+ if (iv.size() == 0) throw std::invalid_argument("IV must not be empty");
+
+ // TODO: implement streaming
+
+ std::shared_ptr<Botan::Cipher_Mode> processor(Botan::get_cipher_mode(cipher, direction));
+ if(!processor) throw std::runtime_error("Cipher algorithm not found");
+
+ // Set key
+ processor->set_key(key);
+
+ // Set associated data
+ if (is_aead(cipher))
+ {
+ auto aead_processor = std::dynamic_pointer_cast<AEAD_Mode>(processor);
+ if(!aead_processor) throw std::runtime_error("Cipher algorithm not could not be converted to AEAD");
+ aead_processor->set_ad(ad.bits_of());
+ }
+
+ // Set IV
+ processor->start(iv.bits_of());
+
+ secure_vector<byte> buf(input.begin(), input.end());
+ processor->finish(buf);
+
+ return buf;
+ }
+
+secure_vector<byte> get_stdin()
+ {
+ secure_vector<byte> out;
+ std::streamsize reserved_size = 1048576; // 1 MiB
+ out.reserve(reserved_size);
+
+ std::istreambuf_iterator<char> iterator(std::cin.rdbuf()); // stdin iterator
+ std::istreambuf_iterator<char> EOS; // end-of-range iterator
+ std::copy(iterator, EOS, std::back_inserter(out));
+ return out;
+ }
+
+void to_stdout(const secure_vector<byte> &data)
+ {
+ std::copy(data.begin(), data.end(), std::ostreambuf_iterator<char>(std::cout));
+ }
+
+}
+
+class Encryption : public Command
+ {
+ public:
+ Encryption() : Command("encryption --decrypt --mode= --key= --iv= --ad=") {}
+
+ void go() override
+ {
+ std::string mode = get_arg_or("mode", "");
+ if (!VALID_MODES.count(mode))
+ {
+ std::ostringstream error;
+ error << "Invalid mode: '" << mode << "'\n"
+ << "valid modes are:";
+ for (auto valid_mode : VALID_MODES) error << " " << valid_mode.first;
+
+ throw CLI_Usage_Error(error.str());
+ }
+
+ std::string key_hex = get_arg("key");
+ std::string iv_hex = get_arg("iv");
+ std::string ad_hex = get_arg_or("ad", "");
+
+ auto input = get_stdin();
+ if (verbose())
+ {
+ std::cerr << "Got " << input.size() << " bytes of input data." << std::endl;
+ }
+
+ auto key = SymmetricKey(key_hex);
+ auto iv = InitializationVector(iv_hex);
+ auto ad = OctetString(ad_hex);
+
+ auto direction = flag_set("decrypt") ? Cipher_Dir::DECRYPTION : Cipher_Dir::ENCRYPTION;
+ auto out = do_crypt(VALID_MODES[mode], input, key, iv, ad, direction);
+ to_stdout(out);
+ }
+ };
+
+BOTAN_REGISTER_COMMAND("encryption", Encryption);
+
+}
+
+#endif
diff --git a/src/scripts/ci/travis/build.sh b/src/scripts/ci/travis/build.sh
index 71d33d1bb..36517738b 100755
--- a/src/scripts/ci/travis/build.sh
+++ b/src/scripts/ci/travis/build.sh
@@ -6,6 +6,7 @@ MAKE_PREFIX=()
TEST_PREFIX=()
TEST_EXE=./botan-test
TEST_FLAGS=()
+CLI_EXE=./botan
CFG_FLAGS=(--prefix=/tmp/botan-installation --cc=$CC --os=$TRAVIS_OS_NAME)
CC_BIN=$CXX
@@ -179,6 +180,12 @@ else
time "${TEST_CMD[@]}"
fi
+if [ "$BUILD_MODE" = "static" ] || [ "$BUILD_MODE" = "shared" ]
+then
+ echo "Running cli tests ..."
+ ./src/scripts/cli_tests.py "$CLI_EXE"
+fi
+
# Run Python tests (need shared libs)
if [ "$BUILD_MODE" = "shared" ] || [ "$BUILD_MODE" = "coverage" ];
then
diff --git a/src/scripts/cli_tests.py b/src/scripts/cli_tests.py
new file mode 100755
index 000000000..757de654d
--- /dev/null
+++ b/src/scripts/cli_tests.py
@@ -0,0 +1,159 @@
+#!/usr/bin/env python3
+
+import binascii
+from collections import OrderedDict
+import unittest
+import argparse
+import re
+import subprocess
+import sys
+
+import vecparser
+
+cli_binary = ""
+
+SUPPORTED_ALGORITHMS = [
+ 'AES-128/CFB',
+ 'AES-192/CFB',
+ 'AES-256/CFB',
+ 'AES-128/GCM',
+ 'AES-192/GCM',
+ 'AES-256/GCM',
+ 'AES-128/OCB',
+ 'AES-128/XTS',
+ 'AES-256/XTS'
+]
+
+def append_ordered(base, additional_elements):
+ for key in additional_elements:
+ value = additional_elements[key]
+ base[key] = value
+
+class TestSequence(unittest.TestCase):
+ pass
+
+def create_test(data):
+ def do_test_expected(self):
+ iv = data['Nonce']
+ key = data['Key']
+ ad = data['AD'] if 'AD' in data else ""
+ plaintext = data['In'].lower()
+ ciphertext = data['Out'].lower()
+ algorithm = data['Algorithm']
+ direction = data['Direction']
+
+ # CFB
+ if algorithm == "AES-128/CFB":
+ mode = "aes-128-cfb"
+ elif algorithm == "AES-192/CFB":
+ mode = "aes-192-cfb"
+ elif algorithm == "AES-256/CFB":
+ mode = "aes-256-cfb"
+ # GCM
+ elif algorithm == "AES-128/GCM":
+ mode = "aes-128-gcm"
+ elif algorithm == "AES-192/GCM":
+ mode = "aes-192-gcm"
+ elif algorithm == "AES-256/GCM":
+ mode = "aes-256-gcm"
+ # OCB
+ elif algorithm == "AES-128/OCB":
+ mode = "aes-128-ocb"
+ # XTS
+ elif algorithm == "AES-128/XTS":
+ mode = "aes-128-xts"
+ elif algorithm == "AES-256/XTS":
+ mode = "aes-256-xts"
+ else: raise Exception("Unknown algorithm: '" + algorithm + "'")
+
+ cmd = [
+ cli_binary,
+ "encryption",
+ "--mode=%s" % mode,
+ "--iv=%s" % iv,
+ "--ad=%s" % ad,
+ "--key=%s" % key]
+ if direction == "decrypt":
+ cmd += ['--decrypt']
+ # out_raw = subprocess.check_output(cmd)
+
+ if direction == "decrypt":
+ invalue = ciphertext
+ else:
+ invalue = plaintext
+
+ #print(cmd)
+
+ p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
+ out_raw = p.communicate(input=binascii.unhexlify(invalue))[0]
+ out = binascii.hexlify(out_raw).decode("UTF-8").lower()
+
+ # Renamings
+ if direction == "decrypt":
+ expected = plaintext
+ else:
+ expected = ciphertext
+ actual = out
+ self.assertEqual(expected, actual)
+ return do_test_expected
+
+def get_testdata(document):
+ out = OrderedDict()
+ for algorithm in document:
+ if algorithm in SUPPORTED_ALGORITHMS:
+ testcase_number = 0
+ for testcase in document[algorithm]:
+ testcase_number += 1
+ for direction in ['encrypt', 'decrypt']:
+ testname = "{} no {:0>3} ({})".format(
+ algorithm.lower(), testcase_number, direction)
+ testname = re.sub("[^-a-z0-9-]", "_", testname)
+ testname = re.sub("_+", "_", testname)
+ testname = testname.strip("_")
+ out[testname] = {}
+ for key in testcase:
+ value = testcase[key]
+ out[testname][key] = value
+ out[testname]['Algorithm'] = algorithm
+ out[testname]['Direction'] = direction
+ return out
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(description="")
+ parser.add_argument('cli_binary',
+ help='path to the botan cli binary')
+ parser.add_argument('unittest_args', nargs="*")
+ args = parser.parse_args()
+
+ cli_binary = args.cli_binary
+
+ vecfile_cfb = vecparser.VecDocument("src/tests/data/modes/cfb.vec")
+ vecfile_gcm = vecparser.VecDocument("src/tests/data/aead/gcm.vec")
+ vecfile_ocb = vecparser.VecDocument("src/tests/data/aead/ocb.vec")
+ vecfile_xts = vecparser.VecDocument("src/tests/data/modes/xts.vec")
+ #data = vecfile.get_data()
+ #for algo in data:
+ # print(algo)
+ # i = 0
+ # for testcase in data[algo]:
+ # i += 1
+ # print(str(i) + ":", testcase)
+
+ testdata = OrderedDict()
+ append_ordered(testdata, get_testdata(vecfile_cfb.get_data()))
+ append_ordered(testdata, get_testdata(vecfile_gcm.get_data()))
+ append_ordered(testdata, get_testdata(vecfile_ocb.get_data()))
+ append_ordered(testdata, get_testdata(vecfile_xts.get_data()))
+
+ #for testname in testdata:
+ # print(testname)
+ # for key in testdata[testname]:
+ # print(" " + key + ": " + testdata[testname][key])
+ for testname in testdata:
+ test_method = create_test(testdata[testname])
+ test_method.__name__ = 'test_%s' % testname
+ setattr(TestSequence, test_method.__name__, test_method)
+
+ # Hand over sys.argv[0] and unittest_args to the testing framework
+ sys.argv[1:] = args.unittest_args
+ unittest.main()
diff --git a/src/scripts/vecparser.py b/src/scripts/vecparser.py
new file mode 100644
index 000000000..707a58920
--- /dev/null
+++ b/src/scripts/vecparser.py
@@ -0,0 +1,50 @@
+from collections import OrderedDict
+import re
+
+class VecDocument:
+ def __init__(self, filepath):
+ self.data = OrderedDict()
+ last_testcase_number = 1
+ current_testcase_number = 1
+ current_group_name = ""
+ last_group_name = ""
+ current_testcase = {}
+
+ PATTERN_GROUPHEADER = "^\[(.+)\]$"
+ PATTERN_KEYVALUE = "^\s*([a-zA-Z]+)\s*=(.*)$"
+
+ with open(filepath, 'r') as f:
+ # Append one empty line to simplify parsing
+ lines = f.read().splitlines() + ["\n"]
+
+ for line in lines:
+ line = line.strip()
+ if line.startswith("#"):
+ pass # Skip
+ elif line == "":
+ current_testcase_number += 1
+ elif re.match(PATTERN_GROUPHEADER, line):
+ match = re.match(PATTERN_GROUPHEADER, line)
+ current_group_name = match.group(1)
+ elif re.match(PATTERN_KEYVALUE, line):
+ match = re.match(PATTERN_KEYVALUE, line)
+ key = match.group(1)
+ value = match.group(2).strip()
+ current_testcase[key] = value
+
+ if current_testcase_number != last_testcase_number:
+ if not current_group_name in self.data:
+ self.data[current_group_name] = []
+ if len(current_testcase) != 0:
+ self.data[current_group_name].append(current_testcase)
+ current_testcase = {}
+ last_testcase_number = current_testcase_number
+
+ if current_group_name != last_group_name:
+ last_group_name = current_group_name
+ # Reset testcase number
+ last_testcase_number = 1
+ current_testcase_number = 1
+
+ def get_data(self):
+ return self.data