Skip to content

add chdfs-bucket and rename #290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions coscmd/cos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from wsgiref.handlers import format_date_time
import qcloud_cos

from coscmd.cos_rename import CosMoveConfig

if sys.version > '3':
from coscmd.cos_global import Version
from coscmd.cos_auth import CosS3Auth
Expand Down Expand Up @@ -1466,7 +1468,7 @@ def single_download(self, cos_path, local_path, _http_headers='{}', **kwargs):
http_headers = yaml.safe_load(http_headers)
http_headers = mapped(http_headers)
except Exception as e:
logger.warn("Http_haeder parse error.")
logger.warn("Http_header parse error.")
logger.warn(to_unicode(e))
return -1
try:
Expand Down Expand Up @@ -1571,6 +1573,48 @@ def download_file(self, cos_path, local_path, _http_headers='{}', **kwargs):
except Exception as e:
logger.warn(to_unicode(e))

def move_file(self, source_path, dst_path, _http_headers='{}', **kwargs):
move_source = {}
try:
_source_path = source_path.split('/')
source_tmp_path = _source_path[0].split('.')
source_key = '/'.join(_source_path[1:])
move_source['Bucket'] = source_tmp_path[0]
if len(source_tmp_path) == 5 and source_tmp_path[1] == 'cos':
source_region = source_tmp_path[2]
elif len(source_tmp_path) == 4:
source_region = source_tmp_path[1]
else:
raise Exception("Parse Region Error")
move_source['Key'] = source_key
move_source['RawPath'] = move_source['Bucket'] + ".cos." + \
source_region + \
".myqcloud.com/" + move_source['Key']
logger.info(u"move log move_source={move_source}".format(move_source=move_source))
source_path = '/' + '/'.join(_source_path[1:])
url = self._conf.uri(quote(to_printable_str(dst_path)) + "?rename")
try:
_http_headers = yaml.safe_load(_http_headers)
except Exception as e:
logger.warn("Http_haeder parse error.")
logger.warn(to_unicode(e))
return -1
logger.info(u"Move cos://{source_bucket}/{source_path} => cos://{dst_bucket}/{dst_path}".format(
source_bucket=source_tmp_path[0],
source_path=source_path,
dst_bucket=source_tmp_path[0],
dst_path=dst_path
))
if not CosMoveConfig.move_object(self, source_path, url, _http_headers):
return 0
else:
return -1
except Exception as e:
logger.warn(to_unicode(e))
logger.warn(u"MoveSource is invalid: {movesource}".format(
movesource=source_path))
return -1

def restore_folder(self, cos_path, **kwargs):
self._inner_threadpool = SimpleThreadPool(self._conf._max_thread)
_success_num = 0
Expand Down Expand Up @@ -1759,11 +1803,16 @@ def get_object_acl(self, cos_path):
return False
return False

