source: test/test_real_ftp.py @ 1711:766c15f83205

Last change on this file since 1711:766c15f83205 was 1711:766c15f83205, checked in by Stefan Schwarzer <sschwarzer@…>, 11 months ago
Set `use_list_a_option` to `False` by default. See ticket #110 for the reasoning.
File size: 36.1 KB
Line 
1# encoding: UTF-8
2# Copyright (C) 2003-2016, Stefan Schwarzer <sschwarzer@sschwarzer.net>
3# and ftputil contributors (see `doc/contributors.txt`)
4# See the file LICENSE for licensing terms.
5
6# Execute tests on a real FTP server (other tests use a mock server).
7#
8# This test writes some files and directories on the local client and
9# the remote server. You'll need write access in the login directory.
10# This test can take a few minutes because it has to wait to test the
11# timezone calculation.
12
13from __future__ import absolute_import
14from __future__ import unicode_literals
15
16import ftplib
17import functools
18import gc
19import operator
20import os
21import time
22import stat
23
24import pytest
25
26import ftputil.compat
27import ftputil.error
28import ftputil.file_transfer
29import ftputil.session
30import ftputil.stat_cache
31
32import test
33
34
def utc_local_time_shift():
    """
    Return the expected time shift in seconds, assuming that the
    server reports UTC in its listings while the client uses local
    time.

    This helper exists because Pure-FTPd, at some point, seemed to
    insist on displaying file times in UTC.

    The result is rounded to a multiple of 3600 seconds (one hour).
    """
    utc_fields = time.gmtime()
    # Force the DST flag (the last field) of the local time to 0.
    # Otherwise daylight saving time would distort the calculated
    # time shift.
    local_fields = time.localtime()[:-1] + (0,)
    shift_in_hours = (time.mktime(utc_fields) -
                      time.mktime(local_fields)) / 3600.0
    # To be safe, round to whole hours before converting back to
    # seconds.
    return round(shift_in_hours) * 3600
52
# Expected difference, in seconds, between the local times of server
# and client. If 0.0, server and client use the same timezone.
#
# Alternative for a server that reports UTC in its listings (see the
# docstring of `utc_local_time_shift`):
#EXPECTED_TIME_SHIFT = utc_local_time_shift()
# Pure-FTPd seems to have changed its mind about that, so expect no
# time shift.
EXPECTED_TIME_SHIFT = 0.0
59
60
class Cleaner(object):
    """
    Helper to remove remote directories and files which a test might
    otherwise leave behind if it fails in unexpected ways.
    """

    def __init__(self, host):
        # This `FTPHost` object is shared with the test class
        # (probably `RealFTPTest`) that uses this helper.
        self._host = host
        # Pairs of item type ("d" for directory, "f" for file) and
        # absolute remote path, in the order they were scheduled.
        self._ftp_items = []

    def _schedule(self, type_, path):
        """Remember the item of type `type_` at `path` for removal."""
        absolute_path = self._host.path.abspath(path)
        self._ftp_items.append((type_, absolute_path))

    def add_dir(self, path):
        """Schedule the directory at `path` for removal."""
        self._schedule("d", path)

    def add_file(self, path):
        """Schedule the file at `path` for removal."""
        self._schedule("f", path)

    def clean(self):
        """
        Remove the scheduled directories and files, starting with the
        item that was scheduled last.

        Errors raised by a removal are ignored.
        """
        host = self._host
        host.chdir("/")
        for type_, path in self._ftp_items[::-1]:
            try:
                if type_ == "d":
                    # If `rmtree` fails halfway, we might leave a
                    # mess behind.
                    host.rmtree(path)
                elif type_ == "f":
                    # Only a minor mess if `remove` fails.
                    host.remove(path)
            except ftputil.error.FTPError:
                # Best effort only; continue with the next item.
                continue
101
102
class RealFTPTest(object):
    """
    Base class for tests that run against a real FTP server.

    For each test method, `setup_method` stores a new `FTPHost`
    object in `self.host` and a `Cleaner` using this host in
    `self.cleaner`. `teardown_method` runs the cleaner and closes the
    host.
    """

    def setup_method(self, method):
        """Log in to the test server and prepare the cleanup helper."""
        # Server, username, password.
        self.login_data = ("localhost", "ftptest",
                           "d605581757de5eb56d568a4419f4126e")
        self.host = ftputil.FTPHost(*self.login_data)
        self.cleaner = Cleaner(self.host)

    def teardown_method(self, method):
        """Remove the scheduled remote items and close the host."""
        try:
            self.cleaner.clean()
        finally:
            # Close the connection even if the cleanup raised an
            # exception. Otherwise the FTP session would leak.
            self.host.close()

    #
    # Helper methods
    #
    def make_remote_file(self, path):
        """
        Create a file at `path` on the FTP host and schedule it for
        removal.
        """
        self.cleaner.add_file(path)
        with self.host.open(path, "wb") as file_:
            # Write something. Otherwise the FTP server might not update
            # the time of last modification if the file existed before.
            file_.write(b"\n")

    def make_local_file(self):
        """
        Create the file `_local_file_` on the local host (= on the
        client side) in the current directory.
        """
        with open("_local_file_", "wb") as fobj:
            fobj.write(b"abc\x12\x34def\t")
