source: test/test_real_ftp.py @ 1671:2a3350ab9374

Last change on this file since 1671:2a3350ab9374 was 1671:2a3350ab9374, checked in by Stefan Schwarzer <sschwarzer@…>, 3 years ago
Handle socket read timeout in `_available_child` When `FTPFile.close` causes a socket read timeout, the _next_ read on the socket will raise an `OSError`. Hence, the `OSError` can happen in the `host._session.pwd` call in `FTPHost._available_child`. Catch this `OSError`. Although the fix in `ftputil.host.FTPHost._available_child` will make the child session handling work, socket read errors could result in child sessions being returned in a non-reproducible way. To make the identity checks in the tests reproducible, use the small `CONTENTS` file instead of the larger `debian-keyring.tar.gz` file. Investigation of ticket #112 showed that socket read errors would rather occur with larger files. This change in the tests is slightly hackish, but should be ok, unless it'll cause other problems later on. ticket: 112
File size: 36.1 KB
Line 
1# encoding: UTF-8
2# Copyright (C) 2003-2016, Stefan Schwarzer <sschwarzer@sschwarzer.net>
3# and ftputil contributors (see `doc/contributors.txt`)
4# See the file LICENSE for licensing terms.
5
6# Execute tests on a real FTP server (other tests use a mock server).
7#
8# This test writes some files and directories on the local client and
9# the remote server. You'll need write access in the login directory.
10# This test can take a few minutes because it has to wait to test the
11# timezone calculation.
12
13from __future__ import absolute_import
14from __future__ import unicode_literals
15
16import ftplib
17import functools
18import gc
19import operator
20import os
21import time
22import stat
23
24import pytest
25
26import ftputil.compat
27import ftputil.error
28import ftputil.file_transfer
29import ftputil.session
30import ftputil.stat_cache
31
32import test
33
34
def utc_local_time_shift():
    """
    Compute the offset in seconds between UTC and the client's local
    time, rounded to whole hours.

    This is the time shift to expect when the server reports times in
    UTC in its listings while the client works with local time, which
    is what Pure-FTPd meanwhile seems to insist on.
    """
    now_utc = time.gmtime()
    now_local = time.localtime()
    # The last field of the tuple is the DST flag. It has to be 0 for
    # the calculation to yield the correct time shift.
    now_local_without_dst = now_local[:-1] + (0,)
    raw_shift = time.mktime(now_utc) - time.mktime(now_local_without_dst)
    # Snap the value to full hours (units of 3600 s) to be safe.
    whole_hours = round(raw_shift / 3600.0)
    return whole_hours * 3600
52
# Expected difference between the local times of server and client, as
# compared against `FTPHost.time_shift()` in `test_time_shift`. If 0.0,
# server and client use the same timezone.
#
# This used to be calculated, i. e.
#   EXPECTED_TIME_SHIFT = utc_local_time_shift()
# but Pure-FTPd seems to have changed its mind (see docstring of
# `utc_local_time_shift`), so the value is fixed to "no shift" for now.
EXPECTED_TIME_SHIFT = 0.0
59
60
class Cleaner(object):
    """
    Remember remote directories and files created by a test so they
    can be removed afterwards, even if the test failed unexpectedly.
    """

    def __init__(self, host):
        # `host` is the `FTPHost` object also used by the test class
        # (probably `RealFTPTest`); both share the same object.
        self._host = host
        # Pairs `(type, absolute_path)` where type is "d" (directory)
        # or "f" (file), in the order they were scheduled.
        self._ftp_items = []

    def add_dir(self, path):
        """Remember the directory `path` for later removal."""
        absolute_path = self._host.path.abspath(path)
        self._ftp_items.append(("d", absolute_path))

    def add_file(self, path):
        """Remember the file `path` for later removal."""
        absolute_path = self._host.path.abspath(path)
        self._ftp_items.append(("f", absolute_path))

    def clean(self):
        """
        Remove all remembered directories and files.

        Items are removed in the opposite order in which they were
        scheduled with `add_dir` and `add_file`. Removal errors
        (`ftputil.error.FTPError`) are ignored.
        """
        host = self._host
        host.chdir("/")
        for kind, remote_path in self._ftp_items[::-1]:
            is_dir = (kind == "d")
            is_file = (kind == "f")
            if not (is_dir or is_file):
                # Unknown item type; nothing to do.
                continue
            try:
                if is_dir:
                    # A failing `rmtree` might leave a mess behind.
                    host.rmtree(remote_path)
                else:
                    # A failing `remove` leaves only a minor mess.
                    host.remove(remote_path)
            except ftputil.error.FTPError:
                pass
