From 9972a0c1159c217ddd9acd0c9314218697cf3a8c Mon Sep 17 00:00:00 2001
From: Rodolfo <rodoufu@gmail.com>
Date: Thu, 27 Feb 2025 11:35:19 -0300
Subject: [PATCH 1/7] Fix tokio task leak

---
 src/agent/services/oracle.rs | 53 +++++++++++++++++++++++++++---------
 src/agent/state/oracle.rs    |  8 ++++--
 2 files changed, 46 insertions(+), 15 deletions(-)

diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs
index b64dbe4..d86bffa 100644
--- a/src/agent/services/oracle.rs
+++ b/src/agent/services/oracle.rs
@@ -13,7 +13,7 @@ use {
         },
         state::oracle::Oracle,
     },
-    anyhow::Result,
+    anyhow::{Context, Result},
     solana_account_decoder::UiAccountEncoding,
     solana_client::{
         nonblocking::{
@@ -67,6 +67,12 @@ where
     )));
 
     if config.oracle.subscriber_enabled {
+        let number_of_workers = 100;
+        let channel_size = 1000;
+        let (sender, receiver) = tokio::sync::mpsc::channel(channel_size);
+        let max_elapsed_time = Duration::from_secs(30);
+        let sleep_time = Duration::from_secs(1);
+
         handles.push(tokio::spawn(async move {
             loop {
                 let current_time = Instant::now();
@@ -75,17 +81,34 @@ where
                     network,
                     state.clone(),
                     key_store.pyth_oracle_program_key,
+                    sender.clone(),
                 )
                 .await
                 {
-                    tracing::error!(err = ?err, "Subscriber exited unexpectedly.");
-                    if current_time.elapsed() < Duration::from_secs(30) {
-                        tracing::warn!("Subscriber restarting too quickly. Sleeping for 1 second.");
-                        tokio::time::sleep(Duration::from_secs(1)).await;
+                    tracing::error!(?err, "Subscriber exited unexpectedly");
+                    if current_time.elapsed() < max_elapsed_time {
+                        tracing::warn!(?sleep_time, "Subscriber restarting too quickly. Sleeping");
+                        tokio::time::sleep(sleep_time).await;
                     }
                 }
             }
         }));
+
+        let receiver = Arc::new(tokio::sync::Mutex::new(receiver));
+        for _ in 0..number_of_workers {
+            let receiver = receiver.clone();
+            handles.push(tokio::spawn(async move {
+                loop {
+                    let mut receiver = receiver.lock().await;
+                    if let Some(task) = receiver.recv().await {
+                        drop(receiver);
+                        if let Err(err) = task.await {
+                            tracing::error!(%err, "error running price update");
+                        }
+                    }
+                }
+            }));
+        }
     }
 
     handles
@@ -102,6 +125,7 @@ async fn subscriber<S>(
     network: Network,
     state: Arc<S>,
     program_key: Pubkey,
+    sender: tokio::sync::mpsc::Sender<tokio::task::JoinHandle<()>>,
 ) -> Result<()>
 where
     S: Oracle,
@@ -129,14 +153,17 @@ where
             Some(account) => {
                 let pubkey: Pubkey = update.value.pubkey.as_str().try_into()?;
                 let state = state.clone();
-                tokio::spawn(async move {
-                    if let Err(err) =
-                        Oracle::handle_price_account_update(&*state, network, &pubkey, &account)
-                            .await
-                    {
-                        tracing::error!(err = ?err, "Failed to handle account update.");
-                    }
-                });
+                sender
+                    .send(tokio::spawn(async move {
+                        if let Err(err) =
+                            Oracle::handle_price_account_update(&*state, network, &pubkey, &account)
+                                .await
+                        {
+                            tracing::error!(?err, "Failed to handle account update");
+                        }
+                    }))
+                    .await
+                    .context("sending handle_price_account_update task to worker")?;
             }
 
             None => {
diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs
index 761280d..2f0fd1e 100644
--- a/src/agent/state/oracle.rs
+++ b/src/agent/state/oracle.rs
@@ -241,6 +241,7 @@ where
         );
 
         data.price_accounts.insert(*account_key, price_entry.into());
