source: test/test_real_ftp.py @ 1713:f146a1ea66aa

Last change on this file since 1713:f146a1ea66aa was 1713:f146a1ea66aa, checked in by Stefan Schwarzer <sschwarzer@…>, 8 months ago
Remove `__future__` imports With the switch to Python 3.x-only, the `__future__` imports are no longer needed. Update copyright years along with the `__future__` import removal.
File size: 36.1 KB
Line 
1# encoding: UTF-8
2# Copyright (C) 2003-2018, Stefan Schwarzer <sschwarzer@sschwarzer.net>
3# and ftputil contributors (see `doc/contributors.txt`)
4# See the file LICENSE for licensing terms.
5
6# Execute tests on a real FTP server (other tests use a mock server).
7#
8# This test writes some files and directories on the local client and
9# the remote server. You'll need write access in the login directory.
10# This test can take a few minutes because it has to wait to test the
11# timezone calculation.
12
13import ftplib
14import functools
15import gc
16import operator
17import os
18import time
19import stat
20
21import pytest
22
23import ftputil.compat
24import ftputil.error
25import ftputil.file_transfer
26import ftputil.session
27import ftputil.stat_cache
28
29import test
30
31
def utc_local_time_shift():
    """
    Compute the time shift, in seconds, that is expected if the
    server shows UTC in its listings while the client works with
    local time.

    The helper exists because Pure-FTPd meanwhile seems to insist
    on displaying file times in UTC.
    """
    now_utc = time.gmtime()
    # Replace the DST flag (the last tuple item) with 0. Without
    # this, the daylight saving offset would leak into the shift.
    now_local = time.localtime()[:-1] + (0,)
    shift_seconds = time.mktime(now_utc) - time.mktime(now_local)
    # To be safe, snap the value to whole hours (units of 3600 s).
    whole_hours = round(shift_seconds / 3600.0)
    return whole_hours * 3600
49
# Expected difference, in seconds, between the local times of server
# and client. If 0.0, server and client use the same timezone.
#
# Alternative for a server that shows UTC in its listings:
#EXPECTED_TIME_SHIFT = utc_local_time_shift()
# This is disabled because Pure-FTPd seems to have changed its mind
# (see docstring of `utc_local_time_shift`).
EXPECTED_TIME_SHIFT = 0.0
56
57
class Cleaner(object):
    """
    Remember remote directories and files and remove them later, so
    that a test which fails in an unexpected way doesn't leave them
    behind.
    """

    def __init__(self, host):
        # `host` is the same `FTPHost` object that the test class
        # (probably `RealFTPTest`) works with.
        self._host = host
        # `(type, absolute path)` pairs in scheduling order. The type
        # is "d" for a directory and "f" for a file.
        self._ftp_items = []

    def _schedule(self, item_type, path):
        """Remember the absolute form of `path` under `item_type`."""
        absolute_path = self._host.path.abspath(path)
        self._ftp_items.append((item_type, absolute_path))

    def add_dir(self, path):
        """Schedule the directory at `path` for removal."""
        self._schedule("d", path)

    def add_file(self, path):
        """Schedule the file at `path` for removal."""
        self._schedule("f", path)

    def clean(self):
        """
        Remove everything that was scheduled with `add_dir` and
        `add_file`, last scheduled item first.

        An `FTPError` raised by a removal is ignored.
        """
        host = self._host
        host.chdir("/")
        for item_type, item_path in self._ftp_items[::-1]:
            try:
                if item_type == "f":
                    # Minor mess if `remove` fails.
                    host.remove(item_path)
                elif item_type == "d":
                    # If something goes wrong in `rmtree` we might
                    # leave a mess behind.
                    host.rmtree(item_path)
            except ftputil.error.FTPError:
                pass
98
99
class RealFTPTest(object):
    """
    Base class for tests that run against a real FTP server.

    For each test method, `setup_method` opens an `FTPHost` connection
    (`self.host`) and creates a `Cleaner` (`self.cleaner`) that shares
    this connection. `teardown_method` runs the cleaner and closes the
    connection.
    """

    def setup_method(self, method):
        # Server, username, password.
        self.login_data = ("localhost", "ftptest",
                           "d605581757de5eb56d568a4419f4126e")
        self.host = ftputil.FTPHost(*self.login_data)
        self.cleaner = Cleaner(self.host)

    def teardown_method(self, method):
        # `Cleaner.clean` only swallows `FTPError`s. Make sure the
        # connection is closed even if the cleanup fails in some
        # other way; the exception itself still propagates.
        try:
            self.cleaner.clean()
        finally:
            self.host.close()

    #
    # Helper methods
    #
    def make_remote_file(self, path):
        """
        Create a file at `path` on the FTP host and schedule it for
        removal at teardown.
        """
        self.cleaner.add_file(path)
        with self.host.open(path, "wb") as file_:
            # Write something. Otherwise the FTP server might not update
            # the time of last modification if the file existed before.
            file_.write(b"\n")

    def make_local_file(self):
        """
        Create the file `_local_file_` on the local host (= on the
        client side). Removing it is up to the caller.
        """
        with open("_local_file_", "wb") as fobj:
            fobj.write(b"abc\x12\x34def\t")
