source: test/test_real_ftp.py @ 1494:30df847fea39

Last change on this file since 1494:30df847fea39 was 1494:30df847fea39, checked in by Stefan Schwarzer <sschwarzer@…>, 7 years ago
Added unit tests for `session_factory` helper. Most tests are done with a mock class, but there's also a test in `test_real_ftp.py` that will use `M2Crypto.ftpslib.FTP_TLS` to check if the workaround in the `Session` class in `ftputil.session.session_factory` works. Note that since M2Crypto isn't available for Python 3, the real FTP test is skipped for Python 3. (This requires the `unittest2` module or here the `unittest` in Python 2.7 and 3.3.)
File size: 32.1 KB
Line 
1# encoding: UTF-8
2
3# Copyright (C) 2003-2013, Stefan Schwarzer <sschwarzer@sschwarzer.net>
4# See the file LICENSE for licensing terms.
5
6# Execute tests on a real FTP server (other tests use a mock server).
7
8from __future__ import absolute_import
9from __future__ import unicode_literals
10
11import ftplib
12import functools
13import gc
14import operator
15import os
16import time
17import unittest
18import stat
19
20import ftputil.compat
21import ftputil.error
22import ftputil.file_transfer
23import ftputil.session
24import ftputil.stat_cache
25
26import test
27
28
def utc_local_time_shift():
    """
    Compute the time shift, in seconds, that is expected when the
    server reports UTC in its listings while the client works with
    local time.

    The function exists because Pure-FTPd, at some point, displayed
    the times of files in UTC. The result is a multiple of 3600.
    """
    utc_fields = time.gmtime()
    # Force the DST flag (last tuple item) of the local time to 0;
    # otherwise daylight saving time would distort the difference.
    local_fields = time.localtime()[:-1] + (0,)
    utc_seconds = time.mktime(utc_fields)
    local_seconds = time.mktime(local_fields)
    # Snap to whole hours (3600 s) to absorb small deviations.
    whole_hours = round((utc_seconds - local_seconds) / 3600.0)
    return whole_hours * 3600
46
# Expected difference between the local times of server and client,
# in seconds. If 0.0, server and client use the same timezone.
#
# This used to be computed as
#   EXPECTED_TIME_SHIFT = utc_local_time_shift()
# but Pure-FTPd seems to have changed its behavior again (see the
# docstring of `utc_local_time_shift`), so no shift is expected now.
EXPECTED_TIME_SHIFT = 0.0
53
54
class Cleaner(object):
    """
    Keep track of remote directories and files created by a test so
    that they can be deleted afterwards, even if the test fails in
    an unexpected way.
    """

    def __init__(self, host):
        # `host` is the very same `FTPHost` object that the test
        # class (probably `RealFTPTest`) works with.
        self._host = host
        self._ftp_items = []

    def _schedule(self, kind, path):
        """
        Remember `path` (converted to an absolute path) together with
        its kind, "d" for a directory or "f" for a file.
        """
        absolute_path = self._host.path.abspath(path)
        self._ftp_items.append((kind, absolute_path))

    def add_dir(self, path):
        """Register the directory at `path` for later removal."""
        self._schedule("d", path)

    def add_file(self, path):
        """Register the file at `path` for later removal."""
        self._schedule("f", path)

    def clean(self):
        """
        Delete everything registered with `add_dir` and `add_file`,
        starting with the item that was registered last.

        FTP errors caused by a removal are swallowed.
        """
        host = self._host
        host.chdir("/")
        for kind, path in self._ftp_items[::-1]:
            try:
                if kind == "d":
                    # A failing `rmtree` may leave a mess behind.
                    host.rmtree(path)
                elif kind == "f":
                    # Only a minor mess if `remove` fails.
                    host.remove(path)
            except ftputil.error.FTPError:
                pass
