""" SVN client functions """

import calendar
import os
import time
from operator import itemgetter

# Pick the fastest ElementTree implementation available, falling back to the
# pure-Python and pre-2.5 standalone packages.
try:
    from xml.etree import cElementTree as ET
except ImportError:
    try:
        from xml.etree import ElementTree as ET
    except ImportError:
        try:
            import cElementTree as ET
        except ImportError:
            from elementtree import ElementTree as ET

from shell import run_svn
from errors import EmptySVNLog
 
  24 svn_log_args 
= ['log', '--xml'] 
  25 svn_info_args 
= ['info', '--xml'] 
  26 svn_checkout_args 
= ['checkout', '-q'] 
  27 svn_status_args 
= ['status', '--xml', '-v', '--ignore-externals'] 
  29 _identity_table 
= "".join(map(chr, range(256))) 
  30 _forbidden_xml_chars 
= "".join( 
  31     set(map(chr, range(32))) - set('\x09\x0A\x0D') 
  35 def strip_forbidden_xml_chars(xml_string
): 
  37     Given an XML string, strips forbidden characters as per the XML spec. 
  38     (these are all control characters except 0x9, 0xA and 0xD). 
  40     return xml_string
.translate(_identity_table
, _forbidden_xml_chars
) 
  43 def svn_date_to_timestamp(svn_date
): 
  45     Parse an SVN date as read from the XML output and return the corresponding 
  48     # Strip microseconds and timezone (always UTC, hopefully) 
  49     # XXX there are various ISO datetime parsing routines out there, 
  50     # cf. http://seehuhn.de/comp/pdate 
  51     date 
= svn_date
.split('.', 2)[0] 
  52     time_tuple 
= time
.strptime(date
, "%Y-%m-%dT%H:%M:%S") 
  53     return calendar
.timegm(time_tuple
) 
  55 def parse_svn_info_xml(xml_string
): 
  57     Parse the XML output from an "svn info" command and extract useful information 
  61     xml_string 
= strip_forbidden_xml_chars(xml_string
) 
  62     tree 
= ET
.fromstring(xml_string
) 
  63     entry 
= tree
.find('.//entry') 
  64     d
['url'] = entry
.find('url').text
 
  65     d
['kind'] = entry
.get('kind') 
  66     d
['revision'] = int(entry
.get('revision')) 
  67     d
['repos_url'] = tree
.find('.//repository/root').text
 
  68     d
['repos_uuid'] = tree
.find('.//repository/uuid').text
 
  69     d
['last_changed_rev'] = int(tree
.find('.//commit').get('revision')) 
  70     author_element 
= tree
.find('.//commit/author') 
  71     if author_element 
is not None: 
  72         d
['last_changed_author'] = author_element
.text
 
  73     d
['last_changed_date'] = svn_date_to_timestamp(tree
.find('.//commit/date').text
) 
  76 def parse_svn_log_xml(xml_string
): 
  78     Parse the XML output from an "svn log" command and extract useful information 
  79     as a list of dicts (one per log changeset). 
  82     xml_string 
= strip_forbidden_xml_chars(xml_string
) 
  83     tree 
= ET
.fromstring(xml_string
) 
  84     for entry 
in tree
.findall('logentry'): 
  86         d
['revision'] = int(entry
.get('revision')) 
  87         # Some revisions don't have authors, most notably the first revision 
  89         # logentry nodes targeting directories protected by path-based 
  90         # authentication have no child nodes at all. We return an entry 
  91         # in that case. Anyway, as it has no path entries, no further 
  92         # processing will be made. 
  93         author 
= entry
.find('author') 
  94         date 
= entry
.find('date') 
  95         msg 
= entry
.find('msg') 
  96         d
['author'] = author 
is not None and author
.text 
or "No author" 
  98             d
['date'] = svn_date_to_timestamp(date
.text
) 
 101         d
['message'] = msg 
is not None and msg
.text
.replace('\r\n', '\n').replace('\n\r', '\n').replace('\r', '\n') or "" 
 103         for path 
in entry
.findall('.//paths/path'): 
 104             copyfrom_rev 
= path
.get('copyfrom-rev') 
 106                 copyfrom_rev 
= int(copyfrom_rev
) 
 109                 'kind': path
.get('kind'), 
 110                 'action': path
.get('action'), 
 111                 'copyfrom_path': path
.get('copyfrom-path'), 
 112                 'copyfrom_revision': copyfrom_rev
, 
 114         # Sort paths (i.e. into hierarchical order), so that process_svn_log_entry() 
 115         # can process actions in depth-first order. 
 116         d
['changed_paths'] = sorted(paths
, key
=itemgetter('path')) 
 118         for prop 
in entry
.findall('.//revprops/property'): 
 119             revprops
.append({ 'name': prop.get('name'), 'value': prop.text }
) 
 120         d
['revprops'] = revprops
 
 124 def parse_svn_status_xml(xml_string
, base_dir
=None, ignore_externals
=False): 
 126     Parse the XML output from an "svn status" command and extract useful info 
 127     as a list of dicts (one per status entry). 
 130         base_dir 
= os
.path
.normcase(base_dir
) 
 132     xml_string 
= strip_forbidden_xml_chars(xml_string
) 
 133     tree 
= ET
.fromstring(xml_string
) 
 134     for entry 
in tree
.findall('.//entry'): 
 136         path 
= entry
.get('path') 
 137         if base_dir 
is not None: 
 138             assert os
.path
.normcase(path
).startswith(base_dir
) 
 139             path 
= path
[len(base_dir
):].lstrip('/\\') 
 141         wc_status 
= entry
.find('wc-status') 
 142         if wc_status
.get('item') == 'external': 
 145         status 
=   wc_status
.get('item') 
 146         revision 
= wc_status
.get('revision') 
 147         if status 
== 'external': 
 148             d
['type'] = 'external' 
 149         elif revision 
is not None: 
 152             d
['type'] = 'unversioned' 
 154         d
['revision'] = revision
 
 155         d
['props'] =    wc_status
.get('props') 
 156         d
['copied'] =   wc_status
.get('copied') 
 160 def get_svn_info(svn_url_or_wc
, rev_number
=None): 
 162     Get SVN information for the given URL or working copy, with an optionally 
 163     specified revision number. 
 164     Returns a dict as created by parse_svn_info_xml(). 
 166     if rev_number 
is not None: 
 167         args 
= ["-r", rev_number
, svn_url_or_wc
+"@"+str(rev_number
)] 
 169         args 
= [svn_url_or_wc
] 
 170     xml_string 
= run_svn(svn_info_args 
+ args
, fail_if_stderr
=True) 
 171     return parse_svn_info_xml(xml_string
) 
 173 def svn_checkout(svn_url
, checkout_dir
, rev_number
=None): 
 175     Checkout the given URL at an optional revision number. 
 178     if rev_number 
is not None: 
 179         args 
+= ['-r', rev_number
] 
 180     args 
+= [svn_url
, checkout_dir
] 
 181     return run_svn(svn_checkout_args 
+ args
) 
 183 def run_svn_log(svn_url_or_wc
, rev_start
, rev_end
, limit
, stop_on_copy
=False, get_changed_paths
=True, get_revprops
=False): 
 185     Fetch up to 'limit' SVN log entries between the given revisions. 
 189         args 
+= ['--stop-on-copy'] 
 190     if get_changed_paths
: 
 193         args 
+= ['--with-all-revprops'] 
 194     url 
= str(svn_url_or_wc
) 
 195     if rev_start 
!= 'HEAD' and rev_end 
!= 'HEAD': 
 196         args 
+= ['-r', '%s:%s' % (rev_start
, rev_end
)] 
 197         if not "@" in svn_url_or_wc
: 
 198             url 
= "%s@%s" % (svn_url_or_wc
, str(max(rev_start
, rev_end
))) 
 199     args 
+= ['--limit', str(limit
), url
] 
 200     xml_string 
= run_svn(svn_log_args 
+ args
) 
 201     return parse_svn_log_xml(xml_string
) 
 203 def get_svn_status(svn_wc
, quiet
=False, no_recursive
=False): 
 205     Get SVN status information about the given working copy. 
 207     # Ensure proper stripping by canonicalizing the path 
 208     svn_wc 
= os
.path
.abspath(svn_wc
) 
 216     xml_string 
= run_svn(svn_status_args 
+ args
) 
 217     return parse_svn_status_xml(xml_string
, svn_wc
, ignore_externals
=True) 
 219 def get_svn_versioned_files(svn_wc
): 
 221     Get the list of versioned files in the SVN working copy. 
 224     for e 
in get_svn_status(svn_wc
): 
 225         if e
['path'] and e
['type'] == 'normal': 
 226             contents
.append(e
['path']) 
 229 def get_one_svn_log_entry(svn_url
, rev_start
, rev_end
, stop_on_copy
=False, get_changed_paths
=True, get_revprops
=False): 
 231     Get the first SVN log entry in the requested revision range. 
 233     entries 
= run_svn_log(svn_url
, rev_start
, rev_end
, 1, stop_on_copy
, get_changed_paths
, get_revprops
) 
 236     raise EmptySVNLog("No SVN log for %s between revisions %s and %s" % 
 237         (svn_url
, rev_start
, rev_end
)) 
 239 def get_first_svn_log_entry(svn_url
, rev_start
, rev_end
, get_changed_paths
=True): 
 241     Get the first log entry after (or at) the given revision number in an SVN branch. 
 242     By default the revision number is set to 0, which will give you the log 
 243     entry corresponding to the branch creaction. 
 245     NOTE: to know whether the branch creation corresponds to an SVN import or 
 246     a copy from another branch, inspect elements of the 'changed_paths' entry 
 247     in the returned dictionary. 
 249     return get_one_svn_log_entry(svn_url
, rev_start
, rev_end
, stop_on_copy
=True, get_changed_paths
=True) 
 251 def get_last_svn_log_entry(svn_url
, rev_start
, rev_end
, get_changed_paths
=True): 
 253     Get the last log entry before/at the given revision number in an SVN branch. 
 254     By default the revision number is set to HEAD, which will give you the log 
 255     entry corresponding to the latest commit in branch. 
 257     return get_one_svn_log_entry(svn_url
, rev_end
, rev_start
, stop_on_copy
=True, get_changed_paths
=True) 
 260 log_duration_threshold 
= 10.0 
 261 log_min_chunk_length 
= 10 
 263 def iter_svn_log_entries(svn_url
, first_rev
, last_rev
, stop_on_copy
=False, get_changed_paths
=True, get_revprops
=False): 
 265     Iterate over SVN log entries between first_rev and last_rev. 
 267     This function features chunked log fetching so that it isn't too nasty 
 268     to the SVN server if many entries are requested. 
 271     chunk_length 
= log_min_chunk_length
 
 273     while last_rev 
== "HEAD" or cur_rev 
<= last_rev
: 
 274         start_t 
= time
.time() 
 275         stop_rev 
= min(last_rev
, cur_rev 
+ chunk_length
) 
 276         entries 
= run_svn_log(svn_url
, cur_rev
, "HEAD", chunk_length
, 
 277                               stop_on_copy
, get_changed_paths
, get_revprops
) 
 278         duration 
= time
.time() - start_t
 
 280             # skip first revision on subsequent runs, as it is overlapped 
 286             if e
['revision'] > last_rev
: 
 289         if e
['revision'] >= last_rev
: 
 291         cur_rev 
= e
['revision'] 
 292         # Adapt chunk length based on measured request duration 
 293         if duration 
< log_duration_threshold
: 
 294             chunk_length 
= int(chunk_length 
* 2.0) 
 295         elif duration 
> log_duration_threshold 
* 2: 
 296             chunk_length 
= max(log_min_chunk_length
, int(chunk_length 
/ 2.0)) 
 299 _svn_client_version 
= None 
 301 def get_svn_client_version(): 
 302     """Returns the SVN client version as a tuple. 
 304     The returned tuple only contains numbers, non-digits in version string are 
 307     global _svn_client_version
 
 308     if _svn_client_version 
is None: 
 309         raw 
= run_svn(['--version', '-q']).strip() 
 310         _svn_client_version 
= tuple(map(int, [x 
for x 
in raw
.split('.') 
 312     return _svn_client_version