Don't commit if commit has transaction identifier that has been committed already #2580

Open
vegarsti opened this issue Jun 7, 2024 · 3 comments
Labels
enhancement New feature or request

Comments

@vegarsti
Contributor

vegarsti commented Jun 7, 2024

I'm not sure if this is a bug or a feature request. It might be that I'm using the library incorrectly or misunderstanding how this should work!

I want to do idempotent commits using transaction identifiers. I've seen that some support for this was added recently, e.g. 88ea110, 54cfa06.

What I'm seeing is that if I make multiple commits with the same app_transaction (same ID and version), they are all written. The same goes for any other combination of ID and version. Is that the correct behavior?
This commit seems to indicate that delta-rs should keep only the latest version per txn app ID, so maybe I'm doing something wrong? 54cfa06

For comparison, Databricks SQL will only commit the above scenario once.

In case I am onto something, here is a branch I've pushed with a test that fails on main, but which passes with my modification: It writes two commits where both have the same app ID + version. We verify that the second write did not cause a new active file. vegarsti@4e52f4f

There's some nuance here, though: it's allowed for a commit to have multiple app transactions. I have no idea how to handle that, please advise 😅

Also, if the commit fails because of a concurrent update, I would also need to do the same check after the snapshot update.
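To make the re-check after a failed commit concrete, here is a minimal sketch in plain Rust. It is not the delta-rs API: `Log`, `try_commit`, `Outcome`, and `commit_once` are hypothetical names, the log is an in-memory stand-in, and "already applied" is assumed to mean "the snapshot records exactly this app ID and version". The point is only that the check sits inside the retry loop, so it runs again against every refreshed snapshot.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a table log; not the delta-rs API.
#[derive(Default)]
struct Log {
    /// One entry per committed version, holding the (app ID, version) pairs it carried.
    commits: Vec<Vec<(String, i64)>>,
}

impl Log {
    /// Returns the current log length and the last-written version per app ID.
    fn snapshot(&self) -> (usize, HashMap<String, i64>) {
        let mut latest = HashMap::new();
        for commit in &self.commits {
            for (app_id, version) in commit {
                latest.insert(app_id.clone(), *version);
            }
        }
        (self.commits.len(), latest)
    }

    /// Put-if-absent: succeeds only if the log still has the length we read.
    fn try_commit(&mut self, expected_len: usize, txns: Vec<(String, i64)>) -> bool {
        if self.commits.len() != expected_len {
            return false; // another writer committed first
        }
        self.commits.push(txns);
        true
    }
}

#[derive(Debug, PartialEq)]
enum Outcome {
    Committed,
    AlreadyApplied,
}

/// Commits (app_id, version) at most once. The "already applied" check is
/// repeated on every attempt, i.e. after each snapshot refresh.
/// Returns `None` if all attempts lost the race.
fn commit_once(log: &mut Log, app_id: &str, version: i64, max_attempts: usize) -> Option<Outcome> {
    for _ in 0..max_attempts {
        let (len, latest) = log.snapshot();
        if latest.get(app_id) == Some(&version) {
            return Some(Outcome::AlreadyApplied);
        }
        if log.try_commit(len, vec![(app_id.to_string(), version)]) {
            return Some(Outcome::Committed);
        }
        // Conflict: loop around, reload the snapshot, and check again.
    }
    None
}
```

In this single-threaded model `try_commit` never actually loses a race; with a shared log it could, and the loop would then re-read the snapshot before deciding. The sketch handles one app transaction per commit and leaves the multi-transaction case open.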

@vegarsti vegarsti added the enhancement New feature or request label Jun 7, 2024
@delta-io delta-io deleted a comment Jun 8, 2024
@Blajda
Collaborator

Blajda commented Jun 8, 2024

If the commits are performed in series, then the behavior you are seeing is expected. See the quote below from the Delta protocol.

The semantics of the application-specific version are left up to the external system. Delta only ensures that the latest version for a given appId is available in the table snapshot. The Delta transaction protocol does not, for example, assume monotonicity of the version and it would be valid for the version to decrease, possibly representing a "rollback" of an earlier transaction.
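As an illustration of that rule, the sketch below replays `txn` actions in log order and keeps whatever was written last per appId, with no monotonicity check. `Txn` and `latest_app_versions` are hypothetical names for this example, not delta-rs types.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for a Delta `txn` action.
#[derive(Debug, Clone, PartialEq)]
struct Txn {
    app_id: String,
    version: i64,
}

/// Replays commits in log order and keeps the last-written version per app ID.
fn latest_app_versions(commits: &[Vec<Txn>]) -> HashMap<String, i64> {
    let mut latest = HashMap::new();
    for commit in commits {
        for txn in commit {
            // Last writer wins, even if the version went down (a "rollback").
            latest.insert(txn.app_id.clone(), txn.version);
        }
    }
    latest
}
```

Under this model, a commit with version 5 followed by a commit with version 3 for the same appId leaves 3 in the snapshot.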

I don't think the current changes in your branch should be merged into delta-rs since it's application specific.
The enforcement of monotonicity should be performed in application code, e.g. in this case you write a wrapper around operations for your application needs.

An alternative is to generalize your changes in transaction/mod.rs to instead use some hook interface and then have a specific monotonicity hook implementation that others can use. If we go with the hook approach, then maybe a new custom delta-rs config can be added, e.g. delta-rs.transaction-id, that would then be used. We just need to be careful here since there might be a few foot traps (e.g. rollbacks).
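One possible shape for such a hook, sketched in plain Rust. Everything here is hypothetical: delta-rs does not expose a `PreCommitHook` trait, and `MonotonicTxnHook` and `run_hooks` are names invented for this example. It also assumes that a commit carrying several app transactions is rejected as soon as any one of them is not strictly newer than the snapshot's version.

```rust
use std::collections::HashMap;

/// Hypothetical pre-commit hook interface; not part of delta-rs.
trait PreCommitHook {
    /// `latest` maps app ID to the version recorded in the snapshot.
    /// `pending` holds the (app ID, version) pairs of the commit about to be written.
    fn check(&self, latest: &HashMap<String, i64>, pending: &[(String, i64)]) -> Result<(), String>;
}

/// Rejects a commit unless every app transaction it carries is strictly newer
/// than the version in the snapshot. Unknown app IDs are always accepted.
struct MonotonicTxnHook;

impl PreCommitHook for MonotonicTxnHook {
    fn check(&self, latest: &HashMap<String, i64>, pending: &[(String, i64)]) -> Result<(), String> {
        for (app_id, version) in pending {
            if let Some(seen) = latest.get(app_id) {
                if version <= seen {
                    return Err(format!(
                        "app {app_id}: version {version} is not newer than {seen}"
                    ));
                }
            }
        }
        Ok(())
    }
}

/// Runs every hook in order and stops at the first rejection.
fn run_hooks(
    hooks: &[Box<dyn PreCommitHook>],
    latest: &HashMap<String, i64>,
    pending: &[(String, i64)],
) -> Result<(), String> {
    hooks.iter().try_for_each(|hook| hook.check(latest, pending))
}
```

A strict hook like this also rejects rollbacks, which the protocol allows, so it would have to be opt-in. To avoid the race a wrapper has, it would need to run inside the commit loop against each refreshed snapshot.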

@vegarsti
Contributor Author

vegarsti commented Jun 8, 2024

Thanks for your answer @Blajda!

I don't think the current changes in your branch should be merged into delta-rs since it's application specific. The enforcement of monotonicity should be performed in application code, e.g. in this case you write a wrapper around operations for your application needs.

I see! Given what the protocol document says, I understand that we can't discard transactions with lower versions.

What I'm worried about with a wrapper is that it's prone to a race condition, since the check happens outside the write and thus outside the commit retry logic. But I guess that's hard to avoid!

An alternative is to generalize your changes in transaction/mod.rs to instead use some hook interface and then have a specific monotonicity hook implementation that others can use. If we go with the hook approach, then maybe a new custom delta-rs config can be added, e.g. delta-rs.transaction-id, that would then be used. We just need to be careful here since there might be a few foot traps (e.g. rollbacks).

That sounds like a good idea! I see you mention it here as well #2130. Is there any scaffolding for something like that already? Would it be similar to the post commit hooks, but as a pre commit hook?

To clarify: PR #2539, which was merged, does all the work of inserting and keeping track of the identifiers, but there is no reconciliation yet, correct? I was a bit confused by "fix: only keep latest txn per app".

@vegarsti
Contributor Author

vegarsti commented Jun 9, 2024

I found the check_for_updated_application_transaction_ids_that_current_txn_depends_on function, which does what I expect!

    /// Checks if the winning transaction corresponds to some AppId on which
    /// current transaction also depends.
    fn check_for_updated_application_transaction_ids_that_current_txn_depends_on(
        &self,
    ) -> Result<(), CommitConflictError> {
        // Fail if the appIds seen by the current transaction has been updated by the winning
        // transaction i.e. the winning transaction have [Txn] corresponding to
        // some appId on which current transaction depends on. Example - This can happen when
        // multiple instances of the same streaming query are running at the same time.
        let winning_txns = self.winning_commit_summary.app_level_transactions();
        let txn_overlap: HashSet<&String> = winning_txns
            .intersection(&self.txn_info.read_app_ids)
            .collect();
        if !txn_overlap.is_empty() {
            Err(CommitConflictError::ConcurrentTransaction)
        } else {
            Ok(())
        }
    }
}

But it looks like it only kicks in when there's a commit conflict, i.e. not when the commits are sequential and commit 2 has already read commit 1.

match conflict_checker.check_conflicts() {
    Ok(_) => {
        attempt_number += 1;
    }
    Err(err) => {
        this.log_store
            .abort_commit_entry(version, tmp_commit)
            .await?;
        return Err(TransactionError::CommitConflict(err).into());
    }
};

test_app_txn_conflict shows that it fails in that case:

async fn test_app_txn_conflict() {
    // A conflict must be raised whenever the same application id is used for two concurrent transactions

In summary, my understanding of how this behaves:

  • One of the two commits will fail if they are concurrent
  • If they are serialized, I need to check the transaction IDs outside and abort early. I guess this could still be added as a hook like you suggested?

Those two scenarios should cover it, though!
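The two scenarios can be modelled with a small self-contained sketch. This is a simplification of the overlap check quoted earlier, not the delta-rs implementation: `winning_app_ids` and `app_ids_conflict` are hypothetical helpers, and the log is assumed to be just a list of app-ID sets, one per commit.

```rust
use std::collections::HashSet;

/// App IDs written by commits the current transaction has not read yet.
/// `read_version` is the number of commits already visible to the transaction.
fn winning_app_ids(log: &[HashSet<String>], read_version: usize) -> HashSet<String> {
    log.iter().skip(read_version).flatten().cloned().collect()
}

/// True if a winning commit touched an app ID the current transaction depends on.
fn app_ids_conflict(winning: &HashSet<String>, read_app_ids: &HashSet<String>) -> bool {
    winning.intersection(read_app_ids).next().is_some()
}
```

With one commit for `"app"` in the log, a transaction that started from an empty table (`read_version` 0) sees an overlap and fails. A transaction that already read that commit (`read_version` 1) has no winning commits to compare against, so the check passes and the duplicate has to be caught some other way.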