101
102
class RealFTPTest(object):
    """
    Base class for the tests running against a real FTP server.

    For each test method, `setup_method` logs in and stores the
    `FTPHost` object as `self.host`, together with a `Cleaner` in
    `self.cleaner`. `teardown_method` removes the scheduled items and
    closes the connection.
    """

    def setup_method(self, method):
        # Server, username, password.
        self.login_data = ("localhost", "ftptest",
                           "d605581757de5eb56d568a4419f4126e")
        self.host = ftputil.FTPHost(*self.login_data)
        self.cleaner = Cleaner(self.host)

    def teardown_method(self, method):
        # Close the connection even if the cleanup fails unexpectedly
        # (`Cleaner.clean` only ignores `FTPError`s raised by the
        # removals themselves). Otherwise a failing cleanup would leak
        # the FTP session.
        try:
            self.cleaner.clean()
        finally:
            self.host.close()

    #
    # Helper methods
    #
    def make_remote_file(self, path):
        """
        Create a file with path `path` on the FTP host and schedule
        it for removal after the test.
        """
        self.cleaner.add_file(path)
        with self.host.open(path, "wb") as file_:
            # Write something. Otherwise the FTP server might not update
            # the time of last modification if the file existed before.
            file_.write(b"\n")

    def make_local_file(self):
        """
        Create the file `_local_file_` in the current directory of the
        local host (= on the client side).
        """
        with open("_local_file_", "wb") as fobj:
            fobj.write(b"abc\x12\x34def\t")
131
132
class TestMkdir(RealFTPTest):
    """Tests for `mkdir`, `rmdir` and `makedirs` on the real server."""

    def test_mkdir_rmdir(self):
        host = self.host
        dir_name = "_testdir_"
        file_name = host.path.join(dir_name, "_nonempty_")
        self.cleaner.add_dir(dir_name)
        # Make dir and check if the directory is there.
        host.mkdir(dir_name)
        files = host.listdir(host.curdir)
        assert dir_name in files
        # Try to remove a non-empty directory.
        self.cleaner.add_file(file_name)
        # Create an empty file; the `with` statement closes it even if
        # something goes wrong.
        with host.open(file_name, "w"):
            pass
        with pytest.raises(ftputil.error.PermanentError):
            host.rmdir(dir_name)
        # Remove file.
        host.unlink(file_name)
        # `remove` on a directory should fail.
        try:
            with pytest.raises(ftputil.error.PermanentError) as exc_info:
                host.remove(dir_name)
            assert str(exc_info.value).startswith(
                     "remove/unlink can only delete files")
        finally:
            # Delete empty directory.
            host.rmdir(dir_name)
        files = host.listdir(host.curdir)
        assert dir_name not in files

    def test_makedirs_without_existing_dirs(self):
        host = self.host
        # No `_dir1_` yet
        assert "_dir1_" not in host.listdir(host.curdir)
        # Schedule the removal _before_ creating the directories, so
        # nothing is left behind if `makedirs` fails halfway. Do this
        # only after the check above so we never remove a directory
        # that existed before the test.
        self.cleaner.add_dir("_dir1_")
        # Vanilla case, all should go well.
        host.makedirs("_dir1_/dir2/dir3/dir4")
        # Check host.
        assert host.path.isdir("_dir1_")
        assert host.path.isdir("_dir1_/dir2")
        assert host.path.isdir("_dir1_/dir2/dir3")
        assert host.path.isdir("_dir1_/dir2/dir3/dir4")

    def test_makedirs_from_non_root_directory(self):
        # This is a testcase for issue #22, see
        # http://ftputil.sschwarzer.net/trac/ticket/22 .
        host = self.host
        # No `_dir1_` and `_dir2_` yet
        assert "_dir1_" not in host.listdir(host.curdir)
        assert "_dir2_" not in host.listdir(host.curdir)
        # Part 1: Try to make directories starting from `_dir1_` and
        # change to non-root directory.
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        host.chdir("_dir1_")
        host.makedirs("_dir2_/_dir3_")
        # Test for expected directory hierarchy.
        assert host.path.isdir("/_dir1_")
        assert host.path.isdir("/_dir1_/_dir2_")
        assert host.path.isdir("/_dir1_/_dir2_/_dir3_")
        assert not host.path.isdir("/_dir1_/_dir1_")
        # Remove all but the directory we're in.
        host.rmdir("/_dir1_/_dir2_/_dir3_")
        host.rmdir("/_dir1_/_dir2_")
        # Part 2: Try to make directories starting from root.
        self.cleaner.add_dir("/_dir2_")
        host.makedirs("/_dir2_/_dir3_")
        # Test for expected directory hierarchy
        assert host.path.isdir("/_dir2_")
        assert host.path.isdir("/_dir2_/_dir3_")
        assert not host.path.isdir("/_dir1_/_dir2_")

    def test_makedirs_of_existing_directory(self):
        host = self.host
        # The (chrooted) login directory; this must not raise.
        host.makedirs("/")

    def test_makedirs_with_file_in_the_way(self):
        host = self.host
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        self.make_remote_file("_dir1_/file1")
        # Try it.
        with pytest.raises(ftputil.error.PermanentError):
            host.makedirs("_dir1_/file1")
        with pytest.raises(ftputil.error.PermanentError):
            host.makedirs("_dir1_/file1/dir2")

    def test_makedirs_with_existing_directory(self):
        host = self.host
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        host.makedirs("_dir1_/dir2")
        # Check
        assert host.path.isdir("_dir1_")
        assert host.path.isdir("_dir1_/dir2")

    def test_makedirs_in_non_writable_directory(self):
        host = self.host
        # Preparation: `rootdir1` exists but is only writable by root.
        with pytest.raises(ftputil.error.PermanentError):
            host.makedirs("rootdir1/dir2")

    def test_makedirs_with_writable_directory_at_end(self):
        host = self.host
        self.cleaner.add_dir("rootdir2/dir2")
        # Preparation: `rootdir2` exists but is only writable by root.
        # `dir2` is writable by regular ftp users. Both directories
        # below should work.
        host.makedirs("rootdir2/dir2")
        host.makedirs("rootdir2/dir2/dir3")
