// openzeppelin_relayer/jobs/handlers/transaction_status_handler.rs

//! Transaction status monitoring handler.
//!
//! Monitors the status of submitted transactions by:
//! - Checking transaction status on the network
//! - Updating transaction status in storage
//! - Tracking failure counts for circuit breaker decisions (stored in transaction metadata)
7use actix_web::web::ThinData;
8use eyre::Result;
9use tracing::{debug, info, instrument, warn};
10
11use crate::{
12    constants::{get_max_consecutive_status_failures, get_max_total_status_failures},
13    domain::{get_relayer_transaction, get_transaction_by_id, is_final_state, Transaction},
14    jobs::{Job, StatusCheckContext, TransactionStatusCheck},
15    models::{
16        ApiError, DefaultAppState, TransactionError, TransactionMetadata, TransactionRepoModel,
17    },
18    observability::request_id::set_request_id,
19    queues::{HandlerError, WorkerContext},
20    repositories::TransactionRepository,
21};
22
23#[instrument(
24    level = "debug",
25    skip(job, state, ctx),
26    fields(
27        request_id = ?job.request_id,
28        job_id = %job.message_id,
29        job_type = %job.job_type.to_string(),
30        attempt = %ctx.attempt,
31        tx_id = %job.data.transaction_id,
32        relayer_id = %job.data.relayer_id,
33        task_id = %ctx.task_id,
34    )
35)]
36pub async fn transaction_status_handler(
37    job: Job<TransactionStatusCheck>,
38    state: ThinData<DefaultAppState>,
39    ctx: WorkerContext,
40) -> Result<(), HandlerError> {
41    if let Some(request_id) = job.request_id.clone() {
42        set_request_id(request_id);
43    }
44
45    let tx_repo = state.transaction_repository();
46
47    // Execute status check - all logic moved here so errors go through handle_result
48    let req_result = handle_request(&job.data, &state, ctx.attempt, &ctx.task_id).await;
49
50    let tx_id = &job.data.transaction_id;
51
52    // Handle result and update counters via transaction repository
53    handle_result(
54        req_result.result,
55        &*tx_repo,
56        tx_id,
57        req_result.metadata,
58        req_result.should_retry_on_error,
59    )
60    .await
61}
62
63/// Handles status check results with circuit breaker tracking.
64///
65/// # Strategy
66/// - If transaction is in final state → return Ok (job completes, metadata cleaned up via delete_at)
67/// - If success but not final → Reset consecutive to 0, return Err (retries)
68/// - If error with should_retry=true → Increment counters, return Err (retries)
69/// - If error with should_retry=false → Return Ok (job completes, e.g., transaction not found)
70/// - If counters are None (early failure) → Skip counter updates
71///
72/// Counters are stored in transaction metadata, persisted via atomic Lua scripts.
73async fn handle_result<TR>(
74    result: Result<TransactionRepoModel>,
75    tx_repo: &TR,
76    tx_id: &str,
77    metadata: Option<TransactionMetadata>,
78    should_retry_on_error: bool,
79) -> Result<(), HandlerError>
80where
81    TR: TransactionRepository + Send + Sync,
82{
83    match result {
84        Ok(tx) if is_final_state(&tx.status) => {
85            // Transaction reached final state - job complete
86            // No need to clean up counters - tx will be deleted via delete_at
87            debug!(
88                tx_id = %tx.id,
89                relayer_id = %tx.relayer_id,
90                status = ?tx.status,
91                consecutive_failures = ?metadata.as_ref().map(|m| m.consecutive_failures),
92                total_failures = ?metadata.as_ref().map(|m| m.total_failures),
93                "transaction in final state, status check complete"
94            );
95
96            Ok(())
97        }
98        Ok(tx) => {
99            // Success but not final - RESET consecutive counter, keep total unchanged
100            debug!(
101                tx_id = %tx.id,
102                relayer_id = %tx.relayer_id,
103                status = ?tx.status,
104                "transaction not in final state"
105            );
106
107            // Use fresh metadata from the transaction (updated during handle_transaction_status)
108            // to decide whether a reset is needed, falling back to the pre-check snapshot.
109            let fresh_meta = tx.metadata.clone().or(metadata);
110            if let Some(meta) = fresh_meta {
111                if meta.consecutive_failures > 0 {
112                    if let Err(e) = tx_repo
113                        .reset_status_check_consecutive_failures(tx_id.to_string())
114                        .await
115                    {
116                        warn!(error = %e, tx_id = %tx_id, relayer_id = %tx.relayer_id, "failed to reset consecutive counter");
117                    }
118                }
119            }
120
121            // Return error to trigger retry
122            Err(HandlerError::Retry(format!(
123                "transaction status: {:?} - not in final state, retrying",
124                tx.status
125            )))
126        }
127        Err(e) => {
128            if e.downcast_ref::<TransactionError>()
129                .is_some_and(TransactionError::is_concurrent_update_conflict)
130            {
131                info!(
132                    error = %e,
133                    tx_id = %tx_id,
134                    "status check lost a concurrent update race, completing job without counter changes"
135                );
136                return Ok(());
137            }
138
139            // Check if this is a permanent failure that shouldn't retry
140            if !should_retry_on_error {
141                info!(
142                    error = %e,
143                    tx_id = %tx_id,
144                    "status check failed with permanent error, completing job without retry"
145                );
146                return Ok(());
147            }
148
149            // Transient error - INCREMENT both counters (only if we have metadata)
150            if let Some(meta) = metadata {
151                warn!(
152                    error = %e,
153                    tx_id = %tx_id,
154                    consecutive_failures = meta.consecutive_failures.saturating_add(1),
155                    total_failures = meta.total_failures.saturating_add(1),
156                    "status check failed, incrementing failure counters"
157                );
158
159                // Update counters via atomic transaction repository method
160                if let Err(update_err) = tx_repo
161                    .increment_status_check_failures(tx_id.to_string())
162                    .await
163                {
164                    warn!(error = %update_err, tx_id = %tx_id, "failed to update counters");
165                }
166            } else {
167                // Early failure before counters were read - skip counter update
168                warn!(
169                    error = %e,
170                    tx_id = %tx_id,
171                    "status check failed early, counters not available"
172                );
173            }
174
175            // Return error to trigger retry
176            Err(HandlerError::Retry(format!("{e}")))
177        }
178    }
179}
180
181/// Result of handle_request including whether to retry on error.
182struct HandleRequestResult {
183    result: Result<TransactionRepoModel>,
184    /// Transaction metadata with failure counters. None if metadata couldn't be read
185    /// (e.g., transaction fetch failed early).
186    metadata: Option<TransactionMetadata>,
187    /// If false, errors should not trigger retry (e.g., transaction not found)
188    should_retry_on_error: bool,
189}
190
191/// Executes the status check logic and returns the result with counter values.
192/// Returns None for counters if they couldn't be read (e.g., transaction fetch failed early).
193/// Sets should_retry_on_error=false for permanent failures like transaction not found.
194async fn handle_request(
195    status_request: &TransactionStatusCheck,
196    state: &ThinData<DefaultAppState>,
197    attempt: usize,
198    task_id: &str,
199) -> HandleRequestResult {
200    let tx_id = &status_request.transaction_id;
201    debug!(
202        tx_id = %tx_id,
203        relayer_id = %status_request.relayer_id,
204        "handling transaction status check"
205    );
206
207    // Fetch transaction - if this fails, we can't read counters yet
208    let transaction = match get_transaction_by_id(tx_id.clone(), state).await {
209        Ok(tx) => tx,
210        Err(ApiError::NotFound(msg)) => {
211            // Transaction not found - permanent failure, don't retry
212            warn!(tx_id = %tx_id, "transaction not found, completing job without retry: {}", msg);
213            return HandleRequestResult {
214                result: Err(eyre::eyre!("Transaction not found: {}", msg)),
215                metadata: None,
216                should_retry_on_error: false,
217            };
218        }
219        Err(e) => {
220            // Other errors - should retry
221            return HandleRequestResult {
222                result: Err(e.into()),
223                metadata: None,
224                should_retry_on_error: true,
225            };
226        }
227    };
228
229    // Read failure counters from transaction metadata
230    let meta = transaction.metadata.clone().unwrap_or_default();
231
232    // Get network type from transaction (authoritative source)
233    let network_type = transaction.network_type;
234    let max_consecutive = get_max_consecutive_status_failures(network_type);
235    let max_total = get_max_total_status_failures(network_type);
236
237    debug!(
238        tx_id = %tx_id,
239        consecutive_failures = meta.consecutive_failures,
240        total_failures = meta.total_failures,
241        max_consecutive,
242        max_total,
243        attempt,
244        task_id = %task_id,
245        "handling transaction status check"
246    );
247
248    // Build circuit breaker context, attaching any job metadata (e.g., nonce recovery hints)
249    let context = StatusCheckContext::new(
250        meta.consecutive_failures,
251        meta.total_failures,
252        attempt as u32,
253        max_consecutive,
254        max_total,
255        network_type,
256    )
257    .with_job_metadata(status_request.metadata.clone());
258
259    // Get relayer transaction handler
260    let relayer_transaction =
261        match get_relayer_transaction(status_request.relayer_id.clone(), state).await {
262            Ok(rt) => rt,
263            Err(ApiError::NotFound(msg)) => {
264                // Relayer or signer not found - permanent failure, don't retry
265                warn!(
266                    tx_id = %tx_id,
267                    relayer_id = %status_request.relayer_id,
268                    "relayer or signer not found, completing job without retry: {}", msg
269                );
270                return HandleRequestResult {
271                    result: Err(eyre::eyre!("Relayer or signer not found: {}", msg)),
272                    metadata: Some(meta),
273                    should_retry_on_error: false,
274                };
275            }
276            Err(e) => {
277                // Other errors - should retry
278                return HandleRequestResult {
279                    result: Err(e.into()),
280                    metadata: Some(meta),
281                    should_retry_on_error: true,
282                };
283            }
284        };
285
286    // Execute status check
287    let result = relayer_transaction
288        .handle_transaction_status(transaction, Some(context))
289        .await
290        .map_err(|e| e.into());
291
292    if let Ok(tx) = result.as_ref() {
293        debug!(
294            tx_id = %tx.id,
295            status = ?tx.status,
296            "status check handled successfully"
297        );
298    }
299
300    HandleRequestResult {
301        result,
302        metadata: Some(meta),
303        should_retry_on_error: true,
304    }
305}
306
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        models::{NetworkType, TransactionStatus},
        repositories::MockTransactionRepository,
    };
    use std::collections::HashMap;

    // Plain `#[test]`: nothing here awaits, so no async runtime is needed.
    #[test]
    fn test_status_check_job_validation() {
        let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
        let job = Job::new(crate::jobs::JobType::TransactionStatusCheck, check_job);

        assert_eq!(job.data.transaction_id, "tx123");
        assert_eq!(job.data.relayer_id, "relayer-1");
        assert!(job.data.metadata.is_none());
    }

    #[test]
    fn test_status_check_with_metadata() {
        let mut metadata = HashMap::new();
        metadata.insert("retry_count".to_string(), "2".to_string());
        metadata.insert("last_status".to_string(), "pending".to_string());

        let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm)
            .with_metadata(metadata);

        assert!(check_job.metadata.is_some());
        let job_metadata = check_job.metadata.unwrap();
        assert_eq!(job_metadata.get("retry_count").unwrap(), "2");
        assert_eq!(job_metadata.get("last_status").unwrap(), "pending");
    }

    #[test]
    fn test_status_check_network_type_required() {
        // Jobs should always have network_type set
        let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
        assert!(check_job.network_type.is_some());

        // Verify different network types are preserved
        let solana_job = TransactionStatusCheck::new("tx456", "relayer-2", NetworkType::Solana);
        assert_eq!(solana_job.network_type, Some(NetworkType::Solana));

        let stellar_job = TransactionStatusCheck::new("tx789", "relayer-3", NetworkType::Stellar);
        assert_eq!(stellar_job.network_type, Some(NetworkType::Stellar));
    }

    mod context_tests {
        use super::*;

        #[test]
        fn test_context_should_force_finalize_below_threshold() {
            let ctx = StatusCheckContext::new(5, 10, 15, 25, 75, NetworkType::Evm);
            assert!(!ctx.should_force_finalize());
        }

        #[test]
        fn test_context_should_force_finalize_consecutive_at_threshold() {
            let ctx = StatusCheckContext::new(25, 30, 35, 25, 75, NetworkType::Evm);
            assert!(ctx.should_force_finalize());
        }

        #[test]
        fn test_context_should_force_finalize_total_at_threshold() {
            let ctx = StatusCheckContext::new(10, 75, 80, 25, 75, NetworkType::Evm);
            assert!(ctx.should_force_finalize());
        }
    }

    mod final_state_tests {
        use super::*;

        fn verify_final_state(status: TransactionStatus) {
            assert!(is_final_state(&status));
        }

        fn verify_not_final_state(status: TransactionStatus) {
            assert!(!is_final_state(&status));
        }

        #[test]
        fn test_confirmed_is_final() {
            verify_final_state(TransactionStatus::Confirmed);
        }

        #[test]
        fn test_failed_is_final() {
            verify_final_state(TransactionStatus::Failed);
        }

        #[test]
        fn test_canceled_is_final() {
            verify_final_state(TransactionStatus::Canceled);
        }

        #[test]
        fn test_expired_is_final() {
            verify_final_state(TransactionStatus::Expired);
        }

        #[test]
        fn test_pending_is_not_final() {
            verify_not_final_state(TransactionStatus::Pending);
        }

        #[test]
        fn test_sent_is_not_final() {
            verify_not_final_state(TransactionStatus::Sent);
        }

        #[test]
        fn test_submitted_is_not_final() {
            verify_not_final_state(TransactionStatus::Submitted);
        }

        #[test]
        fn test_mined_is_not_final() {
            verify_not_final_state(TransactionStatus::Mined);
        }
    }

    mod handle_result_tests {
        use super::*;

        /// Tests that counter increment uses saturating_add to prevent overflow
        #[test]
        fn test_counter_increment_saturating() {
            let consecutive: u32 = u32::MAX;
            let total: u32 = u32::MAX;

            let new_consecutive = consecutive.saturating_add(1);
            let new_total = total.saturating_add(1);

            // Should not overflow, stays at MAX
            assert_eq!(new_consecutive, u32::MAX);
            assert_eq!(new_total, u32::MAX);
        }

        /// Tests normal counter increment
        #[test]
        fn test_counter_increment_normal() {
            let consecutive: u32 = 5;
            let total: u32 = 10;

            let new_consecutive = consecutive.saturating_add(1);
            let new_total = total.saturating_add(1);

            assert_eq!(new_consecutive, 6);
            assert_eq!(new_total, 11);
        }

        /// Documents the intended counter semantics on a non-final success: consecutive
        /// resets to 0 while total stays unchanged. `handle_result` itself is exercised in
        /// `handle_request_result_tests`.
        #[test]
        fn test_consecutive_reset_on_success() {
            let total: u32 = 20;

            // On success, consecutive resets
            let new_consecutive = 0;
            let new_total = total; // unchanged

            assert_eq!(new_consecutive, 0);
            assert_eq!(new_total, 20);
        }

        /// Tests that final states are correctly identified for cleanup
        #[test]
        fn test_final_state_triggers_cleanup() {
            let final_states = vec![
                TransactionStatus::Confirmed,
                TransactionStatus::Failed,
                TransactionStatus::Canceled,
                TransactionStatus::Expired,
            ];

            for status in final_states {
                assert!(
                    is_final_state(&status),
                    "Expected {status:?} to be a final state"
                );
            }
        }

        /// Tests that non-final states trigger retry
        #[test]
        fn test_non_final_state_triggers_retry() {
            let non_final_states = vec![
                TransactionStatus::Pending,
                TransactionStatus::Sent,
                TransactionStatus::Submitted,
                TransactionStatus::Mined,
            ];

            for status in non_final_states {
                assert!(
                    !is_final_state(&status),
                    "Expected {status:?} to NOT be a final state"
                );
            }
        }
    }

    mod handle_request_result_tests {
        use super::*;

        // The mocks below are created without expectations. A mockall mock panics on any
        // unexpected call, so these tests also verify that the repository is NOT touched.

        #[tokio::test]
        async fn test_handle_result_ignores_concurrent_update_conflict() {
            let tx_repo = MockTransactionRepository::new();

            let result = handle_result(
                Err(TransactionError::ConcurrentUpdateConflict("tx race".to_string()).into()),
                &tx_repo,
                "tx-1",
                Some(TransactionMetadata {
                    consecutive_failures: 2,
                    total_failures: 5,
                    ..Default::default()
                }),
                true,
            )
            .await;

            assert!(result.is_ok());
        }

        #[tokio::test]
        async fn test_handle_result_final_state_completes_without_repo_calls() {
            let tx_repo = MockTransactionRepository::new();
            let mut tx = TransactionRepoModel::default();
            tx.status = TransactionStatus::Confirmed;

            let result = handle_result(
                Ok(tx),
                &tx_repo,
                "tx-1",
                Some(TransactionMetadata {
                    consecutive_failures: 3,
                    total_failures: 4,
                    ..Default::default()
                }),
                true,
            )
            .await;

            assert!(result.is_ok());
        }

        #[tokio::test]
        async fn test_handle_result_non_final_without_failures_retries_without_reset() {
            let tx_repo = MockTransactionRepository::new();
            let mut tx = TransactionRepoModel::default();
            tx.status = TransactionStatus::Pending;
            tx.metadata = None;

            let result = handle_result(
                Ok(tx),
                &tx_repo,
                "tx-1",
                Some(TransactionMetadata {
                    consecutive_failures: 0,
                    total_failures: 3,
                    ..Default::default()
                }),
                true,
            )
            .await;

            assert!(matches!(result, Err(HandlerError::Retry(_))));
        }

        #[tokio::test]
        async fn test_handle_result_permanent_error_completes_without_counter_updates() {
            let tx_repo = MockTransactionRepository::new();

            let result = handle_result(
                Err(eyre::eyre!("Transaction not found: tx-1")),
                &tx_repo,
                "tx-1",
                Some(TransactionMetadata {
                    consecutive_failures: 1,
                    total_failures: 1,
                    ..Default::default()
                }),
                false,
            )
            .await;

            assert!(result.is_ok());
        }

        #[tokio::test]
        async fn test_handle_result_transient_error_without_metadata_retries() {
            let tx_repo = MockTransactionRepository::new();

            let result = handle_result(
                Err(eyre::eyre!("connection reset")),
                &tx_repo,
                "tx-1",
                None,
                true,
            )
            .await;

            assert!(matches!(
                &result,
                Err(HandlerError::Retry(msg)) if msg == "connection reset"
            ));
        }

        #[test]
        fn test_handle_request_result_with_counters() {
            let result = HandleRequestResult {
                result: Ok(TransactionRepoModel::default()),
                metadata: Some(TransactionMetadata {
                    consecutive_failures: 5,
                    total_failures: 10,
                    insufficient_fee_retries: 2,
                    try_again_later_retries: 1,
                    nonce_too_high_retries: 0,
                }),
                should_retry_on_error: true,
            };

            assert!(result.result.is_ok());
            let meta = result.metadata.unwrap();
            assert_eq!(meta.consecutive_failures, 5);
            assert_eq!(meta.total_failures, 10);
            assert!(result.should_retry_on_error);
        }

        #[test]
        fn test_handle_request_result_without_counters() {
            // Early failure before counters could be read
            let result = HandleRequestResult {
                result: Err(eyre::eyre!("Transaction not found")),
                metadata: None,
                should_retry_on_error: false,
            };

            assert!(result.result.is_err());
            assert!(result.metadata.is_none());
            assert!(!result.should_retry_on_error);
        }

        #[test]
        fn test_permanent_error_should_not_retry() {
            // NotFound errors are permanent - should not retry
            let result = HandleRequestResult {
                result: Err(eyre::eyre!("Transaction not found")),
                metadata: None,
                should_retry_on_error: false,
            };

            // Permanent errors have should_retry_on_error = false
            assert!(!result.should_retry_on_error);
        }

        #[test]
        fn test_transient_error_should_retry() {
            // Network/connection errors are transient - should retry
            let result = HandleRequestResult {
                result: Err(eyre::eyre!("Connection timeout")),
                metadata: Some(TransactionMetadata {
                    consecutive_failures: 3,
                    total_failures: 7,
                    insufficient_fee_retries: 1,
                    try_again_later_retries: 0,
                    nonce_too_high_retries: 0,
                }),
                should_retry_on_error: true,
            };

            // Transient errors have should_retry_on_error = true
            assert!(result.should_retry_on_error);
        }
    }
}