Add support for IoT Domain Configuration (#4648)

This commit is contained in:
Luka 2021-12-09 08:28:02 -05:00 committed by GitHub
parent 194098c7de
commit 0055e12a9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 368 additions and 6 deletions

View File

@ -2711,7 +2711,7 @@
## iot ## iot
<details> <details>
<summary>27% implemented</summary> <summary>29% implemented</summary>
- [ ] accept_certificate_transfer - [ ] accept_certificate_transfer
- [ ] add_thing_to_billing_group - [ ] add_thing_to_billing_group
@ -2735,7 +2735,7 @@
- [ ] create_certificate_from_csr - [ ] create_certificate_from_csr
- [ ] create_custom_metric - [ ] create_custom_metric
- [ ] create_dimension - [ ] create_dimension
- [ ] create_domain_configuration - [X] create_domain_configuration
- [ ] create_dynamic_thing_group - [ ] create_dynamic_thing_group
- [ ] create_fleet_metric - [ ] create_fleet_metric
- [X] create_job - [X] create_job
@ -2765,7 +2765,7 @@
- [X] delete_certificate - [X] delete_certificate
- [ ] delete_custom_metric - [ ] delete_custom_metric
- [ ] delete_dimension - [ ] delete_dimension
- [ ] delete_domain_configuration - [X] delete_domain_configuration
- [ ] delete_dynamic_thing_group - [ ] delete_dynamic_thing_group
- [ ] delete_fleet_metric - [ ] delete_fleet_metric
- [X] delete_job - [X] delete_job
@ -2802,7 +2802,7 @@
- [ ] describe_default_authorizer - [ ] describe_default_authorizer
- [ ] describe_detect_mitigation_actions_task - [ ] describe_detect_mitigation_actions_task
- [ ] describe_dimension - [ ] describe_dimension
- [ ] describe_domain_configuration - [X] describe_domain_configuration
- [X] describe_endpoint - [X] describe_endpoint
- [ ] describe_event_configurations - [ ] describe_event_configurations
- [ ] describe_fleet_metric - [ ] describe_fleet_metric
@ -2860,7 +2860,7 @@
- [ ] list_detect_mitigation_actions_executions - [ ] list_detect_mitigation_actions_executions
- [ ] list_detect_mitigation_actions_tasks - [ ] list_detect_mitigation_actions_tasks
- [ ] list_dimensions - [ ] list_dimensions
- [ ] list_domain_configurations - [X] list_domain_configurations
- [ ] list_fleet_metrics - [ ] list_fleet_metrics
- [ ] list_indices - [ ] list_indices
- [X] list_job_executions_for_job - [X] list_job_executions_for_job
@ -2932,7 +2932,7 @@
- [X] update_certificate - [X] update_certificate
- [ ] update_custom_metric - [ ] update_custom_metric
- [ ] update_dimension - [ ] update_dimension
- [ ] update_domain_configuration - [X] update_domain_configuration
- [ ] update_dynamic_thing_group - [ ] update_dynamic_thing_group
- [ ] update_event_configurations - [ ] update_event_configurations
- [ ] update_fleet_metric - [ ] update_fleet_metric

View File

@ -484,6 +484,62 @@ class FakeRule(BaseModel):
} }
class FakeDomainConfiguration(BaseModel):
def __init__(
self,
region_name,
domain_configuration_name,
domain_name,
server_certificate_arns,
domain_configuration_status,
service_type,
authorizer_config,
domain_type,
):
if service_type and service_type not in ["DATA", "CREDENTIAL_PROVIDER", "JOBS"]:
raise InvalidRequestException(
"An error occurred (InvalidRequestException) when calling the DescribeDomainConfiguration "
"operation: Service type %s not recognized." % service_type
)
self.domain_configuration_name = domain_configuration_name
self.domain_configuration_arn = "arn:aws:iot:%s:1:domainconfiguration/%s/%s" % (
region_name,
domain_configuration_name,
random_string(5),
)
self.domain_name = domain_name
self.server_certificates = []
if server_certificate_arns:
for sc in server_certificate_arns:
self.server_certificates.append(
{"serverCertificateArn": sc, "serverCertificateStatus": "VALID"}
)
self.domain_configuration_status = domain_configuration_status
self.service_type = service_type
self.authorizer_config = authorizer_config
self.domain_type = domain_type
self.last_status_change_date = time.time()
def to_description_dict(self):
return {
"domainConfigurationName": self.domain_configuration_name,
"domainConfigurationArn": self.domain_configuration_arn,
"domainName": self.domain_name,
"serverCertificates": self.server_certificates,
"authorizerConfig": self.authorizer_config,
"domainConfigurationStatus": self.domain_configuration_status,
"serviceType": self.service_type,
"domainType": self.domain_type,
"lastStatusChangeDate": self.last_status_change_date,
}
def to_dict(self):
return {
"domainConfigurationName": self.domain_configuration_name,
"domainConfigurationArn": self.domain_configuration_arn,
}
class IoTBackend(BaseBackend): class IoTBackend(BaseBackend):
def __init__(self, region_name=None): def __init__(self, region_name=None):
super(IoTBackend, self).__init__() super(IoTBackend, self).__init__()
@ -499,6 +555,7 @@ class IoTBackend(BaseBackend):
self.principal_things = OrderedDict() self.principal_things = OrderedDict()
self.rules = OrderedDict() self.rules = OrderedDict()
self.endpoint = None self.endpoint = None
self.domain_configurations = OrderedDict()
def reset(self): def reset(self):
region_name = self.region_name region_name = self.region_name
@ -1408,6 +1465,64 @@ class IoTBackend(BaseBackend):
raise ResourceNotFoundException() raise ResourceNotFoundException()
self.rules[rule_name].rule_disabled = True self.rules[rule_name].rule_disabled = True
def create_domain_configuration(
self,
domain_configuration_name,
domain_name,
server_certificate_arns,
validation_certificate_arn,
authorizer_config,
service_type,
):
if domain_configuration_name in self.domain_configurations:
raise ResourceAlreadyExistsException(
"Domain configuration with given name already exists."
)
self.domain_configurations[domain_configuration_name] = FakeDomainConfiguration(
self.region_name,
domain_configuration_name,
domain_name,
server_certificate_arns,
"ENABLED",
service_type,
authorizer_config,
"CUSTOMER_MANAGED",
)
return self.domain_configurations[domain_configuration_name]
def delete_domain_configuration(self, domain_configuration_name):
if domain_configuration_name not in self.domain_configurations:
raise ResourceNotFoundException("The specified resource does not exist.")
del self.domain_configurations[domain_configuration_name]
def describe_domain_configuration(self, domain_configuration_name):
if domain_configuration_name not in self.domain_configurations:
raise ResourceNotFoundException("The specified resource does not exist.")
return self.domain_configurations[domain_configuration_name]
def list_domain_configurations(self):
return [_.to_dict() for _ in self.domain_configurations.values()]
def update_domain_configuration(
self,
domain_configuration_name,
authorizer_config,
domain_configuration_status,
remove_authorizer_config,
):
if domain_configuration_name not in self.domain_configurations:
raise ResourceNotFoundException("The specified resource does not exist.")
domain_configuration = self.domain_configurations[domain_configuration_name]
if authorizer_config is not None:
domain_configuration.authorizer_config = authorizer_config
if domain_configuration_status is not None:
domain_configuration.domain_configuration_status = (
domain_configuration_status
)
if remove_authorizer_config is not None and remove_authorizer_config is True:
domain_configuration.authorizer_config = None
return domain_configuration
iot_backends = {} iot_backends = {}
for region in Session().get_available_regions("iot"): for region in Session().get_available_regions("iot"):

