source: test/test_real_ftp.py @ 1460:d16e39a9b351

Last change on this file since 1460:d16e39a9b351 was 1460:d16e39a9b351, checked in by Stefan Schwarzer <sschwarzer@…>, 8 years ago
Use `with` statement for local and remote files.
File size: 31.3 KB
Line 
1# encoding: UTF-8
2
3# Copyright (C) 2003-2013, Stefan Schwarzer <sschwarzer@sschwarzer.net>
4# See the file LICENSE for licensing terms.
5
6# Execute tests on a real FTP server (other tests use a mock server).
7
8from __future__ import absolute_import
9from __future__ import unicode_literals
10
11import ftplib
12import functools
13import gc
14import operator
15import os
16import time
17import unittest
18import stat
19
20import ftputil.compat
21import ftputil.error
22import ftputil.file_transfer
23import ftputil.stat_cache
24
25import test
26
27
28def utc_local_time_shift():
29    """
30    Return the expected time shift in seconds assuming the server
31    uses UTC in its listings and the client uses local time.
32
33    This is needed because Pure-FTPd meanwhile seems to insist that
34    the displayed time for files is in UTC.
35    """
36    utc_tuple = time.gmtime()
37    localtime_tuple = time.localtime()
38    # To calculate the correct times shift, we need to ignore the
39    # DST component in the localtime tuple, i. e. set it to 0.
40    localtime_tuple = localtime_tuple[:-1] + (0,)
41    time_shift_in_seconds = (time.mktime(utc_tuple) -
42                             time.mktime(localtime_tuple))
43    # To be safe, round the above value to units of 3600 s (1 hour).
44    return round(time_shift_in_seconds / 3600.0) * 3600
45
46# Difference between local times of server and client. If 0.0, server
47# and client use the same timezone.
48#EXPECTED_TIME_SHIFT = utc_local_time_shift()
49# Pure-FTPd seems to have changed its mind (see docstring of
50# `utc_local_time_shift`).
51EXPECTED_TIME_SHIFT = 0.0
52
53
54class Cleaner(object):
55    """
56    This class helps remove directories and files which might
57    otherwise be left behind if a test fails in unexpected ways.
58    """
59
60    def __init__(self, host):
61        # The test class (probably `RealFTPTest`) and the helper
62        # class share the same `FTPHost` object.
63        self._host = host
64        self._ftp_items = []
65
66    def add_dir(self, path):
67        """Schedule a directory with path `path` for removal."""
68        self._ftp_items.append(("d", self._host.path.abspath(path)))
69
70    def add_file(self, path):
71        """Schedule a file with path `path` for removal."""
72        self._ftp_items.append(("f", self._host.path.abspath(path)))
73
74    def clean(self):
75        """
76        Remove the directories and files previously remembered.
77        The removal works in reverse order of the scheduling with
78        `add_dir` and `add_file`.
79
80        Errors due to a removal are ignored.
81        """
82        self._host.chdir("/")
83        for type_, path in reversed(self._ftp_items):
84            try:
85                if type_ == "d":
86                    # If something goes wrong in `rmtree` we might
87                    # leave a mess behind.
88                    self._host.rmtree(path)
89                elif type_ == "f":
90                    # Minor mess if `remove` fails
91                    self._host.remove(path)
92            except ftputil.error.FTPError:
93                pass
94
95
96class RealFTPTest(unittest.TestCase):
97
98    def setUp(self):
99        # Server, username, password.
100        self.login_data = ("localhost", "ftptest",
101                            "d605581757de5eb56d568a4419f4126e")
102        self.host = ftputil.FTPHost(*self.login_data)
103        self.cleaner = Cleaner(self.host)
104
105    def tearDown(self):
106        self.cleaner.clean()
107        self.host.close()
108
109    #
110    # Helper methods
111    #
112    def make_remote_file(self, path):
113        """Create a file on the FTP host."""
114        self.cleaner.add_file(path)
115        with self.host.open(path, "wb") as file_:
116            # Write something. Otherwise the FTP server might not update
117            # the time of last modification if the file existed before.
118            file_.write(b"\n")
119
120    def make_local_file(self):
121        """Create a file on the local host (= on the client side)."""
122        with open("_local_file_", "wb") as fobj:
123            fobj.write(b"abc\x12\x34def\t")
124
125
126class TestMkdir(RealFTPTest):
127
128    def test_mkdir_rmdir(self):
129        host = self.host
130        dir_name = "_testdir_"
131        file_name = host.path.join(dir_name, "_nonempty_")
132        self.cleaner.add_dir(dir_name)
133        # Make dir and check if the directory is there.
134        host.mkdir(dir_name)
135        files = host.listdir(host.curdir)
136        self.assertTrue(dir_name in files)
137        # Try to remove a non-empty directory.
138        self.cleaner.add_file(file_name)
139        non_empty = host.open(file_name, "w")
140        non_empty.close()
141        self.assertRaises(ftputil.error.PermanentError, host.rmdir, dir_name)
142        # Remove file.
143        host.unlink(file_name)
144        # `remove` on a directory should fail.
145        try:
146            try:
147                host.remove(dir_name)
148            except ftputil.error.PermanentError as exc:
149                self.assertTrue(str(exc).startswith(
150                                "remove/unlink can only delete files"))
151            else:
152                self.assertTrue(False, "we shouldn't have come here")
153        finally:
154            # Delete empty directory.
155            host.rmdir(dir_name)
156        files = host.listdir(host.curdir)
157        self.assertTrue(dir_name not in files)
158
159    def test_makedirs_without_existing_dirs(self):
160        host = self.host
161        # No `_dir1_` yet
162        self.assertFalse("_dir1_" in host.listdir(host.curdir))
163        # Vanilla case, all should go well.
164        host.makedirs("_dir1_/dir2/dir3/dir4")
165        self.cleaner.add_dir("_dir1_")
166        # Check host.
167        self.assertTrue(host.path.isdir("_dir1_"))
168        self.assertTrue(host.path.isdir("_dir1_/dir2"))
169        self.assertTrue(host.path.isdir("_dir1_/dir2/dir3"))
170        self.assertTrue(host.path.isdir("_dir1_/dir2/dir3/dir4"))
171
172    def test_makedirs_from_non_root_directory(self):
173        # This is a testcase for issue #22, see
174        # http://ftputil.sschwarzer.net/trac/ticket/22 .
175        host = self.host
176        # No `_dir1_` and `_dir2_` yet
177        self.assertFalse("_dir1_" in host.listdir(host.curdir))
178        self.assertFalse("_dir2_" in host.listdir(host.curdir))
179        # Part 1: Try to make directories starting from `_dir1_` and
180        # change to non-root directory.
181        self.cleaner.add_dir("_dir1_")
182        host.mkdir("_dir1_")
183        host.chdir("_dir1_")
184        host.makedirs("_dir2_/_dir3_")
185        # Test for expected directory hierarchy.
186        self.assertTrue (host.path.isdir("/_dir1_"))
187        self.assertTrue (host.path.isdir("/_dir1_/_dir2_"))
188        self.assertTrue (host.path.isdir("/_dir1_/_dir2_/_dir3_"))
189        self.assertFalse(host.path.isdir("/_dir1_/_dir1_"))
190        # Remove all but the directory we're in.
191        host.rmdir("/_dir1_/_dir2_/_dir3_")
192        host.rmdir("/_dir1_/_dir2_")
193        # Part 2: Try to make directories starting from root.
194        self.cleaner.add_dir("/_dir2_")
195        host.makedirs("/_dir2_/_dir3_")
196        # Test for expected directory hierarchy
197        self.assertTrue (host.path.isdir("/_dir2_"))
198        self.assertTrue (host.path.isdir("/_dir2_/_dir3_"))
199        self.assertFalse(host.path.isdir("/_dir1_/_dir2_"))
200
201    def test_makedirs_of_existing_directory(self):
202        host = self.host
203        # The (chrooted) login directory
204        host.makedirs("/")
205
206    def test_makedirs_with_file_in_the_way(self):
207        host = self.host
208        self.cleaner.add_dir("_dir1_")
209        host.mkdir("_dir1_")
210        self.make_remote_file("_dir1_/file1")
211        # Try it.
212        self.assertRaises(ftputil.error.PermanentError,
213                          host.makedirs, "_dir1_/file1")
214        self.assertRaises(ftputil.error.PermanentError,
215                          host.makedirs, "_dir1_/file1/dir2")
216
217    def test_makedirs_with_existing_directory(self):
218        host = self.host
219        self.cleaner.add_dir("_dir1_")
220        host.mkdir("_dir1_")
221        host.makedirs("_dir1_/dir2")
222        # Check
223        self.assertTrue(host.path.isdir("_dir1_"))
224        self.assertTrue(host.path.isdir("_dir1_/dir2"))
225
226    def test_makedirs_in_non_writable_directory(self):
227        host = self.host
228        # Preparation: `rootdir1` exists but is only writable by root.
229        self.assertRaises(ftputil.error.PermanentError, host.makedirs,
230                          "rootdir1/dir2")
231
232    def test_makedirs_with_writable_directory_at_end(self):
233        host = self.host
234        self.cleaner.add_dir("rootdir2/dir2")
235        # Preparation: `rootdir2` exists but is only writable by root.
236        # `dir2` is writable by regular ftp user. These both should work.
237        host.makedirs("rootdir2/dir2")
238        host.makedirs("rootdir2/dir2/dir3")
239
240
241class TestRemoval(RealFTPTest):
242
243    def test_rmtree_without_error_handler(self):
244        host = self.host
245        # Build a tree.
246        self.cleaner.add_dir("_dir1_")
247        host.makedirs("_dir1_/dir2")
248        self.make_remote_file("_dir1_/file1")
249        self.make_remote_file("_dir1_/file2")
250        self.make_remote_file("_dir1_/dir2/file3")
251        self.make_remote_file("_dir1_/dir2/file4")
252        # Try to remove a _file_ with `rmtree`.
253        self.assertRaises(ftputil.error.PermanentError, host.rmtree,
254                          "_dir1_/file2")
255        # Remove `dir2`.
256        host.rmtree("_dir1_/dir2")
257        self.assertFalse(host.path.exists("_dir1_/dir2"))
258        self.assertTrue (host.path.exists("_dir1_/file2"))
259        # Re-create `dir2` and remove `_dir1_`.
260        host.mkdir("_dir1_/dir2")
261        self.make_remote_file("_dir1_/dir2/file3")
262        self.make_remote_file("_dir1_/dir2/file4")
263        host.rmtree("_dir1_")
264        self.assertFalse(host.path.exists("_dir1_"))
265
266    def test_rmtree_with_error_handler(self):
267        host = self.host
268        self.cleaner.add_dir("_dir1_")
269        host.mkdir("_dir1_")
270        self.make_remote_file("_dir1_/file1")
271        # Prepare error "handler"
272        log = []
273        def error_handler(*args):
274            log.append(args)
275        # Try to remove a file as root "directory".
276        host.rmtree("_dir1_/file1", ignore_errors=True, onerror=error_handler)
277        self.assertEqual(log, [])
278        host.rmtree("_dir1_/file1", ignore_errors=False, onerror=error_handler)
279        self.assertEqual(log[0][0], host.listdir)
280        self.assertEqual(log[0][1], "_dir1_/file1")
281        self.assertEqual(log[1][0], host.rmdir)
282        self.assertEqual(log[1][1], "_dir1_/file1")
283        host.rmtree("_dir1_")
284        # Try to remove a non-existent directory.
285        del log[:]
286        host.rmtree("_dir1_", ignore_errors=False, onerror=error_handler)
287        self.assertEqual(log[0][0], host.listdir)
288        self.assertEqual(log[0][1], "_dir1_")
289        self.assertEqual(log[1][0], host.rmdir)
290        self.assertEqual(log[1][1], "_dir1_")
291
292    def test_remove_non_existent_item(self):
293        host = self.host
294        self.assertRaises(ftputil.error.PermanentError, host.remove,
295                          "nonexistent")
296
297    def test_remove_existing_file(self):
298        self.cleaner.add_file("_testfile_")
299        self.make_remote_file("_testfile_")
300        host = self.host
301        self.assertTrue(host.path.isfile("_testfile_"))
302        host.remove("_testfile_")
303        self.assertFalse(host.path.exists("_testfile_"))
304
305
306class TestWalk(RealFTPTest):
307
308    def test_walk_topdown(self):
309        # Preparation: build tree in directory `walk_test`.
310        host = self.host
311        expected = [
312          ("walk_test",
313           ["dir1", "dir2", "dir3"],
314           ["file4"]),
315
316          ("walk_test/dir1",
317           ["dir11", "dir12"],
318           []),
319
320          ("walk_test/dir1/dir11",
321           [],
322           []),
323
324          ("walk_test/dir1/dir12",
325           ["dir123"],
326           ["file121", "file122"]),
327
328          ("walk_test/dir1/dir12/dir123",
329           [],
330           ["file1234"]),
331
332          ("walk_test/dir2",
333           [],
334           []),
335
336          ("walk_test/dir3",
337           ["dir33"],
338           ["file31", "file32"]),
339
340          ("walk_test/dir3/dir33",
341           [],
342           []),
343          ]
344        # Collect data using `walk`.
345        actual = []
346        for items in host.walk("walk_test"):
347            actual.append(items)
348        # Compare with expected results.
349        self.assertEqual(len(actual), len(expected))
350        for index, _ in enumerate(actual):
351            self.assertEqual(actual[index], expected[index])
352
353    def test_walk_depth_first(self):
354        # Preparation: build tree in directory `walk_test`
355        host = self.host
356        expected = [
357          ("walk_test/dir1/dir11",
358           [],
359           []),
360
361          ("walk_test/dir1/dir12/dir123",
362           [],
363           ["file1234"]),
364
365          ("walk_test/dir1/dir12",
366           ["dir123"],
367           ["file121", "file122"]),
368
369          ("walk_test/dir1",
370           ["dir11", "dir12"],
371           []),
372
373          ("walk_test/dir2",
374           [],
375           []),
376
377          ("walk_test/dir3/dir33",
378           [],
379           []),
380
381          ("walk_test/dir3",
382           ["dir33"],
383           ["file31", "file32"]),
384
385          ("walk_test",
386           ["dir1", "dir2", "dir3"],
387           ["file4"])
388          ]
389        # Collect data using `walk`.
390        actual = []
391        for items in host.walk("walk_test", topdown=False):
392            actual.append(items)
393        # Compare with expected results.
394        self.assertEqual(len(actual), len(expected))
395        for index, _ in enumerate(actual):
396            self.assertEqual(actual[index], expected[index])
397
398
399class TestRename(RealFTPTest):
400
401    def test_rename(self):
402        host = self.host
403        # Make sure the target of the renaming operation is removed.
404        self.cleaner.add_file("_testfile2_")
405        self.make_remote_file("_testfile1_")
406        host.rename("_testfile1_", "_testfile2_")
407        self.assertFalse(host.path.exists("_testfile1_"))
408        self.assertTrue (host.path.exists("_testfile2_"))
409
410    def test_rename_with_spaces_in_directory(self):
411        host = self.host
412        dir_name = "_dir with spaces_"
413        self.cleaner.add_dir(dir_name)
414        host.mkdir(dir_name)
415        self.make_remote_file(dir_name + "/testfile1")
416        host.rename(dir_name + "/testfile1", dir_name + "/testfile2")
417        self.assertFalse(host.path.exists(dir_name + "/testfile1"))
418        self.assertTrue (host.path.exists(dir_name + "/testfile2"))
419
420
421class TestStat(RealFTPTest):
422
423    def test_stat(self):
424        host = self.host
425        dir_name = "_testdir_"
426        file_name = host.path.join(dir_name, "_nonempty_")
427        # Make a directory and a file in it.
428        self.cleaner.add_dir(dir_name)
429        host.mkdir(dir_name)
430        with host.open(file_name, "wb") as fobj:
431            fobj.write(b"abc\x12\x34def\t")
432        # Do some stats
433        # - dir
434        dir_stat = host.stat(dir_name)
435        self.assertTrue(isinstance(dir_stat._st_name,
436                                   ftputil.compat.unicode_type))
437        self.assertEqual(host.listdir(dir_name), ["_nonempty_"])
438        self.assertTrue (host.path.isdir (dir_name))
439        self.assertFalse(host.path.isfile(dir_name))
440        self.assertFalse(host.path.islink(dir_name))
441        # - file
442        file_stat = host.stat(file_name)
443        self.assertTrue(isinstance(file_stat._st_name,
444                                   ftputil.compat.unicode_type))
445        self.assertFalse(host.path.isdir (file_name))
446        self.assertTrue (host.path.isfile(file_name))
447        self.assertFalse(host.path.islink(file_name))
448        self.assertEqual(host.path.getsize(file_name), 9)
449        # - file's modification time; allow up to two minutes difference
450        host.synchronize_times()
451        server_mtime = host.path.getmtime(file_name)
452        client_mtime = time.mktime(time.localtime())
453        calculated_time_shift = server_mtime - client_mtime
454        self.assertFalse(abs(calculated_time_shift-host.time_shift()) > 120)
455
456    def test_issomething_for_nonexistent_directory(self):
457        host = self.host
458        # Check if we get the right results if even the containing
459        # directory doesn't exist (see ticket #66).
460        nonexistent_path = "/nonexistent/nonexistent"
461        self.assertFalse(host.path.isdir (nonexistent_path))
462        self.assertFalse(host.path.isfile(nonexistent_path))
463        self.assertFalse(host.path.islink(nonexistent_path))
464
465    def test_special_broken_link(self):
466        # Test for ticket #39.
467        host = self.host
468        broken_link_name = os.path.join("dir_with_broken_link", "nonexistent")
469        self.assertEqual(host.lstat(broken_link_name)._st_target,
470                         "../nonexistent/nonexistent")
471        self.assertFalse(host.path.isdir (broken_link_name))
472        self.assertFalse(host.path.isfile(broken_link_name))
473        self.assertTrue (host.path.islink(broken_link_name))
474
475    def test_concurrent_access(self):
476        self.make_remote_file("_testfile_")
477        with ftputil.FTPHost(*self.login_data) as host1:
478            with ftputil.FTPHost(*self.login_data) as host2:
479                stat_result1 = host1.stat("_testfile_")
480                stat_result2 = host2.stat("_testfile_")
481                self.assertEqual(stat_result1, stat_result2)
482                host2.remove("_testfile_")
483                # Can still get the result via `host1`
484                stat_result1 = host1.stat("_testfile_")
485                self.assertEqual(stat_result1, stat_result2)
486                # Stat'ing on `host2` gives an exception.
487                self.assertRaises(ftputil.error.PermanentError,
488                                  host2.stat, "_testfile_")
489                # Stat'ing on `host1` after invalidation
490                absolute_path = host1.path.join(host1.getcwd(), "_testfile_")
491                host1.stat_cache.invalidate(absolute_path)
492                self.assertRaises(ftputil.error.PermanentError,
493                                  host1.stat, "_testfile_")
494
495    def test_cache_auto_resizing(self):
496        """Test if the cache is resized appropriately."""
497        host = self.host
498        cache = host.stat_cache._cache
499        # Make sure the cache size isn't adjusted towards smaller values.
500        unused_entries = host.listdir("walk_test")
501        self.assertEqual(cache.size,
502                         ftputil.stat_cache.StatCache._DEFAULT_CACHE_SIZE)
503        # Make the cache very small initially and see if it gets resized.
504        cache.size = 2
505        entries = host.listdir("walk_test")
506        # The adjusted cache size should be larger or equal to the
507        # number of items in `walk_test` and its parent directory. The
508        # latter is read implicitly upon `listdir`'s `isdir` call.
509        expected_min_cache_size = max(len(host.listdir(host.curdir)),
510                                      len(entries))
511        self.assertTrue(cache.size >= expected_min_cache_size)
512
513
514class TestUploadAndDownload(RealFTPTest):
515    """Test upload and download (including time shift test)."""
516
517    def test_time_shift(self):
518        self.host.synchronize_times()
519        self.assertEqual(self.host.time_shift(), EXPECTED_TIME_SHIFT)
520
521    @test.skip_long_running_test
522    def test_upload(self):
523        host = self.host
524        host.synchronize_times()
525        local_file = "_local_file_"
526        remote_file = "_remote_file_"
527        # Make local file to upload.
528        self.make_local_file()
529        # Wait, else small time differences between client and server
530        # actually could trigger the update.
531        time.sleep(65)
532        try:
533            self.cleaner.add_file(remote_file)
534            host.upload(local_file, remote_file)
535            # Retry; shouldn't be uploaded
536            uploaded = host.upload_if_newer(local_file, remote_file)
537            self.assertEqual(uploaded, False)
538            # Rewrite the local file.
539            self.make_local_file()
540            # Retry; should be uploaded now
541            uploaded = host.upload_if_newer(local_file, remote_file)
542            self.assertEqual(uploaded, True)
543        finally:
544            # Clean up
545            os.unlink(local_file)
546
547    @test.skip_long_running_test
548    def test_download(self):
549        host = self.host
550        host.synchronize_times()
551        local_file = "_local_file_"
552        remote_file = "_remote_file_"
553        # Make a remote file.
554        self.make_remote_file(remote_file)
555        # File should be downloaded as it's not present yet.
556        downloaded = host.download_if_newer(remote_file, local_file)
557        self.assertEqual(downloaded, True)
558        try:
559            # If the remote file, taking the datetime precision into
560            # account, _might_ be newer, the file will be downloaded
561            # again. To prevent this, wait a bit over a minute (the
562            # remote precision), then "touch" the local file.
563            time.sleep(65)
564            # Create empty file.
565            with open(local_file, "w") as fobj:
566                pass
567            # Local file is present and newer, so shouldn't download.
568            downloaded = host.download_if_newer(remote_file, local_file)
569            self.assertEqual(downloaded, False)
570            # Re-make the remote file.
571            self.make_remote_file(remote_file)
572            # Local file is present but possibly older (taking the
573            # possible deviation because of the precision into account),
574            # so should download.
575            downloaded = host.download_if_newer(remote_file, local_file)
576            self.assertEqual(downloaded, True)
577        finally:
578            # Clean up.
579            os.unlink(local_file)
580
581    def test_callback_with_transfer(self):
582        host = self.host
583        FILE_NAME = "debian-keyring.tar.gz"
584        # Default chunk size as in `FTPHost.copyfileobj`
585        MAX_COPY_CHUNK_SIZE = ftputil.file_transfer.MAX_COPY_CHUNK_SIZE
586        file_size = host.path.getsize(FILE_NAME)
587        chunk_count, _ = divmod(file_size, MAX_COPY_CHUNK_SIZE)
588        # Add one chunk for remainder.
589        chunk_count += 1
590        # Define a callback that just collects all data passed to it.
591        transferred_chunks_list = []
592        def test_callback(chunk):
593            transferred_chunks_list.append(chunk)
594        try:
595            host.download(FILE_NAME, FILE_NAME, callback=test_callback)
596            # Construct a list of data chunks we expect.
597            expected_chunks_list = []
598            with open(FILE_NAME, "rb") as downloaded_fobj:
599                while True:
600                    chunk = downloaded_fobj.read(MAX_COPY_CHUNK_SIZE)
601                    if not chunk:
602                        break
603                    expected_chunks_list.append(chunk)
604            # Examine data collected by callback function.
605            self.assertEqual(len(transferred_chunks_list), chunk_count)
606            self.assertEqual(transferred_chunks_list, expected_chunks_list)
607        finally:
608            os.unlink(FILE_NAME)
609
610
611class TestFTPFiles(RealFTPTest):
612
613    def test_only_closed_children(self):
614        REMOTE_FILE_NAME = "debian-keyring.tar.gz"
615        host = self.host
616        with host.open(REMOTE_FILE_NAME, "rb") as file_obj1:
617            # Create empty file and close it.
618            with host.open(REMOTE_FILE_NAME, "rb") as file_obj2:
619                pass
620            # This should re-use the second child because the first isn't
621            # closed but the second is.
622            with host.open(REMOTE_FILE_NAME, "rb") as file_obj:
623                self.assertEqual(len(host._children), 2)
624                self.assertTrue(file_obj._host is host._children[1])
625
626    def test_no_timed_out_children(self):
627        REMOTE_FILE_NAME = "debian-keyring.tar.gz"
628        host = self.host
629        # Implicitly create child host object.
630        with host.open(REMOTE_FILE_NAME, "rb") as file_obj1:
631            pass
632        # Monkey-patch file to simulate an FTP server timeout below.
633        def timed_out_pwd():
634            raise ftplib.error_temp("simulated timeout")
635        file_obj1._host._session.pwd = timed_out_pwd
636        # Try to get a file - which shouldn't be the timed-out file.
637        with host.open(REMOTE_FILE_NAME, "rb") as file_obj2:
638            self.assertTrue(file_obj1 is not file_obj2)
639        # Re-use closed and not timed-out child session.
640        with host.open(REMOTE_FILE_NAME, "rb") as file_obj3:
641            pass
642        self.assertTrue(file_obj2 is file_obj3)
643
644
645class TestChmod(RealFTPTest):
646
647    def assert_mode(self, path, expected_mode):
648        """
649        Return an integer containing the allowed bits in the mode
650        change command.
651
652        The `FTPHost` object to test against is `self.host`.
653        """
654        full_mode = self.host.stat(path).st_mode
655        # Remove flags we can't set via `chmod`.
656        # Allowed flags according to Python documentation
657        # http://docs.python.org/lib/os-file-dir.html .
658        allowed_flags = [stat.S_ISUID, stat.S_ISGID, stat.S_ENFMT,
659          stat.S_ISVTX, stat.S_IREAD, stat.S_IWRITE, stat.S_IEXEC,
660          stat.S_IRWXU, stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR,
661          stat.S_IRWXG, stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP,
662          stat.S_IRWXO, stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH]
663        allowed_mask = functools.reduce(operator.or_, allowed_flags)
664        mode = full_mode & allowed_mask
665        self.assertEqual(mode, expected_mode,
666                         "mode {0:o} != {1:o}".format(mode, expected_mode))
667
668    def test_chmod_existing_directory(self):
669        host = self.host
670        host.mkdir("_test dir_")
671        self.cleaner.add_dir("_test dir_")
672        # Set/get mode of a directory.
673        host.chmod("_test dir_", 0o757)
674        self.assert_mode("_test dir_", 0o757)
675        # Set/get mode in nested directory.
676        host.mkdir("_test dir_/nested_dir")
677        self.cleaner.add_dir("_test dir_/nested_dir")
678        host.chmod("_test dir_/nested_dir", 0o757)
679        self.assert_mode("_test dir_/nested_dir", 0o757)
680
681    def test_chmod_existing_file(self):
682        host = self.host
683        host.mkdir("_test dir_")
684        self.cleaner.add_dir("_test dir_")
685        # Set/get mode on a file.
686        file_name = host.path.join("_test dir_", "_testfile_")
687        self.make_remote_file(file_name)
688        host.chmod(file_name, 0o646)
689        self.assert_mode(file_name, 0o646)
690
691    def test_chmod_nonexistent_path(self):
692        # Set/get mode of a non-existing item.
693        self.assertRaises(ftputil.error.PermanentError, self.host.chmod,
694                          "nonexistent", 0o757)
695
696    def test_cache_invalidation(self):
697        host = self.host
698        host.mkdir("_test dir_")
699        self.cleaner.add_dir("_test dir_")
700        # Make sure the mode is in the cache.
701        unused_stat_result = host.stat("_test dir_")
702        # Set/get mode of the directory.
703        host.chmod("_test dir_", 0o757)
704        self.assert_mode("_test dir_", 0o757)
705        # Set/get mode on a file.
706        file_name = host.path.join("_test dir_", "_testfile_")
707        self.make_remote_file(file_name)
708        # Make sure the mode is in the cache.
709        unused_stat_result = host.stat(file_name)
710        host.chmod(file_name, 0o646)
711        self.assert_mode(file_name, 0o646)
712
713
714class TestOther(RealFTPTest):
715
716    def test_open_for_reading(self):
717        # Test for issues #17 and #51,
718        # http://ftputil.sschwarzer.net/trac/ticket/17 and
719        # http://ftputil.sschwarzer.net/trac/ticket/51 .
720        file1 = self.host.open("debian-keyring.tar.gz", "rb")
721        time.sleep(1)
722        # Depending on the FTP server, this might return a status code
723        # unexpected by `ftplib` or block the socket connection until
724        # a server-side timeout.
725        file1.close()
726
727    def test_subsequent_reading(self):
728        # Opening a file for reading
729        with self.host.open("debian-keyring.tar.gz", "rb") as file1:
730            pass
731        # Make sure that there are no problems if the connection is reused.
732        with self.host.open("debian-keyring.tar.gz", "rb") as file2:
733            pass
734        self.assertTrue(file1._session is file2._session)
735
736    def test_names_with_spaces(self):
737        # Test if directories and files with spaces in their names
738        # can be used.
739        host = self.host
740        self.assertTrue(host.path.isdir("dir with spaces"))
741        self.assertEqual(host.listdir("dir with spaces"),
742                         ["second dir", "some file", "some_file"])
743        self.assertTrue(host.path.isdir ("dir with spaces/second dir"))
744        self.assertTrue(host.path.isfile("dir with spaces/some_file"))
745        self.assertTrue(host.path.isfile("dir with spaces/some file"))
746
747    def test_synchronize_times_without_write_access(self):
748        """Test failing synchronization because of non-writable directory."""
749        host = self.host
750        # This isn't writable by the ftp account the tests are run under.
751        host.chdir("rootdir1")
752        self.assertRaises(ftputil.error.TimeShiftError, host.synchronize_times)
753
754    def test_bytes_file_name(self):
755        """
756        Test whether a UTF-8 file name can be sent and retrieved when
757        encoded explicitly.
758        """
759        # This test below would fail under Python 2 and I have no idea
760        # how to make it work there without modifying `ftplib`.
761        if ftputil.compat.python_version == 2:
762            return
763        #
764        host = self.host
765        # This requires an existing file with an UTF-8 encoded name on
766        # the remote file system. Note: If the remote file system
767        # doesn't use UTF-8, the test will probably succeed anyway.
768        FILE_NAME = "_ütf8_file_näme_♯♭_"
769        bytes_file_name = FILE_NAME.encode("UTF-8")
770        self.cleaner.add_file(bytes_file_name)
771        # Write under name `bytes_file_name`
772        with host.open(bytes_file_name, "w", encoding="UTF-8") as fobj:
773            fobj.write(FILE_NAME)
774        # Try to retrieve file with `listdir`.
775        items = host.listdir(b".")
776        self.assertTrue(bytes_file_name in items)
777        #  When getting a directory listing for a unicode directory,
778        #  ftputil will implicitly assume the encoding is latin-1 and
779        #  won't decode the file name to something different from
780        #  `bytes_file_name`.
781        items = host.listdir(".")
782        self.assertFalse(FILE_NAME in items)
783        # Re-open file.
784        with host.open(bytes_file_name, "r", encoding="UTF-8") as fobj:
785            data = fobj.read()
786        # Not the point of this test, but an additional sanity check
787        self.assertEqual(data, FILE_NAME)
788
789    def test_list_a_option(self):
790        # For this test to pass, the server must _not_ list "hidden"
791        # files by default but instead only when the `LIST` `-a`
792        # option is used.
793        host = self.host
794        self.assertTrue(host.use_list_a_option)
795        directory_entries = host.listdir(host.curdir)
796        self.assertTrue(".hidden" in directory_entries)
797        host.use_list_a_option = False
798        directory_entries = host.listdir(host.curdir)
799        self.assertFalse(".hidden" in directory_entries)
800
801    def _make_objects_to_be_garbage_collected(self):
802        for _ in range(10):
803            with ftputil.FTPHost(*self.login_data) as host:
804                for _ in range(10):
805                    unused_stat_result = host.stat("CONTENTS")
806                    with host.open("CONTENTS") as fobj:
807                        unused_data = fobj.read()
808
809    def test_garbage_collection(self):
810        """Test whether there are cycles which prevent garbage collection."""
811        gc.collect()
812        objects_before_test = len(gc.garbage)
813        self._make_objects_to_be_garbage_collected()
814        gc.collect()
815        objects_after_test = len(gc.garbage)
816        self.assertFalse(objects_after_test - objects_before_test)
817
818
819if __name__ == "__main__":
820    print("""\
821Test real FTP access.
822
823This test writes some files and directories on the local client and
824the remote server. You'll need write access in the login directory.
825This test can take a few minutes because it has to wait to test the
826timezone calculation.
827    """)
828    unittest.main()
Note: See TracBrowser for help on using the repository browser.