source: test/test_host.py @ 1721:3557f65ded13

Last change on this file since 1721:3557f65ded13 was 1721:3557f65ded13, checked in by Stefan Schwarzer <sschwarzer@…>, 2 years ago
Remove `compat.py`. This module was for Python 2/3 compatibility, similar to the `six` package. Since the next version of ftputil will only support Python 3, hardcode the types from `compat.py` in the code that used to use the `compat` module. There may still be redundant code that isn't needed when running the tests under Python 3.
File size: 21.3 KB
Line 
1# Copyright (C) 2002-2018, Stefan Schwarzer <sschwarzer@sschwarzer.net>
2# and ftputil contributors (see `doc/contributors.txt`)
3# See the file LICENSE for licensing terms.
4
5import ftplib
6import itertools
7import os
8import pickle
9import posixpath
10import random
11import time
12import warnings
13
14import pytest
15
16import ftputil
17import ftputil.error
18import ftputil.tool
19import ftputil.stat
20
21from test import mock_ftplib
22from test import test_base
23
24
25#
26# Helper functions to generate random data
27#
def random_data(pool, size=10000):
    """
    Return a byte string of `size` bytes, each one picked at random
    (with replacement) from `pool`, a sequence of integer byte values.
    """
    # Feed the draws straight into `bytes`; the order of the
    # `random.choice` calls is the same as when building a list first.
    return bytes(random.choice(pool) for _ in range(size))
35
36
def ascii_data():
    r"""
    Return random unicode text whose characters have the code points
    32 to 127, plus `\r`.
    """
    # `\r` is part of the pool so that a later text-mode write has
    # something to convert to `\n`; the tests check that conversion.
    code_points = [*range(32, 128), ord("\r")]
    return ftputil.tool.as_unicode(random_data(code_points))
46
47
def binary_data():
    """Return a random byte string drawn from all 256 byte values."""
    all_byte_values = list(range(256))
    return random_data(all_byte_values)
52
53
54#
55# Several customized `MockSession` classes
56#
class FailOnLoginSession(mock_ftplib.MockSession):
    """Mock session that can never be created: the constructor raises."""

    def __init__(self, host="", user="", password=""):
        # Fail right away with ftplib's permanent error; the arguments
        # are accepted only to match the session factory signature.
        raise ftplib.error_perm()
61
62
class FailOnKeepAliveSession(mock_ftplib.MockSession):
    """Mock session whose `pwd` succeeds once and fails on later calls."""

    def pwd(self):
        # The first call must succeed so that the `FTPHost` constructor
        # works; every call after that raises a temporary FTP error.
        if hasattr(self, "pwd_called"):
            raise ftplib.error_temp
        self.pwd_called = True
        return "/home"
72
73
class UnnormalizedCurrentWorkingDirectory(mock_ftplib.MockSession):
    """Mock session that reports its working directory as `/home/`."""

    def pwd(self):
        # The trailing slash is intentional: the tests check that it is
        # stripped when the directory is stored in the `FTPHost`
        # instance.
        unnormalized_directory = "/home/"
        return unnormalized_directory
81
82
class RecursiveListingForDotAsPathSession(mock_ftplib.MockSession):
    """
    Mock session with two canned directory listings: a recursive one
    for the path "." and a plain one for the empty path "".
    """

    # Maps a path to the raw listing text for that path.
    # NOTE(review): presumably read by `MockSession` when a directory
    # listing is requested - confirm in `mock_ftplib`.
    dir_contents = {
        # Recursive-style output, as some servers send for ".".
        ".": """\
lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin

dev:
total 10

etc:
total 10

pub:
total 4
-rw-r--r--   1 staff         74 Sep 25  2000 .message
----------   1 staff          0 Aug 16  2003 .notar
drwxr-xr-x  12 ftp          512 Nov 23  2008 freeware

usr:
total 4""",

        # Plain, non-recursive output for the empty path.
        "": """\
total 10
lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin
d--x--x--x   2 staff        512 Sep 24  2000 dev
d--x--x--x   3 staff        512 Sep 25  2000 etc
dr-xr-xr-x   3 staff        512 Oct  3  2000 pub
d--x--x--x   5 staff        512 Oct  3  2000 usr""",
    }

    def _transform_path(self, path):
        # Hand the path back untouched, so "." and "" are used as
        # given.
        return path