248
249
class TestRemoval(RealFTPTest):
    """Tests for `rmtree` and `remove` on the real server."""

    def test_rmtree_without_error_handler(self):
        host = self.host
        top_dir = "_dir1_"
        sub_dir = "_dir1_/dir2"
        sub_dir_files = ("_dir1_/dir2/file3", "_dir1_/dir2/file4")
        # Set up a small tree below `_dir1_`.
        self.cleaner.add_dir(top_dir)
        host.makedirs(sub_dir)
        for remote_path in ("_dir1_/file1", "_dir1_/file2") + sub_dir_files:
            self.make_remote_file(remote_path)
        # `rmtree` must refuse to remove a plain file.
        with pytest.raises(ftputil.error.PermanentError):
            host.rmtree("_dir1_/file2")
        # Removing `dir2` must leave its siblings alone.
        host.rmtree(sub_dir)
        assert not host.path.exists(sub_dir)
        assert host.path.exists("_dir1_/file2")
        # Build `dir2` again, then remove the whole tree.
        host.mkdir(sub_dir)
        for remote_path in sub_dir_files:
            self.make_remote_file(remote_path)
        host.rmtree(top_dir)
        assert not host.path.exists(top_dir)

    def test_rmtree_with_error_handler(self):
        host = self.host
        top_dir = "_dir1_"
        file_path = "_dir1_/file1"
        self.cleaner.add_dir(top_dir)
        host.mkdir(top_dir)
        self.make_remote_file(file_path)
        # The error "handler" just records the arguments it gets.
        recorded = []
        def record_error(*args):
            recorded.append(args)
        # Use a file as root "directory". With `ignore_errors` set,
        # the handler mustn't be called.
        host.rmtree(file_path, ignore_errors=True, onerror=record_error)
        assert recorded == []
        host.rmtree(file_path, ignore_errors=False, onerror=record_error)
        assert recorded[0][:2] == (host.listdir, file_path)
        assert recorded[1][:2] == (host.rmdir, file_path)
        host.rmtree(top_dir)
        # Now the directory is gone; removing it again should report
        # the failing calls to the handler.
        del recorded[:]
        host.rmtree(top_dir, ignore_errors=False, onerror=record_error)
        assert recorded[0][:2] == (host.listdir, top_dir)
        assert recorded[1][:2] == (host.rmdir, top_dir)

    def test_remove_non_existent_item(self):
        with pytest.raises(ftputil.error.PermanentError):
            self.host.remove("nonexistent")

    def test_remove_existing_file(self):
        file_name = "_testfile_"
        self.cleaner.add_file(file_name)
        self.make_remote_file(file_name)
        remote_path = self.host.path
        assert remote_path.isfile(file_name)
        self.host.remove(file_name)
        assert not remote_path.exists(file_name)
