@@ -106,18 +106,17 @@ func (s *runtimeClusterManager) updateClusterConfiguration(ctx context.Context,
106106 return nil
107107}
108108
109- func (s * runtimeClusterManager ) runLeaderElection (ctx context.Context , agencyClient agency. Agency , myURL string ) {
110- le := election .NewLeaderElectionCell [string ](agencyClient , masterURLKey , masterURLTTL )
109+ func (s * runtimeClusterManager ) runLeaderElection (ctx context.Context , myURL string ) {
110+ le := election .NewLeaderElectionCell [string ](masterURLKey , masterURLTTL )
111111
112- var err error
113- var delay time.Duration
112+ delay := time .Microsecond
114113 resignErrBackoff := backoff .NewExponentialBackOff ()
115114 for {
116115 timer := time .NewTimer (delay )
117116 // Wait a bit
118117 select {
119118 case <- timer .C :
120- // Delay over, just continue
119+ // Delay over, just continue
121120 case <- ctx .Done ():
122121 // We're asked to stop
123122 if ! timer .Stop () {
@@ -126,11 +125,18 @@ func (s *runtimeClusterManager) runLeaderElection(ctx context.Context, agencyCli
126125 return
127126 }
128127
128+ agencyClient , err := s .createAgencyAPI ()
129+ if err != nil {
130+ delay = time .Second
131+ s .log .Debug ().Err (err ).Msgf ("could not create agency client. Retrying in %s" , delay )
132+ continue
133+ }
134+
129135 oldMasterURL := s .GetMasterURL ()
130136 if s .avoidBeingMaster {
131137 if oldMasterURL == "" {
132138 s .log .Debug ().Msg ("Initializing master URL before resigning" )
133- currMasterURL , err := le .Read (ctx )
139+ currMasterURL , err := le .Read (ctx , agencyClient )
134140 if err != nil {
135141 delay = 5 * time .Second
136142 s .log .Err (err ).Msgf ("Failed to read current value before resigning. Retrying in %s" , delay )
@@ -140,7 +146,7 @@ func (s *runtimeClusterManager) runLeaderElection(ctx context.Context, agencyCli
140146 }
141147
142148 s .log .Debug ().Str ("master_url" , myURL ).Msgf ("Resigning leadership" )
143- err = le .Resign (ctx )
149+ err = le .Resign (ctx , agencyClient )
144150 if err != nil {
145151 delay = resignErrBackoff .NextBackOff ()
146152 s .log .Err (err ).Msgf ("Resigning leadership failed. Retrying in %s" , delay )
@@ -157,7 +163,7 @@ func (s *runtimeClusterManager) runLeaderElection(ctx context.Context, agencyCli
157163 s .log .Debug ().
158164 Str ("master_url" , myURL ).
159165 Msg ("Updating leadership" )
160- masterURL , isMaster , delay , err = le .Update (ctx , myURL )
166+ masterURL , isMaster , delay , err = le .Update (ctx , agencyClient , myURL )
161167 if err != nil {
162168 delay = 5 * time .Second
163169 s .log .Error ().Err (err ).Msgf ("Update leader election failed. Retrying in %s" , delay )
@@ -166,16 +172,15 @@ func (s *runtimeClusterManager) runLeaderElection(ctx context.Context, agencyCli
166172 if isMaster && masterURL != myURL {
167173 s .log .Error ().Msgf ("Unexpected error: this peer is a master but URL differs. Should be %s got %s" , myURL , masterURL )
168174 }
175+ if ! isMaster && masterURL == myURL {
176+ s .log .Error ().Msgf ("Unexpected error: this peer is not a master but URL in agency is mine" )
177+ }
169178
170179 s .updateMasterURL (masterURL , isMaster )
171180 }
172181}
173182
174183func (s * runtimeClusterManager ) updateMasterURL (masterURL string , isMaster bool ) {
175- s .log .Debug ().
176- Str ("new_master_url" , masterURL ).
177- Bool ("is_master" , isMaster ).
178- Msg ("Leadership updated" )
179184 newState := stateRunningSlave
180185 if isMaster {
181186 newState = stateRunningMaster
@@ -215,16 +220,11 @@ func (s *runtimeClusterManager) Run(ctx context.Context, log zerolog.Logger, run
215220 return
216221 }
217222
218- agencyClient , err := s .createAgencyAPI ()
219- if err != nil {
220- log .Error ().Msg ("Could not create agency API client" )
221- return
222- }
223223 ownURL := myPeer .CreateStarterURL ("/" )
224- go s .runLeaderElection (ctx , agencyClient , ownURL )
224+ go s .runLeaderElection (ctx , ownURL )
225225
226226 for {
227- var delay time.Duration
227+ delay := time .Microsecond
228228 // Loop until stopping
229229 if ctx .Err () != nil {
230230 // Stop requested
@@ -243,7 +243,7 @@ func (s *runtimeClusterManager) Run(ctx context.Context, log zerolog.Logger, run
243243 delay = time .Second * 15
244244 }
245245 } else {
246- // we are still leading, check again later
246+ // we are still leading or not initialized , check again later
247247 delay = time .Second * 5
248248 }
249249
0 commit comments