1 | # Copyright (C) 2002-2016, Stefan Schwarzer <sschwarzer@sschwarzer.net> |
---|
2 | # and ftputil contributors (see `doc/contributors.txt`) |
---|
3 | # See the file LICENSE for licensing terms. |
---|
4 | |
---|
5 | from __future__ import unicode_literals |
---|
6 | |
---|
7 | import ftplib |
---|
8 | |
---|
9 | import pytest |
---|
10 | |
---|
11 | import ftputil.compat |
---|
12 | import ftputil.error |
---|
13 | |
---|
14 | from test import mock_ftplib |
---|
15 | from test import test_base |
---|
16 | |
---|
17 | |
---|
18 | # |
---|
19 | # Several customized `MockSession` classes |
---|
20 | # |
---|
21 | class ReadMockSession(mock_ftplib.MockSession): |
---|
22 | |
---|
23 | mock_file_content = b"line 1\r\nanother line\r\nyet another line" |
---|
24 | |
---|
25 | |
---|
26 | class ReadMockSessionWithMoreNewlines(mock_ftplib.MockSession): |
---|
27 | |
---|
28 | mock_file_content = b"\r\n".join(map(ftputil.compat.bytes_type, range(20))) |
---|
29 | |
---|
30 | |
---|
31 | class InaccessibleDirSession(mock_ftplib.MockSession): |
---|
32 | |
---|
33 | _login_dir = "/inaccessible" |
---|
34 | |
---|
35 | def pwd(self): |
---|
36 | return self._login_dir |
---|
37 | |
---|
38 | def cwd(self, dir): |
---|
39 | if dir in (self._login_dir, self._login_dir + "/"): |
---|
40 | raise ftplib.error_perm |
---|
41 | else: |
---|
42 | super(InaccessibleDirSession, self).cwd(dir) |
---|
43 | |
---|
44 | |
---|
45 | class TestFileOperations(object): |
---|
46 | """Test operations with file-like objects.""" |
---|
47 | |
---|
48 | def test_inaccessible_dir(self): |
---|
49 | """Test whether opening a file at an invalid location fails.""" |
---|
50 | host = test_base.ftp_host_factory( |
---|
51 | session_factory=InaccessibleDirSession) |
---|
52 | with pytest.raises(ftputil.error.FTPIOError): |
---|
53 | host.open("/inaccessible/new_file", "w") |
---|
54 | |
---|
55 | def test_caching(self): |
---|
56 | """Test whether `FTPFile` cache of `FTPHost` object works.""" |
---|
57 | host = test_base.ftp_host_factory() |
---|
58 | assert len(host._children) == 0 |
---|
59 | path1 = "path1" |
---|
60 | path2 = "path2" |
---|
61 | # Open one file and inspect cache. |
---|
62 | file1 = host.open(path1, "w") |
---|
63 | child1 = host._children[0] |
---|
64 | assert len(host._children) == 1 |
---|
65 | assert not child1._file.closed |
---|
66 | # Open another file. |
---|
67 | file2 = host.open(path2, "w") |
---|
68 | child2 = host._children[1] |
---|
69 | assert len(host._children) == 2 |
---|
70 | assert not child2._file.closed |
---|
71 | # Close first file. |
---|
72 | file1.close() |
---|
73 | assert len(host._children) == 2 |
---|
74 | assert child1._file.closed |
---|
75 | assert not child2._file.closed |
---|
76 | # Re-open first child's file. |
---|
77 | file1 = host.open(path1, "w") |
---|
78 | child1_1 = file1._host |
---|
79 | # Check if it's reused. |
---|
80 | assert child1 is child1_1 |
---|
81 | assert not child1._file.closed |
---|
82 | assert not child2._file.closed |
---|
83 | # Close second file. |
---|
84 | file2.close() |
---|
85 | assert child2._file.closed |
---|
86 | |
---|
87 | def test_write_to_directory(self): |
---|
88 | """Test whether attempting to write to a directory fails.""" |
---|
89 | host = test_base.ftp_host_factory() |
---|
90 | with pytest.raises(ftputil.error.FTPIOError): |
---|
91 | host.open("/home/sschwarzer", "w") |
---|
92 | |
---|
93 | def test_binary_read(self): |
---|
94 | """Read data from a binary file.""" |
---|
95 | host = test_base.ftp_host_factory(session_factory=ReadMockSession) |
---|
96 | with host.open("some_file", "rb") as fobj: |
---|
97 | data = fobj.read() |
---|
98 | assert data == ReadMockSession.mock_file_content |
---|
99 | |
---|
100 | def test_binary_write(self): |
---|
101 | """Write binary data with `write`.""" |
---|
102 | host = test_base.ftp_host_factory() |
---|
103 | data = b"\000a\001b\r\n\002c\003\n\004\r\005" |
---|
104 | with host.open("dummy", "wb") as output: |
---|
105 | output.write(data) |
---|
106 | child_data = mock_ftplib.content_of("dummy") |
---|
107 | expected_data = data |
---|
108 | assert child_data == expected_data |
---|
109 | |
---|
110 | def test_ascii_read(self): |
---|
111 | """Read ASCII text with plain `read`.""" |
---|
112 | host = test_base.ftp_host_factory(session_factory=ReadMockSession) |
---|
113 | with host.open("dummy", "r") as input_: |
---|
114 | data = input_.read(0) |
---|
115 | assert data == "" |
---|
116 | data = input_.read(3) |
---|
117 | assert data == "lin" |
---|
118 | data = input_.read(7) |
---|
119 | assert data == "e 1\nano" |
---|
120 | data = input_.read() |
---|
121 | assert data == "ther line\nyet another line" |
---|
122 | data = input_.read() |
---|
123 | assert data == "" |
---|
124 | |
---|
125 | def test_ascii_write(self): |
---|
126 | """Write ASCII text with `write`.""" |
---|
127 | host = test_base.ftp_host_factory() |
---|
128 | data = " \nline 2\nline 3" |
---|
129 | with host.open("dummy", "w", newline="\r\n") as output: |
---|
130 | output.write(data) |
---|
131 | child_data = mock_ftplib.content_of("dummy") |
---|
132 | # This corresponds to the byte stream, so expect a `bytes` object. |
---|
133 | expected_data = b" \r\nline 2\r\nline 3" |
---|
134 | assert child_data == expected_data |
---|
135 | |
---|
136 | # TODO: Add tests with given encoding and possibly buffering. |
---|
137 | |
---|
138 | def test_ascii_writelines(self): |
---|
139 | """Write ASCII text with `writelines`.""" |
---|
140 | host = test_base.ftp_host_factory() |
---|
141 | data = [" \n", "line 2\n", "line 3"] |
---|
142 | backup_data = data[:] |
---|
143 | output = host.open("dummy", "w", newline="\r\n") |
---|
144 | output.writelines(data) |
---|
145 | output.close() |
---|
146 | child_data = mock_ftplib.content_of("dummy") |
---|
147 | expected_data = b" \r\nline 2\r\nline 3" |
---|
148 | assert child_data == expected_data |
---|
149 | # Ensure that the original data was not modified. |
---|
150 | assert data == backup_data |
---|
151 | |
---|
152 | def test_binary_readline(self): |
---|
153 | """Read binary data with `readline`.""" |
---|
154 | host = test_base.ftp_host_factory(session_factory=ReadMockSession) |
---|
155 | input_ = host.open("dummy", "rb") |
---|
156 | data = input_.readline(3) |
---|
157 | assert data == b"lin" |
---|
158 | data = input_.readline(10) |
---|
159 | assert data == b"e 1\r\n" |
---|
160 | data = input_.readline(13) |
---|
161 | assert data == b"another line\r" |
---|
162 | data = input_.readline() |
---|
163 | assert data == b"\n" |
---|
164 | data = input_.readline() |
---|
165 | assert data == b"yet another line" |
---|
166 | data = input_.readline() |
---|
167 | assert data == b"" |
---|
168 | input_.close() |
---|
169 | |
---|
170 | def test_ascii_readline(self): |
---|
171 | """Read ASCII text with `readline`.""" |
---|
172 | host = test_base.ftp_host_factory(session_factory=ReadMockSession) |
---|
173 | input_ = host.open("dummy", "r") |
---|
174 | data = input_.readline(3) |
---|
175 | assert data == "lin" |
---|
176 | data = input_.readline(10) |
---|
177 | assert data == "e 1\n" |
---|
178 | data = input_.readline(13) |
---|
179 | assert data == "another line\n" |
---|
180 | data = input_.readline() |
---|
181 | assert data == "yet another line" |
---|
182 | data = input_.readline() |
---|
183 | assert data == "" |
---|
184 | input_.close() |
---|
185 | |
---|
186 | def test_ascii_readlines(self): |
---|
187 | """Read ASCII text with `readlines`.""" |
---|
188 | host = test_base.ftp_host_factory(session_factory=ReadMockSession) |
---|
189 | input_ = host.open("dummy", "r") |
---|
190 | data = input_.read(3) |
---|
191 | assert data == "lin" |
---|
192 | data = input_.readlines() |
---|
193 | assert data == ["e 1\n", "another line\n", "yet another line"] |
---|
194 | input_.close() |
---|
195 | |
---|
196 | def test_binary_iterator(self): |
---|
197 | """ |
---|
198 | Test the iterator interface of `FTPFile` objects (without |
---|
199 | newline conversion. |
---|
200 | """ |
---|
201 | host = test_base.ftp_host_factory(session_factory=ReadMockSession) |
---|
202 | input_ = host.open("dummy", "rb") |
---|
203 | input_iterator = iter(input_) |
---|
204 | assert next(input_iterator) == b"line 1\r\n" |
---|
205 | assert next(input_iterator) == b"another line\r\n" |
---|
206 | assert next(input_iterator) == b"yet another line" |
---|
207 | with pytest.raises(StopIteration): |
---|
208 | input_iterator.__next__() |
---|
209 | input_.close() |
---|
210 | |
---|
211 | def test_ascii_iterator(self): |
---|
212 | """ |
---|
213 | Test the iterator interface of `FTPFile` objects (with newline |
---|
214 | conversion). |
---|
215 | """ |
---|
216 | host = test_base.ftp_host_factory(session_factory=ReadMockSession) |
---|
217 | input_ = host.open("dummy") |
---|
218 | input_iterator = iter(input_) |
---|
219 | assert next(input_iterator) == "line 1\n" |
---|
220 | assert next(input_iterator) == "another line\n" |
---|
221 | assert next(input_iterator) == "yet another line" |
---|
222 | with pytest.raises(StopIteration): |
---|
223 | input_iterator.__next__() |
---|
224 | input_.close() |
---|
225 | |
---|
226 | def test_read_unknown_file(self): |
---|
227 | """Test whether reading a file which isn't there fails.""" |
---|
228 | host = test_base.ftp_host_factory() |
---|
229 | with pytest.raises(ftputil.error.FTPIOError): |
---|
230 | host.open("notthere", "r") |
---|
231 | |
---|
232 | |
---|
233 | class TestAvailableChild(object): |
---|
234 | |
---|
235 | def _failing_pwd(self, exception_class): |
---|
236 | """ |
---|
237 | Return a function that will be used instead of the |
---|
238 | `session.pwd` and will raise the exception |
---|
239 | `exception_to_raise`. |
---|
240 | """ |
---|
241 | def new_pwd(): |
---|
242 | raise exception_class("") |
---|
243 | return new_pwd |
---|
244 | |
---|
245 | def _test_with_pwd_error(self, exception_class): |
---|
246 | """ |
---|
247 | Test if reusing a child session fails because of |
---|
248 | `child_host._session.pwd` raising an exception of type |
---|
249 | `exception_class`. |
---|
250 | """ |
---|
251 | host = test_base.ftp_host_factory() |
---|
252 | # Implicitly create a child session. |
---|
253 | with host.open("/home/older") as _: |
---|
254 | pass |
---|
255 | assert len(host._children) == 1 |
---|
256 | # Make sure reusing the previous child session will fail. |
---|
257 | host._children[0]._session.pwd = self._failing_pwd(exception_class) |
---|
258 | # Try to create a new file. Since `pwd` now raises an |
---|
259 | # exception, a new child session should be created. |
---|
260 | with host.open("home/older") as _: |
---|
261 | pass |
---|
262 | assert len(host._children) == 2 |
---|
263 | |
---|
264 | def test_pwd_with_error_temp(self): |
---|
265 | """ |
---|
266 | Test if an `error_temp` in `_session.pwd` skips the child |
---|
267 | session. |
---|
268 | """ |
---|
269 | self._test_with_pwd_error(ftplib.error_temp) |
---|
270 | |
---|
271 | def test_pwd_with_error_reply(self): |
---|
272 | """ |
---|
273 | Test if an `error_reply` in `_session.pwd` skips the child |
---|
274 | session. |
---|
275 | """ |
---|
276 | self._test_with_pwd_error(ftplib.error_reply) |
---|
277 | |
---|
278 | def test_pwd_with_OSError(self): |
---|
279 | """ |
---|
280 | Test if an `OSError` in `_session.pwd` skips the child |
---|
281 | session. |
---|
282 | """ |
---|
283 | self._test_with_pwd_error(OSError) |
---|
284 | |
---|
285 | def test_pwd_with_EOFError(self): |
---|
286 | """ |
---|
287 | Test if an `EOFError` in `_session.pwd` skips the child |
---|
288 | session. |
---|
289 | """ |
---|
290 | self._test_with_pwd_error(EOFError) |
---|