Skip to content

Commit

Permalink
Fixed merge, added Output operation to arrus.utils.imaging package.
Browse files Browse the repository at this point in the history
  • Loading branch information
pjarosik committed Dec 23, 2021
1 parent 08d2569 commit d79cca0
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 98 deletions.
139 changes: 83 additions & 56 deletions api/python/arrus/utils/imaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,85 @@ def __unregister_buffer(self, buffers):
cp.cuda.runtime.hostUnregister(element.data.ctypes.data)


class Operation:
"""
An operation to perform in the imaging pipeline -- one data processing
stage.
"""

def prepare(self, const_metadata):
"""
Function that will called when the processing pipeline is prepared.
:param const_metadata: const metadata describing output from the \
previous Operation.
:return: const metadata describing output of this Operation.
"""
pass

def process(self, data):
"""
Function that will be called when new data arrives.
:param data: input data
:return: output data
"""
raise ValueError("Calling abstract method")

def __call__(self, *args, **kwargs):
return self.process(*args, **kwargs)

def initialize(self, data):
"""
Initialization function.
This function will be called on a cupy initialization stage.
By default, it runs `_process` function on a test cupy data.
:return: the processed data.
"""
return self.process(data)

def set_pkgs(self, **kwargs):
"""
Provides to possibility to gather python packages for numerical
processing and filtering.
The provided kwargs are:
- `num_pkg`: numerical package: numpy for CPU, cupy for GPU
- `filter_pkg`: scipy.ndimage for CPU, cupyx.scipy.ndimage for GPU
"""
pass


class Output(Operation):
"""
Output node.
Adding this node into the pipeline at a specific path will cause
adding this operator to the Pipeline will cause the output buffer to
return data from a given processing step.
"""

def __init__(self):
self.endpoint = True

def set_pkgs(self, num_pkg, filter_pkg, **kwargs):
self.xp = num_pkg
self.filter_pkg = filter_pkg

def initialize(self, data):
return data

def prepare(self, const_metadata: arrus.metadata.ConstMetadata):
return const_metadata

def process(self, data):
return (data, )