128
129
class TestMkdir(RealFTPTest):
    """Tests for `mkdir`, `rmdir` and `makedirs` on the real server."""

    def test_mkdir_rmdir(self):
        host = self.host
        dir_name = "_testdir_"
        file_name = host.path.join(dir_name, "_nonempty_")
        self.cleaner.add_dir(dir_name)
        # Make dir and check if the directory is there.
        host.mkdir(dir_name)
        assert dir_name in host.listdir(host.curdir)
        # Try to remove a non-empty directory. The file only has to
        # exist, so it's opened and closed right away.
        self.cleaner.add_file(file_name)
        with host.open(file_name, "w"):
            pass
        with pytest.raises(ftputil.error.PermanentError):
            host.rmdir(dir_name)
        # Remove file.
        host.unlink(file_name)
        # `remove` on a directory should fail.
        try:
            with pytest.raises(ftputil.error.PermanentError) as exc_info:
                host.remove(dir_name)
            assert str(exc_info.value).startswith(
                     "remove/unlink can only delete files")
        finally:
            # Delete empty directory.
            host.rmdir(dir_name)
        assert dir_name not in host.listdir(host.curdir)

    def test_makedirs_without_existing_dirs(self):
        host = self.host
        # No `_dir1_` yet
        assert "_dir1_" not in host.listdir(host.curdir)
        # Schedule the cleanup before creating anything. If `makedirs`
        # fails halfway, the directories created so far are still
        # removed at teardown.
        self.cleaner.add_dir("_dir1_")
        # Vanilla case, all should go well.
        host.makedirs("_dir1_/dir2/dir3/dir4")
        # Check host.
        assert host.path.isdir("_dir1_")
        assert host.path.isdir("_dir1_/dir2")
        assert host.path.isdir("_dir1_/dir2/dir3")
        assert host.path.isdir("_dir1_/dir2/dir3/dir4")

    def test_makedirs_from_non_root_directory(self):
        # This is a testcase for issue #22, see
        # http://ftputil.sschwarzer.net/trac/ticket/22 .
        host = self.host
        # No `_dir1_` and `_dir2_` yet
        assert "_dir1_" not in host.listdir(host.curdir)
        assert "_dir2_" not in host.listdir(host.curdir)
        # Part 1: Try to make directories starting from `_dir1_` and
        # change to non-root directory.
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        host.chdir("_dir1_")
        host.makedirs("_dir2_/_dir3_")
        # Test for expected directory hierarchy.
        assert host.path.isdir("/_dir1_")
        assert host.path.isdir("/_dir1_/_dir2_")
        assert host.path.isdir("/_dir1_/_dir2_/_dir3_")
        assert not host.path.isdir("/_dir1_/_dir1_")
        # Remove all but the directory we're in.
        host.rmdir("/_dir1_/_dir2_/_dir3_")
        host.rmdir("/_dir1_/_dir2_")
        # Part 2: Try to make directories starting from root.
        self.cleaner.add_dir("/_dir2_")
        host.makedirs("/_dir2_/_dir3_")
        # Test for expected directory hierarchy
        assert host.path.isdir("/_dir2_")
        assert host.path.isdir("/_dir2_/_dir3_")
        assert not host.path.isdir("/_dir1_/_dir2_")

    def test_makedirs_of_existing_directory(self):
        host = self.host
        # The (chrooted) login directory
        host.makedirs("/")

    def test_makedirs_with_file_in_the_way(self):
        host = self.host
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        self.make_remote_file("_dir1_/file1")
        # Try it.
        with pytest.raises(ftputil.error.PermanentError):
            host.makedirs("_dir1_/file1")
        with pytest.raises(ftputil.error.PermanentError):
            host.makedirs("_dir1_/file1/dir2")

    def test_makedirs_with_existing_directory(self):
        host = self.host
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        host.makedirs("_dir1_/dir2")
        # Check
        assert host.path.isdir("_dir1_")
        assert host.path.isdir("_dir1_/dir2")

    def test_makedirs_in_non_writable_directory(self):
        host = self.host
        # Preparation: `rootdir1` exists but is only writable by root.
        with pytest.raises(ftputil.error.PermanentError):
            host.makedirs("rootdir1/dir2")

    def test_makedirs_with_writable_directory_at_end(self):
        host = self.host
        self.cleaner.add_dir("rootdir2/dir2")
        # Preparation: `rootdir2` exists but is only writable by root.
        # `dir2` is writable by regular ftp users. Both directories
        # below should work.
        host.makedirs("rootdir2/dir2")
        host.makedirs("rootdir2/dir2/dir3")