114
115
class BinaryDownloadMockSession(mock_ftplib.MockUnixFormatSession):
    """Mock session whose file content is random binary data."""

    # Evaluated once, when the class body runs, so every instance of
    # this session class shares the same random content.
    mock_file_content = binary_data()
119
120
class TimeShiftMockSession(mock_ftplib.MockSession):
    """Mock session for the time shift tests; `delete` does nothing."""

    def delete(self, file_name):
        # Deliberately a no-op: accept the call and ignore the name.
        return None
125
126#
127# Customized `FTPHost` class for conditional upload/download tests
128# and time shift tests
129#
class FailingUploadAndDownloadFTPHost(ftputil.FTPHost):
    """
    `FTPHost` whose `upload` and `download` fail the running test.

    Used by the conditional transfer tests to prove that no transfer
    was attempted.
    """

    def upload(self, source, target, mode=""):
        message = "`FTPHost.upload` should not have been called"
        pytest.fail(message)

    def download(self, source, target, mode=""):
        message = "`FTPHost.download` should not have been called"
        pytest.fail(message)
137
138
class TimeShiftFTPHost(ftputil.FTPHost):
    """
    `FTPHost` whose `path` attribute is replaced by a stub, so that a
    test can dictate the modification time the host will see.
    """

    class _Path:
        """
        Stand-in for the host's `path` object.

        Pure path operations are delegated to `posixpath`; the
        file-system queries return canned values.
        """

        # Test hook: store the value `getmtime` will report. Must be
        # called before `getmtime`, there is no default value.
        def set_mtime(self, mtime):
            self._mtime = mtime

        def getmtime(self, file_name):
            # Same answer for every file name.
            return self._mtime

        def abspath(self, path):
            # Fixed result, whatever `path` is.
            return "/home/sschwarzer/_ftputil_sync_"

        # Needed for `isdir` in `FTPHost.remove`
        def isfile(self, path):
            return True

        def isabs(self, path):
            return posixpath.isabs(path)

        def join(self, *args):
            return posixpath.join(*args)

        def normpath(self, path):
            return posixpath.normpath(path)

        def split(self, path):
            return posixpath.split(path)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Swap in the stub only after the regular initialization has
        # finished.
        self.path = self._Path()
163
164#
165# Test cases
166#
class TestConstructor:
    """
    Tests for creating `FTPHost` objects.
    """

    def test_open_and_close(self):
        """
        A freshly created host can be closed; afterwards it is flagged
        as closed and has no child sessions left.
        """
        ftp_host = test_base.ftp_host_factory()
        ftp_host.close()
        assert ftp_host.closed is True
        assert ftp_host._children == []

    def test_invalid_login(self):
        """A failing login must surface as `FTPOSError`."""
        expected_error = ftputil.error.FTPOSError
        with pytest.raises(expected_error):
            test_base.ftp_host_factory(FailOnLoginSession)

    def test_pwd_normalization(self):
        """
        The current directory reported by the session (`/home/`) is
        stored in normalized form.
        """
        ftp_host = test_base.ftp_host_factory(
            UnnormalizedCurrentWorkingDirectory)
        current_directory = ftp_host.getcwd()
        assert current_directory == "/home"
193
194
class TestKeepAlive:
    """Tests for `FTPHost.keep_alive`."""

    def test_succeeding_keep_alive(self):
        """With a working session, `keep_alive` must not raise."""
        ftp_host = test_base.ftp_host_factory()
        ftp_host.keep_alive()

    def test_failing_keep_alive(self):
        """
        If the session fails on the keep-alive request (as after a
        timeout), `keep_alive` raises `TemporaryError`.
        """
        ftp_host = test_base.ftp_host_factory(
            session_factory=FailOnKeepAliveSession)
        expected_error = ftputil.error.TemporaryError
        with pytest.raises(expected_error):
            ftp_host.keep_alive()
