diff --git a/cmd/flags.go b/cmd/flags.go index e2e82e3d80..1888b70aa3 100644 --- a/cmd/flags.go +++ b/cmd/flags.go @@ -344,6 +344,12 @@ func initCovenantSignerFlags(cmd *cobra.Command, cfg *config.Config) { false, "Fail startup when enabled covenant routes are missing route-level approval trust roots. Request-time validation still enforces exact reserve/network trust-root matches.", ) + cmd.Flags().BoolVar( + &cfg.CovenantSigner.BridgeCovenantFraudDefenseConfirmed, + "covenantSigner.bridgeCovenantFraudDefenseConfirmed", + false, + "Set only after confirming the tBTC Bridge covenant fraud-defense path is deployed. Until set, the covenant signer fails closed and refuses to sign, because a covenant signature would otherwise expose the wallet to an undefeatable fraud challenge.", + ) } // Initialize flags for Maintainer configuration. diff --git a/cmd/flags_test.go b/cmd/flags_test.go index 29ccddd53a..0a248bbe25 100644 --- a/cmd/flags_test.go +++ b/cmd/flags_test.go @@ -226,6 +226,13 @@ var cmdFlagsTests = map[string]struct { expectedValueFromFlag: true, defaultValue: false, }, + "covenantSigner.bridgeCovenantFraudDefenseConfirmed": { + readValueFunc: func(c *config.Config) interface{} { return c.CovenantSigner.BridgeCovenantFraudDefenseConfirmed }, + flagName: "--covenantSigner.bridgeCovenantFraudDefenseConfirmed", + flagValue: "", + expectedValueFromFlag: true, + defaultValue: false, + }, "tbtc.preParamsPoolSize": { readValueFunc: func(c *config.Config) interface{} { return c.Tbtc.PreParamsPoolSize }, flagName: "--tbtc.preParamsPoolSize", diff --git a/cmd/start.go b/cmd/start.go index 71d8d2fbdf..b07860fe3b 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -100,6 +100,7 @@ type startDeps struct { clientInfoRegistry *clientinfo.Registry, perfMetrics *clientinfo.PerformanceMetrics, minActiveOutpointConfirmations uint, + bridgeCovenantFraudDefenseConfirmed bool, ) (covenantsigner.Engine, error) initializeSigner func( ctx context.Context, @@ -254,6 +255,7 @@ func 
startWithDeps(cmd *cobra.Command, deps startDeps) error { clientInfoRegistry, perfMetrics, // Pass the existing performance metrics instance to avoid duplicate registrations clientConfig.CovenantSigner.MinActiveOutpointConfirmations, + clientConfig.CovenantSigner.BridgeCovenantFraudDefenseConfirmed, ) if err != nil { return fmt.Errorf("error initializing TBTC: [%v]", err) diff --git a/pkg/chain/ethereum/tbtc.go b/pkg/chain/ethereum/tbtc.go index 4ef78e8600..54e7bd4c3e 100644 --- a/pkg/chain/ethereum/tbtc.go +++ b/pkg/chain/ethereum/tbtc.go @@ -1251,6 +1251,32 @@ func (tc *TbtcChain) PastDepositRevealedEvents( return convertedEvents, err } +// convertPastRedemptionRequestedEvent maps a single on-chain RedemptionRequested +// event to its off-chain tbtc.RedemptionRequestedEvent representation. The event +// carries two distinct fee fields, TreasuryFee and TxMaxFee, that must be mapped +// to their matching destination fields independently; keeping the mapping in a +// standalone helper lets that field-by-field correspondence be unit tested. 
+func convertPastRedemptionRequestedEvent( + event *tbtcabi.BridgeRedemptionRequested, +) (*tbtc.RedemptionRequestedEvent, error) { + redeemerOutputScript, err := bitcoin.NewScriptFromVarLenData( + event.RedeemerOutputScript, + ) + if err != nil { + return nil, err + } + + return &tbtc.RedemptionRequestedEvent{ + WalletPublicKeyHash: event.WalletPubKeyHash, + RedeemerOutputScript: redeemerOutputScript, + Redeemer: chain.Address(event.Redeemer.Hex()), + RequestedAmount: event.RequestedAmount, + TreasuryFee: event.TreasuryFee, + TxMaxFee: event.TxMaxFee, + BlockNumber: event.Raw.BlockNumber, + }, nil +} + func (tc *TbtcChain) PastRedemptionRequestedEvents( filter *tbtc.RedemptionRequestedEventFilter, ) ([]*tbtc.RedemptionRequestedEvent, error) { @@ -1282,23 +1308,11 @@ func (tc *TbtcChain) PastRedemptionRequestedEvents( convertedEvents := make([]*tbtc.RedemptionRequestedEvent, 0) for _, event := range events { - redeemerOutputScript, err := bitcoin.NewScriptFromVarLenData( - event.RedeemerOutputScript, - ) + convertedEvent, err := convertPastRedemptionRequestedEvent(event) if err != nil { return nil, err } - convertedEvent := &tbtc.RedemptionRequestedEvent{ - WalletPublicKeyHash: event.WalletPubKeyHash, - RedeemerOutputScript: redeemerOutputScript, - Redeemer: chain.Address(event.Redeemer.Hex()), - RequestedAmount: event.RequestedAmount, - TreasuryFee: event.TreasuryFee, - TxMaxFee: event.TreasuryFee, - BlockNumber: event.Raw.BlockNumber, - } - convertedEvents = append(convertedEvents, convertedEvent) } diff --git a/pkg/chain/ethereum/tbtc_test.go b/pkg/chain/ethereum/tbtc_test.go index 7a9312e9c7..8a6bbfa406 100644 --- a/pkg/chain/ethereum/tbtc_test.go +++ b/pkg/chain/ethereum/tbtc_test.go @@ -15,6 +15,7 @@ import ( "github.com/keep-network/keep-core/pkg/chain" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" commonEthereum "github.com/keep-network/keep-common/pkg/chain/ethereum" 
"github.com/keep-network/keep-core/internal/testutils" @@ -24,6 +25,91 @@ import ( tbtcpkg "github.com/keep-network/keep-core/pkg/tbtc" ) +func TestConvertPastRedemptionRequestedEvent(t *testing.T) { + // RedeemerOutputScript is stored on-chain as variable-length data (a + // CompactSizeUint length prefix followed by the script bytes), so build the + // fixture from a known script to guarantee the helper can parse it back. + redeemerOutputScript := bitcoin.Script([]byte{0x76, 0xa9, 0x14}) + redeemerOutputScriptVarLen, err := redeemerOutputScript.ToVarLenData() + if err != nil { + t.Fatal(err) + } + + walletPublicKeyHash := [20]byte{1, 2, 3} + redeemer := common.HexToAddress("0x1111111111111111111111111111111111111111") + + // The treasury fee and the per-transaction max fee are two independent + // on-chain fields. They are set to distinct values so that a mapping which + // reads the wrong source (e.g. populating TxMaxFee from TreasuryFee) is + // caught rather than masked by equal values. + const ( + requestedAmount = uint64(1_000_000) + treasuryFee = uint64(2_000) + txMaxFee = uint64(7_500) + blockNumber = uint64(123_456) + ) + + event := &tbtcabi.BridgeRedemptionRequested{ + WalletPubKeyHash: walletPublicKeyHash, + RedeemerOutputScript: redeemerOutputScriptVarLen, + Redeemer: redeemer, + RequestedAmount: requestedAmount, + TreasuryFee: treasuryFee, + TxMaxFee: txMaxFee, + Raw: types.Log{BlockNumber: blockNumber}, + } + + converted, err := convertPastRedemptionRequestedEvent(event) + if err != nil { + t.Fatalf("unexpected error: [%v]", err) + } + + testutils.AssertUintsEqual( + t, + "requested amount", + requestedAmount, + converted.RequestedAmount, + ) + + // Both fee fields must be populated from their own distinct source field. 
+ testutils.AssertUintsEqual( + t, + "treasury fee", + treasuryFee, + converted.TreasuryFee, + ) + testutils.AssertUintsEqual( + t, + "tx max fee", + txMaxFee, + converted.TxMaxFee, + ) + + testutils.AssertUintsEqual( + t, + "block number", + blockNumber, + converted.BlockNumber, + ) + + testutils.AssertBytesEqual(t, redeemerOutputScript, converted.RedeemerOutputScript) + + testutils.AssertStringsEqual( + t, + "redeemer", + chain.Address(redeemer.Hex()).String(), + converted.Redeemer.String(), + ) + + if converted.WalletPublicKeyHash != walletPublicKeyHash { + t.Errorf( + "unexpected wallet public key hash\nexpected: [%x]\nactual: [%x]", + walletPublicKeyHash, + converted.WalletPublicKeyHash, + ) + } +} + func TestComputeOperatorsIDsHash(t *testing.T) { operatorIDs := []chain.OperatorID{ 5, 1, 55, 45435534, 33, 345, 23, 235, 3333, 2, diff --git a/pkg/covenantsigner/config.go b/pkg/covenantsigner/config.go index e11f7fec0a..f8d470f94b 100644 --- a/pkg/covenantsigner/config.go +++ b/pkg/covenantsigner/config.go @@ -37,6 +37,14 @@ type Config struct { // covenant signer accepts it. When zero (unset), the system defaults to 6 // to align with the deposit sweep finality threshold. MinActiveOutpointConfirmations uint `mapstructure:"minActiveOutpointConfirmations"` + // BridgeCovenantFraudDefenseConfirmed must be set only when the operator has + // confirmed that the tBTC Bridge recognizes covenant active UTXO spends as + // honest spends in Fraud.defeatFraudChallenge (the covenant fraud-defense + // path is deployed). Until then the covenant signer fails closed and refuses + // to produce signatures, because a covenant signature over a covenant active + // UTXO would otherwise be a valid, undefeatable tBTC fraud proof that could + // slash the signing wallet. + BridgeCovenantFraudDefenseConfirmed bool `mapstructure:"bridgeCovenantFraudDefenseConfirmed"` // DataDir is the base directory path used by the disk persistence handle. 
// When set, the store acquires an exclusive file lock to prevent concurrent // process corruption. When empty, file locking is skipped. diff --git a/pkg/covenantsigner/server.go b/pkg/covenantsigner/server.go index 6cee0883b5..e76f735fbf 100644 --- a/pkg/covenantsigner/server.go +++ b/pkg/covenantsigner/server.go @@ -47,7 +47,9 @@ func Initialize( listenAddress = DefaultListenAddress } - if !isLoopbackListenAddress(listenAddress) && strings.TrimSpace(config.AuthToken) == "" { + isLoopback := isLoopbackListenAddress(listenAddress) + + if !isLoopback && strings.TrimSpace(config.AuthToken) == "" { return nil, false, fmt.Errorf( "covenant signer authToken is required for non-loopback listenAddress [%s]", listenAddress, @@ -66,7 +68,12 @@ func Initialize( if err != nil { return nil, false, err } - if err := validateRequiredApprovalTrustRoots(config, service); err != nil { + // A non-loopback (production) listen address is treated as requiring the + // full multi-party approval model: the signer approval verifier and the + // route trust roots must be configured, mirroring the non-loopback authToken + // requirement above. Loopback deployments may still run with warnings unless + // requireApprovalTrustRoots is set explicitly. 
+ if err := validateRequiredApprovalTrustRoots(config, service, !isLoopback); err != nil { return nil, false, err } if service.signerApprovalVerifier == nil { @@ -181,8 +188,9 @@ func Initialize( func validateRequiredApprovalTrustRoots( config Config, service *Service, + requireForNonLoopbackListenAddress bool, ) error { - if !config.RequireApprovalTrustRoots { + if !config.RequireApprovalTrustRoots && !requireForNonLoopbackListenAddress { return nil } @@ -192,7 +200,7 @@ func validateRequiredApprovalTrustRoots( TemplateSelfV1, ) { return fmt.Errorf( - "covenant signer self_v1 routes require depositorTrustRoots when covenantSigner.requireApprovalTrustRoots=true", + "covenant signer self_v1 routes require depositorTrustRoots when covenantSigner.requireApprovalTrustRoots=true or for a non-loopback listen address", ) } @@ -201,7 +209,7 @@ func validateRequiredApprovalTrustRoots( TemplateQcV1, ) { return fmt.Errorf( - "covenant signer qc_v1 routes require depositorTrustRoots when covenantSigner.requireApprovalTrustRoots=true", + "covenant signer qc_v1 routes require depositorTrustRoots when covenantSigner.requireApprovalTrustRoots=true or for a non-loopback listen address", ) } @@ -210,13 +218,13 @@ func validateRequiredApprovalTrustRoots( TemplateQcV1, ) { return fmt.Errorf( - "covenant signer qc_v1 routes require custodianTrustRoots when covenantSigner.requireApprovalTrustRoots=true", + "covenant signer qc_v1 routes require custodianTrustRoots when covenantSigner.requireApprovalTrustRoots=true or for a non-loopback listen address", ) } if service.signerApprovalVerifier == nil { return fmt.Errorf( - "covenant signer requires a signerApprovalVerifier when covenantSigner.requireApprovalTrustRoots=true", + "covenant signer requires a signerApprovalVerifier when covenantSigner.requireApprovalTrustRoots=true or for a non-loopback listen address", ) } diff --git a/pkg/covenantsigner/server_test.go b/pkg/covenantsigner/server_test.go index a81d4c853d..eac02edf06 100644 --- 
a/pkg/covenantsigner/server_test.go +++ b/pkg/covenantsigner/server_test.go @@ -415,6 +415,67 @@ func TestInitializeRequiresSignerApprovalVerifierWhenConfigured(t *testing.T) { } } +func TestInitializeRequiresTrustRootsForNonLoopbackListenAddress(t *testing.T) { + handle := newMemoryHandle() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // A non-loopback (production) listen address must fail startup when the + // required approval trust roots are missing, even without + // RequireApprovalTrustRoots being set explicitly. The engine provides a + // verifier, so the failure is attributable to the missing trust roots. + _, enabled, err := Initialize( + ctx, + Config{ + Port: availableLoopbackPort(t), + ListenAddress: "0.0.0.0", + AuthToken: "test-token", + }, + handle, + &scriptedVerifierEngine{}, + ) + if err == nil || enabled { + t.Fatalf("expected non-loopback startup without trust roots to fail, got enabled=%v err=%v", enabled, err) + } + if !strings.Contains(err.Error(), "non-loopback listen address") { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestInitializeRequiresSignerApprovalVerifierForNonLoopbackListenAddress(t *testing.T) { + handle := newMemoryHandle() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // A non-loopback (production) listen address with all trust roots present + // but no signer approval verifier must fail startup. 
+ _, enabled, err := Initialize( + ctx, + Config{ + Port: availableLoopbackPort(t), + ListenAddress: "0.0.0.0", + AuthToken: "test-token", + DepositorTrustRoots: []DepositorTrustRoot{ + testDepositorTrustRoot(TemplateQcV1), + }, + CustodianTrustRoots: []CustodianTrustRoot{ + testCustodianTrustRoot(TemplateQcV1), + }, + }, + handle, + &scriptedEngine{}, + ) + if err == nil || enabled { + t.Fatalf("expected non-loopback startup without a verifier to fail, got enabled=%v err=%v", enabled, err) + } + if !strings.Contains(err.Error(), "requires a signerApprovalVerifier") { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), "non-loopback listen address") { + t.Fatalf("unexpected error: %v", err) + } +} + func TestIsLoopbackListenAddressAcceptsBracketedIPv6Loopback(t *testing.T) { if !isLoopbackListenAddress("[::1]") { t.Fatal("expected bracketed IPv6 loopback address to be recognized") diff --git a/pkg/covenantsigner/store.go b/pkg/covenantsigner/store.go index 287614ca76..289b1e0c80 100644 --- a/pkg/covenantsigner/store.go +++ b/pkg/covenantsigner/store.go @@ -13,9 +13,16 @@ import ( ) const ( - jobsDirectory = "covenant-signer/jobs" - poisonedDirectory = "covenant-signer/poisoned" - lockFileName = ".lock" + // jobsDirectory is a single-level persistence directory name. It must not + // contain a path separator: the disk persistence handle creates and + // enumerates only one directory level, so a nested name is skipped on + // reload (its descriptor directory is reported as the first-level parent). + // legacyJobsDirectory is the previously used nested name; job files + // persisted under it are migrated to jobsDirectory on startup. 
+ jobsDirectory = "covenant-signer-jobs" + legacyJobsDirectory = "covenant-signer/jobs" + poisonedDirectory = "covenant-signer/poisoned" + lockFileName = ".lock" ) type Store struct { @@ -44,6 +51,17 @@ func NewStore(handle persistence.BasicHandle, dataDir string) (*Store, error) { return nil, err } store.lockFile = lockFile + + if err := migrateLegacyJobsDirectory(dataDir); err != nil { + // Release the lock if migration fails after successful acquisition. + if closeErr := store.Close(); closeErr != nil { + logger.Warnf( + "failed to release store lock after migration failure: [%v]", + closeErr, + ) + } + return nil, err + } } if err := store.load(); err != nil { @@ -106,6 +124,64 @@ func acquireFileLock(dataDir string) (*os.File, error) { return lockFile, nil } +// migrateLegacyJobsDirectory moves persisted job files from the previously used +// nested legacyJobsDirectory into the flat jobsDirectory. The nested directory +// could not be reliably reloaded by the single-level disk persistence handle, +// so any job files an operator managed to persist under it would otherwise be +// silently skipped on startup. Migration is best-effort per file and +// idempotent: files already present in the destination are left untouched, and +// a missing legacy directory is not an error. +func migrateLegacyJobsDirectory(dataDir string) error { + legacyDir := filepath.Join(dataDir, legacyJobsDirectory) + + entries, err := os.ReadDir(legacyDir) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf( + "cannot read legacy covenant signer jobs directory [%s]: %w", + legacyDir, + err, + ) + } + + newDir := filepath.Join(dataDir, jobsDirectory) + if err := os.MkdirAll(newDir, 0700); err != nil { + return fmt.Errorf( + "cannot create covenant signer jobs directory [%s]: %w", + newDir, + err, + ) + } + + for _, entry := range entries { + // Only migrate persisted job files; skip subdirectories and the lock + // file left behind under the legacy path. 
+ if entry.IsDir() || filepath.Ext(entry.Name()) != ".json" { + continue + } + + newPath := filepath.Join(newDir, entry.Name()) + // Do not clobber a file already present in the destination. + if _, err := os.Stat(newPath); err == nil { + continue + } + + oldPath := filepath.Join(legacyDir, entry.Name()) + if err := os.Rename(oldPath, newPath); err != nil { + return fmt.Errorf( + "cannot migrate covenant signer job file [%s] to [%s]: %w", + oldPath, + newPath, + err, + ) + } + } + + return nil +} + // Close releases the exclusive file lock and closes the underlying lock file // descriptor. For stores created without a dataDir (in-memory handles), Close // is a safe no-op. Close is idempotent. @@ -184,6 +260,13 @@ func (s *Store) load() error { continue } + // Skip non-job files that share the jobs directory, such as the + // store lock file. Persisted jobs are always saved with a .json + // extension. + if filepath.Ext(descriptor.Name()) != ".json" { + continue + } + content, err := descriptor.Content() if err != nil { return fmt.Errorf( diff --git a/pkg/covenantsigner/store_disk_test.go b/pkg/covenantsigner/store_disk_test.go new file mode 100644 index 0000000000..01a7a474d2 --- /dev/null +++ b/pkg/covenantsigner/store_disk_test.go @@ -0,0 +1,143 @@ +package covenantsigner + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + + "github.com/keep-network/keep-common/pkg/persistence" +) + +func newDiskTestJob(suffix string) *Job { + return &Job{ + RequestID: "kcs_self_" + suffix, + RouteRequestID: "ors_" + suffix, + Route: TemplateSelfV1, + IdempotencyKey: "idem_" + suffix, + FacadeRequestID: "rf_" + suffix, + RequestDigest: "0xdeadbeef", + State: JobStatePending, + Detail: "queued", + CreatedAt: "2026-03-09T00:00:00Z", + UpdatedAt: "2026-03-09T00:00:00Z", + Request: baseRequest(TemplateSelfV1), + } +} + +// TestStoreReloadPreservesJobsOnDisk exercises the store against a real +// disk-backed persistence handle (not the in-memory handle used 
elsewhere). +// The disk handle creates and enumerates only a single directory level, so a +// nested jobs directory name would make persisted jobs unrecoverable after a +// restart. This test persists a job, restarts the store over the same data +// directory, and asserts both the request ID and route request indexes are +// restored. +func TestStoreReloadPreservesJobsOnDisk(t *testing.T) { + dataDir := t.TempDir() + + handle, err := persistence.NewBasicDiskHandle(dataDir) + if err != nil { + t.Fatal(err) + } + store, err := NewStore(handle, dataDir) + if err != nil { + t.Fatal(err) + } + + job := newDiskTestJob("disk") + if err := store.Put(job); err != nil { + t.Fatal(err) + } + if err := store.Close(); err != nil { + t.Fatal(err) + } + + // Restart the store over the same data directory. + reloadHandle, err := persistence.NewBasicDiskHandle(dataDir) + if err != nil { + t.Fatal(err) + } + reloaded, err := NewStore(reloadHandle, dataDir) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { reloaded.Close() }) + + // The route request index must be restored. + byRoute, ok, err := reloaded.GetByRouteRequest(TemplateSelfV1, job.RouteRequestID) + if err != nil { + t.Fatal(err) + } + if !ok { + t.Fatal("expected the route request index to be restored after restart") + } + if byRoute.RequestID != job.RequestID { + t.Fatalf("unexpected request ID from route index: %s", byRoute.RequestID) + } + + // The request ID index must be restored. + byID, ok, err := reloaded.GetByRequestID(job.RequestID) + if err != nil { + t.Fatal(err) + } + if !ok { + t.Fatal("expected the request ID index to be restored after restart") + } + if byID.RouteRequestID != job.RouteRequestID { + t.Fatalf("unexpected route request ID from request index: %s", byID.RouteRequestID) + } +} + +// TestStoreMigratesLegacyNestedJobs asserts that a job file left under the +// previously used nested directory is migrated to the flat jobs directory on +// startup and becomes reloadable. 
+func TestStoreMigratesLegacyNestedJobs(t *testing.T) { + dataDir := t.TempDir() + + // Simulate a job persisted under the legacy nested directory. + legacyDir := filepath.Join(dataDir, legacyJobsDirectory) + if err := os.MkdirAll(legacyDir, 0700); err != nil { + t.Fatal(err) + } + job := newDiskTestJob("legacy") + payload, err := json.Marshal(job) + if err != nil { + t.Fatal(err) + } + legacyFile := filepath.Join(legacyDir, job.RequestID+".json") + if err := os.WriteFile(legacyFile, payload, 0600); err != nil { + t.Fatal(err) + } + + handle, err := persistence.NewBasicDiskHandle(dataDir) + if err != nil { + t.Fatal(err) + } + store, err := NewStore(handle, dataDir) // triggers migration and load + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { store.Close() }) + + // The migrated job must be restored in both indexes. + if _, ok, err := store.GetByRequestID(job.RequestID); err != nil { + t.Fatal(err) + } else if !ok { + t.Fatal("expected the legacy job to be migrated and restored") + } + if _, ok, err := store.GetByRouteRequest(TemplateSelfV1, job.RouteRequestID); err != nil { + t.Fatal(err) + } else if !ok { + t.Fatal("expected the legacy route request index to be restored") + } + + // The file must now live under the flat jobs directory. + newFile := filepath.Join(dataDir, jobsDirectory, job.RequestID+".json") + if _, err := os.Stat(newFile); err != nil { + t.Fatalf("expected migrated file at %s: %v", newFile, err) + } + // The legacy file must no longer be present. 
+ if _, err := os.Stat(legacyFile); !os.IsNotExist(err) { + t.Fatalf("expected legacy file at %s to be moved away", legacyFile) + } +} diff --git a/pkg/maintainer/spv/bitcoin_chain_test.go b/pkg/maintainer/spv/bitcoin_chain_test.go index 2f790bf11f..fc11f9d122 100644 --- a/pkg/maintainer/spv/bitcoin_chain_test.go +++ b/pkg/maintainer/spv/bitcoin_chain_test.go @@ -151,7 +151,34 @@ func (lbc *localBitcoinChain) GetTransactionsForPublicKeyHash( func (lbc *localBitcoinChain) GetTxHashesForPublicKeyHash( publicKeyHash [20]byte, ) ([]bitcoin.Hash, error) { - panic("unsupported") + lbc.mutex.Lock() + defer lbc.mutex.Unlock() + + p2pkh, err := bitcoin.PayToPublicKeyHash(publicKeyHash) + if err != nil { + return nil, err + } + + p2wpkh, err := bitcoin.PayToWitnessPublicKeyHash(publicKeyHash) + if err != nil { + return nil, err + } + + // Return matching transaction hashes in insertion order, which the tests + // use as ascending block height order, mirroring GetTransactionsForPublicKeyHash. + matchingHashes := make([]bitcoin.Hash, 0) + + for _, transaction := range lbc.transactions { + for _, output := range transaction.Outputs { + script := output.PublicKeyScript + if bytes.Equal(script, p2pkh) || bytes.Equal(script, p2wpkh) { + matchingHashes = append(matchingHashes, transaction.Hash()) + break + } + } + } + + return matchingHashes, nil } func (lbc *localBitcoinChain) GetMempoolForPublicKeyHash(publicKeyHash [20]byte) ( diff --git a/pkg/maintainer/spv/deposit_sweep.go b/pkg/maintainer/spv/deposit_sweep.go index 2b0b8a5f77..530a006ec5 100644 --- a/pkg/maintainer/spv/deposit_sweep.go +++ b/pkg/maintainer/spv/deposit_sweep.go @@ -311,40 +311,35 @@ func getUnprovenDepositSweepTransactions( continue } - walletTransactions, err := btcChain.GetTransactionsForPublicKeyHash( + walletUnprovenTransactions, err := getUnprovenWalletTransactions( walletPublicKeyHash, transactionLimit, - ) - if err != nil { - return nil, fmt.Errorf( - "failed to get transactions for wallet: [%v]", - 
err, - ) - } - - for _, transaction := range walletTransactions { - isUnproven, err := - isUnprovenDepositSweepTransaction( + btcChain, + func(transaction *bitcoin.Transaction) (bool, error) { + isUnproven, err := isUnprovenDepositSweepTransaction( transaction, walletPublicKeyHash, btcChain, spvChain, ) - if err != nil { - return nil, fmt.Errorf( - "failed to check if transaction is an unproven deposit sweep "+ - "transaction: [%v]", - err, - ) - } - - if isUnproven { - unprovenDepositSweepTransactions = append( - unprovenDepositSweepTransactions, - transaction, - ) - } + if err != nil { + return false, fmt.Errorf( + "failed to check if transaction is an unproven deposit "+ + "sweep transaction: [%v]", + err, + ) + } + return isUnproven, nil + }, + ) + if err != nil { + return nil, err } + + unprovenDepositSweepTransactions = append( + unprovenDepositSweepTransactions, + walletUnprovenTransactions..., + ) } return unprovenDepositSweepTransactions, nil diff --git a/pkg/maintainer/spv/moved_funds_sweep.go b/pkg/maintainer/spv/moved_funds_sweep.go index 417a1f5347..b6f7b05f74 100644 --- a/pkg/maintainer/spv/moved_funds_sweep.go +++ b/pkg/maintainer/spv/moved_funds_sweep.go @@ -210,46 +210,40 @@ func getUnprovenMovedFundsSweepTransactions( // When wallet makes a moved funds sweep transaction, it transfers // funds to itself. Therefore we can search all the transactions that - // pay to the wallet's public key hash. - walletTransactions, err := btcChain.GetTransactionsForPublicKeyHash( + // pay to the wallet's public key hash. A wallet can have only one + // unproven moved funds sweep transaction at a time, so the scan is + // bounded to a single match: it stops at the first unproven moved funds + // sweep transaction it finds, scanning past any unrelated (e.g. spam) + // transactions along the way. 
+ walletUnprovenTransactions, err := getUnprovenWalletTransactions( walletPublicKeyHash, - transactionLimit, - ) - if err != nil { - return nil, fmt.Errorf( - "failed to get transactions for wallet: [%v]", - err, - ) - } - - for _, transaction := range walletTransactions { - isUnproven, err := - isUnprovenMovedFundsSweepTransaction( + 1, + btcChain, + func(transaction *bitcoin.Transaction) (bool, error) { + isUnproven, err := isUnprovenMovedFundsSweepTransaction( transaction, walletPublicKeyHash, btcChain, spvChain, ) - if err != nil { - return nil, fmt.Errorf( - "failed to check if transaction is an unproven moved "+ - "funds sweep transaction: [%v]", - err, - ) - } - - if isUnproven { - unprovenMovedFundsSweepTransactions = append( - unprovenMovedFundsSweepTransactions, - transaction, - ) - - // A wallet can have only one unproven moved funds sweep - // transaction at a time. If we found such transaction, we don't - // have to look at this wallet's transactions anymore. - break - } + if err != nil { + return false, fmt.Errorf( + "failed to check if transaction is an unproven moved "+ + "funds sweep transaction: [%v]", + err, + ) + } + return isUnproven, nil + }, + ) + if err != nil { + return nil, err } + + unprovenMovedFundsSweepTransactions = append( + unprovenMovedFundsSweepTransactions, + walletUnprovenTransactions..., + ) } return unprovenMovedFundsSweepTransactions, nil diff --git a/pkg/maintainer/spv/moving_funds.go b/pkg/maintainer/spv/moving_funds.go index 81d1e13e51..ebc4a96a46 100644 --- a/pkg/maintainer/spv/moving_funds.go +++ b/pkg/maintainer/spv/moving_funds.go @@ -197,41 +197,36 @@ func getUnprovenMovingFundsTransactions( // source wallet. 
targetWalletPublicKeyHash := targetWallets[0] - walletTransactions, err := btcChain.GetTransactionsForPublicKeyHash( + walletUnprovenTransactions, err := getUnprovenWalletTransactions( targetWalletPublicKeyHash, transactionLimit, - ) - if err != nil { - return nil, fmt.Errorf( - "failed to get transactions for wallet: [%v]", - err, - ) - } - - for _, transaction := range walletTransactions { - isUnproven, err := - isUnprovenMovingFundsTransaction( + btcChain, + func(transaction *bitcoin.Transaction) (bool, error) { + isUnproven, err := isUnprovenMovingFundsTransaction( transaction, walletPublicKeyHash, targetWallets, btcChain, spvChain, ) - if err != nil { - return nil, fmt.Errorf( - "failed to check if transaction is an unproven moving funds "+ - "transaction: [%v]", - err, - ) - } - - if isUnproven { - unprovenMovingFundsTransactions = append( - unprovenMovingFundsTransactions, - transaction, - ) - } + if err != nil { + return false, fmt.Errorf( + "failed to check if transaction is an unproven moving "+ + "funds transaction: [%v]", + err, + ) + } + return isUnproven, nil + }, + ) + if err != nil { + return nil, err } + + unprovenMovingFundsTransactions = append( + unprovenMovingFundsTransactions, + walletUnprovenTransactions..., + ) } return unprovenMovingFundsTransactions, nil diff --git a/pkg/maintainer/spv/redemptions.go b/pkg/maintainer/spv/redemptions.go index e504860f81..5d692377a7 100644 --- a/pkg/maintainer/spv/redemptions.go +++ b/pkg/maintainer/spv/redemptions.go @@ -221,40 +221,35 @@ func getUnprovenRedemptionTransactions( continue } - walletTransactions, err := btcChain.GetTransactionsForPublicKeyHash( + walletUnprovenTransactions, err := getUnprovenWalletTransactions( walletPublicKeyHash, transactionLimit, - ) - if err != nil { - return nil, fmt.Errorf( - "failed to get transactions for wallet: [%v]", - err, - ) - } - - for _, transaction := range walletTransactions { - isUnproven, err := - isUnprovenRedemptionTransaction( + btcChain, + 
func(transaction *bitcoin.Transaction) (bool, error) { + isUnproven, err := isUnprovenRedemptionTransaction( transaction, walletPublicKeyHash, btcChain, spvChain, ) - if err != nil { - return nil, fmt.Errorf( - "failed to check if transaction is an unproven redemption "+ - "transaction: [%v]", - err, - ) - } - - if isUnproven { - unprovenRedemptionTransactions = append( - unprovenRedemptionTransactions, - transaction, - ) - } + if err != nil { + return false, fmt.Errorf( + "failed to check if transaction is an unproven "+ + "redemption transaction: [%v]", + err, + ) + } + return isUnproven, nil + }, + ) + if err != nil { + return nil, err } + + unprovenRedemptionTransactions = append( + unprovenRedemptionTransactions, + walletUnprovenTransactions..., + ) } return unprovenRedemptionTransactions, nil diff --git a/pkg/maintainer/spv/spv.go b/pkg/maintainer/spv/spv.go index 990d8b0ec0..1a793385ef 100644 --- a/pkg/maintainer/spv/spv.go +++ b/pkg/maintainer/spv/spv.go @@ -260,6 +260,63 @@ func (sm *spvMaintainer) proveTransactions( return nil } +// getUnprovenWalletTransactions scans a wallet's confirmed Bitcoin transaction +// history and returns up to transactionLimit transactions that satisfy the +// given predicate, i.e. unproven protocol transactions. +// +// The history is scanned from the newest transaction to the oldest, and the +// limit bounds the number of matching PROTOCOL transactions returned rather +// than the number of raw address transactions examined. This matters: applying +// the limit to the raw address history (a bounded tail lookup) lets unrelated +// transactions paid to the wallet address, such as dust spam, evict real +// protocol transactions from the window before protocol-level filtering runs, +// which would prevent their proofs from ever being submitted. Scanning past +// such transactions until enough protocol matches are found (or the whole +// history is exhausted) keeps proof discovery robust against that spam. 
+// +// The returned transactions are ordered by block height in ascending order to +// preserve the ordering contract of the address-history lookups this replaces. +func getUnprovenWalletTransactions( + walletPublicKeyHash [20]byte, + transactionLimit int, + btcChain bitcoin.Chain, + isUnproven func(*bitcoin.Transaction) (bool, error), +) ([]*bitcoin.Transaction, error) { + txHashes, err := btcChain.GetTxHashesForPublicKeyHash(walletPublicKeyHash) + if err != nil { + return nil, fmt.Errorf( + "failed to get transaction hashes for wallet: [%v]", + err, + ) + } + + unproven := make([]*bitcoin.Transaction, 0) + + for i := len(txHashes) - 1; i >= 0 && len(unproven) < transactionLimit; i-- { + transaction, err := btcChain.GetTransaction(txHashes[i]) + if err != nil { + return nil, fmt.Errorf("cannot get transaction: [%v]", err) + } + + matches, err := isUnproven(transaction) + if err != nil { + return nil, err + } + + if matches { + unproven = append(unproven, transaction) + } + } + + // The scan collected matches newest-first; reverse to ascending block + // height order to preserve the ordering contract. + for left, right := 0, len(unproven)-1; left < right; left, right = left+1, right-1 { + unproven[left], unproven[right] = unproven[right], unproven[left] + } + + return unproven, nil +} + func isInputCurrentWalletsMainUTXO( fundingTxHash bitcoin.Hash, fundingOutputIndex uint32, diff --git a/pkg/maintainer/spv/spv_test.go b/pkg/maintainer/spv/spv_test.go index 94c2084d11..4372890556 100644 --- a/pkg/maintainer/spv/spv_test.go +++ b/pkg/maintainer/spv/spv_test.go @@ -324,3 +324,89 @@ func TestIsInputCurrentWalletsMainUTXO(t *testing.T) { }) } } + +// TestGetUnprovenWalletTransactions_FindsProtocolTransactionBehindSpam verifies +// that a real protocol transaction is still discovered even when more than +// transactionLimit unrelated (spam) transactions are paid to the same wallet +// address after it. 
This is the shared discovery path used by all SPV scanners, +// so a bounded tail lookup would let the spam evict the protocol transaction. +func TestGetUnprovenWalletTransactions_FindsProtocolTransactionBehindSpam(t *testing.T) { + btcChain := newLocalBitcoinChain() + + walletPublicKeyHash := [20]byte{ + 0x8d, 0xb5, 0x0e, 0xb5, 0x20, 0x63, 0xea, 0x9d, 0x98, 0xb3, + 0xea, 0xc9, 0x14, 0x89, 0xa9, 0x0f, 0x73, 0x89, 0x86, 0xf6, + } + walletScript, err := bitcoin.PayToWitnessPublicKeyHash(walletPublicKeyHash) + if err != nil { + t.Fatal(err) + } + + // buildWalletTx builds a distinct confirmed transaction paying to the wallet + // address. The output count distinguishes the protocol transaction (a single + // output, as the deposit sweep scanner requires) from spam, and the input + // outpoint index keeps each transaction unique. + buildWalletTx := func(index uint32, outputCount int) *bitcoin.Transaction { + outputs := make([]*bitcoin.TransactionOutput, outputCount) + for i := range outputs { + outputs[i] = &bitcoin.TransactionOutput{ + Value: 546, + PublicKeyScript: walletScript, + } + } + return &bitcoin.Transaction{ + Version: 1, + Inputs: []*bitcoin.TransactionInput{ + { + Outpoint: &bitcoin.TransactionOutpoint{ + TransactionHash: bitcoin.Hash{}, + OutputIndex: index, + }, + Sequence: 0xffffffff, + }, + }, + Outputs: outputs, + Locktime: 0, + } + } + + // The protocol transaction is the oldest transaction paying to the wallet. + protocolTransaction := buildWalletTx(0, 1) + if err := btcChain.BroadcastTransaction(protocolTransaction); err != nil { + t.Fatal(err) + } + + // Add many unrelated transactions after the protocol one, well beyond the + // transaction limit, each paying to the same wallet address. 
+ transactionLimit := 5 + spamCount := transactionLimit * 3 + for i := 0; i < spamCount; i++ { + spamTransaction := buildWalletTx(uint32(i+1), 2) + if err := btcChain.BroadcastTransaction(spamTransaction); err != nil { + t.Fatal(err) + } + } + + protocolTransactionHash := protocolTransaction.Hash() + + found, err := getUnprovenWalletTransactions( + walletPublicKeyHash, + transactionLimit, + btcChain, + func(transaction *bitcoin.Transaction) (bool, error) { + return transaction.Hash() == protocolTransactionHash, nil + }, + ) + if err != nil { + t.Fatal(err) + } + + if len(found) != 1 { + t.Fatalf("expected exactly one unproven transaction, got %d", len(found)) + } + if found[0].Hash() != protocolTransactionHash { + t.Fatal( + "expected the protocol transaction to be discovered behind the spam", + ) + } +} diff --git a/pkg/net/retransmission/strategy.go b/pkg/net/retransmission/strategy.go index fd50384fb2..585824addf 100644 --- a/pkg/net/retransmission/strategy.go +++ b/pkg/net/retransmission/strategy.go @@ -1,6 +1,10 @@ package retransmission -import "github.com/keep-network/keep-core/pkg/net" +import ( + "sync" + + "github.com/keep-network/keep-core/pkg/net" +) // Strategy represents a specific retransmission strategy. type Strategy interface { @@ -44,6 +48,10 @@ func (ss *StandardStrategy) Tick(retransmitFn RetransmitFn) error { // ticks, between third and fourth is 4 ticks and so on. Graphically, the // schedule looks as follows: R _ R _ _ R _ _ _ _ R _ _ _ _ _ _ _ _ R type BackoffStrategy struct { + // mutex guards the retransmission counters below. ScheduleRetransmissions + // invokes Tick from a new goroutine on every tick, so overlapping ticks can + // call Tick concurrently on the same strategy instance. + mutex sync.Mutex tickCounter uint64 delay uint64 retransmitTick uint64 @@ -61,12 +69,23 @@ func WithBackoffStrategy() *BackoffStrategy { // Tick implements the Strategy.Tick function. 
func (bos *BackoffStrategy) Tick(retransmitFn RetransmitFn) error { + // Update the retransmission counters under the mutex so that concurrent + // Tick calls do not race on them. The decision is captured in a local and + // retransmitFn is invoked after releasing the lock, preserving the original + // behavior of not holding shared state while the message is retransmitted. + bos.mutex.Lock() + bos.tickCounter++ - if bos.tickCounter == bos.retransmitTick { + shouldRetransmit := bos.tickCounter == bos.retransmitTick + if shouldRetransmit { bos.retransmitTick += bos.delay + 1 bos.delay *= 2 + } + + bos.mutex.Unlock() + if shouldRetransmit { return retransmitFn() } diff --git a/pkg/net/retransmission/strategy_test.go b/pkg/net/retransmission/strategy_test.go index 2c4b261f5f..97f685f905 100644 --- a/pkg/net/retransmission/strategy_test.go +++ b/pkg/net/retransmission/strategy_test.go @@ -2,6 +2,7 @@ package retransmission import ( "reflect" + "sync" "testing" ) @@ -43,6 +44,37 @@ func TestStandardStrategy(t *testing.T) { } } +// TestBackoffStrategy_ConcurrentTick verifies that BackoffStrategy.Tick is safe +// to call concurrently, as ScheduleRetransmissions does by invoking Tick from a +// new goroutine on every tick. It is meant to be run with the race detector +// (go test -race); without the mutex guarding the retransmission counters, the +// concurrent read/modify/write of tickCounter, delay, and retransmitTick is a +// data race. +func TestBackoffStrategy_ConcurrentTick(t *testing.T) { + strategy := WithBackoffStrategy() + + const goroutines = 50 + + // A barrier releases all goroutines at once to maximize the overlap of the + // counter updates inside Tick. 
+ start := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(goroutines) + + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + <-start + _ = strategy.Tick(func() error { + return nil + }) + }() + } + + close(start) + wg.Wait() +} + func TestBackoffStrategy(t *testing.T) { strategy := WithBackoffStrategy() diff --git a/pkg/tbtc/covenant_signer.go b/pkg/tbtc/covenant_signer.go index b4f6454e78..743e7af143 100644 --- a/pkg/tbtc/covenant_signer.go +++ b/pkg/tbtc/covenant_signer.go @@ -23,6 +23,13 @@ import ( type covenantSignerEngine struct { node *node minimumActiveOutpointConfirmations uint + // bridgeFraudDefenseConfirmed records the operator's explicit confirmation + // that the tBTC Bridge recognizes covenant active UTXO spends as honest + // spends, i.e. the covenant fraud-defense path is deployed. Until it is set, + // the engine refuses to produce covenant signatures because a covenant + // SIGHASH_ALL signature over a covenant active UTXO is otherwise a valid, + // undefeatable tBTC fraud proof against the signing wallet. + bridgeFraudDefenseConfirmed bool } // defaultMinActiveOutpointConfirmations is the confirmation threshold applied @@ -55,7 +62,15 @@ type qcV1SignerHandoff struct { // newCovenantSignerEngine creates a covenant signer engine bound to the given // node. When minConfirmations is zero (the Go zero-value produced by an unset // config field), defaultMinActiveOutpointConfirmations is used. -func newCovenantSignerEngine(node *node, minConfirmations uint) covenantsigner.Engine { +// +// bridgeFraudDefenseConfirmed must be set only when the operator has confirmed +// that the tBTC Bridge covenant fraud-defense path is deployed. When false (the +// default), the engine fails closed and refuses to produce covenant signatures. 
+func newCovenantSignerEngine( + node *node, + minConfirmations uint, + bridgeFraudDefenseConfirmed bool, +) covenantsigner.Engine { if minConfirmations == 0 { minConfirmations = defaultMinActiveOutpointConfirmations } @@ -63,6 +78,7 @@ func newCovenantSignerEngine(node *node, minConfirmations uint) covenantsigner.E return &covenantSignerEngine{ node: node, minimumActiveOutpointConfirmations: minConfirmations, + bridgeFraudDefenseConfirmed: bridgeFraudDefenseConfirmed, } } @@ -145,6 +161,23 @@ func (cse *covenantSignerEngine) VerifySignerApproval( return err } + // Fail closed for wallets that are not in a state eligible for covenant + // signing. The signer set hash embedded in a certificate binds only the + // wallet identity, members hash, and threshold, none of which change when a + // wallet is closed or terminated, so a certificate issued while the wallet + // was live would otherwise keep verifying after closure. Rejecting + // non-eligible states here ensures a wallet the closure path intended to + // deauthorize cannot be made to sign a covenant transaction. + if !isCovenantSigningEligibleState(walletChainData.State) { + return covenantsigner.NewInputError( + fmt.Sprintf( + "request.signerApproval.walletPublicKey resolves to a wallet in "+ + "state [%v] that is not eligible for covenant signing", + walletChainData.State, + ), + ) + } + expectedSignerSetHash, err := computeSignerApprovalCertificateSignerSetHash( signerPublicKey, walletChainData, @@ -175,6 +208,16 @@ func (cse *covenantSignerEngine) VerifySignerApproval( return nil } +// isCovenantSigningEligibleState reports whether a wallet in the given state is +// eligible to receive covenant signatures. Covenant migrations are only +// expected for live wallets, so covenant signing fails closed for every other +// state (including closed and terminated wallets that the closure path intends +// to deauthorize). 
If covenant signing must be allowed for another state in the +// future, add it here explicitly together with justification and tests. +func isCovenantSigningEligibleState(state WalletState) bool { + return state == StateLive +} + func (cse *covenantSignerEngine) resolveSignerApprovalTemplatePublicKey( request covenantsigner.RouteSubmitRequest, ) (*ecdsa.PublicKey, error) { @@ -200,6 +243,26 @@ func (cse *covenantSignerEngine) OnSubmit( ctx context.Context, job *covenantsigner.Job, ) (*covenantsigner.Transition, error) { + // Fail closed unless the operator has confirmed the tBTC Bridge covenant + // fraud-defense path is deployed. Producing a covenant SIGHASH_ALL + // signature over a covenant active UTXO exposes the signing wallet to a + // tBTC fraud challenge it cannot defeat, because the Bridge does not + // recognize a covenant active UTXO spend as an honest spend in + // Fraud.defeatFraudChallenge; only a swept deposit, a spent main UTXO, or a + // processed moved-funds sweep can defeat the challenge. The complete + // remediation is the bridge-side covenant fraud-defense path. Until it is + // deployed and confirmed here, refuse to sign so a valid migration + // signature cannot become a slashable wallet signature. 
+ if !cse.bridgeFraudDefenseConfirmed { + return failedTransition( + covenantsigner.ReasonPolicyRejected, + "covenant signing is disabled until the tBTC Bridge covenant "+ + "fraud-defense path is confirmed deployed; a covenant signature "+ + "would otherwise expose the wallet to an undefeatable fraud "+ + "challenge", + ), nil + } + switch job.Route { case covenantsigner.TemplateSelfV1: return cse.submitSelfV1(ctx, job), nil @@ -750,10 +813,24 @@ func (cse *covenantSignerEngine) buildCovenantTransactionBuilder( activeUtxo *bitcoin.UnspentTransactionOutput, witnessScript bitcoin.Script, ) (*bitcoin.TransactionBuilder, error) { - destinationScript, err := decodePrefixedHex(request.MigrationDestination.DepositScript) + destinationDepositScript, err := decodePrefixedHex(request.MigrationDestination.DepositScript) if err != nil { return nil, fmt.Errorf("migration destination deposit script is invalid") } + if len(destinationDepositScript) == 0 { + return nil, fmt.Errorf("migration destination deposit script must not be empty") + } + // MigrationDestination.DepositScript is the plain tBTC deposit script, not a + // ready-made output script. The Bitcoin funding output must pay to its P2WSH + // script hash (OP_0 followed by the 32-byte SHA-256 of the deposit script), which is how the tBTC Bridge + // rebuilds and verifies the funding output in revealDepositWithExtraData. + // Using the plain deposit script directly as the output script would make + // the migration deposit unrevealable to the Bridge.
+ destinationWitnessScriptHash := bitcoin.WitnessScriptHash(destinationDepositScript) + destinationScriptPubKey, err := bitcoin.PayToWitnessScriptHash(destinationWitnessScriptHash) + if err != nil { + return nil, fmt.Errorf("cannot build migration destination locking script: %v", err) + } destinationValue, err := toBitcoinOutputValue( request.MigrationTransactionPlan.DestinationValueSats, "migration destination value", @@ -779,7 +856,7 @@ func (cse *covenantSignerEngine) buildCovenantTransactionBuilder( builder.SetLocktime(request.MigrationTransactionPlan.LockTime) builder.AddOutput(&bitcoin.TransactionOutput{ Value: destinationValue, - PublicKeyScript: destinationScript, + PublicKeyScript: destinationScriptPubKey, }) anchorScript, err := canonicalAnchorScriptPubKey() @@ -794,6 +871,15 @@ func (cse *covenantSignerEngine) buildCovenantTransactionBuilder( return builder, nil } +// signCovenantTransactionInput produces the wallet's tECDSA signature over the +// single covenant input of the migration transaction. +// +// This signature is a normal Bitcoin SIGHASH_ALL signature over a covenant +// active UTXO. A covenant active UTXO is neither a swept deposit, a spent main +// UTXO, nor a processed moved-funds sweep, so the tBTC Bridge cannot defeat a +// fraud challenge that replays this signature. Callers must therefore only +// reach this path once the bridge-side covenant fraud-defense has been +// confirmed deployed; OnSubmit enforces that fail-closed gate. 
func signCovenantTransactionInput( ctx context.Context, signingExecutor *signingExecutor, diff --git a/pkg/tbtc/covenant_signer_test.go b/pkg/tbtc/covenant_signer_test.go index c7794f98ca..8bd9797d91 100644 --- a/pkg/tbtc/covenant_signer_test.go +++ b/pkg/tbtc/covenant_signer_test.go @@ -83,7 +83,7 @@ func TestCovenantSignerEngine_SubmitSelfV1Ready(t *testing.T) { service, err := covenantsigner.NewService( newCovenantSignerMemoryHandle(), - newCovenantSignerEngine(node, 0), + newCovenantSignerEngine(node, 0, true), ) if err != nil { t.Fatal(err) @@ -250,9 +250,21 @@ func TestCovenantSignerEngine_SubmitSelfV1Ready(t *testing.T) { if transaction.Outputs[0].Value != int64(destinationValueSats) { t.Fatalf("unexpected destination value: %d", transaction.Outputs[0].Value) } - if !bytes.Equal(transaction.Outputs[0].PublicKeyScript, destinationScript) { + // The destination output must pay to the P2WSH script hash of the deposit + // script (OP_0 followed by the 32-byte SHA-256 of the deposit script), not to the plain deposit script + // itself; otherwise the migration deposit is unrevealable to the tBTC Bridge.
+ expectedDestinationScript, err := bitcoin.PayToWitnessScriptHash( + bitcoin.WitnessScriptHash(destinationScript), + ) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(transaction.Outputs[0].PublicKeyScript, expectedDestinationScript) { t.Fatal("unexpected destination output script") } + if bytes.Equal(transaction.Outputs[0].PublicKeyScript, destinationScript) { + t.Fatal("destination output must not be the raw deposit script") + } expectedAnchorScript, err := canonicalAnchorScriptPubKey() if err != nil { @@ -318,7 +330,7 @@ func TestCovenantSignerEngine_SubmitQcV1HandoffReady(t *testing.T) { service, err := covenantsigner.NewService( newCovenantSignerMemoryHandle(), - newCovenantSignerEngine(node, 0), + newCovenantSignerEngine(node, 0, true), ) if err != nil { t.Fatal(err) @@ -517,9 +529,21 @@ func TestCovenantSignerEngine_SubmitQcV1HandoffReady(t *testing.T) { if unsignedTransaction.Outputs[0].Value != int64(destinationValueSats) { t.Fatalf("unexpected destination value: %d", unsignedTransaction.Outputs[0].Value) } - if !bytes.Equal(unsignedTransaction.Outputs[0].PublicKeyScript, destinationScript) { + // The destination output must pay to the P2WSH script hash of the deposit + // script (OP_0 followed by the 32-byte SHA-256 of the deposit script), not to the plain deposit script + // itself; otherwise the migration deposit is unrevealable to the tBTC Bridge.
+ expectedDestinationScript, err := bitcoin.PayToWitnessScriptHash( + bitcoin.WitnessScriptHash(destinationScript), + ) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(unsignedTransaction.Outputs[0].PublicKeyScript, expectedDestinationScript) { t.Fatal("unexpected destination output script") } + if bytes.Equal(unsignedTransaction.Outputs[0].PublicKeyScript, destinationScript) { + t.Fatal("destination output must not be the raw deposit script") + } expectedAnchorScript, err := canonicalAnchorScriptPubKey() if err != nil { @@ -605,7 +629,7 @@ func TestCovenantSignerEngine_SubmitQcV1RejectsInvalidBeta(t *testing.T) { service, err := covenantsigner.NewService( newCovenantSignerMemoryHandle(), - newCovenantSignerEngine(node, 0), + newCovenantSignerEngine(node, 0, true), ) if err != nil { t.Fatal(err) @@ -721,7 +745,7 @@ func TestCovenantSignerEngine_SubmitQcV1RejectsScriptHashMismatch(t *testing.T) service, err := covenantsigner.NewService( newCovenantSignerMemoryHandle(), - newCovenantSignerEngine(node, 0), + newCovenantSignerEngine(node, 0, true), ) if err != nil { t.Fatal(err) @@ -867,7 +891,7 @@ func TestCovenantSignerEngine_SubmitSelfV1RejectsZeroMaturityHeight(t *testing.T service, err := covenantsigner.NewService( newCovenantSignerMemoryHandle(), - newCovenantSignerEngine(node, 0), + newCovenantSignerEngine(node, 0, true), ) if err != nil { t.Fatal(err) @@ -1009,6 +1033,150 @@ func TestCovenantSignerEngine_EnsureActiveOutpointFinalityRejectsUnconfirmed(t * } } +func TestNode_InvalidateSigningExecutorEvictsCachedExecutorOnArchival(t *testing.T) { + node, _, walletPublicKey := setupCovenantSignerTestNode(t) + + // setupCovenantSignerTestNode already created and cached a signing executor + // for the wallet. Confirm it is served from the cache. 
+ executor, ok, err := node.getSigningExecutor(walletPublicKey) + if err != nil { + t.Fatal(err) + } + if !ok { + t.Fatal("expected the node to control the wallet before archival") + } + + // Archive the wallet, removing it from the wallet registry, as the wallet + // closure path does. + walletPublicKeyHash := bitcoin.PublicKeyHash(walletPublicKey) + if err := node.walletRegistry.archiveWallet(walletPublicKeyHash); err != nil { + t.Fatal(err) + } + + // Before invalidation, getSigningExecutor still returns the stale cached + // executor even though the wallet is no longer in the registry -- this is + // the behavior the fix must prevent. + staleExecutor, ok, err := node.getSigningExecutor(walletPublicKey) + if err != nil { + t.Fatal(err) + } + if !ok || staleExecutor != executor { + t.Fatal("expected the stale cached executor to still be present before invalidation") + } + + // Invalidating the signing executor (as handleWalletClosure does after + // archival) evicts the cached executor, so the archived wallet is no longer + // signable through it. + if err := node.invalidateSigningExecutor(walletPublicKey); err != nil { + t.Fatal(err) + } + + _, ok, err = node.getSigningExecutor(walletPublicKey) + if err != nil { + t.Fatal(err) + } + if ok { + t.Fatal("expected the archived wallet to no longer be signable after executor invalidation") + } +} + +func TestArchiveClosedWallets_SkipsWalletNotYetRegisteredOnChain(t *testing.T) { + node, _, walletPublicKey := setupCovenantSignerTestNode(t) + + localChain, ok := node.chain.(*localChain) + if !ok { + t.Fatal("expected local chain implementation") + } + + // Simulate a freshly generated wallet that is present in the local wallet + // registry but not yet recorded on-chain by the Bridge, i.e. still inside + // the DKG approval window. 
+ walletPublicKeyHash := bitcoin.PublicKeyHash(walletPublicKey) + localChain.walletsMutex.Lock() + delete(localChain.wallets, walletPublicKeyHash) + localChain.walletsMutex.Unlock() + + if err := node.archiveClosedWallets(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // The pending wallet must be left in place, not archived. + if len(node.walletRegistry.getSigners(walletPublicKey)) == 0 { + t.Fatal("expected the not-yet-registered wallet to be left in place, but it was archived") + } +} + +func TestArchiveClosedWallets_ArchivesClosedWallet(t *testing.T) { + node, _, walletPublicKey := setupCovenantSignerTestNode(t) + + localChain, ok := node.chain.(*localChain) + if !ok { + t.Fatal("expected local chain implementation") + } + + // Transition the on-chain wallet to the closed state, keeping its identity. + walletPublicKeyHash := bitcoin.PublicKeyHash(walletPublicKey) + existing, err := localChain.GetWallet(walletPublicKeyHash) + if err != nil { + t.Fatal(err) + } + closed := *existing + closed.State = StateClosed + localChain.setWallet(walletPublicKeyHash, &closed) + + if err := node.archiveClosedWallets(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // The closed wallet must be archived. + if len(node.walletRegistry.getSigners(walletPublicKey)) != 0 { + t.Fatal("expected the closed wallet to be archived, but it is still present") + } +} + +// TestCovenantSignerEngine_OnSubmitFailsClosedWithoutBridgeFraudDefense verifies +// that the engine refuses to produce covenant signatures unless the operator has +// confirmed the tBTC Bridge covenant fraud-defense path is deployed. A covenant +// signature would otherwise expose the signing wallet to an undefeatable fraud +// challenge. 
+func TestCovenantSignerEngine_OnSubmitFailsClosedWithoutBridgeFraudDefense(t *testing.T) { + node, _, _ := setupCovenantSignerTestNode(t) + + // Construct the engine with the default (fail-closed) configuration: the + // bridge covenant fraud-defense path is not confirmed deployed. + engine := newCovenantSignerEngine(node, 0, false) + + for _, route := range []covenantsigner.TemplateID{ + covenantsigner.TemplateSelfV1, + covenantsigner.TemplateQcV1, + } { + transition, err := engine.OnSubmit( + context.Background(), + &covenantsigner.Job{Route: route}, + ) + if err != nil { + t.Fatalf("[%s] unexpected error: %v", route, err) + } + if transition == nil { + t.Fatalf("[%s] expected a transition", route) + } + if transition.State != covenantsigner.JobStateFailed { + t.Fatalf( + "[%s] expected a failed transition, got state [%v]", + route, + transition.State, + ) + } + if transition.Reason != covenantsigner.ReasonPolicyRejected { + t.Fatalf( + "[%s] expected a policy rejection, got reason [%v]", + route, + transition.Reason, + ) + } + } +} + func setupCovenantSignerTestNode( t *testing.T, ) (*node, *localBitcoinChain, *ecdsa.PublicKey) { @@ -1380,7 +1548,9 @@ func TestCovenantSignerEngine_OnPollReturnsNoTransition(t *testing.T) { } func TestCovenantSignerEngine_SubmitRejectsUnsupportedRoute(t *testing.T) { - transition, err := (&covenantSignerEngine{}).OnSubmit( + // Confirm the bridge fraud-defense so OnSubmit reaches route validation + // rather than the fail-closed gate. 
+ transition, err := (&covenantSignerEngine{bridgeFraudDefenseConfirmed: true}).OnSubmit( context.Background(), &covenantsigner.Job{ Route: covenantsigner.TemplateID("unsupported_route"), @@ -1406,7 +1576,7 @@ func TestCovenantSignerEngine_SubmitRejectsUnsupportedRoute(t *testing.T) { func TestNewCovenantSignerEngine_DefaultMinConfirmations(t *testing.T) { node, _, _ := setupCovenantSignerTestNode(t) - engine := newCovenantSignerEngine(node, 0) + engine := newCovenantSignerEngine(node, 0, true) cse, ok := engine.(*covenantSignerEngine) if !ok { @@ -1424,7 +1594,7 @@ func TestNewCovenantSignerEngine_DefaultMinConfirmations(t *testing.T) { func TestNewCovenantSignerEngine_ExplicitMinConfirmations(t *testing.T) { node, _, _ := setupCovenantSignerTestNode(t) - engine := newCovenantSignerEngine(node, 3) + engine := newCovenantSignerEngine(node, 3, true) cse, ok := engine.(*covenantSignerEngine) if !ok { @@ -1598,3 +1768,78 @@ func TestComputeQcV1SignerHandoffPayloadHash_DeterministicKeyOrdering(t *testing ) } } + +// TestCovenantSignerEngine_VerifySignerApprovalRejectsNonLiveWallet verifies +// that a signer approval certificate which is otherwise valid (matching the +// wallet identity and members hash) is rejected once the wallet leaves the live +// state, so a closed or terminated wallet cannot be made to sign a covenant +// transaction. 
+func TestCovenantSignerEngine_VerifySignerApprovalRejectsNonLiveWallet(t *testing.T) { + node, _, walletPublicKey := setupCovenantSignerTestNode(t) + + localChain, ok := node.chain.(*localChain) + if !ok { + t.Fatal("expected local chain implementation") + } + + depositorPrivateKey, _ := btcec.PrivKeyFromBytes(btcec.S256(), bytes.Repeat([]byte{0x42}, 32)) + depositorPublicKey := depositorPrivateKey.PubKey().SerializeCompressed() + signerPublicKey := (*btcec.PublicKey)(walletPublicKey).SerializeCompressed() + + template := &covenantsigner.SelfV1Template{ + Template: covenantsigner.TemplateSelfV1, + DepositorPublicKey: "0x" + hex.EncodeToString(depositorPublicKey), + SignerPublicKey: "0x" + hex.EncodeToString(signerPublicKey), + Delta2: 4320, + } + templateJSON, err := json.Marshal(template) + if err != nil { + t.Fatal(err) + } + + // Build a request whose signer approval certificate is issued while the + // wallet is live, so it is genuinely valid apart from the wallet state. + request := covenantsigner.RouteSubmitRequest{ + Route: covenantsigner.TemplateSelfV1, + DestinationCommitmentHash: "0x" + strings.Repeat("11", 32), + MigrationTransactionPlan: &covenantsigner.MigrationTransactionPlan{ + InputValueSats: 1_000_000, + DestinationValueSats: 998_000, + AnchorValueSats: 330, + FeeSats: 1_670, + InputSequence: 0xfffffffd, + LockTime: 912345, + }, + ScriptTemplate: templateJSON, + Signing: covenantsigner.SigningRequirements{ + SignerRequired: true, + CustodianRequired: false, + }, + } + applyTestMigrationTransactionPlanCommitment(t, &request) + applyTestArtifactApprovals(t, node, walletPublicKey, &request, depositorPrivateKey, nil) + + cse := &covenantSignerEngine{node: node} + + // While the wallet is live, the otherwise-valid request is accepted. 
+ if err := cse.VerifySignerApproval(request); err != nil { + t.Fatalf("expected a live wallet to be accepted, got: %v", err) + } + + // Transition the wallet to the closed state, keeping otherwise-valid + // registry data (same identity and members hash), matching the reported + // scenario where a certificate for a now-closed wallet is replayed. + walletPublicKeyHash := bitcoin.PublicKeyHash(walletPublicKey) + existing, err := localChain.GetWallet(walletPublicKeyHash) + if err != nil { + t.Fatal(err) + } + closed := *existing + closed.State = StateClosed + localChain.setWallet(walletPublicKeyHash, &closed) + + // The closed wallet must now be rejected before any signing occurs. + if err := cse.VerifySignerApproval(request); err == nil { + t.Fatal("expected VerifySignerApproval to reject a closed wallet") + } +} diff --git a/pkg/tbtc/deduplicator.go b/pkg/tbtc/deduplicator.go index 37d0b1704f..1938629035 100644 --- a/pkg/tbtc/deduplicator.go +++ b/pkg/tbtc/deduplicator.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "math/big" "strconv" + "sync" "time" "github.com/keep-network/keep-common/pkg/cache" @@ -39,6 +40,17 @@ type deduplicator struct { dkgSeedCache *cache.TimeCache dkgResultHashCache *cache.TimeCache walletClosedCache *cache.TimeCache + + // inProgressMutex guards the inProgress set below. + inProgressMutex sync.Mutex + // inProgress tracks event keys that have been claimed for handling but + // whose handling has not yet completed successfully. Keeping claimed but + // unfinished events here (rather than immediately in the completed caches + // above) lets concurrent duplicate deliveries be ignored while still + // allowing a later redelivery to retry the handling if the current attempt + // fails. An event key is moved into its completed cache only once its + // handler finishes successfully. 
+ inProgress map[string]bool } func newDeduplicator() *deduplicator { @@ -46,73 +58,187 @@ func newDeduplicator() *deduplicator { dkgSeedCache: cache.NewTimeCache(DKGSeedCachePeriod), dkgResultHashCache: cache.NewTimeCache(DKGResultHashCachePeriod), walletClosedCache: cache.NewTimeCache(WalletClosedCachePeriod), + inProgress: make(map[string]bool), + } +} + +// claim attempts to reserve the given event, identified by key, for handling. +// It returns true when the caller should proceed with handling the event and +// false when the event has already been handled successfully (it lives in +// completedCache) or is currently being handled by another goroutine (it lives +// in the inProgress set). +// +// A successful claim must be paired with exactly one later call: markCompleted +// once the handler finishes successfully, or release if the handler fails. +// Releasing a failed claim (instead of marking it completed) is what lets a +// later redelivery of the same event retry the handling rather than being +// silently dropped as a duplicate. +func (d *deduplicator) claim(completedCache *cache.TimeCache, key string) bool { + d.inProgressMutex.Lock() + defer d.inProgressMutex.Unlock() + + // Drop expired entries so a long-past event can be handled again. + completedCache.Sweep() + + if completedCache.Has(key) { + return false + } + + // Guard against a nil map for deduplicator instances built via a struct + // literal (e.g. in tests) rather than through newDeduplicator. + if d.inProgress == nil { + d.inProgress = make(map[string]bool) + } + + if d.inProgress[key] { + return false } + + d.inProgress[key] = true + return true +} + +// markCompleted records the claimed event, identified by key, as successfully +// handled and releases the in-progress claim. Subsequent deliveries of the same +// event are treated as duplicates until the completedCache entry expires. 
+func (d *deduplicator) markCompleted(completedCache *cache.TimeCache, key string) { + d.inProgressMutex.Lock() + defer d.inProgressMutex.Unlock() + + completedCache.Add(key) + delete(d.inProgress, key) +} + +// release drops the in-progress claim for the given event, identified by key, +// without recording it as completed. It is called when handling fails so a +// later redelivery of the same event can be retried instead of being dropped +// as a duplicate. +func (d *deduplicator) release(key string) { + d.inProgressMutex.Lock() + defer d.inProgressMutex.Unlock() + + delete(d.inProgress, key) +} + +// dkgStartedKey builds the deduplication key identifying a DKG started event. +// The key is the hexadecimal representation of the seed. +func dkgStartedKey(newDKGSeed *big.Int) string { + return newDKGSeed.Text(16) } // notifyDKGStarted notifies the client wants to start the distributed key // generation upon receiving an event. It returns boolean indicating whether the // client should proceed with the execution or ignore the event as a duplicate. +// +// A successful claim must be released with confirmDKGStarted once the event has +// been terminally handled (the local DKG join was started, or the event turned +// out to be unconfirmed) or with abortDKGStarted if handling did not complete, +// so a later redelivery of the same event can retry. func (d *deduplicator) notifyDKGStarted( newDKGSeed *big.Int, ) bool { - d.dkgSeedCache.Sweep() - - // The cache key is the hexadecimal representation of the seed. - cacheKey := newDKGSeed.Text(16) - // If the key is not in the cache, that means the seed was not handled - // yet and the client should proceed with the execution. 
- if !d.dkgSeedCache.Has(cacheKey) { - d.dkgSeedCache.Add(cacheKey) - return true - } + return d.claim(d.dkgSeedCache, dkgStartedKey(newDKGSeed)) +} + +// confirmDKGStarted marks the given DKG started event as successfully handled +// so subsequent deliveries of the same event are ignored as duplicates. +func (d *deduplicator) confirmDKGStarted(newDKGSeed *big.Int) { + d.markCompleted(d.dkgSeedCache, dkgStartedKey(newDKGSeed)) +} + +// abortDKGStarted releases the in-progress claim for the given DKG started +// event without marking it handled, so a later redelivery of the same event can +// retry the handling. +func (d *deduplicator) abortDKGStarted(newDKGSeed *big.Int) { + d.release(dkgStartedKey(newDKGSeed)) +} - // Otherwise, the DKG seed is a duplicate and the client should not proceed - // with the execution. - return false +// dkgResultSubmittedKey builds the deduplication key identifying a DKG result +// submission event. +func dkgResultSubmittedKey( + newDKGResultSeed *big.Int, + newDKGResultHash DKGChainResultHash, + newDKGResultBlock uint64, +) string { + return newDKGResultSeed.Text(16) + + hex.EncodeToString(newDKGResultHash[:]) + + strconv.Itoa(int(newDKGResultBlock)) } // notifyDKGResultSubmitted notifies the client wants to start some actions // upon the DKG result submission. It returns boolean indicating whether the // client should proceed with the actions or ignore the event as a duplicate. +// +// A successful claim must be released with confirmDKGResultSubmitted once the +// result has been validated (challenged if invalid, or its approval scheduled +// if valid) or with abortDKGResultSubmitted if validation did not reach a +// terminal state, so a later redelivery of the same event can retry. 
func (d *deduplicator) notifyDKGResultSubmitted( newDKGResultSeed *big.Int, newDKGResultHash DKGChainResultHash, newDKGResultBlock uint64, ) bool { - d.dkgResultHashCache.Sweep() + return d.claim( + d.dkgResultHashCache, + dkgResultSubmittedKey(newDKGResultSeed, newDKGResultHash, newDKGResultBlock), + ) +} - cacheKey := newDKGResultSeed.Text(16) + - hex.EncodeToString(newDKGResultHash[:]) + - strconv.Itoa(int(newDKGResultBlock)) +// confirmDKGResultSubmitted marks the given DKG result submission as +// successfully handled so subsequent deliveries of the same event are ignored +// as duplicates. +func (d *deduplicator) confirmDKGResultSubmitted( + newDKGResultSeed *big.Int, + newDKGResultHash DKGChainResultHash, + newDKGResultBlock uint64, +) { + d.markCompleted( + d.dkgResultHashCache, + dkgResultSubmittedKey(newDKGResultSeed, newDKGResultHash, newDKGResultBlock), + ) +} - // If the key is not in the cache, that means the result was not handled - // yet and the client should proceed with the execution. - if !d.dkgResultHashCache.Has(cacheKey) { - d.dkgResultHashCache.Add(cacheKey) - return true - } +// abortDKGResultSubmitted releases the in-progress claim for the given DKG +// result submission without marking it handled, so a later redelivery of the +// same event can retry the validation. +func (d *deduplicator) abortDKGResultSubmitted( + newDKGResultSeed *big.Int, + newDKGResultHash DKGChainResultHash, + newDKGResultBlock uint64, +) { + d.release( + dkgResultSubmittedKey(newDKGResultSeed, newDKGResultHash, newDKGResultBlock), + ) +} - // Otherwise, the DKG result is a duplicate and the client should not - // proceed with the execution. - return false +// walletClosedKey builds the deduplication key identifying a wallet closure +// event. +func walletClosedKey(walletID [32]byte) string { + return hex.EncodeToString(walletID[:]) } +// notifyWalletClosed notifies the client wants to handle a wallet closure upon +// receiving an event. 
It returns a boolean indicating whether the client should +// proceed with the handling or ignore the event as a duplicate. +// +// A successful claim must be released with confirmWalletClosed once the wallet +// has actually been archived, or with abortWalletClosed if archival did not +// complete, so a later redelivery of the same event can retry. func (d *deduplicator) notifyWalletClosed( - WalletID [32]byte, + walletID [32]byte, ) bool { - d.walletClosedCache.Sweep() - - // Use wallet ID converted to string as the cache key. - cacheKey := hex.EncodeToString(WalletID[:]) + return d.claim(d.walletClosedCache, walletClosedKey(walletID)) +} - // If the key is not in the cache, that means the wallet closure was not - // handled yet and the client should proceed with the execution. - if !d.walletClosedCache.Has(cacheKey) { - d.walletClosedCache.Add(cacheKey) - return true - } +// confirmWalletClosed marks the given wallet closure as successfully handled so +// subsequent deliveries of the same event are ignored as duplicates. +func (d *deduplicator) confirmWalletClosed(walletID [32]byte) { + d.markCompleted(d.walletClosedCache, walletClosedKey(walletID)) +} - // Otherwise, the wallet closure is a duplicate and the client should not - // proceed with the execution. - return false +// abortWalletClosed releases the in-progress claim for the given wallet closure +// without marking it handled, so a later redelivery of the same event can retry +// the archival. 
+func (d *deduplicator) abortWalletClosed(walletID [32]byte) { + d.release(walletClosedKey(walletID)) } diff --git a/pkg/tbtc/deduplicator_test.go b/pkg/tbtc/deduplicator_test.go index b75432a8c0..85db7df248 100644 --- a/pkg/tbtc/deduplicator_test.go +++ b/pkg/tbtc/deduplicator_test.go @@ -18,24 +18,28 @@ const ( func TestNotifyDKGStarted(t *testing.T) { deduplicator := deduplicator{ dkgSeedCache: cache.NewTimeCache(testDKGSeedCachePeriod), + inProgress: make(map[string]bool), } seed1 := big.NewInt(100) seed2 := big.NewInt(200) - // Add the first seed. + // Claim and confirm the first seed. canJoinDKG := deduplicator.notifyDKGStarted(seed1) if !canJoinDKG { t.Fatal("should be allowed to join DKG") } + deduplicator.confirmDKGStarted(seed1) - // Add the second seed. + // Claim and confirm the second seed. canJoinDKG = deduplicator.notifyDKGStarted(seed2) if !canJoinDKG { t.Fatal("should be allowed to join DKG") } + deduplicator.confirmDKGStarted(seed2) - // Add the first seed before caching period elapses. + // The first seed is now a confirmed duplicate before the caching period + // elapses. canJoinDKG = deduplicator.notifyDKGStarted(seed1) if canJoinDKG { t.Fatal("should not be allowed to join DKG") @@ -44,13 +48,51 @@ func TestNotifyDKGStarted(t *testing.T) { // Wait until caching period elapses. time.Sleep(testDKGSeedCachePeriod) - // Add the first seed again. + // The first seed can be processed again after expiry. canJoinDKG = deduplicator.notifyDKGStarted(seed1) if !canJoinDKG { t.Fatal("should be allowed to join DKG") } } +// TestNotifyDKGStarted_RetryOpenAfterAbort verifies that a DKG started event +// whose handling did not complete (aborted) can be retried on a later +// redelivery, and that a confirmed one is dropped as a duplicate. 
+func TestNotifyDKGStarted_RetryOpenAfterAbort(t *testing.T) { + deduplicator := deduplicator{ + dkgSeedCache: cache.NewTimeCache(testDKGSeedCachePeriod), + inProgress: make(map[string]bool), + } + + seed := big.NewInt(100) + + // Claim the event for handling. + if !deduplicator.notifyDKGStarted(seed) { + t.Fatal("first claim should be allowed to process") + } + + // While the claim is in progress, a concurrent duplicate delivery must be + // ignored. + if deduplicator.notifyDKGStarted(seed) { + t.Fatal("in-progress event should not be claimable again") + } + + // Handling failed, so the claim is released. + deduplicator.abortDKGStarted(seed) + + // A later redelivery of the same event must be allowed to retry, rather + // than being dropped as an already-processed duplicate. + if !deduplicator.notifyDKGStarted(seed) { + t.Fatal("event should be claimable again after an aborted attempt") + } + + // Once handling completes successfully, further deliveries are duplicates. + deduplicator.confirmDKGStarted(seed) + if deduplicator.notifyDKGStarted(seed) { + t.Fatal("confirmed event should not be claimable again") + } +} + func TestNotifyDKGResultSubmitted(t *testing.T) { deduplicator := deduplicator{ dkgResultHashCache: cache.NewTimeCache(testDKGResultHashCachePeriod), @@ -70,37 +112,33 @@ func TestNotifyDKGResultSubmitted(t *testing.T) { var hash2 [32]byte copy(hash2[:], hash2Bytes) - // Add the original parameters. + // Claim and confirm the original parameters. canProcess := deduplicator.notifyDKGResultSubmitted(big.NewInt(100), hash1, 500) if !canProcess { t.Fatal("should be allowed to process") } + deduplicator.confirmDKGResultSubmitted(big.NewInt(100), hash1, 500) - // Add with different seed. - canProcess = deduplicator.notifyDKGResultSubmitted(big.NewInt(101), hash1, 500) - if !canProcess { - t.Fatal("should be allowed to process") - } - - // Add with different result hash. 
- canProcess = deduplicator.notifyDKGResultSubmitted(big.NewInt(100), hash2, 500) - if !canProcess { - t.Fatal("should be allowed to process") + // Different seed, different result hash, different result block, and + // all-different parameters must be treated as independent events. + for _, tc := range []struct { + seed *big.Int + hash [32]byte + block uint64 + }{ + {big.NewInt(101), hash1, 500}, + {big.NewInt(100), hash2, 500}, + {big.NewInt(100), hash1, 501}, + {big.NewInt(101), hash2, 501}, + } { + if !deduplicator.notifyDKGResultSubmitted(tc.seed, tc.hash, tc.block) { + t.Fatalf("should be allowed to process seed [%v]", tc.seed) + } + deduplicator.confirmDKGResultSubmitted(tc.seed, tc.hash, tc.block) } - // Add with different result block. - canProcess = deduplicator.notifyDKGResultSubmitted(big.NewInt(100), hash1, 501) - if !canProcess { - t.Fatal("should be allowed to process") - } - - // Add with all different parameters. - canProcess = deduplicator.notifyDKGResultSubmitted(big.NewInt(101), hash2, 501) - if !canProcess { - t.Fatal("should be allowed to process") - } - - // Add the original parameters before caching period elapses. + // The original parameters are now a confirmed duplicate before the caching + // period elapses. canProcess = deduplicator.notifyDKGResultSubmitted(big.NewInt(100), hash1, 500) if canProcess { t.Fatal("should not be allowed to process") @@ -109,34 +147,78 @@ func TestNotifyDKGResultSubmitted(t *testing.T) { // Wait until caching period elapses. time.Sleep(testDKGResultHashCachePeriod) - // Add the original parameters again. + // The original parameters can be processed again after expiry. 
canProcess = deduplicator.notifyDKGResultSubmitted(big.NewInt(100), hash1, 500) if !canProcess { t.Fatal("should be allowed to process") } } +// TestNotifyDKGResultSubmitted_RetryOpenAfterAbort verifies that a DKG result +// submission whose handling did not complete (aborted) can be retried on a +// later redelivery, and that a confirmed one is dropped as a duplicate. +func TestNotifyDKGResultSubmitted_RetryOpenAfterAbort(t *testing.T) { + deduplicator := deduplicator{ + dkgResultHashCache: cache.NewTimeCache(testDKGResultHashCachePeriod), + inProgress: make(map[string]bool), + } + + var hash [32]byte + seed := big.NewInt(100) + block := uint64(500) + + // Claim the event for handling. + if !deduplicator.notifyDKGResultSubmitted(seed, hash, block) { + t.Fatal("first claim should be allowed to process") + } + + // While the claim is in progress, a concurrent duplicate delivery must be + // ignored. + if deduplicator.notifyDKGResultSubmitted(seed, hash, block) { + t.Fatal("in-progress event should not be claimable again") + } + + // Handling failed, so the claim is released. + deduplicator.abortDKGResultSubmitted(seed, hash, block) + + // A later redelivery of the same event must be allowed to retry, rather + // than being dropped as an already-processed duplicate. + if !deduplicator.notifyDKGResultSubmitted(seed, hash, block) { + t.Fatal("event should be claimable again after an aborted attempt") + } + + // Once handling completes successfully, further deliveries are duplicates. + deduplicator.confirmDKGResultSubmitted(seed, hash, block) + if deduplicator.notifyDKGResultSubmitted(seed, hash, block) { + t.Fatal("confirmed event should not be claimable again") + } +} + func TestNotifyWalletClosed(t *testing.T) { deduplicator := deduplicator{ walletClosedCache: cache.NewTimeCache(testWalletClosedCachePeriod), + inProgress: make(map[string]bool), } wallet1 := [32]byte{1} wallet2 := [32]byte{2} - // Add the first wallet ID. + // Claim and confirm the first wallet ID. 
canProcess := deduplicator.notifyWalletClosed(wallet1) if !canProcess { t.Fatal("should be allowed to process") } + deduplicator.confirmWalletClosed(wallet1) - // Add the second wallet ID. + // Claim and confirm the second wallet ID. canProcess = deduplicator.notifyWalletClosed(wallet2) if !canProcess { t.Fatal("should be allowed to process") } + deduplicator.confirmWalletClosed(wallet2) - // Add the first wallet ID before caching period elapses. + // The first wallet ID is now a confirmed duplicate before the caching + // period elapses. canProcess = deduplicator.notifyWalletClosed(wallet1) if canProcess { t.Fatal("should not be allowed to process") @@ -145,9 +227,47 @@ func TestNotifyWalletClosed(t *testing.T) { // Wait until caching period elapses. time.Sleep(testWalletClosedCachePeriod) - // Add the first wallet ID again. + // The first wallet ID can be processed again after expiry. canProcess = deduplicator.notifyWalletClosed(wallet1) if !canProcess { t.Fatal("should be allowed to process") } } + +// TestNotifyWalletClosed_RetryOpenAfterAbort verifies that a wallet closure +// whose archival did not complete (aborted) can be retried on a later +// redelivery, and that a confirmed one is dropped as a duplicate. +func TestNotifyWalletClosed_RetryOpenAfterAbort(t *testing.T) { + deduplicator := deduplicator{ + walletClosedCache: cache.NewTimeCache(testWalletClosedCachePeriod), + inProgress: make(map[string]bool), + } + + wallet := [32]byte{1} + + // Claim the event for handling. + if !deduplicator.notifyWalletClosed(wallet) { + t.Fatal("first claim should be allowed to process") + } + + // While the claim is in progress, a concurrent duplicate delivery must be + // ignored. + if deduplicator.notifyWalletClosed(wallet) { + t.Fatal("in-progress event should not be claimable again") + } + + // Archival failed, so the claim is released. 
+ deduplicator.abortWalletClosed(wallet) + + // A later redelivery of the same event must be allowed to retry, rather + // than being dropped as an already-processed duplicate. + if !deduplicator.notifyWalletClosed(wallet) { + t.Fatal("event should be claimable again after an aborted attempt") + } + + // Once archival completes successfully, further deliveries are duplicates. + deduplicator.confirmWalletClosed(wallet) + if deduplicator.notifyWalletClosed(wallet) { + t.Fatal("confirmed event should not be claimable again") + } +} diff --git a/pkg/tbtc/dkg.go b/pkg/tbtc/dkg.go index 177e225a18..cfd4db18df 100644 --- a/pkg/tbtc/dkg.go +++ b/pkg/tbtc/dkg.go @@ -576,12 +576,19 @@ func (de *dkgExecutor) publishDkgResult( // challenge. If the result is valid and the given node was involved in the DKG, // this function schedules an on-chain approve that is submitted once the // challenge period elapses. +// +// It returns a nil error once the result has reached a terminal handled state +// (invalid result challenged, valid result with no local approval duty, or +// valid result with approval scheduled). It returns a non-nil error when a +// transient failure (e.g. an RPC error) prevented validation from reaching a +// terminal state; callers may use that signal to retry on a later redelivery +// of the same event. 
func (de *dkgExecutor) executeDkgValidation( seed *big.Int, submissionBlock uint64, result *DKGChainResult, resultHash [32]byte, -) { +) error { dkgLogger := logger.With( zap.String("seed", fmt.Sprintf("0x%x", seed)), zap.String("groupPublicKey", fmt.Sprintf("0x%x", result.GroupPublicKey)), @@ -597,7 +604,7 @@ func (de *dkgExecutor) executeDkgValidation( isValid, err := de.chain.IsDKGResultValid(result) if err != nil { dkgLogger.Errorf("cannot validate DKG result: [%v]", err) - return + return fmt.Errorf("cannot validate DKG result: [%w]", err) } if !isValid { @@ -620,7 +627,10 @@ func (de *dkgExecutor) executeDkgValidation( "cannot challenge invalid DKG result: [%v]", err, ) - return + return fmt.Errorf( + "cannot challenge invalid DKG result: [%w]", + err, + ) } if de.metricsRecorder != nil { @@ -642,20 +652,23 @@ func (de *dkgExecutor) executeDkgValidation( "error while waiting for challenge confirmation: [%v]", err, ) - return + return fmt.Errorf( + "error while waiting for challenge confirmation: [%w]", + err, + ) } state, err := de.chain.GetDKGState() if err != nil { dkgLogger.Errorf("cannot check DKG state: [%v]", err) - return + return fmt.Errorf("cannot check DKG state: [%w]", err) } if state != Challenge { dkgLogger.Infof( "invalid DKG result challenged successfully", ) - return + return nil } dkgLogger.Infof( @@ -669,7 +682,7 @@ func (de *dkgExecutor) executeDkgValidation( operatorID, err := de.operatorIDFn() if err != nil { dkgLogger.Errorf("cannot get node's operator ID: [%v]", err) - return + return fmt.Errorf("cannot get node's operator ID: [%w]", err) } // Determine the member indexes controlled by this node's operator. 
@@ -689,7 +702,7 @@ func (de *dkgExecutor) executeDkgValidation( operatorID, result.Members, ) - return + return nil } dkgLogger.Infof("scheduling DKG result approval") @@ -697,7 +710,7 @@ func (de *dkgExecutor) executeDkgValidation( parameters, err := de.chain.DKGParameters() if err != nil { dkgLogger.Errorf("cannot get current DKG parameters: [%v]", err) - return + return fmt.Errorf("cannot get current DKG parameters: [%w]", err) } // The challenge period starts at the result submission block and lasts @@ -780,6 +793,10 @@ func (de *dkgExecutor) executeDkgValidation( dkgLogger.Infof("[member:%v] approving DKG result", memberIndex) }(currentMemberIndex) } + + // The result was valid and this node's approval duties have been scheduled; + // the event is terminally handled. + return nil } // finalSigningGroup takes three parameters: diff --git a/pkg/tbtc/dkg_result_submitted_handler.go b/pkg/tbtc/dkg_result_submitted_handler.go new file mode 100644 index 0000000000..7d17650c72 --- /dev/null +++ b/pkg/tbtc/dkg_result_submitted_handler.go @@ -0,0 +1,71 @@ +package tbtc + +// handleDKGResultSubmittedEvent runs the deduplication-guarded handling of a +// DKGResultSubmitted event. It claims the event, runs handle, and then either +// confirms the deduplication entry once handling reaches a terminal state (so +// subsequent deliveries of the same event are ignored as duplicates) or releases +// the claim when handle returns an error, so a later redelivery of the same +// event can retry. +// +// handle performs the actual DKG result validation and returns a non-nil error +// when validation did not reach a terminal state (e.g. a transient RPC error +// before the result could be challenged or its approval scheduled). Releasing +// the claim on such a failure is what prevents the node from being silently +// removed from the challenger set for the rest of the challenge window: the +// deduplication entry is recorded as completed only after handling succeeds. 
+func handleDKGResultSubmittedEvent( + deduplicator *deduplicator, + event *DKGResultSubmittedEvent, + handle func(event *DKGResultSubmittedEvent) error, +) { + if ok := deduplicator.notifyDKGResultSubmitted( + event.Seed, + event.ResultHash, + event.BlockNumber, + ); !ok { + logger.Warnf( + "Result with hash [0x%x] for DKG with seed [0x%x] "+ + "and starting block [%v] has been already processed", + event.ResultHash, + event.Seed, + event.BlockNumber, + ) + return + } + + logger.Infof( + "Result with hash [0x%x] for DKG with seed [0x%x] "+ + "submitted at block [%v]", + event.ResultHash, + event.Seed, + event.BlockNumber, + ) + + if err := handle(event); err != nil { + // Validation did not reach a terminal state (e.g. a transient + // RPC error before the result could be challenged or its + // approval scheduled). Release the deduplication claim so a + // later redelivery of the same event retries validation instead + // of being dropped as an already-processed duplicate. + logger.Warnf( + "DKG result validation for result with hash [0x%x] and "+ + "seed [0x%x] did not complete; allowing retry on event "+ + "redelivery: [%v]", + event.ResultHash, + event.Seed, + err, + ) + deduplicator.abortDKGResultSubmitted( + event.Seed, + event.ResultHash, + event.BlockNumber, + ) + return + } + + deduplicator.confirmDKGResultSubmitted( + event.Seed, + event.ResultHash, + event.BlockNumber, + ) +} diff --git a/pkg/tbtc/dkg_result_submitted_handler_test.go b/pkg/tbtc/dkg_result_submitted_handler_test.go new file mode 100644 index 0000000000..f766d4d42b --- /dev/null +++ b/pkg/tbtc/dkg_result_submitted_handler_test.go @@ -0,0 +1,103 @@ +package tbtc + +import ( + "fmt" + "math/big" + "testing" + + "github.com/keep-network/keep-core/internal/testutils" +) + +// TestHandleDKGResultSubmittedEvent_RetriesAfterTransientFailure verifies the +// TOB-TBTCACEXT-56 fix at the callback level: when the first delivery of a +// DKGResultSubmitted event fails transiently during validation, an 
identical +// redelivery must reach validation again instead of being dropped as an +// already-processed duplicate. Once validation reaches a terminal state, a +// further redelivery must be ignored as a duplicate. +func TestHandleDKGResultSubmittedEvent_RetriesAfterTransientFailure(t *testing.T) { + deduplicator := newDeduplicator() + + var resultHash DKGChainResultHash + copy(resultHash[:], []byte{0x01, 0x02, 0x03}) + event := &DKGResultSubmittedEvent{ + Seed: big.NewInt(100), + ResultHash: resultHash, + BlockNumber: 500, + } + + var validationCalls int + handle := func(*DKGResultSubmittedEvent) error { + validationCalls++ + if validationCalls == 1 { + // Simulate a transient RPC error before the result could be + // challenged or its approval scheduled. + return fmt.Errorf("transient validation failure") + } + return nil + } + + // First delivery: the event is claimed, validation fails, and the claim is + // released so the event stays retryable. + handleDKGResultSubmittedEvent(deduplicator, event, handle) + + // Redelivery of the identical event must reach validation again rather than + // being dropped as an already-processed duplicate. + handleDKGResultSubmittedEvent(deduplicator, event, handle) + + testutils.AssertIntsEqual( + t, + "validation calls after a failed delivery and its retry", + 2, + validationCalls, + ) + + // The retry succeeded and confirmed the event, so a further redelivery must + // be ignored as a duplicate and must not reach validation again. + handleDKGResultSubmittedEvent(deduplicator, event, handle) + + testutils.AssertIntsEqual( + t, + "validation calls after the event was confirmed handled", + 2, + validationCalls, + ) +} + +// TestHandleDKGResultSubmittedEvent_IgnoresConcurrentDuplicate verifies that +// while a DKGResultSubmitted event is being handled, a concurrent duplicate +// delivery of the same event is ignored rather than starting a second handling. 
+func TestHandleDKGResultSubmittedEvent_IgnoresConcurrentDuplicate(t *testing.T) { + deduplicator := newDeduplicator() + + var resultHash DKGChainResultHash + copy(resultHash[:], []byte{0x0a, 0x0b, 0x0c}) + event := &DKGResultSubmittedEvent{ + Seed: big.NewInt(200), + ResultHash: resultHash, + BlockNumber: 700, + } + + var nestedCalls int + handle := func(*DKGResultSubmittedEvent) error { + // While this handling is in progress, a duplicate delivery must be + // dropped and must not enter the handler a second time. + handleDKGResultSubmittedEvent( + deduplicator, + event, + func(*DKGResultSubmittedEvent) error { + nestedCalls++ + return nil + }, + ) + return nil + } + + handleDKGResultSubmittedEvent(deduplicator, event, handle) + + testutils.AssertIntsEqual( + t, + "handler calls triggered by an in-progress duplicate", + 0, + nestedCalls, + ) +} diff --git a/pkg/tbtc/dkg_started_handler.go b/pkg/tbtc/dkg_started_handler.go new file mode 100644 index 0000000000..285cdc5a17 --- /dev/null +++ b/pkg/tbtc/dkg_started_handler.go @@ -0,0 +1,53 @@ +package tbtc + +// handleDKGStartedEvent runs the deduplication-guarded handling of a DKGStarted +// event. It claims the event, runs handle, and then either confirms the +// deduplication entry once handling reaches a terminal state (so subsequent +// deliveries of the same event are ignored as duplicates) or releases the claim +// when handle returns an error, so a later redelivery of the same event can +// retry. +// +// handle performs the local DKG-start handling and returns a nil error once the +// event has been terminally handled (the local DKG join was dispatched, or the +// event turned out to be unconfirmed) and a non-nil error when a transient early +// return (e.g. a block-confirmation wait failure, a DKG-state check error, or a +// past-event lookup error) prevented handling from completing. 
Releasing the +// claim on such a failure is what lets a redelivered event retry so the operator +// still joins the new group: the deduplication entry is recorded as completed +// only after handling succeeds. +func handleDKGStartedEvent( + deduplicator *deduplicator, + event *DKGStartedEvent, + handle func(event *DKGStartedEvent) error, +) { + if ok := deduplicator.notifyDKGStarted( + event.Seed, + ); !ok { + logger.Infof( + "DKG started event with seed [0x%x] has been "+ + "already processed", + event.Seed, + ) + return + } + + if err := handle(event); err != nil { + // Handling did not reach a terminal state (e.g. a block-confirmation + // wait failure, a DKG-state check error, or a past-event lookup error + // before the local DKG join could be dispatched). Release the + // deduplication claim so a later redelivery of the same event retries + // the handling instead of being dropped as an already-processed + // duplicate, which would otherwise leave the operator out of the new + // group. + logger.Warnf( + "handling of DKG started event with seed [0x%x] did "+ + "not complete; allowing retry on event redelivery: [%v]", + event.Seed, + err, + ) + deduplicator.abortDKGStarted(event.Seed) + return + } + + deduplicator.confirmDKGStarted(event.Seed) +} diff --git a/pkg/tbtc/dkg_started_handler_test.go b/pkg/tbtc/dkg_started_handler_test.go new file mode 100644 index 0000000000..3dcccc96e0 --- /dev/null +++ b/pkg/tbtc/dkg_started_handler_test.go @@ -0,0 +1,99 @@ +package tbtc + +import ( + "fmt" + "math/big" + "testing" + + "github.com/keep-network/keep-core/internal/testutils" +) + +// TestHandleDKGStartedEvent_RetriesAfterTransientFailure verifies the +// TOB-TBTCACEXT-72 fix at the callback level: when the first delivery of a +// DKGStarted event fails after the deduplication claim (e.g. 
a block-confirmation +// wait failure before the local DKG join is dispatched), an identical redelivery +// must reach the handling path again instead of being dropped as an +// already-processed duplicate. Once handling reaches a terminal state, a further +// redelivery must be ignored as a duplicate. +func TestHandleDKGStartedEvent_RetriesAfterTransientFailure(t *testing.T) { + deduplicator := newDeduplicator() + + event := &DKGStartedEvent{ + Seed: big.NewInt(100), + BlockNumber: 500, + } + + var handlingCalls int + handle := func(*DKGStartedEvent) error { + handlingCalls++ + if handlingCalls == 1 { + // Simulate a transient early return (e.g. a failed block-height + // confirmation) before the local DKG join could be dispatched. + return fmt.Errorf("transient DKG started handling failure") + } + return nil + } + + // First delivery: the event is claimed, handling fails, and the claim is + // released so the event stays retryable. + handleDKGStartedEvent(deduplicator, event, handle) + + // Redelivery of the identical event must reach the handling path again + // rather than being dropped as an already-processed duplicate; otherwise the + // operator would never join the new group. + handleDKGStartedEvent(deduplicator, event, handle) + + testutils.AssertIntsEqual( + t, + "handling calls after a failed delivery and its retry", + 2, + handlingCalls, + ) + + // The retry succeeded and confirmed the event, so a further redelivery must + // be ignored as a duplicate and must not reach the handling path again. + handleDKGStartedEvent(deduplicator, event, handle) + + testutils.AssertIntsEqual( + t, + "handling calls after the event was confirmed handled", + 2, + handlingCalls, + ) +} + +// TestHandleDKGStartedEvent_IgnoresConcurrentDuplicate verifies that while a +// DKGStarted event is being handled, a concurrent duplicate delivery of the same +// event is ignored rather than starting a second handling. 
+func TestHandleDKGStartedEvent_IgnoresConcurrentDuplicate(t *testing.T) { + deduplicator := newDeduplicator() + + event := &DKGStartedEvent{ + Seed: big.NewInt(200), + BlockNumber: 700, + } + + var nestedCalls int + handle := func(*DKGStartedEvent) error { + // While this handling is in progress, a duplicate delivery must be + // dropped and must not enter the handler a second time. + handleDKGStartedEvent( + deduplicator, + event, + func(*DKGStartedEvent) error { + nestedCalls++ + return nil + }, + ) + return nil + } + + handleDKGStartedEvent(deduplicator, event, handle) + + testutils.AssertIntsEqual( + t, + "handler calls triggered by an in-progress duplicate", + 0, + nestedCalls, + ) +} diff --git a/pkg/tbtc/dkg_test.go b/pkg/tbtc/dkg_test.go index 547d1b5079..e3a88bde50 100644 --- a/pkg/tbtc/dkg_test.go +++ b/pkg/tbtc/dkg_test.go @@ -347,7 +347,7 @@ func TestDkgExecutor_ExecuteDkgValidation(t *testing.T) { }, ) - dkgExecutor.executeDkgValidation( + _ = dkgExecutor.executeDkgValidation( dkgResultSubmittedEvent.Seed, dkgResultSubmittedEvent.BlockNumber, dkgResultSubmittedEvent.Result, diff --git a/pkg/tbtc/node.go b/pkg/tbtc/node.go index 9a15e97096..87f92a6a20 100644 --- a/pkg/tbtc/node.go +++ b/pkg/tbtc/node.go @@ -4,6 +4,7 @@ import ( "context" "crypto/ecdsa" "encoding/hex" + "errors" "fmt" "math/big" "sync" @@ -303,13 +304,17 @@ func (n *node) joinDKGIfEligible( // challenge. If the result is valid and the given node was involved in the DKG, // this function schedules an on-chain approve that is submitted once the // challenge period elapses. +// +// It returns a nil error once the result has been terminally handled and a +// non-nil error when a transient failure prevented handling from completing, +// so the caller can retry on a later redelivery of the same event. 
func (n *node) validateDKG( seed *big.Int, submissionBlock uint64, result *DKGChainResult, resultHash [32]byte, -) { - n.dkgExecutor.executeDkgValidation(seed, submissionBlock, result, resultHash) +) error { + return n.dkgExecutor.executeDkgValidation(seed, submissionBlock, result, resultHash) } // getSigningExecutor gets the signing executor responsible for executing @@ -415,6 +420,28 @@ func (n *node) getSigningExecutor( return executor, true, nil } +// invalidateSigningExecutor removes any cached signing executor for the given +// wallet. It is used when a wallet is archived (closed or terminated) so that a +// stale cached executor cannot keep the node signing for a wallet that has been +// removed from the wallet registry. getSigningExecutor consults the executor +// cache before the wallet registry, so without this eviction an executor +// created while the wallet was live would remain usable after archival. +func (n *node) invalidateSigningExecutor(walletPublicKey *ecdsa.PublicKey) error { + walletPublicKeyBytes, err := marshalPublicKey(walletPublicKey) + if err != nil { + return fmt.Errorf("cannot marshal wallet public key: [%v]", err) + } + + executorKey := hex.EncodeToString(walletPublicKeyBytes) + + n.signingExecutorsMutex.Lock() + defer n.signingExecutorsMutex.Unlock() + + delete(n.signingExecutors, executorKey) + + return nil +} + // getCoordinationExecutor gets the coordination executor responsible for // executing coordination related to a specific wallet whose part is controlled // by this node. 
The second boolean return value indicates whether the node @@ -1279,29 +1306,31 @@ func (n *node) archiveClosedWallets() error { for _, walletPublicKey := range walletPublicKeys { walletPublicKeyHash := bitcoin.PublicKeyHash(walletPublicKey) - walletID, err := n.chain.CalculateWalletID(walletPublicKey) + walletChainData, err := n.chain.GetWallet(walletPublicKeyHash) if err != nil { - return fmt.Errorf( - "could not calculate wallet ID for wallet with public key "+ - "hash [0x%x]: [%v]", - walletPublicKeyHash, - err, - ) - } + if errors.Is(err, ErrWalletNotFound) { + // The wallet is not recorded on-chain by the Bridge. This is the + // case for a freshly generated wallet whose DKG result has not + // yet been approved on-chain; during that approval window the + // wallet exists only on the node's disk. Such a wallet must not + // be archived because it may be about to become active. Only + // wallets that were registered on-chain and are now closed or + // terminated should be archived, so leave this one in place. + continue + } - isRegistered, err := n.chain.IsWalletRegistered(walletID) - if err != nil { return fmt.Errorf( - "could not check if wallet is registered for wallet with ID "+ - "[0x%x]: [%v]", + "could not get on-chain data for wallet with public key "+ + "hash [0x%x]: [%v]", walletPublicKeyHash, err, ) } - if !isRegistered { - // If the wallet is no longer registered it means the wallet has - // been closed or terminated. + // The wallet is recorded on-chain, so it has been registered. Archive it + // only if it has reached a closed or terminated state. 
+ if walletChainData.State == StateClosed || + walletChainData.State == StateTerminated { err := n.walletRegistry.archiveWallet(walletPublicKeyHash) if err != nil { return fmt.Errorf( @@ -1312,10 +1341,10 @@ func (n *node) archiveClosedWallets() error { } logger.Infof( - "successfully archived wallet with ID [0x%x] and public key "+ - "hash [0x%x]", - walletID, + "successfully archived wallet with public key hash [0x%x] "+ + "in state [%v]", walletPublicKeyHash, + walletChainData.State, ) } } @@ -1385,6 +1414,17 @@ func (n *node) handleWalletClosure(walletID [32]byte) error { return fmt.Errorf("failed to archive the wallet: [%v]", err) } + // Evict any cached signing executor for the archived wallet so that the + // closure actually deauthorizes signing for it. getSigningExecutor consults + // the executor cache before the wallet registry, so a stale cached executor + // would otherwise keep the wallet signable until the process restarts. + if err := n.invalidateSigningExecutor(wallet.publicKey); err != nil { + return fmt.Errorf( + "failed to invalidate signing executor for archived wallet: [%v]", + err, + ) + } + logger.Infof( "successfully archived wallet with wallet ID [0x%x] and public key "+ "hash [0x%x]", diff --git a/pkg/tbtc/signing_done.go b/pkg/tbtc/signing_done.go index 58dfeccc83..65fe785df8 100644 --- a/pkg/tbtc/signing_done.go +++ b/pkg/tbtc/signing_done.go @@ -96,6 +96,17 @@ func (sdc *signingDoneCheck) listen( }) sdc.expectedSignersCount = len(attemptMembersIndexes) + // Record the members selected for this attempt so that done messages from + // members excluded from the current attempt can be rejected. This lookup is + // kept local to the listener goroutine below (captured by its closure and + // passed to isValidDoneMessage) instead of being stored on the shared + // signingDoneCheck struct. 
Keeping it out of shared state means concurrent + // listen calls on the same instance cannot race, and in particular cannot + // trigger a "concurrent map writes" fatal error, when populating it. + attemptMembers := make(map[group.MemberIndex]bool) + for _, memberIndex := range attemptMembersIndexes { + attemptMembers[memberIndex] = true + } sdc.doneSigners = make(map[group.MemberIndex]*signingDoneMessage) go func() { @@ -113,6 +124,7 @@ func (sdc *signingDoneCheck) listen( message, attemptNumber, attemptTimeoutBlock, + attemptMembers, ) { continue } @@ -169,30 +181,44 @@ func (sdc *signingDoneCheck) waitUntilAllDone(ctx context.Context) ( return nil, 0, errWaitDoneTimedOut case <-ticker.C: - if sdc.expectedSignersCount == len(sdc.doneSigners) { - var signature *tecdsa.Signature - var latestEndBlock uint64 - - for _, doneMessage := range sdc.doneSigners { - if signature == nil { - signature = doneMessage.signature - } else { - if !signature.Equals(doneMessage.signature) { - return nil, 0, fmt.Errorf( - "not matching signatures detected: [%v] and [%v]", - signature, - doneMessage.signature, - ) - } - } - - if doneMessage.endBlock > latestEndBlock { - latestEndBlock = doneMessage.endBlock - } + // Read doneSigners under the mutex to avoid a data race with the + // listen goroutine that writes to the same map. Results are captured + // into local variables so the lock can be released before returning. 
+ sdc.doneSignersMutex.Lock() + + if sdc.expectedSignersCount != len(sdc.doneSigners) { + sdc.doneSignersMutex.Unlock() + continue + } + + var signature *tecdsa.Signature + var latestEndBlock uint64 + var mismatchedSignature *tecdsa.Signature + + for _, doneMessage := range sdc.doneSigners { + if signature == nil { + signature = doneMessage.signature + } else if !signature.Equals(doneMessage.signature) { + mismatchedSignature = doneMessage.signature + break + } + + if doneMessage.endBlock > latestEndBlock { + latestEndBlock = doneMessage.endBlock } + } - return &signing.Result{Signature: signature}, latestEndBlock, nil + sdc.doneSignersMutex.Unlock() + + if mismatchedSignature != nil { + return nil, 0, fmt.Errorf( + "not matching signatures detected: [%v] and [%v]", + signature, + mismatchedSignature, + ) } + + return &signing.Result{Signature: signature}, latestEndBlock, nil } } } @@ -205,8 +231,11 @@ func (sdc *signingDoneCheck) isValidDoneMessage( message *big.Int, attemptNumber uint64, attemptTimeoutBlock uint64, + attemptMembers map[group.MemberIndex]bool, ) bool { + sdc.doneSignersMutex.Lock() _, signerDone := sdc.doneSigners[doneMessage.senderID] + sdc.doneSignersMutex.Unlock() if signerDone { // only one done message allowed return false @@ -219,6 +248,13 @@ func (sdc *signingDoneCheck) isValidDoneMessage( return false } + // The sender must have been selected for the current signing attempt. A + // valid wallet group member that was excluded from this attempt must not + // count toward its completion threshold. 
+ if !attemptMembers[doneMessage.senderID] { + return false + } + if doneMessage.message.Cmp(message) != 0 { return false } diff --git a/pkg/tbtc/signing_done_test.go b/pkg/tbtc/signing_done_test.go index 792edd6b68..4cda11f0ab 100644 --- a/pkg/tbtc/signing_done_test.go +++ b/pkg/tbtc/signing_done_test.go @@ -293,6 +293,166 @@ func TestSigningDoneCheck_AnotherSignature(t *testing.T) { } } +// TestSigningDoneCheck_ConcurrentDoneSignersAccess exercises the concurrent +// access to the doneSigners map: the listen goroutine writes to the map as done +// messages arrive while waitUntilAllDone reads it on every tick. It is meant to +// be run with the race detector (go test -race); without the doneSigners mutex +// guarding every access, the concurrent read/write is a data race. +// +// Done messages are broadcast with the standard, stateless retransmission +// strategy so this test isolates the doneSigners race and does not depend on +// the separate backoff-strategy synchronization. +func TestSigningDoneCheck_ConcurrentDoneSignersAccess(t *testing.T) { + groupParameters := &GroupParameters{ + GroupSize: 5, + GroupQuorum: 4, + HonestThreshold: 3, + } + + doneCheck := setupSigningDoneCheck(t, groupParameters) + + memberIndexes := make([]group.MemberIndex, doneCheck.groupSize) + for i := range memberIndexes { + memberIndexes[i] = group.MemberIndex(i + 1) + } + + ctx, cancelCtx := context.WithTimeout(context.Background(), 10*time.Second) + defer cancelCtx() + + message := big.NewInt(100) + attemptNumber := uint64(1) + attemptTimeoutBlock := uint64(1000) + attemptMemberIndexes := memberIndexes[:groupParameters.HonestThreshold] + result := &signing.Result{ + Signature: &tecdsa.Signature{ + R: big.NewInt(200), + S: big.NewInt(300), + RecoveryID: 2, + }, + } + + // Start the listener goroutine, which writes to doneSigners as messages + // arrive. 
+ doneCheck.listen( + ctx, + message, + attemptNumber, + attemptTimeoutBlock, + attemptMemberIndexes, + ) + + // Concurrently broadcast every attempt member's done message so the listener + // writes to doneSigners while waitUntilAllDone reads it below. + for _, memberIndex := range attemptMemberIndexes { + go func(memberIndex group.MemberIndex) { + _ = doneCheck.broadcastChannel.Send( + ctx, + &signingDoneMessage{ + senderID: memberIndex, + message: message, + attemptNumber: attemptNumber, + signature: result.Signature, + endBlock: 500 + uint64(memberIndex), + }, + net.StandardRetransmissionStrategy, + ) + }(memberIndex) + } + + returnedResult, _, err := doneCheck.waitUntilAllDone(ctx) + if err != nil { + t.Fatalf("unexpected error: [%v]", err) + } + if returnedResult == nil { + t.Fatal("unexpected nil result") + } + if !result.Signature.Equals(returnedResult.Signature) { + t.Errorf( + "unexpected signature\nexpected: [%v]\nactual: [%v]", + result.Signature, + returnedResult.Signature, + ) + } +} + +// TestSigningDoneCheck_RejectsNonAttemptMember covers the scenario where a +// valid wallet group member that was not selected for the current signing +// attempt sends a done message. Such a message must not count toward the +// attempt completion threshold; otherwise the check could complete before all +// selected signers finish. +func TestSigningDoneCheck_RejectsNonAttemptMember(t *testing.T) { + groupParameters := &GroupParameters{ + GroupSize: 5, + GroupQuorum: 4, + HonestThreshold: 3, + } + + doneCheck := setupSigningDoneCheck(t, groupParameters) + + memberIndexes := make([]group.MemberIndex, doneCheck.groupSize) + for i := range memberIndexes { + memberIndexes[i] = group.MemberIndex(i + 1) + } + + // The attempt selects members 1, 2, and 3. Member 5 is a valid wallet group + // member but is excluded from this attempt. 
+ attemptMemberIndexes := memberIndexes[:groupParameters.HonestThreshold] + nonAttemptMember := memberIndexes[groupParameters.GroupSize-1] + + ctx, cancelCtx := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancelCtx() + + message := big.NewInt(100) + attemptNumber := uint64(1) + attemptTimeoutBlock := uint64(1000) + result := &signing.Result{ + Signature: &tecdsa.Signature{ + R: big.NewInt(200), + S: big.NewInt(300), + RecoveryID: 2, + }, + } + + doneCheck.listen( + ctx, + message, + attemptNumber, + attemptTimeoutBlock, + attemptMemberIndexes, + ) + + // Two attempt members (1 and 2) plus the excluded member (5) send done + // messages. If the excluded member were counted, waitUntilAllDone would see + // three messages, match the expected count, and complete before the third + // attempt member (3) reports. + for _, memberIndex := range []group.MemberIndex{ + attemptMemberIndexes[0], + attemptMemberIndexes[1], + nonAttemptMember, + } { + err := doneCheck.signalDone( + ctx, + memberIndex, + message, + attemptNumber, + result, + 100, + ) + if err != nil { + t.Fatal(err) + } + } + + // The excluded member must not count toward the completion threshold, so + // waitUntilAllDone must time out rather than returning a result. + returnedResult, endBlock, err := doneCheck.waitUntilAllDone(ctx) + if returnedResult != nil { + t.Errorf("expected nil result, got [%v]", returnedResult) + } + testutils.AssertIntsEqual(t, "end block", 0, int(endBlock)) + testutils.AssertErrorsSame(t, errWaitDoneTimedOut, err) +} + // setupSigningDoneCheck sets up an instance of the signing done check ready // to perform test checks. 
func setupSigningDoneCheck( diff --git a/pkg/tbtc/tbtc.go b/pkg/tbtc/tbtc.go index 70a9756e98..e746dc1f90 100644 --- a/pkg/tbtc/tbtc.go +++ b/pkg/tbtc/tbtc.go @@ -85,6 +85,7 @@ func Initialize( clientInfo *clientinfo.Registry, perfMetrics *clientinfo.PerformanceMetrics, minActiveOutpointConfirmations uint, + bridgeCovenantFraudDefenseConfirmed bool, ) (covenantsigner.Engine, error) { groupParameters := &GroupParameters{ GroupSize: 100, @@ -172,161 +173,142 @@ func Initialize( _ = chain.OnDKGStarted(func(event *DKGStartedEvent) { go func() { - if ok := deduplicator.notifyDKGStarted( - event.Seed, - ); !ok { - logger.Infof( - "DKG started event with seed [0x%x] has been "+ - "already processed", - event.Seed, - ) - return - } - - confirmationBlock := event.BlockNumber + dkgStartedConfirmationBlocks - - logger.Infof( - "observed DKG started event with seed [0x%x] and "+ - "starting block [%v]; waiting for block [%v] to confirm", - event.Seed, - event.BlockNumber, - confirmationBlock, + // handleDKGStartedEvent records the deduplication entry as completed + // only once the local DKG join has been dispatched (or the event was + // authoritatively found unconfirmed), so a transient early return in + // the handler below leaves the event retryable on redelivery. 
+ handleDKGStartedEvent( + deduplicator, + event, + func(event *DKGStartedEvent) error { + confirmationBlock := event.BlockNumber + dkgStartedConfirmationBlocks + + logger.Infof( + "observed DKG started event with seed [0x%x] and "+ + "starting block [%v]; waiting for block [%v] to confirm", + event.Seed, + event.BlockNumber, + confirmationBlock, + ) + + if err := node.waitForBlockHeight(ctx, confirmationBlock); err != nil { + return fmt.Errorf( + "failed to confirm DKG started event: [%w]", + err, + ) + } + + dkgState, err := chain.GetDKGState() + if err != nil { + return fmt.Errorf("failed to check DKG state: [%w]", err) + } + + if dkgState != AwaitingResult { + logger.Infof( + "DKG started event with seed [0x%x] and starting "+ + "block [%v] was not confirmed", + event.Seed, + event.BlockNumber, + ) + + // The event was authoritatively determined to be + // unconfirmed; there is nothing to retry, so treat it as + // terminally handled. + return nil + } + + // Fetch all past DKG started events starting from one + // confirmation period before the original event's block. + // If there was a chain reorg, the event we received could be + // moved to a block with a lower number than the one + // we received. + pastEvents, err := chain.PastDKGStartedEvents( + &DKGStartedEventFilter{ + StartBlock: event.BlockNumber - dkgStartedConfirmationBlocks, + }, + ) + if err != nil { + return fmt.Errorf( + "failed to get past DKG started events: [%w]", + err, + ) + } + + // Should not happen but just in case. + if len(pastEvents) == 0 { + return fmt.Errorf("no past DKG started events") + } + + lastEvent := pastEvents[len(pastEvents)-1] + + logger.Infof( + "DKG started with seed [0x%x] at block [%v]", + lastEvent.Seed, + lastEvent.BlockNumber, + ) + + // The off-chain protocol should be started as close as + // possible to the current block or even further. 
Starting the + // off-chain protocol with a past block will likely cause a + // failure of the first attempt as the start block is used to + // synchronize the announcements and the state machine. Here we + // ensure a proper start point by delaying the execution by the + // confirmation period length. + node.joinDKGIfEligible( + lastEvent.Seed, + lastEvent.BlockNumber, + dkgStartedConfirmationBlocks, + ) + + // The local DKG join has been dispatched; the event is + // terminally handled. + return nil + }, ) - - err := node.waitForBlockHeight(ctx, confirmationBlock) - if err != nil { - logger.Errorf("failed to confirm DKG started event: [%v]", err) - return - } - - dkgState, err := chain.GetDKGState() - if err != nil { - logger.Errorf("failed to check DKG state: [%v]", err) - return - } - - if dkgState == AwaitingResult { - // Fetch all past DKG started events starting from one - // confirmation period before the original event's block. - // If there was a chain reorg, the event we received could be - // moved to a block with a lower number than the one - // we received. - pastEvents, err := chain.PastDKGStartedEvents( - &DKGStartedEventFilter{ - StartBlock: event.BlockNumber - dkgStartedConfirmationBlocks, - }, - ) - if err != nil { - logger.Errorf("failed to get past DKG started events: [%v]", err) - return - } - - // Should not happen but just in case. - if len(pastEvents) == 0 { - logger.Errorf("no past DKG started events") - return - } - - lastEvent := pastEvents[len(pastEvents)-1] - - logger.Infof( - "DKG started with seed [0x%x] at block [%v]", - lastEvent.Seed, - lastEvent.BlockNumber, - ) - - // The off-chain protocol should be started as close as possible - // to the current block or even further. Starting the off-chain - // protocol with a past block will likely cause a failure of the - // first attempt as the start block is used to synchronize - // the announcements and the state machine. 
Here we ensure - // a proper start point by delaying the execution by the - // confirmation period length. - node.joinDKGIfEligible( - lastEvent.Seed, - lastEvent.BlockNumber, - dkgStartedConfirmationBlocks, - ) - } else { - logger.Infof( - "DKG started event with seed [0x%x] and starting "+ - "block [%v] was not confirmed", - event.Seed, - event.BlockNumber, - ) - } }() }) _ = chain.OnDKGResultSubmitted(func(event *DKGResultSubmittedEvent) { go func() { - if ok := deduplicator.notifyDKGResultSubmitted( - event.Seed, - event.ResultHash, - event.BlockNumber, - ); !ok { - logger.Warnf( - "Result with hash [0x%x] for DKG with seed [0x%x] "+ - "and starting block [%v] has been already processed", - event.ResultHash, - event.Seed, - event.BlockNumber, - ) - return - } - - logger.Infof( - "Result with hash [0x%x] for DKG with seed [0x%x] "+ - "submitted at block [%v]", - event.ResultHash, - event.Seed, - event.BlockNumber, - ) - - node.validateDKG( - event.Seed, - event.BlockNumber, - event.Result, - event.ResultHash, + // handleDKGResultSubmittedEvent records the deduplication entry as + // completed only after validation reaches a terminal state, so a + // transient failure below leaves the event retryable on redelivery. 
+ handleDKGResultSubmittedEvent( + deduplicator, + event, + func(event *DKGResultSubmittedEvent) error { + return node.validateDKG( + event.Seed, + event.BlockNumber, + event.Result, + event.ResultHash, + ) + }, ) }() }) _ = chain.OnWalletClosed(func(event *WalletClosedEvent) { go func() { - if ok := deduplicator.notifyWalletClosed( - event.WalletID, - ); !ok { - logger.Warnf( - "Wallet closure for wallet with ID [0x%x] at block [%v] "+ - "has been already processed", - event.WalletID, - event.BlockNumber, - ) - return - } - - logger.Infof( - "Wallet with ID [0x%x] has been closed at block [%v]; "+ - "proceeding with handling wallet closure", - event.WalletID, - event.BlockNumber, - ) - - err := node.handleWalletClosure( - event.WalletID, + // handleWalletClosedEvent records the deduplication entry as + // completed only after the wallet has actually been archived, so a + // transient archival failure below leaves the event retryable on + // redelivery. + handleWalletClosedEvent( + deduplicator, + event, + func(event *WalletClosedEvent) error { + return node.handleWalletClosure(event.WalletID) + }, ) - if err != nil { - logger.Errorf( - "Failure while handling wallet closure with ID [0x%x]: [%v]", - event.WalletID, - err, - ) - } }() }) - return newCovenantSignerEngine(node, minActiveOutpointConfirmations), nil + return newCovenantSignerEngine( + node, + minActiveOutpointConfirmations, + bridgeCovenantFraudDefenseConfirmed, + ), nil } // enoughPreParamsInPoolPolicy is a policy that enforces the sufficient size diff --git a/pkg/tbtc/wallet_closed_handler.go b/pkg/tbtc/wallet_closed_handler.go new file mode 100644 index 0000000000..591fd75f36 --- /dev/null +++ b/pkg/tbtc/wallet_closed_handler.go @@ -0,0 +1,59 @@ +package tbtc + +// handleWalletClosedEvent runs the deduplication-guarded handling of a +// WalletClosed event. 
It claims the event, runs handle, and then either confirms +// the deduplication entry once the wallet has actually been archived (so +// subsequent deliveries of the same event are ignored as duplicates) or releases +// the claim when handle returns an error, so a later redelivery of the same +// event can retry the archival. +// +// handle performs the actual wallet-closure archival and returns a non-nil error +// when archival did not complete (e.g. a transient RPC error while waiting for +// closure confirmation, or a failed archive write). Releasing the claim on such +// a failure is what prevents a closed wallet from remaining signable through +// fresh getSigningExecutor lookups until process restart: the deduplication +// entry is recorded as completed only after the wallet is removed from the local +// registry. +func handleWalletClosedEvent( + deduplicator *deduplicator, + event *WalletClosedEvent, + handle func(event *WalletClosedEvent) error, +) { + if ok := deduplicator.notifyWalletClosed( + event.WalletID, + ); !ok { + logger.Warnf( + "Wallet closure for wallet with ID [0x%x] at block [%v] "+ + "has been already processed", + event.WalletID, + event.BlockNumber, + ) + return + } + + logger.Infof( + "Wallet with ID [0x%x] has been closed at block [%v]; "+ + "proceeding with handling wallet closure", + event.WalletID, + event.BlockNumber, + ) + + if err := handle(event); err != nil { + // Archival did not complete (e.g. a transient RPC error while + // waiting for closure confirmation, or a failed archive write). + // Release the deduplication claim so a later redelivery of the + // same event retries the archival instead of being dropped as + // an already-processed duplicate, which would otherwise leave + // the closed wallet signable in the local registry. 
+ logger.Errorf( + "Failure while handling wallet closure with ID [0x%x]; "+ + "allowing retry on event redelivery: [%v]", + event.WalletID, + err, + ) + deduplicator.abortWalletClosed(event.WalletID) + return + } + + deduplicator.confirmWalletClosed(event.WalletID) +} diff --git a/pkg/tbtc/wallet_closed_handler_test.go b/pkg/tbtc/wallet_closed_handler_test.go new file mode 100644 index 0000000000..e5ecd1cdd0 --- /dev/null +++ b/pkg/tbtc/wallet_closed_handler_test.go @@ -0,0 +1,97 @@ +package tbtc + +import ( + "fmt" + "testing" + + "github.com/keep-network/keep-core/internal/testutils" +) + +// TestHandleWalletClosedEvent_RetriesAfterArchivalFailure verifies the +// TOB-TBTCACEXT-57 fix at the callback level: when the first delivery of a +// WalletClosed event fails during archival, an identical redelivery must reach +// the archival handler again instead of being dropped as an already-processed +// duplicate. Once archival succeeds, a further redelivery must be ignored as a +// duplicate. +func TestHandleWalletClosedEvent_RetriesAfterArchivalFailure(t *testing.T) { + deduplicator := newDeduplicator() + + event := &WalletClosedEvent{ + WalletID: [32]byte{0x01, 0x02, 0x03}, + BlockNumber: 1000, + } + + var archivalCalls int + handle := func(*WalletClosedEvent) error { + archivalCalls++ + if archivalCalls == 1 { + // Simulate a transient failure while waiting for closure + // confirmation before the wallet could be archived. + return fmt.Errorf("transient archival failure") + } + return nil + } + + // First delivery: the event is claimed, archival fails, and the claim is + // released so the event stays retryable. + handleWalletClosedEvent(deduplicator, event, handle) + + // Redelivery of the identical event must reach the archival handler again + // rather than being dropped as an already-processed duplicate; otherwise the + // closed wallet would remain signable in the local registry. 
+ handleWalletClosedEvent(deduplicator, event, handle) + + testutils.AssertIntsEqual( + t, + "archival calls after a failed delivery and its retry", + 2, + archivalCalls, + ) + + // The retry succeeded and confirmed the event, so a further redelivery must + // be ignored as a duplicate and must not reach the archival handler again. + handleWalletClosedEvent(deduplicator, event, handle) + + testutils.AssertIntsEqual( + t, + "archival calls after the event was confirmed handled", + 2, + archivalCalls, + ) +} + +// TestHandleWalletClosedEvent_IgnoresConcurrentDuplicate verifies that while a +// WalletClosed event is being handled, a concurrent duplicate delivery of the +// same event is ignored rather than starting a second archival. +func TestHandleWalletClosedEvent_IgnoresConcurrentDuplicate(t *testing.T) { + deduplicator := newDeduplicator() + + event := &WalletClosedEvent{ + WalletID: [32]byte{0x0a, 0x0b, 0x0c}, + BlockNumber: 2000, + } + + var nestedCalls int + handle := func(*WalletClosedEvent) error { + // While this archival is in progress, a duplicate delivery must be + // dropped and must not enter the handler a second time. 
+ handleWalletClosedEvent( + deduplicator, + event, + func(*WalletClosedEvent) error { + nestedCalls++ + return nil + }, + ) + return nil + } + + handleWalletClosedEvent(deduplicator, event, handle) + + testutils.AssertIntsEqual( + t, + "handler calls triggered by an in-progress duplicate", + 0, + nestedCalls, + ) +} diff --git a/pkg/tecdsa/retry/retry.go b/pkg/tecdsa/retry/retry.go index 246e8b1fae..798d3bed30 100644 --- a/pkg/tecdsa/retry/retry.go +++ b/pkg/tecdsa/retry/retry.go @@ -305,7 +305,7 @@ func excludeOperatorTriplets( for k := j + 1; k < len(operators); k++ { leftOperator := operators[i] middleOperator := operators[j] - rightOperator := operators[j] + rightOperator := operators[k] // Only include the operators triples that have few enough seats such // that if they were excluded we still have at least diff --git a/pkg/tecdsa/retry/retry_test.go b/pkg/tecdsa/retry/retry_test.go index 24775c4c7e..30e15060d9 100644 --- a/pkg/tecdsa/retry/retry_test.go +++ b/pkg/tecdsa/retry/retry_test.go @@ -2,6 +2,7 @@ package retry import ( "fmt" + "math/rand" "reflect" "strings" "testing" @@ -249,3 +250,90 @@ func assertInvariants( affectedBySeed(t, groupMemberRandomizer, groupMembers, seed, retryCount, retryParticipantsCount) affectedByRetryCount(t, groupMemberRandomizer, groupMembers, seed, retryCount, retryParticipantsCount) } + +// TestExcludeOperatorTriplets_EligibilityFilterUsesThirdOperatorSeats verifies +// that the triplet eligibility filter subtracts the seat count of all three +// distinct operators in a triple. The filter must count the third operator's +// (operators[k]) seats; reusing the middle operator's (operators[j]) seats in +// its place double-counts the middle operator and ignores the third, which both +// admits triples whose true post-exclusion seat count is below +// retryParticipantsCount and drops valid triples. 
+func TestExcludeOperatorTriplets_EligibilityFilterUsesThirdOperatorSeats(t *testing.T) { + // Four operators with deliberately distinguishable seat counts. The last + // operator (D) controls far more seats than the others, so every triple that + // includes D must be rejected once D's seats are correctly subtracted. + operators := []chain.Address{"A", "B", "C", "D"} + operatorToSeatCount := map[chain.Address]uint{ + "A": 1, + "B": 1, + "C": 1, + "D": 10, + } + + // groupMembers is consistent with the seat counts above: 1+1+1+10 = 13 seats. + groupMembers := make([]chain.Address, 0, 13) + for _, operator := range operators { + for i := uint(0); i < operatorToSeatCount[operator]; i++ { + groupMembers = append(groupMembers, operator) + } + } + + // With retryParticipantsCount = 5, only the triple {A, B, C} leaves enough + // seats: 13 - 1 - 1 - 1 = 10 >= 5. Every triple that includes D leaves + // 13 - 1 - 1 - 10 = 1 < 5 and must be filtered out. The buggy arithmetic + // (subtracting the middle operator's seats twice and ignoring D's seats) + // would instead admit all four triples. + retryParticipantsCount := 5 + + rng := rand.New(rand.NewSource(1)) + + // An out-of-range index makes excludeOperatorTriplets report the number of + // eligible triplets without shuffling, which is exactly the value produced + // by the eligibility filter. + _, eligibleTripletCount, ok := excludeOperatorTriplets( + rng, + groupMembers, + 1<<30, + operatorToSeatCount, + operators, + retryParticipantsCount, + ) + if ok { + t.Fatal("expected excludeOperatorTriplets to report an out-of-range index") + } + if eligibleTripletCount != 1 { + t.Fatalf( + "unexpected eligible triplet count\nexpected: [1] (only {A, B, C})\nactual: [%d]", + eligibleTripletCount, + ) + } + + // The single eligible triplet must be {A, B, C}; excluding it leaves only + // D's seats. If the filter had admitted a triple containing D, the resulting + // subset would still contain one of A, B, or C. 
+ subset, _, ok := excludeOperatorTriplets( + rng, + groupMembers, + 0, + operatorToSeatCount, + operators, + retryParticipantsCount, + ) + if !ok { + t.Fatal("expected excludeOperatorTriplets to select the eligible triplet") + } + for _, operator := range subset { + if operator != "D" { + t.Errorf( + "subset should exclude {A, B, C} and contain only D seats, found [%s]", + operator, + ) + } + } + if len(subset) != 10 { + t.Errorf( + "unexpected subset size\nexpected: [10] (all D seats)\nactual: [%d]", + len(subset), + ) + } +} diff --git a/solidity-v1/contracts/TokenStakingEscrow.sol b/solidity-v1/contracts/TokenStakingEscrow.sol index e7a1564d2e..5192742ca7 100644 --- a/solidity-v1/contracts/TokenStakingEscrow.sol +++ b/solidity-v1/contracts/TokenStakingEscrow.sol @@ -442,7 +442,12 @@ contract TokenStakingEscrow is Ownable { address grantManager ) internal { uint256 amount = availableAmount(operator); - deposits[operator].withdrawn = amount; + // Add to the existing withdrawn counter instead of overwriting it, so + // that a prior partial withdrawal is not discarded. Overwriting would + // reset availableAmount and let the revoked deposit be withdrawn + // repeatedly, draining the escrow's shared token balance. This mirrors + // the accounting used by withdraw and migrate. 
+ deposits[operator].withdrawn = deposit.withdrawn.add(amount); keepToken.safeTransfer(grantManager, amount); emit RevokedDepositWithdrawn(operator, grantManager, amount); diff --git a/solidity-v1/test/token_stake/TestTokenStakingEscrow.js b/solidity-v1/test/token_stake/TestTokenStakingEscrow.js index c135e62b4b..735d86fe86 100644 --- a/solidity-v1/test/token_stake/TestTokenStakingEscrow.js +++ b/solidity-v1/test/token_stake/TestTokenStakingEscrow.js @@ -565,6 +565,28 @@ describe("TokenStakingEscrow", () => { ) }) + it("does not allow a second withdrawal after a partial withdraw", async () => { + // Withdraw part of the unlocked deposit first, so the withdrawn counter + // is non-zero before the grant is revoked. + await time.increaseTo(grantStart.add(time.duration.days(15))) + await escrow.withdraw(operator, { from: operator }) // (300k / 30) * 15 = 150k KEEP + await tokenGrant.revoke(grantId, { from: grantManager }) + + // The first revoked withdrawal transfers the remaining 150k. + await escrow.withdrawRevoked(operator, { from: grantManager }) + + // A second revoked withdrawal must transfer nothing. Overwriting the + // withdrawn counter instead of adding to it would discard the prior + // partial withdrawal and let the same tokens be withdrawn again, draining + // the escrow's shared token balance. + const balanceBefore = await token.balanceOf(grantManager) + await escrow.withdrawRevoked(operator, { from: grantManager }) + const balanceAfter = await token.balanceOf(grantManager) + + const diff = balanceAfter.sub(balanceBefore) + expect(diff).to.eq.BN(0) + }) + it("withdraws entire deposited amount if nothing has been withdrawn before", async () => { await time.increaseTo(grantStart.add(time.duration.days(15))) await tokenGrant.revoke(grantId, { from: grantManager })