source: test/test_host.py @ 1511:ce5c6ccebd47

Last change on this file since 1511:ce5c6ccebd47 was 1511:ce5c6ccebd47, checked in by Stefan Schwarzer <sschwarzer@…>, 7 years ago
Expect time shift in 15-minute units, not hour units (ticket #81).
File size: 21.7 KB
Line 
1# encoding: utf-8
2# Copyright (C) 2002-2014, Stefan Schwarzer <sschwarzer@sschwarzer.net>
3# See the file LICENSE for licensing terms.
4
5from __future__ import unicode_literals
6
7import ftplib
8import itertools
9import os
10import pickle
11import posixpath
12import random
13import time
14import unittest
15import warnings
16
17import ftputil
18import ftputil.compat
19import ftputil.error
20import ftputil.tool
21import ftputil.stat
22
23from test import mock_ftplib
24from test import test_base
25
26
27#
28# Helper functions to generate random data
29#
def random_data(pool, size=10000):
    """
    Build a byte string of `size` characters. The ordinal of each
    character is picked at random (with repetition) from `pool`, a
    sequence of integer numbers.
    """
    ordinals = []
    for _ in range(size):
        ordinals.append(random.choice(pool))
    return ftputil.compat.bytes_from_ints(ordinals)
37
38
def ascii_data():
    r"""
    Return a unicode string of random characters with the code points
    32 to 127, which may also contain `\r`.
    """
    # The "\r" is in the pool so that a later text write has something
    # to convert to "\n"; this conversion is what gets checked.
    pool = list(range(32, 128)) + [ord("\r")]
    raw_bytes = random_data(pool)
    return ftputil.tool.as_unicode(raw_bytes)
48
49
def binary_data():
    """Return a random byte string drawn from all 256 byte values."""
    all_byte_values = list(range(256))
    return random_data(all_byte_values)
54
55
56#
57# Several customized `MockSession` classes
58#
class FailOnLoginSession(mock_ftplib.MockSession):
    """
    Mock session that can't be created: the constructor raises
    `ftplib.error_perm` for any arguments.
    """

    def __init__(self, host="", user="", password=""):
        # The arguments are ignored and the base class constructor is
        # never called.
        raise ftplib.error_perm
63
64
class FailOnKeepAliveSession(mock_ftplib.MockSession):
    """
    Mock session for which only the first `pwd` call succeeds; every
    later call raises `ftplib.error_temp`.
    """

    def pwd(self):
        # The first call has to succeed to let the constructor work.
        # The `pwd_called` attribute remembers that it has happened.
        if hasattr(self, "pwd_called"):
            raise ftplib.error_temp
        self.pwd_called = True
73
74
class RecursiveListingForDotAsPathSession(mock_ftplib.MockSession):
    """
    Mock session with two canned directory listings: the one stored
    under the key "." also contains sections for the subdirectories
    (`dev:`, `etc:`, `pub:`, `usr:`), the one stored under the empty
    string is a flat listing.
    """

    # Listings by path. The whitespace inside the strings is
    # significant because the text is parsed as a directory listing.
    dir_contents = {
      ".": """\
lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin

dev:
total 10

etc:
total 10

pub:
total 4
-rw-r--r--   1 staff         74 Sep 25  2000 .message
----------   1 staff          0 Aug 16  2003 .notar
drwxr-xr-x  12 ftp          512 Nov 23  2008 freeware

usr:
total 4""",

      "": """\
total 10
lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin
d--x--x--x   2 staff        512 Sep 24  2000 dev
d--x--x--x   3 staff        512 Sep 25  2000 etc
dr-xr-xr-x   3 staff        512 Oct  3  2000 pub
d--x--x--x   5 staff        512 Oct  3  2000 usr"""}

    def _transform_path(self, path):
        # Return the path unchanged.
        # NOTE(review): presumably this keeps the base class from
        # rewriting "." and "" before the lookup in `dir_contents` -
        # confirm against `mock_ftplib.MockSession`.
        return path
106
107
class BinaryDownloadMockSession(mock_ftplib.MockUnixFormatSession):
    """Mock session whose file content is random binary data."""

    # Evaluated once, when the class is defined, so all instances
    # share the same random byte string.
    mock_file_content = binary_data()
111
112
class TimeShiftMockSession(mock_ftplib.MockSession):
    """Mock session whose `delete` method does nothing."""

    def delete(self, file_name):
        """Accept any `file_name` and return without doing anything."""
117
118#
119# Customized `FTPHost` classes for conditional upload/download tests
120# and time shift tests
121#
class FailingUploadAndDownloadFTPHost(ftputil.FTPHost):
    """
    `FTPHost` variant whose `upload` and `download` methods always
    fail with an `AssertionError`.

    It's used by the conditional upload/download tests for the cases
    in which no transfer must take place.
    """

    # The exceptions are raised explicitly instead of with `assert
    # False`. Assert statements are removed when Python runs with the
    # `-O` option, which would turn both methods into silent no-ops
    # and let the tests pass even if a transfer was attempted.

    def upload(self, source, target, mode=""):
        """Fail unconditionally; the arguments are ignored."""
        raise AssertionError("`FTPHost.upload` should not have been called")

    def download(self, source, target, mode=""):
        """Fail unconditionally; the arguments are ignored."""
        raise AssertionError(
          "`FTPHost.download` should not have been called")
129
130
class TimeShiftFTPHost(ftputil.FTPHost):
    """
    `FTPHost` variant whose `path` attribute is replaced with a small
    stub object, so that tests can set the modification time that
    `path.getmtime` reports.
    """

    class _Path:
        """
        Stub for `host.path`. Most methods delegate to `posixpath`;
        `getmtime`, `abspath` and `isfile` return canned values.
        """
        def split(self, path):
            return posixpath.split(path)
        def set_mtime(self, mtime):
            # Store the value that `getmtime` will return.
            self._mtime = mtime
        def getmtime(self, file_name):
            # `file_name` is ignored. `set_mtime` must have been
            # called before, otherwise `_mtime` doesn't exist yet.
            return self._mtime
        def join(self, *args):
            return posixpath.join(*args)
        def normpath(self, path):
            return posixpath.normpath(path)
        def isabs(self, path):
            return posixpath.isabs(path)
        def abspath(self, path):
            # Always the same fixed path, regardless of `path`.
            return "/home/sschwarzer/_ftputil_sync_"
        # Needed for `isdir` in `FTPHost.remove`
        # NOTE(review): the comment above mentions `isdir`, but the
        # method defined here is `isfile` - confirm which check
        # `FTPHost.remove` performs.
        def isfile(self, path):
            return True

    def __init__(self, *args, **kwargs):
        ftputil.FTPHost.__init__(self, *args, **kwargs)
        # Replace the path helper set up by the base class with the
        # stub.
        self.path = self._Path()
155
156#
157# Test cases
158#
class TestInitAndClose(unittest.TestCase):
    """Check that `FTPHost` objects can be created and closed again."""

    def test_open_and_close(self):
        ftp_host = test_base.ftp_host_factory()
        ftp_host.close()
        # After closing, the host must be flagged as closed and must
        # not hold any child sessions.
        self.assertEqual(ftp_host.closed, True)
        self.assertEqual(ftp_host._children, [])
167
168
class TestLogin(unittest.TestCase):
    """Check what happens if the session can't be created."""

    def test_invalid_login(self):
        """Login to invalid host must fail."""
        make_host = test_base.ftp_host_factory
        expected_error = ftputil.error.FTPOSError
        # `FailOnLoginSession` raises in its constructor.
        self.assertRaises(expected_error, make_host, FailOnLoginSession)
175
176
class TestKeepAlive(unittest.TestCase):
    """Check `FTPHost.keep_alive` for a working and a failing session."""

    def test_succeeding_keep_alive(self):
        """Assume the connection is still alive."""
        live_host = test_base.ftp_host_factory()
        # Passes if no exception is raised.
        live_host.keep_alive()

    def test_failing_keep_alive(self):
        """Assume the connection has timed out, so `keep_alive` fails."""
        timed_out_host = test_base.ftp_host_factory(
            session_factory=FailOnKeepAliveSession)
        expected_error = ftputil.error.TemporaryError
        self.assertRaises(expected_error, timed_out_host.keep_alive)
189
190
class TestSetParser(unittest.TestCase):
    """Check that a parser set with `FTPHost.set_parser` is used."""

    class TrivialParser(ftputil.stat.Parser):
        """
        An instance of this parser always returns the same result
        from its `parse_line` method. This is all we need to check
        if ftputil uses the set parser. No actual parsing code is
        required here.
        """

        def __init__(self):
            # We can't use an `os.stat` result directly because we
            # later need the object's `_st_name` attribute, which
            # we can't set on an `os.stat` stat value.
            #
            # The stat values themselves are only placeholders, so
            # any existing path will do. Use the current directory
            # instead of a hard-coded "/home", which doesn't exist on
            # every platform.
            default_stat_result = ftputil.stat.StatResult(
                                    os.stat(os.curdir))
            # The name has to match the last component of the path
            # which `test_set_parser` passes to `host.stat`.
            default_stat_result._st_name = "home"
            self.default_stat_result = default_stat_result

        def parse_line(self, line, time_shift=0.0):
            """
            Return the canned stat result, regardless of `line` and
            `time_shift`.
            """
            return self.default_stat_result

    def test_set_parser(self):
        """Test if the selected parser is used."""
        host = test_base.ftp_host_factory()
        # Before a parser is set explicitly, switching is allowed.
        self.assertEqual(host._stat._allow_parser_switching, True)
        trivial_parser = TestSetParser.TrivialParser()
        host.set_parser(trivial_parser)
        stat_result = host.stat("/home")
        # Only the trivial parser can have produced this result.
        self.assertEqual(stat_result, trivial_parser.default_stat_result)
        self.assertEqual(host._stat._allow_parser_switching, False)
221
222
class TestCommandNotImplementedError(unittest.TestCase):
    """Check the error for commands the server doesn't implement."""

    def test_command_not_implemented_error(self):
        """
        Test if we get the anticipated exception if a command isn't
        implemented by the server.
        """
        ftp_host = test_base.ftp_host_factory()
        # `CommandNotImplementedError` is a subclass of
        # `PermanentError`, so the same call has to satisfy both
        # expectations.
        expected_errors = (ftputil.error.CommandNotImplementedError,
                           ftputil.error.PermanentError)
        for error_class in expected_errors:
            self.assertRaises(error_class,
                              ftp_host.chmod, "nonexistent", 0o644)
236
237
class TestRecursiveListingForDotAsPath(unittest.TestCase):
    """
    Tests for a session that returns a recursive directory listing
    when the path to list is a dot. This is used to test for
    issue #33, see http://ftputil.sschwarzer.net/trac/ticket/33 .
    """

    def _make_host(self):
        """Return a host that uses the recursive-listing mock session."""
        return test_base.ftp_host_factory(
                 session_factory=RecursiveListingForDotAsPathSession)

    def _assert_flat_listing(self, lines):
        """Check that `lines` start like the flat directory listing."""
        self.assertEqual(lines[0], "total 10")
        self.assertTrue(lines[1].startswith("lrwxrwxrwx   1 staff"))
        self.assertTrue(lines[2].startswith("d--x--x--x   2 staff"))

    # The hosts are used as context managers, so they are closed even
    # if an assertion fails.

    def test_recursive_listing(self):
        with self._make_host() as host:
            lines = host._dir(host.curdir)
            self._assert_flat_listing(lines)

    def test_plain_listing(self):
        with self._make_host() as host:
            lines = host._dir("")
            self._assert_flat_listing(lines)

    def test_empty_string_instead_of_dot_workaround(self):
        with self._make_host() as host:
            files = host.listdir(host.curdir)
            self.assertEqual(files, ["bin", "dev", "etc", "pub", "usr"])
269
270
271class TestUploadAndDownload(unittest.TestCase):
272    """Test ASCII upload and binary download as examples."""
273
274    def generate_file(self, data, file_name):
275        """Generate a local data file."""
276        with open(file_name, "wb") as source_file:
277            source_file.write(data)
278
279    def test_download(self):
280        """Test mode download."""
281        local_target = "_test_target_"
282        host = test_base.ftp_host_factory(
283                 session_factory=BinaryDownloadMockSession)
284        # Download
285        host.download("dummy", local_target)
286        # Read file and compare
287        with open(local_target, "rb") as fobj:
288            data = fobj.read()
289        remote_file_content = mock_ftplib.content_of("dummy")
290        self.assertEqual(data, remote_file_content)
291        # Clean up
292        os.unlink(local_target)
293
294    def test_conditional_upload(self):
295        """Test conditional upload."""
296        local_source = "_test_source_"
297        data = binary_data()
298        self.generate_file(data, local_source)
299        # Target is newer, so don't upload.
300        host = test_base.ftp_host_factory(
301                 ftp_host_class=FailingUploadAndDownloadFTPHost)
302        flag = host.upload_if_newer(local_source, "/home/newer")
303        self.assertEqual(flag, False)
304        # Target is older, so upload.
305        host = test_base.ftp_host_factory()
306        flag = host.upload_if_newer(local_source, "/home/older")
307        self.assertEqual(flag, True)
308        remote_file_content = mock_ftplib.content_of("older")
309        self.assertEqual(data, remote_file_content)
310        # Target doesn't exist, so upload.
311        host = test_base.ftp_host_factory()
312        flag = host.upload_if_newer(local_source, "/home/notthere")
313        self.assertEqual(flag, True)
314        remote_file_content = mock_ftplib.content_of("notthere")
315        self.assertEqual(data, remote_file_content)
316        # Clean up.
317        os.unlink(local_source)
318
319    def compare_and_delete_downloaded_data(self, file_name):
320        """
321        Compare content of downloaded file with its source, then
322        delete the local target file.
323        """
324        with open(file_name, "rb") as fobj:
325            data = fobj.read()
326        # The name `newer` is used by all callers, so use it here, too.
327        remote_file_content = mock_ftplib.content_of("newer")
328        self.assertEqual(data, remote_file_content)
329        # Clean up
330        os.unlink(file_name)
331
332    def test_conditional_download_without_target(self):
333        """
334        Test conditional binary mode download when no target file
335        exists.
336        """
337        local_target = "_test_target_"
338        # Target does not exist, so download.
339        host = test_base.ftp_host_factory(
340                 session_factory=BinaryDownloadMockSession)
341        flag = host.download_if_newer("/home/newer", local_target)
342        self.assertEqual(flag, True)
343        self.compare_and_delete_downloaded_data(local_target)
344
345    def test_conditional_download_with_older_target(self):
346        """Test conditional binary mode download with newer source file."""
347        local_target = "_test_target_"
348        # Make target file.
349        open(local_target, "w").close()
350        # Source is newer (date in 2020), so download.
351        host = test_base.ftp_host_factory(
352                 session_factory=BinaryDownloadMockSession)
353        flag = host.download_if_newer("/home/newer", local_target)
354        self.assertEqual(flag, True)
355        self.compare_and_delete_downloaded_data(local_target)
356
357    def test_conditional_download_with_newer_target(self):
358        """Test conditional binary mode download with older source file."""
359        local_target = "_test_target_"
360        # Make target file.
361        open(local_target, "w").close()
362        # Source is older (date in 1970), so don't download.
363        host = test_base.ftp_host_factory(
364                 ftp_host_class=FailingUploadAndDownloadFTPHost,
365                 session_factory=BinaryDownloadMockSession)
366        flag = host.download_if_newer("/home/older", local_target)
367        self.assertEqual(flag, False)
368        # Remove target file
369        os.unlink(local_target)
370
371
class TestTimeShift(unittest.TestCase):
    """Check rounding, validation and determination of the time shift."""

    def test_rounded_time_shift(self):
        """Test if time shift is rounded correctly."""
        ftp_host = test_base.ftp_host_factory(
            session_factory=TimeShiftMockSession)
        # The method is private, so access it by its mangled name.
        round_shift = ftp_host._FTPHost__rounded_time_shift
        # Each pair holds a raw value and the rounded value expected
        # for it.
        cases = [
            (0, 0),
            (0.1, 0),
            (-0.1, 0),
            (1500, 1800),
            (-1500, -1800),
            (1800, 1800),
            (-1800, -1800),
            (2000, 1800),
            (-2000, -1800),
            (5*3600-100, 5*3600),
            (-5*3600+100, -5*3600),
        ]
        for raw_shift, wanted_shift in cases:
            self.assertEqual(round_shift(raw_shift), wanted_shift)

    def test_assert_valid_time_shift(self):
        """Test time shift sanity checks."""
        ftp_host = test_base.ftp_host_factory(
            session_factory=TimeShiftMockSession)
        # The method is private, so access it by its mangled name.
        check_shift = ftp_host._FTPHost__assert_valid_time_shift
        expected_error = ftputil.error.TimeShiftError
        # Valid time shifts pass the check, which then returns `None`.
        for valid_shift in (23*3600, -23*3600, 3600+30, -3600+30):
            self.assertTrue(check_shift(valid_shift) is None)
        invalid_shifts = [
            # Exceeds one day
            25*3600,
            -25*3600,
            # Too large deviation from 15-minute units is unacceptable.
            8*60,
            -3600-8*60,
        ]
        for invalid_shift in invalid_shifts:
            self.assertRaises(expected_error, check_shift, invalid_shift)

    def test_synchronize_times(self):
        """Test time synchronization with server."""
        ftp_host = test_base.ftp_host_factory(
            ftp_host_class=TimeShiftFTPHost,
            session_factory=TimeShiftMockSession)
        # Valid time shifts: measured value and the time shift the
        # host should have afterwards.
        valid_cases = [
            (60*60+30, 60*60),
            (60*60-100, 60*60),
            (30*60+100, 30*60),
            (45*60-100, 45*60),
        ]
        for measured_shift, wanted_shift in valid_cases:
            ftp_host.path.set_mtime(time.time() + measured_shift)
            ftp_host.synchronize_times()
            self.assertEqual(ftp_host.time_shift(), wanted_shift)
        # Invalid time shifts
        for measured_shift in [60*60+8*60, 45*60-6*60]:
            ftp_host.path.set_mtime(time.time() + measured_shift)
            self.assertRaises(ftputil.error.TimeShiftError,
                              ftp_host.synchronize_times)

    def test_synchronize_times_for_server_in_east(self):
        """Test for timestamp correction (see ticket #55)."""
        ftp_host = test_base.ftp_host_factory(
            ftp_host_class=TimeShiftFTPHost,
            session_factory=TimeShiftMockSession)
        # Set this explicitly to emphasize the problem.
        ftp_host.set_time_shift(0.0)
        seconds_per_hour = 60 * 60
        # This could be any negative time shift.
        presumed_time_shift = -6 * seconds_per_hour
        # Simulate a server east of us via the `mtime` value. As long
        # as the `time_shift` value of this host instance is 0.0 (as
        # is to be expected before the time shift is determined), the
        # directory parser (more specifically
        # `ftputil.stat.Parser.parse_unix_time`) will return a time
        # which is a year too far in the past. The `synchronize_times`
        # method needs to deal with this and add the year "back".
        # This isn't considered a bug in `parse_unix_time` because
        # the method should work once the time shift is set
        # correctly.
        now = time.localtime()
        now_with_wrong_year = (now.tm_year-1,) + now[1:]
        presumed_server_time = (time.mktime(now_with_wrong_year) +
                                presumed_time_shift)
        ftp_host.path.set_mtime(presumed_server_time)
        ftp_host.synchronize_times()
        self.assertEqual(ftp_host.time_shift(), presumed_time_shift)
464
465
class TestAcceptEitherUnicodeOrBytes(unittest.TestCase):
    """
    Test whether certain `FTPHost` methods accept either unicode
    or byte strings for the path(s).
    """

    def setUp(self):
        # Most tests use this default host; `test_download` creates
        # its own host with a different session class.
        self.host = test_base.ftp_host_factory()

    def test_upload(self):
        """Test whether `upload` accepts either unicode or bytes."""
        host = self.host
        # The source file needs to be present in the current directory.
        host.upload("Makefile", "target")
        host.upload("Makefile", ftputil.tool.as_bytes("target"))

    def test_download(self):
        """Test whether `download` accepts either unicode or bytes."""
        host = test_base.ftp_host_factory(
                 session_factory=BinaryDownloadMockSession)
        local_file_name = "_local_target_"
        try:
            host.download("source", local_file_name)
            host.download(ftputil.tool.as_bytes("source"), local_file_name)
        finally:
            # Don't leave the local file behind if a download failed.
            if os.path.exists(local_file_name):
                os.remove(local_file_name)

    def test_rename(self):
        """Test whether `rename` accepts either unicode or bytes."""
        # It's possible to mix argument types, as for `os.rename`.
        path_as_unicode = "/home/file_name_test/ä"
        path_as_bytes = ftputil.tool.as_bytes(path_as_unicode)
        paths = [path_as_unicode, path_as_bytes]
        for source_path, target_path in itertools.product(paths, paths):
            self.host.rename(source_path, target_path)

    def test_listdir(self):
        """Test whether `listdir` accepts either unicode or bytes."""
        host = self.host
        as_bytes = ftputil.tool.as_bytes
        host.chdir("/home/file_name_test")
        # Unicode
        items = host.listdir("ä")
        self.assertEqual(items, ["ö", "o"])
        #  Need explicit type check for Python 2
        for item in items:
            self.assertTrue(isinstance(item, ftputil.compat.unicode_type))
        # Bytes
        items = host.listdir(as_bytes("ä"))
        self.assertEqual(items, [as_bytes("ö"), as_bytes("o")])
        #  Need explicit type check for Python 2
        for item in items:
            self.assertTrue(isinstance(item, ftputil.compat.bytes_type))

    def test_chmod(self):
        """Test whether `chmod` accepts either unicode or bytes."""
        host = self.host
        # The `voidcmd` implementation in `MockSession` would raise an
        # exception for the `CHMOD` command.
        host._session.voidcmd = host._session._ignore_arguments
        path = "/home/file_name_test/ä"
        host.chmod(path, 0o755)
        host.chmod(ftputil.tool.as_bytes(path), 0o755)

    def _test_method_with_single_path_argument(self, method, path):
        """
        Call `method` twice: with the unicode string `path` and with
        `path` converted to a byte string. The test passes if neither
        call raises an exception; the return values are ignored.
        """
        method(path)
        method(ftputil.tool.as_bytes(path))

    def test_chdir(self):
        """Test whether `chdir` accepts either unicode or bytes."""
        self._test_method_with_single_path_argument(
          self.host.chdir, "/home/file_name_test/ö")

    def test_mkdir(self):
        """Test whether `mkdir` accepts either unicode or bytes."""
        # This directory exists already in the mock session, but this
        # shouldn't matter for the test.
        self._test_method_with_single_path_argument(
          self.host.mkdir, "/home/file_name_test/ä")

    def test_makedirs(self):
        """Test whether `makedirs` accepts either unicode or bytes."""
        self._test_method_with_single_path_argument(
          self.host.makedirs, "/home/file_name_test/ä")

    def test_rmdir(self):
        """Test whether `rmdir` accepts either unicode or bytes."""
        empty_directory_as_required_by_rmdir = "/home/file_name_test/empty_ä"
        self._test_method_with_single_path_argument(
          self.host.rmdir, empty_directory_as_required_by_rmdir)

    def test_remove(self):
        """Test whether `remove` accepts either unicode or bytes."""
        self._test_method_with_single_path_argument(
          self.host.remove, "/home/file_name_test/ö")

    def test_rmtree(self):
        """Test whether `rmtree` accepts either unicode or bytes."""
        empty_directory_as_required_by_rmtree = "/home/file_name_test/empty_ä"
        self._test_method_with_single_path_argument(
          self.host.rmtree, empty_directory_as_required_by_rmtree)

    def test_lstat(self):
        """Test whether `lstat` accepts either unicode or bytes."""
        self._test_method_with_single_path_argument(
          self.host.lstat, "/home/file_name_test/ä")

    def test_stat(self):
        """Test whether `stat` accepts either unicode or bytes."""
        self._test_method_with_single_path_argument(
          self.host.stat, "/home/file_name_test/ä")

    def test_walk(self):
        """Test whether `walk` accepts either unicode or bytes."""
        # We're not interested in the return value of `walk`.
        self._test_method_with_single_path_argument(
          self.host.walk, "/home/file_name_test/ä")
581
582
class TestFailingPickling(unittest.TestCase):
    """Check that hosts and remote files refuse to be pickled."""

    def test_failing_pickling(self):
        """Test if pickling (intentionally) isn't supported."""
        remote_path = "/home/sschwarzer/index.html"
        with test_base.ftp_host_factory() as ftp_host:
            # The host itself ...
            self.assertRaises(TypeError, pickle.dumps, ftp_host)
            # ... and a file opened from it
            with ftp_host.open(remote_path) as remote_file:
                self.assertRaises(TypeError, pickle.dumps, remote_file)
591
592
if __name__ == "__main__":
    # `unittest.main` exits the interpreter when the tests are done,
    # so code after the call would never run. To run only a single
    # test, name it on the command line, for example
    #   python test_host.py TestUploadAndDownload.test_conditional_upload
    unittest.main()
Note: See TracBrowser for help on using the repository browser.