Skip to content

API Integration

import json
import threading
import time
import requests

'''
First add your public IP to the whitelist
'''

# Browser-style request headers (the User-Agent identifies Firefox 85 on
# 64-bit Windows 10).
# NOTE(review): none of the requests.get() calls visible in this listing pass
# `headers=`, so this dict appears to be unused -- confirm whether it was
# meant to be sent with the probe requests.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:85.0) Gecko/20100101 Firefox/85.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
    "Accept-Encoding": "gzip, deflate, br"
}

# Business logic (HTTP) 
def execute_http(proxy_addr):
    """Issue one GET to the target URL through an HTTP proxy and print the result.

    ``proxy_addr`` is appended to ``http://`` to form the proxy URL, which is
    used for both the ``http`` and ``https`` schemes. A single attempt is
    made with a 5 second timeout: on success the status code and body are
    printed, and any exception is printed instead of being raised.
    Returns None.
    """
    proxy_url = 'http://{}'.format(proxy_addr)
    route = {'http': proxy_url, 'https': proxy_url}

    try:
        reply = requests.get('http://ip234.in/ip.json', proxies=route, timeout=5)
        # Printing stays inside the try so a failure while rendering the
        # response is reported the same way as a request failure.
        print(reply.status_code, "***", reply.text)
    except Exception as err:
        print(err)

# Business logic (SOCKS5) 
def execute_socks5(proxy_addr):
    """Issue one GET to the target URL through a SOCKS5 proxy and print the result.

    The proxy URL is ``socks5h://`` followed by ``proxy_addr`` and is applied
    to both the ``http`` and ``https`` schemes. Exactly one request is sent
    (5 second timeout); the status code and body are printed on success and
    the exception is printed on failure. Returns None.
    """
    endpoint = 'socks5h://{}'.format(proxy_addr)
    try:
        response = requests.get(
            'http://ip234.in/ip.json',
            proxies=dict(http=endpoint, https=endpoint),
            timeout=5,
        )
        print(response.status_code, "***", response.text)
    except Exception as exc:
        # Errors are reported, never propagated to the caller.
        print(exc)

class ThreadFactory(threading.Thread):
    """Thread that runs one proxy probe for a single proxy address.

    ``proxy_type`` equal to ``"socks5"`` selects ``execute_socks5``; any
    other value selects ``execute_http``.
    """

    def __init__(self, proxy_addr, proxy_type):
        super().__init__()
        self.proxy_type = proxy_type
        self.proxy_addr = proxy_addr

    def run(self):
        """Run the probe matching ``proxy_type`` against ``proxy_addr``."""
        probe = execute_socks5 if self.proxy_type == "socks5" else execute_http
        probe(self.proxy_addr)

def test_http():
    """Continuously fetch a batch of proxies and probe each one over HTTP.

    Each round requests a plain-text proxy list from the extraction URL
    (``qty=5``), starts one ``ThreadFactory`` per returned address in
    ``"http"`` mode, waits for all of them to finish, then pauses one second.
    The loop has no exit condition; it runs until the process is
    interrupted. Errors are printed and the loop carries on.
    """
    # Extract 5 at a time (text format)
    get_proxy_url = 'http://list.rola.info:8088/user_get_ip_list?token=TOKEN&qty=5&country=&state=&city=&time=5&format=txt&protocol=http&filter=1'

    while True:
        try:
            resp = requests.get(url=get_proxy_url, timeout=5)
            if resp.status_code == 200:
                # splitlines() copes with "\r\n" and "\n" alike, and blank
                # entries (e.g. from a trailing newline) are dropped so no
                # thread is started with an empty proxy address.
                ips = [line.strip() for line in resp.text.splitlines() if line.strip()]
                threads = [ThreadFactory(ip, "http") for ip in ips]
                for t in threads:
                    t.start()
                    time.sleep(0.01)  # slight stagger between thread starts
                for t in threads:
                    t.join()
            else:
                print("Failed to extract IP")
        except Exception as e:
            # Best-effort polling loop: report the problem and try again.
            print(e)
        # Pause on every path, including errors, so a persistent failure
        # cannot turn into a tight retry loop.
        time.sleep(1)

