]> Tony Duckles's Git Repositories (git.nynim.org) - svn2svn.git/blob - svn2svn/svnclient.py
Migrate shared code to commit_from_svn_log_entry()
[svn2svn.git] / svn2svn / svnclient.py
1 """ SVN client functions """
2
3 from shell import run_svn
4 from errors import EmptySVNLog
5
6 import os
7 import time
8 import calendar
9 import operator
10
11 try:
12 from xml.etree import cElementTree as ET
13 except ImportError:
14 try:
15 from xml.etree import ElementTree as ET
16 except ImportError:
17 try:
18 import cElementTree as ET
19 except ImportError:
20 from elementtree import ElementTree as ET
21
# Leading arguments for each svn sub-command issued by this module; the
# per-call options and targets are concatenated after them.
svn_log_args = ["log", "--xml"]
svn_info_args = ["info", "--xml"]
svn_checkout_args = ["checkout", "-q"]
svn_status_args = ["status", "--xml", "--ignore-externals"]
26
# Identity translation table; retained for any code that still relies on the
# two-argument form of str.translate().
_identity_table = "".join(map(chr, range(256)))
# Control characters below 0x20, except tab (0x9), LF (0xA) and CR (0xD).
_forbidden_xml_chars = "".join(
    sorted(set(map(chr, range(32))) - set('\x09\x0A\x0D'))
)
# The same characters as a byte string (for byte-string input) ...
_forbidden_xml_bytes = _forbidden_xml_chars.encode('ascii')
# ... and as an {ordinal: None} mapping (for text/unicode input).
_forbidden_xml_ordinals = dict.fromkeys(map(ord, _forbidden_xml_chars))


def strip_forbidden_xml_chars(xml_string):
    """
    Given an XML string, strips forbidden characters as per the XML spec.
    (these are all control characters except 0x9, 0xA and 0xD).

    Accepts either a byte string or a text (unicode) string and returns a
    string of the same type with the forbidden characters removed.
    """
    if isinstance(xml_string, bytes):
        # Byte strings support deletion through translate(None, deletechars).
        return xml_string.translate(None, _forbidden_xml_bytes)
    # Text strings only accept a single mapping argument; mapping an ordinal
    # to None deletes that character.
    return xml_string.translate(_forbidden_xml_ordinals)
39
40
def svn_date_to_timestamp(svn_date):
    """
    Convert an SVN date string, as read from the XML output, into a timestamp.

    Everything from the first '.' onwards (fractional seconds and the
    timezone suffix) is discarded; the remainder is interpreted as UTC.
    """
    # XXX there are various ISO datetime parsing routines out there,
    # cf. http://seehuhn.de/comp/pdate
    whole_seconds, _, _ = svn_date.partition('.')
    parsed = time.strptime(whole_seconds, "%Y-%m-%dT%H:%M:%S")
    return calendar.timegm(parsed)
52
def parse_svn_info_xml(xml_string):
    """
    Turn the XML produced by "svn info --xml" into a dict.

    Keys: 'url', 'kind', 'revision', 'repos_url', 'repos_uuid',
    'last_changed_rev', 'last_changed_date' (a timestamp) and, only when the
    commit has an author element, 'last_changed_author'.
    """
    root = ET.fromstring(strip_forbidden_xml_chars(xml_string))
    entry_node = root.find('.//entry')
    info = {
        'url': entry_node.find('url').text,
        'kind': entry_node.get('kind'),
        'revision': int(entry_node.get('revision')),
        'repos_url': root.find('.//repository/root').text,
        'repos_uuid': root.find('.//repository/uuid').text,
        'last_changed_rev': int(root.find('.//commit').get('revision')),
    }
    # The author element is optional, so its key is added only when present.
    author_node = root.find('.//commit/author')
    if author_node is not None:
        info['last_changed_author'] = author_node.text
    commit_date = root.find('.//commit/date').text
    info['last_changed_date'] = svn_date_to_timestamp(commit_date)
    return info
