root/tags/release2_2/_test_ftputil.py

Revision 640, 20.6 kB (checked in by schwa, 2 years ago)
Made parsers public, i. e. removed the underscore in front of
`_UnixParser` and `_MSParser`.
  • Property svn:mime-type set to text/x-python
  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
Line 
1 # Copyright (C) 2002-2006, Stefan Schwarzer
2 # All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are
6 # met:
7 #
8 # - Redistributions of source code must retain the above copyright
9 #   notice, this list of conditions and the following disclaimer.
10 #
11 # - Redistributions in binary form must reproduce the above copyright
12 #   notice, this list of conditions and the following disclaimer in the
13 #   documentation and/or other materials provided with the distribution.
14 #
15 # - Neither the name of the above author nor the names of the
16 #   contributors to the software may be used to endorse or promote
17 #   products derived from this software without specific prior written
18 #   permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR
24 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
27 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
28 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
29 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32 # $Id$
33
34 import ftplib
35 import operator
36 import os
37 import random
38 import stat
39 import time
40 import unittest
41
42 import _mock_ftplib
43 import _test_base
44 import ftp_error
45 import ftp_file
46 import ftp_stat
47 import ftputil
48
49 #
50 # helper functions to generate random data
51 #
def random_data(pool, size=10000):
    """
    Build a string of `size` randomly chosen characters.

    For every character, one integer is picked at random from `pool`
    and turned into a character with `chr`.
    """
    characters = [chr(random.choice(pool)) for _ in range(size)]
    return ''.join(characters)
63
def ascii_data():
    """
    Return a random ASCII character string.

    The characters are drawn from the code points 32 to 127 and the
    newline character; the length is the default size of `random_data`.
    """
    # build a real list; `range` is only guaranteed to return an object
    #  with an `append` method on Python 2
    pool = list(range(32, 128))
    pool.append(ord('\n'))
    return random_data(pool)
69
def binary_data():
    """
    Return a random character string whose characters are drawn from
    all code points from 0 to 255.
    """
    return random_data(range(256))
74
75
76 #
77 # several customized `MockSession` classes
78 #
class FailOnLoginSession(_mock_ftplib.MockSession):
    """Mock session which can't be created: the constructor always fails."""
    def __init__(self, host='', user='', password=''):
        # the arguments are deliberately ignored
        raise ftplib.error_perm()
82
class ReadMockSession(_mock_ftplib.MockSession):
    """Mock session with three lines of text, the last one unterminated."""
    mock_file_content = ('line 1\r\n'
                         'another line\r\n'
                         'yet another line')
85
class AsciiReadMockSession(_mock_ftplib.MockSession):
    """Mock session whose content is the numbers 0 to 19, one per line."""
    # the lines are joined with '\r\n'; there's no line ending after
    #  the last number
    mock_file_content = '\r\n'.join(map(str, range(20)))
88
class BinaryDownloadMockSession(_mock_ftplib.MockSession):
    """Mock session with random binary content."""
    # `binary_data` is called only once, when the class is defined, so
    #  all sessions of this class share the same content
    mock_file_content = binary_data()
91
class TimeShiftMockSession(_mock_ftplib.MockSession):
    """Mock session for the time shift tests; `delete` does nothing."""
    def delete(self, file_name):
        """Accept the request to delete `file_name` without any action."""
95
class InaccessibleDirSession(_mock_ftplib.MockSession):
    """Mock session which reports a login directory it refuses to enter."""
    _login_dir = '/inaccessible'

    def pwd(self):
        """Return the login directory as current directory."""
        return self._login_dir

    def cwd(self, dir):
        """
        Raise `ftplib.error_perm` if `dir` is the login directory (with
        or without a trailing slash). Delegate to `MockSession.cwd` for
        any other directory.
        """
        forbidden_dirs = (self._login_dir, self._login_dir + '/')
        if dir in forbidden_dirs:
            raise ftplib.error_perm
        _mock_ftplib.MockSession.cwd(self, dir)
107
108 #
109 # customized `FTPHost` class for conditional upload/download tests
110 #  and time shift tests
111 #
class FailingUploadAndDownloadFTPHost(ftputil.FTPHost):
    """
    `FTPHost` whose `upload` and `download` methods always fail.

    It's used to check that the conditional transfer methods don't
    start a transfer when none is expected.
    """
    # `AssertionError` is raised explicitly because `assert` statements
    #  are removed if Python runs with the `-O` option; the methods
    #  would then silently do nothing

    def upload(self, source, target, mode=''):
        """Raise an `AssertionError`; the arguments are ignored."""
        raise AssertionError("`FTPHost.upload` should not have been called")

    def download(self, source, target, mode=''):
        """Raise an `AssertionError`; the arguments are ignored."""
        raise AssertionError(
              "`FTPHost.download` should not have been called")
