54 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			54 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from __future__ import unicode_literals
 | 
						|
from six.moves.urllib.parse import parse_qs
 | 
						|
 | 
						|
import boto
 | 
						|
from freezegun import freeze_time
 | 
						|
import sure  # noqa
 | 
						|
 | 
						|
from moto.packages.responses import responses
 | 
						|
from moto import mock_sns_deprecated, mock_sqs_deprecated
 | 
						|
 | 
						|
 | 
						|
@mock_sqs_deprecated
 | 
						|
@mock_sns_deprecated
 | 
						|
def test_publish_to_sqs():
 | 
						|
    conn = boto.connect_sns()
 | 
						|
    conn.create_topic("some-topic")
 | 
						|
    topics_json = conn.get_all_topics()
 | 
						|
    topic_arn = topics_json["ListTopicsResponse"][
 | 
						|
        "ListTopicsResult"]["Topics"][0]['TopicArn']
 | 
						|
 | 
						|
    sqs_conn = boto.connect_sqs()
 | 
						|
    sqs_conn.create_queue("test-queue")
 | 
						|
 | 
						|
    conn.subscribe(topic_arn, "sqs",
 | 
						|
                   "arn:aws:sqs:us-east-1:123456789012:test-queue")
 | 
						|
 | 
						|
    conn.publish(topic=topic_arn, message="my message")
 | 
						|
 | 
						|
    queue = sqs_conn.get_queue("test-queue")
 | 
						|
    message = queue.read(1)
 | 
						|
    message.get_body().should.equal('my message')
 | 
						|
 | 
						|
 | 
						|
@mock_sqs_deprecated
 | 
						|
@mock_sns_deprecated
 | 
						|
def test_publish_to_sqs_in_different_region():
 | 
						|
    conn = boto.sns.connect_to_region("us-west-1")
 | 
						|
    conn.create_topic("some-topic")
 | 
						|
    topics_json = conn.get_all_topics()
 | 
						|
    topic_arn = topics_json["ListTopicsResponse"][
 | 
						|
        "ListTopicsResult"]["Topics"][0]['TopicArn']
 | 
						|
 | 
						|
    sqs_conn = boto.sqs.connect_to_region("us-west-2")
 | 
						|
    sqs_conn.create_queue("test-queue")
 | 
						|
 | 
						|
    conn.subscribe(topic_arn, "sqs",
 | 
						|
                   "arn:aws:sqs:us-west-2:123456789012:test-queue")
 | 
						|
 | 
						|
    conn.publish(topic=topic_arn, message="my message")
 | 
						|
 | 
						|
    queue = sqs_conn.get_queue("test-queue")
 | 
						|
    message = queue.read(1)
 | 
						|
    message.get_body().should.equal('my message')
 |