]> Tony Duckles's Git Repositories (git.nynim.org) - svn2svn.git/blob - svn2svn/svnclient.py
Copy shared hgsvn code: https://bitbucket.org/andialbrecht/hgsvn @ 528dea531a2e
[svn2svn.git] / svn2svn / svnclient.py
1
2 from hgsvn import ui
3 from hgsvn.common import (run_svn, once_or_more)
4 from hgsvn.errors import EmptySVNLog
5
6 import os
7 import time
8 import calendar
9 import operator
10
11 try:
12 from xml.etree import cElementTree as ET
13 except ImportError:
14 try:
15 from xml.etree import ElementTree as ET
16 except ImportError:
17 try:
18 import cElementTree as ET
19 except ImportError:
20 from elementtree import ElementTree as ET
21
22
# Base argument lists for the svn sub-commands used in this module.
# They are kept as lists because callers extend them with `+`.
svn_log_args = ["log", "--xml", "-v"]
svn_info_args = ["info", "--xml"]
svn_checkout_args = ["checkout", "-q"]
svn_status_args = ["status", "--xml", "--ignore-externals"]
27
28 _identity_table = "".join(map(chr, range(256)))
29 _forbidden_xml_chars = "".join(
30 set(map(chr, range(32))) - set('\x09\x0A\x0D')
31 )
32
33
34 def strip_forbidden_xml_chars(xml_string):
35 """
36 Given an XML string, strips forbidden characters as per the XML spec.
37 (these are all control characters except 0x9, 0xA and 0xD).
38 """
39 return xml_string.translate(_identity_table, _forbidden_xml_chars)
40
41
def svn_date_to_timestamp(svn_date):
    """
    Convert a date string taken from SVN's XML output into a timestamp.

    The text before the first '.' is parsed as "%Y-%m-%dT%H:%M:%S" and
    interpreted as UTC via calendar.timegm(); whatever follows the dot
    (fractional seconds and timezone suffix) is discarded.
    """
    # NOTE: the timezone suffix is dropped without being inspected, so the
    # input is assumed to already be expressed in UTC.
    whole_seconds = svn_date.partition('.')[0]
    parsed = time.strptime(whole_seconds, "%Y-%m-%dT%H:%M:%S")
    return calendar.timegm(parsed)
53
def parse_svn_info_xml(xml_string):
    """
    Turn the XML produced by "svn info --xml" into a dict.

    Keys: 'url', 'revision', 'repos_url', 'last_changed_rev',
    'last_changed_date' (a timestamp) and, only when the commit has an
    <author> element, 'last_changed_author'.
    """
    root = ET.fromstring(strip_forbidden_xml_chars(xml_string))
    entry_node = root.find('.//entry')
    info = {
        'url': entry_node.find('url').text,
        'revision': int(entry_node.get('revision')),
        'repos_url': root.find('.//repository/root').text,
        'last_changed_rev': int(root.find('.//commit').get('revision')),
    }
    # The author element may be missing; in that case the key is left out.
    author_node = root.find('.//commit/author')
    if author_node is not None:
        info['last_changed_author'] = author_node.text
    date_text = root.find('.//commit/date').text
    info['last_changed_date'] = svn_date_to_timestamp(date_text)
    return info
72
def parse_svn_log_xml(xml_string):
    """
    Turn the XML produced by "svn log --xml -v" into a list of dicts, one
    per <logentry>.

    Each dict has the keys 'revision', 'author', 'date', 'message' and
    'changed_paths'; the latter is a list of dicts with the keys 'path',
    'action', 'copyfrom_path' and 'copyfrom_revision'.
    """
    root = ET.fromstring(strip_forbidden_xml_chars(xml_string))
    changesets = []
    for logentry in root.findall('logentry'):
        changeset = {'revision': int(logentry.get('revision'))}

        # Some revisions have no author (notably the first revision of a
        # repository), and logentry nodes for directories protected by
        # path-based authentication have no child nodes at all.  An entry is
        # still returned in those cases; it simply has no changed paths.
        author_node = logentry.find('author')
        date_node = logentry.find('date')
        msg_node = logentry.find('msg')

        # Issue 64 - fall back to a placeholder so entries without an
        # author (or with an empty one) do not crash later processing.
        if author_node is not None and author_node.text:
            changeset['author'] = author_node.text
        else:
            changeset['author'] = "No author"

        if date_node is None:
            changeset['date'] = None
        else:
            changeset['date'] = svn_date_to_timestamp(date_node.text)

        if msg_node is not None and msg_node.text:
            changeset['message'] = msg_node.text
        else:
            changeset['message'] = ""

        changed = []
        changeset['changed_paths'] = changed
        for path_node in logentry.findall('.//path'):
            source_rev = path_node.get('copyfrom-rev')
            # Only a non-empty attribute is converted; otherwise the raw
            # value (None or '') is passed through unchanged.
            if source_rev:
                source_rev = int(source_rev)
            changed.append({
                'path': path_node.text,
                'action': path_node.get('action'),
                'copyfrom_path': path_node.get('copyfrom-path'),
                'copyfrom_revision': source_rev,
            })
        changesets.append(changeset)
    return changesets
