Skip to content

Commit

Permalink
Add sync id and tile count to raster tile messages (#1288)
Browse files Browse the repository at this point in the history
* Update protobuf to add sync id to tile messages

* Add sync_id field in tile messages and add checks if animation stopped

* Fix call to execute required tiles request

* Added tile count when sending RasterTileSync

* Update protobuf to the latest

* Refactor tile count in tile sync messages

* Update changelog

* Update protobuf commit.

---------

Co-authored-by: Pam Harris <pharris@nrao.edu>
Co-authored-by: crocka <raulomar@ualberta.ca>
Co-authored-by: Adrianna Pińska <adrianna.pinska@gmail.com>
  • Loading branch information
4 people authored Aug 22, 2023
1 parent 3ec97a1 commit aa5df48
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 53 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Fixed region spectral profile from FITS gz image ([#1271](https://github.com/CARTAvis/carta-backend/issues/1271)).
* Fixed the lack of mask for LEL images ([#1291](https://github.com/CARTAvis/carta-backend/issues/1291)).
* Fixed file path to save generated image ([#1252](https://github.com/CARTAvis/carta-backend/issues/1252)).
* Fixed missing tiles issue ([#1282](https://github.com/CARTAvis/carta-backend/issues/1282)).

## [4.0.0-beta.1]

Expand Down
2 changes: 1 addition & 1 deletion carta-protobuf
2 changes: 1 addition & 1 deletion src/Session/OnMessageTask.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class GeneralMessageTask : public OnMessageTask {
if constexpr (std::is_same_v<T, CARTA::SetHistogramRequirements>) {
_session->OnSetHistogramRequirements(_message, _request_id);
} else if constexpr (std::is_same_v<T, CARTA::AddRequiredTiles>) {
_session->OnAddRequiredTiles(_message, _session->AnimationRunning());
_session->OnAddRequiredTiles(_message, 0, _session->AnimationRunning());
} else if constexpr (std::is_same_v<T, CARTA::SetContourParameters>) {
_session->OnSetContourParameters(_message);
} else if constexpr (std::is_same_v<T, CARTA::SetSpatialRequirements>) {
Expand Down
127 changes: 82 additions & 45 deletions src/Session/Session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ Session::Session(uWS::WebSocket<false, true, PerSocketData>* ws, uWS::Loop* loop
_enable_scripting(enable_scripting),
_region_handler(nullptr),
_file_list_handler(file_list_handler),
_sync_id(0),
_animation_id(0),
_animation_active(false),
_cursor_settings(this),
Expand Down Expand Up @@ -689,65 +690,68 @@ void Session::DeleteFrame(int file_id) {
}
}

void Session::OnAddRequiredTiles(const CARTA::AddRequiredTiles& message, bool skip_data) {
void Session::OnAddRequiredTiles(const CARTA::AddRequiredTiles& message, int animation_id, bool skip_data) {
auto file_id = message.file_id();

if (!_frames.count(file_id)) {
return;
}

if (skip_data) {
// Update view settings and skip sending data
_frames.at(file_id)->SetAnimationViewSettings(message);
return;
}

if (message.tiles().empty()) {
return;
}

if (animation_id > 0 && _animation_object->_stop_called) {
return;
}

auto z = _frames.at(file_id)->CurrentZ();
auto stokes = _frames.at(file_id)->CurrentStokes();
auto animation_id = AnimationRunning() ? _animation_id : 0;
if (_frames.count(file_id)) {
if (skip_data) {
// Update view settings and skip sending data
_frames.at(file_id)->SetAnimationViewSettings(message);
return;
}
auto sync_id = ++_sync_id;

if (message.tiles().empty()) {
return;
}
int num_tiles = message.tiles_size();
auto start_message = Message::RasterTileSync(file_id, z, stokes, sync_id, animation_id, num_tiles, false);
SendFileEvent(file_id, CARTA::EventType::RASTER_TILE_SYNC, 0, start_message);

auto start_message = Message::RasterTileSync(file_id, z, stokes, animation_id, false);
SendFileEvent(file_id, CARTA::EventType::RASTER_TILE_SYNC, 0, start_message);
CARTA::CompressionType compression_type = message.compression_type();
float compression_quality = message.compression_quality();

int num_tiles = message.tiles_size();
CARTA::CompressionType compression_type = message.compression_type();
float compression_quality = message.compression_quality();

Timer t;
ThreadManager::ApplyThreadLimit();
Timer t;
ThreadManager::ApplyThreadLimit();
#pragma omp parallel
{
int num_threads = omp_get_num_threads();
int stride = std::min(num_tiles, std::min(num_threads, MAX_TILING_TASKS));
{
int num_threads = omp_get_num_threads();
int stride = std::min(num_tiles, std::min(num_threads, MAX_TILING_TASKS));
#pragma omp for
for (int j = 0; j < stride; j++) {
for (int i = j; i < num_tiles; i += stride) {
const auto& encoded_coordinate = message.tiles(i);
auto raster_tile_data = Message::RasterTileData(file_id, animation_id);
auto tile = Tile::Decode(encoded_coordinate);
if (_frames.count(file_id) &&
_frames.at(file_id)->FillRasterTileData(raster_tile_data, tile, z, stokes, compression_type, compression_quality)) {
// Only use deflate on outgoing message if the raster image compression type is NONE
SendFileEvent(file_id, CARTA::EventType::RASTER_TILE_DATA, 0, raster_tile_data,
compression_type == CARTA::CompressionType::NONE);
} else {
spdlog::warn("Discarding stale tile request for channel={}, layer={}, x={}, y={}", z, tile.layer, tile.x, tile.y);
}
for (int j = 0; j < stride; j++) {
for (int i = j; i < num_tiles; i += stride) {
const auto& encoded_coordinate = message.tiles(i);
auto raster_tile_data = Message::RasterTileData(file_id, sync_id, animation_id);
auto tile = Tile::Decode(encoded_coordinate);
if (_frames.count(file_id) &&
_frames.at(file_id)->FillRasterTileData(raster_tile_data, tile, z, stokes, compression_type, compression_quality)) {
// Only use deflate on outgoing message if the raster image compression type is NONE
SendFileEvent(
file_id, CARTA::EventType::RASTER_TILE_DATA, 0, raster_tile_data, compression_type == CARTA::CompressionType::NONE);
} else {
spdlog::warn("Discarding stale tile request for channel={}, layer={}, x={}, y={}", z, tile.layer, tile.x, tile.y);
}
}
}
}

// Measure duration for get tile data
spdlog::performance("Get tile data group in {:.3f} ms", t.Elapsed().ms());
// Measure duration for get tile data
spdlog::performance("Get tile data group in {:.3f} ms", t.Elapsed().ms());

// Send final message with no tiles to signify end of the tile stream, for synchronisation purposes
auto final_message = Message::RasterTileSync(file_id, z, stokes, animation_id, true);
SendFileEvent(file_id, CARTA::EventType::RASTER_TILE_SYNC, 0, final_message);
}
// Send final message with no tiles to signify end of the tile stream, for synchronisation purposes
auto final_message = Message::RasterTileSync(file_id, z, stokes, sync_id, animation_id, num_tiles, true);
SendFileEvent(file_id, CARTA::EventType::RASTER_TILE_SYNC, 0, final_message);
}

void Session::OnSetImageChannels(const CARTA::SetImageChannels& message) {
Expand Down Expand Up @@ -2072,7 +2076,7 @@ void Session::BuildAnimationObject(CARTA::StartAnimation& msg, uint32_t request_
}
}

void Session::ExecuteAnimationFrameInner() {
void Session::ExecuteAnimationFrameInner(int animation_id) {
CARTA::AnimationFrame curr_frame;

curr_frame = _animation_object->_next_frame;
Expand All @@ -2089,6 +2093,10 @@ void Session::ExecuteAnimationFrameInner() {
return;
}

if (_animation_object->_stop_called) {
return;
}

bool z_changed(active_frame_z != active_frame->CurrentZ());
bool stokes_changed(active_frame_stokes != active_frame->CurrentStokes());

Expand Down Expand Up @@ -2139,33 +2147,60 @@ void Session::ExecuteAnimationFrameInner() {
auto file_id = file_ids_to_update[i];
bool is_active_frame = file_id == active_file_id;
// Send contour data if required. Empty contour data messages are sent if there are no contour levels
if (_animation_object->_stop_called) {
return;
}
SendContourData(file_id, is_active_frame);

// Send vector field data if required
if (_animation_object->_stop_called) {
return;
}
SendVectorFieldData(file_id);

// Send tile data
OnAddRequiredTiles(_frames.at(file_id)->GetAnimationViewSettings());
if (_animation_object->_stop_called) {
return;
}
OnAddRequiredTiles(_frames.at(file_id)->GetAnimationViewSettings(), animation_id);

// Send region histograms and profiles
if (_animation_object->_stop_called) {
return;
}
UpdateRegionData(file_id, ALL_REGIONS, z_changed, stokes_changed);
}
} else {
if (active_frame->SetImageChannels(active_frame_z, active_frame_stokes, err_message)) {
// Send image histogram and profiles
bool send_histogram(true);
if (_animation_object->_stop_called) {
return;
}
UpdateImageData(active_file_id, send_histogram, z_changed, stokes_changed);

// Send contour data if required
if (_animation_object->_stop_called) {
return;
}
SendContourData(active_file_id);

// Send vector field data if required
if (_animation_object->_stop_called) {
return;
}
SendVectorFieldData(active_file_id);

// Send tile data
OnAddRequiredTiles(active_frame->GetAnimationViewSettings());
if (_animation_object->_stop_called) {
return;
}
OnAddRequiredTiles(active_frame->GetAnimationViewSettings(), animation_id);

// Send region histograms and profiles
if (_animation_object->_stop_called) {
return;
}
UpdateRegionData(active_file_id, ALL_REGIONS, z_changed, stokes_changed);
} else {
if (!err_message.empty()) {
Expand Down Expand Up @@ -2204,6 +2239,8 @@ bool Session::ExecuteAnimationFrame() {
return false;
}

auto animation_id = _animation_id; // Make sure tile data message has an id

auto wait_duration_ms = std::chrono::duration_cast<std::chrono::microseconds>(
_animation_object->_t_last + _animation_object->_frame_interval - std::chrono::high_resolution_clock::now());

Expand All @@ -2216,7 +2253,7 @@ bool Session::ExecuteAnimationFrame() {
}

curr_frame = _animation_object->_next_frame;
ExecuteAnimationFrameInner();
ExecuteAnimationFrameInner(animation_id);

CARTA::AnimationFrame tmp_frame;
CARTA::AnimationFrame delta_frame = _animation_object->_delta_frame;
Expand Down
5 changes: 3 additions & 2 deletions src/Session/Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class Session {
bool OnOpenFile(int file_id, const string& name, std::shared_ptr<casacore::ImageInterface<casacore::Float>> image,
CARTA::OpenFileAck* open_file_ack);
void OnCloseFile(const CARTA::CloseFile& message);
void OnAddRequiredTiles(const CARTA::AddRequiredTiles& message, bool skip_data = false);
void OnAddRequiredTiles(const CARTA::AddRequiredTiles& message, int animation_id = 0, bool skip_data = false);
void OnSetImageChannels(const CARTA::SetImageChannels& message);
void OnSetCursor(const CARTA::SetCursor& message, uint32_t request_id);
bool OnSetRegion(const CARTA::SetRegion& message, uint32_t request_id, bool silent = false);
Expand Down Expand Up @@ -140,7 +140,7 @@ class Session {
}
void BuildAnimationObject(CARTA::StartAnimation& msg, uint32_t request_id);
bool ExecuteAnimationFrame();
void ExecuteAnimationFrameInner();
void ExecuteAnimationFrameInner(int animation_id);
void StopAnimation(int file_id, const ::CARTA::AnimationFrame& frame);
void HandleAnimationFlowControlEvt(CARTA::AnimationFlowControl& message);
int CurrentFlowWindowSize() {
Expand Down Expand Up @@ -329,6 +329,7 @@ class Session {
SessionContext _animation_context;

std::atomic<int> _ref_count;
int _sync_id;
int _animation_id;
bool _connected;
static volatile int _num_sessions;
Expand Down
8 changes: 6 additions & 2 deletions src/Util/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -513,12 +513,15 @@ CARTA::SpatialProfileData Message::SpatialProfileData(int32_t x, int32_t y, int3
return message;
}

CARTA::RasterTileSync Message::RasterTileSync(int32_t file_id, int32_t channel, int32_t stokes, int32_t animation_id, bool end_sync) {
CARTA::RasterTileSync Message::RasterTileSync(
int32_t file_id, int32_t channel, int32_t stokes, int32_t sync_id, int32_t animation_id, int32_t tile_count, bool end_sync) {
CARTA::RasterTileSync message;
message.set_file_id(file_id);
message.set_channel(channel);
message.set_stokes(stokes);
message.set_sync_id(sync_id);
message.set_animation_id(animation_id);
message.set_tile_count(tile_count);
message.set_end_sync(end_sync);
return message;
}
Expand Down Expand Up @@ -638,9 +641,10 @@ CARTA::FileInfo Message::FileInfo(const std::string& name, CARTA::FileType type,
return message;
}

CARTA::RasterTileData Message::RasterTileData(int32_t file_id, int32_t animation_id) {
CARTA::RasterTileData Message::RasterTileData(int32_t file_id, int32_t sync_id, int32_t animation_id) {
CARTA::RasterTileData message;
message.set_file_id(file_id);
message.set_sync_id(sync_id);
message.set_animation_id(animation_id);
return message;
}
Expand Down
5 changes: 3 additions & 2 deletions src/Util/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ class Message {
int32_t stokes, float value, int32_t start, int32_t end, std::vector<float>& profile, std::string& coordinate, int32_t mip,
CARTA::ProfileAxisType axis_type, float crpix, float crval, float cdelt, std::string& unit);
static CARTA::SpatialProfileData SpatialProfileData(int32_t x, int32_t y, int32_t channel, int32_t stokes, float value);
static CARTA::RasterTileSync RasterTileSync(int32_t file_id, int32_t channel, int32_t stokes, int32_t animation_id, bool end_sync);
static CARTA::RasterTileSync RasterTileSync(
int32_t file_id, int32_t channel, int32_t stokes, int32_t sync_id, int32_t animation_id, int32_t tile_count, bool end_sync);
static CARTA::SetRegionAck SetRegionAck(int32_t region_id, bool success, std::string err_message);
static CARTA::RegisterViewerAck RegisterViewerAck(
uint32_t session_id, bool success, const std::string& status, const CARTA::SessionType& type);
Expand All @@ -141,7 +142,7 @@ class Message {
int32_t stokes_angle, const CARTA::CompressionType& compression_type, float compression_quality);
static CARTA::ErrorData ErrorData(const std::string& message, std::vector<std::string> tags, CARTA::ErrorSeverity severity);
static CARTA::FileInfo FileInfo(const std::string& name, CARTA::FileType type, int64_t size = 0, const std::string& hdu = "");
static CARTA::RasterTileData RasterTileData(int32_t file_id, int32_t animation_id);
static CARTA::RasterTileData RasterTileData(int32_t file_id, int32_t sync_id, int32_t animation_id);
static CARTA::StartAnimationAck StartAnimationAck(bool success, int32_t animation_id, const std::string& message);
static CARTA::ImportRegionAck ImportRegionAck(bool success, const std::string& message);
static CARTA::RegionStatsData RegionStatsData(int32_t file_id, int32_t region_id, int32_t channel, int32_t stokes);
Expand Down

0 comments on commit aa5df48

Please sign in to comment.