source: test/test_host.py @ 1660:93ea351f922b

Last change on this file since 1660:93ea351f922b was 1660:93ea351f922b, checked in by Stefan Schwarzer <sschwarzer@…>, 3 years ago
Remove `unittest.main` calls. Since we now use pytest as the test runner and not a shell loop in the makefile, there's no longer a need for explicit `unittest.main` calls. ticket: 98
File size: 21.3 KB
Line 
1# encoding: utf-8
2# Copyright (C) 2002-2016, Stefan Schwarzer <sschwarzer@sschwarzer.net>
3# and ftputil contributors (see `doc/contributors.txt`)
4# See the file LICENSE for licensing terms.
5
6from __future__ import unicode_literals
7
8import ftplib
9import itertools
10import os
11import pickle
12import posixpath
13import random
14import time
15import unittest
16import warnings
17
18import pytest
19
20import ftputil
21import ftputil.compat
22import ftputil.error
23import ftputil.tool
24import ftputil.stat
25
26from test import mock_ftplib
27from test import test_base
28
29
30#
31# Helper functions to generate random data
32#
def random_data(pool, size=10000):
    """
    Build a byte string of `size` bytes. Each byte value is picked
    at random from `pool`, a sequence of integer numbers.
    """
    ordinals = []
    for _ in range(size):
        ordinals.append(random.choice(pool))
    return ftputil.compat.bytes_from_ints(ordinals)
40
41
def ascii_data():
    r"""
    Return a unicode string of random characters with the code
    points 32 to 127, plus `\r`.
    """
    # The carriage return is in the pool so that the later text write
    # has to convert "\r" to "\n"; the tests check this conversion.
    code_points = list(range(32, 128)) + [ord("\r")]
    raw_bytes = random_data(code_points)
    return ftputil.tool.as_unicode(raw_bytes)
51
52
def binary_data():
    """Return a random byte string drawing on all 256 byte values."""
    all_byte_values = list(range(256))
    return random_data(all_byte_values)
57
58
59#
60# Several customized `MockSession` classes
61#
class FailOnLoginSession(mock_ftplib.MockSession):
    """Mock session whose constructor always raises `ftplib.error_perm`."""

    def __init__(self, host="", user="", password=""):
        # The base class constructor is deliberately not called;
        # creating the session is supposed to fail unconditionally.
        raise ftplib.error_perm()
66
67
class FailOnKeepAliveSession(mock_ftplib.MockSession):
    """Mock session whose `pwd` works once and fails on later calls."""

    def pwd(self):
        # The first call has to succeed to let the constructor work.
        # Every further call raises the exception.
        if hasattr(self, "pwd_called"):
            raise ftplib.error_temp()
        self.pwd_called = True
76
77
class RecursiveListingForDotAsPathSession(mock_ftplib.MockSession):
    """
    Mock session with canned directory listings for two paths: the
    empty string gets a plain listing, the dot gets a listing that
    also contains the contents of the subdirectories.
    """

    dir_contents = {
      # Plain listing
      "": """\
total 10
lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin
d--x--x--x   2 staff        512 Sep 24  2000 dev
d--x--x--x   3 staff        512 Sep 25  2000 etc
dr-xr-xr-x   3 staff        512 Oct  3  2000 pub
d--x--x--x   5 staff        512 Oct  3  2000 usr""",

      # Listing with one section per subdirectory
      ".": """\
lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin

dev:
total 10

etc:
total 10

pub:
total 4
-rw-r--r--   1 staff         74 Sep 25  2000 .message
----------   1 staff          0 Aug 16  2003 .notar
drwxr-xr-x  12 ftp          512 Nov 23  2008 freeware

usr:
total 4"""}

    def _transform_path(self, path):
        # Hand the path back unchanged, so the keys of `dir_contents`
        # are exactly the paths passed in.
        return path
109
110
class BinaryDownloadMockSession(mock_ftplib.MockUnixFormatSession):
    """Mock session whose `mock_file_content` is random binary data."""

    # Generated once, when the class body is executed.
    mock_file_content = binary_data()
114
115
class TimeShiftMockSession(mock_ftplib.MockSession):
    """Mock session whose `delete` accepts any file name and does nothing."""

    def delete(self, file_name):
        return None
