]> Tony Duckles's Git Repositories (git.nynim.org) - svn2svn.git/blob - svn2svn/svnclient.py
Remove HEAD-specific short-circuiting
[svn2svn.git] / svn2svn / svnclient.py
1 """ SVN client functions """
2
3 from shell import run_svn
4 from errors import EmptySVNLog
5
6 import os
7 import time
8 import calendar
9 import operator
10 from operator import itemgetter
11
12 try:
13 from xml.etree import cElementTree as ET
14 except ImportError:
15 try:
16 from xml.etree import ElementTree as ET
17 except ImportError:
18 try:
19 import cElementTree as ET
20 except ImportError:
21 from elementtree import ElementTree as ET
22
# Base argument lists for the svn subcommands used in this module. Each
# caller concatenates its own options and targets onto one of these lists
# before passing the result to run_svn().
svn_log_args = ["log", "--xml"]
svn_info_args = ["info", "--xml"]
svn_checkout_args = ["checkout", "-q"]
svn_status_args = ["status", "--xml", "--ignore-externals"]
27
28 _identity_table = "".join(map(chr, range(256)))
29 _forbidden_xml_chars = "".join(
30 set(map(chr, range(32))) - set('\x09\x0A\x0D')
31 )
32
33
34 def strip_forbidden_xml_chars(xml_string):
35 """
36 Given an XML string, strips forbidden characters as per the XML spec.
37 (these are all control characters except 0x9, 0xA and 0xD).
38 """
39 return xml_string.translate(_identity_table, _forbidden_xml_chars)
40
41
def svn_date_to_timestamp(svn_date):
    """
    Convert an SVN date string, as found in the XML output, into the
    matching timestamp (seconds since the epoch).
    """
    # Everything from the first '.' onwards (fractional seconds and the
    # timezone suffix) is discarded; what remains is treated as UTC.
    # XXX a full ISO 8601 parser would be more robust,
    # cf. http://seehuhn.de/comp/pdate
    whole_seconds = svn_date.split('.', 1)[0]
    parsed = time.strptime(whole_seconds, "%Y-%m-%dT%H:%M:%S")
    return calendar.timegm(parsed)
53
def parse_svn_info_xml(xml_string):
    """
    Read the XML produced by an "svn info" command and return the fields of
    interest as a dict.

    Keys: 'url', 'kind', 'revision', 'repos_url', 'repos_uuid',
    'last_changed_rev', 'last_changed_date', plus 'last_changed_author'
    when the commit element has an author child.
    """
    tree = ET.fromstring(strip_forbidden_xml_chars(xml_string))
    entry = tree.find('.//entry')
    info = {
        'url': entry.find('url').text,
        'kind': entry.get('kind'),
        'revision': int(entry.get('revision')),
        'repos_url': tree.find('.//repository/root').text,
        'repos_uuid': tree.find('.//repository/uuid').text,
        'last_changed_rev': int(tree.find('.//commit').get('revision')),
    }
    # The author element is optional, so the key is only set when present.
    author_element = tree.find('.//commit/author')
    if author_element is not None:
        info['last_changed_author'] = author_element.text
    commit_date = tree.find('.//commit/date').text
    info['last_changed_date'] = svn_date_to_timestamp(commit_date)
    return info
74
def parse_svn_log_xml(xml_string):
    """
    Parse the XML output from an "svn log" command and extract useful information
    as a list of dicts (one per log changeset).

    Each dict has the keys 'revision' (int), 'author', 'date' (timestamp or
    None), 'message', 'changed_paths' (list of dicts sorted by path) and
    'revprops' (list of {'name', 'value'} dicts).
    """
    l = []
    xml_string = strip_forbidden_xml_chars(xml_string)
    tree = ET.fromstring(xml_string)
    for entry in tree.findall('logentry'):
        d = {}
        d['revision'] = int(entry.get('revision'))
        # Some revisions don't have authors, most notably the first revision
        # in a repository.
        # logentry nodes targeting directories protected by path-based
        # authentication have no child nodes at all. We return an entry
        # in that case. Anyway, as it has no path entries, no further
        # processing will be made.
        author = entry.find('author')
        date = entry.find('date')
        msg = entry.find('msg')
        d['author'] = author is not None and author.text or "No author"
        if date is not None:
            d['date'] = svn_date_to_timestamp(date.text)
        else:
            d['date'] = None
        # An empty <msg/> element parses with text set to None, so the text
        # itself must be checked before normalizing its line endings.
        if msg is not None and msg.text:
            d['message'] = msg.text.replace('\r\n', '\n').replace('\n\r', '\n').replace('\r', '\n')
        else:
            d['message'] = ""
        paths = []
        for path in entry.findall('.//paths/path'):
            copyfrom_rev = path.get('copyfrom-rev')
            if copyfrom_rev:
                copyfrom_rev = int(copyfrom_rev)
            paths.append({
                'path': path.text,
                'kind': path.get('kind'),
                'action': path.get('action'),
                'copyfrom_path': path.get('copyfrom-path'),
                'copyfrom_revision': copyfrom_rev,
            })
        # Sort paths (i.e. into hierarchical order), so that process_svn_log_entry()
        # can process actions in depth-first order.
        d['changed_paths'] = sorted(paths, key=itemgetter('path'))
        revprops = []
        for prop in entry.findall('.//revprops/property'):
            revprops.append({'name': prop.get('name'), 'value': prop.text})
        d['revprops'] = revprops
        l.append(d)
    return l
