source: test/test_host.py @ 1719:560867ae70f0

Last change on this file since 1719:560867ae70f0 was 1719:560867ae70f0, checked in by Stefan Schwarzer <sschwarzer@…>, 9 months ago
Remove `encoding: utf-8` lines These are no longer needed because we code for only Python 3 now and Python 3 uses UTF-8 encoding for source files by default.
File size: 21.6 KB
Line 
1# Copyright (C) 2002-2018, Stefan Schwarzer <sschwarzer@sschwarzer.net>
2# and ftputil contributors (see `doc/contributors.txt`)
3# See the file LICENSE for licensing terms.
4
5import ftplib
6import itertools
7import os
8import pickle
9import posixpath
10import random
11import time
12import warnings
13
14import pytest
15
16import ftputil
17import ftputil.compat
18import ftputil.error
19import ftputil.tool
20import ftputil.stat
21
22from test import mock_ftplib
23from test import test_base
24
25
26#
27# Helper functions to generate random data
28#
29def random_data(pool, size=10000):
30    """
31    Return a byte string of characters consisting of those from the
32    pool of integer numbers.
33    """
34    ordinal_list = [random.choice(pool) for i in range(size)]
35    return ftputil.compat.bytes_from_ints(ordinal_list)
36
37
38def ascii_data():
39    r"""
40    Return a unicode string of "normal" ASCII characters, including `\r`.
41    """
42    pool = list(range(32, 128))
43    # The idea is to have the "\r" converted to "\n" during the later
44    # text write and check this conversion.
45    pool.append(ord("\r"))
46    return ftputil.tool.as_unicode(random_data(pool))
47
48
49def binary_data():
50    """Return a binary character byte string."""
51    pool = list(range(0, 256))
52    return random_data(pool)
53
54
55#
56# Several customized `MockSession` classes
57#
58class FailOnLoginSession(mock_ftplib.MockSession):
59
60    def __init__(self, host="", user="", password=""):
61        raise ftplib.error_perm
62
63
64class FailOnKeepAliveSession(mock_ftplib.MockSession):
65
66    def pwd(self):
67        # Raise exception on second call to let the constructor work.
68        if not hasattr(self, "pwd_called"):
69            self.pwd_called = True
70            return "/home"
71        else:
72            raise ftplib.error_temp
73
74
75class UnnormalizedCurrentWorkingDirectory(mock_ftplib.MockSession):
76
77    def pwd(self):
78        # Deliberately return the current working directory with a
79        # trailing slash to test if it's removed when stored in the
80        # `FTPHost` instance.
81        return "/home/"
82
83
84class RecursiveListingForDotAsPathSession(mock_ftplib.MockSession):
85
86    dir_contents = {
87      ".": """\
88lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin
89
90dev:
91total 10
92
93etc:
94total 10
95
96pub:
97total 4
98-rw-r--r--   1 staff         74 Sep 25  2000 .message
99----------   1 staff          0 Aug 16  2003 .notar
100drwxr-xr-x  12 ftp          512 Nov 23  2008 freeware
101
102usr:
103total 4""",
104
105      "": """\
106total 10
107lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin
108d--x--x--x   2 staff        512 Sep 24  2000 dev
109d--x--x--x   3 staff        512 Sep 25  2000 etc
110dr-xr-xr-x   3 staff        512 Oct  3  2000 pub
111d--x--x--x   5 staff        512 Oct  3  2000 usr"""}
112
113    def _transform_path(self, path):
114        return path
115
116
117class BinaryDownloadMockSession(mock_ftplib.MockUnixFormatSession):
118
119    mock_file_content = binary_data()
120
121
122class TimeShiftMockSession(mock_ftplib.MockSession):
123
124    def delete(self, file_name):
125        pass
126
127#
128# Customized `FTPHost` class for conditional upload/download tests
129# and time shift tests
130#
131class FailingUploadAndDownloadFTPHost(ftputil.FTPHost):
132
133    def upload(self, source, target, mode=""):
134        pytest.fail("`FTPHost.upload` should not have been called")
135
136    def download(self, source, target, mode=""):
137        pytest.fail("`FTPHost.download` should not have been called")
138
139
140class TimeShiftFTPHost(ftputil.FTPHost):
141
142    class _Path:
143        def split(self, path):
144            return posixpath.split(path)
145        def set_mtime(self, mtime):
146            self._mtime = mtime
147        def getmtime(self, file_name):
148            return self._mtime
149        def join(self, *args):
150            return posixpath.join(*args)
151        def normpath(self, path):
152            return posixpath.normpath(path)
153        def isabs(self, path):
154            return posixpath.isabs(path)
155        def abspath(self, path):
156            return "/home/sschwarzer/_ftputil_sync_"
157        # Needed for `isdir` in `FTPHost.remove`
158        def isfile(self, path):
159            return True
160
161    def __init__(self, *args, **kwargs):
162        ftputil.FTPHost.__init__(self, *args, **kwargs)
163        self.path = self._Path()
164
165#
166# Test cases
167#
168class TestConstructor:
169    """
170    Test initialization of `FTPHost` objects.
171    """
172
173    def test_open_and_close(self):
174        """
175        Test if opening and closing an `FTPHost` object works as
176        expected.
177        """
178        host = test_base.ftp_host_factory()
179        host.close()
180        assert host.closed is True
181        assert host._children == []
182
183    def test_invalid_login(self):
184        """Login to invalid host must fail."""
185        with pytest.raises(ftputil.error.FTPOSError):
186            test_base.ftp_host_factory(FailOnLoginSession)
187
188    def test_pwd_normalization(self):
189        """
190        Test if the stored current directory is normalized.
191        """
192        host = test_base.ftp_host_factory(UnnormalizedCurrentWorkingDirectory)
193        assert host.getcwd() == "/home"
194
195
196class TestKeepAlive:
197
198    def test_succeeding_keep_alive(self):
199        """Assume the connection is still alive."""
200        host = test_base.ftp_host_factory()
201        host.keep_alive()
202
203    def test_failing_keep_alive(self):
204        """Assume the connection has timed out, so `keep_alive` fails."""
205        host = test_base.ftp_host_factory(
206                 session_factory=FailOnKeepAliveSession)
207        with pytest.raises(ftputil.error.TemporaryError):
208            host.keep_alive()
209
210
211class TestSetParser:
212
213    class TrivialParser(ftputil.stat.Parser):
214        """
215        An instance of this parser always returns the same result
216        from its `parse_line` method. This is all we need to check
217        if ftputil uses the set parser. No actual parsing code is
218        required here.
219        """
220
221        def __init__(self):
222            # We can't use `os.stat("/home")` directly because we
223            # later need the object's `_st_name` attribute, which
224            # we can't set on a `os.stat` stat value.
225            default_stat_result = ftputil.stat.StatResult(os.stat("/home"))
226            default_stat_result._st_name = "home"
227            self.default_stat_result = default_stat_result
228
229        def parse_line(self, line, time_shift=0.0):
230            return self.default_stat_result
231
232    def test_set_parser(self):
233        """Test if the selected parser is used."""
234        host = test_base.ftp_host_factory()
235        assert host._stat._allow_parser_switching is True
236        trivial_parser = TestSetParser.TrivialParser()
237        host.set_parser(trivial_parser)
238        stat_result = host.stat("/home")
239        assert stat_result == trivial_parser.default_stat_result
240        assert host._stat._allow_parser_switching is False
241
242
243class TestCommandNotImplementedError:
244
245    def test_command_not_implemented_error(self):
246        """
247        Test if we get the anticipated exception if a command isn't
248        implemented by the server.
249        """
250        host = test_base.ftp_host_factory()
251        with pytest.raises(ftputil.error.CommandNotImplementedError):
252            host.chmod("nonexistent", 0o644)
253        # `CommandNotImplementedError` is a subclass of `PermanentError`.
254        with pytest.raises(ftputil.error.PermanentError):
255            host.chmod("nonexistent", 0o644)
256
257
258class TestRecursiveListingForDotAsPath:
259    """
260    Return a recursive directory listing when the path to list
261    is a dot. This is used to test for issue #33, see
262    http://ftputil.sschwarzer.net/trac/ticket/33 .
263    """
264
265    def test_recursive_listing(self):
266        host = test_base.ftp_host_factory(
267                 session_factory=RecursiveListingForDotAsPathSession)
268        lines = host._dir(host.curdir)
269        assert lines[0] == "total 10"
270        assert lines[1].startswith("lrwxrwxrwx   1 staff")
271        assert lines[2].startswith("d--x--x--x   2 staff")
272        host.close()
273
274    def test_plain_listing(self):
275        host = test_base.ftp_host_factory(
276                 session_factory=RecursiveListingForDotAsPathSession)
277        lines = host._dir("")
278        assert lines[0] == "total 10"
279        assert lines[1].startswith("lrwxrwxrwx   1 staff")
280        assert lines[2].startswith("d--x--x--x   2 staff")
281        host.close()
282
283    def test_empty_string_instead_of_dot_workaround(self):
284        host = test_base.ftp_host_factory(
285                 session_factory=RecursiveListingForDotAsPathSession)
286        files = host.listdir(host.curdir)
287        assert files == ["bin", "dev", "etc", "pub", "usr"]
288        host.close()
289
290
291class TestUploadAndDownload:
292    """Test ASCII upload and binary download as examples."""
293
294    def generate_file(self, data, file_name):
295        """Generate a local data file."""
296        with open(file_name, "wb") as source_file:
297            source_file.write(data)
298
299    def test_download(self):
300        """Test mode download."""
301        local_target = "_test_target_"
302        host = test_base.ftp_host_factory(
303                 session_factory=BinaryDownloadMockSession)
304        # Download
305        host.download("dummy", local_target)
306        # Read file and compare
307        with open(local_target, "rb") as fobj:
308            data = fobj.read()
309        remote_file_content = mock_ftplib.content_of("dummy")
310        assert data == remote_file_content
311        # Clean up
312        os.unlink(local_target)
313
314    def test_conditional_upload(self):
315        """Test conditional upload."""
316        local_source = "_test_source_"
317        data = binary_data()
318        self.generate_file(data, local_source)
319        # Target is newer, so don't upload.
320        host = test_base.ftp_host_factory(
321                 ftp_host_class=FailingUploadAndDownloadFTPHost)
322        flag = host.upload_if_newer(local_source, "/home/newer")
323        assert flag is False
324        # Target is older, so upload.
325        host = test_base.ftp_host_factory()
326        flag = host.upload_if_newer(local_source, "/home/older")
327        assert flag is True
328        remote_file_content = mock_ftplib.content_of("older")
329        assert data == remote_file_content
330        # Target doesn't exist, so upload.
331        host = test_base.ftp_host_factory()
332        flag = host.upload_if_newer(local_source, "/home/notthere")
333        assert flag is True
334        remote_file_content = mock_ftplib.content_of("notthere")
335        assert data == remote_file_content
336        # Clean up.
337        os.unlink(local_source)
338
339    def compare_and_delete_downloaded_data(self, file_name):
340        """
341        Compare content of downloaded file with its source, then
342        delete the local target file.
343        """
344        with open(file_name, "rb") as fobj:
345            data = fobj.read()
346        # The name `newer` is used by all callers, so use it here, too.
347        remote_file_content = mock_ftplib.content_of("newer")
348        assert data == remote_file_content
349        # Clean up
350        os.unlink(file_name)
351
352    def test_conditional_download_without_target(self):
353        """
354        Test conditional binary mode download when no target file
355        exists.
356        """
357        local_target = "_test_target_"
358        # Target does not exist, so download.
359        host = test_base.ftp_host_factory(
360                 session_factory=BinaryDownloadMockSession)
361        flag = host.download_if_newer("/home/newer", local_target)
362        assert flag is True
363        self.compare_and_delete_downloaded_data(local_target)
364
365    def test_conditional_download_with_older_target(self):
366        """Test conditional binary mode download with newer source file."""
367        local_target = "_test_target_"
368        # Make target file.
369        open(local_target, "w").close()
370        # Source is newer (date in 2020), so download.
371        host = test_base.ftp_host_factory(
372                 session_factory=BinaryDownloadMockSession)
373        flag = host.download_if_newer("/home/newer", local_target)
374        assert flag is True
375        self.compare_and_delete_downloaded_data(local_target)
376
377    def test_conditional_download_with_newer_target(self):
378        """Test conditional binary mode download with older source file."""
379        local_target = "_test_target_"
380        # Make target file.
381        open(local_target, "w").close()
382        # Source is older (date in 1970), so don't download.
383        host = test_base.ftp_host_factory(
384                 ftp_host_class=FailingUploadAndDownloadFTPHost,
385                 session_factory=BinaryDownloadMockSession)
386        flag = host.download_if_newer("/home/older", local_target)
387        assert flag is False
388        # Remove target file
389        os.unlink(local_target)
390
391
392class TestTimeShift:
393
394    def test_rounded_time_shift(self):
395        """Test if time shift is rounded correctly."""
396        host = test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
397        # Use private bound method.
398        rounded_time_shift = host._FTPHost__rounded_time_shift
399        # Pairs consisting of original value and expected result
400        test_data = [
401          (      0,           0),
402          (      0.1,         0),
403          (     -0.1,         0),
404          (   1500,        1800),
405          (  -1500,       -1800),
406          (   1800,        1800),
407          (  -1800,       -1800),
408          (   2000,        1800),
409          (  -2000,       -1800),
410          ( 5*3600-100,  5*3600),
411          (-5*3600+100, -5*3600)]
412        for time_shift, expected_time_shift in test_data:
413            calculated_time_shift = rounded_time_shift(time_shift)
414            assert calculated_time_shift == expected_time_shift
415
416    def test_assert_valid_time_shift(self):
417        """Test time shift sanity checks."""
418        host = test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
419        # Use private bound method.
420        assert_time_shift = host._FTPHost__assert_valid_time_shift
421        # Valid time shifts
422        test_data = [23*3600, -23*3600, 3600+30, -3600+30]
423        for time_shift in test_data:
424            assert assert_time_shift(time_shift) is None
425        # Invalid time shift (exceeds one day)
426        with pytest.raises(ftputil.error.TimeShiftError):
427            assert_time_shift(25*3600)
428        with pytest.raises(ftputil.error.TimeShiftError):
429            assert_time_shift(-25*3600)
430        # Invalid time shift (too large deviation from 15-minute units
431        # is unacceptable)
432        with pytest.raises(ftputil.error.TimeShiftError):
433            assert_time_shift(8*60)
434        with pytest.raises(ftputil.error.TimeShiftError):
435            assert_time_shift(-3600-8*60)
436
437    def test_synchronize_times(self):
438        """Test time synchronization with server."""
439        host = test_base.ftp_host_factory(ftp_host_class=TimeShiftFTPHost,
440                                          session_factory=TimeShiftMockSession)
441        # Valid time shifts
442        test_data = [
443          (60*60+30,  60*60),
444          (60*60-100, 60*60),
445          (30*60+100, 30*60),
446          (45*60-100, 45*60),
447        ]
448        for measured_time_shift, expected_time_shift in test_data:
449            host.path.set_mtime(time.time() + measured_time_shift)
450            host.synchronize_times()
451            assert host.time_shift() == expected_time_shift
452        # Invalid time shifts
453        measured_time_shifts = [60*60+8*60, 45*60-6*60]
454        for measured_time_shift in measured_time_shifts:
455            host.path.set_mtime(time.time() + measured_time_shift)
456            with pytest.raises(ftputil.error.TimeShiftError):
457                host.synchronize_times()
458
459    def test_synchronize_times_for_server_in_east(self):
460        """Test for timestamp correction (see ticket #55)."""
461        host = test_base.ftp_host_factory(ftp_host_class=TimeShiftFTPHost,
462                                          session_factory=TimeShiftMockSession)
463        # Set this explicitly to emphasize the problem.
464        host.set_time_shift(0.0)
465        hour = 60 * 60
466        # This could be any negative time shift.
467        presumed_time_shift = -6 * hour
468        # Set `mtime` to simulate a server east of us.
469        # In case the `time_shift` value for this host instance is 0.0
470        # (as is to be expected before the time shift is determined),
471        # the directory parser (more specifically
472        # `ftputil.stat.Parser.parse_unix_time`) will return a time which
473        # is a year too far in the past. The `synchronize_times`
474        # method needs to deal with this and add the year "back".
475        # I don't think it's a bug in `parse_unix_time` because the
476        # method should work once the time shift is set correctly.
477        local_time = time.localtime()
478        local_time_with_wrong_year = (local_time.tm_year-1,) + local_time[1:]
479        presumed_server_time = \
480          time.mktime(local_time_with_wrong_year) + presumed_time_shift
481        host.path.set_mtime(presumed_server_time)
482        host.synchronize_times()
483        assert host.time_shift() == presumed_time_shift
484
485
486class TestAcceptEitherUnicodeOrBytes:
487    """
488    Test whether certain `FTPHost` methods accept either unicode
489    or byte strings for the path(s).
490    """
491
492    def setup_method(self, method):
493        self.host = test_base.ftp_host_factory()
494
495    def test_upload(self):
496        """Test whether `upload` accepts either unicode or bytes."""
497        host = self.host
498        # The source file needs to be present in the current directory.
499        host.upload("Makefile", "target")
500        host.upload("Makefile", ftputil.tool.as_bytes("target"))
501
502    def test_download(self):
503        """Test whether `download` accepts either unicode or bytes."""
504        host = test_base.ftp_host_factory(
505                 session_factory=BinaryDownloadMockSession)
506        local_file_name = "_local_target_"
507        host.download("source", local_file_name)
508        host.download(ftputil.tool.as_bytes("source"), local_file_name)
509        os.remove(local_file_name)
510
511    def test_rename(self):
512        """Test whether `rename` accepts either unicode or bytes."""
513        # It's possible to mix argument types, as for `os.rename`.
514        path_as_unicode = "/home/file_name_test/ä"
515        path_as_bytes = ftputil.tool.as_bytes(path_as_unicode)
516        paths = [path_as_unicode, path_as_bytes]
517        for source_path, target_path in itertools.product(paths, paths):
518            self.host.rename(source_path, target_path)
519
520    def test_listdir(self):
521        """Test whether `listdir` accepts either unicode or bytes."""
522        host = self.host
523        as_bytes = ftputil.tool.as_bytes
524        host.chdir("/home/file_name_test")
525        # Unicode
526        items = host.listdir("ä")
527        assert items == ["ö", "o"]
528        #  Need explicit type check for Python 2
529        for item in items:
530            assert isinstance(item, ftputil.compat.unicode_type)
531        # Bytes
532        items = host.listdir(as_bytes("ä"))
533        assert items == [as_bytes("ö"), as_bytes("o")]
534        #  Need explicit type check for Python 2
535        for item in items:
536            assert isinstance(item, ftputil.compat.bytes_type)
537
538    def test_chmod(self):
539        """Test whether `chmod` accepts either unicode or bytes."""
540        host = self.host
541        # The `voidcmd` implementation in `MockSession` would raise an
542        # exception for the `CHMOD` command.
543        host._session.voidcmd = host._session._ignore_arguments
544        path = "/home/file_name_test/ä"
545        host.chmod(path, 0o755)
546        host.chmod(ftputil.tool.as_bytes(path), 0o755)
547
548    def _test_method_with_single_path_argument(self, method, path):
549        method(path)
550        method(ftputil.tool.as_bytes(path))
551
552    def test_chdir(self):
553        """Test whether `chdir` accepts either unicode or bytes."""
554        self._test_method_with_single_path_argument(
555          self.host.chdir, "/home/file_name_test/ö")
556
557    def test_mkdir(self):
558        """Test whether `mkdir` accepts either unicode or bytes."""
559        # This directory exists already in the mock session, but this
560        # shouldn't matter for the test.
561        self._test_method_with_single_path_argument(
562          self.host.mkdir, "/home/file_name_test/ä")
563
564    def test_makedirs(self):
565        """Test whether `makedirs` accepts either unicode or bytes."""
566        self._test_method_with_single_path_argument(
567          self.host.makedirs, "/home/file_name_test/ä")
568
569    def test_rmdir(self):
570        """Test whether `rmdir` accepts either unicode or bytes."""
571        empty_directory_as_required_by_rmdir = "/home/file_name_test/empty_ä"
572        self._test_method_with_single_path_argument(
573          self.host.rmdir, empty_directory_as_required_by_rmdir)
574
575    def test_remove(self):
576        """Test whether `remove` accepts either unicode or bytes."""
577        self._test_method_with_single_path_argument(
578          self.host.remove, "/home/file_name_test/ö")
579
580    def test_rmtree(self):
581        """Test whether `rmtree` accepts either unicode or bytes."""
582        empty_directory_as_required_by_rmtree = "/home/file_name_test/empty_ä"
583        self._test_method_with_single_path_argument(
584          self.host.rmtree, empty_directory_as_required_by_rmtree)
585
586    def test_lstat(self):
587        """Test whether `lstat` accepts either unicode or bytes."""
588        self._test_method_with_single_path_argument(
589          self.host.lstat, "/home/file_name_test/ä")
590
591    def test_stat(self):
592        """Test whether `stat` accepts either unicode or bytes."""
593        self._test_method_with_single_path_argument(
594          self.host.stat, "/home/file_name_test/ä")
595
596    def test_walk(self):
597        """Test whether `walk` accepts either unicode or bytes."""
598        # We're not interested in the return value of `walk`.
599        self._test_method_with_single_path_argument(
600          self.host.walk, "/home/file_name_test/ä")
601
602
603class TestFailingPickling:
604
605    def test_failing_pickling(self):
606        """Test if pickling (intentionally) isn't supported."""
607        with test_base.ftp_host_factory() as host:
608            with pytest.raises(TypeError):
609                pickle.dumps(host)
610            with host.open("/home/sschwarzer/index.html") as file_obj:
611                with pytest.raises(TypeError):
612                    pickle.dumps(file_obj)
Note: See TracBrowser for help on using the repository browser.