1use crate::{
7 constants::WORKER_DEFAULT_MAXIMUM_RETRIES,
8 domain::{get_network_relayer, Relayer},
9 jobs::{handle_result, Job, JobProducerTrait, RelayerHealthCheck},
10 models::{
11 produce_relayer_enabled_payload, DefaultAppState, DisabledReason, NetworkRepoModel,
12 NotificationRepoModel, RelayerRepoModel, SignerRepoModel, ThinDataAppState,
13 TransactionRepoModel,
14 },
15 observability::request_id::set_request_id,
16 queues::{HandlerError, WorkerContext},
17 repositories::{
18 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
19 Repository, TransactionCounterTrait, TransactionRepository,
20 },
21 utils::calculate_scheduled_timestamp,
22};
23use actix_web::web::ThinData;
24use eyre::Result;
25use std::time::Duration;
26use tracing::{debug, info, instrument, warn};
27
28#[instrument(
53 level = "debug",
54 skip(job, app_state),
55 fields(
56 request_id = ?job.request_id,
57 job_id = %job.message_id,
58 job_type = %job.job_type.to_string(),
59 attempt = %ctx.attempt,
60 relayer_id = %job.data.relayer_id,
61 task_id = %ctx.task_id,
62 )
63)]
64pub async fn relayer_health_check_handler(
65 job: Job<RelayerHealthCheck>,
66 app_state: ThinData<DefaultAppState>,
67 ctx: WorkerContext,
68) -> Result<(), HandlerError> {
69 if let Some(request_id) = job.request_id.clone() {
70 set_request_id(request_id);
71 }
72
73 relayer_health_check_handler_impl(job, app_state, ctx).await
74}
75
76#[allow(clippy::type_complexity)]
78async fn relayer_health_check_handler_impl<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
79 job: Job<RelayerHealthCheck>,
80 app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
81 ctx: WorkerContext,
82) -> Result<(), HandlerError>
83where
84 J: JobProducerTrait + Send + Sync + 'static,
85 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
86 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
87 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
88 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
89 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
90 TCR: TransactionCounterTrait + Send + Sync + 'static,
91 PR: PluginRepositoryTrait + Send + Sync + 'static,
92 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
93{
94 let result = check_and_reenable_relayer(job.data, &app_state).await;
95 handle_result(
96 result,
97 &ctx,
98 "relayer_health_check",
99 WORKER_DEFAULT_MAXIMUM_RETRIES,
100 )
101}
102
103async fn check_and_reenable_relayer<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
104 data: RelayerHealthCheck,
105 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
106) -> Result<()>
107where
108 J: JobProducerTrait + Send + Sync + 'static,
109 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
110 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
111 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
112 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
113 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
114 TCR: TransactionCounterTrait + Send + Sync + 'static,
115 PR: PluginRepositoryTrait + Send + Sync + 'static,
116 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
117{
118 let relayer_id = data.relayer_id.clone();
119
120 debug!(
121 relayer_id = %relayer_id,
122 retry_count = data.retry_count,
123 "Running health check"
124 );
125
126 if let Some(metadata) = &data.metadata {
130 let relayer_service = get_network_relayer(relayer_id.clone(), app_state)
131 .await
132 .map_err(|e| eyre::eyre!("Failed to get relayer for targeted action: {}", e))?;
133
134 match relayer_service.handle_health_action(metadata).await {
135 Ok(true) => return Ok(()),
136 Ok(false) => { }
137 Err(e) => return Err(eyre::eyre!("Targeted health action failed: {}", e)),
138 }
139 }
140
141 let relayer = app_state
143 .relayer_repository
144 .get_by_id(relayer_id.clone())
145 .await
146 .map_err(|e| eyre::eyre!("Failed to get relayer: {}", e))?;
147
148 if !relayer.system_disabled {
149 info!(
150 relayer_id = %relayer_id,
151 "Relayer is not disabled, skipping health check"
152 );
153 return Ok(());
154 }
155
156 let relayer_service = get_network_relayer(relayer_id.clone(), app_state)
158 .await
159 .map_err(|e| eyre::eyre!("Failed to get relayer: {}", e))?;
160
161 match relayer_service.check_health().await {
163 Ok(_) => {
164 info!(
166 relayer_id = %relayer_id,
167 retry_count = data.retry_count,
168 "Health checks passed, re-enabling relayer"
169 );
170
171 let enabled_relayer = app_state
173 .relayer_repository
174 .enable_relayer(relayer_id.clone())
175 .await
176 .map_err(|e| eyre::eyre!("Failed to enable relayer: {}", e))?;
177
178 if let Some(notification_id) = &enabled_relayer.notification_id {
180 app_state
181 .job_producer
182 .produce_send_notification_job(
183 produce_relayer_enabled_payload(
184 notification_id,
185 &enabled_relayer,
186 data.retry_count,
187 ),
188 None,
189 )
190 .await
191 .map_err(|e| eyre::eyre!("Failed to send notification: {}", e))?;
192
193 info!(
194 relayer_id = %relayer_id,
195 notification_id = %notification_id,
196 "Sent relayer recovery notification"
197 );
198 }
199
200 Ok(())
201 }
202 Err(failures) => {
203 let reason = DisabledReason::from_health_failures(failures).unwrap_or_else(|| {
205 DisabledReason::RpcValidationFailed("Unknown error".to_string())
206 });
207
208 warn!(
209 relayer_id = %relayer_id,
210 retry_count = data.retry_count,
211 reason = %reason,
212 "Health checks failed, scheduling retry"
213 );
214
215 let should_update = match &relayer.disabled_reason {
218 Some(old_reason) => !old_reason.same_variant(&reason),
219 None => true, };
221
222 if should_update {
223 debug!(
224 relayer_id = %relayer_id,
225 old_reason = ?relayer.disabled_reason,
226 new_reason = %reason,
227 "Disabled reason variant has changed, updating"
228 );
229
230 app_state
231 .relayer_repository
232 .disable_relayer(relayer_id.clone(), reason.clone())
233 .await
234 .map_err(|e| eyre::eyre!("Failed to update disabled reason: {}", e))?;
235 } else {
236 debug!(
237 relayer_id = %relayer_id,
238 reason = %reason,
239 "Disabled reason variant unchanged, skipping update"
240 );
241 }
242
243 let delay = calculate_backoff_delay(data.retry_count);
245
246 debug!(
247 relayer_id = %relayer_id,
248 next_retry = data.retry_count + 1,
249 delay_seconds = delay.as_secs(),
250 "Scheduling next health check attempt"
251 );
252
253 app_state
255 .job_producer
256 .produce_relayer_health_check_job(
257 RelayerHealthCheck::with_retry_count(relayer_id, data.retry_count + 1),
258 Some(calculate_scheduled_timestamp(delay.as_secs() as i64)),
259 )
260 .await
261 .map_err(|e| eyre::eyre!("Failed to schedule retry: {}", e))?;
262
263 Ok(())
264 }
265 }
266}
267
/// Returns how long to wait before the next health-check attempt.
///
/// The delay follows a fixed stepped schedule indexed by `retry_count`:
/// 10s, 20s, 30s, 45s, and 60s for every attempt from the fifth onwards.
fn calculate_backoff_delay(retry_count: u32) -> Duration {
    /// Delays (in seconds) for retry counts 0 through 3.
    const STEPPED_DELAYS_SECS: [u64; 4] = [10, 20, 30, 45];
    /// Delay (in seconds) used once the stepped schedule is exhausted.
    const MAX_DELAY_SECS: u64 = 60;

    let seconds = usize::try_from(retry_count)
        .ok()
        .and_then(|attempt| STEPPED_DELAYS_SECS.get(attempt))
        .copied()
        .unwrap_or(MAX_DELAY_SECS);

    Duration::from_secs(seconds)
}
286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::jobs::MockJobProducerTrait;
    use crate::models::{
        AppState, DisabledReason, HealthCheckFailure, NetworkType, RelayerEvmPolicy,
        RelayerNetworkPolicy, RelayerRepoModel,
    };
    use crate::repositories::{
        ApiKeyRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage,
        PluginRepositoryStorage, RelayerRepositoryStorage, Repository, SignerRepositoryStorage,
        TransactionCounterRepositoryStorage, TransactionRepositoryStorage,
    };
    use std::sync::Arc;

    /// Builds an EVM relayer fixture that is system-disabled with an
    /// `RpcValidationFailed` reason.
    fn create_disabled_relayer(id: &str) -> RelayerRepoModel {
        RelayerRepoModel {
            id: id.to_string(),
            name: format!("Relayer {id}"),
            network: "sepolia".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
                gas_price_cap: None,
                whitelist_receivers: None,
                eip1559_pricing: Some(false),
                private_transactions: Some(false),
                min_balance: Some(0),
                gas_limit_estimation: Some(false),
            }),
            signer_id: "test-signer".to_string(),
            address: "0x742d35Cc6634C0532925a3b8D8C2e48a73F6ba2E".to_string(),
            notification_id: Some("test-notification".to_string()),
            system_disabled: true,
            disabled_reason: Some(DisabledReason::RpcValidationFailed(
                "RPC unavailable".to_string(),
            )),
            custom_rpc_urls: None,
        }
    }

    /// Same fixture as [`create_disabled_relayer`], but not disabled and
    /// without a disabled reason.
    fn create_enabled_relayer(id: &str) -> RelayerRepoModel {
        RelayerRepoModel {
            system_disabled: false,
            disabled_reason: None,
            ..create_disabled_relayer(id)
        }
    }

    #[test]
    fn test_calculate_backoff_delay() {
        // (retry_count, expected delay in seconds)
        let expectations = [
            (0, 10),
            (1, 20),
            (2, 30),
            (3, 45),
            (4, 60),
            (10, 60),
            (100, 60),
        ];

        for (retry_count, expected_secs) in expectations {
            assert_eq!(
                calculate_backoff_delay(retry_count),
                Duration::from_secs(expected_secs),
                "unexpected delay for retry_count {retry_count}"
            );
        }
    }

    #[test]
    fn test_relayer_health_check_creation() {
        let health_check = RelayerHealthCheck::new("test-relayer".to_string());
        assert_eq!(health_check.relayer_id, "test-relayer");
        assert_eq!(health_check.retry_count, 0);

        let health_check_with_retry =
            RelayerHealthCheck::with_retry_count("test-relayer".to_string(), 3);
        assert_eq!(health_check_with_retry.relayer_id, "test-relayer");
        assert_eq!(health_check_with_retry.retry_count, 3);
    }

    // Purely synchronous: no async runtime is needed.
    #[test]
    fn test_health_check_data_structure() {
        let health_check = RelayerHealthCheck::new("test-relayer".to_string());
        assert_eq!(health_check.relayer_id, "test-relayer");
        assert_eq!(health_check.retry_count, 0);

        let health_check_retry =
            RelayerHealthCheck::with_retry_count("test-relayer".to_string(), 5);
        assert_eq!(health_check_retry.retry_count, 5);

        let expected_delay = calculate_backoff_delay(5);
        assert_eq!(expected_delay, Duration::from_secs(60));
    }

    #[tokio::test]
    async fn test_relayer_health_check_handler_impl_exits_on_enabled() {
        // No expectations are set on the mock, so any job production would fail the test.
        let mock_job_producer = MockJobProducerTrait::new();

        let relayer_repo = Arc::new(RelayerRepositoryStorage::new_in_memory());
        relayer_repo
            .create(create_enabled_relayer("test-handler-enabled"))
            .await
            .unwrap();

        let app_state = ThinData(AppState {
            relayer_repository: relayer_repo,
            transaction_repository: Arc::new(TransactionRepositoryStorage::new_in_memory()),
            signer_repository: Arc::new(SignerRepositoryStorage::new_in_memory()),
            notification_repository: Arc::new(NotificationRepositoryStorage::new_in_memory()),
            network_repository: Arc::new(NetworkRepositoryStorage::new_in_memory()),
            transaction_counter_store: Arc::new(
                TransactionCounterRepositoryStorage::new_in_memory(),
            ),
            job_producer: Arc::new(mock_job_producer),
            plugin_repository: Arc::new(PluginRepositoryStorage::new_in_memory()),
            api_key_repository: Arc::new(ApiKeyRepositoryStorage::new_in_memory()),
        });

        let health_check = RelayerHealthCheck::new("test-handler-enabled".to_string());
        let job = Job::new(crate::jobs::JobType::RelayerHealthCheck, health_check);
        let ctx = WorkerContext::new(1, "test-task".into());

        let result = relayer_health_check_handler_impl(job, app_state, ctx).await;

        assert!(result.is_ok());
    }

    #[test]
    fn test_relayer_health_check_backoff_progression() {
        let delays: Vec<Duration> = (0..6).map(calculate_backoff_delay).collect();

        let expected_secs = [10, 20, 30, 45, 60, 60];
        for (delay, secs) in delays.iter().zip(expected_secs) {
            assert_eq!(*delay, Duration::from_secs(secs));
        }

        // Retry counts 0..=4 must be strictly increasing.
        for pair in delays[..5].windows(2) {
            assert!(pair[0] < pair[1], "Delay should increase with retry count");
        }

        assert_eq!(delays[4], delays[5], "Delay should cap at 60 seconds");
    }

    #[tokio::test]
    async fn test_disabled_reason_is_preserved() {
        let repo = RelayerRepositoryStorage::new_in_memory();

        let relayer = create_disabled_relayer("test-relayer-2");
        let disabled_reason = relayer.disabled_reason.clone();

        repo.create(relayer).await.unwrap();

        let retrieved = repo.get_by_id("test-relayer-2".to_string()).await.unwrap();

        assert!(retrieved.system_disabled);
        assert_eq!(retrieved.disabled_reason, disabled_reason);

        // Fail loudly instead of silently skipping the check when the reason is missing.
        let reason = retrieved
            .disabled_reason
            .as_ref()
            .expect("disabled reason should survive the round trip");
        assert!(reason.description().contains("RPC"));
    }

    #[tokio::test]
    async fn test_check_and_reenable_relayer_exits_early_if_not_disabled() {
        let relayer_repo = Arc::new(RelayerRepositoryStorage::new_in_memory());
        relayer_repo
            .create(create_enabled_relayer("test-check-enabled"))
            .await
            .unwrap();

        // No expectations are set on the mock, so any job production would fail the test.
        let mock_job_producer = MockJobProducerTrait::new();

        let app_state = AppState {
            relayer_repository: relayer_repo.clone(),
            transaction_repository: Arc::new(TransactionRepositoryStorage::new_in_memory()),
            signer_repository: Arc::new(SignerRepositoryStorage::new_in_memory()),
            notification_repository: Arc::new(NotificationRepositoryStorage::new_in_memory()),
            network_repository: Arc::new(NetworkRepositoryStorage::new_in_memory()),
            transaction_counter_store: Arc::new(
                TransactionCounterRepositoryStorage::new_in_memory(),
            ),
            job_producer: Arc::new(mock_job_producer),
            plugin_repository: Arc::new(PluginRepositoryStorage::new_in_memory()),
            api_key_repository: Arc::new(ApiKeyRepositoryStorage::new_in_memory()),
        };

        let health_check = RelayerHealthCheck::new("test-check-enabled".to_string());
        let thin_app_state = ThinData(app_state);

        let result = check_and_reenable_relayer(health_check, &thin_app_state).await;

        assert!(result.is_ok());

        // The relayer must be left untouched.
        let retrieved = relayer_repo
            .get_by_id("test-check-enabled".to_string())
            .await
            .unwrap();
        assert!(!retrieved.system_disabled);
        assert!(retrieved.disabled_reason.is_none());
    }

    #[test]
    fn test_check_and_reenable_variant_comparison() {
        // Same variant, different message.
        let reason1 = DisabledReason::RpcValidationFailed("Error A".to_string());
        let reason2 = DisabledReason::RpcValidationFailed("Error B".to_string());
        assert!(reason1.same_variant(&reason2));

        // Different variant.
        let reason3 = DisabledReason::NonceSyncFailed("Error".to_string());
        assert!(!reason1.same_variant(&reason3));

        // `Multiple` with the same inner variants in the same order.
        let multi1 = DisabledReason::Multiple(vec![
            DisabledReason::RpcValidationFailed("A".to_string()),
            DisabledReason::NonceSyncFailed("B".to_string()),
        ]);
        let multi2 = DisabledReason::Multiple(vec![
            DisabledReason::RpcValidationFailed("C".to_string()),
            DisabledReason::NonceSyncFailed("D".to_string()),
        ]);
        assert!(multi1.same_variant(&multi2));

        // `Multiple` with a differing inner variant.
        let multi3 = DisabledReason::Multiple(vec![
            DisabledReason::RpcValidationFailed("A".to_string()),
            DisabledReason::BalanceCheckFailed("B".to_string()),
        ]);
        assert!(!multi1.same_variant(&multi3));
    }

    #[test]
    fn test_backoff_delay_calculation_edge_cases() {
        let delay0 = calculate_backoff_delay(0);
        assert_eq!(delay0, Duration::from_secs(10));

        let delay_large = calculate_backoff_delay(100);
        assert_eq!(delay_large, Duration::from_secs(60));

        let mut prev_delay = Duration::from_secs(0);
        for retry in 0..10 {
            let delay = calculate_backoff_delay(retry);
            if delay < Duration::from_secs(60) {
                assert!(delay > prev_delay, "Retry {retry}: delay should increase");
            } else {
                assert_eq!(
                    delay,
                    Duration::from_secs(60),
                    "Retry {retry}: should cap at 60s"
                );
            }
            prev_delay = delay;
        }
    }

    #[test]
    fn test_disabled_reason_from_health_failures() {
        // No failures: no reason.
        let empty_result = DisabledReason::from_health_failures(vec![]);
        assert!(empty_result.is_none());

        // One failure maps to the matching single variant.
        let single_failure = vec![HealthCheckFailure::RpcValidationFailed(
            "RPC down".to_string(),
        )];
        let single_result = DisabledReason::from_health_failures(single_failure);
        assert!(single_result.is_some());
        match single_result.unwrap() {
            DisabledReason::RpcValidationFailed(msg) => {
                assert_eq!(msg, "RPC down");
            }
            _ => panic!("Expected RpcValidationFailed variant"),
        }

        // Several failures are wrapped in `Multiple`, preserving order.
        let multiple_failures = vec![
            HealthCheckFailure::RpcValidationFailed("RPC error".to_string()),
            HealthCheckFailure::NonceSyncFailed("Nonce error".to_string()),
        ];
        let multiple_result = DisabledReason::from_health_failures(multiple_failures);
        assert!(multiple_result.is_some());
        match multiple_result.unwrap() {
            DisabledReason::Multiple(reasons) => {
                assert_eq!(reasons.len(), 2);
                assert!(matches!(reasons[0], DisabledReason::RpcValidationFailed(_)));
                assert!(matches!(reasons[1], DisabledReason::NonceSyncFailed(_)));
            }
            _ => panic!("Expected Multiple variant"),
        }
    }

    #[test]
    fn test_relayer_health_check_retry_count_increments() {
        let retry_counts = [0, 1, 2, 5, 10];

        for retry_count in retry_counts {
            let health_check =
                RelayerHealthCheck::with_retry_count("test-relayer".to_string(), retry_count);
            assert_eq!(health_check.retry_count, retry_count);

            let next_health_check =
                RelayerHealthCheck::with_retry_count("test-relayer".to_string(), retry_count + 1);
            assert_eq!(next_health_check.retry_count, retry_count + 1);

            let current_delay = calculate_backoff_delay(retry_count);
            let next_delay = calculate_backoff_delay(retry_count + 1);

            if current_delay < Duration::from_secs(60) {
                assert!(next_delay >= current_delay);
            } else {
                assert_eq!(next_delay, Duration::from_secs(60));
            }
        }
    }

    #[tokio::test]
    async fn test_repository_enable_disable_operations() {
        let repo = RelayerRepositoryStorage::new_in_memory();

        repo.create(create_enabled_relayer("test-enable-disable"))
            .await
            .unwrap();

        // Disable and verify the returned model.
        let reason = DisabledReason::RpcValidationFailed("Test error".to_string());
        let disabled = repo
            .disable_relayer("test-enable-disable".to_string(), reason.clone())
            .await
            .unwrap();

        assert!(disabled.system_disabled);
        assert_eq!(disabled.disabled_reason, Some(reason));

        // Re-enable and verify the returned model.
        let enabled = repo
            .enable_relayer("test-enable-disable".to_string())
            .await
            .unwrap();

        assert!(!enabled.system_disabled);
        assert!(enabled.disabled_reason.is_none());

        // The stored state must match what `enable_relayer` returned.
        let retrieved = repo
            .get_by_id("test-enable-disable".to_string())
            .await
            .unwrap();
        assert!(!retrieved.system_disabled);
        assert!(retrieved.disabled_reason.is_none());
    }

    #[test]
    fn test_disabled_reason_safe_description() {
        let reasons = [
            DisabledReason::NonceSyncFailed("Error with API key abc123".to_string()),
            DisabledReason::RpcValidationFailed(
                "RPC error: http://secret-rpc.com:8545".to_string(),
            ),
            DisabledReason::BalanceCheckFailed("Balance: 1.5 ETH at address 0x123...".to_string()),
        ];

        for reason in reasons {
            let safe_desc = reason.safe_description();

            // None of the detail carried in the original messages may appear.
            assert!(!safe_desc.contains("abc123"));
            assert!(!safe_desc.contains("http://"));
            assert!(!safe_desc.contains("0x123"));
            assert!(!safe_desc.contains("1.5 ETH"));

            assert!(!safe_desc.is_empty());
        }

        let multiple = DisabledReason::Multiple(vec![
            DisabledReason::RpcValidationFailed("Secret RPC info".to_string()),
            DisabledReason::NonceSyncFailed("Secret nonce info".to_string()),
        ]);

        let safe_desc = multiple.safe_description();
        assert!(!safe_desc.contains("Secret"));
        assert!(safe_desc.contains("RPC endpoint validation failed"));
        assert!(safe_desc.contains("Nonce synchronization failed"));
    }
}