aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorlloyd <[email protected]>2013-02-02 23:03:47 +0000
committerlloyd <[email protected]>2013-02-02 23:03:47 +0000
commit06fc6aa688dcb4d4b1d742c7978c020f94b82e5b (patch)
tree0ed8d2e6b68d7adbfdce4e2cd04b506cca5be7f8 /src
parentcc0765f2946f7aee146e4df370460a4d06fda3ae (diff)
Add Threaded_Fork, which acts like a normal Fork filter except that
each subchain of filters will run in its own thread. Written and contributed by Joel Low. A thread on botan-devel contains the original patch and some discussion: http://lists.randombit.net/pipermail/botan-devel/2013-January/001699.html
Diffstat (limited to 'src')
-rw-r--r--src/filters/basefilt.h39
-rw-r--r--src/filters/filter.h9
-rw-r--r--src/filters/info.txt1
-rw-r--r--src/filters/threaded_fork.cpp146
-rw-r--r--src/utils/info.txt5
-rw-r--r--src/utils/semaphore.cpp39
-rw-r--r--src/utils/semaphore.h34
7 files changed, 271 insertions, 2 deletions
diff --git a/src/filters/basefilt.h b/src/filters/basefilt.h
index 5252be973..be5cb7a26 100644
--- a/src/filters/basefilt.h
+++ b/src/filters/basefilt.h
@@ -1,6 +1,7 @@
/*
* Basic Filters
* (C) 1999-2007 Jack Lloyd
+* (C) 2013 Joel Low
*
* Distributed under the terms of the Botan license
*/
@@ -9,6 +10,8 @@
#define BOTAN_BASEFILT_H__
#include <botan/filter.h>
+#include <thread>
+#include <memory>
namespace Botan {
@@ -76,6 +79,42 @@ class BOTAN_DLL Fork : public Fanout_Filter
Fork(Filter* filter_arr[], size_t length);
};
+/**
+* This class is a threaded version of the Fork filter. While this uses
+* threads, the class itself is NOT thread-safe. This is meant as a drop-
+* in replacement for Fork where performance gains are possible.
+*/
+class BOTAN_DLL Threaded_Fork : public Fork
+ {
+ public:
+ std::string name() const;
+
+ /**
+ * Construct a Threaded_Fork filter with up to four forks.
+ */
+ Threaded_Fork(Filter*, Filter*, Filter* = nullptr, Filter* = nullptr);
+
+ /**
+ * Construct a Threaded_Fork from range of filters
+ * @param filter_arr the list of filters
+ * @param length how many filters
+ */
+ Threaded_Fork(Filter* filter_arr[], size_t length);
+
+ ~Threaded_Fork();
+
+ protected:
+ void set_next(Filter* f[], size_t n);
+ void send(const byte in[], size_t length);
+
+ private:
+ void thread_delegate_work(const byte input[], size_t length);
+ void thread_entry(Filter* filter);
+
+ std::vector<std::shared_ptr<std::thread>> m_threads;
+ std::unique_ptr<struct Threaded_Fork_Data> m_thread_data;
+ };
+
}
#endif
diff --git a/src/filters/filter.h b/src/filters/filter.h
index b541332a6..bdf4d11a4 100644
--- a/src/filters/filter.h
+++ b/src/filters/filter.h
@@ -1,6 +1,7 @@
/*
* Filter
* (C) 1999-2007 Jack Lloyd
+* (C) 2013 Joel Low
*
* Distributed under the terms of the Botan license
*/
@@ -56,7 +57,7 @@ class BOTAN_DLL Filter
* @param in some input for the filter
* @param length the length of in
*/
- void send(const byte in[], size_t length);
+ virtual void send(const byte in[], size_t length);
/**
* @param in some input for the filter
@@ -161,6 +162,12 @@ class BOTAN_DLL Fanout_Filter : public Filter
void set_next(Filter* f[], size_t n) { Filter::set_next(f, n); }
void attach(Filter* f) { Filter::attach(f); }
+
+ private:
+ friend class Threaded_Fork;
+ using Filter::write_queue;
+ using Filter::total_ports;
+ using Filter::next;
};
/**
diff --git a/src/filters/info.txt b/src/filters/info.txt
index fde5780f4..b5c03aa45 100644
--- a/src/filters/info.txt
+++ b/src/filters/info.txt
@@ -12,6 +12,7 @@ pipe.cpp
pipe_io.cpp
pipe_rw.cpp
secqueue.cpp
+threaded_fork.cpp
</source>
<header:public>
diff --git a/src/filters/threaded_fork.cpp b/src/filters/threaded_fork.cpp
new file mode 100644
index 000000000..05166d697
--- /dev/null
+++ b/src/filters/threaded_fork.cpp
@@ -0,0 +1,146 @@
+/*
+* Threaded Fork
+* (C) 2013 Joel Low
+* 2013 Jack Lloyd
+*
+* Distributed under the terms of the Botan license
+*/
+
+#include <botan/basefilt.h>
+#include <botan/internal/semaphore.h>
+
+namespace Botan {
+
+struct Threaded_Fork_Data
+ {
+ /*
+ * Semaphore for indicating that there is work to be done (or to
+ * quit)
+ */
+ Semaphore m_input_ready_semaphore;
+
+ /*
+ * Ensures that all threads have completed processing data.
+ */
+ Semaphore m_input_complete_semaphore;
+
+ /*
+ * The work that needs to be done. This should be only when the threads
+ * are NOT running (i.e. before notifying the work condition, after
+ * the input_complete_semaphore is completely reset.)
+ */
+ const byte* m_input = nullptr;
+
+ /*
+ * The length of the work that needs to be done.
+ */
+ size_t m_input_length = 0;
+ };
+
+/*
+* Threaded_Fork constructor
+*/
+Threaded_Fork::Threaded_Fork(Filter* f1, Filter* f2, Filter* f3, Filter* f4) :
+ Fork(nullptr, static_cast<size_t>(0)),
+ m_thread_data(new Threaded_Fork_Data)
+ {
+ Filter* filters[4] = { f1, f2, f3, f4 };
+ set_next(filters, 4);
+ }
+
+/*
+* Threaded_Fork constructor
+*/
+Threaded_Fork::Threaded_Fork(Filter* filters[], size_t count) :
+ Fork(nullptr, static_cast<size_t>(0)),
+ m_thread_data(new Threaded_Fork_Data)
+ {
+ set_next(filters, count);
+ }
+
+Threaded_Fork::~Threaded_Fork()
+ {
+ m_thread_data->m_input = nullptr;
+ m_thread_data->m_input_length = 0;
+
+ m_thread_data->m_input_ready_semaphore.release(m_threads.size());
+
+ for(auto& thread : m_threads)
+ thread->join();
+ }
+
+std::string Threaded_Fork::name() const
+ {
+ return "Threaded Fork";
+ }
+
+void Threaded_Fork::set_next(Filter* f[], size_t n)
+ {
+ Fork::set_next(f, n);
+ n = next.size();
+
+ if(n < m_threads.size())
+ m_threads.resize(n);
+ else
+ {
+ m_threads.reserve(n);
+ for(size_t i = m_threads.size(); i != n; ++i)
+ {
+ m_threads.push_back(
+ std::shared_ptr<std::thread>(
+ new std::thread(
+ std::bind(&Threaded_Fork::thread_entry, this, next[i]))));
+ }
+ }
+ }
+
+void Threaded_Fork::send(const byte input[], size_t length)
+ {
+ if(write_queue.size())
+ thread_delegate_work(&write_queue[0], write_queue.size());
+ thread_delegate_work(input, length);
+
+ bool nothing_attached = true;
+ for(size_t j = 0; j != total_ports(); ++j)
+ if(next[j])
+ nothing_attached = false;
+
+ if(nothing_attached)
+ write_queue += std::make_pair(input, length);
+ else
+ write_queue.clear();
+ }
+
+void Threaded_Fork::thread_delegate_work(const byte input[], size_t length)
+ {
+ //Set the data to do.
+ m_thread_data->m_input = input;
+ m_thread_data->m_input_length = length;
+
+ //Let the workers start processing.
+ m_thread_data->m_input_ready_semaphore.release(total_ports());
+
+ //Wait for all the filters to finish processing.
+ for(size_t i = 0; i != total_ports(); ++i)
+ m_thread_data->m_input_complete_semaphore.acquire();
+
+ //Reset the thread data
+ m_thread_data->m_input = nullptr;
+ m_thread_data->m_input_length = 0;
+ }
+
+void Threaded_Fork::thread_entry(Filter* filter)
+ {
+ while(true)
+ {
+ m_thread_data->m_input_ready_semaphore.acquire();
+
+ if(!m_thread_data->m_input)
+ break;
+
+ filter->write(m_thread_data->m_input, m_thread_data->m_input_length);
+ m_thread_data->m_input_complete_semaphore.release();
+ }
+ }
+
+}
diff --git a/src/utils/info.txt b/src/utils/info.txt
index 5bf1b9a54..cc056fc69 100644
--- a/src/utils/info.txt
+++ b/src/utils/info.txt
@@ -8,6 +8,7 @@ calendar.cpp
charset.cpp
cpuid.cpp
parsing.cpp
+semaphore.cpp
version.cpp
zero_mem.cpp
</source>
@@ -17,6 +18,7 @@ assert.h
bit_ops.h
prefetch.h
rounding.h
+semaphore.h
stl_util.h
xor_buf.h
</header:internal>
@@ -27,13 +29,14 @@ calendar.h
charset.h
cpuid.h
exceptn.h
+get_byte.h
loadstor.h
mem_ops.h
parsing.h
rotate.h
+semaphore.h
types.h
version.h
-get_byte.h
</header:public>
<libs>
diff --git a/src/utils/semaphore.cpp b/src/utils/semaphore.cpp
new file mode 100644
index 000000000..f4f5b2b53
--- /dev/null
+++ b/src/utils/semaphore.cpp
@@ -0,0 +1,39 @@
+/*
+* Semaphore
+* by Pierre Gaston (http://p9as.blogspot.com/2012/06/c11-semaphores.html)
+* modified by Joel Low for Botan
+*
+*/
+
+#include <botan/internal/semaphore.h>
+
+namespace Botan {
+
+void Semaphore::release(size_t n)
+ {
+ for(size_t i = 0; i != n; ++i)
+ {
+ std::lock_guard<std::mutex> lock(m_mutex);
+
+ ++m_value;
+
+ if(m_value <= 0)
+ {
+ ++m_wakeups;
+ m_cond.notify_one();
+ }
+ }
+ }
+
+void Semaphore::acquire()
+ {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ --m_value;
+ if(m_value < 0)
+ {
+ m_cond.wait(lock, [this] { return m_wakeups > 0; });
+ --m_wakeups;
+ }
+ }
+
+}
diff --git a/src/utils/semaphore.h b/src/utils/semaphore.h
new file mode 100644
index 000000000..c3ce73680
--- /dev/null
+++ b/src/utils/semaphore.h
@@ -0,0 +1,34 @@
+/*
+* Semaphore
+* by Pierre Gaston (http://p9as.blogspot.com/2012/06/c11-semaphores.html)
+* modified by Joel Low for Botan
+*
+*/
+
+#ifndef BOTAN_SEMAPHORE_H__
+#define BOTAN_SEMAPHORE_H__
+
+#include <mutex>
+#include <condition_variable>
+
+namespace Botan {
+
+class Semaphore
+ {
+ public:
+ Semaphore(int value = 0) : m_value(value), m_wakeups(0) {}
+
+ void acquire();
+
+ void release(size_t n = 1);
+
+ private:
+ int m_value;
+ int m_wakeups;
+ std::mutex m_mutex;
+ std::condition_variable m_cond;
+ };
+
+}
+
+#endif