""" SVN client functions """
import calendar
import os
import time
from operator import itemgetter

# Pick the fastest ElementTree implementation available, falling back to the
# pure-Python and the legacy stand-alone packages on older interpreters.
try:
    from xml.etree import cElementTree as ET
except ImportError:
    try:
        from xml.etree import ElementTree as ET
    except ImportError:
        try:
            import cElementTree as ET
        except ImportError:
            from elementtree import ElementTree as ET

from shell import run_svn
from errors import EmptySVNLog
# Base argument lists for the svn subcommands used by this module. The
# functions below build their command line by concatenating per-call options
# onto these lists, so the lists themselves are never mutated.
svn_log_args = ['log', '--xml']
svn_info_args = ['info', '--xml']
svn_checkout_args = ['checkout', '-q']
svn_status_args = ['status', '--xml', '--ignore-externals']
# 256-entry identity table, needed by the two-argument (byte string) form of
# translate().
_identity_table = "".join(map(chr, range(256)))
# Control characters below 0x20 other than TAB (0x9), LF (0xA) and CR (0xD).
_forbidden_xml_chars = "".join(
    sorted(set(map(chr, range(32))) - set('\x09\x0A\x0D'))
)
# Deletion map (ordinal -> None) for the one-argument (text string) form of
# translate().
_forbidden_xml_ordinals = dict.fromkeys(map(ord, _forbidden_xml_chars))


def strip_forbidden_xml_chars(xml_string):
    """
    Given an XML string, strips forbidden characters as per the XML spec.
    (these are all control characters except 0x9, 0xA and 0xD).

    Returns a new string of the same type as `xml_string`.
    """
    try:
        # Python 2 byte strings: translate(table, deletechars).
        return xml_string.translate(_identity_table, _forbidden_xml_chars)
    except TypeError:
        # Text strings (Python 3 str, Python 2 unicode) only accept a single
        # mapping argument; mapping an ordinal to None deletes the character.
        return xml_string.translate(_forbidden_xml_ordinals)
def svn_date_to_timestamp(svn_date):
    """
    Parse an SVN date as read from the XML output and return the corresponding
    timestamp (seconds since the epoch, interpreting the date as UTC).

    Accepts dates with or without a fractional-seconds part, e.g.
    "2009-02-13T23:31:30.123456Z" or "2009-02-13T23:31:30Z".
    """
    # Strip microseconds and timezone (always UTC, hopefully)
    # XXX there are various ISO datetime parsing routines out there,
    # cf. http://seehuhn.de/comp/pdate
    date = svn_date.split('.', 1)[0]
    # Without a fractional part the "Z" suffix survives the split above and
    # would make strptime() fail, so drop it explicitly.
    date = date.rstrip('Z')
    time_tuple = time.strptime(date, "%Y-%m-%dT%H:%M:%S")
    return calendar.timegm(time_tuple)
def parse_svn_info_xml(xml_string):
    """
    Parse the XML output from an "svn info" command and extract useful information
    as a dict.

    The returned dict has the keys 'url', 'kind', 'revision', 'repos_url',
    'repos_uuid', 'last_changed_rev' and 'last_changed_date' (a timestamp),
    plus 'last_changed_author' when the commit element carries an author.
    """
    d = {}
    xml_string = strip_forbidden_xml_chars(xml_string)
    tree = ET.fromstring(xml_string)
    entry = tree.find('.//entry')
    d['url'] = entry.find('url').text
    d['kind'] = entry.get('kind')
    d['revision'] = int(entry.get('revision'))
    d['repos_url'] = tree.find('.//repository/root').text
    d['repos_uuid'] = tree.find('.//repository/uuid').text
    d['last_changed_rev'] = int(tree.find('.//commit').get('revision'))
    # The author element is optional, so the key is only set when present.
    author_element = tree.find('.//commit/author')
    if author_element is not None:
        d['last_changed_author'] = author_element.text
    d['last_changed_date'] = svn_date_to_timestamp(tree.find('.//commit/date').text)
    return d
