implement stream default src/sink
Some checks are pending
ContinuousDelivery / linux-ubuntu (push) Waiting to run
ContinuousDelivery / android (map[ndk_abi:arm64-v8a vcpkg_toolkit:arm64-android]) (push) Waiting to run
ContinuousDelivery / android (map[ndk_abi:x86_64 vcpkg_toolkit:x64-android]) (push) Waiting to run
ContinuousDelivery / windows (push) Waiting to run
ContinuousDelivery / windows-asan (push) Waiting to run
ContinuousDelivery / release (push) Blocked by required conditions
ContinuousIntegration / linux (push) Waiting to run
ContinuousIntegration / android (map[ndk_abi:arm64-v8a vcpkg_toolkit:arm64-android]) (push) Waiting to run
ContinuousIntegration / android (map[ndk_abi:x86_64 vcpkg_toolkit:x64-android]) (push) Waiting to run
ContinuousIntegration / macos (push) Waiting to run
ContinuousIntegration / windows (push) Waiting to run
Some checks are pending
ContinuousDelivery / linux-ubuntu (push) Waiting to run
ContinuousDelivery / android (map[ndk_abi:arm64-v8a vcpkg_toolkit:arm64-android]) (push) Waiting to run
ContinuousDelivery / android (map[ndk_abi:x86_64 vcpkg_toolkit:x64-android]) (push) Waiting to run
ContinuousDelivery / windows (push) Waiting to run
ContinuousDelivery / windows-asan (push) Waiting to run
ContinuousDelivery / release (push) Blocked by required conditions
ContinuousIntegration / linux (push) Waiting to run
ContinuousIntegration / android (map[ndk_abi:arm64-v8a vcpkg_toolkit:arm64-android]) (push) Waiting to run
ContinuousIntegration / android (map[ndk_abi:x86_64 vcpkg_toolkit:x64-android]) (push) Waiting to run
ContinuousIntegration / macos (push) Waiting to run
ContinuousIntegration / windows (push) Waiting to run
This commit is contained in:
@@ -27,7 +27,11 @@ StreamManager::Connection::Connection(
|
||||
}
|
||||
}
|
||||
|
||||
StreamManager::StreamManager(ObjectStore2& os) : _os(os) {}
|
||||
StreamManager::StreamManager(ObjectStore2& os) : _os(os) {
|
||||
_os.subscribe(this, ObjectStore_Event::object_construct);
|
||||
//_os.subscribe(this, ObjectStore_Event::object_update);
|
||||
_os.subscribe(this, ObjectStore_Event::object_destroy);
|
||||
}
|
||||
|
||||
StreamManager::~StreamManager(void) {
|
||||
// stop all connetions
|
||||
@@ -137,3 +141,66 @@ float StreamManager::tick(float) {
|
||||
return 2.f; // TODO: 2sec makes mainthread connections unusable
|
||||
}
|
||||
|
||||
bool StreamManager::onEvent(const ObjectStore::Events::ObjectConstruct& e) {
|
||||
if (!e.e.any_of<Components::StreamSink, Components::StreamSource>()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// update default targets
|
||||
if (e.e.all_of<Components::TagDefaultTarget>()) {
|
||||
if (e.e.all_of<Components::StreamSource>()) {
|
||||
_default_sources[e.e.get<Components::StreamSource>().frame_type_name] = e.e;
|
||||
} else { // sink
|
||||
_default_sinks[e.e.get<Components::StreamSink>().frame_type_name] = e.e;
|
||||
}
|
||||
}
|
||||
|
||||
// connect to default
|
||||
// only ever do this on new objects
|
||||
if (e.e.all_of<Components::TagConnectToDefault>()) {
|
||||
if (e.e.all_of<Components::StreamSource>()) {
|
||||
auto it_d_sink = _default_sinks.find(e.e.get<Components::StreamSource>().frame_type_name);
|
||||
if (it_d_sink != _default_sinks.cend()) {
|
||||
// TODO: threaded
|
||||
connect(e.e, it_d_sink->second);
|
||||
}
|
||||
} else { // sink
|
||||
auto it_d_src = _default_sources.find(e.e.get<Components::StreamSink>().frame_type_name);
|
||||
if (it_d_src != _default_sources.cend()) {
|
||||
// TODO: threaded
|
||||
connect(e.e, it_d_src->second);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool StreamManager::onEvent(const ObjectStore::Events::ObjectUpdate&) {
|
||||
// what do we do here?
|
||||
return false;
|
||||
}
|
||||
|
||||
bool StreamManager::onEvent(const ObjectStore::Events::ObjectDestory& e) {
|
||||
// typeless
|
||||
for (auto it = _default_sources.cbegin(); it != _default_sources.cend();) {
|
||||
if (it->second == e.e) {
|
||||
it = _default_sources.erase(it);
|
||||
} else {
|
||||
it++;
|
||||
}
|
||||
}
|
||||
for (auto it = _default_sinks.cbegin(); it != _default_sinks.cend();) {
|
||||
if (it->second == e.e) {
|
||||
it = _default_sinks.erase(it);
|
||||
} else {
|
||||
it++;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: destroy connections
|
||||
// TODO: auto reconnect default following devices if another default exists
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user