]> Tony Duckles's Git Repositories (git.nynim.org) - svn2svn.git/blob - svn2svn/svnclient.py
* Use ui.status for all status messages (verbose and debug)
[svn2svn.git] / svn2svn / svnclient.py
1 """ SVN client functions """
2
3 from shell import run_svn
4 from errors import EmptySVNLog
5
6 import os
7 import time
8 import calendar
9 import operator
10 from operator import itemgetter
11
12 try:
13 from xml.etree import cElementTree as ET
14 except ImportError:
15 try:
16 from xml.etree import ElementTree as ET
17 except ImportError:
18 try:
19 import cElementTree as ET
20 except ImportError:
21 from elementtree import ElementTree as ET
22
# Base argument lists for the svn subcommands issued by this module.
# The functions below build their command lines by concatenating extra
# options and targets onto these lists.
svn_log_args = ["log", "--xml"]
svn_info_args = ["info", "--xml"]
svn_checkout_args = ["checkout", "-q"]
svn_status_args = ["status", "--xml", "--ignore-externals"]
27
# 256-entry identity translation table for the two-argument form of
# str.translate() on Python 2 byte strings.
_identity_table = "".join(map(chr, range(256)))
# Control characters below 0x20 that XML forbids (everything except
# TAB 0x09, LF 0x0A and CR 0x0D).
_forbidden_xml_chars = "".join(
    sorted(set(map(chr, range(32))) - set('\x09\x0A\x0D'))
)
# The same characters as a byte string (for bytes.translate) ...
_forbidden_xml_bytes = _forbidden_xml_chars.encode('ascii')
# ... and as a {code point: None} mapping (for text-string translate, where
# a None value means "delete this character").
_forbidden_xml_ordinals = dict.fromkeys(map(ord, _forbidden_xml_chars))


def strip_forbidden_xml_chars(xml_string):
    """
    Given an XML string, strips forbidden characters as per the XML spec.
    (these are all control characters except 0x9, 0xA and 0xD).

    Accepts Python 2 byte strings and unicode strings as well as Python 3
    str and bytes; the return value has the same type as the input.
    """
    try:
        # Python 2 byte strings: table + deletechars form.
        return xml_string.translate(_identity_table, _forbidden_xml_chars)
    except TypeError:
        # Text strings (and Python 3 bytes) reject that call signature.
        pass
    if isinstance(xml_string, type(_forbidden_xml_bytes)):
        # Python 3 bytes: a None table means "delete only".
        return xml_string.translate(None, _forbidden_xml_bytes)
    return xml_string.translate(_forbidden_xml_ordinals)
40
41
def svn_date_to_timestamp(svn_date):
    """
    Convert an SVN date string, as read from the XML output, into the
    corresponding timestamp (seconds since the epoch).

    Everything from the first '.' onwards (microseconds and timezone
    suffix) is dropped and the remainder is interpreted as UTC.
    """
    # XXX there are various ISO datetime parsing routines out there,
    # cf. http://seehuhn.de/comp/pdate
    whole_seconds = svn_date.split('.', 1)[0]
    parsed = time.strptime(whole_seconds, "%Y-%m-%dT%H:%M:%S")
    return calendar.timegm(parsed)
53
def parse_svn_info_xml(xml_string):
    """
    Parse the XML output from an "svn info" command into a dict.

    Keys: 'url', 'kind', 'revision', 'repos_url', 'repos_uuid',
    'last_changed_rev', 'last_changed_date' and, only when the commit has
    an <author> element, 'last_changed_author'.
    """
    root = ET.fromstring(strip_forbidden_xml_chars(xml_string))
    entry = root.find('.//entry')
    info = {
        'url': entry.find('url').text,
        'kind': entry.get('kind'),
        'revision': int(entry.get('revision')),
        'repos_url': root.find('.//repository/root').text,
        'repos_uuid': root.find('.//repository/uuid').text,
        'last_changed_rev': int(root.find('.//commit').get('revision')),
    }
    author = root.find('.//commit/author')
    if author is not None:
        info['last_changed_author'] = author.text
    commit_date = root.find('.//commit/date').text
    info['last_changed_date'] = svn_date_to_timestamp(commit_date)
    return info
