Source code for simple_aws_rds.rds

# -*- coding: utf-8 -*-

"""
Abstract dataclass for RDS DB instance, cluster.
"""

import typing as T
import sys
import enum
import time
import dataclasses
from datetime import datetime

from iterproxy import IterProxy
from func_args import resolve_kwargs, NOTHING

from .vendor.waiter import Waiter
from .exc import StatusError

if T.TYPE_CHECKING:  # pragma: no cover
    from mypy_boto3_rds.client import RDSClient
    from mypy_boto3_rds.type_defs import (
        StartDBInstanceResultTypeDef,
        StopDBInstanceResultTypeDef,
        DeleteDBInstanceResultTypeDef,
        DeleteDBSnapshotResultTypeDef,
    )


[docs]class RDSDBInstanceStatusEnum(str, enum.Enum): """ RDS DB instance status enum. The official document doesn't mention the exact string. But I found it is all lower case slugified. Reference: - https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/accessing-monitoring.html#Overview.DBInstance.Status """ # fmt: off available = "available" backing_up = "backing-up" configuring_enhanced_monitoring = "configuring-enhanced-monitoring" configuring_iam_database_auth = "configuring-iam-database-auth" configuring_log_exports = "configuring-log-exports" converting_to_vpc = "converting-to-vpc" creating = "creating" delete_precheck = "delete-precheck" deleting = "deleting" failed = "failed" inaccessible_encryption_credentials = "inaccessible-encryption-credentials" inaccessible_encryption_credentials_recoverable ="inaccessible-encryption-credentials-recoverable" incompatible_network = "incompatible-network" incompatible_option_group = "incompatible-option-group" incompatible_parameters = "incompatible-parameters" incompatible_restore = "incompatible-restore" insufficient_capacity = "insufficient-capacity" maintenance = "maintenance" modifying = "modifying" moving_to_vpc = "moving-to-vpc" rebooting = "rebooting" resetting_master_credentials = "resetting-master-credentials" renaming = "renaming" restore_error = "restore-error" starting = "starting" stopped = "stopped" stopping = "stopping" storage_full = "storage-full" storage_optimization = "storage-optimization" upgrading = "upgrading"
# fmt: on T_STATUS_ENUM_SET = T.Set[RDSDBInstanceStatusEnum]
[docs]class RDSDBInstanceStatusGroupEnum: """ Group RDS DB instance status enum by semantic. :param ended: The status that the instance can be considered as "stopped", "won't change". :param in_transition: The status that the instance can be considered as "changing". :param impossible_to_become_available: as the name :param impossible_to_become_stopped: as the name """ ended: T_STATUS_ENUM_SET = { RDSDBInstanceStatusEnum.available, RDSDBInstanceStatusEnum.failed, RDSDBInstanceStatusEnum.stopped, RDSDBInstanceStatusEnum.inaccessible_encryption_credentials, RDSDBInstanceStatusEnum.inaccessible_encryption_credentials_recoverable, RDSDBInstanceStatusEnum.incompatible_network, RDSDBInstanceStatusEnum.incompatible_option_group, RDSDBInstanceStatusEnum.incompatible_parameters, RDSDBInstanceStatusEnum.incompatible_restore, RDSDBInstanceStatusEnum.insufficient_capacity, RDSDBInstanceStatusEnum.restore_error, } in_transition: T_STATUS_ENUM_SET = { RDSDBInstanceStatusEnum.backing_up, RDSDBInstanceStatusEnum.configuring_enhanced_monitoring, RDSDBInstanceStatusEnum.configuring_iam_database_auth, RDSDBInstanceStatusEnum.configuring_log_exports, RDSDBInstanceStatusEnum.converting_to_vpc, RDSDBInstanceStatusEnum.creating, RDSDBInstanceStatusEnum.deleting, RDSDBInstanceStatusEnum.maintenance, RDSDBInstanceStatusEnum.modifying, RDSDBInstanceStatusEnum.moving_to_vpc, RDSDBInstanceStatusEnum.rebooting, RDSDBInstanceStatusEnum.resetting_master_credentials, RDSDBInstanceStatusEnum.renaming, RDSDBInstanceStatusEnum.starting, RDSDBInstanceStatusEnum.stopping, RDSDBInstanceStatusEnum.storage_optimization, RDSDBInstanceStatusEnum.upgrading, } impossible_to_become_available: T_STATUS_ENUM_SET = ( { RDSDBInstanceStatusEnum.deleting, RDSDBInstanceStatusEnum.stopping, } .union(ended) .difference( { RDSDBInstanceStatusEnum.available, } ) ) impossible_to_become_stopped: T_STATUS_ENUM_SET = in_transition.union( ended ).difference( { RDSDBInstanceStatusEnum.stopping, RDSDBInstanceStatusEnum.stopped, } )
[docs]@dataclasses.dataclass class RDSDBInstance: """ Represent an RDS DB instance. See attributes explanation at: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/docdb/client/describe_db_instances.html """ # fmt: off id: str = dataclasses.field() status: T.Optional[str] = dataclasses.field(default=None) instance_class: T.Optional[str] = dataclasses.field(default=None) instance_create_time: T.Optional[datetime] = dataclasses.field(default=None) engine: T.Optional[str] = dataclasses.field(default=None) engine_version: T.Optional[str] = dataclasses.field(default=None) endpoint: T.Optional[str] = dataclasses.field(default=None) port: T.Optional[int] = dataclasses.field(default=None) hosted_zone_id: T.Optional[str] = dataclasses.field(default=None) master_username: T.Optional[str] = dataclasses.field(default=None) allocated_storage: T.Optional[str] = dataclasses.field(default=None) preferred_backup_window: T.Optional[str] = dataclasses.field(default=None) backup_retention_period: T.Optional[int] = dataclasses.field(default=None) preferred_maintenance_window: T.Optional[str] = dataclasses.field(default=None) latest_restorable_time: T.Optional[datetime] = dataclasses.field(default=None) multi_az: T.Optional[bool] = dataclasses.field(default=None) auto_minor_version_upgrade: T.Optional[bool] = dataclasses.field(default=None) replica_mode: T.Optional[str] = dataclasses.field(default=None) license_model: T.Optional[str] = dataclasses.field(default=None) iops: T.Optional[int] = dataclasses.field(default=None) character_set_name: T.Optional[str] = dataclasses.field(default=None) secondary_availability_zone: T.Optional[str] = dataclasses.field(default=None) publicly_accessible: T.Optional[bool] = dataclasses.field(default=None) storage_type: T.Optional[str] = dataclasses.field(default=None) tde_credential_arn: T.Optional[str] = dataclasses.field(default=None) db_instance_port: T.Optional[int] = dataclasses.field(default=None) db_cluster_identifier: T.Optional[str] = dataclasses.field(default=None) storage_encrypted: T.Optional[bool] = dataclasses.field(default=None) kms_key_id: T.Optional[str] = dataclasses.field(default=None) dbi_resource_id: T.Optional[str] = dataclasses.field(default=None) ca_certificate_identifier: T.Optional[str] = dataclasses.field(default=None) copy_tags_to_snapshot: T.Optional[bool] = dataclasses.field(default=None) monitoring_interval: T.Optional[int] = dataclasses.field(default=None) enhanced_monitoring_resource_arn: T.Optional[str] = dataclasses.field(default=None) monitoring_role_arn: T.Optional[str] = dataclasses.field(default=None) promotion_tier: T.Optional[int] = dataclasses.field(default=None) db_instance_arn: T.Optional[str] = dataclasses.field(default=None) timezone: T.Optional[str] = dataclasses.field(default=None) iam_database_authentication_enabled: T.Optional[bool] = dataclasses.field(default=None) performance_insights_enabled: T.Optional[bool] = dataclasses.field(default=None) performance_insights_kms_key_id: T.Optional[str] = dataclasses.field(default=None) performance_insights_retention_period: T.Optional[int] = dataclasses.field(default=None) deletion_protection: T.Optional[bool] = dataclasses.field(default=None) max_allocated_storage: T.Optional[int] = dataclasses.field(default=None) customer_owned_ip_enabled: T.Optional[bool] = dataclasses.field(default=None) aws_backup_recovery_point_arn: T.Optional[str] = dataclasses.field(default=None) activity_stream_status: T.Optional[str] = dataclasses.field(default=None) activity_stream_kms_key_id: T.Optional[str] = dataclasses.field(default=None) activity_stream_kinesis_stream_name: T.Optional[str] = dataclasses.field(default=None) activity_stream_mode: T.Optional[str] = dataclasses.field(default=None) activity_stream_engine_native_audit_fields_included: T.Optional[bool] = dataclasses.field(default=None) automation_mode: T.Optional[str] = dataclasses.field(default=None) resume_full_automation_mode_time: T.Optional[datetime] = dataclasses.field(default=None) custom_iam_instance_profile: T.Optional[str] = dataclasses.field(default=None) backup_target: T.Optional[str] = dataclasses.field(default=None) network_type: T.Optional[str] = dataclasses.field(default=None) activity_stream_policy_status: T.Optional[str] = dataclasses.field(default=None) storage_throughput: T.Optional[int] = dataclasses.field(default=None) db_system_id: T.Optional[str] = dataclasses.field(default=None) read_replica_source_db_cluster_identifier: T.Optional[str] = dataclasses.field(default=None) vpc_id: T.Optional[str] = dataclasses.field(default=None) subnet_ids: T.List[str] = dataclasses.field(default_factory=list) subnet_group_name: T.Optional[str] = dataclasses.field(default=None) subnet_group_description: T.Optional[str] = dataclasses.field(default=None) subnet_group_arn: T.Optional[str] = dataclasses.field(default=None) subnet_group_status: T.Optional[str] = dataclasses.field(default=None) security_groups: T.List[T.Dict[str, str]] = dataclasses.field(default_factory=list) availability_zone: T.Optional[str] = dataclasses.field(default=None) tags: T.Dict[str, str] = dataclasses.field(default_factory=dict) data: T.Dict[str, T.Any] = dataclasses.field(default_factory=dict) # fmt: on
[docs] @classmethod def from_dict(cls, dct: dict): """ Create an RDS DB instance object from the ``describe_db_instances`` API response. """ # fmt: off return cls( id=dct["DBInstanceIdentifier"], status=dct.get("DBInstanceStatus"), instance_class=dct.get("DBInstanceClass"), instance_create_time=dct.get("InstanceCreateTime"), engine=dct.get("Engine"), engine_version=dct.get("EngineVersion"), endpoint=dct.get("Endpoint", {}).get("Address"), port=dct.get("Endpoint", {}).get("Port"), hosted_zone_id=dct.get("Endpoint", {}).get("HostedZoneId"), master_username=dct.get("MasterUsername"), allocated_storage=dct.get("AllocatedStorage"), preferred_backup_window=dct.get("PreferredBackupWindow"), backup_retention_period=dct.get("BackupRetentionPeriod"), preferred_maintenance_window=dct.get("PreferredMaintenanceWindow"), latest_restorable_time=dct.get("LatestRestorableTime"), multi_az=dct.get("MultiAZ"), auto_minor_version_upgrade=dct.get("AutoMinorVersionUpgrade"), replica_mode=dct.get("ReplicaMode"), license_model=dct.get("LicenseModel"), iops=dct.get("Iops"), character_set_name=dct.get("CharacterSetName"), secondary_availability_zone=dct.get("SecondaryAvailabilityZone"), publicly_accessible=dct.get("PubliclyAccessible"), storage_type=dct.get("StorageType"), tde_credential_arn=dct.get("TdeCredentialArn"), db_instance_port=dct.get("DbInstancePort"), db_cluster_identifier=dct.get("DBClusterIdentifier"), storage_encrypted=dct.get("StorageEncrypted"), kms_key_id=dct.get("KmsKeyId"), dbi_resource_id=dct.get("DbiResourceId"), ca_certificate_identifier=dct.get("CACertificateIdentifier"), copy_tags_to_snapshot=dct.get("CopyTagsToSnapshot"), monitoring_interval=dct.get("MonitoringInterval"), enhanced_monitoring_resource_arn=dct.get("EnhancedMonitoringResourceArn"), monitoring_role_arn=dct.get("MonitoringRoleArn"), promotion_tier=dct.get("PromotionTier"), db_instance_arn=dct.get("DBInstanceArn"), timezone=dct.get("Timezone"), iam_database_authentication_enabled=dct.get("IAMDatabaseAuthenticationEnabled"), performance_insights_enabled=dct.get("PerformanceInsightsEnabled"), performance_insights_kms_key_id=dct.get("PerformanceInsightsKMSKeyId"), performance_insights_retention_period=dct.get("PerformanceInsightsRetentionPeriod"), deletion_protection=dct.get("DeletionProtection"), max_allocated_storage=dct.get("MaxAllocatedStorage"), customer_owned_ip_enabled=dct.get("CustomerOwnedIpEnabled"), aws_backup_recovery_point_arn=dct.get("AwsBackupRecoveryPointArn"), activity_stream_status=dct.get("ActivityStreamStatus"), activity_stream_kms_key_id=dct.get("ActivityStreamKmsKeyId"), activity_stream_kinesis_stream_name=dct.get("ActivityStreamKinesisStreamName"), activity_stream_mode=dct.get("ActivityStreamMode"), activity_stream_engine_native_audit_fields_included=dct.get("ActivityStreamEngineNativeAuditFieldsIncluded"), automation_mode=dct.get("AutomationMode"), resume_full_automation_mode_time=dct.get("ResumeFullAutomationModeTime"), custom_iam_instance_profile=dct.get("CustomIamInstanceProfile"), backup_target=dct.get("BackupTarget"), network_type=dct.get("NetworkType"), activity_stream_policy_status=dct.get("ActivityStreamPolicyStatus"), storage_throughput=dct.get("StorageThroughput"), db_system_id=dct.get("DBSystemId"), read_replica_source_db_cluster_identifier=dct.get("ReadReplicaSourceDBClusterIdentifier"), vpc_id=dct.get("DBSubnetGroup", {}).get("VpcId"), subnet_ids=[ kv["SubnetIdentifier"] for kv in dct.get("DBSubnetGroup", {}).get("Subnets", []) ], subnet_group_name=dct.get("DBSubnetGroup", {}).get("DBSubnetGroupName"), subnet_group_description=dct.get("DBSubnetGroup", {}).get("DBSubnetGroupDescription"), subnet_group_arn=dct.get("DBSubnetGroup", {}).get("DBSubnetGroupArn"), subnet_group_status=dct.get("DBSubnetGroup", {}).get("SubnetGroupStatus"), security_groups=dct.get("DBSecurityGroups", []), availability_zone=dct.get("AvailabilityZone"), tags={d["Key"]: d["Value"] for d in dct.get("TagList", [])}, data=dct, )
# fmt: on # status checking methods human intuitive status check def is_available(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.available.value def is_backing_up(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.backing_up.value def is_configuring_enhanced_monitoring(self) -> bool: # pragma: no cover """ """ return ( self.status == RDSDBInstanceStatusEnum.configuring_enhanced_monitoring.value ) def is_configuring_iam_database_auth(self) -> bool: # pragma: no cover """ """ return ( self.status == RDSDBInstanceStatusEnum.configuring_iam_database_auth.value ) def is_configuring_log_exports(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.configuring_log_exports.value def is_converting_to_vpc(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.converting_to_vpc.value def is_creating(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.creating.value def is_delete_precheck(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.delete_precheck.value def is_deleting(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.deleting.value def is_failed(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.failed.value def is_inaccessible_encryption_credentials(self) -> bool: # pragma: no cover """ """ return ( self.status == RDSDBInstanceStatusEnum.inaccessible_encryption_credentials.value ) def is_inaccessible_encryption_credentials_recoverable( self, ) -> bool: # pragma: no cover """ """ return ( self.status == RDSDBInstanceStatusEnum.inaccessible_encryption_credentials_recoverable.value ) def is_incompatible_network(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.incompatible_network.value def is_incompatible_option_group(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.incompatible_option_group.value def is_incompatible_parameters(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.incompatible_parameters.value def is_incompatible_restore(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.incompatible_restore.value def is_insufficient_capacity(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.insufficient_capacity.value def is_maintenance(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.maintenance.value def is_modifying(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.modifying.value def is_moving_to_vpc(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.moving_to_vpc.value def is_rebooting(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.rebooting.value def is_resetting_master_credentials(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.resetting_master_credentials.value def is_renaming(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.renaming.value def is_restore_error(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.restore_error.value def is_starting(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.starting.value def is_stopped(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.stopped.value def is_stopping(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.stopping.value def is_storage_full(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.storage_full.value def is_storage_optimization(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.storage_optimization.value def is_upgrading(self) -> bool: # pragma: no cover """ """ return self.status == RDSDBInstanceStatusEnum.upgrading.value # more human intuitive status check def is_ready_to_start(self) -> bool: """ """ return self.is_stopped() def is_ready_to_stop(self) -> bool: """ """ return self.is_available() def is_end(self) -> bool: """ """ return self.status in { status.value for status in RDSDBInstanceStatusGroupEnum.ended } def is_in_transition(self) -> bool: """ """ return self.status in { status.value for status in RDSDBInstanceStatusGroupEnum.in_transition }
[docs] def start_db_instance( self, rds_client: "RDSClient" ) -> "StartDBInstanceResultTypeDef": """ Start the RDS DB instance """ return rds_client.start_db_instance(DBInstanceIdentifier=self.id)
[docs] def stop_db_instance( self, rds_client: "RDSClient" ) -> "StopDBInstanceResultTypeDef": """ Stop the RDS DB instance """ return rds_client.stop_db_instance(DBInstanceIdentifier=self.id)
[docs] def delete_db_instance( self, rds_client: "RDSClient", skip_final_snapshot: bool = NOTHING, final_db_snapshot_identifier: str = NOTHING, delete_automated_backups: bool = NOTHING, ) -> "DeleteDBInstanceResultTypeDef": """ Delete the RDS DB instance """ return rds_client.delete_db_instance( **resolve_kwargs( DBInstanceIdentifier=self.id, SkipFinalSnapshot=skip_final_snapshot, FinalDBSnapshotIdentifier=final_db_snapshot_identifier, DeleteAutomatedBackups=delete_automated_backups, ) )
[docs] def wait_for_status( self, rds_client: "RDSClient", stop_status: T.Union[RDSDBInstanceStatusEnum, T.List[RDSDBInstanceStatusEnum]], gap: T.Union[int, float] = 1, delays: T.Union[int, float] = 10, timeout: T.Union[int, float] = 300, instant: bool = True, error_status: T.Optional[ T.Union[RDSDBInstanceStatusEnum, T.List[RDSDBInstanceStatusEnum]] ] = None, indent: int = 0, verbose: bool = True, ) -> "RDSDBInstance": # pragma: no cover """ wait until the DB instance reaches the specified status defined in ``stop_status``. If reaches any of ``error_status ``, raise error. :param rds_client: :param stop_status: status to stop waiting :param gap: the time to wait before making first status check :param delays: delay between each check :param timeout: timeout in seconds :param instant: if True, then the first check is instant :param error_status: status to raise error :param indent: indent level for logging :param verbose: whether to print log :return: the :class:`RDSDBInstance` representing the latest status of DB instance. """ if isinstance(stop_status, RDSDBInstanceStatusEnum): stop_status_set = {stop_status.value} else: stop_status_set = {status.value for status in stop_status} if error_status is None: error_status_set = set() elif isinstance(error_status, RDSDBInstanceStatusEnum): error_status_set = {error_status.value} else: error_status_set = {status.value for status in error_status} if gap: time.sleep(gap) for attempt, elapse in Waiter( delays=delays, timeout=timeout, instant=instant, indent=indent, verbose=verbose, ): db_inst = self.from_id(rds_client, self.id) if db_inst.status in stop_status_set: if verbose: sys.stdout.write("\n") return db_inst elif db_inst.status in error_status_set: raise StatusError(f"stop because status reaches {db_inst.status!r}") else: pass
[docs] def wait_for_available( self, rds_client: "RDSClient", gap: T.Union[int, float] = 1, delays: T.Union[int, float] = 10, timeout: T.Union[int, float] = 300, instant: bool = True, indent: int = 0, verbose: bool = True, ) -> "RDSDBInstance": # pragma: no cover """ Similar to :meth:`RDSDBInstance.wait_for_status`, but wait for DB instance to reach "available" status. """ return self.wait_for_status( rds_client=rds_client, stop_status=RDSDBInstanceStatusEnum.available, gap=gap, delays=delays, timeout=timeout, instant=instant, error_status=list( RDSDBInstanceStatusGroupEnum.impossible_to_become_available ), indent=indent, verbose=verbose, )
[docs] def wait_for_stopped( self, rds_client: "RDSClient", gap: T.Union[int, float] = 1, delays: T.Union[int, float] = 10, timeout: T.Union[int, float] = 300, instant: bool = True, indent: int = 0, verbose: bool = True, ) -> "RDSDBInstance": # pragma: no cover """ Similar to :meth:`RDSDBInstance.wait_for_status`, but wait for DB instance to reach "stopped" status. """ return self.wait_for_status( rds_client=rds_client, stop_status=RDSDBInstanceStatusEnum.stopped, gap=gap, delays=delays, timeout=timeout, instant=instant, error_status=list( RDSDBInstanceStatusGroupEnum.impossible_to_become_stopped ), indent=indent, verbose=verbose, )
# -------------------------------------------------------------------------- # more constructor methods # -------------------------------------------------------------------------- @classmethod def _yield_dict_from_describe_db_instances_response( cls, res: dict, ) -> T.Iterable["RDSDBInstance"]: db_instances = res.get("DBInstances", []) if len(db_instances): for db_instance_dict in db_instances: yield cls.from_dict(db_instance_dict)
[docs] @classmethod def query( cls, rds_client: "RDSClient", db_instance_identifier: str = NOTHING, filters: T.List[dict] = NOTHING, ) -> "RDSDBInstanceIterProxy": """ A wrapper around ``rds_client.describe_db_instances``. Multiple filters join with logic "AND", multiple values in a filter join with logic "OR". """ def run(): paginator = rds_client.get_paginator("describe_db_instances") kwargs = resolve_kwargs( DBInstanceIdentifier=db_instance_identifier, Filters=filters, PaginationConfig={ "MaxItems": 9999, "PageSize": 100, }, ) if db_instance_identifier is not NOTHING: del kwargs["PaginationConfig"] response_iterator = paginator.paginate(**kwargs) for response in response_iterator: yield from cls._yield_dict_from_describe_db_instances_response(response) return RDSDBInstanceIterProxy(run())
[docs] @classmethod def from_id( cls, rds_client: "RDSClient", db_identifier: str, ) -> T.Optional["RDSDBInstance"]: """ Get RDS DB instance details by it's id. """ return cls.query( rds_client, db_instance_identifier=db_identifier, ).one_or_none()
[docs] @classmethod def from_tag_key_value( cls, rds_client: "RDSClient", key: str, value: str, ) -> "RDSDBInstanceIterProxy": """ Query RDS DB Instance by tag key and value. This function only support single key, value pair filtering, if you want more advanced filtering, you can use the ``query`` method and do in-memory filtering. :param key: tag key :param value: tag value """ def run(): for db_inst in cls.query(rds_client): if db_inst.tags.get(key, "THIS_IS_IMPOSSIBLE_TO_MATCH") == value: yield db_inst return RDSDBInstanceIterProxy(run())
[docs]class RDSDBInstanceIterProxy(IterProxy[RDSDBInstance]): """ IterProxy for :class:`RDSDBInstance`. """ pass
# TODO: implement RDS DB Cluster # @dataclasses.dataclass # class RDSDBCluster: # """ # DB Cluster is only for Aurora DB. # """ # pass # class RDSDBClusterIterProxy(IterProxy[RDSDBCluster]): # """ # IterProxy for RDSDBCluster. # """ # # pass
[docs]class RDSDBSnapshotStatusEnum(str, enum.Enum): """ RDS DB snapshot status enum. The official document doesn't mention the exact string. But I found it is all lower case slugified. Reference: - https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_CreateSnapshot.html """ creating = "creating" available = "available"
[docs]@dataclasses.dataclass class RDSDBSnapshot: """ Represent an RDS DB instance. See attributes explanation at: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds/client/describe_db_snapshots.html """ # fmt: off db_snapshot_identifier: str = dataclasses.field() db_instance_identifier: T.Optional[str] = dataclasses.field(default=None) snapshot_create_time: T.Optional[datetime] = dataclasses.field(default=None) engine: T.Optional[str] = dataclasses.field(default=None) allocated_storage: T.Optional[int] = dataclasses.field(default=None) status: T.Optional[str] = dataclasses.field(default=None) port: T.Optional[int] = dataclasses.field(default=None) availability_zone: T.Optional[str] = dataclasses.field(default=None) vpc_id: T.Optional[str] = dataclasses.field(default=None) instance_create_time: T.Optional[datetime] = dataclasses.field(default=None) master_username: T.Optional[str] = dataclasses.field(default=None) engine_version: T.Optional[str] = dataclasses.field(default=None) license_model: T.Optional[str] = dataclasses.field(default=None) snapshot_type: T.Optional[str] = dataclasses.field(default=None) iops: T.Optional[str] = dataclasses.field(default=None) option_group_name: T.Optional[str] = dataclasses.field(default=None) percent_progress: T.Optional[int] = dataclasses.field(default=None) source_region: T.Optional[str] = dataclasses.field(default=None) source_db_snapshot_identifier: T.Optional[str] = dataclasses.field(default=None) storage_type: T.Optional[str] = dataclasses.field(default=None) tde_credential_arn: T.Optional[str] = dataclasses.field(default=None) encrypted: T.Optional[bool] = dataclasses.field(default=None) kms_key_id: T.Optional[str] = dataclasses.field(default=None) db_snapshot_arn: T.Optional[str] = dataclasses.field(default=None) timezone: T.Optional[str] = dataclasses.field(default=None) iam_database_authentication_enabled: T.Optional[bool] = dataclasses.field(default=None) processor_features: T.List[T.Dict[str, str]] = dataclasses.field(default_factory=list) dbi_resource_id: T.Optional[str] = dataclasses.field(default=None) tags: T.Dict[str, str] = dataclasses.field(default_factory=dict) original_snapshot_create_time: T.Optional[datetime] = dataclasses.field(default=None) snapshot_database_time: T.Optional[datetime] = dataclasses.field(default=None) snapshot_target: T.Optional[str] = dataclasses.field(default=None) storage_throughput: T.Optional[str] = dataclasses.field(default=None) db_system_id: T.Optional[str] = dataclasses.field(default=None) dedicated_log_volume: T.Optional[str] = dataclasses.field(default=None) multi_tenant: T.Optional[str] = dataclasses.field(default=None) data: T.Dict[str, T.Any] = dataclasses.field(default_factory=dict) # fmt: on @classmethod def from_dict(cls, dct: dict): # fmt: off return cls( db_snapshot_identifier=dct.get("DBSnapshotIdentifier"), db_instance_identifier=dct.get("DBInstanceIdentifier"), snapshot_create_time=dct.get("SnapshotCreateTime"), engine=dct.get("Engine"), allocated_storage=dct.get("AllocatedStorage"), status=dct.get("Status"), port=dct.get("Port"), availability_zone=dct.get("AvailabilityZone"), vpc_id=dct.get("VpcId"), instance_create_time=dct.get("InstanceCreateTime"), master_username=dct.get("MasterUsername"), engine_version=dct.get("EngineVersion"), license_model=dct.get("LicenseModel"), snapshot_type=dct.get("SnapshotType"), iops=dct.get("Iops"), option_group_name=dct.get("OptionGroupName"), percent_progress=dct.get("PercentProgress"), source_region=dct.get("SourceRegion"), source_db_snapshot_identifier=dct.get("SourceDBSnapshotIdentifier"), storage_type=dct.get("StorageType"), tde_credential_arn=dct.get("TdeCredentialArn"), encrypted=dct.get("Encrypted"), kms_key_id=dct.get("KmsKeyId"), db_snapshot_arn=dct.get("DBSnapshotArn"), timezone=dct.get("Timezone"), iam_database_authentication_enabled=dct.get("IAMDatabaseAuthenticationEnabled"), processor_features=dct.get("ProcessorFeatures", []), dbi_resource_id=dct.get("DbiResourceId"), tags={d["Key"]: d["Value"] for d in dct.get("TagList", [])}, original_snapshot_create_time=dct.get("OriginalSnapshotCreateTime"), snapshot_database_time=dct.get("SnapshotDatabaseTime"), snapshot_target=dct.get("SnapshotTarget"), storage_throughput=dct.get("StorageThroughput"), db_system_id=dct.get("DBSystemId"), dedicated_log_volume=dct.get("DedicatedLogVolume"), multi_tenant=dct.get("MultiTenant"), data=dct.get(""), ) # fmt: on
[docs] def is_creating(self) -> bool: # pragma: no cover """ Check if the snapshot is in creating status. """ return self.status == RDSDBSnapshotStatusEnum.creating.value
[docs] def is_available(self) -> bool: # pragma: no cover """ Check if the snapshot is in available status. """ return self.status == RDSDBSnapshotStatusEnum.available.value
[docs] def wait_for_status( self, rds_client: "RDSClient", stop_status: T.Union[RDSDBSnapshotStatusEnum, T.List[RDSDBSnapshotStatusEnum]], gap: T.Union[int, float] = 1, delays: T.Union[int, float] = 10, timeout: T.Union[int, float] = 300, instant: bool = True, error_status: T.Optional[ T.Union[RDSDBSnapshotStatusEnum, T.List[RDSDBSnapshotStatusEnum]] ] = None, indent: int = 0, verbose: bool = True, ) -> "RDSDBSnapshot": # pragma: no cover """ wait until the DB snapshot reaches the specified status defined in ``stop_status``. If reaches any of ``error_status ``, raise error. :param rds_client: :param stop_status: status to stop waiting :param gap: the time to wait before making first status check :param delays: delay between each check :param timeout: timeout in seconds :param instant: if True, then the first check is instant :param error_status: status to raise error :param indent: indent level for logging :param verbose: whether to print log :return: the :class:`RDSDBInstance` representing the latest status of DB instance. """ if isinstance(stop_status, RDSDBSnapshotStatusEnum): stop_status_set = {stop_status.value} else: stop_status_set = {status.value for status in stop_status} if error_status is None: error_status_set = set() elif isinstance(error_status, RDSDBSnapshotStatusEnum): error_status_set = {error_status.value} else: error_status_set = {status.value for status in error_status} if gap: time.sleep(gap) for attempt, elapse in Waiter( delays=delays, timeout=timeout, instant=instant, indent=indent, verbose=verbose, ): db_snapshot = self.from_id(rds_client, self.db_snapshot_identifier) if db_snapshot.status in stop_status_set: if verbose: sys.stdout.write("\n") return db_snapshot elif db_snapshot.status in error_status_set: raise StatusError(f"stop because status reaches {db_snapshot.status!r}") else: pass
[docs] def wait_for_available( self, rds_client, gap: T.Union[int, float] = 1, delays: T.Union[int, float] = 10, timeout: T.Union[int, float] = 300, instant: bool = True, indent: int = 0, verbose: bool = True, ) -> "RDSDBSnapshot": # pragma: no cover """ Similar to :meth:`RDSDBSnapshot.wait_for_status`, but wait for DB snapshot to reach "available" status. """ return self.wait_for_status( rds_client=rds_client, stop_status=RDSDBSnapshotStatusEnum.available, gap=gap, delays=delays, timeout=timeout, instant=instant, indent=indent, verbose=verbose, )
[docs] def delete_db_snapshot( self, rds_client: "RDSClient", ) -> "DeleteDBSnapshotResultTypeDef": """ Delete the RDS DB instance """ return rds_client.delete_db_snapshot( DBSnapshotIdentifier=self.db_snapshot_identifier, )
# -------------------------------------------------------------------------- # more constructor methods # -------------------------------------------------------------------------- @classmethod def _yield_dict_from_describe_db_snapshots_response( cls, res: dict, ) -> T.Iterable["RDSDBSnapshot"]: db_snapshots = res.get("DBSnapshots", []) if len(db_snapshots): for db_snapshot_dict in db_snapshots: yield cls.from_dict(db_snapshot_dict)
[docs] @classmethod def query( cls, rds_client: "RDSClient", db_instance_identifier: str = NOTHING, db_snapshot_identifier: str = NOTHING, filters: T.List[dict] = NOTHING, snapshot_type: str = NOTHING, include_shared: bool = NOTHING, include_public: bool = NOTHING, dbi_resource_id: str = NOTHING, ) -> "RDSDBSnapshotIterProxy": """ A wrapper around ``rds_client.describe_db_snapshots``. Multiple filters join with logic "AND", multiple values in a filter join with logic "OR". """ def run(): paginator = rds_client.get_paginator("describe_db_snapshots") kwargs = resolve_kwargs( DBInstanceIdentifier=db_instance_identifier, DBSnapshotIdentifier=db_snapshot_identifier, Filters=filters, SnapshotType=snapshot_type, IncludeShared=include_shared, IncludePublic=include_public, DbiResourceId=dbi_resource_id, PaginationConfig={ "MaxItems": 9999, "PageSize": 100, }, ) if db_snapshot_identifier is not NOTHING: del kwargs["PaginationConfig"] response_iterator = paginator.paginate(**kwargs) for response in response_iterator: yield from cls._yield_dict_from_describe_db_snapshots_response(response) return RDSDBSnapshotIterProxy(run())
[docs] @classmethod def from_id( cls, rds_client: "RDSClient", db_snapshot_identifier: str, ) -> T.Optional["RDSDBInstance"]: """ Get RDS DB snapshot details by it's id. """ return cls.query( rds_client, db_snapshot_identifier=db_snapshot_identifier, ).one_or_none()
[docs] @classmethod def from_tag_key_value( cls, rds_client: "RDSClient", key: str, value: str, ) -> "RDSDBSnapshotIterProxy": """ Query RDS DB snapshot by tag key and value. This function only support single key, value pair filtering, if you want more advanced filtering, you can use the ``query`` method and do in-memory filtering. :param key: tag key :param value: tag value """ def run(): for db_snap in cls.query(rds_client): if ( db_snap.tags.get(key, "THIS_IS_IMPOSSIBLE_TO_MATCH") == value ): # pragma: no cover yield db_snap return RDSDBSnapshotIterProxy(run())
[docs]class RDSDBSnapshotIterProxy(IterProxy[RDSDBSnapshot]): """ IterProxy for :class:`RDSDBSnapshot`. """ pass
# TODO: implement RDS DB Cluster # @dataclasses.dataclass # class RDSDBClusterSnapshot: # """ # """ # pass # class RDSDBClusterSnapshotIterProxy(IterProxy[RDSDBClusterSnapshot]): # """ # IterProxy for RDSDBClusterSnapshot. # """ # # pass