""" Low-level OS functionality wrappers used by pathlib. """ from errno import * from io import TextIOWrapper, text_encoding from stat import S_ISDIR, S_ISREG, S_ISLNK, S_IMODE import os import sys try: import fcntl except ImportError: fcntl = None try: import posix except ImportError: posix = None try: import _winapi except ImportError: _winapi = None def _get_copy_blocksize(infd): """Determine blocksize for fastcopying on Linux. Hopefully the whole file will be copied in a single call. The copying itself should be performed in a loop 'till EOF is reached (0 return) so a blocksize smaller or bigger than the actual file size should not make any difference, also in case the file content changes while being copied. """ try: blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8 MiB except OSError: blocksize = 2 ** 27 # 128 MiB # On 32-bit architectures truncate to 1 GiB to avoid OverflowError, # see gh-82500. if sys.maxsize < 2 ** 32: blocksize = min(blocksize, 2 ** 30) return blocksize if fcntl and hasattr(fcntl, 'FICLONE'): def _ficlone(source_fd, target_fd): """ Perform a lightweight copy of two files, where the data blocks are copied only when modified. This is known as Copy on Write (CoW), instantaneous copy or reflink. """ fcntl.ioctl(target_fd, fcntl.FICLONE, source_fd) else: _ficlone = None if posix and hasattr(posix, '_fcopyfile'): def _fcopyfile(source_fd, target_fd): """ Copy a regular file content using high-performance fcopyfile(3) syscall (macOS). """ posix._fcopyfile(source_fd, target_fd, posix._COPYFILE_DATA) else: _fcopyfile = None if hasattr(os, 'copy_file_range'): def _copy_file_range(source_fd, target_fd): """ Copy data from one regular mmap-like fd to another by using a high-performance copy_file_range(2) syscall that gives filesystems an opportunity to implement the use of reflinks or server-side copy. This should work on Linux >= 4.5 only. """ blocksize = _get_copy_blocksize(source_fd) offset = 0 while True: sent = os.copy_file_range(source_fd, target_fd, blocksize, offset_dst=offset) if sent == 0: break # EOF offset += sent else: _copy_file_range = None if hasattr(os, 'sendfile'): def _sendfile(source_fd, target_fd): """Copy data from one regular mmap-like fd to another by using high-performance sendfile(2) syscall. This should work on Linux >= 2.6.33 only. """ blocksize = _get_copy_blocksize(source_fd) offset = 0 while True: sent = os.sendfile(target_fd, source_fd, offset, blocksize) if sent == 0: break # EOF offset += sent else: _sendfile = None if _winapi and hasattr(_winapi, 'CopyFile2'): def copyfile2(source, target): """ Copy from one file to another using CopyFile2 (Windows only). """ _winapi.CopyFile2(source, target, 0) else: copyfile2 = None def copyfileobj(source_f, target_f): """ Copy data from file-like object source_f to file-like object target_f. """ try: source_fd = source_f.fileno() target_fd = target_f.fileno() except Exception: pass # Fall through to generic code. else: try: # Use OS copy-on-write where available. if _ficlone: try: _ficlone(source_fd, target_fd) return except OSError as err: if err.errno not in (EBADF, EOPNOTSUPP, ETXTBSY, EXDEV): raise err # Use OS copy where available. if _fcopyfile: try: _fcopyfile(source_fd, target_fd) return except OSError as err: if err.errno not in (EINVAL, ENOTSUP): raise err if _copy_file_range: try: _copy_file_range(source_fd, target_fd) return except OSError as err: if err.errno not in (ETXTBSY, EXDEV): raise err if _sendfile: try: _sendfile(source_fd, target_fd) return except OSError as err: if err.errno != ENOTSOCK: raise err except OSError as err: # Produce more useful error messages. err.filename = source_f.name err.filename2 = target_f.name raise err # Last resort: copy with fileobj read() and write(). read_source = source_f.read write_target = target_f.write while buf := read_source(1024 * 1024): write_target(buf) def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None, newline=None): """ Open the file pointed to by this path and return a file object, as the built-in open() function does. """ text = 'b' not in mode if text: # Call io.text_encoding() here to ensure any warning is raised at an # appropriate stack level. encoding = text_encoding(encoding) try: return open(path, mode, buffering, encoding, errors, newline) except TypeError: pass cls = type(path) mode = ''.join(sorted(c for c in mode if c not in 'bt')) if text: try: attr = getattr(cls, f'__open_{mode}__') except AttributeError: pass else: return attr(path, buffering, encoding, errors, newline) elif encoding is not None: raise ValueError("binary mode doesn't take an encoding argument") elif errors is not None: raise ValueError("binary mode doesn't take an errors argument") elif newline is not None: raise ValueError("binary mode doesn't take a newline argument") try: attr = getattr(cls, f'__open_{mode}b__') except AttributeError: pass else: stream = attr(path, buffering) if text: stream = TextIOWrapper(stream, encoding, errors, newline) return stream raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}") def ensure_distinct_paths(source, target): """ Raise OSError(EINVAL) if the other path is within this path. """ # Note: there is no straightforward, foolproof algorithm to determine # if one directory is within another (a particularly perverse example # would be a single network share mounted in one location via NFS, and # in another location via CIFS), so we simply checks whether the # other path is lexically equal to, or within, this path. if source == target: err = OSError(EINVAL, "Source and target are the same path") elif source in target.parents: err = OSError(EINVAL, "Source path is a parent of target path") else: return err.filename = str(source) err.filename2 = str(target) raise err def ensure_different_files(source, target): """ Raise OSError(EINVAL) if both paths refer to the same file. """ try: source_file_id = source.info._file_id target_file_id = target.info._file_id except AttributeError: if source != target: return else: try: if source_file_id() != target_file_id(): return except (OSError, ValueError): return err = OSError(EINVAL, "Source and target are the same file") err.filename = str(source) err.filename2 = str(target) raise err def copy_info(info, target, follow_symlinks=True): """Copy metadata from the given PathInfo to the given local path.""" copy_times_ns = ( hasattr(info, '_access_time_ns') and hasattr(info, '_mod_time_ns') and (follow_symlinks or os.utime in os.supports_follow_symlinks)) if copy_times_ns: t0 = info._access_time_ns(follow_symlinks=follow_symlinks) t1 = info._mod_time_ns(follow_symlinks=follow_symlinks) os.utime(target, ns=(t0, t1), follow_symlinks=follow_symlinks) # We must copy extended attributes before the file is (potentially) # chmod()'ed read-only, otherwise setxattr() will error with -EACCES. copy_xattrs = ( hasattr(info, '_xattrs') and hasattr(os, 'setxattr') and (follow_symlinks or os.setxattr in os.supports_follow_symlinks)) if copy_xattrs: xattrs = info._xattrs(follow_symlinks=follow_symlinks) for attr, value in xattrs: try: os.setxattr(target, attr, value, follow_symlinks=follow_symlinks) except OSError as e: if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES): raise copy_posix_permissions = ( hasattr(info, '_posix_permissions') and (follow_symlinks or os.chmod in os.supports_follow_symlinks)) if copy_posix_permissions: posix_permissions = info._posix_permissions(follow_symlinks=follow_symlinks) try: os.chmod(target, posix_permissions, follow_symlinks=follow_symlinks) except NotImplementedError: # if we got a NotImplementedError, it's because # * follow_symlinks=False, # * lchown() is unavailable, and # * either # * fchownat() is unavailable or # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW. # (it returned ENOSUP.) # therefore we're out of options--we simply cannot chown the # symlink. give up, suppress the error. # (which is what shutil always did in this circumstance.) pass copy_bsd_flags = ( hasattr(info, '_bsd_flags') and hasattr(os, 'chflags') and (follow_symlinks or os.chflags in os.supports_follow_symlinks)) if copy_bsd_flags: bsd_flags = info._bsd_flags(follow_symlinks=follow_symlinks) try: os.chflags(target, bsd_flags, follow_symlinks=follow_symlinks) except OSError as why: if why.errno not in (EOPNOTSUPP, ENOTSUP): raise class _PathInfoBase: __slots__ = ('_path', '_stat_result', '_lstat_result') def __init__(self, path): self._path = str(path) def __repr__(self): path_type = "WindowsPath" if os.name == "nt" else "PosixPath" return f"<{path_type}.info>" def _stat(self, *, follow_symlinks=True, ignore_errors=False): """Return the status as an os.stat_result, or None if stat() fails and ignore_errors is true.""" if follow_symlinks: try: result = self._stat_result except AttributeError: pass else: if ignore_errors or result is not None: return result try: self._stat_result = os.stat(self._path) except (OSError, ValueError): self._stat_result = None if not ignore_errors: raise return self._stat_result else: try: result = self._lstat_result except AttributeError: pass else: if ignore_errors or result is not None: return result try: self._lstat_result = os.lstat(self._path) except (OSError, ValueError): self._lstat_result = None if not ignore_errors: raise return self._lstat_result def _posix_permissions(self, *, follow_symlinks=True): """Return the POSIX file permissions.""" return S_IMODE(self._stat(follow_symlinks=follow_symlinks).st_mode) def _file_id(self, *, follow_symlinks=True): """Returns the identifier of the file.""" st = self._stat(follow_symlinks=follow_symlinks) return st.st_dev, st.st_ino def _access_time_ns(self, *, follow_symlinks=True): """Return the access time in nanoseconds.""" return self._stat(follow_symlinks=follow_symlinks).st_atime_ns def _mod_time_ns(self, *, follow_symlinks=True): """Return the modify time in nanoseconds.""" return self._stat(follow_symlinks=follow_symlinks).st_mtime_ns if hasattr(os.stat_result, 'st_flags'): def _bsd_flags(self, *, follow_symlinks=True): """Return the flags.""" return self._stat(follow_symlinks=follow_symlinks).st_flags if hasattr(os, 'listxattr'): def _xattrs(self, *, follow_symlinks=True): """Return the xattrs as a list of (attr, value) pairs, or an empty list if extended attributes aren't supported.""" try: return [ (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks)) for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)] except OSError as err: if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES): raise return [] class _WindowsPathInfo(_PathInfoBase): """Implementation of pathlib.types.PathInfo that provides status information for Windows paths. Don't try to construct it yourself.""" __slots__ = ('_exists', '_is_dir', '_is_file', '_is_symlink') def exists(self, *, follow_symlinks=True): """Whether this path exists.""" if not follow_symlinks and self.is_symlink(): return True try: return self._exists except AttributeError: if os.path.exists(self._path): self._exists = True return True else: self._exists = self._is_dir = self._is_file = False return False def is_dir(self, *, follow_symlinks=True): """Whether this path is a directory.""" if not follow_symlinks and self.is_symlink(): return False try: return self._is_dir except AttributeError: if os.path.isdir(self._path): self._is_dir = self._exists = True return True else: self._is_dir = False return False def is_file(self, *, follow_symlinks=True): """Whether this path is a regular file.""" if not follow_symlinks and self.is_symlink(): return False try: return self._is_file except AttributeError: if os.path.isfile(self._path): self._is_file = self._exists = True return True else: self._is_file = False return False def is_symlink(self): """Whether this path is a symbolic link.""" try: return self._is_symlink except AttributeError: self._is_symlink = os.path.islink(self._path) return self._is_symlink class _PosixPathInfo(_PathInfoBase): """Implementation of pathlib.types.PathInfo that provides status information for POSIX paths. Don't try to construct it yourself.""" __slots__ = () def exists(self, *, follow_symlinks=True): """Whether this path exists.""" st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True) if st is None: return False return True def is_dir(self, *, follow_symlinks=True): """Whether this path is a directory.""" st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True) if st is None: return False return S_ISDIR(st.st_mode) def is_file(self, *, follow_symlinks=True): """Whether this path is a regular file.""" st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True) if st is None: return False return S_ISREG(st.st_mode) def is_symlink(self): """Whether this path is a symbolic link.""" st = self._stat(follow_symlinks=False, ignore_errors=True) if st is None: return False return S_ISLNK(st.st_mode) PathInfo = _WindowsPathInfo if os.name == 'nt' else _PosixPathInfo class DirEntryInfo(_PathInfoBase): """Implementation of pathlib.types.PathInfo that provides status information by querying a wrapped os.DirEntry object. Don't try to construct it yourself.""" __slots__ = ('_entry',) def __init__(self, entry): super().__init__(entry.path) self._entry = entry def _stat(self, *, follow_symlinks=True, ignore_errors=False): try: return self._entry.stat(follow_symlinks=follow_symlinks) except OSError: if not ignore_errors: raise return None def exists(self, *, follow_symlinks=True): """Whether this path exists.""" if not follow_symlinks: return True return self._stat(ignore_errors=True) is not None def is_dir(self, *, follow_symlinks=True): """Whether this path is a directory.""" try: return self._entry.is_dir(follow_symlinks=follow_symlinks) except OSError: return False def is_file(self, *, follow_symlinks=True): """Whether this path is a regular file.""" try: return self._entry.is_file(follow_symlinks=follow_symlinks) except OSError: return False def is_symlink(self): """Whether this path is a symbolic link.""" try: return self._entry.is_symlink() except OSError: return False