source: test/test_real_ftp.py @ 1503:9d08fbe3c750

Last change on this file since 1503:9d08fbe3c750 was 1503:9d08fbe3c750, checked in by Stefan Schwarzer <sschwarzer@…>, 7 years ago
Refactored `walk` tests to avoid duplicated code.
File size: 32.0 KB
Line 
1# encoding: UTF-8
2
3# Copyright (C) 2003-2013, Stefan Schwarzer <sschwarzer@sschwarzer.net>
4# See the file LICENSE for licensing terms.
5
6# Execute tests on a real FTP server (other tests use a mock server).
7
8from __future__ import absolute_import
9from __future__ import unicode_literals
10
11import ftplib
12import functools
13import gc
14import operator
15import os
16import time
17import unittest
18import stat
19
20import ftputil.compat
21import ftputil.error
22import ftputil.file_transfer
23import ftputil.session
24import ftputil.stat_cache
25
26import test
27
28
def utc_local_time_shift():
    """
    Return the expected time shift in seconds assuming the server
    uses UTC in its listings and the client uses local time.

    This is needed because Pure-FTPd meanwhile seems to insist that
    the displayed time for files is in UTC.

    The returned value is rounded to a multiple of 3600 s (one hour).
    """
    # Derive both tuples from one timestamp so they describe exactly
    # the same instant (two separate clock reads could straddle a
    # second boundary).
    now = time.time()
    utc_tuple = time.gmtime(now)
    localtime_tuple = time.localtime(now)
    # To calculate the correct time shift, we need to ignore the
    # DST component in the localtime tuple, i. e. set it to 0.
    localtime_tuple = tuple(localtime_tuple[:-1]) + (0,)
    time_shift_in_seconds = (time.mktime(utc_tuple) -
                             time.mktime(localtime_tuple))
    # To be safe, round the above value to units of 3600 s (1 hour).
    return round(time_shift_in_seconds / 3600.0) * 3600
46
# Difference between local times of server and client in seconds.
# If 0.0, server and client use the same timezone.
#
# This used to be computed with `utc_local_time_shift()`, but
# Pure-FTPd seems to have changed its mind (see the docstring of
# `utc_local_time_shift`), so no shift is expected at the moment.
EXPECTED_TIME_SHIFT = 0.0
53
54
class Cleaner(object):
    """
    This class helps remove directories and files which might
    otherwise be left behind if a test fails in unexpected ways.

    Items are remembered as `(type, absolute_path)` pairs, where the
    type is "d" for a directory and "f" for a file.
    """

    def __init__(self, host):
        # The test class (probably `RealFTPTest`) and the helper
        # class share the same `FTPHost` object.
        self._host = host
        # Scheduled items in the order they were added.
        self._ftp_items = []

    def add_dir(self, path):
        """Schedule a directory with path `path` for removal."""
        # Store the absolute path so that later `chdir` calls in the
        # test don't change which item gets removed.
        self._ftp_items.append(("d", self._host.path.abspath(path)))

    def add_file(self, path):
        """Schedule a file with path `path` for removal."""
        self._ftp_items.append(("f", self._host.path.abspath(path)))

    def clean(self):
        """
        Remove the directories and files previously remembered.
        The removal works in reverse order of the scheduling with
        `add_dir` and `add_file`.

        Errors due to a removal are ignored.
        """
        self._host.chdir("/")
        for type_, path in reversed(self._ftp_items):
            try:
                if type_ == "d":
                    # If something goes wrong in `rmtree` we might
                    # leave a mess behind.
                    self._host.rmtree(path)
                elif type_ == "f":
                    # Minor mess if `remove` fails
                    self._host.remove(path)
            except ftputil.error.FTPError:
                # Best effort only: the item may already have been
                # removed by the test itself.
                pass
95
96
class RealFTPTest(unittest.TestCase):
    """
    Base class for tests which talk to a real FTP server.

    Each test gets a fresh `FTPHost` connection in `self.host` and a
    `Cleaner` in `self.cleaner` which removes scheduled remote items
    when the test is torn down.
    """

    def setUp(self):
        # Server, username, password.
        self.login_data = ("localhost", "ftptest",
                            "d605581757de5eb56d568a4419f4126e")
        self.host = ftputil.FTPHost(*self.login_data)
        self.cleaner = Cleaner(self.host)

    def tearDown(self):
        # Close the connection even if the cleanup raises something
        # unexpected; otherwise the FTP session would leak.
        try:
            self.cleaner.clean()
        finally:
            self.host.close()

    #
    # Helper methods
    #
    def make_remote_file(self, path):
        """
        Create a file on the FTP host and schedule it for removal
        at the end of the test.
        """
        self.cleaner.add_file(path)
        with self.host.open(path, "wb") as file_:
            # Write something. Otherwise the FTP server might not update
            # the time of last modification if the file existed before.
            file_.write(b"\n")

    def make_local_file(self):
        """
        Create the file `_local_file_` in the current directory of
        the local host (= on the client side).

        The caller is responsible for removing the file.
        """
        with open("_local_file_", "wb") as fobj:
            fobj.write(b"abc\x12\x34def\t")