+        drop(data);
 
         Prices::update_global_price(
             self,
@@ -333,13 +334,16 @@ where
         let mut data = self.into().data.write().await;
         log_data_diff(&data, &new_data);
         *data = new_data;
+        let data_publisher_permissions = data.publisher_permissions.clone();
+        let data_publisher_buffer_key = data.publisher_buffer_key;
+        drop(data);
 
         Exporter::update_on_chain_state(
             self,
             network,
             publish_keypair,
-            data.publisher_permissions.clone(),
-            data.publisher_buffer_key,
+            data_publisher_permissions,
+            data_publisher_buffer_key,
         )
         .await?;
 

From 63f0b9083868d69ab06c4a01907a5e5501a4f62d Mon Sep 17 00:00:00 2001
From: Rodolfo <rodoufu@gmail.com>
Date: Thu, 27 Feb 2025 12:02:21 -0300
Subject: [PATCH 2/7] Using config

---
 src/agent/services/oracle.rs | 12 ++++++------
 src/agent/state/oracle.rs    | 35 +++++++++++++++++++++++++++++++++++
 2 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs
index d86bffa..bbf2292 100644
--- a/src/agent/services/oracle.rs
+++ b/src/agent/services/oracle.rs
@@ -67,11 +67,11 @@ where
     )));
 
     if config.oracle.subscriber_enabled {
-        let number_of_workers = 100;
-        let channel_size = 1000;
-        let (sender, receiver) = tokio::sync::mpsc::channel(channel_size);
-        let max_elapsed_time = Duration::from_secs(30);
-        let sleep_time = Duration::from_secs(1);
+        let number_of_workers = config.oracle.handle_price_account_update_worker_poll_size;
+        let (sender, receiver) =
+            tokio::sync::mpsc::channel(config.oracle.handle_price_account_update_channel_size);
+        let min_elapsed_time = config.oracle.subscriber_finished_min_time;
+        let sleep_time = config.oracle.subscriber_finished_sleep_time;
 
         handles.push(tokio::spawn(async move {
             loop {
@@ -86,7 +86,7 @@ where
                 .await
                 {
                     tracing::error!(?err, "Subscriber exited unexpectedly");
-                    if current_time.elapsed() < max_elapsed_time {
+                    if current_time.elapsed() < min_elapsed_time {
                         tracing::warn!(?sleep_time, "Subscriber restarting too quickly. Sleeping");
                         tokio::time::sleep(sleep_time).await;
                     }
diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs
index 2f0fd1e..0a10746 100644
--- a/src/agent/state/oracle.rs
+++ b/src/agent/state/oracle.rs
@@ -138,6 +138,22 @@ pub struct Data {
     pub publisher_buffer_key:  Option<Pubkey>,
 }
 
+fn default_handle_price_account_update_channel_size() -> usize {
+    1000
+}
+
+fn default_handle_price_account_update_worker_poll_size() -> usize {
+    50
+}
+
+fn default_subscriber_finished_min_time() -> Duration {
+    Duration::from_secs(30)
+}
+
+fn default_subscriber_finished_sleep_time() -> Duration {
+    Duration::from_secs(1)
+}
+
 #[derive(Clone, Serialize, Deserialize, Debug)]
 #[serde(default)]
 pub struct Config {
@@ -159,6 +175,19 @@ pub struct Config {
     /// socket count at bay, the batches are looked up sequentially,
     /// trading off overall time it takes to fetch all symbols.
     pub max_lookup_batch_size: usize,
+
+    /// Number of workers used to wait for the handle_price_account_update
+    #[serde(default = "default_handle_price_account_update_worker_poll_size")]
+    pub handle_price_account_update_worker_poll_size: usize,
+    /// Channel size used to wait for the handle_price_account_update
+    #[serde(default = "default_handle_price_account_update_channel_size")]
+    pub handle_price_account_update_channel_size: usize,
+    /// Minimum time for a subscriber to run
+    #[serde(default = "default_subscriber_finished_min_time")]
+    pub subscriber_finished_min_time: Duration,
+    /// Time to sleep if the subscriber do not run for more than the minimum time
+    #[serde(default = "default_subscriber_finished_sleep_time")]
+    pub subscriber_finished_sleep_time: Duration,
 }
 
 impl Default for Config {
@@ -170,6 +199,12 @@ impl Default for Config {
             updates_channel_capacity: 10000,
             data_channel_capacity:    10000,
             max_lookup_batch_size:    100,
+            handle_price_account_update_worker_poll_size:
+                default_handle_price_account_update_worker_poll_size(),
+            handle_price_account_update_channel_size:
+                default_handle_price_account_update_channel_size(),
+            subscriber_finished_min_time: default_subscriber_finished_min_time(),
+            subscriber_finished_sleep_time: default_subscriber_finished_sleep_time(),
         }
     }
 }

From 228d0af70fc29a79f0ec41a72d0abb72296aaa2e Mon Sep 17 00:00:00 2001
From: Rodolfo <rodoufu@gmail.com>
Date: Thu, 27 Feb 2025 12:07:01 -0300
Subject: [PATCH 3/7] Using config file

---
 config/config.toml        | 9 +++++++++
 src/agent/state/oracle.rs | 2 +-
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/config/config.toml b/config/config.toml
index 51a4498..f168340 100644
--- a/config/config.toml
+++ b/config/config.toml
@@ -73,6 +73,15 @@ key_store.pyth_price_store_program_key = "3m6sv6HGqEbuyLV84mD7rJn4MAC9LhUa1y1AUN
 # takes to fetch all symbols.
 # oracle.max_lookup_batch_size = 100
 
+# Number of workers used to wait for the handle_price_account_update
+# oracle.handle_price_account_update_worker_poll_size = 25
+# Channel size used to wait for the handle_price_account_update
+# oracle.handle_price_account_update_channel_size = 1000
+# Minimum time for a subscriber to run
+# oracle.subscriber_finished_min_time = "30s"
+# Time to sleep if the subscriber do not run for more than the minimum time
+# oracle.subscriber_finished_sleep_time = "1s"
+
 # How often to refresh the cached network state (current slot and blockhash).
 # It is recommended to set this to slightly less than the network's block time,
 # as the slot fetched will be used as the time of the price update.
diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs
index 0a10746..5065c4f 100644
--- a/src/agent/state/oracle.rs
+++ b/src/agent/state/oracle.rs
@@ -143,7 +143,7 @@ fn default_handle_price_account_update_channel_size() -> usize {
 }
 
 fn default_handle_price_account_update_worker_poll_size() -> usize {
-    50
+    25
 }
 
 fn default_subscriber_finished_min_time() -> Duration {

From 927ae6cec0fb2e45d3fe1b208367dd578bd41c3a Mon Sep 17 00:00:00 2001
From: Rodolfo <rodoufu@gmail.com>
Date: Thu, 27 Feb 2025 13:08:21 -0300
Subject: [PATCH 4/7] Bump version

---
 Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Cargo.toml b/Cargo.toml
index b92d9a1..7aec16e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "pyth-agent"
-version = "2.12.1"
+version = "2.12.2"
 edition = "2021"
 
 [[bin]]

From ff9eeba58a0bcb18185d6da6a9395e7bc656db67 Mon Sep 17 00:00:00 2001
From: Rodolfo <rodoufu@gmail.com>
Date: Fri, 28 Feb 2025 15:42:29 -0300
Subject: [PATCH 5/7] Format

---
 src/agent/services/oracle.rs |  5 ++++-
 src/agent/state/oracle.rs    | 22 +++++++++++-----------
 2 files changed, 15 insertions(+), 12 deletions(-)

diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs
index bbf2292..a8cb4a6 100644
--- a/src/agent/services/oracle.rs
+++ b/src/agent/services/oracle.rs
@@ -13,7 +13,10 @@ use {
         },
         state::oracle::Oracle,
     },
-    anyhow::{Context, Result},
+    anyhow::{
+        Context,
+        Result,
+    },
     solana_account_decoder::UiAccountEncoding,
     solana_client::{
         nonblocking::{
diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs
index 5065c4f..7db379b 100644
--- a/src/agent/state/oracle.rs
+++ b/src/agent/state/oracle.rs
@@ -181,30 +181,30 @@ pub struct Config {
     pub handle_price_account_update_worker_poll_size: usize,
     /// Channel size used to wait for the handle_price_account_update
     #[serde(default = "default_handle_price_account_update_channel_size")]
-    pub handle_price_account_update_channel_size: usize,
+    pub handle_price_account_update_channel_size:     usize,
     /// Minimum time for a subscriber to run
     #[serde(default = "default_subscriber_finished_min_time")]
-    pub subscriber_finished_min_time: Duration,
+    pub subscriber_finished_min_time:                 Duration,
     /// Time to sleep if the subscriber do not run for more than the minimum time
     #[serde(default = "default_subscriber_finished_sleep_time")]
-    pub subscriber_finished_sleep_time: Duration,
+    pub subscriber_finished_sleep_time:               Duration,
 }
 
 impl Default for Config {
     fn default() -> Self {
         Self {
-            commitment:               CommitmentLevel::Confirmed,
-            poll_interval_duration:   Duration::from_secs(5),
-            subscriber_enabled:       true,
-            updates_channel_capacity: 10000,
-            data_channel_capacity:    10000,
-            max_lookup_batch_size:    100,
+            commitment:                                   CommitmentLevel::Confirmed,
+            poll_interval_duration:                       Duration::from_secs(5),
+            subscriber_enabled:                           true,
+            updates_channel_capacity:                     10000,
+            data_channel_capacity:                        10000,
+            max_lookup_batch_size:                        100,
             handle_price_account_update_worker_poll_size:
                 default_handle_price_account_update_worker_poll_size(),
             handle_price_account_update_channel_size:
                 default_handle_price_account_update_channel_size(),
-            subscriber_finished_min_time: default_subscriber_finished_min_time(),
-            subscriber_finished_sleep_time: default_subscriber_finished_sleep_time(),
+            subscriber_finished_min_time:                 default_subscriber_finished_min_time(),
+            subscriber_finished_sleep_time:               default_subscriber_finished_sleep_time(),
         }
     }
 }

From e199600796206029899ac9273a8bbb22eba58507 Mon Sep 17 00:00:00 2001
From: Rodolfo <rodoufu@gmail.com>
Date: Wed, 12 Mar 2025 14:55:47 -0300
Subject: [PATCH 6/7] Removing worker group

---
 Cargo.lock                   |  2 +-
 config/config.toml           |  4 ----
 src/agent/services/oracle.rs | 40 ++++++++----------------------------
 src/agent/state/oracle.rs    | 38 +++++++++-------------------------
 4 files changed, 19 insertions(+), 65 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 054b095..6735e99 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3400,7 +3400,7 @@ dependencies = [
 
 [[package]]
 name = "pyth-agent"
-version = "2.12.1"
+version = "2.12.2"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/config/config.toml b/config/config.toml
index f168340..d38b5bb 100644
--- a/config/config.toml
+++ b/config/config.toml
@@ -73,10 +73,6 @@ key_store.pyth_price_store_program_key = "3m6sv6HGqEbuyLV84mD7rJn4MAC9LhUa1y1AUN
 # takes to fetch all symbols.
 # oracle.max_lookup_batch_size = 100
 
-# Number of workers used to wait for the handle_price_account_update
-# oracle.handle_price_account_update_worker_poll_size = 25
-# Channel size used to wait for the handle_price_account_update
-# oracle.handle_price_account_update_channel_size = 1000
 # Minimum time for a subscriber to run
 # oracle.subscriber_finished_min_time = "30s"
 # Time to sleep if the subscriber do not run for more than the minimum time
diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs
index a8cb4a6..ebff723 100644
--- a/src/agent/services/oracle.rs
+++ b/src/agent/services/oracle.rs
@@ -70,9 +70,6 @@ where
     )));
 
     if config.oracle.subscriber_enabled {
-        let number_of_workers = config.oracle.handle_price_account_update_worker_poll_size;
-        let (sender, receiver) =
-            tokio::sync::mpsc::channel(config.oracle.handle_price_account_update_channel_size);
         let min_elapsed_time = config.oracle.subscriber_finished_min_time;
         let sleep_time = config.oracle.subscriber_finished_sleep_time;
 
@@ -84,7 +81,6 @@ where
                     network,
                     state.clone(),
                     key_store.pyth_oracle_program_key,
-                    sender.clone(),
                 )
                 .await
                 {
@@ -96,22 +92,6 @@ where
                 }
             }
         }));
-
-        let receiver = Arc::new(tokio::sync::Mutex::new(receiver));
-        for _ in 0..number_of_workers {
-            let receiver = receiver.clone();
-            handles.push(tokio::spawn(async move {
-                loop {
-                    let mut receiver = receiver.lock().await;
-                    if let Some(task) = receiver.recv().await {
-                        drop(receiver);
-                        if let Err(err) = task.await {
-                            tracing::error!(%err, "error running price update");
-                        }
-                    }
-                }
-            }));
-        }
     }
 
     handles
@@ -128,7 +108,6 @@ async fn subscriber<S>(
     network: Network,
     state: Arc<S>,
     program_key: Pubkey,
-    sender: tokio::sync::mpsc::Sender<tokio::task::JoinHandle<()>>,
 ) -> Result<()>
 where
     S: Oracle,
@@ -156,17 +135,14 @@ where
             Some(account) => {
                 let pubkey: Pubkey = update.value.pubkey.as_str().try_into()?;
                 let state = state.clone();
-                sender
-                    .send(tokio::spawn(async move {
-                        if let Err(err) =
-                            Oracle::handle_price_account_update(&*state, network, &pubkey, &account)
-                                .await
-                        {
-                            tracing::error!(?err, "Failed to handle account update");
-                        }
-                    }))
-                    .await
-                    .context("sending handle_price_account_update task to worker")?;
+                tokio::spawn(async move {
+                    if let Err(err) =
+                        Oracle::handle_price_account_update(&*state, network, &pubkey, &account)
+                            .await
+                    {
+                        tracing::error!(?err, "Failed to handle account update");
+                    }
+                });
             }
 
             None => {
diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs
index 7db379b..90a0fa8 100644
--- a/src/agent/state/oracle.rs
+++ b/src/agent/state/oracle.rs
@@ -138,14 +138,6 @@ pub struct Data {
     pub publisher_buffer_key:  Option<Pubkey>,
 }
 
-fn default_handle_price_account_update_channel_size() -> usize {
-    1000
-}
-
-fn default_handle_price_account_update_worker_poll_size() -> usize {
-    25
-}
-
 fn default_subscriber_finished_min_time() -> Duration {
     Duration::from_secs(30)
 }
@@ -176,35 +168,25 @@ pub struct Config {
     /// trading off overall time it takes to fetch all symbols.
     pub max_lookup_batch_size: usize,
 
-    /// Number of workers used to wait for the handle_price_account_update
-    #[serde(default = "default_handle_price_account_update_worker_poll_size")]
-    pub handle_price_account_update_worker_poll_size: usize,
-    /// Channel size used to wait for the handle_price_account_update
-    #[serde(default = "default_handle_price_account_update_channel_size")]
-    pub handle_price_account_update_channel_size:     usize,
     /// Minimum time for a subscriber to run
     #[serde(default = "default_subscriber_finished_min_time")]
-    pub subscriber_finished_min_time:                 Duration,
+    pub subscriber_finished_min_time:   Duration,
     /// Time to sleep if the subscriber do not run for more than the minimum time
     #[serde(default = "default_subscriber_finished_sleep_time")]
-    pub subscriber_finished_sleep_time:               Duration,
+    pub subscriber_finished_sleep_time: Duration,
 }
 
 impl Default for Config {
     fn default() -> Self {
         Self {
-            commitment:                                   CommitmentLevel::Confirmed,
-            poll_interval_duration:                       Duration::from_secs(5),
-            subscriber_enabled:                           true,
-            updates_channel_capacity:                     10000,
-            data_channel_capacity:                        10000,
-            max_lookup_batch_size:                        100,
-            handle_price_account_update_worker_poll_size:
-                default_handle_price_account_update_worker_poll_size(),
-            handle_price_account_update_channel_size:
-                default_handle_price_account_update_channel_size(),
-            subscriber_finished_min_time:                 default_subscriber_finished_min_time(),
-            subscriber_finished_sleep_time:               default_subscriber_finished_sleep_time(),
+            commitment:                     CommitmentLevel::Confirmed,
+            poll_interval_duration:         Duration::from_secs(5),
+            subscriber_enabled:             true,
+            updates_channel_capacity:       10000,
+            data_channel_capacity:          10000,
+            max_lookup_batch_size:          100,
+            subscriber_finished_min_time:   default_subscriber_finished_min_time(),
+            subscriber_finished_sleep_time: default_subscriber_finished_sleep_time(),
         }
     }
 }

From ad05964d58851b82c360cbb9acbec180dbcb3590 Mon Sep 17 00:00:00 2001
From: Rodolfo <rodoufu@gmail.com>
Date: Wed, 12 Mar 2025 15:02:27 -0300
Subject: [PATCH 7/7] Fix unused imports

---
 src/agent/services/oracle.rs | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs
index ebff723..e89ccbe 100644
--- a/src/agent/services/oracle.rs
+++ b/src/agent/services/oracle.rs
@@ -13,10 +13,7 @@ use {
         },
         state::oracle::Oracle,
     },
-    anyhow::{
-        Context,
-        Result,
-    },
+    anyhow::Result,
     solana_account_decoder::UiAccountEncoding,
     solana_client::{
         nonblocking::{
@@ -36,10 +33,7 @@ use {
     },
     std::{
         sync::Arc,
-        time::{
-            Duration,
-            Instant,
-        },
+        time::Instant,
     },
     tokio::task::JoinHandle,
     tokio_stream::StreamExt,