131
132
class TestMkdir(RealFTPTest):
    """Tests for `mkdir`, `rmdir` and `makedirs`."""

    def test_mkdir_rmdir(self):
        host = self.host
        dir_name = "_testdir_"
        file_name = host.path.join(dir_name, "_nonempty_")
        self.cleaner.add_dir(dir_name)
        # Make dir and check if the directory is there.
        host.mkdir(dir_name)
        assert dir_name in host.listdir(host.curdir)
        # Try to remove a non-empty directory.
        self.cleaner.add_file(file_name)
        # Opening the file for writing and closing it is enough to
        # create it.
        with host.open(file_name, "w"):
            pass
        with pytest.raises(ftputil.error.PermanentError):
            host.rmdir(dir_name)
        # Remove file.
        host.unlink(file_name)
        # `remove` on a directory should fail.
        try:
            with pytest.raises(ftputil.error.PermanentError) as exc_info:
                host.remove(dir_name)
            assert str(exc_info.value).startswith(
                     "remove/unlink can only delete files")
        finally:
            # Delete empty directory, no matter how the check above
            # turned out.
            host.rmdir(dir_name)
        assert dir_name not in host.listdir(host.curdir)

    def test_makedirs_without_existing_dirs(self):
        host = self.host
        # No `_dir1_` yet
        assert "_dir1_" not in host.listdir(host.curdir)
        # Schedule the removal _before_ creating the directories, so
        # nothing is left behind if `makedirs` fails halfway.
        self.cleaner.add_dir("_dir1_")
        # Vanilla case, all should go well.
        host.makedirs("_dir1_/dir2/dir3/dir4")
        # Check host.
        assert host.path.isdir("_dir1_")
        assert host.path.isdir("_dir1_/dir2")
        assert host.path.isdir("_dir1_/dir2/dir3")
        assert host.path.isdir("_dir1_/dir2/dir3/dir4")

    def test_makedirs_from_non_root_directory(self):
        # This is a testcase for issue #22, see
        # http://ftputil.sschwarzer.net/trac/ticket/22 .
        host = self.host
        # No `_dir1_` and `_dir2_` yet
        assert "_dir1_" not in host.listdir(host.curdir)
        assert "_dir2_" not in host.listdir(host.curdir)
        # Part 1: Try to make directories starting from `_dir1_` and
        # change to non-root directory.
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        host.chdir("_dir1_")
        host.makedirs("_dir2_/_dir3_")
        # Test for expected directory hierarchy.
        assert host.path.isdir("/_dir1_")
        assert host.path.isdir("/_dir1_/_dir2_")
        assert host.path.isdir("/_dir1_/_dir2_/_dir3_")
        assert not host.path.isdir("/_dir1_/_dir1_")
        # Remove all but the directory we're in.
        host.rmdir("/_dir1_/_dir2_/_dir3_")
        host.rmdir("/_dir1_/_dir2_")
        # Part 2: Try to make directories starting from root.
        self.cleaner.add_dir("/_dir2_")
        host.makedirs("/_dir2_/_dir3_")
        # Test for expected directory hierarchy
        assert host.path.isdir("/_dir2_")
        assert host.path.isdir("/_dir2_/_dir3_")
        assert not host.path.isdir("/_dir1_/_dir2_")

    def test_makedirs_of_existing_directory(self):
        host = self.host
        # The (chrooted) login directory
        host.makedirs("/")

    def test_makedirs_with_file_in_the_way(self):
        host = self.host
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        self.make_remote_file("_dir1_/file1")
        # Try it. Neither the file itself nor a path below it can be
        # turned into a directory.
        with pytest.raises(ftputil.error.PermanentError):
            host.makedirs("_dir1_/file1")
        with pytest.raises(ftputil.error.PermanentError):
            host.makedirs("_dir1_/file1/dir2")

    def test_makedirs_with_existing_directory(self):
        host = self.host
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        host.makedirs("_dir1_/dir2")
        # Check
        assert host.path.isdir("_dir1_")
        assert host.path.isdir("_dir1_/dir2")

    def test_makedirs_in_non_writable_directory(self):
        host = self.host
        # Preparation: `rootdir1` exists but is only writable by root.
        with pytest.raises(ftputil.error.PermanentError):
            host.makedirs("rootdir1/dir2")

    def test_makedirs_with_writable_directory_at_end(self):
        host = self.host
        self.cleaner.add_dir("rootdir2/dir2")
        # Preparation: `rootdir2` exists but is only writable by root.
        # `dir2` is writable by regular ftp users. Both directories
        # below should work.
        host.makedirs("rootdir2/dir2")
        host.makedirs("rootdir2/dir2/dir3")
