]> Tony Duckles's Git Repositories (git.nynim.org) - svn2svn.git/blob - svn2svn/svnclient.py
Migrate to run/svn2svn.py
[svn2svn.git] / svn2svn / svnclient.py
1 """ SVN client functions """
2
3 from . import ui
4 from shell import run_svn
5 from errors import EmptySVNLog
6
7 import os
8 import time
9 import calendar
10 import operator
11 from operator import itemgetter
12
13 try:
14 from xml.etree import cElementTree as ET
15 except ImportError:
16 try:
17 from xml.etree import ElementTree as ET
18 except ImportError:
19 try:
20 import cElementTree as ET
21 except ImportError:
22 from elementtree import ElementTree as ET
23
# Base argument lists for the svn subcommands used in this module.
# Each helper below concatenates one of these with its own options and
# targets before handing the result to run_svn().
svn_log_args = ['log', '--xml']
svn_info_args = ['info', '--xml']
svn_checkout_args = ['checkout', '-q']
svn_status_args = ['status', '--xml', '-v', '--ignore-externals']
28
# 256-entry identity translation table. No longer used by
# strip_forbidden_xml_chars() itself; kept so that any code referring to
# this module-level name keeps working.
_identity_table = "".join(map(chr, range(256)))
# Control characters (0x00-0x1F) that XML 1.0 forbids, i.e. everything below
# 0x20 except TAB (0x09), LF (0x0A) and CR (0x0D).
_forbidden_xml_chars = "".join(
    sorted(set(map(chr, range(32))) - set('\x09\x0A\x0D'))
)
# The same set as a byte string, used as the "delete" argument when the input
# is a byte string (Python 2 str / Python 3 bytes).
_forbidden_xml_bytes = bytes(bytearray(map(ord, _forbidden_xml_chars)))
# The same set as a {code point: None} mapping, used when the input is a text
# string (Python 2 unicode / Python 3 str); None means "delete".
_forbidden_xml_ordinals = dict.fromkeys(map(ord, _forbidden_xml_chars))


def strip_forbidden_xml_chars(xml_string):
    """
    Given an XML string, strips forbidden characters as per the XML spec.
    (these are all control characters except 0x9, 0xA and 0xD).

    Accepts either a byte string or a text string and returns a string of
    the same type. The two-argument form of translate() only exists for
    byte strings, so text strings are handled with a deletion mapping.
    """
    if isinstance(xml_string, bytes):
        return xml_string.translate(None, _forbidden_xml_bytes)
    return xml_string.translate(_forbidden_xml_ordinals)
41
42
def svn_date_to_timestamp(svn_date):
    """
    Convert an SVN date string, as found in the XML output, into a timestamp.

    Everything from the first '.' onwards (fractional seconds and the
    timezone suffix) is discarded and the remainder is interpreted as UTC.
    """
    # XXX there are various ISO datetime parsing routines out there,
    # cf. http://seehuhn.de/comp/pdate
    whole_seconds = svn_date.partition('.')[0]
    parsed = time.strptime(whole_seconds, "%Y-%m-%dT%H:%M:%S")
    return calendar.timegm(parsed)
54
def parse_svn_info_xml(xml_string):
    """
    Turn the XML produced by "svn info" into a dict.

    Keys: 'url', 'kind', 'revision', 'repos_url', 'repos_uuid',
    'last_changed_rev', 'last_changed_date' and, only when the commit has an
    <author> element, 'last_changed_author'.
    """
    root = ET.fromstring(strip_forbidden_xml_chars(xml_string))
    entry_node = root.find('.//entry')
    info = {
        'url': entry_node.find('url').text,
        'kind': entry_node.get('kind'),
        'revision': int(entry_node.get('revision')),
        'repos_url': root.find('.//repository/root').text,
        'repos_uuid': root.find('.//repository/uuid').text,
        'last_changed_rev': int(root.find('.//commit').get('revision')),
    }
    # The author element is optional; the key is simply left out without it.
    author_node = root.find('.//commit/author')
    if author_node is not None:
        info['last_changed_author'] = author_node.text
    commit_date = root.find('.//commit/date').text
    info['last_changed_date'] = svn_date_to_timestamp(commit_date)
    return info
