diff options
author | lloyd <[email protected]> | 2013-02-02 23:03:47 +0000 |
---|---|---|
committer | lloyd <[email protected]> | 2013-02-02 23:03:47 +0000 |
commit | 06fc6aa688dcb4d4b1d742c7978c020f94b82e5b (patch) | |
tree | 0ed8d2e6b68d7adbfdce4e2cd04b506cca5be7f8 /src | |
parent | cc0765f2946f7aee146e4df370460a4d06fda3ae (diff) |
Add Threaded_Fork, which acts like a normal Fork filter except that
each subchain of filters will run in its own thread.
Written and contributed by Joel Low.
A thread on botan-devel contains the original patch and some
discussion:
http://lists.randombit.net/pipermail/botan-devel/2013-January/001699.html
Diffstat (limited to 'src')
-rw-r--r-- | src/filters/basefilt.h | 39 | ||||
-rw-r--r-- | src/filters/filter.h | 9 | ||||
-rw-r--r-- | src/filters/info.txt | 1 | ||||
-rw-r--r-- | src/filters/threaded_fork.cpp | 146 | ||||
-rw-r--r-- | src/utils/info.txt | 5 | ||||
-rw-r--r-- | src/utils/semaphore.cpp | 39 | ||||
-rw-r--r-- | src/utils/semaphore.h | 34 |
7 files changed, 271 insertions, 2 deletions
diff --git a/src/filters/basefilt.h b/src/filters/basefilt.h index 5252be973..be5cb7a26 100644 --- a/src/filters/basefilt.h +++ b/src/filters/basefilt.h @@ -1,6 +1,7 @@ /* * Basic Filters * (C) 1999-2007 Jack Lloyd +* (C) 2013 Joel Low * * Distributed under the terms of the Botan license */ @@ -9,6 +10,8 @@ #define BOTAN_BASEFILT_H__ #include <botan/filter.h> +#include <thread> +#include <memory> namespace Botan { @@ -76,6 +79,42 @@ class BOTAN_DLL Fork : public Fanout_Filter Fork(Filter* filter_arr[], size_t length); }; +/** +* This class is a threaded version of the Fork filter. While this uses +* threads, the class itself is NOT thread-safe. This is meant as a drop- +* in replacement for Fork where performance gains are possible. +*/ +class BOTAN_DLL Threaded_Fork : public Fork + { + public: + std::string name() const; + + /** + * Construct a Threaded_Fork filter with up to four forks. + */ + Threaded_Fork(Filter*, Filter*, Filter* = nullptr, Filter* = nullptr); + + /** + * Construct a Threaded_Fork from range of filters + * @param filter_arr the list of filters + * @param length how many filters + */ + Threaded_Fork(Filter* filter_arr[], size_t length); + + ~Threaded_Fork(); + + protected: + void set_next(Filter* f[], size_t n); + void send(const byte in[], size_t length); + + private: + void thread_delegate_work(const byte input[], size_t length); + void thread_entry(Filter* filter); + + std::vector<std::shared_ptr<std::thread>> m_threads; + std::unique_ptr<struct Threaded_Fork_Data> m_thread_data; + }; + } #endif diff --git a/src/filters/filter.h b/src/filters/filter.h index b541332a6..bdf4d11a4 100644 --- a/src/filters/filter.h +++ b/src/filters/filter.h @@ -1,6 +1,7 @@ /* * Filter * (C) 1999-2007 Jack Lloyd +* (C) 2013 Joel Low * * Distributed under the terms of the Botan license */ @@ -56,7 +57,7 @@ class BOTAN_DLL Filter * @param in some input for the filter * @param length the length of in */ - void send(const byte in[], size_t length); + virtual void send(const byte in[], size_t length); /** * @param in some input for the filter @@ -161,6 +162,12 @@ class BOTAN_DLL Fanout_Filter : public Filter void set_next(Filter* f[], size_t n) { Filter::set_next(f, n); } void attach(Filter* f) { Filter::attach(f); } + + private: + friend class Threaded_Fork; + using Filter::write_queue; + using Filter::total_ports; + using Filter::next; }; /** diff --git a/src/filters/info.txt b/src/filters/info.txt index fde5780f4..b5c03aa45 100644 --- a/src/filters/info.txt +++ b/src/filters/info.txt @@ -12,6 +12,7 @@ pipe.cpp pipe_io.cpp pipe_rw.cpp secqueue.cpp +threaded_fork.cpp </source> <header:public> diff --git a/src/filters/threaded_fork.cpp b/src/filters/threaded_fork.cpp new file mode 100644 index 000000000..05166d697 --- /dev/null +++ b/src/filters/threaded_fork.cpp @@ -0,0 +1,146 @@ +/* +* Threaded Fork +* (C) 2013 Joel Low +* 2013 Jack Lloyd +* +* Distributed under the terms of the Botan license +*/ + +#include <botan/basefilt.h> +#include <botan/internal/semaphore.h> + +namespace Botan { + +struct Threaded_Fork_Data + { + /* + * Semaphore for indicating that there is work to be done (or to + * quit) + */ + Semaphore m_input_ready_semaphore; + + /* + * Ensures that all threads have completed processing data. + */ + Semaphore m_input_complete_semaphore; + + /* + * The work that needs to be done. This should be only when the threads + * are NOT running (i.e. before notifying the work condition, after + * the input_complete_semaphore is completely reset.) + */ + const byte* m_input = nullptr; + + /* + * The length of the work that needs to be done. + */ + size_t m_input_length = 0; + }; + +/* +* Threaded_Fork constructor +*/ +Threaded_Fork::Threaded_Fork(Filter* f1, Filter* f2, Filter* f3, Filter* f4) : + Fork(nullptr, static_cast<size_t>(0)), + m_thread_data(new Threaded_Fork_Data) + { + Filter* filters[4] = { f1, f2, f3, f4 }; + set_next(filters, 4); + } + +/* +* Threaded_Fork constructor +*/ +Threaded_Fork::Threaded_Fork(Filter* filters[], size_t count) : + Fork(nullptr, static_cast<size_t>(0)), + m_thread_data(new Threaded_Fork_Data) + { + set_next(filters, count); + } + +Threaded_Fork::~Threaded_Fork() + { + m_thread_data->m_input = nullptr; + m_thread_data->m_input_length = 0; + + m_thread_data->m_input_ready_semaphore.release(m_threads.size()); + + for(auto& thread : m_threads) + thread->join(); + } + +std::string Threaded_Fork::name() const + { + return "Threaded Fork"; + } + +void Threaded_Fork::set_next(Filter* f[], size_t n) + { + Fork::set_next(f, n); + n = next.size(); + + if(n < m_threads.size()) + m_threads.resize(n); + else + { + m_threads.reserve(n); + for(size_t i = m_threads.size(); i != n; ++i) + { + m_threads.push_back( + std::shared_ptr<std::thread>( + new std::thread( + std::bind(&Threaded_Fork::thread_entry, this, next[i])))); + } + } + } + +void Threaded_Fork::send(const byte input[], size_t length) + { + if(write_queue.size()) + thread_delegate_work(&write_queue[0], write_queue.size()); + thread_delegate_work(input, length); + + bool nothing_attached = true; + for(size_t j = 0; j != total_ports(); ++j) + if(next[j]) + nothing_attached = false; + + if(nothing_attached) + write_queue += std::make_pair(input, length); + else + write_queue.clear(); + } + +void Threaded_Fork::thread_delegate_work(const byte input[], size_t length) + { + //Set the data to do. + m_thread_data->m_input = input; + m_thread_data->m_input_length = length; + + //Let the workers start processing. + m_thread_data->m_input_ready_semaphore.release(total_ports()); + + //Wait for all the filters to finish processing. + for(size_t i = 0; i != total_ports(); ++i) + m_thread_data->m_input_complete_semaphore.acquire(); + + //Reset the thread data + m_thread_data->m_input = nullptr; + m_thread_data->m_input_length = 0; + } + +void Threaded_Fork::thread_entry(Filter* filter) + { + while(true) + { + m_thread_data->m_input_ready_semaphore.acquire(); + + if(!m_thread_data->m_input) + break; + + filter->write(m_thread_data->m_input, m_thread_data->m_input_length); + m_thread_data->m_input_complete_semaphore.release(); + } + } + +} diff --git a/src/utils/info.txt b/src/utils/info.txt index 5bf1b9a54..cc056fc69 100644 --- a/src/utils/info.txt +++ b/src/utils/info.txt @@ -8,6 +8,7 @@ calendar.cpp charset.cpp cpuid.cpp parsing.cpp +semaphore.cpp version.cpp zero_mem.cpp </source> @@ -17,6 +18,7 @@ assert.h bit_ops.h prefetch.h rounding.h +semaphore.h stl_util.h xor_buf.h </header:internal> @@ -27,13 +29,14 @@ calendar.h charset.h cpuid.h exceptn.h +get_byte.h loadstor.h mem_ops.h parsing.h rotate.h +semaphore.h types.h version.h -get_byte.h </header:public> <libs> diff --git a/src/utils/semaphore.cpp b/src/utils/semaphore.cpp new file mode 100644 index 000000000..f4f5b2b53 --- /dev/null +++ b/src/utils/semaphore.cpp @@ -0,0 +1,39 @@ +/* +* Semaphore +* by Pierre Gaston (http://p9as.blogspot.com/2012/06/c11-semaphores.html) +* modified by Joel Low for Botan +* +*/ + +#include <botan/internal/semaphore.h> + +namespace Botan { + +void Semaphore::release(size_t n) + { + for(size_t i = 0; i != n; ++i) + { + std::lock_guard<std::mutex> lock(m_mutex); + + ++m_value; + + if(m_value <= 0) + { + ++m_wakeups; + m_cond.notify_one(); + } + } + } + +void Semaphore::acquire() + { + std::unique_lock<std::mutex> lock(m_mutex); + --m_value; + if(m_value < 0) + { + m_cond.wait(lock, [this] { return m_wakeups > 0; }); + --m_wakeups; + } + } + +} diff --git a/src/utils/semaphore.h b/src/utils/semaphore.h new file mode 100644 index 000000000..c3ce73680 --- /dev/null +++ b/src/utils/semaphore.h @@ -0,0 +1,34 @@ +/* +* Semaphore +* by Pierre Gaston (http://p9as.blogspot.com/2012/06/c11-semaphores.html) +* modified by Joel Low for Botan +* +*/ + +#ifndef BOTAN_SEMAPHORE_H__ +#define BOTAN_SEMAPHORE_H__ + +#include <mutex> +#include <condition_variable> + +namespace Botan { + +class Semaphore + { + public: + Semaphore(int value = 0) : m_value(value), m_wakeups(0) {} + + void acquire(); + + void release(size_t n = 1); + + private: + int m_value; + int m_wakeups; + std::mutex m_mutex; + std::condition_variable m_cond; + }; + +} + +#endif |