313
314
class TestWalk(RealFTPTest):
    """
    Tests for `FTPHost.walk`. They expect this directory tree to be
    present on the server

      walk_test
      ├── dir1
      │   ├── dir11
      │   └── dir12
      │       ├── dir123
      │       │   └── file1234
      │       ├── file121
      │       └── file122
      ├── dir2
      ├── dir3
      │   ├── dir31
      │   ├── dir32 -> ../dir1/dir12/dir123
      │   ├── file31
      │   └── file32
      └── file4

    and compare the walk results with the expected ones.
    """

    def _walk_test(self, expected_result, **walk_kwargs):
        """
        Walk the tree, passing `walk_kwargs` to `self.host.walk`, and
        compare the yielded items one by one with `expected_result`.
        """
        actual_result = list(self.host.walk(**walk_kwargs))
        assert len(actual_result) == len(expected_result)
        for actual_item, expected_item in zip(actual_result,
                                              expected_result):
            assert actual_item == expected_item

    def test_walk_topdown(self):
        # Preparation: build tree in directory `walk_test`.
        # Each item is `(directory, subdirectories, files)`.
        expected_result = [
          ("walk_test", ["dir1", "dir2", "dir3"], ["file4"]),
          ("walk_test/dir1", ["dir11", "dir12"], []),
          ("walk_test/dir1/dir11", [], []),
          ("walk_test/dir1/dir12", ["dir123"], ["file121", "file122"]),
          ("walk_test/dir1/dir12/dir123", [], ["file1234"]),
          ("walk_test/dir2", [], []),
          ("walk_test/dir3", ["dir31", "dir32"], ["file31", "file32"]),
          ("walk_test/dir3/dir31", [], []),
          ]
        self._walk_test(expected_result, top="walk_test")

    def test_walk_depth_first(self):
        # Preparation: build tree in directory `walk_test`.
        # Each item is `(directory, subdirectories, files)`.
        expected_result = [
          ("walk_test/dir1/dir11", [], []),
          ("walk_test/dir1/dir12/dir123", [], ["file1234"]),
          ("walk_test/dir1/dir12", ["dir123"], ["file121", "file122"]),
          ("walk_test/dir1", ["dir11", "dir12"], []),
          ("walk_test/dir2", [], []),
          ("walk_test/dir3/dir31", [], []),
          ("walk_test/dir3", ["dir31", "dir32"], ["file31", "file32"]),
          ("walk_test", ["dir1", "dir2", "dir3"], ["file4"]),
          ]
        self._walk_test(expected_result, top="walk_test", topdown=False)

    def test_walk_following_links(self):
        # Preparation: build tree in directory `walk_test`.
        # Each item is `(directory, subdirectories, files)`. Unlike in
        # `test_walk_topdown`, the link `dir32` is visited as well.
        expected_result = [
          ("walk_test", ["dir1", "dir2", "dir3"], ["file4"]),
          ("walk_test/dir1", ["dir11", "dir12"], []),
          ("walk_test/dir1/dir11", [], []),
          ("walk_test/dir1/dir12", ["dir123"], ["file121", "file122"]),
          ("walk_test/dir1/dir12/dir123", [], ["file1234"]),
          ("walk_test/dir2", [], []),
          ("walk_test/dir3", ["dir31", "dir32"], ["file31", "file32"]),
          ("walk_test/dir3/dir31", [], []),
          ("walk_test/dir3/dir32", [], ["file1234"]),
          ]
        self._walk_test(expected_result, top="walk_test", followlinks=True)
463
464
class TestRename(RealFTPTest):
    """Tests for `FTPHost.rename` on the real server."""

    def test_rename(self):
        host = self.host
        old_name = "_testfile1_"
        new_name = "_testfile2_"
        # `make_remote_file` only schedules the source for removal, so
        # take care of the rename target ourselves.
        self.cleaner.add_file(new_name)
        self.make_remote_file(old_name)
        host.rename(old_name, new_name)
        assert not host.path.exists(old_name)
        assert host.path.exists(new_name)

    def test_rename_with_spaces_in_directory(self):
        host = self.host
        dir_name = "_dir with spaces_"
        old_path = dir_name + "/testfile1"
        new_path = dir_name + "/testfile2"
        self.cleaner.add_dir(dir_name)
        host.mkdir(dir_name)
        self.make_remote_file(old_path)
        host.rename(old_path, new_path)
        assert not host.path.exists(old_path)
        assert host.path.exists(new_path)
485
486
class TestStat(RealFTPTest):
    """Tests for stat'ing, `host.path` checks and the stat cache."""

    def test_stat(self):
        host = self.host
        dir_name = "_testdir_"
        file_name = host.path.join(dir_name, "_nonempty_")
        # Make a directory and a file in it.
        self.cleaner.add_dir(dir_name)
        host.mkdir(dir_name)
        with host.open(file_name, "wb") as fobj:
            fobj.write(b"abc\x12\x34def\t")
        # Do some stats
        # - dir
        dir_stat = host.stat(dir_name)
        assert isinstance(dir_stat._st_name, ftputil.compat.unicode_type)
        assert host.listdir(dir_name) == ["_nonempty_"]
        assert host.path.isdir(dir_name)
        assert not host.path.isfile(dir_name)
        assert not host.path.islink(dir_name)
        # - file
        file_stat = host.stat(file_name)
        assert isinstance(file_stat._st_name, ftputil.compat.unicode_type)
        assert not host.path.isdir(file_name)
        assert host.path.isfile(file_name)
        assert not host.path.islink(file_name)
        # 9 is the number of bytes written above.
        assert host.path.getsize(file_name) == 9
        # - file's modification time; allow up to two minutes difference
        host.synchronize_times()
        server_mtime = host.path.getmtime(file_name)
        client_mtime = time.mktime(time.localtime())
        calculated_time_shift = server_mtime - client_mtime
        assert abs(calculated_time_shift - host.time_shift()) <= 120

    def test_issomething_for_nonexistent_directory(self):
        host = self.host
        # Check if we get the right results if even the containing
        # directory doesn't exist (see ticket #66).
        nonexistent_path = "/nonexistent/nonexistent"
        assert not host.path.isdir(nonexistent_path)
        assert not host.path.isfile(nonexistent_path)
        assert not host.path.islink(nonexistent_path)

    def test_special_broken_link(self):
        # Test for ticket #39.
        host = self.host
        # This is a path on the server, so it has to be joined with
        # `host.path.join`. `os.path.join` would use the separator of
        # the _client's_ operating system (a backslash on Windows).
        broken_link_name = host.path.join("dir_with_broken_link",
                                          "nonexistent")
        assert (host.lstat(broken_link_name)._st_target ==
                "../nonexistent/nonexistent")
        assert not host.path.isdir(broken_link_name)
        assert not host.path.isfile(broken_link_name)
        assert host.path.islink(broken_link_name)

    def test_concurrent_access(self):
        self.make_remote_file("_testfile_")
        with ftputil.FTPHost(*self.login_data) as host1:
            with ftputil.FTPHost(*self.login_data) as host2:
                stat_result1 = host1.stat("_testfile_")
                stat_result2 = host2.stat("_testfile_")
                assert stat_result1 == stat_result2
                host2.remove("_testfile_")
                # Can still get the result via `host1`
                stat_result1 = host1.stat("_testfile_")
                assert stat_result1 == stat_result2
                # Stat'ing on `host2` gives an exception.
                with pytest.raises(ftputil.error.PermanentError):
                    host2.stat("_testfile_")
                # Stat'ing on `host1` after invalidation
                absolute_path = host1.path.join(host1.getcwd(), "_testfile_")
                host1.stat_cache.invalidate(absolute_path)
                with pytest.raises(ftputil.error.PermanentError):
                    host1.stat("_testfile_")

    def test_cache_auto_resizing(self):
        """Test if the cache is resized appropriately."""
        host = self.host
        cache = host.stat_cache._cache
        # Make sure the cache size isn't adjusted towards smaller values.
        unused_entries = host.listdir("walk_test")
        assert cache.size == ftputil.stat_cache.StatCache._DEFAULT_CACHE_SIZE
        # Make the cache very small initially and see if it gets resized.
        cache.size = 2
        entries = host.listdir("walk_test")
        # The adjusted cache size should be larger or equal to the
        # number of items in `walk_test` and its parent directory. The
        # latter is read implicitly upon `listdir`'s `isdir` call.
        expected_min_cache_size = max(len(host.listdir(host.curdir)),
                                      len(entries))
        assert cache.size >= expected_min_cache_size
