banner
Hi my new friend!

windwos rabbitmq python配置学习

Scroll down

环境配置

下载

平台

windwos平台

erlang环境

rabbitmq是基于erlang环境的,必须要具备erlang语言的环境才可以使用

以下是提供的erlang下载路径: Downloads - Erlang/OTP

需要注意的是, 可能rabbitmq并不能支持最新版本的erlang,需要根据具体的官网信息来选择erlang版本,在rabbitmq官网会对此做具体声明: Erlang Version Requirements | RabbitMQ

(by the way: erlang的安装非常受网络影响,建议选择美区节点)

image-20250722233713096

对于erlang来说, 他和一个正常的python包是一样的,都是语言环境,都需要对此做一个具体的环境变量配置(自行配置)

安装完成,设置好环境变量之后,起一个新的终端页面(设置前打开的bash窗口暂未同步),来验证安装完成情况

1
2
erl或者
erl -version()

安装成功!

image-20250722234820405

安装RabbitMQ本体

下载RabbitMQ Server,官方下载链接: Installing on Windows | RabbitMQ

注意需要在已经下载完erlang包并且正确配置到环境变量中才可以开始运行RabbitMQ安装包,否则根本无法运行

尝试运行来检验是否成功安装

1
rabbitmq-plugins enable rabbitmq_management

出现下面内容表示成功安装

image-20250723000028422

该命令的功能说明

  1. 启用 Web 管理界面
    • 安装并激活 rabbitmq_management 插件后,会提供一个基于 Web 的图形化管理控制台(默认端口 15672)。
    • 通过浏览器访问 http://localhost:15672 即可登录管理界面(默认用户名/密码为 guest/guest)。
  2. 提供监控与管理功能
    • 查看队列、交换机、绑定的实时状态。
    • 管理用户、权限、虚拟主机(vhost)。
    • 监控消息流量、连接数、节点健康状况等。
  3. 暴露 HTTP API
    • 插件会提供一套 RESTful API(端口 15672),方便通过编程方式管理 RabbitMQ。
  4. 注册windwos服务
    • 插件会注册windwos服务并且设置为自动启动,会占用这个15672端口以及部分资源

实际上是存在命令无法正常启动RabbitMQ Server,就需要在win r中启动”服务”来手动启动RabbitMQ

1
services.msc

访问: http://localhost:15672/

image-20250723002221691

正常启动关闭

rabbitmq有多种启动和关闭方式,只要不要混用就不会出问题

区分好以下命令

1
2
3
4
rabbitmq-server.bat # 这个批命令不接受stop参数 使用就是启动
rabbitmq-service.bat
rabbitmqctl
rabbitmq-plugins

正常启动

1
2
3
4
5
6
7
# 启动插件
rabbitmq-plugins enable rabbitmq_management

rabbitmq-server.bat

# 检查是否启动成功
rabbitmq-server.bat status

正常关闭

1
2
rabbitmq-plugins disable rabbitmq_management
rabbitmqctl stop

image-20250723013509000

问题研究