248
249
class TestRemoval(RealFTPTest):
    """Tests for `rmtree` and `remove`."""

    def test_rmtree_without_error_handler(self):
        host = self.host
        # Build a tree with two files on each of its two levels.
        self.cleaner.add_dir("_dir1_")
        host.makedirs("_dir1_/dir2")
        for path in ("_dir1_/file1", "_dir1_/file2",
                     "_dir1_/dir2/file3", "_dir1_/dir2/file4"):
            self.make_remote_file(path)
        # `rmtree` must refuse to remove a _file_.
        with pytest.raises(ftputil.error.PermanentError):
            host.rmtree("_dir1_/file2")
        # Removing `dir2` must leave the rest of `_dir1_` alone.
        host.rmtree("_dir1_/dir2")
        assert not host.path.exists("_dir1_/dir2")
        assert host.path.exists("_dir1_/file2")
        # Re-create `dir2` with its files, then remove all of `_dir1_`.
        host.mkdir("_dir1_/dir2")
        for path in ("_dir1_/dir2/file3", "_dir1_/dir2/file4"):
            self.make_remote_file(path)
        host.rmtree("_dir1_")
        assert not host.path.exists("_dir1_")

    def test_rmtree_with_error_handler(self):
        host = self.host
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        self.make_remote_file("_dir1_/file1")
        # The error "handler" only records the arguments of each call.
        calls = []
        def record_call(*args):
            calls.append(args)
        def assert_failures_recorded_for(path):
            # First `listdir`, then `rmdir` must have failed for `path`.
            assert calls[0][0] == host.listdir
            assert calls[0][1] == path
            assert calls[1][0] == host.rmdir
            assert calls[1][1] == path
        # Try to remove a file as root "directory". With
        # `ignore_errors` set, the handler mustn't be called.
        host.rmtree("_dir1_/file1", ignore_errors=True, onerror=record_call)
        assert calls == []
        host.rmtree("_dir1_/file1", ignore_errors=False, onerror=record_call)
        assert_failures_recorded_for("_dir1_/file1")
        host.rmtree("_dir1_")
        # Try to remove a directory that no longer exists.
        del calls[:]
        host.rmtree("_dir1_", ignore_errors=False, onerror=record_call)
        assert_failures_recorded_for("_dir1_")

    def test_remove_non_existent_item(self):
        with pytest.raises(ftputil.error.PermanentError):
            self.host.remove("nonexistent")

    def test_remove_existing_file(self):
        file_name = "_testfile_"
        self.cleaner.add_file(file_name)
        self.make_remote_file(file_name)
        path = self.host.path
        assert path.isfile(file_name)
        self.host.remove(file_name)
        assert not path.exists(file_name)
313
314
class TestWalk(RealFTPTest):
    """
    Walk the directory tree

      walk_test
      ├── dir1
      │   ├── dir11
      │   └── dir12
      │       ├── dir123
      │       │   └── file1234
      │       ├── file121
      │       └── file122
      ├── dir2
      ├── dir3
      │   ├── dir31
      │   ├── dir32 -> ../dir1/dir12/dir123
      │   ├── file31
      │   └── file32
      └── file4

    and check if the results are the expected ones.

    The tree must already exist on the server; the tests don't
    create it.
    """

    def _walk_test(self, expected_result, **walk_kwargs):
        """
        Call `self.host.walk` with `walk_kwargs` and compare the
        yielded items with `expected_result`, item by item.
        """
        actual_result = list(self.host.walk(**walk_kwargs))
        assert len(actual_result) == len(expected_result)
        for actual_item, expected_item in zip(actual_result,
                                              expected_result):
            assert actual_item == expected_item

    def test_walk_topdown(self):
        # Preparation: build tree in directory `walk_test`.
        # Each item is (directory path, subdirectories, files).
        expected_result = [
          ("walk_test", ["dir1", "dir2", "dir3"], ["file4"]),
          ("walk_test/dir1", ["dir11", "dir12"], []),
          ("walk_test/dir1/dir11", [], []),
          ("walk_test/dir1/dir12", ["dir123"], ["file121", "file122"]),
          ("walk_test/dir1/dir12/dir123", [], ["file1234"]),
          ("walk_test/dir2", [], []),
          ("walk_test/dir3", ["dir31", "dir32"], ["file31", "file32"]),
          ("walk_test/dir3/dir31", [], []),
          ]
        self._walk_test(expected_result, top="walk_test")

    def test_walk_depth_first(self):
        # Preparation: build tree in directory `walk_test`
        # Each item is (directory path, subdirectories, files).
        expected_result = [
          ("walk_test/dir1/dir11", [], []),
          ("walk_test/dir1/dir12/dir123", [], ["file1234"]),
          ("walk_test/dir1/dir12", ["dir123"], ["file121", "file122"]),
          ("walk_test/dir1", ["dir11", "dir12"], []),
          ("walk_test/dir2", [], []),
          ("walk_test/dir3/dir31", [], []),
          ("walk_test/dir3", ["dir31", "dir32"], ["file31", "file32"]),
          ("walk_test", ["dir1", "dir2", "dir3"], ["file4"]),
          ]
        self._walk_test(expected_result, top="walk_test", topdown=False)

    def test_walk_following_links(self):
        # Preparation: build tree in directory `walk_test`.
        # Each item is (directory path, subdirectories, files). In
        # contrast to `test_walk_topdown`, the link `dir32` is
        # expected to be visited, too.
        expected_result = [
          ("walk_test", ["dir1", "dir2", "dir3"], ["file4"]),
          ("walk_test/dir1", ["dir11", "dir12"], []),
          ("walk_test/dir1/dir11", [], []),
          ("walk_test/dir1/dir12", ["dir123"], ["file121", "file122"]),
          ("walk_test/dir1/dir12/dir123", [], ["file1234"]),
          ("walk_test/dir2", [], []),
          ("walk_test/dir3", ["dir31", "dir32"], ["file31", "file32"]),
          ("walk_test/dir3/dir31", [], []),
          ("walk_test/dir3/dir32", [], ["file1234"]),
          ]
        self._walk_test(expected_result, top="walk_test", followlinks=True)
463
464
class TestRename(RealFTPTest):
    """Tests for `rename`."""

    def test_rename(self):
        source, target = "_testfile1_", "_testfile2_"
        # Make sure the target of the renaming operation is removed.
        self.cleaner.add_file(target)
        self.make_remote_file(source)
        self.host.rename(source, target)
        path = self.host.path
        assert not path.exists(source)
        assert path.exists(target)

    def test_rename_with_spaces_in_directory(self):
        dir_name = "_dir with spaces_"
        source = dir_name + "/testfile1"
        target = dir_name + "/testfile2"
        self.cleaner.add_dir(dir_name)
        self.host.mkdir(dir_name)
        self.make_remote_file(source)
        self.host.rename(source, target)
        path = self.host.path
        assert not path.exists(source)
        assert path.exists(target)
485
486
class TestStat(RealFTPTest):
    """Tests for `stat`, `lstat`, the `path.is...` checks and the stat cache."""

    def test_stat(self):
        host = self.host
        dir_name = "_testdir_"
        file_name = host.path.join(dir_name, "_nonempty_")
        # Make a directory and a file in it.
        self.cleaner.add_dir(dir_name)
        host.mkdir(dir_name)
        with host.open(file_name, "wb") as fobj:
            fobj.write(b"abc\x12\x34def\t")
        # Do some stats
        # - dir
        dir_stat = host.stat(dir_name)
        assert isinstance(dir_stat._st_name, ftputil.compat.unicode_type)
        assert host.listdir(dir_name) == ["_nonempty_"]
        assert host.path.isdir(dir_name)
        assert not host.path.isfile(dir_name)
        assert not host.path.islink(dir_name)
        # - file
        file_stat = host.stat(file_name)
        assert isinstance(file_stat._st_name, ftputil.compat.unicode_type)
        assert not host.path.isdir(file_name)
        assert host.path.isfile(file_name)
        assert not host.path.islink(file_name)
        # The size is the number of bytes written above.
        assert host.path.getsize(file_name) == 9
        # - file's modification time; allow up to two minutes difference
        host.synchronize_times()
        server_mtime = host.path.getmtime(file_name)
        client_mtime = time.mktime(time.localtime())
        calculated_time_shift = server_mtime - client_mtime
        assert abs(calculated_time_shift - host.time_shift()) <= 120

    def test_issomething_for_nonexistent_directory(self):
        host = self.host
        # Check if we get the right results if even the containing
        # directory doesn't exist (see ticket #66).
        nonexistent_path = "/nonexistent/nonexistent"
        assert not host.path.isdir(nonexistent_path)
        assert not host.path.isfile(nonexistent_path)
        assert not host.path.islink(nonexistent_path)

    def test_special_broken_link(self):
        # Test for ticket #39.
        host = self.host
        # This is a _remote_ path, so join it with `host.path.join`.
        # `os.path.join` would use the separator of the client
        # platform (for example a backslash on Windows).
        broken_link_name = host.path.join("dir_with_broken_link",
                                          "nonexistent")
        assert (host.lstat(broken_link_name)._st_target ==
                "../nonexistent/nonexistent")
        assert not host.path.isdir(broken_link_name)
        assert not host.path.isfile(broken_link_name)
        assert host.path.islink(broken_link_name)

    def test_concurrent_access(self):
        self.make_remote_file("_testfile_")
        with ftputil.FTPHost(*self.login_data) as host1:
            with ftputil.FTPHost(*self.login_data) as host2:
                stat_result1 = host1.stat("_testfile_")
                stat_result2 = host2.stat("_testfile_")
                assert stat_result1 == stat_result2
                host2.remove("_testfile_")
                # Can still get the result via `host1`
                stat_result1 = host1.stat("_testfile_")
                assert stat_result1 == stat_result2
                # Stat'ing on `host2` gives an exception.
                with pytest.raises(ftputil.error.PermanentError):
                    host2.stat("_testfile_")
                # Stat'ing on `host1` after invalidation
                absolute_path = host1.path.join(host1.getcwd(), "_testfile_")
                host1.stat_cache.invalidate(absolute_path)
                with pytest.raises(ftputil.error.PermanentError):
                    host1.stat("_testfile_")

    def test_cache_auto_resizing(self):
        """Test if the cache is resized appropriately."""
        host = self.host
        cache = host.stat_cache._cache
        # Make sure the cache size isn't adjusted towards smaller values.
        host.listdir("walk_test")
        assert cache.size == ftputil.stat_cache.StatCache._DEFAULT_CACHE_SIZE
        # Make the cache very small initially and see if it gets resized.
        cache.size = 2
        entries = host.listdir("walk_test")
        # The adjusted cache size should be larger or equal to the
        # number of items in `walk_test` and its parent directory. The
        # latter is read implicitly upon `listdir`'s `isdir` call.
        expected_min_cache_size = max(len(host.listdir(host.curdir)),
                                      len(entries))
        assert cache.size >= expected_min_cache_size
