root/trunk/_test_real_ftp.py

Revision 769, 22.0 kB (checked in by schwa, 3 weeks ago)
Fixed test; I was too hasty here (thanks again, Tom).
  • Property svn:mime-type set to text/x-python
  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
Line 
1 # Copyright (C) 2003-2008, Stefan Schwarzer
2 # All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are
6 # met:
7 #
8 # - Redistributions of source code must retain the above copyright
9 #   notice, this list of conditions and the following disclaimer.
10 #
11 # - Redistributions in binary form must reproduce the above copyright
12 #   notice, this list of conditions and the following disclaimer in the
13 #   documentation and/or other materials provided with the distribution.
14 #
15 # - Neither the name of the above author nor the names of the
16 #   contributors to the software may be used to endorse or promote
17 #   products derived from this software without specific prior written
18 #   permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR
24 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
27 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
28 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
29 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32 # $Id$
33
34 # Execute a test on a real FTP server (other tests use a mock server)
35
36 import getpass
37 import operator
38 import os
39 import time
40 import unittest
41 import stat
42 import sys
43
44 import ftputil
45 from ftputil import ftp_error
46 from ftputil import ftp_stat
47
48
def get_login_data():
    """
    Return a three-element tuple consisting of server name, user id
    and password.

    By default, the data of the test account on `localhost` is
    returned. Each value can be overridden without editing this file
    by setting the environment variable `FTPUTIL_TEST_SERVER`,
    `FTPUTIL_TEST_USER` or `FTPUTIL_TEST_PASSWORD`, respectively.

    The data used to be requested interactively; this isn't the case
    anymore.
    """
    environ = os.environ
    login_server = environ.get("FTPUTIL_TEST_SERVER", "localhost")
    login_user = environ.get("FTPUTIL_TEST_USER", 'ftptest')
    login_password = environ.get("FTPUTIL_TEST_PASSWORD",
                                 'd605581757de5eb56d568a4419f4126e')
    return login_server, login_user, login_password
59
def utc_local_time_shift():
    """
    Return the expected time shift in seconds assuming the server
    uses UTC in its listings and the client uses local time.

    The value is `mktime` of the current UTC time tuple minus
    `mktime` of the current local time tuple (with its DST flag set
    to 0), rounded to a multiple of 3600 seconds.

    This is needed because Pure-FTPd meanwhile seems to insist that
    the displayed time for files is in UTC.
    """
    # read the clock only once, so both tuples describe the same moment
    now = time.time()
    utc_tuple = time.gmtime(now)
    # to calculate the correct time shift, we need to ignore the
    #  DST component in the localtime tuple, i. e. set it to 0
    localtime_tuple = tuple(time.localtime(now))[:-1] + (0,)
    time_shift_in_seconds = time.mktime(utc_tuple) - \
                            time.mktime(localtime_tuple)
    # to be safe, round the above value to units of 3600 s (1 h)
    return round(time_shift_in_seconds / 3600.0) * 3600

# difference between local times of server and client; if 0.0, server
#  and client use the same timezone
EXPECTED_TIME_SHIFT = utc_local_time_shift()
81
82
class Cleaner(object):
    """This class helps to remove directories and files which
    might be left behind if a test fails in unexpected ways.

    Paths are scheduled with `add_dir` and `add_file` and removed
    later by a call to `clean`.
    """

    def __init__(self, host):
        # the test class (probably `RealFTPTest`) and the helper
        #  class share the same `FTPHost` object
        self._host = host
        # scheduled items as tuples `(type, absolute path)`; the type
        #  is 'd' for a directory and 'f' for a file
        self._ftp_items = []

    def add_dir(self, path):
        """Schedule a directory with path `path` for removal."""
        # store the absolute path because `clean` changes the
        #  current directory before removing anything
        self._ftp_items.append(('d', self._host.path.abspath(path)))

    def add_file(self, path):
        """Schedule a file with path `path` for removal."""
        self._ftp_items.append(('f', self._host.path.abspath(path)))

    def clean(self):
        """Remove the directories and files previously remembered.
        The removal works in reverse order of the scheduling with
        `add_dir` and `add_file`.

        Afterwards, the schedule is empty, so another call of `clean`
        only deals with items added in the meantime.

        Errors due to a removal are ignored.
        """
        self._host.chdir("/")
        # work on a reversed copy and empty the schedule; reversing
        #  the list in place without clearing it would make a second
        #  call retry all removals in the wrong order
        # code should work with Python 2.3
        pending = self._ftp_items[:]
        pending.reverse()
        self._ftp_items = []
        for type_, path in pending:
            try:
                if type_ == 'd':
                    # if something goes wrong in `rmtree` we might
                    #  leave a mess behind
                    self._host.rmtree(path)
                elif type_ == 'f':
                    # minor mess if `remove` fails
                    self._host.remove(path)
            except ftp_error.FTPError:
                # deliberately ignored: cleaning is a best-effort
                #  operation and the item may be gone already
                pass