125
126
class TestMkdir(RealFTPTest):
    """Tests for `mkdir`, `rmdir` and `makedirs`."""

    def test_mkdir_rmdir(self):
        host = self.host
        dir_name = "_testdir_"
        file_name = host.path.join(dir_name, "_nonempty_")
        self.cleaner.add_dir(dir_name)
        # Make dir and check if the directory is there.
        host.mkdir(dir_name)
        self.assertIn(dir_name, host.listdir(host.curdir))
        # Try to remove a non-empty directory.
        self.cleaner.add_file(file_name)
        # Create an empty file; the `with` statement makes sure the
        # remote file is closed even if something goes wrong.
        with host.open(file_name, "w"):
            pass
        self.assertRaises(ftputil.error.PermanentError, host.rmdir, dir_name)
        # Remove file.
        host.unlink(file_name)
        # `remove` on a directory should fail.
        try:
            with self.assertRaises(ftputil.error.PermanentError) as context:
                host.remove(dir_name)
            self.assertTrue(str(context.exception).startswith(
                              "remove/unlink can only delete files"))
        finally:
            # Delete empty directory.
            host.rmdir(dir_name)
        self.assertNotIn(dir_name, host.listdir(host.curdir))

    def test_makedirs_without_existing_dirs(self):
        host = self.host
        # No `_dir1_` yet
        self.assertNotIn("_dir1_", host.listdir(host.curdir))
        # Schedule the cleanup _before_ creating anything, so a
        # partially failed `makedirs` doesn't leave directories behind.
        self.cleaner.add_dir("_dir1_")
        # Vanilla case, all should go well.
        host.makedirs("_dir1_/dir2/dir3/dir4")
        # Check host.
        self.assertTrue(host.path.isdir("_dir1_"))
        self.assertTrue(host.path.isdir("_dir1_/dir2"))
        self.assertTrue(host.path.isdir("_dir1_/dir2/dir3"))
        self.assertTrue(host.path.isdir("_dir1_/dir2/dir3/dir4"))

    def test_makedirs_from_non_root_directory(self):
        # This is a testcase for issue #22, see
        # http://ftputil.sschwarzer.net/trac/ticket/22 .
        host = self.host
        # No `_dir1_` and `_dir2_` yet
        top_level_items = host.listdir(host.curdir)
        self.assertNotIn("_dir1_", top_level_items)
        self.assertNotIn("_dir2_", top_level_items)
        # Part 1: Try to make directories starting from `_dir1_` and
        # change to non-root directory.
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        host.chdir("_dir1_")
        host.makedirs("_dir2_/_dir3_")
        # Test for expected directory hierarchy.
        self.assertTrue(host.path.isdir("/_dir1_"))
        self.assertTrue(host.path.isdir("/_dir1_/_dir2_"))
        self.assertTrue(host.path.isdir("/_dir1_/_dir2_/_dir3_"))
        self.assertFalse(host.path.isdir("/_dir1_/_dir1_"))
        # Remove all but the directory we're in.
        host.rmdir("/_dir1_/_dir2_/_dir3_")
        host.rmdir("/_dir1_/_dir2_")
        # Part 2: Try to make directories starting from root.
        self.cleaner.add_dir("/_dir2_")
        host.makedirs("/_dir2_/_dir3_")
        # Test for expected directory hierarchy
        self.assertTrue(host.path.isdir("/_dir2_"))
        self.assertTrue(host.path.isdir("/_dir2_/_dir3_"))
        self.assertFalse(host.path.isdir("/_dir1_/_dir2_"))

    def test_makedirs_of_existing_directory(self):
        host = self.host
        # The (chrooted) login directory; this must not raise.
        host.makedirs("/")

    def test_makedirs_with_file_in_the_way(self):
        host = self.host
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        self.make_remote_file("_dir1_/file1")
        # Try it.
        self.assertRaises(ftputil.error.PermanentError,
                          host.makedirs, "_dir1_/file1")
        self.assertRaises(ftputil.error.PermanentError,
                          host.makedirs, "_dir1_/file1/dir2")

    def test_makedirs_with_existing_directory(self):
        host = self.host
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        host.makedirs("_dir1_/dir2")
        # Check
        self.assertTrue(host.path.isdir("_dir1_"))
        self.assertTrue(host.path.isdir("_dir1_/dir2"))

    def test_makedirs_in_non_writable_directory(self):
        host = self.host
        # Precondition (not set up by this test): `rootdir1` exists
        # on the server but is only writable by root.
        self.assertRaises(ftputil.error.PermanentError, host.makedirs,
                          "rootdir1/dir2")

    def test_makedirs_with_writable_directory_at_end(self):
        host = self.host
        self.cleaner.add_dir("rootdir2/dir2")
        # Precondition (not set up by this test): `rootdir2` exists
        # but is only writable by root. `dir2` is writable by the
        # regular ftp user. These both should work.
        host.makedirs("rootdir2/dir2")
        host.makedirs("rootdir2/dir2/dir3")
