root/_test_ftputil.py

Revision 869:808476873684, 13.4 kB (checked in by Stefan Schwarzer <sschwarzer@…>, 3 months ago)
Replaced licenses in each file with reference to common `LICENSE` file (suggested by Steve Steiner). I removed the copyright notice for Roger Demetrescu from `_test_ftputil.py` because I remembered only after the last commit that the tests for the `with` statement had gone into their own file, `_test_with_statement.py`. I added Roger's name there, and also in `ftp_file.py` which had also gotten `with` support.
Line 
1# Copyright (C) 2002-2009, Stefan Schwarzer <sschwarzer@sschwarzer.net>
2# See the file LICENSE for licensing terms.
3
4import ftplib
5import os
6import posixpath
7import random
8import time
9import unittest
10
11import _mock_ftplib
12import _test_base
13import ftp_error
14import ftp_stat
15import ftputil
16
17
18#
19# helper functions to generate random data
20#
def random_data(pool, size=10000):
    """
    Return a string of `size` characters. Each character is built
    with `chr` from an ordinal picked at random from `pool`, a
    sequence of integer numbers.
    """
    # one `random.choice` call per character, in order
    characters = [chr(random.choice(pool)) for _ in range(size)]
    return ''.join(characters)
32
def ascii_data():
    """
    Return a random character string built from the ordinals 32 to
    127 plus the newline character.
    """
    # `list(...)` guarantees a real list, so `append` also works
    #  where `range` returns a lazy sequence instead of a list
    pool = list(range(32, 128))
    pool.append(ord('\n'))
    return random_data(pool)
38
def binary_data():
    """
    Return a random character string whose characters may have any
    ordinal from 0 to 255.
    """
    return random_data(range(256))
43
44
45#
46# several customized `MockSession` classes
47#
class FailOnLoginSession(_mock_ftplib.MockSession):
    """Mock session which can't be instantiated successfully."""

    def __init__(self, host='', user='', password=''):
        # the arguments are ignored; construction always fails
        raise ftplib.error_perm()
51
class RecursiveListingForDotAsPathSession(_mock_ftplib.MockSession):
    """
    Mock session with canned directory listings for the paths "."
    and "". The listing for "." has the shape of a recursive listing
    (sections introduced by `dev:`, `etc:`, ...), the listing for ""
    is a plain listing of a single directory.
    """
    # maps a path to the listing text for that path
    # NOTE(review): presumably looked up by `MockSession` when a
    #  directory listing is requested - confirm in `_mock_ftplib`
    dir_contents = {
      ".": """\
lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin

dev:
total 10

etc:
total 10

pub:
total 4
-rw-r--r--   1 staff         74 Sep 25  2000 .message
----------   1 staff          0 Aug 16  2003 .notar
drwxr-xr-x  12 ftp          512 Nov 23  2008 freeware

usr:
total 4""",

      "": """\
total 10
lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin
d--x--x--x   2 staff        512 Sep 24  2000 dev
d--x--x--x   3 staff        512 Sep 25  2000 etc
dr-xr-xr-x   3 staff        512 Oct  3  2000 pub
d--x--x--x   5 staff        512 Oct  3  2000 usr"""}

    def _transform_path(self, path):
        # return the path unchanged
        # NOTE(review): presumably overrides a path normalization in
        #  `MockSession` so that "." and "" stay distinct keys - confirm
        return path
82
class BinaryDownloadMockSession(_mock_ftplib.MockSession):
    # random binary data, generated once when the class is defined
    # NOTE(review): presumably served by `MockSession` as the content
    #  of downloaded files - confirm in `_mock_ftplib`
    mock_file_content = binary_data()
85
class TimeShiftMockSession(_mock_ftplib.MockSession):
    """Mock session whose `delete` method does nothing."""

    def delete(self, file_name):
        """Ignore the request to delete `file_name`."""
        return None
89
90#
# customized `FTPHost` classes for conditional upload/download tests
#  and time shift tests
93#
class FailingUploadAndDownloadFTPHost(ftputil.FTPHost):
    """
    `FTPHost` whose `upload` and `download` methods always raise an
    `AssertionError`. A test which uses this class fails as soon as
    one of these methods is called.
    """

    # The methods raise `AssertionError` explicitly instead of using
    #  `assert False`. Assert statements are removed when Python runs
    #  with `-O`, so the check would silently vanish.

    def upload(self, source, target, mode=''):
        """Fail unconditionally; the arguments are ignored."""
        raise AssertionError("`FTPHost.upload` should not have been called")

    def download(self, source, target, mode=''):
        """Fail unconditionally; the arguments are ignored."""
        raise AssertionError("`FTPHost.download` should not have been called")