120
121#
122# Customized `FTPHost` class for conditional upload/download tests
123# and time shift tests
124#
class FailingUploadAndDownloadFTPHost(ftputil.FTPHost):
    """
    `FTPHost` class for tests in which no transfer must happen.

    Both `upload` and `download` fail with an `AssertionError` as
    soon as they're called.
    """

    # The exceptions are raised explicitly. `assert False, ...`
    # statements would be removed when Python runs with `-O`, and the
    # methods would then silently return `None`.

    def upload(self, source, target, mode=""):
        """Fail because the test expected that no upload happens."""
        raise AssertionError("`FTPHost.upload` should not have been called")

    def download(self, source, target, mode=""):
        """Fail because the test expected that no download happens."""
        raise AssertionError("`FTPHost.download` should not have been called")
132
133
class TimeShiftFTPHost(ftputil.FTPHost):
    """
    `FTPHost` class whose `path` attribute is replaced with a stub
    that reports a modification time set by the test.
    """

    class _Path:
        """
        Stub for `host.path`. Pure path operations are delegated to
        `posixpath`, the other methods return canned values.
        """

        # Canned values

        def set_mtime(self, mtime):
            self._mtime = mtime

        def getmtime(self, file_name):
            # The same value for every file name, as stored by
            # `set_mtime`.
            return self._mtime

        def abspath(self, path):
            # The argument is ignored.
            return "/home/sschwarzer/_ftputil_sync_"

        # Needed for `FTPHost.remove` (according to the original
        # author's note).
        def isfile(self, path):
            return True

        # Path operations passed on to `posixpath`

        def split(self, path):
            return posixpath.split(path)

        def join(self, *args):
            return posixpath.join(*args)

        def normpath(self, path):
            return posixpath.normpath(path)

        def isabs(self, path):
            return posixpath.isabs(path)

    def __init__(self, *args, **kwargs):
        ftputil.FTPHost.__init__(self, *args, **kwargs)
        # Replace the `path` attribute set up by the base class.
        self.path = self._Path()
158
159#
160# Test cases
161#
class TestInitAndClose(unittest.TestCase):
    """Test creating an `FTPHost` object and closing it again."""

    def test_open_and_close(self):
        """A closed host is flagged as closed and has no children."""
        ftp_host = test_base.ftp_host_factory()
        ftp_host.close()
        assert ftp_host.closed is True
        assert ftp_host._children == []
170
171
class TestLogin(unittest.TestCase):
    """Test the error handling for a failing login."""

    def test_invalid_login(self):
        """Login to invalid host must fail."""
        expected_exception = ftputil.error.FTPOSError
        with pytest.raises(expected_exception):
            test_base.ftp_host_factory(FailOnLoginSession)
178
179
class TestKeepAlive(unittest.TestCase):
    """Test `FTPHost.keep_alive` for a working and a failing session."""

    def test_succeeding_keep_alive(self):
        """Assume the connection is still alive."""
        live_host = test_base.ftp_host_factory()
        # Passes if no exception is raised.
        live_host.keep_alive()

    def test_failing_keep_alive(self):
        """Assume the connection has timed out, so `keep_alive` fails."""
        timed_out_host = test_base.ftp_host_factory(
                           session_factory=FailOnKeepAliveSession)
        expected_exception = ftputil.error.TemporaryError
        with pytest.raises(expected_exception):
            timed_out_host.keep_alive()
193
194
class TestSetParser(unittest.TestCase):
    """Test whether a parser set with `FTPHost.set_parser` is used."""

    class TrivialParser(ftputil.stat.Parser):
        """
        An instance of this parser always returns the same result
        from its `parse_line` method. This is all we need to check
        if ftputil uses the set parser. No actual parsing code is
        required here.
        """

        def __init__(self):
            # We can't use an `os.stat` result directly because we
            # later need the object's `_st_name` attribute, which
            # we can't set on an `os.stat` stat value.
            #
            # Stat the current directory instead of a hard-coded
            # local path like `/home`, which doesn't exist on every
            # system. The test only compares the stat result with the
            # value stored here.
            default_stat_result = ftputil.stat.StatResult(os.stat(os.curdir))
            # The name has to match the last component of the remote
            # path `/home` used in `test_set_parser`.
            default_stat_result._st_name = "home"
            self.default_stat_result = default_stat_result

        def parse_line(self, line, time_shift=0.0):
            """Return the canned stat result, whatever the `line` is."""
            return self.default_stat_result

    def test_set_parser(self):
        """Test if the selected parser is used."""
        host = test_base.ftp_host_factory()
        assert host._stat._allow_parser_switching is True
        trivial_parser = TestSetParser.TrivialParser()
        host.set_parser(trivial_parser)
        stat_result = host.stat("/home")
        assert stat_result == trivial_parser.default_stat_result
        # Setting a parser explicitly turns off parser switching.
        assert host._stat._allow_parser_switching is False
