Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring CurlEngine + Add Curl Socket Reuse Support #2604

Merged
merged 3 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 279 additions & 3 deletions src/curlEngine.d
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,204 @@ import std.net.curl;
import etc.c.curl: CurlOption;
import std.datetime;
import std.conv;
import std.file;
import std.json;
import std.stdio;
import std.range;

// What other modules that we have created do we need to import?
import log;

class CurlResponse {
HTTP.Method method;
const(char)[] url;
const(char)[][const(char)[]] requestHeaders;
const(char)[] postBody;

string[string] responseHeaders;
HTTP.StatusLine statusLine;
char[] content;

void reset() {
method = HTTP.Method.undefined;
url = null;
requestHeaders = null;
postBody = null;

responseHeaders = null;
object.destroy(statusLine);
content = null;
}

void addRequestHeader(const(char)[] name, const(char)[] value) {
requestHeaders[name] = value;
}

void connect(HTTP.Method method, const(char)[] url) {
this.method = method;
this.url = url;
}

const JSONValue json() {
JSONValue json;
try {
json = content.parseJSON();
} catch (JSONException e) {
// Log that a JSON Exception was caught, dont output the HTML response from OneDrive
addLogEntry("JSON Exception caught when performing HTTP operations - use --debug-https to diagnose further", ["debug"]);
}
return json;
};

void update(HTTP *http) {
this.responseHeaders = http.responseHeaders();
this.statusLine = http.statusLine;
}

@safe pure HTTP.StatusLine getStatus() {
return this.statusLine;
}

// Return the current value of retryAfterValue
ulong getRetryAfterValue() {
ulong delayBeforeRetry;
// is retry-after in the response headers
if ("retry-after" in responseHeaders) {
// Set the retry-after value
addLogEntry("curlEngine.http.perform() => Received a 'Retry-After' Header Response with the following value: " ~ to!string(responseHeaders["retry-after"]), ["debug"]);
addLogEntry("curlEngine.http.perform() => Setting retryAfterValue to: " ~ responseHeaders["retry-after"], ["debug"]);
delayBeforeRetry = to!ulong(responseHeaders["retry-after"]);
} else {
// Use a 120 second delay as a default given header value was zero
// This value is based on log files and data when determining correct process for 429 response handling
delayBeforeRetry = 120;
// Update that we are over-riding the provided value with a default
addLogEntry("HTTP Response Header retry-after value was 0 - Using a preconfigured default of: " ~ to!string(delayBeforeRetry), ["debug"]);
}

return delayBeforeRetry; // default to 60 seconds
}

const string parseHeaders(const(string[string]) headers) {
string responseHeadersStr = "";
foreach (const(char)[] header; headers.byKey()) {
responseHeadersStr ~= "> " ~ header ~ ": " ~ headers[header] ~ "\n";
}
return responseHeadersStr;
}


const string parseHeaders(const(const(char)[][const(char)[]]) headers) {
string responseHeadersStr = "";
foreach (string header; headers.byKey()) {
if (header == "Authorization")
continue;
responseHeadersStr ~= "< " ~ header ~ ": " ~ headers[header] ~ "\n";
}
return responseHeadersStr;
}

const string dumpDebug() {
import std.range;
import std.format : format;

string str = "";
str ~= format("< %s %s\n", method, url);
if (!requestHeaders.empty) {
str ~= parseHeaders(requestHeaders);
}
if (!postBody.empty) {
str ~= format("----\n%s\n----\n", postBody);
}
str ~= format("< %s\n", statusLine);
if (!responseHeaders.empty) {
str ~= parseHeaders(responseHeaders);
}
return str;
}

const string dumpResponse() {
import std.range;
import std.format : format;

string str = "";
if (!content.empty) {
str ~= format("----\n%s\n----\n", content);
}
return str;
}

override string toString() const {
string str = "Curl debugging: \n";
str ~= dumpDebug();
str ~= "Curl response: \n";
str ~= dumpResponse();
return str;
}

CurlResponse dup() {
CurlResponse copy = new CurlResponse();
copy.method = method;
copy.url = url;
copy.requestHeaders = requestHeaders;
copy.postBody = postBody;

copy.responseHeaders = responseHeaders;
copy.statusLine = statusLine;
copy.content = content;

return copy;
}
}