不知道在研究啥 几个命令试一试就行了 大不了重启

  1. 启动命令

    1
    2
    rabbitmq-plugins enable rabbitmq_management
    rabbitmq-server start

    如果服务无法正常启动,就win+r输入services.msc手动启动这个服务

    尝试访问网页: http://localhost:15672/#/

    密码为: guest guest

  2. 使用启动插件命令错误的时候rabbitmq-plugins enable rabbitmq_management,如果出现这种问题
    image-20250723005841758
    使用命令解决,重启插件

    1
    2
    rabbitmq-plugins disable rabbitmq_management
    rabbitmq-plugins enable rabbitmq_management
  3. 如何关闭rabbitmq(强制关闭无法正常访问的前提下)

    打开管理员下的cmd,执行命令杀死进程

    1
    taskkill /F /IM erl.exe

    然后查看浏览器端口是否还在占用

    1
    netstat -ano | findstr "25672"

    然后发现还存在占用

    image-20250723011222852

    强杀PID

    1
    2
    3
    taskkill /F /PID 37648
    重新检查端口
    netstat -ano | findstr "25672"

    如果一直被拉起课尝试关闭(可能是注册了服务)

    1
    2
    3
    4
    sc delete RabbitMQ # 删除服务(本教程中不使用服务的方式开关
    rabbitmqctl stop
    # 如果无效
    rabbitmq-service.bat stop
  4. 如果在执行rabbitmqctl stop的时候出现了以下问题

    image-20250723015435972

    这是由于
    核心问题是:RabbitMQ节点 rabbit@LAPTOP-HB2HIA34 并未运行,导致 rabbitmqctl stop 命令无法执行
    可能已通过其他方式(如强制终止进程、服务崩溃等)关闭了RabbitMQ,但未通过正常流程停止,导致残留状态不一致。

    这个时候可以直接通过启动rabbitmqserver来解决

    1
    2
    rabbitmq-plugins enable rabbitmq_management
    rabbitmq-server start

    然后在执行rabbitmqctl status的时候会发现一切都正常起来了

    image-20250723015902220

  5. 如果在执行完所有的命令之后确认: 网页无法访问/windwos服务状态关闭/端口监听关闭
    然后执行命令rabbitmqctl status
    image-20250723020438101
    这个时候的报错是正常的,因为他的检查状态原理是和节点连接,但是节点已经关闭,下面是AI解释

    • rabbitmqctl status 的工作原理

      • 该命令需要与运行的RabbitMQ节点(Erlang虚拟机)建立连接,获取状态信息。

      • 节点未运行时:由于目标节点 rabbit@LAPTOP-HB2HIA34 已通过 rabbitmqctl stop 关闭,EPMD(Erlang端口映射守护进程)会直接返回“节点未运行”的提示(如诊断信息所示)。

    • 这是预期行为:节点停止后,rabbitmqctl status 必然报错,因为无法与不存在的进程通信。

    • 您的操作逻辑矛盾点

    • 您已明确停止服务(rabbitmqctl stop),此时节点进程已终止。

    • 端口无占用:5672(AMQP)和15672(管理插件)无监听,进一步证明服务已完全关闭。

    • 此时检查状态无意义:停止的服务无法返回状态信息,报错是正常的。

RabbitMQ默认端口

RabbitMQ的主要端口号

RabbitMQ默认使用以下端口号:

  • 5672:这是RabbitMQ的默认端口,用于AMQP协议(不带TLS)的客户端连接。
  • 5671:当使用TLS加密连接时,AMQP协议会使用这个端口。
  • 15672:如果启用了管理插件,这个端口用于访问RabbitMQ的Web管理界面,默认用户名和密码都是guest
  • 15674:这个端口与Web STOMP插件相关,允许通过WebSocket连接使用STOMP协议。

协同python

简单例子

  1. 启动RabbbitMQ Server

    1
    2
    3
    4
    5
    6
    7
    # 启动插件
    rabbitmq-plugins enable rabbitmq_management

    rabbitmq-server.bat

    # 检查是否启动成功
    rabbitmq-server.bat status
  2. Pycharm启动一个新的项目(独立项目避免环境冲突)
    image-20250725112358335
    路径 + 项目名
    image-20250725112534318

  3. 连接端口和密码,这里使用的都是默认的参数
    账号密码: guest guest
    端口: 5672
    web管理页面(这是通过插件打开的): http://localhost:15672/#/

  4. 生产者代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    import pika
    import json

    credentials = pika.PlainCredentials('guest', 'guest') # mq用户名和密码
    # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
    connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost',port = 5672,virtual_host = '/',credentials = credentials))
    channel=connection.channel()
    # 声明消息队列,消息将在这个队列传递,如不存在,则创建
    result = channel.queue_declare(queue = 'python-test')

    for i in range(10):
    message=json.dumps({'OrderId':"1000%s"%i})
    # 向队列插入数值 routing_key是队列名
    channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)
    print(message)
    connection.close()
  5. 消费者代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    import pika

    credentials = pika.PlainCredentials('guest', 'guest')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost',port = 5672,virtual_host = '/',credentials = credentials))
    channel = connection.channel()
    # 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
    channel.queue_declare(queue = 'python-test', durable = False)
    # 定义一个回调函数来处理消息队列中的消息,这里是打印出来
    def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print(body.decode())

    # 告诉rabbitmq,用callback来接收消息
    channel.basic_consume('python-test',callback)
    # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
    channel.start_consuming()

协同AI

本次使用的协同ai是gemini,申请api网址如下:

全局代理设置

