REST API support

This commit is contained in:
fbarbu15 2023-11-03 17:01:00 +02:00
parent 7e76bedfc7
commit 69e8be0371
No known key found for this signature in database
GPG Key ID: D75221C8DEA22501
14 changed files with 226 additions and 145 deletions

View File

@ -16,11 +16,20 @@ on:
required: false
type: string
default: "wakuorg/go-waku:latest"
protocol:
description: "Protocol used to comunicate inside the network"
required: true
type: choice
default: "REST"
options:
- "REST"
- "RPC"
env:
FORCE_COLOR: "1"
NODE_1: ${{ inputs.node1 || 'wakuorg/nwaku:deploy-wakuv2-test' }}
NODE_2: ${{ inputs.node2 || 'wakuorg/go-waku:latest' }}
PROTOCOL: ${{ inputs.protocol || 'REST' }}
jobs:

View File

@ -1,8 +1,8 @@
# waku-interop-tests
Waku interop testing between various implementation of the [Waku v2 protocol](https://rfc.vac.dev/spec/10/).
Waku e2e and interop framework used to test various implementations of the [Waku v2 protocol](https://rfc.vac.dev/spec/10/).
## Setup
## Setup and contribute
```shell
git clone git@github.com:waku-org/waku-interop-tests.git
@ -10,9 +10,16 @@ cd waku-interop-tests
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
pre-commit install
(optional) Override the defaults from src/env_vars.py via CLI env vars or by adding a .env file (see the example below)
pytest
```
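For illustration, a hypothetical `.env` file in the repo root could look like this; the image tags and protocol value are examples only, matching the variable names defined in `src/env_vars.py`:

```
NODE_1=wakuorg/nwaku:latest
NODE_2=wakuorg/go-waku:latest
PROTOCOL=REST
```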
## CI
- Tests run via GitHub Actions
- [Allure Test Reports](https://waku-org.github.io/waku-interop-tests/3/) are published via GitHub Pages
## License
Licensed and distributed under either of

View File

@ -1,5 +1,5 @@
[pytest]
addopts = --instafail --tb=short --color=auto
addopts = -s --instafail --tb=short --color=auto
log_level = DEBUG
log_cli = True
log_file = log/test.log

View File

@ -1,11 +1,13 @@
allure-pytest
black
docker
marshmallow-dataclass
pre-commit
pyright
pytest
pytest-instafail
pytest-xdist
pytest-rerunfailures
python-dotenv
requests
tenacity

View File

@ -1,24 +1,24 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
from marshmallow_dataclass import class_schema
from typing import Optional
@dataclass
class KeyPair:
privateKey: str
publicKey: str
@dataclass
class MessageRpcQuery:
payload: str # Hex encoded data string without `0x` prefix.
contentTopic: Optional[str] = None
timestamp: Optional[int] = None # Unix epoch time in nanoseconds as a 64-bit integer value.
payload: str
contentTopic: str
timestamp: Optional[int] = None
@dataclass
class MessageRpcResponse:
payload: str
contentTopic: Optional[str] = None
version: Optional[int] = None
timestamp: Optional[int] = None # Unix epoch time in nanoseconds as a 64-bit integer value.
ephemeral: Optional[bool] = None
contentTopic: str
version: Optional[int]
timestamp: int
ephemeral: Optional[bool]
rateLimitProof: Optional[dict] = field(default_factory=dict)
rate_limit_proof: Optional[dict] = field(default_factory=dict)
message_rpc_response_schema = class_schema(MessageRpcResponse)()
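As a minimal sketch of how the schema above could be used, the dict below deserializes into a `MessageRpcResponse` instance; every field value is made up for illustration.

```python
# Hypothetical example: load a raw relay response into MessageRpcResponse
# via the marshmallow schema defined above (values are illustrative only).
raw = {
    "payload": "UmVsYXkgd29ya3MhIQ==",
    "contentTopic": "/test/1/waku-relay",
    "version": 0,
    "timestamp": 1699023660000000000,
    "ephemeral": False,
}
message = message_rpc_response_schema.load(raw)
assert message.contentTopic == "/test/1/waku-relay"
```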

View File

@ -1,17 +1,20 @@
import os
import logging
from dotenv import load_dotenv
logger = logging.getLogger(__name__)
load_dotenv() # This will load environment variables from a .env file if it exists
def get_env_var(var_name, default):
def get_env_var(var_name, default=None):
env_var = os.getenv(var_name, default)
logger.debug(f"{var_name}: {env_var}")
if env_var is not None:
print(f"{var_name}: {env_var}")
else:
print(f"{var_name} is not set; using default value: {default}")
return env_var
# Configuration constants. They need to be uppercase to appear in reports
NODE_1 = get_env_var("NODE_1", "wakuorg/nwaku:deploy-wakuv2-test")
NODE_1 = get_env_var("NODE_1", "wakuorg/nwaku:latest")
NODE_2 = get_env_var("NODE_2", "wakuorg/go-waku:latest")
LOG_DIR = get_env_var("LOG_DIR", "./log")
NETWORK_NAME = get_env_var("NETWORK_NAME", "waku")
@ -19,3 +22,4 @@ SUBNET = get_env_var("SUBNET", "172.18.0.0/16")
IP_RANGE = get_env_var("IP_RANGE", "172.18.0.0/24")
GATEWAY = get_env_var("GATEWAY", "172.18.0.1")
DEFAULT_PUBSUBTOPIC = get_env_var("DEFAULT_PUBSUBTOPIC", "/waku/2/default-waku/proto")
PROTOCOL = get_env_var("PROTOCOL", "REST")
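A minimal sketch of the resulting precedence, assuming the package is importable from the repo root: a real environment variable wins over a `.env` entry, which in turn wins over the default passed to `get_env_var`, because `load_dotenv()` does not overwrite variables that are already set.

```python
# Hypothetical override example (the value "RPC" is illustrative only).
import os

os.environ["PROTOCOL"] = "RPC"   # e.g. exported by the CI workflow or the shell
from src import env_vars         # constants are evaluated at import time

assert env_vars.PROTOCOL == "RPC"
```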

View File

View File

@ -0,0 +1,44 @@
import logging
import requests
from tenacity import retry, stop_after_delay, wait_fixed
from abc import ABC, abstractmethod
logger = logging.getLogger(__name__)
class BaseClient(ABC):
# The retry decorator is applied to handle transient errors gracefully. This is particularly
# useful when running tests in parallel, where occasional network-related errors such as
# connection drops, timeouts, or temporary unavailability of a service can occur. Retrying
# ensures that such intermittent issues don't cause the tests to fail outright.
@retry(stop=stop_after_delay(2), wait=wait_fixed(0.1), reraise=True)
def make_request(self, method, url, headers=None, data=None):
logger.debug("%s call: %s with payload: %s", method.upper(), url, data)
response = requests.request(method.upper(), url, headers=headers, data=data)
try:
response.raise_for_status()
except requests.HTTPError as http_err:
logger.error("HTTP error occurred: %s", http_err)
raise
except Exception as err:
logger.error("An error occurred: %s", err)
raise
else:
logger.info("Response status code: %s", response.status_code)
return response
@abstractmethod
def info(self):
pass
@abstractmethod
def set_subscriptions(self, pubsub_topics):
pass
@abstractmethod
def send_message(self, message, pubsub_topic):
pass
@abstractmethod
def get_messages(self, pubsub_topic):
pass
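As a standalone illustration of the retry policy used in `make_request`: calls are retried every 0.1 s for at most 2 s, after which the last exception is re-raised. The URL below is an assumed example, not a fixed endpoint.

```python
# Minimal sketch of the same tenacity policy applied to a plain GET.
import requests
from tenacity import retry, stop_after_delay, wait_fixed

@retry(stop=stop_after_delay(2), wait=wait_fixed(0.1), reraise=True)
def fetch_info(url="http://127.0.0.1:8645/debug/v1/info"):
    response = requests.get(url)
    response.raise_for_status()  # raise on 4xx/5xx so tenacity retries
    return response.json()
```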

View File

@ -0,0 +1,31 @@
import logging
import json
from dataclasses import asdict
from urllib.parse import quote
from src.node.api_clients.base_client import BaseClient
logger = logging.getLogger(__name__)
class REST(BaseClient):
def __init__(self, rest_port):
self._rest_port = rest_port
def rest_call(self, method, endpoint, payload=None):
url = f"http://127.0.0.1:{self._rest_port}/{endpoint}"
headers = {"Content-Type": "application/json"}
return self.make_request(method, url, headers=headers, data=payload)
def info(self):
info_response = self.rest_call("get", "debug/v1/info")
return info_response.json()
def set_subscriptions(self, pubsub_topics):
return self.rest_call("post", "relay/v1/subscriptions", json.dumps(pubsub_topics))
def send_message(self, message, pubsub_topic):
return self.rest_call("post", f"relay/v1/messages/{quote(pubsub_topic, safe='')}", json.dumps(asdict(message)))
def get_messages(self, pubsub_topic):
get_messages_response = self.rest_call("get", f"relay/v1/messages/{quote(pubsub_topic, safe='')}")
return get_messages_response.json()
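A hypothetical usage sketch of the REST client; the port, topic, and payload are examples only (in the framework the port comes from `DockerManager.generate_ports()`).

```python
# Assumed values throughout; publish a message and read it back over REST.
from time import time
from src.data_classes import MessageRpcQuery
from src.libs.common import to_base64

rest = REST(rest_port=8645)
rest.set_subscriptions(["/waku/2/default-waku/proto"])
message = MessageRpcQuery(payload=to_base64("Relay works!!"), contentTopic="/test/1/waku-relay")
message.timestamp = int(time() * 1e9)
rest.send_message(message, "/waku/2/default-waku/proto")
print(rest.get_messages("/waku/2/default-waku/proto"))
```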

View File

@ -0,0 +1,35 @@
import logging
import json
from dataclasses import asdict
from src.node.api_clients.base_client import BaseClient
logger = logging.getLogger(__name__)
class RPC(BaseClient):
def __init__(self, rpc_port, image_name):
self._image_name = image_name
self._rpc_port = rpc_port
def rpc_call(self, endpoint, params=[]):
url = f"http://127.0.0.1:{self._rpc_port}"
headers = {"Content-Type": "application/json"}
payload = {"jsonrpc": "2.0", "method": endpoint, "params": params, "id": 1}
return self.make_request("post", url, headers=headers, data=json.dumps(payload))
def info(self):
info_response = self.rpc_call("get_waku_v2_debug_v1_info", [])
return info_response.json()["result"]
def set_subscriptions(self, pubsub_topics):
if "nwaku" in self._image_name:
return self.rpc_call("post_waku_v2_relay_v1_subscriptions", [pubsub_topics])
else:
return self.rpc_call("post_waku_v2_relay_v1_subscription", [pubsub_topics])
def send_message(self, message, pubsub_topic):
return self.rpc_call("post_waku_v2_relay_v1_message", [pubsub_topic, asdict(message)])
def get_messages(self, pubsub_topic):
get_messages_response = self.rpc_call("get_waku_v2_relay_v1_messages", [pubsub_topic])
return get_messages_response.json()["result"]
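And a matching sketch for the RPC client; the port and image tag below are assumptions. The image name decides which JSON-RPC subscription method is called (nwaku expects the plural `..._subscriptions` form).

```python
# Hypothetical port and image tag; info() unwraps the JSON-RPC "result" field.
rpc = RPC(rpc_port=8545, image_name="wakuorg/nwaku:latest")
rpc.set_subscriptions(["/waku/2/default-waku/proto"])
print(rpc.info().get("enrUri"))
```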

View File

@ -14,13 +14,13 @@ class DockerManager:
def __init__(self, image):
self._image = image
self._client = docker.from_env()
logger.debug(f"Docker client initialized with image {self._image}")
logger.debug("Docker client initialized with image %s", self._image)
def create_network(self, network_name=NETWORK_NAME):
logger.debug(f"Attempting to create or retrieve network '{network_name}'.")
logger.debug("Attempting to create or retrieve network %s", network_name)
networks = self._client.networks.list(names=[network_name])
if networks:
logger.debug(f"Network '{network_name}' already exists.")
logger.debug("Network %s already exists", network_name)
return networks[0]
network = self._client.networks.create(
@ -28,20 +28,25 @@ class DockerManager:
driver="bridge",
ipam=IPAMConfig(driver="default", pool_configs=[IPAMPool(subnet=SUBNET, iprange=IP_RANGE, gateway=GATEWAY)]),
)
logger.debug(f"Network '{network_name}' created.")
logger.debug("Network %s created", network_name)
return network
def start_container(self, image_name, ports, args, log_path, container_ip):
command = [f"--{key}={value}" for key, value in args.items()]
cli_args = []
for key, value in args.items():
if isinstance(value, list):  # repeat the flag for each item in a list value
cli_args.extend([f"--{key}={item}" for item in value])
else:
cli_args.append(f"--{key}={value}")  # a single flag for scalar values
port_bindings = {f"{port}/tcp": ("", port) for port in ports}
logger.debug(f"Starting container with image '{image_name}'.")
container = self._client.containers.run(image_name, command=command, ports=port_bindings, detach=True, auto_remove=True)
logger.debug("Starting container with image %s", image_name)
logger.debug("Using args %s", cli_args)
container = self._client.containers.run(image_name, command=cli_args, ports=port_bindings, detach=True, remove=True, auto_remove=True)
network = self._client.networks.get(NETWORK_NAME)
network.connect(container, ipv4_address=container_ip)
logger.debug(f"Container started with ID {container.short_id}. Setting up logs at '{log_path}'.")
logger.debug("Container started with ID %s. Setting up logs at %s", container.short_id, log_path)
log_thread = threading.Thread(target=self._log_container_output, args=(container, log_path))
log_thread.daemon = True
log_thread.start()
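For illustration, the argument expansion introduced above turns a list value into one repeated flag per item; the args dict below is made up.

```python
# Standalone illustration of the cli_args construction (hypothetical args dict).
args = {"pubsub-topic": ["/waku/2/a", "/waku/2/b"], "rest": "true"}
cli_args = []
for key, value in args.items():
    if isinstance(value, list):
        cli_args.extend([f"--{key}={item}" for item in value])
    else:
        cli_args.append(f"--{key}={value}")
# cli_args == ["--pubsub-topic=/waku/2/a", "--pubsub-topic=/waku/2/b", "--rest=true"]
```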
@ -54,18 +59,18 @@ class DockerManager:
for chunk in container.logs(stream=True):
log_file.write(chunk)
def generate_ports(self, base_port=None, count=4):
def generate_ports(self, base_port=None, count=5):
if base_port is None:
base_port = random.randint(1024, 65535 - count)
ports = [base_port + i for i in range(count)]
logger.debug(f"Generated ports: {ports}")
logger.debug("Generated ports %s", ports)
return ports
@staticmethod
def generate_random_ext_ip():
base_ip_fragments = ["172", "18"]
ext_ip = ".".join(base_ip_fragments + [str(random.randint(0, 255)) for _ in range(2)])
logger.debug(f"Generated random external IP: {ext_ip}")
logger.debug("Generated random external IP %s", ext_ip)
return ext_ip
def is_container_running(self, container):
@ -73,7 +78,7 @@ class DockerManager:
refreshed_container = self._client.containers.get(container.id)
return refreshed_container.status == "running"
except NotFound:
logger.error(f"Container with ID {container.id} not found.")
logger.error("Container with ID %s not found", container.id)
return False
@property

View File

@ -1,15 +1,11 @@
import os
import logging
from time import time
import requests
import json
from tenacity import retry, stop_after_delay, wait_fixed
from dataclasses import asdict
from src.node.api_clients.rpc import RPC
from src.node.api_clients.rest import REST
from src.node.docker_mananger import DockerManager
from src.env_vars import LOG_DIR, DEFAULT_PUBSUBTOPIC
from src.env_vars import LOG_DIR, DEFAULT_PUBSUBTOPIC, PROTOCOL
from src.data_storage import DS
from src.libs.common import bytes_to_hex
logger = logging.getLogger(__name__)
@ -21,29 +17,40 @@ class WakuNode:
self._docker_manager = DockerManager(self._image_name)
self._container = None
self._ext_ip = self._docker_manager.generate_random_ext_ip()
logger.debug(f"WakuNode instance initialized with log path: {self._log_path}")
self._ports = self._docker_manager.generate_ports()
self._rest_port = self._ports[0]
self._rpc_port = self._ports[1]
self._websocket_port = self._ports[2]
logger.debug("WakuNode instance initialized with log path %s", self._log_path)
if PROTOCOL == "RPC":
self._api = RPC(self._rpc_port, self._image_name)
elif PROTOCOL == "REST":
self._api = REST(self._rest_port)
else:
raise ValueError(f"Unknown protocol: {PROTOCOL}")
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def start(self, **kwargs):
logger.debug("Starting Node...")
self._docker_manager.create_network()
ports = self._docker_manager.generate_ports()
self._rpc_port = ports[0]
self._websocket_port = ports[2]
default_args = {
"listen-address": "0.0.0.0",
"rpc": "true",
"rpc-admin": "true",
"rest": "true",
"rest-admin": "true",
"websocket-support": "true",
"log-level": "TRACE",
"websocket-port": str(ports[2]),
"rpc-port": str(ports[0]),
"tcp-port": str(ports[1]),
"discv5-udp-port": str(ports[3]),
"websocket-port": str(self._ports[3]),
"rpc-port": self._rpc_port,
"rest-port": self._rest_port,
"tcp-port": str(self._ports[2]),
"discv5-udp-port": str(self._ports[4]),
"rpc-address": "0.0.0.0",
"topic": DEFAULT_PUBSUBTOPIC,
"rest-address": "0.0.0.0",
"nat": f"extip:{self._ext_ip}",
"pubsub-topic": DEFAULT_PUBSUBTOPIC,
}
if "go-waku" in self._docker_manager.image:
@ -58,14 +65,15 @@ class WakuNode:
key = key.replace("_", "-")
default_args[key] = value
logger.debug(f"Starting container with args: {default_args}")
self._container = self._docker_manager.start_container(self._docker_manager.image, ports, default_args, self._log_path, self._ext_ip)
logger.debug(f"Started container from image {self._image_name}. RPC port: {self._rpc_port} and WebSocket port: {self._websocket_port}")
self._container = self._docker_manager.start_container(self._docker_manager.image, self._ports, default_args, self._log_path, self._ext_ip)
logger.debug(
"Started container from image %s. RPC: %s REST: %s WebSocket: %s", self._image_name, self._rpc_port, self._rest_port, self._websocket_port
)
DS.waku_nodes.append(self)
try:
self.ensure_rpc_ready()
self.ensure_ready()
except Exception as e:
logger.error(f"RPC service did not become ready in time: {e}")
logger.error("%s service did not become ready in time: %s", PROTOCOL, e)
raise
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
@ -75,82 +83,19 @@ class WakuNode:
self._container.stop()
logger.debug("Container stopped.")
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def rpc_call(self, method, params=[]):
url = f"http://127.0.0.1:{self._rpc_port}"
headers = {"Content-Type": "application/json"}
payload = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
logger.debug("RPC call payload %s", payload)
response = requests.post(url, data=json.dumps(payload), headers=headers)
return response.json()
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.05), reraise=True)
def ensure_rpc_ready(self):
def ensure_ready(self):
self.info()
logger.debug("RPC service is ready.")
def info(self):
return self.rpc_call("get_waku_v2_debug_v1_info", [])
return self._api.info()
def set_subscriptions(self, pubsub_topics=[DEFAULT_PUBSUBTOPIC]):
return self.rpc_call("post_waku_v2_relay_v1_subscriptions", [pubsub_topics])
return self._api.set_subscriptions(pubsub_topics)
def send_message(self, message, pubsub_topic=DEFAULT_PUBSUBTOPIC):
if message.timestamp is None:
message.timestamp = int(time() * 1e9)
return self.rpc_call("post_waku_v2_relay_v1_message", [pubsub_topic, asdict(message)])
return self._api.send_message(message, pubsub_topic)
def get_messages(self, pubsub_topic=DEFAULT_PUBSUBTOPIC):
return self.rpc_call("get_waku_v2_relay_v1_messages", [pubsub_topic])
def get_asymmetric_key_pair(self):
response = self.rpc_call("get_waku_v2_private_v1_asymmetric_keypair", [])
seckey = response.get("seckey")
pubkey = response.get("pubkey")
private_key = response.get("privateKey")
public_key = response.get("publicKey")
if seckey:
return {"privateKey": seckey, "publicKey": pubkey}
else:
return {"privateKey": private_key, "publicKey": public_key}
def post_asymmetric_message(self, message, public_key, pubsub_topic=None):
if not message.payload:
raise Exception("Attempting to send an empty message")
return self.rpc_call(
"post_waku_v2_private_v1_asymmetric_message",
[pubsub_topic or DEFAULT_PUBSUBTOPIC, message, "0x" + bytes_to_hex(public_key)],
)
def get_asymmetric_messages(self, private_key, pubsub_topic=None):
return self.rpc_call(
"get_waku_v2_private_v1_asymmetric_messages",
[pubsub_topic or DEFAULT_PUBSUBTOPIC, "0x" + bytes_to_hex(private_key)],
)
def get_symmetric_key(self):
return bytes.fromhex(self.rpc_call("get_waku_v2_private_v1_symmetric_key", []))
def post_symmetric_message(self, message, sym_key, pubsub_topic=None):
if not message.payload:
raise Exception("Attempting to send an empty message")
return self.rpc_call(
"post_waku_v2_private_v1_symmetric_message",
[pubsub_topic or DEFAULT_PUBSUBTOPIC, message, "0x" + bytes_to_hex(sym_key)],
)
def get_symmetric_messages(self, sym_key, pubsub_topic=None):
return self.rpc_call(
"get_waku_v2_private_v1_symmetric_messages",
[pubsub_topic or DEFAULT_PUBSUBTOPIC, "0x" + bytes_to_hex(sym_key)],
)
def get_peer_id(self):
# not implemented
peer_id = ""
return peer_id
def get_multiaddr_with_id(self):
peer_id = self.get_peer_id()
multiaddr_with_id = f"/ip4/127.0.0.1/tcp/{self._websocket_port}/ws/p2p/{peer_id}"
return multiaddr_with_id
return self._api.get_messages(pubsub_topic)
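A hypothetical end-to-end sketch of the node wrapper; the image tag, test id, and topic are examples only. The PROTOCOL env var selects a REST or RPC backend, while the public WakuNode methods stay the same.

```python
# Assumed values; with PROTOCOL="REST" the calls go through the REST client.
node = WakuNode("wakuorg/nwaku:latest", "example-test-id")
node.start(relay="true")
node.set_subscriptions(["/waku/2/default-waku/proto"])
print(node.info())
node.stop()
```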

View File

@ -1,7 +1,8 @@
import logging
from time import sleep
from time import sleep, time
import pytest
import allure
from src.data_classes import message_rpc_response_schema
from src.env_vars import NODE_1, NODE_2
from src.node.waku_node import WakuNode
from tenacity import retry, stop_after_delay, wait_fixed
@ -14,21 +15,24 @@ class StepsRelay:
def setup_nodes(self, request):
self.node1 = WakuNode(NODE_1, request.cls.test_id)
self.node1.start(relay="true", discv5_discovery="true", peer_exchange="true")
enr_uri = self.node1.info()["result"]["enrUri"]
enr_uri = self.node1.info()["enrUri"]
self.node2 = WakuNode(NODE_2, request.cls.test_id)
self.node2.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=enr_uri, peer_exchange="true")
self.node1.set_subscriptions()
self.node2.set_subscriptions()
self.default_content_topic = "/test/1/waku-relay"
self.default_payload = "Relay works!!"
self.test_pubsub_topic = "test"
self.test_content_topic = "/test/1/waku-relay"
self.test_payload = "Relay works!!"
self.node1.set_subscriptions([self.test_pubsub_topic])
self.node2.set_subscriptions([self.test_pubsub_topic])
@allure.step
@retry(stop=stop_after_delay(2), wait=wait_fixed(0.2), reraise=True)
@retry(stop=stop_after_delay(20), wait=wait_fixed(0.5), reraise=True)
def check_published_message_reaches_peer(self, message):
self.node1.send_message(message)
message.timestamp = int(time() * 1e9)
self.node1.send_message(message, self.test_pubsub_topic)
sleep(0.1)
get_messages_response = self.node2.get_messages()
get_messages_response = self.node2.get_messages(self.test_pubsub_topic)
logger.debug("Got reponse from remote peer %s", get_messages_response)
assert get_messages_response["result"][0]["payload"] == message.payload
assert get_messages_response["result"][0]["contentTopic"] == message.contentTopic
assert get_messages_response["result"][0]["timestamp"] == message.timestamp
received_message = message_rpc_response_schema.load(get_messages_response[0])
assert received_message.payload == message.payload
assert received_message.contentTopic == message.contentTopic
assert received_message.timestamp == message.timestamp

View File

@ -1,5 +1,4 @@
import logging
from time import sleep
from src.libs.common import to_base64
from src.data_classes import MessageRpcQuery
@ -14,11 +13,11 @@ class TestRelayPublish(StepsRelay):
failed_payloads = []
for payload in SAMPLE_INPUTS:
logger.debug("Running test with payload %s", payload["description"])
message = MessageRpcQuery(payload=to_base64(payload["value"]), contentTopic=self.default_content_topic)
message = MessageRpcQuery(payload=to_base64(payload["value"]), contentTopic=self.test_content_topic)
try:
self.check_published_message_reaches_peer(message)
except Exception as e:
logger.error(f"Payload '{payload['description']}' failed: {str(e)}")
logger.error("Payload %s failed: %s", {payload["description"]}, {str(e)})
failed_payloads.append(payload)
assert not failed_payloads, f"Payloads failed: {failed_payloads}"
@ -26,14 +25,10 @@ class TestRelayPublish(StepsRelay):
failed_content_topics = []
for content_topic in SAMPLE_INPUTS:
logger.debug("Running test with content topic %s", content_topic["description"])
message = MessageRpcQuery(payload=to_base64(self.default_payload), contentTopic=content_topic["value"])
message = MessageRpcQuery(payload=to_base64(self.test_payload), contentTopic=content_topic["value"])
try:
self.check_published_message_reaches_peer(message)
except Exception as e:
logger.error(f"ContentTopic '{content_topic['description']}' failed: {str(e)}")
logger.error("ContentTopic %s failed: %s", {content_topic["description"]}, {str(e)})
failed_content_topics.append(content_topic)
assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}"
def test_fail_for_report_purposes(self):
message = MessageRpcQuery(payload="", contentTopic="")
self.check_published_message_reaches_peer(message)