122
def parse_svn_status_xml(xml_string, base_dir=None, ignore_externals=False):
    """
    Read the XML produced by an "svn status" command and return one dict
    per status entry.

    When base_dir is given, it is stripped from the front of every entry
    path. When ignore_externals is true, entries whose status item is
    'external' are left out of the result.
    """
    if base_dir:
        base_dir = os.path.normcase(base_dir)
    tree = ET.fromstring(strip_forbidden_xml_chars(xml_string))
    results = []
    for entry in tree.findall('.//entry'):
        path = entry.get('path')
        if base_dir is not None:
            # Every reported path is expected to live below base_dir.
            assert os.path.normcase(path).startswith(base_dir)
            path = path[len(base_dir):].lstrip('/\\')
        wc_status = entry.find('wc-status')
        status = wc_status.get('item')
        if ignore_externals and status == 'external':
            continue
        revision = wc_status.get('revision')
        # Classify the entry: externals first, then anything that carries
        # a revision attribute, and everything else as unversioned.
        if status == 'external':
            entry_type = 'external'
        elif revision is not None:
            entry_type = 'normal'
        else:
            entry_type = 'unversioned'
        results.append({
            'path': path,
            'type': entry_type,
            'status': status,
            'revision': revision,
            'props': wc_status.get('props'),
            'copied': wc_status.get('copied'),
        })
    return results
158
def get_svn_info(svn_url_or_wc, rev_number=None):
    """
    Run "svn info" on a URL or working copy and return the parsed result
    (the dict built by parse_svn_info_xml()).

    When rev_number is given, it is used both as the operative revision
    ("-r") and as the peg revision appended to the target.
    """
    target_args = [svn_url_or_wc]
    if rev_number is not None:
        target_args = ["-r", rev_number, svn_url_or_wc+"@"+str(rev_number)]
    info_xml = run_svn(svn_info_args + target_args, fail_if_stderr=True)
    return parse_svn_info_xml(info_xml)
171
def svn_checkout(svn_url, checkout_dir, rev_number=None):
    """
    Check out svn_url into checkout_dir, at rev_number when one is given.
    Returns whatever run_svn() returns.
    """
    if rev_number is None:
        revision_opts = []
    else:
        revision_opts = ['-r', rev_number]
    return run_svn(svn_checkout_args + revision_opts + [svn_url, checkout_dir])
