Bases: CloudPath
Class for representing and operating on Google Cloud Storage URIs, in the style of the
Python standard library's pathlib
module.
Instances represent a path in GS with filesystem path semantics, and convenient methods allow
for basic operations like joining, reading, writing, iterating over contents, etc. This class
almost entirely mimics the pathlib.Path
interface, so most familiar properties and methods should be available and behave in the
expected way.
The GSClient class handles authentication with GCP. If a client instance is
not explicitly specified on GSPath instantiation, a default client is used.
See GSClient's documentation for more details.
Source code in cloudpathlib/gs/gspath.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
@register_path_class("gs")
class GSPath(CloudPath):
    """Class for representing and operating on Google Cloud Storage URIs, in the style of the
    Python standard library's [`pathlib` module](https://docs.python.org/3/library/pathlib.html).
    Instances represent a path in GS with filesystem path semantics, and convenient methods allow
    for basic operations like joining, reading, writing, iterating over contents, etc. This class
    almost entirely mimics the [`pathlib.Path`](https://docs.python.org/3/library/pathlib.html#pathlib.Path)
    interface, so most familiar properties and methods should be available and behave in the
    expected way.

    The [`GSClient`](../gsclient/) class handles authentication with GCP. If a client instance is
    not explicitly specified on `GSPath` instantiation, a default client is used. See `GSClient`'s
    documentation for more details.
    """

    cloud_prefix: str = "gs://"
    client: "GSClient"

    @property
    def drive(self) -> str:
        """Return the bucket name, which serves as the drive of this path."""
        return self.bucket

    def mkdir(self, parents=False, exist_ok=False):
        """Do nothing; the arguments are accepted but ignored."""
        # not possible to make empty directory on cloud storage
        pass

    def touch(self, exist_ok: bool = True):
        """Create an empty object at this path, or move the existing one onto itself.

        Raises:
            FileExistsError: if the path exists and ``exist_ok`` is False.
        """
        if self.exists():
            if not exist_ok:
                raise FileExistsError(f"File exists: {self}")
            self.client._move_file(self, self)
        else:
            # The context manager removes the temporary directory even when
            # the upload raises, instead of relying on a trailing cleanup()
            # call that is skipped on error.
            with TemporaryDirectory() as tmp_dir:
                p = Path(tmp_dir) / "empty"
                p.touch()
                self.client._upload_file(p, self)

    def stat(self):
        """Return an ``os.stat_result`` built from the client's metadata for this path.

        Only ``st_dev`` (the cloud prefix), ``st_size`` and ``st_mtime`` are
        populated; every other field is ``None``. The modification time falls
        back to 0 when the metadata has no ``"updated"`` entry.

        Raises:
            NoStatError: if the client returns no metadata for this path.
        """
        meta = self.client._get_metadata(self)
        if meta is None:
            raise NoStatError(
                f"No stats available for {self}; it may be a directory or not exist."
            )

        try:
            mtime = meta["updated"].timestamp()
        except KeyError:
            mtime = 0

        return os.stat_result(
            (
                None,  # mode
                None,  # ino
                self.cloud_prefix,  # dev,
                None,  # nlink,
                None,  # uid,
                None,  # gid,
                meta.get("size", 0),  # size,
                None,  # atime,
                mtime,  # mtime,
                None,  # ctime,
            )
        )

    @property
    def bucket(self) -> str:
        """Return the first ``/``-separated component of ``_no_prefix``."""
        return self._no_prefix.split("/", 1)[0]

    @property
    def blob(self) -> str:
        """Return ``_no_prefix_no_drive`` with one leading slash removed, if present."""
        key = self._no_prefix_no_drive

        # key should never have starting slash for
        # use with google-cloud-storage, etc.
        if key.startswith("/"):
            key = key[1:]

        return key

    @property
    def etag(self):
        """Return the ``"etag"`` metadata entry, or ``None`` when no metadata is available."""
        meta = self.client._get_metadata(self)
        # Mirror ``md5``: missing metadata yields None rather than an
        # AttributeError from calling ``.get`` on None.
        if not meta:
            return None
        return meta.get("etag")

    @property
    def md5(self) -> Optional[str]:
        """Return the ``"md5_hash"`` metadata entry, or ``None`` when no metadata is available."""
        meta = self.client._get_metadata(self)
        if not meta:
            return None
        return meta.get("md5_hash", None)