575
576
class TestUploadAndDownload(RealFTPTest):
    """Test upload and download (including time shift test)."""

    def test_time_shift(self):
        self.host.synchronize_times()
        assert self.host.time_shift() == EXPECTED_TIME_SHIFT

    @pytest.mark.slow_test
    def test_upload(self):
        host = self.host
        host.synchronize_times()
        local_file = "_local_file_"
        remote_file = "_remote_file_"
        # Make local file to upload.
        self.make_local_file()
        try:
            # Wait, else small time differences between client and
            # server actually could trigger the update.
            time.sleep(65)
            self.cleaner.add_file(remote_file)
            host.upload(local_file, remote_file)
            # Retry; shouldn't be uploaded
            uploaded = host.upload_if_newer(local_file, remote_file)
            assert uploaded is False
            # Rewrite the local file.
            self.make_local_file()
            # Retry; should be uploaded now
            uploaded = host.upload_if_newer(local_file, remote_file)
            assert uploaded is True
        finally:
            # Clean up
            os.unlink(local_file)

    @pytest.mark.slow_test
    def test_download(self):
        host = self.host
        host.synchronize_times()
        local_file = "_local_file_"
        remote_file = "_remote_file_"
        # Make a remote file.
        self.make_remote_file(remote_file)
        # The first download already creates the local file, so it has
        # to happen inside the `try` to be covered by the cleanup.
        try:
            # File should be downloaded as it's not present yet.
            downloaded = host.download_if_newer(remote_file, local_file)
            assert downloaded is True
            # If the remote file, taking the datetime precision into
            # account, _might_ be newer, the file will be downloaded
            # again. To prevent this, wait a bit over a minute (the
            # remote precision), then "touch" the local file.
            time.sleep(65)
            # Create empty file.
            with open(local_file, "w"):
                pass
            # Local file is present and newer, so shouldn't download.
            downloaded = host.download_if_newer(remote_file, local_file)
            assert downloaded is False
            # Re-make the remote file.
            self.make_remote_file(remote_file)
            # Local file is present but possibly older (taking the
            # possible deviation because of the precision into account),
            # so should download.
            downloaded = host.download_if_newer(remote_file, local_file)
            assert downloaded is True
        finally:
            # Clean up. The file may be missing if the first download
            # failed; don't mask that failure with an `OSError`.
            if os.path.exists(local_file):
                os.unlink(local_file)

    def test_callback_with_transfer(self):
        host = self.host
        FILE_NAME = "debian-keyring.tar.gz"
        # Default chunk size as in `FTPHost.copyfileobj`
        MAX_COPY_CHUNK_SIZE = ftputil.file_transfer.MAX_COPY_CHUNK_SIZE
        file_size = host.path.getsize(FILE_NAME)
        chunk_count, remainder = divmod(file_size, MAX_COPY_CHUNK_SIZE)
        # Add one chunk for the remainder, but only if there is one.
        # Otherwise the count would be off by one for a file size
        # that is an exact multiple of the chunk size.
        if remainder:
            chunk_count += 1
        # Define a callback that just collects all data passed to it.
        transferred_chunks_list = []
        def test_callback(chunk):
            transferred_chunks_list.append(chunk)
        try:
            host.download(FILE_NAME, FILE_NAME, callback=test_callback)
            # Construct a list of data chunks we expect.
            expected_chunks_list = []
            with open(FILE_NAME, "rb") as downloaded_fobj:
                while True:
                    chunk = downloaded_fobj.read(MAX_COPY_CHUNK_SIZE)
                    if not chunk:
                        break
                    expected_chunks_list.append(chunk)
            # Examine data collected by callback function.
            assert len(transferred_chunks_list) == chunk_count
            assert transferred_chunks_list == expected_chunks_list
        finally:
            # The local file may be missing if the download failed
            # early; don't mask that failure with an `OSError`.
            if os.path.exists(FILE_NAME):
                os.unlink(FILE_NAME)
