Merge pull request #12 from rbtr/deleter
add deleter plugins
rbtr authored Apr 2, 2020
2 parents 36f096f + c5608bc commit 5d7ff0d
Showing 27 changed files with 380 additions and 161 deletions.
4 changes: 1 addition & 3 deletions README.md
@@ -65,16 +65,14 @@ pachinko currently supports these outputs:
- [trakt collector (`trakt_collector`)](docs/plugins/outputs/trakt.md)

#### processors
pachinko has the following required processors:
- categorizer (internal)

pachinko has the following optional processors:
- tv identifier (pre-tv)
- movie identifier (pre-movie)
- tvdb (intra-tvdb)
- tmdb (intra-tmdb)
- tv path solver (post-tv_path_solver)
- movie path solver (post-movie_path_solver)
- file deleter (deleter)

### how to run it
pachinko is distributed as a container and as a cross-platform binary.
1 change: 1 addition & 0 deletions docs/examples/pachinko.yaml
@@ -32,6 +32,7 @@ processors:
name: tv-path-solver
season-dirs: true
tv-prefix: tv
- name: deleter
pre:
- name: movie
sanitize-name: true
36 changes: 36 additions & 0 deletions docs/plugins/processor/deleter.md
@@ -0,0 +1,36 @@
### File deleter processor
The deleter processor marks files for deletion by the internal deletion output. This allows files with certain extensions, files matching specified regexps, and empty directories to be deleted after pachinko has finished sorting.

#### Configuration
The default deleter plugin configuration is:
```yaml
- categories: []
directories: true
extensions:
- 7z
- gz
- gzip
- rar
- tar
- zip
- bmp
- gif
- heic
- jpeg
- jpg
- png
- tiff
- info
- nfo
- txt
- website
matchers: []
name: deleter
```
|option|type|description|
|-|-|-|
|`categories`|`[]string`|[unimplemented] list of file categories, such as text or archive, to remove.|
|`directories`|`bool`|whether to remove directories. Even if true, only *empty* dirs will be removed.|
|`extensions`|`[]string`|list of file extensions to remove.|
|`matchers`|`[]string`|regexps to match files to remove.|
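
For illustration only (a hypothetical configuration, not part of the committed file), a narrower setup in the same list-item form as the default above might keep directory cleanup on while removing only leftover `nfo`/`txt` files and anything matching a sample-file regexp:
```yaml
- name: deleter
  categories: []
  directories: true
  extensions:
    - nfo
    - txt
  matchers:
    - '(?i)sample'
```
As with the defaults, only empty directories would be removed even with `directories: true`.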
7 changes: 7 additions & 0 deletions internal/config/sort.go
@@ -13,6 +13,7 @@ import (

"github.com/mitchellh/mapstructure"
"github.com/rbtr/pachinko/internal/pipeline"
internalout "github.com/rbtr/pachinko/internal/plugin/output"
internalpre "github.com/rbtr/pachinko/internal/plugin/processor/pre"
"github.com/rbtr/pachinko/plugin/input"
"github.com/rbtr/pachinko/plugin/output"
@@ -62,6 +63,12 @@ func (c *Sort) ConfigurePipeline(pipe *pipeline.Pipeline) error {
}
}

deleter := internalout.NewDeleter(c.DryRun)
if err := deleter.Init(c.ctx); err != nil {
return err
}
pipe.WithOutputs(deleter)

categorizer := internalpre.NewCategorizer()
if err := categorizer.Init(c.ctx); err != nil {
return err
36 changes: 18 additions & 18 deletions internal/pipeline/controller.go
@@ -36,11 +36,11 @@ func NewPipeline() *Pipeline {
}
}

func (p *Pipeline) runInputs(ctx context.Context, sink chan<- types.Media) {
func (p *Pipeline) runInputs(ctx context.Context, sink chan<- types.Item) {
var wg sync.WaitGroup
for _, input := range p.inputs {
wg.Add(1)
go func(_ context.Context, f func(chan<- types.Media), source chan<- types.Media) {
go func(_ context.Context, f func(chan<- types.Item), source chan<- types.Item) {
defer wg.Done()
f(source)
}(ctx, input.Consume, sink)
@@ -49,52 +49,52 @@ func (p *Pipeline) runInputs(ctx context.Context, sink chan<- types.Media) {
log.Debug("pipeline: inputs finished")
}

func (p *Pipeline) runProcessors(ctx context.Context, source, sink chan types.Media) {
func (p *Pipeline) runProcessors(ctx context.Context, source, sink chan types.Item) {
// this noop post-processor attaches the final internal input stream to the
// external sink
p.processors = processor.AppendFunc(p.processors, func(in <-chan types.Media, _ chan<- types.Media) {
p.processors = processor.AppendFunc(p.processors, func(in <-chan types.Item, _ chan<- types.Item) {
for m := range in {
sink <- m
}
})
var wg sync.WaitGroup
in := source
out := make(chan types.Media)
out := make(chan types.Item)
for _, processor := range p.processors {
wg.Add(1)
go func(_ context.Context, f func(<-chan types.Media, chan<- types.Media), in <-chan types.Media, out chan<- types.Media) {
go func(_ context.Context, f func(<-chan types.Item, chan<- types.Item), in <-chan types.Item, out chan<- types.Item) {
defer wg.Done()
f(in, out)
close(out)
}(ctx, processor.Process, in, out)
in = out
out = make(chan types.Media)
out = make(chan types.Item)
}
wg.Wait()
log.Debug("pipeline: processors finished")
}

func (p *Pipeline) runOutputs(ctx context.Context, source chan types.Media) {
sinks := []chan<- types.Media{}
func (p *Pipeline) runOutputs(ctx context.Context, source chan types.Item) {
sinks := []chan<- types.Item{}
var wg sync.WaitGroup
for _, output := range p.outputs {
wg.Add(1)
out := make(chan types.Media)
go func(_ context.Context, f func(<-chan types.Media), in <-chan types.Media) {
out := make(chan types.Item)
go func(_ context.Context, f func(<-chan types.Item), in <-chan types.Item) {
defer wg.Done()
f(in)
}(ctx, output.Receive, out)
sinks = append(sinks, out)
}

wg.Add(1)
go func(ctx context.Context, in <-chan types.Media, outs []chan<- types.Media) {
go func(ctx context.Context, in <-chan types.Item, outs []chan<- types.Item) {
var wgOut sync.WaitGroup
defer wg.Done()
for m := range in {
for _, out := range outs {
wgOut.Add(1)
go func(_ context.Context, i types.Media, o chan<- types.Media) {
go func(_ context.Context, i types.Item, o chan<- types.Item) {
defer wgOut.Done()
o <- i
}(ctx, m, out)
@@ -125,11 +125,11 @@ func (p *Pipeline) Run(ctx context.Context) error {
log.Debug("running pipeline")

var wg sync.WaitGroup
in := make(chan types.Media, p.Buffer)
out := make(chan types.Media, p.Buffer)
in := make(chan types.Item, p.Buffer)
out := make(chan types.Item, p.Buffer)

wg.Add(1)
go func(ctx context.Context, sink chan types.Media) {
go func(ctx context.Context, sink chan types.Item) {
log.Trace("pipeline: executing input thread")
defer wg.Done()
p.runInputs(ctx, sink)
@@ -138,7 +138,7 @@ func (p *Pipeline) Run(ctx context.Context) error {
}(ctx, in)

wg.Add(1)
go func(ctx context.Context, source, sink chan types.Media) {
go func(ctx context.Context, source, sink chan types.Item) {
log.Trace("pipeline: executing processor thread")
defer wg.Done()
p.runProcessors(ctx, source, sink)
@@ -147,7 +147,7 @@ func (p *Pipeline) Run(ctx context.Context) error {
}(ctx, in, out)

wg.Add(1)
go func(ctx context.Context, source chan types.Media) {
go func(ctx context.Context, source chan types.Item) {
log.Trace("pipeline: executing output thread")
defer wg.Done()
p.runOutputs(ctx, source)
54 changes: 54 additions & 0 deletions internal/plugin/output/deleter.go
@@ -0,0 +1,54 @@
/*
Copyright © 2020 The Pachinko Authors
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
package output

import (
"container/list"
"context"
"os"

"github.com/rbtr/pachinko/plugin/output"
"github.com/rbtr/pachinko/types"
log "github.com/sirupsen/logrus"
)

// Deleter is a deleter output used to clean up chaff
type Deleter struct {
DryRun bool
}

func (*Deleter) Init(context.Context) error {
return nil
}

// Receive implements the Plugin interface on the Deleter
func (d *Deleter) Receive(c <-chan types.Item) {
log.Trace("started deleter output")
q := list.New()
for m := range c {
log.Infof("deleter_output: received_input %#v", m)
if m.Delete {
log.Infof("deleter_output: queueing %s", m.SourcePath)
q.PushBack(m)
}
}
for e := q.Front(); e != nil; e = e.Next() {
i := (e.Value).(types.Item)
log.Infof("deleter_output: deleting %s", i.SourcePath)
if d.DryRun {
continue
}
if err := os.Remove(i.SourcePath); err != nil {
log.Error(err)
}
}
}

func NewDeleter(dryRun bool) output.Output {
return &Deleter{DryRun: dryRun}
}
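
As a usage illustration (hypothetical, not part of this commit), a minimal in-package test could drive the dry-run path; it assumes the `SourcePath` and `Delete` fields referenced above and removes nothing from disk:
```go
package output

import (
	"context"
	"testing"

	"github.com/rbtr/pachinko/types"
)

// Illustrative sketch only: exercises the dry-run path so no files are removed.
func TestDeleterDryRun(t *testing.T) {
	d := NewDeleter(true) // dry-run enabled
	if err := d.Init(context.Background()); err != nil {
		t.Fatal(err)
	}

	// hypothetical item; the deleter only reads the SourcePath and Delete fields
	var junk types.Item
	junk.SourcePath = "/downloads/movie/movie.nfo"
	junk.Delete = true

	items := make(chan types.Item, 1)
	items <- junk
	close(items)

	// Receive drains the channel, queues the flagged item, and (in dry-run) skips os.Remove
	d.Receive(items)
}
```
Passing `false` to the constructor would exercise the real `os.Remove` path, so a non-dry-run test would point `SourcePath` at a temporary file instead.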
50 changes: 12 additions & 38 deletions internal/plugin/processor/pre/categorizer.go
@@ -19,42 +19,11 @@ import (
)

var defaultCategoryFileExtensions = map[types.Category][]string{
types.Archive: {
"7z",
"gz",
"gzip",
"rar",
"tar",
"zip",
},
types.Image: {
"bmp",
"gif",
"heic",
"jpeg",
"jpg",
"png",
"tiff",
},
types.Subtitle: {
"srt",
"sub",
},
types.Text: {
"info",
"nfo",
"txt",
"website",
},
types.Video: {
"avi",
"divx",
"m4v",
"mkv",
"mov",
"mp4",
"xvid",
},
types.Archive: types.ArchiveExtensions,
types.Image: types.ImageExtensions,
types.Subtitle: types.SubtitleExtensions,
types.Text: types.TextExtensions,
types.Video: types.VideoExtensions,
}

type FileCategorizer struct {
@@ -78,7 +78,7 @@ func (cat *FileCategorizer) Init(context.Context) error {
return nil
}

func (cat *FileCategorizer) identify(m types.Media) types.Media {
func (cat *FileCategorizer) identify(m types.Item) types.Item {
// don't attempt to categorize directories
if m.FileType == types.Directory {
return m
}

ext := path.Ext(m.SourcePath)

category := types.Unknown
@@ -96,7 +96,7 @@ func (cat *FileCategorizer) identify(m types.Media) types.Media {
return m
}

func (cat *FileCategorizer) Process(in <-chan types.Media, out chan<- types.Media) {
func (cat *FileCategorizer) Process(in <-chan types.Item, out chan<- types.Item) {
log.Trace("started categorizer")
for m := range in {
log.Debugf("categorizer: received input: %v", m)