1use crate::config::ServerConfig;
4use crate::constants::FINAL_TRANSACTION_STATUSES;
5use crate::domain::transaction::common::is_final_state;
6use crate::metrics::{
7 TRANSACTIONS_BY_STATUS, TRANSACTIONS_CREATED, TRANSACTIONS_FAILED,
8 TRANSACTIONS_INSUFFICIENT_FEE_FAILED, TRANSACTIONS_INSUFFICIENT_FEE_SUCCESS,
9 TRANSACTIONS_SUBMITTED, TRANSACTIONS_SUCCESS, TRANSACTIONS_TRY_AGAIN_LATER_FAILED,
10 TRANSACTIONS_TRY_AGAIN_LATER_SUCCESS, TRANSACTION_PROCESSING_TIME,
11};
12use crate::models::{
13 NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
14 TransactionStatus, TransactionUpdateRequest,
15};
16use crate::repositories::redis_base::RedisRepository;
17use crate::repositories::{
18 BatchDeleteResult, BatchRetrievalResult, PaginatedResult, Repository, TransactionDeleteRequest,
19 TransactionRepository,
20};
21use crate::utils::RedisConnections;
22use async_trait::async_trait;
23use chrono::Utc;
24use redis::{AsyncCommands, Script};
25use std::fmt;
26use std::sync::Arc;
27use tracing::{debug, error, warn};
28
29const RELAYER_PREFIX: &str = "relayer";
30const TX_PREFIX: &str = "tx";
31const STATUS_PREFIX: &str = "status";
32const STATUS_SORTED_PREFIX: &str = "status_sorted";
33const NONCE_PREFIX: &str = "nonce";
34const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
35const RELAYER_LIST_KEY: &str = "relayer_list";
36const TX_BY_CREATED_AT_PREFIX: &str = "tx_by_created_at";
37
38#[derive(Clone)]
39pub struct RedisTransactionRepository {
40 pub connections: Arc<RedisConnections>,
41 pub key_prefix: String,
42}
43
44impl RedisRepository for RedisTransactionRepository {}
45
46impl RedisTransactionRepository {
47 pub fn new(
48 connections: Arc<RedisConnections>,
49 key_prefix: String,
50 ) -> Result<Self, RepositoryError> {
51 if key_prefix.is_empty() {
52 return Err(RepositoryError::InvalidData(
53 "Redis key prefix cannot be empty".to_string(),
54 ));
55 }
56
57 Ok(Self {
58 connections,
59 key_prefix,
60 })
61 }
62
63 fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
65 format!(
66 "{}:{}:{}:{}:{}",
67 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
68 )
69 }
70
71 fn tx_to_relayer_key(&self, tx_id: &str) -> String {
73 format!(
74 "{}:{}:{}:{}",
75 self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
76 )
77 }
78
79 fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
81 format!(
82 "{}:{}:{}:{}:{}",
83 self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
84 )
85 }
86
87 fn relayer_status_sorted_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
90 format!(
91 "{}:{}:{}:{}:{}",
92 self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_SORTED_PREFIX, status
93 )
94 }
95
96 fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
98 format!(
99 "{}:{}:{}:{}:{}",
100 self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
101 )
102 }
103
104 fn relayer_list_key(&self) -> String {
106 format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
107 }
108
109 fn relayer_tx_by_created_at_key(&self, relayer_id: &str) -> String {
111 format!(
112 "{}:{}:{}:{}",
113 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_BY_CREATED_AT_PREFIX
114 )
115 }
116
117 fn tx_key_parts(&self, tx_id: &str) -> (String, String, String) {
122 let lookup_key = self.tx_to_relayer_key(tx_id);
123 let key_prefix = format!("{}:{}:", self.key_prefix, RELAYER_PREFIX);
124 let key_suffix = format!(":{TX_PREFIX}:{tx_id}");
125 (lookup_key, key_prefix, key_suffix)
126 }
127
128 async fn run_atomic_script(
135 &self,
136 lua: &str,
137 tx_id: &str,
138 extra_args: &[&str],
139 op_name: &str,
140 ) -> Result<TransactionRepoModel, RepositoryError> {
141 const MAX_RETRIES: u32 = 3;
142 const BASE_BACKOFF_MS: u64 = 100;
143
144 let (lookup_key, key_prefix, key_suffix) = self.tx_key_parts(tx_id);
145 let script = Script::new(lua);
146 let mut last_error = None;
147
148 for attempt in 0..MAX_RETRIES {
149 let backoff = BASE_BACKOFF_MS * 2u64.pow(attempt);
150
151 let mut conn = match self
152 .get_connection(self.connections.primary(), op_name)
153 .await
154 {
155 Ok(conn) => conn,
156 Err(e) => {
157 last_error = Some(e);
158 if attempt < MAX_RETRIES - 1 {
159 warn!(tx_id = %tx_id, attempt, op = %op_name, "connection failed, retrying");
160 tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
161 continue;
162 }
163 return Err(last_error.unwrap());
164 }
165 };
166
167 let mut invocation = script.prepare_invoke();
168 invocation
169 .key(&lookup_key)
170 .arg(&key_prefix)
171 .arg(&key_suffix);
172 for arg in extra_args {
173 invocation.arg(*arg);
174 }
175
176 match invocation.invoke_async::<Option<String>>(&mut conn).await {
177 Ok(result) => {
178 let json = result.ok_or_else(|| {
179 RepositoryError::NotFound(format!("Transaction with ID {tx_id} not found"))
180 })?;
181 return self.deserialize_entity::<TransactionRepoModel>(
182 &json,
183 tx_id,
184 "transaction",
185 );
186 }
187 Err(e) => {
188 last_error = Some(self.map_redis_error(e, op_name));
189 if attempt < MAX_RETRIES - 1 {
190 warn!(
191 tx_id = %tx_id, attempt, op = %op_name,
192 "atomic script failed, retrying"
193 );
194 tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
195 continue;
196 }
197 return Err(last_error.unwrap());
198 }
199 }
200 }
201 Err(last_error.unwrap_or_else(|| {
202 RepositoryError::UnexpectedError(format!("retry loop exhausted for {op_name}"))
203 }))
204 }
205
206 async fn run_script_with_retry_vec(
210 &self,
211 script: &Script,
212 lookup_key: &str,
213 key_prefix: &str,
214 key_suffix: &str,
215 extra_args: &[&str],
216 op_name: &str,
217 ) -> Result<Option<Vec<String>>, RepositoryError> {
218 const MAX_RETRIES: u32 = 3;
219 const BASE_BACKOFF_MS: u64 = 100;
220
221 let mut last_error = None;
222
223 for attempt in 0..MAX_RETRIES {
224 let backoff = BASE_BACKOFF_MS * 2u64.pow(attempt);
225
226 let mut conn = match self
227 .get_connection(self.connections.primary(), op_name)
228 .await
229 {
230 Ok(conn) => conn,
231 Err(e) => {
232 last_error = Some(e);
233 if attempt < MAX_RETRIES - 1 {
234 warn!(op = %op_name, attempt, "connection failed, retrying");
235 tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
236 continue;
237 }
238 return Err(last_error.unwrap());
239 }
240 };
241
242 let mut invocation = script.prepare_invoke();
243 invocation.key(lookup_key).arg(key_prefix).arg(key_suffix);
244 for arg in extra_args {
245 invocation.arg(*arg);
246 }
247
248 match invocation
251 .invoke_async::<Option<Vec<String>>>(&mut conn)
252 .await
253 {
254 Ok(result) => return Ok(result),
255 Err(e) => {
256 last_error = Some(self.map_redis_error(e, op_name));
257 if attempt < MAX_RETRIES - 1 {
258 warn!(op = %op_name, attempt, "script failed, retrying");
259 tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
260 continue;
261 }
262 return Err(last_error.unwrap());
263 }
264 }
265 }
266 Err(last_error.unwrap_or_else(|| {
267 RepositoryError::UnexpectedError(format!("retry loop exhausted for {op_name}"))
268 }))
269 }
270
271 fn timestamp_to_score(&self, timestamp: &str) -> f64 {
273 chrono::DateTime::parse_from_rfc3339(timestamp)
274 .map(|dt| dt.timestamp_millis() as f64)
275 .unwrap_or_else(|_| {
276 warn!(timestamp = %timestamp, "failed to parse timestamp, using 0");
277 0.0
278 })
279 }
280
281 fn status_sorted_score(&self, tx: &TransactionRepoModel) -> f64 {
285 if tx.status == TransactionStatus::Confirmed {
286 if let Some(ref confirmed_at) = tx.confirmed_at {
288 return self.timestamp_to_score(confirmed_at);
289 }
290 warn!(tx_id = %tx.id, "Confirmed transaction missing confirmed_at, using created_at");
292 }
293 self.timestamp_to_score(&tx.created_at)
294 }
295
296 async fn get_transactions_by_ids(
298 &self,
299 ids: &[String],
300 ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
301 if ids.is_empty() {
302 debug!("no transaction IDs provided for batch fetch");
303 return Ok(BatchRetrievalResult {
304 results: vec![],
305 failed_ids: vec![],
306 });
307 }
308
309 let mut conn = self
310 .get_connection(self.connections.reader(), "batch_fetch_transactions")
311 .await?;
312
313 let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();
314
315 debug!(count = %ids.len(), "fetching relayer IDs for transactions");
316
317 let relayer_ids: Vec<Option<String>> = conn
318 .mget(&reverse_keys)
319 .await
320 .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;
321
322 let mut tx_keys = Vec::new();
323 let mut valid_ids = Vec::new();
324 let mut failed_ids = Vec::new();
325 for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
326 match relayer_id {
327 Some(relayer_id) => {
328 tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
329 valid_ids.push(ids[i].clone());
330 }
331 None => {
332 warn!(tx_id = %ids[i], "no relayer found for transaction");
333 failed_ids.push(ids[i].clone());
334 }
335 }
336 }
337
338 if tx_keys.is_empty() {
339 debug!("no valid transactions found for batch fetch");
340 return Ok(BatchRetrievalResult {
341 results: vec![],
342 failed_ids,
343 });
344 }
345
346 debug!(count = %tx_keys.len(), "batch fetching transaction data");
347
348 let values: Vec<Option<String>> = conn
349 .mget(&tx_keys)
350 .await
351 .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;
352
353 let mut transactions = Vec::new();
354 let mut failed_count = 0;
355 for (i, value) in values.into_iter().enumerate() {
356 match value {
357 Some(json) => {
358 match self.deserialize_entity::<TransactionRepoModel>(
359 &json,
360 &valid_ids[i],
361 "transaction",
362 ) {
363 Ok(tx) => transactions.push(tx),
364 Err(e) => {
365 failed_count += 1;
366 error!(tx_id = %valid_ids[i], error = %e, "failed to deserialize transaction");
367 }
369 }
370 }
371 None => {
372 warn!(tx_id = %valid_ids[i], "transaction not found in batch fetch");
373 failed_ids.push(valid_ids[i].clone());
374 }
375 }
376 }
377
378 if failed_count > 0 {
379 warn!(failed_count = %failed_count, total_count = %valid_ids.len(), "failed to deserialize transactions in batch");
380 }
381
382 debug!(count = %transactions.len(), "successfully fetched transactions");
383 Ok(BatchRetrievalResult {
384 results: transactions,
385 failed_ids,
386 })
387 }
388
389 fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
391 match network_data.get_evm_transaction_data() {
392 Ok(tx_data) => tx_data.nonce,
393 Err(_) => {
394 debug!("no EVM transaction data available for nonce extraction");
395 None
396 }
397 }
398 }
399
400 async fn ensure_status_sorted_set(
417 &self,
418 relayer_id: &str,
419 status: &TransactionStatus,
420 ) -> Result<u64, RepositoryError> {
421 let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
422 let legacy_key = self.relayer_status_key(relayer_id, status);
423
424 let legacy_ids = {
426 let mut conn = self
427 .get_connection(self.connections.primary(), "ensure_status_sorted_set_check")
428 .await?;
429
430 let legacy_count: u64 = conn
432 .scard(&legacy_key)
433 .await
434 .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_scard"))?;
435
436 if legacy_count == 0 {
437 let sorted_count: u64 = conn
439 .zcard(&sorted_key)
440 .await
441 .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_zcard"))?;
442 return Ok(sorted_count);
443 }
444
445 debug!(
447 relayer_id = %relayer_id,
448 status = %status,
449 legacy_count = %legacy_count,
450 "migrating status set to sorted set"
451 );
452
453 let ids: Vec<String> = conn
454 .smembers(&legacy_key)
455 .await
456 .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_smembers"))?;
457
458 ids
459 };
461
462 if legacy_ids.is_empty() {
463 return Ok(0);
464 }
465
466 let transactions = self.get_transactions_by_ids(&legacy_ids).await?;
468
469 let mut conn = self
471 .get_connection(
472 self.connections.primary(),
473 "ensure_status_sorted_set_migrate",
474 )
475 .await?;
476
477 if transactions.results.is_empty() {
478 let _: () = conn
480 .del(&legacy_key)
481 .await
482 .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_del_stale"))?;
483 return Ok(0);
484 }
485
486 let mut pipe = redis::pipe();
489 pipe.atomic();
490
491 for tx in &transactions.results {
492 let score = self.status_sorted_score(tx);
493 pipe.zadd(&sorted_key, &tx.id, score);
494 }
495
496 pipe.del(&legacy_key);
498
499 pipe.query_async::<()>(&mut conn)
500 .await
501 .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_migrate"))?;
502
503 let migrated_count = transactions.results.len() as u64;
504 debug!(
505 relayer_id = %relayer_id,
506 status = %status,
507 migrated_count = %migrated_count,
508 "completed migration of status set to sorted set"
509 );
510
511 Ok(migrated_count)
512 }
513
514 async fn update_indexes(
516 &self,
517 tx: &TransactionRepoModel,
518 old_tx: Option<&TransactionRepoModel>,
519 ) -> Result<(), RepositoryError> {
520 let mut conn = self
521 .get_connection(self.connections.primary(), "update_indexes")
522 .await?;
523 let mut pipe = redis::pipe();
524 pipe.atomic();
525
526 debug!(tx_id = %tx.id, "updating indexes for transaction");
527
528 let relayer_list_key = self.relayer_list_key();
530 pipe.sadd(&relayer_list_key, &tx.relayer_id);
531
532 let status_score = self.status_sorted_score(tx);
535 let created_at_score = self.timestamp_to_score(&tx.created_at);
537
538 let new_status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, &tx.status);
540 pipe.zadd(&new_status_sorted_key, &tx.id, status_score);
541 debug!(tx_id = %tx.id, status = %tx.status, score = %status_score, "added transaction to status sorted set");
542
543 if let Some(nonce) = self.extract_nonce(&tx.network_data) {
544 let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
545 pipe.set(&nonce_key, &tx.id);
546 debug!(tx_id = %tx.id, nonce = %nonce, "added nonce index for transaction");
547 }
548
549 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
551 pipe.zadd(&relayer_sorted_key, &tx.id, created_at_score);
552 debug!(tx_id = %tx.id, score = %created_at_score, "added transaction to sorted set by created_at");
553
554 if let Some(old) = old_tx {
556 if old.status != tx.status {
557 let old_status_sorted_key =
559 self.relayer_status_sorted_key(&old.relayer_id, &old.status);
560 pipe.zrem(&old_status_sorted_key, &tx.id);
561
562 let old_status_legacy_key = self.relayer_status_key(&old.relayer_id, &old.status);
564 pipe.srem(&old_status_legacy_key, &tx.id);
565
566 debug!(tx_id = %tx.id, old_status = %old.status, new_status = %tx.status, "removing old status indexes for transaction");
567 }
568
569 if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
571 let new_nonce = self.extract_nonce(&tx.network_data);
572 if Some(old_nonce) != new_nonce {
573 let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
574 pipe.del(&old_nonce_key);
575 debug!(tx_id = %tx.id, old_nonce = %old_nonce, new_nonce = ?new_nonce, "removing old nonce index for transaction");
576 }
577 }
578 }
579
580 pipe.exec_async(&mut conn).await.map_err(|e| {
582 error!(tx_id = %tx.id, error = %e, "index update pipeline failed for transaction");
583 self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
584 })?;
585
586 debug!(tx_id = %tx.id, "successfully updated indexes for transaction");
587 Ok(())
588 }
589
590 async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
592 let mut conn = self
593 .get_connection(self.connections.primary(), "remove_all_indexes")
594 .await?;
595 let mut pipe = redis::pipe();
596 pipe.atomic();
597
598 debug!(tx_id = %tx.id, "removing all indexes for transaction");
599
600 for status in &[
604 TransactionStatus::Canceled,
605 TransactionStatus::Pending,
606 TransactionStatus::Sent,
607 TransactionStatus::Submitted,
608 TransactionStatus::Mined,
609 TransactionStatus::Confirmed,
610 TransactionStatus::Failed,
611 TransactionStatus::Expired,
612 ] {
613 let status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, status);
615 pipe.zrem(&status_sorted_key, &tx.id);
616
617 let status_legacy_key = self.relayer_status_key(&tx.relayer_id, status);
619 pipe.srem(&status_legacy_key, &tx.id);
620 }
621
622 if let Some(nonce) = self.extract_nonce(&tx.network_data) {
624 let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
625 pipe.del(&nonce_key);
626 debug!(tx_id = %tx.id, nonce = %nonce, "removing nonce index for transaction");
627 }
628
629 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
631 pipe.zrem(&relayer_sorted_key, &tx.id);
632 debug!(tx_id = %tx.id, "removing transaction from sorted set by created_at");
633
634 let reverse_key = self.tx_to_relayer_key(&tx.id);
636 pipe.del(&reverse_key);
637
638 pipe.exec_async(&mut conn).await.map_err(|e| {
639 error!(tx_id = %tx.id, error = %e, "index removal failed for transaction");
640 self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
641 })?;
642
643 debug!(tx_id = %tx.id, "successfully removed all indexes for transaction");
644 Ok(())
645 }
646
647 fn track_status_change_metrics(
649 &self,
650 _original_tx: &TransactionRepoModel,
651 updated_tx: &TransactionRepoModel,
652 old_status: &TransactionStatus,
653 new_status: &TransactionStatus,
654 ) {
655 let network_type = format!("{:?}", updated_tx.network_type).to_lowercase();
656 let relayer_id = updated_tx.relayer_id.as_str();
657
658 if *old_status != TransactionStatus::Submitted
660 && *new_status == TransactionStatus::Submitted
661 {
662 TRANSACTIONS_SUBMITTED
663 .with_label_values(&[relayer_id, &network_type])
664 .inc();
665
666 if let Ok(created_time) = chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at) {
667 let processing_seconds =
668 (Utc::now() - created_time.with_timezone(&Utc)).num_seconds() as f64;
669 TRANSACTION_PROCESSING_TIME
670 .with_label_values(&[relayer_id, &network_type, "creation_to_submission"])
671 .observe(processing_seconds);
672 }
673 }
674
675 if old_status != new_status {
677 let old_status_str = format!("{old_status:?}").to_lowercase();
678 let old_status_gauge = TRANSACTIONS_BY_STATUS.with_label_values(&[
679 relayer_id,
680 &network_type,
681 &old_status_str,
682 ]);
683 let clamped_value = (old_status_gauge.get() - 1.0).max(0.0);
684 old_status_gauge.set(clamped_value);
685
686 let new_status_str = format!("{new_status:?}").to_lowercase();
687 TRANSACTIONS_BY_STATUS
688 .with_label_values(&[relayer_id, &network_type, &new_status_str])
689 .inc();
690 }
691
692 let was_final = is_final_state(old_status);
694 let is_final = is_final_state(new_status);
695
696 if !was_final && is_final {
697 let previous_status = format!("{old_status:?}").to_lowercase();
698 let meta = updated_tx.metadata.as_ref();
699 let had_insufficient_fee = meta.is_some_and(|m| m.insufficient_fee_retries > 0);
700 let had_try_again_later = meta.is_some_and(|m| m.try_again_later_retries > 0);
701
702 match new_status {
703 TransactionStatus::Confirmed => {
704 TRANSACTIONS_SUCCESS
705 .with_label_values(&[relayer_id, &network_type])
706 .inc();
707 if had_insufficient_fee {
708 TRANSACTIONS_INSUFFICIENT_FEE_SUCCESS
709 .with_label_values(&[relayer_id, &network_type])
710 .inc();
711 }
712 if had_try_again_later {
713 TRANSACTIONS_TRY_AGAIN_LATER_SUCCESS
714 .with_label_values(&[relayer_id, &network_type])
715 .inc();
716 }
717
718 if let (Some(sent_at_str), Some(confirmed_at_str)) =
719 (&updated_tx.sent_at, &updated_tx.confirmed_at)
720 {
721 if let (Ok(sent_time), Ok(confirmed_time)) = (
722 chrono::DateTime::parse_from_rfc3339(sent_at_str),
723 chrono::DateTime::parse_from_rfc3339(confirmed_at_str),
724 ) {
725 let processing_seconds = (confirmed_time.with_timezone(&Utc)
726 - sent_time.with_timezone(&Utc))
727 .num_seconds()
728 as f64;
729 TRANSACTION_PROCESSING_TIME
730 .with_label_values(&[
731 relayer_id,
732 &network_type,
733 "submission_to_confirmation",
734 ])
735 .observe(processing_seconds);
736 }
737 }
738
739 if let Ok(created_time) =
740 chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at)
741 {
742 if let Some(confirmed_at_str) = &updated_tx.confirmed_at {
743 if let Ok(confirmed_time) =
744 chrono::DateTime::parse_from_rfc3339(confirmed_at_str)
745 {
746 let processing_seconds = (confirmed_time.with_timezone(&Utc)
747 - created_time.with_timezone(&Utc))
748 .num_seconds()
749 as f64;
750 TRANSACTION_PROCESSING_TIME
751 .with_label_values(&[
752 relayer_id,
753 &network_type,
754 "creation_to_confirmation",
755 ])
756 .observe(processing_seconds);
757 }
758 }
759 }
760 }
761 TransactionStatus::Failed => {
762 let failure_reason = updated_tx
763 .status_reason
764 .as_deref()
765 .map(|reason| {
766 if reason.starts_with("Submission failed:") {
767 "submission_failed"
768 } else if reason.starts_with("Preparation failed:") {
769 "preparation_failed"
770 } else {
771 "failed"
772 }
773 })
774 .unwrap_or("failed");
775 TRANSACTIONS_FAILED
776 .with_label_values(&[
777 relayer_id,
778 &network_type,
779 failure_reason,
780 &previous_status,
781 ])
782 .inc();
783 }
784 TransactionStatus::Expired => {
785 TRANSACTIONS_FAILED
786 .with_label_values(&[
787 relayer_id,
788 &network_type,
789 "expired",
790 &previous_status,
791 ])
792 .inc();
793 }
794 TransactionStatus::Canceled => {
795 TRANSACTIONS_FAILED
796 .with_label_values(&[
797 relayer_id,
798 &network_type,
799 "canceled",
800 &previous_status,
801 ])
802 .inc();
803 }
804 _ => {}
805 }
806
807 if *new_status != TransactionStatus::Confirmed {
809 if had_insufficient_fee {
810 TRANSACTIONS_INSUFFICIENT_FEE_FAILED
811 .with_label_values(&[relayer_id, &network_type])
812 .inc();
813 }
814 if had_try_again_later {
815 TRANSACTIONS_TRY_AGAIN_LATER_FAILED
816 .with_label_values(&[relayer_id, &network_type])
817 .inc();
818 }
819 }
820 }
821 }
822}
823
824impl fmt::Debug for RedisTransactionRepository {
825 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
826 f.debug_struct("RedisTransactionRepository")
827 .field("connections", &"<RedisConnections>")
828 .field("key_prefix", &self.key_prefix)
829 .finish()
830 }
831}
832
833#[async_trait]
834impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
835 async fn create(
836 &self,
837 entity: TransactionRepoModel,
838 ) -> Result<TransactionRepoModel, RepositoryError> {
839 if entity.id.is_empty() {
840 return Err(RepositoryError::InvalidData(
841 "Transaction ID cannot be empty".to_string(),
842 ));
843 }
844
845 let key = self.tx_key(&entity.relayer_id, &entity.id);
846 let reverse_key = self.tx_to_relayer_key(&entity.id);
847 let mut conn = self
848 .get_connection(self.connections.primary(), "create")
849 .await?;
850
851 debug!(tx_id = %entity.id, "creating transaction");
852
853 let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
854
855 let existing: Option<String> = conn
857 .get(&reverse_key)
858 .await
859 .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;
860
861 if existing.is_some() {
862 return Err(RepositoryError::ConstraintViolation(format!(
863 "Transaction with ID {} already exists",
864 entity.id
865 )));
866 }
867
868 let mut pipe = redis::pipe();
870 pipe.atomic();
871 pipe.set(&key, &value);
872 pipe.set(&reverse_key, &entity.relayer_id);
873
874 pipe.exec_async(&mut conn)
875 .await
876 .map_err(|e| self.map_redis_error(e, "create_transaction"))?;
877
878 if let Err(e) = self.update_indexes(&entity, None).await {
880 error!(tx_id = %entity.id, error = %e, "failed to update indexes for new transaction");
881 return Err(e);
882 }
883
884 let network_type = format!("{:?}", entity.network_type).to_lowercase();
886 let relayer_id = entity.relayer_id.as_str();
887 TRANSACTIONS_CREATED
888 .with_label_values(&[relayer_id, &network_type])
889 .inc();
890
891 let status = &entity.status;
893 let status_str = format!("{status:?}").to_lowercase();
894 TRANSACTIONS_BY_STATUS
895 .with_label_values(&[relayer_id, &network_type, &status_str])
896 .inc();
897
898 debug!(tx_id = %entity.id, "successfully created transaction");
899 Ok(entity)
900 }
901
902 async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
903 if id.is_empty() {
904 return Err(RepositoryError::InvalidData(
905 "Transaction ID cannot be empty".to_string(),
906 ));
907 }
908
909 let mut conn = self
910 .get_connection(self.connections.reader(), "get_by_id")
911 .await?;
912
913 debug!(tx_id = %id, "fetching transaction");
914
915 let reverse_key = self.tx_to_relayer_key(&id);
916 let relayer_id: Option<String> = conn
917 .get(&reverse_key)
918 .await
919 .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;
920
921 let relayer_id = match relayer_id {
922 Some(relayer_id) => relayer_id,
923 None => {
924 debug!(tx_id = %id, "transaction not found (no reverse lookup)");
925 return Err(RepositoryError::NotFound(format!(
926 "Transaction with ID {id} not found"
927 )));
928 }
929 };
930
931 let key = self.tx_key(&relayer_id, &id);
932 let value: Option<String> = conn
933 .get(&key)
934 .await
935 .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;
936
937 match value {
938 Some(json) => {
939 let tx =
940 self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
941 debug!(tx_id = %id, "successfully fetched transaction");
942 Ok(tx)
943 }
944 None => {
945 debug!(tx_id = %id, "transaction not found");
946 Err(RepositoryError::NotFound(format!(
947 "Transaction with ID {id} not found"
948 )))
949 }
950 }
951 }
952
953 async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
955 let mut conn = self
956 .get_connection(self.connections.reader(), "list_all")
957 .await?;
958
959 debug!("fetching all transactions sorted by created_at (newest first)");
960
961 let relayer_list_key = self.relayer_list_key();
963 let relayer_ids: Vec<String> = conn
964 .smembers(&relayer_list_key)
965 .await
966 .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;
967
968 debug!(count = %relayer_ids.len(), "found relayers");
969
970 let mut all_tx_ids = Vec::new();
972 for relayer_id in relayer_ids {
973 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
974 let tx_ids: Vec<String> = redis::cmd("ZRANGE")
975 .arg(&relayer_sorted_key)
976 .arg(0)
977 .arg(-1)
978 .arg("REV")
979 .query_async(&mut conn)
980 .await
981 .map_err(|e| self.map_redis_error(e, "list_all_relayer_sorted"))?;
982
983 all_tx_ids.extend(tx_ids);
984 }
985
986 drop(conn);
988
989 let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
991 let mut all_transactions = batch_result.results;
992
993 all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
995
996 debug!(count = %all_transactions.len(), "found transactions");
997 Ok(all_transactions)
998 }
999
1000 async fn list_paginated(
1002 &self,
1003 query: PaginationQuery,
1004 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1005 if query.per_page == 0 {
1006 return Err(RepositoryError::InvalidData(
1007 "per_page must be greater than 0".to_string(),
1008 ));
1009 }
1010
1011 let mut conn = self
1012 .get_connection(self.connections.reader(), "list_paginated")
1013 .await?;
1014
1015 debug!(page = %query.page, per_page = %query.per_page, "fetching paginated transactions sorted by created_at (newest first)");
1016
1017 let relayer_list_key = self.relayer_list_key();
1019 let relayer_ids: Vec<String> = conn
1020 .smembers(&relayer_list_key)
1021 .await
1022 .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;
1023
1024 let mut all_tx_ids = Vec::new();
1026 for relayer_id in relayer_ids {
1027 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
1028 let tx_ids: Vec<String> = redis::cmd("ZRANGE")
1029 .arg(&relayer_sorted_key)
1030 .arg(0)
1031 .arg(-1)
1032 .arg("REV")
1033 .query_async(&mut conn)
1034 .await
1035 .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_sorted"))?;
1036
1037 all_tx_ids.extend(tx_ids);
1038 }
1039
1040 drop(conn);
1042
1043 let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
1045 let mut all_transactions = batch_result.results;
1046
1047 all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
1049
1050 let total = all_transactions.len() as u64;
1051 let start = ((query.page - 1) * query.per_page) as usize;
1052 let end = (start + query.per_page as usize).min(all_transactions.len());
1053
1054 if start >= all_transactions.len() {
1055 debug!(page = %query.page, total = %total, "page is beyond available data");
1056 return Ok(PaginatedResult {
1057 items: vec![],
1058 total,
1059 page: query.page,
1060 per_page: query.per_page,
1061 });
1062 }
1063
1064 let items = all_transactions[start..end].to_vec();
1065
1066 debug!(count = %items.len(), page = %query.page, "successfully fetched transactions for page");
1067
1068 Ok(PaginatedResult {
1069 items,
1070 total,
1071 page: query.page,
1072 per_page: query.per_page,
1073 })
1074 }
1075
1076 async fn update(
1077 &self,
1078 id: String,
1079 entity: TransactionRepoModel,
1080 ) -> Result<TransactionRepoModel, RepositoryError> {
1081 if id.is_empty() {
1082 return Err(RepositoryError::InvalidData(
1083 "Transaction ID cannot be empty".to_string(),
1084 ));
1085 }
1086
1087 debug!(tx_id = %id, "updating transaction");
1088
1089 let old_tx = self.get_by_id(id.clone()).await?;
1091
1092 let key = self.tx_key(&entity.relayer_id, &id);
1093 let mut conn = self
1094 .get_connection(self.connections.primary(), "update")
1095 .await?;
1096
1097 let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
1098
1099 let _: () = conn
1101 .set(&key, value)
1102 .await
1103 .map_err(|e| self.map_redis_error(e, "update_transaction"))?;
1104
1105 self.update_indexes(&entity, Some(&old_tx)).await?;
1107
1108 debug!(tx_id = %id, "successfully updated transaction");
1109 Ok(entity)
1110 }
1111
1112 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
1113 if id.is_empty() {
1114 return Err(RepositoryError::InvalidData(
1115 "Transaction ID cannot be empty".to_string(),
1116 ));
1117 }
1118
1119 debug!(tx_id = %id, "deleting transaction");
1120
1121 let tx = self.get_by_id(id.clone()).await?;
1123
1124 let key = self.tx_key(&tx.relayer_id, &id);
1125 let reverse_key = self.tx_to_relayer_key(&id);
1126 let mut conn = self
1127 .get_connection(self.connections.primary(), "delete_by_id")
1128 .await?;
1129
1130 let mut pipe = redis::pipe();
1131 pipe.atomic();
1132 pipe.del(&key);
1133 pipe.del(&reverse_key);
1134
1135 pipe.exec_async(&mut conn)
1136 .await
1137 .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;
1138
1139 if let Err(e) = self.remove_all_indexes(&tx).await {
1141 error!(tx_id = %id, error = %e, "failed to remove indexes for deleted transaction");
1142 }
1143
1144 debug!(tx_id = %id, "successfully deleted transaction");
1145 Ok(())
1146 }
1147
1148 async fn count(&self) -> Result<usize, RepositoryError> {
1150 let mut conn = self
1151 .get_connection(self.connections.reader(), "count")
1152 .await?;
1153
1154 debug!("counting transactions");
1155
1156 let relayer_list_key = self.relayer_list_key();
1158 let relayer_ids: Vec<String> = conn
1159 .smembers(&relayer_list_key)
1160 .await
1161 .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;
1162
1163 let mut total_count = 0usize;
1164 for relayer_id in relayer_ids {
1165 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
1166 let count: usize = conn
1167 .zcard(&relayer_sorted_key)
1168 .await
1169 .map_err(|e| self.map_redis_error(e, "count_relayer_transactions"))?;
1170 total_count += count;
1171 }
1172
1173 debug!(count = %total_count, "transaction count");
1174 Ok(total_count)
1175 }
1176
1177 async fn has_entries(&self) -> Result<bool, RepositoryError> {
1178 let mut conn = self
1179 .get_connection(self.connections.reader(), "has_entries")
1180 .await?;
1181 let relayer_list_key = self.relayer_list_key();
1182
1183 debug!("checking if transaction entries exist");
1184
1185 let exists: bool = conn
1186 .exists(&relayer_list_key)
1187 .await
1188 .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
1189
1190 debug!(exists = %exists, "transaction entries exist");
1191 Ok(exists)
1192 }
1193
1194 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
1195 let mut conn = self
1196 .get_connection(self.connections.primary(), "drop_all_entries")
1197 .await?;
1198 let relayer_list_key = self.relayer_list_key();
1199
1200 debug!("dropping all transaction entries");
1201
1202 let relayer_ids: Vec<String> = conn
1204 .smembers(&relayer_list_key)
1205 .await
1206 .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;
1207
1208 if relayer_ids.is_empty() {
1209 debug!("no transaction entries to drop");
1210 return Ok(());
1211 }
1212
1213 let mut pipe = redis::pipe();
1215 pipe.atomic();
1216
1217 for relayer_id in &relayer_ids {
1219 let pattern = format!(
1221 "{}:{}:{}:{}:*",
1222 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
1223 );
1224 let mut cursor = 0;
1225 let mut tx_ids = Vec::new();
1226
1227 loop {
1228 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
1229 .cursor_arg(cursor)
1230 .arg("MATCH")
1231 .arg(&pattern)
1232 .query_async(&mut conn)
1233 .await
1234 .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;
1235
1236 for key in keys {
1238 pipe.del(&key);
1239 if let Some(tx_id) = key.split(':').next_back() {
1240 tx_ids.push(tx_id.to_string());
1241 }
1242 }
1243
1244 cursor = next_cursor;
1245 if cursor == 0 {
1246 break;
1247 }
1248 }
1249
1250 for tx_id in tx_ids {
1252 let reverse_key = self.tx_to_relayer_key(&tx_id);
1253 pipe.del(&reverse_key);
1254
1255 for status in &[
1258 TransactionStatus::Canceled,
1259 TransactionStatus::Pending,
1260 TransactionStatus::Sent,
1261 TransactionStatus::Submitted,
1262 TransactionStatus::Mined,
1263 TransactionStatus::Confirmed,
1264 TransactionStatus::Failed,
1265 TransactionStatus::Expired,
1266 ] {
1267 let status_sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1269 pipe.zrem(&status_sorted_key, &tx_id);
1270
1271 let status_key = self.relayer_status_key(relayer_id, status);
1273 pipe.srem(&status_key, &tx_id);
1274 }
1275 }
1276
1277 let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
1279 pipe.del(&relayer_sorted_key);
1280 }
1281
1282 pipe.del(&relayer_list_key);
1284
1285 pipe.exec_async(&mut conn)
1286 .await
1287 .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
1288
1289 debug!(count = %relayer_ids.len(), "dropped all transaction entries for relayers");
1290 Ok(())
1291 }
1292}
1293
1294#[async_trait]
1295impl TransactionRepository for RedisTransactionRepository {
1296 async fn find_by_relayer_id(
1297 &self,
1298 relayer_id: &str,
1299 query: PaginationQuery,
1300 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1301 let mut conn = self
1302 .get_connection(self.connections.reader(), "find_by_relayer_id")
1303 .await?;
1304
1305 debug!(relayer_id = %relayer_id, page = %query.page, per_page = %query.per_page, "fetching transactions for relayer sorted by created_at (newest first)");
1306
1307 let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
1308
1309 let sorted_set_count: u64 = conn
1311 .zcard(&relayer_sorted_key)
1312 .await
1313 .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_count"))?;
1314
1315 if sorted_set_count == 0 {
1318 debug!(relayer_id = %relayer_id, "no transactions found for relayer (sorted set is empty)");
1319 return Ok(PaginatedResult {
1320 items: vec![],
1321 total: 0,
1322 page: query.page,
1323 per_page: query.per_page,
1324 });
1325 }
1326
1327 let total = sorted_set_count;
1328
1329 let start = ((query.page - 1) * query.per_page) as isize;
1331 let end = start + query.per_page as isize - 1;
1332
1333 if start as u64 >= total {
1334 debug!(relayer_id = %relayer_id, page = %query.page, total = %total, "page is beyond available data");
1335 return Ok(PaginatedResult {
1336 items: vec![],
1337 total,
1338 page: query.page,
1339 per_page: query.per_page,
1340 });
1341 }
1342
1343 let page_ids: Vec<String> = redis::cmd("ZRANGE")
1345 .arg(&relayer_sorted_key)
1346 .arg(start)
1347 .arg(end)
1348 .arg("REV")
1349 .query_async(&mut conn)
1350 .await
1351 .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_sorted"))?;
1352
1353 drop(conn);
1355
1356 let items = self.get_transactions_by_ids(&page_ids).await?;
1357
1358 debug!(relayer_id = %relayer_id, count = %items.results.len(), page = %query.page, "successfully fetched transactions for relayer");
1359
1360 Ok(PaginatedResult {
1361 items: items.results,
1362 total,
1363 page: query.page,
1364 per_page: query.per_page,
1365 })
1366 }
1367
1368 async fn find_by_status(
1370 &self,
1371 relayer_id: &str,
1372 statuses: &[TransactionStatus],
1373 ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
1374 for status in statuses {
1376 self.ensure_status_sorted_set(relayer_id, status).await?;
1377 }
1378
1379 let mut conn = self
1381 .get_connection(self.connections.reader(), "find_by_status")
1382 .await?;
1383
1384 let mut all_ids: Vec<String> = Vec::new();
1385 for status in statuses {
1386 let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1388 let ids: Vec<String> = redis::cmd("ZRANGE")
1389 .arg(&sorted_key)
1390 .arg(0)
1391 .arg(-1)
1392 .arg("REV") .query_async(&mut conn)
1394 .await
1395 .map_err(|e| self.map_redis_error(e, "find_by_status"))?;
1396
1397 all_ids.extend(ids);
1398 }
1399
1400 drop(conn);
1402
1403 if all_ids.is_empty() {
1404 return Ok(vec![]);
1405 }
1406
1407 all_ids.sort();
1409 all_ids.dedup();
1410
1411 let mut transactions = self.get_transactions_by_ids(&all_ids).await?;
1413
1414 transactions
1416 .results
1417 .sort_by(|a, b| b.created_at.cmp(&a.created_at));
1418
1419 Ok(transactions.results)
1420 }
1421
1422 async fn find_by_status_paginated(
1423 &self,
1424 relayer_id: &str,
1425 statuses: &[TransactionStatus],
1426 query: PaginationQuery,
1427 oldest_first: bool,
1428 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1429 for status in statuses {
1431 self.ensure_status_sorted_set(relayer_id, status).await?;
1432 }
1433
1434 let mut conn = self
1435 .get_connection(self.connections.reader(), "find_by_status_paginated")
1436 .await?;
1437
1438 if statuses.len() == 1 {
1440 let sorted_key = self.relayer_status_sorted_key(relayer_id, &statuses[0]);
1441
1442 let total: u64 = conn
1444 .zcard(&sorted_key)
1445 .await
1446 .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_count"))?;
1447
1448 if total == 0 {
1449 return Ok(PaginatedResult {
1450 items: vec![],
1451 total: 0,
1452 page: query.page,
1453 per_page: query.per_page,
1454 });
1455 }
1456
1457 let start = ((query.page.saturating_sub(1)) * query.per_page) as isize;
1459 let end = start + query.per_page as isize - 1;
1460
1461 let mut cmd = redis::cmd("ZRANGE");
1464 cmd.arg(&sorted_key).arg(start).arg(end);
1465 if !oldest_first {
1466 cmd.arg("REV");
1467 }
1468 let page_ids: Vec<String> = cmd
1469 .query_async(&mut conn)
1470 .await
1471 .map_err(|e| self.map_redis_error(e, "find_by_status_paginated"))?;
1472
1473 drop(conn);
1475
1476 let transactions = self.get_transactions_by_ids(&page_ids).await?;
1477
1478 debug!(
1479 relayer_id = %relayer_id,
1480 status = %statuses[0],
1481 total = %total,
1482 page = %query.page,
1483 page_size = %transactions.results.len(),
1484 "fetched paginated transactions by single status"
1485 );
1486
1487 return Ok(PaginatedResult {
1488 items: transactions.results,
1489 total,
1490 page: query.page,
1491 per_page: query.per_page,
1492 });
1493 }
1494
1495 let mut all_ids: Vec<(String, f64)> = Vec::new();
1497 for status in statuses {
1498 let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1499
1500 let ids_with_scores: Vec<(String, f64)> = redis::cmd("ZRANGE")
1502 .arg(&sorted_key)
1503 .arg(0)
1504 .arg(-1)
1505 .arg("WITHSCORES")
1506 .query_async(&mut conn)
1507 .await
1508 .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_multi"))?;
1509
1510 all_ids.extend(ids_with_scores);
1511 }
1512
1513 drop(conn);
1515
1516 let mut id_map: std::collections::HashMap<String, f64> = std::collections::HashMap::new();
1518 for (id, score) in all_ids {
1519 id_map
1520 .entry(id)
1521 .and_modify(|s| {
1522 if oldest_first {
1524 if score < *s {
1525 *s = score
1526 }
1527 } else if score > *s {
1528 *s = score
1529 }
1530 })
1531 .or_insert(score);
1532 }
1533
1534 let mut sorted_ids: Vec<(String, f64)> = id_map.into_iter().collect();
1536 if oldest_first {
1537 sorted_ids.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
1538 } else {
1539 sorted_ids.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1540 }
1541
1542 let total = sorted_ids.len() as u64;
1543
1544 if total == 0 {
1545 return Ok(PaginatedResult {
1546 items: vec![],
1547 total: 0,
1548 page: query.page,
1549 per_page: query.per_page,
1550 });
1551 }
1552
1553 let start = ((query.page.saturating_sub(1)) * query.per_page) as usize;
1555 let page_ids: Vec<String> = sorted_ids
1556 .into_iter()
1557 .skip(start)
1558 .take(query.per_page as usize)
1559 .map(|(id, _)| id)
1560 .collect();
1561
1562 let transactions = self.get_transactions_by_ids(&page_ids).await?;
1564
1565 debug!(
1566 relayer_id = %relayer_id,
1567 total = %total,
1568 page = %query.page,
1569 page_size = %transactions.results.len(),
1570 "fetched paginated transactions by status"
1571 );
1572
1573 Ok(PaginatedResult {
1574 items: transactions.results,
1575 total,
1576 page: query.page,
1577 per_page: query.per_page,
1578 })
1579 }
1580
1581 async fn find_by_nonce(
1582 &self,
1583 relayer_id: &str,
1584 nonce: u64,
1585 ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
1586 let mut conn = self
1587 .get_connection(self.connections.reader(), "find_by_nonce")
1588 .await?;
1589 let nonce_key = self.relayer_nonce_key(relayer_id, nonce);
1590
1591 let tx_id: Option<String> = conn
1593 .get(nonce_key)
1594 .await
1595 .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;
1596
1597 match tx_id {
1598 Some(tx_id) => {
1599 match self.get_by_id(tx_id.clone()).await {
1600 Ok(tx) => Ok(Some(tx)),
1601 Err(RepositoryError::NotFound(_)) => {
1602 warn!(relayer_id = %relayer_id, nonce = %nonce, "stale nonce index found for relayer");
1604 Ok(None)
1605 }
1606 Err(e) => Err(e),
1607 }
1608 }
1609 None => Ok(None),
1610 }
1611 }
1612
1613 async fn get_nonce_occupancy(
1614 &self,
1615 relayer_id: &str,
1616 from_nonce: u64,
1617 to_nonce: u64,
1618 ) -> Result<Vec<(u64, Option<TransactionStatus>)>, RepositoryError> {
1619 if from_nonce >= to_nonce {
1620 return Ok(vec![]);
1621 }
1622
1623 let nonces: Vec<u64> = (from_nonce..to_nonce).collect();
1624 let nonce_keys: Vec<String> = nonces
1625 .iter()
1626 .map(|n| self.relayer_nonce_key(relayer_id, *n))
1627 .collect();
1628
1629 let mut conn = self
1632 .get_connection(self.connections.primary(), "get_nonce_occupancy")
1633 .await?;
1634 let tx_ids: Vec<Option<String>> = redis::cmd("MGET")
1635 .arg(&nonce_keys)
1636 .query_async(&mut conn)
1637 .await
1638 .map_err(|e| self.map_redis_error(e, "get_nonce_occupancy:mget_nonces"))?;
1639
1640 let mut tx_key_entries: Vec<(usize, String)> = Vec::new();
1643 for (i, tx_id) in tx_ids.iter().enumerate() {
1644 if let Some(id) = tx_id {
1645 tx_key_entries.push((i, self.tx_key(relayer_id, id)));
1646 }
1647 }
1648
1649 let tx_statuses: Vec<Option<TransactionStatus>> = if tx_key_entries.is_empty() {
1651 vec![]
1652 } else {
1653 let data_keys: Vec<&str> = tx_key_entries.iter().map(|(_, k)| k.as_str()).collect();
1654 let raw_values: Vec<Option<String>> = redis::cmd("MGET")
1655 .arg(&data_keys)
1656 .query_async(&mut conn)
1657 .await
1658 .map_err(|e| self.map_redis_error(e, "get_nonce_occupancy:mget_txs"))?;
1659
1660 raw_values
1661 .into_iter()
1662 .enumerate()
1663 .map(|(i, v)| {
1664 v.and_then(|json| {
1665 match serde_json::from_str::<TransactionRepoModel>(&json) {
1666 Ok(tx) => Some(tx.status),
1667 Err(e) => {
1668 let nonce = tx_key_entries.get(i).map(|(idx, _)| nonces[*idx]);
1669 warn!(
1670 relayer_id = %relayer_id,
1671 nonce = ?nonce,
1672 error = %e,
1673 "get_nonce_occupancy: failed to deserialize transaction, treating as empty"
1674 );
1675 None
1676 }
1677 }
1678 })
1679 })
1680 .collect()
1681 };
1682
1683 let mut results: Vec<(u64, Option<TransactionStatus>)> =
1685 nonces.iter().map(|n| (*n, None)).collect();
1686
1687 for (idx, (original_idx, _)) in tx_key_entries.iter().enumerate() {
1688 if let Some(status) = tx_statuses.get(idx).and_then(|s| s.clone()) {
1689 results[*original_idx].1 = Some(status);
1690 }
1691 }
1692
1693 Ok(results)
1694 }
1695
1696 async fn update_status(
1697 &self,
1698 tx_id: String,
1699 status: TransactionStatus,
1700 ) -> Result<TransactionRepoModel, RepositoryError> {
1701 let update = TransactionUpdateRequest {
1702 status: Some(status),
1703 ..Default::default()
1704 };
1705 self.partial_update(tx_id, update).await
1706 }
1707
1708 async fn partial_update(
1709 &self,
1710 tx_id: String,
1711 update: TransactionUpdateRequest,
1712 ) -> Result<TransactionRepoModel, RepositoryError> {
1713 let patch_json = serde_json::to_string(&update).map_err(|e| {
1715 RepositoryError::InvalidData(format!("Failed to serialize update patch: {e}"))
1716 })?;
1717
1718 let delete_at_value = if let Some(ref status) = update.status {
1721 if FINAL_TRANSACTION_STATUSES.contains(status) {
1722 let expiration_hours = ServerConfig::get_transaction_expiration_hours();
1723 let seconds = (expiration_hours * 3600.0) as i64;
1724 let delete_time = Utc::now() + chrono::Duration::seconds(seconds);
1725 Some(delete_time.to_rfc3339())
1726 } else {
1727 None
1728 }
1729 } else {
1730 None
1731 };
1732 let delete_at_arg = delete_at_value.as_deref().unwrap_or("");
1733
1734 let (lookup_key, key_prefix, key_suffix) = self.tx_key_parts(&tx_id);
1735
1736 let patch_script = Script::new(
1742 r#"
1743 local relayer_id = redis.call('GET', KEYS[1])
1744 if not relayer_id then return false end
1745
1746 local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1747 local current = redis.call('GET', tx_key)
1748 if not current then return false end
1749
1750 local tx = cjson.decode(current)
1751 local patch = cjson.decode(ARGV[3])
1752
1753 -- Guard: reject status changes on finalized transactions.
1754 -- A stale worker must not resurrect a tx that another worker
1755 -- already moved to a terminal state.
1756 local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1757 if final_states[tx["status"]] and patch["status"] then
1758 return {current, current}
1759 end
1760
1761 local old_snapshot = current
1762
1763 -- lua-cjson cannot distinguish empty Lua tables from empty
1764 -- arrays, so a decode/encode round-trip turns [] into {}.
1765 -- Record which keys held [] in the stored doc and the patch
1766 -- so we can restore them after cjson.encode.
1767 -- NOTE: this relies on each array-typed field having a unique key
1768 -- name across the entire JSON document (including nested objects).
1769 -- If the model ever introduces duplicate key names at different
1770 -- nesting levels (e.g. metadata.hashes), the gsub below could
1771 -- restore the wrong occurrence.
1772 local empty_arrs = {}
1773 for k in string.gmatch(current, '"([^"]+)"%s*:%s*%[%s*%]') do
1774 empty_arrs[k] = true
1775 end
1776 for k in string.gmatch(ARGV[3], '"([^"]+)"%s*:%s*%[%s*%]') do
1777 empty_arrs[k] = true
1778 end
1779
1780 for k, v in pairs(patch) do
1781 tx[k] = v
1782 end
1783
1784 -- Apply delete_at if transitioning to a final state and not already set
1785 if ARGV[4] ~= '' and (not tx["delete_at"] or tx["delete_at"] == cjson.null) then
1786 tx["delete_at"] = ARGV[4]
1787 end
1788
1789 local updated = cjson.encode(tx)
1790
1791 -- Restore empty arrays that cjson.encode converted to {}
1792 for k, _ in pairs(empty_arrs) do
1793 updated = string.gsub(
1794 updated, '"'..k..'"%s*:%s*{}', '"'..k..'":[]', 1
1795 )
1796 end
1797
1798 redis.call('SET', tx_key, updated)
1799 return {old_snapshot, updated}
1800 "#,
1801 );
1802
1803 let result: Option<Vec<String>> = self
1804 .run_script_with_retry_vec(
1805 &patch_script,
1806 &lookup_key,
1807 &key_prefix,
1808 &key_suffix,
1809 &[&patch_json, delete_at_arg],
1810 "partial_update",
1811 )
1812 .await?;
1813
1814 let parts = result.ok_or_else(|| {
1815 RepositoryError::NotFound(format!("Transaction with ID {tx_id} not found"))
1816 })?;
1817
1818 if parts.len() != 2 {
1819 return Err(RepositoryError::UnexpectedError(format!(
1820 "partial_update script returned {} elements, expected 2",
1821 parts.len()
1822 )));
1823 }
1824
1825 let old_json = &parts[0];
1826 let new_json = &parts[1];
1827
1828 let original_tx =
1829 self.deserialize_entity::<TransactionRepoModel>(old_json, &tx_id, "transaction")?;
1830 let updated_tx =
1831 self.deserialize_entity::<TransactionRepoModel>(new_json, &tx_id, "transaction")?;
1832
1833 self.update_indexes(&updated_tx, Some(&original_tx)).await?;
1835
1836 debug!(tx_id = %tx_id, "successfully updated transaction via patch");
1837
1838 if original_tx.status != updated_tx.status {
1842 self.track_status_change_metrics(
1843 &original_tx,
1844 &updated_tx,
1845 &original_tx.status,
1846 &updated_tx.status,
1847 );
1848 }
1849
1850 Ok(updated_tx)
1851 }
1852
1853 async fn update_network_data(
1854 &self,
1855 tx_id: String,
1856 network_data: NetworkTransactionData,
1857 ) -> Result<TransactionRepoModel, RepositoryError> {
1858 let update = TransactionUpdateRequest {
1859 network_data: Some(network_data),
1860 ..Default::default()
1861 };
1862 self.partial_update(tx_id, update).await
1863 }
1864
1865 async fn set_sent_at(
1866 &self,
1867 tx_id: String,
1868 sent_at: String,
1869 ) -> Result<TransactionRepoModel, RepositoryError> {
1870 let update = TransactionUpdateRequest {
1871 sent_at: Some(sent_at),
1872 ..Default::default()
1873 };
1874 self.partial_update(tx_id, update).await
1875 }
1876
1877 async fn increment_status_check_failures(
1878 &self,
1879 tx_id: String,
1880 ) -> Result<TransactionRepoModel, RepositoryError> {
1881 self.run_atomic_script(
1882 r#"
1883 local function set_obj(json, key, tbl)
1884 local enc = cjson.encode(tbl)
1885 local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
1886 if n > 0 then return r end
1887 r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1888 if n > 0 then return r end
1889 return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1890 end
1891
1892 local relayer_id = redis.call('GET', KEYS[1])
1893 if not relayer_id then return false end
1894
1895 local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1896 local current = redis.call('GET', tx_key)
1897 if not current then return false end
1898
1899 local tx = cjson.decode(current)
1900 local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1901 if final_states[tx["status"]] then return current end
1902
1903 local metadata = tx["metadata"]
1904 if type(metadata) ~= 'table' then metadata = {} end
1905 metadata["consecutive_failures"] = (metadata["consecutive_failures"] or 0) + 1
1906 metadata["total_failures"] = (metadata["total_failures"] or 0) + 1
1907
1908 local updated = set_obj(current, "metadata", metadata)
1909 redis.call('SET', tx_key, updated)
1910 return updated
1911 "#,
1912 &tx_id,
1913 &[],
1914 "increment_status_check_failures",
1915 )
1916 .await
1917 }
1918
1919 async fn reset_status_check_consecutive_failures(
1920 &self,
1921 tx_id: String,
1922 ) -> Result<TransactionRepoModel, RepositoryError> {
1923 self.run_atomic_script(
1924 r#"
1925 local function set_obj(json, key, tbl)
1926 local enc = cjson.encode(tbl)
1927 local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
1928 if n > 0 then return r end
1929 r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1930 if n > 0 then return r end
1931 return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1932 end
1933
1934 local relayer_id = redis.call('GET', KEYS[1])
1935 if not relayer_id then return false end
1936
1937 local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1938 local current = redis.call('GET', tx_key)
1939 if not current then return false end
1940
1941 local tx = cjson.decode(current)
1942 local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1943 if final_states[tx["status"]] then return current end
1944
1945 local metadata = tx["metadata"]
1946 if type(metadata) ~= 'table' then metadata = {} end
1947 metadata["consecutive_failures"] = 0
1948
1949 local updated = set_obj(current, "metadata", metadata)
1950 redis.call('SET', tx_key, updated)
1951 return updated
1952 "#,
1953 &tx_id,
1954 &[],
1955 "reset_status_check_consecutive_failures",
1956 )
1957 .await
1958 }
1959
1960 async fn record_stellar_insufficient_fee_retry(
1961 &self,
1962 tx_id: String,
1963 sent_at: String,
1964 ) -> Result<TransactionRepoModel, RepositoryError> {
1965 self.run_atomic_script(
1966 r#"
1967 local function set_str(json, key, val)
1968 local enc = cjson.encode(val)
1969 local r, n = string.gsub(json, '"'..key..'"%s*:%s*"[^"]*"', '"'..key..'":'..enc, 1)
1970 if n > 0 then return r end
1971 r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1972 if n > 0 then return r end
1973 return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1974 end
1975 local function set_obj(json, key, tbl)
1976 local enc = cjson.encode(tbl)
1977 local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
1978 if n > 0 then return r end
1979 r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1980 if n > 0 then return r end
1981 return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1982 end
1983
1984 local relayer_id = redis.call('GET', KEYS[1])
1985 if not relayer_id then return false end
1986
1987 local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1988 local current = redis.call('GET', tx_key)
1989 if not current then return false end
1990
1991 local tx = cjson.decode(current)
1992 local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1993 if final_states[tx["status"]] then return current end
1994
1995 local metadata = tx["metadata"]
1996 if type(metadata) ~= 'table' then metadata = {} end
1997 metadata["insufficient_fee_retries"] = (metadata["insufficient_fee_retries"] or 0) + 1
1998
1999 local updated = set_str(current, "sent_at", ARGV[3])
2000 updated = set_obj(updated, "metadata", metadata)
2001 redis.call('SET', tx_key, updated)
2002 return updated
2003 "#,
2004 &tx_id,
2005 &[&sent_at],
2006 "record_stellar_insufficient_fee_retry",
2007 )
2008 .await
2009 }
2010
2011 async fn record_stellar_try_again_later_retry(
2012 &self,
2013 tx_id: String,
2014 sent_at: String,
2015 ) -> Result<TransactionRepoModel, RepositoryError> {
2016 self.run_atomic_script(
2017 r#"
2018 local function set_str(json, key, val)
2019 local enc = cjson.encode(val)
2020 local r, n = string.gsub(json, '"'..key..'"%s*:%s*"[^"]*"', '"'..key..'":'..enc, 1)
2021 if n > 0 then return r end
2022 r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
2023 if n > 0 then return r end
2024 return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
2025 end
2026 local function set_obj(json, key, tbl)
2027 local enc = cjson.encode(tbl)
2028 local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
2029 if n > 0 then return r end
2030 r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
2031 if n > 0 then return r end
2032 return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
2033 end
2034
2035 local relayer_id = redis.call('GET', KEYS[1])
2036 if not relayer_id then return false end
2037
2038 local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
2039 local current = redis.call('GET', tx_key)
2040 if not current then return false end
2041
2042 local tx = cjson.decode(current)
2043 local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
2044 if final_states[tx["status"]] then return current end
2045
2046 local metadata = tx["metadata"]
2047 if type(metadata) ~= 'table' then metadata = {} end
2048 metadata["try_again_later_retries"] = (metadata["try_again_later_retries"] or 0) + 1
2049
2050 local updated = set_str(current, "sent_at", ARGV[3])
2051 updated = set_obj(updated, "metadata", metadata)
2052 redis.call('SET', tx_key, updated)
2053 return updated
2054 "#,
2055 &tx_id,
2056 &[&sent_at],
2057 "record_stellar_try_again_later_retry",
2058 )
2059 .await
2060 }
2061
2062 async fn set_confirmed_at(
2063 &self,
2064 tx_id: String,
2065 confirmed_at: String,
2066 ) -> Result<TransactionRepoModel, RepositoryError> {
2067 let update = TransactionUpdateRequest {
2068 confirmed_at: Some(confirmed_at),
2069 ..Default::default()
2070 };
2071 self.partial_update(tx_id, update).await
2072 }
2073
2074 async fn count_by_status(
2078 &self,
2079 relayer_id: &str,
2080 statuses: &[TransactionStatus],
2081 ) -> Result<u64, RepositoryError> {
2082 let mut conn = self
2083 .get_connection(self.connections.reader(), "count_by_status")
2084 .await?;
2085 let mut total_count: u64 = 0;
2086
2087 for status in statuses {
2088 self.ensure_status_sorted_set(relayer_id, status).await?;
2090
2091 let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
2092 let count: u64 = conn
2093 .zcard(&sorted_key)
2094 .await
2095 .map_err(|e| self.map_redis_error(e, "count_by_status"))?;
2096 total_count += count;
2097 }
2098
2099 debug!(relayer_id = %relayer_id, count = %total_count, "counted transactions by status");
2100 Ok(total_count)
2101 }
2102
2103 async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError> {
2104 if ids.is_empty() {
2105 debug!("no transaction IDs provided for batch delete");
2106 return Ok(BatchDeleteResult::default());
2107 }
2108
2109 debug!(count = %ids.len(), "batch deleting transactions by IDs (with fetch)");
2110
2111 let batch_result = self.get_transactions_by_ids(&ids).await?;
2113
2114 let requests: Vec<TransactionDeleteRequest> = batch_result
2116 .results
2117 .iter()
2118 .map(|tx| TransactionDeleteRequest {
2119 id: tx.id.clone(),
2120 relayer_id: tx.relayer_id.clone(),
2121 nonce: self.extract_nonce(&tx.network_data),
2122 })
2123 .collect();
2124
2125 let mut result = self.delete_by_requests(requests).await?;
2127
2128 for id in batch_result.failed_ids {
2130 result
2131 .failed
2132 .push((id.clone(), format!("Transaction with ID {id} not found")));
2133 }
2134
2135 Ok(result)
2136 }
2137
2138 async fn delete_by_requests(
2139 &self,
2140 requests: Vec<TransactionDeleteRequest>,
2141 ) -> Result<BatchDeleteResult, RepositoryError> {
2142 if requests.is_empty() {
2143 debug!("no delete requests provided for batch delete");
2144 return Ok(BatchDeleteResult::default());
2145 }
2146
2147 debug!(count = %requests.len(), "batch deleting transactions by requests (no fetch)");
2148 let mut conn = self
2149 .get_connection(self.connections.primary(), "batch_delete_no_fetch")
2150 .await?;
2151 let mut pipe = redis::pipe();
2152 pipe.atomic();
2153
2154 let all_statuses = [
2156 TransactionStatus::Canceled,
2157 TransactionStatus::Pending,
2158 TransactionStatus::Sent,
2159 TransactionStatus::Submitted,
2160 TransactionStatus::Mined,
2161 TransactionStatus::Confirmed,
2162 TransactionStatus::Failed,
2163 TransactionStatus::Expired,
2164 ];
2165
2166 for req in &requests {
2168 let tx_key = self.tx_key(&req.relayer_id, &req.id);
2170 pipe.del(&tx_key);
2171
2172 let reverse_key = self.tx_to_relayer_key(&req.id);
2174 pipe.del(&reverse_key);
2175
2176 for status in &all_statuses {
2178 let status_sorted_key = self.relayer_status_sorted_key(&req.relayer_id, status);
2179 pipe.zrem(&status_sorted_key, &req.id);
2180
2181 let status_legacy_key = self.relayer_status_key(&req.relayer_id, status);
2182 pipe.srem(&status_legacy_key, &req.id);
2183 }
2184
2185 if let Some(nonce) = req.nonce {
2187 let nonce_key = self.relayer_nonce_key(&req.relayer_id, nonce);
2188 pipe.del(&nonce_key);
2189 }
2190
2191 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&req.relayer_id);
2193 pipe.zrem(&relayer_sorted_key, &req.id);
2194 }
2195
2196 match pipe.exec_async(&mut conn).await {
2198 Ok(_) => {
2199 let deleted_count = requests.len();
2200 debug!(
2201 deleted_count = %deleted_count,
2202 "batch delete completed"
2203 );
2204 Ok(BatchDeleteResult {
2205 deleted_count,
2206 failed: vec![],
2207 })
2208 }
2209 Err(e) => {
2210 error!(error = %e, "batch delete pipeline failed");
2211 let failed: Vec<(String, String)> = requests
2213 .iter()
2214 .map(|req| (req.id.clone(), format!("Redis pipeline error: {e}")))
2215 .collect();
2216 Ok(BatchDeleteResult {
2217 deleted_count: 0,
2218 failed,
2219 })
2220 }
2221 }
2222 }
2223}
2224
2225#[cfg(test)]
2226mod tests {
2227 use super::*;
2228 use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
2229 use alloy::primitives::U256;
2230 use deadpool_redis::{Config, Runtime};
2231 use lazy_static::lazy_static;
2232 use std::str::FromStr;
2233 use tokio;
2234 use uuid::Uuid;
2235
2236 use tokio::sync::Mutex;
2237
2238 lazy_static! {
2240 static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
2241 }
2242
2243 fn create_test_transaction(id: &str) -> TransactionRepoModel {
2245 TransactionRepoModel {
2246 id: id.to_string(),
2247 relayer_id: "relayer-1".to_string(),
2248 status: TransactionStatus::Pending,
2249 status_reason: None,
2250 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
2251 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
2252 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
2253 valid_until: None,
2254 delete_at: None,
2255 network_type: NetworkType::Evm,
2256 priced_at: None,
2257 hashes: vec![],
2258 network_data: NetworkTransactionData::Evm(EvmTransactionData {
2259 gas_price: Some(1000000000),
2260 gas_limit: Some(21000),
2261 nonce: Some(1),
2262 value: U256::from_str("1000000000000000000").unwrap(),
2263 data: Some("0x".to_string()),
2264 from: "0xSender".to_string(),
2265 to: Some("0xRecipient".to_string()),
2266 chain_id: 1,
2267 signature: None,
2268 hash: Some(format!("0x{id}")),
2269 speed: Some(Speed::Fast),
2270 max_fee_per_gas: None,
2271 max_priority_fee_per_gas: None,
2272 raw: None,
2273 }),
2274 noop_count: None,
2275 is_canceled: Some(false),
2276 metadata: None,
2277 }
2278 }
2279
2280 fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
2281 let mut tx = create_test_transaction(id);
2282 tx.relayer_id = relayer_id.to_string();
2283 tx
2284 }
2285
2286 fn create_test_transaction_with_status(
2287 id: &str,
2288 relayer_id: &str,
2289 status: TransactionStatus,
2290 ) -> TransactionRepoModel {
2291 let mut tx = create_test_transaction_with_relayer(id, relayer_id);
2292 tx.status = status;
2293 tx
2294 }
2295
2296 fn create_test_transaction_with_nonce(
2297 id: &str,
2298 nonce: u64,
2299 relayer_id: &str,
2300 ) -> TransactionRepoModel {
2301 let mut tx = create_test_transaction_with_relayer(id, relayer_id);
2302 if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
2303 evm_data.nonce = Some(nonce);
2304 }
2305 tx
2306 }
2307
2308 async fn setup_test_repo() -> RedisTransactionRepository {
2309 let redis_url = std::env::var("REDIS_TEST_URL")
2311 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
2312
2313 let cfg = Config::from_url(&redis_url);
2314 let pool = Arc::new(
2315 cfg.builder()
2316 .expect("Failed to create pool builder")
2317 .max_size(16)
2318 .runtime(Runtime::Tokio1)
2319 .build()
2320 .expect("Failed to build Redis pool"),
2321 );
2322
2323 let connections = Arc::new(RedisConnections::new_single_pool(pool));
2325
2326 let random_id = Uuid::new_v4().to_string();
2327 let key_prefix = format!("test_prefix:{random_id}");
2328
2329 RedisTransactionRepository::new(connections, key_prefix)
2330 .expect("Failed to create RedisTransactionRepository")
2331 }
2332
2333 #[tokio::test]
2334 #[ignore = "Requires active Redis instance"]
2335 async fn test_new_repository_creation() {
2336 let repo = setup_test_repo().await;
2337 assert!(repo.key_prefix.contains("test_prefix"));
2338 }
2339
2340 #[tokio::test]
2341 #[ignore = "Requires active Redis instance"]
2342 async fn test_new_repository_empty_prefix_fails() {
2343 let redis_url = std::env::var("REDIS_TEST_URL")
2344 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
2345 let cfg = Config::from_url(&redis_url);
2346 let pool = Arc::new(
2347 cfg.builder()
2348 .expect("Failed to create pool builder")
2349 .max_size(16)
2350 .runtime(Runtime::Tokio1)
2351 .build()
2352 .expect("Failed to build Redis pool"),
2353 );
2354 let connections = Arc::new(RedisConnections::new_single_pool(pool));
2355
2356 let result = RedisTransactionRepository::new(connections, "".to_string());
2357 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
2358 }
2359
2360 #[tokio::test]
2361 #[ignore = "Requires active Redis instance"]
2362 async fn test_key_generation() {
2363 let repo = setup_test_repo().await;
2364
2365 assert!(repo
2366 .tx_key("relayer-1", "test-id")
2367 .contains(":relayer:relayer-1:tx:test-id"));
2368 assert!(repo
2369 .tx_to_relayer_key("test-id")
2370 .contains(":relayer:tx_to_relayer:test-id"));
2371 assert!(repo.relayer_list_key().contains(":relayer_list"));
2372 assert!(repo
2373 .relayer_status_key("relayer-1", &TransactionStatus::Pending)
2374 .contains(":relayer:relayer-1:status:Pending"));
2375 assert!(repo
2376 .relayer_nonce_key("relayer-1", 42)
2377 .contains(":relayer:relayer-1:nonce:42"));
2378 }
2379
2380 #[tokio::test]
2381 #[ignore = "Requires active Redis instance"]
2382 async fn test_serialize_deserialize_transaction() {
2383 let repo = setup_test_repo().await;
2384 let tx = create_test_transaction("test-1");
2385
2386 let serialized = repo
2387 .serialize_entity(&tx, |t| &t.id, "transaction")
2388 .expect("Serialization should succeed");
2389 let deserialized: TransactionRepoModel = repo
2390 .deserialize_entity(&serialized, "test-1", "transaction")
2391 .expect("Deserialization should succeed");
2392
2393 assert_eq!(tx.id, deserialized.id);
2394 assert_eq!(tx.relayer_id, deserialized.relayer_id);
2395 assert_eq!(tx.status, deserialized.status);
2396 }
2397
2398 #[tokio::test]
2399 #[ignore = "Requires active Redis instance"]
2400 async fn test_extract_nonce() {
2401 let repo = setup_test_repo().await;
2402 let random_id = Uuid::new_v4().to_string();
2403 let relayer_id = Uuid::new_v4().to_string();
2404 let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2405
2406 let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
2407 assert_eq!(nonce, Some(42));
2408 }
2409
2410 #[tokio::test]
2411 #[ignore = "Requires active Redis instance"]
2412 async fn test_create_transaction() {
2413 let repo = setup_test_repo().await;
2414 let random_id = Uuid::new_v4().to_string();
2415 let tx = create_test_transaction(&random_id);
2416
2417 let result = repo.create(tx.clone()).await.unwrap();
2418 assert_eq!(result.id, tx.id);
2419 }
2420
2421 #[tokio::test]
2422 #[ignore = "Requires active Redis instance"]
2423 async fn test_get_transaction() {
2424 let repo = setup_test_repo().await;
2425 let random_id = Uuid::new_v4().to_string();
2426 let tx = create_test_transaction(&random_id);
2427
2428 repo.create(tx.clone()).await.unwrap();
2429 let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
2430 assert_eq!(stored.id, tx.id);
2431 assert_eq!(stored.relayer_id, tx.relayer_id);
2432 }
2433
2434 #[tokio::test]
2435 #[ignore = "Requires active Redis instance"]
2436 async fn test_update_transaction() {
2437 let repo = setup_test_repo().await;
2438 let random_id = Uuid::new_v4().to_string();
2439 let mut tx = create_test_transaction(&random_id);
2440
2441 repo.create(tx.clone()).await.unwrap();
2442 tx.status = TransactionStatus::Confirmed;
2443
2444 let updated = repo.update(random_id.to_string(), tx).await.unwrap();
2445 assert!(matches!(updated.status, TransactionStatus::Confirmed));
2446 }
2447
2448 #[tokio::test]
2449 #[ignore = "Requires active Redis instance"]
2450 async fn test_delete_transaction() {
2451 let repo = setup_test_repo().await;
2452 let random_id = Uuid::new_v4().to_string();
2453 let tx = create_test_transaction(&random_id);
2454
2455 repo.create(tx).await.unwrap();
2456 repo.delete_by_id(random_id.to_string()).await.unwrap();
2457
2458 let result = repo.get_by_id(random_id.to_string()).await;
2459 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2460 }
2461
2462 #[tokio::test]
2463 #[ignore = "Requires active Redis instance"]
2464 async fn test_list_all_transactions() {
2465 let repo = setup_test_repo().await;
2466 let random_id = Uuid::new_v4().to_string();
2467 let random_id2 = Uuid::new_v4().to_string();
2468
2469 let tx1 = create_test_transaction(&random_id);
2470 let tx2 = create_test_transaction(&random_id2);
2471
2472 repo.create(tx1).await.unwrap();
2473 repo.create(tx2).await.unwrap();
2474
2475 let transactions = repo.list_all().await.unwrap();
2476 assert!(transactions.len() >= 2);
2477 }
2478
2479 #[tokio::test]
2480 #[ignore = "Requires active Redis instance"]
2481 async fn test_count_transactions() {
2482 let repo = setup_test_repo().await;
2483 let random_id = Uuid::new_v4().to_string();
2484 let tx = create_test_transaction(&random_id);
2485
2486 let count = repo.count().await.unwrap();
2487 repo.create(tx).await.unwrap();
2488 assert!(repo.count().await.unwrap() > count);
2489 }
2490
2491 #[tokio::test]
2492 #[ignore = "Requires active Redis instance"]
2493 async fn test_get_nonexistent_transaction() {
2494 let repo = setup_test_repo().await;
2495 let result = repo.get_by_id("nonexistent".to_string()).await;
2496 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2497 }
2498
2499 #[tokio::test]
2500 #[ignore = "Requires active Redis instance"]
2501 async fn test_duplicate_transaction_creation() {
2502 let repo = setup_test_repo().await;
2503 let random_id = Uuid::new_v4().to_string();
2504
2505 let tx = create_test_transaction(&random_id);
2506
2507 repo.create(tx.clone()).await.unwrap();
2508 let result = repo.create(tx).await;
2509
2510 assert!(matches!(
2511 result,
2512 Err(RepositoryError::ConstraintViolation(_))
2513 ));
2514 }
2515
2516 #[tokio::test]
2517 #[ignore = "Requires active Redis instance"]
2518 async fn test_update_nonexistent_transaction() {
2519 let repo = setup_test_repo().await;
2520 let tx = create_test_transaction("test-1");
2521
2522 let result = repo.update("nonexistent".to_string(), tx).await;
2523 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2524 }
2525
2526 #[tokio::test]
2527 #[ignore = "Requires active Redis instance"]
2528 async fn test_list_paginated() {
2529 let repo = setup_test_repo().await;
2530
2531 for _ in 1..=10 {
2533 let random_id = Uuid::new_v4().to_string();
2534 let tx = create_test_transaction(&random_id);
2535 repo.create(tx).await.unwrap();
2536 }
2537
2538 let query = PaginationQuery {
2540 page: 1,
2541 per_page: 3,
2542 };
2543 let result = repo.list_paginated(query).await.unwrap();
2544 assert_eq!(result.items.len(), 3);
2545 assert!(result.total >= 10);
2546 assert_eq!(result.page, 1);
2547 assert_eq!(result.per_page, 3);
2548
2549 let query = PaginationQuery {
2551 page: 1000,
2552 per_page: 3,
2553 };
2554 let result = repo.list_paginated(query).await.unwrap();
2555 assert_eq!(result.items.len(), 0);
2556 }
2557
2558 #[tokio::test]
2559 #[ignore = "Requires active Redis instance"]
2560 async fn test_find_by_relayer_id() {
2561 let repo = setup_test_repo().await;
2562 let random_id = Uuid::new_v4().to_string();
2563 let random_id2 = Uuid::new_v4().to_string();
2564 let random_id3 = Uuid::new_v4().to_string();
2565
2566 let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
2567 let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
2568 let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");
2569
2570 repo.create(tx1).await.unwrap();
2571 repo.create(tx2).await.unwrap();
2572 repo.create(tx3).await.unwrap();
2573
2574 let query = PaginationQuery {
2576 page: 1,
2577 per_page: 10,
2578 };
2579 let result = repo
2580 .find_by_relayer_id("relayer-1", query.clone())
2581 .await
2582 .unwrap();
2583 assert!(result.total >= 2);
2584 assert!(result.items.len() >= 2);
2585 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
2586
2587 let result = repo
2589 .find_by_relayer_id("relayer-2", query.clone())
2590 .await
2591 .unwrap();
2592 assert!(result.total >= 1);
2593 assert!(!result.items.is_empty());
2594 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
2595
2596 let result = repo
2598 .find_by_relayer_id("non-existent", query.clone())
2599 .await
2600 .unwrap();
2601 assert_eq!(result.total, 0);
2602 assert_eq!(result.items.len(), 0);
2603 }
2604
2605 #[tokio::test]
2606 #[ignore = "Requires active Redis instance"]
2607 async fn test_find_by_relayer_id_sorted_by_created_at_newest_first() {
2608 let repo = setup_test_repo().await;
2609 let relayer_id = Uuid::new_v4().to_string();
2610
2611 let mut tx1 = create_test_transaction_with_relayer("test-1", &relayer_id);
2613 tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); let mut tx2 = create_test_transaction_with_relayer("test-2", &relayer_id);
2616 tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); let mut tx3 = create_test_transaction_with_relayer("test-3", &relayer_id);
2619 tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); repo.create(tx2.clone()).await.unwrap(); repo.create(tx1.clone()).await.unwrap(); repo.create(tx3.clone()).await.unwrap(); let query = PaginationQuery {
2627 page: 1,
2628 per_page: 10,
2629 };
2630 let result = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
2631
2632 assert_eq!(result.total, 3);
2633 assert_eq!(result.items.len(), 3);
2634
2635 assert_eq!(
2637 result.items[0].id, "test-3",
2638 "First item should be newest (test-3)"
2639 );
2640 assert_eq!(
2641 result.items[0].created_at,
2642 "2025-01-27T14:00:00.000000+00:00"
2643 );
2644
2645 assert_eq!(
2646 result.items[1].id, "test-2",
2647 "Second item should be middle (test-2)"
2648 );
2649 assert_eq!(
2650 result.items[1].created_at,
2651 "2025-01-27T12:00:00.000000+00:00"
2652 );
2653
2654 assert_eq!(
2655 result.items[2].id, "test-1",
2656 "Third item should be oldest (test-1)"
2657 );
2658 assert_eq!(
2659 result.items[2].created_at,
2660 "2025-01-27T10:00:00.000000+00:00"
2661 );
2662 }
2663
2664 #[tokio::test]
2665 #[ignore = "Requires active Redis instance"]
2666 async fn test_find_by_status() {
2667 let repo = setup_test_repo().await;
2668 let random_id = Uuid::new_v4().to_string();
2669 let random_id2 = Uuid::new_v4().to_string();
2670 let random_id3 = Uuid::new_v4().to_string();
2671 let relayer_id = Uuid::new_v4().to_string();
2672 let tx1 = create_test_transaction_with_status(
2673 &random_id,
2674 &relayer_id,
2675 TransactionStatus::Pending,
2676 );
2677 let tx2 =
2678 create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
2679 let tx3 = create_test_transaction_with_status(
2680 &random_id3,
2681 &relayer_id,
2682 TransactionStatus::Confirmed,
2683 );
2684
2685 repo.create(tx1).await.unwrap();
2686 repo.create(tx2).await.unwrap();
2687 repo.create(tx3).await.unwrap();
2688
2689 let result = repo
2691 .find_by_status(&relayer_id, &[TransactionStatus::Pending])
2692 .await
2693 .unwrap();
2694 assert_eq!(result.len(), 1);
2695 assert_eq!(result[0].status, TransactionStatus::Pending);
2696
2697 let result = repo
2699 .find_by_status(
2700 &relayer_id,
2701 &[TransactionStatus::Pending, TransactionStatus::Sent],
2702 )
2703 .await
2704 .unwrap();
2705 assert_eq!(result.len(), 2);
2706
2707 let result = repo
2709 .find_by_status(&relayer_id, &[TransactionStatus::Failed])
2710 .await
2711 .unwrap();
2712 assert_eq!(result.len(), 0);
2713 }
2714
2715 #[tokio::test]
2716 #[ignore = "Requires active Redis instance"]
2717 async fn test_find_by_status_paginated() {
2718 let repo = setup_test_repo().await;
2719 let relayer_id = Uuid::new_v4().to_string();
2720
2721 for i in 1..=5 {
2723 let tx_id = Uuid::new_v4().to_string();
2724 let mut tx = create_test_transaction_with_status(
2725 &tx_id,
2726 &relayer_id,
2727 TransactionStatus::Pending,
2728 );
2729 tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2730 repo.create(tx).await.unwrap();
2731 }
2732
2733 for i in 6..=7 {
2735 let tx_id = Uuid::new_v4().to_string();
2736 let mut tx = create_test_transaction_with_status(
2737 &tx_id,
2738 &relayer_id,
2739 TransactionStatus::Confirmed,
2740 );
2741 tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2742 repo.create(tx).await.unwrap();
2743 }
2744
2745 let query = PaginationQuery {
2747 page: 1,
2748 per_page: 2,
2749 };
2750 let result = repo
2751 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2752 .await
2753 .unwrap();
2754
2755 assert_eq!(result.total, 5);
2756 assert_eq!(result.items.len(), 2);
2757 assert_eq!(result.page, 1);
2758 assert_eq!(result.per_page, 2);
2759
2760 let query = PaginationQuery {
2762 page: 2,
2763 per_page: 2,
2764 };
2765 let result = repo
2766 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2767 .await
2768 .unwrap();
2769
2770 assert_eq!(result.total, 5);
2771 assert_eq!(result.items.len(), 2);
2772 assert_eq!(result.page, 2);
2773
2774 let query = PaginationQuery {
2776 page: 3,
2777 per_page: 2,
2778 };
2779 let result = repo
2780 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2781 .await
2782 .unwrap();
2783
2784 assert_eq!(result.total, 5);
2785 assert_eq!(result.items.len(), 1);
2786
2787 let query = PaginationQuery {
2789 page: 1,
2790 per_page: 10,
2791 };
2792 let result = repo
2793 .find_by_status_paginated(
2794 &relayer_id,
2795 &[TransactionStatus::Pending, TransactionStatus::Confirmed],
2796 query,
2797 false,
2798 )
2799 .await
2800 .unwrap();
2801
2802 assert_eq!(result.total, 7);
2803 assert_eq!(result.items.len(), 7);
2804
2805 let query = PaginationQuery {
2807 page: 1,
2808 per_page: 10,
2809 };
2810 let result = repo
2811 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Failed], query, false)
2812 .await
2813 .unwrap();
2814
2815 assert_eq!(result.total, 0);
2816 assert_eq!(result.items.len(), 0);
2817 }
2818
2819 #[tokio::test]
2820 #[ignore = "Requires active Redis instance"]
2821 async fn test_find_by_status_paginated_oldest_first() {
2822 let repo = setup_test_repo().await;
2823 let relayer_id = Uuid::new_v4().to_string();
2824
2825 for i in 1..=5 {
2827 let tx_id = format!("tx{}-{}", i, Uuid::new_v4());
2828 let mut tx = create_test_transaction(&tx_id);
2829 tx.relayer_id = relayer_id.clone();
2830 tx.status = TransactionStatus::Pending;
2831 tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2832 repo.create(tx).await.unwrap();
2833 }
2834
2835 let query = PaginationQuery {
2837 page: 1,
2838 per_page: 3,
2839 };
2840 let result = repo
2841 .find_by_status_paginated(
2842 &relayer_id,
2843 &[TransactionStatus::Pending],
2844 query.clone(),
2845 true,
2846 )
2847 .await
2848 .unwrap();
2849
2850 assert_eq!(result.total, 5);
2851 assert_eq!(result.items.len(), 3);
2852 assert!(
2854 result.items[0].created_at < result.items[1].created_at,
2855 "First item should be older than second"
2856 );
2857 assert!(
2858 result.items[1].created_at < result.items[2].created_at,
2859 "Second item should be older than third"
2860 );
2861
2862 let result_newest = repo
2864 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2865 .await
2866 .unwrap();
2867
2868 assert_eq!(result_newest.items.len(), 3);
2869 assert!(
2871 result_newest.items[0].created_at > result_newest.items[1].created_at,
2872 "First item should be newer than second"
2873 );
2874 assert!(
2875 result_newest.items[1].created_at > result_newest.items[2].created_at,
2876 "Second item should be newer than third"
2877 );
2878 }
2879
2880 #[tokio::test]
2881 #[ignore = "Requires active Redis instance"]
2882 async fn test_find_by_status_paginated_oldest_first_single_item() {
2883 let repo = setup_test_repo().await;
2884 let relayer_id = Uuid::new_v4().to_string();
2885
2886 let timestamps = [
2888 "2025-01-27T08:00:00.000000+00:00", "2025-01-27T10:00:00.000000+00:00", "2025-01-27T12:00:00.000000+00:00", ];
2892
2893 let mut oldest_id = String::new();
2894 let mut newest_id = String::new();
2895
2896 for (i, timestamp) in timestamps.iter().enumerate() {
2897 let tx_id = format!("tx-{}-{}", i, Uuid::new_v4());
2898 if i == 0 {
2899 oldest_id = tx_id.clone();
2900 }
2901 if i == 2 {
2902 newest_id = tx_id.clone();
2903 }
2904 let mut tx = create_test_transaction(&tx_id);
2905 tx.relayer_id = relayer_id.clone();
2906 tx.status = TransactionStatus::Pending;
2907 tx.created_at = timestamp.to_string();
2908 repo.create(tx).await.unwrap();
2909 }
2910
2911 let query = PaginationQuery {
2913 page: 1,
2914 per_page: 1,
2915 };
2916 let result = repo
2917 .find_by_status_paginated(
2918 &relayer_id,
2919 &[TransactionStatus::Pending],
2920 query.clone(),
2921 true,
2922 )
2923 .await
2924 .unwrap();
2925
2926 assert_eq!(result.total, 3);
2927 assert_eq!(result.items.len(), 1);
2928 assert_eq!(
2929 result.items[0].id, oldest_id,
2930 "With oldest_first=true and per_page=1, should return the oldest transaction"
2931 );
2932
2933 let result = repo
2935 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2936 .await
2937 .unwrap();
2938
2939 assert_eq!(result.items.len(), 1);
2940 assert_eq!(
2941 result.items[0].id, newest_id,
2942 "With oldest_first=false and per_page=1, should return the newest transaction"
2943 );
2944 }
2945
2946 #[tokio::test]
2947 #[ignore = "Requires active Redis instance"]
2948 async fn test_find_by_nonce() {
2949 let repo = setup_test_repo().await;
2950 let random_id = Uuid::new_v4().to_string();
2951 let random_id2 = Uuid::new_v4().to_string();
2952 let relayer_id = Uuid::new_v4().to_string();
2953
2954 let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2955 let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
2956
2957 repo.create(tx1.clone()).await.unwrap();
2958 repo.create(tx2).await.unwrap();
2959
2960 let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
2962 assert!(result.is_some());
2963 assert_eq!(result.unwrap().id, random_id);
2964
2965 let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
2967 assert!(result.is_none());
2968
2969 let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
2971 assert!(result.is_none());
2972 }
2973
2974 #[tokio::test]
2975 #[ignore = "Requires active Redis instance"]
2976 async fn test_get_nonce_occupancy_mixed_slots() {
2977 let repo = setup_test_repo().await;
2978 let relayer_id = Uuid::new_v4().to_string();
2979
2980 let tx1 = create_test_transaction_with_nonce(&Uuid::new_v4().to_string(), 10, &relayer_id);
2982 repo.create(tx1).await.unwrap();
2983
2984 let mut tx2 =
2985 create_test_transaction_with_nonce(&Uuid::new_v4().to_string(), 11, &relayer_id);
2986 tx2.status = TransactionStatus::Failed;
2987 repo.create(tx2).await.unwrap();
2988
2989 let result = repo.get_nonce_occupancy(&relayer_id, 10, 13).await.unwrap();
2990
2991 assert_eq!(result.len(), 3);
2992 assert_eq!(result[0], (10, Some(TransactionStatus::Pending)));
2993 assert_eq!(result[1], (11, Some(TransactionStatus::Failed)));
2994 assert_eq!(result[2], (12, None));
2995 }
2996
2997 #[tokio::test]
2998 #[ignore = "Requires active Redis instance"]
2999 async fn test_get_nonce_occupancy_empty_range() {
3000 let repo = setup_test_repo().await;
3001
3002 let result = repo.get_nonce_occupancy("any-relayer", 5, 5).await.unwrap();
3003 assert!(result.is_empty());
3004
3005 let result = repo
3006 .get_nonce_occupancy("any-relayer", 10, 5)
3007 .await
3008 .unwrap();
3009 assert!(result.is_empty());
3010 }
3011
3012 #[tokio::test]
3013 #[ignore = "Requires active Redis instance"]
3014 async fn test_get_nonce_occupancy_all_empty() {
3015 let repo = setup_test_repo().await;
3016 let relayer_id = Uuid::new_v4().to_string();
3017
3018 let result = repo
3020 .get_nonce_occupancy(&relayer_id, 100, 103)
3021 .await
3022 .unwrap();
3023
3024 assert_eq!(result.len(), 3);
3025 assert!(result.iter().all(|(_, status)| status.is_none()));
3026 }
3027
3028 #[tokio::test]
3029 #[ignore = "Requires active Redis instance"]
3030 async fn test_update_status() {
3031 let repo = setup_test_repo().await;
3032 let random_id = Uuid::new_v4().to_string();
3033 let tx = create_test_transaction(&random_id);
3034
3035 repo.create(tx).await.unwrap();
3036 let updated = repo
3037 .update_status(random_id.to_string(), TransactionStatus::Confirmed)
3038 .await
3039 .unwrap();
3040 assert_eq!(updated.status, TransactionStatus::Confirmed);
3041 }
3042
3043 #[tokio::test]
3044 #[ignore = "Requires active Redis instance"]
3045 async fn test_partial_update() {
3046 let repo = setup_test_repo().await;
3047 let random_id = Uuid::new_v4().to_string();
3048 let tx = create_test_transaction(&random_id);
3049
3050 repo.create(tx).await.unwrap();
3051
3052 let update = TransactionUpdateRequest {
3053 status: Some(TransactionStatus::Sent),
3054 status_reason: Some("Transaction sent".to_string()),
3055 sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
3056 confirmed_at: None,
3057 network_data: None,
3058 hashes: None,
3059 is_canceled: None,
3060 priced_at: None,
3061 noop_count: None,
3062 delete_at: None,
3063 metadata: None,
3064 };
3065
3066 let updated = repo
3067 .partial_update(random_id.to_string(), update)
3068 .await
3069 .unwrap();
3070 assert_eq!(updated.status, TransactionStatus::Sent);
3071 assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
3072 assert_eq!(
3073 updated.sent_at,
3074 Some("2025-01-27T16:00:00.000000+00:00".to_string())
3075 );
3076 }
3077
3078 #[tokio::test]
3079 #[ignore = "Requires active Redis instance"]
3080 async fn test_set_sent_at() {
3081 let repo = setup_test_repo().await;
3082 let random_id = Uuid::new_v4().to_string();
3083 let tx = create_test_transaction(&random_id);
3084
3085 repo.create(tx).await.unwrap();
3086 let updated = repo
3087 .set_sent_at(
3088 random_id.to_string(),
3089 "2025-01-27T16:00:00.000000+00:00".to_string(),
3090 )
3091 .await
3092 .unwrap();
3093 assert_eq!(
3094 updated.sent_at,
3095 Some("2025-01-27T16:00:00.000000+00:00".to_string())
3096 );
3097 }
3098
3099 #[tokio::test]
3100 #[ignore = "Requires active Redis instance"]
3101 async fn test_set_confirmed_at() {
3102 let repo = setup_test_repo().await;
3103 let random_id = Uuid::new_v4().to_string();
3104 let tx = create_test_transaction(&random_id);
3105
3106 repo.create(tx).await.unwrap();
3107 let updated = repo
3108 .set_confirmed_at(
3109 random_id.to_string(),
3110 "2025-01-27T16:00:00.000000+00:00".to_string(),
3111 )
3112 .await
3113 .unwrap();
3114 assert_eq!(
3115 updated.confirmed_at,
3116 Some("2025-01-27T16:00:00.000000+00:00".to_string())
3117 );
3118 }
3119
3120 #[tokio::test]
3121 #[ignore = "Requires active Redis instance"]
3122 async fn test_update_network_data() {
3123 let repo = setup_test_repo().await;
3124 let random_id = Uuid::new_v4().to_string();
3125 let tx = create_test_transaction(&random_id);
3126
3127 repo.create(tx).await.unwrap();
3128
3129 let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
3130 gas_price: Some(2000000000),
3131 gas_limit: Some(42000),
3132 nonce: Some(2),
3133 value: U256::from_str("2000000000000000000").unwrap(),
3134 data: Some("0x1234".to_string()),
3135 from: "0xNewSender".to_string(),
3136 to: Some("0xNewRecipient".to_string()),
3137 chain_id: 1,
3138 signature: None,
3139 hash: Some("0xnewhash".to_string()),
3140 speed: Some(Speed::SafeLow),
3141 max_fee_per_gas: None,
3142 max_priority_fee_per_gas: None,
3143 raw: None,
3144 });
3145
3146 let updated = repo
3147 .update_network_data(random_id.to_string(), new_network_data.clone())
3148 .await
3149 .unwrap();
3150 assert_eq!(
3151 updated
3152 .network_data
3153 .get_evm_transaction_data()
3154 .unwrap()
3155 .hash,
3156 new_network_data.get_evm_transaction_data().unwrap().hash
3157 );
3158 }
3159
3160 #[tokio::test]
3161 #[ignore = "Requires active Redis instance"]
3162 async fn test_debug_implementation() {
3163 let repo = setup_test_repo().await;
3164 let debug_str = format!("{repo:?}");
3165 assert!(debug_str.contains("RedisTransactionRepository"));
3166 assert!(debug_str.contains("test_prefix"));
3167 }
3168
3169 #[tokio::test]
3170 #[ignore = "Requires active Redis instance"]
3171 async fn test_error_handling_empty_id() {
3172 let repo = setup_test_repo().await;
3173
3174 let result = repo.get_by_id("".to_string()).await;
3175 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3176
3177 let result = repo
3178 .update("".to_string(), create_test_transaction("test"))
3179 .await;
3180 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3181
3182 let result = repo.delete_by_id("".to_string()).await;
3183 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3184 }
3185
3186 #[tokio::test]
3187 #[ignore = "Requires active Redis instance"]
3188 async fn test_pagination_validation() {
3189 let repo = setup_test_repo().await;
3190
3191 let query = PaginationQuery {
3192 page: 1,
3193 per_page: 0,
3194 };
3195 let result = repo.list_paginated(query).await;
3196 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3197 }
3198
3199 #[tokio::test]
3200 #[ignore = "Requires active Redis instance"]
3201 async fn test_index_consistency() {
3202 let repo = setup_test_repo().await;
3203 let random_id = Uuid::new_v4().to_string();
3204 let relayer_id = Uuid::new_v4().to_string();
3205 let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
3206
3207 repo.create(tx.clone()).await.unwrap();
3209
3210 let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
3212 assert!(found.is_some());
3213
3214 let mut updated_tx = tx.clone();
3216 if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
3217 evm_data.nonce = Some(43);
3218 }
3219
3220 repo.update(random_id.to_string(), updated_tx)
3221 .await
3222 .unwrap();
3223
3224 let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
3226 assert!(old_nonce_result.is_none());
3227
3228 let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
3230 assert!(new_nonce_result.is_some());
3231 }
3232
3233 #[tokio::test]
3234 #[ignore = "Requires active Redis instance"]
3235 async fn test_has_entries() {
3236 let repo = setup_test_repo().await;
3237 assert!(!repo.has_entries().await.unwrap());
3238
3239 let tx_id = uuid::Uuid::new_v4().to_string();
3240 let tx = create_test_transaction(&tx_id);
3241 repo.create(tx.clone()).await.unwrap();
3242
3243 assert!(repo.has_entries().await.unwrap());
3244 }
3245
3246 #[tokio::test]
3247 #[ignore = "Requires active Redis instance"]
3248 async fn test_drop_all_entries() {
3249 let repo = setup_test_repo().await;
3250 let tx_id = uuid::Uuid::new_v4().to_string();
3251 let tx = create_test_transaction(&tx_id);
3252 repo.create(tx.clone()).await.unwrap();
3253 assert!(repo.has_entries().await.unwrap());
3254
3255 repo.drop_all_entries().await.unwrap();
3256 assert!(!repo.has_entries().await.unwrap());
3257 }
3258
3259 #[tokio::test]
3261 #[ignore = "Requires active Redis instance"]
3262 async fn test_update_status_sets_delete_at_for_final_statuses() {
3263 let _lock = ENV_MUTEX.lock().await;
3264
3265 use chrono::{DateTime, Duration, Utc};
3266 use std::env;
3267
3268 env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
3270
3271 let repo = setup_test_repo().await;
3272
3273 let final_statuses = [
3274 TransactionStatus::Canceled,
3275 TransactionStatus::Confirmed,
3276 TransactionStatus::Failed,
3277 TransactionStatus::Expired,
3278 ];
3279
3280 for (i, status) in final_statuses.iter().enumerate() {
3281 let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
3282 let mut tx = create_test_transaction(&tx_id);
3283
3284 tx.delete_at = None;
3286 tx.status = TransactionStatus::Pending;
3287
3288 repo.create(tx).await.unwrap();
3289
3290 let before_update = Utc::now();
3291
3292 let updated = repo
3294 .update_status(tx_id.clone(), status.clone())
3295 .await
3296 .unwrap();
3297
3298 assert!(
3300 updated.delete_at.is_some(),
3301 "delete_at should be set for status: {status:?}"
3302 );
3303
3304 let delete_at_str = updated.delete_at.unwrap();
3306 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
3307 .expect("delete_at should be valid RFC3339")
3308 .with_timezone(&Utc);
3309
3310 let duration_from_before = delete_at.signed_duration_since(before_update);
3311 let expected_duration = Duration::hours(6);
3312 let tolerance = Duration::minutes(5);
3313
3314 assert!(
3315 duration_from_before >= expected_duration - tolerance
3316 && duration_from_before <= expected_duration + tolerance,
3317 "delete_at should be approximately 6 hours from now for status: {status:?}. Duration: {duration_from_before:?}"
3318 );
3319 }
3320
3321 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3323 }
3324
3325 #[tokio::test]
3326 #[ignore = "Requires active Redis instance"]
3327 async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
3328 let _lock = ENV_MUTEX.lock().await;
3329
3330 use std::env;
3331
3332 env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
3333
3334 let repo = setup_test_repo().await;
3335
3336 let non_final_statuses = [
3337 TransactionStatus::Pending,
3338 TransactionStatus::Sent,
3339 TransactionStatus::Submitted,
3340 TransactionStatus::Mined,
3341 ];
3342
3343 for (i, status) in non_final_statuses.iter().enumerate() {
3344 let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
3345 let mut tx = create_test_transaction(&tx_id);
3346 tx.delete_at = None;
3347 tx.status = TransactionStatus::Pending;
3348
3349 repo.create(tx).await.unwrap();
3350
3351 let updated = repo
3353 .update_status(tx_id.clone(), status.clone())
3354 .await
3355 .unwrap();
3356
3357 assert!(
3359 updated.delete_at.is_none(),
3360 "delete_at should NOT be set for status: {status:?}"
3361 );
3362 }
3363
3364 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3366 }
3367
3368 #[tokio::test]
3369 #[ignore = "Requires active Redis instance"]
3370 async fn test_partial_update_sets_delete_at_for_final_statuses() {
3371 let _lock = ENV_MUTEX.lock().await;
3372
3373 use chrono::{DateTime, Duration, Utc};
3374 use std::env;
3375
3376 env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
3377
3378 let repo = setup_test_repo().await;
3379 let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
3380 let mut tx = create_test_transaction(&tx_id);
3381 tx.delete_at = None;
3382 tx.status = TransactionStatus::Pending;
3383
3384 repo.create(tx).await.unwrap();
3385
3386 let before_update = Utc::now();
3387
3388 let update = TransactionUpdateRequest {
3390 status: Some(TransactionStatus::Confirmed),
3391 status_reason: Some("Transaction completed".to_string()),
3392 confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
3393 ..Default::default()
3394 };
3395
3396 let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();
3397
3398 assert!(
3400 updated.delete_at.is_some(),
3401 "delete_at should be set when updating to Confirmed status"
3402 );
3403
3404 let delete_at_str = updated.delete_at.unwrap();
3406 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
3407 .expect("delete_at should be valid RFC3339")
3408 .with_timezone(&Utc);
3409
3410 let duration_from_before = delete_at.signed_duration_since(before_update);
3411 let expected_duration = Duration::hours(8);
3412 let tolerance = Duration::minutes(5);
3413
3414 assert!(
3415 duration_from_before >= expected_duration - tolerance
3416 && duration_from_before <= expected_duration + tolerance,
3417 "delete_at should be approximately 8 hours from now. Duration: {duration_from_before:?}"
3418 );
3419
3420 assert_eq!(updated.status, TransactionStatus::Confirmed);
3422 assert_eq!(
3423 updated.status_reason,
3424 Some("Transaction completed".to_string())
3425 );
3426 assert_eq!(
3427 updated.confirmed_at,
3428 Some("2023-01-01T12:05:00Z".to_string())
3429 );
3430
3431 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3433 }
3434
3435 #[tokio::test]
3436 #[ignore = "Requires active Redis instance"]
3437 async fn test_update_status_preserves_existing_delete_at() {
3438 let _lock = ENV_MUTEX.lock().await;
3439
3440 use std::env;
3441
3442 env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
3443
3444 let repo = setup_test_repo().await;
3445 let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
3446 let mut tx = create_test_transaction(&tx_id);
3447
3448 let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
3450 tx.delete_at = Some(existing_delete_at.clone());
3451 tx.status = TransactionStatus::Pending;
3452
3453 repo.create(tx).await.unwrap();
3454
3455 let updated = repo
3457 .update_status(tx_id.clone(), TransactionStatus::Confirmed)
3458 .await
3459 .unwrap();
3460
3461 assert_eq!(
3463 updated.delete_at,
3464 Some(existing_delete_at),
3465 "Existing delete_at should be preserved when updating to final status"
3466 );
3467
3468 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3470 }
3471 #[tokio::test]
3472 #[ignore = "Requires active Redis instance"]
3473 async fn test_partial_update_without_status_change_preserves_delete_at() {
3474 let _lock = ENV_MUTEX.lock().await;
3475
3476 use std::env;
3477
3478 env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
3479
3480 let repo = setup_test_repo().await;
3481 let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
3482 let mut tx = create_test_transaction(&tx_id);
3483 tx.delete_at = None;
3484 tx.status = TransactionStatus::Pending;
3485
3486 repo.create(tx).await.unwrap();
3487
3488 let updated1 = repo
3490 .update_status(tx_id.clone(), TransactionStatus::Confirmed)
3491 .await
3492 .unwrap();
3493
3494 assert!(updated1.delete_at.is_some());
3495 let original_delete_at = updated1.delete_at.clone();
3496
3497 let update = TransactionUpdateRequest {
3499 status: None, status_reason: Some("Updated reason".to_string()),
3501 confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
3502 ..Default::default()
3503 };
3504
3505 let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();
3506
3507 assert_eq!(
3509 updated2.delete_at, original_delete_at,
3510 "delete_at should be preserved when status is not updated"
3511 );
3512
3513 assert_eq!(updated2.status, TransactionStatus::Confirmed); assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
3516 assert_eq!(
3517 updated2.confirmed_at,
3518 Some("2023-01-01T12:10:00Z".to_string())
3519 );
3520
3521 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3523 }
3524
3525 #[tokio::test]
3528 #[ignore = "Requires active Redis instance"]
3529 async fn test_delete_by_ids_empty_list() {
3530 let repo = setup_test_repo().await;
3531 let tx_id = format!("test-empty-{}", Uuid::new_v4());
3532
3533 let tx = create_test_transaction(&tx_id);
3535 repo.create(tx).await.unwrap();
3536
3537 let result = repo.delete_by_ids(vec![]).await.unwrap();
3539
3540 assert_eq!(result.deleted_count, 0);
3541 assert!(result.failed.is_empty());
3542
3543 assert!(repo.get_by_id(tx_id).await.is_ok());
3545 }
3546
3547 #[tokio::test]
3548 #[ignore = "Requires active Redis instance"]
3549 async fn test_delete_by_ids_single_transaction() {
3550 let repo = setup_test_repo().await;
3551 let tx_id = format!("test-single-{}", Uuid::new_v4());
3552
3553 let tx = create_test_transaction(&tx_id);
3554 repo.create(tx).await.unwrap();
3555
3556 let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3557
3558 assert_eq!(result.deleted_count, 1);
3559 assert!(result.failed.is_empty());
3560
3561 assert!(repo.get_by_id(tx_id).await.is_err());
3563 }
3564
3565 #[tokio::test]
3566 #[ignore = "Requires active Redis instance"]
3567 async fn test_delete_by_ids_multiple_transactions() {
3568 let repo = setup_test_repo().await;
3569 let base_id = Uuid::new_v4();
3570
3571 let mut created_ids = Vec::new();
3573 for i in 1..=5 {
3574 let tx_id = format!("test-multi-{base_id}-{i}");
3575 let tx = create_test_transaction(&tx_id);
3576 repo.create(tx).await.unwrap();
3577 created_ids.push(tx_id);
3578 }
3579
3580 let ids_to_delete = vec![
3582 created_ids[0].clone(),
3583 created_ids[2].clone(),
3584 created_ids[4].clone(),
3585 ];
3586 let result = repo.delete_by_ids(ids_to_delete).await.unwrap();
3587
3588 assert_eq!(result.deleted_count, 3);
3589 assert!(result.failed.is_empty());
3590
3591 assert!(repo.get_by_id(created_ids[0].clone()).await.is_err());
3593 assert!(repo.get_by_id(created_ids[1].clone()).await.is_ok()); assert!(repo.get_by_id(created_ids[2].clone()).await.is_err());
3595 assert!(repo.get_by_id(created_ids[3].clone()).await.is_ok()); assert!(repo.get_by_id(created_ids[4].clone()).await.is_err());
3597 }
3598
3599 #[tokio::test]
3600 #[ignore = "Requires active Redis instance"]
3601 async fn test_delete_by_ids_nonexistent_transactions() {
3602 let repo = setup_test_repo().await;
3603 let base_id = Uuid::new_v4();
3604
3605 let ids_to_delete = vec![
3607 format!("nonexistent-{}-1", base_id),
3608 format!("nonexistent-{}-2", base_id),
3609 ];
3610 let result = repo.delete_by_ids(ids_to_delete.clone()).await.unwrap();
3611
3612 assert_eq!(result.deleted_count, 0);
3613 assert_eq!(result.failed.len(), 2);
3614
3615 let failed_ids: Vec<&String> = result.failed.iter().map(|(id, _)| id).collect();
3617 assert!(failed_ids.contains(&&ids_to_delete[0]));
3618 assert!(failed_ids.contains(&&ids_to_delete[1]));
3619 }
3620
3621 #[tokio::test]
3622 #[ignore = "Requires active Redis instance"]
3623 async fn test_delete_by_ids_mixed_existing_and_nonexistent() {
3624 let repo = setup_test_repo().await;
3625 let base_id = Uuid::new_v4();
3626
3627 let existing_ids: Vec<String> = (1..=3)
3629 .map(|i| format!("test-mixed-existing-{base_id}-{i}"))
3630 .collect();
3631
3632 for id in &existing_ids {
3633 let tx = create_test_transaction(id);
3634 repo.create(tx).await.unwrap();
3635 }
3636
3637 let nonexistent_ids: Vec<String> = (1..=2)
3638 .map(|i| format!("test-mixed-nonexistent-{base_id}-{i}"))
3639 .collect();
3640
3641 let ids_to_delete = vec![
3643 existing_ids[0].clone(),
3644 nonexistent_ids[0].clone(),
3645 existing_ids[1].clone(),
3646 nonexistent_ids[1].clone(),
3647 ];
3648 let result = repo.delete_by_ids(ids_to_delete).await.unwrap();
3649
3650 assert_eq!(result.deleted_count, 2);
3651 assert_eq!(result.failed.len(), 2);
3652
3653 assert!(repo.get_by_id(existing_ids[0].clone()).await.is_err());
3655 assert!(repo.get_by_id(existing_ids[1].clone()).await.is_err());
3656
3657 assert!(repo.get_by_id(existing_ids[2].clone()).await.is_ok());
3659 }
3660
3661 #[tokio::test]
3662 #[ignore = "Requires active Redis instance"]
3663 async fn test_delete_by_ids_removes_all_indexes() {
3664 let repo = setup_test_repo().await;
3665 let relayer_id = format!("relayer-{}", Uuid::new_v4());
3666 let tx_id = format!("test-indexes-{}", Uuid::new_v4());
3667
3668 let mut tx = create_test_transaction(&tx_id);
3670 tx.relayer_id = relayer_id.clone();
3671 tx.status = TransactionStatus::Confirmed;
3672 repo.create(tx).await.unwrap();
3673
3674 let found = repo
3676 .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
3677 .await
3678 .unwrap();
3679 assert!(found.iter().any(|t| t.id == tx_id));
3680
3681 let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3683 assert_eq!(result.deleted_count, 1);
3684
3685 let found_after = repo
3687 .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
3688 .await
3689 .unwrap();
3690 assert!(!found_after.iter().any(|t| t.id == tx_id));
3691
3692 assert!(repo.get_by_id(tx_id).await.is_err());
3694 }
3695
3696 #[tokio::test]
3697 #[ignore = "Requires active Redis instance"]
3698 async fn test_delete_by_ids_removes_nonce_index() {
3699 let repo = setup_test_repo().await;
3700 let relayer_id = format!("relayer-{}", Uuid::new_v4());
3701 let tx_id = format!("test-nonce-{}", Uuid::new_v4());
3702 let nonce = 12345u64;
3703
3704 let tx = create_test_transaction_with_nonce(&tx_id, nonce, &relayer_id);
3706 repo.create(tx).await.unwrap();
3707
3708 let found = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
3710 assert!(found.is_some());
3711 assert_eq!(found.unwrap().id, tx_id);
3712
3713 let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3715 assert_eq!(result.deleted_count, 1);
3716
3717 let found_after = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
3719 assert!(found_after.is_none());
3720 }
3721
3722 #[tokio::test]
3723 #[ignore = "Requires active Redis instance"]
3724 async fn test_delete_by_ids_large_batch() {
3725 let repo = setup_test_repo().await;
3726 let base_id = Uuid::new_v4();
3727
3728 let count = 50;
3730 let mut created_ids = Vec::new();
3731
3732 for i in 0..count {
3733 let tx_id = format!("test-large-{base_id}-{i}");
3734 let tx = create_test_transaction(&tx_id);
3735 repo.create(tx).await.unwrap();
3736 created_ids.push(tx_id);
3737 }
3738
3739 let result = repo.delete_by_ids(created_ids.clone()).await.unwrap();
3741
3742 assert_eq!(result.deleted_count, count);
3743 assert!(result.failed.is_empty());
3744
3745 for id in created_ids {
3747 assert!(repo.get_by_id(id).await.is_err());
3748 }
3749 }
3750
3751 #[tokio::test]
3752 #[ignore = "Requires active Redis instance"]
3753 async fn test_delete_by_ids_preserves_other_relayer_transactions() {
3754 let repo = setup_test_repo().await;
3755 let relayer_1 = format!("relayer-1-{}", Uuid::new_v4());
3756 let relayer_2 = format!("relayer-2-{}", Uuid::new_v4());
3757 let tx_id_1 = format!("tx-relayer-1-{}", Uuid::new_v4());
3758 let tx_id_2 = format!("tx-relayer-2-{}", Uuid::new_v4());
3759
3760 let tx1 = create_test_transaction_with_relayer(&tx_id_1, &relayer_1);
3762 let tx2 = create_test_transaction_with_relayer(&tx_id_2, &relayer_2);
3763
3764 repo.create(tx1).await.unwrap();
3765 repo.create(tx2).await.unwrap();
3766
3767 let result = repo.delete_by_ids(vec![tx_id_1.clone()]).await.unwrap();
3769
3770 assert_eq!(result.deleted_count, 1);
3771
3772 assert!(repo.get_by_id(tx_id_1).await.is_err());
3774
3775 let remaining = repo.get_by_id(tx_id_2).await.unwrap();
3777 assert_eq!(remaining.relayer_id, relayer_2);
3778 }
3779
3780 #[tokio::test]
3783 #[ignore = "Requires active Redis instance"]
3784 async fn test_increment_status_check_failures_no_prior_metadata() {
3785 let _lock = ENV_MUTEX.lock().await;
3786 let repo = setup_test_repo().await;
3787 let relayer_id = Uuid::new_v4().to_string();
3788 let tx_id = Uuid::new_v4().to_string();
3789 let mut tx =
3790 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3791 tx.metadata = None;
3792 repo.create(tx).await.unwrap();
3793
3794 let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
3795
3796 let meta = updated.metadata.expect("metadata should be set");
3797 assert_eq!(meta.consecutive_failures, 1);
3798 assert_eq!(meta.total_failures, 1);
3799 assert_eq!(meta.insufficient_fee_retries, 0);
3800 }
3801
3802 #[tokio::test]
3803 #[ignore = "Requires active Redis instance"]
3804 async fn test_increment_status_check_failures_accumulates() {
3805 let _lock = ENV_MUTEX.lock().await;
3806 let repo = setup_test_repo().await;
3807 let relayer_id = Uuid::new_v4().to_string();
3808 let tx_id = Uuid::new_v4().to_string();
3809 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3810 repo.create(tx).await.unwrap();
3811
3812 repo.increment_status_check_failures(tx_id.clone())
3813 .await
3814 .unwrap();
3815 repo.increment_status_check_failures(tx_id.clone())
3816 .await
3817 .unwrap();
3818 let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
3819
3820 let meta = updated.metadata.unwrap();
3821 assert_eq!(meta.consecutive_failures, 3);
3822 assert_eq!(meta.total_failures, 3);
3823 }
3824
3825 #[tokio::test]
3826 #[ignore = "Requires active Redis instance"]
3827 async fn test_increment_status_check_failures_noop_on_final_state() {
3828 let _lock = ENV_MUTEX.lock().await;
3829 let repo = setup_test_repo().await;
3830 let relayer_id = Uuid::new_v4().to_string();
3831 let tx_id = Uuid::new_v4().to_string();
3832 let tx =
3833 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
3834 repo.create(tx).await.unwrap();
3835
3836 let result = repo.increment_status_check_failures(tx_id).await.unwrap();
3837
3838 assert!(result.metadata.is_none());
3840 assert_eq!(result.status, TransactionStatus::Confirmed);
3841 }
3842
3843 #[tokio::test]
3844 #[ignore = "Requires active Redis instance"]
3845 async fn test_increment_status_check_failures_not_found() {
3846 let _lock = ENV_MUTEX.lock().await;
3847 let repo = setup_test_repo().await;
3848
3849 let result = repo
3850 .increment_status_check_failures("nonexistent".to_string())
3851 .await;
3852
3853 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3854 }
3855
3856 #[tokio::test]
3859 #[ignore = "Requires active Redis instance"]
3860 async fn test_reset_consecutive_failures() {
3861 let _lock = ENV_MUTEX.lock().await;
3862 let repo = setup_test_repo().await;
3863 let relayer_id = Uuid::new_v4().to_string();
3864 let tx_id = Uuid::new_v4().to_string();
3865 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3866 repo.create(tx).await.unwrap();
3867
3868 repo.increment_status_check_failures(tx_id.clone())
3870 .await
3871 .unwrap();
3872 repo.increment_status_check_failures(tx_id.clone())
3873 .await
3874 .unwrap();
3875
3876 let updated = repo
3877 .reset_status_check_consecutive_failures(tx_id)
3878 .await
3879 .unwrap();
3880
3881 let meta = updated.metadata.unwrap();
3882 assert_eq!(meta.consecutive_failures, 0);
3883 assert_eq!(meta.total_failures, 2);
3885 }
3886
3887 #[tokio::test]
3888 #[ignore = "Requires active Redis instance"]
3889 async fn test_reset_consecutive_failures_noop_on_final_state() {
3890 let _lock = ENV_MUTEX.lock().await;
3891 let repo = setup_test_repo().await;
3892 let relayer_id = Uuid::new_v4().to_string();
3893 let tx_id = Uuid::new_v4().to_string();
3894 let mut tx =
3895 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Failed);
3896 tx.metadata = Some(crate::models::TransactionMetadata {
3897 consecutive_failures: 5,
3898 total_failures: 10,
3899 insufficient_fee_retries: 0,
3900 try_again_later_retries: 0,
3901 nonce_too_high_retries: 0,
3902 });
3903 repo.create(tx).await.unwrap();
3904
3905 let result = repo
3906 .reset_status_check_consecutive_failures(tx_id)
3907 .await
3908 .unwrap();
3909
3910 let meta = result.metadata.unwrap();
3912 assert_eq!(meta.consecutive_failures, 5);
3913 }
3914
3915 #[tokio::test]
3916 #[ignore = "Requires active Redis instance"]
3917 async fn test_reset_consecutive_failures_not_found() {
3918 let _lock = ENV_MUTEX.lock().await;
3919 let repo = setup_test_repo().await;
3920
3921 let result = repo
3922 .reset_status_check_consecutive_failures("nonexistent".to_string())
3923 .await;
3924
3925 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3926 }
3927
3928 #[tokio::test]
3931 #[ignore = "Requires active Redis instance"]
3932 async fn test_record_insufficient_fee_retry() {
3933 let _lock = ENV_MUTEX.lock().await;
3934 let repo = setup_test_repo().await;
3935 let relayer_id = Uuid::new_v4().to_string();
3936 let tx_id = Uuid::new_v4().to_string();
3937 let mut tx =
3938 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3939 tx.sent_at = None;
3940 repo.create(tx).await.unwrap();
3941
3942 let updated = repo
3943 .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:00:00Z".to_string())
3944 .await
3945 .unwrap();
3946
3947 assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z"));
3948 let meta = updated.metadata.unwrap();
3949 assert_eq!(meta.insufficient_fee_retries, 1);
3950 assert_eq!(meta.consecutive_failures, 0);
3951 assert_eq!(meta.total_failures, 0);
3952 }
3953
3954 #[tokio::test]
3955 #[ignore = "Requires active Redis instance"]
3956 async fn test_record_insufficient_fee_retry_accumulates() {
3957 let _lock = ENV_MUTEX.lock().await;
3958 let repo = setup_test_repo().await;
3959 let relayer_id = Uuid::new_v4().to_string();
3960 let tx_id = Uuid::new_v4().to_string();
3961 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3962 repo.create(tx).await.unwrap();
3963
3964 repo.record_stellar_insufficient_fee_retry(
3965 tx_id.clone(),
3966 "2025-03-18T10:00:00Z".to_string(),
3967 )
3968 .await
3969 .unwrap();
3970
3971 let updated = repo
3972 .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:01:00Z".to_string())
3973 .await
3974 .unwrap();
3975
3976 assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
3977 let meta = updated.metadata.unwrap();
3978 assert_eq!(meta.insufficient_fee_retries, 2);
3979 }
3980
3981 #[tokio::test]
3982 #[ignore = "Requires active Redis instance"]
3983 async fn test_record_insufficient_fee_retry_noop_on_final_state() {
3984 let _lock = ENV_MUTEX.lock().await;
3985 let repo = setup_test_repo().await;
3986 let relayer_id = Uuid::new_v4().to_string();
3987 let tx_id = Uuid::new_v4().to_string();
3988 let mut tx =
3989 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
3990 tx.sent_at = Some("old-time".to_string());
3991 repo.create(tx).await.unwrap();
3992
3993 let result = repo
3994 .record_stellar_insufficient_fee_retry(tx_id, "new-time".to_string())
3995 .await
3996 .unwrap();
3997
3998 assert_eq!(result.sent_at.as_deref(), Some("old-time"));
4000 assert!(result.metadata.is_none());
4001 }
4002
4003 #[tokio::test]
4004 #[ignore = "Requires active Redis instance"]
4005 async fn test_record_insufficient_fee_retry_not_found() {
4006 let _lock = ENV_MUTEX.lock().await;
4007 let repo = setup_test_repo().await;
4008
4009 let result = repo
4010 .record_stellar_insufficient_fee_retry(
4011 "nonexistent".to_string(),
4012 "2025-03-18T10:00:00Z".to_string(),
4013 )
4014 .await;
4015
4016 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
4017 }
4018
4019 #[tokio::test]
4022 #[ignore = "Requires active Redis instance"]
4023 async fn test_record_try_again_later_retry() {
4024 let _lock = ENV_MUTEX.lock().await;
4025 let repo = setup_test_repo().await;
4026 let relayer_id = Uuid::new_v4().to_string();
4027 let tx_id = Uuid::new_v4().to_string();
4028 let mut tx =
4029 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4030 tx.sent_at = None;
4031 repo.create(tx).await.unwrap();
4032
4033 let updated = repo
4034 .record_stellar_try_again_later_retry(tx_id, "2025-03-18T10:00:00Z".to_string())
4035 .await
4036 .unwrap();
4037
4038 assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z"));
4039 let meta = updated.metadata.unwrap();
4040 assert_eq!(meta.try_again_later_retries, 1);
4041 assert_eq!(meta.consecutive_failures, 0);
4042 assert_eq!(meta.total_failures, 0);
4043 }
4044
4045 #[tokio::test]
4046 #[ignore = "Requires active Redis instance"]
4047 async fn test_record_try_again_later_retry_accumulates() {
4048 let _lock = ENV_MUTEX.lock().await;
4049 let repo = setup_test_repo().await;
4050 let relayer_id = Uuid::new_v4().to_string();
4051 let tx_id = Uuid::new_v4().to_string();
4052 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4053 repo.create(tx).await.unwrap();
4054
4055 repo.record_stellar_try_again_later_retry(
4056 tx_id.clone(),
4057 "2025-03-18T10:00:00Z".to_string(),
4058 )
4059 .await
4060 .unwrap();
4061
4062 let updated = repo
4063 .record_stellar_try_again_later_retry(tx_id, "2025-03-18T10:01:00Z".to_string())
4064 .await
4065 .unwrap();
4066
4067 assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
4068 let meta = updated.metadata.unwrap();
4069 assert_eq!(meta.try_again_later_retries, 2);
4070 }
4071
4072 #[tokio::test]
4073 #[ignore = "Requires active Redis instance"]
4074 async fn test_record_try_again_later_retry_noop_on_final_state() {
4075 let _lock = ENV_MUTEX.lock().await;
4076 let repo = setup_test_repo().await;
4077 let relayer_id = Uuid::new_v4().to_string();
4078 let tx_id = Uuid::new_v4().to_string();
4079 let mut tx =
4080 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
4081 tx.sent_at = Some("old-time".to_string());
4082 repo.create(tx).await.unwrap();
4083
4084 let result = repo
4085 .record_stellar_try_again_later_retry(tx_id, "new-time".to_string())
4086 .await
4087 .unwrap();
4088
4089 assert_eq!(result.sent_at.as_deref(), Some("old-time"));
4091 assert!(result.metadata.is_none());
4092 }
4093
4094 #[tokio::test]
4095 #[ignore = "Requires active Redis instance"]
4096 async fn test_record_try_again_later_retry_not_found() {
4097 let _lock = ENV_MUTEX.lock().await;
4098 let repo = setup_test_repo().await;
4099
4100 let result = repo
4101 .record_stellar_try_again_later_retry(
4102 "nonexistent".to_string(),
4103 "2025-03-18T10:00:00Z".to_string(),
4104 )
4105 .await;
4106
4107 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
4108 }
4109
4110 #[tokio::test]
4113 #[ignore = "Requires active Redis instance"]
4114 async fn test_increment_failures_preserves_try_again_later_retries() {
4115 let _lock = ENV_MUTEX.lock().await;
4116 let repo = setup_test_repo().await;
4117 let relayer_id = Uuid::new_v4().to_string();
4118 let tx_id = Uuid::new_v4().to_string();
4119 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4120 repo.create(tx).await.unwrap();
4121
4122 repo.record_stellar_try_again_later_retry(
4124 tx_id.clone(),
4125 "2025-03-18T10:00:00Z".to_string(),
4126 )
4127 .await
4128 .unwrap();
4129
4130 let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
4132
4133 let meta = updated.metadata.unwrap();
4134 assert_eq!(
4135 meta.try_again_later_retries, 1,
4136 "try_again_later_retries must survive increment_status_check_failures"
4137 );
4138 assert_eq!(meta.consecutive_failures, 1);
4139 assert_eq!(meta.total_failures, 1);
4140 }
4141
4142 #[tokio::test]
4143 #[ignore = "Requires active Redis instance"]
4144 async fn test_increment_failures_preserves_insufficient_fee_retries() {
4145 let _lock = ENV_MUTEX.lock().await;
4146 let repo = setup_test_repo().await;
4147 let relayer_id = Uuid::new_v4().to_string();
4148 let tx_id = Uuid::new_v4().to_string();
4149 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4150 repo.create(tx).await.unwrap();
4151
4152 repo.record_stellar_insufficient_fee_retry(
4154 tx_id.clone(),
4155 "2025-03-18T10:00:00Z".to_string(),
4156 )
4157 .await
4158 .unwrap();
4159
4160 let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
4162
4163 let meta = updated.metadata.unwrap();
4164 assert_eq!(
4165 meta.insufficient_fee_retries, 1,
4166 "insufficient_fee_retries must survive increment_status_check_failures"
4167 );
4168 assert_eq!(meta.consecutive_failures, 1);
4169 }
4170
4171 #[tokio::test]
4172 #[ignore = "Requires active Redis instance"]
4173 async fn test_reset_failures_preserves_retry_counters() {
4174 let _lock = ENV_MUTEX.lock().await;
4175 let repo = setup_test_repo().await;
4176 let relayer_id = Uuid::new_v4().to_string();
4177 let tx_id = Uuid::new_v4().to_string();
4178 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4179 repo.create(tx).await.unwrap();
4180
4181 repo.record_stellar_try_again_later_retry(
4183 tx_id.clone(),
4184 "2025-03-18T10:00:00Z".to_string(),
4185 )
4186 .await
4187 .unwrap();
4188 repo.record_stellar_insufficient_fee_retry(
4189 tx_id.clone(),
4190 "2025-03-18T10:01:00Z".to_string(),
4191 )
4192 .await
4193 .unwrap();
4194
4195 repo.increment_status_check_failures(tx_id.clone())
4197 .await
4198 .unwrap();
4199 let updated = repo
4200 .reset_status_check_consecutive_failures(tx_id)
4201 .await
4202 .unwrap();
4203
4204 let meta = updated.metadata.unwrap();
4205 assert_eq!(meta.consecutive_failures, 0);
4206 assert_eq!(meta.total_failures, 1);
4207 assert_eq!(
4208 meta.try_again_later_retries, 1,
4209 "try_again_later_retries must survive reset"
4210 );
4211 assert_eq!(
4212 meta.insufficient_fee_retries, 1,
4213 "insufficient_fee_retries must survive reset"
4214 );
4215 }
4216
4217 #[tokio::test]
4218 #[ignore = "Requires active Redis instance"]
4219 async fn test_fee_and_try_again_later_retries_independent() {
4220 let _lock = ENV_MUTEX.lock().await;
4221 let repo = setup_test_repo().await;
4222 let relayer_id = Uuid::new_v4().to_string();
4223 let tx_id = Uuid::new_v4().to_string();
4224 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4225 repo.create(tx).await.unwrap();
4226
4227 repo.record_stellar_try_again_later_retry(
4229 tx_id.clone(),
4230 "2025-03-18T10:00:00Z".to_string(),
4231 )
4232 .await
4233 .unwrap();
4234 repo.record_stellar_try_again_later_retry(
4235 tx_id.clone(),
4236 "2025-03-18T10:01:00Z".to_string(),
4237 )
4238 .await
4239 .unwrap();
4240
4241 let updated = repo
4243 .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:02:00Z".to_string())
4244 .await
4245 .unwrap();
4246
4247 let meta = updated.metadata.unwrap();
4248 assert_eq!(
4249 meta.try_again_later_retries, 2,
4250 "try_again_later_retries must survive insufficient_fee_retry"
4251 );
4252 assert_eq!(meta.insufficient_fee_retries, 1);
4253 }
4254}