Skip to content

API接入

import json
import threading
import time
import requests

# Prerequisite (English rendering of the note below): first add your
# external/public IP address to the provider's whitelist.
'''
首先将外网IP加入白名单
'''

# Browser-like request headers (Firefox 85 on Windows 10).
# NOTE(review): none of the requests.get calls visible in this script pass
# this dict — confirm whether it was meant to be supplied via `headers=`.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:85.0) Gecko/20100101 Firefox/85.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
    "Accept-Encoding": "gzip, deflate, br"
}

# Business logic (HTTP proxy)
def execute_http(proxy_addr):
    """Fetch the IP-echo page once through an HTTP proxy and print the result.

    Args:
        proxy_addr: Proxy endpoint, interpolated after ``http://`` to form the
            proxy URL used for both http and https traffic.

    Returns:
        None. The status code and response body are printed; if anything
        raises, the exception is printed instead and the function returns.
    """
    target = 'http://ip234.in/ip.json'
    proxy_url = 'http://{}'.format(proxy_addr)
    # The same proxy URL serves both schemes.
    proxy_map = dict.fromkeys(('http', 'https'), proxy_url)

    # Single attempt: success and failure both end the function.
    try:
        response = requests.get(target, proxies=proxy_map, timeout=5)
        print(response.status_code, "***", response.text)
    except Exception as err:
        print(err)

# Business logic (SOCKS5 proxy)
def execute_socks5(proxy_addr):
    """Fetch the IP-echo page once through a SOCKS5 proxy and print the result.

    Args:
        proxy_addr: Proxy endpoint, interpolated after ``socks5h://`` to form
            the proxy URL used for both http and https traffic.

    Returns:
        None. The status code and response body are printed; if anything
        raises, the exception is printed instead and the function returns.
    """
    target = 'http://ip234.in/ip.json'
    proxy_url = 'socks5h://{}'.format(proxy_addr)
    # The same proxy URL serves both schemes.
    proxy_map = dict.fromkeys(('http', 'https'), proxy_url)

    # Single attempt: success and failure both end the function.
    try:
        response = requests.get(target, proxies=proxy_map, timeout=5)
        print(response.status_code, "***", response.text)
    except Exception as err:
        print(err)

class ThreadFactory(threading.Thread):
    """Thread that probes one proxy address when run.

    Attributes are stored exactly as passed in:
        proxy_addr: handed to the probe function.
        proxy_type: "socks5" selects execute_socks5; any other value selects
            execute_http.
    """

    def __init__(self, proxy_addr, proxy_type):
        super().__init__()
        self.proxy_type = proxy_type
        self.proxy_addr = proxy_addr

    def run(self):
        """Dispatch to the probe function matching ``proxy_type``."""
        probe = execute_socks5 if self.proxy_type == "socks5" else execute_http
        probe(self.proxy_addr)

def test_http():
    """Repeatedly pull a batch of proxies from the list API and probe each one.

    Runs forever: each round requests the proxy list, starts one
    ``ThreadFactory(ip, "http")`` per returned address, waits for all of them
    to finish, then sleeps one second before the next round.

    Returns:
        Never returns normally; exceptions inside a round are printed and the
        loop continues after a short pause.
    """
    # Fetch 5 proxies per request (plain-text format)
    get_proxy_url = 'http://list.rola.info:8088/user_get_ip_list?token=TOKEN&qty=5&country=&state=&city=&time=5&format=txt&protocol=http&filter=1'

    while True:
        try:
            resp = requests.get(url=get_proxy_url, timeout=5)
            if resp.status_code != 200:
                print("提取IP失败")
                time.sleep(1)
                continue
            # splitlines() handles both "\r\n" and "\n"; blank entries (e.g.
            # from a trailing newline) are dropped so no thread is started
            # with an empty proxy address.
            ips = [line.strip() for line in resp.text.splitlines() if line.strip()]
            threads = [ThreadFactory(ip, "http") for ip in ips]
            for t in threads:
                t.start()
                # Small stagger between thread starts.
                time.sleep(0.01)
            for t in threads:
                t.join()

            # break
            time.sleep(1)
        except Exception as e:
            print(e)
            # Pause before retrying so a persistent failure (e.g. the list
            # API being unreachable) does not turn into a tight loop.
            time.sleep(1)

def test_socks5():
    """Repeatedly pull a batch of proxies from the list API and probe each via SOCKS5.

    Runs forever: each round requests the proxy list, starts one
    ``ThreadFactory(ip, "socks5")`` per returned address, waits for all of
    them to finish, then sleeps one second before the next round.

    Returns:
        Never returns normally; exceptions inside a round are printed and the
        loop continues after a short pause.
    """
    # Fetch 5 proxies per request (plain-text format)
    # NOTE(review): the query string asks for protocol=http although the
    # returned addresses are used as SOCKS5 proxies — confirm against the
    # provider's API whether a different protocol value is required here.
    get_proxy_url = 'http://list.rola.info:8088/user_get_ip_list?token=TOKEN&qty=5&country=&state=&city=&time=5&format=txt&protocol=http&filter=1'

    while True:
        try:
            resp = requests.get(url=get_proxy_url, timeout=5)
            if resp.status_code != 200:
                print("提取IP失败")
                time.sleep(1)
                continue
            # splitlines() handles both "\r\n" and "\n"; blank entries (e.g.
            # from a trailing newline) are dropped so no thread is started
            # with an empty proxy address.
            ips = [line.strip() for line in resp.text.splitlines() if line.strip()]
            threads = [ThreadFactory(ip, "socks5") for ip in ips]
            for t in threads:
                t.start()
                # Small stagger between thread starts.
                time.sleep(0.01)
            for t in threads:
                t.join()

            # break
            time.sleep(1)
        except Exception as e:
            print(e)
            # Pause before retrying so a persistent failure (e.g. the list
            # API being unreachable) does not turn into a tight loop.
            time.sleep(1)

