From 346ffe3d23e876db7108dae53c455751db381f7f Mon Sep 17 00:00:00 2001 From: Shikong <919411476@qq.com> Date: Sat, 30 Mar 2024 20:29:33 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../python/{producer.py => api_producer.py} | 0 .../src/test/python/kafka_producer.py | 34 +++++++++++++++++++ 2 files changed, 34 insertions(+) rename kafka-01/kafka-01-producer/src/test/python/{producer.py => api_producer.py} (100%) create mode 100644 kafka-01/kafka-01-producer/src/test/python/kafka_producer.py diff --git a/kafka-01/kafka-01-producer/src/test/python/producer.py b/kafka-01/kafka-01-producer/src/test/python/api_producer.py similarity index 100% rename from kafka-01/kafka-01-producer/src/test/python/producer.py rename to kafka-01/kafka-01-producer/src/test/python/api_producer.py diff --git a/kafka-01/kafka-01-producer/src/test/python/kafka_producer.py b/kafka-01/kafka-01-producer/src/test/python/kafka_producer.py new file mode 100644 index 0000000..9dfb0c7 --- /dev/null +++ b/kafka-01/kafka-01-producer/src/test/python/kafka_producer.py @@ -0,0 +1,34 @@ +from concurrent.futures import ThreadPoolExecutor + +from pykafka import KafkaClient + +import json + +client = KafkaClient(hosts="10.10.10.200:9192,10.10.10.200:9292,10.10.10.200:9392") + +topic = client.topics['test'] + +# workers = os.cpu_count() +workers = 1000 +max_message = 1000000 + + +def send(producer, num: int): + data = { + "data": "这是一个消息,num=%s" % num + } + try: + print("发送第 %s 条消息" % num) + producer.produce(json.dumps(data).encode(), partition_key="{}".format(num).encode()) + except Exception as e: + print(e) + + +if __name__ == '__main__': + for topics in client.topics.keys(): + print(topics) + + with topic.get_producer(sync=False, delivery_reports=False) as producer: + with ThreadPoolExecutor(max_workers=workers) as worker: + for i in range(max_message): + worker.submit(send, producer, i)