结构调整

This commit is contained in:
shikong 2025-02-01 14:41:23 +08:00
parent 88f396a1c7
commit f2e6c5ec37
Signed by: Shikong
GPG Key ID: BD85FF18B373C341
7 changed files with 114 additions and 96 deletions

0
api/__init__.py Normal file
View File

15
api/base.py Normal file
View File

@ -0,0 +1,15 @@
from pydantic import BaseModel
from typing import Any, Union
devices = {
}
class Device(BaseModel):
serial: str
class Response(BaseModel):
code: int = 200
msg: str = "OK"
data: Any = None

46
api/douyin.py Normal file
View File

@ -0,0 +1,46 @@
import tasks.wechat
import time
import random
from .base import *
from fastapi import APIRouter
router = APIRouter()
@router.post("/douyin")
async def douyin_task(item: Device):
if item.serial not in devices:
return Response(code=500, msg="设备 {} 不存在".format(item.serial))
d = devices[item.serial]
d.app_start("com.ss.android.ugc.aweme", stop=False)
# d.xpath("关闭").wait(timeout=1)
width, height = d.window_size()
random.seed(time.time())
for i in range(10):
start_x = random.randint(0, int(width))
end_x = round(start_x + random.random(), 1)
end_y = random.randint(0, int(height / 10))
start_y = round(end_y * 9.75, 1)
if start_y - end_y < (height / 3):
start_y += round((height / 3), 1)
# d.swipe_ext("up", scale=0.95, duration=0.1)
print(f"({start_x}, {start_y}) => ({end_x}, {end_y})")
d.swipe(start_x, start_y, end_x + random.random(), end_y, duration=0.05 + random.random() * 0.1)
time.sleep(5 + random.random() * 2)
return Response()
@router.post("/test")
async def douyin_task(item: Device):
if item.serial not in devices:
return Response(code=500, msg="设备 {} 不存在".format(item.serial))
d = devices[item.serial]
d.app_start("com.ss.android.ugc.aweme", stop=False)
tasks.douyin.click_search_icon(d)
tasks.douyin.input_search_text(d, "老耗")
d.xpath("老耗游戏").click()

39
api/wechat.py Normal file
View File

@ -0,0 +1,39 @@
import tasks.wechat
import time
from .base import *
from fastapi import APIRouter
router = APIRouter()
@router.post("/search")
async def wechat_search_task(item: Device):
if item.serial not in devices:
return Response(code=500, msg="设备 {} 不存在".format(item.serial))
d = devices[item.serial]
tasks.wechat.click_search_btn(d)
class WechatSearchInput(Device):
search_keyword: str
input_text: str
@router.post("/search/input")
async def wechat_search_task(item: WechatSearchInput):
if item.serial not in devices:
return Response(code=500, msg="设备 {} 不存在".format(item.serial))
d = devices[item.serial]
tasks.wechat.click_search_btn(d)
tasks.wechat.input_search_text(d, item.search_keyword)
tasks.wechat.click_search_result(d)
tasks.wechat.get_last_chat_text(d)
# tasks.wechat.input_chat_text(d, time.strftime("[AI] %Y-%m-%d %H:%M:%S", time.localtime()))
tasks.wechat.input_chat_text(d, item.input_text)
@router.post("/launch")
def wechat_launch(item: Device):
if item.serial not in devices:
return Response(code=500, msg="设备 {} 不存在".format(item.serial))
d = devices[item.serial]
d.app_start("com.tencent.mm")

10
app.py Normal file
View File

@ -0,0 +1,10 @@
from fastapi import FastAPI
app = FastAPI()
import api.wechat
import api.douyin
app.include_router(api.wechat.router, prefix="/task/app/wechat", tags=["WeChat"])
app.include_router(api.douyin.router, prefix="/task/app/douyin", tags=["DouYin"])

98
main.py
View File

@ -1,30 +1,8 @@
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Any, Union
import uiautomator2 as u2
import time
import random
import tasks.douyin
import tasks.wechat
app = FastAPI()
devices = {
}
class Device(BaseModel):
serial: str
class Response(BaseModel):
code: int = 200
msg: str = "OK"
data: Any = None
from app import *
from api.base import *
@app.get("/health", response_model=Response)
def health():
@ -56,77 +34,7 @@ def get_device_size(item: Device):
})
# width, height = d.window_size()
# print("width:{}, height:{}".format(width, height))
# print(d.wlan_ip)
# print(d.serial)
# print(d.app_current())
# d.app_start("com.ss.android.ugc.aweme", stop=False)
#
@app.post("/task/app/douyin")
async def douyin_task(item: Device):
if item.serial not in devices:
return Response(code=500, msg="设备 {} 不存在".format(item.serial))
d = devices[item.serial]
d.app_start("com.ss.android.ugc.aweme", stop=False)
# d.xpath("关闭").wait(timeout=1)
width, height = d.window_size()
random.seed(time.time())
for i in range(10):
start_x = random.randint(0, int(width))
end_x = round(start_x + random.random(), 1)
end_y = random.randint(0, int(height / 10))
start_y = round(end_y * 9.75, 1)
if start_y - end_y < (height / 3):
start_y += round((height / 3), 1)
# d.swipe_ext("up", scale=0.95, duration=0.1)
print(f"({start_x}, {start_y}) => ({end_x}, {end_y})")
d.swipe(start_x, start_y, end_x + random.random(), end_y, duration=0.05 + random.random() * 0.1)
time.sleep(5 + random.random() * 2)
return Response()
@app.post("/task/app/douyin/test")
async def douyin_task(item: Device):
if item.serial not in devices:
return Response(code=500, msg="设备 {} 不存在".format(item.serial))
d = devices[item.serial]
d.app_start("com.ss.android.ugc.aweme", stop=False)
tasks.douyin.click_search_icon(d)
tasks.douyin.input_search_text(d, "老耗")
d.xpath("老耗游戏").click()
@app.post("/task/app/wechat/search")
async def wechat_search_task(item: Device):
if item.serial not in devices:
return Response(code=500, msg="设备 {} 不存在".format(item.serial))
d = devices[item.serial]
tasks.wechat.click_search_btn(d)
class WechatSearchInput(Device):
text: str
@app.post("/task/app/wechat/search/input")
async def wechat_search_task(item: WechatSearchInput):
if item.serial not in devices:
return Response(code=500, msg="设备 {} 不存在".format(item.serial))
d = devices[item.serial]
tasks.wechat.click_search_btn(d)
tasks.wechat.input_search_text(d, item.text)
tasks.wechat.click_search_result(d)
tasks.wechat.get_last_chat_text(d)
tasks.wechat.input_chat_text(d, time.strftime("[AI] %Y-%m-%d %H:%M:%S", time.localtime()))
if __name__ == '__main__':
uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True)

View File

@ -12,7 +12,7 @@ def click_search_btn(d: Device):
def input_search_text(d: Device, text: str):
clear_btn = d.xpath('//*[@resource-id="com.tencent.mm:id/nhn"]')
clear_btn = d.xpath('//*[@resource-id="com.tencent.mm:id/mdd"]/android.widget.RelativeLayout[1]')
if clear_btn.exists:
clear_btn.click()