74
def parse_svn_log_xml(xml_string):
    """
    Parse the XML output from an "svn log" command and extract useful information
    as a list of dicts (one per log changeset).

    Each dict has the keys:
      'revision'      -- revision number (int)
      'author'        -- author text, or "No author" when missing/empty
      'date'          -- timestamp, or None when there is no <date> element
      'message'       -- log message with line endings normalized to '\\n',
                         or "" when missing/empty
      'changed_paths' -- list of dicts ('path', 'kind', 'action',
                         'copyfrom_path', 'copyfrom_revision'), sorted by 'path'
      'revprops'      -- list of {'name': ..., 'value': ...} dicts
    """
    log_entries = []
    tree = ET.fromstring(strip_forbidden_xml_chars(xml_string))
    for entry in tree.findall('logentry'):
        d = {}
        d['revision'] = int(entry.get('revision'))
        # Some revisions don't have authors, most notably the first revision
        # in a repository.
        # logentry nodes targeting directories protected by path-based
        # authentication have no child nodes at all. We return an entry
        # in that case. Anyway, as it has no path entries, no further
        # processing will be made.
        author = entry.find('author')
        date = entry.find('date')
        msg = entry.find('msg')

        if author is not None and author.text:
            d['author'] = author.text
        else:
            d['author'] = "No author"

        if date is not None:
            d['date'] = svn_date_to_timestamp(date.text)
        else:
            d['date'] = None

        # An empty <msg/> element has text None, so guard before calling
        # string methods on it.
        if msg is not None and msg.text:
            d['message'] = msg.text.replace('\r\n', '\n').replace('\n\r', '\n').replace('\r', '\n')
        else:
            d['message'] = ""

        paths = []
        for path in entry.findall('.//paths/path'):
            copyfrom_rev = path.get('copyfrom-rev')
            if copyfrom_rev:
                copyfrom_rev = int(copyfrom_rev)
            paths.append({
                'path': path.text,
                'kind': path.get('kind'),
                'action': path.get('action'),
                'copyfrom_path': path.get('copyfrom-path'),
                'copyfrom_revision': copyfrom_rev,
            })
        # Sort paths (i.e. into hierarchical order), so that process_svn_log_entry()
        # can process actions in depth-first order.
        d['changed_paths'] = sorted(paths, key=itemgetter('path'))

        revprops = []
        for prop in entry.findall('.//revprops/property'):
            revprops.append({'name': prop.get('name'), 'value': prop.text})
        d['revprops'] = revprops

        log_entries.append(d)
    return log_entries
122
def parse_svn_status_xml(xml_string, base_dir=None, ignore_externals=False):
    """
    Parse the XML output from an "svn status" command into a list of dicts,
    one per status entry.

    When base_dir is given, every entry path must start with it (compared
    after os.path.normcase) and is returned relative to it. Entries whose
    status item is 'external' are dropped when ignore_externals is true.
    Each dict has the keys 'path', 'type' ('external', 'normal' or
    'unversioned'), 'status', 'revision', 'props' and 'copied'.
    """
    if base_dir:
        base_dir = os.path.normcase(base_dir)
    results = []
    root = ET.fromstring(strip_forbidden_xml_chars(xml_string))
    for entry in root.findall('.//entry'):
        path = entry.get('path')
        if base_dir is not None:
            assert os.path.normcase(path).startswith(base_dir)
            # Cut off the base directory and any separator left in front.
            path = path[len(base_dir):].lstrip('/\\')

        wc_status = entry.find('wc-status')
        status = wc_status.get('item')
        if ignore_externals and status == 'external':
            continue

        revision = wc_status.get('revision')
        if status == 'external':
            entry_type = 'external'
        elif revision is not None:
            entry_type = 'normal'
        else:
            # No revision attribute and not an external.
            entry_type = 'unversioned'

        results.append({
            'path': path,
            'type': entry_type,
            'status': status,
            'revision': revision,
            'props': wc_status.get('props'),
            'copied': wc_status.get('copied'),
        })
    return results