|
Attributes
client: GSClient
instance-attribute
cloud_prefix: str = 'gs://'
class-attribute
instance-attribute
md5: Optional[str]
property
parents: Sequence[Self]
property
parts: Tuple[str, ...]
property
suffixes: List[str]
property
Functions
__init__(cloud_path: Union[str, Self, CloudPath], client: Optional[Client] = None) -> None
Source code in cloudpathlib/cloudpath.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
def __init__(
    self,
    cloud_path: Union[str, Self, "CloudPath"],
    client: Optional["Client"] = None,
) -> None:
    """Validate ``cloud_path``, store its parsed forms, and attach a client.

    When ``client`` is omitted and ``cloud_path`` is itself a ``CloudPath``,
    that path's client is reused; otherwise ``_client`` stays ``None`` here.

    Raises:
        ClientMismatchError: if ``client`` is given but is not an instance of
            this path class's registered client class.
    """
    # These two attributes are assigned before anything that can raise, so
    # that __del__ never hits an AttributeError on a half-built instance.
    self._handle: Optional[IO] = None
    self._client: Optional["Client"] = None

    self.is_valid_cloudpath(cloud_path, raise_on_error=True)
    self._cloud_meta.validate_completeness()

    # versions of the raw string that provide useful methods
    self._str = str(cloud_path)
    self._url = urlparse(self._str)
    self._path = PurePosixPath(f"/{self._no_prefix}")

    # An explicit client wins; otherwise inherit from a CloudPath argument.
    if client is not None:
        self._client = client
    elif isinstance(cloud_path, CloudPath):
        self._client = cloud_path.client

    if not (client is None or isinstance(client, self._cloud_meta.client_class)):
        raise ClientMismatchError(
            f"Client of type [{client.__class__}] is not valid for cloud path of type "
            f"[{self.__class__}]; must be instance of [{self._cloud_meta.client_class}], or "
            f"None to use default client for this cloud path class."
        )

    # track if local has been written to, if so it may need to be uploaded
    self._dirty = False
|
absolute() -> Self
Source code in cloudpathlib/cloudpath.py
def absolute(self) -> Self:
    """Return this same object unchanged."""
    return self
|
as_uri() -> str
Source code in cloudpathlib/cloudpath.py
def as_uri(self) -> str:
    """Return the string form of this path."""
    uri = str(self)
    return uri
|
as_url(presign: bool = False, expire_seconds: int = 60 * 60) -> str
Source code in cloudpathlib/cloudpath.py
def as_url(self, presign: bool = False, expire_seconds: int = 60 * 60) -> str:
    """Return a URL for this path obtained from the client.

    With ``presign`` set, the client's presigned-URL generator is called with
    ``expire_seconds``; otherwise the client's public-URL getter is used.
    """
    if not presign:
        return self.client._get_public_url(self)
    return self.client._generate_presigned_url(self, expire_seconds=expire_seconds)
|
clear_cache()
Removes cache if it exists
Source code in cloudpathlib/cloudpath.py
1234
1235
1236
1237
1238
1239
def clear_cache(self):
    """Removes cache if it exists"""
    cached = self._local
    if not cached.exists():
        return
    if cached.is_file():
        cached.unlink()
    else:
        # anything that is not a regular file is removed as a tree
        shutil.rmtree(cached)
|
copy(destination, force_overwrite_to_cloud=None)
copy(
destination: Self,
force_overwrite_to_cloud: Optional[bool] = None,
) -> Self
copy(
destination: Path,
force_overwrite_to_cloud: Optional[bool] = None,
) -> Path
copy(
destination: str,
force_overwrite_to_cloud: Optional[bool] = None,
) -> Union[Path, CloudPath]
Copy self to destination folder or file, if self is a file.
Source code in cloudpathlib/cloudpath.py
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
def copy(self, destination, force_overwrite_to_cloud=None):
    """Copy this file to ``destination``, which may be a folder or a file.

    Strings and ``os.PathLike`` destinations are converted with
    ``anypath.to_anypath``. Non-cloud destinations are handled by
    ``download_to``. Cloud destinations sharing this path's client are copied
    through the client's ``_move_file`` with ``remove_src=False``; other cloud
    destinations are uploaded from ``self.fspath``.

    Raises:
        ValueError: if this path does not exist or is not a file.
        OverwriteNewerCloudError: same-client copy onto an existing destination
            whose mtime is not older than this file's, unless overwriting is
            forced by the argument or the
            ``CLOUDPATHLIB_FORCE_OVERWRITE_TO_CLOUD`` environment variable.
    """
    if not (self.exists() and self.is_file()):
        raise ValueError(
            f"Path {self} should be a file. To copy a directory tree use the method copytree."
        )

    # handle string version of cloud paths + local paths
    if isinstance(destination, (str, os.PathLike)):
        destination = anypath.to_anypath(destination)

    if not isinstance(destination, CloudPath):
        return self.download_to(destination)

    if self.client is not destination.client:
        # Different clients: upload from the local copy of this file.
        if destination.exists() and not destination.is_file():
            upload_target = destination / self.name
        else:
            upload_target = destination
        return upload_target.upload_from(
            self.fspath, force_overwrite_to_cloud=force_overwrite_to_cloud
        )

    # if same client, use cloud-native _move_file on client to avoid downloading
    if destination.exists() and destination.is_dir():
        destination = destination / self.name

    if force_overwrite_to_cloud is None:
        env_value = os.environ.get("CLOUDPATHLIB_FORCE_OVERWRITE_TO_CLOUD", "False")
        force_overwrite_to_cloud = env_value.lower() in ["1", "true"]

    if (
        not force_overwrite_to_cloud
        and destination.exists()
        and destination.stat().st_mtime >= self.stat().st_mtime
    ):
        raise OverwriteNewerCloudError(
            f"File ({destination}) is newer than ({self}). "
            f"To overwrite "
            f"pass `force_overwrite_to_cloud=True`."
        )

    return self.client._move_file(self, destination, remove_src=False)
