Skip to content

Commit

Permalink
Merge pull request #65 from ivanilves/issue/63/refactor
Browse files Browse the repository at this point in the history
Renamed `--pull-images` to `--pull` + more concurrency
  • Loading branch information
ivanilves committed Oct 1, 2017
2 parents 3cf3ff9 + bceea0e commit 1267e0d
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 74 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ You could use `lstags`, if you ...
## How do I use it myself?
I run `lstags` inside a Cron Job on my Kubernetes worker nodes to poll my own Docker registry for a new [stable] images.
```
lstags --pull-images -u myuser -p mypass registry.ivanilves.local/tools/sicario~/v1\\.[0-9]+$/
lstags --pull -u myuser -p mypass registry.ivanilves.local/tools/sicario~/v1\\.[0-9]+$/
```
... and following cronjob runs on my CI server to ensure I always have latest Ubuntu 14.04 and 16.04 images to play with:
```
lstags --pull-images ubuntu~/^1[46]\\.04$/"
lstags --pull ubuntu~/^1[46]\\.04$/"
```
My CI server is connected over crappy Internet link and pulling images in advance makes `docker run` much faster. :wink:

Expand Down
162 changes: 100 additions & 62 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ import (
)

type options struct {
DefaultRegistry string `short:"r" long:"default-registry" default:"registry.hub.docker.com" description:"Docker registry to use by default" env:"DEFAULT_REGISTRY"`
Username string `short:"u" long:"username" default:"" description:"Docker registry username" env:"USERNAME"`
Password string `short:"p" long:"password" default:"" description:"Docker registry password" env:"PASSWORD"`
DockerJSON string `shord:"j" long:"docker-json" default:"~/.docker/config.json" env:"DOCKER_JSON"`
Concurrency int `short:"c" long:"concurrency" default:"32" description:"Concurrent request limit while querying registry" env:"CONCURRENCY"`
PullImages bool `short:"P" long:"pull-images" description:"Pull images matched by filter" env:"PULL_IMAGES"`
InsecureRegistry bool `short:"i" long:"insecure-registry" description:"Use insecure plain-HTTP registriy" env:"INSECURE_REGISTRY"`
TraceRequests bool `short:"T" long:"trace-requests" description:"Trace registry HTTP requests" env:"TRACE_REQUESTS"`
Version bool `short:"V" long:"version" description:"Show version and exit"`
Positional struct {
DefaultRegistry string `short:"r" long:"default-registry" default:"registry.hub.docker.com" description:"Docker registry to use by default" env:"DEFAULT_REGISTRY"`
Username string `short:"u" long:"username" default:"" description:"Docker registry username" env:"USERNAME"`
Password string `short:"p" long:"password" default:"" description:"Docker registry password" env:"PASSWORD"`
DockerJSON string `shord:"j" long:"docker-json" default:"~/.docker/config.json" env:"DOCKER_JSON"`
ConcurrentRequests int `short:"c" long:"concurrent-requests" default:"32" description:"Limit of concurrent requests to the registry" env:"CONCURRENT_REQUESTS"`
Pull bool `short:"P" long:"pull" description:"Pull images matched by filter" env:"PULL"`
InsecureRegistry bool `short:"i" long:"insecure-registry" description:"Use insecure plain-HTTP registriy" env:"INSECURE_REGISTRY"`
TraceRequests bool `short:"T" long:"trace-requests" description:"Trace registry HTTP requests" env:"TRACE_REQUESTS"`
Version bool `short:"V" long:"version" description:"Show version and exit"`
Positional struct {
Repositories []string `positional-arg-name:"REPO1 REPO2" description:"Docker repositories to operate on"`
} `positional-args:"yes"`
}
Expand Down Expand Up @@ -149,79 +149,117 @@ func main() {
const format = "%-12s %-45s %-15s %-25s %s\n"
fmt.Printf(format, "<STATE>", "<DIGEST>", "<(local) ID>", "<Created At>", "<TAG>")

allTags := make([]*tag.Tag, 0)
lsRepos := make([]string, 0)
repoCount := len(o.Positional.Repositories)

type tagResult struct {
Tags []*tag.Tag
Repo string
}

trc := make(chan tagResult, repoCount)

for _, r := range o.Positional.Repositories {
repository, filter, err := trimFilter(r)
if err != nil {
suicide(err)
}
go func(r string, o options, trc chan tagResult) {
repository, filter, err := trimFilter(r)
if err != nil {
suicide(err)
}

registryName := getRegistryName(repository, o.DefaultRegistry)
registryName := getRegistryName(repository, o.DefaultRegistry)

repoRegistryName := registry.FormatRepoName(repository, registryName)
repoLocalName := local.FormatRepoName(repository, registryName)
repoRegistryName := registry.FormatRepoName(repository, registryName)
repoLocalName := local.FormatRepoName(repository, registryName)

username, password, err := assignCredentials(registryName, o.Username, o.Password, o.DockerJSON)
if err != nil {
suicide(err)
}
username, password, err := assignCredentials(registryName, o.Username, o.Password, o.DockerJSON)
if err != nil {
suicide(err)
}

tresp, err := auth.NewToken(registryName, repoRegistryName, username, password)
if err != nil {
suicide(err)
}
tresp, err := auth.NewToken(registryName, repoRegistryName, username, password)
if err != nil {
suicide(err)
}

authorization := getAuthorization(tresp)
authorization := getAuthorization(tresp)

registryTags, err := registry.FetchTags(registryName, repoRegistryName, authorization, o.Concurrency)
if err != nil {
suicide(err)
}
localTags, err := local.FetchTags(repoLocalName)
if err != nil {
suicide(err)
}
registryTags, err := registry.FetchTags(registryName, repoRegistryName, authorization, o.ConcurrentRequests)
if err != nil {
suicide(err)
}
localTags, err := local.FetchTags(repoLocalName)
if err != nil {
suicide(err)
}

sortedKeys, names, joinedTags := tag.Join(registryTags, localTags)

sortedKeys, names, joinedTags := tag.Join(registryTags, localTags)
tags := make([]*tag.Tag, 0)
for _, key := range sortedKeys {
name := names[key]

for _, key := range sortedKeys {
name := names[key]
tg := joinedTags[name]

tg := joinedTags[name]
if !matchesFilter(tg.GetName(), filter) {
continue
}

if !matchesFilter(tg.GetName(), filter) {
continue
tags = append(tags, tg)
}

allTags = append(allTags, tg)
lsRepos = append(lsRepos, repoLocalName)
}
trc <- tagResult{Tags: tags, Repo: repoLocalName}
}(r, o, trc)
}

for i, tg := range allTags {
fmt.Printf(
format,
tg.GetState(),
tg.GetShortDigest(),
tg.GetImageID(),
tg.GetCreatedString(),
lsRepos[i]+":"+tg.GetName(),
)
tagResults := make([]tagResult, repoCount)
repoNumber := 0
for tr := range trc {
repoNumber++
tagResults = append(tagResults, tr)
if repoNumber >= repoCount {
close(trc)
}
}

for _, tr := range tagResults {
for _, tg := range tr.Tags {
fmt.Printf(
format,
tg.GetState(),
tg.GetShortDigest(),
tg.GetImageID(),
tg.GetCreatedString(),
tr.Repo+":"+tg.GetName(),
)
}
}

if o.PullImages {
for i, tg := range allTags {
if tg.NeedsPull() {
ref := lsRepos[i] + ":" + tg.GetName()
if o.Pull {
done := make(chan bool, repoCount)

for _, tr := range tagResults {
go func(tags []*tag.Tag, repo string, done chan bool) {
for _, tg := range tags {
if tg.NeedsPull() {
ref := repo + ":" + tg.GetName()

fmt.Printf("PULLING %s\n", ref)
err := local.Pull(ref)
if err != nil {
suicide(err)
}
}

fmt.Printf("PULLING: %s\n", ref)
err := local.PullImage(ref)
if err != nil {
suicide(err)
done <- true
}
}(tr.Tags, tr.Repo, done)
}

repoNumber := 0
for range done {
repoNumber++

if repoNumber >= repoCount {
close(done)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions tag/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ func FormatRepoName(repository, registry string) string {
return registry + "/" + repository
}

// PullImage pulls Docker image specified locally
func PullImage(ref string) error {
// Pull pulls Docker image specified locally
func Pull(ref string) error {
cli, err := newClient()
if err != nil {
return err
Expand Down
16 changes: 8 additions & 8 deletions tag/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,19 +228,19 @@ type detailResponse struct {
Error error
}

func validateConcurrency(concurrency int) (int, error) {
func validateConcurrentRequests(concurrentRequests int) (int, error) {
const min = 1
const max = 128

if concurrency < min {
return 0, errors.New("Concurrency could not be lower than " + strconv.Itoa(min))
if concurrentRequests < min {
return 0, errors.New("Concurrent requests limit could not be lower than " + strconv.Itoa(min))
}

if concurrency > max {
return 0, errors.New("Concurrency could not be higher than " + strconv.Itoa(max))
if concurrentRequests > max {
return 0, errors.New("Concurrent requests limit could not be higher than " + strconv.Itoa(max))
}

return concurrency, nil
return concurrentRequests, nil
}

func calculateBatchSteps(count, limit int) (int, int) {
Expand All @@ -263,8 +263,8 @@ func calculateBatchStepSize(stepNumber, stepsTotal, remain, limit int) int {
}

// FetchTags looks up Docker repo tags present on remote Docker registry
func FetchTags(registry, repo, authorization string, concurrency int) (map[string]*tag.Tag, error) {
batchLimit, err := validateConcurrency(concurrency)
func FetchTags(registry, repo, authorization string, concurrentRequests int) (map[string]*tag.Tag, error) {
batchLimit, err := validateConcurrentRequests(concurrentRequests)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 1267e0d

Please sign in to comment.