Skip to content
Snippets Groups Projects
Commit 1f6b100c authored by Pavel Antonov's avatar Pavel Antonov :asterisk: Committed by Alena Petraki
Browse files

WIP

parent 05d5db44
No related branches found
No related tags found
No related merge requests found
......@@ -5,31 +5,48 @@ import (
"time"
"git.perx.ru/perxis/perxis-go/pkg/errors"
"git.perx.ru/perxis/perxis-go/pkg/id"
"git.perx.ru/perxis/perxis-go/proto/common"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)
// DefaultPollInterval интервал опроса операции по умолчанию
const DefaultPollInterval = time.Second
type Client = common.OperationServiceClient
type Proto = common.Operation
// Operation обертка над `common.Operation` для работы с продолжительными операциями.
// Позволяет получить результат операции, проверить статус, отменить операцию
type Operation struct {
proto *Proto
client Client
retention time.Duration
cancelFunc func(ctx context.Context)
newTimer func(time.Duration) (func() <-chan time.Time, func() bool)
}
func NewOperation(client Client, proto *Proto) *Operation {
return &Operation{
// New создает новую операцию
func New(proto *Proto) *Operation {
o := &Operation{
proto: proto,
client: client,
}
o.proto.Id = id.GenerateNewID()
o.proto.CreatedAt = timestamppb.New(time.Now())
return o
}
// Wrap оборачивает `common.Operation` в Operation для работы c сервисом поддерживающим продолжительных операций
func Wrap(client Client, proto *Proto) *Operation {
if proto == nil || proto.GetId() == "" {
return nil
}
func NewRemoteOperation(client Client, proto *Proto) *Operation {
return &Operation{
proto: proto,
client: client,
......@@ -40,6 +57,10 @@ func (o *Operation) Id() string {
return o.proto.GetError()
}
func (o *Operation) Proto() *Proto {
return o.proto
}
func (o *Operation) Description() string {
return o.proto.GetDescription()
}
......@@ -64,6 +85,30 @@ func (o *Operation) Response() (proto.Message, error) {
return r.UnmarshalNew()
}
func (o *Operation) SetCreatedBy(createdBy string) {
o.proto.CreatedBy = createdBy
}
func (o *Operation) SetCreatedAt(t time.Time) {
o.proto.CreatedAt = timestamppb.New(t)
}
func (o *Operation) SetResponse(m proto.Message) error {
resp, err := anypb.New(m)
if err != nil {
return err
}
o.proto.Result = &common.Operation_Response{Response: resp}
o.proto.Done = true
return nil
}
func (o *Operation) SetError(err error) {
o.proto.Result = &common.Operation_Error{Error: err.Error()}
o.proto.Done = true
}
func (o *Operation) Error() error {
if errStr := o.proto.GetError(); errStr != "" {
return errors.New(errStr)
......@@ -71,12 +116,30 @@ func (o *Operation) Error() error {
return nil
}
func (o *Operation) SetRetention(retention time.Duration) {
o.retention = retention
}
func (o *Operation) IsExpired() bool {
createdAt := o.CreatedAt()
if !createdAt.IsZero() && time.Since(createdAt) > o.retention {
return true
}
return false
}
func (o *Operation) Cancel() error {
if o.client != nil {
req := &common.CancelOperationRequest{OperationId: o.Id()}
_, err := o.client.Cancel(context.Background(), req)
return err
}
if o.cancelFunc != nil {
o.cancelFunc(context.Background())
}
}
func (o *Operation) Poll(ctx context.Context, opts ...grpc.CallOption) error {
req := &common.GetOperationRequest{OperationId: o.Id()}
op, err := o.client.Get(ctx, req, opts...)
......@@ -113,5 +176,5 @@ func (o *Operation) waitInterval(ctx context.Context, pollInterval time.Duration
}
}
return errors.Wrap(o.GetError(), "operation failed")
return errors.Wrap(o.Error(), "operation failed")
}
package operation
import (
"context"
"git.perx.ru/perxis/perxis-go/proto/common"
)
type server struct {
svc Service
common.UnimplementedOperationServiceServer
}
func NewServer(svc Service) common.OperationServiceServer {
return &server{
svc: svc,
}
}
func (s *server) Get(ctx context.Context, req *common.GetOperationRequest) (*common.Operation, error) {
op, err := s.svc.Get(ctx, req.GetOperationId())
if err != nil {
return nil, err
}
return op.Proto(), nil
}
func (s *server) Cancel(ctx context.Context, req *common.CancelOperationRequest) (*common.Operation, error) {
op, err := s.svc.Cancel(ctx, req.GetOperationId())
if err != nil {
return nil, err
}
return op.Proto(), nil
}
package operation
import (
"context"
"sync"
"time"
"git.perx.ru/perxis/perxis-go/pkg/errors"
)
const (
DefaultRetention = time.Hour
)
type Service interface {
Get(id string) (*Operation, error)
Cancel(id string) (*Operation, error)
Get(ctx context.Context, id string) (*Operation, error)
Cancel(ctx context.Context, id string) (*Operation, error)
}
type OperationService struct {
ops map[string]*Operation
mu sync.RWMutex
retention time.Duration
cleanup time.Duration
lastCleanup time.Time
cleanupPeriod time.Duration
}
func NewService() *OperationService {
func NewService(retention, cleanupPeriod time.Duration) *OperationService {
return &OperationService{
ops: make(map[string]*Operation),
mu: sync.RWMutex{},
},
cleanupPeriod: time.Minute,
}
}
func NewDefaultService() *OperationService {
return &OperationService{
ops: make(map[string]*Operation),
mu: sync.RWMutex{},
lastCleanup: time.Now(),
cleanupPeriod: time.Minute,
}
}
func (s *OperationService) Cleanup() {
s.mu.Lock()
defer s.mu.Unlock()
for id, op := range s.ops {
createdAt := op.CreatedAt()
if createdAt.IsZero() || time.Since(createdAt) > s.retention {
if op.IsExpired() {
delete(s.ops, id)
}
}
s.lastCleanup = time.Now()
}
func (s *OperationService) CleanupIfNeeded() {
if time.Since(s.lastCleanup) > s.cleanupPeriod {
s.Cleanup()
}
}
func (s *OperationService) Set(op *Operation) error {
......@@ -42,13 +63,31 @@ func (s *OperationService) Set(op *Operation) error {
return errors.New("invalid operation")
}
if op.retention == 0 {
op.retention = DefaultRetention
}
s.mu.Lock()
defer s.mu.Unlock()
s.ops[op.Id()] = op
s.CleanupIfNeeded()
return nil
}
func (s *OperationService) New(op *Operation) error {
if op == nil || op.Id() == "" {
return errors.New("invalid operation")
}
s.mu.Lock()
defer s.mu.Unlock()
s.ops[op.Id()] = op
return nil
}
func (s *OperationService) Get(id string) (*Operation, error) {
func (s *OperationService) Get(_ context.Context, id string) (*Operation, error) {
s.mu.RLock()
defer s.mu.RUnlock()
op, ok := s.ops[id]
......@@ -58,7 +97,7 @@ func (s *OperationService) Get(id string) (*Operation, error) {
return op, nil
}
func (s *OperationService) Cancel(id string) (*Operation, error) {
func (s *OperationService) Cancel(_ context.Context, id string) (*Operation, error) {
s.mu.Lock()
defer s.mu.Unlock()
op, ok := s.ops[id]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment