source: test/test_host.py @ 1677:b4c9b089b6b8

Last change on this file since 1677:b4c9b089b6b8 was 1677:b4c9b089b6b8, checked in by Stefan Schwarzer <sschwarzer@…>, 22 months ago
Apply `normpath` when getting the working directory. When getting the current working directory of the FTP session, apply `normpath`. This makes sure that an initial `FTPHost.getcwd` call returns the current working directory without a trailing slash (with the exception of the current directory being `/`). ticket: 113
File size: 21.8 KB
Line 
1# encoding: utf-8
2# Copyright (C) 2002-2016, Stefan Schwarzer <sschwarzer@sschwarzer.net>
3# and ftputil contributors (see `doc/contributors.txt`)
4# See the file LICENSE for licensing terms.
5
6from __future__ import unicode_literals
7
8import ftplib
9import itertools
10import os
11import pickle
12import posixpath
13import random
14import time
15import warnings
16
17import pytest
18
19import ftputil
20import ftputil.compat
21import ftputil.error
22import ftputil.tool
23import ftputil.stat
24
25from test import mock_ftplib
26from test import test_base
27
28
29#
30# Helper functions to generate random data
31#
def random_data(pool, size=10000):
    """
    Return a byte string built from `size` randomly picked integers.

    Each integer is drawn with `random.choice` from `pool`; the
    resulting list is converted with
    `ftputil.compat.bytes_from_ints`.
    """
    ordinals = []
    for _ in range(size):
        ordinals.append(random.choice(pool))
    return ftputil.compat.bytes_from_ints(ordinals)
39
40
def ascii_data():
    r"""
    Return a random unicode string made of the code points 32 to 127
    plus `\r`.
    """
    # The extra "\r" is there on purpose: a later text-mode write is
    # supposed to turn it into "\n", and the tests check that
    # conversion.
    ordinals = list(range(32, 128)) + [ord("\r")]
    raw_bytes = random_data(ordinals)
    return ftputil.tool.as_unicode(raw_bytes)
50
51
def binary_data():
    """Return a random byte string drawn from all values 0 to 255."""
    all_byte_values = list(range(256))
    return random_data(all_byte_values)
56
57
58#
59# Several customized `MockSession` classes
60#
class FailOnLoginSession(mock_ftplib.MockSession):
    """
    Mock session that can't be created: the constructor always
    raises `ftplib.error_perm`.
    """

    def __init__(self, host="", user="", password=""):
        # The arguments are accepted but never used.
        raise ftplib.error_perm()
65
66
class FailOnKeepAliveSession(mock_ftplib.MockSession):
    """
    Mock session whose `pwd` succeeds once and raises
    `ftplib.error_temp` on every later call.
    """

    def pwd(self):
        """
        Return "/home" on the first call; raise `ftplib.error_temp`
        afterwards.
        """
        # The first call has to succeed so that the `FTPHost`
        # constructor can finish. The `pwd_called` attribute marks
        # that this first call has happened.
        if hasattr(self, "pwd_called"):
            raise ftplib.error_temp()
        self.pwd_called = True
        return "/home"
76
77
class UnnormalizedCurrentWorkingDirectory(mock_ftplib.MockSession):
    """
    Mock session that reports its working directory with a trailing
    slash.
    """

    def pwd(self):
        """Return the unnormalized directory "/home/"."""
        # The trailing slash is intentional. The test using this
        # session checks that the slash is gone from the directory
        # stored in the `FTPHost` instance.
        cwd_with_trailing_slash = "/home/"
        return cwd_with_trailing_slash
85
86
class RecursiveListingForDotAsPathSession(mock_ftplib.MockSession):
    """
    Mock session with one canned listing for the empty path and a
    different, recursive-looking one for the path `.`.
    """

    # Canned directory listings. Presumably the mock session looks
    # them up by the path that is being listed - confirm in
    # `mock_ftplib`.
    dir_contents = {
      # Plain listing of a single directory.
      "": """\
total 10
lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin
d--x--x--x   2 staff        512 Sep 24  2000 dev
d--x--x--x   3 staff        512 Sep 25  2000 etc
dr-xr-xr-x   3 staff        512 Oct  3  2000 pub
d--x--x--x   5 staff        512 Oct  3  2000 usr""",

      # Listing that also contains the contents of subdirectories.
      ".": """\
lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin

dev:
total 10

etc:
total 10

pub:
total 4
-rw-r--r--   1 staff         74 Sep 25  2000 .message
----------   1 staff          0 Aug 16  2003 .notar
drwxr-xr-x  12 ftp          512 Nov 23  2008 freeware

usr:
total 4"""}

    def _transform_path(self, path):
        """Return `path` as it is, without any transformation."""
        return path