181
def run_svn_log(svn_url_or_wc, rev_start, rev_end, limit, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Run "svn log" for the rev_start:rev_end range, limited to 'limit'
    entries, and return the entries as parsed by parse_svn_log_xml().
    """
    switches = (
        (stop_on_copy, '--stop-on-copy'),
        (get_changed_paths, '-v'),
        (get_revprops, '--with-all-revprops'),
    )
    options = [flag for enabled, flag in switches if enabled]
    target = str(svn_url_or_wc)
    options += ['-r', '%s:%s' % (rev_start, rev_end)]
    if "@" not in svn_url_or_wc:
        # No peg revision in the target yet: pin it to the larger of the
        # two range bounds.
        # NOTE(review): max() is applied to the bounds as given, so a
        # mixed int/"HEAD" pair depends on how the interpreter orders them.
        target = "%s@%s" % (svn_url_or_wc, str(max(rev_start, rev_end)))
    options += ['--limit', str(limit), target]
    log_xml = run_svn(svn_log_args + options)
    return parse_svn_log_xml(log_xml)
200
def get_svn_status(svn_wc, quiet=False, no_recursive=False):
    """
    Run "svn status" on a working copy and return the parsed entries, with
    externals left out and paths made relative to the working copy.
    """
    # The absolute path is what gets stripped from each entry path later on.
    svn_wc = os.path.abspath(svn_wc)
    if quiet:
        flags = ['-q']
    else:
        flags = ['-v']
    if no_recursive:
        flags.append('-N')
    status_xml = run_svn(svn_status_args + flags + [svn_wc])
    return parse_svn_status_xml(status_xml, svn_wc, ignore_externals=True)
216
def get_svn_versioned_files(svn_wc):
    """
    Return the paths of all versioned entries in the SVN working copy.
    """
    # Entries with an empty path are skipped, as are entries whose type is
    # anything other than 'normal'.
    return [
        entry['path']
        for entry in get_svn_status(svn_wc)
        if entry['path'] and entry['type'] == 'normal'
    ]
226
def get_one_svn_log_entry(svn_url, rev_start, rev_end, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Return the first SVN log entry found in the requested revision range.

    Raises EmptySVNLog when the range yields no entry at all.
    """
    found = run_svn_log(svn_url, rev_start, rev_end, 1, stop_on_copy, get_changed_paths, get_revprops)
    if not found:
        raise EmptySVNLog("No SVN log for %s between revisions %s and %s" %
                          (svn_url, rev_start, rev_end))
    return found[0]
236
def get_first_svn_log_entry(svn_url, rev_start, rev_end, get_changed_paths=True):
    """
    Get the first log entry after (or at) rev_start in an SVN branch,
    looking no further than rev_end and stopping at copies.

    get_changed_paths is forwarded to the log query; pass False to skip
    fetching the per-revision path list.

    NOTE: to know whether the branch creation corresponds to an SVN import or
    a copy from another branch, inspect elements of the 'changed_paths' entry
    in the returned dictionary (this requires get_changed_paths=True).

    Raises EmptySVNLog (from get_one_svn_log_entry) if the range is empty.
    """
    return get_one_svn_log_entry(svn_url, rev_start, rev_end, stop_on_copy=True,
                                 get_changed_paths=get_changed_paths)
248
def get_last_svn_log_entry(svn_url, rev_start, rev_end, get_changed_paths=True):
    """
    Get the last log entry before (or at) rev_end in an SVN branch, looking
    no further back than rev_start and stopping at copies.

    The range is queried in reverse (rev_end:rev_start) so that the first
    entry returned is the most recent one. get_changed_paths is forwarded
    to the log query; pass False to skip fetching the per-revision path list.

    Raises EmptySVNLog (from get_one_svn_log_entry) if the range is empty.
    """
    return get_one_svn_log_entry(svn_url, rev_end, rev_start, stop_on_copy=True,
                                 get_changed_paths=get_changed_paths)
256
257
# Tuning knobs for the chunked fetching in iter_svn_log_entries().
# Smallest (and initial) number of log entries requested per "svn log" call.
log_min_chunk_length = 10
# Request duration, in seconds, that the chunk length adapts around: a
# faster request doubles the chunk length, one slower than twice this value
# halves it.
log_duration_threshold = 10.0
260
def iter_svn_log_entries(svn_url, first_rev, last_rev, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Iterate over SVN log entries between first_rev and last_rev.

    This function features chunked log fetching so that it isn't too nasty
    to the SVN server if many entries are requested.

    last_rev may be a revision number or the string "HEAD", in which case
    iteration continues until the log is exhausted. Entries are yielded as
    the dicts produced by parse_svn_log_xml().
    """
    # "HEAD" is an open-ended upper bound; revision numbers are never
    # compared against it.
    open_ended = (last_rev == "HEAD")
    cur_rev = first_rev
    chunk_length = log_min_chunk_length
    first_run = True
    while open_ended or cur_rev <= last_rev:
        start_t = time.time()
        entries = run_svn_log(svn_url, cur_rev, "HEAD", chunk_length,
                              stop_on_copy, get_changed_paths, get_revprops)
        duration = time.time() - start_t
        if not first_run and entries:
            # skip first revision on subsequent runs, as it is overlapped
            # (each chunk starts at the last revision already yielded)
            entries.pop(0)
        first_run = False
        if not entries:
            break
        for e in entries:
            if not open_ended and e['revision'] > last_rev:
                break
            yield e
        # 'e' is the last entry examined by the loop above.
        if not open_ended and e['revision'] >= last_rev:
            break
        cur_rev = e['revision']
        # Adapt chunk length based on measured request duration
        if duration < log_duration_threshold:
            chunk_length = int(chunk_length * 2.0)
        elif duration > log_duration_threshold * 2:
            chunk_length = max(log_min_chunk_length, int(chunk_length / 2.0))
295
296
# Cached result of get_svn_client_version(); None until the first call.
_svn_client_version = None


def get_svn_client_version():
    """Return the SVN client version as a tuple of ints.

    The version string is split on '.', and only the components made up
    entirely of digits are kept; any other component is silently dropped.
    The svn client is only queried on the first call.
    """
    global _svn_client_version
    if _svn_client_version is not None:
        return _svn_client_version
    raw = run_svn(['--version', '-q']).strip()
    numeric_parts = [int(part) for part in raw.split('.') if part.isdigit()]
    _svn_client_version = tuple(numeric_parts)
    return _svn_client_version