100
class TimeShiftFTPHost(ftputil.FTPHost):
    """
    `FTPHost` whose `path` attribute is replaced by a stub object
    with a modification time that can be set from the outside.
    """

    class _Path:
        """
        Stub for the `path` attribute. Path manipulation is delegated
        to `posixpath`; everything else returns fixed values.
        """

        # the modification time which `getmtime` reports; `set_mtime`
        #  must be called before `getmtime`
        def set_mtime(self, mtime):
            self._mtime = mtime

        def getmtime(self, file_name):
            # the same value for every `file_name`
            return self._mtime

        # plain delegation to `posixpath`
        def split(self, path):
            return posixpath.split(path)

        def join(self, *args):
            return posixpath.join(*args)

        def normpath(self, path):
            return posixpath.normpath(path)

        # fixed answers, independent of the argument
        def abspath(self, path):
            return "/home/sschwarzer/_ftputil_sync_"

        # needed for `isdir` in `FTPHost.remove`
        # NOTE(review): the comment above names `isdir` while the
        #  stubbed method is `isfile` - confirm which one is meant
        def isfile(self, path):
            return True

    def __init__(self, *args, **kwargs):
        # set up the regular host first, then swap in the stub
        ftputil.FTPHost.__init__(self, *args, **kwargs)
        path_stub = self._Path()
        self.path = path_stub
122
123#
124# test cases
125#
class TestOpenAndClose(unittest.TestCase):
    """Check the state of an `FTPHost` object after closing it."""

    def test_open_and_close(self):
        """Test closing of `FTPHost`."""
        ftp_host = _test_base.ftp_host_factory()
        ftp_host.close()
        # the host must be marked as closed and have an empty
        #  `_children` list
        is_closed = ftp_host.closed
        self.assertEqual(is_closed, True)
        children = ftp_host._children
        self.assertEqual(children, [])
134
135
class TestLogin(unittest.TestCase):
    """Test the error reported for a failing login."""

    def test_invalid_login(self):
        """Login to invalid host must fail."""
        # the session class raises `ftplib.error_perm` on construction;
        #  the host factory must report this as `FTPOSError`
        make_host = _test_base.ftp_host_factory
        self.assertRaises(ftp_error.FTPOSError, make_host, FailOnLoginSession)
141
142
class TestSetParser(unittest.TestCase):
    """Test setting a directory listing parser explicitly."""

    def test_set_parser(self):
        """Test if the selected parser is used."""
        # this test isn't very practical but should help at least a bit ...
        expected_unix_names = [
          'chemeng', 'download', 'image', 'index.html', 'os2', 'osup',
          'publications', 'python', 'scios2']
        expected_ms_names = [
          'WindowsXP', 'XPLaunch', 'empty', 'abcd.exe', 'O2KKeys.exe']
        host = _test_base.ftp_host_factory()
        # implicitly fix at Unix format
        unix_names = host.listdir("/home/sschwarzer")
        self.assertEqual(unix_names, expected_unix_names)
        # switch to the MS format explicitly
        ms_parser = ftp_stat.MSParser()
        host.set_parser(ms_parser)
        ms_names = host.listdir("/home/msformat/XPLaunch")
        self.assertEqual(ms_names, expected_ms_names)
        self.assertEqual(host._stat._allow_parser_switching, False)
157
158
159class TestCommandNotImplementedError(unittest.TestCase):
160    def test_command_not_implemented_error(self):
161        """
162        Test if we get the anticipated exception if a command isn't
163        implemented by the server.
164        """
165        host = _test_base.ftp_host_factory()
166        self.assertRaises(ftp_error.PermanentError,
167                          host.chmod, "nonexistent", 0644)
168        # `CommandNotImplementedError` is a subclass of `PermanentError`
169        self.assertRaises(ftp_error.CommandNotImplementedError,
170                          host.chmod, "nonexistent", 0644)
171
172
class TestRecursiveListingForDotAsPath(unittest.TestCase):
    """
    Test listings with a session that returns a recursive directory
    listing when the path to list is a dot. This is used to test for
    issue #33, see http://ftputil.sschwarzer.net/trac/ticket/33 .
    """

    def _make_host(self):
        """Return a host which uses the session with the canned listings."""
        return _test_base.ftp_host_factory(
                 session_factory=RecursiveListingForDotAsPathSession)

    def _assert_plain_listing(self, lines):
        """Check that `lines` start like the plain (non-recursive) listing."""
        self.assertEqual(lines[0], "total 10")
        # `assertTrue` replaces the deprecated alias `failUnless`
        self.assertTrue(lines[1].startswith("lrwxrwxrwx   1 staff"))
        self.assertTrue(lines[2].startswith("d--x--x--x   2 staff"))

    def test_recursive_listing(self):
        """Listing `host.curdir` must give the plain listing."""
        host = self._make_host()
        # close the host even if an assertion fails
        try:
            lines = host._dir(host.curdir)
            self._assert_plain_listing(lines)
        finally:
            host.close()

    def test_plain_listing(self):
        """Listing the empty string must give the plain listing."""
        host = self._make_host()
        try:
            lines = host._dir("")
            self._assert_plain_listing(lines)
        finally:
            host.close()

    def test_empty_string_instead_of_dot_workaround(self):
        """`listdir` for `host.curdir` must return only the top-level names."""
        host = self._make_host()
        try:
            files = host.listdir(host.curdir)
            self.assertEqual(files, ['bin', 'dev', 'etc', 'pub', 'usr'])
        finally:
            host.close()
