aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Warta <[email protected]>2015-12-12 13:21:43 +0100
committerSimon Warta <[email protected]>2017-04-18 09:37:41 +0200
commitfe6375e0f23757eb65c1ad491ab625afda0ee821 (patch)
treed1511d3429838398a06d6a91273bf77d42f5dfac
parent3553ac7b22620604d19da4881201fb416425d23f (diff)
Add botan encryption cli app
-rw-r--r--src/cli/encryption.cpp120
-rwxr-xr-xsrc/scripts/cli_tests.py115
-rw-r--r--src/scripts/vecparser.py51
3 files changed, 286 insertions, 0 deletions
diff --git a/src/cli/encryption.cpp b/src/cli/encryption.cpp
new file mode 100644
index 000000000..fa9a7e3df
--- /dev/null
+++ b/src/cli/encryption.cpp
@@ -0,0 +1,120 @@
+/*
+* (C) 2015 Simon Warta (Kullo GmbH)
+*
+* Botan is released under the Simplified BSD License (see license.txt)
+*/
+
+#include "apps.h"
+
+#if defined(BOTAN_HAS_AES)
+
+#include <botan/aes.h>
+#include <botan/aead.h>
+
+#include <iostream>
+#include <iterator>
+
+using namespace Botan;
+
+namespace {
+
+auto VALID_MODES = std::map<std::string, std::string>{
+ // Don't add algorithms here without extending tests
+ // in `src/scripts/cli_tests.py`
+ { "aes-128-gcm", "AES-128/GCM" },
+ { "aes-192-gcm", "AES-192/GCM" },
+ { "aes-256-gcm", "AES-256/GCM" },
+};
+
+secure_vector<byte> do_crypt(const std::string &cipher,
+ const secure_vector<byte> &input,
+ const SymmetricKey &key,
+ const InitializationVector &iv,
+ const OctetString &ad,
+ Cipher_Dir direction)
+ {
+ if (iv.size() == 0) throw std::invalid_argument("IV must not be empty");
+
+ // TODO: implement streaming
+
+ std::shared_ptr<Botan::Cipher_Mode> processor(Botan::get_cipher_mode(cipher, direction));
+ if(!processor) throw std::runtime_error("Cipher algorithm not found");
+
+ // Set key
+ processor->set_key(key);
+
+ // Set associated data
+ if (cipher.find("/GCM") != std::string::npos)
+ {
+ auto aead_processor = std::dynamic_pointer_cast<AEAD_Mode>(processor);
+ if(!aead_processor) throw std::runtime_error("Cipher algorithm not could not be converted to AEAD");
+ aead_processor->set_ad(ad.bits_of());
+ }
+
+ // Set IV
+ processor->start(iv.bits_of());
+
+ secure_vector<byte> buf(input.begin(), input.end());
+ processor->finish(buf);
+
+ return buf;
+ }
+
+secure_vector<byte> get_stdin()
+ {
+ secure_vector<byte> out;
+ std::streamsize reserved_size = 1048576; // 1 MiB
+ out.reserve(reserved_size);
+
+ std::istreambuf_iterator<char> iterator(std::cin.rdbuf()); // stdin iterator
+ std::istreambuf_iterator<char> EOS; // end-of-range iterator
+ std::copy(iterator, EOS, std::back_inserter(out));
+ return out;
+ }
+
+void to_stdout(const secure_vector<byte> &data)
+ {
+ std::copy(data.begin(), data.end(), std::ostreambuf_iterator<char>(std::cout));
+ }
+
+int encryption(const std::vector<std::string> &args)
+ {
+ OptionParser opts("debug|decrypt|mode=|key=|iv=|ad=");
+ opts.parse(args);
+
+ std::string mode = opts.value_if_set("mode");
+ if (!VALID_MODES.count(mode))
+ {
+ std::cout << "Invalid mode: '" << mode << "'\n"
+ << "valid modes are:";
+ for (auto valid_mode : VALID_MODES) std::cout << " " << valid_mode.first;
+ std::cout << std::endl;
+ return 1;
+ }
+
+ std::string key_hex = opts.value("key");
+ std::string iv_hex = opts.value("iv");
+ std::string ad_hex = opts.value_or_else("ad", "");
+
+ auto input = get_stdin();
+ if (opts.is_set("debug"))
+ {
+ std::cerr << "Got " << input.size() << " bytes of input data." << std::endl;
+ }
+
+ auto key = SymmetricKey(key_hex);
+ auto iv = InitializationVector(iv_hex);
+ auto ad = OctetString(ad_hex);
+
+ auto direction = opts.is_set("decrypt") ? Cipher_Dir::DECRYPTION : Cipher_Dir::ENCRYPTION;
+ auto out = do_crypt(VALID_MODES[mode], input, key, iv, ad, direction);
+ to_stdout(out);
+
+ return 0;
+ }
+
+}
+
+REGISTER_APP(encryption);
+
+#endif
diff --git a/src/scripts/cli_tests.py b/src/scripts/cli_tests.py
new file mode 100755
index 000000000..b86d37b1b
--- /dev/null
+++ b/src/scripts/cli_tests.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env python3
+
+import binascii
+import collections
+import unittest
+import argparse
+import re
+import subprocess
+import vecparser
+import sys
+
+cli_binary = ""
+testdata = {}
+
+class TestSequence(unittest.TestCase):
+ pass
+
+def create_test(data):
+ def do_test_expected(self):
+ iv = data['Nonce']
+ key = data['Key']
+ ad = data['AD'] if 'AD' in data else ""
+ plaintext = data['In'].lower()
+ ciphertext = data['Out'].lower()
+ algorithm = data['Algorithm']
+ direction = data['Direction']
+
+ if algorithm == "AES-128/GCM":
+ mode = "aes-128-gcm"
+ elif algorithm == "AES-192/GCM":
+ mode = "aes-192-gcm"
+ elif algorithm == "AES-256/GCM":
+ mode = "aes-256-gcm"
+ else: raise Exception("Unknown algorithm: '" + algorithm + "'")
+
+ cmd = [
+ cli_binary,
+ "encryption",
+ "--mode=%s" % mode,
+ "--iv=%s" % iv,
+ "--ad=%s" % ad,
+ "--key=%s" % key]
+ if direction == "decrypt":
+ cmd += ['--decrypt']
+ # out_raw = subprocess.check_output(cmd)
+
+ if direction == "decrypt":
+ invalue = ciphertext
+ else:
+ invalue = plaintext
+
+ p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
+ out_raw = p.communicate(input=binascii.unhexlify(invalue))[0]
+ out = binascii.hexlify(out_raw).decode("UTF-8").lower()
+
+ # Renamings
+ if direction == "decrypt":
+ expected = plaintext
+ else:
+ expected = ciphertext
+ actual = out
+ self.assertEqual(expected, actual)
+ return do_test_expected
+
+def get_testdata(document):
+ out = collections.OrderedDict()
+ for algorithm in document:
+ if algorithm in ['AES-128/GCM', 'AES-192/GCM', 'AES-256/GCM']:
+ testcase_number = 0
+ for testcase in document[algorithm]:
+ testcase_number += 1
+ for direction in ['encrypt', 'decrypt']:
+ testname = "%s no %d (%s)" % (algorithm.lower(), testcase_number, direction)
+ testname = re.sub("[^a-z0-9\-]", "_", testname)
+ testname = re.sub("_+", "_", testname)
+ testname = testname.strip("_")
+ out[testname] = {}
+ for key in testcase:
+ value = testcase[key]
+ out[testname][key] = value
+ out[testname]['Algorithm'] = algorithm
+ out[testname]['Direction'] = direction
+ return out
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(description="")
+ parser.add_argument('cli_binary',
+ help='path to the botan cli binary')
+ parser.add_argument('unittest_args', nargs="*")
+ args = parser.parse_args()
+
+ cli_binary = args.cli_binary
+
+ vecfile = vecparser.VecDocument("src/tests/data/aead/gcm.vec")
+ #data = vecfile.get_data()
+ #for algo in data:
+ # print(algo)
+ # i = 0
+ # for testcase in data[algo]:
+ # i += 1
+ # print(str(i) + ":", testcase)
+
+ testdata = get_testdata(vecfile.get_data())
+ #for testname in testdata:
+ # print(testname)
+ # for key in testdata[testname]:
+ # print(" " + key + ": " + testdata[testname][key])
+ for testname in testdata:
+ test_method = create_test (testdata[testname])
+ test_method.__name__ = 'test_%s' % testname
+ setattr(TestSequence, test_method.__name__, test_method)
+
+ # Hand over sys.argv[0] and unittest_args to the testing framework
+ sys.argv[1:] = args.unittest_args
+ unittest.main()
diff --git a/src/scripts/vecparser.py b/src/scripts/vecparser.py
new file mode 100644
index 000000000..2f59a0ea2
--- /dev/null
+++ b/src/scripts/vecparser.py
@@ -0,0 +1,51 @@
+from collections import OrderedDict
+import re
+
+class VecDocument:
+ data = OrderedDict()
+
+ def __init__(self, filepath):
+ last_testcase_number = 1
+ current_testcase_number = 1
+ current_group_name = ""
+ last_group_name = ""
+ current_testcase = {}
+
+ PATTERN_GROUPHEADER = "^\[(.+)\]$"
+ PATTERN_KEYVALUE = "^\s*([a-zA-Z]+)\s*=(.*)$"
+
+ with open(filepath, 'r') as f:
+ # Append one empty line to simplify parsing
+ lines = f.read().splitlines() + ["\n"]
+
+ for line in lines:
+ line = line.strip()
+ if line.startswith("#"):
+ pass # Skip
+ elif line == "":
+ current_testcase_number += 1
+ elif re.match(PATTERN_GROUPHEADER, line):
+ match = re.match(PATTERN_GROUPHEADER, line)
+ current_group_name = match.group(1)
+ elif re.match(PATTERN_KEYVALUE, line):
+ match = re.match(PATTERN_KEYVALUE, line)
+ key = match.group(1)
+ value = match.group(2).strip()
+ current_testcase[key] = value
+
+ if current_testcase_number != last_testcase_number:
+ if not current_group_name in self.data:
+ self.data[current_group_name] = []
+ if len(current_testcase) != 0:
+ self.data[current_group_name].append(current_testcase)
+ current_testcase = {}
+ last_testcase_number = current_testcase_number
+
+ if current_group_name != last_group_name:
+ last_group_name = current_group_name
+ # Reset testcase number
+ last_testcase_number = 1
+ current_testcase_number = 1
+
+ def get_data(self):
+ return self.data