source: test/test_host.py @ 1806:6a898515802d

Last change on this file since 1806:6a898515802d was 1806:6a898515802d, checked in by Stefan Schwarzer <sschwarzer@…>, 14 months ago
Use `ScriptedSession` in `TestAcceptEitherUnicodeOrBytes` Only during working on this I became aware how many `cwd` calls happen for most of the FTP commands. This was kind of hidden with the previous implementation which faked the FTP server and didn't require any specification (or look at) the actually executed FTP/session commands.
File size: 48.8 KB
Line 
1# Copyright (C) 2002-2018, Stefan Schwarzer <sschwarzer@sschwarzer.net>
2# and ftputil contributors (see `doc/contributors.txt`)
3# See the file LICENSE for licensing terms.
4
5import datetime
6import ftplib
7import io
8import itertools
9import os
10import pickle
11import posixpath
12import random
13import time
14import unittest
15import warnings
16
17import pytest
18
19import ftputil
20import ftputil.error
21import ftputil.file
22import ftputil.tool
23import ftputil.stat
24
25from test import mock_ftplib
26from test import test_base
27import test.scripted_session as scripted_session
28
29
30#
31# Helper functions to generate random data
32#
33def random_data(pool, size=10000):
34    """
35    Return a byte string of characters consisting of those from the
36    pool of integer numbers.
37    """
38    ordinal_list = [random.choice(pool) for i in range(size)]
39    return bytes(ordinal_list)
40
41
42def ascii_data():
43    r"""
44    Return a unicode string of "normal" ASCII characters, including `\r`.
45    """
46    pool = list(range(32, 128))
47    # The idea is to have the "\r" converted to "\n" during the later
48    # text write and check this conversion.
49    pool.append(ord("\r"))
50    return ftputil.tool.as_unicode(random_data(pool))
51
52
53def binary_data():
54    """Return a binary character byte string."""
55    pool = list(range(0, 256))
56    return random_data(pool)
57
58
59#
60# Several customized `MockSession` classes
61#
62class FailOnLoginSession(mock_ftplib.MockSession):
63
64    def __init__(self, host="", user="", password=""):
65        raise ftplib.error_perm
66
67
68class BinaryDownloadMockSession(mock_ftplib.MockUnixFormatSession):
69
70    mock_file_content = binary_data()
71
72
73class TimeShiftMockSession(mock_ftplib.MockSession):
74
75    def delete(self, file_name):
76        pass
77
78#
79# Customized `FTPHost` class for conditional upload/download tests
80# and time shift tests
81#
82class FailingUploadAndDownloadFTPHost(ftputil.FTPHost):
83
84    def upload(self, source, target, mode=""):
85        pytest.fail("`FTPHost.upload` should not have been called")
86
87    def download(self, source, target, mode=""):
88        pytest.fail("`FTPHost.download` should not have been called")
89
90
91class TimeShiftFTPHost(ftputil.FTPHost):
92
93    class _Path:
94        def split(self, path):
95            return posixpath.split(path)
96        def set_mtime(self, mtime):
97            self._mtime = mtime
98        def getmtime(self, file_name):
99            return self._mtime
100        def join(self, *args):
101            return posixpath.join(*args)
102        def normpath(self, path):
103            return posixpath.normpath(path)
104        def isabs(self, path):
105            return posixpath.isabs(path)
106        def abspath(self, path):
107            return "/home/sschwarzer/_ftputil_sync_"
108        # Needed for `isdir` in `FTPHost.remove`
109        def isfile(self, path):
110            return True
111
112    def __init__(self, *args, **kwargs):
113        ftputil.FTPHost.__init__(self, *args, **kwargs)
114        self.path = self._Path()
115
116#
117# Test cases
118#
119class TestConstructor:
120    """
121    Test initialization of `FTPHost` objects.
122    """
123
124    def test_open_and_close(self):
125        """
126        Test if opening and closing an `FTPHost` object works as
127        expected.
128        """
129        script = [
130          scripted_session.Call(method_name="__init__", result=None),
131          scripted_session.Call(method_name="pwd", result="/"),
132          scripted_session.Call(method_name="close")
133        ]
134        host = test_base.ftp_host_factory(scripted_session.factory(script))
135        host.close()
136        assert host.closed is True
137        assert host._children == []
138
139    def test_invalid_login(self):
140        """Login to invalid host must fail."""
141        script = [
142          scripted_session.Call(method_name="__init__", result=ftplib.error_perm),
143          scripted_session.Call(method_name="pwd", result="/"),
144        ]
145        with pytest.raises(ftputil.error.FTPOSError):
146            test_base.ftp_host_factory(scripted_session.factory(script))
147
148    def test_pwd_normalization(self):
149        """
150        Test if the stored current directory is normalized.
151        """
152        script = [
153          scripted_session.Call(method_name="__init__", result=None),
154          # Deliberately return the current working directory with a
155          # trailing slash to test if it's removed when stored in the
156          # `FTPHost` instance.
157          scripted_session.Call(method_name="pwd", result="/home/")
158        ]
159        host = test_base.ftp_host_factory(scripted_session.factory(script))
160        assert host.getcwd() == "/home"
161
162
163class TestKeepAlive:
164
165    def test_succeeding_keep_alive(self):
166        """Assume the connection is still alive."""
167        host = test_base.ftp_host_factory()
168        host.keep_alive()
169
170    def test_failing_keep_alive(self):
171        """Assume the connection has timed out, so `keep_alive` fails."""
172        script = [
173          scripted_session.Call(method_name="__init__", result=None),
174          scripted_session.Call(method_name="pwd", result="/home"),
175          # Simulate failing `pwd` call after the server closed the connection
176          # due to a session timeout.
177          scripted_session.Call(method_name="pwd", result=ftplib.error_temp),
178        ]
179        host = test_base.ftp_host_factory(scripted_session.factory(script))
180        with pytest.raises(ftputil.error.TemporaryError):
181            host.keep_alive()
182
183
184class TestSetParser:
185
186    class TrivialParser(ftputil.stat.Parser):
187        """
188        An instance of this parser always returns the same result
189        from its `parse_line` method. This is all we need to check
190        if ftputil uses the set parser. No actual parsing code is
191        required here.
192        """
193
194        def __init__(self):
195            # We can't use `os.stat("/home")` directly because we
196            # later need the object's `_st_name` attribute, which
197            # we can't set on a `os.stat` stat value.
198            default_stat_result = ftputil.stat.StatResult(os.stat("/home"))
199            default_stat_result._st_name = "home"
200            self.default_stat_result = default_stat_result
201
202        def parse_line(self, line, time_shift=0.0):
203            return self.default_stat_result
204
205    def test_set_parser(self):
206        """Test if the selected parser is used."""
207        Call = scripted_session.Call
208        script = [
209          Call(method_name="__init__", result=None),
210          Call(method_name="pwd", result="/"),
211          Call(method_name="cwd", result=None, args=("/",)),
212          Call(method_name="cwd", result=None, args=("/",)),
213          Call(method_name="dir",
214               result="drwxr-xr-x   2 45854    200           512 May  4  2000 home"),
215          Call(method_name="cwd", result=None, args=("/",))
216        ]
217        host = test_base.ftp_host_factory(scripted_session.factory(script))
218        assert host._stat._allow_parser_switching is True
219        trivial_parser = TestSetParser.TrivialParser()
220        host.set_parser(trivial_parser)
221        stat_result = host.stat("/home")
222        assert stat_result == trivial_parser.default_stat_result
223        assert host._stat._allow_parser_switching is False
224
225
226class TestCommandNotImplementedError:
227
228    def test_command_not_implemented_error(self):
229        """
230        Test if we get the anticipated exception if a command isn't
231        implemented by the server.
232        """
233        Call = scripted_session.Call
234        script = [
235          Call(method_name="__init__"),
236          Call(method_name="pwd", result="/"),
237          Call(method_name="cwd", result=None, args=("/",)),
238          Call(method_name="cwd", result=None, args=("/",)),
239          # `FTPHost.chmod` only raises a `CommandNotImplementedError` when
240          # the exception text of the `ftplib.error_perm` starts with "502".
241          Call(method_name="voidcmd",
242               result=ftplib.error_perm("502 command not implemented"),
243               args=("SITE CHMOD 0644 nonexistent",)),
244          Call(method_name="cwd", result=None, args=("/",)),
245          Call(method_name="cwd", result=None, args=("/",)),
246          Call(method_name="cwd", result=None, args=("/",)),
247          # `FTPHost.chmod` only raises a `CommandNotImplementedError` when
248          # the exception text of the `ftplib.error_perm` starts with "502".
249          Call(method_name="voidcmd",
250               result=ftplib.error_perm("502 command not implemented"),
251               args=("SITE CHMOD 0644 nonexistent",)),
252          Call(method_name="cwd", result=None, args=("/",)),
253        ]
254        host = test_base.ftp_host_factory(scripted_session.factory(script))
255        with pytest.raises(ftputil.error.CommandNotImplementedError):
256            host.chmod("nonexistent", 0o644)
257        # `CommandNotImplementedError` is a subclass of `PermanentError`.
258        with pytest.raises(ftputil.error.PermanentError):
259            host.chmod("nonexistent", 0o644)
260
261
262class TestRecursiveListingForDotAsPath:
263    """
264    These tests are for issue #33, see
265    http://ftputil.sschwarzer.net/trac/ticket/33 .
266    """
267
268    def test_plain_listing(self):
269        """
270        If an empty string is passed to `FTPHost._dir` it should be passed to
271        `session.dir` unmodified.
272        """
273        Call = scripted_session.Call
274        script = [
275          Call(method_name="__init__"),
276          Call(method_name="pwd", result="/"),
277          Call(method_name="cwd", result=None, args=("/",)),
278          Call(method_name="cwd", result=None, args=(".",)),
279          # Check that the empty string is passed on to `session.dir`.
280          Call(method_name="dir", result="non-recursive listing", args=("",)),
281          Call(method_name="cwd", result=None, args=("/",)),
282          Call(method_name="close", result=None)
283        ]
284        host = test_base.ftp_host_factory(scripted_session.factory(script))
285        lines = host._dir(host.curdir)
286        assert lines[0] == "non-recursive listing"
287        host.close()
288
289    def test_empty_string_instead_of_dot_workaround(self):
290        """
291        If `FTPHost.listdir` is called with a dot as argument, the underlying
292        `session.dir` should _not_ be called with the dot as argument, but with
293        an empty string.
294        """
295        Call = scripted_session.Call
296        dir_result = """\
297total 10
298lrwxrwxrwx   1 staff          7 Aug 13  2003 bin -> usr/bin
299d--x--x--x   2 staff        512 Sep 24  2000 dev
300d--x--x--x   3 staff        512 Sep 25  2000 etc
301dr-xr-xr-x   3 staff        512 Oct  3  2000 pub
302d--x--x--x   5 staff        512 Oct  3  2000 usr"""
303        script = [
304          Call(method_name="__init__"),
305          Call(method_name="pwd", result="/"),
306          Call(method_name="cwd", result=None, args=("/",)),
307          Call(method_name="cwd", result=None, args=("/",)),
308          Call(method_name="dir", result=dir_result, args=("",)),
309          Call(method_name="cwd", result=None, args=("/",)),
310          Call(method_name="close", result=None),
311        ]
312        host = test_base.ftp_host_factory(scripted_session.factory(script))
313        files = host.listdir(host.curdir)
314        assert files == ["bin", "dev", "etc", "pub", "usr"]
315        host.close()
316
317
318class TestUploadAndDownload:
319    """Test upload and download."""
320
321    def generate_file(self, data, file_name):
322        """Generate a local data file."""
323        with open(file_name, "wb") as source_file:
324            source_file.write(data)
325
326    def test_download(self):
327        """Test mode download."""
328        Call = scripted_session.Call
329        remote_file_name = "dummy_name"
330        remote_file_content = b"dummy_content"
331        local_target = "_test_target_"
332        host_script = [
333          Call("__init__"),
334          Call(method_name="pwd", result="/"),
335          Call(method_name="close"),
336        ]
337        file_script = [
338          Call("__init__"),
339          Call(method_name="pwd", result="/"),
340          Call(method_name="cwd", result=None, args=("/",)),
341          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
342          Call(method_name="transfercmd", result=io.BytesIO(remote_file_content),
343               args=("RETR {}".format(remote_file_name), None)),
344          Call(method_name="voidresp"),
345          Call(method_name="close")
346        ]
347        multisession_factory = scripted_session.factory(host_script, file_script)
348        host = test_base.ftp_host_factory(multisession_factory)
349        # Download
350        with host:
351            host.download(remote_file_name, local_target)
352        # Verify expected operations on mock socket as done in `FTPFile.close`.
353        # We expect one `gettimeout` and two `settimeout` calls.
354        file_session = multisession_factory.scripted_sessions[1]
355        file_session.sock.gettimeout.assert_called_once_with()
356        assert len(file_session.sock.settimeout.call_args_list) == 2
357        assert (file_session.sock.settimeout.call_args_list[0] ==
358                ((ftputil.file.FTPFile._close_timeout,), {}) )
359        assert (file_session.sock.settimeout.call_args_list[1] ==
360                ((file_session.sock.gettimeout(),), {}))
361        # Read file and compare
362        with open(local_target, "rb") as fobj:
363            data = fobj.read()
364        assert data == remote_file_content
365        # Clean up
366        os.unlink(local_target)
367
368    def test_conditional_upload_without_upload(self):
369        """
370        If the target file is newer, no upload should happen.
371        """
372        Call = scripted_session.Call
373        local_source = "_test_source_"
374        data = binary_data()
375        self.generate_file(data, local_source)
376        dir_result = test_base.dir_line(mode_string="-rw-r--r--",
377                                        date_=datetime.date.today() +
378                                              datetime.timedelta(days=1),
379                                        name="newer")
380        script = [
381          Call("__init__"),
382          Call(method_name="pwd", result="/"),
383          Call(method_name="cwd", result=None, args=("/",)),
384          Call(method_name="cwd", result=None, args=("/",)),
385          Call(method_name="dir", result=dir_result, args=("",)),
386          Call(method_name="cwd", result=None, args=("/",)),
387          Call(method_name="close"),
388        ]
389        # Target is newer, so don't upload.
390        #
391        # This not only tests the return value, but also if a transfer
392        # happened. If an upload was tried, our test framework would complain
393        # about a missing scripted session for the `FTPFile` host.
394        multisession_factory = scripted_session.factory(script)
395        with test_base.ftp_host_factory(multisession_factory) as host:
396            flag = host.upload_if_newer(local_source, "/newer")
397        assert flag is False
398
399    def test_conditional_upload_with_upload(self):
400        """
401        If the target file is older or doesn't exist, the source file
402        should be uploaded.
403        """
404        Call = scripted_session.Call
405        file_content = b"dummy_content"
406        local_source = "_test_source_"
407        self.generate_file(file_content, local_source)
408        remote_file_name = "dummy_name"
409        dir_result = test_base.dir_line(mode_string="-rw-r--r--",
410                                        date_=datetime.date.today() -
411                                              datetime.timedelta(days=1),
412                                        name="older")
413        host_script = [
414          Call("__init__"),
415          Call(method_name="pwd", result="/"),
416          Call(method_name="cwd", result=None, args=("/",)),
417          Call(method_name="cwd", result=None, args=("/",)),
418          Call(method_name="dir", result=dir_result, args=("",)),
419          Call(method_name="cwd", result=None, args=("/",)),
420          Call(method_name="close"),
421        ]
422        file_script = [
423          Call("__init__"),
424          Call(method_name="pwd", result="/"),
425          Call(method_name="cwd", result=None, args=("/",)),
426          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
427          Call(method_name="transfercmd",
428               result=test_base.MockableBytesIO(),
429               args=("STOR older", None)),
430          Call(method_name="voidresp", result=None, args=()),
431          Call(method_name="close"),
432        ]
433        # Target is older, so upload.
434        multisession_factory = scripted_session.factory(host_script, file_script)
435        with unittest.mock.patch("test.test_base.MockableBytesIO.write") as write_mock:
436            with test_base.ftp_host_factory(multisession_factory) as host:
437                flag = host.upload_if_newer(local_source, "/older")
438            write_mock.assert_called_with(file_content)
439        assert flag is True
440        # Target doesn't exist, so upload.
441        #  Use correct file name for this test.
442        file_script[4] = Call(method_name="transfercmd",
443                              result=test_base.MockableBytesIO(),
444                              args=("STOR notthere", None))
445        multisession_factory = scripted_session.factory(host_script, file_script)
446        with unittest.mock.patch("test.test_base.MockableBytesIO.write") as write_mock:
447            with test_base.ftp_host_factory(multisession_factory) as host:
448                flag = host.upload_if_newer(local_source, "/notthere")
449            write_mock.assert_called_with(file_content)
450        assert flag is True
451        # Clean up.
452        os.unlink(local_source)
453
454    # FIXME: We always want to delete the unneeded target file, but we
455    # only want the file content comparison if the previous test
456    # (whether the file was downloaded) succeeded.
457    def compare_and_delete_downloaded_data(self, file_name, expected_data):
458        """
459        Compare content of downloaded file with its source, then
460        delete the local target file.
461        """
462        with open(file_name, "rb") as fobj:
463            data = fobj.read()
464        try:
465            assert data == expected_data
466        finally:
467            os.unlink(file_name)
468
469    def test_conditional_download_without_target(self):
470        """
471        Test conditional binary mode download when no target file
472        exists.
473        """
474        local_target = "_test_target_"
475        data = binary_data()
476        # Target does not exist, so download.
477        Call = scripted_session.Call
478        #  There isn't a `dir` call to compare the datetimes of the
479        #  remote and the target file because the local `exists` call
480        #  for the local target returns `False` and the datetime
481        #  comparison therefore isn't done.
482        host_script = [
483          Call("__init__"),
484          Call(method_name="pwd", result="/"),
485          Call(method_name="close"),
486        ]
487        file_script = [
488          Call("__init__"),
489          Call(method_name="pwd", result="/"),
490          Call(method_name="cwd", result=None, args=("/",)),
491          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
492          Call(method_name="transfercmd",
493               result=io.BytesIO(data),
494               args=("RETR newer", None)),
495          Call(method_name="voidresp", result=None, args=()),
496          Call(method_name="close"),
497        ]
498        multisession_factory = scripted_session.factory(host_script, file_script)
499        try:
500            with test_base.ftp_host_factory(multisession_factory) as host:
501                flag = host.download_if_newer("/newer", local_target)
502            assert flag is True
503        finally:
504            self.compare_and_delete_downloaded_data(local_target, data)
505
506    def test_conditional_download_with_older_target(self):
507        """Test conditional binary mode download with newer source file."""
508        local_target = "_test_target_"
509        # Make target file.
510        with open(local_target, "w"):
511            pass
512        data = binary_data()
513        # Target is older, so download.
514        Call = scripted_session.Call
515        #  Use a date in the future. That isn't realistic, but for the
516        #  purpose of the test it's an easy way to make sure the source
517        #  file is newer than the target file.
518        dir_result = test_base.dir_line(mode_string="-rw-r--r--",
519                                        date_=datetime.date.today() +
520                                              datetime.timedelta(days=1),
521                                        name="newer")
522        host_script = [
523          Call("__init__"),
524          Call(method_name="pwd", result="/"),
525          Call(method_name="cwd", result=None, args=("/",)),
526          Call(method_name="cwd", result=None, args=("/",)),
527          Call(method_name="dir", result=dir_result, args=("",)),
528          Call(method_name="cwd", result=None, args=("/",)),
529          Call(method_name="close"),
530        ]
531        file_script = [
532          Call("__init__"),
533          Call(method_name="pwd", result="/"),
534          Call(method_name="cwd", result=None, args=("/",)),
535          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
536          Call(method_name="transfercmd",
537               result=io.BytesIO(data),
538               args=("RETR newer", None)),
539          Call(method_name="voidresp", result=None, args=()),
540          Call(method_name="close"),
541        ]
542        multisession_factory = scripted_session.factory(host_script, file_script)
543        try:
544            with test_base.ftp_host_factory(multisession_factory) as host:
545                flag = host.download_if_newer("/newer", local_target)
546            assert flag is True
547        finally:
548            self.compare_and_delete_downloaded_data(local_target, data)
549
550    def test_conditional_download_with_newer_target(self):
551        """Test conditional binary mode download with older source file."""
552        local_target = "_test_target_"
553        # Make target file.
554        with open(local_target, "w"):
555            pass
556        data = binary_data()
557        Call = scripted_session.Call
558        # Use date in the past, so the target file is newer and no
559        # download happens.
560        dir_result = test_base.dir_line(mode_string="-rw-r--r--",
561                                        date_=datetime.date.today() -
562                                              datetime.timedelta(days=1),
563                                        name="newer")
564        host_script = [
565          Call("__init__"),
566          Call(method_name="pwd", result="/"),
567          Call(method_name="cwd", result=None, args=("/",)),
568          Call(method_name="cwd", result=None, args=("/",)),
569          Call(method_name="dir", result=dir_result, args=("",)),
570          Call(method_name="cwd", result=None, args=("/",)),
571          Call(method_name="close"),
572        ]
573        file_script = [
574          Call("__init__"),
575          Call(method_name="pwd", result="/"),
576          Call(method_name="cwd", result=None, args=("/",)),
577          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
578          Call(method_name="transfercmd",
579               result=io.BytesIO(data),
580               args=("RETR newer", None)),
581          Call(method_name="voidresp", result=None, args=()),
582          Call(method_name="close"),
583        ]
584        multisession_factory = scripted_session.factory(host_script, file_script)
585        with test_base.ftp_host_factory(multisession_factory) as host:
586            flag = host.download_if_newer("/newer", local_target)
587        assert flag is False
588
589
590class TestTimeShift:
591
592    # Helper mock class that frees us from setting up complicated
593    # session scripts for the remote calls.
594    class _Path:
595        def split(self, path):
596            return posixpath.split(path)
597        def set_mtime(self, mtime):
598            self._mtime = mtime
599        def getmtime(self, file_name):
600            return self._mtime
601        def join(self, *args):
602            return posixpath.join(*args)
603        def normpath(self, path):
604            return posixpath.normpath(path)
605        def isabs(self, path):
606            return posixpath.isabs(path)
607        def abspath(self, path):
608            return "/_ftputil_sync_"
609        # Needed for `isdir` in `FTPHost.remove`
610        def isfile(self, path):
611            return True
612
613    def test_rounded_time_shift(self):
614        """Test if time shift is rounded correctly."""
615        Call = scripted_session.Call
616        script = [
617          Call("__init__"),
618          Call(method_name="pwd", result="/"),
619          Call(method_name="close"),
620        ]
621        multisession_factory = scripted_session.factory(script)
622        with test_base.ftp_host_factory(multisession_factory) as host:
623            # Use private bound method.
624            rounded_time_shift = host._FTPHost__rounded_time_shift
625            # Pairs consisting of original value and expected result
626            test_data = [
627              (      0,           0),
628              (      0.1,         0),
629              (     -0.1,         0),
630              (   1500,        1800),
631              (  -1500,       -1800),
632              (   1800,        1800),
633              (  -1800,       -1800),
634              (   2000,        1800),
635              (  -2000,       -1800),
636              ( 5*3600-100,  5*3600),
637              (-5*3600+100, -5*3600)]
638            for time_shift, expected_time_shift in test_data:
639                calculated_time_shift = rounded_time_shift(time_shift)
640                assert calculated_time_shift == expected_time_shift
641
642    def test_assert_valid_time_shift(self):
643        """Test time shift sanity checks."""
644        Call = scripted_session.Call
645        script = [
646          Call("__init__"),
647          Call(method_name="pwd", result="/"),
648          Call(method_name="close"),
649        ]
650        multisession_factory = scripted_session.factory(script)
651        with test_base.ftp_host_factory(multisession_factory) as host:
652            # Use private bound method.
653            assert_time_shift = host._FTPHost__assert_valid_time_shift
654            # Valid time shifts
655            test_data = [23*3600, -23*3600, 3600+30, -3600+30]
656            for time_shift in test_data:
657                assert assert_time_shift(time_shift) is None
658            # Invalid time shift (exceeds one day)
659            with pytest.raises(ftputil.error.TimeShiftError):
660                assert_time_shift(25*3600)
661            with pytest.raises(ftputil.error.TimeShiftError):
662                assert_time_shift(-25*3600)
663            # Invalid time shift (too large deviation from 15-minute units
664            # is unacceptable)
665            with pytest.raises(ftputil.error.TimeShiftError):
666                assert_time_shift(8*60)
667            with pytest.raises(ftputil.error.TimeShiftError):
668                assert_time_shift(-3600-8*60)
669
670    def test_synchronize_times(self):
671        """Test time synchronization with server."""
672        Call = scripted_session.Call
673        host_script = [
674          Call("__init__"),
675          Call(method_name="pwd", result="/"),
676          Call(method_name="cwd", result=None, args=("/",)),
677          Call(method_name="cwd", result=None, args=("/",)),
678          Call(method_name="delete", result=None, args=("_ftputil_sync_",)),
679          Call(method_name="cwd", result=None, args=("/",)),
680          Call(method_name="close"),
681        ]
682        file_script = [
683          Call("__init__"),
684          Call(method_name="pwd", result="/"),
685          Call(method_name="cwd", result=None, args=("/",)),
686          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
687          Call(method_name="transfercmd", result=io.BytesIO(),
688               args=("STOR _ftputil_sync_", None)),
689          Call(method_name="voidresp", result=None, args=()),
690          Call(method_name="close"),
691        ]
692        # Valid time shifts
693        test_data = [
694          (60*60+30,  60*60),
695          (60*60-100, 60*60),
696          (30*60+100, 30*60),
697          (45*60-100, 45*60),
698        ]
699        for measured_time_shift, expected_time_shift in test_data:
700            print("=== measured_time_shift:", measured_time_shift)
701            print("=== expected_time_shift:", expected_time_shift)
702            # Use a new `BytesIO` object to avoid exception
703            # `ValueError: I/O operation on closed file`.
704            file_script[4] = Call(method_name="transfercmd", result=io.BytesIO(),
705                                  args=("STOR _ftputil_sync_", None))
706            multisession_factory = scripted_session.factory(host_script,
707                                                            file_script)
708            with test_base.ftp_host_factory(multisession_factory) as host:
709                host.path = self._Path()
710                host.path.set_mtime(time.time() + measured_time_shift)
711                host.synchronize_times()
712                assert host.time_shift() == expected_time_shift
713        # Invalid time shifts
714        measured_time_shifts = [60*60+8*60, 45*60-6*60]
715        for measured_time_shift in measured_time_shifts:
716            print("=== measured_time_shift:", measured_time_shift)
717            # Use a new `BytesIO` object to avoid exception
718            # `ValueError: I/O operation on closed file`.
719            file_script[4] = Call(method_name="transfercmd", result=io.BytesIO(),
720                                  args=("STOR _ftputil_sync_", None))
721            multisession_factory = scripted_session.factory(host_script,
722                                                            file_script)
723            with test_base.ftp_host_factory(multisession_factory) as host:
724                host.path = self._Path()
725                host.path.set_mtime(time.time() + measured_time_shift)
726                with pytest.raises(ftputil.error.TimeShiftError):
727                    host.synchronize_times()
728
729    def test_synchronize_times_for_server_in_east(self):
730        """Test for timestamp correction (see ticket #55)."""
731        Call = scripted_session.Call
732        host_script = [
733          Call("__init__"),
734          Call(method_name="pwd", result="/"),
735          Call(method_name="cwd", result=None, args=("/",)),
736          Call(method_name="cwd", result=None, args=("/",)),
737          Call(method_name="delete", result=None, args=("_ftputil_sync_",)),
738          Call(method_name="cwd", result=None, args=("/",)),
739          Call(method_name="close"),
740        ]
741        file_script = [
742          Call("__init__"),
743          Call(method_name="pwd", result="/"),
744          Call(method_name="cwd", result=None, args=("/",)),
745          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
746          Call(method_name="transfercmd", result=io.BytesIO(),
747               args=("STOR _ftputil_sync_", None)),
748          Call(method_name="voidresp", result=None, args=()),
749          Call(method_name="close"),
750        ]
751        multisession_factory = scripted_session.factory(host_script, file_script)
752        with test_base.ftp_host_factory(session_factory=multisession_factory) as host:
753            host.path = self._Path()
754            # Set this explicitly to emphasize the problem.
755            host.set_time_shift(0.0)
756            hour = 60 * 60
757            # This could be any negative time shift.
758            presumed_time_shift = -6 * hour
759            # Set `mtime` to simulate a server east of us.
760            # In case the `time_shift` value for this host instance is 0.0
761            # (as is to be expected before the time shift is determined),
762            # the directory parser (more specifically
763            # `ftputil.stat.Parser.parse_unix_time`) will return a time which
764            # is a year too far in the past. The `synchronize_times`
765            # method needs to deal with this and add the year "back".
766            # I don't think it's a bug in `parse_unix_time` because the
767            # method should work once the time shift is set correctly.
768            local_time = time.localtime()
769            local_time_with_wrong_year = (local_time.tm_year-1,) + local_time[1:]
770            presumed_server_time = \
771              time.mktime(local_time_with_wrong_year) + presumed_time_shift
772            host.path.set_mtime(presumed_server_time)
773            host.synchronize_times()
774            assert host.time_shift() == presumed_time_shift
775
776
777class TestAcceptEitherUnicodeOrBytes:
778    """
779    Test whether certain `FTPHost` methods accept either unicode
780    or byte strings for the path(s).
781    """
782
783    def test_upload(self):
784        """Test whether `upload` accepts either unicode or bytes."""
785        Call = scripted_session.Call
786        host_script = [
787          Call("__init__"),
788          Call(method_name="pwd", result="/"),
789          Call(method_name="close"),
790        ]
791        file_script = [
792          Call("__init__"),
793          Call(method_name="pwd", result="/"),
794          Call(method_name="cwd", result=None, args=("/",)),
795          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
796          Call(method_name="transfercmd",
797               result=io.BytesIO(),
798               args=("STOR target", None)),
799          Call(method_name="voidresp", result=None, args=()),
800          Call(method_name="close"),
801        ]
802        multisession_factory = scripted_session.factory(host_script, file_script)
803        # The source file needs to be present in the current directory.
804        with test_base.ftp_host_factory(multisession_factory) as host:
805            host.upload("Makefile", "target")
806        # Create new `BytesIO` object.
807        file_script[4] = Call(method_name="transfercmd",
808                              result=io.BytesIO(),
809                              args=("STOR target", None))
810        multisession_factory = scripted_session.factory(host_script, file_script)
811        with test_base.ftp_host_factory(multisession_factory) as host:
812            host.upload("Makefile", ftputil.tool.as_bytes("target"))
813
814    def test_download(self):
815        """Test whether `download` accepts either unicode or bytes."""
816        Call = scripted_session.Call
817        host_script = [
818          Call("__init__"),
819          Call(method_name="pwd", result="/"),
820          Call(method_name="close"),
821        ]
822        file_script = [
823          Call("__init__"),
824          Call(method_name="pwd", result="/"),
825          Call(method_name="cwd", result=None, args=("/",)),
826          Call(method_name="voidcmd", result=None, args=("TYPE I",)),
827          Call(method_name="transfercmd",
828               result=io.BytesIO(),
829               args=("RETR source", None)),
830          Call(method_name="voidresp", result=None, args=()),
831          Call(method_name="close"),
832        ]
833        local_file_name = "_local_target_"
834        multisession_factory = scripted_session.factory(host_script, file_script)
835        # The source file needs to be present in the current directory.
836        with test_base.ftp_host_factory(multisession_factory) as host:
837            host.download("source", local_file_name)
838        # Create new `BytesIO` object.
839        file_script[4] = Call(method_name="transfercmd",
840                              result=io.BytesIO(),
841                              args=("RETR source", None))
842        multisession_factory = scripted_session.factory(host_script, file_script)
843        with test_base.ftp_host_factory(multisession_factory) as host:
844            host.download(ftputil.tool.as_bytes("source"), local_file_name)
845        os.remove(local_file_name)
846
847    def test_rename(self):
848        """Test whether `rename` accepts either unicode or bytes."""
849        Call = scripted_session.Call
850        script = [
851          Call("__init__"),
852          Call(method_name="pwd", result="/"),
853          Call(method_name="cwd", result=None, args=("/",)),
854          Call(method_name="rename", result=None, args=("/ä", "/ä")),
855          Call(method_name="close"),
856        ]
857        # It's possible to mix argument types, as for `os.rename`.
858        path_as_unicode = "/ä"
859        path_as_bytes = ftputil.tool.as_bytes(path_as_unicode)
860        paths = [path_as_unicode, path_as_bytes]
861        for source_path, target_path in itertools.product(paths, paths):
862            session_factory = scripted_session.factory(script)
863            with test_base.ftp_host_factory(session_factory) as host:
864                host.rename(source_path, target_path)
865
866    def test_listdir(self):
867        """Test whether `listdir` accepts either unicode or bytes."""
868        as_bytes = ftputil.tool.as_bytes
869        Call = scripted_session.Call
870        top_level_dir_line = test_base.dir_line(mode_string="drwxr-xr-x",
871                                                date_=datetime.date.today(),
872                                                name="ä")
873        dir_line1 = test_base.dir_line(mode_string="-rw-r--r--",
874                                       date_=datetime.date.today(),
875                                       name="ö")
876        dir_line2 = test_base.dir_line(mode_string="-rw-r--r--",
877                                       date_=datetime.date.today(),
878                                       name="o")
879        dir_result = dir_line1 + "\n" + dir_line2
880        script = [
881          Call("__init__"),
882          Call(method_name="pwd", result="/"),
883          Call(method_name="cwd", result=None, args=("/",)),
884          Call(method_name="cwd", result=None, args=("/",)),
885          Call(method_name="dir", result=top_level_dir_line, args=("",)),
886          Call(method_name="cwd", result=None, args=("/",)),
887          Call(method_name="cwd", result=None, args=("/",)),
888          Call(method_name="cwd", result=None, args=("/ä",)),
889          Call(method_name="dir", result=dir_result, args=("",)),
890          Call(method_name="cwd", result=None, args=("/",)),
891          Call(method_name="close"),
892        ]
893        # Unicode
894        session_factory = scripted_session.factory(script)
895        with test_base.ftp_host_factory(session_factory) as host:
896            items = host.listdir("ä")
897        assert items == ["ö", "o"]
898        # Bytes
899        session_factory = scripted_session.factory(script)
900        with test_base.ftp_host_factory(session_factory) as host:
901            items = host.listdir(as_bytes("ä"))
902            assert items == [as_bytes("ö"), as_bytes("o")]
903
904    def test_chmod(self):
905        """Test whether `chmod` accepts either unicode or bytes."""
906        Call = scripted_session.Call
907        script = [
908          Call("__init__"),
909          Call(method_name="pwd", result="/"),
910          Call(method_name="cwd", result=None, args=("/",)),
911          Call(method_name="cwd", result=None, args=("/",)),
912          Call(method_name="voidcmd", result=None, args=("SITE CHMOD 0755 ä",)),
913          Call(method_name="cwd", result=None, args=("/",)),
914          Call(method_name="close"),
915        ]
916        path = "/ä"
917        # Unicode
918        session_factory = scripted_session.factory(script)
919        with test_base.ftp_host_factory(session_factory) as host:
920            host.chmod(path, 0o755)
921        # Bytes
922        session_factory = scripted_session.factory(script)
923        with test_base.ftp_host_factory(session_factory) as host:
924            host.chmod(ftputil.tool.as_bytes(path), 0o755)
925
926    def _test_method_with_single_path_argument(self, method_name, path, script):
927        # Unicode
928        session_factory = scripted_session.factory(script)
929        with test_base.ftp_host_factory(session_factory) as host:
930            method = getattr(host, method_name)
931            method(path)
932        # Bytes
933        session_factory = scripted_session.factory(script)
934        with test_base.ftp_host_factory(session_factory) as host:
935            method = getattr(host, method_name)
936            method(ftputil.tool.as_bytes(path))
937
938    def test_chdir(self):
939        """Test whether `chdir` accepts either unicode or bytes."""
940        Call = scripted_session.Call
941        script = [
942          Call("__init__"),
943          Call(method_name="pwd", result="/"),
944          Call(method_name="cwd", result=None, args=("/ö",)),
945          Call(method_name="close"),
946        ]
947        self._test_method_with_single_path_argument("chdir", "/ö", script)
948
949    def test_mkdir(self):
950        """Test whether `mkdir` accepts either unicode or bytes."""
951        Call = scripted_session.Call
952        script = [
953          Call("__init__"),
954          Call(method_name="pwd", result="/"),
955          Call(method_name="cwd", result=None, args=("/",)),
956          Call(method_name="cwd", result=None, args=("/",)),
957          Call(method_name="mkd", result=None, args=("ä",)),
958          Call(method_name="cwd", result=None, args=("/",)),
959          Call(method_name="close"),
960        ]
961        self._test_method_with_single_path_argument("mkdir", "/ä", script)
962
963    def test_makedirs(self):
964        """Test whether `makedirs` accepts either unicode or bytes."""
965        Call = scripted_session.Call
966        script = [
967          Call("__init__"),
968          Call(method_name="pwd", result="/"),
969          # To deal with ticket #86 (virtual directories), `makedirs` tries to
970          # change into each directory and if it exists (changing doesn't raise
971          # an exception), doesn't try to create it. That's why you don't see
972          # an `mkd` calls here despite originally having a `makedirs` call.
973          Call(method_name="cwd", result=None, args=("/ä",)),
974          Call(method_name="cwd", result=None, args=("/ä/ö",)),
975          Call(method_name="cwd", result=None, args=("/",)),
976          Call(method_name="close"),
977        ]
978        self._test_method_with_single_path_argument("makedirs", "/ä/ö", script)
979
980    def test_rmdir(self):
981        """Test whether `rmdir` accepts either unicode or bytes."""
982        Call = scripted_session.Call
983        dir_line = test_base.dir_line(mode_string="drwxr-xr-x",
984                                      date_=datetime.date.today(),
985                                      name="empty_ä")
986        # Since the session script isn't at all obvious, I checked it with a
987        # debugger and added comments on some of the calls that happen during
988        # the `rmdir` call.
989        #
990        # `_robust_ftp_command` descends one directory at a time (see ticket
991        # #11) and restores the original directory in the end, which results in
992        # at least four calls on the FTP session object (`cwd`, `cwd`, actual
993        # method, `cwd`). It would be great if all the roundtrips to the server
994        # could be reduced.
995        script = [
996          # `FTPHost` initialization
997          Call("__init__"),
998          Call(method_name="pwd", result="/"),
999          # `host.rmdir("/empty_ä")`
1000          #  `host.listdir("/empty_ä")`
1001          #   `host._stat._listdir("/empty_ä")`
1002          #    `host._stat.__call_with_parser_retry("/empty_ä")`
1003          #     `host._stat._real_listdir("/empty_ä")`
1004          #      `host.path.isdir("/empty_ä")`
1005          Call(method_name="cwd", result=None, args=("/",)),
1006          Call(method_name="cwd", result=None, args=("/",)),
1007          Call(method_name="dir", result=dir_line, args=("",)),
1008          Call(method_name="cwd", result=None, args=("/",)),
1009          #      `host.path.isdir` end
1010          #      `host._stat._stat_results_from_dir("/empty_ä")`
1011          Call(method_name="cwd", result=None, args=("/",)),
1012          Call(method_name="cwd", result=None, args=("/empty_ä",)),
1013          Call(method_name="dir", result="", args=("",)),
1014          Call(method_name="cwd", result=None, args=("/",)),
1015          #      `host._stat._stat_results_from_dir("/empty_ä")` end
1016          #  `host._session.rmd` in `host._robust_ftp_command`
1017          #   `host._check_inaccessible_login_directory()`
1018          Call(method_name="cwd", result=None, args=("/",)),
1019          #   `host.chdir(head)` ("/")
1020          Call(method_name="cwd", result=None, args=("/",)),
1021          #   `host.rmd(tail)` ("empty_ä")
1022          Call(method_name="rmd", result=None, args=("empty_ä",)),
1023          #   `host.chdir(old_dir)` ("/")
1024          Call(method_name="cwd", result=None, args=("/",)),
1025          #
1026          Call(method_name="close")
1027        ]
1028        empty_directory_as_required_by_rmdir = "/empty_ä"
1029        self._test_method_with_single_path_argument(
1030          "rmdir", empty_directory_as_required_by_rmdir, script)
1031
1032    def test_remove(self):
1033        """Test whether `remove` accepts either unicode or bytes."""
1034        Call = scripted_session.Call
1035        dir_line = test_base.dir_line(mode_string="-rw-r--r--",
1036                                      date_=datetime.date.today(),
1037                                      name="ö")
1038        script = [
1039          Call("__init__"),
1040          Call(method_name="pwd", result="/"),
1041          Call(method_name="cwd", result=None, args=("/",)),
1042          Call(method_name="cwd", result=None, args=("/",)),
1043          Call(method_name="dir", result=dir_line, args=("",)),
1044          Call(method_name="cwd", result=None, args=("/",)),
1045          Call(method_name="cwd", result=None, args=("/",)),
1046          Call(method_name="cwd", result=None, args=("/",)),
1047          Call(method_name="delete", result=None, args=("ö",)),
1048          Call(method_name="cwd", result=None, args=("/",)),
1049          Call(method_name="close"),
1050        ]
1051        self._test_method_with_single_path_argument("remove", "/ö", script)
1052
1053    def test_rmtree(self):
1054        """Test whether `rmtree` accepts either unicode or bytes."""
1055        Call = scripted_session.Call
1056        dir_line = test_base.dir_line(mode_string="drwxr-xr-x",
1057                                      date_=datetime.date.today(),
1058                                      name="empty_ä")
1059        script = [
1060          Call("__init__"),
1061          Call(method_name="pwd", result="/"),
1062          Call(method_name="cwd", result=None, args=("/",)),
1063          Call(method_name="cwd", result=None, args=("/",)),
1064          # Recursive `listdir`
1065          #  Check parent (root) directory.
1066          Call(method_name="dir", result=dir_line, args=("",)),
1067          Call(method_name="cwd", result=None, args=("/",)),
1068          Call(method_name="cwd", result=None, args=("/",)),
1069          Call(method_name="cwd", result=None, args=("/empty_ä",)),
1070          #  Child directory (inside `empty_ä`)
1071          Call(method_name="dir", result="", args=("",)),
1072          Call(method_name="cwd", result=None, args=("/",)),
1073          Call(method_name="cwd", result=None, args=("/",)),
1074          Call(method_name="cwd", result=None, args=("/empty_ä",)),
1075          # Recursive `rmdir` (repeated `cwd` calls because of
1076          # `_robust_ftp_command`)
1077          Call(method_name="dir", result="", args=("",)),
1078          Call(method_name="cwd", result=None, args=("/",)),
1079          Call(method_name="cwd", result=None, args=("/",)),
1080          Call(method_name="cwd", result=None, args=("/",)),
1081          Call(method_name="rmd", result=None, args=("empty_ä",)),
1082          Call(method_name="cwd", result=None, args=("/",)),
1083          Call(method_name="close"),
1084        ]
1085        empty_directory_as_required_by_rmtree = "/empty_ä"
1086        self._test_method_with_single_path_argument(
1087          "rmtree", empty_directory_as_required_by_rmtree, script)
1088
1089    def test_lstat(self):
1090        """Test whether `lstat` accepts either unicode or bytes."""
1091        Call = scripted_session.Call
1092        dir_line = test_base.dir_line(mode_string="-rw-r--r--",
1093                                      date_=datetime.date.today(),
1094                                      name="ä")
1095        script = [
1096          Call("__init__"),
1097          Call(method_name="pwd", result="/"),
1098          Call(method_name="cwd", result=None, args=("/",)),
1099          Call(method_name="cwd", result=None, args=("/",)),
1100          Call(method_name="dir", result=dir_line, args=("",)),
1101          Call(method_name="cwd", result=None, args=("/",)),
1102          Call(method_name="close"),
1103        ]
1104        self._test_method_with_single_path_argument("lstat", "/ä", script)
1105
1106    def test_stat(self):
1107        """Test whether `stat` accepts either unicode or bytes."""
1108        Call = scripted_session.Call
1109        dir_line = test_base.dir_line(mode_string="-rw-r--r--",
1110                                      date_=datetime.date.today(),
1111                                      name="ä")
1112        script = [
1113          Call("__init__"),
1114          Call(method_name="pwd", result="/"),
1115          Call(method_name="cwd", result=None, args=("/",)),
1116          Call(method_name="cwd", result=None, args=("/",)),
1117          Call(method_name="dir", result=dir_line, args=("",)),
1118          Call(method_name="cwd", result=None, args=("/",)),
1119          Call(method_name="close"),
1120        ]
1121        self._test_method_with_single_path_argument("stat", "/ä", script)
1122
1123    def test_walk(self):
1124        """Test whether `walk` accepts either unicode or bytes."""
1125        Call = scripted_session.Call
1126        dir_line = test_base.dir_line(mode_string="-rw-r--r--",
1127                                      date_=datetime.date.today(),
1128                                      name="ä")
1129        script = [
1130          Call("__init__"),
1131          Call(method_name="pwd", result="/"),
1132          Call(method_name="cwd", result=None, args=("/",)),
1133          Call(method_name="cwd", result=None, args=("/",)),
1134          Call(method_name="dir", result=dir_line, args=("",)),
1135          Call(method_name="cwd", result=None, args=("/",)),
1136          Call(method_name="close"),
1137        ]
1138        # We're not interested in the return value of `walk`.
1139        # Unicode
1140        session_factory = scripted_session.factory(script)
1141        with test_base.ftp_host_factory(session_factory) as host:
1142            result = list(host.walk("/ä"))
1143        # Bytes
1144        session_factory = scripted_session.factory(script)
1145        with test_base.ftp_host_factory(session_factory) as host:
1146            result = list(host.walk(ftputil.tool.as_bytes("/ä")))
1147
1148
1149class TestFailingPickling:
1150
1151    def test_failing_pickling(self):
1152        """Test if pickling (intentionally) isn't supported."""
1153        with test_base.ftp_host_factory() as host:
1154            with pytest.raises(TypeError):
1155                pickle.dumps(host)
1156            with host.open("/home/sschwarzer/index.html") as file_obj:
1157                with pytest.raises(TypeError):
1158                    pickle.dumps(file_obj)
Note: See TracBrowser for help on using the repository browser.