208
209
class TestSetParser:
    """Test that a parser installed with `set_parser` is really used."""

    class TrivialParser(ftputil.stat.Parser):
        """
        Parser whose `parse_line` returns one and the same stat result
        for every line. That is enough to detect whether ftputil uses
        the parser that was set; no real parsing is needed.
        """

        def __init__(self):
            # A plain `os.stat` value can't take the `_st_name`
            # attribute that is needed later, so wrap it in ftputil's
            # own `StatResult` first.
            # NOTE(review): this requires a local `/home` directory.
            canned_result = ftputil.stat.StatResult(os.stat("/home"))
            canned_result._st_name = "home"
            self.default_stat_result = canned_result

        def parse_line(self, line, time_shift=0.0):
            # Both arguments are ignored on purpose.
            return self.default_stat_result

    def test_set_parser(self):
        """
        After `set_parser`, `stat` returns the parser's canned result
        and automatic parser switching is turned off.
        """
        ftp_host = test_base.ftp_host_factory()
        assert ftp_host._stat._allow_parser_switching is True
        parser = self.TrivialParser()
        ftp_host.set_parser(parser)
        assert ftp_host.stat("/home") == parser.default_stat_result
        assert ftp_host._stat._allow_parser_switching is False
240
241
class TestCommandNotImplementedError:
    """Test the error raised for commands the server doesn't implement."""

    def test_command_not_implemented_error(self):
        """
        `chmod` against the default mock session must raise
        `CommandNotImplementedError`.
        """
        ftp_host = test_base.ftp_host_factory()
        # `CommandNotImplementedError` is a subclass of
        # `PermanentError`, so the same call has to satisfy both
        # expectations.
        expected_errors = (
            ftputil.error.CommandNotImplementedError,
            ftputil.error.PermanentError,
        )
        for expected_error in expected_errors:
            with pytest.raises(expected_error):
                ftp_host.chmod("nonexistent", 0o644)
255
256
class TestRecursiveListingForDotAsPath:
    """
    Tests against a session that returns a recursive directory listing
    when the path to list is a dot. This covers issue #33, see
    http://ftputil.sschwarzer.net/trac/ticket/33 .
    """

    def _make_host(self):
        """Return a host backed by the recursive-listing mock session."""
        return test_base.ftp_host_factory(
            session_factory=RecursiveListingForDotAsPathSession)

    def _check_plain_listing(self, lines):
        """Assert that `lines` starts like the non-recursive listing."""
        assert lines[0] == "total 10"
        assert lines[1].startswith("lrwxrwxrwx   1 staff")
        assert lines[2].startswith("d--x--x--x   2 staff")

    def test_recursive_listing(self):
        """Listing the current directory yields the plain listing."""
        ftp_host = self._make_host()
        self._check_plain_listing(ftp_host._dir(ftp_host.curdir))
        ftp_host.close()

    def test_plain_listing(self):
        """Listing the empty path yields the plain listing."""
        ftp_host = self._make_host()
        self._check_plain_listing(ftp_host._dir(""))
        ftp_host.close()

    def test_empty_string_instead_of_dot_workaround(self):
        """`listdir` on the current directory returns only top-level names."""
        ftp_host = self._make_host()
        names = ftp_host.listdir(ftp_host.curdir)
        assert names == ["bin", "dev", "etc", "pub", "usr"]
        ftp_host.close()