118
class TimeShiftFTPHost(ftputil.FTPHost):
    """`FTPHost` whose `path` attribute is replaced by a simple stub."""
    class _Path:
        """Stub with canned answers and an adjustable modification time."""
        def abspath(self, path):
            """Return a fixed path, regardless of `path`."""
            return "/home/sschwarzer/_ftputil_sync_"

        # NOTE(review): the original comment says this is needed for
        #  `isdir` in `FTPHost.remove` though the method is `isfile`;
        #  confirm against `FTPHost.remove`
        def isfile(self, path):
            """Report every path as a file."""
            return True

        def set_mtime(self, mtime):
            """Store the time which `getmtime` will return."""
            self._mtime = mtime

        def getmtime(self, file_name):
            """Return the time stored by `set_mtime`, for any file name."""
            return self._mtime

    def __init__(self, *args, **kwargs):
        """Initialize the host as usual, then install the `_Path` stub."""
        ftputil.FTPHost.__init__(self, *args, **kwargs)
        path_stub = self._Path()
        self.path = path_stub
134
135 #
136 # test cases
137 #
class TestOpenAndClose(unittest.TestCase):
    """Check the state of an `FTPHost` object after closing it."""
    def test_open_and_close(self):
        """A closed host is flagged as closed and has no children."""
        ftp_host = _test_base.ftp_host_factory()
        ftp_host.close()
        closed_flag = ftp_host.closed
        self.assertEqual(closed_flag, 1)
        remaining_children = ftp_host._children
        self.assertEqual(remaining_children, [])
146
147
class TestLogin(unittest.TestCase):
    """Check the error handling when a login fails."""
    def test_invalid_login(self):
        """Login to invalid host must fail."""
        # the session class raises `ftplib.error_perm` on creation; the
        #  factory is expected to fail with an `FTPOSError`
        expected_error = ftp_error.FTPOSError
        make_host = _test_base.ftp_host_factory
        self.assertRaises(expected_error, make_host, FailOnLoginSession)
153
154
class TestSetParser(unittest.TestCase):
    """Check that an explicitly set parser is used for directory listings."""
    def test_set_parser(self):
        """Test if the selected parser is used."""
        # this test isn't very practical but should help at least a bit ...
        unix_names = ['chemeng', 'download', 'image', 'index.html',
                      'os2', 'osup', 'publications', 'python', 'scios2']
        ms_names = ['WindowsXP', 'XPLaunch', 'empty',
                    'abcd.exe', 'O2KKeys.exe']
        host = _test_base.ftp_host_factory()
        # no parser was set, so this listing implicitly fixes the
        #  Unix format
        self.assertEqual(host.listdir("/home/sschwarzer"), unix_names)
        # from now on, the MS format must be used
        ms_parser = ftp_stat.MSParser()
        host.set_parser(ms_parser)
        self.assertEqual(host.listdir("/home/msformat/XPLaunch"), ms_names)
        # after setting a parser, switching must be turned off
        self.assertEqual(host._stat._allow_parser_switching, False)
