diff --git a/pkg/operation/operation.go b/pkg/operation/operation.go index dfef3b99e4744d1713fc3b0a8d06bb14aedfebb5..8b15ecc064f8d42c30e6f6c60d17275ac68486a8 100644 --- a/pkg/operation/operation.go +++ b/pkg/operation/operation.go @@ -2,6 +2,7 @@ package operation import ( "context" + "fmt" "time" "git.perx.ru/perxis/perxis-go/pkg/errors" @@ -53,18 +54,27 @@ func Wrap(client Client, proto *Proto) *Operation { } } +// String возвращает строковое представление операции +func (o *Operation) String() string { + return fmt.Sprintf("Operation #%s (%s, %s) Done:%t", o.Id(), o.CreatedAt(), o.CreatedBy(), o.IsDone()) +} + +// Id возвращает идентификатор операции func (o *Operation) Id() string { - return o.proto.GetError() + return o.proto.GetId() } +// Proto возвращает `common.Operation` func (o *Operation) Proto() *Proto { return o.proto } +// Description возвращает описание операции func (o *Operation) Description() string { return o.proto.GetDescription() } +// CreatedAt возвращает время создания операции func (o *Operation) CreatedAt() time.Time { c := o.proto.GetCreatedAt() if c != nil { @@ -73,10 +83,12 @@ func (o *Operation) CreatedAt() time.Time { return time.Time{} } +// IsDone возвращает true если операция завершена func (o *Operation) IsDone() bool { return o.proto.GetDone() } +// Response возвращает результат операции func (o *Operation) Response() (proto.Message, error) { r := o.proto.GetResponse() if r == nil { @@ -85,14 +97,22 @@ func (o *Operation) Response() (proto.Message, error) { return r.UnmarshalNew() } +// CreatedBy возвращает имя пользователя создавшего операцию +func (o *Operation) CreatedBy() string { + return o.proto.GetCreatedBy() +} + +// SetCreatedBy устанавливает описание операции func (o *Operation) SetCreatedBy(createdBy string) { o.proto.CreatedBy = createdBy } +// SetCreatedAt устанавливает время создания операции func (o *Operation) SetCreatedAt(t time.Time) { o.proto.CreatedAt = timestamppb.New(t) } +// SetResponse устанавливает результат операции func (o *Operation) SetResponse(m proto.Message) error { resp, err := anypb.New(m) if err != nil { @@ -104,11 +124,18 @@ func (o *Operation) SetResponse(m proto.Message) error { return nil } +// SetCancelFunc устанавливает функцию отмены операции +func (o *Operation) SetCancelFunc(cancelFunc func(ctx context.Context)) { + o.cancelFunc = cancelFunc +} + +// SetError устанавливает ошибку операции func (o *Operation) SetError(err error) { o.proto.Result = &common.Operation_Error{Error: err.Error()} o.proto.Done = true } +// Error возвращает ошибку операции func (o *Operation) Error() error { if errStr := o.proto.GetError(); errStr != "" { return errors.New(errStr) @@ -116,30 +143,32 @@ func (o *Operation) Error() error { return nil } +// SetRetention устанавливает время хранения операции func (o *Operation) SetRetention(retention time.Duration) { o.retention = retention } +// IsExpired возвращает true если операция просрочена func (o *Operation) IsExpired() bool { createdAt := o.CreatedAt() - if !createdAt.IsZero() && time.Since(createdAt) > o.retention { + if o.retention != 0 && !createdAt.IsZero() && time.Since(createdAt) > o.retention { return true } return false } +// Cancel отменяет операцию func (o *Operation) Cancel() error { - if o.client != nil { - req := &common.CancelOperationRequest{OperationId: o.Id()} - _, err := o.client.Cancel(context.Background(), req) + req := &common.CancelOperationRequest{OperationId: o.Id()} + op, err := o.client.Cancel(context.Background(), req) + if err != nil { return err } - - if o.cancelFunc != nil { - o.cancelFunc(context.Background()) - } + o.proto = op + return nil } +// Poll запрашивает состояние операции func (o *Operation) Poll(ctx context.Context, opts ...grpc.CallOption) error { req := &common.GetOperationRequest{OperationId: o.Id()} op, err := o.client.Get(ctx, req, opts...) @@ -150,10 +179,12 @@ func (o *Operation) Poll(ctx context.Context, opts ...grpc.CallOption) error { return nil } +// Wait ожидает завершения операции func (o *Operation) Wait(ctx context.Context, opts ...grpc.CallOption) error { return o.waitInterval(ctx, DefaultPollInterval, opts...) } +// waitInterval ожидает завершения операции с указанным интервалом опроса func (o *Operation) waitInterval(ctx context.Context, pollInterval time.Duration, opts ...grpc.CallOption) error { for !o.IsDone() { err := o.Poll(ctx, opts...) diff --git a/pkg/operation/operation_test.go b/pkg/operation/operation_test.go new file mode 100644 index 0000000000000000000000000000000000000000..6fea95d54f56f03c9dddd91e0797ac0346242acf --- /dev/null +++ b/pkg/operation/operation_test.go @@ -0,0 +1,25 @@ +package operation + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestOperation(t *testing.T) { + op := New(&Proto{}) + assert.NotNil(t, op) + assert.NotEmpty(t, op.Id()) + assert.False(t, op.IsExpired()) + assert.False(t, op.IsDone()) + assert.Less(t, time.Since(op.CreatedAt()), time.Second) + + op.SetRetention(time.Nanosecond) + assert.True(t, op.IsExpired()) + + op.SetCancelFunc(func(ctx context.Context) { + // do nothing + }) +} diff --git a/pkg/operation/service.go b/pkg/operation/service.go index e053c98b12f708387e4b04b6d4c82f60998962ca..d4daada31b71a5af8c4a9f5c5f97298102c88168 100644 --- a/pkg/operation/service.go +++ b/pkg/operation/service.go @@ -13,27 +13,29 @@ const ( ) type Service interface { + Set(ctx context.Context, op *Operation) error Get(ctx context.Context, id string) (*Operation, error) Cancel(ctx context.Context, id string) (*Operation, error) + Cleanup() } -type OperationService struct { +type service struct { ops map[string]*Operation mu sync.RWMutex lastCleanup time.Time cleanupPeriod time.Duration } -func NewService(retention, cleanupPeriod time.Duration) *OperationService { - return &OperationService{ +func NewService(retention, cleanupPeriod time.Duration) Service { + return &service{ ops: make(map[string]*Operation), mu: sync.RWMutex{}, cleanupPeriod: time.Minute, } } -func NewDefaultService() *OperationService { - return &OperationService{ +func NewDefaultService() Service { + return &service{ ops: make(map[string]*Operation), mu: sync.RWMutex{}, lastCleanup: time.Now(), @@ -41,7 +43,7 @@ func NewDefaultService() *OperationService { } } -func (s *OperationService) Cleanup() { +func (s *service) Cleanup() { s.mu.Lock() defer s.mu.Unlock() for id, op := range s.ops { @@ -49,16 +51,17 @@ func (s *OperationService) Cleanup() { delete(s.ops, id) } } - s.lastCleanup = time.Now() } -func (s *OperationService) CleanupIfNeeded() { +func (s *service) CleanupIfNeeded() { if time.Since(s.lastCleanup) > s.cleanupPeriod { s.Cleanup() + s.lastCleanup = time.Now() } } -func (s *OperationService) Set(op *Operation) error { +// Set сохраняет операцию в хранилище +func (s *service) Set(_ context.Context, op *Operation) error { if op == nil || op.Id() == "" { return errors.New("invalid operation") } @@ -76,18 +79,8 @@ func (s *OperationService) Set(op *Operation) error { return nil } -func (s *OperationService) New(op *Operation) error { - if op == nil || op.Id() == "" { - return errors.New("invalid operation") - } - - s.mu.Lock() - defer s.mu.Unlock() - s.ops[op.Id()] = op - return nil -} - -func (s *OperationService) Get(_ context.Context, id string) (*Operation, error) { +// Get возвращает операцию по ее идентификатору +func (s *service) Get(_ context.Context, id string) (*Operation, error) { s.mu.RLock() defer s.mu.RUnlock() op, ok := s.ops[id] @@ -97,13 +90,18 @@ func (s *OperationService) Get(_ context.Context, id string) (*Operation, error) return op, nil } -func (s *OperationService) Cancel(_ context.Context, id string) (*Operation, error) { +// Cancel отменяет операцию по ее идентификатору +func (s *service) Cancel(ctx context.Context, id string) (*Operation, error) { s.mu.Lock() defer s.mu.Unlock() op, ok := s.ops[id] if !ok { return nil, errors.New("operation not found") } - op.Done = true + + if op.cancelFunc != nil { + op.cancelFunc(ctx) + } + return op, nil }