openzeppelin_relayer/repositories/transaction/
transaction_redis.rs

1//! Redis-backed implementation of the TransactionRepository.
2
3use crate::config::ServerConfig;
4use crate::constants::FINAL_TRANSACTION_STATUSES;
5use crate::domain::transaction::common::is_final_state;
6use crate::metrics::{
7    TRANSACTIONS_BY_STATUS, TRANSACTIONS_CREATED, TRANSACTIONS_FAILED,
8    TRANSACTIONS_INSUFFICIENT_FEE_FAILED, TRANSACTIONS_INSUFFICIENT_FEE_SUCCESS,
9    TRANSACTIONS_SUBMITTED, TRANSACTIONS_SUCCESS, TRANSACTIONS_TRY_AGAIN_LATER_FAILED,
10    TRANSACTIONS_TRY_AGAIN_LATER_SUCCESS, TRANSACTION_PROCESSING_TIME,
11};
12use crate::models::{
13    NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
14    TransactionStatus, TransactionUpdateRequest,
15};
16use crate::repositories::redis_base::RedisRepository;
17use crate::repositories::{
18    BatchDeleteResult, BatchRetrievalResult, PaginatedResult, Repository, TransactionDeleteRequest,
19    TransactionRepository,
20};
21use crate::utils::RedisConnections;
22use async_trait::async_trait;
23use chrono::Utc;
24use redis::{AsyncCommands, Script};
25use std::fmt;
26use std::sync::Arc;
27use tracing::{debug, error, warn};
28
// Segments used by the key helper methods of `RedisTransactionRepository` to assemble
// Redis key names. Each helper joins the repository's `key_prefix` and these segments with ':'.