|
copytree(destination, force_overwrite_to_cloud=None, ignore=None)
copytree(
destination: Self,
force_overwrite_to_cloud: Optional[bool] = None,
ignore: Optional[
Callable[[str, Iterable[str]], Container[str]]
] = None,
) -> Self
copytree(
destination: Path,
force_overwrite_to_cloud: Optional[bool] = None,
ignore: Optional[
Callable[[str, Iterable[str]], Container[str]]
] = None,
) -> Path
copytree(
destination: str,
force_overwrite_to_cloud: Optional[bool] = None,
ignore: Optional[
Callable[[str, Iterable[str]], Container[str]]
] = None,
) -> Union[Path, CloudPath]
Copy self to a directory, if self is a directory.
Source code in cloudpathlib/cloudpath.py
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
def copytree(self, destination, force_overwrite_to_cloud=None, ignore=None):
    """Copy self to a directory, if self is a directory.

    ``ignore``, when given, is called with ``self._no_prefix_no_drive`` and the
    list of child names; children whose name is in its result are skipped.
    Files are copied with ``copy`` and sub-directories recurse into
    ``copytree``. Returns ``destination``.

    Raises:
        CloudPathNotADirectoryError: if this path is not a directory.
        CloudPathFileExistsError: if ``destination`` exists and is a file.
    """
    if not self.is_dir():
        raise CloudPathNotADirectoryError(
            f"Origin path {self} must be a directory. To copy a single file use the method copy."
        )

    # handle string version of cloud paths + local paths
    if isinstance(destination, (str, os.PathLike)):
        destination = anypath.to_anypath(destination)

    if destination.exists() and destination.is_file():
        raise CloudPathFileExistsError(
            f"Destination path {destination} of copytree must be a directory."
        )

    children = list(self.iterdir())

    skipped = (
        set()
        if ignore is None
        else ignore(self._no_prefix_no_drive, [child.name for child in children])
    )

    destination.mkdir(parents=True, exist_ok=True)

    for child in children:
        if child.name in skipped:
            continue
        if child.is_file():
            child.copy(
                destination / child.name, force_overwrite_to_cloud=force_overwrite_to_cloud
            )
        elif child.is_dir():
            child.copytree(
                destination / child.name,
                force_overwrite_to_cloud=force_overwrite_to_cloud,
                ignore=ignore,
            )

    return destination
|
download_to(destination: Union[str, os.PathLike]) -> Path
Source code in cloudpathlib/cloudpath.py
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
def download_to(self, destination: Union[str, os.PathLike]) -> Path:
    """Download this path to the local ``destination`` and return the local path.

    For a file, the client's ``_download_file`` is called; when
    ``destination`` is an existing directory the file keeps its own name
    inside it. For anything else, ``destination`` is created as a directory
    and every entry from ``iterdir()`` is downloaded into it under its path
    relative to this one.

    Raises:
        CloudPathNotExistsError: if this path does not exist.
    """
    destination = Path(destination)

    if not self.exists():
        raise CloudPathNotExistsError(f"Cannot download because path does not exist: {self}")

    if self.is_file():
        if destination.is_dir():
            destination = destination / self.name
        return self.client._download_file(self, destination)

    destination.mkdir(exist_ok=True)

    # The prefix stripped from each child is the same for every child, so it
    # is built once here rather than on each loop iteration.
    prefix = str(self)
    if not prefix.endswith("/"):
        prefix = prefix + "/"

    for child in self.iterdir():
        rel_dest = str(child)[len(prefix) :]
        child.download_to(destination / rel_dest)

    return destination
|
exists() -> bool
Source code in cloudpathlib/cloudpath.py
def exists(self) -> bool:
    """Return the result of the client's ``_exists`` check for this path."""
    client = self.client
    return client._exists(self)
|
from_uri(uri: str) -> Self
classmethod
Source code in cloudpathlib/cloudpath.py
@classmethod
def from_uri(cls, uri: str) -> Self:
    """Construct an instance of this class from ``uri`` by calling ``cls(uri)``."""
    instance = cls(uri)
    return instance
|
full_match(pattern: str, case_sensitive: Optional[bool] = None) -> bool
Source code in cloudpathlib/cloudpath.py
935
936
937
938
939
940
941
942
943
944
945
def full_match(self, pattern: str, case_sensitive: Optional[bool] = None) -> bool:
    """Match the path (without scheme and drive) against ``pattern``.

    A leading ``anchor + drive`` in ``pattern`` is removed before matching
    with ``PurePosixPath.full_match``.

    Raises:
        NotImplementedError: on Python versions below 3.13.
    """
    if sys.version_info < (3, 13):
        raise NotImplementedError("full_match requires Python 3.13 or higher")

    # strip scheme from start of pattern before testing
    scheme_and_drive = self.anchor + self.drive
    if pattern.startswith(scheme_and_drive):
        pattern = pattern[len(scheme_and_drive) :]

    # remove drive, which is kept on normal dispatch to pathlib
    bare_path = PurePosixPath(self._no_prefix_no_drive)
    return bare_path.full_match(  # type: ignore[attr-defined]
        pattern, case_sensitive=case_sensitive
    )
