1use std::sync::Arc;
28
29use crate::{
30 constants::{
31 transactions::PENDING_TRANSACTION_STATUSES, EVM_SMALLEST_UNIT_NAME,
32 EVM_STATUS_CHECK_INITIAL_DELAY_SECONDS,
33 },
34 domain::{
35 relayer::{Relayer, RelayerError},
36 BalanceResponse, SignDataRequest, SignDataResponse, SignTransactionExternalResponse,
37 SignTransactionRequest, SignTypedDataRequest,
38 },
39 jobs::{
40 JobProducerTrait, RelayerHealthCheck, TransactionRequest, TransactionSend,
41 TransactionStatusCheck,
42 },
43 models::{
44 produce_relayer_disabled_payload, DeletePendingTransactionsResponse, DisabledReason,
45 EvmNetwork, HealthCheckFailure, JsonRpcRequest, JsonRpcResponse, NetworkRepoModel,
46 NetworkRpcRequest, NetworkRpcResult, NetworkTransactionRequest, NetworkType,
47 PaginationQuery, RelayerRepoModel, RelayerStatus, RepositoryError, RpcErrorCodes,
48 TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
49 },
50 repositories::{NetworkRepository, RelayerRepository, Repository, TransactionRepository},
51 services::{
52 provider::{EvmProvider, EvmProviderTrait},
53 signer::{DataSignerTrait, EvmSigner},
54 TransactionCounterService, TransactionCounterServiceTrait,
55 },
56 utils::calculate_scheduled_timestamp,
57};
58use async_trait::async_trait;
59use eyre::Result;
60use tracing::{debug, error, info, instrument, warn};
61
62use super::{create_error_response, create_success_response, EvmTransactionValidator};
63use crate::utils::{map_provider_error, sanitize_error_description};
64
65#[allow(dead_code)]
66pub struct EvmRelayer<P, RR, NR, TR, J, S, TCS>
67where
68 P: EvmProviderTrait + Send + Sync,
69 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
70 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
71 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
72 J: JobProducerTrait + Send + Sync + 'static,
73 S: DataSignerTrait + Send + Sync + 'static,
74{
75 pub(super) relayer: RelayerRepoModel,
76 pub(super) signer: S,
77 pub(super) network: EvmNetwork,
78 pub(super) provider: P,
79 pub(super) relayer_repository: Arc<RR>,
80 pub(super) network_repository: Arc<NR>,
81 pub(super) transaction_repository: Arc<TR>,
82 pub(super) job_producer: Arc<J>,
83 pub(super) transaction_counter_service: Arc<TCS>,
84}
85
86#[allow(clippy::too_many_arguments)]
87impl<P, RR, NR, TR, J, S, TCS> EvmRelayer<P, RR, NR, TR, J, S, TCS>
88where
89 P: EvmProviderTrait + Send + Sync,
90 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
91 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
92 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
93 J: JobProducerTrait + Send + Sync + 'static,
94 S: DataSignerTrait + Send + Sync + 'static,
95 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
96{
97 pub fn new(
114 relayer: RelayerRepoModel,
115 signer: S,
116 provider: P,
117 network: EvmNetwork,
118 relayer_repository: Arc<RR>,
119 network_repository: Arc<NR>,
120 transaction_repository: Arc<TR>,
121 transaction_counter_service: Arc<TCS>,
122 job_producer: Arc<J>,
123 ) -> Result<Self, RelayerError> {
124 Ok(Self {
125 relayer,
126 signer,
127 network,
128 provider,
129 relayer_repository,
130 network_repository,
131 transaction_repository,
132 transaction_counter_service,
133 job_producer,
134 })
135 }
136
137 #[instrument(
143 level = "debug",
144 skip(self),
145 fields(
146 request_id = ?crate::observability::request_id::get_request_id(),
147 relayer_id = %self.relayer.id,
148 )
149 )]
150 async fn validate_rpc(&self) -> Result<(), RelayerError> {
151 self.provider
152 .health_check()
153 .await
154 .map_err(|e| RelayerError::ProviderError(e.to_string()))?;
155
156 Ok(())
157 }
158
159 #[instrument(
169 level = "debug",
170 skip(self, transaction),
171 fields(
172 request_id = ?crate::observability::request_id::get_request_id(),
173 relayer_id = %self.relayer.id,
174 tx_id = %transaction.id,
175 )
176 )]
177 async fn cancel_transaction_via_job(
178 &self,
179 transaction: TransactionRepoModel,
180 ) -> Result<(), RelayerError> {
181 let cancel_job = TransactionSend::cancel(
182 transaction.id.clone(),
183 transaction.relayer_id.clone(),
184 "Cancelled via delete_pending_transactions".to_string(),
185 );
186
187 self.job_producer
188 .produce_submit_transaction_job(cancel_job, None)
189 .await
190 .map_err(RelayerError::from)?;
191
192 Ok(())
193 }
194}
195
196pub type DefaultEvmRelayer<J, T, RR, NR, TCR> =
198 EvmRelayer<EvmProvider, RR, NR, T, J, EvmSigner, TransactionCounterService<TCR>>;
199
200#[async_trait]
201impl<P, RR, NR, TR, J, S, TCS> Relayer for EvmRelayer<P, RR, NR, TR, J, S, TCS>
202where
203 P: EvmProviderTrait + Send + Sync,
204 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
205 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
206 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
207 J: JobProducerTrait + Send + Sync + 'static,
208 S: DataSignerTrait + Send + Sync + 'static,
209 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
210{
211 #[instrument(
221 level = "debug",
222 skip(self, network_transaction),
223 fields(
224 request_id = ?crate::observability::request_id::get_request_id(),
225 relayer_id = %self.relayer.id,
226 network_type = ?self.relayer.network_type,
227 )
228 )]
229 async fn process_transaction_request(
230 &self,
231 network_transaction: NetworkTransactionRequest,
232 ) -> Result<TransactionRepoModel, RelayerError> {
233 let network_model = self
234 .network_repository
235 .get_by_name(NetworkType::Evm, &self.relayer.network)
236 .await?
237 .ok_or_else(|| {
238 RelayerError::NetworkConfiguration(format!(
239 "Network {} not found",
240 self.relayer.network
241 ))
242 })?;
243 let transaction =
244 TransactionRepoModel::try_from((&network_transaction, &self.relayer, &network_model))?;
245
246 self.transaction_repository
247 .create(transaction.clone())
248 .await
249 .map_err(|e| RepositoryError::TransactionFailure(e.to_string()))?;
250
251 if let Err(e) = self
255 .job_producer
256 .produce_check_transaction_status_job(
257 TransactionStatusCheck::new(
258 transaction.id.clone(),
259 transaction.relayer_id.clone(),
260 crate::models::NetworkType::Evm,
261 ),
262 Some(calculate_scheduled_timestamp(
263 EVM_STATUS_CHECK_INITIAL_DELAY_SECONDS,
264 )),
265 )
266 .await
267 {
268 error!(
270 relayer_id = %self.relayer.id,
271 transaction_id = %transaction.id,
272 error = %e,
273 "Status check queue push failed - marking transaction as failed"
274 );
275 if let Err(update_err) = self
276 .transaction_repository
277 .partial_update(
278 transaction.id.clone(),
279 TransactionUpdateRequest {
280 status: Some(TransactionStatus::Failed),
281 status_reason: Some("Queue unavailable".to_string()),
282 ..Default::default()
283 },
284 )
285 .await
286 {
287 warn!(
288 relayer_id = %self.relayer.id,
289 transaction_id = %transaction.id,
290 error = %update_err,
291 "Failed to mark transaction as failed after queue push failure"
292 );
293 }
294 return Err(e.into());
295 }
296
297 self.job_producer
300 .produce_transaction_request_job(
301 TransactionRequest::new(transaction.id.clone(), transaction.relayer_id.clone()),
302 None,
303 )
304 .await?;
305
306 Ok(transaction)
307 }
308
309 #[instrument(
315 level = "debug",
316 skip(self),
317 fields(
318 request_id = ?crate::observability::request_id::get_request_id(),
319 relayer_id = %self.relayer.id,
320 )
321 )]
322 async fn get_balance(&self) -> Result<BalanceResponse, RelayerError> {
323 let balance: u128 = self
324 .provider
325 .get_balance(&self.relayer.address)
326 .await
327 .map_err(|e| RelayerError::ProviderError(e.to_string()))?
328 .try_into()
329 .map_err(|_| {
330 RelayerError::ProviderError("Failed to convert balance to u128".to_string())
331 })?;
332
333 Ok(BalanceResponse {
334 balance,
335 unit: EVM_SMALLEST_UNIT_NAME.to_string(),
336 })
337 }
338
339 #[instrument(
345 level = "debug",
346 skip(self),
347 fields(
348 request_id = ?crate::observability::request_id::get_request_id(),
349 relayer_id = %self.relayer.id,
350 )
351 )]
352 async fn get_status(&self) -> Result<RelayerStatus, RelayerError> {
353 let relayer_model = &self.relayer;
354
355 let nonce = self
357 .transaction_counter_service
358 .get()
359 .await
360 .ok()
361 .flatten()
362 .unwrap_or(0);
363 let nonce_str = nonce.to_string();
364
365 let balance_response = self.get_balance().await?;
366
367 let pending_transactions_count = self
369 .transaction_repository
370 .count_by_status(&relayer_model.id, PENDING_TRANSACTION_STATUSES)
371 .await
372 .map_err(RelayerError::from)?;
373
374 let last_confirmed_transaction_timestamp = self
376 .transaction_repository
377 .find_by_status_paginated(
378 &relayer_model.id,
379 &[TransactionStatus::Confirmed],
380 PaginationQuery {
381 page: 1,
382 per_page: 1,
383 },
384 false, )
386 .await
387 .map_err(RelayerError::from)?
388 .items
389 .into_iter()
390 .next()
391 .and_then(|tx| tx.confirmed_at);
392
393 Ok(RelayerStatus::Evm {
394 balance: balance_response.balance.to_string(),
395 pending_transactions_count,
396 last_confirmed_transaction_timestamp,
397 system_disabled: relayer_model.system_disabled,
398 paused: relayer_model.paused,
399 nonce: nonce_str,
400 })
401 }
402
403 #[instrument(
410 level = "debug",
411 skip(self),
412 fields(
413 request_id = ?crate::observability::request_id::get_request_id(),
414 relayer_id = %self.relayer.id,
415 )
416 )]
417 async fn delete_pending_transactions(
418 &self,
419 ) -> Result<DeletePendingTransactionsResponse, RelayerError> {
420 let pending_statuses = [
421 TransactionStatus::Pending,
422 TransactionStatus::Sent,
423 TransactionStatus::Submitted,
424 ];
425
426 let pending_transactions = self
428 .transaction_repository
429 .find_by_status(&self.relayer.id, &pending_statuses[..])
430 .await
431 .map_err(RelayerError::from)?;
432
433 let transaction_count = pending_transactions.len();
434
435 if transaction_count == 0 {
436 info!(
437 relayer_id = %self.relayer.id,
438 "no pending transactions found for relayer"
439 );
440 return Ok(DeletePendingTransactionsResponse {
441 queued_for_cancellation_transaction_ids: vec![],
442 failed_to_queue_transaction_ids: vec![],
443 total_processed: 0,
444 });
445 }
446
447 info!(
448 relayer_id = %self.relayer.id,
449 transaction_count = %transaction_count,
450 "processing pending transactions for relayer"
451 );
452
453 let mut cancelled_transaction_ids = Vec::new();
454 let mut failed_transaction_ids = Vec::new();
455
456 for transaction in pending_transactions {
458 match self.cancel_transaction_via_job(transaction.clone()).await {
459 Ok(_) => {
460 cancelled_transaction_ids.push(transaction.id.clone());
461 info!(
462 tx_id = %transaction.id,
463 relayer_id = %self.relayer.id,
464 status = ?transaction.status,
465 "initiated cancellation for transaction"
466 );
467 }
468 Err(e) => {
469 failed_transaction_ids.push(transaction.id.clone());
470 warn!(
471 tx_id = %transaction.id,
472 relayer_id = %self.relayer.id,
473 error = %e,
474 "failed to cancel transaction"
475 );
476 }
477 }
478 }
479
480 let total_processed = cancelled_transaction_ids.len() + failed_transaction_ids.len();
481
482 debug!(
483 queued_for_cancellation = %cancelled_transaction_ids.len(),
484 failed_to_queue = %failed_transaction_ids.len(),
485 "completed processing pending transactions for relayer"
486 );
487
488 Ok(DeletePendingTransactionsResponse {
489 queued_for_cancellation_transaction_ids: cancelled_transaction_ids,
490 failed_to_queue_transaction_ids: failed_transaction_ids,
491 total_processed: total_processed as u32,
492 })
493 }
494
495 #[instrument(
505 level = "debug",
506 skip(self, request),
507 fields(
508 request_id = ?crate::observability::request_id::get_request_id(),
509 relayer_id = %self.relayer.id,
510 )
511 )]
512 async fn sign_data(&self, request: SignDataRequest) -> Result<SignDataResponse, RelayerError> {
513 let result = self.signer.sign_data(request).await?;
514
515 Ok(result)
516 }
517
518 #[instrument(
528 level = "debug",
529 skip(self, request),
530 fields(
531 request_id = ?crate::observability::request_id::get_request_id(),
532 relayer_id = %self.relayer.id,
533 )
534 )]
535 async fn sign_typed_data(
536 &self,
537 request: SignTypedDataRequest,
538 ) -> Result<SignDataResponse, RelayerError> {
539 let result = self.signer.sign_typed_data(request).await?;
540
541 Ok(result)
542 }
543
544 #[instrument(
554 level = "debug",
555 skip(self, request),
556 fields(
557 request_id = ?crate::observability::request_id::get_request_id(),
558 relayer_id = %self.relayer.id,
559 )
560 )]
561 async fn rpc(
562 &self,
563 request: JsonRpcRequest<NetworkRpcRequest>,
564 ) -> Result<JsonRpcResponse<NetworkRpcResult>, RelayerError> {
565 let evm_request = match request.params {
566 NetworkRpcRequest::Evm(evm_req) => evm_req,
567 _ => {
568 return Ok(create_error_response(
569 request.id,
570 RpcErrorCodes::INVALID_PARAMS,
571 "Invalid params",
572 "Expected EVM network request",
573 ))
574 }
575 };
576
577 let (method, params_json) = match evm_request {
579 crate::models::EvmRpcRequest::RawRpcRequest { method, params } => (method, params),
580 };
581
582 match self.provider.raw_request_dyn(&method, params_json).await {
584 Ok(result_value) => Ok(create_success_response(request.id, result_value)),
585 Err(provider_error) => {
586 tracing::error!(
588 error = %provider_error,
589 "RPC provider error occurred"
590 );
591 let (error_code, error_message) = map_provider_error(&provider_error);
592 let sanitized_description = sanitize_error_description(&provider_error);
593 Ok(create_error_response(
594 request.id,
595 error_code,
596 error_message,
597 &sanitized_description,
598 ))
599 }
600 }
601 }
602
603 #[instrument(
609 level = "debug",
610 skip(self),
611 fields(
612 request_id = ?crate::observability::request_id::get_request_id(),
613 relayer_id = %self.relayer.id,
614 )
615 )]
616 async fn validate_min_balance(&self) -> Result<(), RelayerError> {
617 let policy = self.relayer.policies.get_evm_policy();
618 EvmTransactionValidator::init_balance_validation(
619 &self.relayer.address,
620 &policy,
621 &self.provider,
622 )
623 .await
624 .map_err(|e| RelayerError::InsufficientBalanceError(e.to_string()))?;
625
626 Ok(())
627 }
628
629 #[instrument(
635 level = "debug",
636 skip(self),
637 fields(
638 request_id = ?crate::observability::request_id::get_request_id(),
639 relayer_id = %self.relayer.id,
640 )
641 )]
642 async fn check_health(&self) -> Result<(), Vec<HealthCheckFailure>> {
643 debug!("running health checks");
644
645 let nonce_sync_result = self.sync_nonce().await;
646 let validate_rpc_result = self.validate_rpc().await;
647 let validate_min_balance_result = self.validate_min_balance().await;
648
649 let failures: Vec<HealthCheckFailure> = vec![
651 nonce_sync_result
652 .err()
653 .map(|e| HealthCheckFailure::NonceSyncFailed(e.to_string())),
654 validate_rpc_result
655 .err()
656 .map(|e| HealthCheckFailure::RpcValidationFailed(e.to_string())),
657 validate_min_balance_result
658 .err()
659 .map(|e| HealthCheckFailure::BalanceCheckFailed(e.to_string())),
660 ]
661 .into_iter()
662 .flatten()
663 .collect();
664
665 if failures.is_empty() {
666 info!("all health checks passed");
667 Ok(())
668 } else {
669 warn!("health checks failed: {:?}", failures);
670 Err(failures)
671 }
672 }
673
674 #[instrument(
675 level = "debug",
676 skip(self),
677 fields(
678 request_id = ?crate::observability::request_id::get_request_id(),
679 relayer_id = %self.relayer.id,
680 )
681 )]
682 async fn initialize_relayer(&self) -> Result<(), RelayerError> {
683 debug!("initializing EVM relayer");
684
685 match self.check_health().await {
686 Ok(_) => {
687 if self.relayer.system_disabled {
689 self.relayer_repository
691 .enable_relayer(self.relayer.id.clone())
692 .await?;
693 }
694 Ok(())
695 }
696 Err(failures) => {
697 let reason = DisabledReason::from_health_failures(failures).unwrap_or_else(|| {
699 DisabledReason::RpcValidationFailed("Unknown error".to_string())
700 });
701
702 warn!(reason = %reason, "disabling relayer");
703 let updated_relayer = self
704 .relayer_repository
705 .disable_relayer(self.relayer.id.clone(), reason.clone())
706 .await?;
707
708 if let Some(notification_id) = &self.relayer.notification_id {
710 self.job_producer
711 .produce_send_notification_job(
712 produce_relayer_disabled_payload(
713 notification_id,
714 &updated_relayer,
715 &reason.safe_description(),
716 ),
717 None,
718 )
719 .await?;
720 }
721
722 self.job_producer
724 .produce_relayer_health_check_job(
725 RelayerHealthCheck::new(self.relayer.id.clone()),
726 Some(calculate_scheduled_timestamp(10)),
727 )
728 .await?;
729
730 Ok(())
731 }
732 }
733 }
734
735 #[instrument(
736 level = "debug",
737 skip(self, _request),
738 fields(
739 request_id = ?crate::observability::request_id::get_request_id(),
740 relayer_id = %self.relayer.id,
741 )
742 )]
743 async fn sign_transaction(
744 &self,
745 _request: &SignTransactionRequest,
746 ) -> Result<SignTransactionExternalResponse, RelayerError> {
747 Err(RelayerError::NotSupported(
748 "Transaction signing not supported for EVM".to_string(),
749 ))
750 }
751}
752
753#[cfg(test)]
754mod tests {
755 use super::*;
756 use crate::models::RpcConfig;
757 use crate::{
758 config::{EvmNetworkConfig, NetworkConfigCommon},
759 jobs::MockJobProducerTrait,
760 models::{
761 EvmRpcRequest, EvmRpcResult, JsonRpcId, NetworkRepoModel, NetworkType,
762 RelayerEvmPolicy, RelayerNetworkPolicy, RepositoryError, SignerError,
763 TransactionStatus, U256,
764 },
765 repositories::{MockNetworkRepository, MockRelayerRepository, MockTransactionRepository},
766 services::{
767 provider::{MockEvmProviderTrait, ProviderError},
768 MockTransactionCounterServiceTrait,
769 },
770 };
771 use mockall::predicate::*;
772 use std::future::ready;
773
774 mockall::mock! {
775 pub DataSigner {}
776
777 #[async_trait]
778 impl DataSignerTrait for DataSigner {
779 async fn sign_data(&self, request: SignDataRequest) -> Result<SignDataResponse, SignerError>;
780 async fn sign_typed_data(&self, request: SignTypedDataRequest) -> Result<SignDataResponse, SignerError>;
781 }
782 }
783
784 fn create_test_evm_network() -> EvmNetwork {
785 EvmNetwork {
786 network: "mainnet".to_string(),
787 rpc_urls: vec![RpcConfig::new(
788 "https://mainnet.infura.io/v3/YOUR_INFURA_API_KEY".to_string(),
789 )],
790 explorer_urls: None,
791 average_blocktime_ms: 12000,
792 is_testnet: false,
793 tags: vec!["mainnet".to_string()],
794 chain_id: 1,
795 required_confirmations: 1,
796 features: vec!["eip1559".to_string()],
797 symbol: "ETH".to_string(),
798 gas_price_cache: None,
799 }
800 }
801
802 fn create_test_network_repo_model() -> NetworkRepoModel {
803 let config = EvmNetworkConfig {
804 common: NetworkConfigCommon {
805 network: "mainnet".to_string(),
806 from: None,
807 rpc_urls: Some(vec![crate::models::RpcConfig::new(
808 "https://mainnet.infura.io/v3/YOUR_INFURA_API_KEY".to_string(),
809 )]),
810 explorer_urls: None,
811 average_blocktime_ms: Some(12000),
812 is_testnet: Some(false),
813 tags: Some(vec!["mainnet".to_string()]),
814 },
815 chain_id: Some(1),
816 required_confirmations: Some(1),
817 features: Some(vec!["eip1559".to_string()]),
818 symbol: Some("ETH".to_string()),
819 gas_price_cache: None,
820 };
821
822 NetworkRepoModel::new_evm(config)
823 }
824
825 fn create_test_relayer() -> RelayerRepoModel {
826 RelayerRepoModel {
827 id: "test-relayer-id".to_string(),
828 name: "Test Relayer".to_string(),
829 network: "mainnet".to_string(), address: "0xSender".to_string(),
831 paused: false,
832 system_disabled: false,
833 signer_id: "test-signer-id".to_string(),
834 notification_id: Some("test-notification-id".to_string()),
835 policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
836 min_balance: Some(100000000000000000u128), whitelist_receivers: Some(vec!["0xRecipient".to_string()]),
838 gas_price_cap: Some(100000000000), eip1559_pricing: Some(true),
840 private_transactions: Some(false),
841 gas_limit_estimation: Some(true),
842 }),
843 network_type: NetworkType::Evm,
844 custom_rpc_urls: None,
845 ..Default::default()
846 }
847 }
848
849 fn setup_mocks() -> (
850 MockEvmProviderTrait,
851 MockRelayerRepository,
852 MockNetworkRepository,
853 MockTransactionRepository,
854 MockJobProducerTrait,
855 MockDataSigner,
856 MockTransactionCounterServiceTrait,
857 ) {
858 (
859 MockEvmProviderTrait::new(),
860 MockRelayerRepository::new(),
861 MockNetworkRepository::new(),
862 MockTransactionRepository::new(),
863 MockJobProducerTrait::new(),
864 MockDataSigner::new(),
865 MockTransactionCounterServiceTrait::new(),
866 )
867 }
868
869 #[tokio::test]
870 async fn test_get_balance() {
871 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
872 setup_mocks();
873 let relayer_model = create_test_relayer();
874
875 provider
876 .expect_get_balance()
877 .with(eq("0xSender"))
878 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64))))); let relayer = EvmRelayer::new(
881 relayer_model,
882 signer,
883 provider,
884 create_test_evm_network(),
885 Arc::new(relayer_repo),
886 Arc::new(network_repo),
887 Arc::new(tx_repo),
888 Arc::new(counter),
889 Arc::new(job_producer),
890 )
891 .unwrap();
892
893 let balance = relayer.get_balance().await.unwrap();
894 assert_eq!(balance.balance, 1000000000000000000u128);
895 assert_eq!(balance.unit, EVM_SMALLEST_UNIT_NAME);
896 }
897
898 #[tokio::test]
899 async fn test_process_transaction_request() {
900 let (
901 provider,
902 relayer_repo,
903 mut network_repo,
904 mut tx_repo,
905 mut job_producer,
906 signer,
907 counter,
908 ) = setup_mocks();
909 let relayer_model = create_test_relayer();
910
911 let network_tx = NetworkTransactionRequest::Evm(crate::models::EvmTransactionRequest {
912 to: Some("0xRecipient".to_string()),
913 value: U256::from(1000000000000000000u64),
914 data: Some("0xData".to_string()),
915 gas_limit: Some(21000),
916 gas_price: Some(20000000000),
917 max_fee_per_gas: None,
918 max_priority_fee_per_gas: None,
919 speed: None,
920 valid_until: None,
921 });
922
923 network_repo
924 .expect_get_by_name()
925 .with(eq(NetworkType::Evm), eq("mainnet"))
926 .returning(|_, _| Ok(Some(create_test_network_repo_model())));
927
928 tx_repo.expect_create().returning(Ok);
929 job_producer
930 .expect_produce_transaction_request_job()
931 .returning(|_, _| Box::pin(ready(Ok(()))));
932 job_producer
933 .expect_produce_check_transaction_status_job()
934 .returning(|_, _| Box::pin(ready(Ok(()))));
935
936 let relayer = EvmRelayer::new(
937 relayer_model,
938 signer,
939 provider,
940 create_test_evm_network(),
941 Arc::new(relayer_repo),
942 Arc::new(network_repo),
943 Arc::new(tx_repo),
944 Arc::new(counter),
945 Arc::new(job_producer),
946 )
947 .unwrap();
948
949 let result = relayer.process_transaction_request(network_tx).await;
950 assert!(result.is_ok());
951 }
952
953 #[tokio::test]
954 async fn test_process_transaction_request_status_check_failure_returns_error() {
955 let (
956 provider,
957 relayer_repo,
958 mut network_repo,
959 mut tx_repo,
960 mut job_producer,
961 signer,
962 counter,
963 ) = setup_mocks();
964 let relayer_model = create_test_relayer();
965
966 let network_tx = NetworkTransactionRequest::Evm(crate::models::EvmTransactionRequest {
967 to: Some("0xRecipient".to_string()),
968 value: U256::from(1000000000000000000u64),
969 data: Some("0xData".to_string()),
970 gas_limit: Some(21000),
971 gas_price: Some(20000000000),
972 max_fee_per_gas: None,
973 max_priority_fee_per_gas: None,
974 speed: None,
975 valid_until: None,
976 });
977
978 network_repo
979 .expect_get_by_name()
980 .with(eq(NetworkType::Evm), eq("mainnet"))
981 .returning(|_, _| Ok(Some(create_test_network_repo_model())));
982
983 tx_repo.expect_create().returning(Ok);
984 tx_repo
986 .expect_partial_update()
987 .returning(|_, _| Ok(TransactionRepoModel::default()));
988
989 job_producer
991 .expect_produce_check_transaction_status_job()
992 .returning(|_, _| {
993 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
994 "Failed to queue job".to_string(),
995 ))))
996 });
997
998 let relayer = EvmRelayer::new(
1002 relayer_model,
1003 signer,
1004 provider,
1005 create_test_evm_network(),
1006 Arc::new(relayer_repo),
1007 Arc::new(network_repo),
1008 Arc::new(tx_repo),
1009 Arc::new(counter),
1010 Arc::new(job_producer),
1011 )
1012 .unwrap();
1013
1014 let result = relayer.process_transaction_request(network_tx).await;
1015 assert!(result.is_err());
1016 }
1017
1018 #[tokio::test]
1019 async fn test_process_transaction_request_status_check_failure_marks_tx_failed() {
1020 let (
1021 provider,
1022 relayer_repo,
1023 mut network_repo,
1024 mut tx_repo,
1025 mut job_producer,
1026 signer,
1027 counter,
1028 ) = setup_mocks();
1029 let relayer_model = create_test_relayer();
1030
1031 let network_tx = NetworkTransactionRequest::Evm(crate::models::EvmTransactionRequest {
1032 to: Some("0xRecipient".to_string()),
1033 value: U256::from(1000000000000000000u64),
1034 data: Some("0xData".to_string()),
1035 gas_limit: Some(21000),
1036 gas_price: Some(20000000000),
1037 max_fee_per_gas: None,
1038 max_priority_fee_per_gas: None,
1039 speed: None,
1040 valid_until: None,
1041 });
1042
1043 network_repo
1044 .expect_get_by_name()
1045 .with(eq(NetworkType::Evm), eq("mainnet"))
1046 .returning(|_, _| Ok(Some(create_test_network_repo_model())));
1047
1048 tx_repo.expect_create().returning(Ok);
1049
1050 tx_repo
1052 .expect_partial_update()
1053 .withf(|_tx_id, update| {
1054 update.status == Some(TransactionStatus::Failed)
1055 && update.status_reason == Some("Queue unavailable".to_string())
1056 })
1057 .returning(|_, _| Ok(TransactionRepoModel::default()));
1058
1059 job_producer
1060 .expect_produce_check_transaction_status_job()
1061 .returning(|_, _| {
1062 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1063 "Redis timeout".to_string(),
1064 ))))
1065 });
1066
1067 let relayer = EvmRelayer::new(
1068 relayer_model,
1069 signer,
1070 provider,
1071 create_test_evm_network(),
1072 Arc::new(relayer_repo),
1073 Arc::new(network_repo),
1074 Arc::new(tx_repo),
1075 Arc::new(counter),
1076 Arc::new(job_producer),
1077 )
1078 .unwrap();
1079
1080 let result = relayer.process_transaction_request(network_tx).await;
1081 assert!(result.is_err());
1082 }
1084
1085 #[tokio::test]
1086 async fn test_validate_min_balance_sufficient() {
1087 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1088 setup_mocks();
1089 let relayer_model = create_test_relayer();
1090
1091 provider
1092 .expect_get_balance()
1093 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); let relayer = EvmRelayer::new(
1096 relayer_model,
1097 signer,
1098 provider,
1099 create_test_evm_network(),
1100 Arc::new(relayer_repo),
1101 Arc::new(network_repo),
1102 Arc::new(tx_repo),
1103 Arc::new(counter),
1104 Arc::new(job_producer),
1105 )
1106 .unwrap();
1107
1108 let result = relayer.validate_min_balance().await;
1109 assert!(result.is_ok());
1110 }
1111
1112 #[tokio::test]
1113 async fn test_validate_min_balance_insufficient() {
1114 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1115 setup_mocks();
1116 let relayer_model = create_test_relayer();
1117
1118 provider
1119 .expect_get_balance()
1120 .returning(|_| Box::pin(ready(Ok(U256::from(50000000000000000u64))))); let relayer = EvmRelayer::new(
1123 relayer_model,
1124 signer,
1125 provider,
1126 create_test_evm_network(),
1127 Arc::new(relayer_repo),
1128 Arc::new(network_repo),
1129 Arc::new(tx_repo),
1130 Arc::new(counter),
1131 Arc::new(job_producer),
1132 )
1133 .unwrap();
1134
1135 let result = relayer.validate_min_balance().await;
1136 assert!(matches!(
1137 result,
1138 Err(RelayerError::InsufficientBalanceError(_))
1139 ));
1140 }
1141
1142 #[tokio::test]
1143 async fn test_sync_nonce() {
1144 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
1145 setup_mocks();
1146 let relayer_model = create_test_relayer();
1147
1148 provider
1149 .expect_get_transaction_count()
1150 .returning(|_| Box::pin(ready(Ok(42u64))));
1151
1152 counter
1153 .expect_set()
1154 .returning(|_nonce| Box::pin(ready(Ok(()))));
1155
1156 counter
1157 .expect_get()
1158 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
1159
1160 let relayer = EvmRelayer::new(
1161 relayer_model,
1162 signer,
1163 provider,
1164 create_test_evm_network(),
1165 Arc::new(relayer_repo),
1166 Arc::new(network_repo),
1167 Arc::new(tx_repo),
1168 Arc::new(counter),
1169 Arc::new(job_producer),
1170 )
1171 .unwrap();
1172
1173 let result = relayer.sync_nonce().await;
1174 assert!(result.is_ok());
1175 }
1176
1177 #[tokio::test]
1178 async fn test_sync_nonce_lower_on_chain_nonce() {
1179 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
1180 setup_mocks();
1181 let relayer_model = create_test_relayer();
1182
1183 provider
1184 .expect_get_transaction_count()
1185 .returning(|_| Box::pin(ready(Ok(40u64))));
1186
1187 counter
1188 .expect_set()
1189 .with(eq(42u64))
1190 .returning(|_nonce| Box::pin(ready(Ok(()))));
1191
1192 counter
1193 .expect_get()
1194 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
1195
1196 let relayer = EvmRelayer::new(
1197 relayer_model,
1198 signer,
1199 provider,
1200 create_test_evm_network(),
1201 Arc::new(relayer_repo),
1202 Arc::new(network_repo),
1203 Arc::new(tx_repo),
1204 Arc::new(counter),
1205 Arc::new(job_producer),
1206 )
1207 .unwrap();
1208
1209 let result = relayer.sync_nonce().await;
1210 assert!(result.is_ok());
1211 }
1212
1213 #[tokio::test]
1214 async fn test_sync_nonce_lower_transaction_counter_nonce() {
1215 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
1216 setup_mocks();
1217 let relayer_model = create_test_relayer();
1218
1219 provider
1220 .expect_get_transaction_count()
1221 .returning(|_| Box::pin(ready(Ok(42u64))));
1222
1223 counter
1224 .expect_set()
1225 .with(eq(42u64))
1226 .returning(|_nonce| Box::pin(ready(Ok(()))));
1227
1228 counter
1229 .expect_get()
1230 .returning(|| Box::pin(ready(Ok(Some(40u64)))));
1231
1232 let relayer = EvmRelayer::new(
1233 relayer_model,
1234 signer,
1235 provider,
1236 create_test_evm_network(),
1237 Arc::new(relayer_repo),
1238 Arc::new(network_repo),
1239 Arc::new(tx_repo),
1240 Arc::new(counter),
1241 Arc::new(job_producer),
1242 )
1243 .unwrap();
1244
1245 let result = relayer.sync_nonce().await;
1246 assert!(result.is_ok());
1247 }
1248
1249 #[tokio::test]
1250 async fn test_validate_rpc() {
1251 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1252 setup_mocks();
1253 let relayer_model = create_test_relayer();
1254
1255 provider
1256 .expect_health_check()
1257 .returning(|| Box::pin(ready(Ok(true))));
1258
1259 let relayer = EvmRelayer::new(
1260 relayer_model,
1261 signer,
1262 provider,
1263 create_test_evm_network(),
1264 Arc::new(relayer_repo),
1265 Arc::new(network_repo),
1266 Arc::new(tx_repo),
1267 Arc::new(counter),
1268 Arc::new(job_producer),
1269 )
1270 .unwrap();
1271
1272 let result = relayer.validate_rpc().await;
1273 assert!(result.is_ok());
1274 }
1275
1276 #[tokio::test]
1277 async fn test_get_status_success() {
1278 let (
1279 mut provider,
1280 relayer_repo,
1281 network_repo,
1282 mut tx_repo,
1283 job_producer,
1284 signer,
1285 mut counter,
1286 ) = setup_mocks();
1287 let relayer_model = create_test_relayer();
1288
1289 counter
1291 .expect_get()
1292 .returning(|| Box::pin(ready(Ok(Some(10u64)))))
1293 .once();
1294 provider
1295 .expect_get_balance()
1296 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))))
1297 .once();
1298
1299 tx_repo
1301 .expect_count_by_status()
1302 .withf(|relayer_id, statuses| {
1303 relayer_id == "test-relayer-id"
1304 && statuses
1305 == [
1306 TransactionStatus::Pending,
1307 TransactionStatus::Sent,
1308 TransactionStatus::Submitted,
1309 ]
1310 })
1311 .returning(|_, _| Ok(0u64))
1312 .once();
1313
1314 let latest_confirmed_tx = TransactionRepoModel {
1316 id: "tx1".to_string(),
1317 relayer_id: relayer_model.id.clone(),
1318 status: TransactionStatus::Confirmed,
1319 confirmed_at: Some("2023-01-01T12:00:00Z".to_string()),
1320 ..TransactionRepoModel::default()
1321 };
1322 let relayer_id_clone = relayer_model.id.clone();
1323 tx_repo
1324 .expect_find_by_status_paginated()
1325 .withf(move |relayer_id, statuses, query, oldest_first| {
1326 *relayer_id == relayer_id_clone
1327 && statuses == [TransactionStatus::Confirmed]
1328 && query.page == 1
1329 && query.per_page == 1
1330 && !(*oldest_first)
1331 })
1332 .returning(move |_, _, _, _| {
1333 Ok(crate::repositories::PaginatedResult {
1334 items: vec![latest_confirmed_tx.clone()],
1335 total: 1,
1336 page: 1,
1337 per_page: 1,
1338 })
1339 })
1340 .once();
1341
1342 let relayer = EvmRelayer::new(
1343 relayer_model.clone(),
1344 signer,
1345 provider,
1346 create_test_evm_network(),
1347 Arc::new(relayer_repo),
1348 Arc::new(network_repo),
1349 Arc::new(tx_repo),
1350 Arc::new(counter),
1351 Arc::new(job_producer),
1352 )
1353 .unwrap();
1354
1355 let status = relayer.get_status().await.unwrap();
1356
1357 match status {
1358 RelayerStatus::Evm {
1359 balance,
1360 pending_transactions_count,
1361 last_confirmed_transaction_timestamp,
1362 system_disabled,
1363 paused,
1364 nonce,
1365 } => {
1366 assert_eq!(balance, "1000000000000000000");
1367 assert_eq!(pending_transactions_count, 0);
1368 assert_eq!(
1369 last_confirmed_transaction_timestamp,
1370 Some("2023-01-01T12:00:00Z".to_string())
1371 );
1372 assert_eq!(system_disabled, relayer_model.system_disabled);
1373 assert_eq!(paused, relayer_model.paused);
1374 assert_eq!(nonce, "10");
1375 }
1376 _ => panic!("Expected EVM RelayerStatus"),
1377 }
1378 }
1379
1380 #[tokio::test]
1381 async fn test_get_status_provider_nonce_error() {
1382 let (
1383 mut provider,
1384 relayer_repo,
1385 network_repo,
1386 mut tx_repo,
1387 job_producer,
1388 signer,
1389 mut counter,
1390 ) = setup_mocks();
1391 let relayer_model = create_test_relayer();
1392
1393 counter
1395 .expect_get()
1396 .returning(|| Box::pin(ready(Ok(None))))
1397 .once();
1398 provider
1399 .expect_get_balance()
1400 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))))
1401 .once();
1402
1403 tx_repo
1405 .expect_count_by_status()
1406 .returning(|_, _| Ok(0u64))
1407 .once();
1408
1409 tx_repo
1411 .expect_find_by_status_paginated()
1412 .withf(|_relayer_id, statuses, query, oldest_first| {
1413 statuses == [TransactionStatus::Confirmed]
1414 && query.page == 1
1415 && query.per_page == 1
1416 && !(*oldest_first)
1417 })
1418 .returning(|_, _, _, _| {
1419 Ok(crate::repositories::PaginatedResult {
1420 items: vec![],
1421 total: 0,
1422 page: 1,
1423 per_page: 1,
1424 })
1425 })
1426 .once();
1427
1428 let relayer = EvmRelayer::new(
1429 relayer_model.clone(),
1430 signer,
1431 provider,
1432 create_test_evm_network(),
1433 Arc::new(relayer_repo),
1434 Arc::new(network_repo),
1435 Arc::new(tx_repo),
1436 Arc::new(counter),
1437 Arc::new(job_producer),
1438 )
1439 .unwrap();
1440
1441 let status = relayer.get_status().await.unwrap();
1443 match status {
1444 RelayerStatus::Evm { nonce, .. } => {
1445 assert_eq!(nonce, "0");
1446 }
1447 _ => panic!("Expected Evm status"),
1448 }
1449 }
1450
1451 #[tokio::test]
1452 async fn test_get_status_repository_pending_error() {
1453 let (
1454 mut provider,
1455 relayer_repo,
1456 network_repo,
1457 mut tx_repo,
1458 job_producer,
1459 signer,
1460 mut counter,
1461 ) = setup_mocks();
1462 let relayer_model = create_test_relayer();
1463
1464 counter
1466 .expect_get()
1467 .returning(|| Box::pin(ready(Ok(Some(10u64)))))
1468 .once();
1469 provider
1470 .expect_get_balance()
1471 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))));
1472
1473 tx_repo
1474 .expect_count_by_status()
1475 .withf(|relayer_id, statuses| {
1476 relayer_id == "test-relayer-id"
1477 && statuses
1478 == [
1479 TransactionStatus::Pending,
1480 TransactionStatus::Sent,
1481 TransactionStatus::Submitted,
1482 ]
1483 })
1484 .returning(|_, _| Err(RepositoryError::Unknown("DB down".to_string())))
1485 .once();
1486
1487 let relayer = EvmRelayer::new(
1488 relayer_model.clone(),
1489 signer,
1490 provider,
1491 create_test_evm_network(),
1492 Arc::new(relayer_repo),
1493 Arc::new(network_repo),
1494 Arc::new(tx_repo),
1495 Arc::new(counter),
1496 Arc::new(job_producer),
1497 )
1498 .unwrap();
1499
1500 let result = relayer.get_status().await;
1501 assert!(result.is_err());
1502 match result.err().unwrap() {
1503 RelayerError::NetworkConfiguration(msg) => assert!(msg.contains("DB down")),
1505 _ => panic!("Expected NetworkConfiguration error for repo failure"),
1506 }
1507 }
1508
1509 #[tokio::test]
1510 async fn test_get_status_no_confirmed_transactions() {
1511 let (
1512 mut provider,
1513 relayer_repo,
1514 network_repo,
1515 mut tx_repo,
1516 job_producer,
1517 signer,
1518 mut counter,
1519 ) = setup_mocks();
1520 let relayer_model = create_test_relayer();
1521
1522 counter
1524 .expect_get()
1525 .returning(|| Box::pin(ready(Ok(Some(10u64)))));
1526 provider
1527 .expect_get_balance()
1528 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))));
1529 provider
1530 .expect_health_check()
1531 .returning(|| Box::pin(ready(Ok(true))));
1532
1533 tx_repo
1535 .expect_count_by_status()
1536 .withf(|relayer_id, statuses| {
1537 relayer_id == "test-relayer-id"
1538 && statuses
1539 == [
1540 TransactionStatus::Pending,
1541 TransactionStatus::Sent,
1542 TransactionStatus::Submitted,
1543 ]
1544 })
1545 .returning(|_, _| Ok(0u64))
1546 .once();
1547
1548 let relayer_id_clone = relayer_model.id.clone();
1550 tx_repo
1551 .expect_find_by_status_paginated()
1552 .withf(move |relayer_id, statuses, query, oldest_first| {
1553 *relayer_id == relayer_id_clone
1554 && statuses == [TransactionStatus::Confirmed]
1555 && query.page == 1
1556 && query.per_page == 1
1557 && !(*oldest_first)
1558 })
1559 .returning(|_, _, _, _| {
1560 Ok(crate::repositories::PaginatedResult {
1561 items: vec![],
1562 total: 0,
1563 page: 1,
1564 per_page: 1,
1565 })
1566 })
1567 .once();
1568
1569 let relayer = EvmRelayer::new(
1570 relayer_model.clone(),
1571 signer,
1572 provider,
1573 create_test_evm_network(),
1574 Arc::new(relayer_repo),
1575 Arc::new(network_repo),
1576 Arc::new(tx_repo),
1577 Arc::new(counter),
1578 Arc::new(job_producer),
1579 )
1580 .unwrap();
1581
1582 let status = relayer.get_status().await.unwrap();
1583 match status {
1584 RelayerStatus::Evm {
1585 balance,
1586 pending_transactions_count,
1587 last_confirmed_transaction_timestamp,
1588 system_disabled,
1589 paused,
1590 nonce,
1591 } => {
1592 assert_eq!(balance, "1000000000000000000");
1593 assert_eq!(pending_transactions_count, 0);
1594 assert_eq!(last_confirmed_transaction_timestamp, None);
1595 assert_eq!(system_disabled, relayer_model.system_disabled);
1596 assert_eq!(paused, relayer_model.paused);
1597 assert_eq!(nonce, "10");
1598 }
1599 _ => panic!("Expected EVM RelayerStatus"),
1600 }
1601 }
1602
1603 #[tokio::test]
1604 async fn test_cancel_transaction_via_job_success() {
1605 let (provider, relayer_repo, network_repo, tx_repo, mut job_producer, signer, counter) =
1606 setup_mocks();
1607 let relayer_model = create_test_relayer();
1608
1609 let test_transaction = TransactionRepoModel {
1610 id: "test-tx-id".to_string(),
1611 relayer_id: relayer_model.id.clone(),
1612 status: TransactionStatus::Pending,
1613 ..TransactionRepoModel::default()
1614 };
1615
1616 job_producer
1617 .expect_produce_submit_transaction_job()
1618 .withf(|job, delay| {
1619 matches!(job.command, crate::jobs::TransactionCommand::Cancel { ref reason }
1620 if job.transaction_id == "test-tx-id"
1621 && job.relayer_id == "test-relayer-id"
1622 && reason == "Cancelled via delete_pending_transactions")
1623 && delay.is_none()
1624 })
1625 .returning(|_, _| Box::pin(ready(Ok(()))))
1626 .once();
1627
1628 let relayer = EvmRelayer::new(
1629 relayer_model,
1630 signer,
1631 provider,
1632 create_test_evm_network(),
1633 Arc::new(relayer_repo),
1634 Arc::new(network_repo),
1635 Arc::new(tx_repo),
1636 Arc::new(counter),
1637 Arc::new(job_producer),
1638 )
1639 .unwrap();
1640
1641 let result = relayer.cancel_transaction_via_job(test_transaction).await;
1642 assert!(result.is_ok());
1643 }
1644
1645 #[tokio::test]
1646 async fn test_cancel_transaction_via_job_failure() {
1647 let (provider, relayer_repo, network_repo, tx_repo, mut job_producer, signer, counter) =
1648 setup_mocks();
1649 let relayer_model = create_test_relayer();
1650
1651 let test_transaction = TransactionRepoModel {
1652 id: "test-tx-id".to_string(),
1653 relayer_id: relayer_model.id.clone(),
1654 status: TransactionStatus::Pending,
1655 ..TransactionRepoModel::default()
1656 };
1657
1658 job_producer
1659 .expect_produce_submit_transaction_job()
1660 .returning(|_, _| {
1661 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1662 "Queue is full".to_string(),
1663 ))))
1664 })
1665 .once();
1666
1667 let relayer = EvmRelayer::new(
1668 relayer_model,
1669 signer,
1670 provider,
1671 create_test_evm_network(),
1672 Arc::new(relayer_repo),
1673 Arc::new(network_repo),
1674 Arc::new(tx_repo),
1675 Arc::new(counter),
1676 Arc::new(job_producer),
1677 )
1678 .unwrap();
1679
1680 let result = relayer.cancel_transaction_via_job(test_transaction).await;
1681 assert!(result.is_err());
1682 match result.err().unwrap() {
1683 RelayerError::QueueError(_) => (),
1684 _ => panic!("Expected QueueError"),
1685 }
1686 }
1687
1688 #[tokio::test]
1689 async fn test_delete_pending_transactions_no_pending() {
1690 let (provider, relayer_repo, network_repo, mut tx_repo, job_producer, signer, counter) =
1691 setup_mocks();
1692 let relayer_model = create_test_relayer();
1693
1694 tx_repo
1695 .expect_find_by_status()
1696 .withf(|relayer_id, statuses| {
1697 relayer_id == "test-relayer-id"
1698 && statuses
1699 == [
1700 TransactionStatus::Pending,
1701 TransactionStatus::Sent,
1702 TransactionStatus::Submitted,
1703 ]
1704 })
1705 .returning(|_, _| Ok(vec![]))
1706 .once();
1707
1708 let relayer = EvmRelayer::new(
1709 relayer_model,
1710 signer,
1711 provider,
1712 create_test_evm_network(),
1713 Arc::new(relayer_repo),
1714 Arc::new(network_repo),
1715 Arc::new(tx_repo),
1716 Arc::new(counter),
1717 Arc::new(job_producer),
1718 )
1719 .unwrap();
1720
1721 let result = relayer.delete_pending_transactions().await.unwrap();
1722 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 0);
1723 assert_eq!(result.failed_to_queue_transaction_ids.len(), 0);
1724 assert_eq!(result.total_processed, 0);
1725 }
1726
1727 #[tokio::test]
1728 async fn test_delete_pending_transactions_all_successful() {
1729 let (provider, relayer_repo, network_repo, mut tx_repo, mut job_producer, signer, counter) =
1730 setup_mocks();
1731 let relayer_model = create_test_relayer();
1732
1733 let pending_transactions = vec![
1734 TransactionRepoModel {
1735 id: "tx1".to_string(),
1736 relayer_id: relayer_model.id.clone(),
1737 status: TransactionStatus::Pending,
1738 ..TransactionRepoModel::default()
1739 },
1740 TransactionRepoModel {
1741 id: "tx2".to_string(),
1742 relayer_id: relayer_model.id.clone(),
1743 status: TransactionStatus::Sent,
1744 ..TransactionRepoModel::default()
1745 },
1746 TransactionRepoModel {
1747 id: "tx3".to_string(),
1748 relayer_id: relayer_model.id.clone(),
1749 status: TransactionStatus::Submitted,
1750 ..TransactionRepoModel::default()
1751 },
1752 ];
1753
1754 tx_repo
1755 .expect_find_by_status()
1756 .withf(|relayer_id, statuses| {
1757 relayer_id == "test-relayer-id"
1758 && statuses
1759 == [
1760 TransactionStatus::Pending,
1761 TransactionStatus::Sent,
1762 TransactionStatus::Submitted,
1763 ]
1764 })
1765 .returning(move |_, _| Ok(pending_transactions.clone()))
1766 .once();
1767
1768 job_producer
1769 .expect_produce_submit_transaction_job()
1770 .returning(|_, _| Box::pin(ready(Ok(()))))
1771 .times(3);
1772
1773 let relayer = EvmRelayer::new(
1774 relayer_model,
1775 signer,
1776 provider,
1777 create_test_evm_network(),
1778 Arc::new(relayer_repo),
1779 Arc::new(network_repo),
1780 Arc::new(tx_repo),
1781 Arc::new(counter),
1782 Arc::new(job_producer),
1783 )
1784 .unwrap();
1785
1786 let result = relayer.delete_pending_transactions().await.unwrap();
1787 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 3);
1788 assert_eq!(result.failed_to_queue_transaction_ids.len(), 0);
1789 assert_eq!(result.total_processed, 3);
1790
1791 let expected_ids = vec!["tx1", "tx2", "tx3"];
1792 for id in expected_ids {
1793 assert!(result
1794 .queued_for_cancellation_transaction_ids
1795 .contains(&id.to_string()));
1796 }
1797 }
1798
1799 #[tokio::test]
1800 async fn test_delete_pending_transactions_partial_failures() {
1801 let (provider, relayer_repo, network_repo, mut tx_repo, mut job_producer, signer, counter) =
1802 setup_mocks();
1803 let relayer_model = create_test_relayer();
1804
1805 let pending_transactions = vec![
1806 TransactionRepoModel {
1807 id: "tx1".to_string(),
1808 relayer_id: relayer_model.id.clone(),
1809 status: TransactionStatus::Pending,
1810 ..TransactionRepoModel::default()
1811 },
1812 TransactionRepoModel {
1813 id: "tx2".to_string(),
1814 relayer_id: relayer_model.id.clone(),
1815 status: TransactionStatus::Sent,
1816 ..TransactionRepoModel::default()
1817 },
1818 TransactionRepoModel {
1819 id: "tx3".to_string(),
1820 relayer_id: relayer_model.id.clone(),
1821 status: TransactionStatus::Submitted,
1822 ..TransactionRepoModel::default()
1823 },
1824 ];
1825
1826 tx_repo
1827 .expect_find_by_status()
1828 .withf(|relayer_id, statuses| {
1829 relayer_id == "test-relayer-id"
1830 && statuses
1831 == [
1832 TransactionStatus::Pending,
1833 TransactionStatus::Sent,
1834 TransactionStatus::Submitted,
1835 ]
1836 })
1837 .returning(move |_, _| Ok(pending_transactions.clone()))
1838 .once();
1839
1840 job_producer
1842 .expect_produce_submit_transaction_job()
1843 .returning(|_, _| Box::pin(ready(Ok(()))))
1844 .times(1);
1845 job_producer
1846 .expect_produce_submit_transaction_job()
1847 .returning(|_, _| {
1848 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1849 "Queue is full".to_string(),
1850 ))))
1851 })
1852 .times(1);
1853 job_producer
1854 .expect_produce_submit_transaction_job()
1855 .returning(|_, _| Box::pin(ready(Ok(()))))
1856 .times(1);
1857
1858 let relayer = EvmRelayer::new(
1859 relayer_model,
1860 signer,
1861 provider,
1862 create_test_evm_network(),
1863 Arc::new(relayer_repo),
1864 Arc::new(network_repo),
1865 Arc::new(tx_repo),
1866 Arc::new(counter),
1867 Arc::new(job_producer),
1868 )
1869 .unwrap();
1870
1871 let result = relayer.delete_pending_transactions().await.unwrap();
1872 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 2);
1873 assert_eq!(result.failed_to_queue_transaction_ids.len(), 1);
1874 assert_eq!(result.total_processed, 3);
1875 }
1876
1877 #[tokio::test]
1878 async fn test_delete_pending_transactions_repository_error() {
1879 let (provider, relayer_repo, network_repo, mut tx_repo, job_producer, signer, counter) =
1880 setup_mocks();
1881 let relayer_model = create_test_relayer();
1882
1883 tx_repo
1884 .expect_find_by_status()
1885 .withf(|relayer_id, statuses| {
1886 relayer_id == "test-relayer-id"
1887 && statuses
1888 == [
1889 TransactionStatus::Pending,
1890 TransactionStatus::Sent,
1891 TransactionStatus::Submitted,
1892 ]
1893 })
1894 .returning(|_, _| {
1895 Err(RepositoryError::Unknown(
1896 "Database connection failed".to_string(),
1897 ))
1898 })
1899 .once();
1900
1901 let relayer = EvmRelayer::new(
1902 relayer_model,
1903 signer,
1904 provider,
1905 create_test_evm_network(),
1906 Arc::new(relayer_repo),
1907 Arc::new(network_repo),
1908 Arc::new(tx_repo),
1909 Arc::new(counter),
1910 Arc::new(job_producer),
1911 )
1912 .unwrap();
1913
1914 let result = relayer.delete_pending_transactions().await;
1915 assert!(result.is_err());
1916 match result.err().unwrap() {
1917 RelayerError::NetworkConfiguration(msg) => {
1918 assert!(msg.contains("Database connection failed"))
1919 }
1920 _ => panic!("Expected NetworkConfiguration error for repository failure"),
1921 }
1922 }
1923
1924 #[tokio::test]
1925 async fn test_delete_pending_transactions_all_failures() {
1926 let (provider, relayer_repo, network_repo, mut tx_repo, mut job_producer, signer, counter) =
1927 setup_mocks();
1928 let relayer_model = create_test_relayer();
1929
1930 let pending_transactions = vec![
1931 TransactionRepoModel {
1932 id: "tx1".to_string(),
1933 relayer_id: relayer_model.id.clone(),
1934 status: TransactionStatus::Pending,
1935 ..TransactionRepoModel::default()
1936 },
1937 TransactionRepoModel {
1938 id: "tx2".to_string(),
1939 relayer_id: relayer_model.id.clone(),
1940 status: TransactionStatus::Sent,
1941 ..TransactionRepoModel::default()
1942 },
1943 ];
1944
1945 tx_repo
1946 .expect_find_by_status()
1947 .withf(|relayer_id, statuses| {
1948 relayer_id == "test-relayer-id"
1949 && statuses
1950 == [
1951 TransactionStatus::Pending,
1952 TransactionStatus::Sent,
1953 TransactionStatus::Submitted,
1954 ]
1955 })
1956 .returning(move |_, _| Ok(pending_transactions.clone()))
1957 .once();
1958
1959 job_producer
1960 .expect_produce_submit_transaction_job()
1961 .returning(|_, _| {
1962 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1963 "Queue is full".to_string(),
1964 ))))
1965 })
1966 .times(2);
1967
1968 let relayer = EvmRelayer::new(
1969 relayer_model,
1970 signer,
1971 provider,
1972 create_test_evm_network(),
1973 Arc::new(relayer_repo),
1974 Arc::new(network_repo),
1975 Arc::new(tx_repo),
1976 Arc::new(counter),
1977 Arc::new(job_producer),
1978 )
1979 .unwrap();
1980
1981 let result = relayer.delete_pending_transactions().await.unwrap();
1982 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 0);
1983 assert_eq!(result.failed_to_queue_transaction_ids.len(), 2);
1984 assert_eq!(result.total_processed, 2);
1985
1986 let expected_failed_ids = vec!["tx1", "tx2"];
1987 for id in expected_failed_ids {
1988 assert!(result
1989 .failed_to_queue_transaction_ids
1990 .contains(&id.to_string()));
1991 }
1992 }
1993
1994 #[tokio::test]
1995 async fn test_rpc_eth_get_balance() {
1996 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1997 setup_mocks();
1998 let relayer_model = create_test_relayer();
1999
2000 provider
2001 .expect_raw_request_dyn()
2002 .withf(|method, params| {
2003 method == "eth_getBalance"
2004 && params.as_str()
2005 == Some(r#"["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]"#)
2006 })
2007 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("0xde0b6b3a7640000")) }));
2008
2009 let relayer = EvmRelayer::new(
2010 relayer_model,
2011 signer,
2012 provider,
2013 create_test_evm_network(),
2014 Arc::new(relayer_repo),
2015 Arc::new(network_repo),
2016 Arc::new(tx_repo),
2017 Arc::new(counter),
2018 Arc::new(job_producer),
2019 )
2020 .unwrap();
2021
2022 let request = JsonRpcRequest {
2023 jsonrpc: "2.0".to_string(),
2024 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2025 method: "eth_getBalance".to_string(),
2026 params: serde_json::Value::String(
2027 r#"["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]"#.to_string(),
2028 ),
2029 }),
2030 id: Some(JsonRpcId::Number(1)),
2031 };
2032
2033 let response = relayer.rpc(request).await.unwrap();
2034 assert!(response.error.is_none());
2035 assert!(response.result.is_some());
2036
2037 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2038 assert_eq!(result, serde_json::json!("0xde0b6b3a7640000")); }
2040 }
2041
2042 #[tokio::test]
2043 async fn test_rpc_eth_block_number() {
2044 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2045 setup_mocks();
2046 let relayer_model = create_test_relayer();
2047
2048 provider
2049 .expect_raw_request_dyn()
2050 .withf(|method, params| method == "eth_blockNumber" && params.as_str() == Some("[]"))
2051 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("0x3039")) }));
2052
2053 let relayer = EvmRelayer::new(
2054 relayer_model,
2055 signer,
2056 provider,
2057 create_test_evm_network(),
2058 Arc::new(relayer_repo),
2059 Arc::new(network_repo),
2060 Arc::new(tx_repo),
2061 Arc::new(counter),
2062 Arc::new(job_producer),
2063 )
2064 .unwrap();
2065
2066 let request = JsonRpcRequest {
2067 jsonrpc: "2.0".to_string(),
2068 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2069 method: "eth_blockNumber".to_string(),
2070 params: serde_json::Value::String("[]".to_string()),
2071 }),
2072 id: Some(JsonRpcId::Number(1)),
2073 };
2074
2075 let response = relayer.rpc(request).await.unwrap();
2076 assert!(response.error.is_none());
2077 assert!(response.result.is_some());
2078
2079 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2080 assert_eq!(result, serde_json::json!("0x3039")); }
2082 }
2083
2084 #[tokio::test]
2085 async fn test_rpc_unsupported_method() {
2086 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2087 setup_mocks();
2088 let relayer_model = create_test_relayer();
2089
2090 provider
2091 .expect_raw_request_dyn()
2092 .withf(|method, _| method == "eth_unsupportedMethod")
2093 .returning(|_, _| {
2094 Box::pin(async {
2095 Err(ProviderError::Other(
2096 "Unsupported method: eth_unsupportedMethod".to_string(),
2097 ))
2098 })
2099 });
2100
2101 let relayer = EvmRelayer::new(
2102 relayer_model,
2103 signer,
2104 provider,
2105 create_test_evm_network(),
2106 Arc::new(relayer_repo),
2107 Arc::new(network_repo),
2108 Arc::new(tx_repo),
2109 Arc::new(counter),
2110 Arc::new(job_producer),
2111 )
2112 .unwrap();
2113
2114 let request = JsonRpcRequest {
2115 jsonrpc: "2.0".to_string(),
2116 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2117 method: "eth_unsupportedMethod".to_string(),
2118 params: serde_json::Value::String("[]".to_string()),
2119 }),
2120 id: Some(JsonRpcId::Number(1)),
2121 };
2122
2123 let response = relayer.rpc(request).await.unwrap();
2124 assert!(response.result.is_none());
2125 assert!(response.error.is_some());
2126
2127 let error = response.error.unwrap();
2128 assert_eq!(error.code, -32603); }
2130
2131 #[tokio::test]
2132 async fn test_rpc_invalid_params() {
2133 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2134 setup_mocks();
2135 let relayer_model = create_test_relayer();
2136
2137 provider
2138 .expect_raw_request_dyn()
2139 .withf(|method, params| method == "eth_getBalance" && params.as_str() == Some("[]"))
2140 .returning(|_, _| {
2141 Box::pin(async {
2142 Err(ProviderError::Other(
2143 "Missing address parameter".to_string(),
2144 ))
2145 })
2146 });
2147
2148 let relayer = EvmRelayer::new(
2149 relayer_model,
2150 signer,
2151 provider,
2152 create_test_evm_network(),
2153 Arc::new(relayer_repo),
2154 Arc::new(network_repo),
2155 Arc::new(tx_repo),
2156 Arc::new(counter),
2157 Arc::new(job_producer),
2158 )
2159 .unwrap();
2160
2161 let request = JsonRpcRequest {
2162 jsonrpc: "2.0".to_string(),
2163 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2164 method: "eth_getBalance".to_string(),
2165 params: serde_json::Value::String("[]".to_string()), }),
2167 id: Some(JsonRpcId::Number(1)),
2168 };
2169
2170 let response = relayer.rpc(request).await.unwrap();
2171 assert!(response.result.is_none());
2172 assert!(response.error.is_some());
2173
2174 let error = response.error.unwrap();
2175 assert_eq!(error.code, -32603); }
2177
2178 #[tokio::test]
2179 async fn test_rpc_non_evm_request() {
2180 let (provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2181 setup_mocks();
2182 let relayer_model = create_test_relayer();
2183
2184 let relayer = EvmRelayer::new(
2185 relayer_model,
2186 signer,
2187 provider,
2188 create_test_evm_network(),
2189 Arc::new(relayer_repo),
2190 Arc::new(network_repo),
2191 Arc::new(tx_repo),
2192 Arc::new(counter),
2193 Arc::new(job_producer),
2194 )
2195 .unwrap();
2196
2197 let request = JsonRpcRequest {
2198 jsonrpc: "2.0".to_string(),
2199 params: NetworkRpcRequest::Solana(crate::models::SolanaRpcRequest::GetSupportedTokens(
2200 crate::models::SolanaGetSupportedTokensRequestParams {},
2201 )),
2202 id: Some(JsonRpcId::Number(1)),
2203 };
2204
2205 let response = relayer.rpc(request).await.unwrap();
2206 assert!(response.result.is_none());
2207 assert!(response.error.is_some());
2208
2209 let error = response.error.unwrap();
2210 assert_eq!(error.code, -32602); }
2212
2213 #[tokio::test]
2214 async fn test_rpc_raw_request_with_array_params() {
2215 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2216 setup_mocks();
2217 let relayer_model = create_test_relayer();
2218
2219 provider
2220 .expect_raw_request_dyn()
2221 .withf(|method, params| {
2222 method == "eth_getTransactionByHash"
2223 && params.as_array().is_some_and(|arr| {
2224 arr.len() == 1 && arr[0].as_str() == Some("0x1234567890abcdef")
2225 })
2226 })
2227 .returning(|_, _| {
2228 Box::pin(async {
2229 Ok(serde_json::json!({
2230 "hash": "0x1234567890abcdef",
2231 "blockNumber": "0x1",
2232 "gasUsed": "0x5208"
2233 }))
2234 })
2235 });
2236
2237 let relayer = EvmRelayer::new(
2238 relayer_model,
2239 signer,
2240 provider,
2241 create_test_evm_network(),
2242 Arc::new(relayer_repo),
2243 Arc::new(network_repo),
2244 Arc::new(tx_repo),
2245 Arc::new(counter),
2246 Arc::new(job_producer),
2247 )
2248 .unwrap();
2249
2250 let request = JsonRpcRequest {
2251 jsonrpc: "2.0".to_string(),
2252 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2253 method: "eth_getTransactionByHash".to_string(),
2254 params: serde_json::json!(["0x1234567890abcdef"]),
2255 }),
2256 id: Some(JsonRpcId::Number(42)),
2257 };
2258
2259 let response = relayer.rpc(request).await.unwrap();
2260 assert!(response.error.is_none());
2261 assert!(response.result.is_some());
2262 assert_eq!(response.id, Some(JsonRpcId::Number(42)));
2263
2264 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2265 assert!(result.get("hash").is_some());
2266 assert!(result.get("blockNumber").is_some());
2267 }
2268 }
2269
2270 #[tokio::test]
2271 async fn test_rpc_raw_request_with_object_params() {
2272 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2273 setup_mocks();
2274 let relayer_model = create_test_relayer();
2275
2276 provider
2277 .expect_raw_request_dyn()
2278 .withf(|method, params| {
2279 method == "eth_call"
2280 && params
2281 .as_object()
2282 .is_some_and(|obj| obj.contains_key("to") && obj.contains_key("data"))
2283 })
2284 .returning(|_, _| {
2285 Box::pin(async {
2286 Ok(serde_json::json!(
2287 "0x0000000000000000000000000000000000000000000000000000000000000001"
2288 ))
2289 })
2290 });
2291
2292 let relayer = EvmRelayer::new(
2293 relayer_model,
2294 signer,
2295 provider,
2296 create_test_evm_network(),
2297 Arc::new(relayer_repo),
2298 Arc::new(network_repo),
2299 Arc::new(tx_repo),
2300 Arc::new(counter),
2301 Arc::new(job_producer),
2302 )
2303 .unwrap();
2304
2305 let request = JsonRpcRequest {
2306 jsonrpc: "2.0".to_string(),
2307 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2308 method: "eth_call".to_string(),
2309 params: serde_json::json!({
2310 "to": "0x742d35Cc6634C0532925a3b844Bc454e4438f44e",
2311 "data": "0x70a08231000000000000000000000000742d35cc6634c0532925a3b844bc454e4438f44e"
2312 }),
2313 }),
2314 id: Some(JsonRpcId::Number(123)),
2315 };
2316
2317 let response = relayer.rpc(request).await.unwrap();
2318 assert!(response.error.is_none());
2319 assert!(response.result.is_some());
2320 assert_eq!(response.id, Some(JsonRpcId::Number(123)));
2321 }
2322
2323 #[tokio::test]
2324 async fn test_rpc_generic_request_with_empty_params() {
2325 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2326 setup_mocks();
2327 let relayer_model = create_test_relayer();
2328
2329 provider
2330 .expect_raw_request_dyn()
2331 .withf(|method, params| method == "net_version" && params.as_str() == Some("[]"))
2332 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("1")) }));
2333
2334 let relayer = EvmRelayer::new(
2335 relayer_model,
2336 signer,
2337 provider,
2338 create_test_evm_network(),
2339 Arc::new(relayer_repo),
2340 Arc::new(network_repo),
2341 Arc::new(tx_repo),
2342 Arc::new(counter),
2343 Arc::new(job_producer),
2344 )
2345 .unwrap();
2346
2347 let request = JsonRpcRequest {
2348 jsonrpc: "2.0".to_string(),
2349 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2350 method: "net_version".to_string(),
2351 params: serde_json::Value::String("[]".to_string()),
2352 }),
2353 id: Some(JsonRpcId::Number(999)),
2354 };
2355
2356 let response = relayer.rpc(request).await.unwrap();
2357 assert!(response.error.is_none());
2358 assert!(response.result.is_some());
2359 assert_eq!(response.id, Some(JsonRpcId::Number(999)));
2360 }
2361
2362 #[tokio::test]
2363 async fn test_rpc_provider_invalid_address_error() {
2364 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2365 setup_mocks();
2366 let relayer_model = create_test_relayer();
2367
2368 provider.expect_raw_request_dyn().returning(|_, _| {
2369 Box::pin(async {
2370 Err(ProviderError::InvalidAddress(
2371 "Invalid address format".to_string(),
2372 ))
2373 })
2374 });
2375
2376 let relayer = EvmRelayer::new(
2377 relayer_model,
2378 signer,
2379 provider,
2380 create_test_evm_network(),
2381 Arc::new(relayer_repo),
2382 Arc::new(network_repo),
2383 Arc::new(tx_repo),
2384 Arc::new(counter),
2385 Arc::new(job_producer),
2386 )
2387 .unwrap();
2388
2389 let request = JsonRpcRequest {
2390 jsonrpc: "2.0".to_string(),
2391 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2392 method: "eth_getBalance".to_string(),
2393 params: serde_json::Value::String(r#"["invalid_address", "latest"]"#.to_string()),
2394 }),
2395 id: Some(JsonRpcId::Number(1)),
2396 };
2397
2398 let response = relayer.rpc(request).await.unwrap();
2399 assert!(response.result.is_none());
2400 assert!(response.error.is_some());
2401
2402 let error = response.error.unwrap();
2403 assert_eq!(error.code, -32602); }
2405
2406 #[tokio::test]
2407 async fn test_rpc_provider_network_configuration_error() {
2408 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2409 setup_mocks();
2410 let relayer_model = create_test_relayer();
2411
2412 provider.expect_raw_request_dyn().returning(|_, _| {
2413 Box::pin(async {
2414 Err(ProviderError::NetworkConfiguration(
2415 "Network not reachable".to_string(),
2416 ))
2417 })
2418 });
2419
2420 let relayer = EvmRelayer::new(
2421 relayer_model,
2422 signer,
2423 provider,
2424 create_test_evm_network(),
2425 Arc::new(relayer_repo),
2426 Arc::new(network_repo),
2427 Arc::new(tx_repo),
2428 Arc::new(counter),
2429 Arc::new(job_producer),
2430 )
2431 .unwrap();
2432
2433 let request = JsonRpcRequest {
2434 jsonrpc: "2.0".to_string(),
2435 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2436 method: "eth_chainId".to_string(),
2437 params: serde_json::Value::String("[]".to_string()),
2438 }),
2439 id: Some(JsonRpcId::Number(2)),
2440 };
2441
2442 let response = relayer.rpc(request).await.unwrap();
2443 assert!(response.result.is_none());
2444 assert!(response.error.is_some());
2445
2446 let error = response.error.unwrap();
2447 assert_eq!(error.code, -33004); }
2449
2450 #[tokio::test]
2451 async fn test_rpc_provider_timeout_error() {
2452 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2453 setup_mocks();
2454 let relayer_model = create_test_relayer();
2455
2456 provider
2457 .expect_raw_request_dyn()
2458 .returning(|_, _| Box::pin(async { Err(ProviderError::Timeout) }));
2459
2460 let relayer = EvmRelayer::new(
2461 relayer_model,
2462 signer,
2463 provider,
2464 create_test_evm_network(),
2465 Arc::new(relayer_repo),
2466 Arc::new(network_repo),
2467 Arc::new(tx_repo),
2468 Arc::new(counter),
2469 Arc::new(job_producer),
2470 )
2471 .unwrap();
2472
2473 let request = JsonRpcRequest {
2474 jsonrpc: "2.0".to_string(),
2475 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2476 method: "eth_blockNumber".to_string(),
2477 params: serde_json::json!([]),
2478 }),
2479 id: Some(JsonRpcId::Number(3)),
2480 };
2481
2482 let response = relayer.rpc(request).await.unwrap();
2483 assert!(response.result.is_none());
2484 assert!(response.error.is_some());
2485
2486 let error = response.error.unwrap();
2487 assert_eq!(error.code, -33000); }
2489
2490 #[tokio::test]
2491 async fn test_rpc_provider_rate_limited_error() {
2492 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2493 setup_mocks();
2494 let relayer_model = create_test_relayer();
2495
2496 provider
2497 .expect_raw_request_dyn()
2498 .returning(|_, _| Box::pin(async { Err(ProviderError::RateLimited) }));
2499
2500 let relayer = EvmRelayer::new(
2501 relayer_model,
2502 signer,
2503 provider,
2504 create_test_evm_network(),
2505 Arc::new(relayer_repo),
2506 Arc::new(network_repo),
2507 Arc::new(tx_repo),
2508 Arc::new(counter),
2509 Arc::new(job_producer),
2510 )
2511 .unwrap();
2512
2513 let request = JsonRpcRequest {
2514 jsonrpc: "2.0".to_string(),
2515 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2516 method: "eth_getBalance".to_string(),
2517 params: serde_json::Value::String(
2518 r#"["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]"#.to_string(),
2519 ),
2520 }),
2521 id: Some(JsonRpcId::Number(4)),
2522 };
2523
2524 let response = relayer.rpc(request).await.unwrap();
2525 assert!(response.result.is_none());
2526 assert!(response.error.is_some());
2527
2528 let error = response.error.unwrap();
2529 assert_eq!(error.code, -33001); }
2531
2532 #[tokio::test]
2533 async fn test_rpc_provider_bad_gateway_error() {
2534 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2535 setup_mocks();
2536 let relayer_model = create_test_relayer();
2537
2538 provider
2539 .expect_raw_request_dyn()
2540 .returning(|_, _| Box::pin(async { Err(ProviderError::BadGateway) }));
2541
2542 let relayer = EvmRelayer::new(
2543 relayer_model,
2544 signer,
2545 provider,
2546 create_test_evm_network(),
2547 Arc::new(relayer_repo),
2548 Arc::new(network_repo),
2549 Arc::new(tx_repo),
2550 Arc::new(counter),
2551 Arc::new(job_producer),
2552 )
2553 .unwrap();
2554
2555 let request = JsonRpcRequest {
2556 jsonrpc: "2.0".to_string(),
2557 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2558 method: "eth_gasPrice".to_string(),
2559 params: serde_json::json!([]),
2560 }),
2561 id: Some(JsonRpcId::Number(5)),
2562 };
2563
2564 let response = relayer.rpc(request).await.unwrap();
2565 assert!(response.result.is_none());
2566 assert!(response.error.is_some());
2567
2568 let error = response.error.unwrap();
2569 assert_eq!(error.code, -33002); }
2571
2572 #[tokio::test]
2573 async fn test_rpc_provider_request_error() {
2574 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2575 setup_mocks();
2576 let relayer_model = create_test_relayer();
2577
2578 provider.expect_raw_request_dyn().returning(|_, _| {
2579 Box::pin(async {
2580 Err(ProviderError::RequestError {
2581 error: "Bad request".to_string(),
2582 status_code: 400,
2583 })
2584 })
2585 });
2586
2587 let relayer = EvmRelayer::new(
2588 relayer_model,
2589 signer,
2590 provider,
2591 create_test_evm_network(),
2592 Arc::new(relayer_repo),
2593 Arc::new(network_repo),
2594 Arc::new(tx_repo),
2595 Arc::new(counter),
2596 Arc::new(job_producer),
2597 )
2598 .unwrap();
2599
2600 let request = JsonRpcRequest {
2601 jsonrpc: "2.0".to_string(),
2602 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2603 method: "invalid_method".to_string(),
2604 params: serde_json::Value::String("{}".to_string()),
2605 }),
2606 id: Some(JsonRpcId::Number(6)),
2607 };
2608
2609 let response = relayer.rpc(request).await.unwrap();
2610 assert!(response.result.is_none());
2611 assert!(response.error.is_some());
2612
2613 let error = response.error.unwrap();
2614 assert_eq!(error.code, -33003); }
2616
2617 #[tokio::test]
2618 async fn test_rpc_provider_other_error() {
2619 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2620 setup_mocks();
2621 let relayer_model = create_test_relayer();
2622
2623 provider.expect_raw_request_dyn().returning(|_, _| {
2624 Box::pin(async {
2625 Err(ProviderError::Other(
2626 "Unexpected error occurred".to_string(),
2627 ))
2628 })
2629 });
2630
2631 let relayer = EvmRelayer::new(
2632 relayer_model,
2633 signer,
2634 provider,
2635 create_test_evm_network(),
2636 Arc::new(relayer_repo),
2637 Arc::new(network_repo),
2638 Arc::new(tx_repo),
2639 Arc::new(counter),
2640 Arc::new(job_producer),
2641 )
2642 .unwrap();
2643
2644 let request = JsonRpcRequest {
2645 jsonrpc: "2.0".to_string(),
2646 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2647 method: "eth_getBalance".to_string(),
2648 params: serde_json::json!(["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]),
2649 }),
2650 id: Some(JsonRpcId::Number(7)),
2651 };
2652
2653 let response = relayer.rpc(request).await.unwrap();
2654 assert!(response.result.is_none());
2655 assert!(response.error.is_some());
2656
2657 let error = response.error.unwrap();
2658 assert_eq!(error.code, -32603); }
2660
2661 #[tokio::test]
2662 async fn test_rpc_response_preserves_request_id() {
2663 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2664 setup_mocks();
2665 let relayer_model = create_test_relayer();
2666
2667 provider
2668 .expect_raw_request_dyn()
2669 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("0x1")) }));
2670
2671 let relayer = EvmRelayer::new(
2672 relayer_model,
2673 signer,
2674 provider,
2675 create_test_evm_network(),
2676 Arc::new(relayer_repo),
2677 Arc::new(network_repo),
2678 Arc::new(tx_repo),
2679 Arc::new(counter),
2680 Arc::new(job_producer),
2681 )
2682 .unwrap();
2683
2684 let request_id = u64::MAX;
2685 let request = JsonRpcRequest {
2686 jsonrpc: "2.0".to_string(),
2687 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2688 method: "eth_chainId".to_string(),
2689 params: serde_json::Value::String("[]".to_string()),
2690 }),
2691 id: Some(JsonRpcId::Number(request_id as i64)),
2692 };
2693
2694 let response = relayer.rpc(request).await.unwrap();
2695 assert_eq!(response.id, Some(JsonRpcId::Number(request_id as i64)));
2696 assert_eq!(response.jsonrpc, "2.0");
2697 }
2698
2699 #[tokio::test]
2700 async fn test_rpc_handles_complex_json_response() {
2701 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2702 setup_mocks();
2703 let relayer_model = create_test_relayer();
2704
2705 let complex_response = serde_json::json!({
2706 "number": "0x1b4",
2707 "hash": "0xdc0818cf78f21a8e70579cb46a43643f78291264dda342ae31049421c82d21ae",
2708 "parentHash": "0xe99e022112df268ce40b8b654759b4f39c3cc1b8c86b2f4c7da48ba6d8a6ae8b",
2709 "transactions": [
2710 {
2711 "hash": "0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060",
2712 "from": "0xa7d9ddbe1f17865597fbd27ec712455208b6b76d",
2713 "to": "0xf02c1c8e6114b1dbe8937a39260b5b0a374432bb",
2714 "value": "0xf3dbb76162000"
2715 }
2716 ],
2717 "gasUsed": "0x5208"
2718 });
2719
2720 provider.expect_raw_request_dyn().returning(move |_, _| {
2721 let response = complex_response.clone();
2722 Box::pin(async move { Ok(response) })
2723 });
2724
2725 let relayer = EvmRelayer::new(
2726 relayer_model,
2727 signer,
2728 provider,
2729 create_test_evm_network(),
2730 Arc::new(relayer_repo),
2731 Arc::new(network_repo),
2732 Arc::new(tx_repo),
2733 Arc::new(counter),
2734 Arc::new(job_producer),
2735 )
2736 .unwrap();
2737
2738 let request = JsonRpcRequest {
2739 jsonrpc: "2.0".to_string(),
2740 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2741 method: "eth_getBlockByNumber".to_string(),
2742 params: serde_json::json!(["0x1b4", true]),
2743 }),
2744 id: Some(JsonRpcId::Number(8)),
2745 };
2746
2747 let response = relayer.rpc(request).await.unwrap();
2748 assert!(response.error.is_none());
2749 assert!(response.result.is_some());
2750
2751 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2752 assert!(result.get("transactions").is_some());
2753 assert!(result.get("hash").is_some());
2754 assert!(result.get("gasUsed").is_some());
2755 }
2756 }
2757
2758 #[tokio::test]
2759 async fn test_initialize_relayer_disables_when_validation_fails() {
2760 let (
2761 mut provider,
2762 mut relayer_repo,
2763 network_repo,
2764 tx_repo,
2765 mut job_producer,
2766 signer,
2767 mut counter,
2768 ) = setup_mocks();
2769 let mut relayer_model = create_test_relayer();
2770 relayer_model.system_disabled = false; relayer_model.notification_id = Some("test-notification-id".to_string());
2772
2773 provider
2775 .expect_get_transaction_count()
2776 .returning(|_| Box::pin(ready(Err(ProviderError::Other("RPC error".to_string())))));
2777
2778 counter
2779 .expect_get()
2780 .returning(|| Box::pin(ready(Ok(Some(0u64)))));
2781
2782 provider
2784 .expect_get_balance()
2785 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64)))));
2786
2787 provider
2788 .expect_health_check()
2789 .returning(|| Box::pin(ready(Ok(true))));
2790
2791 let mut disabled_relayer = relayer_model.clone();
2793 disabled_relayer.system_disabled = true;
2794 relayer_repo
2795 .expect_disable_relayer()
2796 .with(eq("test-relayer-id".to_string()), always())
2797 .returning(move |_, _| Ok(disabled_relayer.clone()));
2798
2799 job_producer
2801 .expect_produce_send_notification_job()
2802 .returning(|_, _| Box::pin(ready(Ok(()))));
2803
2804 job_producer
2806 .expect_produce_relayer_health_check_job()
2807 .returning(|_, _| Box::pin(ready(Ok(()))));
2808
2809 let relayer = EvmRelayer::new(
2810 relayer_model,
2811 signer,
2812 provider,
2813 create_test_evm_network(),
2814 Arc::new(relayer_repo),
2815 Arc::new(network_repo),
2816 Arc::new(tx_repo),
2817 Arc::new(counter),
2818 Arc::new(job_producer),
2819 )
2820 .unwrap();
2821
2822 let result = relayer.initialize_relayer().await;
2823 assert!(result.is_ok());
2824 }
2825
2826 #[tokio::test]
2827 async fn test_initialize_relayer_enables_when_validation_passes_and_was_disabled() {
2828 let (
2829 mut provider,
2830 mut relayer_repo,
2831 network_repo,
2832 tx_repo,
2833 job_producer,
2834 signer,
2835 mut counter,
2836 ) = setup_mocks();
2837 let mut relayer_model = create_test_relayer();
2838 relayer_model.system_disabled = true; provider
2842 .expect_get_transaction_count()
2843 .returning(|_| Box::pin(ready(Ok(42u64))));
2844
2845 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
2846
2847 counter
2848 .expect_get()
2849 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
2850
2851 provider
2852 .expect_get_balance()
2853 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); provider
2856 .expect_health_check()
2857 .returning(|| Box::pin(ready(Ok(true))));
2858
2859 let mut enabled_relayer = relayer_model.clone();
2861 enabled_relayer.system_disabled = false;
2862 relayer_repo
2863 .expect_enable_relayer()
2864 .with(eq("test-relayer-id".to_string()))
2865 .returning(move |_| Ok(enabled_relayer.clone()));
2866
2867 let relayer = EvmRelayer::new(
2868 relayer_model,
2869 signer,
2870 provider,
2871 create_test_evm_network(),
2872 Arc::new(relayer_repo),
2873 Arc::new(network_repo),
2874 Arc::new(tx_repo),
2875 Arc::new(counter),
2876 Arc::new(job_producer),
2877 )
2878 .unwrap();
2879
2880 let result = relayer.initialize_relayer().await;
2881 assert!(result.is_ok());
2882 }
2883
2884 #[tokio::test]
2885 async fn test_initialize_relayer_no_action_when_enabled_and_validation_passes() {
2886 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
2887 setup_mocks();
2888 let mut relayer_model = create_test_relayer();
2889 relayer_model.system_disabled = false; provider
2893 .expect_get_transaction_count()
2894 .returning(|_| Box::pin(ready(Ok(42u64))));
2895
2896 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
2897
2898 counter
2899 .expect_get()
2900 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
2901
2902 provider
2903 .expect_get_balance()
2904 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); provider
2907 .expect_health_check()
2908 .returning(|| Box::pin(ready(Ok(true))));
2909
2910 let relayer = EvmRelayer::new(
2913 relayer_model,
2914 signer,
2915 provider,
2916 create_test_evm_network(),
2917 Arc::new(relayer_repo),
2918 Arc::new(network_repo),
2919 Arc::new(tx_repo),
2920 Arc::new(counter),
2921 Arc::new(job_producer),
2922 )
2923 .unwrap();
2924
2925 let result = relayer.initialize_relayer().await;
2926 assert!(result.is_ok());
2927 }
2928
2929 #[tokio::test]
2930 async fn test_initialize_relayer_sends_notification_when_disabled() {
2931 let (
2932 mut provider,
2933 mut relayer_repo,
2934 network_repo,
2935 tx_repo,
2936 mut job_producer,
2937 signer,
2938 mut counter,
2939 ) = setup_mocks();
2940 let mut relayer_model = create_test_relayer();
2941 relayer_model.system_disabled = false; relayer_model.notification_id = Some("test-notification-id".to_string());
2943
2944 provider
2946 .expect_get_transaction_count()
2947 .returning(|_| Box::pin(ready(Ok(42u64))));
2948
2949 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
2950
2951 counter
2952 .expect_get()
2953 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
2954
2955 provider
2956 .expect_get_balance()
2957 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); provider.expect_health_check().returning(|| {
2960 Box::pin(ready(Err(ProviderError::Other(
2961 "RPC validation failed".to_string(),
2962 ))))
2963 });
2964
2965 let mut disabled_relayer = relayer_model.clone();
2967 disabled_relayer.system_disabled = true;
2968 relayer_repo
2969 .expect_disable_relayer()
2970 .with(eq("test-relayer-id".to_string()), always())
2971 .returning(move |_, _| Ok(disabled_relayer.clone()));
2972
2973 job_producer
2975 .expect_produce_send_notification_job()
2976 .returning(|_, _| Box::pin(ready(Ok(()))));
2977
2978 job_producer
2980 .expect_produce_relayer_health_check_job()
2981 .returning(|_, _| Box::pin(ready(Ok(()))));
2982
2983 let relayer = EvmRelayer::new(
2984 relayer_model,
2985 signer,
2986 provider,
2987 create_test_evm_network(),
2988 Arc::new(relayer_repo),
2989 Arc::new(network_repo),
2990 Arc::new(tx_repo),
2991 Arc::new(counter),
2992 Arc::new(job_producer),
2993 )
2994 .unwrap();
2995
2996 let result = relayer.initialize_relayer().await;
2997 assert!(result.is_ok());
2998 }
2999
3000 #[tokio::test]
3001 async fn test_initialize_relayer_no_notification_when_no_notification_id() {
3002 let (
3003 mut provider,
3004 mut relayer_repo,
3005 network_repo,
3006 tx_repo,
3007 mut job_producer,
3008 signer,
3009 mut counter,
3010 ) = setup_mocks();
3011 let mut relayer_model = create_test_relayer();
3012 relayer_model.system_disabled = false; relayer_model.notification_id = None; provider
3017 .expect_get_transaction_count()
3018 .returning(|_| Box::pin(ready(Ok(42u64))));
3019
3020 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
3021
3022 counter
3023 .expect_get()
3024 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
3025
3026 provider
3027 .expect_get_balance()
3028 .returning(|_| Box::pin(ready(Ok(U256::from(50000000000000000u64))))); provider
3031 .expect_health_check()
3032 .returning(|| Box::pin(ready(Ok(true))));
3033
3034 let mut disabled_relayer = relayer_model.clone();
3036 disabled_relayer.system_disabled = true;
3037 relayer_repo
3038 .expect_disable_relayer()
3039 .with(eq("test-relayer-id".to_string()), always())
3040 .returning(move |_, _| Ok(disabled_relayer.clone()));
3041
3042 job_producer
3045 .expect_produce_relayer_health_check_job()
3046 .returning(|_, _| Box::pin(ready(Ok(()))));
3047
3048 let relayer = EvmRelayer::new(
3049 relayer_model,
3050 signer,
3051 provider,
3052 create_test_evm_network(),
3053 Arc::new(relayer_repo),
3054 Arc::new(network_repo),
3055 Arc::new(tx_repo),
3056 Arc::new(counter),
3057 Arc::new(job_producer),
3058 )
3059 .unwrap();
3060
3061 let result = relayer.initialize_relayer().await;
3062 assert!(result.is_ok());
3063 }
3064}