diff --git a/api/python/arrus/utils/imaging.py b/api/python/arrus/utils/imaging.py index 82912602f..a5c55a12a 100644 --- a/api/python/arrus/utils/imaging.py +++ b/api/python/arrus/utils/imaging.py @@ -247,6 +247,85 @@ def __unregister_buffer(self, buffers): cp.cuda.runtime.hostUnregister(element.data.ctypes.data) +class Operation: + """ + An operation to perform in the imaging pipeline -- one data processing + stage. + """ + + def prepare(self, const_metadata): + """ + Function that will be called when the processing pipeline is prepared. + + :param const_metadata: const metadata describing output from the \ + previous Operation. + :return: const metadata describing output of this Operation. + """ + pass + + def process(self, data): + """ + Function that will be called when new data arrives. + + :param data: input data + :return: output data + """ + raise ValueError("Calling abstract method") + + def __call__(self, *args, **kwargs): + return self.process(*args, **kwargs) + + def initialize(self, data): + """ + Initialization function. + + This function will be called on a cupy initialization stage. + + By default, it runs `process` function on a test cupy data. + + :return: the processed data. + """ + return self.process(data) + + def set_pkgs(self, **kwargs): + """ + Provides the possibility to gather python packages for numerical + processing and filtering. + + The provided kwargs are: + + - `num_pkg`: numerical package: numpy for CPU, cupy for GPU + - `filter_pkg`: scipy.ndimage for CPU, cupyx.scipy.ndimage for GPU + """ + pass + + +class Output(Operation): + """ + Output node. + + Adding this node into the pipeline at a specific point will cause + the output buffer to return data from the given processing + step. 
+ """ + + def __init__(self): + self.endpoint = True + + def set_pkgs(self, num_pkg, filter_pkg, **kwargs): + self.xp = num_pkg + self.filter_pkg = filter_pkg + + def initialize(self, data): + return data + + def prepare(self, const_metadata: arrus.metadata.ConstMetadata): + return const_metadata + + def process(self, data): + return (data, ) + + class Pipeline: """ Imaging pipeline. @@ -285,14 +364,14 @@ def __initialize(self, const_metadata): input_shape, dtype=input_dtype)+1000 data = self._input_buffer for step in self.steps: - if not isinstance(step, Pipeline): + if not isinstance(step, (Pipeline, Output)): data = step.initialize(data) def prepare(self, const_metadata): metadatas = deque() current_metadata = const_metadata for step in self.steps: - if isinstance(step, Pipeline): + if isinstance(step, (Pipeline, Output)): child_metadatas = step.prepare(current_metadata) if not isinstance(child_metadatas, Iterable): child_metadatas = (child_metadatas, ) @@ -304,7 +383,8 @@ def prepare(self, const_metadata): step.endpoint = False # Force cupy to recompile kernels before running the pipeline. self.__initialize(const_metadata) - if not isinstance(self.steps[-1], Pipeline): + last_step = self.steps[-1] + if not isinstance(last_step, (Pipeline, Output)): metadatas.appendleft(current_metadata) self._is_last_endpoint = False else: @@ -356,59 +436,6 @@ def set_placement(self, device): self.filter_pkg = pkgs['filter_pkg'] -class Operation: - """ - An operation to perform in the imaging pipeline -- one data processing - stage. - """ - - def prepare(self, const_metadata): - """ - Function that will called when the processing pipeline is prepared. - - :param const_metadata: const metadata describing output from the \ - previous Operation. - :return: const metadata describing output of this Operation. - """ - pass - - def process(self, data): - """ - Function that will be called when new data arrives. 
- - :param data: input data - :return: output data - """ - raise ValueError("Calling abstract method") - - def __call__(self, *args, **kwargs): - return self.process(*args, **kwargs) - - def initialize(self, data): - """ - Initialization function. - - This function will be called on a cupy initialization stage. - - By default, it runs `_process` function on a test cupy data. - - :return: the processed data. - """ - return self.process(data) - - def set_pkgs(self, **kwargs): - """ - Provides to possibility to gather python packages for numerical - processing and filtering. - - The provided kwargs are: - - - `num_pkg`: numerical package: numpy for CPU, cupy for GPU - - `filter_pkg`: scipy.ndimage for CPU, cupyx.scipy.ndimage for GPU - """ - pass - - class Lambda(Operation): """ Custom function to perform on data from a given step. diff --git a/arrus/core/api/devices/us4r/Us4OEM.h b/arrus/core/api/devices/us4r/Us4OEM.h index 25b0038cd..75abcd665 100644 --- a/arrus/core/api/devices/us4r/Us4OEM.h +++ b/arrus/core/api/devices/us4r/Us4OEM.h @@ -28,8 +28,6 @@ class Us4OEM : public Device, public TriggerGenerator { virtual float getFPGATemperature() = 0; - virtual float getFPGATemperature() = 0; - Us4OEM(Us4OEM const&) = delete; Us4OEM(Us4OEM const&&) = delete; void operator=(Us4OEM const&) = delete; diff --git a/arrus/core/devices/probe/ProbeImpl.cpp b/arrus/core/devices/probe/ProbeImpl.cpp index 958e3c9cf..7d21db2ce 100644 --- a/arrus/core/devices/probe/ProbeImpl.cpp +++ b/arrus/core/devices/probe/ProbeImpl.cpp @@ -129,8 +129,9 @@ void ProbeImpl::registerOutputBuffer(Us4ROutputBuffer *buffer, const Us4RBuffer: ::arrus::ops::us4r::Scheme::WorkMode workMode) { adapter->registerOutputBuffer(buffer, us4rBuffer, workMode); } -void ProbeImpl::unregisterOutputBuffer(Us4ROutputBuffer *buffer, const Us4RBuffer::Handle &us4RBuffer) { - adapter->unregisterOutputBuffer(buffer, us4RBuffer); + +void ProbeImpl::unregisterOutputBuffer() { + adapter->unregisterOutputBuffer(); } // Remaps 
FCM according to given rx aperture active channels mappings. @@ -192,9 +193,10 @@ FrameChannelMapping::Handle ProbeImpl::remapFcm(const FrameChannelMapping::Handl auto nChannels = adapterFcm->getNumberOfLogicalChannels(); for (ChannelIdx pch = 0; pch < nChannels; ++pch) { if(pch >= paddingLeft && pch < (nChannels-paddingRight)) { - auto [us4oem, physicalFrame, physicalChannel] = - adapterFcm->getLogical(frameNumber, probe2AdapterMap[pch-paddingLeft]+paddingLeft); - + auto address = adapterFcm->getLogical(frameNumber, probe2AdapterMap[pch-paddingLeft]+paddingLeft); + auto us4oem = address.getUs4oem(); + auto physicalFrame = address.getFrame(); + auto physicalChannel = address.getChannel(); builder.setChannelMapping(frameNumber, pch, us4oem, physicalFrame, physicalChannel); } diff --git a/arrus/core/devices/probe/ProbeImpl.h b/arrus/core/devices/probe/ProbeImpl.h index 25f800a6d..805ecaf20 100644 --- a/arrus/core/devices/probe/ProbeImpl.h +++ b/arrus/core/devices/probe/ProbeImpl.h @@ -39,7 +39,7 @@ class ProbeImpl : public ProbeImplBase { void registerOutputBuffer(Us4ROutputBuffer *buffer, const Us4RBuffer::Handle &us4rBuffer, ::arrus::ops::us4r::Scheme::WorkMode workMode) override; - void unregisterOutputBuffer(Us4ROutputBuffer *buffer, const Us4RBuffer::Handle &handle) override; + void unregisterOutputBuffer() override; static FrameChannelMapping::Handle remapFcm(const FrameChannelMapping::Handle &adapterFcm, const std::vector> &adapterActiveChannels, diff --git a/arrus/core/devices/probe/ProbeImplBase.h b/arrus/core/devices/probe/ProbeImplBase.h index 36eda8b8f..ad0ebe1d1 100644 --- a/arrus/core/devices/probe/ProbeImplBase.h +++ b/arrus/core/devices/probe/ProbeImplBase.h @@ -21,7 +21,7 @@ class ProbeImplBase : public Probe, public UltrasoundDevice { virtual void registerOutputBuffer(Us4ROutputBuffer *, const Us4RBuffer::Handle &, ::arrus::ops::us4r::Scheme::WorkMode workMode) = 0; - virtual void unregisterOutputBuffer(Us4ROutputBuffer *, const Us4RBuffer::Handle 
&) = 0; + virtual void unregisterOutputBuffer() = 0; }; } diff --git a/arrus/core/devices/us4r/Us4RImpl.cpp b/arrus/core/devices/us4r/Us4RImpl.cpp index fe337e6ea..a28e3d915 100644 --- a/arrus/core/devices/us4r/Us4RImpl.cpp +++ b/arrus/core/devices/us4r/Us4RImpl.cpp @@ -149,7 +149,7 @@ void Us4RImpl::stopDevice() { this->buffer->shutdown(); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); if(this->us4rBuffer) { - getProbeImpl()->unregisterOutputBuffer(this->buffer.get(), this->us4rBuffer); + getProbeImpl()->unregisterOutputBuffer(); this->us4rBuffer.reset(); } } diff --git a/arrus/core/devices/us4r/Us4RImpl.h b/arrus/core/devices/us4r/Us4RImpl.h index 9a7a60d53..2aa748f84 100644 --- a/arrus/core/devices/us4r/Us4RImpl.h +++ b/arrus/core/devices/us4r/Us4RImpl.h @@ -123,12 +123,7 @@ class Us4RImpl : public Us4R { void setDtgcAttenuation(std::optional value) override; void setActiveTermination(std::optional value) override; uint8_t getNumberOfUs4OEMs() override; - float getSamplingFrequency() const override; - - uint8_t getNumberOfUs4OEMs() override; - void setTestPattern(Us4OEM::RxTestPattern pattern) override; - float getSamplingFrequency() const override; private: diff --git a/arrus/core/devices/us4r/probeadapter/ProbeAdapterImpl.cpp b/arrus/core/devices/us4r/probeadapter/ProbeAdapterImpl.cpp index defbc16de..756fee17a 100644 --- a/arrus/core/devices/us4r/probeadapter/ProbeAdapterImpl.cpp +++ b/arrus/core/devices/us4r/probeadapter/ProbeAdapterImpl.cpp @@ -51,13 +51,11 @@ class ProbeAdapterTxRxValidator : public Validator { currActiveRxChannels += txRxs[firing].getRxPadding().sum(); ARRUS_VALIDATOR_EXPECT_TRUE_M(currActiveRxChannels == nActiveRxChannels, "Each rx aperture should have the same size."); - if(hasErrors()) { return; } } } - private: ChannelIdx nChannels; }; @@ -298,11 +296,11 @@ void ProbeAdapterImpl::registerOutputBuffer(Us4ROutputBuffer *bufferDst, const U Us4OEMImplBase *us4oem, Scheme::WorkMode workMode) { auto us4oemOrdinal = 
us4oem->getDeviceId().getOrdinal(); auto ius4oem = us4oem->getIUs4oem(); - auto &elementsSrc = bufferSrc.getElements(); const auto nElementsSrc = bufferSrc.getNumberOfElements(); const size_t nElementsDst = bufferDst->getNumberOfElements(); size_t elementSize = getUniqueUs4OEMBufferElementSize(bufferSrc); + if (elementSize == 0) { return; } @@ -377,28 +375,13 @@ size_t ProbeAdapterImpl::getUniqueUs4OEMBufferElementSize(const Us4OEMBuffer &us return elementSize; } -void ProbeAdapterImpl::unregisterOutputBuffer(Us4ROutputBuffer *hostBuffer, const Us4RBuffer::Handle &us4rBuffer) { - const size_t hostBufferNElements = hostBuffer->getNumberOfElements(); - +void ProbeAdapterImpl::unregisterOutputBuffer() { + if(transferRegistrar.empty()) { + return; + } for (Ordinal i = 0; i < us4oems.size(); ++i) { - auto &us4oem = us4oems[i]; - const Ordinal ordinal = us4oem->getDeviceId().getOrdinal(); - auto ius4oem = us4oem->getIUs4oem(); - - auto us4oemBuffer = us4rBuffer->getUs4oemBuffer(i); - size_t elementSize = getUniqueUs4OEMBufferElementSize(us4oemBuffer); - const auto rxBufferNElements = ARRUS_SAFE_CAST(us4oemBuffer.getNumberOfElements(), uint16); - uint16 hostElement = 0, rxElement = 0; - - while (hostElement < hostBufferNElements) { - auto dstAddress = hostBuffer->getAddress(hostElement, ordinal); - auto srcAddress = us4oemBuffer.getElement(rxElement).getAddress(); - logger->log(LogSeverity::DEBUG, - format("Unregistering transfer: to {} from {}, size {}", - (size_t)dstAddress, (size_t)srcAddress, elementSize)); - ius4oem->ReleaseTransferRxBufferToHost(dstAddress, elementSize, srcAddress); - ++hostElement; - rxElement = (rxElement + 1) % rxBufferNElements; + if(transferRegistrar[i]) { + transferRegistrar[i]->unregisterTransfers(); } } } diff --git a/arrus/core/devices/us4r/probeadapter/ProbeAdapterImpl.h b/arrus/core/devices/us4r/probeadapter/ProbeAdapterImpl.h index c0a665d45..83bac2dd9 100644 --- a/arrus/core/devices/us4r/probeadapter/ProbeAdapterImpl.h +++ 
b/arrus/core/devices/us4r/probeadapter/ProbeAdapterImpl.h @@ -47,7 +47,7 @@ class ProbeAdapterImpl : public ProbeAdapterImplBase { void registerOutputBuffer(Us4ROutputBuffer *buffer, const Us4RBuffer::Handle &us4rBuffer, ::arrus::ops::us4r::Scheme::WorkMode workMode) override; - void unregisterOutputBuffer(Us4ROutputBuffer *buffer, const Us4RBuffer::Handle &us4rBuffer); + void unregisterOutputBuffer() override; private: void registerOutputBuffer(Us4ROutputBuffer *bufferDst, const Us4OEMBuffer &bufferSrc, @@ -64,6 +64,7 @@ class ProbeAdapterImpl : public ProbeAdapterImplBase { ChannelIdx numberOfChannels; ChannelMapping channelMapping; std::vector> transferRegistrar; +}; } #endif //ARRUS_CORE_DEVICES_US4R_PROBEADAPTER_PROBEADAPTERIMPL_H diff --git a/arrus/core/devices/us4r/probeadapter/ProbeAdapterImplBase.h b/arrus/core/devices/us4r/probeadapter/ProbeAdapterImplBase.h index db3afb644..77eb5654b 100644 --- a/arrus/core/devices/us4r/probeadapter/ProbeAdapterImplBase.h +++ b/arrus/core/devices/us4r/probeadapter/ProbeAdapterImplBase.h @@ -29,7 +29,8 @@ class ProbeAdapterImplBase : public ProbeAdapter { virtual void registerOutputBuffer(Us4ROutputBuffer *buffer, const Us4RBuffer::Handle &transfers, ::arrus::ops::us4r::Scheme::WorkMode workMode) = 0; - virtual void unregisterOutputBuffer(Us4ROutputBuffer *buffer, const Us4RBuffer::Handle &handle) = 0; + + virtual void unregisterOutputBuffer() = 0; virtual Ordinal getNumberOfUs4OEMs() = 0; diff --git a/arrus/core/devices/us4r/probeadapter/Us4OEMDataTransferRegistrar.h b/arrus/core/devices/us4r/probeadapter/Us4OEMDataTransferRegistrar.h index 4c9dae32e..74a239103 100644 --- a/arrus/core/devices/us4r/probeadapter/Us4OEMDataTransferRegistrar.h +++ b/arrus/core/devices/us4r/probeadapter/Us4OEMDataTransferRegistrar.h @@ -54,7 +54,6 @@ class Us4OEMDataTransferRegistrar { ARRUS_REQUIRES_AT_MOST(srcNTransfers, MAX_N_TRANSFERS, "Exceeded maximum number of transfers."); // If true: create only nSrc transfers, the callback function 
will reprogram the appropriate number transfers. - if(dstNTransfers > MAX_N_TRANSFERS) { strategy = 2; } @@ -80,6 +79,10 @@ class Us4OEMDataTransferRegistrar { scheduleTransfers(); } + void unregisterTransfers() { + pageUnlockDstMemory(); + } + static std::vector groupPartsIntoTransfers(const std::vector &parts) { std::vector transfers; size_t address = 0; @@ -114,6 +117,19 @@ class Us4OEMDataTransferRegistrar { } } + void pageUnlockDstMemory() { + for(uint16 dstIdx = 0, srcIdx = 0; dstIdx < dstNElements; ++dstIdx, srcIdx = (srcIdx+1) % srcNElements) { + uint8 *addressDst = dstBuffer->getAddress(dstIdx, us4oemOrdinal); + size_t addressSrc = srcBuffer->getElement(srcIdx).getAddress(); // byte-addressed + for(auto &transfer: elementTransfers) { + uint8 *dst = addressDst + transfer.address; + size_t src = addressSrc + transfer.address; + size_t size = transfer.size; + ius4oem->ReleaseTransferRxBufferToHost(dst, size, src); + } + } + } + void programTransfers(size_t nSrcPoints, size_t nDstPoints) { for(uint16 dstIdx = 0, srcIdx = 0; dstIdx < nDstPoints; ++dstIdx, srcIdx = (srcIdx+1) % nSrcPoints) { uint8 *addressDst = dstBuffer->getAddress(dstIdx, us4oemOrdinal);