diff --git a/command/command.go b/command/command.go new file mode 100644 index 0000000000000000000000000000000000000000..461c2b9b74a95642f0078eaef869318087d706db --- /dev/null +++ b/command/command.go @@ -0,0 +1,23 @@ +package command + +import ( + "context" + + "mtoohey.com/iter/v2" +) + +type Command interface { + Execute(ctx context.Context) error +} + +type SubCommand interface { + Commands(ctx context.Context) iter.Iter[Command] +} + +type Commands iter.Iter[Command] + +type EmptyCommand struct{} + +func (*EmptyCommand) Execute(ctx context.Context) error { + return nil +} diff --git a/command/history.go b/command/history.go new file mode 100644 index 0000000000000000000000000000000000000000..0d25cf7168b9270d203769b0befb544e61f9b4c6 --- /dev/null +++ b/command/history.go @@ -0,0 +1,13 @@ +package command + +type History struct { + commands []Command +} + +func NewHistory() *History { + return &History{} +} + +func (h *History) Add(cmd Command) { + h.commands = append(h.commands, cmd) +} diff --git a/command/invoker.go b/command/invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..00c74a1954c3c7bc745d411cdc539ff4b05afad6 --- /dev/null +++ b/command/invoker.go @@ -0,0 +1,49 @@ +package command + +import ( + "context" + + "mtoohey.com/iter/v2" +) + +type Invoker struct { + history *History +} + +func NewInvoker() *Invoker { + return &Invoker{ + history: NewHistory(), + } +} + +func (e *Invoker) Execute(ctx context.Context, cmds ...Command) error { + return e.ExecuteIter(ctx, iter.Elems(cmds)) +} + +func (e *Invoker) executeCommand(ctx context.Context, cmd Command) error { + if e.history != nil { + e.history.Add(cmd) + } + return cmd.Execute(ctx) +} + +func (e *Invoker) ExecuteIter(ctx context.Context, cmds iter.Iter[Command]) error { + for { + cmd, cont := cmds() + if !cont { + break + } + + if err := e.executeCommand(ctx, cmd); err != nil { + return err + } + + if sub, ok := cmd.(SubCommand); ok { + subs := sub.Commands(ctx) + if subs != nil { + cmds = subs.Chain(cmds) + } + } + } + return nil +} diff --git a/command/invoker_test.go b/command/invoker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..52b2b7c8dfc5751d8c9f9bd18f30b72c72ba4f12 --- /dev/null +++ b/command/invoker_test.go @@ -0,0 +1,116 @@ +package command + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "mtoohey.com/iter/v2" + + "git.perx.ru/perxis/perxis-go/pkg/errors" +) + +type push struct { + s *string + r rune +} + +func (p *push) Execute(ctx context.Context) error { + str := *(p.s) + str = str + string(p.r) + *(p.s) = str + return nil +} + +type pop struct { + s *string +} + +func (p *pop) Execute(ctx context.Context) error { + str := *(p.s) + + if len(str) == 0 { + return errors.New("pop: empty string") + } + + *(p.s) = str[:len(str)-1] + return nil +} + +type repeat struct { + cmd Command + n int +} + +func (r *repeat) Execute(ctx context.Context) error { + return nil +} + +func (r *repeat) Commands(ctx context.Context) iter.Iter[Command] { + n := r.n + return iter.GenWhile(func() (Command, error) { + n = n - 1 + if n < 0 { + return nil, errors.New("repeat: out of bounds") + } + return r.cmd, nil + }) +} + +func TestInvoker_Execute(t *testing.T) { + t.Run("push", func(t *testing.T) { + invoker := NewInvoker() + + var cmds []Command + var str string + cmds = append(cmds, &push{s: &str, r: 'a'}) + cmds = append(cmds, &push{s: &str, r: 'b'}) + cmds = append(cmds, &push{s: &str, r: 'c'}) + err := invoker.Execute(context.Background(), cmds...) + require.NoError(t, err) + require.Equal(t, "abc", str) + }) + + t.Run("pop", func(t *testing.T) { + invoker := NewInvoker() + + var cmds []Command + var str string + cmds = append(cmds, &push{s: &str, r: 'a'}) + cmds = append(cmds, &push{s: &str, r: 'b'}) + cmds = append(cmds, &push{s: &str, r: 'C'}) + cmds = append(cmds, &push{s: &str, r: 'D'}) + cmds = append(cmds, &pop{s: &str}) + cmds = append(cmds, &pop{s: &str}) + cmds = append(cmds, &push{s: &str, r: 'c'}) + err := invoker.Execute(context.Background(), cmds...) + require.NoError(t, err) + require.Equal(t, "abc", str) + }) + t.Run("repeat", func(t *testing.T) { + invoker := NewInvoker() + + var cmds []Command + var str string + + cmds = append(cmds, &repeat{cmd: &push{s: &str, r: 'a'}, n: 4}) + cmds = append(cmds, &pop{s: &str}) + cmds = append(cmds, &pop{s: &str}) + cmds = append(cmds, &pop{s: &str}) + err := invoker.Execute(context.Background(), cmds...) + require.NoError(t, err) + require.Equal(t, "a", str) + }) + t.Run("error", func(t *testing.T) { + invoker := NewInvoker() + + var cmds []Command + var str string + + cmds = append(cmds, &repeat{cmd: &push{s: &str, r: 'a'}, n: 4}) + cmds = append(cmds, &repeat{cmd: &pop{s: &str}, n: 5}) + err := invoker.Execute(context.Background(), cmds...) + require.Error(t, err) + }) +} diff --git a/datasource/collection.go b/datasource/collection.go new file mode 100644 index 0000000000000000000000000000000000000000..a51ab66906d1e7cc5befff18b57a8db41cf70da0 --- /dev/null +++ b/datasource/collection.go @@ -0,0 +1,80 @@ +package datasource + +import ( + "context" + + "git.perx.ru/perxis/perxis-go/pkg/collections" + "github.com/barweiss/go-tuple" + "mtoohey.com/iter/v2" +) + +// CollectionTuple - кортеж из двух коллекций +type CollectionTuple = tuple.T2[*collections.Collection, *collections.Collection] + +// CollectionDataSource возвращает Datasource по коллекциям +// type CollectionDataSource func() iter.Iter[*collections.Collection] + +// CollectionDataSource - это структура данных, представляющая источник данных для коллекций. +type CollectionDataSource struct { + err error + spaceId string + envId string + filter *collections.Filter + svc collections.Collections + colls []*collections.Collection +} + +// NewCollectionsDatasource является функцией для создания нового источника данных коллекций. +// Эта функция принимает следующие параметры: +// svc - интерфейс Collections, который предоставляет методы для работы с коллекциями данных. +// spaceId - строка, идентифицирующая пространство данных. +// envId - строка, идентифицирующая среду данных. +// filter - структура Filter, которая используется для настройки фильтрации возвращаемых коллекций. +// Возвращает структуру CollectionDataSource, которая может быть использована +// для итерации по коллекциям, отфильтрованным на основе предоставленных параметров. +func NewCollectionsDatasource(svc collections.Collections, spaceId, envId string, filter *collections.Filter) *CollectionDataSource { + return &CollectionDataSource{ + svc: svc, + spaceId: spaceId, + envId: envId, + filter: filter, + } +} + +func (ds *CollectionDataSource) Error() error { + return ds.err +} + +func (ds *CollectionDataSource) Iter(ctx context.Context) iter.Iter[*collections.Collection] { + i := 0 + + return func() (*collections.Collection, bool) { + if ds.err != nil { + return nil, false + } + + if ds.colls == nil { + ds.colls, ds.err = ds.svc.List(ctx, ds.spaceId, ds.envId, ds.filter) + } + + if ds.err != nil { + return nil, false + } + + if i >= len(ds.colls) { + return nil, false + } + + c := ds.colls[i] + i++ + return c, true + } +} + +func CompareCollectionID(a, b *collections.Collection) (bool, bool) { + return a.ID < b.ID, a.ID == b.ID +} + +func MatchCollectionByID(a, b iter.Iter[*collections.Collection]) iter.Iter[CollectionTuple] { + return Match(a, b, CompareCollectionID) +} diff --git a/datasource/collection_commands.go b/datasource/collection_commands.go new file mode 100644 index 0000000000000000000000000000000000000000..1e3f33197c9f2ae510c7906d7f05790780ceeb84 --- /dev/null +++ b/datasource/collection_commands.go @@ -0,0 +1,104 @@ +package datasource + +import ( + "context" + + "git.perx.ru/perxis/perxis-go/command" + + "git.perx.ru/perxis/perxis-go/pkg/content" + + "git.perx.ru/perxis/perxis-go/pkg/collections" + "mtoohey.com/iter/v2" +) + +type Builder interface { + CreateCollection(c *collections.Collection) command.Command + DeleteCollection(c *collections.Collection) command.Command + CopyItems(source, target *collections.Collection) command.Command + SyncItems(source, target *collections.Collection) command.Command +} + +func NewCreateCollectionCommand(cnt *content.Content, coll *collections.Collection) *CreateCollectionCommand { + return &CreateCollectionCommand{Collection: coll} +} + +type CreateCollectionCommand struct { + Collection *collections.Collection +} + +func (cmd *CreateCollectionCommand) Execute(ctx context.Context) error { + return nil +} + +type CopyItems struct { + Source *collections.Collection + Target *collections.Collection +} + +func (cmd *CopyItems) Execute(ctx context.Context) error { + return cmd.Executor.Handle(ctx, cmd) +} + +type SyncItemsCommand struct { + Source *ItemDataSource + Target *ItemDataSource +} + +type SyncCollectionsCommand struct { + Source *CollectionDataSource + Target *CollectionDataSource + Executor Executor + ParentCmd Command + Error error + Strategy func(cmd *SyncCollectionsCommand, Source, Target *collections.Collection) iter.Iter[Command] +} + +func SyncCollectionsStrategy(cmd *SyncCollectionsCommand, Source, Target *collections.Collection) iter.Iter[Command] { + if Source == nil { + return iter.Elems([]Command{&NilCommand{}}) + } + + if Target == nil { + coll := Source.Clone() + coll.SpaceID = cmd.Target.spaceId + coll.EnvID = cmd.Target.envId + return iter.Elems([]Command{ + &CreateCollectionCommand{Collection: coll}, + &CopyItems{Source: Source, Target: coll}, + }) + } + + return iter.Elems([]Command{ + &CopyItems{Source: Source, Target: Target}, + }) +} + +func SyncCollectionsStrategy(cmd *SyncCollectionsCommand, Source, Target *collections.Collection) iter.Iter[Command] { + if Source == nil { + return iter.Elems([]Command{&NilCommand{}}) + } + + if Target == nil { + coll := Source.Clone() + coll.SpaceID = cmd.Target.spaceId + coll.EnvID = cmd.Target.envId + return iter.Elems([]Command{ + &CreateCollectionCommand{Collection: coll}, + &CopyItems{Source: Source, Target: coll}, + }) + } + + return iter.Elems([]Command{ + &CopyItems{Source: Source, Target: Target}, + }) +} + +func (cmd *SyncCollectionsCommand) Execute(ctx context.Context) error { + match := MatchCollectionByID(cmd.Source.Iter(ctx), cmd.Target.Iter(ctx)) + + cmds := iter.FlatMap(match, func(t CollectionTuple) iter.Iter[Command] { + return cmd.Strategy(cmd, t.V1, t.V2) + }) + + return cmd.Executor.ExecuteIter(ctx, cmds) +} diff --git a/pkg/datasource/collection_test.go b/datasource/collection_test.go similarity index 100% rename from pkg/datasource/collection_test.go rename to datasource/collection_test.go diff --git a/datasource/command.go b/datasource/command.go new file mode 100644 index 0000000000000000000000000000000000000000..0d7172bf8dbc1c02c4cff55bcee7dbb8380447bc --- /dev/null +++ b/datasource/command.go @@ -0,0 +1 @@ +package datasource diff --git a/pkg/datasource/datasource.go b/datasource/datasource.go similarity index 100% rename from pkg/datasource/datasource.go rename to datasource/datasource.go diff --git a/pkg/datasource/filter.go b/datasource/filter.go similarity index 96% rename from pkg/datasource/filter.go rename to datasource/filter.go index ff8bab79d180a730e971f120f4373836e7068c17..31482e4c6f4bd1b789c8b3d930204c346e1bd88b 100644 --- a/pkg/datasource/filter.go +++ b/datasource/filter.go @@ -17,5 +17,3 @@ func FilterEqual[T Value[T]](t tuple.T2[T, T]) bool { func FilterDiff[T Value[T]](t tuple.T2[T, T]) bool { return !FilterEqual(t) } - -func Chanded \ No newline at end of file diff --git a/pkg/datasource/items.go b/datasource/items.go similarity index 100% rename from pkg/datasource/items.go rename to datasource/items.go diff --git a/pkg/datasource/items_test.go b/datasource/items_test.go similarity index 100% rename from pkg/datasource/items_test.go rename to datasource/items_test.go diff --git a/datasource/match.go b/datasource/match.go new file mode 100644 index 0000000000000000000000000000000000000000..bf33a9028c0e05e8afbe6c4bfe97c33256e2d406 --- /dev/null +++ b/datasource/match.go @@ -0,0 +1,78 @@ +package datasource + +import ( + "github.com/barweiss/go-tuple" + "mtoohey.com/iter/v2" +) + +type Compare uint + +const ( + EQUAL Compare = iota + LESS + GREATER +) + +type CompareFunc[V, W any] func(V, W) (less bool, equal bool) + +// MatchID возвращает Datasource по двум наборам коллекций, объединяющие коллекции с одинаковыми ID +// Оба итератора должны возвращать записи в порядке возрастания ID. +// Match читает значения вперед на 1 запись, чтобы определить, какие записи объединить. +func Match[V, W any](i1 iter.Iter[V], i2 iter.Iter[W], f CompareFunc[V, W]) iter.Iter[tuple.T2[V, W]] { + var ( + c1 V // Current value from i1 + c2 W // Current value from i2 + cont1 bool // Continue reading from i1 + cont2 bool // Continue reading from i2 + skip1 bool // Skip reading from i1 + skip2 bool // Skip reading from i2 + ) + + return func() (tuple.T2[V, W], bool) { + var t tuple.T2[V, W] + + if !skip1 { + c1, cont1 = i1() + } + + if !skip2 { + c2, cont2 = i2() + } + + skip1 = false + skip2 = false + + if !cont1 && !cont2 { + skip1 = true + skip2 = true + return t, false + } + + if !cont1 { + t = tuple.New2(Zero[V](), c2) + skip1 = true + return t, true + } + + if !cont2 { + t = tuple.New2(c1, Zero[W]()) + skip2 = true + return t, true + } + + less, equal := f(c1, c2) + + switch { + case equal: + t = tuple.New2(c1, c2) + case less: + t = tuple.New2(c1, Zero[W]()) + skip2 = true + default: + t = tuple.New2(Zero[V](), c2) + skip1 = true + } + + return t, true + } +} diff --git a/datasource/value.go b/datasource/value.go new file mode 100644 index 0000000000000000000000000000000000000000..c0b237979d268e05e1ce9ab39f3f1ce282adac32 --- /dev/null +++ b/datasource/value.go @@ -0,0 +1,25 @@ +package datasource + +func Zero[V any]() V { + var e V + return e +} + +type Value[V comparable] interface { + IsEqual(other V) bool + comparable +} + +type ComparableValue[V comparable] interface { + Value[V] + IsLess(other V) bool +} + +type IDValue[V comparable] interface { + Value[V] + GetID() string +} + +func IsZero[V comparable](t V) bool { + return t == Zero[V]() +} diff --git a/pkg/datasource/copy.go b/pkg/datasource/copy.go index 701921ef84a3918a05bd4e4d581299fa3ae9d352..208ab3eac9849ef42bd2eec275941107dee829fd 100644 --- a/pkg/datasource/copy.go +++ b/pkg/datasource/copy.go @@ -34,6 +34,16 @@ type Copy struct { env2 string } +type SyncConfig struct { + SrcCollectionFilter func(*collections.Collection) bool // Фильтр коллекций в источнике + DstCollectionFilter func(*collections.Collection) bool // Фильтр коллекций в приемнике + SyncCollectionFilter func(tuple *CollectionTuple) bool // Фильтр пар коллекций для синхронизации + CollectionMatcher func(*collections.Collection, *collections.Collection) bool // Функция сравнения коллекций ?? + SrcItemFilter func(*items.Item) bool // Фильтр элементов в источнике + DstItemFilter func(*items.Item) bool // Фильтр элементов в приемнике + SyncItemFilter func(tuple *ItemTuple) bool // Функция сравнения элементов ?? +} + func NewCopy(colSvc collections.Collections, itemSvc items.Items, space1, space2, env1, env2 string) *Copy { return &Copy{ colSvc: colSvc, diff --git a/sync/command.go b/sync/command.go new file mode 100644 index 0000000000000000000000000000000000000000..8b126612782aa7096de147d4c591bdb16310bfe2 --- /dev/null +++ b/sync/command.go @@ -0,0 +1,34 @@ +package sync + +import ( + "context" + + "mtoohey.com/iter/v2" +) + +type Command interface { + Execute(ctx context.Context) error + Commands(ctx context.Context) iter.Iter[Command] +} + +type Commands iter.Iter[Command] + +type Executor interface { + Execute(ctx context.Context, cmds ...Command) error + ExecuteIter(ctx context.Context, cmds iter.Iter[Command]) error +} + +type executor struct { +} + +func (e *executor) Execute(ctx context.Context, cmds ...Command) error { + return nil +} + +func (e *executor) ExecuteIter(ctx context.Context, cmds iter.Iter[Command]) error { + cmds.ForEach(func(cmd Command) { + _ = cmd.Execute(ctx) + sub := cmd.Commands(ctx) + cmds = cmds.Chain(sub) + }) +} diff --git a/sync/doc.go b/sync/doc.go new file mode 100644 index 0000000000000000000000000000000000000000..1ca2a85edd97d3308db052b6cb9b32ce4a2ce794 --- /dev/null +++ b/sync/doc.go @@ -0,0 +1 @@ +package sync diff --git a/sync/sync.go b/sync/sync.go new file mode 100644 index 0000000000000000000000000000000000000000..edf74223eb56d956c7c64fb801aede37b46bb554 --- /dev/null +++ b/sync/sync.go @@ -0,0 +1,31 @@ +package sync + +import ( + "git.perx.ru/perxis/perxis-go/pkg/collections" + "mtoohey.com/iter/v2" +) + +type Sync struct { + i iter.Iter[Command] +} + +// NewSync - создает новый Sync +func NewSync(i iter.Iter[Command]) *Sync { + return &Sync{ + i: i, + } +} + +func (s *Sync) AddCommands(commands iter.Iter[Command]) { + s.i = s.i.Chain(commands) +} + +// Sync - выполняет синхронизацию +func (s *Sync) Sync() error { + +} + +// AddSourceCollectionFilter - добавляет фильтр по коллекции +func (s *Sync) AddSourceCollectionFilter(filter *collections.Filter) { + +}