Test/store part 1 (#31)

* first set of tests

* fix ci

* new tests

* finish publish tests

* new tests

* running nodes tests

* new tests

* finishing touches

* new test

* temp commit

* running node tests

* new tests

* new store tests

* store part 1
This commit is contained in:
fbarbu15 2024-04-17 08:37:31 +03:00 committed by GitHub
parent 53417a8665
commit 8d956b3d9c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 486 additions and 5 deletions

111
scripts/store_v3.sh Executable file
View File

@ -0,0 +1,111 @@
#!/bin/bash
# Two-node smoke test for the Waku store protocol:
#   - node 1: relay-only publisher (store disabled)
#   - node 2: store node bootstrapped from node 1
# Publishes one message via relay on node 1, then verifies node 2 received it
# and that it is returned by both the v1 and v3 store REST APIs.
printf "\nAssuming you already have a docker network called waku\n"
# if not something like this should create it: docker network create --driver bridge --subnet 172.18.0.0/16 --gateway 172.18.0.1 waku
cluster_id=0
pubsub_topic="/waku/2/rs/$cluster_id/0"
# URL-encode '/' as %2F so topics can be embedded in REST paths and query strings
encoded_pubsub_topic=$(echo "$pubsub_topic" | sed 's:/:%2F:g')
content_topic="/test/1/store/proto"
encoded_content_topic=$(echo "$content_topic" | sed 's:/:%2F:g')
node_1=harbor.status.im/wakuorg/nwaku:latest
node_1_ip=172.18.64.13
node_1_rest=32261
node_1_tcp=32262
node_2=harbor.status.im/wakuorg/nwaku:latest
node_2_ip=172.18.64.14
node_2_rest=4588
printf "\nStarting containers\n"
# Node 1: relay=true, store=false — it only publishes, never serves store queries
container_id1=$(docker run -d -i -t -p $node_1_rest:$node_1_rest -p $node_1_tcp:$node_1_tcp -p 32263:32263 -p 32264:32264 -p 32265:32265 $node_1 --listen-address=0.0.0.0 --rest=true --rest-admin=true --websocket-support=true --log-level=TRACE --rest-relay-cache-capacity=100 --websocket-port=32263 --rest-port=$node_1_rest --tcp-port=$node_1_tcp --discv5-udp-port=32264 --rest-address=0.0.0.0 --nat=extip:$node_1_ip --peer-exchange=true --discv5-discovery=true --cluster-id=$cluster_id --metrics-server=true --metrics-server-address=0.0.0.0 --metrics-server-port=32265 --metrics-logging=true --store=false --relay=true)
docker network connect --ip $node_1_ip waku $container_id1
printf "\nSleeping 2 seconds\n"
sleep 2
# Fetch node 1's debug info: the ENR is used for discv5 bootstrap, the TCP
# multiaddr (non-websocket) is used as node 2's storenode/peer address.
response=$(curl -X GET "http://127.0.0.1:$node_1_rest/debug/v1/info" -H "accept: application/json")
enrUri=$(echo $response | jq -r '.enrUri')
# Extract the first non-WebSocket address
ws_address=$(echo $response | jq -r '.listenAddresses[] | select(contains("/ws") | not)')
# Check if we got an address, and construct the new address with it
if [[ $ws_address != "" ]]; then
  identifier=$(echo $ws_address | awk -F'/p2p/' '{print $2}')
  if [[ $identifier != "" ]]; then
    # Rebuild the multiaddr with the routable extip instead of 0.0.0.0
    multiaddr_with_id="/ip4/${node_1_ip}/tcp/${node_1_tcp}/p2p/${identifier}"
    echo $multiaddr_with_id
  else
    echo "No identifier found in the address."
    exit 1
  fi
else
  echo "No non-WebSocket address found."
  exit 1
fi
# Node 2: store=true, bootstrapped from node 1 (discv5) and using it as storenode
container_id2=$(docker run -d -i -t -p $node_2_rest:$node_2_rest -p 4589:4589 -p 4590:4590 -p 4591:4591 -p 4592:4592 $node_2 --listen-address=0.0.0.0 --rest=true --rest-admin=true --websocket-support=true --log-level=TRACE --rest-relay-cache-capacity=100 --websocket-port=4590 --rest-port=$node_2_rest --tcp-port=4589 --discv5-udp-port=4591 --rest-address=0.0.0.0 --nat=extip:$node_2_ip --peer-exchange=true --discv5-discovery=true --cluster-id=$cluster_id --metrics-server=true --metrics-server-address=0.0.0.0 --metrics-server-port=4592 --metrics-logging=true --discv5-bootstrap-node=$enrUri --storenode=$multiaddr_with_id --store=true --relay=true)
docker network connect --ip $node_2_ip waku $container_id2
printf "\nSleeping 1 seconds\n"
sleep 1
printf "\nConnect peers\n"
curl -X POST "http://127.0.0.1:$node_2_rest/admin/v1/peers" -H "Content-Type: application/json" -d "[\"$multiaddr_with_id\"]"
printf "\nSubscribe\n"
# Both nodes must be subscribed to the pubsub topic for relay to deliver
curl -X POST "http://127.0.0.1:$node_1_rest/relay/v1/subscriptions" -H "Content-Type: application/json" -d "[\"$pubsub_topic\"]"
curl -X POST "http://127.0.0.1:$node_2_rest/relay/v1/subscriptions" -H "Content-Type: application/json" -d "[\"$pubsub_topic\"]"
printf "\nSleeping 1 seconds\n"
sleep 1
printf "\nRelay from NODE 1\n"
curl -X POST "http://127.0.0.1:$node_1_rest/relay/v1/messages/$encoded_pubsub_topic" \
  -H "Content-Type: application/json" \
  -d '{"payload": "UmVsYXkgd29ya3MhIQ==", "contentTopic": "'"$content_topic"'", "timestamp": '$(date +%s%N)'}'
printf "\nSleeping 1 seconds\n"
sleep 1
printf "\nCheck message in NODE 2\n"
response=$(curl -X GET "http://127.0.0.1:$node_2_rest/relay/v1/messages/$encoded_pubsub_topic" -H "Content-Type: application/json")
printf "\nResponse: $response\n"
if [ "$response" == "[]" ]; then
  echo "Error: NODE 2 didn't find the message"
  exit 1
else
  echo "Success: NODE 2 received the message"
fi
printf "\nCheck message was stored in NODE 2 with v1 API\n"
response=$(curl -v -X GET "http://127.0.0.1:$node_2_rest/store/v1/messages?contentTopics=$encoded_content_topic&pageSize=5&ascending=true")
printf "\nResponse: $response\n"
if [ "$response" == "[]" ] || [ -z "$response" ]; then
  echo "Error: NODE 2 didn't store the message with v1 API"
  exit 1
else
  echo "Success: NODE 2 stored the message with v1 API"
fi
printf "\nCheck message was stored in NODE 2 with v3 API\n"
response=$(curl -v -X GET "http://127.0.0.1:$node_2_rest/store/v3/messages?peerAddr=$multiaddr_with_id&contentTopics=$encoded_content_topic&pageSize=5&ascending=true")
printf "\nResponse: $response\n"
if [ "$response" == "[]" ] || [ -z "$response" ]; then
  # fixed grammar in the error message ("didn't stored" -> "didn't store")
  echo "Error: NODE 2 didn't store the message with v3 API"
  exit 1
else
  echo "Success: NODE 2 stored the message with v3 API"
fi

View File

@ -82,3 +82,41 @@ class REST(BaseClient):
endpoint = f"filter/v2/messages/{quote(content_topic, safe='')}"
get_messages_response = self.rest_call("get", endpoint)
return get_messages_response.json()
def get_store_messages(
    self, peerAddr, includeData, pubsubTopic, contentTopics, startTime, endTime, hashes, cursor, pageSize, ascending, store_v, **kwargs
):
    """Query the store REST API (``store_v`` = "v1" or "v3") and return the JSON body.

    Parameters mirror the store API query string; any that are ``None`` are
    omitted from the request. Extra keyword arguments are appended as
    additional URL-encoded query parameters.
    """
    endpoint = f"store/{store_v}/messages"
    # (name, value, needs-URL-encoding) for every known query parameter,
    # in the order the API expects them to appear.
    known_params = (
        ("peerAddr", peerAddr, True),
        ("includeData", includeData, False),
        ("pubsubTopic", pubsubTopic, True),
        ("contentTopics", contentTopics, True),
        ("startTime", startTime, False),
        ("endTime", endTime, False),
        ("hashes", hashes, True),
        ("cursor", cursor, True),
        ("pageSize", pageSize, False),
        ("ascending", ascending, False),
    )
    query = []
    for name, value, encode in known_params:
        if value is None:
            continue
        query.append(f"{name}={quote(value, safe='') if encode else value}")
    # Append any additional keyword arguments to the parameters list
    for key, value in kwargs.items():
        if value is not None:
            query.append(f"{key}={quote(str(value), safe='')}")
    if query:
        endpoint = endpoint + "?" + "&".join(query)
    return self.rest_call("get", endpoint).json()

View File

@ -17,16 +17,15 @@ class MessageRpcResponse:
rate_limit_proof: Optional[dict] = field(default_factory=dict)
message_rpc_response_schema = class_schema(MessageRpcResponse)()
class WakuMessage:
def __init__(self, message_response):
def __init__(self, message_response, schema=MessageRpcResponse):
self.schema = schema
self.received_messages = message_response
self.message_rpc_response_schema = class_schema(self.schema)()
@allure.step
def assert_received_message(self, sent_message, index=0):
message = message_rpc_response_schema.load(self.received_messages[index])
message = self.message_rpc_response_schema.load(self.received_messages[index])
def assert_fail_message(field_name):
return f"Incorrect field: {field_name}. Published: {sent_message[field_name]} Received: {getattr(message, field_name)}"

View File

@ -260,6 +260,24 @@ class WakuNode:
def get_filter_messages(self, content_topic, pubsub_topic=None):
    # Thin delegation to the node's underlying API client.
    return self._api.get_filter_messages(content_topic, pubsub_topic)
def get_store_messages(
    self, peerAddr, includeData, pubsubTopic, contentTopics, startTime, endTime, hashes, cursor, pageSize, ascending, store_v, **kwargs
):
    """Forward a store-protocol query unchanged to the node's API client."""
    named = {
        "peerAddr": peerAddr,
        "includeData": includeData,
        "pubsubTopic": pubsubTopic,
        "contentTopics": contentTopics,
        "startTime": startTime,
        "endTime": endTime,
        "hashes": hashes,
        "cursor": cursor,
        "pageSize": pageSize,
        "ascending": ascending,
        "store_v": store_v,
    }
    # A duplicate key in kwargs raises TypeError here, exactly like
    # keyword-by-keyword delegation would.
    return self._api.get_store_messages(**named, **kwargs)
def get_metrics(self):
if self.is_nwaku():
metrics = requests.get(f"http://localhost:{self._metrics_port}/metrics")

204
src/steps/store.py Normal file
View File

@ -0,0 +1,204 @@
import inspect
import os
from src.libs.custom_logger import get_custom_logger
from time import time
import pytest
import allure
from src.libs.common import to_base64, delay, gen_step_id
from src.node.waku_message import WakuMessage
from src.env_vars import (
ADDITIONAL_NODES,
NODE_1,
NODE_2,
)
from src.node.waku_node import WakuNode, rln_credential_store_ready
from tenacity import retry, stop_after_delay, wait_fixed
from src.test_data import VALID_PUBSUB_TOPICS
logger = get_custom_logger(__name__)
class StepsStore:
    """Shared fixtures and helper steps for the store-protocol test suites."""

    # Defaults used by the helpers below when a test does not override them.
    test_content_topic = "/myapp/1/latest/proto"
    test_pubsub_topic = "/waku/2/rs/0/0"
    test_payload = "Store works!!"

    @pytest.fixture(scope="function", autouse=True)
    def store_setup(self):
        """Reset per-test node bookkeeping before every test."""
        logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
        self.main_publishing_nodes = []  # nodes started with relay enabled
        self.store_nodes = []  # nodes expected to answer store queries
        self.optional_nodes = []
        self.multiaddr_list = []  # multiaddrs of started nodes, used for peering
@allure.step
def add_node_peer(self, node):
    """Register every known multiaddr as a peer on *node*.

    Only nwaku nodes are peered this way; other implementations are left
    untouched by this helper.
    """
    if not node.is_nwaku():
        return
    for addr in self.multiaddr_list:
        node.add_peers([addr])
@allure.step
def start_publishing_node(self, image, node_index, **kwargs):
    """Start a publishing node from *image* and register it for the test.

    The node is appended to main_publishing_nodes when relay is enabled and
    to store_nodes when store is enabled; its multiaddr is recorded so
    later nodes can peer with it. Returns the started node.
    """
    node = WakuNode(image, f"publishing_node{node_index}_{self.test_id}")
    node.start(**kwargs)
    # .get() instead of ["relay"] so callers that omit the flag don't
    # crash with KeyError (omitted flag == not enabled).
    if kwargs.get("relay") == "true":
        self.main_publishing_nodes.append(node)
    if kwargs.get("store") == "true":
        self.store_nodes.append(node)
    self.add_node_peer(node)
    self.multiaddr_list.append(node.get_multiaddr_with_id())
    return node
@allure.step
def setup_store_node(self, image, node_index, **kwargs):
    """Start a store node bootstrapped from the first publishing node.

    Uses self.enr_uri for discv5 bootstrap and the first recorded multiaddr
    as the storenode target, so setup_first_publishing_node must run first.
    Returns the started node.
    """
    node = WakuNode(image, f"store_node{node_index}_{self.test_id}")
    node.start(discv5_bootstrap_node=self.enr_uri, storenode=self.multiaddr_list[0], **kwargs)
    # .get() instead of ["relay"] so an omitted flag means "not enabled"
    # rather than KeyError.
    if kwargs.get("relay") == "true":
        self.main_publishing_nodes.append(node)
    self.store_nodes.append(node)
    self.add_node_peer(node)
    return node
@allure.step
def setup_first_publishing_node(self, store="true", relay="true", **kwargs):
    # The first node publishes (and by default stores); its ENR is kept so
    # store nodes started later can use it for discv5 bootstrapping.
    self.publishing_node1 = self.start_publishing_node(NODE_1, node_index=1, store=store, relay=relay, **kwargs)
    self.enr_uri = self.publishing_node1.get_enr_uri()
@allure.step
def setup_second_publishing_node(self, store="true", relay="true", **kwargs):
    """Start a second publishing node using the NODE_1 image.

    store/relay now default to "true" so the signature is consistent with
    setup_first_publishing_node; existing callers passing both explicitly
    are unaffected.
    """
    self.publishing_node2 = self.start_publishing_node(NODE_1, node_index=2, store=store, relay=relay, **kwargs)
@allure.step
def setup_additional_publishing_nodes(self, node_list=ADDITIONAL_NODES, **kwargs):
    """Start one extra publishing node per comma-separated entry in *node_list*.

    Skips the current test when the list is empty. Node indices continue
    from 2 (1 is the first publishing node).
    """
    if not node_list:
        pytest.skip("ADDITIONAL_NODES/node_list is empty, cannot run test")
    images = [entry.strip() for entry in node_list.split(",") if entry]
    for idx, image in enumerate(images, start=2):
        self.start_publishing_node(image, node_index=idx, store="true", relay="true", **kwargs)
@allure.step
def setup_first_store_node(self, store="true", relay="true", **kwargs):
    # NOTE(review): the NODE_2 image is used for store nodes — presumably to
    # exercise cross-implementation publisher/store pairs; confirm intent.
    self.store_node1 = self.setup_store_node(NODE_2, node_index=1, store=store, relay=relay, **kwargs)
@allure.step
def setup_second_store_node(self, store="true", relay="false", **kwargs):
    # Second store node defaults to relay disabled (store-only role).
    self.store_node2 = self.setup_store_node(NODE_2, node_index=2, store=store, relay=relay, **kwargs)
@allure.step
def setup_additional_store_nodes(self, node_list=ADDITIONAL_NODES, **kwargs):
    """Start one extra store-only node per comma-separated entry in *node_list*.

    Skips the current test when the list is empty; the started nodes are
    kept on self.additional_store_nodes.
    """
    if not node_list:
        pytest.skip("ADDITIONAL_NODES/node_list is empty, cannot run test")
    images = [entry.strip() for entry in node_list.split(",") if entry]
    # distinct loop names avoid shadowing the image string with the node object
    self.additional_store_nodes = [
        self.setup_store_node(image, node_index=idx, store="true", relay="false", **kwargs)
        for idx, image in enumerate(images, start=2)
    ]
@allure.step
def subscribe_to_pubsub_topics_via_relay(self, node=None, pubsub_topics=None):
    """Subscribe node(s) to pubsub topics over the relay API.

    Defaults: all main publishing nodes, and [self.test_pubsub_topic].
    Accepts either a single node or a list of nodes.
    """
    topics = [self.test_pubsub_topic] if pubsub_topics is None else pubsub_topics
    targets = node if node else self.main_publishing_nodes
    if not isinstance(targets, list):
        targets = [targets]
    for target in targets:
        target.set_relay_subscriptions(topics)
@allure.step
def subscribe_to_pubsub_topics_via_filter(self, node, pubsub_topic=None, content_topic=None):
    """Create a filter subscription on *node*, defaulting to the class topics."""
    topic = self.test_pubsub_topic if pubsub_topic is None else pubsub_topic
    filters = [self.test_content_topic] if content_topic is None else content_topic
    node.set_filter_subscriptions(
        {
            "requestId": "1",
            "contentFilters": filters,
            "pubsubTopic": topic,
        }
    )
@allure.step
def publish_message_via(self, type, pubsub_topic=None, message=None, message_propagation_delay=0.1, sender=None):
    """Publish a message via "relay" or "lightpush", then wait for propagation.

    The published message is remembered on self.message so later store
    checks can compare against it.
    """
    self.message = message if message is not None else self.create_message()
    topic = pubsub_topic if pubsub_topic is not None else self.test_pubsub_topic
    origin = sender or self.publishing_node1
    if type == "relay":
        logger.debug("Relaying message")
        origin.send_relay_message(self.message, topic)
    elif type == "lightpush":
        origin.send_light_push_message(self.create_payload(topic, self.message))
    # give the message time to propagate before other nodes are queried
    delay(message_propagation_delay)
@allure.step
def check_published_message_is_stored(
    self,
    store_node=None,
    peerAddr=None,
    includeData=None,
    pubsubTopic=None,
    contentTopics=None,
    startTime=None,
    endTime=None,
    hashes=None,
    cursor=None,
    pageSize=None,
    ascending=None,
    store_v="v1",
    **kwargs,
):
    """Assert that each store node returns the last published message.

    Queries every node in *store_node* (default: all registered store
    nodes; a single node is also accepted) through store API *store_v*
    and verifies the newest returned message matches self.message. The
    last response is kept on self.store_response for callers to inspect.
    """
    if store_node is None:
        store_node = self.store_nodes
    elif not isinstance(store_node, list):
        store_node = [store_node]
    # (removed the redundant `else: store_node = store_node` no-op branch)
    for node in store_node:
        logger.debug(f"Checking that peer {node.image} can find the stored message")
        self.store_response = node.get_store_messages(
            peerAddr=peerAddr,
            includeData=includeData,
            pubsubTopic=pubsubTopic,
            contentTopics=contentTopics,
            startTime=startTime,
            endTime=endTime,
            hashes=hashes,
            cursor=cursor,
            pageSize=pageSize,
            ascending=ascending,
            store_v=store_v,
            **kwargs,
        )
        # fixed typo in the failure message ("reponse" -> "response")
        assert "messages" in self.store_response, f"Peer {node.image} has no messages key in the response"
        assert self.store_response["messages"], f"Peer {node.image} couldn't find any messages"
        assert len(self.store_response["messages"]) >= 1, "Expected at least 1 message but got none"
        # compare only the newest stored message against what was published
        waku_message = WakuMessage(self.store_response["messages"][-1:])
        waku_message.assert_received_message(self.message)
@allure.step
def check_store_returns_empty_response(self, pubsub_topic=None):
    """Assert that a store query for *pubsub_topic* finds no messages.

    Raises AssertionError when the store query fails for any reason other
    than "no messages found" — and, crucially, also when the store DOES
    return messages (previously that case passed silently).
    """
    if not pubsub_topic:
        pubsub_topic = self.test_pubsub_topic
    try:
        self.check_published_message_is_stored(pubsubTopic=pubsub_topic, pageSize=5, ascending="true")
    except Exception as ex:
        # the lookup must fail precisely because nothing was found
        assert "couldn't find any messages" in str(ex)
    else:
        raise AssertionError("Store unexpectedly returned messages for a topic that should be empty")
@allure.step
def create_message(self, **kwargs):
    """Build a default relay message dict; kwargs override any field."""
    defaults = {
        "payload": to_base64(self.test_payload),
        "contentTopic": self.test_content_topic,
        "timestamp": int(time() * 1e9),  # nanosecond-resolution timestamp
    }
    return {**defaults, **kwargs}
@allure.step
def create_payload(self, pubsub_topic=None, message=None, **kwargs):
    """Build a lightpush payload dict; kwargs add/override top-level fields."""
    body = message if message is not None else self.create_message()
    topic = pubsub_topic if pubsub_topic is not None else self.test_pubsub_topic
    return {"pubsubTopic": topic, "message": body, **kwargs}

0
tests/store/__init__.py Normal file
View File

View File

@ -0,0 +1,30 @@
import pytest
from src.libs.custom_logger import get_custom_logger
from src.libs.common import to_base64
from src.steps.store import StepsStore
from src.test_data import SAMPLE_INPUTS
logger = get_custom_logger(__name__)
# TODO: running this query without a pubsub topic freezes the node — investigate before adding such a test case
class TestGetMessages(StepsStore):
    """Store-protocol tests that retrieve previously published messages."""

    @pytest.fixture(scope="function", autouse=True)
    def store_functional_setup(self, store_setup):
        # One publishing node and one store node, both relaying, subscribed
        # to the default pubsub topic before each test.
        self.setup_first_publishing_node(store="true", relay="true")
        self.setup_first_store_node(store="true", relay="true")
        self.subscribe_to_pubsub_topics_via_relay()

    def test_store_messages_with_valid_payloads(self):
        # Publish each sample payload and verify it can be fetched back from
        # the store; failures are collected so one bad payload doesn't mask
        # the rest.
        failed_payloads = []
        for payload in SAMPLE_INPUTS:
            logger.debug(f'Running test with payload {payload["description"]}')
            message = self.create_message(payload=to_base64(payload["value"]))
            try:
                self.publish_message_via("relay", message=message)
                self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=50, ascending="true")
            except Exception as e:
                logger.error(f'Payload {payload["description"]} failed: {str(e)}')
                failed_payloads.append(payload["description"])
        assert not failed_payloads, f"Payloads failed: {failed_payloads}"