245
246
class TestRemoval(RealFTPTest):
    """Tests for `rmtree` and `remove` on the real server."""

    def test_rmtree_without_error_handler(self):
        ftp_host = self.host
        # Build a tree.
        self.cleaner.add_dir("_dir1_")
        ftp_host.makedirs("_dir1_/dir2")
        for remote_path in ("_dir1_/file1", "_dir1_/file2",
                            "_dir1_/dir2/file3", "_dir1_/dir2/file4"):
            self.make_remote_file(remote_path)
        # `rmtree` must refuse to remove a _file_.
        with pytest.raises(ftputil.error.PermanentError):
            ftp_host.rmtree("_dir1_/file2")
        # Removing `dir2` mustn't touch its sibling `file2`.
        ftp_host.rmtree("_dir1_/dir2")
        assert not ftp_host.path.exists("_dir1_/dir2")
        assert ftp_host.path.exists("_dir1_/file2")
        # Re-create `dir2` with its files, then remove all of `_dir1_`.
        ftp_host.mkdir("_dir1_/dir2")
        for remote_path in ("_dir1_/dir2/file3", "_dir1_/dir2/file4"):
            self.make_remote_file(remote_path)
        ftp_host.rmtree("_dir1_")
        assert not ftp_host.path.exists("_dir1_")

    def test_rmtree_with_error_handler(self):
        ftp_host = self.host
        self.cleaner.add_dir("_dir1_")
        ftp_host.mkdir("_dir1_")
        self.make_remote_file("_dir1_/file1")
        # The error "handler" just records the arguments of each call.
        calls = []
        def record_error(*args):
            calls.append(args)
        # Try to remove a file as root "directory". With
        # `ignore_errors=True`, the handler mustn't be called.
        ftp_host.rmtree("_dir1_/file1", ignore_errors=True,
                        onerror=record_error)
        assert calls == []
        ftp_host.rmtree("_dir1_/file1", ignore_errors=False,
                        onerror=record_error)
        listdir_call, rmdir_call = calls[0], calls[1]
        assert listdir_call[0] == ftp_host.listdir
        assert listdir_call[1] == "_dir1_/file1"
        assert rmdir_call[0] == ftp_host.rmdir
        assert rmdir_call[1] == "_dir1_/file1"
        ftp_host.rmtree("_dir1_")
        # Try to remove a non-existent directory.
        calls.clear()
        ftp_host.rmtree("_dir1_", ignore_errors=False, onerror=record_error)
        listdir_call, rmdir_call = calls[0], calls[1]
        assert listdir_call[0] == ftp_host.listdir
        assert listdir_call[1] == "_dir1_"
        assert rmdir_call[0] == ftp_host.rmdir
        assert rmdir_call[1] == "_dir1_"

    def test_remove_non_existent_item(self):
        with pytest.raises(ftputil.error.PermanentError):
            self.host.remove("nonexistent")

    def test_remove_existing_file(self):
        file_name = "_testfile_"
        self.cleaner.add_file(file_name)
        self.make_remote_file(file_name)
        remote_path = self.host.path
        assert remote_path.isfile(file_name)
        self.host.remove(file_name)
        assert not remote_path.exists(file_name)
