Ignore:
Timestamp:
Aug 4, 2013, 1:33:28 PM (6 years ago)
Author:
Stefan Schwarzer <sschwarzer@…>
Branch:
default
Message:
Removed no longer used method `_wrapped_file`.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • ftputil/file.py

    r1334 r1335  
    119119        self._conn = None
    120120        self._fobj = None
    121 
    122     def _wrapped_file(self, fobj, is_readable=False, is_writable=False):
    123         """
    124         Return a new file-like object which wraps `fobj` and in
    125         addition has the `readable`, `readinto` and `writable` methods
    126         that `BufferedReader` or `BufferedWriter` require.
    127         """
    128         # I tried to assign the missing methods as bound methods
    129         # directly to `fobj`, but this seemingly isn't possible with
    130         # the file object returned from `socket.makefile`.
    131         class Wrapper(io.RawIOBase):
    132             def __init__(self, fobj):
    133                 super(Wrapper, self).__setattr__("_fobj", fobj)
    134             def readable(self):
    135                 return is_readable
    136             def writable(self):
    137                 return is_writable
    138             def readinto(self, bytearray_):
    139                 data = self._fobj.read(len(bytearray_))
    140                 bytearray_[:len(data)] = data
    141                 return len(data)
    142             def write(self, bytes_):
    143                 # `BufferedWriter` expects the number of written bytes
    144                 # as return value. Since we don't really know how many
    145                 # bytes are written, return the length of the full
    146                 # data, but also call `flush` to increase the chance
    147                 # that actually all bytes are written.
    148                 print("=== type(bytes):", type(bytes_))
    149                 # Use slice in case we get a `memoryview` object.
    150                 self._fobj.write(bytes_[:])
    151                 self._fobj.flush()
    152                 return len(bytes_)
    153             def __getattr__(self, name):
    154                 if name == "__IOBase_closed":
    155                     result = super(Wrapper, self).__getattr__(name)
    156                     return result
    157                 else:
    158                     result = getattr(self._fobj, name)
    159                     return result
    160             def __setattr__(self, name, value):
    161                 if name == "__IOBase_closed":
    162                     # Delegate to this (`RawIOBase`) instance.
    163                     return super(Wrapper, self).__setattr__(name, value)
    164                 else:
    165                     return setattr(self._fobj, name, value)
    166         return Wrapper(fobj)
    167121
    168122    def _open(self, path, mode, buffering=None, encoding=None, errors=None,
Note: See TracChangeset for help on using the changeset viewer.