75
def parse_svn_log_xml(xml_string):
    """
    Parse the XML output from an "svn log" command and extract useful information
    as a list of dicts (one per log changeset).

    Each dict has the keys 'revision' (int), 'author' ("No author" when
    missing or empty), 'date' (timestamp, or None when missing), 'message'
    (line endings normalised to '\\n', "" when missing or empty),
    'changed_paths' (list of dicts sorted by 'path') and 'revprops' (list of
    {'name', 'value'} dicts).
    """
    entries = []
    xml_string = strip_forbidden_xml_chars(xml_string)
    tree = ET.fromstring(xml_string)
    for entry in tree.findall('logentry'):
        d = {}
        d['revision'] = int(entry.get('revision'))
        # Some revisions don't have authors, most notably the first revision
        # in a repository.
        # logentry nodes targeting directories protected by path-based
        # authentication have no child nodes at all. We return an entry
        # in that case. Anyway, as it has no path entries, no further
        # processing will be made.
        author = entry.find('author')
        date = entry.find('date')
        msg = entry.find('msg')
        author_text = author.text if author is not None else None
        d['author'] = author_text or "No author"
        # An element that is present but empty has text None, so check the
        # text as well as the element before parsing it.
        if date is not None and date.text:
            d['date'] = svn_date_to_timestamp(date.text)
        else:
            d['date'] = None
        # An empty commit message is an empty <msg/> element whose text is
        # None; calling .replace() on it directly would raise AttributeError.
        message = msg.text if msg is not None else None
        if message:
            message = message.replace('\r\n', '\n').replace('\n\r', '\n').replace('\r', '\n')
        d['message'] = message or ""
        paths = []
        for path in entry.findall('.//paths/path'):
            copyfrom_rev = path.get('copyfrom-rev')
            if copyfrom_rev:
                copyfrom_rev = int(copyfrom_rev)
            paths.append({
                'path': path.text,
                'kind': path.get('kind'),
                'action': path.get('action'),
                'copyfrom_path': path.get('copyfrom-path'),
                'copyfrom_revision': copyfrom_rev,
            })
        # Sort paths (i.e. into hierarchical order), so that process_svn_log_entry()
        # can process actions in depth-first order.
        d['changed_paths'] = sorted(paths, key=itemgetter('path'))
        d['revprops'] = [
            {'name': prop.get('name'), 'value': prop.text}
            for prop in entry.findall('.//revprops/property')
        ]
        entries.append(d)
    return entries
123
def parse_svn_status_xml(xml_string, base_dir=None, ignore_externals=False):
    """
    Turn the XML produced by "svn status" into a list of dicts, one per
    status entry.

    When base_dir is given, each entry path must start with it (compared
    after os.path.normcase) and is returned relative to it. Entries whose
    wc-status item is 'external' are dropped when ignore_externals is true.
    Each dict carries 'path', 'type' ('external', 'normal' or
    'unversioned'), 'status', 'revision', 'props' and 'copied'.
    """
    if base_dir:
        base_dir = os.path.normcase(base_dir)
    results = []
    root = ET.fromstring(strip_forbidden_xml_chars(xml_string))
    for node in root.findall('.//entry'):
        rel_path = node.get('path')
        if base_dir is not None:
            assert os.path.normcase(rel_path).startswith(base_dir)
            # Drop the base directory prefix and any leading separators.
            rel_path = rel_path[len(base_dir):].lstrip('/\\')
        wc_status = node.find('wc-status')
        item = wc_status.get('item')
        is_external = item == 'external'
        if is_external and ignore_externals:
            continue
        revision = wc_status.get('revision')
        if is_external:
            entry_type = 'external'
        elif revision is not None:
            entry_type = 'normal'
        else:
            # No revision attribute on a non-external entry.
            entry_type = 'unversioned'
        results.append({
            'path': rel_path,
            'type': entry_type,
            'status': item,
            'revision': revision,
            'props': wc_status.get('props'),
            'copied': wc_status.get('copied'),
        })
    return results