310
311
class TestWalk(RealFTPTest):
    """
    Walk the directory tree

      walk_test
      ├── dir1
      │   ├── dir11
      │   └── dir12
      │       ├── dir123
      │       │   └── file1234
      │       ├── file121
      │       └── file122
      ├── dir2
      ├── dir3
      │   ├── dir31
      │   ├── dir32 -> ../dir1/dir12/dir123
      │   ├── file31
      │   └── file32
      └── file4

    and check if the results are the expected ones.

    The tree isn't created by the tests. It has to exist on the
    server already.
    """

    def _walk_test(self, expected_result, **walk_kwargs):
        """
        Call `self.host.walk` with `walk_kwargs` and compare the
        collected items, in order, with `expected_result`.
        """
        actual_result = list(self.host.walk(**walk_kwargs))
        # Check the lengths first, so that a missing or additional
        # item isn't hidden by the pairwise comparison below.
        assert len(actual_result) == len(expected_result)
        for actual_item, expected_item in zip(actual_result,
                                              expected_result):
            assert actual_item == expected_item

    def test_walk_topdown(self):
        # Each item is `(directory, subdirectories, files)`.
        expected_result = [
          ("walk_test", ["dir1", "dir2", "dir3"], ["file4"]),
          ("walk_test/dir1", ["dir11", "dir12"], []),
          ("walk_test/dir1/dir11", [], []),
          ("walk_test/dir1/dir12", ["dir123"], ["file121", "file122"]),
          ("walk_test/dir1/dir12/dir123", [], ["file1234"]),
          ("walk_test/dir2", [], []),
          ("walk_test/dir3", ["dir31", "dir32"], ["file31", "file32"]),
          ("walk_test/dir3/dir31", [], []),
          ]
        self._walk_test(expected_result, top="walk_test")

    def test_walk_depth_first(self):
        # Each item is `(directory, subdirectories, files)`. With
        # `topdown=False`, a directory comes after its subdirectories.
        expected_result = [
          ("walk_test/dir1/dir11", [], []),
          ("walk_test/dir1/dir12/dir123", [], ["file1234"]),
          ("walk_test/dir1/dir12", ["dir123"], ["file121", "file122"]),
          ("walk_test/dir1", ["dir11", "dir12"], []),
          ("walk_test/dir2", [], []),
          ("walk_test/dir3/dir31", [], []),
          ("walk_test/dir3", ["dir31", "dir32"], ["file31", "file32"]),
          ("walk_test", ["dir1", "dir2", "dir3"], ["file4"]),
          ]
        self._walk_test(expected_result, top="walk_test", topdown=False)

    def test_walk_following_links(self):
        # Each item is `(directory, subdirectories, files)`. With
        # `followlinks=True`, the link `dir32` is walked as well.
        expected_result = [
          ("walk_test", ["dir1", "dir2", "dir3"], ["file4"]),
          ("walk_test/dir1", ["dir11", "dir12"], []),
          ("walk_test/dir1/dir11", [], []),
          ("walk_test/dir1/dir12", ["dir123"], ["file121", "file122"]),
          ("walk_test/dir1/dir12/dir123", [], ["file1234"]),
          ("walk_test/dir2", [], []),
          ("walk_test/dir3", ["dir31", "dir32"], ["file31", "file32"]),
          ("walk_test/dir3/dir31", [], []),
          ("walk_test/dir3/dir32", [], ["file1234"]),
          ]
        self._walk_test(expected_result, top="walk_test", followlinks=True)
460
461
class TestRename(RealFTPTest):
    """Tests for `rename` on the real server."""

    def test_rename(self):
        old_name, new_name = "_testfile1_", "_testfile2_"
        # `make_remote_file` only schedules the old name for removal,
        # so schedule the target of the renaming here.
        self.cleaner.add_file(new_name)
        self.make_remote_file(old_name)
        self.host.rename(old_name, new_name)
        assert not self.host.path.exists(old_name)
        assert self.host.path.exists(new_name)

    def test_rename_with_spaces_in_directory(self):
        dir_name = "_dir with spaces_"
        old_path = dir_name + "/testfile1"
        new_path = dir_name + "/testfile2"
        self.cleaner.add_dir(dir_name)
        self.host.mkdir(dir_name)
        self.make_remote_file(old_path)
        self.host.rename(old_path, new_path)
        assert not self.host.path.exists(old_path)
        assert self.host.path.exists(new_path)