View File

@ -696,3 +696,40 @@ class IoTResponse(BaseResponse):
def disable_topic_rule(self): def disable_topic_rule(self):
self.iot_backend.disable_topic_rule(rule_name=self._get_param("ruleName")) self.iot_backend.disable_topic_rule(rule_name=self._get_param("ruleName"))
return json.dumps(dict()) return json.dumps(dict())
def create_domain_configuration(self):
domain_configuration = self.iot_backend.create_domain_configuration(
domain_configuration_name=self._get_param("domainConfigurationName"),
domain_name=self._get_param("domainName"),
server_certificate_arns=self._get_param("serverCertificateArns"),
validation_certificate_arn=self._get_param("validationCertificateArn"),
authorizer_config=self._get_param("authorizerConfig"),
service_type=self._get_param("serviceType"),
)
return json.dumps(domain_configuration.to_dict())
def delete_domain_configuration(self):
self.iot_backend.delete_domain_configuration(
domain_configuration_name=self._get_param("domainConfigurationName")
)
return json.dumps(dict())
def describe_domain_configuration(self):
domain_configuration = self.iot_backend.describe_domain_configuration(
domain_configuration_name=self._get_param("domainConfigurationName")
)
return json.dumps(domain_configuration.to_description_dict())
def list_domain_configurations(self):
return json.dumps(
dict(domainConfigurations=self.iot_backend.list_domain_configurations())
)
def update_domain_configuration(self):
domain_configuration = self.iot_backend.update_domain_configuration(
domain_configuration_name=self._get_param("domainConfigurationName"),
authorizer_config=self._get_param("authorizerConfig"),
domain_configuration_status=self._get_param("domainConfigurationStatus"),
remove_authorizer_config=self._get_bool_param("removeAuthorizerConfig"),
)
return json.dumps(domain_configuration.to_dict())

