Sftp refactor #3073
base: main
This commit reduces the scope of critical sections guarded by scannerMut to remove a deadlock that causes the last file to not be deleted when the SFTP input is used with watching enabled.
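The general pattern behind narrowing a critical section can be sketched as follows. This is a minimal illustration, not the PR's code: the `reader` type and its `finish` and `remove` methods are hypothetical stand-ins. State is copied under the lock, the lock is released, and only then is the code that takes the same lock called.

```go
package main

import (
	"fmt"
	"sync"
)

// reader is a hypothetical stand-in for a component whose state is
// guarded by a single mutex.
type reader struct {
	mu      sync.Mutex
	current string
	deleted []string
}

// finish clears the current path while holding the lock, then releases
// the lock before calling remove, which acquires the same lock.
func (r *reader) finish() {
	r.mu.Lock()
	path := r.current
	r.current = ""
	r.mu.Unlock()

	if path != "" {
		r.remove(path)
	}
}

// remove takes the lock itself. Calling it while mu is already held
// would block forever, because sync.Mutex is not reentrant.
func (r *reader) remove(path string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.deleted = append(r.deleted, path)
}

func main() {
	r := &reader{current: "/upload/a.txt"}
	r.finish()
	fmt.Println(r.deleted)
}
```

Holding the lock only around the field updates keeps the slow or re-entrant work outside the critical section.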
`(*watcherPathProvider).Next()` currently uses recursion to loop until a path is found. This commit refactors that function to use a for loop instead, which is more straightforward to read.
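As a rough sketch of what a loop-based provider can look like, assuming a hypothetical `pathProvider` type (not the PR's `watcherPathProvider`) whose `scans` field simulates successive directory listings: the loop returns a queued path if one exists, otherwise refills the queue and tries again.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoMorePaths is a hypothetical sentinel used only in this sketch.
var errNoMorePaths = errors.New("no more paths")

// pathProvider is a hypothetical provider. Each entry in scans simulates
// one directory listing; seen records paths that were already queued.
type pathProvider struct {
	queue []string
	seen  map[string]bool
	scans [][]string
}

// next loops until a path is available instead of calling itself.
func (p *pathProvider) next() (string, error) {
	for {
		if len(p.queue) > 0 {
			path := p.queue[0]
			p.queue = p.queue[1:]
			return path, nil
		}
		if len(p.scans) == 0 {
			return "", errNoMorePaths
		}
		// Refill the queue from the next simulated listing, skipping
		// paths that were already handed out.
		listing := p.scans[0]
		p.scans = p.scans[1:]
		for _, path := range listing {
			if !p.seen[path] {
				p.seen[path] = true
				p.queue = append(p.queue, path)
			}
		}
	}
}

func main() {
	p := &pathProvider{
		seen:  map[string]bool{},
		scans: [][]string{{}, {"a.txt"}, {"a.txt", "b.txt"}},
	}
	for {
		path, err := p.next()
		if err != nil {
			fmt.Println("done:", err)
			return
		}
		fmt.Println(path)
	}
}
```

An empty listing simply falls through to the next iteration, so the stack depth stays constant no matter how many scans come back empty.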
This integration test makes sure that every file is deleted when `delete_on_finish` is true and watching is enabled.
Before this commit, when a file was exhausted, the `ReadBatch` method returned `ErrNotConnected`, which caused the engine to call `Connect` again. Aside from being awkward, this caused the connection status to be incorrectly reported as disconnected during normal operation. This commit moves the logic that advances to the next file when the current file is exhausted into the `ReadBatch` method.
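The idea of advancing inside the read method can be sketched like this. Everything here is an assumption for illustration: `fileScanner`, `reader`, `readBatch`, and `errEndOfInput` are hypothetical names, and the real input returns message batches rather than single lines. On `io.EOF` the reader drops the current scanner and opens the next file within the same call, so the caller never sees a "not connected" error during normal operation.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// errEndOfInput is a hypothetical stand-in for an end-of-input sentinel.
var errEndOfInput = errors.New("end of input")

// fileScanner is a hypothetical scanner over the lines of one file.
type fileScanner struct{ lines []string }

func (f *fileScanner) next() (string, error) {
	if len(f.lines) == 0 {
		return "", io.EOF
	}
	line := f.lines[0]
	f.lines = f.lines[1:]
	return line, nil
}

// reader holds the pending files and the scanner for the current one.
type reader struct {
	files   []*fileScanner
	current *fileScanner
}

// readBatch returns the next line, moving on to the next file whenever
// the current one is exhausted.
func (r *reader) readBatch() (string, error) {
	for {
		if r.current == nil {
			if len(r.files) == 0 {
				return "", errEndOfInput
			}
			r.current = r.files[0]
			r.files = r.files[1:]
		}
		line, err := r.current.next()
		if errors.Is(err, io.EOF) {
			// Current file is exhausted: drop it and try the next one.
			r.current = nil
			continue
		}
		return line, err
	}
}

func main() {
	r := &reader{files: []*fileScanner{
		{lines: []string{"a", "b"}},
		{},
		{lines: []string{"c"}},
	}}
	for {
		line, err := r.readBatch()
		if err != nil {
			fmt.Println("stopped:", err)
			return
		}
		fmt.Println(line)
	}
}
```

With watching enabled, a real reader would presumably wait for new files rather than report end of input; that part is left out of the sketch.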
Hey @ooesili, thanks for bearing with me for a review on this! Really awesome job man 🏆 This does make it easier to go through the code. I think I saw a few potential issues, but should be good otherwise.
I think @rockwotj also left a few comments in #3037. Please have a look and then I'm happy to merge both PRs.
s.scannerMut.Unlock()
_ = s.scanner.Close(ctx)
s.scanner = nil
s.currentPath = ""
if errors.Is(err, io.EOF) {
	err = service.ErrNotConnected
If this returns `service.ErrNotConnected` here, it looks like `ReadBatch()` ignores it because it only checks for `sftp.ErrSSHFxConnectionLost`. Not sure if this is intentional.
	return nil, nil, service.ErrNotConnected
}
return parts, codecAckFn, nil
There's also `service.ErrEndOfInput` and other errors returned from `initScanner()` which seem to be swallowed here.
var errEndOfPaths = errors.New("end of paths")

func (s *sftpReader) closeClient() {
	if s.client == nil {
Not sure if it's an issue, but when `s.closed == true`, can we get multiple concurrent calls to this `closeClient()` method via the `initClient()` cleanup, which is called in the `ackFn` callback? If yes, I'm thinking that one goroutine could close the client while the other one is just preparing to remove the file.
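One way to rule out that kind of interleaving, sketched here under assumptions rather than taken from the PR, is to guard the client with a mutex so that closing and removing are serialized and closing is idempotent. The `fakeClient` and `reader` types, `removeFile`, and `errClosed` are hypothetical names for this illustration only.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosed is a hypothetical error returned once the client is gone.
var errClosed = errors.New("client closed")

// fakeClient stands in for an SFTP client and records removed paths.
type fakeClient struct{ removed []string }

// reader guards its client with a mutex so close and remove cannot
// interleave.
type reader struct {
	mu     sync.Mutex
	client *fakeClient
}

// closeClient is safe to call more than once and from several goroutines.
func (r *reader) closeClient() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.client = nil
}

// removeFile either runs entirely before the close or observes that the
// client is already closed; it never uses a half-closed client.
func (r *reader) removeFile(path string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.client == nil {
		return errClosed
	}
	r.client.removed = append(r.client.removed, path)
	return nil
}

func main() {
	r := &reader{client: &fakeClient{}}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		r.closeClient()
	}()
	go func() {
		defer wg.Done()
		if err := r.removeFile("/upload/a.txt"); err != nil {
			fmt.Println("remove skipped:", err)
		}
	}()
	wg.Wait()
}
```

Which goroutine wins is not deterministic, but either outcome is well defined: the file is removed and then the client closes, or the removal is skipped with an error the caller can handle.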
	files, err := client.Glob("/upload/*.txt")
	assert.NoError(c, err)
	assert.Empty(c, files)
}, time.Second, time.Millisecond*100)
One second may be a wee bit too short for the test when using `-race`. Something like 3 seconds should do.
Builds on top of #2435