Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Developed 1 new feature, fixed 2 bugs #65

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

brightgems
Copy link

hi chineking,
Issue #64 中讨论的功能(size设置为auto),功能己实现。另外修复了两个BUG:

  1. MQ中消息存储的中文转码错误
  2. URL模式如果parser返回值,executor中会报错
list(res)

thanks,
brightgems

@@ -24,7 +24,7 @@ def labelize(obj):
if isinstance(obj, str):
return obj
elif isinstance(obj, unicode):
return unicode.encode('utf-8')
return obj.encode('utf-8','ignore')
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯,这是个明显的错误。逗号后要加空格。

@@ -207,7 +211,7 @@ def wait_callback(br):
if method == 'POST':
operation = QNetworkAccessManager.PostOperation
self.br.load(url, wait_callback=wait_callback, tries=tries,
operation=operation, body=data, headers=headers)
operation=operation, body=data, headers=headers,load_timeout= self._default_timout)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同样,逗号后加空格。

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同意

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这些确定的改动(比如格式),可以先提一个commit修正下。

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里空格,再提交改动下哈。

name='get_jobs')

def init_deduper(self):
deduper_cls = import_module(self.settings.job.components.deduper.cls)

base = 1 if not self.is_bundle else 1000
size = self.job_desc.settings.job.size
if isinstance(size,basestring):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

空格。

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同意

@@ -304,6 +302,8 @@ class UrlExecutor(Executor):
def __init__(self, *args, **kwargs):
super(UrlExecutor, self).__init__(*args, **kwargs)
self.budges = 0
self.url_error_times = {}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的改动是因为,url是个str类型,而没有error_times属性?

原则上这么改应该没有问题,但为了和bundle做统一,应当在task里送到executor时,判断url类型,如果是str,应当转成Url对象。

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

url_error_times 在实例化后用到,但没有初始化
···

    self.url_error_times[url] = self.url_error_times.get(url, 0) + 1

···

Copy link
Owner

@qinxuye qinxuye Apr 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不是,我的意思是,url的error_times,应当是记在Url对象上的,这样再放回mq的时候,也能保存信息,这里放在executor上,等executor退出了,就丢了。

而url有时候在用户parser返回的是个str,我们应当让executor处理的时候转成cola.core.unit.Url对象才是。而error_times的信息记录在Url对象上。bundle也是这么做的。

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有道理,这里下一次pr先修改

@@ -350,6 +353,7 @@ def _handle_error(self, url, e, pack=True):
self._error()
raise UnitRetryFailed


Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

添加行多余了。下同。

if self.settings.job.size == 'auto':
inc_budgets = len(next_urls)
if inc_budgets > 0:
self.budget_client.inc_budgets(inc_budgets)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里,如果next_urls当中存在已经抓取的,这个len(next_urls)就不等于待抓取的urls了吧。

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里确实没有考虑到,似乎需要判断mq里put方法存放的url数量
不过由于现在mq的MessageQueueNodeProxy.put没有返回值; 而Store.put其实是返回了保存成功的消息的
是否要先改mq?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个地方会比较棘手一点,因为MessageQueueNodeProxy在put的时候,并没有真正做放进去的操作,而是对每个其他worker,做了一个cache,当cache满了的时候,才会flush出去。这样做的目的是为了减少网络间的传输开销。

所以,现在put方法并不知道真正放进去了多少个,而去重的操作是在mq的每个节点上才会去做的。

这里budgets的数量大于真实抓取的数量,会导致不能立刻结束么?如果不导致,这里应该就不用改了我理解。

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的,如果budgets的数量大于真实抓取的数量,会导致JOB一直在等待状态。

好像也没有什么好办法,如果遇到有重复的URL,只能按run_loca_job的思路做。如果IDLE超时,自动结束,对吗?

 while t.is_alive():
            if job.get_status() == FINISHED:
                break
            if job.get_status() == IDLE:
                idle_times += 1
                if idle_times > MAX_IDLE_TIMES:
                    break
            else:

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的,我理解现在只能这样。

cola/job/task.py Outdated
else:
no_budgets_times = 0
self._get_unit(curr_priority, self.runnings)
self._get_unit(curr_priority, self.runnings)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个地方的逻辑改掉了是为什么呢?以前会先申请budgets,现在先取一个unit出来?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果先申请到budget,然后再取不到unit,实际上什么也没有执行,但budget已经消耗了。这样当size=auto时任务会提前结束,但实际还有URL没有处理完

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

那我的建议是对是不是auto的情况分开来处理,单独拉一个逻辑,以前的逻辑不动。这样兼容以前的行为。

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@@ -314,14 +314,17 @@ def _parse(self, parser_cls, options, url):
counter=ExecutorCounter(self),
settings=ReadOnlySettings(self.settings),
**options).parse()
return list(res)
if res:
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

避免res返回空值后list函数错误

Copy link
Author

@brightgems brightgems left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

多谢回复

if self.settings.job.size == 'auto':
inc_budgets = len(next_urls)
if inc_budgets > 0:
self.budget_client.inc_budgets(inc_budgets)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里确实没有考虑到,似乎需要判断mq里put方法存放的url数量
不过由于现在mq的MessageQueueNodeProxy.put没有返回值; 而Store.put其实是返回了保存成功的消息的
是否要先改mq?

cola/job/task.py Outdated
else:
no_budgets_times = 0
self._get_unit(curr_priority, self.runnings)
self._get_unit(curr_priority, self.runnings)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果先申请到budget,然后再取不到unit,实际上什么也没有执行,但budget已经消耗了。这样当size=auto时任务会提前结束,但实际还有URL没有处理完

@@ -207,7 +211,7 @@ def wait_callback(br):
if method == 'POST':
operation = QNetworkAccessManager.PostOperation
self.br.load(url, wait_callback=wait_callback, tries=tries,
operation=operation, body=data, headers=headers)
operation=operation, body=data, headers=headers,load_timeout= self._default_timout)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同意

name='get_jobs')

def init_deduper(self):
deduper_cls = import_module(self.settings.job.components.deduper.cls)

base = 1 if not self.is_bundle else 1000
size = self.job_desc.settings.job.size
if isinstance(size,basestring):
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同意

auto close job if idle for 5 times and auto budget enabled
fix lint style
@brightgems
Copy link
Author

submit code again:

  • move UrlExecutor.retury_error_times to Url.error_times
  • auto close job if idle for 5 times and auto budget enabled
  • fix lint style

Copy link
Owner

@qinxuye qinxuye left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我理解现在除了格式问题,应该没有问题了。麻烦再提交commit,然后跑下tests下的所有case,如果都通过,就可以merge了。

@@ -207,7 +211,7 @@ def wait_callback(br):
if method == 'POST':
operation = QNetworkAccessManager.PostOperation
self.br.load(url, wait_callback=wait_callback, tries=tries,
operation=operation, body=data, headers=headers)
operation=operation, body=data, headers=headers,load_timeout= self._default_timout)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里空格,再提交改动下哈。

while not self.stopped.is_set():
self.idle_statuses[self.container_id] = \
all([task.is_idle() for task in self.tasks])
if self.idle_statuses[self.container_id]:
idle_times += 1
if self.job_desc.settings.job.size=='auto' and idle_times > MAX_IDLE_TIMES:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

== 两边空格

continue
else:
no_budgets_times = 0
if self.settings.job.size=='auto':
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

== 两边空格

self._get_unit(curr_priority, self.runnings)
# if get unit success, then apply budget, in case budget are wasted
if len(self.runnings)>0:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

两边空格

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants