RabbitMQ的基本使用

官网:RabbitMQ

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

RabbitMQ 安装

1
2
3
4
5
6
7
8
9
10
11
# 安装配置epel源
rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm

# 安装erlang
yum -y install erlang

# 安装RabbitMQ
yum -y install rabbitmq-server

# 开启和关闭RabbitMQ
service rabbitmq-server start/stop

安装API

1
2
3
4
5
# python3安装
pip install pika

# python2安装
easy_install pik

生产者消费者

生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters('IP地址'))

# 有密码
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列) 并支持持久化
channel.queue_declare(queue='队列名称',durable=True)

channel.basic_publish(exchange='',
routing_key='队列名称', # 消息队列名称
body='发送的内容',
properties=pika.BasicProperties(
delivery_mode = 2, # 对信息进行持久化
))

# 关闭
connection.close()
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import pika

# 连接rabbitMQ
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列)
channel.queue_declare(queue='队列名称')

# 设置闲置时消费,那个消费者消费完了就接收任务
channel.basic_qos(prefetch_count=1)

# 设置回调函数
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag) # 给服务端发送我接收到了

# 监听队列 no_ack=false 表示给服务端发送我接收到了的消息
channel.basic_consume(callback,queue='队列名称',no_ack=False)
# 开始监听
channel.start_consuming()

发布者和订阅者

全部订阅用户

发布者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import pika
# 连接
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# 设置 中间商 exchange_type : fanout(全部)
channel.exchange_declare(exchange='中间商名称',exchange_type='fanout')

# 向队列添加内容
channel.basic_publish(exchange='中间商名称',
routing_key='',
body='内容')

connection.close()

订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import pika

# 连接
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# 创建中间商(有的话就不会创建)
channel.exchange_declare(exchange='中间商名称',exchange_type='fanout')

# 随机生成一个队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queu


# 让exchange和queque进行绑定.
channel.queue_bind(exchange='中间商名称',queue=queue_name)

# 设置回调函数
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)

# 对队列进行监听
channel.basic_consume(callback,queue=queue_name,no_ack=True)

# 开始监听
channel.start_consuming()
关键字发布

发布者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import pika

# 连接
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# 创建中间商 exchange_type direct:代表关键字队列
channel.exchange_declare(exchange='中间商名称',exchange_type='direct')

channel.basic_publish(exchange='中间商名称',
routing_key='关键字',
body='内容')

# 关闭
connection.close()

订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import pika

# 连接
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# 创建中间商 exchange_type direct:代表关键字队列
channel.exchange_declare(exchange='名称',exchange_type='direct')

# 随机生成一个队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 让exchange和queque进行绑定. routing_key:指定关键字,可以绑定多次
channel.queue_bind(exchange='名称',queue=queue_name,routing_key='关键字1')
channel.queue_bind(exchange='名称',queue=queue_name,routing_key='关键字1')

# 设置回调函数
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)

# 对队列进行监听
channel.basic_consume(callback,queue=queue_name,no_ack=True)

# 开始监听
channel.start_consuming()
模糊匹配

发布者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import pika

# 连接
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# 创建一个中间商 topic:代表模糊匹配
channel.exchange_declare(exchange='名称',exchange_type='topic')

# 发送数据
channel.basic_publish(exchange='m3',
routing_key='xxx.xxx.py',
body='内容')

# 关闭
connection.close()

订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import pika

# 连接
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# 创建一个中间商 topic:代表模糊匹配
channel.exchange_declare(exchange='名称',exchange_type='topic')

# 随机生成一个队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 让exchange和queque进行绑定. # :代表全部 *代表一个单词
channel.queue_bind(exchange='名称',queue=queue_name,routing_key='xxx.#')

# 回调函数
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)

# 监听
channel.basic_consume(callback,queue=queue_name,no_ack=True)

# 开始监听
channel.start_consuming()

基于RabbitMQ事项RPC

这是程序与程序之间的信息传递,所以起名中间商和服务商

中间商
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import pika
import uuid

class FibonacciRpcClient(object):
# 连接rabbitMQ
def __init__(self):
credentials = pika.PlainCredentials("root", "123")
self.connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14', credentials=credentials))
self.channel = self.connection.channel()

# 随机生成一个消息队列(用于接收结果)
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue

# 监听消息队列中是否有值返回,如果有值则执行 on_response 函数(一旦有结果,则执行on_response)
self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)

def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body

def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())

# 中间商 给 服务商 发送一个任务: 任务id = corr_id / 任务内容 = '30' / 用于接收结果的队列名称
self.channel.basic_publish(exchange='',
routing_key='rpc_queue', # 服务商接收任务的队列名称
properties=pika.BasicProperties(
reply_to = self.callback_queue, # 用于接收结果的队列
correlation_id = self.corr_id, # 任务ID
),
body=str(n))
# 循环监听信息有没有传回来
while self.response is None:
# process_data_events也是一直监听有没有值
self.connection.process_data_events()

return self.response

# 实例化
fibonacci_rpc = FibonacciRpcClient()
# 调用程序
response = fibonacci_rpc.call(50)
print('返回结果:',response)
服务商
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import pika

# 连接
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# 服务商监听任务队列
channel.queue_declare(queue='rpc_queue')

# 回调函数
def on_request(ch, method, props, body):
# 接收到内容并处理
n = int(body)
response = n + 100

# 把处理好的信息通过队列把它返回回去
ch.basic_publish(exchange='',
routing_key=props.reply_to, # 指定队列名称
properties=pika.BasicProperties(correlation_id= props.correlation_id), # 返回id
body=str(response)) # body返回的是内容
# 返回一个ack 给服务端发送我接收到了
ch.basic_ack(delivery_tag=method.delivery_tag)

# 设置闲置时消费,那个消费者消费完了就接收任务
channel.basic_qos(prefetch_count=1)

# 监听队列
channel.basic_consume(on_request, queue='rpc_queue')
# 开始监听
channel.start_consuming()