240
241
class TestRemoval(RealFTPTest):
    """Tests for `rmtree` and `remove`."""

    def test_rmtree_without_error_handler(self):
        host = self.host
        # Build a tree.
        self.cleaner.add_dir("_dir1_")
        host.makedirs("_dir1_/dir2")
        self.make_remote_file("_dir1_/file1")
        self.make_remote_file("_dir1_/file2")
        self.make_remote_file("_dir1_/dir2/file3")
        self.make_remote_file("_dir1_/dir2/file4")
        # Try to remove a _file_ with `rmtree`.
        self.assertRaises(ftputil.error.PermanentError, host.rmtree,
                          "_dir1_/file2")
        # Remove `dir2`.
        host.rmtree("_dir1_/dir2")
        self.assertFalse(host.path.exists("_dir1_/dir2"))
        self.assertTrue(host.path.exists("_dir1_/file2"))
        # Re-create `dir2` and remove `_dir1_`.
        host.mkdir("_dir1_/dir2")
        self.make_remote_file("_dir1_/dir2/file3")
        self.make_remote_file("_dir1_/dir2/file4")
        host.rmtree("_dir1_")
        self.assertFalse(host.path.exists("_dir1_"))

    def test_rmtree_with_error_handler(self):
        host = self.host
        self.cleaner.add_dir("_dir1_")
        host.mkdir("_dir1_")
        self.make_remote_file("_dir1_/file1")
        # Prepare error "handler" which just records its arguments.
        log = []
        def error_handler(*args):
            log.append(args)
        # Try to remove a file as root "directory".
        # With `ignore_errors=True` the handler must not be called.
        host.rmtree("_dir1_/file1", ignore_errors=True, onerror=error_handler)
        self.assertEqual(log, [])
        # With `ignore_errors=False` the handler gets the failing
        # function and the path for each failed operation.
        host.rmtree("_dir1_/file1", ignore_errors=False, onerror=error_handler)
        self.assertEqual(log[0][0], host.listdir)
        self.assertEqual(log[0][1], "_dir1_/file1")
        self.assertEqual(log[1][0], host.rmdir)
        self.assertEqual(log[1][1], "_dir1_/file1")
        host.rmtree("_dir1_")
        # Try to remove a non-existent directory.
        del log[:]
        host.rmtree("_dir1_", ignore_errors=False, onerror=error_handler)
        self.assertEqual(log[0][0], host.listdir)
        self.assertEqual(log[0][1], "_dir1_")
        self.assertEqual(log[1][0], host.rmdir)
        self.assertEqual(log[1][1], "_dir1_")

    def test_remove_non_existent_item(self):
        host = self.host
        self.assertRaises(ftputil.error.PermanentError, host.remove,
                          "nonexistent")

    def test_remove_existing_file(self):
        host = self.host
        # `make_remote_file` already schedules the file for cleanup,
        # so no separate `cleaner.add_file` call is needed.
        self.make_remote_file("_testfile_")
        self.assertTrue(host.path.isfile("_testfile_"))
        host.remove("_testfile_")
        self.assertFalse(host.path.exists("_testfile_"))