95
96
class RealFTPTest(unittest.TestCase):
    """
    Base class for the tests in this file. It logs in to the FTP
    server before each test and removes scheduled items and closes
    the connection afterwards.
    """

    def setUp(self):
        server = "localhost"
        user = "ftptest"
        password = "d605581757de5eb56d568a4419f4126e"
        # Some tests open additional connections with this data.
        self.login_data = (server, user, password)
        self.host = ftputil.FTPHost(server, user, password)
        self.cleaner = Cleaner(self.host)

    def tearDown(self):
        # Remove the scheduled items first; this needs the connection.
        self.cleaner.clean()
        self.host.close()

    #
    # Helper methods
    #
    def make_remote_file(self, path):
        """Create the file `path` on the FTP host and schedule its removal."""
        self.cleaner.add_file(path)
        with self.host.open(path, "wb") as remote_file:
            # Without writing some data, the FTP server might not
            # update the modification time of an already existing file.
            remote_file.write(b"\n")

    def make_local_file(self):
        """Create the file `_local_file_` on the client side."""
        content = b"abc\x12\x34def\t"
        with open("_local_file_", "wb") as local_file:
            local_file.write(content)
125
126
class TestMkdir(RealFTPTest):
    """Tests for `mkdir`, `rmdir` and `makedirs` against a real server."""

    def test_mkdir_rmdir(self):
        host = self.host
        dir_name = "_testdir_"
        file_name = host.path.join(dir_name, "_nonempty_")
        self.cleaner.add_dir(dir_name)
        # Make dir and check if the directory is there.
        host.mkdir(dir_name)
        self.assertIn(dir_name, host.listdir(host.curdir))
        # Try to remove a non-empty directory.
        self.cleaner.add_file(file_name)
        # Create an empty file; the `with` statement closes it even
        # if something unexpected happens.
        with host.open(file_name, "w"):
            pass
        self.assertRaises(ftputil.error.PermanentError, host.rmdir, dir_name)
        # Remove file.
        host.unlink(file_name)
        # `remove` on a directory should fail.
        try:
            with self.assertRaises(ftputil.error.PermanentError) as context:
                host.remove(dir_name)
            self.assertTrue(str(context.exception).startswith(
                            "remove/unlink can only delete files"))
        finally:
            # Delete empty directory.
            host.rmdir(dir_name)
        self.assertNotIn(dir_name, host.listdir(host.curdir))

    def test_makedirs_without_existing_dirs(self):
        host = self.host
        # No `_dir1_` yet
        self.assertNotIn("_dir1_", host.listdir(host.curdir))
        # Schedule the cleanup _before_ creating anything. Otherwise
        # a `makedirs` call that fails halfway would leave a partial
        # directory tree behind.
        self.cleaner.add_dir("_dir1_")
        # Vanilla case, all should go well.
        host.makedirs("_dir1_/dir2/dir3/dir4")
        # Check host.
        for path in ("_dir1_",
                     "_dir1_/dir2",
                     "_dir1_/dir2/dir3",
                     "_dir1_/dir2/dir3/dir4"):
            self.assertTrue(host.path.isdir(path))

    def test_makedirs_from_non_root_directory(self):
        # This is a testcase for issue #22, see
        # http://ftputil.sschwarzer.net/trac/ticket/22 .
        host = self.host
        # No `_dir1_` and `_dir2_` yet
        self.assertNotIn("_dir1_", host.listdir(host.curdir))
        self.assertNotIn("_dir2_", host.listdir(host.curdir))
        # Part 1: Try to make directories starting from `_dir1_` and
        # change to non-root directory.
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        host.chdir("_dir1_")
        host.makedirs("_dir2_/_dir3_")
        # Test for expected directory hierarchy.
        self.assertTrue(host.path.isdir("/_dir1_"))
        self.assertTrue(host.path.isdir("/_dir1_/_dir2_"))
        self.assertTrue(host.path.isdir("/_dir1_/_dir2_/_dir3_"))
        self.assertFalse(host.path.isdir("/_dir1_/_dir1_"))
        # Remove all but the directory we're in.
        host.rmdir("/_dir1_/_dir2_/_dir3_")
        host.rmdir("/_dir1_/_dir2_")
        # Part 2: Try to make directories starting from root.
        self.cleaner.add_dir("/_dir2_")
        host.makedirs("/_dir2_/_dir3_")
        # Test for expected directory hierarchy
        self.assertTrue(host.path.isdir("/_dir2_"))
        self.assertTrue(host.path.isdir("/_dir2_/_dir3_"))
        self.assertFalse(host.path.isdir("/_dir1_/_dir2_"))

    def test_makedirs_of_existing_directory(self):
        host = self.host
        # The (chrooted) login directory; this must not raise.
        host.makedirs("/")

    def test_makedirs_with_file_in_the_way(self):
        host = self.host
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        self.make_remote_file("_dir1_/file1")
        # Neither the file itself nor a path "below" it can become
        # a directory.
        self.assertRaises(ftputil.error.PermanentError,
                          host.makedirs, "_dir1_/file1")
        self.assertRaises(ftputil.error.PermanentError,
                          host.makedirs, "_dir1_/file1/dir2")

    def test_makedirs_with_existing_directory(self):
        host = self.host
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        host.makedirs("_dir1_/dir2")
        # Check
        self.assertTrue(host.path.isdir("_dir1_"))
        self.assertTrue(host.path.isdir("_dir1_/dir2"))

    def test_makedirs_in_non_writable_directory(self):
        host = self.host
        # Preparation: `rootdir1` exists but is only writable by root.
        self.assertRaises(ftputil.error.PermanentError, host.makedirs,
                          "rootdir1/dir2")

    def test_makedirs_with_writable_directory_at_end(self):
        host = self.host
        self.cleaner.add_dir("rootdir2/dir2")
        # Preparation: `rootdir2` exists but is only writable by root.
        # `dir2` is writable by regular ftp user. These both should work.
        host.makedirs("rootdir2/dir2")
        host.makedirs("rootdir2/dir2/dir3")