def test_socks5():
    """Continuously fetch a batch of proxies and probe each one over SOCKS5.

    Each round requests a plain-text proxy list from the extraction URL
    (``qty=5``), starts one ``ThreadFactory`` per returned address in
    ``"socks5"`` mode, waits for all of them to finish, then pauses one
    second. The loop has no exit condition; it runs until the process is
    interrupted. Errors are printed and the loop carries on.
    """
    # Extract 5 at a time (text format)
    # NOTE(review): the query string asks for `protocol=http` even though the
    # addresses are then used as SOCKS5 proxies -- confirm against the
    # extraction API whether a SOCKS5 protocol value should be requested here.
    get_proxy_url = 'http://list.rola.info:8088/user_get_ip_list?token=TOKEN&qty=5&country=&state=&city=&time=5&format=txt&protocol=http&filter=1'

    while True:
        try:
            resp = requests.get(url=get_proxy_url, timeout=5)
            if resp.status_code == 200:
                # splitlines() copes with "\r\n" and "\n" alike, and blank
                # entries (e.g. from a trailing newline) are dropped so no
                # thread is started with an empty proxy address.
                ips = [line.strip() for line in resp.text.splitlines() if line.strip()]
                threads = [ThreadFactory(ip, "socks5") for ip in ips]
                for t in threads:
                    t.start()
                    time.sleep(0.01)  # slight stagger between thread starts
                for t in threads:
                    t.join()
            else:
                print("Failed to extract IP")
        except Exception as e:
            # Best-effort polling loop: report the problem and try again.
            print(e)
        # Pause on every path, including errors, so a persistent failure
        # cannot turn into a tight retry loop.
        time.sleep(1)

# Script entry point: runs the SOCKS5 test. test_socks5() loops without an
# exit condition, so only one of the two tests can be active at a time.
if __name__ == "__main__":
    # To test HTTP instead, call test_http() here in place of test_socks5().

    # Test SOCKS5
    test_socks5()

Account/Password Integration

import json
import threading
import time
import requests

# Browser-style request headers (the User-Agent identifies Firefox 85 on
# 64-bit Windows 10).
# NOTE(review): none of the requests.get() calls visible in this listing pass
# `headers=`, so this dict appears to be unused -- confirm whether it was
# meant to be sent with the probe requests.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:85.0) Gecko/20100101 Firefox/85.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
    "Accept-Encoding": "gzip, deflate, br"
}

# Business logic (HTTP)
def execute_http(user_name,password):
    """Issue one GET to the target URL through the authenticated HTTP gateway.

    The proxy URL is ``http://USER:PASSWORD@gate.rola.info:1000`` and is used
    for both the ``http`` and ``https`` schemes. A single attempt is made
    with a 5 second timeout; the status code and body are printed on
    success, and any exception is printed instead of being raised.

    Args:
        user_name: Raw (not percent-encoded) proxy user name.
        password: Raw (not percent-encoded) proxy password.

    Returns:
        None.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    targetUrl = 'http://ip234.in/ip.json'

    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # cannot be mistaken for URL delimiters when the proxy URL is parsed.
    proxy_url = 'http://{}:{}@gate.rola.info:1000'.format(
        quote(str(user_name), safe=''),
        quote(str(password), safe=''),
    )
    proxies = {
        'http': proxy_url,
        'https': proxy_url,
    }

    try:
        res = requests.get(targetUrl, proxies=proxies, timeout=5)
        print(res.status_code, "***", res.text)
    except Exception as e:
        # Best-effort probe: report the failure rather than propagate it.
        print(e)

# Business logic (SOCKS5)
def execute_socks5(user_name,password):
    """Issue one GET to the target URL through the authenticated SOCKS5 gateway.

    The proxy URL is ``socks5h://USER:PASSWORD@gate.rola.info:2000`` and is
    used for both the ``http`` and ``https`` schemes. A single attempt is
    made with a 5 second timeout; the status code and body are printed on
    success, and any exception is printed instead of being raised.

    Args:
        user_name: Raw (not percent-encoded) proxy user name.
        password: Raw (not percent-encoded) proxy password.

    Returns:
        None.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    targetUrl = 'http://ip234.in/ip.json'

    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # cannot be mistaken for URL delimiters when the proxy URL is parsed.
    proxy_url = 'socks5h://{}:{}@gate.rola.info:2000'.format(
        quote(str(user_name), safe=''),
        quote(str(password), safe=''),
    )
    proxies = {
        'http': proxy_url,
        'https': proxy_url,
    }

    try:
        res = requests.get(targetUrl, proxies=proxies, timeout=5)
        print(res.status_code, "***", res.text)
    except Exception as e:
        # Best-effort probe: report the failure rather than propagate it.
        print(e)