305
306
class TestWalk(RealFTPTest):
    """
    Tests for `FTPHost.walk`.

    The tests don't create any remote items themselves; they compare
    the `walk` results with a hard-coded description of the
    `walk_test` directory tree.
    """

    def _walk_test(self, expected_result, **walk_kwargs):
        """
        Walk the directory and test results.

        `expected_result` is a list of `(dirpath, dirnames, filenames)`
        tuples in the order `walk` should yield them. `walk_kwargs`
        are passed to `self.host.walk` unchanged.
        """
        # Collect data using `walk`.
        actual_result = list(self.host.walk(**walk_kwargs))
        # Compare with expected results. Check the length first
        # because `zip` below would silently drop surplus items.
        self.assertEqual(len(actual_result), len(expected_result))
        for actual_items, expected_items in zip(actual_result,
                                                expected_result):
            self.assertEqual(actual_items, expected_items)

    def test_walk_topdown(self):
        # Precondition: the tree in directory `walk_test` exists on
        # the server (this test doesn't build it).
        expected_result = [
          ("walk_test",
           ["dir1", "dir2", "dir3"],
           ["file4"]),

          ("walk_test/dir1",
           ["dir11", "dir12"],
           []),

          ("walk_test/dir1/dir11",
           [],
           []),

          ("walk_test/dir1/dir12",
           ["dir123"],
           ["file121", "file122"]),

          ("walk_test/dir1/dir12/dir123",
           [],
           ["file1234"]),

          ("walk_test/dir2",
           [],
           []),

          ("walk_test/dir3",
           ["dir33"],
           ["file31", "file32"]),

          ("walk_test/dir3/dir33",
           [],
           []),
          ]
        self._walk_test(expected_result, top="walk_test")

    def test_walk_depth_first(self):
        # Precondition: the tree in directory `walk_test` exists on
        # the server (this test doesn't build it).
        expected_result = [
          ("walk_test/dir1/dir11",
           [],
           []),

          ("walk_test/dir1/dir12/dir123",
           [],
           ["file1234"]),

          ("walk_test/dir1/dir12",
           ["dir123"],
           ["file121", "file122"]),

          ("walk_test/dir1",
           ["dir11", "dir12"],
           []),

          ("walk_test/dir2",
           [],
           []),

          ("walk_test/dir3/dir33",
           [],
           []),

          ("walk_test/dir3",
           ["dir33"],
           ["file31", "file32"]),

          ("walk_test",
           ["dir1", "dir2", "dir3"],
           ["file4"])
          ]
        self._walk_test(expected_result, top="walk_test", topdown=False)
393
394
class TestRename(RealFTPTest):
    """Tests for `FTPHost.rename`."""

    def test_rename(self):
        host = self.host
        # Make sure the target of the renaming operation is removed.
        # (The source is scheduled for removal by `make_remote_file`.)
        self.cleaner.add_file("_testfile2_")
        self.make_remote_file("_testfile1_")
        host.rename("_testfile1_", "_testfile2_")
        self.assertFalse(host.path.exists("_testfile1_"))
        self.assertTrue(host.path.exists("_testfile2_"))

    def test_rename_with_spaces_in_directory(self):
        host = self.host
        dir_name = "_dir with spaces_"
        source_path = dir_name + "/testfile1"
        target_path = dir_name + "/testfile2"
        # Removing the directory also removes the renamed file in it.
        self.cleaner.add_dir(dir_name)
        host.mkdir(dir_name)
        self.make_remote_file(source_path)
        host.rename(source_path, target_path)
        self.assertFalse(host.path.exists(source_path))
        self.assertTrue(host.path.exists(target_path))
