1 """ SVN client functions """
import calendar
import operator
import os
import time

# ElementTree has lived in several places over the years; take the first
# implementation that is importable, preferring the C accelerator.
try:
    from xml.etree import cElementTree as ET
except ImportError:
    try:
        from xml.etree import ElementTree as ET
    except ImportError:
        try:
            import cElementTree as ET
        except ImportError:
            from elementtree import ElementTree as ET

from errors import EmptySVNLog
from shell import run_svn
22 _identity_table
= "".join(map(chr, range(256)))
23 _forbidden_xml_chars
= "".join(
24 set(map(chr, range(32))) - set('\x09\x0A\x0D')
28 def strip_forbidden_xml_chars(xml_string
):
30 Given an XML string, strips forbidden characters as per the XML spec.
31 (these are all control characters except 0x9, 0xA and 0xD).
33 return xml_string
.translate(_identity_table
, _forbidden_xml_chars
)
def svn_date_to_timestamp(svn_date):
    """
    Parse an SVN date as read from the XML output and return the corresponding
    timestamp (integer seconds since the epoch, interpreting the date as UTC).

    The expected input looks like "2009-02-13T23:31:30.123456Z"; the
    fractional seconds and the trailing "Z" are both optional.
    """
    # Strip microseconds and timezone (always UTC, hopefully)
    # XXX there are various ISO datetime parsing routines out there,
    # cf. http://seehuhn.de/comp/pdate
    date = svn_date.strip().split('.', 1)[0]
    # Without fractional seconds the "Z" suffix survives the split above.
    date = date.rstrip('Z')
    time_tuple = time.strptime(date, "%Y-%m-%dT%H:%M:%S")
    return calendar.timegm(time_tuple)
def parse_svn_info_xml(xml_string):
    """
    Parse the XML output from an "svn info" command and extract useful information
    as a dict.

    Keys of the returned dict: 'url', 'kind', 'revision' (int), 'repos_url',
    'repos_uuid', 'last_changed_rev' (int), 'last_changed_date' (timestamp
    from svn_date_to_timestamp()) and, only when the commit has an author
    element, 'last_changed_author'.
    """
    xml_string = strip_forbidden_xml_chars(xml_string)
    tree = ET.fromstring(xml_string)
    entry = tree.find('.//entry')
    d = {
        'url': entry.find('url').text,
        'kind': entry.get('kind'),
        'revision': int(entry.get('revision')),
        'repos_url': tree.find('.//repository/root').text,
        'repos_uuid': tree.find('.//repository/uuid').text,
        'last_changed_rev': int(tree.find('.//commit').get('revision')),
    }
    # The author element can be absent, so the key is only set when present.
    author_element = tree.find('.//commit/author')
    if author_element is not None:
        d['last_changed_author'] = author_element.text
    d['last_changed_date'] = svn_date_to_timestamp(
        tree.find('.//commit/date').text)
    return d
def parse_svn_log_xml(xml_string):
    """
    Parse the XML output from an "svn log" command and extract useful information
    as a list of dicts (one per log changeset).

    Each dict has the keys 'revision' (int), 'author', 'date' (timestamp, or
    None when the entry carries no date), 'message' (newlines normalized to
    "\\n"), 'changed_paths' (list of dicts sorted by 'path') and 'revprops'
    (list of {'name', 'value'} dicts).
    """
    log_entries = []
    xml_string = strip_forbidden_xml_chars(xml_string)
    tree = ET.fromstring(xml_string)
    for entry in tree.findall('logentry'):
        d = {'revision': int(entry.get('revision'))}
        # Some revisions don't have authors, most notably the first revision
        # in a repository.
        # logentry nodes targeting directories protected by path-based
        # authentication have no child nodes at all. We return an entry
        # in that case. Anyway, as it has no path entries, no further
        # processing will be made.
        author = entry.find('author')
        date = entry.find('date')
        msg = entry.find('msg')

        author_text = author.text if author is not None else None
        d['author'] = author_text or "No author"

        if date is not None and date.text:
            d['date'] = svn_date_to_timestamp(date.text)
        else:
            d['date'] = None

        # An empty <msg/> element has text None, so fall back to "" before
        # normalizing the line endings.
        msg_text = (msg.text if msg is not None else None) or ""
        d['message'] = (msg_text.replace('\r\n', '\n')
                                .replace('\n\r', '\n')
                                .replace('\r', '\n'))

        paths = []
        for path in entry.findall('.//paths/path'):
            copyfrom_rev = path.get('copyfrom-rev')
            if copyfrom_rev:
                copyfrom_rev = int(copyfrom_rev)
            paths.append({
                # The changed path itself is the text of the <path> element.
                'path': path.text,
                'kind': path.get('kind'),
                'action': path.get('action'),
                'copyfrom_path': path.get('copyfrom-path'),
                'copyfrom_revision': copyfrom_rev,
            })
        # Sort paths (i.e. into hierarchical order), so that process_svn_log_entry()
        # can process actions in depth-first order.
        d['changed_paths'] = sorted(paths, key=operator.itemgetter('path'))

        d['revprops'] = [
            {'name': prop.get('name'), 'value': prop.text}
            for prop in entry.findall('.//revprops/property')
        ]
        log_entries.append(d)
    return log_entries
def parse_svn_status_xml(xml_string, base_dir=None, ignore_externals=False):
    """
    Parse the XML output from an "svn status" command and extract useful info
    as a list of dicts (one per status entry).

    If base_dir is given, every entry path must start with it and is returned
    relative to it. If ignore_externals is true, entries whose status item is
    'external' are left out of the result.

    Each dict has the keys 'path', 'type' ('external', 'normal' or
    'unversioned'), 'status', 'revision', 'props' and 'copied'.
    """
    if base_dir:
        # Normalize once so the prefix check below compares like with like.
        base_dir = os.path.normcase(base_dir)
    status_entries = []
    xml_string = strip_forbidden_xml_chars(xml_string)
    tree = ET.fromstring(xml_string)
    for entry in tree.findall('.//entry'):
        d = {}
        path = entry.get('path')
        if base_dir is not None:
            assert os.path.normcase(path).startswith(base_dir), \
                "status path %r is not under %r" % (path, base_dir)
            path = path[len(base_dir):].lstrip('/\\')
        d['path'] = path

        wc_status = entry.find('wc-status')
        status = wc_status.get('item')
        revision = wc_status.get('revision')
        if status == 'external':
            if ignore_externals:
                continue
            d['type'] = 'external'
        elif revision is not None:
            # Items that carry a working revision are under version control.
            d['type'] = 'normal'
        else:
            d['type'] = 'unversioned'
        d['status'] = status
        d['revision'] = revision
        d['props'] = wc_status.get('props')
        d['copied'] = wc_status.get('copied')
        status_entries.append(d)
    return status_entries
def get_svn_info(svn_url_or_wc, rev_number=None):
    """
    Get SVN information for the given URL or working copy, with an optionally
    specified revision number.
    Returns a dict as created by parse_svn_info_xml().
    """
    args = ['info', '--xml']
    if rev_number is not None:
        # Pass the revision both as the operative revision (-r) and as the
        # peg revision (target@rev).
        rev = str(rev_number)
        args += ["-r", rev, svn_url_or_wc + "@" + rev]
    else:
        args += [svn_url_or_wc]
    xml_string = run_svn(args, fail_if_stderr=True)
    return parse_svn_info_xml(xml_string)
def svn_checkout(svn_url, checkout_dir, rev_number=None):
    """
    Checkout the given URL at an optional revision number.

    Runs "svn checkout -q [-r REV] URL DIR" and returns whatever run_svn()
    returns for that command.
    """
    args = ['checkout', '-q']
    if rev_number is not None:
        args += ['-r', str(rev_number)]
    args += [svn_url, checkout_dir]
    return run_svn(args)
def _later_revision(rev_a, rev_b):
    """Return whichever of two revision specifiers is the later one.

    Numeric revisions (ints or digit strings) are compared by value. A
    non-numeric specifier such as "HEAD" wins over any numeric one.
    """
    def as_number(rev):
        try:
            return int(rev)
        except (TypeError, ValueError):
            return None

    num_a, num_b = as_number(rev_a), as_number(rev_b)
    if num_a is None and num_b is None:
        return max(rev_a, rev_b)
    if num_a is None:
        return rev_a
    if num_b is None:
        return rev_b
    return rev_a if num_a >= num_b else rev_b


def run_svn_log(svn_url_or_wc, rev_start, rev_end, limit, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Fetch up to 'limit' SVN log entries between the given revisions.

    Returns the list of dicts produced by parse_svn_log_xml().
    """
    args = ['log', '--xml']
    if stop_on_copy:
        args += ['--stop-on-copy']
    if get_changed_paths:
        args += ['-v']
    if get_revprops:
        args += ['--with-all-revprops']
    url = str(svn_url_or_wc)
    args += ['-r', '%s:%s' % (rev_start, rev_end)]
    if "@" not in url:
        # No peg revision given: use the later end of the requested range.
        url = "%s@%s" % (url, _later_revision(rev_start, rev_end))
    args += ['--limit', str(limit), url]
    xml_string = run_svn(args)
    return parse_svn_log_xml(xml_string)
def get_svn_status(svn_wc, quiet=False, no_recursive=False):
    """
    Get SVN status information about the given working copy.

    quiet adds "-q" to the status command; otherwise "-v" is used.
    no_recursive adds "-N". Returns the list of dicts produced by
    parse_svn_status_xml(), with paths relative to svn_wc.
    """
    # Ensure proper stripping by canonicalizing the path
    svn_wc = os.path.abspath(svn_wc)
    args = ['status', '--xml', '--ignore-externals']
    # NOTE(review): the -q / -v / -N flags are reconstructed from the
    # otherwise unused parameters -- confirm against the intended CLI usage.
    if quiet:
        args += ['-q']
    else:
        args += ['-v']
    if no_recursive:
        args += ['-N']
    xml_string = run_svn(args + [svn_wc])
    return parse_svn_status_xml(xml_string, svn_wc, ignore_externals=True)
def get_svn_versioned_files(svn_wc):
    """
    Get the list of versioned files in the SVN working copy.

    Returns the non-empty relative paths of all status entries whose type
    is 'normal'.
    """
    return [
        e['path']
        for e in get_svn_status(svn_wc)
        if e['path'] and e['type'] == 'normal'
    ]
def get_one_svn_log_entry(svn_url, rev_start, rev_end, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Get the first SVN log entry in the requested revision range.

    Raises EmptySVNLog if the log for that range contains no entry.
    """
    entries = run_svn_log(svn_url, rev_start, rev_end, 1, stop_on_copy,
                          get_changed_paths, get_revprops)
    if entries:
        return entries[0]
    raise EmptySVNLog("No SVN log for %s between revisions %s and %s" %
                      (svn_url, rev_start, rev_end))
def get_first_svn_log_entry(svn_url, rev_start, rev_end, get_changed_paths=True):
    """
    Get the first log entry after (or at) the given revision number in an SVN branch.
    The log is read from rev_start towards rev_end and does not cross copies
    (--stop-on-copy).

    NOTE: to know whether the branch creation corresponds to an SVN import or
    a copy from another branch, inspect elements of the 'changed_paths' entry
    in the returned dictionary.

    Raises EmptySVNLog if there is no log entry in the range.
    """
    return get_one_svn_log_entry(svn_url, rev_start, rev_end,
                                 stop_on_copy=True,
                                 get_changed_paths=get_changed_paths)
def get_last_svn_log_entry(svn_url, rev_start, rev_end, get_changed_paths=True):
    """
    Get the last log entry before/at the given revision number in an SVN branch.
    The log is read backwards, from rev_end towards rev_start, and does not
    cross copies (--stop-on-copy).

    Raises EmptySVNLog if there is no log entry in the range.
    """
    # The range is deliberately reversed so the newest entry comes first.
    return get_one_svn_log_entry(svn_url, rev_end, rev_start,
                                 stop_on_copy=True,
                                 get_changed_paths=get_changed_paths)
# Request duration (in seconds) used to tune the chunk size: the chunk is
# doubled below this value and halved above twice this value.
log_duration_threshold = 10.0
# Initial (and smallest allowed) number of log entries fetched per request.
log_min_chunk_length = 10


def iter_svn_log_entries(svn_url, first_rev, last_rev, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Iterate over SVN log entries between first_rev and last_rev.

    This function features chunked log fetching so that it isn't too nasty
    to the SVN server if many entries are requested.

    last_rev may be the string "HEAD", in which case iteration continues
    until the log returns no further entries.
    """
    cur_rev = first_rev
    chunk_length = log_min_chunk_length
    first_run = True
    # "HEAD" cannot be compared with revision numbers, so track it apart.
    open_ended = (last_rev == "HEAD")
    while open_ended or cur_rev <= last_rev:
        start_t = time.time()
        entries = run_svn_log(svn_url, cur_rev, "HEAD", chunk_length,
                              stop_on_copy, get_changed_paths, get_revprops)
        duration = time.time() - start_t
        if not first_run:
            # skip first revision on subsequent runs, as it is overlapped
            entries = entries[1:]
        first_run = False
        if not entries:
            break
        for e in entries:
            if not open_ended and e['revision'] > last_rev:
                return
            yield e
        # The next chunk starts at the last revision seen in this one.
        cur_rev = entries[-1]['revision']
        if not open_ended and cur_rev >= last_rev:
            break
        # Adapt chunk length based on measured request duration
        if duration < log_duration_threshold:
            chunk_length = int(chunk_length * 2.0)
        elif duration > log_duration_threshold * 2:
            chunk_length = max(log_min_chunk_length, int(chunk_length / 2.0))
# Cached result of get_svn_client_version(); None until first queried.
_svn_client_version = None


def get_svn_client_version():
    """Returns the SVN client version as a tuple.

    The returned tuple only contains numbers, non-digits in version string are
    silently ignored. The version is obtained from "svn --version -q" once
    and cached for later calls.
    """
    global _svn_client_version
    if _svn_client_version is None:
        raw = run_svn(['--version', '-q']).strip()
        _svn_client_version = tuple(
            int(x) for x in raw.split('.') if x.isdigit())
    return _svn_client_version