1use chrono::{DateTime, Utc};
6use soroban_rs::xdr::{
7 ContractEventBody, DiagnosticEvent, Error, Hash, InnerTransactionResultResult,
8 InvokeHostFunctionResult, Limits, OperationResult, OperationResultTr, ScVal,
9 TransactionEnvelope, TransactionResultResult, WriteXdr,
10};
11use tracing::{debug, info, warn};
12
13use super::{is_final_state, StellarRelayerTransaction};
14use crate::constants::{
15 get_stellar_max_stuck_transaction_lifetime, STELLAR_RESUBMIT_BASE_INTERVAL_SECONDS,
16 STELLAR_RESUBMIT_GROWTH_FACTOR, STELLAR_RESUBMIT_MAX_INTERVAL_SECONDS,
17};
18use crate::domain::transaction::stellar::prepare::common::send_submit_transaction_job;
19use crate::domain::transaction::stellar::utils::{
20 compute_resubmit_backoff_interval, extract_return_value_from_meta, extract_time_bounds,
21};
22use crate::domain::transaction::util::{get_age_since_created, get_age_since_sent_or_created};
23use crate::domain::xdr_utils::parse_transaction_xdr;
24use crate::{
25 constants::STELLAR_PENDING_RECOVERY_TRIGGER_SECONDS,
26 jobs::{JobProducerTrait, StatusCheckContext, TransactionRequest},
27 models::{
28 NetworkTransactionData, RelayerRepoModel, TransactionError, TransactionRepoModel,
29 TransactionStatus, TransactionUpdateRequest,
30 },
31 repositories::{Repository, TransactionCounterTrait, TransactionRepository},
32 services::{
33 provider::StellarProviderTrait,
34 signer::{Signer, StellarSignTrait},
35 },
36};
37
38impl<R, T, J, S, P, C, D> StellarRelayerTransaction<R, T, J, S, P, C, D>
39where
40 R: Repository<RelayerRepoModel, String> + Send + Sync,
41 T: TransactionRepository + Send + Sync,
42 J: JobProducerTrait + Send + Sync,
43 S: Signer + StellarSignTrait + Send + Sync,
44 P: StellarProviderTrait + Send + Sync,
45 C: TransactionCounterTrait + Send + Sync,
46 D: crate::services::stellar_dex::StellarDexServiceTrait + Send + Sync + 'static,
47{
48 pub async fn handle_transaction_status_impl(
56 &self,
57 tx: TransactionRepoModel,
58 context: Option<StatusCheckContext>,
59 ) -> Result<TransactionRepoModel, TransactionError> {
60 debug!(
61 tx_id = %tx.id,
62 relayer_id = %tx.relayer_id,
63 status = ?tx.status,
64 "handling transaction status"
65 );
66
67 if is_final_state(&tx.status) {
69 debug!(
70 tx_id = %tx.id,
71 relayer_id = %tx.relayer_id,
72 status = ?tx.status,
73 "transaction in final state, skipping status check"
74 );
75 return Ok(tx);
76 }
77
78 if let Some(ref ctx) = context {
80 if ctx.should_force_finalize() {
81 let reason = format!(
82 "Transaction status monitoring failed after {} consecutive errors (total: {}). \
83 Last status: {:?}. Unable to determine final on-chain state.",
84 ctx.consecutive_failures, ctx.total_failures, tx.status
85 );
86 warn!(
87 tx_id = %tx.id,
88 consecutive_failures = ctx.consecutive_failures,
89 total_failures = ctx.total_failures,
90 max_consecutive = ctx.max_consecutive_failures,
91 "circuit breaker triggered, forcing transaction to failed state"
92 );
93 return self.mark_as_failed(tx, reason).await;
97 }
98 }
99
100 match self.status_core(tx.clone()).await {
101 Ok(updated_tx) => {
102 debug!(
103 tx_id = %updated_tx.id,
104 status = ?updated_tx.status,
105 "status check completed successfully"
106 );
107 Ok(updated_tx)
108 }
109 Err(error) => {
110 debug!(
111 tx_id = %tx.id,
112 error = ?error,
113 "status check encountered error"
114 );
115
116 if error.is_concurrent_update_conflict() {
121 info!(
122 tx_id = %tx.id,
123 relayer_id = %tx.relayer_id,
124 "concurrent transaction update detected during status handling, reloading latest state"
125 );
126 return self
127 .transaction_repository()
128 .get_by_id(tx.id.clone())
129 .await
130 .map_err(TransactionError::from);
131 }
132
133 match error {
135 TransactionError::ValidationError(ref msg) => {
136 warn!(
139 tx_id = %tx.id,
140 error = %msg,
141 "validation error detected - marking transaction as failed"
142 );
143
144 self.mark_as_failed(tx, format!("Validation error: {msg}"))
145 .await
146 }
147 _ => {
148 warn!(
151 tx_id = %tx.id,
152 error = ?error,
153 "status check failed with retriable error, will retry"
154 );
155 Err(error)
156 }
157 }
158 }
159 }
160 }
161
162 async fn status_core(
165 &self,
166 tx: TransactionRepoModel,
167 ) -> Result<TransactionRepoModel, TransactionError> {
168 match tx.status {
169 TransactionStatus::Pending => self.handle_pending_state(tx).await,
170 TransactionStatus::Sent => self.handle_sent_state(tx).await,
171 _ => self.handle_submitted_state(tx).await,
172 }
173 }
174
175 pub fn parse_and_validate_hash(
178 &self,
179 tx: &TransactionRepoModel,
180 ) -> Result<Hash, TransactionError> {
181 let stellar_network_data = tx.network_data.get_stellar_transaction_data()?;
182
183 let tx_hash_str = stellar_network_data.hash.as_deref().filter(|s| !s.is_empty()).ok_or_else(|| {
184 TransactionError::ValidationError(format!(
185 "Stellar transaction {} is missing or has an empty on-chain hash in network_data. Cannot check status.",
186 tx.id
187 ))
188 })?;
189
190 let stellar_hash: Hash = tx_hash_str.parse().map_err(|e: Error| {
191 TransactionError::UnexpectedError(format!(
192 "Failed to parse transaction hash '{}' for tx {}: {:?}. This hash may be corrupted or not a valid Stellar hash.",
193 tx_hash_str, tx.id, e
194 ))
195 })?;
196
197 Ok(stellar_hash)
198 }
199
200 pub(super) async fn mark_as_failed(
202 &self,
203 tx: TransactionRepoModel,
204 reason: String,
205 ) -> Result<TransactionRepoModel, TransactionError> {
206 warn!(tx_id = %tx.id, reason = %reason, "marking transaction as failed");
207
208 let update_request = TransactionUpdateRequest {
209 status: Some(TransactionStatus::Failed),
210 status_reason: Some(reason),
211 ..Default::default()
212 };
213
214 let failed_tx = self
215 .finalize_transaction_state(tx.id.clone(), update_request)
216 .await?;
217
218 if let Err(e) = self.enqueue_next_pending_transaction(&tx.id).await {
220 warn!(error = %e, "failed to enqueue next pending transaction after failure");
221 }
222
223 Ok(failed_tx)
224 }
225
226 pub(super) async fn mark_as_expired(
228 &self,
229 tx: TransactionRepoModel,
230 reason: String,
231 ) -> Result<TransactionRepoModel, TransactionError> {
232 info!(tx_id = %tx.id, reason = %reason, "marking transaction as expired");
233
234 let update_request = TransactionUpdateRequest {
235 status: Some(TransactionStatus::Expired),
236 status_reason: Some(reason),
237 ..Default::default()
238 };
239
240 let expired_tx = self
241 .finalize_transaction_state(tx.id.clone(), update_request)
242 .await?;
243
244 if let Err(e) = self.enqueue_next_pending_transaction(&tx.id).await {
246 warn!(tx_id = %tx.id, relayer_id = %tx.relayer_id, error = %e, "failed to enqueue next pending transaction after expiration");
247 }
248
249 Ok(expired_tx)
250 }
251
252 pub(super) fn is_transaction_expired(
254 &self,
255 tx: &TransactionRepoModel,
256 ) -> Result<bool, TransactionError> {
257 if let Some(valid_until_str) = &tx.valid_until {
258 return Ok(Self::is_valid_until_string_expired(valid_until_str));
259 }
260
261 let stellar_data = tx.network_data.get_stellar_transaction_data()?;
263 if let Some(signed_xdr) = &stellar_data.signed_envelope_xdr {
264 if let Ok(envelope) = parse_transaction_xdr(signed_xdr, true) {
265 if let Some(tb) = extract_time_bounds(&envelope) {
266 if tb.max_time.0 == 0 {
267 return Ok(false); }
269 return Ok(Utc::now().timestamp() as u64 > tb.max_time.0);
270 }
271 }
272 }
273
274 Ok(false)
275 }
276
277 fn is_valid_until_string_expired(valid_until: &str) -> bool {
279 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(valid_until) {
280 return Utc::now() > dt.with_timezone(&Utc);
281 }
282 match valid_until.parse::<i64>() {
283 Ok(0) => false,
284 Ok(ts) => Utc::now().timestamp() > ts,
285 Err(_) => false,
286 }
287 }
288
289 pub async fn handle_stellar_success(
291 &self,
292 tx: TransactionRepoModel,
293 provider_response: soroban_rs::stellar_rpc_client::GetTransactionResponse,
294 ) -> Result<TransactionRepoModel, TransactionError> {
295 let updated_network_data =
297 tx.network_data
298 .get_stellar_transaction_data()
299 .ok()
300 .map(|mut stellar_data| {
301 if let Some(tx_result) = provider_response.result.as_ref() {
303 stellar_data = stellar_data.with_fee(tx_result.fee_charged as u32);
304 }
305
306 if let Some(result_meta) = provider_response.result_meta.as_ref() {
308 if let Some(return_value) = extract_return_value_from_meta(result_meta) {
309 let xdr_base64 = return_value.to_xdr_base64(Limits::none());
310 if let Ok(xdr_base64) = xdr_base64 {
311 stellar_data = stellar_data.with_transaction_result_xdr(xdr_base64);
312 } else {
313 warn!("Failed to serialize return value to XDR base64");
314 }
315 }
316 }
317
318 NetworkTransactionData::Stellar(stellar_data)
319 });
320
321 let update_request = TransactionUpdateRequest {
322 status: Some(TransactionStatus::Confirmed),
323 confirmed_at: Some(Utc::now().to_rfc3339()),
324 network_data: updated_network_data,
325 ..Default::default()
326 };
327
328 let confirmed_tx = self
329 .finalize_transaction_state(tx.id.clone(), update_request)
330 .await?;
331
332 self.enqueue_next_pending_transaction(&tx.id).await?;
333
334 Ok(confirmed_tx)
335 }
336
337 pub async fn handle_stellar_failed(
339 &self,
340 tx: TransactionRepoModel,
341 provider_response: soroban_rs::stellar_rpc_client::GetTransactionResponse,
342 ) -> Result<TransactionRepoModel, TransactionError> {
343 let result_code = provider_response
344 .result
345 .as_ref()
346 .map(|r| r.result.name())
347 .unwrap_or("unknown");
348
349 let (inner_result_code, op_result_code, inner_tx_hash, inner_fee_charged) =
351 match provider_response.result.as_ref().map(|r| &r.result) {
352 Some(TransactionResultResult::TxFeeBumpInnerFailed(pair)) => {
353 let inner = &pair.result.result;
354 let op = match inner {
355 InnerTransactionResultResult::TxFailed(ops) => {
356 first_failing_op(ops.as_slice())
357 }
358 _ => None,
359 };
360 (
361 Some(inner.name()),
362 op,
363 Some(hex::encode(pair.transaction_hash.0)),
364 pair.result.fee_charged,
365 )
366 }
367 Some(TransactionResultResult::TxFailed(ops)) => {
368 (None, first_failing_op(ops.as_slice()), None, 0)
369 }
370 _ => (None, None, None, 0),
371 };
372
373 let fee_charged = provider_response.result.as_ref().map(|r| r.fee_charged);
374 let fee_bid = provider_response.envelope.as_ref().map(extract_fee_bid);
375 let contract_error = extract_contract_error(&provider_response.events.diagnostic_events);
376
377 warn!(
378 tx_id = %tx.id,
379 result_code,
380 inner_result_code = inner_result_code.unwrap_or("n/a"),
381 op_result_code = op_result_code.unwrap_or("n/a"),
382 inner_tx_hash = inner_tx_hash.as_deref().unwrap_or("n/a"),
383 inner_fee_charged,
384 fee_charged = ?fee_charged,
385 fee_bid = ?fee_bid,
386 contract_error = contract_error.as_deref().unwrap_or("n/a"),
387 "stellar transaction failed"
388 );
389
390 let status_reason = format_failure_reason(
391 result_code,
392 inner_result_code,
393 op_result_code,
394 contract_error.as_deref(),
395 );
396
397 let update_request = TransactionUpdateRequest {
398 status: Some(TransactionStatus::Failed),
399 status_reason: Some(status_reason),
400 ..Default::default()
401 };
402
403 let updated_tx = self
404 .finalize_transaction_state(tx.id.clone(), update_request)
405 .await?;
406
407 self.enqueue_next_pending_transaction(&tx.id).await?;
408
409 Ok(updated_tx)
410 }
411
412 async fn check_expiration_and_max_lifetime(
415 &self,
416 tx: TransactionRepoModel,
417 failed_reason: String,
418 ) -> Option<Result<TransactionRepoModel, TransactionError>> {
419 let age = match get_age_since_created(&tx) {
420 Ok(age) => age,
421 Err(e) => return Some(Err(e)),
422 };
423
424 if let Ok(true) = self.is_transaction_expired(&tx) {
426 info!(tx_id = %tx.id, valid_until = ?tx.valid_until, "Transaction has expired");
427 return Some(
428 self.mark_as_expired(tx, "Transaction time_bounds expired".to_string())
429 .await,
430 );
431 }
432
433 if age > get_stellar_max_stuck_transaction_lifetime() {
435 warn!(tx_id = %tx.id, age_minutes = age.num_minutes(),
436 "Transaction exceeded max lifetime, marking as Failed");
437 return Some(self.mark_as_failed(tx, failed_reason).await);
438 }
439
440 None
441 }
442
443 async fn handle_sent_state(
446 &self,
447 tx: TransactionRepoModel,
448 ) -> Result<TransactionRepoModel, TransactionError> {
449 if let Some(result) = self
451 .check_expiration_and_max_lifetime(
452 tx.clone(),
453 "Transaction stuck in Sent status for too long".to_string(),
454 )
455 .await
456 {
457 return result;
458 }
459
460 let total_age = get_age_since_created(&tx)?;
464 if let Some(backoff_interval) = compute_resubmit_backoff_interval(
465 total_age,
466 STELLAR_RESUBMIT_BASE_INTERVAL_SECONDS,
467 STELLAR_RESUBMIT_MAX_INTERVAL_SECONDS,
468 STELLAR_RESUBMIT_GROWTH_FACTOR,
469 ) {
470 let age_since_last_submit = get_age_since_sent_or_created(&tx)?;
471 if age_since_last_submit > backoff_interval {
472 info!(
473 tx_id = %tx.id,
474 total_age_seconds = total_age.num_seconds(),
475 since_last_submit_seconds = age_since_last_submit.num_seconds(),
476 backoff_interval_seconds = backoff_interval.num_seconds(),
477 "re-enqueueing submit job for stuck Sent transaction"
478 );
479 send_submit_transaction_job(self.job_producer(), &tx, None).await?;
480 }
481 }
482
483 Ok(tx)
484 }
485
486 async fn handle_pending_state(
489 &self,
490 tx: TransactionRepoModel,
491 ) -> Result<TransactionRepoModel, TransactionError> {
492 if let Some(result) = self
494 .check_expiration_and_max_lifetime(
495 tx.clone(),
496 "Transaction stuck in Pending status for too long".to_string(),
497 )
498 .await
499 {
500 return result;
501 }
502
503 let age = self.get_time_since_created_at(&tx)?;
505
506 if age.num_seconds() >= STELLAR_PENDING_RECOVERY_TRIGGER_SECONDS {
509 info!(
510 tx_id = %tx.id,
511 age_seconds = age.num_seconds(),
512 "pending transaction without hash may be stuck, scheduling recovery job"
513 );
514
515 let transaction_request = TransactionRequest::new(tx.id.clone(), tx.relayer_id.clone());
516 if let Err(e) = self
517 .job_producer()
518 .produce_transaction_request_job(transaction_request, None)
519 .await
520 {
521 warn!(
522 tx_id = %tx.id,
523 error = %e,
524 "failed to schedule recovery job for pending transaction"
525 );
526 }
527 } else {
528 debug!(
529 tx_id = %tx.id,
530 age_seconds = age.num_seconds(),
531 "pending transaction without hash too young for recovery check"
532 );
533 }
534
535 Ok(tx)
536 }
537
538 fn get_time_since_created_at(
541 &self,
542 tx: &TransactionRepoModel,
543 ) -> Result<chrono::Duration, TransactionError> {
544 match DateTime::parse_from_rfc3339(&tx.created_at) {
545 Ok(dt) => Ok(Utc::now().signed_duration_since(dt.with_timezone(&Utc))),
546 Err(e) => {
547 warn!(tx_id = %tx.id, ts = %tx.created_at, error = %e, "failed to parse created_at timestamp");
548 Err(TransactionError::UnexpectedError(format!(
549 "Invalid created_at timestamp for transaction {}: {}",
550 tx.id, e
551 )))
552 }
553 }
554 }
555
556 async fn handle_submitted_state(
560 &self,
561 tx: TransactionRepoModel,
562 ) -> Result<TransactionRepoModel, TransactionError> {
563 let stellar_hash = match self.parse_and_validate_hash(&tx) {
564 Ok(hash) => hash,
565 Err(e) => {
566 warn!(
568 tx_id = %tx.id,
569 status = ?tx.status,
570 error = ?e,
571 "failed to parse and validate hash for submitted transaction"
572 );
573 return self
574 .mark_as_failed(tx, format!("Failed to parse and validate hash: {e}"))
575 .await;
576 }
577 };
578
579 let provider_response = match self.provider().get_transaction(&stellar_hash).await {
580 Ok(response) => response,
581 Err(e) => {
582 warn!(error = ?e, "provider get_transaction failed");
583 return Err(TransactionError::from(e));
584 }
585 };
586
587 match provider_response.status.as_str().to_uppercase().as_str() {
588 "SUCCESS" => self.handle_stellar_success(tx, provider_response).await,
589 "FAILED" => self.handle_stellar_failed(tx, provider_response).await,
590 _ => {
591 debug!(
592 tx_id = %tx.id,
593 relayer_id = %tx.relayer_id,
594 status = %provider_response.status,
595 "submitted transaction not yet final on-chain, will retry check later"
596 );
597
598 if let Some(result) = self
600 .check_expiration_and_max_lifetime(
601 tx.clone(),
602 "Transaction stuck in Submitted status for too long".to_string(),
603 )
604 .await
605 {
606 return result;
607 }
608
609 let total_age = get_age_since_created(&tx)?;
612 if let Some(backoff_interval) = compute_resubmit_backoff_interval(
613 total_age,
614 STELLAR_RESUBMIT_BASE_INTERVAL_SECONDS,
615 STELLAR_RESUBMIT_MAX_INTERVAL_SECONDS,
616 STELLAR_RESUBMIT_GROWTH_FACTOR,
617 ) {
618 let age_since_last_submit = get_age_since_sent_or_created(&tx)?;
619 if age_since_last_submit > backoff_interval {
620 info!(
621 tx_id = %tx.id,
622 relayer_id = %tx.relayer_id,
623 total_age_seconds = total_age.num_seconds(),
624 since_last_submit_seconds = age_since_last_submit.num_seconds(),
625 backoff_interval_seconds = backoff_interval.num_seconds(),
626 "resubmitting Submitted transaction to ensure mempool inclusion"
627 );
628 send_submit_transaction_job(self.job_producer(), &tx, None).await?;
629 }
630 }
631
632 Ok(tx)
633 }
634 }
635 }
636}
637
638fn extract_fee_bid(envelope: &TransactionEnvelope) -> i64 {
643 match envelope {
644 TransactionEnvelope::TxFeeBump(fb) => fb.tx.fee,
645 TransactionEnvelope::Tx(v1) => v1.tx.fee as i64,
646 TransactionEnvelope::TxV0(v0) => v0.tx.fee as i64,
647 }
648}
649
650fn first_failing_op(ops: &[OperationResult]) -> Option<&'static str> {
655 let op = ops.iter().find(|op| match op {
656 OperationResult::OpInner(tr) => match tr {
657 OperationResultTr::InvokeHostFunction(r) => {
658 !matches!(r, InvokeHostFunctionResult::Success(_))
659 }
660 OperationResultTr::ExtendFootprintTtl(r) => r.name() != "Success",
661 OperationResultTr::RestoreFootprint(r) => r.name() != "Success",
662 _ => false,
663 },
664 _ => true,
665 })?;
666 match op {
667 OperationResult::OpInner(tr) => match tr {
668 OperationResultTr::InvokeHostFunction(r) => Some(r.name()),
669 OperationResultTr::ExtendFootprintTtl(r) => Some(r.name()),
670 OperationResultTr::RestoreFootprint(r) => Some(r.name()),
671 _ => Some(tr.name()),
672 },
673 _ => Some(op.name()),
674 }
675}
676
677fn format_failure_reason(
680 outer: &str,
681 inner: Option<&str>,
682 op: Option<&str>,
683 contract_error: Option<&str>,
684) -> String {
685 let mut s = format!("Transaction failed on-chain. reason={outer}");
686 if let Some(inner) = inner {
687 s.push_str(" inner=");
688 s.push_str(inner);
689 }
690 if let Some(op) = op {
691 s.push_str(" op=");
692 s.push_str(op);
693 }
694 if let Some(ce) = contract_error {
695 s.push_str(" contract_error=");
696 s.push_str(ce);
697 }
698 s
699}
700
701fn extract_contract_error(events: &[DiagnosticEvent]) -> Option<String> {
706 for evt in events {
707 let ContractEventBody::V0(body) = &evt.event.body;
708 let mut error_str: Option<String> = None;
709 let mut message: Option<String> = None;
710 for v in body.topics.iter().chain(std::iter::once(&body.data)) {
711 scan_scval(v, &mut error_str, &mut message);
712 if error_str.is_some() && message.is_some() {
713 break;
714 }
715 }
716 if let Some(err) = error_str {
717 return Some(match message {
718 Some(m) => format!("{err} message=\"{}\"", sanitize_message(&m)),
719 None => err,
720 });
721 }
722 }
723 None
724}
725
726fn scan_scval(v: &ScVal, error_str: &mut Option<String>, message: &mut Option<String>) {
727 match v {
728 ScVal::Error(e) => {
729 if error_str.is_none() {
730 let payload = match e {
731 soroban_rs::xdr::ScError::Contract(n) => n.to_string(),
732 soroban_rs::xdr::ScError::WasmVm(c)
733 | soroban_rs::xdr::ScError::Context(c)
734 | soroban_rs::xdr::ScError::Storage(c)
735 | soroban_rs::xdr::ScError::Object(c)
736 | soroban_rs::xdr::ScError::Crypto(c)
737 | soroban_rs::xdr::ScError::Events(c)
738 | soroban_rs::xdr::ScError::Budget(c)
739 | soroban_rs::xdr::ScError::Value(c)
740 | soroban_rs::xdr::ScError::Auth(c) => c.name().to_string(),
741 };
742 *error_str = Some(format!("{}({payload})", e.name()));
743 }
744 }
745 ScVal::String(s) => {
746 if message.is_none() {
747 let bytes: &[u8] = s.as_ref();
748 if let Ok(text) = std::str::from_utf8(bytes) {
749 if !text.is_empty() {
750 *message = Some(text.to_string());
751 }
752 }
753 }
754 }
755 ScVal::Symbol(sym) => {
756 if message.is_none() {
757 let bytes: &[u8] = sym.as_ref();
758 if let Ok(text) = std::str::from_utf8(bytes) {
759 if !text.is_empty() && text != "error" {
761 *message = Some(text.to_string());
762 }
763 }
764 }
765 }
766 ScVal::Vec(Some(items)) => {
767 for inner in items.iter() {
768 scan_scval(inner, error_str, message);
769 if error_str.is_some() && message.is_some() {
770 return;
771 }
772 }
773 }
774 _ => {}
775 }
776}
777
778fn sanitize_message(s: &str) -> String {
779 let mut out = String::with_capacity(s.len());
780 for c in s.chars() {
781 if c.is_control() {
782 continue;
783 }
784 if c == '"' {
785 out.push('\\');
786 }
787 out.push(c);
788 }
789 out
790}
791
792#[cfg(test)]
793mod tests {
794 use super::*;
795 use crate::models::{NetworkTransactionData, RepositoryError};
796 use crate::repositories::PaginatedResult;
797 use chrono::Duration;
798 use mockall::predicate::eq;
799 use soroban_rs::stellar_rpc_client::GetTransactionResponse;
800
801 use crate::domain::transaction::stellar::test_helpers::*;
802
803 fn dummy_get_transaction_response(status: &str) -> GetTransactionResponse {
804 GetTransactionResponse {
805 status: status.to_string(),
806 ledger: None,
807 envelope: None,
808 result: None,
809 result_meta: None,
810 events: soroban_rs::stellar_rpc_client::GetTransactionEvents {
811 contract_events: vec![],
812 diagnostic_events: vec![],
813 transaction_events: vec![],
814 },
815 }
816 }
817
818 fn dummy_get_transaction_response_with_result_meta(
819 status: &str,
820 has_return_value: bool,
821 ) -> GetTransactionResponse {
822 use soroban_rs::xdr::{ScVal, SorobanTransactionMeta, TransactionMeta, TransactionMetaV3};
823
824 let result_meta = if has_return_value {
825 let return_value = ScVal::I32(42);
827 Some(TransactionMeta::V3(TransactionMetaV3 {
828 ext: soroban_rs::xdr::ExtensionPoint::V0,
829 tx_changes_before: soroban_rs::xdr::LedgerEntryChanges::default(),
830 operations: soroban_rs::xdr::VecM::default(),
831 tx_changes_after: soroban_rs::xdr::LedgerEntryChanges::default(),
832 soroban_meta: Some(SorobanTransactionMeta {
833 ext: soroban_rs::xdr::SorobanTransactionMetaExt::V0,
834 return_value,
835 events: soroban_rs::xdr::VecM::default(),
836 diagnostic_events: soroban_rs::xdr::VecM::default(),
837 }),
838 }))
839 } else {
840 None
841 };
842
843 GetTransactionResponse {
844 status: status.to_string(),
845 ledger: None,
846 envelope: None,
847 result: None,
848 result_meta,
849 events: soroban_rs::stellar_rpc_client::GetTransactionEvents {
850 contract_events: vec![],
851 diagnostic_events: vec![],
852 transaction_events: vec![],
853 },
854 }
855 }
856
857 mod handle_transaction_status_tests {
858 use crate::services::provider::ProviderError;
859
860 use super::*;
861
862 #[tokio::test]
863 async fn handle_transaction_status_confirmed_triggers_next() {
864 let relayer = create_test_relayer();
865 let mut mocks = default_test_mocks();
866
867 let mut tx_to_handle = create_test_transaction(&relayer.id);
868 tx_to_handle.id = "tx-confirm-this".to_string();
869 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
870 let tx_hash_bytes = [1u8; 32];
871 let tx_hash_hex = hex::encode(tx_hash_bytes);
872 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
873 {
874 stellar_data.hash = Some(tx_hash_hex.clone());
875 } else {
876 panic!("Expected Stellar network data for tx_to_handle");
877 }
878 tx_to_handle.status = TransactionStatus::Submitted;
879
880 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
881
882 mocks
884 .provider
885 .expect_get_transaction()
886 .with(eq(expected_stellar_hash.clone()))
887 .times(1)
888 .returning(move |_| {
889 Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) })
890 });
891
892 mocks
894 .tx_repo
895 .expect_partial_update()
896 .withf(move |id, update| {
897 id == "tx-confirm-this"
898 && update.status == Some(TransactionStatus::Confirmed)
899 && update.confirmed_at.is_some()
900 })
901 .times(1)
902 .returning(move |id, update| {
903 let mut updated_tx = tx_to_handle.clone(); updated_tx.id = id;
905 updated_tx.status = update.status.unwrap();
906 updated_tx.confirmed_at = update.confirmed_at;
907 Ok(updated_tx)
908 });
909
910 mocks
912 .job_producer
913 .expect_produce_send_notification_job()
914 .times(1)
915 .returning(|_, _| Box::pin(async { Ok(()) }));
916
917 let mut oldest_pending_tx = create_test_transaction(&relayer.id);
919 oldest_pending_tx.id = "tx-oldest-pending".to_string();
920 oldest_pending_tx.status = TransactionStatus::Pending;
921 let captured_oldest_pending_tx = oldest_pending_tx.clone();
922 let relayer_id_clone = relayer.id.clone();
923 mocks
924 .tx_repo
925 .expect_find_by_status_paginated()
926 .withf(move |relayer_id, statuses, query, oldest_first| {
927 *relayer_id == relayer_id_clone
928 && statuses == [TransactionStatus::Pending]
929 && query.page == 1
930 && query.per_page == 1
931 && *oldest_first
932 })
933 .times(1)
934 .returning(move |_, _, _, _| {
935 Ok(PaginatedResult {
936 items: vec![captured_oldest_pending_tx.clone()],
937 total: 1,
938 page: 1,
939 per_page: 1,
940 })
941 });
942
943 mocks
945 .job_producer
946 .expect_produce_transaction_request_job()
947 .withf(move |job, _delay| job.transaction_id == "tx-oldest-pending")
948 .times(1)
949 .returning(|_, _| Box::pin(async { Ok(()) }));
950
951 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
952 let mut initial_tx_for_handling = create_test_transaction(&relayer.id);
953 initial_tx_for_handling.id = "tx-confirm-this".to_string();
954 initial_tx_for_handling.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
955 if let NetworkTransactionData::Stellar(ref mut stellar_data) =
956 initial_tx_for_handling.network_data
957 {
958 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
959 } else {
960 panic!("Expected Stellar network data for initial_tx_for_handling");
961 }
962 initial_tx_for_handling.status = TransactionStatus::Submitted;
963
964 let result = handler
965 .handle_transaction_status_impl(initial_tx_for_handling, None)
966 .await;
967
968 assert!(result.is_ok());
969 let handled_tx = result.unwrap();
970 assert_eq!(handled_tx.id, "tx-confirm-this");
971 assert_eq!(handled_tx.status, TransactionStatus::Confirmed);
972 assert!(handled_tx.confirmed_at.is_some());
973 }
974
975 #[tokio::test]
976 async fn handle_transaction_status_still_pending() {
977 let relayer = create_test_relayer();
978 let mut mocks = default_test_mocks();
979
980 let mut tx_to_handle = create_test_transaction(&relayer.id);
981 tx_to_handle.id = "tx-pending-check".to_string();
982 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
983 let tx_hash_bytes = [2u8; 32];
984 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
985 {
986 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
987 } else {
988 panic!("Expected Stellar network data");
989 }
990 tx_to_handle.status = TransactionStatus::Submitted; let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
993
994 mocks
996 .provider
997 .expect_get_transaction()
998 .with(eq(expected_stellar_hash.clone()))
999 .times(1)
1000 .returning(move |_| {
1001 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
1002 });
1003
1004 mocks.tx_repo.expect_partial_update().never();
1006
1007 mocks
1009 .job_producer
1010 .expect_produce_send_notification_job()
1011 .never();
1012
1013 mocks
1015 .job_producer
1016 .expect_produce_submit_transaction_job()
1017 .times(1)
1018 .returning(|_, _| Box::pin(async { Ok(()) }));
1019
1020 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1021 let original_tx_clone = tx_to_handle.clone();
1022
1023 let result = handler
1024 .handle_transaction_status_impl(tx_to_handle, None)
1025 .await;
1026
1027 assert!(result.is_ok());
1028 let returned_tx = result.unwrap();
1029 assert_eq!(returned_tx.id, original_tx_clone.id);
1031 assert_eq!(returned_tx.status, original_tx_clone.status);
1032 assert!(returned_tx.confirmed_at.is_none()); }
1034
1035 #[tokio::test]
1036 async fn handle_transaction_status_failed() {
1037 let relayer = create_test_relayer();
1038 let mut mocks = default_test_mocks();
1039
1040 let mut tx_to_handle = create_test_transaction(&relayer.id);
1041 tx_to_handle.id = "tx-fail-this".to_string();
1042 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1043 let tx_hash_bytes = [3u8; 32];
1044 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1045 {
1046 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
1047 } else {
1048 panic!("Expected Stellar network data");
1049 }
1050 tx_to_handle.status = TransactionStatus::Submitted;
1051
1052 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1053
1054 mocks
1056 .provider
1057 .expect_get_transaction()
1058 .with(eq(expected_stellar_hash.clone()))
1059 .times(1)
1060 .returning(move |_| {
1061 Box::pin(async { Ok(dummy_get_transaction_response("FAILED")) })
1062 });
1063
1064 let relayer_id_for_mock = relayer.id.clone();
1066 mocks
1067 .tx_repo
1068 .expect_partial_update()
1069 .times(1)
1070 .returning(move |id, update| {
1071 let mut updated_tx = create_test_transaction(&relayer_id_for_mock);
1073 updated_tx.id = id;
1074 updated_tx.status = update.status.unwrap();
1075 updated_tx.status_reason = update.status_reason.clone();
1076 Ok::<_, RepositoryError>(updated_tx)
1077 });
1078
1079 mocks
1081 .job_producer
1082 .expect_produce_send_notification_job()
1083 .times(1)
1084 .returning(|_, _| Box::pin(async { Ok(()) }));
1085
1086 let relayer_id_clone = relayer.id.clone();
1088 mocks
1089 .tx_repo
1090 .expect_find_by_status_paginated()
1091 .withf(move |relayer_id, statuses, query, oldest_first| {
1092 *relayer_id == relayer_id_clone
1093 && statuses == [TransactionStatus::Pending]
1094 && query.page == 1
1095 && query.per_page == 1
1096 && *oldest_first
1097 })
1098 .times(1)
1099 .returning(move |_, _, _, _| {
1100 Ok(PaginatedResult {
1101 items: vec![],
1102 total: 0,
1103 page: 1,
1104 per_page: 1,
1105 })
1106 }); mocks
1110 .job_producer
1111 .expect_produce_transaction_request_job()
1112 .never();
1113 mocks
1115 .job_producer
1116 .expect_produce_check_transaction_status_job()
1117 .never();
1118
1119 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1120 let mut initial_tx_for_handling = create_test_transaction(&relayer.id);
1121 initial_tx_for_handling.id = "tx-fail-this".to_string();
1122 initial_tx_for_handling.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1123 if let NetworkTransactionData::Stellar(ref mut stellar_data) =
1124 initial_tx_for_handling.network_data
1125 {
1126 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
1127 } else {
1128 panic!("Expected Stellar network data");
1129 }
1130 initial_tx_for_handling.status = TransactionStatus::Submitted;
1131
1132 let result = handler
1133 .handle_transaction_status_impl(initial_tx_for_handling, None)
1134 .await;
1135
1136 assert!(result.is_ok());
1137 let handled_tx = result.unwrap();
1138 assert_eq!(handled_tx.id, "tx-fail-this");
1139 assert_eq!(handled_tx.status, TransactionStatus::Failed);
1140 assert!(handled_tx.status_reason.is_some());
1141 assert_eq!(
1142 handled_tx.status_reason.unwrap(),
1143 "Transaction failed on-chain. reason=unknown"
1144 );
1145 }
1146
1147 #[tokio::test]
1148 async fn handle_transaction_status_provider_error() {
1149 let relayer = create_test_relayer();
1150 let mut mocks = default_test_mocks();
1151
1152 let mut tx_to_handle = create_test_transaction(&relayer.id);
1153 tx_to_handle.id = "tx-provider-error".to_string();
1154 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1155 let tx_hash_bytes = [4u8; 32];
1156 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1157 {
1158 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
1159 } else {
1160 panic!("Expected Stellar network data");
1161 }
1162 tx_to_handle.status = TransactionStatus::Submitted;
1163
1164 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1165
1166 mocks
1168 .provider
1169 .expect_get_transaction()
1170 .with(eq(expected_stellar_hash.clone()))
1171 .times(1)
1172 .returning(move |_| {
1173 Box::pin(async { Err(ProviderError::Other("RPC boom".to_string())) })
1174 });
1175
1176 mocks.tx_repo.expect_partial_update().never();
1178
1179 mocks
1181 .job_producer
1182 .expect_produce_send_notification_job()
1183 .never();
1184 mocks
1186 .job_producer
1187 .expect_produce_transaction_request_job()
1188 .never();
1189
1190 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1191
1192 let result = handler
1193 .handle_transaction_status_impl(tx_to_handle, None)
1194 .await;
1195
1196 assert!(result.is_err());
1198 matches!(result.unwrap_err(), TransactionError::UnderlyingProvider(_));
1199 }
1200
1201 #[tokio::test]
1202 async fn handle_transaction_status_no_hashes() {
1203 let relayer = create_test_relayer();
1204 let mut mocks = default_test_mocks();
1205
1206 let mut tx_to_handle = create_test_transaction(&relayer.id);
1207 tx_to_handle.id = "tx-no-hashes".to_string();
1208 tx_to_handle.status = TransactionStatus::Submitted;
1209 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1210
1211 mocks.provider.expect_get_transaction().never();
1213
1214 mocks
1216 .tx_repo
1217 .expect_partial_update()
1218 .times(1)
1219 .returning(|_, update| {
1220 let mut updated_tx = create_test_transaction("test-relayer");
1221 updated_tx.status = update.status.unwrap_or(updated_tx.status);
1222 updated_tx.status_reason = update.status_reason.clone();
1223 Ok(updated_tx)
1224 });
1225
1226 mocks
1228 .job_producer
1229 .expect_produce_send_notification_job()
1230 .times(1)
1231 .returning(|_, _| Box::pin(async { Ok(()) }));
1232
1233 let relayer_id_clone = relayer.id.clone();
1235 mocks
1236 .tx_repo
1237 .expect_find_by_status_paginated()
1238 .withf(move |relayer_id, statuses, query, oldest_first| {
1239 *relayer_id == relayer_id_clone
1240 && statuses == [TransactionStatus::Pending]
1241 && query.page == 1
1242 && query.per_page == 1
1243 && *oldest_first
1244 })
1245 .times(1)
1246 .returning(move |_, _, _, _| {
1247 Ok(PaginatedResult {
1248 items: vec![],
1249 total: 0,
1250 page: 1,
1251 per_page: 1,
1252 })
1253 }); let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1256 let result = handler
1257 .handle_transaction_status_impl(tx_to_handle, None)
1258 .await;
1259
1260 assert!(result.is_ok(), "Expected Ok result");
1262 let updated_tx = result.unwrap();
1263 assert_eq!(updated_tx.status, TransactionStatus::Failed);
1264 assert!(
1265 updated_tx
1266 .status_reason
1267 .as_ref()
1268 .unwrap()
1269 .contains("Failed to parse and validate hash"),
1270 "Expected hash validation error in status_reason, got: {:?}",
1271 updated_tx.status_reason
1272 );
1273 }
1274
1275 #[tokio::test]
1276 async fn test_on_chain_failure_does_not_decrement_sequence() {
1277 let relayer = create_test_relayer();
1278 let mut mocks = default_test_mocks();
1279
1280 let mut tx_to_handle = create_test_transaction(&relayer.id);
1281 tx_to_handle.id = "tx-on-chain-fail".to_string();
1282 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1283 let tx_hash_bytes = [4u8; 32];
1284 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1285 {
1286 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
1287 stellar_data.sequence_number = Some(100); }
1289 tx_to_handle.status = TransactionStatus::Submitted;
1290
1291 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1292
1293 mocks
1295 .provider
1296 .expect_get_transaction()
1297 .with(eq(expected_stellar_hash.clone()))
1298 .times(1)
1299 .returning(move |_| {
1300 Box::pin(async { Ok(dummy_get_transaction_response("FAILED")) })
1301 });
1302
1303 mocks.counter.expect_decrement().never();
1305
1306 mocks
1308 .tx_repo
1309 .expect_partial_update()
1310 .times(1)
1311 .returning(move |id, update| {
1312 let mut updated_tx = create_test_transaction("test");
1313 updated_tx.id = id;
1314 updated_tx.status = update.status.unwrap();
1315 updated_tx.status_reason = update.status_reason.clone();
1316 Ok::<_, RepositoryError>(updated_tx)
1317 });
1318
1319 mocks
1321 .job_producer
1322 .expect_produce_send_notification_job()
1323 .times(1)
1324 .returning(|_, _| Box::pin(async { Ok(()) }));
1325
1326 mocks
1328 .tx_repo
1329 .expect_find_by_status_paginated()
1330 .returning(move |_, _, _, _| {
1331 Ok(PaginatedResult {
1332 items: vec![],
1333 total: 0,
1334 page: 1,
1335 per_page: 1,
1336 })
1337 });
1338
1339 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1340 let initial_tx = tx_to_handle.clone();
1341
1342 let result = handler
1343 .handle_transaction_status_impl(initial_tx, None)
1344 .await;
1345
1346 assert!(result.is_ok());
1347 let handled_tx = result.unwrap();
1348 assert_eq!(handled_tx.id, "tx-on-chain-fail");
1349 assert_eq!(handled_tx.status, TransactionStatus::Failed);
1350 }
1351
1352 #[tokio::test]
1353 async fn test_on_chain_success_does_not_decrement_sequence() {
1354 let relayer = create_test_relayer();
1355 let mut mocks = default_test_mocks();
1356
1357 let mut tx_to_handle = create_test_transaction(&relayer.id);
1358 tx_to_handle.id = "tx-on-chain-success".to_string();
1359 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1360 let tx_hash_bytes = [5u8; 32];
1361 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1362 {
1363 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
1364 stellar_data.sequence_number = Some(101); }
1366 tx_to_handle.status = TransactionStatus::Submitted;
1367
1368 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1369
1370 mocks
1372 .provider
1373 .expect_get_transaction()
1374 .with(eq(expected_stellar_hash.clone()))
1375 .times(1)
1376 .returning(move |_| {
1377 Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) })
1378 });
1379
1380 mocks.counter.expect_decrement().never();
1382
1383 mocks
1385 .tx_repo
1386 .expect_partial_update()
1387 .withf(move |id, update| {
1388 id == "tx-on-chain-success"
1389 && update.status == Some(TransactionStatus::Confirmed)
1390 && update.confirmed_at.is_some()
1391 })
1392 .times(1)
1393 .returning(move |id, update| {
1394 let mut updated_tx = create_test_transaction("test");
1395 updated_tx.id = id;
1396 updated_tx.status = update.status.unwrap();
1397 updated_tx.confirmed_at = update.confirmed_at;
1398 Ok(updated_tx)
1399 });
1400
1401 mocks
1403 .job_producer
1404 .expect_produce_send_notification_job()
1405 .times(1)
1406 .returning(|_, _| Box::pin(async { Ok(()) }));
1407
1408 mocks
1410 .tx_repo
1411 .expect_find_by_status_paginated()
1412 .returning(move |_, _, _, _| {
1413 Ok(PaginatedResult {
1414 items: vec![],
1415 total: 0,
1416 page: 1,
1417 per_page: 1,
1418 })
1419 });
1420
1421 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1422 let initial_tx = tx_to_handle.clone();
1423
1424 let result = handler
1425 .handle_transaction_status_impl(initial_tx, None)
1426 .await;
1427
1428 assert!(result.is_ok());
1429 let handled_tx = result.unwrap();
1430 assert_eq!(handled_tx.id, "tx-on-chain-success");
1431 assert_eq!(handled_tx.status, TransactionStatus::Confirmed);
1432 }
1433
1434 #[tokio::test]
1435 async fn test_handle_transaction_status_with_xdr_error_requeues() {
1436 let relayer = create_test_relayer();
1438 let mut mocks = default_test_mocks();
1439
1440 let mut tx_to_handle = create_test_transaction(&relayer.id);
1441 tx_to_handle.id = "tx-xdr-error-requeue".to_string();
1442 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1443 let tx_hash_bytes = [8u8; 32];
1444 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1445 {
1446 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
1447 }
1448 tx_to_handle.status = TransactionStatus::Submitted;
1449
1450 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1451
1452 mocks
1454 .provider
1455 .expect_get_transaction()
1456 .with(eq(expected_stellar_hash.clone()))
1457 .times(1)
1458 .returning(move |_| {
1459 Box::pin(async { Err(ProviderError::Other("Network timeout".to_string())) })
1460 });
1461
1462 mocks.tx_repo.expect_partial_update().never();
1464 mocks
1465 .job_producer
1466 .expect_produce_send_notification_job()
1467 .never();
1468
1469 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1470
1471 let result = handler
1472 .handle_transaction_status_impl(tx_to_handle, None)
1473 .await;
1474
1475 assert!(result.is_err());
1477 matches!(result.unwrap_err(), TransactionError::UnderlyingProvider(_));
1478 }
1479
1480 #[tokio::test]
1481 async fn handle_transaction_status_extracts_transaction_result_xdr() {
1482 let relayer = create_test_relayer();
1483 let mut mocks = default_test_mocks();
1484
1485 let mut tx_to_handle = create_test_transaction(&relayer.id);
1486 tx_to_handle.id = "tx-with-result".to_string();
1487 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1488 let tx_hash_bytes = [9u8; 32];
1489 let tx_hash_hex = hex::encode(tx_hash_bytes);
1490 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1491 {
1492 stellar_data.hash = Some(tx_hash_hex.clone());
1493 } else {
1494 panic!("Expected Stellar network data");
1495 }
1496 tx_to_handle.status = TransactionStatus::Submitted;
1497
1498 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1499
1500 mocks
1502 .provider
1503 .expect_get_transaction()
1504 .with(eq(expected_stellar_hash.clone()))
1505 .times(1)
1506 .returning(move |_| {
1507 Box::pin(async {
1508 Ok(dummy_get_transaction_response_with_result_meta(
1509 "SUCCESS", true,
1510 ))
1511 })
1512 });
1513
1514 let tx_to_handle_clone = tx_to_handle.clone();
1516 mocks
1517 .tx_repo
1518 .expect_partial_update()
1519 .withf(move |id, update| {
1520 id == "tx-with-result"
1521 && update.status == Some(TransactionStatus::Confirmed)
1522 && update.confirmed_at.is_some()
1523 && update.network_data.as_ref().is_some_and(|and| {
1524 if let NetworkTransactionData::Stellar(stellar_data) = and {
1525 stellar_data.transaction_result_xdr.is_some()
1527 } else {
1528 false
1529 }
1530 })
1531 })
1532 .times(1)
1533 .returning(move |id, update| {
1534 let mut updated_tx = tx_to_handle_clone.clone();
1535 updated_tx.id = id;
1536 updated_tx.status = update.status.unwrap();
1537 updated_tx.confirmed_at = update.confirmed_at;
1538 if let Some(network_data) = update.network_data {
1539 updated_tx.network_data = network_data;
1540 }
1541 Ok(updated_tx)
1542 });
1543
1544 mocks
1546 .job_producer
1547 .expect_produce_send_notification_job()
1548 .times(1)
1549 .returning(|_, _| Box::pin(async { Ok(()) }));
1550
1551 mocks
1553 .tx_repo
1554 .expect_find_by_status_paginated()
1555 .returning(move |_, _, _, _| {
1556 Ok(PaginatedResult {
1557 items: vec![],
1558 total: 0,
1559 page: 1,
1560 per_page: 1,
1561 })
1562 });
1563
1564 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1565 let result = handler
1566 .handle_transaction_status_impl(tx_to_handle, None)
1567 .await;
1568
1569 assert!(result.is_ok());
1570 let handled_tx = result.unwrap();
1571 assert_eq!(handled_tx.id, "tx-with-result");
1572 assert_eq!(handled_tx.status, TransactionStatus::Confirmed);
1573
1574 if let NetworkTransactionData::Stellar(stellar_data) = handled_tx.network_data {
1576 assert!(
1577 stellar_data.transaction_result_xdr.is_some(),
1578 "transaction_result_xdr should be stored when result_meta contains return_value"
1579 );
1580 } else {
1581 panic!("Expected Stellar network data");
1582 }
1583 }
1584
1585 #[tokio::test]
1586 async fn handle_transaction_status_no_result_meta_does_not_store_xdr() {
1587 let relayer = create_test_relayer();
1588 let mut mocks = default_test_mocks();
1589
1590 let mut tx_to_handle = create_test_transaction(&relayer.id);
1591 tx_to_handle.id = "tx-no-result-meta".to_string();
1592 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1593 let tx_hash_bytes = [10u8; 32];
1594 let tx_hash_hex = hex::encode(tx_hash_bytes);
1595 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1596 {
1597 stellar_data.hash = Some(tx_hash_hex.clone());
1598 } else {
1599 panic!("Expected Stellar network data");
1600 }
1601 tx_to_handle.status = TransactionStatus::Submitted;
1602
1603 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1604
1605 mocks
1607 .provider
1608 .expect_get_transaction()
1609 .with(eq(expected_stellar_hash.clone()))
1610 .times(1)
1611 .returning(move |_| {
1612 Box::pin(async {
1613 Ok(dummy_get_transaction_response_with_result_meta(
1614 "SUCCESS", false,
1615 ))
1616 })
1617 });
1618
1619 let tx_to_handle_clone = tx_to_handle.clone();
1621 mocks
1622 .tx_repo
1623 .expect_partial_update()
1624 .times(1)
1625 .returning(move |id, update| {
1626 let mut updated_tx = tx_to_handle_clone.clone();
1627 updated_tx.id = id;
1628 updated_tx.status = update.status.unwrap();
1629 updated_tx.confirmed_at = update.confirmed_at;
1630 if let Some(network_data) = update.network_data {
1631 updated_tx.network_data = network_data;
1632 }
1633 Ok(updated_tx)
1634 });
1635
1636 mocks
1638 .job_producer
1639 .expect_produce_send_notification_job()
1640 .times(1)
1641 .returning(|_, _| Box::pin(async { Ok(()) }));
1642
1643 mocks
1645 .tx_repo
1646 .expect_find_by_status_paginated()
1647 .returning(move |_, _, _, _| {
1648 Ok(PaginatedResult {
1649 items: vec![],
1650 total: 0,
1651 page: 1,
1652 per_page: 1,
1653 })
1654 });
1655
1656 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1657 let result = handler
1658 .handle_transaction_status_impl(tx_to_handle, None)
1659 .await;
1660
1661 assert!(result.is_ok());
1662 let handled_tx = result.unwrap();
1663
1664 if let NetworkTransactionData::Stellar(stellar_data) = handled_tx.network_data {
1666 assert!(
1667 stellar_data.transaction_result_xdr.is_none(),
1668 "transaction_result_xdr should be None when result_meta is missing"
1669 );
1670 } else {
1671 panic!("Expected Stellar network data");
1672 }
1673 }
1674
1675 #[tokio::test]
1676 async fn test_sent_transaction_not_stuck_yet_returns_ok() {
1677 let relayer = create_test_relayer();
1679 let mut mocks = default_test_mocks();
1680
1681 let mut tx = create_test_transaction(&relayer.id);
1682 tx.id = "tx-sent-not-stuck".to_string();
1683 tx.status = TransactionStatus::Sent;
1684 tx.created_at = Utc::now().to_rfc3339();
1686 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1688 stellar_data.hash = None;
1689 }
1690
1691 mocks.provider.expect_get_transaction().never();
1693 mocks.tx_repo.expect_partial_update().never();
1694 mocks
1695 .job_producer
1696 .expect_produce_submit_transaction_job()
1697 .never();
1698
1699 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1700 let result = handler
1701 .handle_transaction_status_impl(tx.clone(), None)
1702 .await;
1703
1704 assert!(result.is_ok());
1705 let returned_tx = result.unwrap();
1706 assert_eq!(returned_tx.id, tx.id);
1708 assert_eq!(returned_tx.status, TransactionStatus::Sent);
1709 }
1710
1711 #[tokio::test]
1712 async fn test_stuck_sent_transaction_reenqueues_submit_job() {
1713 let relayer = create_test_relayer();
1716 let mut mocks = default_test_mocks();
1717
1718 let mut tx = create_test_transaction(&relayer.id);
1719 tx.id = "tx-stuck-with-xdr".to_string();
1720 tx.status = TransactionStatus::Sent;
1721 tx.created_at = (Utc::now() - Duration::minutes(10)).to_rfc3339();
1723 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1725 stellar_data.hash = None;
1726 stellar_data.signed_envelope_xdr = Some("AAAA...signed...".to_string());
1727 }
1728
1729 mocks
1731 .job_producer
1732 .expect_produce_submit_transaction_job()
1733 .times(1)
1734 .returning(|_, _| Box::pin(async { Ok(()) }));
1735
1736 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1737 let result = handler
1738 .handle_transaction_status_impl(tx.clone(), None)
1739 .await;
1740
1741 assert!(result.is_ok());
1742 let returned_tx = result.unwrap();
1743 assert_eq!(returned_tx.status, TransactionStatus::Sent);
1745 }
1746
1747 #[tokio::test]
1748 async fn test_stuck_sent_transaction_expired_marks_expired() {
1749 let relayer = create_test_relayer();
1751 let mut mocks = default_test_mocks();
1752
1753 let mut tx = create_test_transaction(&relayer.id);
1754 tx.id = "tx-expired".to_string();
1755 tx.status = TransactionStatus::Sent;
1756 tx.created_at = (Utc::now() - Duration::minutes(10)).to_rfc3339();
1758 tx.valid_until = Some((Utc::now() - Duration::minutes(5)).to_rfc3339());
1760 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1761 stellar_data.hash = None;
1762 stellar_data.signed_envelope_xdr = Some("AAAA...signed...".to_string());
1763 }
1764
1765 mocks
1767 .tx_repo
1768 .expect_partial_update()
1769 .withf(|_id, update| update.status == Some(TransactionStatus::Expired))
1770 .times(1)
1771 .returning(|id, update| {
1772 let mut updated = create_test_transaction("test");
1773 updated.id = id;
1774 updated.status = update.status.unwrap();
1775 updated.status_reason = update.status_reason.clone();
1776 Ok(updated)
1777 });
1778
1779 mocks
1781 .job_producer
1782 .expect_produce_submit_transaction_job()
1783 .never();
1784
1785 mocks
1787 .job_producer
1788 .expect_produce_send_notification_job()
1789 .times(1)
1790 .returning(|_, _| Box::pin(async { Ok(()) }));
1791
1792 mocks
1794 .tx_repo
1795 .expect_find_by_status_paginated()
1796 .returning(move |_, _, _, _| {
1797 Ok(PaginatedResult {
1798 items: vec![],
1799 total: 0,
1800 page: 1,
1801 per_page: 1,
1802 })
1803 });
1804
1805 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1806 let result = handler.handle_transaction_status_impl(tx, None).await;
1807
1808 assert!(result.is_ok());
1809 let expired_tx = result.unwrap();
1810 assert_eq!(expired_tx.status, TransactionStatus::Expired);
1811 assert!(expired_tx
1812 .status_reason
1813 .as_ref()
1814 .unwrap()
1815 .contains("expired"));
1816 }
1817
1818 #[tokio::test]
1819 async fn test_stuck_sent_transaction_max_lifetime_marks_failed() {
1820 let relayer = create_test_relayer();
1822 let mut mocks = default_test_mocks();
1823
1824 let mut tx = create_test_transaction(&relayer.id);
1825 tx.id = "tx-max-lifetime".to_string();
1826 tx.status = TransactionStatus::Sent;
1827 tx.created_at = (Utc::now() - Duration::minutes(35)).to_rfc3339();
1829 tx.valid_until = None;
1831 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1832 stellar_data.hash = None;
1833 stellar_data.signed_envelope_xdr = Some("AAAA...signed...".to_string());
1834 }
1835
1836 mocks
1838 .tx_repo
1839 .expect_partial_update()
1840 .withf(|_id, update| update.status == Some(TransactionStatus::Failed))
1841 .times(1)
1842 .returning(|id, update| {
1843 let mut updated = create_test_transaction("test");
1844 updated.id = id;
1845 updated.status = update.status.unwrap();
1846 updated.status_reason = update.status_reason.clone();
1847 Ok(updated)
1848 });
1849
1850 mocks
1852 .job_producer
1853 .expect_produce_submit_transaction_job()
1854 .never();
1855
1856 mocks
1858 .job_producer
1859 .expect_produce_send_notification_job()
1860 .times(1)
1861 .returning(|_, _| Box::pin(async { Ok(()) }));
1862
1863 mocks
1865 .tx_repo
1866 .expect_find_by_status_paginated()
1867 .returning(|_, _, _, _| {
1868 Ok(PaginatedResult {
1869 items: vec![],
1870 total: 0,
1871 page: 1,
1872 per_page: 1,
1873 })
1874 });
1875
1876 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1877 let result = handler.handle_transaction_status_impl(tx, None).await;
1878
1879 assert!(result.is_ok());
1880 let failed_tx = result.unwrap();
1881 assert_eq!(failed_tx.status, TransactionStatus::Failed);
1882 assert!(failed_tx
1884 .status_reason
1885 .as_ref()
1886 .unwrap()
1887 .contains("stuck in Sent status for too long"));
1888 }
1889 #[tokio::test]
1890 async fn handle_status_concurrent_update_conflict_reloads_latest_state() {
1891 let relayer = create_test_relayer();
1894 let mut mocks = default_test_mocks();
1895
1896 let mut tx = create_test_transaction(&relayer.id);
1897 tx.id = "tx-cas-conflict".to_string();
1898 tx.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1899 let tx_hash_bytes = [11u8; 32];
1900 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1901 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
1902 }
1903 tx.status = TransactionStatus::Submitted;
1904
1905 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1906
1907 mocks
1909 .provider
1910 .expect_get_transaction()
1911 .with(eq(expected_stellar_hash))
1912 .times(1)
1913 .returning(move |_| {
1914 Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) })
1915 });
1916
1917 mocks
1919 .tx_repo
1920 .expect_partial_update()
1921 .times(1)
1922 .returning(|_id, _update| {
1923 Err(RepositoryError::ConcurrentUpdateConflict(
1924 "CAS mismatch".to_string(),
1925 ))
1926 });
1927
1928 let reloaded_tx = {
1930 let mut t = create_test_transaction(&relayer.id);
1931 t.id = "tx-cas-conflict".to_string();
1932 t.status = TransactionStatus::Confirmed;
1934 t
1935 };
1936 let reloaded_clone = reloaded_tx.clone();
1937 mocks
1938 .tx_repo
1939 .expect_get_by_id()
1940 .with(eq("tx-cas-conflict".to_string()))
1941 .times(1)
1942 .returning(move |_| Ok(reloaded_clone.clone()));
1943
1944 mocks
1946 .job_producer
1947 .expect_produce_send_notification_job()
1948 .never();
1949 mocks
1950 .job_producer
1951 .expect_produce_transaction_request_job()
1952 .never();
1953
1954 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1955 let result = handler.handle_transaction_status_impl(tx, None).await;
1956
1957 assert!(result.is_ok(), "CAS conflict should return Ok after reload");
1958 let returned_tx = result.unwrap();
1959 assert_eq!(returned_tx.id, "tx-cas-conflict");
1960 assert_eq!(returned_tx.status, TransactionStatus::Confirmed);
1962 }
1963 }
1964
1965 mod handle_pending_state_tests {
1966 use super::*;
1967 use crate::constants::get_stellar_max_stuck_transaction_lifetime;
1968 use crate::constants::STELLAR_PENDING_RECOVERY_TRIGGER_SECONDS;
1969
1970 #[tokio::test]
1971 async fn test_pending_exceeds_max_lifetime_marks_failed() {
1972 let relayer = create_test_relayer();
1973 let mut mocks = default_test_mocks();
1974
1975 let mut tx = create_test_transaction(&relayer.id);
1976 tx.id = "tx-pending-old".to_string();
1977 tx.status = TransactionStatus::Pending;
1978 tx.created_at =
1980 (Utc::now() - get_stellar_max_stuck_transaction_lifetime() - Duration::minutes(1))
1981 .to_rfc3339();
1982
1983 mocks
1985 .tx_repo
1986 .expect_partial_update()
1987 .withf(|_id, update| update.status == Some(TransactionStatus::Failed))
1988 .times(1)
1989 .returning(|id, update| {
1990 let mut updated = create_test_transaction("test");
1991 updated.id = id;
1992 updated.status = update.status.unwrap();
1993 updated.status_reason = update.status_reason.clone();
1994 Ok(updated)
1995 });
1996
1997 mocks
1999 .job_producer
2000 .expect_produce_send_notification_job()
2001 .times(1)
2002 .returning(|_, _| Box::pin(async { Ok(()) }));
2003
2004 mocks
2006 .tx_repo
2007 .expect_find_by_status_paginated()
2008 .returning(move |_, _, _, _| {
2009 Ok(PaginatedResult {
2010 items: vec![],
2011 total: 0,
2012 page: 1,
2013 per_page: 1,
2014 })
2015 });
2016
2017 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2018 let result = handler.handle_transaction_status_impl(tx, None).await;
2019
2020 assert!(result.is_ok());
2021 let failed_tx = result.unwrap();
2022 assert_eq!(failed_tx.status, TransactionStatus::Failed);
2023 assert!(failed_tx
2024 .status_reason
2025 .as_ref()
2026 .unwrap()
2027 .contains("stuck in Pending status for too long"));
2028 }
2029
2030 #[tokio::test]
2031 async fn test_pending_triggers_recovery_job_when_old_enough() {
2032 let relayer = create_test_relayer();
2033 let mut mocks = default_test_mocks();
2034
2035 let mut tx = create_test_transaction(&relayer.id);
2036 tx.id = "tx-pending-recovery".to_string();
2037 tx.status = TransactionStatus::Pending;
2038 tx.created_at = (Utc::now()
2040 - Duration::seconds(STELLAR_PENDING_RECOVERY_TRIGGER_SECONDS + 5))
2041 .to_rfc3339();
2042
2043 mocks
2045 .job_producer
2046 .expect_produce_transaction_request_job()
2047 .times(1)
2048 .returning(|_, _| Box::pin(async { Ok(()) }));
2049
2050 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2051 let result = handler.handle_transaction_status_impl(tx, None).await;
2052
2053 assert!(result.is_ok());
2054 let tx_result = result.unwrap();
2055 assert_eq!(tx_result.status, TransactionStatus::Pending);
2056 }
2057
2058 #[tokio::test]
2059 async fn test_pending_too_young_does_not_schedule_recovery() {
2060 let relayer = create_test_relayer();
2061 let mut mocks = default_test_mocks();
2062
2063 let mut tx = create_test_transaction(&relayer.id);
2064 tx.id = "tx-pending-young".to_string();
2065 tx.status = TransactionStatus::Pending;
2066 tx.created_at = (Utc::now()
2068 - Duration::seconds(STELLAR_PENDING_RECOVERY_TRIGGER_SECONDS - 5))
2069 .to_rfc3339();
2070
2071 mocks
2073 .job_producer
2074 .expect_produce_transaction_request_job()
2075 .never();
2076
2077 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2078 let result = handler.handle_transaction_status_impl(tx, None).await;
2079
2080 assert!(result.is_ok());
2081 let tx_result = result.unwrap();
2082 assert_eq!(tx_result.status, TransactionStatus::Pending);
2083 }
2084
2085 #[tokio::test]
2086 async fn test_sent_without_hash_handles_stuck_recovery() {
2087 use crate::constants::STELLAR_RESUBMIT_BASE_INTERVAL_SECONDS;
2088
2089 let relayer = create_test_relayer();
2090 let mut mocks = default_test_mocks();
2091
2092 let mut tx = create_test_transaction(&relayer.id);
2093 tx.id = "tx-sent-no-hash".to_string();
2094 tx.status = TransactionStatus::Sent;
2095 tx.created_at = (Utc::now()
2097 - Duration::seconds(STELLAR_RESUBMIT_BASE_INTERVAL_SECONDS)
2098 - Duration::seconds(1))
2099 .to_rfc3339();
2100 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2101 stellar_data.hash = None; }
2103
2104 mocks
2106 .job_producer
2107 .expect_produce_submit_transaction_job()
2108 .times(1)
2109 .returning(|_, _| Box::pin(async { Ok(()) }));
2110
2111 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2112 let result = handler.handle_transaction_status_impl(tx, None).await;
2113
2114 assert!(result.is_ok());
2115 let tx_result = result.unwrap();
2116 assert_eq!(tx_result.status, TransactionStatus::Sent);
2117 }
2118
2119 #[tokio::test]
2120 async fn test_submitted_without_hash_marks_failed() {
2121 let relayer = create_test_relayer();
2122 let mut mocks = default_test_mocks();
2123
2124 let mut tx = create_test_transaction(&relayer.id);
2125 tx.id = "tx-submitted-no-hash".to_string();
2126 tx.status = TransactionStatus::Submitted;
2127 tx.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
2128 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2129 stellar_data.hash = None; }
2131
2132 mocks
2134 .tx_repo
2135 .expect_partial_update()
2136 .withf(|_id, update| update.status == Some(TransactionStatus::Failed))
2137 .times(1)
2138 .returning(|id, update| {
2139 let mut updated = create_test_transaction("test");
2140 updated.id = id;
2141 updated.status = update.status.unwrap();
2142 updated.status_reason = update.status_reason.clone();
2143 Ok(updated)
2144 });
2145
2146 mocks
2148 .job_producer
2149 .expect_produce_send_notification_job()
2150 .times(1)
2151 .returning(|_, _| Box::pin(async { Ok(()) }));
2152
2153 mocks
2155 .tx_repo
2156 .expect_find_by_status_paginated()
2157 .returning(move |_, _, _, _| {
2158 Ok(PaginatedResult {
2159 items: vec![],
2160 total: 0,
2161 page: 1,
2162 per_page: 1,
2163 })
2164 });
2165
2166 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2167 let result = handler.handle_transaction_status_impl(tx, None).await;
2168
2169 assert!(result.is_ok());
2170 let failed_tx = result.unwrap();
2171 assert_eq!(failed_tx.status, TransactionStatus::Failed);
2172 assert!(failed_tx
2173 .status_reason
2174 .as_ref()
2175 .unwrap()
2176 .contains("Failed to parse and validate hash"));
2177 }
2178
2179 #[tokio::test]
2180 async fn test_submitted_exceeds_max_lifetime_marks_failed() {
2181 let relayer = create_test_relayer();
2182 let mut mocks = default_test_mocks();
2183
2184 let mut tx = create_test_transaction(&relayer.id);
2185 tx.id = "tx-submitted-old".to_string();
2186 tx.status = TransactionStatus::Submitted;
2187 tx.created_at =
2189 (Utc::now() - get_stellar_max_stuck_transaction_lifetime() - Duration::minutes(1))
2190 .to_rfc3339();
2191 let tx_hash_bytes = [6u8; 32];
2193 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2194 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
2195 }
2196
2197 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
2198
2199 mocks
2201 .provider
2202 .expect_get_transaction()
2203 .with(eq(expected_stellar_hash.clone()))
2204 .times(1)
2205 .returning(move |_| {
2206 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
2207 });
2208
2209 mocks
2211 .tx_repo
2212 .expect_partial_update()
2213 .withf(|_id, update| update.status == Some(TransactionStatus::Failed))
2214 .times(1)
2215 .returning(|id, update| {
2216 let mut updated = create_test_transaction("test");
2217 updated.id = id;
2218 updated.status = update.status.unwrap();
2219 updated.status_reason = update.status_reason.clone();
2220 Ok(updated)
2221 });
2222
2223 mocks
2225 .job_producer
2226 .expect_produce_send_notification_job()
2227 .times(1)
2228 .returning(|_, _| Box::pin(async { Ok(()) }));
2229
2230 mocks
2232 .tx_repo
2233 .expect_find_by_status_paginated()
2234 .returning(move |_, _, _, _| {
2235 Ok(PaginatedResult {
2236 items: vec![],
2237 total: 0,
2238 page: 1,
2239 per_page: 1,
2240 })
2241 });
2242
2243 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2244 let result = handler.handle_transaction_status_impl(tx, None).await;
2245
2246 assert!(result.is_ok());
2247 let failed_tx = result.unwrap();
2248 assert_eq!(failed_tx.status, TransactionStatus::Failed);
2249 assert!(failed_tx
2250 .status_reason
2251 .as_ref()
2252 .unwrap()
2253 .contains("stuck in Submitted status for too long"));
2254 }
2255
2256 #[tokio::test]
2257 async fn test_submitted_expired_marks_expired() {
2258 let relayer = create_test_relayer();
2259 let mut mocks = default_test_mocks();
2260
2261 let mut tx = create_test_transaction(&relayer.id);
2262 tx.id = "tx-submitted-expired".to_string();
2263 tx.status = TransactionStatus::Submitted;
2264 tx.created_at = (Utc::now() - Duration::minutes(10)).to_rfc3339();
2265 tx.valid_until = Some((Utc::now() - Duration::minutes(5)).to_rfc3339());
2267 let tx_hash_bytes = [7u8; 32];
2269 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2270 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
2271 }
2272
2273 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
2274
2275 mocks
2277 .provider
2278 .expect_get_transaction()
2279 .with(eq(expected_stellar_hash.clone()))
2280 .times(1)
2281 .returning(move |_| {
2282 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
2283 });
2284
2285 mocks
2287 .tx_repo
2288 .expect_partial_update()
2289 .withf(|_id, update| update.status == Some(TransactionStatus::Expired))
2290 .times(1)
2291 .returning(|id, update| {
2292 let mut updated = create_test_transaction("test");
2293 updated.id = id;
2294 updated.status = update.status.unwrap();
2295 updated.status_reason = update.status_reason.clone();
2296 Ok(updated)
2297 });
2298
2299 mocks
2301 .job_producer
2302 .expect_produce_send_notification_job()
2303 .times(1)
2304 .returning(|_, _| Box::pin(async { Ok(()) }));
2305
2306 mocks
2308 .tx_repo
2309 .expect_find_by_status_paginated()
2310 .returning(move |_, _, _, _| {
2311 Ok(PaginatedResult {
2312 items: vec![],
2313 total: 0,
2314 page: 1,
2315 per_page: 1,
2316 })
2317 });
2318
2319 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2320 let result = handler.handle_transaction_status_impl(tx, None).await;
2321
2322 assert!(result.is_ok());
2323 let expired_tx = result.unwrap();
2324 assert_eq!(expired_tx.status, TransactionStatus::Expired);
2325 assert!(expired_tx
2326 .status_reason
2327 .as_ref()
2328 .unwrap()
2329 .contains("expired"));
2330 }
2331
2332 #[tokio::test]
2333 async fn test_handle_submitted_state_resubmits_after_timeout() {
2334 let relayer = create_test_relayer();
2336 let mut mocks = default_test_mocks();
2337
2338 let mut tx = create_test_transaction(&relayer.id);
2339 tx.id = "tx-submitted-resubmit".to_string();
2340 tx.status = TransactionStatus::Submitted;
2341 let sixteen_seconds_ago = (Utc::now() - Duration::seconds(16)).to_rfc3339();
2342 tx.created_at = sixteen_seconds_ago.clone();
2343 tx.sent_at = Some(sixteen_seconds_ago);
2344 let tx_hash_bytes = [8u8; 32];
2346 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2347 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
2348 }
2349
2350 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
2351
2352 mocks
2354 .provider
2355 .expect_get_transaction()
2356 .with(eq(expected_stellar_hash.clone()))
2357 .times(1)
2358 .returning(move |_| {
2359 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
2360 });
2361
2362 mocks
2364 .job_producer
2365 .expect_produce_submit_transaction_job()
2366 .times(1)
2367 .returning(|_, _| Box::pin(async { Ok(()) }));
2368
2369 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2370 let result = handler.handle_transaction_status_impl(tx, None).await;
2371
2372 assert!(result.is_ok());
2373 let tx_result = result.unwrap();
2374 assert_eq!(tx_result.status, TransactionStatus::Submitted);
2375 }
2376
2377 #[tokio::test]
2378 async fn test_handle_submitted_state_backoff_increases_interval() {
2379 let relayer = create_test_relayer();
2383 let mut mocks = default_test_mocks();
2384
2385 let mut tx = create_test_transaction(&relayer.id);
2386 tx.id = "tx-submitted-backoff".to_string();
2387 tx.status = TransactionStatus::Submitted;
2388 tx.created_at = (Utc::now() - Duration::seconds(30)).to_rfc3339();
2389 tx.sent_at = Some((Utc::now() - Duration::seconds(15)).to_rfc3339());
2390 let tx_hash_bytes = [11u8; 32];
2391 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2392 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
2393 }
2394
2395 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
2396
2397 mocks
2398 .provider
2399 .expect_get_transaction()
2400 .with(eq(expected_stellar_hash.clone()))
2401 .times(1)
2402 .returning(move |_| {
2403 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
2404 });
2405
2406 mocks
2408 .job_producer
2409 .expect_produce_submit_transaction_job()
2410 .never();
2411
2412 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2413 let result = handler.handle_transaction_status_impl(tx, None).await;
2414
2415 assert!(result.is_ok());
2416 let tx_result = result.unwrap();
2417 assert_eq!(tx_result.status, TransactionStatus::Submitted);
2418 }
2419
2420 #[tokio::test]
2421 async fn test_handle_submitted_state_backoff_resubmits_when_interval_exceeded() {
2422 let relayer = create_test_relayer();
2426 let mut mocks = default_test_mocks();
2427
2428 let mut tx = create_test_transaction(&relayer.id);
2429 tx.id = "tx-submitted-backoff-resubmit".to_string();
2430 tx.status = TransactionStatus::Submitted;
2431 tx.created_at = (Utc::now() - Duration::seconds(25)).to_rfc3339();
2432 tx.sent_at = Some((Utc::now() - Duration::seconds(25)).to_rfc3339());
2433 let tx_hash_bytes = [12u8; 32];
2434 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2435 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
2436 }
2437
2438 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
2439
2440 mocks
2441 .provider
2442 .expect_get_transaction()
2443 .with(eq(expected_stellar_hash.clone()))
2444 .times(1)
2445 .returning(move |_| {
2446 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
2447 });
2448
2449 mocks
2451 .job_producer
2452 .expect_produce_submit_transaction_job()
2453 .times(1)
2454 .returning(|_, _| Box::pin(async { Ok(()) }));
2455
2456 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2457 let result = handler.handle_transaction_status_impl(tx, None).await;
2458
2459 assert!(result.is_ok());
2460 let tx_result = result.unwrap();
2461 assert_eq!(tx_result.status, TransactionStatus::Submitted);
2462 }
2463
2464 #[tokio::test]
2465 async fn test_handle_submitted_state_recent_sent_at_prevents_resubmit() {
2466 let relayer = create_test_relayer();
2471 let mut mocks = default_test_mocks();
2472
2473 let mut tx = create_test_transaction(&relayer.id);
2474 tx.id = "tx-submitted-recent-sent".to_string();
2475 tx.status = TransactionStatus::Submitted;
2476 tx.created_at = (Utc::now() - Duration::seconds(60)).to_rfc3339();
2477 tx.sent_at = Some((Utc::now() - Duration::seconds(5)).to_rfc3339());
2478 let tx_hash_bytes = [13u8; 32];
2479 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2480 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
2481 }
2482
2483 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
2484
2485 mocks
2486 .provider
2487 .expect_get_transaction()
2488 .with(eq(expected_stellar_hash.clone()))
2489 .times(1)
2490 .returning(move |_| {
2491 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
2492 });
2493
2494 mocks
2496 .job_producer
2497 .expect_produce_submit_transaction_job()
2498 .never();
2499
2500 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2501 let result = handler.handle_transaction_status_impl(tx, None).await;
2502
2503 assert!(result.is_ok());
2504 let tx_result = result.unwrap();
2505 assert_eq!(tx_result.status, TransactionStatus::Submitted);
2506 }
2507
2508 #[tokio::test]
2509 async fn test_handle_submitted_state_no_resubmit_before_timeout() {
2510 let relayer = create_test_relayer();
2511 let mut mocks = default_test_mocks();
2512
2513 let mut tx = create_test_transaction(&relayer.id);
2514 tx.id = "tx-submitted-young".to_string();
2515 tx.status = TransactionStatus::Submitted;
2516 tx.created_at = Utc::now().to_rfc3339();
2518 let tx_hash_bytes = [9u8; 32];
2520 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2521 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
2522 }
2523
2524 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
2525
2526 mocks
2528 .provider
2529 .expect_get_transaction()
2530 .with(eq(expected_stellar_hash.clone()))
2531 .times(1)
2532 .returning(move |_| {
2533 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
2534 });
2535
2536 mocks
2538 .job_producer
2539 .expect_produce_submit_transaction_job()
2540 .never();
2541
2542 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2543 let result = handler.handle_transaction_status_impl(tx, None).await;
2544
2545 assert!(result.is_ok());
2546 let tx_result = result.unwrap();
2547 assert_eq!(tx_result.status, TransactionStatus::Submitted);
2548 }
2549
2550 #[tokio::test]
2551 async fn test_handle_submitted_state_expired_before_resubmit() {
2552 let relayer = create_test_relayer();
2553 let mut mocks = default_test_mocks();
2554
2555 let mut tx = create_test_transaction(&relayer.id);
2556 tx.id = "tx-submitted-expired-no-resubmit".to_string();
2557 tx.status = TransactionStatus::Submitted;
2558 tx.created_at = (Utc::now() - Duration::minutes(10)).to_rfc3339();
2559 tx.valid_until = Some((Utc::now() - Duration::minutes(5)).to_rfc3339());
2561 let tx_hash_bytes = [10u8; 32];
2563 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2564 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
2565 }
2566
2567 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
2568
2569 mocks
2571 .provider
2572 .expect_get_transaction()
2573 .with(eq(expected_stellar_hash.clone()))
2574 .times(1)
2575 .returning(move |_| {
2576 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
2577 });
2578
2579 mocks
2581 .tx_repo
2582 .expect_partial_update()
2583 .withf(|_id, update| update.status == Some(TransactionStatus::Expired))
2584 .times(1)
2585 .returning(|id, update| {
2586 let mut updated = create_test_transaction("test");
2587 updated.id = id;
2588 updated.status = update.status.unwrap();
2589 updated.status_reason = update.status_reason.clone();
2590 Ok(updated)
2591 });
2592
2593 mocks
2595 .job_producer
2596 .expect_produce_submit_transaction_job()
2597 .never();
2598
2599 mocks
2601 .job_producer
2602 .expect_produce_send_notification_job()
2603 .times(1)
2604 .returning(|_, _| Box::pin(async { Ok(()) }));
2605
2606 mocks
2608 .tx_repo
2609 .expect_find_by_status_paginated()
2610 .returning(move |_, _, _, _| {
2611 Ok(PaginatedResult {
2612 items: vec![],
2613 total: 0,
2614 page: 1,
2615 per_page: 1,
2616 })
2617 });
2618
2619 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2620 let result = handler.handle_transaction_status_impl(tx, None).await;
2621
2622 assert!(result.is_ok());
2623 let expired_tx = result.unwrap();
2624 assert_eq!(expired_tx.status, TransactionStatus::Expired);
2625 assert!(expired_tx
2626 .status_reason
2627 .as_ref()
2628 .unwrap()
2629 .contains("expired"));
2630 }
2631 }
2632
2633 mod is_valid_until_expired_tests {
2634 use super::*;
2635 use crate::{
2636 jobs::MockJobProducerTrait,
2637 repositories::{
2638 MockRelayerRepository, MockTransactionCounterTrait, MockTransactionRepository,
2639 },
2640 services::{
2641 provider::MockStellarProviderTrait, stellar_dex::MockStellarDexServiceTrait,
2642 },
2643 };
2644 use chrono::{Duration, Utc};
2645
2646 type TestHandler = StellarRelayerTransaction<
2648 MockRelayerRepository,
2649 MockTransactionRepository,
2650 MockJobProducerTrait,
2651 MockStellarCombinedSigner,
2652 MockStellarProviderTrait,
2653 MockTransactionCounterTrait,
2654 MockStellarDexServiceTrait,
2655 >;
2656
2657 #[test]
2658 fn test_rfc3339_expired() {
2659 let past = (Utc::now() - Duration::hours(1)).to_rfc3339();
2660 assert!(TestHandler::is_valid_until_string_expired(&past));
2661 }
2662
2663 #[test]
2664 fn test_rfc3339_not_expired() {
2665 let future = (Utc::now() + Duration::hours(1)).to_rfc3339();
2666 assert!(!TestHandler::is_valid_until_string_expired(&future));
2667 }
2668
2669 #[test]
2670 fn test_numeric_timestamp_expired() {
2671 let past_timestamp = (Utc::now() - Duration::hours(1)).timestamp().to_string();
2672 assert!(TestHandler::is_valid_until_string_expired(&past_timestamp));
2673 }
2674
2675 #[test]
2676 fn test_numeric_timestamp_not_expired() {
2677 let future_timestamp = (Utc::now() + Duration::hours(1)).timestamp().to_string();
2678 assert!(!TestHandler::is_valid_until_string_expired(
2679 &future_timestamp
2680 ));
2681 }
2682
2683 #[test]
2684 fn test_zero_timestamp_unbounded() {
2685 assert!(!TestHandler::is_valid_until_string_expired("0"));
2687 }
2688
2689 #[test]
2690 fn test_invalid_format_not_expired() {
2691 assert!(!TestHandler::is_valid_until_string_expired("not-a-date"));
2693 }
2694 }
2695
2696 mod circuit_breaker_tests {
2698 use super::*;
2699 use crate::jobs::StatusCheckContext;
2700 use crate::models::NetworkType;
2701
2702 fn create_triggered_context() -> StatusCheckContext {
2704 StatusCheckContext::new(
2705 110, 150, 160, 100, 300, NetworkType::Stellar,
2711 )
2712 }
2713
2714 fn create_safe_context() -> StatusCheckContext {
2716 StatusCheckContext::new(
2717 10, 20, 25, 100, 300, NetworkType::Stellar,
2723 )
2724 }
2725
2726 fn create_total_triggered_context() -> StatusCheckContext {
2728 StatusCheckContext::new(
2729 20, 310, 350, 100, 300, NetworkType::Stellar,
2735 )
2736 }
2737
2738 #[tokio::test]
2739 async fn test_circuit_breaker_submitted_marks_as_failed() {
2740 let relayer = create_test_relayer();
2741 let mut mocks = default_test_mocks();
2742
2743 let mut tx_to_handle = create_test_transaction(&relayer.id);
2744 tx_to_handle.status = TransactionStatus::Submitted;
2745 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
2746
2747 mocks
2749 .tx_repo
2750 .expect_partial_update()
2751 .withf(|_, update| update.status == Some(TransactionStatus::Failed))
2752 .times(1)
2753 .returning(|_, update| {
2754 let mut updated_tx = create_test_transaction("test-relayer");
2755 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2756 updated_tx.status_reason = update.status_reason.clone();
2757 Ok(updated_tx)
2758 });
2759
2760 mocks
2762 .job_producer
2763 .expect_produce_send_notification_job()
2764 .returning(|_, _| Box::pin(async { Ok(()) }));
2765
2766 mocks
2768 .tx_repo
2769 .expect_find_by_status_paginated()
2770 .returning(|_, _, _, _| {
2771 Ok(PaginatedResult {
2772 items: vec![],
2773 total: 0,
2774 page: 1,
2775 per_page: 1,
2776 })
2777 });
2778
2779 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2780 let ctx = create_triggered_context();
2781
2782 let result = handler
2783 .handle_transaction_status_impl(tx_to_handle, Some(ctx))
2784 .await;
2785
2786 assert!(result.is_ok());
2787 let tx = result.unwrap();
2788 assert_eq!(tx.status, TransactionStatus::Failed);
2789 assert!(tx.status_reason.is_some());
2790 assert!(tx.status_reason.unwrap().contains("consecutive errors"));
2791 }
2792
2793 #[tokio::test]
2794 async fn test_circuit_breaker_pending_marks_as_failed() {
2795 let relayer = create_test_relayer();
2796 let mut mocks = default_test_mocks();
2797
2798 let mut tx_to_handle = create_test_transaction(&relayer.id);
2799 tx_to_handle.status = TransactionStatus::Pending;
2800 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
2801
2802 mocks
2804 .tx_repo
2805 .expect_partial_update()
2806 .withf(|_, update| update.status == Some(TransactionStatus::Failed))
2807 .times(1)
2808 .returning(|_, update| {
2809 let mut updated_tx = create_test_transaction("test-relayer");
2810 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2811 updated_tx.status_reason = update.status_reason.clone();
2812 Ok(updated_tx)
2813 });
2814
2815 mocks
2816 .job_producer
2817 .expect_produce_send_notification_job()
2818 .returning(|_, _| Box::pin(async { Ok(()) }));
2819
2820 mocks
2821 .tx_repo
2822 .expect_find_by_status_paginated()
2823 .returning(|_, _, _, _| {
2824 Ok(PaginatedResult {
2825 items: vec![],
2826 total: 0,
2827 page: 1,
2828 per_page: 1,
2829 })
2830 });
2831
2832 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2833 let ctx = create_triggered_context();
2834
2835 let result = handler
2836 .handle_transaction_status_impl(tx_to_handle, Some(ctx))
2837 .await;
2838
2839 assert!(result.is_ok());
2840 let tx = result.unwrap();
2841 assert_eq!(tx.status, TransactionStatus::Failed);
2842 }
2843
2844 #[tokio::test]
2845 async fn test_circuit_breaker_total_failures_triggers() {
2846 let relayer = create_test_relayer();
2847 let mut mocks = default_test_mocks();
2848
2849 let mut tx_to_handle = create_test_transaction(&relayer.id);
2850 tx_to_handle.status = TransactionStatus::Submitted;
2851 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
2852
2853 mocks
2854 .tx_repo
2855 .expect_partial_update()
2856 .withf(|_, update| update.status == Some(TransactionStatus::Failed))
2857 .times(1)
2858 .returning(|_, update| {
2859 let mut updated_tx = create_test_transaction("test-relayer");
2860 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2861 updated_tx.status_reason = update.status_reason.clone();
2862 Ok(updated_tx)
2863 });
2864
2865 mocks
2866 .job_producer
2867 .expect_produce_send_notification_job()
2868 .returning(|_, _| Box::pin(async { Ok(()) }));
2869
2870 mocks
2871 .tx_repo
2872 .expect_find_by_status_paginated()
2873 .returning(|_, _, _, _| {
2874 Ok(PaginatedResult {
2875 items: vec![],
2876 total: 0,
2877 page: 1,
2878 per_page: 1,
2879 })
2880 });
2881
2882 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2883 let ctx = create_total_triggered_context();
2885
2886 let result = handler
2887 .handle_transaction_status_impl(tx_to_handle, Some(ctx))
2888 .await;
2889
2890 assert!(result.is_ok());
2891 let tx = result.unwrap();
2892 assert_eq!(tx.status, TransactionStatus::Failed);
2893 }
2894
2895 #[tokio::test]
2896 async fn test_circuit_breaker_below_threshold_continues() {
2897 let relayer = create_test_relayer();
2898 let mut mocks = default_test_mocks();
2899
2900 let mut tx_to_handle = create_test_transaction(&relayer.id);
2901 tx_to_handle.status = TransactionStatus::Submitted;
2902 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
2903 let tx_hash_bytes = [1u8; 32];
2904 let tx_hash_hex = hex::encode(tx_hash_bytes);
2905 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
2906 {
2907 stellar_data.hash = Some(tx_hash_hex.clone());
2908 }
2909
2910 mocks
2912 .provider
2913 .expect_get_transaction()
2914 .returning(|_| Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) }));
2915
2916 mocks
2917 .tx_repo
2918 .expect_partial_update()
2919 .returning(|_, update| {
2920 let mut updated_tx = create_test_transaction("test-relayer");
2921 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2922 Ok(updated_tx)
2923 });
2924
2925 mocks
2926 .job_producer
2927 .expect_produce_send_notification_job()
2928 .returning(|_, _| Box::pin(async { Ok(()) }));
2929
2930 mocks
2931 .tx_repo
2932 .expect_find_by_status_paginated()
2933 .returning(|_, _, _, _| {
2934 Ok(PaginatedResult {
2935 items: vec![],
2936 total: 0,
2937 page: 1,
2938 per_page: 1,
2939 })
2940 });
2941
2942 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2943 let ctx = create_safe_context();
2944
2945 let result = handler
2946 .handle_transaction_status_impl(tx_to_handle, Some(ctx))
2947 .await;
2948
2949 assert!(result.is_ok());
2950 let tx = result.unwrap();
2951 assert_eq!(tx.status, TransactionStatus::Confirmed);
2953 }
2954
2955 #[tokio::test]
2956 async fn test_circuit_breaker_final_state_early_return() {
2957 let relayer = create_test_relayer();
2958 let mocks = default_test_mocks();
2959
2960 let mut tx_to_handle = create_test_transaction(&relayer.id);
2962 tx_to_handle.status = TransactionStatus::Confirmed;
2963
2964 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2965 let ctx = create_triggered_context();
2966
2967 let result = handler
2969 .handle_transaction_status_impl(tx_to_handle.clone(), Some(ctx))
2970 .await;
2971
2972 assert!(result.is_ok());
2973 assert_eq!(result.unwrap().id, tx_to_handle.id);
2974 }
2975
2976 #[tokio::test]
2977 async fn test_circuit_breaker_no_context_continues() {
2978 let relayer = create_test_relayer();
2979 let mut mocks = default_test_mocks();
2980
2981 let mut tx_to_handle = create_test_transaction(&relayer.id);
2982 tx_to_handle.status = TransactionStatus::Submitted;
2983 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
2984 let tx_hash_bytes = [1u8; 32];
2985 let tx_hash_hex = hex::encode(tx_hash_bytes);
2986 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
2987 {
2988 stellar_data.hash = Some(tx_hash_hex.clone());
2989 }
2990
2991 mocks
2993 .provider
2994 .expect_get_transaction()
2995 .returning(|_| Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) }));
2996
2997 mocks
2998 .tx_repo
2999 .expect_partial_update()
3000 .returning(|_, update| {
3001 let mut updated_tx = create_test_transaction("test-relayer");
3002 updated_tx.status = update.status.unwrap_or(updated_tx.status);
3003 Ok(updated_tx)
3004 });
3005
3006 mocks
3007 .job_producer
3008 .expect_produce_send_notification_job()
3009 .returning(|_, _| Box::pin(async { Ok(()) }));
3010
3011 mocks
3012 .tx_repo
3013 .expect_find_by_status_paginated()
3014 .returning(|_, _, _, _| {
3015 Ok(PaginatedResult {
3016 items: vec![],
3017 total: 0,
3018 page: 1,
3019 per_page: 1,
3020 })
3021 });
3022
3023 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
3024
3025 let result = handler
3027 .handle_transaction_status_impl(tx_to_handle, None)
3028 .await;
3029
3030 assert!(result.is_ok());
3031 let tx = result.unwrap();
3032 assert_eq!(tx.status, TransactionStatus::Confirmed);
3033 }
3034 }
3035
3036 mod failure_detail_helper_tests {
3037 use super::*;
3038 use soroban_rs::xdr::{InvokeHostFunctionResult, OperationResult, OperationResultTr, VecM};
3039
3040 #[test]
3041 fn first_failing_op_finds_trapped() {
3042 let ops: VecM<OperationResult> = vec![OperationResult::OpInner(
3043 OperationResultTr::InvokeHostFunction(InvokeHostFunctionResult::Trapped),
3044 )]
3045 .try_into()
3046 .unwrap();
3047 assert_eq!(first_failing_op(ops.as_slice()), Some("Trapped"));
3048 }
3049
3050 #[test]
3051 fn first_failing_op_skips_success() {
3052 let ops: VecM<OperationResult> = vec![
3053 OperationResult::OpInner(OperationResultTr::InvokeHostFunction(
3054 InvokeHostFunctionResult::Success(soroban_rs::xdr::Hash([0u8; 32])),
3055 )),
3056 OperationResult::OpInner(OperationResultTr::InvokeHostFunction(
3057 InvokeHostFunctionResult::ResourceLimitExceeded,
3058 )),
3059 ]
3060 .try_into()
3061 .unwrap();
3062 assert_eq!(
3063 first_failing_op(ops.as_slice()),
3064 Some("ResourceLimitExceeded")
3065 );
3066 }
3067
3068 #[test]
3069 fn first_failing_op_all_success_returns_none() {
3070 let ops: VecM<OperationResult> = vec![OperationResult::OpInner(
3071 OperationResultTr::InvokeHostFunction(InvokeHostFunctionResult::Success(
3072 soroban_rs::xdr::Hash([0u8; 32]),
3073 )),
3074 )]
3075 .try_into()
3076 .unwrap();
3077 assert_eq!(first_failing_op(ops.as_slice()), None);
3078 }
3079
3080 #[test]
3081 fn first_failing_op_empty_returns_none() {
3082 assert_eq!(first_failing_op(&[]), None);
3083 }
3084
3085 #[test]
3086 fn first_failing_op_op_bad_auth() {
3087 let ops: VecM<OperationResult> = vec![OperationResult::OpBadAuth].try_into().unwrap();
3088 assert_eq!(first_failing_op(ops.as_slice()), Some("OpBadAuth"));
3089 }
3090
3091 #[test]
3092 fn format_failure_reason_outer_only() {
3093 let s = format_failure_reason("TxBadSeq", None, None, None);
3094 assert_eq!(s, "Transaction failed on-chain. reason=TxBadSeq");
3095 assert!(!s.contains("inner="));
3096 assert!(!s.contains("op="));
3097 assert!(!s.contains("contract_error="));
3098 }
3099
3100 #[test]
3101 fn format_failure_reason_layers_inner_and_op() {
3102 let s = format_failure_reason(
3103 "TxFeeBumpInnerFailed",
3104 Some("TxFailed"),
3105 Some("Trapped"),
3106 None,
3107 );
3108 assert!(s.contains("reason=TxFeeBumpInnerFailed"));
3109 assert!(s.contains("inner=TxFailed"));
3110 assert!(s.contains("op=Trapped"));
3111 assert!(!s.contains("contract_error="));
3112 }
3113
3114 #[test]
3115 fn format_failure_reason_classic_op_failure() {
3116 let ops: VecM<OperationResult> = vec![OperationResult::OpBadAuth].try_into().unwrap();
3117 let op = first_failing_op(ops.as_slice());
3118 let s = format_failure_reason("TxFailed", None, op, None);
3119 assert!(s.contains("reason=TxFailed"));
3120 assert!(s.contains("op=OpBadAuth"));
3121 assert!(!s.contains("contract_error="));
3122 }
3123
3124 fn make_diag_event(topics: Vec<ScVal>, data: ScVal) -> DiagnosticEvent {
3125 use soroban_rs::xdr::{
3126 ContractEvent, ContractEventType, ContractEventV0, ExtensionPoint,
3127 };
3128 DiagnosticEvent {
3129 in_successful_contract_call: false,
3130 event: ContractEvent {
3131 ext: ExtensionPoint::V0,
3132 contract_id: None,
3133 type_: ContractEventType::Diagnostic,
3134 body: ContractEventBody::V0(ContractEventV0 {
3135 topics: topics.try_into().unwrap(),
3136 data,
3137 }),
3138 },
3139 }
3140 }
3141
3142 #[test]
3143 fn extract_contract_error_finds_sc_error() {
3144 use soroban_rs::xdr::ScError;
3145 let evt = make_diag_event(vec![], ScVal::Error(ScError::Contract(5)));
3146 assert_eq!(
3147 extract_contract_error(&[evt]),
3148 Some("Contract(5)".to_string())
3149 );
3150 }
3151
3152 #[test]
3153 fn extract_contract_error_returns_none_for_no_error() {
3154 assert_eq!(extract_contract_error(&[]), None);
3155 let evt = make_diag_event(
3156 vec![ScVal::Symbol("transfer".try_into().unwrap())],
3157 ScVal::I32(42),
3158 );
3159 assert_eq!(extract_contract_error(&[evt]), None);
3160 }
3161
3162 #[test]
3163 fn extract_contract_error_finds_error_with_message() {
3164 use soroban_rs::xdr::ScError;
3165 let evt = make_diag_event(
3166 vec![
3167 ScVal::Symbol("error".try_into().unwrap()),
3168 ScVal::Error(ScError::Contract(5)),
3169 ],
3170 ScVal::String(soroban_rs::xdr::ScString(
3171 "insufficient balance".try_into().unwrap(),
3172 )),
3173 );
3174 assert_eq!(
3175 extract_contract_error(&[evt]),
3176 Some("Contract(5) message=\"insufficient balance\"".to_string())
3177 );
3178 }
3179
3180 #[test]
3181 fn format_failure_reason_includes_contract_error_and_message() {
3182 use soroban_rs::xdr::ScError;
3183 let evt = make_diag_event(
3184 vec![
3185 ScVal::Symbol("error".try_into().unwrap()),
3186 ScVal::Error(ScError::Contract(5)),
3187 ],
3188 ScVal::String(soroban_rs::xdr::ScString(
3189 "insufficient balance".try_into().unwrap(),
3190 )),
3191 );
3192 let ce = extract_contract_error(&[evt]);
3193 let s = format_failure_reason(
3194 "TxFeeBumpInnerFailed",
3195 Some("TxFailed"),
3196 Some("Trapped"),
3197 ce.as_deref(),
3198 );
3199 assert!(s.contains("reason=TxFeeBumpInnerFailed"));
3200 assert!(s.contains("inner=TxFailed"));
3201 assert!(s.contains("op=Trapped"));
3202 assert!(s.contains("contract_error=Contract(5)"));
3203 assert!(s.contains("message=\"insufficient balance\""));
3204 }
3205
3206 #[test]
3207 fn extract_contract_error_first_event_wins() {
3208 use soroban_rs::xdr::ScError;
3209 let no_error_evt = make_diag_event(
3210 vec![ScVal::Symbol("fn_call".try_into().unwrap())],
3211 ScVal::I32(7),
3212 );
3213 let first_error_evt = make_diag_event(
3214 vec![
3215 ScVal::Symbol("error".try_into().unwrap()),
3216 ScVal::Error(ScError::Contract(1)),
3217 ],
3218 ScVal::Void,
3219 );
3220 let second_error_evt = make_diag_event(
3221 vec![
3222 ScVal::Symbol("error".try_into().unwrap()),
3223 ScVal::Error(ScError::Contract(99)),
3224 ],
3225 ScVal::Void,
3226 );
3227 assert_eq!(
3228 extract_contract_error(&[no_error_evt, first_error_evt, second_error_evt]),
3229 Some("Contract(1)".to_string())
3230 );
3231 }
3232
3233 #[test]
3234 fn extract_contract_error_renders_non_contract_error_types() {
3235 use soroban_rs::xdr::{ScError, ScErrorCode};
3236 let evt = make_diag_event(
3237 vec![],
3238 ScVal::Error(ScError::Budget(ScErrorCode::ExceededLimit)),
3239 );
3240 assert_eq!(
3241 extract_contract_error(&[evt]),
3242 Some("Budget(ExceededLimit)".to_string())
3243 );
3244
3245 let evt = make_diag_event(
3246 vec![],
3247 ScVal::Error(ScError::WasmVm(ScErrorCode::InvalidAction)),
3248 );
3249 assert_eq!(
3250 extract_contract_error(&[evt]),
3251 Some("WasmVm(InvalidAction)".to_string())
3252 );
3253 }
3254
3255 #[test]
3256 fn extract_contract_error_finds_error_nested_in_vec() {
3257 use soroban_rs::xdr::{ScError, ScVec};
3258 let nested: VecM<ScVal> = vec![
3259 ScVal::Symbol("inner".try_into().unwrap()),
3260 ScVal::Error(ScError::Contract(42)),
3261 ]
3262 .try_into()
3263 .unwrap();
3264 let evt = make_diag_event(
3265 vec![ScVal::Symbol("error".try_into().unwrap())],
3266 ScVal::Vec(Some(ScVec(nested))),
3267 );
3268 assert_eq!(
3269 extract_contract_error(&[evt]),
3270 Some("Contract(42) message=\"inner\"".to_string())
3271 );
3272 }
3273
3274 #[test]
3275 fn sanitize_message_escapes_quotes_and_strips_control_chars() {
3276 assert_eq!(sanitize_message("hello"), "hello");
3277 assert_eq!(sanitize_message(""), "");
3278 assert_eq!(
3279 sanitize_message(r#"it has "quotes""#),
3280 r#"it has \"quotes\""#
3281 );
3282 assert_eq!(
3283 sanitize_message("multi\nline\twith\rcontrols"),
3284 "multilinewithcontrols"
3285 );
3286 }
3287
3288 #[test]
3289 fn extract_contract_error_decodes_real_prod_xdr() {
3290 const PROD_EVENT_B64: &str = "AAAAAAAAAAAAAAAB1/5EvQrxHWArEJHy9KH03yEtRE0DIeoyrbPMHLurCgQAAAACAAAAAAAAAAIAAAAPAAAABWVycm9yAAAAAAAAAgAAAAAAAAAIAAAAEAAAAAEAAAACAAAADgAAABtmYWlsaW5nIHdpdGggY29udHJhY3QgZXJyb3IAAAAAAwAAAAg=";
3295 let evt = <DiagnosticEvent as soroban_rs::xdr::ReadXdr>::from_xdr_base64(
3296 PROD_EVENT_B64,
3297 Limits::none(),
3298 )
3299 .expect("real prod event should parse");
3300 assert_eq!(
3301 extract_contract_error(&[evt]),
3302 Some("Contract(8) message=\"failing with contract error\"".to_string())
3303 );
3304 }
3305 }
3306}