View File

@ -2346,3 +2346,213 @@ class TestTopicRules:
client.update_thing( client.update_thing(
thingName=thing_name, thingTypeName=deprecated_thing_type_name thingName=thing_name, thingTypeName=deprecated_thing_type_name
) )
class TestDomainConfigurations:
@mock_iot
def test_create_domain_configuration_only_name(self):
client = boto3.client("iot", region_name="us-east-1")
domain_config = client.create_domain_configuration(
domainConfigurationName="testConfig"
)
domain_config.should.have.key("domainConfigurationName").which.should.equal(
"testConfig"
)
domain_config.should.have.key("domainConfigurationArn").which.should_not.be.none
@mock_iot
def test_create_duplicate_domain_configuration_fails(self):
client = boto3.client("iot", region_name="us-east-1")
domain_config = client.create_domain_configuration(
domainConfigurationName="testConfig"
)
domain_config.should.have.key("domainConfigurationName").which.should.equal(
"testConfig"
)
domain_config.should.have.key("domainConfigurationArn").which.should_not.be.none
with pytest.raises(client.exceptions.ResourceAlreadyExistsException) as exc:
client.create_domain_configuration(domainConfigurationName="testConfig")
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceAlreadyExistsException")
err["Message"].should.equal(
"Domain configuration with given name already exists."
)
@mock_iot
def test_create_domain_configuration_full_params(self):
client = boto3.client("iot", region_name="us-east-1")
domain_config = client.create_domain_configuration(
domainConfigurationName="testConfig",
domainName="example.com",
serverCertificateArns=["ARN1", "ARN2"],
validationCertificateArn="VARN",
authorizerConfig={
"defaultAuthorizerName": "name",
"allowAuthorizerOverride": True,
},
serviceType="DATA",
)
domain_config.should.have.key("domainConfigurationName").which.should.equal(
"testConfig"
)
domain_config.should.have.key("domainConfigurationArn").which.should_not.be.none
@mock_iot
def test_create_domain_configuration_invalid_service_type(self):
client = boto3.client("iot", region_name="us-east-1")
with pytest.raises(client.exceptions.InvalidRequestException) as exc:
client.create_domain_configuration(
domainConfigurationName="testConfig", serviceType="INVALIDTYPE"
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidRequestException")
err["Message"].should.equal(
"An error occurred (InvalidRequestException) when calling the DescribeDomainConfiguration operation: Service type INVALIDTYPE not recognized."
)
@mock_iot
def test_describe_nonexistent_domain_configuration(self):
client = boto3.client("iot", region_name="us-east-1")
with pytest.raises(client.exceptions.ResourceNotFoundException) as exc:
client.describe_domain_configuration(domainConfigurationName="doesntExist")
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal("The specified resource does not exist.")
@mock_iot
def test_describe_domain_configuration(self):
client = boto3.client("iot", region_name="us-east-1")
client.create_domain_configuration(
domainConfigurationName="testConfig",
domainName="example.com",
serverCertificateArns=["ARN1", "ARN2"],
validationCertificateArn="VARN",
authorizerConfig={
"defaultAuthorizerName": "name",
"allowAuthorizerOverride": True,
},
serviceType="DATA",
)
described_config = client.describe_domain_configuration(
domainConfigurationName="testConfig"
)
described_config.should.have.key("domainConfigurationName").which.should.equal(
"testConfig"
)
described_config.should.have.key("domainConfigurationArn")
described_config.should.have.key("serverCertificates")
described_config.should.have.key("authorizerConfig")
described_config.should.have.key(
"domainConfigurationStatus"
).which.should.equal("ENABLED")
described_config.should.have.key("serviceType").which.should.equal("DATA")
described_config.should.have.key("domainType")
described_config.should.have.key("lastStatusChangeDate")
@mock_iot
def test_update_domain_configuration(self):
client = boto3.client("iot", region_name="us-east-1")
client.create_domain_configuration(
domainConfigurationName="testConfig",
domainName="example.com",
serverCertificateArns=["ARN1", "ARN2"],
validationCertificateArn="VARN",
authorizerConfig={
"defaultAuthorizerName": "name",
"allowAuthorizerOverride": True,
},
serviceType="DATA",
)
client.update_domain_configuration(
domainConfigurationName="testConfig",
authorizerConfig={
"defaultAuthorizerName": "updatedName",
"allowAuthorizerOverride": False,
},
domainConfigurationStatus="DISABLED",
)
described_updated_config = client.describe_domain_configuration(
domainConfigurationName="testConfig"
)
described_updated_config.should.have.key(
"authorizerConfig"
).which.should.have.key("defaultAuthorizerName").which.should.equal(
"updatedName"
)
described_updated_config.should.have.key(
"authorizerConfig"
).which.should.have.key("allowAuthorizerOverride").which.should.equal(False)
described_updated_config.should.have.key(
"domainConfigurationStatus"
).which.should.equal("DISABLED")
@mock_iot
def test_update_domain_configuration_remove_authorizer_type(self):
client = boto3.client("iot", region_name="us-east-1")
client.create_domain_configuration(
domainConfigurationName="testConfig",
domainName="example.com",
serverCertificateArns=["ARN1", "ARN2"],
validationCertificateArn="VARN",
authorizerConfig={
"defaultAuthorizerName": "name",
"allowAuthorizerOverride": True,
},
serviceType="DATA",
)
client.update_domain_configuration(
domainConfigurationName="testConfig", removeAuthorizerConfig=True
)
described_updated_config = client.describe_domain_configuration(
domainConfigurationName="testConfig"
)
described_updated_config.should_not.have.key("authorizerConfig")
@mock_iot
def test_update_nonexistent_domain_configuration(self):
client = boto3.client("iot", region_name="us-east-1")
with pytest.raises(client.exceptions.ResourceNotFoundException) as exc:
client.update_domain_configuration(domainConfigurationName="doesntExist")
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal("The specified resource does not exist.")
@mock_iot
def test_list_domain_configuration(self):
client = boto3.client("iot", region_name="us-east-1")
client.create_domain_configuration(domainConfigurationName="testConfig1")
client.create_domain_configuration(domainConfigurationName="testConfig2")
domain_configs = client.list_domain_configurations()
domain_configs.should.have.key(
"domainConfigurations"
).which.should.have.length_of(2)
domain_configs["domainConfigurations"][0].should.have.key(
"domainConfigurationName"
).which.should.equal("testConfig1")
domain_configs["domainConfigurations"][1].should.have.key(
"domainConfigurationName"
).which.should.equal("testConfig2")
@mock_iot
def test_delete_domain_configuration(self):
client = boto3.client("iot", region_name="us-east-1")
client.create_domain_configuration(domainConfigurationName="testConfig")
domain_configs = client.list_domain_configurations()
domain_configs.should.have.key(
"domainConfigurations"
).which.should.have.length_of(1)
client.delete_domain_configuration(domainConfigurationName="testConfig")
domain_configs = client.list_domain_configurations()
domain_configs.should.have.key(
"domainConfigurations"
).which.should.have.length_of(0)
@mock_iot
def test_delete_nonexistent_domain_configuration(self):
client = boto3.client("iot", region_name="us-east-1")
with pytest.raises(client.exceptions.ResourceNotFoundException) as exc:
client.delete_domain_configuration(domainConfigurationName="doesntExist")
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal("The specified resource does not exist.")