575
576
class TestUploadAndDownload(RealFTPTest):
    """Test upload and download (including time shift test)."""

    def test_time_shift(self):
        self.host.synchronize_times()
        assert self.host.time_shift() == EXPECTED_TIME_SHIFT

    @pytest.mark.slow_test
    def test_upload(self):
        host = self.host
        host.synchronize_times()
        local_file = "_local_file_"
        remote_file = "_remote_file_"
        # Make local file to upload.
        self.make_local_file()
        # Wait, else small time differences between client and server
        # actually could trigger the update.
        time.sleep(65)
        try:
            self.cleaner.add_file(remote_file)
            host.upload(local_file, remote_file)
            # Retry; shouldn't be uploaded
            uploaded = host.upload_if_newer(local_file, remote_file)
            assert uploaded is False
            # Rewrite the local file.
            self.make_local_file()
            # Retry; should be uploaded now
            uploaded = host.upload_if_newer(local_file, remote_file)
            assert uploaded is True
        finally:
            # Clean up
            os.unlink(local_file)

    @pytest.mark.slow_test
    def test_download(self):
        host = self.host
        host.synchronize_times()
        local_file = "_local_file_"
        remote_file = "_remote_file_"
        # Make a remote file.
        self.make_remote_file(remote_file)
        # File should be downloaded as it's not present yet.
        downloaded = host.download_if_newer(remote_file, local_file)
        assert downloaded is True
        try:
            # If the remote file, taking the datetime precision into
            # account, _might_ be newer, the file will be downloaded
            # again. To prevent this, wait a bit over a minute (the
            # remote precision), then "touch" the local file.
            time.sleep(65)
            # Create empty file.
            with open(local_file, "w"):
                pass
            # Local file is present and newer, so shouldn't download.
            downloaded = host.download_if_newer(remote_file, local_file)
            assert downloaded is False
            # Re-make the remote file.
            self.make_remote_file(remote_file)
            # Local file is present but possibly older (taking the
            # possible deviation because of the precision into account),
            # so should download.
            downloaded = host.download_if_newer(remote_file, local_file)
            assert downloaded is True
        finally:
            # Clean up.
            os.unlink(local_file)

    def test_callback_with_transfer(self):
        host = self.host
        FILE_NAME = "debian-keyring.tar.gz"
        # Default chunk size as in `FTPHost.copyfileobj`
        MAX_COPY_CHUNK_SIZE = ftputil.file_transfer.MAX_COPY_CHUNK_SIZE
        file_size = host.path.getsize(FILE_NAME)
        # Number of chunks, rounded up. There's an additional chunk
        # for the remainder only if there _is_ a remainder. This is
        # consistent with how `expected_chunks_list` is built below,
        # also for a file size which is an exact multiple of the chunk
        # size.
        chunk_count, remainder = divmod(file_size, MAX_COPY_CHUNK_SIZE)
        if remainder:
            chunk_count += 1
        # Define a callback that just collects all data passed to it.
        transferred_chunks_list = []
        def test_callback(chunk):
            transferred_chunks_list.append(chunk)
        try:
            host.download(FILE_NAME, FILE_NAME, callback=test_callback)
            # Construct a list of data chunks we expect.
            expected_chunks_list = []
            with open(FILE_NAME, "rb") as downloaded_fobj:
                while True:
                    chunk = downloaded_fobj.read(MAX_COPY_CHUNK_SIZE)
                    if not chunk:
                        break
                    expected_chunks_list.append(chunk)
            # Examine data collected by callback function.
            assert len(transferred_chunks_list) == chunk_count
            assert transferred_chunks_list == expected_chunks_list
        finally:
            # The local file may not exist if the download failed
            # early. Don't hide the actual error behind an `OSError`
            # from `unlink`.
            if os.path.exists(FILE_NAME):
                os.unlink(FILE_NAME)