240
241
class TestRemoval(RealFTPTest):
    """Tests for `rmtree` and `remove` against a real server."""

    def test_rmtree_without_error_handler(self):
        ftp_host = self.host
        # Set up a small tree below `_dir1_`.
        self.cleaner.add_dir("_dir1_")
        ftp_host.makedirs("_dir1_/dir2")
        for remote_path in ("_dir1_/file1", "_dir1_/file2",
                            "_dir1_/dir2/file3", "_dir1_/dir2/file4"):
            self.make_remote_file(remote_path)
        # `rmtree` must refuse to work on a _file_.
        with self.assertRaises(ftputil.error.PermanentError):
            ftp_host.rmtree("_dir1_/file2")
        # Removing `dir2` must leave its sibling `file2` alone.
        ftp_host.rmtree("_dir1_/dir2")
        self.assertFalse(ftp_host.path.exists("_dir1_/dir2"))
        self.assertTrue(ftp_host.path.exists("_dir1_/file2"))
        # Build `dir2` once more, then remove the whole `_dir1_`.
        ftp_host.mkdir("_dir1_/dir2")
        for remote_path in ("_dir1_/dir2/file3", "_dir1_/dir2/file4"):
            self.make_remote_file(remote_path)
        ftp_host.rmtree("_dir1_")
        self.assertFalse(ftp_host.path.exists("_dir1_"))

    def test_rmtree_with_error_handler(self):
        ftp_host = self.host
        self.cleaner.add_dir("_dir1_")
        ftp_host.mkdir("_dir1_")
        self.make_remote_file("_dir1_/file1")
        # The "handler" only records the arguments it's called with.
        recorded_calls = []
        def record_error(*args):
            recorded_calls.append(args)
        # Use a file as root "directory". With `ignore_errors` set,
        # the handler must not be called at all.
        ftp_host.rmtree("_dir1_/file1", ignore_errors=True,
                        onerror=record_error)
        self.assertEqual(recorded_calls, [])
        ftp_host.rmtree("_dir1_/file1", ignore_errors=False,
                        onerror=record_error)
        failed_calls = (ftp_host.listdir, ftp_host.rmdir)
        for index, failed_call in enumerate(failed_calls):
            self.assertEqual(recorded_calls[index][0], failed_call)
            self.assertEqual(recorded_calls[index][1], "_dir1_/file1")
        ftp_host.rmtree("_dir1_")
        # Now `_dir1_` doesn't exist anymore; try to remove it again.
        recorded_calls[:] = []
        ftp_host.rmtree("_dir1_", ignore_errors=False, onerror=record_error)
        failed_calls = (ftp_host.listdir, ftp_host.rmdir)
        for index, failed_call in enumerate(failed_calls):
            self.assertEqual(recorded_calls[index][0], failed_call)
            self.assertEqual(recorded_calls[index][1], "_dir1_")

    def test_remove_non_existent_item(self):
        with self.assertRaises(ftputil.error.PermanentError):
            self.host.remove("nonexistent")

    def test_remove_existing_file(self):
        file_name = "_testfile_"
        self.cleaner.add_file(file_name)
        self.make_remote_file(file_name)
        remote_path = self.host.path
        self.assertTrue(remote_path.isfile(file_name))
        self.host.remove(file_name)
        self.assertFalse(remote_path.exists(file_name))
