source: test/test_host.py @ 1482:600fb1c435a9

Last change on this file since 1482:600fb1c435a9 was 1482:600fb1c435a9, checked in by Stefan Schwarzer <sschwarzer@…>, 7 years ago
Deliberately don't support pickling for `FTPHost` and `FTPFile`. Problems with pickling (from ticket #75): - To re-connect from the pickle file, you'd need the username and password, and I would understand if someone was mad at me if I didn't just use their password in memory but also wrote it to a file. ;-) - The pickle file might be unpickled in a completely different environment. It might not be possible to connect to the host from there. - The state of the FTP server might have changed between pickling and unpickling. For example, your last current directory might no longer exist. Of course, this can also happen during a session, but the more time passes between two logins (before pickling and when unpickling), the more likely a change on the remote side is. - It gets much more complicated if there are open remote files when the FTPHost instance should be pickled.
File size: 21.3 KB
Line 
1# encoding: utf-8
2# Copyright (C) 2002-2014, Stefan Schwarzer <sschwarzer@sschwarzer.net>
3# See the file LICENSE for licensing terms.
4
5from __future__ import unicode_literals
6
7import ftplib
8import itertools
9import os
10import pickle
11import posixpath
12import random
13import time
14import unittest
15
16import ftputil
17import ftputil.compat
18import ftputil.error
19import ftputil.tool
20import ftputil.stat
21
22from test import mock_ftplib
23from test import test_base
24
25
26#
27# Helper functions to generate random data
28#
29def random_data(pool, size=10000):
30    """
31    Return a byte string of characters consisting of those from the
32    pool of integer numbers.
33    """
34    ordinal_list = [random.choice(pool) for i in range(size)]
35    return ftputil.compat.bytes_from_ints(ordinal_list)
36
37
38def ascii_data():
39    r"""
40    Return a unicode string of "normal" ASCII characters, including `\r`.
41    """
42    pool = list(range(32, 128))
43    # The idea is to have the "\r" converted to "\n" during the later
44    # text write and check this conversion.
45    pool.append(ord("\r"))
46    return ftputil.tool.as_unicode(random_data(pool))
47
48
49def binary_data():
50    """Return a binary character byte string."""
51    pool = list(range(0, 256))
52    return random_data(pool)
53
54
55#
56# Several customized `MockSession` classes
57#
58class FailOnLoginSession(mock_ftplib.MockSession):
59
60    def __init__(self, host="", user="", password=""):
61        raise ftplib.error_perm
62
63
64class FailOnKeepAliveSession(mock_ftplib.MockSession):
65
66    def pwd(self):
67        # Raise exception on second call to let the constructor work.
68        if not hasattr(self, "pwd_called"):
69            self.pwd_called = True
70        else:
71            raise ftplib.error_temp
72
73
74class RecursiveListingForDotAsPathSession(mock_ftplib.MockSession):
75
76    dir_contents = {
77      ".": """\
78lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin
79
80dev:
81total 10
82
83etc:
84total 10
85
86pub:
87total 4
88-rw-r--r--   1 staff         74 Sep 25  2000 .message
89----------   1 staff          0 Aug 16  2003 .notar
90drwxr-xr-x  12 ftp          512 Nov 23  2008 freeware
91
92usr:
93total 4""",
94
95      "": """\
96total 10
97lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin
98d--x--x--x   2 staff        512 Sep 24  2000 dev
99d--x--x--x   3 staff        512 Sep 25  2000 etc
100dr-xr-xr-x   3 staff        512 Oct  3  2000 pub
101d--x--x--x   5 staff        512 Oct  3  2000 usr"""}
102
103    def _transform_path(self, path):
104        return path
105
106
107class BinaryDownloadMockSession(mock_ftplib.MockUnixFormatSession):
108
109    mock_file_content = binary_data()
110
111
112class TimeShiftMockSession(mock_ftplib.MockSession):
113
114    def delete(self, file_name):
115        pass
116
117#
118# Customized `FTPHost` class for conditional upload/download tests
119# and time shift tests
120#
121class FailingUploadAndDownloadFTPHost(ftputil.FTPHost):
122
123    def upload(self, source, target, mode=""):
124        assert False, "`FTPHost.upload` should not have been called"
125
126    def download(self, source, target, mode=""):
127        assert False, "`FTPHost.download` should not have been called"
128
129
130class TimeShiftFTPHost(ftputil.FTPHost):
131
132    class _Path:
133        def split(self, path):
134            return posixpath.split(path)
135        def set_mtime(self, mtime):
136            self._mtime = mtime
137        def getmtime(self, file_name):
138            return self._mtime
139        def join(self, *args):
140            return posixpath.join(*args)
141        def normpath(self, path):
142            return posixpath.normpath(path)
143        def isabs(self, path):
144            return posixpath.isabs(path)
145        def abspath(self, path):
146            return "/home/sschwarzer/_ftputil_sync_"
147        # Needed for `isdir` in `FTPHost.remove`
148        def isfile(self, path):
149            return True
150
151    def __init__(self, *args, **kwargs):
152        ftputil.FTPHost.__init__(self, *args, **kwargs)
153        self.path = self._Path()
154
155#
156# Test cases
157#
158class TestInitAndClose(unittest.TestCase):
159    """Test initialization and closure of `FTPHost` objects."""
160
161    def test_open_and_close(self):
162        host = test_base.ftp_host_factory()
163        host.close()
164        self.assertEqual(host.closed, True)
165        self.assertEqual(host._children, [])
166
167
168class TestLogin(unittest.TestCase):
169
170    def test_invalid_login(self):
171        """Login to invalid host must fail."""
172        self.assertRaises(ftputil.error.FTPOSError, test_base.ftp_host_factory,
173                          FailOnLoginSession)
174
175
176class TestKeepAlive(unittest.TestCase):
177
178    def test_succeeding_keep_alive(self):
179        """Assume the connection is still alive."""
180        host = test_base.ftp_host_factory()
181        host.keep_alive()
182
183    def test_failing_keep_alive(self):
184        """Assume the connection has timed out, so `keep_alive` fails."""
185        host = test_base.ftp_host_factory(
186                 session_factory=FailOnKeepAliveSession)
187        self.assertRaises(ftputil.error.TemporaryError, host.keep_alive)
188
189
190class TestSetParser(unittest.TestCase):
191
192    class TrivialParser(ftputil.stat.Parser):
193        """
194        An instance of this parser always returns the same result
195        from its `parse_line` method. This is all we need to check
196        if ftputil uses the set parser. No actual parsing code is
197        required here.
198        """
199
200        def __init__(self):
201            # We can't use `os.stat("/home")` directly because we
202            # later need the object's `_st_name` attribute, which
203            # we can't set on a `os.stat` stat value.
204            default_stat_result = ftputil.stat.StatResult(os.stat("/home"))
205            default_stat_result._st_name = "home"
206            self.default_stat_result = default_stat_result
207
208        def parse_line(self, line, time_shift=0.0):
209            return self.default_stat_result
210
211    def test_set_parser(self):
212        """Test if the selected parser is used."""
213        host = test_base.ftp_host_factory()
214        self.assertEqual(host._stat._allow_parser_switching, True)
215        trivial_parser = TestSetParser.TrivialParser()
216        host.set_parser(trivial_parser)
217        stat_result = host.stat("/home")
218        self.assertEqual(stat_result, trivial_parser.default_stat_result)
219        self.assertEqual(host._stat._allow_parser_switching, False)
220
221
222class TestCommandNotImplementedError(unittest.TestCase):
223
224    def test_command_not_implemented_error(self):
225        """
226        Test if we get the anticipated exception if a command isn't
227        implemented by the server.
228        """
229        host = test_base.ftp_host_factory()
230        self.assertRaises(ftputil.error.CommandNotImplementedError,
231                          host.chmod, "nonexistent", 0o644)
232        # `CommandNotImplementedError` is a subclass of `PermanentError`.
233        self.assertRaises(ftputil.error.PermanentError,
234                          host.chmod, "nonexistent", 0o644)
235
236
237class TestRecursiveListingForDotAsPath(unittest.TestCase):
238    """
239    Return a recursive directory listing when the path to list
240    is a dot. This is used to test for issue #33, see
241    http://ftputil.sschwarzer.net/trac/ticket/33 .
242    """
243
244    def test_recursive_listing(self):
245        host = test_base.ftp_host_factory(
246                 session_factory=RecursiveListingForDotAsPathSession)
247        lines = host._dir(host.curdir)
248        self.assertEqual(lines[0], "total 10")
249        self.assertTrue(lines[1].startswith("lrwxrwxrwx   1 staff"))
250        self.assertTrue(lines[2].startswith("d--x--x--x   2 staff"))
251        host.close()
252
253    def test_plain_listing(self):
254        host = test_base.ftp_host_factory(
255                 session_factory=RecursiveListingForDotAsPathSession)
256        lines = host._dir("")
257        self.assertEqual(lines[0], "total 10")
258        self.assertTrue(lines[1].startswith("lrwxrwxrwx   1 staff"))
259        self.assertTrue(lines[2].startswith("d--x--x--x   2 staff"))
260        host.close()
261
262    def test_empty_string_instead_of_dot_workaround(self):
263        host = test_base.ftp_host_factory(
264                 session_factory=RecursiveListingForDotAsPathSession)
265        files = host.listdir(host.curdir)
266        self.assertEqual(files, ["bin", "dev", "etc", "pub", "usr"])
267        host.close()
268
269
270class TestUploadAndDownload(unittest.TestCase):
271    """Test ASCII upload and binary download as examples."""
272
273    def generate_file(self, data, file_name):
274        """Generate a local data file."""
275        with open(file_name, "wb") as source_file:
276            source_file.write(data)
277
278    def test_download(self):
279        """Test mode download."""
280        local_target = "_test_target_"
281        host = test_base.ftp_host_factory(
282                 session_factory=BinaryDownloadMockSession)
283        # Download
284        host.download("dummy", local_target)
285        # Read file and compare
286        with open(local_target, "rb") as fobj:
287            data = fobj.read()
288        remote_file_content = mock_ftplib.content_of("dummy")
289        self.assertEqual(data, remote_file_content)
290        # Clean up
291        os.unlink(local_target)
292
293    def test_conditional_upload(self):
294        """Test conditional upload."""
295        local_source = "_test_source_"
296        data = binary_data()
297        self.generate_file(data, local_source)
298        # Target is newer, so don't upload.
299        host = test_base.ftp_host_factory(
300                 ftp_host_class=FailingUploadAndDownloadFTPHost)
301        flag = host.upload_if_newer(local_source, "/home/newer")
302        self.assertEqual(flag, False)
303        # Target is older, so upload.
304        host = test_base.ftp_host_factory()
305        flag = host.upload_if_newer(local_source, "/home/older")
306        self.assertEqual(flag, True)
307        remote_file_content = mock_ftplib.content_of("older")
308        self.assertEqual(data, remote_file_content)
309        # Target doesn't exist, so upload.
310        host = test_base.ftp_host_factory()
311        flag = host.upload_if_newer(local_source, "/home/notthere")
312        self.assertEqual(flag, True)
313        remote_file_content = mock_ftplib.content_of("notthere")
314        self.assertEqual(data, remote_file_content)
315        # Clean up.
316        os.unlink(local_source)
317
318    def compare_and_delete_downloaded_data(self, file_name):
319        """
320        Compare content of downloaded file with its source, then
321        delete the local target file.
322        """
323        with open(file_name, "rb") as fobj:
324            data = fobj.read()
325        # The name `newer` is used by all callers, so use it here, too.
326        remote_file_content = mock_ftplib.content_of("newer")
327        self.assertEqual(data, remote_file_content)
328        # Clean up
329        os.unlink(file_name)
330
331    def test_conditional_download_without_target(self):
332        """
333        Test conditional binary mode download when no target file
334        exists.
335        """
336        local_target = "_test_target_"
337        # Target does not exist, so download.
338        host = test_base.ftp_host_factory(
339                 session_factory=BinaryDownloadMockSession)
340        flag = host.download_if_newer("/home/newer", local_target)
341        self.assertEqual(flag, True)
342        self.compare_and_delete_downloaded_data(local_target)
343
344    def test_conditional_download_with_older_target(self):
345        """Test conditional binary mode download with newer source file."""
346        local_target = "_test_target_"
347        # Make target file.
348        open(local_target, "w").close()
349        # Source is newer (date in 2020), so download.
350        host = test_base.ftp_host_factory(
351                 session_factory=BinaryDownloadMockSession)
352        flag = host.download_if_newer("/home/newer", local_target)
353        self.assertEqual(flag, True)
354        self.compare_and_delete_downloaded_data(local_target)
355
356    def test_conditional_download_with_newer_target(self):
357        """Test conditional binary mode download with older source file."""
358        local_target = "_test_target_"
359        # Make target file.
360        open(local_target, "w").close()
361        # Source is older (date in 1970), so don't download.
362        host = test_base.ftp_host_factory(
363                 ftp_host_class=FailingUploadAndDownloadFTPHost,
364                 session_factory=BinaryDownloadMockSession)
365        flag = host.download_if_newer("/home/older", local_target)
366        self.assertEqual(flag, False)
367        # Remove target file
368        os.unlink(local_target)
369
370
371class TestTimeShift(unittest.TestCase):
372
373    def test_rounded_time_shift(self):
374        """Test if time shift is rounded correctly."""
375        host = test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
376        # Use private bound method.
377        rounded_time_shift = host._FTPHost__rounded_time_shift
378        # Pairs consisting of original value and expected result
379        test_data = [
380          (      0,           0),
381          (      0.1,         0),
382          (     -0.1,         0),
383          (   1500,           0),
384          (  -1500,           0),
385          (   1800,        3600),
386          (  -1800,       -3600),
387          (   2000,        3600),
388          (  -2000,       -3600),
389          ( 5*3600-100,  5*3600),
390          (-5*3600+100, -5*3600)]
391        for time_shift, expected_time_shift in test_data:
392            calculated_time_shift = rounded_time_shift(time_shift)
393            self.assertEqual(calculated_time_shift, expected_time_shift)
394
395    def test_assert_valid_time_shift(self):
396        """Test time shift sanity checks."""
397        host = test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
398        # Use private bound method.
399        assert_time_shift = host._FTPHost__assert_valid_time_shift
400        # Valid time shifts
401        test_data = [23*3600, -23*3600, 3600+30, -3600+30]
402        for time_shift in test_data:
403            self.assertTrue(assert_time_shift(time_shift) is None)
404        # Invalid time shift (exceeds one day)
405        self.assertRaises(ftputil.error.TimeShiftError, assert_time_shift,
406                          25*3600)
407        self.assertRaises(ftputil.error.TimeShiftError, assert_time_shift,
408                          -25*3600)
409        # Invalid time shift (too large deviation from full hours unacceptable)
410        self.assertRaises(ftputil.error.TimeShiftError, assert_time_shift,
411                          10*60)
412        self.assertRaises(ftputil.error.TimeShiftError, assert_time_shift,
413                          -3600-10*60)
414
415    def test_synchronize_times(self):
416        """Test time synchronization with server."""
417        host = test_base.ftp_host_factory(ftp_host_class=TimeShiftFTPHost,
418                                          session_factory=TimeShiftMockSession)
419        # Valid time shift
420        host.path.set_mtime(time.time() + 3630)
421        host.synchronize_times()
422        self.assertEqual(host.time_shift(), 3600)
423        # Invalid time shift
424        host.path.set_mtime(time.time() + 3600+10*60)
425        self.assertRaises(ftputil.error.TimeShiftError, host.synchronize_times)
426
427    def test_synchronize_times_for_server_in_east(self):
428        """Test for timestamp correction (see ticket #55)."""
429        host = test_base.ftp_host_factory(ftp_host_class=TimeShiftFTPHost,
430                                          session_factory=TimeShiftMockSession)
431        # Set this explicitly to emphasize the problem.
432        host.set_time_shift(0.0)
433        hour = 60 * 60
434        # This could be any negative time shift.
435        presumed_time_shift = -6 * hour
436        # Set `mtime` to simulate a server east of us.
437        # In case the `time_shift` value for this host instance is 0.0
438        # (as is to be expected before the time shift is determined),
439        # the directory parser (more specifically
440        # `ftputil.stat.Parser.parse_unix_time`) will return a time which
441        # is a year too far in the past. The `synchronize_times`
442        # method needs to deal with this and add the year "back".
443        # I don't think it's a bug in `parse_unix_time` because the
444        # method should work once the time shift is set correctly.
445        local_time = time.localtime()
446        local_time_with_wrong_year = (local_time.tm_year-1,) + local_time[1:]
447        presumed_server_time = \
448          time.mktime(local_time_with_wrong_year) + presumed_time_shift
449        host.path.set_mtime(presumed_server_time)
450        host.synchronize_times()
451        self.assertEqual(host.time_shift(), presumed_time_shift)
452
453
454class TestAcceptEitherUnicodeOrBytes(unittest.TestCase):
455    """
456    Test whether certain `FTPHost` methods accept either unicode
457    or byte strings for the path(s).
458    """
459
460    def setUp(self):
461        self.host = test_base.ftp_host_factory()
462
463    def test_upload(self):
464        """Test whether `upload` accepts either unicode or bytes."""
465        host = self.host
466        # The source file needs to be present in the current directory.
467        host.upload("Makefile", "target")
468        host.upload("Makefile", ftputil.tool.as_bytes("target"))
469
470    def test_download(self):
471        """Test whether `download` accepts either unicode or bytes."""
472        host = test_base.ftp_host_factory(
473                 session_factory=BinaryDownloadMockSession)
474        local_file_name = "_local_target_"
475        host.download("source", local_file_name)
476        host.download(ftputil.tool.as_bytes("source"), local_file_name)
477        os.remove(local_file_name)
478
479    def test_rename(self):
480        """Test whether `rename` accepts either unicode or bytes."""
481        # It's possible to mix argument types, as for `os.rename`.
482        path_as_unicode = "/home/file_name_test/ä"
483        path_as_bytes = ftputil.tool.as_bytes(path_as_unicode)
484        paths = [path_as_unicode, path_as_bytes]
485        for source_path, target_path in itertools.product(paths, paths):
486            self.host.rename(source_path, target_path)
487
488    def test_listdir(self):
489        """Test whether `listdir` accepts either unicode or bytes."""
490        host = self.host
491        as_bytes = ftputil.tool.as_bytes
492        host.chdir("/home/file_name_test")
493        # Unicode
494        items = host.listdir("ä")
495        self.assertEqual(items, ["ö", "o"])
496        #  Need explicit type check for Python 2
497        for item in items:
498            self.assertTrue(isinstance(item, ftputil.compat.unicode_type))
499        # Bytes
500        items = host.listdir(as_bytes("ä"))
501        self.assertEqual(items, [as_bytes("ö"), as_bytes("o")])
502        #  Need explicit type check for Python 2
503        for item in items:
504            self.assertTrue(isinstance(item, ftputil.compat.bytes_type))
505
506    def test_chmod(self):
507        """Test whether `chmod` accepts either unicode or bytes."""
508        host = self.host
509        # The `voidcmd` implementation in `MockSession` would raise an
510        # exception for the `CHMOD` command.
511        host._session.voidcmd = host._session._ignore_arguments
512        path = "/home/file_name_test/ä"
513        host.chmod(path, 0o755)
514        host.chmod(ftputil.tool.as_bytes(path), 0o755)
515
516    def _test_method_with_single_path_argument(self, method, path):
517        method(path)
518        method(ftputil.tool.as_bytes(path))
519
520    def test_chdir(self):
521        """Test whether `chdir` accepts either unicode or bytes."""
522        self._test_method_with_single_path_argument(
523          self.host.chdir, "/home/file_name_test/ö")
524
525    def test_mkdir(self):
526        """Test whether `mkdir` accepts either unicode or bytes."""
527        # This directory exists already in the mock session, but this
528        # shouldn't matter for the test.
529        self._test_method_with_single_path_argument(
530          self.host.mkdir, "/home/file_name_test/ä")
531
532    def test_makedirs(self):
533        """Test whether `makedirs` accepts either unicode or bytes."""
534        self._test_method_with_single_path_argument(
535          self.host.makedirs, "/home/file_name_test/ä")
536
537    def test_rmdir(self):
538        """Test whether `rmdir` accepts either unicode or bytes."""
539        empty_directory_as_required_by_rmdir = "/home/file_name_test/empty_ä"
540        self._test_method_with_single_path_argument(
541          self.host.rmdir, empty_directory_as_required_by_rmdir)
542
543    def test_remove(self):
544        """Test whether `remove` accepts either unicode or bytes."""
545        self._test_method_with_single_path_argument(
546          self.host.remove, "/home/file_name_test/ö")
547
548    def test_rmtree(self):
549        """Test whether `rmtree` accepts either unicode or bytes."""
550        empty_directory_as_required_by_rmtree = "/home/file_name_test/empty_ä"
551        self._test_method_with_single_path_argument(
552          self.host.rmtree, empty_directory_as_required_by_rmtree)
553
554    def test_lstat(self):
555        """Test whether `lstat` accepts either unicode or bytes."""
556        self._test_method_with_single_path_argument(
557          self.host.lstat, "/home/file_name_test/ä")
558
559    def test_stat(self):
560        """Test whether `stat` accepts either unicode or bytes."""
561        self._test_method_with_single_path_argument(
562          self.host.stat, "/home/file_name_test/ä")
563
564    def test_walk(self):
565        """Test whether `walk` accepts either unicode or bytes."""
566        # We're not interested in the return value of `walk`.
567        self._test_method_with_single_path_argument(
568          self.host.walk, "/home/file_name_test/ä")
569
570
571class TestFailingPickling(unittest.TestCase):
572
573    def test_failing_pickling(self):
574        """Test if pickling (intentionally) isn't supported."""
575        with test_base.ftp_host_factory() as host:
576            self.assertRaises(pickle.PicklingError, pickle.dumps, host)
577            with host.open("/home/sschwarzer/index.html") as file_obj:
578                self.assertRaises(pickle.PicklingError, pickle.dumps, file_obj)
579
580
581if __name__ == "__main__":
582    unittest.main()
583    import __main__
584    # unittest.main(__main__,
585    #   "TestUploadAndDownload.test_conditional_upload")
Note: See TracBrowser for help on using the repository browser.