diff options
author | Sven Gothel <[email protected]> | 2022-01-12 05:08:41 +0100 |
---|---|---|
committer | Sven Gothel <[email protected]> | 2022-01-12 05:08:41 +0100 |
commit | aa85ac0105b0971a737ad1809512442d7a993558 (patch) | |
tree | e94335ff96dce11f7beaba1027e7475cb0cbcbe1 | |
parent | 7db1599ef3655aa644b60eca29a051f16fdea0cd (diff) |
BTGattHandler: Use jau::service_runner
-rw-r--r-- | api/direct_bt/BTGattHandler.hpp | 13 | ||||
-rw-r--r-- | src/direct_bt/BTGattHandler.cpp | 244 |
2 files changed, 102 insertions, 155 deletions
diff --git a/api/direct_bt/BTGattHandler.hpp b/api/direct_bt/BTGattHandler.hpp index 102c12e8..5e77de0a 100644 --- a/api/direct_bt/BTGattHandler.hpp +++ b/api/direct_bt/BTGattHandler.hpp @@ -39,6 +39,7 @@ #include <jau/ringbuffer.hpp> #include <jau/cow_darray.hpp> #include <jau/uuid.hpp> +#include <jau/service_runner.hpp> #include "BTTypes0.hpp" #include "L2CAPComm.hpp" @@ -192,13 +193,8 @@ namespace direct_bt { jau::sc_atomic_bool is_connected; // reflects state jau::relaxed_atomic_bool has_ioerror; // reflects state + jau::service_runner l2cap_reader_service; jau::ringbuffer<std::unique_ptr<const AttPDUMsg>, jau::nsize_t> attPDURing; - jau::sc_atomic_bool l2capReaderShallStop; - - std::mutex mtx_l2capReaderLifecycle; - std::condition_variable cv_l2capReaderInit; - pthread_t l2capReaderThreadId; - jau::sc_atomic_bool l2capReaderRunning; /** send immediate confirmation of indication events from device, defaults to true. */ jau::relaxed_atomic_bool sendIndicationConfirmation = true; @@ -228,8 +224,9 @@ namespace direct_bt { void replyReadByGroupTypeReq(const AttReadByNTypeReq * pdu); void replyAttPDUReq(std::unique_ptr<const AttPDUMsg> && pdu); - void l2capReaderThreadImpl(); - + void l2capReaderWork(jau::service_runner& sr); + void l2capReaderEndLocked(jau::service_runner& sr); + void l2capReaderEndFinal(jau::service_runner& sr); /** * Sends the given AttPDUMsg to the connected device via l2cap. diff --git a/src/direct_bt/BTGattHandler.cpp b/src/direct_bt/BTGattHandler.cpp index c946b4e1..5ec9a88c 100644 --- a/src/direct_bt/BTGattHandler.cpp +++ b/src/direct_bt/BTGattHandler.cpp @@ -1014,117 +1014,102 @@ void BTGattHandler::replyAttPDUReq(std::unique_ptr<const AttPDUMsg> && pdu) { } } -void BTGattHandler::l2capReaderThreadImpl() { - { - const std::lock_guard<std::mutex> lock(mtx_l2capReaderLifecycle); // RAII-style acquire and relinquish via destructor - l2capReaderShallStop = false; - l2capReaderRunning = true; - DBG_PRINT("GATTHandler::reader Started"); - } - cv_l2capReaderInit.notify_all(); // have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread. - - thread_local jau::call_on_release thread_cleanup([&]() { - DBG_PRINT("GATTHandler::l2capReaderThreadCleanup: l2capReaderRunning %d -> 0", l2capReaderRunning.load()); - l2capReaderRunning = false; - cv_l2capReaderInit.notify_all(); - }); - - while( !l2capReaderShallStop ) { - jau::snsize_t len; - if( !validateConnected() ) { - ERR_PRINT("GATTHandler::reader: Invalid IO state -> Stop"); - l2capReaderShallStop = true; - break; - } +void BTGattHandler::l2capReaderWork(jau::service_runner& sr) { + jau::snsize_t len; + if( !validateConnected() ) { + ERR_PRINT("GATTHandler::reader: Invalid IO state -> Stop"); + sr.set_shall_stop(); + return; + } - len = l2cap.read(rbuffer.get_wptr(), rbuffer.size()); - if( 0 < len ) { - std::unique_ptr<const AttPDUMsg> attPDU = AttPDUMsg::getSpecialized(rbuffer.get_ptr(), static_cast<jau::nsize_t>(len)); - COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: Got %s", attPDU->toString().c_str()); - - const AttPDUMsg::Opcode opc = attPDU->getOpcode(); - const AttPDUMsg::OpcodeType opc_type = AttPDUMsg::get_type(opc); - - if( AttPDUMsg::Opcode::MULTIPLE_HANDLE_VALUE_NTF == opc ) { // AttPDUMsg::OpcodeType::NOTIFICATION - // FIXME TODO .. - ERR_PRINT("GATTHandler::reader: MULTI-NTF not implemented: %s", attPDU->toString().c_str()); - } else if( AttPDUMsg::Opcode::HANDLE_VALUE_NTF == opc ) { // AttPDUMsg::OpcodeType::NOTIFICATION - const AttHandleValueRcv * a = static_cast<const AttHandleValueRcv*>(attPDU.get()); - COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: NTF: %s, listener %zd", a->toString().c_str(), characteristicListenerList.size()); - BTGattCharRef decl = findCharacterisicsByValueHandle(services, a->getHandle()); - const jau::TOctetSlice& a_value_view = a->getValue(); - const jau::TROOctets data_view(a_value_view.get_ptr_nc(0), a_value_view.size(), a_value_view.byte_order()); // just a view, still owned by attPDU - // const std::shared_ptr<TROOctets> data( std::make_shared<POctets>(a->getValue()) ); - const uint64_t timestamp = a->ts_creation; - int i=0; - jau::for_each_fidelity(characteristicListenerList, [&](std::shared_ptr<BTGattCharListener> &l) { - try { - if( l->match(*decl) ) { - l->notificationReceived(decl, data_view, timestamp); - } - } catch (std::exception &e) { - ERR_PRINT("GATTHandler::notificationReceived-CBs %d/%zd: GATTCharacteristicListener %s: Caught exception %s", - i+1, characteristicListenerList.size(), - jau::to_hexstring((void*)l.get()).c_str(), e.what()); + len = l2cap.read(rbuffer.get_wptr(), rbuffer.size()); + if( 0 < len ) { + std::unique_ptr<const AttPDUMsg> attPDU = AttPDUMsg::getSpecialized(rbuffer.get_ptr(), static_cast<jau::nsize_t>(len)); + COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: Got %s", attPDU->toString().c_str()); + + const AttPDUMsg::Opcode opc = attPDU->getOpcode(); + const AttPDUMsg::OpcodeType opc_type = AttPDUMsg::get_type(opc); + + if( AttPDUMsg::Opcode::MULTIPLE_HANDLE_VALUE_NTF == opc ) { // AttPDUMsg::OpcodeType::NOTIFICATION + // FIXME TODO .. + ERR_PRINT("GATTHandler::reader: MULTI-NTF not implemented: %s", attPDU->toString().c_str()); + } else if( AttPDUMsg::Opcode::HANDLE_VALUE_NTF == opc ) { // AttPDUMsg::OpcodeType::NOTIFICATION + const AttHandleValueRcv * a = static_cast<const AttHandleValueRcv*>(attPDU.get()); + COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: NTF: %s, listener %zd", a->toString().c_str(), characteristicListenerList.size()); + BTGattCharRef decl = findCharacterisicsByValueHandle(services, a->getHandle()); + const jau::TOctetSlice& a_value_view = a->getValue(); + const jau::TROOctets data_view(a_value_view.get_ptr_nc(0), a_value_view.size(), a_value_view.byte_order()); // just a view, still owned by attPDU + // const std::shared_ptr<TROOctets> data( std::make_shared<POctets>(a->getValue()) ); + const uint64_t timestamp = a->ts_creation; + int i=0; + jau::for_each_fidelity(characteristicListenerList, [&](std::shared_ptr<BTGattCharListener> &l) { + try { + if( l->match(*decl) ) { + l->notificationReceived(decl, data_view, timestamp); } - i++; - }); - } else if( AttPDUMsg::Opcode::HANDLE_VALUE_IND == opc ) { // AttPDUMsg::OpcodeType::INDICATION - const AttHandleValueRcv * a = static_cast<const AttHandleValueRcv*>(attPDU.get()); - COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: IND: %s, sendIndicationConfirmation %d, listener %zd", - a->toString().c_str(), sendIndicationConfirmation.load(), characteristicListenerList.size()); - bool cfmSent = false; - if( sendIndicationConfirmation ) { - AttHandleValueCfm cfm; - send(cfm); - cfmSent = true; + } catch (std::exception &e) { + ERR_PRINT("GATTHandler::notificationReceived-CBs %d/%zd: GATTCharacteristicListener %s: Caught exception %s", + i+1, characteristicListenerList.size(), + jau::to_hexstring((void*)l.get()).c_str(), e.what()); } - BTGattCharRef decl = findCharacterisicsByValueHandle(services, a->getHandle()); - const jau::TOctetSlice& a_value_view = a->getValue(); - const jau::TROOctets data_view(a_value_view.get_ptr_nc(0), a_value_view.size(), a_value_view.byte_order()); // just a view, still owned by attPDU - // const std::shared_ptr<TROOctets> data( std::make_shared<POctets>(a->getValue()) ); - const uint64_t timestamp = a->ts_creation; - int i=0; - jau::for_each_fidelity(characteristicListenerList, [&](std::shared_ptr<BTGattCharListener> &l) { - try { - if( l->match(*decl) ) { - l->indicationReceived(decl, data_view, timestamp, cfmSent); - } - } catch (std::exception &e) { - ERR_PRINT("GATTHandler::indicationReceived-CBs %d/%zd: GATTCharacteristicListener %s, cfmSent %d: Caught exception %s", - i+1, characteristicListenerList.size(), - jau::to_hexstring((void*)l.get()).c_str(), cfmSent, e.what()); - } - i++; - }); - } else if( AttPDUMsg::OpcodeType::RESPONSE == opc_type ) { - COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: Ring: %s", attPDU->toString().c_str()); - attPDURing.putBlocking( std::move(attPDU) ); - } else if( AttPDUMsg::OpcodeType::REQUEST == opc_type ) { - replyAttPDUReq( std::move( attPDU ) ); - } else { - ERR_PRINT("GATTHandler::reader: Unhandled: %s", attPDU->toString().c_str()); + i++; + }); + } else if( AttPDUMsg::Opcode::HANDLE_VALUE_IND == opc ) { // AttPDUMsg::OpcodeType::INDICATION + const AttHandleValueRcv * a = static_cast<const AttHandleValueRcv*>(attPDU.get()); + COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: IND: %s, sendIndicationConfirmation %d, listener %zd", + a->toString().c_str(), sendIndicationConfirmation.load(), characteristicListenerList.size()); + bool cfmSent = false; + if( sendIndicationConfirmation ) { + AttHandleValueCfm cfm; + send(cfm); + cfmSent = true; } - } else if( 0 > len && ETIMEDOUT != errno && !l2capReaderShallStop ) { // expected exits - IRQ_PRINT("GATTHandler::reader: l2cap read error -> Stop; l2cap.read %d (%s); %s", - len, L2CAPComm::getRWExitCodeString(len).c_str(), - getStateString().c_str()); - l2capReaderShallStop = true; - has_ioerror = true; - } else if( len != L2CAPComm::number(L2CAPComm::RWExitCode::POLL_TIMEOUT) ) { // expected POLL_TIMEOUT if idle - WORDY_PRINT("GATTHandler::reader: l2cap read: l2cap.read %d (%s); %s", - len, L2CAPComm::getRWExitCodeString(len).c_str(), - getStateString().c_str()); + BTGattCharRef decl = findCharacterisicsByValueHandle(services, a->getHandle()); + const jau::TOctetSlice& a_value_view = a->getValue(); + const jau::TROOctets data_view(a_value_view.get_ptr_nc(0), a_value_view.size(), a_value_view.byte_order()); // just a view, still owned by attPDU + // const std::shared_ptr<TROOctets> data( std::make_shared<POctets>(a->getValue()) ); + const uint64_t timestamp = a->ts_creation; + int i=0; + jau::for_each_fidelity(characteristicListenerList, [&](std::shared_ptr<BTGattCharListener> &l) { + try { + if( l->match(*decl) ) { + l->indicationReceived(decl, data_view, timestamp, cfmSent); + } + } catch (std::exception &e) { + ERR_PRINT("GATTHandler::indicationReceived-CBs %d/%zd: GATTCharacteristicListener %s, cfmSent %d: Caught exception %s", + i+1, characteristicListenerList.size(), + jau::to_hexstring((void*)l.get()).c_str(), cfmSent, e.what()); + } + i++; + }); + } else if( AttPDUMsg::OpcodeType::RESPONSE == opc_type ) { + COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: Ring: %s", attPDU->toString().c_str()); + attPDURing.putBlocking( std::move(attPDU) ); + } else if( AttPDUMsg::OpcodeType::REQUEST == opc_type ) { + replyAttPDUReq( std::move( attPDU ) ); + } else { + ERR_PRINT("GATTHandler::reader: Unhandled: %s", attPDU->toString().c_str()); } + } else if( 0 > len && ETIMEDOUT != errno && !sr.get_shall_stop() ) { // expected exits + IRQ_PRINT("GATTHandler::reader: l2cap read error -> Stop; l2cap.read %d (%s); %s", + len, L2CAPComm::getRWExitCodeString(len).c_str(), + getStateString().c_str()); + sr.set_shall_stop(); + has_ioerror = true; + } else if( len != L2CAPComm::number(L2CAPComm::RWExitCode::POLL_TIMEOUT) ) { // expected POLL_TIMEOUT if idle + WORDY_PRINT("GATTHandler::reader: l2cap read: l2cap.read %d (%s); %s", + len, L2CAPComm::getRWExitCodeString(len).c_str(), + getStateString().c_str()); } - { - const std::lock_guard<std::mutex> lock(mtx_l2capReaderLifecycle); // RAII-style acquire and relinquish via destructor - WORDY_PRINT("GATTHandler::reader: Ended. Ring has %u entries flushed", attPDURing.size()); - attPDURing.clear(); - l2capReaderRunning = false; - } - cv_l2capReaderInit.notify_all(); // have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread. +} +void BTGattHandler::l2capReaderEndLocked(jau::service_runner& sr) { + (void)sr; + WORDY_PRINT("GATTHandler::reader: Ended. Ring has %u entries flushed", attPDURing.size()); + attPDURing.clear(); +} +void BTGattHandler::l2capReaderEndFinal(jau::service_runner& sr) { + (void)sr; disconnect(true /* disconnectDevice */, has_ioerror); } @@ -1138,8 +1123,12 @@ BTGattHandler::BTGattHandler(const BTDeviceRef &device, L2CAPComm& l2cap_att, co deviceString(device->getAddressAndType().toString()), rbuffer(number(Defaults::MAX_ATT_MTU), jau::endian::little), is_connected(l2cap.isOpen()), has_ioerror(false), - attPDURing(env.ATTPDU_RING_CAPACITY), l2capReaderShallStop(false), - l2capReaderThreadId(0), l2capReaderRunning(false), + l2cap_reader_service("GATTHandler::reader", THREAD_SHUTDOWN_TIMEOUT_MS, + jau::bindMemberFunc(this, &BTGattHandler::l2capReaderWork), + jau::service_runner::Callback() /* init */, + jau::bindMemberFunc(this, &BTGattHandler::l2capReaderEndLocked), + jau::bindMemberFunc(this, &BTGattHandler::l2capReaderEndFinal)), + attPDURing(env.ATTPDU_RING_CAPACITY), gattServerData( GATTRole::Server == role ? device->getAdapter().getGATTServerData() : nullptr ), serverMTU(number(Defaults::MIN_ATT_MTU)), usedMTU(number(Defaults::MIN_ATT_MTU)) { @@ -1155,19 +1144,7 @@ BTGattHandler::BTGattHandler(const BTDeviceRef &device, L2CAPComm& l2cap_att, co * We utilize DBTManager's mgmthandler_sigaction SIGALRM handler, * as we only can install one handler. */ - { - std::unique_lock<std::mutex> lock(mtx_l2capReaderLifecycle); // RAII-style acquire and relinquish via destructor - - std::thread l2capReaderThread(&BTGattHandler::l2capReaderThreadImpl, this); // @suppress("Invalid arguments") - l2capReaderThreadId = l2capReaderThread.native_handle(); - // Avoid 'terminate called without an active exception' - // as l2capReaderThread may end due to I/O errors. - l2capReaderThread.detach(); - - while( false == l2capReaderRunning ) { - cv_l2capReaderInit.wait(lock); - } - } + l2cap_reader_service.start(); if( GATTRole::Client == getRole() ) { // First point of failure if remote device exposes no GATT functionality. Allow a longer timeout! @@ -1264,34 +1241,7 @@ bool BTGattHandler::disconnect(const bool disconnectDevice, const bool ioErrorCa characteristicListenerList.clear(); PERF3_TS_TD("GATTHandler::disconnect.1"); - { - std::unique_lock<std::mutex> lockReader(mtx_l2capReaderLifecycle); // RAII-style acquire and relinquish via destructor - has_ioerror = false; - - const pthread_t tid_self = pthread_self(); - const pthread_t tid_l2capReader = l2capReaderThreadId; - l2capReaderThreadId = 0; - const bool is_l2capReader = tid_l2capReader == tid_self; - DBG_PRINT("GATTHandler.disconnect: l2capReader[running %d, shallStop %d, isReader %d, tid %p)", - l2capReaderRunning.load(), l2capReaderShallStop.load(), is_l2capReader, (void*)tid_l2capReader); - if( l2capReaderRunning ) { - l2capReaderShallStop = true; - if( !is_l2capReader && 0 != tid_l2capReader ) { - int kerr; - if( 0 != ( kerr = pthread_kill(tid_l2capReader, SIGALRM) ) ) { - ERR_PRINT("GATTHandler::disconnect: pthread_kill %p FAILED: %d", (void*)tid_l2capReader, kerr); - } - } - // Ensure the reader thread has ended, no runaway-thread using *this instance after destruction - while( true == l2capReaderRunning ) { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cv_l2capReaderInit.wait_until(lockReader, t0 + std::chrono::milliseconds(THREAD_SHUTDOWN_TIMEOUT_MS)); - if( std::cv_status::timeout == s && true == l2capReaderRunning ) { - ERR_PRINT("GattHandler::disconnect::l2capReader: Timeout: %s", toString().c_str()); - } - } - } - } + l2cap_reader_service.stop(); PERF3_TS_TD("GATTHandler::disconnect.2"); if( disconnectDevice ) { |