305
306
class TestWalk(RealFTPTest):
    """
    Tests for `FTPHost.walk`. They rely on a prepared directory tree
    `walk_test` on the server.
    """

    def _check_walk(self, expected, **walk_options):
        # Collect everything `walk` yields for `walk_test` and compare
        # it item by item with `expected`.
        actual = list(self.host.walk("walk_test", **walk_options))
        self.assertEqual(len(actual), len(expected))
        for actual_item, expected_item in zip(actual, expected):
            self.assertEqual(actual_item, expected_item)

    def test_walk_topdown(self):
        # Parents are expected before their children.
        self._check_walk([
          ("walk_test", ["dir1", "dir2", "dir3"], ["file4"]),
          ("walk_test/dir1", ["dir11", "dir12"], []),
          ("walk_test/dir1/dir11", [], []),
          ("walk_test/dir1/dir12", ["dir123"], ["file121", "file122"]),
          ("walk_test/dir1/dir12/dir123", [], ["file1234"]),
          ("walk_test/dir2", [], []),
          ("walk_test/dir3", ["dir33"], ["file31", "file32"]),
          ("walk_test/dir3/dir33", [], []),
          ])

    def test_walk_depth_first(self):
        # Children are expected before their parents.
        self._check_walk([
          ("walk_test/dir1/dir11", [], []),
          ("walk_test/dir1/dir12/dir123", [], ["file1234"]),
          ("walk_test/dir1/dir12", ["dir123"], ["file121", "file122"]),
          ("walk_test/dir1", ["dir11", "dir12"], []),
          ("walk_test/dir2", [], []),
          ("walk_test/dir3/dir33", [], []),
          ("walk_test/dir3", ["dir33"], ["file31", "file32"]),
          ("walk_test", ["dir1", "dir2", "dir3"], ["file4"]),
          ], topdown=False)
398
399
class TestRename(RealFTPTest):
    """Tests for `FTPHost.rename` against a real server."""

    def test_rename(self):
        old_name = "_testfile1_"
        new_name = "_testfile2_"
        # The file will exist under its new name after the test, so
        # schedule that name for removal, too.
        self.cleaner.add_file(new_name)
        self.make_remote_file(old_name)
        self.host.rename(old_name, new_name)
        remote_path = self.host.path
        self.assertFalse(remote_path.exists(old_name))
        self.assertTrue(remote_path.exists(new_name))

    def test_rename_with_spaces_in_directory(self):
        dir_name = "_dir with spaces_"
        old_name = dir_name + "/testfile1"
        new_name = dir_name + "/testfile2"
        self.cleaner.add_dir(dir_name)
        self.host.mkdir(dir_name)
        self.make_remote_file(old_name)
        self.host.rename(old_name, new_name)
        remote_path = self.host.path
        self.assertFalse(remote_path.exists(old_name))
        self.assertTrue(remote_path.exists(new_name))
420
421
class TestStat(RealFTPTest):
    """Tests for `stat`, `lstat`, the `path.is*` helpers and the stat cache."""

    def test_stat(self):
        host = self.host
        dir_name = "_testdir_"
        file_name = host.path.join(dir_name, "_nonempty_")
        # Make a directory and a file in it.
        self.cleaner.add_dir(dir_name)
        host.mkdir(dir_name)
        with host.open(file_name, "wb") as fobj:
            fobj.write(b"abc\x12\x34def\t")
        # Do some stats
        # - dir
        dir_stat = host.stat(dir_name)
        self.assertIsInstance(dir_stat._st_name, ftputil.compat.unicode_type)
        self.assertEqual(host.listdir(dir_name), ["_nonempty_"])
        self.assertTrue(host.path.isdir(dir_name))
        self.assertFalse(host.path.isfile(dir_name))
        self.assertFalse(host.path.islink(dir_name))
        # - file
        file_stat = host.stat(file_name)
        self.assertIsInstance(file_stat._st_name, ftputil.compat.unicode_type)
        self.assertFalse(host.path.isdir(file_name))
        self.assertTrue(host.path.isfile(file_name))
        self.assertFalse(host.path.islink(file_name))
        # The file content written above is nine bytes long.
        self.assertEqual(host.path.getsize(file_name), 9)
        # - file's modification time; allow up to two minutes difference
        host.synchronize_times()
        server_mtime = host.path.getmtime(file_name)
        client_mtime = time.mktime(time.localtime())
        calculated_time_shift = server_mtime - client_mtime
        self.assertLessEqual(abs(calculated_time_shift - host.time_shift()),
                             120)

    def test_issomething_for_nonexistent_directory(self):
        host = self.host
        # Check if we get the right results if even the containing
        # directory doesn't exist (see ticket #66).
        nonexistent_path = "/nonexistent/nonexistent"
        self.assertFalse(host.path.isdir(nonexistent_path))
        self.assertFalse(host.path.isfile(nonexistent_path))
        self.assertFalse(host.path.islink(nonexistent_path))

    def test_special_broken_link(self):
        # Test for ticket #39.
        host = self.host
        # This is a path on the _server_, so it has to be joined with
        # `host.path.join`. `os.path.join` would use the path
        # separator of the client, i. e. a backslash on Windows.
        broken_link_name = host.path.join("dir_with_broken_link",
                                          "nonexistent")
        self.assertEqual(host.lstat(broken_link_name)._st_target,
                         "../nonexistent/nonexistent")
        self.assertFalse(host.path.isdir(broken_link_name))
        self.assertFalse(host.path.isfile(broken_link_name))
        self.assertTrue(host.path.islink(broken_link_name))

    def test_concurrent_access(self):
        self.make_remote_file("_testfile_")
        with ftputil.FTPHost(*self.login_data) as host1:
            with ftputil.FTPHost(*self.login_data) as host2:
                stat_result1 = host1.stat("_testfile_")
                stat_result2 = host2.stat("_testfile_")
                self.assertEqual(stat_result1, stat_result2)
                host2.remove("_testfile_")
                # Can still get the result via `host1`
                stat_result1 = host1.stat("_testfile_")
                self.assertEqual(stat_result1, stat_result2)
                # Stat'ing on `host2` gives an exception.
                self.assertRaises(ftputil.error.PermanentError,
                                  host2.stat, "_testfile_")
                # Stat'ing on `host1` after invalidation
                absolute_path = host1.path.join(host1.getcwd(), "_testfile_")
                host1.stat_cache.invalidate(absolute_path)
                self.assertRaises(ftputil.error.PermanentError,
                                  host1.stat, "_testfile_")

    def test_cache_auto_resizing(self):
        """Test if the cache is resized appropriately."""
        host = self.host
        cache = host.stat_cache._cache
        # Make sure the cache size isn't adjusted towards smaller values.
        # The listing itself isn't needed, only the side effect on
        # the cache.
        host.listdir("walk_test")
        self.assertEqual(cache.size,
                         ftputil.stat_cache.StatCache._DEFAULT_CACHE_SIZE)
        # Make the cache very small initially and see if it gets resized.
        cache.size = 2
        entries = host.listdir("walk_test")
        # The adjusted cache size should be larger or equal to the
        # number of items in `walk_test` and its parent directory. The
        # latter is read implicitly upon `listdir`'s `isdir` call.
        expected_min_cache_size = max(len(host.listdir(host.curdir)),
                                      len(entries))
        self.assertGreaterEqual(cache.size, expected_min_cache_size)
