Skip to content

Commit 382c1ea

Browse files
authored
Revert "enhance(sdk-rs): switch from reqwest::blocking to regular one" (#7245)
1 parent d40d3a5 commit 382c1ea

File tree

3 files changed

+48
-123
lines changed

3 files changed

+48
-123
lines changed

.changeset/wet-windows-walk.md

Lines changed: 0 additions & 11 deletions
This file was deleted.

packages/libraries/router/src/registry.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use crate::consts::PLUGIN_VERSION;
22
use crate::registry_logger::Logger;
33
use anyhow::{anyhow, Result};
4-
use futures::executor::block_on;
54
use hive_console_sdk::supergraph_fetcher::SupergraphFetcher;
65
use sha2::Digest;
76
use sha2::Sha256;
87
use std::env;
98
use std::io::Write;
9+
use std::thread;
1010
use std::time::Duration;
1111

1212
#[derive(Debug)]
@@ -130,7 +130,6 @@ impl HiveRegistry {
130130
Duration::from_secs(5),
131131
Duration::from_secs(60),
132132
accept_invalid_certs,
133-
3,
134133
)
135134
.map_err(|e| anyhow!("Failed to create SupergraphFetcher: {}", e))?,
136135
file_name,
@@ -149,19 +148,17 @@ impl HiveRegistry {
149148
}
150149
}
151150