|
glob(pattern: Union[str, os.PathLike], case_sensitive: Optional[bool] = None) -> Generator[Self, None, None]
Source code in cloudpathlib/cloudpath.py
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
def glob(
    self, pattern: Union[str, os.PathLike], case_sensitive: Optional[bool] = None
) -> Generator[Self, None, None]:
    """Yield the results of ``_glob`` for a selector built from ``pattern``.

    The pattern first passes through ``_glob_checks``; its posix parts are
    then turned into a selector with ``_make_selector``.
    """
    pattern = self._glob_checks(pattern)

    selector = _make_selector(
        tuple(PurePosixPath(pattern).parts), _posix_flavour, case_sensitive=case_sensitive
    )

    # recursive listing needed if explicit ** or any sub folder in pattern
    needs_recursive_listing = "/" in pattern or "**" in pattern

    yield from self._glob(selector, needs_recursive_listing)
|
is_absolute() -> bool
Source code in cloudpathlib/cloudpath.py
def is_absolute(self) -> bool:
    """Always return ``True``."""
    return True
|
is_dir(follow_symlinks=True) -> bool
Source code in cloudpathlib/cloudpath.py
def is_dir(self, follow_symlinks=True) -> bool:
    """Return whether the client classifies this path as ``"dir"``.

    ``follow_symlinks`` is accepted but not used.
    """
    kind = self.client._is_file_or_dir(self)
    return kind == "dir"
|
is_file(follow_symlinks=True) -> bool
Source code in cloudpathlib/cloudpath.py
def is_file(self, follow_symlinks=True) -> bool:
    """Return whether the client classifies this path as ``"file"``.

    ``follow_symlinks`` is accepted but not used.
    """
    kind = self.client._is_file_or_dir(self)
    return kind == "file"
|
is_junction()
Source code in cloudpathlib/cloudpath.py
def is_junction(self):
    """Always return ``False``."""
    # only windows paths can be junctions, not cloudpaths
    return False
|
is_relative_to(other: Self) -> bool
Source code in cloudpathlib/cloudpath.py
def is_relative_to(self, other: Self) -> bool:
    """Return ``True`` when ``relative_to(other)`` succeeds, ``False`` on ``ValueError``."""
    try:
        self.relative_to(other)
    except ValueError:
        return False
    return True
|
is_valid_cloudpath(path: Union[str, CloudPath], raise_on_error: bool = False) -> Union[bool, TypeGuard[Self]]
classmethod
is_valid_cloudpath(
path: CloudPath, raise_on_error: bool = ...
) -> TypeGuard[Self]
is_valid_cloudpath(
path: str, raise_on_error: bool = ...
) -> bool
Source code in cloudpathlib/cloudpath.py
318
319
320
321
322
323
324
325
326
327
328
329 | @classmethod
def is_valid_cloudpath(
cls, path: Union[str, "CloudPath"], raise_on_error: bool = False
) -> Union[bool, TypeGuard[Self]]:
valid = str(path).lower().startswith(cls.cloud_prefix.lower())
if raise_on_error and not valid:
raise InvalidPrefixError(
f"'{path}' is not a valid path since it does not start with '{cls.cloud_prefix}'"
)
return valid
|
iterdir() -> Generator[Self, None, None]
Source code in cloudpathlib/cloudpath.py
def iterdir(self) -> Generator[Self, None, None]:
    """Yield entries from the client's non-recursive listing, excluding this path."""
    listing = self.client._list_dir(self, recursive=False)
    # iterdir does not include itself in pathlib
    yield from (entry for entry, _ in listing if entry != self)
|
joinpath(*pathsegments: Union[str, os.PathLike]) -> Self
Source code in cloudpathlib/cloudpath.py
def joinpath(self, *pathsegments: Union[str, os.PathLike]) -> Self:
    """Forward ``joinpath`` and its segments to ``_dispatch_to_path``."""
    joined = self._dispatch_to_path("joinpath", *pathsegments)
    return joined
|
match(path_pattern: str, case_sensitive: Optional[bool] = None) -> bool
Source code in cloudpathlib/cloudpath.py
948
949
950
951
952
953
954
955
956
957
def match(self, path_pattern: str, case_sensitive: Optional[bool] = None) -> bool:
    """Forward ``match`` to ``_dispatch_to_path`` after trimming the pattern.

    A leading ``anchor + drive + "/"`` is removed from ``path_pattern``.
    ``case_sensitive`` is only passed along on Python 3.12 and later.
    """
    # strip scheme from start of pattern before testing
    scheme_prefix = self.anchor + self.drive + "/"
    if path_pattern.startswith(scheme_prefix):
        path_pattern = path_pattern[len(scheme_prefix) :]

    if sys.version_info < (3, 12):
        return self._dispatch_to_path("match", path_pattern)

    return self._dispatch_to_path("match", path_pattern, case_sensitive=case_sensitive)
