Skip to content

Commit

Permalink
random IOs support
Browse files Browse the repository at this point in the history
  • Loading branch information
ploki committed Oct 19, 2016
1 parent a4a0cd1 commit b50c3cb
Show file tree
Hide file tree
Showing 6 changed files with 807 additions and 322 deletions.
131 changes: 102 additions & 29 deletions src/FSAL/FSAL_SCALITY/dbd_rest_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ iso8601_str2timespec(const char* ts_str)
int y,M,d,h,m;
double s;
if (NULL != ts_str) {
int ret = sscanf(ts_str, "%d-%d-%dT%d:%d:%lfZ", &y, &M, &d, &h, &m, &s);
int ret = sscanf(ts_str, "%d-%d-%dT%d:%d:%lfZ",
&y, &M, &d, &h, &m, &s);
if ( 6 != ret ) {
LogCrit(COMPONENT_FSAL,
"malformed ISO8601 date %s. got %d fields",
Expand Down Expand Up @@ -113,7 +114,9 @@ dbd_get(struct scality_fsal_export *export,


if ( !object == !parameters ) {
LogCrit(COMPONENT_FSAL, "BUG: Invalid parameters object and parameters are both set");
LogCrit(COMPONENT_FSAL,
"BUG: Invalid parameters, object "
"and parameters are both set");
return NULL;
}

Expand All @@ -139,8 +142,10 @@ dbd_get(struct scality_fsal_export *export,

if ( NULL != parameters ) {
if ( NULL != parameters->prefix ) {
tmp = curl_easy_escape(curl, parameters->prefix, strlen(parameters->prefix));
pos += snprintf(query_string+pos, sizeof(query_string)-pos,
tmp = curl_easy_escape(curl, parameters->prefix,
strlen(parameters->prefix));
pos += snprintf(query_string+pos,
sizeof(query_string)-pos,
"prefix=%s&", tmp);
free(tmp);
if ( pos >= sizeof(query_string) ) {
Expand All @@ -149,8 +154,10 @@ dbd_get(struct scality_fsal_export *export,
}
}
if ( NULL != parameters->marker ) {
tmp = curl_easy_escape(curl, parameters->marker, strlen(parameters->marker));
pos += snprintf(query_string+pos, sizeof(query_string)-pos,
tmp = curl_easy_escape(curl, parameters->marker,
strlen(parameters->marker));
pos += snprintf(query_string+pos,
sizeof(query_string)-pos,
"marker=%s&", tmp);
if ( pos >= sizeof(query_string) ) {
LogCrit(COMPONENT_FSAL, "buffer overrun");
Expand All @@ -159,8 +166,10 @@ dbd_get(struct scality_fsal_export *export,
free(tmp);
}
if ( NULL != parameters->delimiter ) {
tmp = curl_easy_escape(curl, parameters->delimiter, strlen(parameters->delimiter));
pos += snprintf(query_string+pos, sizeof(query_string)-pos,
tmp = curl_easy_escape(curl, parameters->delimiter,
strlen(parameters->delimiter));
pos += snprintf(query_string+pos,
sizeof(query_string)-pos,
"delimiter=%s&", tmp);
free(tmp);
if ( pos >= sizeof(query_string) ) {
Expand All @@ -169,7 +178,8 @@ dbd_get(struct scality_fsal_export *export,
}
}
if ( parameters->maxkeys > 0 ) {
pos += snprintf(query_string+pos, sizeof(query_string)-pos,
pos += snprintf(query_string+pos,
sizeof(query_string)-pos,
"maxKeys=%d&", parameters->maxkeys);
if ( pos >= sizeof(query_string) ) {
LogCrit(COMPONENT_FSAL, "buffer overrun");
Expand All @@ -183,12 +193,14 @@ dbd_get(struct scality_fsal_export *export,
if (object) {
char *tmp = curl_easy_escape(curl, object, strlen(object));
pos = snprintf(url, sizeof(url), "%s%s/%s/%s",
export->module->dbd_url, base_path, export->bucket, tmp);
export->module->dbd_url, base_path,
export->bucket, tmp);
free(tmp);
}
else {
pos = snprintf(url, sizeof(url), "%s%s/%s%s",
export->module->dbd_url, base_path, export->bucket, query_string);
export->module->dbd_url, base_path,
export->bucket, query_string);
}


Expand Down Expand Up @@ -220,9 +232,11 @@ dbd_get(struct scality_fsal_export *export,
goto out;
}

curl_ret = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response->http_status);
curl_ret = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE,
&response->http_status);
if ( CURLE_OK != curl_ret ) {
LogCrit(COMPONENT_FSAL, "Unable to retrieve HTTP status for %s", url);
LogCrit(COMPONENT_FSAL,
"Unable to retrieve HTTP status for %s", url);
goto out;
}

Expand All @@ -233,16 +247,19 @@ dbd_get(struct scality_fsal_export *export,
LogCrit(COMPONENT_FSAL, "stream error at close");
goto out;
}
response->body = json_loads(body_text, JSON_REJECT_DUPLICATES,&json_error);
response->body = json_loads(body_text,
JSON_REJECT_DUPLICATES,&json_error);
if ( NULL == response->body ) {
LogWarn(COMPONENT_FSAL,
"json_load failed with error %s ...", json_error.text);
"json_load failed with error %s ...",
json_error.text);
if (object)
LogWarn(COMPONENT_FSAL,
"... parse error on object %s", object);
else
LogWarn(COMPONENT_FSAL,
"... parse error on query string %s", query_string);
"... parse error on query string %s",
query_string);

goto out;
}
Expand Down Expand Up @@ -456,7 +473,8 @@ dbd_delete(struct scality_fsal_export *export,


if ( CURLE_OK != curl_ret ) {
LogCrit(COMPONENT_FSAL, "Unable to retrieve HTTP status for %s", url);
LogCrit(COMPONENT_FSAL,
"Unable to retrieve HTTP status for %s", url);
goto out;
}

Expand Down Expand Up @@ -819,7 +837,6 @@ dbd_getattr_directory(struct scality_fsal_export* export,

if ( 0 == ret ) {
(void)dbd_getattr_regular_file(export, object_hdl);

}
return ret;
}
Expand All @@ -830,6 +847,8 @@ dbd_get_parts_size(struct scality_fsal_export *export,
{
size_t total = 0;
struct avltree_node *node;
int ret = 0;
scality_content_lock(myself);
for ( node = avltree_first(&myself->locations) ;
node != NULL ;
node = avltree_next(node) ) {
Expand All @@ -838,10 +857,9 @@ dbd_get_parts_size(struct scality_fsal_export *export,
struct scality_location,
avltree_node);
size_t len;
int ret;
ret = sproxyd_head(export, location->key, &len);
if ( ret < 0 )
return ret;
goto out;
location->start = total;
location->size = len;
total += len;
Expand All @@ -852,17 +870,31 @@ dbd_get_parts_size(struct scality_fsal_export *export,
location->start,
location->size);
}
return 0;
out:
scality_content_unlock(myself);
return ret;
}

static int
dbd_getattr_regular_file(struct scality_fsal_export* export,
struct scality_fsal_obj_handle *object_hdl)
{
LogDebug(COMPONENT_FSAL,
"%s: %p", object_hdl->object, object_hdl);

int ret = 0;
dbd_response_t *response = NULL;
char object[MAX_URL_SIZE];
bool directory = DIRECTORY == object_hdl->attributes.type;
bool directory;

scality_content_lock(object_hdl);

if (SCALITY_FSAL_OBJ_STATE_CLEAN != object_hdl->state ) {
ret = 0;
goto out;
}

directory = DIRECTORY == object_hdl->attributes.type;

snprintf(object, sizeof object,
directory
Expand Down Expand Up @@ -939,12 +971,14 @@ dbd_getattr_regular_file(struct scality_fsal_export* export,
case JSON_STRING:
new_location = scality_location_new(json_string_value(location),
0, filesize);

avltree_insert(&new_location->avltree_node,
&object_hdl->locations);
object_hdl->n_locations = 1;
break;
case JSON_ARRAY: {
size_t i;
size_t last_end = 0;
object_hdl->n_locations = json_array_size(location);
bool incomplete = false;
for ( i = 0 ; i < object_hdl->n_locations ; ++i ) {
Expand Down Expand Up @@ -995,6 +1029,8 @@ dbd_getattr_regular_file(struct scality_fsal_export* export,
break;
}
key = json_string_value(jkey);
assert(last_end == start);
last_end = start+size;
new_location = scality_location_new(key,
start,
size);
Expand All @@ -1010,15 +1046,39 @@ dbd_getattr_regular_file(struct scality_fsal_export* export,
default:break;
}
}
if ( incomplete )

if ( incomplete ) {
ret = dbd_get_parts_size(export,
object_hdl);
}
else {
struct avltree_node *node;
last_end = 0;
int i = 0;
for (node = avltree_first(&object_hdl->locations);
node != NULL ;
node = avltree_next(node) ) {
struct scality_location *loc;
loc = avltree_container_of(node,
struct
scality_location,
avltree_node);
LogDebug(COMPONENT_FSAL,
"i: %d, "
"loc->start: %"PRIu64", "
"loc->size: %"PRIu64", ",
i, loc->start, loc->size);
assert(last_end == loc->start);
last_end = loc->start + loc->size;
++i;
}
}
}
default:break;
}

struct scality_location * first_location;

node = avltree_first(&object_hdl->locations);
first_location = avltree_container_of(node,
struct scality_location,
Expand All @@ -1030,7 +1090,13 @@ dbd_getattr_regular_file(struct scality_fsal_export* export,
else
object_hdl->part_size = DEFAULT_PART_SIZE;

scality_sanity_check_parts(export, object_hdl);

assert(SCALITY_FSAL_OBJ_STATE_CLEAN == object_hdl->state);

out:

scality_content_unlock(object_hdl);
dbd_response_free(response);
return ret;
}
Expand Down Expand Up @@ -1078,9 +1144,6 @@ get_payload(struct scality_fsal_export* export,

gmtime_r(&time, &tm);

snprintf(size, sizeof(size), "%zu", regular_file
? object_hdl->attributes.filesize
: 0 );
ret = strftime(date, sizeof(date), "%Y-%m-%dT%H:%M:%S", &tm);
snprintf(date+ret, sizeof(date)-ret, ".%03ldZ", object_hdl->attributes.mtime.tv_nsec/1000000);

Expand All @@ -1093,7 +1156,6 @@ get_payload(struct scality_fsal_export* export,
json_object_set(metadata, "last-modified", json_string(date));
json_object_set(metadata, "owner-display-name", json_string(export->owner_display_name));
json_object_set(metadata, "owner-id", json_string(export->owner_id));
json_object_set(metadata, "content-length", json_string(size));
json_object_set(metadata, "content-type", json_string(regular_file
? DEFAULT_CONTENT_TYPE
: DIRECTORY_CONTENT_TYPE));
Expand All @@ -1107,6 +1169,10 @@ get_payload(struct scality_fsal_export* export,
json_object_set(metadata, "x-amz-server-side-encryption-customer-algorithm", json_string(""));
json_object_set(metadata, "x-amz-version-id", json_string("null"));

snprintf(size, sizeof(size), "%zu", regular_file
? object_hdl->attributes.filesize
: 0 );
json_object_set(metadata, "content-length", json_string(size));
if ( object_hdl->n_locations ) {
json_t *js_locations = json_array();
struct avltree_node *node;
Expand All @@ -1118,6 +1184,7 @@ get_payload(struct scality_fsal_export* export,
struct scality_location,
avltree_node);
json_t *js_location = json_object();
assert(NULL != location->key);
json_object_set(js_location, "key",
json_string(location->key));
json_object_set(js_location, "start",
Expand Down Expand Up @@ -1177,13 +1244,19 @@ int
dbd_post(struct scality_fsal_export* export,
struct scality_fsal_obj_handle *object_hdl)
{
LogDebug(COMPONENT_FSAL,
"%s: %p", object_hdl->object, object_hdl);

char url[MAX_URL_SIZE];
char *payload = get_payload(export, object_hdl);
char *payload;
CURL *curl = NULL;
CURLcode curl_ret;
long http_status;
int ret = -1;

scality_sanity_check_parts(export, object_hdl);

payload = get_payload(export, object_hdl);
if ( 0 == object_hdl->object[0]) {
//FIXME
//set bucket attributes must not be done here
Expand Down
22 changes: 22 additions & 0 deletions src/FSAL/FSAL_SCALITY/export.c
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,25 @@ static struct config_block export_param = {
.blk_desc.u.blk.commit = noop_conf_commit
};


static int
handles_cmp(const struct avltree_node *haystackp,
const struct avltree_node *needlep)
{
struct scality_fsal_obj_handle *haystack, *needle;
haystack = avltree_container_of(haystackp,
struct scality_fsal_obj_handle,
avltree_node);
needle = avltree_container_of(needlep,
struct scality_fsal_obj_handle,
avltree_node);
fsal_cookie_t haystack_cookie = *(fsal_cookie_t*)haystack->handle;
fsal_cookie_t needle_cookie = *(fsal_cookie_t*)needle->handle;
return (haystack_cookie > needle_cookie) -
(haystack_cookie < needle_cookie);
}


/* create_export
* Create an export point and return a handle to it to be kept
* in the export list.
Expand All @@ -329,6 +348,9 @@ fsal_status_t scality_create_export(struct fsal_module *fsal_hdl,
}

myself->module = container_of(fsal_hdl, struct scality_fsal_module, fsal);
pthread_mutex_init(&myself->handles_mutex, NULL);
avltree_init(&myself->handles, handles_cmp, 0);

retval = load_config_from_node(parse_node,
&export_param,
myself,
Expand Down
Loading

0 comments on commit b50c3cb

Please sign in to comment.