Skip to content
Snippets Groups Projects
Commit 26067822 authored by Pavel Antonov's avatar Pavel Antonov :asterisk: Committed by Alena Petraki
Browse files

WIP

parent 1f6b100c
No related branches found
No related tags found
No related merge requests found
......@@ -2,6 +2,7 @@ package operation
import (
"context"
"fmt"
"time"
"git.perx.ru/perxis/perxis-go/pkg/errors"
......@@ -53,18 +54,27 @@ func Wrap(client Client, proto *Proto) *Operation {
}
}
// String возвращает строковое представление операции
func (o *Operation) String() string {
return fmt.Sprintf("Operation #%s (%s, %s) Done:%t", o.Id(), o.CreatedAt(), o.CreatedBy(), o.IsDone())
}
// Id возвращает идентификатор операции
func (o *Operation) Id() string {
return o.proto.GetError()
return o.proto.GetId()
}
// Proto возвращает `common.Operation`
func (o *Operation) Proto() *Proto {
return o.proto
}
// Description возвращает описание операции
func (o *Operation) Description() string {
return o.proto.GetDescription()
}
// CreatedAt возвращает время создания операции
func (o *Operation) CreatedAt() time.Time {
c := o.proto.GetCreatedAt()
if c != nil {
......@@ -73,10 +83,12 @@ func (o *Operation) CreatedAt() time.Time {
return time.Time{}
}
// IsDone возвращает true если операция завершена
func (o *Operation) IsDone() bool {
return o.proto.GetDone()
}
// Response возвращает результат операции
func (o *Operation) Response() (proto.Message, error) {
r := o.proto.GetResponse()
if r == nil {
......@@ -85,14 +97,22 @@ func (o *Operation) Response() (proto.Message, error) {
return r.UnmarshalNew()
}
// CreatedBy возвращает имя пользователя создавшего операцию
func (o *Operation) CreatedBy() string {
return o.proto.GetCreatedBy()
}
// SetCreatedBy устанавливает описание операции
func (o *Operation) SetCreatedBy(createdBy string) {
o.proto.CreatedBy = createdBy
}
// SetCreatedAt устанавливает время создания операции
func (o *Operation) SetCreatedAt(t time.Time) {
o.proto.CreatedAt = timestamppb.New(t)
}
// SetResponse устанавливает результат операции
func (o *Operation) SetResponse(m proto.Message) error {
resp, err := anypb.New(m)
if err != nil {
......@@ -104,11 +124,18 @@ func (o *Operation) SetResponse(m proto.Message) error {
return nil
}
// SetCancelFunc устанавливает функцию отмены операции
func (o *Operation) SetCancelFunc(cancelFunc func(ctx context.Context)) {
o.cancelFunc = cancelFunc
}
// SetError устанавливает ошибку операции
func (o *Operation) SetError(err error) {
o.proto.Result = &common.Operation_Error{Error: err.Error()}
o.proto.Done = true
}
// Error возвращает ошибку операции
func (o *Operation) Error() error {
if errStr := o.proto.GetError(); errStr != "" {
return errors.New(errStr)
......@@ -116,30 +143,32 @@ func (o *Operation) Error() error {
return nil
}
// SetRetention устанавливает время хранения операции
func (o *Operation) SetRetention(retention time.Duration) {
o.retention = retention
}
// IsExpired возвращает true если операция просрочена
func (o *Operation) IsExpired() bool {
createdAt := o.CreatedAt()
if !createdAt.IsZero() && time.Since(createdAt) > o.retention {
if o.retention != 0 && !createdAt.IsZero() && time.Since(createdAt) > o.retention {
return true
}
return false
}
// Cancel отменяет операцию
func (o *Operation) Cancel() error {
if o.client != nil {
req := &common.CancelOperationRequest{OperationId: o.Id()}
_, err := o.client.Cancel(context.Background(), req)
op, err := o.client.Cancel(context.Background(), req)
if err != nil {
return err
}
if o.cancelFunc != nil {
o.cancelFunc(context.Background())
}
o.proto = op
return nil
}
// Poll запрашивает состояние операции
func (o *Operation) Poll(ctx context.Context, opts ...grpc.CallOption) error {
req := &common.GetOperationRequest{OperationId: o.Id()}
op, err := o.client.Get(ctx, req, opts...)
......@@ -150,10 +179,12 @@ func (o *Operation) Poll(ctx context.Context, opts ...grpc.CallOption) error {
return nil
}
// Wait ожидает завершения операции
func (o *Operation) Wait(ctx context.Context, opts ...grpc.CallOption) error {
return o.waitInterval(ctx, DefaultPollInterval, opts...)
}
// waitInterval ожидает завершения операции с указанным интервалом опроса
func (o *Operation) waitInterval(ctx context.Context, pollInterval time.Duration, opts ...grpc.CallOption) error {
for !o.IsDone() {
err := o.Poll(ctx, opts...)
......
package operation
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestOperation(t *testing.T) {
op := New(&Proto{})
assert.NotNil(t, op)
assert.NotEmpty(t, op.Id())
assert.False(t, op.IsExpired())
assert.False(t, op.IsDone())
assert.Less(t, time.Since(op.CreatedAt()), time.Second)
op.SetRetention(time.Nanosecond)
assert.True(t, op.IsExpired())
op.SetCancelFunc(func(ctx context.Context) {
// do nothing
})
}
......@@ -13,27 +13,29 @@ const (
)
type Service interface {
Set(ctx context.Context, op *Operation) error
Get(ctx context.Context, id string) (*Operation, error)
Cancel(ctx context.Context, id string) (*Operation, error)
Cleanup()
}
type OperationService struct {
type service struct {
ops map[string]*Operation
mu sync.RWMutex
lastCleanup time.Time
cleanupPeriod time.Duration
}
func NewService(retention, cleanupPeriod time.Duration) *OperationService {
return &OperationService{
func NewService(retention, cleanupPeriod time.Duration) Service {
return &service{
ops: make(map[string]*Operation),
mu: sync.RWMutex{},
cleanupPeriod: time.Minute,
}
}
func NewDefaultService() *OperationService {
return &OperationService{
func NewDefaultService() Service {
return &service{
ops: make(map[string]*Operation),
mu: sync.RWMutex{},
lastCleanup: time.Now(),
......@@ -41,7 +43,7 @@ func NewDefaultService() *OperationService {
}
}
func (s *OperationService) Cleanup() {
func (s *service) Cleanup() {
s.mu.Lock()
defer s.mu.Unlock()
for id, op := range s.ops {
......@@ -49,16 +51,17 @@ func (s *OperationService) Cleanup() {
delete(s.ops, id)
}
}
s.lastCleanup = time.Now()
}
func (s *OperationService) CleanupIfNeeded() {
func (s *service) CleanupIfNeeded() {
if time.Since(s.lastCleanup) > s.cleanupPeriod {
s.Cleanup()
s.lastCleanup = time.Now()
}
}
func (s *OperationService) Set(op *Operation) error {
// Set сохраняет операцию в хранилище
func (s *service) Set(_ context.Context, op *Operation) error {
if op == nil || op.Id() == "" {
return errors.New("invalid operation")
}
......@@ -76,18 +79,8 @@ func (s *OperationService) Set(op *Operation) error {
return nil
}
func (s *OperationService) New(op *Operation) error {
if op == nil || op.Id() == "" {
return errors.New("invalid operation")
}
s.mu.Lock()
defer s.mu.Unlock()
s.ops[op.Id()] = op
return nil
}
func (s *OperationService) Get(_ context.Context, id string) (*Operation, error) {
// Get возвращает операцию по ее идентификатору
func (s *service) Get(_ context.Context, id string) (*Operation, error) {
s.mu.RLock()
defer s.mu.RUnlock()
op, ok := s.ops[id]
......@@ -97,13 +90,18 @@ func (s *OperationService) Get(_ context.Context, id string) (*Operation, error)
return op, nil
}
func (s *OperationService) Cancel(_ context.Context, id string) (*Operation, error) {
// Cancel отменяет операцию по ее идентификатору
func (s *service) Cancel(ctx context.Context, id string) (*Operation, error) {
s.mu.Lock()
defer s.mu.Unlock()
op, ok := s.ops[id]
if !ok {
return nil, errors.New("operation not found")
}
op.Done = true
if op.cancelFunc != nil {
op.cancelFunc(ctx)
}
return op, nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment