source: test/test_real_ftp.py @ 1506:f40b9f6738d0

Last change on this file since 1506:f40b9f6738d0 was 1506:f40b9f6738d0, checked in by Stefan Schwarzer <sschwarzer@…>, 7 years ago
Added support for `followlinks` in `FTPHost.walk` (ticket #73).
File size: 33.0 KB
Line 
1# encoding: UTF-8
2
3# Copyright (C) 2003-2013, Stefan Schwarzer <sschwarzer@sschwarzer.net>
4# See the file LICENSE for licensing terms.
5
6# Execute tests on a real FTP server (other tests use a mock server).
7
8from __future__ import absolute_import
9from __future__ import unicode_literals
10
11import ftplib
12import functools
13import gc
14import operator
15import os
16import time
17import unittest
18import stat
19
20import ftputil.compat
21import ftputil.error
22import ftputil.file_transfer
23import ftputil.session
24import ftputil.stat_cache
25
26import test
27
28
29def utc_local_time_shift():
30    """
31    Return the expected time shift in seconds assuming the server
32    uses UTC in its listings and the client uses local time.
33
34    This is needed because Pure-FTPd meanwhile seems to insist that
35    the displayed time for files is in UTC.
36    """
37    utc_tuple = time.gmtime()
38    localtime_tuple = time.localtime()
39    # To calculate the correct times shift, we need to ignore the
40    # DST component in the localtime tuple, i. e. set it to 0.
41    localtime_tuple = localtime_tuple[:-1] + (0,)
42    time_shift_in_seconds = (time.mktime(utc_tuple) -
43                             time.mktime(localtime_tuple))
44    # To be safe, round the above value to units of 3600 s (1 hour).
45    return round(time_shift_in_seconds / 3600.0) * 3600
46
47# Difference between local times of server and client. If 0.0, server
48# and client use the same timezone.
49#EXPECTED_TIME_SHIFT = utc_local_time_shift()
50# Pure-FTPd seems to have changed its mind (see docstring of
51# `utc_local_time_shift`).
52EXPECTED_TIME_SHIFT = 0.0
53
54
55class Cleaner(object):
56    """
57    This class helps remove directories and files which might
58    otherwise be left behind if a test fails in unexpected ways.
59    """
60
61    def __init__(self, host):
62        # The test class (probably `RealFTPTest`) and the helper
63        # class share the same `FTPHost` object.
64        self._host = host
65        self._ftp_items = []
66
67    def add_dir(self, path):
68        """Schedule a directory with path `path` for removal."""
69        self._ftp_items.append(("d", self._host.path.abspath(path)))
70
71    def add_file(self, path):
72        """Schedule a file with path `path` for removal."""
73        self._ftp_items.append(("f", self._host.path.abspath(path)))
74
75    def clean(self):
76        """
77        Remove the directories and files previously remembered.
78        The removal works in reverse order of the scheduling with
79        `add_dir` and `add_file`.
80
81        Errors due to a removal are ignored.
82        """
83        self._host.chdir("/")
84        for type_, path in reversed(self._ftp_items):
85            try:
86                if type_ == "d":
87                    # If something goes wrong in `rmtree` we might
88                    # leave a mess behind.
89                    self._host.rmtree(path)
90                elif type_ == "f":
91                    # Minor mess if `remove` fails
92                    self._host.remove(path)
93            except ftputil.error.FTPError:
94                pass
95
96
97class RealFTPTest(unittest.TestCase):
98
99    def setUp(self):
100        # Server, username, password.
101        self.login_data = ("localhost", "ftptest",
102                            "d605581757de5eb56d568a4419f4126e")
103        self.host = ftputil.FTPHost(*self.login_data)
104        self.cleaner = Cleaner(self.host)
105
106    def tearDown(self):
107        self.cleaner.clean()
108        self.host.close()
109
110    #
111    # Helper methods
112    #
113    def make_remote_file(self, path):
114        """Create a file on the FTP host."""
115        self.cleaner.add_file(path)
116        with self.host.open(path, "wb") as file_:
117            # Write something. Otherwise the FTP server might not update
118            # the time of last modification if the file existed before.
119            file_.write(b"\n")
120
121    def make_local_file(self):
122        """Create a file on the local host (= on the client side)."""
123        with open("_local_file_", "wb") as fobj:
124            fobj.write(b"abc\x12\x34def\t")
125
126
127class TestMkdir(RealFTPTest):
128
129    def test_mkdir_rmdir(self):
130        host = self.host
131        dir_name = "_testdir_"
132        file_name = host.path.join(dir_name, "_nonempty_")
133        self.cleaner.add_dir(dir_name)
134        # Make dir and check if the directory is there.
135        host.mkdir(dir_name)
136        files = host.listdir(host.curdir)
137        self.assertTrue(dir_name in files)
138        # Try to remove a non-empty directory.
139        self.cleaner.add_file(file_name)
140        non_empty = host.open(file_name, "w")
141        non_empty.close()
142        self.assertRaises(ftputil.error.PermanentError, host.rmdir, dir_name)
143        # Remove file.
144        host.unlink(file_name)
145        # `remove` on a directory should fail.
146        try:
147            try:
148                host.remove(dir_name)
149            except ftputil.error.PermanentError as exc:
150                self.assertTrue(str(exc).startswith(
151                                "remove/unlink can only delete files"))
152            else:
153                self.assertTrue(False, "we shouldn't have come here")
154        finally:
155            # Delete empty directory.
156            host.rmdir(dir_name)
157        files = host.listdir(host.curdir)
158        self.assertTrue(dir_name not in files)
159
160    def test_makedirs_without_existing_dirs(self):
161        host = self.host
162        # No `_dir1_` yet
163        self.assertFalse("_dir1_" in host.listdir(host.curdir))
164        # Vanilla case, all should go well.
165        host.makedirs("_dir1_/dir2/dir3/dir4")
166        self.cleaner.add_dir("_dir1_")
167        # Check host.
168        self.assertTrue(host.path.isdir("_dir1_"))
169        self.assertTrue(host.path.isdir("_dir1_/dir2"))
170        self.assertTrue(host.path.isdir("_dir1_/dir2/dir3"))
171        self.assertTrue(host.path.isdir("_dir1_/dir2/dir3/dir4"))
172
173    def test_makedirs_from_non_root_directory(self):
174        # This is a testcase for issue #22, see
175        # http://ftputil.sschwarzer.net/trac/ticket/22 .
176        host = self.host
177        # No `_dir1_` and `_dir2_` yet
178        self.assertFalse("_dir1_" in host.listdir(host.curdir))
179        self.assertFalse("_dir2_" in host.listdir(host.curdir))
180        # Part 1: Try to make directories starting from `_dir1_` and
181        # change to non-root directory.
182        self.cleaner.add_dir("_dir1_")
183        host.mkdir("_dir1_")
184        host.chdir("_dir1_")
185        host.makedirs("_dir2_/_dir3_")
186        # Test for expected directory hierarchy.
187        self.assertTrue (host.path.isdir("/_dir1_"))
188        self.assertTrue (host.path.isdir("/_dir1_/_dir2_"))
189        self.assertTrue (host.path.isdir("/_dir1_/_dir2_/_dir3_"))
190        self.assertFalse(host.path.isdir("/_dir1_/_dir1_"))
191        # Remove all but the directory we're in.
192        host.rmdir("/_dir1_/_dir2_/_dir3_")
193        host.rmdir("/_dir1_/_dir2_")
194        # Part 2: Try to make directories starting from root.
195        self.cleaner.add_dir("/_dir2_")
196        host.makedirs("/_dir2_/_dir3_")
197        # Test for expected directory hierarchy
198        self.assertTrue (host.path.isdir("/_dir2_"))
199        self.assertTrue (host.path.isdir("/_dir2_/_dir3_"))
200        self.assertFalse(host.path.isdir("/_dir1_/_dir2_"))
201
202    def test_makedirs_of_existing_directory(self):
203        host = self.host
204        # The (chrooted) login directory
205        host.makedirs("/")
206
207    def test_makedirs_with_file_in_the_way(self):
208        host = self.host
209        self.cleaner.add_dir("_dir1_")
210        host.mkdir("_dir1_")
211        self.make_remote_file("_dir1_/file1")
212        # Try it.
213        self.assertRaises(ftputil.error.PermanentError,
214                          host.makedirs, "_dir1_/file1")
215        self.assertRaises(ftputil.error.PermanentError,
216                          host.makedirs, "_dir1_/file1/dir2")
217
218    def test_makedirs_with_existing_directory(self):
219        host = self.host
220        self.cleaner.add_dir("_dir1_")
221        host.mkdir("_dir1_")
222        host.makedirs("_dir1_/dir2")
223        # Check
224        self.assertTrue(host.path.isdir("_dir1_"))
225        self.assertTrue(host.path.isdir("_dir1_/dir2"))
226
227    def test_makedirs_in_non_writable_directory(self):
228        host = self.host
229        # Preparation: `rootdir1` exists but is only writable by root.
230        self.assertRaises(ftputil.error.PermanentError, host.makedirs,
231                          "rootdir1/dir2")
232
233    def test_makedirs_with_writable_directory_at_end(self):
234        host = self.host
235        self.cleaner.add_dir("rootdir2/dir2")
236        # Preparation: `rootdir2` exists but is only writable by root.
237        # `dir2` is writable by regular ftp user. These both should work.
238        host.makedirs("rootdir2/dir2")
239        host.makedirs("rootdir2/dir2/dir3")
240
241
242class TestRemoval(RealFTPTest):
243
244    def test_rmtree_without_error_handler(self):
245        host = self.host
246        # Build a tree.
247        self.cleaner.add_dir("_dir1_")
248        host.makedirs("_dir1_/dir2")
249        self.make_remote_file("_dir1_/file1")
250        self.make_remote_file("_dir1_/file2")
251        self.make_remote_file("_dir1_/dir2/file3")
252        self.make_remote_file("_dir1_/dir2/file4")
253        # Try to remove a _file_ with `rmtree`.
254        self.assertRaises(ftputil.error.PermanentError, host.rmtree,
255                          "_dir1_/file2")
256        # Remove `dir2`.
257        host.rmtree("_dir1_/dir2")
258        self.assertFalse(host.path.exists("_dir1_/dir2"))
259        self.assertTrue (host.path.exists("_dir1_/file2"))
260        # Re-create `dir2` and remove `_dir1_`.
261        host.mkdir("_dir1_/dir2")
262        self.make_remote_file("_dir1_/dir2/file3")
263        self.make_remote_file("_dir1_/dir2/file4")
264        host.rmtree("_dir1_")
265        self.assertFalse(host.path.exists("_dir1_"))
266
267    def test_rmtree_with_error_handler(self):
268        host = self.host
269        self.cleaner.add_dir("_dir1_")
270        host.mkdir("_dir1_")
271        self.make_remote_file("_dir1_/file1")
272        # Prepare error "handler"
273        log = []
274        def error_handler(*args):
275            log.append(args)
276        # Try to remove a file as root "directory".
277        host.rmtree("_dir1_/file1", ignore_errors=True, onerror=error_handler)
278        self.assertEqual(log, [])
279        host.rmtree("_dir1_/file1", ignore_errors=False, onerror=error_handler)
280        self.assertEqual(log[0][0], host.listdir)
281        self.assertEqual(log[0][1], "_dir1_/file1")
282        self.assertEqual(log[1][0], host.rmdir)
283        self.assertEqual(log[1][1], "_dir1_/file1")
284        host.rmtree("_dir1_")
285        # Try to remove a non-existent directory.
286        del log[:]
287        host.rmtree("_dir1_", ignore_errors=False, onerror=error_handler)
288        self.assertEqual(log[0][0], host.listdir)
289        self.assertEqual(log[0][1], "_dir1_")
290        self.assertEqual(log[1][0], host.rmdir)
291        self.assertEqual(log[1][1], "_dir1_")
292
293    def test_remove_non_existent_item(self):
294        host = self.host
295        self.assertRaises(ftputil.error.PermanentError, host.remove,
296                          "nonexistent")
297
298    def test_remove_existing_file(self):
299        self.cleaner.add_file("_testfile_")
300        self.make_remote_file("_testfile_")
301        host = self.host
302        self.assertTrue(host.path.isfile("_testfile_"))
303        host.remove("_testfile_")
304        self.assertFalse(host.path.exists("_testfile_"))
305
306
307class TestWalk(RealFTPTest):
308
309    def _walk_test(self, expected_result, **walk_kwargs):
310        """Walk the directory and test results."""
311        # Collect data using `walk`.
312        actual_result = []
313        for items in self.host.walk(**walk_kwargs):
314            actual_result.append(items)
315        # Compare with expected results.
316        self.assertEqual(len(actual_result), len(expected_result))
317        for index, _ in enumerate(actual_result):
318            self.assertEqual(actual_result[index], expected_result[index])
319
320    def test_walk_topdown(self):
321        # Preparation: build tree in directory `walk_test`.
322        expected_result = [
323          ("walk_test",
324           ["dir1", "dir2", "dir3"],
325           ["file4"]),
326
327          ("walk_test/dir1",
328           ["dir11", "dir12"],
329           []),
330
331          ("walk_test/dir1/dir11",
332           [],
333           []),
334
335          ("walk_test/dir1/dir12",
336           ["dir123"],
337           ["file121", "file122"]),
338
339          ("walk_test/dir1/dir12/dir123",
340           [],
341           ["file1234"]),
342
343          ("walk_test/dir2",
344           [],
345           []),
346
347          ("walk_test/dir3",
348           ["dir31", "dir32"],
349           ["file31", "file32"]),
350
351          ("walk_test/dir3/dir31",
352           [],
353           []),
354          ]
355        self._walk_test(expected_result, top="walk_test")
356
357    def test_walk_depth_first(self):
358        # Preparation: build tree in directory `walk_test`
359        expected_result = [
360          ("walk_test/dir1/dir11",
361           [],
362           []),
363
364          ("walk_test/dir1/dir12/dir123",
365           [],
366           ["file1234"]),
367
368          ("walk_test/dir1/dir12",
369           ["dir123"],
370           ["file121", "file122"]),
371
372          ("walk_test/dir1",
373           ["dir11", "dir12"],
374           []),
375
376          ("walk_test/dir2",
377           [],
378           []),
379
380          ("walk_test/dir3/dir31",
381           [],
382           []),
383
384          ("walk_test/dir3",
385           ["dir31", "dir32"],
386           ["file31", "file32"]),
387
388          ("walk_test",
389           ["dir1", "dir2", "dir3"],
390           ["file4"])
391          ]
392        self._walk_test(expected_result, top="walk_test", topdown=False)
393
394    def test_walk_following_links(self):
395        # Preparation: build tree in directory `walk_test`.
396        expected_result = [
397          ("walk_test",
398           ["dir1", "dir2", "dir3"],
399           ["file4"]),
400
401          ("walk_test/dir1",
402           ["dir11", "dir12"],
403           []),
404
405          ("walk_test/dir1/dir11",
406           [],
407           []),
408
409          ("walk_test/dir1/dir12",
410           ["dir123"],
411           ["file121", "file122"]),
412
413          ("walk_test/dir1/dir12/dir123",
414           [],
415           ["file1234"]),
416
417          ("walk_test/dir2",
418           [],
419           []),
420
421          ("walk_test/dir3",
422           ["dir31", "dir32"],
423           ["file31", "file32"]),
424
425          ("walk_test/dir3/dir31",
426           [],
427           []),
428
429          ("walk_test/dir3/dir32",
430           [],
431           ["file1234"]),
432          ]
433        self._walk_test(expected_result, top="walk_test", followlinks=True)
434
435
436class TestRename(RealFTPTest):
437
438    def test_rename(self):
439        host = self.host
440        # Make sure the target of the renaming operation is removed.
441        self.cleaner.add_file("_testfile2_")
442        self.make_remote_file("_testfile1_")
443        host.rename("_testfile1_", "_testfile2_")
444        self.assertFalse(host.path.exists("_testfile1_"))
445        self.assertTrue (host.path.exists("_testfile2_"))
446
447    def test_rename_with_spaces_in_directory(self):
448        host = self.host
449        dir_name = "_dir with spaces_"
450        self.cleaner.add_dir(dir_name)
451        host.mkdir(dir_name)
452        self.make_remote_file(dir_name + "/testfile1")
453        host.rename(dir_name + "/testfile1", dir_name + "/testfile2")
454        self.assertFalse(host.path.exists(dir_name + "/testfile1"))
455        self.assertTrue (host.path.exists(dir_name + "/testfile2"))
456
457
458class TestStat(RealFTPTest):
459
460    def test_stat(self):
461        host = self.host
462        dir_name = "_testdir_"
463        file_name = host.path.join(dir_name, "_nonempty_")
464        # Make a directory and a file in it.
465        self.cleaner.add_dir(dir_name)
466        host.mkdir(dir_name)
467        with host.open(file_name, "wb") as fobj:
468            fobj.write(b"abc\x12\x34def\t")
469        # Do some stats
470        # - dir
471        dir_stat = host.stat(dir_name)
472        self.assertTrue(isinstance(dir_stat._st_name,
473                                   ftputil.compat.unicode_type))
474        self.assertEqual(host.listdir(dir_name), ["_nonempty_"])
475        self.assertTrue (host.path.isdir (dir_name))
476        self.assertFalse(host.path.isfile(dir_name))
477        self.assertFalse(host.path.islink(dir_name))
478        # - file
479        file_stat = host.stat(file_name)
480        self.assertTrue(isinstance(file_stat._st_name,
481                                   ftputil.compat.unicode_type))
482        self.assertFalse(host.path.isdir (file_name))
483        self.assertTrue (host.path.isfile(file_name))
484        self.assertFalse(host.path.islink(file_name))
485        self.assertEqual(host.path.getsize(file_name), 9)
486        # - file's modification time; allow up to two minutes difference
487        host.synchronize_times()
488        server_mtime = host.path.getmtime(file_name)
489        client_mtime = time.mktime(time.localtime())
490        calculated_time_shift = server_mtime - client_mtime
491        self.assertFalse(abs(calculated_time_shift-host.time_shift()) > 120)
492
493    def test_issomething_for_nonexistent_directory(self):
494        host = self.host
495        # Check if we get the right results if even the containing
496        # directory doesn't exist (see ticket #66).
497        nonexistent_path = "/nonexistent/nonexistent"
498        self.assertFalse(host.path.isdir (nonexistent_path))
499        self.assertFalse(host.path.isfile(nonexistent_path))
500        self.assertFalse(host.path.islink(nonexistent_path))
501
502    def test_special_broken_link(self):
503        # Test for ticket #39.
504        host = self.host
505        broken_link_name = os.path.join("dir_with_broken_link", "nonexistent")
506        self.assertEqual(host.lstat(broken_link_name)._st_target,
507                         "../nonexistent/nonexistent")
508        self.assertFalse(host.path.isdir (broken_link_name))
509        self.assertFalse(host.path.isfile(broken_link_name))
510        self.assertTrue (host.path.islink(broken_link_name))
511
512    def test_concurrent_access(self):
513        self.make_remote_file("_testfile_")
514        with ftputil.FTPHost(*self.login_data) as host1:
515            with ftputil.FTPHost(*self.login_data) as host2:
516                stat_result1 = host1.stat("_testfile_")
517                stat_result2 = host2.stat("_testfile_")
518                self.assertEqual(stat_result1, stat_result2)
519                host2.remove("_testfile_")
520                # Can still get the result via `host1`
521                stat_result1 = host1.stat("_testfile_")
522                self.assertEqual(stat_result1, stat_result2)
523                # Stat'ing on `host2` gives an exception.
524                self.assertRaises(ftputil.error.PermanentError,
525                                  host2.stat, "_testfile_")
526                # Stat'ing on `host1` after invalidation
527                absolute_path = host1.path.join(host1.getcwd(), "_testfile_")
528                host1.stat_cache.invalidate(absolute_path)
529                self.assertRaises(ftputil.error.PermanentError,
530                                  host1.stat, "_testfile_")
531
532    def test_cache_auto_resizing(self):
533        """Test if the cache is resized appropriately."""
534        host = self.host
535        cache = host.stat_cache._cache
536        # Make sure the cache size isn't adjusted towards smaller values.
537        unused_entries = host.listdir("walk_test")
538        self.assertEqual(cache.size,
539                         ftputil.stat_cache.StatCache._DEFAULT_CACHE_SIZE)
540        # Make the cache very small initially and see if it gets resized.
541        cache.size = 2
542        entries = host.listdir("walk_test")
543        # The adjusted cache size should be larger or equal to the
544        # number of items in `walk_test` and its parent directory. The
545        # latter is read implicitly upon `listdir`'s `isdir` call.
546        expected_min_cache_size = max(len(host.listdir(host.curdir)),
547                                      len(entries))
548        self.assertTrue(cache.size >= expected_min_cache_size)
549
550
551class TestUploadAndDownload(RealFTPTest):
552    """Test upload and download (including time shift test)."""
553
554    def test_time_shift(self):
555        self.host.synchronize_times()
556        self.assertEqual(self.host.time_shift(), EXPECTED_TIME_SHIFT)
557
558    @test.skip_long_running_test
559    def test_upload(self):
560        host = self.host
561        host.synchronize_times()
562        local_file = "_local_file_"
563        remote_file = "_remote_file_"
564        # Make local file to upload.
565        self.make_local_file()
566        # Wait, else small time differences between client and server
567        # actually could trigger the update.
568        time.sleep(65)
569        try:
570            self.cleaner.add_file(remote_file)
571            host.upload(local_file, remote_file)
572            # Retry; shouldn't be uploaded
573            uploaded = host.upload_if_newer(local_file, remote_file)
574            self.assertEqual(uploaded, False)
575            # Rewrite the local file.
576            self.make_local_file()
577            # Retry; should be uploaded now
578            uploaded = host.upload_if_newer(local_file, remote_file)
579            self.assertEqual(uploaded, True)
580        finally:
581            # Clean up
582            os.unlink(local_file)
583
584    @test.skip_long_running_test
585    def test_download(self):
586        host = self.host
587        host.synchronize_times()
588        local_file = "_local_file_"
589        remote_file = "_remote_file_"
590        # Make a remote file.
591        self.make_remote_file(remote_file)
592        # File should be downloaded as it's not present yet.
593        downloaded = host.download_if_newer(remote_file, local_file)
594        self.assertEqual(downloaded, True)
595        try:
596            # If the remote file, taking the datetime precision into
597            # account, _might_ be newer, the file will be downloaded
598            # again. To prevent this, wait a bit over a minute (the
599            # remote precision), then "touch" the local file.
600            time.sleep(65)
601            # Create empty file.
602            with open(local_file, "w") as fobj:
603                pass
604            # Local file is present and newer, so shouldn't download.
605            downloaded = host.download_if_newer(remote_file, local_file)
606            self.assertEqual(downloaded, False)
607            # Re-make the remote file.
608            self.make_remote_file(remote_file)
609            # Local file is present but possibly older (taking the
610            # possible deviation because of the precision into account),
611            # so should download.
612            downloaded = host.download_if_newer(remote_file, local_file)
613            self.assertEqual(downloaded, True)
614        finally:
615            # Clean up.
616            os.unlink(local_file)
617
618    def test_callback_with_transfer(self):
619        host = self.host
620        FILE_NAME = "debian-keyring.tar.gz"
621        # Default chunk size as in `FTPHost.copyfileobj`
622        MAX_COPY_CHUNK_SIZE = ftputil.file_transfer.MAX_COPY_CHUNK_SIZE
623        file_size = host.path.getsize(FILE_NAME)
624        chunk_count, _ = divmod(file_size, MAX_COPY_CHUNK_SIZE)
625        # Add one chunk for remainder.
626        chunk_count += 1
627        # Define a callback that just collects all data passed to it.
628        transferred_chunks_list = []
629        def test_callback(chunk):
630            transferred_chunks_list.append(chunk)
631        try:
632            host.download(FILE_NAME, FILE_NAME, callback=test_callback)
633            # Construct a list of data chunks we expect.
634            expected_chunks_list = []
635            with open(FILE_NAME, "rb") as downloaded_fobj:
636                while True:
637                    chunk = downloaded_fobj.read(MAX_COPY_CHUNK_SIZE)
638                    if not chunk:
639                        break
640                    expected_chunks_list.append(chunk)
641            # Examine data collected by callback function.
642            self.assertEqual(len(transferred_chunks_list), chunk_count)
643            self.assertEqual(transferred_chunks_list, expected_chunks_list)
644        finally:
645            os.unlink(FILE_NAME)
646
647
648class TestFTPFiles(RealFTPTest):
649
650    def test_only_closed_children(self):
651        REMOTE_FILE_NAME = "debian-keyring.tar.gz"
652        host = self.host
653        with host.open(REMOTE_FILE_NAME, "rb") as file_obj1:
654            # Create empty file and close it.
655            with host.open(REMOTE_FILE_NAME, "rb") as file_obj2:
656                pass
657            # This should re-use the second child because the first isn't
658            # closed but the second is.
659            with host.open(REMOTE_FILE_NAME, "rb") as file_obj:
660                self.assertEqual(len(host._children), 2)
661                self.assertTrue(file_obj._host is host._children[1])
662
663    def test_no_timed_out_children(self):
664        REMOTE_FILE_NAME = "debian-keyring.tar.gz"
665        host = self.host
666        # Implicitly create child host object.
667        with host.open(REMOTE_FILE_NAME, "rb") as file_obj1:
668            pass
669        # Monkey-patch file to simulate an FTP server timeout below.
670        def timed_out_pwd():
671            raise ftplib.error_temp("simulated timeout")
672        file_obj1._host._session.pwd = timed_out_pwd
673        # Try to get a file - which shouldn't be the timed-out file.
674        with host.open(REMOTE_FILE_NAME, "rb") as file_obj2:
675            self.assertTrue(file_obj1 is not file_obj2)
676        # Re-use closed and not timed-out child session.
677        with host.open(REMOTE_FILE_NAME, "rb") as file_obj3:
678            pass
679        self.assertTrue(file_obj2 is file_obj3)
680
681
682class TestChmod(RealFTPTest):
683
684    def assert_mode(self, path, expected_mode):
685        """
686        Return an integer containing the allowed bits in the mode
687        change command.
688
689        The `FTPHost` object to test against is `self.host`.
690        """
691        full_mode = self.host.stat(path).st_mode
692        # Remove flags we can't set via `chmod`.
693        # Allowed flags according to Python documentation
694        # http://docs.python.org/lib/os-file-dir.html .
695        allowed_flags = [stat.S_ISUID, stat.S_ISGID, stat.S_ENFMT,
696          stat.S_ISVTX, stat.S_IREAD, stat.S_IWRITE, stat.S_IEXEC,
697          stat.S_IRWXU, stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR,
698          stat.S_IRWXG, stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP,
699          stat.S_IRWXO, stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH]
700        allowed_mask = functools.reduce(operator.or_, allowed_flags)
701        mode = full_mode & allowed_mask
702        self.assertEqual(mode, expected_mode,
703                         "mode {0:o} != {1:o}".format(mode, expected_mode))
704
705    def test_chmod_existing_directory(self):
706        host = self.host
707        host.mkdir("_test dir_")
708        self.cleaner.add_dir("_test dir_")
709        # Set/get mode of a directory.
710        host.chmod("_test dir_", 0o757)
711        self.assert_mode("_test dir_", 0o757)
712        # Set/get mode in nested directory.
713        host.mkdir("_test dir_/nested_dir")
714        self.cleaner.add_dir("_test dir_/nested_dir")
715        host.chmod("_test dir_/nested_dir", 0o757)
716        self.assert_mode("_test dir_/nested_dir", 0o757)
717
718    def test_chmod_existing_file(self):
719        host = self.host
720        host.mkdir("_test dir_")
721        self.cleaner.add_dir("_test dir_")
722        # Set/get mode on a file.
723        file_name = host.path.join("_test dir_", "_testfile_")
724        self.make_remote_file(file_name)
725        host.chmod(file_name, 0o646)
726        self.assert_mode(file_name, 0o646)
727
728    def test_chmod_nonexistent_path(self):
729        # Set/get mode of a non-existing item.
730        self.assertRaises(ftputil.error.PermanentError, self.host.chmod,
731                          "nonexistent", 0o757)
732
733    def test_cache_invalidation(self):
734        host = self.host
735        host.mkdir("_test dir_")
736        self.cleaner.add_dir("_test dir_")
737        # Make sure the mode is in the cache.
738        unused_stat_result = host.stat("_test dir_")
739        # Set/get mode of the directory.
740        host.chmod("_test dir_", 0o757)
741        self.assert_mode("_test dir_", 0o757)
742        # Set/get mode on a file.
743        file_name = host.path.join("_test dir_", "_testfile_")
744        self.make_remote_file(file_name)
745        # Make sure the mode is in the cache.
746        unused_stat_result = host.stat(file_name)
747        host.chmod(file_name, 0o646)
748        self.assert_mode(file_name, 0o646)
749
750
751class TestOther(RealFTPTest):
752
753    def test_open_for_reading(self):
754        # Test for issues #17 and #51,
755        # http://ftputil.sschwarzer.net/trac/ticket/17 and
756        # http://ftputil.sschwarzer.net/trac/ticket/51 .
757        file1 = self.host.open("debian-keyring.tar.gz", "rb")
758        time.sleep(1)
759        # Depending on the FTP server, this might return a status code
760        # unexpected by `ftplib` or block the socket connection until
761        # a server-side timeout.
762        file1.close()
763
764    def test_subsequent_reading(self):
765        # Opening a file for reading
766        with self.host.open("debian-keyring.tar.gz", "rb") as file1:
767            pass
768        # Make sure that there are no problems if the connection is reused.
769        with self.host.open("debian-keyring.tar.gz", "rb") as file2:
770            pass
771        self.assertTrue(file1._session is file2._session)
772
773    def test_names_with_spaces(self):
774        # Test if directories and files with spaces in their names
775        # can be used.
776        host = self.host
777        self.assertTrue(host.path.isdir("dir with spaces"))
778        self.assertEqual(host.listdir("dir with spaces"),
779                         ["second dir", "some file", "some_file"])
780        self.assertTrue(host.path.isdir ("dir with spaces/second dir"))
781        self.assertTrue(host.path.isfile("dir with spaces/some_file"))
782        self.assertTrue(host.path.isfile("dir with spaces/some file"))
783
784    def test_synchronize_times_without_write_access(self):
785        """Test failing synchronization because of non-writable directory."""
786        host = self.host
787        # This isn't writable by the ftp account the tests are run under.
788        host.chdir("rootdir1")
789        self.assertRaises(ftputil.error.TimeShiftError, host.synchronize_times)
790
791    def test_bytes_file_name(self):
792        """
793        Test whether a UTF-8 file name can be sent and retrieved when
794        encoded explicitly.
795        """
796        # This test below would fail under Python 2 and I have no idea
797        # how to make it work there without modifying `ftplib`.
798        if ftputil.compat.python_version == 2:
799            return
800        #
801        host = self.host
802        # This requires an existing file with an UTF-8 encoded name on
803        # the remote file system. Note: If the remote file system
804        # doesn't use UTF-8, the test will probably succeed anyway.
805        FILE_NAME = "_ütf8_file_näme_♯♭_"
806        bytes_file_name = FILE_NAME.encode("UTF-8")
807        self.cleaner.add_file(bytes_file_name)
808        # Write under name `bytes_file_name`
809        with host.open(bytes_file_name, "w", encoding="UTF-8") as fobj:
810            fobj.write(FILE_NAME)
811        # Try to retrieve file with `listdir`.
812        items = host.listdir(b".")
813        self.assertTrue(bytes_file_name in items)
814        #  When getting a directory listing for a unicode directory,
815        #  ftputil will implicitly assume the encoding is latin-1 and
816        #  won't decode the file name to something different from
817        #  `bytes_file_name`.
818        items = host.listdir(".")
819        self.assertFalse(FILE_NAME in items)
820        # Re-open file.
821        with host.open(bytes_file_name, "r", encoding="UTF-8") as fobj:
822            data = fobj.read()
823        # Not the point of this test, but an additional sanity check
824        self.assertEqual(data, FILE_NAME)
825
826    def test_list_a_option(self):
827        # For this test to pass, the server must _not_ list "hidden"
828        # files by default but instead only when the `LIST` `-a`
829        # option is used.
830        host = self.host
831        self.assertTrue(host.use_list_a_option)
832        directory_entries = host.listdir(host.curdir)
833        self.assertTrue(".hidden" in directory_entries)
834        host.use_list_a_option = False
835        directory_entries = host.listdir(host.curdir)
836        self.assertFalse(".hidden" in directory_entries)
837
838    def _make_objects_to_be_garbage_collected(self):
839        for _ in range(10):
840            with ftputil.FTPHost(*self.login_data) as host:
841                for _ in range(10):
842                    unused_stat_result = host.stat("CONTENTS")
843                    with host.open("CONTENTS") as fobj:
844                        unused_data = fobj.read()
845
846    def test_garbage_collection(self):
847        """Test whether there are cycles which prevent garbage collection."""
848        gc.collect()
849        objects_before_test = len(gc.garbage)
850        self._make_objects_to_be_garbage_collected()
851        gc.collect()
852        objects_after_test = len(gc.garbage)
853        self.assertFalse(objects_after_test - objects_before_test)
854
855    @unittest.skipIf(ftputil.compat.python_version > 2,
856                     "test requires M2Crypto which only works on Python 2")
857    def test_m2crypto_session(self):
858        """
859        Test if a session with `M2Crypto.ftpslib.FTP_TLS` is set up
860        correctly and works with unicode input.
861        """
862        # See ticket #78.
863        #
864        # M2Crypto is only available for Python 2.
865        import M2Crypto
866        factory = ftputil.session.session_factory(
867                    base_class=M2Crypto.ftpslib.FTP_TLS,
868                    encrypt_data_channel=True)
869        with ftputil.FTPHost(*self.login_data, session_factory=factory) as host:
870            # Test if unicode argument works.
871            files = host.listdir(".")
872        self.assertTrue("CONTENTS" in files)
873
874
875
876if __name__ == "__main__":
877    print("""\
878Test real FTP access.
879
880This test writes some files and directories on the local client and
881the remote server. You'll need write access in the login directory.
882This test can take a few minutes because it has to wait to test the
883timezone calculation.
884    """)
885    unittest.main()
Note: See TracBrowser for help on using the repository browser.