482
483
class TestStat(RealFTPTest):
    """Tests for stat'ing remote items and for the stat cache."""

    def test_stat(self):
        host = self.host
        dir_name = "_testdir_"
        file_name = host.path.join(dir_name, "_nonempty_")
        # Make a directory and a file in it.
        self.cleaner.add_dir(dir_name)
        host.mkdir(dir_name)
        with host.open(file_name, "wb") as fobj:
            fobj.write(b"abc\x12\x34def\t")
        # Do some stats
        # - dir
        dir_stat = host.stat(dir_name)
        assert isinstance(dir_stat._st_name, ftputil.compat.unicode_type)
        assert host.listdir(dir_name) == ["_nonempty_"]
        assert host.path.isdir(dir_name)
        assert not host.path.isfile(dir_name)
        assert not host.path.islink(dir_name)
        # - file
        file_stat = host.stat(file_name)
        assert isinstance(file_stat._st_name, ftputil.compat.unicode_type)
        assert not host.path.isdir(file_name)
        assert host.path.isfile(file_name)
        assert not host.path.islink(file_name)
        # 9 is the number of bytes written above.
        assert host.path.getsize(file_name) == 9
        # - file's modification time; allow up to two minutes difference
        host.synchronize_times()
        server_mtime = host.path.getmtime(file_name)
        client_mtime = time.mktime(time.localtime())
        calculated_time_shift = server_mtime - client_mtime
        assert abs(calculated_time_shift - host.time_shift()) <= 120

    def test_issomething_for_nonexistent_directory(self):
        host = self.host
        # Check if we get the right results if even the containing
        # directory doesn't exist (see ticket #66).
        nonexistent_path = "/nonexistent/nonexistent"
        assert not host.path.isdir(nonexistent_path)
        assert not host.path.isfile(nonexistent_path)
        assert not host.path.islink(nonexistent_path)

    def test_special_broken_link(self):
        # Test for ticket #39.
        host = self.host
        # This is a path on the server, so join it with `host.path`,
        # not with `os.path`, which would use the path separator of
        # the client's operating system.
        broken_link_name = host.path.join("dir_with_broken_link",
                                          "nonexistent")
        assert (host.lstat(broken_link_name)._st_target ==
                "../nonexistent/nonexistent")
        assert not host.path.isdir(broken_link_name)
        assert not host.path.isfile(broken_link_name)
        assert host.path.islink(broken_link_name)

    def test_concurrent_access(self):
        self.make_remote_file("_testfile_")
        with ftputil.FTPHost(*self.login_data) as host1:
            with ftputil.FTPHost(*self.login_data) as host2:
                stat_result1 = host1.stat("_testfile_")
                stat_result2 = host2.stat("_testfile_")
                assert stat_result1 == stat_result2
                host2.remove("_testfile_")
                # Can still get the result via `host1`
                stat_result1 = host1.stat("_testfile_")
                assert stat_result1 == stat_result2
                # Stat'ing on `host2` gives an exception.
                with pytest.raises(ftputil.error.PermanentError):
                    host2.stat("_testfile_")
                # Stat'ing on `host1` after invalidation
                absolute_path = host1.path.join(host1.getcwd(), "_testfile_")
                host1.stat_cache.invalidate(absolute_path)
                with pytest.raises(ftputil.error.PermanentError):
                    host1.stat("_testfile_")

    def test_cache_auto_resizing(self):
        """Test if the cache is resized appropriately."""
        host = self.host
        cache = host.stat_cache._cache
        # Make sure the cache size isn't adjusted towards smaller
        # values. The result of `listdir` isn't needed here.
        host.listdir("walk_test")
        assert cache.size == ftputil.stat_cache.StatCache._DEFAULT_CACHE_SIZE
        # Make the cache very small initially and see if it gets resized.
        cache.size = 2
        entries = host.listdir("walk_test")
        # The adjusted cache size should be larger or equal to the
        # number of items in `walk_test` and its parent directory. The
        # latter is read implicitly upon `listdir`'s `isdir` call.
        expected_min_cache_size = max(len(host.listdir(host.curdir)),
                                      len(entries))
        assert cache.size >= expected_min_cache_size
572
573
class TestUploadAndDownload(RealFTPTest):
    """Test upload and download (including time shift test)."""

    def test_time_shift(self):
        self.host.synchronize_times()
        assert self.host.time_shift() == EXPECTED_TIME_SHIFT

    @pytest.mark.slow_test
    def test_upload(self):
        host = self.host
        host.synchronize_times()
        local_file = "_local_file_"
        remote_file = "_remote_file_"
        # Make local file to upload.
        self.make_local_file()
        # Wait, else small time differences between client and server
        # actually could trigger the update.
        time.sleep(65)
        try:
            self.cleaner.add_file(remote_file)
            host.upload(local_file, remote_file)
            # Retry; shouldn't be uploaded
            uploaded = host.upload_if_newer(local_file, remote_file)
            assert uploaded is False
            # Rewrite the local file.
            self.make_local_file()
            # Retry; should be uploaded now
            uploaded = host.upload_if_newer(local_file, remote_file)
            assert uploaded is True
        finally:
            # Clean up
            os.unlink(local_file)

    @pytest.mark.slow_test
    def test_download(self):
        host = self.host
        host.synchronize_times()
        local_file = "_local_file_"
        remote_file = "_remote_file_"
        # Make a remote file.
        self.make_remote_file(remote_file)
        # File should be downloaded as it's not present yet.
        downloaded = host.download_if_newer(remote_file, local_file)
        assert downloaded is True
        try:
            # If the remote file, taking the datetime precision into
            # account, _might_ be newer, the file will be downloaded
            # again. To prevent this, wait a bit over a minute (the
            # remote precision), then "touch" the local file.
            time.sleep(65)
            # Create empty file.
            with open(local_file, "w"):
                pass
            # Local file is present and newer, so shouldn't download.
            downloaded = host.download_if_newer(remote_file, local_file)
            assert downloaded is False
            # Re-make the remote file.
            self.make_remote_file(remote_file)
            # Local file is present but possibly older (taking the
            # possible deviation because of the precision into account),
            # so should download.
            downloaded = host.download_if_newer(remote_file, local_file)
            assert downloaded is True
        finally:
            # Clean up.
            os.unlink(local_file)

    def test_callback_with_transfer(self):
        host = self.host
        FILE_NAME = "debian-keyring.tar.gz"
        # Default chunk size as in `FTPHost.copyfileobj`
        MAX_COPY_CHUNK_SIZE = ftputil.file_transfer.MAX_COPY_CHUNK_SIZE
        file_size = host.path.getsize(FILE_NAME)
        # Number of chunks needed for `file_size` bytes. Add a chunk
        # for the remainder only if there is one. Otherwise the count
        # would be off by one for a file whose size is an exact
        # multiple of the chunk size, and it would disagree with
        # `expected_chunks_list` below, which never contains an empty
        # chunk.
        chunk_count, remainder = divmod(file_size, MAX_COPY_CHUNK_SIZE)
        if remainder:
            chunk_count += 1
        # Define a callback that just collects all data passed to it.
        transferred_chunks_list = []
        def test_callback(chunk):
            transferred_chunks_list.append(chunk)
        try:
            host.download(FILE_NAME, FILE_NAME, callback=test_callback)
            # Construct a list of data chunks we expect.
            expected_chunks_list = []
            with open(FILE_NAME, "rb") as downloaded_fobj:
                while True:
                    chunk = downloaded_fobj.read(MAX_COPY_CHUNK_SIZE)
                    if not chunk:
                        break
                    expected_chunks_list.append(chunk)
            # Examine data collected by callback function.
            assert len(transferred_chunks_list) == chunk_count
            assert transferred_chunks_list == expected_chunks_list
        finally:
            os.unlink(FILE_NAME)
669
670
class TestFTPFiles(RealFTPTest):
    """
    Tests for how `FTPHost.open` creates and re-uses the child
    sessions that back remote file objects.
    """

    def test_only_closed_children(self):
        remote_name = "CONTENTS"
        ftp_host = self.host
        with ftp_host.open(remote_name, "rb"):
            # Open the file a second time and close it right away.
            with ftp_host.open(remote_name, "rb"):
                pass
            # This should re-use the second child because the first isn't
            # closed but the second is.
            with ftp_host.open(remote_name, "rb") as reopened_file:
                assert len(ftp_host._children) == 2
                assert reopened_file._host is ftp_host._children[1]

    def test_no_timed_out_children(self):
        remote_name = "CONTENTS"
        ftp_host = self.host
        # Implicitly create child host object.
        with ftp_host.open(remote_name, "rb") as first_file:
            pass
        # Monkey-patch the child session's `pwd` to simulate an FTP
        # server timeout below.
        def raise_timeout():
            raise ftplib.error_temp("simulated timeout")
        first_file._host._session.pwd = raise_timeout
        # Try to get a file - which shouldn't be the timed-out file.
        with ftp_host.open(remote_name, "rb") as second_file:
            assert first_file is not second_file
        # Re-use closed and not timed-out child session.
        with ftp_host.open(remote_name, "rb") as third_file:
            pass
        assert second_file is third_file

    def test_no_delayed_226_children(self):
        remote_name = "CONTENTS"
        ftp_host = self.host
        # Implicitly create child host object.
        with ftp_host.open(remote_name, "rb") as first_file:
            pass
        # Monkey-patch the child session's `pwd` to simulate a delayed
        # 226 reply from the FTP server below.
        def raise_delayed_226():
            raise ftplib.error_reply("delayed 226 reply")
        first_file._host._session.pwd = raise_delayed_226
        # Try to get a file - which shouldn't be the file whose
        # session has the delayed reply.
        with ftp_host.open(remote_name, "rb") as second_file:
            assert first_file is not second_file
        # Re-use the closed child session without a delayed reply.
        with ftp_host.open(remote_name, "rb") as third_file:
            pass
        assert second_file is third_file
