Files
kalzu-value-store/cluster/gossip.go
ryyst c7dcebb894 feat: implement secure cluster authentication (issue #13)
Implemented a comprehensive secure authentication mechanism for inter-node
cluster communication with the following features:

1. Global Cluster Secret (GCS)
   - Auto-generated cryptographically secure random secret (256-bit)
   - Configurable via YAML config file
   - Shared across all cluster nodes for authentication

2. Cluster Authentication Middleware
   - Validates X-Cluster-Secret and X-Node-ID headers
   - Applied to all cluster endpoints (/members/*, /merkle_tree/*, /kv_range)
   - Comprehensive logging of authentication attempts

3. Authenticated HTTP Client
   - Custom HTTP client with cluster auth headers
   - TLS support with configurable certificate verification
   - Protocol-aware (http/https based on TLS settings)

4. Secure Bootstrap Endpoint
   - New /auth/cluster-bootstrap endpoint
   - Protected by JWT authentication with admin scope
   - Allows new nodes to securely obtain cluster secret

5. Updated Cluster Communication
   - All gossip protocol requests include auth headers
   - All Merkle tree sync requests include auth headers
   - All data replication requests include auth headers

6. Configuration
   - cluster_secret: Shared secret (auto-generated if not provided)
   - cluster_tls_enabled: Enable TLS for inter-node communication
   - cluster_tls_cert_file: Path to TLS certificate
   - cluster_tls_key_file: Path to TLS private key
   - cluster_tls_skip_verify: Skip TLS verification (testing only)

This implementation addresses the security vulnerability of unprotected
cluster endpoints and provides a flexible, secure approach to protecting
internal cluster communication while allowing for automated node bootstrapping.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-02 22:19:40 +03:00

313 lines
8.4 KiB
Go

package cluster
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"sync"
"time"
"github.com/sirupsen/logrus"
"kvs/types"
)
// GossipService handles gossip protocol operations
type GossipService struct {
config *types.Config
members map[string]*types.Member
membersMu sync.RWMutex
logger *logrus.Logger
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// NewGossipService creates a new gossip service
func NewGossipService(config *types.Config, logger *logrus.Logger) *GossipService {
ctx, cancel := context.WithCancel(context.Background())
return &GossipService{
config: config,
members: make(map[string]*types.Member),
logger: logger,
ctx: ctx,
cancel: cancel,
}
}
// Start begins the gossip routine
func (s *GossipService) Start() {
if !s.config.ClusteringEnabled {
s.logger.Info("Clustering disabled, skipping gossip routine")
return
}
s.wg.Add(1)
go s.gossipRoutine()
}
// Stop terminates the gossip service
func (s *GossipService) Stop() {
s.cancel()
s.wg.Wait()
}
// AddMember adds a member to the gossip member list
func (s *GossipService) AddMember(member *types.Member) {
s.membersMu.Lock()
defer s.membersMu.Unlock()
s.members[member.ID] = member
s.logger.WithFields(logrus.Fields{
"node_id": member.ID,
"address": member.Address,
}).Info("Member added")
}
// RemoveMember removes a member from the gossip member list
func (s *GossipService) RemoveMember(nodeID string) {
s.membersMu.Lock()
defer s.membersMu.Unlock()
if member, exists := s.members[nodeID]; exists {
delete(s.members, nodeID)
s.logger.WithFields(logrus.Fields{
"node_id": member.ID,
"address": member.Address,
}).Info("Member removed")
}
}
// GetMembers returns a copy of all members
func (s *GossipService) GetMembers() []*types.Member {
s.membersMu.RLock()
defer s.membersMu.RUnlock()
members := make([]*types.Member, 0, len(s.members))
for _, member := range s.members {
members = append(members, member)
}
return members
}
// GetHealthyMembers returns members that have been seen recently
func (s *GossipService) GetHealthyMembers() []*types.Member {
s.membersMu.RLock()
defer s.membersMu.RUnlock()
now := time.Now().UnixMilli()
healthyMembers := make([]*types.Member, 0)
for _, member := range s.members {
// Consider member healthy if last seen within last 5 minutes
if now-member.LastSeen < 5*60*1000 {
healthyMembers = append(healthyMembers, member)
}
}
return healthyMembers
}
// gossipRoutine runs periodically to exchange member lists
func (s *GossipService) gossipRoutine() {
defer s.wg.Done()
for {
// Random interval between 1-2 minutes
minInterval := time.Duration(s.config.GossipIntervalMin) * time.Second
maxInterval := time.Duration(s.config.GossipIntervalMax) * time.Second
interval := minInterval + time.Duration(rand.Int63n(int64(maxInterval-minInterval)))
select {
case <-s.ctx.Done():
return
case <-time.After(interval):
s.performGossipRound()
}
}
}
// performGossipRound performs a gossip round with random healthy peers
func (s *GossipService) performGossipRound() {
members := s.GetHealthyMembers()
if len(members) == 0 {
s.logger.Debug("No healthy members for gossip round")
return
}
// Select 1-3 random peers for gossip
maxPeers := 3
if len(members) < maxPeers {
maxPeers = len(members)
}
// Shuffle and select
rand.Shuffle(len(members), func(i, j int) {
members[i], members[j] = members[j], members[i]
})
selectedPeers := members[:rand.Intn(maxPeers)+1]
for _, peer := range selectedPeers {
go s.gossipWithPeer(peer)
}
}
// gossipWithPeer performs gossip with a specific peer
func (s *GossipService) gossipWithPeer(peer *types.Member) error {
s.logger.WithField("peer", peer.Address).Debug("Starting gossip with peer")
// Get our current member list
localMembers := s.GetMembers()
// Send our member list to the peer
gossipData := make([]types.Member, len(localMembers))
for i, member := range localMembers {
gossipData[i] = *member
}
// Add ourselves to the list
selfMember := types.Member{
ID: s.config.NodeID,
Address: fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port),
LastSeen: time.Now().UnixMilli(),
JoinedTimestamp: s.GetJoinedTimestamp(),
}
gossipData = append(gossipData, selfMember)
jsonData, err := json.Marshal(gossipData)
if err != nil {
s.logger.WithError(err).Error("Failed to marshal gossip data")
return err
}
// Send HTTP request to peer with cluster authentication
client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/members/gossip", protocol, peer.Address)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
s.logger.WithError(err).Error("Failed to create gossip request")
return err
}
req.Header.Set("Content-Type", "application/json")
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil {
s.logger.WithFields(logrus.Fields{
"peer": peer.Address,
"error": err.Error(),
}).Warn("Failed to gossip with peer")
s.markPeerUnhealthy(peer.ID)
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
s.logger.WithFields(logrus.Fields{
"peer": peer.Address,
"status": resp.StatusCode,
}).Warn("Gossip request failed")
s.markPeerUnhealthy(peer.ID)
return fmt.Errorf("gossip request failed with status %d", resp.StatusCode)
}
// Process response - peer's member list
var remoteMemberList []types.Member
if err := json.NewDecoder(resp.Body).Decode(&remoteMemberList); err != nil {
s.logger.WithError(err).Error("Failed to decode gossip response")
return err
}
// Merge remote member list with our local list
s.MergeMemberList(remoteMemberList, s.config.NodeID)
// Update peer's last seen timestamp
s.updateMemberLastSeen(peer.ID, time.Now().UnixMilli())
s.logger.WithField("peer", peer.Address).Debug("Completed gossip with peer")
return nil
}
// markPeerUnhealthy marks a peer as unhealthy
func (s *GossipService) markPeerUnhealthy(nodeID string) {
s.membersMu.Lock()
defer s.membersMu.Unlock()
if member, exists := s.members[nodeID]; exists {
// Mark as last seen a long time ago to indicate unhealthy
member.LastSeen = time.Now().UnixMilli() - 10*60*1000 // 10 minutes ago
s.logger.WithField("node_id", nodeID).Warn("Marked peer as unhealthy")
}
}
// updateMemberLastSeen updates member's last seen timestamp
func (s *GossipService) updateMemberLastSeen(nodeID string, timestamp int64) {
s.membersMu.Lock()
defer s.membersMu.Unlock()
if member, exists := s.members[nodeID]; exists {
member.LastSeen = timestamp
}
}
// MergeMemberList merges remote member list with local member list
func (s *GossipService) MergeMemberList(remoteMembers []types.Member, selfNodeID string) {
s.membersMu.Lock()
defer s.membersMu.Unlock()
now := time.Now().UnixMilli()
for _, remoteMember := range remoteMembers {
// Skip ourselves
if remoteMember.ID == selfNodeID {
continue
}
if localMember, exists := s.members[remoteMember.ID]; exists {
// Update existing member
if remoteMember.LastSeen > localMember.LastSeen {
localMember.LastSeen = remoteMember.LastSeen
}
// Keep the earlier joined timestamp
if remoteMember.JoinedTimestamp < localMember.JoinedTimestamp {
localMember.JoinedTimestamp = remoteMember.JoinedTimestamp
}
} else {
// Add new member
newMember := &types.Member{
ID: remoteMember.ID,
Address: remoteMember.Address,
LastSeen: remoteMember.LastSeen,
JoinedTimestamp: remoteMember.JoinedTimestamp,
}
s.members[remoteMember.ID] = newMember
s.logger.WithFields(logrus.Fields{
"node_id": remoteMember.ID,
"address": remoteMember.Address,
}).Info("Discovered new member through gossip")
}
}
// Clean up old members (not seen for more than 10 minutes)
toRemove := make([]string, 0)
for nodeID, member := range s.members {
if now-member.LastSeen > 10*60*1000 { // 10 minutes
toRemove = append(toRemove, nodeID)
}
}
for _, nodeID := range toRemove {
delete(s.members, nodeID)
s.logger.WithField("node_id", nodeID).Info("Removed stale member")
}
}
// GetJoinedTimestamp placeholder - would be implemented by the server
func (s *GossipService) GetJoinedTimestamp() int64 {
// This should be implemented by the server that uses this service
return time.Now().UnixMilli()
}