root/_test_ftputil.py @ 583:79fe7ae18727

Revision 583:79fe7ae18727, 19.8 kB (checked in by Stefan Schwarzer <sschwarzer@…>, 4 years ago)
Added iterator interface for `FTPFile`s.
Line 
1# Copyright (C) 2002-2006, Stefan Schwarzer
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# - Redistributions of source code must retain the above copyright
9#   notice, this list of conditions and the following disclaimer.
10#
11# - Redistributions in binary form must reproduce the above copyright
12#   notice, this list of conditions and the following disclaimer in the
13#   documentation and/or other materials provided with the distribution.
14#
15# - Neither the name of the above author nor the names of the
16#   contributors to the software may be used to endorse or promote
17#   products derived from this software without specific prior written
18#   permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21# ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR
24# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
27# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
28# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
29# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32# $Id$
33
34import ftplib
35import operator
36import os
37import random
38import stat
39import time
40import unittest
41
42import _mock_ftplib
43import _test_base
44import ftp_error
45import ftp_file
46import ftputil
47
48#
49# helper functions to generate random data
50#
51def random_data(pool, size=10000):
52    """
53    Return a sequence of characters consisting of those from
54    the pool of integer numbers.
55    """
56    character_list = []
57    for i in range(size):
58        ordinal = random.choice(pool)
59        character_list.append(chr(ordinal))
60    result = ''.join(character_list)
61    return result
62
63def ascii_data():
64    """Return an ASCII character string."""
65    pool = range(32, 128)
66    pool.append(ord('\n'))
67    return random_data(pool)
68
69def binary_data():
70    """Return a binary character string."""
71    pool = range(0, 256)
72    return random_data(pool)
73
74
75#
76# several customized `MockSession` classes
77#
78class FailOnLoginSession(_mock_ftplib.MockSession):
79    def __init__(self, host='', user='', password=''):
80        raise ftplib.error_perm
81
82class ReadMockSession(_mock_ftplib.MockSession):
83    mock_file_content = 'line 1\r\nanother line\r\nyet another line'
84
85class AsciiReadMockSession(_mock_ftplib.MockSession):
86    mock_file_content = '\r\n'.join(map(str, range(20)))
87
88class BinaryDownloadMockSession(_mock_ftplib.MockSession):
89    mock_file_content = binary_data()
90
91class TimeShiftMockSession(_mock_ftplib.MockSession):
92    def delete(self, file_name):
93        pass
94
95class InaccessibleDirSession(_mock_ftplib.MockSession):
96    _login_dir = '/inaccessible'
97
98    def pwd(self):
99        return self._login_dir
100
101    def cwd(self, dir):
102        if dir in (self._login_dir, self._login_dir + '/'):
103            raise ftplib.error_perm
104        else:
105            _mock_ftplib.MockSession.cwd(self, dir)
106
107#
108# customized `FTPHost` class for conditional upload/download tests
109#  and time shift tests
110#
111class FailingUploadAndDownloadFTPHost(ftputil.FTPHost):
112    def upload(self, source, target, mode=''):
113        assert False, "`FTPHost.upload` should not have been called"
114
115    def download(self, source, target, mode=''):
116        assert False, "`FTPHost.download` should not have been called"
117
118class TimeShiftFTPHost(ftputil.FTPHost):
119    class _Path:
120        def set_mtime(self, mtime):
121            self._mtime = mtime
122        def getmtime(self, file_name):
123            return self._mtime
124        def abspath(self, path):
125            return "/home/sschwarzer/_ftputil_sync_"
126        # needed for `isdir` in `FTPHost.remove`
127        def isfile(self, path):
128            return True
129
130    def __init__(self, *args, **kwargs):
131        ftputil.FTPHost.__init__(self, *args, **kwargs)
132        self.path = self._Path()
133
134#
135# test cases
136#
137class TestOpenAndClose(unittest.TestCase):
138    """Test opening and closing of `FTPHost` objects."""
139    def test_open_and_close(self):
140        """Test closing of `FTPHost`."""
141        host = _test_base.ftp_host_factory()
142        host.close()
143        self.assertEqual(host.closed, 1)
144        self.assertEqual(host._children, [])
145
146
147class TestLogin(unittest.TestCase):
148    def test_invalid_login(self):
149        """Login to invalid host must fail."""
150        self.assertRaises(ftp_error.FTPOSError, _test_base.ftp_host_factory,
151                          FailOnLoginSession)
152
153
154class TestFileOperations(unittest.TestCase):
155    """Test operations with file-like objects."""
156    def test_inaccessible_dir(self):
157        """Test whether opening a file at an invalid location fails."""
158        host = _test_base.ftp_host_factory(
159               session_factory=InaccessibleDirSession)
160        self.assertRaises(ftp_error.FTPIOError, host.file,
161                          '/inaccessible/new_file', 'w')
162
163    def test_caching(self):
164        """Test whether `_FTPFile` cache of `FTPHost` object works."""
165        host = _test_base.ftp_host_factory()
166        self.assertEqual(len(host._children), 0)
167        path1 = 'path1'
168        path2 = 'path2'
169        # open one file and inspect cache
170        file1 = host.file(path1, 'w')
171        child1 = host._children[0]
172        self.assertEqual(len(host._children), 1)
173        self.failIf(child1._file.closed)
174        # open another file
175        file2 = host.file(path2, 'w')
176        child2 = host._children[1]
177        self.assertEqual(len(host._children), 2)
178        self.failIf(child2._file.closed)
179        # close first file
180        file1.close()
181        self.assertEqual(len(host._children), 2)
182        self.failUnless(child1._file.closed)
183        self.failIf(child2._file.closed)
184        # re-open first child's file
185        file1 = host.file(path1, 'w')
186        child1_1 = file1._host
187        # check if it's reused
188        self.failUnless(child1 is child1_1)
189        self.failIf(child1._file.closed)
190        self.failIf(child2._file.closed)
191        # close second file
192        file2.close()
193        self.failUnless(child2._file.closed)
194
195    def test_write_to_directory(self):
196        """Test whether attempting to write to a directory fails."""
197        host = _test_base.ftp_host_factory()
198        self.assertRaises(ftp_error.FTPIOError, host.file,
199                          '/home/sschwarzer', 'w')
200
201    def test_binary_write(self):
202        """Write binary data with `write`."""
203        host = _test_base.ftp_host_factory()
204        data = '\000a\001b\r\n\002c\003\n\004\r\005'
205        output = host.file('dummy', 'wb')
206        output.write(data)
207        output.close()
208        child_data = _mock_ftplib.content_of('dummy')
209        expected_data = data
210        self.assertEqual(child_data, expected_data)
211
212    def test_ascii_write(self):
213        """Write ASCII text with `write`."""
214        host = _test_base.ftp_host_factory()
215        data = ' \nline 2\nline 3'
216        output = host.file('dummy', 'w')
217        output.write(data)
218        output.close()
219        child_data = _mock_ftplib.content_of('dummy')
220        expected_data = ' \r\nline 2\r\nline 3'
221        self.assertEqual(child_data, expected_data)
222
223    def test_ascii_writelines(self):
224        """Write ASCII text with `writelines`."""
225        host = _test_base.ftp_host_factory()
226        data = [' \n', 'line 2\n', 'line 3']
227        backup_data = data[:]
228        output = host.file('dummy', 'w')
229        output.writelines(data)
230        output.close()
231        child_data = _mock_ftplib.content_of('dummy')
232        expected_data = ' \r\nline 2\r\nline 3'
233        self.assertEqual(child_data, expected_data)
234        # ensure that the original data was not modified
235        self.assertEqual(data, backup_data)
236
237    def test_ascii_read(self):
238        """Read ASCII text with plain `read`."""
239        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
240        input_ = host.file('dummy', 'r')
241        data = input_.read(0)
242        self.assertEqual(data, '')
243        data = input_.read(3)
244        self.assertEqual(data, 'lin')
245        data = input_.read(7)
246        self.assertEqual(data, 'e 1\nano')
247        data = input_.read()
248        self.assertEqual(data, 'ther line\nyet another line')
249        data = input_.read()
250        self.assertEqual(data, '')
251        input_.close()
252        # try it again with a more "problematic" string which
253        #  makes several reads in the `read` method necessary
254        host = _test_base.ftp_host_factory(session_factory=AsciiReadMockSession)
255        expected_data = AsciiReadMockSession.mock_file_content.\
256                        replace('\r\n', '\n')
257        input_ = host.file('dummy', 'r')
258        data = input_.read(len(expected_data))
259        self.assertEqual(data, expected_data)
260
261    def test_binary_readline(self):
262        """Read binary data with `readline`."""
263        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
264        input_ = host.file('dummy', 'rb')
265        data = input_.readline(3)
266        self.assertEqual(data, 'lin')
267        data = input_.readline(10)
268        self.assertEqual(data, 'e 1\r\n')
269        data = input_.readline(13)
270        self.assertEqual(data, 'another line\r')
271        data = input_.readline()
272        self.assertEqual(data, '\n')
273        data = input_.readline()
274        self.assertEqual(data, 'yet another line')
275        data = input_.readline()
276        self.assertEqual(data, '')
277        input_.close()
278
279    def test_ascii_readline(self):
280        """Read ASCII text with `readline`."""
281        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
282        input_ = host.file('dummy', 'r')
283        data = input_.readline(3)
284        self.assertEqual(data, 'lin')
285        data = input_.readline(10)
286        self.assertEqual(data, 'e 1\n')
287        data = input_.readline(13)
288        self.assertEqual(data, 'another line\n')
289        data = input_.readline()
290        self.assertEqual(data, 'yet another line')
291        data = input_.readline()
292        self.assertEqual(data, '')
293        input_.close()
294
295    def test_ascii_readlines(self):
296        """Read ASCII text with `readlines`."""
297        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
298        input_ = host.file('dummy', 'r')
299        data = input_.read(3)
300        self.assertEqual(data, 'lin')
301        data = input_.readlines()
302        self.assertEqual(data, ['e 1\n', 'another line\n',
303                                'yet another line'])
304        input_.close()
305
306    def test_ascii_xreadlines(self):
307        """Read ASCII text with `xreadlines`."""
308        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
309        # open file, skip some bytes
310        input_ = host.file('dummy', 'r')
311        data = input_.read(3)
312        xrl_obj = input_.xreadlines()
313        self.failUnless(xrl_obj.__class__ is ftp_file._XReadlines)
314        self.failUnless(xrl_obj._ftp_file.__class__ is ftp_file._FTPFile)
315        data = xrl_obj[0]
316        self.assertEqual(data, 'e 1\n')
317        # try to skip an index
318        self.assertRaises(RuntimeError, operator.__getitem__, xrl_obj, 2)
319        # continue reading
320        data = xrl_obj[1]
321        self.assertEqual(data, 'another line\n')
322        data = xrl_obj[2]
323        self.assertEqual(data, 'yet another line')
324        # try to read beyond EOF
325        self.assertRaises(IndexError, operator.__getitem__, xrl_obj, 3)
326
327    def test_binary_iterator(self):
328        """Test the iterator interface of `FTPFile` objects."""
329        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
330        input_ = host.file('dummy')
331        input_iterator = iter(input_)
332        self.assertEqual(input_iterator.next(), "line 1\n")
333        self.assertEqual(input_iterator.next(), "another line\n")
334        self.assertEqual(input_iterator.next(), "yet another line")
335        self.assertRaises(StopIteration, input_iterator.next)
336        input_.close()
337
338    def test_ascii_iterator(self):
339        """Test the iterator interface of `FTPFile` objects."""
340        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
341        input_ = host.file('dummy', 'rb')
342        input_iterator = iter(input_)
343        self.assertEqual(input_iterator.next(), "line 1\r\n")
344        self.assertEqual(input_iterator.next(), "another line\r\n")
345        self.assertEqual(input_iterator.next(), "yet another line")
346        self.assertRaises(StopIteration, input_iterator.next)
347        input_.close()
348
349    def test_read_unknown_file(self):
350        """Test whether reading a file which isn't there fails."""
351        host = _test_base.ftp_host_factory()
352        self.assertRaises(ftp_error.FTPIOError, host.file, 'notthere', 'r')
353
354
355class TestUploadAndDownload(unittest.TestCase):
356    """Test ASCII upload and binary download as examples."""
357    def generate_ascii_file(self, data, filename):
358        """Generate an ASCII data file."""
359        source_file = open(filename, 'w')
360        source_file.write(data)
361        source_file.close()
362
363    def test_ascii_upload(self):
364        """Test ASCII mode upload."""
365        local_source = '__test_source'
366        data = ascii_data()
367        self.generate_ascii_file(data, local_source)
368        # upload
369        host = _test_base.ftp_host_factory()
370        host.upload(local_source, 'dummy')
371        # check uploaded content
372        # the data which was uploaded has its line endings converted
373        #  so the conversion must also be applied to `data`
374        data = data.replace('\n', '\r\n')
375        remote_file_content = _mock_ftplib.content_of('dummy')
376        self.assertEqual(data, remote_file_content)
377        # clean up
378        os.unlink(local_source)
379
380    def test_binary_download(self):
381        """Test binary mode download."""
382        local_target = '__test_target'
383        host = _test_base.ftp_host_factory(
384               session_factory=BinaryDownloadMockSession)
385        # download
386        host.download('dummy', local_target, 'b')
387        # read file and compare
388        data = open(local_target, 'rb').read()
389        remote_file_content = _mock_ftplib.content_of('dummy')
390        self.assertEqual(data, remote_file_content)
391        # clean up
392        os.unlink(local_target)
393
394    def test_conditional_upload(self):
395        """Test conditional ASCII mode upload."""
396        local_source = '__test_source'
397        data = ascii_data()
398        self.generate_ascii_file(data, local_source)
399        # target is newer, so don't upload
400        host = _test_base.ftp_host_factory(
401               ftp_host_class=FailingUploadAndDownloadFTPHost)
402        flag = host.upload_if_newer(local_source, '/home/newer')
403        self.assertEqual(flag, False)
404        # target is older, so upload
405        host = _test_base.ftp_host_factory()
406        flag = host.upload_if_newer(local_source, '/home/older')
407        self.assertEqual(flag, True)
408        # check uploaded content
409        # the data which was uploaded has its line endings converted
410        #  so the conversion must also be applied to 'data'
411        data = data.replace('\n', '\r\n')
412        remote_file_content = _mock_ftplib.content_of('older')
413        self.assertEqual(data, remote_file_content)
414        # target doesn't exist, so upload
415        host = _test_base.ftp_host_factory()
416        flag = host.upload_if_newer(local_source, '/home/notthere')
417        self.assertEqual(flag, True)
418        remote_file_content = _mock_ftplib.content_of('notthere')
419        self.assertEqual(data, remote_file_content)
420        # clean up
421        os.unlink(local_source)
422
423    def compare_and_delete_downloaded_data(self, filename):
424        """Compare content of downloaded file with its source, then
425        delete the local target file."""
426        data = open(filename, 'rb').read()
427        remote_file_content = _mock_ftplib.content_of('newer')
428        self.assertEqual(data, remote_file_content)
429        # clean up
430        os.unlink(filename)
431
432    def test_conditional_download_without_target(self):
433        "Test conditional binary mode download when no target file exists."
434        local_target = '__test_target'
435        # target does not exist, so download
436        host = _test_base.ftp_host_factory(
437               session_factory=BinaryDownloadMockSession)
438        flag = host.download_if_newer('/home/newer', local_target, 'b')
439        self.assertEqual(flag, True)
440        self.compare_and_delete_downloaded_data(local_target)
441
442    def test_conditional_download_with_older_target(self):
443        """Test conditional binary mode download with newer source file."""
444        local_target = '__test_target'
445        # make target file
446        open(local_target, 'w').close()
447        # source is newer, so download
448        host = _test_base.ftp_host_factory(
449               session_factory=BinaryDownloadMockSession)
450        flag = host.download_if_newer('/home/newer', local_target, 'b')
451        self.assertEqual(flag, True)
452        self.compare_and_delete_downloaded_data(local_target)
453
454    def test_conditional_download_with_newer_target(self):
455        """Test conditional binary mode download with older source file."""
456        local_target = '__test_target'
457        # make target file
458        open(local_target, 'w').close()
459        # source is older, so don't download
460        host = _test_base.ftp_host_factory(
461               session_factory=BinaryDownloadMockSession)
462        host = _test_base.ftp_host_factory(
463               ftp_host_class=FailingUploadAndDownloadFTPHost,
464               session_factory=BinaryDownloadMockSession)
465        flag = host.download_if_newer('/home/older', local_target, 'b')
466        self.assertEqual(flag, False)
467        # remove target file
468        os.unlink(local_target)
469
470
471class TestTimeShift(unittest.TestCase):
472    def test_rounded_time_shift(self):
473        """Test if time shift is rounded correctly."""
474        host = _test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
475        # use private bound method
476        rounded_time_shift = host._FTPHost__rounded_time_shift
477        # original value, expected result
478        test_data = [
479          (0, 0), (0.1, 0), (-0.1, 0), (1500, 0), (-1500, 0),
480          (1800, 3600), (-1800, -3600), (2000, 3600), (-2000, -3600),
481          (5*3600-100, 5*3600), (-5*3600+100, -5*3600)]
482        for time_shift, expected_time_shift in test_data:
483            calculated_time_shift = rounded_time_shift(time_shift)
484            self.assertEqual(calculated_time_shift, expected_time_shift)
485
486    def test_assert_valid_time_shift(self):
487        """Test time shift sanity checks."""
488        host = _test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
489        # use private bound method
490        assert_time_shift = host._FTPHost__assert_valid_time_shift
491        # valid time shifts
492        test_data = [23*3600, -23*3600, 3600+30, -3600+30]
493        for time_shift in test_data:
494            self.failUnless(assert_time_shift(time_shift) is None)
495        # invalid time shift (exceeds one day)
496        self.assertRaises(ftp_error.TimeShiftError, assert_time_shift, 25*3600)
497        self.assertRaises(ftp_error.TimeShiftError, assert_time_shift, -25*3600)
498        # invalid time shift (deviation from full hours unacceptable)
499        self.assertRaises(ftp_error.TimeShiftError, assert_time_shift, 10*60)
500        self.assertRaises(ftp_error.TimeShiftError, assert_time_shift,
501                          -3600-10*60)
502
503    def test_synchronize_times(self):
504        """Test time synchronization with server."""
505        host = _test_base.ftp_host_factory(ftp_host_class=TimeShiftFTPHost,
506               session_factory=TimeShiftMockSession)
507        # valid time shift
508        host.path.set_mtime(time.time() + 3630)
509        host.synchronize_times()
510        self.assertEqual(host.time_shift(), 3600)
511        # invalid time shift
512        host.path.set_mtime(time.time() + 3600+10*60)
513        self.assertRaises(ftp_error.TimeShiftError, host.synchronize_times)
514
515
516if __name__ == '__main__':
517    unittest.main()
Note: See TracBrowser for help on using the browser.