class CurlEngine {

__gshared CurlEngine[] curlEnginePool;

static CurlEngine get() {
synchronized(CurlEngine.classinfo) {
if (curlEnginePool.empty) {
return new CurlEngine;
} else {
CurlEngine curlEngine = curlEnginePool[$-1];
curlEnginePool.popBack();
return curlEngine;
}
}
}

static releaseAll() {
synchronized(CurlEngine.classinfo) {
foreach(curlEngine; curlEnginePool) {
curlEngine.shutdown();
}
curlEnginePool = null;
}
}

void release() {
cleanUp();
synchronized(CurlEngine.classinfo) {
curlEnginePool ~= this;
}
}

HTTP http;
bool keepAlive;
ulong dnsTimeout;

CurlResponse response;

this() {
http = HTTP();
response = new CurlResponse();
}

~this() {
object.destroy(http);
object.destroy(response);
}

void initialise(ulong dnsTimeout, ulong connectTimeout, ulong dataTimeout, ulong operationTimeout, int maxRedirects, bool httpsDebug, string userAgent, bool httpProtocol, ulong userRateLimit, ulong protocolVersion, bool keepAlive=false) {
void initialise(ulong dnsTimeout, ulong connectTimeout, ulong dataTimeout, ulong operationTimeout, int maxRedirects, bool httpsDebug, string userAgent, bool httpProtocol, ulong userRateLimit, ulong protocolVersion, bool keepAlive=true) {
// Setting this to false ensures that when we close the curl instance, any open sockets are closed - which we need to do when running
// multiple threads and API instances at the same time otherwise we run out of local files | sockets pretty quickly
this.keepAlive = keepAlive;
Expand Down Expand Up @@ -103,11 +286,104 @@ class CurlEngine {
}
}

void addRequestHeader(const(char)[] name, const(char)[] value) {
http.addRequestHeader(name, value);
response.addRequestHeader(name, value);
}

void connect(HTTP.Method method, const(char)[] url) {
if (!keepAlive)
http.addRequestHeader("Connection", "close");
addRequestHeader("Connection", "close");
http.method = method;
http.url = url;
response.connect(method, url);
}

void setContent(const(char)[] contentType, const(char)[] sendData) {
addRequestHeader("Content-Type", contentType);
if (sendData) {
http.contentLength = sendData.length;
http.onSend = (void[] buf) {
import std.algorithm: min;
size_t minLen = min(buf.length, sendData.length);
if (minLen == 0) return 0;
buf[0 .. minLen] = cast(void[]) sendData[0 .. minLen];
sendData = sendData[minLen .. $];
return minLen;
};
response.postBody = sendData;
}
}

void setFile(File* file, ulong offsetSize) {
addRequestHeader("Content-Type", "application/octet-stream");
http.onSend = data => file.rawRead(data).length;
http.contentLength = offsetSize;
}

CurlResponse execute() {
scope(exit) {
cleanUp();
}
http.onReceive = (ubyte[] data) {
response.content ~= data;
// HTTP Server Response Code Debugging if --https-debug is being used

return data.length;
};
http.perform();
response.update(&http);
return response.dup;
}

CurlResponse download(string originalFilename, string downloadFilename) {
// Threshold for displaying download bar
long thresholdFileSize = 4 * 2^^20; // 4 MiB

CurlResponse response = new CurlResponse();
// open downloadFilename as write in binary mode
auto file = File(downloadFilename, "wb");

// function scopes
scope(exit) {
cleanUp();
if (file.isOpen()){
// close open file
file.close();
}
}

http.onReceive = (ubyte[] data) {
file.rawWrite(data);
return data.length;
};

http.perform();

// Rename downloaded file
rename(downloadFilename, originalFilename);

response.update(&http);
return response;
}

void cleanUp() {
// Reset any values to defaults, freeing any set objects
http.clearRequestHeaders();
http.onSend = null;
http.onReceive = null;
http.onReceiveHeader = null;
http.onReceiveStatusLine = null;
http.onProgress = delegate int(size_t dltotal, size_t dlnow, size_t ultotal, size_t ulnow) {
return 0;
};
http.contentLength = 0;
response.reset();
}

void shutdown() {
// Shut down the curl instance & close any open sockets
http.shutdown();
}

void setDisableSSLVerifyPeer() {
Expand Down
3 changes: 3 additions & 0 deletions src/main.d
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,9 @@ void performStandardExitProcess(string scopeCaller = null) {
}
object.destroy(itemDB);
}

// Shutdown cached sockets
CurlEngine.releaseAll();

// Set all objects to null
if (scopeCaller == "failureScope") {
Expand Down
20 changes: 7 additions & 13 deletions src/onedrive.d
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,9 @@ class OneDriveApi {
}

// Initialise the OneDrive API class
bool initialise(bool keepAlive=false) {
bool initialise(bool keepAlive=true) {
// Initialise the curl engine
curlEngine = new CurlEngine();
curlEngine = CurlEngine.get();
curlEngine.initialise(appConfig.getValueLong("dns_timeout"), appConfig.getValueLong("connect_timeout"), appConfig.getValueLong("data_timeout"), appConfig.getValueLong("operation_timeout"), appConfig.defaultMaxRedirects, appConfig.getValueBool("debug_https"), appConfig.getValueString("user_agent"), appConfig.getValueBool("force_http_11"), appConfig.getValueLong("rate_limit"), appConfig.getValueLong("ip_protocol_version"), keepAlive);

// Authorised value to return
Expand Down Expand Up @@ -489,17 +489,11 @@ class OneDriveApi {
object.destroy(webhook);
}

// Reset any values to defaults, freeing any set objects
curlEngine.http.clearRequestHeaders();
curlEngine.http.onSend = null;
curlEngine.http.onReceive = null;
curlEngine.http.onReceiveHeader = null;
curlEngine.http.onReceiveStatusLine = null;
curlEngine.http.contentLength = 0;
// Shut down the curl instance & close any open sockets
curlEngine.http.shutdown();
// Free object and memory
object.destroy(curlEngine);
// Release curl instance
if (curlEngine !is null) {
curlEngine.release();
curlEngine = null;
}
}

// Authenticate this client against Microsoft OneDrive API
Expand Down
6 changes: 3 additions & 3 deletions src/util.d
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ bool testInternetReachability(ApplicationConfig appConfig) {
bool result = false;
try {
// Use preconfigured object with all the correct http values assigned
curlEngine = new CurlEngine();
curlEngine = CurlEngine.get();
curlEngine.initialise(appConfig.getValueLong("dns_timeout"), appConfig.getValueLong("connect_timeout"), appConfig.getValueLong("data_timeout"), appConfig.getValueLong("operation_timeout"), appConfig.defaultMaxRedirects, appConfig.getValueBool("debug_https"), appConfig.getValueString("user_agent"), appConfig.getValueBool("force_http_11"), appConfig.getValueLong("rate_limit"), appConfig.getValueLong("ip_protocol_version"));

// Configure the remaining items required
Expand All @@ -228,8 +228,8 @@ bool testInternetReachability(ApplicationConfig appConfig) {
displayOneDriveErrorMessage(e.msg, getFunctionName!({}));
} finally {
if (curlEngine) {
curlEngine.http.shutdown();
object.destroy(curlEngine);
curlEngine.release();
curlEngine = null;
}
}

Expand Down