152-
tokio::spawn(async move {
153-
loop {
154-
tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await;
155-
registry.poll().await;
156-
}
151+
thread::spawn(move || loop {
152+
thread::sleep(std::time::Duration::from_secs(poll_interval));
153+
registry.poll()
157154
});
158155

159156
Ok(())
160157
}
161158

162159
fn initial_supergraph(&mut self) -> Result<(), String> {
163160
let mut file = std::fs::File::create(self.file_name.clone()).map_err(|e| e.to_string())?;
164-
let resp = block_on(self.fetcher.fetch_supergraph()).map_err(|e| e.to_string())?;
161+
let resp = self.fetcher.fetch_supergraph()?;
165162

166163
match resp {
167164
Some(supergraph) => {
@@ -176,8 +173,8 @@ impl HiveRegistry {
176173
Ok(())
177174
}
178175

179-
async fn poll(&mut self) {
180-
match self.fetcher.fetch_supergraph().await {
176+
fn poll(&mut self) {
177+
match self.fetcher.fetch_supergraph() {
181178
Ok(new_supergraph) => {
182179
if let Some(new_supergraph) = new_supergraph {
183180
let current_file = std::fs::read_to_string(self.file_name.clone())
Lines changed: 41 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,12 @@
1-
use std::fmt::Display;
2-
use std::sync::RwLock;
31
use std::time::Duration;
42

5-
use reqwest::header::HeaderMap;
6-
use reqwest::header::HeaderValue;
7-
use reqwest_middleware::ClientBuilder;
8-
use reqwest_middleware::ClientWithMiddleware;
9-
use reqwest_retry::policies::ExponentialBackoff;
10-
use reqwest_retry::RetryTransientMiddleware;
11-
123
#[derive(Debug)]
134
pub struct SupergraphFetcher {
14-
agent: ClientWithMiddleware,
5+
client: reqwest::blocking::Client,
156
endpoint: String,
16-
headers: RwLock<HeaderMap>,
17-
}
18-
19-
pub enum SupergraphFetcherError {
20-
FetcherCreationError(reqwest::Error),
21-
NetworkError(reqwest_middleware::Error),
22-
NetworkResponseError(reqwest::Error),
23-
HeadersLock(String),
24-
InvalidKey(String),
25-
InvalidUserAgent(String),
26-
}
27-
28-
impl Display for SupergraphFetcherError {
29-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30-
match self {
31-
SupergraphFetcherError::FetcherCreationError(e) => {
32-
write!(f, "Creating fetcher failed: {}", e)
33-
}
34-
SupergraphFetcherError::NetworkError(e) => write!(f, "Network error: {}", e),
35-
SupergraphFetcherError::NetworkResponseError(e) => {
36-
write!(f, "Network response error: {}", e)
37-
}
38-
SupergraphFetcherError::HeadersLock(e) => write!(f, "Headers lock error: {}", e),
39-
SupergraphFetcherError::InvalidKey(e) => write!(f, "Invalid key: {}", e),
40-
SupergraphFetcherError::InvalidUserAgent(e) => {
41-
write!(f, "Invalid user agent: {}", e)
42-
}
43-
}
44-
}
7+
key: String,
8+
user_agent: String,
9+
etag: Option<String>,
4510
}
4611

4712
impl SupergraphFetcher {
@@ -52,96 +17,70 @@ impl SupergraphFetcher {
5217
connect_timeout: Duration,
5318
request_timeout: Duration,
5419
accept_invalid_certs: bool,
55-
retry_count: u32,
56-
) -> Result<Self, SupergraphFetcherError> {
20+
) -> Result<Self, String> {
5721
let mut endpoint = endpoint;
5822
if !endpoint.ends_with("/supergraph") {
5923
if endpoint.ends_with("/") {
60-
endpoint.push_str("supergraph");
24+
endpoint.push_str("supergraph")
6125
} else {
62-
endpoint.push_str("/supergraph");
26+
endpoint.push_str("/supergraph")
6327
}
6428
}
6529

66-
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(retry_count);
67-
68-
let reqwest_agent = reqwest::Client::builder()
30+
let client = reqwest::blocking::Client::builder()
6931
.danger_accept_invalid_certs(accept_invalid_certs)
7032
.connect_timeout(connect_timeout)
7133
.timeout(request_timeout)
7234
.build()
73-
.map_err(SupergraphFetcherError::FetcherCreationError)?;
74-
75-
let agent = ClientBuilder::new(reqwest_agent)
76-
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
77-
.build();
78-
79-
let mut headers = HeaderMap::new();
80-
headers.insert(
81-
reqwest::header::USER_AGENT,
82-
HeaderValue::from_str(&user_agent)
83-
.map_err(|err| SupergraphFetcherError::InvalidUserAgent(err.to_string()))?,
84-
);
85-
headers.insert(
86-
"X-Hive-CDN-Key",
87-
HeaderValue::from_str(&key)
88-
.map_err(|err| SupergraphFetcherError::InvalidKey(err.to_string()))?,
89-
);
35+
.map_err(|e| e.to_string())?;
9036

9137
Ok(Self {
92-
agent,
38+
client,
9339
endpoint,
94-
headers: RwLock::new(headers),
40+
key,
41+
user_agent,
42+
etag: None,
9543
})
9644
}
9745

98-
pub async fn fetch_supergraph(&self) -> Result<Option<String>, SupergraphFetcherError> {
99-
let headers = self.get_headers()?;
46+
pub fn fetch_supergraph(&mut self) -> Result<Option<String>, String> {
47+
let mut headers = reqwest::header::HeaderMap::new();
48+
49+
headers.insert(
50+
reqwest::header::USER_AGENT,
51+
reqwest::header::HeaderValue::from_str(&self.user_agent).unwrap(),
52+
);
53+
headers.insert("X-Hive-CDN-Key", self.key.parse().unwrap());
54+
55+
if let Some(checksum) = &self.etag {
56+
headers.insert("If-None-Match", checksum.parse().unwrap());
57+
}
10058

10159
let resp = self
102-
.agent
103-
.get(self.endpoint.clone())
60+
.client
61+
.get(self.endpoint.as_str())
10462
.headers(headers)
10563
.send()
106-
.await
107-
.map_err(SupergraphFetcherError::NetworkError)?;
64+
.map_err(|e| e.to_string())?;
65+
66+
match resp.headers().get("etag") {
67+
Some(checksum) => {
68+
let etag = checksum.to_str().map_err(|e| e.to_string())?;
69+
self.update_latest_etag(Some(etag.to_string()));
70+
}
71+
None => {
72+
self.update_latest_etag(None);
73+
}
74+
}
10875

10976
if resp.status().as_u16() == 304 {
11077
return Ok(None);
11178
}
11279

113-
let etag = resp.headers().get("etag");
114-
self.update_latest_etag(etag)?;
115-
116-
let text = resp
117-
.text()
118-
.await
119-
.map_err(SupergraphFetcherError::NetworkResponseError)?;
120-
121-
Ok(Some(text))
122-
}
123-
124-
fn get_headers(&self) -> Result<HeaderMap, SupergraphFetcherError> {
125-
let guard = self.headers.try_read().map_err(|e| {
126-
SupergraphFetcherError::HeadersLock(format!("Failed to read the etag record: {:?}", e))
127-
})?;
128-
Ok(guard.clone())
80+
Ok(Some(resp.text().map_err(|e| e.to_string())?))
12981
}
13082

131-
fn update_latest_etag(&self, etag: Option<&HeaderValue>) -> Result<(), SupergraphFetcherError> {
132-
let mut guard = self.headers.try_write().map_err(|e| {
133-
SupergraphFetcherError::HeadersLock(format!(
134-
"Failed to update the etag record: {:?}",
135-
e
136-
))
137-
})?;
138-
139-
if let Some(etag_value) = etag {
140-
guard.insert("If-None-Match", etag_value.clone());
141-
} else {
142-
guard.remove("If-None-Match");
143-
}
144-
145-
Ok(())
83+
fn update_latest_etag(&mut self, etag: Option<String>) {
84+
self.etag = etag;
14685
}
14786
}

0 commit comments

Comments
 (0)