113
def parse_svn_status_xml(xml_string, base_dir=None, ignore_externals=False):
    """
    Turn the XML produced by "svn status --xml" into a list of dicts, one
    per <entry>, each with the keys 'path', 'type' and 'status'.

    When base_dir is given, every entry path must start with it (compared
    after os.path.normcase) and is returned relative to it.  'type' is
    'external', 'normal' or 'unversioned'; entries of type 'external' are
    skipped when ignore_externals is true.
    """
    if base_dir:
        base_dir = os.path.normcase(base_dir)
    root = ET.fromstring(strip_forbidden_xml_chars(xml_string))
    statuses = []
    for entry_node in root.findall('.//entry'):
        entry_path = entry_node.get('path')
        if base_dir is not None:
            assert os.path.normcase(entry_path).startswith(base_dir)
            # Drop the base directory prefix and any leading separators.
            entry_path = entry_path[len(base_dir):].lstrip('/\\')
        wc_status = entry_node.find('wc-status')
        item = wc_status.get('item')
        if item == 'external':
            if ignore_externals:
                continue
            entry_type = 'external'
        elif wc_status.get('revision') is not None:
            # Having a revision attribute marks the item as versioned.
            entry_type = 'normal'
        else:
            entry_type = 'unversioned'
        statuses.append({
            'path': entry_path,
            'type': entry_type,
            'status': item,
        })
    return statuses
143
def get_svn_info(svn_url_or_wc, rev_number=None):
    """
    Run "svn info --xml" on a URL or working copy and parse the result.

    If rev_number is not None it is passed to svn with '-r'.
    Returns the dict built by parse_svn_info_xml().
    """
    command = list(svn_info_args)
    if rev_number is not None:
        command += ['-r', rev_number]
    command.append(svn_url_or_wc)
    info_xml = run_svn(command, fail_if_stderr=True)
    return parse_svn_info_xml(info_xml)
157
def svn_checkout(svn_url, checkout_dir, rev_number=None):
    """
    Run a quiet "svn checkout" of svn_url into checkout_dir.

    If rev_number is not None it is passed to svn with '-r'.
    Returns whatever run_svn() returns.
    """
    if rev_number is None:
        revision_args = []
    else:
        revision_args = ['-r', rev_number]
    return run_svn(svn_checkout_args + revision_args + [svn_url, checkout_dir])
167
def run_svn_log(svn_url, rev_start, rev_end, limit, stop_on_copy=False):
    """
    Run "svn log --xml -v" over rev_start:rev_end, capped at 'limit'
    entries, and return the list built by parse_svn_log_xml().

    When stop_on_copy is true, '--stop-on-copy' is added to the command.
    """
    command = list(svn_log_args)
    if stop_on_copy:
        command.append('--stop-on-copy')
    revision_range = '%s:%s' % (rev_start, rev_end)
    command.extend(['-r', revision_range, '--limit', limit, svn_url])
    return parse_svn_log_xml(run_svn(command))
179
def get_svn_status(svn_wc, quiet=False):
    """
    Run "svn status --xml" on a working copy and return the parsed entries.

    '-q' is passed to svn when quiet is true, '-v' otherwise.  Paths in the
    result are relative to the working copy and externals are skipped.
    """
    # Use the absolute path so that parse_svn_status_xml() can strip it as
    # a prefix from the paths svn reports.
    svn_wc = os.path.abspath(svn_wc)
    verbosity_flag = '-q' if quiet else '-v'
    status_xml = run_svn(svn_status_args + [svn_wc, verbosity_flag])
    return parse_svn_status_xml(status_xml, svn_wc, ignore_externals=True)
193
def get_svn_versioned_files(svn_wc):
    """
    Return the paths of all status entries in the working copy whose type
    is 'normal' (i.e. versioned), skipping entries with an empty path.
    """
    return [
        entry['path']
        for entry in get_svn_status(svn_wc)
        if entry['path'] and entry['type'] == 'normal'
    ]