def parse_svn_log_xml(xml_string):
    """
    Parse the XML output from an "svn log" command and extract useful information
    as a list of dicts (one per log changeset).

    Each dict has the keys 'revision', 'author', 'date' (a timestamp, or None
    when the entry has no date element), 'message', 'changed_paths' (a list of
    dicts sorted by path) and 'revprops' (a list of name/value dicts).
    """
    entries = []
    xml_string = strip_forbidden_xml_chars(xml_string)
    tree = ET.fromstring(xml_string)
    for entry in tree.findall('logentry'):
        d = {}
        d['revision'] = int(entry.get('revision'))
        # Some revisions don't have authors, most notably the first revision
        # in a repository.
        # logentry nodes targeting directories protected by path-based
        # authentication have no child nodes at all. We return an entry
        # in that case. Anyway, as it has no path entries, no further
        # processing will be made.
        author = entry.find('author')
        date = entry.find('date')
        msg = entry.find('msg')

        if author is not None and author.text:
            d['author'] = author.text
        else:
            d['author'] = "No author"

        if date is not None:
            d['date'] = svn_date_to_timestamp(date.text)
        else:
            d['date'] = None

        # An empty <msg/> element has text None, so guard before normalising
        # the line endings to "\n".
        if msg is not None and msg.text:
            d['message'] = msg.text.replace('\r\n', '\n').replace('\n\r', '\n').replace('\r', '\n')
        else:
            d['message'] = ""

        paths = []
        for path in entry.findall('.//paths/path'):
            copyfrom_rev = path.get('copyfrom-rev')
            if copyfrom_rev:
                copyfrom_rev = int(copyfrom_rev)
            paths.append({
                'path': path.text,
                'kind': path.get('kind'),
                'action': path.get('action'),
                'copyfrom_path': path.get('copyfrom-path'),
                'copyfrom_revision': copyfrom_rev,
            })
        # Sort paths (i.e. into hierarchical order), so that process_svn_log_entry()
        # can process actions in depth-first order.
        d['changed_paths'] = sorted(paths, key=itemgetter('path'))

        d['revprops'] = [
            {'name': prop.get('name'), 'value': prop.text}
            for prop in entry.findall('.//revprops/property')
        ]
        entries.append(d)
    return entries
def parse_svn_status_xml(xml_string, base_dir=None, ignore_externals=False):
    """
    Parse the XML output from an "svn status" command and extract useful info
    as a list of dicts (one per status entry).

    If `base_dir` is given, every entry path must start with it and is
    returned relative to it. If `ignore_externals` is true, entries whose
    status item is 'external' are left out of the result.

    Each dict has the keys 'path', 'type' ('external', 'normal' or
    'unversioned'), 'status', 'revision', 'props' and 'copied'.
    """
    if base_dir is not None:
        base_dir = os.path.normcase(base_dir)
    entries = []
    xml_string = strip_forbidden_xml_chars(xml_string)
    tree = ET.fromstring(xml_string)
    for entry in tree.findall('.//entry'):
        d = {}
        path = entry.get('path')
        if base_dir is not None:
            assert os.path.normcase(path).startswith(base_dir), \
                "status path %r is not below %r" % (path, base_dir)
            path = path[len(base_dir):].lstrip('/\\')
        d['path'] = path

        wc_status = entry.find('wc-status')
        status = wc_status.get('item')
        if status == 'external' and ignore_externals:
            continue

        revision = wc_status.get('revision')
        if status == 'external':
            d['type'] = 'external'
        elif revision is not None:
            # Items carrying a revision are under version control.
            d['type'] = 'normal'
        else:
            d['type'] = 'unversioned'
        d['status'] = status
        d['revision'] = revision
        d['props'] = wc_status.get('props')
        d['copied'] = wc_status.get('copied')
        entries.append(d)
    return entries
def get_svn_info(svn_url_or_wc, rev_number=None):
    """
    Get SVN information for the given URL or working copy, with an optionally
    specified revision number.
    Returns a dict as created by parse_svn_info_xml().
    """
    if rev_number is not None:
        # Use the revision both as operative (-r) and as peg (@) revision.
        # Command-line arguments are passed as strings.
        rev = str(rev_number)
        args = ["-r", rev, svn_url_or_wc + "@" + rev]
    else:
        args = [svn_url_or_wc]
    xml_string = run_svn(svn_info_args + args, fail_if_stderr=True)
    return parse_svn_info_xml(xml_string)
def svn_checkout(svn_url, checkout_dir, rev_number=None):
    """
    Checkout the given URL at an optional revision number.

    Returns whatever run_svn() returns for the checkout command.
    """
    args = []
    if rev_number is not None:
        # Command-line arguments are passed as strings.
        args += ['-r', str(rev_number)]
    args += [svn_url, checkout_dir]
    return run_svn(svn_checkout_args + args)
def _peg_revision(rev_start, rev_end):
    """Return the later of the two revisions, treating "HEAD" as the latest."""
    if "HEAD" in (rev_start, rev_end):
        return "HEAD"
    try:
        return max(int(rev_start), int(rev_end))
    except (TypeError, ValueError):
        # Other revision keywords: fall back to a plain comparison.
        return max(rev_start, rev_end)