288
289
class TestUploadAndDownload:
    """
    Test uploads and downloads, including the conditional variants
    `upload_if_newer` and `download_if_newer`.

    Every test works with scratch files in the current directory and
    removes them in a `finally` clause, so that a failing test can't
    leave a file behind that changes the outcome of a later test.
    """

    @staticmethod
    def _remove_local_file(file_name):
        """Delete `file_name`; it is not an error if it doesn't exist."""
        try:
            os.unlink(file_name)
        except FileNotFoundError:
            # Deliberate best-effort cleanup: the file may never have
            # been created or may have been removed already. Raising
            # here would only hide the original test failure.
            pass

    def generate_file(self, data, file_name):
        """Write the byte string `data` to the local file `file_name`."""
        with open(file_name, "wb") as source_file:
            source_file.write(data)

    def test_download(self):
        """Test mode download."""
        local_target = "_test_target_"
        host = test_base.ftp_host_factory(
                 session_factory=BinaryDownloadMockSession)
        try:
            # Download
            host.download("dummy", local_target)
            # Read file and compare
            with open(local_target, "rb") as fobj:
                data = fobj.read()
            remote_file_content = mock_ftplib.content_of("dummy")
            assert data == remote_file_content
        finally:
            # Clean up, whether or not the checks above passed.
            self._remove_local_file(local_target)

    def test_conditional_upload(self):
        """Test conditional upload."""
        local_source = "_test_source_"
        data = binary_data()
        self.generate_file(data, local_source)
        try:
            # Target is newer, so don't upload.
            host = test_base.ftp_host_factory(
                     ftp_host_class=FailingUploadAndDownloadFTPHost)
            flag = host.upload_if_newer(local_source, "/home/newer")
            assert flag is False
            # Target is older, so upload.
            host = test_base.ftp_host_factory()
            flag = host.upload_if_newer(local_source, "/home/older")
            assert flag is True
            remote_file_content = mock_ftplib.content_of("older")
            assert data == remote_file_content
            # Target doesn't exist, so upload.
            host = test_base.ftp_host_factory()
            flag = host.upload_if_newer(local_source, "/home/notthere")
            assert flag is True
            remote_file_content = mock_ftplib.content_of("notthere")
            assert data == remote_file_content
        finally:
            # Clean up, whether or not the checks above passed.
            self._remove_local_file(local_source)

    def compare_and_delete_downloaded_data(self, file_name):
        """
        Compare content of downloaded file with its source, then
        delete the local target file.

        The file is deleted even if the comparison fails.
        """
        try:
            with open(file_name, "rb") as fobj:
                data = fobj.read()
            # The name `newer` is used by all callers, so use it here,
            # too.
            remote_file_content = mock_ftplib.content_of("newer")
            assert data == remote_file_content
        finally:
            self._remove_local_file(file_name)

    def test_conditional_download_without_target(self):
        """
        Test conditional binary mode download when no target file
        exists.
        """
        local_target = "_test_target_"
        # Make sure the precondition really holds; a leftover file
        # would silently turn this into a different test.
        self._remove_local_file(local_target)
        try:
            # Target does not exist, so download.
            host = test_base.ftp_host_factory(
                     session_factory=BinaryDownloadMockSession)
            flag = host.download_if_newer("/home/newer", local_target)
            assert flag is True
            self.compare_and_delete_downloaded_data(local_target)
        finally:
            self._remove_local_file(local_target)

    def test_conditional_download_with_older_target(self):
        """Test conditional binary mode download with newer source file."""
        local_target = "_test_target_"
        # Make target file.
        open(local_target, "w").close()
        try:
            # Source is newer (date in 2020), so download.
            host = test_base.ftp_host_factory(
                     session_factory=BinaryDownloadMockSession)
            flag = host.download_if_newer("/home/newer", local_target)
            assert flag is True
            self.compare_and_delete_downloaded_data(local_target)
        finally:
            self._remove_local_file(local_target)

    def test_conditional_download_with_newer_target(self):
        """Test conditional binary mode download with older source file."""
        local_target = "_test_target_"
        # Make target file.
        open(local_target, "w").close()
        try:
            # Source is older (date in 1970), so don't download.
            host = test_base.ftp_host_factory(
                     ftp_host_class=FailingUploadAndDownloadFTPHost,
                     session_factory=BinaryDownloadMockSession)
            flag = host.download_if_newer("/home/older", local_target)
            assert flag is False
        finally:
            # Remove target file
            self._remove_local_file(local_target)