225
226
class TestCommandNotImplementedError(unittest.TestCase):
    """Test the exception for a command the server doesn't implement."""

    def test_command_not_implemented_error(self):
        """
        Test if we get the anticipated exception if a command isn't
        implemented by the server.
        """
        host = test_base.ftp_host_factory()
        expected_exceptions = (
          ftputil.error.CommandNotImplementedError,
          # `CommandNotImplementedError` is a subclass of `PermanentError`.
          ftputil.error.PermanentError,
        )
        for expected_exception in expected_exceptions:
            with pytest.raises(expected_exception):
                host.chmod("nonexistent", 0o644)
240
241
class TestRecursiveListingForDotAsPath(unittest.TestCase):
    """
    Check the listings for a session that returns a recursive
    directory listing when the path to list is a dot. This is used
    to test for issue #33, see
    http://ftputil.sschwarzer.net/trac/ticket/33 .
    """

    def _new_host(self):
        """Return a host that uses the session with the canned listings."""
        return test_base.ftp_host_factory(
                 session_factory=RecursiveListingForDotAsPathSession)

    def _check_start_of_plain_listing(self, lines):
        """Check the first three lines of the non-recursive listing."""
        assert lines[0] == "total 10"
        assert lines[1].startswith("lrwxrwxrwx   1 staff")
        assert lines[2].startswith("d--x--x--x   2 staff")

    def test_recursive_listing(self):
        """Listing the current directory gives the plain listing."""
        host = self._new_host()
        self._check_start_of_plain_listing(host._dir(host.curdir))
        host.close()

    def test_plain_listing(self):
        """Listing the empty path gives the plain listing."""
        host = self._new_host()
        self._check_start_of_plain_listing(host._dir(""))
        host.close()

    def test_empty_string_instead_of_dot_workaround(self):
        """`listdir` for the current directory gives only the top level."""
        host = self._new_host()
        names = host.listdir(host.curdir)
        assert names == ["bin", "dev", "etc", "pub", "usr"]
        host.close()