118
119
class BinaryDownloadMockSession(mock_ftplib.MockUnixFormatSession):
    """
    Mock session that uses random binary data as its mock file
    content.
    """

    # Generated once, when the class body is executed.
    mock_file_content = binary_data()
123
124
class TimeShiftMockSession(mock_ftplib.MockSession):
    """Mock session with a `delete` method that does nothing."""

    def delete(self, file_name):
        """Accept `file_name` and ignore it."""
        return None
129
130#
131# Customized `FTPHost` class for conditional upload/download tests
132# and time shift tests
133#
class FailingUploadAndDownloadFTPHost(ftputil.FTPHost):
    """
    `FTPHost` subclass for tests in which no transfer must happen:
    calling `upload` or `download` fails the running test.
    """

    def upload(self, source, target, mode=""):
        """Fail the test; the arguments are ignored."""
        pytest.fail("`FTPHost.upload` should not have been called")

    def download(self, source, target, mode=""):
        """Fail the test; the arguments are ignored."""
        pytest.fail("`FTPHost.download` should not have been called")
141
142
class TimeShiftFTPHost(ftputil.FTPHost):
    """
    `FTPHost` subclass whose `path` attribute is replaced by a stub
    with a settable modification time.
    """

    class _Path:
        """
        Stub for the host's `path` object. Pure path manipulation is
        delegated to `posixpath`; the other methods return canned
        values.
        """

        # --- Canned values ---

        def set_mtime(self, mtime):
            """Store the value later returned by `getmtime`."""
            self._mtime = mtime

        def getmtime(self, file_name):
            """Return the stored mtime, whatever `file_name` is."""
            return self._mtime

        def abspath(self, path):
            """Return a fixed absolute path, whatever `path` is."""
            return "/home/sschwarzer/_ftputil_sync_"

        # Required by `FTPHost.remove`.
        def isfile(self, path):
            """Report every path as a regular file."""
            return True

        # --- Delegation to `posixpath` ---

        def split(self, path):
            return posixpath.split(path)

        def join(self, *args):
            return posixpath.join(*args)

        def normpath(self, path):
            return posixpath.normpath(path)

        def isabs(self, path):
            return posixpath.isabs(path)

    def __init__(self, *args, **kwargs):
        """
        Initialize the host as `ftputil.FTPHost` does, then swap in
        the `_Path` stub.
        """
        ftputil.FTPHost.__init__(self, *args, **kwargs)
        # Must come after the base class constructor so that the stub
        # replaces the `path` attribute instead of being overwritten.
        self.path = self._Path()
167
168#
169# Test cases
170#
class TestConstructor(object):
    """
    Tests for creating `FTPHost` objects.
    """

    def test_open_and_close(self):
        """
        After `close`, the host must be flagged as closed and must
        not have any children left.
        """
        ftp_host = test_base.ftp_host_factory()
        ftp_host.close()
        assert ftp_host.closed is True
        assert ftp_host._children == []

    def test_invalid_login(self):
        """A failing login must show up as `FTPOSError`."""
        pytest.raises(ftputil.error.FTPOSError,
                      test_base.ftp_host_factory, FailOnLoginSession)

    def test_pwd_normalization(self):
        """
        The session reports "/home/"; `getcwd` must return the
        normalized "/home".
        """
        ftp_host = test_base.ftp_host_factory(
                     UnnormalizedCurrentWorkingDirectory)
        current_directory = ftp_host.getcwd()
        assert current_directory == "/home"
197
198
class TestKeepAlive(object):
    """Tests for `FTPHost.keep_alive`."""

    def test_succeeding_keep_alive(self):
        """With a working connection, `keep_alive` must not raise."""
        ftp_host = test_base.ftp_host_factory()
        ftp_host.keep_alive()

    def test_failing_keep_alive(self):
        """
        With a timed-out connection, `keep_alive` must raise
        `TemporaryError`.
        """
        ftp_host = test_base.ftp_host_factory(
                     session_factory=FailOnKeepAliveSession)
        pytest.raises(ftputil.error.TemporaryError, ftp_host.keep_alive)