202
203
class TestUploadAndDownload(unittest.TestCase):
    """Test ASCII upload and binary download as examples."""

    #
    # helper methods
    #
    def generate_ascii_file(self, data, filename):
        """Generate an ASCII data file `filename` with the content `data`."""
        source_file = open(filename, 'w')
        # close the file even if writing fails
        try:
            source_file.write(data)
        finally:
            source_file.close()

    def _read_binary_file(self, filename):
        """Return the content of the local file `filename` (binary mode)."""
        fobj = open(filename, 'rb')
        try:
            return fobj.read()
        finally:
            fobj.close()

    def _remove_if_exists(self, filename):
        """
        Remove the local file `filename` if it exists. This is used
        for cleanup in `finally` clauses, where a missing file must
        not hide the actual test failure.
        """
        if os.path.exists(filename):
            os.unlink(filename)

    def compare_and_delete_downloaded_data(self, filename):
        """Compare content of downloaded file with its source, then
        delete the local target file."""
        try:
            data = self._read_binary_file(filename)
            remote_file_content = _mock_ftplib.content_of('newer')
            self.assertEqual(data, remote_file_content)
        finally:
            # clean up, also after a failed comparison
            self._remove_if_exists(filename)

    #
    # unconditional transfers
    #
    def test_ascii_upload(self):
        """Test ASCII mode upload."""
        local_source = '__test_source'
        data = ascii_data()
        try:
            self.generate_ascii_file(data, local_source)
            # upload
            host = _test_base.ftp_host_factory()
            host.upload(local_source, 'dummy')
            # check uploaded content
            # the data which was uploaded has its line endings converted
            #  so the conversion must also be applied to `data`
            data = data.replace('\n', '\r\n')
            remote_file_content = _mock_ftplib.content_of('dummy')
            self.assertEqual(data, remote_file_content)
        finally:
            # clean up
            self._remove_if_exists(local_source)

    def test_binary_download(self):
        """Test binary mode download."""
        local_target = '__test_target'
        host = _test_base.ftp_host_factory(
               session_factory=BinaryDownloadMockSession)
        try:
            # download
            host.download('dummy', local_target, 'b')
            # read file and compare
            data = self._read_binary_file(local_target)
            remote_file_content = _mock_ftplib.content_of('dummy')
            self.assertEqual(data, remote_file_content)
        finally:
            # clean up
            self._remove_if_exists(local_target)

    #
    # conditional transfers
    #
    def test_conditional_upload(self):
        """Test conditional ASCII mode upload."""
        local_source = '__test_source'
        data = ascii_data()
        try:
            self.generate_ascii_file(data, local_source)
            # target is newer, so don't upload
            host = _test_base.ftp_host_factory(
                   ftp_host_class=FailingUploadAndDownloadFTPHost)
            flag = host.upload_if_newer(local_source, '/home/newer')
            self.assertEqual(flag, False)
            # target is older, so upload
            host = _test_base.ftp_host_factory()
            flag = host.upload_if_newer(local_source, '/home/older')
            self.assertEqual(flag, True)
            # check uploaded content
            # the data which was uploaded has its line endings converted
            #  so the conversion must also be applied to `data`
            data = data.replace('\n', '\r\n')
            remote_file_content = _mock_ftplib.content_of('older')
            self.assertEqual(data, remote_file_content)
            # target doesn't exist, so upload
            host = _test_base.ftp_host_factory()
            flag = host.upload_if_newer(local_source, '/home/notthere')
            self.assertEqual(flag, True)
            remote_file_content = _mock_ftplib.content_of('notthere')
            self.assertEqual(data, remote_file_content)
        finally:
            # clean up
            self._remove_if_exists(local_source)

    def test_conditional_download_without_target(self):
        "Test conditional binary mode download when no target file exists."
        local_target = '__test_target'
        try:
            # target does not exist, so download
            host = _test_base.ftp_host_factory(
                   session_factory=BinaryDownloadMockSession)
            flag = host.download_if_newer('/home/newer', local_target, 'b')
            self.assertEqual(flag, True)
            self.compare_and_delete_downloaded_data(local_target)
        finally:
            # only has an effect if the test failed before the comparison
            self._remove_if_exists(local_target)

    def test_conditional_download_with_older_target(self):
        """Test conditional binary mode download with newer source file."""
        local_target = '__test_target'
        # make target file
        open(local_target, 'w').close()
        try:
            # source is newer, so download
            host = _test_base.ftp_host_factory(
                   session_factory=BinaryDownloadMockSession)
            flag = host.download_if_newer('/home/newer', local_target, 'b')
            self.assertEqual(flag, True)
            self.compare_and_delete_downloaded_data(local_target)
        finally:
            # only has an effect if the test failed before the comparison
            self._remove_if_exists(local_target)

    def test_conditional_download_with_newer_target(self):
        """Test conditional binary mode download with older source file."""
        local_target = '__test_target'
        # make target file
        open(local_target, 'w').close()
        try:
            # source is older, so don't download; the host class makes
            #  the test fail if a download is attempted nevertheless
            host = _test_base.ftp_host_factory(
                   ftp_host_class=FailingUploadAndDownloadFTPHost,
                   session_factory=BinaryDownloadMockSession)
            flag = host.download_if_newer('/home/older', local_target, 'b')
            self.assertEqual(flag, False)
        finally:
            # remove target file
            self._remove_if_exists(local_target)