415
416
class TestStat(RealFTPTest):
    """Tests for stat'ing remote items and for the stat cache."""

    def test_stat(self):
        host = self.host
        dir_name = "_testdir_"
        file_name = host.path.join(dir_name, "_nonempty_")
        # Make a directory and a file in it.
        self.cleaner.add_dir(dir_name)
        host.mkdir(dir_name)
        with host.open(file_name, "wb") as fobj:
            # Nine bytes, see the `getsize` check below.
            fobj.write(b"abc\x12\x34def\t")
        # Do some stats
        # - dir
        dir_stat = host.stat(dir_name)
        self.assertIsInstance(dir_stat._st_name,
                              ftputil.compat.unicode_type)
        self.assertEqual(host.listdir(dir_name), ["_nonempty_"])
        self.assertTrue(host.path.isdir(dir_name))
        self.assertFalse(host.path.isfile(dir_name))
        self.assertFalse(host.path.islink(dir_name))
        # - file
        file_stat = host.stat(file_name)
        self.assertIsInstance(file_stat._st_name,
                              ftputil.compat.unicode_type)
        self.assertFalse(host.path.isdir(file_name))
        self.assertTrue(host.path.isfile(file_name))
        self.assertFalse(host.path.islink(file_name))
        self.assertEqual(host.path.getsize(file_name), 9)
        # - file's modification time; allow up to two minutes difference
        host.synchronize_times()
        server_mtime = host.path.getmtime(file_name)
        client_mtime = time.mktime(time.localtime())
        calculated_time_shift = server_mtime - client_mtime
        self.assertLessEqual(
          abs(calculated_time_shift - host.time_shift()), 120)

    def test_issomething_for_nonexistent_directory(self):
        host = self.host
        # Check if we get the right results if even the containing
        # directory doesn't exist (see ticket #66).
        nonexistent_path = "/nonexistent/nonexistent"
        self.assertFalse(host.path.isdir(nonexistent_path))
        self.assertFalse(host.path.isfile(nonexistent_path))
        self.assertFalse(host.path.islink(nonexistent_path))

    def test_special_broken_link(self):
        # Test for ticket #39.
        host = self.host
        # This is a path on the _server_, so join it with the remote
        # path module. `os.path.join` would use the client's path
        # separator, which is wrong on non-POSIX clients.
        broken_link_name = host.path.join("dir_with_broken_link",
                                          "nonexistent")
        self.assertEqual(host.lstat(broken_link_name)._st_target,
                         "../nonexistent/nonexistent")
        self.assertFalse(host.path.isdir(broken_link_name))
        self.assertFalse(host.path.isfile(broken_link_name))
        self.assertTrue(host.path.islink(broken_link_name))

    def test_concurrent_access(self):
        self.make_remote_file("_testfile_")
        with ftputil.FTPHost(*self.login_data) as host1:
            with ftputil.FTPHost(*self.login_data) as host2:
                stat_result1 = host1.stat("_testfile_")
                stat_result2 = host2.stat("_testfile_")
                self.assertEqual(stat_result1, stat_result2)
                host2.remove("_testfile_")
                # Can still get the result via `host1`
                stat_result1 = host1.stat("_testfile_")
                self.assertEqual(stat_result1, stat_result2)
                # Stat'ing on `host2` gives an exception.
                self.assertRaises(ftputil.error.PermanentError,
                                  host2.stat, "_testfile_")
                # Stat'ing on `host1` after invalidation
                absolute_path = host1.path.join(host1.getcwd(), "_testfile_")
                host1.stat_cache.invalidate(absolute_path)
                self.assertRaises(ftputil.error.PermanentError,
                                  host1.stat, "_testfile_")

    def test_cache_auto_resizing(self):
        """Test if the cache is resized appropriately."""
        host = self.host
        cache = host.stat_cache._cache
        # Make sure the cache size isn't adjusted towards smaller values.
        # The listing itself isn't needed, only the effect on the cache.
        host.listdir("walk_test")
        self.assertEqual(cache.size,
                         ftputil.stat_cache.StatCache._DEFAULT_CACHE_SIZE)
        # Make the cache very small initially and see if it gets resized.
        cache.size = 2
        entries = host.listdir("walk_test")
        # The adjusted cache size should be larger or equal to the
        # number of items in `walk_test` and its parent directory. The
        # latter is read implicitly upon `listdir`'s `isdir` call.
        expected_min_cache_size = max(len(host.listdir(host.curdir)),
                                      len(entries))
        self.assertGreaterEqual(cache.size, expected_min_cache_size)