|
mkdir(parents=False, exist_ok=False)
Source code in cloudpathlib/gs/gspath.py
def mkdir(self, parents=False, exist_ok=False):
    """Do nothing; the arguments are accepted but ignored."""
    # not possible to make empty directory on cloud storage
    return None
|
open(mode: str = 'r', buffering: int = -1, encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None, force_overwrite_from_cloud: Optional[bool] = None, force_overwrite_to_cloud: Optional[bool] = None) -> IO[Any]
open(
mode: OpenTextMode = "r",
buffering: int = -1,
encoding: Optional[str] = None,
errors: Optional[str] = None,
newline: Optional[str] = None,
force_overwrite_from_cloud: Optional[bool] = None,
force_overwrite_to_cloud: Optional[bool] = None,
) -> TextIOWrapper
open(
mode: OpenBinaryMode,
buffering: Literal[0],
encoding: None = None,
errors: None = None,
newline: None = None,
force_overwrite_from_cloud: Optional[bool] = None,
force_overwrite_to_cloud: Optional[bool] = None,
) -> FileIO
open(
mode: OpenBinaryModeUpdating,
buffering: Literal[-1, 1] = -1,
encoding: None = None,
errors: None = None,
newline: None = None,
force_overwrite_from_cloud: Optional[bool] = None,
force_overwrite_to_cloud: Optional[bool] = None,
) -> BufferedRandom
open(
mode: OpenBinaryModeWriting,
buffering: Literal[-1, 1] = -1,
encoding: None = None,
errors: None = None,
newline: None = None,
force_overwrite_from_cloud: Optional[bool] = None,
force_overwrite_to_cloud: Optional[bool] = None,
) -> BufferedWriter
open(
mode: OpenBinaryModeReading,
buffering: Literal[-1, 1] = -1,
encoding: None = None,
errors: None = None,
newline: None = None,
force_overwrite_from_cloud: Optional[bool] = None,
force_overwrite_to_cloud: Optional[bool] = None,
) -> BufferedReader
open(
mode: OpenBinaryMode,
buffering: int = -1,
encoding: None = None,
errors: None = None,
newline: None = None,
force_overwrite_from_cloud: Optional[bool] = None,
force_overwrite_to_cloud: Optional[bool] = None,
) -> BinaryIO
open(
mode: str,
buffering: int = -1,
encoding: Optional[str] = None,
errors: Optional[str] = None,
newline: Optional[str] = None,
force_overwrite_from_cloud: Optional[bool] = None,
force_overwrite_to_cloud: Optional[bool] = None,
) -> IO[Any]
Source code in cloudpathlib/cloudpath.py
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
def open(
    self,
    mode: str = "r",
    buffering: int = -1,
    encoding: Optional[str] = None,
    errors: Optional[str] = None,
    newline: Optional[str] = None,
    force_overwrite_from_cloud: Optional[bool] = None,  # extra kwarg not in pathlib
    force_overwrite_to_cloud: Optional[bool] = None,  # extra kwarg not in pathlib
) -> "IO[Any]":
    """Open the local cached copy of this path and return the file object.

    The cache is refreshed through ``_refresh_cache`` and ``self._local`` is
    then opened with the given ``mode``, ``buffering``, ``encoding``,
    ``errors`` and ``newline``.

    When ``mode`` contains any of ``w``, ``+``, ``x`` or ``a``, the returned
    object's ``close`` is replaced so that closing it also calls
    ``_upload_local_to_cloud`` (once, guarded by ``self._dirty``). When the
    client's ``file_cache_mode`` is ``FileCacheMode.close_file``, ``close`` is
    wrapped again so that ``clear_cache`` runs after whatever ``close`` was
    already installed.

    Raises:
        CloudPathIsADirectoryError: if the path exists and is not a file.
        CloudPathFileExistsError: if ``mode`` is exactly ``"x"`` and the path exists.
    """
    # if trying to call open on a directory that exists
    if self.exists() and not self.is_file():
        raise CloudPathIsADirectoryError(
            f"Cannot open directory, only files. Tried to open ({self})"
        )

    if mode == "x" and self.exists():
        raise CloudPathFileExistsError(f"Cannot open existing file ({self}) for creation.")

    # TODO: consider streaming from client rather than DLing entire file to cache
    self._refresh_cache(force_overwrite_from_cloud=force_overwrite_from_cloud)

    # create any directories that may be needed if the file is new
    if not self._local.exists():
        self._local.parent.mkdir(parents=True, exist_ok=True)
        # no local file yet, so there is no earlier mtime to compare against
        original_mtime = 0.0
    else:
        original_mtime = self._local.stat().st_mtime

    buffer = self._local.open(
        mode=mode,
        buffering=buffering,
        encoding=encoding,
        errors=errors,
        newline=newline,
    )

    # write modes need special on closing the buffer
    if any(m in mode for m in ("w", "+", "x", "a")):
        # dirty, handle, patch close
        wrapped_close = buffer.close

        # since we are pretending this is a cloud file, upload it to the cloud
        # when the buffer is closed
        def _patched_close_upload(*args, **kwargs) -> None:
            # close the real buffer first so the local file is complete before upload
            wrapped_close(*args, **kwargs)

            # we should be idempotent and not upload again if
            # we already ran our close method patch
            if not self._dirty:
                return

            # original mtime should match what was in the cloud; because of system clocks or rounding
            # by the cloud provider, the new version in our cache is "older" than the original version;
            # explicitly set the new modified time to be after the original modified time.
            if self._local.stat().st_mtime < original_mtime:
                new_mtime = original_mtime + 1
                os.utime(self._local, times=(new_mtime, new_mtime))

            self._upload_local_to_cloud(force_overwrite_to_cloud=force_overwrite_to_cloud)
            self._dirty = False

        buffer.close = _patched_close_upload  # type: ignore

        # keep reference in case we need to close when __del__ is called on this object
        self._handle = buffer

        # opened for write, so mark dirty
        self._dirty = True

    # if we don't want any cache around, remove the cache
    # as soon as the file is closed
    if self.client.file_cache_mode == FileCacheMode.close_file:
        # this may be _patched_close_upload, in which case we need to
        # make sure to call that first so the file gets uploaded
        wrapped_close_for_cache = buffer.close

        def _patched_close_empty_cache(*args, **kwargs):
            wrapped_close_for_cache(*args, **kwargs)

            # remove local file as last step on closing
            self.clear_cache()

        buffer.close = _patched_close_empty_cache  # type: ignore

    return buffer