513
514
class TestUploadAndDownload(RealFTPTest):
    """Test upload and download (including time shift test)."""

    def _remove_local_file(self, path):
        # Delete the local file `path` if it exists. A missing file
        # is not an error here: a test may fail before the file was
        # created, and raising in a `finally` clause would hide the
        # original exception.
        if os.path.exists(path):
            os.unlink(path)

    def test_time_shift(self):
        self.host.synchronize_times()
        self.assertEqual(self.host.time_shift(), EXPECTED_TIME_SHIFT)

    @test.skip_long_running_test
    def test_upload(self):
        host = self.host
        host.synchronize_times()
        local_file = "_local_file_"
        remote_file = "_remote_file_"
        try:
            # Make local file to upload. This happens inside the `try`
            # so the file is removed even if the wait is interrupted.
            self.make_local_file()
            # Wait, else small time differences between client and server
            # actually could trigger the update.
            time.sleep(65)
            self.cleaner.add_file(remote_file)
            host.upload(local_file, remote_file)
            # Retry; shouldn't be uploaded
            uploaded = host.upload_if_newer(local_file, remote_file)
            self.assertEqual(uploaded, False)
            # Rewrite the local file.
            self.make_local_file()
            # Retry; should be uploaded now
            uploaded = host.upload_if_newer(local_file, remote_file)
            self.assertEqual(uploaded, True)
        finally:
            # Clean up
            self._remove_local_file(local_file)

    @test.skip_long_running_test
    def test_download(self):
        host = self.host
        host.synchronize_times()
        local_file = "_local_file_"
        remote_file = "_remote_file_"
        # Make a remote file.
        self.make_remote_file(remote_file)
        try:
            # File should be downloaded as it's not present yet. The
            # download is inside the `try` so the local file is also
            # removed if the following assertion fails.
            downloaded = host.download_if_newer(remote_file, local_file)
            self.assertEqual(downloaded, True)
            # If the remote file, taking the datetime precision into
            # account, _might_ be newer, the file will be downloaded
            # again. To prevent this, wait a bit over a minute (the
            # remote precision), then "touch" the local file.
            time.sleep(65)
            # Create empty file.
            with open(local_file, "w"):
                pass
            # Local file is present and newer, so shouldn't download.
            downloaded = host.download_if_newer(remote_file, local_file)
            self.assertEqual(downloaded, False)
            # Re-make the remote file.
            self.make_remote_file(remote_file)
            # Local file is present but possibly older (taking the
            # possible deviation because of the precision into account),
            # so should download.
            downloaded = host.download_if_newer(remote_file, local_file)
            self.assertEqual(downloaded, True)
        finally:
            # Clean up.
            self._remove_local_file(local_file)

    def test_callback_with_transfer(self):
        host = self.host
        FILE_NAME = "debian-keyring.tar.gz"
        # Default chunk size as in `FTPHost.copyfileobj`
        MAX_COPY_CHUNK_SIZE = ftputil.file_transfer.MAX_COPY_CHUNK_SIZE
        file_size = host.path.getsize(FILE_NAME)
        # Number of chunks, rounded up. Adding 1 unconditionally to the
        # integer quotient would count one chunk too many if the file
        # size is an exact multiple of the chunk size (or zero).
        chunk_count = ((file_size + MAX_COPY_CHUNK_SIZE - 1) //
                       MAX_COPY_CHUNK_SIZE)
        # Define a callback that just collects all data passed to it.
        transferred_chunks_list = []
        def test_callback(chunk):
            transferred_chunks_list.append(chunk)
        try:
            host.download(FILE_NAME, FILE_NAME, callback=test_callback)
            # Construct a list of data chunks we expect.
            expected_chunks_list = []
            with open(FILE_NAME, "rb") as downloaded_fobj:
                while True:
                    chunk = downloaded_fobj.read(MAX_COPY_CHUNK_SIZE)
                    if not chunk:
                        break
                    expected_chunks_list.append(chunk)
            # Examine data collected by callback function.
            self.assertEqual(len(transferred_chunks_list), chunk_count)
            self.assertEqual(transferred_chunks_list, expected_chunks_list)
        finally:
            self._remove_local_file(FILE_NAME)
610
611
class TestFTPFiles(RealFTPTest):
    """Tests for the handling of child sessions used by remote files."""

    def test_only_closed_children(self):
        remote_name = "debian-keyring.tar.gz"
        ftp_host = self.host
        with ftp_host.open(remote_name, "rb") as still_open_file:
            # Open the file a second time and close it right away.
            with ftp_host.open(remote_name, "rb") as closed_file:
                pass
            # The first child is still in use whereas the second is
            # closed. So the second child is the one to be re-used.
            with ftp_host.open(remote_name, "rb") as reopened_file:
                children = ftp_host._children
                self.assertEqual(len(children), 2)
                self.assertTrue(reopened_file._host is children[1])

    def test_no_timed_out_children(self):
        remote_name = "debian-keyring.tar.gz"
        ftp_host = self.host
        # Opening a file implicitly creates a child host object.
        with ftp_host.open(remote_name, "rb") as first_file:
            pass
        # Simulate an FTP server timeout for the session of that
        # child by monkey-patching its `pwd` method.
        def simulated_pwd():
            raise ftplib.error_temp("simulated timeout")
        first_file._host._session.pwd = simulated_pwd
        # The next file we get shouldn't be the timed-out one.
        with ftp_host.open(remote_name, "rb") as second_file:
            self.assertTrue(first_file is not second_file)
        # Re-use closed and not timed-out child session.
        with ftp_host.open(remote_name, "rb") as third_file:
            pass
        self.assertTrue(second_file is third_file)