672
673
class TestFTPFiles(RealFTPTest):
    """
    Tests for the re-use of child sessions (`host._children`) when
    opening remote files.

    The tests read the small remote file `CONTENTS`, which has to
    exist on the server.
    """

    def test_only_closed_children(self):
        REMOTE_FILE_NAME = "CONTENTS"
        host = self.host
        with host.open(REMOTE_FILE_NAME, "rb") as file_obj1:
            # Open the same file a second time (while the first file
            # is still open) and close it right away.
            with host.open(REMOTE_FILE_NAME, "rb") as file_obj2:
                pass
            # This should re-use the second child because the first isn't
            # closed but the second is.
            with host.open(REMOTE_FILE_NAME, "rb") as file_obj:
                assert len(host._children) == 2
                assert file_obj._host is host._children[1]

    def test_no_timed_out_children(self):
        REMOTE_FILE_NAME = "CONTENTS"
        host = self.host
        # Implicitly create child host object.
        with host.open(REMOTE_FILE_NAME, "rb") as file_obj1:
            pass
        # Monkey-patch the `pwd` method of the child's session to
        # simulate an FTP server timeout below.
        def timed_out_pwd():
            raise ftplib.error_temp("simulated timeout")
        file_obj1._host._session.pwd = timed_out_pwd
        # Try to get a file - which shouldn't be the timed-out file.
        with host.open(REMOTE_FILE_NAME, "rb") as file_obj2:
            assert file_obj1 is not file_obj2
        # Re-use closed and not timed-out child session.
        with host.open(REMOTE_FILE_NAME, "rb") as file_obj3:
            pass
        assert file_obj2 is file_obj3

    def test_no_delayed_226_children(self):
        REMOTE_FILE_NAME = "CONTENTS"
        host = self.host
        # Implicitly create child host object.
        with host.open(REMOTE_FILE_NAME, "rb") as file_obj1:
            pass
        # Monkey-patch the `pwd` method of the child's session to
        # simulate a delayed 226 reply below. Despite its name,
        # `timed_out_pwd` doesn't simulate a timeout here; it raises
        # `ftplib.error_reply`.
        def timed_out_pwd():
            raise ftplib.error_reply("delayed 226 reply")
        file_obj1._host._session.pwd = timed_out_pwd
        # Try to get a file - which shouldn't be the file whose
        # session fails with the delayed reply.
        with host.open(REMOTE_FILE_NAME, "rb") as file_obj2:
            assert file_obj1 is not file_obj2
        # Re-use closed child session which isn't affected by the
        # delayed reply.
        with host.open(REMOTE_FILE_NAME, "rb") as file_obj3:
            pass
        assert file_obj2 is file_obj3
724
725
class TestChmod(RealFTPTest):
    """Tests for `FTPHost.chmod` on the real server."""

    def assert_mode(self, path, expected_mode):
        """
        Assert that the mode of the remote item `path`, reduced to
        the bits which can be set with a mode change command, equals
        `expected_mode`.

        The `FTPHost` object to test against is `self.host`.
        """
        full_mode = self.host.stat(path).st_mode
        # Only these flags can be set via `chmod`, according to the
        # Python documentation
        # https://docs.python.org/library/stat.html
        settable_flags = (
          stat.S_ISUID, stat.S_ISGID, stat.S_ENFMT, stat.S_ISVTX,
          stat.S_IREAD, stat.S_IWRITE, stat.S_IEXEC,
          stat.S_IRWXU, stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR,
          stat.S_IRWXG, stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP,
          stat.S_IRWXO, stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH)
        allowed_mask = 0
        for flag in settable_flags:
            allowed_mask |= flag
        # Drop all other flags before comparing.
        mode = full_mode & allowed_mask
        message = "mode {0:o} != {1:o}".format(mode, expected_mode)
        assert mode == expected_mode, message

    def test_chmod_existing_directory(self):
        host = self.host
        top_dir = "_test dir_"
        nested_dir = "_test dir_/nested_dir"
        host.mkdir(top_dir)
        self.cleaner.add_dir(top_dir)
        # Change and check the mode of a directory ...
        host.chmod(top_dir, 0o757)
        self.assert_mode(top_dir, 0o757)
        # ... and of a directory inside it.
        host.mkdir(nested_dir)
        self.cleaner.add_dir(nested_dir)
        host.chmod(nested_dir, 0o757)
        self.assert_mode(nested_dir, 0o757)

    def test_chmod_existing_file(self):
        host = self.host
        top_dir = "_test dir_"
        host.mkdir(top_dir)
        self.cleaner.add_dir(top_dir)
        # Change and check the mode of a file.
        file_name = host.path.join(top_dir, "_testfile_")
        self.make_remote_file(file_name)
        host.chmod(file_name, 0o646)
        self.assert_mode(file_name, 0o646)

    def test_chmod_nonexistent_path(self):
        # Changing the mode of a non-existing item has to fail.
        host = self.host
        with pytest.raises(ftputil.error.PermanentError):
            host.chmod("nonexistent", 0o757)

    def test_cache_invalidation(self):
        host = self.host
        top_dir = "_test dir_"
        host.mkdir(top_dir)
        self.cleaner.add_dir(top_dir)
        # Stat the directory first, so its mode is in the cache.
        unused_stat_result = host.stat(top_dir)
        # `assert_mode` has to see the new mode, not the cached one.
        host.chmod(top_dir, 0o757)
        self.assert_mode(top_dir, 0o757)
        # Same procedure for a file.
        file_name = host.path.join(top_dir, "_testfile_")
        self.make_remote_file(file_name)
        # Stat the file first, so its mode is in the cache.
        unused_stat_result = host.stat(file_name)
        host.chmod(file_name, 0o646)
        self.assert_mode(file_name, 0o646)
