client, err := clientv3.New(clientv3.Config{ DialTimeout: 15 * time.Second, DialKeepAliveTime: 15 * time.Second, DialKeepAliveTimeout: 15 * time.Second, DialOptions: []grpc.DialOption{ grpc.WithBlock(), // block until the underlying connection is up // use chained interceptors so that the default (retry and backoff) interceptors are added. // otherwise they will be overwritten by the metric interceptor. // // these optional interceptors will be placed after the default ones. // which seems to be what we want as the metrics will be collected on each attempt (retry) grpc.WithChainUnaryInterceptor(grpcprom.UnaryClientInterceptor), grpc.WithChainStreamInterceptor(grpcprom.StreamClientInterceptor), }, Endpoints: []string{"https://127.0.0.1:2379"}, TLS: tlsConfig, }) if err != nil { panic(err) }
// Watch posts a watch request to run() and waits for a new watcher channel func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan { ow := opWatch(key, opts...)
var filters []pb.WatchCreateRequest_FilterType if ow.filterPut { filters = append(filters, pb.WatchCreateRequest_NOPUT) } if ow.filterDelete { filters = append(filters, pb.WatchCreateRequest_NODELETE) }
func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 { minRev := int64(math.MaxInt64) for w := range wg.watchers { if w.minRev > curRev { // after network partition, possibly choosing future revision watcher from restore operation // with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2" // do not panic when such watcher had been moved from "synced" watcher during restore operation if !w.restore { panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev)) }
// mark 'restore' done, since it's chosen w.restore = false } if w.minRev < compactRev { select { case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}: w.compacted = true wg.delete(w) default: // retry next time } continue } if minRev > w.minRev { minRev = w.minRev } } return minRev }
// in order to find key-value pairs from unsynced watchers, we need to // find min revision index, and these revisions can be used to // query the backend store of key-value pairs curRev := s.store.currentRev compactionRev := s.store.compactMainRev
// UnsafeRange returns keys and values. And in boltdb, keys are revisions. // values are actual key-value pairs in backend. tx := s.store.b.ReadTx() tx.RLock() revs, vs := tx.UnsafeRange(buckets.Key, minBytes, maxBytes, 0) evs := kvsToEvents(s.store.lg, wg, revs, vs) // Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy. // We can only unlock after Unmarshal, which will do deep copy. // Otherwise we will trigger SIGSEGV during boltdb re-mmap. tx.RUnlock()
var victims watcherBatch wb := newWatcherBatch(wg, evs) for w := range wg.watchers { w.minRev = curRev + 1