source: test/test_real_ftp.py @ 1721:3557f65ded13

Last change on this file since 1721:3557f65ded13 was 1721:3557f65ded13, checked in by Stefan Schwarzer <sschwarzer@…>, 12 months ago
Remove `compat.py` This module was for Python 2/3 compatibility, similar to the `six` package. Since the next version of ftputil will only support Python 3, hardcode the types from `compat.py` in the code that used to use the `compat` module. There may still be redundant code that isn't needed when running the tests under Python 3.
File size: 35.1 KB
Line 
1# Copyright (C) 2003-2018, Stefan Schwarzer <sschwarzer@sschwarzer.net>
2# and ftputil contributors (see `doc/contributors.txt`)
3# See the file LICENSE for licensing terms.
4
5# Execute tests on a real FTP server (other tests use a mock server).
6#
7# This test writes some files and directories on the local client and
8# the remote server. You'll need write access in the login directory.
9# This test can take a few minutes because it has to wait to test the
10# timezone calculation.
11
12import ftplib
13import functools
14import gc
15import operator
16import os
17import time
18import stat
19
20import pytest
21
22import ftputil.error
23import ftputil.file_transfer
24import ftputil.session
25import ftputil.stat_cache
26
27import test
28
29
30def utc_local_time_shift():
31    """
32    Return the expected time shift in seconds assuming the server
33    uses UTC in its listings and the client uses local time.
34
35    This is needed because Pure-FTPd meanwhile seems to insist that
36    the displayed time for files is in UTC.
37    """
38    utc_tuple = time.gmtime()
39    localtime_tuple = time.localtime()
40    # To calculate the correct times shift, we need to ignore the
41    # DST component in the localtime tuple, i. e. set it to 0.
42    localtime_tuple = localtime_tuple[:-1] + (0,)
43    time_shift_in_seconds = (time.mktime(utc_tuple) -
44                             time.mktime(localtime_tuple))
45    # To be safe, round the above value to units of 3600 s (1 hour).
46    return round(time_shift_in_seconds / 3600.0) * 3600
47
48# Difference between local times of server and client. If 0.0, server
49# and client use the same timezone.
50#EXPECTED_TIME_SHIFT = utc_local_time_shift()
51# Pure-FTPd seems to have changed its mind (see docstring of
52# `utc_local_time_shift`).
53EXPECTED_TIME_SHIFT = 0.0
54
55
56class Cleaner:
57    """
58    This class helps remove directories and files which might
59    otherwise be left behind if a test fails in unexpected ways.
60    """
61
62    def __init__(self, host):
63        # The test class (probably `RealFTPTest`) and the helper
64        # class share the same `FTPHost` object.
65        self._host = host
66        self._ftp_items = []
67
68    def add_dir(self, path):
69        """Schedule a directory with path `path` for removal."""
70        self._ftp_items.append(("d", self._host.path.abspath(path)))
71
72    def add_file(self, path):
73        """Schedule a file with path `path` for removal."""
74        self._ftp_items.append(("f", self._host.path.abspath(path)))
75
76    def clean(self):
77        """
78        Remove the directories and files previously remembered.
79        The removal works in reverse order of the scheduling with
80        `add_dir` and `add_file`.
81
82        Errors due to a removal are ignored.
83        """
84        self._host.chdir("/")
85        for type_, path in reversed(self._ftp_items):
86            try:
87                if type_ == "d":
88                    # If something goes wrong in `rmtree` we might
89                    # leave a mess behind.
90                    self._host.rmtree(path)
91                elif type_ == "f":
92                    # Minor mess if `remove` fails
93                    self._host.remove(path)
94            except ftputil.error.FTPError:
95                pass
96
97
98class RealFTPTest:
99
100    def setup_method(self, method):
101        # Server, username, password.
102        self.login_data = ("localhost", "ftptest",
103                            "d605581757de5eb56d568a4419f4126e")
104        self.host = ftputil.FTPHost(*self.login_data)
105        self.cleaner = Cleaner(self.host)
106
107    def teardown_method(self, method):
108        self.cleaner.clean()
109        self.host.close()
110
111    #
112    # Helper methods
113    #
114    def make_remote_file(self, path):
115        """Create a file on the FTP host."""
116        self.cleaner.add_file(path)
117        with self.host.open(path, "wb") as file_:
118            # Write something. Otherwise the FTP server might not update
119            # the time of last modification if the file existed before.
120            file_.write(b"\n")
121
122    def make_local_file(self):
123        """Create a file on the local host (= on the client side)."""
124        with open("_local_file_", "wb") as fobj:
125            fobj.write(b"abc\x12\x34def\t")
126
127
128class TestMkdir(RealFTPTest):
129
130    def test_mkdir_rmdir(self):
131        host = self.host
132        dir_name = "_testdir_"
133        file_name = host.path.join(dir_name, "_nonempty_")
134        self.cleaner.add_dir(dir_name)
135        # Make dir and check if the directory is there.
136        host.mkdir(dir_name)
137        files = host.listdir(host.curdir)
138        assert dir_name in files
139        # Try to remove a non-empty directory.
140        self.cleaner.add_file(file_name)
141        non_empty = host.open(file_name, "w")
142        non_empty.close()
143        with pytest.raises(ftputil.error.PermanentError):
144            host.rmdir(dir_name)
145        # Remove file.
146        host.unlink(file_name)
147        # `remove` on a directory should fail.
148        try:
149            try:
150                host.remove(dir_name)
151            except ftputil.error.PermanentError as exc:
152                assert str(exc).startswith(
153                         "remove/unlink can only delete files")
154            else:
155                pytest.fail("we shouldn't have come here")
156        finally:
157            # Delete empty directory.
158            host.rmdir(dir_name)
159        files = host.listdir(host.curdir)
160        assert dir_name not in files
161
162    def test_makedirs_without_existing_dirs(self):
163        host = self.host
164        # No `_dir1_` yet
165        assert "_dir1_" not in host.listdir(host.curdir)
166        # Vanilla case, all should go well.
167        host.makedirs("_dir1_/dir2/dir3/dir4")
168        self.cleaner.add_dir("_dir1_")
169        # Check host.
170        assert host.path.isdir("_dir1_")
171        assert host.path.isdir("_dir1_/dir2")
172        assert host.path.isdir("_dir1_/dir2/dir3")
173        assert host.path.isdir("_dir1_/dir2/dir3/dir4")
174
175    def test_makedirs_from_non_root_directory(self):
176        # This is a testcase for issue #22, see
177        # http://ftputil.sschwarzer.net/trac/ticket/22 .
178        host = self.host
179        # No `_dir1_` and `_dir2_` yet
180        assert "_dir1_" not in host.listdir(host.curdir)
181        assert "_dir2_" not in host.listdir(host.curdir)
182        # Part 1: Try to make directories starting from `_dir1_` and
183        # change to non-root directory.
184        self.cleaner.add_dir("_dir1_")
185        host.mkdir("_dir1_")
186        host.chdir("_dir1_")
187        host.makedirs("_dir2_/_dir3_")
188        # Test for expected directory hierarchy.
189        assert host.path.isdir("/_dir1_")
190        assert host.path.isdir("/_dir1_/_dir2_")
191        assert host.path.isdir("/_dir1_/_dir2_/_dir3_")
192        assert not host.path.isdir("/_dir1_/_dir1_")
193        # Remove all but the directory we're in.
194        host.rmdir("/_dir1_/_dir2_/_dir3_")
195        host.rmdir("/_dir1_/_dir2_")
196        # Part 2: Try to make directories starting from root.
197        self.cleaner.add_dir("/_dir2_")
198        host.makedirs("/_dir2_/_dir3_")
199        # Test for expected directory hierarchy
200        assert host.path.isdir("/_dir2_")
201        assert host.path.isdir("/_dir2_/_dir3_")
202        assert not host.path.isdir("/_dir1_/_dir2_")
203
204    def test_makedirs_of_existing_directory(self):
205        host = self.host
206        # The (chrooted) login directory
207        host.makedirs("/")
208
209    def test_makedirs_with_file_in_the_way(self):
210        host = self.host
211        self.cleaner.add_dir("_dir1_")
212        host.mkdir("_dir1_")
213        self.make_remote_file("_dir1_/file1")
214        # Try it.
215        with pytest.raises(ftputil.error.PermanentError):
216            host.makedirs("_dir1_/file1")
217        with pytest.raises(ftputil.error.PermanentError):
218            host.makedirs("_dir1_/file1/dir2")
219
220    def test_makedirs_with_existing_directory(self):
221        host = self.host
222        self.cleaner.add_dir("_dir1_")
223        host.mkdir("_dir1_")
224        host.makedirs("_dir1_/dir2")
225        # Check
226        assert host.path.isdir("_dir1_")
227        assert host.path.isdir("_dir1_/dir2")
228
229    def test_makedirs_in_non_writable_directory(self):
230        host = self.host
231        # Preparation: `rootdir1` exists but is only writable by root.
232        with pytest.raises(ftputil.error.PermanentError):
233            host.makedirs("rootdir1/dir2")
234
235    def test_makedirs_with_writable_directory_at_end(self):
236        host = self.host
237        self.cleaner.add_dir("rootdir2/dir2")
238        # Preparation: `rootdir2` exists but is only writable by root.
239        # `dir2` is writable by regular ftp users. Both directories
240        # below should work.
241        host.makedirs("rootdir2/dir2")
242        host.makedirs("rootdir2/dir2/dir3")
243
244
245class TestRemoval(RealFTPTest):
246
247    def test_rmtree_without_error_handler(self):
248        host = self.host
249        # Build a tree.
250        self.cleaner.add_dir("_dir1_")
251        host.makedirs("_dir1_/dir2")
252        self.make_remote_file("_dir1_/file1")
253        self.make_remote_file("_dir1_/file2")
254        self.make_remote_file("_dir1_/dir2/file3")
255        self.make_remote_file("_dir1_/dir2/file4")
256        # Try to remove a _file_ with `rmtree`.
257        with pytest.raises(ftputil.error.PermanentError):
258            host.rmtree("_dir1_/file2")
259        # Remove `dir2`.
260        host.rmtree("_dir1_/dir2")
261        assert not host.path.exists("_dir1_/dir2")
262        assert host.path.exists("_dir1_/file2")
263        # Re-create `dir2` and remove `_dir1_`.
264        host.mkdir("_dir1_/dir2")
265        self.make_remote_file("_dir1_/dir2/file3")
266        self.make_remote_file("_dir1_/dir2/file4")
267        host.rmtree("_dir1_")
268        assert not host.path.exists("_dir1_")
269
270    def test_rmtree_with_error_handler(self):
271        host = self.host
272        self.cleaner.add_dir("_dir1_")
273        host.mkdir("_dir1_")
274        self.make_remote_file("_dir1_/file1")
275        # Prepare error "handler"
276        log = []
277        def error_handler(*args):
278            log.append(args)
279        # Try to remove a file as root "directory".
280        host.rmtree("_dir1_/file1", ignore_errors=True, onerror=error_handler)
281        assert log == []
282        host.rmtree("_dir1_/file1", ignore_errors=False, onerror=error_handler)
283        assert log[0][0] == host.listdir
284        assert log[0][1] == "_dir1_/file1"
285        assert log[1][0] == host.rmdir
286        assert log[1][1] == "_dir1_/file1"
287        host.rmtree("_dir1_")
288        # Try to remove a non-existent directory.
289        del log[:]
290        host.rmtree("_dir1_", ignore_errors=False, onerror=error_handler)
291        assert log[0][0] == host.listdir
292        assert log[0][1] == "_dir1_"
293        assert log[1][0] == host.rmdir
294        assert log[1][1] == "_dir1_"
295
296    def test_remove_non_existent_item(self):
297        host = self.host
298        with pytest.raises(ftputil.error.PermanentError):
299            host.remove("nonexistent")
300
301    def test_remove_existing_file(self):
302        self.cleaner.add_file("_testfile_")
303        self.make_remote_file("_testfile_")
304        host = self.host
305        assert host.path.isfile("_testfile_")
306        host.remove("_testfile_")
307        assert not host.path.exists("_testfile_")
308
309
310class TestWalk(RealFTPTest):
311    """
312    Walk the directory tree
313
314      walk_test
315      ├── dir1
316      │   ├── dir11
317      │   └── dir12
318      │       ├── dir123
319      │       │   └── file1234
320      │       ├── file121
321      │       └── file122
322      ├── dir2
323      ├── dir3
324      │   ├── dir31
325      │   ├── dir32 -> ../dir1/dir12/dir123
326      │   ├── file31
327      │   └── file32
328      └── file4
329
330    and check if the results are the expected ones.
331    """
332
333    def _walk_test(self, expected_result, **walk_kwargs):
334        """Walk the directory and test results."""
335        # Collect data using `walk`.
336        actual_result = []
337        for items in self.host.walk(**walk_kwargs):
338            actual_result.append(items)
339        # Compare with expected results.
340        assert len(actual_result) == len(expected_result)
341        for index, _ in enumerate(actual_result):
342            assert actual_result[index] == expected_result[index]
343
344    def test_walk_topdown(self):
345        # Preparation: build tree in directory `walk_test`.
346        expected_result = [
347          ("walk_test",
348           ["dir1", "dir2", "dir3"],
349           ["file4"]),
350          #
351          ("walk_test/dir1",
352           ["dir11", "dir12"],
353           []),
354          #
355          ("walk_test/dir1/dir11",
356           [],
357           []),
358          #
359          ("walk_test/dir1/dir12",
360           ["dir123"],
361           ["file121", "file122"]),
362          #
363          ("walk_test/dir1/dir12/dir123",
364           [],
365           ["file1234"]),
366          #
367          ("walk_test/dir2",
368           [],
369           []),
370          #
371          ("walk_test/dir3",
372           ["dir31", "dir32"],
373           ["file31", "file32"]),
374          #
375          ("walk_test/dir3/dir31",
376           [],
377           []),
378          ]
379        self._walk_test(expected_result, top="walk_test")
380
381    def test_walk_depth_first(self):
382        # Preparation: build tree in directory `walk_test`
383        expected_result = [
384          ("walk_test/dir1/dir11",
385           [],
386           []),
387          #
388          ("walk_test/dir1/dir12/dir123",
389           [],
390           ["file1234"]),
391          #
392          ("walk_test/dir1/dir12",
393           ["dir123"],
394           ["file121", "file122"]),
395          #
396          ("walk_test/dir1",
397           ["dir11", "dir12"],
398           []),
399          #
400          ("walk_test/dir2",
401           [],
402           []),
403          #
404          ("walk_test/dir3/dir31",
405           [],
406           []),
407          #
408          ("walk_test/dir3",
409           ["dir31", "dir32"],
410           ["file31", "file32"]),
411          #
412          ("walk_test",
413           ["dir1", "dir2", "dir3"],
414           ["file4"])
415          ]
416        self._walk_test(expected_result, top="walk_test", topdown=False)
417
418    def test_walk_following_links(self):
419        # Preparation: build tree in directory `walk_test`.
420        expected_result = [
421          ("walk_test",
422           ["dir1", "dir2", "dir3"],
423           ["file4"]),
424          #
425          ("walk_test/dir1",
426           ["dir11", "dir12"],
427           []),
428          #
429          ("walk_test/dir1/dir11",
430           [],
431           []),
432          #
433          ("walk_test/dir1/dir12",
434           ["dir123"],
435           ["file121", "file122"]),
436          #
437          ("walk_test/dir1/dir12/dir123",
438           [],
439           ["file1234"]),
440          #
441          ("walk_test/dir2",
442           [],
443           []),
444          #
445          ("walk_test/dir3",
446           ["dir31", "dir32"],
447           ["file31", "file32"]),
448          #
449          ("walk_test/dir3/dir31",
450           [],
451           []),
452          #
453          ("walk_test/dir3/dir32",
454           [],
455           ["file1234"]),
456          ]
457        self._walk_test(expected_result, top="walk_test", followlinks=True)
458
459
460class TestRename(RealFTPTest):
461
462    def test_rename(self):
463        host = self.host
464        # Make sure the target of the renaming operation is removed.
465        self.cleaner.add_file("_testfile2_")
466        self.make_remote_file("_testfile1_")
467        host.rename("_testfile1_", "_testfile2_")
468        assert not host.path.exists("_testfile1_")
469        assert host.path.exists("_testfile2_")
470
471    def test_rename_with_spaces_in_directory(self):
472        host = self.host
473        dir_name = "_dir with spaces_"
474        self.cleaner.add_dir(dir_name)
475        host.mkdir(dir_name)
476        self.make_remote_file(dir_name + "/testfile1")
477        host.rename(dir_name + "/testfile1", dir_name + "/testfile2")
478        assert not host.path.exists(dir_name + "/testfile1")
479        assert host.path.exists(dir_name + "/testfile2")
480
481
482class TestStat(RealFTPTest):
483
484    def test_stat(self):
485        host = self.host
486        dir_name = "_testdir_"
487        file_name = host.path.join(dir_name, "_nonempty_")
488        # Make a directory and a file in it.
489        self.cleaner.add_dir(dir_name)
490        host.mkdir(dir_name)
491        with host.open(file_name, "wb") as fobj:
492            fobj.write(b"abc\x12\x34def\t")
493        # Do some stats
494        # - dir
495        dir_stat = host.stat(dir_name)
496        assert isinstance(dir_stat._st_name, str)
497        assert host.listdir(dir_name) == ["_nonempty_"]
498        assert host.path.isdir(dir_name)
499        assert not host.path.isfile(dir_name)
500        assert not host.path.islink(dir_name)
501        # - file
502        file_stat = host.stat(file_name)
503        assert isinstance(file_stat._st_name, str)
504        assert not host.path.isdir(file_name)
505        assert host.path.isfile(file_name)
506        assert not host.path.islink(file_name)
507        assert host.path.getsize(file_name) == 9
508        # - file's modification time; allow up to two minutes difference
509        host.synchronize_times()
510        server_mtime = host.path.getmtime(file_name)
511        client_mtime = time.mktime(time.localtime())
512        calculated_time_shift = server_mtime - client_mtime
513        assert not abs(calculated_time_shift-host.time_shift()) > 120
514
515    def test_issomething_for_nonexistent_directory(self):
516        host = self.host
517        # Check if we get the right results if even the containing
518        # directory doesn't exist (see ticket #66).
519        nonexistent_path = "/nonexistent/nonexistent"
520        assert not host.path.isdir(nonexistent_path)
521        assert not host.path.isfile(nonexistent_path)
522        assert not host.path.islink(nonexistent_path)
523
524    def test_special_broken_link(self):
525        # Test for ticket #39.
526        host = self.host
527        broken_link_name = os.path.join("dir_with_broken_link", "nonexistent")
528        assert (host.lstat(broken_link_name)._st_target ==
529                "../nonexistent/nonexistent")
530        assert not host.path.isdir(broken_link_name)
531        assert not host.path.isfile(broken_link_name)
532        assert host.path.islink(broken_link_name)
533
534    def test_concurrent_access(self):
535        self.make_remote_file("_testfile_")
536        with ftputil.FTPHost(*self.login_data) as host1:
537            with ftputil.FTPHost(*self.login_data) as host2:
538                stat_result1 = host1.stat("_testfile_")
539                stat_result2 = host2.stat("_testfile_")
540                assert stat_result1 == stat_result2
541                host2.remove("_testfile_")
542                # Can still get the result via `host1`
543                stat_result1 = host1.stat("_testfile_")
544                assert stat_result1 == stat_result2
545                # Stat'ing on `host2` gives an exception.
546                with pytest.raises(ftputil.error.PermanentError):
547                    host2.stat("_testfile_")
548                # Stat'ing on `host1` after invalidation
549                absolute_path = host1.path.join(host1.getcwd(), "_testfile_")
550                host1.stat_cache.invalidate(absolute_path)
551                with pytest.raises(ftputil.error.PermanentError):
552                    host1.stat("_testfile_")
553
554    def test_cache_auto_resizing(self):
555        """Test if the cache is resized appropriately."""
556        host = self.host
557        cache = host.stat_cache._cache
558        # Make sure the cache size isn't adjusted towards smaller values.
559        unused_entries = host.listdir("walk_test")
560        assert cache.size == ftputil.stat_cache.StatCache._DEFAULT_CACHE_SIZE
561        # Make the cache very small initially and see if it gets resized.
562        cache.size = 2
563        entries = host.listdir("walk_test")
564        # The adjusted cache size should be larger or equal to the
565        # number of items in `walk_test` and its parent directory. The
566        # latter is read implicitly upon `listdir`'s `isdir` call.
567        expected_min_cache_size = max(len(host.listdir(host.curdir)),
568                                      len(entries))
569        assert cache.size >= expected_min_cache_size
570
571
572class TestUploadAndDownload(RealFTPTest):
573    """Test upload and download (including time shift test)."""
574
575    def test_time_shift(self):
576        self.host.synchronize_times()
577        assert self.host.time_shift() == EXPECTED_TIME_SHIFT
578
579    @pytest.mark.slow_test
580    def test_upload(self):
581        host = self.host
582        host.synchronize_times()
583        local_file = "_local_file_"
584        remote_file = "_remote_file_"
585        # Make local file to upload.
586        self.make_local_file()
587        # Wait, else small time differences between client and server
588        # actually could trigger the update.
589        time.sleep(65)
590        try:
591            self.cleaner.add_file(remote_file)
592            host.upload(local_file, remote_file)
593            # Retry; shouldn't be uploaded
594            uploaded = host.upload_if_newer(local_file, remote_file)
595            assert uploaded is False
596            # Rewrite the local file.
597            self.make_local_file()
598            # Retry; should be uploaded now
599            uploaded = host.upload_if_newer(local_file, remote_file)
600            assert uploaded is True
601        finally:
602            # Clean up
603            os.unlink(local_file)
604
605    @pytest.mark.slow_test
606    def test_download(self):
607        host = self.host
608        host.synchronize_times()
609        local_file = "_local_file_"
610        remote_file = "_remote_file_"
611        # Make a remote file.
612        self.make_remote_file(remote_file)
613        # File should be downloaded as it's not present yet.
614        downloaded = host.download_if_newer(remote_file, local_file)
615        assert downloaded is True
616        try:
617            # If the remote file, taking the datetime precision into
618            # account, _might_ be newer, the file will be downloaded
619            # again. To prevent this, wait a bit over a minute (the
620            # remote precision), then "touch" the local file.
621            time.sleep(65)
622            # Create empty file.
623            with open(local_file, "w") as fobj:
624                pass
625            # Local file is present and newer, so shouldn't download.
626            downloaded = host.download_if_newer(remote_file, local_file)
627            assert downloaded is False
628            # Re-make the remote file.
629            self.make_remote_file(remote_file)
630            # Local file is present but possibly older (taking the
631            # possible deviation because of the precision into account),
632            # so should download.
633            downloaded = host.download_if_newer(remote_file, local_file)
634            assert downloaded is True
635        finally:
636            # Clean up.
637            os.unlink(local_file)
638
639    def test_callback_with_transfer(self):
640        host = self.host
641        FILE_NAME = "debian-keyring.tar.gz"
642        # Default chunk size as in `FTPHost.copyfileobj`
643        MAX_COPY_CHUNK_SIZE = ftputil.file_transfer.MAX_COPY_CHUNK_SIZE
644        file_size = host.path.getsize(FILE_NAME)
645        chunk_count, _ = divmod(file_size, MAX_COPY_CHUNK_SIZE)
646        # Add one chunk for remainder.
647        chunk_count += 1
648        # Define a callback that just collects all data passed to it.
649        transferred_chunks_list = []
650        def test_callback(chunk):
651            transferred_chunks_list.append(chunk)
652        try:
653            host.download(FILE_NAME, FILE_NAME, callback=test_callback)
654            # Construct a list of data chunks we expect.
655            expected_chunks_list = []
656            with open(FILE_NAME, "rb") as downloaded_fobj:
657                while True:
658                    chunk = downloaded_fobj.read(MAX_COPY_CHUNK_SIZE)
659                    if not chunk:
660                        break
661                    expected_chunks_list.append(chunk)
662            # Examine data collected by callback function.
663            assert len(transferred_chunks_list) == chunk_count
664            assert transferred_chunks_list == expected_chunks_list
665        finally:
666            os.unlink(FILE_NAME)
667
668
669class TestFTPFiles(RealFTPTest):
670
671    def test_only_closed_children(self):
672        REMOTE_FILE_NAME = "CONTENTS"
673        host = self.host
674        with host.open(REMOTE_FILE_NAME, "rb") as file_obj1:
675            # Create empty file and close it.
676            with host.open(REMOTE_FILE_NAME, "rb") as file_obj2:
677                pass
678            # This should re-use the second child because the first isn't
679            # closed but the second is.
680            with host.open(REMOTE_FILE_NAME, "rb") as file_obj:
681                assert len(host._children) == 2
682                assert file_obj._host is host._children[1]
683
684    def test_no_timed_out_children(self):
685        REMOTE_FILE_NAME = "CONTENTS"
686        host = self.host
687        # Implicitly create child host object.
688        with host.open(REMOTE_FILE_NAME, "rb") as file_obj1:
689            pass
690        # Monkey-patch file to simulate an FTP server timeout below.
691        def timed_out_pwd():
692            raise ftplib.error_temp("simulated timeout")
693        file_obj1._host._session.pwd = timed_out_pwd
694        # Try to get a file - which shouldn't be the timed-out file.
695        with host.open(REMOTE_FILE_NAME, "rb") as file_obj2:
696            assert file_obj1 is not file_obj2
697        # Re-use closed and not timed-out child session.
698        with host.open(REMOTE_FILE_NAME, "rb") as file_obj3:
699            pass
700        assert file_obj2 is file_obj3
701
702    def test_no_delayed_226_children(self):
703        REMOTE_FILE_NAME = "CONTENTS"
704        host = self.host
705        # Implicitly create child host object.
706        with host.open(REMOTE_FILE_NAME, "rb") as file_obj1:
707            pass
708        # Monkey-patch file to simulate an FTP server timeout below.
709        def timed_out_pwd():
710            raise ftplib.error_reply("delayed 226 reply")
711        file_obj1._host._session.pwd = timed_out_pwd
712        # Try to get a file - which shouldn't be the timed-out file.
713        with host.open(REMOTE_FILE_NAME, "rb") as file_obj2:
714            assert file_obj1 is not file_obj2
715        # Re-use closed and not timed-out child session.
716        with host.open(REMOTE_FILE_NAME, "rb") as file_obj3:
717            pass
718        assert file_obj2 is file_obj3
719
720
721class TestChmod(RealFTPTest):
722
723    def assert_mode(self, path, expected_mode):
724        """
725        Return an integer containing the allowed bits in the mode
726        change command.
727
728        The `FTPHost` object to test against is `self.host`.
729        """
730        full_mode = self.host.stat(path).st_mode
731        # Remove flags we can't set via `chmod`.
732        # Allowed flags according to Python documentation
733        # https://docs.python.org/library/stat.html
734        allowed_flags = [stat.S_ISUID, stat.S_ISGID, stat.S_ENFMT,
735          stat.S_ISVTX, stat.S_IREAD, stat.S_IWRITE, stat.S_IEXEC,
736          stat.S_IRWXU, stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR,
737          stat.S_IRWXG, stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP,
738          stat.S_IRWXO, stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH]
739        allowed_mask = functools.reduce(operator.or_, allowed_flags)
740        mode = full_mode & allowed_mask
741        assert mode == expected_mode, (
742                 "mode {0:o} != {1:o}".format(mode, expected_mode))
743
744    def test_chmod_existing_directory(self):
745        host = self.host
746        host.mkdir("_test dir_")
747        self.cleaner.add_dir("_test dir_")
748        # Set/get mode of a directory.
749        host.chmod("_test dir_", 0o757)
750        self.assert_mode("_test dir_", 0o757)
751        # Set/get mode in nested directory.
752        host.mkdir("_test dir_/nested_dir")
753        self.cleaner.add_dir("_test dir_/nested_dir")
754        host.chmod("_test dir_/nested_dir", 0o757)
755        self.assert_mode("_test dir_/nested_dir", 0o757)
756
757    def test_chmod_existing_file(self):
758        host = self.host
759        host.mkdir("_test dir_")
760        self.cleaner.add_dir("_test dir_")
761        # Set/get mode on a file.
762        file_name = host.path.join("_test dir_", "_testfile_")
763        self.make_remote_file(file_name)
764        host.chmod(file_name, 0o646)
765        self.assert_mode(file_name, 0o646)
766
767    def test_chmod_nonexistent_path(self):
768        # Set/get mode of a non-existing item.
769        with pytest.raises(ftputil.error.PermanentError):
770            self.host.chmod("nonexistent", 0o757)
771
772    def test_cache_invalidation(self):
773        host = self.host
774        host.mkdir("_test dir_")
775        self.cleaner.add_dir("_test dir_")
776        # Make sure the mode is in the cache.
777        unused_stat_result = host.stat("_test dir_")
778        # Set/get mode of the directory.
779        host.chmod("_test dir_", 0o757)
780        self.assert_mode("_test dir_", 0o757)
781        # Set/get mode on a file.
782        file_name = host.path.join("_test dir_", "_testfile_")
783        self.make_remote_file(file_name)
784        # Make sure the mode is in the cache.
785        unused_stat_result = host.stat(file_name)
786        host.chmod(file_name, 0o646)
787        self.assert_mode(file_name, 0o646)
788
789
790class TestRestArgument(RealFTPTest):
791
792    TEST_FILE_NAME = "rest_test"
793
794    def setup_method(self, method):
795        super(TestRestArgument, self).setup_method(method)
796        # Write test file.
797        with self.host.open(self.TEST_FILE_NAME, "wb") as fobj:
798            fobj.write(b"abcdefghijkl")
799        self.cleaner.add_file(self.TEST_FILE_NAME)
800
801    def test_for_reading(self):
802        """
803        If a `rest` argument is passed to `open`, the following read
804        operation should start at the byte given by `rest`.
805        """
806        with self.host.open(self.TEST_FILE_NAME, "rb", rest=3) as fobj:
807            data = fobj.read()
808        assert data == b"defghijkl"
809
810    def test_for_writing(self):
811        """
812        If a `rest` argument is passed to `open`, the following write
813        operation should start writing at the byte given by `rest`.
814        """
815        with self.host.open(self.TEST_FILE_NAME, "wb", rest=3) as fobj:
816            fobj.write(b"123")
817        with self.host.open(self.TEST_FILE_NAME, "rb") as fobj:
818            data = fobj.read()
819        assert data == b"abc123"
820
821    def test_invalid_read_from_text_file(self):
822        """
823        If the `rest` argument is used for reading from a text file,
824        a `CommandNotImplementedError` should be raised.
825        """
826        with pytest.raises(ftputil.error.CommandNotImplementedError):
827            self.host.open(self.TEST_FILE_NAME, "r", rest=3)
828
829    def test_invalid_write_to_text_file(self):
830        """
831        If the `rest` argument is used for reading from a text file,
832        a `CommandNotImplementedError` should be raised.
833        """
834        with pytest.raises(ftputil.error.CommandNotImplementedError):
835            self.host.open(self.TEST_FILE_NAME, "w", rest=3)
836
837    # There are no tests for reading and writing beyond the end of a
838    # file. For example, if the remote file is 10 bytes long and
839    # `open(remote_file, "rb", rest=100)` is used, the server may
840    # return an error status code or not.
841    #
842    # The server I use for testing returns a 554 status when
843    # attempting to _read_ beyond the end of the file. On the other
844    # hand, if attempting to _write_ beyond the end of the file, the
845    # server accepts the request, but starts writing after the end of
846    # the file, i. e. appends to the file.
847    #
848    # Instead of expecting certain responses that may differ between
849    # server implementations, I leave the bahavior for too large
850    # `rest` arguments undefined. In practice, this shouldn't be a
851    # problem because the `rest` argument should only be used for
852    # error recovery, and in this case a valid byte count for the
853    # `rest` argument should be known.
854
855
856class TestOther(RealFTPTest):
857
858    def test_open_for_reading(self):
859        # Test for issues #17 and #51,
860        # http://ftputil.sschwarzer.net/trac/ticket/17 and
861        # http://ftputil.sschwarzer.net/trac/ticket/51 .
862        file1 = self.host.open("debian-keyring.tar.gz", "rb")
863        time.sleep(1)
864        # Depending on the FTP server, this might return a status code
865        # unexpected by `ftplib` or block the socket connection until
866        # a server-side timeout.
867        file1.close()
868
869    def test_subsequent_reading(self):
870        # Open a file for reading.
871        with self.host.open("CONTENTS", "rb") as file1:
872            pass
873        # Make sure that there are no problems if the connection is reused.
874        with self.host.open("CONTENTS", "rb") as file2:
875            pass
876        assert file1._session is file2._session
877
878    def test_names_with_spaces(self):
879        # Test if directories and files with spaces in their names
880        # can be used.
881        host = self.host
882        assert host.path.isdir("dir with spaces")
883        assert (host.listdir("dir with spaces") ==
884                ["second dir", "some file", "some_file"])
885        assert host.path.isdir("dir with spaces/second dir")
886        assert host.path.isfile("dir with spaces/some_file")
887        assert host.path.isfile("dir with spaces/some file")
888
889    def test_synchronize_times_without_write_access(self):
890        """Test failing synchronization because of non-writable directory."""
891        host = self.host
892        # This isn't writable by the ftp account the tests are run under.
893        host.chdir("rootdir1")
894        with pytest.raises(ftputil.error.TimeShiftError):
895            host.synchronize_times()
896
897    def test_listdir_with_non_ascii_byte_string(self):
898        """
899        `listdir` should accept byte strings with non-ASCII
900        characters and return non-ASCII characters in directory or
901        file names.
902        """
903        host = self.host
904        path = "äbc".encode("UTF-8")
905        names = host.listdir(path)
906        assert names[0] == b"file1"
907        assert names[1] == "file1_ö".encode("UTF-8")
908
909    def test_listdir_with_non_ascii_unicode_string(self):
910        """
911        `listdir` should accept unicode strings with non-ASCII
912        characters and return non-ASCII characters in directory or
913        file names.
914        """
915        host = self.host
916        # `ftplib` under Python 3 only works correctly if the unicode
917        # strings are decoded from latin1.
918        path = "äbc".encode("UTF-8").decode("latin1")
919        names = host.listdir(path)
920        assert names[0] == "file1"
921        assert names[1] == "file1_ö".encode("UTF-8").decode("latin1")
922
923    def test_path_with_non_latin1_unicode_string(self):
924        """
925        ftputil operations shouldn't accept file paths with non-latin1
926        characters.
927        """
928        # Use some musical symbols. These are certainly not latin1.
929        path = "𝄞𝄢"
930        # `UnicodeEncodeError` is also the exception that `ftplib`
931        # raises if it gets a non-latin1 path.
932        with pytest.raises(UnicodeEncodeError):
933            self.host.mkdir(path)
934
935    def test_list_a_option(self):
936        # For this test to pass, the server must _not_ list "hidden"
937        # files by default but instead only when the `LIST` `-a`
938        # option is used.
939        host = self.host
940        assert not host.use_list_a_option
941        directory_entries = host.listdir(host.curdir)
942        assert ".hidden" not in directory_entries
943        # Switch on showing of hidden paths.
944        host.use_list_a_option = True
945        directory_entries = host.listdir(host.curdir)
946        assert ".hidden" in directory_entries
947
948    def _make_objects_to_be_garbage_collected(self):
949        for _ in range(10):
950            with ftputil.FTPHost(*self.login_data) as host:
951                for _ in range(10):
952                    unused_stat_result = host.stat("CONTENTS")
953                    with host.open("CONTENTS") as fobj:
954                        unused_data = fobj.read()
955
956    def test_garbage_collection(self):
957        """Test whether there are cycles which prevent garbage collection."""
958        gc.collect()
959        objects_before_test = len(gc.garbage)
960        self._make_objects_to_be_garbage_collected()
961        gc.collect()
962        objects_after_test = len(gc.garbage)
963        assert not objects_after_test - objects_before_test
Note: See TracBrowser for help on using the repository browser.