159
def get_svn_info(svn_url_or_wc, rev_number=None):
    """
    Run "svn info --xml" on a URL or working copy and return the parsed
    result (see parse_svn_info_xml()).

    When rev_number is given it is used both as the operative revision
    ("-r") and as the peg revision ("target@rev").
    """
    if rev_number is None:
        target_args = [svn_url_or_wc]
    else:
        pegged_target = svn_url_or_wc + "@" + str(rev_number)
        target_args = ["-r", rev_number, pegged_target]
    info_xml = run_svn(svn_info_args + target_args, fail_if_stderr=True)
    return parse_svn_info_xml(info_xml)
172
def svn_checkout(svn_url, checkout_dir, rev_number=None):
    """
    Check out svn_url into checkout_dir, at rev_number when one is given.
    Returns whatever run_svn() returns.
    """
    if rev_number is not None:
        revision_args = ['-r', rev_number]
    else:
        revision_args = []
    return run_svn(svn_checkout_args + revision_args + [svn_url, checkout_dir])
182
def run_svn_log(svn_url_or_wc, rev_start, rev_end, limit, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Run "svn log --xml" for at most 'limit' entries and return the parsed
    entries (see parse_svn_log_xml()).

    The "-r rev_start:rev_end" range is only passed when neither bound is
    'HEAD'; in that case a target without an "@" also gets a peg revision
    equal to the larger of the two bounds.
    """
    options = []
    if stop_on_copy:
        options.append('--stop-on-copy')
    if get_changed_paths:
        options.append('-v')
    if get_revprops:
        options.append('--with-all-revprops')
    target = str(svn_url_or_wc)
    has_explicit_range = rev_start != 'HEAD' and rev_end != 'HEAD'
    if has_explicit_range:
        options.extend(['-r', '%s:%s' % (rev_start, rev_end)])
        if "@" not in svn_url_or_wc:
            target = "%s@%s" % (svn_url_or_wc, str(max(rev_start, rev_end)))
    options.extend(['--limit', str(limit), target])
    log_xml = run_svn(svn_log_args + options)
    return parse_svn_log_xml(log_xml)
202
def get_svn_status(svn_wc, quiet=False, no_recursive=False):
    """
    Run "svn status" on a working copy and return the parsed entries (see
    parse_svn_status_xml()), with paths relative to the working copy and
    externals ignored.
    """
    # The absolute path is what gets stripped from each entry path later on.
    svn_wc = os.path.abspath(svn_wc)
    extra_args = [svn_wc, '-q' if quiet else '-v']
    if no_recursive:
        extra_args.append('-N')
    status_xml = run_svn(svn_status_args + extra_args)
    return parse_svn_status_xml(status_xml, svn_wc, ignore_externals=True)
218
def get_svn_versioned_files(svn_wc):
    """
    Return the paths of all status entries of type 'normal' in the working
    copy, skipping entries with an empty path.
    """
    return [
        entry['path']
        for entry in get_svn_status(svn_wc)
        if entry['path'] and entry['type'] == 'normal'
    ]
228
def get_one_svn_log_entry(svn_url, rev_start, rev_end, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Return the first log entry of the requested revision range.

    Raises EmptySVNLog when the range yields no entry at all.
    """
    found = run_svn_log(svn_url, rev_start, rev_end, 1, stop_on_copy, get_changed_paths, get_revprops)
    if not found:
        raise EmptySVNLog("No SVN log for %s between revisions %s and %s" %
                          (svn_url, rev_start, rev_end))
    return found[0]
238
def get_first_svn_log_entry(svn_url, rev_start, rev_end, get_changed_paths=True):
    """
    Get the first log entry after (or at) rev_start in an SVN branch.

    The log is requested for the range rev_start:rev_end with stop-on-copy
    enabled, so history is not followed across the copy that created the
    branch.

    get_changed_paths controls whether the returned entry includes its
    'changed_paths' list; it is forwarded to get_one_svn_log_entry().

    NOTE: to know whether the branch creation corresponds to an SVN import or
    a copy from another branch, inspect elements of the 'changed_paths' entry
    in the returned dictionary.

    Raises EmptySVNLog (from get_one_svn_log_entry) when the range is empty.
    """
    # Forward the caller's choice instead of always requesting changed paths.
    return get_one_svn_log_entry(svn_url, rev_start, rev_end, stop_on_copy=True,
                                 get_changed_paths=get_changed_paths)
250
def get_last_svn_log_entry(svn_url, rev_start, rev_end, get_changed_paths=True):
    """
    Get the last log entry before (or at) rev_end in an SVN branch.

    The log is requested in reverse, i.e. for the range rev_end:rev_start,
    with stop-on-copy enabled, and its first entry is returned.

    get_changed_paths controls whether the returned entry includes its
    'changed_paths' list; it is forwarded to get_one_svn_log_entry().

    Raises EmptySVNLog (from get_one_svn_log_entry) when the range is empty.
    """
    # Bounds are deliberately swapped so the newest revision comes first.
    # Forward the caller's choice instead of always requesting changed paths.
    return get_one_svn_log_entry(svn_url, rev_end, rev_start, stop_on_copy=True,
                                 get_changed_paths=get_changed_paths)
258
259
# Request duration (in seconds) below which the chunk length is doubled;
# above twice this value the chunk length is halved.
log_duration_threshold = 10.0
# Initial and smallest number of log entries requested per chunk.
log_min_chunk_length = 10

def iter_svn_log_entries(svn_url, first_rev, last_rev, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Iterate over SVN log entries between first_rev and last_rev.

    This function features chunked log fetching so that it isn't too nasty
    to the SVN server if many entries are requested.

    last_rev may be the string "HEAD", meaning there is no upper bound; the
    iteration then ends when a chunk comes back empty. Yields the entry
    dicts produced by run_svn_log().
    """
    # Handle "HEAD" explicitly rather than comparing an int with a string.
    unbounded = (last_rev == "HEAD")
    cur_rev = first_rev
    chunk_length = log_min_chunk_length
    first_run = True
    while unbounded or cur_rev <= last_rev:
        start_t = time.time()
        # NOTE(review): run_svn_log() only passes "-r" when neither bound is
        # 'HEAD', so this call does not appear to restrict the log to start
        # at cur_rev -- confirm this is the intended request.
        entries = run_svn_log(svn_url, cur_rev, "HEAD", chunk_length,
                              stop_on_copy, get_changed_paths, get_revprops)
        duration = time.time() - start_t
        if not first_run and entries:
            # skip first revision on subsequent runs, as it is overlapped
            entries.pop(0)
        first_run = False
        if not entries:
            break
        for e in entries:
            if not unbounded and e['revision'] > last_rev:
                break
            yield e
        # 'e' is the last entry looked at in the loop above.
        if not unbounded and e['revision'] >= last_rev:
            break
        cur_rev = e['revision']
        # Adapt chunk length based on measured request duration
        if duration < log_duration_threshold:
            chunk_length = int(chunk_length * 2.0)
        elif duration > log_duration_threshold * 2:
            chunk_length = max(log_min_chunk_length, int(chunk_length / 2.0))
297
298
# Cached result of get_svn_client_version(); None until the first call.
_svn_client_version = None

def get_svn_client_version():
    """Return the SVN client version as a tuple of ints.

    The output of "svn --version -q" is split on '.'; components that are
    not purely digits are silently dropped. The result is computed once and
    cached at module level.
    """
    global _svn_client_version
    if _svn_client_version is not None:
        return _svn_client_version
    version_text = run_svn(['--version', '-q']).strip()
    numeric_parts = [int(part) for part in version_text.split('.')
                     if part.isdigit()]
    _svn_client_version = tuple(numeric_parts)
    return _svn_client_version