721
722
class TestChmod(RealFTPTest):
    """Tests for `chmod` on the real server."""

    def assert_mode(self, path, expected_mode):
        """
        Assert that the mode of `path`, reduced to the bits in the
        allowed flags below, equals `expected_mode`.

        The `FTPHost` object to test against is `self.host`.
        """
        # Remove flags we can't set via `chmod`.
        # Allowed flags according to Python documentation
        # https://docs.python.org/library/stat.html
        allowed_flags = (
          stat.S_ISUID, stat.S_ISGID, stat.S_ENFMT, stat.S_ISVTX,
          stat.S_IREAD, stat.S_IWRITE, stat.S_IEXEC,
          stat.S_IRWXU, stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR,
          stat.S_IRWXG, stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP,
          stat.S_IRWXO, stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH)
        allowed_mask = 0
        for flag in allowed_flags:
            allowed_mask |= flag
        mode = self.host.stat(path).st_mode & allowed_mask
        assert mode == expected_mode, (
                 "mode {0:o} != {1:o}".format(mode, expected_mode))

    def test_chmod_existing_directory(self):
        ftp_host = self.host
        top_dir = "_test dir_"
        nested_dir = "_test dir_/nested_dir"
        ftp_host.mkdir(top_dir)
        self.cleaner.add_dir(top_dir)
        # Set/get mode of a directory.
        ftp_host.chmod(top_dir, 0o757)
        self.assert_mode(top_dir, 0o757)
        # Set/get mode in nested directory.
        ftp_host.mkdir(nested_dir)
        self.cleaner.add_dir(nested_dir)
        ftp_host.chmod(nested_dir, 0o757)
        self.assert_mode(nested_dir, 0o757)

    def test_chmod_existing_file(self):
        ftp_host = self.host
        top_dir = "_test dir_"
        ftp_host.mkdir(top_dir)
        self.cleaner.add_dir(top_dir)
        # Set/get mode on a file.
        file_path = ftp_host.path.join(top_dir, "_testfile_")
        self.make_remote_file(file_path)
        ftp_host.chmod(file_path, 0o646)
        self.assert_mode(file_path, 0o646)

    def test_chmod_nonexistent_path(self):
        # Changing the mode of a non-existing item should fail.
        with pytest.raises(ftputil.error.PermanentError):
            self.host.chmod("nonexistent", 0o757)

    def test_cache_invalidation(self):
        ftp_host = self.host
        top_dir = "_test dir_"
        ftp_host.mkdir(top_dir)
        self.cleaner.add_dir(top_dir)
        # Make sure the mode is in the cache; the result isn't needed.
        ftp_host.stat(top_dir)
        # Set/get mode of the directory.
        ftp_host.chmod(top_dir, 0o757)
        self.assert_mode(top_dir, 0o757)
        # Set/get mode on a file.
        file_path = ftp_host.path.join(top_dir, "_testfile_")
        self.make_remote_file(file_path)
        # Make sure the mode is in the cache; the result isn't needed.
        ftp_host.stat(file_path)
        ftp_host.chmod(file_path, 0o646)
        self.assert_mode(file_path, 0o646)
790
791
class TestRestArgument(RealFTPTest):
    """
    Tests for the `rest` argument of the host's `open` method.

    `setup_method` writes the remote file `TEST_FILE_NAME` with the
    content `b"abcdefghijkl"` before each test.
    """

    TEST_FILE_NAME = "rest_test"

    def setup_method(self, method):
        """Run the base class setup, then write the remote test file."""
        super().setup_method(method)
        # Write test file.
        with self.host.open(self.TEST_FILE_NAME, "wb") as fobj:
            fobj.write(b"abcdefghijkl")
        self.cleaner.add_file(self.TEST_FILE_NAME)

    def test_for_reading(self):
        """
        If a `rest` argument is passed to `open`, the following read
        operation should start at the byte given by `rest`.
        """
        with self.host.open(self.TEST_FILE_NAME, "rb", rest=3) as fobj:
            data = fobj.read()
        assert data == b"defghijkl"

    def test_for_writing(self):
        """
        If a `rest` argument is passed to `open`, the following write
        operation should start writing at the byte given by `rest`.
        """
        with self.host.open(self.TEST_FILE_NAME, "wb", rest=3) as fobj:
            fobj.write(b"123")
        with self.host.open(self.TEST_FILE_NAME, "rb") as fobj:
            data = fobj.read()
        assert data == b"abc123"

    def test_invalid_read_from_text_file(self):
        """
        If the `rest` argument is used for reading from a text file,
        a `CommandNotImplementedError` should be raised.
        """
        with pytest.raises(ftputil.error.CommandNotImplementedError):
            self.host.open(self.TEST_FILE_NAME, "r", rest=3)

    def test_invalid_write_to_text_file(self):
        """
        If the `rest` argument is used for writing to a text file,
        a `CommandNotImplementedError` should be raised.
        """
        with pytest.raises(ftputil.error.CommandNotImplementedError):
            self.host.open(self.TEST_FILE_NAME, "w", rest=3)

    # There are no tests for reading and writing beyond the end of a
    # file. For example, if the remote file is 10 bytes long and
    # `open(remote_file, "rb", rest=100)` is used, the server may
    # return an error status code or not.
    #
    # The server I use for testing returns a 554 status when
    # attempting to _read_ beyond the end of the file. On the other
    # hand, if attempting to _write_ beyond the end of the file, the
    # server accepts the request, but starts writing after the end of
    # the file, i. e. appends to the file.
    #
    # Instead of expecting certain responses that may differ between
    # server implementations, I leave the behavior for too large
    # `rest` arguments undefined. In practice, this shouldn't be a
    # problem because the `rest` argument should only be used for
    # error recovery, and in this case a valid byte count for the
    # `rest` argument should be known.
