openzeppelin_relayer/repositories/transaction/
mod.rs

1//! Transaction Repository Module
2//!
3//! This module provides the transaction repository layer for the OpenZeppelin Relayer service.
4//! It implements the Repository pattern to abstract transaction data persistence operations,
5//! supporting both in-memory and Redis-backed storage implementations.
6//!
7//! ## Features
8//!
9//! - **CRUD Operations**: Create, read, update, and delete transactions
10//! - **Specialized Queries**: Find transactions by relayer ID, status, and nonce
11//! - **Pagination Support**: Efficient paginated listing of transactions
12//! - **Status Management**: Update transaction status and timestamps
13//! - **Partial Updates**: Support for partial transaction updates
14//! - **Network Data**: Manage transaction network-specific data
15//!
16//! ## Repository Implementations
17//!
18//! - [`InMemoryTransactionRepository`]: Fast in-memory storage for testing/development
19//! - [`RedisTransactionRepository`]: Redis-backed storage for production environments
20//!
21mod transaction_in_memory;
22mod transaction_redis;
23
24pub use transaction_in_memory::*;
25pub use transaction_redis::*;
26
27use crate::{
28    models::{
29        NetworkTransactionData, TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
30    },
31    repositories::{BatchDeleteResult, TransactionDeleteRequest, *},
32    utils::RedisConnections,
33};
34use async_trait::async_trait;
35use eyre::Result;
36use std::sync::Arc;
37
38/// A trait defining transaction repository operations
39#[async_trait]
40pub trait TransactionRepository: Repository<TransactionRepoModel, String> {
41    /// Returns underlying storage Redis connections when available.
42    ///
43    /// In-memory implementations return `None`.
44    fn connection_info(&self) -> Option<(Arc<RedisConnections>, String)> {
45        None
46    }
47
48    /// Find transactions by relayer ID with pagination
49    async fn find_by_relayer_id(
50        &self,
51        relayer_id: &str,
52        query: PaginationQuery,
53    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
54
55    /// Find transactions by relayer ID and status(es).
56    ///
57    /// Results are sorted by created_at descending (newest first).
58    async fn find_by_status(
59        &self,
60        relayer_id: &str,
61        statuses: &[TransactionStatus],
62    ) -> Result<Vec<TransactionRepoModel>, RepositoryError>;
63
64    /// Find transactions by relayer ID and status(es) with pagination.
65    ///
66    /// Results are sorted by timestamp:
67    /// - For Confirmed transactions: sorted by confirmed_at (on-chain confirmation order)
68    /// - For all other statuses: sorted by created_at (queue/processing order)
69    ///
70    /// The `oldest_first` parameter controls sort direction:
71    /// - `false` (default): newest first (descending) - for displaying recent transactions
72    /// - `true`: oldest first (ascending) - for FIFO queue processing
73    ///
74    /// For multi-status queries, transactions are merged and sorted using the same rules,
75    /// ensuring consistent ordering across different statuses.
76    async fn find_by_status_paginated(
77        &self,
78        relayer_id: &str,
79        statuses: &[TransactionStatus],
80        query: PaginationQuery,
81        oldest_first: bool,
82    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
83
84    /// Find a transaction by relayer ID and nonce
85    async fn find_by_nonce(
86        &self,
87        relayer_id: &str,
88        nonce: u64,
89    ) -> Result<Option<TransactionRepoModel>, RepositoryError>;
90
91    /// Returns the transaction status for each nonce in `[from_nonce, to_nonce)`.
92    ///
93    /// For each nonce, returns `Some(status)` if a transaction exists at that slot,
94    /// or `None` if the slot is empty. Implementations should batch I/O where possible
95    /// (e.g., Redis MGET) to minimize round trips.
96    async fn get_nonce_occupancy(
97        &self,
98        relayer_id: &str,
99        from_nonce: u64,
100        to_nonce: u64,
101    ) -> Result<Vec<(u64, Option<TransactionStatus>)>, RepositoryError>;
102
103    /// Update the status of a transaction
104    async fn update_status(
105        &self,
106        tx_id: String,
107        status: TransactionStatus,
108    ) -> Result<TransactionRepoModel, RepositoryError>;
109
110    /// Partially update a transaction
111    async fn partial_update(
112        &self,
113        tx_id: String,
114        update: TransactionUpdateRequest,
115    ) -> Result<TransactionRepoModel, RepositoryError>;
116
117    /// Update the network data of a transaction
118    async fn update_network_data(
119        &self,
120        tx_id: String,
121        network_data: NetworkTransactionData,
122    ) -> Result<TransactionRepoModel, RepositoryError>;
123
124    /// Set the sent_at timestamp of a transaction
125    async fn set_sent_at(
126        &self,
127        tx_id: String,
128        sent_at: String,
129    ) -> Result<TransactionRepoModel, RepositoryError>;
130
131    /// Atomically increments status-check failure counters using the latest stored metadata.
132    async fn increment_status_check_failures(
133        &self,
134        tx_id: String,
135    ) -> Result<TransactionRepoModel, RepositoryError>;
136
137    /// Atomically resets consecutive status-check failures to zero while preserving other counters.
138    async fn reset_status_check_consecutive_failures(
139        &self,
140        tx_id: String,
141    ) -> Result<TransactionRepoModel, RepositoryError>;
142
143    /// Atomically sets `sent_at` and increments Stellar insufficient-fee retries.
144    async fn record_stellar_insufficient_fee_retry(
145        &self,
146        tx_id: String,
147        sent_at: String,
148    ) -> Result<TransactionRepoModel, RepositoryError>;
149
150    /// Atomically sets `sent_at` and increments Stellar try-again-later retries.
151    async fn record_stellar_try_again_later_retry(
152        &self,
153        tx_id: String,
154        sent_at: String,
155    ) -> Result<TransactionRepoModel, RepositoryError>;
156
157    /// Set the confirmed_at timestamp of a transaction
158    async fn set_confirmed_at(
159        &self,
160        tx_id: String,
161        confirmed_at: String,
162    ) -> Result<TransactionRepoModel, RepositoryError>;
163
164    /// Count transactions by status(es) without fetching full transaction data.
165    /// This is an optimized O(1) operation in Redis using ZCARD.
166    async fn count_by_status(
167        &self,
168        relayer_id: &str,
169        statuses: &[TransactionStatus],
170    ) -> Result<u64, RepositoryError>;
171
172    /// Delete multiple transactions by their IDs in a single batch operation.
173    ///
174    /// This is more efficient than calling `delete_by_id` multiple times as it
175    /// reduces the number of round-trips to the storage backend.
176    ///
177    /// Note: This method requires fetching transaction data first to clean up indexes.
178    /// If you already have transaction data, use `delete_by_requests` instead for
179    /// better performance.
180    ///
181    /// # Arguments
182    /// * `ids` - List of transaction IDs to delete
183    ///
184    /// # Returns
185    /// * `BatchDeleteResult` containing the count of successful deletions and any failures
186    async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError>;
187
188    /// Delete multiple transactions using pre-extracted data.
189    ///
190    /// This is the most efficient batch delete method as it doesn't require
191    /// re-fetching transaction data. Use this when you already have the transaction
192    /// data (e.g., from a previous query).
193    ///
194    /// # Arguments
195    /// * `requests` - List of delete requests containing transaction data needed for cleanup
196    ///
197    /// # Returns
198    /// * `BatchDeleteResult` containing the count of successful deletions and any failures
199    async fn delete_by_requests(
200        &self,
201        requests: Vec<TransactionDeleteRequest>,
202    ) -> Result<BatchDeleteResult, RepositoryError>;
203}
204
205#[cfg(test)]
206mockall::mock! {
207  pub TransactionRepository {}
208
209  #[async_trait]
210  impl Repository<TransactionRepoModel, String> for TransactionRepository {
211      async fn create(&self, entity: TransactionRepoModel) -> Result<TransactionRepoModel, RepositoryError>;
212      async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError>;
213      async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError>;
214      async fn list_paginated(&self, query: PaginationQuery) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
215      async fn update(&self, id: String, entity: TransactionRepoModel) -> Result<TransactionRepoModel, RepositoryError>;
216      async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError>;
217      async fn count(&self) -> Result<usize, RepositoryError>;
218      async fn has_entries(&self) -> Result<bool, RepositoryError>;
219      async fn drop_all_entries(&self) -> Result<(), RepositoryError>;
220  }
221
222  #[async_trait]
223  impl TransactionRepository for TransactionRepository {
224      fn connection_info(&self) -> Option<(Arc<RedisConnections>, String)>;
225      async fn find_by_relayer_id(&self, relayer_id: &str, query: PaginationQuery) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
226      async fn find_by_status(&self, relayer_id: &str, statuses: &[TransactionStatus]) -> Result<Vec<TransactionRepoModel>, RepositoryError>;
227      async fn find_by_status_paginated(&self, relayer_id: &str, statuses: &[TransactionStatus], query: PaginationQuery, oldest_first: bool) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
228      async fn find_by_nonce(&self, relayer_id: &str, nonce: u64) -> Result<Option<TransactionRepoModel>, RepositoryError>;
229      async fn get_nonce_occupancy(&self, relayer_id: &str, from_nonce: u64, to_nonce: u64) -> Result<Vec<(u64, Option<TransactionStatus>)>, RepositoryError>;
230      async fn update_status(&self, tx_id: String, status: TransactionStatus) -> Result<TransactionRepoModel, RepositoryError>;
231      async fn partial_update(&self, tx_id: String, update: TransactionUpdateRequest) -> Result<TransactionRepoModel, RepositoryError>;
232      async fn update_network_data(&self, tx_id: String, network_data: NetworkTransactionData) -> Result<TransactionRepoModel, RepositoryError>;
233      async fn set_sent_at(&self, tx_id: String, sent_at: String) -> Result<TransactionRepoModel, RepositoryError>;
234      async fn increment_status_check_failures(&self, tx_id: String) -> Result<TransactionRepoModel, RepositoryError>;
235      async fn reset_status_check_consecutive_failures(&self, tx_id: String) -> Result<TransactionRepoModel, RepositoryError>;
236      async fn record_stellar_insufficient_fee_retry(&self, tx_id: String, sent_at: String) -> Result<TransactionRepoModel, RepositoryError>;
237      async fn record_stellar_try_again_later_retry(&self, tx_id: String, sent_at: String) -> Result<TransactionRepoModel, RepositoryError>;
238      async fn set_confirmed_at(&self, tx_id: String, confirmed_at: String) -> Result<TransactionRepoModel, RepositoryError>;
239      async fn count_by_status(&self, relayer_id: &str, statuses: &[TransactionStatus]) -> Result<u64, RepositoryError>;
240      async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError>;
241      async fn delete_by_requests(&self, requests: Vec<TransactionDeleteRequest>) -> Result<BatchDeleteResult, RepositoryError>;
242  }
243}
244
245/// Enum wrapper for different transaction repository implementations
246#[derive(Debug, Clone)]
247pub enum TransactionRepositoryStorage {
248    InMemory(InMemoryTransactionRepository),
249    Redis(RedisTransactionRepository),
250}
251
252impl TransactionRepositoryStorage {
253    pub fn new_in_memory() -> Self {
254        Self::InMemory(InMemoryTransactionRepository::new())
255    }
256    pub fn new_redis(
257        connections: Arc<RedisConnections>,
258        key_prefix: String,
259    ) -> Result<Self, RepositoryError> {
260        Ok(Self::Redis(RedisTransactionRepository::new(
261            connections,
262            key_prefix,
263        )?))
264    }
265
266    /// Returns underlying Redis connections if this is a persistent storage backend.
267    ///
268    /// This is useful for operations that need direct storage access, such as
269    /// distributed locking and health checks.
270    ///
271    /// # Returns
272    /// * `Some((connections, key_prefix))` - If using persistent Redis storage
273    /// * `None` - If using in-memory storage
274    pub fn connection_info(&self) -> Option<(Arc<RedisConnections>, &str)> {
275        match self {
276            TransactionRepositoryStorage::InMemory(_) => None,
277            TransactionRepositoryStorage::Redis(repo) => {
278                Some((repo.connections.clone(), &repo.key_prefix))
279            }
280        }
281    }
282
283    /// Returns key prefix used by persistent storage backends.
284    pub fn key_prefix(&self) -> Option<&str> {
285        match self {
286            TransactionRepositoryStorage::InMemory(_) => None,
287            TransactionRepositoryStorage::Redis(repo) => Some(&repo.key_prefix),
288        }
289    }
290}
291
292#[async_trait]
293impl TransactionRepository for TransactionRepositoryStorage {
294    fn connection_info(&self) -> Option<(Arc<RedisConnections>, String)> {
295        TransactionRepositoryStorage::connection_info(self)
296            .map(|(connections, key_prefix)| (connections, key_prefix.to_string()))
297    }
298
299    async fn find_by_relayer_id(
300        &self,
301        relayer_id: &str,
302        query: PaginationQuery,
303    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
304        match self {
305            TransactionRepositoryStorage::InMemory(repo) => {
306                repo.find_by_relayer_id(relayer_id, query).await
307            }
308            TransactionRepositoryStorage::Redis(repo) => {
309                repo.find_by_relayer_id(relayer_id, query).await
310            }
311        }
312    }
313
314    async fn find_by_status(
315        &self,
316        relayer_id: &str,
317        statuses: &[TransactionStatus],
318    ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
319        match self {
320            TransactionRepositoryStorage::InMemory(repo) => {
321                repo.find_by_status(relayer_id, statuses).await
322            }
323            TransactionRepositoryStorage::Redis(repo) => {
324                repo.find_by_status(relayer_id, statuses).await
325            }
326        }
327    }
328
329    async fn find_by_status_paginated(
330        &self,
331        relayer_id: &str,
332        statuses: &[TransactionStatus],
333        query: PaginationQuery,
334        oldest_first: bool,
335    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
336        match self {
337            TransactionRepositoryStorage::InMemory(repo) => {
338                repo.find_by_status_paginated(relayer_id, statuses, query, oldest_first)
339                    .await
340            }
341            TransactionRepositoryStorage::Redis(repo) => {
342                repo.find_by_status_paginated(relayer_id, statuses, query, oldest_first)
343                    .await
344            }
345        }
346    }
347
348    async fn find_by_nonce(
349        &self,
350        relayer_id: &str,
351        nonce: u64,
352    ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
353        match self {
354            TransactionRepositoryStorage::InMemory(repo) => {
355                repo.find_by_nonce(relayer_id, nonce).await
356            }
357            TransactionRepositoryStorage::Redis(repo) => {
358                repo.find_by_nonce(relayer_id, nonce).await
359            }
360        }
361    }
362
363    async fn get_nonce_occupancy(
364        &self,
365        relayer_id: &str,
366        from_nonce: u64,
367        to_nonce: u64,
368    ) -> Result<Vec<(u64, Option<TransactionStatus>)>, RepositoryError> {
369        match self {
370            TransactionRepositoryStorage::InMemory(repo) => {
371                repo.get_nonce_occupancy(relayer_id, from_nonce, to_nonce)
372                    .await
373            }
374            TransactionRepositoryStorage::Redis(repo) => {
375                repo.get_nonce_occupancy(relayer_id, from_nonce, to_nonce)
376                    .await
377            }
378        }
379    }
380
381    async fn update_status(
382        &self,
383        tx_id: String,
384        status: TransactionStatus,
385    ) -> Result<TransactionRepoModel, RepositoryError> {
386        match self {
387            TransactionRepositoryStorage::InMemory(repo) => repo.update_status(tx_id, status).await,
388            TransactionRepositoryStorage::Redis(repo) => repo.update_status(tx_id, status).await,
389        }
390    }
391
392    async fn partial_update(
393        &self,
394        tx_id: String,
395        update: TransactionUpdateRequest,
396    ) -> Result<TransactionRepoModel, RepositoryError> {
397        match self {
398            TransactionRepositoryStorage::InMemory(repo) => {
399                repo.partial_update(tx_id, update).await
400            }
401            TransactionRepositoryStorage::Redis(repo) => repo.partial_update(tx_id, update).await,
402        }
403    }
404
405    async fn update_network_data(
406        &self,
407        tx_id: String,
408        network_data: NetworkTransactionData,
409    ) -> Result<TransactionRepoModel, RepositoryError> {
410        match self {
411            TransactionRepositoryStorage::InMemory(repo) => {
412                repo.update_network_data(tx_id, network_data).await
413            }
414            TransactionRepositoryStorage::Redis(repo) => {
415                repo.update_network_data(tx_id, network_data).await
416            }
417        }
418    }
419
420    async fn set_sent_at(
421        &self,
422        tx_id: String,
423        sent_at: String,
424    ) -> Result<TransactionRepoModel, RepositoryError> {
425        match self {
426            TransactionRepositoryStorage::InMemory(repo) => repo.set_sent_at(tx_id, sent_at).await,
427            TransactionRepositoryStorage::Redis(repo) => repo.set_sent_at(tx_id, sent_at).await,
428        }
429    }
430
431    async fn increment_status_check_failures(
432        &self,
433        tx_id: String,
434    ) -> Result<TransactionRepoModel, RepositoryError> {
435        match self {
436            TransactionRepositoryStorage::InMemory(repo) => {
437                repo.increment_status_check_failures(tx_id).await
438            }
439            TransactionRepositoryStorage::Redis(repo) => {
440                repo.increment_status_check_failures(tx_id).await
441            }
442        }
443    }
444
445    async fn reset_status_check_consecutive_failures(
446        &self,
447        tx_id: String,
448    ) -> Result<TransactionRepoModel, RepositoryError> {
449        match self {
450            TransactionRepositoryStorage::InMemory(repo) => {
451                repo.reset_status_check_consecutive_failures(tx_id).await
452            }
453            TransactionRepositoryStorage::Redis(repo) => {
454                repo.reset_status_check_consecutive_failures(tx_id).await
455            }
456        }
457    }
458
459    async fn record_stellar_insufficient_fee_retry(
460        &self,
461        tx_id: String,
462        sent_at: String,
463    ) -> Result<TransactionRepoModel, RepositoryError> {
464        match self {
465            TransactionRepositoryStorage::InMemory(repo) => {
466                repo.record_stellar_insufficient_fee_retry(tx_id, sent_at)
467                    .await
468            }
469            TransactionRepositoryStorage::Redis(repo) => {
470                repo.record_stellar_insufficient_fee_retry(tx_id, sent_at)
471                    .await
472            }
473        }
474    }
475
476    async fn record_stellar_try_again_later_retry(
477        &self,
478        tx_id: String,
479        sent_at: String,
480    ) -> Result<TransactionRepoModel, RepositoryError> {
481        match self {
482            TransactionRepositoryStorage::InMemory(repo) => {
483                repo.record_stellar_try_again_later_retry(tx_id, sent_at)
484                    .await
485            }
486            TransactionRepositoryStorage::Redis(repo) => {
487                repo.record_stellar_try_again_later_retry(tx_id, sent_at)
488                    .await
489            }
490        }
491    }
492
493    async fn set_confirmed_at(
494        &self,
495        tx_id: String,
496        confirmed_at: String,
497    ) -> Result<TransactionRepoModel, RepositoryError> {
498        match self {
499            TransactionRepositoryStorage::InMemory(repo) => {
500                repo.set_confirmed_at(tx_id, confirmed_at).await
501            }
502            TransactionRepositoryStorage::Redis(repo) => {
503                repo.set_confirmed_at(tx_id, confirmed_at).await
504            }
505        }
506    }
507
508    async fn count_by_status(
509        &self,
510        relayer_id: &str,
511        statuses: &[TransactionStatus],
512    ) -> Result<u64, RepositoryError> {
513        match self {
514            TransactionRepositoryStorage::InMemory(repo) => {
515                repo.count_by_status(relayer_id, statuses).await
516            }
517            TransactionRepositoryStorage::Redis(repo) => {
518                repo.count_by_status(relayer_id, statuses).await
519            }
520        }
521    }
522
523    async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError> {
524        match self {
525            TransactionRepositoryStorage::InMemory(repo) => repo.delete_by_ids(ids).await,
526            TransactionRepositoryStorage::Redis(repo) => repo.delete_by_ids(ids).await,
527        }
528    }
529
530    async fn delete_by_requests(
531        &self,
532        requests: Vec<TransactionDeleteRequest>,
533    ) -> Result<BatchDeleteResult, RepositoryError> {
534        match self {
535            TransactionRepositoryStorage::InMemory(repo) => repo.delete_by_requests(requests).await,
536            TransactionRepositoryStorage::Redis(repo) => repo.delete_by_requests(requests).await,
537        }
538    }
539}
540
541#[async_trait]
542impl Repository<TransactionRepoModel, String> for TransactionRepositoryStorage {
543    async fn create(
544        &self,
545        entity: TransactionRepoModel,
546    ) -> Result<TransactionRepoModel, RepositoryError> {
547        match self {
548            TransactionRepositoryStorage::InMemory(repo) => repo.create(entity).await,
549            TransactionRepositoryStorage::Redis(repo) => repo.create(entity).await,
550        }
551    }
552
553    async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
554        match self {
555            TransactionRepositoryStorage::InMemory(repo) => repo.get_by_id(id).await,
556            TransactionRepositoryStorage::Redis(repo) => repo.get_by_id(id).await,
557        }
558    }
559
560    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
561        match self {
562            TransactionRepositoryStorage::InMemory(repo) => repo.list_all().await,
563            TransactionRepositoryStorage::Redis(repo) => repo.list_all().await,
564        }
565    }
566
567    async fn list_paginated(
568        &self,
569        query: PaginationQuery,
570    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
571        match self {
572            TransactionRepositoryStorage::InMemory(repo) => repo.list_paginated(query).await,
573            TransactionRepositoryStorage::Redis(repo) => repo.list_paginated(query).await,
574        }
575    }
576
577    async fn update(
578        &self,
579        id: String,
580        entity: TransactionRepoModel,
581    ) -> Result<TransactionRepoModel, RepositoryError> {
582        match self {
583            TransactionRepositoryStorage::InMemory(repo) => repo.update(id, entity).await,
584            TransactionRepositoryStorage::Redis(repo) => repo.update(id, entity).await,
585        }
586    }
587
588    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
589        match self {
590            TransactionRepositoryStorage::InMemory(repo) => repo.delete_by_id(id).await,
591            TransactionRepositoryStorage::Redis(repo) => repo.delete_by_id(id).await,
592        }
593    }
594
595    async fn count(&self) -> Result<usize, RepositoryError> {
596        match self {
597            TransactionRepositoryStorage::InMemory(repo) => repo.count().await,
598            TransactionRepositoryStorage::Redis(repo) => repo.count().await,
599        }
600    }
601
602    async fn has_entries(&self) -> Result<bool, RepositoryError> {
603        match self {
604            TransactionRepositoryStorage::InMemory(repo) => repo.has_entries().await,
605            TransactionRepositoryStorage::Redis(repo) => repo.has_entries().await,
606        }
607    }
608
609    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
610        match self {
611            TransactionRepositoryStorage::InMemory(repo) => repo.drop_all_entries().await,
612            TransactionRepositoryStorage::Redis(repo) => repo.drop_all_entries().await,
613        }
614    }
615}
616
617#[cfg(test)]
618mod tests {
619    use chrono::Utc;
620    use color_eyre::Result;
621    use deadpool_redis::{Config, Runtime};
622
623    use super::*;
624    use crate::models::{
625        EvmTransactionData, NetworkTransactionData, TransactionStatus, TransactionUpdateRequest,
626    };
627    use crate::repositories::PaginationQuery;
628    use crate::utils::mocks::mockutils::create_mock_transaction;
629
630    fn create_test_transaction(id: &str, relayer_id: &str) -> TransactionRepoModel {
631        let mut transaction = create_mock_transaction();
632        transaction.id = id.to_string();
633        transaction.relayer_id = relayer_id.to_string();
634        transaction
635    }
636
637    fn create_test_transaction_with_status(
638        id: &str,
639        relayer_id: &str,
640        status: TransactionStatus,
641    ) -> TransactionRepoModel {
642        let mut transaction = create_test_transaction(id, relayer_id);
643        transaction.status = status;
644        transaction
645    }
646
647    fn create_test_transaction_with_nonce(
648        id: &str,
649        relayer_id: &str,
650        nonce: u64,
651    ) -> TransactionRepoModel {
652        let mut transaction = create_test_transaction(id, relayer_id);
653        if let NetworkTransactionData::Evm(ref mut evm_data) = transaction.network_data {
654            evm_data.nonce = Some(nonce);
655        }
656        transaction
657    }
658
659    fn create_test_update_request() -> TransactionUpdateRequest {
660        TransactionUpdateRequest {
661            status: Some(TransactionStatus::Sent),
662            status_reason: Some("Test reason".to_string()),
663            sent_at: Some(Utc::now().to_string()),
664            confirmed_at: None,
665            network_data: None,
666            priced_at: None,
667            hashes: Some(vec!["test_hash".to_string()]),
668            noop_count: None,
669            is_canceled: None,
670            delete_at: None,
671            metadata: None,
672        }
673    }
674
675    #[tokio::test]
676    async fn test_new_in_memory() {
677        let storage = TransactionRepositoryStorage::new_in_memory();
678
679        match storage {
680            TransactionRepositoryStorage::InMemory(_) => {
681                // Success - verify it's the InMemory variant
682            }
683            TransactionRepositoryStorage::Redis(_) => {
684                panic!("Expected InMemory variant, got Redis");
685            }
686        }
687    }
688
689    #[tokio::test]
690    async fn test_connection_info_returns_none_for_in_memory() {
691        let storage = TransactionRepositoryStorage::new_in_memory();
692
693        // In-memory storage should return None for connection_info
694        assert!(storage.connection_info().is_none());
695    }
696
697    #[tokio::test]
698    #[ignore = "Requires active Redis instance"]
699    async fn test_connection_info_returns_some_for_redis() -> Result<()> {
700        let redis_url = std::env::var("REDIS_TEST_URL")
701            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
702        let cfg = Config::from_url(&redis_url);
703        let pool = Arc::new(
704            cfg.builder()
705                .map_err(|e| eyre::eyre!("Failed to create Redis pool builder: {}", e))?
706                .max_size(16)
707                .runtime(Runtime::Tokio1)
708                .build()
709                .map_err(|e| eyre::eyre!("Failed to build Redis pool: {}", e))?,
710        );
711        let connections = Arc::new(RedisConnections::new_single_pool(pool.clone()));
712        let key_prefix = "test_prefix".to_string();
713
714        let storage = TransactionRepositoryStorage::new_redis(connections, key_prefix.clone())?;
715
716        let (returned_connection, returned_prefix) = storage
717            .connection_info()
718            .expect("Expected Redis connection info");
719
720        assert!(Arc::ptr_eq(&pool, returned_connection.primary()));
721        assert_eq!(returned_prefix, key_prefix);
722
723        Ok(())
724    }
725
726    #[tokio::test]
727    async fn test_create_in_memory() -> Result<()> {
728        let storage = TransactionRepositoryStorage::new_in_memory();
729        let transaction = create_test_transaction("test-tx", "test-relayer");
730
731        let created = storage.create(transaction.clone()).await?;
732        assert_eq!(created.id, transaction.id);
733        assert_eq!(created.relayer_id, transaction.relayer_id);
734        assert_eq!(created.status, transaction.status);
735
736        Ok(())
737    }
738
739    #[tokio::test]
740    async fn test_get_by_id_in_memory() -> Result<()> {
741        let storage = TransactionRepositoryStorage::new_in_memory();
742        let transaction = create_test_transaction("test-tx", "test-relayer");
743
744        // Create transaction first
745        storage.create(transaction.clone()).await?;
746
747        // Get by ID
748        let retrieved = storage.get_by_id("test-tx".to_string()).await?;
749        assert_eq!(retrieved.id, transaction.id);
750        assert_eq!(retrieved.relayer_id, transaction.relayer_id);
751        assert_eq!(retrieved.status, transaction.status);
752
753        Ok(())
754    }
755
756    #[tokio::test]
757    async fn test_get_by_id_not_found_in_memory() -> Result<()> {
758        let storage = TransactionRepositoryStorage::new_in_memory();
759
760        let result = storage.get_by_id("non-existent".to_string()).await;
761        assert!(result.is_err());
762
763        Ok(())
764    }
765
766    #[tokio::test]
767    async fn test_list_all_in_memory() -> Result<()> {
768        let storage = TransactionRepositoryStorage::new_in_memory();
769
770        // Initially empty
771        let transactions = storage.list_all().await?;
772        assert!(transactions.is_empty());
773
774        // Add transactions
775        let tx1 = create_test_transaction("tx-1", "relayer-1");
776        let tx2 = create_test_transaction("tx-2", "relayer-2");
777
778        storage.create(tx1.clone()).await?;
779        storage.create(tx2.clone()).await?;
780
781        let all_transactions = storage.list_all().await?;
782        assert_eq!(all_transactions.len(), 2);
783
784        let ids: Vec<&str> = all_transactions.iter().map(|t| t.id.as_str()).collect();
785        assert!(ids.contains(&"tx-1"));
786        assert!(ids.contains(&"tx-2"));
787
788        Ok(())
789    }
790
791    #[tokio::test]
792    async fn test_list_paginated_in_memory() -> Result<()> {
793        let storage = TransactionRepositoryStorage::new_in_memory();
794
795        // Add test transactions
796        for i in 1..=5 {
797            let tx = create_test_transaction(&format!("tx-{i}"), "test-relayer");
798            storage.create(tx).await?;
799        }
800
801        // Test pagination
802        let query = PaginationQuery {
803            page: 1,
804            per_page: 2,
805        };
806        let page = storage.list_paginated(query).await?;
807
808        assert_eq!(page.items.len(), 2);
809        assert_eq!(page.total, 5);
810        assert_eq!(page.page, 1);
811        assert_eq!(page.per_page, 2);
812
813        // Test second page
814        let query2 = PaginationQuery {
815            page: 2,
816            per_page: 2,
817        };
818        let page2 = storage.list_paginated(query2).await?;
819
820        assert_eq!(page2.items.len(), 2);
821        assert_eq!(page2.total, 5);
822        assert_eq!(page2.page, 2);
823        assert_eq!(page2.per_page, 2);
824
825        Ok(())
826    }
827
828    #[tokio::test]
829    async fn test_update_in_memory() -> Result<()> {
830        let storage = TransactionRepositoryStorage::new_in_memory();
831        let transaction = create_test_transaction("test-tx", "test-relayer");
832
833        // Create transaction first
834        storage.create(transaction.clone()).await?;
835
836        // Update it
837        let mut updated_transaction = transaction.clone();
838        updated_transaction.status = TransactionStatus::Sent;
839        updated_transaction.status_reason = Some("Updated reason".to_string());
840
841        let result = storage
842            .update("test-tx".to_string(), updated_transaction.clone())
843            .await?;
844        assert_eq!(result.id, "test-tx");
845        assert_eq!(result.status, TransactionStatus::Sent);
846        assert_eq!(result.status_reason, Some("Updated reason".to_string()));
847
848        // Verify the update persisted
849        let retrieved = storage.get_by_id("test-tx".to_string()).await?;
850        assert_eq!(retrieved.status, TransactionStatus::Sent);
851        assert_eq!(retrieved.status_reason, Some("Updated reason".to_string()));
852
853        Ok(())
854    }
855
856    #[tokio::test]
857    async fn test_update_not_found_in_memory() -> Result<()> {
858        let storage = TransactionRepositoryStorage::new_in_memory();
859        let transaction = create_test_transaction("non-existent", "test-relayer");
860
861        let result = storage
862            .update("non-existent".to_string(), transaction)
863            .await;
864        assert!(result.is_err());
865
866        Ok(())
867    }
868
869    #[tokio::test]
870    async fn test_delete_by_id_in_memory() -> Result<()> {
871        let storage = TransactionRepositoryStorage::new_in_memory();
872        let transaction = create_test_transaction("test-tx", "test-relayer");
873
874        // Create transaction first
875        storage.create(transaction.clone()).await?;
876
877        // Verify it exists
878        let retrieved = storage.get_by_id("test-tx".to_string()).await?;
879        assert_eq!(retrieved.id, "test-tx");
880
881        // Delete it
882        storage.delete_by_id("test-tx".to_string()).await?;
883
884        // Verify it's gone
885        let result = storage.get_by_id("test-tx".to_string()).await;
886        assert!(result.is_err());
887
888        Ok(())
889    }
890
891    #[tokio::test]
892    async fn test_delete_by_id_not_found_in_memory() -> Result<()> {
893        let storage = TransactionRepositoryStorage::new_in_memory();
894
895        let result = storage.delete_by_id("non-existent".to_string()).await;
896        assert!(result.is_err());
897
898        Ok(())
899    }
900
901    #[tokio::test]
902    async fn test_count_in_memory() -> Result<()> {
903        let storage = TransactionRepositoryStorage::new_in_memory();
904
905        // Initially empty
906        let count = storage.count().await?;
907        assert_eq!(count, 0);
908
909        // Add transactions
910        let tx1 = create_test_transaction("tx-1", "relayer-1");
911        let tx2 = create_test_transaction("tx-2", "relayer-2");
912
913        storage.create(tx1).await?;
914        let count_after_one = storage.count().await?;
915        assert_eq!(count_after_one, 1);
916
917        storage.create(tx2).await?;
918        let count_after_two = storage.count().await?;
919        assert_eq!(count_after_two, 2);
920
921        // Delete one
922        storage.delete_by_id("tx-1".to_string()).await?;
923        let count_after_delete = storage.count().await?;
924        assert_eq!(count_after_delete, 1);
925
926        Ok(())
927    }
928
929    #[tokio::test]
930    async fn test_has_entries_in_memory() -> Result<()> {
931        let storage = TransactionRepositoryStorage::new_in_memory();
932
933        // Initially empty
934        let has_entries = storage.has_entries().await?;
935        assert!(!has_entries);
936
937        // Add transaction
938        let transaction = create_test_transaction("test-tx", "test-relayer");
939        storage.create(transaction).await?;
940
941        let has_entries_after_create = storage.has_entries().await?;
942        assert!(has_entries_after_create);
943
944        // Delete transaction
945        storage.delete_by_id("test-tx".to_string()).await?;
946
947        let has_entries_after_delete = storage.has_entries().await?;
948        assert!(!has_entries_after_delete);
949
950        Ok(())
951    }
952
953    #[tokio::test]
954    async fn test_drop_all_entries_in_memory() -> Result<()> {
955        let storage = TransactionRepositoryStorage::new_in_memory();
956
957        // Add multiple transactions
958        for i in 1..=5 {
959            let tx = create_test_transaction(&format!("tx-{i}"), "test-relayer");
960            storage.create(tx).await?;
961        }
962
963        // Verify they exist
964        let count_before = storage.count().await?;
965        assert_eq!(count_before, 5);
966
967        let has_entries_before = storage.has_entries().await?;
968        assert!(has_entries_before);
969
970        // Drop all entries
971        storage.drop_all_entries().await?;
972
973        // Verify they're gone
974        let count_after = storage.count().await?;
975        assert_eq!(count_after, 0);
976
977        let has_entries_after = storage.has_entries().await?;
978        assert!(!has_entries_after);
979
980        let all_transactions = storage.list_all().await?;
981        assert!(all_transactions.is_empty());
982
983        Ok(())
984    }
985
986    #[tokio::test]
987    async fn test_find_by_relayer_id_in_memory() -> Result<()> {
988        let storage = TransactionRepositoryStorage::new_in_memory();
989
990        // Add transactions for different relayers
991        let tx1 = create_test_transaction("tx-1", "relayer-1");
992        let tx2 = create_test_transaction("tx-2", "relayer-1");
993        let tx3 = create_test_transaction("tx-3", "relayer-2");
994
995        storage.create(tx1).await?;
996        storage.create(tx2).await?;
997        storage.create(tx3).await?;
998
999        // Find by relayer ID
1000        let query = PaginationQuery {
1001            page: 1,
1002            per_page: 10,
1003        };
1004        let result = storage.find_by_relayer_id("relayer-1", query).await?;
1005
1006        assert_eq!(result.items.len(), 2);
1007        assert_eq!(result.total, 2);
1008
1009        // Verify all transactions belong to relayer-1
1010        for tx in result.items {
1011            assert_eq!(tx.relayer_id, "relayer-1");
1012        }
1013
1014        Ok(())
1015    }
1016
1017    #[tokio::test]
1018    async fn test_find_by_status_in_memory() -> Result<()> {
1019        let storage = TransactionRepositoryStorage::new_in_memory();
1020
1021        // Add transactions with different statuses
1022        let tx1 =
1023            create_test_transaction_with_status("tx-1", "relayer-1", TransactionStatus::Pending);
1024        let tx2 = create_test_transaction_with_status("tx-2", "relayer-1", TransactionStatus::Sent);
1025        let tx3 =
1026            create_test_transaction_with_status("tx-3", "relayer-1", TransactionStatus::Pending);
1027        let tx4 =
1028            create_test_transaction_with_status("tx-4", "relayer-2", TransactionStatus::Pending);
1029
1030        storage.create(tx1).await?;
1031        storage.create(tx2).await?;
1032        storage.create(tx3).await?;
1033        storage.create(tx4).await?;
1034
1035        // Find by status
1036        let statuses = vec![TransactionStatus::Pending];
1037        let result = storage.find_by_status("relayer-1", &statuses).await?;
1038
1039        assert_eq!(result.len(), 2);
1040
1041        // Verify all transactions have Pending status and belong to relayer-1
1042        for tx in result {
1043            assert_eq!(tx.status, TransactionStatus::Pending);
1044            assert_eq!(tx.relayer_id, "relayer-1");
1045        }
1046
1047        Ok(())
1048    }
1049
1050    #[tokio::test]
1051    async fn test_find_by_nonce_in_memory() -> Result<()> {
1052        let storage = TransactionRepositoryStorage::new_in_memory();
1053
1054        // Add transactions with different nonces
1055        let tx1 = create_test_transaction_with_nonce("tx-1", "relayer-1", 10);
1056        let tx2 = create_test_transaction_with_nonce("tx-2", "relayer-1", 20);
1057        let tx3 = create_test_transaction_with_nonce("tx-3", "relayer-2", 10);
1058
1059        storage.create(tx1).await?;
1060        storage.create(tx2).await?;
1061        storage.create(tx3).await?;
1062
1063        // Find by nonce
1064        let result = storage.find_by_nonce("relayer-1", 10).await?;
1065
1066        assert!(result.is_some());
1067        let found_tx = result.unwrap();
1068        assert_eq!(found_tx.id, "tx-1");
1069        assert_eq!(found_tx.relayer_id, "relayer-1");
1070
1071        // Check EVM nonce
1072        if let NetworkTransactionData::Evm(evm_data) = found_tx.network_data {
1073            assert_eq!(evm_data.nonce, Some(10));
1074        }
1075
1076        // Test not found
1077        let not_found = storage.find_by_nonce("relayer-1", 99).await?;
1078        assert!(not_found.is_none());
1079
1080        Ok(())
1081    }
1082
1083    #[tokio::test]
1084    async fn test_update_status_in_memory() -> Result<()> {
1085        let storage = TransactionRepositoryStorage::new_in_memory();
1086        let transaction = create_test_transaction("test-tx", "test-relayer");
1087
1088        // Create transaction first
1089        storage.create(transaction).await?;
1090
1091        // Update status
1092        let updated = storage
1093            .update_status("test-tx".to_string(), TransactionStatus::Sent)
1094            .await?;
1095
1096        assert_eq!(updated.id, "test-tx");
1097        assert_eq!(updated.status, TransactionStatus::Sent);
1098
1099        // Verify the update persisted
1100        let retrieved = storage.get_by_id("test-tx".to_string()).await?;
1101        assert_eq!(retrieved.status, TransactionStatus::Sent);
1102
1103        Ok(())
1104    }
1105
1106    #[tokio::test]
1107    async fn test_partial_update_in_memory() -> Result<()> {
1108        let storage = TransactionRepositoryStorage::new_in_memory();
1109        let transaction = create_test_transaction("test-tx", "test-relayer");
1110
1111        // Create transaction first
1112        storage.create(transaction).await?;
1113
1114        // Partial update
1115        let update_request = create_test_update_request();
1116        let updated = storage
1117            .partial_update("test-tx".to_string(), update_request)
1118            .await?;
1119
1120        assert_eq!(updated.id, "test-tx");
1121        assert_eq!(updated.status, TransactionStatus::Sent);
1122        assert_eq!(updated.status_reason, Some("Test reason".to_string()));
1123        assert!(updated.sent_at.is_some());
1124        assert_eq!(updated.hashes, vec!["test_hash".to_string()]);
1125
1126        Ok(())
1127    }
1128
1129    #[tokio::test]
1130    async fn test_update_network_data_in_memory() -> Result<()> {
1131        let storage = TransactionRepositoryStorage::new_in_memory();
1132        let transaction = create_test_transaction("test-tx", "test-relayer");
1133
1134        // Create transaction first
1135        storage.create(transaction).await?;
1136
1137        // Update network data
1138        let new_evm_data = EvmTransactionData {
1139            nonce: Some(42),
1140            gas_limit: Some(21000),
1141            ..Default::default()
1142        };
1143        let new_network_data = NetworkTransactionData::Evm(new_evm_data);
1144
1145        let updated = storage
1146            .update_network_data("test-tx".to_string(), new_network_data)
1147            .await?;
1148
1149        assert_eq!(updated.id, "test-tx");
1150        if let NetworkTransactionData::Evm(evm_data) = updated.network_data {
1151            assert_eq!(evm_data.nonce, Some(42));
1152            assert_eq!(evm_data.gas_limit, Some(21000));
1153        } else {
1154            panic!("Expected EVM network data");
1155        }
1156
1157        Ok(())
1158    }
1159
1160    #[tokio::test]
1161    async fn test_set_sent_at_in_memory() -> Result<()> {
1162        let storage = TransactionRepositoryStorage::new_in_memory();
1163        let transaction = create_test_transaction("test-tx", "test-relayer");
1164
1165        // Create transaction first
1166        storage.create(transaction).await?;
1167
1168        // Set sent_at
1169        let sent_at = Utc::now().to_string();
1170        let updated = storage
1171            .set_sent_at("test-tx".to_string(), sent_at.clone())
1172            .await?;
1173
1174        assert_eq!(updated.id, "test-tx");
1175        assert_eq!(updated.sent_at, Some(sent_at));
1176
1177        Ok(())
1178    }
1179
1180    #[tokio::test]
1181    async fn test_set_confirmed_at_in_memory() -> Result<()> {
1182        let storage = TransactionRepositoryStorage::new_in_memory();
1183        let transaction = create_test_transaction("test-tx", "test-relayer");
1184
1185        // Create transaction first
1186        storage.create(transaction).await?;
1187
1188        // Set confirmed_at
1189        let confirmed_at = Utc::now().to_string();
1190        let updated = storage
1191            .set_confirmed_at("test-tx".to_string(), confirmed_at.clone())
1192            .await?;
1193
1194        assert_eq!(updated.id, "test-tx");
1195        assert_eq!(updated.confirmed_at, Some(confirmed_at));
1196
1197        Ok(())
1198    }
1199
1200    #[tokio::test]
1201    async fn test_create_duplicate_id_in_memory() -> Result<()> {
1202        let storage = TransactionRepositoryStorage::new_in_memory();
1203        let transaction = create_test_transaction("duplicate-id", "test-relayer");
1204
1205        // Create first transaction
1206        storage.create(transaction.clone()).await?;
1207
1208        // Try to create another with same ID - should fail
1209        let result = storage.create(transaction.clone()).await;
1210        assert!(result.is_err());
1211
1212        Ok(())
1213    }
1214
1215    #[tokio::test]
1216    async fn test_workflow_in_memory() -> Result<()> {
1217        let storage = TransactionRepositoryStorage::new_in_memory();
1218
1219        // 1. Start with empty storage
1220        assert!(!storage.has_entries().await?);
1221        assert_eq!(storage.count().await?, 0);
1222
1223        // 2. Create transaction
1224        let transaction = create_test_transaction("workflow-test", "test-relayer");
1225        let created = storage.create(transaction.clone()).await?;
1226        assert_eq!(created.id, "workflow-test");
1227
1228        // 3. Verify it exists
1229        assert!(storage.has_entries().await?);
1230        assert_eq!(storage.count().await?, 1);
1231
1232        // 4. Retrieve it
1233        let retrieved = storage.get_by_id("workflow-test".to_string()).await?;
1234        assert_eq!(retrieved.id, "workflow-test");
1235
1236        // 5. Update status
1237        let updated = storage
1238            .update_status("workflow-test".to_string(), TransactionStatus::Sent)
1239            .await?;
1240        assert_eq!(updated.status, TransactionStatus::Sent);
1241
1242        // 6. Verify update
1243        let retrieved_updated = storage.get_by_id("workflow-test".to_string()).await?;
1244        assert_eq!(retrieved_updated.status, TransactionStatus::Sent);
1245
1246        // 7. Delete it
1247        storage.delete_by_id("workflow-test".to_string()).await?;
1248
1249        // 8. Verify it's gone
1250        assert!(!storage.has_entries().await?);
1251        assert_eq!(storage.count().await?, 0);
1252
1253        let result = storage.get_by_id("workflow-test".to_string()).await;
1254        assert!(result.is_err());
1255
1256        Ok(())
1257    }
1258
1259    #[tokio::test]
1260    async fn test_multiple_relayers_workflow() -> Result<()> {
1261        let storage = TransactionRepositoryStorage::new_in_memory();
1262
1263        // Add transactions for multiple relayers
1264        let tx1 =
1265            create_test_transaction_with_status("tx-1", "relayer-1", TransactionStatus::Pending);
1266        let tx2 = create_test_transaction_with_status("tx-2", "relayer-1", TransactionStatus::Sent);
1267        let tx3 =
1268            create_test_transaction_with_status("tx-3", "relayer-2", TransactionStatus::Pending);
1269
1270        storage.create(tx1).await?;
1271        storage.create(tx2).await?;
1272        storage.create(tx3).await?;
1273
1274        // Test find_by_relayer_id
1275        let query = PaginationQuery {
1276            page: 1,
1277            per_page: 10,
1278        };
1279        let relayer1_txs = storage.find_by_relayer_id("relayer-1", query).await?;
1280        assert_eq!(relayer1_txs.items.len(), 2);
1281
1282        // Test find_by_status
1283        let pending_txs = storage
1284            .find_by_status("relayer-1", &[TransactionStatus::Pending])
1285            .await?;
1286        assert_eq!(pending_txs.len(), 1);
1287        assert_eq!(pending_txs[0].id, "tx-1");
1288
1289        // Test count remains accurate
1290        assert_eq!(storage.count().await?, 3);
1291
1292        Ok(())
1293    }
1294
1295    #[tokio::test]
1296    async fn test_pagination_edge_cases_in_memory() -> Result<()> {
1297        let storage = TransactionRepositoryStorage::new_in_memory();
1298
1299        // Test pagination with empty storage
1300        let query = PaginationQuery {
1301            page: 1,
1302            per_page: 10,
1303        };
1304        let page = storage.list_paginated(query).await?;
1305        assert_eq!(page.items.len(), 0);
1306        assert_eq!(page.total, 0);
1307        assert_eq!(page.page, 1);
1308        assert_eq!(page.per_page, 10);
1309
1310        // Add one transaction
1311        let transaction = create_test_transaction("single-tx", "test-relayer");
1312        storage.create(transaction).await?;
1313
1314        // Test pagination with single item
1315        let query = PaginationQuery {
1316            page: 1,
1317            per_page: 10,
1318        };
1319        let page = storage.list_paginated(query).await?;
1320        assert_eq!(page.items.len(), 1);
1321        assert_eq!(page.total, 1);
1322        assert_eq!(page.page, 1);
1323        assert_eq!(page.per_page, 10);
1324
1325        // Test pagination with page beyond total
1326        let query = PaginationQuery {
1327            page: 3,
1328            per_page: 10,
1329        };
1330        let page = storage.list_paginated(query).await?;
1331        assert_eq!(page.items.len(), 0);
1332        assert_eq!(page.total, 1);
1333        assert_eq!(page.page, 3);
1334        assert_eq!(page.per_page, 10);
1335
1336        Ok(())
1337    }
1338
1339    #[tokio::test]
1340    async fn test_find_by_relayer_id_pagination() -> Result<()> {
1341        let storage = TransactionRepositoryStorage::new_in_memory();
1342
1343        // Add many transactions for one relayer
1344        for i in 1..=10 {
1345            let tx = create_test_transaction(&format!("tx-{i}"), "test-relayer");
1346            storage.create(tx).await?;
1347        }
1348
1349        // Test first page
1350        let query = PaginationQuery {
1351            page: 1,
1352            per_page: 3,
1353        };
1354        let page1 = storage.find_by_relayer_id("test-relayer", query).await?;
1355        assert_eq!(page1.items.len(), 3);
1356        assert_eq!(page1.total, 10);
1357        assert_eq!(page1.page, 1);
1358        assert_eq!(page1.per_page, 3);
1359
1360        // Test second page
1361        let query = PaginationQuery {
1362            page: 2,
1363            per_page: 3,
1364        };
1365        let page2 = storage.find_by_relayer_id("test-relayer", query).await?;
1366        assert_eq!(page2.items.len(), 3);
1367        assert_eq!(page2.total, 10);
1368        assert_eq!(page2.page, 2);
1369        assert_eq!(page2.per_page, 3);
1370
1371        Ok(())
1372    }
1373
1374    #[tokio::test]
1375    async fn test_find_by_multiple_statuses() -> Result<()> {
1376        let storage = TransactionRepositoryStorage::new_in_memory();
1377
1378        // Add transactions with different statuses
1379        let tx1 =
1380            create_test_transaction_with_status("tx-1", "test-relayer", TransactionStatus::Pending);
1381        let tx2 =
1382            create_test_transaction_with_status("tx-2", "test-relayer", TransactionStatus::Sent);
1383        let tx3 = create_test_transaction_with_status(
1384            "tx-3",
1385            "test-relayer",
1386            TransactionStatus::Confirmed,
1387        );
1388        let tx4 =
1389            create_test_transaction_with_status("tx-4", "test-relayer", TransactionStatus::Failed);
1390
1391        storage.create(tx1).await?;
1392        storage.create(tx2).await?;
1393        storage.create(tx3).await?;
1394        storage.create(tx4).await?;
1395
1396        // Find by multiple statuses
1397        let statuses = vec![TransactionStatus::Pending, TransactionStatus::Sent];
1398        let result = storage.find_by_status("test-relayer", &statuses).await?;
1399
1400        assert_eq!(result.len(), 2);
1401
1402        // Verify all transactions have the correct statuses
1403        let found_statuses: Vec<TransactionStatus> =
1404            result.iter().map(|tx| tx.status.clone()).collect();
1405        assert!(found_statuses.contains(&TransactionStatus::Pending));
1406        assert!(found_statuses.contains(&TransactionStatus::Sent));
1407
1408        Ok(())
1409    }
1410
1411    #[tokio::test]
1412    async fn test_record_stellar_try_again_later_retry_in_memory() -> Result<()> {
1413        let storage = TransactionRepositoryStorage::new_in_memory();
1414        let mut transaction = create_test_transaction("test-tx", "test-relayer");
1415        transaction.status = TransactionStatus::Sent;
1416        storage.create(transaction).await?;
1417
1418        let sent_at = "2025-03-18T10:00:00Z".to_string();
1419        let updated = storage
1420            .record_stellar_try_again_later_retry("test-tx".to_string(), sent_at.clone())
1421            .await?;
1422
1423        assert_eq!(updated.id, "test-tx");
1424        assert_eq!(updated.sent_at, Some(sent_at));
1425        let meta = updated.metadata.expect("metadata should be set");
1426        assert_eq!(meta.try_again_later_retries, 1);
1427        assert_eq!(meta.consecutive_failures, 0);
1428        assert_eq!(meta.total_failures, 0);
1429        assert_eq!(meta.insufficient_fee_retries, 0);
1430
1431        Ok(())
1432    }
1433
1434    #[tokio::test]
1435    async fn test_record_stellar_try_again_later_retry_accumulates_in_memory() -> Result<()> {
1436        let storage = TransactionRepositoryStorage::new_in_memory();
1437        let mut transaction = create_test_transaction("test-tx", "test-relayer");
1438        transaction.status = TransactionStatus::Sent;
1439        storage.create(transaction).await?;
1440
1441        storage
1442            .record_stellar_try_again_later_retry(
1443                "test-tx".to_string(),
1444                "2025-03-18T10:00:00Z".to_string(),
1445            )
1446            .await?;
1447
1448        let updated = storage
1449            .record_stellar_try_again_later_retry(
1450                "test-tx".to_string(),
1451                "2025-03-18T10:01:00Z".to_string(),
1452            )
1453            .await?;
1454
1455        assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
1456        let meta = updated.metadata.unwrap();
1457        assert_eq!(meta.try_again_later_retries, 2);
1458
1459        Ok(())
1460    }
1461
1462    #[tokio::test]
1463    async fn test_record_stellar_try_again_later_retry_noop_on_final_state_in_memory() -> Result<()>
1464    {
1465        let storage = TransactionRepositoryStorage::new_in_memory();
1466        let mut transaction = create_test_transaction("test-tx", "test-relayer");
1467        transaction.status = TransactionStatus::Confirmed;
1468        transaction.sent_at = Some("old-time".to_string());
1469        storage.create(transaction).await?;
1470
1471        let result = storage
1472            .record_stellar_try_again_later_retry("test-tx".to_string(), "new-time".to_string())
1473            .await?;
1474
1475        assert_eq!(result.sent_at.as_deref(), Some("old-time"));
1476        assert!(result.metadata.is_none());
1477
1478        Ok(())
1479    }
1480
1481    #[tokio::test]
1482    async fn test_record_stellar_try_again_later_retry_not_found_in_memory() -> Result<()> {
1483        let storage = TransactionRepositoryStorage::new_in_memory();
1484
1485        let result = storage
1486            .record_stellar_try_again_later_retry(
1487                "nonexistent".to_string(),
1488                "2025-03-18T10:00:00Z".to_string(),
1489            )
1490            .await;
1491
1492        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1493
1494        Ok(())
1495    }
1496}