508
509
class TestUploadAndDownload(RealFTPTest):
    """Test upload and download (including time shift test)."""

    def test_time_shift(self):
        self.host.synchronize_times()
        self.assertEqual(self.host.time_shift(), EXPECTED_TIME_SHIFT)

    @test.skip_long_running_test
    def test_upload(self):
        host = self.host
        host.synchronize_times()
        local_file = "_local_file_"
        remote_file = "_remote_file_"
        # Make local file to upload.
        self.make_local_file()
        try:
            # Wait, else small time differences between client and server
            # actually could trigger the update. The wait is inside the
            # `try` so the local file is removed even if it's interrupted.
            time.sleep(65)
            self.cleaner.add_file(remote_file)
            host.upload(local_file, remote_file)
            # Retry; shouldn't be uploaded
            uploaded = host.upload_if_newer(local_file, remote_file)
            self.assertEqual(uploaded, False)
            # Rewrite the local file.
            self.make_local_file()
            # Retry; should be uploaded now
            uploaded = host.upload_if_newer(local_file, remote_file)
            self.assertEqual(uploaded, True)
        finally:
            # Clean up
            os.unlink(local_file)

    @test.skip_long_running_test
    def test_download(self):
        host = self.host
        host.synchronize_times()
        local_file = "_local_file_"
        remote_file = "_remote_file_"
        # Make a remote file.
        self.make_remote_file(remote_file)
        # File should be downloaded as it's not present yet.
        downloaded = host.download_if_newer(remote_file, local_file)
        try:
            # Checked inside the `try` so that the local file is
            # removed even if this assertion fails.
            self.assertEqual(downloaded, True)
            # If the remote file, taking the datetime precision into
            # account, _might_ be newer, the file will be downloaded
            # again. To prevent this, wait a bit over a minute (the
            # remote precision), then "touch" the local file.
            time.sleep(65)
            # Create empty file.
            with open(local_file, "w"):
                pass
            # Local file is present and newer, so shouldn't download.
            downloaded = host.download_if_newer(remote_file, local_file)
            self.assertEqual(downloaded, False)
            # Re-make the remote file.
            self.make_remote_file(remote_file)
            # Local file is present but possibly older (taking the
            # possible deviation because of the precision into account),
            # so should download.
            downloaded = host.download_if_newer(remote_file, local_file)
            self.assertEqual(downloaded, True)
        finally:
            # Clean up.
            os.unlink(local_file)

    def test_callback_with_transfer(self):
        host = self.host
        FILE_NAME = "debian-keyring.tar.gz"
        # Default chunk size as in `FTPHost.copyfileobj`
        MAX_COPY_CHUNK_SIZE = ftputil.file_transfer.MAX_COPY_CHUNK_SIZE
        file_size = host.path.getsize(FILE_NAME)
        # Number of chunks needed for the whole file (ceiling division).
        # Unconditionally adding one chunk "for the remainder" would be
        # off by one if the size is an exact multiple of the chunk size.
        chunk_count = ((file_size + MAX_COPY_CHUNK_SIZE - 1) //
                       MAX_COPY_CHUNK_SIZE)
        # Define a callback that just collects all data passed to it.
        transferred_chunks_list = []
        def test_callback(chunk):
            transferred_chunks_list.append(chunk)
        try:
            host.download(FILE_NAME, FILE_NAME, callback=test_callback)
            # Construct a list of data chunks we expect.
            expected_chunks_list = []
            with open(FILE_NAME, "rb") as downloaded_fobj:
                while True:
                    chunk = downloaded_fobj.read(MAX_COPY_CHUNK_SIZE)
                    if not chunk:
                        break
                    expected_chunks_list.append(chunk)
            # Examine data collected by callback function.
            self.assertEqual(len(transferred_chunks_list), chunk_count)
            self.assertEqual(transferred_chunks_list, expected_chunks_list)
        finally:
            os.unlink(FILE_NAME)
605
606
class TestFTPFiles(RealFTPTest):
    """Tests for the re-use of child sessions by `FTPHost.open`."""

    def test_only_closed_children(self):
        REMOTE_FILE_NAME = "debian-keyring.tar.gz"
        host = self.host
        with host.open(REMOTE_FILE_NAME, "rb") as file_obj1:
            # Open the same file a second time and close it again
            # while the first file object is still open.
            with host.open(REMOTE_FILE_NAME, "rb") as file_obj2:
                pass
            # This should re-use the second child because the first isn't
            # closed but the second is.
            with host.open(REMOTE_FILE_NAME, "rb") as file_obj:
                self.assertEqual(len(host._children), 2)
                self.assertTrue(file_obj._host is host._children[1])

    def test_no_timed_out_children(self):
        REMOTE_FILE_NAME = "debian-keyring.tar.gz"
        host = self.host
        # Implicitly create child host object.
        with host.open(REMOTE_FILE_NAME, "rb") as file_obj1:
            pass
        # Monkey-patch file to simulate an FTP server timeout below.
        def timed_out_pwd():
            raise ftplib.error_temp("simulated timeout")
        file_obj1._host._session.pwd = timed_out_pwd
        # Try to get a file - which shouldn't be the timed-out file.
        with host.open(REMOTE_FILE_NAME, "rb") as file_obj2:
            self.assertTrue(file_obj1 is not file_obj2)
        # Re-use closed and not timed-out child session.
        with host.open(REMOTE_FILE_NAME, "rb") as file_obj3:
            pass
        self.assertTrue(file_obj2 is file_obj3)