212
213
class TestSetParser(object):
    """Tests for `FTPHost.set_parser`."""

    class TrivialParser(ftputil.stat.Parser):
        """
        Parser whose `parse_line` returns one and the same stat
        result for every line. That's enough to find out whether
        ftputil uses the parser that was set; real parsing code
        isn't needed.
        """

        def __init__(self):
            # A plain `os.stat("/home")` value won't do: the
            # `_st_name` attribute is needed later and can't be set
            # on an `os.stat` result. Hence the wrapping in
            # `StatResult`.
            stat_result = ftputil.stat.StatResult(os.stat("/home"))
            stat_result._st_name = "home"
            self.default_stat_result = stat_result

        def parse_line(self, line, time_shift=0.0):
            """Return the prepared stat result; ignore the arguments."""
            return self.default_stat_result

    def test_set_parser(self):
        """
        After `set_parser`, `stat` must use the given parser and
        parser switching must be turned off.
        """
        ftp_host = test_base.ftp_host_factory()
        assert ftp_host._stat._allow_parser_switching is True
        parser = TestSetParser.TrivialParser()
        ftp_host.set_parser(parser)
        result = ftp_host.stat("/home")
        assert result == parser.default_stat_result
        assert ftp_host._stat._allow_parser_switching is False
244
245
class TestCommandNotImplementedError(object):
    """Tests for commands the server doesn't implement."""

    def test_command_not_implemented_error(self):
        """
        A command not implemented by the server must raise
        `CommandNotImplementedError`.
        """
        ftp_host = test_base.ftp_host_factory()
        # `CommandNotImplementedError` is a subclass of
        # `PermanentError`, so the same call has to match both.
        expected_exceptions = (
          ftputil.error.CommandNotImplementedError,
          ftputil.error.PermanentError)
        for expected_exception in expected_exceptions:
            with pytest.raises(expected_exception):
                ftp_host.chmod("nonexistent", 0o644)
259
260
class TestRecursiveListingForDotAsPath(object):
    """
    Return a recursive directory listing when the path to list
    is a dot. This is used to test for issue #33, see
    http://ftputil.sschwarzer.net/trac/ticket/33 .
    """

    def _make_host(self):
        """
        Return a host whose session answers a listing of `.` with a
        recursive listing.
        """
        return test_base.ftp_host_factory(
                 session_factory=RecursiveListingForDotAsPathSession)

    def _assert_plain_listing(self, lines):
        """
        Check that `lines` start like the plain (non-recursive)
        listing of the mock session.
        """
        assert lines[0] == "total 10"
        assert lines[1].startswith("lrwxrwxrwx   1 staff")
        assert lines[2].startswith("d--x--x--x   2 staff")

    # In all tests the host is used as a context manager so that it's
    # closed even if an assertion fails.

    def test_recursive_listing(self):
        """
        Listing the current directory must give the plain listing,
        not the recursive one.
        """
        with self._make_host() as host:
            lines = host._dir(host.curdir)
            self._assert_plain_listing(lines)

    def test_plain_listing(self):
        """Listing the empty path must give the plain listing."""
        with self._make_host() as host:
            lines = host._dir("")
            self._assert_plain_listing(lines)

    def test_empty_string_instead_of_dot_workaround(self):
        """
        `listdir` for the current directory must return only the
        top-level names.
        """
        with self._make_host() as host:
            files = host.listdir(host.curdir)
            assert files == ["bin", "dev", "etc", "pub", "usr"]
