Skip to content

Commit ab8cfdb

Browse files
committed
Added chunkexports extension
1 parent 59e0426 commit ab8cfdb

File tree

2 files changed

+344
-0
lines changed

2 files changed

+344
-0
lines changed

scrapylib/chunkexports.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import os
2+
import copy
3+
from datetime import datetime
4+
5+
from scrapy.contrib.feedexport import FeedExporter
6+
from scrapy.exceptions import NotConfigured
7+
8+
9+
DEFAULT_TIMESTAMP_FORMAT = '%Y-%m-%d-%H'
10+
11+
12+
class ChunkedFeedExporter(FeedExporter):
13+
"""Extension for breaking item exports into chunks.
14+
15+
Settings:
16+
* CHUNKED_FEED_URI: The feed uri to use for exporting (Overrides FEED_URI setting).
17+
* CHUNKED_FEED_FORMAT: The feed format to use for exporting (Overrides FEED_FORMAT setting).
18+
* CHUNKED_FEED_ITEMS_PER_CHUNK: Number of items included in each chunk
19+
* CHUNKED_FEED_TIMESTAMP_FORMAT: A string representing the format to be used for representing \
20+
the ``timestamp`` uri parameter.
21+
22+
Example:
23+
CHUNKED_FEED_URI = 'export_%(chunk_number)02d.json'
24+
CHUNKED_FEED_FORMAT = 'json'
25+
CHUNKED_FEED_ITEMS_PER_CHUNK = 100
26+
27+
For 250 items will generate the following files:
28+
* export_01.json (100 items)
29+
* export_02.json (100 items)
30+
* export_03.json (50 items)
31+
32+
Available uri format values:
33+
* chunk_number: The active chunk counter. (Starts in 1).
34+
* scrapy_job: The Scrapy job (if available).
35+
* scrapy_project_id: The Scrapy job id (if available).
36+
* timestamp: Current timestamp in UTC (formatted with CHUNKED_FEED_TIMESTAMP_FORMAT setting).
37+
38+
"""
39+
40+
def __init__(self, settings):
41+
42+
# Override settings object to reuse feed exporter settings
43+
settings = copy.deepcopy(settings)
44+
self.settings = settings
45+
46+
# Get chunked settings
47+
chunked_feed_uri = self._get_from_settings_or_not_configured('CHUNKED_FEED_URI', None)
48+
chunked_feed_format = self._get_from_settings_or_not_configured('CHUNKED_FEED_FORMAT')
49+
self._items_per_chunk = self._get_from_settings_or_not_configured('CHUNKED_FEED_ITEMS_PER_CHUNK')
50+
51+
# Settings override
52+
settings.set('FEED_URI', chunked_feed_uri, 100)
53+
settings.set('FEED_FORMAT', chunked_feed_format, 100)
54+
55+
# Parent call with overridden settings
56+
super(ChunkedFeedExporter, self).__init__(settings)
57+
58+
# Internal stuff
59+
self._chunk_number = 1
60+
self._uripar = self.get_uri_parameters
61+
self._timestamp_format = settings.get('CHUNKED_FEED_TIMESTAMP_FORMAT', DEFAULT_TIMESTAMP_FORMAT)
62+
63+
# Get uri parameters from settings or environment
64+
self.settings = settings
65+
self._scrapy_job = self._get_from_settings_or_environ('SCRAPY_JOB', 'nojob')
66+
self._scrapy_project = self._get_from_settings_or_environ('SCRAPY_PROJECT', 'noproject')
67+
self._scrapy_project_id = self._get_from_settings_or_environ('SCRAPY_PROJECT_ID', 'noprojectid')
68+
69+
def get_uri_parameters(self, params, spider):
70+
"""Update feed uri available parameters. Override if you want to add more parameters"""
71+
params.update({
72+
'chunk_number': self._chunk_number,
73+
'scrapy_job': self._scrapy_job,
74+
'scrapy_project_id': self._scrapy_project_id,
75+
'timestamp': datetime.utcnow().strftime("%Y-%m-%d-%H"),
76+
})
77+
78+
def item_scraped(self, item, spider):
79+
if self._items_per_chunk and self.slot.itemcount >= self._items_per_chunk:
80+
self._reset_exporter(spider)
81+
item = super(ChunkedFeedExporter, self).item_scraped(item, spider)
82+
return item
83+
84+
85+
def _reset_exporter(self, spider):
86+
self.close_spider(spider)
87+
self._chunk_number += 1
88+
self.open_spider(spider)
89+
90+
def _get_from_settings_or_environ(self, name, default):
91+
return self.settings.get(name=name, default=os.environ.get(name, default))
92+
93+
def _get_from_settings_or_not_configured(self, name, default=None):
94+
value = self.settings.get(name, default)
95+
if not value:
96+
raise NotConfigured
97+
return value

