ykit/buildtool/polly/patcher.py

112 lines
4.4 KiB
Python

from dataclasses import dataclass
from typing import Any
from .parser import CharIndex, InsertionAction, PatchAction, Location, parse_patch_file
from pathlib import Path
from task.util import *
# Converts nebulous locations to real positions
def evaluate_position(content: str, location: Location):
if isinstance(location, CharIndex):
return location.char
# Ineligent yet effective solution...
chars_read = 0
if location.line > len(content.split('\n')):
raise RuntimeError("Line number out of range")
for i, line in enumerate(content.split('\n')):
if location.line == i + 1:
if location.column-1 > len(line):
raise RuntimeError("Column number out of range")
return chars_read + location.column - 1
chars_read += len(line) + 1 # Don't forget the newline character!!!
raise RuntimeError("Line number out of range")
@dataclass
class ProcessedInsertion:
position: int
content: str
patch_file_name: str # TODO: Refactor this out of ProcessedInsertion
is_old: bool
type ProcessedAction = ProcessedInsertion
def update_positions_of_patches(actions: list[ProcessedAction], at: int, by: int):
for action in actions:
if action.position < at:
continue
action.position += by
def process_all_patches(dir: Path | str, dest: Path | str, check_date: bool = True):
dir = Path(dir)
patches: dict[Path, list[ProcessedAction]] = {}
# First, we collect all actions from all patches in each patch file
for patch_file in dir.rglob("*.patch"):
pfrn = patch_file.relative_to(dir)
for patch in parse_patch_file(str(patch_file)):
patches[patch.target] = patches.get(patch.target, [])
src_file = EXTRACTED_DIR / patch.target
dest_file = dest / patch.target
for action in patch.actions:
with open(src_file, 'r') as f:
content = f.read()
is_old = dest_file.exists() and patch_file.stat().st_mtime <= dest_file.stat().st_mtime
# We also consider the true position of each insertion, too
if isinstance(action, InsertionAction):
processed_action = ProcessedInsertion(position=evaluate_position(content, action.location),
content=action.content,
patch_file_name=str(pfrn),
is_old=is_old)
else:
raise RuntimeError("I don't know how to deal with those kinds of patches")
patches[patch.target].append(processed_action)
# Skip all patches for which the destination file is newer than all the patches themselves
for file in list(patches.keys()):
for action in patches[file]:
if not action.is_old:
break
else:
print(f"Skipped {file}")
del patches[file]
# Now we actually do all the patching
for target_file, actions in patches.items():
src_file = EXTRACTED_DIR / target_file
dest_file = dest / target_file
with open(src_file, 'r') as f:
content = f.read()
print(f"Processing patches for: {target_file}")
for action in actions:
if isinstance(action, ProcessedInsertion):
# Update the positions of patches which modify after this one, so that their position isn't scrambled by this one
position = action.position
# update_positions_of_patches(actions, position, len(action.content))
# Update content
content = content[:position] + action.content + content[position:]
print(f" |- {action.patch_file_name}")
else:
raise RuntimeError("I don't know how to deal with those kinds of patches")
# Commit changes
dest_file.parent.mkdir(parents=True, exist_ok=True)
with open(dest_file, 'w') as f:
_ = f.write(content)