This commit is contained in:
Shikong 2024-03-30 20:29:33 +08:00
parent a627cc1a1b
commit 346ffe3d23
2 changed files with 34 additions and 0 deletions

View File

@ -0,0 +1,34 @@
from concurrent.futures import ThreadPoolExecutor
from pykafka import KafkaClient
import json
client = KafkaClient(hosts="10.10.10.200:9192,10.10.10.200:9292,10.10.10.200:9392")
topic = client.topics['test']
# workers = os.cpu_count()
workers = 1000
max_message = 1000000
def send(producer, num: int):
data = {
"data": "这是一个消息,num=%s" % num
}
try:
print("发送第 %s 条消息" % num)
producer.produce(json.dumps(data).encode(), partition_key="{}".format(num).encode())
except Exception as e:
print(e)
if __name__ == '__main__':
for topics in client.topics.keys():
print(topics)
with topic.get_producer(sync=False, delivery_reports=False) as producer:
with ThreadPoolExecutor(max_workers=workers) as worker:
for i in range(max_message):
worker.submit(send, producer, i)