Developed 1 new feature, fixed 2 bugs #65

Open · wants to merge 7 commits into master

**cola/core/mq/store.py** (2 additions, 1 deletion)

```diff
@@ -254,7 +254,8 @@ def put_one(self, obj, force=False, commit=True):
         if self.deduper.exist(prop):
             return
         if len(self.legal_files) == 0:
-            self._generate_file()
+            with self.lock:
+                self._generate_file()
 
         obj_str = self._stringfy(obj)
         # If no file has enough space
```
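A note on why the lock matters here: if two executor threads both observe `len(self.legal_files) == 0`, both may try to create the backing file. A minimal sketch of the guarded pattern (`lock`, `legal_files` and `_generate_file` follow the diff; the re-check inside the lock is an extra safeguard, not part of this PR):

```python
import threading

class StoreSketch(object):
    def __init__(self):
        self.lock = threading.Lock()
        self.legal_files = []

    def _generate_file(self):
        # illustrative stand-in for creating a queue backing file
        self.legal_files.append(object())

    def ensure_file(self):
        if len(self.legal_files) == 0:
            with self.lock:
                # re-check under the lock: a racing thread may have
                # created the file while we waited to acquire it
                if len(self.legal_files) == 0:
                    self._generate_file()
```
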
**cola/core/mq/utils.py** (1 addition, 1 deletion)

```diff
@@ -24,7 +24,7 @@ def labelize(obj):
     if isinstance(obj, str):
         return obj
     elif isinstance(obj, unicode):
-        return unicode.encode('utf-8')
+        return obj.encode('utf-8', 'ignore')
     else:
         try:
             return str(obj)
```
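The removed line was a genuine bug: `unicode.encode('utf-8')` calls the unbound method on the `unicode` type itself, so the string `'utf-8'` lands in the position of `self` and the call raises a `TypeError` instead of encoding anything. A quick Python 2 demonstration (values are illustrative):

```python
label = u'\u6807\u7b7e'  # any unicode value

try:
    unicode.encode('utf-8')  # old code: 'utf-8' occupies the self slot
except TypeError, e:
    print 'old code raises: %s' % e

# fixed code: encode the instance; 'ignore' drops anything unencodable
print label.encode('utf-8', 'ignore')
```
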
**cola/core/opener.py** (21 additions, 11 deletions)

```diff
@@ -115,9 +115,9 @@ def __init__(self, cookie_filename=None, user_agent=None, timeout=None, **kwargs
         self.browser.set_handle_redirect(True)
         self.browser.set_handle_referer(True)
         self.browser.set_handle_robots(False)
-        self.browser.addheaders = [
-            ('User-agent', user_agent)]
+        self.browser.addheaders = [('User-agent', user_agent)]
         self.proxies = {}
 
         if timeout is None:
             self._default_timout = mechanize._sockettimeout._GLOBAL_DEFAULT_TIMEOUT
         else:
```

```diff
@@ -169,14 +169,17 @@ def _clear_content(self):
         del self.content
 
     def close(self):
+        """
+        clear browse history, avoid memory issue
+        """
        self._clear_content()
         resp = self.browser.response()
         if resp is not None:
             resp.close()
         self.browser.clear_history()
 
 class SpynnerOpener(Opener):
-    def __init__(self, user_agent=None, **kwargs):
+    def __init__(self, user_agent=None, timeout=30, **kwargs):
         try:
             import spynner
         except ImportError:
```

```diff
@@ -185,8 +188,9 @@ def __init__(self, user_agent=None, **kwargs):
         if user_agent is None:
             user_agent = 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)'
 
-        self.br = spynner.Browser(user_agent=user_agent, **kwargs)
-
+        self.br = spynner.Browser(user_agent=user_agent)
+        self._default_timout = timeout
 
     def spynner_open(self, url, data=None, headers=None, method='GET',
                      wait_for_text=None, wait_for_selector=None, tries=None):
         try:
```

```diff
@@ -207,7 +211,7 @@ def wait_callback(br):
             if method == 'POST':
                 operation = QNetworkAccessManager.PostOperation
             self.br.load(url, wait_callback=wait_callback, tries=tries,
-                         operation=operation, body=data, headers=headers)
+                         operation=operation, body=data, headers=headers,load_timeout= self._default_timout)
```

**Owner:** Same here: add a space after the comma.

**Author:** Agreed.

**Owner:** For the changes that are settled (formatting fixes, for example), please push a commit correcting them first.

**Owner:** Spacing here as well; please push another commit for it.

```diff
             return self.br
```

```diff
@@ -222,6 +226,12 @@ def read(self):
         return self.content if hasattr(self, 'content') else self.br.contents
 
     def wait_for_selector(self, selector, **kwargs):
-        self.br.wait_for_content(
-            lambda br: not br.webframe.findFirstElement(selector).isNull(),
-            **kwargs)
+        self.br.wait_for_content(lambda br: not br.webframe.findFirstElement(selector).isNull(),
+                                 **kwargs)
+
+    def add_proxy(self,addr, proxy_type='all',
+                  user=None, password=None):
+        self.br.set_proxy(addr)
+
+    def set_default_timeout(self, timeout):
+        self._default_timout = timeout
```
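For context, here is how the new timeout plumbing would be exercised. This usage sketch assumes the constructor and setter exactly as introduced in the diff, plus an installed spynner:

```python
from cola.core.opener import SpynnerOpener

opener = SpynnerOpener(timeout=60)   # per-instance default, new in this PR
opener.set_default_timeout(15)       # can be adjusted later via the new setter
br = opener.spynner_open('http://example.com', wait_for_selector='#content')
```
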
**cola/functions/budget.py** (12 additions, 8 deletions)

```diff
@@ -35,6 +35,7 @@
 SUFFICIENT, NOAPPLIED, ALLFINISHED = range(3)
 DEFAULT_BUDGETS = 3
 BUDGET_APPLY_STATUS_FILENAME = 'budget.apply.status'
+AUTO_BUDGET = 'auto'
 
 def synchronized(func):
     def inner(self, *args, **kw):
```

```diff
@@ -50,8 +51,11 @@ def __init__(self, working_dir, settings,
         self.settings = settings
         self.rpc_server = rpc_server
         self.app_name = app_name
 
-        self.budgets = settings.job.size
+        # check if auto budget setting enabled
+        if settings.job.size == AUTO_BUDGET:
+            self.budgets = len(settings.job.starts)
+        else:
+            self.budgets = settings.job.size
         self.limit = self.budgets >= 0
         self.applied = 0
         self.finished = 0
```

```diff
@@ -178,11 +182,11 @@ def finish(self, size=1):
     def error(self, size=1):
         return self._call('error', size)
 
-    def set_budget(self, budget):
-        return self._call('set_budget', budget)
+    def set_budgets(self, budget):
+        return self._call('set_budgets', budget)
 
-    def inc_budget(self, budget):
-        return self._call('inc_budget', budget)
+    def inc_budgets(self, budget):
+        return self._call('inc_budgets', budget)
 
-    def dec_budget(self, budget):
-        return self._call('dec_budget', budget)
+    def dec_budgets(self, budget):
+        return self._call('dec_budgets', budget)
```
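To make the auto mode concrete: the budget starts at the number of seed URLs, grows by one for each newly discovered URL the executors enqueue, and shrinks when an erroring URL is abandoned, so the job can finish once discovery dries up. A standalone model of that accounting (this is a sketch of the intended arithmetic, not the BudgetApplyServer API):

```python
class AutoBudgetModel(object):
    def __init__(self, starts):
        self.budgets = len(starts)   # seed URLs fund the initial budget
        self.finished = 0

    def inc_budgets(self, n):        # executor discovered n new URLs
        self.budgets += n

    def dec_budgets(self, n):        # n URLs abandoned after repeated errors
        self.budgets -= n

    def finish(self, n=1):           # n URLs crawled successfully
        self.finished += n

    def all_finished(self):
        return self.finished >= self.budgets

model = AutoBudgetModel(starts=['http://example.com'])
model.inc_budgets(2)                 # the seed page linked to two new pages
model.finish(3)                      # seed plus both discovered pages crawled
assert model.all_finished()
```
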
**cola/job/__init__.py** (8 additions, 7 deletions)

```diff
@@ -107,12 +107,12 @@ def run_containers(n_containers, n_instances, working_dir, job_def_path,
                                   is_local=is_local, master_ip=master_ip, task_start_id=acc)
         if is_multi_process:
             process = multiprocessing.Process(target=container.run,
-                                              args=(True, ))
+                                              args=(True,))
             process.start()
             processes.append(process)
         else:
             thread = threading.Thread(target=container.run,
-                                      args=(True, ))
+                                      args=(True,))
             thread.start()
             processes.append(thread)
         acc += n_tasks
```

```diff
@@ -138,7 +138,7 @@ def __init__(self, ctx, job_def_path, job_name,
         self.job_name = job_name
         self.working_dir = working_dir or os.path.join(self.ctx.working_dir,
                                                        self.job_name)
-        self.logger = get_logger(name='cola_job'+str(time.time()))
+        self.logger = get_logger(name='cola_job' + str(time.time()))
         self.job_desc = job_desc or import_job_desc(job_def_path)
 
         self.settings = self.job_desc.settings
```

```diff
@@ -168,14 +168,16 @@ def _register_rpc(self):
         self.rpc_server.register_function(self.shutdown, name='shutdown',
                                           prefix=self.prefix)
         if self.ctx.is_local_mode:
-            self.rpc_server.register_function(lambda: [self.job_name, ],
+            self.rpc_server.register_function(lambda: [self.job_name,],
                                               name='get_jobs')
 
     def init_deduper(self):
         deduper_cls = import_module(self.settings.job.components.deduper.cls)
 
         base = 1 if not self.is_bundle else 1000
         size = self.job_desc.settings.job.size
+        if isinstance(size, basestring):
+            size = len(self.job_desc.settings.job.starts) * 10
         capacity = UNLIMIT_BLOOM_FILTER_CAPACITY
         if size > 0:
             capacity = max(base * size * 10, capacity)
```
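Why the basestring check matters: when `job.size` is the string `'auto'`, Python 2's cross-type comparison makes `size > 0` true and `base * size * 10` becomes string repetition, corrupting the capacity value, so the size is first replaced with an estimate derived from the seed count. A condensed sketch of the resulting rule (the `UNLIMIT_BLOOM_FILTER_CAPACITY` value here is a placeholder, not the real constant):

```python
UNLIMIT_BLOOM_FILTER_CAPACITY = 1000000  # placeholder value

def deduper_capacity(size, starts, is_bundle):
    base = 1 if not is_bundle else 1000
    if isinstance(size, basestring):   # job.size == 'auto'
        size = len(starts) * 10        # rough estimate from the seeds
    capacity = UNLIMIT_BLOOM_FILTER_CAPACITY
    if size > 0:
        capacity = max(base * size * 10, capacity)
    return capacity
```
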
```diff
@@ -203,7 +205,7 @@ def init_mq(self):
     def _init_function_servers(self):
         budget_dir = os.path.join(self.working_dir, 'budget')
-        budget_cls = BudgetApplyServer if not self.is_multi_process \
+        budget_cls = BudgetApplyServer if not self.is_multi_process \
             else self.manager.budget_server
         self.budget_server = budget_cls(budget_dir, self.settings,
                                         None, self.job_name)
```

```diff
@@ -264,8 +266,7 @@ def init(self):
     def run(self, block=False):
         self.init()
         try:
-            self.processes = run_containers(
-                self.n_containers, self.n_instances, self.working_dir,
+            self.processes = run_containers(self.n_containers, self.n_instances, self.working_dir,
                 self.job_def_path, self.job_name, self.ctx.env, self.mq,
                 self.counter_arg, self.budget_arg, self.speed_arg,
                 self.stopped, self.nonsuspend, self.idle_statuses,
```
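For orientation, run_containers starts one worker per container, a process or a thread depending on is_multi_process. A stripped-down model of that dispatch (the real function also wires each container to the mq, budget, counter and speed clients):

```python
import multiprocessing
import threading

def run_workers(n, target, multi_process=False):
    workers = []
    for _ in range(n):
        cls = multiprocessing.Process if multi_process else threading.Thread
        worker = cls(target=target, args=(True,))
        worker.start()
        workers.append(worker)
    return workers
```
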
**cola/job/container.py** (9 additions)

```diff
@@ -31,6 +31,8 @@
 from cola.functions.speed import SpeedControlClient
 from cola.functions.counter import CounterClient
 
+MAX_IDLE_TIMES = 5
+
 class Container(object):
     def __init__(self, container_id, working_dir,
                  job_path, job_name, env, mq,
```

```diff
@@ -125,9 +127,16 @@ def sync():
     def _init_idle_status_checker(self):
         def check():
+            idle_times = 0
             while not self.stopped.is_set():
                 self.idle_statuses[self.container_id] = \
                     all([task.is_idle() for task in self.tasks])
+                if self.idle_statuses[self.container_id]:
+                    idle_times += 1
+                    if self.job_desc.settings.job.size=='auto' and idle_times > MAX_IDLE_TIMES:
```
**Owner:** Put spaces around `==`.

```diff
+                        break
+                else:
+                    idle_times = 0
                 self.stopped.wait(5)
         self.check_idle_t = threading.Thread(target=check)
```
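Since check() sleeps five seconds per pass, MAX_IDLE_TIMES = 5 means an auto-budget container stops its idle checker only after six consecutive idle polls, roughly thirty seconds, and any busy poll resets the streak. A condensed model of that loop (threading and container wiring omitted):

```python
MAX_IDLE_TIMES = 5
POLL_INTERVAL = 5  # seconds, matching self.stopped.wait(5)

def seconds_until_stop(statuses):
    """statuses: one boolean per poll; True means every task was idle."""
    idle_times = 0
    for tick, idle in enumerate(statuses, 1):
        if idle:
            idle_times += 1
            if idle_times > MAX_IDLE_TIMES:
                return tick * POLL_INTERVAL
        else:
            idle_times = 0           # any activity resets the streak
    return None

assert seconds_until_stop([True] * 6) == 30        # six idle polls trip it
assert seconds_until_stop([True, False] * 10) is None
```
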
**cola/job/executor.py** (32 additions, 22 deletions)

```diff
@@ -71,8 +71,7 @@ def __init__(self, id_, job_desc, mq,
                  is_local=False, env=None, logger=None):
         self.id_ = id_
         self.job_desc = job_desc
-        self.opener = job_desc.opener_cls(
-            timeout=DEFAULT_OPENER_TIMEOUT)
+        self.opener = job_desc.opener_cls(timeout=DEFAULT_OPENER_TIMEOUT)
         self.mq = mq
         self.dir_ = working_dir
         self.settings = job_desc.settings
```

```diff
@@ -116,8 +115,7 @@ def _configure_proxy(self):
         for p in proxies:
             proxy_type = p.type if p.has('type') else 'all'
             if p.has('addr'):
-                self.opener.add_proxy(
-                    p.addr,
+                self.opener.add_proxy(p.addr,
                     proxy_type=proxy_type,
                     user=p.user if p.has('user') else None,
                     password=p.password if p.has('password') else None)
```

```diff
@@ -210,7 +208,7 @@ def _pack_error(self, url, msg, error, content=None,
         msg_filename = os.path.join(path, ERROR_MSG_FILENAME)
         with open(msg_filename, 'w') as f:
-            f.write(msg+'\n')
+            f.write(msg + '\n')
             traceback.print_exc(file=f)
 
         content_filename = os.path.join(path,
```

```diff
@@ -314,7 +312,10 @@ def _parse(self, parser_cls, options, url):
                       counter=ExecutorCounter(self),
                       settings=ReadOnlySettings(self.settings),
                       **options).parse()
-        return list(res)
+        if res:
```
**Author:** This avoids the list() call failing when parse() returns nothing.

```diff
+            return list(res)
+        else:
+            return list()
 
     def _log_error(self, url, e):
         if self.logger:
```

```diff
@@ -350,6 +351,7 @@ def _handle_error(self, url, e, pack=True):
             self._error()
             raise UnitRetryFailed
 
+
```
**Owner:** The added blank line is unnecessary. Same below.
```diff
     def _clear_error(self, url):
         if hasattr(url, 'error_times'):
             del url.error_times
```

```diff
@@ -364,7 +366,7 @@ def _parse_with_process_exception(self, parser_cls, options, url):
             kw = {'pages': 1, 'secs': t}
             self.counter_client.multi_local_inc(self.ip, self.id_, **kw)
             self.counter_client.multi_global_inc(**kw)
-
+            self._clear_error(url)
             self._recover_normal()
```

```diff
@@ -383,7 +385,7 @@ def _parse_with_process_exception(self, parser_cls, options, url):
         except Exception, e:
             self._handle_error(url, e)
 
-        return [url, ]
+        return [url,]
 
     def execute(self, url, is_inc=False):
         failed = False
```

```diff
@@ -402,20 +404,22 @@ def execute(self, url, is_inc=False):
             parser_cls, options = self.job_desc.url_patterns.get_parser(url, options=True)
             if parser_cls is not None:
                 if rates == 0:
-                    rates, span = self.speed_client.require(
-                        DEFAULT_SPEEED_REQUIRE_SIZE)
+                    rates, span = self.speed_client.require(DEFAULT_SPEEED_REQUIRE_SIZE)
                     if rates == 0:
                         if self.stopped.wait(5):
                             return
                 rates -= 1
 
                 try:
-                    next_urls = self._parse_with_process_exception(
-                        parser_cls, options, url)
+                    next_urls = self._parse_with_process_exception(parser_cls, options, url)
                     next_urls = list(self.job_desc.url_patterns.matches(next_urls))
 
                     if next_urls:
                         self.mq.put(next_urls)
+                        # inc budget if auto budget enabled
+                        if self.settings.job.size == 'auto':
+                            inc_budgets = len(next_urls)
+                            if inc_budgets > 0:
+                                self.budget_client.inc_budgets(inc_budgets)
```
**Owner:** One concern: if next_urls contains URLs that have already been crawled, len(next_urls) no longer equals the number of URLs actually left to crawl, does it?

**Author:** Good catch, I hadn't considered that. It seems we would need to count how many URLs the mq put call actually stores. The trouble is that MessageQueueNodeProxy.put currently has no return value, while Store.put does return the messages it saved. Should we change mq first?

**Owner:** This part is tricky: when MessageQueueNodeProxy.put is called, it doesn't actually enqueue anything right away. Instead it keeps a cache per remote worker and only flushes once the cache fills up, which reduces network transfer overhead.

So put currently has no way of knowing how many items were really stored, and deduplication only happens on each mq node.

Does the budget count exceeding the real crawl count stop the job from finishing promptly? If not, I'd say this doesn't need to change.
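For readers following the thread, a condensed sketch of the batching behavior described above (class and method names mirror the real ones; the body is illustrative only):

```python
class MessageQueueNodeProxySketch(object):
    CACHE_SIZE = 10  # illustrative threshold

    def __init__(self, n_nodes):
        # one outgoing cache per mq node / worker
        self.caches = [[] for _ in range(n_nodes)]

    def put(self, objs):
        # Returns nothing: each obj is parked in the cache of the node
        # that owns it, so the caller cannot tell how many will survive
        # deduplication, which only runs on the receiving node.
        for obj in objs:
            cache = self.caches[hash(obj) % len(self.caches)]
            cache.append(obj)
            if len(cache) >= self.CACHE_SIZE:
                self._flush(cache)

    def _flush(self, cache):
        batch = cache[:]
        del cache[:]
        # `batch` would be sent over the network here in one call
```
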

**Author:** Yes, if the budget count exceeds the number of URLs actually crawled, the job hangs in a waiting state forever.

There doesn't seem to be a good way around it. When duplicate URLs show up, the only option is to follow the run_local_job approach: if the job stays IDLE past a threshold, shut it down automatically, right? (The quoted loop, completed with the implied reset branch:)

```python
idle_times = 0
while t.is_alive():
    if job.get_status() == FINISHED:
        break
    if job.get_status() == IDLE:
        idle_times += 1
        if idle_times > MAX_IDLE_TIMES:
            break
    else:
        idle_times = 0
```
**Owner:** Yes, as I understand it that's the only option for now.

```diff
                     if hasattr(self.opener, 'close'):
                         self.opener.close()
```

```diff
@@ -458,8 +462,7 @@ def _parse(self, parser_cls, options, bundle, url):
     def _log_error(self, bundle, url, e):
         if self.logger:
-            self.logger.error('Error when handle bundle: %s, url: %s' % (
-                str(bundle), str(url)))
+            self.logger.error('Error when handle bundle: %s, url: %s' % (str(bundle), str(url)))
             self.logger.exception(e)
         if url == getattr(bundle, 'error_url', None):
             bundle.error_times = getattr(bundle, 'error_times', 0) + 1
```
```diff
@@ -499,6 +502,9 @@ def _handle_error(self, bundle, url, e, pack=True):
             if ignore:
                 bundle.error_urls.append(url)
+                # dec budget if auto budget enabled
+                if self.settings.job.size == 'auto':
+                    self.budget_client.dec_budgets(1)
                 return
             else:
                 bundle.current_urls.insert(0, url)
```

```diff
@@ -525,6 +531,7 @@ def _parse_with_process_exception(self, parser_cls, options,
             self.counter_client.multi_local_inc(self.ip, self.id_, **kw)
             self.counter_client.multi_global_inc(**kw)
 
+
             self._clear_error(bundle)
             self._recover_normal()
```

```diff
@@ -543,7 +550,7 @@ def _parse_with_process_exception(self, parser_cls, options,
         except Exception, e:
             self._handle_error(bundle, url, e)
 
-        return [url, ], []
+        return [url,], []
 
     def execute(self, bundle, max_sec, is_inc=False):
         failed = False
```

```diff
@@ -565,25 +572,22 @@ def execute(self, bundle, max_sec, is_inc=False):
                 url = bundle.current_urls.pop(0)
                 if self.logger:
-                    self.logger.debug('get %s url: %s' %
-                                      (bundle.label, url))
+                    self.logger.debug('get %s url: %s' % (bundle.label, url))
 
                 rates = 0
                 span = 0.0
                 parser_cls, options = self.job_desc.url_patterns.get_parser(url,
                                                                             options=True)
                 if parser_cls is not None:
                     if rates == 0:
-                        rates, span = self.speed_client.require(
-                            DEFAULT_SPEEED_REQUIRE_SIZE)
+                        rates, span = self.speed_client.require(DEFAULT_SPEEED_REQUIRE_SIZE)
                         if rates == 0:
                             if self.stopped.wait(5):
                                 break
                         rates -= 1
 
                     try:
-                        next_urls, bundles = self._parse_with_process_exception(
-                            parser_cls, options, bundle, url)
+                        next_urls, bundles = self._parse_with_process_exception(parser_cls, options, bundle, url)
                         next_urls = list(self.job_desc.url_patterns.matches(next_urls))
                         next_urls.extend(bundle.current_urls)
                         if self.shuffle_urls:
```

```diff
@@ -597,6 +601,12 @@ def execute(self, bundle, max_sec, is_inc=False):
                         if bundles:
                             self.mq.put(bundles)
+                            # inc budget if auto budget enabled
+                            if self.settings.job.size == 'auto':
+                                inc_budgets = len(bundles)
+                                if inc_budgets > 0:
+                                    self.budget_client.inc_budgets(inc_budgets)
 
                         if hasattr(self.opener, 'close'):
                             self.opener.close()
```