class ThreadFactory(threading.Thread):
    """Thread that runs one gateway probe with the given credentials.

    ``proxy_type`` equal to ``"socks5"`` selects ``execute_socks5``; any
    other value selects ``execute_http``.
    """

    def __init__(self, user_name, password, proxy_type):
        super().__init__()
        self.proxy_type = proxy_type
        self.user_name = user_name
        self.password = password

    def run(self):
        """Run the probe matching ``proxy_type`` with the stored credentials."""
        credentials = (self.user_name, self.password)
        if self.proxy_type != "socks5":
            execute_http(*credentials)
            return
        execute_socks5(*credentials)

def test_http():
    """Continuously probe the HTTP gateway, using a fresh session id each round.

    Every round builds a proxy user name of the form
    ``USERNAME-country-COUNTRYCODE-sid-SESSION``, where SESSION is the
    current time in integer microseconds, runs one ``ThreadFactory`` in
    ``"http"`` mode, waits for it to finish and then pauses one second.
    The loop has no exit condition; it runs until the process is
    interrupted. Errors are printed and the loop carries on.
    """
    # Proxy username
    user_name = "USERNAME"
    # Proxy password
    password = "PASSWORD"
    # Country code
    country = "us"

    while True:
        try:
            # int() keeps the session id a plain digit string; formatting the
            # float directly would append ".0" to it.
            session_id = int(time.time() * 1000000)
            proxy_user = "{}-country-{}-sid-{}".format(user_name, country, session_id)

            threads = [ThreadFactory(proxy_user, password, "http")]
            for t in threads:  # Start threads
                t.start()
                time.sleep(0.01)
            for t in threads:  # Wait for threads to finish
                t.join()
        except Exception as e:
            # Best-effort loop: report the problem and try again.
            print(e)
        # Pause on every path, including errors, so a persistent failure
        # cannot turn into a tight retry loop.
        time.sleep(1)

def test_socks5():
    """Continuously probe the SOCKS5 gateway, using a fresh session id each round.

    Every round builds a proxy user name of the form
    ``USERNAME-country-COUNTRYCODE-sid-SESSION``, where SESSION is the
    current time in integer microseconds, runs one ``ThreadFactory`` in
    ``"socks5"`` mode, waits for it to finish and then sleeps one second.
    The loop has no exit condition; exceptions are printed and the next
    round starts immediately.
    """
    account = "USERNAME"   # proxy username
    secret = "PASSWORD"    # proxy password
    region = "us"          # country code

    while True:
        try:
            session_id = int(time.time() * 1000000)
            proxy_user = "{}-country-{}-sid-{}".format(account, region, session_id)

            worker = ThreadFactory(proxy_user, secret, "socks5")
            worker.start()
            time.sleep(0.01)
            worker.join()

            time.sleep(1)
        except Exception as err:
            print(err)

# Script entry point: runs the HTTP test. test_http() loops without an exit
# condition, so only one of the two tests can be active at a time.
if __name__ == "__main__":
    # Test HTTP
    test_http()

    # To test SOCKS5 instead, call test_socks5() here in place of test_http().

Professional IP Proxy Service Platform