672
673
class TestFTPFiles(RealFTPTest):
    """Tests for the reuse of child hosts by remote file objects."""

    def test_only_closed_children(self):
        host = self.host
        remote_name = "CONTENTS"
        with host.open(remote_name, "rb") as still_open_file:
            # Open the same file a second time and close it right away.
            with host.open(remote_name, "rb") as closed_file:
                pass
            # This should re-use the second child because the first isn't
            # closed but the second is.
            with host.open(remote_name, "rb") as reopened_file:
                assert len(host._children) == 2
                assert reopened_file._host is host._children[1]

    def test_no_timed_out_children(self):
        host = self.host
        remote_name = "CONTENTS"
        # Implicitly create child host object.
        with host.open(remote_name, "rb") as first_file:
            pass
        # Monkey-patch the child session of the first file to simulate
        # an FTP server timeout below.
        def failing_pwd():
            raise ftplib.error_temp("simulated timeout")
        first_file._host._session.pwd = failing_pwd
        # Try to get a file - which shouldn't be the timed-out file.
        with host.open(remote_name, "rb") as second_file:
            assert first_file is not second_file
        # Re-use closed and not timed-out child session.
        with host.open(remote_name, "rb") as third_file:
            pass
        assert second_file is third_file

    def test_no_delayed_226_children(self):
        host = self.host
        remote_name = "CONTENTS"
        # Implicitly create child host object.
        with host.open(remote_name, "rb") as first_file:
            pass
        # Monkey-patch the child session of the first file so that
        # `pwd` fails with a reply error ("delayed 226 reply").
        def failing_pwd():
            raise ftplib.error_reply("delayed 226 reply")
        first_file._host._session.pwd = failing_pwd
        # Try to get a file - which shouldn't be the file with the
        # failing session.
        with host.open(remote_name, "rb") as second_file:
            assert first_file is not second_file
        # Re-use the closed child session that doesn't fail.
        with host.open(remote_name, "rb") as third_file:
            pass
        assert second_file is third_file
724
725
class TestChmod(RealFTPTest):
    """Tests for `chmod`."""

    def assert_mode(self, path, expected_mode):
        """
        Assert that the mode of the remote item at `path`, reduced to
        the bits which can be set with a mode change command, equals
        `expected_mode`.

        The `FTPHost` object to test against is `self.host`.
        """
        # Flags we can set via `chmod`, according to the Python
        # documentation at https://docs.python.org/library/stat.html
        # All other flags are masked out before the comparison.
        allowed_mask = 0
        for flag in (
          stat.S_ISUID, stat.S_ISGID, stat.S_ENFMT,
          stat.S_ISVTX, stat.S_IREAD, stat.S_IWRITE, stat.S_IEXEC,
          stat.S_IRWXU, stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR,
          stat.S_IRWXG, stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP,
          stat.S_IRWXO, stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH):
            allowed_mask |= flag
        mode = self.host.stat(path).st_mode & allowed_mask
        assert mode == expected_mode, (
                 "mode {0:o} != {1:o}".format(mode, expected_mode))

    def test_chmod_existing_directory(self):
        host = self.host
        dir_name = "_test dir_"
        nested_dir_name = "_test dir_/nested_dir"
        host.mkdir(dir_name)
        self.cleaner.add_dir(dir_name)
        # Set/get mode of a directory.
        host.chmod(dir_name, 0o757)
        self.assert_mode(dir_name, 0o757)
        # Set/get mode in nested directory.
        host.mkdir(nested_dir_name)
        self.cleaner.add_dir(nested_dir_name)
        host.chmod(nested_dir_name, 0o757)
        self.assert_mode(nested_dir_name, 0o757)

    def test_chmod_existing_file(self):
        host = self.host
        dir_name = "_test dir_"
        host.mkdir(dir_name)
        self.cleaner.add_dir(dir_name)
        # Set/get mode on a file.
        file_name = host.path.join(dir_name, "_testfile_")
        self.make_remote_file(file_name)
        host.chmod(file_name, 0o646)
        self.assert_mode(file_name, 0o646)

    def test_chmod_nonexistent_path(self):
        # Changing the mode of a non-existing item must fail.
        with pytest.raises(ftputil.error.PermanentError):
            self.host.chmod("nonexistent", 0o757)

    def test_cache_invalidation(self):
        host = self.host
        dir_name = "_test dir_"
        host.mkdir(dir_name)
        self.cleaner.add_dir(dir_name)
        # Make sure the mode is in the cache. The stat result itself
        # isn't needed.
        host.stat(dir_name)
        # Set/get mode of the directory.
        host.chmod(dir_name, 0o757)
        self.assert_mode(dir_name, 0o757)
        # Set/get mode on a file.
        file_name = host.path.join(dir_name, "_testfile_")
        self.make_remote_file(file_name)
        # Make sure the mode is in the cache.
        host.stat(file_name)
        host.chmod(file_name, 0o646)
        self.assert_mode(file_name, 0o646)