273
274
class TestUploadAndDownload(unittest.TestCase):
    """
    Test (conditional) upload and download with binary data.

    The tests create scratch files in the current directory and
    share the file names. Every test therefore removes its files in
    a `finally` clause. Otherwise a failing assertion would leave a
    stale file behind, and this file could change the outcome of the
    other tests (for example, `test_conditional_download_without_target`
    relies on the target file not being there).
    """

    def generate_file(self, data, file_name):
        """Generate a local data file `file_name` from the bytes `data`."""
        with open(file_name, "wb") as source_file:
            source_file.write(data)

    def _remove_local_file(self, file_name):
        """
        Remove the local file `file_name` if it exists.

        Not raising for a missing file makes the method safe to use
        in `finally` clauses, where the file may not have been
        created yet or may have been removed already.
        """
        if os.path.exists(file_name):
            os.unlink(file_name)

    def test_download(self):
        """Test mode download."""
        local_target = "_test_target_"
        host = test_base.ftp_host_factory(
                 session_factory=BinaryDownloadMockSession)
        try:
            # Download
            host.download("dummy", local_target)
            # Read file and compare
            with open(local_target, "rb") as fobj:
                data = fobj.read()
            remote_file_content = mock_ftplib.content_of("dummy")
            assert data == remote_file_content
        finally:
            # Clean up
            self._remove_local_file(local_target)

    def test_conditional_upload(self):
        """Test conditional upload."""
        local_source = "_test_source_"
        data = binary_data()
        try:
            self.generate_file(data, local_source)
            # Target is newer, so don't upload.
            host = test_base.ftp_host_factory(
                     ftp_host_class=FailingUploadAndDownloadFTPHost)
            flag = host.upload_if_newer(local_source, "/home/newer")
            assert flag is False
            # Target is older, so upload.
            host = test_base.ftp_host_factory()
            flag = host.upload_if_newer(local_source, "/home/older")
            assert flag is True
            remote_file_content = mock_ftplib.content_of("older")
            assert data == remote_file_content
            # Target doesn't exist, so upload.
            host = test_base.ftp_host_factory()
            flag = host.upload_if_newer(local_source, "/home/notthere")
            assert flag is True
            remote_file_content = mock_ftplib.content_of("notthere")
            assert data == remote_file_content
        finally:
            # Clean up.
            self._remove_local_file(local_source)

    def compare_and_delete_downloaded_data(self, file_name):
        """
        Compare content of downloaded file with its source, then
        delete the local target file.

        The file is deleted even if it can't be read or if the
        comparison fails.
        """
        try:
            with open(file_name, "rb") as fobj:
                data = fobj.read()
            # The name `newer` is used by all callers, so use it here, too.
            remote_file_content = mock_ftplib.content_of("newer")
            assert data == remote_file_content
        finally:
            # Clean up
            self._remove_local_file(file_name)

    def test_conditional_download_without_target(self):
        """
        Test conditional binary mode download when no target file
        exists.
        """
        local_target = "_test_target_"
        # Target does not exist, so download.
        host = test_base.ftp_host_factory(
                 session_factory=BinaryDownloadMockSession)
        try:
            flag = host.download_if_newer("/home/newer", local_target)
            assert flag is True
            self.compare_and_delete_downloaded_data(local_target)
        finally:
            # Also covers a failure before the comparison was reached.
            self._remove_local_file(local_target)

    def test_conditional_download_with_older_target(self):
        """Test conditional binary mode download with newer source file."""
        local_target = "_test_target_"
        try:
            # Make target file.
            open(local_target, "w").close()
            # Source is newer (date in 2020), so download.
            host = test_base.ftp_host_factory(
                     session_factory=BinaryDownloadMockSession)
            flag = host.download_if_newer("/home/newer", local_target)
            assert flag is True
            self.compare_and_delete_downloaded_data(local_target)
        finally:
            # Also covers a failure before the comparison was reached.
            self._remove_local_file(local_target)

    def test_conditional_download_with_newer_target(self):
        """Test conditional binary mode download with older source file."""
        local_target = "_test_target_"
        try:
            # Make target file.
            open(local_target, "w").close()
            # Source is older (date in 1970), so don't download.
            host = test_base.ftp_host_factory(
                     ftp_host_class=FailingUploadAndDownloadFTPHost,
                     session_factory=BinaryDownloadMockSession)
            flag = host.download_if_newer("/home/older", local_target)
            assert flag is False
        finally:
            # Remove target file
            self._remove_local_file(local_target)
