1use chrono::Utc;
6use tracing::{debug, info, warn};
7
8use super::{
9 is_final_state,
10 utils::{decode_transaction_result_code, is_bad_sequence_error, is_insufficient_fee_error},
11 StellarRelayerTransaction,
12};
13use crate::{
14 constants::STELLAR_INSUFFICIENT_FEE_MAX_RETRIES,
15 jobs::JobProducerTrait,
16 metrics::{STELLAR_SUBMISSION_FAILURES, TRANSACTIONS_INSUFFICIENT_FEE},
17 models::{
18 NetworkTransactionData, RelayerRepoModel, TransactionError, TransactionRepoModel,
19 TransactionStatus, TransactionUpdateRequest,
20 },
21 repositories::{Repository, TransactionCounterTrait, TransactionRepository},
22 services::{
23 provider::StellarProviderTrait,
24 signer::{Signer, StellarSignTrait},
25 },
26};
27
28impl<R, T, J, S, P, C, D> StellarRelayerTransaction<R, T, J, S, P, C, D>
29where
30 R: Repository<RelayerRepoModel, String> + Send + Sync,
31 T: TransactionRepository + Send + Sync,
32 J: JobProducerTrait + Send + Sync,
33 S: Signer + StellarSignTrait + Send + Sync,
34 P: StellarProviderTrait + Send + Sync,
35 C: TransactionCounterTrait + Send + Sync,
36 D: crate::services::stellar_dex::StellarDexServiceTrait + Send + Sync + 'static,
37{
38 pub async fn submit_transaction_impl(
41 &self,
42 tx: TransactionRepoModel,
43 ) -> Result<TransactionRepoModel, TransactionError> {
44 info!(
45 tx_id = %tx.id,
46 relayer_id = %tx.relayer_id,
47 status = ?tx.status,
48 "submitting stellar transaction"
49 );
50
51 if is_final_state(&tx.status) {
53 warn!(
54 tx_id = %tx.id,
55 relayer_id = %tx.relayer_id,
56 status = ?tx.status,
57 "transaction already in final state, skipping submission"
58 );
59 return Ok(tx);
60 }
61
62 if self.is_transaction_expired(&tx)? {
64 info!(
65 tx_id = %tx.id,
66 relayer_id = %tx.relayer_id,
67 valid_until = ?tx.valid_until,
68 "transaction has expired, marking as Expired"
69 );
70 return self
71 .mark_as_expired(tx, "Transaction time_bounds expired".to_string())
72 .await;
73 }
74
75 match self.submit_core(tx.clone()).await {
77 Ok(submitted_tx) => Ok(submitted_tx),
78 Err(error) => {
79 self.handle_submit_failure(tx, error).await
81 }
82 }
83 }
84
85 async fn submit_core(
96 &self,
97 tx: TransactionRepoModel,
98 ) -> Result<TransactionRepoModel, TransactionError> {
99 let stellar_data = tx.network_data.get_stellar_transaction_data()?;
100 let tx_envelope = stellar_data
101 .get_envelope_for_submission()
102 .map_err(TransactionError::from)?;
103
104 let response = self
106 .provider()
107 .send_transaction_with_status(&tx_envelope)
108 .await
109 .map_err(|e| {
110 STELLAR_SUBMISSION_FAILURES
111 .with_label_values(&["provider_error", "n/a"])
112 .inc();
113 TransactionError::from(e)
114 })?;
115
116 match response.status.as_str() {
118 "PENDING" | "DUPLICATE" => {
119 if response.status == "DUPLICATE" {
121 info!(
122 tx_id = %tx.id,
123 relayer_id = %tx.relayer_id,
124 hash = %response.hash,
125 "transaction already submitted (DUPLICATE status)"
126 );
127 }
128 let tx_hash_hex = response.hash.clone();
129 let updated_stellar_data = stellar_data.with_hash(tx_hash_hex.clone());
130
131 let mut hashes = tx.hashes.clone();
132 if !hashes.contains(&tx_hash_hex) {
133 hashes.push(tx_hash_hex);
134 }
135
136 let update_req = TransactionUpdateRequest {
137 status: Some(TransactionStatus::Submitted),
138 sent_at: Some(Utc::now().to_rfc3339()),
139 network_data: Some(NetworkTransactionData::Stellar(updated_stellar_data)),
140 hashes: Some(hashes),
141 ..Default::default()
142 };
143
144 let updated_tx = self
145 .transaction_repository()
146 .partial_update(tx.id.clone(), update_req)
147 .await?;
148
149 if response.status == "PENDING" {
151 info!(
152 tx_id = %tx.id,
153 relayer_id = %tx.relayer_id,
154 "sending transaction update notification for pending transaction"
155 );
156 self.send_transaction_update_notification(&updated_tx).await;
157 }
158
159 Ok(updated_tx)
160 }
161 "TRY_AGAIN_LATER" => {
162 let updated_tx = self
170 .transaction_repository()
171 .record_stellar_try_again_later_retry(tx.id.clone(), Utc::now().to_rfc3339())
172 .await?;
173
174 let retries = updated_tx
175 .metadata
176 .as_ref()
177 .map_or(0, |m| m.try_again_later_retries);
178
179 if retries == 1 {
181 crate::metrics::STELLAR_TRY_AGAIN_LATER
182 .with_label_values(&[&tx.relayer_id, &tx.status.to_string()])
183 .inc();
184 }
185
186 debug!(
187 tx_id = %tx.id,
188 relayer_id = %tx.relayer_id,
189 status = ?tx.status,
190 try_again_later_retries = retries,
191 "TRY_AGAIN_LATER — status checker will retry"
192 );
193 Ok(updated_tx)
194 }
195 "ERROR" => {
196 let error_detail = response
198 .error_result_xdr
199 .unwrap_or_else(|| "No error details provided".to_string());
200 let decoded_result_code = decode_transaction_result_code(&error_detail);
201
202 if decoded_result_code
206 .as_deref()
207 .is_some_and(is_insufficient_fee_error)
208 {
209 let mut meta = tx.metadata.clone().unwrap_or_default();
210 meta.insufficient_fee_retries = meta.insufficient_fee_retries.saturating_add(1);
211
212 if meta.insufficient_fee_retries == 1 {
214 TRANSACTIONS_INSUFFICIENT_FEE
215 .with_label_values(&[tx.relayer_id.as_str(), "stellar"])
216 .inc();
217 }
218
219 if meta.insufficient_fee_retries > STELLAR_INSUFFICIENT_FEE_MAX_RETRIES {
220 STELLAR_SUBMISSION_FAILURES
221 .with_label_values(&["error", "tx_insufficient_fee"])
222 .inc();
223 return Err(TransactionError::UnexpectedError(format!(
224 "Transaction submission error: insufficient fee retry limit exceeded ({STELLAR_INSUFFICIENT_FEE_MAX_RETRIES})"
225 )));
226 }
227
228 debug!(
229 tx_id = %tx.id,
230 relayer_id = %tx.relayer_id,
231 status = ?tx.status,
232 insufficient_fee_retries = meta.insufficient_fee_retries,
233 result_code = decoded_result_code.as_deref().unwrap_or("Unknown"),
234 "ERROR with insufficient fee — status checker will retry"
235 );
236 let updated_tx = self
238 .transaction_repository()
239 .record_stellar_insufficient_fee_retry(
240 tx.id.clone(),
241 Utc::now().to_rfc3339(),
242 )
243 .await?;
244 return Ok(updated_tx);
245 }
246 STELLAR_SUBMISSION_FAILURES
247 .with_label_values(&[
248 "error",
249 decoded_result_code.as_deref().unwrap_or("unknown"),
250 ])
251 .inc();
252 Err(TransactionError::UnexpectedError(format!(
253 "Transaction submission error: {}",
254 decoded_result_code.unwrap_or(error_detail)
255 )))
256 }
257 unknown => {
258 STELLAR_SUBMISSION_FAILURES
260 .with_label_values(&["unknown_status", "n/a"])
261 .inc();
262 warn!(
263 tx_id = %tx.id,
264 relayer_id = %tx.relayer_id,
265 status = %unknown,
266 "received unknown transaction status from RPC"
267 );
268 Err(TransactionError::UnexpectedError(format!(
269 "Unknown transaction status: {unknown}"
270 )))
271 }
272 }
273 }
274
275 async fn handle_submit_failure(
278 &self,
279 tx: TransactionRepoModel,
280 error: TransactionError,
281 ) -> Result<TransactionRepoModel, TransactionError> {
282 let error_reason = format!("Submission failed: {error}");
283 let tx_id = tx.id.clone();
284 let relayer_id = tx.relayer_id.clone();
285 warn!(
286 tx_id = %tx_id,
287 relayer_id = %relayer_id,
288 reason = %error_reason,
289 "transaction submission failed"
290 );
291
292 if error.is_concurrent_update_conflict() {
297 info!(
298 tx_id = %tx_id,
299 relayer_id = %relayer_id,
300 "concurrent transaction update detected during submission, reloading latest state"
301 );
302 return self
303 .transaction_repository()
304 .get_by_id(tx_id)
305 .await
306 .map_err(TransactionError::from);
307 }
308
309 if is_bad_sequence_error(&error_reason) {
310 if let Ok(stellar_data) = tx.network_data.get_stellar_transaction_data() {
312 info!(
313 tx_id = %tx_id,
314 relayer_id = %relayer_id,
315 "syncing sequence from chain after bad sequence error"
316 );
317 match self
318 .sync_sequence_from_chain(&stellar_data.source_account)
319 .await
320 {
321 Ok(()) => {
322 info!(
323 tx_id = %tx_id,
324 relayer_id = %relayer_id,
325 "successfully synced sequence from chain"
326 );
327 }
328 Err(sync_error) => {
329 warn!(
330 tx_id = %tx_id,
331 relayer_id = %relayer_id,
332 error = %sync_error,
333 "failed to sync sequence from chain"
334 );
335 }
336 }
337 }
338
339 info!(
342 tx_id = %tx_id,
343 relayer_id = %relayer_id,
344 "bad sequence error detected, resetting transaction to pending state"
345 );
346 match self.reset_transaction_for_retry(tx.clone()).await {
347 Ok(reset_tx) => {
348 info!(
349 tx_id = %tx_id,
350 relayer_id = %relayer_id,
351 "transaction reset to pending, status check will handle resubmission"
352 );
353 return Ok(reset_tx);
357 }
358 Err(reset_error) => {
359 warn!(
360 tx_id = %tx_id,
361 relayer_id = %relayer_id,
362 error = %reset_error,
363 "failed to reset transaction for retry"
364 );
365 }
367 }
368 }
369
370 let update_request = TransactionUpdateRequest {
373 status: Some(TransactionStatus::Failed),
374 status_reason: Some(error_reason.clone()),
375 ..Default::default()
376 };
377 let failed_tx = match self
378 .finalize_transaction_state(tx_id.clone(), update_request)
379 .await
380 {
381 Ok(updated_tx) => updated_tx,
382 Err(finalize_error) => {
383 warn!(
384 tx_id = %tx_id,
385 relayer_id = %relayer_id,
386 error = %finalize_error,
387 "failed to mark transaction as failed, continuing with lane cleanup"
388 );
389 return Err(error);
392 }
393 };
394
395 if let Err(enqueue_error) = self.enqueue_next_pending_transaction(&tx_id).await {
397 warn!(
398 tx_id = %tx_id,
399 relayer_id = %relayer_id,
400 error = %enqueue_error,
401 "failed to enqueue next pending transaction after submission failure"
402 );
403 }
404
405 info!(
406 tx_id = %tx_id,
407 relayer_id = %relayer_id,
408 error = %error_reason,
409 "transaction submission failure handled, marked as failed"
410 );
411
412 Ok(failed_tx)
416 }
417
418 pub async fn resubmit_transaction_impl(
420 &self,
421 tx: TransactionRepoModel,
422 ) -> Result<TransactionRepoModel, TransactionError> {
423 self.submit_transaction_impl(tx).await
424 }
425}
426
427#[cfg(test)]
428mod tests {
429 use super::*;
430 use soroban_rs::stellar_rpc_client::SendTransactionResponse;
431 use soroban_rs::xdr::WriteXdr;
432
433 use crate::domain::transaction::stellar::test_helpers::*;
434 use crate::models::TransactionMetadata;
435
436 fn create_send_tx_response(status: &str, hash: &str) -> SendTransactionResponse {
438 SendTransactionResponse {
439 status: status.to_string(),
440 hash: hash.to_string(),
441 error_result_xdr: None,
442 latest_ledger: 100,
443 latest_ledger_close_time: 1700000000,
444 }
445 }
446
447 mod submit_transaction_tests {
448 use crate::{
449 models::RepositoryError, repositories::PaginatedResult,
450 services::provider::ProviderError,
451 };
452
453 use super::*;
454
455 #[tokio::test]
456 async fn submit_transaction_happy_path() {
457 let relayer = create_test_relayer();
458 let mut mocks = default_test_mocks();
459
460 let response = create_send_tx_response(
462 "PENDING",
463 "0101010101010101010101010101010101010101010101010101010101010101",
464 );
465 mocks
466 .provider
467 .expect_send_transaction_with_status()
468 .returning(move |_| {
469 let r = response.clone();
470 Box::pin(async move { Ok(r) })
471 });
472
473 mocks
475 .tx_repo
476 .expect_partial_update()
477 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
478 .returning(|id, upd| {
479 let mut tx = create_test_transaction("relayer-1");
480 tx.id = id;
481 tx.status = upd.status.unwrap();
482 Ok::<_, RepositoryError>(tx)
483 });
484
485 mocks
487 .job_producer
488 .expect_produce_send_notification_job()
489 .times(1)
490 .returning(|_, _| Box::pin(async { Ok(()) }));
491
492 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
493
494 let mut tx = create_test_transaction(&relayer.id);
495 tx.status = TransactionStatus::Sent; if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
497 d.signatures.push(dummy_signature());
498 d.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
499 }
501
502 let res = handler.submit_transaction_impl(tx).await.unwrap();
503 assert_eq!(res.status, TransactionStatus::Submitted);
504 }
505
506 #[tokio::test]
507 async fn submit_transaction_provider_error_marks_failed() {
508 let relayer = create_test_relayer();
509 let mut mocks = default_test_mocks();
510
511 mocks
513 .provider
514 .expect_send_transaction_with_status()
515 .returning(|_| {
516 Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
517 });
518
519 mocks
521 .tx_repo
522 .expect_partial_update()
523 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
524 .returning(|id, upd| {
525 let mut tx = create_test_transaction("relayer-1");
526 tx.id = id;
527 tx.status = upd.status.unwrap();
528 Ok::<_, RepositoryError>(tx)
529 });
530
531 mocks
533 .job_producer
534 .expect_produce_send_notification_job()
535 .times(1)
536 .returning(|_, _| Box::pin(async { Ok(()) }));
537
538 mocks
540 .tx_repo
541 .expect_find_by_status_paginated()
542 .returning(move |_, _, _, _| {
543 Ok(PaginatedResult {
544 items: vec![],
545 total: 0,
546 page: 1,
547 per_page: 1,
548 })
549 }); let handler = make_stellar_tx_handler(relayer.clone(), mocks);
552 let mut tx = create_test_transaction(&relayer.id);
553 tx.status = TransactionStatus::Sent; if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
555 data.signatures.push(dummy_signature());
556 data.sequence_number = Some(42); data.signed_envelope_xdr = Some("test-xdr".to_string()); }
559
560 let res = handler.submit_transaction_impl(tx).await;
561
562 let failed_tx = res.unwrap();
564 assert_eq!(failed_tx.status, TransactionStatus::Failed);
565 }
566
567 #[tokio::test]
568 async fn submit_transaction_repository_error_marks_failed() {
569 let relayer = create_test_relayer();
570 let mut mocks = default_test_mocks();
571
572 let response = create_send_tx_response(
574 "PENDING",
575 "0101010101010101010101010101010101010101010101010101010101010101",
576 );
577 mocks
578 .provider
579 .expect_send_transaction_with_status()
580 .returning(move |_| {
581 let r = response.clone();
582 Box::pin(async move { Ok(r) })
583 });
584
585 mocks
587 .tx_repo
588 .expect_partial_update()
589 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
590 .returning(|_, _| Err(RepositoryError::Unknown("Database error".to_string())));
591
592 mocks
594 .tx_repo
595 .expect_partial_update()
596 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
597 .returning(|id, upd| {
598 let mut tx = create_test_transaction("relayer-1");
599 tx.id = id;
600 tx.status = upd.status.unwrap();
601 Ok::<_, RepositoryError>(tx)
602 });
603
604 mocks
606 .job_producer
607 .expect_produce_send_notification_job()
608 .times(1)
609 .returning(|_, _| Box::pin(async { Ok(()) }));
610
611 mocks
613 .tx_repo
614 .expect_find_by_status_paginated()
615 .returning(move |_, _, _, _| {
616 Ok(PaginatedResult {
617 items: vec![],
618 total: 0,
619 page: 1,
620 per_page: 1,
621 })
622 }); let handler = make_stellar_tx_handler(relayer.clone(), mocks);
625 let mut tx = create_test_transaction(&relayer.id);
626 tx.status = TransactionStatus::Sent; if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
628 data.signatures.push(dummy_signature());
629 data.sequence_number = Some(42); data.signed_envelope_xdr = Some("test-xdr".to_string()); }
632
633 let res = handler.submit_transaction_impl(tx).await;
634
635 let failed_tx = res.unwrap();
638 assert_eq!(failed_tx.status, TransactionStatus::Failed);
639 }
640
641 #[tokio::test]
642 async fn submit_transaction_uses_signed_envelope_xdr() {
643 let relayer = create_test_relayer();
644 let mut mocks = default_test_mocks();
645
646 let mut tx = create_test_transaction(&relayer.id);
648 tx.status = TransactionStatus::Sent; if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
650 data.signatures.push(dummy_signature());
651 let envelope = data.get_envelope_for_submission().unwrap();
653 let xdr = envelope
654 .to_xdr_base64(soroban_rs::xdr::Limits::none())
655 .unwrap();
656 data.signed_envelope_xdr = Some(xdr);
657 }
658
659 let response = create_send_tx_response(
661 "PENDING",
662 "0202020202020202020202020202020202020202020202020202020202020202",
663 );
664 mocks
665 .provider
666 .expect_send_transaction_with_status()
667 .returning(move |_| {
668 let r = response.clone();
669 Box::pin(async move { Ok(r) })
670 });
671
672 mocks
674 .tx_repo
675 .expect_partial_update()
676 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
677 .returning(|id, upd| {
678 let mut tx = create_test_transaction("relayer-1");
679 tx.id = id;
680 tx.status = upd.status.unwrap();
681 Ok::<_, RepositoryError>(tx)
682 });
683
684 mocks
686 .job_producer
687 .expect_produce_send_notification_job()
688 .times(1)
689 .returning(|_, _| Box::pin(async { Ok(()) }));
690
691 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
692 let res = handler.submit_transaction_impl(tx).await.unwrap();
693
694 assert_eq!(res.status, TransactionStatus::Submitted);
695 }
696
697 #[tokio::test]
698 async fn resubmit_transaction_delegates_to_submit() {
699 let relayer = create_test_relayer();
700 let mut mocks = default_test_mocks();
701
702 let response = create_send_tx_response(
704 "PENDING",
705 "0101010101010101010101010101010101010101010101010101010101010101",
706 );
707 mocks
708 .provider
709 .expect_send_transaction_with_status()
710 .returning(move |_| {
711 let r = response.clone();
712 Box::pin(async move { Ok(r) })
713 });
714
715 mocks
717 .tx_repo
718 .expect_partial_update()
719 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
720 .returning(|id, upd| {
721 let mut tx = create_test_transaction("relayer-1");
722 tx.id = id;
723 tx.status = upd.status.unwrap();
724 Ok::<_, RepositoryError>(tx)
725 });
726
727 mocks
729 .job_producer
730 .expect_produce_send_notification_job()
731 .times(1)
732 .returning(|_, _| Box::pin(async { Ok(()) }));
733
734 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
735
736 let mut tx = create_test_transaction(&relayer.id);
737 tx.status = TransactionStatus::Sent; if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
739 d.signatures.push(dummy_signature());
740 d.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
741 }
743
744 let res = handler.resubmit_transaction_impl(tx).await.unwrap();
745 assert_eq!(res.status, TransactionStatus::Submitted);
746 }
747
748 #[tokio::test]
749 async fn submit_transaction_failure_enqueues_next_transaction() {
750 let relayer = create_test_relayer();
751 let mut mocks = default_test_mocks();
752
753 mocks
755 .provider
756 .expect_send_transaction_with_status()
757 .returning(|_| {
758 Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
759 });
760
761 mocks
765 .tx_repo
766 .expect_partial_update()
767 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
768 .returning(|id, upd| {
769 let mut tx = create_test_transaction("relayer-1");
770 tx.id = id;
771 tx.status = upd.status.unwrap();
772 Ok::<_, RepositoryError>(tx)
773 });
774
775 mocks
777 .job_producer
778 .expect_produce_send_notification_job()
779 .times(1)
780 .returning(|_, _| Box::pin(async { Ok(()) }));
781
782 let mut pending_tx = create_test_transaction(&relayer.id);
784 pending_tx.id = "next-pending-tx".to_string();
785 pending_tx.status = TransactionStatus::Pending;
786 let captured_pending_tx = pending_tx.clone();
787 let relayer_id_clone = relayer.id.clone();
788 mocks
789 .tx_repo
790 .expect_find_by_status_paginated()
791 .withf(move |relayer_id, statuses, query, oldest_first| {
792 *relayer_id == relayer_id_clone
793 && statuses == [TransactionStatus::Pending]
794 && query.page == 1
795 && query.per_page == 1
796 && *oldest_first
797 })
798 .times(1)
799 .returning(move |_, _, _, _| {
800 Ok(PaginatedResult {
801 items: vec![captured_pending_tx.clone()],
802 total: 1,
803 page: 1,
804 per_page: 1,
805 })
806 });
807
808 mocks
810 .job_producer
811 .expect_produce_transaction_request_job()
812 .withf(move |job, _delay| job.transaction_id == "next-pending-tx")
813 .times(1)
814 .returning(|_, _| Box::pin(async { Ok(()) }));
815
816 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
817 let mut tx = create_test_transaction(&relayer.id);
818 tx.status = TransactionStatus::Sent; if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
820 data.signatures.push(dummy_signature());
821 data.sequence_number = Some(42); data.signed_envelope_xdr = Some("test-xdr".to_string()); }
824
825 let res = handler.submit_transaction_impl(tx).await;
826
827 let failed_tx = res.unwrap();
829 assert_eq!(failed_tx.status, TransactionStatus::Failed);
830 }
831
832 #[tokio::test]
833 async fn test_submit_bad_sequence_resets_and_retries() {
834 let relayer = create_test_relayer();
835 let mut mocks = default_test_mocks();
836
837 mocks
839 .provider
840 .expect_send_transaction_with_status()
841 .returning(|_| {
842 Box::pin(async {
843 Err(ProviderError::Other(
844 "transaction submission failed: TxBadSeq".to_string(),
845 ))
846 })
847 });
848
849 mocks.provider.expect_get_account().times(1).returning(|_| {
851 Box::pin(async {
852 use soroban_rs::xdr::{
853 AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber,
854 String32, Thresholds, Uint256,
855 };
856 use stellar_strkey::ed25519;
857
858 let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
859 let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
860
861 Ok(AccountEntry {
862 account_id,
863 balance: 1000000,
864 seq_num: SequenceNumber(100),
865 num_sub_entries: 0,
866 inflation_dest: None,
867 flags: 0,
868 home_domain: String32::default(),
869 thresholds: Thresholds([1, 1, 1, 1]),
870 signers: Default::default(),
871 ext: AccountEntryExt::V0,
872 })
873 })
874 });
875
876 mocks
878 .counter
879 .expect_set()
880 .times(1)
881 .returning(|_, _, _| Box::pin(async { Ok(()) }));
882
883 mocks
885 .tx_repo
886 .expect_partial_update()
887 .withf(|_, upd| upd.status == Some(TransactionStatus::Pending))
888 .times(1)
889 .returning(|id, upd| {
890 let mut tx = create_test_transaction("relayer-1");
891 tx.id = id;
892 tx.status = upd.status.unwrap();
893 if let Some(network_data) = upd.network_data {
894 tx.network_data = network_data;
895 }
896 Ok::<_, RepositoryError>(tx)
897 });
898
899 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
903 let mut tx = create_test_transaction(&relayer.id);
904 tx.status = TransactionStatus::Sent; if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
906 data.signatures.push(dummy_signature());
907 data.sequence_number = Some(42);
908 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
909 }
911
912 let result = handler.submit_transaction_impl(tx).await;
913
914 assert!(result.is_ok());
916 let reset_tx = result.unwrap();
917 assert_eq!(reset_tx.status, TransactionStatus::Pending);
918
919 if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
921 assert!(data.sequence_number.is_none());
922 assert!(data.signatures.is_empty());
923 assert!(data.hash.is_none());
924 assert!(data.signed_envelope_xdr.is_none());
925 } else {
926 panic!("Expected Stellar transaction data");
927 }
928 }
929
930 #[tokio::test]
931 async fn submit_transaction_duplicate_status_succeeds() {
932 let relayer = create_test_relayer();
933 let mut mocks = default_test_mocks();
934
935 let response = create_send_tx_response(
937 "DUPLICATE",
938 "0101010101010101010101010101010101010101010101010101010101010101",
939 );
940 mocks
941 .provider
942 .expect_send_transaction_with_status()
943 .returning(move |_| {
944 let r = response.clone();
945 Box::pin(async move { Ok(r) })
946 });
947
948 mocks
950 .tx_repo
951 .expect_partial_update()
952 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
953 .returning(|id, upd| {
954 let mut tx = create_test_transaction("relayer-1");
955 tx.id = id;
956 tx.status = upd.status.unwrap();
957 Ok::<_, RepositoryError>(tx)
958 });
959
960 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
961
962 let mut tx = create_test_transaction(&relayer.id);
963 tx.status = TransactionStatus::Sent;
964 if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
965 d.signatures.push(dummy_signature());
966 d.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
967 }
968
969 let res = handler.submit_transaction_impl(tx).await.unwrap();
970 assert_eq!(res.status, TransactionStatus::Submitted);
971 }
972
973 #[tokio::test]
974 async fn submit_transaction_try_again_later_keeps_tx_alive() {
975 let relayer = create_test_relayer();
976 let mut mocks = default_test_mocks();
977
978 let response = create_send_tx_response(
980 "TRY_AGAIN_LATER",
981 "0101010101010101010101010101010101010101010101010101010101010101",
982 );
983 mocks
984 .provider
985 .expect_send_transaction_with_status()
986 .returning(move |_| {
987 let r = response.clone();
988 Box::pin(async move { Ok(r) })
989 });
990
991 mocks
992 .tx_repo
993 .expect_record_stellar_try_again_later_retry()
994 .withf(|id, sent_at| id == "tx-1" && !sent_at.is_empty())
995 .returning(|id, _| {
996 let mut tx = create_test_transaction("relayer-1");
997 tx.id = id;
998 tx.status = TransactionStatus::Sent;
999 tx.metadata = Some(TransactionMetadata {
1000 consecutive_failures: 0,
1001 total_failures: 0,
1002 insufficient_fee_retries: 0,
1003 try_again_later_retries: 1,
1004 nonce_too_high_retries: 0,
1005 });
1006 Ok::<_, RepositoryError>(tx)
1007 });
1008
1009 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1010 let mut tx = create_test_transaction(&relayer.id);
1011 tx.status = TransactionStatus::Sent;
1012 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
1013 data.signatures.push(dummy_signature());
1014 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
1015 }
1016
1017 let res = handler.submit_transaction_impl(tx).await;
1018
1019 let returned_tx = res.unwrap();
1021 assert_eq!(returned_tx.status, TransactionStatus::Sent);
1022 }
1023
1024 #[tokio::test]
1025 async fn submit_try_again_later_then_status_checker_reenqueues_submit() {
1026 let relayer = create_test_relayer();
1027
1028 let mut submit_mocks = default_test_mocks();
1030 let response = create_send_tx_response(
1031 "TRY_AGAIN_LATER",
1032 "0101010101010101010101010101010101010101010101010101010101010101",
1033 );
1034 submit_mocks
1035 .provider
1036 .expect_send_transaction_with_status()
1037 .times(1)
1038 .returning(move |_| {
1039 let r = response.clone();
1040 Box::pin(async move { Ok(r) })
1041 });
1042 submit_mocks
1043 .tx_repo
1044 .expect_record_stellar_try_again_later_retry()
1045 .withf(|id, sent_at| id == "tx-1" && !sent_at.is_empty())
1046 .times(1)
1047 .returning(|id, sent_at| {
1048 let mut tx = create_test_transaction("relayer-1");
1049 tx.id = id;
1050 tx.status = TransactionStatus::Sent;
1051 tx.sent_at = Some(sent_at);
1052 tx.metadata = Some(TransactionMetadata {
1053 consecutive_failures: 0,
1054 total_failures: 0,
1055 insufficient_fee_retries: 0,
1056 try_again_later_retries: 1,
1057 nonce_too_high_retries: 0,
1058 });
1059 Ok::<_, RepositoryError>(tx)
1060 });
1061
1062 let submit_handler = make_stellar_tx_handler(relayer.clone(), submit_mocks);
1063 let mut sent_tx = create_test_transaction(&relayer.id);
1064 sent_tx.status = TransactionStatus::Sent;
1065 if let NetworkTransactionData::Stellar(ref mut data) = sent_tx.network_data {
1066 data.signatures.push(dummy_signature());
1067 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
1068 }
1069
1070 let mut returned_tx = submit_handler
1071 .submit_transaction_impl(sent_tx)
1072 .await
1073 .unwrap();
1074 assert_eq!(returned_tx.status, TransactionStatus::Sent);
1075 assert!(returned_tx.sent_at.is_some());
1076
1077 use crate::constants::STELLAR_RESUBMIT_BASE_INTERVAL_SECONDS;
1082 let buffer = 2;
1083 let created_at = (Utc::now()
1084 - chrono::Duration::seconds(STELLAR_RESUBMIT_BASE_INTERVAL_SECONDS + buffer))
1085 .to_rfc3339();
1086 let sent_at = (Utc::now()
1087 - chrono::Duration::seconds(STELLAR_RESUBMIT_BASE_INTERVAL_SECONDS + 1))
1088 .to_rfc3339();
1089 returned_tx.created_at = created_at;
1090 returned_tx.sent_at = Some(sent_at);
1091
1092 let mut status_mocks = default_test_mocks();
1093 status_mocks
1094 .job_producer
1095 .expect_produce_submit_transaction_job()
1096 .times(1)
1097 .returning(|_, _| Box::pin(async { Ok(()) }));
1098
1099 let status_handler = make_stellar_tx_handler(relayer.clone(), status_mocks);
1100 let status_result = status_handler
1101 .handle_transaction_status_impl(returned_tx, None)
1102 .await
1103 .unwrap();
1104 assert_eq!(status_result.status, TransactionStatus::Sent);
1105 }
1106
1107 #[tokio::test]
1108 async fn resubmit_try_again_later_returns_ok_for_submitted_tx() {
1109 let relayer = create_test_relayer();
1110 let mut mocks = default_test_mocks();
1111
1112 let response = create_send_tx_response(
1114 "TRY_AGAIN_LATER",
1115 "0101010101010101010101010101010101010101010101010101010101010101",
1116 );
1117 mocks
1118 .provider
1119 .expect_send_transaction_with_status()
1120 .returning(move |_| {
1121 let r = response.clone();
1122 Box::pin(async move { Ok(r) })
1123 });
1124
1125 mocks
1126 .tx_repo
1127 .expect_record_stellar_try_again_later_retry()
1128 .withf(|id, sent_at| id == "tx-1" && !sent_at.is_empty())
1129 .returning(|id, _| {
1130 let mut tx = create_test_transaction("relayer-1");
1131 tx.id = id;
1132 tx.status = TransactionStatus::Submitted;
1133 tx.metadata = Some(TransactionMetadata {
1134 consecutive_failures: 0,
1135 total_failures: 0,
1136 insufficient_fee_retries: 0,
1137 try_again_later_retries: 1,
1138 nonce_too_high_retries: 0,
1139 });
1140 Ok::<_, RepositoryError>(tx)
1141 });
1142
1143 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1144 let mut tx = create_test_transaction(&relayer.id);
1145 tx.status = TransactionStatus::Submitted; if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
1147 data.signatures.push(dummy_signature());
1148 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
1149 }
1150
1151 let res = handler.submit_transaction_impl(tx).await;
1152
1153 let returned_tx = res.unwrap();
1155 assert_eq!(returned_tx.status, TransactionStatus::Submitted);
1156 }
1157
1158 #[tokio::test]
1159 async fn submit_transaction_error_status_fails() {
1160 let relayer = create_test_relayer();
1161 let mut mocks = default_test_mocks();
1162
1163 let mut response = create_send_tx_response(
1165 "ERROR",
1166 "0101010101010101010101010101010101010101010101010101010101010101",
1167 );
1168 response.error_result_xdr = Some("not-base64".to_string());
1169 mocks
1170 .provider
1171 .expect_send_transaction_with_status()
1172 .returning(move |_| {
1173 let r = response.clone();
1174 Box::pin(async move { Ok(r) })
1175 });
1176
1177 mocks
1179 .tx_repo
1180 .expect_partial_update()
1181 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
1182 .returning(|id, upd| {
1183 let mut tx = create_test_transaction("relayer-1");
1184 tx.id = id;
1185 tx.status = upd.status.unwrap();
1186 Ok::<_, RepositoryError>(tx)
1187 });
1188
1189 mocks
1191 .job_producer
1192 .expect_produce_send_notification_job()
1193 .times(1)
1194 .returning(|_, _| Box::pin(async { Ok(()) }));
1195
1196 mocks
1198 .tx_repo
1199 .expect_find_by_status_paginated()
1200 .returning(move |_, _, _, _| {
1201 Ok(PaginatedResult {
1202 items: vec![],
1203 total: 0,
1204 page: 1,
1205 per_page: 1,
1206 })
1207 });
1208
1209 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1210 let mut tx = create_test_transaction(&relayer.id);
1211 tx.status = TransactionStatus::Sent;
1212 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
1213 data.signatures.push(dummy_signature());
1214 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
1215 }
1216
1217 let res = handler.submit_transaction_impl(tx).await;
1218
1219 let failed_tx = res.unwrap();
1221 assert_eq!(failed_tx.status, TransactionStatus::Failed);
1222 }
1223
1224 #[tokio::test]
1225 async fn submit_transaction_insufficient_fee_keeps_tx_alive() {
1226 let relayer = create_test_relayer();
1227 let mut mocks = default_test_mocks();
1228
1229 let mut response = create_send_tx_response(
1231 "ERROR",
1232 "0101010101010101010101010101010101010101010101010101010101010101",
1233 );
1234 response.error_result_xdr = Some("AAAAAAAAY/n////3AAAAAA==".to_string());
1235 mocks
1236 .provider
1237 .expect_send_transaction_with_status()
1238 .returning(move |_| {
1239 let r = response.clone();
1240 Box::pin(async move { Ok(r) })
1241 });
1242
1243 mocks
1245 .tx_repo
1246 .expect_record_stellar_insufficient_fee_retry()
1247 .withf(|id, sent_at| id == "tx-1" && !sent_at.is_empty())
1248 .returning(|id, _| {
1249 let mut tx = create_test_transaction("relayer-1");
1250 tx.id = id;
1251 tx.status = TransactionStatus::Sent;
1252 tx.metadata = Some(TransactionMetadata {
1253 consecutive_failures: 0,
1254 total_failures: 0,
1255 insufficient_fee_retries: 1,
1256 try_again_later_retries: 0,
1257 nonce_too_high_retries: 0,
1258 });
1259 Ok::<_, RepositoryError>(tx)
1260 })
1261 .times(1);
1262
1263 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1264 let mut tx = create_test_transaction(&relayer.id);
1265 tx.status = TransactionStatus::Sent;
1266 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
1267 data.signatures.push(dummy_signature());
1268 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
1269 }
1270
1271 let res = handler.submit_transaction_impl(tx).await;
1272
1273 let returned_tx = res.unwrap();
1275 assert_eq!(returned_tx.status, TransactionStatus::Sent);
1276 assert_eq!(
1277 returned_tx
1278 .metadata
1279 .as_ref()
1280 .map(|metadata| metadata.insufficient_fee_retries),
1281 Some(1)
1282 );
1283 }
1284
1285 #[tokio::test]
1286 async fn submit_transaction_insufficient_fee_exceeding_retry_limit_fails() {
1287 let relayer = create_test_relayer();
1288 let mut mocks = default_test_mocks();
1289
1290 let mut response = create_send_tx_response(
1291 "ERROR",
1292 "0101010101010101010101010101010101010101010101010101010101010101",
1293 );
1294 response.error_result_xdr = Some("AAAAAAAAY/n////3AAAAAA==".to_string());
1295 mocks
1296 .provider
1297 .expect_send_transaction_with_status()
1298 .returning(move |_| {
1299 let r = response.clone();
1300 Box::pin(async move { Ok(r) })
1301 });
1302
1303 mocks
1304 .tx_repo
1305 .expect_partial_update()
1306 .withf(|_, upd| {
1307 upd.status == Some(TransactionStatus::Failed)
1308 && upd.status_reason.as_ref().is_some_and(|reason| {
1309 reason.contains("insufficient fee retry limit exceeded (2)")
1310 })
1311 })
1312 .returning(|id, upd| {
1313 let mut tx = create_test_transaction("relayer-1");
1314 tx.id = id;
1315 tx.status = upd.status.unwrap();
1316 tx.status_reason = upd.status_reason;
1317 Ok::<_, RepositoryError>(tx)
1318 });
1319
1320 mocks
1321 .job_producer
1322 .expect_produce_send_notification_job()
1323 .times(1)
1324 .returning(|_, _| Box::pin(async { Ok(()) }));
1325
1326 mocks
1327 .tx_repo
1328 .expect_find_by_status_paginated()
1329 .returning(move |_, _, _, _| {
1330 Ok(PaginatedResult {
1331 items: vec![],
1332 total: 0,
1333 page: 1,
1334 per_page: 1,
1335 })
1336 });
1337
1338 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1339 let mut tx = create_test_transaction(&relayer.id);
1340 tx.status = TransactionStatus::Sent;
1341 tx.metadata = Some(TransactionMetadata {
1342 insufficient_fee_retries: STELLAR_INSUFFICIENT_FEE_MAX_RETRIES,
1343 ..Default::default()
1344 });
1345 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
1346 data.signatures.push(dummy_signature());
1347 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
1348 }
1349
1350 let res = handler.submit_transaction_impl(tx).await;
1351
1352 let failed_tx = res.unwrap();
1353 assert_eq!(failed_tx.status, TransactionStatus::Failed);
1354 assert!(
1355 failed_tx.status_reason.as_ref().is_some_and(
1356 |reason| reason.contains("insufficient fee retry limit exceeded (2)")
1357 )
1358 );
1359 }
1360
1361 #[tokio::test]
1362 async fn submit_transaction_error_non_fee_still_fails() {
1363 let relayer = create_test_relayer();
1364 let mut mocks = default_test_mocks();
1365
1366 let mut response = create_send_tx_response(
1368 "ERROR",
1369 "0101010101010101010101010101010101010101010101010101010101010101",
1370 );
1371 response.error_result_xdr = Some("AAAAAAAAA/v////6AAAAAA==".to_string());
1372 mocks
1373 .provider
1374 .expect_send_transaction_with_status()
1375 .returning(move |_| {
1376 let r = response.clone();
1377 Box::pin(async move { Ok(r) })
1378 });
1379
1380 mocks
1382 .tx_repo
1383 .expect_partial_update()
1384 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
1385 .returning(|id, upd| {
1386 let mut tx = create_test_transaction("relayer-1");
1387 tx.id = id;
1388 tx.status = upd.status.unwrap();
1389 Ok::<_, RepositoryError>(tx)
1390 });
1391
1392 mocks
1394 .job_producer
1395 .expect_produce_send_notification_job()
1396 .times(1)
1397 .returning(|_, _| Box::pin(async { Ok(()) }));
1398
1399 mocks
1401 .tx_repo
1402 .expect_find_by_status_paginated()
1403 .returning(move |_, _, _, _| {
1404 Ok(PaginatedResult {
1405 items: vec![],
1406 total: 0,
1407 page: 1,
1408 per_page: 1,
1409 })
1410 });
1411
1412 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1413 let mut tx = create_test_transaction(&relayer.id);
1414 tx.status = TransactionStatus::Sent;
1415 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
1416 data.signatures.push(dummy_signature());
1417 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
1418 }
1419
1420 let res = handler.submit_transaction_impl(tx).await;
1421
1422 let failed_tx = res.unwrap();
1424 assert_eq!(failed_tx.status, TransactionStatus::Failed);
1425 }
1426
1427 #[tokio::test]
1428 async fn submit_transaction_concurrent_update_conflict_reloads_latest_state() {
1429 let relayer = create_test_relayer();
1432 let mut mocks = default_test_mocks();
1433
1434 let response = create_send_tx_response(
1436 "PENDING",
1437 "0101010101010101010101010101010101010101010101010101010101010101",
1438 );
1439 mocks
1440 .provider
1441 .expect_send_transaction_with_status()
1442 .returning(move |_| {
1443 let r = response.clone();
1444 Box::pin(async move { Ok(r) })
1445 });
1446
1447 mocks
1449 .tx_repo
1450 .expect_partial_update()
1451 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
1452 .times(1)
1453 .returning(|_, _| {
1454 Err(RepositoryError::ConcurrentUpdateConflict(
1455 "CAS mismatch".to_string(),
1456 ))
1457 });
1458
1459 let reloaded_tx = {
1461 let mut t = create_test_transaction(&relayer.id);
1462 t.status = TransactionStatus::Submitted;
1463 t
1464 };
1465 let reloaded_clone = reloaded_tx.clone();
1466 mocks
1467 .tx_repo
1468 .expect_get_by_id()
1469 .times(1)
1470 .returning(move |_| Ok(reloaded_clone.clone()));
1471
1472 mocks
1474 .job_producer
1475 .expect_produce_send_notification_job()
1476 .never();
1477 mocks
1478 .job_producer
1479 .expect_produce_transaction_request_job()
1480 .never();
1481
1482 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1483 let mut tx = create_test_transaction(&relayer.id);
1484 tx.status = TransactionStatus::Sent;
1485 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
1486 data.signatures.push(dummy_signature());
1487 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
1488 }
1489
1490 let res = handler.submit_transaction_impl(tx).await;
1491
1492 assert!(res.is_ok(), "CAS conflict should return Ok after reload");
1493 let returned_tx = res.unwrap();
1494 assert_eq!(returned_tx.status, TransactionStatus::Submitted);
1496 }
1497 }
1498}