639
640
class TestChmod(RealFTPTest):
    """Tests for `FTPHost.chmod`."""

    def assert_mode(self, path, expected_mode):
        """
        Assert that the mode of the remote item `path`, reduced to
        the bits that can be set with a mode change command, equals
        `expected_mode`.

        The `FTPHost` object to test against is `self.host`.
        """
        full_mode = self.host.stat(path).st_mode
        # Remove flags we can't set via `chmod`.
        # Allowed flags according to the Python documentation of
        # `os.chmod`.
        allowed_flags = [stat.S_ISUID, stat.S_ISGID, stat.S_ENFMT,
          stat.S_ISVTX, stat.S_IREAD, stat.S_IWRITE, stat.S_IEXEC,
          stat.S_IRWXU, stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR,
          stat.S_IRWXG, stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP,
          stat.S_IRWXO, stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH]
        # Combine all flags into a single bit mask.
        allowed_mask = functools.reduce(operator.or_, allowed_flags)
        mode = full_mode & allowed_mask
        self.assertEqual(mode, expected_mode,
                         "mode {0:o} != {1:o}".format(mode, expected_mode))

    def test_chmod_existing_directory(self):
        host = self.host
        # Schedule the cleanup before creating the directory, as the
        # other tests in this module do.
        self.cleaner.add_dir("_test dir_")
        host.mkdir("_test dir_")
        # Set/get mode of a directory.
        host.chmod("_test dir_", 0o757)
        self.assert_mode("_test dir_", 0o757)
        # Set/get mode in nested directory.
        self.cleaner.add_dir("_test dir_/nested_dir")
        host.mkdir("_test dir_/nested_dir")
        host.chmod("_test dir_/nested_dir", 0o757)
        self.assert_mode("_test dir_/nested_dir", 0o757)

    def test_chmod_existing_file(self):
        host = self.host
        self.cleaner.add_dir("_test dir_")
        host.mkdir("_test dir_")
        # Set/get mode on a file.
        file_name = host.path.join("_test dir_", "_testfile_")
        self.make_remote_file(file_name)
        host.chmod(file_name, 0o646)
        self.assert_mode(file_name, 0o646)

    def test_chmod_nonexistent_path(self):
        # Set/get mode of a non-existing item.
        self.assertRaises(ftputil.error.PermanentError, self.host.chmod,
                          "nonexistent", 0o757)

    def test_cache_invalidation(self):
        host = self.host
        self.cleaner.add_dir("_test dir_")
        host.mkdir("_test dir_")
        # Make sure the mode is in the cache.
        host.stat("_test dir_")
        # Set/get mode of the directory.
        host.chmod("_test dir_", 0o757)
        self.assert_mode("_test dir_", 0o757)
        # Set/get mode on a file.
        file_name = host.path.join("_test dir_", "_testfile_")
        self.make_remote_file(file_name)
        # Make sure the mode is in the cache.
        host.stat(file_name)
        host.chmod(file_name, 0o646)
        self.assert_mode(file_name, 0o646)
