source: test/scripted_session.py @ 1820:2a6cad87b143

Last change on this file since 1820:2a6cad87b143 was 1820:2a6cad87b143, checked in by Stefan Schwarzer <sschwarzer@…>, 14 months ago
Improve `Call` constructor API - Put `result` after `args` and `kwargs` because we first pass the arguments and then get a result. - Make all arguments but `method_name` keyword-only arguments to avoid confusion.
File size: 9.0 KB
Line 
1# Copyright (C) 2018-2019, Stefan Schwarzer <sschwarzer@sschwarzer.net>
2# and ftputil contributors (see `doc/contributors.txt`)
3# See the file LICENSE for licensing terms.
4
5import unittest.mock
6
7
8__all__ = ["Call", "factory"]
9
10
11class Call:
12
13    def __init__(self, method_name, *, args=None, kwargs=None, result=None):
14        self.method_name = method_name
15        self.result = result
16        self.args = args
17        self.kwargs = kwargs
18
19    def __repr__(self):
20        return ("{0.__class__.__name__}("
21                "method_name={0.method_name!r}, "
22                "result={0.result!r}, "
23                "args={0.args!r}, "
24                "kwargs={0.kwargs!r})".format(self))
25
26    def check_call(self, method_name, args=None, kwargs=None):
27        # TODO: Mention printing in the docstring.
28        # TODO: Describe how the comparison is made.
29        """
30        Check the method name, args and kwargs from this `Call` object
31        against the method name, args and kwargs from the system under test.
32
33        Raise an `AssertionError` if there's a mismatch.
34        """
35        print("  Call from session script:    {} | {!r} | {!r}".format(
36              self.method_name, self.args, self.kwargs))
37        print("  Call from system under test: {} | {!r} | {!r}".format(
38              method_name, args, kwargs))
39        def compare(value_name, script_value, sut_value):
40            if script_value is not None:
41                try:
42                    assert script_value == sut_value
43                except AssertionError:
44                    print("  Mismatch for `{}`: {!r} != {!r}".format(
45                          value_name, script_value, sut_value))
46                    raise
47        compare("method_name", self.method_name, method_name)
48        compare("args", self.args, args)
49        compare("kwargs", self.kwargs, kwargs)
50
51    @staticmethod
52    def _is_exception_class(obj):
53        """
54        Return `True` if `obj` is an exception class, else `False`.
55        """
56        try:
57            return issubclass(obj, Exception)
58        except TypeError:
59            # TypeError: issubclass() arg 1 must be a class
60            return False
61
62    def __call__(self):
63        """
64        Simulate call, returning the result or raising the exception.
65        """
66        if isinstance(self.result, Exception) or self._is_exception_class(self.result):
67            raise self.result
68        else:
69            return self.result
70
71
72class ScriptedSession:
73    """
74    "Scripted" `ftplib.FTP`-like class for testing.
75
76    To avoid actual input/output over sockets or files, specify the
77    values that should be returned by the class's methods.
78
79    The class is instantiated with a `script` argument. This is a list
80    of `Call` objects where each object specifies the name of the
81    `ftplib.FTP` method that is expected to be called and what the
82    method should return. If the value is an exception, it will be
83    raised, not returned.
84
85    In case the method returns a socket (like `transfercmd`), the
86    return value to be specified in the `Call` instance is the content
87    of the underlying socket file.
88
89    The advantage of the approach of this class over the use of
90    `unittest.mock.Mock` objects is that the sequence of calls is
91    clearly visible. With `Mock` objects, the developer must keep in
92    mind all the calls when specifying return values or side effects
93    for the mock methods.
94    """
95
96    # Class-level counter to enumerate `ScriptedSession`s. This makes it
97    # possible to make the output even more compact. Additionally, it's easier
98    # to distinguish numbers like 1, 2, etc. than hexadecimal ids.
99    _session_count = 0
100
101    @classmethod
102    def reset_session_count(cls):
103        cls._session_count = 0
104
105    def __init__(self, script):
106        self.script = script
107        # Index into `script`, the list of `Call` objects
108        self._call_index = 0
109        self.__class__._session_count += 1
110        self._session_count = self.__class__._session_count
111        # Always expect an entry for the constructor.
112        init_call = self._next_script_call("__init__")
113        # The constructor isn't supposed to return anything. The only
114        # reason to call it here is to raise an exception if that was
115        # specified in the `script`.
116        init_call()
117
118    def __str__(self):
119        return "{} {}".format(self.__class__.__name__, self._session_count)
120
121    def _next_script_call(self, requested_attribute):
122        """
123        Return next `Call` object.
124        """
125        print(self, "(in `_next_script_call`)")
126        try:
127            call = self.script[self._call_index]
128        except IndexError:
129            print("  *** Ran out of `Call` objects for this session".format(self))
130            print("  Requested attribute was {!r}".format(requested_attribute))
131            raise
132        self._call_index += 1
133        return call
134
135    def __getattr__(self, attribute_name):
136        script_call = self._next_script_call(attribute_name)
137        def dummy_method(*args, **kwargs):
138            print(self, "(in `__getattr__`)")
139            script_call.check_call(attribute_name, args, kwargs)
140            return script_call()
141        return dummy_method
142
143    # ----------------------------------------------------------------------
144    # `ftplib.FTP` methods that shouldn't be executed with the default
145    # processing in `__getattr__`
146
147    # `File.close` accesses the session `sock` object to set and reset the
148    # timeout. `sock` itself is never _called_ though, so it doesn't make sense
149    # to create a `sock` _call_.
150    sock = unittest.mock.Mock(name="socket_attribute")
151
152    def dir(self, path, callback):
153        """
154        Call the `callback` for each line in the multiline string
155        `call.result`.
156        """
157        script_call = self._next_script_call("dir")
158        # Check only the path. This requires that the corresponding `Call`
159        # object also solely specifies the path as `args`.
160        script_call.check_call("dir", (path,), None)
161        # Give `dir` the chance to raise an exception if one was specified in
162        # the `Call`'s `result` argument.
163        call_result = script_call()
164        for line in call_result.splitlines():
165            callback(line)
166
167    def ntransfercmd(self, cmd, rest=None):
168        """
169        Simulate the `ftplib.FTP.ntransfercmd` call.
170
171        `ntransfercmd` returns a tuple of a socket and a size argument. The
172        `result` value given when constructing an `ntransfercmd` call specifies
173        an `io.TextIO` or `io.BytesIO` value to be used as the
174        `Socket.makefile` result.
175        """
176        script_call = self._next_script_call("ntransfercmd")
177        script_call.check_call("ntransfercmd", (cmd, rest), None)
178        # Give `ntransfercmd` the chance to raise an exception if one was
179        # specified in the `Call`'s `result` argument.
180        call_result = script_call()
181        mock_socket = unittest.mock.Mock(name="socket")
182        mock_socket.makefile.return_value = call_result
183        # Return `None` for size. The docstring of `ftplib.FTP.ntransfercmd`
184        # says that's a possibility.
185        # TODO: Use a sensible `size` value later if it turns out we need it.
186        return mock_socket, None
187
188    def transfercmd(self, cmd, rest=None):
189        """
190        Simulate the `ftplib.FTP.transfercmd` call.
191
192        `transfercmd` returns a socket. The `result` value given when
193        constructing an `transfercmd` call specifies an `io.TextIO` or
194        `io.BytesIO` value to be used as the `Socket.makefile` result.
195        """
196        script_call = self._next_script_call("transfercmd")
197        script_call.check_call("transfercmd", (cmd, rest), None)
198        # Give `transfercmd` the chance to raise an exception if one was
199        # specified in the `Call`'s `result` argument.
200        call_result = script_call()
201        mock_socket = unittest.mock.Mock(name="socket")
202        mock_socket.makefile.return_value = call_result
203        return mock_socket
204
205
206class MultisessionFactory:
207    """
208    Return a session factory using the scripted data from the given
209    "scripts" for each consecutive call ("creation") of a factory.
210
211    Example:
212
213      host = ftputil.FTPHost(host, user, password,
214                             session_factory=scripted_session.factory(script1, script2))
215
216    When the `session_factory` is "instantiated" for the first time by
217    `FTPHost._make_session`, the factory object will use the behavior
218    described by the script `script1`. When the `session_factory` is
219    "instantiated" a second time, the factory object will use the
220    behavior described by the script `script2`.
221    """
222
223    def __init__(self, *scripts):
224        ScriptedSession.reset_session_count()
225        self._scripts = iter(scripts)
226        self.scripted_sessions = []
227
228    def __call__(self, host, user, password):
229        """
230        Call the factory.
231
232        This is equivalent to the constructor of the session (e. g.
233        `ftplib.FTP` in a real application).
234        """
235        script = next(self._scripts)
236        scripted_session = ScriptedSession(script)
237        self.scripted_sessions.append(scripted_session)
238        return scripted_session
239
240
241factory = MultisessionFactory
Note: See TracBrowser for help on using the repository browser.