793
794
class TestRestArgument(RealFTPTest):
    """
    Tests for the `rest` argument of `self.host.open`.

    Each test works on a small remote file that is written anew in
    `setup_method`.
    """

    TEST_FILE_NAME = "rest_test"

    def setup_method(self, method):
        """
        Run the base class setup, then write the 12-byte test file
        and register it with the cleaner.
        """
        super(TestRestArgument, self).setup_method(method)
        file_name = self.TEST_FILE_NAME
        # Write test file.
        with self.host.open(file_name, "wb") as remote_file:
            remote_file.write(b"abcdefghijkl")
        self.cleaner.add_file(file_name)

    def test_for_reading(self):
        """
        When `open` gets a `rest` argument, the subsequent read
        should begin at the byte offset given by `rest`.
        """
        remote_file = self.host.open(self.TEST_FILE_NAME, "rb", rest=3)
        with remote_file:
            remaining_bytes = remote_file.read()
        assert remaining_bytes == b"defghijkl"

    def test_for_writing(self):
        """
        When `open` gets a `rest` argument, the subsequent write
        should begin at the byte offset given by `rest`.
        """
        file_name = self.TEST_FILE_NAME
        with self.host.open(file_name, "wb", rest=3) as remote_file:
            remote_file.write(b"123")
        # Read the whole file back to see what the write left behind.
        with self.host.open(file_name, "rb") as remote_file:
            contents = remote_file.read()
        assert contents == b"abc123"

    def test_invalid_read_from_text_file(self):
        """
        Using the `rest` argument when opening a text file for
        reading should raise a `CommandNotImplementedError`.
        """
        expected_error = ftputil.error.CommandNotImplementedError
        with pytest.raises(expected_error):
            self.host.open(self.TEST_FILE_NAME, "r", rest=3)

    def test_invalid_write_to_text_file(self):
        """
        Using the `rest` argument when opening a text file for
        writing should raise a `CommandNotImplementedError`.
        """
        expected_error = ftputil.error.CommandNotImplementedError
        with pytest.raises(expected_error):
            self.host.open(self.TEST_FILE_NAME, "w", rest=3)

    # There are no tests for reading and writing beyond the end of a
    # file. For example, if the remote file is 10 bytes long and
    # `open(remote_file, "rb", rest=100)` is used, the server may
    # return an error status code or not.
    #
    # The server I use for testing returns a 554 status when
    # attempting to _read_ beyond the end of the file. On the other
    # hand, if attempting to _write_ beyond the end of the file, the
    # server accepts the request, but starts writing after the end of
    # the file, i.e. appends to the file.
    #
    # Instead of expecting certain responses that may differ between
    # server implementations, I leave the behavior for too large
    # `rest` arguments undefined. In practice, this shouldn't be a
    # problem because the `rest` argument should only be used for
    # error recovery, and in this case a valid byte count for the
    # `rest` argument should be known.
