diff --git a/id/client.go b/id/client.go index 704f8d5f27f756f1f65d73ab138f08cc090bdf7d..99c2cedf3e86a809be30fb3e2b4ab61ab93eb5ee 100644 --- a/id/client.go +++ b/id/client.go @@ -2,8 +2,6 @@ package id import ( "fmt" - - "git.perx.ru/perxis/perxis-go/pkg/clients" ) const ( @@ -62,6 +60,6 @@ func (id *ClientId) Validate() error { return id.SpaceId.Validate() } -func NewClientId(c clients.Client) *ObjectId { - return &ObjectId{Descriptor: &ClientId{SpaceId: SpaceId{SpaceID: c.SpaceID}, ClientID: c.ID}} +func NewClientId(spaceID, id string) *ObjectId { + return &ObjectId{Descriptor: &ClientId{SpaceId: SpaceId{SpaceID: spaceID}, ClientID: id}} } diff --git a/id/collection.go b/id/collection.go index 6518513b59238f0377aafe495dd4cac1f43a8436..f507d66c85e35de208468243383c377f09c88259 100644 --- a/id/collection.go +++ b/id/collection.go @@ -2,8 +2,6 @@ package id import ( "fmt" - - "git.perx.ru/perxis/perxis-go/pkg/collections" ) const ( @@ -61,14 +59,6 @@ func (id *CollectionId) Validate() error { return id.EnvironmentId.Validate() } -func NewCollectionId(coll collections.Collection) *ObjectId { - return &ObjectId{Descriptor: &CollectionId{ - EnvironmentId: EnvironmentId{ - SpaceId: SpaceId{ - SpaceID: coll.SpaceID, - }, - EnvironmentID: coll.EnvID, - }, - CollectionID: coll.ID, - }} +func NewCollectionId(spaceID, envID, id string) *ObjectId { + return &ObjectId{Descriptor: &CollectionId{EnvironmentId: EnvironmentId{SpaceId: SpaceId{SpaceID: spaceID}, EnvironmentID: envID}, CollectionID: id}} } diff --git a/id/environment.go b/id/environment.go index 0da72195ae0e1bc9d9262e6b958f61c30c0a2f36..364529f681c2f9e4dc8c785eb338afc554ae3642 100644 --- a/id/environment.go +++ b/id/environment.go @@ -2,8 +2,6 @@ package id import ( "fmt" - - "git.perx.ru/perxis/perxis-go/pkg/environments" ) const ( @@ -61,11 +59,7 @@ func (id *EnvironmentId) Validate() error { } return id.SpaceId.Validate() } -func NewEnvironmentId(env environments.Environment) *ObjectId { - return &ObjectId{Descriptor: &EnvironmentId{ - SpaceId: SpaceId{ - SpaceID: env.SpaceID, - }, - EnvironmentID: env.ID, - }} + +func NewEnvironmentId(spaceID, id string) *ObjectId { + return &ObjectId{Descriptor: &EnvironmentId{SpaceId: SpaceId{SpaceID: spaceID}, EnvironmentID: id}} } diff --git a/id/item.go b/id/item.go index dbfb8e18cb5128fc0ab26a4298203a38a4af1ad6..968175a2cb4f4429199d8848bd9409828461b08a 100644 --- a/id/item.go +++ b/id/item.go @@ -2,8 +2,6 @@ package id import ( "fmt" - - "git.perx.ru/perxis/perxis-go/pkg/items" ) const ( @@ -64,17 +62,7 @@ func (i *ItemId) Validate() error { } return i.CollectionId.Validate() } -func NewItemId(i items.Item) *ObjectId { - return &ObjectId{Descriptor: &ItemId{ - CollectionId: CollectionId{ - EnvironmentId: EnvironmentId{ - SpaceId: SpaceId{ - SpaceID: i.SpaceID, - }, - EnvironmentID: i.EnvID, - }, - CollectionID: i.CollectionID, - }, - ItemID: i.ID, - }} + +func NewItemId(spaceID, envID, collID, id string) *ObjectId { + return &ObjectId{Descriptor: &ItemId{CollectionId: CollectionId{EnvironmentId: EnvironmentId{SpaceId: SpaceId{SpaceID: spaceID}, EnvironmentID: envID}, CollectionID: collID}, ItemID: id}} } diff --git a/id/organization.go b/id/organization.go index 03fd47fc9f51fbdb67c219c6a636f0f117d032c7..5aedfb709ef6ff5bde9fd7f06bcba49f07fbd643 100644 --- a/id/organization.go +++ b/id/organization.go @@ -2,8 +2,6 @@ package id import ( "fmt" - - "git.perx.ru/perxis/perxis-go/pkg/organizations" ) const ( @@ -57,6 +55,6 @@ func (id *OrganizationId) Validate() error { return nil } -func NewOrganizationId(o organizations.Organization) *ObjectId { - return &ObjectId{Descriptor: &OrganizationId{OrganizationID: o.ID}} +func NewOrganizationId(id string) *ObjectId { + return &ObjectId{Descriptor: &OrganizationId{OrganizationID: id}} } diff --git a/id/registry.go b/id/registry.go index 89b779c96f98472f391073283274014ff1075756..e457b9baf639cafe88b1d72974a1e955fd711fab 100644 --- a/id/registry.go +++ b/id/registry.go @@ -62,8 +62,6 @@ func (r *Registry) FromMap(m map[string]interface{}) (*ObjectId, error) { func (r *Registry) FromObject(v interface{}) (*ObjectId, error) { t := reflect.TypeOf(v) - fmt.Println(t.String()) - fmt.Println(r.handlers) if handler, ok := r.handlers[t]; ok { i := handler(v) if i == nil { diff --git a/id/role.go b/id/role.go index 4e507d92e6443a7bd890fd5ea0414c1fddebb3c3..d9f8f01989b5bb9cca5da38acad867356b4dea92 100644 --- a/id/role.go +++ b/id/role.go @@ -2,8 +2,6 @@ package id import ( "fmt" - - "git.perx.ru/perxis/perxis-go/pkg/roles" ) const ( @@ -61,11 +59,7 @@ func (id *RoleId) Validate() error { return id.SpaceId.Validate() } -func NewRoleId(r roles.Role) *ObjectId { - return &ObjectId{Descriptor: &RoleId{ - SpaceId: SpaceId{ - SpaceID: r.SpaceID, - }, - RoleID: r.ID, - }} + +func NewRoleId(spaceID, id string) *ObjectId { + return &ObjectId{Descriptor: &RoleId{SpaceId: SpaceId{SpaceID: spaceID}, RoleID: id}} } diff --git a/id/space.go b/id/space.go index a9d88bc66ba8105f00825674a2015ea16779860f..9859a4788b05fb906f6a22f230a4b88366ab9311 100644 --- a/id/space.go +++ b/id/space.go @@ -2,8 +2,6 @@ package id import ( "fmt" - - "git.perx.ru/perxis/perxis-go/pkg/spaces" ) const ( @@ -58,6 +56,6 @@ func (id *SpaceId) Validate() error { return nil } -func NewSpaceId(s spaces.Space) *ObjectId { - return &ObjectId{Descriptor: &SpaceId{SpaceID: s.ID}} +func NewSpaceId(id string) *ObjectId { + return &ObjectId{Descriptor: &SpaceId{SpaceID: id}} } diff --git a/id/system.go b/id/system.go index 28eb6e04f4e5e7eff20e2b4218c33f8bdcec1fc9..f8f82faaf7ace71c255c32fb37657a1d7b28296a 100644 --- a/id/system.go +++ b/id/system.go @@ -21,4 +21,4 @@ func (id *SystemId) FromParts(parts []string) error { func (id *SystemId) Map() map[string]any { return map[string]any{"type": System} } func (id *SystemId) FromMap(m map[string]any) error { return nil } func (id *SystemId) Validate() error { return nil } -func (id *SystemId) NewSystemId() *ObjectId { return &ObjectId{Descriptor: &SystemId{}} } +func NewSystemId() *ObjectId { return &ObjectId{Descriptor: &SystemId{}} } diff --git a/id/user.go b/id/user.go index 8b4100b01039f5ebd65722e2521645304df31b04..85459c7e29e94bcaf3edce684ec2d28eb8669a50 100644 --- a/id/user.go +++ b/id/user.go @@ -2,8 +2,6 @@ package id import ( "fmt" - - "git.perx.ru/perxis/perxis-go/pkg/users" ) const ( @@ -57,6 +55,6 @@ func (id *UserId) Validate() error { return nil } -func NewUserId(u users.User) *ObjectId { - return &ObjectId{Descriptor: &UserId{UserID: u.ID}} +func NewUserId(id string) *ObjectId { + return &ObjectId{Descriptor: &UserId{UserID: id}} } diff --git a/log/zap/buffered_write_syncer.go b/log/zap/buffered_write_syncer.go new file mode 100644 index 0000000000000000000000000000000000000000..b6149240b4c18f6255cc66d8da330aa9b5db318a --- /dev/null +++ b/log/zap/buffered_write_syncer.go @@ -0,0 +1,182 @@ +package zap + +import ( + "context" + "sync" + "time" + + "git.perx.ru/perxis/perxis-go/log" + "git.perx.ru/perxis/perxis-go/pkg/auth" + "git.perx.ru/perxis/perxis-go/pkg/errors" +) + +const ( + defaultMaxBufferSize = 1000 + defaultMaxSyncQueueSize = 16 + defaultFlushInterval = 5 * time.Second +) + +var SyncQueueOverflow = errors.New("sync queue overflow") + +// BufferedWriteSyncer это WriteSyncer, который отправляет записи в log.Service. +// Когда количество буферизированных записей достигает некоторого предела или проходит определенный фиксированный интервал, +// записи отправляются в очередь для синхронизации с log.Service. +type BufferedWriteSyncer struct { + // FlushInterval устанавливает интервал, через который буферизированные записи будут отправлены на синхронизацию. + // + // Значение по умолчанию для этого параметра равно 5 секунд. + FlushInterval time.Duration + + // MaxBufferSize устанавливает максимальное количество записей, которые могут быть буферизованы. + // Когда количество буферизованных записей превысит этот порог, они будут отправлены на синхронизацию в log.Service. + // + // Значение по умолчанию для этого параметра равно 1000. + MaxBufferSize int + + // MaxSyncQueueSize устанавливает максимальный размер очереди записей на синхронизацию с log.Service. + // + // Значение по умолчанию для этого параметра равно 16. + MaxSyncQueueSize int + + // Service сервис для хранения записей + Service log.Service + + wg sync.WaitGroup + mu sync.RWMutex + buffer []*log.Entry + syncQueue chan []*log.Entry + + flushStop chan struct{} // flushStop закрывается, когда flushLoop должен быть остановлен + started bool // started указывает, был ли выполнен Start + stopped bool // stopped указывает, был ли выполнен Stop +} + +func (ws *BufferedWriteSyncer) start() { + if ws.Service == nil { + panic("service is required") + } + + if ws.FlushInterval == 0 { + ws.FlushInterval = defaultFlushInterval + } + + if ws.MaxBufferSize == 0 { + ws.MaxBufferSize = defaultMaxBufferSize + } + + if ws.MaxSyncQueueSize == 0 { + ws.MaxSyncQueueSize = defaultMaxSyncQueueSize + } + + ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize) + ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize) + ws.flushStop = make(chan struct{}) + + ws.wg.Add(2) + go ws.syncLoop() + go ws.flushLoop() + + ws.started = true +} + +func (ws *BufferedWriteSyncer) Stop() error { + ws.mu.Lock() + defer ws.mu.Unlock() + + if !ws.started || ws.stopped { + return nil + } + ws.stopped = true + + close(ws.flushStop) // завершаем flushLoop + + err := ws.flush() // очищаем оставшиеся записи + + close(ws.syncQueue) // завершаем syncLoop + + ws.wg.Wait() // дожидаемся завершения flushLoop и syncLoop + + return err +} + +// Write отправляет запись в буфер. +// Когда количество буферизованных записей превышает максимальный размер буфера, буферизированные записи будут отправлены на синхронизацию. +func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error { + ws.mu.Lock() + defer ws.mu.Unlock() + + if !ws.started { + ws.start() + } + + // Проверяем, не достигли ли мы предела размера буфера. Если это так, тогда освобождаем его. + if len(ws.buffer)+1 > ws.MaxBufferSize { + err := ws.flush() + if err != nil { + return err + } + } + + ws.buffer = append(ws.buffer, entry) + + return nil +} + +// Sync освобождает буфер и отправляет буферизированные записи на синхронизацию. +func (ws *BufferedWriteSyncer) Sync() error { + ws.mu.Lock() + defer ws.mu.Unlock() + + if ws.started { + return ws.flush() + } + + return nil +} + +// flush освобождает буфер и отправляет буферизированные записи на синхронизацию. +// Если очередь на синхронизацию переполнена, будет возвращена ошибка SyncQueueOverflow +// +// ВНИМАНИЕ: Не является безопасным для конкурентного вызова. +func (ws *BufferedWriteSyncer) flush() error { + if len(ws.buffer) == 0 { + return nil + } + + // Проверяем, не достигли ли мы предела размера очереди. Если это так, возвращаем ошибку. + if len(ws.syncQueue)+1 > ws.MaxSyncQueueSize { + return SyncQueueOverflow + } + + ws.syncQueue <- ws.buffer + ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize) + + return nil +} + +// flushLoop периодически отправляет буферизированные записи на синхронизацию. +func (ws *BufferedWriteSyncer) flushLoop() { + ticker := time.NewTicker(ws.FlushInterval) + defer func() { + ticker.Stop() + ws.wg.Done() + }() + + for { + select { + case <-ticker.C: + _ = ws.Sync() + case <-ws.flushStop: + return + } + } +} + +// syncLoop синхронизирует записи с log.Service. +func (ws *BufferedWriteSyncer) syncLoop() { + defer ws.wg.Done() + + for entries := range ws.syncQueue { + _ = ws.Service.Log(auth.WithSystem(context.Background()), entries) + } +} diff --git a/log/zap/buffered_write_syncer_test.go b/log/zap/buffered_write_syncer_test.go new file mode 100644 index 0000000000000000000000000000000000000000..58e40098bb2370b7899d31fc42c2ea569ca81cf7 --- /dev/null +++ b/log/zap/buffered_write_syncer_test.go @@ -0,0 +1,143 @@ +package zap + +import ( + "sync" + "testing" + "time" + + "git.perx.ru/perxis/perxis-go/log" + logmocks "git.perx.ru/perxis/perxis-go/log/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestBufferedWriteSyncer_Write(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil). + Run(func(args mock.Arguments) { + entries := args.Get(1).([]*log.Entry) + require.Equal(t, 2, len(entries)) + }). + Once() + + ws := &BufferedWriteSyncer{Service: service} + + err := ws.Write(&log.Entry{Message: "first log message"}) + require.NoError(t, err) + + err = ws.Write(&log.Entry{Message: "second log message"}) + require.NoError(t, err) + + err = ws.Stop() + require.NoError(t, err) + + service.AssertExpectations(t) +} + +func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil). + Run(func(args mock.Arguments) { + entries := args.Get(1).([]*log.Entry) + require.Equal(t, 100, len(entries)) + }). + Once() + + ws := &BufferedWriteSyncer{Service: service} + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() + + err := ws.Write(&log.Entry{Message: "log message"}) + require.NoError(t, err) + }(&wg) + } + + wg.Wait() + + err := ws.Stop() + require.NoError(t, err) + + service.AssertExpectations(t) +} + +func TestBufferedWriteSyncer_Flush(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil). + Run(func(args mock.Arguments) { + entries := args.Get(1).([]*log.Entry) + require.Equal(t, 10, len(entries)) + }). + Times(10) + + ws := &BufferedWriteSyncer{Service: service} + + for i := 0; i < 10; i++ { + for j := 0; j < 10; j++ { + err := ws.Write(&log.Entry{Message: "log message"}) + require.NoError(t, err) + } + err := ws.Sync() + require.NoError(t, err) + } + + err := ws.Stop() + require.NoError(t, err) + + service.AssertExpectations(t) +} + +func TestBufferedWriteSyncer_MaxBufferSize(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil). + Run(func(args mock.Arguments) { + entries := args.Get(1).([]*log.Entry) + assert.Equal(t, 10, len(entries)) + }). + Times(10) + + ws := &BufferedWriteSyncer{Service: service, MaxBufferSize: 10} + + for i := 0; i < 100; i++ { + err := ws.Write(&log.Entry{Message: "log message"}) + require.NoError(t, err) + } + + err := ws.Stop() + require.NoError(t, err) + + service.AssertExpectations(t) +} + +func TestBufferedWriteSyncer_FlushInterval(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil). + Run(func(args mock.Arguments) { + entries := args.Get(1).([]*log.Entry) + assert.Equal(t, 10, len(entries)) + }). + Once() + + ws := &BufferedWriteSyncer{Service: service, FlushInterval: time.Second} + + for j := 0; j < 10; j++ { + err := ws.Write(&log.Entry{Message: "log message"}) + require.NoError(t, err) + } + + time.Sleep(3 * time.Second) // ждем, пока сработает интервал + + err := ws.Stop() + require.NoError(t, err) + + service.AssertExpectations(t) +} diff --git a/log/zap/core.go b/log/zap/core.go new file mode 100644 index 0000000000000000000000000000000000000000..f2b220eab78a6cd2416408385b521c743bcefb94 --- /dev/null +++ b/log/zap/core.go @@ -0,0 +1,88 @@ +package zap + +import ( + oid "git.perx.ru/perxis/perxis-go/id" + "git.perx.ru/perxis/perxis-go/log" + "git.perx.ru/perxis/perxis-go/pkg/id" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// WriteSyncer отвечает за хранение и синхронизацию log.Entry +type WriteSyncer interface { + Write(entry *log.Entry) error + Sync() error +} + +// Core кодирует zapcore.Entry в log.Entry и отправляет их в WriteSyncer +type Core struct { + zapcore.LevelEnabler + + writeSyncer WriteSyncer + fields []zap.Field +} + +func NewCore(writeSyncer WriteSyncer) *Core { + return &Core{ + LevelEnabler: zapcore.InfoLevel, + writeSyncer: writeSyncer, + } +} + +func (core *Core) With(fields []zapcore.Field) zapcore.Core { + return &Core{ + LevelEnabler: core.LevelEnabler, + writeSyncer: core.writeSyncer, + fields: append(core.fields, fields...), + } +} + +func (core *Core) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if core.Enabled(entry.Level) { + return checkedEntry.AddCore(entry, core) + } + return checkedEntry +} + +func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error { + return core.writeSyncer.Write(core.getEntry(entry, fields)) +} + +func (core *Core) Sync() error { + return core.writeSyncer.Sync() +} + +func (core *Core) getEntry(entry zapcore.Entry, fields []zapcore.Field) *log.Entry { + if len(core.fields) > 0 { + fields = append(fields, core.fields...) + } + + enc := zapcore.NewMapObjectEncoder() + for _, field := range fields { + field.AddTo(enc) + } + + ent := &log.Entry{ + ID: id.GenerateNewID(), + Timestamp: entry.Time, + LogLevel: log.Level(entry.Level), + Message: entry.Message, + } + + ent.Category, _ = enc.Fields["category"].(string) + ent.Component, _ = enc.Fields["component"].(string) + ent.Event, _ = enc.Fields["event"].(string) + ent.ObjectID, _ = enc.Fields["object"].(*oid.ObjectId) + ent.CallerID, _ = enc.Fields["caller"].(*oid.ObjectId) + ent.Attr = enc.Fields["attr"] + + if tags, ok := enc.Fields["tags"].([]any); ok { + for _, item := range tags { + if tag, ok := item.(string); ok { + ent.Tags = append(ent.Tags, tag) + } + } + } + + return ent +} diff --git a/log/zap/core_test.go b/log/zap/core_test.go new file mode 100644 index 0000000000000000000000000000000000000000..f78916cb9faeb197179cebbaa608735d86ceddee --- /dev/null +++ b/log/zap/core_test.go @@ -0,0 +1,64 @@ +package zap + +import ( + "testing" + + "git.perx.ru/perxis/perxis-go/id" + "git.perx.ru/perxis/perxis-go/log" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +func TestCore_getEntry(t *testing.T) { + core := NewCore(nil) + + tests := []struct { + name string + input struct { + entry zapcore.Entry + fields []zapcore.Field + } + want *log.Entry + }{ + { + name: "simple", + input: struct { + entry zapcore.Entry + fields []zapcore.Field + }{ + entry: zapcore.Entry{Level: zapcore.InfoLevel, Message: "создан элемент коллекции"}, + fields: []zapcore.Field{ + zap.String("key", "val"), // будет проигнорировано + Category("create"), + Component("Items.Service"), + Event("Items.Create"), + Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl"), + Caller("/users/PHVz"), + Attr("any"), + Tags("tag1", "tag2", "tag3"), + }, + }, + want: &log.Entry{ + LogLevel: log.Level(zapcore.InfoLevel), + Message: "создан элемент коллекции", + Category: "create", + Component: "Items.Service", + Event: "Items.Create", + ObjectID: id.MustObjectId("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl"), + CallerID: id.MustObjectId("/users/PHVz"), + Attr: "any", + Tags: []string{"tag1", "tag2", "tag3"}, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := core.getEntry(tc.input.entry, tc.input.fields) + got.ID = tc.want.ID // игнорируем ID + got.Timestamp = tc.want.Timestamp // игнорируем Timestamp + require.Equal(t, tc.want, got) + }) + } +} diff --git a/log/zap/example_test.go b/log/zap/example_test.go new file mode 100644 index 0000000000000000000000000000000000000000..d99b83c23b4ae05eaa091d2c8e96e1820b90d6fb --- /dev/null +++ b/log/zap/example_test.go @@ -0,0 +1,96 @@ +package zap + +import ( + "context" + "reflect" + "slices" + "testing" + + "git.perx.ru/perxis/perxis-go/id" + "git.perx.ru/perxis/perxis-go/log" + logmocks "git.perx.ru/perxis/perxis-go/log/mocks" + "git.perx.ru/perxis/perxis-go/pkg/auth" + "git.perx.ru/perxis/perxis-go/pkg/items" + "git.perx.ru/perxis/perxis-go/pkg/users" + usersmocks "git.perx.ru/perxis/perxis-go/pkg/users/mocks" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +func TestExample(t *testing.T) { + item := items.NewItem("WPNN", "9VGP", "GxNv", "W0fl", nil, nil) + user := &users.User{ID: "294de355"} + + wantEntries := []*log.Entry{ + { + LogLevel: log.Level(zapcore.InfoLevel), + Message: "Successfully created", + Component: "Items", + Event: items.EventCreateItem, + ObjectID: id.MustObjectId(item), + CallerID: id.MustObjectId(user), + Tags: []string{"tag1", "tag2", "tag3"}, + }, + { + LogLevel: log.Level(zapcore.WarnLevel), + Message: "Successfully updated", + Component: "Items", + Event: items.EventUpdateItem, + ObjectID: id.MustObjectId(item), + CallerID: id.MustObjectId(user), + Attr: map[string]map[string]any{"title": {"old": "old title", "new": "new title"}}, + }, + } + + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil). + Run(func(args mock.Arguments) { + entries := args.Get(1).([]*log.Entry) + require.True(t, slices.EqualFunc(wantEntries, entries, func(wantEntry, gotEntry *log.Entry) bool { + require.NotEmpty(t, gotEntry.ID) + require.NotEmpty(t, gotEntry.Timestamp) + gotEntry.ID = wantEntry.ID // игнорируем ID + gotEntry.Timestamp = wantEntry.Timestamp // игнорируем Timestamp + return reflect.DeepEqual(wantEntry, gotEntry) + })) + }). + Once() + + usersService := &usersmocks.Users{} + usersService.On("GetByIdentity", mock.Anything, "74d90aaf").Return(user, nil).Once() + + factory := auth.PrincipalFactory{Users: usersService} + + ws := &BufferedWriteSyncer{Service: service} + logger := zap.New(NewCore(ws)) + + // Пример отправки логов для сервиса Items + { + logger := logger.With(Component("Items")) + ctx := auth.WithPrincipal(context.Background(), factory.User("74d90aaf")) + + // Отправка лога при создании item + logger.Info("Successfully created", + Event(items.EventCreateItem), + Object(item), + CallerFromContext(ctx), + Tags("tag1", "tag2", "tag3"), + ) + + // Отправка лога при обновлении item + logger.Warn("Successfully updated", + Event(items.EventUpdateItem), + Object(item), + CallerFromContext(ctx), + Attr(map[string]map[string]any{"title": {"old": "old title", "new": "new title"}}), + ) + } + + err := ws.Stop() + require.NoError(t, err) + + service.AssertExpectations(t) +} diff --git a/log/zap/field.go b/log/zap/field.go index e0de88be61059c325821ff68afe0bcbc7b88b6ba..acc6932d5fab8b2975d09998557e6ed41197010d 100644 --- a/log/zap/field.go +++ b/log/zap/field.go @@ -2,78 +2,72 @@ package zap import ( "context" - "fmt" "git.perx.ru/perxis/perxis-go/id" + _ "git.perx.ru/perxis/perxis-go/id/system" // регистрируем обработчики для системных объектов "git.perx.ru/perxis/perxis-go/pkg/auth" "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) -const ( - unknownObject = "unknown" - unknownCaller = "unknown" -) - -func Category(category string) zapcore.Field { +func Category(category string) zap.Field { + if category == "" { + return zap.Skip() + } return zap.String("category", category) } -func Component(component string) zapcore.Field { +func Component(component string) zap.Field { + if component == "" { + return zap.Skip() + } return zap.String("component", component) } -func Event(event string) zapcore.Field { +func Event(event string) zap.Field { + if event == "" { + return zap.Skip() + } return zap.String("event", event) } -// Object возвращает поле и устанавливает передаваемый аргумент в качестве идентификатора объекта в формате ObjectID. -// Поддерживаемые типы: string, fmt.Stringer. -// Если передан аргумент другого типа, будет произведена попытка привести переданное значение к ObjectID. -func Object(v any) zapcore.Field { - var object = unknownObject - switch value := v.(type) { - case string: - object = value - case fmt.Stringer: - object = value.String() - default: - oid, err := id.FromObject(v) - if err == nil { - object = oid.String() - } +// Object возвращает поле и устанавливает передаваемый аргумент в качестве идентификатора объекта в формате ObjectId. +// Поддерживает типы в формате ObjectId: id.Descriptor, string, map[string]any, системные объекты. +func Object(v any) zap.Field { + oid, err := id.NewObjectId(v) + if err != nil { + return zap.Skip() } - return zap.String("object", object) + return zap.Reflect("object", oid) } -// Caller возвращает поле и устанавливает передаваемый аргумент в качестве "вызывающего" в формате ObjectID. -// Поддерживаемые типы: string, fmt.Stringer. -// Если передан аргумент другого типа, будет произведена попытка привести переданное значение к ObjectID. -func Caller(v any) zapcore.Field { - var caller = unknownCaller - switch value := v.(type) { - case string: - caller = value - case fmt.Stringer: - caller = value.String() - default: - oid, err := id.FromObject(v) - if err == nil { - caller = oid.String() - } +// Caller возвращает поле и устанавливает передаваемый аргумент в качестве "вызывающего" в формате ObjectId. +// Поддерживает типы в формате ObjectId: id.Descriptor, string, map[string]any, системные объекты. +func Caller(v any) zap.Field { + oid, err := id.NewObjectId(v) + if err != nil { + return zap.Skip() } - return zap.String("caller", caller) + return zap.Reflect("caller", oid) } -// CallerFromContext извлекает auth.Principal из контекста и устанавливает его в качестве "вызывающего" в формате ObjectID. -func CallerFromContext(ctx context.Context) zapcore.Field { +// CallerFromContext извлекает auth.Principal из контекста и устанавливает его в качестве "вызывающего" в формате Object. +func CallerFromContext(ctx context.Context) zap.Field { + if ctx == nil { + return zap.Skip() + } return Caller(auth.GetPrincipal(ctx)) } -func Attr(attr any) zapcore.Field { +func Attr(attr any) zap.Field { + if attr == nil { + return zap.Skip() + } return zap.Any("attr", attr) } -func Tags(tags ...string) zapcore.Field { +func Tags(tags ...string) zap.Field { + if len(tags) == 0 { + return zap.Skip() + } return zap.Strings("tags", tags) } diff --git a/log/zap/field_test.go b/log/zap/field_test.go new file mode 100644 index 0000000000000000000000000000000000000000..ead5af2386b5606a3861dc563ba6e49a6ed43f7b --- /dev/null +++ b/log/zap/field_test.go @@ -0,0 +1,185 @@ +package zap + +import ( + "context" + "testing" + + "git.perx.ru/perxis/perxis-go/id" + "git.perx.ru/perxis/perxis-go/pkg/auth" + "git.perx.ru/perxis/perxis-go/pkg/items" + "git.perx.ru/perxis/perxis-go/pkg/users" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func TestCategory(t *testing.T) { + tests := []struct { + name string + field zap.Field + want zap.Field + }{ + {name: "ok", field: Category("update"), want: zap.String("category", "update")}, + {name: "invalid", field: Category(""), want: zap.Skip()}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.True(t, tc.want.Equals(tc.field)) + }) + } +} + +func TestComponent(t *testing.T) { + tests := []struct { + name string + field zap.Field + want zap.Field + }{ + {name: "ok", field: Component("Items"), want: zap.String("component", "Items")}, + {name: "invalid", field: Component(""), want: zap.Skip()}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.True(t, tc.want.Equals(tc.field)) + }) + } +} + +func TestEvent(t *testing.T) { + tests := []struct { + name string + field zap.Field + want zap.Field + }{ + {name: "ok", field: Event("items.create"), want: zap.String("event", "items.create")}, + {name: "invalid", field: Event(""), want: zap.Skip()}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.True(t, tc.want.Equals(tc.field)) + }) + } +} + +func TestObjectID(t *testing.T) { + item := &items.Item{ + ID: "c4ca4238a0b923820dcc509a6f75849b", + SpaceID: "c81e728d9d4c2f636f067f89cc14862c", + EnvID: "eccbc87e4b5ce2fe28308fd9f2a7baf3", + CollectionID: "a87ff679a2f3e71d9181a67b7542122c", + } + + oid := id.MustObjectId(item) + itemId := id.NewItemId(item.SpaceID, item.EnvID, item.CollectionID, item.ID) + + tests := []struct { + name string + field zap.Field + want zap.Field + }{ + {name: "system object", field: Object(item), want: zap.Reflect("object", oid)}, + {name: "object id", field: Object(itemId), want: zap.Reflect("object", oid)}, + {name: "string", field: Object(oid.String()), want: zap.Reflect("object", oid)}, + {name: "invalid", field: Object(nil), want: zap.Skip()}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if tc.want.Equals(zap.Skip()) { + assert.True(t, tc.want.Equals(tc.field)) + return + } + assert.Equal(t, tc.want.Interface.(id.Descriptor).String(), tc.field.Interface.(id.Descriptor).String()) + }) + } +} + +func TestCallerID(t *testing.T) { + user := &users.User{ + ID: "c4ca4238a0b923820dcc509a6f75849b", + } + + oid := id.MustObjectId(user) + userId := id.NewUserId(user.ID) + + tests := []struct { + name string + field zap.Field + want zap.Field + }{ + {name: "system object", field: Caller(user), want: zap.Reflect("caller", oid)}, + {name: "object id", field: Caller(userId), want: zap.Reflect("caller", oid)}, + {name: "string", field: Caller(oid.String()), want: zap.Reflect("caller", oid)}, + {name: "invalid", field: Caller(nil), want: zap.Skip()}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if tc.want.Equals(zap.Skip()) { + assert.True(t, tc.want.Equals(tc.field)) + return + } + assert.Equal(t, tc.want.Interface.(id.Descriptor).String(), tc.field.Interface.(id.Descriptor).String()) + }) + } +} + +func TestCallerIDFromContext(t *testing.T) { + ctx := auth.WithSystem(context.Background()) + oid := id.MustObjectId(auth.GetPrincipal(ctx)) + + tests := []struct { + name string + field zap.Field + want zap.Field + }{ + {name: "ok", field: CallerFromContext(ctx), want: zap.Reflect("caller", oid)}, + {name: "invalid", field: CallerFromContext(context.TODO()), want: zap.Skip()}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if tc.want.Equals(zap.Skip()) { + assert.True(t, tc.want.Equals(tc.field)) + return + } + assert.Equal(t, tc.want.Interface.(id.Descriptor).String(), tc.field.Interface.(id.Descriptor).String()) + }) + } +} + +func TestAttr(t *testing.T) { + tests := []struct { + name string + field zap.Field + want zap.Field + }{ + {name: "ok", field: Attr(map[string]string{"a": "b"}), want: zap.Reflect("attr", map[string]string{"a": "b"})}, + {name: "invalid", field: Attr(nil), want: zap.Skip()}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.True(t, tc.want.Equals(tc.field)) + }) + } +} + +func TestTags(t *testing.T) { + tests := []struct { + name string + field zap.Field + want zap.Field + }{ + {name: "ok", field: Tags("a", "b", "c"), want: zap.Strings("tags", []string{"a", "b", "c"})}, + {name: "invalid", field: Tags(nil...), want: zap.Skip()}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.True(t, tc.want.Equals(tc.field)) + }) + } +} diff --git a/log/zap/filter_core.go b/log/zap/filter_core.go new file mode 100644 index 0000000000000000000000000000000000000000..1c6099e6baebacfed5d6bc21e844673d42d42534 --- /dev/null +++ b/log/zap/filter_core.go @@ -0,0 +1,84 @@ +package zap + +import ( + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type FilterFunc func(entry zapcore.Entry, fields []zapcore.Field) bool + +func HasField(field zapcore.Field) FilterFunc { + return func(_ zapcore.Entry, fields []zapcore.Field) bool { + for _, f := range fields { + if f.Equals(field) { + return true + } + } + return false + } +} + +func NotHasField(field zapcore.Field) FilterFunc { + return func(entry zapcore.Entry, fields []zapcore.Field) bool { + return !HasField(field)(entry, fields) + } +} + +type filterCore struct { + zapcore.Core + + filters []FilterFunc + + // fields хранит контекст записей ядра, передаваемых при вызове With. + // В методе Write передаются только поля конкретной записи, но мы также хотим учитывать поля контекста ядра. + fields []zap.Field +} + +// RegisterFilters - добавить фильтры, которые будут применяться при записи лога (вызове `core.Write`) +// Метод `core.Write` будет вызван только в случае, когда результат всех фильтров `true` +// +// Обратить внимание, фильтр не применяется к полям, которые были добавлены в `core` через вызов `core.With` +// до вызова RegisterFilters. Пример: +// +// l, _ := zap.NewDevelopment() +// core := l.Core().With([]zapcore.Field{zap.Int("a", 5)}) +// core = RegisterFilters(core, HasField(zap.Int("a", 5))) +// +// logger := zap.New(core) +// logger.Info("Test log") // НЕ будет записан +// logger.Info("Test log", zap.Int("a", 5)) // будет записан +func RegisterFilters(core zapcore.Core, filters ...FilterFunc) zapcore.Core { + return &filterCore{ + Core: core, + filters: filters, + } +} + +func (core *filterCore) With(fields []zapcore.Field) zapcore.Core { + return &filterCore{ + Core: core.Core.With(fields), + filters: core.filters, + fields: append(core.fields, fields...), + } +} + +func (core *filterCore) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if core.Core.Enabled(entry.Level) { + return checkedEntry.AddCore(entry, core) + } + return checkedEntry +} + +func (core *filterCore) Write(entry zapcore.Entry, fields []zapcore.Field) error { + if len(core.fields) > 0 { + fields = append(core.fields, fields...) + } + + for _, filter := range core.filters { + if !filter(entry, fields) { + return nil + } + } + + return core.Core.Write(entry, fields) +} diff --git a/log/zap/filter_core_test.go b/log/zap/filter_core_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2cf2cf01561ce7ee84a99ed3a420bc6a71026275 --- /dev/null +++ b/log/zap/filter_core_test.go @@ -0,0 +1,48 @@ +package zap + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" +) + +func TestFilterCore_Write(t *testing.T) { + core, logs := observer.New(zapcore.InfoLevel) + core = RegisterFilters(core, HasField(zap.Bool("check", true))) + + err := core.With([]zapcore.Field{zap.Bool("check", true)}).Write(zapcore.Entry{Message: "msg"}, nil) + require.NoError(t, err) + + err = core.Write(zapcore.Entry{Message: "msg"}, []zapcore.Field{zap.Bool("check", true)}) + require.NoError(t, err) + + err = core.Write(zapcore.Entry{Message: "msg"}, nil) + require.NoError(t, err) + + require.Equal(t, 2, logs.Len()) +} + +func TestNotHasField(t *testing.T) { + core, logs := observer.New(zapcore.InfoLevel) + core = RegisterFilters(core, NotHasField(zap.Int("b", 2))) + + err := core.Write(zapcore.Entry{Message: "msg"}, []zapcore.Field{ + zap.Int("a", 1), + zap.Int("b", 2), + }) + require.NoError(t, err) + + err = core.Write(zapcore.Entry{Message: "msg"}, []zapcore.Field{ + zap.Int("a", 1), + zap.Int("b", 3), + }) + require.NoError(t, err) + + err = core.Write(zapcore.Entry{Message: "msg"}, []zapcore.Field{}) + require.NoError(t, err) + + require.Equal(t, 2, logs.Len()) +}