def create_bucket(self):
def create_bucket(self, ofs):
url = self._conf.uri(path='')
self._have_finished = 0
try:
rt = self._session.put(url=url, auth=CosS3Auth(self._conf))
data = '''
<CreateBucketConfiguration>
<BucketArchConfig>{ofs}</BucketArchConfig>
</CreateBucketConfiguration>
'''.format(ofs=ofs)
rt = self._session.put(url=url, auth=CosS3Auth(self._conf), data=data)
logger.debug(u"put resp, status code: {code}, headers: {headers}, text: {text}".format(
code=rt.status_code,
headers=rt.headers,
Expand All @@ -1775,6 +1824,7 @@ def create_bucket(self):
else:
logger.warn(response_info(rt))
return False
self._session.get()
except Exception as e:
logger.warn(str(e))
return False
Expand Down Expand Up @@ -2059,3 +2109,4 @@ def op_int(self):

if __name__ == "__main__":
pass

46 changes: 36 additions & 10 deletions coscmd/cos_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ def move(args):
client = CosS3Client(conf)
Interface = client.op_int()
_, args.cos_path = concat_path(args.source_path, args.cos_path)
logger.info(u"====move log source_path={source_path}, cos_path={cos_path}".format(
source_path=args.source_path, cos_path=args.cos_path))
while args.cos_path.startswith('/'):
args.cos_path = args.cos_path[1:]
if not isinstance(args.source_path, text_type):
Expand All @@ -397,20 +399,32 @@ def move(args):
kwargs['delete'] = False
kwargs['move'] = True
if args.recursive:
_, args.cos_path = concat_path(args.source_path, args.cos_path)
if args.cos_path.endswith('/') is False:
args.cos_path += '/'
if args.cos_path.startswith('/'):
args.cos_path = args.cos_path[1:]
if not Interface.copy_folder(args.source_path, args.cos_path, args.headers, **kwargs):
return 0
if args.ofs:
#融合桶 头部添加
if not Interface.move_file(args.source_path, args.cos_path, args.headers, **kwargs):
return 0
else:
return 1
else:
return 1
if args.cos_path.endswith('/') is False:
args.cos_path += '/'
if not Interface.copy_folder(args.source_path, args.cos_path, args.headers, **kwargs):
return 0
else:
return 1
else:
if not Interface.copy_file(args.source_path, args.cos_path, args.headers, **kwargs):
return 0
if args.ofs:
if not Interface.move_file(args.source_path, args.cos_path, args.headers, **kwargs):
return 0
else:
return 1
else:
return -1
if not Interface.copy_file(args.source_path, args.cos_path, args.headers, **kwargs):
return 0
else:
return -1
except Exception as e:
logger.warn(e)
return -2
Expand Down Expand Up @@ -588,8 +602,12 @@ def create_bucket(args):
try:
conf = load_conf()
client = CosS3Client(conf)
if args.ofs:
ofs = "OFS"
else:
ofs = ""
Interface = client.op_int()
if Interface.create_bucket():
if Interface.create_bucket(ofs):
return 0
else:
logger.warn("Create bucket fail")
Expand Down Expand Up @@ -725,6 +743,7 @@ def command_thread():
desc = """an easy-to-use but powerful command-line tool.
try \'coscmd -h\' to get more informations.
try \'coscmd sub-command -h\' to learn all command usage, likes \'coscmd upload -h\'"""
#ArgumentParser命令行解析的主要入口点,add_argument()方法为解析器填充可选参数和位置参数的动作
parser = ArgumentParser(description=desc)
parser.add_argument('-d', '--debug', help="Debug mode", action="store_true", default=False)
parser.add_argument('-s', '--silence', help="Silence mode", action="store_true", default=False)
Expand All @@ -735,13 +754,16 @@ def command_thread():
parser.add_argument('--log_size', help='specify max log size in MB (default 1MB)', type=int, default=128)
parser.add_argument('--log_backup_count', help='specify log backup num', type=int, default=1)

#add_subparsers()方法去创建子命令
sub_parser = parser.add_subparsers()
#sub_parser.add_parser()添加子命令
parser_config = sub_parser.add_parser("config", help="Config your information at first")
parser_config.add_argument('-a', '--secret_id', help='Specify your secret id', type=str, required=True)
parser_config.add_argument('-s', '--secret_key', help='Specify your secret key', type=str, required=True)
parser_config.add_argument('-t', '--token', help='Set x-cos-security-token header', type=str, default="")
parser_config.add_argument('-b', '--bucket', help='Specify your bucket', type=str, required=True)

#add_mutually_exclusive_gruop()方法也接受一个required参数,表示在互斥组中至少有一个参数是需要的
group = parser_config.add_mutually_exclusive_group(required=True)
group.add_argument('-r', '--region', help='Specify your region', type=str)
group.add_argument('-e', '--endpoint', help='Specify COS endpoint', type=str)
Expand Down Expand Up @@ -820,6 +842,7 @@ def command_thread():
parser_move.add_argument('-H', '--headers', help="Specify HTTP headers", type=str, default='{}')
parser_move.add_argument('-d', '--directive', help="if Overwrite headers", type=str, choices=['Copy', 'Replaced'], default="Copy")
parser_move.add_argument('-r', '--recursive', help="Copy files recursively", action="store_true", default=False)
parser_move.add_argument('-o', "--ofs", help="Move bucket files", action="store_true", default=False)
parser_move.add_argument('--include', help='Specify filter rules, separated by commas; Example: *.txt,*.docx,*.ppt', type=str, default="*")
parser_move.add_argument('--ignore', help='Specify ignored rules, separated by commas; Example: *.txt,*.docx,*.ppt', type=str, default="")
parser_move.set_defaults(func=Op.move)
Expand Down Expand Up @@ -855,6 +878,7 @@ def command_thread():
parser_signurl.set_defaults(func=Op.signurl)

parser_create_bucket = sub_parser.add_parser("createbucket", help='Create bucket')
parser_create_bucket.add_argument('-o', "--ofs", help="create ofs bucket", action="store_true", default=False)
parser_create_bucket.set_defaults(func=Op.create_bucket)

parser_delete_bucket = sub_parser.add_parser("deletebucket", help='Delete bucket')
Expand Down Expand Up @@ -970,3 +994,5 @@ def _main():
_main()
global res
sys.exit(res)


138 changes: 138 additions & 0 deletions coscmd/cos_rename.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# -*- coding=utf-8
import json
import threading

from qcloud_cos import CosServiceError, CosClientError, CosS3Client
from qcloud_cos.cos_comm import check_object_content_length, get_content_md5, client_can_retry, format_values
from requests import Request, Timeout, __version__

from .cos_auth import CosS3Auth
from .cos_comm import *

# python 3.10报错"module 'collections' has no attribute 'Iterable'",这里先规避
if sys.version_info.major >= 3 and sys.version_info.minor >= 10:
import collections.abc
collections.Iterable = collections.abc.Iterable

logger = logging.getLogger(__name__)


class CosMoveConfig(object):
__built_in_sessions = None # 内置的静态连接池,多个Client间共享使用

def __init__(self, conf, retry=1, session=None):
"""初始化client对象

:param conf(CosConfig): 用户的配置.
:param retry(int): 失败重试的次数.
:param session(object): http session.
"""
self._conf = conf
self._retry = retry # 重试的次数,分片上传时可适当增大

if not CosMoveConfig.__built_in_sessions:
with threading.Lock():
if not CosMoveConfig.__built_in_sessions: # 加锁后double check
CosMoveConfig.__built_in_sessions = self.generate_built_in_connection_pool(
self._conf._pool_connections, self._conf._pool_maxsize)

if session is None:
self._session = CosMoveConfig.__built_in_sessions
else:
self._session = session

def set_built_in_connection_pool_max_size(self, PoolConnections, PoolMaxSize):
"""设置SDK内置的连接池的连接大小,并且重新绑定到client中"""
if not CosS3Client.__built_in_sessions:
return

if CosS3Client.__built_in_sessions.get_adapter('http://')._pool_connections == PoolConnections \
and CosS3Client.__built_in_sessions.get_adapter('http://')._pool_maxsize == PoolMaxSize:
return

# 判断之前是否绑定到内置连接池
rebound = False
if self._session and self._session is CosS3Client.__built_in_sessions:
rebound = True

# 重新生成内置连接池
CosS3Client.__built_in_sessions.close()
CosS3Client.__built_in_sessions = self.generate_built_in_connection_pool(PoolConnections, PoolMaxSize)

# 重新绑定到内置连接池
if rebound:
self._session = CosS3Client.__built_in_sessions
logger.info("rebound built-in connection pool success. maxsize=%d,%d" % (PoolConnections, PoolMaxSize))

def generate_built_in_connection_pool(self, PoolConnections, PoolMaxSize):
"""生成SDK内置的连接池,此连接池是client间共用的"""
built_in_sessions = requests.session()
built_in_sessions.mount('http://', requests.adapters.HTTPAdapter(pool_connections=PoolConnections, pool_maxsize=PoolMaxSize))
built_in_sessions.mount('https://', requests.adapters.HTTPAdapter(pool_connections=PoolConnections, pool_maxsize=PoolMaxSize))
logger.info("generate built-in connection pool success. maxsize=%d,%d" % (PoolConnections, PoolMaxSize))
return built_in_sessions

def get_conf(self):
"""获取配置"""
return self._conf

def get_auth(self, Method, Bucket, Key, Expired=300, Headers={}, Params={}, SignHost=None):
"""获取签名

:param Method(string): http method,如'PUT','GET'.
:param Bucket(string): 存储桶名称.
:param Key(string): 请求COS的路径.
:param Expired(int): 签名有效时间,单位为s.
:param headers(dict): 签名中的http headers.
:param params(dict): 签名中的http params.
:param SignHost(bool): 是否将host算入签名.
:return (string): 计算出的V5签名.

.. code-block:: python

config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)
# 获取上传请求的签名
auth_string = client.get_auth(
Method='PUT',
Bucket='bucket',
Key='test.txt',
Expired=600,
Headers={'header1': 'value1'},
Params={'param1': 'value1'}
)
print (auth_string)
"""

# python中默认参数只会初始化一次,这里重新生成可变对象实例避免多线程访问问题
if not Headers:
Headers = dict()
if not Params:
Params = dict()

url = self._conf.uri(bucket=Bucket, path=Key)
r = Request(Method, url, headers=Headers, params=Params)
auth = CosS3Auth(self._conf, Key, Params, Expired, SignHost)
return auth(r).headers['Authorization']

def move_object(self, source_path, url, _http_headers='{}', EnableMD5=False):
http_headers = _http_headers
logger.info("put object, url=:{url} ,headers=:{headers}".format(
url=url,
headers=http_headers))
if EnableMD5:
md5_str = get_content_md5(url)
if md5_str:
http_headers['Content-MD5'] = md5_str
http_headers['x-cos-rename-source'] = source_path
logger.info("put object, url=:{url} ,headers=:{headers}".format(
url=url,
headers=http_headers))
rt = self._session.put(url=url, auth=CosS3Auth(self._conf), headers=http_headers, timeout=self._timeout)
if rt.status_code == 200:
return 0
else:
raise Exception(response_info(rt))

if __name__ == "__main__":
pass
Empty file added coscmd/test01.py
Empty file.