View File

@ -0,0 +1,81 @@
import pytest
from src.env_vars import NODE_2
from src.steps.store import StepsStore
class TestRunningNodes(StepsStore):
    """Matrix of publisher/store-peer relay+store flag combinations.

    Each test starts a main (publishing) node and one peer with a different
    combination of relay/store flags, publishes a message, and checks
    whether the peer serves — or correctly refuses to serve — store queries.
    """

    def test_main_node_relay_and_store__peer_relay_and_store(self):
        self.setup_first_publishing_node(store="true", relay="true")
        self.setup_first_store_node(store="true", relay="true")
        self.subscribe_to_pubsub_topics_via_relay()
        self.publish_message_via("relay")
        self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")

    def test_main_node_relay_and_store__peer_only_store(self):
        self.setup_first_publishing_node(store="true", relay="true")
        self.setup_first_store_node(store="true", relay="false")
        self.subscribe_to_pubsub_topics_via_relay()
        self.publish_message_via("relay")
        # implementations differ: go-waku still serves the message, while
        # nwaku returns an empty store response when the peer doesn't relay
        if self.store_node1.is_gowaku():
            self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")
        elif self.store_node1.is_nwaku():
            self.check_store_returns_empty_response()

    def test_main_node_relay_and_store__peer_only_relay(self):
        # the main node stores, so the query succeeds even though the peer
        # has store disabled
        self.setup_first_publishing_node(store="true", relay="true")
        self.setup_first_store_node(store="false", relay="true")
        self.subscribe_to_pubsub_topics_via_relay()
        self.publish_message_via("relay")
        self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")

    def test_main_node_relay_and_store__peer_neither_relay_nor_store(self):
        self.setup_first_publishing_node(store="true", relay="true")
        self.setup_first_store_node(store="false", relay="false")
        self.subscribe_to_pubsub_topics_via_relay()
        self.publish_message_via("relay")
        self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")

    @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1087")
    def test_main_node_only_relay__peer_relay_and_store(self):
        self.setup_first_publishing_node(store="false", relay="true")
        self.setup_first_store_node(store="true", relay="true")
        self.subscribe_to_pubsub_topics_via_relay()
        self.publish_message_via("relay")
        self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")

    def test_main_node_only_relay__peer_only_store(self):
        self.setup_first_publishing_node(store="false", relay="true")
        self.setup_first_store_node(store="true", relay="false")
        self.subscribe_to_pubsub_topics_via_relay()
        self.publish_message_via("relay")
        # go-waku may fail protocol negotiation here; nwaku answers with an
        # empty store response
        if self.store_node1.is_gowaku():
            try:
                self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")
            except Exception as ex:
                assert "failed to negotiate protocol: protocols not supported" in str(ex)
        elif self.store_node1.is_nwaku():
            self.check_store_returns_empty_response()

    def test_main_node_only_relay__peer_only_relay(self):
        # no store anywhere: the query is expected to fail at the protocol level
        self.setup_first_publishing_node(store="false", relay="true")
        self.setup_first_store_node(store="false", relay="true")
        self.subscribe_to_pubsub_topics_via_relay()
        self.publish_message_via("relay")
        try:
            self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")
        except Exception as ex:
            assert "failed to negotiate protocol: protocols not supported" in str(ex) or "PEER_DIAL_FAILURE" in str(ex)

    def test_store_lightpushed_message(self):
        # message travels peer -> lightpush -> main node, and must be stored
        # by the main node
        self.setup_first_publishing_node(store="true", relay="true", lightpush="true")
        self.setup_first_store_node(store="false", relay="false", lightpush="true", lightpushnode=self.multiaddr_list[0])
        self.subscribe_to_pubsub_topics_via_relay()
        self.publish_message_via("lightpush", sender=self.store_node1)
        self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")

    def test_store_with_filter(self):
        self.setup_first_publishing_node(store="true", relay="true", filter="true")
        self.setup_first_store_node(store="false", relay="false", filter="true")
        self.subscribe_to_pubsub_topics_via_relay()
        self.publish_message_via("relay")
        self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")