133 lines
3.5 KiB
Python
133 lines
3.5 KiB
Python
import uvicorn
|
|
from fastapi import FastAPI
|
|
from pydantic import BaseModel
|
|
from typing import Any, Union
|
|
import uiautomator2 as u2
|
|
import time
|
|
import random
|
|
|
|
import tasks.douyin
|
|
import tasks.wechat
|
|
|
|
# ASGI application object; the route decorators in this module register on it.
app = FastAPI()

# Registry of connected device handles, keyed by device serial.
# Filled by the /device/connect handler and read by the other handlers.
devices = dict()
|
|
|
|
|
|
class Device(BaseModel):
    """Request body that identifies a target device."""

    # Serial passed to u2.connect() and used as the key of the `devices` registry.
    serial: str
|
|
|
|
|
|
class Response(BaseModel):
    """Uniform response envelope used by the route handlers.

    Constructed without arguments it represents success
    (code 200, msg "OK", no payload).
    """

    # Status code carried in the body; handlers in this file use 200 and 500.
    code: int = 200
    # Human-readable status or error text.
    msg: str = "OK"
    # Optional payload of any type.
    data: Any = None
|
|
|
|
|
|
@app.get("/health", response_model=Response)
def health():
    """Liveness probe: always answers with the default success envelope."""
    ok = Response()
    return ok
|
|
|
|
|
|
@app.post("/device/connect", response_model=Response)
def connect_device(item: Device):
    """Connect to a device by serial and register it for later requests.

    Args:
        item: Request body carrying the device serial.

    Returns:
        Response whose ``data`` is the device's ``device_info`` on success,
        or a Response with ``code=500`` and the exception text as ``msg``
        when connecting or querying the device fails.
    """
    try:
        d = u2.connect(item.serial)
        # Read both properties inside the try so that a failure while talking
        # to the device is reported through the same error envelope as a
        # failed connect. Each property is read exactly once and reused.
        info = d.info
        device_info = d.device_info
    except Exception as e:
        return Response(code=500, msg=str(e))

    print(info)
    print(device_info)

    # Register only after the device has answered successfully.
    devices[item.serial] = d
    return Response(data=device_info)
|
|
|
|
|
|
@app.post("/device/size", response_model=Response)
def get_device_size(item: Device):
    """Report the window size of a previously connected device.

    Args:
        item: Request body carrying the device serial.

    Returns:
        Response whose ``data`` holds ``width`` and ``height`` as returned by
        the device's ``window_size()``, or a Response with ``code=500`` when
        the serial is not in the ``devices`` registry.
    """
    if item.serial not in devices:
        return Response(code=500, msg="设备 {} 不存在".format(item.serial))

    d = devices[item.serial]
    width, height = d.window_size()
    return Response(data={
        "width": width,
        "height": height,
        # Misspelled key from the original payload, kept so that existing
        # clients reading "heighth" keep working.
        "heighth": height,
    })
|
|
|
|
|
|
# width, height = d.window_size()
|
|
# print("width:{}, height:{}".format(width, height))
|
|
|
|
# print(d.wlan_ip)
|
|
# print(d.serial)
|
|
|
|
# print(d.app_current())
|
|
|
|
# d.app_start("com.ss.android.ugc.aweme", stop=False)
|
|
#
|
|
|
|
@app.post("/task/app/douyin", response_model=Response)
async def douyin_task(item: Device):
    """Start the Douyin app on a connected device and swipe up ten times.

    Each swipe uses randomised start/end coordinates and duration, and is
    followed by a randomised pause of 5 to 7 seconds.

    Args:
        item: Request body carrying the device serial.

    Returns:
        The default success Response once all swipes are done, or a Response
        with ``code=500`` when the serial is not in the ``devices`` registry.

    Note:
        ``app_start`` and ``swipe`` are still synchronous device calls; only
        the long pauses between swipes yield to the event loop.
        NOTE(review): another handler later in this file is also named
        ``douyin_task``; both routes are registered, but the module-level
        name ends up bound to the later one.
    """
    # Function-local import so this handler does not depend on a change to the
    # module's import block.
    import asyncio

    if item.serial not in devices:
        return Response(code=500, msg="设备 {} 不存在".format(item.serial))

    d = devices[item.serial]
    d.app_start("com.ss.android.ugc.aweme", stop=False)

    width, height = d.window_size()
    random.seed(time.time())
    for _ in range(10):
        # Keep x within 1..width-1. uiautomator2 interprets coordinates below
        # 1 as a fraction of the screen, so a start_x of 0 could turn end_x
        # into a relative position; x == width is one pixel off-screen.
        start_x = random.randint(1, max(int(width) - 1, 1))
        end_x = round(start_x + random.random(), 1)

        # End in the top tenth of the screen and start roughly 9.75x lower.
        end_y = random.randint(0, int(height / 10))
        start_y = round(end_y * 9.75, 1)
        # Guarantee a travel distance of at least a third of the screen height.
        if start_y - end_y < (height / 3):
            start_y += round((height / 3), 1)

        print(f"({start_x}, {start_y}) => ({end_x}, {end_y})")
        d.swipe(start_x, start_y, end_x + random.random(), end_y, duration=0.05 + random.random() * 0.1)
        # Non-blocking pause: time.sleep() here would stall the event loop,
        # and with it every other request, for the whole run.
        await asyncio.sleep(5 + random.random() * 2)
    return Response()