292
293
class TestUploadAndDownload(object):
    """
    Test ASCII upload and binary download as examples.

    The tests work with scratch files in the current directory.
    These files are removed in `finally` clauses, so a failing
    assertion doesn't leave files behind that could influence
    other tests.
    """

    def generate_file(self, data, file_name):
        """Generate a local data file."""
        with open(file_name, "wb") as source_file:
            source_file.write(data)

    def _remove_if_present(self, file_name):
        """
        Delete the local file `file_name` if it exists.

        A missing file is not an error. This keeps cleanup code from
        hiding the actual test failure if the file was never created
        or has already been removed.
        """
        if os.path.exists(file_name):
            os.unlink(file_name)

    def test_download(self):
        """Test mode download."""
        local_target = "_test_target_"
        host = test_base.ftp_host_factory(
                 session_factory=BinaryDownloadMockSession)
        try:
            # Download
            host.download("dummy", local_target)
            # Read file and compare
            with open(local_target, "rb") as fobj:
                data = fobj.read()
            remote_file_content = mock_ftplib.content_of("dummy")
            assert data == remote_file_content
        finally:
            # Clean up
            self._remove_if_present(local_target)

    def test_conditional_upload(self):
        """Test conditional upload."""
        local_source = "_test_source_"
        data = binary_data()
        self.generate_file(data, local_source)
        try:
            # Target is newer, so don't upload.
            host = test_base.ftp_host_factory(
                     ftp_host_class=FailingUploadAndDownloadFTPHost)
            flag = host.upload_if_newer(local_source, "/home/newer")
            assert flag is False
            # Target is older, so upload.
            host = test_base.ftp_host_factory()
            flag = host.upload_if_newer(local_source, "/home/older")
            assert flag is True
            remote_file_content = mock_ftplib.content_of("older")
            assert data == remote_file_content
            # Target doesn't exist, so upload.
            host = test_base.ftp_host_factory()
            flag = host.upload_if_newer(local_source, "/home/notthere")
            assert flag is True
            remote_file_content = mock_ftplib.content_of("notthere")
            assert data == remote_file_content
        finally:
            # Clean up.
            self._remove_if_present(local_source)

    def compare_and_delete_downloaded_data(self, file_name):
        """
        Compare content of downloaded file with its source, then
        delete the local target file.

        The file is deleted even if the comparison fails.
        """
        try:
            with open(file_name, "rb") as fobj:
                data = fobj.read()
            # The name `newer` is used by all callers, so use it here,
            # too.
            remote_file_content = mock_ftplib.content_of("newer")
            assert data == remote_file_content
        finally:
            # Clean up
            self._remove_if_present(file_name)

    def test_conditional_download_without_target(self):
        """
        Test conditional binary mode download when no target file
        exists.
        """
        local_target = "_test_target_"
        # Target does not exist, so download.
        host = test_base.ftp_host_factory(
                 session_factory=BinaryDownloadMockSession)
        try:
            flag = host.download_if_newer("/home/newer", local_target)
            assert flag is True
            self.compare_and_delete_downloaded_data(local_target)
        finally:
            # Only has an effect if the test failed before the
            # comparison helper could delete the file.
            self._remove_if_present(local_target)

    def test_conditional_download_with_older_target(self):
        """Test conditional binary mode download with newer source file."""
        local_target = "_test_target_"
        # Make target file.
        with open(local_target, "w"):
            pass
        try:
            # Source is newer (date in 2020), so download.
            host = test_base.ftp_host_factory(
                     session_factory=BinaryDownloadMockSession)
            flag = host.download_if_newer("/home/newer", local_target)
            assert flag is True
            self.compare_and_delete_downloaded_data(local_target)
        finally:
            # Only has an effect if the test failed before the
            # comparison helper could delete the file.
            self._remove_if_present(local_target)

    def test_conditional_download_with_newer_target(self):
        """Test conditional binary mode download with older source file."""
        local_target = "_test_target_"
        # Make target file.
        with open(local_target, "w"):
            pass
        try:
            # Source is older (date in 1970), so don't download.
            host = test_base.ftp_host_factory(
                     ftp_host_class=FailingUploadAndDownloadFTPHost,
                     session_factory=BinaryDownloadMockSession)
            flag = host.download_if_newer("/home/older", local_target)
            assert flag is False
        finally:
            # Remove target file
            self._remove_if_present(local_target)
