source: test/test_host.py @ 1663:40280d912474

Last change on this file since 1663:40280d912474 was 1663:40280d912474, checked in by Stefan Schwarzer <sschwarzer@…>, 4 years ago
Remove `unittest` dependency - Remove `unittest` imports - Let test case classes inherit from `object` instead of `unittest.TestCase` - Use `setup_method` and `teardown_method` instead of `unittest`'s `setUp` and `tearDown` - Use `pytest.mark.skipif` instead of `unittest.skipIf` - Use `pytest.mark.slow_test` instead of decorator in `test/__init__.py`. Exclude slow tests with py.test -m "not slow_test" test - Replace some leftover `assert*` calls with `assert` statements - Replace `assert False, ...` with `pytest.fail` ticket: 98
File size: 21.2 KB
Line 
1# encoding: utf-8
2# Copyright (C) 2002-2016, Stefan Schwarzer <sschwarzer@sschwarzer.net>
3# and ftputil contributors (see `doc/contributors.txt`)
4# See the file LICENSE for licensing terms.
5
6from __future__ import unicode_literals
7
8import ftplib
9import itertools
10import os
11import pickle
12import posixpath
13import random
14import time
15import warnings
16
17import pytest
18
19import ftputil
20import ftputil.compat
21import ftputil.error
22import ftputil.tool
23import ftputil.stat
24
25from test import mock_ftplib
26from test import test_base
27
28
29#
30# Helper functions to generate random data
31#
32def random_data(pool, size=10000):
33    """
34    Return a byte string of characters consisting of those from the
35    pool of integer numbers.
36    """
37    ordinal_list = [random.choice(pool) for i in range(size)]
38    return ftputil.compat.bytes_from_ints(ordinal_list)
39
40
41def ascii_data():
42    r"""
43    Return a unicode string of "normal" ASCII characters, including `\r`.
44    """
45    pool = list(range(32, 128))
46    # The idea is to have the "\r" converted to "\n" during the later
47    # text write and check this conversion.
48    pool.append(ord("\r"))
49    return ftputil.tool.as_unicode(random_data(pool))
50
51
52def binary_data():
53    """Return a binary character byte string."""
54    pool = list(range(0, 256))
55    return random_data(pool)
56
57
58#
59# Several customized `MockSession` classes
60#
61class FailOnLoginSession(mock_ftplib.MockSession):
62
63    def __init__(self, host="", user="", password=""):
64        raise ftplib.error_perm
65
66
67class FailOnKeepAliveSession(mock_ftplib.MockSession):
68
69    def pwd(self):
70        # Raise exception on second call to let the constructor work.
71        if not hasattr(self, "pwd_called"):
72            self.pwd_called = True
73        else:
74            raise ftplib.error_temp
75
76
77class RecursiveListingForDotAsPathSession(mock_ftplib.MockSession):
78
79    dir_contents = {
80      ".": """\
81lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin
82
83dev:
84total 10
85
86etc:
87total 10
88
89pub:
90total 4
91-rw-r--r--   1 staff         74 Sep 25  2000 .message
92----------   1 staff          0 Aug 16  2003 .notar
93drwxr-xr-x  12 ftp          512 Nov 23  2008 freeware
94
95usr:
96total 4""",
97
98      "": """\
99total 10
100lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin
101d--x--x--x   2 staff        512 Sep 24  2000 dev
102d--x--x--x   3 staff        512 Sep 25  2000 etc
103dr-xr-xr-x   3 staff        512 Oct  3  2000 pub
104d--x--x--x   5 staff        512 Oct  3  2000 usr"""}
105
106    def _transform_path(self, path):
107        return path
108
109
110class BinaryDownloadMockSession(mock_ftplib.MockUnixFormatSession):
111
112    mock_file_content = binary_data()
113
114
115class TimeShiftMockSession(mock_ftplib.MockSession):
116
117    def delete(self, file_name):
118        pass
119
120#
121# Customized `FTPHost` class for conditional upload/download tests
122# and time shift tests
123#
124class FailingUploadAndDownloadFTPHost(ftputil.FTPHost):
125
126    def upload(self, source, target, mode=""):
127        pytest.fail("`FTPHost.upload` should not have been called")
128
129    def download(self, source, target, mode=""):
130        pytest.fail("`FTPHost.download` should not have been called")
131
132
133class TimeShiftFTPHost(ftputil.FTPHost):
134
135    class _Path:
136        def split(self, path):
137            return posixpath.split(path)
138        def set_mtime(self, mtime):
139            self._mtime = mtime
140        def getmtime(self, file_name):
141            return self._mtime
142        def join(self, *args):
143            return posixpath.join(*args)
144        def normpath(self, path):
145            return posixpath.normpath(path)
146        def isabs(self, path):
147            return posixpath.isabs(path)
148        def abspath(self, path):
149            return "/home/sschwarzer/_ftputil_sync_"
150        # Needed for `isdir` in `FTPHost.remove`
151        def isfile(self, path):
152            return True
153
154    def __init__(self, *args, **kwargs):
155        ftputil.FTPHost.__init__(self, *args, **kwargs)
156        self.path = self._Path()
157
158#
159# Test cases
160#
161class TestInitAndClose(object):
162    """Test initialization and closure of `FTPHost` objects."""
163
164    def test_open_and_close(self):
165        host = test_base.ftp_host_factory()
166        host.close()
167        assert host.closed is True
168        assert host._children == []
169
170
171class TestLogin(object):
172
173    def test_invalid_login(self):
174        """Login to invalid host must fail."""
175        with pytest.raises(ftputil.error.FTPOSError):
176            test_base.ftp_host_factory(FailOnLoginSession)
177
178
179class TestKeepAlive(object):
180
181    def test_succeeding_keep_alive(self):
182        """Assume the connection is still alive."""
183        host = test_base.ftp_host_factory()
184        host.keep_alive()
185
186    def test_failing_keep_alive(self):
187        """Assume the connection has timed out, so `keep_alive` fails."""
188        host = test_base.ftp_host_factory(
189                 session_factory=FailOnKeepAliveSession)
190        with pytest.raises(ftputil.error.TemporaryError):
191            host.keep_alive()
192
193
194class TestSetParser(object):
195
196    class TrivialParser(ftputil.stat.Parser):
197        """
198        An instance of this parser always returns the same result
199        from its `parse_line` method. This is all we need to check
200        if ftputil uses the set parser. No actual parsing code is
201        required here.
202        """
203
204        def __init__(self):
205            # We can't use `os.stat("/home")` directly because we
206            # later need the object's `_st_name` attribute, which
207            # we can't set on a `os.stat` stat value.
208            default_stat_result = ftputil.stat.StatResult(os.stat("/home"))
209            default_stat_result._st_name = "home"
210            self.default_stat_result = default_stat_result
211
212        def parse_line(self, line, time_shift=0.0):
213            return self.default_stat_result
214
215    def test_set_parser(self):
216        """Test if the selected parser is used."""
217        host = test_base.ftp_host_factory()
218        assert host._stat._allow_parser_switching is True
219        trivial_parser = TestSetParser.TrivialParser()
220        host.set_parser(trivial_parser)
221        stat_result = host.stat("/home")
222        assert stat_result == trivial_parser.default_stat_result
223        assert host._stat._allow_parser_switching is False
224
225
226class TestCommandNotImplementedError(object):
227
228    def test_command_not_implemented_error(self):
229        """
230        Test if we get the anticipated exception if a command isn't
231        implemented by the server.
232        """
233        host = test_base.ftp_host_factory()
234        with pytest.raises(ftputil.error.CommandNotImplementedError):
235            host.chmod("nonexistent", 0o644)
236        # `CommandNotImplementedError` is a subclass of `PermanentError`.
237        with pytest.raises(ftputil.error.PermanentError):
238            host.chmod("nonexistent", 0o644)
239
240
241class TestRecursiveListingForDotAsPath(object):
242    """
243    Return a recursive directory listing when the path to list
244    is a dot. This is used to test for issue #33, see
245    http://ftputil.sschwarzer.net/trac/ticket/33 .
246    """
247
248    def test_recursive_listing(self):
249        host = test_base.ftp_host_factory(
250                 session_factory=RecursiveListingForDotAsPathSession)
251        lines = host._dir(host.curdir)
252        assert lines[0] == "total 10"
253        assert lines[1].startswith("lrwxrwxrwx   1 staff")
254        assert lines[2].startswith("d--x--x--x   2 staff")
255        host.close()
256
257    def test_plain_listing(self):
258        host = test_base.ftp_host_factory(
259                 session_factory=RecursiveListingForDotAsPathSession)
260        lines = host._dir("")
261        assert lines[0] == "total 10"
262        assert lines[1].startswith("lrwxrwxrwx   1 staff")
263        assert lines[2].startswith("d--x--x--x   2 staff")
264        host.close()
265
266    def test_empty_string_instead_of_dot_workaround(self):
267        host = test_base.ftp_host_factory(
268                 session_factory=RecursiveListingForDotAsPathSession)
269        files = host.listdir(host.curdir)
270        assert files == ["bin", "dev", "etc", "pub", "usr"]
271        host.close()
272
273
274class TestUploadAndDownload(object):
275    """Test ASCII upload and binary download as examples."""
276
277    def generate_file(self, data, file_name):
278        """Generate a local data file."""
279        with open(file_name, "wb") as source_file:
280            source_file.write(data)
281
282    def test_download(self):
283        """Test mode download."""
284        local_target = "_test_target_"
285        host = test_base.ftp_host_factory(
286                 session_factory=BinaryDownloadMockSession)
287        # Download
288        host.download("dummy", local_target)
289        # Read file and compare
290        with open(local_target, "rb") as fobj:
291            data = fobj.read()
292        remote_file_content = mock_ftplib.content_of("dummy")
293        assert data == remote_file_content
294        # Clean up
295        os.unlink(local_target)
296
297    def test_conditional_upload(self):
298        """Test conditional upload."""
299        local_source = "_test_source_"
300        data = binary_data()
301        self.generate_file(data, local_source)
302        # Target is newer, so don't upload.
303        host = test_base.ftp_host_factory(
304                 ftp_host_class=FailingUploadAndDownloadFTPHost)
305        flag = host.upload_if_newer(local_source, "/home/newer")
306        assert flag is False
307        # Target is older, so upload.
308        host = test_base.ftp_host_factory()
309        flag = host.upload_if_newer(local_source, "/home/older")
310        assert flag is True
311        remote_file_content = mock_ftplib.content_of("older")
312        assert data == remote_file_content
313        # Target doesn't exist, so upload.
314        host = test_base.ftp_host_factory()
315        flag = host.upload_if_newer(local_source, "/home/notthere")
316        assert flag is True
317        remote_file_content = mock_ftplib.content_of("notthere")
318        assert data == remote_file_content
319        # Clean up.
320        os.unlink(local_source)
321
322    def compare_and_delete_downloaded_data(self, file_name):
323        """
324        Compare content of downloaded file with its source, then
325        delete the local target file.
326        """
327        with open(file_name, "rb") as fobj:
328            data = fobj.read()
329        # The name `newer` is used by all callers, so use it here, too.
330        remote_file_content = mock_ftplib.content_of("newer")
331        assert data == remote_file_content
332        # Clean up
333        os.unlink(file_name)
334
335    def test_conditional_download_without_target(self):
336        """
337        Test conditional binary mode download when no target file
338        exists.
339        """
340        local_target = "_test_target_"
341        # Target does not exist, so download.
342        host = test_base.ftp_host_factory(
343                 session_factory=BinaryDownloadMockSession)
344        flag = host.download_if_newer("/home/newer", local_target)
345        assert flag is True
346        self.compare_and_delete_downloaded_data(local_target)
347
348    def test_conditional_download_with_older_target(self):
349        """Test conditional binary mode download with newer source file."""
350        local_target = "_test_target_"
351        # Make target file.
352        open(local_target, "w").close()
353        # Source is newer (date in 2020), so download.
354        host = test_base.ftp_host_factory(
355                 session_factory=BinaryDownloadMockSession)
356        flag = host.download_if_newer("/home/newer", local_target)
357        assert flag is True
358        self.compare_and_delete_downloaded_data(local_target)
359
360    def test_conditional_download_with_newer_target(self):
361        """Test conditional binary mode download with older source file."""
362        local_target = "_test_target_"
363        # Make target file.
364        open(local_target, "w").close()
365        # Source is older (date in 1970), so don't download.
366        host = test_base.ftp_host_factory(
367                 ftp_host_class=FailingUploadAndDownloadFTPHost,
368                 session_factory=BinaryDownloadMockSession)
369        flag = host.download_if_newer("/home/older", local_target)
370        assert flag is False
371        # Remove target file
372        os.unlink(local_target)
373
374
375class TestTimeShift(object):
376
377    def test_rounded_time_shift(self):
378        """Test if time shift is rounded correctly."""
379        host = test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
380        # Use private bound method.
381        rounded_time_shift = host._FTPHost__rounded_time_shift
382        # Pairs consisting of original value and expected result
383        test_data = [
384          (      0,           0),
385          (      0.1,         0),
386          (     -0.1,         0),
387          (   1500,        1800),
388          (  -1500,       -1800),
389          (   1800,        1800),
390          (  -1800,       -1800),
391          (   2000,        1800),
392          (  -2000,       -1800),
393          ( 5*3600-100,  5*3600),
394          (-5*3600+100, -5*3600)]
395        for time_shift, expected_time_shift in test_data:
396            calculated_time_shift = rounded_time_shift(time_shift)
397            assert calculated_time_shift == expected_time_shift
398
399    def test_assert_valid_time_shift(self):
400        """Test time shift sanity checks."""
401        host = test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
402        # Use private bound method.
403        assert_time_shift = host._FTPHost__assert_valid_time_shift
404        # Valid time shifts
405        test_data = [23*3600, -23*3600, 3600+30, -3600+30]
406        for time_shift in test_data:
407            assert assert_time_shift(time_shift) is None
408        # Invalid time shift (exceeds one day)
409        with pytest.raises(ftputil.error.TimeShiftError):
410            assert_time_shift(25*3600)
411        with pytest.raises(ftputil.error.TimeShiftError):
412            assert_time_shift(-25*3600)
413        # Invalid time shift (too large deviation from 15-minute units
414        # is unacceptable)
415        with pytest.raises(ftputil.error.TimeShiftError):
416            assert_time_shift(8*60)
417        with pytest.raises(ftputil.error.TimeShiftError):
418            assert_time_shift(-3600-8*60)
419
420    def test_synchronize_times(self):
421        """Test time synchronization with server."""
422        host = test_base.ftp_host_factory(ftp_host_class=TimeShiftFTPHost,
423                                          session_factory=TimeShiftMockSession)
424        # Valid time shifts
425        test_data = [
426          (60*60+30,  60*60),
427          (60*60-100, 60*60),
428          (30*60+100, 30*60),
429          (45*60-100, 45*60),
430        ]
431        for measured_time_shift, expected_time_shift in test_data:
432            host.path.set_mtime(time.time() + measured_time_shift)
433            host.synchronize_times()
434            assert host.time_shift() == expected_time_shift
435        # Invalid time shifts
436        measured_time_shifts = [60*60+8*60, 45*60-6*60]
437        for measured_time_shift in measured_time_shifts:
438            host.path.set_mtime(time.time() + measured_time_shift)
439            with pytest.raises(ftputil.error.TimeShiftError):
440                host.synchronize_times()
441
442    def test_synchronize_times_for_server_in_east(self):
443        """Test for timestamp correction (see ticket #55)."""
444        host = test_base.ftp_host_factory(ftp_host_class=TimeShiftFTPHost,
445                                          session_factory=TimeShiftMockSession)
446        # Set this explicitly to emphasize the problem.
447        host.set_time_shift(0.0)
448        hour = 60 * 60
449        # This could be any negative time shift.
450        presumed_time_shift = -6 * hour
451        # Set `mtime` to simulate a server east of us.
452        # In case the `time_shift` value for this host instance is 0.0
453        # (as is to be expected before the time shift is determined),
454        # the directory parser (more specifically
455        # `ftputil.stat.Parser.parse_unix_time`) will return a time which
456        # is a year too far in the past. The `synchronize_times`
457        # method needs to deal with this and add the year "back".
458        # I don't think it's a bug in `parse_unix_time` because the
459        # method should work once the time shift is set correctly.
460        local_time = time.localtime()
461        local_time_with_wrong_year = (local_time.tm_year-1,) + local_time[1:]
462        presumed_server_time = \
463          time.mktime(local_time_with_wrong_year) + presumed_time_shift
464        host.path.set_mtime(presumed_server_time)
465        host.synchronize_times()
466        assert host.time_shift() == presumed_time_shift
467
468
469class TestAcceptEitherUnicodeOrBytes(object):
470    """
471    Test whether certain `FTPHost` methods accept either unicode
472    or byte strings for the path(s).
473    """
474
475    def setup_method(self, method):
476        self.host = test_base.ftp_host_factory()
477
478    def test_upload(self):
479        """Test whether `upload` accepts either unicode or bytes."""
480        host = self.host
481        # The source file needs to be present in the current directory.
482        host.upload("Makefile", "target")
483        host.upload("Makefile", ftputil.tool.as_bytes("target"))
484
485    def test_download(self):
486        """Test whether `download` accepts either unicode or bytes."""
487        host = test_base.ftp_host_factory(
488                 session_factory=BinaryDownloadMockSession)
489        local_file_name = "_local_target_"
490        host.download("source", local_file_name)
491        host.download(ftputil.tool.as_bytes("source"), local_file_name)
492        os.remove(local_file_name)
493
494    def test_rename(self):
495        """Test whether `rename` accepts either unicode or bytes."""
496        # It's possible to mix argument types, as for `os.rename`.
497        path_as_unicode = "/home/file_name_test/ä"
498        path_as_bytes = ftputil.tool.as_bytes(path_as_unicode)
499        paths = [path_as_unicode, path_as_bytes]
500        for source_path, target_path in itertools.product(paths, paths):
501            self.host.rename(source_path, target_path)
502
503    def test_listdir(self):
504        """Test whether `listdir` accepts either unicode or bytes."""
505        host = self.host
506        as_bytes = ftputil.tool.as_bytes
507        host.chdir("/home/file_name_test")
508        # Unicode
509        items = host.listdir("ä")
510        assert items == ["ö", "o"]
511        #  Need explicit type check for Python 2
512        for item in items:
513            assert isinstance(item, ftputil.compat.unicode_type)
514        # Bytes
515        items = host.listdir(as_bytes("ä"))
516        assert items == [as_bytes("ö"), as_bytes("o")]
517        #  Need explicit type check for Python 2
518        for item in items:
519            assert isinstance(item, ftputil.compat.bytes_type)
520
521    def test_chmod(self):
522        """Test whether `chmod` accepts either unicode or bytes."""
523        host = self.host
524        # The `voidcmd` implementation in `MockSession` would raise an
525        # exception for the `CHMOD` command.
526        host._session.voidcmd = host._session._ignore_arguments
527        path = "/home/file_name_test/ä"
528        host.chmod(path, 0o755)
529        host.chmod(ftputil.tool.as_bytes(path), 0o755)
530
531    def _test_method_with_single_path_argument(self, method, path):
532        method(path)
533        method(ftputil.tool.as_bytes(path))
534
535    def test_chdir(self):
536        """Test whether `chdir` accepts either unicode or bytes."""
537        self._test_method_with_single_path_argument(
538          self.host.chdir, "/home/file_name_test/ö")
539
540    def test_mkdir(self):
541        """Test whether `mkdir` accepts either unicode or bytes."""
542        # This directory exists already in the mock session, but this
543        # shouldn't matter for the test.
544        self._test_method_with_single_path_argument(
545          self.host.mkdir, "/home/file_name_test/ä")
546
547    def test_makedirs(self):
548        """Test whether `makedirs` accepts either unicode or bytes."""
549        self._test_method_with_single_path_argument(
550          self.host.makedirs, "/home/file_name_test/ä")
551
552    def test_rmdir(self):
553        """Test whether `rmdir` accepts either unicode or bytes."""
554        empty_directory_as_required_by_rmdir = "/home/file_name_test/empty_ä"
555        self._test_method_with_single_path_argument(
556          self.host.rmdir, empty_directory_as_required_by_rmdir)
557
558    def test_remove(self):
559        """Test whether `remove` accepts either unicode or bytes."""
560        self._test_method_with_single_path_argument(
561          self.host.remove, "/home/file_name_test/ö")
562
563    def test_rmtree(self):
564        """Test whether `rmtree` accepts either unicode or bytes."""
565        empty_directory_as_required_by_rmtree = "/home/file_name_test/empty_ä"
566        self._test_method_with_single_path_argument(
567          self.host.rmtree, empty_directory_as_required_by_rmtree)
568
569    def test_lstat(self):
570        """Test whether `lstat` accepts either unicode or bytes."""
571        self._test_method_with_single_path_argument(
572          self.host.lstat, "/home/file_name_test/ä")
573
574    def test_stat(self):
575        """Test whether `stat` accepts either unicode or bytes."""
576        self._test_method_with_single_path_argument(
577          self.host.stat, "/home/file_name_test/ä")
578
579    def test_walk(self):
580        """Test whether `walk` accepts either unicode or bytes."""
581        # We're not interested in the return value of `walk`.
582        self._test_method_with_single_path_argument(
583          self.host.walk, "/home/file_name_test/ä")
584
585
586class TestFailingPickling(object):
587
588    def test_failing_pickling(self):
589        """Test if pickling (intentionally) isn't supported."""
590        with test_base.ftp_host_factory() as host:
591            with pytest.raises(TypeError):
592                pickle.dumps(host)
593            with host.open("/home/sschwarzer/index.html") as file_obj:
594                with pytest.raises(TypeError):
595                    pickle.dumps(file_obj)
Note: See TracBrowser for help on using the repository browser.