169
170
class TestFileOperations(unittest.TestCase):
    """Test operations with file-like objects."""
    def test_inaccessible_dir(self):
        """Test whether opening a file at an invalid location fails."""
        host = _test_base.ftp_host_factory(
               session_factory=InaccessibleDirSession)
        self.assertRaises(ftp_error.FTPIOError, host.file,
                          '/inaccessible/new_file', 'w')

    def test_caching(self):
        """Test whether `_FTPFile` cache of `FTPHost` object works."""
        host = _test_base.ftp_host_factory()
        self.assertEqual(len(host._children), 0)
        path1 = 'path1'
        path2 = 'path2'
        # open one file and inspect cache
        file1 = host.file(path1, 'w')
        child1 = host._children[0]
        self.assertEqual(len(host._children), 1)
        self.failIf(child1._file.closed)
        # open another file
        file2 = host.file(path2, 'w')
        child2 = host._children[1]
        self.assertEqual(len(host._children), 2)
        self.failIf(child2._file.closed)
        # close first file; the child must stay in the cache
        file1.close()
        self.assertEqual(len(host._children), 2)
        self.failUnless(child1._file.closed)
        self.failIf(child2._file.closed)
        # re-open first child's file
        file1 = host.file(path1, 'w')
        child1_1 = file1._host
        # check if it's reused
        self.failUnless(child1 is child1_1)
        self.failIf(child1._file.closed)
        self.failIf(child2._file.closed)
        # close second file
        file2.close()
        self.failUnless(child2._file.closed)

    def test_write_to_directory(self):
        """Test whether attempting to write to a directory fails."""
        host = _test_base.ftp_host_factory()
        self.assertRaises(ftp_error.FTPIOError, host.file,
                          '/home/sschwarzer', 'w')

    def test_binary_write(self):
        """Write binary data with `write`."""
        host = _test_base.ftp_host_factory()
        data = '\000a\001b\r\n\002c\003\n\004\r\005'
        output = host.file('dummy', 'wb')
        output.write(data)
        output.close()
        child_data = _mock_ftplib.content_of('dummy')
        # binary data must arrive unchanged
        expected_data = data
        self.assertEqual(child_data, expected_data)

    def test_ascii_write(self):
        """Write ASCII text with `write`."""
        host = _test_base.ftp_host_factory()
        data = ' \nline 2\nline 3'
        output = host.file('dummy', 'w')
        output.write(data)
        output.close()
        child_data = _mock_ftplib.content_of('dummy')
        # line endings must have been converted to '\r\n'
        expected_data = ' \r\nline 2\r\nline 3'
        self.assertEqual(child_data, expected_data)

    def test_ascii_writelines(self):
        """Write ASCII text with `writelines`."""
        host = _test_base.ftp_host_factory()
        data = [' \n', 'line 2\n', 'line 3']
        backup_data = data[:]
        output = host.file('dummy', 'w')
        output.writelines(data)
        output.close()
        child_data = _mock_ftplib.content_of('dummy')
        expected_data = ' \r\nline 2\r\nline 3'
        self.assertEqual(child_data, expected_data)
        # ensure that the original data was not modified
        self.assertEqual(data, backup_data)

    def test_ascii_read(self):
        """Read ASCII text with plain `read`."""
        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
        input_ = host.file('dummy', 'r')
        data = input_.read(0)
        self.assertEqual(data, '')
        data = input_.read(3)
        self.assertEqual(data, 'lin')
        data = input_.read(7)
        self.assertEqual(data, 'e 1\nano')
        data = input_.read()
        self.assertEqual(data, 'ther line\nyet another line')
        data = input_.read()
        self.assertEqual(data, '')
        input_.close()
        # try it again with a more "problematic" string which
        #  makes several reads in the `read` method necessary
        host = _test_base.ftp_host_factory(session_factory=AsciiReadMockSession)
        expected_data = AsciiReadMockSession.mock_file_content.\
                        replace('\r\n', '\n')
        input_ = host.file('dummy', 'r')
        data = input_.read(len(expected_data))
        self.assertEqual(data, expected_data)

    def test_binary_readline(self):
        """Read binary data with `readline`."""
        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
        input_ = host.file('dummy', 'rb')
        data = input_.readline(3)
        self.assertEqual(data, 'lin')
        data = input_.readline(10)
        self.assertEqual(data, 'e 1\r\n')
        # the size limit splits the '\r\n' line ending
        data = input_.readline(13)
        self.assertEqual(data, 'another line\r')
        data = input_.readline()
        self.assertEqual(data, '\n')
        data = input_.readline()
        self.assertEqual(data, 'yet another line')
        data = input_.readline()
        self.assertEqual(data, '')
        input_.close()

    def test_ascii_readline(self):
        """Read ASCII text with `readline`."""
        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
        input_ = host.file('dummy', 'r')
        data = input_.readline(3)
        self.assertEqual(data, 'lin')
        data = input_.readline(10)
        self.assertEqual(data, 'e 1\n')
        data = input_.readline(13)
        self.assertEqual(data, 'another line\n')
        data = input_.readline()
        self.assertEqual(data, 'yet another line')
        data = input_.readline()
        self.assertEqual(data, '')
        input_.close()

    def test_ascii_readlines(self):
        """Read ASCII text with `readlines`."""
        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
        input_ = host.file('dummy', 'r')
        data = input_.read(3)
        self.assertEqual(data, 'lin')
        # `readlines` must continue after the data already read
        data = input_.readlines()
        self.assertEqual(data, ['e 1\n', 'another line\n',
                                'yet another line'])
        input_.close()

    def test_ascii_xreadlines(self):
        """Read ASCII text with `xreadlines`."""
        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
        # open file, skip some bytes
        input_ = host.file('dummy', 'r')
        data = input_.read(3)
        xrl_obj = input_.xreadlines()
        self.failUnless(xrl_obj.__class__ is ftp_file._XReadlines)
        self.failUnless(xrl_obj._ftp_file.__class__ is ftp_file._FTPFile)
        data = xrl_obj[0]
        self.assertEqual(data, 'e 1\n')
        # try to skip an index
        self.assertRaises(RuntimeError, operator.__getitem__, xrl_obj, 2)
        # continue reading
        data = xrl_obj[1]
        self.assertEqual(data, 'another line\n')
        data = xrl_obj[2]
        self.assertEqual(data, 'yet another line')
        # try to read beyond EOF
        self.assertRaises(IndexError, operator.__getitem__, xrl_obj, 3)

    def test_ascii_iterator(self):
        """Test the iterator interface of `FTPFile` objects (ASCII mode)."""
        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
        # no mode is given; the expected lines below have their line
        #  endings converted, as in the other ASCII mode tests
        input_ = host.file('dummy')
        input_iterator = iter(input_)
        self.assertEqual(input_iterator.next(), "line 1\n")
        self.assertEqual(input_iterator.next(), "another line\n")
        self.assertEqual(input_iterator.next(), "yet another line")
        self.assertRaises(StopIteration, input_iterator.next)
        input_.close()

    def test_binary_iterator(self):
        """Test the iterator interface of `FTPFile` objects (binary mode)."""
        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
        # in binary mode, the line endings must be left unchanged
        input_ = host.file('dummy', 'rb')
        input_iterator = iter(input_)
        self.assertEqual(input_iterator.next(), "line 1\r\n")
        self.assertEqual(input_iterator.next(), "another line\r\n")
        self.assertEqual(input_iterator.next(), "yet another line")
        self.assertRaises(StopIteration, input_iterator.next)
        input_.close()

    def test_read_unknown_file(self):
        """Test whether reading a file which isn't there fails."""
        host = _test_base.ftp_host_factory()
        self.assertRaises(ftp_error.FTPIOError, host.file, 'notthere', 'r')