374
375
class TestTimeShift(unittest.TestCase):
    """Test rounding, checking and determining the time shift."""

    def test_rounded_time_shift(self):
        """Test if time shift is rounded correctly."""
        host = test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
        # Bound method; private, hence the mangled name.
        round_time_shift = host._FTPHost__rounded_time_shift
        # Each pair is (value to round, expected rounded value).
        cases = [
          (0, 0),
          (0.1, 0),
          (-0.1, 0),
          (1500, 1800),
          (-1500, -1800),
          (1800, 1800),
          (-1800, -1800),
          (2000, 1800),
          (-2000, -1800),
          (5*3600-100, 5*3600),
          (-5*3600+100, -5*3600),
        ]
        for original_value, expected_value in cases:
            rounded_value = round_time_shift(original_value)
            assert rounded_value == expected_value

    def test_assert_valid_time_shift(self):
        """Test time shift sanity checks."""
        host = test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
        # Bound method; private, hence the mangled name.
        check_time_shift = host._FTPHost__assert_valid_time_shift
        # Valid time shifts pass the check, which returns `None`.
        for valid_time_shift in [23*3600, -23*3600, 3600+30, -3600+30]:
            assert check_time_shift(valid_time_shift) is None
        invalid_time_shifts = [
          # Invalid time shift (exceeds one day)
          25*3600,
          -25*3600,
          # Invalid time shift (too large deviation from 15-minute units
          # is unacceptable)
          8*60,
          -3600-8*60,
        ]
        for invalid_time_shift in invalid_time_shifts:
            with pytest.raises(ftputil.error.TimeShiftError):
                check_time_shift(invalid_time_shift)

    def test_synchronize_times(self):
        """Test time synchronization with server."""
        host = test_base.ftp_host_factory(ftp_host_class=TimeShiftFTPHost,
                                          session_factory=TimeShiftMockSession)
        minute = 60
        # Valid time shifts, as pairs of the time shift that's
        # simulated and the time shift the host should derive from it
        valid_cases = [
          (60*minute+30,  60*minute),
          (60*minute-100, 60*minute),
          (30*minute+100, 30*minute),
          (45*minute-100, 45*minute),
        ]
        for simulated_shift, expected_shift in valid_cases:
            host.path.set_mtime(time.time() + simulated_shift)
            host.synchronize_times()
            assert host.time_shift() == expected_shift
        # Invalid time shifts
        for simulated_shift in [60*minute+8*minute, 45*minute-6*minute]:
            host.path.set_mtime(time.time() + simulated_shift)
            with pytest.raises(ftputil.error.TimeShiftError):
                host.synchronize_times()

    def test_synchronize_times_for_server_in_east(self):
        """Test for timestamp correction (see ticket #55)."""
        host = test_base.ftp_host_factory(ftp_host_class=TimeShiftFTPHost,
                                          session_factory=TimeShiftMockSession)
        # Set this explicitly to emphasize the problem.
        host.set_time_shift(0.0)
        # This could be any negative time shift.
        presumed_time_shift = -6 * (60 * 60)
        # Set `mtime` to simulate a server east of us.
        #
        # As long as the `time_shift` value of the host is 0.0 (which
        # is to be expected before the time shift is determined), the
        # directory parser (more specifically
        # `ftputil.stat.Parser.parse_unix_time`) will return a time
        # which is a year too far in the past. `synchronize_times`
        # has to deal with this and add the year "back".
        #
        # According to the original author this isn't considered a
        # bug in `parse_unix_time` because the method should work
        # once the time shift is set correctly.
        now = time.localtime()
        now_with_wrong_year = (now.tm_year-1,) + now[1:]
        presumed_server_time = (
          time.mktime(now_with_wrong_year) + presumed_time_shift)
        host.path.set_mtime(presumed_server_time)
        host.synchronize_times()
        assert host.time_shift() == presumed_time_shift
