Merge branch 'master' into release/5.1
sveseli committed Oct 14, 2022
2 parents c3fea58 + 6826a7b commit ce8688a
Showing 4 changed files with 229 additions and 2 deletions.
(Two files in this commit cannot be displayed.)
229 changes: 228 additions & 1 deletion documentation/streamingFramework.md
@@ -20,6 +20,12 @@ class UserDataProcessor:

def __init__(self, configDict={}):
...
# The following will be set after processor gets instantiated.
self.processorId = None
self.pvaServer = None
self.outputChannel = None
self.objectIdField = None
self.metadataQueueMap = {}

# Method called at start
def start(self):
@@ -97,7 +103,8 @@ might have to be tweaked in order for examples to run without lost frames.
A medium range workstation (e.g. dual Intel Xeon E5-2620 2.40GHz CPU, 24
logical cores, 64GB RAM, local SSD drives) should be able to run all
examples shown here without any issues. Note that some commands use
[sample AD image processor](../examples/hpcAdImageProcessorExample.py) as
[sample AD image processor](../examples/hpcAdImageProcessorExample.py) or
[sample AD metadata processor](../examples/hpcAdMetadataProcessorExample.py) as
external (user) code. Also, instead of generating random image data, one
could, for example, concatenate actual image data into a set of NumPy arrays
and pass that file into the pvapy-ad-sim-server command using the
@@ -632,6 +639,138 @@ $ pvapy-ad-sim-server -cn ad:image -nx 128 -ny 128 -dt uint8 -fps 2000 -rt 60 -r
Once the data source starts publishing images, they will be streamed through
all the components of the system, and saved into the designated output folder.

### Metadata Handling with Data Collector

In many cases images need to be associated with various pieces of metadata (e.g., position information)
before processing. The streaming framework allows one to receive PV updates from any number of metadata channels
(CA or PVA), which are made available to the user processing module as a dictionary mapping metadata
channel names to PvObject queues.
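
To make the queue map idea concrete, below is a minimal, hypothetical sketch of a user processor that
drains its metadata queues; the process() hook and the queue's get() call are assumptions about the API
rather than something documented here, but metadataQueueMap itself is the attribute shown in the
UserDataProcessor snippet above:

```python
# Hypothetical sketch only: the process() hook and the queue's get() behavior
# are assumptions, not confirmed framework API.
from pvapy.hpc.userDataProcessor import UserDataProcessor

class MetadataAwareProcessor(UserDataProcessor):

    def process(self, pvObject):
        # self.metadataQueueMap maps each metadata channel name (e.g. 'x')
        # to a queue of PvObjects received on that channel.
        for channelName, metadataQueue in self.metadataQueueMap.items():
            while True:
                try:
                    md = metadataQueue.get()  # assumed to raise when the queue is empty
                except Exception:
                    break
                # Each md is expected to carry 'value' and 'timeStamp' fields
                # (see the channel structure shown further below).
                print(f'{channelName}: {md}')
        # Return the image object so that the framework can publish it
        # on the processor's output channel.
        return pvObject
```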

<p align="center">
<img alt="Metadata Handling with Data Collector" src="images/StreamingFrameworkMetadataHandlingDataCollector.jpg">
</p>

This example uses the [sample AD metadata processor](../examples/hpcAdMetadataProcessorExample.py) module, which
associates images with available metadata by comparing their timestamps, and produces NtNdArray objects
that contain additional metadata attributes. To see how it works,
download the sample metadata processor and start the data collector on terminal 1
using the following command:

```sh
$ pvapy-hpc-collector \
--collector-id 1 \
--producer-id-list 1 \
--input-channel pvapy:image \
--control-channel collector:*:control \
--status-channel collector:*:status \
--output-channel collector:*:output \
--processor-file /path/to/hpcAdMetadataProcessorExample.py \
--processor-class HpcAdMetadataProcessor \
--report-period 10 \
--server-queue-size 100 \
--collector-cache-size 100 \
--monitor-queue-size 1000 \
--metadata-channels pva://x,pva://y,pva://z
```

On terminal 2, generate test images on the 'pvapy:image' channel and PVA metadata on channels 'x', 'y', and 'z':

```sh
$ pvapy-ad-sim-server \
-cn pvapy:image -nx 128 -ny 128 -fps 100 -rp 100 -rt 60 \
-mpv pva://x,pva://y,pva://z
```

After image generation starts, on terminal 3 inspect both the original and processed images. The output channel
should contain the original image data plus values for x, y, and z attributes:

```sh
$ pvget pvapy:image # original image, no metadata
$ pvget collector:1:output # should contain x,y,z metadata
```

Note that the generated PVA metadata channels have a structure containing a value and a timestamp:

```sh
$ pvinfo x
x
Server: ...
Type:
structure
double value
time_t timeStamp
long secondsPastEpoch
int nanoseconds
int userTag
```
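
The sample processor associates each image with metadata based on timestamp comparison, within the
tolerance configured via the '--processor-args' option shown in the next section. Purely as an
illustration of that kind of comparison (this is a hypothetical helper, not the sample processor's
actual code; it assumes each metadata entry behaves like a dictionary with the fields shown above,
and the 0.00025 default simply mirrors the value used later):

```python
# Hypothetical helper illustrating timestamp-based matching; not taken from
# hpcAdMetadataProcessorExample.py.

def toSeconds(timeStamp):
    # Convert the timeStamp structure shown above into floating-point seconds.
    return timeStamp['secondsPastEpoch'] + timeStamp['nanoseconds'] * 1.0e-9

def findMatchingMetadata(imageTimestamp, metadataEntries, tolerance=0.00025):
    '''Return the metadata entry closest in time to the image timestamp,
    or None if nothing falls within the given tolerance (in seconds).'''
    best = None
    bestDelta = tolerance
    for md in metadataEntries:
        delta = abs(toSeconds(md['timeStamp']) - imageTimestamp)
        if delta <= bestDelta:
            best, bestDelta = md, delta
    return best
```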

Since retrieving PVs from CA IOCs results in the same channel structure as in the above example,
the sample metadata processor works with either CA or PVA metadata channels. This can be verified
by replacing the AD simulation server command with the following:

```sh
$ EPICS_DB_INCLUDE_PATH=/path/to/epics-base/dbd pvapy-ad-sim-server \
-cn pvapy:image -nx 128 -ny 128 -fps 1 -rt 60 \
-mpv x,y,z
```

This command will start a CA IOC and generate CA metadata channels 'x', 'y', and 'z'.
Note that it requires the path to the EPICS Base dbd folder. For example, if you are using the PvaPy
conda package, this folder would be located at '/path/to/conda/envs/env-name/opt/epics/dbd'.

### Metadata Handling with Distributed Consumers

Distributing metadata processing should allow one to handle higher frame rates. In this example
we also use a mirror server for all image and metadata channels.

<p align="center">
<img alt="Metadata Handling with Distributed Consumers" src="images/StreamingFrameworkMetadataHandlingDataConsumers.jpg">
</p>

On terminal 1, start 4 metadata processors listening on the 'pvapy:image' channel:

```sh
$ pvapy-hpc-consumer \
--consumer-id 1 \
--n-consumers 4 \
--input-channel pvapy:image \
--control-channel consumer:*:control \
--status-channel consumer:*:status \
--output-channel consumer:*:output \
--processor-file /path/to/hpcAdMetadataProcessorExample.py \
--processor-class HpcAdMetadataProcessor \
--processor-args '{"timestampTolerance" : 0.00025}' \
--report-period 10 \
--server-queue-size 2000 \
--accumulate-objects 10 \
--monitor-queue-size 0 \
--distributor-updates 1 \
--metadata-channels pva://pvapy:x,pva://pvapy:y,pva://pvapy:z
```

Each consumer will accumulate 10 images in the queue before processing them, in order to make sure
all metadata arrives before it is needed.
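
The '--processor-args' JSON in the consumer command above is how the timestamp tolerance reaches the
metadata processor; given the UserDataProcessor.__init__(self, configDict={}) signature quoted at the
top of this document, it presumably arrives as the configDict argument. A hedged sketch of reading it
(the key handling and fallback value here are illustrative assumptions):

```python
# Illustrative only: assumes the JSON passed via --processor-args is delivered
# to the processor constructor as configDict.

class MetadataProcessorConfigSketch:

    def __init__(self, configDict={}):
        # Fall back to the tolerance used in the command-line example above
        # if none was configured.
        self.timestampTolerance = float(configDict.get('timestampTolerance', 0.00025))
```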

On terminal 2, start a mirror server that maps all channels:

```sh
$ pvapy-mirror-server \
--channel-map "(pvapy:image,ad:image,pva,1000),(pvapy:x,ad:x,pva,1000),(pvapy:y,ad:y,pva,1000),(pvapy:z,ad:z,pva,1000)"
```

On terminal 3, generate images and metadata:

```sh
$ pvapy-ad-sim-server \
-cn ad:image -nx 128 -ny 128 -fps 2000 -rp 2000 -rt 60 \
-mpv pva://ad:x,pva://ad:y,pva://ad:z
```

Processing speed gains are not linear when compared to the single consumer case, because
each consumer receives only a subset of the images but all of the metadata values, and hence some
metadata values have to be discarded. This will be reflected in the metadata
processor statistics.

### Data Encryption

This example illustrates how data can be encrypted in the first processing
@@ -712,3 +851,91 @@ $ pvget enc:1:output # encrypted data
$ pvget dec:1:output # decrypted (raw) data
$ pvget proc:1:output # processed image
```

## Performance Testing

All tests described in this section were performed with PvaPy version
5.1.0 (Python 3.9 conda package) on a 64-bit Linux machine with 96 logical cores (Intel Xeon
Gold 6342 CPU with hyperthreading enabled) running at 3.5 GHz and
with 2 TB of RAM. The image server and all consumers were running on the
same machine.

### Throughput Tests

In order to assess how much data can be pushed through the framework we
ran a series of tests using the [base system user processor](../pvapy/hpc/userDataProcessor.py)
that does not manipulate images and hence does not generate any additional
load on the test machine.

On terminal 1, we used the following command to spawn 1 or more
consumer processes:

```sh
$ pvapy-hpc-consumer \
--input-channel pvapy:image \
--control-channel consumer:*:control \
--status-channel consumer:*:status \
--output-channel consumer:*:output \
--processor-class pvapy.hpc.userDataProcessor.UserDataProcessor \
--report-period 10 \
--server-queue-size SERVER_QUEUE_SIZE \
--n-consumers N_CONSUMERS \
[--distributor-updates 1]
```

Server queue size varied according to the test image size.
Whenever we used multiple consumers (N_CONSUMERS > 1), the data distributor was
turned on using the '--distributor-updates 1' option. For a single consumer
this option was left out.

On terminal 2, images were generated for 60 seconds using the following command:

```sh
$ pvapy-ad-sim-server \
-cn ad:image -nf 100 -dt uint8 -rt 60 \
-nx FRAME_SIZE -ny FRAME_SIZE -fps FRAME_RATE -rp FRAME_RATE
```

The above command was able to reliably generate images at stable rates
of up to 20 kHz. Beyond that, the resulting output frame rate varied
too much (by more than a few Hz).

A given test was deemed successful if no frames were
missed during the 60-second server runtime. Results for the maximum
simulated detector rate that image consumers were able to sustain
without missing any frames are shown below:

* Image size: 4096 x 4096 (uint8, 16.78 MB); Server queue size: 100

| Consumers | Frames/second | Frames/second/consumer | Frames/minute | Data rate/consumer | Total data rate |
| ---: | ---: | ---: | ---: | ---: | ---: |
| 1 | 150 | 150 | 9000 | 2.52 GBps | 2.52 GBps |
| 4 | 600 | 150 | 36000 | 2.52 GBps | 10.07 GBps |
| 8 | 1000 | 125 | 60000 | 2.10 GBps | 16.78 GBps |
| 10 | 1200 | 120 | 72000 | 2.01 GBps | 20.13 GBps |

* Image size: 2048 x 2048 (uint8, 4.19 MB); Server queue size: 200

| Consumers | Frames/second | Frames/second/consumer | Frames/minute | Data rate/consumer | Total data rate |
| ---: | ---: | ---: | ---: | ---: | ---: |
| 1 | 700 | 700 | 42000 | 2.94 GBps | 2.94 GBps |
| 4 | 2600 | 650 | 156000 | 2.73 GBps | 10.91 GBps |
| 8 | 4000 | 500 | 240000 | 2.10 GBps | 16.78 GBps |
| 10 | 4500 | 450 | 270000 | 1.89 GBps | 18.88 GBps |

* Image size: 1024 x 1024 (uint8, 1.05 MB); Server queue size: 500

| Consumers | Frames/second | Frames/second/consumer | Frames/minute | Data rate/consumer | Total data rate |
| ---: | ---: | ---: | ---: | ---: | ---: |
| 1 | 3200 | 3200 | 192000 | 3.36 GBps | 3.36 GBps |
| 4 | 10000 | 2500 | 600000 | 2.62 GBps | 10.49 GBps |
| 8 | 12000 | 1500 | 720000 | 1.57 GBps | 12.58 GBps |
| 10 | 14000 | 1400 | 840000 | 1.47 GBps | 14.68 GBps |

* Image size: 512 x 512 (uint8, 0.26 MB); Server queue size: 1000

| Consumers | Frames/second | Frames/second/consumer | Frames/minute | Data rate/consumer | Total data rate |
| ---: | ---: | ---: | ---: | ---: | ---: |
| 1 | 10000 | 10000 | 600000 | 2.62 GBps | 2.62 GBps |
| 4 | 20000 | 5000 | 1200000 | 1.31 GBps | 5.24 GBps |
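
The data rate columns in the tables above follow directly from frame size multiplied by frame rate,
using decimal units (1 MB = 10^6 bytes, 1 GB = 10^9 bytes). For example, a 4096 x 4096 uint8 image is
16.78 MB, so 150 frames per second per consumer corresponds to about 2.52 GBps. A short script
reproducing the arithmetic:

```python
# Reproduces the data rate arithmetic used in the throughput tables above
# (decimal units: 1 GB = 1e9 bytes).

def dataRateGBps(nx, ny, framesPerSecond, bytesPerPixel=1):
    '''Data rate in GB/s for nx x ny images at the given frame rate.'''
    frameSizeBytes = nx * ny * bytesPerPixel   # uint8 -> 1 byte per pixel
    return frameSizeBytes * framesPerSecond / 1.0e9

# 4096x4096 uint8 at 150 frames/s per consumer -> ~2.52 GBps
print(round(dataRateGBps(4096, 4096, 150), 2))
# 10 consumers at 1200 frames/s total -> ~20.13 GBps
print(round(dataRateGBps(4096, 4096, 1200), 2))
```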

2 changes: 1 addition & 1 deletion pvapy/hpc/userDataProcessor.py
@@ -27,7 +27,7 @@ def __init__(self, configDict={}):
'''
self.logger = LoggingManager.getLogger(self.__class__.__name__)

# The following will be set after object gets created.
# The following will be set after processor gets instantiated.
self.processorId = None
self.pvaServer = None
self.outputChannel = None
