Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sync id and tile count to raster tile messages #1288

Merged
merged 9 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Fixed the case-sensitive of reading BUNIT from a file header ([#1187](https://github.com/CARTAvis/carta-backend/issues/1187)).
* Fixed the crash when reading beam table with 64-bit floats ([#1166](https://github.com/CARTAvis/carta-backend/issues/1166)).
* Fixed region spectral profile from FITS gz image ([#1271](https://github.com/CARTAvis/carta-backend/issues/1271)).
* Added id and count to fix missing tiles issue ([#1282](https://github.com/CARTAvis/carta-backend/issues/1282)).

## [4.0.0-beta.1]

Expand Down
2 changes: 1 addition & 1 deletion src/Session/OnMessageTask.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class GeneralMessageTask : public OnMessageTask {
if constexpr (std::is_same_v<T, CARTA::SetHistogramRequirements>) {
_session->OnSetHistogramRequirements(_message, _request_id);
} else if constexpr (std::is_same_v<T, CARTA::AddRequiredTiles>) {
_session->OnAddRequiredTiles(_message, _session->AnimationRunning());
_session->OnAddRequiredTiles(_message, 0, _session->AnimationRunning());
} else if constexpr (std::is_same_v<T, CARTA::SetContourParameters>) {
_session->OnSetContourParameters(_message);
} else if constexpr (std::is_same_v<T, CARTA::SetSpatialRequirements>) {
Expand Down
127 changes: 82 additions & 45 deletions src/Session/Session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ Session::Session(uWS::WebSocket<false, true, PerSocketData>* ws, uWS::Loop* loop
_enable_scripting(enable_scripting),
_region_handler(nullptr),
_file_list_handler(file_list_handler),
_sync_id(0),
_animation_id(0),
_animation_active(false),
_cursor_settings(this),
Expand Down Expand Up @@ -657,65 +658,68 @@ void Session::DeleteFrame(int file_id) {
}
}

void Session::OnAddRequiredTiles(const CARTA::AddRequiredTiles& message, bool skip_data) {
void Session::OnAddRequiredTiles(const CARTA::AddRequiredTiles& message, int animation_id, bool skip_data) {
auto file_id = message.file_id();

if (!_frames.count(file_id)) {
return;
}

if (skip_data) {
// Update view settings and skip sending data
_frames.at(file_id)->SetAnimationViewSettings(message);
return;
}

if (message.tiles().empty()) {
return;
}

if (animation_id > 0 && _animation_object->_stop_called) {
return;
}

auto z = _frames.at(file_id)->CurrentZ();
auto stokes = _frames.at(file_id)->CurrentStokes();
auto animation_id = AnimationRunning() ? _animation_id : 0;
if (_frames.count(file_id)) {
if (skip_data) {
// Update view settings and skip sending data
_frames.at(file_id)->SetAnimationViewSettings(message);
return;
}
auto sync_id = ++_sync_id;

if (message.tiles().empty()) {
return;
}
int num_tiles = message.tiles_size();
auto start_message = Message::RasterTileSync(file_id, z, stokes, sync_id, animation_id, num_tiles, false);
SendFileEvent(file_id, CARTA::EventType::RASTER_TILE_SYNC, 0, start_message);

auto start_message = Message::RasterTileSync(file_id, z, stokes, animation_id, false);
SendFileEvent(file_id, CARTA::EventType::RASTER_TILE_SYNC, 0, start_message);
CARTA::CompressionType compression_type = message.compression_type();
float compression_quality = message.compression_quality();

int num_tiles = message.tiles_size();
CARTA::CompressionType compression_type = message.compression_type();
float compression_quality = message.compression_quality();

Timer t;
ThreadManager::ApplyThreadLimit();
Timer t;
ThreadManager::ApplyThreadLimit();
#pragma omp parallel
{
int num_threads = omp_get_num_threads();
int stride = std::min(num_tiles, std::min(num_threads, MAX_TILING_TASKS));
{
int num_threads = omp_get_num_threads();
int stride = std::min(num_tiles, std::min(num_threads, MAX_TILING_TASKS));
#pragma omp for
for (int j = 0; j < stride; j++) {
for (int i = j; i < num_tiles; i += stride) {
const auto& encoded_coordinate = message.tiles(i);
auto raster_tile_data = Message::RasterTileData(file_id, animation_id);
auto tile = Tile::Decode(encoded_coordinate);
if (_frames.count(file_id) &&
_frames.at(file_id)->FillRasterTileData(raster_tile_data, tile, z, stokes, compression_type, compression_quality)) {
// Only use deflate on outgoing message if the raster image compression type is NONE
SendFileEvent(file_id, CARTA::EventType::RASTER_TILE_DATA, 0, raster_tile_data,
compression_type == CARTA::CompressionType::NONE);
} else {
spdlog::warn("Discarding stale tile request for channel={}, layer={}, x={}, y={}", z, tile.layer, tile.x, tile.y);
}
for (int j = 0; j < stride; j++) {
for (int i = j; i < num_tiles; i += stride) {
const auto& encoded_coordinate = message.tiles(i);
auto raster_tile_data = Message::RasterTileData(file_id, sync_id, animation_id);
auto tile = Tile::Decode(encoded_coordinate);
if (_frames.count(file_id) &&
_frames.at(file_id)->FillRasterTileData(raster_tile_data, tile, z, stokes, compression_type, compression_quality)) {
// Only use deflate on outgoing message if the raster image compression type is NONE
SendFileEvent(
file_id, CARTA::EventType::RASTER_TILE_DATA, 0, raster_tile_data, compression_type == CARTA::CompressionType::NONE);
} else {
spdlog::warn("Discarding stale tile request for channel={}, layer={}, x={}, y={}", z, tile.layer, tile.x, tile.y);
}
}
}
}

// Measure duration for get tile data
spdlog::performance("Get tile data group in {:.3f} ms", t.Elapsed().ms());
// Measure duration for get tile data
spdlog::performance("Get tile data group in {:.3f} ms", t.Elapsed().ms());

// Send final message with no tiles to signify end of the tile stream, for synchronisation purposes
auto final_message = Message::RasterTileSync(file_id, z, stokes, animation_id, true);
SendFileEvent(file_id, CARTA::EventType::RASTER_TILE_SYNC, 0, final_message);
}
// Send final message with no tiles to signify end of the tile stream, for synchronisation purposes
auto final_message = Message::RasterTileSync(file_id, z, stokes, sync_id, animation_id, num_tiles, true);
SendFileEvent(file_id, CARTA::EventType::RASTER_TILE_SYNC, 0, final_message);
}

void Session::OnSetImageChannels(const CARTA::SetImageChannels& message) {
Expand Down Expand Up @@ -2039,7 +2043,7 @@ void Session::BuildAnimationObject(CARTA::StartAnimation& msg, uint32_t request_
}
}

void Session::ExecuteAnimationFrameInner() {
void Session::ExecuteAnimationFrameInner(int animation_id) {
CARTA::AnimationFrame curr_frame;

curr_frame = _animation_object->_next_frame;
Expand All @@ -2056,6 +2060,10 @@ void Session::ExecuteAnimationFrameInner() {
return;
}

if (_animation_object->_stop_called) {
return;
}

bool z_changed(active_frame_z != active_frame->CurrentZ());
bool stokes_changed(active_frame_stokes != active_frame->CurrentStokes());

Expand Down Expand Up @@ -2106,33 +2114,60 @@ void Session::ExecuteAnimationFrameInner() {
auto file_id = file_ids_to_update[i];
bool is_active_frame = file_id == active_file_id;
// Send contour data if required. Empty contour data messages are sent if there are no contour levels
if (_animation_object->_stop_called) {
return;
}
SendContourData(file_id, is_active_frame);

// Send vector field data if required
if (_animation_object->_stop_called) {
return;
}
SendVectorFieldData(file_id);

// Send tile data
OnAddRequiredTiles(_frames.at(file_id)->GetAnimationViewSettings());
if (_animation_object->_stop_called) {
return;
}
OnAddRequiredTiles(_frames.at(file_id)->GetAnimationViewSettings(), animation_id);

// Send region histograms and profiles
if (_animation_object->_stop_called) {
return;
}
UpdateRegionData(file_id, ALL_REGIONS, z_changed, stokes_changed);
}
} else {
if (active_frame->SetImageChannels(active_frame_z, active_frame_stokes, err_message)) {
// Send image histogram and profiles
bool send_histogram(true);
if (_animation_object->_stop_called) {
return;
}
UpdateImageData(active_file_id, send_histogram, z_changed, stokes_changed);

// Send contour data if required
if (_animation_object->_stop_called) {
return;
}
SendContourData(active_file_id);

// Send vector field data if required
if (_animation_object->_stop_called) {
return;
}
SendVectorFieldData(active_file_id);

// Send tile data
OnAddRequiredTiles(active_frame->GetAnimationViewSettings());
if (_animation_object->_stop_called) {
return;
}
OnAddRequiredTiles(active_frame->GetAnimationViewSettings(), animation_id);

// Send region histograms and profiles
if (_animation_object->_stop_called) {
return;
}
UpdateRegionData(active_file_id, ALL_REGIONS, z_changed, stokes_changed);
} else {
if (!err_message.empty()) {
Expand Down Expand Up @@ -2171,6 +2206,8 @@ bool Session::ExecuteAnimationFrame() {
return false;
}

auto animation_id = _animation_id; // Make sure tile data message has an id

auto wait_duration_ms = std::chrono::duration_cast<std::chrono::microseconds>(
_animation_object->_t_last + _animation_object->_frame_interval - std::chrono::high_resolution_clock::now());

Expand All @@ -2183,7 +2220,7 @@ bool Session::ExecuteAnimationFrame() {
}

curr_frame = _animation_object->_next_frame;
ExecuteAnimationFrameInner();
ExecuteAnimationFrameInner(animation_id);

CARTA::AnimationFrame tmp_frame;
CARTA::AnimationFrame delta_frame = _animation_object->_delta_frame;
Expand Down
5 changes: 3 additions & 2 deletions src/Session/Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class Session {
bool OnOpenFile(int file_id, const string& name, std::shared_ptr<casacore::ImageInterface<casacore::Float>> image,
CARTA::OpenFileAck* open_file_ack);
void OnCloseFile(const CARTA::CloseFile& message);
void OnAddRequiredTiles(const CARTA::AddRequiredTiles& message, bool skip_data = false);
void OnAddRequiredTiles(const CARTA::AddRequiredTiles& message, int animation_id = 0, bool skip_data = false);
void OnSetImageChannels(const CARTA::SetImageChannels& message);
void OnSetCursor(const CARTA::SetCursor& message, uint32_t request_id);
bool OnSetRegion(const CARTA::SetRegion& message, uint32_t request_id, bool silent = false);
Expand Down Expand Up @@ -140,7 +140,7 @@ class Session {
}
void BuildAnimationObject(CARTA::StartAnimation& msg, uint32_t request_id);
bool ExecuteAnimationFrame();
void ExecuteAnimationFrameInner();
void ExecuteAnimationFrameInner(int animation_id);
void StopAnimation(int file_id, const ::CARTA::AnimationFrame& frame);
void HandleAnimationFlowControlEvt(CARTA::AnimationFlowControl& message);
int CurrentFlowWindowSize() {
Expand Down Expand Up @@ -329,6 +329,7 @@ class Session {
SessionContext _animation_context;

std::atomic<int> _ref_count;
int _sync_id;
int _animation_id;
bool _connected;
static volatile int _num_sessions;
Expand Down
8 changes: 6 additions & 2 deletions src/Util/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -512,12 +512,15 @@ CARTA::SpatialProfileData Message::SpatialProfileData(int32_t x, int32_t y, int3
return message;
}

CARTA::RasterTileSync Message::RasterTileSync(int32_t file_id, int32_t channel, int32_t stokes, int32_t animation_id, bool end_sync) {
CARTA::RasterTileSync Message::RasterTileSync(
int32_t file_id, int32_t channel, int32_t stokes, int32_t sync_id, int32_t animation_id, int32_t tile_count, bool end_sync) {
CARTA::RasterTileSync message;
message.set_file_id(file_id);
message.set_channel(channel);
message.set_stokes(stokes);
message.set_sync_id(sync_id);
message.set_animation_id(animation_id);
message.set_tile_count(tile_count);
message.set_end_sync(end_sync);
return message;
}
Expand Down Expand Up @@ -637,9 +640,10 @@ CARTA::FileInfo Message::FileInfo(const std::string& name, CARTA::FileType type,
return message;
}

CARTA::RasterTileData Message::RasterTileData(int32_t file_id, int32_t animation_id) {
CARTA::RasterTileData Message::RasterTileData(int32_t file_id, int32_t sync_id, int32_t animation_id) {
CARTA::RasterTileData message;
message.set_file_id(file_id);
message.set_sync_id(sync_id);
message.set_animation_id(animation_id);
return message;
}
Expand Down
5 changes: 3 additions & 2 deletions src/Util/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ class Message {
int32_t stokes, float value, int32_t start, int32_t end, std::vector<float>& profile, std::string& coordinate, int32_t mip,
CARTA::ProfileAxisType axis_type, float crpix, float crval, float cdelt, std::string& unit);
static CARTA::SpatialProfileData SpatialProfileData(int32_t x, int32_t y, int32_t channel, int32_t stokes, float value);
static CARTA::RasterTileSync RasterTileSync(int32_t file_id, int32_t channel, int32_t stokes, int32_t animation_id, bool end_sync);
static CARTA::RasterTileSync RasterTileSync(
int32_t file_id, int32_t channel, int32_t stokes, int32_t sync_id, int32_t animation_id, int32_t tile_count, bool end_sync);
static CARTA::SetRegionAck SetRegionAck(int32_t region_id, bool success, std::string err_message);
static CARTA::RegisterViewerAck RegisterViewerAck(
uint32_t session_id, bool success, const std::string& status, const CARTA::SessionType& type);
Expand All @@ -141,7 +142,7 @@ class Message {
int32_t stokes_angle, const CARTA::CompressionType& compression_type, float compression_quality);
static CARTA::ErrorData ErrorData(const std::string& message, std::vector<std::string> tags, CARTA::ErrorSeverity severity);
static CARTA::FileInfo FileInfo(const std::string& name, CARTA::FileType type, int64_t size = 0, const std::string& hdu = "");
static CARTA::RasterTileData RasterTileData(int32_t file_id, int32_t animation_id);
static CARTA::RasterTileData RasterTileData(int32_t file_id, int32_t sync_id, int32_t animation_id);
static CARTA::StartAnimationAck StartAnimationAck(bool success, int32_t animation_id, const std::string& message);
static CARTA::ImportRegionAck ImportRegionAck(bool success, const std::string& message);
static CARTA::RegionStatsData RegionStatsData(int32_t file_id, int32_t region_id, int32_t channel, int32_t stokes);
Expand Down
Loading