|
read_bytes() -> bytes
Source code in cloudpathlib/cloudpath.py
def read_bytes(self) -> bytes:
    """Open this path in ``"rb"`` mode, read everything, and return it."""
    with self.open(mode="rb") as handle:
        content = handle.read()
    return content
|
read_text(encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None) -> str
Source code in cloudpathlib/cloudpath.py
838
839
840
841
842
843
844
def read_text(
    self,
    encoding: Optional[str] = None,
    errors: Optional[str] = None,
    newline: Optional[str] = None,
) -> str:
    """Open this path in ``"r"`` mode with the given text options and return its contents."""
    with self.open(mode="r", encoding=encoding, errors=errors, newline=newline) as handle:
        content = handle.read()
    return content
|
relative_to(other: Self, walk_up: bool = False) -> PurePosixPath
Source code in cloudpathlib/cloudpath.py
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
def relative_to(self, other: Self, walk_up: bool = False) -> PurePosixPath:
    """Return this path's ``_path`` relative to ``other._path``.

    ``walk_up`` is only forwarded on Python 3.12 and later.

    Raises:
        ValueError: if ``other`` is not a ``CloudPath`` or has a different
            ``cloud_prefix``.
    """
    # We don't dispatch regularly since this never returns a cloud path (since it is relative, and cloud paths are
    # absolute)
    if not isinstance(other, CloudPath):
        raise ValueError(f"{self} is a cloud path, but {other} is not")

    if self.cloud_prefix != other.cloud_prefix:
        raise ValueError(
            f"{self} is a {self.cloud_prefix} path, but {other} is a {other.cloud_prefix} path"
        )

    if sys.version_info < (3, 12):
        return self._path.relative_to(other._path)

    return self._path.relative_to(other._path, walk_up=walk_up)  # type: ignore[call-arg]
|
rename(target: Self) -> Self
Source code in cloudpathlib/cloudpath.py
def rename(self, target: Self) -> Self:
    """Delegate to ``replace(target)`` and return its result."""
    # for cloud services replace == rename since we don't just rename,
    # we actually move files
    moved = self.replace(target)
    return moved
|
replace(target: Self) -> Self
Source code in cloudpathlib/cloudpath.py
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
def replace(self, target: Self) -> Self:
    """Move this file to ``target`` and return ``target``.

    An existing ``target`` is unlinked first, then the client's
    ``_move_file`` is called. When ``target`` equals this path nothing is
    done and this path is returned.

    Raises:
        TypeError: if ``target`` is not of exactly the same type as this path.
        CloudPathIsADirectoryError: if this path is a directory.
    """
    if type(self) is not type(target):
        # Message wording corrected: the original read "target based to rename".
        raise TypeError(
            f"The target passed to rename must be an instantiated class of type: {type(self)}"
        )

    if self.is_dir():
        raise CloudPathIsADirectoryError(
            f"Path {self} is a directory; rename/replace the files recursively."
        )

    if target == self:
        # Request is to replace/rename this with the same path - nothing to do
        return self

    if target.exists():
        target.unlink()

    self.client._move_file(self, target)
    return target
|
resolve(strict: bool = False) -> Self
Source code in cloudpathlib/cloudpath.py
def resolve(self, strict: bool = False) -> Self:
    """Return this same object unchanged; ``strict`` is not used."""
    return self
|
rglob(pattern: Union[str, os.PathLike], case_sensitive: Optional[bool] = None) -> Generator[Self, None, None]
Source code in cloudpathlib/cloudpath.py
530
531
532
533
534
535
536
537
538
539
def rglob(
    self, pattern: Union[str, os.PathLike], case_sensitive: Optional[bool] = None
) -> Generator[Self, None, None]:
    """Yield the results of a recursive ``_glob`` for ``pattern``.

    The pattern passes through ``_glob_checks`` and its posix parts are
    prefixed with ``"**"`` before the selector is built.
    """
    pattern = self._glob_checks(pattern)

    recursive_parts = ("**",) + tuple(PurePosixPath(pattern).parts)
    selector = _make_selector(recursive_parts, _posix_flavour, case_sensitive=case_sensitive)

    yield from self._glob(selector, True)