468
469
class TestAcceptEitherUnicodeOrBytes(unittest.TestCase):
    """
    Check that the `FTPHost` methods which take one or more paths
    work with unicode strings as well as with byte strings.
    """

    def setUp(self):
        self.host = test_base.ftp_host_factory()

    def _test_method_with_single_path_argument(self, method, path):
        """Call `method` with `path` as unicode, then as bytes."""
        method(path)
        path_as_bytes = ftputil.tool.as_bytes(path)
        method(path_as_bytes)

    #
    # Methods with specific checks
    #
    def test_upload(self):
        """Test whether `upload` accepts either unicode or bytes."""
        # The source file `Makefile` needs to be present in the
        # current directory.
        unicode_target = "target"
        self.host.upload("Makefile", unicode_target)
        bytes_target = ftputil.tool.as_bytes(unicode_target)
        self.host.upload("Makefile", bytes_target)

    def test_download(self):
        """Test whether `download` accepts either unicode or bytes."""
        download_host = test_base.ftp_host_factory(
                          session_factory=BinaryDownloadMockSession)
        local_file_name = "_local_target_"
        unicode_source = "source"
        download_host.download(unicode_source, local_file_name)
        bytes_source = ftputil.tool.as_bytes(unicode_source)
        download_host.download(bytes_source, local_file_name)
        os.remove(local_file_name)

    def test_rename(self):
        """Test whether `rename` accepts either unicode or bytes."""
        # It's possible to mix argument types, as for `os.rename`.
        # Hence try every combination of unicode and bytes.
        path_as_unicode = "/home/file_name_test/ä"
        path_as_bytes = ftputil.tool.as_bytes(path_as_unicode)
        paths = [path_as_unicode, path_as_bytes]
        for source_path in paths:
            for target_path in paths:
                self.host.rename(source_path, target_path)

    def test_listdir(self):
        """Test whether `listdir` accepts either unicode or bytes."""
        to_bytes = ftputil.tool.as_bytes
        self.host.chdir("/home/file_name_test")
        # Unicode
        unicode_items = self.host.listdir("ä")
        assert unicode_items == ["ö", "o"]
        #  Need explicit type check for Python 2
        for unicode_item in unicode_items:
            assert isinstance(unicode_item, ftputil.compat.unicode_type)
        # Bytes
        bytes_items = self.host.listdir(to_bytes("ä"))
        assert bytes_items == [to_bytes("ö"), to_bytes("o")]
        #  Need explicit type check for Python 2
        for bytes_item in bytes_items:
            assert isinstance(bytes_item, ftputil.compat.bytes_type)

    def test_chmod(self):
        """Test whether `chmod` accepts either unicode or bytes."""
        session = self.host._session
        # The `voidcmd` implementation in `MockSession` would raise an
        # exception for the `CHMOD` command.
        session.voidcmd = session._ignore_arguments
        unicode_path = "/home/file_name_test/ä"
        self.host.chmod(unicode_path, 0o755)
        bytes_path = ftputil.tool.as_bytes(unicode_path)
        self.host.chmod(bytes_path, 0o755)

    #
    # Methods that are just called with a single path
    #
    def test_chdir(self):
        """Test whether `chdir` accepts either unicode or bytes."""
        directory = "/home/file_name_test/ö"
        self._test_method_with_single_path_argument(
          self.host.chdir, directory)

    def test_mkdir(self):
        """Test whether `mkdir` accepts either unicode or bytes."""
        # This directory exists already in the mock session, but this
        # shouldn't matter for the test.
        directory = "/home/file_name_test/ä"
        self._test_method_with_single_path_argument(
          self.host.mkdir, directory)

    def test_makedirs(self):
        """Test whether `makedirs` accepts either unicode or bytes."""
        directory = "/home/file_name_test/ä"
        self._test_method_with_single_path_argument(
          self.host.makedirs, directory)

    def test_rmdir(self):
        """Test whether `rmdir` accepts either unicode or bytes."""
        # `rmdir` requires an empty directory.
        empty_directory = "/home/file_name_test/empty_ä"
        self._test_method_with_single_path_argument(
          self.host.rmdir, empty_directory)

    def test_remove(self):
        """Test whether `remove` accepts either unicode or bytes."""
        file_path = "/home/file_name_test/ö"
        self._test_method_with_single_path_argument(
          self.host.remove, file_path)

    def test_rmtree(self):
        """Test whether `rmtree` accepts either unicode or bytes."""
        # The test uses an empty directory for `rmtree`.
        empty_directory = "/home/file_name_test/empty_ä"
        self._test_method_with_single_path_argument(
          self.host.rmtree, empty_directory)

    def test_lstat(self):
        """Test whether `lstat` accepts either unicode or bytes."""
        path = "/home/file_name_test/ä"
        self._test_method_with_single_path_argument(
          self.host.lstat, path)

    def test_stat(self):
        """Test whether `stat` accepts either unicode or bytes."""
        path = "/home/file_name_test/ä"
        self._test_method_with_single_path_argument(
          self.host.stat, path)

    def test_walk(self):
        """Test whether `walk` accepts either unicode or bytes."""
        # We're not interested in the return value of `walk`.
        top_directory = "/home/file_name_test/ä"
        self._test_method_with_single_path_argument(
          self.host.walk, top_directory)
585
586
class TestFailingPickling(unittest.TestCase):
    """Test that hosts and remote files refuse to be pickled."""

    def _assert_not_picklable(self, obj):
        """Check that pickling `obj` raises a `TypeError`."""
        with pytest.raises(TypeError):
            pickle.dumps(obj)

    def test_failing_pickling(self):
        """Test if pickling (intentionally) isn't supported."""
        with test_base.ftp_host_factory() as host:
            self._assert_not_picklable(host)
            with host.open("/home/sschwarzer/index.html") as remote_file:
                self._assert_not_picklable(remote_file)
Note: See TracBrowser for help on using the repository browser.