def run_svn_log(svn_url_or_wc, rev_start, rev_end, limit, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Fetch up to 'limit' SVN log entries between the given revisions.

    Unless the target already contains an "@", it is pegged at the later of
    the two revisions. Returns a list of dicts as created by
    parse_svn_log_xml().
    """
    args = []
    if stop_on_copy:
        args += ['--stop-on-copy']
    if get_changed_paths:
        args += ['-v']
    if get_revprops:
        args += ['--with-all-revprops']
    url = str(svn_url_or_wc)
    args += ['-r', '%s:%s' % (rev_start, rev_end)]
    if "@" not in url:
        url = "%s@%s" % (url, _peg_revision(rev_start, rev_end))
    args += ['--limit', str(limit), url]
    xml_string = run_svn(svn_log_args + args)
    return parse_svn_log_xml(xml_string)
def get_svn_status(svn_wc, quiet=False, no_recursive=False):
    """
    Get SVN status information about the given working copy.

    `quiet` passes -q to "svn status"; otherwise -v is passed so that
    unmodified items are listed too. `no_recursive` passes -N.
    Returns a list of dicts as created by parse_svn_status_xml(), with paths
    relative to the working copy and externals ignored.
    """
    # Ensure proper stripping by canonicalizing the path
    svn_wc = os.path.abspath(svn_wc)
    args = []
    if quiet:
        args += ['-q']
    else:
        args += ['-v']
    if no_recursive:
        args += ['-N']
    xml_string = run_svn(svn_status_args + args + [svn_wc])
    return parse_svn_status_xml(xml_string, svn_wc, ignore_externals=True)
def get_svn_versioned_files(svn_wc):
    """
    Get the list of versioned files in the SVN working copy.

    Returns the non-empty paths of all status entries whose type is 'normal'.
    """
    return [
        e['path']
        for e in get_svn_status(svn_wc)
        if e['path'] and e['type'] == 'normal'
    ]
def get_one_svn_log_entry(svn_url, rev_start, rev_end, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Get the first SVN log entry in the requested revision range.

    Raises EmptySVNLog if the range contains no log entry.
    """
    entries = run_svn_log(svn_url, rev_start, rev_end, 1, stop_on_copy, get_changed_paths, get_revprops)
    if entries:
        return entries[0]
    raise EmptySVNLog("No SVN log for %s between revisions %s and %s" %
        (svn_url, rev_start, rev_end))
def get_first_svn_log_entry(svn_url, rev_start, rev_end, get_changed_paths=True):
    """
    Get the first log entry after (or at) the given revision number in an SVN branch.
    Passing 0 as rev_start will give you the log entry corresponding to the
    branch creation.

    NOTE: to know whether the branch creation corresponds to an SVN import or
    a copy from another branch, inspect elements of the 'changed_paths' entry
    in the returned dictionary.
    """
    return get_one_svn_log_entry(svn_url, rev_start, rev_end, stop_on_copy=True,
                                 get_changed_paths=get_changed_paths)
def get_last_svn_log_entry(svn_url, rev_start, rev_end, get_changed_paths=True):
    """
    Get the last log entry before/at the given revision number in an SVN branch.
    Passing "HEAD" as rev_end will give you the log entry corresponding to the
    latest commit in branch.
    """
    # The range is requested from rev_end down to rev_start, so the single
    # entry fetched is the most recent one.
    return get_one_svn_log_entry(svn_url, rev_end, rev_start, stop_on_copy=True,
                                 get_changed_paths=get_changed_paths)
# Tuning knobs for iter_svn_log_entries(): the request duration (in seconds)
# around which the chunk length is adapted, and the smallest chunk length.
log_duration_threshold = 10.0
log_min_chunk_length = 10
def iter_svn_log_entries(svn_url, first_rev, last_rev, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Iterate over SVN log entries between first_rev and last_rev.

    This function features chunked log fetching so that it isn't too nasty
    to the SVN server if many entries are requested.

    `last_rev` may be a revision number or the string "HEAD". Yields dicts as
    created by parse_svn_log_xml().
    """
    # "HEAD" cannot be compared with revision numbers, so handle it explicitly.
    unbounded = last_rev == "HEAD"
    cur_rev = first_rev
    chunk_length = log_min_chunk_length
    first_run = True
    while unbounded or cur_rev <= last_rev:
        start_t = time.time()
        entries = run_svn_log(svn_url, cur_rev, "HEAD", chunk_length,
                              stop_on_copy, get_changed_paths, get_revprops)
        duration = time.time() - start_t
        if not first_run:
            # skip first revision on subsequent runs, as it is overlapped
            entries = entries[1:]
        first_run = False
        if not entries:
            break
        for e in entries:
            if not unbounded and e['revision'] > last_rev:
                return
            yield e
        # Continue from the last revision seen; it is fetched again (and
        # skipped) by the next request.
        cur_rev = entries[-1]['revision']
        if not unbounded and cur_rev >= last_rev:
            break
        # Adapt chunk length based on measured request duration
        if duration < log_duration_threshold:
            chunk_length = int(chunk_length * 2.0)
        elif duration > log_duration_threshold * 2:
            chunk_length = max(log_min_chunk_length, int(chunk_length / 2.0))
# Cached result of get_svn_client_version(); None until the first call.
_svn_client_version = None


def get_svn_client_version():
    """Returns the SVN client version as a tuple.

    The returned tuple only contains numbers, non-digits in version string are
    silently ignored. The svn client is only queried on the first call; later
    calls return the cached tuple.
    """
    global _svn_client_version
    if _svn_client_version is None:
        raw = run_svn(['--version', '-q']).strip()
        _svn_client_version = tuple(
            int(part) for part in raw.split('.') if part.isdigit()
        )
    return _svn_client_version