source: test/test_host.py @ 1718:8bed138bc404

Last change on this file since 1718:8bed138bc404 was 1718:8bed138bc404, checked in by Stefan Schwarzer <sschwarzer@…>, 6 months ago
Don't inherit from `object` This is no longer needed because we're targeting Python 3 only now.
File size: 21.7 KB
Line 
1# encoding: utf-8
2# Copyright (C) 2002-2018, Stefan Schwarzer <sschwarzer@sschwarzer.net>
3# and ftputil contributors (see `doc/contributors.txt`)
4# See the file LICENSE for licensing terms.
5
6import ftplib
7import itertools
8import os
9import pickle
10import posixpath
11import random
12import time
13import warnings
14
15import pytest
16
17import ftputil
18import ftputil.compat
19import ftputil.error
20import ftputil.tool
21import ftputil.stat
22
23from test import mock_ftplib
24from test import test_base
25
26
27#
28# Helper functions to generate random data
29#
30def random_data(pool, size=10000):
31    """
32    Return a byte string of characters consisting of those from the
33    pool of integer numbers.
34    """
35    ordinal_list = [random.choice(pool) for i in range(size)]
36    return ftputil.compat.bytes_from_ints(ordinal_list)
37
38
39def ascii_data():
40    r"""
41    Return a unicode string of "normal" ASCII characters, including `\r`.
42    """
43    pool = list(range(32, 128))
44    # The idea is to have the "\r" converted to "\n" during the later
45    # text write and check this conversion.
46    pool.append(ord("\r"))
47    return ftputil.tool.as_unicode(random_data(pool))
48
49
50def binary_data():
51    """Return a binary character byte string."""
52    pool = list(range(0, 256))
53    return random_data(pool)
54
55
56#
57# Several customized `MockSession` classes
58#
59class FailOnLoginSession(mock_ftplib.MockSession):
60
61    def __init__(self, host="", user="", password=""):
62        raise ftplib.error_perm
63
64
65class FailOnKeepAliveSession(mock_ftplib.MockSession):
66
67    def pwd(self):
68        # Raise exception on second call to let the constructor work.
69        if not hasattr(self, "pwd_called"):
70            self.pwd_called = True
71            return "/home"
72        else:
73            raise ftplib.error_temp
74
75
76class UnnormalizedCurrentWorkingDirectory(mock_ftplib.MockSession):
77
78    def pwd(self):
79        # Deliberately return the current working directory with a
80        # trailing slash to test if it's removed when stored in the
81        # `FTPHost` instance.
82        return "/home/"
83
84
85class RecursiveListingForDotAsPathSession(mock_ftplib.MockSession):
86
87    dir_contents = {
88      ".": """\
89lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin
90
91dev:
92total 10
93
94etc:
95total 10
96
97pub:
98total 4
99-rw-r--r--   1 staff         74 Sep 25  2000 .message
100----------   1 staff          0 Aug 16  2003 .notar
101drwxr-xr-x  12 ftp          512 Nov 23  2008 freeware
102
103usr:
104total 4""",
105
106      "": """\
107total 10
108lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin
109d--x--x--x   2 staff        512 Sep 24  2000 dev
110d--x--x--x   3 staff        512 Sep 25  2000 etc
111dr-xr-xr-x   3 staff        512 Oct  3  2000 pub
112d--x--x--x   5 staff        512 Oct  3  2000 usr"""}
113
114    def _transform_path(self, path):
115        return path
116
117
118class BinaryDownloadMockSession(mock_ftplib.MockUnixFormatSession):
119
120    mock_file_content = binary_data()
121
122
123class TimeShiftMockSession(mock_ftplib.MockSession):
124
125    def delete(self, file_name):
126        pass
127
128#
129# Customized `FTPHost` class for conditional upload/download tests
130# and time shift tests
131#
132class FailingUploadAndDownloadFTPHost(ftputil.FTPHost):
133
134    def upload(self, source, target, mode=""):
135        pytest.fail("`FTPHost.upload` should not have been called")
136
137    def download(self, source, target, mode=""):
138        pytest.fail("`FTPHost.download` should not have been called")
139
140
141class TimeShiftFTPHost(ftputil.FTPHost):
142
143    class _Path:
144        def split(self, path):
145            return posixpath.split(path)
146        def set_mtime(self, mtime):
147            self._mtime = mtime
148        def getmtime(self, file_name):
149            return self._mtime
150        def join(self, *args):
151            return posixpath.join(*args)
152        def normpath(self, path):
153            return posixpath.normpath(path)
154        def isabs(self, path):
155            return posixpath.isabs(path)
156        def abspath(self, path):
157            return "/home/sschwarzer/_ftputil_sync_"
158        # Needed for `isdir` in `FTPHost.remove`
159        def isfile(self, path):
160            return True
161
162    def __init__(self, *args, **kwargs):
163        ftputil.FTPHost.__init__(self, *args, **kwargs)
164        self.path = self._Path()
165
166#
167# Test cases
168#
169class TestConstructor:
170    """
171    Test initialization of `FTPHost` objects.
172    """
173
174    def test_open_and_close(self):
175        """
176        Test if opening and closing an `FTPHost` object works as
177        expected.
178        """
179        host = test_base.ftp_host_factory()
180        host.close()
181        assert host.closed is True
182        assert host._children == []
183
184    def test_invalid_login(self):
185        """Login to invalid host must fail."""
186        with pytest.raises(ftputil.error.FTPOSError):
187            test_base.ftp_host_factory(FailOnLoginSession)
188
189    def test_pwd_normalization(self):
190        """
191        Test if the stored current directory is normalized.
192        """
193        host = test_base.ftp_host_factory(UnnormalizedCurrentWorkingDirectory)
194        assert host.getcwd() == "/home"
195
196
197class TestKeepAlive:
198
199    def test_succeeding_keep_alive(self):
200        """Assume the connection is still alive."""
201        host = test_base.ftp_host_factory()
202        host.keep_alive()
203
204    def test_failing_keep_alive(self):
205        """Assume the connection has timed out, so `keep_alive` fails."""
206        host = test_base.ftp_host_factory(
207                 session_factory=FailOnKeepAliveSession)
208        with pytest.raises(ftputil.error.TemporaryError):
209            host.keep_alive()
210
211
212class TestSetParser:
213
214    class TrivialParser(ftputil.stat.Parser):
215        """
216        An instance of this parser always returns the same result
217        from its `parse_line` method. This is all we need to check
218        if ftputil uses the set parser. No actual parsing code is
219        required here.
220        """
221
222        def __init__(self):
223            # We can't use `os.stat("/home")` directly because we
224            # later need the object's `_st_name` attribute, which
225            # we can't set on a `os.stat` stat value.
226            default_stat_result = ftputil.stat.StatResult(os.stat("/home"))
227            default_stat_result._st_name = "home"
228            self.default_stat_result = default_stat_result
229
230        def parse_line(self, line, time_shift=0.0):
231            return self.default_stat_result
232
233    def test_set_parser(self):
234        """Test if the selected parser is used."""
235        host = test_base.ftp_host_factory()
236        assert host._stat._allow_parser_switching is True
237        trivial_parser = TestSetParser.TrivialParser()
238        host.set_parser(trivial_parser)
239        stat_result = host.stat("/home")
240        assert stat_result == trivial_parser.default_stat_result
241        assert host._stat._allow_parser_switching is False
242
243
244class TestCommandNotImplementedError:
245
246    def test_command_not_implemented_error(self):
247        """
248        Test if we get the anticipated exception if a command isn't
249        implemented by the server.
250        """
251        host = test_base.ftp_host_factory()
252        with pytest.raises(ftputil.error.CommandNotImplementedError):
253            host.chmod("nonexistent", 0o644)
254        # `CommandNotImplementedError` is a subclass of `PermanentError`.
255        with pytest.raises(ftputil.error.PermanentError):
256            host.chmod("nonexistent", 0o644)
257
258
259class TestRecursiveListingForDotAsPath:
260    """
261    Return a recursive directory listing when the path to list
262    is a dot. This is used to test for issue #33, see
263    http://ftputil.sschwarzer.net/trac/ticket/33 .
264    """
265
266    def test_recursive_listing(self):
267        host = test_base.ftp_host_factory(
268                 session_factory=RecursiveListingForDotAsPathSession)
269        lines = host._dir(host.curdir)
270        assert lines[0] == "total 10"
271        assert lines[1].startswith("lrwxrwxrwx   1 staff")
272        assert lines[2].startswith("d--x--x--x   2 staff")
273        host.close()
274
275    def test_plain_listing(self):
276        host = test_base.ftp_host_factory(
277                 session_factory=RecursiveListingForDotAsPathSession)
278        lines = host._dir("")
279        assert lines[0] == "total 10"
280        assert lines[1].startswith("lrwxrwxrwx   1 staff")
281        assert lines[2].startswith("d--x--x--x   2 staff")
282        host.close()
283
284    def test_empty_string_instead_of_dot_workaround(self):
285        host = test_base.ftp_host_factory(
286                 session_factory=RecursiveListingForDotAsPathSession)
287        files = host.listdir(host.curdir)
288        assert files == ["bin", "dev", "etc", "pub", "usr"]
289        host.close()
290
291
292class TestUploadAndDownload:
293    """Test ASCII upload and binary download as examples."""
294
295    def generate_file(self, data, file_name):
296        """Generate a local data file."""
297        with open(file_name, "wb") as source_file:
298            source_file.write(data)
299
300    def test_download(self):
301        """Test mode download."""
302        local_target = "_test_target_"
303        host = test_base.ftp_host_factory(
304                 session_factory=BinaryDownloadMockSession)
305        # Download
306        host.download("dummy", local_target)
307        # Read file and compare
308        with open(local_target, "rb") as fobj:
309            data = fobj.read()
310        remote_file_content = mock_ftplib.content_of("dummy")
311        assert data == remote_file_content
312        # Clean up
313        os.unlink(local_target)
314
315    def test_conditional_upload(self):
316        """Test conditional upload."""
317        local_source = "_test_source_"
318        data = binary_data()
319        self.generate_file(data, local_source)
320        # Target is newer, so don't upload.
321        host = test_base.ftp_host_factory(
322                 ftp_host_class=FailingUploadAndDownloadFTPHost)
323        flag = host.upload_if_newer(local_source, "/home/newer")
324        assert flag is False
325        # Target is older, so upload.
326        host = test_base.ftp_host_factory()
327        flag = host.upload_if_newer(local_source, "/home/older")
328        assert flag is True
329        remote_file_content = mock_ftplib.content_of("older")
330        assert data == remote_file_content
331        # Target doesn't exist, so upload.
332        host = test_base.ftp_host_factory()
333        flag = host.upload_if_newer(local_source, "/home/notthere")
334        assert flag is True
335        remote_file_content = mock_ftplib.content_of("notthere")
336        assert data == remote_file_content
337        # Clean up.
338        os.unlink(local_source)
339
340    def compare_and_delete_downloaded_data(self, file_name):
341        """
342        Compare content of downloaded file with its source, then
343        delete the local target file.
344        """
345        with open(file_name, "rb") as fobj:
346            data = fobj.read()
347        # The name `newer` is used by all callers, so use it here, too.
348        remote_file_content = mock_ftplib.content_of("newer")
349        assert data == remote_file_content
350        # Clean up
351        os.unlink(file_name)
352
353    def test_conditional_download_without_target(self):
354        """
355        Test conditional binary mode download when no target file
356        exists.
357        """
358        local_target = "_test_target_"
359        # Target does not exist, so download.
360        host = test_base.ftp_host_factory(
361                 session_factory=BinaryDownloadMockSession)
362        flag = host.download_if_newer("/home/newer", local_target)
363        assert flag is True
364        self.compare_and_delete_downloaded_data(local_target)
365
366    def test_conditional_download_with_older_target(self):
367        """Test conditional binary mode download with newer source file."""
368        local_target = "_test_target_"
369        # Make target file.
370        open(local_target, "w").close()
371        # Source is newer (date in 2020), so download.
372        host = test_base.ftp_host_factory(
373                 session_factory=BinaryDownloadMockSession)
374        flag = host.download_if_newer("/home/newer", local_target)
375        assert flag is True
376        self.compare_and_delete_downloaded_data(local_target)
377
378    def test_conditional_download_with_newer_target(self):
379        """Test conditional binary mode download with older source file."""
380        local_target = "_test_target_"
381        # Make target file.
382        open(local_target, "w").close()
383        # Source is older (date in 1970), so don't download.
384        host = test_base.ftp_host_factory(
385                 ftp_host_class=FailingUploadAndDownloadFTPHost,
386                 session_factory=BinaryDownloadMockSession)
387        flag = host.download_if_newer("/home/older", local_target)
388        assert flag is False
389        # Remove target file
390        os.unlink(local_target)
391
392
393class TestTimeShift:
394
395    def test_rounded_time_shift(self):
396        """Test if time shift is rounded correctly."""
397        host = test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
398        # Use private bound method.
399        rounded_time_shift = host._FTPHost__rounded_time_shift
400        # Pairs consisting of original value and expected result
401        test_data = [
402          (      0,           0),
403          (      0.1,         0),
404          (     -0.1,         0),
405          (   1500,        1800),
406          (  -1500,       -1800),
407          (   1800,        1800),
408          (  -1800,       -1800),
409          (   2000,        1800),
410          (  -2000,       -1800),
411          ( 5*3600-100,  5*3600),
412          (-5*3600+100, -5*3600)]
413        for time_shift, expected_time_shift in test_data:
414            calculated_time_shift = rounded_time_shift(time_shift)
415            assert calculated_time_shift == expected_time_shift
416
417    def test_assert_valid_time_shift(self):
418        """Test time shift sanity checks."""
419        host = test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
420        # Use private bound method.
421        assert_time_shift = host._FTPHost__assert_valid_time_shift
422        # Valid time shifts
423        test_data = [23*3600, -23*3600, 3600+30, -3600+30]
424        for time_shift in test_data:
425            assert assert_time_shift(time_shift) is None
426        # Invalid time shift (exceeds one day)
427        with pytest.raises(ftputil.error.TimeShiftError):
428            assert_time_shift(25*3600)
429        with pytest.raises(ftputil.error.TimeShiftError):
430            assert_time_shift(-25*3600)
431        # Invalid time shift (too large deviation from 15-minute units
432        # is unacceptable)
433        with pytest.raises(ftputil.error.TimeShiftError):
434            assert_time_shift(8*60)
435        with pytest.raises(ftputil.error.TimeShiftError):
436            assert_time_shift(-3600-8*60)
437
438    def test_synchronize_times(self):
439        """Test time synchronization with server."""
440        host = test_base.ftp_host_factory(ftp_host_class=TimeShiftFTPHost,
441                                          session_factory=TimeShiftMockSession)
442        # Valid time shifts
443        test_data = [
444          (60*60+30,  60*60),
445          (60*60-100, 60*60),
446          (30*60+100, 30*60),
447          (45*60-100, 45*60),
448        ]
449        for measured_time_shift, expected_time_shift in test_data:
450            host.path.set_mtime(time.time() + measured_time_shift)
451            host.synchronize_times()
452            assert host.time_shift() == expected_time_shift
453        # Invalid time shifts
454        measured_time_shifts = [60*60+8*60, 45*60-6*60]
455        for measured_time_shift in measured_time_shifts:
456            host.path.set_mtime(time.time() + measured_time_shift)
457            with pytest.raises(ftputil.error.TimeShiftError):
458                host.synchronize_times()
459
460    def test_synchronize_times_for_server_in_east(self):
461        """Test for timestamp correction (see ticket #55)."""
462        host = test_base.ftp_host_factory(ftp_host_class=TimeShiftFTPHost,
463                                          session_factory=TimeShiftMockSession)
464        # Set this explicitly to emphasize the problem.
465        host.set_time_shift(0.0)
466        hour = 60 * 60
467        # This could be any negative time shift.
468        presumed_time_shift = -6 * hour
469        # Set `mtime` to simulate a server east of us.
470        # In case the `time_shift` value for this host instance is 0.0
471        # (as is to be expected before the time shift is determined),
472        # the directory parser (more specifically
473        # `ftputil.stat.Parser.parse_unix_time`) will return a time which
474        # is a year too far in the past. The `synchronize_times`
475        # method needs to deal with this and add the year "back".
476        # I don't think it's a bug in `parse_unix_time` because the
477        # method should work once the time shift is set correctly.
478        local_time = time.localtime()
479        local_time_with_wrong_year = (local_time.tm_year-1,) + local_time[1:]
480        presumed_server_time = \
481          time.mktime(local_time_with_wrong_year) + presumed_time_shift
482        host.path.set_mtime(presumed_server_time)
483        host.synchronize_times()
484        assert host.time_shift() == presumed_time_shift
485
486
487class TestAcceptEitherUnicodeOrBytes:
488    """
489    Test whether certain `FTPHost` methods accept either unicode
490    or byte strings for the path(s).
491    """
492
493    def setup_method(self, method):
494        self.host = test_base.ftp_host_factory()
495
496    def test_upload(self):
497        """Test whether `upload` accepts either unicode or bytes."""
498        host = self.host
499        # The source file needs to be present in the current directory.
500        host.upload("Makefile", "target")
501        host.upload("Makefile", ftputil.tool.as_bytes("target"))
502
503    def test_download(self):
504        """Test whether `download` accepts either unicode or bytes."""
505        host = test_base.ftp_host_factory(
506                 session_factory=BinaryDownloadMockSession)
507        local_file_name = "_local_target_"
508        host.download("source", local_file_name)
509        host.download(ftputil.tool.as_bytes("source"), local_file_name)
510        os.remove(local_file_name)
511
512    def test_rename(self):
513        """Test whether `rename` accepts either unicode or bytes."""
514        # It's possible to mix argument types, as for `os.rename`.
515        path_as_unicode = "/home/file_name_test/ä"
516        path_as_bytes = ftputil.tool.as_bytes(path_as_unicode)
517        paths = [path_as_unicode, path_as_bytes]
518        for source_path, target_path in itertools.product(paths, paths):
519            self.host.rename(source_path, target_path)
520
521    def test_listdir(self):
522        """Test whether `listdir` accepts either unicode or bytes."""
523        host = self.host
524        as_bytes = ftputil.tool.as_bytes
525        host.chdir("/home/file_name_test")
526        # Unicode
527        items = host.listdir("ä")
528        assert items == ["ö", "o"]
529        #  Need explicit type check for Python 2
530        for item in items:
531            assert isinstance(item, ftputil.compat.unicode_type)
532        # Bytes
533        items = host.listdir(as_bytes("ä"))
534        assert items == [as_bytes("ö"), as_bytes("o")]
535        #  Need explicit type check for Python 2
536        for item in items:
537            assert isinstance(item, ftputil.compat.bytes_type)
538
539    def test_chmod(self):
540        """Test whether `chmod` accepts either unicode or bytes."""
541        host = self.host
542        # The `voidcmd` implementation in `MockSession` would raise an
543        # exception for the `CHMOD` command.
544        host._session.voidcmd = host._session._ignore_arguments
545        path = "/home/file_name_test/ä"
546        host.chmod(path, 0o755)
547        host.chmod(ftputil.tool.as_bytes(path), 0o755)
548
549    def _test_method_with_single_path_argument(self, method, path):
550        method(path)
551        method(ftputil.tool.as_bytes(path))
552
553    def test_chdir(self):
554        """Test whether `chdir` accepts either unicode or bytes."""
555        self._test_method_with_single_path_argument(
556          self.host.chdir, "/home/file_name_test/ö")
557
558    def test_mkdir(self):
559        """Test whether `mkdir` accepts either unicode or bytes."""
560        # This directory exists already in the mock session, but this
561        # shouldn't matter for the test.
562        self._test_method_with_single_path_argument(
563          self.host.mkdir, "/home/file_name_test/ä")
564
565    def test_makedirs(self):
566        """Test whether `makedirs` accepts either unicode or bytes."""
567        self._test_method_with_single_path_argument(
568          self.host.makedirs, "/home/file_name_test/ä")
569
570    def test_rmdir(self):
571        """Test whether `rmdir` accepts either unicode or bytes."""
572        empty_directory_as_required_by_rmdir = "/home/file_name_test/empty_ä"
573        self._test_method_with_single_path_argument(
574          self.host.rmdir, empty_directory_as_required_by_rmdir)
575
576    def test_remove(self):
577        """Test whether `remove` accepts either unicode or bytes."""
578        self._test_method_with_single_path_argument(
579          self.host.remove, "/home/file_name_test/ö")
580
581    def test_rmtree(self):
582        """Test whether `rmtree` accepts either unicode or bytes."""
583        empty_directory_as_required_by_rmtree = "/home/file_name_test/empty_ä"
584        self._test_method_with_single_path_argument(
585          self.host.rmtree, empty_directory_as_required_by_rmtree)
586
587    def test_lstat(self):
588        """Test whether `lstat` accepts either unicode or bytes."""
589        self._test_method_with_single_path_argument(
590          self.host.lstat, "/home/file_name_test/ä")
591
592    def test_stat(self):
593        """Test whether `stat` accepts either unicode or bytes."""
594        self._test_method_with_single_path_argument(
595          self.host.stat, "/home/file_name_test/ä")
596
597    def test_walk(self):
598        """Test whether `walk` accepts either unicode or bytes."""
599        # We're not interested in the return value of `walk`.
600        self._test_method_with_single_path_argument(
601          self.host.walk, "/home/file_name_test/ä")
602
603
604class TestFailingPickling:
605
606    def test_failing_pickling(self):
607        """Test if pickling (intentionally) isn't supported."""
608        with test_base.ftp_host_factory() as host:
609            with pytest.raises(TypeError):
610                pickle.dumps(host)
611            with host.open("/home/sschwarzer/index.html") as file_obj:
612                with pytest.raises(TypeError):
613                    pickle.dumps(file_obj)
Note: See TracBrowser for help on using the repository browser.