Skip to content

同时订阅多个server_address的topic,产生的问题 #153

@kennard520

Description

@kennard520

`
class RocketMQListener:
    """Run a single RocketMQ push consumer until it is asked to stop.

    One instance holds the settings for one subscription (name server
    address, topic, consumer group and tag filter) plus the callback that
    receives the messages. ``start`` blocks the calling thread, so it is
    meant to be run in a dedicated thread; ``stop`` may be called from any
    other thread to end it.
    """

    def __init__(self, server_address, topic, group_id, mq_tag, callback):
        """Store the subscription settings; no connection is made here.

        Args:
            server_address: Name server address handed to the consumer.
            topic: Topic to subscribe to.
            group_id: Consumer group id.
            mq_tag: Tag expression passed to ``subscribe``.
            callback: Callable invoked with each message; its return value
                is returned to the consumer unchanged.
        """
        self.server_address = server_address
        self.topic = topic
        self.group_id = group_id
        self.mq_tag = mq_tag
        self.callback = callback
        # Created lazily in start() so that building a listener has no side effects.
        self.consumer = None
        # Set by stop() to release the thread blocked in start().
        self.stop_event = threading.Event()

    def start(self):
        """Create, configure and start the consumer, then block until stopped.

        The consumer is shut down when ``stop`` is called, and also if the
        wait is interrupted by an exception.
        """
        # NOTE(review): the report attached to this code says two listeners
        # with different name servers both ended up on the first address.
        # Presumably the native client shares one client instance per process
        # unless each consumer has a distinct instance name; consider calling
        # the consumer's instance-name setter here with a unique value
        # (verify that the installed client exposes one).
        self.consumer = PushConsumer(self.group_id)
        self.consumer.set_group(self.group_id)
        self.consumer.set_thread_count(1)
        self.consumer.set_name_server_address(self.server_address)
        self.consumer.subscribe(self.topic, self.process_message, self.mq_tag)
        self.consumer.start()
        logging.info(f"Started RocketMQ listener for {self.server_address} topic {self.topic} and group {self.group_id} and mq_tag {self.mq_tag}.")

        try:
            # Block on the event instead of polling with sleep(), so stop()
            # takes effect immediately rather than after up to five seconds.
            self.stop_event.wait()
        finally:
            self.consumer.shutdown()

    def process_message(self, message):
        """Forward a delivered message to the user callback and return its result."""
        return self.callback(message)

    def stop(self):
        """Signal ``start`` to shut the consumer down and return."""
        self.stop_event.set()

`import threading
from contextlib import asynccontextmanager

import uvicorn
from fastapi import FastAPI
from starlette.responses import JSONResponse

from rocketmq_listener import RocketMQListener
from rocketmq.client import PushConsumer, ConsumeStatus

# The FastAPI application object is created once, further down, after the
# lifespan handler is defined; an earlier throw-away instance is not needed.

# One entry per consumer, in the positional order expected by start_listener:
# (server_address, topic, group_id, tag).
brokers_and_topics = [
    ("192.168.0.137:9876", "kl-video-editing-l", "kl-group-video-process-15", "video_process_create_tag"),
    ("192.168.0.138:9876", "kl-video-editing", "kl-group-video-process-16", "video_process_create_tag"),
]

# Listener objects registered by start_listener.
listeners = []

def test(msg):
    """Demo message callback: print the message and ask for redelivery.

    Always returns ``ConsumeStatus.RECONSUME_LATER``, so every message
    handled by this callback is reported as not consumed.
    """
    print(msg)
    return ConsumeStatus.RECONSUME_LATER

def start_listener(server_address, topic, group_id, tag):
    """Build a listener for one subscription and run it in the calling thread.

    This call blocks for as long as the listener runs, so it is intended to
    be used as a thread target.

    Args:
        server_address: Name server address for this consumer.
        topic: Topic to subscribe to.
        group_id: Consumer group id.
        tag: Tag expression for the subscription.
    """
    listener = RocketMQListener(server_address, topic, group_id, tag, test)
    # Register before starting: start() does not return until the listener
    # has been stopped, so appending afterwards would leave a running
    # listener unreachable and impossible to stop.
    listeners.append(listener)
    listener.start()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start one listener thread per configured subscription; stop them on shutdown.

    Args:
        app: The application this lifespan handler is attached to (unused).
    """
    # Daemon threads: a listener that is still blocked must not keep the
    # process alive once the server has shut down.
    workers = [
        threading.Thread(target=start_listener, args=entry, daemon=True)
        for entry in brokers_and_topics
    ]
    for worker in workers:
        worker.start()
    try:
        yield
    finally:
        # Iterate over a copy; worker threads may still be registering listeners.
        for listener in list(listeners):
            listener.stop()
# Application object served by uvicorn; the lifespan handler starts and stops
# the RocketMQ listeners together with the server.
app = FastAPI(lifespan=lifespan)

# Run the server only when this file is executed as a script, not on import
# (uvicorn itself imports this module as "main" to locate `app`).
if __name__ == "__main__":
    try:
        uvicorn.run("main:app", host="0.0.0.0", port=5007, log_level="warning", workers=1)
    except Exception as e:
        # Top-level boundary: report the failure instead of a raw traceback.
        print(e)
`如上所示,我用两个线程分别订阅192.168.0.137和192.168.0.138的topic,而且groupid不一样,最后却在192.168.0.137上创建了两个消费者
image

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions