From 55c7751b1eee10b5d850a500dd000cbe81d88942 Mon Sep 17 00:00:00 2001 From: Jack Lloyd Date: Mon, 28 Jan 2019 19:05:41 -0500 Subject: Add a thread pool --- src/lib/utils/os_utils.cpp | 30 +++++---- src/lib/utils/os_utils.h | 9 +++ src/lib/utils/thread_utils/info.txt | 3 +- src/lib/utils/thread_utils/thread_pool.cpp | 103 +++++++++++++++++++++++++++++ src/lib/utils/thread_utils/thread_pool.h | 81 +++++++++++++++++++++++ 5 files changed, 212 insertions(+), 14 deletions(-) create mode 100644 src/lib/utils/thread_utils/thread_pool.cpp create mode 100644 src/lib/utils/thread_utils/thread_pool.h (limited to 'src/lib') diff --git a/src/lib/utils/os_utils.cpp b/src/lib/utils/os_utils.cpp index f64b85c18..71f4f12d4 100644 --- a/src/lib/utils/os_utils.cpp +++ b/src/lib/utils/os_utils.cpp @@ -263,20 +263,9 @@ size_t OS::get_memory_locking_limit() * programs), but small enough that we should not cause problems * even if many processes are mlocking on the same machine. */ - size_t mlock_requested = BOTAN_MLOCK_ALLOCATOR_MAX_LOCKED_KB; + const size_t user_req = read_env_variable_sz("BOTAN_MLOCK_POOL_SIZE", BOTAN_MLOCK_ALLOCATOR_MAX_LOCKED_KB); - /* - * Allow override via env variable - */ - if(const char* env = read_env_variable("BOTAN_MLOCK_POOL_SIZE")) - { - try - { - const size_t user_req = std::stoul(env, nullptr); - mlock_requested = std::min(user_req, mlock_requested); - } - catch(std::exception&) { /* ignore it */ } - } + const size_t mlock_requested = std::min(user_req, BOTAN_MLOCK_ALLOCATOR_MAX_LOCKED_KB); if(mlock_requested > 0) { @@ -327,6 +316,21 @@ const char* OS::read_env_variable(const std::string& name) return std::getenv(name.c_str()); } +size_t OS::read_env_variable_sz(const std::string& name, size_t def) + { + if(const char* env = read_env_variable(name)) + { + try + { + const size_t val = std::stoul(env, nullptr); + return val; + } + catch(std::exception&) { /* ignore it */ } + } + + return def; + } + std::vector OS::allocate_locked_pages(size_t count) { std::vector result; diff --git a/src/lib/utils/os_utils.h b/src/lib/utils/os_utils.h index 37a8d3a9c..82f3aad04 100644 --- a/src/lib/utils/os_utils.h +++ b/src/lib/utils/os_utils.h @@ -89,6 +89,15 @@ size_t system_page_size(); */ const char* read_env_variable(const std::string& var_name); +/** +* Read the value of an environment variable and convert it to an +* integer. If not set or conversion fails, returns the default value. +* +* If the process seems to be running in a privileged state (such as setuid) +* then always returns nullptr, similiar to glibc's secure_getenv. +*/ +size_t read_env_variable_sz(const std::string& var_name, size_t def_value = 0); + /** * Request @count pages of RAM which are locked into memory using mlock, * VirtualLock, or some similar OS specific API. Free it with free_locked_pages. diff --git a/src/lib/utils/thread_utils/info.txt b/src/lib/utils/thread_utils/info.txt index 826a9d734..80ce2d389 100644 --- a/src/lib/utils/thread_utils/info.txt +++ b/src/lib/utils/thread_utils/info.txt @@ -1,10 +1,11 @@ -THREAD_UTILS -> 20180112 +THREAD_UTILS -> 20190122 barrier.h semaphore.h +thread_pool.h diff --git a/src/lib/utils/thread_utils/thread_pool.cpp b/src/lib/utils/thread_utils/thread_pool.cpp new file mode 100644 index 000000000..4ccefe8dc --- /dev/null +++ b/src/lib/utils/thread_utils/thread_pool.cpp @@ -0,0 +1,103 @@ +/* +* (C) 2019 Jack Lloyd +* +* Botan is released under the Simplified BSD License (see license.txt) +*/ + +#include +#include +#include +#include + +namespace Botan { + +//static +Thread_Pool& Thread_Pool::global_instance() + { + static Thread_Pool g_thread_pool(OS::read_env_variable_sz("BOTAN_THREAD_POOL_SIZE")); + return g_thread_pool; + } + +Thread_Pool::Thread_Pool(size_t pool_size) + { + if(pool_size == 0) + { + pool_size = std::thread::hardware_concurrency(); + + /* + * For large machines don't create too many threads, unless + * explicitly asked to by the caller. + */ + if(pool_size > 16) + pool_size = 16; + } + + if(pool_size <= 1) + pool_size = 2; + + m_shutdown = false; + + for(size_t i = 0; i != pool_size; ++i) + { + m_workers.push_back(std::thread(&Thread_Pool::worker_thread, this)); + } + } + +void Thread_Pool::shutdown() + { + { + std::unique_lock lock(m_mutex); + + if(m_shutdown == true) + return; + + m_shutdown = true; + + m_more_tasks.notify_all(); + } + + for(auto&& thread : m_workers) + { + thread.join(); + } + m_workers.clear(); + } + +void Thread_Pool::queue_thunk(std::function fn) + { + std::unique_lock lock(m_mutex); + + if(m_shutdown) + throw Invalid_State("Cannot add work after thread pool has shut down"); + + m_tasks.push_back(fn); + m_more_tasks.notify_one(); + } + +void Thread_Pool::worker_thread() + { + for(;;) + { + std::function task; + + { + std::unique_lock lock(m_mutex); + m_more_tasks.wait(lock, [this]{ return m_shutdown || !m_tasks.empty(); }); + + if(m_tasks.empty()) + { + if(m_shutdown) + return; + else + continue; + } + + task = m_tasks.front(); + m_tasks.pop_front(); + } + + task(); + } + } + +} diff --git a/src/lib/utils/thread_utils/thread_pool.h b/src/lib/utils/thread_utils/thread_pool.h new file mode 100644 index 000000000..d48975090 --- /dev/null +++ b/src/lib/utils/thread_utils/thread_pool.h @@ -0,0 +1,81 @@ +/* +* (C) 2019 Jack Lloyd +* +* Botan is released under the Simplified BSD License (see license.txt) +*/ + +#ifndef BOTAN_THREAD_POOL_H_ +#define BOTAN_THREAD_POOL_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Botan { + +class BOTAN_TEST_API Thread_Pool + { + public: + /** + * Return an instance to a shared thread pool + */ + static Thread_Pool& global_instance(); + + /** + * Initialize a thread pool with some number of threads + * @param pool_size number of threads in the pool, if 0 + * then some default value is chosen + */ + Thread_Pool(size_t pool_size = 0); + + ~Thread_Pool() { shutdown(); } + + void shutdown(); + + Thread_Pool(const Thread_Pool&) = delete; + Thread_Pool& operator=(const Thread_Pool&) = delete; + + // Does this work? + Thread_Pool(Thread_Pool&&) = default; + Thread_Pool& operator=(Thread_Pool&&) = default; + + /* + * Enqueue some work + */ + void queue_thunk(std::function); + + template + auto run(F&& f, Args&&... args) -> std::future::type> + { + typedef typename std::result_of::type return_type; + + auto future_work = std::bind(std::forward(f), std::forward(args)...); + auto task = std::make_shared>(future_work); + auto future_result = task->get_future(); + queue_thunk([task]() { (*task)(); }); + return future_result; + } + + private: + void worker_thread(); + + // Only touched in constructor and destructor + std::vector m_workers; + + std::mutex m_mutex; + std::condition_variable m_more_tasks; + std::deque> m_tasks; + bool m_shutdown; + }; + +} + +#endif -- cgit v1.2.3