370
371
class TestUploadAndDownload(unittest.TestCase):
    """Test ASCII upload and binary download as examples."""
    def generate_ascii_file(self, data, filename):
        """Generate an ASCII data file."""
        source_file = open(filename, 'w')
        # close the file even if writing fails
        try:
            source_file.write(data)
        finally:
            source_file.close()

    def _read_local_file(self, filename):
        """Return the content of the local file `filename`, read in
        binary mode."""
        # close the file explicitly, so that no open file object is
        #  left when the caller removes the file
        local_file = open(filename, 'rb')
        try:
            return local_file.read()
        finally:
            local_file.close()

    def test_ascii_upload(self):
        """Test ASCII mode upload."""
        local_source = '__test_source'
        data = ascii_data()
        self.generate_ascii_file(data, local_source)
        # upload
        host = _test_base.ftp_host_factory()
        host.upload(local_source, 'dummy')
        # check uploaded content
        # the data which was uploaded has its line endings converted
        #  so the conversion must also be applied to `data`
        data = data.replace('\n', '\r\n')
        remote_file_content = _mock_ftplib.content_of('dummy')
        self.assertEqual(data, remote_file_content)
        # clean up
        os.unlink(local_source)

    def test_binary_download(self):
        """Test binary mode download."""
        local_target = '__test_target'
        host = _test_base.ftp_host_factory(
               session_factory=BinaryDownloadMockSession)
        # download
        host.download('dummy', local_target, 'b')
        # read file and compare
        data = self._read_local_file(local_target)
        remote_file_content = _mock_ftplib.content_of('dummy')
        self.assertEqual(data, remote_file_content)
        # clean up
        os.unlink(local_target)

    def test_conditional_upload(self):
        """Test conditional ASCII mode upload."""
        local_source = '__test_source'
        data = ascii_data()
        self.generate_ascii_file(data, local_source)
        # target is newer, so don't upload
        host = _test_base.ftp_host_factory(
               ftp_host_class=FailingUploadAndDownloadFTPHost)
        flag = host.upload_if_newer(local_source, '/home/newer')
        self.assertEqual(flag, False)
        # target is older, so upload
        host = _test_base.ftp_host_factory()
        flag = host.upload_if_newer(local_source, '/home/older')
        self.assertEqual(flag, True)
        # check uploaded content
        # the data which was uploaded has its line endings converted
        #  so the conversion must also be applied to 'data'
        data = data.replace('\n', '\r\n')
        remote_file_content = _mock_ftplib.content_of('older')
        self.assertEqual(data, remote_file_content)
        # target doesn't exist, so upload
        host = _test_base.ftp_host_factory()
        flag = host.upload_if_newer(local_source, '/home/notthere')
        self.assertEqual(flag, True)
        remote_file_content = _mock_ftplib.content_of('notthere')
        self.assertEqual(data, remote_file_content)
        # clean up
        os.unlink(local_source)

    def compare_and_delete_downloaded_data(self, filename):
        """Compare content of downloaded file with its source, then
        delete the local target file."""
        data = self._read_local_file(filename)
        remote_file_content = _mock_ftplib.content_of('newer')
        self.assertEqual(data, remote_file_content)
        # clean up
        os.unlink(filename)

    def test_conditional_download_without_target(self):
        "Test conditional binary mode download when no target file exists."
        local_target = '__test_target'
        # target does not exist, so download
        host = _test_base.ftp_host_factory(
               session_factory=BinaryDownloadMockSession)
        flag = host.download_if_newer('/home/newer', local_target, 'b')
        self.assertEqual(flag, True)
        self.compare_and_delete_downloaded_data(local_target)

    def test_conditional_download_with_older_target(self):
        """Test conditional binary mode download with newer source file."""
        local_target = '__test_target'
        # make target file
        open(local_target, 'w').close()
        # source is newer, so download
        host = _test_base.ftp_host_factory(
               session_factory=BinaryDownloadMockSession)
        flag = host.download_if_newer('/home/newer', local_target, 'b')
        self.assertEqual(flag, True)
        self.compare_and_delete_downloaded_data(local_target)

    def test_conditional_download_with_newer_target(self):
        """Test conditional binary mode download with older source file."""
        local_target = '__test_target'
        # make target file
        open(local_target, 'w').close()
        # source is older, so don't download; the host class makes the
        #  test fail if a download is attempted nevertheless
        host = _test_base.ftp_host_factory(
               ftp_host_class=FailingUploadAndDownloadFTPHost,
               session_factory=BinaryDownloadMockSession)
        flag = host.download_if_newer('/home/older', local_target, 'b')
        self.assertEqual(flag, False)
        # remove target file
        os.unlink(local_target)