859
860
class TestOther(RealFTPTest):
    """
    Assorted tests against the real FTP server that don't belong to
    one of the more specific test classes.
    """

    def test_open_for_reading(self):
        """
        Open a remote file for reading and close it a second later
        without having read anything from it.
        """
        # Test for issues #17 and #51,
        # http://ftputil.sschwarzer.net/trac/ticket/17 and
        # http://ftputil.sschwarzer.net/trac/ticket/51 .
        remote_file = self.host.open("debian-keyring.tar.gz", "rb")
        time.sleep(1)
        # Depending on the FTP server, this might return a status code
        # unexpected by `ftplib` or block the socket connection until
        # a server-side timeout.
        remote_file.close()

    def test_subsequent_reading(self):
        """
        Opening the same file twice in a row should work, and both
        file objects should share the same `_session`.
        """
        file_name = "CONTENTS"
        # Open a file for reading.
        with self.host.open(file_name, "rb") as first_file:
            pass
        # Make sure that there are no problems if the connection is reused.
        with self.host.open(file_name, "rb") as second_file:
            pass
        assert first_file._session is second_file._session

    def test_names_with_spaces(self):
        """
        Directories and files with spaces in their names should be
        usable.
        """
        ftp_host = self.host
        assert ftp_host.path.isdir("dir with spaces")
        entries = ftp_host.listdir("dir with spaces")
        assert entries == ["second dir", "some file", "some_file"]
        assert ftp_host.path.isdir("dir with spaces/second dir")
        for file_path in ("dir with spaces/some_file",
                          "dir with spaces/some file"):
            assert ftp_host.path.isfile(file_path)

    def test_synchronize_times_without_write_access(self):
        """
        `synchronize_times` should raise a `TimeShiftError` if the
        current directory isn't writable.
        """
        ftp_host = self.host
        # This isn't writable by the ftp account the tests are run under.
        ftp_host.chdir("rootdir1")
        expected_error = ftputil.error.TimeShiftError
        with pytest.raises(expected_error):
            ftp_host.synchronize_times()

    def test_listdir_with_non_ascii_byte_string(self):
        """
        Given a byte string path with non-ASCII characters, `listdir`
        should return byte string names, including names that contain
        non-ASCII characters.
        """
        directory = "äbc".encode("UTF-8")
        expected_second_name = "file1_ö".encode("UTF-8")
        names = self.host.listdir(directory)
        assert names[0] == b"file1"
        assert names[1] == expected_second_name

    def test_listdir_with_non_ascii_unicode_string(self):
        """
        Given a unicode string path with non-ASCII characters,
        `listdir` should return unicode names, including names that
        contain non-ASCII characters.
        """
        # `ftplib` under Python 3 only works correctly if the unicode
        # strings are decoded from latin1. Under Python 2, ftputil
        # is supposed to provide a compatible interface.
        directory = "äbc".encode("UTF-8").decode("latin1")
        expected_second_name = "file1_ö".encode("UTF-8").decode("latin1")
        names = self.host.listdir(directory)
        assert names[0] == "file1"
        assert names[1] == expected_second_name

    def test_path_with_non_latin1_unicode_string(self):
        """
        File paths with characters outside of latin1 should be
        rejected by ftputil operations.
        """
        # Use some musical symbols. These are certainly not latin1.
        non_latin1_path = "𝄞𝄢"
        # `UnicodeEncodeError` is also the exception that `ftplib`
        # raises if it gets a non-latin1 path.
        with pytest.raises(UnicodeEncodeError):
            self.host.mkdir(non_latin1_path)

    def test_list_a_option(self):
        """
        "Hidden" files should be listed if and only if
        `use_list_a_option` is set.

        For this test to pass, the server must _not_ list "hidden"
        files by default but instead only when the `LIST` `-a`
        option is used.
        """
        ftp_host = self.host
        hidden_name = ".hidden"
        assert ftp_host.use_list_a_option
        entries_with_option = ftp_host.listdir(ftp_host.curdir)
        assert hidden_name in entries_with_option
        ftp_host.use_list_a_option = False
        entries_without_option = ftp_host.listdir(ftp_host.curdir)
        assert hidden_name not in entries_without_option

    def _make_objects_to_be_garbage_collected(self):
        """
        Open and close ten hosts; on each, stat and read the
        `CONTENTS` file ten times.

        This is a separate method so that all objects created here
        are out of scope when `test_garbage_collection` collects.
        """
        file_name = "CONTENTS"
        for _host_round in range(10):
            with ftputil.FTPHost(*self.login_data) as ftp_host:
                for _file_round in range(10):
                    unused_stat = ftp_host.stat(file_name)
                    with ftp_host.open(file_name) as remote_file:
                        unused_contents = remote_file.read()

    def test_garbage_collection(self):
        """
        Check that using hosts and remote files doesn't add entries
        to `gc.garbage`, i.e. doesn't leave uncollectable objects.
        """
        gc.collect()
        garbage_count_before = len(gc.garbage)
        self._make_objects_to_be_garbage_collected()
        gc.collect()
        garbage_count_after = len(gc.garbage)
        assert garbage_count_after == garbage_count_before

    @pytest.mark.skipif(
      ftputil.compat.python_version > 2,
      reason="test requires M2Crypto which only works on Python 2")
    def test_m2crypto_session(self):
        """
        A session based on `M2Crypto.ftpslib.FTP_TLS` should be set
        up correctly and work with unicode input.
        """
        # See ticket #78.
        #
        # M2Crypto is only available for Python 2, hence the local
        # import and the `skipif` marker.
        import M2Crypto
        tls_session_factory = ftputil.session.session_factory(
                                base_class=M2Crypto.ftpslib.FTP_TLS,
                                encrypt_data_channel=True)
        with ftputil.FTPHost(*self.login_data,
                             session_factory=tls_session_factory) as ftp_host:
            # Test if unicode argument works.
            entries = ftp_host.listdir(".")
        assert "CONTENTS" in entries
Note: See TracBrowser for help on using the repository browser.