tests/test_chunkexports.py

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
import os
2+
import json
3+
import shutil
4+
import unittest
5+
6+
from scrapy.spider import Spider
7+
from scrapy import Item, Field
8+
from scrapy.utils.test import get_crawler
9+
from scrapy.exceptions import NotConfigured
10+
11+
from scrapylib.chunkexports import ChunkedFeedExporter
12+
13+
EXPORT_TEMP_DIR = '.exports'
14+
EXPORT_FILE_PATTERN = EXPORT_TEMP_DIR + '/export_test_%(chunk_number)02d.json'
15+
JSON_FEED_EXPORTERS = {'json': 'scrapy.contrib.exporter.JsonItemExporter'}
16+
17+
18+
class FakeItem(Item):
19+
id = Field()
20+
21+
22+
class ItemGenerator(object):
23+
item_id = 0
24+
25+
@classmethod
26+
def generate(cls):
27+
cls.item_id += 1
28+
return FakeItem(id=cls.item_id)
29+
30+
@classmethod
31+
def reset(cls):
32+
cls.item_id = 0
33+
34+
35+
class ChunkExtensionTest(object):
36+
settings = {}
37+
38+
def tearDown(self):
39+
self.remove_temp_dir()
40+
41+
def start(self, n_items_per_chunk=None, n_items=None, settings=None):
42+
43+
# Reset item generator and remove temporary dir
44+
ItemGenerator.reset()
45+
self.remove_temp_dir()
46+
47+
# Setup settings
48+
settings = settings or self.settings.copy()
49+
if n_items_per_chunk is not None:
50+
settings['CHUNKED_FEED_ITEMS_PER_CHUNK'] = n_items_per_chunk
51+
52+
# Init Scrapy
53+
self.crawler = get_crawler(settings)
54+
self.spider = Spider('chunk_test')
55+
self.spider.set_crawler(self.crawler)
56+
self.extension = ChunkedFeedExporter.from_crawler(self.crawler)
57+
self.extension.open_spider(self.spider)
58+
59+
# Add items if we have to
60+
if n_items:
61+
self.add_items(n_items)
62+
63+
def stop(self):
64+
return self.extension.close_spider(self.spider)
65+
66+
def remove_temp_dir(self):
67+
shutil.rmtree(EXPORT_TEMP_DIR, ignore_errors=True)
68+
69+
def add_items(self, n_items):
70+
for i in range(1, n_items+1):
71+
item = ItemGenerator.generate()
72+
self.extension.item_scraped(item, self.spider)
73+
74+
def get_chunk_filename(self, chunk):
75+
return EXPORT_FILE_PATTERN % {'chunk_number':chunk}
76+
77+
def get_chunk_filenames(self):
78+
return [f for f in os.listdir(EXPORT_TEMP_DIR) if f.endswith(".json")]
79+
80+
def get_number_of_chunks(self):
81+
return len(self.get_chunk_filenames())
82+
83+
def get_chunk_content(self, chunk):
84+
with open(self.get_chunk_filename(chunk)) as f:
85+
return json.load(f)
86+
87+
def ensure_number_of_chunks(self, n_chunks):
88+
n = self.get_number_of_chunks()
89+
assert n_chunks == n, "Wrong number of chunks. found %d, expecting %d" % (n, n_chunks)
90+
91+
def ensure_number_of_exported_items_per_chunk(self, chunk, n_items):
92+
n_exported_items = len(self.get_chunk_content(chunk))
93+
assert n_items == n_exported_items, "Wrong number of exported items. found %d, expecting %d" % \
94+
(n_exported_items, n_items)
95+
96+
97+
class ConfigFailures(ChunkExtensionTest, unittest.TestCase):
98+
99+
def test_no_settings(self):
100+
self.assertRaises(NotConfigured, self.start, settings={})
101+
102+
def test_no_feed_uri(self):
103+
self.assertRaises(NotConfigured, self.start, settings={
104+
'CHUNKED_FEED_FORMAT': 'json',
105+
'CHUNKED_FEED_ITEMS_PER_CHUNK': 1,
106+
'FEED_EXPORTERS': JSON_FEED_EXPORTERS
107+
})
108+
109+
def test_no_feed_format(self):
110+
self.assertRaises(NotConfigured, self.start, settings={
111+
'CHUNKED_FEED_URI': EXPORT_FILE_PATTERN,
112+
'CHUNKED_FEED_ITEMS_PER_CHUNK': 1,
113+
'FEED_EXPORTERS': JSON_FEED_EXPORTERS
114+
})
115+
116+
def test_no_feed_items_per_chunk(self):
117+
self.assertRaises(NotConfigured, self.start, settings={
118+
'CHUNKED_FEED_URI': EXPORT_FILE_PATTERN,
119+
'CHUNKED_FEED_FORMAT': 'json',
120+
'FEED_EXPORTERS': JSON_FEED_EXPORTERS
121+
})
122+
123+
def test_zero_feed_items_per_chunk(self):
124+
self.assertRaises(NotConfigured, self.start, settings={
125+
'CHUNKED_FEED_URI': EXPORT_FILE_PATTERN,
126+
'CHUNKED_FEED_FORMAT': 'json',
127+
'CHUNKED_FEED_ITEMS_PER_CHUNK': 0,
128+
'FEED_EXPORTERS': JSON_FEED_EXPORTERS
129+
})
130+
131+
132+
class ItemsAndChunks(ChunkExtensionTest, unittest.TestCase):
133+
settings = {
134+
'CHUNKED_FEED_URI': EXPORT_FILE_PATTERN,
135+
'CHUNKED_FEED_FORMAT': 'json',
136+
'CHUNKED_FEED_ITEMS_PER_CHUNK': 1,
137+
'FEED_EXPORTERS': JSON_FEED_EXPORTERS
138+
}
139+
140+
def test_items_0(self):
141+
# FIXME: Scrapy exporter creates always one file
142+
self.start(n_items=0, n_items_per_chunk=1)
143+
self.stop()
144+
#self.ensure_number_of_chunks(n_chunks=0)
145+
146+
def test_items_1_chunksize_1(self):
147+
self.start(n_items=1, n_items_per_chunk=1)
148+
self.stop()
149+
self.ensure_number_of_chunks(n_chunks=1)
150+
self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=1)
151+
152+
def test_items_1_chunksize_2(self):
153+
self.start(n_items=1, n_items_per_chunk=2)
154+
self.stop()
155+
self.ensure_number_of_chunks(n_chunks=1)
156+
self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=1)
157+
158+
def test_items_2_chunksize_1(self):
159+
self.start(n_items=2, n_items_per_chunk=1)
160+
self.stop()
161+
self.ensure_number_of_chunks(n_chunks=2)
162+
self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=1)
163+
self.ensure_number_of_exported_items_per_chunk(chunk=2, n_items=1)
164+
165+
def test_items_2_chunksize_2(self):
166+
self.start(n_items=2, n_items_per_chunk=2)
167+
self.stop()
168+
self.ensure_number_of_chunks(n_chunks=1)
169+
self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=2)
170+
171+
def test_items_2_chunksize_3(self):
172+
self.start(n_items=2, n_items_per_chunk=3)
173+
self.stop()
174+
self.ensure_number_of_chunks(n_chunks=1)
175+
self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=2)
176+
177+
def test_items_5_chunksize_1(self):
178+
self.start(n_items=5, n_items_per_chunk=1)
179+
self.stop()
180+
self.ensure_number_of_chunks(n_chunks=5)
181+
self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=1)
182+
self.ensure_number_of_exported_items_per_chunk(chunk=2, n_items=1)
183+
self.ensure_number_of_exported_items_per_chunk(chunk=3, n_items=1)
184+
self.ensure_number_of_exported_items_per_chunk(chunk=4, n_items=1)
185+
self.ensure_number_of_exported_items_per_chunk(chunk=5, n_items=1)
186+
187+
def test_items_5_chunksize_2(self):
188+
self.start(n_items=5, n_items_per_chunk=2)
189+
self.stop()
190+
self.ensure_number_of_chunks(n_chunks=3)
191+
self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=2)
192+
self.ensure_number_of_exported_items_per_chunk(chunk=2, n_items=2)
193+
self.ensure_number_of_exported_items_per_chunk(chunk=3, n_items=1)
194+
195+
def test_items_5_chunksize_3(self):
196+
self.start(n_items=5, n_items_per_chunk=3)
197+
self.stop()
198+
self.ensure_number_of_chunks(n_chunks=2)
199+
self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=3)
200+
self.ensure_number_of_exported_items_per_chunk(chunk=2, n_items=2)
201+
202+
def test_items_5_chunksize_4(self):
203+
self.start(n_items=5, n_items_per_chunk=4)
204+
self.stop()
205+
self.ensure_number_of_chunks(n_chunks=2)
206+
self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=4)
207+
self.ensure_number_of_exported_items_per_chunk(chunk=2, n_items=1)
208+
209+
def test_items_5_chunksize_5(self):
210+
self.start(n_items=5, n_items_per_chunk=5)
211+
self.stop()
212+
self.ensure_number_of_chunks(n_chunks=1)
213+
self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=5)
214+
215+
def test_items_5_chunksize_6(self):
216+
self.start(n_items=5, n_items_per_chunk=6)
217+
self.stop()
218+
self.ensure_number_of_chunks(n_chunks=1)
219+
self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=5)
220+
221+
def test_items_100_chunksize_25(self):
222+
self.start(n_items=100, n_items_per_chunk=25)
223+
self.stop()
224+
self.ensure_number_of_chunks(n_chunks=4)
225+
self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=25)
226+
self.ensure_number_of_exported_items_per_chunk(chunk=2, n_items=25)
227+
self.ensure_number_of_exported_items_per_chunk(chunk=3, n_items=25)
228+
self.ensure_number_of_exported_items_per_chunk(chunk=4, n_items=25)
229+
230+
def test_items_100_chunksize_24(self):
231+
self.start(n_items=100, n_items_per_chunk=24)
232+
self.stop()
233+
self.ensure_number_of_chunks(n_chunks=5)
234+
self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=24)
235+
self.ensure_number_of_exported_items_per_chunk(chunk=2, n_items=24)
236+
self.ensure_number_of_exported_items_per_chunk(chunk=3, n_items=24)
237+
self.ensure_number_of_exported_items_per_chunk(chunk=4, n_items=24)
238+
self.ensure_number_of_exported_items_per_chunk(chunk=5, n_items=4)
239+
240+
def test_items_100_chunksize_26(self):
241+
self.start(n_items=100, n_items_per_chunk=26)
242+
self.stop()
243+
self.ensure_number_of_chunks(n_chunks=4)
244+
self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=26)
245+
self.ensure_number_of_exported_items_per_chunk(chunk=2, n_items=26)
246+
self.ensure_number_of_exported_items_per_chunk(chunk=3, n_items=26)
247+
self.ensure_number_of_exported_items_per_chunk(chunk=4, n_items=22)

0 commit comments

Comments
 (0)