from faker import Faker\nimport random\nfrom datetime import datetime, timedelta\nfrom kafka import KafkaProducer\nfrom kafka.errors import KafkaError\nimport traceback\nimport json\nimport time\nimport subprocess\n\n# $KAFKA_HOME/bin/kafka-topics.sh --zookeeper pnode1:2181,pnode2:2181,pnode3:2181 --topic topic_lakehouse_001 --delete\n# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server pnode1:9092,pnode2:9092,pnode3:9092 --topic topic_lakehouse_001\n# $KAFKA_HOME/bin/kafka-topics.sh --zookeeper pnode1:2181,pnode2:2181,pnode3:2181 --topic topic_lakehouse_001 --describe\n# $KAFKA_HOME/bin/kafka-topics.sh --zookeeper pnode1:2181,pnode2:2181,pnode3:2181 --list\n# $KAFKA_HOME/bin/kafka-topics.sh --zookeeper pnode1:2181,pnode2:2181,pnode3:2181 --topic topic_lakehouse_001 --delete\n\n# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list pnode1:9092,pnode2:9092,pnode3:9092 --topic topic_lakehouse_001 --time -1\n# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list pnode1:9092,pnode2:9092,pnode3:9092 --topic topic_lakehouse_001 --time -2\n\n# nohup python3 /data/zxc/datagen_to_kafka.py >/data/zxc/log$(date +"%Y-%m-%d-%H-%M-%S").log 2>&1 &\n\nsubprocess.Popen("$KAFKA_HOME/bin/kafka-topics.sh --zookeeper pnode1:2181,pnode2:2181,pnode3:2181 --create --topic topic_lakehouse_001 --partitions 16 --replication-factor 3", shell=True)\n\n# 假设生产的消息为json字符串\nproducer = KafkaProducer(\n bootstrap_servers=['pnode1:9092','pnode2:9092','pnode3:9092'],\n key_serializer=lambda k: json.dumps(k).encode(),\n value_serializer=lambda v: json.dumps(v).encode())\n \nclass LoginData:\n\n def init(self):\n fake = Faker()\n self.seq = random.randint(1, 1125899906842624)\n self.password = "Password@123"\n self.email = fake.email()\n self.username = fake.first_name()\n self.first_name = fake.first_name()\n self.last_name = fake.last_name()\n self.address = fake.address().replace("\n", " ")\n self.phone = random.randint(9000000000, 9999999999)\n self.city = fake.city()\n self.about = "This is a sample text : about"\n self.date = datetime.now() - timedelta(days=random.randint(1, 365))\n self.date_str = self.date.strftime('%Y-%m-%d')\n self.datetime_str = self.date.strftime('%Y-%m-%d %H:%M:%S')\n self.ts = int(round(time.time() * 1000))\n\n def get_json(self):\n p = {\n "seq": self.seq,\n "password": self.password,\n "email": self.email,\n "username": self.first_name,\n "first_name": self.first_name,\n "last_name": self.last_name,\n "address": self.address,\n "phone": self.phone,\n "city": self.city,\n "about": self.about,\n "date_str": self.date_str,\n "datetime_str": self.datetime_str,\n "ts": self.ts\n }\n return (p, self.date_str)\n\n\n\ndef input_data(x):\n\n for i in range(0, x):\n logindata = LoginData()\n tup = logindata.get_json()\n #vv = str(tup[0]).replace("'", '"') # 替换单引号\n #print(tup[0])\n future = producer.send(\n 'topic_lakehouse_001',\n key=tup[1], # 同一个key值,会被送至同一个分区\n value=tup[0])\n #future = producer.send(\n # 'topic_lakehouse_001',\n # key=tup[1], # 同一个key值,会被送至同一个分区\n # value=tup[0],\n # partition=0) # 向分区1发送消息\n #print ("start: %s" % ( time.ctime(time.time()) ))\n try:\n future.get(timeout=10) # 监控是否发送成功\n except KafkaError as e1: # 发送失败抛出KafkaError\n #traceback.format_exc()\n print "发送失败:", e1\n except kafka.errors.UnknownTopicOrPartitionError as e2:\n print "发送失败:", e2\n #print ("end: %s" % ( time.ctime(time.time()) ))\n\n\ndef main():\n while (1 == 1): \n num_of_input = 1000\n print "*********************** num_of_input: ", num_of_input\n input_data(num_of_input)\n time.sleep(1)\n\nmain()

Python 2 Kafka 数据生成器 - 将 Python 3 代码移植到 Python 2

原文地址: https://www.cveoy.top/t/topic/qcyC 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录