|
rmdir() -> None
Source code in cloudpathlib/cloudpath.py
779
780
781
782
783
784
785
786
787
788
789
790
def rmdir(self) -> None:
    """Remove this path through the client when ``iterdir()`` yields nothing.

    Raises:
        CloudPathNotADirectoryError: if this path is a file.
        DirectoryNotEmptyError: if ``iterdir()`` yields at least one entry.
    """
    if self.is_file():
        raise CloudPathNotADirectoryError(
            f"Path {self} is a file; call unlink instead of rmdir."
        )

    # A unique sentinel distinguishes "no entries" from any real entry.
    nothing = object()
    if next(self.iterdir(), nothing) is not nothing:
        raise DirectoryNotEmptyError(
            f"Directory not empty: '{self}'. Use rmtree to delete recursively."
        )

    self.client._remove(self)
|
rmtree() -> None
Delete an entire directory tree.
Source code in cloudpathlib/cloudpath.py
1069
1070
1071
1072
1073
1074
def rmtree(self) -> None:
    """Delete an entire directory tree.

    Raises:
        CloudPathNotADirectoryError: if this path is a file.
    """
    path_is_file = self.is_file()
    if path_is_file:
        raise CloudPathNotADirectoryError(
            f"Path {self} is a file; call unlink instead of rmtree."
        )

    self.client._remove(self)
|
samefile(other_path: Union[str, os.PathLike]) -> bool
Source code in cloudpathlib/cloudpath.py
def samefile(self, other_path: Union[str, os.PathLike]) -> bool:
    """Return the result of comparing this path with ``other_path`` using ``==``."""
    # all cloud paths are absolute and the paths are used for hash
    is_same = self == other_path
    return is_same
|
stat()
Source code in cloudpathlib/gs/gspath.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def stat(self):
    """Return an ``os.stat_result`` built from the client's metadata for this path.

    Only the device slot (the cloud prefix), size and mtime are filled in;
    all other slots are ``None``. The mtime is 0 when the metadata has no
    ``"updated"`` key.

    Raises:
        NoStatError: if the client returns ``None`` as metadata.
    """
    meta = self.client._get_metadata(self)
    if meta is None:
        raise NoStatError(
            f"No stats available for {self}; it may be a directory or not exist."
        )

    try:
        updated = meta["updated"]
    except KeyError:
        mtime = 0
    else:
        mtime = updated.timestamp()

    # Slots are positional: mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime
    fields = (
        None,
        None,
        self.cloud_prefix,
        None,
        None,
        None,
        meta.get("size", 0),
        None,
        mtime,
        None,
    )
    return os.stat_result(fields)
|
touch(exist_ok: bool = True)
Source code in cloudpathlib/gs/gspath.py
39
40
41
42
43
44
45
46
47
48
49
50
def touch(self, exist_ok: bool = True):
    """Create an empty object at this path, or move the existing one onto itself.

    When the path does not exist, an empty local file is created in a
    temporary directory and uploaded through the client.

    Raises:
        FileExistsError: if the path exists and ``exist_ok`` is False.
    """
    if self.exists():
        if not exist_ok:
            raise FileExistsError(f"File exists: {self}")
        self.client._move_file(self, self)
    else:
        # The context manager removes the temporary directory even when the
        # upload raises, instead of relying on a trailing cleanup() call that
        # is skipped on error.
        with TemporaryDirectory() as tmp_dir:
            p = Path(tmp_dir) / "empty"
            p.touch()
            self.client._upload_file(p, self)
|
unlink(missing_ok: bool = True) -> None
Source code in cloudpathlib/cloudpath.py
797
798
799
800
801
802
def unlink(self, missing_ok: bool = True) -> None:
    """Remove this path through the client, forwarding ``missing_ok``.

    Raises:
        CloudPathIsADirectoryError: if this path is a directory.
    """
    # Note: missing_ok defaults to False in pathlib, but changing the default now would be a breaking change.
    path_is_dir = self.is_dir()
    if path_is_dir:
        raise CloudPathIsADirectoryError(
            f"Path {self} is a directory; call rmdir instead of unlink."
        )

    self.client._remove(self, missing_ok)
