Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions .github/workflows/build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,23 @@ jobs:
CGO_CFLAGS: "-Wno-return-local-addr"

integration_tests:
name: integration tests (snapshotter ${{ matrix.snapshotter && 'enabled' || 'disabled' }})
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
# Run with and without the containerd snapshotter to cover both OCI and
# Docker v2 manifest formats.
snapshotter: [true, false]
steps:
- uses: actions/checkout@v4
- uses: docker/setup-docker-action@v4
with:
version: latest
# Disable containerd snapshotter which defaults to using OCI manifests,
# which are currently not supported by Kraken.
# TODO(thijmv): Remove this override once OCI manifests are supported.
daemon-config: |
{
"features": {
"containerd-snapshotter": false
"containerd-snapshotter": ${{ matrix.snapshotter }}
}
}
- uses: actions/setup-python@v5
Expand Down
13 changes: 9 additions & 4 deletions build-index/tagstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,17 @@ func (s *tagStore) resolveFromBackend(tag string) (core.Digest, error) {
}
var b bytes.Buffer
if err := backendClient.Download(tag, tag, &b); err != nil {
if err == backenderrors.ErrBlobNotFound {
if errors.Is(err, backenderrors.ErrBlobNotFound) {
log.With("tag", tag).Debug("Tag not found in backend")
return core.Digest{}, ErrTagNotFound
} else {
// Kraken is expected to accept image pushes even when remote storage is
// down by storing the blob on disk and flushing it to the remote storage
// once it becomes available again. When we experience a backend error,
// we return 404, instead of 500, as the latter would abort Docker pushes
// that HEAD before PUT (e.g. containerd snapshotter).
log.With("tag", tag, "error", err).Error("Failed to download tag from backend")
}
log.With("tag", tag).Errorf("Failed to download tag from backend: %s", err)
return core.Digest{}, fmt.Errorf("backend client: %s", err)
return core.Digest{}, ErrTagNotFound
}
d, err := core.ParseSHA256Digest(b.String())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion build-index/tagstore/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestGetFromBackendUnkownError(t *testing.T) {
mocks.backendClient.EXPECT().Download(tag, tag, w).Return(fmt.Errorf("test error"))

_, err := store.Get(tag)
require.Error(err)
require.Equal(ErrTagNotFound, err)
}

func TestGetFromBackendInvalidValue(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion nginx/config/build-index.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ server {
proxy_cache tags;
proxy_cache_methods GET;
proxy_cache_valid 200 5m;
proxy_cache_valid any 1s;
proxy_cache_lock on;

proxy_read_timeout {{if .proxy_read_timeout}}{{.proxy_read_timeout}}{{else}}3m{{end}};
Expand Down
68 changes: 32 additions & 36 deletions proxy/proxyserver/prefetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (

"github.com/docker/distribution"
"github.com/docker/distribution/manifest/manifestlist"
"github.com/docker/distribution/manifest/schema2"
"github.com/uber-go/tally"
"github.com/uber/kraken/build-index/tagclient"
"github.com/uber/kraken/core"
"github.com/uber/kraken/origin/blobclient"
"github.com/uber/kraken/utils/dockerutil"
"github.com/uber/kraken/utils/httputil"
"github.com/uber/kraken/utils/log"
"go.uber.org/zap"
Expand Down Expand Up @@ -326,59 +326,55 @@ func (ph *PrefetchHandler) shouldSkipPrefetch(b blobInfo, logger *zap.SugaredLog
return false
}

// processManifest handles both ManifestLists and single Manifests.
// processManifest parses the manifest and returns all blob infos, including
// the manifest blob itself.
func (ph *PrefetchHandler) processManifest(logger *zap.SugaredLogger, namespace string, manifestBytes []byte) ([]blobInfo, error) {
// Attempt to process as a manifest list.
blobs, err := ph.tryProcessManifestList(logger, namespace, manifestBytes)
if err == nil && len(blobs) > 0 {
return blobs, nil
manifest, digest, err := dockerutil.ParseManifest(bytes.NewReader(manifestBytes))
Comment thread
thijmv marked this conversation as resolved.
if err != nil {
return nil, fmt.Errorf("parse manifest: %w", err)
}

// Fallback to single manifest.
var manifest schema2.Manifest
if err := json.NewDecoder(bytes.NewReader(manifestBytes)).Decode(&manifest); err != nil {
logger.With("namespace", namespace).Errorf("Failed to parse single manifest: %v", err)
return nil, fmt.Errorf("invalid single manifest: %w", err)
self := blobInfo{digest: digest, size: int64(len(manifestBytes))}
blobs, err := ph.getBlobInfos(logger, namespace, manifest)
if err != nil {
return nil, err
}
return ph.processLayers(manifest.Layers)
return append([]blobInfo{self}, blobs...), nil
Comment thread
thijmv marked this conversation as resolved.
}

// tryProcessManifestList attempts to decode a manifest list.
func (ph *PrefetchHandler) tryProcessManifestList(logger *zap.SugaredLogger, namespace string, manifestBytes []byte) ([]blobInfo, error) {
var manifestList manifestlist.ManifestList
if err := json.NewDecoder(bytes.NewReader(manifestBytes)).Decode(&manifestList); err != nil || len(manifestList.Manifests) == 0 {
return nil, fmt.Errorf("not a valid manifest list")
// getBlobInfos returns all blob infos for a manifest. For manifest lists and
// OCI indexes, it also descends one level to collect sub-manifest blob infos.
func (ph *PrefetchHandler) getBlobInfos(logger *zap.SugaredLogger, namespace string, manifest distribution.Manifest) ([]blobInfo, error) {
ml, ok := manifest.(*manifestlist.DeserializedManifestList)
if !ok {
return ph.processLayers(manifest.References())
}
logger.With("namespace", namespace).Info("Processing manifest list")
return ph.processManifestList(logger, namespace, manifestList)
}

// processManifestList processes a manifest list.
func (ph *PrefetchHandler) processManifestList(logger *zap.SugaredLogger, namespace string, manifestList manifestlist.ManifestList) ([]blobInfo, error) {
var allBlobs []blobInfo
for _, descriptor := range manifestList.Manifests {
manifestDigestHex := descriptor.Digest.Hex()
digest, err := core.NewSHA256DigestFromHex(manifestDigestHex)
for _, subDesc := range ml.Manifests {
subDigest, err := core.NewSHA256DigestFromHex(subDesc.Digest.Hex())
if err != nil {
return nil, fmt.Errorf("failed to parse manifest digest %s: %w", manifestDigestHex, err)
return nil, fmt.Errorf("parse sub-manifest digest: %w", err)
}

buf := &bytes.Buffer{}
startTime := time.Now()
if err := ph.clusterClient.DownloadBlob(context.Background(), namespace, digest, buf); err != nil {
if err := ph.clusterClient.DownloadBlob(context.Background(), namespace, subDigest, buf); err != nil {
ph.metrics.Counter("download_manifest_error").Inc(1)
logger.With("error", err).Error("Failed to download manifest blob")
continue
logger.With("error", err).Error("Failed to download sub-manifest blob")
return nil, fmt.Errorf("download sub-manifest %s: %w", subDigest, err)
}
ph.getManifestLatency.RecordDuration(time.Since(startTime))
var manifest schema2.Manifest
if err := json.NewDecoder(buf).Decode(&manifest); err != nil {
return nil, fmt.Errorf("failed to parse manifest: %w", err)

subManifest, _, err := dockerutil.ParseManifest(buf)
if err != nil {
return nil, fmt.Errorf("parse sub-manifest %s: %w", subDigest, err)
}
blobs, err := ph.processLayers(manifest.Layers)
subBlobs, err := ph.processLayers(subManifest.References())
if err != nil {
return nil, err
return nil, fmt.Errorf("get blob infos for sub-manifest %s: %w", subDigest, err)
}
allBlobs = append(allBlobs, blobs...)
allBlobs = append(allBlobs, blobInfo{digest: subDigest, size: subDesc.Size})
allBlobs = append(allBlobs, subBlobs...)
}
return allBlobs, nil
}
Expand Down
4 changes: 3 additions & 1 deletion proxy/proxyserver/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ import (
"github.com/uber/kraken/utils/log"
)

var _manifestRegexp = regexp.MustCompile(`^application/vnd.docker.distribution.manifest.v\d\+(json|prettyjws)`)
// _manifestRegexp matches content types that denote a single image manifest:
// Docker distribution manifests of any single-digit schema version (json or
// prettyjws) and OCI v1 image manifests. The pattern is anchored at the start
// only, so trailing text after the media type is tolerated.
var _manifestRegexp = regexp.MustCompile(`^application/vnd\.(docker\.distribution\.manifest\.v\d\+(json|prettyjws)|oci\.image\.manifest\.v1\+json)`)

// PreheatHandler defines the handler of preheat.
type PreheatHandler struct {
Expand Down
112 changes: 112 additions & 0 deletions proxy/proxyserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ func TestPrefetchV1(t *testing.T) {
tagRequest := url.QueryEscape(fmt.Sprintf("%s/%s", namespace, tag))
mocks.tagClient.EXPECT().Get(tagRequest).Return(manifest, nil)
mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, manifest, mockutil.MatchWriter(bs)).Return(nil)
mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, manifest, io.Discard).Return(nil)
mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, layers[0], io.Discard).Return(nil)
mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, layers[1], io.Discard).Return(nil)
mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, layers[2], io.Discard).Return(nil)
_, err = httputil.Post(
Expand All @@ -216,6 +218,52 @@ func TestPrefetchV1(t *testing.T) {
require.NoError(err)
}

// TestPrefetchV1WithManifestList posts a v1 prefetch request for a tag that
// resolves to a manifest list with two sub-manifests. It expects the list and
// both sub-manifests to be fetched for parsing, and then the list, the
// sub-manifests and every blob they reference to be downloaded to io.Discard.
func TestPrefetchV1WithManifestList(t *testing.T) {
	require := require.New(t)

	mocks, cleanup := newServerMocks(t)
	defer cleanup()

	addr := mocks.startServer()

	const (
		repo      = "kraken-test"
		namespace = "preheat"
		tag       = "abcdef:v1.0.0"
	)

	// Two sub-manifests, each referencing three distinct blobs, wrapped in a
	// manifest list.
	refsA := core.DigestListFixture(3)
	refsB := core.DigestListFixture(3)
	digestA, bytesA := dockerutil.ManifestFixture(refsA[0], refsA[1], refsA[2])
	digestB, bytesB := dockerutil.ManifestFixture(refsB[0], refsB[1], refsB[2])
	listDigest, listBytes := dockerutil.ManifestListFixture(digestA, digestB)

	payload, err := json.Marshal(prefetchBody{
		Tag:     fmt.Sprintf("%s/%s/%s", repo, namespace, tag),
		TraceId: "abc",
	})
	require.NoError(err)

	escapedTag := url.QueryEscape(fmt.Sprintf("%s/%s", namespace, tag))
	mocks.tagClient.EXPECT().Get(escapedTag).Return(listDigest, nil)

	// Manifest fetches used for parsing. These are registered before the
	// io.Discard expectations below; keep that relative order.
	mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, listDigest, mockutil.MatchWriter(listBytes)).Return(nil)
	mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, digestA, mockutil.MatchWriter(bytesA)).Return(nil)
	mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, digestB, mockutil.MatchWriter(bytesB)).Return(nil)

	// Prefetch downloads: the manifests themselves first, then their blobs.
	mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, listDigest, io.Discard).Return(nil)
	mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, digestA, io.Discard).Return(nil)
	mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, digestB, io.Discard).Return(nil)
	for _, ref := range refsA {
		mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, ref, io.Discard).Return(nil)
	}
	for _, ref := range refsB {
		mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, ref, io.Discard).Return(nil)
	}

	endpoint := fmt.Sprintf("http://%s/proxy/v1/registry/prefetch", addr)
	_, err = httputil.Post(endpoint, httputil.SendBody(bytes.NewReader(payload)))
	require.NoError(err)
}

func TestPrefetchV2(t *testing.T) {
require := require.New(t)

Expand All @@ -241,6 +289,8 @@ func TestPrefetchV2(t *testing.T) {
mocks.tagClient.EXPECT().Get(tagRequest).Return(manifest, nil)
mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, manifest, mockutil.MatchWriter(bs)).Return(nil)

mocks.originClient.EXPECT().PrefetchBlob(namespace, manifest).Return(nil)
mocks.originClient.EXPECT().PrefetchBlob(namespace, layers[0]).Return(nil)
mocks.originClient.EXPECT().PrefetchBlob(namespace, layers[1]).Return(nil)
mocks.originClient.EXPECT().PrefetchBlob(namespace, layers[2]).Return(nil)
res, err := httputil.Post(
Expand All @@ -263,6 +313,66 @@ func TestPrefetchV2(t *testing.T) {
}, resBody)
}

// TestPrefetchV2WithOCIIndex posts a v2 prefetch request for a tag that
// resolves to an OCI index with two OCI sub-manifests. It expects the index
// and both sub-manifests to be fetched for parsing, a PrefetchBlob call for
// the index, each sub-manifest and every blob they reference, and a success
// response echoing the trace id and tag.
func TestPrefetchV2WithOCIIndex(t *testing.T) {
	require := require.New(t)

	mocks, cleanup := newServerMocks(t)
	defer cleanup()

	addr := mocks.startServer()

	const (
		repo      = "kraken-test"
		namespace = "preheat"
		tag       = "abcdef:v1.0.0"
	)

	// Two OCI manifests, each referencing three distinct blobs, wrapped in an
	// OCI index.
	refsA := core.DigestListFixture(3)
	refsB := core.DigestListFixture(3)
	digestA, bytesA := dockerutil.OCIManifestFixture(refsA[0], refsA[1], refsA[2])
	digestB, bytesB := dockerutil.OCIManifestFixture(refsB[0], refsB[1], refsB[2])
	indexDigest, indexBytes := dockerutil.OCIIndexFixture(digestA, digestB)

	payload, err := json.Marshal(prefetchBody{
		Tag:     fmt.Sprintf("%s/%s/%s", repo, namespace, tag),
		TraceId: "abc",
	})
	require.NoError(err)

	escapedTag := url.QueryEscape(fmt.Sprintf("%s/%s", namespace, tag))
	mocks.tagClient.EXPECT().Get(escapedTag).Return(indexDigest, nil)

	// Manifest fetches used for parsing.
	mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, indexDigest, mockutil.MatchWriter(indexBytes)).Return(nil)
	mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, digestA, mockutil.MatchWriter(bytesA)).Return(nil)
	mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, digestB, mockutil.MatchWriter(bytesB)).Return(nil)

	// Prefetch requests: the index and sub-manifests first, then their blobs.
	mocks.originClient.EXPECT().PrefetchBlob(namespace, indexDigest).Return(nil)
	mocks.originClient.EXPECT().PrefetchBlob(namespace, digestA).Return(nil)
	mocks.originClient.EXPECT().PrefetchBlob(namespace, digestB).Return(nil)
	for _, ref := range refsA {
		mocks.originClient.EXPECT().PrefetchBlob(namespace, ref).Return(nil)
	}
	for _, ref := range refsB {
		mocks.originClient.EXPECT().PrefetchBlob(namespace, ref).Return(nil)
	}

	endpoint := fmt.Sprintf("http://%s/proxy/v2/registry/prefetch", addr)
	res, err := httputil.Post(endpoint, httputil.SendBody(bytes.NewReader(payload)))
	require.NoError(err)

	raw, err := io.ReadAll(res.Body)
	require.NoError(err)

	var got prefetchResponse
	require.NoError(json.Unmarshal(raw, &got))

	want := prefetchResponse{
		Message:    "prefetching initiated successfully",
		TraceId:    "abc",
		Status:     "success",
		Tag:        "abcdef:v1.0.0",
		Prefetched: true,
	}
	require.Equal(want, got)
}

func TestPrefetchV2OriginError(t *testing.T) {
require := require.New(t)

Expand All @@ -288,6 +398,8 @@ func TestPrefetchV2OriginError(t *testing.T) {
mocks.tagClient.EXPECT().Get(tagRequest).Return(manifest, nil)
mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, manifest, mockutil.MatchWriter(bs)).Return(nil)

mocks.originClient.EXPECT().PrefetchBlob(namespace, manifest).Return(nil)
mocks.originClient.EXPECT().PrefetchBlob(namespace, layers[0]).Return(nil)
mocks.originClient.EXPECT().PrefetchBlob(namespace, layers[1]).Return(errors.New("foo err"))
mocks.originClient.EXPECT().PrefetchBlob(namespace, layers[2]).Return(nil)
_, err = httputil.Post(
Expand Down
2 changes: 1 addition & 1 deletion test/python/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def _setup_test_image(name):
return new_name


TEST_IMAGE = _setup_test_image('alpine:latest') # todo: switch to latest after supporting oci format manifests
TEST_IMAGE = _setup_test_image('alpine:latest')
TEST_IMAGE_2 = _setup_test_image('redis:latest')


Expand Down
Loading
Loading