393
394
class TestTimeShift(object):
    """Tests for rounding, checking and determining the time shift."""

    def test_rounded_time_shift(self):
        """Check the rounding of time shift values."""
        ftp_host = test_base.ftp_host_factory(
                     session_factory=TimeShiftMockSession)
        # Private method, so it's reached via its mangled name.
        round_time_shift = ftp_host._FTPHost__rounded_time_shift
        # Each pair is (value to round, expected rounded value).
        cases = [
          (      0,           0),
          (      0.1,         0),
          (     -0.1,         0),
          (   1500,        1800),
          (  -1500,       -1800),
          (   1800,        1800),
          (  -1800,       -1800),
          (   2000,        1800),
          (  -2000,       -1800),
          ( 5*3600-100,  5*3600),
          (-5*3600+100, -5*3600)]
        for original_value, expected_value in cases:
            assert round_time_shift(original_value) == expected_value

    def test_assert_valid_time_shift(self):
        """Check which time shifts are accepted and which rejected."""
        ftp_host = test_base.ftp_host_factory(
                     session_factory=TimeShiftMockSession)
        # Private method, so it's reached via its mangled name.
        check_time_shift = ftp_host._FTPHost__assert_valid_time_shift
        # Acceptable values pass the check and yield `None`.
        valid_time_shifts = [23*3600, -23*3600, 3600+30, -3600+30]
        for valid_time_shift in valid_time_shifts:
            assert check_time_shift(valid_time_shift) is None
        # The first two values exceed one day. The last two deviate
        # too much from 15-minute units.
        invalid_time_shifts = [25*3600, -25*3600, 8*60, -3600-8*60]
        for invalid_time_shift in invalid_time_shifts:
            with pytest.raises(ftputil.error.TimeShiftError):
                check_time_shift(invalid_time_shift)

    def test_synchronize_times(self):
        """Check `synchronize_times` with simulated server times."""
        ftp_host = test_base.ftp_host_factory(
                     ftp_host_class=TimeShiftFTPHost,
                     session_factory=TimeShiftMockSession)
        # Each pair is (simulated offset of the server's mtime,
        # time shift expected after synchronization).
        valid_cases = [
          (60*60+30,  60*60),
          (60*60-100, 60*60),
          (30*60+100, 30*60),
          (45*60-100, 45*60),
        ]
        for measured, expected in valid_cases:
            ftp_host.path.set_mtime(time.time() + measured)
            ftp_host.synchronize_times()
            assert ftp_host.time_shift() == expected
        # For these offsets the synchronization has to be refused.
        invalid_offsets = [60*60+8*60, 45*60-6*60]
        for measured in invalid_offsets:
            ftp_host.path.set_mtime(time.time() + measured)
            with pytest.raises(ftputil.error.TimeShiftError):
                ftp_host.synchronize_times()

    def test_synchronize_times_for_server_in_east(self):
        """Check the timestamp correction from ticket #55."""
        ftp_host = test_base.ftp_host_factory(
                     ftp_host_class=TimeShiftFTPHost,
                     session_factory=TimeShiftMockSession)
        # Set explicitly to make the starting point of the problem
        # obvious.
        ftp_host.set_time_shift(0.0)
        one_hour = 60 * 60
        # Any negative time shift would do here.
        presumed_time_shift = -6 * one_hour
        # Simulate a server east of us via the `mtime`. As long as
        # the host's `time_shift` is still 0.0 (which is to be
        # expected before the time shift has been determined), the
        # directory parser (more specifically
        # `ftputil.stat.Parser.parse_unix_time`) returns a time that
        # is one year too far in the past. `synchronize_times` has to
        # cope with this and add the year "back". This probably isn't
        # a bug in `parse_unix_time` since that method should work
        # once the time shift is set correctly.
        now = time.localtime()
        now_one_year_earlier = (now.tm_year-1,) + now[1:]
        presumed_server_time = (
          time.mktime(now_one_year_earlier) + presumed_time_shift)
        ftp_host.path.set_mtime(presumed_server_time)
        ftp_host.synchronize_times()
        assert ftp_host.time_shift() == presumed_time_shift