856
857
class TestOther(RealFTPTest):
    """Miscellaneous tests that run against the real FTP server."""

    def test_open_for_reading(self):
        """
        Open a remote file, wait a second and close it without reading.

        Test for issues #17 and #51,
        http://ftputil.sschwarzer.net/trac/ticket/17 and
        http://ftputil.sschwarzer.net/trac/ticket/51 .
        """
        remote_file = self.host.open("debian-keyring.tar.gz", "rb")
        time.sleep(1)
        # Depending on the FTP server, closing the file might return a
        # status code unexpected by `ftplib` or block the socket
        # connection until a server-side timeout.
        remote_file.close()

    def test_subsequent_reading(self):
        """Opening the same file twice in a row should reuse the session."""
        host = self.host
        with host.open("CONTENTS", "rb") as first_file:
            pass
        # The second `open` must work without problems when the
        # connection is reused.
        with host.open("CONTENTS", "rb") as second_file:
            pass
        assert first_file._session is second_file._session

    def test_names_with_spaces(self):
        """Directories and files with spaces in their names are usable."""
        host = self.host
        spaced_dir = "dir with spaces"
        expected_names = ["second dir", "some file", "some_file"]
        assert host.path.isdir(spaced_dir)
        assert host.listdir(spaced_dir) == expected_names
        assert host.path.isdir("dir with spaces/second dir")
        assert host.path.isfile("dir with spaces/some_file")
        assert host.path.isfile("dir with spaces/some file")

    def test_synchronize_times_without_write_access(self):
        """Test failing synchronization because of non-writable directory."""
        # The ftp account the tests are run under can't write to this
        # directory.
        self.host.chdir("rootdir1")
        with pytest.raises(ftputil.error.TimeShiftError):
            self.host.synchronize_times()

    def test_listdir_with_non_ascii_byte_string(self):
        """
        Given a byte string path with non-ASCII characters, `listdir`
        should work and return byte string names, including names
        with non-ASCII characters.
        """
        dir_name = "äbc".encode("UTF-8")
        entries = self.host.listdir(dir_name)
        assert entries[0] == b"file1"
        assert entries[1] == "file1_ö".encode("UTF-8")

    def test_listdir_with_non_ascii_unicode_string(self):
        """
        Given a unicode string path with non-ASCII characters,
        `listdir` should work and return unicode string names,
        including names with non-ASCII characters.
        """
        # `ftplib` under Python 3 only works correctly if the unicode
        # strings are decoded from latin1. Under Python 2, ftputil
        # is supposed to provide a compatible interface.
        dir_name = "äbc".encode("UTF-8").decode("latin1")
        entries = self.host.listdir(dir_name)
        assert entries[0] == "file1"
        assert entries[1] == "file1_ö".encode("UTF-8").decode("latin1")

    def test_path_with_non_latin1_unicode_string(self):
        """
        File paths containing characters outside latin1 should be
        rejected by ftputil operations.
        """
        # Musical symbols are certainly not part of latin1.
        non_latin1_path = "𝄞𝄢"
        # `ftplib` raises `UnicodeEncodeError` as well if it gets a
        # non-latin1 path.
        with pytest.raises(UnicodeEncodeError):
            self.host.mkdir(non_latin1_path)

    def test_list_a_option(self):
        """
        Hidden entries should only be listed after
        `use_list_a_option` has been switched on.
        """
        # For this test to pass, the server must _not_ list "hidden"
        # files by default but instead only when the `LIST` `-a`
        # option is used.
        host = self.host
        assert not host.use_list_a_option
        assert ".hidden" not in host.listdir(host.curdir)
        # Switch on showing of hidden paths.
        host.use_list_a_option = True
        assert ".hidden" in host.listdir(host.curdir)

    def _make_objects_to_be_garbage_collected(self):
        """
        Create ten hosts in sequence; with each, stat and read
        "CONTENTS" ten times.
        """
        for _host_round in range(10):
            with ftputil.FTPHost(*self.login_data) as host:
                for _access_round in range(10):
                    _stat_result = host.stat("CONTENTS")
                    with host.open("CONTENTS") as fobj:
                        _data = fobj.read()

    def test_garbage_collection(self):
        """Test whether there are cycles which prevent garbage collection."""
        gc.collect()
        garbage_count_before = len(gc.garbage)
        self._make_objects_to_be_garbage_collected()
        gc.collect()
        garbage_count_after = len(gc.garbage)
        # The number of uncollectable objects must not have changed.
        assert garbage_count_after == garbage_count_before

    @pytest.mark.skipif(
        ftputil.compat.python_version > 2,
        reason="test requires M2Crypto which only works on Python 2",
    )
    def test_m2crypto_session(self):
        """
        Test if a session with `M2Crypto.ftpslib.FTP_TLS` is set up
        correctly and works with unicode input.
        """
        # See ticket #78.
        #
        # M2Crypto is only available for Python 2, hence the local
        # import.
        import M2Crypto
        tls_factory = ftputil.session.session_factory(
            base_class=M2Crypto.ftpslib.FTP_TLS,
            encrypt_data_channel=True,
        )
        with ftputil.FTPHost(*self.login_data,
                             session_factory=tls_factory) as host:
            # Test if unicode argument works.
            names = host.listdir(".")
        assert "CONTENTS" in names
Note: See TracBrowser for help on using the repository browser.