389
390
class TestTimeShift:
    """Tests for time shift rounding, validation and synchronization."""

    def test_rounded_time_shift(self):
        """Test if time shift is rounded correctly."""
        ftp_host = test_base.ftp_host_factory(
            session_factory=TimeShiftMockSession)
        # The method is private, so go through its mangled name.
        round_shift = ftp_host._FTPHost__rounded_time_shift
        # (raw time shift, expected rounded time shift)
        cases = [
            (0, 0),
            (0.1, 0),
            (-0.1, 0),
            (1500, 1800),
            (-1500, -1800),
            (1800, 1800),
            (-1800, -1800),
            (2000, 1800),
            (-2000, -1800),
            (5*3600-100, 5*3600),
            (-5*3600+100, -5*3600),
        ]
        for raw_shift, expected_shift in cases:
            assert round_shift(raw_shift) == expected_shift

    def test_assert_valid_time_shift(self):
        """Test time shift sanity checks."""
        ftp_host = test_base.ftp_host_factory(
            session_factory=TimeShiftMockSession)
        # The method is private, so go through its mangled name.
        check_shift = ftp_host._FTPHost__assert_valid_time_shift
        # Acceptable values pass silently, i. e. the call returns `None`.
        for valid_shift in (23*3600, -23*3600, 3600+30, -3600+30):
            assert check_shift(valid_shift) is None
        invalid_shifts = (
            # More than one day
            25*3600,
            -25*3600,
            # Too far away from a multiple of 15 minutes
            8*60,
            -3600-8*60,
        )
        for invalid_shift in invalid_shifts:
            with pytest.raises(ftputil.error.TimeShiftError):
                check_shift(invalid_shift)

    def test_synchronize_times(self):
        """Test time synchronization with server."""
        ftp_host = test_base.ftp_host_factory(
            ftp_host_class=TimeShiftFTPHost,
            session_factory=TimeShiftMockSession)
        # (measured time shift, time shift expected after synchronizing)
        valid_cases = [
            (60*60+30, 60*60),
            (60*60-100, 60*60),
            (30*60+100, 30*60),
            (45*60-100, 45*60),
        ]
        for measured_shift, expected_shift in valid_cases:
            ftp_host.path.set_mtime(time.time() + measured_shift)
            ftp_host.synchronize_times()
            assert ftp_host.time_shift() == expected_shift
        # Measured shifts which must be rejected
        for measured_shift in (60*60+8*60, 45*60-6*60):
            ftp_host.path.set_mtime(time.time() + measured_shift)
            with pytest.raises(ftputil.error.TimeShiftError):
                ftp_host.synchronize_times()

    def test_synchronize_times_for_server_in_east(self):
        """Test for timestamp correction (see ticket #55)."""
        ftp_host = test_base.ftp_host_factory(
            ftp_host_class=TimeShiftFTPHost,
            session_factory=TimeShiftMockSession)
        # Not strictly needed, but it makes the problem explicit.
        ftp_host.set_time_shift(0.0)
        one_hour = 60 * 60
        # Any negative time shift would do here.
        presumed_time_shift = -6 * one_hour
        # Simulate a server east of us by manipulating `mtime`.
        #
        # As long as the host's `time_shift` value is still 0.0 (which
        # is the case before the time shift has been determined), the
        # directory parser (more specifically
        # `ftputil.stat.Parser.parse_unix_time`) returns a time that is
        # one year too far in the past. `synchronize_times` has to cope
        # with that and add the year "back". This isn't considered a
        # bug in `parse_unix_time`, since that method should work once
        # the time shift is set correctly.
        now = time.localtime()
        one_year_ago = (now.tm_year-1,) + now[1:]
        presumed_server_time = time.mktime(one_year_ago) + presumed_time_shift
        ftp_host.path.set_mtime(presumed_server_time)
        ftp_host.synchronize_times()
        assert ftp_host.time_shift() == presumed_time_shift
