diff options
author | Jack Lloyd <[email protected]> | 2017-05-19 10:44:48 -0400 |
---|---|---|
committer | Jack Lloyd <[email protected]> | 2017-05-19 10:44:48 -0400 |
commit | 7873092f84bf61ac932330e0a17449c17897b91b (patch) | |
tree | afa5e6d8f5c9eb74712691a8e3b0fb8457b9049a | |
parent | 98e5b30922ada39516a77cd8b7ff87b281ba521e (diff) | |
parent | f53f4db7a231e3d7722b7c9dccf15c777706d1c8 (diff) |
Merge GH #359 Add botan cli encryption tool
-rw-r--r-- | src/cli/encryption.cpp | 139 | ||||
-rwxr-xr-x | src/scripts/ci/travis/build.sh | 7 | ||||
-rwxr-xr-x | src/scripts/cli_tests.py | 159 | ||||
-rw-r--r-- | src/scripts/vecparser.py | 50 |
4 files changed, 355 insertions, 0 deletions
diff --git a/src/cli/encryption.cpp b/src/cli/encryption.cpp new file mode 100644 index 000000000..d3dfdd466 --- /dev/null +++ b/src/cli/encryption.cpp @@ -0,0 +1,139 @@ +/* +* (C) 2015,2017 Simon Warta (Kullo GmbH) +* +* Botan is released under the Simplified BSD License (see license.txt) +*/ + +#include "cli.h" + +#if defined(BOTAN_HAS_AES) && defined(BOTAN_HAS_AEAD_MODES) + +#include <botan/aes.h> +#include <botan/aead.h> + +#include <iostream> +#include <iterator> +#include <sstream> + +using namespace Botan; + +namespace Botan_CLI { + +namespace { + +auto VALID_MODES = std::map<std::string, std::string>{ + // Don't add algorithms here without extending tests + // in `src/scripts/cli_tests.py` + { "aes-128-cfb", "AES-128/CFB" }, + { "aes-192-cfb", "AES-192/CFB" }, + { "aes-256-cfb", "AES-256/CFB" }, + { "aes-128-gcm", "AES-128/GCM" }, + { "aes-192-gcm", "AES-192/GCM" }, + { "aes-256-gcm", "AES-256/GCM" }, + { "aes-128-ocb", "AES-128/OCB" }, + { "aes-128-xts", "AES-128/XTS" }, + { "aes-256-xts", "AES-256/XTS" }, +}; + +bool is_aead(const std::string &cipher) + { + return cipher.find("/GCM") != std::string::npos + || cipher.find("/OCB") != std::string::npos; + } + +secure_vector<byte> do_crypt(const std::string &cipher, + const secure_vector<byte> &input, + const SymmetricKey &key, + const InitializationVector &iv, + const OctetString &ad, + Cipher_Dir direction) + { + if (iv.size() == 0) throw std::invalid_argument("IV must not be empty"); + + // TODO: implement streaming + + std::shared_ptr<Botan::Cipher_Mode> processor(Botan::get_cipher_mode(cipher, direction)); + if(!processor) throw std::runtime_error("Cipher algorithm not found"); + + // Set key + processor->set_key(key); + + // Set associated data + if (is_aead(cipher)) + { + auto aead_processor = std::dynamic_pointer_cast<AEAD_Mode>(processor); + if(!aead_processor) throw std::runtime_error("Cipher algorithm not could not be converted to AEAD"); + aead_processor->set_ad(ad.bits_of()); + } + + // Set IV + processor->start(iv.bits_of()); + + secure_vector<byte> buf(input.begin(), input.end()); + processor->finish(buf); + + return buf; + } + +secure_vector<byte> get_stdin() + { + secure_vector<byte> out; + std::streamsize reserved_size = 1048576; // 1 MiB + out.reserve(reserved_size); + + std::istreambuf_iterator<char> iterator(std::cin.rdbuf()); // stdin iterator + std::istreambuf_iterator<char> EOS; // end-of-range iterator + std::copy(iterator, EOS, std::back_inserter(out)); + return out; + } + +void to_stdout(const secure_vector<byte> &data) + { + std::copy(data.begin(), data.end(), std::ostreambuf_iterator<char>(std::cout)); + } + +} + +class Encryption : public Command + { + public: + Encryption() : Command("encryption --decrypt --mode= --key= --iv= --ad=") {} + + void go() override + { + std::string mode = get_arg_or("mode", ""); + if (!VALID_MODES.count(mode)) + { + std::ostringstream error; + error << "Invalid mode: '" << mode << "'\n" + << "valid modes are:"; + for (auto valid_mode : VALID_MODES) error << " " << valid_mode.first; + + throw CLI_Usage_Error(error.str()); + } + + std::string key_hex = get_arg("key"); + std::string iv_hex = get_arg("iv"); + std::string ad_hex = get_arg_or("ad", ""); + + auto input = get_stdin(); + if (verbose()) + { + std::cerr << "Got " << input.size() << " bytes of input data." << std::endl; + } + + auto key = SymmetricKey(key_hex); + auto iv = InitializationVector(iv_hex); + auto ad = OctetString(ad_hex); + + auto direction = flag_set("decrypt") ? Cipher_Dir::DECRYPTION : Cipher_Dir::ENCRYPTION; + auto out = do_crypt(VALID_MODES[mode], input, key, iv, ad, direction); + to_stdout(out); + } + }; + +BOTAN_REGISTER_COMMAND("encryption", Encryption); + +} + +#endif diff --git a/src/scripts/ci/travis/build.sh b/src/scripts/ci/travis/build.sh index 71d33d1bb..36517738b 100755 --- a/src/scripts/ci/travis/build.sh +++ b/src/scripts/ci/travis/build.sh @@ -6,6 +6,7 @@ MAKE_PREFIX=() TEST_PREFIX=() TEST_EXE=./botan-test TEST_FLAGS=() +CLI_EXE=./botan CFG_FLAGS=(--prefix=/tmp/botan-installation --cc=$CC --os=$TRAVIS_OS_NAME) CC_BIN=$CXX @@ -179,6 +180,12 @@ else time "${TEST_CMD[@]}" fi +if [ "$BUILD_MODE" = "static" ] || [ "$BUILD_MODE" = "shared" ] +then + echo "Running cli tests ..." + ./src/scripts/cli_tests.py "$CLI_EXE" +fi + # Run Python tests (need shared libs) if [ "$BUILD_MODE" = "shared" ] || [ "$BUILD_MODE" = "coverage" ]; then diff --git a/src/scripts/cli_tests.py b/src/scripts/cli_tests.py new file mode 100755 index 000000000..757de654d --- /dev/null +++ b/src/scripts/cli_tests.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python3 + +import binascii +from collections import OrderedDict +import unittest +import argparse +import re +import subprocess +import sys + +import vecparser + +cli_binary = "" + +SUPPORTED_ALGORITHMS = [ + 'AES-128/CFB', + 'AES-192/CFB', + 'AES-256/CFB', + 'AES-128/GCM', + 'AES-192/GCM', + 'AES-256/GCM', + 'AES-128/OCB', + 'AES-128/XTS', + 'AES-256/XTS' +] + +def append_ordered(base, additional_elements): + for key in additional_elements: + value = additional_elements[key] + base[key] = value + +class TestSequence(unittest.TestCase): + pass + +def create_test(data): + def do_test_expected(self): + iv = data['Nonce'] + key = data['Key'] + ad = data['AD'] if 'AD' in data else "" + plaintext = data['In'].lower() + ciphertext = data['Out'].lower() + algorithm = data['Algorithm'] + direction = data['Direction'] + + # CFB + if algorithm == "AES-128/CFB": + mode = "aes-128-cfb" + elif algorithm == "AES-192/CFB": + mode = "aes-192-cfb" + elif algorithm == "AES-256/CFB": + mode = "aes-256-cfb" + # GCM + elif algorithm == "AES-128/GCM": + mode = "aes-128-gcm" + elif algorithm == "AES-192/GCM": + mode = "aes-192-gcm" + elif algorithm == "AES-256/GCM": + mode = "aes-256-gcm" + # OCB + elif algorithm == "AES-128/OCB": + mode = "aes-128-ocb" + # XTS + elif algorithm == "AES-128/XTS": + mode = "aes-128-xts" + elif algorithm == "AES-256/XTS": + mode = "aes-256-xts" + else: raise Exception("Unknown algorithm: '" + algorithm + "'") + + cmd = [ + cli_binary, + "encryption", + "--mode=%s" % mode, + "--iv=%s" % iv, + "--ad=%s" % ad, + "--key=%s" % key] + if direction == "decrypt": + cmd += ['--decrypt'] + # out_raw = subprocess.check_output(cmd) + + if direction == "decrypt": + invalue = ciphertext + else: + invalue = plaintext + + #print(cmd) + + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) + out_raw = p.communicate(input=binascii.unhexlify(invalue))[0] + out = binascii.hexlify(out_raw).decode("UTF-8").lower() + + # Renamings + if direction == "decrypt": + expected = plaintext + else: + expected = ciphertext + actual = out + self.assertEqual(expected, actual) + return do_test_expected + +def get_testdata(document): + out = OrderedDict() + for algorithm in document: + if algorithm in SUPPORTED_ALGORITHMS: + testcase_number = 0 + for testcase in document[algorithm]: + testcase_number += 1 + for direction in ['encrypt', 'decrypt']: + testname = "{} no {:0>3} ({})".format( + algorithm.lower(), testcase_number, direction) + testname = re.sub("[^-a-z0-9-]", "_", testname) + testname = re.sub("_+", "_", testname) + testname = testname.strip("_") + out[testname] = {} + for key in testcase: + value = testcase[key] + out[testname][key] = value + out[testname]['Algorithm'] = algorithm + out[testname]['Direction'] = direction + return out + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description="") + parser.add_argument('cli_binary', + help='path to the botan cli binary') + parser.add_argument('unittest_args', nargs="*") + args = parser.parse_args() + + cli_binary = args.cli_binary + + vecfile_cfb = vecparser.VecDocument("src/tests/data/modes/cfb.vec") + vecfile_gcm = vecparser.VecDocument("src/tests/data/aead/gcm.vec") + vecfile_ocb = vecparser.VecDocument("src/tests/data/aead/ocb.vec") + vecfile_xts = vecparser.VecDocument("src/tests/data/modes/xts.vec") + #data = vecfile.get_data() + #for algo in data: + # print(algo) + # i = 0 + # for testcase in data[algo]: + # i += 1 + # print(str(i) + ":", testcase) + + testdata = OrderedDict() + append_ordered(testdata, get_testdata(vecfile_cfb.get_data())) + append_ordered(testdata, get_testdata(vecfile_gcm.get_data())) + append_ordered(testdata, get_testdata(vecfile_ocb.get_data())) + append_ordered(testdata, get_testdata(vecfile_xts.get_data())) + + #for testname in testdata: + # print(testname) + # for key in testdata[testname]: + # print(" " + key + ": " + testdata[testname][key]) + for testname in testdata: + test_method = create_test(testdata[testname]) + test_method.__name__ = 'test_%s' % testname + setattr(TestSequence, test_method.__name__, test_method) + + # Hand over sys.argv[0] and unittest_args to the testing framework + sys.argv[1:] = args.unittest_args + unittest.main() diff --git a/src/scripts/vecparser.py b/src/scripts/vecparser.py new file mode 100644 index 000000000..707a58920 --- /dev/null +++ b/src/scripts/vecparser.py @@ -0,0 +1,50 @@ +from collections import OrderedDict +import re + +class VecDocument: + def __init__(self, filepath): + self.data = OrderedDict() + last_testcase_number = 1 + current_testcase_number = 1 + current_group_name = "" + last_group_name = "" + current_testcase = {} + + PATTERN_GROUPHEADER = "^\[(.+)\]$" + PATTERN_KEYVALUE = "^\s*([a-zA-Z]+)\s*=(.*)$" + + with open(filepath, 'r') as f: + # Append one empty line to simplify parsing + lines = f.read().splitlines() + ["\n"] + + for line in lines: + line = line.strip() + if line.startswith("#"): + pass # Skip + elif line == "": + current_testcase_number += 1 + elif re.match(PATTERN_GROUPHEADER, line): + match = re.match(PATTERN_GROUPHEADER, line) + current_group_name = match.group(1) + elif re.match(PATTERN_KEYVALUE, line): + match = re.match(PATTERN_KEYVALUE, line) + key = match.group(1) + value = match.group(2).strip() + current_testcase[key] = value + + if current_testcase_number != last_testcase_number: + if not current_group_name in self.data: + self.data[current_group_name] = [] + if len(current_testcase) != 0: + self.data[current_group_name].append(current_testcase) + current_testcase = {} + last_testcase_number = current_testcase_number + + if current_group_name != last_group_name: + last_group_name = current_group_name + # Reset testcase number + last_testcase_number = 1 + current_testcase_number = 1 + + def get_data(self): + return self.data |