793
794
class TestRestArgument(RealFTPTest):
    """
    Tests for the `rest` argument of `self.host.open`.

    Each test works on a small remote file that is written in
    `setup_method`.
    """

    TEST_FILE_NAME = "rest_test"

    def setup_method(self, method):
        """
        Run the base class setup, then write the 12-byte test file
        and register it with the cleaner.
        """
        super(TestRestArgument, self).setup_method(method)
        file_name = self.TEST_FILE_NAME
        with self.host.open(file_name, "wb") as remote_file:
            remote_file.write(b"abcdefghijkl")
        self.cleaner.add_file(file_name)

    def test_for_reading(self):
        """
        Reading from a file opened with a `rest` argument should
        start at the byte offset given by `rest`.
        """
        remote_file = self.host.open(self.TEST_FILE_NAME, "rb", rest=3)
        with remote_file as fobj:
            content = fobj.read()
        # The first three bytes ("abc") are skipped.
        assert content == b"defghijkl"

    def test_for_writing(self):
        """
        Writing to a file opened with a `rest` argument should start
        at the byte offset given by `rest`.
        """
        file_name = self.TEST_FILE_NAME
        with self.host.open(file_name, "wb", rest=3) as writable:
            writable.write(b"123")
        # Read the whole file back to see what the server stored.
        with self.host.open(file_name, "rb") as readable:
            content = readable.read()
        assert content == b"abc123"

    def test_invalid_read_from_text_file(self):
        """
        Using the `rest` argument when opening a text file for
        reading should raise a `CommandNotImplementedError`.
        """
        expected_error = ftputil.error.CommandNotImplementedError
        with pytest.raises(expected_error):
            self.host.open(self.TEST_FILE_NAME, "r", rest=3)

    def test_invalid_write_to_text_file(self):
        """
        Using the `rest` argument when opening a text file for
        writing should raise a `CommandNotImplementedError`.
        """
        expected_error = ftputil.error.CommandNotImplementedError
        with pytest.raises(expected_error):
            self.host.open(self.TEST_FILE_NAME, "w", rest=3)

    # Reading and writing beyond the end of a file is deliberately
    # not tested. For example, if the remote file is 10 bytes long
    # and `open(remote_file, "rb", rest=100)` is used, the server may
    # or may not return an error status code.
    #
    # The server I use for testing returns a 554 status when
    # attempting to _read_ beyond the end of the file. On the other
    # hand, when attempting to _write_ beyond the end of the file,
    # the server accepts the request, but starts writing after the
    # end of the file, i. e. it appends to the file.
    #
    # Instead of expecting certain responses that may differ between
    # server implementations, I leave the behavior for too large
    # `rest` arguments undefined. In practice, this shouldn't be a
    # problem because the `rest` argument should only be used for
    # error recovery, and in this case a valid byte count for the
    # `rest` argument should be known.
