root/trunk/_test_ftputil.py

Revision 766, 12.7 kB (checked in by schwa, 3 weeks ago)
Added a comment on subclass relationship `PermanentError` /
`CommandNotImplementedError`.
  • Property svn:mime-type set to text/x-python
  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
Line 
1 # Copyright (C) 2002-2008, Stefan Schwarzer
2 # All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are
6 # met:
7 #
8 # - Redistributions of source code must retain the above copyright
9 #   notice, this list of conditions and the following disclaimer.
10 #
11 # - Redistributions in binary form must reproduce the above copyright
12 #   notice, this list of conditions and the following disclaimer in the
13 #   documentation and/or other materials provided with the distribution.
14 #
15 # - Neither the name of the above author nor the names of the
16 #   contributors to the software may be used to endorse or promote
17 #   products derived from this software without specific prior written
18 #   permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR
24 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
27 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
28 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
29 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32 # $Id$
33
34 import ftplib
35 import os
36 import posixpath
37 import random
38 import time
39 import unittest
40
41 import _mock_ftplib
42 import _test_base
43 import ftp_error
44 import ftp_stat
45 import ftputil
46
47
48 #
49 # helper functions to generate random data
50 #
51 def random_data(pool, size=10000):
52     """
53     Return a sequence of characters consisting of those from
54     the pool of integer numbers.
55     """
56     character_list = []
57     for i in range(size):
58         ordinal = random.choice(pool)
59         character_list.append(chr(ordinal))
60     result = ''.join(character_list)
61     return result
62
63 def ascii_data():
64     """Return an ASCII character string."""
65     pool = range(32, 128)
66     pool.append(ord('\n'))
67     return random_data(pool)
68
69 def binary_data():
70     """Return a binary character string."""
71     pool = range(0, 256)
72     return random_data(pool)
73
74
75 #
76 # several customized `MockSession` classes
77 #
78 class FailOnLoginSession(_mock_ftplib.MockSession):
79     def __init__(self, host='', user='', password=''):
80         raise ftplib.error_perm
81
82 class BinaryDownloadMockSession(_mock_ftplib.MockSession):
83     mock_file_content = binary_data()
84
85 class TimeShiftMockSession(_mock_ftplib.MockSession):
86     def delete(self, file_name):
87         pass
88
89 #
90 # customized `FTPHost` class for conditional upload/download tests
91 #  and time shift tests
92 #
93 class FailingUploadAndDownloadFTPHost(ftputil.FTPHost):
94     def upload(self, source, target, mode=''):
95         assert False, "`FTPHost.upload` should not have been called"
96
97     def download(self, source, target, mode=''):
98         assert False, "`FTPHost.download` should not have been called"
99
100 class TimeShiftFTPHost(ftputil.FTPHost):
101     class _Path:
102         def split(self, path):
103             return posixpath.split(path)
104         def set_mtime(self, mtime):
105             self._mtime = mtime
106         def getmtime(self, file_name):
107             return self._mtime
108         def abspath(self, path):
109             return "/home/sschwarzer/_ftputil_sync_"
110         # needed for `isdir` in `FTPHost.remove`
111         def isfile(self, path):
112             return True
113
114     def __init__(self, *args, **kwargs):
115         ftputil.FTPHost.__init__(self, *args, **kwargs)
116         self.path = self._Path()
117
118 #
119 # test cases
120 #
121 class TestOpenAndClose(unittest.TestCase):
122     """Test opening and closing of `FTPHost` objects."""
123     def test_open_and_close(self):
124         """Test closing of `FTPHost`."""
125         host = _test_base.ftp_host_factory()
126         host.close()
127         self.assertEqual(host.closed, True)
128         self.assertEqual(host._children, [])
129
130
131 class TestLogin(unittest.TestCase):
132     def test_invalid_login(self):
133         """Login to invalid host must fail."""
134         self.assertRaises(ftp_error.FTPOSError, _test_base.ftp_host_factory,
135                           FailOnLoginSession)
136
137
138 class TestSetParser(unittest.TestCase):
139     def test_set_parser(self):
140         """Test if the selected parser is used."""
141         # this test isn't very practical but should help at least a bit ...
142         host = _test_base.ftp_host_factory()
143         # implicitly fix at Unix format
144         files = host.listdir("/home/sschwarzer")
145         self.assertEqual(files, ['chemeng', 'download', 'image', 'index.html',
146           'os2', 'osup', 'publications', 'python', 'scios2'])
147         host.set_parser(ftp_stat.MSParser())
148         files = host.listdir("/home/msformat/XPLaunch")
149         self.assertEqual(files, ['WindowsXP', 'XPLaunch', 'empty',
150                                  'abcd.exe', 'O2KKeys.exe'])
151         self.assertEqual(host._stat._allow_parser_switching, False)
152
153
154 class TestCommandNotImplementedError(unittest.TestCase):
155     def test_command_not_implemented_error(self):
156         """
157         Test if we get the anticipated exception if a command isn't
158         implemented by the server.
159         """
160         host = _test_base.ftp_host_factory()
161         self.assertRaises(ftp_error.PermanentError,
162                           host.chmod, "nonexistent", 0644)
163         # `CommandNotImplementedError` is a subclass of `PermanentError`
164         self.assertRaises(ftp_error.CommandNotImplementedError,
165                           host.chmod, "nonexistent", 0644)
166
167
168 class TestUploadAndDownload(unittest.TestCase):
169     """Test ASCII upload and binary download as examples."""
170
171     def generate_ascii_file(self, data, filename):
172         """Generate an ASCII data file."""
173         source_file = open(filename, 'w')
174         source_file.write(data)
175         source_file.close()
176
177     def test_ascii_upload(self):
178         """Test ASCII mode upload."""
179         local_source = '__test_source'
180         data = ascii_data()
181         self.generate_ascii_file(data, local_source)
182         # upload
183         host = _test_base.ftp_host_factory()
184         host.upload(local_source, 'dummy')
185         # check uploaded content
186         # the data which was uploaded has its line endings converted
187         #  so the conversion must also be applied to `data`
188         data = data.replace('\n', '\r\n')
189         remote_file_content = _mock_ftplib.content_of('dummy')
190         self.assertEqual(data, remote_file_content)
191         # clean up
192         os.unlink(local_source)
193
194     def test_binary_download(self):
195         """Test binary mode download."""
196         local_target = '__test_target'
197         host = _test_base.ftp_host_factory(
198                session_factory=BinaryDownloadMockSession)
199         # download
200         host.download('dummy', local_target, 'b')
201         # read file and compare
202         data = open(local_target, 'rb').read()
203         remote_file_content = _mock_ftplib.content_of('dummy')
204         self.assertEqual(data, remote_file_content)
205         # clean up
206         os.unlink(local_target)
207
208     def test_conditional_upload(self):
209         """Test conditional ASCII mode upload."""
210         local_source = '__test_source'
211         data = ascii_data()
212         self.generate_ascii_file(data, local_source)
213         # target is newer, so don't upload
214         host = _test_base.ftp_host_factory(
215                ftp_host_class=FailingUploadAndDownloadFTPHost)
216         flag = host.upload_if_newer(local_source, '/home/newer')
217         self.assertEqual(flag, False)
218         # target is older, so upload
219         host = _test_base.ftp_host_factory()
220         flag = host.upload_if_newer(local_source, '/home/older')
221         self.assertEqual(flag, True)
222         # check uploaded content
223         # the data which was uploaded has its line endings converted
224         #  so the conversion must also be applied to 'data'
225         data = data.replace('\n', '\r\n')
226         remote_file_content = _mock_ftplib.content_of('older')
227         self.assertEqual(data, remote_file_content)
228         # target doesn't exist, so upload
229         host = _test_base.ftp_host_factory()
230         flag = host.upload_if_newer(local_source, '/home/notthere')
231         self.assertEqual(flag, True)
232         remote_file_content = _mock_ftplib.content_of('notthere')
233         self.assertEqual(data, remote_file_content)
234         # clean up
235         os.unlink(local_source)
236
237     def compare_and_delete_downloaded_data(self, filename):
238         """Compare content of downloaded file with its source, then
239         delete the local target file."""
240         data = open(filename, 'rb').read()
241         remote_file_content = _mock_ftplib.content_of('newer')
242         self.assertEqual(data, remote_file_content)
243         # clean up
244         os.unlink(filename)
245
246     def test_conditional_download_without_target(self):
247         "Test conditional binary mode download when no target file exists."
248         local_target = '__test_target'
249         # target does not exist, so download
250         host = _test_base.ftp_host_factory(
251                session_factory=BinaryDownloadMockSession)
252         flag = host.download_if_newer('/home/newer', local_target, 'b')
253         self.assertEqual(flag, True)
254         self.compare_and_delete_downloaded_data(local_target)
255
256     def test_conditional_download_with_older_target(self):
257         """Test conditional binary mode download with newer source file."""
258         local_target = '__test_target'
259         # make target file
260         open(local_target, 'w').close()
261         # source is newer, so download
262         host = _test_base.ftp_host_factory(
263                session_factory=BinaryDownloadMockSession)
264         flag = host.download_if_newer('/home/newer', local_target, 'b')
265         self.assertEqual(flag, True)
266         self.compare_and_delete_downloaded_data(local_target)
267
268     def test_conditional_download_with_newer_target(self):
269         """Test conditional binary mode download with older source file."""
270         local_target = '__test_target'
271         # make target file
272         open(local_target, 'w').close()
273         # source is older, so don't download
274         host = _test_base.ftp_host_factory(
275                session_factory=BinaryDownloadMockSession)
276         host = _test_base.ftp_host_factory(
277                ftp_host_class=FailingUploadAndDownloadFTPHost,
278                session_factory=BinaryDownloadMockSession)
279         flag = host.download_if_newer('/home/older', local_target, 'b')
280         self.assertEqual(flag, False)
281         # remove target file
282         os.unlink(local_target)
283
284
285 class TestTimeShift(unittest.TestCase):
286     def test_rounded_time_shift(self):
287         """Test if time shift is rounded correctly."""
288         host = _test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
289         # use private bound method
290         rounded_time_shift = host._FTPHost__rounded_time_shift
291         # original value, expected result
292         test_data = [
293           (0, 0), (0.1, 0), (-0.1, 0), (1500, 0), (-1500, 0),
294           (1800, 3600), (-1800, -3600), (2000, 3600), (-2000, -3600),
295           (5*3600-100, 5*3600), (-5*3600+100, -5*3600)]
296         for time_shift, expected_time_shift in test_data:
297             calculated_time_shift = rounded_time_shift(time_shift)
298             self.assertEqual(calculated_time_shift, expected_time_shift)
299
300     def test_assert_valid_time_shift(self):
301         """Test time shift sanity checks."""
302         host = _test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
303         # use private bound method
304         assert_time_shift = host._FTPHost__assert_valid_time_shift
305         # valid time shifts
306         test_data = [23*3600, -23*3600, 3600+30, -3600+30]
307         for time_shift in test_data:
308             self.failUnless(assert_time_shift(time_shift) is None)
309         # invalid time shift (exceeds one day)
310         self.assertRaises(ftp_error.TimeShiftError, assert_time_shift, 25*3600)
311         self.assertRaises(ftp_error.TimeShiftError, assert_time_shift, -25*3600)
312         # invalid time shift (deviation from full hours unacceptable)
313         self.assertRaises(ftp_error.TimeShiftError, assert_time_shift, 10*60)
314         self.assertRaises(ftp_error.TimeShiftError, assert_time_shift,
315                           -3600-10*60)
316
317     def test_synchronize_times(self):
318         """Test time synchronization with server."""
319         host = _test_base.ftp_host_factory(ftp_host_class=TimeShiftFTPHost,
320                session_factory=TimeShiftMockSession)
321         # valid time shift
322         host.path.set_mtime(time.time() + 3630)
323         host.synchronize_times()
324         self.assertEqual(host.time_shift(), 3600)
325         # invalid time shift
326         host.path.set_mtime(time.time() + 3600+10*60)
327         self.assertRaises(ftp_error.TimeShiftError, host.synchronize_times)
328
329
330 if __name__ == '__main__':
331     unittest.main()
332
Note: See TracBrowser for help on using the browser.