55 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			55 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
								 | 
							
								import boto
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import sure  # noqa
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from moto import mock_sns
							 | 
						||
| 
								 | 
							
								from moto.sns.models import DEFAULT_TOPIC_POLICY, DEFAULT_EFFECTIVE_DELIVERY_POLICY
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								@mock_sns
							 | 
						||
| 
								 | 
							
								def test_create_and_delete_topic():
							 | 
						||
| 
								 | 
							
								    conn = boto.connect_sns()
							 | 
						||
| 
								 | 
							
								    conn.create_topic("some-topic")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    topics_json = conn.get_all_topics()
							 | 
						||
| 
								 | 
							
								    topics = topics_json["ListTopicsResponse"]["ListTopicsResult"]["Topics"]
							 | 
						||
| 
								 | 
							
								    topics.should.have.length_of(1)
							 | 
						||
| 
								 | 
							
								    topics[0]['TopicArn'].should.equal("arn:aws:sns:us-east-1:123456789012:some-topic")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # Delete the topic
							 | 
						||
| 
								 | 
							
								    conn.delete_topic(topics[0]['TopicArn'])
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # And there should now be 0 topics
							 | 
						||
| 
								 | 
							
								    topics_json = conn.get_all_topics()
							 | 
						||
| 
								 | 
							
								    topics = topics_json["ListTopicsResponse"]["ListTopicsResult"]["Topics"]
							 | 
						||
| 
								 | 
							
								    topics.should.have.length_of(0)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								@mock_sns
							 | 
						||
| 
								 | 
							
								def test_topic_attributes():
							 | 
						||
| 
								 | 
							
								    conn = boto.connect_sns()
							 | 
						||
| 
								 | 
							
								    conn.create_topic("some-topic")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    topics_json = conn.get_all_topics()
							 | 
						||
| 
								 | 
							
								    topic_arn = topics_json["ListTopicsResponse"]["ListTopicsResult"]["Topics"][0]['TopicArn']
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    attributes = conn.get_topic_attributes(topic_arn)['GetTopicAttributesResponse']['GetTopicAttributesResult']['Attributes']
							 | 
						||
| 
								 | 
							
								    attributes["TopicArn"].should.equal("arn:aws:sns:us-east-1:123456789012:some-topic")
							 | 
						||
| 
								 | 
							
								    attributes["Owner"].should.equal(123456789012)
							 | 
						||
| 
								 | 
							
								    attributes["Policy"].should.equal(DEFAULT_TOPIC_POLICY)
							 | 
						||
| 
								 | 
							
								    attributes["DisplayName"].should.equal("")
							 | 
						||
| 
								 | 
							
								    attributes["SubscriptionsPending"].should.equal(0)
							 | 
						||
| 
								 | 
							
								    attributes["SubscriptionsConfirmed"].should.equal(0)
							 | 
						||
| 
								 | 
							
								    attributes["SubscriptionsDeleted"].should.equal(0)
							 | 
						||
| 
								 | 
							
								    attributes["DeliveryPolicy"].should.equal("")
							 | 
						||
| 
								 | 
							
								    attributes["EffectiveDeliveryPolicy"].should.equal(DEFAULT_EFFECTIVE_DELIVERY_POLICY)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    conn.set_topic_attributes(topic_arn, "Policy", {"foo": "bar"})
							 | 
						||
| 
								 | 
							
								    conn.set_topic_attributes(topic_arn, "DisplayName", "My display name")
							 | 
						||
| 
								 | 
							
								    conn.set_topic_attributes(topic_arn, "DeliveryPolicy", {"http": {"defaultHealthyRetryPolicy": {"numRetries": 5}}})
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    attributes = conn.get_topic_attributes(topic_arn)['GetTopicAttributesResponse']['GetTopicAttributesResult']['Attributes']
							 | 
						||
| 
								 | 
							
								    attributes["Policy"].should.equal("{'foo': 'bar'}")
							 | 
						||
| 
								 | 
							
								    attributes["DisplayName"].should.equal("My display name")
							 | 
						||
| 
								 | 
							
								    attributes["DeliveryPolicy"].should.equal("{'http': {'defaultHealthyRetryPolicy': {'numRetries': 5}}}")
							 |