158
def get_svn_info(svn_url_or_wc, rev_number=None):
    """
    Run "svn info --xml" on the given URL or working copy and return the
    dict produced by parse_svn_info_xml().

    When rev_number is given it is used both as the operative revision
    ("-r") and as the peg revision ("@rev" appended to the target).
    """
    if rev_number is None:
        target_args = [svn_url_or_wc]
    else:
        pegged_target = svn_url_or_wc + "@" + str(rev_number)
        target_args = ["-r", rev_number, pegged_target]
    info_xml = run_svn(svn_info_args + target_args, fail_if_stderr=True)
    return parse_svn_info_xml(info_xml)
171
def svn_checkout(svn_url, checkout_dir, rev_number=None):
    """
    Check out svn_url into checkout_dir, optionally at revision rev_number.
    Returns whatever run_svn() returns for the checkout command.
    """
    if rev_number is None:
        rev_args = []
    else:
        rev_args = ['-r', rev_number]
    return run_svn(svn_checkout_args + rev_args + [svn_url, checkout_dir])
181
def run_svn_log(svn_url_or_wc, rev_start, rev_end, limit, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Run "svn log --xml" for up to 'limit' entries and return the parsed
    entries (see parse_svn_log_xml()).

    The "-r rev_start:rev_end" range is only passed when neither bound is
    'HEAD'; in that case a target without an "@" also gets a peg revision
    equal to the larger of the two bounds.
    """
    optional_flags = (
        (stop_on_copy, '--stop-on-copy'),
        (get_changed_paths, '-v'),
        (get_revprops, '--with-all-revprops'),
    )
    cmd_args = [flag for enabled, flag in optional_flags if enabled]

    target = str(svn_url_or_wc)
    if rev_start != 'HEAD' and rev_end != 'HEAD':
        cmd_args += ['-r', '%s:%s' % (rev_start, rev_end)]
        if "@" not in svn_url_or_wc:
            target = "%s@%s" % (svn_url_or_wc, str(max(rev_start, rev_end)))
    cmd_args += ['--limit', str(limit), target]

    log_xml = run_svn(svn_log_args + cmd_args)
    return parse_svn_log_xml(log_xml)
201
def get_svn_status(svn_wc, quiet=False, no_recursive=False):
    """
    Run "svn status --xml" on the given working copy and return the parsed
    entries (see parse_svn_status_xml()), with externals left out and paths
    made relative to the working copy.

    quiet selects '-q' instead of '-v'; no_recursive adds '-N'.
    """
    # Ensure proper stripping by canonicalizing the path
    svn_wc = os.path.abspath(svn_wc)
    if quiet:
        flags = ['-q']
    else:
        flags = ['-v']
    if no_recursive:
        flags.append('-N')
    status_xml = run_svn(svn_status_args + flags + [svn_wc])
    return parse_svn_status_xml(status_xml, svn_wc, ignore_externals=True)
217
def get_svn_versioned_files(svn_wc):
    """
    Return the paths of all status entries of type 'normal' in the SVN
    working copy, skipping entries whose relative path is empty.
    """
    return [
        entry['path']
        for entry in get_svn_status(svn_wc)
        if entry['path'] and entry['type'] == 'normal'
    ]
227
def get_one_svn_log_entry(svn_url, rev_start, rev_end, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Return the first SVN log entry in the requested revision range.

    Raises EmptySVNLog when the log request returns no entries.
    """
    # A limit of 1 is enough: only the first entry is ever used.
    found = run_svn_log(svn_url, rev_start, rev_end, 1, stop_on_copy, get_changed_paths, get_revprops)
    if not found:
        raise EmptySVNLog("No SVN log for %s between revisions %s and %s" %
                          (svn_url, rev_start, rev_end))
    return found[0]
237
def get_first_svn_log_entry(svn_url, rev_start, rev_end, get_changed_paths=True):
    """
    Get the first log entry after (or at) rev_start, and no later than
    rev_end, in an SVN branch.

    The log is requested with stop-on-copy enabled, so a range that starts
    before the branch existed gives the log entry corresponding to the
    branch creation.

    get_changed_paths controls whether the returned entry includes the
    changed paths.

    NOTE: to know whether the branch creation corresponds to an SVN import or
    a copy from another branch, inspect elements of the 'changed_paths' entry
    in the returned dictionary.

    Raises EmptySVNLog (from get_one_svn_log_entry) when the range is empty.
    """
    return get_one_svn_log_entry(svn_url, rev_start, rev_end,
                                 stop_on_copy=True,
                                 get_changed_paths=get_changed_paths)
249
def get_last_svn_log_entry(svn_url, rev_start, rev_end, get_changed_paths=True):
    """
    Get the last log entry before (or at) rev_end, and no earlier than
    rev_start, in an SVN branch.

    Passing "HEAD" as rev_end gives the log entry corresponding to the
    latest commit in the branch. The log is requested with stop-on-copy
    enabled.

    get_changed_paths controls whether the returned entry includes the
    changed paths.

    Raises EmptySVNLog (from get_one_svn_log_entry) when the range is empty.
    """
    # The bounds are deliberately swapped: the range is requested from
    # rev_end down to rev_start, so the single entry returned is the newest.
    return get_one_svn_log_entry(svn_url, rev_end, rev_start,
                                 stop_on_copy=True,
                                 get_changed_paths=get_changed_paths)
257
258
# Tuning knobs for the chunked fetching in iter_svn_log_entries():
# request duration (seconds) used to decide whether to grow or shrink the
# chunk, and the smallest chunk length (also the initial one).
log_duration_threshold = 10.0
log_min_chunk_length = 10
261
def iter_svn_log_entries(svn_url, first_rev, last_rev, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Iterate over SVN log entries between first_rev and last_rev.

    This function features chunked log fetching so that it isn't too nasty
    to the SVN server if many entries are requested. The chunk length starts
    at log_min_chunk_length, doubles after a request that took less than
    log_duration_threshold seconds, and is halved (never below
    log_min_chunk_length) after one that took more than twice that long.

    last_rev may be the string "HEAD", in which case no upper bound is
    applied and iteration ends when a request brings back no new entries.
    """
    # Revisions are only compared against last_rev when it is not "HEAD":
    # ordering an int against a string is not meaningful (and is a TypeError
    # on Python 3).
    bounded = last_rev != "HEAD"
    cur_rev = first_rev
    chunk_length = log_min_chunk_length
    first_run = True
    while not bounded or cur_rev <= last_rev:
        start_t = time.time()
        # Every request runs from cur_rev to "HEAD"; entries past last_rev
        # are filtered out below.
        # NOTE(review): the original computed min(last_rev, cur_rev +
        # chunk_length) but never used it -- confirm that "HEAD" is the
        # intended upper bound of each request.
        entries = run_svn_log(svn_url, cur_rev, "HEAD", chunk_length,
                              stop_on_copy, get_changed_paths, get_revprops)
        duration = time.time() - start_t
        if not first_run and entries:
            # skip first revision on subsequent runs, as it is overlapped
            entries.pop(0)
        first_run = False
        if not entries:
            break
        for e in entries:
            if bounded and e['revision'] > last_rev:
                break
            yield e
        # 'e' is the last entry looked at (yielded, or the one past the bound).
        if bounded and e['revision'] >= last_rev:
            break
        cur_rev = e['revision']
        # Adapt chunk length based on measured request duration
        if duration < log_duration_threshold:
            chunk_length = int(chunk_length * 2.0)
        elif duration > log_duration_threshold * 2:
            chunk_length = max(log_min_chunk_length, int(chunk_length / 2.0))
296
297
# Cached result of get_svn_client_version(); None until the first call.
_svn_client_version = None


def get_svn_client_version():
    """Returns the SVN client version as a tuple of ints.

    The output of "svn --version -q" is split on '.'; components that are
    not made up purely of digits are silently dropped. The result is
    computed once and then served from a module-level cache.
    """
    global _svn_client_version
    if _svn_client_version is not None:
        return _svn_client_version
    version_text = run_svn(['--version', '-q']).strip()
    numeric_parts = [int(part) for part in version_text.split('.')
                     if part.isdigit()]
    _svn_client_version = tuple(numeric_parts)
    return _svn_client_version