为了不去搞懂每个库的代理到底要怎么设置,直接偷个小懒,使用全局socks5代理,我使用的是v2ray,也可以代理到对应的代理端口上,注意Gemini Api的地区使用政策,如果发现挂了代理出页面但是不能直接上就可能是地区选择有问题

  1. 库安装

    1
    2
    3
    4
    5
    6
    pip install -r requestments

    # 或者
    pip install PySocks
    pip install pika
    pip install pip install google-genai

    requestments

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    annotated-types==0.7.0
    anyio==4.9.0
    cachetools==5.5.2
    certifi==2025.7.14
    charset-normalizer==3.4.2
    colorama==0.4.6
    exceptiongroup==1.3.0
    google-ai-generativelanguage==0.6.15
    google-api-core==2.25.1
    google-api-python-client==2.177.0
    google-auth==2.40.3
    google-auth-httplib2==0.2.0
    google-genai==1.27.0
    google-generativeai==0.8.5
    googleapis-common-protos==1.70.0
    grpcio==1.74.0
    grpcio-status==1.71.2
    h11==0.16.0
    httpcore==1.0.9
    httplib2==0.22.0
    httpx==0.28.1
    idna==3.10
    pika==1.3.2
    proto-plus==1.26.1
    protobuf==5.29.5
    pyasn1==0.6.1
    pyasn1_modules==0.4.2
    pydantic==2.11.7
    pydantic_core==2.33.2
    pyparsing==3.2.3
    PySocks==1.7.1
    requests==2.32.4
    rsa==4.9.1
    sniffio==1.3.1
    tenacity==8.5.0
    tqdm==4.67.1
    typing-inspection==0.4.1
    typing_extensions==4.14.1
    uritemplate==4.2.0
    urllib3==2.5.0
    websockets==15.0.1
  2. new socks5_global_proxy

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import socks
    import socket

    # Proxy details
    proxy_ip = "127.0.0.1" # Replace with your SOCKS5 proxy IP address
    proxy_port = 10808 # Replace with your SOCKS5 proxy port

    # Optional: Authentication if your proxy requires it
    # username = "your_username"
    # password = "your_password"

    # Set up the SOCKS proxy
    socks.set_default_proxy(socks.SOCKS5, proxy_ip, proxy_port)
    # If authentication is required:
    # socks.set_default_proxy(socks.SOCKS5, proxy_ip, proxy_port, username=username, password=password)

    # Patch the default socket to use PySocks
    socket.socket = socks.socksocket

    在需要代理的项目直接import

    1
    from socks5_global_proxy import *

代码设置

  1. Gemini测试代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    from socks5_global_proxy import *
    from google import genai

    # 初始化客户端
    client = genai.Client(api_key="YOUR_API_KEY")

    # 发送请求
    response = client.models.generate_content(
    model="gemini-2.5-pro",
    contents="你是谁",
    )

    # 输出结果
    print(response.text)

    封装成接口来供调用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    from socks5_global_proxy import *
    from google import genai

    with open ('tishici.txt', 'r', encoding='utf-8') as f:
    tishici = f.read()

    def Gemini_replay(contents):
    # 初始化客户端
    client = genai.Client(api_key="YOUR_API_KEY")

    # 发送请求
    response = client.models.generate_content(
    model="gemini-2.5-pro",
    contents=str(tishici) + str(contents),
    )

    # 输出结果
    return response.text
  2. 消费者生产代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    import pika
    from 另一个封装好的对象类 import *
    from Gemini_api_test import *

    credentials = pika.PlainCredentials('guest', 'guest')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost',port = 5672,virtual_host = '/',credentials = credentials))
    channel = connection.channel()
    # 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
    channel.queue_declare(queue = 'craxpro', durable = False)
    # 定义一个回调函数来处理消息队列中的消息,这里是打印出来


    AAAA = 另一个封装好的对象类()


    def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    # print(body.decode())
    all_content = json.loads(body.decode())
    print(f"正在处理:{all_content['title']}")
    post_url = all_content['post_url']
    content = AAAA.get_content(post_url)
    all_content['content'] = content
    print(Gemini_replay(all_content))

    # 告诉rabbitmq,用callback来接收消息
    channel.basic_consume('另一个封装好的对象类名',callback)
    # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
    channel.start_consuming()
其他文章
cover
碎碎念
  • 25/06/17
  • 01:02
  • 未分类
目录导航 置顶
  1. 1. 环境配置
    1. 1.1. 下载
      1. 1.1.1. 平台
      2. 1.1.2. erlang环境
      3. 1.1.3. 安装RabbitMQ本体
    2. 1.2. 正常启动关闭
      1. 1.2.1. 正常启动
      2. 1.2.2. 正常关闭
    3. 1.3. 问题研究
  2. 2. RabbitMQ默认端口
  3. 3. 协同python
    1. 3.1. 简单例子
    2. 3.2. 协同AI
      1. 3.2.1. 全局代理设置
    3. 3.3. 代码设置
请输入关键词进行搜索