486
487
class TestTimeShift(unittest.TestCase):
    """Test rounding, validation and determination of the time shift."""
    def test_rounded_time_shift(self):
        """Test if time shift is rounded correctly."""
        host = _test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
        # use private bound method
        round_shift = host._FTPHost__rounded_time_shift
        # pairs of original value and expected result
        cases = [
          (0, 0),
          (0.1, 0),
          (-0.1, 0),
          (1500, 0),
          (-1500, 0),
          (1800, 3600),
          (-1800, -3600),
          (2000, 3600),
          (-2000, -3600),
          (5*3600-100, 5*3600),
          (-5*3600+100, -5*3600)]
        for raw_shift, expected_shift in cases:
            self.assertEqual(round_shift(raw_shift), expected_shift)

    def test_assert_valid_time_shift(self):
        """Test time shift sanity checks."""
        host = _test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
        # use private bound method
        check_shift = host._FTPHost__assert_valid_time_shift
        # valid time shifts pass the check, which returns `None`
        for valid_shift in (23*3600, -23*3600, 3600+30, -3600+30):
            self.failUnless(check_shift(valid_shift) is None)
        invalid_shifts = (
          # exceeds one day
          25*3600, -25*3600,
          # deviation from full hours unacceptable
          10*60, -3600-10*60)
        for invalid_shift in invalid_shifts:
            self.assertRaises(ftp_error.TimeShiftError, check_shift,
                              invalid_shift)

    def test_synchronize_times(self):
        """Test time synchronization with server."""
        host = _test_base.ftp_host_factory(ftp_host_class=TimeShiftFTPHost,
               session_factory=TimeShiftMockSession)
        path_stub = host.path
        # valid time shift
        path_stub.set_mtime(time.time() + 3630)
        host.synchronize_times()
        self.assertEqual(host.time_shift(), 3600)
        # invalid time shift
        path_stub.set_mtime(time.time() + 3600+10*60)
        self.assertRaises(ftp_error.TimeShiftError, host.synchronize_times)
531
532
# run the test cases of this module if it's executed as a script
if __name__ == '__main__':
    unittest.main()
535
Note: See TracBrowser for help on using the browser.