diff --git a/api/__init__.py b/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/base.py b/api/base.py new file mode 100644 index 0000000..0418cd3 --- /dev/null +++ b/api/base.py @@ -0,0 +1,15 @@ +from pydantic import BaseModel +from typing import Any, Union + +devices = { + +} + +class Device(BaseModel): + serial: str + + +class Response(BaseModel): + code: int = 200 + msg: str = "OK" + data: Any = None \ No newline at end of file diff --git a/api/douyin.py b/api/douyin.py new file mode 100644 index 0000000..60ef1ea --- /dev/null +++ b/api/douyin.py @@ -0,0 +1,46 @@ +import tasks.wechat +import time +import random +from .base import * +from fastapi import APIRouter + +router = APIRouter() + + +@router.post("/douyin") +async def douyin_task(item: Device): + if item.serial not in devices: + return Response(code=500, msg="设备 {} 不存在".format(item.serial)) + + d = devices[item.serial] + d.app_start("com.ss.android.ugc.aweme", stop=False) + # d.xpath("关闭").wait(timeout=1) + + width, height = d.window_size() + random.seed(time.time()) + for i in range(10): + start_x = random.randint(0, int(width)) + end_x = round(start_x + random.random(), 1) + + end_y = random.randint(0, int(height / 10)) + start_y = round(end_y * 9.75, 1) + if start_y - end_y < (height / 3): + start_y += round((height / 3), 1) + + # d.swipe_ext("up", scale=0.95, duration=0.1) + print(f"({start_x}, {start_y}) => ({end_x}, {end_y})") + d.swipe(start_x, start_y, end_x + random.random(), end_y, duration=0.05 + random.random() * 0.1) + time.sleep(5 + random.random() * 2) + return Response() + + +@router.post("/test") +async def douyin_task(item: Device): + if item.serial not in devices: + return Response(code=500, msg="设备 {} 不存在".format(item.serial)) + d = devices[item.serial] + d.app_start("com.ss.android.ugc.aweme", stop=False) + tasks.douyin.click_search_icon(d) + tasks.douyin.input_search_text(d, "老耗") + d.xpath("老耗游戏").click() + diff --git a/api/wechat.py b/api/wechat.py new file mode 100644 index 0000000..709234d --- /dev/null +++ b/api/wechat.py @@ -0,0 +1,39 @@ +import tasks.wechat +import time +from .base import * +from fastapi import APIRouter + +router = APIRouter() + +@router.post("/search") +async def wechat_search_task(item: Device): + if item.serial not in devices: + return Response(code=500, msg="设备 {} 不存在".format(item.serial)) + d = devices[item.serial] + tasks.wechat.click_search_btn(d) + + +class WechatSearchInput(Device): + search_keyword: str + input_text: str + + +@router.post("/search/input") +async def wechat_search_task(item: WechatSearchInput): + if item.serial not in devices: + return Response(code=500, msg="设备 {} 不存在".format(item.serial)) + d = devices[item.serial] + tasks.wechat.click_search_btn(d) + tasks.wechat.input_search_text(d, item.search_keyword) + tasks.wechat.click_search_result(d) + tasks.wechat.get_last_chat_text(d) + # tasks.wechat.input_chat_text(d, time.strftime("[AI] %Y-%m-%d %H:%M:%S", time.localtime())) + tasks.wechat.input_chat_text(d, item.input_text) + +@router.post("/launch") +def wechat_launch(item: Device): + if item.serial not in devices: + return Response(code=500, msg="设备 {} 不存在".format(item.serial)) + d = devices[item.serial] + + d.app_start("com.tencent.mm") \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..1d44072 --- /dev/null +++ b/app.py @@ -0,0 +1,10 @@ +from fastapi import FastAPI + +app = FastAPI() + + +import api.wechat +import api.douyin + +app.include_router(api.wechat.router, prefix="/task/app/wechat", tags=["WeChat"]) +app.include_router(api.douyin.router, prefix="/task/app/douyin", tags=["DouYin"]) \ No newline at end of file diff --git a/main.py b/main.py index 29f71c9..880c959 100644 --- a/main.py +++ b/main.py @@ -1,30 +1,8 @@ import uvicorn -from fastapi import FastAPI -from pydantic import BaseModel -from typing import Any, Union + import uiautomator2 as u2 -import time -import random - -import tasks.douyin -import tasks.wechat - -app = FastAPI() - -devices = { - -} - - -class Device(BaseModel): - serial: str - - -class Response(BaseModel): - code: int = 200 - msg: str = "OK" - data: Any = None - +from app import * +from api.base import * @app.get("/health", response_model=Response) def health(): @@ -56,77 +34,7 @@ def get_device_size(item: Device): }) -# width, height = d.window_size() -# print("width:{}, height:{}".format(width, height)) -# print(d.wlan_ip) -# print(d.serial) - -# print(d.app_current()) - -# d.app_start("com.ss.android.ugc.aweme", stop=False) -# - -@app.post("/task/app/douyin") -async def douyin_task(item: Device): - if item.serial not in devices: - return Response(code=500, msg="设备 {} 不存在".format(item.serial)) - - d = devices[item.serial] - d.app_start("com.ss.android.ugc.aweme", stop=False) - # d.xpath("关闭").wait(timeout=1) - - width, height = d.window_size() - random.seed(time.time()) - for i in range(10): - start_x = random.randint(0, int(width)) - end_x = round(start_x + random.random(), 1) - - end_y = random.randint(0, int(height / 10)) - start_y = round(end_y * 9.75, 1) - if start_y - end_y < (height / 3): - start_y += round((height / 3), 1) - - # d.swipe_ext("up", scale=0.95, duration=0.1) - print(f"({start_x}, {start_y}) => ({end_x}, {end_y})") - d.swipe(start_x, start_y, end_x + random.random(), end_y, duration=0.05 + random.random() * 0.1) - time.sleep(5 + random.random() * 2) - return Response() - - -@app.post("/task/app/douyin/test") -async def douyin_task(item: Device): - if item.serial not in devices: - return Response(code=500, msg="设备 {} 不存在".format(item.serial)) - d = devices[item.serial] - d.app_start("com.ss.android.ugc.aweme", stop=False) - tasks.douyin.click_search_icon(d) - tasks.douyin.input_search_text(d, "老耗") - d.xpath("老耗游戏").click() - - -@app.post("/task/app/wechat/search") -async def wechat_search_task(item: Device): - if item.serial not in devices: - return Response(code=500, msg="设备 {} 不存在".format(item.serial)) - d = devices[item.serial] - tasks.wechat.click_search_btn(d) - - -class WechatSearchInput(Device): - text: str - - -@app.post("/task/app/wechat/search/input") -async def wechat_search_task(item: WechatSearchInput): - if item.serial not in devices: - return Response(code=500, msg="设备 {} 不存在".format(item.serial)) - d = devices[item.serial] - tasks.wechat.click_search_btn(d) - tasks.wechat.input_search_text(d, item.text) - tasks.wechat.click_search_result(d) - tasks.wechat.get_last_chat_text(d) - tasks.wechat.input_chat_text(d, time.strftime("[AI] %Y-%m-%d %H:%M:%S", time.localtime())) if __name__ == '__main__': uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True) diff --git a/tasks/wechat/__init__.py b/tasks/wechat/__init__.py index 28ea180..1465467 100644 --- a/tasks/wechat/__init__.py +++ b/tasks/wechat/__init__.py @@ -12,7 +12,7 @@ def click_search_btn(d: Device): def input_search_text(d: Device, text: str): - clear_btn = d.xpath('//*[@resource-id="com.tencent.mm:id/nhn"]') + clear_btn = d.xpath('//*[@resource-id="com.tencent.mm:id/mdd"]/android.widget.RelativeLayout[1]') if clear_btn.exists: clear_btn.click()