From 76643dc2a6e5d5f761c8a96a1e1eccf40730ffdf Mon Sep 17 00:00:00 2001 From: Tony Duckles Date: Tue, 7 Aug 2012 22:00:26 -0500 Subject: [PATCH] Introduce 'svnancest' utility Introduce a helper utility for calling find_svn_ancestors(), to show full ancestry ("svn copy" history) for a given starting URL (optionally at a specific revision). * svn2svn/run/svnancest.py: Adding * svn2svn/run/common.py (in_svn, is_child_path, join_path, find_svn_ancestors): Move common helper functions to new separate library, for sharing with svn2svn/run/svnancest.py. * svn2svn/svnclient.py (valid_svn_actions): Move this constant here from svn2svn/run/svnreplay.py. --- svn2svn/run/common.py | 156 ++++++++++++++++++++++++++++++++++++++ svn2svn/run/svnancest.py | 77 +++++++++++++++++++ svn2svn/run/svnreplay.py | 160 +-------------------------------------- svn2svn/svnclient.py | 1 + svnancest.py | 4 + 5 files changed, 242 insertions(+), 156 deletions(-) create mode 100644 svn2svn/run/common.py create mode 100755 svn2svn/run/svnancest.py create mode 100755 svnancest.py diff --git a/svn2svn/run/common.py b/svn2svn/run/common.py new file mode 100644 index 0000000..2491993 --- /dev/null +++ b/svn2svn/run/common.py @@ -0,0 +1,156 @@ +from svn2svn import ui +from svn2svn import svnclient + +import operator + + +def in_svn(p, require_in_repo=False, prefix=""): + """ + Check if a given file/folder is being tracked by Subversion. + Prior to SVN 1.6, we could "cheat" and look for the existence of ".svn" directories. + With SVN 1.7 and beyond, WC-NG means only a single top-level ".svn" at the root of the working-copy. + Use "svn status" to check the status of the file/folder. + """ + entries = svnclient.status(p, non_recursive=True) + if not entries: + return False + d = entries[0] + if require_in_repo and (d['status'] == 'added' or d['revision'] is None): + # If caller requires this path to be in the SVN repo, prevent returning True + # for paths that are only locally-added.
+ ret = False + else: + # Don't consider files tracked as deleted in the WC as under source-control. + # Consider files which are locally added/copied as under source-control. + ret = True if not (d['status'] == 'deleted') and (d['type'] == 'normal' or d['status'] == 'added' or d['copied'] == 'true') else False + ui.status(prefix + ">> in_svn('%s', require_in_repo=%s) --> %s", p, str(require_in_repo), str(ret), level=ui.DEBUG, color='GREEN') + return ret + +def is_child_path(path, p_path): + return True if (path == p_path) or (path.startswith(p_path+"/")) else False + +def join_path(base, child): + base.rstrip('/') + return base+"/"+child if child else base + +def find_svn_ancestors(svn_repos_url, start_path, start_rev, stop_base_path=None, prefix=""): + """ + Given an initial starting path+rev, walk the SVN history backwards to inspect the + ancestry of that path, optionally seeing if it traces back to stop_base_path. + + Build an array of copyfrom_path and copyfrom_revision pairs for each of the "svn copy"'s. + If we find a copyfrom_path which stop_base_path is a substring match of (e.g. we crawled + back to the initial branch-copy from trunk), then return the collection of ancestor + paths. Otherwise, copyfrom_path has no ancestry compared to stop_base_path. + + This is useful when comparing "trunk" vs. "branch" paths, to handle cases where a + file/folder was renamed in a branch and then that branch was merged back to trunk. + + 'svn_repos_url' is the full URL to the root of the SVN repository, + e.g. 'file:///path/to/repo' + 'start_path' is the path in the SVN repo to the source path to start checking + ancestry at, e.g. '/branches/fix1/projectA/file1.txt'. + 'start_rev' is the revision to start walking the history of start_path backwards from. + 'stop_base_path' is the path in the SVN repo to stop tracing ancestry once we've reached, + i.e. the target path we're trying to trace ancestry back to, e.g. '/trunk'. 
+ """ + ui.status(prefix + ">> find_svn_ancestors: Start: (%s) start_path: %s stop_base_path: %s", + svn_repos_url, start_path+"@"+str(start_rev), stop_base_path, level=ui.DEBUG, color='YELLOW') + done = False + no_ancestry = False + cur_path = start_path + cur_rev = start_rev + first_iter_done = False + ancestors = [] + while not done: + # Get the first "svn log" entry for cur_path (relative to @cur_rev) + ui.status(prefix + ">> find_svn_ancestors: %s", svn_repos_url+cur_path+"@"+str(cur_rev), level=ui.DEBUG, color='YELLOW') + log_entry = svnclient.get_first_svn_log_entry(svn_repos_url+cur_path, 1, cur_rev) + if not log_entry: + ui.status(prefix + ">> find_svn_ancestors: Done: no log_entry", level=ui.DEBUG, color='YELLOW') + done = True + break + # If we found a copy-from case which matches our stop_base_path, we're done. + # ...but only if we've at least tried to search for the first copy-from path. + if stop_base_path is not None and first_iter_done and is_child_path(cur_path, stop_base_path): + ui.status(prefix + ">> find_svn_ancestors: Done: Found is_child_path(cur_path, stop_base_path) and first_iter_done=True", level=ui.DEBUG, color='YELLOW') + done = True + break + first_iter_done = True + # Search for any actions on our target path (or parent paths). + changed_paths_temp = [] + for d in log_entry['changed_paths']: + path = d['path'] + if is_child_path(cur_path, path): + changed_paths_temp.append({'path': path, 'data': d}) + if not changed_paths_temp: + # If no matches, then we've hit the end of the ancestry-chain. + ui.status(prefix + ">> find_svn_ancestors: Done: No matching changed_paths", level=ui.DEBUG, color='YELLOW') + done = True + continue + # Reverse-sort any matches, so that we start with the most-granular (deepest in the tree) path. + changed_paths = sorted(changed_paths_temp, key=operator.itemgetter('path'), reverse=True) + # Find the action for our cur_path in this revision. 
Use a loop to check in reverse order, + # so that if the target file/folder is "M" but has a parent folder with an "A" copy-from + # then we still correctly match the deepest copy-from. + for v in changed_paths: + d = v['data'] + path = d['path'] + # Check action-type for this file + action = d['action'] + if action not in svnclient.valid_svn_actions: + raise UnsupportedSVNAction("In SVN rev. %d: action '%s' not supported. Please report a bug!" + % (log_entry['revision'], action)) + ui.status(prefix + "> %s %s%s", action, path, + (" (from %s)" % (d['copyfrom_path']+"@"+str(d['copyfrom_revision']))) if d['copyfrom_path'] else "", + level=ui.DEBUG, color='YELLOW') + if action == 'D': + # If file/folder was deleted, ancestry-chain stops here + if stop_base_path: + no_ancestry = True + ui.status(prefix + ">> find_svn_ancestors: Done: deleted", level=ui.DEBUG, color='YELLOW') + done = True + break + if action in 'RA': + # If file/folder was added/replaced but not a copy, ancestry-chain stops here + if not d['copyfrom_path']: + if stop_base_path: + no_ancestry = True + ui.status(prefix + ">> find_svn_ancestors: Done: %s with no copyfrom_path", + "Added" if action == "A" else "Replaced", + level=ui.DEBUG, color='YELLOW') + done = True + break + # Else, file/folder was added/replaced and is a copy, so add an entry to our ancestors list + # and keep checking for ancestors + ui.status(prefix + ">> find_svn_ancestors: Found copy-from (action=%s): %s --> %s", + action, path, d['copyfrom_path']+"@"+str(d['copyfrom_revision']), + level=ui.DEBUG, color='YELLOW') + ancestors.append({'path': cur_path, 'revision': log_entry['revision'], + 'copyfrom_path': cur_path.replace(d['path'], d['copyfrom_path']), 'copyfrom_rev': d['copyfrom_revision']}) + cur_path = cur_path.replace(d['path'], d['copyfrom_path']) + cur_rev = d['copyfrom_revision'] + # Follow the copy and keep on searching + break + if stop_base_path and no_ancestry: + # If we're tracing back ancestry to a specific target 
stop_base_path and + # the ancestry-chain stopped before we reached stop_base_path, then return + # nothing since there is no ancestry chaining back to that target. + ancestors = [] + if ancestors: + if ui.get_level() >= ui.DEBUG: + max_len = 0 + for idx in range(len(ancestors)): + d = ancestors[idx] + max_len = max(max_len, len(d['path']+"@"+str(d['revision']))) + ui.status(prefix + ">> find_svn_ancestors: Found parent ancestors:", level=ui.DEBUG, color='YELLOW_B') + for idx in range(len(ancestors)): + d = ancestors[idx] + ui.status(prefix + " [%s] %s --> %s", idx, + str(d['path']+"@"+str(d['revision'])).ljust(max_len), + str(d['copyfrom_path']+"@"+str(d['copyfrom_rev'])), + level=ui.DEBUG, color='YELLOW') + else: + ui.status(prefix + ">> find_svn_ancestors: No ancestor-chain found: %s", + svn_repos_url+start_path+"@"+str(start_rev), level=ui.DEBUG, color='YELLOW') + return ancestors diff --git a/svn2svn/run/svnancest.py b/svn2svn/run/svnancest.py new file mode 100755 index 0000000..5708edd --- /dev/null +++ b/svn2svn/run/svnancest.py @@ -0,0 +1,77 @@ +""" +Display ancestry for a given path in an SVN repository. 
+""" + +from svn2svn import base_version, full_version +from svn2svn import ui +from svn2svn import svnclient +from parse import HelpFormatter +from svn2svn.run.common import find_svn_ancestors + +import optparse +import re + +options = None + +def real_main(args): + global options + url = args.pop(0) + ui.status("url: %s", url, level=ui.DEBUG, color='GREEN') + info = svnclient.info(url) + repos_root = info['repos_url'] + repos_path = url[len(repos_root):] + ancestors = find_svn_ancestors(repos_root, repos_path, options.revision) + if ancestors: + max_len = 0 + for idx in range(len(ancestors)): + d = ancestors[idx] + max_len = max(max_len, len(d['path']+"@"+str(d['revision']))) + for idx in range(len(ancestors)): + d = ancestors[idx] + ui.status("[%s] %s --> %s", len(ancestors)-idx-1, + str(d['path']+"@"+str(d['revision'])).ljust(max_len), + str(d['copyfrom_path']+"@"+str(d['copyfrom_rev']))) + else: + ui.status("No ancestor-chain found: %s", repos_root+repos_path+"@"+str(options.revision)) + +def main(): + # Defined as entry point. Must be callable without arguments. + usage = "svn2svn, version %s\n" % str(full_version) + \ + " \n\n" + \ + "Usage: %prog [OPTIONS] url\n" + description = """\ +Display ancestry for a given path in an SVN repository.""" + parser = optparse.OptionParser(usage, description=description, + formatter=HelpFormatter(), version="%prog "+str(full_version)) + parser.add_option("-v", "--verbose", dest="verbosity", action="count", default=1, + help="enable additional output (use -vv or -vvv for more)") + parser.add_option("-r", "--revision", type="string", dest="revision", metavar="ARG", + help="revision range to replay from source_url\n" + "Any revision # formats which SVN understands are " + "supported, e.g. 
'HEAD', '{2010-01-31}', etc.") + parser.add_option("--debug", dest="verbosity", const=ui.DEBUG, action="store_const", + help="enable debugging output (same as -vvv)") + global options + options, args = parser.parse_args() + if len(args) != 1: + parser.error("incorrect number of arguments") + if options.verbosity < 10: + # Expand multiple "-v" arguments to a real ui._level value + options.verbosity *= 10 + if options.revision: + # Reg-ex for matching a revision arg (http://svnbook.red-bean.com/en/1.5/svn.tour.revs.specifiers.html#svn.tour.revs.dates) + rev_patt = '[0-9A-Z]+|\{[0-9A-Za-z/\\ :-]+\}' + rev = None + match = re.match('^('+rev_patt+')$', options.revision) + if match is None: + parser.error("unexpected --revision argument format; see 'svn help log' for valid revision formats") + rev = match.groups() + options.revision = rev[0] if len(rev)>0 else None + else: + options.revision = 'HEAD' + ui.update_config(options) + return real_main(args) + + +if __name__ == "__main__": + sys.exit(main() or 0) diff --git a/svn2svn/run/svnreplay.py b/svn2svn/run/svnreplay.py index 77e1061..6a0922f 100644 --- a/svn2svn/run/svnreplay.py +++ b/svn2svn/run/svnreplay.py @@ -7,7 +7,8 @@ from svn2svn import ui from svn2svn import shell from svn2svn import svnclient from svn2svn.shell import run_svn,run_shell_command -from svn2svn.errors import (ExternalCommandFailed, UnsupportedSVNAction, InternalError, VerificationError) +from svn2svn.errors import ExternalCommandFailed, UnsupportedSVNAction, InternalError, VerificationError +from svn2svn.run.common import in_svn, is_child_path, join_path, find_svn_ancestors from parse import HelpFormatter from breakhandler import BreakHandler @@ -20,8 +21,6 @@ import re import urllib from datetime import datetime -_valid_svn_actions = "MARD" # The list of known SVN action abbr's, from "svn log" - # Module-level variables/parameters source_url = "" # URL to source path in source SVN repo, e.g. 
'http://server/svn/source/trunk' source_repos_url = "" # URL to root of source SVN repo, e.g. 'http://server/svn/source' @@ -218,7 +217,7 @@ def verify_commit(source_rev, target_rev, log_entry=None): if not match_d: match_d = d path = d['path'] - if d['action'] not in _valid_svn_actions: + if d['action'] not in svnclient.valid_svn_actions: raise UnsupportedSVNAction("In SVN rev. %d: action '%s' not supported. Please report a bug!" % (log_entry['revision'], d['action'])) if d['action'] in 'AR' and d['copyfrom_revision']: @@ -385,157 +384,6 @@ def sync_svn_props(source_url, source_rev, path_offset): # whose value differs between source vs. target. run_svn(["propset", prop, source_props[prop], svnclient.safe_path(path_offset)]) -def in_svn(p, require_in_repo=False, prefix=""): - """ - Check if a given file/folder is being tracked by Subversion. - Prior to SVN 1.6, we could "cheat" and look for the existence of ".svn" directories. - With SVN 1.7 and beyond, WC-NG means only a single top-level ".svn" at the root of the working-copy. - Use "svn status" to check the status of the file/folder. - """ - entries = svnclient.status(p, non_recursive=True) - if not entries: - return False - d = entries[0] - if require_in_repo and (d['status'] == 'added' or d['revision'] is None): - # If caller requires this path to be in the SVN repo, prevent returning True - # for paths that are only locally-added. - ret = False - else: - # Don't consider files tracked as deleted in the WC as under source-control. - # Consider files which are locally added/copied as under source-control. 
- ret = True if not (d['status'] == 'deleted') and (d['type'] == 'normal' or d['status'] == 'added' or d['copied'] == 'true') else False - ui.status(prefix + ">> in_svn('%s', require_in_repo=%s) --> %s", p, str(require_in_repo), str(ret), level=ui.DEBUG, color='GREEN') - return ret - -def is_child_path(path, p_path): - return True if (path == p_path) or (path.startswith(p_path+"/")) else False - -def join_path(base, child): - base.rstrip('/') - return base+"/"+child if child else base - -def find_svn_ancestors(svn_repos_url, start_path, start_rev, stop_base_path=None, prefix=""): - """ - Given an initial starting path+rev, walk the SVN history backwards to inspect the - ancestry of that path, optionally seeing if it traces back to stop_base_path. - - Build an array of copyfrom_path and copyfrom_revision pairs for each of the "svn copy"'s. - If we find a copyfrom_path which stop_base_path is a substring match of (e.g. we crawled - back to the initial branch-copy from trunk), then return the collection of ancestor - paths. Otherwise, copyfrom_path has no ancestry compared to stop_base_path. - - This is useful when comparing "trunk" vs. "branch" paths, to handle cases where a - file/folder was renamed in a branch and then that branch was merged back to trunk. - - 'svn_repos_url' is the full URL to the root of the SVN repository, - e.g. 'file:///path/to/repo' - 'start_path' is the path in the SVN repo to the source path to start checking - ancestry at, e.g. '/branches/fix1/projectA/file1.txt'. - 'start_rev' is the revision to start walking the history of start_path backwards from. - 'stop_base_path' is the path in the SVN repo to stop tracing ancestry once we've reached, - i.e. the target path we're trying to trace ancestry back to, e.g. '/trunk'. 
- """ - ui.status(prefix + ">> find_svn_ancestors: Start: (%s) start_path: %s stop_base_path: %s", - svn_repos_url, start_path+"@"+str(start_rev), stop_base_path, level=ui.DEBUG, color='YELLOW') - done = False - no_ancestry = False - cur_path = start_path - cur_rev = start_rev - first_iter_done = False - ancestors = [] - while not done: - # Get the first "svn log" entry for cur_path (relative to @cur_rev) - ui.status(prefix + ">> find_svn_ancestors: %s", svn_repos_url+cur_path+"@"+str(cur_rev), level=ui.DEBUG, color='YELLOW') - log_entry = svnclient.get_first_svn_log_entry(svn_repos_url+cur_path, 1, cur_rev) - if not log_entry: - ui.status(prefix + ">> find_svn_ancestors: Done: no log_entry", level=ui.DEBUG, color='YELLOW') - done = True - break - # If we found a copy-from case which matches our stop_base_path, we're done. - # ...but only if we've at least tried to search for the first copy-from path. - if stop_base_path is not None and first_iter_done and is_child_path(cur_path, stop_base_path): - ui.status(prefix + ">> find_svn_ancestors: Done: Found is_child_path(cur_path, stop_base_path) and first_iter_done=True", level=ui.DEBUG, color='YELLOW') - done = True - break - first_iter_done = True - # Search for any actions on our target path (or parent paths). - changed_paths_temp = [] - for d in log_entry['changed_paths']: - path = d['path'] - if is_child_path(cur_path, path): - changed_paths_temp.append({'path': path, 'data': d}) - if not changed_paths_temp: - # If no matches, then we've hit the end of the ancestry-chain. - ui.status(prefix + ">> find_svn_ancestors: Done: No matching changed_paths", level=ui.DEBUG, color='YELLOW') - done = True - continue - # Reverse-sort any matches, so that we start with the most-granular (deepest in the tree) path. - changed_paths = sorted(changed_paths_temp, key=operator.itemgetter('path'), reverse=True) - # Find the action for our cur_path in this revision. 
Use a loop to check in reverse order, - # so that if the target file/folder is "M" but has a parent folder with an "A" copy-from - # then we still correctly match the deepest copy-from. - for v in changed_paths: - d = v['data'] - path = d['path'] - # Check action-type for this file - action = d['action'] - if action not in _valid_svn_actions: - raise UnsupportedSVNAction("In SVN rev. %d: action '%s' not supported. Please report a bug!" - % (log_entry['revision'], action)) - ui.status(prefix + "> %s %s%s", action, path, - (" (from %s)" % (d['copyfrom_path']+"@"+str(d['copyfrom_revision']))) if d['copyfrom_path'] else "", - level=ui.DEBUG, color='YELLOW') - if action == 'D': - # If file/folder was deleted, ancestry-chain stops here - if stop_base_path: - no_ancestry = True - ui.status(prefix + ">> find_svn_ancestors: Done: deleted", level=ui.DEBUG, color='YELLOW') - done = True - break - if action in 'RA': - # If file/folder was added/replaced but not a copy, ancestry-chain stops here - if not d['copyfrom_path']: - if stop_base_path: - no_ancestry = True - ui.status(prefix + ">> find_svn_ancestors: Done: %s with no copyfrom_path", - "Added" if action == "A" else "Replaced", - level=ui.DEBUG, color='YELLOW') - done = True - break - # Else, file/folder was added/replaced and is a copy, so add an entry to our ancestors list - # and keep checking for ancestors - ui.status(prefix + ">> find_svn_ancestors: Found copy-from (action=%s): %s --> %s", - action, path, d['copyfrom_path']+"@"+str(d['copyfrom_revision']), - level=ui.DEBUG, color='YELLOW') - ancestors.append({'path': cur_path, 'revision': log_entry['revision'], - 'copyfrom_path': cur_path.replace(d['path'], d['copyfrom_path']), 'copyfrom_rev': d['copyfrom_revision']}) - cur_path = cur_path.replace(d['path'], d['copyfrom_path']) - cur_rev = d['copyfrom_revision'] - # Follow the copy and keep on searching - break - if stop_base_path and no_ancestry: - # If we're tracing back ancestry to a specific target 
stop_base_path and - # the ancestry-chain stopped before we reached stop_base_path, then return - # nothing since there is no ancestry chaining back to that target. - ancestors = [] - if ancestors: - if ui.get_level() >= ui.DEBUG: - max_len = 0 - for idx in range(len(ancestors)): - d = ancestors[idx] - max_len = max(max_len, len(d['path']+"@"+str(d['revision']))) - ui.status(prefix + ">> find_svn_ancestors: Found parent ancestors:", level=ui.DEBUG, color='YELLOW_B') - for idx in range(len(ancestors)): - d = ancestors[idx] - ui.status(prefix + " [%s] %s --> %s", idx, - str(d['path']+"@"+str(d['revision'])).ljust(max_len), - str(d['copyfrom_path']+"@"+str(d['copyfrom_rev'])), - level=ui.DEBUG, color='YELLOW') - else: - ui.status(prefix + ">> find_svn_ancestors: No ancestor-chain found: %s", - svn_repos_url+start_path+"@"+str(start_rev), level=ui.DEBUG, color='YELLOW') - return ancestors - def get_rev_map(source_rev, prefix): """ Find the equivalent rev # in the target repo for the given rev # from the source repo. @@ -782,7 +630,7 @@ def process_svn_log_entry(log_entry, ancestors, commit_paths, prefix = ""): path_offset = path[len(source_base):].strip("/") # Get the action for this path action = d['action'] - if action not in _valid_svn_actions: + if action not in svnclient.valid_svn_actions: raise UnsupportedSVNAction("In SVN rev. %d: action '%s' not supported. Please report a bug!" 
% (source_rev, action)) ui.status(" %s %s%s", action, d['path'], diff --git a/svn2svn/svnclient.py b/svn2svn/svnclient.py index ba5c649..09d27a2 100644 --- a/svn2svn/svnclient.py +++ b/svn2svn/svnclient.py @@ -25,6 +25,7 @@ _forbidden_xml_chars = "".join( set(map(chr, range(32))) - set('\x09\x0A\x0D') ) +valid_svn_actions = "MARD" # The list of known SVN action abbr's, from "svn log" def _strip_forbidden_xml_chars(xml_string): """ diff --git a/svnancest.py b/svnancest.py new file mode 100755 index 0000000..3a679f9 --- /dev/null +++ b/svnancest.py @@ -0,0 +1,4 @@ +#!/usr/bin/env python +from svn2svn.run.svnancest import main + +main() -- 2.45.2