123
124
class RealFTPTest(unittest.TestCase):
    """Tests which talk to a real FTP server.

    Connections are made with the module-level names `server`, `user`
    and `password`, which are assigned when this file is run as a
    script.

    Several tests rely on items that have to exist in the login
    directory of the server already: `rootdir1`, `rootdir2/dir2`, the
    tree below `walk_test`, the file `debian-keyring.tar.gz` and the
    directory `dir with spaces`.
    """

    # modes for the `chmod` tests (octal 757 and 646); `int(text, 8)`
    #  is used because the spelling of octal literals differs between
    #  Python versions
    DIR_MODE = int("757", 8)
    FILE_MODE = int("646", 8)

    def setUp(self):
        self.host = ftputil.FTPHost(server, user, password)
        self.cleaner = Cleaner(self.host)

    def tearDown(self):
        # close the connection even if cleaning up fails
        try:
            self.cleaner.clean()
        finally:
            self.host.close()

    #
    # helper methods
    #
    def make_file(self, path):
        """Make an empty remote file `path` and schedule its removal."""
        self.cleaner.add_file(path)
        file_ = self.host.file(path, 'wb')
        file_.close()

    def make_local_file(self):
        """Write the local file `_localfile_` in the current directory."""
        fobj = open('_localfile_', 'wb')
        # don't leak the file handle if writing fails
        try:
            fobj.write("abc\x12\x34def\t")
        finally:
            fobj.close()

    #
    # `mkdir`, `makedirs`, `rmdir` and `rmtree`
    #
    def test_mkdir_rmdir(self):
        host = self.host
        dir_name = "_testdir_"
        file_name = host.path.join(dir_name, "_nonempty_")
        self.cleaner.add_dir(dir_name)
        # make dir and check if it's there
        host.mkdir(dir_name)
        files = host.listdir(host.curdir)
        self.failUnless(dir_name in files)
        # try to remove non-empty directory
        self.cleaner.add_file(file_name)
        non_empty = host.file(file_name, "w")
        non_empty.close()
        self.assertRaises(ftp_error.PermanentError, host.rmdir, dir_name)
        # remove file
        host.unlink(file_name)
        # `remove` on a directory should fail
        try:
            try:
                host.remove(dir_name)
            except ftp_error.PermanentError:
                # get the exception object from `sys.exc_info`; the
                #  syntax to bind it in the `except` clause depends
                #  on the Python version
                exc = sys.exc_info()[1]
                self.failUnless(str(exc).startswith(
                                "remove/unlink can only delete files"))
            else:
                self.fail("we shouldn't have come here")
        finally:
            # delete empty directory
            host.rmdir(dir_name)
        files = host.listdir(host.curdir)
        self.failIf(dir_name in files)

    def test_makedirs_without_existing_dirs(self):
        host = self.host
        # no `_dir1_` yet
        self.failIf('_dir1_' in host.listdir(host.curdir))
        # vanilla case, all should go well
        host.makedirs('_dir1_/dir2/dir3/dir4')
        self.cleaner.add_dir('_dir1_')
        # check host
        self.failUnless(host.path.isdir('_dir1_'))
        self.failUnless(host.path.isdir('_dir1_/dir2'))
        self.failUnless(host.path.isdir('_dir1_/dir2/dir3'))
        self.failUnless(host.path.isdir('_dir1_/dir2/dir3/dir4'))

    def test_makedirs_from_non_root_directory(self):
        # this is a testcase for issue #22, see
        #  http://ftputil.sschwarzer.net/trac/ticket/22
        host = self.host
        # no `_dir1_` and `_dir2_` yet
        self.failIf('_dir1_' in host.listdir(host.curdir))
        self.failIf('_dir2_' in host.listdir(host.curdir))
        # part 1: try to make directories starting from `_dir1_`
        # make and change to non-root directory
        self.cleaner.add_dir("_dir1_")
        host.mkdir('_dir1_')
        host.chdir('_dir1_')
        host.makedirs('_dir2_/_dir3_')
        # test for expected directory hierarchy
        self.failUnless(host.path.isdir('/_dir1_'))
        self.failUnless(host.path.isdir('/_dir1_/_dir2_'))
        self.failUnless(host.path.isdir('/_dir1_/_dir2_/_dir3_'))
        self.failIf(host.path.isdir('/_dir1_/_dir1_'))
        # remove all but the directory we're in
        host.rmdir('/_dir1_/_dir2_/_dir3_')
        host.rmdir('/_dir1_/_dir2_')
        # part 2: try to make directories starting from root
        self.cleaner.add_dir("/_dir2_")
        host.makedirs('/_dir2_/_dir3_')
        # test for expected directory hierarchy
        self.failUnless(host.path.isdir('/_dir2_'))
        self.failUnless(host.path.isdir('/_dir2_/_dir3_'))
        self.failIf(host.path.isdir('/_dir1_/_dir2_'))

    def test_makedirs_from_non_root_directory_fake_windows_os(self):
        # repeat the previous test while `os.sep` is temporarily
        #  set to a backslash
        saved_sep = os.sep
        os.sep = '\\'
        try:
            self.test_makedirs_from_non_root_directory()
        finally:
            os.sep = saved_sep

    def test_makedirs_of_existing_directory(self):
        host = self.host
        # the (chrooted) login directory
        host.makedirs('/')

    def test_makedirs_with_file_in_the_way(self):
        host = self.host
        self.cleaner.add_dir('_dir1_')
        host.mkdir('_dir1_')
        self.make_file('_dir1_/file1')
        # try it
        self.assertRaises(ftp_error.PermanentError, host.makedirs,
                          '_dir1_/file1')
        self.assertRaises(ftp_error.PermanentError, host.makedirs,
                          '_dir1_/file1/dir2')

    def test_makedirs_with_existing_directory(self):
        host = self.host
        self.cleaner.add_dir("_dir1_")
        host.mkdir('_dir1_')
        host.makedirs('_dir1_/dir2')
        # check
        self.failUnless(host.path.isdir('_dir1_'))
        self.failUnless(host.path.isdir('_dir1_/dir2'))

    def test_makedirs_in_non_writable_directory(self):
        host = self.host
        # preparation: `rootdir1` exists but is only writable by root
        self.assertRaises(ftp_error.PermanentError, host.makedirs,
                          'rootdir1/dir2')

    def test_makedirs_with_writable_directory_at_end(self):
        host = self.host
        self.cleaner.add_dir('rootdir2/dir2')
        # preparation: `rootdir2` exists but is only writable by root;
        #  `dir2` is writable by regular ftp user
        # these both should work
        host.makedirs('rootdir2/dir2')
        host.makedirs('rootdir2/dir2/dir3')

    def test_rmtree_without_error_handler(self):
        host = self.host
        # build a tree
        self.cleaner.add_dir('_dir1_')
        host.makedirs('_dir1_/dir2')
        self.make_file('_dir1_/file1')
        self.make_file('_dir1_/file2')
        self.make_file('_dir1_/dir2/file3')
        self.make_file('_dir1_/dir2/file4')
        # try to remove a _file_ with `rmtree`
        self.assertRaises(ftp_error.PermanentError, host.rmtree, '_dir1_/file2')
        # remove dir2
        host.rmtree('_dir1_/dir2')
        self.failIf(host.path.exists('_dir1_/dir2'))
        self.failUnless(host.path.exists('_dir1_/file2'))
        # remake dir2 and remove _dir1_
        host.mkdir('_dir1_/dir2')
        self.make_file('_dir1_/dir2/file3')
        self.make_file('_dir1_/dir2/file4')
        host.rmtree('_dir1_')
        self.failIf(host.path.exists('_dir1_'))

    def test_rmtree_with_error_handler(self):
        host = self.host
        self.cleaner.add_dir('_dir1_')
        host.mkdir('_dir1_')
        self.make_file('_dir1_/file1')
        # prepare error "handler"; it records the arguments of each call
        log = []
        def error_handler(*args):
            log.append(args)
        # try to remove a file as root "directory"
        host.rmtree('_dir1_/file1', ignore_errors=True, onerror=error_handler)
        self.assertEqual(log, [])
        host.rmtree('_dir1_/file1', ignore_errors=False, onerror=error_handler)
        self.assertEqual(log[0][0], host.listdir)
        self.assertEqual(log[0][1], '_dir1_/file1')
        self.assertEqual(log[1][0], host.rmdir)
        self.assertEqual(log[1][1], '_dir1_/file1')
        host.rmtree('_dir1_')
        # try to remove a non-existent directory
        del log[:]
        host.rmtree('_dir1_', ignore_errors=False, onerror=error_handler)
        self.assertEqual(log[0][0], host.listdir)
        self.assertEqual(log[0][1], '_dir1_')
        self.assertEqual(log[1][0], host.rmdir)
        self.assertEqual(log[1][1], '_dir1_')

    #
    # directory tree walking
    #
    def test_walk_topdown(self):
        # preparation: build tree in directory `walk_test`
        host = self.host
        expected = [
          ('walk_test', ['dir1', 'dir2', 'dir3'], ['file4']),
          ('walk_test/dir1', ['dir11', 'dir12'], []),
          ('walk_test/dir1/dir11', [], []),
          ('walk_test/dir1/dir12', ['dir123'], ['file121', 'file122']),
          ('walk_test/dir1/dir12/dir123', [], ['file1234']),
          ('walk_test/dir2', [], []),
          ('walk_test/dir3', ['dir33'], ['file31', 'file32']),
          ('walk_test/dir3/dir33', [], []),
          ]
        # collect data, using `walk`
        actual = list(host.walk('walk_test'))
        # compare with expected results, item by item
        self.assertEqual(len(actual), len(expected))
        for actual_item, expected_item in zip(actual, expected):
            self.assertEqual(actual_item, expected_item)

    def test_walk_depth_first(self):
        # preparation: build tree in directory `walk_test`
        host = self.host
        expected = [
          ('walk_test/dir1/dir11', [], []),
          ('walk_test/dir1/dir12/dir123', [], ['file1234']),
          ('walk_test/dir1/dir12', ['dir123'], ['file121', 'file122']),
          ('walk_test/dir1', ['dir11', 'dir12'], []),
          ('walk_test/dir2', [], []),
          ('walk_test/dir3/dir33', [], []),
          ('walk_test/dir3', ['dir33'], ['file31', 'file32']),
          ('walk_test', ['dir1', 'dir2', 'dir3'], ['file4'])
          ]
        # collect data, using `walk`
        actual = list(host.walk('walk_test', topdown=False))
        # compare with expected results, item by item
        self.assertEqual(len(actual), len(expected))
        for actual_item, expected_item in zip(actual, expected):
            self.assertEqual(actual_item, expected_item)

    #
    # renaming
    #
    def test_rename(self):
        host = self.host
        # make sure the target of the renaming operation is removed
        self.cleaner.add_file('_testfile2_')
        self.make_file("_testfile1_")
        host.rename('_testfile1_', '_testfile2_')
        self.failIf(host.path.exists('_testfile1_'))
        self.failUnless(host.path.exists('_testfile2_'))
        host.remove('_testfile2_')

    def test_rename_with_spaces_in_directory(self):
        host = self.host
        dir_name = "_dir with spaces_"
        self.cleaner.add_dir(dir_name)
        host.mkdir(dir_name)
        self.make_file(dir_name + "/testfile1")
        host.rename(dir_name + "/testfile1", dir_name + "/testfile2")
        self.failIf(host.path.exists(dir_name + "/testfile1"))
        self.failUnless(host.path.exists(dir_name + "/testfile2"))

    #
    # stat'ing
    #
    def test_stat(self):
        host = self.host
        dir_name = "_testdir_"
        file_name = host.path.join(dir_name, "_nonempty_")
        # make a directory and a file in it
        self.cleaner.add_dir(dir_name)
        host.mkdir(dir_name)
        fobj = host.file(file_name, "wb")
        fobj.write("abc\x12\x34def\t")
        fobj.close()
        # do some stats
        # - dir
        self.assertEqual(host.listdir(dir_name), ["_nonempty_"])
        self.assertEqual(bool(host.path.isdir(dir_name)), True)
        self.assertEqual(bool(host.path.isfile(dir_name)), False)
        self.assertEqual(bool(host.path.islink(dir_name)), False)
        # - file
        self.assertEqual(bool(host.path.isdir(file_name)), False)
        self.assertEqual(bool(host.path.isfile(file_name)), True)
        self.assertEqual(bool(host.path.islink(file_name)), False)
        self.assertEqual(host.path.getsize(file_name), 9)
        # - file's modification time; allow up to two minutes difference
        host.synchronize_times()
        server_mtime = host.path.getmtime(file_name)
        client_mtime = time.mktime(time.localtime())
        calculated_time_shift = server_mtime - client_mtime
        self.failIf(abs(calculated_time_shift-host.time_shift()) > 120)

    def test_concurrent_access(self):
        self.make_file("_testfile_")
        # the two additional connections are closed in the `finally`
        #  clauses, whether the checks succeed or not
        host1 = ftputil.FTPHost(server, user, password)
        try:
            host2 = ftputil.FTPHost(server, user, password)
            try:
                stat_result1 = host1.stat("_testfile_")
                stat_result2 = host2.stat("_testfile_")
                self.assertEqual(stat_result1, stat_result2)
                host2.remove("_testfile_")
                # can still get the result via `host1`
                stat_result1 = host1.stat("_testfile_")
                self.assertEqual(stat_result1, stat_result2)
                # stat'ing on `host2` gives an exception
                self.assertRaises(ftp_error.PermanentError, host2.stat,
                                  "_testfile_")
                # stat'ing on `host1` after invalidation
                absolute_path = host1.path.join(host1.getcwd(), "_testfile_")
                host1.stat_cache.invalidate(absolute_path)
                self.assertRaises(ftp_error.PermanentError, host1.stat,
                                  "_testfile_")
            finally:
                host2.close()
        finally:
            host1.close()

    #
    # `upload` (including time shift test)
    #
    def test_time_shift(self):
        self.host.synchronize_times()
        self.assertEqual(self.host.time_shift(), EXPECTED_TIME_SHIFT)

    def test_upload(self):
        host = self.host
        host.synchronize_times()
        # make local file and upload it
        self.make_local_file()
        # everything after the creation of the local file is in the
        #  `try` block, so the file is removed even if a wait is
        #  interrupted
        try:
            # wait; else small time differences between client and server
            #  actually could trigger the update
            time.sleep(65)
            self.cleaner.add_file('_remotefile_')
            host.upload('_localfile_', '_remotefile_', 'b')
            # retry; shouldn't be uploaded
            uploaded = host.upload_if_newer('_localfile_', '_remotefile_', 'b')
            self.assertEqual(uploaded, False)
            # rewrite the local file
            self.make_local_file()
            time.sleep(65)
            # retry; should be uploaded now
            uploaded = host.upload_if_newer('_localfile_', '_remotefile_', 'b')
            self.assertEqual(uploaded, True)
        finally:
            # clean up
            os.unlink('_localfile_')

    #
    # `chmod`
    #
    def assert_mode(self, path, expected_mode):
        """Fail unless the mode of `path` equals `expected_mode`.

        Only the bits which can be set with a mode change command are
        compared; all other bits of `st_mode` are masked out before.
        Nothing is returned.

        The `FTPHost` object to test against is `self.host`.
        """
        full_mode = self.host.stat(path).st_mode
        # remove flags we can't set via `chmod`
        # allowed flags according to Python documentation
        #  http://docs.python.org/lib/os-file-dir.html
        allowed_flags = [stat.S_ISUID, stat.S_ISGID, stat.S_ENFMT,
          stat.S_ISVTX, stat.S_IREAD, stat.S_IWRITE, stat.S_IEXEC,
          stat.S_IRWXU, stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR,
          stat.S_IRWXG, stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP,
          stat.S_IRWXO, stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH]
        # combine all allowed flags into a single bit mask
        allowed_mask = 0
        for flag in allowed_flags:
            allowed_mask |= flag
        mode = full_mode & allowed_mask
        self.assertEqual(mode, expected_mode,
                         "mode %s != %s" % (oct(mode), oct(expected_mode)))

    def test_chmod_existing_directory(self):
        host = self.host
        host.mkdir("_test dir_")
        self.cleaner.add_dir("_test dir_")
        # set/get mode of a directory
        host.chmod("_test dir_", self.DIR_MODE)
        self.assert_mode("_test dir_", self.DIR_MODE)
        # set/get mode in nested directory
        host.mkdir("_test dir_/nested_dir")
        self.cleaner.add_dir("_test dir_/nested_dir")
        # set/get mode of a directory
        host.chmod("_test dir_/nested_dir", self.DIR_MODE)
        self.assert_mode("_test dir_/nested_dir", self.DIR_MODE)

    def test_chmod_existing_file(self):
        host = self.host
        host.mkdir("_test dir_")
        self.cleaner.add_dir("_test dir_")
        # set/get mode on a file
        file_name = host.path.join("_test dir_", "_testfile_")
        self.make_file(file_name)
        host.chmod(file_name, self.FILE_MODE)
        self.assert_mode(file_name, self.FILE_MODE)

    def test_chmod_nonexistent_path(self):
        # setting the mode of a path which doesn't exist should fail
        self.assertRaises(ftp_error.PermanentError, self.host.chmod,
                          "nonexistent", self.DIR_MODE)

    def test_cache_invalidation(self):
        host = self.host
        host.mkdir("_test dir_")
        self.cleaner.add_dir("_test dir_")
        # make sure the mode is in the cache
        unused_stat_result = host.stat("_test dir_")
        # set/get mode of a directory
        host.chmod("_test dir_", self.DIR_MODE)
        self.assert_mode("_test dir_", self.DIR_MODE)
        # set/get mode on a file
        file_name = host.path.join("_test dir_", "_testfile_")
        self.make_file(file_name)
        # make sure the mode is in the cache
        unused_stat_result = host.stat(file_name)
        host.chmod(file_name, self.FILE_MODE)
        self.assert_mode(file_name, self.FILE_MODE)

    #
    # other tests
    #
    def test_open_for_reading(self):
        # test for issue #17, http://ftputil.sschwarzer.net/trac/ticket/17
        file1 = self.host.file("debian-keyring.tar.gz", 'rb')
        file1.close()
        # make sure that there are no problems if the connection is reused
        file2 = self.host.file("debian-keyring.tar.gz", 'rb')
        file2.close()
        self.failUnless(file1._session is file2._session)

    def test_names_with_spaces(self):
        # test if directories and files with spaces in their names
        #  can be used
        host = self.host
        self.failUnless(host.path.isdir("dir with spaces"))
        self.assertEqual(host.listdir("dir with spaces"),
                         ['second dir', 'some file', 'some_file'])
        self.failUnless(host.path.isdir("dir with spaces/second dir"))
        self.failUnless(host.path.isfile("dir with spaces/some_file"))
        self.failUnless(host.path.isfile("dir with spaces/some file"))
561
if __name__ == '__main__':
    # the text is written with `sys.stdout.write` because, unlike the
    #  `print` statement, it's accepted by every Python version; the
    #  empty line at the end of the text is the newline `print` added
    sys.stdout.write("""\
Test for real FTP access.

This test writes some files and directories on the local client and the
remote server. Thus, you may want to skip this test by pressing [Ctrl-C].
The login data for the remote server is provided by `get_login_data`. You
need write access in the login directory. This test can last a few minutes
because it has to wait to test the timezone calculation.

""")
    # `raw_input` is called `input` in Python 3
    try:
        read_line = raw_input
    except NameError:
        read_line = input
    try:
        read_line("[Return] to continue, or [Ctrl-C] to skip test. ")
    except KeyboardInterrupt:
        sys.stdout.write("\nTest aborted.\n")
        sys.exit()
    # get login data only once, not for each test; the test class
    #  reads these three module-level names
    server, user, password = get_login_data()
    unittest.main()
580
Note: See TracBrowser for help on using the browser.