73
def parse_svn_log_xml(xml_string):
    """
    Parse the XML output from an "svn log" command and extract useful information
    as a list of dicts (one per log changeset).

    Each dict has the keys 'revision' (int), 'author' ("No author" when
    missing or empty), 'date' (timestamp, or None when there is no date
    element), 'message' (line endings normalized to '\\n', "" when missing or
    empty), 'changed_paths' (list of dicts sorted by 'path') and 'revprops'
    (list of {'name', 'value'} dicts).
    """
    log_entries = []
    xml_string = strip_forbidden_xml_chars(xml_string)
    tree = ET.fromstring(xml_string)
    for entry in tree.findall('logentry'):
        d = {}
        d['revision'] = int(entry.get('revision'))
        # Some revisions don't have authors, most notably the first revision
        # in a repository.
        # logentry nodes targeting directories protected by path-based
        # authentication have no child nodes at all. We return an entry
        # in that case. Anyway, as it has no path entries, no further
        # processing will be made.
        author = entry.find('author')
        date = entry.find('date')
        msg = entry.find('msg')
        if author is not None and author.text:
            d['author'] = author.text
        else:
            d['author'] = "No author"
        if date is not None:
            d['date'] = svn_date_to_timestamp(date.text)
        else:
            d['date'] = None
        # An empty <msg></msg> element has text None, so check the text
        # itself before normalizing line endings.
        if msg is not None and msg.text:
            d['message'] = msg.text.replace('\r\n', '\n').replace('\n\r', '\n').replace('\r', '\n')
        else:
            d['message'] = ""
        paths = []
        for path in entry.findall('.//paths/path'):
            copyfrom_rev = path.get('copyfrom-rev')
            if copyfrom_rev:
                copyfrom_rev = int(copyfrom_rev)
            paths.append({
                'path': path.text,
                'kind': path.get('kind'),
                'action': path.get('action'),
                'copyfrom_path': path.get('copyfrom-path'),
                'copyfrom_revision': copyfrom_rev,
            })
        # Sort paths (i.e. into hierarchical order), so that process_svn_log_entry()
        # can process actions in depth-first order.
        d['changed_paths'] = sorted(paths, key=operator.itemgetter('path'))
        d['revprops'] = [
            {'name': prop.get('name'), 'value': prop.text}
            for prop in entry.findall('.//revprops/property')
        ]
        log_entries.append(d)
    return log_entries
121
def parse_svn_status_xml(xml_string, base_dir=None, ignore_externals=False):
    """
    Turn the XML produced by "svn status --xml" into a list of dicts, one per
    status entry.

    When base_dir is given, each path must start with it and is returned
    relative to it. Entries whose wc-status item is 'external' are dropped
    when ignore_externals is true. Each dict carries 'path', 'type'
    ('external', 'normal' or 'unversioned'), 'status', 'revision', 'props'
    and 'copied'.
    """
    if base_dir:
        base_dir = os.path.normcase(base_dir)
    root = ET.fromstring(strip_forbidden_xml_chars(xml_string))
    results = []
    for node in root.findall('.//entry'):
        rel_path = node.get('path')
        if base_dir is not None:
            assert os.path.normcase(rel_path).startswith(base_dir)
            # Drop the base directory prefix and any leading separators.
            rel_path = rel_path[len(base_dir):].lstrip('/\\')
        wc_status = node.find('wc-status')
        item = wc_status.get('item')
        is_external = (item == 'external')
        if is_external and ignore_externals:
            continue
        revision = wc_status.get('revision')
        if is_external:
            entry_type = 'external'
        elif revision is None:
            # No revision attribute: the item is not under version control.
            entry_type = 'unversioned'
        else:
            entry_type = 'normal'
        results.append({
            'path': rel_path,
            'type': entry_type,
            'status': item,
            'revision': revision,
            'props': wc_status.get('props'),
            'copied': wc_status.get('copied'),
        })
    return results