487
488
class TestAcceptEitherUnicodeOrBytes(object):
    """
    Check that `FTPHost` methods taking one or more paths work with
    unicode strings as well as with byte strings.
    """

    def setup_method(self, method):
        """Provide a fresh default host as `self.host` for each test."""
        self.host = test_base.ftp_host_factory()

    def _test_method_with_single_path_argument(self, method, path):
        """
        Call `method` with `path` as given, then with `path`
        converted to a byte string.
        """
        method(path)
        path_as_bytes = ftputil.tool.as_bytes(path)
        method(path_as_bytes)

    #
    # Methods that need individual handling
    #
    def test_upload(self):
        """`upload` must accept a unicode or bytes target."""
        # "Makefile" has to exist in the current directory.
        self.host.upload("Makefile", "target")
        self.host.upload("Makefile", ftputil.tool.as_bytes("target"))

    def test_download(self):
        """`download` must accept a unicode or bytes source."""
        ftp_host = test_base.ftp_host_factory(
                     session_factory=BinaryDownloadMockSession)
        local_target = "_local_target_"
        ftp_host.download("source", local_target)
        ftp_host.download(ftputil.tool.as_bytes("source"), local_target)
        os.remove(local_target)

    def test_rename(self):
        """`rename` must accept any mix of unicode and bytes paths."""
        # As with `os.rename`, the two arguments may have different
        # types, so try all four combinations.
        unicode_path = "/home/file_name_test/ä"
        bytes_path = ftputil.tool.as_bytes(unicode_path)
        candidates = [unicode_path, bytes_path]
        for source_path in candidates:
            for target_path in candidates:
                self.host.rename(source_path, target_path)

    def test_listdir(self):
        """
        `listdir` must accept unicode or bytes and return items of
        the same type as its argument.
        """
        ftp_host = self.host
        to_bytes = ftputil.tool.as_bytes
        ftp_host.chdir("/home/file_name_test")
        # Unicode argument, unicode items
        unicode_items = ftp_host.listdir("ä")
        assert unicode_items == ["ö", "o"]
        #  The comparison alone isn't enough under Python 2, so check
        #  the type explicitly.
        for unicode_item in unicode_items:
            assert isinstance(unicode_item, ftputil.compat.unicode_type)
        # Bytes argument, bytes items
        bytes_items = ftp_host.listdir(to_bytes("ä"))
        assert bytes_items == [to_bytes("ö"), to_bytes("o")]
        #  The comparison alone isn't enough under Python 2, so check
        #  the type explicitly.
        for bytes_item in bytes_items:
            assert isinstance(bytes_item, ftputil.compat.bytes_type)

    def test_chmod(self):
        """`chmod` must accept a unicode or bytes path."""
        ftp_host = self.host
        # `MockSession.voidcmd` would raise an exception for the
        # `CHMOD` command, so replace it with a method that ignores
        # its arguments.
        session = ftp_host._session
        session.voidcmd = session._ignore_arguments
        unicode_path = "/home/file_name_test/ä"
        ftp_host.chmod(unicode_path, 0o755)
        ftp_host.chmod(ftputil.tool.as_bytes(unicode_path), 0o755)

    #
    # Methods with a single path argument
    #
    def test_chdir(self):
        """`chdir` must accept a unicode or bytes path."""
        self._test_method_with_single_path_argument(
          self.host.chdir, "/home/file_name_test/ö")

    def test_mkdir(self):
        """`mkdir` must accept a unicode or bytes path."""
        # The mock session already has this directory, which doesn't
        # matter for this test.
        self._test_method_with_single_path_argument(
          self.host.mkdir, "/home/file_name_test/ä")

    def test_makedirs(self):
        """`makedirs` must accept a unicode or bytes path."""
        self._test_method_with_single_path_argument(
          self.host.makedirs, "/home/file_name_test/ä")

    def test_rmdir(self):
        """`rmdir` must accept a unicode or bytes path."""
        # `rmdir` requires an empty directory.
        empty_directory = "/home/file_name_test/empty_ä"
        self._test_method_with_single_path_argument(
          self.host.rmdir, empty_directory)

    def test_remove(self):
        """`remove` must accept a unicode or bytes path."""
        self._test_method_with_single_path_argument(
          self.host.remove, "/home/file_name_test/ö")

    def test_rmtree(self):
        """`rmtree` must accept a unicode or bytes path."""
        # As in the `rmdir` test, an empty directory is used.
        empty_directory = "/home/file_name_test/empty_ä"
        self._test_method_with_single_path_argument(
          self.host.rmtree, empty_directory)

    def test_lstat(self):
        """`lstat` must accept a unicode or bytes path."""
        self._test_method_with_single_path_argument(
          self.host.lstat, "/home/file_name_test/ä")

    def test_stat(self):
        """`stat` must accept a unicode or bytes path."""
        self._test_method_with_single_path_argument(
          self.host.stat, "/home/file_name_test/ä")

    def test_walk(self):
        """`walk` must accept a unicode or bytes path."""
        # What `walk` returns is of no interest here.
        self._test_method_with_single_path_argument(
          self.host.walk, "/home/file_name_test/ä")
604
605
class TestFailingPickling(object):
    """Tests for the (unsupported) pickling of ftputil objects."""

    def test_failing_pickling(self):
        """
        Pickling a host or a remote file object must raise
        `TypeError`; pickling intentionally isn't supported.
        """
        with test_base.ftp_host_factory() as ftp_host:
            pytest.raises(TypeError, pickle.dumps, ftp_host)
            with ftp_host.open("/home/sschwarzer/index.html") as remote_file:
                pytest.raises(TypeError, pickle.dumps, remote_file)
Note: See TracBrowser for help on using the repository browser.