from dataclasses import dataclass from typing import Any from .parser import CharIndex, InsertionAction, PatchAction, Location, parse_patch_file from pathlib import Path from task.util import * # Converts nebulous locations to real positions def evaluate_position(content: str, location: Location): if isinstance(location, CharIndex): return location.char # Ineligent yet effective solution... chars_read = 0 if location.line > len(content.split('\n')): raise RuntimeError("Line number out of range") for i, line in enumerate(content.split('\n')): if location.line == i + 1: if location.column-1 > len(line): raise RuntimeError("Column number out of range") return chars_read + location.column - 1 chars_read += len(line) + 1 # Don't forget the newline character!!! raise RuntimeError("Line number out of range") @dataclass class ProcessedInsertion: position: int content: str patch_file_name: str # TODO: Refactor this out of ProcessedInsertion is_old: bool type ProcessedAction = ProcessedInsertion def update_positions_of_patches(actions: list[ProcessedAction], at: int, by: int): for action in actions: if action.position < at: continue action.position += by def process_all_patches(dir: Path | str, dest: Path | str, check_date: bool = True): dir = Path(dir) patches: dict[Path, list[ProcessedAction]] = {} # First, we collect all actions from all patches in each patch file for patch_file in dir.rglob("*.patch"): pfrn = patch_file.relative_to(dir) for patch in parse_patch_file(str(patch_file)): patches[patch.target] = patches.get(patch.target, []) src_file = EXTRACTED_DIR / patch.target dest_file = dest / patch.target for action in patch.actions: with open(src_file, 'r') as f: content = f.read() is_old = dest_file.exists() and patch_file.stat().st_mtime <= dest_file.stat().st_mtime # We also consider the true position of each insertion, too if isinstance(action, InsertionAction): processed_action = ProcessedInsertion(position=evaluate_position(content, action.location), content=action.content, patch_file_name=str(pfrn), is_old=is_old) else: raise RuntimeError("I don't know how to deal with those kinds of patches") patches[patch.target].append(processed_action) # Skip all patches for which the destination file is newer than all the patches themselves for file in list(patches.keys()): for action in patches[file]: if not action.is_old: break else: print(f"Skipped {file}") del patches[file] # Now we actually do all the patching for target_file, actions in patches.items(): src_file = EXTRACTED_DIR / target_file dest_file = dest / target_file with open(src_file, 'r') as f: content = f.read() print(f"Processing patches for: {target_file}") for action in actions: if isinstance(action, ProcessedInsertion): # Update the positions of patches which modify after this one, so that their position isn't scrambled by this one position = action.position # update_positions_of_patches(actions, position, len(action.content)) # Update content content = content[:position] + action.content + content[position:] print(f" |- {action.patch_file_name}") else: raise RuntimeError("I don't know how to deal with those kinds of patches") # Commit changes dest_file.parent.mkdir(parents=True, exist_ok=True) with open(dest_file, 'w') as f: _ = f.write(content)