Compare commits
32 Commits
8df9ec11bd
...
feature/me
Author | SHA1 | Date | |
---|---|---|---|
dd56145418
|
|||
5f301ce32a
|
|||
4356588131
|
|||
1f286384c2
|
|||
d74cdfa143
|
|||
baae7a8d7e
|
|||
4da3293569
|
|||
4ee8043bb8
|
|||
c50547c2fb
|
|||
44323ef46b
|
|||
7d868cbd60
|
|||
11efe45f77
|
|||
f2c81dc9ac
|
|||
ecc308735c
|
|||
e0c93087ea
|
|||
e5726cf742
|
|||
170c5e0730
|
|||
81abb30846
|
|||
97ab34737d
|
|||
23358515b8
|
|||
1870a78fde
|
|||
fad6e517e5
|
|||
9eba0f9042
|
|||
8441f85f88
|
|||
42a6110e3c
|
|||
e9671cbe81
|
|||
662ce1b1a5
|
|||
8cfa0fb4d9
|
|||
2b85b7db66
|
|||
4544292114
|
|||
6eaeb601bd
|
|||
5e20237572
|
13
.drone.yml
13
.drone.yml
@@ -5,14 +5,19 @@ steps:
|
||||
- name: terraform plan
|
||||
image: alpine
|
||||
environment:
|
||||
HCLOUD_TOKEN:
|
||||
HCLOUD_TOKEN:
|
||||
from_secret: serverctl_hcloud_token
|
||||
ACCESS_KEY:
|
||||
from_secret: serverctl_access_key
|
||||
SECRET_KEY:
|
||||
from_secret: serverctl_secret_key
|
||||
SSH_ZIP_KEY:
|
||||
from_secret: serverctl_ssh_zip_key
|
||||
HCLOUD_SSH_KEY_ID:
|
||||
from_secret: serverctl_hcloud_ssh_key_id
|
||||
commands:
|
||||
- apk --update add curl
|
||||
- apk --update add curl zip ansible python3
|
||||
- cd infrastructure && ./unzip-ssh-keys.sh "$SSH_ZIP_KEY" && cd ..
|
||||
- curl --silent --output terraform.zip "https://releases.hashicorp.com/terraform/1.1.6/terraform_1.1.6_linux_amd64.zip"
|
||||
- unzip terraform.zip ; rm -f terraform.zip; chmod +x terraform
|
||||
- mkdir -p ${HOME}/bin ; export PATH=${PATH}:${HOME}/bin; mv terraform ${HOME}/bin/
|
||||
@@ -20,4 +25,6 @@ steps:
|
||||
- cd infrastructure/create-resources
|
||||
- terraform init -backend-config="access_key=$ACCESS_KEY" -backend-config="secret_key=$SECRET_KEY"
|
||||
- terraform validate
|
||||
- terraform plan -vars="hcloud_token=$HCLOUD_TOKEN"
|
||||
- terraform apply -auto-approve -var "hcloud_token=$HCLOUD_TOKEN" -var "pvt_key=../ssh_keys/id_ed25519" -var "pub_key=../ssh_keys/id_ed25519.pub" -var "hcloud_serverctl_ssh_key_id=$HCLOUD_SSH_KEY_ID"
|
||||
- cd ansible
|
||||
- ANSIBLE_HOST_KEY_CHECKING=False /usr/bin/ansible-playbook -u root --key-file '../../ssh_keys/id_ed25519' -e 'pub_key=../../ssh_keys/id_ed25519.pub' site.yml
|
||||
|
@@ -52,6 +52,25 @@ services:
|
||||
logging: *loki-logging
|
||||
depends_on:
|
||||
- db_migrator
|
||||
- rabbitmq
|
||||
|
||||
# Messaging
|
||||
rabbitmq:
|
||||
image: docker.io/bitnami/rabbitmq:latest
|
||||
ports:
|
||||
- '4369:4369'
|
||||
- '5551:5551'
|
||||
- '5552:5552'
|
||||
- '5672:5672'
|
||||
- '25672:25672'
|
||||
- '15672:15672'
|
||||
networks:
|
||||
- back-tier
|
||||
environment:
|
||||
- RABBITMQ_USERNAME=serverctl
|
||||
- RABBITMQ_PASSWORD=serverctlsecret
|
||||
volumes:
|
||||
- 'rabbitmq_data:/bitnami/rabbitmq/mnesia'
|
||||
|
||||
# Logging
|
||||
loki:
|
||||
@@ -114,3 +133,4 @@ volumes:
|
||||
db_data: {}
|
||||
prometheus_data: {}
|
||||
grafana_data: {}
|
||||
rabbitmq_data: {}
|
||||
|
1
infrastructure/.gitignore
vendored
Normal file
1
infrastructure/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
ssh_keys/
|
2
infrastructure/create-resources/.gitignore
vendored
2
infrastructure/create-resources/.gitignore
vendored
@@ -3,3 +3,5 @@
|
||||
.terraform.lock.hcl
|
||||
terraform.tfstate
|
||||
terraform.tfstate.backup
|
||||
secrets.txt
|
||||
.env
|
||||
|
9
infrastructure/create-resources/ansible/.yamllint
Normal file
9
infrastructure/create-resources/ansible/.yamllint
Normal file
@@ -0,0 +1,9 @@
|
||||
---
|
||||
extends: default
|
||||
|
||||
rules:
|
||||
line-length:
|
||||
max: 120
|
||||
level: warning
|
||||
truthy:
|
||||
allowed-values: ['true', 'false', 'yes', 'no']
|
12
infrastructure/create-resources/ansible/ansible.cfg
Normal file
12
infrastructure/create-resources/ansible/ansible.cfg
Normal file
@@ -0,0 +1,12 @@
|
||||
[defaults]
|
||||
nocows = True
|
||||
roles_path = ./roles
|
||||
inventory = ./inventory/hosts.cfg
|
||||
|
||||
remote_tmp = $HOME/.ansible/tmp
|
||||
local_tmp = $HOME/.ansible/tmp
|
||||
pipelining = True
|
||||
become = True
|
||||
host_key_checking = False
|
||||
deprecation_warnings = True
|
||||
callback_whitelist = profile_tasks
|
@@ -0,0 +1,3 @@
|
||||
---
|
||||
collections:
|
||||
- name: community.general
|
@@ -0,0 +1,7 @@
|
||||
---
|
||||
k3s_version: v1.22.3+k3s1
|
||||
ansible_user: root
|
||||
systemd_dir: /etc/systemd/system
|
||||
master_ip: "{{ hostvars[groups['serverctl_master_hosts'][0]]['ansible_host'] | default(groups['serverctl_master_hosts'][0]) }}"
|
||||
extra_server_args: ""
|
||||
extra_agent_args: ""
|
@@ -0,0 +1,10 @@
|
||||
---
|
||||
- name: Download k3s binary x64
|
||||
get_url:
|
||||
url: https://github.com/k3s-io/k3s/releases/download/{{ k3s_version }}/k3s
|
||||
checksum: sha256:https://github.com/k3s-io/k3s/releases/download/{{ k3s_version }}/sha256sum-amd64.txt
|
||||
dest: /usr/local/bin/k3s
|
||||
owner: root
|
||||
group: root
|
||||
mode: 0755
|
||||
when: ansible_facts.architecture == "x86_64"
|
@@ -0,0 +1,2 @@
|
||||
---
|
||||
k3s_server_location: /var/lib/rancher/k3s
|
@@ -0,0 +1,78 @@
|
||||
---
|
||||
- name: Copy K3s service file
|
||||
register: k3s_service
|
||||
template:
|
||||
src: "k3s.service.j2"
|
||||
dest: "{{ systemd_dir }}/k3s.service"
|
||||
owner: root
|
||||
group: root
|
||||
mode: 0644
|
||||
|
||||
- name: Enable and check K3s service
|
||||
systemd:
|
||||
name: k3s
|
||||
daemon_reload: yes
|
||||
state: restarted
|
||||
enabled: yes
|
||||
|
||||
- name: Wait for node-token
|
||||
wait_for:
|
||||
path: "{{ k3s_server_location }}/server/node-token"
|
||||
|
||||
- name: Register node-token file access mode
|
||||
stat:
|
||||
path: "{{ k3s_server_location }}/server/node-token"
|
||||
register: p
|
||||
|
||||
- name: Change file access node-token
|
||||
file:
|
||||
path: "{{ k3s_server_location }}/server/node-token"
|
||||
mode: "g+rx,o+rx"
|
||||
|
||||
- name: Read node-token from master
|
||||
slurp:
|
||||
path: "{{ k3s_server_location }}/server/node-token"
|
||||
register: node_token
|
||||
|
||||
- name: Store Master node-token
|
||||
set_fact:
|
||||
token: "{{ node_token.content | b64decode | regex_replace('\n', '') }}"
|
||||
|
||||
- name: Restore node-token file access
|
||||
file:
|
||||
path: "{{ k3s_server_location }}/server/node-token"
|
||||
mode: "{{ p.stat.mode }}"
|
||||
|
||||
- name: Create directory .kube
|
||||
file:
|
||||
path: ~{{ ansible_user }}/.kube
|
||||
state: directory
|
||||
owner: "{{ ansible_user }}"
|
||||
mode: "u=rwx,g=rx,o="
|
||||
|
||||
- name: Copy config file to user home directory
|
||||
copy:
|
||||
src: /etc/rancher/k3s/k3s.yaml
|
||||
dest: ~{{ ansible_user }}/.kube/config
|
||||
remote_src: yes
|
||||
owner: "{{ ansible_user }}"
|
||||
mode: "u=rw,g=,o="
|
||||
|
||||
- name: Replace https://localhost:6443 by https://master-ip:6443
|
||||
command: >-
|
||||
k3s kubectl config set-cluster default
|
||||
--server=https://{{ master_ip }}:6443
|
||||
--kubeconfig ~{{ ansible_user }}/.kube/config
|
||||
changed_when: true
|
||||
|
||||
- name: Create kubectl symlink
|
||||
file:
|
||||
src: /usr/local/bin/k3s
|
||||
dest: /usr/local/bin/kubectl
|
||||
state: link
|
||||
|
||||
- name: Create crictl symlink
|
||||
file:
|
||||
src: /usr/local/bin/k3s
|
||||
dest: /usr/local/bin/crictl
|
||||
state: link
|
@@ -0,0 +1,24 @@
|
||||
[Unit]
|
||||
Description=Lightweight Kubernetes
|
||||
Documentation=https://k3s.io
|
||||
After=network-online.target
|
||||
|
||||
[Service]
|
||||
Type=notify
|
||||
ExecStartPre=-/sbin/modprobe br_netfilter
|
||||
ExecStartPre=-/sbin/modprobe overlay
|
||||
ExecStart=/usr/local/bin/k3s server --data-dir {{ k3s_server_location }} {{ extra_server_args | default("") }}
|
||||
KillMode=process
|
||||
Delegate=yes
|
||||
# Having non-zero Limit*s causes performance problems due to accounting overhead
|
||||
# in the kernel. We recommend using cgroups to do container-local accounting.
|
||||
LimitNOFILE=1048576
|
||||
LimitNPROC=infinity
|
||||
LimitCORE=infinity
|
||||
TasksMax=infinity
|
||||
TimeoutStartSec=0
|
||||
Restart=always
|
||||
RestartSec=5s
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
@@ -0,0 +1,15 @@
|
||||
---
|
||||
- name: Copy K3s service file
|
||||
template:
|
||||
src: "k3s.service.j2"
|
||||
dest: "{{ systemd_dir }}/k3s-node.service"
|
||||
owner: root
|
||||
group: root
|
||||
mode: 0755
|
||||
|
||||
- name: Enable and check K3s service
|
||||
systemd:
|
||||
name: k3s-node
|
||||
daemon_reload: yes
|
||||
state: restarted
|
||||
enabled: yes
|
@@ -0,0 +1,24 @@
|
||||
[Unit]
|
||||
Description=Lightweight Kubernetes
|
||||
Documentation=https://k3s.io
|
||||
After=network-online.target
|
||||
|
||||
[Service]
|
||||
Type=notify
|
||||
ExecStartPre=-/sbin/modprobe br_netfilter
|
||||
ExecStartPre=-/sbin/modprobe overlay
|
||||
ExecStart=/usr/local/bin/k3s agent --server https://{{ master_ip }}:6443 --token {{ hostvars[groups['serverctl_master_hosts'][0]]['token'] }} {{ extra_agent_args | default("") }}
|
||||
KillMode=process
|
||||
Delegate=yes
|
||||
# Having non-zero Limit*s causes performance problems due to accounting overhead
|
||||
# in the kernel. We recommend using cgroups to do container-local accounting.
|
||||
LimitNOFILE=1048576
|
||||
LimitNPROC=infinity
|
||||
LimitCORE=infinity
|
||||
TasksMax=infinity
|
||||
TimeoutStartSec=0
|
||||
Restart=always
|
||||
RestartSec=5s
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
@@ -0,0 +1,18 @@
|
||||
---
|
||||
- name: Enable IPv4 forwarding
|
||||
sysctl:
|
||||
name: net.ipv4.ip_forward
|
||||
value: "1"
|
||||
state: present
|
||||
reload: yes
|
||||
|
||||
- name: Enable IPv6 forwarding
|
||||
sysctl:
|
||||
name: net.ipv6.conf.all.forwarding
|
||||
value: "1"
|
||||
state: present
|
||||
reload: yes
|
||||
when: ansible_all_ipv6_addresses
|
||||
|
||||
- name: Wait for apt to unlock
|
||||
shell: while sudo fuser /var/lib/dpkg/lock >/dev/null >2&1; do sleep 5; done;
|
16
infrastructure/create-resources/ansible/server-install.yml
Normal file
16
infrastructure/create-resources/ansible/server-install.yml
Normal file
@@ -0,0 +1,16 @@
|
||||
- become: yes
|
||||
hosts: all
|
||||
name: server-install
|
||||
tasks:
|
||||
- name: Add the user 'kjuulh' and add it to 'sudo'
|
||||
user:
|
||||
name: kjuulh
|
||||
group: sudo
|
||||
- name:
|
||||
authorized_key:
|
||||
user: kjuulh
|
||||
state: present
|
||||
key: "{{ lookup('file', pub_key) }}"
|
||||
- name: Wait for apt to unlock
|
||||
become: yes
|
||||
shell: while sudo fuser /var/lib/dpkg/lock >/dev/null >2&1; do sleep 5; done;
|
16
infrastructure/create-resources/ansible/site.yml
Normal file
16
infrastructure/create-resources/ansible/site.yml
Normal file
@@ -0,0 +1,16 @@
|
||||
---
|
||||
- hosts: serverctl_cluster
|
||||
gather_facts: yes
|
||||
become: yes
|
||||
roles:
|
||||
- role: prereq
|
||||
- role: download
|
||||
- hosts: serverctl_master_hosts
|
||||
become: yes
|
||||
roles:
|
||||
- role: "./k3s/master"
|
||||
#- hosts: serverctl_node_hosts
|
||||
# become: yes
|
||||
# roles:
|
||||
# - role: "./k3s/node"
|
||||
#
|
83
infrastructure/create-resources/hcloud.tf
Normal file
83
infrastructure/create-resources/hcloud.tf
Normal file
@@ -0,0 +1,83 @@
|
||||
|
||||
variable "serverctl_master_count" {
|
||||
default = 0
|
||||
}
|
||||
|
||||
variable "serverctl_node_count" {
|
||||
default = 0
|
||||
}
|
||||
|
||||
|
||||
resource "hcloud_placement_group" "serverctl_master" {
|
||||
name = "serverctl_master_group"
|
||||
type = "spread"
|
||||
}
|
||||
|
||||
resource "hcloud_server" "serverctl_master" {
|
||||
count = var.serverctl_master_count
|
||||
name = "serverctl-master-${count.index}"
|
||||
image = "debian-11"
|
||||
server_type = "cx11"
|
||||
ssh_keys = [
|
||||
var.hcloud_serverctl_ssh_key_id
|
||||
]
|
||||
placement_group_id = hcloud_placement_group.serverctl_master.id
|
||||
|
||||
|
||||
lifecycle {
|
||||
create_before_destroy = true
|
||||
}
|
||||
|
||||
provisioner "remote-exec" {
|
||||
inline = ["sudo apt update", "sudo apt install python3 -y", "echo Done!"]
|
||||
|
||||
connection {
|
||||
host = self.ipv4_address
|
||||
type = "ssh"
|
||||
user = "root"
|
||||
private_key = file(var.pvt_key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
resource "hcloud_placement_group" "serverctl_node" {
|
||||
name = "serverctl_node_group"
|
||||
type = "spread"
|
||||
}
|
||||
|
||||
resource "hcloud_server" "serverctl_node" {
|
||||
count = var.serverctl_node_count
|
||||
name = "serverctl-node-${count.index}"
|
||||
image = "debian-11"
|
||||
server_type = "cx11"
|
||||
ssh_keys = [
|
||||
var.hcloud_serverctl_ssh_key_id
|
||||
]
|
||||
placement_group_id = hcloud_placement_group.serverctl_node.id
|
||||
|
||||
|
||||
lifecycle {
|
||||
create_before_destroy = true
|
||||
}
|
||||
|
||||
provisioner "remote-exec" {
|
||||
inline = ["sudo apt update", "sudo apt install python3 -y", "echo Done!"]
|
||||
|
||||
connection {
|
||||
host = self.ipv4_address
|
||||
type = "ssh"
|
||||
user = "root"
|
||||
private_key = file(var.pvt_key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
resource "local_file" "hosts_cfg" {
|
||||
content = templatefile("${path.module}/templates/hosts.tpl",
|
||||
{
|
||||
serverctl_masters = hcloud_server.serverctl_master.*.ipv4_address
|
||||
serverctl_nodes = hcloud_server.serverctl_node.*.ipv4_address
|
||||
}
|
||||
)
|
||||
filename = "ansible/inventory/hosts.cfg"
|
||||
}
|
@@ -1,43 +0,0 @@
|
||||
terraform {
|
||||
required_providers {
|
||||
hcloud = {
|
||||
source = "hetznercloud/hcloud"
|
||||
version = "1.32.2"
|
||||
}
|
||||
}
|
||||
|
||||
backend "s3" {
|
||||
bucket = "serverctl-terraform"
|
||||
key = "terraform.tfstate"
|
||||
|
||||
endpoint = "https://api.minio.front.kjuulh.io"
|
||||
|
||||
region = "main"
|
||||
|
||||
skip_credentials_validation = true
|
||||
skip_metadata_api_check = true
|
||||
skip_region_validation = true
|
||||
force_path_style = true
|
||||
}
|
||||
}
|
||||
|
||||
variable "hcloud_token" {
|
||||
sensitive = true
|
||||
}
|
||||
|
||||
provider "hcloud" {
|
||||
token = var.hcloud_token
|
||||
}
|
||||
|
||||
resource "hcloud_placement_group" "serverctl_master" {
|
||||
name = "serverctl_master_group"
|
||||
type = "spread"
|
||||
}
|
||||
|
||||
resource "hcloud_server" "serverctl_master" {
|
||||
count = 2
|
||||
name = "serverctl-master-${count.index}"
|
||||
image = "debian-11"
|
||||
server_type = "cx11"
|
||||
placement_group_id = hcloud_placement_group.serverctl_master.id
|
||||
}
|
35
infrastructure/create-resources/provider.tf
Normal file
35
infrastructure/create-resources/provider.tf
Normal file
@@ -0,0 +1,35 @@
|
||||
terraform {
|
||||
required_providers {
|
||||
hcloud = {
|
||||
source = "hetznercloud/hcloud"
|
||||
version = "1.32.2"
|
||||
}
|
||||
}
|
||||
|
||||
backend "s3" {
|
||||
bucket = "serverctl-terraform"
|
||||
key = "terraform.tfstate"
|
||||
|
||||
endpoint = "https://api.minio.front.kjuulh.io"
|
||||
|
||||
region = "main"
|
||||
|
||||
skip_credentials_validation = true
|
||||
skip_metadata_api_check = true
|
||||
skip_region_validation = true
|
||||
force_path_style = true
|
||||
}
|
||||
}
|
||||
|
||||
variable "hcloud_token" {
|
||||
sensitive = true
|
||||
}
|
||||
|
||||
provider "hcloud" {
|
||||
token = var.hcloud_token
|
||||
}
|
||||
|
||||
|
||||
variable "hcloud_serverctl_ssh_key_id" {}
|
||||
variable "pvt_key" {}
|
||||
variable "pub_key" {}
|
6
infrastructure/create-resources/setup-terraform.sh
Executable file
6
infrastructure/create-resources/setup-terraform.sh
Executable file
@@ -0,0 +1,6 @@
|
||||
#!/bin/bash
|
||||
|
||||
export $(grep -v "^#" .env | xargs)
|
||||
|
||||
terraform init -backend-config="access_key=$ACCESS_KEY" -backend-config="secret_key=$SECRET_KEY"
|
||||
|
13
infrastructure/create-resources/templates/hosts.tpl
Normal file
13
infrastructure/create-resources/templates/hosts.tpl
Normal file
@@ -0,0 +1,13 @@
|
||||
[serverctl_master_hosts]
|
||||
%{ for ip in serverctl_masters ~}
|
||||
${ip}
|
||||
%{ endfor ~}
|
||||
|
||||
[serverctl_node_hosts]
|
||||
%{ for ip in serverctl_nodes ~}
|
||||
${ip}
|
||||
%{ endfor ~}
|
||||
|
||||
[serverctl_cluster:children]
|
||||
serverctl_master_hosts
|
||||
serverctl_node_hosts
|
BIN
infrastructure/ssh_keys.zip
Normal file
BIN
infrastructure/ssh_keys.zip
Normal file
Binary file not shown.
7
infrastructure/unzip-ssh-keys.sh
Executable file
7
infrastructure/unzip-ssh-keys.sh
Executable file
@@ -0,0 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
ZIP_KEY=$1
|
||||
|
||||
unzip -P "$ZIP_KEY" ssh_keys.zip
|
||||
|
||||
echo "unzip done!"
|
12
infrastructure/zip-ssh-keys.sh
Executable file
12
infrastructure/zip-ssh-keys.sh
Executable file
@@ -0,0 +1,12 @@
|
||||
#!/bin/bash
|
||||
|
||||
ZIP_KEY=$(openssl rand -hex 30)
|
||||
|
||||
mkdir -p ssh_keys/
|
||||
|
||||
cp -f ~/.ssh/id_ed25519* ssh_keys/
|
||||
|
||||
zip -r --password $ZIP_KEY ssh_keys.zip ssh_keys/
|
||||
|
||||
echo "zip done!"
|
||||
echo "Zip key: $ZIP_KEY"
|
@@ -4,6 +4,7 @@ import (
|
||||
"serverctl/pkg/api"
|
||||
"serverctl/pkg/infrastructure"
|
||||
"serverctl/pkg/infrastructure/dependencies"
|
||||
"serverctl/pkg/infrastructure/queue"
|
||||
)
|
||||
|
||||
// Run main app, will bootstrap dependencies and run all external ports (http servers, queues, etc).
|
||||
@@ -11,6 +12,12 @@ func Run() {
|
||||
d := dependencies.New()
|
||||
d.Logger.Info("Starting serverctl")
|
||||
|
||||
queue.NewRabbitMQ(d.Logger, &queue.RabbitMqConfig{
|
||||
Username: "serverctl",
|
||||
Password: "serverctlsecret",
|
||||
Host: "rabbitmq",
|
||||
Port: 5672,
|
||||
})
|
||||
// if development add seed data
|
||||
infrastructure.AddSeedData(d.Database, d.Logger)
|
||||
|
||||
|
@@ -18,7 +18,6 @@ require (
|
||||
github.com/docker/go-units v0.4.0 // indirect
|
||||
github.com/dustin/go-humanize v1.0.0 // indirect
|
||||
github.com/eko/gocache v1.2.0 // indirect
|
||||
github.com/georgysavva/scany v0.3.0 // indirect
|
||||
github.com/gin-contrib/sse v0.1.0 // indirect
|
||||
github.com/gin-gonic/gin v1.7.7 // indirect
|
||||
github.com/go-co-op/gocron v1.12.0 // indirect
|
||||
@@ -37,7 +36,6 @@ require (
|
||||
github.com/jackc/pgproto3/v2 v2.2.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
|
||||
github.com/jackc/pgtype v1.10.0 // indirect
|
||||
github.com/jackc/pgx v3.6.2+incompatible // indirect
|
||||
github.com/jackc/pgx/v4 v4.15.0 // indirect
|
||||
github.com/jackc/puddle v1.2.1 // indirect
|
||||
github.com/json-iterator/go v1.1.10 // indirect
|
||||
@@ -54,6 +52,7 @@ require (
|
||||
github.com/prometheus/client_model v0.2.0 // indirect
|
||||
github.com/prometheus/common v0.18.0 // indirect
|
||||
github.com/prometheus/procfs v0.6.0 // indirect
|
||||
github.com/rabbitmq/amqp091-go v1.3.0 // indirect
|
||||
github.com/robfig/cron/v3 v3.0.1 // indirect
|
||||
github.com/sirupsen/logrus v1.8.1 // indirect
|
||||
github.com/spf13/cast v1.3.1 // indirect
|
||||
|
@@ -793,6 +793,8 @@ github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
|
||||
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
|
||||
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
|
||||
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
|
||||
github.com/rabbitmq/amqp091-go v1.3.0 h1:A/QuHiNw7LMCJsxx9iZn5lrIz6OrhIn7Dfk5/1YatWM=
|
||||
github.com/rabbitmq/amqp091-go v1.3.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
|
||||
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
|
||||
@@ -846,6 +848,8 @@ github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/y
|
||||
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8=
|
||||
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
||||
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
||||
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
|
||||
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
||||
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
|
||||
github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
|
4
services/entry/pkg/infrastructure/queue/queue.go
Normal file
4
services/entry/pkg/infrastructure/queue/queue.go
Normal file
@@ -0,0 +1,4 @@
|
||||
package queue
|
||||
|
||||
type Queue interface {
|
||||
}
|
217
services/entry/pkg/infrastructure/queue/rabbitmq.go
Normal file
217
services/entry/pkg/infrastructure/queue/rabbitmq.go
Normal file
@@ -0,0 +1,217 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
import amqp "github.com/rabbitmq/amqp091-go"
|
||||
|
||||
type RabbitMQ struct {
|
||||
logger *zap.Logger
|
||||
config *RabbitMqConfig
|
||||
conn *amqp.Connection
|
||||
}
|
||||
|
||||
var _ Queue = RabbitMQ{}
|
||||
|
||||
type RabbitMqConfig struct {
|
||||
Username string
|
||||
Password string
|
||||
Host string
|
||||
Port int
|
||||
}
|
||||
|
||||
func NewRabbitMQ(logger *zap.Logger, config *RabbitMqConfig) Queue {
|
||||
|
||||
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", config.Username, config.Password, config.Host, config.Port))
|
||||
if err != nil {
|
||||
logger.Panic("Could not dial rabbitmq", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port), zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
//sendBasic(logger, config, conn)
|
||||
//receiveBasic(logger, config, conn)
|
||||
sendPublishingBasic(logger, config, conn)
|
||||
for i := 0; i < 200; i++ {
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
receivePublishingBasic(logger, config, conn)
|
||||
}
|
||||
//sendMany(logger, config, conn)
|
||||
|
||||
return &RabbitMQ{
|
||||
logger: logger,
|
||||
config: config,
|
||||
conn: conn,
|
||||
}
|
||||
}
|
||||
|
||||
func sendBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
body := "Hello world!"
|
||||
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
|
||||
if err != nil {
|
||||
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
logger.Info("Sent message", zap.String("body", body))
|
||||
|
||||
go func() {
|
||||
defer ch.Close()
|
||||
counter := 0
|
||||
for {
|
||||
body := fmt.Sprintf("message nr: %d", counter)
|
||||
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
|
||||
if err != nil {
|
||||
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
logger.Info("Sent message", zap.String("body", body))
|
||||
counter += 1
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}()
|
||||
}
|
||||
func receiveBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
msgs, err := ch.Consume(q.Name,
|
||||
"",
|
||||
true,
|
||||
false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Panic("failed to register consumer", zap.String("queueName", q.Name), zap.Error(err))
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer ch.Close()
|
||||
|
||||
for d := range msgs {
|
||||
logger.Info("Received msg", zap.String("body", string(d.Body)))
|
||||
}
|
||||
}()
|
||||
}
|
||||
func sendMany(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
body := "Hello world!"
|
||||
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
|
||||
if err != nil {
|
||||
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
logger.Info("Sent message", zap.String("body", body))
|
||||
|
||||
go func() {
|
||||
defer ch.Close()
|
||||
counter := 0
|
||||
for {
|
||||
body := fmt.Sprintf("message nr: %d", counter)
|
||||
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
|
||||
if err != nil {
|
||||
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
logger.Info("Sent message", zap.String("body", body))
|
||||
counter += 1
|
||||
}
|
||||
}()
|
||||
}
|
||||
func sendPublishingBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
q, err := ch.QueueDeclare("test", true, false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer ch.Close()
|
||||
counter := 0
|
||||
for {
|
||||
body := fmt.Sprintf("message nr: %d", counter)
|
||||
err = ch.Publish("", q.Name, false, false, amqp.Publishing{
|
||||
DeliveryMode: amqp.Persistent,
|
||||
ContentType: "text/plain", Body: []byte(body)})
|
||||
if err != nil {
|
||||
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
logger.Info("Sent message", zap.String("body", body))
|
||||
counter += 1
|
||||
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
}
|
||||
}()
|
||||
}
|
||||
func receivePublishingBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ch.Qos(1, 0, false)
|
||||
|
||||
q, err := ch.QueueDeclare("test", true, false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
msgs, err := ch.Consume(q.Name,
|
||||
"",
|
||||
false,
|
||||
false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Panic("failed to register consumer", zap.String("queueName", q.Name), zap.Error(err))
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer ch.Close()
|
||||
|
||||
for d := range msgs {
|
||||
logger.Info("Received msg", zap.String("body", string(d.Body)), zap.Int("secondsToSleep", len(d.Body)))
|
||||
t := time.Duration(len(d.Body))
|
||||
time.Sleep(t * time.Second)
|
||||
logger.Info("Received msg: Done")
|
||||
d.Ack(false)
|
||||
}
|
||||
}()
|
||||
}
|
Reference in New Issue
Block a user