source: test/test_real_ftp.py @ 1720:a10aefe0f71f

Last change on this file since 1720:a10aefe0f71f was 1720:a10aefe0f71f, checked in by Stefan Schwarzer <sschwarzer@…>, 12 months ago
Remove support for `M2Crypto` M2Crypto was used to get FTP_TLS support in Python 2. Since we now target Python 3.5+, users can and should use `ftplib.FTP_TLS` to get FTP_TLS support.
File size: 35.3 KB
Line 
1# Copyright (C) 2003-2018, Stefan Schwarzer <sschwarzer@sschwarzer.net>
2# and ftputil contributors (see `doc/contributors.txt`)
3# See the file LICENSE for licensing terms.
4
5# Execute tests on a real FTP server (other tests use a mock server).
6#
7# This test writes some files and directories on the local client and
8# the remote server. You'll need write access in the login directory.
9# This test can take a few minutes because it has to wait to test the
10# timezone calculation.
11
12import ftplib
13import functools
14import gc
15import operator
16import os
17import time
18import stat
19
20import pytest
21
22import ftputil.compat
23import ftputil.error
24import ftputil.file_transfer
25import ftputil.session
26import ftputil.stat_cache
27
28import test
29
30
31def utc_local_time_shift():
32    """
33    Return the expected time shift in seconds assuming the server
34    uses UTC in its listings and the client uses local time.
35
36    This is needed because Pure-FTPd meanwhile seems to insist that
37    the displayed time for files is in UTC.
38    """
39    utc_tuple = time.gmtime()
40    localtime_tuple = time.localtime()
41    # To calculate the correct times shift, we need to ignore the
42    # DST component in the localtime tuple, i. e. set it to 0.
43    localtime_tuple = localtime_tuple[:-1] + (0,)
44    time_shift_in_seconds = (time.mktime(utc_tuple) -
45                             time.mktime(localtime_tuple))
46    # To be safe, round the above value to units of 3600 s (1 hour).
47    return round(time_shift_in_seconds / 3600.0) * 3600
48
49# Difference between local times of server and client. If 0.0, server
50# and client use the same timezone.
51#EXPECTED_TIME_SHIFT = utc_local_time_shift()
52# Pure-FTPd seems to have changed its mind (see docstring of
53# `utc_local_time_shift`).
54EXPECTED_TIME_SHIFT = 0.0
55
56
57class Cleaner:
58    """
59    This class helps remove directories and files which might
60    otherwise be left behind if a test fails in unexpected ways.
61    """
62
63    def __init__(self, host):
64        # The test class (probably `RealFTPTest`) and the helper
65        # class share the same `FTPHost` object.
66        self._host = host
67        self._ftp_items = []
68
69    def add_dir(self, path):
70        """Schedule a directory with path `path` for removal."""
71        self._ftp_items.append(("d", self._host.path.abspath(path)))
72
73    def add_file(self, path):
74        """Schedule a file with path `path` for removal."""
75        self._ftp_items.append(("f", self._host.path.abspath(path)))
76
77    def clean(self):
78        """
79        Remove the directories and files previously remembered.
80        The removal works in reverse order of the scheduling with
81        `add_dir` and `add_file`.
82
83        Errors due to a removal are ignored.
84        """
85        self._host.chdir("/")
86        for type_, path in reversed(self._ftp_items):
87            try:
88                if type_ == "d":
89                    # If something goes wrong in `rmtree` we might
90                    # leave a mess behind.
91                    self._host.rmtree(path)
92                elif type_ == "f":
93                    # Minor mess if `remove` fails
94                    self._host.remove(path)
95            except ftputil.error.FTPError:
96                pass
97
98
99class RealFTPTest:
100
101    def setup_method(self, method):
102        # Server, username, password.
103        self.login_data = ("localhost", "ftptest",
104                            "d605581757de5eb56d568a4419f4126e")
105        self.host = ftputil.FTPHost(*self.login_data)
106        self.cleaner = Cleaner(self.host)
107
108    def teardown_method(self, method):
109        self.cleaner.clean()
110        self.host.close()
111
112    #
113    # Helper methods
114    #
115    def make_remote_file(self, path):
116        """Create a file on the FTP host."""
117        self.cleaner.add_file(path)
118        with self.host.open(path, "wb") as file_:
119            # Write something. Otherwise the FTP server might not update
120            # the time of last modification if the file existed before.
121            file_.write(b"\n")
122
123    def make_local_file(self):
124        """Create a file on the local host (= on the client side)."""
125        with open("_local_file_", "wb") as fobj:
126            fobj.write(b"abc\x12\x34def\t")
127
128
129class TestMkdir(RealFTPTest):
130
131    def test_mkdir_rmdir(self):
132        host = self.host
133        dir_name = "_testdir_"
134        file_name = host.path.join(dir_name, "_nonempty_")
135        self.cleaner.add_dir(dir_name)
136        # Make dir and check if the directory is there.
137        host.mkdir(dir_name)
138        files = host.listdir(host.curdir)
139        assert dir_name in files
140        # Try to remove a non-empty directory.
141        self.cleaner.add_file(file_name)
142        non_empty = host.open(file_name, "w")
143        non_empty.close()
144        with pytest.raises(ftputil.error.PermanentError):
145            host.rmdir(dir_name)
146        # Remove file.
147        host.unlink(file_name)
148        # `remove` on a directory should fail.
149        try:
150            try:
151                host.remove(dir_name)
152            except ftputil.error.PermanentError as exc:
153                assert str(exc).startswith(
154                         "remove/unlink can only delete files")
155            else:
156                pytest.fail("we shouldn't have come here")
157        finally:
158            # Delete empty directory.
159            host.rmdir(dir_name)
160        files = host.listdir(host.curdir)
161        assert dir_name not in files
162
163    def test_makedirs_without_existing_dirs(self):
164        host = self.host
165        # No `_dir1_` yet
166        assert "_dir1_" not in host.listdir(host.curdir)
167        # Vanilla case, all should go well.
168        host.makedirs("_dir1_/dir2/dir3/dir4")
169        self.cleaner.add_dir("_dir1_")
170        # Check host.
171        assert host.path.isdir("_dir1_")
172        assert host.path.isdir("_dir1_/dir2")
173        assert host.path.isdir("_dir1_/dir2/dir3")
174        assert host.path.isdir("_dir1_/dir2/dir3/dir4")
175
176    def test_makedirs_from_non_root_directory(self):
177        # This is a testcase for issue #22, see
178        # http://ftputil.sschwarzer.net/trac/ticket/22 .
179        host = self.host
180        # No `_dir1_` and `_dir2_` yet
181        assert "_dir1_" not in host.listdir(host.curdir)
182        assert "_dir2_" not in host.listdir(host.curdir)
183        # Part 1: Try to make directories starting from `_dir1_` and
184        # change to non-root directory.
185        self.cleaner.add_dir("_dir1_")
186        host.mkdir("_dir1_")
187        host.chdir("_dir1_")
188        host.makedirs("_dir2_/_dir3_")
189        # Test for expected directory hierarchy.
190        assert host.path.isdir("/_dir1_")
191        assert host.path.isdir("/_dir1_/_dir2_")
192        assert host.path.isdir("/_dir1_/_dir2_/_dir3_")
193        assert not host.path.isdir("/_dir1_/_dir1_")
194        # Remove all but the directory we're in.
195        host.rmdir("/_dir1_/_dir2_/_dir3_")
196        host.rmdir("/_dir1_/_dir2_")
197        # Part 2: Try to make directories starting from root.
198        self.cleaner.add_dir("/_dir2_")
199        host.makedirs("/_dir2_/_dir3_")
200        # Test for expected directory hierarchy
201        assert host.path.isdir("/_dir2_")
202        assert host.path.isdir("/_dir2_/_dir3_")
203        assert not host.path.isdir("/_dir1_/_dir2_")
204
205    def test_makedirs_of_existing_directory(self):
206        host = self.host
207        # The (chrooted) login directory
208        host.makedirs("/")
209
210    def test_makedirs_with_file_in_the_way(self):
211        host = self.host
212        self.cleaner.add_dir("_dir1_")
213        host.mkdir("_dir1_")
214        self.make_remote_file("_dir1_/file1")
215        # Try it.
216        with pytest.raises(ftputil.error.PermanentError):
217            host.makedirs("_dir1_/file1")
218        with pytest.raises(ftputil.error.PermanentError):
219            host.makedirs("_dir1_/file1/dir2")
220
221    def test_makedirs_with_existing_directory(self):
222        host = self.host
223        self.cleaner.add_dir("_dir1_")
224        host.mkdir("_dir1_")
225        host.makedirs("_dir1_/dir2")
226        # Check
227        assert host.path.isdir("_dir1_")
228        assert host.path.isdir("_dir1_/dir2")
229
230    def test_makedirs_in_non_writable_directory(self):
231        host = self.host
232        # Preparation: `rootdir1` exists but is only writable by root.
233        with pytest.raises(ftputil.error.PermanentError):
234            host.makedirs("rootdir1/dir2")
235
236    def test_makedirs_with_writable_directory_at_end(self):
237        host = self.host
238        self.cleaner.add_dir("rootdir2/dir2")
239        # Preparation: `rootdir2` exists but is only writable by root.
240        # `dir2` is writable by regular ftp users. Both directories
241        # below should work.
242        host.makedirs("rootdir2/dir2")
243        host.makedirs("rootdir2/dir2/dir3")
244
245
246class TestRemoval(RealFTPTest):
247
248    def test_rmtree_without_error_handler(self):
249        host = self.host
250        # Build a tree.
251        self.cleaner.add_dir("_dir1_")
252        host.makedirs("_dir1_/dir2")
253        self.make_remote_file("_dir1_/file1")
254        self.make_remote_file("_dir1_/file2")
255        self.make_remote_file("_dir1_/dir2/file3")
256        self.make_remote_file("_dir1_/dir2/file4")
257        # Try to remove a _file_ with `rmtree`.
258        with pytest.raises(ftputil.error.PermanentError):
259            host.rmtree("_dir1_/file2")
260        # Remove `dir2`.
261        host.rmtree("_dir1_/dir2")
262        assert not host.path.exists("_dir1_/dir2")
263        assert host.path.exists("_dir1_/file2")
264        # Re-create `dir2` and remove `_dir1_`.
265        host.mkdir("_dir1_/dir2")
266        self.make_remote_file("_dir1_/dir2/file3")
267        self.make_remote_file("_dir1_/dir2/file4")
268        host.rmtree("_dir1_")
269        assert not host.path.exists("_dir1_")
270
271    def test_rmtree_with_error_handler(self):
272        host = self.host
273        self.cleaner.add_dir("_dir1_")
274        host.mkdir("_dir1_")
275        self.make_remote_file("_dir1_/file1")
276        # Prepare error "handler"
277        log = []
278        def error_handler(*args):
279            log.append(args)
280        # Try to remove a file as root "directory".
281        host.rmtree("_dir1_/file1", ignore_errors=True, onerror=error_handler)
282        assert log == []
283        host.rmtree("_dir1_/file1", ignore_errors=False, onerror=error_handler)
284        assert log[0][0] == host.listdir
285        assert log[0][1] == "_dir1_/file1"
286        assert log[1][0] == host.rmdir
287        assert log[1][1] == "_dir1_/file1"
288        host.rmtree("_dir1_")
289        # Try to remove a non-existent directory.
290        del log[:]
291        host.rmtree("_dir1_", ignore_errors=False, onerror=error_handler)
292        assert log[0][0] == host.listdir
293        assert log[0][1] == "_dir1_"
294        assert log[1][0] == host.rmdir
295        assert log[1][1] == "_dir1_"
296
297    def test_remove_non_existent_item(self):
298        host = self.host
299        with pytest.raises(ftputil.error.PermanentError):
300            host.remove("nonexistent")
301
302    def test_remove_existing_file(self):
303        self.cleaner.add_file("_testfile_")
304        self.make_remote_file("_testfile_")
305        host = self.host
306        assert host.path.isfile("_testfile_")
307        host.remove("_testfile_")
308        assert not host.path.exists("_testfile_")
309
310
311class TestWalk(RealFTPTest):
312    """
313    Walk the directory tree
314
315      walk_test
316      ├── dir1
317      │   ├── dir11
318      │   └── dir12
319      │       ├── dir123
320      │       │   └── file1234
321      │       ├── file121
322      │       └── file122
323      ├── dir2
324      ├── dir3
325      │   ├── dir31
326      │   ├── dir32 -> ../dir1/dir12/dir123
327      │   ├── file31
328      │   └── file32
329      └── file4
330
331    and check if the results are the expected ones.
332    """
333
334    def _walk_test(self, expected_result, **walk_kwargs):
335        """Walk the directory and test results."""
336        # Collect data using `walk`.
337        actual_result = []
338        for items in self.host.walk(**walk_kwargs):
339            actual_result.append(items)
340        # Compare with expected results.
341        assert len(actual_result) == len(expected_result)
342        for index, _ in enumerate(actual_result):
343            assert actual_result[index] == expected_result[index]
344
345    def test_walk_topdown(self):
346        # Preparation: build tree in directory `walk_test`.
347        expected_result = [
348          ("walk_test",
349           ["dir1", "dir2", "dir3"],
350           ["file4"]),
351          #
352          ("walk_test/dir1",
353           ["dir11", "dir12"],
354           []),
355          #
356          ("walk_test/dir1/dir11",
357           [],
358           []),
359          #
360          ("walk_test/dir1/dir12",
361           ["dir123"],
362           ["file121", "file122"]),
363          #
364          ("walk_test/dir1/dir12/dir123",
365           [],
366           ["file1234"]),
367          #
368          ("walk_test/dir2",
369           [],
370           []),
371          #
372          ("walk_test/dir3",
373           ["dir31", "dir32"],
374           ["file31", "file32"]),
375          #
376          ("walk_test/dir3/dir31",
377           [],
378           []),
379          ]
380        self._walk_test(expected_result, top="walk_test")
381
382    def test_walk_depth_first(self):
383        # Preparation: build tree in directory `walk_test`
384        expected_result = [
385          ("walk_test/dir1/dir11",
386           [],
387           []),
388          #
389          ("walk_test/dir1/dir12/dir123",
390           [],
391           ["file1234"]),
392          #
393          ("walk_test/dir1/dir12",
394           ["dir123"],
395           ["file121", "file122"]),
396          #
397          ("walk_test/dir1",
398           ["dir11", "dir12"],
399           []),
400          #
401          ("walk_test/dir2",
402           [],
403           []),
404          #
405          ("walk_test/dir3/dir31",
406           [],
407           []),
408          #
409          ("walk_test/dir3",
410           ["dir31", "dir32"],
411           ["file31", "file32"]),
412          #
413          ("walk_test",
414           ["dir1", "dir2", "dir3"],
415           ["file4"])
416          ]
417        self._walk_test(expected_result, top="walk_test", topdown=False)
418
419    def test_walk_following_links(self):
420        # Preparation: build tree in directory `walk_test`.
421        expected_result = [
422          ("walk_test",
423           ["dir1", "dir2", "dir3"],
424           ["file4"]),
425          #
426          ("walk_test/dir1",
427           ["dir11", "dir12"],
428           []),
429          #
430          ("walk_test/dir1/dir11",
431           [],
432           []),
433          #
434          ("walk_test/dir1/dir12",
435           ["dir123"],
436           ["file121", "file122"]),
437          #
438          ("walk_test/dir1/dir12/dir123",
439           [],
440           ["file1234"]),
441          #
442          ("walk_test/dir2",
443           [],
444           []),
445          #
446          ("walk_test/dir3",
447           ["dir31", "dir32"],
448           ["file31", "file32"]),
449          #
450          ("walk_test/dir3/dir31",
451           [],
452           []),
453          #
454          ("walk_test/dir3/dir32",
455           [],
456           ["file1234"]),
457          ]
458        self._walk_test(expected_result, top="walk_test", followlinks=True)
459
460
461class TestRename(RealFTPTest):
462
463    def test_rename(self):
464        host = self.host
465        # Make sure the target of the renaming operation is removed.
466        self.cleaner.add_file("_testfile2_")
467        self.make_remote_file("_testfile1_")
468        host.rename("_testfile1_", "_testfile2_")
469        assert not host.path.exists("_testfile1_")
470        assert host.path.exists("_testfile2_")
471
472    def test_rename_with_spaces_in_directory(self):
473        host = self.host
474        dir_name = "_dir with spaces_"
475        self.cleaner.add_dir(dir_name)
476        host.mkdir(dir_name)
477        self.make_remote_file(dir_name + "/testfile1")
478        host.rename(dir_name + "/testfile1", dir_name + "/testfile2")
479        assert not host.path.exists(dir_name + "/testfile1")
480        assert host.path.exists(dir_name + "/testfile2")
481
482
483class TestStat(RealFTPTest):
484
485    def test_stat(self):
486        host = self.host
487        dir_name = "_testdir_"
488        file_name = host.path.join(dir_name, "_nonempty_")
489        # Make a directory and a file in it.
490        self.cleaner.add_dir(dir_name)
491        host.mkdir(dir_name)
492        with host.open(file_name, "wb") as fobj:
493            fobj.write(b"abc\x12\x34def\t")
494        # Do some stats
495        # - dir
496        dir_stat = host.stat(dir_name)
497        assert isinstance(dir_stat._st_name, ftputil.compat.unicode_type)
498        assert host.listdir(dir_name) == ["_nonempty_"]
499        assert host.path.isdir(dir_name)
500        assert not host.path.isfile(dir_name)
501        assert not host.path.islink(dir_name)
502        # - file
503        file_stat = host.stat(file_name)
504        assert isinstance(file_stat._st_name, ftputil.compat.unicode_type)
505        assert not host.path.isdir(file_name)
506        assert host.path.isfile(file_name)
507        assert not host.path.islink(file_name)
508        assert host.path.getsize(file_name) == 9
509        # - file's modification time; allow up to two minutes difference
510        host.synchronize_times()
511        server_mtime = host.path.getmtime(file_name)
512        client_mtime = time.mktime(time.localtime())
513        calculated_time_shift = server_mtime - client_mtime
514        assert not abs(calculated_time_shift-host.time_shift()) > 120
515
516    def test_issomething_for_nonexistent_directory(self):
517        host = self.host
518        # Check if we get the right results if even the containing
519        # directory doesn't exist (see ticket #66).
520        nonexistent_path = "/nonexistent/nonexistent"
521        assert not host.path.isdir(nonexistent_path)
522        assert not host.path.isfile(nonexistent_path)
523        assert not host.path.islink(nonexistent_path)
524
525    def test_special_broken_link(self):
526        # Test for ticket #39.
527        host = self.host
528        broken_link_name = os.path.join("dir_with_broken_link", "nonexistent")
529        assert (host.lstat(broken_link_name)._st_target ==
530                "../nonexistent/nonexistent")
531        assert not host.path.isdir(broken_link_name)
532        assert not host.path.isfile(broken_link_name)
533        assert host.path.islink(broken_link_name)
534
535    def test_concurrent_access(self):
536        self.make_remote_file("_testfile_")
537        with ftputil.FTPHost(*self.login_data) as host1:
538            with ftputil.FTPHost(*self.login_data) as host2:
539                stat_result1 = host1.stat("_testfile_")
540                stat_result2 = host2.stat("_testfile_")
541                assert stat_result1 == stat_result2
542                host2.remove("_testfile_")
543                # Can still get the result via `host1`
544                stat_result1 = host1.stat("_testfile_")
545                assert stat_result1 == stat_result2
546                # Stat'ing on `host2` gives an exception.
547                with pytest.raises(ftputil.error.PermanentError):
548                    host2.stat("_testfile_")
549                # Stat'ing on `host1` after invalidation
550                absolute_path = host1.path.join(host1.getcwd(), "_testfile_")
551                host1.stat_cache.invalidate(absolute_path)
552                with pytest.raises(ftputil.error.PermanentError):
553                    host1.stat("_testfile_")
554
555    def test_cache_auto_resizing(self):
556        """Test if the cache is resized appropriately."""
557        host = self.host
558        cache = host.stat_cache._cache
559        # Make sure the cache size isn't adjusted towards smaller values.
560        unused_entries = host.listdir("walk_test")
561        assert cache.size == ftputil.stat_cache.StatCache._DEFAULT_CACHE_SIZE
562        # Make the cache very small initially and see if it gets resized.
563        cache.size = 2
564        entries = host.listdir("walk_test")
565        # The adjusted cache size should be larger or equal to the
566        # number of items in `walk_test` and its parent directory. The
567        # latter is read implicitly upon `listdir`'s `isdir` call.
568        expected_min_cache_size = max(len(host.listdir(host.curdir)),
569                                      len(entries))
570        assert cache.size >= expected_min_cache_size
571
572
573class TestUploadAndDownload(RealFTPTest):
574    """Test upload and download (including time shift test)."""
575
576    def test_time_shift(self):
577        self.host.synchronize_times()
578        assert self.host.time_shift() == EXPECTED_TIME_SHIFT
579
580    @pytest.mark.slow_test
581    def test_upload(self):
582        host = self.host
583        host.synchronize_times()
584        local_file = "_local_file_"
585        remote_file = "_remote_file_"
586        # Make local file to upload.
587        self.make_local_file()
588        # Wait, else small time differences between client and server
589        # actually could trigger the update.
590        time.sleep(65)
591        try:
592            self.cleaner.add_file(remote_file)
593            host.upload(local_file, remote_file)
594            # Retry; shouldn't be uploaded
595            uploaded = host.upload_if_newer(local_file, remote_file)
596            assert uploaded is False
597            # Rewrite the local file.
598            self.make_local_file()
599            # Retry; should be uploaded now
600            uploaded = host.upload_if_newer(local_file, remote_file)
601            assert uploaded is True
602        finally:
603            # Clean up
604            os.unlink(local_file)
605
606    @pytest.mark.slow_test
607    def test_download(self):
608        host = self.host
609        host.synchronize_times()
610        local_file = "_local_file_"
611        remote_file = "_remote_file_"
612        # Make a remote file.
613        self.make_remote_file(remote_file)
614        # File should be downloaded as it's not present yet.
615        downloaded = host.download_if_newer(remote_file, local_file)
616        assert downloaded is True
617        try:
618            # If the remote file, taking the datetime precision into
619            # account, _might_ be newer, the file will be downloaded
620            # again. To prevent this, wait a bit over a minute (the
621            # remote precision), then "touch" the local file.
622            time.sleep(65)
623            # Create empty file.
624            with open(local_file, "w") as fobj:
625                pass
626            # Local file is present and newer, so shouldn't download.
627            downloaded = host.download_if_newer(remote_file, local_file)
628            assert downloaded is False
629            # Re-make the remote file.
630            self.make_remote_file(remote_file)
631            # Local file is present but possibly older (taking the
632            # possible deviation because of the precision into account),
633            # so should download.
634            downloaded = host.download_if_newer(remote_file, local_file)
635            assert downloaded is True
636        finally:
637            # Clean up.
638            os.unlink(local_file)
639
640    def test_callback_with_transfer(self):
641        host = self.host
642        FILE_NAME = "debian-keyring.tar.gz"
643        # Default chunk size as in `FTPHost.copyfileobj`
644        MAX_COPY_CHUNK_SIZE = ftputil.file_transfer.MAX_COPY_CHUNK_SIZE
645        file_size = host.path.getsize(FILE_NAME)
646        chunk_count, _ = divmod(file_size, MAX_COPY_CHUNK_SIZE)
647        # Add one chunk for remainder.
648        chunk_count += 1
649        # Define a callback that just collects all data passed to it.
650        transferred_chunks_list = []
651        def test_callback(chunk):
652            transferred_chunks_list.append(chunk)
653        try:
654            host.download(FILE_NAME, FILE_NAME, callback=test_callback)
655            # Construct a list of data chunks we expect.
656            expected_chunks_list = []
657            with open(FILE_NAME, "rb") as downloaded_fobj:
658                while True:
659                    chunk = downloaded_fobj.read(MAX_COPY_CHUNK_SIZE)
660                    if not chunk:
661                        break
662                    expected_chunks_list.append(chunk)
663            # Examine data collected by callback function.
664            assert len(transferred_chunks_list) == chunk_count
665            assert transferred_chunks_list == expected_chunks_list
666        finally:
667            os.unlink(FILE_NAME)
668
669
670class TestFTPFiles(RealFTPTest):
671
672    def test_only_closed_children(self):
673        REMOTE_FILE_NAME = "CONTENTS"
674        host = self.host
675        with host.open(REMOTE_FILE_NAME, "rb") as file_obj1:
676            # Create empty file and close it.
677            with host.open(REMOTE_FILE_NAME, "rb") as file_obj2:
678                pass
679            # This should re-use the second child because the first isn't
680            # closed but the second is.
681            with host.open(REMOTE_FILE_NAME, "rb") as file_obj:
682                assert len(host._children) == 2
683                assert file_obj._host is host._children[1]
684
685    def test_no_timed_out_children(self):
686        REMOTE_FILE_NAME = "CONTENTS"
687        host = self.host
688        # Implicitly create child host object.
689        with host.open(REMOTE_FILE_NAME, "rb") as file_obj1:
690            pass
691        # Monkey-patch file to simulate an FTP server timeout below.
692        def timed_out_pwd():
693            raise ftplib.error_temp("simulated timeout")
694        file_obj1._host._session.pwd = timed_out_pwd
695        # Try to get a file - which shouldn't be the timed-out file.
696        with host.open(REMOTE_FILE_NAME, "rb") as file_obj2:
697            assert file_obj1 is not file_obj2
698        # Re-use closed and not timed-out child session.
699        with host.open(REMOTE_FILE_NAME, "rb") as file_obj3:
700            pass
701        assert file_obj2 is file_obj3
702
703    def test_no_delayed_226_children(self):
704        REMOTE_FILE_NAME = "CONTENTS"
705        host = self.host
706        # Implicitly create child host object.
707        with host.open(REMOTE_FILE_NAME, "rb") as file_obj1:
708            pass
709        # Monkey-patch file to simulate an FTP server timeout below.
710        def timed_out_pwd():
711            raise ftplib.error_reply("delayed 226 reply")
712        file_obj1._host._session.pwd = timed_out_pwd
713        # Try to get a file - which shouldn't be the timed-out file.
714        with host.open(REMOTE_FILE_NAME, "rb") as file_obj2:
715            assert file_obj1 is not file_obj2
716        # Re-use closed and not timed-out child session.
717        with host.open(REMOTE_FILE_NAME, "rb") as file_obj3:
718            pass
719        assert file_obj2 is file_obj3
720
721
722class TestChmod(RealFTPTest):
723
724    def assert_mode(self, path, expected_mode):
725        """
726        Return an integer containing the allowed bits in the mode
727        change command.
728
729        The `FTPHost` object to test against is `self.host`.
730        """
731        full_mode = self.host.stat(path).st_mode
732        # Remove flags we can't set via `chmod`.
733        # Allowed flags according to Python documentation
734        # https://docs.python.org/library/stat.html
735        allowed_flags = [stat.S_ISUID, stat.S_ISGID, stat.S_ENFMT,
736          stat.S_ISVTX, stat.S_IREAD, stat.S_IWRITE, stat.S_IEXEC,
737          stat.S_IRWXU, stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR,
738          stat.S_IRWXG, stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP,
739          stat.S_IRWXO, stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH]
740        allowed_mask = functools.reduce(operator.or_, allowed_flags)
741        mode = full_mode & allowed_mask
742        assert mode == expected_mode, (
743                 "mode {0:o} != {1:o}".format(mode, expected_mode))
744
745    def test_chmod_existing_directory(self):
746        host = self.host
747        host.mkdir("_test dir_")
748        self.cleaner.add_dir("_test dir_")
749        # Set/get mode of a directory.
750        host.chmod("_test dir_", 0o757)
751        self.assert_mode("_test dir_", 0o757)
752        # Set/get mode in nested directory.
753        host.mkdir("_test dir_/nested_dir")
754        self.cleaner.add_dir("_test dir_/nested_dir")
755        host.chmod("_test dir_/nested_dir", 0o757)
756        self.assert_mode("_test dir_/nested_dir", 0o757)
757
758    def test_chmod_existing_file(self):
759        host = self.host
760        host.mkdir("_test dir_")
761        self.cleaner.add_dir("_test dir_")
762        # Set/get mode on a file.
763        file_name = host.path.join("_test dir_", "_testfile_")
764        self.make_remote_file(file_name)
765        host.chmod(file_name, 0o646)
766        self.assert_mode(file_name, 0o646)
767
768    def test_chmod_nonexistent_path(self):
769        # Set/get mode of a non-existing item.
770        with pytest.raises(ftputil.error.PermanentError):
771            self.host.chmod("nonexistent", 0o757)
772
773    def test_cache_invalidation(self):
774        host = self.host
775        host.mkdir("_test dir_")
776        self.cleaner.add_dir("_test dir_")
777        # Make sure the mode is in the cache.
778        unused_stat_result = host.stat("_test dir_")
779        # Set/get mode of the directory.
780        host.chmod("_test dir_", 0o757)
781        self.assert_mode("_test dir_", 0o757)
782        # Set/get mode on a file.
783        file_name = host.path.join("_test dir_", "_testfile_")
784        self.make_remote_file(file_name)
785        # Make sure the mode is in the cache.
786        unused_stat_result = host.stat(file_name)
787        host.chmod(file_name, 0o646)
788        self.assert_mode(file_name, 0o646)
789
790
791class TestRestArgument(RealFTPTest):
792
793    TEST_FILE_NAME = "rest_test"
794
795    def setup_method(self, method):
796        super(TestRestArgument, self).setup_method(method)
797        # Write test file.
798        with self.host.open(self.TEST_FILE_NAME, "wb") as fobj:
799            fobj.write(b"abcdefghijkl")
800        self.cleaner.add_file(self.TEST_FILE_NAME)
801
802    def test_for_reading(self):
803        """
804        If a `rest` argument is passed to `open`, the following read
805        operation should start at the byte given by `rest`.
806        """
807        with self.host.open(self.TEST_FILE_NAME, "rb", rest=3) as fobj:
808            data = fobj.read()
809        assert data == b"defghijkl"
810
811    def test_for_writing(self):
812        """
813        If a `rest` argument is passed to `open`, the following write
814        operation should start writing at the byte given by `rest`.
815        """
816        with self.host.open(self.TEST_FILE_NAME, "wb", rest=3) as fobj:
817            fobj.write(b"123")
818        with self.host.open(self.TEST_FILE_NAME, "rb") as fobj:
819            data = fobj.read()
820        assert data == b"abc123"
821
822    def test_invalid_read_from_text_file(self):
823        """
824        If the `rest` argument is used for reading from a text file,
825        a `CommandNotImplementedError` should be raised.
826        """
827        with pytest.raises(ftputil.error.CommandNotImplementedError):
828            self.host.open(self.TEST_FILE_NAME, "r", rest=3)
829
830    def test_invalid_write_to_text_file(self):
831        """
832        If the `rest` argument is used for reading from a text file,
833        a `CommandNotImplementedError` should be raised.
834        """
835        with pytest.raises(ftputil.error.CommandNotImplementedError):
836            self.host.open(self.TEST_FILE_NAME, "w", rest=3)
837
838    # There are no tests for reading and writing beyond the end of a
839    # file. For example, if the remote file is 10 bytes long and
840    # `open(remote_file, "rb", rest=100)` is used, the server may
841    # return an error status code or not.
842    #
843    # The server I use for testing returns a 554 status when
844    # attempting to _read_ beyond the end of the file. On the other
845    # hand, if attempting to _write_ beyond the end of the file, the
846    # server accepts the request, but starts writing after the end of
847    # the file, i. e. appends to the file.
848    #
849    # Instead of expecting certain responses that may differ between
850    # server implementations, I leave the bahavior for too large
851    # `rest` arguments undefined. In practice, this shouldn't be a
852    # problem because the `rest` argument should only be used for
853    # error recovery, and in this case a valid byte count for the
854    # `rest` argument should be known.
855
856
857class TestOther(RealFTPTest):
858
859    def test_open_for_reading(self):
860        # Test for issues #17 and #51,
861        # http://ftputil.sschwarzer.net/trac/ticket/17 and
862        # http://ftputil.sschwarzer.net/trac/ticket/51 .
863        file1 = self.host.open("debian-keyring.tar.gz", "rb")
864        time.sleep(1)
865        # Depending on the FTP server, this might return a status code
866        # unexpected by `ftplib` or block the socket connection until
867        # a server-side timeout.
868        file1.close()
869
870    def test_subsequent_reading(self):
871        # Open a file for reading.
872        with self.host.open("CONTENTS", "rb") as file1:
873            pass
874        # Make sure that there are no problems if the connection is reused.
875        with self.host.open("CONTENTS", "rb") as file2:
876            pass
877        assert file1._session is file2._session
878
879    def test_names_with_spaces(self):
880        # Test if directories and files with spaces in their names
881        # can be used.
882        host = self.host
883        assert host.path.isdir("dir with spaces")
884        assert (host.listdir("dir with spaces") ==
885                ["second dir", "some file", "some_file"])
886        assert host.path.isdir("dir with spaces/second dir")
887        assert host.path.isfile("dir with spaces/some_file")
888        assert host.path.isfile("dir with spaces/some file")
889
890    def test_synchronize_times_without_write_access(self):
891        """Test failing synchronization because of non-writable directory."""
892        host = self.host
893        # This isn't writable by the ftp account the tests are run under.
894        host.chdir("rootdir1")
895        with pytest.raises(ftputil.error.TimeShiftError):
896            host.synchronize_times()
897
898    def test_listdir_with_non_ascii_byte_string(self):
899        """
900        `listdir` should accept byte strings with non-ASCII
901        characters and return non-ASCII characters in directory or
902        file names.
903        """
904        host = self.host
905        path = "äbc".encode("UTF-8")
906        names = host.listdir(path)
907        assert names[0] == b"file1"
908        assert names[1] == "file1_ö".encode("UTF-8")
909
910    def test_listdir_with_non_ascii_unicode_string(self):
911        """
912        `listdir` should accept unicode strings with non-ASCII
913        characters and return non-ASCII characters in directory or
914        file names.
915        """
916        host = self.host
917        # `ftplib` under Python 3 only works correctly if the unicode
918        # strings are decoded from latin1. Under Python 2, ftputil
919        # is supposed to provide a compatible interface.
920        path = "äbc".encode("UTF-8").decode("latin1")
921        names = host.listdir(path)
922        assert names[0] == "file1"
923        assert names[1] == "file1_ö".encode("UTF-8").decode("latin1")
924
925    def test_path_with_non_latin1_unicode_string(self):
926        """
927        ftputil operations shouldn't accept file paths with non-latin1
928        characters.
929        """
930        # Use some musical symbols. These are certainly not latin1.
931        path = "𝄞𝄢"
932        # `UnicodeEncodeError` is also the exception that `ftplib`
933        # raises if it gets a non-latin1 path.
934        with pytest.raises(UnicodeEncodeError):
935            self.host.mkdir(path)
936
937    def test_list_a_option(self):
938        # For this test to pass, the server must _not_ list "hidden"
939        # files by default but instead only when the `LIST` `-a`
940        # option is used.
941        host = self.host
942        assert not host.use_list_a_option
943        directory_entries = host.listdir(host.curdir)
944        assert ".hidden" not in directory_entries
945        # Switch on showing of hidden paths.
946        host.use_list_a_option = True
947        directory_entries = host.listdir(host.curdir)
948        assert ".hidden" in directory_entries
949
950    def _make_objects_to_be_garbage_collected(self):
951        for _ in range(10):
952            with ftputil.FTPHost(*self.login_data) as host:
953                for _ in range(10):
954                    unused_stat_result = host.stat("CONTENTS")
955                    with host.open("CONTENTS") as fobj:
956                        unused_data = fobj.read()
957
958    def test_garbage_collection(self):
959        """Test whether there are cycles which prevent garbage collection."""
960        gc.collect()
961        objects_before_test = len(gc.garbage)
962        self._make_objects_to_be_garbage_collected()
963        gc.collect()
964        objects_after_test = len(gc.garbage)
965        assert not objects_after_test - objects_before_test
Note: See TracBrowser for help on using the repository browser.