queue the pi in the future if it is locked and the start of a test w/ burnettk

This commit is contained in:
jasquat 2024-05-06 16:12:04 -04:00
parent 38c30f0207
commit c3fc6214f9
No known key found for this signature in database
3 changed files with 75 additions and 2 deletions

View File

@ -1,7 +1,11 @@
from typing import Any
from billiard import current_process # type: ignore
from celery import shared_task
from flask import current_app
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_future_task_if_appropriate,
)
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_process_instance_if_appropriate,
)
@ -23,9 +27,15 @@ class SpiffCeleryWorkerError(Exception):
pass
def get_current_process_index() -> Any:
raise Exception("NOOO")
# return current_process().index
return 1
@shared_task(ignore_result=False, time_limit=ten_minutes)
def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> dict:
proc_index = current_process().index
proc_index = get_current_process_index()
ProcessInstanceLockService.set_thread_local_locking_context("celery:worker", additional_processing_identifier=proc_index)
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
@ -71,6 +81,8 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str |
f"Could not run process instance with worker: {current_app.config['PROCESS_UUID']} - {proc_index}. Error was:"
f" {str(exception)}"
)
# NOTE: consider exponential backoff
queue_future_task_if_appropriate(process_instance, eta_in_seconds=10, task_guid=task_guid)
return {"ok": False, "process_instance_id": process_instance_id, "task_guid": task_guid, "exception": str(exception)}
except Exception as exception:
db.session.rollback() # in case the above left the database with a bad transaction

View File

@ -35,7 +35,9 @@ def should_queue_process_instance(process_instance: ProcessInstanceModel, execut
return False
def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta_in_seconds: float, task_guid: str) -> bool:
def queue_future_task_if_appropriate(
process_instance: ProcessInstanceModel, eta_in_seconds: float, task_guid: str | None = None
) -> bool:
if queue_enabled_for_process_model(process_instance):
buffer = 1
countdown = eta_in_seconds - time.time() + buffer

View File

@ -0,0 +1,59 @@
from unittest.mock import patch
from pytest_mock.plugin import MockerFixture
from flask.app import Flask
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task import celery_task_process_instance_run
# from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
# queue_future_task_if_appropriate,
# )
from spiffworkflow_backend.models.db import db
import time
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
class TestProcessInstanceTask(BaseTest):
def test_queues_process_instance_if_locked(
self,
app: Flask,
mocker: MockerFixture,
with_db_and_bpmn_file_cleanup: None,
) -> None:
with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True):
# mocked_process = mocker.Mock()
# mocked_process.index = 12345 # Setting the pid attribute to a mock value
# mocker.patch("billiard.current_process", return_value=mocked_process)
process_model = load_test_spec(
process_model_id="test_group/model_with_lanes",
bpmn_file_name="lanes.bpmn",
process_model_source_directory="model_with_lanes",
)
process_instance = self.create_process_instance_from_process_model(process_model=process_model, status="waiting")
assert process_instance.status == ProcessInstanceStatus.waiting.value
queue_entry = ProcessInstanceQueueModel.query.filter_by(process_instance_id=process_instance.id).first()
assert queue_entry is not None
queue_entry.locked_by = "test:test_waiting"
queue_entry.locked_at_seconds = round(time.time())
db.session.add(queue_entry)
db.session.commit()
# mocker.patch(
# "spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer.queue_future_task_if_appropriate"
# )
with patch(
"spiffworkflow_backend.background_processing.celery_tasks.process_instance_task.get_current_process_index"
) as mock_proc_index:
mock_proc_index.return_value = 1
# with patch(
# "spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer.queue_future_task_if_appropriate"
# ) as mock_queue:
with patch(
"spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer.queue_future_task_if_appropriate"
) as mock_queue:
mock_queue.return_value = 1
# mock = mocker.patch("billiard.current_process()", return_value=0)
celery_task_process_instance_run(process_instance_id=process_instance.id)
assert mock_queue.call_count == 1