class Pipeline:
"""
Imaging pipeline.
Expand Down Expand Up @@ -285,14 +364,14 @@ def __initialize(self, const_metadata):
input_shape, dtype=input_dtype)+1000
data = self._input_buffer
for step in self.steps:
if not isinstance(step, Pipeline):
if not isinstance(step, (Pipeline, Output)):
data = step.initialize(data)

def prepare(self, const_metadata):
metadatas = deque()
current_metadata = const_metadata
for step in self.steps:
if isinstance(step, Pipeline):
if isinstance(step, (Pipeline, Output)):
child_metadatas = step.prepare(current_metadata)
if not isinstance(child_metadatas, Iterable):
child_metadatas = (child_metadatas, )
Expand All @@ -304,7 +383,8 @@ def prepare(self, const_metadata):
step.endpoint = False
# Force cupy to recompile kernels before running the pipeline.
self.__initialize(const_metadata)
if not isinstance(self.steps[-1], Pipeline):
last_step = self.steps[-1]
if not isinstance(last_step, (Pipeline, Output)):
metadatas.appendleft(current_metadata)
self._is_last_endpoint = False
else:
Expand Down Expand Up @@ -356,59 +436,6 @@ def set_placement(self, device):
self.filter_pkg = pkgs['filter_pkg']


class Operation:
"""
An operation to perform in the imaging pipeline -- one data processing
stage.
"""

def prepare(self, const_metadata):
"""
Function that will called when the processing pipeline is prepared.
:param const_metadata: const metadata describing output from the \
previous Operation.
:return: const metadata describing output of this Operation.
"""
pass

def process(self, data):
"""
Function that will be called when new data arrives.
:param data: input data
:return: output data
"""
raise ValueError("Calling abstract method")

def __call__(self, *args, **kwargs):
return self.process(*args, **kwargs)

def initialize(self, data):
"""
Initialization function.
This function will be called on a cupy initialization stage.
By default, it runs `_process` function on a test cupy data.
:return: the processed data.
"""
return self.process(data)

def set_pkgs(self, **kwargs):
"""
Provides to possibility to gather python packages for numerical
processing and filtering.
The provided kwargs are:
- `num_pkg`: numerical package: numpy for CPU, cupy for GPU
- `filter_pkg`: scipy.ndimage for CPU, cupyx.scipy.ndimage for GPU
"""
pass


class Lambda(Operation):
"""
Custom function to perform on data from a given step.
Expand Down
2 changes: 0 additions & 2 deletions arrus/core/api/devices/us4r/Us4OEM.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ class Us4OEM : public Device, public TriggerGenerator {

virtual float getFPGATemperature() = 0;

virtual float getFPGATemperature() = 0;

Us4OEM(Us4OEM const&) = delete;
Us4OEM(Us4OEM const&&) = delete;
void operator=(Us4OEM const&) = delete;
Expand Down
12 changes: 7 additions & 5 deletions arrus/core/devices/probe/ProbeImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,9 @@ void ProbeImpl::registerOutputBuffer(Us4ROutputBuffer *buffer, const Us4RBuffer:
::arrus::ops::us4r::Scheme::WorkMode workMode) {
adapter->registerOutputBuffer(buffer, us4rBuffer, workMode);
}
void ProbeImpl::unregisterOutputBuffer(Us4ROutputBuffer *buffer, const Us4RBuffer::Handle &us4RBuffer) {
adapter->unregisterOutputBuffer(buffer, us4RBuffer);

void ProbeImpl::unregisterOutputBuffer() {
adapter->unregisterOutputBuffer();
}

// Remaps FCM according to given rx aperture active channels mappings.
Expand Down Expand Up @@ -192,9 +193,10 @@ FrameChannelMapping::Handle ProbeImpl::remapFcm(const FrameChannelMapping::Handl
auto nChannels = adapterFcm->getNumberOfLogicalChannels();
for (ChannelIdx pch = 0; pch < nChannels; ++pch) {
if(pch >= paddingLeft && pch < (nChannels-paddingRight)) {
auto [us4oem, physicalFrame, physicalChannel] =
adapterFcm->getLogical(frameNumber, probe2AdapterMap[pch-paddingLeft]+paddingLeft);

auto address = adapterFcm->getLogical(frameNumber, probe2AdapterMap[pch-paddingLeft]+paddingLeft);
auto us4oem = address.getUs4oem();
auto physicalFrame = address.getFrame();
auto physicalChannel = address.getChannel();
builder.setChannelMapping(frameNumber, pch, us4oem, physicalFrame, physicalChannel);
}

Expand Down
2 changes: 1 addition & 1 deletion arrus/core/devices/probe/ProbeImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ProbeImpl : public ProbeImplBase {
void registerOutputBuffer(Us4ROutputBuffer *buffer, const Us4RBuffer::Handle &us4rBuffer,
::arrus::ops::us4r::Scheme::WorkMode workMode) override;

void unregisterOutputBuffer(Us4ROutputBuffer *buffer, const Us4RBuffer::Handle &handle) override;
void unregisterOutputBuffer() override;

static FrameChannelMapping::Handle remapFcm(const FrameChannelMapping::Handle &adapterFcm,
const std::vector<std::vector<ChannelIdx>> &adapterActiveChannels,
Expand Down
2 changes: 1 addition & 1 deletion arrus/core/devices/probe/ProbeImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ProbeImplBase : public Probe, public UltrasoundDevice {

virtual void registerOutputBuffer(Us4ROutputBuffer *, const Us4RBuffer::Handle &,
::arrus::ops::us4r::Scheme::WorkMode workMode) = 0;
virtual void unregisterOutputBuffer(Us4ROutputBuffer *, const Us4RBuffer::Handle &) = 0;
virtual void unregisterOutputBuffer() = 0;
};

}
Expand Down
2 changes: 1 addition & 1 deletion arrus/core/devices/us4r/Us4RImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ void Us4RImpl::stopDevice() {
this->buffer->shutdown();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
if(this->us4rBuffer) {
getProbeImpl()->unregisterOutputBuffer(this->buffer.get(), this->us4rBuffer);
getProbeImpl()->unregisterOutputBuffer();
this->us4rBuffer.reset();
}
}
Expand Down
5 changes: 0 additions & 5 deletions arrus/core/devices/us4r/Us4RImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,7 @@ class Us4RImpl : public Us4R {
void setDtgcAttenuation(std::optional<uint16> value) override;
void setActiveTermination(std::optional<uint16> value) override;
uint8_t getNumberOfUs4OEMs() override;
float getSamplingFrequency() const override;

uint8_t getNumberOfUs4OEMs() override;

void setTestPattern(Us4OEM::RxTestPattern pattern) override;

float getSamplingFrequency() const override;

private:
Expand Down
31 changes: 7 additions & 24 deletions arrus/core/devices/us4r/probeadapter/ProbeAdapterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,11 @@ class ProbeAdapterTxRxValidator : public Validator<TxRxParamsSequence> {
currActiveRxChannels += txRxs[firing].getRxPadding().sum();
ARRUS_VALIDATOR_EXPECT_TRUE_M(currActiveRxChannels == nActiveRxChannels,
"Each rx aperture should have the same size.");

if(hasErrors()) {
return;
}
}
}

private:
ChannelIdx nChannels;
};
Expand Down Expand Up @@ -298,11 +296,11 @@ void ProbeAdapterImpl::registerOutputBuffer(Us4ROutputBuffer *bufferDst, const U
Us4OEMImplBase *us4oem, Scheme::WorkMode workMode) {
auto us4oemOrdinal = us4oem->getDeviceId().getOrdinal();
auto ius4oem = us4oem->getIUs4oem();
auto &elementsSrc = bufferSrc.getElements();
const auto nElementsSrc = bufferSrc.getNumberOfElements();
const size_t nElementsDst = bufferDst->getNumberOfElements();

size_t elementSize = getUniqueUs4OEMBufferElementSize(bufferSrc);

if (elementSize == 0) {
return;
}
Expand Down Expand Up @@ -377,28 +375,13 @@ size_t ProbeAdapterImpl::getUniqueUs4OEMBufferElementSize(const Us4OEMBuffer &us
return elementSize;
}

void ProbeAdapterImpl::unregisterOutputBuffer(Us4ROutputBuffer *hostBuffer, const Us4RBuffer::Handle &us4rBuffer) {
const size_t hostBufferNElements = hostBuffer->getNumberOfElements();

void ProbeAdapterImpl::unregisterOutputBuffer() {
if(transferRegistrar.empty()) {
return;
}
for (Ordinal i = 0; i < us4oems.size(); ++i) {
auto &us4oem = us4oems[i];
const Ordinal ordinal = us4oem->getDeviceId().getOrdinal();
auto ius4oem = us4oem->getIUs4oem();

auto us4oemBuffer = us4rBuffer->getUs4oemBuffer(i);
size_t elementSize = getUniqueUs4OEMBufferElementSize(us4oemBuffer);
const auto rxBufferNElements = ARRUS_SAFE_CAST(us4oemBuffer.getNumberOfElements(), uint16);
uint16 hostElement = 0, rxElement = 0;

while (hostElement < hostBufferNElements) {
auto dstAddress = hostBuffer->getAddress(hostElement, ordinal);
auto srcAddress = us4oemBuffer.getElement(rxElement).getAddress();
logger->log(LogSeverity::DEBUG,
format("Unregistering transfer: to {} from {}, size {}",
(size_t)dstAddress, (size_t)srcAddress, elementSize));
ius4oem->ReleaseTransferRxBufferToHost(dstAddress, elementSize, srcAddress);
++hostElement;
rxElement = (rxElement + 1) % rxBufferNElements;
if(transferRegistrar[i]) {
transferRegistrar[i]->unregisterTransfers();
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion arrus/core/devices/us4r/probeadapter/ProbeAdapterImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ProbeAdapterImpl : public ProbeAdapterImplBase {
void registerOutputBuffer(Us4ROutputBuffer *buffer, const Us4RBuffer::Handle &us4rBuffer,
::arrus::ops::us4r::Scheme::WorkMode workMode) override;

void unregisterOutputBuffer(Us4ROutputBuffer *buffer, const Us4RBuffer::Handle &us4rBuffer);
void unregisterOutputBuffer() override;

private:
void registerOutputBuffer(Us4ROutputBuffer *bufferDst, const Us4OEMBuffer &bufferSrc,
Expand All @@ -64,6 +64,7 @@ class ProbeAdapterImpl : public ProbeAdapterImplBase {
ChannelIdx numberOfChannels;
ChannelMapping channelMapping;
std::vector<std::shared_ptr<Us4OEMDataTransferRegistrar>> transferRegistrar;
};
}

#endif //ARRUS_CORE_DEVICES_US4R_PROBEADAPTER_PROBEADAPTERIMPL_H
3 changes: 2 additions & 1 deletion arrus/core/devices/us4r/probeadapter/ProbeAdapterImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class ProbeAdapterImplBase : public ProbeAdapter {
virtual
void registerOutputBuffer(Us4ROutputBuffer *buffer, const Us4RBuffer::Handle &transfers,
::arrus::ops::us4r::Scheme::WorkMode workMode) = 0;
virtual void unregisterOutputBuffer(Us4ROutputBuffer *buffer, const Us4RBuffer::Handle &handle) = 0;

virtual void unregisterOutputBuffer() = 0;

virtual Ordinal getNumberOfUs4OEMs() = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ class Us4OEMDataTransferRegistrar {
ARRUS_REQUIRES_AT_MOST(srcNTransfers, MAX_N_TRANSFERS, "Exceeded maximum number of transfers.");

// If true: create only nSrc transfers, the callback function will reprogram the appropriate number transfers.

if(dstNTransfers > MAX_N_TRANSFERS) {
strategy = 2;
}
Expand All @@ -80,6 +79,10 @@ class Us4OEMDataTransferRegistrar {
scheduleTransfers();
}

void unregisterTransfers() {
pageUnlockDstMemory();
}

static std::vector<Transfer> groupPartsIntoTransfers(const std::vector<Us4OEMBufferElementPart> &parts) {
std::vector<Transfer> transfers;
size_t address = 0;
Expand Down Expand Up @@ -114,6 +117,19 @@ class Us4OEMDataTransferRegistrar {
}
}

void pageUnlockDstMemory() {
for(uint16 dstIdx = 0, srcIdx = 0; dstIdx < dstNElements; ++dstIdx, srcIdx = (srcIdx+1) % srcNElements) {
uint8 *addressDst = dstBuffer->getAddress(dstIdx, us4oemOrdinal);
size_t addressSrc = srcBuffer->getElement(srcIdx).getAddress(); // byte-addressed
for(auto &transfer: elementTransfers) {
uint8 *dst = addressDst + transfer.address;
size_t src = addressSrc + transfer.address;
size_t size = transfer.size;
ius4oem->ReleaseTransferRxBufferToHost(dst, size, src);
}
}
}

void programTransfers(size_t nSrcPoints, size_t nDstPoints) {
for(uint16 dstIdx = 0, srcIdx = 0; dstIdx < nDstPoints; ++dstIdx, srcIdx = (srcIdx+1) % nSrcPoints) {
uint8 *addressDst = dstBuffer->getAddress(dstIdx, us4oemOrdinal);
Expand Down

0 comments on commit d79cca0

Please sign in to comment.