1 """ SVN client functions """
4 from svn2svn
.shell
import run_svn
5 from svn2svn
.errors
import EmptySVNLog
13 from xml
.etree
import cElementTree
as ET
16 from xml
.etree
import ElementTree
as ET
19 import cElementTree
as ET
21 from elementtree
import ElementTree
as ET
# Base argument lists for the svn subcommands used by this module.  The
# helper functions below append their own options and targets to these
# before handing the combined list to run_svn().
svn_log_args = ['log', '--xml']
svn_info_args = ['info', '--xml']
svn_checkout_args = ['checkout', '-q']
svn_status_args = ['status', '--xml', '-v', '--ignore-externals']
28 _identity_table
= "".join(map(chr, range(256)))
29 _forbidden_xml_chars
= "".join(
30 set(map(chr, range(32))) - set('\x09\x0A\x0D')
34 def strip_forbidden_xml_chars(xml_string
):
36 Given an XML string, strips forbidden characters as per the XML spec.
37 (these are all control characters except 0x9, 0xA and 0xD).
39 return xml_string
.translate(_identity_table
, _forbidden_xml_chars
)
def svn_date_to_timestamp(svn_date):
    """
    Parse an SVN date as read from the XML output and return the corresponding
    timestamp (integer seconds since the epoch, interpreting the date as UTC).

    Both "2009-02-13T23:31:30.123456Z" and the shorter "2009-02-13T23:31:30Z"
    are accepted; surrounding whitespace is ignored.

    Raises ValueError if the remaining text is not "YYYY-MM-DDTHH:MM:SS".
    """
    # Strip microseconds and timezone (always UTC, hopefully)
    # XXX there are various ISO datetime parsing routines out there,
    # cf. http://seehuhn.de/comp/pdate
    date = svn_date.strip().split('.', 1)[0]
    # Without a fractional part the 'Z' suffix survives the split above.
    date = date.rstrip('Z')
    time_tuple = time.strptime(date, "%Y-%m-%dT%H:%M:%S")
    # timegm() treats the tuple as UTC, unlike time.mktime().
    return calendar.timegm(time_tuple)
def parse_svn_info_xml(xml_string):
    """
    Parse the XML output from an "svn info" command and extract useful information
    as a dict.

    Keys set: 'url', 'kind', 'revision', 'repos_url', 'repos_uuid',
    'last_changed_rev', 'last_changed_date' and, only when the commit has an
    <author> element, 'last_changed_author'.
    """
    root = ET.fromstring(strip_forbidden_xml_chars(xml_string))
    entry = root.find('.//entry')
    info = {
        'url': entry.find('url').text,
        'kind': entry.get('kind'),
        'revision': int(entry.get('revision')),
        'repos_url': root.find('.//repository/root').text,
        'repos_uuid': root.find('.//repository/uuid').text,
        'last_changed_rev': int(root.find('.//commit').get('revision')),
    }
    # The author element can be absent, so the key is optional.
    author = root.find('.//commit/author')
    if author is not None:
        info['last_changed_author'] = author.text
    commit_date = root.find('.//commit/date').text
    info['last_changed_date'] = svn_date_to_timestamp(commit_date)
    return info
75 def parse_svn_log_xml(xml_string
):
77 Parse the XML output from an "svn log" command and extract useful information
78 as a list of dicts (one per log changeset).
81 xml_string
= strip_forbidden_xml_chars(xml_string
)
82 tree
= ET
.fromstring(xml_string
)
83 for entry
in tree
.findall('logentry'):
85 d
['revision'] = int(entry
.get('revision'))
86 # Some revisions don't have authors, most notably the first revision
88 # logentry nodes targeting directories protected by path-based
89 # authentication have no child nodes at all. We return an entry
90 # in that case. Anyway, as it has no path entries, no further
91 # processing will be made.
92 author
= entry
.find('author')
93 date
= entry
.find('date')
94 msg
= entry
.find('msg')
95 d
['author'] = author
is not None and author
.text
or "No author"
97 d
['date'] = svn_date_to_timestamp(date
.text
)
100 d
['message'] = msg
is not None and msg
.text
.replace('\r\n', '\n').replace('\n\r', '\n').replace('\r', '\n') or ""
102 for path
in entry
.findall('.//paths/path'):
103 copyfrom_rev
= path
.get('copyfrom-rev')
105 copyfrom_rev
= int(copyfrom_rev
)
108 'kind': path
.get('kind'),
109 'action': path
.get('action'),
110 'copyfrom_path': path
.get('copyfrom-path'),
111 'copyfrom_revision': copyfrom_rev
,
113 # Sort paths (i.e. into hierarchical order), so that process_svn_log_entry()
114 # can process actions in depth-first order.
115 d
['changed_paths'] = sorted(paths
, key
=itemgetter('path'))
117 for prop
in entry
.findall('.//revprops/property'):
118 revprops
.append({ 'name': prop.get('name'), 'value': prop.text }
)
119 d
['revprops'] = revprops
123 def parse_svn_status_xml(xml_string
, base_dir
=None, ignore_externals
=False):
125 Parse the XML output from an "svn status" command and extract useful info
126 as a list of dicts (one per status entry).
129 base_dir
= os
.path
.normcase(base_dir
)
131 xml_string
= strip_forbidden_xml_chars(xml_string
)
132 tree
= ET
.fromstring(xml_string
)
133 for entry
in tree
.findall('.//entry'):
135 path
= entry
.get('path')
136 if base_dir
is not None:
137 assert os
.path
.normcase(path
).startswith(base_dir
)
138 path
= path
[len(base_dir
):].lstrip('/\\')
140 wc_status
= entry
.find('wc-status')
141 if wc_status
.get('item') == 'external':
144 status
= wc_status
.get('item')
145 revision
= wc_status
.get('revision')
146 if status
== 'external':
147 d
['type'] = 'external'
148 elif revision
is not None:
151 d
['type'] = 'unversioned'
153 d
['revision'] = revision
154 d
['props'] = wc_status
.get('props')
155 d
['copied'] = wc_status
.get('copied')
def get_svn_info(svn_url_or_wc, rev_number=None):
    """
    Get SVN information for the given URL or working copy, with an optionally
    specified revision number.
    Returns a dict as created by parse_svn_info_xml().

    When rev_number is given it is used both as the "-r" revision and as the
    "@REV" suffix appended to the target.
    """
    if rev_number is not None:
        # Stringify once so an integer revision is handled the same way in
        # the "-r" option as it already was in the "@REV" suffix.
        rev = str(rev_number)
        args = ["-r", rev, svn_url_or_wc + "@" + rev]
    else:
        args = [svn_url_or_wc]
    xml_string = run_svn(svn_info_args + args, fail_if_stderr=True)
    return parse_svn_info_xml(xml_string)
def svn_checkout(svn_url, checkout_dir, rev_number=None):
    """
    Checkout the given URL at an optional revision number.

    Returns whatever run_svn() returns for the checkout command.
    """
    args = []
    if rev_number is not None:
        # Every element of the argument list should be a string, so an
        # integer revision is converted here.
        args += ['-r', str(rev_number)]
    args += [svn_url, checkout_dir]
    return run_svn(svn_checkout_args + args)
182 def run_svn_log(svn_url_or_wc
, rev_start
, rev_end
, limit
, stop_on_copy
=False, get_changed_paths
=True, get_revprops
=False):
184 Fetch up to 'limit' SVN log entries between the given revisions.
188 args
+= ['--stop-on-copy']
189 if get_changed_paths
:
192 args
+= ['--with-all-revprops']
193 url
= str(svn_url_or_wc
)
194 if rev_start
!= 'HEAD' and rev_end
!= 'HEAD':
195 args
+= ['-r', '%s:%s' % (rev_start
, rev_end
)]
196 if not "@" in svn_url_or_wc
:
197 url
= "%s@%s" % (svn_url_or_wc
, str(max(rev_start
, rev_end
)))
198 args
+= ['--limit', str(limit
), url
]
199 xml_string
= run_svn(svn_log_args
+ args
)
200 return parse_svn_log_xml(xml_string
)
202 def get_svn_status(svn_wc
, quiet
=False, no_recursive
=False):
204 Get SVN status information about the given working copy.
206 # Ensure proper stripping by canonicalizing the path
207 svn_wc
= os
.path
.abspath(svn_wc
)
215 xml_string
= run_svn(svn_status_args
+ args
)
216 return parse_svn_status_xml(xml_string
, svn_wc
, ignore_externals
=True)
218 def get_svn_versioned_files(svn_wc
):
220 Get the list of versioned files in the SVN working copy.
223 for e
in get_svn_status(svn_wc
):
224 if e
['path'] and e
['type'] == 'normal':
225 contents
.append(e
['path'])
def get_one_svn_log_entry(svn_url, rev_start, rev_end, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Get the first SVN log entry in the requested revision range.

    Raises EmptySVNLog when the log for that range contains no entry.
    """
    # Ask for a single entry only (limit=1).
    entries = run_svn_log(svn_url, rev_start, rev_end, 1,
                          stop_on_copy, get_changed_paths, get_revprops)
    if not entries:
        raise EmptySVNLog("No SVN log for %s between revisions %s and %s" %
                          (svn_url, rev_start, rev_end))
    return entries[0]
def get_first_svn_log_entry(svn_url, rev_start, rev_end, get_changed_paths=True):
    """
    Get the first log entry after (or at) the given revision number in an SVN branch.
    Passing 0 as rev_start gives the log entry corresponding to the branch
    creation, because the search stops at copies (stop_on_copy=True).

    NOTE: to know whether the branch creation corresponds to an SVN import or
    a copy from another branch, inspect elements of the 'changed_paths' entry
    in the returned dictionary (requires get_changed_paths=True).

    Raises EmptySVNLog (from get_one_svn_log_entry) if the range has no entry.
    """
    # Forward the caller's get_changed_paths instead of forcing it to True.
    return get_one_svn_log_entry(svn_url, rev_start, rev_end,
                                 stop_on_copy=True,
                                 get_changed_paths=get_changed_paths)
def get_last_svn_log_entry(svn_url, rev_start, rev_end, get_changed_paths=True):
    """
    Get the last log entry before/at the given revision number in an SVN branch.
    Passing "HEAD" as rev_end gives the log entry corresponding to the latest
    commit in the branch.

    Raises EmptySVNLog (from get_one_svn_log_entry) if the range has no entry.
    """
    # The range is deliberately passed reversed (rev_end first) so that the
    # single entry fetched is the one at the rev_end side of the range.
    # get_changed_paths is forwarded rather than forced to True.
    return get_one_svn_log_entry(svn_url, rev_end, rev_start,
                                 stop_on_copy=True,
                                 get_changed_paths=get_changed_paths)
# Tuning knobs for the chunked log fetching in iter_svn_log_entries().
# A request faster than this many seconds doubles the chunk length; one
# slower than twice this value halves it.
log_duration_threshold = 10.0
# Initial chunk length, and the floor applied when the chunk length is halved.
log_min_chunk_length = 10
262 def iter_svn_log_entries(svn_url
, first_rev
, last_rev
, stop_on_copy
=False, get_changed_paths
=True, get_revprops
=False):
264 Iterate over SVN log entries between first_rev and last_rev.
266 This function features chunked log fetching so that it isn't too nasty
267 to the SVN server if many entries are requested.
270 chunk_length
= log_min_chunk_length
272 while last_rev
== "HEAD" or cur_rev
<= last_rev
:
273 start_t
= time
.time()
274 stop_rev
= min(last_rev
, cur_rev
+ chunk_length
)
275 entries
= run_svn_log(svn_url
, cur_rev
, "HEAD", chunk_length
,
276 stop_on_copy
, get_changed_paths
, get_revprops
)
277 duration
= time
.time() - start_t
279 # skip first revision on subsequent runs, as it is overlapped
285 if e
['revision'] > last_rev
:
288 if e
['revision'] >= last_rev
:
290 cur_rev
= e
['revision']
291 # Adapt chunk length based on measured request duration
292 if duration
< log_duration_threshold
:
293 chunk_length
= int(chunk_length
* 2.0)
294 elif duration
> log_duration_threshold
* 2:
295 chunk_length
= max(log_min_chunk_length
, int(chunk_length
/ 2.0))
298 _svn_client_version
= None
300 def get_svn_client_version():
301 """Returns the SVN client version as a tuple.
303 The returned tuple only contains numbers, non-digits in version string are
306 global _svn_client_version
307 if _svn_client_version
is None:
308 raw
= run_svn(['--version', '-q']).strip()
309 _svn_client_version
= tuple(map(int, [x
for x
in raw
.split('.')
311 return _svn_client_version