Program Listing for File DataQueue.hpp

Return to documentation for file (include/depthai/device/DataQueue.hpp)

#pragma once

// std
#include <atomic>
#include <chrono>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

// project
#include "depthai/pipeline/datatype/ADatatype.hpp"
#include "depthai/utility/LockingQueue.hpp"
#include "depthai/xlink/XLinkConnection.hpp"

// shared
#include "depthai-shared/datatype/RawBuffer.hpp"
#include "depthai-shared/xlink/XLinkConstants.hpp"

namespace dai {

class DataOutputQueue {
   public:
    using CallbackId = int;

   private:
    LockingQueue<std::shared_ptr<ADatatype>> queue;
    std::thread readingThread;
    std::atomic<bool> running{true};
    std::string exceptionMessage{""};
    const std::string name{""};
    std::mutex callbacksMtx;
    std::unordered_map<CallbackId, std::function<void(std::string, std::shared_ptr<ADatatype>)>> callbacks;
    CallbackId uniqueCallbackId{0};

    // const std::chrono::milliseconds READ_TIMEOUT{500};

   public:
    // DataOutputQueue constructor
    DataOutputQueue(const std::shared_ptr<XLinkConnection> conn, const std::string& streamName, unsigned int maxSize = 16, bool blocking = true);
    ~DataOutputQueue();

    bool isClosed() const;

    void close();

    void setBlocking(bool blocking);

    bool getBlocking() const;

    void setMaxSize(unsigned int maxSize);

    unsigned int getMaxSize() const;

    std::string getName() const;

    CallbackId addCallback(std::function<void(std::string, std::shared_ptr<ADatatype>)>);

    CallbackId addCallback(std::function<void(std::shared_ptr<ADatatype>)>);

    CallbackId addCallback(std::function<void()> callback);

    bool removeCallback(CallbackId callbackId);

    template <class T>
    bool has() {
        if(!running) throw std::runtime_error(exceptionMessage.c_str());
        std::shared_ptr<ADatatype> val = nullptr;
        if(queue.front(val) && dynamic_cast<T*>(val.get())) {
            return true;
        }
        return false;
    }

    bool has() {
        if(!running) throw std::runtime_error(exceptionMessage.c_str());
        return !queue.empty();
    }

    template <class T>
    std::shared_ptr<T> tryGet() {
        if(!running) throw std::runtime_error(exceptionMessage.c_str());
        std::shared_ptr<ADatatype> val = nullptr;
        if(!queue.tryPop(val)) return nullptr;
        return std::dynamic_pointer_cast<T>(val);
    }

    std::shared_ptr<ADatatype> tryGet() {
        return tryGet<ADatatype>();
    }

    template <class T>
    std::shared_ptr<T> get() {
        if(!running) throw std::runtime_error(exceptionMessage.c_str());
        std::shared_ptr<ADatatype> val = nullptr;
        if(!queue.waitAndPop(val)) {
            throw std::runtime_error(exceptionMessage.c_str());
        }
        return std::dynamic_pointer_cast<T>(val);
    }

    std::shared_ptr<ADatatype> get() {
        return get<ADatatype>();
    }

    template <class T>
    std::shared_ptr<T> front() {
        if(!running) throw std::runtime_error(exceptionMessage.c_str());
        std::shared_ptr<ADatatype> val = nullptr;
        if(!queue.front(val)) return nullptr;
        return std::dynamic_pointer_cast<T>(val);
    }

    std::shared_ptr<ADatatype> front() {
        return front<ADatatype>();
    }

    template <class T, typename Rep, typename Period>
    std::shared_ptr<T> get(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
        if(!running) throw std::runtime_error(exceptionMessage.c_str());
        std::shared_ptr<ADatatype> val = nullptr;
        if(!queue.tryWaitAndPop(val, timeout)) {
            hasTimedout = true;
            return nullptr;
        }
        hasTimedout = false;
        return std::dynamic_pointer_cast<T>(val);
    }

    template <typename Rep, typename Period>
    std::shared_ptr<ADatatype> get(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
        return get<ADatatype>(timeout, hasTimedout);
    }

    template <class T>
    std::vector<std::shared_ptr<T>> tryGetAll() {
        if(!running) throw std::runtime_error(exceptionMessage.c_str());

        std::vector<std::shared_ptr<T>> messages;
        queue.consumeAll([&messages](std::shared_ptr<ADatatype>& msg) {
            // dynamic pointer cast may return nullptr
            // in which case that message in vector will be nullptr
            messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
        });

        return messages;
    }

    std::vector<std::shared_ptr<ADatatype>> tryGetAll() {
        return tryGetAll<ADatatype>();
    }

