-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Kernel][Metrics][PR#6] Support TransactionReport to log metrics for a Transaction operation #4037
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! Left some comments and asked some questions. Will review the rest of the tests later
import java.util.Optional; | ||
|
||
/** Stores the metrics results for a {@link SnapshotReport} */ | ||
@JsonPropertyOrder({"timestampToVersionResolutionDurationNs", "loadInitialDeltaActionsDurationNs"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you tell me about this? (As a reply to this comment; don't need you to add anything to the code).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found in some test runs the order the fields were serialized in was not deterministic so added this.
kernel/kernel-api/src/main/java/io/delta/kernel/metrics/TransactionMetricsResult.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/metrics/TransactionMetricsResult.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/metrics/TransactionReport.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/TransactionReportImpl.java
Show resolved
Hide resolved
"Expected a successful SnapshotReport but missing version"); | ||
this.snapshotVersion = requireNonNull(snapshotReport).getVersion().get(); | ||
if (snapshotVersion < 0) { | ||
// For a new table, no Snapshot is actually loaded and thus no SnapshotReport is emitted |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious, then -- what is the value of snapshotReport
for a new table that is passed into this constructor? does it just have a garbage reportUUID?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we had to instantiate some sort of report to satisfy the APIs but the report that is passed in was never "emitted" to the engine
kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/TransactionMetrics.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java
Outdated
Show resolved
Hide resolved
} | ||
} | ||
|
||
def resolvePath(path: String): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see this only being used once. Could we just have the underlying test invoke the defaultEngine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's used once in both test suites, and presumably will be used by any future metric report suites
* Wraps an {@link Engine} to implement the metrics reporter such that it appends any reports | ||
* to the provided in memory buffer. | ||
*/ | ||
class EngineWithInMemoryMetricsReporter(buf: ArrayBuffer[MetricsReport], baseEngine: Engine) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why does it take in the buffer to which we want to append the metrics?
would it ever take in a buffer that is non-empty? all the usages I see use val reports = ArrayBuffer.empty[MetricsReport]
do our engine APIs not make it simple enough to get the metrics reports?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this implementation could create the empty buffer itself, but we would still need it to be externally accessible so we can access the reports. I don't think there's much value added by doing this and I think it would actually make collectMetricsReports
a bit more complex.
do our engine APIs not make it simple enough to get the metrics reports?
I think this is pretty simple :) we don't provide any out-of-the-box functionality to buffer the reports in memory thus we need to implement an "in memory metrics reporter". We add this to the existing defaultEngine reporter such that we have both this one, and the default logging reporter so we do log the reports in the test suite as well.
Which Delta project/connector is this regarding?
Description
Adds
TransactionReport
for reporting an attempted or successful transaction.We record
TransactionReport
either after successfully committing the transaction or if an exception is thrown during the commit attempt. We only record a report for failed commit attempts. If there are failures elsewhere, such as while writing the data, they will not be reported.We also add support for serializing
TransactionReport
s in this PR.How was this patch tested?
Adds unit tests.
Does this PR introduce any user-facing changes?
No.