203
204
def get_one_svn_log_entry(svn_url, rev_start, rev_end, stop_on_copy=False):
    """
    Return the first log entry svn reports for the range rev_start:rev_end.

    Raises EmptySVNLog when the range yields no entries.
    """
    # A limit of 1 asks svn for a single entry only.
    found = run_svn_log(svn_url, rev_start, rev_end, 1, stop_on_copy)
    if not found:
        raise EmptySVNLog("No SVN log for %s between revisions %s and %s" %
                          (svn_url, rev_start, rev_end))
    return found[0]
214
215
def get_first_svn_log_entry(svn_url, rev_start, rev_end):
    """
    Return the first log entry at or after rev_start (and no later than
    rev_end) in an SVN branch, without following copies.

    NOTE: to tell whether a branch creation was an SVN import or a copy
    from another branch, inspect the elements of the 'changed_paths' entry
    of the returned dictionary.

    Raises EmptySVNLog (via get_one_svn_log_entry) if no entry is found.
    """
    first_entry = get_one_svn_log_entry(svn_url, rev_start, rev_end,
                                        stop_on_copy=True)
    return first_entry
227
def get_last_svn_log_entry(svn_url, rev_start, rev_end):
    """
    Return the last log entry at or before rev_end (and no earlier than
    rev_start) in an SVN branch, without following copies.

    Raises EmptySVNLog (via get_one_svn_log_entry) if no entry is found.
    """
    # The range is deliberately passed reversed (rev_end:rev_start) so that
    # the single entry fetched is the newest one rather than the oldest.
    last_entry = get_one_svn_log_entry(svn_url, rev_end, rev_start,
                                       stop_on_copy=True)
    return last_entry
235
236
# A log request faster than this many seconds doubles the chunk size; one
# slower than twice this value halves it.
log_duration_threshold = 10.0
# Initial and minimum number of log entries requested per chunk.
log_min_chunk_length = 10


def iter_svn_log_entries(svn_url, first_rev, last_rev, retry):
    """
    Iterate over SVN log entries between first_rev and last_rev.

    This function features chunked log fetching so that it isn't too nasty
    to the SVN server if many entries are requested.

    first_rev is a revision number; last_rev is either a revision number or
    the string "HEAD", meaning there is no upper bound.  'retry' is handed
    to once_or_more() unchanged (presumably a retry count - confirm against
    hgsvn.common).  Yields the entry dicts returned by run_svn_log().
    """
    # "HEAD" is handled explicitly rather than by comparing it with
    # integers, which only works through Python 2's mixed-type ordering.
    open_ended = (last_rev == "HEAD")
    cur_rev = first_rev
    chunk_length = log_min_chunk_length
    first_run = True
    while open_ended or cur_rev <= last_rev:
        start_t = time.time()
        ui.status("Fetching %s SVN log entries starting from revision %d...",
                  chunk_length, cur_rev, level=ui.VERBOSE)
        entries = once_or_more("Fetching SVN log", retry, run_svn_log, svn_url,
                               cur_rev, "HEAD", chunk_length)
        duration = time.time() - start_t
        if not first_run and entries:
            # skip first revision on subsequent runs, as it is overlapped
            # (guarded so an unexpectedly empty result ends the iteration
            # cleanly instead of raising IndexError)
            entries.pop(0)
        first_run = False
        if not entries:
            break
        for e in entries:
            if not open_ended and e['revision'] > last_rev:
                break
            yield e
        # 'e' is the last entry examined above; stop once the requested
        # upper bound has been reached or passed.
        if not open_ended and e['revision'] >= last_rev:
            break
        # The next chunk starts at the last revision seen, hence the overlap
        # that is dropped at the top of the next iteration.
        cur_rev = e['revision']
        # Adapt chunk length based on measured request duration
        if duration < log_duration_threshold:
            chunk_length = int(chunk_length * 2.0)
        elif duration > log_duration_threshold * 2:
            chunk_length = max(log_min_chunk_length, int(chunk_length / 2.0))
276
277
# Cached result of get_svn_client_version(); None until the first call.
_svn_client_version = None


def get_svn_client_version():
    """Returns the SVN client version as a tuple of ints.

    The output of "svn --version -q" is split on '.', and only components
    consisting entirely of digits are kept; any other component is silently
    dropped.  The result is computed once and cached at module level.
    """
    global _svn_client_version
    if _svn_client_version is not None:
        return _svn_client_version
    version_text = run_svn(['--version', '-q']).strip()
    numeric_parts = [part for part in version_text.split('.')
                     if part.isdigit()]
    _svn_client_version = tuple(int(part) for part in numeric_parts)
    return _svn_client_version