483
484
class TestAcceptEitherUnicodeOrBytes:
    """
    Test whether certain `FTPHost` methods accept either unicode
    or byte strings for the path(s).

    These tests only check that the calls go through; apart from
    `test_listdir` they don't look at return values.
    """

    def setup_method(self, method):
        # A fresh host with the default mock session for every test.
        self.host = test_base.ftp_host_factory()

    def test_upload(self):
        """Test whether `upload` accepts either unicode or bytes."""
        host = self.host
        # The source file needs to be present in the current directory.
        host.upload("Makefile", "target")
        host.upload("Makefile", ftputil.tool.as_bytes("target"))

    def test_download(self):
        """Test whether `download` accepts either unicode or bytes."""
        host = test_base.ftp_host_factory(
                 session_factory=BinaryDownloadMockSession)
        local_file_name = "_local_target_"
        try:
            host.download("source", local_file_name)
            host.download(ftputil.tool.as_bytes("source"), local_file_name)
            # The downloads must have produced the local file.
            assert os.path.exists(local_file_name)
        finally:
            # Remove the scratch file even if a download failed. If it
            # was never created, there is nothing to remove, and
            # raising here would hide the original error.
            try:
                os.remove(local_file_name)
            except FileNotFoundError:
                pass

    def test_rename(self):
        """Test whether `rename` accepts either unicode or bytes."""
        # It's possible to mix argument types, as for `os.rename`.
        path_as_unicode = "/home/file_name_test/ä"
        path_as_bytes = ftputil.tool.as_bytes(path_as_unicode)
        paths = [path_as_unicode, path_as_bytes]
        # All four unicode/bytes combinations of source and target.
        for source_path, target_path in itertools.product(paths, paths):
            self.host.rename(source_path, target_path)

    def test_listdir(self):
        """Test whether `listdir` accepts either unicode or bytes."""
        host = self.host
        as_bytes = ftputil.tool.as_bytes
        host.chdir("/home/file_name_test")
        # Unicode in, unicode out
        items = host.listdir("ä")
        assert items == ["ö", "o"]
        # Bytes in, bytes out
        items = host.listdir(as_bytes("ä"))
        assert items == [as_bytes("ö"), as_bytes("o")]

    def test_chmod(self):
        """Test whether `chmod` accepts either unicode or bytes."""
        host = self.host
        # The `voidcmd` implementation in `MockSession` would raise an
        # exception for the `CHMOD` command.
        host._session.voidcmd = host._session._ignore_arguments
        path = "/home/file_name_test/ä"
        host.chmod(path, 0o755)
        host.chmod(ftputil.tool.as_bytes(path), 0o755)

    def _test_method_with_single_path_argument(self, method, path):
        """Call `method` with `path` as unicode, then as bytes."""
        method(path)
        method(ftputil.tool.as_bytes(path))

    def test_chdir(self):
        """Test whether `chdir` accepts either unicode or bytes."""
        self._test_method_with_single_path_argument(
          self.host.chdir, "/home/file_name_test/ö")

    def test_mkdir(self):
        """Test whether `mkdir` accepts either unicode or bytes."""
        # This directory exists already in the mock session, but this
        # shouldn't matter for the test.
        self._test_method_with_single_path_argument(
          self.host.mkdir, "/home/file_name_test/ä")

    def test_makedirs(self):
        """Test whether `makedirs` accepts either unicode or bytes."""
        self._test_method_with_single_path_argument(
          self.host.makedirs, "/home/file_name_test/ä")

    def test_rmdir(self):
        """Test whether `rmdir` accepts either unicode or bytes."""
        empty_directory_as_required_by_rmdir = "/home/file_name_test/empty_ä"
        self._test_method_with_single_path_argument(
          self.host.rmdir, empty_directory_as_required_by_rmdir)

    def test_remove(self):
        """Test whether `remove` accepts either unicode or bytes."""
        self._test_method_with_single_path_argument(
          self.host.remove, "/home/file_name_test/ö")

    def test_rmtree(self):
        """Test whether `rmtree` accepts either unicode or bytes."""
        empty_directory_as_required_by_rmtree = "/home/file_name_test/empty_ä"
        self._test_method_with_single_path_argument(
          self.host.rmtree, empty_directory_as_required_by_rmtree)

    def test_lstat(self):
        """Test whether `lstat` accepts either unicode or bytes."""
        self._test_method_with_single_path_argument(
          self.host.lstat, "/home/file_name_test/ä")

    def test_stat(self):
        """Test whether `stat` accepts either unicode or bytes."""
        self._test_method_with_single_path_argument(
          self.host.stat, "/home/file_name_test/ä")

    def test_walk(self):
        """Test whether `walk` accepts either unicode or bytes."""
        # We're not interested in the return value of `walk`.
        self._test_method_with_single_path_argument(
          self.host.walk, "/home/file_name_test/ä")
594
595
class TestFailingPickling:
    """Test that hosts and remote files refuse to be pickled."""

    def test_failing_pickling(self):
        """Test if pickling (intentionally) isn't supported."""
        with test_base.ftp_host_factory() as ftp_host:
            # The host object itself
            with pytest.raises(TypeError):
                pickle.dumps(ftp_host)
            # A file object opened on that host
            remote_path = "/home/sschwarzer/index.html"
            with ftp_host.open(remote_path) as remote_file, \
                    pytest.raises(TypeError):
                pickle.dumps(remote_file)
Note: See TracBrowser for help on using the repository browser.