diff options
-rw-r--r-- | src/lib/filters/threaded_fork.cpp | 18 | ||||
-rw-r--r-- | src/lib/utils/barrier.cpp | 39 | ||||
-rw-r--r-- | src/lib/utils/barrier.h | 45 | ||||
-rw-r--r-- | src/lib/utils/info.txt | 1 | ||||
-rw-r--r-- | src/tests/test_filters.cpp | 2 |
5 files changed, 93 insertions, 12 deletions
diff --git a/src/lib/filters/threaded_fork.cpp b/src/lib/filters/threaded_fork.cpp index ff54bcbc6..07df590f8 100644 --- a/src/lib/filters/threaded_fork.cpp +++ b/src/lib/filters/threaded_fork.cpp @@ -11,6 +11,7 @@ #if defined(BOTAN_TARGET_OS_HAS_THREADS) #include <botan/internal/semaphore.h> +#include <botan/internal/barrier.h> namespace Botan { @@ -23,14 +24,14 @@ struct Threaded_Fork_Data Semaphore m_input_ready_semaphore; /* - * Ensures that all threads have completed processing data. + * Synchronises all threads to complete processing data in lock-step. */ - Semaphore m_input_complete_semaphore; + Barrier m_input_complete_barrier; /* * The work that needs to be done. This should be only when the threads * are NOT running (i.e. before notifying the work condition, after - * the input_complete_semaphore is completely reset.) + * the input_complete_barrier has reset.) */ const byte* m_input = nullptr; @@ -121,11 +122,11 @@ void Threaded_Fork::thread_delegate_work(const byte input[], size_t length) m_thread_data->m_input_length = length; //Let the workers start processing. + m_thread_data->m_input_complete_barrier.wait(total_ports() + 1); m_thread_data->m_input_ready_semaphore.release(total_ports()); //Wait for all the filters to finish processing. - for(size_t i = 0; i != total_ports(); ++i) - m_thread_data->m_input_complete_semaphore.acquire(); + m_thread_data->m_input_complete_barrier.sync(); //Reset the thread data m_thread_data->m_input = nullptr; @@ -136,18 +137,13 @@ void Threaded_Fork::thread_entry(Filter* filter) { while(true) { - /* - * This is plain wrong: a single thread can get the semaphore - * more than one time, meaning it will process the input twice - * and some other thread/filter will not see this input. - */ m_thread_data->m_input_ready_semaphore.acquire(); if(!m_thread_data->m_input) break; filter->write(m_thread_data->m_input, m_thread_data->m_input_length); - m_thread_data->m_input_complete_semaphore.release(); + m_thread_data->m_input_complete_barrier.sync(); } } diff --git a/src/lib/utils/barrier.cpp b/src/lib/utils/barrier.cpp new file mode 100644 index 000000000..81c578b72 --- /dev/null +++ b/src/lib/utils/barrier.cpp @@ -0,0 +1,39 @@ +/* +* Barrier +* (C) 2016 Joel Low +* +* Botan is released under the Simplified BSD License (see license.txt) +*/ + +#include <botan/internal/barrier.h> + +#if defined(BOTAN_TARGET_OS_HAS_THREADS) + +namespace Botan { + +void Barrier::wait(unsigned delta) + { + lock_guard_type<mutex_type> lock(m_mutex); + m_value += delta; + } + +void Barrier::sync() + { + std::unique_lock<mutex_type> lock(m_mutex); + --m_value; + if(m_value > 0) + { + unsigned current_syncs = m_syncs; + m_cond.wait(lock, [this, ¤t_syncs] { return m_syncs != current_syncs; }); + } + else + { + m_value = 0; + ++m_syncs; + m_cond.notify_all(); + } + } + +} + +#endif diff --git a/src/lib/utils/barrier.h b/src/lib/utils/barrier.h new file mode 100644 index 000000000..6d5cf9e58 --- /dev/null +++ b/src/lib/utils/barrier.h @@ -0,0 +1,45 @@ +/* +* Barrier +* (C) 2016 Joel Low +* +* Botan is released under the Simplified BSD License (see license.txt) +*/ + +#ifndef BOTAN_UTIL_BARRIER_H__ +#define BOTAN_UTIL_BARRIER_H__ + +#include <botan/mutex.h> + +#if defined(BOTAN_TARGET_OS_HAS_THREADS) +#include <condition_variable> +#endif + +namespace Botan { + +#if defined(BOTAN_TARGET_OS_HAS_THREADS) +// Barrier implements a barrier synchronization primitive. wait() will indicate +// how many threads to synchronize; each thread needing synchronization should +// call sync(). When sync() returns, the barrier is reset to zero, and the +// m_syncs counter is incremented. m_syncs is a counter to ensure that wait() +// can be called after a sync() even if the previously sleeping threads have +// not awoken.) +class Barrier + { + public: + explicit Barrier(int value = 0) : m_value(value), m_syncs(0) {} + + void wait(unsigned delta); + + void sync(); + + private: + int m_value; + unsigned m_syncs; + mutex_type m_mutex; + std::condition_variable m_cond; + }; +#endif + +} + +#endif diff --git a/src/lib/utils/info.txt b/src/lib/utils/info.txt index 189b2da1f..75a428a83 100644 --- a/src/lib/utils/info.txt +++ b/src/lib/utils/info.txt @@ -22,6 +22,7 @@ version.h </header:public> <header:internal> +barrier.h bit_ops.h ct_utils.h donna128.h diff --git a/src/tests/test_filters.cpp b/src/tests/test_filters.cpp index d9e9ac23b..6ce83ccba 100644 --- a/src/tests/test_filters.cpp +++ b/src/tests/test_filters.cpp @@ -35,7 +35,7 @@ class Filter_Tests : public Test results.push_back(test_pipe_codec()); results.push_back(test_fork()); -#if defined(BOTAN_TARGET_OS_HAS_THREADS) && 0 +#if defined(BOTAN_TARGET_OS_HAS_THREADS) // Threaded_Fork is broken results.push_back(test_threaded_fork()); #endif |