Custom Search

Sunday, December 28, 2014

RabbitMQ Topic Exchange Python example of Publish and Consume

1)
Install pika
#sudo pip install pika

2)
Create exchange and queue and bind it

#!/usr/bin/env python
import pika
import sys

#https://www.rabbitmq.com/tutorials/tutorial-five-python.html

credentials = pika.PlainCredentials('guest', 'cloud')#username,password

parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)

connection = pika.BlockingConnection(parameters)
channel = connection.channel()

exchange_name = 'my_exchange'
##Declare/Create a topic type exchange named "my_exchange"
channel.exchange_declare(exchange=exchange_name,
                         exchange_type='topic')
#default exchange_type is 'direct'

##Declare/Create a queue
queue_name = 'my_queue'
result = channel.queue_declare(queue=queue_name)

##Bind queue to exchange with routing_key/binding_key
binding_key = "my_key_2015"
channel.queue_bind(exchange=exchange_name,
                       queue=queue_name,
                       routing_key=binding_key)


connection.close()


3)
Publish a Message to exchange named "my_exchange"

#!/usr/bin/env python
import pika
import sys

#https://www.rabbitmq.com/tutorials/tutorial-five-python.html

credentials = pika.PlainCredentials('guest', 'cloud')#username,password

parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)

connection = pika.BlockingConnection(parameters)
channel = connection.channel()

##Publish/send a message to the exchange named "my_exchange" with routing_key to route the message to correct queue
message = "Hello World"
exchange_name = 'my_exchange'
routing_key = 'my_key_2015'
channel.basic_publish(exchange=exchange_name,
                      routing_key=routing_key,
                      body=message)

connection.close()


4)
Consume the published Message from the exchange named "my_exchange"

#!/usr/bin/env python
import pika
import sys

#https://www.rabbitmq.com/tutorials/tutorial-five-python.html

credentials = pika.PlainCredentials('guest', 'cloud')#username,password

parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)

connection = pika.BlockingConnection(parameters)
channel = connection.channel()

##Get the exchange named 'my_exchange'
exchange_name = 'my_exchange'
channel.exchange_declare(exchange=exchange_name,
                         type='topic')

##Function to be called when there is a message in the queue named "my_queue"
def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

##Consume the queue named "my_queue" and wait for the message
queue_name = 'my_queue'
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()



5)
Screen shots




6)
AMQP concepts
https://www.rabbitmq.com/tutorials/amqp-concepts.html







No comments:

Post a Comment