/// Segment that directly follows the key prefix in every key helper except the relayer list key.
const RELAYER_PREFIX: &str = "relayer";
/// Segment that precedes the transaction ID in a transaction data key.
const TX_PREFIX: &str = "tx";
/// Segment of the legacy (unordered SET) per-status index key.
const STATUS_PREFIX: &str = "status";
/// Segment of the sorted-set per-status index key.
const STATUS_SORTED_PREFIX: &str = "status_sorted";
/// Segment of the per-relayer nonce index key.
const NONCE_PREFIX: &str = "nonce";
/// Segment of the reverse-lookup key that maps a transaction ID to its relayer ID.
const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
/// Name of the set that collects relayer IDs.
const RELAYER_LIST_KEY: &str = "relayer_list";
/// Segment of the per-relayer sorted set whose scores come from `created_at`.
const TX_BY_CREATED_AT_PREFIX: &str = "tx_by_created_at";
37
/// Transaction repository that stores transactions and their lookup indexes in Redis.
///
/// Cloning copies the key prefix and bumps the reference count of the shared connections.
#[derive(Clone)]
pub struct RedisTransactionRepository {
    /// Shared Redis connection handles. In the code visible here, `primary()` serves the
    /// write paths, Lua scripts and migration, while `reader()` serves the batch fetch.
    pub connections: Arc<RedisConnections>,
    /// Prefix placed in front of every Redis key this repository builds.
    /// `new` rejects an empty value.
    pub key_prefix: String,
}
43
// Empty impl: the repository relies entirely on the trait's default items.
// NOTE(review): `get_connection`, `map_redis_error` and `deserialize_entity` used throughout
// this file presumably come from this trait — confirm against `redis_base`.
impl RedisRepository for RedisTransactionRepository {}
45
46impl RedisTransactionRepository {
47    pub fn new(
48        connections: Arc<RedisConnections>,
49        key_prefix: String,
50    ) -> Result<Self, RepositoryError> {
51        if key_prefix.is_empty() {
52            return Err(RepositoryError::InvalidData(
53                "Redis key prefix cannot be empty".to_string(),
54            ));
55        }
56
57        Ok(Self {
58            connections,
59            key_prefix,
60        })
61    }
62
63    /// Generate key for transaction data: relayer:{relayer_id}:tx:{tx_id}
64    fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
65        format!(
66            "{}:{}:{}:{}:{}",
67            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
68        )
69    }
70
71    /// Generate key for reverse lookup: tx_to_relayer:{tx_id}
72    fn tx_to_relayer_key(&self, tx_id: &str) -> String {
73        format!(
74            "{}:{}:{}:{}",
75            self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
76        )
77    }
78
79    /// Generate key for relayer status index (legacy SET): relayer:{relayer_id}:status:{status}
80    fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
81        format!(
82            "{}:{}:{}:{}:{}",
83            self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
84        )
85    }
86
87    /// Generate key for relayer status sorted index (SORTED SET): relayer:{relayer_id}:status_sorted:{status}
88    /// Score is created_at timestamp in milliseconds for efficient ordering.
89    fn relayer_status_sorted_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
90        format!(
91            "{}:{}:{}:{}:{}",
92            self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_SORTED_PREFIX, status
93        )
94    }
95
96    /// Generate key for relayer nonce index: relayer:{relayer_id}:nonce:{nonce}
97    fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
98        format!(
99            "{}:{}:{}:{}:{}",
100            self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
101        )
102    }
103
104    /// Generate key for relayer list: relayer_list (set of all relayer IDs)
105    fn relayer_list_key(&self) -> String {
106        format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
107    }
108
109    /// Generate key for relayer's sorted set by created_at: relayer:{relayer_id}:tx_by_created_at
110    fn relayer_tx_by_created_at_key(&self, relayer_id: &str) -> String {
111        format!(
112            "{}:{}:{}:{}",
113            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_BY_CREATED_AT_PREFIX
114        )
115    }
116
117    /// Returns the components needed for Lua scripts to resolve a tx key from
118    /// only the tx_id: (tx_to_relayer lookup key, key prefix, key suffix).
119    /// The Lua script does: `GET KEYS[1]` to get the relayer_id, then
120    /// constructs the tx key as `ARGV[1] .. relayer_id .. ARGV[2]`.
121    fn tx_key_parts(&self, tx_id: &str) -> (String, String, String) {
122        let lookup_key = self.tx_to_relayer_key(tx_id);
123        let key_prefix = format!("{}:{}:", self.key_prefix, RELAYER_PREFIX);
124        let key_suffix = format!(":{TX_PREFIX}:{tx_id}");
125        (lookup_key, key_prefix, key_suffix)
126    }
127
128    /// Executes an atomic Lua script with retry/backoff for transient Redis failures.
129    ///
130    /// Every script receives `KEYS[1]` = tx_to_relayer lookup key and
131    /// `ARGV[1..2]` = key prefix/suffix. `extra_args` are appended as `ARGV[3..]`.
132    /// The script must return the (possibly updated) JSON string or `false` for
133    /// not-found.
134    async fn run_atomic_script(
135        &self,
136        lua: &str,
137        tx_id: &str,
138        extra_args: &[&str],
139        op_name: &str,
140    ) -> Result<TransactionRepoModel, RepositoryError> {
141        const MAX_RETRIES: u32 = 3;
142        const BASE_BACKOFF_MS: u64 = 100;
143
144        let (lookup_key, key_prefix, key_suffix) = self.tx_key_parts(tx_id);
145        let script = Script::new(lua);
146        let mut last_error = None;
147
148        for attempt in 0..MAX_RETRIES {
149            let backoff = BASE_BACKOFF_MS * 2u64.pow(attempt);
150
151            let mut conn = match self
152                .get_connection(self.connections.primary(), op_name)
153                .await
154            {
155                Ok(conn) => conn,
156                Err(e) => {
157                    last_error = Some(e);
158                    if attempt < MAX_RETRIES - 1 {
159                        warn!(tx_id = %tx_id, attempt, op = %op_name, "connection failed, retrying");
160                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
161                        continue;
162                    }
163                    return Err(last_error.unwrap());
164                }
165            };
166
167            let mut invocation = script.prepare_invoke();
168            invocation
169                .key(&lookup_key)
170                .arg(&key_prefix)
171                .arg(&key_suffix);
172            for arg in extra_args {
173                invocation.arg(*arg);
174            }
175
176            match invocation.invoke_async::<Option<String>>(&mut conn).await {
177                Ok(result) => {
178                    let json = result.ok_or_else(|| {
179                        RepositoryError::NotFound(format!("Transaction with ID {tx_id} not found"))
180                    })?;
181                    return self.deserialize_entity::<TransactionRepoModel>(
182                        &json,
183                        tx_id,
184                        "transaction",
185                    );
186                }
187                Err(e) => {
188                    last_error = Some(self.map_redis_error(e, op_name));
189                    if attempt < MAX_RETRIES - 1 {
190                        warn!(
191                            tx_id = %tx_id, attempt, op = %op_name,
192                            "atomic script failed, retrying"
193                        );
194                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
195                        continue;
196                    }
197                    return Err(last_error.unwrap());
198                }
199            }
200        }
201        Err(last_error.unwrap_or_else(|| {
202            RepositoryError::UnexpectedError(format!("retry loop exhausted for {op_name}"))
203        }))
204    }
205
206    /// Executes a Lua script with retry/backoff, returning a Vec<String> result
207    /// (for scripts that return Lua tables / multi-bulk replies).
208    /// Returns `Ok(None)` when the script returns `false`.
209    async fn run_script_with_retry_vec(
210        &self,
211        script: &Script,
212        lookup_key: &str,
213        key_prefix: &str,
214        key_suffix: &str,
215        extra_args: &[&str],
216        op_name: &str,
217    ) -> Result<Option<Vec<String>>, RepositoryError> {
218        const MAX_RETRIES: u32 = 3;
219        const BASE_BACKOFF_MS: u64 = 100;
220
221        let mut last_error = None;
222
223        for attempt in 0..MAX_RETRIES {
224            let backoff = BASE_BACKOFF_MS * 2u64.pow(attempt);
225
226            let mut conn = match self
227                .get_connection(self.connections.primary(), op_name)
228                .await
229            {
230                Ok(conn) => conn,
231                Err(e) => {
232                    last_error = Some(e);
233                    if attempt < MAX_RETRIES - 1 {
234                        warn!(op = %op_name, attempt, "connection failed, retrying");
235                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
236                        continue;
237                    }
238                    return Err(last_error.unwrap());
239                }
240            };
241
242            let mut invocation = script.prepare_invoke();
243            invocation.key(lookup_key).arg(key_prefix).arg(key_suffix);
244            for arg in extra_args {
245                invocation.arg(*arg);
246            }
247
248            // Redis returns `false` from Lua as a Nil bulk reply, which
249            // redis-rs maps to `None` for `Option<Vec<String>>`.
250            match invocation
251                .invoke_async::<Option<Vec<String>>>(&mut conn)
252                .await
253            {
254                Ok(result) => return Ok(result),
255                Err(e) => {
256                    last_error = Some(self.map_redis_error(e, op_name));
257                    if attempt < MAX_RETRIES - 1 {
258                        warn!(op = %op_name, attempt, "script failed, retrying");
259                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
260                        continue;
261                    }
262                    return Err(last_error.unwrap());
263                }
264            }
265        }
266        Err(last_error.unwrap_or_else(|| {
267            RepositoryError::UnexpectedError(format!("retry loop exhausted for {op_name}"))
268        }))
269    }
270
271    /// Parse timestamp string to score for sorted set (milliseconds since epoch)
272    fn timestamp_to_score(&self, timestamp: &str) -> f64 {
273        chrono::DateTime::parse_from_rfc3339(timestamp)
274            .map(|dt| dt.timestamp_millis() as f64)
275            .unwrap_or_else(|_| {
276                warn!(timestamp = %timestamp, "failed to parse timestamp, using 0");
277                0.0
278            })
279    }
280
281    /// Compute the appropriate score for a transaction's status sorted set.
282    /// - For Confirmed status: use confirmed_at (on-chain confirmation order)
283    /// - For all other statuses: use created_at (queue/processing order)
284    fn status_sorted_score(&self, tx: &TransactionRepoModel) -> f64 {
285        if tx.status == TransactionStatus::Confirmed {
286            // For Confirmed, prefer confirmed_at for accurate on-chain ordering
287            if let Some(ref confirmed_at) = tx.confirmed_at {
288                return self.timestamp_to_score(confirmed_at);
289            }
290            // Fallback to created_at if confirmed_at not set (shouldn't happen)
291            warn!(tx_id = %tx.id, "Confirmed transaction missing confirmed_at, using created_at");
292        }
293        self.timestamp_to_score(&tx.created_at)
294    }
295
296    /// Batch fetch transactions by IDs using reverse lookup
297    async fn get_transactions_by_ids(
298        &self,
299        ids: &[String],
300    ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
301        if ids.is_empty() {
302            debug!("no transaction IDs provided for batch fetch");
303            return Ok(BatchRetrievalResult {
304                results: vec![],
305                failed_ids: vec![],
306            });
307        }
308
309        let mut conn = self
310            .get_connection(self.connections.reader(), "batch_fetch_transactions")
311            .await?;
312
313        let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();
314
315        debug!(count = %ids.len(), "fetching relayer IDs for transactions");
316
317        let relayer_ids: Vec<Option<String>> = conn
318            .mget(&reverse_keys)
319            .await
320            .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;
321
322        let mut tx_keys = Vec::new();
323        let mut valid_ids = Vec::new();
324        let mut failed_ids = Vec::new();
325        for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
326            match relayer_id {
327                Some(relayer_id) => {
328                    tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
329                    valid_ids.push(ids[i].clone());
330                }
331                None => {
332                    warn!(tx_id = %ids[i], "no relayer found for transaction");
333                    failed_ids.push(ids[i].clone());
334                }
335            }
336        }
337
338        if tx_keys.is_empty() {
339            debug!("no valid transactions found for batch fetch");
340            return Ok(BatchRetrievalResult {
341                results: vec![],
342                failed_ids,
343            });
344        }
345
346        debug!(count = %tx_keys.len(), "batch fetching transaction data");
347
348        let values: Vec<Option<String>> = conn
349            .mget(&tx_keys)
350            .await
351            .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;
352
353        let mut transactions = Vec::new();
354        let mut failed_count = 0;
355        for (i, value) in values.into_iter().enumerate() {
356            match value {
357                Some(json) => {
358                    match self.deserialize_entity::<TransactionRepoModel>(
359                        &json,
360                        &valid_ids[i],
361                        "transaction",
362                    ) {
363                        Ok(tx) => transactions.push(tx),
364                        Err(e) => {
365                            failed_count += 1;
366                            error!(tx_id = %valid_ids[i], error = %e, "failed to deserialize transaction");
367                            // Continue processing other transactions
368                        }
369                    }
370                }
371                None => {
372                    warn!(tx_id = %valid_ids[i], "transaction not found in batch fetch");
373                    failed_ids.push(valid_ids[i].clone());
374                }
375            }
376        }
377
378        if failed_count > 0 {
379            warn!(failed_count = %failed_count, total_count = %valid_ids.len(), "failed to deserialize transactions in batch");
380        }
381
382        debug!(count = %transactions.len(), "successfully fetched transactions");
383        Ok(BatchRetrievalResult {
384            results: transactions,
385            failed_ids,
386        })
387    }
388
389    /// Extract nonce from EVM transaction data
390    fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
391        match network_data.get_evm_transaction_data() {
392            Ok(tx_data) => tx_data.nonce,
393            Err(_) => {
394                debug!("no EVM transaction data available for nonce extraction");
395                None
396            }
397        }
398    }
399
400    /// Ensures the status sorted set exists, migrating from legacy SET if needed.
401    ///
402    /// This handles the transition from unordered SETs to sorted SETs for status indexing.
403    /// If the sorted set is empty but the legacy set has data, it migrates the data
404    /// by looking up each transaction's created_at timestamp to compute the score.
405    ///
406    /// # Concurrency
407    /// This function is safe for concurrent calls. If multiple calls race to migrate
408    /// the same status set:
409    /// - ZADD is idempotent (same member + score = no-op)
410    /// - DEL on non-existent key is safe (returns 0)
411    /// - After first successful migration, subsequent calls hit the fast path (ZCARD > 0)
412    ///
413    /// The only downside of concurrent migrations is wasted work, not data corruption.
414    ///
415    /// Returns the count of items in the sorted set after migration.
416    async fn ensure_status_sorted_set(
417        &self,
418        relayer_id: &str,
419        status: &TransactionStatus,
420    ) -> Result<u64, RepositoryError> {
421        let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
422        let legacy_key = self.relayer_status_key(relayer_id, status);
423
424        // Phase 1: Check if migration is needed
425        let legacy_ids = {
426            let mut conn = self
427                .get_connection(self.connections.primary(), "ensure_status_sorted_set_check")
428                .await?;
429
430            // Always check if legacy set has data that needs migration
431            let legacy_count: u64 = conn
432                .scard(&legacy_key)
433                .await
434                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_scard"))?;
435
436            if legacy_count == 0 {
437                // No legacy data to migrate, return current ZSET count
438                let sorted_count: u64 = conn
439                    .zcard(&sorted_key)
440                    .await
441                    .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_zcard"))?;
442                return Ok(sorted_count);
443            }
444
445            // Migration needed: get all IDs from legacy set
446            debug!(
447                relayer_id = %relayer_id,
448                status = %status,
449                legacy_count = %legacy_count,
450                "migrating status set to sorted set"
451            );
452
453            let ids: Vec<String> = conn
454                .smembers(&legacy_key)
455                .await
456                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_smembers"))?;
457
458            ids
459            // Connection dropped here before nested call to avoid connection doubling
460        };
461
462        if legacy_ids.is_empty() {
463            return Ok(0);
464        }
465
466        // Phase 2: Fetch transactions (uses its own connection internally)
467        let transactions = self.get_transactions_by_ids(&legacy_ids).await?;
468
469        // Phase 3: Perform migration with a new connection
470        let mut conn = self
471            .get_connection(
472                self.connections.primary(),
473                "ensure_status_sorted_set_migrate",
474            )
475            .await?;
476
477        if transactions.results.is_empty() {
478            // All transactions were stale/deleted, clean up legacy set
479            let _: () = conn
480                .del(&legacy_key)
481                .await
482                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_del_stale"))?;
483            return Ok(0);
484        }
485
486        // Build sorted set entries and migrate atomically
487        // Use status-aware scoring: confirmed_at for Confirmed, created_at for others
488        let mut pipe = redis::pipe();
489        pipe.atomic();
490
491        for tx in &transactions.results {
492            let score = self.status_sorted_score(tx);
493            pipe.zadd(&sorted_key, &tx.id, score);
494        }
495
496        // Delete legacy set after migration
497        pipe.del(&legacy_key);
498
499        pipe.query_async::<()>(&mut conn)
500            .await
501            .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_migrate"))?;
502
503        let migrated_count = transactions.results.len() as u64;
504        debug!(
505            relayer_id = %relayer_id,
506            status = %status,
507            migrated_count = %migrated_count,
508            "completed migration of status set to sorted set"
509        );
510
511        Ok(migrated_count)
512    }
513
514    /// Update indexes atomically with comprehensive error handling
515    async fn update_indexes(
516        &self,
517        tx: &TransactionRepoModel,
518        old_tx: Option<&TransactionRepoModel>,
519    ) -> Result<(), RepositoryError> {
520        let mut conn = self
521            .get_connection(self.connections.primary(), "update_indexes")
522            .await?;
523        let mut pipe = redis::pipe();
524        pipe.atomic();
525
526        debug!(tx_id = %tx.id, "updating indexes for transaction");
527
528        // Add relayer to the global relayer list
529        let relayer_list_key = self.relayer_list_key();
530        pipe.sadd(&relayer_list_key, &tx.relayer_id);
531
532        // Compute scores for sorted sets
533        // Status sorted set: uses confirmed_at for Confirmed status, created_at for others
534        let status_score = self.status_sorted_score(tx);
535        // Global tx_by_created_at: always uses created_at for consistent ordering
536        let created_at_score = self.timestamp_to_score(&tx.created_at);
537
538        // Handle status index updates - write to SORTED SET (new format)
539        let new_status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, &tx.status);
540        pipe.zadd(&new_status_sorted_key, &tx.id, status_score);
541        debug!(tx_id = %tx.id, status = %tx.status, score = %status_score, "added transaction to status sorted set");
542
543        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
544            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
545            pipe.set(&nonce_key, &tx.id);
546            debug!(tx_id = %tx.id, nonce = %nonce, "added nonce index for transaction");
547        }
548
549        // Add to per-relayer sorted set by created_at (for efficient sorted pagination)
550        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
551        pipe.zadd(&relayer_sorted_key, &tx.id, created_at_score);
552        debug!(tx_id = %tx.id, score = %created_at_score, "added transaction to sorted set by created_at");
553
554        // Remove old indexes if updating
555        if let Some(old) = old_tx {
556            if old.status != tx.status {
557                // Remove from old status sorted set (new format)
558                let old_status_sorted_key =
559                    self.relayer_status_sorted_key(&old.relayer_id, &old.status);
560                pipe.zrem(&old_status_sorted_key, &tx.id);
561
562                // Also clean up legacy SET if it exists (for migration cleanup)
563                let old_status_legacy_key = self.relayer_status_key(&old.relayer_id, &old.status);
564                pipe.srem(&old_status_legacy_key, &tx.id);
565
566                debug!(tx_id = %tx.id, old_status = %old.status, new_status = %tx.status, "removing old status indexes for transaction");
567            }
568
569            // Handle nonce index cleanup
570            if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
571                let new_nonce = self.extract_nonce(&tx.network_data);
572                if Some(old_nonce) != new_nonce {
573                    let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
574                    pipe.del(&old_nonce_key);
575                    debug!(tx_id = %tx.id, old_nonce = %old_nonce, new_nonce = ?new_nonce, "removing old nonce index for transaction");
576                }
577            }
578        }
579
580        // Execute all operations in a single pipeline
581        pipe.exec_async(&mut conn).await.map_err(|e| {
582            error!(tx_id = %tx.id, error = %e, "index update pipeline failed for transaction");
583            self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
584        })?;
585
586        debug!(tx_id = %tx.id, "successfully updated indexes for transaction");
587        Ok(())
588    }
589
590    /// Remove all indexes with error recovery
591    async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
592        let mut conn = self
593            .get_connection(self.connections.primary(), "remove_all_indexes")
594            .await?;
595        let mut pipe = redis::pipe();
596        pipe.atomic();
597
598        debug!(tx_id = %tx.id, "removing all indexes for transaction");
599
600        // Remove from ALL possible status indexes to ensure complete cleanup
601        // This handles cases where a transaction might be in multiple status sets
602        // due to race conditions, partial failures, or bugs
603        for status in &[
604            TransactionStatus::Canceled,
605            TransactionStatus::Pending,
606            TransactionStatus::Sent,
607            TransactionStatus::Submitted,
608            TransactionStatus::Mined,
609            TransactionStatus::Confirmed,
610            TransactionStatus::Failed,
611            TransactionStatus::Expired,
612        ] {
613            // Remove from sorted status set (new format)
614            let status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, status);
615            pipe.zrem(&status_sorted_key, &tx.id);
616
617            // Remove from legacy status set (for migration cleanup)
618            let status_legacy_key = self.relayer_status_key(&tx.relayer_id, status);
619            pipe.srem(&status_legacy_key, &tx.id);
620        }
621
622        // Remove nonce index if exists
623        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
624            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
625            pipe.del(&nonce_key);
626            debug!(tx_id = %tx.id, nonce = %nonce, "removing nonce index for transaction");
627        }
628
629        // Remove from per-relayer sorted set by created_at
630        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
631        pipe.zrem(&relayer_sorted_key, &tx.id);
632        debug!(tx_id = %tx.id, "removing transaction from sorted set by created_at");
633
634        // Remove reverse lookup
635        let reverse_key = self.tx_to_relayer_key(&tx.id);
636        pipe.del(&reverse_key);
637
638        pipe.exec_async(&mut conn).await.map_err(|e| {
639            error!(tx_id = %tx.id, error = %e, "index removal failed for transaction");
640            self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
641        })?;
642
643        debug!(tx_id = %tx.id, "successfully removed all indexes for transaction");
644        Ok(())
645    }
646
647    /// Track Prometheus metrics when a transaction status changes.
648    fn track_status_change_metrics(
649        &self,
650        _original_tx: &TransactionRepoModel,
651        updated_tx: &TransactionRepoModel,
652        old_status: &TransactionStatus,
653        new_status: &TransactionStatus,
654    ) {
655        let network_type = format!("{:?}", updated_tx.network_type).to_lowercase();
656        let relayer_id = updated_tx.relayer_id.as_str();
657
658        // Track submission (when status changes to Submitted)
659        if *old_status != TransactionStatus::Submitted
660            && *new_status == TransactionStatus::Submitted
661        {
662            TRANSACTIONS_SUBMITTED
663                .with_label_values(&[relayer_id, &network_type])
664                .inc();
665
666            if let Ok(created_time) = chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at) {
667                let processing_seconds =
668                    (Utc::now() - created_time.with_timezone(&Utc)).num_seconds() as f64;
669                TRANSACTION_PROCESSING_TIME
670                    .with_label_values(&[relayer_id, &network_type, "creation_to_submission"])
671                    .observe(processing_seconds);
672            }
673        }
674
675        // Track status distribution (update gauge when status changes)
676        if old_status != new_status {
677            let old_status_str = format!("{old_status:?}").to_lowercase();
678            let old_status_gauge = TRANSACTIONS_BY_STATUS.with_label_values(&[
679                relayer_id,
680                &network_type,
681                &old_status_str,
682            ]);
683            let clamped_value = (old_status_gauge.get() - 1.0).max(0.0);
684            old_status_gauge.set(clamped_value);
685
686            let new_status_str = format!("{new_status:?}").to_lowercase();
687            TRANSACTIONS_BY_STATUS
688                .with_label_values(&[relayer_id, &network_type, &new_status_str])
689                .inc();
690        }
691
692        // Track metrics for final transaction states
693        let was_final = is_final_state(old_status);
694        let is_final = is_final_state(new_status);
695
696        if !was_final && is_final {
697            let previous_status = format!("{old_status:?}").to_lowercase();
698            let meta = updated_tx.metadata.as_ref();
699            let had_insufficient_fee = meta.is_some_and(|m| m.insufficient_fee_retries > 0);
700            let had_try_again_later = meta.is_some_and(|m| m.try_again_later_retries > 0);
701
702            match new_status {
703                TransactionStatus::Confirmed => {
704                    TRANSACTIONS_SUCCESS
705                        .with_label_values(&[relayer_id, &network_type])
706                        .inc();
707                    if had_insufficient_fee {
708                        TRANSACTIONS_INSUFFICIENT_FEE_SUCCESS
709                            .with_label_values(&[relayer_id, &network_type])
710                            .inc();
711                    }
712                    if had_try_again_later {
713                        TRANSACTIONS_TRY_AGAIN_LATER_SUCCESS
714                            .with_label_values(&[relayer_id, &network_type])
715                            .inc();
716                    }
717
718                    if let (Some(sent_at_str), Some(confirmed_at_str)) =
719                        (&updated_tx.sent_at, &updated_tx.confirmed_at)
720                    {
721                        if let (Ok(sent_time), Ok(confirmed_time)) = (
722                            chrono::DateTime::parse_from_rfc3339(sent_at_str),
723                            chrono::DateTime::parse_from_rfc3339(confirmed_at_str),
724                        ) {
725                            let processing_seconds = (confirmed_time.with_timezone(&Utc)
726                                - sent_time.with_timezone(&Utc))
727                            .num_seconds()
728                                as f64;
729                            TRANSACTION_PROCESSING_TIME
730                                .with_label_values(&[
731                                    relayer_id,
732                                    &network_type,
733                                    "submission_to_confirmation",
734                                ])
735                                .observe(processing_seconds);
736                        }
737                    }
738
739                    if let Ok(created_time) =
740                        chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at)
741                    {
742                        if let Some(confirmed_at_str) = &updated_tx.confirmed_at {
743                            if let Ok(confirmed_time) =
744                                chrono::DateTime::parse_from_rfc3339(confirmed_at_str)
745                            {
746                                let processing_seconds = (confirmed_time.with_timezone(&Utc)
747                                    - created_time.with_timezone(&Utc))
748                                .num_seconds()
749                                    as f64;
750                                TRANSACTION_PROCESSING_TIME
751                                    .with_label_values(&[
752                                        relayer_id,
753                                        &network_type,
754                                        "creation_to_confirmation",
755                                    ])
756                                    .observe(processing_seconds);
757                            }
758                        }
759                    }
760                }
761                TransactionStatus::Failed => {
762                    let failure_reason = updated_tx
763                        .status_reason
764                        .as_deref()
765                        .map(|reason| {
766                            if reason.starts_with("Submission failed:") {
767                                "submission_failed"
768                            } else if reason.starts_with("Preparation failed:") {
769                                "preparation_failed"
770                            } else {
771                                "failed"
772                            }
773                        })
774                        .unwrap_or("failed");
775                    TRANSACTIONS_FAILED
776                        .with_label_values(&[
777                            relayer_id,
778                            &network_type,
779                            failure_reason,
780                            &previous_status,
781                        ])
782                        .inc();
783                }
784                TransactionStatus::Expired => {
785                    TRANSACTIONS_FAILED
786                        .with_label_values(&[
787                            relayer_id,
788                            &network_type,
789                            "expired",
790                            &previous_status,
791                        ])
792                        .inc();
793                }
794                TransactionStatus::Canceled => {
795                    TRANSACTIONS_FAILED
796                        .with_label_values(&[
797                            relayer_id,
798                            &network_type,
799                            "canceled",
800                            &previous_status,
801                        ])
802                        .inc();
803                }
804                _ => {}
805            }
806
807            // Track retry-related failure metrics for all non-success final states
808            if *new_status != TransactionStatus::Confirmed {
809                if had_insufficient_fee {
810                    TRANSACTIONS_INSUFFICIENT_FEE_FAILED
811                        .with_label_values(&[relayer_id, &network_type])
812                        .inc();
813                }
814                if had_try_again_later {
815                    TRANSACTIONS_TRY_AGAIN_LATER_FAILED
816                        .with_label_values(&[relayer_id, &network_type])
817                        .inc();
818                }
819            }
820        }
821    }
822}
823
824impl fmt::Debug for RedisTransactionRepository {
825    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
826        f.debug_struct("RedisTransactionRepository")
827            .field("connections", &"<RedisConnections>")
828            .field("key_prefix", &self.key_prefix)
829            .finish()
830    }
831}
832
833#[async_trait]
834impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
835    async fn create(
836        &self,
837        entity: TransactionRepoModel,
838    ) -> Result<TransactionRepoModel, RepositoryError> {
839        if entity.id.is_empty() {
840            return Err(RepositoryError::InvalidData(
841                "Transaction ID cannot be empty".to_string(),
842            ));
843        }
844
845        let key = self.tx_key(&entity.relayer_id, &entity.id);
846        let reverse_key = self.tx_to_relayer_key(&entity.id);
847        let mut conn = self
848            .get_connection(self.connections.primary(), "create")
849            .await?;
850
851        debug!(tx_id = %entity.id, "creating transaction");
852
853        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
854
855        // Check if transaction already exists by checking reverse lookup
856        let existing: Option<String> = conn
857            .get(&reverse_key)
858            .await
859            .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;
860
861        if existing.is_some() {
862            return Err(RepositoryError::ConstraintViolation(format!(
863                "Transaction with ID {} already exists",
864                entity.id
865            )));
866        }
867
868        // Use atomic pipeline for consistency
869        let mut pipe = redis::pipe();
870        pipe.atomic();
871        pipe.set(&key, &value);
872        pipe.set(&reverse_key, &entity.relayer_id);
873
874        pipe.exec_async(&mut conn)
875            .await
876            .map_err(|e| self.map_redis_error(e, "create_transaction"))?;
877
878        // Update indexes separately to handle partial failures gracefully
879        if let Err(e) = self.update_indexes(&entity, None).await {
880            error!(tx_id = %entity.id, error = %e, "failed to update indexes for new transaction");
881            return Err(e);
882        }
883
884        // Track transaction creation metric
885        let network_type = format!("{:?}", entity.network_type).to_lowercase();
886        let relayer_id = entity.relayer_id.as_str();
887        TRANSACTIONS_CREATED
888            .with_label_values(&[relayer_id, &network_type])
889            .inc();
890
891        // Track initial status distribution (Pending)
892        let status = &entity.status;
893        let status_str = format!("{status:?}").to_lowercase();
894        TRANSACTIONS_BY_STATUS
895            .with_label_values(&[relayer_id, &network_type, &status_str])
896            .inc();
897
898        debug!(tx_id = %entity.id, "successfully created transaction");
899        Ok(entity)
900    }
901
902    async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
903        if id.is_empty() {
904            return Err(RepositoryError::InvalidData(
905                "Transaction ID cannot be empty".to_string(),
906            ));
907        }
908
909        let mut conn = self
910            .get_connection(self.connections.reader(), "get_by_id")
911            .await?;
912
913        debug!(tx_id = %id, "fetching transaction");
914
915        let reverse_key = self.tx_to_relayer_key(&id);
916        let relayer_id: Option<String> = conn
917            .get(&reverse_key)
918            .await
919            .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;
920
921        let relayer_id = match relayer_id {
922            Some(relayer_id) => relayer_id,
923            None => {
924                debug!(tx_id = %id, "transaction not found (no reverse lookup)");
925                return Err(RepositoryError::NotFound(format!(
926                    "Transaction with ID {id} not found"
927                )));
928            }
929        };
930
931        let key = self.tx_key(&relayer_id, &id);
932        let value: Option<String> = conn
933            .get(&key)
934            .await
935            .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;
936
937        match value {
938            Some(json) => {
939                let tx =
940                    self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
941                debug!(tx_id = %id, "successfully fetched transaction");
942                Ok(tx)
943            }
944            None => {
945                debug!(tx_id = %id, "transaction not found");
946                Err(RepositoryError::NotFound(format!(
947                    "Transaction with ID {id} not found"
948                )))
949            }
950        }
951    }
952
953    // Unoptimized implementation of list_paginated. Rarely used. find_by_relayer_id is preferred.
954    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
955        let mut conn = self
956            .get_connection(self.connections.reader(), "list_all")
957            .await?;
958
959        debug!("fetching all transactions sorted by created_at (newest first)");
960
961        // Get all relayer IDs
962        let relayer_list_key = self.relayer_list_key();
963        let relayer_ids: Vec<String> = conn
964            .smembers(&relayer_list_key)
965            .await
966            .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;
967
968        debug!(count = %relayer_ids.len(), "found relayers");
969
970        // Collect all transaction IDs from all relayers using their sorted sets
971        let mut all_tx_ids = Vec::new();
972        for relayer_id in relayer_ids {
973            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
974            let tx_ids: Vec<String> = redis::cmd("ZRANGE")
975                .arg(&relayer_sorted_key)
976                .arg(0)
977                .arg(-1)
978                .arg("REV")
979                .query_async(&mut conn)
980                .await
981                .map_err(|e| self.map_redis_error(e, "list_all_relayer_sorted"))?;
982
983            all_tx_ids.extend(tx_ids);
984        }
985
986        // Release connection before nested call to avoid connection doubling
987        drop(conn);
988
989        // Batch fetch all transactions at once
990        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
991        let mut all_transactions = batch_result.results;
992
993        // Sort all transactions by created_at (newest first)
994        all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
995
996        debug!(count = %all_transactions.len(), "found transactions");
997        Ok(all_transactions)
998    }
999
1000    // Unoptimized implementation of list_paginated. Rarely used. find_by_relayer_id is preferred.
1001    async fn list_paginated(
1002        &self,
1003        query: PaginationQuery,
1004    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1005        if query.per_page == 0 {
1006            return Err(RepositoryError::InvalidData(
1007                "per_page must be greater than 0".to_string(),
1008            ));
1009        }
1010
1011        let mut conn = self
1012            .get_connection(self.connections.reader(), "list_paginated")
1013            .await?;
1014
1015        debug!(page = %query.page, per_page = %query.per_page, "fetching paginated transactions sorted by created_at (newest first)");
1016
1017        // Get all relayer IDs
1018        let relayer_list_key = self.relayer_list_key();
1019        let relayer_ids: Vec<String> = conn
1020            .smembers(&relayer_list_key)
1021            .await
1022            .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;
1023
1024        // Collect all transaction IDs from all relayers using their sorted sets
1025        let mut all_tx_ids = Vec::new();
1026        for relayer_id in relayer_ids {
1027            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
1028            let tx_ids: Vec<String> = redis::cmd("ZRANGE")
1029                .arg(&relayer_sorted_key)
1030                .arg(0)
1031                .arg(-1)
1032                .arg("REV")
1033                .query_async(&mut conn)
1034                .await
1035                .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_sorted"))?;
1036
1037            all_tx_ids.extend(tx_ids);
1038        }
1039
1040        // Release connection before nested call to avoid connection doubling
1041        drop(conn);
1042
1043        // Batch fetch all transactions at once
1044        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
1045        let mut all_transactions = batch_result.results;
1046
1047        // Sort all transactions by created_at (newest first)
1048        all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
1049
1050        let total = all_transactions.len() as u64;
1051        let start = ((query.page - 1) * query.per_page) as usize;
1052        let end = (start + query.per_page as usize).min(all_transactions.len());
1053
1054        if start >= all_transactions.len() {
1055            debug!(page = %query.page, total = %total, "page is beyond available data");
1056            return Ok(PaginatedResult {
1057                items: vec![],
1058                total,
1059                page: query.page,
1060                per_page: query.per_page,
1061            });
1062        }
1063
1064        let items = all_transactions[start..end].to_vec();
1065
1066        debug!(count = %items.len(), page = %query.page, "successfully fetched transactions for page");
1067
1068        Ok(PaginatedResult {
1069            items,
1070            total,
1071            page: query.page,
1072            per_page: query.per_page,
1073        })
1074    }
1075
1076    async fn update(
1077        &self,
1078        id: String,
1079        entity: TransactionRepoModel,
1080    ) -> Result<TransactionRepoModel, RepositoryError> {
1081        if id.is_empty() {
1082            return Err(RepositoryError::InvalidData(
1083                "Transaction ID cannot be empty".to_string(),
1084            ));
1085        }
1086
1087        debug!(tx_id = %id, "updating transaction");
1088
1089        // Get the old transaction for index cleanup
1090        let old_tx = self.get_by_id(id.clone()).await?;
1091
1092        let key = self.tx_key(&entity.relayer_id, &id);
1093        let mut conn = self
1094            .get_connection(self.connections.primary(), "update")
1095            .await?;
1096
1097        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
1098
1099        // Update transaction
1100        let _: () = conn
1101            .set(&key, value)
1102            .await
1103            .map_err(|e| self.map_redis_error(e, "update_transaction"))?;
1104
1105        // Update indexes
1106        self.update_indexes(&entity, Some(&old_tx)).await?;
1107
1108        debug!(tx_id = %id, "successfully updated transaction");
1109        Ok(entity)
1110    }
1111
1112    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
1113        if id.is_empty() {
1114            return Err(RepositoryError::InvalidData(
1115                "Transaction ID cannot be empty".to_string(),
1116            ));
1117        }
1118
1119        debug!(tx_id = %id, "deleting transaction");
1120
1121        // Get transaction first for index cleanup
1122        let tx = self.get_by_id(id.clone()).await?;
1123
1124        let key = self.tx_key(&tx.relayer_id, &id);
1125        let reverse_key = self.tx_to_relayer_key(&id);
1126        let mut conn = self
1127            .get_connection(self.connections.primary(), "delete_by_id")
1128            .await?;
1129
1130        let mut pipe = redis::pipe();
1131        pipe.atomic();
1132        pipe.del(&key);
1133        pipe.del(&reverse_key);
1134
1135        pipe.exec_async(&mut conn)
1136            .await
1137            .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;
1138
1139        // Remove indexes (log errors but don't fail the delete)
1140        if let Err(e) = self.remove_all_indexes(&tx).await {
1141            error!(tx_id = %id, error = %e, "failed to remove indexes for deleted transaction");
1142        }
1143
1144        debug!(tx_id = %id, "successfully deleted transaction");
1145        Ok(())
1146    }
1147
1148    // Unoptimized implementation of count. Rarely used. find_by_relayer_id is preferred.
1149    async fn count(&self) -> Result<usize, RepositoryError> {
1150        let mut conn = self
1151            .get_connection(self.connections.reader(), "count")
1152            .await?;
1153
1154        debug!("counting transactions");
1155
1156        // Get all relayer IDs and sum their sorted set counts
1157        let relayer_list_key = self.relayer_list_key();
1158        let relayer_ids: Vec<String> = conn
1159            .smembers(&relayer_list_key)
1160            .await
1161            .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;
1162
1163        let mut total_count = 0usize;
1164        for relayer_id in relayer_ids {
1165            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
1166            let count: usize = conn
1167                .zcard(&relayer_sorted_key)
1168                .await
1169                .map_err(|e| self.map_redis_error(e, "count_relayer_transactions"))?;
1170            total_count += count;
1171        }
1172
1173        debug!(count = %total_count, "transaction count");
1174        Ok(total_count)
1175    }
1176
1177    async fn has_entries(&self) -> Result<bool, RepositoryError> {
1178        let mut conn = self
1179            .get_connection(self.connections.reader(), "has_entries")
1180            .await?;
1181        let relayer_list_key = self.relayer_list_key();
1182
1183        debug!("checking if transaction entries exist");
1184
1185        let exists: bool = conn
1186            .exists(&relayer_list_key)
1187            .await
1188            .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
1189
1190        debug!(exists = %exists, "transaction entries exist");
1191        Ok(exists)
1192    }
1193
1194    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
1195        let mut conn = self
1196            .get_connection(self.connections.primary(), "drop_all_entries")
1197            .await?;
1198        let relayer_list_key = self.relayer_list_key();
1199
1200        debug!("dropping all transaction entries");
1201
1202        // Get all relayer IDs first
1203        let relayer_ids: Vec<String> = conn
1204            .smembers(&relayer_list_key)
1205            .await
1206            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;
1207
1208        if relayer_ids.is_empty() {
1209            debug!("no transaction entries to drop");
1210            return Ok(());
1211        }
1212
1213        // Use pipeline for atomic operations
1214        let mut pipe = redis::pipe();
1215        pipe.atomic();
1216
1217        // Delete all transactions and their indexes for each relayer
1218        for relayer_id in &relayer_ids {
1219            // Get all transaction IDs for this relayer
1220            let pattern = format!(
1221                "{}:{}:{}:{}:*",
1222                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
1223            );
1224            let mut cursor = 0;
1225            let mut tx_ids = Vec::new();
1226
1227            loop {
1228                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
1229                    .cursor_arg(cursor)
1230                    .arg("MATCH")
1231                    .arg(&pattern)
1232                    .query_async(&mut conn)
1233                    .await
1234                    .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;
1235
1236                // Extract transaction IDs from keys and delete keys
1237                for key in keys {
1238                    pipe.del(&key);
1239                    if let Some(tx_id) = key.split(':').next_back() {
1240                        tx_ids.push(tx_id.to_string());
1241                    }
1242                }
1243
1244                cursor = next_cursor;
1245                if cursor == 0 {
1246                    break;
1247                }
1248            }
1249
1250            // Delete reverse lookup keys and indexes
1251            for tx_id in tx_ids {
1252                let reverse_key = self.tx_to_relayer_key(&tx_id);
1253                pipe.del(&reverse_key);
1254
1255                // Delete status indexes (we can't know the specific status, so we'll clean up all possible ones)
1256                // This ensures complete cleanup even if there are orphaned entries
1257                for status in &[
1258                    TransactionStatus::Canceled,
1259                    TransactionStatus::Pending,
1260                    TransactionStatus::Sent,
1261                    TransactionStatus::Submitted,
1262                    TransactionStatus::Mined,
1263                    TransactionStatus::Confirmed,
1264                    TransactionStatus::Failed,
1265                    TransactionStatus::Expired,
1266                ] {
1267                    // Remove from sorted status set (new format)
1268                    let status_sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1269                    pipe.zrem(&status_sorted_key, &tx_id);
1270
1271                    // Remove from legacy status set (for migration cleanup)
1272                    let status_key = self.relayer_status_key(relayer_id, status);
1273                    pipe.srem(&status_key, &tx_id);
1274                }
1275            }
1276
1277            // Delete the relayer's sorted set by created_at
1278            let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
1279            pipe.del(&relayer_sorted_key);
1280        }
1281
1282        // Delete the relayer list key
1283        pipe.del(&relayer_list_key);
1284
1285        pipe.exec_async(&mut conn)
1286            .await
1287            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
1288
1289        debug!(count = %relayer_ids.len(), "dropped all transaction entries for relayers");
1290        Ok(())
1291    }
1292}
1293
1294#[async_trait]
1295impl TransactionRepository for RedisTransactionRepository {
1296    async fn find_by_relayer_id(
1297        &self,
1298        relayer_id: &str,
1299        query: PaginationQuery,
1300    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1301        let mut conn = self
1302            .get_connection(self.connections.reader(), "find_by_relayer_id")
1303            .await?;
1304
1305        debug!(relayer_id = %relayer_id, page = %query.page, per_page = %query.per_page, "fetching transactions for relayer sorted by created_at (newest first)");
1306
1307        let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
1308
1309        // Get total count from relayer's sorted set
1310        let sorted_set_count: u64 = conn
1311            .zcard(&relayer_sorted_key)
1312            .await
1313            .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_count"))?;
1314
1315        // If sorted set is empty, return empty result immediately
1316        // All new transactions are automatically added to the sorted set
1317        if sorted_set_count == 0 {
1318            debug!(relayer_id = %relayer_id, "no transactions found for relayer (sorted set is empty)");
1319            return Ok(PaginatedResult {
1320                items: vec![],
1321                total: 0,
1322                page: query.page,
1323                per_page: query.per_page,
1324            });
1325        }
1326
1327        let total = sorted_set_count;
1328
1329        // Calculate pagination range (0-indexed for Redis ZRANGE with REV)
1330        let start = ((query.page - 1) * query.per_page) as isize;
1331        let end = start + query.per_page as isize - 1;
1332
1333        if start as u64 >= total {
1334            debug!(relayer_id = %relayer_id, page = %query.page, total = %total, "page is beyond available data");
1335            return Ok(PaginatedResult {
1336                items: vec![],
1337                total,
1338                page: query.page,
1339                per_page: query.per_page,
1340            });
1341        }
1342
1343        // Get page of transaction IDs from sorted set (newest first using ZRANGE with REV)
1344        let page_ids: Vec<String> = redis::cmd("ZRANGE")
1345            .arg(&relayer_sorted_key)
1346            .arg(start)
1347            .arg(end)
1348            .arg("REV")
1349            .query_async(&mut conn)
1350            .await
1351            .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_sorted"))?;
1352
1353        // Release connection before nested call to avoid connection doubling
1354        drop(conn);
1355
1356        let items = self.get_transactions_by_ids(&page_ids).await?;
1357
1358        debug!(relayer_id = %relayer_id, count = %items.results.len(), page = %query.page, "successfully fetched transactions for relayer");
1359
1360        Ok(PaginatedResult {
1361            items: items.results,
1362            total,
1363            page: query.page,
1364            per_page: query.per_page,
1365        })
1366    }
1367
1368    // Unoptimized implementation of find_by_status. Rarely used. find_by_status_paginated is preferred.
1369    async fn find_by_status(
1370        &self,
1371        relayer_id: &str,
1372        statuses: &[TransactionStatus],
1373    ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
1374        // Ensure all status sorted sets are migrated first (releases connection after each)
1375        for status in statuses {
1376            self.ensure_status_sorted_set(relayer_id, status).await?;
1377        }
1378
1379        // Now get a connection and collect all IDs
1380        let mut conn = self
1381            .get_connection(self.connections.reader(), "find_by_status")
1382            .await?;
1383
1384        let mut all_ids: Vec<String> = Vec::new();
1385        for status in statuses {
1386            // Get IDs from sorted set (already ordered by created_at)
1387            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1388            let ids: Vec<String> = redis::cmd("ZRANGE")
1389                .arg(&sorted_key)
1390                .arg(0)
1391                .arg(-1)
1392                .arg("REV") // Newest first
1393                .query_async(&mut conn)
1394                .await
1395                .map_err(|e| self.map_redis_error(e, "find_by_status"))?;
1396
1397            all_ids.extend(ids);
1398        }
1399
1400        // Release connection before nested call to avoid connection doubling
1401        drop(conn);
1402
1403        if all_ids.is_empty() {
1404            return Ok(vec![]);
1405        }
1406
1407        // Remove duplicates (can happen if a transaction is in multiple status sets due to partial failures)
1408        all_ids.sort();
1409        all_ids.dedup();
1410
1411        // Fetch all transactions and sort by created_at (newest first)
1412        let mut transactions = self.get_transactions_by_ids(&all_ids).await?;
1413
1414        // Sort by created_at descending (newest first)
1415        transactions
1416            .results
1417            .sort_by(|a, b| b.created_at.cmp(&a.created_at));
1418
1419        Ok(transactions.results)
1420    }
1421
1422    async fn find_by_status_paginated(
1423        &self,
1424        relayer_id: &str,
1425        statuses: &[TransactionStatus],
1426        query: PaginationQuery,
1427        oldest_first: bool,
1428    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1429        // Ensure all status sorted sets are migrated first (releases connection after each)
1430        for status in statuses {
1431            self.ensure_status_sorted_set(relayer_id, status).await?;
1432        }
1433
1434        let mut conn = self
1435            .get_connection(self.connections.reader(), "find_by_status_paginated")
1436            .await?;
1437
1438        // For single status, we can paginate directly from the sorted set
1439        if statuses.len() == 1 {
1440            let sorted_key = self.relayer_status_sorted_key(relayer_id, &statuses[0]);
1441
1442            // Get total count
1443            let total: u64 = conn
1444                .zcard(&sorted_key)
1445                .await
1446                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_count"))?;
1447
1448            if total == 0 {
1449                return Ok(PaginatedResult {
1450                    items: vec![],
1451                    total: 0,
1452                    page: query.page,
1453                    per_page: query.per_page,
1454                });
1455            }
1456
1457            // Calculate pagination bounds
1458            let start = ((query.page.saturating_sub(1)) * query.per_page) as isize;
1459            let end = start + query.per_page as isize - 1;
1460
1461            // Get page of IDs directly from sorted set
1462            // REV = newest first (descending), no REV = oldest first (ascending)
1463            let mut cmd = redis::cmd("ZRANGE");
1464            cmd.arg(&sorted_key).arg(start).arg(end);
1465            if !oldest_first {
1466                cmd.arg("REV");
1467            }
1468            let page_ids: Vec<String> = cmd
1469                .query_async(&mut conn)
1470                .await
1471                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated"))?;
1472
1473            // Release connection before nested call to avoid connection doubling
1474            drop(conn);
1475
1476            let transactions = self.get_transactions_by_ids(&page_ids).await?;
1477
1478            debug!(
1479                relayer_id = %relayer_id,
1480                status = %statuses[0],
1481                total = %total,
1482                page = %query.page,
1483                page_size = %transactions.results.len(),
1484                "fetched paginated transactions by single status"
1485            );
1486
1487            return Ok(PaginatedResult {
1488                items: transactions.results,
1489                total,
1490                page: query.page,
1491                per_page: query.per_page,
1492            });
1493        }
1494
1495        // For multiple statuses, collect all IDs and merge
1496        let mut all_ids: Vec<(String, f64)> = Vec::new();
1497        for status in statuses {
1498            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1499
1500            // Get IDs with scores for proper sorting
1501            let ids_with_scores: Vec<(String, f64)> = redis::cmd("ZRANGE")
1502                .arg(&sorted_key)
1503                .arg(0)
1504                .arg(-1)
1505                .arg("WITHSCORES")
1506                .query_async(&mut conn)
1507                .await
1508                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_multi"))?;
1509
1510            all_ids.extend(ids_with_scores);
1511        }
1512
1513        // Release connection before nested call to avoid connection doubling
1514        drop(conn);
1515
1516        // Remove duplicates (keep highest/lowest score based on sort order)
1517        let mut id_map: std::collections::HashMap<String, f64> = std::collections::HashMap::new();
1518        for (id, score) in all_ids {
1519            id_map
1520                .entry(id)
1521                .and_modify(|s| {
1522                    // For oldest_first, keep the lowest score; otherwise keep highest
1523                    if oldest_first {
1524                        if score < *s {
1525                            *s = score
1526                        }
1527                    } else if score > *s {
1528                        *s = score
1529                    }
1530                })
1531                .or_insert(score);
1532        }
1533
1534        // Sort by score: descending for newest first, ascending for oldest first
1535        let mut sorted_ids: Vec<(String, f64)> = id_map.into_iter().collect();
1536        if oldest_first {
1537            sorted_ids.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
1538        } else {
1539            sorted_ids.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1540        }
1541
1542        let total = sorted_ids.len() as u64;
1543
1544        if total == 0 {
1545            return Ok(PaginatedResult {
1546                items: vec![],
1547                total: 0,
1548                page: query.page,
1549                per_page: query.per_page,
1550            });
1551        }
1552
1553        // Apply pagination
1554        let start = ((query.page.saturating_sub(1)) * query.per_page) as usize;
1555        let page_ids: Vec<String> = sorted_ids
1556            .into_iter()
1557            .skip(start)
1558            .take(query.per_page as usize)
1559            .map(|(id, _)| id)
1560            .collect();
1561
1562        // Fetch only the transactions for this page
1563        let transactions = self.get_transactions_by_ids(&page_ids).await?;
1564
1565        debug!(
1566            relayer_id = %relayer_id,
1567            total = %total,
1568            page = %query.page,
1569            page_size = %transactions.results.len(),
1570            "fetched paginated transactions by status"
1571        );
1572
1573        Ok(PaginatedResult {
1574            items: transactions.results,
1575            total,
1576            page: query.page,
1577            per_page: query.per_page,
1578        })
1579    }
1580
1581    async fn find_by_nonce(
1582        &self,
1583        relayer_id: &str,
1584        nonce: u64,
1585    ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
1586        let mut conn = self
1587            .get_connection(self.connections.reader(), "find_by_nonce")
1588            .await?;
1589        let nonce_key = self.relayer_nonce_key(relayer_id, nonce);
1590
1591        // Get transaction ID with this nonce for this relayer (should be single value)
1592        let tx_id: Option<String> = conn
1593            .get(nonce_key)
1594            .await
1595            .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;
1596
1597        match tx_id {
1598            Some(tx_id) => {
1599                match self.get_by_id(tx_id.clone()).await {
1600                    Ok(tx) => Ok(Some(tx)),
1601                    Err(RepositoryError::NotFound(_)) => {
1602                        // Transaction was deleted but index wasn't cleaned up
1603                        warn!(relayer_id = %relayer_id, nonce = %nonce, "stale nonce index found for relayer");
1604                        Ok(None)
1605                    }
1606                    Err(e) => Err(e),
1607                }
1608            }
1609            None => Ok(None),
1610        }
1611    }
1612
1613    async fn get_nonce_occupancy(
1614        &self,
1615        relayer_id: &str,
1616        from_nonce: u64,
1617        to_nonce: u64,
1618    ) -> Result<Vec<(u64, Option<TransactionStatus>)>, RepositoryError> {
1619        if from_nonce >= to_nonce {
1620            return Ok(vec![]);
1621        }
1622
1623        let nonces: Vec<u64> = (from_nonce..to_nonce).collect();
1624        let nonce_keys: Vec<String> = nonces
1625            .iter()
1626            .map(|n| self.relayer_nonce_key(relayer_id, *n))
1627            .collect();
1628
1629        // Phase 1: MGET nonce keys → tx_ids (single round trip)
1630        // Uses primary to avoid replica lag fabricating false gaps.
1631        let mut conn = self
1632            .get_connection(self.connections.primary(), "get_nonce_occupancy")
1633            .await?;
1634        let tx_ids: Vec<Option<String>> = redis::cmd("MGET")
1635            .arg(&nonce_keys)
1636            .query_async(&mut conn)
1637            .await
1638            .map_err(|e| self.map_redis_error(e, "get_nonce_occupancy:mget_nonces"))?;
1639
1640        // Build tx data keys for non-None slots. We know the relayer_id so we
1641        // skip the reverse lookup and go straight to the data key.
1642        let mut tx_key_entries: Vec<(usize, String)> = Vec::new();
1643        for (i, tx_id) in tx_ids.iter().enumerate() {
1644            if let Some(id) = tx_id {
1645                tx_key_entries.push((i, self.tx_key(relayer_id, id)));
1646            }
1647        }
1648
1649        // Phase 2: MGET tx data keys → JSON blobs (single round trip)
1650        let tx_statuses: Vec<Option<TransactionStatus>> = if tx_key_entries.is_empty() {
1651            vec![]
1652        } else {
1653            let data_keys: Vec<&str> = tx_key_entries.iter().map(|(_, k)| k.as_str()).collect();
1654            let raw_values: Vec<Option<String>> = redis::cmd("MGET")
1655                .arg(&data_keys)
1656                .query_async(&mut conn)
1657                .await
1658                .map_err(|e| self.map_redis_error(e, "get_nonce_occupancy:mget_txs"))?;
1659
1660            raw_values
1661                .into_iter()
1662                .enumerate()
1663                .map(|(i, v)| {
1664                    v.and_then(|json| {
1665                        match serde_json::from_str::<TransactionRepoModel>(&json) {
1666                            Ok(tx) => Some(tx.status),
1667                            Err(e) => {
1668                                let nonce = tx_key_entries.get(i).map(|(idx, _)| nonces[*idx]);
1669                                warn!(
1670                                    relayer_id = %relayer_id,
1671                                    nonce = ?nonce,
1672                                    error = %e,
1673                                    "get_nonce_occupancy: failed to deserialize transaction, treating as empty"
1674                                );
1675                                None
1676                            }
1677                        }
1678                    })
1679                })
1680                .collect()
1681        };
1682
1683        // Assemble results
1684        let mut results: Vec<(u64, Option<TransactionStatus>)> =
1685            nonces.iter().map(|n| (*n, None)).collect();
1686
1687        for (idx, (original_idx, _)) in tx_key_entries.iter().enumerate() {
1688            if let Some(status) = tx_statuses.get(idx).and_then(|s| s.clone()) {
1689                results[*original_idx].1 = Some(status);
1690            }
1691        }
1692
1693        Ok(results)
1694    }
1695
1696    async fn update_status(
1697        &self,
1698        tx_id: String,
1699        status: TransactionStatus,
1700    ) -> Result<TransactionRepoModel, RepositoryError> {
1701        let update = TransactionUpdateRequest {
1702            status: Some(status),
1703            ..Default::default()
1704        };
1705        self.partial_update(tx_id, update).await
1706    }
1707
1708    async fn partial_update(
1709        &self,
1710        tx_id: String,
1711        update: TransactionUpdateRequest,
1712    ) -> Result<TransactionRepoModel, RepositoryError> {
1713        // Serialize only the non-None fields as a JSON patch.
1714        let patch_json = serde_json::to_string(&update).map_err(|e| {
1715            RepositoryError::InvalidData(format!("Failed to serialize update patch: {e}"))
1716        })?;
1717
1718        // If the update sets a final status, compute delete_at in Rust (depends on server config)
1719        // and include it in the patch so the Lua script applies it atomically.
1720        let delete_at_value = if let Some(ref status) = update.status {
1721            if FINAL_TRANSACTION_STATUSES.contains(status) {
1722                let expiration_hours = ServerConfig::get_transaction_expiration_hours();
1723                let seconds = (expiration_hours * 3600.0) as i64;
1724                let delete_time = Utc::now() + chrono::Duration::seconds(seconds);
1725                Some(delete_time.to_rfc3339())
1726            } else {
1727                None
1728            }
1729        } else {
1730            None
1731        };
1732        let delete_at_arg = delete_at_value.as_deref().unwrap_or("");
1733
1734        let (lookup_key, key_prefix, key_suffix) = self.tx_key_parts(&tx_id);
1735
1736        // Lua script: atomically applies a JSON patch to the stored transaction.
1737        // Guards: rejects status changes on already-finalized transactions.
1738        // Returns a two-element array {old_json, new_json} so Rust has the full
1739        // pre-update state for index cleanup and metrics.
1740        // Returns false if tx not found.
1741        let patch_script = Script::new(
1742            r#"
1743            local relayer_id = redis.call('GET', KEYS[1])
1744            if not relayer_id then return false end
1745
1746            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1747            local current = redis.call('GET', tx_key)
1748            if not current then return false end
1749
1750            local tx = cjson.decode(current)
1751            local patch = cjson.decode(ARGV[3])
1752
1753            -- Guard: reject status changes on finalized transactions.
1754            -- A stale worker must not resurrect a tx that another worker
1755            -- already moved to a terminal state.
1756            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1757            if final_states[tx["status"]] and patch["status"] then
1758                return {current, current}
1759            end
1760
1761            local old_snapshot = current
1762
1763            -- lua-cjson cannot distinguish empty Lua tables from empty
1764            -- arrays, so a decode/encode round-trip turns [] into {}.
1765            -- Record which keys held [] in the stored doc and the patch
1766            -- so we can restore them after cjson.encode.
1767            -- NOTE: this relies on each array-typed field having a unique key
1768            -- name across the entire JSON document (including nested objects).
1769            -- If the model ever introduces duplicate key names at different
1770            -- nesting levels (e.g. metadata.hashes), the gsub below could
1771            -- restore the wrong occurrence.
1772            local empty_arrs = {}
1773            for k in string.gmatch(current, '"([^"]+)"%s*:%s*%[%s*%]') do
1774                empty_arrs[k] = true
1775            end
1776            for k in string.gmatch(ARGV[3], '"([^"]+)"%s*:%s*%[%s*%]') do
1777                empty_arrs[k] = true
1778            end
1779
1780            for k, v in pairs(patch) do
1781                tx[k] = v
1782            end
1783
1784            -- Apply delete_at if transitioning to a final state and not already set
1785            if ARGV[4] ~= '' and (not tx["delete_at"] or tx["delete_at"] == cjson.null) then
1786                tx["delete_at"] = ARGV[4]
1787            end
1788
1789            local updated = cjson.encode(tx)
1790
1791            -- Restore empty arrays that cjson.encode converted to {}
1792            for k, _ in pairs(empty_arrs) do
1793                updated = string.gsub(
1794                    updated, '"'..k..'"%s*:%s*{}', '"'..k..'":[]', 1
1795                )
1796            end
1797
1798            redis.call('SET', tx_key, updated)
1799            return {old_snapshot, updated}
1800            "#,
1801        );
1802
1803        let result: Option<Vec<String>> = self
1804            .run_script_with_retry_vec(
1805                &patch_script,
1806                &lookup_key,
1807                &key_prefix,
1808                &key_suffix,
1809                &[&patch_json, delete_at_arg],
1810                "partial_update",
1811            )
1812            .await?;
1813
1814        let parts = result.ok_or_else(|| {
1815            RepositoryError::NotFound(format!("Transaction with ID {tx_id} not found"))
1816        })?;
1817
1818        if parts.len() != 2 {
1819            return Err(RepositoryError::UnexpectedError(format!(
1820                "partial_update script returned {} elements, expected 2",
1821                parts.len()
1822            )));
1823        }
1824
1825        let old_json = &parts[0];
1826        let new_json = &parts[1];
1827
1828        let original_tx =
1829            self.deserialize_entity::<TransactionRepoModel>(old_json, &tx_id, "transaction")?;
1830        let updated_tx =
1831            self.deserialize_entity::<TransactionRepoModel>(new_json, &tx_id, "transaction")?;
1832
1833        // Update indexes using the full pre-update state (status, network_data, nonce, etc.)
1834        self.update_indexes(&updated_tx, Some(&original_tx)).await?;
1835
1836        debug!(tx_id = %tx_id, "successfully updated transaction via patch");
1837
1838        // Track metrics only when the persisted status actually changed.
1839        // The Lua script may silently reject a status patch on already-final
1840        // transactions, so we compare the deserialized before/after states.
1841        if original_tx.status != updated_tx.status {
1842            self.track_status_change_metrics(
1843                &original_tx,
1844                &updated_tx,
1845                &original_tx.status,
1846                &updated_tx.status,
1847            );
1848        }
1849
1850        Ok(updated_tx)
1851    }
1852
1853    async fn update_network_data(
1854        &self,
1855        tx_id: String,
1856        network_data: NetworkTransactionData,
1857    ) -> Result<TransactionRepoModel, RepositoryError> {
1858        let update = TransactionUpdateRequest {
1859            network_data: Some(network_data),
1860            ..Default::default()
1861        };
1862        self.partial_update(tx_id, update).await
1863    }
1864
1865    async fn set_sent_at(
1866        &self,
1867        tx_id: String,
1868        sent_at: String,
1869    ) -> Result<TransactionRepoModel, RepositoryError> {
1870        let update = TransactionUpdateRequest {
1871            sent_at: Some(sent_at),
1872            ..Default::default()
1873        };
1874        self.partial_update(tx_id, update).await
1875    }
1876
1877    async fn increment_status_check_failures(
1878        &self,
1879        tx_id: String,
1880    ) -> Result<TransactionRepoModel, RepositoryError> {
1881        self.run_atomic_script(
1882            r#"
1883            local function set_obj(json, key, tbl)
1884                local enc = cjson.encode(tbl)
1885                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
1886                if n > 0 then return r end
1887                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1888                if n > 0 then return r end
1889                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1890            end
1891
1892            local relayer_id = redis.call('GET', KEYS[1])
1893            if not relayer_id then return false end
1894
1895            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1896            local current = redis.call('GET', tx_key)
1897            if not current then return false end
1898
1899            local tx = cjson.decode(current)
1900            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1901            if final_states[tx["status"]] then return current end
1902
1903            local metadata = tx["metadata"]
1904            if type(metadata) ~= 'table' then metadata = {} end
1905            metadata["consecutive_failures"] = (metadata["consecutive_failures"] or 0) + 1
1906            metadata["total_failures"] = (metadata["total_failures"] or 0) + 1
1907
1908            local updated = set_obj(current, "metadata", metadata)
1909            redis.call('SET', tx_key, updated)
1910            return updated
1911            "#,
1912            &tx_id,
1913            &[],
1914            "increment_status_check_failures",
1915        )
1916        .await
1917    }
1918
1919    async fn reset_status_check_consecutive_failures(
1920        &self,
1921        tx_id: String,
1922    ) -> Result<TransactionRepoModel, RepositoryError> {
1923        self.run_atomic_script(
1924            r#"
1925            local function set_obj(json, key, tbl)
1926                local enc = cjson.encode(tbl)
1927                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
1928                if n > 0 then return r end
1929                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1930                if n > 0 then return r end
1931                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1932            end
1933
1934            local relayer_id = redis.call('GET', KEYS[1])
1935            if not relayer_id then return false end
1936
1937            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1938            local current = redis.call('GET', tx_key)
1939            if not current then return false end
1940
1941            local tx = cjson.decode(current)
1942            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1943            if final_states[tx["status"]] then return current end
1944
1945            local metadata = tx["metadata"]
1946            if type(metadata) ~= 'table' then metadata = {} end
1947            metadata["consecutive_failures"] = 0
1948
1949            local updated = set_obj(current, "metadata", metadata)
1950            redis.call('SET', tx_key, updated)
1951            return updated
1952            "#,
1953            &tx_id,
1954            &[],
1955            "reset_status_check_consecutive_failures",
1956        )
1957        .await
1958    }
1959
1960    async fn record_stellar_insufficient_fee_retry(
1961        &self,
1962        tx_id: String,
1963        sent_at: String,
1964    ) -> Result<TransactionRepoModel, RepositoryError> {
1965        self.run_atomic_script(
1966            r#"
1967            local function set_str(json, key, val)
1968                local enc = cjson.encode(val)
1969                local r, n = string.gsub(json, '"'..key..'"%s*:%s*"[^"]*"', '"'..key..'":'..enc, 1)
1970                if n > 0 then return r end
1971                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1972                if n > 0 then return r end
1973                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1974            end
1975            local function set_obj(json, key, tbl)
1976                local enc = cjson.encode(tbl)
1977                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
1978                if n > 0 then return r end
1979                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1980                if n > 0 then return r end
1981                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1982            end
1983
1984            local relayer_id = redis.call('GET', KEYS[1])
1985            if not relayer_id then return false end
1986
1987            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1988            local current = redis.call('GET', tx_key)
1989            if not current then return false end
1990
1991            local tx = cjson.decode(current)
1992            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1993            if final_states[tx["status"]] then return current end
1994
1995            local metadata = tx["metadata"]
1996            if type(metadata) ~= 'table' then metadata = {} end
1997            metadata["insufficient_fee_retries"] = (metadata["insufficient_fee_retries"] or 0) + 1
1998
1999            local updated = set_str(current, "sent_at", ARGV[3])
2000            updated = set_obj(updated, "metadata", metadata)
2001            redis.call('SET', tx_key, updated)
2002            return updated
2003            "#,
2004            &tx_id,
2005            &[&sent_at],
2006            "record_stellar_insufficient_fee_retry",
2007        )
2008        .await
2009    }
2010
2011    async fn record_stellar_try_again_later_retry(
2012        &self,
2013        tx_id: String,
2014        sent_at: String,
2015    ) -> Result<TransactionRepoModel, RepositoryError> {
2016        self.run_atomic_script(
2017            r#"
2018            local function set_str(json, key, val)
2019                local enc = cjson.encode(val)
2020                local r, n = string.gsub(json, '"'..key..'"%s*:%s*"[^"]*"', '"'..key..'":'..enc, 1)
2021                if n > 0 then return r end
2022                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
2023                if n > 0 then return r end
2024                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
2025            end
2026            local function set_obj(json, key, tbl)
2027                local enc = cjson.encode(tbl)
2028                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
2029                if n > 0 then return r end
2030                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
2031                if n > 0 then return r end
2032                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
2033            end
2034
2035            local relayer_id = redis.call('GET', KEYS[1])
2036            if not relayer_id then return false end
2037
2038            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
2039            local current = redis.call('GET', tx_key)
2040            if not current then return false end
2041
2042            local tx = cjson.decode(current)
2043            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
2044            if final_states[tx["status"]] then return current end
2045
2046            local metadata = tx["metadata"]
2047            if type(metadata) ~= 'table' then metadata = {} end
2048            metadata["try_again_later_retries"] = (metadata["try_again_later_retries"] or 0) + 1
2049
2050            local updated = set_str(current, "sent_at", ARGV[3])
2051            updated = set_obj(updated, "metadata", metadata)
2052            redis.call('SET', tx_key, updated)
2053            return updated
2054            "#,
2055            &tx_id,
2056            &[&sent_at],
2057            "record_stellar_try_again_later_retry",
2058        )
2059        .await
2060    }
2061
2062    async fn set_confirmed_at(
2063        &self,
2064        tx_id: String,
2065        confirmed_at: String,
2066    ) -> Result<TransactionRepoModel, RepositoryError> {
2067        let update = TransactionUpdateRequest {
2068            confirmed_at: Some(confirmed_at),
2069            ..Default::default()
2070        };
2071        self.partial_update(tx_id, update).await
2072    }
2073
2074    /// Count transactions by status using Redis ZCARD (O(1) per sorted set).
2075    /// Much more efficient than find_by_status when you only need the count.
2076    /// Triggers migration from legacy SETs if needed.
2077    async fn count_by_status(
2078        &self,
2079        relayer_id: &str,
2080        statuses: &[TransactionStatus],
2081    ) -> Result<u64, RepositoryError> {
2082        let mut conn = self
2083            .get_connection(self.connections.reader(), "count_by_status")
2084            .await?;
2085        let mut total_count: u64 = 0;
2086
2087        for status in statuses {
2088            // Ensure sorted set is migrated
2089            self.ensure_status_sorted_set(relayer_id, status).await?;
2090
2091            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
2092            let count: u64 = conn
2093                .zcard(&sorted_key)
2094                .await
2095                .map_err(|e| self.map_redis_error(e, "count_by_status"))?;
2096            total_count += count;
2097        }
2098
2099        debug!(relayer_id = %relayer_id, count = %total_count, "counted transactions by status");
2100        Ok(total_count)
2101    }
2102
2103    async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError> {
2104        if ids.is_empty() {
2105            debug!("no transaction IDs provided for batch delete");
2106            return Ok(BatchDeleteResult::default());
2107        }
2108
2109        debug!(count = %ids.len(), "batch deleting transactions by IDs (with fetch)");
2110
2111        // Fetch transactions to get their data for index cleanup
2112        let batch_result = self.get_transactions_by_ids(&ids).await?;
2113
2114        // Convert to delete requests
2115        let requests: Vec<TransactionDeleteRequest> = batch_result
2116            .results
2117            .iter()
2118            .map(|tx| TransactionDeleteRequest {
2119                id: tx.id.clone(),
2120                relayer_id: tx.relayer_id.clone(),
2121                nonce: self.extract_nonce(&tx.network_data),
2122            })
2123            .collect();
2124
2125        // Track IDs that weren't found
2126        let mut result = self.delete_by_requests(requests).await?;
2127
2128        // Add the IDs that weren't found during fetch
2129        for id in batch_result.failed_ids {
2130            result
2131                .failed
2132                .push((id.clone(), format!("Transaction with ID {id} not found")));
2133        }
2134
2135        Ok(result)
2136    }
2137
2138    async fn delete_by_requests(
2139        &self,
2140        requests: Vec<TransactionDeleteRequest>,
2141    ) -> Result<BatchDeleteResult, RepositoryError> {
2142        if requests.is_empty() {
2143            debug!("no delete requests provided for batch delete");
2144            return Ok(BatchDeleteResult::default());
2145        }
2146
2147        debug!(count = %requests.len(), "batch deleting transactions by requests (no fetch)");
2148        let mut conn = self
2149            .get_connection(self.connections.primary(), "batch_delete_no_fetch")
2150            .await?;
2151        let mut pipe = redis::pipe();
2152        pipe.atomic();
2153
2154        // All possible statuses for index cleanup
2155        let all_statuses = [
2156            TransactionStatus::Canceled,
2157            TransactionStatus::Pending,
2158            TransactionStatus::Sent,
2159            TransactionStatus::Submitted,
2160            TransactionStatus::Mined,
2161            TransactionStatus::Confirmed,
2162            TransactionStatus::Failed,
2163            TransactionStatus::Expired,
2164        ];
2165
2166        // Build pipeline for all deletions and index removals
2167        for req in &requests {
2168            // Delete transaction data
2169            let tx_key = self.tx_key(&req.relayer_id, &req.id);
2170            pipe.del(&tx_key);
2171
2172            // Delete reverse lookup
2173            let reverse_key = self.tx_to_relayer_key(&req.id);
2174            pipe.del(&reverse_key);
2175
2176            // Remove from all possible status indexes
2177            for status in &all_statuses {
2178                let status_sorted_key = self.relayer_status_sorted_key(&req.relayer_id, status);
2179                pipe.zrem(&status_sorted_key, &req.id);
2180
2181                let status_legacy_key = self.relayer_status_key(&req.relayer_id, status);
2182                pipe.srem(&status_legacy_key, &req.id);
2183            }
2184
2185            // Remove nonce index if exists
2186            if let Some(nonce) = req.nonce {
2187                let nonce_key = self.relayer_nonce_key(&req.relayer_id, nonce);
2188                pipe.del(&nonce_key);
2189            }
2190
2191            // Remove from per-relayer sorted set by created_at
2192            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&req.relayer_id);
2193            pipe.zrem(&relayer_sorted_key, &req.id);
2194        }
2195
2196        // Execute the entire pipeline in one round-trip
2197        match pipe.exec_async(&mut conn).await {
2198            Ok(_) => {
2199                let deleted_count = requests.len();
2200                debug!(
2201                    deleted_count = %deleted_count,
2202                    "batch delete completed"
2203                );
2204                Ok(BatchDeleteResult {
2205                    deleted_count,
2206                    failed: vec![],
2207                })
2208            }
2209            Err(e) => {
2210                error!(error = %e, "batch delete pipeline failed");
2211                // Mark all requests as failed
2212                let failed: Vec<(String, String)> = requests
2213                    .iter()
2214                    .map(|req| (req.id.clone(), format!("Redis pipeline error: {e}")))
2215                    .collect();
2216                Ok(BatchDeleteResult {
2217                    deleted_count: 0,
2218                    failed,
2219                })
2220            }
2221        }
2222    }
2223}
2224
2225#[cfg(test)]
2226mod tests {
2227    use super::*;
2228    use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
2229    use alloy::primitives::U256;
2230    use deadpool_redis::{Config, Runtime};
2231    use lazy_static::lazy_static;
2232    use std::str::FromStr;
2233    use tokio;
2234    use uuid::Uuid;
2235
2236    use tokio::sync::Mutex;
2237
2238    // Use a mutex to ensure tests don't run in parallel when modifying env vars
2239    lazy_static! {
2240        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
2241    }
2242
2243    // Helper function to create test transactions
2244    fn create_test_transaction(id: &str) -> TransactionRepoModel {
2245        TransactionRepoModel {
2246            id: id.to_string(),
2247            relayer_id: "relayer-1".to_string(),
2248            status: TransactionStatus::Pending,
2249            status_reason: None,
2250            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
2251            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
2252            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
2253            valid_until: None,
2254            delete_at: None,
2255            network_type: NetworkType::Evm,
2256            priced_at: None,
2257            hashes: vec![],
2258            network_data: NetworkTransactionData::Evm(EvmTransactionData {
2259                gas_price: Some(1000000000),
2260                gas_limit: Some(21000),
2261                nonce: Some(1),
2262                value: U256::from_str("1000000000000000000").unwrap(),
2263                data: Some("0x".to_string()),
2264                from: "0xSender".to_string(),
2265                to: Some("0xRecipient".to_string()),
2266                chain_id: 1,
2267                signature: None,
2268                hash: Some(format!("0x{id}")),
2269                speed: Some(Speed::Fast),
2270                max_fee_per_gas: None,
2271                max_priority_fee_per_gas: None,
2272                raw: None,
2273            }),
2274            noop_count: None,
2275            is_canceled: Some(false),
2276            metadata: None,
2277        }
2278    }
2279
2280    fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
2281        let mut tx = create_test_transaction(id);
2282        tx.relayer_id = relayer_id.to_string();
2283        tx
2284    }
2285
2286    fn create_test_transaction_with_status(
2287        id: &str,
2288        relayer_id: &str,
2289        status: TransactionStatus,
2290    ) -> TransactionRepoModel {
2291        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
2292        tx.status = status;
2293        tx
2294    }
2295
2296    fn create_test_transaction_with_nonce(
2297        id: &str,
2298        nonce: u64,
2299        relayer_id: &str,
2300    ) -> TransactionRepoModel {
2301        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
2302        if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
2303            evm_data.nonce = Some(nonce);
2304        }
2305        tx
2306    }
2307
2308    async fn setup_test_repo() -> RedisTransactionRepository {
2309        // Use a mock Redis URL - in real integration tests, this would connect to a test Redis instance
2310        let redis_url = std::env::var("REDIS_TEST_URL")
2311            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
2312
2313        let cfg = Config::from_url(&redis_url);
2314        let pool = Arc::new(
2315            cfg.builder()
2316                .expect("Failed to create pool builder")
2317                .max_size(16)
2318                .runtime(Runtime::Tokio1)
2319                .build()
2320                .expect("Failed to build Redis pool"),
2321        );
2322
2323        // Create RedisConnections with same pool for both primary and reader (for testing)
2324        let connections = Arc::new(RedisConnections::new_single_pool(pool));
2325
2326        let random_id = Uuid::new_v4().to_string();
2327        let key_prefix = format!("test_prefix:{random_id}");
2328
2329        RedisTransactionRepository::new(connections, key_prefix)
2330            .expect("Failed to create RedisTransactionRepository")
2331    }
2332
2333    #[tokio::test]
2334    #[ignore = "Requires active Redis instance"]
2335    async fn test_new_repository_creation() {
2336        let repo = setup_test_repo().await;
2337        assert!(repo.key_prefix.contains("test_prefix"));
2338    }
2339
2340    #[tokio::test]
2341    #[ignore = "Requires active Redis instance"]
2342    async fn test_new_repository_empty_prefix_fails() {
2343        let redis_url = std::env::var("REDIS_TEST_URL")
2344            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
2345        let cfg = Config::from_url(&redis_url);
2346        let pool = Arc::new(
2347            cfg.builder()
2348                .expect("Failed to create pool builder")
2349                .max_size(16)
2350                .runtime(Runtime::Tokio1)
2351                .build()
2352                .expect("Failed to build Redis pool"),
2353        );
2354        let connections = Arc::new(RedisConnections::new_single_pool(pool));
2355
2356        let result = RedisTransactionRepository::new(connections, "".to_string());
2357        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
2358    }
2359
2360    #[tokio::test]
2361    #[ignore = "Requires active Redis instance"]
2362    async fn test_key_generation() {
2363        let repo = setup_test_repo().await;
2364
2365        assert!(repo
2366            .tx_key("relayer-1", "test-id")
2367            .contains(":relayer:relayer-1:tx:test-id"));
2368        assert!(repo
2369            .tx_to_relayer_key("test-id")
2370            .contains(":relayer:tx_to_relayer:test-id"));
2371        assert!(repo.relayer_list_key().contains(":relayer_list"));
2372        assert!(repo
2373            .relayer_status_key("relayer-1", &TransactionStatus::Pending)
2374            .contains(":relayer:relayer-1:status:Pending"));
2375        assert!(repo
2376            .relayer_nonce_key("relayer-1", 42)
2377            .contains(":relayer:relayer-1:nonce:42"));
2378    }
2379
2380    #[tokio::test]
2381    #[ignore = "Requires active Redis instance"]
2382    async fn test_serialize_deserialize_transaction() {
2383        let repo = setup_test_repo().await;
2384        let tx = create_test_transaction("test-1");
2385
2386        let serialized = repo
2387            .serialize_entity(&tx, |t| &t.id, "transaction")
2388            .expect("Serialization should succeed");
2389        let deserialized: TransactionRepoModel = repo
2390            .deserialize_entity(&serialized, "test-1", "transaction")
2391            .expect("Deserialization should succeed");
2392
2393        assert_eq!(tx.id, deserialized.id);
2394        assert_eq!(tx.relayer_id, deserialized.relayer_id);
2395        assert_eq!(tx.status, deserialized.status);
2396    }
2397
2398    #[tokio::test]
2399    #[ignore = "Requires active Redis instance"]
2400    async fn test_extract_nonce() {
2401        let repo = setup_test_repo().await;
2402        let random_id = Uuid::new_v4().to_string();
2403        let relayer_id = Uuid::new_v4().to_string();
2404        let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2405
2406        let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
2407        assert_eq!(nonce, Some(42));
2408    }
2409
2410    #[tokio::test]
2411    #[ignore = "Requires active Redis instance"]
2412    async fn test_create_transaction() {
2413        let repo = setup_test_repo().await;
2414        let random_id = Uuid::new_v4().to_string();
2415        let tx = create_test_transaction(&random_id);
2416
2417        let result = repo.create(tx.clone()).await.unwrap();
2418        assert_eq!(result.id, tx.id);
2419    }
2420
2421    #[tokio::test]
2422    #[ignore = "Requires active Redis instance"]
2423    async fn test_get_transaction() {
2424        let repo = setup_test_repo().await;
2425        let random_id = Uuid::new_v4().to_string();
2426        let tx = create_test_transaction(&random_id);
2427
2428        repo.create(tx.clone()).await.unwrap();
2429        let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
2430        assert_eq!(stored.id, tx.id);
2431        assert_eq!(stored.relayer_id, tx.relayer_id);
2432    }
2433
2434    #[tokio::test]
2435    #[ignore = "Requires active Redis instance"]
2436    async fn test_update_transaction() {
2437        let repo = setup_test_repo().await;
2438        let random_id = Uuid::new_v4().to_string();
2439        let mut tx = create_test_transaction(&random_id);
2440
2441        repo.create(tx.clone()).await.unwrap();
2442        tx.status = TransactionStatus::Confirmed;
2443
2444        let updated = repo.update(random_id.to_string(), tx).await.unwrap();
2445        assert!(matches!(updated.status, TransactionStatus::Confirmed));
2446    }
2447
2448    #[tokio::test]
2449    #[ignore = "Requires active Redis instance"]
2450    async fn test_delete_transaction() {
2451        let repo = setup_test_repo().await;
2452        let random_id = Uuid::new_v4().to_string();
2453        let tx = create_test_transaction(&random_id);
2454
2455        repo.create(tx).await.unwrap();
2456        repo.delete_by_id(random_id.to_string()).await.unwrap();
2457
2458        let result = repo.get_by_id(random_id.to_string()).await;
2459        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2460    }
2461
2462    #[tokio::test]
2463    #[ignore = "Requires active Redis instance"]
2464    async fn test_list_all_transactions() {
2465        let repo = setup_test_repo().await;
2466        let random_id = Uuid::new_v4().to_string();
2467        let random_id2 = Uuid::new_v4().to_string();
2468
2469        let tx1 = create_test_transaction(&random_id);
2470        let tx2 = create_test_transaction(&random_id2);
2471
2472        repo.create(tx1).await.unwrap();
2473        repo.create(tx2).await.unwrap();
2474
2475        let transactions = repo.list_all().await.unwrap();
2476        assert!(transactions.len() >= 2);
2477    }
2478
2479    #[tokio::test]
2480    #[ignore = "Requires active Redis instance"]
2481    async fn test_count_transactions() {
2482        let repo = setup_test_repo().await;
2483        let random_id = Uuid::new_v4().to_string();
2484        let tx = create_test_transaction(&random_id);
2485
2486        let count = repo.count().await.unwrap();
2487        repo.create(tx).await.unwrap();
2488        assert!(repo.count().await.unwrap() > count);
2489    }
2490
2491    #[tokio::test]
2492    #[ignore = "Requires active Redis instance"]
2493    async fn test_get_nonexistent_transaction() {
2494        let repo = setup_test_repo().await;
2495        let result = repo.get_by_id("nonexistent".to_string()).await;
2496        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2497    }
2498
2499    #[tokio::test]
2500    #[ignore = "Requires active Redis instance"]
2501    async fn test_duplicate_transaction_creation() {
2502        let repo = setup_test_repo().await;
2503        let random_id = Uuid::new_v4().to_string();
2504
2505        let tx = create_test_transaction(&random_id);
2506
2507        repo.create(tx.clone()).await.unwrap();
2508        let result = repo.create(tx).await;
2509
2510        assert!(matches!(
2511            result,
2512            Err(RepositoryError::ConstraintViolation(_))
2513        ));
2514    }
2515
2516    #[tokio::test]
2517    #[ignore = "Requires active Redis instance"]
2518    async fn test_update_nonexistent_transaction() {
2519        let repo = setup_test_repo().await;
2520        let tx = create_test_transaction("test-1");
2521
2522        let result = repo.update("nonexistent".to_string(), tx).await;
2523        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2524    }
2525
2526    #[tokio::test]
2527    #[ignore = "Requires active Redis instance"]
2528    async fn test_list_paginated() {
2529        let repo = setup_test_repo().await;
2530
2531        // Create multiple transactions
2532        for _ in 1..=10 {
2533            let random_id = Uuid::new_v4().to_string();
2534            let tx = create_test_transaction(&random_id);
2535            repo.create(tx).await.unwrap();
2536        }
2537
2538        // Test first page with 3 items per page
2539        let query = PaginationQuery {
2540            page: 1,
2541            per_page: 3,
2542        };
2543        let result = repo.list_paginated(query).await.unwrap();
2544        assert_eq!(result.items.len(), 3);
2545        assert!(result.total >= 10);
2546        assert_eq!(result.page, 1);
2547        assert_eq!(result.per_page, 3);
2548
2549        // Test empty page (beyond total items)
2550        let query = PaginationQuery {
2551            page: 1000,
2552            per_page: 3,
2553        };
2554        let result = repo.list_paginated(query).await.unwrap();
2555        assert_eq!(result.items.len(), 0);
2556    }
2557
2558    #[tokio::test]
2559    #[ignore = "Requires active Redis instance"]
2560    async fn test_find_by_relayer_id() {
2561        let repo = setup_test_repo().await;
2562        let random_id = Uuid::new_v4().to_string();
2563        let random_id2 = Uuid::new_v4().to_string();
2564        let random_id3 = Uuid::new_v4().to_string();
2565
2566        let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
2567        let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
2568        let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");
2569
2570        repo.create(tx1).await.unwrap();
2571        repo.create(tx2).await.unwrap();
2572        repo.create(tx3).await.unwrap();
2573
2574        // Test finding transactions for relayer-1
2575        let query = PaginationQuery {
2576            page: 1,
2577            per_page: 10,
2578        };
2579        let result = repo
2580            .find_by_relayer_id("relayer-1", query.clone())
2581            .await
2582            .unwrap();
2583        assert!(result.total >= 2);
2584        assert!(result.items.len() >= 2);
2585        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
2586
2587        // Test finding transactions for relayer-2
2588        let result = repo
2589            .find_by_relayer_id("relayer-2", query.clone())
2590            .await
2591            .unwrap();
2592        assert!(result.total >= 1);
2593        assert!(!result.items.is_empty());
2594        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
2595
2596        // Test finding transactions for non-existent relayer
2597        let result = repo
2598            .find_by_relayer_id("non-existent", query.clone())
2599            .await
2600            .unwrap();
2601        assert_eq!(result.total, 0);
2602        assert_eq!(result.items.len(), 0);
2603    }
2604
2605    #[tokio::test]
2606    #[ignore = "Requires active Redis instance"]
2607    async fn test_find_by_relayer_id_sorted_by_created_at_newest_first() {
2608        let repo = setup_test_repo().await;
2609        let relayer_id = Uuid::new_v4().to_string();
2610
2611        // Create transactions with different created_at timestamps
2612        let mut tx1 = create_test_transaction_with_relayer("test-1", &relayer_id);
2613        tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); // Oldest
2614
2615        let mut tx2 = create_test_transaction_with_relayer("test-2", &relayer_id);
2616        tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); // Middle
2617
2618        let mut tx3 = create_test_transaction_with_relayer("test-3", &relayer_id);
2619        tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); // Newest
2620
2621        // Create transactions in non-chronological order to ensure sorting works
2622        repo.create(tx2.clone()).await.unwrap(); // Middle first
2623        repo.create(tx1.clone()).await.unwrap(); // Oldest second
2624        repo.create(tx3.clone()).await.unwrap(); // Newest last
2625
2626        let query = PaginationQuery {
2627            page: 1,
2628            per_page: 10,
2629        };
2630        let result = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
2631
2632        assert_eq!(result.total, 3);
2633        assert_eq!(result.items.len(), 3);
2634
2635        // Verify transactions are sorted by created_at descending (newest first)
2636        assert_eq!(
2637            result.items[0].id, "test-3",
2638            "First item should be newest (test-3)"
2639        );
2640        assert_eq!(
2641            result.items[0].created_at,
2642            "2025-01-27T14:00:00.000000+00:00"
2643        );
2644
2645        assert_eq!(
2646            result.items[1].id, "test-2",
2647            "Second item should be middle (test-2)"
2648        );
2649        assert_eq!(
2650            result.items[1].created_at,
2651            "2025-01-27T12:00:00.000000+00:00"
2652        );
2653
2654        assert_eq!(
2655            result.items[2].id, "test-1",
2656            "Third item should be oldest (test-1)"
2657        );
2658        assert_eq!(
2659            result.items[2].created_at,
2660            "2025-01-27T10:00:00.000000+00:00"
2661        );
2662    }
2663
2664    #[tokio::test]
2665    #[ignore = "Requires active Redis instance"]
2666    async fn test_find_by_status() {
2667        let repo = setup_test_repo().await;
2668        let random_id = Uuid::new_v4().to_string();
2669        let random_id2 = Uuid::new_v4().to_string();
2670        let random_id3 = Uuid::new_v4().to_string();
2671        let relayer_id = Uuid::new_v4().to_string();
2672        let tx1 = create_test_transaction_with_status(
2673            &random_id,
2674            &relayer_id,
2675            TransactionStatus::Pending,
2676        );
2677        let tx2 =
2678            create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
2679        let tx3 = create_test_transaction_with_status(
2680            &random_id3,
2681            &relayer_id,
2682            TransactionStatus::Confirmed,
2683        );
2684
2685        repo.create(tx1).await.unwrap();
2686        repo.create(tx2).await.unwrap();
2687        repo.create(tx3).await.unwrap();
2688
2689        // Test finding pending transactions
2690        let result = repo
2691            .find_by_status(&relayer_id, &[TransactionStatus::Pending])
2692            .await
2693            .unwrap();
2694        assert_eq!(result.len(), 1);
2695        assert_eq!(result[0].status, TransactionStatus::Pending);
2696
2697        // Test finding multiple statuses
2698        let result = repo
2699            .find_by_status(
2700                &relayer_id,
2701                &[TransactionStatus::Pending, TransactionStatus::Sent],
2702            )
2703            .await
2704            .unwrap();
2705        assert_eq!(result.len(), 2);
2706
2707        // Test finding non-existent status
2708        let result = repo
2709            .find_by_status(&relayer_id, &[TransactionStatus::Failed])
2710            .await
2711            .unwrap();
2712        assert_eq!(result.len(), 0);
2713    }
2714
2715    #[tokio::test]
2716    #[ignore = "Requires active Redis instance"]
2717    async fn test_find_by_status_paginated() {
2718        let repo = setup_test_repo().await;
2719        let relayer_id = Uuid::new_v4().to_string();
2720
2721        // Create 5 pending transactions with different timestamps
2722        for i in 1..=5 {
2723            let tx_id = Uuid::new_v4().to_string();
2724            let mut tx = create_test_transaction_with_status(
2725                &tx_id,
2726                &relayer_id,
2727                TransactionStatus::Pending,
2728            );
2729            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2730            repo.create(tx).await.unwrap();
2731        }
2732
2733        // Create 2 confirmed transactions
2734        for i in 6..=7 {
2735            let tx_id = Uuid::new_v4().to_string();
2736            let mut tx = create_test_transaction_with_status(
2737                &tx_id,
2738                &relayer_id,
2739                TransactionStatus::Confirmed,
2740            );
2741            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2742            repo.create(tx).await.unwrap();
2743        }
2744
2745        // Test first page (2 items per page)
2746        let query = PaginationQuery {
2747            page: 1,
2748            per_page: 2,
2749        };
2750        let result = repo
2751            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2752            .await
2753            .unwrap();
2754
2755        assert_eq!(result.total, 5);
2756        assert_eq!(result.items.len(), 2);
2757        assert_eq!(result.page, 1);
2758        assert_eq!(result.per_page, 2);
2759
2760        // Test second page
2761        let query = PaginationQuery {
2762            page: 2,
2763            per_page: 2,
2764        };
2765        let result = repo
2766            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2767            .await
2768            .unwrap();
2769
2770        assert_eq!(result.total, 5);
2771        assert_eq!(result.items.len(), 2);
2772        assert_eq!(result.page, 2);
2773
2774        // Test last page (partial)
2775        let query = PaginationQuery {
2776            page: 3,
2777            per_page: 2,
2778        };
2779        let result = repo
2780            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2781            .await
2782            .unwrap();
2783
2784        assert_eq!(result.total, 5);
2785        assert_eq!(result.items.len(), 1);
2786
2787        // Test multiple statuses
2788        let query = PaginationQuery {
2789            page: 1,
2790            per_page: 10,
2791        };
2792        let result = repo
2793            .find_by_status_paginated(
2794                &relayer_id,
2795                &[TransactionStatus::Pending, TransactionStatus::Confirmed],
2796                query,
2797                false,
2798            )
2799            .await
2800            .unwrap();
2801
2802        assert_eq!(result.total, 7);
2803        assert_eq!(result.items.len(), 7);
2804
2805        // Test empty result
2806        let query = PaginationQuery {
2807            page: 1,
2808            per_page: 10,
2809        };
2810        let result = repo
2811            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Failed], query, false)
2812            .await
2813            .unwrap();
2814
2815        assert_eq!(result.total, 0);
2816        assert_eq!(result.items.len(), 0);
2817    }
2818
2819    #[tokio::test]
2820    #[ignore = "Requires active Redis instance"]
2821    async fn test_find_by_status_paginated_oldest_first() {
2822        let repo = setup_test_repo().await;
2823        let relayer_id = Uuid::new_v4().to_string();
2824
2825        // Create 5 pending transactions with ascending timestamps
2826        for i in 1..=5 {
2827            let tx_id = format!("tx{}-{}", i, Uuid::new_v4());
2828            let mut tx = create_test_transaction(&tx_id);
2829            tx.relayer_id = relayer_id.clone();
2830            tx.status = TransactionStatus::Pending;
2831            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2832            repo.create(tx).await.unwrap();
2833        }
2834
2835        // Test oldest_first: true - should return oldest transactions first
2836        let query = PaginationQuery {
2837            page: 1,
2838            per_page: 3,
2839        };
2840        let result = repo
2841            .find_by_status_paginated(
2842                &relayer_id,
2843                &[TransactionStatus::Pending],
2844                query.clone(),
2845                true,
2846            )
2847            .await
2848            .unwrap();
2849
2850        assert_eq!(result.total, 5);
2851        assert_eq!(result.items.len(), 3);
2852        // Verify ordering: oldest first (11:00, 12:00, 13:00)
2853        assert!(
2854            result.items[0].created_at < result.items[1].created_at,
2855            "First item should be older than second"
2856        );
2857        assert!(
2858            result.items[1].created_at < result.items[2].created_at,
2859            "Second item should be older than third"
2860        );
2861
2862        // Contrast with oldest_first: false - should return newest first
2863        let result_newest = repo
2864            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2865            .await
2866            .unwrap();
2867
2868        assert_eq!(result_newest.items.len(), 3);
2869        // Verify ordering: newest first (15:00, 14:00, 13:00)
2870        assert!(
2871            result_newest.items[0].created_at > result_newest.items[1].created_at,
2872            "First item should be newer than second"
2873        );
2874        assert!(
2875            result_newest.items[1].created_at > result_newest.items[2].created_at,
2876            "Second item should be newer than third"
2877        );
2878    }
2879
2880    #[tokio::test]
2881    #[ignore = "Requires active Redis instance"]
2882    async fn test_find_by_status_paginated_oldest_first_single_item() {
2883        let repo = setup_test_repo().await;
2884        let relayer_id = Uuid::new_v4().to_string();
2885
2886        // Create transactions with specific timestamps
2887        let timestamps = [
2888            "2025-01-27T08:00:00.000000+00:00", // oldest
2889            "2025-01-27T10:00:00.000000+00:00", // middle
2890            "2025-01-27T12:00:00.000000+00:00", // newest
2891        ];
2892
2893        let mut oldest_id = String::new();
2894        let mut newest_id = String::new();
2895
2896        for (i, timestamp) in timestamps.iter().enumerate() {
2897            let tx_id = format!("tx-{}-{}", i, Uuid::new_v4());
2898            if i == 0 {
2899                oldest_id = tx_id.clone();
2900            }
2901            if i == 2 {
2902                newest_id = tx_id.clone();
2903            }
2904            let mut tx = create_test_transaction(&tx_id);
2905            tx.relayer_id = relayer_id.clone();
2906            tx.status = TransactionStatus::Pending;
2907            tx.created_at = timestamp.to_string();
2908            repo.create(tx).await.unwrap();
2909        }
2910
2911        // Request just 1 item with oldest_first: true
2912        let query = PaginationQuery {
2913            page: 1,
2914            per_page: 1,
2915        };
2916        let result = repo
2917            .find_by_status_paginated(
2918                &relayer_id,
2919                &[TransactionStatus::Pending],
2920                query.clone(),
2921                true,
2922            )
2923            .await
2924            .unwrap();
2925
2926        assert_eq!(result.total, 3);
2927        assert_eq!(result.items.len(), 1);
2928        assert_eq!(
2929            result.items[0].id, oldest_id,
2930            "With oldest_first=true and per_page=1, should return the oldest transaction"
2931        );
2932
2933        // Contrast with oldest_first: false
2934        let result = repo
2935            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2936            .await
2937            .unwrap();
2938
2939        assert_eq!(result.items.len(), 1);
2940        assert_eq!(
2941            result.items[0].id, newest_id,
2942            "With oldest_first=false and per_page=1, should return the newest transaction"
2943        );
2944    }
2945
2946    #[tokio::test]
2947    #[ignore = "Requires active Redis instance"]
2948    async fn test_find_by_nonce() {
2949        let repo = setup_test_repo().await;
2950        let random_id = Uuid::new_v4().to_string();
2951        let random_id2 = Uuid::new_v4().to_string();
2952        let relayer_id = Uuid::new_v4().to_string();
2953
2954        let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2955        let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
2956
2957        repo.create(tx1.clone()).await.unwrap();
2958        repo.create(tx2).await.unwrap();
2959
2960        // Test finding existing nonce
2961        let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
2962        assert!(result.is_some());
2963        assert_eq!(result.unwrap().id, random_id);
2964
2965        // Test finding non-existent nonce
2966        let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
2967        assert!(result.is_none());
2968
2969        // Test finding nonce for non-existent relayer
2970        let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
2971        assert!(result.is_none());
2972    }
2973
2974    #[tokio::test]
2975    #[ignore = "Requires active Redis instance"]
2976    async fn test_get_nonce_occupancy_mixed_slots() {
2977        let repo = setup_test_repo().await;
2978        let relayer_id = Uuid::new_v4().to_string();
2979
2980        // nonce 10 → Pending (default), nonce 11 → Failed, nonce 12 → empty
2981        let tx1 = create_test_transaction_with_nonce(&Uuid::new_v4().to_string(), 10, &relayer_id);
2982        repo.create(tx1).await.unwrap();
2983
2984        let mut tx2 =
2985            create_test_transaction_with_nonce(&Uuid::new_v4().to_string(), 11, &relayer_id);
2986        tx2.status = TransactionStatus::Failed;
2987        repo.create(tx2).await.unwrap();
2988
2989        let result = repo.get_nonce_occupancy(&relayer_id, 10, 13).await.unwrap();
2990
2991        assert_eq!(result.len(), 3);
2992        assert_eq!(result[0], (10, Some(TransactionStatus::Pending)));
2993        assert_eq!(result[1], (11, Some(TransactionStatus::Failed)));
2994        assert_eq!(result[2], (12, None));
2995    }
2996
2997    #[tokio::test]
2998    #[ignore = "Requires active Redis instance"]
2999    async fn test_get_nonce_occupancy_empty_range() {
3000        let repo = setup_test_repo().await;
3001
3002        let result = repo.get_nonce_occupancy("any-relayer", 5, 5).await.unwrap();
3003        assert!(result.is_empty());
3004
3005        let result = repo
3006            .get_nonce_occupancy("any-relayer", 10, 5)
3007            .await
3008            .unwrap();
3009        assert!(result.is_empty());
3010    }
3011
3012    #[tokio::test]
3013    #[ignore = "Requires active Redis instance"]
3014    async fn test_get_nonce_occupancy_all_empty() {
3015        let repo = setup_test_repo().await;
3016        let relayer_id = Uuid::new_v4().to_string();
3017
3018        // No transactions created — all slots should be None
3019        let result = repo
3020            .get_nonce_occupancy(&relayer_id, 100, 103)
3021            .await
3022            .unwrap();
3023
3024        assert_eq!(result.len(), 3);
3025        assert!(result.iter().all(|(_, status)| status.is_none()));
3026    }
3027
3028    #[tokio::test]
3029    #[ignore = "Requires active Redis instance"]
3030    async fn test_update_status() {
3031        let repo = setup_test_repo().await;
3032        let random_id = Uuid::new_v4().to_string();
3033        let tx = create_test_transaction(&random_id);
3034
3035        repo.create(tx).await.unwrap();
3036        let updated = repo
3037            .update_status(random_id.to_string(), TransactionStatus::Confirmed)
3038            .await
3039            .unwrap();
3040        assert_eq!(updated.status, TransactionStatus::Confirmed);
3041    }
3042
3043    #[tokio::test]
3044    #[ignore = "Requires active Redis instance"]
3045    async fn test_partial_update() {
3046        let repo = setup_test_repo().await;
3047        let random_id = Uuid::new_v4().to_string();
3048        let tx = create_test_transaction(&random_id);
3049
3050        repo.create(tx).await.unwrap();
3051
3052        let update = TransactionUpdateRequest {
3053            status: Some(TransactionStatus::Sent),
3054            status_reason: Some("Transaction sent".to_string()),
3055            sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
3056            confirmed_at: None,
3057            network_data: None,
3058            hashes: None,
3059            is_canceled: None,
3060            priced_at: None,
3061            noop_count: None,
3062            delete_at: None,
3063            metadata: None,
3064        };
3065
3066        let updated = repo
3067            .partial_update(random_id.to_string(), update)
3068            .await
3069            .unwrap();
3070        assert_eq!(updated.status, TransactionStatus::Sent);
3071        assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
3072        assert_eq!(
3073            updated.sent_at,
3074            Some("2025-01-27T16:00:00.000000+00:00".to_string())
3075        );
3076    }
3077
3078    #[tokio::test]
3079    #[ignore = "Requires active Redis instance"]
3080    async fn test_set_sent_at() {
3081        let repo = setup_test_repo().await;
3082        let random_id = Uuid::new_v4().to_string();
3083        let tx = create_test_transaction(&random_id);
3084
3085        repo.create(tx).await.unwrap();
3086        let updated = repo
3087            .set_sent_at(
3088                random_id.to_string(),
3089                "2025-01-27T16:00:00.000000+00:00".to_string(),
3090            )
3091            .await
3092            .unwrap();
3093        assert_eq!(
3094            updated.sent_at,
3095            Some("2025-01-27T16:00:00.000000+00:00".to_string())
3096        );
3097    }
3098
3099    #[tokio::test]
3100    #[ignore = "Requires active Redis instance"]
3101    async fn test_set_confirmed_at() {
3102        let repo = setup_test_repo().await;
3103        let random_id = Uuid::new_v4().to_string();
3104        let tx = create_test_transaction(&random_id);
3105
3106        repo.create(tx).await.unwrap();
3107        let updated = repo
3108            .set_confirmed_at(
3109                random_id.to_string(),
3110                "2025-01-27T16:00:00.000000+00:00".to_string(),
3111            )
3112            .await
3113            .unwrap();
3114        assert_eq!(
3115            updated.confirmed_at,
3116            Some("2025-01-27T16:00:00.000000+00:00".to_string())
3117        );
3118    }
3119
3120    #[tokio::test]
3121    #[ignore = "Requires active Redis instance"]
3122    async fn test_update_network_data() {
3123        let repo = setup_test_repo().await;
3124        let random_id = Uuid::new_v4().to_string();
3125        let tx = create_test_transaction(&random_id);
3126
3127        repo.create(tx).await.unwrap();
3128
3129        let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
3130            gas_price: Some(2000000000),
3131            gas_limit: Some(42000),
3132            nonce: Some(2),
3133            value: U256::from_str("2000000000000000000").unwrap(),
3134            data: Some("0x1234".to_string()),
3135            from: "0xNewSender".to_string(),
3136            to: Some("0xNewRecipient".to_string()),
3137            chain_id: 1,
3138            signature: None,
3139            hash: Some("0xnewhash".to_string()),
3140            speed: Some(Speed::SafeLow),
3141            max_fee_per_gas: None,
3142            max_priority_fee_per_gas: None,
3143            raw: None,
3144        });
3145
3146        let updated = repo
3147            .update_network_data(random_id.to_string(), new_network_data.clone())
3148            .await
3149            .unwrap();
3150        assert_eq!(
3151            updated
3152                .network_data
3153                .get_evm_transaction_data()
3154                .unwrap()
3155                .hash,
3156            new_network_data.get_evm_transaction_data().unwrap().hash
3157        );
3158    }
3159
3160    #[tokio::test]
3161    #[ignore = "Requires active Redis instance"]
3162    async fn test_debug_implementation() {
3163        let repo = setup_test_repo().await;
3164        let debug_str = format!("{repo:?}");
3165        assert!(debug_str.contains("RedisTransactionRepository"));
3166        assert!(debug_str.contains("test_prefix"));
3167    }
3168
3169    #[tokio::test]
3170    #[ignore = "Requires active Redis instance"]
3171    async fn test_error_handling_empty_id() {
3172        let repo = setup_test_repo().await;
3173
3174        let result = repo.get_by_id("".to_string()).await;
3175        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3176
3177        let result = repo
3178            .update("".to_string(), create_test_transaction("test"))
3179            .await;
3180        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3181
3182        let result = repo.delete_by_id("".to_string()).await;
3183        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3184    }
3185
3186    #[tokio::test]
3187    #[ignore = "Requires active Redis instance"]
3188    async fn test_pagination_validation() {
3189        let repo = setup_test_repo().await;
3190
3191        let query = PaginationQuery {
3192            page: 1,
3193            per_page: 0,
3194        };
3195        let result = repo.list_paginated(query).await;
3196        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3197    }
3198
3199    #[tokio::test]
3200    #[ignore = "Requires active Redis instance"]
3201    async fn test_index_consistency() {
3202        let repo = setup_test_repo().await;
3203        let random_id = Uuid::new_v4().to_string();
3204        let relayer_id = Uuid::new_v4().to_string();
3205        let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
3206
3207        // Create transaction
3208        repo.create(tx.clone()).await.unwrap();
3209
3210        // Verify it can be found by nonce
3211        let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
3212        assert!(found.is_some());
3213
3214        // Update the transaction with a new nonce
3215        let mut updated_tx = tx.clone();
3216        if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
3217            evm_data.nonce = Some(43);
3218        }
3219
3220        repo.update(random_id.to_string(), updated_tx)
3221            .await
3222            .unwrap();
3223
3224        // Verify old nonce index is cleaned up
3225        let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
3226        assert!(old_nonce_result.is_none());
3227
3228        // Verify new nonce index works
3229        let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
3230        assert!(new_nonce_result.is_some());
3231    }
3232
3233    #[tokio::test]
3234    #[ignore = "Requires active Redis instance"]
3235    async fn test_has_entries() {
3236        let repo = setup_test_repo().await;
3237        assert!(!repo.has_entries().await.unwrap());
3238
3239        let tx_id = uuid::Uuid::new_v4().to_string();
3240        let tx = create_test_transaction(&tx_id);
3241        repo.create(tx.clone()).await.unwrap();
3242
3243        assert!(repo.has_entries().await.unwrap());
3244    }
3245
3246    #[tokio::test]
3247    #[ignore = "Requires active Redis instance"]
3248    async fn test_drop_all_entries() {
3249        let repo = setup_test_repo().await;
3250        let tx_id = uuid::Uuid::new_v4().to_string();
3251        let tx = create_test_transaction(&tx_id);
3252        repo.create(tx.clone()).await.unwrap();
3253        assert!(repo.has_entries().await.unwrap());
3254
3255        repo.drop_all_entries().await.unwrap();
3256        assert!(!repo.has_entries().await.unwrap());
3257    }
3258
3259    // Tests for delete_at field setting on final status updates
3260    #[tokio::test]
3261    #[ignore = "Requires active Redis instance"]
3262    async fn test_update_status_sets_delete_at_for_final_statuses() {
3263        let _lock = ENV_MUTEX.lock().await;
3264
3265        use chrono::{DateTime, Duration, Utc};
3266        use std::env;
3267
3268        // Use a unique test environment variable to avoid conflicts
3269        env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
3270
3271        let repo = setup_test_repo().await;
3272
3273        let final_statuses = [
3274            TransactionStatus::Canceled,
3275            TransactionStatus::Confirmed,
3276            TransactionStatus::Failed,
3277            TransactionStatus::Expired,
3278        ];
3279
3280        for (i, status) in final_statuses.iter().enumerate() {
3281            let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
3282            let mut tx = create_test_transaction(&tx_id);
3283
3284            // Ensure transaction has no delete_at initially and is in pending state
3285            tx.delete_at = None;
3286            tx.status = TransactionStatus::Pending;
3287
3288            repo.create(tx).await.unwrap();
3289
3290            let before_update = Utc::now();
3291
3292            // Update to final status
3293            let updated = repo
3294                .update_status(tx_id.clone(), status.clone())
3295                .await
3296                .unwrap();
3297
3298            // Should have delete_at set
3299            assert!(
3300                updated.delete_at.is_some(),
3301                "delete_at should be set for status: {status:?}"
3302            );
3303
3304            // Verify the timestamp is reasonable (approximately 6 hours from now)
3305            let delete_at_str = updated.delete_at.unwrap();
3306            let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
3307                .expect("delete_at should be valid RFC3339")
3308                .with_timezone(&Utc);
3309
3310            let duration_from_before = delete_at.signed_duration_since(before_update);
3311            let expected_duration = Duration::hours(6);
3312            let tolerance = Duration::minutes(5);
3313
3314            assert!(
3315                duration_from_before >= expected_duration - tolerance
3316                    && duration_from_before <= expected_duration + tolerance,
3317                "delete_at should be approximately 6 hours from now for status: {status:?}. Duration: {duration_from_before:?}"
3318            );
3319        }
3320
3321        // Cleanup
3322        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3323    }
3324
3325    #[tokio::test]
3326    #[ignore = "Requires active Redis instance"]
3327    async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
3328        let _lock = ENV_MUTEX.lock().await;
3329
3330        use std::env;
3331
3332        env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
3333
3334        let repo = setup_test_repo().await;
3335
3336        let non_final_statuses = [
3337            TransactionStatus::Pending,
3338            TransactionStatus::Sent,
3339            TransactionStatus::Submitted,
3340            TransactionStatus::Mined,
3341        ];
3342
3343        for (i, status) in non_final_statuses.iter().enumerate() {
3344            let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
3345            let mut tx = create_test_transaction(&tx_id);
3346            tx.delete_at = None;
3347            tx.status = TransactionStatus::Pending;
3348
3349            repo.create(tx).await.unwrap();
3350
3351            // Update to non-final status
3352            let updated = repo
3353                .update_status(tx_id.clone(), status.clone())
3354                .await
3355                .unwrap();
3356
3357            // Should NOT have delete_at set
3358            assert!(
3359                updated.delete_at.is_none(),
3360                "delete_at should NOT be set for status: {status:?}"
3361            );
3362        }
3363
3364        // Cleanup
3365        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3366    }
3367
3368    #[tokio::test]
3369    #[ignore = "Requires active Redis instance"]
3370    async fn test_partial_update_sets_delete_at_for_final_statuses() {
3371        let _lock = ENV_MUTEX.lock().await;
3372
3373        use chrono::{DateTime, Duration, Utc};
3374        use std::env;
3375
3376        env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
3377
3378        let repo = setup_test_repo().await;
3379        let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
3380        let mut tx = create_test_transaction(&tx_id);
3381        tx.delete_at = None;
3382        tx.status = TransactionStatus::Pending;
3383
3384        repo.create(tx).await.unwrap();
3385
3386        let before_update = Utc::now();
3387
3388        // Use partial_update to set status to Confirmed (final status)
3389        let update = TransactionUpdateRequest {
3390            status: Some(TransactionStatus::Confirmed),
3391            status_reason: Some("Transaction completed".to_string()),
3392            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
3393            ..Default::default()
3394        };
3395
3396        let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();
3397
3398        // Should have delete_at set
3399        assert!(
3400            updated.delete_at.is_some(),
3401            "delete_at should be set when updating to Confirmed status"
3402        );
3403
3404        // Verify the timestamp is reasonable (approximately 8 hours from now)
3405        let delete_at_str = updated.delete_at.unwrap();
3406        let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
3407            .expect("delete_at should be valid RFC3339")
3408            .with_timezone(&Utc);
3409
3410        let duration_from_before = delete_at.signed_duration_since(before_update);
3411        let expected_duration = Duration::hours(8);
3412        let tolerance = Duration::minutes(5);
3413
3414        assert!(
3415            duration_from_before >= expected_duration - tolerance
3416                && duration_from_before <= expected_duration + tolerance,
3417            "delete_at should be approximately 8 hours from now. Duration: {duration_from_before:?}"
3418        );
3419
3420        // Also verify other fields were updated
3421        assert_eq!(updated.status, TransactionStatus::Confirmed);
3422        assert_eq!(
3423            updated.status_reason,
3424            Some("Transaction completed".to_string())
3425        );
3426        assert_eq!(
3427            updated.confirmed_at,
3428            Some("2023-01-01T12:05:00Z".to_string())
3429        );
3430
3431        // Cleanup
3432        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3433    }
3434
3435    #[tokio::test]
3436    #[ignore = "Requires active Redis instance"]
3437    async fn test_update_status_preserves_existing_delete_at() {
3438        let _lock = ENV_MUTEX.lock().await;
3439
3440        use std::env;
3441
3442        env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
3443
3444        let repo = setup_test_repo().await;
3445        let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
3446        let mut tx = create_test_transaction(&tx_id);
3447
3448        // Set an existing delete_at value
3449        let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
3450        tx.delete_at = Some(existing_delete_at.clone());
3451        tx.status = TransactionStatus::Pending;
3452
3453        repo.create(tx).await.unwrap();
3454
3455        // Update to final status
3456        let updated = repo
3457            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
3458            .await
3459            .unwrap();
3460
3461        // Should preserve the existing delete_at value
3462        assert_eq!(
3463            updated.delete_at,
3464            Some(existing_delete_at),
3465            "Existing delete_at should be preserved when updating to final status"
3466        );
3467
3468        // Cleanup
3469        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3470    }
3471    #[tokio::test]
3472    #[ignore = "Requires active Redis instance"]
3473    async fn test_partial_update_without_status_change_preserves_delete_at() {
3474        let _lock = ENV_MUTEX.lock().await;
3475
3476        use std::env;
3477
3478        env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
3479
3480        let repo = setup_test_repo().await;
3481        let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
3482        let mut tx = create_test_transaction(&tx_id);
3483        tx.delete_at = None;
3484        tx.status = TransactionStatus::Pending;
3485
3486        repo.create(tx).await.unwrap();
3487
3488        // First, update to final status to set delete_at
3489        let updated1 = repo
3490            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
3491            .await
3492            .unwrap();
3493
3494        assert!(updated1.delete_at.is_some());
3495        let original_delete_at = updated1.delete_at.clone();
3496
3497        // Now update other fields without changing status
3498        let update = TransactionUpdateRequest {
3499            status: None, // No status change
3500            status_reason: Some("Updated reason".to_string()),
3501            confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
3502            ..Default::default()
3503        };
3504
3505        let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();
3506
3507        // delete_at should be preserved
3508        assert_eq!(
3509            updated2.delete_at, original_delete_at,
3510            "delete_at should be preserved when status is not updated"
3511        );
3512
3513        // Other fields should be updated
3514        assert_eq!(updated2.status, TransactionStatus::Confirmed); // Unchanged
3515        assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
3516        assert_eq!(
3517            updated2.confirmed_at,
3518            Some("2023-01-01T12:10:00Z".to_string())
3519        );
3520
3521        // Cleanup
3522        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3523    }
3524
3525    // Tests for delete_by_ids batch delete functionality
3526
3527    #[tokio::test]
3528    #[ignore = "Requires active Redis instance"]
3529    async fn test_delete_by_ids_empty_list() {
3530        let repo = setup_test_repo().await;
3531        let tx_id = format!("test-empty-{}", Uuid::new_v4());
3532
3533        // Create a transaction to ensure repo is not empty
3534        let tx = create_test_transaction(&tx_id);
3535        repo.create(tx).await.unwrap();
3536
3537        // Delete with empty list should succeed and not affect existing data
3538        let result = repo.delete_by_ids(vec![]).await.unwrap();
3539
3540        assert_eq!(result.deleted_count, 0);
3541        assert!(result.failed.is_empty());
3542
3543        // Original transaction should still exist
3544        assert!(repo.get_by_id(tx_id).await.is_ok());
3545    }
3546
3547    #[tokio::test]
3548    #[ignore = "Requires active Redis instance"]
3549    async fn test_delete_by_ids_single_transaction() {
3550        let repo = setup_test_repo().await;
3551        let tx_id = format!("test-single-{}", Uuid::new_v4());
3552
3553        let tx = create_test_transaction(&tx_id);
3554        repo.create(tx).await.unwrap();
3555
3556        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3557
3558        assert_eq!(result.deleted_count, 1);
3559        assert!(result.failed.is_empty());
3560
3561        // Verify transaction was deleted
3562        assert!(repo.get_by_id(tx_id).await.is_err());
3563    }
3564
3565    #[tokio::test]
3566    #[ignore = "Requires active Redis instance"]
3567    async fn test_delete_by_ids_multiple_transactions() {
3568        let repo = setup_test_repo().await;
3569        let base_id = Uuid::new_v4();
3570
3571        // Create multiple transactions
3572        let mut created_ids = Vec::new();
3573        for i in 1..=5 {
3574            let tx_id = format!("test-multi-{base_id}-{i}");
3575            let tx = create_test_transaction(&tx_id);
3576            repo.create(tx).await.unwrap();
3577            created_ids.push(tx_id);
3578        }
3579
3580        // Delete 3 of them
3581        let ids_to_delete = vec![
3582            created_ids[0].clone(),
3583            created_ids[2].clone(),
3584            created_ids[4].clone(),
3585        ];
3586        let result = repo.delete_by_ids(ids_to_delete).await.unwrap();
3587
3588        assert_eq!(result.deleted_count, 3);
3589        assert!(result.failed.is_empty());
3590
3591        // Verify correct transactions were deleted
3592        assert!(repo.get_by_id(created_ids[0].clone()).await.is_err());
3593        assert!(repo.get_by_id(created_ids[1].clone()).await.is_ok()); // Not deleted
3594        assert!(repo.get_by_id(created_ids[2].clone()).await.is_err());
3595        assert!(repo.get_by_id(created_ids[3].clone()).await.is_ok()); // Not deleted
3596        assert!(repo.get_by_id(created_ids[4].clone()).await.is_err());
3597    }
3598
3599    #[tokio::test]
3600    #[ignore = "Requires active Redis instance"]
3601    async fn test_delete_by_ids_nonexistent_transactions() {
3602        let repo = setup_test_repo().await;
3603        let base_id = Uuid::new_v4();
3604
3605        // Try to delete transactions that don't exist
3606        let ids_to_delete = vec![
3607            format!("nonexistent-{}-1", base_id),
3608            format!("nonexistent-{}-2", base_id),
3609        ];
3610        let result = repo.delete_by_ids(ids_to_delete.clone()).await.unwrap();
3611
3612        assert_eq!(result.deleted_count, 0);
3613        assert_eq!(result.failed.len(), 2);
3614
3615        // Verify error messages contain the IDs
3616        let failed_ids: Vec<&String> = result.failed.iter().map(|(id, _)| id).collect();
3617        assert!(failed_ids.contains(&&ids_to_delete[0]));
3618        assert!(failed_ids.contains(&&ids_to_delete[1]));
3619    }
3620
3621    #[tokio::test]
3622    #[ignore = "Requires active Redis instance"]
3623    async fn test_delete_by_ids_mixed_existing_and_nonexistent() {
3624        let repo = setup_test_repo().await;
3625        let base_id = Uuid::new_v4();
3626
3627        // Create some transactions
3628        let existing_ids: Vec<String> = (1..=3)
3629            .map(|i| format!("test-mixed-existing-{base_id}-{i}"))
3630            .collect();
3631
3632        for id in &existing_ids {
3633            let tx = create_test_transaction(id);
3634            repo.create(tx).await.unwrap();
3635        }
3636
3637        let nonexistent_ids: Vec<String> = (1..=2)
3638            .map(|i| format!("test-mixed-nonexistent-{base_id}-{i}"))
3639            .collect();
3640
3641        // Try to delete mix of existing and non-existing
3642        let ids_to_delete = vec![
3643            existing_ids[0].clone(),
3644            nonexistent_ids[0].clone(),
3645            existing_ids[1].clone(),
3646            nonexistent_ids[1].clone(),
3647        ];
3648        let result = repo.delete_by_ids(ids_to_delete).await.unwrap();
3649
3650        assert_eq!(result.deleted_count, 2);
3651        assert_eq!(result.failed.len(), 2);
3652
3653        // Verify existing transactions were deleted
3654        assert!(repo.get_by_id(existing_ids[0].clone()).await.is_err());
3655        assert!(repo.get_by_id(existing_ids[1].clone()).await.is_err());
3656
3657        // Verify remaining transaction still exists
3658        assert!(repo.get_by_id(existing_ids[2].clone()).await.is_ok());
3659    }
3660
3661    #[tokio::test]
3662    #[ignore = "Requires active Redis instance"]
3663    async fn test_delete_by_ids_removes_all_indexes() {
3664        let repo = setup_test_repo().await;
3665        let relayer_id = format!("relayer-{}", Uuid::new_v4());
3666        let tx_id = format!("test-indexes-{}", Uuid::new_v4());
3667
3668        // Create a transaction with specific status
3669        let mut tx = create_test_transaction(&tx_id);
3670        tx.relayer_id = relayer_id.clone();
3671        tx.status = TransactionStatus::Confirmed;
3672        repo.create(tx).await.unwrap();
3673
3674        // Verify transaction exists and is indexed
3675        let found = repo
3676            .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
3677            .await
3678            .unwrap();
3679        assert!(found.iter().any(|t| t.id == tx_id));
3680
3681        // Delete the transaction
3682        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3683        assert_eq!(result.deleted_count, 1);
3684
3685        // Verify transaction is no longer in status index
3686        let found_after = repo
3687            .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
3688            .await
3689            .unwrap();
3690        assert!(!found_after.iter().any(|t| t.id == tx_id));
3691
3692        // Verify transaction cannot be found
3693        assert!(repo.get_by_id(tx_id).await.is_err());
3694    }
3695
3696    #[tokio::test]
3697    #[ignore = "Requires active Redis instance"]
3698    async fn test_delete_by_ids_removes_nonce_index() {
3699        let repo = setup_test_repo().await;
3700        let relayer_id = format!("relayer-{}", Uuid::new_v4());
3701        let tx_id = format!("test-nonce-{}", Uuid::new_v4());
3702        let nonce = 12345u64;
3703
3704        // Create a transaction with a specific nonce
3705        let tx = create_test_transaction_with_nonce(&tx_id, nonce, &relayer_id);
3706        repo.create(tx).await.unwrap();
3707
3708        // Verify nonce index works
3709        let found = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
3710        assert!(found.is_some());
3711        assert_eq!(found.unwrap().id, tx_id);
3712
3713        // Delete the transaction
3714        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3715        assert_eq!(result.deleted_count, 1);
3716
3717        // Verify nonce index was cleaned up
3718        let found_after = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
3719        assert!(found_after.is_none());
3720    }
3721
3722    #[tokio::test]
3723    #[ignore = "Requires active Redis instance"]
3724    async fn test_delete_by_ids_large_batch() {
3725        let repo = setup_test_repo().await;
3726        let base_id = Uuid::new_v4();
3727
3728        // Create many transactions to test batch performance
3729        let count = 50;
3730        let mut created_ids = Vec::new();
3731
3732        for i in 0..count {
3733            let tx_id = format!("test-large-{base_id}-{i}");
3734            let tx = create_test_transaction(&tx_id);
3735            repo.create(tx).await.unwrap();
3736            created_ids.push(tx_id);
3737        }
3738
3739        // Delete all of them in one batch
3740        let result = repo.delete_by_ids(created_ids.clone()).await.unwrap();
3741
3742        assert_eq!(result.deleted_count, count);
3743        assert!(result.failed.is_empty());
3744
3745        // Verify all were deleted
3746        for id in created_ids {
3747            assert!(repo.get_by_id(id).await.is_err());
3748        }
3749    }
3750
3751    #[tokio::test]
3752    #[ignore = "Requires active Redis instance"]
3753    async fn test_delete_by_ids_preserves_other_relayer_transactions() {
3754        let repo = setup_test_repo().await;
3755        let relayer_1 = format!("relayer-1-{}", Uuid::new_v4());
3756        let relayer_2 = format!("relayer-2-{}", Uuid::new_v4());
3757        let tx_id_1 = format!("tx-relayer-1-{}", Uuid::new_v4());
3758        let tx_id_2 = format!("tx-relayer-2-{}", Uuid::new_v4());
3759
3760        // Create transactions for different relayers
3761        let tx1 = create_test_transaction_with_relayer(&tx_id_1, &relayer_1);
3762        let tx2 = create_test_transaction_with_relayer(&tx_id_2, &relayer_2);
3763
3764        repo.create(tx1).await.unwrap();
3765        repo.create(tx2).await.unwrap();
3766
3767        // Delete only relayer-1's transaction
3768        let result = repo.delete_by_ids(vec![tx_id_1.clone()]).await.unwrap();
3769
3770        assert_eq!(result.deleted_count, 1);
3771
3772        // relayer-1's transaction should be deleted
3773        assert!(repo.get_by_id(tx_id_1).await.is_err());
3774
3775        // relayer-2's transaction should still exist
3776        let remaining = repo.get_by_id(tx_id_2).await.unwrap();
3777        assert_eq!(remaining.relayer_id, relayer_2);
3778    }
3779
3780    // ── increment_status_check_failures ─────────────────────────────
3781
3782    #[tokio::test]
3783    #[ignore = "Requires active Redis instance"]
3784    async fn test_increment_status_check_failures_no_prior_metadata() {
3785        let _lock = ENV_MUTEX.lock().await;
3786        let repo = setup_test_repo().await;
3787        let relayer_id = Uuid::new_v4().to_string();
3788        let tx_id = Uuid::new_v4().to_string();
3789        let mut tx =
3790            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3791        tx.metadata = None;
3792        repo.create(tx).await.unwrap();
3793
3794        let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
3795
3796        let meta = updated.metadata.expect("metadata should be set");
3797        assert_eq!(meta.consecutive_failures, 1);
3798        assert_eq!(meta.total_failures, 1);
3799        assert_eq!(meta.insufficient_fee_retries, 0);
3800    }
3801
3802    #[tokio::test]
3803    #[ignore = "Requires active Redis instance"]
3804    async fn test_increment_status_check_failures_accumulates() {
3805        let _lock = ENV_MUTEX.lock().await;
3806        let repo = setup_test_repo().await;
3807        let relayer_id = Uuid::new_v4().to_string();
3808        let tx_id = Uuid::new_v4().to_string();
3809        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3810        repo.create(tx).await.unwrap();
3811
3812        repo.increment_status_check_failures(tx_id.clone())
3813            .await
3814            .unwrap();
3815        repo.increment_status_check_failures(tx_id.clone())
3816            .await
3817            .unwrap();
3818        let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
3819
3820        let meta = updated.metadata.unwrap();
3821        assert_eq!(meta.consecutive_failures, 3);
3822        assert_eq!(meta.total_failures, 3);
3823    }
3824
3825    #[tokio::test]
3826    #[ignore = "Requires active Redis instance"]
3827    async fn test_increment_status_check_failures_noop_on_final_state() {
3828        let _lock = ENV_MUTEX.lock().await;
3829        let repo = setup_test_repo().await;
3830        let relayer_id = Uuid::new_v4().to_string();
3831        let tx_id = Uuid::new_v4().to_string();
3832        let tx =
3833            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
3834        repo.create(tx).await.unwrap();
3835
3836        let result = repo.increment_status_check_failures(tx_id).await.unwrap();
3837
3838        // Should return unchanged — no metadata mutation on final state
3839        assert!(result.metadata.is_none());
3840        assert_eq!(result.status, TransactionStatus::Confirmed);
3841    }
3842
3843    #[tokio::test]
3844    #[ignore = "Requires active Redis instance"]
3845    async fn test_increment_status_check_failures_not_found() {
3846        let _lock = ENV_MUTEX.lock().await;
3847        let repo = setup_test_repo().await;
3848
3849        let result = repo
3850            .increment_status_check_failures("nonexistent".to_string())
3851            .await;
3852
3853        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3854    }
3855
3856    // ── reset_status_check_consecutive_failures ─────────────────────
3857
3858    #[tokio::test]
3859    #[ignore = "Requires active Redis instance"]
3860    async fn test_reset_consecutive_failures() {
3861        let _lock = ENV_MUTEX.lock().await;
3862        let repo = setup_test_repo().await;
3863        let relayer_id = Uuid::new_v4().to_string();
3864        let tx_id = Uuid::new_v4().to_string();
3865        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3866        repo.create(tx).await.unwrap();
3867
3868        // Increment a few times first
3869        repo.increment_status_check_failures(tx_id.clone())
3870            .await
3871            .unwrap();
3872        repo.increment_status_check_failures(tx_id.clone())
3873            .await
3874            .unwrap();
3875
3876        let updated = repo
3877            .reset_status_check_consecutive_failures(tx_id)
3878            .await
3879            .unwrap();
3880
3881        let meta = updated.metadata.unwrap();
3882        assert_eq!(meta.consecutive_failures, 0);
3883        // total_failures should be preserved
3884        assert_eq!(meta.total_failures, 2);
3885    }
3886
3887    #[tokio::test]
3888    #[ignore = "Requires active Redis instance"]
3889    async fn test_reset_consecutive_failures_noop_on_final_state() {
3890        let _lock = ENV_MUTEX.lock().await;
3891        let repo = setup_test_repo().await;
3892        let relayer_id = Uuid::new_v4().to_string();
3893        let tx_id = Uuid::new_v4().to_string();
3894        let mut tx =
3895            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Failed);
3896        tx.metadata = Some(crate::models::TransactionMetadata {
3897            consecutive_failures: 5,
3898            total_failures: 10,
3899            insufficient_fee_retries: 0,
3900            try_again_later_retries: 0,
3901            nonce_too_high_retries: 0,
3902        });
3903        repo.create(tx).await.unwrap();
3904
3905        let result = repo
3906            .reset_status_check_consecutive_failures(tx_id)
3907            .await
3908            .unwrap();
3909
3910        // Should return unchanged on final state
3911        let meta = result.metadata.unwrap();
3912        assert_eq!(meta.consecutive_failures, 5);
3913    }
3914
3915    #[tokio::test]
3916    #[ignore = "Requires active Redis instance"]
3917    async fn test_reset_consecutive_failures_not_found() {
3918        let _lock = ENV_MUTEX.lock().await;
3919        let repo = setup_test_repo().await;
3920
3921        let result = repo
3922            .reset_status_check_consecutive_failures("nonexistent".to_string())
3923            .await;
3924
3925        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3926    }
3927
3928    // ── record_stellar_insufficient_fee_retry ───────────────────────
3929
3930    #[tokio::test]
3931    #[ignore = "Requires active Redis instance"]
3932    async fn test_record_insufficient_fee_retry() {
3933        let _lock = ENV_MUTEX.lock().await;
3934        let repo = setup_test_repo().await;
3935        let relayer_id = Uuid::new_v4().to_string();
3936        let tx_id = Uuid::new_v4().to_string();
3937        let mut tx =
3938            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3939        tx.sent_at = None;
3940        repo.create(tx).await.unwrap();
3941
3942        let updated = repo
3943            .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:00:00Z".to_string())
3944            .await
3945            .unwrap();
3946
3947        assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z"));
3948        let meta = updated.metadata.unwrap();
3949        assert_eq!(meta.insufficient_fee_retries, 1);
3950        assert_eq!(meta.consecutive_failures, 0);
3951        assert_eq!(meta.total_failures, 0);
3952    }
3953
3954    #[tokio::test]
3955    #[ignore = "Requires active Redis instance"]
3956    async fn test_record_insufficient_fee_retry_accumulates() {
3957        let _lock = ENV_MUTEX.lock().await;
3958        let repo = setup_test_repo().await;
3959        let relayer_id = Uuid::new_v4().to_string();
3960        let tx_id = Uuid::new_v4().to_string();
3961        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3962        repo.create(tx).await.unwrap();
3963
3964        repo.record_stellar_insufficient_fee_retry(
3965            tx_id.clone(),
3966            "2025-03-18T10:00:00Z".to_string(),
3967        )
3968        .await
3969        .unwrap();
3970
3971        let updated = repo
3972            .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:01:00Z".to_string())
3973            .await
3974            .unwrap();
3975
3976        assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
3977        let meta = updated.metadata.unwrap();
3978        assert_eq!(meta.insufficient_fee_retries, 2);
3979    }
3980
3981    #[tokio::test]
3982    #[ignore = "Requires active Redis instance"]
3983    async fn test_record_insufficient_fee_retry_noop_on_final_state() {
3984        let _lock = ENV_MUTEX.lock().await;
3985        let repo = setup_test_repo().await;
3986        let relayer_id = Uuid::new_v4().to_string();
3987        let tx_id = Uuid::new_v4().to_string();
3988        let mut tx =
3989            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
3990        tx.sent_at = Some("old-time".to_string());
3991        repo.create(tx).await.unwrap();
3992
3993        let result = repo
3994            .record_stellar_insufficient_fee_retry(tx_id, "new-time".to_string())
3995            .await
3996            .unwrap();
3997
3998        // Should return unchanged on final state
3999        assert_eq!(result.sent_at.as_deref(), Some("old-time"));
4000        assert!(result.metadata.is_none());
4001    }
4002
4003    #[tokio::test]
4004    #[ignore = "Requires active Redis instance"]
4005    async fn test_record_insufficient_fee_retry_not_found() {
4006        let _lock = ENV_MUTEX.lock().await;
4007        let repo = setup_test_repo().await;
4008
4009        let result = repo
4010            .record_stellar_insufficient_fee_retry(
4011                "nonexistent".to_string(),
4012                "2025-03-18T10:00:00Z".to_string(),
4013            )
4014            .await;
4015
4016        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
4017    }
4018
4019    // ── record_stellar_try_again_later_retry ────────────────────────
4020
4021    #[tokio::test]
4022    #[ignore = "Requires active Redis instance"]
4023    async fn test_record_try_again_later_retry() {
4024        let _lock = ENV_MUTEX.lock().await;
4025        let repo = setup_test_repo().await;
4026        let relayer_id = Uuid::new_v4().to_string();
4027        let tx_id = Uuid::new_v4().to_string();
4028        let mut tx =
4029            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4030        tx.sent_at = None;
4031        repo.create(tx).await.unwrap();
4032
4033        let updated = repo
4034            .record_stellar_try_again_later_retry(tx_id, "2025-03-18T10:00:00Z".to_string())
4035            .await
4036            .unwrap();
4037
4038        assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z"));
4039        let meta = updated.metadata.unwrap();
4040        assert_eq!(meta.try_again_later_retries, 1);
4041        assert_eq!(meta.consecutive_failures, 0);
4042        assert_eq!(meta.total_failures, 0);
4043    }
4044
4045    #[tokio::test]
4046    #[ignore = "Requires active Redis instance"]
4047    async fn test_record_try_again_later_retry_accumulates() {
4048        let _lock = ENV_MUTEX.lock().await;
4049        let repo = setup_test_repo().await;
4050        let relayer_id = Uuid::new_v4().to_string();
4051        let tx_id = Uuid::new_v4().to_string();
4052        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4053        repo.create(tx).await.unwrap();
4054
4055        repo.record_stellar_try_again_later_retry(
4056            tx_id.clone(),
4057            "2025-03-18T10:00:00Z".to_string(),
4058        )
4059        .await
4060        .unwrap();
4061
4062        let updated = repo
4063            .record_stellar_try_again_later_retry(tx_id, "2025-03-18T10:01:00Z".to_string())
4064            .await
4065            .unwrap();
4066
4067        assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
4068        let meta = updated.metadata.unwrap();
4069        assert_eq!(meta.try_again_later_retries, 2);
4070    }
4071
4072    #[tokio::test]
4073    #[ignore = "Requires active Redis instance"]
4074    async fn test_record_try_again_later_retry_noop_on_final_state() {
4075        let _lock = ENV_MUTEX.lock().await;
4076        let repo = setup_test_repo().await;
4077        let relayer_id = Uuid::new_v4().to_string();
4078        let tx_id = Uuid::new_v4().to_string();
4079        let mut tx =
4080            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
4081        tx.sent_at = Some("old-time".to_string());
4082        repo.create(tx).await.unwrap();
4083
4084        let result = repo
4085            .record_stellar_try_again_later_retry(tx_id, "new-time".to_string())
4086            .await
4087            .unwrap();
4088
4089        // Should return unchanged on final state
4090        assert_eq!(result.sent_at.as_deref(), Some("old-time"));
4091        assert!(result.metadata.is_none());
4092    }
4093
4094    #[tokio::test]
4095    #[ignore = "Requires active Redis instance"]
4096    async fn test_record_try_again_later_retry_not_found() {
4097        let _lock = ENV_MUTEX.lock().await;
4098        let repo = setup_test_repo().await;
4099
4100        let result = repo
4101            .record_stellar_try_again_later_retry(
4102                "nonexistent".to_string(),
4103                "2025-03-18T10:00:00Z".to_string(),
4104            )
4105            .await;
4106
4107        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
4108    }
4109
4110    // ── metadata preservation across operations ─────────────────────
4111
4112    #[tokio::test]
4113    #[ignore = "Requires active Redis instance"]
4114    async fn test_increment_failures_preserves_try_again_later_retries() {
4115        let _lock = ENV_MUTEX.lock().await;
4116        let repo = setup_test_repo().await;
4117        let relayer_id = Uuid::new_v4().to_string();
4118        let tx_id = Uuid::new_v4().to_string();
4119        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4120        repo.create(tx).await.unwrap();
4121
4122        // Set try_again_later_retries = 1
4123        repo.record_stellar_try_again_later_retry(
4124            tx_id.clone(),
4125            "2025-03-18T10:00:00Z".to_string(),
4126        )
4127        .await
4128        .unwrap();
4129
4130        // Now increment failures — should NOT clobber try_again_later_retries
4131        let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
4132
4133        let meta = updated.metadata.unwrap();
4134        assert_eq!(
4135            meta.try_again_later_retries, 1,
4136            "try_again_later_retries must survive increment_status_check_failures"
4137        );
4138        assert_eq!(meta.consecutive_failures, 1);
4139        assert_eq!(meta.total_failures, 1);
4140    }
4141
4142    #[tokio::test]
4143    #[ignore = "Requires active Redis instance"]
4144    async fn test_increment_failures_preserves_insufficient_fee_retries() {
4145        let _lock = ENV_MUTEX.lock().await;
4146        let repo = setup_test_repo().await;
4147        let relayer_id = Uuid::new_v4().to_string();
4148        let tx_id = Uuid::new_v4().to_string();
4149        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4150        repo.create(tx).await.unwrap();
4151
4152        // Set insufficient_fee_retries = 1
4153        repo.record_stellar_insufficient_fee_retry(
4154            tx_id.clone(),
4155            "2025-03-18T10:00:00Z".to_string(),
4156        )
4157        .await
4158        .unwrap();
4159
4160        // Now increment failures — should NOT clobber insufficient_fee_retries
4161        let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
4162
4163        let meta = updated.metadata.unwrap();
4164        assert_eq!(
4165            meta.insufficient_fee_retries, 1,
4166            "insufficient_fee_retries must survive increment_status_check_failures"
4167        );
4168        assert_eq!(meta.consecutive_failures, 1);
4169    }
4170
4171    #[tokio::test]
4172    #[ignore = "Requires active Redis instance"]
4173    async fn test_reset_failures_preserves_retry_counters() {
4174        let _lock = ENV_MUTEX.lock().await;
4175        let repo = setup_test_repo().await;
4176        let relayer_id = Uuid::new_v4().to_string();
4177        let tx_id = Uuid::new_v4().to_string();
4178        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4179        repo.create(tx).await.unwrap();
4180
4181        // Set both retry counters
4182        repo.record_stellar_try_again_later_retry(
4183            tx_id.clone(),
4184            "2025-03-18T10:00:00Z".to_string(),
4185        )
4186        .await
4187        .unwrap();
4188        repo.record_stellar_insufficient_fee_retry(
4189            tx_id.clone(),
4190            "2025-03-18T10:01:00Z".to_string(),
4191        )
4192        .await
4193        .unwrap();
4194
4195        // Increment then reset consecutive failures
4196        repo.increment_status_check_failures(tx_id.clone())
4197            .await
4198            .unwrap();
4199        let updated = repo
4200            .reset_status_check_consecutive_failures(tx_id)
4201            .await
4202            .unwrap();
4203
4204        let meta = updated.metadata.unwrap();
4205        assert_eq!(meta.consecutive_failures, 0);
4206        assert_eq!(meta.total_failures, 1);
4207        assert_eq!(
4208            meta.try_again_later_retries, 1,
4209            "try_again_later_retries must survive reset"
4210        );
4211        assert_eq!(
4212            meta.insufficient_fee_retries, 1,
4213            "insufficient_fee_retries must survive reset"
4214        );
4215    }
4216
4217    #[tokio::test]
4218    #[ignore = "Requires active Redis instance"]
4219    async fn test_fee_and_try_again_later_retries_independent() {
4220        let _lock = ENV_MUTEX.lock().await;
4221        let repo = setup_test_repo().await;
4222        let relayer_id = Uuid::new_v4().to_string();
4223        let tx_id = Uuid::new_v4().to_string();
4224        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4225        repo.create(tx).await.unwrap();
4226
4227        // Set try_again_later_retries = 2
4228        repo.record_stellar_try_again_later_retry(
4229            tx_id.clone(),
4230            "2025-03-18T10:00:00Z".to_string(),
4231        )
4232        .await
4233        .unwrap();
4234        repo.record_stellar_try_again_later_retry(
4235            tx_id.clone(),
4236            "2025-03-18T10:01:00Z".to_string(),
4237        )
4238        .await
4239        .unwrap();
4240
4241        // Set insufficient_fee_retries = 1 — should NOT clobber try_again_later_retries
4242        let updated = repo
4243            .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:02:00Z".to_string())
4244            .await
4245            .unwrap();
4246
4247        let meta = updated.metadata.unwrap();
4248        assert_eq!(
4249            meta.try_again_later_retries, 2,
4250            "try_again_later_retries must survive insufficient_fee_retry"
4251        );
4252        assert_eq!(meta.insufficient_fee_retries, 1);
4253    }
4254}