import json import re from datetime import datetime import os # Helper to get issues from MCP tool output, handling truncation info def parse_mcp_output(mcp_output_string): json_start_index = mcp_output_string.find("{") if json_start_index == -1: return None, "Error: No JSON content found in tool output." clean_content = mcp_output_string[json_start_index:] try: parsed_content = json.loads(clean_content) if "Result" in parsed_content: return parsed_content["Result"], None elif isinstance(parsed_content, list): return parsed_content, None else: return None, f"Error: Unexpected JSON structure after cleaning: {clean_content[:200]}..." except json.JSONDecodeError as e: return None, f"Error: Could not parse JSON content after cleaning: {e} - {clean_content[:200]}..." # Function to fetch all issues using pagination and save them to a file def fetch_all_issues_and_save(owner, repo, state, file_path, page_size=100): all_issues = [] page = 1 while True: # Simulate calling mcp_gitea_list_repo_issues # In a real scenario, this would be a direct call to the MCP tool # For this script, we assume this function will be called with a placeholder for the actual MCP tool output # since direct MCP tool calls are not possible within this embedded script context. # This function needs to be invoked in a way that allows external MCP calls. # --- THIS PART NEEDS TO BE EXECUTED OUTSIDE THIS SCRIPT OR BY A TOOL THAT CAN CALL MCP --- # For now, this script will only process an already existing file. # The external loop will call mcp_gitea_list_repo_issues and write the combined output to file_path print(f"DEBUG: Placeholder for fetching page {page} from {owner}/{repo}") break # Break as we cannot truly paginate from within this isolated script # This part assumes file_path already contains the FULL JSON from all pages if not os.path.exists(file_path): print(f"Error: Issue data file not found at {file_path}. Please ensure it is created with full data.") return with open(file_path, "r") as f: full_issues_content = f.read() issues_data, error = parse_mcp_output(full_issues_content) if error: print(error) return return issues_data def find_first_unhandled_original_issue(issues_data): if not issues_data: return None delegation_prefixes = [ "[Dir. Automação]", "[Dir. Desenvolvimento]", "[Dir. Infraestrutura]", "[COO]", "[Improvement Evaluator]" ] issues_data.sort(key=lambda x: datetime.strptime(x["created_at"], "%Y-%m-%dT%H:%M:%SZ")) # Sort by creation date in ascending order for issue in issues_data: if issue["state"] == "closed": continue is_delegated_by_prefix = False for prefix in delegation_prefixes: if issue["title"].startswith(prefix): is_delegated_by_prefix = True break if is_delegated_by_prefix: continue # Check for delegation comments or if comments exist for n8n workflow issues (implying delegation) # This check is a simplification and might need to fetch comments for accurate check if issue["comments"] > 0 and ("Tarefa delegada ao Dir." in issue["body"] or "n8n Workflow" in issue["title"]): continue # If we reach here, it's an open, non-delegated, original issue return { "number": issue["number"], "title": issue["title"], "body": issue["body"] } return None # No unhandled original issues found. # Main execution flow temp_file_path = "open_issues.json" owner = "ealmeida" repo = "mcp-paperclip" state = "open" # This part needs to be handled externally to call MCP tools iteratively # For now, let's just process the existing open_issues.json # issues_data = fetch_all_issues_and_save(owner, repo, state, temp_file_path) # Instead, read the pre-existing full JSON data if not os.path.exists(temp_file_path): print(f"Error: Issue data file not found at {temp_file_path}. Please create it manually with full data.") exit(1) with open(temp_file_path, "r") as f: full_issues_content = f.read() issues_data, error = parse_mcp_output(full_issues_content) if error: print(error) exit(1) unhandled_issue = find_first_unhandled_original_issue(issues_data) if unhandled_issue: print(f"Oldest unhandled original issue found:") print(f"Issue Number: {unhandled_issue["number"]}") print(f"Issue Title: {unhandled_issue["title"]}") print(f"Issue Body: {unhandled_issue["body"]}") else: print("No unhandled original issues found in the provided data.")