Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 189 additions & 6 deletions cmd/collectors/cmperf/cmmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,41 @@ const (
CounterKindListString
)

type StatusCodeEnum uint8

const (
CompleteCollection = 0
PartialCollection = 1
SecondaryMetricsFile = 2
MemoryError = 3
InternalError = 4
ChildTimeoutError = 5
NetworkError = 6
MetaMismatchError = 7
CollectionTimeoutError = 8
NoAdditionalStatus = 9
)

type MetricsFileRecord struct {
Version *MetricsFileVersion
Schema *ObjectSchema
Batch *ObjectCollection
Summary *CollectionStatus
}

type CollectionStatus struct {
Statuses []StatusCode
}

type MetricsFileVersion struct {
FormatVersion uint32
}

type StatusCode struct {
Code StatusCodeEnum
Nodes []string
}

type CounterType struct {
Index uint32
Type CounterKind
Expand Down Expand Up @@ -189,8 +224,8 @@ func (c *CounterType) ListString() ([]string, bool) {
return value, ok
}

func Messages(path string) iter.Seq2[*ObjectCollection, error] {
return func(yield func(*ObjectCollection, error) bool) {
func Messages(path string) iter.Seq2[*MetricsFileRecord, error] {
return func(yield func(*MetricsFileRecord, error) bool) {
f, err := os.Open(path)

if err != nil {
Expand Down Expand Up @@ -234,16 +269,164 @@ func Messages(path string) iter.Seq2[*ObjectCollection, error] {
}
}

func readProto(src []byte) (*ObjectCollection, error) {
func readProto(data []byte) (*MetricsFileRecord, error) {
return handleMetricsFileRecord(data)
}

func handleMetricsFileRecord(data []byte) (*MetricsFileRecord, error) {
var (
fc easyproto.FieldContext
err error
)

record := MetricsFileRecord{}

for len(data) > 0 {
data, err = fc.NextField(data)
if err != nil {
return nil, err
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return nil, errors.New("failed to read MetricsFileVersion data")
}
fileVersion, err := handleMetricsFileVersion(data)
if err != nil {
return nil, err
}
record.Version = &fileVersion
case 2:
data, ok := fc.MessageData()
if !ok {
return nil, errors.New("failed to read ObjectSchema data")
}
schema, err := handleObjectSchema(data)
if err != nil {
return nil, err
}
record.Schema = &schema
case 3:
data, ok := fc.MessageData()
if !ok {
return nil, errors.New("failed to read ObjectCollection data")
}
objectCollection, err := handleObjectCollection(data)
if err != nil {
return nil, err
}
record.Batch = objectCollection
case 4:
data, ok := fc.MessageData()
if !ok {
return nil, errors.New("failed to read CollectionStatus data")
}
collectionStatus, err := handleCollectionStatus(data)
if err != nil {
return nil, err
}
record.Summary = &collectionStatus
}
}
return &record, nil
}

func handleCollectionStatus(data []byte) (CollectionStatus, error) {
var (
fc easyproto.FieldContext
collectionStatus CollectionStatus
)

for len(data) > 0 {
var err error
data, err = fc.NextField(data)
if err != nil {
return collectionStatus, fmt.Errorf("failed to read CollectionStatus data for field %d: %w", fc.FieldNum, err)
}
if fc.FieldNum == 1 {
value, ok := fc.MessageData()
if !ok {
return collectionStatus, errors.New("failed to read collection status code")
}
code, err := handleStatusCode(value)
if err != nil {
return CollectionStatus{}, err
}
collectionStatus.Statuses = append(collectionStatus.Statuses, code)
}
}

return collectionStatus, nil
}

func handleStatusCode(value []byte) (StatusCode, error) {
var (
fc easyproto.FieldContext
statusCode StatusCode
)
for len(value) > 0 {
var err error
value, err = fc.NextField(value)
if err != nil {
return statusCode, errors.New("failed to read statusCode data")
}
switch fc.FieldNum {
case 1:
value, ok := fc.Uint64()
if !ok {
return statusCode, errors.New("failed to read statusCode code value")
}
if value > math.MaxUint8 {
return statusCode, fmt.Errorf("status code exceeds uint8 %d", value)
}
statusCode.Code = StatusCodeEnum(value)
case 2:
val, ok := fc.String()
if !ok {
return statusCode, errors.New("failed to read statusCode nodes value")
}
statusCode.Nodes = append(statusCode.Nodes, val)
}
}

return statusCode, nil
}

func handleMetricsFileVersion(data []byte) (MetricsFileVersion, error) {
var (
fc easyproto.FieldContext
fileVersion MetricsFileVersion
)

for len(data) > 0 {
var err error
data, err = fc.NextField(data)
if err != nil {
return fileVersion, fmt.Errorf("failed to read metricsFileVersion data: %w", err)
}
if fc.FieldNum == 1 {
key, ok := fc.Uint32()
if !ok {
return fileVersion, errors.New("failed to read format_version value")
}
fileVersion.FormatVersion = key
}
}

return fileVersion, nil
}

func handleObjectCollection(data []byte) (*ObjectCollection, error) {
var (
fc easyproto.FieldContext
err error
)

cm := ObjectCollection{}

for len(src) > 0 {
src, err = fc.NextField(src)
for len(data) > 0 {
data, err = fc.NextField(data)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -381,7 +564,7 @@ func handleCounterSchema(value []byte) (CounterSchema, error) {
return counterSchema, errors.New("failed to read counter schema counter_y_labels")
}
counterSchema.LabelsY = append(counterSchema.LabelsY, val)
case 9:
case 8:
val, ok := fc.Uint32()
if !ok {
return counterSchema, errors.New("failed to read counter schema base_counter_index")
Expand Down
97 changes: 36 additions & 61 deletions cmd/collectors/cmperf/cmmetrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,58 +66,51 @@ func mustList32(t *testing.T, counter CounterType) []uint32 {
return value
}

func mustList64(t *testing.T, counter CounterType) []uint64 {
t.Helper()
value, ok := counter.List64()
if !ok {
t.Fatalf("expected []uint64 value for counter type %v", counter.Type)
}
return value
}

func TestMessages(t *testing.T) {

path := "testdata/test1.pb"
obs := make([]*ObjectCollection, 0, 2)
path := "testdata/wqd.pb"
obs := make([]*MetricsFileRecord, 0, 5)

for aMsg, err := range Messages(path) {
assert.Nil(t, err)
assert.NotNil(t, aMsg)
obs = append(obs, aMsg)
}

assert.Equal(t, len(obs), 2)

assert.Equal(t, obs[0].Timestamp, uint64(1776429464001))
assert.Equal(t, obs[0].Period, uint32(60))
assert.Equal(t, obs[0].Node, "cm-test")
assert.Equal(t, obs[0].Schema.Name, "cm-test")
assert.Equal(t, len(obs[0].Schema.CounterSchema), 2)
assert.Equal(t, obs[0].Schema.CounterSchema[0].Name, "counter-1")
assert.Equal(t, obs[0].Schema.CounterSchema[0].Index, uint32(1))
assert.Equal(t, obs[0].Schema.CounterSchema[0].Type, CookString)
assert.Equal(t, obs[0].Schema.CounterSchema[0].DimX, uint32(0))
assert.Equal(t, obs[0].Schema.CounterSchema[0].DimY, uint32(0))
assert.Equal(t, len(obs[0].Schema.CounterSchema[0].LabelsX), 0)
assert.Equal(t, len(obs[0].Schema.CounterSchema[0].LabelsY), 0)

assert.Equal(t, obs[0].Data.Name, "od-1")
assert.Equal(t, len(obs[0].Data.Instances), 2)
assert.Equal(t, obs[0].Data.Instances[0].Name, "vol1")
assert.Equal(t, obs[0].Data.Instances[0].UUID, "06b3c803-ff78-11eb-ba17-00a098e24321")
assert.Equal(t, len(obs[0].Data.Instances[0].Counters), 4)
assert.Equal(t, obs[0].Data.Instances[0].Counters[0].Index, uint32(1))
assert.Equal(t, mustUint64Value(t, obs[0].Data.Instances[0].Counters[0]), 42)
assert.False(t, obs[0].Data.Instances[0].Counters[0].IsUint32())
assert.True(t, obs[0].Data.Instances[0].Counters[0].IsUint64())
assert.False(t, obs[0].Data.Instances[0].Counters[0].IsList32())
assert.False(t, obs[0].Data.Instances[0].Counters[0].IsList64())
assert.False(t, obs[0].Data.Instances[0].Counters[0].IsListString())

assert.Equal(t, obs[0].Data.Instances[1].Counters[3].Index, 4)
assert.Equal(t, obs[0].Data.Instances[1].Counters[2].scalar, 144)

listString, b := obs[1].Data.Instances[0].Counters[3].ListString()
assert.True(t, b)
assert.Equal(t, listString, []string{"abc", "def", "ghi"})
assert.Equal(t, mustList32(t, obs[1].Data.Instances[0].Counters[4]), []uint32{100, 200, 300})

assert.Equal(t, obs[1].Schema.CounterSchema[0].LabelsX, []string{"opcode 1", "opcode 2"})
assert.Equal(t, obs[1].Schema.CounterSchema[1].Name, "ops")
assert.Equal(t, obs[1].Schema.CounterSchema[2].Name, "latency")
assert.Equal(t, obs[1].Schema.CounterSchema[2].BaseIndex, 1)
assert.Equal(t, obs[1].Schema.CounterSchema[2].Type, CookAverage)
assert.Equal(t, len(obs), 5)
Comment on lines 78 to +89

version := obs[0]
schema := obs[1]
rec1 := obs[2].Batch
rec2 := obs[3].Batch
summary := obs[4]

assert.Equal(t, version.Version.FormatVersion, 1)
assert.Equal(t, schema.Schema.Name, "workload_queue_dblade")
assert.Equal(t, schema.Schema.CounterSchema[0].Name, "instance_name")
assert.Equal(t, schema.Schema.CounterSchema[15].Name, "cache_miss_rate")
assert.Equal(t, schema.Schema.CounterSchema[15].BaseIndex, 16)

assert.Equal(t, rec1.Timestamp, 1779210300000)
assert.Equal(t, rec2.Timestamp, 1779210300000)
assert.Equal(t, rec1.Period, 60)
assert.Equal(t, rec1.Data.Name, "workload_queue_dblade")
assert.Equal(t, rec1.Data.Instances[0].Name, "<none>")
assert.Equal(t, rec1.Data.Instances[0].UUID, "<none>")
assert.True(t, rec1.Data.Instances[0].Counters[13].IsUint64())
assert.Equal(t, mustUint64Value(t, rec1.Data.Instances[0].Counters[13]), 5957)
assert.Equal(t, mustList64(t, rec1.Data.Instances[0].Counters[4]), []uint64{9190949671, 0, 0, 0})

assert.Equal(t, len(summary.Summary.Statuses), 2)
}

func TestCounterTypeAccessors(t *testing.T) {
Expand Down Expand Up @@ -187,24 +180,6 @@ func TestHandleCounterSchemaLabels(t *testing.T) {
assert.Equal(t, schema.LabelsY, []string{"read", "write"})
}

func TestReadProtoRejectsMalformedNestedMessages(t *testing.T) {
t.Run("schema", func(t *testing.T) {
payload := appendVarintField(nil, 5, 1)
msg, err := readProto(payload)
assert.Nil(t, msg)
assert.NotNil(t, err)
assert.Equal(t, err.Error(), "failed to read object schema message")
})

t.Run("data", func(t *testing.T) {
payload := appendVarintField(nil, 6, 1)
msg, err := readProto(payload)
assert.Nil(t, msg)
assert.NotNil(t, err)
assert.Equal(t, err.Error(), "failed to read object data message")
})
}

func BenchmarkCounterType(b *testing.B) {
b.Run("uint32", func(b *testing.B) {
b.ReportAllocs()
Expand Down
Binary file removed cmd/collectors/cmperf/cmmetrics/testdata/test1.pb
Binary file not shown.
Binary file added cmd/collectors/cmperf/cmmetrics/testdata/wqd.pb
Binary file not shown.
Loading