644
645
class TestChmod(RealFTPTest):
    """Tests for `FTPHost.chmod` against a real server."""

    def assert_mode(self, path, expected_mode):
        """
        Assert that the permission bits of the remote item `path`
        equal `expected_mode`.

        Only the bits that can be set with a mode change command are
        compared; the remaining bits of `st_mode` are ignored.

        The `FTPHost` object to test against is `self.host`.
        """
        full_mode = self.host.stat(path).st_mode
        # Remove flags we can't set via `chmod`. `stat.S_IMODE` keeps
        # the set-uid, set-gid and sticky bits plus the read, write
        # and execute bits for user, group and others, i. e. it
        # calculates `full_mode & 0o7777`.
        mode = stat.S_IMODE(full_mode)
        self.assertEqual(mode, expected_mode,
                         "mode {0:o} != {1:o}".format(mode, expected_mode))

    def test_chmod_existing_directory(self):
        host = self.host
        host.mkdir("_test dir_")
        self.cleaner.add_dir("_test dir_")
        # Set/get mode of a directory.
        host.chmod("_test dir_", 0o757)
        self.assert_mode("_test dir_", 0o757)
        # Set/get mode in nested directory.
        host.mkdir("_test dir_/nested_dir")
        self.cleaner.add_dir("_test dir_/nested_dir")
        host.chmod("_test dir_/nested_dir", 0o757)
        self.assert_mode("_test dir_/nested_dir", 0o757)

    def test_chmod_existing_file(self):
        host = self.host
        host.mkdir("_test dir_")
        self.cleaner.add_dir("_test dir_")
        # Set/get mode on a file.
        file_name = host.path.join("_test dir_", "_testfile_")
        self.make_remote_file(file_name)
        host.chmod(file_name, 0o646)
        self.assert_mode(file_name, 0o646)

    def test_chmod_nonexistent_path(self):
        # Set/get mode of a non-existing item.
        self.assertRaises(ftputil.error.PermanentError, self.host.chmod,
                          "nonexistent", 0o757)

    def test_cache_invalidation(self):
        host = self.host
        host.mkdir("_test dir_")
        self.cleaner.add_dir("_test dir_")
        # Make sure the mode is in the cache. The stat result itself
        # isn't needed.
        host.stat("_test dir_")
        # Set/get mode of the directory.
        host.chmod("_test dir_", 0o757)
        self.assert_mode("_test dir_", 0o757)
        # Set/get mode on a file.
        file_name = host.path.join("_test dir_", "_testfile_")
        self.make_remote_file(file_name)
        # Make sure the mode is in the cache.
        host.stat(file_name)
        host.chmod(file_name, 0o646)
        self.assert_mode(file_name, 0o646)