|
|
|
|
|
|
@app.post("/task/app/douyin/test", response_model=Response)
async def douyin_task(item: Device):
    """Start Douyin, search for a fixed keyword and click a fixed result.

    Args:
        item: Request body carrying the device serial.

    Returns:
        The default success Response after the final click, or a Response
        with ``code=500`` when the serial is not in the ``devices`` registry.

    Note:
        NOTE(review): an earlier handler in this file is also named
        ``douyin_task``; both routes are registered, but this definition is
        the one the module-level name refers to.
    """
    if item.serial not in devices:
        return Response(code=500, msg="设备 {} 不存在".format(item.serial))

    d = devices[item.serial]
    d.app_start("com.ss.android.ugc.aweme", stop=False)
    tasks.douyin.click_search_icon(d)
    tasks.douyin.input_search_text(d, "老耗")
    d.xpath("老耗游戏").click()
    # Return the same envelope as the error path instead of an implicit None,
    # so clients always receive a body with code/msg/data.
    return Response()
|
|
|
|
|
|
@app.post("/task/app/wechat/search", response_model=Response)
async def wechat_search_task(item: Device):
    """Click the WeChat search button on a connected device.

    Args:
        item: Request body carrying the device serial.

    Returns:
        The default success Response after the click, or a Response with
        ``code=500`` when the serial is not in the ``devices`` registry.

    Note:
        NOTE(review): a later handler in this file reuses the name
        ``wechat_search_task``; both routes are registered, but the
        module-level name ends up bound to the later one.
    """
    if item.serial not in devices:
        return Response(code=500, msg="设备 {} 不存在".format(item.serial))

    d = devices[item.serial]
    tasks.wechat.click_search_btn(d)
    # Return the same envelope as the error path instead of an implicit None.
    return Response()
|
|
|
|
|
|
class WechatSearchInput(Device):
    """Request body for the WeChat search-and-input task: a device plus text."""

    # Text that the handler passes to tasks.wechat.input_search_text().
    text: str
|
|
|
|
|
|
@app.post("/task/app/wechat/search/input", response_model=Response)
async def wechat_search_task(item: WechatSearchInput):
    """Search WeChat for ``item.text``, open the result and send a timestamp.

    Runs, in order, the ``tasks.wechat`` helpers: click the search button,
    type the search text, click the search result, read the last chat text
    (the result is not used here), then type a chat message of the form
    ``[AI] YYYY-MM-DD HH:MM:SS`` built from the local time.

    Args:
        item: Request body carrying the device serial and the search text.

    Returns:
        The default success Response after the last step, or a Response with
        ``code=500`` when the serial is not in the ``devices`` registry.

    Note:
        NOTE(review): an earlier handler in this file is also named
        ``wechat_search_task``; both routes are registered, but this
        definition is the one the module-level name refers to.
    """
    if item.serial not in devices:
        return Response(code=500, msg="设备 {} 不存在".format(item.serial))

    d = devices[item.serial]
    tasks.wechat.click_search_btn(d)
    tasks.wechat.input_search_text(d, item.text)
    tasks.wechat.click_search_result(d)
    tasks.wechat.get_last_chat_text(d)
    tasks.wechat.input_chat_text(d, time.strftime("[AI] %Y-%m-%d %H:%M:%S", time.localtime()))
    # Return the same envelope as the error path instead of an implicit None.
    return Response()
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 8000, with
    # auto-reload enabled.
    # NOTE(review): the "main:app" import string presumes this file is
    # importable as `main` - confirm against the actual filename.
    server_options = {"host": "0.0.0.0", "port": 8000, "reload": True}
    uvicorn.run("main:app", **server_options)
|