diff options
author | Jack Lloyd <[email protected]> | 2019-01-28 19:05:41 -0500 |
---|---|---|
committer | Jack Lloyd <[email protected]> | 2019-01-31 11:07:59 -0500 |
commit | 55c7751b1eee10b5d850a500dd000cbe81d88942 (patch) | |
tree | 08c1a77eebad68f3baf74624ed04d85e626d7f91 /src/lib | |
parent | a0837b9a2a21562b4ff289728ed1ca9ca40d20c8 (diff) |
Add a thread pool
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/utils/os_utils.cpp | 30 | ||||
-rw-r--r-- | src/lib/utils/os_utils.h | 9 | ||||
-rw-r--r-- | src/lib/utils/thread_utils/info.txt | 3 | ||||
-rw-r--r-- | src/lib/utils/thread_utils/thread_pool.cpp | 103 | ||||
-rw-r--r-- | src/lib/utils/thread_utils/thread_pool.h | 81 |
5 files changed, 212 insertions, 14 deletions
diff --git a/src/lib/utils/os_utils.cpp b/src/lib/utils/os_utils.cpp index f64b85c18..71f4f12d4 100644 --- a/src/lib/utils/os_utils.cpp +++ b/src/lib/utils/os_utils.cpp @@ -263,20 +263,9 @@ size_t OS::get_memory_locking_limit() * programs), but small enough that we should not cause problems * even if many processes are mlocking on the same machine. */ - size_t mlock_requested = BOTAN_MLOCK_ALLOCATOR_MAX_LOCKED_KB; + const size_t user_req = read_env_variable_sz("BOTAN_MLOCK_POOL_SIZE", BOTAN_MLOCK_ALLOCATOR_MAX_LOCKED_KB); - /* - * Allow override via env variable - */ - if(const char* env = read_env_variable("BOTAN_MLOCK_POOL_SIZE")) - { - try - { - const size_t user_req = std::stoul(env, nullptr); - mlock_requested = std::min(user_req, mlock_requested); - } - catch(std::exception&) { /* ignore it */ } - } + const size_t mlock_requested = std::min<size_t>(user_req, BOTAN_MLOCK_ALLOCATOR_MAX_LOCKED_KB); if(mlock_requested > 0) { @@ -327,6 +316,21 @@ const char* OS::read_env_variable(const std::string& name) return std::getenv(name.c_str()); } +size_t OS::read_env_variable_sz(const std::string& name, size_t def) + { + if(const char* env = read_env_variable(name)) + { + try + { + const size_t val = std::stoul(env, nullptr); + return val; + } + catch(std::exception&) { /* ignore it */ } + } + + return def; + } + std::vector<void*> OS::allocate_locked_pages(size_t count) { std::vector<void*> result; diff --git a/src/lib/utils/os_utils.h b/src/lib/utils/os_utils.h index 37a8d3a9c..82f3aad04 100644 --- a/src/lib/utils/os_utils.h +++ b/src/lib/utils/os_utils.h @@ -90,6 +90,15 @@ size_t system_page_size(); const char* read_env_variable(const std::string& var_name); /** +* Read the value of an environment variable and convert it to an +* integer. If not set or conversion fails, returns the default value. +* +* If the process seems to be running in a privileged state (such as setuid) +* then always returns nullptr, similiar to glibc's secure_getenv. +*/ +size_t read_env_variable_sz(const std::string& var_name, size_t def_value = 0); + +/** * Request @count pages of RAM which are locked into memory using mlock, * VirtualLock, or some similar OS specific API. Free it with free_locked_pages. * diff --git a/src/lib/utils/thread_utils/info.txt b/src/lib/utils/thread_utils/info.txt index 826a9d734..80ce2d389 100644 --- a/src/lib/utils/thread_utils/info.txt +++ b/src/lib/utils/thread_utils/info.txt @@ -1,10 +1,11 @@ <defines> -THREAD_UTILS -> 20180112 +THREAD_UTILS -> 20190122 </defines> <header:internal> barrier.h semaphore.h +thread_pool.h </header:internal> <os_features> diff --git a/src/lib/utils/thread_utils/thread_pool.cpp b/src/lib/utils/thread_utils/thread_pool.cpp new file mode 100644 index 000000000..4ccefe8dc --- /dev/null +++ b/src/lib/utils/thread_utils/thread_pool.cpp @@ -0,0 +1,103 @@ +/* +* (C) 2019 Jack Lloyd +* +* Botan is released under the Simplified BSD License (see license.txt) +*/ + +#include <botan/internal/thread_pool.h> +#include <botan/internal/os_utils.h> +#include <botan/exceptn.h> +#include <thread> + +namespace Botan { + +//static +Thread_Pool& Thread_Pool::global_instance() + { + static Thread_Pool g_thread_pool(OS::read_env_variable_sz("BOTAN_THREAD_POOL_SIZE")); + return g_thread_pool; + } + +Thread_Pool::Thread_Pool(size_t pool_size) + { + if(pool_size == 0) + { + pool_size = std::thread::hardware_concurrency(); + + /* + * For large machines don't create too many threads, unless + * explicitly asked to by the caller. + */ + if(pool_size > 16) + pool_size = 16; + } + + if(pool_size <= 1) + pool_size = 2; + + m_shutdown = false; + + for(size_t i = 0; i != pool_size; ++i) + { + m_workers.push_back(std::thread(&Thread_Pool::worker_thread, this)); + } + } + +void Thread_Pool::shutdown() + { + { + std::unique_lock<std::mutex> lock(m_mutex); + + if(m_shutdown == true) + return; + + m_shutdown = true; + + m_more_tasks.notify_all(); + } + + for(auto&& thread : m_workers) + { + thread.join(); + } + m_workers.clear(); + } + +void Thread_Pool::queue_thunk(std::function<void ()> fn) + { + std::unique_lock<std::mutex> lock(m_mutex); + + if(m_shutdown) + throw Invalid_State("Cannot add work after thread pool has shut down"); + + m_tasks.push_back(fn); + m_more_tasks.notify_one(); + } + +void Thread_Pool::worker_thread() + { + for(;;) + { + std::function<void()> task; + + { + std::unique_lock<std::mutex> lock(m_mutex); + m_more_tasks.wait(lock, [this]{ return m_shutdown || !m_tasks.empty(); }); + + if(m_tasks.empty()) + { + if(m_shutdown) + return; + else + continue; + } + + task = m_tasks.front(); + m_tasks.pop_front(); + } + + task(); + } + } + +} diff --git a/src/lib/utils/thread_utils/thread_pool.h b/src/lib/utils/thread_utils/thread_pool.h new file mode 100644 index 000000000..d48975090 --- /dev/null +++ b/src/lib/utils/thread_utils/thread_pool.h @@ -0,0 +1,81 @@ +/* +* (C) 2019 Jack Lloyd +* +* Botan is released under the Simplified BSD License (see license.txt) +*/ + +#ifndef BOTAN_THREAD_POOL_H_ +#define BOTAN_THREAD_POOL_H_ + +#include <botan/types.h> +#include <functional> +#include <deque> +#include <vector> +#include <memory> +#include <utility> +#include <type_traits> +#include <mutex> +#include <thread> +#include <future> +#include <condition_variable> + +namespace Botan { + +class BOTAN_TEST_API Thread_Pool + { + public: + /** + * Return an instance to a shared thread pool + */ + static Thread_Pool& global_instance(); + + /** + * Initialize a thread pool with some number of threads + * @param pool_size number of threads in the pool, if 0 + * then some default value is chosen + */ + Thread_Pool(size_t pool_size = 0); + + ~Thread_Pool() { shutdown(); } + + void shutdown(); + + Thread_Pool(const Thread_Pool&) = delete; + Thread_Pool& operator=(const Thread_Pool&) = delete; + + // Does this work? + Thread_Pool(Thread_Pool&&) = default; + Thread_Pool& operator=(Thread_Pool&&) = default; + + /* + * Enqueue some work + */ + void queue_thunk(std::function<void ()>); + + template<class F, class... Args> + auto run(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> + { + typedef typename std::result_of<F(Args...)>::type return_type; + + auto future_work = std::bind(std::forward<F>(f), std::forward<Args>(args)...); + auto task = std::make_shared<std::packaged_task<return_type ()>>(future_work); + auto future_result = task->get_future(); + queue_thunk([task]() { (*task)(); }); + return future_result; + } + + private: + void worker_thread(); + + // Only touched in constructor and destructor + std::vector<std::thread> m_workers; + + std::mutex m_mutex; + std::condition_variable m_more_tasks; + std::deque<std::function<void ()>> m_tasks; + bool m_shutdown; + }; + +} + +#endif |