source: _test_real_ftp.py @ 734:768b047b51ac

Last change on this file since 734:768b047b51ac was 734:768b047b51ac, checked in by Stefan Schwarzer <sschwarzer@…>, 13 years ago
Fixed test; I was too hasty here (thanks again, Tom).
File size: 22.0 KB
Line 
1# Copyright (C) 2003-2008, Stefan Schwarzer
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# - Redistributions of source code must retain the above copyright
9#   notice, this list of conditions and the following disclaimer.
10#
11# - Redistributions in binary form must reproduce the above copyright
12#   notice, this list of conditions and the following disclaimer in the
13#   documentation and/or other materials provided with the distribution.
14#
15# - Neither the name of the above author nor the names of the
16#   contributors to the software may be used to endorse or promote
17#   products derived from this software without specific prior written
18#   permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21# ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR
24# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
27# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
28# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
29# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32# $Id$
33
34# Execute a test on a real FTP server (other tests use a mock server)
35
36import getpass
37import operator
38import os
39import time
40import unittest
41import stat
42import sys
43
44import ftputil
45from ftputil import ftp_error
46from ftputil import ftp_stat
47
48
49def get_login_data():
50    """
51    Return a three-element tuple consisting of server name, user id
52    and password. The data - used to be - requested interactively.
53    """
54    #server = raw_input("Server: ")
55    #user = raw_input("User: ")
56    #password = getpass.getpass()
57    #return server, user, password
58    return "localhost", 'ftptest', 'd605581757de5eb56d568a4419f4126e'
59
60def utc_local_time_shift():
61    """
62    Return the expected time shift in seconds assuming the server
63    uses UTC in its listings and the client uses local time.
64
65    This is needed because Pure-FTPd meanwhile seems to insist that
66    the displayed time for files is in UTC.
67    """
68    utc_tuple = time.gmtime()
69    localtime_tuple = time.localtime()
70    # to calculate the correct times shift, we need to ignore the
71    #  DST component in the localtime tuple, i. e. set it to 0
72    localtime_tuple = localtime_tuple[:-1] + (0,)
73    time_shift_in_seconds = time.mktime(utc_tuple) - \
74                            time.mktime(localtime_tuple)
75    # to be safe, round the above value to units of 3600 s (1 h)
76    return round(time_shift_in_seconds / 3600.0) * 3600
77
78# difference between local times of server and client; if 0.0, server
79#  and client use the same timezone
80EXPECTED_TIME_SHIFT = utc_local_time_shift()
81
82
83class Cleaner(object):
84    """This class helps to remove directories and files which
85    might be left behind if a test fails in unexpected ways.
86    """
87
88    def __init__(self, host):
89        # the test class (probably `RealFTPTest`) and the helper
90        #  class share the same `FTPHost` object
91        self._host = host
92        self._ftp_items = []
93
94    def add_dir(self, path):
95        """Schedule a directory with path `path` for removal."""
96        self._ftp_items.append(('d', self._host.path.abspath(path)))
97
98    def add_file(self, path):
99        """Schedule a file with path `path` for removal."""
100        self._ftp_items.append(('f', self._host.path.abspath(path)))
101
102    def clean(self):
103        """Remove the directories and files previously remembered.
104        The removal works in reverse order of the scheduling with
105        `add_dir` and `add_file`.
106
107        Errors due to a removal are ignored.
108        """
109        self._host.chdir("/")
110        # code should work with Python 2.3
111        self._ftp_items.reverse()
112        for type_, path in self._ftp_items:
113            try:
114                if type_ == 'd':
115                    # if something goes wrong in `rmtree` we might
116                    #  leave a mess behind
117                    self._host.rmtree(path)
118                elif type_ == 'f':
119                    # minor mess if `remove` fails
120                    self._host.remove(path)
121            except ftp_error.FTPError:
122                pass
123
124
125class RealFTPTest(unittest.TestCase):
126    def setUp(self):
127        self.host = ftputil.FTPHost(server, user, password)
128        self.cleaner = Cleaner(self.host)
129
130    def tearDown(self):
131        self.cleaner.clean()
132        self.host.close()
133
134    #
135    # helper methods
136    #
137    def make_file(self, path):
138        self.cleaner.add_file(path)
139        file_ = self.host.file(path, 'wb')
140        file_.close()
141
142    def make_local_file(self):
143        fobj = file('_localfile_', 'wb')
144        fobj.write("abc\x12\x34def\t")
145        fobj.close()
146
147    #
148    # `mkdir`, `makedirs`, `rmdir` and `rmtree`
149    #
150    def test_mkdir_rmdir(self):
151        host = self.host
152        dir_name = "_testdir_"
153        file_name = host.path.join(dir_name, "_nonempty_")
154        self.cleaner.add_dir(dir_name)
155        # make dir and check if it's there
156        host.mkdir(dir_name)
157        files = host.listdir(host.curdir)
158        self.failIf(dir_name not in files)
159        # try to remove non-empty directory
160        self.cleaner.add_file(file_name)
161        non_empty = host.file(file_name, "w")
162        non_empty.close()
163        self.assertRaises(ftp_error.PermanentError, host.rmdir, dir_name)
164        # remove file
165        host.unlink(file_name)
166        # `remove` on a directory should fail
167        try:
168            try:
169                host.remove(dir_name)
170            except ftp_error.PermanentError, exc:
171                self.failUnless(str(exc).startswith(
172                                "remove/unlink can only delete files"))
173            else:
174                self.failIf(True, "we shouldn't have come here")
175        finally:
176            # delete empty directory
177            host.rmdir(dir_name)
178        files = host.listdir(host.curdir)
179        self.failIf(dir_name in files)
180
181    def test_makedirs_without_existing_dirs(self):
182        host = self.host
183        # no `_dir1_` yet
184        self.failIf('_dir1_' in host.listdir(host.curdir))
185        # vanilla case, all should go well
186        host.makedirs('_dir1_/dir2/dir3/dir4')
187        self.cleaner.add_dir('_dir1_')
188        # check host
189        self.failUnless(host.path.isdir('_dir1_'))
190        self.failUnless(host.path.isdir('_dir1_/dir2'))
191        self.failUnless(host.path.isdir('_dir1_/dir2/dir3'))
192        self.failUnless(host.path.isdir('_dir1_/dir2/dir3/dir4'))
193
194    def test_makedirs_from_non_root_directory(self):
195        # this is a testcase for issue #22, see
196        #  http://ftputil.sschwarzer.net/trac/ticket/22
197        host = self.host
198        # no `_dir1_` and `_dir2_` yet
199        self.failIf('_dir1_' in host.listdir(host.curdir))
200        self.failIf('_dir2_' in host.listdir(host.curdir))
201        # part 1: try to make directories starting from `_dir1_`
202        # make and change to non-root directory
203        self.cleaner.add_dir("_dir1_")
204        host.mkdir('_dir1_')
205        host.chdir('_dir1_')
206        host.makedirs('_dir2_/_dir3_')
207        # test for expected directory hierarchy
208        self.failUnless(host.path.isdir('/_dir1_'))
209        self.failUnless(host.path.isdir('/_dir1_/_dir2_'))
210        self.failUnless(host.path.isdir('/_dir1_/_dir2_/_dir3_'))
211        self.failIf(host.path.isdir('/_dir1_/_dir1_'))
212        # remove all but the directory we're in
213        host.rmdir('/_dir1_/_dir2_/_dir3_')
214        host.rmdir('/_dir1_/_dir2_')
215        # part 2: try to make directories starting from root
216        self.cleaner.add_dir("/_dir2_")
217        host.makedirs('/_dir2_/_dir3_')
218        # test for expected directory hierarchy
219        self.failUnless(host.path.isdir('/_dir2_'))
220        self.failUnless(host.path.isdir('/_dir2_/_dir3_'))
221        self.failIf(host.path.isdir('/_dir1_/_dir2_'))
222
223    def test_makedirs_from_non_root_directory_fake_windows_os(self):
224        saved_sep = os.sep
225        os.sep = '\\'
226        try:
227            self.test_makedirs_from_non_root_directory()
228        finally:
229            os.sep = saved_sep
230
231    def test_makedirs_of_existing_directory(self):
232        host = self.host
233        # the (chrooted) login directory
234        host.makedirs('/')
235
236    def test_makedirs_with_file_in_the_way(self):
237        host = self.host
238        self.cleaner.add_dir('_dir1_')
239        host.mkdir('_dir1_')
240        self.make_file('_dir1_/file1')
241        # try it
242        self.assertRaises(ftp_error.PermanentError, host.makedirs,
243                          '_dir1_/file1')
244        self.assertRaises(ftp_error.PermanentError, host.makedirs,
245                          '_dir1_/file1/dir2')
246
247    def test_makedirs_with_existing_directory(self):
248        host = self.host
249        self.cleaner.add_dir("_dir1_")
250        host.mkdir('_dir1_')
251        host.makedirs('_dir1_/dir2')
252        # check
253        self.failUnless(host.path.isdir('_dir1_'))
254        self.failUnless(host.path.isdir('_dir1_/dir2'))
255
256    def test_makedirs_in_non_writable_directory(self):
257        host = self.host
258        # preparation: `rootdir1` exists but is only writable by root
259        self.assertRaises(ftp_error.PermanentError, host.makedirs,
260                          'rootdir1/dir2')
261
262    def test_makedirs_with_writable_directory_at_end(self):
263        host = self.host
264        self.cleaner.add_dir('rootdir2/dir2')
265        # preparation: `rootdir2` exists but is only writable by root;
266        #  `dir2` is writable by regular ftp user
267        # these both should work
268        host.makedirs('rootdir2/dir2')
269        host.makedirs('rootdir2/dir2/dir3')
270
271    def test_rmtree_without_error_handler(self):
272        host = self.host
273        # build a tree
274        self.cleaner.add_dir('_dir1_')
275        host.makedirs('_dir1_/dir2')
276        self.make_file('_dir1_/file1')
277        self.make_file('_dir1_/file2')
278        self.make_file('_dir1_/dir2/file3')
279        self.make_file('_dir1_/dir2/file4')
280        # try to remove a _file_ with `rmtree`
281        self.assertRaises(ftp_error.PermanentError, host.rmtree, '_dir1_/file2')
282        # remove dir2
283        host.rmtree('_dir1_/dir2')
284        self.failIf(host.path.exists('_dir1_/dir2'))
285        self.failUnless(host.path.exists('_dir1_/file2'))
286        # remake dir2 and remove _dir1_
287        host.mkdir('_dir1_/dir2')
288        self.make_file('_dir1_/dir2/file3')
289        self.make_file('_dir1_/dir2/file4')
290        host.rmtree('_dir1_')
291        self.failIf(host.path.exists('_dir1_'))
292
293    def test_rmtree_with_error_handler(self):
294        host = self.host
295        self.cleaner.add_dir('_dir1_')
296        host.mkdir('_dir1_')
297        self.make_file('_dir1_/file1')
298        # prepare error "handler"
299        log = []
300        def error_handler(*args):
301            log.append(args)
302        # try to remove a file as root "directory"
303        host.rmtree('_dir1_/file1', ignore_errors=True, onerror=error_handler)
304        self.assertEqual(log, [])
305        host.rmtree('_dir1_/file1', ignore_errors=False, onerror=error_handler)
306        self.assertEqual(log[0][0], host.listdir)
307        self.assertEqual(log[0][1], '_dir1_/file1')
308        self.assertEqual(log[1][0], host.rmdir)
309        self.assertEqual(log[1][1], '_dir1_/file1')
310        host.rmtree('_dir1_')
311        # try to remove a non-existent directory
312        del log[:]
313        host.rmtree('_dir1_', ignore_errors=False, onerror=error_handler)
314        self.assertEqual(log[0][0], host.listdir)
315        self.assertEqual(log[0][1], '_dir1_')
316        self.assertEqual(log[1][0], host.rmdir)
317        self.assertEqual(log[1][1], '_dir1_')
318
319    #
320    # directory tree walking
321    #
322    def test_walk_topdown(self):
323        # preparation: build tree in directory `walk_test`
324        host = self.host
325        expected = [
326          ('walk_test', ['dir1', 'dir2', 'dir3'], ['file4']),
327          ('walk_test/dir1', ['dir11', 'dir12'], []),
328          ('walk_test/dir1/dir11', [], []),
329          ('walk_test/dir1/dir12', ['dir123'], ['file121', 'file122']),
330          ('walk_test/dir1/dir12/dir123', [], ['file1234']),
331          ('walk_test/dir2', [], []),
332          ('walk_test/dir3', ['dir33'], ['file31', 'file32']),
333          ('walk_test/dir3/dir33', [], []),
334          ]
335        # collect data, using `walk`
336        actual = []
337        for items in host.walk('walk_test'):
338            actual.append(items)
339        # compare with expected results
340        self.assertEqual(len(actual), len(expected))
341        for index in range(len(actual)):
342            self.assertEqual(actual[index], expected[index])
343
344    def test_walk_depth_first(self):
345        # preparation: build tree in directory `walk_test`
346        host = self.host
347        expected = [
348          ('walk_test/dir1/dir11', [], []),
349          ('walk_test/dir1/dir12/dir123', [], ['file1234']),
350          ('walk_test/dir1/dir12', ['dir123'], ['file121', 'file122']),
351          ('walk_test/dir1', ['dir11', 'dir12'], []),
352          ('walk_test/dir2', [], []),
353          ('walk_test/dir3/dir33', [], []),
354          ('walk_test/dir3', ['dir33'], ['file31', 'file32']),
355          ('walk_test', ['dir1', 'dir2', 'dir3'], ['file4'])
356          ]
357        # collect data, using `walk`
358        actual = []
359        for items in host.walk('walk_test', topdown=False):
360            actual.append(items)
361        # compare with expected results
362        self.assertEqual(len(actual), len(expected))
363        for index in range(len(actual)):
364            self.assertEqual(actual[index], expected[index])
365
366    #
367    # renaming
368    #
369    def test_rename(self):
370        host = self.host
371        # make sure the target of the renaming operation is removed
372        self.cleaner.add_file('_testfile2_')
373        self.make_file("_testfile1_")
374        host.rename('_testfile1_', '_testfile2_')
375        self.failIf(host.path.exists('_testfile1_'))
376        self.failUnless(host.path.exists('_testfile2_'))
377        host.remove('_testfile2_')
378
379    def test_rename_with_spaces_in_directory(self):
380        host = self.host
381        dir_name = "_dir with spaces_"
382        self.cleaner.add_dir(dir_name)
383        host.mkdir(dir_name)
384        self.make_file(dir_name + "/testfile1")
385        host.rename(dir_name + "/testfile1", dir_name + "/testfile2")
386        self.failIf(host.path.exists(dir_name + "/testfile1"))
387        self.failUnless(host.path.exists(dir_name + "/testfile2"))
388
389    #
390    # stat'ing
391    #
392    def test_stat(self):
393        host = self.host
394        dir_name = "_testdir_"
395        file_name = host.path.join(dir_name, "_nonempty_")
396        # make a directory and a file in it
397        self.cleaner.add_dir(dir_name)
398        host.mkdir(dir_name)
399        fobj = host.file(file_name, "wb")
400        fobj.write("abc\x12\x34def\t")
401        fobj.close()
402        # do some stats
403        # - dir
404        self.assertEqual(host.listdir(dir_name), ["_nonempty_"])
405        self.assertEqual(bool(host.path.isdir(dir_name)), True)
406        self.assertEqual(bool(host.path.isfile(dir_name)), False)
407        self.assertEqual(bool(host.path.islink(dir_name)), False)
408        # - file
409        self.assertEqual(bool(host.path.isdir(file_name)), False)
410        self.assertEqual(bool(host.path.isfile(file_name)), True)
411        self.assertEqual(bool(host.path.islink(file_name)), False)
412        self.assertEqual(host.path.getsize(file_name), 9)
413        # - file's modification time; allow up to two minutes difference
414        host.synchronize_times()
415        server_mtime = host.path.getmtime(file_name)
416        client_mtime = time.mktime(time.localtime())
417        calculated_time_shift = server_mtime - client_mtime
418        self.failIf(abs(calculated_time_shift-host.time_shift()) > 120)
419
420    def test_concurrent_access(self):
421        self.make_file("_testfile_")
422        host1 = ftputil.FTPHost(server, user, password)
423        host2 = ftputil.FTPHost(server, user, password)
424        stat_result1 = host1.stat("_testfile_")
425        stat_result2 = host2.stat("_testfile_")
426        self.assertEqual(stat_result1, stat_result2)
427        host2.remove("_testfile_")
428        # can still get the result via `host1`
429        stat_result1 = host1.stat("_testfile_")
430        self.assertEqual(stat_result1, stat_result2)
431        # stat'ing on `host2` gives an exception
432        self.assertRaises(ftp_error.PermanentError, host2.stat, "_testfile_")
433        # stat'ing on `host1` after invalidation
434        absolute_path = host1.path.join(host1.getcwd(), "_testfile_")
435        host1.stat_cache.invalidate(absolute_path)
436        self.assertRaises(ftp_error.PermanentError, host1.stat, "_testfile_")
437
438    #
439    # `upload` (including time shift test)
440    #
441    def test_time_shift(self):
442        self.host.synchronize_times()
443        self.assertEqual(self.host.time_shift(), EXPECTED_TIME_SHIFT)
444
445    def test_upload(self):
446        host = self.host
447        host.synchronize_times()
448        # make local file and upload it
449        self.make_local_file()
450        # wait; else small time differences between client and server
451        #  actually could trigger the update
452        time.sleep(65)
453        try:
454            self.cleaner.add_file('_remotefile_')
455            host.upload('_localfile_', '_remotefile_', 'b')
456            # retry; shouldn't be uploaded
457            uploaded = host.upload_if_newer('_localfile_', '_remotefile_', 'b')
458            self.assertEqual(uploaded, False)
459            # rewrite the local file
460            self.make_local_file()
461            time.sleep(65)
462            # retry; should be uploaded now
463            uploaded = host.upload_if_newer('_localfile_', '_remotefile_', 'b')
464            self.assertEqual(uploaded, True)
465        finally:
466            # clean up
467            os.unlink('_localfile_')
468
469    #
470    # `chmod`
471    #
472    def assert_mode(self, path, expected_mode):
473        """Return an integer containing the allowed bits in the
474        mode change command.
475
476        The `FTPHost` object to test against is `self.host`.
477        """
478        full_mode = self.host.stat(path).st_mode
479        # remove flags we can't set via `chmod`
480        # allowed flags according to Python documentation
481        #  http://docs.python.org/lib/os-file-dir.html
482        allowed_flags = [stat.S_ISUID, stat.S_ISGID, stat.S_ENFMT,
483          stat.S_ISVTX, stat.S_IREAD, stat.S_IWRITE, stat.S_IEXEC,
484          stat.S_IRWXU, stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR,
485          stat.S_IRWXG, stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP,
486          stat.S_IRWXO, stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH]
487        allowed_mask = reduce(operator.or_, allowed_flags)
488        mode = full_mode & allowed_mask
489        self.assertEqual(mode, expected_mode,
490                         "mode %s != %s" % (oct(mode), oct(expected_mode)))
491
492    def test_chmod_existing_directory(self):
493        host = self.host
494        host.mkdir("_test dir_")
495        self.cleaner.add_dir("_test dir_")
496        # set/get mode of a directory
497        host.chmod("_test dir_", 0757)
498        self.assert_mode("_test dir_", 0757)
499        # set/get mode in nested directory
500        host.mkdir("_test dir_/nested_dir")
501        self.cleaner.add_dir("_test dir_/nested_dir")
502        # set/get mode of a directory
503        host.chmod("_test dir_/nested_dir", 0757)
504        self.assert_mode("_test dir_/nested_dir", 0757)
505
506    def test_chmod_existing_file(self):
507        host = self.host
508        host.mkdir("_test dir_")
509        self.cleaner.add_dir("_test dir_")
510        # set/get mode on a file
511        file_name = host.path.join("_test dir_", "_testfile_")
512        self.make_file(file_name)
513        host.chmod(file_name, 0646)
514        self.assert_mode(file_name, 0646)
515
516    def test_chmod_nonexistent_path(self):
517        # set/get mode of a directory
518        self.assertRaises(ftp_error.PermanentError, self.host.chmod,
519                          "nonexistent", 0757)
520
521    def test_cache_invalidation(self):
522        host = self.host
523        host.mkdir("_test dir_")
524        self.cleaner.add_dir("_test dir_")
525        # make sure the mode is in the cache
526        unused_stat_result = host.stat("_test dir_")
527        # set/get mode of a directory
528        host.chmod("_test dir_", 0757)
529        self.assert_mode("_test dir_", 0757)
530        # set/get mode on a file
531        file_name = host.path.join("_test dir_", "_testfile_")
532        self.make_file(file_name)
533        # make sure the mode is in the cache
534        unused_stat_result = host.stat(file_name)
535        host.chmod(file_name, 0646)
536        self.assert_mode(file_name, 0646)
537
538    #
539    # other tests
540    #
541    def test_open_for_reading(self):
542        # test for issue #17, http://ftputil.sschwarzer.net/trac/ticket/17
543        file1 = self.host.file("debian-keyring.tar.gz", 'rb')
544        file1.close()
545        # make sure that there are no problems if the connection is reused
546        file2 = self.host.file("debian-keyring.tar.gz", 'rb')
547        file2.close()
548        self.failUnless(file1._session is file2._session)
549
550    def test_names_with_spaces(self):
551        # test if directories and files with spaces in their names
552        #  can be used
553        host = self.host
554        self.failUnless(host.path.isdir("dir with spaces"))
555        self.assertEqual(host.listdir("dir with spaces"),
556                         ['second dir', 'some file', 'some_file'])
557        self.failUnless(host.path.isdir("dir with spaces/second dir"))
558        self.failUnless(host.path.isfile("dir with spaces/some_file"))
559        self.failUnless(host.path.isfile("dir with spaces/some file"))
560
561
562if __name__ == '__main__':
563    print """\
564Test for real FTP access.
565
566This test writes some files and directories on the local client and the
567remote server. Thus, you may want to skip this test by pressing [Ctrl-C].
568If the test should run, enter the login data for the remote server. You
569need write access in the login directory. This test can last a few minutes
570because it has to wait to test the timezone calculation.
571"""
572    try:
573        raw_input("[Return] to continue, or [Ctrl-C] to skip test. ")
574    except KeyboardInterrupt:
575        print "\nTest aborted."
576        sys.exit()
577    # get login data only once, not for each test
578    server, user, password = get_login_data()
579    unittest.main()
580
Note: See TracBrowser for help on using the repository browser.