summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorSven Gothel <[email protected]>2022-01-12 05:08:41 +0100
committerSven Gothel <[email protected]>2022-01-12 05:08:41 +0100
commitaa85ac0105b0971a737ad1809512442d7a993558 (patch)
treee94335ff96dce11f7beaba1027e7475cb0cbcbe1 /src
parent7db1599ef3655aa644b60eca29a051f16fdea0cd (diff)
BTGattHandler: Use jau::service_runner
Diffstat (limited to 'src')
-rw-r--r--src/direct_bt/BTGattHandler.cpp244
1 files changed, 97 insertions, 147 deletions
diff --git a/src/direct_bt/BTGattHandler.cpp b/src/direct_bt/BTGattHandler.cpp
index c946b4e1..5ec9a88c 100644
--- a/src/direct_bt/BTGattHandler.cpp
+++ b/src/direct_bt/BTGattHandler.cpp
@@ -1014,117 +1014,102 @@ void BTGattHandler::replyAttPDUReq(std::unique_ptr<const AttPDUMsg> && pdu) {
}
}
-void BTGattHandler::l2capReaderThreadImpl() {
- {
- const std::lock_guard<std::mutex> lock(mtx_l2capReaderLifecycle); // RAII-style acquire and relinquish via destructor
- l2capReaderShallStop = false;
- l2capReaderRunning = true;
- DBG_PRINT("GATTHandler::reader Started");
- }
- cv_l2capReaderInit.notify_all(); // have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
-
- thread_local jau::call_on_release thread_cleanup([&]() {
- DBG_PRINT("GATTHandler::l2capReaderThreadCleanup: l2capReaderRunning %d -> 0", l2capReaderRunning.load());
- l2capReaderRunning = false;
- cv_l2capReaderInit.notify_all();
- });
-
- while( !l2capReaderShallStop ) {
- jau::snsize_t len;
- if( !validateConnected() ) {
- ERR_PRINT("GATTHandler::reader: Invalid IO state -> Stop");
- l2capReaderShallStop = true;
- break;
- }
+void BTGattHandler::l2capReaderWork(jau::service_runner& sr) {
+ jau::snsize_t len;
+ if( !validateConnected() ) {
+ ERR_PRINT("GATTHandler::reader: Invalid IO state -> Stop");
+ sr.set_shall_stop();
+ return;
+ }
- len = l2cap.read(rbuffer.get_wptr(), rbuffer.size());
- if( 0 < len ) {
- std::unique_ptr<const AttPDUMsg> attPDU = AttPDUMsg::getSpecialized(rbuffer.get_ptr(), static_cast<jau::nsize_t>(len));
- COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: Got %s", attPDU->toString().c_str());
-
- const AttPDUMsg::Opcode opc = attPDU->getOpcode();
- const AttPDUMsg::OpcodeType opc_type = AttPDUMsg::get_type(opc);
-
- if( AttPDUMsg::Opcode::MULTIPLE_HANDLE_VALUE_NTF == opc ) { // AttPDUMsg::OpcodeType::NOTIFICATION
- // FIXME TODO ..
- ERR_PRINT("GATTHandler::reader: MULTI-NTF not implemented: %s", attPDU->toString().c_str());
- } else if( AttPDUMsg::Opcode::HANDLE_VALUE_NTF == opc ) { // AttPDUMsg::OpcodeType::NOTIFICATION
- const AttHandleValueRcv * a = static_cast<const AttHandleValueRcv*>(attPDU.get());
- COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: NTF: %s, listener %zd", a->toString().c_str(), characteristicListenerList.size());
- BTGattCharRef decl = findCharacterisicsByValueHandle(services, a->getHandle());
- const jau::TOctetSlice& a_value_view = a->getValue();
- const jau::TROOctets data_view(a_value_view.get_ptr_nc(0), a_value_view.size(), a_value_view.byte_order()); // just a view, still owned by attPDU
- // const std::shared_ptr<TROOctets> data( std::make_shared<POctets>(a->getValue()) );
- const uint64_t timestamp = a->ts_creation;
- int i=0;
- jau::for_each_fidelity(characteristicListenerList, [&](std::shared_ptr<BTGattCharListener> &l) {
- try {
- if( l->match(*decl) ) {
- l->notificationReceived(decl, data_view, timestamp);
- }
- } catch (std::exception &e) {
- ERR_PRINT("GATTHandler::notificationReceived-CBs %d/%zd: GATTCharacteristicListener %s: Caught exception %s",
- i+1, characteristicListenerList.size(),
- jau::to_hexstring((void*)l.get()).c_str(), e.what());
+ len = l2cap.read(rbuffer.get_wptr(), rbuffer.size());
+ if( 0 < len ) {
+ std::unique_ptr<const AttPDUMsg> attPDU = AttPDUMsg::getSpecialized(rbuffer.get_ptr(), static_cast<jau::nsize_t>(len));
+ COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: Got %s", attPDU->toString().c_str());
+
+ const AttPDUMsg::Opcode opc = attPDU->getOpcode();
+ const AttPDUMsg::OpcodeType opc_type = AttPDUMsg::get_type(opc);
+
+ if( AttPDUMsg::Opcode::MULTIPLE_HANDLE_VALUE_NTF == opc ) { // AttPDUMsg::OpcodeType::NOTIFICATION
+ // FIXME TODO ..
+ ERR_PRINT("GATTHandler::reader: MULTI-NTF not implemented: %s", attPDU->toString().c_str());
+ } else if( AttPDUMsg::Opcode::HANDLE_VALUE_NTF == opc ) { // AttPDUMsg::OpcodeType::NOTIFICATION
+ const AttHandleValueRcv * a = static_cast<const AttHandleValueRcv*>(attPDU.get());
+ COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: NTF: %s, listener %zd", a->toString().c_str(), characteristicListenerList.size());
+ BTGattCharRef decl = findCharacterisicsByValueHandle(services, a->getHandle());
+ const jau::TOctetSlice& a_value_view = a->getValue();
+ const jau::TROOctets data_view(a_value_view.get_ptr_nc(0), a_value_view.size(), a_value_view.byte_order()); // just a view, still owned by attPDU
+ // const std::shared_ptr<TROOctets> data( std::make_shared<POctets>(a->getValue()) );
+ const uint64_t timestamp = a->ts_creation;
+ int i=0;
+ jau::for_each_fidelity(characteristicListenerList, [&](std::shared_ptr<BTGattCharListener> &l) {
+ try {
+ if( l->match(*decl) ) {
+ l->notificationReceived(decl, data_view, timestamp);
}
- i++;
- });
- } else if( AttPDUMsg::Opcode::HANDLE_VALUE_IND == opc ) { // AttPDUMsg::OpcodeType::INDICATION
- const AttHandleValueRcv * a = static_cast<const AttHandleValueRcv*>(attPDU.get());
- COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: IND: %s, sendIndicationConfirmation %d, listener %zd",
- a->toString().c_str(), sendIndicationConfirmation.load(), characteristicListenerList.size());
- bool cfmSent = false;
- if( sendIndicationConfirmation ) {
- AttHandleValueCfm cfm;
- send(cfm);
- cfmSent = true;
+ } catch (std::exception &e) {
+ ERR_PRINT("GATTHandler::notificationReceived-CBs %d/%zd: GATTCharacteristicListener %s: Caught exception %s",
+ i+1, characteristicListenerList.size(),
+ jau::to_hexstring((void*)l.get()).c_str(), e.what());
}
- BTGattCharRef decl = findCharacterisicsByValueHandle(services, a->getHandle());
- const jau::TOctetSlice& a_value_view = a->getValue();
- const jau::TROOctets data_view(a_value_view.get_ptr_nc(0), a_value_view.size(), a_value_view.byte_order()); // just a view, still owned by attPDU
- // const std::shared_ptr<TROOctets> data( std::make_shared<POctets>(a->getValue()) );
- const uint64_t timestamp = a->ts_creation;
- int i=0;
- jau::for_each_fidelity(characteristicListenerList, [&](std::shared_ptr<BTGattCharListener> &l) {
- try {
- if( l->match(*decl) ) {
- l->indicationReceived(decl, data_view, timestamp, cfmSent);
- }
- } catch (std::exception &e) {
- ERR_PRINT("GATTHandler::indicationReceived-CBs %d/%zd: GATTCharacteristicListener %s, cfmSent %d: Caught exception %s",
- i+1, characteristicListenerList.size(),
- jau::to_hexstring((void*)l.get()).c_str(), cfmSent, e.what());
- }
- i++;
- });
- } else if( AttPDUMsg::OpcodeType::RESPONSE == opc_type ) {
- COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: Ring: %s", attPDU->toString().c_str());
- attPDURing.putBlocking( std::move(attPDU) );
- } else if( AttPDUMsg::OpcodeType::REQUEST == opc_type ) {
- replyAttPDUReq( std::move( attPDU ) );
- } else {
- ERR_PRINT("GATTHandler::reader: Unhandled: %s", attPDU->toString().c_str());
+ i++;
+ });
+ } else if( AttPDUMsg::Opcode::HANDLE_VALUE_IND == opc ) { // AttPDUMsg::OpcodeType::INDICATION
+ const AttHandleValueRcv * a = static_cast<const AttHandleValueRcv*>(attPDU.get());
+ COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: IND: %s, sendIndicationConfirmation %d, listener %zd",
+ a->toString().c_str(), sendIndicationConfirmation.load(), characteristicListenerList.size());
+ bool cfmSent = false;
+ if( sendIndicationConfirmation ) {
+ AttHandleValueCfm cfm;
+ send(cfm);
+ cfmSent = true;
}
- } else if( 0 > len && ETIMEDOUT != errno && !l2capReaderShallStop ) { // expected exits
- IRQ_PRINT("GATTHandler::reader: l2cap read error -> Stop; l2cap.read %d (%s); %s",
- len, L2CAPComm::getRWExitCodeString(len).c_str(),
- getStateString().c_str());
- l2capReaderShallStop = true;
- has_ioerror = true;
- } else if( len != L2CAPComm::number(L2CAPComm::RWExitCode::POLL_TIMEOUT) ) { // expected POLL_TIMEOUT if idle
- WORDY_PRINT("GATTHandler::reader: l2cap read: l2cap.read %d (%s); %s",
- len, L2CAPComm::getRWExitCodeString(len).c_str(),
- getStateString().c_str());
+ BTGattCharRef decl = findCharacterisicsByValueHandle(services, a->getHandle());
+ const jau::TOctetSlice& a_value_view = a->getValue();
+ const jau::TROOctets data_view(a_value_view.get_ptr_nc(0), a_value_view.size(), a_value_view.byte_order()); // just a view, still owned by attPDU
+ // const std::shared_ptr<TROOctets> data( std::make_shared<POctets>(a->getValue()) );
+ const uint64_t timestamp = a->ts_creation;
+ int i=0;
+ jau::for_each_fidelity(characteristicListenerList, [&](std::shared_ptr<BTGattCharListener> &l) {
+ try {
+ if( l->match(*decl) ) {
+ l->indicationReceived(decl, data_view, timestamp, cfmSent);
+ }
+ } catch (std::exception &e) {
+ ERR_PRINT("GATTHandler::indicationReceived-CBs %d/%zd: GATTCharacteristicListener %s, cfmSent %d: Caught exception %s",
+ i+1, characteristicListenerList.size(),
+ jau::to_hexstring((void*)l.get()).c_str(), cfmSent, e.what());
+ }
+ i++;
+ });
+ } else if( AttPDUMsg::OpcodeType::RESPONSE == opc_type ) {
+ COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: Ring: %s", attPDU->toString().c_str());
+ attPDURing.putBlocking( std::move(attPDU) );
+ } else if( AttPDUMsg::OpcodeType::REQUEST == opc_type ) {
+ replyAttPDUReq( std::move( attPDU ) );
+ } else {
+ ERR_PRINT("GATTHandler::reader: Unhandled: %s", attPDU->toString().c_str());
}
+ } else if( 0 > len && ETIMEDOUT != errno && !sr.get_shall_stop() ) { // expected exits
+ IRQ_PRINT("GATTHandler::reader: l2cap read error -> Stop; l2cap.read %d (%s); %s",
+ len, L2CAPComm::getRWExitCodeString(len).c_str(),
+ getStateString().c_str());
+ sr.set_shall_stop();
+ has_ioerror = true;
+ } else if( len != L2CAPComm::number(L2CAPComm::RWExitCode::POLL_TIMEOUT) ) { // expected POLL_TIMEOUT if idle
+ WORDY_PRINT("GATTHandler::reader: l2cap read: l2cap.read %d (%s); %s",
+ len, L2CAPComm::getRWExitCodeString(len).c_str(),
+ getStateString().c_str());
}
- {
- const std::lock_guard<std::mutex> lock(mtx_l2capReaderLifecycle); // RAII-style acquire and relinquish via destructor
- WORDY_PRINT("GATTHandler::reader: Ended. Ring has %u entries flushed", attPDURing.size());
- attPDURing.clear();
- l2capReaderRunning = false;
- }
- cv_l2capReaderInit.notify_all(); // have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
+}
+void BTGattHandler::l2capReaderEndLocked(jau::service_runner& sr) {
+ (void)sr;
+ WORDY_PRINT("GATTHandler::reader: Ended. Ring has %u entries flushed", attPDURing.size());
+ attPDURing.clear();
+}
+void BTGattHandler::l2capReaderEndFinal(jau::service_runner& sr) {
+ (void)sr;
disconnect(true /* disconnectDevice */, has_ioerror);
}
@@ -1138,8 +1123,12 @@ BTGattHandler::BTGattHandler(const BTDeviceRef &device, L2CAPComm& l2cap_att, co
deviceString(device->getAddressAndType().toString()),
rbuffer(number(Defaults::MAX_ATT_MTU), jau::endian::little),
is_connected(l2cap.isOpen()), has_ioerror(false),
- attPDURing(env.ATTPDU_RING_CAPACITY), l2capReaderShallStop(false),
- l2capReaderThreadId(0), l2capReaderRunning(false),
+ l2cap_reader_service("GATTHandler::reader", THREAD_SHUTDOWN_TIMEOUT_MS,
+ jau::bindMemberFunc(this, &BTGattHandler::l2capReaderWork),
+ jau::service_runner::Callback() /* init */,
+ jau::bindMemberFunc(this, &BTGattHandler::l2capReaderEndLocked),
+ jau::bindMemberFunc(this, &BTGattHandler::l2capReaderEndFinal)),
+ attPDURing(env.ATTPDU_RING_CAPACITY),
gattServerData( GATTRole::Server == role ? device->getAdapter().getGATTServerData() : nullptr ),
serverMTU(number(Defaults::MIN_ATT_MTU)), usedMTU(number(Defaults::MIN_ATT_MTU))
{
@@ -1155,19 +1144,7 @@ BTGattHandler::BTGattHandler(const BTDeviceRef &device, L2CAPComm& l2cap_att, co
* We utilize DBTManager's mgmthandler_sigaction SIGALRM handler,
* as we only can install one handler.
*/
- {
- std::unique_lock<std::mutex> lock(mtx_l2capReaderLifecycle); // RAII-style acquire and relinquish via destructor
-
- std::thread l2capReaderThread(&BTGattHandler::l2capReaderThreadImpl, this); // @suppress("Invalid arguments")
- l2capReaderThreadId = l2capReaderThread.native_handle();
- // Avoid 'terminate called without an active exception'
- // as l2capReaderThread may end due to I/O errors.
- l2capReaderThread.detach();
-
- while( false == l2capReaderRunning ) {
- cv_l2capReaderInit.wait(lock);
- }
- }
+ l2cap_reader_service.start();
if( GATTRole::Client == getRole() ) {
// First point of failure if remote device exposes no GATT functionality. Allow a longer timeout!
@@ -1264,34 +1241,7 @@ bool BTGattHandler::disconnect(const bool disconnectDevice, const bool ioErrorCa
characteristicListenerList.clear();
PERF3_TS_TD("GATTHandler::disconnect.1");
- {
- std::unique_lock<std::mutex> lockReader(mtx_l2capReaderLifecycle); // RAII-style acquire and relinquish via destructor
- has_ioerror = false;
-
- const pthread_t tid_self = pthread_self();
- const pthread_t tid_l2capReader = l2capReaderThreadId;
- l2capReaderThreadId = 0;
- const bool is_l2capReader = tid_l2capReader == tid_self;
- DBG_PRINT("GATTHandler.disconnect: l2capReader[running %d, shallStop %d, isReader %d, tid %p)",
- l2capReaderRunning.load(), l2capReaderShallStop.load(), is_l2capReader, (void*)tid_l2capReader);
- if( l2capReaderRunning ) {
- l2capReaderShallStop = true;
- if( !is_l2capReader && 0 != tid_l2capReader ) {
- int kerr;
- if( 0 != ( kerr = pthread_kill(tid_l2capReader, SIGALRM) ) ) {
- ERR_PRINT("GATTHandler::disconnect: pthread_kill %p FAILED: %d", (void*)tid_l2capReader, kerr);
- }
- }
- // Ensure the reader thread has ended, no runaway-thread using *this instance after destruction
- while( true == l2capReaderRunning ) {
- std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
- std::cv_status s = cv_l2capReaderInit.wait_until(lockReader, t0 + std::chrono::milliseconds(THREAD_SHUTDOWN_TIMEOUT_MS));
- if( std::cv_status::timeout == s && true == l2capReaderRunning ) {
- ERR_PRINT("GattHandler::disconnect::l2capReader: Timeout: %s", toString().c_str());
- }
- }
- }
- }
+ l2cap_reader_service.stop();
PERF3_TS_TD("GATTHandler::disconnect.2");
if( disconnectDevice ) {