secure-cluster-communication #14
@@ -82,10 +82,19 @@ func (s *BootstrapService) attemptJoin(seedAddr string) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
client := &http.Client{Timeout: 10 * time.Second}
|
client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
|
||||||
url := fmt.Sprintf("http://%s/members/join", seedAddr)
|
protocol := GetProtocol(s.config)
|
||||||
|
url := fmt.Sprintf("%s://%s/members/join", protocol, seedAddr)
|
||||||
|
|
||||||
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
|
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||||
|
if err != nil {
|
||||||
|
s.logger.WithError(err).Error("Failed to create join request")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
AddClusterAuthHeaders(req, s.config)
|
||||||
|
|
||||||
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.WithFields(logrus.Fields{
|
s.logger.WithFields(logrus.Fields{
|
||||||
"seed": seedAddr,
|
"seed": seedAddr,
|
||||||
|
@@ -124,7 +124,10 @@ EOF
|
|||||||
# Test 3: Cluster formation
|
# Test 3: Cluster formation
|
||||||
test_cluster_formation() {
|
test_cluster_formation() {
|
||||||
test_start "2-node cluster formation and Merkle Tree replication"
|
test_start "2-node cluster formation and Merkle Tree replication"
|
||||||
|
|
||||||
|
# Shared cluster secret for authentication (Issue #13)
|
||||||
|
local CLUSTER_SECRET="test-cluster-secret-12345678901234567890"
|
||||||
|
|
||||||
# Node 1 config
|
# Node 1 config
|
||||||
cat > cluster1.yaml <<EOF
|
cat > cluster1.yaml <<EOF
|
||||||
node_id: "cluster-1"
|
node_id: "cluster-1"
|
||||||
@@ -138,8 +141,9 @@ gossip_interval_max: 10
|
|||||||
sync_interval: 10
|
sync_interval: 10
|
||||||
allow_anonymous_read: true
|
allow_anonymous_read: true
|
||||||
allow_anonymous_write: true
|
allow_anonymous_write: true
|
||||||
|
cluster_secret: "$CLUSTER_SECRET"
|
||||||
EOF
|
EOF
|
||||||
|
|
||||||
# Node 2 config
|
# Node 2 config
|
||||||
cat > cluster2.yaml <<EOF
|
cat > cluster2.yaml <<EOF
|
||||||
node_id: "cluster-2"
|
node_id: "cluster-2"
|
||||||
@@ -153,6 +157,7 @@ gossip_interval_max: 10
|
|||||||
sync_interval: 10
|
sync_interval: 10
|
||||||
allow_anonymous_read: true
|
allow_anonymous_read: true
|
||||||
allow_anonymous_write: true
|
allow_anonymous_write: true
|
||||||
|
cluster_secret: "$CLUSTER_SECRET"
|
||||||
EOF
|
EOF
|
||||||
|
|
||||||
# Start nodes
|
# Start nodes
|
||||||
@@ -230,15 +235,18 @@ EOF
|
|||||||
# but same path. The Merkle tree sync should then trigger conflict resolution.
|
# but same path. The Merkle tree sync should then trigger conflict resolution.
|
||||||
test_conflict_resolution() {
|
test_conflict_resolution() {
|
||||||
test_start "Conflict resolution test (Merkle Tree based)"
|
test_start "Conflict resolution test (Merkle Tree based)"
|
||||||
|
|
||||||
# Create conflicting data using our utility
|
# Create conflicting data using our utility
|
||||||
rm -rf conflict1_data conflict2_data 2>/dev/null || true
|
rm -rf conflict1_data conflict2_data 2>/dev/null || true
|
||||||
mkdir -p conflict1_data conflict2_data
|
mkdir -p conflict1_data conflict2_data
|
||||||
|
|
||||||
cd "$SCRIPT_DIR"
|
cd "$SCRIPT_DIR"
|
||||||
if go run test_conflict.go "$TEST_DIR/conflict1_data" "$TEST_DIR/conflict2_data"; then
|
if go run test_conflict.go "$TEST_DIR/conflict1_data" "$TEST_DIR/conflict2_data"; then
|
||||||
cd "$TEST_DIR"
|
cd "$TEST_DIR"
|
||||||
|
|
||||||
|
# Shared cluster secret for authentication (Issue #13)
|
||||||
|
local CLUSTER_SECRET="conflict-cluster-secret-1234567890123"
|
||||||
|
|
||||||
# Create configs
|
# Create configs
|
||||||
cat > conflict1.yaml <<EOF
|
cat > conflict1.yaml <<EOF
|
||||||
node_id: "conflict-1"
|
node_id: "conflict-1"
|
||||||
@@ -250,8 +258,9 @@ log_level: "info"
|
|||||||
sync_interval: 3
|
sync_interval: 3
|
||||||
allow_anonymous_read: true
|
allow_anonymous_read: true
|
||||||
allow_anonymous_write: true
|
allow_anonymous_write: true
|
||||||
|
cluster_secret: "$CLUSTER_SECRET"
|
||||||
EOF
|
EOF
|
||||||
|
|
||||||
cat > conflict2.yaml <<EOF
|
cat > conflict2.yaml <<EOF
|
||||||
node_id: "conflict-2"
|
node_id: "conflict-2"
|
||||||
bind_address: "127.0.0.1"
|
bind_address: "127.0.0.1"
|
||||||
@@ -262,6 +271,7 @@ log_level: "info"
|
|||||||
sync_interval: 3
|
sync_interval: 3
|
||||||
allow_anonymous_read: true
|
allow_anonymous_read: true
|
||||||
allow_anonymous_write: true
|
allow_anonymous_write: true
|
||||||
|
cluster_secret: "$CLUSTER_SECRET"
|
||||||
EOF
|
EOF
|
||||||
|
|
||||||
# Start nodes
|
# Start nodes
|
||||||
|
@@ -43,9 +43,11 @@ func (s *Server) setupRoutes() *mux.Router {
|
|||||||
|
|
||||||
// Member endpoints (available when clustering is enabled)
|
// Member endpoints (available when clustering is enabled)
|
||||||
if s.config.ClusteringEnabled {
|
if s.config.ClusteringEnabled {
|
||||||
// Apply cluster authentication middleware if cluster secret is configured
|
// GET /members/ is unprotected for monitoring/inspection
|
||||||
|
router.HandleFunc("/members/", s.getMembersHandler).Methods("GET")
|
||||||
|
|
||||||
|
// Apply cluster authentication middleware to all cluster communication endpoints
|
||||||
if s.clusterAuthService != nil {
|
if s.clusterAuthService != nil {
|
||||||
router.Handle("/members/", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMembersHandler))).Methods("GET")
|
|
||||||
router.Handle("/members/join", s.clusterAuthService.Middleware(http.HandlerFunc(s.joinMemberHandler))).Methods("POST")
|
router.Handle("/members/join", s.clusterAuthService.Middleware(http.HandlerFunc(s.joinMemberHandler))).Methods("POST")
|
||||||
router.Handle("/members/leave", s.clusterAuthService.Middleware(http.HandlerFunc(s.leaveMemberHandler))).Methods("DELETE")
|
router.Handle("/members/leave", s.clusterAuthService.Middleware(http.HandlerFunc(s.leaveMemberHandler))).Methods("DELETE")
|
||||||
router.Handle("/members/gossip", s.clusterAuthService.Middleware(http.HandlerFunc(s.gossipHandler))).Methods("POST")
|
router.Handle("/members/gossip", s.clusterAuthService.Middleware(http.HandlerFunc(s.gossipHandler))).Methods("POST")
|
||||||
@@ -57,7 +59,6 @@ func (s *Server) setupRoutes() *mux.Router {
|
|||||||
router.Handle("/kv_range", s.clusterAuthService.Middleware(http.HandlerFunc(s.getKVRangeHandler))).Methods("POST")
|
router.Handle("/kv_range", s.clusterAuthService.Middleware(http.HandlerFunc(s.getKVRangeHandler))).Methods("POST")
|
||||||
} else {
|
} else {
|
||||||
// Fallback to unprotected endpoints (for backwards compatibility)
|
// Fallback to unprotected endpoints (for backwards compatibility)
|
||||||
router.HandleFunc("/members/", s.getMembersHandler).Methods("GET")
|
|
||||||
router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
|
router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
|
||||||
router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
|
router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
|
||||||
router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
|
router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
|
||||||
|
Reference in New Issue
Block a user