azhdankin - python file uploader #53

Open · wants to merge 7 commits into base: dev
25 changes: 25 additions & 0 deletions azhdankin/README.md
@@ -0,0 +1,25 @@
# Multi-threaded file uploader

# Overview

This is a Python application that uploads a given set of files to cloud object storage in parallel, using the cloud provider's or a third-party API.

# Features

1. Supports up to 100,000 files of arbitrary sizes, all inside one root directory. The root directory may contain subdirectories.
2. The object storage container that holds the objects is private; only credential-based access is allowed.
3. Each object inside object storage has associated metadata containing the file size, last modification time, and file permissions.

The utility is fast (utilizes the full network bandwidth), consumes little CPU (low enough not to block other processes), and little memory (<25%).
It supports GCP Cloud Storage; however, the implementation is modular and object-oriented, so other cloud providers can be added.
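The modular design means adding another provider amounts to subclassing the `CloudStorage` base class and overriding `upload_object`. A minimal sketch of the idea, using a hypothetical local-directory backend (`CloudStorageLocal` is illustrative only, not part of this PR):

```python
import os
import shutil

class CloudStorage:
    """Base class: concrete providers override upload_object."""
    def __init__(self, name):
        self.name = name

    def upload_object(self, object_name, source_file_name):
        raise NotImplementedError

class CloudStorageLocal(CloudStorage):
    """Hypothetical provider that 'uploads' into a local directory."""
    def __init__(self, root):
        super().__init__("CloudStorageLocal")
        self.root = root
        os.makedirs(root, exist_ok=True)

    def upload_object(self, object_name, source_file_name):
        # Copy the source file into the backing directory under the object name.
        shutil.copyfile(source_file_name, os.path.join(self.root, object_name))
```

Because the uploader only calls `upload_object`, such a subclass can be dropped in without touching the rest of the code.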

# Prerequisites
You must have Python 3.8 and the Google Cloud Storage Python client (`google-cloud-storage`) installed.

# To run
1. Clone the git repository. Make sure the create_files.py and upload_files.py file permissions are set to "executable".
2. Run the ./create_files.py utility. It creates the files that will be uploaded to the cloud storage. You can set the
number of files to be created as a command line parameter (e.g. ./create_files.py 10000 to create 10000 files).
3. Run ./upload_files.py to upload the files to the Cloud Storage.

# Notes
41 changes: 41 additions & 0 deletions azhdankin/cloud_storage.py
@@ -0,0 +1,41 @@
"""
This module contains the definitions for the CloudStorage base class which acta like an "abstract class" or
the equivalent of the "interface" for the Cloud storage.
It also has an implementation of the Cloud storage implememntation for GCP, which allows to upload the files.
"""

from google.cloud import storage

#Base CloudStorage class
class CloudStorage:
    def __init__(self, name):
        self.name = name

    def upload_object(self, object_name, source_file_name):
        pass

    def __str__(self):
        return self.name

#Cloud storage implementation for GCP
class CloudStorageGCP(CloudStorage):
    def __init__(self, bucket_name, project=None):
        super().__init__("CloudStorageGCP")
        self.project = project
        self.bucket_name = bucket_name
        self.client = storage.Client(project=self.project)

        #Resolve the reference to the destination bucket
        self.bucket = self.client.bucket(self.bucket_name)

        #If the target bucket does not exist it will be created
        if not self.bucket.exists():
            self.bucket = self.client.create_bucket(self.bucket_name)

    def upload_object(self, object_name, source_file_name):
        # Create a new blob object
        blob = self.bucket.blob(object_name)
        # Upload the file to the bucket
        blob.upload_from_filename(source_file_name)


57 changes: 57 additions & 0 deletions azhdankin/create_files.py
@@ -0,0 +1,57 @@
#!/usr/bin/env python3
""" File creation utility.

This is a utility to crate the directory and sample files which
will be used for transfer to a cloud storage.

The utility takes one command line parameter: number of files to create.
If the parameter is not given by default it will create 10 files.

The file creation is performed by copying the content of the ./seed-file.txt
content n-times (where n is a randomly generated number in a range from 1 to a 100)
into a destination file and naming the destination file by appending sequentially
incremented number to the base file name.

"""
import sys
import os
import random

#Performs the file creation
def create_files (path, num_files):
    #Specify the "seed" for the generated files' content.
    seed_file = "./seed-file.txt"
    name_prefix = "file-2-upload"

    #Create the destination directory if it does not exist
    if not os.path.exists(path):
        os.makedirs(path)

    #Read the seed content that will be replicated into the created files
    with open(seed_file, "r") as file:
        seed_content = file.read()

    #Create the files for upload
    for target_file_idx in range(0, num_files):
        #Replicate the seed content a random number of times
        repeat = random.randint(1, 100)
        target_file_content = seed_content * repeat
        with open(path + name_prefix + str(target_file_idx) + ".txt", 'w') as target_file:
            target_file.write(target_file_content)


#Set the root of the files location and the name prefix for the files to be generated
path = "./files/"

#Set the default number of files to be generated
num_files = 10

#Read the number of files to be generated from the cmd line if provided
if len(sys.argv) > 1:
    num_files = int(sys.argv[1])

create_files(path, num_files)

3 changes: 3 additions & 0 deletions azhdankin/seed-file.txt
@@ -0,0 +1,3 @@
dsgsddhjdnkdhfkdfjhekfndsmcndkfhed
dhfdhfgefgejfhefhekfekfhekfekf
jhdfhejfgejfgejgfehgfehgfehgfhegf
40 changes: 40 additions & 0 deletions azhdankin/test_create_files.py
@@ -0,0 +1,40 @@
#Test for file creation utility.
import glob
import os
from create_files import create_files

#Perform test of file creation
def test_create_files():

    #Set the root of the files location for the files to be generated
    path = "./test_files/"

    #Set the default number of files to be generated
    num_files = 10

    create_files(path, num_files)

    #Path to the root directory where the created files are located
    path = "./test_files/*"

    #Initialize the file names list
    file_list = []

    #Populate the list of the file names that were generated. Currently we support two levels of
    #directories where the files are located
    for entry in glob.iglob(path, recursive=True):
        if os.path.isfile(entry):
            file_list.append(entry)
        else:
            entry = entry + "/*"
            for element in glob.iglob(entry, recursive=True):
                if os.path.isfile(element):
                    file_list.append(element)

    file_list_len = len(file_list)
    assert file_list_len == 10





72 changes: 72 additions & 0 deletions azhdankin/upload_files.py
@@ -0,0 +1,72 @@
#!/usr/bin/env python3
""" Main module to run to upload the files to the Cloud storage.

This program establishes the connection to the Cloud Storage (GCP in this case),
reads the names of the files available for upload
and performs parallel upload of the files to the specified Cloud Storage bucket.

"""
import os
import glob
import time

from concurrent.futures import ThreadPoolExecutor

from cloud_storage import CloudStorageGCP
from uploader import FileUploader

#Path to the root directory where the files to be uploaded are located
path = "./files/*"

#Initialize the file names list
file_list = []

#Populate the list of the file names set for upload. Currently we support two levels of the directories
#where the files are located
for entry in glob.iglob(path, recursive=True):
    if os.path.isfile(entry):
        file_list.append(entry)
    else:
        entry = entry + "/*"
        for element in glob.iglob(entry, recursive=True):
            if os.path.isfile(element):
                file_list.append(element)

#Specify the maximum number of the workers that perform files upload simultaneously
MAX_UPLOAD_WORKERS = 100

#Calculate the partitioning of the file names list - each partition or chunk will be assigned
#to a single upload worker

file_list_len = len(file_list)
step = file_list_len // MAX_UPLOAD_WORKERS
remainder = file_list_len % MAX_UPLOAD_WORKERS

#Initialize a Cloud Storage Provider
storage = CloudStorageGCP("azhdanki-test-bucket1", project='rewotes')

#Create the Thread Pool which will be used to run the uploader tasks
pool = ThreadPoolExecutor (max_workers=MAX_UPLOAD_WORKERS)

#Schedule the upload tasks
i=0
time_start = time.time()

while i < (file_list_len - remainder):
    uploader = FileUploader(storage, file_list, i, step)
    pool.submit(uploader.run)
    i += step

if remainder > 0:
    uploader = FileUploader(storage, file_list, i, remainder)
    pool.submit(uploader.run)

pool.shutdown (wait=True)

time_end = time.time()
time_delta = time_end - time_start
print ("It took " + str(time_delta) + " seconds to upload " + str(file_list_len) + " files.")
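The step/remainder partitioning used above can be checked in isolation. A small sketch (the `partition` helper is illustrative, not part of the PR; it mirrors the arithmetic in upload_files.py) showing that the chunks cover the whole file list exactly once:

```python
def partition(n_files, n_workers):
    """Split indices 0..n_files-1 into (start, count) chunks,
    mirroring the step/remainder arithmetic in upload_files.py."""
    step = n_files // n_workers
    remainder = n_files % n_workers
    chunks = []
    i = 0
    # Full-sized chunks of `step` files each
    while i < (n_files - remainder):
        chunks.append((i, step))
        i += step
    # One trailing chunk for the leftover files
    if remainder > 0:
        chunks.append((i, remainder))
    return chunks
```

For example, `partition(250, 100)` yields 100 chunks of 2 files plus one chunk of 50, and with fewer files than workers (`partition(10, 100)`) everything lands in the single remainder chunk.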



34 changes: 34 additions & 0 deletions azhdankin/uploader.py
@@ -0,0 +1,34 @@
"""
Class performing upload of the files to the cloud storage.
"""

import os

class FileUploader:
    """
    The constructor takes the following parameters:

    storage - a reference to an instance of a CloudStorage object.
        The CloudStorage class is the parent class for the cloud-storage-provider
        specific implementations, e.g. CloudStorageGCP, CloudStorageAWS

    files - a reference to the entire list of the file names that need to be uploaded

    start_idx - an index, a "pointer" into the files list specifying where this file uploader
        will start

    count - a number which specifies how many files this instance of the uploader has to process (upload)

    """
    def __init__(self, storage, files, start_idx, count):
        self.storage = storage
        self.files = files
        self.start = start_idx
        self.count = count

    #The method performing the upload of the group of files to the Cloud storage
    def run (self):
        for i in range(self.start, self.start + self.count):
            object_name = os.path.split(self.files[i])[1]
            self.storage.upload_object(object_name, self.files[i])
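The run loop can be exercised without any cloud credentials, since `FileUploader` only requires an object with an `upload_object` method. A self-contained sketch (the `FileUploader` class is repeated here so the snippet runs on its own; `RecordingStorage` is a hypothetical stand-in that records calls instead of uploading):

```python
import os

class RecordingStorage:
    """Hypothetical stand-in for a CloudStorage provider: records uploads."""
    def __init__(self):
        self.uploaded = []

    def upload_object(self, object_name, source_file_name):
        self.uploaded.append((object_name, source_file_name))

class FileUploader:
    """Same shape as uploader.py: uploads files[start_idx : start_idx + count]."""
    def __init__(self, storage, files, start_idx, count):
        self.storage = storage
        self.files = files
        self.start = start_idx
        self.count = count

    def run(self):
        for i in range(self.start, self.start + self.count):
            # The object name is the base name of the source path
            object_name = os.path.split(self.files[i])[1]
            self.storage.upload_object(object_name, self.files[i])

files = ["./files/a.txt", "./files/sub/b.txt", "./files/c.txt"]
storage = RecordingStorage()
FileUploader(storage, files, 0, len(files)).run()
```

This also makes the review point visible that object names collapse to base names, so files with the same name in different subdirectories would overwrite each other in the bucket.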