|
upload_from(source: Union[str, os.PathLike], force_overwrite_to_cloud: Optional[bool] = None) -> Self
Upload a file or directory to the cloud path.
Source code in cloudpathlib/cloudpath.py
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
def upload_from(
    self,
    source: Union[str, os.PathLike],
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Self:
    """Upload a file or directory to the cloud path.

    A local directory is uploaded entry by entry under this path and this
    path is returned. A single file goes to ``self / source.name`` when this
    path is an existing directory, otherwise to this path itself; the path
    actually written is returned.
    """
    source = Path(source)

    if not source.is_dir():
        target = self / source.name if self.exists() and self.is_dir() else self
        target._upload_file_to_cloud(source, force_overwrite_to_cloud=force_overwrite_to_cloud)
        return target

    for entry in source.iterdir():
        (self / entry.name).upload_from(entry, force_overwrite_to_cloud=force_overwrite_to_cloud)

    return self
|
validate(v: str) -> Self
classmethod
Used as a Pydantic validator. See
https://docs.pydantic.dev/2.0/usage/types/custom/
Source code in cloudpathlib/cloudpath.py
@classmethod
def validate(cls, v: str) -> Self:
    """Used as a Pydantic validator. See
    https://docs.pydantic.dev/2.0/usage/types/custom/"""
    validated = cls(v)
    return validated
|
walk(top_down: bool = True, on_error: Optional[Callable] = None, follow_symlinks: bool = False) -> Generator[Tuple[Self, List[str], List[str]], None, None]
Source code in cloudpathlib/cloudpath.py
566
567
568
569
570
571
572
573
574
575
576
577
578
579
def walk(
    self,
    top_down: bool = True,
    on_error: Optional[Callable] = None,
    follow_symlinks: bool = False,
) -> Generator[Tuple[Self, List[str], List[str]], None, None]:
    """Yield the results of ``_walk_results_from_tree`` over a recursive subtree.

    Any ``Exception`` raised while building or walking the tree is handed to
    ``on_error`` when one is given, and re-raised otherwise.
    ``follow_symlinks`` is not used.
    """
    try:
        # walking is always recursive
        subtree = self._build_subtree(recursive=True)
        yield from self._walk_results_from_tree(self, subtree, top_down=top_down)
    except Exception as exc:
        if on_error is None:
            raise
        on_error(exc)
|
with_name(name: str) -> Self
Source code in cloudpathlib/cloudpath.py
def with_name(self, name: str) -> Self:
    """Forward ``with_name`` and ``name`` to ``_dispatch_to_path``."""
    renamed = self._dispatch_to_path("with_name", name)
    return renamed
|
with_segments(*pathsegments) -> Self
Create a new CloudPath with the same client out of the given segments.
The first segment will be interpreted as the bucket/container name.
Source code in cloudpathlib/cloudpath.py
def with_segments(self, *pathsegments) -> Self:
    """Create a new CloudPath with the same client out of the given segments.
    The first segment will be interpreted as the bucket/container name.
    """
    joined = "/".join(pathsegments)
    return self._new_cloudpath(joined)
|
with_stem(stem: str) -> Self
Source code in cloudpathlib/cloudpath.py
def with_stem(self, stem: str) -> Self:
    """Forward ``with_stem`` to ``_dispatch_to_path``, falling back to ``with_name``.

    On ``AttributeError`` the result is ``with_name(stem + self.suffix)``.
    """
    try:
        result = self._dispatch_to_path("with_stem", stem)
    except AttributeError:
        # with_stem was only added in python 3.9, so we fallback for compatibility
        result = self.with_name(stem + self.suffix)
    return result
|
with_suffix(suffix: str) -> Self
Source code in cloudpathlib/cloudpath.py
def with_suffix(self, suffix: str) -> Self:
    """Forward ``with_suffix`` and ``suffix`` to ``_dispatch_to_path``."""
    changed = self._dispatch_to_path("with_suffix", suffix)
    return changed
|
write_bytes(data: bytes) -> int
Open the file in bytes mode, write to it, and close the file.
NOTE: vendored from pathlib since we override open
https://github.com/python/cpython/blob/3.8/Lib/pathlib.py#L1235-L1242
Source code in cloudpathlib/cloudpath.py
805
806
807
808
809
810
811
812
813
def write_bytes(self, data: bytes) -> int:
    """Open the file in bytes mode, write to it, and close the file.

    Returns whatever the file object's ``write`` returns.

    NOTE: vendored from pathlib since we override open
    https://github.com/python/cpython/blob/3.8/Lib/pathlib.py#L1235-L1242
    """
    # type-check for the buffer interface before truncating the file
    view = memoryview(data)
    with self.open(mode="wb") as handle:
        written = handle.write(view)
    return written
|
write_text(data: str, encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None) -> int
Open the file in text mode, write to it, and close the file.
NOTE: vendored from pathlib since we override open
https://github.com/python/cpython/blob/3.10/Lib/pathlib.py#L1146-L1155
Source code in cloudpathlib/cloudpath.py
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
def write_text(
    self,
    data: str,
    encoding: Optional[str] = None,
    errors: Optional[str] = None,
    newline: Optional[str] = None,
) -> int:
    """Open the file in text mode, write to it, and close the file.

    Returns whatever the file object's ``write`` returns.

    Raises:
        TypeError: if ``data`` is not a ``str``; checked before the file is opened.

    NOTE: vendored from pathlib since we override open
    https://github.com/python/cpython/blob/3.10/Lib/pathlib.py#L1146-L1155
    """
    if not isinstance(data, str):
        raise TypeError("data must be str, not %s" % data.__class__.__name__)

    with self.open(mode="w", encoding=encoding, errors=errors, newline=newline) as handle:
        written = handle.write(data)
    return written
|