157
def get_svn_info(svn_url_or_wc, rev_number=None):
    """
    Run "svn info" on a URL or working copy and return the parsed result.

    When rev_number is given it is used both as the operative revision (-r)
    and as the peg revision appended to the target.
    Returns a dict as created by parse_svn_info_xml().
    """
    if rev_number is None:
        target_args = [svn_url_or_wc]
    else:
        pegged_target = svn_url_or_wc + "@" + str(rev_number)
        target_args = ["-r", rev_number, pegged_target]
    info_xml = run_svn(svn_info_args + target_args, fail_if_stderr=True)
    return parse_svn_info_xml(info_xml)
170
def svn_checkout(svn_url, checkout_dir, rev_number=None):
    """
    Check out svn_url into checkout_dir, at rev_number when one is given.
    Returns whatever run_svn() returns.
    """
    revision_args = [] if rev_number is None else ['-r', rev_number]
    return run_svn(svn_checkout_args + revision_args + [svn_url, checkout_dir])
180
def run_svn_log(svn_url_or_wc, rev_start, rev_end, limit, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Run "svn log" for the given revision range, asking for at most 'limit'
    entries, and return them as parsed by parse_svn_log_xml().

    If the target contains no "@", a peg revision (the greater of rev_start
    and rev_end) is appended to it.
    """
    optional_flags = (
        (stop_on_copy, '--stop-on-copy'),
        (get_changed_paths, '-v'),
        (get_revprops, '--with-all-revprops'),
    )
    args = [flag for enabled, flag in optional_flags if enabled]
    url = str(svn_url_or_wc)
    args.extend(['-r', '%s:%s' % (rev_start, rev_end)])
    if "@" not in svn_url_or_wc:
        peg_rev = str(max(rev_start, rev_end))
        url = "%s@%s" % (svn_url_or_wc, peg_rev)
    args.extend(['--limit', str(limit), url])
    log_xml = run_svn(svn_log_args + args)
    return parse_svn_log_xml(log_xml)
199
def get_svn_status(svn_wc, quiet=False, no_recursive=False):
    """
    Run "svn status" on the given working copy and return the entries as
    parsed by parse_svn_status_xml(), with paths relative to the working copy
    and externals omitted.
    """
    # Use the absolute path so the prefix can be stripped from reported paths.
    svn_wc = os.path.abspath(svn_wc)
    options = ['-q'] if quiet else ['-v']
    if no_recursive:
        options.append('-N')
    status_xml = run_svn(svn_status_args + options + [svn_wc])
    return parse_svn_status_xml(status_xml, svn_wc, ignore_externals=True)
215
def get_svn_versioned_files(svn_wc):
    """
    Return the non-empty relative paths of all status entries of type
    'normal' in the SVN working copy.
    """
    return [
        entry['path']
        for entry in get_svn_status(svn_wc)
        if entry['path'] and entry['type'] == 'normal'
    ]
225
def get_one_svn_log_entry(svn_url, rev_start, rev_end, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Return the first SVN log entry found in the requested revision range.

    Raises EmptySVNLog when the range yields no entry at all.
    """
    entries = run_svn_log(svn_url, rev_start, rev_end, 1, stop_on_copy, get_changed_paths, get_revprops)
    if not entries:
        raise EmptySVNLog("No SVN log for %s between revisions %s and %s" %
                          (svn_url, rev_start, rev_end))
    return entries[0]
235
def get_first_svn_log_entry(svn_url, rev_start, rev_end, get_changed_paths=True):
    """
    Get the first log entry after (or at) rev_start, up to rev_end, in an SVN
    branch. The search stops at copies (--stop-on-copy), so starting from the
    beginning of the branch history gives the log entry corresponding to the
    branch creation.

    get_changed_paths controls whether the changed paths are requested for
    the entry (the default).

    NOTE: to know whether the branch creation corresponds to an SVN import or
    a copy from another branch, inspect elements of the 'changed_paths' entry
    in the returned dictionary.

    Raises EmptySVNLog (from get_one_svn_log_entry) if the range has no entry.
    """
    # Pass the caller's choice through; it used to be hard-coded to True.
    return get_one_svn_log_entry(svn_url, rev_start, rev_end, stop_on_copy=True,
                                 get_changed_paths=get_changed_paths)
247
def get_last_svn_log_entry(svn_url, rev_start, rev_end, get_changed_paths=True):
    """
    Get the last log entry before (or at) rev_end, down to rev_start, in an
    SVN branch. The range is queried in reverse (rev_end:rev_start) and the
    search stops at copies (--stop-on-copy).

    get_changed_paths controls whether the changed paths are requested for
    the entry (the default).

    Raises EmptySVNLog (from get_one_svn_log_entry) if the range has no entry.
    """
    # Pass the caller's choice through; it used to be hard-coded to True.
    return get_one_svn_log_entry(svn_url, rev_end, rev_start, stop_on_copy=True,
                                 get_changed_paths=get_changed_paths)
255
256
# Tunables for the chunked log fetching in iter_svn_log_entries():
# request duration (in seconds) that the chunk size adaptation is measured
# against, and the smallest number of entries requested per chunk.
log_duration_threshold = float(10)
log_min_chunk_length = 10
259
def iter_svn_log_entries(svn_url, first_rev, last_rev, stop_on_copy=False, get_changed_paths=True, get_revprops=False):
    """
    Iterate over SVN log entries between first_rev and last_rev.

    This function features chunked log fetching so that it isn't too nasty
    to the SVN server if many entries are requested.

    last_rev may be a revision number or the string "HEAD", in which case
    iteration continues until the log is exhausted. Yields the dicts produced
    by run_svn_log(). The chunk size starts at log_min_chunk_length, doubles
    after a request faster than log_duration_threshold seconds and is halved
    (never below the minimum) after one slower than twice the threshold.
    """
    # Handle "HEAD" explicitly rather than comparing a string with ints.
    to_head = (last_rev == "HEAD")
    cur_rev = first_rev
    chunk_length = log_min_chunk_length
    first_run = True
    while to_head or cur_rev <= last_rev:
        start_t = time.time()
        entries = run_svn_log(svn_url, cur_rev, "HEAD", chunk_length,
                              stop_on_copy, get_changed_paths, get_revprops)
        duration = time.time() - start_t
        if not first_run:
            # skip first revision on subsequent runs, as it is overlapped
            # (slicing is safe even if the chunk came back empty)
            entries = entries[1:]
        first_run = False
        if not entries:
            break
        for e in entries:
            if not to_head and e['revision'] > last_rev:
                break
            yield e
        if not to_head and e['revision'] >= last_rev:
            break
        # The next chunk starts at the last revision seen in this one.
        cur_rev = e['revision']
        # Adapt chunk length based on measured request duration
        if duration < log_duration_threshold:
            chunk_length = int(chunk_length * 2.0)
        elif duration > log_duration_threshold * 2:
            chunk_length = max(log_min_chunk_length, int(chunk_length / 2.0))
294
295
# Cached result of get_svn_client_version(); None until the first call.
_svn_client_version = None


def get_svn_client_version():
    """Returns the SVN client version as a tuple of ints.

    The output of "svn --version -q" is split on '.'; components that are not
    made up entirely of digits are silently dropped. The result is computed
    once and cached at module level.
    """
    global _svn_client_version
    if _svn_client_version is not None:
        return _svn_client_version
    version_output = run_svn(['--version', '-q']).strip()
    numeric_parts = [int(part) for part in version_output.split('.')
                     if part.isdigit()]
    _svn_client_version = tuple(numeric_parts)
    return _svn_client_version