diff --git a/pkg/operation/operation.go b/pkg/operation/operation.go index 11e17ca1b6e05254b58fba732d96ea392bb5a34d..dfef3b99e4744d1713fc3b0a8d06bb14aedfebb5 100644 --- a/pkg/operation/operation.go +++ b/pkg/operation/operation.go @@ -5,31 +5,48 @@ import ( "time" "git.perx.ru/perxis/perxis-go/pkg/errors" - + "git.perx.ru/perxis/perxis-go/pkg/id" "git.perx.ru/perxis/perxis-go/proto/common" "google.golang.org/grpc" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" ) +// DefaultPollInterval интервал опроса операции по умолчанию const DefaultPollInterval = time.Second type Client = common.OperationServiceClient type Proto = common.Operation +// Operation обертка над `common.Operation` для работы с продолжительными операциями. +// Позволяет получить результат операции, проверить статус, отменить операцию type Operation struct { - proto *Proto - client Client + proto *Proto + client Client + + retention time.Duration + cancelFunc func(ctx context.Context) + newTimer func(time.Duration) (func() <-chan time.Time, func() bool) } -func NewOperation(client Client, proto *Proto) *Operation { - return &Operation{ - proto: proto, - client: client, +// New создает новую операцию +func New(proto *Proto) *Operation { + o := &Operation{ + proto: proto, } + o.proto.Id = id.GenerateNewID() + o.proto.CreatedAt = timestamppb.New(time.Now()) + return o } -func NewRemoteOperation(client Client, proto *Proto) *Operation { +// Wrap оборачивает `common.Operation` в Operation для работы c сервисом поддерживающим продолжительных операций +func Wrap(client Client, proto *Proto) *Operation { + if proto == nil || proto.GetId() == "" { + return nil + } + return &Operation{ proto: proto, client: client, @@ -40,6 +57,10 @@ func (o *Operation) Id() string { return o.proto.GetError() } +func (o *Operation) Proto() *Proto { + return o.proto +} + func (o *Operation) Description() string { return o.proto.GetDescription() } @@ -64,6 +85,30 @@ func (o *Operation) Response() (proto.Message, error) { return r.UnmarshalNew() } +func (o *Operation) SetCreatedBy(createdBy string) { + o.proto.CreatedBy = createdBy +} + +func (o *Operation) SetCreatedAt(t time.Time) { + o.proto.CreatedAt = timestamppb.New(t) +} + +func (o *Operation) SetResponse(m proto.Message) error { + resp, err := anypb.New(m) + if err != nil { + return err + } + + o.proto.Result = &common.Operation_Response{Response: resp} + o.proto.Done = true + return nil +} + +func (o *Operation) SetError(err error) { + o.proto.Result = &common.Operation_Error{Error: err.Error()} + o.proto.Done = true +} + func (o *Operation) Error() error { if errStr := o.proto.GetError(); errStr != "" { return errors.New(errStr) @@ -71,10 +116,28 @@ func (o *Operation) Error() error { return nil } +func (o *Operation) SetRetention(retention time.Duration) { + o.retention = retention +} + +func (o *Operation) IsExpired() bool { + createdAt := o.CreatedAt() + if !createdAt.IsZero() && time.Since(createdAt) > o.retention { + return true + } + return false +} + func (o *Operation) Cancel() error { - req := &common.CancelOperationRequest{OperationId: o.Id()} - _, err := o.client.Cancel(context.Background(), req) - return err + if o.client != nil { + req := &common.CancelOperationRequest{OperationId: o.Id()} + _, err := o.client.Cancel(context.Background(), req) + return err + } + + if o.cancelFunc != nil { + o.cancelFunc(context.Background()) + } } func (o *Operation) Poll(ctx context.Context, opts ...grpc.CallOption) error { @@ -113,5 +176,5 @@ func (o *Operation) waitInterval(ctx context.Context, pollInterval time.Duration } } - return errors.Wrap(o.GetError(), "operation failed") + return errors.Wrap(o.Error(), "operation failed") } diff --git a/pkg/operation/server.go b/pkg/operation/server.go new file mode 100644 index 0000000000000000000000000000000000000000..1106ed3360aff0b8777b5f3272134296c700990b --- /dev/null +++ b/pkg/operation/server.go @@ -0,0 +1,34 @@ +package operation + +import ( + "context" + + "git.perx.ru/perxis/perxis-go/proto/common" +) + +type server struct { + svc Service + common.UnimplementedOperationServiceServer +} + +func NewServer(svc Service) common.OperationServiceServer { + return &server{ + svc: svc, + } +} + +func (s *server) Get(ctx context.Context, req *common.GetOperationRequest) (*common.Operation, error) { + op, err := s.svc.Get(ctx, req.GetOperationId()) + if err != nil { + return nil, err + } + return op.Proto(), nil +} + +func (s *server) Cancel(ctx context.Context, req *common.CancelOperationRequest) (*common.Operation, error) { + op, err := s.svc.Cancel(ctx, req.GetOperationId()) + if err != nil { + return nil, err + } + return op.Proto(), nil +} diff --git a/pkg/operation/service.go b/pkg/operation/service.go index 1b89c3289fe4726d69bd0b9f2c05744d91d9db86..e053c98b12f708387e4b04b6d4c82f60998962ca 100644 --- a/pkg/operation/service.go +++ b/pkg/operation/service.go @@ -1,40 +1,61 @@ package operation import ( + "context" "sync" "time" "git.perx.ru/perxis/perxis-go/pkg/errors" ) +const ( + DefaultRetention = time.Hour +) + type Service interface { - Get(id string) (*Operation, error) - Cancel(id string) (*Operation, error) + Get(ctx context.Context, id string) (*Operation, error) + Cancel(ctx context.Context, id string) (*Operation, error) } type OperationService struct { - ops map[string]*Operation - mu sync.RWMutex - retention time.Duration - cleanup time.Duration + ops map[string]*Operation + mu sync.RWMutex + lastCleanup time.Time + cleanupPeriod time.Duration +} + +func NewService(retention, cleanupPeriod time.Duration) *OperationService { + return &OperationService{ + ops: make(map[string]*Operation), + mu: sync.RWMutex{}, + cleanupPeriod: time.Minute, + } } -func NewService() *OperationService { +func NewDefaultService() *OperationService { return &OperationService{ - ops: make(map[string]*Operation), - mu: sync.RWMutex{}, - }, + ops: make(map[string]*Operation), + mu: sync.RWMutex{}, + lastCleanup: time.Now(), + cleanupPeriod: time.Minute, + } } func (s *OperationService) Cleanup() { s.mu.Lock() defer s.mu.Unlock() for id, op := range s.ops { - createdAt := op.CreatedAt() - if createdAt.IsZero() || time.Since(createdAt) > s.retention { + if op.IsExpired() { delete(s.ops, id) } } + s.lastCleanup = time.Now() +} + +func (s *OperationService) CleanupIfNeeded() { + if time.Since(s.lastCleanup) > s.cleanupPeriod { + s.Cleanup() + } } func (s *OperationService) Set(op *Operation) error { @@ -42,13 +63,31 @@ func (s *OperationService) Set(op *Operation) error { return errors.New("invalid operation") } + if op.retention == 0 { + op.retention = DefaultRetention + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.ops[op.Id()] = op + + s.CleanupIfNeeded() + return nil +} + +func (s *OperationService) New(op *Operation) error { + if op == nil || op.Id() == "" { + return errors.New("invalid operation") + } + s.mu.Lock() defer s.mu.Unlock() s.ops[op.Id()] = op return nil } -func (s *OperationService) Get(id string) (*Operation, error) { +func (s *OperationService) Get(_ context.Context, id string) (*Operation, error) { s.mu.RLock() defer s.mu.RUnlock() op, ok := s.ops[id] @@ -58,7 +97,7 @@ func (s *OperationService) Get(id string) (*Operation, error) { return op, nil } -func (s *OperationService) Cancel(id string) (*Operation, error) { +func (s *OperationService) Cancel(_ context.Context, id string) (*Operation, error) { s.mu.Lock() defer s.mu.Unlock() op, ok := s.ops[id]