859
860
class TestOther(RealFTPTest):
    """
    Miscellaneous tests against the real FTP server.

    Several tests rely on items that must already exist on the
    server, for example `CONTENTS`, `debian-keyring.tar.gz`,
    `dir with spaces`, `rootdir1` and `.hidden`.
    """

    def test_open_for_reading(self):
        """
        Open a remote file for reading and close it after a short
        pause without reading any data.
        """
        # Test for issues #17 and #51,
        # http://ftputil.sschwarzer.net/trac/ticket/17 and
        # http://ftputil.sschwarzer.net/trac/ticket/51 .
        remote_file = self.host.open("debian-keyring.tar.gz", "rb")
        time.sleep(1)
        # Depending on the FTP server, closing the file might return
        # a status code unexpected by `ftplib` or block the socket
        # connection until a server-side timeout.
        remote_file.close()

    def test_subsequent_reading(self):
        """
        Opening a file twice in a row should use the same session
        for both file objects.
        """
        file_name = "CONTENTS"
        # Open a file for reading and close it right away.
        with self.host.open(file_name, "rb") as first_file:
            pass
        # Make sure that there are no problems if the connection is
        # reused.
        with self.host.open(file_name, "rb") as second_file:
            pass
        assert first_file._session is second_file._session

    def test_names_with_spaces(self):
        """
        Directories and files with spaces in their names should be
        usable.
        """
        ftp_host = self.host
        top_dir = "dir with spaces"
        expected_names = ["second dir", "some file", "some_file"]
        assert ftp_host.path.isdir(top_dir)
        assert ftp_host.listdir(top_dir) == expected_names
        assert ftp_host.path.isdir("dir with spaces/second dir")
        file_paths = ("dir with spaces/some_file",
                      "dir with spaces/some file")
        for file_path in file_paths:
            assert ftp_host.path.isfile(file_path)

    def test_synchronize_times_without_write_access(self):
        """Test failing synchronization because of non-writable directory."""
        ftp_host = self.host
        # This directory isn't writable by the ftp account the tests
        # are run under.
        ftp_host.chdir("rootdir1")
        expected_error = ftputil.error.TimeShiftError
        with pytest.raises(expected_error):
            ftp_host.synchronize_times()

    def test_listdir_with_non_ascii_byte_string(self):
        """
        `listdir` should accept byte strings with non-ASCII
        characters and return non-ASCII characters in directory or
        file names.
        """
        dir_path = "äbc".encode("UTF-8")
        entries = self.host.listdir(dir_path)
        # With a byte string argument, byte strings are expected back.
        assert entries[0] == b"file1"
        assert entries[1] == "file1_ö".encode("UTF-8")

    def test_listdir_with_non_ascii_unicode_string(self):
        """
        `listdir` should accept unicode strings with non-ASCII
        characters and return non-ASCII characters in directory or
        file names.
        """
        # `ftplib` under Python 3 only works correctly if the unicode
        # strings are decoded from latin1. Under Python 2, ftputil
        # is supposed to provide a compatible interface.
        dir_path = "äbc".encode("UTF-8").decode("latin1")
        entries = self.host.listdir(dir_path)
        assert entries[0] == "file1"
        assert entries[1] == "file1_ö".encode("UTF-8").decode("latin1")

    def test_path_with_non_latin1_unicode_string(self):
        """
        ftputil operations shouldn't accept file paths with non-latin1
        characters.
        """
        # Some musical symbols, which are certainly not latin1.
        non_latin1_path = "𝄞𝄢"
        # `UnicodeEncodeError` is also the exception that `ftplib`
        # raises if it gets a non-latin1 path.
        with pytest.raises(UnicodeEncodeError):
            self.host.mkdir(non_latin1_path)

    def test_list_a_option(self):
        """
        The hidden file `.hidden` should only be listed after
        `use_list_a_option` has been switched on.
        """
        # For this test to pass, the server must _not_ list "hidden"
        # files by default but instead only when the `LIST` `-a`
        # option is used.
        ftp_host = self.host
        hidden_name = ".hidden"
        assert not ftp_host.use_list_a_option
        entries = ftp_host.listdir(ftp_host.curdir)
        assert hidden_name not in entries
        # Switch on showing of hidden paths.
        ftp_host.use_list_a_option = True
        entries = ftp_host.listdir(ftp_host.curdir)
        assert hidden_name in entries

    def _make_objects_to_be_garbage_collected(self):
        """
        Repeatedly create `FTPHost` instances and use them for `stat`
        calls and file reads, so that `test_garbage_collection` has
        objects to check.
        """
        file_name = "CONTENTS"
        for _host_round in range(10):
            with ftputil.FTPHost(*self.login_data) as ftp_host:
                for _access_round in range(10):
                    unused_stat = ftp_host.stat(file_name)
                    with ftp_host.open(file_name) as remote_file:
                        unused_content = remote_file.read()

    def test_garbage_collection(self):
        """Test whether there are cycles which prevent garbage collection."""
        gc.collect()
        garbage_count_before = len(gc.garbage)
        self._make_objects_to_be_garbage_collected()
        gc.collect()
        garbage_count_after = len(gc.garbage)
        # No additional uncollectable objects must have shown up.
        assert garbage_count_after == garbage_count_before

    @pytest.mark.skipif(
        ftputil.compat.python_version > 2,
        reason="test requires M2Crypto which only works on Python 2")
    def test_m2crypto_session(self):
        """
        Test if a session with `M2Crypto.ftpslib.FTP_TLS` is set up
        correctly and works with unicode input.
        """
        # See ticket #78.
        #
        # M2Crypto is only available for Python 2, hence the import
        # inside the test method.
        import M2Crypto
        tls_factory = ftputil.session.session_factory(
            base_class=M2Crypto.ftpslib.FTP_TLS,
            encrypt_data_channel=True)
        tls_host = ftputil.FTPHost(*self.login_data,
                                   session_factory=tls_factory)
        with tls_host as host:
            # Test if unicode argument works.
            names = host.listdir(".")
        assert "CONTENTS" in names
Note: See TracBrowser for help on using the repository browser.