fastapi 封装

This commit is contained in:
shikong 2024-08-01 20:39:20 +08:00
parent 447aaa5468
commit 546b6caa90
Signed by: Shikong
GPG Key ID: BD85FF18B373C341

108
main.py
View File

@ -1,33 +1,87 @@
import math import uvicorn
import time from fastapi import FastAPI
from pydantic import BaseModel
from typing import Any, Union
import uiautomator2 as u2 import uiautomator2 as u2
import time
import random import random
d = u2.connect("10.10.10.27:5555")
print(d.info)
print(d.device_info)
width, height = d.window_size() app = FastAPI()
print("width:{}, height:{}".format(width, height))
print(d.wlan_ip) devices = {
print(d.serial)
print(d.app_current())
d.app_start("com.ss.android.ugc.aweme", stop=False)
# d.xpath("关闭").wait(timeout=1)
random.seed(time.time())
for i in range(10):
startX = random.randint(0,int(width))
endX = round(startX + random.random(), 1)
endY = random.randint(0, int(height/10)) }
startY = round(endY * 9.75, 1)
class Device(BaseModel):
serial: str
class Response(BaseModel):
code: int = 200
msg: str = "OK"
data: Any = None
@app.get("/health", response_model=Response)
def health():
return Response()
@app.post("/device/connect", response_model=Response)
def connect_device(item: Device):
try:
d = u2.connect(item.serial)
except Exception as e:
return Response(code=500, msg=str(e))
print(d.info)
print(d.device_info)
devices[item.serial] = d
return Response(data=d.device_info)
@app.post("/device/size", response_model=Response)
def get_device_size(item: Device):
if item.serial not in devices:
return Response(code=500, msg="设备 {} 不存在".format(item.serial))
# d.swipe_ext("up", scale=0.95, duration=0.1) d = devices[item.serial]
print(f"({startX}, {startY}) => ({endX}, {endY})") width, height = d.window_size()
d.swipe(startX, startY, endX + random.random(), endY, duration=0.05 + random.random() * 0.1) return Response(data={
time.sleep(5 + random.random() * 2) "width": width,
"heighth": height,
})
# width, height = d.window_size()
# print("width:{}, height:{}".format(width, height))
# print(d.wlan_ip)
# print(d.serial)
# print(d.app_current())
# d.app_start("com.ss.android.ugc.aweme", stop=False)
#
@app.post("/task/app/douyin")
def douyin_task(item: Device):
if item.serial not in devices:
return {"error": "设备 {} 不存在".format(item.serial)}
d = devices[item.serial]
d.app_start("com.ss.android.ugc.aweme", stop=False)
# d.xpath("关闭").wait(timeout=1)
width, height = d.window_size()
random.seed(time.time())
for i in range(10):
startX = random.randint(0,int(width))
endX = round(startX + random.random(), 1)
endY = random.randint(0, int(height/10))
startY = round(endY * 9.75, 1)
# d.swipe_ext("up", scale=0.95, duration=0.1)
print(f"({startX}, {startY}) => ({endX}, {endY})")
d.swipe(startX, startY, endX + random.random(), endY, duration=0.05 + random.random() * 0.1)
time.sleep(5 + random.random() * 2)
if __name__ == '__main__':
uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True)