-
Notifications
You must be signed in to change notification settings - Fork 739
/
Copy pathpr_description.py
656 lines (575 loc) · 31 KB
/
pr_description.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
import asyncio
import copy
import re
from functools import partial
from typing import List, Tuple
import yaml
from jinja2 import Environment, StrictUndefined
from pr_agent.algo.ai_handlers.base_ai_handler import BaseAiHandler
from pr_agent.algo.ai_handlers.litellm_ai_handler import LiteLLMAIHandler
from pr_agent.algo.pr_processing import get_pr_diff, retry_with_fallback_models, get_pr_diff_multiple_patchs, \
OUTPUT_BUFFER_TOKENS_HARD_THRESHOLD
from pr_agent.algo.token_handler import TokenHandler
from pr_agent.algo.utils import set_custom_labels
from pr_agent.algo.utils import load_yaml, get_user_labels, ModelType, show_relevant_configurations, get_max_tokens, \
clip_tokens
from pr_agent.config_loader import get_settings
from pr_agent.git_providers import get_git_provider, GithubProvider, get_git_provider_with_context
from pr_agent.git_providers.git_provider import get_main_pr_language
from pr_agent.log import get_logger
from pr_agent.servers.help import HelpMessage
class PRDescription:
def __init__(self, pr_url: str, args: list = None,
ai_handler: partial[BaseAiHandler,] = LiteLLMAIHandler):
"""
Initialize the PRDescription object with the necessary attributes and objects for generating a PR description
using an AI model.
Args:
pr_url (str): The URL of the pull request.
args (list, optional): List of arguments passed to the PRDescription class. Defaults to None.
"""
# Initialize the git provider and main PR language
self.git_provider = get_git_provider_with_context(pr_url)
self.main_pr_language = get_main_pr_language(
self.git_provider.get_languages(), self.git_provider.get_files()
)
self.pr_id = self.git_provider.get_pr_id()
if get_settings().pr_description.enable_semantic_files_types and not self.git_provider.is_supported(
"gfm_markdown"):
get_logger().debug(f"Disabling semantic files types for {self.pr_id}, gfm_markdown not supported.")
get_settings().pr_description.enable_semantic_files_types = False
# Initialize the AI handler
self.ai_handler = ai_handler()
self.ai_handler.main_pr_language = self.main_pr_language
# Initialize the variables dictionary
self.vars = {
"title": self.git_provider.pr.title,
"branch": self.git_provider.get_pr_branch(),
"description": self.git_provider.get_pr_description(full=False),
"language": self.main_pr_language,
"diff": "", # empty diff for initial calculation
"extra_instructions": get_settings().pr_description.extra_instructions,
"commit_messages_str": self.git_provider.get_commit_messages(),
"enable_custom_labels": get_settings().config.enable_custom_labels,
"custom_labels_class": "", # will be filled if necessary in 'set_custom_labels' function
"enable_semantic_files_types": get_settings().pr_description.enable_semantic_files_types,
}
self.user_description = self.git_provider.get_user_description()
# Initialize the token handler
self.token_handler = TokenHandler(
self.git_provider.pr,
self.vars,
get_settings().pr_description_prompt.system,
get_settings().pr_description_prompt.user,
)
# Initialize patches_diff and prediction attributes
self.patches_diff = None
self.prediction = None
self.file_label_dict = None
self.COLLAPSIBLE_FILE_LIST_THRESHOLD = 8
async def run(self):
try:
get_logger().info(f"Generating a PR description for pr_id: {self.pr_id}")
relevant_configs = {'pr_description': dict(get_settings().pr_description),
'config': dict(get_settings().config)}
get_logger().debug("Relevant configs", artifacts=relevant_configs)
if get_settings().config.publish_output and not get_settings().config.get('is_auto_command', False):
self.git_provider.publish_comment("Preparing PR description...", is_temporary=True)
await retry_with_fallback_models(self._prepare_prediction, ModelType.TURBO)
if self.prediction:
self._prepare_data()
else:
get_logger().error(f"Error getting AI prediction {self.pr_id}")
self.git_provider.remove_initial_comment()
return None
if get_settings().pr_description.enable_semantic_files_types:
self.file_label_dict = self._prepare_file_labels()
pr_labels, pr_file_changes = [], []
if get_settings().pr_description.publish_labels:
pr_labels = self._prepare_labels()
if get_settings().pr_description.use_description_markers:
pr_title, pr_body, changes_walkthrough, pr_file_changes = self._prepare_pr_answer_with_markers()
else:
pr_title, pr_body, changes_walkthrough, pr_file_changes = self._prepare_pr_answer()
if not self.git_provider.is_supported(
"publish_file_comments") or not get_settings().pr_description.inline_file_summary:
pr_body += "\n\n" + changes_walkthrough
get_logger().debug("PR output", artifact={"title": pr_title, "body": pr_body})
# Add help text if gfm_markdown is supported
if self.git_provider.is_supported("gfm_markdown") and get_settings().pr_description.enable_help_text:
pr_body += "<hr>\n\n<details> <summary><strong>✨ Describe tool usage guide:</strong></summary><hr> \n\n"
pr_body += HelpMessage.get_describe_usage_guide()
pr_body += "\n</details>\n"
elif get_settings().pr_description.enable_help_comment:
pr_body += "\n\n___\n\n> 💡 **PR-Agent usage**:"
pr_body += "\n>Comment `/help` on the PR to get a list of all available PR-Agent tools and their descriptions\n\n"
# Output the relevant configurations if enabled
if get_settings().get('config', {}).get('output_relevant_configurations', False):
pr_body += show_relevant_configurations(relevant_section='pr_description')
if get_settings().config.publish_output:
# publish labels
if get_settings().pr_description.publish_labels and self.git_provider.is_supported("get_labels"):
original_labels = self.git_provider.get_pr_labels(update=True)
get_logger().debug(f"original labels", artifact=original_labels)
user_labels = get_user_labels(original_labels)
new_labels = pr_labels + user_labels
get_logger().debug(f"published labels", artifact=new_labels)
if sorted(new_labels) != sorted(original_labels):
self.git_provider.publish_labels(new_labels)
else:
get_logger().debug(f"Labels are the same, not updating")
# publish description
if get_settings().pr_description.publish_description_as_comment:
full_markdown_description = f"## Title\n\n{pr_title}\n\n___\n{pr_body}"
if get_settings().pr_description.publish_description_as_comment_persistent:
self.git_provider.publish_persistent_comment(full_markdown_description,
initial_header="## Title",
update_header=True,
name="describe",
final_update_message=False, )
else:
self.git_provider.publish_comment(full_markdown_description)
else:
self.git_provider.publish_description(pr_title, pr_body)
# publish final update message
if (get_settings().pr_description.final_update_message):
latest_commit_url = self.git_provider.get_latest_commit_url()
if latest_commit_url:
pr_url = self.git_provider.get_pr_url()
update_comment = f"**[PR Description]({pr_url})** updated to latest commit ({latest_commit_url})"
self.git_provider.publish_comment(update_comment)
self.git_provider.remove_initial_comment()
except Exception as e:
get_logger().error(f"Error generating PR description {self.pr_id}: {e}")
return ""
async def _prepare_prediction(self, model: str) -> None:
if get_settings().pr_description.use_description_markers and 'pr_agent:' not in self.user_description:
return None
large_pr_handling = get_settings().pr_description.enable_large_pr_handling and "pr_description_only_files_prompts" in get_settings()
output = get_pr_diff(self.git_provider, self.token_handler, model, large_pr_handling=large_pr_handling, return_remaining_files=True)
if isinstance(output, tuple):
patches_diff, remaining_files_list = output
else:
patches_diff = output
remaining_files_list = []
if not large_pr_handling or patches_diff:
self.patches_diff = patches_diff
if patches_diff:
get_logger().debug(f"PR diff", artifact=self.patches_diff)
self.prediction = await self._get_prediction(model, patches_diff, prompt="pr_description_prompt")
if (remaining_files_list and 'pr_files' in self.prediction and 'label:' in self.prediction and
get_settings().pr_description.mention_extra_files):
get_logger().debug(f"Extending additional files, {len(remaining_files_list)} files")
self.prediction = await self.extend_additional_files(remaining_files_list)
else:
get_logger().error(f"Error getting PR diff {self.pr_id}")
self.prediction = None
else:
# get the diff in multiple patches, with the token handler only for the files prompt
get_logger().debug('large_pr_handling for describe')
token_handler_only_files_prompt = TokenHandler(
self.git_provider.pr,
self.vars,
get_settings().pr_description_only_files_prompts.system,
get_settings().pr_description_only_files_prompts.user,
)
(patches_compressed_list, total_tokens_list, deleted_files_list, remaining_files_list, file_dict,
files_in_patches_list) = get_pr_diff_multiple_patchs(
self.git_provider, token_handler_only_files_prompt, model)
# get the files prediction for each patch
if not get_settings().pr_description.async_ai_calls:
results = []
for i, patches in enumerate(patches_compressed_list): # sync calls
patches_diff = "\n".join(patches)
get_logger().debug(f"PR diff number {i + 1} for describe files")
prediction_files = await self._get_prediction(model, patches_diff,
prompt="pr_description_only_files_prompts")
results.append(prediction_files)
else: # async calls
tasks = []
for i, patches in enumerate(patches_compressed_list):
patches_diff = "\n".join(patches)
get_logger().debug(f"PR diff number {i + 1} for describe files")
task = asyncio.create_task(
self._get_prediction(model, patches_diff, prompt="pr_description_only_files_prompts"))
tasks.append(task)
# Wait for all tasks to complete
results = await asyncio.gather(*tasks)
file_description_str_list = []
for i, result in enumerate(results):
prediction_files = result.strip().removeprefix('```yaml').strip('`').strip()
if load_yaml(prediction_files) and prediction_files.startswith('pr_files'):
prediction_files = prediction_files.removeprefix('pr_files:').strip()
file_description_str_list.append(prediction_files)
else:
get_logger().debug(f"failed to generate predictions in iteration {i + 1} for describe files")
# generate files_walkthrough string, with proper token handling
token_handler_only_description_prompt = TokenHandler(
self.git_provider.pr,
self.vars,
get_settings().pr_description_only_description_prompts.system,
get_settings().pr_description_only_description_prompts.user)
files_walkthrough = "\n".join(file_description_str_list)
files_walkthrough_prompt = copy.deepcopy(files_walkthrough)
if remaining_files_list:
files_walkthrough_prompt += "\n\nNo more token budget. Additional unprocessed files:"
for file in remaining_files_list:
files_walkthrough_prompt += f"\n- {file}"
if deleted_files_list:
files_walkthrough_prompt += "\n\nAdditional deleted files:"
for file in deleted_files_list:
files_walkthrough_prompt += f"\n- {file}"
tokens_files_walkthrough = len(
token_handler_only_description_prompt.encoder.encode(files_walkthrough_prompt))
total_tokens = token_handler_only_description_prompt.prompt_tokens + tokens_files_walkthrough
max_tokens_model = get_max_tokens(model)
if total_tokens > max_tokens_model - OUTPUT_BUFFER_TOKENS_HARD_THRESHOLD:
# clip files_walkthrough to git the tokens within the limit
files_walkthrough_prompt = clip_tokens(files_walkthrough_prompt,
max_tokens_model - OUTPUT_BUFFER_TOKENS_HARD_THRESHOLD - token_handler_only_description_prompt.prompt_tokens,
num_input_tokens=tokens_files_walkthrough)
# PR header inference
get_logger().debug(f"PR diff only description", artifact=files_walkthrough_prompt)
prediction_headers = await self._get_prediction(model, patches_diff=files_walkthrough_prompt,
prompt="pr_description_only_description_prompts")
prediction_headers = prediction_headers.strip().removeprefix('```yaml').strip('`').strip()
# manually add extra files to final prediction
if get_settings().pr_description.mention_extra_files:
for file in remaining_files_list:
extra_file_yaml = f"""\
- filename: |
{file}
changes_summary: |
...
changes_title: |
...
label: |
additional files (token-limit)
"""
files_walkthrough = files_walkthrough.strip() + "\n" + extra_file_yaml.strip()
# final processing
self.prediction = prediction_headers + "\n" + "pr_files:\n" + files_walkthrough
if not load_yaml(self.prediction):
get_logger().error(f"Error getting valid YAML in large PR handling for describe {self.pr_id}")
if load_yaml(prediction_headers):
get_logger().debug(f"Using only headers for describe {self.pr_id}")
self.prediction = prediction_headers
async def extend_additional_files(self, remaining_files_list) -> str:
prediction = self.prediction
try:
original_prediction_dict = load_yaml(self.prediction)
prediction_extra = "pr_files:"
for file in remaining_files_list:
extra_file_yaml = f"""\
- filename: |
{file}
changes_summary: |
...
changes_title: |
...
label: |
additional files (token-limit)
"""
prediction_extra = prediction_extra + "\n" + extra_file_yaml.strip()
prediction_extra_dict = load_yaml(prediction_extra)
# merge the two dictionaries
if isinstance(original_prediction_dict, dict) and isinstance(prediction_extra_dict, dict):
original_prediction_dict["pr_files"].extend(prediction_extra_dict["pr_files"])
new_yaml = yaml.dump(original_prediction_dict)
if load_yaml(new_yaml):
prediction = new_yaml
return prediction
except Exception as e:
get_logger().error(f"Error extending additional files {self.pr_id}: {e}")
return self.prediction
async def _get_prediction(self, model: str, patches_diff: str, prompt="pr_description_prompt") -> str:
variables = copy.deepcopy(self.vars)
variables["diff"] = patches_diff # update diff
environment = Environment(undefined=StrictUndefined)
set_custom_labels(variables, self.git_provider)
self.variables = variables
system_prompt = environment.from_string(get_settings().get(prompt, {}).get("system", "")).render(variables)
user_prompt = environment.from_string(get_settings().get(prompt, {}).get("user", "")).render(variables)
response, finish_reason = await self.ai_handler.chat_completion(
model=model,
temperature=get_settings().config.temperature,
system=system_prompt,
user=user_prompt
)
return response
def _prepare_data(self):
# Load the AI prediction data into a dictionary
self.data = load_yaml(self.prediction.strip())
if get_settings().pr_description.add_original_user_description and self.user_description:
self.data["User Description"] = self.user_description
# re-order keys
if 'User Description' in self.data:
self.data['User Description'] = self.data.pop('User Description')
if 'title' in self.data:
self.data['title'] = self.data.pop('title')
if 'type' in self.data:
self.data['type'] = self.data.pop('type')
if 'labels' in self.data:
self.data['labels'] = self.data.pop('labels')
if 'description' in self.data:
self.data['description'] = self.data.pop('description')
if 'pr_files' in self.data:
self.data['pr_files'] = self.data.pop('pr_files')
def _prepare_labels(self) -> List[str]:
pr_types = []
# If the 'PR Type' key is present in the dictionary, split its value by comma and assign it to 'pr_types'
if 'labels' in self.data:
if type(self.data['labels']) == list:
pr_types = self.data['labels']
elif type(self.data['labels']) == str:
pr_types = self.data['labels'].split(',')
elif 'type' in self.data:
if type(self.data['type']) == list:
pr_types = self.data['type']
elif type(self.data['type']) == str:
pr_types = self.data['type'].split(',')
pr_types = [label.strip() for label in pr_types]
# convert lowercase labels to original case
try:
if "labels_minimal_to_labels_dict" in self.variables:
d: dict = self.variables["labels_minimal_to_labels_dict"]
for i, label_i in enumerate(pr_types):
if label_i in d:
pr_types[i] = d[label_i]
except Exception as e:
get_logger().error(f"Error converting labels to original case {self.pr_id}: {e}")
return pr_types
def _prepare_pr_answer_with_markers(self) -> Tuple[str, str, str, List[dict]]:
get_logger().info(f"Using description marker replacements {self.pr_id}")
title = self.vars["title"]
body = self.user_description
if get_settings().pr_description.include_generated_by_header:
ai_header = f"### 🤖 Generated by PR Agent at {self.git_provider.last_commit_id.sha}\n\n"
else:
ai_header = ""
ai_type = self.data.get('type')
if ai_type and not re.search(r'<!--\s*pr_agent:type\s*-->', body):
pr_type = f"{ai_header}{ai_type}"
body = body.replace('pr_agent:type', pr_type)
ai_summary = self.data.get('description')
if ai_summary and not re.search(r'<!--\s*pr_agent:summary\s*-->', body):
summary = f"{ai_header}{ai_summary}"
body = body.replace('pr_agent:summary', summary)
ai_walkthrough = self.data.get('pr_files')
walkthrough_gfm = ""
pr_file_changes = []
if ai_walkthrough and not re.search(r'<!--\s*pr_agent:walkthrough\s*-->', body):
try:
walkthrough_gfm, pr_file_changes = self.process_pr_files_prediction(walkthrough_gfm,
self.file_label_dict)
body = body.replace('pr_agent:walkthrough', walkthrough_gfm)
except Exception as e:
get_logger().error(f"Failing to process walkthrough {self.pr_id}: {e}")
body = body.replace('pr_agent:walkthrough', "")
return title, body, walkthrough_gfm, pr_file_changes
def _prepare_pr_answer(self) -> Tuple[str, str, str, List[dict]]:
"""
Prepare the PR description based on the AI prediction data.
Returns:
- title: a string containing the PR title.
- pr_body: a string containing the PR description body in a markdown format.
"""
# Iterate over the dictionary items and append the key and value to 'markdown_text' in a markdown format
markdown_text = ""
# Don't display 'PR Labels'
if 'labels' in self.data and self.git_provider.is_supported("get_labels"):
self.data.pop('labels')
if not get_settings().pr_description.enable_pr_type:
self.data.pop('type')
for key, value in self.data.items():
markdown_text += f"## **{key}**\n\n"
markdown_text += f"{value}\n\n"
# Remove the 'PR Title' key from the dictionary
ai_title = self.data.pop('title', self.vars["title"])
if (not get_settings().pr_description.generate_ai_title):
# Assign the original PR title to the 'title' variable
title = self.vars["title"]
else:
# Assign the value of the 'PR Title' key to 'title' variable
title = ai_title
# Iterate over the remaining dictionary items and append the key and value to 'pr_body' in a markdown format,
# except for the items containing the word 'walkthrough'
pr_body, changes_walkthrough = "", ""
pr_file_changes = []
for idx, (key, value) in enumerate(self.data.items()):
if key == 'pr_files':
value = self.file_label_dict
else:
key_publish = key.rstrip(':').replace("_", " ").capitalize()
if key_publish == "Type":
key_publish = "PR Type"
# elif key_publish == "Description":
# key_publish = "PR Description"
pr_body += f"### **{key_publish}**\n"
if 'walkthrough' in key.lower():
if self.git_provider.is_supported("gfm_markdown"):
pr_body += "<details> <summary>files:</summary>\n\n"
for file in value:
filename = file['filename'].replace("'", "`")
description = file['changes_in_file']
pr_body += f'- `{filename}`: {description}\n'
if self.git_provider.is_supported("gfm_markdown"):
pr_body += "</details>\n"
elif 'pr_files' in key.lower():
changes_walkthrough, pr_file_changes = self.process_pr_files_prediction(changes_walkthrough, value)
changes_walkthrough = f"### **Changes walkthrough** 📝\n{changes_walkthrough}"
else:
# if the value is a list, join its items by comma
if isinstance(value, list):
value = ', '.join(v.rstrip() for v in value)
pr_body += f"{value}\n"
if idx < len(self.data) - 1:
pr_body += "\n\n___\n\n"
return title, pr_body, changes_walkthrough, pr_file_changes,
def _prepare_file_labels(self):
file_label_dict = {}
for file in self.data['pr_files']:
try:
filename = file['filename'].replace("'", "`").replace('"', '`')
changes_summary = file['changes_summary']
changes_title = file['changes_title'].strip()
label = file.get('label').strip().lower()
if label not in file_label_dict:
file_label_dict[label] = []
file_label_dict[label].append((filename, changes_title, changes_summary))
except Exception as e:
get_logger().error(f"Error preparing file label dict {self.pr_id}: {e}")
pass
return file_label_dict
def process_pr_files_prediction(self, pr_body, value):
pr_comments = []
# logic for using collapsible file list
use_collapsible_file_list = get_settings().pr_description.collapsible_file_list
num_files = 0
if value:
for semantic_label in value.keys():
num_files += len(value[semantic_label])
if use_collapsible_file_list == "adaptive":
use_collapsible_file_list = num_files > self.COLLAPSIBLE_FILE_LIST_THRESHOLD
if not self.git_provider.is_supported("gfm_markdown"):
return pr_body
try:
pr_body += "<table>"
header = f"Relevant files"
delta = 75
# header += " " * delta
pr_body += f"""<thead><tr><th></th><th align="left">{header}</th></tr></thead>"""
pr_body += """<tbody>"""
for semantic_label in value.keys():
s_label = semantic_label.strip("'").strip('"')
pr_body += f"""<tr><td><strong>{s_label.capitalize()}</strong></td>"""
list_tuples = value[semantic_label]
if use_collapsible_file_list:
pr_body += f"""<td><details><summary>{len(list_tuples)} files</summary><table>"""
else:
pr_body += f"""<td><table>"""
for filename, file_changes_title, file_change_description in list_tuples:
filename = filename.replace("'", "`").rstrip()
filename_publish = filename.split("/")[-1]
file_changes_title_code = f"<code>{file_changes_title}</code>"
file_changes_title_code_br = insert_br_after_x_chars(file_changes_title_code, x=(delta - 5)).strip()
if len(file_changes_title_code_br) < (delta - 5):
file_changes_title_code_br += " " * ((delta - 5) - len(file_changes_title_code_br))
filename_publish = f"<strong>{filename_publish}</strong><dd>{file_changes_title_code_br}</dd>"
diff_plus_minus = ""
delta_nbsp = ""
diff_files = self.git_provider.get_diff_files()
for f in diff_files:
if f.filename.lower().strip('/') == filename.lower().strip('/'):
num_plus_lines = f.num_plus_lines
num_minus_lines = f.num_minus_lines
diff_plus_minus += f"+{num_plus_lines}/-{num_minus_lines}"
delta_nbsp = " " * max(0, (8 - len(diff_plus_minus)))
break
# try to add line numbers link to code suggestions
link = ""
if hasattr(self.git_provider, 'get_line_link'):
filename = filename.strip()
link = self.git_provider.get_line_link(filename, relevant_line_start=-1)
file_change_description_br = insert_br_after_x_chars(file_change_description, x=(delta - 5))
pr_body += f"""
<tr>
<td>
<details>
<summary>{filename_publish}</summary>
<hr>
{filename}
{file_change_description_br}
</details>
</td>
<td><a href="{link}">{diff_plus_minus}</a>{delta_nbsp}</td>
</tr>
"""
if use_collapsible_file_list:
pr_body += """</table></details></td></tr>"""
else:
pr_body += """</table></td></tr>"""
pr_body += """</tr></tbody></table>"""
except Exception as e:
get_logger().error(f"Error processing pr files to markdown {self.pr_id}: {e}")
pass
return pr_body, pr_comments
def count_chars_without_html(string):
if '<' not in string:
return len(string)
no_html_string = re.sub('<[^>]+>', '', string)
return len(no_html_string)
def insert_br_after_x_chars(text, x=70):
"""
Insert <br> into a string after a word that increases its length above x characters.
Use proper HTML tags for code and new lines.
"""
if count_chars_without_html(text) < x:
return text
# replace odd instances of ` with <code> and even instances of ` with </code>
text = replace_code_tags(text)
# convert list items to <li>
if text.startswith("- "):
text = "<li>" + text[2:]
text = text.replace("\n- ", '<br><li> ').replace("\n - ", '<br><li> ')
# convert new lines to <br>
text = text.replace("\n", '<br>')
# split text into lines
lines = text.split('<br>')
words = []
for i, line in enumerate(lines):
words += line.split(' ')
if i < len(lines) - 1:
words[-1] += "<br>"
new_text = []
is_inside_code = False
current_length = 0
for word in words:
is_saved_word = False
if word == "<code>" or word == "</code>" or word == "<li>" or word == "<br>":
is_saved_word = True
len_word = count_chars_without_html(word)
if not is_saved_word and (current_length + len_word > x):
if is_inside_code:
new_text.append("</code><br><code>")
else:
new_text.append("<br>")
current_length = 0 # Reset counter
new_text.append(word + " ")
if not is_saved_word:
current_length += len_word + 1 # Add 1 for the space
if word == "<li>" or word == "<br>":
current_length = 0
if "<code>" in word:
is_inside_code = True
if "</code>" in word:
is_inside_code = False
return ''.join(new_text).strip()
def replace_code_tags(text):
"""
Replace odd instances of ` with <code> and even instances of ` with </code>
"""
parts = text.split('`')
for i in range(1, len(parts), 2):
parts[i] = '<code>' + parts[i] + '</code>'
return ''.join(parts)