708
709
class TestOther(RealFTPTest):
    """Miscellaneous tests which don't fit into the other classes."""

    def test_open_for_reading(self):
        # Test for issues #17 and #51,
        # http://ftputil.sschwarzer.net/trac/ticket/17 and
        # http://ftputil.sschwarzer.net/trac/ticket/51 .
        file1 = self.host.open("debian-keyring.tar.gz", "rb")
        time.sleep(1)
        # Depending on the FTP server, this might return a status code
        # unexpected by `ftplib` or block the socket connection until
        # a server-side timeout.
        file1.close()

    def test_subsequent_reading(self):
        # Opening a file for reading
        with self.host.open("debian-keyring.tar.gz", "rb") as file1:
            pass
        # Make sure that there are no problems if the connection is reused.
        with self.host.open("debian-keyring.tar.gz", "rb") as file2:
            pass
        self.assertTrue(file1._session is file2._session)

    def test_names_with_spaces(self):
        # Test if directories and files with spaces in their names
        # can be used.
        host = self.host
        self.assertTrue(host.path.isdir("dir with spaces"))
        self.assertEqual(host.listdir("dir with spaces"),
                         ["second dir", "some file", "some_file"])
        self.assertTrue(host.path.isdir("dir with spaces/second dir"))
        self.assertTrue(host.path.isfile("dir with spaces/some_file"))
        self.assertTrue(host.path.isfile("dir with spaces/some file"))

    def test_synchronize_times_without_write_access(self):
        """Test failing synchronization because of non-writable directory."""
        host = self.host
        # This isn't writable by the ftp account the tests are run under.
        host.chdir("rootdir1")
        self.assertRaises(ftputil.error.TimeShiftError, host.synchronize_times)

    def test_bytes_file_name(self):
        """
        Test whether a UTF-8 file name can be sent and retrieved when
        encoded explicitly.
        """
        # This test below would fail under Python 2 and I have no idea
        # how to make it work there without modifying `ftplib`. Report
        # the test as skipped instead of silently passing.
        if ftputil.compat.python_version == 2:
            self.skipTest("test doesn't work under Python 2")
        #
        host = self.host
        # This requires an existing file with an UTF-8 encoded name on
        # the remote file system. Note: If the remote file system
        # doesn't use UTF-8, the test will probably succeed anyway.
        FILE_NAME = "_ütf8_file_näme_♯♭_"
        bytes_file_name = FILE_NAME.encode("UTF-8")
        self.cleaner.add_file(bytes_file_name)
        # Write under name `bytes_file_name`
        with host.open(bytes_file_name, "w", encoding="UTF-8") as fobj:
            fobj.write(FILE_NAME)
        # Try to retrieve file with `listdir`.
        items = host.listdir(b".")
        self.assertIn(bytes_file_name, items)
        #  When getting a directory listing for a unicode directory,
        #  ftputil will implicitly assume the encoding is latin-1 and
        #  won't decode the file name to something different from
        #  `bytes_file_name`.
        items = host.listdir(".")
        self.assertNotIn(FILE_NAME, items)
        # Re-open file.
        with host.open(bytes_file_name, "r", encoding="UTF-8") as fobj:
            data = fobj.read()
        # Not the point of this test, but an additional sanity check
        self.assertEqual(data, FILE_NAME)

    def test_list_a_option(self):
        # For this test to pass, the server must _not_ list "hidden"
        # files by default but instead only when the `LIST` `-a`
        # option is used.
        host = self.host
        self.assertTrue(host.use_list_a_option)
        directory_entries = host.listdir(host.curdir)
        self.assertIn(".hidden", directory_entries)
        host.use_list_a_option = False
        directory_entries = host.listdir(host.curdir)
        self.assertNotIn(".hidden", directory_entries)

    def _make_objects_to_be_garbage_collected(self):
        """
        Create and release several `FTPHost` and remote file objects
        so that `test_garbage_collection` can look for leftovers.
        """
        for _host_index in range(10):
            with ftputil.FTPHost(*self.login_data) as host:
                for _access_index in range(10):
                    unused_stat_result = host.stat("CONTENTS")
                    with host.open("CONTENTS") as fobj:
                        unused_data = fobj.read()

    def test_garbage_collection(self):
        """Test whether there are cycles which prevent garbage collection."""
        gc.collect()
        objects_before_test = len(gc.garbage)
        self._make_objects_to_be_garbage_collected()
        gc.collect()
        objects_after_test = len(gc.garbage)
        # No additional uncollectable objects must have shown up.
        self.assertEqual(objects_after_test, objects_before_test)

    @unittest.skipIf(ftputil.compat.python_version > 2,
                     "test requires M2Crypto which only works on Python 2")
    def test_m2crypto_session(self):
        """
        Test if a session with `M2Crypto.ftpslib.FTP_TLS` is set up
        correctly and works with unicode input.
        """
        # See ticket #78.
        #
        # M2Crypto is only available for Python 2, so import it only
        # when the test actually runs.
        import M2Crypto
        factory = ftputil.session.session_factory(
                    base_class=M2Crypto.ftpslib.FTP_TLS,
                    encrypt_data_channel=True)
        with ftputil.FTPHost(*self.login_data, session_factory=factory) as host:
            # Test if unicode argument works.
            files = host.listdir(".")
        self.assertIn("CONTENTS", files)
832
833
834
# Run all tests of this module if it's executed as a script.
if __name__ == "__main__":
    # Tell the user what to expect before the (slow) tests start.
    print("""\
Test real FTP access.

This test writes some files and directories on the local client and
the remote server. You'll need write access in the login directory.
This test can take a few minutes because it has to wait to test the
timezone calculation.
    """)
    unittest.main()
Note: See TracBrowser for help on using the repository browser.