"""Download the jiakaobaodian question bank (car / kemu1) into a MySQL table."""
import random
import time

import execjs
import pymysql
import requests

url = {
    "sequence": "https://api2.jiakaobaodian.com/api/open/exercise/sequence.htm",
    "question": "https://api2.jiakaobaodian.com/api/open/question/question-list.htm",
}
cityCode = 440500
TABLE = "jiaKaoBaoDian"

# Seconds to wait for an HTTP response before giving up; without a timeout
# ``requests`` may block forever on a stalled connection.
REQUEST_TIMEOUT = 30

# Number of question ids requested per call to the question-list endpoint.
BATCH_SIZE = 20
# Pause for a random 5-15 seconds after every THROTTLE_EVERY batches.
THROTTLE_EVERY = 10

# Destination columns, in insert order.
INSERT_COLUMNS = (
    "answer", "id", "mediaContent", "question", "questionId",
    "optionA", "optionB", "optionC", "optionD",
    "optionE", "optionF", "optionG", "optionH",
    "keywords", "illiteracyExplain", "conciseExplain", "explain",
    "knack", "wrongRate",
)
# Columns whose value is stored under a different key in the API payload.
_FIELD_FOR_COLUMN = {"knack": "knackDetail"}

# Parameterized INSERT: only the (constant) table and column names are
# formatted in; every value is passed separately and escaped by the driver.
INSERT_SQL = "INSERT INTO `{table}` ({columns}) VALUES ({marks})".format(
    table=TABLE,
    columns=",".join("`%s`" % column for column in INSERT_COLUMNS),
    marks=", ".join(["%s"] * len(INSERT_COLUMNS)),
)

# JavaScript that produces the ``_r`` request parameter. The tokens are kept
# exactly as in the original snippet; only the line layout was restored.
_R_SCRIPT = """
s = function (t) {
    var a, i, o = Math.abs(parseInt((new Date).getTime() * Math.random() * 1e4)).toString(), n = 0;
    for (a = 0; a < o.length; a++)
        n += parseInt(o[a]);
    return i = function(t) {
        return function(a, i) {
            return 0 >= i - "" + a.length ? a : (t[i] || (t[i] = Array(i + 1).join(0))) + a
        }
    }([]),
    n += o.length,
    n = i(n, 3 - n.toString().length),
    t.toString() + o + n
}
function get_r() {
    return s(1)
}
"""


def calc_r():
    """Return a fresh ``_r`` request token by running ``get_r`` in JavaScript."""
    ctx = execjs.compile(_R_SCRIPT)
    return ctx.call("get_r")


def get_sequence():
    """Fetch the list of question ids from the ``sequence`` endpoint.

    Returns:
        A new list holding the ``data`` field of the JSON response.

    Raises:
        RuntimeError: if the endpoint answers with a non-200 status code.
    """
    resp = requests.get(
        url["sequence"],
        params={
            "_r": calc_r(),
            "carStyle": "xiaoche",
            "carType": "car",
            "cityCode": cityCode,
            "course": "kemu1",
            # NOTE(review): "kemuStyyle" looks like a typo for "kemuStyle";
            # kept as-is because the API contract is not visible here.
            "kemuStyyle": "kemu1",
            "_": 0.08272777821960653,
        },
        timeout=REQUEST_TIMEOUT,
    )
    if resp.status_code != 200:
        print(resp.text)
        raise RuntimeError("获取 题库 信息失败")

    question_list = list(resp.json()["data"])
    print("获取 题库信息 共计 %d 道题" % len(question_list))
    return question_list


def get_question(questions):
    """Fetch the full records for a batch of question ids.

    The request is attempted up to five times; the error from the last
    attempt is re-raised if none of them succeeds.

    Args:
        questions: iterable of question ids; they are sent comma-separated.

    Returns:
        The ``data`` field of the JSON response.

    Raises:
        RuntimeError: if the endpoint answers with a non-200 status code on
            the final attempt.
        Exception: any other error raised by the final attempt.
    """
    retry_time = 5
    question_ids = ",".join(str(question) for question in questions)
    for attempt in range(1, retry_time + 1):
        try:
            params = {
                "_r": calc_r(),
                "carType": "car",
                "course": "kemu1",
                "_": 0.06614633144515003,
                "questionIds": question_ids,
            }
            resp = requests.get(
                url["question"], params, timeout=REQUEST_TIMEOUT
            )
            if resp.status_code != 200:
                print(resp.text)
                raise RuntimeError("获取 题目 信息失败")
            data = resp.json()["data"]
            print("获取 %s 道题目 信息成功" % len(data))
            return data
        # ``Exception`` rather than ``BaseException`` so that Ctrl-C and
        # SystemExit abort immediately instead of being retried.
        except Exception:
            if attempt == retry_time:
                print("%s 重试 %d 次" % (questions, attempt))
                raise


def _insert_question(db, cursor, datum):
    """Insert one question record and commit; report (not raise) DB failures."""
    values = [
        datum[_FIELD_FOR_COLUMN.get(column, column)]
        for column in INSERT_COLUMNS
    ]
    try:
        cursor.execute(INSERT_SQL, values)
        db.commit()
    except Exception as exc:
        # Best-effort import: a failed row is skipped, but no longer silently.
        print("insert failed for question %s: %s" % (datum["questionId"], exc))


def main():
    """Download every question in batches and store each one in MySQL."""
    # NOTE(review): connection credentials are hard-coded; consider moving
    # them to configuration or environment variables.
    db = pymysql.connect(
        host="10.10.10.200",
        port=3306,
        user="root",
        password="12341234",
        database="car",
    )
    try:
        with db.cursor() as cursor:
            question_ids = get_sequence()
            batch_starts = range(0, len(question_ids), BATCH_SIZE)
            for batch_no, start in enumerate(batch_starts, start=1):
                batch = question_ids[start:start + BATCH_SIZE]
                data = get_question(batch)
                # The original compared with ``==`` against a counter that
                # never reset, so it paused only once; pause periodically.
                if batch_no % THROTTLE_EVERY == 0:
                    time.sleep(5 + (random.random() * 10))
                for datum in data:
                    _insert_question(db, cursor, datum)
    finally:
        db.close()


if __name__ == '__main__':
    main()