319
320
class TestTimeShift(unittest.TestCase):
    """Test rounding, validation and synchronization of the time shift."""

    def test_rounded_time_shift(self):
        """Test if time shift is rounded correctly."""
        host = _test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
        # use private bound method (name-mangled `__rounded_time_shift`)
        rounded_time_shift = host._FTPHost__rounded_time_shift
        # pairs consisting of original value and expected result
        test_data = [
          (0, 0), (0.1, 0), (-0.1, 0), (1500, 0), (-1500, 0),
          (1800, 3600), (-1800, -3600), (2000, 3600), (-2000, -3600),
          (5*3600-100, 5*3600), (-5*3600+100, -5*3600)]
        for time_shift, expected_time_shift in test_data:
            calculated_time_shift = rounded_time_shift(time_shift)
            self.assertEqual(calculated_time_shift, expected_time_shift)

    def test_assert_valid_time_shift(self):
        """Test time shift sanity checks."""
        host = _test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
        # use private bound method (name-mangled `__assert_valid_time_shift`)
        assert_time_shift = host._FTPHost__assert_valid_time_shift
        # valid time shifts
        test_data = [23*3600, -23*3600, 3600+30, -3600+30]
        for time_shift in test_data:
            # `assertTrue` replaces the deprecated alias `failUnless`
            self.assertTrue(assert_time_shift(time_shift) is None)
        # invalid time shift (exceeds one day)
        self.assertRaises(ftp_error.TimeShiftError, assert_time_shift, 25*3600)
        self.assertRaises(ftp_error.TimeShiftError, assert_time_shift, -25*3600)
        # invalid time shift (deviation from full hours unacceptable)
        self.assertRaises(ftp_error.TimeShiftError, assert_time_shift, 10*60)
        self.assertRaises(ftp_error.TimeShiftError, assert_time_shift,
                          -3600-10*60)

    def test_synchronize_times(self):
        """Test time synchronization with server."""
        host = _test_base.ftp_host_factory(ftp_host_class=TimeShiftFTPHost,
               session_factory=TimeShiftMockSession)
        # valid time shift: the stubbed modification time is an hour
        #  and 30 seconds ahead of the local time
        host.path.set_mtime(time.time() + 3630)
        host.synchronize_times()
        self.assertEqual(host.time_shift(), 3600)
        # invalid time shift: an hour and ten minutes ahead
        host.path.set_mtime(time.time() + 3600+10*60)
        self.assertRaises(ftp_error.TimeShiftError, host.synchronize_times)
364
365
# run the test cases of this module if it's executed as a script
if __name__ == '__main__':
    unittest.main()
Note: See TracBrowser for help on using the browser.