# Script entry point: exactly one of the two test loops is enabled at a time.
if __name__ == "__main__":
    # Test HTTP proxies (currently disabled)
    #test_http()

    # Test SOCKS5 proxies
    test_socks5()

账密接入

import json
import threading
import time
import requests

# Browser-like request headers (Firefox 85 on Windows 10).
# NOTE(review): none of the requests.get calls visible in this script pass
# this dict — confirm whether it was meant to be supplied via `headers=`.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:85.0) Gecko/20100101 Firefox/85.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
    "Accept-Encoding": "gzip, deflate, br"
}

# Business logic (HTTP gateway)
def execute_http(user_name,password):
    """Fetch the IP-echo page once through the authenticated HTTP gateway.

    Args:
        user_name: Proxy user name, placed in the userinfo part of the proxy
            URL exactly as given.
        password: Proxy password, placed in the proxy URL exactly as given.

    Returns:
        None. The status code and response body are printed; if anything
        raises, the exception is printed instead and the function returns.
    """
    target = 'http://ip234.in/ip.json'
    gateway = 'http://{}:{}@gate.rola.info:1000'.format(user_name, password)
    # The same gateway URL serves both schemes.
    proxy_map = dict.fromkeys(('http', 'https'), gateway)

    # Single attempt: success and failure both end the function.
    try:
        response = requests.get(target, proxies=proxy_map, timeout=5)
        print(response.status_code, "***", response.text)
    except Exception as err:
        print(err)

# Business logic (SOCKS5 gateway)
def execute_socks5(user_name,password):
    """Fetch the IP-echo page once through the authenticated SOCKS5 gateway.

    Args:
        user_name: Proxy user name, placed in the userinfo part of the proxy
            URL exactly as given.
        password: Proxy password, placed in the proxy URL exactly as given.

    Returns:
        None. The status code and response body are printed; if anything
        raises, the exception is printed instead and the function returns.
    """
    target = 'http://ip234.in/ip.json'
    gateway = 'socks5h://{}:{}@gate.rola.info:2000'.format(user_name, password)
    # The same gateway URL serves both schemes.
    proxy_map = dict.fromkeys(('http', 'https'), gateway)

    # Single attempt: success and failure both end the function.
    try:
        response = requests.get(target, proxies=proxy_map, timeout=5)
        print(response.status_code, "***", response.text)
    except Exception as err:
        print(err)

class ThreadFactory(threading.Thread):
    """Thread that probes the gateway with one set of credentials when run.

    Attributes are stored exactly as passed in:
        user_name: proxy user name handed to the probe function.
        password: proxy password handed to the probe function.
        proxy_type: "socks5" selects execute_socks5; any other value selects
            execute_http.
    """

    def __init__(self, user_name, password, proxy_type):
        super().__init__()
        self.proxy_type = proxy_type
        self.user_name = user_name
        self.password = password

    def run(self):
        """Dispatch to the probe function matching ``proxy_type``."""
        probe = execute_socks5 if self.proxy_type == "socks5" else execute_http
        probe(self.user_name, self.password)

def test_http():
    """Repeatedly probe the HTTP gateway, using a fresh session id each round.

    Runs forever: each round builds a proxy user name of the form
    ``USERNAME-country-COUNTRYCODE-sid-SESSION``, runs one
    ``ThreadFactory(..., "http")`` to completion, then sleeps one second.

    Returns:
        Never returns normally; exceptions inside a round are printed and the
        loop continues after a short pause.
    """
    # Proxy account user name
    user_name = "USERNAME"
    # Proxy account password
    password = "PASSWORD"
    # Country code
    country = "us"
    while True:
        try:
            # Build the proxy user name; format:
            # USERNAME-country-COUNTRYCODE-sid-SESSION.
            # The microsecond timestamp is truncated to an int so the session
            # id has no trailing ".0", matching what test_socks5 generates.
            session_id = int(time.time() * 1000000)
            proxy_user = "{}-country-{}-sid-{}".format(user_name, country, session_id)

            threads = [ThreadFactory(proxy_user, password, "http")]
            for t in threads:  # start the threads
                t.start()
                time.sleep(0.01)
            for t in threads:  # wait for the threads to finish
                t.join()
            # break
            time.sleep(1)
        except Exception as e:
            print(e)
            # Pause before retrying so a persistent failure does not turn
            # into a tight loop.
            time.sleep(1)

def test_socks5():
    """Repeatedly probe the SOCKS5 gateway, using a fresh session id each round.

    Runs forever: each round builds a proxy user name of the form
    ``USERNAME-country-COUNTRYCODE-sid-SESSION`` (SESSION is the current time
    in integer microseconds), runs one ``ThreadFactory(..., "socks5")`` to
    completion, then sleeps one second. Exceptions inside a round are printed
    and the loop continues.
    """
    # Proxy account credentials and target country code
    account = "USERNAME"
    secret = "PASSWORD"
    country_code = "us"

    while True:
        try:
            # Session id derived from the microsecond timestamp
            session_id = int(time.time() * 1000000)
            proxy_user = "{}-country-{}-sid-{}".format(account, country_code, session_id)

            # One worker per round: start it, pause briefly, wait for it.
            worker = ThreadFactory(proxy_user, secret, "socks5")
            worker.start()
            time.sleep(0.01)
            worker.join()

            # break
            time.sleep(1)
        except Exception as err:
            print(err)

# Script entry point: exactly one of the two test loops is enabled at a time.
if __name__ == "__main__":
    # Test the HTTP gateway
    test_http()

    # Test the SOCKS5 gateway (currently disabled)
    #test_socks5()

专业的IP代理服务平台