# spreadsheet_chunker.py
import logging
import os
import time
from io import BytesIO
from openpyxl import load_workbook
from tabulate import tabulate
from .base_chunker import BaseChunker
class SpreadsheetChunker(BaseChunker):
"""
SpreadsheetChunker processes and chunks spreadsheet content, such as Excel files, into manageable pieces for analysis and summarization.
    It supports chunking either by sheet or by row, lets callers choose whether to include the
    header row in each row-based chunk, and keeps chunk content within a configurable token limit.
The class supports the following operations:
- Converts spreadsheets into chunkable content.
- Provides options to chunk either by row or by sheet.
- Includes optional header rows in chunks.
- Summarizes large sheets if the content exceeds the maximum chunk size.
Attributes:
-----------
max_chunk_size (int): Maximum allowed size of each chunk in tokens.
chunking_by_row (bool): Whether to chunk by row instead of by sheet.
include_header_in_chunks (bool): Whether to include header rows in each row-based chunk.
        document_bytes (bytes): Raw spreadsheet content provided by the base chunker.
Methods:
--------
- get_chunks(): Splits the spreadsheet content into manageable chunks, based on the configuration.
- _spreadsheet_process(): Extracts and processes data from each sheet, including summaries if necessary.
- _get_sheet_data(sheet): Retrieves data and headers from the given sheet, handling empty cells.
- _clean_markdown_table(table_str): Cleans up Markdown table strings by removing excessive whitespace.
"""
def __init__(self, data, max_chunk_size=None, chunking_by_row=None, include_header_in_chunks=None):
"""
Initializes the SpreadsheetChunker with the provided data and environment configurations.
Args:
data (str): The spreadsheet content to be chunked.
            max_chunk_size (int, optional): Maximum allowed size of each chunk in tokens. Defaults to the 'SPREADSHEET_NUM_TOKENS' environment variable, or 0 (no limit) if unset.
            chunking_by_row (bool, optional): Whether to chunk by row instead of by sheet. Defaults to the 'SPREADSHEET_CHUNKING_BY_ROW' environment variable, or False.
            include_header_in_chunks (bool, optional): Whether to include the header row in each chunk when chunking by row. Defaults to the 'SPREADSHEET_CHUNKING_BY_ROW_INCLUDE_HEADER' environment variable, or False.
"""
super().__init__(data)
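        # Fall back to environment variables when arguments are omitted;
        # a max_chunk_size of 0 disables the token-limit check entirely.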
if max_chunk_size is None:
self.max_chunk_size = int(os.getenv("SPREADSHEET_NUM_TOKENS", 0))
else:
self.max_chunk_size = int(max_chunk_size)
if chunking_by_row is None:
chunking_env = os.getenv("SPREADSHEET_CHUNKING_BY_ROW", "false").lower()
self.chunking_by_row = chunking_env in ["true", "1", "yes"]
else:
self.chunking_by_row = bool(chunking_by_row)
if include_header_in_chunks is None:
include_header_env = os.getenv("SPREADSHEET_CHUNKING_BY_ROW_INCLUDE_HEADER", "false").lower()
self.include_header_in_chunks = include_header_env in ["true", "1", "yes"]
else:
self.include_header_in_chunks = bool(include_header_in_chunks)
def get_chunks(self):
"""
Splits the spreadsheet content into smaller chunks. Depending on the configuration, chunks can be created by sheet or by row.
        - If chunking by sheet, a precomputed summary replaces the table content of any sheet that exceeds the maximum chunk size.
- If chunking by row, each row is processed into its own chunk, optionally including the header row.
Returns:
List[dict]: A list of dictionaries representing the chunks created from the spreadsheet.
"""
chunks = []
logging.info(f"[spreadsheet_chunker][{self.filename}][get_chunks] Running get_chunks.")
total_start_time = time.time()
sheets = self._spreadsheet_process()
logging.info(f"[spreadsheet_chunker][{self.filename}][get_chunks] Workbook has {len(sheets)} sheets")
chunk_id = 0
for sheet in sheets:
if not self.chunking_by_row:
# Original behavior: Chunk per sheet
start_time = time.time()
chunk_id += 1
logging.debug(f"[spreadsheet_chunker][{self.filename}][get_chunks][{sheet['name']}] Starting processing chunk {chunk_id} (sheet).")
table_content = sheet["table"]
table_content = self._clean_markdown_table(table_content)
table_tokens = self.token_estimator.estimate_tokens(table_content)
if self.max_chunk_size > 0 and table_tokens > self.max_chunk_size:
logging.info(f"[spreadsheet_chunker][{self.filename}][get_chunks][{sheet['name']}] Table has {table_tokens} tokens. Max tokens is {self.max_chunk_size}. Using summary.")
table_content = sheet["summary"]
                chunk_dict = self._create_chunk(
                    chunk_id=chunk_id,
                    content=table_content,
                    summary=sheet["summary"],
                    embedding_text=sheet["summary"] if sheet["summary"] else table_content,
                    title=sheet["name"]
                )
chunks.append(chunk_dict)
elapsed_time = time.time() - start_time
logging.debug(f"[spreadsheet_chunker][{self.filename}][get_chunks][{sheet['name']}] Processed chunk {chunk_id} in {elapsed_time:.2f} seconds.")
else:
# New behavior: Chunk per row
logging.info(f"[spreadsheet_chunker][{self.filename}][get_chunks][{sheet['name']}] Starting row-wise chunking.")
headers = sheet.get("headers", [])
rows = sheet.get("data", [])
for row_index, row in enumerate(rows, start=1):
if not any(cell.strip() for cell in row):
continue
chunk_id += 1
start_time = time.time()
logging.debug(f"[spreadsheet_chunker][{self.filename}][get_chunks][{sheet['name']}] Processing chunk {chunk_id} for row {row_index}.")
if self.include_header_in_chunks:
table = tabulate([headers, row], headers="firstrow", tablefmt="github")
else:
table = tabulate([row], headers=headers, tablefmt="github")
table = self._clean_markdown_table(table)
summary = ""
table_tokens = self.token_estimator.estimate_tokens(table)
if self.max_chunk_size > 0 and table_tokens > self.max_chunk_size:
logging.info(f"[spreadsheet_chunker][{self.filename}][get_chunks][{sheet['name']}] Row table has {table_tokens} tokens. Max tokens is {self.max_chunk_size}. Truncating content.")
content = table
embedding_text = table
else:
content = table
embedding_text = table
chunk_dict = self._create_chunk(
chunk_id=chunk_id,
content=content,
summary=summary,
embedding_text=embedding_text,
title=f"{sheet['name']} - Row {row_index}"
)
chunks.append(chunk_dict)
elapsed_time = time.time() - start_time
logging.debug(f"[spreadsheet_chunker][{self.filename}][get_chunks][{sheet['name']}] Processed chunk {chunk_id} in {elapsed_time:.2f} seconds.")
total_elapsed_time = time.time() - total_start_time
logging.debug(f"[spreadsheet_chunker][{self.filename}][get_chunks] Finished get_chunks. Created {len(chunks)} chunks in {total_elapsed_time:.2f} seconds.")
return chunks
def _spreadsheet_process(self):
"""
Extracts and processes each sheet from the spreadsheet, converting the content into Markdown table format.
        When chunking by sheet, a summary is also generated for every sheet; get_chunks() substitutes it for tables that exceed the maximum token size.
Returns:
List[dict]: A list of dictionaries, where each dictionary contains sheet metadata, headers, rows, table content, and a summary if applicable.
"""
logging.debug(f"[spreadsheet_chunker][{self.filename}][spreadsheet_process] Starting blob download.")
blob_data = self.document_bytes
blob_stream = BytesIO(blob_data)
logging.debug(f"[spreadsheet_chunker][{self.filename}][spreadsheet_process] Starting openpyxl load_workbook.")
workbook = load_workbook(blob_stream, data_only=True)
sheets = []
total_start_time = time.time()
for sheet_name in workbook.sheetnames:
logging.info(f"[spreadsheet_chunker][{self.filename}][spreadsheet_process][{sheet_name}] Started processing.")
start_time = time.time()
sheet_dict = {}
sheet_dict['name'] = sheet_name
sheet = workbook[sheet_name]
data, headers = self._get_sheet_data(sheet)
sheet_dict["headers"] = headers
sheet_dict["data"] = data
table = tabulate(data, headers=headers, tablefmt="grid")
table = self._clean_markdown_table(table)
sheet_dict["table"] = table
if not self.chunking_by_row:
prompt = f"Summarize the table with data in it, by understanding the information clearly.\n table_data:{table}"
summary = self.aoai_client.get_completion(prompt, max_tokens=2048)
sheet_dict["summary"] = summary
logging.debug(f"[spreadsheet_chunker][{self.filename}][spreadsheet_process][{sheet_dict['name']}] Generated summary.")
else:
sheet_dict["summary"] = ""
logging.debug(f"[spreadsheet_chunker][{self.filename}][spreadsheet_process][{sheet_dict['name']}] Skipped summary generation (chunking by row).")
elapsed_time = time.time() - start_time
logging.debug(f"[spreadsheet_chunker][{self.filename}][spreadsheet_process][{sheet_dict['name']}] Processed in {elapsed_time:.2f} seconds.")
sheets.append(sheet_dict)
total_elapsed_time = time.time() - total_start_time
logging.debug(f"[spreadsheet_chunker][{self.filename}][spreadsheet_process] Total processing time: {total_elapsed_time:.2f} seconds.")
return sheets
def _get_sheet_data(self, sheet):
"""
Retrieves data and headers from the given sheet. Each row's data is processed into a list format, ensuring that empty rows are excluded.
Args:
sheet (Worksheet): The worksheet object to extract data from.
Returns:
Tuple[List[List[str]], List[str]]: A tuple containing a list of row data and a list of headers.
"""
data = []
for row in sheet.iter_rows(min_row=2): # Start from the second row to skip headers
            row_data = ["" if cell.value is None else str(cell.value) for cell in row]
if "".join(row_data).strip() != "":
data.append(row_data)
headers = [cell.value if cell.value is not None else "" for cell in sheet[1]]
return data, headers
def _clean_markdown_table(self, table_str):
"""
Cleans up a Markdown table string by removing excessive whitespace from each cell.
Args:
table_str (str): The Markdown table string to be cleaned.
Returns:
str: The cleaned Markdown table string with reduced whitespace.
"""
cleaned_lines = []
lines = table_str.splitlines()
for line in lines:
            # Pass separator rules through untouched; "github" rules use '-' and '|',
            # while "grid" tables (whole sheets) also use '+' and '='.
            if set(line.strip()) <= set('-=+| '):
cleaned_lines.append(line)
continue
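            # split('|') yields empty strings before the leading and after the
            # trailing pipe, so cells[1:-1] keeps only the real cell contents.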
cells = line.split('|')
stripped_cells = [cell.strip() for cell in cells[1:-1]]
cleaned_line = '| ' + ' | '.join(stripped_cells) + ' |'
cleaned_lines.append(cleaned_line)
return '\n'.join(cleaned_lines)
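

# ---------------------------------------------------------------------------
# Minimal standalone sketch (illustrative only; not used by the chunker).
# It demonstrates the row-wise technique implemented above: each data row is
# rendered as its own GitHub-flavored Markdown table headed by the sheet's
# first row. The workbook is built in memory, so the snippet needs none of
# the BaseChunker plumbing, Azure OpenAI client, or environment variables
# assumed by the class. Because this module uses a relative import, copy the
# snippet into a standalone script (or run the package with -m) to try it.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from openpyxl import Workbook
    from tabulate import tabulate

    wb = Workbook()
    ws = wb.active
    ws.append(["name", "qty"])   # header row
    ws.append(["apples", 3])
    ws.append(["pears", 5])

    headers = ["" if cell.value is None else str(cell.value) for cell in ws[1]]
    for row in ws.iter_rows(min_row=2, values_only=True):
        cells = ["" if value is None else str(value) for value in row]
        print(tabulate([cells], headers=headers, tablefmt="github"))
        print()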