    template <class T>
    std::vector<std::shared_ptr<T>> getAll() {
        if(!running) throw std::runtime_error(exceptionMessage.c_str());

        std::vector<std::shared_ptr<T>> messages;
        queue.waitAndConsumeAll([&messages](std::shared_ptr<ADatatype>& msg) {
            // dynamic pointer cast may return nullptr
            // in which case that message in vector will be nullptr
            messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
        });

        return messages;
    }

    std::vector<std::shared_ptr<ADatatype>> getAll() {
        return getAll<ADatatype>();
    }

    template <class T, typename Rep, typename Period>
    std::vector<std::shared_ptr<T>> getAll(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
        if(!running) throw std::runtime_error(exceptionMessage.c_str());

        std::vector<std::shared_ptr<T>> messages;
        hasTimedout = !queue.waitAndConsumeAll(
            [&messages](std::shared_ptr<ADatatype>& msg) {
                // dynamic pointer cast may return nullptr
                // in which case that message in vector will be nullptr
                messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
            },
            timeout);

        return messages;
    }

    template <typename Rep, typename Period>
    std::vector<std::shared_ptr<ADatatype>> getAll(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
        return getAll<ADatatype>(timeout, hasTimedout);
    }
};

/**
 * Queue of `RawBuffer` messages associated with a named stream.
 *
 * This header only declares the interface; every member function is defined out of line.
 *
 * NOTE(review): the member names suggest that `writingThread` drains `queue` towards the
 * device and that `maxDataSize` caps the size of a single message - confirm against the
 * implementation file.
 */
class DataInputQueue {
   private:
    // Data members stay in this exact order: it determines initialization order and object layout
    LockingQueue<std::shared_ptr<RawBuffer>> queue;
    std::thread writingThread;
    std::atomic<bool> running{true};
    std::string exceptionMessage{};
    const std::string name{};
    std::atomic<std::size_t> maxDataSize{device::XLINK_USB_BUFFER_MAX_SIZE};

   public:
    /**
     * Constructs the queue.
     *
     * @param conn Shared handle to the XLinkConnection
     * @param streamName Name of the stream (presumably the value reported by getName() - confirm)
     * @param maxSize Maximum queue size, defaults to 16
     * @param blocking Blocking flag, defaults to true
     * @param maxDataSize Maximum data size, defaults to device::XLINK_USB_BUFFER_MAX_SIZE
     */
    DataInputQueue(const std::shared_ptr<XLinkConnection> conn,
                   const std::string& streamName,
                   unsigned int maxSize = 16,
                   bool blocking = true,
                   std::size_t maxDataSize = device::XLINK_USB_BUFFER_MAX_SIZE);
    ~DataInputQueue();

    /// NOTE(review): presumably mirrors the `running` flag - confirm
    bool isClosed() const;

    /// NOTE(review): presumably stops the queue - confirm
    void close();

    /// Sets the maximum data size
    void setMaxDataSize(std::size_t maxSize);

    /// Returns the maximum data size. NOTE(review): not const-qualified, unlike the other getters
    std::size_t getMaxDataSize();

    /// Sets the blocking flag of the queue
    void setBlocking(bool blocking);

    /// Returns the blocking flag of the queue
    bool getBlocking() const;

    /// Sets the maximum queue size
    void setMaxSize(unsigned int maxSize);

    /// Returns the maximum queue size
    unsigned int getMaxSize() const;

    /// Returns the queue name
    std::string getName() const;

    /// Sends a raw message given as a shared RawBuffer
    void send(const std::shared_ptr<RawBuffer>& rawMsg);

    /// Sends a message given as a shared ADatatype
    void send(const std::shared_ptr<ADatatype>& msg);

    /// Sends a message given by reference
    void send(const ADatatype& msg);

    /**
     * Timeout-bounded send of a shared RawBuffer.
     *
     * @param rawMsg Message to send
     * @param timeout Time limit for the operation
     * @return NOTE(review): meaning of the result is defined by the implementation - presumably success vs. timeout; confirm
     */
    bool send(const std::shared_ptr<RawBuffer>& rawMsg, std::chrono::milliseconds timeout);

    /**
     * Timeout-bounded send of a shared ADatatype.
     *
     * @param msg Message to send
     * @param timeout Time limit for the operation
     * @return NOTE(review): meaning of the result is defined by the implementation - presumably success vs. timeout; confirm
     */
    bool send(const std::shared_ptr<ADatatype>& msg, std::chrono::milliseconds timeout);

    /**
     * Timeout-bounded send of a message given by reference.
     *
     * @param msg Message to send
     * @param timeout Time limit for the operation
     * @return NOTE(review): meaning of the result is defined by the implementation - presumably success vs. timeout; confirm
     */
    bool send(const ADatatype& msg, std::chrono::milliseconds timeout);
};

}  // namespace dai