713
714
715class TestOther(RealFTPTest):
716
717    def test_open_for_reading(self):
718        # Test for issues #17 and #51,
719        # http://ftputil.sschwarzer.net/trac/ticket/17 and
720        # http://ftputil.sschwarzer.net/trac/ticket/51 .
721        file1 = self.host.open("debian-keyring.tar.gz", "rb")
722        time.sleep(1)
723        # Depending on the FTP server, this might return a status code
724        # unexpected by `ftplib` or block the socket connection until
725        # a server-side timeout.
726        file1.close()
727
728    def test_subsequent_reading(self):
729        # Opening a file for reading
730        with self.host.open("debian-keyring.tar.gz", "rb") as file1:
731            pass
732        # Make sure that there are no problems if the connection is reused.
733        with self.host.open("debian-keyring.tar.gz", "rb") as file2:
734            pass
735        self.assertTrue(file1._session is file2._session)
736
737    def test_names_with_spaces(self):
738        # Test if directories and files with spaces in their names
739        # can be used.
740        host = self.host
741        self.assertTrue(host.path.isdir("dir with spaces"))
742        self.assertEqual(host.listdir("dir with spaces"),
743                         ["second dir", "some file", "some_file"])
744        self.assertTrue(host.path.isdir ("dir with spaces/second dir"))
745        self.assertTrue(host.path.isfile("dir with spaces/some_file"))
746        self.assertTrue(host.path.isfile("dir with spaces/some file"))
747
748    def test_synchronize_times_without_write_access(self):
749        """Test failing synchronization because of non-writable directory."""
750        host = self.host
751        # This isn't writable by the ftp account the tests are run under.
752        host.chdir("rootdir1")
753        self.assertRaises(ftputil.error.TimeShiftError, host.synchronize_times)
754
755    def test_bytes_file_name(self):
756        """
757        Test whether a UTF-8 file name can be sent and retrieved when
758        encoded explicitly.
759        """
760        # This test below would fail under Python 2 and I have no idea
761        # how to make it work there without modifying `ftplib`.
762        if ftputil.compat.python_version == 2:
763            return
764        #
765        host = self.host
766        # This requires an existing file with an UTF-8 encoded name on
767        # the remote file system. Note: If the remote file system
768        # doesn't use UTF-8, the test will probably succeed anyway.
769        FILE_NAME = "_ütf8_file_näme_♯♭_"
770        bytes_file_name = FILE_NAME.encode("UTF-8")
771        self.cleaner.add_file(bytes_file_name)
772        # Write under name `bytes_file_name`
773        with host.open(bytes_file_name, "w", encoding="UTF-8") as fobj:
774            fobj.write(FILE_NAME)
775        # Try to retrieve file with `listdir`.
776        items = host.listdir(b".")
777        self.assertTrue(bytes_file_name in items)
778        #  When getting a directory listing for a unicode directory,
779        #  ftputil will implicitly assume the encoding is latin-1 and
780        #  won't decode the file name to something different from
781        #  `bytes_file_name`.
782        items = host.listdir(".")
783        self.assertFalse(FILE_NAME in items)
784        # Re-open file.
785        with host.open(bytes_file_name, "r", encoding="UTF-8") as fobj:
786            data = fobj.read()
787        # Not the point of this test, but an additional sanity check
788        self.assertEqual(data, FILE_NAME)
789
790    def test_list_a_option(self):
791        # For this test to pass, the server must _not_ list "hidden"
792        # files by default but instead only when the `LIST` `-a`
793        # option is used.
794        host = self.host
795        self.assertTrue(host.use_list_a_option)
796        directory_entries = host.listdir(host.curdir)
797        self.assertTrue(".hidden" in directory_entries)
798        host.use_list_a_option = False
799        directory_entries = host.listdir(host.curdir)
800        self.assertFalse(".hidden" in directory_entries)
801
802    def _make_objects_to_be_garbage_collected(self):
803        for _ in range(10):
804            with ftputil.FTPHost(*self.login_data) as host:
805                for _ in range(10):
806                    unused_stat_result = host.stat("CONTENTS")
807                    with host.open("CONTENTS") as fobj:
808                        unused_data = fobj.read()
809
810    def test_garbage_collection(self):
811        """Test whether there are cycles which prevent garbage collection."""
812        gc.collect()
813        objects_before_test = len(gc.garbage)
814        self._make_objects_to_be_garbage_collected()
815        gc.collect()
816        objects_after_test = len(gc.garbage)
817        self.assertFalse(objects_after_test - objects_before_test)
818
819    @unittest.skipIf(ftputil.compat.python_version > 2,
820                     "test requires M2Crypto which only works on Python 2")
821    def test_m2crypto_session(self):
822        """
823        Test if a session with `M2Crypto.ftpslib.FTP_TLS` is set up
824        correctly and works with unicode input.
825        """
826        # See ticket #78.
827        #
828        # M2Crypto is only available for Python 2.
829        import M2Crypto
830        factory = ftputil.session.session_factory(
831                    base_class=M2Crypto.ftpslib.FTP_TLS,
832                    encrypt_data_channel=True)
833        with ftputil.FTPHost(*self.login_data, session_factory=factory) as host:
834            # Test if unicode argument works.
835            files = host.listdir(".")
836        self.assertTrue("CONTENTS" in files)
837
838
839
if __name__ == "__main__":
    # Tell the user what the tests need and that they may take a while.
    # Keep a single parenthesized argument so that this also works as
    # a `print` statement under Python 2.
    print("Test real FTP access.\n"
          "\n"
          "This test writes some files and directories on the local client and\n"
          "the remote server. You'll need write access in the login directory.\n"
          "This test can take a few minutes because it has to wait to test the\n"
          "timezone calculation.\n"
          "    ")
    unittest.main()
Note: See TracBrowser for help on using the repository browser.