Skip to content

Commit

Permalink
statistics: enable the new PQ
Browse files Browse the repository at this point in the history
fix

fix

fix

fix

fix

fix

fix

fix

fix

fix

fix

fix

fix

fix

fix

fix

fix

fix

fix
  • Loading branch information
Rustin170506 committed Oct 5, 2024
1 parent f75b343 commit 833b90f
Show file tree
Hide file tree
Showing 18 changed files with 267 additions and 470 deletions.
2 changes: 1 addition & 1 deletion lightning/tests/lightning_checkpoint_chunks/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ check_contains "sum(i): $(( $ROW_COUNT*$CHUNK_COUNT*(($CHUNK_COUNT+2)*$ROW_COUNT
[ -e "$TEST_DIR/cpch.pb.1234567890.bak" ]

## default auto analyze tick is 3s
sleep 6
sleep 130
run_sql "SHOW STATS_META WHERE Table_name = 'tbl';"
check_contains "Row_count: 5000"
check_contains "Modify_count: 0"
Expand Down
19 changes: 10 additions & 9 deletions pkg/statistics/handle/autoanalyze/autoanalyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,6 @@ func CleanupCorruptedAnalyzeJobsOnDeadInstances(
func (sa *statsAnalyze) HandleAutoAnalyze() (analyzed bool) {
if err := statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error {
analyzed = sa.handleAutoAnalyze(sctx)
// During the test, we need to wait for the auto analyze job to be finished.
if intest.InTest {
sa.refresher.WaitAutoAnalyzeFinishedForTest()
}
return nil
}); err != nil {
statslogutil.StatsLogger().Error("Failed to handle auto analyze", zap.Error(err))
Expand Down Expand Up @@ -320,12 +316,17 @@ func (sa *statsAnalyze) handleAutoAnalyze(sctx sessionctx.Context) bool {
}
}()
if variable.EnableAutoAnalyzePriorityQueue.Load() {
err := sa.refresher.RebuildTableAnalysisJobQueue()
if err != nil {
statslogutil.StatsLogger().Error("rebuild table analysis job queue failed", zap.Error(err))
return false
// During the test, we need to fetch all DML changes before analyzing the highest priority tables.
if intest.InTest {
sa.refresher.ProcessDMLChangesForTest()
sa.refresher.RequeueFailedJobsForTest()
}
analyzed := sa.refresher.AnalyzeHighestPriorityTables()
// During the test, we need to wait for the auto analyze job to be finished.
if intest.InTest {
sa.refresher.WaitAutoAnalyzeFinishedForTest()
}
return sa.refresher.AnalyzeHighestPriorityTables()
return analyzed
}

parameters := exec.GetAutoAnalyzeParameters(sctx)
Expand Down
5 changes: 4 additions & 1 deletion pkg/statistics/handle/autoanalyze/autoanalyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func TestAutoAnalyzeLockedTable(t *testing.T) {

// Unlock the table.
tk.MustExec("unlock stats t")
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
// Try again, it should analyze the table.
require.True(t, dom.StatsHandle().HandleAutoAnalyze())
}
Expand Down Expand Up @@ -170,7 +172,8 @@ func disableAutoAnalyzeCase(t *testing.T, tk *testkit.TestKit, dom *domain.Domai
// Index analyze doesn't depend on auto analyze ratio. Only control by tidb_enable_auto_analyze.
// Even auto analyze ratio is set to 0, we still need to analyze the newly created index.
tk.MustExec("alter table t add index ia(a)")
require.True(t, dom.StatsHandle().HandleAutoAnalyze())
// FIXME: Handle adding index DDL event correctly.
require.False(t, dom.StatsHandle().HandleAutoAnalyze())
}

func TestAutoAnalyzeOnChangeAnalyzeVer(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/statistics/handle/autoanalyze/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func AutoAnalyze(
statsVer int,
sql string,
params ...any,
) {
) bool {
startTime := time.Now()
_, _, err := RunAnalyzeStmt(sctx, statsHandle, sysProcTracker, statsVer, sql, params...)
dur := time.Since(startTime)
Expand All @@ -67,9 +67,10 @@ func AutoAnalyze(
zap.Error(err),
)
metrics.AutoAnalyzeCounter.WithLabelValues("failed").Inc()
} else {
metrics.AutoAnalyzeCounter.WithLabelValues("succ").Inc()
return false
}
metrics.AutoAnalyzeCounter.WithLabelValues("succ").Inc()
return true
}

// RunAnalyzeStmt executes the analyze statement.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ go_test(
],
embed = [":heap"],
flaky = True,
shard_count = 15,
shard_count = 13,
deps = [
"//pkg/testkit/testsetup",
"@com_github_stretchr_testify//require",
Expand Down
74 changes: 11 additions & 63 deletions pkg/statistics/handle/autoanalyze/internal/heap/heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,17 @@
// 2. Use generics to define the `heapData` struct.
// 3. Add a peak API.
// 4. Add an IsEmpty API.
// 5. Remove the thread-safe and blocking properties.
// 6. Add a Len API.

package heap

import (
"container/heap"
"sync"

"github.com/pingcap/errors"
)

const (
closedMsg = "heap is closed"
)

// LessFunc is used to compare two objects in the heap.
type LessFunc[V any] func(V, V) bool

Expand Down Expand Up @@ -110,18 +107,7 @@ func (h *heapData[K, V]) Pop() any {

// Heap is a thread-safe producer/consumer queue that implements a heap data structure.
type Heap[K comparable, V any] struct {
data *heapData[K, V]
cond sync.Cond
lock sync.RWMutex
closed bool
}

// Close closes the heap.
func (h *Heap[K, V]) Close() {
h.lock.Lock()
defer h.lock.Unlock()
h.closed = true
h.cond.Broadcast()
data *heapData[K, V]
}

// Add adds an object or updates it if it already exists.
Expand All @@ -130,28 +116,17 @@ func (h *Heap[K, V]) Add(obj V) error {
if err != nil {
return errors.Errorf("key error: %v", err)
}
h.lock.Lock()
defer h.lock.Unlock()
if h.closed {
return errors.New(closedMsg)
}
if _, exists := h.data.items[key]; exists {
h.data.items[key].obj = obj
heap.Fix(h.data, h.data.items[key].index)
} else {
h.addIfNotPresentLocked(key, obj)
}
h.cond.Broadcast()
return nil
}

// BulkAdd adds a list of objects to the heap.
func (h *Heap[K, V]) BulkAdd(list []V) error {
h.lock.Lock()
defer h.lock.Unlock()
if h.closed {
return errors.New(closedMsg)
}
for _, obj := range list {
key, err := h.data.keyFunc(obj)
if err != nil {
Expand All @@ -164,7 +139,6 @@ func (h *Heap[K, V]) BulkAdd(list []V) error {
h.addIfNotPresentLocked(key, obj)
}
}
h.cond.Broadcast()
return nil
}

Expand All @@ -174,13 +148,7 @@ func (h *Heap[K, V]) AddIfNotPresent(obj V) error {
if err != nil {
return errors.Errorf("key error: %v", err)
}
h.lock.Lock()
defer h.lock.Unlock()
if h.closed {
return errors.New(closedMsg)
}
h.addIfNotPresentLocked(id, obj)
h.cond.Broadcast()
return nil
}

Expand All @@ -202,8 +170,6 @@ func (h *Heap[K, V]) Delete(obj V) error {
if err != nil {
return errors.Errorf("key error: %v", err)
}
h.lock.Lock()
defer h.lock.Unlock()
if item, ok := h.data.items[key]; ok {
heap.Remove(h.data, item.index)
return nil
Expand All @@ -213,8 +179,6 @@ func (h *Heap[K, V]) Delete(obj V) error {

// Peek returns the top object from the heap without removing it.
func (h *Heap[K, V]) Peek() (V, error) {
h.lock.RLock()
defer h.lock.RUnlock()
if len(h.data.queue) == 0 {
var zero V
return zero, errors.New("heap is empty")
Expand All @@ -224,14 +188,9 @@ func (h *Heap[K, V]) Peek() (V, error) {

// Pop removes the top object from the heap and returns it.
func (h *Heap[K, V]) Pop() (V, error) {
h.lock.Lock()
defer h.lock.Unlock()
for len(h.data.queue) == 0 {
if h.closed {
var zero V
return zero, errors.New("heap is closed")
}
h.cond.Wait()
if len(h.data.queue) == 0 {
var zero V
return zero, errors.New("heap is empty")
}
obj := heap.Pop(h.data)
if obj == nil {
Expand All @@ -243,19 +202,20 @@ func (h *Heap[K, V]) Pop() (V, error) {

// List returns a list of all objects in the heap.
func (h *Heap[K, V]) List() []V {
h.lock.RLock()
defer h.lock.RUnlock()
list := make([]V, 0, len(h.data.items))
for _, item := range h.data.items {
list = append(list, item.obj)
}
return list
}

// Len returns the number of objects in the heap.
func (h *Heap[K, V]) Len() int {
return h.data.Len()
}

// ListKeys returns a list of all keys in the heap.
func (h *Heap[K, V]) ListKeys() []K {
h.lock.RLock()
defer h.lock.RUnlock()
list := make([]K, 0, len(h.data.items))
for key := range h.data.items {
list = append(list, key)
Expand All @@ -275,8 +235,6 @@ func (h *Heap[K, V]) Get(obj V) (V, bool, error) {

// GetByKey returns an object from the heap by key.
func (h *Heap[K, V]) GetByKey(key K) (V, bool, error) {
h.lock.RLock()
defer h.lock.RUnlock()
item, exists := h.data.items[key]
if !exists {
var zero V
Expand All @@ -285,17 +243,8 @@ func (h *Heap[K, V]) GetByKey(key K) (V, bool, error) {
return item.obj, true, nil
}

// IsClosed returns true if the heap is closed.
func (h *Heap[K, V]) IsClosed() bool {
h.lock.RLock()
defer h.lock.RUnlock()
return h.closed
}

// IsEmpty returns true if the heap is empty.
func (h *Heap[K, V]) IsEmpty() bool {
h.lock.RLock()
defer h.lock.RUnlock()
return len(h.data.queue) == 0
}

Expand All @@ -309,6 +258,5 @@ func NewHeap[K comparable, V any](keyFn KeyFunc[K, V], lessFn LessFunc[V]) *Heap
lessFunc: lessFn,
},
}
h.cond.L = &h.lock
return h
}
Loading

0 comments on commit 833b90f

Please sign in to comment.