Merge pull request #2468 from bblommers/remove-dead-code

Remove dead code
This commit is contained in:
Steve Pulec 2019-10-10 16:58:29 -05:00 committed by GitHub
commit b60f720f0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 3 additions and 117 deletions

View File

@ -280,14 +280,6 @@ class LambdaFunction(BaseModel):
except Exception:
return s
@staticmethod
def is_json(test_str):
try:
response = json.loads(test_str)
except Exception:
response = test_str
return response
def _invoke_lambda(self, code, event=None, context=None):
# TODO: context not yet implemented
if event is None:

View File

@ -799,21 +799,6 @@ class ConditionExpressionParser:
else: # pragma: no cover
raise ValueError("Unknown expression node kind %r" % node.kind)
def _print_debug(self, nodes): # pragma: no cover
print('ROOT')
for node in nodes:
self._print_node_recursive(node, depth=1)
def _print_node_recursive(self, node, depth=0): # pragma: no cover
if len(node.children) > 0:
print(' ' * depth, node.nonterminal, node.kind)
for child in node.children:
self._print_node_recursive(child, depth=depth + 1)
else:
print(' ' * depth, node.nonterminal, node.kind, node.value)
def _assert(self, condition, message, nodes):
if not condition:
raise ValueError(message + " " + " ".join([t.text for t in nodes]))

View File

@ -877,7 +877,7 @@ class Table(BaseModel):
exclusive_start_key, index_name)
return results, scanned_count, last_evaluated_key
def _trim_results(self, results, limit, exclusive_start_key, scaned_index=None):
def _trim_results(self, results, limit, exclusive_start_key, scanned_index=None):
if exclusive_start_key is not None:
hash_key = DynamoType(exclusive_start_key.get(self.hash_key_attr))
range_key = exclusive_start_key.get(self.range_key_attr)
@ -897,10 +897,10 @@ class Table(BaseModel):
if results[-1].range_key is not None:
last_evaluated_key[self.range_key_attr] = results[-1].range_key
if scaned_index:
if scanned_index:
all_indexes = self.all_indexes()
indexes_by_name = dict((i['IndexName'], i) for i in all_indexes)
idx = indexes_by_name[scaned_index]
idx = indexes_by_name[scanned_index]
idx_col_list = [i['AttributeName'] for i in idx['KeySchema']]
for col in idx_col_list:
last_evaluated_key[col] = results[-1].attrs[col]

View File

@ -1,7 +1,5 @@
from __future__ import unicode_literals
import datetime
import boto.rds
from jinja2 import Template
@ -14,95 +12,6 @@ from moto.rds2.models import rds2_backends
class Database(BaseModel):
def __init__(self, **kwargs):
self.status = "available"
self.is_replica = False
self.replicas = []
self.region = kwargs.get('region')
self.engine = kwargs.get("engine")
self.engine_version = kwargs.get("engine_version")
if self.engine_version is None:
self.engine_version = "5.6.21"
self.iops = kwargs.get("iops")
self.storage_encrypted = kwargs.get("storage_encrypted", False)
if self.storage_encrypted:
self.kms_key_id = kwargs.get("kms_key_id", "default_kms_key_id")
else:
self.kms_key_id = kwargs.get("kms_key_id")
self.storage_type = kwargs.get("storage_type")
self.master_username = kwargs.get('master_username')
self.master_password = kwargs.get('master_password')
self.auto_minor_version_upgrade = kwargs.get(
'auto_minor_version_upgrade')
if self.auto_minor_version_upgrade is None:
self.auto_minor_version_upgrade = True
self.allocated_storage = kwargs.get('allocated_storage')
self.db_instance_identifier = kwargs.get('db_instance_identifier')
self.source_db_identifier = kwargs.get("source_db_identifier")
self.db_instance_class = kwargs.get('db_instance_class')
self.port = kwargs.get('port')
self.db_name = kwargs.get("db_name")
self.publicly_accessible = kwargs.get("publicly_accessible")
if self.publicly_accessible is None:
self.publicly_accessible = True
self.copy_tags_to_snapshot = kwargs.get("copy_tags_to_snapshot")
if self.copy_tags_to_snapshot is None:
self.copy_tags_to_snapshot = False
self.backup_retention_period = kwargs.get("backup_retention_period")
if self.backup_retention_period is None:
self.backup_retention_period = 1
self.availability_zone = kwargs.get("availability_zone")
self.multi_az = kwargs.get("multi_az")
self.db_subnet_group_name = kwargs.get("db_subnet_group_name")
self.instance_create_time = str(datetime.datetime.utcnow())
if self.db_subnet_group_name:
self.db_subnet_group = rds_backends[
self.region].describe_subnet_groups(self.db_subnet_group_name)[0]
else:
self.db_subnet_group = []
self.security_groups = kwargs.get('security_groups', [])
# PreferredBackupWindow
# PreferredMaintenanceWindow
# backup_retention_period = self._get_param("BackupRetentionPeriod")
# OptionGroupName
# DBParameterGroupName
# VpcSecurityGroupIds.member.N
@property
def db_instance_arn(self):
return "arn:aws:rds:{0}:1234567890:db:{1}".format(
self.region, self.db_instance_identifier)
@property
def physical_resource_id(self):
return self.db_instance_identifier
@property
def address(self):
return "{0}.aaaaaaaaaa.{1}.rds.amazonaws.com".format(self.db_instance_identifier, self.region)
def add_replica(self, replica):
self.replicas.append(replica.db_instance_identifier)
def remove_replica(self, replica):
self.replicas.remove(replica.db_instance_identifier)
def set_as_replica(self):
self.is_replica = True
self.replicas = []
def update(self, db_kwargs):
for key, value in db_kwargs.items():
if value is not None:
setattr(self, key, value)
def get_cfn_attribute(self, attribute_name):
if attribute_name == 'Endpoint.Address':
return self.address