1use actix_web::web::ThinData;
8use eyre::Result;
9use tracing::{debug, info, instrument, warn};
10
11use crate::{
12 constants::{get_max_consecutive_status_failures, get_max_total_status_failures},
13 domain::{get_relayer_transaction, get_transaction_by_id, is_final_state, Transaction},
14 jobs::{Job, StatusCheckContext, TransactionStatusCheck},
15 models::{
16 ApiError, DefaultAppState, TransactionError, TransactionMetadata, TransactionRepoModel,
17 },
18 observability::request_id::set_request_id,
19 queues::{HandlerError, WorkerContext},
20 repositories::TransactionRepository,
21};
22
23#[instrument(
24 level = "debug",
25 skip(job, state, ctx),
26 fields(
27 request_id = ?job.request_id,
28 job_id = %job.message_id,
29 job_type = %job.job_type.to_string(),
30 attempt = %ctx.attempt,
31 tx_id = %job.data.transaction_id,
32 relayer_id = %job.data.relayer_id,
33 task_id = %ctx.task_id,
34 )
35)]
36pub async fn transaction_status_handler(
37 job: Job<TransactionStatusCheck>,
38 state: ThinData<DefaultAppState>,
39 ctx: WorkerContext,
40) -> Result<(), HandlerError> {
41 if let Some(request_id) = job.request_id.clone() {
42 set_request_id(request_id);
43 }
44
45 let tx_repo = state.transaction_repository();
46
47 let req_result = handle_request(&job.data, &state, ctx.attempt, &ctx.task_id).await;
49
50 let tx_id = &job.data.transaction_id;
51
52 handle_result(
54 req_result.result,
55 &*tx_repo,
56 tx_id,
57 req_result.metadata,
58 req_result.should_retry_on_error,
59 )
60 .await
61}
62
63async fn handle_result<TR>(
74 result: Result<TransactionRepoModel>,
75 tx_repo: &TR,
76 tx_id: &str,
77 metadata: Option<TransactionMetadata>,
78 should_retry_on_error: bool,
79) -> Result<(), HandlerError>
80where
81 TR: TransactionRepository + Send + Sync,
82{
83 match result {
84 Ok(tx) if is_final_state(&tx.status) => {
85 debug!(
88 tx_id = %tx.id,
89 relayer_id = %tx.relayer_id,
90 status = ?tx.status,
91 consecutive_failures = ?metadata.as_ref().map(|m| m.consecutive_failures),
92 total_failures = ?metadata.as_ref().map(|m| m.total_failures),
93 "transaction in final state, status check complete"
94 );
95
96 Ok(())
97 }
98 Ok(tx) => {
99 debug!(
101 tx_id = %tx.id,
102 relayer_id = %tx.relayer_id,
103 status = ?tx.status,
104 "transaction not in final state"
105 );
106
107 let fresh_meta = tx.metadata.clone().or(metadata);
110 if let Some(meta) = fresh_meta {
111 if meta.consecutive_failures > 0 {
112 if let Err(e) = tx_repo
113 .reset_status_check_consecutive_failures(tx_id.to_string())
114 .await
115 {
116 warn!(error = %e, tx_id = %tx_id, relayer_id = %tx.relayer_id, "failed to reset consecutive counter");
117 }
118 }
119 }
120
121 Err(HandlerError::Retry(format!(
123 "transaction status: {:?} - not in final state, retrying",
124 tx.status
125 )))
126 }
127 Err(e) => {
128 if e.downcast_ref::<TransactionError>()
129 .is_some_and(TransactionError::is_concurrent_update_conflict)
130 {
131 info!(
132 error = %e,
133 tx_id = %tx_id,
134 "status check lost a concurrent update race, completing job without counter changes"
135 );
136 return Ok(());
137 }
138
139 if !should_retry_on_error {
141 info!(
142 error = %e,
143 tx_id = %tx_id,
144 "status check failed with permanent error, completing job without retry"
145 );
146 return Ok(());
147 }
148
149 if let Some(meta) = metadata {
151 warn!(
152 error = %e,
153 tx_id = %tx_id,
154 consecutive_failures = meta.consecutive_failures.saturating_add(1),
155 total_failures = meta.total_failures.saturating_add(1),
156 "status check failed, incrementing failure counters"
157 );
158
159 if let Err(update_err) = tx_repo
161 .increment_status_check_failures(tx_id.to_string())
162 .await
163 {
164 warn!(error = %update_err, tx_id = %tx_id, "failed to update counters");
165 }
166 } else {
167 warn!(
169 error = %e,
170 tx_id = %tx_id,
171 "status check failed early, counters not available"
172 );
173 }
174
175 Err(HandlerError::Retry(format!("{e}")))
177 }
178 }
179}
180
181struct HandleRequestResult {
183 result: Result<TransactionRepoModel>,
184 metadata: Option<TransactionMetadata>,
187 should_retry_on_error: bool,
189}
190
191async fn handle_request(
195 status_request: &TransactionStatusCheck,
196 state: &ThinData<DefaultAppState>,
197 attempt: usize,
198 task_id: &str,
199) -> HandleRequestResult {
200 let tx_id = &status_request.transaction_id;
201 debug!(
202 tx_id = %tx_id,
203 relayer_id = %status_request.relayer_id,
204 "handling transaction status check"
205 );
206
207 let transaction = match get_transaction_by_id(tx_id.clone(), state).await {
209 Ok(tx) => tx,
210 Err(ApiError::NotFound(msg)) => {
211 warn!(tx_id = %tx_id, "transaction not found, completing job without retry: {}", msg);
213 return HandleRequestResult {
214 result: Err(eyre::eyre!("Transaction not found: {}", msg)),
215 metadata: None,
216 should_retry_on_error: false,
217 };
218 }
219 Err(e) => {
220 return HandleRequestResult {
222 result: Err(e.into()),
223 metadata: None,
224 should_retry_on_error: true,
225 };
226 }
227 };
228
229 let meta = transaction.metadata.clone().unwrap_or_default();
231
232 let network_type = transaction.network_type;
234 let max_consecutive = get_max_consecutive_status_failures(network_type);
235 let max_total = get_max_total_status_failures(network_type);
236
237 debug!(
238 tx_id = %tx_id,
239 consecutive_failures = meta.consecutive_failures,
240 total_failures = meta.total_failures,
241 max_consecutive,
242 max_total,
243 attempt,
244 task_id = %task_id,
245 "handling transaction status check"
246 );
247
248 let context = StatusCheckContext::new(
250 meta.consecutive_failures,
251 meta.total_failures,
252 attempt as u32,
253 max_consecutive,
254 max_total,
255 network_type,
256 )
257 .with_job_metadata(status_request.metadata.clone());
258
259 let relayer_transaction =
261 match get_relayer_transaction(status_request.relayer_id.clone(), state).await {
262 Ok(rt) => rt,
263 Err(ApiError::NotFound(msg)) => {
264 warn!(
266 tx_id = %tx_id,
267 relayer_id = %status_request.relayer_id,
268 "relayer or signer not found, completing job without retry: {}", msg
269 );
270 return HandleRequestResult {
271 result: Err(eyre::eyre!("Relayer or signer not found: {}", msg)),
272 metadata: Some(meta),
273 should_retry_on_error: false,
274 };
275 }
276 Err(e) => {
277 return HandleRequestResult {
279 result: Err(e.into()),
280 metadata: Some(meta),
281 should_retry_on_error: true,
282 };
283 }
284 };
285
286 let result = relayer_transaction
288 .handle_transaction_status(transaction, Some(context))
289 .await
290 .map_err(|e| e.into());
291
292 if let Ok(tx) = result.as_ref() {
293 debug!(
294 tx_id = %tx.id,
295 status = ?tx.status,
296 "status check handled successfully"
297 );
298 }
299
300 HandleRequestResult {
301 result,
302 metadata: Some(meta),
303 should_retry_on_error: true,
304 }
305}
306
307#[cfg(test)]
308mod tests {
309 use super::*;
310 use crate::{
311 models::{NetworkType, TransactionStatus},
312 repositories::MockTransactionRepository,
313 };
314 use std::collections::HashMap;
315
316 #[tokio::test]
317 async fn test_status_check_job_validation() {
318 let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
319 let job = Job::new(crate::jobs::JobType::TransactionStatusCheck, check_job);
320
321 assert_eq!(job.data.transaction_id, "tx123");
322 assert_eq!(job.data.relayer_id, "relayer-1");
323 assert!(job.data.metadata.is_none());
324 }
325
326 #[tokio::test]
327 async fn test_status_check_with_metadata() {
328 let mut metadata = HashMap::new();
329 metadata.insert("retry_count".to_string(), "2".to_string());
330 metadata.insert("last_status".to_string(), "pending".to_string());
331
332 let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm)
333 .with_metadata(metadata.clone());
334
335 assert!(check_job.metadata.is_some());
336 let job_metadata = check_job.metadata.unwrap();
337 assert_eq!(job_metadata.get("retry_count").unwrap(), "2");
338 assert_eq!(job_metadata.get("last_status").unwrap(), "pending");
339 }
340
341 #[test]
342 fn test_status_check_network_type_required() {
343 let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
345 assert!(check_job.network_type.is_some());
346
347 let solana_job = TransactionStatusCheck::new("tx456", "relayer-2", NetworkType::Solana);
349 assert_eq!(solana_job.network_type, Some(NetworkType::Solana));
350
351 let stellar_job = TransactionStatusCheck::new("tx789", "relayer-3", NetworkType::Stellar);
352 assert_eq!(stellar_job.network_type, Some(NetworkType::Stellar));
353 }
354
355 mod context_tests {
356 use super::*;
357
358 #[test]
359 fn test_context_should_force_finalize_below_threshold() {
360 let ctx = StatusCheckContext::new(5, 10, 15, 25, 75, NetworkType::Evm);
361 assert!(!ctx.should_force_finalize());
362 }
363
364 #[test]
365 fn test_context_should_force_finalize_consecutive_at_threshold() {
366 let ctx = StatusCheckContext::new(25, 30, 35, 25, 75, NetworkType::Evm);
367 assert!(ctx.should_force_finalize());
368 }
369
370 #[test]
371 fn test_context_should_force_finalize_total_at_threshold() {
372 let ctx = StatusCheckContext::new(10, 75, 80, 25, 75, NetworkType::Evm);
373 assert!(ctx.should_force_finalize());
374 }
375 }
376
377 mod final_state_tests {
378 use super::*;
379
380 fn verify_final_state(status: TransactionStatus) {
381 assert!(is_final_state(&status));
382 }
383
384 fn verify_not_final_state(status: TransactionStatus) {
385 assert!(!is_final_state(&status));
386 }
387
388 #[test]
389 fn test_confirmed_is_final() {
390 verify_final_state(TransactionStatus::Confirmed);
391 }
392
393 #[test]
394 fn test_failed_is_final() {
395 verify_final_state(TransactionStatus::Failed);
396 }
397
398 #[test]
399 fn test_canceled_is_final() {
400 verify_final_state(TransactionStatus::Canceled);
401 }
402
403 #[test]
404 fn test_expired_is_final() {
405 verify_final_state(TransactionStatus::Expired);
406 }
407
408 #[test]
409 fn test_pending_is_not_final() {
410 verify_not_final_state(TransactionStatus::Pending);
411 }
412
413 #[test]
414 fn test_sent_is_not_final() {
415 verify_not_final_state(TransactionStatus::Sent);
416 }
417
418 #[test]
419 fn test_submitted_is_not_final() {
420 verify_not_final_state(TransactionStatus::Submitted);
421 }
422
423 #[test]
424 fn test_mined_is_not_final() {
425 verify_not_final_state(TransactionStatus::Mined);
426 }
427 }
428
429 mod handle_result_tests {
430 use super::*;
431
432 #[test]
434 fn test_counter_increment_saturating() {
435 let consecutive: u32 = u32::MAX;
436 let total: u32 = u32::MAX;
437
438 let new_consecutive = consecutive.saturating_add(1);
439 let new_total = total.saturating_add(1);
440
441 assert_eq!(new_consecutive, u32::MAX);
443 assert_eq!(new_total, u32::MAX);
444 }
445
446 #[test]
448 fn test_counter_increment_normal() {
449 let consecutive: u32 = 5;
450 let total: u32 = 10;
451
452 let new_consecutive = consecutive.saturating_add(1);
453 let new_total = total.saturating_add(1);
454
455 assert_eq!(new_consecutive, 6);
456 assert_eq!(new_total, 11);
457 }
458
459 #[test]
461 fn test_consecutive_reset_on_success() {
462 let total: u32 = 20;
465
466 let new_consecutive = 0;
468 let new_total = total; assert_eq!(new_consecutive, 0);
471 assert_eq!(new_total, 20);
472 }
473
474 #[test]
476 fn test_final_state_triggers_cleanup() {
477 let final_states = vec![
478 TransactionStatus::Confirmed,
479 TransactionStatus::Failed,
480 TransactionStatus::Canceled,
481 TransactionStatus::Expired,
482 ];
483
484 for status in final_states {
485 assert!(
486 is_final_state(&status),
487 "Expected {status:?} to be a final state"
488 );
489 }
490 }
491
492 #[test]
494 fn test_non_final_state_triggers_retry() {
495 let non_final_states = vec![
496 TransactionStatus::Pending,
497 TransactionStatus::Sent,
498 TransactionStatus::Submitted,
499 TransactionStatus::Mined,
500 ];
501
502 for status in non_final_states {
503 assert!(
504 !is_final_state(&status),
505 "Expected {status:?} to NOT be a final state"
506 );
507 }
508 }
509 }
510
511 mod handle_request_result_tests {
512 use super::*;
513
514 #[tokio::test]
515 async fn test_handle_result_ignores_concurrent_update_conflict() {
516 let tx_repo = MockTransactionRepository::new();
517
518 let result = handle_result(
519 Err(TransactionError::ConcurrentUpdateConflict("tx race".to_string()).into()),
520 &tx_repo,
521 "tx-1",
522 Some(TransactionMetadata {
523 consecutive_failures: 2,
524 total_failures: 5,
525 ..Default::default()
526 }),
527 true,
528 )
529 .await;
530
531 assert!(result.is_ok());
532 }
533
534 #[test]
535 fn test_handle_request_result_with_counters() {
536 let result = HandleRequestResult {
537 result: Ok(TransactionRepoModel::default()),
538 metadata: Some(TransactionMetadata {
539 consecutive_failures: 5,
540 total_failures: 10,
541 insufficient_fee_retries: 2,
542 try_again_later_retries: 1,
543 nonce_too_high_retries: 0,
544 }),
545 should_retry_on_error: true,
546 };
547
548 assert!(result.result.is_ok());
549 let meta = result.metadata.unwrap();
550 assert_eq!(meta.consecutive_failures, 5);
551 assert_eq!(meta.total_failures, 10);
552 assert!(result.should_retry_on_error);
553 }
554
555 #[test]
556 fn test_handle_request_result_without_counters() {
557 let result = HandleRequestResult {
559 result: Err(eyre::eyre!("Transaction not found")),
560 metadata: None,
561 should_retry_on_error: false,
562 };
563
564 assert!(result.result.is_err());
565 assert!(result.metadata.is_none());
566 assert!(!result.should_retry_on_error);
567 }
568
569 #[test]
570 fn test_permanent_error_should_not_retry() {
571 let result = HandleRequestResult {
573 result: Err(eyre::eyre!("Transaction not found")),
574 metadata: None,
575 should_retry_on_error: false,
576 };
577
578 assert!(!result.should_retry_on_error);
580 }
581
582 #[test]
583 fn test_transient_error_should_retry() {
584 let result = HandleRequestResult {
586 result: Err(eyre::eyre!("Connection timeout")),
587 metadata: Some(TransactionMetadata {
588 consecutive_failures: 3,
589 total_failures: 7,
590 insufficient_fee_retries: 1,
591 try_again_later_retries: 0,
592 nonce_too_high_retries: 0,
593 }),
594 should_retry_on_error: true,
595 };
596
597 assert!(result.should_retry_on_error);
599 }
600 }
601}