source: test/test_host.py @ 1807:65b79d35761d

Last change on this file since 1807:65b79d35761d was 1807:65b79d35761d, checked in by Stefan Schwarzer <sschwarzer@…>, 22 months ago
Define `Call` on module level Create a reference `Call` to `scripted_session.Call` on the module level, so that not every test method has to create a local reference.
File size: 47.6 KB
Line 
1# Copyright (C) 2002-2018, Stefan Schwarzer <sschwarzer@sschwarzer.net>
2# and ftputil contributors (see `doc/contributors.txt`)
3# See the file LICENSE for licensing terms.
4
5import datetime
6import ftplib
7import io
8import itertools
9import os
10import pickle
11import posixpath
12import random
13import time
14import unittest
15import warnings
16
17import pytest
18
19import ftputil
20import ftputil.error
21import ftputil.file
22import ftputil.tool
23import ftputil.stat
24
25from test import mock_ftplib
26from test import test_base
27import test.scripted_session as scripted_session
28
29
30Call = scripted_session.Call
31
32
33#
34# Helper functions to generate random data
35#
36def random_data(pool, size=10000):
37    """
38    Return a byte string of characters consisting of those from the
39    pool of integer numbers.
40    """
41    ordinal_list = [random.choice(pool) for i in range(size)]
42    return bytes(ordinal_list)
43
44
45def ascii_data():
46    r"""
47    Return a unicode string of "normal" ASCII characters, including `\r`.
48    """
49    pool = list(range(32, 128))
50    # The idea is to have the "\r" converted to "\n" during the later
51    # text write and check this conversion.
52    pool.append(ord("\r"))
53    return ftputil.tool.as_unicode(random_data(pool))
54
55
56def binary_data():
57    """Return a binary character byte string."""
58    pool = list(range(0, 256))
59    return random_data(pool)
60
61
62#
63# Several customized `MockSession` classes
64#
65class FailOnLoginSession(mock_ftplib.MockSession):
66
67    def __init__(self, host="", user="", password=""):
68        raise ftplib.error_perm
69
70
71class BinaryDownloadMockSession(mock_ftplib.MockUnixFormatSession):
72
73    mock_file_content = binary_data()
74
75
76class TimeShiftMockSession(mock_ftplib.MockSession):
77
78    def delete(self, file_name):
79        pass
80
81#
82# Customized `FTPHost` class for conditional upload/download tests
83# and time shift tests
84#
85class FailingUploadAndDownloadFTPHost(ftputil.FTPHost):
86
87    def upload(self, source, target, mode=""):
88        pytest.fail("`FTPHost.upload` should not have been called")
89
90    def download(self, source, target, mode=""):
91        pytest.fail("`FTPHost.download` should not have been called")
92
93
94class TimeShiftFTPHost(ftputil.FTPHost):
95
96    class _Path:
97        def split(self, path):
98            return posixpath.split(path)
99        def set_mtime(self, mtime):
100            self._mtime = mtime
101        def getmtime(self, file_name):
102            return self._mtime
103        def join(self, *args):
104            return posixpath.join(*args)
105        def normpath(self, path):
106            return posixpath.normpath(path)
107        def isabs(self, path):
108            return posixpath.isabs(path)
109        def abspath(self, path):
110            return "/home/sschwarzer/_ftputil_sync_"
111        # Needed for `isdir` in `FTPHost.remove`
112        def isfile(self, path):
113            return True
114
115    def __init__(self, *args, **kwargs):
116        ftputil.FTPHost.__init__(self, *args, **kwargs)
117        self.path = self._Path()
118
119#
120# Test cases
121#
122class TestConstructor:
123    """
124    Test initialization of `FTPHost` objects.
125    """
126
127    def test_open_and_close(self):
128        """
129        Test if opening and closing an `FTPHost` object works as
130        expected.
131        """
132        script = [
133          Call(method_name="__init__", result=None),
134          Call(method_name="pwd", result="/"),
135          Call(method_name="close")
136        ]
137        host = test_base.ftp_host_factory(scripted_session.factory(script))
138        host.close()
139        assert host.closed is True
140        assert host._children == []
141
142    def test_invalid_login(self):
143        """Login to invalid host must fail."""
144        script = [
145          Call(method_name="__init__", result=ftplib.error_perm),
146          Call(method_name="pwd", result="/"),
147        ]
148        with pytest.raises(ftputil.error.FTPOSError):
149            test_base.ftp_host_factory(scripted_session.factory(script))
150
151    def test_pwd_normalization(self):
152        """
153        Test if the stored current directory is normalized.
154        """
155        script = [
156          Call(method_name="__init__", result=None),
157          # Deliberately return the current working directory with a
158          # trailing slash to test if it's removed when stored in the
159          # `FTPHost` instance.
160          Call(method_name="pwd", result="/home/")
161        ]
162        host = test_base.ftp_host_factory(scripted_session.factory(script))
163        assert host.getcwd() == "/home"
164
165
166class TestKeepAlive:
167
168    def test_succeeding_keep_alive(self):
169        """Assume the connection is still alive."""
170        host = test_base.ftp_host_factory()
171        host.keep_alive()
172
173    def test_failing_keep_alive(self):
174        """Assume the connection has timed out, so `keep_alive` fails."""
175        script = [
176          Call(method_name="__init__", result=None),
177          Call(method_name="pwd", result="/home"),
178          # Simulate failing `pwd` call after the server closed the connection
179          # due to a session timeout.
180          Call(method_name="pwd", result=ftplib.error_temp),
181        ]
182        host = test_base.ftp_host_factory(scripted_session.factory(script))
183        with pytest.raises(ftputil.error.TemporaryError):
184            host.keep_alive()
185
186
187class TestSetParser:
188
189    class TrivialParser(ftputil.stat.Parser):
190        """
191        An instance of this parser always returns the same result
192        from its `parse_line` method. This is all we need to check
193        if ftputil uses the set parser. No actual parsing code is
194        required here.
195        """
196
197        def __init__(self):
198            # We can't use `os.stat("/home")` directly because we
199            # later need the object's `_st_name` attribute, which
200            # we can't set on a `os.stat` stat value.
201            default_stat_result = ftputil.stat.StatResult(os.stat("/home"))
202            default_stat_result._st_name = "home"
203            self.default_stat_result = default_stat_result
204
205        def parse_line(self, line, time_shift=0.0):
206            return self.default_stat_result
207
208    def test_set_parser(self):
209        """Test if the selected parser is used."""
210        script = [
211          Call(method_name="__init__", result=None),
212          Call(method_name="pwd", result="/"),
213          Call(method_name="cwd", result=None, args=("/",)),
214          Call(method_name="cwd", result=None, args=("/",)),
215          Call(method_name="dir",
216               result="drwxr-xr-x   2 45854    200           512 May  4  2000 home"),
217          Call(method_name="cwd", result=None, args=("/",))
218        ]
219        host = test_base.ftp_host_factory(scripted_session.factory(script))
220        assert host._stat._allow_parser_switching is True
221        trivial_parser = TestSetParser.TrivialParser()
222        host.set_parser(trivial_parser)
223        stat_result = host.stat("/home")
224        assert stat_result == trivial_parser.default_stat_result
225        assert host._stat._allow_parser_switching is False
226
227
228class TestCommandNotImplementedError:
229
230    def test_command_not_implemented_error(self):
231        """
232        Test if we get the anticipated exception if a command isn't
233        implemented by the server.
234        """
235        script = [
236          Call(method_name="__init__"),
237          Call(method_name="pwd", result="/"),
238          Call(method_name="cwd", result=None, args=("/",)),
239          Call(method_name="cwd", result=None, args=("/",)),
240          # `FTPHost.chmod` only raises a `CommandNotImplementedError` when
241          # the exception text of the `ftplib.error_perm` starts with "502".
242          Call(method_name="voidcmd",
243               result=ftplib.error_perm("502 command not implemented"),
244               args=("SITE CHMOD 0644 nonexistent",)),
245          Call(method_name="cwd", result=None, args=("/",)),
246          Call(method_name="cwd", result=None, args=("/",)),
247          Call(method_name="cwd", result=None, args=("/",)),
248          # `FTPHost.chmod` only raises a `CommandNotImplementedError` when
249          # the exception text of the `ftplib.error_perm` starts with "502".
250          Call(method_name="voidcmd",
251               result=ftplib.error_perm("502 command not implemented"),
252               args=("SITE CHMOD 0644 nonexistent",)),
253          Call(method_name="cwd", result=None, args=("/",)),
254        ]
255        host = test_base.ftp_host_factory(scripted_session.factory(script))
256        with pytest.raises(ftputil.error.CommandNotImplementedError):
257            host.chmod("nonexistent", 0o644)
258        # `CommandNotImplementedError` is a subclass of `PermanentError`.
259        with pytest.raises(ftputil.error.PermanentError):
260            host.chmod("nonexistent", 0o644)
261
262
263class TestRecursiveListingForDotAsPath:
264    """
265    These tests are for issue #33, see
266    http://ftputil.sschwarzer.net/trac/ticket/33 .
267    """
268
269    def test_plain_listing(self):
270        """
271        If an empty string is passed to `FTPHost._dir` it should be passed to
272        `session.dir` unmodified.
273        """
274        script = [
275          Call(method_name="__init__"),
276          Call(method_name="pwd", result="/"),
277          Call(method_name="cwd", result=None, args=("/",)),
278          Call(method_name="cwd", result=None, args=(".",)),
279          # Check that the empty string is passed on to `session.dir`.
280          Call(method_name="dir", result="non-recursive listing", args=("",)),
281          Call(method_name="cwd", result=None, args=("/",)),
282          Call(method_name="close", result=None)
283        ]
284        host = test_base.ftp_host_factory(scripted_session.factory(script))
285        lines = host._dir(host.curdir)
286        assert lines[0] == "non-recursive listing"
287        host.close()
288
289    def test_empty_string_instead_of_dot_workaround(self):
290        """
291        If `FTPHost.listdir` is called with a dot as argument, the underlying
292        `session.dir` should _not_ be called with the dot as argument, but with
293        an empty string.
294        """
295        dir_result = """\
296total 10
297lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin
298d--x--x--x   2 staff        512 Sep 24  2000 dev
299d--x--x--x   3 staff        512 Sep 25  2000 etc
300dr-xr-xr-x   3 staff        512 Oct  3  2000 pub
301d--x--x--x   5 staff        512 Oct  3  2000 usr"""
302        script = [
303          Call(method_name="__init__"),
304          Call(method_name="pwd", result="/"),
305          Call(method_name="cwd", result=None, args=("/",)),
306          Call(method_name="cwd", result=None, args=("/",)),
307          Call(method_name="dir", result=dir_result, args=("",)),
308          Call(method_name="cwd", result=None, args=("/",)),
309          Call(method_name="close", result=None),
310        ]
311        host = test_base.ftp_host_factory(scripted_session.factory(script))
312        files = host.listdir(host.curdir)
313        assert files == ["bin", "dev", "etc", "pub", "usr"]
314        host.close()
315
316
317class TestUploadAndDownload:
318    """Test upload and download."""
319
320    def generate_file(self, data, file_name):
321        """Generate a local data file."""
322        with open(file_name, "wb") as source_file:
323            source_file.write(data)
324
325    def test_download(self):
326        """Test mode download."""
327        remote_file_name = "dummy_name"
328        remote_file_content = b"dummy_content"
329        local_target = "_test_target_"
330        host_script = [
331          Call("__init__"),
332          Call(method_name="pwd", result="/"),
333          Call(method_name="close"),
334        ]
335        file_script = [
336          Call("__init__"),
337          Call(method_name="pwd", result="/"),
338          Call(method_name="cwd", result=None, args=("/",)),
339          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
340          Call(method_name="transfercmd", result=io.BytesIO(remote_file_content),
341               args=("RETR {}".format(remote_file_name), None)),
342          Call(method_name="voidresp"),
343          Call(method_name="close")
344        ]
345        multisession_factory = scripted_session.factory(host_script, file_script)
346        host = test_base.ftp_host_factory(multisession_factory)
347        # Download
348        with host:
349            host.download(remote_file_name, local_target)
350        # Verify expected operations on mock socket as done in `FTPFile.close`.
351        # We expect one `gettimeout` and two `settimeout` calls.
352        file_session = multisession_factory.scripted_sessions[1]
353        file_session.sock.gettimeout.assert_called_once_with()
354        assert len(file_session.sock.settimeout.call_args_list) == 2
355        assert (file_session.sock.settimeout.call_args_list[0] ==
356                ((ftputil.file.FTPFile._close_timeout,), {}) )
357        assert (file_session.sock.settimeout.call_args_list[1] ==
358                ((file_session.sock.gettimeout(),), {}))
359        # Read file and compare
360        with open(local_target, "rb") as fobj:
361            data = fobj.read()
362        assert data == remote_file_content
363        # Clean up
364        os.unlink(local_target)
365
366    def test_conditional_upload_without_upload(self):
367        """
368        If the target file is newer, no upload should happen.
369        """
370        local_source = "_test_source_"
371        data = binary_data()
372        self.generate_file(data, local_source)
373        dir_result = test_base.dir_line(mode_string="-rw-r--r--",
374                                        date_=datetime.date.today() +
375                                              datetime.timedelta(days=1),
376                                        name="newer")
377        script = [
378          Call("__init__"),
379          Call(method_name="pwd", result="/"),
380          Call(method_name="cwd", result=None, args=("/",)),
381          Call(method_name="cwd", result=None, args=("/",)),
382          Call(method_name="dir", result=dir_result, args=("",)),
383          Call(method_name="cwd", result=None, args=("/",)),
384          Call(method_name="close"),
385        ]
386        # Target is newer, so don't upload.
387        #
388        # This not only tests the return value, but also if a transfer
389        # happened. If an upload was tried, our test framework would complain
390        # about a missing scripted session for the `FTPFile` host.
391        multisession_factory = scripted_session.factory(script)
392        with test_base.ftp_host_factory(multisession_factory) as host:
393            flag = host.upload_if_newer(local_source, "/newer")
394        assert flag is False
395
396    def test_conditional_upload_with_upload(self):
397        """
398        If the target file is older or doesn't exist, the source file
399        should be uploaded.
400        """
401        file_content = b"dummy_content"
402        local_source = "_test_source_"
403        self.generate_file(file_content, local_source)
404        remote_file_name = "dummy_name"
405        dir_result = test_base.dir_line(mode_string="-rw-r--r--",
406                                        date_=datetime.date.today() -
407                                              datetime.timedelta(days=1),
408                                        name="older")
409        host_script = [
410          Call("__init__"),
411          Call(method_name="pwd", result="/"),
412          Call(method_name="cwd", result=None, args=("/",)),
413          Call(method_name="cwd", result=None, args=("/",)),
414          Call(method_name="dir", result=dir_result, args=("",)),
415          Call(method_name="cwd", result=None, args=("/",)),
416          Call(method_name="close"),
417        ]
418        file_script = [
419          Call("__init__"),
420          Call(method_name="pwd", result="/"),
421          Call(method_name="cwd", result=None, args=("/",)),
422          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
423          Call(method_name="transfercmd",
424               result=test_base.MockableBytesIO(),
425               args=("STOR older", None)),
426          Call(method_name="voidresp", result=None, args=()),
427          Call(method_name="close"),
428        ]
429        # Target is older, so upload.
430        multisession_factory = scripted_session.factory(host_script, file_script)
431        with unittest.mock.patch("test.test_base.MockableBytesIO.write") as write_mock:
432            with test_base.ftp_host_factory(multisession_factory) as host:
433                flag = host.upload_if_newer(local_source, "/older")
434            write_mock.assert_called_with(file_content)
435        assert flag is True
436        # Target doesn't exist, so upload.
437        #  Use correct file name for this test.
438        file_script[4] = Call(method_name="transfercmd",
439                              result=test_base.MockableBytesIO(),
440                              args=("STOR notthere", None))
441        multisession_factory = scripted_session.factory(host_script, file_script)
442        with unittest.mock.patch("test.test_base.MockableBytesIO.write") as write_mock:
443            with test_base.ftp_host_factory(multisession_factory) as host:
444                flag = host.upload_if_newer(local_source, "/notthere")
445            write_mock.assert_called_with(file_content)
446        assert flag is True
447        # Clean up.
448        os.unlink(local_source)
449
450    # FIXME: We always want to delete the unneeded target file, but we
451    # only want the file content comparison if the previous test
452    # (whether the file was downloaded) succeeded.
453    def compare_and_delete_downloaded_data(self, file_name, expected_data):
454        """
455        Compare content of downloaded file with its source, then
456        delete the local target file.
457        """
458        with open(file_name, "rb") as fobj:
459            data = fobj.read()
460        try:
461            assert data == expected_data
462        finally:
463            os.unlink(file_name)
464
465    def test_conditional_download_without_target(self):
466        """
467        Test conditional binary mode download when no target file
468        exists.
469        """
470        local_target = "_test_target_"
471        data = binary_data()
472        # Target does not exist, so download.
473        #  There isn't a `dir` call to compare the datetimes of the
474        #  remote and the target file because the local `exists` call
475        #  for the local target returns `False` and the datetime
476        #  comparison therefore isn't done.
477        host_script = [
478          Call("__init__"),
479          Call(method_name="pwd", result="/"),
480          Call(method_name="close"),
481        ]
482        file_script = [
483          Call("__init__"),
484          Call(method_name="pwd", result="/"),
485          Call(method_name="cwd", result=None, args=("/",)),
486          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
487          Call(method_name="transfercmd",
488               result=io.BytesIO(data),
489               args=("RETR newer", None)),
490          Call(method_name="voidresp", result=None, args=()),
491          Call(method_name="close"),
492        ]
493        multisession_factory = scripted_session.factory(host_script, file_script)
494        try:
495            with test_base.ftp_host_factory(multisession_factory) as host:
496                flag = host.download_if_newer("/newer", local_target)
497            assert flag is True
498        finally:
499            self.compare_and_delete_downloaded_data(local_target, data)
500
501    def test_conditional_download_with_older_target(self):
502        """Test conditional binary mode download with newer source file."""
503        local_target = "_test_target_"
504        # Make target file.
505        with open(local_target, "w"):
506            pass
507        data = binary_data()
508        # Target is older, so download.
509        #  Use a date in the future. That isn't realistic, but for the
510        #  purpose of the test it's an easy way to make sure the source
511        #  file is newer than the target file.
512        dir_result = test_base.dir_line(mode_string="-rw-r--r--",
513                                        date_=datetime.date.today() +
514                                              datetime.timedelta(days=1),
515                                        name="newer")
516        host_script = [
517          Call("__init__"),
518          Call(method_name="pwd", result="/"),
519          Call(method_name="cwd", result=None, args=("/",)),
520          Call(method_name="cwd", result=None, args=("/",)),
521          Call(method_name="dir", result=dir_result, args=("",)),
522          Call(method_name="cwd", result=None, args=("/",)),
523          Call(method_name="close"),
524        ]
525        file_script = [
526          Call("__init__"),
527          Call(method_name="pwd", result="/"),
528          Call(method_name="cwd", result=None, args=("/",)),
529          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
530          Call(method_name="transfercmd",
531               result=io.BytesIO(data),
532               args=("RETR newer", None)),
533          Call(method_name="voidresp", result=None, args=()),
534          Call(method_name="close"),
535        ]
536        multisession_factory = scripted_session.factory(host_script, file_script)
537        try:
538            with test_base.ftp_host_factory(multisession_factory) as host:
539                flag = host.download_if_newer("/newer", local_target)
540            assert flag is True
541        finally:
542            self.compare_and_delete_downloaded_data(local_target, data)
543
544    def test_conditional_download_with_newer_target(self):
545        """Test conditional binary mode download with older source file."""
546        local_target = "_test_target_"
547        # Make target file.
548        with open(local_target, "w"):
549            pass
550        data = binary_data()
551        # Use date in the past, so the target file is newer and no
552        # download happens.
553        dir_result = test_base.dir_line(mode_string="-rw-r--r--",
554                                        date_=datetime.date.today() -
555                                              datetime.timedelta(days=1),
556                                        name="newer")
557        host_script = [
558          Call("__init__"),
559          Call(method_name="pwd", result="/"),
560          Call(method_name="cwd", result=None, args=("/",)),
561          Call(method_name="cwd", result=None, args=("/",)),
562          Call(method_name="dir", result=dir_result, args=("",)),
563          Call(method_name="cwd", result=None, args=("/",)),
564          Call(method_name="close"),
565        ]
566        file_script = [
567          Call("__init__"),
568          Call(method_name="pwd", result="/"),
569          Call(method_name="cwd", result=None, args=("/",)),
570          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
571          Call(method_name="transfercmd",
572               result=io.BytesIO(data),
573               args=("RETR newer", None)),
574          Call(method_name="voidresp", result=None, args=()),
575          Call(method_name="close"),
576        ]
577        multisession_factory = scripted_session.factory(host_script, file_script)
578        with test_base.ftp_host_factory(multisession_factory) as host:
579            flag = host.download_if_newer("/newer", local_target)
580        assert flag is False
581
582
583class TestTimeShift:
584
585    # Helper mock class that frees us from setting up complicated
586    # session scripts for the remote calls.
587    class _Path:
588        def split(self, path):
589            return posixpath.split(path)
590        def set_mtime(self, mtime):
591            self._mtime = mtime
592        def getmtime(self, file_name):
593            return self._mtime
594        def join(self, *args):
595            return posixpath.join(*args)
596        def normpath(self, path):
597            return posixpath.normpath(path)
598        def isabs(self, path):
599            return posixpath.isabs(path)
600        def abspath(self, path):
601            return "/_ftputil_sync_"
602        # Needed for `isdir` in `FTPHost.remove`
603        def isfile(self, path):
604            return True
605
606    def test_rounded_time_shift(self):
607        """Test if time shift is rounded correctly."""
608        script = [
609          Call("__init__"),
610          Call(method_name="pwd", result="/"),
611          Call(method_name="close"),
612        ]
613        multisession_factory = scripted_session.factory(script)
614        with test_base.ftp_host_factory(multisession_factory) as host:
615            # Use private bound method.
616            rounded_time_shift = host._FTPHost__rounded_time_shift
617            # Pairs consisting of original value and expected result
618            test_data = [
619              (      0,           0),
620              (      0.1,         0),
621              (     -0.1,         0),
622              (   1500,        1800),
623              (  -1500,       -1800),
624              (   1800,        1800),
625              (  -1800,       -1800),
626              (   2000,        1800),
627              (  -2000,       -1800),
628              ( 5*3600-100,  5*3600),
629              (-5*3600+100, -5*3600)]
630            for time_shift, expected_time_shift in test_data:
631                calculated_time_shift = rounded_time_shift(time_shift)
632                assert calculated_time_shift == expected_time_shift
633
634    def test_assert_valid_time_shift(self):
635        """Test time shift sanity checks."""
636        script = [
637          Call("__init__"),
638          Call(method_name="pwd", result="/"),
639          Call(method_name="close"),
640        ]
641        multisession_factory = scripted_session.factory(script)
642        with test_base.ftp_host_factory(multisession_factory) as host:
643            # Use private bound method.
644            assert_time_shift = host._FTPHost__assert_valid_time_shift
645            # Valid time shifts
646            test_data = [23*3600, -23*3600, 3600+30, -3600+30]
647            for time_shift in test_data:
648                assert assert_time_shift(time_shift) is None
649            # Invalid time shift (exceeds one day)
650            with pytest.raises(ftputil.error.TimeShiftError):
651                assert_time_shift(25*3600)
652            with pytest.raises(ftputil.error.TimeShiftError):
653                assert_time_shift(-25*3600)
654            # Invalid time shift (too large deviation from 15-minute units
655            # is unacceptable)
656            with pytest.raises(ftputil.error.TimeShiftError):
657                assert_time_shift(8*60)
658            with pytest.raises(ftputil.error.TimeShiftError):
659                assert_time_shift(-3600-8*60)
660
661    def test_synchronize_times(self):
662        """Test time synchronization with server."""
663        host_script = [
664          Call("__init__"),
665          Call(method_name="pwd", result="/"),
666          Call(method_name="cwd", result=None, args=("/",)),
667          Call(method_name="cwd", result=None, args=("/",)),
668          Call(method_name="delete", result=None, args=("_ftputil_sync_",)),
669          Call(method_name="cwd", result=None, args=("/",)),
670          Call(method_name="close"),
671        ]
672        file_script = [
673          Call("__init__"),
674          Call(method_name="pwd", result="/"),
675          Call(method_name="cwd", result=None, args=("/",)),
676          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
677          Call(method_name="transfercmd", result=io.BytesIO(),
678               args=("STOR _ftputil_sync_", None)),
679          Call(method_name="voidresp", result=None, args=()),
680          Call(method_name="close"),
681        ]
682        # Valid time shifts
683        test_data = [
684          (60*60+30,  60*60),
685          (60*60-100, 60*60),
686          (30*60+100, 30*60),
687          (45*60-100, 45*60),
688        ]
689        for measured_time_shift, expected_time_shift in test_data:
690            print("=== measured_time_shift:", measured_time_shift)
691            print("=== expected_time_shift:", expected_time_shift)
692            # Use a new `BytesIO` object to avoid exception
693            # `ValueError: I/O operation on closed file`.
694            file_script[4] = Call(method_name="transfercmd", result=io.BytesIO(),
695                                  args=("STOR _ftputil_sync_", None))
696            multisession_factory = scripted_session.factory(host_script,
697                                                            file_script)
698            with test_base.ftp_host_factory(multisession_factory) as host:
699                host.path = self._Path()
700                host.path.set_mtime(time.time() + measured_time_shift)
701                host.synchronize_times()
702                assert host.time_shift() == expected_time_shift
703        # Invalid time shifts
704        measured_time_shifts = [60*60+8*60, 45*60-6*60]
705        for measured_time_shift in measured_time_shifts:
706            print("=== measured_time_shift:", measured_time_shift)
707            # Use a new `BytesIO` object to avoid exception
708            # `ValueError: I/O operation on closed file`.
709            file_script[4] = Call(method_name="transfercmd", result=io.BytesIO(),
710                                  args=("STOR _ftputil_sync_", None))
711            multisession_factory = scripted_session.factory(host_script,
712                                                            file_script)
713            with test_base.ftp_host_factory(multisession_factory) as host:
714                host.path = self._Path()
715                host.path.set_mtime(time.time() + measured_time_shift)
716                with pytest.raises(ftputil.error.TimeShiftError):
717                    host.synchronize_times()
718
719    def test_synchronize_times_for_server_in_east(self):
720        """Test for timestamp correction (see ticket #55)."""
721        host_script = [
722          Call("__init__"),
723          Call(method_name="pwd", result="/"),
724          Call(method_name="cwd", result=None, args=("/",)),
725          Call(method_name="cwd", result=None, args=("/",)),
726          Call(method_name="delete", result=None, args=("_ftputil_sync_",)),
727          Call(method_name="cwd", result=None, args=("/",)),
728          Call(method_name="close"),
729        ]
730        file_script = [
731          Call("__init__"),
732          Call(method_name="pwd", result="/"),
733          Call(method_name="cwd", result=None, args=("/",)),
734          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
735          Call(method_name="transfercmd", result=io.BytesIO(),
736               args=("STOR _ftputil_sync_", None)),
737          Call(method_name="voidresp", result=None, args=()),
738          Call(method_name="close"),
739        ]
740        multisession_factory = scripted_session.factory(host_script, file_script)
741        with test_base.ftp_host_factory(session_factory=multisession_factory) as host:
742            host.path = self._Path()
743            # Set this explicitly to emphasize the problem.
744            host.set_time_shift(0.0)
745            hour = 60 * 60
746            # This could be any negative time shift.
747            presumed_time_shift = -6 * hour
748            # Set `mtime` to simulate a server east of us.
749            # In case the `time_shift` value for this host instance is 0.0
750            # (as is to be expected before the time shift is determined),
751            # the directory parser (more specifically
752            # `ftputil.stat.Parser.parse_unix_time`) will return a time which
753            # is a year too far in the past. The `synchronize_times`
754            # method needs to deal with this and add the year "back".
755            # I don't think it's a bug in `parse_unix_time` because the
756            # method should work once the time shift is set correctly.
757            local_time = time.localtime()
758            local_time_with_wrong_year = (local_time.tm_year-1,) + local_time[1:]
759            presumed_server_time = \
760              time.mktime(local_time_with_wrong_year) + presumed_time_shift
761            host.path.set_mtime(presumed_server_time)
762            host.synchronize_times()
763            assert host.time_shift() == presumed_time_shift
764
765
766class TestAcceptEitherUnicodeOrBytes:
767    """
768    Test whether certain `FTPHost` methods accept either unicode
769    or byte strings for the path(s).
770    """
771
772    def test_upload(self):
773        """Test whether `upload` accepts either unicode or bytes."""
774        host_script = [
775          Call("__init__"),
776          Call(method_name="pwd", result="/"),
777          Call(method_name="close"),
778        ]
779        file_script = [
780          Call("__init__"),
781          Call(method_name="pwd", result="/"),
782          Call(method_name="cwd", result=None, args=("/",)),
783          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
784          Call(method_name="transfercmd",
785               result=io.BytesIO(),
786               args=("STOR target", None)),
787          Call(method_name="voidresp", result=None, args=()),
788          Call(method_name="close"),
789        ]
790        multisession_factory = scripted_session.factory(host_script, file_script)
791        # The source file needs to be present in the current directory.
792        with test_base.ftp_host_factory(multisession_factory) as host:
793            host.upload("Makefile", "target")
794        # Create new `BytesIO` object.
795        file_script[4] = Call(method_name="transfercmd",
796                              result=io.BytesIO(),
797                              args=("STOR target", None))
798        multisession_factory = scripted_session.factory(host_script, file_script)
799        with test_base.ftp_host_factory(multisession_factory) as host:
800            host.upload("Makefile", ftputil.tool.as_bytes("target"))
801
802    def test_download(self):
803        """Test whether `download` accepts either unicode or bytes."""
804        host_script = [
805          Call("__init__"),
806          Call(method_name="pwd", result="/"),
807          Call(method_name="close"),
808        ]
809        file_script = [
810          Call("__init__"),
811          Call(method_name="pwd", result="/"),
812          Call(method_name="cwd", result=None, args=("/",)),
813          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
814          Call(method_name="transfercmd",
815               result=io.BytesIO(),
816               args=("RETR source", None)),
817          Call(method_name="voidresp", result=None, args=()),
818          Call(method_name="close"),
819        ]
820        local_file_name = "_local_target_"
821        multisession_factory = scripted_session.factory(host_script, file_script)
822        # The source file needs to be present in the current directory.
823        with test_base.ftp_host_factory(multisession_factory) as host:
824            host.download("source", local_file_name)
825        # Create new `BytesIO` object.
826        file_script[4] = Call(method_name="transfercmd",
827                              result=io.BytesIO(),
828                              args=("RETR source", None))
829        multisession_factory = scripted_session.factory(host_script, file_script)
830        with test_base.ftp_host_factory(multisession_factory) as host:
831            host.download(ftputil.tool.as_bytes("source"), local_file_name)
832        os.remove(local_file_name)
833
834    def test_rename(self):
835        """Test whether `rename` accepts either unicode or bytes."""
836        script = [
837          Call("__init__"),
838          Call(method_name="pwd", result="/"),
839          Call(method_name="cwd", result=None, args=("/",)),
840          Call(method_name="rename", result=None, args=("/ä", "/ä")),
841          Call(method_name="close"),
842        ]
843        # It's possible to mix argument types, as for `os.rename`.
844        path_as_unicode = "/ä"
845        path_as_bytes = ftputil.tool.as_bytes(path_as_unicode)
846        paths = [path_as_unicode, path_as_bytes]
847        for source_path, target_path in itertools.product(paths, paths):
848            session_factory = scripted_session.factory(script)
849            with test_base.ftp_host_factory(session_factory) as host:
850                host.rename(source_path, target_path)
851
852    def test_listdir(self):
853        """Test whether `listdir` accepts either unicode or bytes."""
854        as_bytes = ftputil.tool.as_bytes
855        top_level_dir_line = test_base.dir_line(mode_string="drwxr-xr-x",
856                                                date_=datetime.date.today(),
857                                                name="ä")
858        dir_line1 = test_base.dir_line(mode_string="-rw-r--r--",
859                                       date_=datetime.date.today(),
860                                       name="ö")
861        dir_line2 = test_base.dir_line(mode_string="-rw-r--r--",
862                                       date_=datetime.date.today(),
863                                       name="o")
864        dir_result = dir_line1 + "\n" + dir_line2
865        script = [
866          Call("__init__"),
867          Call(method_name="pwd", result="/"),
868          Call(method_name="cwd", result=None, args=("/",)),
869          Call(method_name="cwd", result=None, args=("/",)),
870          Call(method_name="dir", result=top_level_dir_line, args=("",)),
871          Call(method_name="cwd", result=None, args=("/",)),
872          Call(method_name="cwd", result=None, args=("/",)),
873          Call(method_name="cwd", result=None, args=("/ä",)),
874          Call(method_name="dir", result=dir_result, args=("",)),
875          Call(method_name="cwd", result=None, args=("/",)),
876          Call(method_name="close"),
877        ]
878        # Unicode
879        session_factory = scripted_session.factory(script)
880        with test_base.ftp_host_factory(session_factory) as host:
881            items = host.listdir("ä")
882        assert items == ["ö", "o"]
883        # Bytes
884        session_factory = scripted_session.factory(script)
885        with test_base.ftp_host_factory(session_factory) as host:
886            items = host.listdir(as_bytes("ä"))
887            assert items == [as_bytes("ö"), as_bytes("o")]
888
889    def test_chmod(self):
890        """Test whether `chmod` accepts either unicode or bytes."""
891        script = [
892          Call("__init__"),
893          Call(method_name="pwd", result="/"),
894          Call(method_name="cwd", result=None, args=("/",)),
895          Call(method_name="cwd", result=None, args=("/",)),
896          Call(method_name="voidcmd", result=None, args=("SITE CHMOD 0755 ä",)),
897          Call(method_name="cwd", result=None, args=("/",)),
898          Call(method_name="close"),
899        ]
900        path = "/ä"
901        # Unicode
902        session_factory = scripted_session.factory(script)
903        with test_base.ftp_host_factory(session_factory) as host:
904            host.chmod(path, 0o755)
905        # Bytes
906        session_factory = scripted_session.factory(script)
907        with test_base.ftp_host_factory(session_factory) as host:
908            host.chmod(ftputil.tool.as_bytes(path), 0o755)
909
910    def _test_method_with_single_path_argument(self, method_name, path, script):
911        # Unicode
912        session_factory = scripted_session.factory(script)
913        with test_base.ftp_host_factory(session_factory) as host:
914            method = getattr(host, method_name)
915            method(path)
916        # Bytes
917        session_factory = scripted_session.factory(script)
918        with test_base.ftp_host_factory(session_factory) as host:
919            method = getattr(host, method_name)
920            method(ftputil.tool.as_bytes(path))
921
922    def test_chdir(self):
923        """Test whether `chdir` accepts either unicode or bytes."""
924        script = [
925          Call("__init__"),
926          Call(method_name="pwd", result="/"),
927          Call(method_name="cwd", result=None, args=("/ö",)),
928          Call(method_name="close"),
929        ]
930        self._test_method_with_single_path_argument("chdir", "/ö", script)
931
932    def test_mkdir(self):
933        """Test whether `mkdir` accepts either unicode or bytes."""
934        script = [
935          Call("__init__"),
936          Call(method_name="pwd", result="/"),
937          Call(method_name="cwd", result=None, args=("/",)),
938          Call(method_name="cwd", result=None, args=("/",)),
939          Call(method_name="mkd", result=None, args=("ä",)),
940          Call(method_name="cwd", result=None, args=("/",)),
941          Call(method_name="close"),
942        ]
943        self._test_method_with_single_path_argument("mkdir", "/ä", script)
944
945    def test_makedirs(self):
946        """Test whether `makedirs` accepts either unicode or bytes."""
947        script = [
948          Call("__init__"),
949          Call(method_name="pwd", result="/"),
950          # To deal with ticket #86 (virtual directories), `makedirs` tries to
951          # change into each directory and if it exists (changing doesn't raise
952          # an exception), doesn't try to create it. That's why you don't see
953          # an `mkd` calls here despite originally having a `makedirs` call.
954          Call(method_name="cwd", result=None, args=("/ä",)),
955          Call(method_name="cwd", result=None, args=("/ä/ö",)),
956          Call(method_name="cwd", result=None, args=("/",)),
957          Call(method_name="close"),
958        ]
959        self._test_method_with_single_path_argument("makedirs", "/ä/ö", script)
960
961    def test_rmdir(self):
962        """Test whether `rmdir` accepts either unicode or bytes."""
963        dir_line = test_base.dir_line(mode_string="drwxr-xr-x",
964                                      date_=datetime.date.today(),
965                                      name="empty_ä")
966        # Since the session script isn't at all obvious, I checked it with a
967        # debugger and added comments on some of the calls that happen during
968        # the `rmdir` call.
969        #
970        # `_robust_ftp_command` descends one directory at a time (see ticket
971        # #11) and restores the original directory in the end, which results in
972        # at least four calls on the FTP session object (`cwd`, `cwd`, actual
973        # method, `cwd`). It would be great if all the roundtrips to the server
974        # could be reduced.
975        script = [
976          # `FTPHost` initialization
977          Call("__init__"),
978          Call(method_name="pwd", result="/"),
979          # `host.rmdir("/empty_ä")`
980          #  `host.listdir("/empty_ä")`
981          #   `host._stat._listdir("/empty_ä")`
982          #    `host._stat.__call_with_parser_retry("/empty_ä")`
983          #     `host._stat._real_listdir("/empty_ä")`
984          #      `host.path.isdir("/empty_ä")`
985          Call(method_name="cwd", result=None, args=("/",)),
986          Call(method_name="cwd", result=None, args=("/",)),
987          Call(method_name="dir", result=dir_line, args=("",)),
988          Call(method_name="cwd", result=None, args=("/",)),
989          #      `host.path.isdir` end
990          #      `host._stat._stat_results_from_dir("/empty_ä")`
991          Call(method_name="cwd", result=None, args=("/",)),
992          Call(method_name="cwd", result=None, args=("/empty_ä",)),
993          Call(method_name="dir", result="", args=("",)),
994          Call(method_name="cwd", result=None, args=("/",)),
995          #      `host._stat._stat_results_from_dir("/empty_ä")` end
996          #  `host._session.rmd` in `host._robust_ftp_command`
997          #   `host._check_inaccessible_login_directory()`
998          Call(method_name="cwd", result=None, args=("/",)),
999          #   `host.chdir(head)` ("/")
1000          Call(method_name="cwd", result=None, args=("/",)),
1001          #   `host.rmd(tail)` ("empty_ä")
1002          Call(method_name="rmd", result=None, args=("empty_ä",)),
1003          #   `host.chdir(old_dir)` ("/")
1004          Call(method_name="cwd", result=None, args=("/",)),
1005          #
1006          Call(method_name="close")
1007        ]
1008        empty_directory_as_required_by_rmdir = "/empty_ä"
1009        self._test_method_with_single_path_argument(
1010          "rmdir", empty_directory_as_required_by_rmdir, script)
1011
1012    def test_remove(self):
1013        """Test whether `remove` accepts either unicode or bytes."""
1014        dir_line = test_base.dir_line(mode_string="-rw-r--r--",
1015                                      date_=datetime.date.today(),
1016                                      name="ö")
1017        script = [
1018          Call("__init__"),
1019          Call(method_name="pwd", result="/"),
1020          Call(method_name="cwd", result=None, args=("/",)),
1021          Call(method_name="cwd", result=None, args=("/",)),
1022          Call(method_name="dir", result=dir_line, args=("",)),
1023          Call(method_name="cwd", result=None, args=("/",)),
1024          Call(method_name="cwd", result=None, args=("/",)),
1025          Call(method_name="cwd", result=None, args=("/",)),
1026          Call(method_name="delete", result=None, args=("ö",)),
1027          Call(method_name="cwd", result=None, args=("/",)),
1028          Call(method_name="close"),
1029        ]
1030        self._test_method_with_single_path_argument("remove", "/ö", script)
1031
1032    def test_rmtree(self):
1033        """Test whether `rmtree` accepts either unicode or bytes."""
1034        dir_line = test_base.dir_line(mode_string="drwxr-xr-x",
1035                                      date_=datetime.date.today(),
1036                                      name="empty_ä")
1037        script = [
1038          Call("__init__"),
1039          Call(method_name="pwd", result="/"),
1040          Call(method_name="cwd", result=None, args=("/",)),
1041          Call(method_name="cwd", result=None, args=("/",)),
1042          # Recursive `listdir`
1043          #  Check parent (root) directory.
1044          Call(method_name="dir", result=dir_line, args=("",)),
1045          Call(method_name="cwd", result=None, args=("/",)),
1046          Call(method_name="cwd", result=None, args=("/",)),
1047          Call(method_name="cwd", result=None, args=("/empty_ä",)),
1048          #  Child directory (inside `empty_ä`)
1049          Call(method_name="dir", result="", args=("",)),
1050          Call(method_name="cwd", result=None, args=("/",)),
1051          Call(method_name="cwd", result=None, args=("/",)),
1052          Call(method_name="cwd", result=None, args=("/empty_ä",)),
1053          # Recursive `rmdir` (repeated `cwd` calls because of
1054          # `_robust_ftp_command`)
1055          Call(method_name="dir", result="", args=("",)),
1056          Call(method_name="cwd", result=None, args=("/",)),
1057          Call(method_name="cwd", result=None, args=("/",)),
1058          Call(method_name="cwd", result=None, args=("/",)),
1059          Call(method_name="rmd", result=None, args=("empty_ä",)),
1060          Call(method_name="cwd", result=None, args=("/",)),
1061          Call(method_name="close"),
1062        ]
1063        empty_directory_as_required_by_rmtree = "/empty_ä"
1064        self._test_method_with_single_path_argument(
1065          "rmtree", empty_directory_as_required_by_rmtree, script)
1066
1067    def test_lstat(self):
1068        """Test whether `lstat` accepts either unicode or bytes."""
1069        dir_line = test_base.dir_line(mode_string="-rw-r--r--",
1070                                      date_=datetime.date.today(),
1071                                      name="ä")
1072        script = [
1073          Call("__init__"),
1074          Call(method_name="pwd", result="/"),
1075          Call(method_name="cwd", result=None, args=("/",)),
1076          Call(method_name="cwd", result=None, args=("/",)),
1077          Call(method_name="dir", result=dir_line, args=("",)),
1078          Call(method_name="cwd", result=None, args=("/",)),
1079          Call(method_name="close"),
1080        ]
1081        self._test_method_with_single_path_argument("lstat", "/ä", script)
1082
1083    def test_stat(self):
1084        """Test whether `stat` accepts either unicode or bytes."""
1085        dir_line = test_base.dir_line(mode_string="-rw-r--r--",
1086                                      date_=datetime.date.today(),
1087                                      name="ä")
1088        script = [
1089          Call("__init__"),
1090          Call(method_name="pwd", result="/"),
1091          Call(method_name="cwd", result=None, args=("/",)),
1092          Call(method_name="cwd", result=None, args=("/",)),
1093          Call(method_name="dir", result=dir_line, args=("",)),
1094          Call(method_name="cwd", result=None, args=("/",)),
1095          Call(method_name="close"),
1096        ]
1097        self._test_method_with_single_path_argument("stat", "/ä", script)
1098
1099    def test_walk(self):
1100        """Test whether `walk` accepts either unicode or bytes."""
1101        dir_line = test_base.dir_line(mode_string="-rw-r--r--",
1102                                      date_=datetime.date.today(),
1103                                      name="ä")
1104        script = [
1105          Call("__init__"),
1106          Call(method_name="pwd", result="/"),
1107          Call(method_name="cwd", result=None, args=("/",)),
1108          Call(method_name="cwd", result=None, args=("/",)),
1109          Call(method_name="dir", result=dir_line, args=("",)),
1110          Call(method_name="cwd", result=None, args=("/",)),
1111          Call(method_name="close"),
1112        ]
1113        # We're not interested in the return value of `walk`.
1114        # Unicode
1115        session_factory = scripted_session.factory(script)
1116        with test_base.ftp_host_factory(session_factory) as host:
1117            result = list(host.walk("/ä"))
1118        # Bytes
1119        session_factory = scripted_session.factory(script)
1120        with test_base.ftp_host_factory(session_factory) as host:
1121            result = list(host.walk(ftputil.tool.as_bytes("/ä")))
1122
1123
1124class TestFailingPickling:
1125
1126    def test_failing_pickling(self):
1127        """Test if pickling (intentionally) isn't supported."""
1128        with test_base.ftp_host_factory() as host:
1129            with pytest.raises(TypeError):
1130                pickle.dumps(host)
1131            with host.open("/home/sschwarzer/index.html") as file_obj:
1132                with pytest.raises(TypeError):
1133                    pickle.dumps(file_obj)
Note: See TracBrowser for help on using the repository browser.