1 | # Copyright (C) 2002-2018, Stefan Schwarzer <sschwarzer@sschwarzer.net> |
---|
2 | # and ftputil contributors (see `doc/contributors.txt`) |
---|
3 | # See the file LICENSE for licensing terms. |
---|
4 | |
---|
5 | import ftplib |
---|
6 | |
---|
7 | import pytest |
---|
8 | |
---|
9 | import ftputil.compat |
---|
10 | import ftputil.error |
---|
11 | |
---|
12 | from test import mock_ftplib |
---|
13 | from test import test_base |
---|
14 | |
---|
15 | |
---|
16 | # |
---|
17 | # Several customized `MockSession` classes |
---|
18 | # |
---|
19 | class ReadMockSession(mock_ftplib.MockSession): |
---|
20 | |
---|
21 | mock_file_content = b"line 1\r\nanother line\r\nyet another line" |
---|
22 | |
---|
23 | |
---|
24 | class ReadMockSessionWithMoreNewlines(mock_ftplib.MockSession): |
---|
25 | |
---|
26 | mock_file_content = b"\r\n".join(map(ftputil.compat.bytes_type, range(20))) |
---|
27 | |
---|
28 | |
---|
29 | class InaccessibleDirSession(mock_ftplib.MockSession): |
---|
30 | |
---|
31 | _login_dir = "/inaccessible" |
---|
32 | |
---|
33 | def pwd(self): |
---|
34 | return self._login_dir |
---|
35 | |
---|
36 | def cwd(self, dir): |
---|
37 | if dir in (self._login_dir, self._login_dir + "/"): |
---|
38 | raise ftplib.error_perm |
---|
39 | else: |
---|
40 | super(InaccessibleDirSession, self).cwd(dir) |
---|
41 | |
---|
42 | |
---|
43 | class TestFileOperations: |
---|
44 | """Test operations with file-like objects.""" |
---|
45 | |
---|
46 | def test_inaccessible_dir(self): |
---|
47 | """Test whether opening a file at an invalid location fails.""" |
---|
48 | host = test_base.ftp_host_factory( |
---|
49 | session_factory=InaccessibleDirSession) |
---|
50 | with pytest.raises(ftputil.error.FTPIOError): |
---|
51 | host.open("/inaccessible/new_file", "w") |
---|
52 | |
---|
53 | def test_caching(self): |
---|
54 | """Test whether `FTPFile` cache of `FTPHost` object works.""" |
---|
55 | host = test_base.ftp_host_factory() |
---|
56 | assert len(host._children) == 0 |
---|
57 | path1 = "path1" |
---|
58 | path2 = "path2" |
---|
59 | # Open one file and inspect cache. |
---|
60 | file1 = host.open(path1, "w") |
---|
61 | child1 = host._children[0] |
---|
62 | assert len(host._children) == 1 |
---|
63 | assert not child1._file.closed |
---|
64 | # Open another file. |
---|
65 | file2 = host.open(path2, "w") |
---|
66 | child2 = host._children[1] |
---|
67 | assert len(host._children) == 2 |
---|
68 | assert not child2._file.closed |
---|
69 | # Close first file. |
---|
70 | file1.close() |
---|
71 | assert len(host._children) == 2 |
---|
72 | assert child1._file.closed |
---|
73 | assert not child2._file.closed |
---|
74 | # Re-open first child's file. |
---|
75 | file1 = host.open(path1, "w") |
---|
76 | child1_1 = file1._host |
---|
77 | # Check if it's reused. |
---|
78 | assert child1 is child1_1 |
---|
79 | assert not child1._file.closed |
---|
80 | assert not child2._file.closed |
---|
81 | # Close second file. |
---|
82 | file2.close() |
---|
83 | assert child2._file.closed |
---|
84 | |
---|
85 | def test_write_to_directory(self): |
---|
86 | """Test whether attempting to write to a directory fails.""" |
---|
87 | host = test_base.ftp_host_factory() |
---|
88 | with pytest.raises(ftputil.error.FTPIOError): |
---|
89 | host.open("/home/sschwarzer", "w") |
---|
90 | |
---|
91 | def test_binary_read(self): |
---|
92 | """Read data from a binary file.""" |
---|
93 | host = test_base.ftp_host_factory(session_factory=ReadMockSession) |
---|
94 | with host.open("some_file", "rb") as fobj: |
---|
95 | data = fobj.read() |
---|
96 | assert data == ReadMockSession.mock_file_content |
---|
97 | |
---|
98 | def test_binary_write(self): |
---|
99 | """Write binary data with `write`.""" |
---|
100 | host = test_base.ftp_host_factory() |
---|
101 | data = b"\000a\001b\r\n\002c\003\n\004\r\005" |
---|
102 | with host.open("dummy", "wb") as output: |
---|
103 | output.write(data) |
---|
104 | child_data = mock_ftplib.content_of("dummy") |
---|
105 | expected_data = data |
---|
106 | assert child_data == expected_data |
---|
107 | |
---|
108 | def test_ascii_read(self): |
---|
109 | """Read ASCII text with plain `read`.""" |
---|
110 | host = test_base.ftp_host_factory(session_factory=ReadMockSession) |
---|
111 | with host.open("dummy", "r") as input_: |
---|
112 | data = input_.read(0) |
---|
113 | assert data == "" |
---|
114 | data = input_.read(3) |
---|
115 | assert data == "lin" |
---|
116 | data = input_.read(7) |
---|
117 | assert data == "e 1\nano" |
---|
118 | data = input_.read() |
---|
119 | assert data == "ther line\nyet another line" |
---|
120 | data = input_.read() |
---|
121 | assert data == "" |
---|
122 | |
---|
123 | def test_ascii_write(self): |
---|
124 | """Write ASCII text with `write`.""" |
---|
125 | host = test_base.ftp_host_factory() |
---|
126 | data = " \nline 2\nline 3" |
---|
127 | with host.open("dummy", "w", newline="\r\n") as output: |
---|
128 | output.write(data) |
---|
129 | child_data = mock_ftplib.content_of("dummy") |
---|
130 | # This corresponds to the byte stream, so expect a `bytes` object. |
---|
131 | expected_data = b" \r\nline 2\r\nline 3" |
---|
132 | assert child_data == expected_data |
---|
133 | |
---|
134 | # TODO: Add tests with given encoding and possibly buffering. |
---|
135 | |
---|
136 | def test_ascii_writelines(self): |
---|
137 | """Write ASCII text with `writelines`.""" |
---|
138 | host = test_base.ftp_host_factory() |
---|
139 | data = [" \n", "line 2\n", "line 3"] |
---|
140 | backup_data = data[:] |
---|
141 | output = host.open("dummy", "w", newline="\r\n") |
---|
142 | output.writelines(data) |
---|
143 | output.close() |
---|
144 | child_data = mock_ftplib.content_of("dummy") |
---|
145 | expected_data = b" \r\nline 2\r\nline 3" |
---|
146 | assert child_data == expected_data |
---|
147 | # Ensure that the original data was not modified. |
---|
148 | assert data == backup_data |
---|
149 | |
---|
150 | def test_binary_readline(self): |
---|
151 | """Read binary data with `readline`.""" |
---|
152 | host = test_base.ftp_host_factory(session_factory=ReadMockSession) |
---|
153 | input_ = host.open("dummy", "rb") |
---|
154 | data = input_.readline(3) |
---|
155 | assert data == b"lin" |
---|
156 | data = input_.readline(10) |
---|
157 | assert data == b"e 1\r\n" |
---|
158 | data = input_.readline(13) |
---|
159 | assert data == b"another line\r" |
---|
160 | data = input_.readline() |
---|
161 | assert data == b"\n" |
---|
162 | data = input_.readline() |
---|
163 | assert data == b"yet another line" |
---|
164 | data = input_.readline() |
---|
165 | assert data == b"" |
---|
166 | input_.close() |
---|
167 | |
---|
168 | def test_ascii_readline(self): |
---|
169 | """Read ASCII text with `readline`.""" |
---|
170 | host = test_base.ftp_host_factory(session_factory=ReadMockSession) |
---|
171 | input_ = host.open("dummy", "r") |
---|
172 | data = input_.readline(3) |
---|
173 | assert data == "lin" |
---|
174 | data = input_.readline(10) |
---|
175 | assert data == "e 1\n" |
---|
176 | data = input_.readline(13) |
---|
177 | assert data == "another line\n" |
---|
178 | data = input_.readline() |
---|
179 | assert data == "yet another line" |
---|
180 | data = input_.readline() |
---|
181 | assert data == "" |
---|
182 | input_.close() |
---|
183 | |
---|
184 | def test_ascii_readlines(self): |
---|
185 | """Read ASCII text with `readlines`.""" |
---|
186 | host = test_base.ftp_host_factory(session_factory=ReadMockSession) |
---|
187 | input_ = host.open("dummy", "r") |
---|
188 | data = input_.read(3) |
---|
189 | assert data == "lin" |
---|
190 | data = input_.readlines() |
---|
191 | assert data == ["e 1\n", "another line\n", "yet another line"] |
---|
192 | input_.close() |
---|
193 | |
---|
194 | def test_binary_iterator(self): |
---|
195 | """ |
---|
196 | Test the iterator interface of `FTPFile` objects (without |
---|
197 | newline conversion. |
---|
198 | """ |
---|
199 | host = test_base.ftp_host_factory(session_factory=ReadMockSession) |
---|
200 | input_ = host.open("dummy", "rb") |
---|
201 | input_iterator = iter(input_) |
---|
202 | assert next(input_iterator) == b"line 1\r\n" |
---|
203 | assert next(input_iterator) == b"another line\r\n" |
---|
204 | assert next(input_iterator) == b"yet another line" |
---|
205 | with pytest.raises(StopIteration): |
---|
206 | input_iterator.__next__() |
---|
207 | input_.close() |
---|
208 | |
---|
209 | def test_ascii_iterator(self): |
---|
210 | """ |
---|
211 | Test the iterator interface of `FTPFile` objects (with newline |
---|
212 | conversion). |
---|
213 | """ |
---|
214 | host = test_base.ftp_host_factory(session_factory=ReadMockSession) |
---|
215 | input_ = host.open("dummy") |
---|
216 | input_iterator = iter(input_) |
---|
217 | assert next(input_iterator) == "line 1\n" |
---|
218 | assert next(input_iterator) == "another line\n" |
---|
219 | assert next(input_iterator) == "yet another line" |
---|
220 | with pytest.raises(StopIteration): |
---|
221 | input_iterator.__next__() |
---|
222 | input_.close() |
---|
223 | |
---|
224 | def test_read_unknown_file(self): |
---|
225 | """Test whether reading a file which isn't there fails.""" |
---|
226 | host = test_base.ftp_host_factory() |
---|
227 | with pytest.raises(ftputil.error.FTPIOError): |
---|
228 | host.open("notthere", "r") |
---|
229 | |
---|
230 | |
---|
231 | class TestAvailableChild: |
---|
232 | |
---|
233 | def _failing_pwd(self, exception_class): |
---|
234 | """ |
---|
235 | Return a function that will be used instead of the |
---|
236 | `session.pwd` and will raise the exception |
---|
237 | `exception_to_raise`. |
---|
238 | """ |
---|
239 | def new_pwd(): |
---|
240 | raise exception_class("") |
---|
241 | return new_pwd |
---|
242 | |
---|
243 | def _test_with_pwd_error(self, exception_class): |
---|
244 | """ |
---|
245 | Test if reusing a child session fails because of |
---|
246 | `child_host._session.pwd` raising an exception of type |
---|
247 | `exception_class`. |
---|
248 | """ |
---|
249 | host = test_base.ftp_host_factory() |
---|
250 | # Implicitly create a child session. |
---|
251 | with host.open("/home/older") as _: |
---|
252 | pass |
---|
253 | assert len(host._children) == 1 |
---|
254 | # Make sure reusing the previous child session will fail. |
---|
255 | host._children[0]._session.pwd = self._failing_pwd(exception_class) |
---|
256 | # Try to create a new file. Since `pwd` now raises an |
---|
257 | # exception, a new child session should be created. |
---|
258 | with host.open("home/older") as _: |
---|
259 | pass |
---|
260 | assert len(host._children) == 2 |
---|
261 | |
---|
262 | def test_pwd_with_error_temp(self): |
---|
263 | """ |
---|
264 | Test if an `error_temp` in `_session.pwd` skips the child |
---|
265 | session. |
---|
266 | """ |
---|
267 | self._test_with_pwd_error(ftplib.error_temp) |
---|
268 | |
---|
269 | def test_pwd_with_error_reply(self): |
---|
270 | """ |
---|
271 | Test if an `error_reply` in `_session.pwd` skips the child |
---|
272 | session. |
---|
273 | """ |
---|
274 | self._test_with_pwd_error(ftplib.error_reply) |
---|
275 | |
---|
276 | def test_pwd_with_OSError(self): |
---|
277 | """ |
---|
278 | Test if an `OSError` in `_session.pwd` skips the child |
---|
279 | session. |
---|
280 | """ |
---|
281 | self._test_with_pwd_error(OSError) |
---|
282 | |
---|
283 | def test_pwd_with_EOFError(self): |
---|
284 | """ |
---|
285 | Test if an `EOFError` in `_session.pwd` skips the child |
---|
286 | session. |
---|
287 | """ |
---|
288 | self._test_with_pwd_error(EOFError) |
---|